modified: app.py
This commit is contained in:
79
app.py
79
app.py
@@ -11,6 +11,8 @@ from typing import Optional, List, Dict
|
|||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
import functools
|
import functools
|
||||||
import traceback
|
import traceback
|
||||||
|
import aiohttp
|
||||||
|
import time
|
||||||
import base64
|
import base64
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@@ -644,6 +646,83 @@ async def volume(ctx: commands.Context, percent: int):
|
|||||||
now['source'].volume = vol
|
now['source'].volume = vol
|
||||||
await ctx.reply(f"🔊 Volume set to {percent}%.")
|
await ctx.reply(f"🔊 Volume set to {percent}%.")
|
||||||
|
|
||||||
|
# ------------- Connectivity / Web Test -------------
|
||||||
|
|
||||||
|
@bot.hybrid_command(name='webtest', description='Test outbound web access and YouTube extraction')
|
||||||
|
async def webtest(ctx: commands.Context, *, query: Optional[str] = None):
|
||||||
|
"""Check basic web reachability and try a lightweight yt-dlp extraction.
|
||||||
|
- Tests HTTP GET to a few endpoints
|
||||||
|
- Tries yt-dlp search or URL extraction (if yt-dlp is available)
|
||||||
|
"""
|
||||||
|
urls = [
|
||||||
|
("Google 204", "https://www.google.com/generate_204"),
|
||||||
|
("YouTube", "https://www.youtube.com"),
|
||||||
|
("Discord API", "https://discord.com/api/v10/gateway"),
|
||||||
|
]
|
||||||
|
results = []
|
||||||
|
timeout = aiohttp.ClientTimeout(total=8)
|
||||||
|
try:
|
||||||
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||||
|
for name, url in urls:
|
||||||
|
t0 = time.monotonic()
|
||||||
|
status = None
|
||||||
|
err = None
|
||||||
|
try:
|
||||||
|
async with session.get(url, allow_redirects=True) as resp:
|
||||||
|
status = resp.status
|
||||||
|
except Exception as e:
|
||||||
|
err = str(e)
|
||||||
|
dt = (time.monotonic() - t0) * 1000
|
||||||
|
results.append((name, url, status, err, dt))
|
||||||
|
except Exception as e:
|
||||||
|
results.append(("session", "<session>", None, f"Session error: {e}", 0.0))
|
||||||
|
|
||||||
|
# Try yt-dlp extraction if available
|
||||||
|
ytdlp_ok = False
|
||||||
|
ytdlp_msg = ""
|
||||||
|
chosen = query or "ytsearch1:never gonna give you up"
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
info = await _ytdlp_extract(loop, chosen)
|
||||||
|
if info:
|
||||||
|
ytdlp_ok = True
|
||||||
|
ytdlp_msg = f"Success: {info.get('title', 'unknown title')}"
|
||||||
|
else:
|
||||||
|
ytdlp_msg = "No info returned (possibly cookies required or blocked)."
|
||||||
|
except Exception as e:
|
||||||
|
ytdlp_msg = f"Error: {e}"
|
||||||
|
|
||||||
|
# Cookies configured?
|
||||||
|
cookies_file = os.getenv('YTDL_COOKIES_FILE')
|
||||||
|
cookies_b64 = bool(os.getenv('YTDL_COOKIES_B64'))
|
||||||
|
from_browser = os.getenv('YTDL_COOKIES_FROM_BROWSER')
|
||||||
|
|
||||||
|
desc_lines = []
|
||||||
|
for name, url, status, err, dt in results:
|
||||||
|
if status is not None:
|
||||||
|
desc_lines.append(f"• {name}: {status} ({dt:.0f} ms)")
|
||||||
|
else:
|
||||||
|
desc_lines.append(f"• {name}: ❌ {err or 'unknown error'}")
|
||||||
|
|
||||||
|
desc = "\n".join(desc_lines)
|
||||||
|
embed = discord.Embed(title="🌐 Web Test", description=desc, color=discord.Color.teal())
|
||||||
|
embed.add_field(name="yt-dlp", value=("✅ " + ytdlp_msg) if ytdlp_ok else ("❌ " + ytdlp_msg), inline=False)
|
||||||
|
|
||||||
|
cookie_status = []
|
||||||
|
if cookies_file and os.path.exists(cookies_file):
|
||||||
|
cookie_status.append(f"file: {cookies_file}")
|
||||||
|
elif cookies_file:
|
||||||
|
cookie_status.append(f"file missing: {cookies_file}")
|
||||||
|
if cookies_b64:
|
||||||
|
cookie_status.append("b64: provided")
|
||||||
|
if from_browser:
|
||||||
|
cookie_status.append(f"from-browser: {from_browser}")
|
||||||
|
if not cookie_status:
|
||||||
|
cookie_status.append("none")
|
||||||
|
embed.add_field(name="yt-dlp cookies", value=", ".join(cookie_status), inline=False)
|
||||||
|
|
||||||
|
await ctx.reply(embed=embed)
|
||||||
|
|
||||||
def _flag_from_iso2(code: str) -> Optional[str]:
|
def _flag_from_iso2(code: str) -> Optional[str]:
|
||||||
"""Return unicode flag from 2-letter ISO code (e.g., 'DE' -> 🇩🇪)."""
|
"""Return unicode flag from 2-letter ISO code (e.g., 'DE' -> 🇩🇪)."""
|
||||||
if not code or len(code) != 2:
|
if not code or len(code) != 2:
|
||||||
|
|||||||
Reference in New Issue
Block a user