Add Prometheus metrics support and update Rust to edition 2024 and version 1.85.0
Some checks failed
Push Dev / docker (push) Has been cancelled

This commit is contained in:
Shiroyasha 2025-02-26 01:14:01 +03:00
parent bac1203df8
commit 45aa79da6d
Signed by: shiroyashik
GPG key ID: E4953D3940D7860A
14 changed files with 306 additions and 128 deletions

View file

@ -1,7 +1,7 @@
use axum::{debug_handler, extract::{Query, State}, response::{IntoResponse, Response}, routing::get, Router};
use reqwest::StatusCode;
use axum::{extract::{Query, State}, http::HeaderMap, response::{IntoResponse, Response}, routing::get, Router};
use reqwest::{header::USER_AGENT, StatusCode};
use ring::digest::{self, digest};
use tracing::{error, info};
use tracing::{error, info, instrument};
use crate::{auth::{has_joined, Userinfo}, utils::rand, AppState};
use super::types::auth::*;
@ -12,7 +12,6 @@ pub fn router() -> Router<AppState> {
.route("/verify", get(verify))
}
#[debug_handler]
async fn id(
// First stage of authentication
Query(query): Query<Id>,
@ -25,10 +24,11 @@ async fn id(
server_id
}
#[debug_handler]
#[instrument(skip_all)]
async fn verify(
// Second stage of authentication
Query(query): Query<Verify>,
header: HeaderMap,
State(state): State<AppState>,
) -> Response {
let server_id = query.id.clone();
@ -47,17 +47,23 @@ async fn verify(
if let Some((uuid, auth_provider)) = userinfo {
let umanager = state.user_manager;
if umanager.is_banned(&uuid) {
info!("[Authentication] {nickname} tried to log in, but was banned");
info!("{nickname} tried to log in, but was banned");
return (StatusCode::BAD_REQUEST, "You're banned!".to_string()).into_response();
}
info!("[Authentication] {nickname} logged in using {}", auth_provider.name);
let userinfo = Userinfo {
let mut userinfo = Userinfo {
nickname,
uuid,
token: Some(server_id.clone()),
auth_provider,
..Default::default()
};
if let Some(agent) = header.get(USER_AGENT) {
if let Ok(agent) = agent.to_str() {
userinfo.version = agent.to_string();
}
}
info!("{} logged in using {} with {}", userinfo.nickname, userinfo.auth_provider.name, userinfo.version);
match umanager.insert(uuid, server_id.clone(), userinfo.clone()) {
Ok(_) => {},
Err(_) => {
@ -70,7 +76,7 @@ async fn verify(
}
(StatusCode::OK, server_id.to_string()).into_response()
} else {
info!("[Authentication] failed to verify {nickname}");
info!("failed to verify {nickname}");
(StatusCode::BAD_REQUEST, "failed to verify".to_string()).into_response()
}
}

View file

@ -15,6 +15,7 @@ pub async fn initial(
ws.on_upgrade(|socket| handle_socket(socket, state))
}
#[instrument(skip_all)]
async fn handle_socket(mut ws: WebSocket, state: AppState) {
// Trying authenticate & get user data or dropping connection
match authenticate(&mut ws, &state).await {
@ -44,7 +45,7 @@ async fn handle_socket(mut ws: WebSocket, state: AppState) {
// Starting main worker
if let Err(kind) = main_worker(&mut session, &mut ws, &state).await {
tracing::error!("[WebSocket] Main worker halted due to {}.", kind)
tracing::info!(error = %kind, nickname = %session.user.nickname, "Main worker exited");
}
for (_, handle) in session.sub_workers_aborthandles {
@ -56,7 +57,7 @@ async fn handle_socket(mut ws: WebSocket, state: AppState) {
state.user_manager.remove(&user.uuid);
},
Err(kind) => {
tracing::info!("[WebSocket] Can't authenticate: {}", kind);
tracing::info!(error = %kind, "Can't authenticate");
}
}
@ -64,7 +65,7 @@ async fn handle_socket(mut ws: WebSocket, state: AppState) {
if let Err(kind) = ws.send(Message::Close(None)).await { tracing::trace!("[WebSocket] Closing fault: {}", kind) }
}
#[instrument(skip_all, fields(nickname = %session.user.nickname))]
#[instrument(skip_all, parent = None, fields(nickname = %session.user.nickname))]
async fn main_worker(session: &mut WSSession, ws: &mut WebSocket, state: &AppState) -> anyhow::Result<()> {
tracing::debug!("WebSocket control for {} is transferred to the main worker", session.user.nickname);
loop {

View file

@ -89,6 +89,16 @@ impl From<C2SMessage> for Vec<u8> {
a
}
}
impl C2SMessage {
pub fn name(&self) -> &'static str {
match self {
C2SMessage::Token(_) => "c2s>token",
C2SMessage::Ping(_, _, _) => "c2s>ping",
C2SMessage::Sub(_) => "c2s>sub",
C2SMessage::Unsub(_) => "c2s>unsub",
}
}
}
// impl<'a> C2SMessage<'a> {
// pub fn to_array(&self) -> Box<[u8]> {

View file

@ -3,6 +3,8 @@ mod s2c;
mod errors;
mod session;
use std::time::Instant;
pub use session::*;
pub use errors::*;
pub use c2s::*;
@ -10,6 +12,8 @@ pub use s2c::*;
use axum::extract::ws::{Message, WebSocket};
use crate::{PINGS, PINGS_ERROR};
pub trait RecvAndDecode {
async fn recv_and_decode(&mut self) -> Result<C2SMessage, RADError>;
}
@ -21,9 +25,17 @@ impl RecvAndDecode for WebSocket {
if let Message::Close(frame) = msg {
return Err(RADError::Close(frame.map(|f| format!("code: {}, reason: {}", f.code, f.reason))));
}
let start = Instant::now();
let data = msg.into_data();
C2SMessage::try_from(data.as_ref())
.map_err(|e| RADError::DecodeError(e, faster_hex::hex_string(&data)))
let msg = C2SMessage::try_from(data.as_ref())
.map_err(|e| { PINGS_ERROR.inc(); RADError::DecodeError(e, faster_hex::hex_string(&data)) });
let latency = start.elapsed().as_secs_f64();
PINGS
.with_label_values(&[msg.as_ref().map(|m| m.name()).unwrap_or("error")])
.observe(latency);
msg
}
}

View file

@ -85,6 +85,18 @@ impl From<S2CMessage> for Vec<u8> {
}
}
}
impl S2CMessage {
pub fn name(&self) -> &'static str {
match self {
S2CMessage::Auth => "s2c>auth",
S2CMessage::Ping(_, _, _, _) => "s2c>ping",
S2CMessage::Event(_) => "s2c>event",
S2CMessage::Toast(_, _, _) => "s2c>toast",
S2CMessage::Chat(_) => "s2c>chat",
S2CMessage::Notice(_) => "s2c>notice",
}
}
}
// impl<'a> S2CMessage<'a> {
// pub fn to_array(&self) -> Box<[u8]> {

View file

@ -2,10 +2,11 @@ use axum::{
extract::{Path, State},
Json
};
use dashmap::DashMap;
use tracing::{debug, info};
use uuid::Uuid;
use crate::{api::errors::internal_and_log, auth::{Token, Userinfo}, ApiResult, AppState};
use crate::{auth::{Token, Userinfo}, ApiResult, AppState};
pub(super) async fn create_user(
Token(token): Token,
@ -50,17 +51,17 @@ pub(super) async fn unban(
pub(super) async fn list(
Token(token): Token,
State(state): State<AppState>,
) -> ApiResult<String> {
) -> ApiResult<Json<DashMap<Uuid, Userinfo>>> {
state.config.read().await.clone().verify_token(&token)?;
serde_json::to_string_pretty(&state.user_manager.get_all_registered()).map_err(|err| { internal_and_log(err) })
Ok(Json(state.user_manager.get_all_registered()))
}
pub(super) async fn list_sessions(
Token(token): Token,
State(state): State<AppState>,
) -> ApiResult<String> {
) -> ApiResult<Json<DashMap<String, Uuid>>> {
state.config.read().await.clone().verify_token(&token)?;
serde_json::to_string_pretty(&state.user_manager.get_all_authenticated()).map_err(|err| { internal_and_log(err) })
Ok(Json(state.user_manager.get_all_authenticated()))
}

View file

@ -182,6 +182,12 @@ pub struct UManager {
registered: Arc<DashMap<Uuid, Userinfo>>,
}
impl Default for UManager {
fn default() -> Self {
Self::new()
}
}
impl UManager {
pub fn new() -> Self {
Self {
@ -229,6 +235,7 @@ impl UManager {
if userinfo.rank != Userinfo::default().rank { exist.rank = userinfo.rank };
if userinfo.token.is_some() { exist.token = userinfo.token };
if userinfo.version != Userinfo::default().version { exist.version = userinfo.version };
exist.last_used = userinfo.last_used;
}).or_insert(usercopy);
}
pub fn get(
@ -274,7 +281,6 @@ impl UManager {
}
// End of User manager
#[axum::debug_handler]
#[instrument(skip_all)]
pub async fn check_auth(
token: Option<Token>,

View file

@ -6,10 +6,9 @@ use axum::{
use dashmap::DashMap;
use tracing_panic::panic_hook;
use tracing_subscriber::{fmt::{self, time::ChronoLocal}, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use std::{path::PathBuf, sync::Arc, env::var};
use std::{env::var, path::PathBuf, sync::{Arc, LazyLock}};
use tokio::{fs, sync::RwLock, time::Instant};
use tower_http::trace::TraceLayer;
use lazy_static::lazy_static;
// Consts
mod consts;
@ -18,12 +17,13 @@ pub use consts::*;
// Errors
pub use api::errors::{ApiResult, ApiError};
// Metrics
mod metrics;
pub use metrics::*;
// API
mod api;
use api::{
figura::{ws, info as api_info, profile as api_profile, auth as api_auth, assets as api_assets},
// v1::{},
};
use api::figura::{ws, info as api_info, profile as api_profile, auth as api_auth, assets as api_assets};
// Auth
mod auth;
@ -37,23 +37,21 @@ use state::{Config, AppState};
mod utils;
use utils::*;
lazy_static! {
pub static ref LOGGER_VAR: String = {
var(LOGGER_ENV).unwrap_or(String::from("info"))
};
pub static ref CONFIG_VAR: String = {
var(CONFIG_ENV).unwrap_or(String::from("Config.toml"))
};
pub static ref LOGS_VAR: String = {
var(LOGS_ENV).unwrap_or(String::from("logs"))
};
pub static ref ASSETS_VAR: String = {
var(ASSETS_ENV).unwrap_or(String::from("data/assets"))
};
pub static ref AVATARS_VAR: String = {
var(AVATARS_ENV).unwrap_or(String::from("data/avatars"))
};
}
pub static LOGGER_VAR: LazyLock<String> = LazyLock::new(|| {
var(LOGGER_ENV).unwrap_or(String::from("info"))
});
pub static CONFIG_VAR: LazyLock<String> = LazyLock::new(|| {
var(CONFIG_ENV).unwrap_or(String::from("Config.toml"))
});
pub static LOGS_VAR: LazyLock<String> = LazyLock::new(|| {
var(LOGS_ENV).unwrap_or(String::from("logs"))
});
pub static ASSETS_VAR: LazyLock<String> = LazyLock::new(|| {
var(ASSETS_ENV).unwrap_or(String::from("data/assets"))
});
pub static AVATARS_VAR: LazyLock<String> = LazyLock::new(|| {
var(AVATARS_ENV).unwrap_or(String::from("data/avatars"))
});
#[tokio::main]
async fn main() -> Result<()> {
@ -128,11 +126,11 @@ async fn app() -> Result<bool> {
}
// Config
let config = Arc::new(RwLock::new(Config::parse(CONFIG_VAR.clone().into())));
let listen = config.read().await.listen.clone();
let limit = get_limit_as_bytes(config.read().await.limitations.max_avatar_size as usize);
let config = Config::parse(CONFIG_VAR.clone().into());
let listen = config.listen.clone();
let limit = get_limit_as_bytes(config.limitations.max_avatar_size as usize);
if config.read().await.assets_updater_enabled {
if config.assets_updater_enabled {
// Force update assets if folder or hash file doesn't exists.
if !(PathBuf::from(&*ASSETS_VAR).is_dir() && get_path_to_assets_hash().is_file()) {
tracing::debug!("Removing broken assets...");
@ -164,7 +162,7 @@ async fn app() -> Result<bool> {
session: Arc::new(DashMap::new()),
subscribes: Arc::new(DashMap::new()),
figura_versions: Arc::new(RwLock::new(None)),
config,
config: Arc::new(RwLock::new(config.clone())),
};
// Automatic update of configuration/ban list while the server is running
@ -175,7 +173,7 @@ async fn app() -> Result<bool> {
Arc::clone(&state.config)
));
// Blacklist auto update
if state.config.read().await.mc_folder.exists() {
if config.mc_folder.exists() {
tokio::spawn(update_bans_from_minecraft(
state.config.read().await.mc_folder.clone(),
Arc::clone(&state.user_manager),
@ -200,8 +198,19 @@ async fn app() -> Result<bool> {
.nest("/api", api)
.route("/api/", get(check_auth))
.route("/ws", get(ws))
.with_state(state)
.layer(TraceLayer::new_for_http().on_request(()))
.layer(TraceLayer::new_for_http()
// .on_request(|request: &axum::http::Request<_>, _span: &tracing::Span| {
// // only for developing purposes
// tracing::trace!(headers = ?request.headers(), "started processing request");
// })
.on_response(|response: &axum::http::Response<_>, latency: std::time::Duration, _span: &tracing::Span| {
tracing::trace!(latency = ?latency, status = ?response.status(), "finished processing request");
})
.on_request(())
)
.layer(axum::middleware::from_fn(track_metrics))
.merge(metrics::metrics_router(config.metrics_enabled))
.with_state(state)
.route("/health", get(|| async { "ok" }));
let listener = tokio::net::TcpListener::bind(listen).await?;

71
src/metrics.rs Normal file
View file

@ -0,0 +1,71 @@
use std::{sync::LazyLock, time::Instant};
use axum::{body::Body, extract::State, http::{Request, Response}, middleware::Next, routing::get, Router};
use prometheus::{proto::{Metric, MetricType}, register_histogram_vec, register_int_counter};
use reqwest::StatusCode;
use crate::state::AppState;
pub fn metrics_router(enabled: bool) -> Router<AppState> {
if !enabled { return Router::new(); }
tracing::info!("Metrics enabled! You can access them on /metrics");
Router::new()
.route("/metrics", get(metrics))
}
async fn metrics(State(state): State<AppState>) -> String {
let mut metric_families = prometheus::gather();
// Add new custom metrics
let players = {
let mut metric = prometheus::proto::Metric::default();
metric.set_gauge(prometheus::proto::Gauge::default());
metric.mut_gauge().set_value(state.session.len() as f64);
create_mf("players_count".to_string(), "Number of players".to_string(), MetricType::GAUGE, metric)
};
metric_families.push(players);
prometheus::TextEncoder::new()
.encode_to_string(&metric_families)
.unwrap()
}
#[inline]
fn create_mf(name: String, help: String, field_type: MetricType, metric: Metric) -> prometheus::proto::MetricFamily {
let mut mf = prometheus::proto::MetricFamily::default();
mf.set_name(name);
mf.set_help(help);
mf.set_field_type(field_type);
mf.mut_metric().push(metric);
mf
}
pub async fn track_metrics(req: Request<Body>, next: Next) -> Result<Response<Body>, StatusCode> {
let start = Instant::now();
let uri = req.uri().path().to_string();
// Call the next middleware or handler
let response = next.run(req).await;
let latency = start.elapsed().as_secs_f64();
REQUESTS
.with_label_values(&[&uri, response.status().as_str()])
.observe(latency);
Ok(response)
}
pub static PINGS_ERROR: LazyLock<prometheus::IntCounter> = LazyLock::new(|| {
register_int_counter!("pings_error", "Number of ping decoding errors").unwrap()
});
pub static REQUESTS: LazyLock<prometheus::HistogramVec> = LazyLock::new(|| {
register_histogram_vec!("requests_count", "Number of requests", &["uri", "code"], vec![0.025, 0.250, 0.500]).unwrap()
});
pub static PINGS: LazyLock<prometheus::HistogramVec> = LazyLock::new(|| {
register_histogram_vec!("pings_count", "Number of pings", &["type"], vec![0.000003, 0.00002, 0.0002]).unwrap()
});

View file

@ -10,6 +10,8 @@ use crate::auth::{default_authproviders, AuthProviders, Userinfo};
#[serde(rename_all = "camelCase")]
pub struct Config {
pub listen: String,
#[serde(default)]
pub metrics_enabled: bool,
pub token: Option<String>,
pub assets_updater_enabled: bool,
pub motd: CMotd,