refactor: Refactor in_chat_requests table #35

Manually merged
awiteb merged 7 commits from awiteb/rename-chat-request-table-and-save-respnse-in-it into master 2024-07-30 10:39:04 +02:00 AGit
3 changed files with 154 additions and 23 deletions
Showing only changes of commit 9218f205df - Show all commits

View file

@ -32,6 +32,25 @@ pub trait IncomingChatExt {
chat_request_sender: &PublicKey,
) -> ServerResult<()>;
/// Returns all incoming chat requests for the given recipient
async fn get_all_chat_requests(
&self,
chat_request_recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>>;
/// Save the incoming chat response
async fn save_in_chat_response(
&self,
chat_response_recipient: &UserModel,
chat_response_sender: &PublicKey,
accepted_response: bool,
) -> ServerResult<()>;
/// Returns all incoming chat responses for the given recipient
async fn get_all_chat_responses(
&self,
chat_response_recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>>;
}
impl IncomingChatExt for DatabaseConnection {
@ -41,23 +60,83 @@ impl IncomingChatExt for DatabaseConnection {
chat_request_recipient: &UserModel,
chat_request_sender: &PublicKey,
) -> ServerResult<()> {
IncomingChatEntity::insert(IncomingChatActiveModel {
recipient_id: Set(chat_request_recipient.id),
sender: Set(*chat_request_sender),
received_timestamp: Set(Utc::now()),
..Default::default()
})
.on_conflict(
OnConflict::columns([
IncomingChatColumn::RecipientId,
IncomingChatColumn::Sender,
IncomingChatColumn::AcceptedResponse,
])
.do_nothing()
.to_owned(),
)
.exec(self)
.await?;
Ok(())
save(self, chat_request_recipient, chat_request_sender, None).await
}
async fn get_all_chat_requests(
&self,
chat_request_recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>> {
get_all::<true>(self, chat_request_recipient).await
}
#[logcall::logcall]
async fn save_in_chat_response(
&self,
chat_response_recipient: &UserModel,
chat_response_sender: &PublicKey,
accepted_response: bool,
) -> ServerResult<()> {
save(
self,
chat_response_recipient,
chat_response_sender,
Some(accepted_response),
)
.await
}
async fn get_all_chat_responses(
&self,
chat_response_recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>> {
get_all::<false>(self, chat_response_recipient).await
}
}
/// Utility function to save incoming chat request or response
async fn save(
db: &DatabaseConnection,
recipient: &UserModel,
sender: &PublicKey,
accepted_response: Option<bool>,
) -> ServerResult<()> {
IncomingChatEntity::insert(IncomingChatActiveModel {
recipient_id: Set(recipient.id),
sender: Set(*sender),
received_timestamp: Set(Utc::now()),
accepted_response: Set(accepted_response),
..Default::default()
})
.on_conflict(
OnConflict::columns([
IncomingChatColumn::RecipientId,
IncomingChatColumn::Sender,
IncomingChatColumn::AcceptedResponse,
])
.do_nothing()
.to_owned(),
)
.exec(db)
.await?;
Ok(())
}
/// Utility function to get all incoming chat requests or responses
async fn get_all<const IS_REQUEST: bool>(
db: &DatabaseConnection,
recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>> {
recipient
.find_related(IncomingChatEntity)
.filter(
if IS_REQUEST {
IncomingChatColumn::AcceptedResponse.is_null()
} else {
IncomingChatColumn::AcceptedResponse.is_not_null()
},
)
.all(db)
.await
.map_err(Into::into)
}

View file

@ -141,8 +141,9 @@ pub async fn handle_chat_response(
)
.await;
} else {
// TODO: Create a table for chat request responses, and send them when
// the user logs in
try_ws!(Some
db.save_in_chat_response(&response_recipient, &response_sender.public_key, accepted).await
);
}
None

View file

@ -45,7 +45,7 @@ pub use events::*;
use uuid::Uuid;
use crate::{
database::UserTableExt,
database::{IncomingChatExt, UserTableExt},
extensions::{DepotExt, OnlineUsersExt},
middlewares,
nonce::NonceCache,
@ -145,8 +145,9 @@ async fn handle_socket(
.await;
log::info!("New user connected: ConnId(={conn_id}) PublicKey(={user_public_key})");
// TODO: Send the incoming chat request to the user, while they are offline.
// This after adding last_login col to the user table
if let Some(server_user) = &user {
send_chat_requests_and_responses(&db_conn, &user_shared_secret, &sender, server_user).await;
}
while let Some(Ok(msg)) = user_ws_receiver.next().await {
match handle_ws_msg(msg, &nonce_cache, &user_shared_secret).await {
@ -177,6 +178,56 @@ async fn handle_socket(
user_disconnected(&db_conn, &conn_id, &user_public_key, user).await;
}
/// Send the incoming chat requests and responses to the user while they were
/// offline
///
/// ### Note
/// The errors are ignored, if there is an issue with user connection, the user
/// will be disconnected after this function is called.
///
/// Also we make sure that the user receives the chat requests and responses
/// before we delete them from the database.
async fn send_chat_requests_and_responses(
db_conn: &DatabaseConnection,
user_shared_secret: &[u8; 32],
sender: &mpsc::UnboundedSender<Result<Message, salvo::Error>>,
server_user: &UserModel,
) {
let Ok(requests) = db_conn.get_all_chat_requests(server_user).await else {
return;
};
let Ok(responses) = db_conn.get_all_chat_responses(server_user).await else {
return;
};
for request in requests {
if sender
.unbounded_send(Ok(ServerEvent::chat_request(&request.sender)
.sign(user_shared_secret)
.as_ref()
.into()))
.is_ok()
{
let _ = request.delete(db_conn).await;
}
}
for response in responses {
if sender
.unbounded_send(Ok(ServerEvent::chat_request_response(
response.sender,
response.accepted_response.unwrap_or_default(),
)
.sign(user_shared_secret)
.as_ref()
.into()))
.is_ok()
{
let _ = response.delete(db_conn).await;
}
}
}
/// Handle websocket msg
async fn handle_ws_msg(
msg: Message,