1120 lines
44 KiB
Python
1120 lines
44 KiB
Python
import discord
|
|
from discord.ext import commands
|
|
import os
|
|
import asyncio
|
|
import logging
|
|
from dotenv import load_dotenv
|
|
import aiomysql
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
# Database configuration
# Either DATABASE_URL is provided directly, or it is assembled below from the
# individual DB_* variables.
DATABASE_URL = os.getenv('DATABASE_URL')
DB_HOST = os.getenv('DB_HOST')
# The bot connects through aiomysql, so the fallback must be MySQL's port
# (3306), not PostgreSQL's. An empty DB_PORT is treated like a missing one.
DB_PORT = os.getenv('DB_PORT') or '3306'
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')

# Build DATABASE_URL from individual components if not provided
if not DATABASE_URL and all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD]):
    DATABASE_URL = f"mysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    print("📝 Built DATABASE_URL from individual environment variables")
|
|
|
|
# Parse MySQL connection details from DATABASE_URL
|
|
def parse_database_url(url):
    """Parse a MySQL connection URL into its components.

    The expected shape is ``mysql://user:password@host:port/database``; the
    ``mysql://`` prefix is optional.

    Returns a dict with the keys ``host``, ``port``, ``user``, ``password``
    and ``db``, or ``None`` when *url* is empty or lacks the ``@`` or ``/``
    separator. A missing or non-numeric port falls back to 3306, and a
    missing password becomes the empty string.
    """
    if not url:
        return None

    # Remove mysql:// prefix
    if url.startswith('mysql://'):
        url = url[len('mysql://'):]

    # Split credentials from the location on the LAST '@'. Splitting on the
    # first one would cut a password such as "p@ss" in half and push the rest
    # of it into the host name.
    auth, at_sign, host_db = url.rpartition('@')
    if not at_sign:
        return None
    # Only the first ':' separates user from password, so the password itself
    # may contain further colons.
    user, _, password = auth.partition(':')

    host_port, slash, database = host_db.partition('/')
    if not slash:
        return None

    host, colon, port_text = host_port.partition(':')
    port = 3306
    if colon:
        try:
            port = int(port_text)
        except ValueError:
            # Unparseable port: keep the MySQL default.
            port = 3306

    return {
        'host': host,
        'port': port,
        'user': user,
        'password': password,
        'db': database,
    }
|
|
|
|
# Bot configuration
# Start from discord.py's default intents and additionally switch on the
# message-content, guilds and members flags.
intents = discord.Intents.default()
intents.members = True
intents.guilds = True
intents.message_content = True

# Commands are reachable with the "!" prefix (and, being hybrid commands, as
# slash commands once the tree is synced).
bot = commands.Bot(intents=intents, command_prefix='!')

# Global database connection pool; stays None until init_database() sets it.
db_pool = None
|
|
|
|
@bot.event
async def on_ready():
    """Announce the login, prepare the database, set presence and sync commands.

    discord.py may dispatch ``on_ready`` more than once per process (for
    example after a reconnect), so the database is only initialized while
    ``db_pool`` is still unset. If an earlier initialization failed and left
    ``db_pool`` as ``None``, the next ``on_ready`` retries it.
    """
    print(f'{bot.user} is online and ready!')
    print(f'Bot ID: {bot.user.id}')
    print(f'Discord.py Version: {discord.__version__}')
    print('------')

    # Initialize database only once: running it again would replace the
    # existing pool without closing it.
    if db_pool is None:
        await init_database()

    # Set bot status
    await bot.change_presence(
        activity=discord.Game(name="Hearts of Iron IV ELO"),
        status=discord.Status.online
    )

    # Sync hybrid commands on startup; a failed sync is reported but must not
    # stop the bot from running.
    try:
        synced = await bot.tree.sync()
        print(f'Synced {len(synced)} hybrid commands')
    except Exception as e:
        print(f'Failed to sync commands: {e}')
|
|
|
|
@bot.event
async def on_guild_join(guild):
    """Log the name and ID of a server the bot has just joined."""
    server_name, server_id = guild.name, guild.id
    print(f'Bot joined server "{server_name}" (ID: {server_id})')
|
|
|
|
@bot.event
async def on_guild_remove(guild):
    """Log the name and ID of a server the bot has just left."""
    server_name, server_id = guild.name, guild.id
    print(f'Bot left server "{server_name}" (ID: {server_id})')
|
|
|
|
# Owner Configuration
# Discord user ID compared against the command author by the owner check.
OWNER_ID = 253922739709018114

# ELO Configuration
STARTING_ELO = 800
K_FACTOR = 32
# Per-T-level scaling of the ELO change:
# T1 countries get less points, T2 normal points, T3 more points.
T_LEVEL_MULTIPLIERS = {1: 0.8, 2: 1.0, 3: 1.2}
|
|
|
|
# Emoji and formatting helpers
|
|
def _all_accessible_emojis(ctx: commands.Context) -> List[discord.Emoji]:
    """Collect the current guild's emojis followed by every emoji the bot can see.

    Each source is read on a best-effort basis: if reading it raises, that
    source contributes an empty list. Emojis of the current guild come first.
    """
    def _listed_or_empty(read_source):
        # Materialize one emoji source, treating any failure as "no emojis".
        try:
            return list(read_source())
        except Exception:
            return []

    from_guild = _listed_or_empty(lambda: ctx.guild.emojis if ctx.guild else [])
    from_bot = _listed_or_empty(lambda: bot.emojis)
    return from_guild + from_bot
|
|
|
|
def find_custom_emoji(ctx: commands.Context, keyword_variants: List[str]) -> Optional[str]:
    """Look up a custom emoji whose name contains one of the given keywords.

    Keywords are tried in the order given and matched case-insensitively as
    substrings of the emoji name. The first hit is returned as ``str(emoji)``;
    ``None`` is returned when nothing matches. Emojis that raise while being
    inspected are skipped.
    """
    candidates = _all_accessible_emojis(ctx)
    for needle in (variant.lower() for variant in keyword_variants):
        for emoji in candidates:
            try:
                rendered = str(emoji) if needle in (emoji.name or '').lower() else None
            except Exception:
                continue
            if rendered is not None:
                return rendered
    return None
