refactor: Refactor in_chat_requests table
All checks were successful
Write changelog / write-changelog (push) Successful in 4s
Update Contributors / Update Contributors (push) Successful in 4s
Rust CI / Rust CI (push) Successful in 4m40s

- Rename `in_chat_requests` table to `incoming_chat`
- Send chat requests and responses when the user is online
- Remove `IncomingChatExt::remove_in_chat_request` trait function
- Rename `in_on` col of `incoming_chat` table to `received_timestamp`
- Add `accepted_response` col to `incoming_chat` table
- Rename `UserId` to `IdCol` and make it private

Reviewed-by: Amjad Alsharafi <me@amjad.alsharafi.dev>
Reviewed-on: #35
Signed-off-by: Awiteb <a@4rs.nl>
This commit is contained in:
Awiteb 2024-07-30 08:45:01 +03:00
parent 9617f69eda
commit fc0642fce6
Signed by: awiteb
GPG key ID: 3F6B55640AA6682F
13 changed files with 295 additions and 131 deletions

View file

@ -1,61 +0,0 @@
// OxideTalis Messaging Protocol homeserver implementation
// Copyright (C) 2024 Awiteb <a@4rs.nl>, OxideTalis Contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://gnu.org/licenses/agpl-3.0>.
//! Database extension for the `in_chat_requests` table.
use chrono::Utc;
use oxidetalis_core::types::PublicKey;
use oxidetalis_entities::prelude::*;
use sea_orm::{sea_query::OnConflict, DatabaseConnection};
use crate::errors::ServerResult;
/// Extension trait for the `in_chat_requests` table.
pub trait InChatRequestsExt {
/// Save the chat request in the recipient table
async fn save_in_chat_request(
&self,
requester: &PublicKey,
recipient: &UserModel,
) -> ServerResult<()>;
}
impl InChatRequestsExt for DatabaseConnection {
#[logcall::logcall]
async fn save_in_chat_request(
&self,
sender: &PublicKey,
recipient: &UserModel,
) -> ServerResult<()> {
InChatRequestsEntity::insert(InChatRequestsActiveModel {
recipient_id: Set(recipient.id),
sender: Set(*sender),
in_on: Set(Utc::now()),
..Default::default()
})
.on_conflict(
OnConflict::columns([
InChatRequestsColumn::RecipientId,
InChatRequestsColumn::Sender,
])
.do_nothing()
.to_owned(),
)
.exec(self)
.await?;
Ok(())
}
}

View file

