diff --git a/.gitignore b/.gitignore index 6ac6950..1335bc3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /Assets-main +/avatars output.log \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 2a070a9..ce7e3ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,21 @@ version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3" +[[package]] +name = "anyhow-http" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1e146d0ff1e765ca855fba9205903fd26a0f67956be6c2a2b3b1dddbe62c182" +dependencies = [ + "anyhow", + "axum", + "bytes", + "http", + "mime", + "serde", + "serde_json", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -303,6 +318,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "uuid", ] [[package]] @@ -481,6 +497,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "1.1.0" @@ -1081,17 +1103,22 @@ name = "sculptor" version = "0.1.0" dependencies = [ "anyhow", + "anyhow-http", "axum", + "base64 0.22.1", "chrono", "dashmap", "elyby-api", "fern", + "hex", "log", "rand", "ring", "serde", + "serde_json", "tokio", "tower-http", + "uuid", ] [[package]] @@ -1517,6 +1544,15 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "uuid" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +dependencies = [ + "serde", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index c66be4b..1ad681c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,6 @@ members = ["elyby-api"] # Logging log = "0.4.21" fern = { version = "0.6.2", features = ["colored"] } - -# Errors anyhow = "1.0.83" # Serialization @@ -29,8 +27,15 @@ ring = "0.17.8" rand = "0.8.5" # Web framework -axum = { version = "0.7.5", features = ["ws", "macros"] } +axum = { version = "0.7.5", features = ["ws", "macros", "http2"] } tower-http = { version = "0.5.2", features = ["trace"] } tokio = { version = "1.37.0", features = ["full"] } +hex = "0.4.3" +uuid = { version = "1.8.0", features = ["serde"] } +base64 = "0.22.1" +serde_json = "1.0.117" +anyhow-http = { version = "0.3.0", features = ["axum"] } +# TODO: Рассортировать! +# TODO: Заменить Vec и &[u8] на Bytes # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html \ No newline at end of file diff --git a/elyby-api/Cargo.toml b/elyby-api/Cargo.toml index b398095..7ccd3d0 100644 --- a/elyby-api/Cargo.toml +++ b/elyby-api/Cargo.toml @@ -8,7 +8,8 @@ edition = "2021" [dependencies] anyhow = "1.0.83" log = "0.4.21" -reqwest = "0.12.4" +reqwest = { version = "0.12.4" } serde = { version = "1.0.201", features = ["derive"] } serde_json = "1.0.117" tokio = { version = "1.37.0", features = ["full"] } +uuid = "1.8.0" diff --git a/elyby-api/src/lib.rs b/elyby-api/src/lib.rs index 8b301e3..c9ae645 100644 --- a/elyby-api/src/lib.rs +++ b/elyby-api/src/lib.rs @@ -1,14 +1,20 @@ use anyhow::{anyhow, Result}; use log::debug; +use serde_json::Value; +use uuid::Uuid; -pub async fn has_joined(server_id: &str, username: &str) -> Result { +pub async fn has_joined(server_id: &str, username: &str) -> Result> { let client = reqwest::Client::new(); let res = client.get( format!("http://minecraft.ely.by/session/hasJoined?serverId={server_id}&username={username}")).send().await?; debug!("{res:?}"); match res.status().as_u16() { - 200 => Ok(true), - 401 => Ok(false), + 200 => { + let json = serde_json::from_str::(&res.text().await?)?; + let uuid = Uuid::parse_str(json["id"].as_str().unwrap())?; + Ok(Some(uuid)) + }, + 401 => Ok(None), _ => Err(anyhow!("Unknown code: {}", res.status().as_u16())) } } @@ -16,7 +22,7 @@ pub async fn has_joined(server_id: &str, username: &str) -> Result { #[tokio::test] async fn test_has_joined() { let result = has_joined("0f8fef917f1f62b963804d822b67fe6f59aad7d", "test").await.unwrap(); - assert_eq!(result, false) + assert_eq!(result, None) } // #[cfg(test)] diff --git a/note.txt b/note.txt new file mode 100644 index 0000000..366b183 --- /dev/null +++ b/note.txt @@ -0,0 +1,20 @@ +Коды ошибок WebSocket из Figura +1000 Normal Closure +1001 Going Away +1002 Protocol Error +1003 Unsupported Data +1005 No Status Received +1006 Abnormal Closure +1007 Invalid Frame Payload Data +1008 Policy Violation +1009 Message Too Big +1010 Mandatory Ext. +1011 Internal Error +1012 Service Restart +1013 Try Again Later +1014 Bad Gateway +1015 TLS Handshake +3000 Unauthorized +4000 Re-Auth +4001 Banned +4002 Too Many Connections \ No newline at end of file diff --git a/src/auth.rs b/src/auth.rs index e6eb3bc..1b532e7 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -1,6 +1,8 @@ -use axum::{extract::{Query, State}, routing::get, Router, debug_handler}; +use axum::{async_trait, debug_handler, extract::{FromRequestParts, Query, State}, http::{request::Parts, StatusCode}, response::{IntoResponse, Response}, routing::get, Router}; +use log::debug; use serde::Deserialize; use ring::digest::{self, digest}; +use crate::utils::*; use crate::AppState; @@ -10,16 +12,18 @@ pub fn router() -> Router { .route("/verify", get(verify)) } + +// Веб функции #[derive(Deserialize)] struct Id {username: String} #[debug_handler] -async fn id( +async fn id( // 1 этап аутентификации Query(query): Query, State(state): State, ) -> String { let server_id = bytes_into_string(&digest(&digest::SHA1_FOR_LEGACY_USE_ONLY, &rand()).as_ref()[0 .. 20]); - let state = state.pending.lock().expect("Mutex poisoned!"); + let state = state.pending.lock().await; state.insert(server_id.clone(), query.username); server_id } @@ -28,36 +32,67 @@ async fn id( struct Verify {id: String} #[debug_handler] -async fn verify( +async fn verify( // 2 этап аутентификации Query(query): Query, State(state): State, ) -> String { let server_id = query.id.clone(); - let username = state.pending.lock().expect("Mutex poisoned!").remove(&server_id).unwrap().1; - if !elyby_api::has_joined(&server_id, &username).await.unwrap() { + let username = state.pending.lock().await.remove(&server_id).unwrap().1; + if let Some(uuid) = elyby_api::has_joined(&server_id, &username).await.unwrap() { + let authenticated = state.authenticated.lock().await; + let link = state.authenticated_link.lock().await; + authenticated.insert(server_id.clone(), crate::Userinfo { username, uuid }); + link.insert(uuid, crate::AuthenticatedLink(server_id.clone())); + return format!("{server_id}") + } else { return String::from("failed to verify") } - let authenticated = state.authenticated.lock().expect("Mutex poisoned!"); - authenticated.insert(server_id.clone(), username); - format!("{server_id}") } -fn rand() -> [u8; 50] { - use rand::{Rng, thread_rng}; - let mut rng = thread_rng(); - let distr = rand::distributions::Uniform::new_inclusive(0, 255); - let mut nums: [u8; 50] = [0u8; 50]; - for x in &mut nums { - *x = rng.sample(distr); +pub async fn status( + Token(token): Token, + State(state): State, +) -> Response { + match token { + Some(token) => { + if state.authenticated.lock().await.contains_key(&token) { + // format!("ok") // 200 + (StatusCode::OK, format!("ok")).into_response() + } else { + // format!("unauthorized") // 401 + (StatusCode::UNAUTHORIZED, format!("unauthorized")).into_response() + } + }, + None => { + // format!("bad request") // 400 + (StatusCode::BAD_REQUEST, format!("bad request")).into_response() + }, } - nums } +// Конец веб функций -pub fn bytes_into_string(code: &[u8]) -> String { - use std::fmt::Write; - let mut result = String::new(); - for byte in code { - write!(result, "{:02x}", byte).unwrap(); + +// Это экстрактор достающий из Заголовка зовущегося токен, соответственно ТОКЕН. +#[derive(PartialEq, Debug)] +pub struct Token(pub Option); + +#[async_trait] +impl FromRequestParts for Token +where + S: Send + Sync, +{ + type Rejection = StatusCode; + + async fn from_request_parts(parts: &mut Parts, _: &S) -> Result { + let token = parts + .headers + .get("token") + .and_then(|value| value.to_str().ok()); + debug!("[Extractor Token] Данные: {token:?}"); + match token { + Some(token) => Ok(Self(Some(token.to_string()))), + None => Ok(Self(None)), + } } - result -} \ No newline at end of file +} +// Конец экстрактора \ No newline at end of file diff --git a/src/info.rs b/src/info.rs new file mode 100644 index 0000000..8fd1c49 --- /dev/null +++ b/src/info.rs @@ -0,0 +1,30 @@ +use axum::Json; +use serde_json::{json, Value}; + + +pub async fn version() -> Json { + Json(json!({ + "release": "1.7.1", + "prerelease": "1.7.2" + })) +} + +pub async fn limits() -> Json { + Json(json!({ + "rate": { + "pingSize": 1024, + "pingRate": 32, // TODO: Проверить + "equip": 1, + "download": 50, + "upload": 1 + }, + "limits": { + "maxAvatarSize": 100000, + "maxAvatars": 10, + "allowedBadges": { + "special": [0,0,0,0,0,0], + "pride": [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + } + } + })) +} diff --git a/src/main.rs b/src/main.rs index d45af6b..f3cb359 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,33 +1,53 @@ use anyhow::Result; use axum::{ - extract::Path, - routing::{delete, get, post, put}, - Router, + middleware::from_extractor, routing::{delete, get, post, put}, Router }; use chrono::prelude::*; use dashmap::DashMap; use fern::colors::{Color, ColoredLevelConfig}; use log::info; -use std::sync::{Arc, Mutex}; +use uuid::Uuid; +use std::sync::Arc; +use tokio::sync::{broadcast, Mutex}; use tower_http::trace::TraceLayer; // WebSocket worker mod ws; use ws::handler; -// API +// API: Auth mod auth; use auth as api_auth; +// API: Server info +mod info; +use info as api_info; + +// API: Profile +mod profile; +use profile as api_profile; + +// Utils +mod utils; + #[derive(Debug, Clone)] pub struct Userinfo { - id: usize + username: String, + uuid: Uuid, } +#[derive(Debug, Clone)] +pub struct AuthenticatedLink(String); + #[derive(Debug, Clone)] pub struct AppState { - authenticated: Arc>>, // - pending: Arc>> + // Пользователи с незаконченной аутентификацией + pending: Arc>>, // + // Аутентифицированные пользователи + authenticated: Arc>>, // NOTE: В будущем попробовать в отдельной ветке LockRw + authenticated_link: Arc>>, // Получаем токен из Uuid + // Трансляции Ping'ов для WebSocket соединений + broadcasts: Arc>>>>, } #[tokio::main] @@ -54,13 +74,17 @@ async fn main() -> Result<()> { .chain(fern::log_file("output.log")?) .apply()?; - // Config init here + // Конфиг + // TODO: Сделать Config.toml для установки настроек сервера let listen = "0.0.0.0:6665"; - // State init here + // Состояние + // TODO: Сделать usersStorage.toml как "временная" замена базе данных. let state = AppState { + pending: Arc::new(Mutex::new(DashMap::new())), authenticated: Arc::new(Mutex::new(DashMap::new())), - pending: Arc::new(Mutex::new(DashMap::new())) + authenticated_link: Arc::new(Mutex::new(DashMap::new())), + broadcasts: Arc::new(Mutex::new(DashMap::new())), }; let api = Router::new() @@ -70,11 +94,11 @@ async fn main() -> Result<()> { ) // check Auth; return 200 OK if token valid .route( "/limits", - get(|| async { "@toomanylimits" }) + get(api_info::limits) ) // Need more info :( TODO: .route( "/version", - get(|| async { "{\"release\":\"2.7.1\",\"prerelease\":\"2.7.1\"}" }), + get(api_info::version), ) .route( "/motd", @@ -82,28 +106,32 @@ async fn main() -> Result<()> { ) .route( "/equip", - post(|| async { "Do it! NOW!" }) - ) // set Equipped; TODO: + post(api_profile::equip_avatar) + ) .route( - "/:owner/:id", - get(|Path((owner, id)): Path<(String, String)>| async move { - format!("getting user {id}, owner {owner}") - }), - ) // get Avatar + "/:uuid", + get(api_profile::user_info), + ) .route( - "/:avatar", - put(|Path(avatar): Path| async move { format!("put {avatar}") }), - ) // put Avatar + "/:uuid/avatar", + get(api_profile::download_avatar), + ) .route( - "/:avatar", - delete(|Path(avatar): Path| async move { format!("delete {avatar}") }), + "/avatar", + put(api_profile::upload_avatar), + ) + .route( + "/avatar", + delete(api_profile::delete_avatar), ); // delete Avatar let app = Router::new() .nest("/api", api) + .route("/api/", get(api_auth::status)) .route("/ws", get(handler)) - .layer(TraceLayer::new_for_http().on_request(())) - .with_state(state); + .route_layer(from_extractor::()) + .with_state(state) + .layer(TraceLayer::new_for_http().on_request(())); let listener = tokio::net::TcpListener::bind(listen).await?; info!("Listening on {}", listener.local_addr()?); diff --git a/src/profile.rs b/src/profile.rs new file mode 100644 index 0000000..41f43e3 --- /dev/null +++ b/src/profile.rs @@ -0,0 +1,117 @@ +use anyhow_http::{http_error_ret, response::Result}; +use axum::{body::Bytes, debug_handler, extract::{Path, State}, Json}; +use serde_json::{json, Value}; +use tokio::{fs, io::{AsyncReadExt, BufWriter, self}}; +use uuid::Uuid; + +use crate::{utils::{calculate_file_sha256, format_uuid}, auth::Token, AppState}; + +#[debug_handler] +pub async fn user_info( + Path(uuid): Path, + State(_state): State, // FIXME: Variable doesn't using! +) -> Json { + log::info!("Получение информации для {}",uuid); + + let formatted_uuid = format_uuid(uuid); + + let avatar_file = format!("avatars/{}.moon", formatted_uuid); + + let mut user_info_response = json!({ + "uuid": &formatted_uuid, + "rank": "default", + "equipped": [], + "lastUsed": "2024-05-11T22:20:48.884Z", + "equippedBadges": { + "special": [1,1,1,1,1,1], + "pride": [0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] + }, + "version": "0.1.4+1.20.1", + "banned": false + }); + + if fs::metadata(&avatar_file).await.is_ok() { + if let Some(equipped) = user_info_response.get_mut("equipped").and_then(Value::as_array_mut){ + match calculate_file_sha256(&avatar_file){ + Ok(hash) => { + equipped.push(json!({ + "id": "avatar", + "owner": &formatted_uuid, + "hash": hash + })) + } + Err(_e) => {} + } + + + } + } + Json(user_info_response) +} + +#[debug_handler] +pub async fn download_avatar( + Path(uuid): Path, +) -> Result> { + let uuid = format_uuid(uuid); + log::info!("Запрашиваем аватар: {}", uuid); + let mut file = if let Ok(file) = fs::File::open(format!("avatars/{}.moon", uuid)).await { + file + } else { + http_error_ret!(NOT_FOUND, "Ошибка! Данный аватар не существует!"); + }; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).await?; + //match Body::from_file("avatars/74cf2ba3-f346-4dfe-b3b5-f453b9f5cc5e.moon").await { + // match Body::from_file(format!("avatars/{}.moon",uuid)).await { + // Ok(body) => Ok(Response::builder(StatusCode::Ok).body(body).build()), + // Err(e) => Err(e.into()), + // } + Ok(buffer) +} + +#[debug_handler] +pub async fn upload_avatar( + Token(token): Token, + State(state): State, + body: Bytes, +) -> Result { + + let request_data = body; + + let token = match token { + Some(t) => t, + None => http_error_ret!(UNAUTHORIZED, "Ошибка аутентификации!"), + }; + let userinfos = state.authenticated.lock().await; + + if let Some(user_info) = userinfos.get(token.as_str()) { + log::info!("{} ({}) пытается загрузить аватар",user_info.uuid,user_info.username); + let avatar_file = format!("avatars/{}.moon",user_info.uuid); + let mut file = BufWriter::new(fs::File::create(&avatar_file).await?); + io::copy(&mut request_data.as_ref(), &mut file).await?; + } + Ok(format!("ok")) +} + +pub async fn equip_avatar() -> String { + format!("ok") +} + +pub async fn delete_avatar( + Token(token): Token, + State(state): State, +) -> Result { + let token = match token { + Some(t) => t, + None => http_error_ret!(UNAUTHORIZED, "Ошибка аутентификации!"), + }; + let userinfos = state.authenticated.lock().await; + if let Some(user_info) = userinfos.get(token.as_str()) { + log::info!("{} ({}) пытается удалить аватар",user_info.uuid,user_info.username); + let avatar_file = format!("avatars/{}.moon",user_info.uuid); + fs::remove_file(avatar_file).await?; + } + // let avatar_file = format!("avatars/{}.moon",user_info.uuid); + Ok(format!("ok")) +} \ No newline at end of file diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..a7fc7c1 --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,59 @@ +use std::{fs::File, io::Read}; + +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use ring::digest::{self, digest}; +use uuid::Uuid; +use base64::prelude::*; + + +// Кор функции +pub fn rand() -> [u8; 50] { + let mut rng = thread_rng(); + let distr = rand::distributions::Uniform::new_inclusive(0, 255); + let mut nums: [u8; 50] = [0u8; 50]; + for x in &mut nums { + *x = rng.sample(distr); + } + nums +} + +pub fn bytes_into_string(code: &[u8]) -> String { + code.iter().map(|byte| format!("{:02x}", byte)).collect::() +} +// Конец кор функций + + +pub fn _generate_hex_string(length: usize) -> String { // FIXME: Variable doesn't using! + let rng = thread_rng(); + let random_bytes: Vec = rng + .sample_iter(&Alphanumeric) + .take(length / 2) + .collect(); + + hex::encode(random_bytes) +} + +pub fn format_uuid(uuid: Uuid) -> String { + // let uuid = Uuid::parse_str(&uuid)?; TODO: Вероятно format_uuid стоит убрать + // .map_err(|_| tide::Error::from_str(StatusCode::InternalServerError, "Failed to parse UUID"))?; + uuid.as_hyphenated().to_string() +} + +pub fn calculate_file_sha256(file_path: &str) -> Result { + // Read the file content + let mut file = File::open(file_path)?; + let mut content = Vec::new(); + file.read_to_end(&mut content)?; + + // Convert the content to base64 + let base64_content = BASE64_STANDARD.encode(&content); + + // Calculate the SHA-256 hash of the base64 string + let binding = digest(&digest::SHA256, base64_content.as_bytes()); + let hash = binding.as_ref(); + + // Convert the hash to a hexadecimal string + let hex_hash = bytes_into_string(hash); + + Ok(hex_hash) +} \ No newline at end of file diff --git a/src/ws/c2s.rs b/src/ws/c2s.rs index 378e0c1..71fbc03 100644 --- a/src/ws/c2s.rs +++ b/src/ws/c2s.rs @@ -1,3 +1,5 @@ +use uuid::Uuid; + use super::MessageLoadError; use std::convert::{TryFrom, TryInto}; @@ -6,8 +8,8 @@ use std::convert::{TryFrom, TryInto}; pub enum C2SMessage<'a> { Token(&'a [u8]) = 0, Ping(u32, bool, &'a [u8]) = 1, - Sub(u128) = 2, // owo - Unsub(u128) = 3, + Sub(Uuid) = 2, // owo + Unsub(Uuid) = 3, } // 6 - 6 impl<'a> TryFrom<&'a [u8]> for C2SMessage<'a> { @@ -36,7 +38,7 @@ impl<'a> TryFrom<&'a [u8]> for C2SMessage<'a> { } 2 => { if buf.len() == 17 { - Ok(C2SMessage::Sub(u128::from_be_bytes( + Ok(C2SMessage::Sub(Uuid::from_bytes( (&buf[1..]).try_into().unwrap(), ))) } else { @@ -50,7 +52,7 @@ impl<'a> TryFrom<&'a [u8]> for C2SMessage<'a> { } 3 => { if buf.len() == 17 { - Ok(C2SMessage::Unsub(u128::from_be_bytes( + Ok(C2SMessage::Unsub(Uuid::from_bytes( (&buf[1..]).try_into().unwrap(), ))) } else { @@ -81,8 +83,23 @@ impl<'a> Into> for C2SMessage<'a> { .chain(iter::once(s.into())) .chain(d.into_iter().copied()) .collect(), - C2SMessage::Sub(s) => iter::once(2).chain(s.to_be_bytes()).collect(), - C2SMessage::Unsub(s) => iter::once(3).chain(s.to_be_bytes()).collect(), + C2SMessage::Sub(s) => iter::once(2).chain(s.into_bytes()).collect(), + C2SMessage::Unsub(s) => iter::once(3).chain(s.into_bytes()).collect(), + }; + a + } +} + +impl<'a> C2SMessage<'a> { + pub fn ping_data(self) -> Box<[u8]> { + use std::iter; + let a: Box<[u8]> = match self { + C2SMessage::Ping(p, s, d) => iter::empty() + .chain(p.to_be_bytes()) + .chain(iter::once(s.into())) + .chain(d.into_iter().copied()) + .collect(), + _ => todo!() // FIXME: Это всё нихеровых размеров костыль! }; a } diff --git a/src/ws/handler.rs b/src/ws/handler.rs index 607b35d..ec91507 100644 --- a/src/ws/handler.rs +++ b/src/ws/handler.rs @@ -1,51 +1,219 @@ -use axum::{extract::{ws::{Message, WebSocket}, WebSocketUpgrade}, response::Response}; -use log::{error, info, warn}; +use std::sync::Arc; -use crate::ws::C2SMessage; +use axum::{extract::{ws::{Message, WebSocket}, State, WebSocketUpgrade}, response::Response}; +use dashmap::DashMap; +use log::{debug, error, info, log, warn}; +use tokio::sync::{broadcast::{self, Receiver}, mpsc, Notify}; +use uuid::Uuid; -pub async fn handler(ws: WebSocketUpgrade) -> Response { - ws.on_upgrade(handle_socket) +use crate::{ws::{C2SMessage, S2CMessage}, AppState}; + +pub async fn handler( + ws: WebSocketUpgrade, + State(state): State, +) -> Response { + ws.on_upgrade(|socket| handle_socket(socket, state)) } -async fn handle_socket(mut socket: WebSocket) { - while let Some(msg) = socket.recv().await { - info!("{msg:?}"); - let mut msg = if let Ok(msg) = msg { - msg +#[derive(Debug, Clone)] +struct WSOwner(Option); + +#[derive(Debug, Clone)] +struct WSUser { + username: String, + token: String, + uuid: Uuid, +} + +impl WSOwner { + fn name(&self) -> String { + if let Some(user) = &self.0 { + format!(" ({})", user.username) } else { - // if reached here - client disconnected - warn!("ws disconnected!"); - return; - }; - // Work with code here - let msg_array = msg.clone().into_data(); - let msg_array = msg_array.as_slice(); - - let newmsg = match C2SMessage::try_from(msg_array) { - Ok(data) => data, - Err(e) => { - error!("MessageLoadError: {e:?}"); - return; - }, - }; - - match newmsg { - C2SMessage::Token(token) => { - // TODO: Authenticated check - msg = Message::Binary(vec![0]) - }, - // C2SMessage::Ping(_, _, _) => todo!(), - // C2SMessage::Sub(_) => todo!(), - // C2SMessage::Unsub(_) => todo!(), - _ => () - } - - info!("{newmsg:?}"); - - if socket.send(msg).await.is_err() { - // if reached here - client disconnected - warn!("ws disconnected!"); - return; + String::new() } } +} + +async fn handle_socket(mut socket: WebSocket, state: AppState) { + let mut owner = WSOwner(None); + let cutoff: DashMap> = DashMap::new(); + let (mtx, mut mrx) = mpsc::channel(64); + // let (bctx, mut _bcrx) = broadcast::channel(64); + let mut bctx: Option>> = None; + loop { + tokio::select! { + Some(msg) = socket.recv() => { + debug!("[WebSocket{}] Raw: {msg:?}", owner.name()); + let mut msg = if let Ok(msg) = msg { + if let Message::Close(_) = msg { + info!("[WebSocket{}] Соединение удачно закрыто!", owner.name()); + if let Some(u) = owner.0 { + remove_broadcast(state.broadcasts.clone(), u.uuid).await; + } + return; + } + msg + } else { + // если попали сюда, значит вероятнее всего клиент отключился + warn!("[WebSocket{}] Ошибка получения! Соединение разорвано!", owner.name()); + if let Some(u) = owner.0 { + remove_broadcast(state.broadcasts.clone(), u.uuid).await; + } + return; + }; + // Далее код для обработки msg + let msg_vec = msg.clone().into_data(); + let msg_array = msg_vec.as_slice(); + + let newmsg = match C2SMessage::try_from(msg_array) { + Ok(data) => data, + Err(e) => { + error!("[WebSocket{}] Это сообщение не от Figura! {e:?}", owner.name()); + if let Some(u) = owner.0 { + remove_broadcast(state.broadcasts.clone(), u.uuid).await; + } + return; + }, + }; + + info!("[WebSocket{}] Данные: {newmsg:?}", owner.name()); + + match newmsg { + C2SMessage::Token(token) => { // FIXME: Написать переменную спомощью которой бужет проверяться авторизовался ли пользователь или нет + info!("[WebSocket{}] Token", owner.name()); + let token = String::from_utf8(token.to_vec()).unwrap(); + let authenticated = state.authenticated.lock().await; + match authenticated.get(&token) { // Принцип прост: если токена в authenticated нет, значит это trash + Some(t) => { + //username = t.username.clone(); + owner.0 = Some(WSUser { username: t.username.clone(), token, uuid: t.uuid }); + msg = Message::Binary(S2CMessage::Auth.to_vec()); + let bcs = state.broadcasts.lock().await; + match bcs.get(&t.uuid) { + Some(tx) => { + bctx = Some(tx.to_owned()); + }, + None => { + let (tx, _rx) = broadcast::channel(64); + bcs.insert(t.uuid, tx.clone()); + bctx = Some(tx.to_owned()); + }, + }; + }, + None => { + warn!("[WebSocket] Ошибка авторизации! Соединение разорвано! {token}"); + if let Some(u) = owner.0 { + remove_broadcast(state.broadcasts.clone(), u.uuid).await; + } + return; // TODO: Прописать код отключения + }, + }; + }, + C2SMessage::Ping(_, _, _) => { + info!("[WebSocket{}] Ping", owner.name()); + let data = into_s2c_ping(msg_vec, owner.0.clone().unwrap().uuid); + info!("Im gotcha homie! {:?}", data); + match bctx.clone().unwrap().send(data) { + Ok(_) => (), + Err(_) => error!("[WebSocket{}] Не удалось отправить Пинг!", owner.name()), + }; + continue; + }, + C2SMessage::Sub(uuid) => { // FIXME: Исключить возможность использования SUB без авторизации + info!("[WebSocket{}] Sub", owner.name()); + // Отбрасываю Sub на самого себя + if uuid == owner.0.clone().unwrap().uuid { + continue; + }; + + let broadcast = state.broadcasts.lock().await; + let rx = match broadcast.get(&uuid) { + Some(rx) => rx.to_owned().subscribe(), + None => { + warn!("Внимание! Необходимый UUID для подписки не найден!"); + let (tx, rx) = broadcast::channel(64); + broadcast.insert(uuid, tx); + rx + }, + }; + // .to_owned().subscribe(); + let shutdown = Arc::new(Notify::new()); + tokio::spawn(subscribe(mtx.clone(), rx, shutdown.clone())); + cutoff.insert(uuid, shutdown); + continue; + }, + C2SMessage::Unsub(uuid) => { + info!("[WebSocket{}] Unsub", owner.name()); + // Отбрасываю Unsub на самого себя + if uuid == owner.0.clone().unwrap().uuid { + continue; + }; + let shutdown = cutoff.remove(&uuid).unwrap().1; + shutdown.notify_one(); + continue; + }, + // _ => continue + } + + // Отправка сообщения + warn!("[WebSocket{}] Отвечаю: {msg:?}", owner.name()); + if socket.send(msg).await.is_err() { + // если попали сюда, значит вероятнее всего клиент отключился + warn!("[WebSocket{}] Ошибка отправки! Соединение разорвано!", owner.name()); + if let Some(u) = owner.0 { + remove_broadcast(state.broadcasts.clone(), u.uuid).await; + } + return; + } + } + msg = mrx.recv() => { + match socket.send(Message::Binary(msg.clone().unwrap())).await { + Ok(_) => { + warn!("[WebSocketSubscribe{}] Отвечаю: {}", owner.name(), hex::encode(msg.unwrap())); + } + Err(_) => { + // если попали сюда, значит вероятнее всего клиент отключился + warn!("[WebSocketSubscriber{}] Ошибка отправки! Соединение разорвано!", owner.name()); + if let Some(u) = owner.0 { + remove_broadcast(state.broadcasts.clone(), u.uuid).await; + } + return; + } + } + } + } + } +} + +async fn subscribe(socket: mpsc::Sender>, mut rx: Receiver>, shutdown: Arc) { + loop { + tokio::select! { + _ = shutdown.notified() => { + debug!("Unsubscribing!"); + return; + } + msg = rx.recv() => { + socket.send(msg.unwrap()).await.unwrap(); + } + } + } +} + +fn into_s2c_ping(buf: Vec, uuid: Uuid) -> Vec { + use std::iter::once; + // let mut vec = Vec::new(); + // vec + //let uuid = uuid.as_u128(); + //let uuid = uuid.into_bytes(); + // info!("UUID {} UUID BE {}", hex::encode(uuid.into_bytes()), hex::encode(uuid128.to_be_bytes())); + let res: Vec = once(1).chain(uuid.into_bytes().iter().copied()).chain(buf.as_slice()[1..].iter().copied()).collect(); + debug!("Sending ping: {}", hex::encode(res.clone())); + res + // vec +} + +async fn remove_broadcast(broadcasts: Arc>>>>, uuid: Uuid) { + let map = broadcasts.lock().await; + map.remove(&uuid); } \ No newline at end of file diff --git a/src/ws/s2c.rs b/src/ws/s2c.rs index 17178e2..f59a440 100644 --- a/src/ws/s2c.rs +++ b/src/ws/s2c.rs @@ -1,3 +1,6 @@ +use log::debug; +use uuid::Uuid; + use super::MessageLoadError; use std::convert::{TryFrom, TryInto}; @@ -5,8 +8,8 @@ use std::convert::{TryFrom, TryInto}; #[derive(Debug, Clone, PartialEq, Eq)] pub enum S2CMessage<'a> { Auth = 0, - Ping(u128, u32, bool, &'a [u8]) = 1, - Event(u128) = 2, + Ping(Uuid, u32, bool, &'a [u8]) = 1, + Event(Uuid) = 2, // UUID Обновляет аватар других игроков Toast(u8, &'a str, Option<&'a str>) = 3, Chat(&'a str) = 4, Notice(u8) = 5, @@ -30,7 +33,7 @@ impl<'a> TryFrom<&'a [u8]> for S2CMessage<'a> { 1 => { if buf.len() >= 22 { Ok(Ping( - u128::from_be_bytes((&buf[1..17]).try_into().unwrap()), + Uuid::from_bytes((&buf[1..17]).try_into().unwrap()), u32::from_be_bytes((&buf[17..21]).try_into().unwrap()), buf[21] != 0, &buf[22..], @@ -41,7 +44,7 @@ impl<'a> TryFrom<&'a [u8]> for S2CMessage<'a> { } 2 => { if buf.len() == 17 { - Ok(Event(u128::from_be_bytes( + Ok(Event(Uuid::from_bytes( (&buf[1..17]).try_into().unwrap(), ))) } else { @@ -63,12 +66,12 @@ impl<'a> Into> for S2CMessage<'a> { match self { Auth => Box::new([0]), Ping(u, i, s, d) => once(1) - .chain(u.to_be_bytes().iter().copied()) + .chain(u.into_bytes().iter().copied()) .chain(i.to_be_bytes().iter().copied()) .chain(once(if s { 1 } else { 0 })) .chain(d.into_iter().copied()) .collect(), - Event(u) => once(2).chain(u.to_be_bytes().iter().copied()).collect(), + Event(u) => once(2).chain(u.into_bytes().iter().copied()).collect(), Toast(t, h, d) => once(3) .chain(once(t)) .chain(h.as_bytes().into_iter().copied()) @@ -82,4 +85,23 @@ impl<'a> Into> for S2CMessage<'a> { Notice(t) => Box::new([5, t]), } } +} + +impl<'a> S2CMessage<'a> { + pub fn to_s2c_ping(uuid: Uuid, buf: &'a [u8]) -> S2CMessage<'a> { + use S2CMessage::Ping; + debug!("!!! {buf:?}"); + Ping( + uuid, + u32::from_be_bytes((&buf[1..5]).try_into().unwrap()), + buf[5] != 0, // Ping может быть короче чем ожидалось + &buf[6..], + ) + } + pub fn to_array(self) -> Box<[u8]> { + >>::into(self) + } + pub fn to_vec(self) -> Vec { + self.to_array().to_vec() + } } \ No newline at end of file