workshop/main.py

291 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import asyncio
import aiosqlite
from quart import Quart, render_template, request, send_from_directory, Response
from datetime import datetime
from babel.dates import format_datetime
# The Quart application; HTML templates and static assets are both looked up
# in the ``frontend`` directory.
app = Quart(
    __name__,
    template_folder='frontend',
    static_folder='frontend',
)
# SQLite database file opened by the query helpers and route handlers.
DB_PATH = 'maps.db'

# Root directory that map images, star icons and .bsp files are served from.
DATA = "/data"

# Minimum number of seconds between two downloads from the same client address.
DOWNLOAD_COOLDOWN = 10

# Client address -> ``time.time()`` stamp of that client's last download.
last_download_times = {}

# English game-mode names (as stored in the database) -> Russian display names.
GAME_MODES = {
    "Classic": "Классический",
    "Deathmatch": "Бой насмерть",
    "Demolition": "Уничтожение объекта",
    "Armsrace": "Гонка вооружений",
    "Custom": "Пользовательский",
    "Training": "Обучение",
    "Co-op Strike": "Совместный налёт",
    "Wingman": "Напарники",
    "Flying Scoutsman": "Перелётные снайперы",
}
async def get_maps(page=1, per_page=30):
    """Return one page of maps, newest first.

    Args:
        page: 1-based page number.
        per_page: Number of rows per page.

    Returns:
        The fetched rows, each holding ``FilePath``, ``Title``, ``Stars``
        (``NULL`` replaced by 0) and ``Description``.
    """
    sql = '''
        SELECT FilePath, Title, COALESCE(Stars, 0) as Stars, Description
        FROM maps
        ORDER BY DateTime DESC
        LIMIT ? OFFSET ?
    '''
    skip = (page - 1) * per_page
    async with aiosqlite.connect(DB_PATH) as db:
        cur = await db.cursor()
        await cur.execute(sql, (per_page, skip))
        return await cur.fetchall()
def get_image_path(filepath):
    """Return the ``/images/...`` URL of the preview image for a map file.

    The URL has the form ``/images/<first>/<second>/<second>.jpg`` where
    ``<first>`` and ``<second>`` are the first two ``/``-separated components
    of *filepath*.

    Args:
        filepath: Path of the map file relative to ``DATA``. May be ``None``
            or empty (e.g. a ``NULL`` database column).

    Returns:
        The image URL, or the placeholder ``/images/image.jpg`` when
        *filepath* is missing, has fewer than two components, or does not
        exist under ``DATA``.
    """
    default_image = "/images/image.jpg"
    # A NULL/empty FilePath would otherwise make os.path.join raise TypeError
    # or resolve to DATA itself and then fail on the component lookup below.
    if not filepath:
        return default_image
    parts = filepath.split('/')
    if len(parts) < 2:
        return default_image
    if not os.path.exists(os.path.join(DATA, filepath)):
        return default_image
    top_dir, map_dir = parts[0], parts[1]
    return f"/images/{top_dir}/{map_dir}/{map_dir}.jpg"
def get_star_image(stars):
    """Return the URL of the star-rating icon for *stars*.

    A rating of ``None`` or 0 maps to the ``0-star`` icon.
    """
    rating = 0 if stars is None or stars == 0 else stars
    return f"/stars/{rating}-star.png"
@app.route('/images/<path:filename>')
async def serve_image(filename):
    """Serve an image from ``DATA``, falling back to ``image.jpg``.

    Responds with 404 when neither the requested file nor the fallback
    image exists.
    """
    # Try the requested file first, then the shared placeholder.
    for candidate in (filename, 'image.jpg'):
        if os.path.exists(os.path.join(DATA, candidate)):
            return await send_from_directory(DATA, candidate)
    return "Default image not found", 404
@app.route('/stars/<filename>')
async def serve_star_image(filename):
    """Serve a star-rating icon from the ``stars`` directory under ``DATA``.

    Responds with 404 when the requested icon does not exist.
    """
    stars_dir = os.path.join(DATA, 'stars')
    if not os.path.exists(os.path.join(stars_dir, filename)):
        return "Star image not found", 404
    return await send_from_directory(stars_dir, filename)
def _find_bsp_path(image_path):
    """Return the path of the first ``.bsp`` file next to *image_path*, or ``None``.

    *image_path* is the client-supplied image URL; its ``/images/`` prefix is
    stripped and its directory part is resolved under ``DATA``. ``None`` is
    returned when that directory lies outside ``DATA``, cannot be listed, or
    contains no ``.bsp`` file.
    """
    relative_dir = os.path.dirname(image_path.replace("/images/", ""))
    data_root = os.path.abspath(DATA)
    map_dir = os.path.abspath(os.path.join(data_root, relative_dir))
    # Reject absolute paths and ``..`` segments that would leave DATA.
    # abspath (not realpath) is used so symlinks inside DATA keep working.
    try:
        inside_data = os.path.commonpath([data_root, map_dir]) == data_root
    except ValueError:
        inside_data = False
    if not inside_data:
        return None
    try:
        entries = os.listdir(map_dir)
    except OSError:
        # Missing directory, not a directory, or not readable.
        return None
    for entry in entries:
        if entry.endswith('.bsp'):
            return os.path.join(map_dir, entry)
    return None


