new file: .dockerignore

new file:   .env.example
	new file:   Dockerfile
	new file:   app.py
	new file:   blueprints/__init__.py
	new file:   blueprints/auth.py
	new file:   blueprints/chat.py
	new file:   blueprints/context.py
	new file:   blueprints/documents.py
	new file:   blueprints/main.py
	new file:   config.py
	new file:   docker-compose.yml
	new file:   models/__init__.py
	new file:   models/chat_session.py
	new file:   models/document.py
	new file:   models/user.py
	new file:   requirements.txt
	new file:   services/__init__.py
	new file:   services/document_parser.py
	new file:   services/llm_service.py
	new file:   services/rag_service.py
	new file:   services/url_scraper.py
	new file:   static/css/style.css
	new file:   static/js/chat.js
	new file:   static/js/inline_chat.js
	new file:   static/js/main.js
	new file:   templates/base.html
	new file:   templates/document_view.html
	new file:   templates/index.html
	new file:   templates/login.html
	new file:   templates/register.html
This commit is contained in:
SimolZimol
2026-05-22 16:03:50 +02:00
commit 939cc13689
31 changed files with 2025 additions and 0 deletions

1
services/__init__.py Normal file
View File

@@ -0,0 +1 @@
# services package

View File

@@ -0,0 +1,32 @@
import os
import io
def parse_document(file_path: str, file_type: str) -> str:
"""Extract plain text from a document file."""
ext = file_type.lower().lstrip(".")
if ext == "txt" or ext == "md":
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
return f.read()
if ext == "pdf":
return _parse_pdf(file_path)
if ext == "docx":
return _parse_docx(file_path)
raise ValueError(f"Unsupported file type: {ext}")
def _parse_pdf(file_path: str) -> str:
from pdfminer.high_level import extract_text
text = extract_text(file_path)
return text or ""
def _parse_docx(file_path: str) -> str:
from docx import Document
doc = Document(file_path)
paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
return "\n".join(paragraphs)

109
services/llm_service.py Normal file
View File

@@ -0,0 +1,109 @@
"""
Abstracted LLM service.
Supports AI_PROVIDER=lmstudio (default) or AI_PROVIDER=openai.
Both use the openai Python client — only base_url / api_key differ.
"""
from typing import Optional
from flask import current_app
_client = None
def _get_client():
global _client
if _client is not None:
return _client
import openai
provider = current_app.config.get("AI_PROVIDER", "lmstudio").lower()
if provider == "openai":
api_key = current_app.config.get("OPENAI_API_KEY", "")
_client = openai.OpenAI(api_key=api_key)
else:
# LM Studio or any OpenAI-compatible endpoint
base_url = current_app.config.get("LM_STUDIO_URL", "http://localhost:1234")
_client = openai.OpenAI(
base_url=f"{base_url.rstrip('/')}/v1",
api_key="lm-studio",
)
return _client
def _get_model() -> str:
provider = current_app.config.get("AI_PROVIDER", "lmstudio").lower()
if provider == "openai":
return current_app.config.get("OPENAI_MODEL", "gpt-4o")
return current_app.config.get("LM_STUDIO_MODEL", "local-model")
def ask(
user_message: str,
context_chunks: Optional[list[str]] = None,
history: Optional[list[dict]] = None,
system_extra: Optional[str] = None,
) -> str:
"""
Send a message to the LLM with optional RAG context and chat history.
Returns the assistant reply as a string.
"""
client = _get_client()
model = _get_model()
system_parts = [
"You are a helpful AI assistant. Answer questions accurately based on the provided context.",
"If the context does not contain enough information, say so clearly.",
]
if context_chunks:
context_text = "\n\n---\n\n".join(context_chunks)
system_parts.append(f"\n\n## Context\n\n{context_text}")
if system_extra:
system_parts.append(system_extra)
messages = [{"role": "system", "content": "\n".join(system_parts)}]
if history:
messages.extend(history)
messages.append({"role": "user", "content": user_message})
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0.7,
)
return response.choices[0].message.content.strip()
def ask_inline(selected_text: str, question: str) -> str:
"""
Inline chat: use selected_text directly as context — no RAG lookup.
"""
system = (
"You are a helpful AI assistant. The user has selected the following text "
"and has a question about it. Answer specifically about the selected content."
)
messages = [
{"role": "system", "content": system},
{
"role": "user",
"content": f"## Selected text\n\n{selected_text}\n\n## Question\n\n{question}",
},
]
client = _get_client()
model = _get_model()
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0.7,
)
return response.choices[0].message.content.strip()

