import discord from discord.ext import commands import os import asyncio import logging from dotenv import load_dotenv import aiomysql import json from datetime import datetime from typing import Optional, List, Dict from concurrent.futures import ThreadPoolExecutor import functools import traceback import aiohttp import time import base64 from pathlib import Path # Optional: YouTube extraction try: import yt_dlp as ytdlp except Exception: ytdlp = None # Load environment variables load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO) # Database configuration DATABASE_URL = os.getenv('DATABASE_URL') DB_HOST = os.getenv('DB_HOST') DB_PORT = os.getenv('DB_PORT', '5432') # Default to PostgreSQL port DB_NAME = os.getenv('DB_NAME') DB_USER = os.getenv('DB_USER') DB_PASSWORD = os.getenv('DB_PASSWORD') # Build DATABASE_URL from individual components if not provided if not DATABASE_URL and all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD]): DATABASE_URL = f"mysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" print(f"๐Ÿ“ Built DATABASE_URL from individual environment variables") # Parse MySQL connection details from DATABASE_URL def parse_database_url(url): """Parse MySQL connection URL into components""" if not url: return None # Remove mysql:// prefix if url.startswith('mysql://'): url = url[8:] # Split user:pass@host:port/db if '@' in url: auth, host_db = url.split('@', 1) if ':' in auth: user, password = auth.split(':', 1) else: user, password = auth, '' else: return None if '/' in host_db: host_port, database = host_db.split('/', 1) else: return None if ':' in host_port: host, port = host_port.split(':', 1) try: port = int(port) except ValueError: port = 3306 else: host, port = host_port, 3306 return { 'host': host, 'port': port, 'user': user, 'password': password, 'db': database } # Global database connection pool db_pool = None # Bot configuration intents = discord.Intents.default() intents.message_content = True intents.guilds = True intents.members = True bot = commands.Bot(command_prefix='!', intents=intents) # Thread pool for blocking operations like yt-dlp extraction executor = ThreadPoolExecutor(max_workers=4) @bot.event async def on_ready(): """Event triggered when the bot is ready""" print(f'{bot.user} is online and ready!') print(f'Bot ID: {bot.user.id}') print(f'Discord.py Version: {discord.__version__}') print('------') # Initialize database await init_database() # Set bot status await bot.change_presence( activity=discord.Game(name="Hearts of Iron IV ELO"), status=discord.Status.online ) # Sync hybrid commands on startup try: synced = await bot.tree.sync() print(f'Synced {len(synced)} hybrid commands') except Exception as e: print(f'Failed to sync commands: {e}') # Pre-warm voice: ensure PyNaCl is importable try: import nacl print(f"๐Ÿ”Š Voice ready (PyNaCl {getattr(nacl, '__version__', 'unknown')})") except Exception as e: print(f"โš ๏ธ Voice (PyNaCl) not available: {e}") @bot.event async def on_guild_join(guild): """Event triggered when the bot joins a server""" print(f'Bot joined server "{guild.name}" (ID: {guild.id})') @bot.event async def on_guild_remove(guild): """Event triggered when the bot leaves a server""" print(f'Bot left server "{guild.name}" (ID: {guild.id})') # Owner Configuration OWNER_ID = 253922739709018114 # ELO Configuration STARTING_ELO = 800 K_FACTOR = 32 T_LEVEL_MULTIPLIERS = { 1: 0.8, # T1 countries get less points 2: 1.0, # T2 countries get normal points 3: 1.2 # T3 countries get more points } # Preferred team emoji overrides (will be used if no guild/file match is found) TEAM_EMOTE_OVERRIDES: Dict[str, str] = { "axis": "<:fascism:1432391685127536762>", "allies": "<:democracy:1432391686612586528>", "comintern": "<:communism:1432391682267025479>", "cuf": "<:ChineseUnitedFront:1432422469985112156>", "default": "<:neutrality:1432391681059197102>", } # Role mapping for ELO tiers (IDs provided by user) # Assumption: Top tier is >= 900 ELO rather than strictly >900, # to avoid leaving 900 unassigned. Adjust if you want 900 handled differently. STANDARD_ELO_ROLE_IDS = { "gte_900": 1432368177014374497, "851_899": 1432368177014374496, "801_850": 1432368177014374495, "eq_800": 1432368177014374494, "751_799": 1432368177014374493, "701_750": 1432368177014374492, "lt_700": 1432368177014374491, } COMPETITIVE_ELO_ROLE_IDS = { "gte_900": 1432368177030893672, "851_899": 1432368177030893671, "801_850": 1432368177030893670, "eq_800": 1432368177030893669, "751_799": 1432368177030893668, "701_750": 1432368177014374499, "lt_700": 1432368177014374498, } def _role_id_for_elo(elo: int, category: str) -> Optional[int]: """Return the role ID for the given ELO and category ('standard'|'competitive').""" ids = STANDARD_ELO_ROLE_IDS if category == "standard" else COMPETITIVE_ELO_ROLE_IDS if elo >= 900: return ids["gte_900"] if 851 <= elo <= 899: return ids["851_899"] if 801 <= elo <= 850: return ids["801_850"] if elo == 800: return ids["eq_800"] if 751 <= elo <= 799: return ids["751_799"] if 701 <= elo <= 750: return ids["701_750"] # < 700 return ids["lt_700"] def _category_role_ids(category: str) -> List[int]: ids = STANDARD_ELO_ROLE_IDS if category == "standard" else COMPETITIVE_ELO_ROLE_IDS return list(ids.values()) async def update_member_elo_role(member: discord.Member, elo: int, category: str, reason: Optional[str] = None): """Ensure the member has exactly one rank role for the given category matching their ELO. - category: 'standard' or 'competitive' """ if member is None or member.guild is None: return try: guild = member.guild target_role_id = _role_id_for_elo(elo, category) if not target_role_id: return target_role = guild.get_role(int(target_role_id)) if not target_role: return # Role not found in this guild # Remove any other roles from the same category category_ids = set(_category_role_ids(category)) roles_to_remove = [r for r in member.roles if r.id in category_ids and r.id != target_role.id] if roles_to_remove: await member.remove_roles(*roles_to_remove, reason=reason) # Add the target role if missing if target_role not in member.roles: await member.add_roles(target_role, reason=reason) except discord.Forbidden: # Missing permissions or role hierarchy issue logging.warning(f"Insufficient permissions to modify roles for {member} in category {category}") except discord.HTTPException as e: logging.warning(f"Failed to update roles for {member}: {e}") # Emoji and formatting helpers def _all_accessible_emojis(ctx: commands.Context) -> List[discord.Emoji]: """Return emojis from current guild plus all guilds the bot is in.""" try: guild_emojis = list(ctx.guild.emojis) if ctx.guild else [] except Exception: guild_emojis = [] try: bot_emojis = list(bot.emojis) except Exception: bot_emojis = [] return guild_emojis + bot_emojis def find_custom_emoji(ctx: commands.Context, keyword_variants: List[str]) -> Optional[str]: """Try to find a custom emoji by a list of keyword variants (case-insensitive). Returns str(emoji) or None.""" emojis = _all_accessible_emojis(ctx) for kw in keyword_variants: kw = kw.lower() for e in emojis: try: if kw in (e.name or '').lower(): return str(e) except Exception: continue # Fallback to markdown-defined emojis if available try: if EMOTE_MAP: for kw in keyword_variants: key = kw.lower() # Exact name match if key in EMOTE_MAP: return EMOTE_MAP[key] # Substring match for name_lower, mention in EMOTE_MAP.items(): if key in name_lower: return mention except NameError: # EMOTE_MAP not defined yet pass return None def get_t_emoji(ctx: commands.Context, t_level: int) -> str: """Return a suitable emoji for a T level, preferring custom emojis if present.""" mapping = { 1: ["hoi4_t1", "t1", "tier1"], 2: ["hoi4_t2", "t2", "tier2"], 3: ["hoi4_t3", "t3", "tier3"], } custom = find_custom_emoji(ctx, mapping.get(t_level, [])) if custom: return custom # Fallback unicode return {1: "๐Ÿ”น", 2: "๐Ÿ”ธ", 3: "๐Ÿ”บ"}.get(t_level, "๐Ÿ”น") def get_team_emoji(ctx: commands.Context, team_name: str) -> str: """Return a team emoji using user's preferred overrides, with guild/file matches first.""" name = (team_name or "").lower() def pick(keywords: List[str], override_key: str) -> str: # Try specific keywords (ideology-first), then generic ones custom = find_custom_emoji(ctx, keywords) if custom: return custom # As an extra attempt, try a couple generic HOI4 icons custom = find_custom_emoji(ctx, ["eagle_hoi", "peace_hoi", "navy_hoi", "secretweapon_hoi"]) if custom: return custom # Fall back to explicit override mapping return TEAM_EMOTE_OVERRIDES.get(override_key, TEAM_EMOTE_OVERRIDES.get("default", "๐ŸŽ–๏ธ")) if any(k in name for k in ["axis", "achse"]): return pick(["fascism", "axis", "hoi4_axis"], "axis") if any(k in name for k in ["allies", "alliierten", "ally"]): return pick(["democracy", "allies", "hoi4_allies"], "allies") if any(k in name for k in ["comintern", "ussr", "soviet"]): return pick(["communism", "comintern", "hoi4_comintern"], "comintern") # Chinese United Front (optional special case) if ("chinese united front" in name) or ("chinese" in name and "front" in name) or ("cuf" in name): return pick(["ChineseUnitedFront", "chineseunitedfront", "cuf"], "cuf") # Default/other custom = find_custom_emoji(ctx, ["neutrality", "hoi4", "hearts_of_iron", "iron"]) if custom: return custom return TEAM_EMOTE_OVERRIDES.get("default", "๐ŸŽ–๏ธ") # ========================= # Music / YouTube playback # ========================= # Global music state per guild MUSIC_STATE: Dict[int, Dict] = {} # --- yt-dlp cookies and options support --- # You can supply YouTube cookies to bypass bot checks/age gates: # - YTDL_COOKIES_FILE: Path to a Netscape-format cookies.txt inside the container (e.g., mounted secret) # - YTDL_COOKIES_B64: Base64-encoded content of a Netscape-format cookies.txt; will be written to app directory # - YTDL_COOKIES_FROM_BROWSER: e.g., "chrome:Default" (only works on hosts with a real browser profile, usually not in containers) # - YTDL_UA: Custom User-Agent string # - YTDL_YT_CLIENT: youtube player client hint (e.g., android, web) to work around some restrictions (default: android) YTDL_COOKIEFILE: Optional[str] = None YTDL_COOKIESFROMBROWSER: Optional[tuple] = None try: # Priority 1: cookies from base64 env _cookies_b64 = os.getenv('YTDL_COOKIES_B64') if _cookies_b64: try: decoded = base64.b64decode(_cookies_b64) cookie_path = Path(__file__).with_name('cookies.txt') cookie_path.write_bytes(decoded) YTDL_COOKIEFILE = str(cookie_path) print(f"๐Ÿช Wrote YouTube cookies to {YTDL_COOKIEFILE} (from YTDL_COOKIES_B64)") except Exception as e: print(f"โš ๏ธ Failed to decode/write YTDL_COOKIES_B64: {e}") # Priority 2: cookies from a file path if not YTDL_COOKIEFILE: _cookies_file = os.getenv('YTDL_COOKIES_FILE') if _cookies_file and os.path.exists(_cookies_file): YTDL_COOKIEFILE = _cookies_file print(f"๐Ÿช Using YouTube cookies file: {YTDL_COOKIEFILE}") elif _cookies_file: print(f"โš ๏ธ YTDL_COOKIES_FILE set but not found: {_cookies_file}") # Optional: cookies from browser (rarely usable in containers) _cookies_from_browser = os.getenv('YTDL_COOKIES_FROM_BROWSER') if _cookies_from_browser: parts = _cookies_from_browser.split(':') browser = parts[0].strip().lower() if parts else None profile = parts[1].strip() if len(parts) > 1 else None if browser: # (browser, profile, keyring, container) YTDL_COOKIESFROMBROWSER = (browser, profile, None, None) print(f"๐Ÿช Will try cookies from browser: {browser} profile={profile or 'default'}") except Exception as e: print(f"โš ๏ธ Cookie configuration error: {e}") YTDL_OPTS = { 'format': 'bestaudio/best', 'noplaylist': True, 'quiet': True, 'default_search': 'ytsearch', 'skip_download': True, } def get_ytdl_opts(client_hint: Optional[str] = None) -> Dict: """Build yt-dlp options dynamically, injecting cookies and headers if configured. client_hint: preferred YouTube client (e.g., 'android', 'web', 'ios', 'tv'). """ opts = dict(YTDL_OPTS) # UA and extractor tweaks ua = os.getenv('YTDL_UA') or ( 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ' '(KHTML, like Gecko) Chrome/118.0 Safari/537.36' ) yt_client_env = os.getenv('YTDL_YT_CLIENT', 'android') yt_client = client_hint or yt_client_env opts['http_headers'] = {'User-Agent': ua} # Extractor args can help avoid some player config checks opts.setdefault('extractor_args', {}) opts['extractor_args'].setdefault('youtube', {}) # player_client hint opts['extractor_args']['youtube']['player_client'] = [yt_client] # Use cookies if available if YTDL_COOKIEFILE: opts['cookiefile'] = YTDL_COOKIEFILE elif YTDL_COOKIESFROMBROWSER: opts['cookiesfrombrowser'] = YTDL_COOKIESFROMBROWSER return opts FFMPEG_BEFORE = "-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5" FFMPEG_OPTS = { 'before_options': FFMPEG_BEFORE, 'options': '-vn' } def _get_state(guild: discord.Guild) -> Dict: st = MUSIC_STATE.get(guild.id) if not st: st = MUSIC_STATE[guild.id] = { 'queue': [], 'now': None, 'volume': 0.5, # 50% } return st async def _ensure_connected(ctx: commands.Context) -> Optional[discord.VoiceClient]: if not ctx.guild: await ctx.reply("โŒ This command can only be used in a server.") return None if not ctx.author.voice or not ctx.author.voice.channel: await ctx.reply("โŒ You must be connected to a voice channel.") return None channel = ctx.author.voice.channel if ctx.voice_client and ctx.voice_client.channel == channel: return ctx.voice_client if ctx.voice_client and ctx.voice_client.channel != channel: try: await ctx.voice_client.move_to(channel) return ctx.voice_client except Exception as e: await ctx.reply(f"โŒ Couldn't move to your channel: {e}") return None try: vc = await channel.connect() return vc except Exception as e: await ctx.reply(f"โŒ Couldn't join voice: {e}") return None async def _ytdlp_extract(loop: asyncio.AbstractEventLoop, query: str) -> Optional[Dict]: if not ytdlp: return None # Try multiple YouTube client profiles to bypass some restrictions env_client = os.getenv('YTDL_YT_CLIENT', 'android') # Prefer android first in fallbacks to avoid SABR issues on web fallback_clients = ['android', 'web', 'ios', 'tv'] clients_to_try: List[str] = [] for c in [env_client] + fallback_clients: if c not in clients_to_try: clients_to_try.append(c) last_error: Optional[Exception] = None for client in clients_to_try: def _extract_with_client(): with ytdlp.YoutubeDL(get_ytdl_opts(client)) as ytdl: return ytdl.extract_info(query, download=False) try: info = await loop.run_in_executor(executor, _extract_with_client) if info is None: continue if 'entries' in info: info = info['entries'][0] return info except Exception as e: last_error = e msg = str(e) print(f"โš ๏ธ yt-dlp attempt failed with client='{client}': {msg[:200]}") # If it's clearly a bot-check/cookies or client format/signature issue, try next client retriable_markers = ( 'Sign in to confirm', 'Use --cookies', 'pass cookies', 'HTTP Error 403', 'Signature extraction failed', 'Requested format is not available', 'Only images are available', 'missing a url', 'SABR streaming' ) if any(m in msg for m in retriable_markers): continue # Otherwise, break early for other errors break # All attempts failed if last_error: msg = str(last_error) if 'Sign in to confirm' in msg or 'Use --cookies' in msg or 'pass cookies' in msg: print("โŒ yt-dlp error: YouTube requires cookies to proceed.") print(" Provide YTDL_COOKIES_FILE or YTDL_COOKIES_B64 (Netscape cookies.txt).") traceback.print_exc() return None async def _create_audio_source(loop: asyncio.AbstractEventLoop, search: str, volume: float): # Accept either URL or search text; prepend ytsearch1: if not a URL if not (search.startswith('http://') or search.startswith('https://')): search = f"ytsearch1:{search}" info = await _ytdlp_extract(loop, search) if not info: return None url = info.get('url') webpage_url = info.get('webpage_url') or info.get('original_url') or url title = info.get('title') or 'Unknown title' uploader = info.get('uploader') or '' thumb = None if isinstance(info.get('thumbnails'), list) and info['thumbnails']: thumb = info['thumbnails'][-1].get('url') # Use PCM + volume transformer for runtime volume control audio = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(url, **FFMPEG_OPTS), volume=max(0.0, min(2.0, volume))) return { 'source': audio, 'title': title, 'webpage_url': webpage_url, 'uploader': uploader, 'thumbnail': thumb, 'stream_url': url, } def _start_next(guild: discord.Guild, ctx_channel: Optional[discord.abc.Messageable] = None): state = _get_state(guild) vc = guild.voice_client if not vc: state['now'] = None state['queue'].clear() return if vc.is_playing() or vc.is_paused(): return if not state['queue']: state['now'] = None return item = state['queue'].pop(0) state['now'] = item def _after(error): if error: print(f"Audio error: {error}") # Schedule next on event loop thread-safely bot.loop.call_soon_threadsafe(_start_next, guild, None) try: vc.play(item['source'], after=_after) except Exception as e: print(f"Failed to start playback: {e}") bot.loop.call_soon_threadsafe(_start_next, guild, None) return # Optionally announce now playing if ctx_channel: try: embed = discord.Embed(title="๐ŸŽถ Now Playing", description=f"[{item['title']}]({item['webpage_url']})", color=discord.Color.purple()) if item.get('thumbnail'): embed.set_thumbnail(url=item['thumbnail']) ctx_task = ctx_channel.send(embed=embed) asyncio.create_task(ctx_task) except Exception: pass # ------------- Music Commands ------------- @bot.hybrid_command(name='join', description='Join your voice channel') async def join(ctx: commands.Context): vc = await _ensure_connected(ctx) if vc: await ctx.reply(f"โœ… Joined {vc.channel.mention}") @bot.hybrid_command(name='leave', description='Leave the voice channel and clear the queue') async def leave(ctx: commands.Context): if not ctx.voice_client: await ctx.reply("โŒ I'm not in a voice channel.") return state = _get_state(ctx.guild) state['queue'].clear() state['now'] = None try: await ctx.voice_client.disconnect() await ctx.reply("๐Ÿ‘‹ Left the voice channel and cleared the queue.") except Exception as e: await ctx.reply(f"โŒ Couldn't leave: {e}") @bot.hybrid_command(name='play', description='Play a YouTube URL or search query') async def play(ctx: commands.Context, *, query: str): vc = await _ensure_connected(ctx) if not vc: return loop = asyncio.get_running_loop() state = _get_state(ctx.guild) await ctx.reply("๐Ÿ”Ž Searchingโ€ฆ") item = await _create_audio_source(loop, query, state['volume']) if not item: # Give a hint if cookies or client change likely required hint = ( "If this is YouTube, try YTDL_YT_CLIENT=android (or web) and/or provide cookies via " "YTDL_COOKIES_FILE or YTDL_COOKIES_B64." ) await ctx.reply(f"โŒ Couldn't get audio from that query.\n{hint}") return state['queue'].append(item) if not vc.is_playing() and not vc.is_paused() and not state['now']: _start_next(ctx.guild, ctx.channel) else: await ctx.reply(f"โž• Queued: [{item['title']}]({item['webpage_url']})") @bot.hybrid_command(name='skip', description='Skip the current track') async def skip(ctx: commands.Context): if not ctx.voice_client or not ctx.voice_client.is_playing(): await ctx.reply("โŒ Nothing is playing.") return ctx.voice_client.stop() await ctx.reply("โญ๏ธ Skipped.") @bot.hybrid_command(name='stop', description='Stop playback and clear the queue') async def stop(ctx: commands.Context): state = _get_state(ctx.guild) state['queue'].clear() state['now'] = None if ctx.voice_client and (ctx.voice_client.is_playing() or ctx.voice_client.is_paused()): ctx.voice_client.stop() await ctx.reply("โน๏ธ Stopped and cleared the queue.") @bot.hybrid_command(name='pause', description='Pause playback') async def pause(ctx: commands.Context): if not ctx.voice_client or not ctx.voice_client.is_playing(): await ctx.reply("โŒ Nothing is playing.") return ctx.voice_client.pause() await ctx.reply("โธ๏ธ Paused.") @bot.hybrid_command(name='resume', description='Resume playback') async def resume(ctx: commands.Context): if not ctx.voice_client or not ctx.voice_client.is_paused(): await ctx.reply("โŒ Nothing to resume.") return ctx.voice_client.resume() await ctx.reply("โ–ถ๏ธ Resumed.") @bot.hybrid_command(name='queue', description='Show the queue') async def queue_cmd(ctx: commands.Context): state = _get_state(ctx.guild) now = state.get('now') q = state.get('queue', []) if not now and not q: await ctx.reply("๐Ÿ—’๏ธ Queue is empty.") return desc = "" if now: desc += f"Now: [{now['title']}]({now['webpage_url']})\n\n" if q: for i, it in enumerate(q[:10], 1): desc += f"{i}. [{it['title']}]({it['webpage_url']})\n" if len(q) > 10: desc += f"โ€ฆand {len(q)-10} more" embed = discord.Embed(title="๐Ÿ“œ Queue", description=desc, color=discord.Color.blurple()) await ctx.reply(embed=embed) @bot.hybrid_command(name='np', description='Show the current track') async def now_playing(ctx: commands.Context): state = _get_state(ctx.guild) now = state.get('now') if not now: await ctx.reply("โŒ Nothing is playing.") return embed = discord.Embed(title="๐ŸŽถ Now Playing", description=f"[{now['title']}]({now['webpage_url']})", color=discord.Color.purple()) if now.get('thumbnail'): embed.set_thumbnail(url=now['thumbnail']) await ctx.reply(embed=embed) @bot.hybrid_command(name='volume', description='Set the player volume (0-200%)') async def volume(ctx: commands.Context, percent: int): if percent < 0 or percent > 200: await ctx.reply("โŒ Volume must be between 0 and 200.") return state = _get_state(ctx.guild) vol = percent / 100.0 state['volume'] = vol # Update current source if playing now = state.get('now') if now and now['source'] and isinstance(now['source'], discord.PCMVolumeTransformer): now['source'].volume = vol await ctx.reply(f"๐Ÿ”Š Volume set to {percent}%.") # ------------- Connectivity / Web Test ------------- @bot.hybrid_command(name='webtest', description='Test outbound web access and YouTube extraction') async def webtest(ctx: commands.Context, *, query: Optional[str] = None): """Check basic web reachability and try a lightweight yt-dlp extraction. - Tests HTTP GET to a few endpoints - Tries yt-dlp search or URL extraction (if yt-dlp is available) """ urls = [ ("Google 204", "https://www.google.com/generate_204"), ("YouTube", "https://www.youtube.com"), ("Discord API", "https://discord.com/api/v10/gateway"), ] results = [] timeout = aiohttp.ClientTimeout(total=8) try: async with aiohttp.ClientSession(timeout=timeout) as session: for name, url in urls: t0 = time.monotonic() status = None err = None try: async with session.get(url, allow_redirects=True) as resp: status = resp.status except Exception as e: err = str(e) dt = (time.monotonic() - t0) * 1000 results.append((name, url, status, err, dt)) except Exception as e: results.append(("session", "", None, f"Session error: {e}", 0.0)) # Try yt-dlp extraction if available ytdlp_ok = False ytdlp_msg = "" chosen = query or "ytsearch1:never gonna give you up" try: loop = asyncio.get_running_loop() info = await _ytdlp_extract(loop, chosen) if info: ytdlp_ok = True ytdlp_msg = f"Success: {info.get('title', 'unknown title')}" else: ytdlp_msg = "No info returned (possibly cookies required or blocked)." except Exception as e: ytdlp_msg = f"Error: {e}" # Cookies configured? cookies_file = os.getenv('YTDL_COOKIES_FILE') cookies_b64 = bool(os.getenv('YTDL_COOKIES_B64')) from_browser = os.getenv('YTDL_COOKIES_FROM_BROWSER') desc_lines = [] for name, url, status, err, dt in results: if status is not None: desc_lines.append(f"โ€ข {name}: {status} ({dt:.0f} ms)") else: desc_lines.append(f"โ€ข {name}: โŒ {err or 'unknown error'}") desc = "\n".join(desc_lines) embed = discord.Embed(title="๐ŸŒ Web Test", description=desc, color=discord.Color.teal()) embed.add_field(name="yt-dlp", value=("โœ… " + ytdlp_msg) if ytdlp_ok else ("โŒ " + ytdlp_msg), inline=False) cookie_status = [] if cookies_file and os.path.exists(cookies_file): cookie_status.append(f"file: {cookies_file}") elif cookies_file: cookie_status.append(f"file missing: {cookies_file}") if cookies_b64: cookie_status.append("b64: provided") if from_browser: cookie_status.append(f"from-browser: {from_browser}") if not cookie_status: cookie_status.append("none") embed.add_field(name="yt-dlp cookies", value=", ".join(cookie_status), inline=False) await ctx.reply(embed=embed) def _flag_from_iso2(code: str) -> Optional[str]: """Return unicode flag from 2-letter ISO code (e.g., 'DE' -> ๐Ÿ‡ฉ๐Ÿ‡ช).""" if not code or len(code) != 2: return None code = code.upper() base = 0x1F1E6 try: return chr(base + ord(code[0]) - ord('A')) + chr(base + ord(code[1]) - ord('A')) except Exception: return None # Emotes markdown loader and map def load_emote_markdown(path: Optional[str] = None) -> Dict[str, str]: """Parse emotes.markdown and return a mapping of lowercased emoji names to their mention strings. Expected line format: <:Name:123456789012345678> Lines that don't match are ignored.""" if path is None: base_dir = os.path.dirname(os.path.abspath(__file__)) path = os.path.join(base_dir, 'emotes.markdown') mapping: Dict[str, str] = {} try: with open(path, 'r', encoding='utf-8') as f: for raw in f: line = raw.strip() if not line or not line.startswith('<:') or ':' not in line[2:]: continue # Format is <:NAME:ID> try: inner = line[2:-1] if line.endswith('>') else line[2:] name, emoji_id = inner.split(':', 1) name = name.strip() mention = f"<:{name}:{emoji_id.strip('>')}>" mapping[name.lower()] = mention except Exception: continue except FileNotFoundError: # Silent if not present pass except Exception as e: print(f"โš ๏ธ Failed to load emotes.markdown: {e}") return mapping # Load emotes mapping at import EMOTE_MAP: Dict[str, str] = load_emote_markdown() if EMOTE_MAP: print(f"๐Ÿ˜€ Loaded {len(EMOTE_MAP)} custom emojis from emotes.markdown") def load_country_tags(path: Optional[str] = None) -> Dict[str, str]: """Load HOI4 country tags mapping from tags.txt. Supported formats per line: TAG=Country Name | TAG:Country Name | TAG,Country Name | TAG Country Name Lines starting with # are ignored. Returns dict like { 'GER': 'Germany', ... }""" if path is None: base_dir = os.path.dirname(os.path.abspath(__file__)) path = os.path.join(base_dir, 'tags.txt') mapping: Dict[str, str] = {} try: with open(path, 'r', encoding='utf-8') as f: for raw in f: line = raw.strip() if not line or line.startswith('#'): continue tag = None name = None for sep in ['=', ';', ',', ':']: if sep in line: left, right = line.split(sep, 1) tag = left.strip().upper() name = right.strip() break if tag is None: parts = line.split(None, 1) if len(parts) == 2: tag = parts[0].strip().upper() name = parts[1].strip() else: tag = line.strip().upper() name = line.strip() if tag and name: mapping[tag] = name except FileNotFoundError: print("โ„น๏ธ tags.txt not found; proceeding without country tag labels") except Exception as e: print(f"โš ๏ธ Failed to load tags.txt: {e}") return mapping # Loaded at import COUNTRY_TAGS: Dict[str, str] = load_country_tags() if COUNTRY_TAGS: print(f"๐Ÿ—บ๏ธ Loaded {len(COUNTRY_TAGS)} HOI4 country tags") def get_country_label(country_tag: Optional[str]) -> Optional[str]: """Return a display label like "[GER] Germany" if known, or "[GER]" if unknown.""" if not country_tag: return None tag = country_tag.strip().upper() name = COUNTRY_TAGS.get(tag) return f"[{tag}] {name}" if name else f"[{tag}]" def get_country_emoji(ctx: commands.Context, country: Optional[str]) -> str: """Prefer custom emoji matching the HOI4 tag (e.g., ger, hoi4_ger). If parameter is ISO2, show unicode flag. Else empty.""" if not country: return "" c = country.strip() # Try custom emoji lookups using tag variants variants = [c, c.lower(), f"hoi4_{c.lower()}", f"country_{c.lower()}"] custom = find_custom_emoji(ctx, variants) if custom: return custom # If user passed ISO2, render unicode flag if len(c) == 2: flag = _flag_from_iso2(c) if flag: return flag # Otherwise, no emoji fallback to avoid noisy globes return "" # Database Functions # Database Functions async def init_database(): """Initialize database connection and create tables""" global db_pool try: # Parse DATABASE_URL for MySQL connection db_config = parse_database_url(DATABASE_URL) if not db_config: raise ValueError("Invalid DATABASE_URL format") print(f"๐Ÿ”Œ Connecting to MySQL: {db_config['host']}:{db_config['port']}/{db_config['db']}") # Create MySQL connection pool db_pool = await aiomysql.create_pool( host=db_config['host'], port=db_config['port'], user=db_config['user'], password=db_config['password'], db=db_config['db'], charset='utf8mb4', autocommit=True, maxsize=10 ) async with db_pool.acquire() as conn: async with conn.cursor() as cursor: # Create players table (MySQL syntax) await cursor.execute(''' CREATE TABLE IF NOT EXISTS players ( id INT AUTO_INCREMENT PRIMARY KEY, discord_id BIGINT UNIQUE NOT NULL, username VARCHAR(255) NOT NULL, standard_elo INT DEFAULT 800, competitive_elo INT DEFAULT 800, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) ''') # Create games table (MySQL syntax) await cursor.execute(''' CREATE TABLE IF NOT EXISTS games ( id INT AUTO_INCREMENT PRIMARY KEY, game_name VARCHAR(255) NOT NULL, game_type VARCHAR(50) NOT NULL, status VARCHAR(50) DEFAULT 'setup', players JSON NOT NULL, winner_team VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, finished_at TIMESTAMP NULL ) ''') # Create game_results table (MySQL syntax) await cursor.execute(''' CREATE TABLE IF NOT EXISTS game_results ( id INT AUTO_INCREMENT PRIMARY KEY, game_id INT, discord_id BIGINT NOT NULL, team_name VARCHAR(255) NOT NULL, t_level INT NOT NULL, old_elo INT NOT NULL, new_elo INT NOT NULL, elo_change INT NOT NULL, won BOOLEAN NOT NULL, result_type VARCHAR(10) DEFAULT 'loss', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (game_id) REFERENCES games(id) ) ''') # Add result_type column if it doesn't exist (for existing databases) try: await cursor.execute(''' ALTER TABLE game_results ADD COLUMN result_type VARCHAR(10) DEFAULT 'loss' ''') except: # Column already exists, ignore error pass print("โœ… Database initialized successfully") except Exception as e: print(f"โŒ Database initialization failed: {e}") import traceback traceback.print_exc() async def get_or_create_player(discord_id: int, username: str) -> Dict: """Get or create a player in the database""" async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Try to get existing player await cursor.execute( "SELECT * FROM players WHERE discord_id = %s", (discord_id,) ) player = await cursor.fetchone() if not player: # Create new player await cursor.execute( "INSERT INTO players (discord_id, username) VALUES (%s, %s)", (discord_id, username) ) await cursor.execute( "SELECT * FROM players WHERE discord_id = %s", (discord_id,) ) player = await cursor.fetchone() else: # Update username if changed await cursor.execute( "UPDATE players SET username = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s", (username, discord_id) ) return dict(player) def calculate_elo_change(player_elo: int, opponent_avg_elo: int, result: str, t_level: int) -> int: """Calculate ELO change using standard ELO formula with T-level multiplier result can be 'win', 'loss', or 'draw' In draws: - If you're expected to win (higher ELO), you lose points for drawing - If you're expected to lose (lower ELO), you gain points for drawing - The bigger the ELO difference, the bigger the swing """ expected_score = 1 / (1 + 10 ** ((opponent_avg_elo - player_elo) / 400)) if result == 'win': actual_score = 1.0 elif result == 'draw': actual_score = 0.5 # Draw = 50% score else: # loss actual_score = 0.0 # Calculate the base ELO change base_change = K_FACTOR * (actual_score - expected_score) # Apply T-level multiplier t_multiplier = T_LEVEL_MULTIPLIERS.get(t_level, 1.0) final_change = base_change * t_multiplier # For debugging/logging purposes, let's see what happens in draws if result == 'draw': elo_diff = player_elo - opponent_avg_elo expected_percentage = expected_score * 100 print(f"๐Ÿ“Š Draw calculation: Player ELO {player_elo} vs Opponent Avg {opponent_avg_elo}") print(f" ELO Difference: {elo_diff:+d} | Expected win chance: {expected_percentage:.1f}%") print(f" ELO change: {final_change:+.1f} (T{t_level} multiplier: {t_multiplier})") return round(final_change) # Owner only decorator def is_owner(): def predicate(ctx): return ctx.author.id == OWNER_ID return commands.check(predicate) # Owner Commands @bot.hybrid_command(name='reload', description='Reloads the bot and syncs slash commands (Owner only)') @is_owner() async def reload_bot(ctx): """Reloads the bot and syncs slash commands (Owner only)""" try: print(f"๐Ÿ”„ Reload command started by {ctx.author} (ID: {ctx.author.id})") # Send initial message embed = discord.Embed( title="๐Ÿ”„ Bot Reload", description="Reloading bot and syncing commands...", color=discord.Color.yellow() ) message = await ctx.send(embed=embed) print("๐Ÿ“ค Initial reload message sent") # Sync slash commands print("๐Ÿ”„ Starting command sync...") synced = await bot.tree.sync() print(f"โœ… Synced {len(synced)} commands successfully") # Update embed with success embed = discord.Embed( title="โœ… Bot Reloaded Successfully", description=f"Bot has been reloaded!\nSynced {len(synced)} slash commands.", color=discord.Color.green() ) embed.add_field(name="Servers", value=len(bot.guilds), inline=True) embed.add_field(name="Latency", value=f"{round(bot.latency * 1000)}ms", inline=True) embed.set_footer(text=f"Reloaded by {ctx.author}", icon_url=ctx.author.avatar.url if ctx.author.avatar else None) await message.edit(embed=embed) print("โœ… Reload completed successfully") except Exception as e: print(f"โŒ Reload failed with error: {type(e).__name__}: {e}") import traceback traceback.print_exc() embed = discord.Embed( title="โŒ Reload Failed", description=f"**Error Type:** {type(e).__name__}\n**Error:** {str(e)[:1500]}", color=discord.Color.red() ) await ctx.send(embed=embed) # HOI4 ELO Commands @bot.hybrid_command(name='hoi4create', description='Create a new HOI4 game') async def hoi4create(ctx, game_type: str, game_name: str): """Create a new HOI4 game""" if game_type.lower() not in ['standard', 'competitive']: await ctx.send("โŒ Game type must be either 'standard' or 'competitive'") return try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Check if game name already exists and is active await cursor.execute( "SELECT * FROM games WHERE game_name = %s AND status = 'setup'", (game_name,) ) existing_game = await cursor.fetchone() if existing_game: await ctx.send(f"โŒ A game with name '{game_name}' is already in setup phase!") return # Create new game await cursor.execute( "INSERT INTO games (game_name, game_type, status, players) VALUES (%s, %s, 'setup', %s)", (game_name, game_type.lower(), '[]') ) embed = discord.Embed( title="๐ŸŽฎ Game Created", description=f"HOI4 {game_type.title()} game '{game_name}' has been created!", color=discord.Color.green() ) embed.add_field(name="Game Name", value=game_name, inline=True) embed.add_field(name="Type", value=game_type.title(), inline=True) embed.add_field(name="Status", value="Setup Phase", inline=True) embed.set_footer(text="Use /hoi4setup to add players to this game") await ctx.send(embed=embed) except Exception as e: await ctx.send(f"โŒ Error creating game: {str(e)}") @bot.hybrid_command(name='hoi4setup', description='Add a player to an existing game') async def hoi4setup(ctx, game_name: str, user: discord.Member, team_name: str, t_level: int, country: Optional[str] = None): """Add a player to an existing game""" if t_level not in [1, 2, 3]: await ctx.send("โŒ T-Level must be 1, 2, or 3") return try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Get the game await cursor.execute( "SELECT * FROM games WHERE game_name = %s AND status = 'setup'", (game_name,) ) game = await cursor.fetchone() if not game: await ctx.send(f"โŒ No game found with name '{game_name}' in setup phase!") return # Get or create player player = await get_or_create_player(user.id, user.display_name) # Parse existing players players = json.loads(game['players']) if game['players'] else [] # Check if player already in game for p in players: if p['discord_id'] == user.id: await ctx.send(f"โŒ {user.display_name} is already in this game!") return # Add player to game player_data = { 'discord_id': user.id, 'username': user.display_name, 'team_name': team_name, 't_level': t_level, 'current_elo': player[f"{game['game_type']}_elo"], 'country': country.strip() if country else None } players.append(player_data) # Update game await cursor.execute( "UPDATE games SET players = %s WHERE id = %s", (json.dumps(players), game['id']) ) embed = discord.Embed( title="โœ… Player Added", description=f"{user.display_name} has been added to '{game_name}'!", color=discord.Color.green() ) embed.add_field(name="Player", value=user.display_name, inline=True) embed.add_field(name="Team", value=team_name, inline=True) embed.add_field(name="T-Level", value=f"T{t_level}", inline=True) embed.add_field(name="Current ELO", value=player[f"{game['game_type']}_elo"], inline=True) if country: flag = get_country_emoji(ctx, country) label = get_country_label(country) value = f"{flag} {label}".strip() embed.add_field(name="Country", value=value, inline=True) embed.add_field(name="Players in Game", value=len(players), inline=True) await ctx.send(embed=embed) except Exception as e: await ctx.send(f"โŒ Error adding player: {str(e)}") @bot.hybrid_command(name='hoi4end', description='End a game and calculate ELO changes') async def hoi4end(ctx, game_name: str, winner_team: str): """End a game and calculate ELO changes. Use 'draw' for ties.""" try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Get the game await cursor.execute( "SELECT * FROM games WHERE game_name = %s AND status = 'setup'", (game_name,) ) game = await cursor.fetchone() if not game: await ctx.send(f"โŒ No active game found with name '{game_name}'!") return players = json.loads(game['players']) if game['players'] else [] if len(players) < 2: await ctx.send("โŒ Game needs at least 2 players to end!") return # Check if winner team exists or if it's a draw teams = {p['team_name'] for p in players} is_draw = winner_team.lower() == 'draw' if not is_draw and winner_team not in teams: available_teams = ', '.join(teams) await ctx.send(f"โŒ Team '{winner_team}' not found in game! Available teams: {available_teams}, draw") return # Calculate team averages team_elos = {} team_players = {} for player in players: team = player['team_name'] if team not in team_elos: team_elos[team] = [] team_players[team] = [] team_elos[team].append(player['current_elo']) team_players[team].append(player) # Calculate average ELOs for each team team_averages = {team: sum(elos) / len(elos) for team, elos in team_elos.items()} elo_changes = [] # Calculate ELO changes for each player for player in players: team = player['team_name'] # Determine result for this player if is_draw: result = 'draw' elif team == winner_team: result = 'win' else: result = 'loss' # Calculate opponent average (average of all other teams) opponent_elos = [] for other_team, elos in team_elos.items(): if other_team != team: opponent_elos.extend(elos) opponent_avg = sum(opponent_elos) / len(opponent_elos) if opponent_elos else player['current_elo'] elo_change = calculate_elo_change( player['current_elo'], opponent_avg, result, player['t_level'] ) new_elo = max(0, player['current_elo'] + elo_change) # Prevent negative ELO elo_changes.append({ 'discord_id': player['discord_id'], 'username': player['username'], 'team_name': team, 't_level': player['t_level'], 'old_elo': player['current_elo'], 'new_elo': new_elo, 'elo_change': elo_change, 'result': result }) # Update player ELOs and save game results for change in elo_changes: # Update player ELO elo_field = f"{game['game_type']}_elo" await cursor.execute( f"UPDATE players SET {elo_field} = %s, updated_at = CURRENT_TIMESTAMP WHERE discord_id = %s", (change['new_elo'], change['discord_id']) ) # Save game result won = change['result'] == 'win' await cursor.execute( """INSERT INTO game_results (game_id, discord_id, team_name, t_level, old_elo, new_elo, elo_change, won, result_type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)""", (game['id'], change['discord_id'], change['team_name'], change['t_level'], change['old_elo'], change['new_elo'], change['elo_change'], won, change['result']) ) # Mark game as finished final_result = "Draw" if is_draw else winner_team await cursor.execute( "UPDATE games SET status = 'finished', winner_team = %s, finished_at = CURRENT_TIMESTAMP WHERE id = %s", (final_result, game['id']) ) # After DB updates, try to sync Discord roles for affected players (only for this game's category) try: guild = ctx.guild if guild: for change in elo_changes: member = guild.get_member(change['discord_id']) if member is None: try: member = await guild.fetch_member(change['discord_id']) except Exception: member = None if member: await update_member_elo_role( member, change['new_elo'], game['game_type'], reason=f"HOI4 {game['game_type']} ELO updated in '{game_name}'" ) except Exception as e: logging.warning(f"Role sync after game end failed: {e}") # Create result embed if is_draw: embed = discord.Embed( title="๐Ÿค Game Finished!", description=f"Game '{game_name}' has ended!\n**Result: Draw**", color=discord.Color.orange() ) else: embed = discord.Embed( title="๐Ÿ† Game Finished!", description=f"Game '{game_name}' has ended!\n**Winner: {winner_team}**", color=discord.Color.gold() ) # Group results by team teams_results = {} for change in elo_changes: team = change['team_name'] if team not in teams_results: teams_results[team] = [] teams_results[team].append(change) for team, team_changes in teams_results.items(): team_text = "" for change in team_changes: emoji = "๐Ÿ“ˆ" if change['elo_change'] > 0 else "๐Ÿ“‰" if change['elo_change'] < 0 else "โžก๏ธ" result_emoji = "" if change['result'] == 'win': result_emoji = "๐Ÿ†" elif change['result'] == 'draw': result_emoji = "๐Ÿค" else: result_emoji = "๐Ÿ’”" team_text += f"{change['username']}: {change['old_elo']} โ†’ {change['new_elo']} ({change['elo_change']:+d}) {emoji}\n" # Team header with appropriate icon if is_draw: team_header = f"๐Ÿค Team {team}" elif team == winner_team: team_header = f"๐Ÿ† Team {team}" else: team_header = f"๐Ÿ’” Team {team}" embed.add_field( name=team_header, value=team_text, inline=False ) embed.set_footer(text=f"Game Type: {game['game_type'].title()}") await ctx.send(embed=embed) except Exception as e: await ctx.send(f"โŒ Error ending game: {str(e)}") @bot.hybrid_command(name='hoi4stats', description='Show your HOI4 ELO statistics') async def hoi4stats(ctx, user: Optional[discord.Member] = None): """Show HOI4 ELO statistics for a user""" target_user = user or ctx.author try: player = await get_or_create_player(target_user.id, target_user.display_name) # Get player rankings async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Get standard rank await cursor.execute( "SELECT COUNT(*) + 1 as player_rank FROM players WHERE standard_elo > %s", (player['standard_elo'],) ) standard_rank_result = await cursor.fetchone() standard_rank = standard_rank_result['player_rank'] # Get competitive rank await cursor.execute( "SELECT COUNT(*) + 1 as player_rank FROM players WHERE competitive_elo > %s", (player['competitive_elo'],) ) competitive_rank_result = await cursor.fetchone() competitive_rank = competitive_rank_result['player_rank'] # Get total player count await cursor.execute("SELECT COUNT(*) as total FROM players") total_players_result = await cursor.fetchone() total_players = total_players_result['total'] # Get game statistics with proper draw detection await cursor.execute( """SELECT COUNT(*) as total_games, SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won, SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn, SUM(CASE WHEN result_type = 'loss' THEN 1 ELSE 0 END) as games_lost FROM game_results WHERE discord_id = %s""", (target_user.id,) ) game_stats = await cursor.fetchone() total_games = game_stats['total_games'] or 0 games_won = game_stats['games_won'] or 0 games_drawn = game_stats['games_drawn'] or 0 games_lost = game_stats['games_lost'] or 0 win_rate = (games_won / total_games * 100) if total_games > 0 else 0 # Create rank indicators with medals def get_rank_display(rank, total): if rank == 1: return f"๐Ÿฅ‡ #{rank} of {total}" elif rank == 2: return f"๐Ÿฅˆ #{rank} of {total}" elif rank == 3: return f"๐Ÿฅ‰ #{rank} of {total}" else: return f"#{rank} of {total}" embed = discord.Embed( title=f"๐Ÿ“Š HOI4 ELO Stats - {target_user.display_name}", color=discord.Color.blue() ) # ELO and Rankings embed.add_field( name="๐ŸŽฏ Standard ELO", value=f"**{player['standard_elo']}** ELO\n{get_rank_display(standard_rank, total_players)}", inline=True ) embed.add_field( name="๐Ÿ† Competitive ELO", value=f"**{player['competitive_elo']}** ELO\n{get_rank_display(competitive_rank, total_players)}", inline=True ) embed.add_field( name="๐Ÿ“… Player Since", value=player['created_at'].strftime("%d/%m/%Y"), inline=True ) # Game Statistics if total_games > 0: embed.add_field( name="๐ŸŽฎ Games Played", value=f"**{total_games}** total games", inline=True ) if games_drawn > 0: embed.add_field( name="๐Ÿ“Š W/D/L Record", value=f"**{games_won}W** / **{games_drawn}D** / **{games_lost}L**", inline=True ) else: embed.add_field( name="๐Ÿ“ˆ Win/Loss", value=f"**{games_won}W** / **{games_lost}L**", inline=True ) embed.add_field( name="๐Ÿ“Š Win Rate", value=f"**{win_rate:.1f}%**", inline=True ) else: embed.add_field( name="๐ŸŽฎ Games Played", value="No games played yet", inline=False ) if target_user.avatar: embed.set_thumbnail(url=target_user.avatar.url) # Add percentile information standard_percentile = ((total_players - standard_rank) / total_players * 100) if total_players > 0 else 0 competitive_percentile = ((total_players - competitive_rank) / total_players * 100) if total_players > 0 else 0 embed.set_footer( text=f"Standard: Top {100-standard_percentile:.1f}% | Competitive: Top {100-competitive_percentile:.1f}%" ) await ctx.send(embed=embed) # Try to keep user's roles in sync for both categories when stats are viewed try: if ctx.guild and isinstance(target_user, discord.Member): await update_member_elo_role( target_user, player['standard_elo'], 'standard', reason='HOI4 stats viewed: role sync') await update_member_elo_role( target_user, player['competitive_elo'], 'competitive', reason='HOI4 stats viewed: role sync') except Exception as e: logging.warning(f"Role sync on stats failed: {e}") except Exception as e: await ctx.send(f"โŒ Error getting stats: {str(e)}") @bot.hybrid_command(name='hoi4games', description='Show active games as team showcases') async def hoi4games(ctx): """Show all active games with teams presented side-by-side like a showcase.""" try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: await cursor.execute( "SELECT * FROM games WHERE status = 'setup' ORDER BY created_at DESC" ) games = await cursor.fetchall() if not games: await ctx.send("๐Ÿ“ No active games found. Use `/hoi4create` to create a new game!") return embed = discord.Embed( title="๐ŸŽฎ Active HOI4 Games", description="Team lineups are shown side-by-side. Use /hoi4setup to add players.", color=discord.Color.green() ) # Limit total embed fields to avoid Discord limit (25). Each game uses up to 3 fields. max_games = max(1, 25 // 3) games = games[:max_games] for game in games: players = json.loads(game['players']) if game['players'] else [] # Build team structures teams: Dict[str, List[Dict]] = {} for p in players: teams.setdefault(p['team_name'], []).append(p) # Compute average ELO per team team_avgs = {t: (sum(m['current_elo'] for m in mlist) / len(mlist)) if mlist else 0 for t, mlist in teams.items()} # Sort teams by name to keep stable order ordered_teams = sorted(teams.items(), key=lambda x: x[0].lower()) # Helper to format a team's field def build_team_field(team_name: str, members: List[Dict]) -> Dict[str, str]: t_emoji = get_team_emoji(ctx, team_name) avg = team_avgs.get(team_name, 0) name = f"{t_emoji} {team_name} (avg {avg:.0f})" # Each player line: T-level emoji, name, elo lines = [] for m in sorted(members, key=lambda mm: (-mm.get('t_level', 2), mm['username'].lower())): te = get_t_emoji(ctx, int(m.get('t_level', 2))) ctry = m.get('country') flag = get_country_emoji(ctx, ctry) if ctry else "" label = get_country_label(ctry) if ctry else None parts = [te] if flag: parts.append(flag) if label: parts.append(label) parts.append(m['username']) lines.append(f"{' '.join(parts)} ({m['current_elo']})") value = "\n".join(lines) if lines else "No players yet" # Discord field value max ~1024 chars; trim if necessary if len(value) > 1000: value = value[:997] + "..." return {"name": name, "value": value} # If no teams yet, show empty placeholder for this game if not ordered_teams: embed.add_field( name=f"{game['game_name']} ({game['game_type'].title()})", value="No players yet", inline=False ) continue # For two teams, show A vs B with a center VS field; otherwise, list each team inline team_fields = [build_team_field(tn, members) for tn, members in ordered_teams] # Add a header field per game embed.add_field( name=f"๐ŸŽฏ {game['game_name']} ({game['game_type'].title()})", value=f"Players: {len(players)} | Teams: {len(ordered_teams)}", inline=False ) if len(team_fields) == 1: f1 = team_fields[0] embed.add_field(name=f1["name"], value=f1["value"], inline=False) elif len(team_fields) >= 2: f1, f2 = team_fields[0], team_fields[1] embed.add_field(name=f1["name"], value=f1["value"], inline=True) embed.add_field(name="โš”๏ธ VS โš”๏ธ", value="\u200b", inline=True) embed.add_field(name=f2["name"], value=f2["value"], inline=True) # If more teams exist, add them below for extra in team_fields[2:]: embed.add_field(name=extra["name"], value=extra["value"], inline=False) await ctx.send(embed=embed) except Exception as e: await ctx.send(f"โŒ Error getting games: {str(e)}") @bot.hybrid_command(name='hoi4history', description='Show past games with optional filters') async def hoi4history(ctx, limit: Optional[int] = 10, player: Optional[discord.Member] = None, game_name: Optional[str] = None, game_type: Optional[str] = None): """Show past games with optional filters""" if limit > 50: limit = 50 # Prevent too many results try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Build dynamic query based on filters query = "SELECT * FROM games WHERE status = 'finished'" params = [] if game_name: query += " AND game_name LIKE %s" params.append(f"%{game_name}%") if game_type and game_type.lower() in ['standard', 'competitive']: query += " AND game_type = %s" params.append(game_type.lower()) if player: query += " AND JSON_CONTAINS(players, JSON_OBJECT('discord_id', %s))" params.append(player.id) query += " ORDER BY finished_at DESC LIMIT %s" params.append(limit) await cursor.execute(query, params) games = await cursor.fetchall() if not games: await ctx.send("๐Ÿ“ No finished games found with the specified filters!") return embed = discord.Embed( title="๐Ÿ“š HOI4 Game History", color=discord.Color.blue() ) if player: embed.description = f"Filtered by player: {player.display_name}" if game_name: embed.description = f"Filtered by game name: {game_name}" if game_type: embed.description = f"Filtered by type: {game_type.title()}" for game in games: players = json.loads(game['players']) if game['players'] else [] # Count teams and players teams = {} for p in players: team = p['team_name'] if team not in teams: teams[team] = 0 teams[team] += 1 # Format date finished_date = game['finished_at'].strftime("%d/%m/%Y %H:%M") if game['finished_at'] else "Unknown" # Winner indicator winner = game['winner_team'] if game['winner_team'] else "Unknown" game_info = f"**Winner:** {winner}\n" game_info += f"**Type:** {game['game_type'].title()}\n" game_info += f"**Players:** {len(players)} | **Teams:** {len(teams)}\n" game_info += f"**Finished:** {finished_date}" embed.add_field( name=f"๐Ÿ† {game['game_name']}", value=game_info, inline=False ) embed.set_footer(text=f"Showing {len(games)} of last {limit} games") await ctx.send(embed=embed) except Exception as e: await ctx.send(f"โŒ Error getting game history: {str(e)}") @bot.hybrid_command(name='hoi4leaderboard', description='Show ELO leaderboard') async def hoi4leaderboard(ctx, game_type: Optional[str] = "standard", limit: Optional[int] = 10): """Show ELO leaderboard for standard or competitive""" if game_type.lower() not in ['standard', 'competitive']: await ctx.send("โŒ Game type must be either 'standard' or 'competitive'") return if limit > 25: limit = 25 # Prevent too many results try: async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Get top players by ELO elo_field = f"{game_type.lower()}_elo" await cursor.execute( f"SELECT discord_id, username, {elo_field}, created_at FROM players ORDER BY {elo_field} DESC LIMIT %s", (limit,) ) players = await cursor.fetchall() # Also get some statistics await cursor.execute( f"SELECT COUNT(*) as total_players, AVG({elo_field}) as avg_elo, MAX({elo_field}) as max_elo, MIN({elo_field}) as min_elo FROM players" ) stats = await cursor.fetchone() if not players: await ctx.send("๐Ÿ“ No players found in the database!") return # Create leaderboard embed embed = discord.Embed( title=f"๐Ÿ† HOI4 {game_type.title()} Leaderboard", color=discord.Color.gold() ) # Add statistics embed.description = f"**Total Players:** {stats['total_players']} | **Average ELO:** {stats['avg_elo']:.0f}" leaderboard_text = "" medals = ["๐Ÿฅ‡", "๐Ÿฅˆ", "๐Ÿฅ‰"] for i, player in enumerate(players, 1): # Get medal or rank number if i <= 3: rank_indicator = medals[i-1] else: rank_indicator = f"**{i}.**" elo_value = player[elo_field] username = player['username'] # Get additional player stats async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # Count games played with proper result detection await cursor.execute( """SELECT COUNT(*) as games_played, SUM(CASE WHEN result_type = 'win' THEN 1 ELSE 0 END) as games_won, SUM(CASE WHEN result_type = 'draw' THEN 1 ELSE 0 END) as games_drawn FROM game_results WHERE discord_id = %s""", (player['discord_id'],) ) player_stats = await cursor.fetchone() games_played = player_stats['games_played'] or 0 games_won = player_stats['games_won'] or 0 games_drawn = player_stats['games_drawn'] or 0 win_rate = (games_won / games_played * 100) if games_played > 0 else 0 leaderboard_text += f"{rank_indicator} **{username}** - {elo_value} ELO\n" if games_drawn > 0: leaderboard_text += f" ๐Ÿ“Š {games_played} games | {games_won}W-{games_drawn}D-{games_played - games_won - games_drawn}L | {win_rate:.1f}% win rate\n\n" else: leaderboard_text += f" ๐Ÿ“Š {games_played} games | {win_rate:.1f}% win rate\n\n" embed.add_field( name="Rankings", value=leaderboard_text, inline=False ) embed.set_footer(text=f"ELO Range: {stats['min_elo']:.0f} - {stats['max_elo']:.0f}") await ctx.send(embed=embed) except Exception as e: await ctx.send(f"โŒ Error getting leaderboard: {str(e)}") @bot.event async def on_command_error(ctx, error): """Handles command errors""" if isinstance(error, commands.CheckFailure): await ctx.send("โŒ You don't have permission to use this command!") elif isinstance(error, commands.CommandNotFound): # Silently ignore command not found errors pass elif isinstance(error, commands.MissingRequiredArgument): await ctx.send(f"โŒ Missing arguments! Command: `{ctx.command}`") elif isinstance(error, commands.BadArgument): await ctx.send("โŒ Invalid argument!") else: # Log detailed error information print(f"โŒ Unknown error in command '{ctx.command}': {type(error).__name__}: {error}") import traceback traceback.print_exc() # Send detailed error to user if owner; be safe for slash interactions try: content_owner = f"โŒ **Error Details (Owner only):**\n```python\n{type(error).__name__}: {str(error)[:1800]}\n```" content_user = "โŒ An unknown error occurred!" if hasattr(ctx, 'interaction') and ctx.interaction is not None: if not ctx.interaction.response.is_done(): await ctx.interaction.response.send_message(content_owner if ctx.author.id == OWNER_ID else content_user, ephemeral=ctx.author.id != OWNER_ID) else: await ctx.interaction.followup.send(content_owner if ctx.author.id == OWNER_ID else content_user, ephemeral=ctx.author.id != OWNER_ID) else: await ctx.send(content_owner if ctx.author.id == OWNER_ID else content_user) except Exception as send_err: # Final fallback: try reply, then ignore try: await ctx.reply("โŒ Error occurred, and sending details failed.") except Exception: pass async def main(): """Main function to start the bot""" # Load Discord token from environment variables token = os.getenv('DISCORD_TOKEN') if not token: print("โŒ DISCORD_TOKEN environment variable not found!") print("Please set the DISCORD_TOKEN variable in Coolify or create a .env file") return if not DATABASE_URL: print("โŒ DATABASE_URL environment variable not found!") print("Please set either DATABASE_URL or individual DB variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD) in Coolify") print(f"Current values - HOST: {DB_HOST}, NAME: {DB_NAME}, USER: {DB_USER}, PASSWORD: {'***' if DB_PASSWORD else 'None'}") return try: print("๐Ÿš€ Starting bot...") await bot.start(token) except discord.LoginFailure: print("โŒ Invalid Discord token!") except Exception as e: print(f"โŒ Error starting bot: {e}") finally: if db_pool: await db_pool.close() if __name__ == "__main__": asyncio.run(main())