feat: Chat request implementation #14
1 changed files with 36 additions and 6 deletions
|
@ -21,6 +21,7 @@ use errors::{WsError, WsResult};
|
|||
use futures::{channel::mpsc, FutureExt, StreamExt, TryStreamExt};
|
||||
use once_cell::sync::Lazy;
|
||||
use oxidetalis_core::{cipher::K256Secret, types::PublicKey};
|
||||
use oxidetalis_entities::prelude::*;
|
||||
use salvo::{
|
||||
handler,
|
||||
http::StatusError,
|
||||
|
@ -30,15 +31,18 @@ use salvo::{
|
|||
Response,
|
||||
Router,
|
||||
};
|
||||
use sea_orm::DatabaseConnection;
|
||||
use tokio::{sync::RwLock, task::spawn as tokio_spawn, time::sleep as tokio_sleep};
|
||||
|
||||
mod errors;
|
||||
pub mod errors;
|
||||
mod events;
|
||||
mod handlers;
|
||||
|
||||
pub use events::*;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
database::UserTableExt,
|
||||
extensions::{DepotExt, OnlineUsersExt},
|
||||
middlewares,
|
||||
nonce::NonceCache,
|
||||
|
@ -92,6 +96,7 @@ pub async fn user_connected(
|
|||
depot: &Depot,
|
||||
) -> Result<(), StatusError> {
|
||||
let nonce_cache = depot.nonce_cache();
|
||||
let db_conn = depot.db_conn();
|
||||
let public_key =
|
||||
utils::extract_public_key(req).expect("The public key was checked in the middleware");
|
||||
// FIXME: The config should hold `K256Secret` not `PrivateKey`
|
||||
|
@ -100,7 +105,7 @@ pub async fn user_connected(
|
|||
|
||||
WebSocketUpgrade::new()
|
||||
.upgrade(req, res, move |ws| {
|
||||
handle_socket(ws, nonce_cache, public_key, shared_secret)
|
||||
handle_socket(ws, db_conn, nonce_cache, public_key, shared_secret)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
@ -108,6 +113,7 @@ pub async fn user_connected(
|
|||
/// Handle the websocket connection
|
||||
async fn handle_socket(
|
||||
ws: WebSocket,
|
||||
db_conn: Arc<DatabaseConnection>,
|
||||
nonce_cache: Arc<NonceCache>,
|
||||
user_public_key: PublicKey,
|
||||
user_shared_secret: [u8; 32],
|
||||
|
@ -123,15 +129,31 @@ async fn handle_socket(
|
|||
});
|
||||
tokio_spawn(fut);
|
||||
let conn_id = Uuid::new_v4();
|
||||
let user = SocketUserData::new(user_public_key, user_shared_secret, sender.clone());
|
||||
ONLINE_USERS.add_user(&conn_id, user).await;
|
||||
let Ok(user) = db_conn.get_user_by_pubk(&user_public_key).await else {
|
||||
let _ = sender.unbounded_send(Ok(ServerEvent::from(WsError::InternalServerError)
|
||||
.sign(&user_shared_secret)
|
||||
.as_ref()
|
||||
.into()));
|
||||
return;
|
||||
};
|
||||
ONLINE_USERS
|
||||
.add_user(
|
||||
&conn_id,
|
||||
SocketUserData::new(user_public_key, user_shared_secret, sender.clone()),
|
||||
)
|
||||
.await;
|
||||
log::info!("New user connected: ConnId(={conn_id}) PublicKey(={user_public_key})");
|
||||
|
||||
// TODO: Send the incoming chat request to the user, while they are offline.
|
||||
// This after adding last_login col to the user table
|
||||
|
||||
let fut = async move {
|
||||
while let Some(Ok(msg)) = user_ws_receiver.next().await {
|
||||
match handle_ws_msg(msg, &nonce_cache, &user_shared_secret).await {
|
||||
Ok(event) => {
|
||||
if let Some(server_event) = handle_events(event, &conn_id).await {
|
||||
if let Some(server_event) =
|
||||
handle_events(event, &db_conn, &conn_id, user.as_ref()).await
|
||||
{
|
||||
if let Err(err) = sender.unbounded_send(Ok(server_event
|
||||
.sign(&user_shared_secret)
|
||||
.as_ref()
|
||||
|
@ -182,13 +204,21 @@ async fn handle_ws_msg(
|
|||
}
|
||||
|
||||
/// Handle user events, and return the server event if needed
|
||||
async fn handle_events(event: ClientEvent, conn_id: &Uuid) -> Option<ServerEvent<Unsigned>> {
|
||||
async fn handle_events(
|
||||
event: ClientEvent,
|
||||
db: &DatabaseConnection,
|
||||
conn_id: &Uuid,
|
||||
user: Option<&UserModel>,
|
||||
) -> Option<ServerEvent<Unsigned>> {
|
||||
match &event.event {
|
||||
ClientEventType::Ping { .. } => Some(ServerEvent::pong()),
|
||||
ClientEventType::Pong { .. } => {
|
||||
ONLINE_USERS.update_pong(conn_id).await;
|
||||
None
|
||||
}
|
||||
ClientEventType::ChatRequest { to } => {
|
||||
Some(handlers::handle_chat_request(db, user, to).await)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue