modified: app.py

modified:   static/css/style.css
	modified:   templates/weather.html
This commit is contained in:
simon
2026-04-22 13:48:21 +02:00
parent 1f0849d9a1
commit 778c024335
3 changed files with 316 additions and 21 deletions

199
app.py
View File

@@ -172,6 +172,140 @@ def _round_temp(k):
return None
return round(float(k), 1)
def _clamp(value, min_value, max_value):
return max(min_value, min(max_value, value))
def uv_risk_info(uv_index):
if uv_index is None:
return "", "na"
uv = float(uv_index)
if uv < 3:
return "niedrig", "low"
if uv < 6:
return "moderat", "moderate"
if uv < 8:
return "hoch", "high"
if uv < 11:
return "sehr hoch", "very-high"
return "extrem", "extreme"
def hour_confidence_score(temp_c, precip_mm, rain_prob, wind_kmh, gust_kmh, cloud_pct):
score = 100
if temp_c is None:
score -= 18
if precip_mm is not None:
score -= _clamp(precip_mm * 20, 0, 40)
if rain_prob is not None:
score -= _clamp(rain_prob * 0.35, 0, 30)
if wind_kmh is not None:
score -= _clamp((wind_kmh - 25) * 0.6, 0, 18)
if gust_kmh is not None:
score -= _clamp((gust_kmh - 45) * 0.45, 0, 16)
if cloud_pct is not None:
score -= _clamp((cloud_pct - 85) * 0.4, 0, 8)
score = int(round(_clamp(score, 5, 99)))
if score >= 80:
return score, "hoch"
if score >= 60:
return score, "mittel"
return score, "niedrig"
def activity_score(temp_c, precip_mm, rain_prob, wind_kmh, gust_kmh, uv_index):
score = 100.0
if temp_c is not None:
score -= abs(temp_c - 20) * 3.5
if precip_mm is not None:
score -= _clamp(precip_mm * 35, 0, 45)
if rain_prob is not None:
score -= _clamp(rain_prob * 0.45, 0, 35)
if wind_kmh is not None:
score -= _clamp((wind_kmh - 18) * 0.7, 0, 16)
if gust_kmh is not None:
score -= _clamp((gust_kmh - 35) * 0.55, 0, 12)
if uv_index is not None and uv_index > 6:
score -= _clamp((uv_index - 6) * 6, 0, 16)
return int(round(_clamp(score, 0, 100)))
def best_activity_window(forecast, horizon_hours=24, window_size=2):
hours = forecast[:horizon_hours]
if len(hours) < window_size:
return None
best = None
for i in range(0, len(hours) - window_size + 1):
segment = hours[i:i + window_size]
scores = [h.get("activity_score") for h in segment if h.get("activity_score") is not None]
if not scores:
continue
avg_score = round(sum(scores) / len(scores))
if best is None or avg_score > best["score"]:
best = {
"start": segment[0]["datetime"],
"end": segment[-1]["datetime"],
"score": int(avg_score),
}
return best
def pressure_trend_info(forecast, step_hours=6):
if len(forecast) <= step_hours:
return None, None
p0 = forecast[0].get("pressure_hpa")
p1 = forecast[step_hours].get("pressure_hpa")
if p0 is None or p1 is None:
return None, None
delta = round(p1 - p0, 1)
if delta >= 1.5:
return delta, "steigend"
if delta <= -1.5:
return delta, "fallend"
return delta, "stabil"
def temp_trend_info(forecast, step_hours=6):
if len(forecast) <= step_hours:
return None, None
t0 = forecast[0].get("temp_c")
t1 = forecast[step_hours].get("temp_c")
if t0 is None or t1 is None:
return None, None
delta = round(t1 - t0, 1)
if delta >= 1.0:
return delta, "waermer"
if delta <= -1.0:
return delta, "kaelter"
return delta, "konstant"
def _parse_warning_datetime(value):
if value in (None, ""):
return None
try:
if isinstance(value, (int, float)):
ts = pd.Timestamp(value, unit="ms", tz="UTC")
else:
v = str(value).strip()
if v.isdigit():
ts = pd.Timestamp(int(v), unit="ms", tz="UTC")
else:
ts = pd.Timestamp(v)
if ts.tzinfo is None:
ts = ts.tz_localize("UTC")
return ts.tz_convert(_get_berlin()).tz_localize(None).to_pydatetime()
except Exception:
return None
def pick_daily_icon(hours):
if not hours:
return "☀️"
if any((h.get("temp_c") is not None and h["temp_c"] <= 0 and ((h.get("precip_mm") or 0) > 0.1 or (h.get("rain_prob") or 0) >= 50)) for h in hours):
return "❄️"
if any(((h.get("precip_mm") or 0) >= 0.6 or (h.get("rain_prob") or 0) >= 70) for h in hours):
return "🌧️"
clouds = [h.get("cloud_pct") for h in hours if h.get("cloud_pct") is not None]
avg_cloud = sum(clouds) / len(clouds) if clouds else 0
if avg_cloud > 80:
return "☁️"
if avg_cloud > 40:
return ""
return "☀️"
def feels_like(temp_c, wind_kmh, cloud_pct):
if temp_c is None:
return None
@@ -319,7 +453,10 @@ def get_mosmix_forecast(lat, lon, hours=72):
wind_dir = float(wd) if not _isnan(wd) else None
uv_raw = p.get("uv_index")
uv = round(float(uv_raw),1) if not _isnan(uv_raw) else None
uv_label, uv_level = uv_risk_info(uv)
feels = feels_like(temp_c, wind_kmh, clouds)
confidence_score, confidence_label = hour_confidence_score(temp_c, precip, rain_prob, wind_kmh, gust_kmh, clouds)
a_score = activity_score(temp_c, precip, rain_prob, wind_kmh, gust_kmh, uv)
dt_local = pd.Timestamp(date_val).tz_convert(berlin).tz_localize(None)
forecast.append({
"datetime": dt_local,
@@ -334,6 +471,11 @@ def get_mosmix_forecast(lat, lon, hours=72):
"sun_min": sun_min,
"wind_dir": wind_dir,
"uv_index": uv,
"uv_label": uv_label,
"uv_level": uv_level,
"confidence": confidence_score,
"confidence_label": confidence_label,
"activity_score": a_score,
"icon": weather_icon(clouds, precip, rain_prob, temp_c),
})
result_data = (forecast, station_info)
@@ -343,7 +485,7 @@ def get_mosmix_forecast(lat, lon, hours=72):
app.logger.exception("MOSMIX forecast loading failed")
return [], {}
def filter_unrealistic_warnings(warnings, forecast):
def filter_unrealistic_warnings(warnings, forecast, now_local=None):
"""
Filter out warnings that contradict the actual forecast.
E.g., frost warning when min temp is > 0°C in next 48 hours.
@@ -356,29 +498,45 @@ def filter_unrealistic_warnings(warnings, forecast):
rain_keywords = {"regen", "starkregen", "dauerregen"}
try:
temps_48h = [h.get("temp_c") for h in forecast[:48] if h.get("temp_c") is not None]
precip_48h = [h.get("precip_mm") for h in forecast[:48] if h.get("precip_mm") is not None]
min_temp_48h = min(temps_48h) if temps_48h else None
max_precip_48h = max(precip_48h) if precip_48h else 0
for w in warnings:
warn_type = _normalize_text(w.get("type", ""))
headline = _normalize_text(w.get("headline", ""))
onset_dt = _parse_warning_datetime(w.get("onset"))
expires_dt = _parse_warning_datetime(w.get("expires"))
if now_local and expires_dt and expires_dt < now_local:
continue
if onset_dt and expires_dt:
relevant_hours = [
h for h in forecast[:48]
if h.get("datetime") is not None and onset_dt <= h["datetime"] <= expires_dt
]
else:
relevant_hours = forecast[:48]
temps = [h.get("temp_c") for h in relevant_hours if h.get("temp_c") is not None]
precip = [h.get("precip_mm") for h in relevant_hours if h.get("precip_mm") is not None]
rain_prob = [h.get("rain_prob") for h in relevant_hours if h.get("rain_prob") is not None]
min_temp = min(temps) if temps else None
max_precip = max(precip) if precip else 0
max_rain_prob = max(rain_prob) if rain_prob else 0
skip = False
if min_temp_48h is not None and min_temp_48h > 2:
if min_temp is not None and min_temp > 3:
if any(kw in warn_type or kw in headline for kw in frost_keywords):
app.logger.info("Filtered frost warning: min_temp %.1f°C", min_temp_48h)
app.logger.info("Filtered frost warning: min_temp %.1f°C", min_temp)
skip = True
if max_precip_48h < 0.1:
if max_precip < 0.2 and max_rain_prob < 35:
if any(kw in warn_type or kw in headline for kw in rain_keywords):
app.logger.info("Filtered rain warning: max_precip %.1f mm", max_precip_48h)
app.logger.info("Filtered rain warning: max_precip %.1f mm", max_precip)
skip = True
if not skip:
w["onset_dt"] = onset_dt
w["expires_dt"] = expires_dt
filtered.append(w)
except Exception:
app.logger.exception("Error filtering unrealistic warnings")
@@ -442,13 +600,17 @@ def wetter():
forecast = forecast[current_idx:]
sunrise, sunset, dawn, dusk = get_sun_times(lat, lon)
warnings = get_dwd_warnings(lat, lon, state_hint=state_hint)
warnings = filter_unrealistic_warnings(warnings, forecast)
warnings = filter_unrealistic_warnings(warnings, forecast, now_local=now_local_dt.replace(tzinfo=None))
pressure_delta, pressure_trend = pressure_trend_info(forecast)
temp_delta_6h, temp_trend_6h = temp_trend_info(forecast)
best_window = best_activity_window(forecast, horizon_hours=24, window_size=2)
daily = {}
for h in forecast:
dt = h["datetime"]
day = dt.date() if hasattr(dt,"date") else str(dt)[:10]
if day not in daily:
daily[day] = {"temps":[], "precip":0.0, "cloud":[], "wind":[], "icons":[], "rain_prob":[], "uv":[]}
daily[day] = {"temps":[], "precip":0.0, "cloud":[], "wind":[], "icons":[], "rain_prob":[], "uv":[], "hours":[]}
daily[day]["hours"].append(h)
if h["temp_c"] is not None: daily[day]["temps"].append(h["temp_c"])
daily[day]["precip"] += h.get("precip_mm") or 0
if h["cloud_pct"] is not None: daily[day]["cloud"].append(h["cloud_pct"])
@@ -467,14 +629,15 @@ def wetter():
"cloud": round(sum(d["cloud"])/len(d["cloud"])) if d["cloud"] else None,
"wind_max": max(d["wind"]) if d["wind"] else None,
"uv_max": max(d["uv"]) if d["uv"] else None,
"icon": max(set(d["icons"]), key=d["icons"].count),
"icon": pick_daily_icon(d["hours"]),
})
chart_labels, chart_temps, chart_precip, chart_rain_prob = [], [], [], []
chart_labels, chart_temps, chart_feels, chart_precip, chart_rain_prob = [], [], [], [], []
for h in forecast[:48]:
dt = h["datetime"]
label = dt.strftime("%d.%m %H:%M") if hasattr(dt,"strftime") else str(dt)[5:16]
chart_labels.append(label)
chart_temps.append(h["temp_c"])
chart_feels.append(h.get("feels_like"))
chart_precip.append(h.get("precip_mm") or 0)
chart_rain_prob.append(h.get("rain_prob") or 0)
return render_template(
@@ -482,12 +645,16 @@ def wetter():
ort=ort, display_name=display_name, lat=lat, lon=lon,
station_name=station_name, station_id=station_id, station_dist=station_dist,
current=current, now_local=now_local,
pressure_delta=pressure_delta, pressure_trend=pressure_trend,
temp_delta_6h=temp_delta_6h, temp_trend_6h=temp_trend_6h,
best_window=best_window,
sunrise=sunrise, sunset=sunset,
warnings=warnings,
forecast=forecast[:48], daily=daily_summary,
chart_labels=chart_labels, chart_temps=chart_temps,
chart_labels=chart_labels, chart_temps=chart_temps, chart_feels=chart_feels,
chart_precip=chart_precip, chart_rain_prob=chart_rain_prob,
wind_dir_name=wind_direction_name,
uv_risk_info=uv_risk_info,
)
@app.route("/api/suggest")