feat: Initialize server websocket #8

Merged
awiteb merged 4 commits from awiteb/init-websocket into master 2024-07-06 13:35:10 +02:00 AGit
Showing only changes of commit cd2a9ea03e - Show all commits

View file

@ -18,10 +18,16 @@ use std::sync::Arc;
use chrono::Utc; use chrono::Utc;
use oxidetalis_config::Config; use oxidetalis_config::Config;
use salvo::Depot; use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use salvo::{websocket::Message, Depot};
use sea_orm::DatabaseConnection; use sea_orm::DatabaseConnection;
use uuid::Uuid;
use crate::{routes::DEPOT_NONCE_CACHE_SIZE, NonceCache}; use crate::{
routes::DEPOT_NONCE_CACHE_SIZE,
websocket::{OnlineUsers, ServerEvent, SocketUserData},
NonceCache,
};
/// Extension trait for the Depot. /// Extension trait for the Depot.
pub trait DepotExt { pub trait DepotExt {
@ -42,6 +48,24 @@ pub trait NonceCacheExt {
fn add_nonce(&self, nonce: &[u8; 16], limit: &usize) -> bool; fn add_nonce(&self, nonce: &[u8; 16], limit: &usize) -> bool;
} }
/// Extension trait for online websocket users
pub trait OnlineUsersExt {
/// Add new user to the online users
async fn add_user(&self, conn_id: &Uuid, data: SocketUserData);
/// Remove user from online users
async fn remove_user(&self, conn_id: &Uuid);
/// Ping all online users
async fn ping_all(&self);
/// Update user pong at time
async fn update_pong(&self, conn_id: &Uuid);
/// Disconnect inactive users (who not respond for the ping event)
async fn disconnect_inactive_users(&self);
}
impl DepotExt for Depot { impl DepotExt for Depot {
fn db_conn(&self) -> &DatabaseConnection { fn db_conn(&self) -> &DatabaseConnection {
self.obtain::<Arc<DatabaseConnection>>() self.obtain::<Arc<DatabaseConnection>>()
@ -91,3 +115,44 @@ impl NonceCacheExt for &NonceCache {
true true
} }
} }
impl OnlineUsersExt for OnlineUsers {
async fn add_user(&self, conn_id: &Uuid, data: SocketUserData) {
self.write().await.insert(*conn_id, data);
}
async fn remove_user(&self, conn_id: &Uuid) {
self.write().await.remove(conn_id);
}
async fn ping_all(&self) {
let now = Utc::now();
self.write().await.par_iter_mut().for_each(|(_, u)| {
u.pinged_at = now;
let _ = u.sender.unbounded_send(Ok(Message::from(
awiteb marked this conversation as resolved
Review

Error is silently being ignored here

Error is silently being ignored here
Review

It is ok, because while the connection is in the online list, that mean the user is there, we will enter the user_disconnected function if there is a connection problem with the user.

It is ok, because while the connection is in the online list, that mean the user is there, we will enter the `user_disconnected` function if there is a connection problem with the user.
&ServerEvent::ping().sign(&u.shared_secret),
)));
});
}
async fn update_pong(&self, conn_id: &Uuid) {
if let Some(user) = self.write().await.get_mut(conn_id) {
user.ponged_at = Utc::now()
}
}
async fn disconnect_inactive_users(&self) {
let now = Utc::now().timestamp();
let is_inactive =
|u: &SocketUserData| u.pinged_at > u.ponged_at && now - u.pinged_at.timestamp() >= 5;
awiteb marked this conversation as resolved
Review

not sure if this >= 5 is intentional, since if someone set the ping time to be very short it will always return false.

Good to be in sync with the code in websocket/mod.rs or just remove this check, since pinged_at > ponged_at should be enough

not sure if this `>= 5` is intentional, since if someone set the ping time to be very short it will always return `false`. Good to be in sync with the code in `websocket/mod.rs` or just remove this check, since `pinged_at > ponged_at` should be enough
Review

since pinged_at > ponged_at should be enough

Agree, because we already sleep in the ping loop

> since pinged_at > ponged_at should be enough Agree, because we already sleep in the ping loop
self.read()
.await
.iter()
.filter(|(_, u)| is_inactive(u))
.for_each(|(_, u)| {
log::info!("Disconnected from {}, inactive", u.public_key);
u.sender.close_channel()
});
self.write().await.retain(|_, u| !is_inactive(u));
awiteb marked this conversation as resolved
Review

both locks can be replaced by one

self.write().await.retain(|_, u| {
    if is_inactive(u) {
        u.sender.close_channel();
        false
    } else {
        true
    }
})
both locks can be replaced by one ```rust self.write().await.retain(|_, u| { if is_inactive(u) { u.sender.close_channel(); false } else { true } }) ```
}
}