From 927a345802871092d50b496dc18c28090780d4bf Mon Sep 17 00:00:00 2001
From: Awiteb
Date: Fri, 12 Jul 2024 23:15:33 +0300
Subject: [PATCH] feat: Handle `ChatRequest` event
Signed-off-by: Awiteb
---
crates/oxidetalis/src/websocket/mod.rs | 42 ++++++++++++++++++++++----
1 file changed, 36 insertions(+), 6 deletions(-)
diff --git a/crates/oxidetalis/src/websocket/mod.rs b/crates/oxidetalis/src/websocket/mod.rs
index ed4a901..8607236 100644
--- a/crates/oxidetalis/src/websocket/mod.rs
+++ b/crates/oxidetalis/src/websocket/mod.rs
@@ -21,6 +21,7 @@ use errors::{WsError, WsResult};
use futures::{channel::mpsc, FutureExt, StreamExt, TryStreamExt};
use once_cell::sync::Lazy;
use oxidetalis_core::{cipher::K256Secret, types::PublicKey};
+use oxidetalis_entities::prelude::*;
use salvo::{
handler,
http::StatusError,
@@ -30,15 +31,18 @@ use salvo::{
Response,
Router,
};
+use sea_orm::DatabaseConnection;
use tokio::{sync::RwLock, task::spawn as tokio_spawn, time::sleep as tokio_sleep};
-mod errors;
+pub mod errors;
mod events;
+mod handlers;
pub use events::*;
use uuid::Uuid;
use crate::{
+ database::UserTableExt,
extensions::{DepotExt, OnlineUsersExt},
middlewares,
nonce::NonceCache,
@@ -92,6 +96,7 @@ pub async fn user_connected(
depot: &Depot,
) -> Result<(), StatusError> {
let nonce_cache = depot.nonce_cache();
+ let db_conn = depot.db_conn();
let public_key =
utils::extract_public_key(req).expect("The public key was checked in the middleware");
// FIXME: The config should hold `K256Secret` not `PrivateKey`
@@ -100,7 +105,7 @@ pub async fn user_connected(
WebSocketUpgrade::new()
.upgrade(req, res, move |ws| {
- handle_socket(ws, nonce_cache, public_key, shared_secret)
+ handle_socket(ws, db_conn, nonce_cache, public_key, shared_secret)
})
.await
}
@@ -108,6 +113,7 @@ pub async fn user_connected(
/// Handle the websocket connection
async fn handle_socket(
ws: WebSocket,
+ db_conn: Arc,
nonce_cache: Arc,
user_public_key: PublicKey,
user_shared_secret: [u8; 32],
@@ -123,15 +129,31 @@ async fn handle_socket(
});
tokio_spawn(fut);
let conn_id = Uuid::new_v4();
- let user = SocketUserData::new(user_public_key, user_shared_secret, sender.clone());
- ONLINE_USERS.add_user(&conn_id, user).await;
+ let Ok(user) = db_conn.get_user_by_pubk(&user_public_key).await else {
+ let _ = sender.unbounded_send(Ok(ServerEvent::from(WsError::InternalServerError)
+ .sign(&user_shared_secret)
+ .as_ref()
+ .into()));
+ return;
+ };
+ ONLINE_USERS
+ .add_user(
+ &conn_id,
+ SocketUserData::new(user_public_key, user_shared_secret, sender.clone()),
+ )
+ .await;
log::info!("New user connected: ConnId(={conn_id}) PublicKey(={user_public_key})");
+ // TODO: Send the incoming chat request to the user, while they are offline.
+ // This after adding last_login col to the user table
+
let fut = async move {
while let Some(Ok(msg)) = user_ws_receiver.next().await {
match handle_ws_msg(msg, &nonce_cache, &user_shared_secret).await {
Ok(event) => {
- if let Some(server_event) = handle_events(event, &conn_id).await {
+ if let Some(server_event) =
+ handle_events(event, &db_conn, &conn_id, user.as_ref()).await
+ {
if let Err(err) = sender.unbounded_send(Ok(server_event
.sign(&user_shared_secret)
.as_ref()
@@ -182,13 +204,21 @@ async fn handle_ws_msg(
}
/// Handle user events, and return the server event if needed
-async fn handle_events(event: ClientEvent, conn_id: &Uuid) -> Option> {
+async fn handle_events(
+ event: ClientEvent,
+ db: &DatabaseConnection,
+ conn_id: &Uuid,
+ user: Option<&UserModel>,
+) -> Option> {
match &event.event {
ClientEventType::Ping { .. } => Some(ServerEvent::pong()),
ClientEventType::Pong { .. } => {
ONLINE_USERS.update_pong(conn_id).await;
None
}
+ ClientEventType::ChatRequest { to } => {
+ Some(handlers::handle_chat_request(db, user, to).await)
+ }
}
}