Files
wetter/app.py
simon 0baf55d8bc modified: app.py
modified:   templates/base.html
	modified:   templates/index.html
2026-04-27 11:20:01 +02:00

984 lines
41 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
from flask import Flask, render_template, request, jsonify, redirect, make_response, g
from translations import TRANSLATIONS
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut
import math
import pandas as pd
import datetime as _dt
import logging
from collections import deque
from threading import Lock
from time import time
from wetterdienst.provider.dwd.mosmix import DwdMosmixRequest
from cachetools import TTLCache
from astral import LocationInfo
from astral.sun import sun as astral_sun
import requests as _requests
app = Flask(__name__)
app.logger.setLevel(logging.INFO)
@app.before_request
def _set_language():
# URL parameter ?lang= takes priority over cookie
lang = request.args.get("lang") or request.cookies.get("lang", "de")
if lang not in TRANSLATIONS:
lang = "de"
g.lang = lang
g.T = TRANSLATIONS[lang]
@app.after_request
def _persist_language_cookie(response):
"""If ?lang= was in the URL, save it as a cookie so it survives navigation."""
url_lang = request.args.get("lang")
if url_lang in TRANSLATIONS:
response.set_cookie("lang", url_lang, max_age=60*60*24*365, samesite="Lax")
return response
@app.context_processor
def _inject_i18n():
return {"T": g.get("T", TRANSLATIONS["de"]), "lang": g.get("lang", "de")}
_GEOLOCATOR = Nominatim(user_agent="skywatcher-app/1.0")
# ── Zeitzone ────────────────────────────────────────────────────────────
try:
import zoneinfo
BERLIN = zoneinfo.ZoneInfo("Europe/Berlin")
except ImportError:
import pytz
BERLIN = pytz.timezone("Europe/Berlin")
# ── Cache: Forecasts 45 Min, DWD-Warnungen 15 Min ────────────────────────────
_forecast_cache = TTLCache(maxsize=64, ttl=2700)
_warn_cache = TTLCache(maxsize=32, ttl=900)
_suggest_cache = TTLCache(maxsize=256, ttl=900)
_sun_cache: dict = {} # (lat_r, lon_r, date) -> (naive_sunrise_ts, naive_sunset_ts)
# API protection: simple in-memory rate limiting by client IP.
_suggest_rate_lock = Lock()
_suggest_rate_hits = {}
_SUGGEST_RATE_WINDOW_SECONDS = 60
_SUGGEST_RATE_MAX_REQUESTS = 30
STATE_ALIASES = {
"baden-wurttemberg": {"baden-wurttemberg", "baden wuerttemberg", "bw"},
"bayern": {"bayern", "by"},
"berlin": {"berlin", "be"},
"brandenburg": {"brandenburg", "bb"},
"bremen": {"bremen", "hb"},
"hamburg": {"hamburg", "hh"},
"hessen": {"hessen", "he"},
"mecklenburg-vorpommern": {"mecklenburg-vorpommern", "mecklenburg vorpommern", "mv"},
"niedersachsen": {"niedersachsen", "ni"},
"nordrhein-westfalen": {"nordrhein-westfalen", "nordrhein westfalen", "nrw", "nw"},
"rheinland-pfalz": {"rheinland-pfalz", "rheinland pfalz", "rp"},
"saarland": {"saarland", "sl"},
"sachsen": {"sachsen", "sn"},
"sachsen-anhalt": {"sachsen-anhalt", "sachsen anhalt", "st"},
"schleswig-holstein": {"schleswig-holstein", "schleswig holstein", "sh"},
"thuringen": {"thuringen", "thueringen", "th"},
}
# ── MOSMIX Parameter ─────────────────────────────────────────────────────────
MOSMIX_PARAMS = [
"hourly/large/temperature_air_mean_2m",
"hourly/large/wind_speed",
"hourly/large/wind_direction",
"hourly/large/wind_gust_max_last_1h",
"hourly/large/cloud_cover_total",
"hourly/large/pressure_air_site_reduced",
"hourly/large/precipitation_height_last_1h",
"hourly/large/sunshine_duration",
"hourly/large/probability_precipitation_height_gt_0_1mm_last_1h",
"hourly/large/uv_index",
"hourly/large/visibility",
"hourly/large/weather_significant",
]
def _get_berlin():
try:
import zoneinfo
return zoneinfo.ZoneInfo("Europe/Berlin")
except ImportError:
import pytz
return pytz.timezone("Europe/Berlin")
_TF = None # TimezoneFinder singleton
def _get_location_tz(lat, lon):
"""Return (tzinfo, tz_name) for the given coordinates using timezonefinder."""
global _TF
try:
if _TF is None:
from timezonefinder import TimezoneFinder
_TF = TimezoneFinder()
tz_name = _TF.timezone_at(lat=lat, lng=lon)
if tz_name:
try:
import zoneinfo
return zoneinfo.ZoneInfo(tz_name), tz_name
except ImportError:
import pytz
return pytz.timezone(tz_name), tz_name
except Exception:
pass
return _get_berlin(), "Europe/Berlin"
def _normalize_text(value):
if not value:
return ""
text = str(value).strip().lower()
replacements = {
"ä": "a",
"ö": "o",
"ü": "u",
"ß": "ss",
}
for src, dst in replacements.items():
text = text.replace(src, dst)
return " ".join(text.split())
def _state_tokens(state_name):
normalized = _normalize_text(state_name)
if not normalized:
return set()
for _, aliases in STATE_ALIASES.items():
if normalized in aliases:
return aliases
return {normalized}
def _warning_matches_state(state_hint, warning_state, warning_state_short):
hint_tokens = _state_tokens(state_hint)
warning_tokens = _state_tokens(warning_state)
warning_short = _normalize_text(warning_state_short)
if warning_short:
warning_tokens.add(warning_short)
return bool(hint_tokens and warning_tokens and hint_tokens.intersection(warning_tokens))
def _extract_state_from_location(loc):
try:
return (loc.raw or {}).get("address", {}).get("state")
except Exception:
return None
def _extract_location_names(loc):
"""Return a list of normalised name strings (city, county, …) from a Nominatim result
for matching against DWD warning regionName fields."""
names = []
try:
raw = (loc.raw or {}).get("address", {})
for key in ("city", "town", "village", "hamlet", "municipality",
"city_district", "county", "district", "suburb", "borough"):
val = raw.get(key)
if val:
n = _normalize_text(val)
if n and len(n) >= 3 and n not in names:
names.append(n)
except Exception:
pass
return names
def _client_ip(req):
forwarded = req.headers.get("X-Forwarded-For", "").strip()
if forwarded:
return forwarded.split(",")[0].strip()
return req.remote_addr or "unknown"
def _is_suggest_rate_limited(client_ip):
now = time()
with _suggest_rate_lock:
hits = _suggest_rate_hits.get(client_ip)
if hits is None:
hits = deque()
_suggest_rate_hits[client_ip] = hits
cutoff = now - _SUGGEST_RATE_WINDOW_SECONDS
while hits and hits[0] < cutoff:
hits.popleft()
if len(hits) >= _SUGGEST_RATE_MAX_REQUESTS:
return True
hits.append(now)
if len(hits) == 1:
stale_ips = [ip for ip, values in _suggest_rate_hits.items() if not values or values[-1] < cutoff]
for stale_ip in stale_ips:
_suggest_rate_hits.pop(stale_ip, None)
return False
def geocode_location(query):
try:
loc = _GEOLOCATOR.geocode(query, language="de", addressdetails=True, timeout=10)
if loc:
return (loc.latitude, loc.longitude, loc.address,
_extract_state_from_location(loc), _extract_location_names(loc))
except GeocoderTimedOut:
pass
except Exception:
app.logger.exception("Geocoding failed for query '%s'", query)
return None, None, None, None, []
def haversine(lat1, lon1, lat2, lon2):
R = 6371
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat/2)**2 + math.cos(math.radians(lat1))*math.cos(math.radians(lat2))*math.sin(dlon/2)**2)
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
def _isnan(v):
try:
return math.isnan(float(v))
except (TypeError, ValueError):
return True
def _round_temp(k):
if k is None or _isnan(k):
return None
return round(float(k), 1)
def _clamp(value, min_value, max_value):
return max(min_value, min(max_value, value))
def _estimate_uv(dt_local, lat, cloud_pct):
"""Rough UV index estimate for Central Europe when the MOSMIX station doesn't
provide the UVI parameter. Uses time-of-day, season, latitude, and cloud cover."""
hour = dt_local.hour + dt_local.minute / 60.0
if hour < 5.5 or hour > 20.5:
return 0.0
noon = 13.0 # approximate solar noon in Germany (CET)
hour_factor = max(0.0, math.cos(math.pi * (hour - noon) / 14.0))
# Typical clear-sky peak UV at solar noon for ~51°N, JanDec
monthly_peak = [1.0, 1.8, 3.5, 5.0, 6.5, 7.5, 7.2, 6.2, 4.5, 2.5, 1.2, 0.8]
seasonal = monthly_peak[dt_local.month - 1]
lat_factor = max(0.5, 1.0 - (float(lat) - 51.0) * 0.015)
cloud_factor = 1.0 - (cloud_pct or 0) / 100.0 * 0.75
uv = seasonal * hour_factor * lat_factor * cloud_factor
return round(max(0.0, uv), 1)
def uv_risk_info(uv_index):
if uv_index is None:
return "", "na"
uv = float(uv_index)
if uv < 3:
return "niedrig", "low"
if uv < 6:
return "moderat", "moderate"
if uv < 8:
return "hoch", "high"
if uv < 11:
return "sehr hoch", "very-high"
return "extrem", "extreme"
def hour_confidence_score(temp_c, precip_mm, rain_prob, wind_kmh, gust_kmh, cloud_pct):
score = 100
if temp_c is None:
score -= 18
if precip_mm is not None:
score -= _clamp(precip_mm * 20, 0, 40)
if rain_prob is not None:
score -= _clamp(rain_prob * 0.35, 0, 30)
if wind_kmh is not None:
score -= _clamp((wind_kmh - 25) * 0.6, 0, 18)
if gust_kmh is not None:
score -= _clamp((gust_kmh - 45) * 0.45, 0, 16)
if cloud_pct is not None:
score -= _clamp((cloud_pct - 85) * 0.4, 0, 8)
score = int(round(_clamp(score, 5, 99)))
if score >= 80:
return score, "hoch"
if score >= 60:
return score, "mittel"
return score, "niedrig"
def activity_score(temp_c, precip_mm, rain_prob, wind_kmh, gust_kmh, uv_index):
score = 100.0
if temp_c is not None:
score -= abs(temp_c - 20) * 3.5
if precip_mm is not None:
score -= _clamp(precip_mm * 35, 0, 45)
if rain_prob is not None:
score -= _clamp(rain_prob * 0.45, 0, 35)
if wind_kmh is not None:
score -= _clamp((wind_kmh - 18) * 0.7, 0, 16)
if gust_kmh is not None:
score -= _clamp((gust_kmh - 35) * 0.55, 0, 12)
if uv_index is not None and uv_index > 6:
score -= _clamp((uv_index - 6) * 6, 0, 16)
return int(round(_clamp(score, 0, 100)))
def best_activity_window(forecast, horizon_hours=24, window_size=2):
hours = forecast[:horizon_hours]
if len(hours) < window_size:
return None
best = None
for i in range(0, len(hours) - window_size + 1):
segment = hours[i:i + window_size]
scores = [h.get("activity_score") for h in segment if h.get("activity_score") is not None]
if not scores:
continue
avg_score = round(sum(scores) / len(scores))
if best is None or avg_score > best["score"]:
best = {
"start": segment[0]["datetime"],
"end": segment[-1]["datetime"],
"score": int(avg_score),
}
return best
def pressure_trend_info(forecast, step_hours=6):
if len(forecast) <= step_hours:
return None, None
p0 = forecast[0].get("pressure_hpa")
p1 = forecast[step_hours].get("pressure_hpa")
if p0 is None or p1 is None:
return None, None
delta = round(p1 - p0, 1)
if delta >= 1.5:
return delta, "steigend"
if delta <= -1.5:
return delta, "fallend"
return delta, "stabil"
def temp_trend_info(forecast, step_hours=6):
if len(forecast) <= step_hours:
return None, None
t0 = forecast[0].get("temp_c")
t1 = forecast[step_hours].get("temp_c")
if t0 is None or t1 is None:
return None, None
delta = round(t1 - t0, 1)
if delta >= 1.0:
return delta, "wärmer"
if delta <= -1.0:
return delta, "kälter"
return delta, "konstant"
def _parse_warning_datetime(value):
if value in (None, ""):
return None
try:
if isinstance(value, (int, float)):
ts = pd.Timestamp(value, unit="ms", tz="UTC")
else:
v = str(value).strip()
if v.isdigit():
ts = pd.Timestamp(int(v), unit="ms", tz="UTC")
else:
ts = pd.Timestamp(v)
if ts.tzinfo is None:
ts = ts.tz_localize("UTC")
return ts.tz_convert(_get_berlin()).tz_localize(None).to_pydatetime()
except Exception:
return None
# ── Icon key → static/icons/{key}.png ──────────────────────────────────────
# sonne clear day
# wolkig(2) sun + one cloud (partly cloudy)
# wolke single cloud (mostly cloudy)
# wolkig two clouds (overcast)
# wolkig(1) sun+cloud+rain (showers)
# regen heavy rain
# schnee snow
# blitz dry thunderstorm (WW 17, 91-92)
# sturm thunderstorm with rain (WW 95, 97-98)
# hagel hail (WW 27, 89-90, 93-94)
# schneebedeckt hail + snow (hagel WW + temp ≤ 2°C)
# sturmundhagel thunderstorm + rain + hail (WW 96, 99)
# schneebedecktsonne snow + sunshine (temp ≤ 2°C, cloud < 60 %)
# nebel dense fog (visibility < 1000 m, day)
# nebel_nacht dense fog (visibility < 1000 m, night)
# nebel_wolkig foggy + overcast (1000-5000 m, cloud > 60 %)
# wolkig_nebel_sonne patchy fog / haze (1000-5000 m, less cloud)
# nacht clear night (moon)
# nacht(1) moon behind cloud
# nacht(2) moon+cloud+rain
# nacht(3) moon+cloud+snow
_WW_HAIL = {27, 89, 90, 93, 94} # hail showers (no thunder)
_WW_GRAUPEL = {87, 88} # graupel → schneebedeckt
_WW_THUNDER_HAIL = {96, 99} # thunderstorm + hail → sturmundhagel
_WW_THUNDER_RAIN = {91, 92, 95, 97, 98} # thunderstorm + precipitation
_WW_THUNDER_DRY = {17} # dry thunderstorm
def weather_icon(cloud_pct, precip_mm, rain_prob, temp_c, is_night=False, visibility_m=None, weather_code=None):
"""Return the icon key for static/icons/{key}.png."""
# Fog (takes priority; fog unlikely when precipitating heavily)
if visibility_m is not None and visibility_m < 5000 and not (precip_mm and precip_mm > 0.5):
if visibility_m < 1000:
return "nebel_nacht" if is_night else "nebel"
if cloud_pct is not None and cloud_pct > 60:
return "nebel_wolkig"
return "wolkig_nebel_sonne"
# ── WW-code based icons (thunderstorm / hail / graupel) ──────────
if weather_code is not None:
ww = int(weather_code)
if ww in _WW_THUNDER_HAIL:
return "sturmundhagel"
if ww in _WW_HAIL:
return "schneebedeckt" if (temp_c is not None and temp_c <= 2) else "hagel"
if ww in _WW_GRAUPEL:
return "schneebedeckt"
if ww in _WW_THUNDER_RAIN:
return "sturm"
if ww in _WW_THUNDER_DRY:
return "blitz"
# Snow / sleet (day and night)
if temp_c is not None and temp_c <= 2 and (
(precip_mm and precip_mm > 0.1) or (rain_prob and rain_prob >= 40)
):
if is_night:
return "nacht(3)"
return "schneebedecktsonne" if (cloud_pct is not None and cloud_pct < 60) else "schnee"
# ── Night icons ───────────────────────────────────────────────────
if is_night:
if (precip_mm and precip_mm > 0.1) or (rain_prob is not None and rain_prob >= 50):
return "nacht(2)"
if rain_prob is not None and rain_prob >= 30 and cloud_pct is not None and cloud_pct > 50:
return "nacht(2)"
if cloud_pct is not None:
if cloud_pct > 80: return "wolkig" # too cloudy to see moon
if cloud_pct > 30: return "nacht(1)"
return "nacht"
# ── Day icons ─────────────────────────────────────────────────────
if (precip_mm and precip_mm > 0.1) or (rain_prob is not None and rain_prob >= 50):
return "wolkig(1)" if (cloud_pct is None or cloud_pct < 75) else "regen"
if rain_prob is not None and rain_prob >= 30 and cloud_pct is not None and cloud_pct > 50:
return "wolkig(1)"
if cloud_pct is not None:
if cloud_pct > 80: return "wolkig"
if cloud_pct > 50: return "wolke"
if cloud_pct > 20: return "wolkig(2)"
return "sonne"
def pick_daily_icon(hours):
"""Choose the most representative icon key for a whole day."""
if not hours:
return "sonne"
# Thunderstorm / hail: any hour with matching WW code takes priority
ww_codes = [h.get("weather_code") for h in hours if h.get("weather_code") is not None]
if ww_codes:
has_t_hail = any(w in _WW_THUNDER_HAIL for w in ww_codes)
has_hail = any(w in _WW_HAIL for w in ww_codes)
has_graupel = any(w in _WW_GRAUPEL for w in ww_codes)
has_t_rain = any(w in _WW_THUNDER_RAIN for w in ww_codes)
has_t_dry = any(w in _WW_THUNDER_DRY for w in ww_codes)
avg_temp = sum(h["temp_c"] for h in hours if h.get("temp_c") is not None)
n_temp = sum(1 for h in hours if h.get("temp_c") is not None)
mean_temp = avg_temp / n_temp if n_temp else 10
if has_t_hail:
return "sturmundhagel"
if has_hail:
return "schneebedeckt" if mean_temp <= 2 else "hagel"
if has_graupel:
return "schneebedeckt"
if has_t_rain:
return "sturm"
if has_t_dry:
return "blitz"
# Fog: majority of hours with reduced visibility and no heavy precipitation
fog_hours = [
h for h in hours
if h.get("visibility_m") is not None
and h["visibility_m"] < 5000
and (h.get("precip_mm") or 0) <= 0.5
]
if len(fog_hours) >= len(hours) // 2 and fog_hours:
avg_vis = sum(h["visibility_m"] for h in fog_hours) / len(fog_hours)
avg_cloud_fog = sum(h.get("cloud_pct") or 0 for h in fog_hours) / len(fog_hours)
if avg_vis < 1000:
return "nebel"
if avg_cloud_fog > 60:
return "nebel_wolkig"
return "wolkig_nebel_sonne"
# Snow / sleet
if any(
h.get("temp_c") is not None and h["temp_c"] <= 2
and ((h.get("precip_mm") or 0) > 0.1 or (h.get("rain_prob") or 0) >= 40)
for h in hours
):
return "schnee"
clouds = [h.get("cloud_pct") for h in hours if h.get("cloud_pct") is not None]
avg_cloud = sum(clouds) / len(clouds) if clouds else 0
# Significant rain/showers
if any((h.get("precip_mm") or 0) >= 0.8 or (h.get("rain_prob") or 0) >= 65 for h in hours):
return "regen" if avg_cloud >= 75 else "wolkig(1)"
# Light showers
if any((h.get("precip_mm") or 0) > 0.1 or (h.get("rain_prob") or 0) >= 35 for h in hours):
return "wolkig(1)"
# Cloud cover
if avg_cloud > 80: return "wolkig"
if avg_cloud > 50: return "wolke"
if avg_cloud > 20: return "wolkig(2)"
return "sonne"
def feels_like(temp_c, wind_kmh, cloud_pct):
"""Apparent / perceived temperature.
* ≤10 °C + wind >4.8 km/h → Windchill (JAG/TI formula)
* ≥27 °C → Heat index (Rothfusz, RH 60 %)
* 1027 °C → Small sun/wind corrections:
- Very clear sky (<20 % clouds): up to +2 °C
- Notable wind (>20 km/h): up to -2 °C
Only reported when result differs by ≥1 °C from actual temp.
"""
if temp_c is None:
return None
# ── Wind chill (cold range) ───────────────────────────────────────
if temp_c <= 10 and wind_kmh is not None and wind_kmh > 4.8:
v = wind_kmh
wc = 13.12 + 0.6215*temp_c - 11.37*(v**0.16) + 0.3965*temp_c*(v**0.16)
return round(wc, 1)
# ── Heat index (hot range) ────────────────────────────────────────
if temp_c >= 27:
rh = 60
hi = (-8.78469475556 + 1.61139411*temp_c + 2.33854883889*rh
- 0.14611605*temp_c*rh - 0.012308094*temp_c**2
- 0.016424828*rh**2 + 0.002211732*temp_c**2*rh
+ 0.00072546*temp_c*rh**2 - 0.000003582*temp_c**2*rh**2)
return round(hi, 1)
# ── Mild range: conservative corrections only ─────────────────────
adjusted = float(temp_c)
# Sunshine: very clear sky (<20 % clouds) adds max +2 °C
if cloud_pct is not None and cloud_pct < 20:
adjusted += (20 - cloud_pct) / 20 * 2.0
# Wind: only meaningful wind (>20 km/h) cools, max -2 °C
if wind_kmh is not None and wind_kmh > 20:
adjusted -= _clamp((wind_kmh - 20) / 60 * 2.0, 0, 2.0)
result = round(adjusted, 1)
return result if abs(result - temp_c) >= 1.0 else temp_c
def get_sun_times(lat, lon, date=None):
try:
loc_tz, tz_name = _get_location_tz(lat, lon)
loc = LocationInfo(latitude=lat, longitude=lon, timezone=tz_name)
d = date or _dt.date.today()
s = astral_sun(loc.observer, date=d, tzinfo=loc_tz)
return (s["sunrise"].strftime("%H:%M"), s["sunset"].strftime("%H:%M"),
s["dawn"].strftime("%H:%M"), s["dusk"].strftime("%H:%M"))
except Exception:
return None, None, None, None
def _sunrise_sunset(lat, lon, d):
"""Return (naive_sunrise, naive_sunset) in location local time for date d.
Results are cached in _sun_cache to avoid recomputing for every forecast hour."""
key = (round(lat, 1), round(lon, 1), d)
if key in _sun_cache:
return _sun_cache[key]
try:
loc_tz, tz_name = _get_location_tz(lat, lon)
loc = LocationInfo(latitude=lat, longitude=lon, timezone=tz_name)
s = astral_sun(loc.observer, date=d, tzinfo=loc_tz)
sr = pd.Timestamp(s["sunrise"]).tz_convert(loc_tz).tz_localize(None)
ss = pd.Timestamp(s["sunset"]).tz_convert(loc_tz).tz_localize(None)
result = (sr, ss)
except Exception:
result = (None, None)
_sun_cache[key] = result
return result
def get_dwd_warnings(lat, lon, state_hint=None, location_names=None):
"""Fetch DWD warnings and filter by regionName matching the user's municipality/county.
The DWD JSON uses warncell-IDs as keys and each warning carries a ``regionName``
field (e.g. "Krefeld", "Kreis Kleve - Niederrhein", "Sauerland") that identifies
the affected area far more precisely than the federal-state field. We only include
a warning when its regionName contains at least one of the place names extracted from
the geocoded location (city, county, …). Warnings for neighbouring NRW regions
(Eifel, Sauerland, …) are therefore never shown for Krefeld.
Duplicates (same event type + onset + expires) are removed before returning.
"""
loc_names = location_names or []
key = (round(lat, 1), round(lon, 1))
if key in _warn_cache:
return _warn_cache[key]
try:
url = "https://www.dwd.de/DWD/warnungen/warnapp/json/warnings.json"
resp = _requests.get(url, timeout=8)
if resp.status_code != 200:
_warn_cache[key] = []
return []
text = resp.text
if text.startswith("warnWetter.loadWarnings("):
text = text[len("warnWetter.loadWarnings("):-2]
import json as _json
data = _json.loads(text)
matched = []
for region_warns in data.get("warnings", {}).values():
for w in (region_warns or []):
level = w.get("level", 0)
if level < 1:
continue
region_name = w.get("regionName", "") or ""
norm_region = _normalize_text(region_name)
# Require the user's city / county to appear in regionName
region_match = False
for name in loc_names:
if name and len(name) >= 3 and name in norm_region:
region_match = True
break
if not region_match:
continue
matched.append({
"level": level,
"type": w.get("event", ""),
"headline": w.get("headline", ""),
"description": w.get("description", ""),
"onset": w.get("onset", ""),
"expires": w.get("expires", ""),
})
# Deduplicate: same event type + onset time + expires time
seen = set()
deduped = []
for w in matched:
dk = (w["type"], w["onset"], w["expires"])
if dk not in seen:
seen.add(dk)
deduped.append(w)
deduped.sort(key=lambda x: x["level"], reverse=True)
result = deduped[:5]
_warn_cache[key] = result
return result
except Exception:
app.logger.exception("Could not load DWD warnings")
_warn_cache[key] = []
return []
def wind_direction_name(degrees):
if degrees is None or _isnan(degrees):
return ""
dirs = ["N","NNO","NO","ONO","O","OSO","SO","SSO","S","SSW","SW","WSW","W","WNW","NW","NNW"]
idx = round(float(degrees)/22.5) % 16
return dirs[idx]
def get_mosmix_forecast(lat, lon, hours=72, loc_tz=None):
cache_key = (round(lat,2), round(lon,2), hours)
if cache_key in _forecast_cache:
return _forecast_cache[cache_key]
try:
if loc_tz is None:
loc_tz, _ = _get_location_tz(lat, lon)
req = DwdMosmixRequest(parameters=MOSMIX_PARAMS)
nearest = req.filter_by_rank(latlon=(lat, lon), rank=1)
result = nearest.values.all()
df = result.df
if df is None or (hasattr(df,"__len__") and len(df)==0):
return [], {}
if hasattr(df,"to_pandas"): df = df.to_pandas()
station_info = {}
sdf = nearest.df
if sdf is not None and len(sdf) > 0:
if hasattr(sdf,"to_pandas"): sdf = sdf.to_pandas()
station_info = sdf.iloc[0].to_dict()
df = df.sort_values("date").copy()
min_date = df["date"].min()
cutoff = min_date + pd.Timedelta(hours=hours)
df = df[df["date"] <= cutoff]
forecast = []
for date_val, group in df.groupby("date"):
p = {row["parameter"]: row["value"] for _, row in group.iterrows()}
temp_c = _round_temp(p.get("temperature_air_mean_2m"))
ff = p.get("wind_speed")
wind_kmh = round(float(ff)*3.6,1) if not _isnan(ff) else None
fx1 = p.get("wind_gust_max_last_1h")
gust_kmh = round(float(fx1)*3.6,1) if not _isnan(fx1) else None
pppp = p.get("pressure_air_site_reduced")
pressure = round(float(pppp),1) if not _isnan(pppp) else None
rr1c = p.get("precipitation_height_significant_weather_last_1h")
rr1 = p.get("precipitation_height_last_1h")
prec_raw = rr1c if not _isnan(rr1c) else (rr1 if not _isnan(rr1) else None)
precip = round(float(prec_raw),1) if prec_raw is not None else 0.0
rprob = p.get("probability_precipitation_height_gt_0_1mm_last_1h")
rain_prob = round(float(rprob)*100) if not _isnan(rprob) else None
n = p.get("cloud_cover_total")
if not _isnan(n):
nf = float(n)
# wetterdienst returns cloud cover as a 0-1 fraction (like probabilities);
# guard against stations that already return 0-100 percent.
clouds = _clamp(round(nf * 100 if nf <= 1.0 else nf), 0, 100)
else:
clouds = None
sun = p.get("sunshine_duration")
sun_min = round(float(sun)/60) if not _isnan(sun) else 0
wd = p.get("wind_direction")
wind_dir = float(wd) if not _isnan(wd) else None
vis_raw = p.get("visibility")
visibility_m = round(float(vis_raw)) if not _isnan(vis_raw) else None
ww_raw = p.get("weather_significant")
weather_code = int(float(ww_raw)) if not _isnan(ww_raw) else None
uv_raw = p.get("uv_index")
dt_local = pd.Timestamp(date_val).tz_convert(loc_tz).tz_localize(None)
# Determine day/night for icon selection
_sr, _ss = _sunrise_sunset(lat, lon, dt_local.date())
is_night = bool(
_sr is not None and _ss is not None
and (pd.Timestamp(dt_local) < _sr or pd.Timestamp(dt_local) > _ss)
)
if not _isnan(uv_raw):
uv = round(float(uv_raw), 1)
else:
uv = _estimate_uv(dt_local, lat, clouds)
uv_label, uv_level = uv_risk_info(uv)
feels = feels_like(temp_c, wind_kmh, clouds)
confidence_score, confidence_label = hour_confidence_score(temp_c, precip, rain_prob, wind_kmh, gust_kmh, clouds)
a_score = activity_score(temp_c, precip, rain_prob, wind_kmh, gust_kmh, uv)
forecast.append({
"datetime": dt_local,
"temp_c": temp_c,
"feels_like": feels,
"wind_kmh": wind_kmh,
"gust_kmh": gust_kmh,
"pressure_hpa": pressure,
"precip_mm": precip,
"rain_prob": rain_prob,
"cloud_pct": clouds,
"sun_min": sun_min,
"wind_dir": wind_dir,
"uv_index": uv,
"uv_label": uv_label,
"uv_level": uv_level,
"confidence": confidence_score,
"confidence_label": confidence_label,
"activity_score": a_score,
"visibility_m": visibility_m,
"weather_code": weather_code,
"icon": weather_icon(clouds, precip, rain_prob, temp_c, is_night=is_night, visibility_m=visibility_m, weather_code=weather_code),
})
result_data = (forecast, station_info)
_forecast_cache[cache_key] = result_data
return result_data
except Exception:
app.logger.exception("MOSMIX forecast loading failed")
return [], {}
def filter_unrealistic_warnings(warnings, forecast, now_local=None):
"""Remove warnings that contradict the MOSMIX forecast.
Frost warnings are suppressed when the minimum forecast temperature within the
warning window (or next 48 h if the window is unknown) stays above 3 °C.
Rain / heavy-rain warnings are suppressed when neither significant precipitation
nor a meaningful rain probability is forecast.
Both onset/expires timestamps from DWD and forecast datetimes are compared as
naive Berlin-local datetimes so no tz-mismatch can cause an empty overlap window.
"""
if not warnings or not forecast:
return warnings
frost_keywords = {"frost", "glatte", "glatt", "eis", "schnee"}
rain_keywords = {"regen", "starkregen", "dauerregen"}
result = []
for w in warnings:
warn_type = _normalize_text(w.get("type", ""))
headline = _normalize_text(w.get("headline", ""))
onset_dt = _parse_warning_datetime(w.get("onset"))
expires_dt = _parse_warning_datetime(w.get("expires"))
# Discard already-expired warnings
if now_local is not None and expires_dt is not None and expires_dt < now_local:
app.logger.info("Discarded expired warning: %s (expires %s)", w.get("headline"), expires_dt)
continue
# Select forecast hours that fall within the warning window.
# Convert everything to pandas Timestamps (naive) so comparisons are type-safe.
if onset_dt is not None and expires_dt is not None:
ts_onset = pd.Timestamp(onset_dt)
ts_expires = pd.Timestamp(expires_dt)
relevant = [
h for h in forecast[:48]
if h.get("datetime") is not None
and ts_onset <= pd.Timestamp(h["datetime"]) <= ts_expires
]
# If the window is entirely in the future and forecast doesn't reach it,
# fall back to the full 48-h slice to be conservative.
if not relevant:
relevant = forecast[:48]
else:
relevant = forecast[:48]
temps = [h["temp_c"] for h in relevant if h.get("temp_c") is not None]
precips = [h["precip_mm"] for h in relevant if h.get("precip_mm") is not None]
rainprobs = [h["rain_prob"] for h in relevant if h.get("rain_prob") is not None]
min_temp = min(temps) if temps else None
max_precip = max(precips) if precips else 0.0
max_rain_prob = max(rainprobs) if rainprobs else 0
is_frost = any(kw in warn_type or kw in headline for kw in frost_keywords)
is_rain = any(kw in warn_type or kw in headline for kw in rain_keywords)
skip = False
if is_frost and min_temp is not None and min_temp > 3:
app.logger.info("Suppressed frost warning (min %.1f °C): %s", min_temp, w.get("headline"))
skip = True
elif is_rain and max_precip < 0.2 and max_rain_prob < 35:
app.logger.info("Suppressed rain warning (max_precip %.1f mm): %s", max_precip, w.get("headline"))
skip = True
if not skip:
w["onset_dt"] = onset_dt
w["expires_dt"] = expires_dt
result.append(w)
return result
@app.route("/set-lang")
def set_lang():
lang = request.args.get("lang", "de")
if lang not in TRANSLATIONS:
lang = "de"
dest = request.args.get("next") or request.referrer or "/"
resp = make_response(redirect(dest))
resp.set_cookie("lang", lang, max_age=60*60*24*365, samesite="Lax")
return resp
@app.route("/")
def index():
return render_template("index.html")
@app.route("/wetter", methods=["GET"])
def wetter():
ort = request.args.get("ort","").strip()
lat_param = request.args.get("lat")
lon_param = request.args.get("lon")
# Geolocation via Browser-Koordinaten
lat, lon, display_name, state_hint, location_names = None, None, None, None, []
if lat_param and lon_param:
try:
lat = float(lat_param)
lon = float(lon_param)
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
raise ValueError("Invalid coordinate range")
loc = _GEOLOCATOR.reverse((lat, lon), language="de", addressdetails=True, timeout=10)
display_name = loc.address if loc else f"{lat:.2f}, {lon:.2f}"
state_hint = _extract_state_from_location(loc) if loc else None
location_names = _extract_location_names(loc) if loc else []
if not ort or ort == "Mein Standort":
ort = display_name.split(",")[0]
except Exception:
app.logger.info("Invalid or unusable browser coordinates for /wetter")
lat, lon, display_name, state_hint, location_names = None, None, None, None, []
if lat is None:
if not ort:
return render_template("index.html", error=g.T["error_no_place"])
lat, lon, display_name, state_hint, location_names = geocode_location(ort)
if lat is None:
return render_template("index.html", error=g.T["error_place_not_found"].format(ort=ort))
loc_tz, location_tz = _get_location_tz(lat, lon)
forecast, mosmix_station = get_mosmix_forecast(lat, lon, hours=240, loc_tz=loc_tz)
if not forecast:
return render_template("index.html", error=g.T["error_no_data"])
station_name = mosmix_station.get("name", ort)
station_id = mosmix_station.get("station_id", "")
station_lat = float(mosmix_station.get("latitude", lat))
station_lon = float(mosmix_station.get("longitude", lon))
station_dist = round(haversine(lat, lon, station_lat, station_lon), 1)
now_local_dt = _dt.datetime.now(loc_tz)
now_local = now_local_dt.strftime("%H:%M")
now_loc_naive = now_local_dt.replace(minute=0, second=0, microsecond=0, tzinfo=None)
current_idx = 0
for i, h in enumerate(forecast):
dt = h["datetime"]
dt_naive = dt.replace(tzinfo=None) if hasattr(dt,"tzinfo") and dt.tzinfo is not None else dt
if dt_naive >= now_loc_naive:
current_idx = i
break
current = forecast[current_idx]
forecast = forecast[current_idx:]
sunrise, sunset, dawn, dusk = get_sun_times(lat, lon)
warnings = get_dwd_warnings(lat, lon, state_hint=state_hint, location_names=location_names)
warnings = filter_unrealistic_warnings(warnings, forecast, now_local=now_local_dt.replace(tzinfo=None))
pressure_delta, pressure_trend = pressure_trend_info(forecast)
temp_delta_6h, temp_trend_6h = temp_trend_info(forecast)
best_window = best_activity_window(forecast, horizon_hours=24, window_size=2)
daily = {}
for h in forecast:
dt = h["datetime"]
day = dt.date() if hasattr(dt,"date") else str(dt)[:10]
if day not in daily:
daily[day] = {"temps":[], "precip":0.0, "cloud":[], "wind":[], "icons":[], "rain_prob":[], "uv":[], "hours":[]}
daily[day]["hours"].append(h)
if h["temp_c"] is not None: daily[day]["temps"].append(h["temp_c"])
daily[day]["precip"] += h.get("precip_mm") or 0
if h["cloud_pct"] is not None: daily[day]["cloud"].append(h["cloud_pct"])
if h["wind_kmh"] is not None: daily[day]["wind"].append(h["wind_kmh"])
if h.get("rain_prob") is not None: daily[day]["rain_prob"].append(h["rain_prob"])
if h.get("uv_index") is not None: daily[day]["uv"].append(h["uv_index"])
daily[day]["icons"].append(h["icon"])
daily_summary = []
for day, d in daily.items():
daily_summary.append({
"date": day,
"temp_min": min(d["temps"]) if d["temps"] else None,
"temp_max": max(d["temps"]) if d["temps"] else None,
"precip": round(d["precip"],1),
"rain_prob": max(d["rain_prob"]) if d["rain_prob"] else None,
"cloud": round(sum(d["cloud"])/len(d["cloud"])) if d["cloud"] else None,
"wind_max": max(d["wind"]) if d["wind"] else None,
"uv_max": max(d["uv"]) if d["uv"] else None,
"icon": pick_daily_icon(d["hours"]),
})
chart_labels, chart_temps, chart_feels, chart_precip, chart_rain_prob = [], [], [], [], []
for h in forecast[:72]:
dt = h["datetime"]
label = dt.strftime("%d.%m %H:%M") if hasattr(dt,"strftime") else str(dt)[5:16]
chart_labels.append(label)
chart_temps.append(h["temp_c"])
chart_feels.append(h.get("feels_like"))
chart_precip.append(h.get("precip_mm") or 0)
chart_rain_prob.append(h.get("rain_prob") or 0)
return render_template(
"weather.html",
ort=ort, display_name=display_name, lat=lat, lon=lon,
station_name=station_name, station_id=station_id, station_dist=station_dist,
current=current, now_local=now_local,
location_tz=location_tz,
pressure_delta=pressure_delta, pressure_trend=pressure_trend,
temp_delta_6h=temp_delta_6h, temp_trend_6h=temp_trend_6h,
best_window=best_window,
sunrise=sunrise, sunset=sunset,
warnings=warnings,
forecast=forecast[:48], daily=daily_summary,
chart_labels=chart_labels, chart_temps=chart_temps, chart_feels=chart_feels,
chart_precip=chart_precip, chart_rain_prob=chart_rain_prob,
wind_dir_name=wind_direction_name,
uv_risk_info=uv_risk_info,
)
@app.route("/api/suggest")
def suggest():
q = request.args.get("q","").strip()
if len(q) < 2:
return jsonify([])
client_ip = _client_ip(request)
if _is_suggest_rate_limited(client_ip):
return jsonify({"error": "Zu viele Anfragen. Bitte kurz warten."}), 429
cache_key = _normalize_text(q)
if cache_key in _suggest_cache:
return jsonify(_suggest_cache[cache_key])
try:
results = _GEOLOCATOR.geocode(q, exactly_one=False, limit=5, language="de", addressdetails=True, timeout=5)
payload = [{"name": r.address, "lat": r.latitude, "lon": r.longitude} for r in results] if results else []
_suggest_cache[cache_key] = payload
return jsonify(payload)
except Exception:
app.logger.exception("Suggest lookup failed for query '%s'", q)
return jsonify([])
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=5000)