modified: bot.py

Author: SimolZimol
Date: 2025-08-29 17:57:02 +02:00
parent 4c4c15f63e
commit 4e587a2cc5

bot.py (470 changed lines)

@@ -21,7 +21,6 @@ import hashlib
from datetime import datetime, timedelta
import concurrent.futures
from gtts import gTTS
import speech_recognition as sr
import shutil
from bs4 import BeautifulSoup
from dotenv import load_dotenv
@@ -2359,21 +2358,9 @@ async def process_ai_queue():
await channel.send(embed=embed)
if ctx.voice_client: # If bot is in a voice channel
try:
import shutil
# Check if FFmpeg is available
if not shutil.which("ffmpeg"):
await channel.send("⚠️ FFmpeg is not available. Audio playback disabled. (This should not happen in Docker deployment)")
else:
tts = gTTS(assistant_message, lang="en")
tts.save("response.mp3")
# Check if bot is still connected before playing
if ctx.voice_client and ctx.voice_client.is_connected():
ctx.voice_client.play(discord.FFmpegPCMAudio("response.mp3"))
await channel.send("🔊 Playing TTS audio...")
except Exception as e:
await channel.send(f"⚠️ TTS audio playback failed: {str(e)}")
user_history.append({"role": "assistant", "content": assistant_message})
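
Review note: this hunk drops the FFmpeg guard, the reconnect check, and the playback announcement from the AI-queue TTS path. A minimal sketch of what the surviving path reduces to (the `speak_reply` helper name is illustrative; `ctx` and `assistant_message` come from the surrounding queue code, and FFmpeg must still be on PATH for `FFmpegPCMAudio` even without the explicit `shutil.which` check):

```python
# Minimal sketch of the post-change TTS path (helper name is illustrative).
from gtts import gTTS
import discord

def speak_reply(ctx, assistant_message: str) -> None:
    if ctx.voice_client:  # only attempt playback if the bot is in a voice channel
        tts = gTTS(assistant_message, lang="en")
        tts.save("response.mp3")  # gTTS writes the synthesized speech as MP3
        # FFmpegPCMAudio still requires an ffmpeg binary on PATH at runtime
        ctx.voice_client.play(discord.FFmpegPCMAudio("response.mp3"))
```
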
@@ -2519,312 +2506,6 @@ async def leave(ctx):
else:
await ctx.send("I am not in a voice channel.")
@client.hybrid_command()
async def speech_to_text(ctx, *, audio_attachment: discord.Attachment = None):
"""Converts audio file to text using speech recognition.
Usage:
/speech_to_text [attach audio file]
Supported formats: .wav, .mp3, .m4a, .ogg, .flac
"""
# Check if it's a slash command and defer if needed
is_slash_command = hasattr(ctx, 'interaction') and ctx.interaction
if is_slash_command:
await ctx.defer()
# Helper function for sending responses
async def send_response(content=None, embed=None, ephemeral=False):
try:
if is_slash_command and hasattr(ctx, 'followup'):
if embed:
await ctx.followup.send(embed=embed, ephemeral=ephemeral)
else:
await ctx.followup.send(content, ephemeral=ephemeral)
else:
if embed:
await ctx.send(embed=embed)
else:
await ctx.send(content)
except Exception as e:
logger.error(f"Error sending response in speech_to_text: {e}")
# Fallback to regular send if followup fails
try:
if embed:
await ctx.send(embed=embed)
else:
await ctx.send(content)
except Exception as fallback_error:
logger.error(f"Fallback send also failed: {fallback_error}")
try:
# Get attachment (either from parameter or message)
attachment = audio_attachment
if not attachment and hasattr(ctx, 'message') and ctx.message and ctx.message.attachments:
attachment = ctx.message.attachments[0]
if not attachment:
embed = discord.Embed(
title="❌ No Audio File",
description="Please attach an audio file to convert to text.\n\nSupported formats: `.wav`, `.mp3`, `.m4a`, `.ogg`, `.flac`",
color=0xff0000
)
await send_response(embed=embed, ephemeral=True)
return
# Check file type
supported_formats = ['.wav', '.mp3', '.m4a', '.ogg', '.flac']
file_extension = os.path.splitext(attachment.filename)[1].lower()
if file_extension not in supported_formats:
embed = discord.Embed(
title="❌ Unsupported Format",
description=f"File format `{file_extension}` is not supported.\n\nSupported formats: {', '.join(supported_formats)}",
color=0xff0000
)
await send_response(embed=embed, ephemeral=True)
return
# Check file size (max 25MB for Discord, but we'll be more conservative)
max_size = 10 * 1024 * 1024 # 10MB
if attachment.size > max_size:
embed = discord.Embed(
title="❌ File Too Large",
description=f"File size ({attachment.size / 1024 / 1024:.1f}MB) exceeds maximum allowed size (10MB).",
color=0xff0000
)
await send_response(embed=embed, ephemeral=True)
return
# Download and process the audio file
embed = discord.Embed(
title="🎤 Processing Audio",
description="Downloading and converting audio to text...",
color=0x3498db
)
await send_response(embed=embed)
# Create temp directory if it doesn't exist
temp_dir = "temp_audio"
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
# Download file
temp_filename = f"{uuid.uuid4()}_{attachment.filename}"
temp_filepath = os.path.join(temp_dir, temp_filename)
audio_data = await attachment.read()
with open(temp_filepath, 'wb') as f:
f.write(audio_data)
# Initialize speech recognizer
recognizer = sr.Recognizer()
try:
# Convert audio to WAV if needed (speech_recognition works best with WAV)
wav_filepath = temp_filepath
if file_extension != '.wav':
from pydub import AudioSegment
wav_filepath = os.path.splitext(temp_filepath)[0] + '.wav'
audio = AudioSegment.from_file(temp_filepath)
audio.export(wav_filepath, format="wav")
# Process audio file
with sr.AudioFile(wav_filepath) as source:
# Adjust for ambient noise
recognizer.adjust_for_ambient_noise(source, duration=0.5)
audio = recognizer.record(source)
# Convert speech to text using Google Speech Recognition
try:
text = recognizer.recognize_google(audio, language='en-US')
if not text.strip():
embed = discord.Embed(
title="⚠️ No Speech Detected",
description="No speech was detected in the audio file. Please ensure the audio contains clear speech.",
color=0xffa500
)
await send_response(embed=embed)
return
# Create success embed
embed = discord.Embed(
title="🎤 Speech to Text Result",
color=0x00ff00,
timestamp=datetime.now()
)
# Truncate text if too long for embed
if len(text) > 1000:
embed.add_field(
name="📝 Transcribed Text",
value=text[:1000] + "...",
inline=False
)
embed.add_field(
name=" Note",
value=f"Text was truncated. Full length: {len(text)} characters.",
inline=False
)
else:
embed.add_field(
name="📝 Transcribed Text",
value=text,
inline=False
)
embed.add_field(
name="📊 File Info",
value=f"**Filename:** {attachment.filename}\n**Size:** {attachment.size / 1024:.1f} KB\n**Format:** {file_extension.upper()}",
inline=True
)
embed.set_footer(text=f"Processed by {ctx.author.display_name}")
await send_response(embed=embed)
# If text is very long, also send as text file
if len(text) > 1500:
text_filename = f"transcription_{ctx.author.id}_{int(time.time())}.txt"
text_filepath = os.path.join(temp_dir, text_filename)
with open(text_filepath, 'w', encoding='utf-8') as f:
f.write(f"Speech-to-Text Transcription\n")
f.write(f"Original file: {attachment.filename}\n")
f.write(f"Processed by: {ctx.author.display_name}\n")
f.write(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"\n{'='*50}\n\n")
f.write(text)
with open(text_filepath, 'rb') as f:
await ctx.send(
"📄 **Full transcription** (text too long for embed):",
file=discord.File(f, text_filename)
)
# Clean up text file
try:
os.remove(text_filepath)
except:
pass
except sr.UnknownValueError:
embed = discord.Embed(
title="❌ Speech Not Recognized",
description="Could not understand the speech in the audio file. Please ensure:\n• Audio is clear and not too noisy\n• Speech is in English\n• Audio quality is good",
color=0xff0000
)
await send_response(embed=embed)
except sr.RequestError as e:
embed = discord.Embed(
title="❌ Recognition Service Error",
description=f"Could not request results from speech recognition service: {e}",
color=0xff0000
)
await send_response(embed=embed)
finally:
# Clean up temporary files
try:
if os.path.exists(temp_filepath):
os.remove(temp_filepath)
if wav_filepath != temp_filepath and os.path.exists(wav_filepath):
os.remove(wav_filepath)
except Exception as e:
logger.warning(f"Could not clean up temp files: {e}")
except Exception as e:
logger.error(f"Error in speech_to_text command: {e}")
embed = discord.Embed(
title="❌ Error",
description="An error occurred while processing the audio file. Please try again.",
color=0xff0000
)
await send_response(embed=embed)
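
Review note: the entire `speech_to_text` command above is removed. For reference, the transcription pipeline it wrapped (pydub conversion to WAV, then Google recognition via `speech_recognition`) reduces to roughly the standalone sketch below; the `transcribe` helper name is illustrative, and the Discord plumbing, size checks, and embeds are omitted:

```python
# Standalone sketch of the removed pipeline: convert to WAV, then transcribe.
# pydub needs ffmpeg for non-WAV inputs; recognize_google calls a web service.
import os
import speech_recognition as sr
from pydub import AudioSegment

def transcribe(path: str, language: str = "en-US") -> str:
    wav_path = path
    if os.path.splitext(path)[1].lower() != ".wav":
        wav_path = os.path.splitext(path)[0] + ".wav"
        AudioSegment.from_file(path).export(wav_path, format="wav")
    recognizer = sr.Recognizer()
    with sr.AudioFile(wav_path) as source:
        recognizer.adjust_for_ambient_noise(source, duration=0.5)
        audio = recognizer.record(source)
    try:
        return recognizer.recognize_google(audio, language=language)
    except sr.UnknownValueError:
        return ""  # no intelligible speech detected
```
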
@client.hybrid_command()
async def live_speech(ctx, duration: int = 10):
"""Records audio from your microphone and converts it to text.
Usage:
/live_speech [duration in seconds (default: 10, max: 30)]
Note: You need to be in a voice channel with the bot.
"""
# Check if it's a slash command and defer if needed
is_slash_command = hasattr(ctx, 'interaction') and ctx.interaction
if is_slash_command:
await ctx.defer()
# Helper function for sending responses
async def send_response(content=None, embed=None, ephemeral=False):
try:
if is_slash_command and hasattr(ctx, 'followup'):
if embed:
await ctx.followup.send(embed=embed, ephemeral=ephemeral)
else:
await ctx.followup.send(content, ephemeral=ephemeral)
else:
if embed:
await ctx.send(embed=embed)
else:
await ctx.send(content)
except Exception as e:
logger.error(f"Error sending response in live_speech: {e}")
# Fallback to regular send if followup fails
try:
if embed:
await ctx.send(embed=embed)
else:
await ctx.send(content)
except Exception as fallback_error:
logger.error(f"Fallback send also failed: {fallback_error}")
try:
# Validate duration
if duration < 1:
duration = 1
elif duration > 30:
duration = 30
# Check if user is in voice channel
if not ctx.author.voice:
embed = discord.Embed(
title="❌ Not in Voice Channel",
description="You need to be in a voice channel to use live speech recognition.",
color=0xff0000
)
await send_response(embed=embed, ephemeral=True)
return
# Start recording notification
embed = discord.Embed(
title="🎤 Live Speech Recognition",
description=f"🔴 **Recording for {duration} seconds...**\n\nSpeak clearly into your microphone!",
color=0xff0000
)
await send_response(embed=embed)
# Note: This is a simplified version as Discord bots can't directly access user microphones
# This would require a client-side application or different implementation
embed = discord.Embed(
title="🎤 Live Speech Recognition",
description="⚠️ **Feature Note**\n\nLive microphone recording requires additional setup. Use `/speech_to_text` with audio files instead.",
color=0xffa500
)
await send_response(embed=embed)
except Exception as e:
logger.error(f"Error in live_speech command: {e}")
embed = discord.Embed(
title="🎤 Live Speech Recognition",
description="❌ **Error occurred**\n\nCould not process live speech recognition.",
color=0xff0000
)
await send_response(embed=embed)
@client.hybrid_command()
async def toggle_feature(ctx, feature: str, state: str):
"""Allows admin to enable or disable bot features."""
@@ -5952,157 +5633,6 @@ async def contact_status(ctx):
await ctx.send(f"Error getting contact status: {e}") await ctx.send(f"Error getting contact status: {e}")
logger.error(f"Error in contact_status: {e}") logger.error(f"Error in contact_status: {e}")
@client.hybrid_command()
async def join_voice(ctx):
"""Join the voice channel that the user is currently in."""
# Check if it's a slash command and defer if needed
is_slash_command = hasattr(ctx, 'interaction') and ctx.interaction
if is_slash_command:
await ctx.defer()
# Helper function for sending responses
async def send_response(content=None, embed=None, ephemeral=False):
try:
if is_slash_command and hasattr(ctx, 'followup'):
if embed:
return await ctx.followup.send(embed=embed, ephemeral=ephemeral)
else:
return await ctx.followup.send(content, ephemeral=ephemeral)
else:
# Fallback for regular commands or if followup isn't available
if embed:
return await ctx.send(embed=embed)
else:
return await ctx.send(content)
except Exception as e:
# Final fallback
if embed:
return await ctx.send(embed=embed)
else:
return await ctx.send(content)
if ctx.author.voice is None:
await send_response("❌ You need to be in a voice channel for me to join!")
return
voice_channel = ctx.author.voice.channel
if ctx.voice_client is not None:
if ctx.voice_client.channel == voice_channel:
await send_response("🔊 I'm already in your voice channel!")
return
else:
await ctx.voice_client.move_to(voice_channel)
await send_response(f"🔊 Moved to **{voice_channel.name}**!")
return
try:
await voice_channel.connect()
await send_response(f"✅ Joined **{voice_channel.name}**! You can now use TTS features.")
except Exception as e:
await send_response(f"❌ Failed to join voice channel: {str(e)}")
@client.hybrid_command()
async def leave_voice(ctx):
"""Leave the current voice channel."""
# Check if it's a slash command and defer if needed
is_slash_command = hasattr(ctx, 'interaction') and ctx.interaction
if is_slash_command:
await ctx.defer()
# Helper function for sending responses
async def send_response(content=None, embed=None, ephemeral=False):
try:
if is_slash_command and hasattr(ctx, 'followup'):
if embed:
return await ctx.followup.send(embed=embed, ephemeral=ephemeral)
else:
return await ctx.followup.send(content, ephemeral=ephemeral)
else:
# Fallback for regular commands or if followup isn't available
if embed:
return await ctx.send(embed=embed)
else:
return await ctx.send(content)
except Exception as e:
# Final fallback
if embed:
return await ctx.send(embed=embed)
else:
return await ctx.send(content)
if ctx.voice_client is None:
await send_response("❌ I'm not in a voice channel!")
return
try:
await ctx.voice_client.disconnect()
await send_response("👋 Left the voice channel!")
except Exception as e:
await send_response(f"❌ Failed to leave voice channel: {str(e)}")
@client.hybrid_command()
async def test_tts(ctx, *, text: str = "Hello! This is a TTS test."):
"""Test TTS functionality with custom text."""
# Check if it's a slash command and defer if needed
is_slash_command = hasattr(ctx, 'interaction') and ctx.interaction
if is_slash_command:
await ctx.defer()
# Helper function for sending responses
async def send_response(content=None, embed=None, ephemeral=False):
try:
if is_slash_command and hasattr(ctx, 'followup'):
if embed:
return await ctx.followup.send(embed=embed, ephemeral=ephemeral)
else:
return await ctx.followup.send(content, ephemeral=ephemeral)
else:
# Fallback for regular commands or if followup isn't available
if embed:
return await ctx.send(embed=embed)
else:
return await ctx.send(content)
except Exception as e:
# Final fallback
if embed:
return await ctx.send(embed=embed)
else:
return await ctx.send(content)
if ctx.voice_client is None:
await send_response("❌ I need to be in a voice channel first! Use `/join_voice` to get me connected.")
return
if len(text) > 500:
await send_response("❌ Text is too long! Please keep it under 500 characters.")
return
try:
import shutil
from gtts import gTTS
# Check if FFmpeg is available
if not shutil.which("ffmpeg"):
await send_response("⚠️ FFmpeg is not available. (This should not happen in Docker deployment)")
return
# Generate TTS audio
await send_response("🔄 Generating TTS audio...")
tts = gTTS(text, lang="en")
tts.save("tts_test.mp3")
# Check if bot is still connected before playing
if ctx.voice_client and ctx.voice_client.is_connected():
ctx.voice_client.play(discord.FFmpegPCMAudio("tts_test.mp3"))
await send_response(f"🔊 Playing TTS: \"{text[:100]}{'...' if len(text) > 100 else ''}\"")
else:
await send_response("❌ Lost connection to voice channel!")
except Exception as e:
await send_response(f"❌ TTS test failed: {str(e)}")
try:
# Initialize database tables
create_warnings_table()