|
|
|
|
def get_t_emoji(ctx: commands.Context, t_level: int) -> str:
    """Pick the emoji shown for a T level.

    A custom emoji matching the level's keywords wins; otherwise a unicode
    symbol is used, with unknown levels sharing the T1 symbol.
    """
    keywords_by_level = {
        1: ["hoi4_t1", "t1", "tier1"],
        2: ["hoi4_t2", "t2", "tier2"],
        3: ["hoi4_t3", "t3", "tier3"],
    }
    unicode_by_level = {1: "🔹", 2: "🔸", 3: "🔺"}

    found = find_custom_emoji(ctx, keywords_by_level.get(t_level, []))
    return found if found else unicode_by_level.get(t_level, "🔹")
|
|
|
|
def get_team_emoji(ctx: commands.Context, team_name: str) -> str:
    """Choose an emoji for a team based on faction keywords in its name.

    The lower-cased team name is checked against the Axis, Allies and
    Comintern markers in that order. The first faction that matches decides
    the result: a custom emoji if one is found, else that faction's unicode
    fallback. Names matching no faction get a generic HOI4 custom emoji or
    the medal fallback.
    """
    lowered = (team_name or "").lower()
    # (markers searched in the team name, custom-emoji keywords, unicode fallback)
    factions = (
        (("axis", "achse"), ["axis", "hoi4_axis"], "⚫"),
        (("allies", "alliierten", "ally"), ["allies", "hoi4_allies"], "🔵"),
        (("comintern", "ussr", "soviet"), ["comintern", "hoi4_comintern"], "🔴"),
    )
    for markers, emoji_keywords, fallback in factions:
        if any(marker in lowered for marker in markers):
            return find_custom_emoji(ctx, emoji_keywords) or fallback

    # Generic HOI4 emoji or fallback
    return find_custom_emoji(ctx, ["hoi4", "hearts_of_iron", "iron"]) or "🎖️"
|
|
|
|
# Database Functions
|
|
async def init_database():
    """Create the global MySQL pool and make sure all tables exist.

    The call is idempotent: once ``db_pool`` is set it returns immediately, so
    repeated invocations do not create additional pools. On failure the error
    and traceback are printed, a pool that was already opened is closed
    again, and ``db_pool`` stays ``None`` so a later call can retry.
    """
    global db_pool

    if db_pool is not None:
        return

    pool = None
    try:
        # Parse DATABASE_URL for MySQL connection
        db_config = parse_database_url(DATABASE_URL)
        if not db_config:
            raise ValueError("Invalid DATABASE_URL format")

        print(f"🔌 Connecting to MySQL: {db_config['host']}:{db_config['port']}/{db_config['db']}")

        # Create MySQL connection pool
        pool = await aiomysql.create_pool(
            host=db_config['host'],
            port=db_config['port'],
            user=db_config['user'],
            password=db_config['password'],
            db=db_config['db'],
            charset='utf8mb4',
            autocommit=True,
            maxsize=10
        )

        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                # Create players table (MySQL syntax)
                await cursor.execute('''
                    CREATE TABLE IF NOT EXISTS players (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        discord_id BIGINT UNIQUE NOT NULL,
                        username VARCHAR(255) NOT NULL,
                        standard_elo INT DEFAULT 800,
                        competitive_elo INT DEFAULT 800,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                    )
                ''')

                # Create games table (MySQL syntax)
                await cursor.execute('''
                    CREATE TABLE IF NOT EXISTS games (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        game_name VARCHAR(255) NOT NULL,
                        game_type VARCHAR(50) NOT NULL,
                        status VARCHAR(50) DEFAULT 'setup',
                        players JSON NOT NULL,
                        winner_team VARCHAR(255),
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        finished_at TIMESTAMP NULL
                    )
                ''')

                # Create game_results table (MySQL syntax)
                await cursor.execute('''
                    CREATE TABLE IF NOT EXISTS game_results (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        game_id INT,
                        discord_id BIGINT NOT NULL,
                        team_name VARCHAR(255) NOT NULL,
                        t_level INT NOT NULL,
                        old_elo INT NOT NULL,
                        new_elo INT NOT NULL,
                        elo_change INT NOT NULL,
                        won BOOLEAN NOT NULL,
                        result_type VARCHAR(10) DEFAULT 'loss',
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (game_id) REFERENCES games(id)
                    )
                ''')

                # Add result_type column if it doesn't exist (for existing databases)
                try:
                    await cursor.execute('''
                        ALTER TABLE game_results
                        ADD COLUMN result_type VARCHAR(10) DEFAULT 'loss'
                    ''')
                except aiomysql.MySQLError as alter_error:
                    # 1060 = ER_DUP_FIELDNAME: the column is already there,
                    # which is the expected outcome on up-to-date databases.
                    # The migration stays best-effort, but any other failure
                    # is reported instead of being silently swallowed.
                    error_code = alter_error.args[0] if alter_error.args else None
                    if error_code != 1060:
                        print(f"⚠️ Could not add result_type column: {alter_error}")

        # Publish the pool only after the schema is in place.
        db_pool = pool
        print("✅ Database initialized successfully")

    except Exception as e:
        print(f"❌ Database initialization failed: {e}")
        import traceback
        traceback.print_exc()
        # Do not leak a pool that was opened before the failure.
        if pool is not None:
            pool.close()
            await pool.wait_closed()