@ -0,0 +1,142 @@
// OxideTalis Messaging Protocol homeserver implementation
// Copyright (C) 2024 Awiteb <a@4rs.nl>, OxideTalis Contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://gnu.org/licenses/agpl-3.0>.
//! Database extension for the `incoming_chat` table.
use chrono::Utc;
use oxidetalis_core::types::PublicKey;
use oxidetalis_entities::prelude::*;
use sea_orm::{sea_query::OnConflict, DatabaseConnection};
use crate::errors::ServerResult;
/// Extension trait for the `incoming_chat` table.
pub trait IncomingChatExt {
/// Save the incoming chat request
async fn save_in_chat_request(
&self,
chat_request_recipient: &UserModel,
chat_request_sender: &PublicKey,
) -> ServerResult<()>;
/// Returns all incoming chat requests for the given recipient
async fn get_all_chat_requests(
&self,
chat_request_recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>>;
/// Save the incoming chat response
async fn save_in_chat_response(
&self,
chat_response_recipient: &UserModel,
chat_response_sender: &PublicKey,
accepted_response: bool,
) -> ServerResult<()>;
/// Returns all incoming chat responses for the given recipient
async fn get_all_chat_responses(
&self,
chat_response_recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>>;
}
impl IncomingChatExt for DatabaseConnection {
#[logcall::logcall]
async fn save_in_chat_request(
&self,
chat_request_recipient: &UserModel,
chat_request_sender: &PublicKey,
) -> ServerResult<()> {
save(self, chat_request_recipient, chat_request_sender, None).await
}
async fn get_all_chat_requests(
&self,
chat_request_recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>> {
get_all::<true>(self, chat_request_recipient).await
}
#[logcall::logcall]
async fn save_in_chat_response(
&self,
chat_response_recipient: &UserModel,
chat_response_sender: &PublicKey,
accepted_response: bool,
) -> ServerResult<()> {
save(
self,
chat_response_recipient,
chat_response_sender,
Some(accepted_response),
)
.await
}
async fn get_all_chat_responses(
&self,
chat_response_recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>> {
get_all::<false>(self, chat_response_recipient).await
}
}
/// Utility function to save incoming chat request or response
async fn save(
db: &DatabaseConnection,
recipient: &UserModel,
sender: &PublicKey,
accepted_response: Option<bool>,
) -> ServerResult<()> {
IncomingChatEntity::insert(IncomingChatActiveModel {
recipient_id: Set(recipient.id),
sender: Set(*sender),
received_timestamp: Set(Utc::now()),
accepted_response: Set(accepted_response),
..Default::default()
})
.on_conflict(
OnConflict::columns([
IncomingChatColumn::RecipientId,
IncomingChatColumn::Sender,
IncomingChatColumn::AcceptedResponse,
])
.do_nothing()
.to_owned(),
)
.exec(db)
.await?;
Ok(())
}
/// Utility function to get all incoming chat requests or responses
async fn get_all<const IS_REQUEST: bool>(
db: &DatabaseConnection,
recipient: &UserModel,
) -> ServerResult<Vec<IncomingChatModel>> {
recipient
.find_related(IncomingChatEntity)
.filter(
if IS_REQUEST {
IncomingChatColumn::AcceptedResponse.is_null()
} else {
IncomingChatColumn::AcceptedResponse.is_not_null()
},
)
.all(db)
.await
.map_err(Into::into)
}

View file

@ -16,12 +16,12 @@
//! Database trait extensions.
mod in_chat_requests;
mod incoming_chat;
mod out_chat_requests;
mod user;
mod user_status;
pub use in_chat_requests::*;
pub use incoming_chat::*;
pub use out_chat_requests::*;
pub use user::*;
pub use user_status::*;

View file

@ -20,7 +20,7 @@ use oxidetalis_core::types::PublicKey;
use oxidetalis_entities::prelude::*;
use sea_orm::DatabaseConnection;
use crate::database::InChatRequestsExt;
use crate::database::IncomingChatExt;
use crate::errors::ServerError;
use crate::extensions::OnlineUsersExt;
use crate::{
@ -33,44 +33,58 @@ use crate::{
#[logcall::logcall]
pub async fn handle_chat_request(
db: &DatabaseConnection,
from: Option<&UserModel>,
to_public_key: &PublicKey,
chat_request_sender: Option<&UserModel>,
chat_request_recipient: &PublicKey,
) -> Option<ServerEvent<Unsigned>> {
let Some(from_user) = from else {
let Some(chat_request_sender) = chat_request_sender else {
return Some(WsError::RegistredUserEvent.into());
};
let Some(to_user) = try_ws!(Some db.get_user_by_pubk(to_public_key).await) else {
let Some(chat_request_recipient) =
try_ws!(Some db.get_user_by_pubk(chat_request_recipient).await)
else {
return Some(WsError::UserNotFound.into());
};
if from_user.id == to_user.id {
if chat_request_sender.id == chat_request_recipient.id {
return Some(WsError::CannotSendChatRequestToSelf.into());
}
if try_ws!(Some db.get_chat_request_to(from_user, to_public_key).await).is_some() {
if try_ws!(Some db.get_chat_request_to(chat_request_sender, &chat_request_recipient.public_key).await).is_some() {
return Some(WsError::AlreadySendChatRequest.into());
}
if try_ws!(Some db.is_blacklisted(&to_user, &from_user.public_key).await) {
if try_ws!(Some db.is_blacklisted(&chat_request_recipient, &chat_request_sender.public_key).await)
{
return Some(WsError::RecipientBlacklist.into());
}
// To ignore the error if the requester added the recipient to the whitelist
// table before send a request to them
if let Err(ServerError::Internal(_)) = db.add_to_whitelist(from_user, to_public_key).await {
if let Err(ServerError::Internal(_)) = db
.add_to_whitelist(chat_request_sender, &chat_request_recipient.public_key)
.await
{
return Some(WsError::InternalServerError.into());
}
if try_ws!(Some db.is_whitelisted(&to_user, &from_user.public_key).await) {
if try_ws!(Some db.is_whitelisted(&chat_request_recipient, &chat_request_sender.public_key).await)
{
return Some(WsError::AlreadyInRecipientWhitelist.into());
}
try_ws!(Some db.save_out_chat_request(from_user, to_public_key).await);
if let Some(conn_id) = ONLINE_USERS.is_online(to_public_key).await {
try_ws!(Some db.save_out_chat_request(chat_request_sender, &chat_request_recipient.public_key).await);
if let Some(conn_id) = ONLINE_USERS
.is_online(&chat_request_recipient.public_key)
.await
{
ONLINE_USERS
.send(&conn_id, ServerEvent::chat_request(&from_user.public_key))
.send(
&conn_id,
ServerEvent::chat_request(&chat_request_sender.public_key),
)
.await;
} else {
try_ws!(Some db.save_in_chat_request(&from_user.public_key, &to_user).await);
try_ws!(Some db.save_in_chat_request(&chat_request_recipient, &chat_request_sender.public_key).await);
}
None
}
@ -78,22 +92,25 @@ pub async fn handle_chat_request(
#[logcall::logcall]
pub async fn handle_chat_response(
db: &DatabaseConnection,
recipient: Option<&UserModel>,
sender_public_key: &PublicKey,
response_sender: Option<&UserModel>,
response_recipient: &PublicKey,
accepted: bool,
) -> Option<ServerEvent<Unsigned>> {
let Some(recipient_user) = recipient else {
let Some(response_sender) = response_sender else {
return Some(WsError::RegistredUserEvent.into());
};
let Some(sender_user) = try_ws!(Some db.get_user_by_pubk(sender_public_key).await) else {
let Some(response_recipient) = try_ws!(Some db.get_user_by_pubk(response_recipient).await)
else {
return Some(WsError::UserNotFound.into());
};
if recipient_user.id == sender_user.id {
if response_sender.id == response_recipient.id {
return Some(WsError::CannotRespondToOwnChatRequest.into());
}
if try_ws!(Some
db.get_chat_request_to(&sender_user, &recipient_user.public_key)
db.get_chat_request_to(&response_recipient, &response_sender.public_key)
.await
)
.is_none()
@ -104,26 +121,29 @@ pub async fn handle_chat_response(
// We don't need to handle the case where the sender is blacklisted or
// whitelisted already, just add it if it is not already there
let _ = if accepted {
db.add_to_whitelist(recipient_user, sender_public_key).await
db.add_to_whitelist(response_sender, &response_recipient.public_key)
.await
} else {
db.add_to_blacklist(recipient_user, sender_public_key).await
db.add_to_blacklist(response_sender, &response_recipient.public_key)
.await
};
try_ws!(Some
db.remove_out_chat_request(&sender_user, &recipient_user.public_key)
db.remove_out_chat_request(&response_recipient, &response_sender.public_key)
.await
);
if let Some(conn_id) = ONLINE_USERS.is_online(sender_public_key).await {
if let Some(conn_id) = ONLINE_USERS.is_online(&response_recipient.public_key).await {
ONLINE_USERS
.send(
&conn_id,
ServerEvent::chat_request_response(recipient_user.public_key, accepted),
ServerEvent::chat_request_response(response_sender.public_key, accepted),
)
.await;
} else {
// TODO: Create a table for chat request responses, and send them when
// the user logs in
try_ws!(Some
db.save_in_chat_response(&response_recipient, &response_sender.public_key, accepted).await
);
}
None

View file

@ -46,7 +46,7 @@ pub use events::*;
use uuid::Uuid;
use crate::{
database::UserTableExt,
database::{IncomingChatExt, UserTableExt},
extensions::{DepotExt, OnlineUsersExt},
middlewares,
nonce::NonceCache,
@ -144,8 +144,9 @@ async fn handle_socket(
.await;
log::info!("New user connected: ConnId(={conn_id}) PublicKey(={user_public_key})");
// TODO: Send the incoming chat request to the user, while they are offline.
// This after adding last_login col to the user table
if let Some(server_user) = &user {
send_chat_requests_and_responses(&db_conn, &user_shared_secret, &sender, server_user).await;
}
while let Some(Ok(msg)) = user_ws_receiver.next().await {
match handle_ws_msg(msg, &nonce_cache, &user_shared_secret).await {
@ -176,6 +177,56 @@ async fn handle_socket(
user_disconnected(&db_conn, &conn_id, &user_public_key, user).await;
}
/// Send the incoming chat requests and responses to the user while they were
/// offline
///
/// ### Note
/// The errors are ignored, if there is an issue with user connection, the user
/// will be disconnected after this function is called.
///
/// Also we make sure that the user receives the chat requests and responses
/// before we delete them from the database.
async fn send_chat_requests_and_responses(
db_conn: &DatabaseConnection,
user_shared_secret: &[u8; 32],
sender: &mpsc::UnboundedSender<Result<Message, salvo::Error>>,
server_user: &UserModel,
) {
let Ok(requests) = db_conn.get_all_chat_requests(server_user).await else {
return;
};
let Ok(responses) = db_conn.get_all_chat_responses(server_user).await else {
return;
};
for request in requests {
if sender
.unbounded_send(Ok(ServerEvent::chat_request(&request.sender)
.sign(user_shared_secret)
.as_ref()
.into()))
.is_ok()
{
let _ = request.delete(db_conn).await;
}
}
for response in responses {
if sender
.unbounded_send(Ok(ServerEvent::chat_request_response(
response.sender,
response.accepted_response.unwrap_or_default(),
)
.sign(user_shared_secret)
.as_ref()
.into()))
.is_ok()
{
let _ = response.delete(db_conn).await;
}
}
}
/// Handle websocket msg
async fn handle_ws_msg(
msg: Message,

View file

@ -19,7 +19,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//! Entity for `in_chat_requests` table
//! Entity for `incoming_chat` table
use chrono::Utc;
use oxidetalis_core::types::PublicKey;
@ -28,15 +28,18 @@ use sea_orm::entity::prelude::*;
use crate::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "in_chat_requests")]
#[sea_orm(table_name = "incoming_chat")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: UserId,
pub recipient_id: UserId,
pub id: IdCol,
pub recipient_id: IdCol,
/// Public key of the sender
pub sender: PublicKey,
/// Whether the chat response accepted or not.
/// This will be `None` if it is chat request, otherwise `bool`
pub accepted_response: Option<bool>,
/// The timestamp of the request, when it was received
pub in_on: chrono::DateTime<Utc>,
pub received_timestamp: chrono::DateTime<Utc>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View file

@ -21,7 +21,7 @@
#![doc = include_str!("../README.md")]
pub mod incoming_chat_requests;
pub mod incoming_chat;
pub mod outgoing_chat_requests;
pub mod prelude;
pub mod users;

View file

@ -31,8 +31,8 @@ use crate::prelude::*;
#[sea_orm(table_name = "out_chat_requests")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: UserId,
pub sender_id: UserId,
pub id: IdCol,
pub sender_id: IdCol,
/// Public key of the recipient
pub recipient: PublicKey,
/// The timestamp of the request, when it was sent

View file

@ -38,13 +38,13 @@ pub use sea_orm::{
};
/// User ID type
pub type UserId = i64;
pub(crate) type IdCol = i64;
pub use super::incoming_chat_requests::{
ActiveModel as InChatRequestsActiveModel,
Column as InChatRequestsColumn,
Entity as InChatRequestsEntity,
Model as InChatRequestsModel,
pub use super::incoming_chat::{
ActiveModel as IncomingChatActiveModel,
Column as IncomingChatColumn,
Entity as IncomingChatEntity,
Model as IncomingChatModel,
};
pub use super::outgoing_chat_requests::{
ActiveModel as OutChatRequestsActiveModel,

View file

@ -31,7 +31,7 @@ use crate::prelude::*;
#[sea_orm(table_name = "users")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: UserId,
pub id: IdCol,
pub public_key: PublicKey,
pub last_logout: chrono::DateTime<Utc>,
pub is_admin: bool,
@ -39,17 +39,17 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "InChatRequestsEntity")]
InChatRequests,
#[sea_orm(has_many = "IncomingChatEntity")]
IncomingChat,
#[sea_orm(has_many = "OutChatRequestsEntity")]
OutChatRequests,
#[sea_orm(has_many = "UsersStatusEntity")]
UsersStatus,
}
impl Related<InChatRequestsEntity> for Entity {
impl Related<IncomingChatEntity> for Entity {
fn to() -> RelationDef {
Relation::InChatRequests.def()
Relation::IncomingChat.def()
}
}

View file

@ -40,8 +40,8 @@ pub enum AccessStatus {
#[sea_orm(table_name = "users_status")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: UserId,
pub user_id: UserId,
pub id: IdCol,
pub user_id: IdCol,
pub target: PublicKey,
pub status: AccessStatus,
pub updated_at: chrono::DateTime<Utc>,

View file

@ -19,8 +19,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//! Migration to create the `in_chat_requests` table, a table for incoming chat
//! requests from other users
//! Migration to create the `incoming_chat` table, a table for incoming chat
//! requests and responses from other users
use sea_orm_migration::prelude::*;
@ -35,31 +35,37 @@ impl MigrationTrait for Migration {
manager
.create_table(
Table::create()
.table(InChatRequests::Table)
.table(IncomingChat::Table)
.if_not_exists()
.col(
ColumnDef::new(InChatRequests::Id)
ColumnDef::new(IncomingChat::Id)
.big_integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(InChatRequests::RecipientId)
ColumnDef::new(IncomingChat::RecipientId)
.big_integer()
.not_null(),
)
.foreign_key(
ForeignKey::create()
.name("fk-in_chat_requests-users")
.from(InChatRequests::Table, InChatRequests::RecipientId)
.name("fk-incoming_chat-users")
.from(IncomingChat::Table, IncomingChat::RecipientId)
.to(Users::Table, Users::Id)
.on_update(ForeignKeyAction::NoAction)
.on_delete(ForeignKeyAction::Cascade),
)
.col(ColumnDef::new(InChatRequests::Sender).binary().not_null())
.col(ColumnDef::new(IncomingChat::Sender).binary().not_null())
.col(
ColumnDef::new(InChatRequests::InOn)
ColumnDef::new(IncomingChat::AcceptedResponse)
.boolean()
.null()
.default(Option::<bool>::None),
)
.col(
ColumnDef::new(IncomingChat::ReceivedTimestamp)
.timestamp_with_time_zone()
.not_null(),
)
@ -71,9 +77,10 @@ impl MigrationTrait for Migration {
Index::create()
.if_not_exists()
.name("sep_request")
.table(InChatRequests::Table)
.col(InChatRequests::RecipientId)
.col(InChatRequests::Sender)
.table(IncomingChat::Table)
.col(IncomingChat::RecipientId)
.col(IncomingChat::Sender)
.col(IncomingChat::AcceptedResponse)
.unique()
.to_owned(),
)
@ -82,11 +89,13 @@ impl MigrationTrait for Migration {
}
#[derive(DeriveIden)]
enum InChatRequests {
enum IncomingChat {
Table,
Id,
RecipientId,
/// Public key of the sender
Sender,
InOn,
/// Whether the chat response accepted or not
AcceptedResponse,
ReceivedTimestamp,
}

View file

@ -24,7 +24,7 @@
use sea_orm_migration::prelude::*;
pub use sea_orm_migration::MigratorTrait;
mod create_incoming_chat_requests_table;
mod create_incoming_chat_table;
mod create_outgoing_chat_requests_table;
mod create_users_status;
mod create_users_table;
@ -36,7 +36,7 @@ impl MigratorTrait for Migrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(create_users_table::Migration),
Box::new(create_incoming_chat_requests_table::Migration),
Box::new(create_incoming_chat_table::Migration),
Box::new(create_outgoing_chat_requests_table::Migration),
Box::new(create_users_status::Migration),
]