@app.route('/download_bsp')
async def download_bsp():
    """Stream the ``.bsp`` file that lives next to the given map image.

    Query parameters:
        image_path: Image URL of the map (as produced for ``/images/...``).

    Returns:
        A streamed ``application/octet-stream`` attachment, or a text error
        with status 429 (client is still in its cooldown window), 400 (no
        ``image_path``) or 404 (no ``.bsp`` file could be located).
    """
    user_ip = request.remote_addr
    current_time = time.time()
    elapsed = current_time - last_download_times.get(user_ip, 0)
    if elapsed < DOWNLOAD_COOLDOWN:
        wait_time = DOWNLOAD_COOLDOWN - elapsed
        return f"Please wait {int(wait_time)} seconds before downloading again.", 429

    image_path = request.args.get('image_path')
    if not image_path:
        return "No image path provided", 400

    file_path = _find_bsp_path(image_path)
    if file_path is None:
        return "No .bsp file found in the same directory", 404
    bsp_filename = os.path.basename(file_path)

    # Drop expired entries so the table does not grow with every client seen,
    # then start this client's cooldown. The stamp is recorded only once a
    # download is actually about to start, so a malformed request does not
    # lock the client out.
    expired = [
        ip for ip, stamp in last_download_times.items()
        if current_time - stamp >= DOWNLOAD_COOLDOWN
    ]
    for ip in expired:
        del last_download_times[ip]
    last_download_times[user_ip] = current_time

    # Bytes sent per one-second tick: 40 Mbit/s expressed in bytes.
    SPEED_LIMIT = 40 * 1024 * 1024 // 8

    async def file_stream():
        with open(file_path, 'rb') as f:
            while chunk := f.read(SPEED_LIMIT):
                yield chunk
                # Pause between chunks to cap the transfer rate.
                await asyncio.sleep(1)

    # Quote the file name so names containing spaces or separators survive.
    quoted_name = bsp_filename.replace('\\', '\\\\').replace('"', '\\"')
    headers = {
        "Content-Disposition": f'attachment; filename="{quoted_name}"'
    }
    return Response(file_stream(), headers=headers, content_type='application/octet-stream')
def _translate_names(raw, fallback):
    """Translate a ``', '``-separated list of names through ``GAME_MODES``."""
    if not raw:
        return fallback
    return ', '.join(GAME_MODES.get(name, name) for name in raw.split(', '))


def _format_added_time(date_time):
    """Format an ISO date string for display in Russian, or return a placeholder."""
    if not date_time:
        return 'Не найдено'
    try:
        dt = datetime.fromisoformat(date_time)
    except (TypeError, ValueError):
        # A malformed stored value should not take the whole page down.
        return 'Не найдено'
    return format_datetime(dt, format='d MMM yг., HH:mm', locale='ru_RU')


def _format_file_size(file_path):
    """Return the size of ``DATA/file_path`` in MB as text, or a placeholder."""
    if not file_path:
        return 'Не найден'
    try:
        file_size = os.path.getsize(os.path.join(DATA, file_path))
    except OSError:
        # The database row can outlive the file on disk.
        return 'Не найден'
    return f"{file_size / (1024 * 1024):.2f} MB"


@app.route('/main')
async def main_page():
    """Render the detail page for the map named by ``map_title``.

    Query parameters:
        image_url: Image to show (default ``default_image.jpg``).
        map_title: Title used to look the map up (default
            ``Default Map Title``).

    Fields that cannot be determined (no matching row, empty column, missing
    file, unparsable date) are rendered with Russian placeholder text.
    """
    image_url = request.args.get('image_url', 'default_image.jpg')
    map_title = request.args.get('map_title', 'Default Map Title')
    async with aiosqlite.connect(DB_PATH) as conn:
        cursor = await conn.cursor()
        await cursor.execute('''
            SELECT GameMode, Tags, FilePath, DateTime, YoutubeLink, Description
            FROM maps
            WHERE Title = ?
        ''', (map_title,))
        row = await cursor.fetchone()

    if row:
        game_mode, tags, file_path, date_time, youtube_link, description = row
    else:
        game_mode = tags = file_path = date_time = youtube_link = None
        description = "Нет описания"

    return await render_template(
        'main.html',
        image_url=image_url,
        map_title=map_title,
        game_mode=_translate_names(game_mode, 'Не найден'),
        tags=_translate_names(tags, 'Не найдено'),
        file_size=_format_file_size(file_path),
        added_time=_format_added_time(date_time),
        youtube_link=youtube_link,
        description=description
    )