|
|
|
|
async def get_or_create_player(discord_id: int, username: str) -> Dict:
    """Return the ``players`` row for *discord_id*, creating it when missing.

    For an existing player the stored username is refreshed only when it
    differs from *username*, and the returned dict always carries the new
    name. A concurrent creation of the same player is tolerated: the
    duplicate-key error is ignored and the existing row is returned.
    """
    async with db_pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cursor:
            # Try to get existing player
            await cursor.execute(
                "SELECT * FROM players WHERE discord_id = %s", (discord_id,)
            )
            player = await cursor.fetchone()

            if not player:
                # Create new player
                try:
                    await cursor.execute(
                        "INSERT INTO players (discord_id, username) VALUES (%s, %s)",
                        (discord_id, username)
                    )
                except aiomysql.IntegrityError:
                    # Another task inserted the same discord_id between our
                    # SELECT and INSERT; the row exists, so just read it back.
                    pass
                await cursor.execute(
                    "SELECT * FROM players WHERE discord_id = %s", (discord_id,)
                )
                player = await cursor.fetchone()
            elif player['username'] != username:
                # Update username if changed
                await cursor.execute(
                    "UPDATE players SET username = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
                    (username, discord_id)
                )
                # Keep the returned row in step with what was just written.
                player['username'] = username

            return dict(player)
|
|
|
|
def calculate_elo_change(
    player_elo: int,
    opponent_avg_elo: float,
    result: str,
    t_level: int,
    *,
    k_factor: Optional[float] = None,
    t_multipliers: Optional[Dict[int, float]] = None,
) -> int:
    """Calculate ELO change using standard ELO formula with T-level multiplier

    result can be 'win', 'loss', or 'draw'; any other value is scored like a
    loss.

    In draws:
    - If you're expected to win (higher ELO), you lose points for drawing
    - If you're expected to lose (lower ELO), you gain points for drawing
    - The bigger the ELO difference, the bigger the swing

    ``opponent_avg_elo`` may be a float (it is an average). ``k_factor`` and
    ``t_multipliers`` default to the module-level ``K_FACTOR`` and
    ``T_LEVEL_MULTIPLIERS``; T levels missing from the table use 1.0.

    Returns the change rounded to the nearest integer.
    """
    k = K_FACTOR if k_factor is None else k_factor
    multipliers = T_LEVEL_MULTIPLIERS if t_multipliers is None else t_multipliers

    expected_score = 1 / (1 + 10 ** ((opponent_avg_elo - player_elo) / 400))

    if result == 'win':
        actual_score = 1.0
    elif result == 'draw':
        actual_score = 0.5  # Draw = 50% score
    else:  # loss
        actual_score = 0.0

    # Calculate the base ELO change
    base_change = k * (actual_score - expected_score)

    # Apply T-level multiplier
    t_multiplier = multipliers.get(t_level, 1.0)
    final_change = base_change * t_multiplier

    # For debugging/logging purposes, let's see what happens in draws
    if result == 'draw':
        elo_diff = player_elo - opponent_avg_elo
        expected_percentage = expected_score * 100
        print(f"📊 Draw calculation: Player ELO {player_elo} vs Opponent Avg {opponent_avg_elo}")
        # elo_diff is a float whenever the opponent average is one, so it is
        # formatted with "+.0f"; the integer-only "+d" code raises ValueError
        # for floats.
        print(f"   ELO Difference: {elo_diff:+.0f} | Expected win chance: {expected_percentage:.1f}%")
        print(f"   ELO change: {final_change:+.1f} (T{t_level} multiplier: {t_multiplier})")

    return round(final_change)
|
|
|
|
# Owner only decorator
|
|
def is_owner():
    """Build a command check that passes only when the author's ID equals OWNER_ID."""
    return commands.check(lambda ctx: ctx.author.id == OWNER_ID)
|
|
|
|
# Owner Commands
|
|
@bot.hybrid_command(name='reload', description='Reloads the bot and syncs slash commands (Owner only)')
@is_owner()
async def reload_bot(ctx):
    """Reloads the bot and syncs slash commands (Owner only)"""
    # Flow: post a "reloading" embed, sync the command tree, then edit that
    # message into a success summary. Any failure is printed with its
    # traceback and reported in a separate error embed.
    try:
        invoker = ctx.author
        print(f"🔄 Reload command started by {invoker} (ID: {invoker.id})")

        # Send initial message
        progress_embed = discord.Embed(
            title="🔄 Bot Reload",
            description="Reloading bot and syncing commands...",
            color=discord.Color.yellow()
        )
        status_message = await ctx.send(embed=progress_embed)
        print("📤 Initial reload message sent")

        # Sync slash commands
        print("🔄 Starting command sync...")
        synced_commands = await bot.tree.sync()
        synced_count = len(synced_commands)
        print(f"✅ Synced {synced_count} commands successfully")

        # Update embed with success
        summary_embed = discord.Embed(
            title="✅ Bot Reloaded Successfully",
            description=f"Bot has been reloaded!\nSynced {synced_count} slash commands.",
            color=discord.Color.green()
        )
        summary_embed.add_field(name="Servers", value=len(bot.guilds), inline=True)
        latency_ms = round(bot.latency * 1000)
        summary_embed.add_field(name="Latency", value=f"{latency_ms}ms", inline=True)

        # Authors without a custom avatar get no footer icon.
        if ctx.author.avatar:
            footer_icon = ctx.author.avatar.url
        else:
            footer_icon = None
        summary_embed.set_footer(text=f"Reloaded by {ctx.author}", icon_url=footer_icon)

        await status_message.edit(embed=summary_embed)
        print("✅ Reload completed successfully")

    except Exception as e:
        error_type = type(e).__name__
        print(f"❌ Reload failed with error: {error_type}: {e}")
        import traceback
        traceback.print_exc()

        # The error text is cut to 1500 characters before it goes into the embed.
        failure_embed = discord.Embed(
            title="❌ Reload Failed",
            description=f"**Error Type:** {error_type}\n**Error:** {str(e)[:1500]}",
            color=discord.Color.red()
        )
        await ctx.send(embed=failure_embed)