135
services/rag_service.py Normal file
View File

@@ -0,0 +1,135 @@
"""
RAG service using ChromaDB + sentence-transformers.
Each chunk is stored with metadata: user_id, source_id, source_type (doc|url).
"""
import os
import re
from typing import Optional
from flask import current_app
_chroma_client = None
_collection = None
_embedder = None
def _get_embedder():
global _embedder
if _embedder is None:
from sentence_transformers import SentenceTransformer
cache = current_app.config.get("TRANSFORMERS_CACHE", ".cache")
_embedder = SentenceTransformer("all-MiniLM-L6-v2", cache_folder=cache)
return _embedder
def _get_collection():
global _chroma_client, _collection
if _collection is None:
import chromadb
path = current_app.config["VECTORDB_PATH"]
_chroma_client = chromadb.PersistentClient(path=path)
_collection = _chroma_client.get_or_create_collection(
name="ki_context",
metadata={"hnsw:space": "cosine"},
)
return _collection
def chunk_text(text: str, chunk_size: int, overlap: int) -> list[str]:
"""Split text into overlapping word-based chunks."""
words = text.split()
chunks = []
start = 0
while start < len(words):
end = start + chunk_size
chunks.append(" ".join(words[start:end]))
start += chunk_size - overlap
return [c for c in chunks if c.strip()]
def index_source(
text: str,
user_id: int,
source_id: int,
source_type: str, # "doc" | "url"
chunk_size: int = 500,
chunk_overlap: int = 50,
):
"""Chunk, embed and store text in ChromaDB. Replaces existing chunks for this source."""
collection = _get_collection()
embedder = _get_embedder()
# Remove old chunks for this source first
delete_source(user_id, source_id, source_type)
chunks = chunk_text(text, chunk_size, chunk_overlap)
if not chunks:
return
embeddings = embedder.encode(chunks, show_progress_bar=False).tolist()
ids = [f"{source_type}_{source_id}_chunk_{i}" for i in range(len(chunks))]
metadatas = [
{"user_id": str(user_id), "source_id": str(source_id), "source_type": source_type}
for _ in chunks
]
collection.add(documents=chunks, embeddings=embeddings, ids=ids, metadatas=metadatas)
def delete_source(user_id: int, source_id: int, source_type: str):
"""Remove all chunks belonging to a source from ChromaDB."""
collection = _get_collection()
try:
collection.delete(
where={
"$and": [
{"user_id": {"$eq": str(user_id)}},
{"source_id": {"$eq": str(source_id)}},
{"source_type": {"$eq": source_type}},
]
}
)
except Exception:
pass
def similarity_search(
query: str,
user_id: int,
source_ids: Optional[list[int]] = None,
source_type: Optional[str] = None,
top_k: int = 5,
) -> list[str]:
"""
Search for relevant chunks.
Optionally filter by specific source_ids and/or source_type.
Returns list of chunk texts.
"""
collection = _get_collection()
embedder = _get_embedder()
query_embedding = embedder.encode([query], show_progress_bar=False).tolist()[0]
# Build where filter
conditions = [{"user_id": {"$eq": str(user_id)}}]
if source_ids is not None and len(source_ids) > 0:
conditions.append(
{"source_id": {"$in": [str(sid) for sid in source_ids]}}
)
if source_type:
conditions.append({"source_type": {"$eq": source_type}})
where = {"$and": conditions} if len(conditions) > 1 else conditions[0]
try:
results = collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
where=where,
)
return results["documents"][0] if results["documents"] else []
except Exception:
return []

31
services/url_scraper.py Normal file
View File

@@ -0,0 +1,31 @@
import requests
from bs4 import BeautifulSoup
def scrape_url(url: str, timeout: int = 15) -> tuple[str, str]:
"""
Fetch a URL and return (title, plain_text).
Raises requests.RequestException on network errors.
"""
headers = {
"User-Agent": (
"Mozilla/5.0 (compatible; KIContextTool/1.0; "
"+https://github.com/user/ki-context-tool)"
)
}
resp = requests.get(url, headers=headers, timeout=timeout)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
# Remove script/style noise
for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
tag.decompose()
title = soup.title.string.strip() if soup.title and soup.title.string else url
# Extract readable text
text = soup.get_text(separator="\n", strip=True)
# Collapse excessive blank lines
lines = [line for line in text.splitlines() if line.strip()]
return title, "\n".join(lines)