From 9218f205df5e55501fd29eb99b1f4ecf82675d18 Mon Sep 17 00:00:00 2001
From: Awiteb
Date: Mon, 29 Jul 2024 14:02:58 +0300
Subject: [PATCH] feat: Send chat requests and responses when the user is
online
Signed-off-by: Awiteb
---
.../oxidetalis/src/database/incoming_chat.rs | 115 +++++++++++++++---
.../src/websocket/handlers/chat_request.rs | 5 +-
crates/oxidetalis/src/websocket/mod.rs | 57 ++++++++-
3 files changed, 154 insertions(+), 23 deletions(-)
diff --git a/crates/oxidetalis/src/database/incoming_chat.rs b/crates/oxidetalis/src/database/incoming_chat.rs
index 72ebaa0..fe0c3f5 100644
--- a/crates/oxidetalis/src/database/incoming_chat.rs
+++ b/crates/oxidetalis/src/database/incoming_chat.rs
@@ -32,6 +32,25 @@ pub trait IncomingChatExt {
chat_request_sender: &PublicKey,
) -> ServerResult<()>;
+ /// Returns all incoming chat requests for the given recipient
+ async fn get_all_chat_requests(
+ &self,
+ chat_request_recipient: &UserModel,
+ ) -> ServerResult>;
+
+ /// Save the incoming chat response
+ async fn save_in_chat_response(
+ &self,
+ chat_response_recipient: &UserModel,
+ chat_response_sender: &PublicKey,
+ accepted_response: bool,
+ ) -> ServerResult<()>;
+
+ /// Returns all incoming chat responses for the given recipient
+ async fn get_all_chat_responses(
+ &self,
+ chat_response_recipient: &UserModel,
+ ) -> ServerResult>;
}
impl IncomingChatExt for DatabaseConnection {
@@ -41,23 +60,83 @@ impl IncomingChatExt for DatabaseConnection {
chat_request_recipient: &UserModel,
chat_request_sender: &PublicKey,
) -> ServerResult<()> {
- IncomingChatEntity::insert(IncomingChatActiveModel {
- recipient_id: Set(chat_request_recipient.id),
- sender: Set(*chat_request_sender),
- received_timestamp: Set(Utc::now()),
- ..Default::default()
- })
- .on_conflict(
- OnConflict::columns([
- IncomingChatColumn::RecipientId,
- IncomingChatColumn::Sender,
- IncomingChatColumn::AcceptedResponse,
- ])
- .do_nothing()
- .to_owned(),
- )
- .exec(self)
- .await?;
- Ok(())
+ save(self, chat_request_recipient, chat_request_sender, None).await
}
+ async fn get_all_chat_requests(
+ &self,
+ chat_request_recipient: &UserModel,
+ ) -> ServerResult> {
+ get_all::(self, chat_request_recipient).await
+ }
+
+ #[logcall::logcall]
+ async fn save_in_chat_response(
+ &self,
+ chat_response_recipient: &UserModel,
+ chat_response_sender: &PublicKey,
+ accepted_response: bool,
+ ) -> ServerResult<()> {
+ save(
+ self,
+ chat_response_recipient,
+ chat_response_sender,
+ Some(accepted_response),
+ )
+ .await
+ }
+
+ async fn get_all_chat_responses(
+ &self,
+ chat_response_recipient: &UserModel,
+ ) -> ServerResult> {
+ get_all::(self, chat_response_recipient).await
+ }
+}
+
+/// Utility function to save incoming chat request or response
+async fn save(
+ db: &DatabaseConnection,
+ recipient: &UserModel,
+ sender: &PublicKey,
+ accepted_response: Option,
+) -> ServerResult<()> {
+ IncomingChatEntity::insert(IncomingChatActiveModel {
+ recipient_id: Set(recipient.id),
+ sender: Set(*sender),
+ received_timestamp: Set(Utc::now()),
+ accepted_response: Set(accepted_response),
+ ..Default::default()
+ })
+ .on_conflict(
+ OnConflict::columns([
+ IncomingChatColumn::RecipientId,
+ IncomingChatColumn::Sender,
+ IncomingChatColumn::AcceptedResponse,
+ ])
+ .do_nothing()
+ .to_owned(),
+ )
+ .exec(db)
+ .await?;
+ Ok(())
+}
+
+/// Utility function to get all incoming chat requests or responses
+async fn get_all(
+ db: &DatabaseConnection,
+ recipient: &UserModel,
+) -> ServerResult> {
+ recipient
+ .find_related(IncomingChatEntity)
+ .filter(
+ if IS_REQUEST {
+ IncomingChatColumn::AcceptedResponse.is_null()
+ } else {
+ IncomingChatColumn::AcceptedResponse.is_not_null()
+ },
+ )
+ .all(db)
+ .await
+ .map_err(Into::into)
+}
diff --git a/crates/oxidetalis/src/websocket/handlers/chat_request.rs b/crates/oxidetalis/src/websocket/handlers/chat_request.rs
index 8a9914b..7957dcf 100644
--- a/crates/oxidetalis/src/websocket/handlers/chat_request.rs
+++ b/crates/oxidetalis/src/websocket/handlers/chat_request.rs
@@ -141,8 +141,9 @@ pub async fn handle_chat_response(
)
.await;
} else {
- // TODO: Create a table for chat request responses, and send them when
- // the user logs in
+ try_ws!(Some
+ db.save_in_chat_response(&response_recipient, &response_sender.public_key, accepted).await
+ );
}
None
diff --git a/crates/oxidetalis/src/websocket/mod.rs b/crates/oxidetalis/src/websocket/mod.rs
index 84b3396..20ea8bb 100644
--- a/crates/oxidetalis/src/websocket/mod.rs
+++ b/crates/oxidetalis/src/websocket/mod.rs
@@ -45,7 +45,7 @@ pub use events::*;
use uuid::Uuid;
use crate::{
- database::UserTableExt,
+ database::{IncomingChatExt, UserTableExt},
extensions::{DepotExt, OnlineUsersExt},
middlewares,
nonce::NonceCache,
@@ -145,8 +145,9 @@ async fn handle_socket(
.await;
log::info!("New user connected: ConnId(={conn_id}) PublicKey(={user_public_key})");
- // TODO: Send the incoming chat request to the user, while they are offline.
- // This after adding last_login col to the user table
+ if let Some(server_user) = &user {
+ send_chat_requests_and_responses(&db_conn, &user_shared_secret, &sender, server_user).await;
+ }
while let Some(Ok(msg)) = user_ws_receiver.next().await {
match handle_ws_msg(msg, &nonce_cache, &user_shared_secret).await {
@@ -177,6 +178,56 @@ async fn handle_socket(
user_disconnected(&db_conn, &conn_id, &user_public_key, user).await;
}
+/// Send the incoming chat requests and responses to the user while they were
+/// offline
+///
+/// ### Note
+/// The errors are ignored, if there is an issue with user connection, the user
+/// will be disconnected after this function is called.
+///
+/// Also we make sure that the user receives the chat requests and responses
+/// before we delete them from the database.
+async fn send_chat_requests_and_responses(
+ db_conn: &DatabaseConnection,
+ user_shared_secret: &[u8; 32],
+ sender: &mpsc::UnboundedSender>,
+ server_user: &UserModel,
+) {
+ let Ok(requests) = db_conn.get_all_chat_requests(server_user).await else {
+ return;
+ };
+ let Ok(responses) = db_conn.get_all_chat_responses(server_user).await else {
+ return;
+ };
+
+ for request in requests {
+ if sender
+ .unbounded_send(Ok(ServerEvent::chat_request(&request.sender)
+ .sign(user_shared_secret)
+ .as_ref()
+ .into()))
+ .is_ok()
+ {
+ let _ = request.delete(db_conn).await;
+ }
+ }
+
+ for response in responses {
+ if sender
+ .unbounded_send(Ok(ServerEvent::chat_request_response(
+ response.sender,
+ response.accepted_response.unwrap_or_default(),
+ )
+ .sign(user_shared_secret)
+ .as_ref()
+ .into()))
+ .is_ok()
+ {
+ let _ = response.delete(db_conn).await;
+ }
+ }
+}
+
/// Handle websocket msg
async fn handle_ws_msg(
msg: Message,