|
|
|
|
# HOI4 ELO Commands
|
|
@bot.hybrid_command(name='hoi4create', description='Create a new HOI4 game')
async def hoi4create(ctx, game_type: str, game_name: str):
    """Create a new HOI4 game"""
    # Only the two known rating ladders are accepted (case-insensitive).
    normalized_type = game_type.lower()
    if normalized_type not in ('standard', 'competitive'):
        await ctx.send("❌ Game type must be either 'standard' or 'competitive'")
        return

    try:
        async with db_pool.acquire() as conn, conn.cursor(aiomysql.DictCursor) as cursor:
            # Check if game name already exists and is active
            await cursor.execute(
                "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                (game_name,)
            )
            if await cursor.fetchone():
                await ctx.send(f"❌ A game with name '{game_name}' is already in setup phase!")
                return

            # Create new game with an empty JSON player list
            await cursor.execute(
                "INSERT INTO games (game_name, game_type, status, players) VALUES (%s, %s, 'setup', %s)",
                (game_name, normalized_type, '[]')
            )

        type_label = game_type.title()
        confirmation = discord.Embed(
            title="🎮 Game Created",
            description=f"HOI4 {type_label} game '{game_name}' has been created!",
            color=discord.Color.green()
        )
        for field_name, field_value in (
            ("Game Name", game_name),
            ("Type", type_label),
            ("Status", "Setup Phase"),
        ):
            confirmation.add_field(name=field_name, value=field_value, inline=True)
        confirmation.set_footer(text="Use /hoi4setup to add players to this game")

        await ctx.send(embed=confirmation)

    except Exception as e:
        await ctx.send(f"❌ Error creating game: {str(e)}")
|
|
|
|
@bot.hybrid_command(name='hoi4setup', description='Add a player to an existing game')
async def hoi4setup(ctx, game_name: str, user: discord.Member, team_name: str, t_level: int):
    """Add a player to an existing game"""
    if t_level not in (1, 2, 3):
        await ctx.send("❌ T-Level must be 1, 2, or 3")
        return

    try:
        async with db_pool.acquire() as conn, conn.cursor(aiomysql.DictCursor) as cursor:
            # Get the game (only games still in setup can take players)
            await cursor.execute(
                "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                (game_name,)
            )
            game = await cursor.fetchone()
            if not game:
                await ctx.send(f"❌ No game found with name '{game_name}' in setup phase!")
                return

            # Get or create player
            member_name = user.display_name
            player = await get_or_create_player(user.id, member_name)

            # Parse existing players (stored as a JSON list on the game row)
            roster = json.loads(game['players']) if game['players'] else []

            # Check if player already in game
            if any(entry['discord_id'] == user.id for entry in roster):
                await ctx.send(f"❌ {user.display_name} is already in this game!")
                return

            # Add player to game; the ELO recorded is the one for this
            # game's type at the time of joining.
            elo_key = f"{game['game_type']}_elo"
            roster.append({
                'discord_id': user.id,
                'username': user.display_name,
                'team_name': team_name,
                't_level': t_level,
                'current_elo': player[elo_key],
            })

            # Update game
            await cursor.execute(
                "UPDATE games SET players = %s WHERE id = %s",
                (json.dumps(roster), game['id'])
            )

        summary = discord.Embed(
            title="✅ Player Added",
            description=f"{user.display_name} has been added to '{game_name}'!",
            color=discord.Color.green()
        )
        for field_name, field_value in (
            ("Player", user.display_name),
            ("Team", team_name),
            ("T-Level", f"T{t_level}"),
            ("Current ELO", player[elo_key]),
            ("Players in Game", len(roster)),
        ):
            summary.add_field(name=field_name, value=field_value, inline=True)

        await ctx.send(embed=summary)

    except Exception as e:
        await ctx.send(f"❌ Error adding player: {str(e)}")
