diff --git a/Config.example.toml b/Config.example.toml index a039d2b..9fe93b3 100644 --- a/Config.example.toml +++ b/Config.example.toml @@ -1,7 +1,11 @@ -listen = "0.0.0.0:6665" # Don't touch this if you running under Docker container +## Don't touch this if you running under Docker container +listen = "0.0.0.0:6665" -# Message of The Day -# It will be displayed to every player in the Figura menu who is connected to your server +## Don't touch if you don't know what you're doing +# token = "" + +## Message of The Day +## It will be displayed to every player in the Figura menu who is connected to your server motd = """ [ { @@ -41,7 +45,7 @@ motd = """ """ [limitations] -maxAvatarSize = 100000 +maxAvatarSize = 100000 # 100 KB maxAvatars = 10 [advancedUsers.66004548-4de5-49de-bade-9c3933d8eb97] @@ -50,11 +54,11 @@ authSystem = "elyby" special = [0,0,0,1,0,0] # 6 pride = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] # 25 -# With advancedUsers you can set additional parameters +## With advancedUsers you can set additional parameters # [advancedUsers.your uuid here] # username = "Your_username_here" # authSystem = "mojang" # can be: mojang, elyby, internal (cant be authenticated) # special = [0,1,0,0,0,0] # and set badges what you want! :D # pride = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] -# you can create an unlimited number of "advancedUsers" for any users. \ No newline at end of file +## you can create an unlimited number of "advancedUsers" for any users. \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 9d29054..4ab2978 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,11 +7,27 @@ use toml::Table; #[serde(rename_all = "camelCase")] pub struct Config { pub listen: String, + pub token: Option, pub motd: String, pub limitations: Limitations, pub advanced_users: Table, } +impl Config { + pub fn verify_token(&self, suspicious: &str) -> bool { + match &self.token { + Some(t) => { + if t == suspicious { + true + } else { + false + } + }, + None => false + } + } +} + #[derive(Deserialize, Clone, Debug, PartialEq)] #[serde(rename_all = "camelCase")] pub struct Limitations { diff --git a/src/main.rs b/src/main.rs index 6f29695..3f24c15 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use axum::{ }; use dashmap::DashMap; use std::sync::Arc; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::{broadcast, mpsc, Mutex}; use tower_http::trace::TraceLayer; use tracing::info; use uuid::Uuid; @@ -35,12 +35,10 @@ use config::Config; #[derive(Debug, Clone)] pub struct AppState { - /// Users with incomplete authentication - //pending: Arc>, // - /// Authenticated users - //authenticated: Arc, // NOTE: In the future, try it in a separate LockRw branch /// User manager user_manager: Arc, + /// Send into WebSocket + session: Arc>>>, /// Ping broadcasts for WebSocket connections broadcasts: Arc>>>, /// Current configuration @@ -73,6 +71,7 @@ async fn main() -> Result<()> { // State let state = AppState { user_manager: Arc::new(UManager::new()), + session: Arc::new(DashMap::new()), broadcasts: Arc::new(DashMap::new()), config: config, }; @@ -97,6 +96,7 @@ async fn main() -> Result<()> { let api = Router::new() .nest("//auth", api_auth::router()) + .nest("/v1", ws::http_router()) .route("/limits", get(api_info::limits)) .route("/version", get(api_info::version)) .route("/motd", get(api_info::motd)) diff --git a/src/profile.rs b/src/profile.rs index d377cf2..73c5815 100644 --- a/src/profile.rs +++ b/src/profile.rs @@ -136,7 +136,7 @@ pub async fn equip_avatar(Token(token): Token, State(state): State) -> .send(S2CMessage::Event(uuid).to_vec()) .is_err() { - warn!("[WebSocket] Failed to send Event! Maybe there is no one to send") + debug!("[WebSocket] Failed to send Event! Maybe there is no one to send") // TODO: Put into Handler }; "ok".to_string() diff --git a/src/ws/http.rs b/src/ws/http.rs new file mode 100644 index 0000000..6ce0518 --- /dev/null +++ b/src/ws/http.rs @@ -0,0 +1,83 @@ +use axum::{ + extract::{Query, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::{get, post}, + Router +}; +use serde::Deserialize; +use tracing::debug; +use uuid::Uuid; + +use crate::{auth::Token, AppState}; + +pub fn router() -> Router { + Router::new() + .route("/verify", get(verify)) + // .route("/ping", post(ping)) + // .route("/event", post(event)) + // .route("/toast", post(toast)) + // .route("/chat", post(chat)) + // .route("/notice", post(notice)) + .route("/raw", post(raw)) +} + +#[derive(Deserialize)] +struct UserUuid { + uuid: Option, +} + +async fn verify( + Token(token): Token, + State(state): State, +) -> Response { + match token { + Some(t) => { + if !state.config.lock().await.verify_token(&t) { + return (StatusCode::UNAUTHORIZED, "wrong token".to_string()).into_response() + } + }, + None => return (StatusCode::UNAUTHORIZED, "unauthorized".to_string()).into_response(), + } + (StatusCode::OK, "ok".to_string()).into_response() +} + +async fn raw( + Token(token): Token, + Query(query): Query, + State(state): State, + body: String, +) -> Response { + debug!(body = body); + match token { + Some(t) => { + if !state.config.lock().await.verify_token(&t) { + return (StatusCode::UNAUTHORIZED, "wrong token".to_string()).into_response() + } + }, + None => return (StatusCode::UNAUTHORIZED, "unauthorized".to_string()).into_response(), + } + let payload = match hex::decode(body) { + Ok(v) => v, + Err(_) => return (StatusCode::NOT_ACCEPTABLE, "not raw data".to_string()).into_response(), + }; + debug!("{:?}", payload); + + match query.uuid { + Some(uuid) => { + // for only one + let tx = match state.session.get(&uuid) { + Some(d) => d, + None => return (StatusCode::NOT_FOUND, "unknown uuid".to_string()).into_response(), + }; + match tx.value().send(payload).await { + Ok(_) => return (StatusCode::OK, "ok".to_string()).into_response(), + Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, "cant send".to_string()).into_response(), + }; + }, + None => { + // for all + return (StatusCode::NOT_FOUND, "uuid doesnt defined".to_string()).into_response(); + }, + } +} \ No newline at end of file diff --git a/src/ws/mod.rs b/src/ws/mod.rs index c9ad8f7..270230d 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -1,9 +1,8 @@ -mod c2s; -mod errors; -mod handler; -mod s2c; +mod types; +mod websocket; +mod http; -pub use c2s::C2SMessage; -pub use errors::MessageLoadError; -pub use handler::handler; -pub use s2c::S2CMessage; +pub use types::C2SMessage; +pub use types::S2CMessage; +pub use websocket::handler; +pub use http::router as http_router; \ No newline at end of file diff --git a/src/ws/c2s.rs b/src/ws/types/c2s.rs similarity index 100% rename from src/ws/c2s.rs rename to src/ws/types/c2s.rs diff --git a/src/ws/errors.rs b/src/ws/types/errors.rs similarity index 100% rename from src/ws/errors.rs rename to src/ws/types/errors.rs diff --git a/src/ws/types/mod.rs b/src/ws/types/mod.rs new file mode 100644 index 0000000..d26b7db --- /dev/null +++ b/src/ws/types/mod.rs @@ -0,0 +1,7 @@ +mod c2s; +mod errors; +mod s2c; + +pub use c2s::C2SMessage; +pub use errors::MessageLoadError; +pub use s2c::S2CMessage; diff --git a/src/ws/s2c.rs b/src/ws/types/s2c.rs similarity index 100% rename from src/ws/s2c.rs rename to src/ws/types/s2c.rs diff --git a/src/ws/handler.rs b/src/ws/websocket.rs similarity index 74% rename from src/ws/handler.rs rename to src/ws/websocket.rs index 0e54f51..c5c55aa 100644 --- a/src/ws/handler.rs +++ b/src/ws/websocket.rs @@ -24,9 +24,6 @@ pub async fn handler(ws: WebSocketUpgrade, State(state): State) -> Res ws.on_upgrade(|socket| handle_socket(socket, state)) } -#[derive(Debug, Clone)] -struct WSOwner(Option); - #[derive(Debug, Clone)] struct WSUser { username: String, @@ -34,9 +31,13 @@ struct WSUser { uuid: Uuid, } -impl WSOwner { +trait ExtWSUser { + fn name(&self) -> String; +} + +impl ExtWSUser for Option { fn name(&self) -> String { - if let Some(user) = &self.0 { + if let Some(user) = self { format!(" ({})", user.username) } else { String::new() @@ -46,31 +47,24 @@ impl WSOwner { async fn handle_socket(mut socket: WebSocket, state: AppState) { debug!("[WebSocket] New unknown connection!"); - let mut owner = WSOwner(None); - let cutoff: DashMap> = DashMap::new(); - let (mtx, mut mrx) = mpsc::channel(64); - let mut bctx: Option>> = None; + let mut owner: Option = None; // Information about user + let cutoff: DashMap> = DashMap::new(); // Отключение подписки + let (mtx, mut mrx) = mpsc::channel(64); // multiple tx and single receive + let mut bctx: Option>> = None; // broadcast tx send loop { tokio::select! { + // Main loop what receving messages from WebSocket Some(msg) = socket.recv() => { trace!("[WebSocket{}] Raw: {msg:?}", owner.name()); let mut msg = if let Ok(msg) = msg { if let Message::Close(_) = msg { info!("[WebSocket{}] Connection successfully closed!", owner.name()); - if let Some(u) = owner.0 { - state.broadcasts.remove(&u.uuid); - state.user_manager.remove(&u.uuid); - } - return; + break; } msg } else { debug!("[WebSocket{}] Receive error! Connection terminated!", owner.name()); - if let Some(u) = owner.0 { - state.broadcasts.remove(&u.uuid); - state.user_manager.remove(&u.uuid); - } - return; + break; }; // Next is the code for processing msg let msg_vec = msg.clone().into_data(); @@ -80,11 +74,7 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { Ok(data) => data, Err(e) => { error!("[WebSocket{}] This message is not from Figura! {e:?}", owner.name()); - if let Some(u) = owner.0 { - state.broadcasts.remove(&u.uuid); - state.user_manager.remove(&u.uuid); - } - return; + break; }, }; @@ -97,7 +87,8 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { match state.user_manager.get(&token) { // The principle is simple: if there is no token in authenticated, then it's "dirty hacker" :D Some(t) => { //username = t.username.clone(); - owner.0 = Some(WSUser { username: t.username.clone(), token, uuid: t.uuid }); + owner = Some(WSUser { username: t.username.clone(), token, uuid: t.uuid }); + state.session.insert(t.uuid, mtx.clone()); msg = Message::Binary(S2CMessage::Auth.to_vec()); match state.broadcasts.get(&t.uuid) { Some(tx) => { @@ -113,53 +104,52 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { None => { warn!("[WebSocket] Authenticaton error! Connection terminated!"); debug!("[WebSocket] Tried to log in with {token}"); // Tried to log in with token: {token} - if let Some(u) = owner.0 { - state.broadcasts.remove(&u.uuid); - state.user_manager.remove(&u.uuid); - } - return; // TODO: Define the trip code + break; }, }; }, C2SMessage::Ping(_, _, _) => { debug!("[WebSocket{}] C2S : Ping", owner.name()); - let data = into_s2c_ping(msg_vec, owner.0.clone().unwrap().uuid); + let data = into_s2c_ping(msg_vec, owner.clone().unwrap().uuid); match bctx.clone().unwrap().send(data) { Ok(_) => (), Err(_) => debug!("[WebSocket{}] Failed to send Ping! Maybe there's no one to send", owner.name()), }; continue; }, + // Subscribing C2SMessage::Sub(uuid) => { // TODO: Eliminate the possibility of using SUB without authentication debug!("[WebSocket{}] C2S : Sub", owner.name()); - // Rejecting Sub to itself - if uuid == owner.0.clone().unwrap().uuid { + // Ignoring self Sub + if uuid == owner.clone().unwrap().uuid { continue; }; - let rx = match state.broadcasts.get(&uuid) { - Some(rx) => rx.to_owned().subscribe(), + let rx = match state.broadcasts.get(&uuid) { // Get sender + Some(rx) => rx.to_owned().subscribe(), // Subscribe on sender to get receiver None => { warn!("[WebSocket{}] Attention! The required UUID for subscription was not found!", owner.name()); - let (tx, rx) = broadcast::channel(64); - state.broadcasts.insert(uuid, tx); + let (tx, rx) = broadcast::channel(64); // Pre creating broadcast for future + state.broadcasts.insert(uuid, tx); // Inserting into dashmap rx }, }; - let shutdown = Arc::new(Notify::new()); - tokio::spawn(subscribe(mtx.clone(), rx, shutdown.clone())); - cutoff.insert(uuid, shutdown); + let shutdown = Arc::new(Notify::new()); // Creating new shutdown + tokio::spawn(subscribe(mtx.clone(), rx, shutdown.clone())); // + cutoff.insert(uuid, shutdown); continue; }, + // Unsubscribing C2SMessage::Unsub(uuid) => { debug!("[WebSocket{}] C2S : Unsub", owner.name()); - // Rejecting UnSub to itself - if uuid == owner.0.clone().unwrap().uuid { + // Ignoring self Unsub + if uuid == owner.clone().unwrap().uuid { continue; }; - let shutdown = cutoff.remove(&uuid).unwrap().1; - shutdown.notify_one(); + + let shutdown = cutoff.remove(&uuid).unwrap().1; // Getting from list // FIXME: UNWRAP PANIC! NONE VALUE + shutdown.notify_one(); // Shutdown function continue; }, } @@ -168,11 +158,7 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { debug!("[WebSocket{}] Answering: {msg:?}", owner.name()); if socket.send(msg).await.is_err() { warn!("[WebSocket{}] Send error! Connection terminated!", owner.name()); - if let Some(u) = owner.0 { - state.broadcasts.remove(&u.uuid); - state.user_manager.remove(&u.uuid); - } - return; + break; } } msg = mrx.recv() => { @@ -182,16 +168,19 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) { } Err(_) => { warn!("[WebSocketSubscriber{}] Send error! Connection terminated!", owner.name()); - if let Some(u) = owner.0 { - state.broadcasts.remove(&u.uuid); - state.user_manager.remove(&u.uuid); - } - return; + break; } } } } } + // Closing connection + if let Some(u) = owner { + state.session.remove(&u.uuid); + // state.broadcasts.remove(&u.uuid); // NOTE: Create broadcasts manager ?? + state.user_manager.remove(&u.uuid); + } + } async fn subscribe(