diff --git a/crates/oxidetalis/src/websocket/mod.rs b/crates/oxidetalis/src/websocket/mod.rs index ed4a901..8607236 100644 --- a/crates/oxidetalis/src/websocket/mod.rs +++ b/crates/oxidetalis/src/websocket/mod.rs @@ -21,6 +21,7 @@ use errors::{WsError, WsResult}; use futures::{channel::mpsc, FutureExt, StreamExt, TryStreamExt}; use once_cell::sync::Lazy; use oxidetalis_core::{cipher::K256Secret, types::PublicKey}; +use oxidetalis_entities::prelude::*; use salvo::{ handler, http::StatusError, @@ -30,15 +31,18 @@ use salvo::{ Response, Router, }; +use sea_orm::DatabaseConnection; use tokio::{sync::RwLock, task::spawn as tokio_spawn, time::sleep as tokio_sleep}; -mod errors; +pub mod errors; mod events; +mod handlers; pub use events::*; use uuid::Uuid; use crate::{ + database::UserTableExt, extensions::{DepotExt, OnlineUsersExt}, middlewares, nonce::NonceCache, @@ -92,6 +96,7 @@ pub async fn user_connected( depot: &Depot, ) -> Result<(), StatusError> { let nonce_cache = depot.nonce_cache(); + let db_conn = depot.db_conn(); let public_key = utils::extract_public_key(req).expect("The public key was checked in the middleware"); // FIXME: The config should hold `K256Secret` not `PrivateKey` @@ -100,7 +105,7 @@ pub async fn user_connected( WebSocketUpgrade::new() .upgrade(req, res, move |ws| { - handle_socket(ws, nonce_cache, public_key, shared_secret) + handle_socket(ws, db_conn, nonce_cache, public_key, shared_secret) }) .await } @@ -108,6 +113,7 @@ pub async fn user_connected( /// Handle the websocket connection async fn handle_socket( ws: WebSocket, + db_conn: Arc, nonce_cache: Arc, user_public_key: PublicKey, user_shared_secret: [u8; 32], @@ -123,15 +129,31 @@ async fn handle_socket( }); tokio_spawn(fut); let conn_id = Uuid::new_v4(); - let user = SocketUserData::new(user_public_key, user_shared_secret, sender.clone()); - ONLINE_USERS.add_user(&conn_id, user).await; + let Ok(user) = db_conn.get_user_by_pubk(&user_public_key).await else { + let _ = sender.unbounded_send(Ok(ServerEvent::from(WsError::InternalServerError) + .sign(&user_shared_secret) + .as_ref() + .into())); + return; + }; + ONLINE_USERS + .add_user( + &conn_id, + SocketUserData::new(user_public_key, user_shared_secret, sender.clone()), + ) + .await; log::info!("New user connected: ConnId(={conn_id}) PublicKey(={user_public_key})"); + // TODO: Send the incoming chat request to the user, while they are offline. + // This after adding last_login col to the user table + let fut = async move { while let Some(Ok(msg)) = user_ws_receiver.next().await { match handle_ws_msg(msg, &nonce_cache, &user_shared_secret).await { Ok(event) => { - if let Some(server_event) = handle_events(event, &conn_id).await { + if let Some(server_event) = + handle_events(event, &db_conn, &conn_id, user.as_ref()).await + { if let Err(err) = sender.unbounded_send(Ok(server_event .sign(&user_shared_secret) .as_ref() @@ -182,13 +204,21 @@ async fn handle_ws_msg( } /// Handle user events, and return the server event if needed -async fn handle_events(event: ClientEvent, conn_id: &Uuid) -> Option> { +async fn handle_events( + event: ClientEvent, + db: &DatabaseConnection, + conn_id: &Uuid, + user: Option<&UserModel>, +) -> Option> { match &event.event { ClientEventType::Ping { .. } => Some(ServerEvent::pong()), ClientEventType::Pong { .. } => { ONLINE_USERS.update_pong(conn_id).await; None } + ClientEventType::ChatRequest { to } => { + Some(handlers::handle_chat_request(db, user, to).await) + } } }