modified: app.py

This commit is contained in:
simon
2026-04-22 14:00:07 +02:00
parent 778c024335
commit 7b3b66c6fc

221
app.py
View File

@@ -118,6 +118,23 @@ def _extract_state_from_location(loc):
except Exception:
return None
def _extract_location_names(loc):
"""Return a list of normalised name strings (city, county, …) from a Nominatim result
for matching against DWD warning regionName fields."""
names = []
try:
raw = (loc.raw or {}).get("address", {})
for key in ("city", "town", "village", "hamlet", "municipality",
"city_district", "county", "district", "suburb", "borough"):
val = raw.get(key)
if val:
n = _normalize_text(val)
if n and len(n) >= 3 and n not in names:
names.append(n)
except Exception:
pass
return names
def _client_ip(req):
forwarded = req.headers.get("X-Forwarded-For", "").strip()
if forwarded:
@@ -147,12 +164,13 @@ def geocode_location(query):
try:
loc = _GEOLOCATOR.geocode(query, language="de", addressdetails=True, timeout=10)
if loc:
return loc.latitude, loc.longitude, loc.address, _extract_state_from_location(loc)
return (loc.latitude, loc.longitude, loc.address,
_extract_state_from_location(loc), _extract_location_names(loc))
except GeocoderTimedOut:
pass
except Exception:
app.logger.exception("Geocoding failed for query '%s'", query)
return None, None, None, None
return None, None, None, None, []
def haversine(lat1, lon1, lat2, lon2):
R = 6371
@@ -332,9 +350,20 @@ def get_sun_times(lat, lon, date=None):
except Exception:
return None, None, None, None
def get_dwd_warnings(lat, lon, state_hint=None):
state_key = _normalize_text(state_hint)
key = (round(lat, 1), round(lon, 1), state_key)
def get_dwd_warnings(lat, lon, state_hint=None, location_names=None):
"""Fetch DWD warnings and filter by regionName matching the user's municipality/county.
The DWD JSON uses warncell-IDs as keys and each warning carries a ``regionName``
field (e.g. "Krefeld", "Kreis Kleve - Niederrhein", "Sauerland") that identifies
the affected area far more precisely than the federal-state field. We only include
a warning when its regionName contains at least one of the place names extracted from
the geocoded location (city, county, …). Warnings for neighbouring NRW regions
(Eifel, Sauerland, …) are therefore never shown for Krefeld.
Duplicates (same event type + onset + expires) are removed before returning.
"""
loc_names = location_names or []
key = (round(lat, 1), round(lon, 1))
if key in _warn_cache:
return _warn_cache[key]
try:
@@ -346,39 +375,47 @@ def get_dwd_warnings(lat, lon, state_hint=None):
text = resp.text
if text.startswith("warnWetter.loadWarnings("):
text = text[len("warnWetter.loadWarnings("):-2]
import json
data = json.loads(text)
warnings = []
import json as _json
data = _json.loads(text)
matched = []
for region_warns in data.get("warnings", {}).values():
for w in region_warns:
for w in (region_warns or []):
level = w.get("level", 0)
if level >= 1:
warnings.append({
"level": level,
"type": w.get("event", ""),
"headline": w.get("headline", ""),
"description": w.get("description", ""),
"state": w.get("state", ""),
"state_short": w.get("stateShort", ""),
"onset": w.get("onset", ""),
"expires": w.get("expires", ""),
})
filtered = []
if state_hint:
filtered = [w for w in warnings if _warning_matches_state(state_hint, w.get("state"), w.get("state_short"))]
result_source = filtered if filtered else []
result_source.sort(key=lambda x: x["level"], reverse=True)
result = [
{
"level": w["level"],
"type": w["type"],
"headline": w["headline"],
"description": w["description"],
"onset": w.get("onset", ""),
"expires": w.get("expires", ""),
}
for w in result_source[:5]
]
if level < 1:
continue
region_name = w.get("regionName", "") or ""
norm_region = _normalize_text(region_name)
# Require the user's city / county to appear in regionName
region_match = False
for name in loc_names:
if name and len(name) >= 3 and name in norm_region:
region_match = True
break
if not region_match:
continue
matched.append({
"level": level,
"type": w.get("event", ""),
"headline": w.get("headline", ""),
"description": w.get("description", ""),
"onset": w.get("onset", ""),
"expires": w.get("expires", ""),
})
# Deduplicate: same event type + onset time + expires time
seen = set()
deduped = []
for w in matched:
dk = (w["type"], w["onset"], w["expires"])
if dk not in seen:
seen.add(dk)
deduped.append(w)
deduped.sort(key=lambda x: x["level"], reverse=True)
result = deduped[:5]
_warn_cache[key] = result
return result
except Exception:
@@ -486,63 +523,76 @@ def get_mosmix_forecast(lat, lon, hours=72):
return [], {}
def filter_unrealistic_warnings(warnings, forecast, now_local=None):
"""
Filter out warnings that contradict the actual forecast.
E.g., frost warning when min temp is > 0°C in next 48 hours.
"""Remove warnings that contradict the MOSMIX forecast.
Frost warnings are suppressed when the minimum forecast temperature within the
warning window (or next 48 h if the window is unknown) stays above 3 °C.
Rain / heavy-rain warnings are suppressed when neither significant precipitation
nor a meaningful rain probability is forecast.
Both onset/expires timestamps from DWD and forecast datetimes are compared as
naive Berlin-local datetimes so no tz-mismatch can cause an empty overlap window.
"""
if not warnings or not forecast:
return warnings
filtered = []
frost_keywords = {"frost", "glatte", "glatt", "eis", "schnee"}
rain_keywords = {"regen", "starkregen", "dauerregen"}
rain_keywords = {"regen", "starkregen", "dauerregen"}
try:
for w in warnings:
warn_type = _normalize_text(w.get("type", ""))
headline = _normalize_text(w.get("headline", ""))
onset_dt = _parse_warning_datetime(w.get("onset"))
expires_dt = _parse_warning_datetime(w.get("expires"))
result = []
for w in warnings:
warn_type = _normalize_text(w.get("type", ""))
headline = _normalize_text(w.get("headline", ""))
onset_dt = _parse_warning_datetime(w.get("onset"))
expires_dt = _parse_warning_datetime(w.get("expires"))
if now_local and expires_dt and expires_dt < now_local:
continue
# Discard already-expired warnings
if now_local is not None and expires_dt is not None and expires_dt < now_local:
app.logger.info("Discarded expired warning: %s (expires %s)", w.get("headline"), expires_dt)
continue
if onset_dt and expires_dt:
relevant_hours = [
h for h in forecast[:48]
if h.get("datetime") is not None and onset_dt <= h["datetime"] <= expires_dt
]
else:
relevant_hours = forecast[:48]
# Select forecast hours that fall within the warning window.
# Convert everything to pandas Timestamps (naive) so comparisons are type-safe.
if onset_dt is not None and expires_dt is not None:
ts_onset = pd.Timestamp(onset_dt)
ts_expires = pd.Timestamp(expires_dt)
relevant = [
h for h in forecast[:48]
if h.get("datetime") is not None
and ts_onset <= pd.Timestamp(h["datetime"]) <= ts_expires
]
# If the window is entirely in the future and forecast doesn't reach it,
# fall back to the full 48-h slice to be conservative.
if not relevant:
relevant = forecast[:48]
else:
relevant = forecast[:48]
temps = [h.get("temp_c") for h in relevant_hours if h.get("temp_c") is not None]
precip = [h.get("precip_mm") for h in relevant_hours if h.get("precip_mm") is not None]
rain_prob = [h.get("rain_prob") for h in relevant_hours if h.get("rain_prob") is not None]
min_temp = min(temps) if temps else None
max_precip = max(precip) if precip else 0
max_rain_prob = max(rain_prob) if rain_prob else 0
temps = [h["temp_c"] for h in relevant if h.get("temp_c") is not None]
precips = [h["precip_mm"] for h in relevant if h.get("precip_mm") is not None]
rainprobs = [h["rain_prob"] for h in relevant if h.get("rain_prob") is not None]
skip = False
min_temp = min(temps) if temps else None
max_precip = max(precips) if precips else 0.0
max_rain_prob = max(rainprobs) if rainprobs else 0
if min_temp is not None and min_temp > 3:
if any(kw in warn_type or kw in headline for kw in frost_keywords):
app.logger.info("Filtered frost warning: min_temp %.1f°C", min_temp)
skip = True
is_frost = any(kw in warn_type or kw in headline for kw in frost_keywords)
is_rain = any(kw in warn_type or kw in headline for kw in rain_keywords)
if max_precip < 0.2 and max_rain_prob < 35:
if any(kw in warn_type or kw in headline for kw in rain_keywords):
app.logger.info("Filtered rain warning: max_precip %.1f mm", max_precip)
skip = True
skip = False
if is_frost and min_temp is not None and min_temp > 3:
app.logger.info("Suppressed frost warning (min %.1f °C): %s", min_temp, w.get("headline"))
skip = True
elif is_rain and max_precip < 0.2 and max_rain_prob < 35:
app.logger.info("Suppressed rain warning (max_precip %.1f mm): %s", max_precip, w.get("headline"))
skip = True
if not skip:
w["onset_dt"] = onset_dt
w["expires_dt"] = expires_dt
filtered.append(w)
except Exception:
app.logger.exception("Error filtering unrealistic warnings")
return warnings
if not skip:
w["onset_dt"] = onset_dt
w["expires_dt"] = expires_dt
result.append(w)
return filtered
return result
@app.route("/")
def index():
@@ -555,7 +605,7 @@ def wetter():
lon_param = request.args.get("lon")
# Geolocation via Browser-Koordinaten
lat, lon, display_name, state_hint = None, None, None, None
lat, lon, display_name, state_hint, location_names = None, None, None, None, []
if lat_param and lon_param:
try:
lat = float(lat_param)
@@ -563,18 +613,19 @@ def wetter():
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
raise ValueError("Invalid coordinate range")
loc = _GEOLOCATOR.reverse((lat, lon), language="de", addressdetails=True, timeout=10)
display_name = loc.address if loc else f"{lat:.2f}, {lon:.2f}"
state_hint = _extract_state_from_location(loc)
display_name = loc.address if loc else f"{lat:.2f}, {lon:.2f}"
state_hint = _extract_state_from_location(loc) if loc else None
location_names = _extract_location_names(loc) if loc else []
if not ort or ort == "Mein Standort":
ort = display_name.split(",")[0]
except Exception:
app.logger.info("Invalid or unusable browser coordinates for /wetter")
lat, lon, display_name, state_hint = None, None, None, None
lat, lon, display_name, state_hint, location_names = None, None, None, None, []
if lat is None:
if not ort:
return render_template("index.html", error="Bitte einen Ort eingeben.")
lat, lon, display_name, state_hint = geocode_location(ort)
lat, lon, display_name, state_hint, location_names = geocode_location(ort)
if lat is None:
return render_template("index.html", error=f'Ort "{ort}" konnte nicht gefunden werden.')
forecast, mosmix_station = get_mosmix_forecast(lat, lon, hours=72)
@@ -599,7 +650,7 @@ def wetter():
current = forecast[current_idx]
forecast = forecast[current_idx:]
sunrise, sunset, dawn, dusk = get_sun_times(lat, lon)
warnings = get_dwd_warnings(lat, lon, state_hint=state_hint)
warnings = get_dwd_warnings(lat, lon, state_hint=state_hint, location_names=location_names)
warnings = filter_unrealistic_warnings(warnings, forecast, now_local=now_local_dt.replace(tzinfo=None))
pressure_delta, pressure_trend = pressure_trend_info(forecast)
temp_delta_6h, temp_trend_6h = temp_trend_info(forecast)