Files
Discord-ai-chatbot/app.py
SimolZimol d59e1bff21 modified: app.py
modified:   templates/logs.html
2024-09-03 13:16:18 +02:00

193 lines
6.1 KiB
Python

from flask import Flask, render_template, redirect, url_for, request, session, jsonify, send_file
import os
import subprocess
import psutil
import mysql.connector
from datetime import datetime
app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY", "default_secret_key")
LOG_FILE_PATH = os.path.join("logs", f"{datetime.now().strftime('%Y-%m-%d')}.log")
# Verwende Umgebungsvariablen aus Coolify für die Datenbankverbindung
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_DATABASE")
# Globale Variablen für die Intros
INTRO_FILE = "introduction.txt"
ASKNOTES_INTRO_FILE = "asknotesintro.txt"
# Speichern der Prozess-ID
bot_process = None
def bot_status():
"""Überprüft, ob der Bot läuft."""
global bot_process
if bot_process is None:
return False
return bot_process.poll() is None # None bedeutet, dass der Prozess noch läuft
def start_bot():
"""Startet den Bot."""
global bot_process
if not bot_status():
bot_process = subprocess.Popen(["python", "bot.py"], cwd=os.path.dirname(os.path.abspath(__file__)))
else:
print("Bot läuft bereits.")
def stop_bot():
"""Stoppt den Bot."""
global bot_process
if bot_process and bot_status():
bot_process.terminate()
bot_process.wait() # Warten, bis der Prozess beendet ist
bot_process = None
else:
print("Bot läuft nicht.")
def load_text_file(file_path):
"""Lädt den Inhalt einer Textdatei."""
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
return ""
def save_text_file(file_path, content):
"""Speichert den Inhalt in einer Textdatei."""
with open(file_path, 'w', encoding='utf-8') as file:
file.write(content)
def get_db_connection():
"""Stellt eine Verbindung zur MySQL-Datenbank her."""
return mysql.connector.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASS,
database=DB_NAME
)
@app.route("/")
def index():
if "username" in session:
return render_template("index.html", bot_running=bot_status())
return redirect(url_for("login"))
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
if username == os.getenv("ADMIN_USER") and password == os.getenv("ADMIN_PASS"):
session["username"] = username
return redirect(url_for("index"))
else:
return "Invalid credentials!"
return render_template("login.html")
@app.route("/logout")
def logout():
session.pop("username", None)
return redirect(url_for("login"))
@app.route("/start_bot")
def start():
if "username" in session:
start_bot()
return redirect(url_for("index"))
return redirect(url_for("login"))
@app.route("/stop_bot")
def stop():
if "username" in session:
stop_bot()
return redirect(url_for("index"))
return redirect(url_for("login"))
@app.route("/settings", methods=["GET", "POST"])
def settings():
if "username" in session:
if request.method == "POST":
introduction = request.form.get("introduction")
asknotes_introduction = request.form.get("asknotes_introduction")
# Speichern der Intros
save_text_file(INTRO_FILE, introduction)
save_text_file(ASKNOTES_INTRO_FILE, asknotes_introduction)
return redirect(url_for("settings"))
# Laden der aktuellen Inhalte aus den Textdateien
introduction = load_text_file(INTRO_FILE)
asknotes_introduction = load_text_file(ASKNOTES_INTRO_FILE)
return render_template("settings.html", introduction=introduction, asknotes_introduction=asknotes_introduction)
return redirect(url_for("login"))
@app.route("/users")
def users():
"""Zeigt eine Liste aller Benutzer aus der Datenbank an."""
if "username" in session:
connection = get_db_connection()
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT * FROM user_data")
users = cursor.fetchall()
cursor.close()
connection.close()
return render_template("users.html", users=users)
return redirect(url_for("login"))
@app.route("/logs")
def view_logs():
"""Zeigt die Logs des Bots im Admin-Panel an."""
if "username" in session:
return render_template("logs.html")
return redirect(url_for("login"))
@app.route("/get_logs")
def get_logs():
"""Liest den Inhalt der Log-Datei und gibt ihn zurück."""
if "username" in session:
try:
with open(LOG_FILE_PATH, 'r', encoding='utf-8') as file:
logs = file.read()
return jsonify({"logs": logs})
except FileNotFoundError:
return jsonify({"logs": "Log file not found."})
return redirect(url_for("login"))
@app.route("/download_logs")
def download_logs():
"""Bietet die Log-Datei zum Download an."""
if "username" in session:
return send_file(LOG_FILE_PATH, as_attachment=True)
return redirect(url_for("login"))
import shutil
ARCHIVE_DIR = "archive_logs"
if not os.path.exists(ARCHIVE_DIR):
os.makedirs(ARCHIVE_DIR)
@app.route("/archive_logs", methods=["POST"])
def archive_logs():
"""Archiviert die aktuelle Log-Datei und beginnt eine neue Log-Datei."""
if "username" in session:
if os.path.exists(LOG_FILE_PATH):
timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
archive_file = os.path.join(ARCHIVE_DIR, f"log_{timestamp}.log")
shutil.move(LOG_FILE_PATH, archive_file)
with open(LOG_FILE_PATH, 'w', encoding='utf-8') as file:
file.write("") # Neue leere Log-Datei starten
return jsonify({"status": "success", "message": "Logs archived successfully."})
else:
return jsonify({"status": "error", "message": "Log file not found."})
return redirect(url_for("login"))
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=True)