feat: Initialize server websocket #8

Merged
awiteb merged 4 commits from awiteb/init-websocket into master 2024-07-06 13:35:10 +02:00 AGit
8 changed files with 491 additions and 12 deletions
Showing only changes of commit c0d5efe0c3 - Show all commits

View file

@ -142,17 +142,14 @@ impl OnlineUsersExt for OnlineUsers {
}
async fn disconnect_inactive_users(&self) {
let now = Utc::now().timestamp();
let is_inactive =
|u: &SocketUserData| u.pinged_at > u.ponged_at && now - u.pinged_at.timestamp() >= 5;
self.read()
.await
.iter()
.filter(|(_, u)| is_inactive(u))
.for_each(|(_, u)| {
self.write().await.retain(|_, u| {
// if we send ping and the client doesn't send pong
if u.pinged_at > u.ponged_at {
awiteb marked this conversation as resolved
Review

not sure if this >= 5 is intentional, since if someone set the ping time to be very short it will always return false.

Good to be in sync with the code in websocket/mod.rs or just remove this check, since pinged_at > ponged_at should be enough

not sure if this `>= 5` is intentional, since if someone set the ping time to be very short it will always return `false`. Good to be in sync with the code in `websocket/mod.rs` or just remove this check, since `pinged_at > ponged_at` should be enough
Review

since pinged_at > ponged_at should be enough

Agree, because we already sleep in the ping loop

> since pinged_at > ponged_at should be enough Agree, because we already sleep in the ping loop
log::info!("Disconnected from {}, inactive", u.public_key);
u.sender.close_channel()
});
self.write().await.retain(|_, u| !is_inactive(u));
u.sender.close_channel();
return false;
}
true
});
}
}

View file

@ -30,6 +30,7 @@ mod middlewares;
mod routes;
mod schemas;
mod utils;
mod websocket;
/// Nonce cache type, used to store nonces for a certain amount of time
pub type NonceCache = Mutex<HashMap<[u8; 16], i64>>;

View file

