From d3b4ebfd535ef5f5cb9394eb70ba89c1f4617071 Mon Sep 17 00:00:00 2001 From: Awiteb Date: Mon, 3 Jun 2024 13:16:02 +0300 Subject: [PATCH] chore: Pass the pinged bots to the threads as mutex Signed-off-by: Awiteb --- src/api.rs | 16 +++++++++++----- src/main.rs | 21 +++++++++++---------- src/superbot.rs | 22 ++++++++++++++++------ 3 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/api.rs b/src/api.rs index a70f373..04c046a 100644 --- a/src/api.rs +++ b/src/api.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use salvo::{catcher::Catcher, http::HeaderValue, hyper::header, logging::Logger, prelude::*}; @@ -91,13 +91,18 @@ async fn ping(req: &Request, res: &mut Response, depot: &mut Depot) { let app_state = depot .obtain::>() .expect("The app state is injected"); + let bots = depot + .obtain::>>>() + .expect("The bots is injected"); + log::debug!("Bots: {bots:?}"); let msg = if !app_state.bots.contains(&bot_username) { MessageSchema::new("Is not authorized to check the status of this bot") .code(StatusCode::BAD_REQUEST) - } else if let Ok(telegram_id) = superbot::send_start(&app_state.tg_client, &bot_username).await + } else if let Ok(telegram_id) = + superbot::send_start(&app_state.tg_client, &bot_username, bots).await { - if crate::PINGED_BOTS.check(telegram_id) { + if bots.check(telegram_id) { MessageSchema::new("Alive") } else { MessageSchema::new("No response from the bot").code(StatusCode::NOT_FOUND) @@ -177,10 +182,11 @@ async fn add_server_headers(res: &mut Response) { headers.insert("X-Powered-By", HeaderValue::from_static("Rust/Salvo")); } -pub(crate) fn service(app_state: AppState) -> Service { +pub(crate) fn service(app_state: AppState, bots: Arc>>) -> Service { + log::debug!("Bots: {bots:?}"); let router = Router::new() .hoop(Logger::new()) - .hoop(affix::inject(Arc::new(app_state))) + .hoop(affix::inject(Arc::new(app_state)).inject(Arc::clone(&bots))) .hoop(add_server_headers) .hoop(auth) .push(Router::with_path("ping/@").get(ping)); diff --git a/src/main.rs b/src/main.rs index 63d2b5e..f092458 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,9 +16,11 @@ #![doc = include_str!("../README.md")] -use std::{process::ExitCode, sync::Mutex}; +use std::{ + process::ExitCode, + sync::{Arc, Mutex}, +}; -use lazy_static::lazy_static; use salvo::{conn::TcpListener, Listener}; mod api; @@ -32,7 +34,7 @@ pub(crate) use errors::{Error as ServerError, Result as ServerResult}; use tokio::signal; pub(crate) use traits::PingList; -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] pub(crate) struct PingedBot { telegram_id: u64, ping_in: i64, @@ -55,13 +57,8 @@ impl PingedBot { } } -lazy_static! { - static ref PINGED_BOTS: Mutex> = Mutex::new(Vec::new()); -} - async fn try_main() -> ServerResult<()> { pretty_env_logger::init(); - dotenv::dotenv().ok(); log::info!("Starting the API"); let cli_args = cli_parser::Args::parse()?; @@ -70,6 +67,9 @@ async fn try_main() -> ServerResult<()> { } let config = config::Config::from_toml_file(&cli_args.config_file)?; + let pinged_bots = Arc::new(Mutex::new(Vec::::new())); + let client_bots = Arc::clone(&pinged_bots); + let server_bots = Arc::clone(&pinged_bots); let (client, sign_out) = superbot::login(config.client.api_hash, config.client.api_id).await?; let app_state = api::AppState::new(config.bots, config.tokens, client.clone()); @@ -78,10 +78,11 @@ async fn try_main() -> ServerResult<()> { let acceptor = TcpListener::new(format!("{}:{}", config.api.host, config.api.port)) .bind() .await; - let client_handler = tokio::spawn(async move { superbot::handler(handler_client).await }); + let client_handler = + tokio::spawn(async move { superbot::handler(handler_client, client_bots).await }); let server_handler = tokio::spawn(async move { salvo::Server::new(acceptor) - .serve(api::service(app_state)) + .serve(api::service(app_state, server_bots)) .await }); log::info!("Bind the API to {}:{}", config.api.host, config.api.port); diff --git a/src/superbot.rs b/src/superbot.rs index 7cf10ad..e1a371d 100644 --- a/src/superbot.rs +++ b/src/superbot.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::sync::{Arc, Mutex}; + use grammers_client::{Client, Config, InitParams, SignInError, Update}; use grammers_session::Session; use tokio::{signal, time}; @@ -72,16 +74,19 @@ pub(crate) async fn login(api_hash: String, api_id: i32) -> ServerResult<(Client Ok((client, sign_out)) } -fn update_handler(upd: Update) { +fn update_handler(upd: Update, bots: Arc>>) { + log::debug!("Bots: {bots:?}"); if let Update::NewMessage(msg) = upd { if let Some(sender) = msg.sender() { - crate::PINGED_BOTS.new_res(sender.id() as u64) + bots.new_res(sender.id() as u64) } } } -pub(crate) async fn handler(client: Client) { +pub(crate) async fn handler(client: Client, bots: Arc>>) { + log::debug!("Bots: {bots:?}"); loop { + let cbots = Arc::clone(&bots); tokio::select! { _ = signal::ctrl_c() => { break; @@ -89,17 +94,22 @@ pub(crate) async fn handler(client: Client) { Ok(Some(update)) = client.next_update() => { log::debug!("New update: {update:?}"); tokio::spawn(async move { - update_handler(update) + update_handler(update, cbots) }); } } } } -pub(crate) async fn send_start(client: &Client, bot_username: &str) -> ServerResult { +pub(crate) async fn send_start( + client: &Client, + bot_username: &str, + bots: &Arc>>, +) -> ServerResult { if let Some(chat) = client.resolve_username(bot_username).await? { + log::debug!("Bots: {bots:?}"); let telegram_id = chat.id() as u64; - crate::PINGED_BOTS.add_new(telegram_id); + bots.add_new(telegram_id); client.send_message(chat, "/start").await?; // Sleep, wating the response time::sleep(time::Duration::from_secs(2)).await;