From cd2a9ea03ef08000401fd0a9807cfa36301d48f9 Mon Sep 17 00:00:00 2001 From: Awiteb Date: Fri, 5 Jul 2024 02:17:10 +0300 Subject: [PATCH] feat: New extension trait for websocket online users Signed-off-by: Awiteb --- crates/oxidetalis/src/extensions.rs | 69 ++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/crates/oxidetalis/src/extensions.rs b/crates/oxidetalis/src/extensions.rs index 9e6b2e0..7cc2c27 100644 --- a/crates/oxidetalis/src/extensions.rs +++ b/crates/oxidetalis/src/extensions.rs @@ -18,10 +18,16 @@ use std::sync::Arc; use chrono::Utc; use oxidetalis_config::Config; -use salvo::Depot; +use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; +use salvo::{websocket::Message, Depot}; use sea_orm::DatabaseConnection; +use uuid::Uuid; -use crate::{routes::DEPOT_NONCE_CACHE_SIZE, NonceCache}; +use crate::{ + routes::DEPOT_NONCE_CACHE_SIZE, + websocket::{OnlineUsers, ServerEvent, SocketUserData}, + NonceCache, +}; /// Extension trait for the Depot. pub trait DepotExt { @@ -42,6 +48,24 @@ pub trait NonceCacheExt { fn add_nonce(&self, nonce: &[u8; 16], limit: &usize) -> bool; } +/// Extension trait for online websocket users +pub trait OnlineUsersExt { + /// Add new user to the online users + async fn add_user(&self, conn_id: &Uuid, data: SocketUserData); + + /// Remove user from online users + async fn remove_user(&self, conn_id: &Uuid); + + /// Ping all online users + async fn ping_all(&self); + + /// Update user pong at time + async fn update_pong(&self, conn_id: &Uuid); + + /// Disconnect inactive users (who not respond for the ping event) + async fn disconnect_inactive_users(&self); +} + impl DepotExt for Depot { fn db_conn(&self) -> &DatabaseConnection { self.obtain::>() @@ -91,3 +115,44 @@ impl NonceCacheExt for &NonceCache { true } } + +impl OnlineUsersExt for OnlineUsers { + async fn add_user(&self, conn_id: &Uuid, data: SocketUserData) { + self.write().await.insert(*conn_id, data); + } + + async fn remove_user(&self, conn_id: &Uuid) { + self.write().await.remove(conn_id); + } + + async fn ping_all(&self) { + let now = Utc::now(); + self.write().await.par_iter_mut().for_each(|(_, u)| { + u.pinged_at = now; + let _ = u.sender.unbounded_send(Ok(Message::from( + &ServerEvent::ping().sign(&u.shared_secret), + ))); + }); + } + + async fn update_pong(&self, conn_id: &Uuid) { + if let Some(user) = self.write().await.get_mut(conn_id) { + user.ponged_at = Utc::now() + } + } + + async fn disconnect_inactive_users(&self) { + let now = Utc::now().timestamp(); + let is_inactive = + |u: &SocketUserData| u.pinged_at > u.ponged_at && now - u.pinged_at.timestamp() >= 5; + self.read() + .await + .iter() + .filter(|(_, u)| is_inactive(u)) + .for_each(|(_, u)| { + log::info!("Disconnected from {}, inactive", u.public_key); + u.sender.close_channel() + }); + self.write().await.retain(|_, u| !is_inactive(u)); + } +}