@ -25,7 +25,7 @@ use salvo::rate_limiter::{BasicQuota, FixedGuard, MokaStore, RateLimiter, Remote
use salvo::{catcher::Catcher, logging::Logger, prelude::*};
use crate::schemas::MessageSchema;
use crate::{middlewares, NonceCache};
use crate::{middlewares, websocket, NonceCache};
mod user;
@ -141,6 +141,7 @@ pub fn service(conn: sea_orm::DatabaseConnection, config: &Config) -> Service {
let router = Router::new()
.push(Router::with_path("user").push(user::route()))
.push(Router::with_path("ws").push(websocket::route()))
.hoop(middlewares::add_server_headers)
.hoop(Logger::new())
.hoop(

View file

@ -0,0 +1,57 @@
// OxideTalis Messaging Protocol homeserver implementation
// Copyright (C) 2024 OxideTalis Developers <otmp@4rs.nl>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://gnu.org/licenses/agpl-3.0>.
//! Websocket errors
/// Result type of websocket
pub type WsResult<T> = Result<T, WsError>;
/// Websocket errors, returned in the websocket communication
#[derive(Debug)]
pub enum WsError {
/// The signature is invalid
InvalidSignature,
/// Message type must be text
NotTextMessage,
/// Invalid json data
InvalidJsonData,
/// Unknown client event
UnknownClientEvent,
}
impl WsError {
/// Returns error name
pub const fn name(&self) -> &'static str {
match self {
WsError::InvalidSignature => "InvalidSignature",
WsError::NotTextMessage => "NotTextMessage",
WsError::InvalidJsonData => "InvalidJsonData",
WsError::UnknownClientEvent => "UnknownClientEvent",
}
}
/// Returns the error reason
pub const fn reason(&self) -> &'static str {
match self {
WsError::InvalidSignature => "Invalid event signature",
WsError::NotTextMessage => "The websocket message must be text message",
WsError::InvalidJsonData => "Received invalid json data, the text must be valid json",
WsError::UnknownClientEvent => {
"Unknown client event, the event is not recognized by the server"
}
}
}
}

View file

@ -0,0 +1,61 @@
// OxideTalis Messaging Protocol homeserver implementation
// Copyright (C) 2024 OxideTalis Developers <otmp@4rs.nl>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://gnu.org/licenses/agpl-3.0>.
//! Events that the client send it
use oxidetalis_core::types::Signature;
use serde::{Deserialize, Serialize};
use crate::{utils, NonceCache};
/// Client websocket event
#[derive(Deserialize, Clone, Debug)]
pub struct ClientEvent {
awiteb marked this conversation as resolved
Review

The separation between the type and event data and then checking manually with is_of_type looks wrong.

Why not do it in a single single enum to host all events and data

#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "event", content = "data")]
pub enum ClientEventType {
    Ping { timestamp: u64 },
    Pong { timestamp: u64 },
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ClientEvent {
    #[serde(flatten)]
    pub event_type: ClientEventType,
    pub signature: String,
}

There is duplicate now in ping/pong timestamp, but it would be better in the long run with more events and data.

The separation between the type and event data and then checking manually with `is_of_type` looks wrong. Why not do it in a single single enum to host all events and data ```rust #[derive(Serialize, Deserialize, Debug)] #[serde(tag = "event", content = "data")] pub enum ClientEventType { Ping { timestamp: u64 }, Pong { timestamp: u64 }, } #[derive(Serialize, Deserialize, Debug)] pub struct ClientEvent { #[serde(flatten)] pub event_type: ClientEventType, pub signature: String, } ``` There is duplicate now in `ping/pong` `timestamp`, but it would be better in the long run with more events and data.
Review

Bro!!! I didn't know about tag-content, is amazing

Bro!!! I didn't know about [tag-content](https://serde.rs/container-attrs.html#tag--content), is amazing
Review

Is good, but we have a problem. We need the event data for the signature, but we can't extract it directly in this enum.

I written a function that extract the data by serializing the enum variant to serde_json::Value then extract the data from it, what do you think?

impl ClientEventType {
    /// Returns event data as json bytes
    pub fn data(&self) -> Vec<u8> {
        serde_json::to_vec(&serde_json::to_value(self).expect("can't fail")["data"])
            .expect("can't fail")
    }
}

This will output {"timestamp":4398} as bytes, we will make the signature from it

Is good, but we have a problem. We need the event data for the signature, but we can't extract it directly in this enum. I written a function that extract the data by serializing the enum variant to `serde_json::Value` then extract the `data` from it, what do you think? ```rust impl ClientEventType { /// Returns event data as json bytes pub fn data(&self) -> Vec<u8> { serde_json::to_vec(&serde_json::to_value(self).expect("can't fail")["data"]) .expect("can't fail") } } ``` This will output `{"timestamp":4398}` as bytes, we will make the signature from it
Review

I would use this

serde_json::to_value(&self.event_type).unwrap()["data"].to_string().into_bytes())

first, we don't need to serialize the whole thing as it would include the signature and then we will discard everything except for data, so better to serialize only the smallest needed parts.
second, we can directly use to_string from Value, and take the bytes out of it, I think its a bit better since to_vec would call the serializer for Value

I would use this ```rust serde_json::to_value(&self.event_type).unwrap()["data"].to_string().into_bytes()) ``` first, we don't need to serialize the whole thing as it would include the `signature` and then we will discard everything except for `data`, so better to serialize only the smallest needed parts. second, we can directly use `to_string` from `Value`, and take the bytes out of it, I think its a bit better since `to_vec` would call the serializer for `Value`
Review

small thing, maybe better to rename event_type to event? shorter and its not just the type, as it contain the data as well

small thing, maybe better to rename `event_type` to `event`? shorter and its not just the type, as it contain the data as well
Review

first, we don't need to serialize the whole thing as it would include the signature and then we will discard everything except for data

Right, the function above is for ClientEventType not Event.

I'll force push here today.

> first, we don't need to serialize the whole thing as it would include the signature and then we will discard everything except for data Right, the function above is for `ClientEventType` not `Event`. I'll force push here today.
Review

Ah yes, didn't notice

Ah yes, didn't notice
pub event: ClientEventType,
signature: Signature,
}
/// Client websocket event type
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
#[serde(rename_all = "PascalCase", tag = "event", content = "data")]
pub enum ClientEventType {
/// Ping event
Ping { timestamp: u64 },
/// Pong event
Pong { timestamp: u64 },
}
impl ClientEventType {
/// Returns event data as json bytes
pub fn data(&self) -> Vec<u8> {
serde_json::to_value(self).expect("can't fail")["data"]
.to_string()
.into_bytes()
awiteb marked this conversation as resolved
Review

this is different from ServerEventType, there, its using .to_string().into_bytes()

this is different from `ServerEventType`, there, its using `.to_string().into_bytes()`
Review

Forget it 😬

Forget it 😬
}
}
impl ClientEvent {
/// Verify the signature of the event
pub fn verify_signature(
&self,
shared_secret: &[u8; 32],
nonce_cache: &NonceCache,
nonce_limit: &usize,
) -> bool {
utils::is_valid_nonce(&self.signature, nonce_cache, nonce_limit)
&& self.signature.verify(&self.event.data(), shared_secret)
}
}

View file

@ -0,0 +1,23 @@
// OxideTalis Messaging Protocol homeserver implementation
// Copyright (C) 2024 OxideTalis Developers <otmp@4rs.nl>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://gnu.org/licenses/agpl-3.0>.
//! Server and client websocket events
mod client;
mod server;
pub use client::*;
pub use server::*;

View file

@ -0,0 +1,117 @@
// OxideTalis Messaging Protocol homeserver implementation
// Copyright (C) 2024 OxideTalis Developers <otmp@4rs.nl>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://gnu.org/licenses/agpl-3.0>.
//! Events that the server send it
use std::marker::PhantomData;
use chrono::Utc;
use oxidetalis_core::{cipher::K256Secret, types::Signature};
use salvo::websocket::Message;
use serde::Serialize;
use crate::websocket::errors::WsError;
/// Signed marker, used to indicate that the event is signed
pub struct Signed;
/// Unsigned marker, used to indicate that the event is unsigned
pub struct Unsigned;
/// Server websocket event
#[derive(Serialize, Clone, Debug)]
pub struct ServerEvent<T> {
#[serde(flatten)]
event: ServerEventType,
signature: Signature,
#[serde(skip)]
phantom: PhantomData<T>,
}
/// server websocket event type
#[derive(Serialize, Clone, Eq, PartialEq, Debug)]
#[serde(rename_all = "PascalCase")]
pub enum ServerEventType {
/// Ping event
Ping { timestamp: u64 },
/// Pong event
Pong { timestamp: u64 },
/// Error event
Error {
name: &'static str,
reason: &'static str,
},
}
impl ServerEventType {
/// Returns event data as json bytes
pub fn data(&self) -> Vec<u8> {
serde_json::to_value(self).expect("can't fail")["data"]
.to_string()
.into_bytes()
}
}
impl ServerEvent<Unsigned> {
/// Creates new [`ServerEvent`]
pub fn new(event: ServerEventType) -> Self {
Self {
event,
signature: Signature::from([0u8; 56]),
phantom: PhantomData,
}
}
/// Creates ping event
pub fn ping() -> Self {
Self::new(ServerEventType::Ping {
timestamp: Utc::now().timestamp() as u64,
})
}
/// Creates pong event
pub fn pong() -> Self {
Self::new(ServerEventType::Pong {
timestamp: Utc::now().timestamp() as u64,
})
}
/// Sign the event
pub fn sign(self, shared_secret: &[u8; 32]) -> ServerEvent<Signed> {
ServerEvent::<Signed> {
signature: K256Secret::sign_with_shared_secret(
&serde_json::to_vec(&self.event.data()).expect("Can't fail"),
shared_secret,
),
event: self.event,
phantom: PhantomData,
}
}
}
impl From<&ServerEvent<Signed>> for Message {
fn from(value: &ServerEvent<Signed>) -> Self {
Message::text(serde_json::to_string(value).expect("This can't fail"))
}
}
impl From<WsError> for ServerEvent<Unsigned> {
fn from(err: WsError) -> Self {
ServerEvent::new(ServerEventType::Error {
name: err.name(),
reason: err.reason(),
})
}
}

View file

@ -0,0 +1,222 @@
// OxideTalis Messaging Protocol homeserver implementation
// Copyright (C) 2024 OxideTalis Developers <otmp@4rs.nl>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://gnu.org/licenses/agpl-3.0>.
use std::{collections::HashMap, sync::Arc, time::Duration};
use chrono::Utc;
use errors::{WsError, WsResult};
use futures::{channel::mpsc, FutureExt, StreamExt, TryStreamExt};
use once_cell::sync::Lazy;
use oxidetalis_core::{cipher::K256Secret, types::PublicKey};
use salvo::{
handler,
http::StatusError,
websocket::{Message, WebSocket, WebSocketUpgrade},
Depot,
Request,
Response,
Router,
};
use tokio::{sync::RwLock, task::spawn as tokio_spawn, time::sleep as tokio_sleep};
mod errors;
mod events;
pub use events::*;
use uuid::Uuid;
use crate::{
extensions::{DepotExt, OnlineUsersExt},
middlewares,
utils,
NonceCache,
};
/// Online users type
pub type OnlineUsers = RwLock<HashMap<Uuid, SocketUserData>>;
/// List of online users, users that are connected to the server
// FIXME: Use `std::sync::LazyLock` after it becomes stable in `1.80.0`
static ONLINE_USERS: Lazy<OnlineUsers> = Lazy::new(OnlineUsers::default);
/// A user connected to the server
pub struct SocketUserData {
/// Sender to send messages to the user
pub sender: mpsc::UnboundedSender<salvo::Result<Message>>,
/// User public key
pub public_key: PublicKey,
/// Time that the user pinged at
pub pinged_at: chrono::DateTime<Utc>,
/// Time that the user ponged at
pub ponged_at: chrono::DateTime<Utc>,
/// User shared secret
pub shared_secret: [u8; 32],
}
impl SocketUserData {
/// Creates new [`SocketUserData`]
pub fn new(
public_key: PublicKey,
shared_secret: [u8; 32],
sender: mpsc::UnboundedSender<salvo::Result<Message>>,
) -> Self {
let now = Utc::now();
Self {
sender,
public_key,
shared_secret,
pinged_at: now,
ponged_at: now,
}
}
}
/// WebSocket handler, that handles the user connection
#[handler]
pub async fn user_connected(
req: &mut Request,
res: &mut Response,
depot: &Depot,
) -> Result<(), StatusError> {
let nonce_cache = depot.nonce_cache();
let nonce_limit = *depot.nonce_cache_size();
awiteb marked this conversation as resolved
Review

something I noticed, this nonce_limit is just passed everywhere without a need.

in NonceCacheExt::add_nonce, it takes limit, even though both of these items comes from the depot and just passed around in every function, handle_socket -> handle_ws_msg -> verify_signature -> ...... -> add_nonce

A better approach is to make NonceCache a specific struct (not type) that holds both the hashmap and the limit

something I noticed, this `nonce_limit` is just passed everywhere without a need. in `NonceCacheExt::add_nonce`, it takes `limit`, even though both of these items comes from the depot and just passed around in every function, `handle_socket -> handle_ws_msg -> verify_signature -> ...... -> add_nonce` A better approach is to make `NonceCache` a specific struct (not type) that holds both the hashmap and the limit
let public_key =
utils::extract_public_key(req).expect("The public key was checked in the middleware");
// FIXME: The config should hold `K256Secret` not `PrivateKey`
let shared_secret =
K256Secret::from_privkey(&depot.config().server.private_key).shared_secret(&public_key);
WebSocketUpgrade::new()
.upgrade(req, res, move |ws| {
handle_socket(ws, nonce_cache, nonce_limit, public_key, shared_secret)
})
.await
}
/// Handle the websocket connection
async fn handle_socket(
ws: WebSocket,
nonce_cache: Arc<NonceCache>,
nonce_limit: usize,
user_public_key: PublicKey,
user_shared_secret: [u8; 32],
) {
let (user_ws_sender, mut user_ws_receiver) = ws.split();
let (sender, receiver) = mpsc::unbounded();
let receiver = receiver.into_stream();
awiteb marked this conversation as resolved
Review

sender is already Clone and is implemented with Arc

sender is already `Clone` and is implemented with `Arc`
let fut = receiver.forward(user_ws_sender).map(|result| {
if let Err(err) = result {
log::error!("websocket send error: {err}");
}
});
tokio_spawn(fut);
let conn_id = Uuid::new_v4();
let user = SocketUserData::new(user_public_key, user_shared_secret, sender.clone());
ONLINE_USERS.add_user(&conn_id, user).await;
log::info!("New user connected: ConnId(={conn_id}) PublicKey(={user_public_key})");
let fut = async move {
while let Some(Ok(msg)) = user_ws_receiver.next().await {
match handle_ws_msg(msg, &nonce_cache, &nonce_limit, &user_shared_secret) {
Ok(event) => {
if let Some(server_event) = handle_events(event, &conn_id).await {
if let Err(err) = sender.unbounded_send(Ok(Message::from(
&server_event.sign(&user_shared_secret),
))) {
log::error!("Websocket Error: {err}");
break;
}
};
}
Err(err) => {
if let Err(err) = sender.unbounded_send(Ok(Message::from(
&ServerEvent::from(err).sign(&user_shared_secret),
))) {
log::error!("Websocket Error: {err}");
break;
};
}
};
}
user_disconnected(&conn_id, &user_public_key).await;
};
tokio_spawn(fut);
}
/// Handle websocket msg
fn handle_ws_msg(
msg: Message,
nonce_cache: &NonceCache,
nonce_limit: &usize,
shared_secret: &[u8; 32],
) -> WsResult<ClientEvent> {
let Ok(text) = msg.to_str() else {
return Err(WsError::NotTextMessage);
};
let event = serde_json::from_str::<ClientEvent>(text).map_err(|err| {
if err.is_data() {
WsError::UnknownClientEvent
} else {
WsError::InvalidJsonData
}
})?;
if !event.verify_signature(shared_secret, nonce_cache, nonce_limit) {
return Err(WsError::InvalidSignature);
}
Ok(event)
}
/// Handle user events, and return the server event if needed
async fn handle_events(event: ClientEvent, conn_id: &Uuid) -> Option<ServerEvent<Unsigned>> {
match &event.event {
ClientEventType::Ping { .. } => Some(ServerEvent::pong()),
ClientEventType::Pong { .. } => {
ONLINE_USERS.update_pong(conn_id).await;
None
}
}
}
/// Handle user disconnected
async fn user_disconnected(conn_id: &Uuid, public_key: &PublicKey) {
ONLINE_USERS.remove_user(conn_id).await;
log::debug!("User disconnect: ConnId(={conn_id}) PublicKey(={public_key})");
}
pub fn route() -> Router {
let users_pinger = async {
/// Seconds to wait for pongs, before disconnecting the user
const WAIT_FOR_PONGS_SECS: u32 = 10;
/// Seconds to sleep between pings (10 minutes)
const SLEEP_SECS: u32 = 60 * 10;
loop {
log::debug!("Start pinging online users");
ONLINE_USERS.ping_all().await;
tokio_sleep(Duration::from_secs(u64::from(WAIT_FOR_PONGS_SECS))).await;
awiteb marked this conversation as resolved
Review

I think both of these is better to be config as well, more configuration options for admin, 10 mins is good enough of course and probably won't change by most ppl

I think both of these is better to be config as well, more configuration options for admin, 10 mins is good enough of course and probably won't change by most ppl
Review

more configuration options for admin

I wanted to implement it today, but I felt that this is not something that should be determined by the server administrator? I've never seen a server provide it.

I will not implement it. I don't think that the server administrator must specify the period during which the server sends pings.

> more configuration options for admin I wanted to implement it today, but I felt that this is not something that should be determined by the server administrator? I've never seen a server provide it. I will not implement it. I don't think that the server administrator must specify the period during which the server sends pings.
ONLINE_USERS.disconnect_inactive_users().await;
log::debug!("Done pinging online users and disconnected inactive ones");
tokio_sleep(Duration::from_secs(u64::from(SLEEP_SECS))).await;
}
};
tokio_spawn(users_pinger);
Router::new()
.push(Router::with_path("chat").get(user_connected))
.hoop(middlewares::signature_check)
.hoop(middlewares::public_key_check)
}