diff --git a/crates/oxidetalis/src/database/incoming_chat.rs b/crates/oxidetalis/src/database/incoming_chat.rs index 72ebaa0..fe0c3f5 100644 --- a/crates/oxidetalis/src/database/incoming_chat.rs +++ b/crates/oxidetalis/src/database/incoming_chat.rs @@ -32,6 +32,25 @@ pub trait IncomingChatExt { chat_request_sender: &PublicKey, ) -> ServerResult<()>; + /// Returns all incoming chat requests for the given recipient + async fn get_all_chat_requests( + &self, + chat_request_recipient: &UserModel, + ) -> ServerResult>; + + /// Save the incoming chat response + async fn save_in_chat_response( + &self, + chat_response_recipient: &UserModel, + chat_response_sender: &PublicKey, + accepted_response: bool, + ) -> ServerResult<()>; + + /// Returns all incoming chat responses for the given recipient + async fn get_all_chat_responses( + &self, + chat_response_recipient: &UserModel, + ) -> ServerResult>; } impl IncomingChatExt for DatabaseConnection { @@ -41,23 +60,83 @@ impl IncomingChatExt for DatabaseConnection { chat_request_recipient: &UserModel, chat_request_sender: &PublicKey, ) -> ServerResult<()> { - IncomingChatEntity::insert(IncomingChatActiveModel { - recipient_id: Set(chat_request_recipient.id), - sender: Set(*chat_request_sender), - received_timestamp: Set(Utc::now()), - ..Default::default() - }) - .on_conflict( - OnConflict::columns([ - IncomingChatColumn::RecipientId, - IncomingChatColumn::Sender, - IncomingChatColumn::AcceptedResponse, - ]) - .do_nothing() - .to_owned(), - ) - .exec(self) - .await?; - Ok(()) + save(self, chat_request_recipient, chat_request_sender, None).await } + async fn get_all_chat_requests( + &self, + chat_request_recipient: &UserModel, + ) -> ServerResult> { + get_all::(self, chat_request_recipient).await + } + + #[logcall::logcall] + async fn save_in_chat_response( + &self, + chat_response_recipient: &UserModel, + chat_response_sender: &PublicKey, + accepted_response: bool, + ) -> ServerResult<()> { + save( + self, + chat_response_recipient, + chat_response_sender, + Some(accepted_response), + ) + .await + } + + async fn get_all_chat_responses( + &self, + chat_response_recipient: &UserModel, + ) -> ServerResult> { + get_all::(self, chat_response_recipient).await + } +} + +/// Utility function to save incoming chat request or response +async fn save( + db: &DatabaseConnection, + recipient: &UserModel, + sender: &PublicKey, + accepted_response: Option, +) -> ServerResult<()> { + IncomingChatEntity::insert(IncomingChatActiveModel { + recipient_id: Set(recipient.id), + sender: Set(*sender), + received_timestamp: Set(Utc::now()), + accepted_response: Set(accepted_response), + ..Default::default() + }) + .on_conflict( + OnConflict::columns([ + IncomingChatColumn::RecipientId, + IncomingChatColumn::Sender, + IncomingChatColumn::AcceptedResponse, + ]) + .do_nothing() + .to_owned(), + ) + .exec(db) + .await?; + Ok(()) +} + +/// Utility function to get all incoming chat requests or responses +async fn get_all( + db: &DatabaseConnection, + recipient: &UserModel, +) -> ServerResult> { + recipient + .find_related(IncomingChatEntity) + .filter( + if IS_REQUEST { + IncomingChatColumn::AcceptedResponse.is_null() + } else { + IncomingChatColumn::AcceptedResponse.is_not_null() + }, + ) + .all(db) + .await + .map_err(Into::into) +} diff --git a/crates/oxidetalis/src/websocket/handlers/chat_request.rs b/crates/oxidetalis/src/websocket/handlers/chat_request.rs index 8a9914b..7957dcf 100644 --- a/crates/oxidetalis/src/websocket/handlers/chat_request.rs +++ b/crates/oxidetalis/src/websocket/handlers/chat_request.rs @@ -141,8 +141,9 @@ pub async fn handle_chat_response( ) .await; } else { - // TODO: Create a table for chat request responses, and send them when - // the user logs in + try_ws!(Some + db.save_in_chat_response(&response_recipient, &response_sender.public_key, accepted).await + ); } None diff --git a/crates/oxidetalis/src/websocket/mod.rs b/crates/oxidetalis/src/websocket/mod.rs index 84b3396..20ea8bb 100644 --- a/crates/oxidetalis/src/websocket/mod.rs +++ b/crates/oxidetalis/src/websocket/mod.rs @@ -45,7 +45,7 @@ pub use events::*; use uuid::Uuid; use crate::{ - database::UserTableExt, + database::{IncomingChatExt, UserTableExt}, extensions::{DepotExt, OnlineUsersExt}, middlewares, nonce::NonceCache, @@ -145,8 +145,9 @@ async fn handle_socket( .await; log::info!("New user connected: ConnId(={conn_id}) PublicKey(={user_public_key})"); - // TODO: Send the incoming chat request to the user, while they are offline. - // This after adding last_login col to the user table + if let Some(server_user) = &user { + send_chat_requests_and_responses(&db_conn, &user_shared_secret, &sender, server_user).await; + } while let Some(Ok(msg)) = user_ws_receiver.next().await { match handle_ws_msg(msg, &nonce_cache, &user_shared_secret).await { @@ -177,6 +178,56 @@ async fn handle_socket( user_disconnected(&db_conn, &conn_id, &user_public_key, user).await; } +/// Send the incoming chat requests and responses to the user while they were +/// offline +/// +/// ### Note +/// The errors are ignored, if there is an issue with user connection, the user +/// will be disconnected after this function is called. +/// +/// Also we make sure that the user receives the chat requests and responses +/// before we delete them from the database. +async fn send_chat_requests_and_responses( + db_conn: &DatabaseConnection, + user_shared_secret: &[u8; 32], + sender: &mpsc::UnboundedSender>, + server_user: &UserModel, +) { + let Ok(requests) = db_conn.get_all_chat_requests(server_user).await else { + return; + }; + let Ok(responses) = db_conn.get_all_chat_responses(server_user).await else { + return; + }; + + for request in requests { + if sender + .unbounded_send(Ok(ServerEvent::chat_request(&request.sender) + .sign(user_shared_secret) + .as_ref() + .into())) + .is_ok() + { + let _ = request.delete(db_conn).await; + } + } + + for response in responses { + if sender + .unbounded_send(Ok(ServerEvent::chat_request_response( + response.sender, + response.accepted_response.unwrap_or_default(), + ) + .sign(user_shared_secret) + .as_ref() + .into())) + .is_ok() + { + let _ = response.delete(db_conn).await; + } + } +} + /// Handle websocket msg async fn handle_ws_msg( msg: Message,