@app.route('/')
async def index():
    """Render the paginated, filterable map listing.

    Query parameters:
        page: 1-based page number; invalid or non-positive values fall back
            to the first page.
        game_modes: Repeated parameter; maps whose ``GameMode`` equals one of
            the values are kept.
        start_date, end_date: Inclusive bounds on ``DateTime``.
        search_title: Substring to look for in ``Title``.
        stars: Exact ``Stars`` value.
    """
    from urllib.parse import urlencode

    # ``type=int`` yields the default instead of raising on non-numeric input.
    page = max(request.args.get('page', default=1, type=int), 1)
    selected_game_modes = request.args.getlist('game_modes')
    start_date = request.args.get('start_date')
    end_date = request.args.get('end_date')
    search_title = request.args.get('search_title')
    selected_stars = request.args.get('stars')
    maps_data = await get_maps_filtered(page, selected_game_modes, start_date, end_date, search_title, selected_stars)

    # Count the rows matching the same filters to derive the page count.
    # NOTE(review): GameMode is matched as a whole value here, while the
    # detail page splits it on ', ' — confirm multi-mode rows are meant to be
    # excluded by this filter.
    query = 'SELECT COUNT(*) FROM maps WHERE 1=1'
    params = []
    if selected_game_modes:
        placeholders = ', '.join('?' for _ in selected_game_modes)
        query += f' AND GameMode IN ({placeholders})'
        params.extend(selected_game_modes)
    if start_date:
        query += ' AND DateTime >= ?'
        params.append(start_date)
    if end_date:
        query += ' AND DateTime <= ?'
        params.append(end_date)
    if search_title:
        query += ' AND Title LIKE ?'
        params.append(f'%{search_title}%')
    if selected_stars:
        query += ' AND Stars = ?'
        params.append(selected_stars)

    async with aiosqlite.connect(DB_PATH) as conn:
        cursor = await conn.cursor()
        await cursor.execute(query, params)
        total_maps = (await cursor.fetchone())[0]

    per_page = 30
    total_pages = (total_maps + per_page - 1) // per_page

    # Query-string fragment that pagination links append to keep the active
    # filters. Values are URL-encoded so characters such as '&', '=', '#' or
    # spaces in a search term cannot corrupt the link.
    filter_pairs = [('game_modes', mode) for mode in selected_game_modes]
    if start_date:
        filter_pairs.append(('start_date', start_date))
    if end_date:
        filter_pairs.append(('end_date', end_date))
    if search_title:
        filter_pairs.append(('search_title', search_title))
    if selected_stars:
        filter_pairs.append(('stars', selected_stars))
    filters = urlencode(filter_pairs)

    return await render_template(
        'workshop.html',
        maps_data=maps_data,
        page=page,
        total_pages=total_pages,
        get_image_path=get_image_path,
        get_star_image=get_star_image,
        selected_game_modes=selected_game_modes,
        filters=filters,
        selected_stars=selected_stars
    )
async def get_maps_filtered(page=1, selected_game_modes=None, start_date=None, end_date=None, search_title=None, selected_stars=None):
    """Return one 30-row page of maps matching the given filters, newest first.

    Args:
        page: 1-based page number.
        selected_game_modes: Values ``GameMode`` must be one of.
        start_date: Inclusive lower bound on ``DateTime``.
        end_date: Inclusive upper bound on ``DateTime``.
        search_title: Substring to look for in ``Title``.
        selected_stars: Exact ``Stars`` value.

    Any filter that is empty or ``None`` is skipped.

    Returns:
        The fetched rows, each holding ``FilePath``, ``Title``, ``Stars``
        (``NULL`` replaced by 0) and ``Description``.
    """
    page_size = 30
    skip = (page - 1) * page_size

    # SQL fragments and their bound values, collected in matching order.
    fragments = []
    values = []
    if selected_stars:
        fragments.append(' AND Stars = ?')
        values.append(selected_stars)
    if search_title:
        fragments.append(' AND Title LIKE ?')
        values.append(f'%{search_title}%')
    if selected_game_modes:
        marks = ','.join('?' for _ in selected_game_modes)
        fragments.append(' AND GameMode IN ({})'.format(marks))
        values.extend(selected_game_modes)
    if start_date:
        fragments.append(' AND DateTime >= ?')
        values.append(start_date)
    if end_date:
        fragments.append(' AND DateTime <= ?')
        values.append(end_date)

    sql = '''
        SELECT FilePath, Title, COALESCE(Stars, 0) as Stars, Description
        FROM maps
        WHERE 1=1
    '''
    sql += ''.join(fragments)
    sql += ' ORDER BY DateTime DESC LIMIT ? OFFSET ?'
    values.extend([page_size, skip])

    async with aiosqlite.connect(DB_PATH) as db:
        cur = await db.cursor()
        await cur.execute(sql, values)
        return await cur.fetchall()
if __name__ == '__main__':
    # Run the app under Hypercorn when the file is executed directly.
    # ``hypercorn.asyncio.run`` is a submodule, not a function; the documented
    # programmatic entry point is the ``serve`` coroutine.
    from hypercorn.asyncio import serve
    from hypercorn.config import Config

    config = Config()
    # Listen on every interface, port 5000.
    config.bind = ["0.0.0.0:5000"]
    asyncio.run(serve(app, config))