From 9fc6964bc3e17b0b35e5a0bb97c83817815ee665 Mon Sep 17 00:00:00 2001 From: shiroyashik Date: Fri, 12 Sep 2025 01:37:59 +0300 Subject: [PATCH] feat: added state pings --- Cargo.lock | 21 ----------- Cargo.toml | 2 +- src/api/figura/websocket/handler.rs | 44 +++++++++++++++++++---- src/api/figura/websocket/types/session.rs | 2 ++ src/main.rs | 22 ++++++------ src/state/state.rs | 2 ++ 6 files changed, 54 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f49d29..c72fc56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1306,30 +1306,9 @@ dependencies = [ "memchr", "parking_lot", "procfs", - "protobuf", "thiserror 2.0.12", ] -[[package]] -name = "protobuf" -version = "3.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" -dependencies = [ - "once_cell", - "protobuf-support", - "thiserror 1.0.69", -] - -[[package]] -name = "protobuf-support" -version = "3.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" -dependencies = [ - "thiserror 1.0.69", -] - [[package]] name = "quinn" version = "0.11.8" diff --git a/Cargo.toml b/Cargo.toml index 73eb071..b2b3719 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,4 +41,4 @@ rand = "0.9" axum = { version = "0.8", features = ["ws", "macros", "http2"] } tower-http = { version = "0.6", features = ["trace"] } tokio = { version = "1.41", features = ["full"] } -prometheus = { version = "0.14", features = ["process"] } \ No newline at end of file +prometheus = { version = "0.14", default-features = false, features = ["process"] } \ No newline at end of file diff --git a/src/api/figura/websocket/handler.rs b/src/api/figura/websocket/handler.rs index 80424aa..2d5d7fe 100644 --- a/src/api/figura/websocket/handler.rs +++ b/src/api/figura/websocket/handler.rs @@ -66,6 +66,9 @@ async fn handle_socket(mut ws: WebSocket, state: AppState) { #[instrument(skip_all, fields(nickname = %session.user.nickname))] async fn main_worker(session: &mut WSSession, ws: &mut WebSocket, state: &AppState) -> anyhow::Result<()> { + state.state_pings.insert(session.user.uuid, vec![]); + let mut next_state_ping = false; + tracing::debug!("WebSocket control for {} is transferred to the main worker", session.user.nickname); loop { tokio::select! { @@ -83,18 +86,35 @@ async fn main_worker(session: &mut WSSession, ws: &mut WebSocket, state: &AppSta }, }; + tracing::trace!(ping = ?external_msg); // Processing message match external_msg { C2SMessage::Token(_) => bail!("authentication passed, but the client sent the Token again"), C2SMessage::Ping(func_id, echo, data) => { - let s2c_ping: Vec = S2CMessage::Ping(session.user.uuid, func_id, echo, data).into(); - - // Echo check - if echo { - ws.send(Message::Binary(s2c_ping.clone().into())).await? + if !(func_id == 252645133) { + // Normal procedure + let s2c_ping: Vec = S2CMessage::Ping(session.user.uuid, func_id, echo, data).into(); + + // State ping storing + if next_state_ping { + let mut vec = state.state_pings.get_mut(&session.user.uuid).unwrap(); + vec.push(s2c_ping.clone().into()); + next_state_ping = false; + } + // Echo check + if echo { + ws.send(Message::Binary(s2c_ping.clone().into())).await? + } + // Sending to others + let _ = session.subs_tx.send(s2c_ping); + } else { + // State ping procedure + match data[1] { + 0 => { state.state_pings.insert(session.user.uuid, vec![]); }, + 1 => { next_state_ping = !next_state_ping; }, + _ => {} + } } - // Sending to others - let _ = session.subs_tx.send(s2c_ping); }, C2SMessage::Sub(uuid) => { tracing::debug!("[WebSocket] {} subscribes to {}", session.user.nickname, uuid); @@ -112,6 +132,16 @@ async fn main_worker(session: &mut WSSession, ws: &mut WebSocket, state: &AppSta }; let handle = tokio::spawn(sub_worker(session.own_tx.clone(), rx)).abort_handle(); session.sub_workers_aborthandles.insert(uuid, handle); + + // Apply state pings / bmpdpvw / 252645133 + if let Some(vec) = state.state_pings.get(&uuid) { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + for ping in &*vec { + let msg = Message::Binary(ping.clone().into()); + tracing::trace!(msg = ?msg); + ws.send(msg).await? + } + }; } }, C2SMessage::Unsub(uuid) => { diff --git a/src/api/figura/websocket/types/session.rs b/src/api/figura/websocket/types/session.rs index c8e1e99..663b6f2 100644 --- a/src/api/figura/websocket/types/session.rs +++ b/src/api/figura/websocket/types/session.rs @@ -1,5 +1,6 @@ use dashmap::DashMap; use tokio::{sync::{broadcast, mpsc}, task::AbortHandle}; +// use uuid::Uuid; pub struct WSSession { pub user: crate::auth::Userinfo, @@ -11,5 +12,6 @@ pub struct WSSession { pub enum SessionMessage { Ping(Vec), + // Sub(Uuid), Banned, } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index ab37851..bbc470d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -158,6 +158,7 @@ async fn app() -> Result { uptime: Instant::now(), user_manager: Arc::new(UManager::new()), session: Arc::new(DashMap::new()), + state_pings: Arc::new(DashMap::new()), subscribes: Arc::new(DashMap::new()), figura_versions: Arc::new(RwLock::new(None)), config: Arc::new(RwLock::new(config.clone())), @@ -200,16 +201,17 @@ async fn app() -> Result { .route("/ws", get(ws)) .merge(metrics::metrics_router(config.metrics_enabled)) .with_state(state) - .layer(TraceLayer::new_for_http() - // .on_request(|request: &axum::http::Request<_>, _span: &tracing::Span| { - // // only for developing purposes - // tracing::trace!(headers = ?request.headers(), "started processing request"); - // }) - .on_response(|response: &axum::http::Response<_>, latency: std::time::Duration, _span: &tracing::Span| { - tracing::trace!(latency = ?latency, status = ?response.status(), "finished processing request"); - }) - .on_request(()) - ) + // .layer(TraceLayer::new_for_http() + // // .on_request(|request: &axum::http::Request<_>, _span: &tracing::Span| { + // // // only for developing purposes + // // tracing::trace!(headers = ?request.headers(), "started processing request"); + // // }) + // .on_response(|response: &axum::http::Response<_>, latency: std::time::Duration, _span: &tracing::Span| { + // tracing::trace!(latency = ?latency, status = ?response.status(), "finished processing request"); + // }) + // .on_request(()) + // ) + .layer(TraceLayer::new_for_http().on_request(())) .layer(axum::middleware::from_fn(track_metrics)) .route("/health", get(|| async { "ok" })); diff --git a/src/state/state.rs b/src/state/state.rs index 70601e6..560b138 100644 --- a/src/state/state.rs +++ b/src/state/state.rs @@ -14,6 +14,8 @@ pub struct AppState { pub user_manager: Arc, /// Send into WebSocket pub session: Arc>>, + + pub state_pings: Arc>>>, /// Send messages for subscribers pub subscribes: Arc>>>, /// Current configuration