|
|
|
|
@bot.hybrid_command(name='hoi4end', description='End a game and calculate ELO changes')
async def hoi4end(ctx, game_name: str, winner_team: str):
    """End a game and calculate ELO changes. Use 'draw' for ties."""
    # Overview:
    #   1. Load the game (must be in 'setup') and validate roster and winner.
    #   2. In ONE transaction: mark the game finished, read each player's live
    #      ELO, compute the changes, update the players and store the results.
    #   3. Post a per-team summary embed.
    # 'draw' (any casing) always means a tie, even if a team has that name.
    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Get the game
                await cursor.execute(
                    "SELECT * FROM games WHERE game_name = %s AND status = 'setup'",
                    (game_name,)
                )
                game = await cursor.fetchone()

                if not game:
                    await ctx.send(f"❌ No active game found with name '{game_name}'!")
                    return

                # The game type becomes part of a column name below, so it is
                # checked against the known ladders before being interpolated.
                game_type = game['game_type']
                if game_type not in ('standard', 'competitive'):
                    await ctx.send(f"❌ Game '{game_name}' has an unknown game type '{game_type}'!")
                    return
                elo_field = f"{game_type}_elo"

                players = json.loads(game['players']) if game['players'] else []

                if len(players) < 2:
                    await ctx.send("❌ Game needs at least 2 players to end!")
                    return

                # Check if winner team exists or if it's a draw
                teams = {p['team_name'] for p in players}
                is_draw = winner_team.lower() == 'draw'

                if not is_draw and winner_team not in teams:
                    available_teams = ', '.join(teams)
                    await ctx.send(f"❌ Team '{winner_team}' not found in game! Available teams: {available_teams}, draw")
                    return

                # With a single team there is no opponent to rate against;
                # everyone would be measured against their own rating.
                if len(teams) < 2:
                    await ctx.send("❌ Game needs players on at least 2 different teams to end!")
                    return

                final_result = "Draw" if is_draw else winner_team
                elo_changes = []

                # The pool runs in autocommit mode, so open an explicit
                # transaction: either every rating/result row is written or
                # none is.
                await conn.begin()
                try:
                    # Mark game as finished first. The status condition makes
                    # this the claim on the game: a concurrent /hoi4end for the
                    # same game updates zero rows and backs out instead of
                    # applying the ELO changes a second time.
                    await cursor.execute(
                        "UPDATE games SET status = 'finished', winner_team = %s, finished_at = CURRENT_TIMESTAMP WHERE id = %s AND status = 'setup'",
                        (final_result, game['id'])
                    )
                    if cursor.rowcount == 0:
                        await conn.rollback()
                        await ctx.send(f"❌ Game '{game_name}' has already been ended!")
                        return

                    # Use each player's CURRENT rating, not the snapshot taken
                    # when they were added: the snapshot may be outdated if the
                    # player finished another game in between, and writing
                    # "snapshot + change" would erase that game's result.
                    player_ids = [p['discord_id'] for p in players]
                    placeholders = ', '.join(['%s'] * len(player_ids))
                    await cursor.execute(
                        f"SELECT discord_id, {elo_field} AS elo FROM players WHERE discord_id IN ({placeholders}) FOR UPDATE",
                        player_ids
                    )
                    live_rows = await cursor.fetchall()
                    live_elos = {row['discord_id']: row['elo'] for row in live_rows}
                    for player in players:
                        live_elo = live_elos.get(player['discord_id'])
                        if live_elo is not None:
                            player['current_elo'] = live_elo

                    # Collect the ratings of each team
                    team_elos = {}
                    for player in players:
                        team_elos.setdefault(player['team_name'], []).append(player['current_elo'])

                    # Calculate ELO changes for each player
                    for player in players:
                        team = player['team_name']

                        # Determine result for this player
                        if is_draw:
                            result = 'draw'
                        elif team == winner_team:
                            result = 'win'
                        else:
                            result = 'loss'

                        # Calculate opponent average (average of all other teams)
                        opponent_elos = []
                        for other_team, elos in team_elos.items():
                            if other_team != team:
                                opponent_elos.extend(elos)
                        opponent_avg = sum(opponent_elos) / len(opponent_elos) if opponent_elos else player['current_elo']

                        elo_change = calculate_elo_change(
                            player['current_elo'],
                            opponent_avg,
                            result,
                            player['t_level']
                        )

                        new_elo = max(0, player['current_elo'] + elo_change)  # Prevent negative ELO

                        elo_changes.append({
                            'discord_id': player['discord_id'],
                            'username': player['username'],
                            'team_name': team,
                            't_level': player['t_level'],
                            'old_elo': player['current_elo'],
                            'new_elo': new_elo,
                            'elo_change': elo_change,
                            'result': result
                        })

                    # Update player ELOs and save game results
                    for change in elo_changes:
                        await cursor.execute(
                            f"UPDATE players SET {elo_field} = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s",
                            (change['new_elo'], change['discord_id'])
                        )

                        won = change['result'] == 'win'
                        await cursor.execute(
                            """INSERT INTO game_results
                               (game_id, discord_id, team_name, t_level, old_elo, new_elo, elo_change, won, result_type)
                               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                            (game['id'], change['discord_id'], change['team_name'],
                             change['t_level'], change['old_elo'], change['new_elo'],
                             change['elo_change'], won, change['result'])
                        )

                    await conn.commit()
                except Exception:
                    # Undo the partial work; the outer handler reports the error.
                    await conn.rollback()
                    raise

        # Create result embed
        if is_draw:
            embed = discord.Embed(
                title="🤝 Game Finished!",
                description=f"Game '{game_name}' has ended!\n**Result: Draw**",
                color=discord.Color.orange()
            )
        else:
            embed = discord.Embed(
                title="🏆 Game Finished!",
                description=f"Game '{game_name}' has ended!\n**Winner: {winner_team}**",
                color=discord.Color.gold()
            )

        # Group results by team
        teams_results = {}
        for change in elo_changes:
            teams_results.setdefault(change['team_name'], []).append(change)

        for team, team_changes in teams_results.items():
            lines = []
            for change in team_changes:
                emoji = "📈" if change['elo_change'] > 0 else "📉" if change['elo_change'] < 0 else "➡️"
                lines.append(
                    f"{change['username']}: {change['old_elo']} → {change['new_elo']} ({change['elo_change']:+d}) {emoji}"
                )
            team_text = "\n".join(lines) + "\n"
            # Discord field value max ~1024 chars; trim if necessary
            if len(team_text) > 1000:
                team_text = team_text[:997] + "..."

            # Team header with appropriate icon
            if is_draw:
                team_header = f"🤝 Team {team}"
            elif team == winner_team:
                team_header = f"🏆 Team {team}"
            else:
                team_header = f"💔 Team {team}"

            embed.add_field(
                name=team_header,
                value=team_text,
                inline=False
            )

        embed.set_footer(text=f"Game Type: {game['game_type'].title()}")

        await ctx.send(embed=embed)

    except Exception as e:
        await ctx.send(f"❌ Error ending game: {str(e)}")
|
|
|
|
@bot.hybrid_command(name='hoi4stats', description='Show your HOI4 ELO statistics')
async def hoi4stats(ctx, user: Optional[discord.Member] = None):
    """Show HOI4 ELO statistics for a user"""
    # Without an explicit user the stats of the command author are shown.
    target_user = user if user else ctx.author

    def get_rank_display(rank, total):
        # Ranks 1-3 are prefixed with a medal; all others are shown plain.
        medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(rank)
        prefix = f"{medal} " if medal else ""
        return f"{prefix}#{rank} of {total}"

    try:
        player = await get_or_create_player(target_user.id, target_user.display_name)

        async with db_pool.acquire() as conn, conn.cursor(aiomysql.DictCursor) as cursor:
            # Rank = 1 + number of players with a strictly higher ELO.
            rank_queries = (
                ("SELECT COUNT(*) + 1 as player_rank FROM players WHERE standard_elo > %s",
                 player['standard_elo']),
                ("SELECT COUNT(*) + 1 as player_rank FROM players WHERE competitive_elo > %s",
                 player['competitive_elo']),
            )
            ranks = []
            for rank_sql, own_elo in rank_queries:
                await cursor.execute(rank_sql, (own_elo,))
                rank_row = await cursor.fetchone()
                ranks.append(rank_row['player_rank'])
            standard_rank, competitive_rank = ranks

            # Get total player count
            await cursor.execute("SELECT COUNT(*) as total FROM players")
            total_row = await cursor.fetchone()
            total_players = total_row['total']

            # Get game statistics with proper draw detection
            await cursor.execute(
                """SELECT
                    COUNT(*) as total_games,
                    SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won,
                    SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn,
                    SUM(CASE WHEN result_type = 'loss' THEN 1 ELSE 0 END) as games_lost
                FROM game_results WHERE discord_id = %s""",
                (target_user.id,)
            )
            game_stats = await cursor.fetchone()

        # Aggregates over zero rows come back as NULL/None; treat them as 0.
        total_games = game_stats['total_games'] or 0
        games_won = game_stats['games_won'] or 0
        games_drawn = game_stats['games_drawn'] or 0
        games_lost = game_stats['games_lost'] or 0
        if total_games > 0:
            win_rate = games_won / total_games * 100
        else:
            win_rate = 0

        embed = discord.Embed(
            title=f"📊 HOI4 ELO Stats - {target_user.display_name}",
            color=discord.Color.blue()
        )

        # ELO and Rankings
        standard_text = f"**{player['standard_elo']}** ELO\n{get_rank_display(standard_rank, total_players)}"
        embed.add_field(name="🎯 Standard ELO", value=standard_text, inline=True)
        competitive_text = f"**{player['competitive_elo']}** ELO\n{get_rank_display(competitive_rank, total_players)}"
        embed.add_field(name="🏆 Competitive ELO", value=competitive_text, inline=True)
        embed.add_field(
            name="📅 Player Since",
            value=player['created_at'].strftime("%m/%d/%Y"),
            inline=True
        )

        # Game Statistics
        if total_games > 0:
            embed.add_field(
                name="🎮 Games Played",
                value=f"**{total_games}** total games",
                inline=True
            )

            # The draw column is only shown once the player has drawn a game.
            if games_drawn > 0:
                record_name = "📊 W/D/L Record"
                record_value = f"**{games_won}W** / **{games_drawn}D** / **{games_lost}L**"
            else:
                record_name = "📈 Win/Loss"
                record_value = f"**{games_won}W** / **{games_lost}L**"
            embed.add_field(name=record_name, value=record_value, inline=True)

            embed.add_field(
                name="📊 Win Rate",
                value=f"**{win_rate:.1f}%**",
                inline=True
            )
        else:
            embed.add_field(
                name="🎮 Games Played",
                value="No games played yet",
                inline=False
            )

        if target_user.avatar:
            embed.set_thumbnail(url=target_user.avatar.url)

        # Add percentile information
        if total_players > 0:
            standard_percentile = (total_players - standard_rank) / total_players * 100
            competitive_percentile = (total_players - competitive_rank) / total_players * 100
        else:
            standard_percentile = 0
            competitive_percentile = 0

        embed.set_footer(
            text=f"Standard: Top {100-standard_percentile:.1f}% | Competitive: Top {100-competitive_percentile:.1f}%"
        )

        await ctx.send(embed=embed)

    except Exception as e:
        await ctx.send(f"❌ Error getting stats: {str(e)}")
|
|
|
|
@bot.hybrid_command(name='hoi4games', description='Show active games as team showcases')
async def hoi4games(ctx):
    """Show all active games with teams presented side-by-side like a showcase."""
    # Discord rejects embeds with more than 25 fields. A game needs one header
    # field, one field per team and, with two or more teams, a "VS" spacer,
    # so the fields are counted while building and games that no longer fit
    # are left out and mentioned in the footer.
    max_fields = 25

    def build_team_field(team_name: str, members: List[Dict]) -> Dict[str, str]:
        # Format one team as an embed field: a header with the team emoji and
        # average ELO, then one line per player (highest T level first).
        t_emoji = get_team_emoji(ctx, team_name)
        avg = sum(m['current_elo'] for m in members) / len(members) if members else 0
        name = f"{t_emoji} {team_name} (avg {avg:.0f})"
        lines = []
        for m in sorted(members, key=lambda mm: (-mm.get('t_level', 2), mm['username'].lower())):
            te = get_t_emoji(ctx, int(m.get('t_level', 2)))
            lines.append(f"{te} {m['username']} ({m['current_elo']})")
        value = "\n".join(lines) if lines else "No players yet"
        # Discord field value max ~1024 chars; trim if necessary
        if len(value) > 1000:
            value = value[:997] + "..."
        return {"name": name, "value": value}

    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute(
                    "SELECT * FROM games WHERE status = 'setup' ORDER BY created_at DESC"
                )
                games = await cursor.fetchall()

        if not games:
            await ctx.send("📝 No active games found. Use `/hoi4create` to create a new game!")
            return

        embed = discord.Embed(
            title="🎮 Active HOI4 Games",
            description="Team lineups are shown side-by-side. Use /hoi4setup to add players.",
            color=discord.Color.green()
        )

        fields_used = 0
        games_shown = 0

        for game in games:
            players = json.loads(game['players']) if game['players'] else []

            # Build team structures
            teams: Dict[str, List[Dict]] = {}
            for p in players:
                teams.setdefault(p['team_name'], []).append(p)

            # Sort teams by name to keep stable order
            ordered_teams = sorted(teams.items(), key=lambda x: x[0].lower())

            # If no teams yet, show empty placeholder for this game
            if not ordered_teams:
                if fields_used + 1 > max_fields:
                    break
                embed.add_field(
                    name=f"{game['game_name']} ({game['game_type'].title()})",
                    value="No players yet",
                    inline=False
                )
                fields_used += 1
                games_shown += 1
                continue

            team_fields = [build_team_field(tn, members) for tn, members in ordered_teams]

            # header + one field per team (+ the VS spacer for 2 or more teams)
            needed = 1 + len(team_fields) + (1 if len(team_fields) >= 2 else 0)
            if fields_used + needed > max_fields:
                if games_shown:
                    # This game does not fit any more; stop here.
                    break
                # A single game that alone exceeds the cap: show as many
                # teams as fit next to the header and the VS spacer.
                team_fields = team_fields[:max_fields - 2]
                needed = 1 + len(team_fields) + 1

            # Add a header field per game
            embed.add_field(
                name=f"🎯 {game['game_name']} ({game['game_type'].title()})",
                value=f"Players: {len(players)} | Teams: {len(ordered_teams)}",
                inline=False
            )

            # For two teams, show A vs B with a center VS field; further teams go below
            if len(team_fields) == 1:
                f1 = team_fields[0]
                embed.add_field(name=f1["name"], value=f1["value"], inline=False)
            else:
                f1, f2 = team_fields[0], team_fields[1]
                embed.add_field(name=f1["name"], value=f1["value"], inline=True)
                embed.add_field(name="⚔️ VS ⚔️", value="\u200b", inline=True)
                embed.add_field(name=f2["name"], value=f2["value"], inline=True)
                for extra in team_fields[2:]:
                    embed.add_field(name=extra["name"], value=extra["value"], inline=False)

            fields_used += needed
            games_shown += 1

        if games_shown < len(games):
            embed.set_footer(text=f"Showing {games_shown} of {len(games)} active games")

        await ctx.send(embed=embed)

    except Exception as e:
        await ctx.send(f"❌ Error getting games: {str(e)}")
|
|
|
|
@bot.hybrid_command(name='hoi4history', description='Show past games with optional filters')
async def hoi4history(ctx, limit: Optional[int] = 10, player: Optional[discord.Member] = None, game_name: Optional[str] = None, game_type: Optional[str] = None):
    """Show past games with optional filters"""
    # Every game becomes one embed field and Discord allows at most 25 fields
    # per embed, so the limit is clamped to 1..25. A missing limit uses the
    # default of 10.
    if limit is None:
        limit = 10
    limit = max(1, min(limit, 25))

    # An unrecognized game type is ignored rather than rejected.
    type_filter = game_type.lower() if game_type and game_type.lower() in ('standard', 'competitive') else None

    try:
        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Build dynamic query based on filters
                query = "SELECT * FROM games WHERE status = 'finished'"
                params = []

                if game_name:
                    query += " AND game_name LIKE %s"
                    params.append(f"%{game_name}%")

                if type_filter:
                    query += " AND game_type = %s"
                    params.append(type_filter)

                if player:
                    query += " AND JSON_CONTAINS(players, JSON_OBJECT('discord_id', %s))"
                    params.append(player.id)

                query += " ORDER BY finished_at DESC LIMIT %s"
                params.append(limit)

                await cursor.execute(query, params)
                games = await cursor.fetchall()

        if not games:
            await ctx.send("📝 No finished games found with the specified filters!")
            return

        embed = discord.Embed(
            title="📚 HOI4 Game History",
            color=discord.Color.blue()
        )

        # List every filter that was actually applied to the query.
        active_filters = []
        if player:
            active_filters.append(f"player: {player.display_name}")
        if game_name:
            active_filters.append(f"game name: {game_name}")
        if type_filter:
            active_filters.append(f"type: {type_filter.title()}")
        if active_filters:
            embed.description = "Filtered by " + " | ".join(active_filters)

        for game in games:
            players = json.loads(game['players']) if game['players'] else []

            # Count distinct teams
            team_names = {p['team_name'] for p in players}

            # Format date
            finished_date = game['finished_at'].strftime("%m/%d/%Y %H:%M") if game['finished_at'] else "Unknown"

            # Winner indicator
            winner = game['winner_team'] if game['winner_team'] else "Unknown"

            game_info = "\n".join([
                f"**Winner:** {winner}",
                f"**Type:** {game['game_type'].title()}",
                f"**Players:** {len(players)} | **Teams:** {len(team_names)}",
                f"**Finished:** {finished_date}",
            ])

            embed.add_field(
                name=f"🏆 {game['game_name']}",
                value=game_info,
                inline=False
            )

        embed.set_footer(text=f"Showing {len(games)} of last {limit} games")
        await ctx.send(embed=embed)

    except Exception as e:
        await ctx.send(f"❌ Error getting game history: {str(e)}")
|
|
|
|
@bot.hybrid_command(name='hoi4leaderboard', description='Show ELO leaderboard')
async def hoi4leaderboard(ctx, game_type: Optional[str] = "standard", limit: Optional[int] = 10):
    """Show the ELO leaderboard for standard or competitive games.

    Args:
        ctx: Invocation context; every reply is sent through ``ctx.send``.
        game_type: ``'standard'`` or ``'competitive'`` (case-insensitive).
            ``None`` falls back to ``'standard'``.
        limit: Number of players to list, clamped to the range 1..25.
            ``None`` falls back to 10.

    Any exception raised while querying or building the embed is reported to
    the channel instead of propagating.
    """
    # Discord caps an embed field value at 1024 characters; stay a little
    # below that because emoji may be counted as more than one character.
    max_field_len = 1000

    game_type = (game_type or "standard").lower()
    if game_type not in ('standard', 'competitive'):
        await ctx.send("❌ Game type must be either 'standard' or 'competitive'")
        return

    # A zero/negative LIMIT is a SQL error and more than 25 rows floods the embed.
    limit = 10 if limit is None else max(1, min(limit, 25))

    try:
        # Safe to interpolate: game_type was checked against the whitelist above.
        elo_field = f"{game_type}_elo"
        stats = None
        results_by_player = {}

        async with db_pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                # Get top players by ELO
                await cursor.execute(
                    f"SELECT discord_id, username, {elo_field}, created_at FROM players ORDER BY {elo_field} DESC LIMIT %s",
                    (limit,)
                )
                players = await cursor.fetchall()

                if players:
                    # Overall statistics for the description and footer
                    await cursor.execute(
                        f"SELECT COUNT(*) as total_players, AVG({elo_field}) as avg_elo, MAX({elo_field}) as max_elo, MIN({elo_field}) as min_elo FROM players"
                    )
                    stats = await cursor.fetchone()

                    # One grouped query for every listed player instead of a
                    # separate pool acquire + query per leaderboard row.
                    placeholders = ", ".join(["%s"] * len(players))
                    await cursor.execute(
                        f"""SELECT
                        discord_id,
                        COUNT(*) as games_played,
                        SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won,
                        SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn
                        FROM game_results WHERE discord_id IN ({placeholders})
                        GROUP BY discord_id""",
                        [p['discord_id'] for p in players]
                    )
                    for row in await cursor.fetchall():
                        # Key by str so a BIGINT/VARCHAR mismatch between the
                        # two tables cannot break the lookup below.
                        results_by_player[str(row['discord_id'])] = row

        if not players:
            await ctx.send("📝 No players found in the database!")
            return

        # Create leaderboard embed
        embed = discord.Embed(
            title=f"🏆 HOI4 {game_type.title()} Leaderboard",
            color=discord.Color.gold()
        )

        # Add statistics
        embed.description = f"**Total Players:** {stats['total_players']} | **Average ELO:** {stats['avg_elo']:.0f}"

        medals = ["🥇", "🥈", "🥉"]
        entries = []

        for rank, player in enumerate(players, 1):
            # Medal for the podium, plain rank number afterwards
            rank_indicator = medals[rank - 1] if rank <= len(medals) else f"**{rank}.**"

            # Players without any recorded result simply have no row here.
            row = results_by_player.get(str(player['discord_id']), {})
            games_played = int(row.get('games_played') or 0)
            games_won = int(row.get('games_won') or 0)
            games_drawn = int(row.get('games_drawn') or 0)
            win_rate = (games_won / games_played * 100) if games_played > 0 else 0

            entry = f"{rank_indicator} **{player['username']}** - {player[elo_field]} ELO\n"
            if games_drawn > 0:
                entry += f" 📊 {games_played} games | {games_won}W-{games_drawn}D-{games_played - games_won - games_drawn}L | {win_rate:.1f}% win rate"
            else:
                entry += f" 📊 {games_played} games | {win_rate:.1f}% win rate"
            # Guard against a pathological username blowing a single entry past the cap.
            entries.append(entry[:max_field_len])

        # Pack entries into as few fields as possible without exceeding the cap.
        chunks = []
        current = ""
        for entry in entries:
            candidate = f"{current}\n\n{entry}" if current else entry
            if len(candidate) > max_field_len:
                chunks.append(current)
                current = entry
            else:
                current = candidate
        if current:
            chunks.append(current)

        for index, chunk in enumerate(chunks):
            embed.add_field(
                name="Rankings" if index == 0 else "Rankings (cont.)",
                value=chunk,
                inline=False
            )

        embed.set_footer(text=f"ELO Range: {stats['min_elo']:.0f} - {stats['max_elo']:.0f}")
        await ctx.send(embed=embed)

    except Exception as e:
        await ctx.send(f"❌ Error getting leaderboard: {str(e)}")
|
|
|
|
@bot.event
async def on_command_error(ctx, error):
    """Handle errors raised while parsing or running a command.

    Args:
        ctx: Context of the command invocation that failed.
        error: The exception passed to the handler by the command framework.

    Permission, missing-argument and bad-argument errors get a short reply,
    unknown commands are ignored, and anything else is logged with its
    traceback; only the owner receives the error details in chat.
    """
    if isinstance(error, commands.CheckFailure):
        await ctx.send("❌ You don't have permission to use this command!")
    elif isinstance(error, commands.CommandNotFound):
        # Silently ignore command not found errors
        pass
    elif isinstance(error, commands.MissingRequiredArgument):
        await ctx.send(f"❌ Missing arguments! Command: `{ctx.command}`")
    elif isinstance(error, commands.BadArgument):
        await ctx.send("❌ Invalid argument!")
    else:
        import traceback

        # Log detailed error information
        print(f"❌ Unknown error in command '{ctx.command}': {type(error).__name__}: {error}")
        # This handler is not running inside an ``except`` block, so
        # traceback.print_exc() would only print "NoneType: None". Print the
        # traceback attached to the error object itself instead.
        traceback.print_exception(type(error), error, error.__traceback__)

        # Send detailed error to user if owner
        if ctx.author.id == OWNER_ID:
            await ctx.send(f"❌ **Error Details (Owner only):**\n```python\n{type(error).__name__}: {str(error)[:1800]}\n```")
        else:
            await ctx.send("❌ An unknown error occurred!")
|
|
|
|
async def main():
    """Validate configuration, start the bot, and close the DB pool on exit.

    Returns early (after printing a hint) when ``DISCORD_TOKEN`` or the
    database configuration is missing. Errors raised by ``bot.start`` are
    printed rather than propagated.
    """
    # Load Discord token from environment variables
    token = os.getenv('DISCORD_TOKEN')

    if not token:
        print("❌ DISCORD_TOKEN environment variable not found!")
        print("Please set the DISCORD_TOKEN variable in Coolify or create a .env file")
        return

    if not DATABASE_URL:
        print("❌ DATABASE_URL environment variable not found!")
        print("Please set either DATABASE_URL or individual DB variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD) in Coolify")
        print(f"Current values - HOST: {DB_HOST}, NAME: {DB_NAME}, USER: {DB_USER}, PASSWORD: {'***' if DB_PASSWORD else 'None'}")
        return

    try:
        print("🚀 Starting bot...")
        await bot.start(token)
    except discord.LoginFailure:
        print("❌ Invalid Discord token!")
    except Exception as e:
        print(f"❌ Error starting bot: {e}")
    finally:
        # NOTE(review): assumes db_pool is an aiomysql pool (the file imports
        # aiomysql). Its close() is synchronous and returns None, so awaiting
        # it raises TypeError; wait_closed() is the coroutine that waits for
        # the connections to be released.
        if db_pool is not None:
            db_pool.close()
            await db_pool.wait_closed()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the bot until it stops or the process is interrupted.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the bot; main()'s cleanup has
        # already run by the time asyncio.run() re-raises, so exit quietly.
        print("👋 Bot stopped by user")