mirror of
https://github.com/shiroyashik/sculptor.git
synced 2025-12-06 13:01:12 +03:00
Pre-release
I don't remember exactly what was done in this commit anymore, so don't be discourteous
This commit is contained in:
parent
b280da2742
commit
59440154c1
18 changed files with 1016 additions and 185 deletions
|
|
@ -2,7 +2,7 @@ use std::sync::Arc;
|
|||
|
||||
use axum::{extract::{ws::{Message, WebSocket}, State, WebSocketUpgrade}, response::Response};
|
||||
use dashmap::DashMap;
|
||||
use log::{debug, error, info, log, warn};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use tokio::sync::{broadcast::{self, Receiver}, mpsc, Notify};
|
||||
use uuid::Uuid;
|
||||
|
||||
|
|
@ -36,18 +36,18 @@ impl WSOwner {
|
|||
}
|
||||
|
||||
async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
||||
debug!("[WebSocket] New unknown connection!");
|
||||
let mut owner = WSOwner(None);
|
||||
let cutoff: DashMap<Uuid, Arc<Notify>> = DashMap::new();
|
||||
let (mtx, mut mrx) = mpsc::channel(64);
|
||||
// let (bctx, mut _bcrx) = broadcast::channel(64);
|
||||
let mut bctx: Option<broadcast::Sender<Vec<u8>>> = None;
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(msg) = socket.recv() => {
|
||||
debug!("[WebSocket{}] Raw: {msg:?}", owner.name());
|
||||
trace!("[WebSocket{}] Raw: {msg:?}", owner.name());
|
||||
let mut msg = if let Ok(msg) = msg {
|
||||
if let Message::Close(_) = msg {
|
||||
info!("[WebSocket{}] Соединение удачно закрыто!", owner.name());
|
||||
info!("[WebSocket{}] Connection successfully closed!", owner.name());
|
||||
if let Some(u) = owner.0 {
|
||||
remove_broadcast(state.broadcasts.clone(), u.uuid).await;
|
||||
}
|
||||
|
|
@ -55,8 +55,7 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
|||
}
|
||||
msg
|
||||
} else {
|
||||
// если попали сюда, значит вероятнее всего клиент отключился
|
||||
warn!("[WebSocket{}] Ошибка получения! Соединение разорвано!", owner.name());
|
||||
warn!("[WebSocket{}] Receive error! Connection terminated!", owner.name());
|
||||
if let Some(u) = owner.0 {
|
||||
remove_broadcast(state.broadcasts.clone(), u.uuid).await;
|
||||
}
|
||||
|
|
@ -69,7 +68,7 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
|||
let newmsg = match C2SMessage::try_from(msg_array) {
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
error!("[WebSocket{}] Это сообщение не от Figura! {e:?}", owner.name());
|
||||
error!("[WebSocket{}] This message is not from Figura! {e:?}", owner.name());
|
||||
if let Some(u) = owner.0 {
|
||||
remove_broadcast(state.broadcasts.clone(), u.uuid).await;
|
||||
}
|
||||
|
|
@ -77,11 +76,11 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
|||
},
|
||||
};
|
||||
|
||||
info!("[WebSocket{}] Данные: {newmsg:?}", owner.name());
|
||||
debug!("[WebSocket{}] Raw: {newmsg:?}", owner.name());
|
||||
|
||||
match newmsg {
|
||||
C2SMessage::Token(token) => { // FIXME: Написать переменную спомощью которой бужет проверяться авторизовался ли пользователь или нет
|
||||
info!("[WebSocket{}] Token", owner.name());
|
||||
debug!("[WebSocket{}] C2S : Token", owner.name());
|
||||
let token = String::from_utf8(token.to_vec()).unwrap();
|
||||
let authenticated = state.authenticated.lock().await;
|
||||
match authenticated.get(&token) { // Принцип прост: если токена в authenticated нет, значит это trash
|
||||
|
|
@ -102,7 +101,8 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
|||
};
|
||||
},
|
||||
None => {
|
||||
warn!("[WebSocket] Ошибка авторизации! Соединение разорвано! {token}");
|
||||
warn!("[WebSocket] Authenticaton error! Connection terminated!");
|
||||
debug!("[WebSocket] Tried to log in with {token}"); // Tried to log in with token: {token}
|
||||
if let Some(u) = owner.0 {
|
||||
remove_broadcast(state.broadcasts.clone(), u.uuid).await;
|
||||
}
|
||||
|
|
@ -111,17 +111,16 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
|||
};
|
||||
},
|
||||
C2SMessage::Ping(_, _, _) => {
|
||||
info!("[WebSocket{}] Ping", owner.name());
|
||||
debug!("[WebSocket{}] C2S : Ping", owner.name());
|
||||
let data = into_s2c_ping(msg_vec, owner.0.clone().unwrap().uuid);
|
||||
info!("Im gotcha homie! {:?}", data);
|
||||
match bctx.clone().unwrap().send(data) {
|
||||
Ok(_) => (),
|
||||
Err(_) => error!("[WebSocket{}] Не удалось отправить Пинг!", owner.name()),
|
||||
Err(_) => warn!("[WebSocket{}] Failed to send Ping! Maybe there's no one to send", owner.name()),
|
||||
};
|
||||
continue;
|
||||
},
|
||||
C2SMessage::Sub(uuid) => { // FIXME: Исключить возможность использования SUB без авторизации
|
||||
info!("[WebSocket{}] Sub", owner.name());
|
||||
debug!("[WebSocket{}] C2S : Sub", owner.name());
|
||||
// Отбрасываю Sub на самого себя
|
||||
if uuid == owner.0.clone().unwrap().uuid {
|
||||
continue;
|
||||
|
|
@ -131,7 +130,7 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
|||
let rx = match broadcast.get(&uuid) {
|
||||
Some(rx) => rx.to_owned().subscribe(),
|
||||
None => {
|
||||
warn!("Внимание! Необходимый UUID для подписки не найден!");
|
||||
warn!("[WebSocket{}] Attention! The required UUID for subscription was not found!", owner.name());
|
||||
let (tx, rx) = broadcast::channel(64);
|
||||
broadcast.insert(uuid, tx);
|
||||
rx
|
||||
|
|
@ -144,7 +143,7 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
|||
continue;
|
||||
},
|
||||
C2SMessage::Unsub(uuid) => {
|
||||
info!("[WebSocket{}] Unsub", owner.name());
|
||||
debug!("[WebSocket{}] C2S : Unsub", owner.name());
|
||||
// Отбрасываю Unsub на самого себя
|
||||
if uuid == owner.0.clone().unwrap().uuid {
|
||||
continue;
|
||||
|
|
@ -153,14 +152,12 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
|||
shutdown.notify_one();
|
||||
continue;
|
||||
},
|
||||
// _ => continue
|
||||
}
|
||||
|
||||
// Отправка сообщения
|
||||
warn!("[WebSocket{}] Отвечаю: {msg:?}", owner.name());
|
||||
// Sending message
|
||||
debug!("[WebSocket{}] Answering: {msg:?}", owner.name());
|
||||
if socket.send(msg).await.is_err() {
|
||||
// если попали сюда, значит вероятнее всего клиент отключился
|
||||
warn!("[WebSocket{}] Ошибка отправки! Соединение разорвано!", owner.name());
|
||||
warn!("[WebSocket{}] Send error! Connection terminated!", owner.name());
|
||||
if let Some(u) = owner.0 {
|
||||
remove_broadcast(state.broadcasts.clone(), u.uuid).await;
|
||||
}
|
||||
|
|
@ -170,11 +167,10 @@ async fn handle_socket(mut socket: WebSocket, state: AppState) {
|
|||
msg = mrx.recv() => {
|
||||
match socket.send(Message::Binary(msg.clone().unwrap())).await {
|
||||
Ok(_) => {
|
||||
warn!("[WebSocketSubscribe{}] Отвечаю: {}", owner.name(), hex::encode(msg.unwrap()));
|
||||
debug!("[WebSocketSubscribe{}] Answering: {}", owner.name(), hex::encode(msg.unwrap()));
|
||||
}
|
||||
Err(_) => {
|
||||
// если попали сюда, значит вероятнее всего клиент отключился
|
||||
warn!("[WebSocketSubscriber{}] Ошибка отправки! Соединение разорвано!", owner.name());
|
||||
warn!("[WebSocketSubscriber{}] Send error! Connection terminated!", owner.name());
|
||||
if let Some(u) = owner.0 {
|
||||
remove_broadcast(state.broadcasts.clone(), u.uuid).await;
|
||||
}
|
||||
|
|
@ -190,11 +186,14 @@ async fn subscribe(socket: mpsc::Sender<Vec<u8>>, mut rx: Receiver<Vec<u8>>, shu
|
|||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown.notified() => {
|
||||
debug!("Unsubscribing!");
|
||||
debug!("Shutdown SUB!");
|
||||
return;
|
||||
}
|
||||
msg = rx.recv() => {
|
||||
socket.send(msg.unwrap()).await.unwrap();
|
||||
if socket.send(msg.unwrap()).await.is_err() {
|
||||
error!("Forced shutdown SUB due error!");
|
||||
return;
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -202,15 +201,7 @@ async fn subscribe(socket: mpsc::Sender<Vec<u8>>, mut rx: Receiver<Vec<u8>>, shu
|
|||
|
||||
fn into_s2c_ping(buf: Vec<u8>, uuid: Uuid) -> Vec<u8> {
|
||||
use std::iter::once;
|
||||
// let mut vec = Vec::new();
|
||||
// vec
|
||||
//let uuid = uuid.as_u128();
|
||||
//let uuid = uuid.into_bytes();
|
||||
// info!("UUID {} UUID BE {}", hex::encode(uuid.into_bytes()), hex::encode(uuid128.to_be_bytes()));
|
||||
let res: Vec<u8> = once(1).chain(uuid.into_bytes().iter().copied()).chain(buf.as_slice()[1..].iter().copied()).collect();
|
||||
debug!("Sending ping: {}", hex::encode(res.clone()));
|
||||
res
|
||||
// vec
|
||||
once(1).chain(uuid.into_bytes().iter().copied()).chain(buf.as_slice()[1..].iter().copied()).collect()
|
||||
}
|
||||
|
||||
async fn remove_broadcast(broadcasts: Arc<tokio::sync::Mutex<DashMap<Uuid, broadcast::Sender<Vec<u8>>>>>, uuid: Uuid) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue