"""
Battery Charging Optimizer for OpenEMS + GoodWe.

Builds on the existing manual control system.

Location: /config/pyscript/battery_charging_optimizer.py
Version: 3.3.1 - FIXED: SOC plausibility check
         (filters 65535% spikes when the mode is switched)

Sets charge_power_battery as a negative value (charging = negative).
Relies on the existing automations for ESS mode and keep-alive.
"""

import json
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

# Time zone applied to every datetime handled by this script
TIMEZONE = ZoneInfo("Europe/Berlin")

# Layout of the timestamp strings delivered in the price sensor attributes
PRICE_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# SOC (in percent) assumed when the sensor value is missing or implausible
FALLBACK_SOC = 50


def get_local_now():
    """Return the current time as an aware datetime in Europe/Berlin."""
    return datetime.now(TIMEZONE)


def _state_as_float(entity_id, default):
    """Read a Home Assistant state as float, falling back to *default*.

    The default is returned when the entity is unknown, its state is empty,
    or its state is not numeric (for example 'unavailable' or 'unknown').

    Args:
        entity_id: Full entity id, e.g. 'sensor.esssoc'.
        default: Value used when no usable number can be read.

    Returns:
        The state (or the default) converted to float.
    """
    try:
        raw = state.get(entity_id)
    except NameError:
        # pyscript reports a non-existing state variable as NameError
        log.warning(f"Entity {entity_id} nicht gefunden - verwende Standardwert {default}")
        return float(default)

    if not raw:
        return float(default)

    try:
        return float(raw)
    except (TypeError, ValueError):
        log.warning(f"Ungültiger Wert '{raw}' für {entity_id} - verwende Standardwert {default}")
        return float(default)


@service
def calculate_charging_schedule():
    """
    Compute the charging plan for the next 24-48 hours.

    Intended to run daily at 14:05 after the price update (triggered by a
    Home Assistant automation, not by this script). Uses the ranking
    method: the N cheapest hours are selected.

    Any exception raised while planning is logged together with its
    traceback and mirrored into input_text.battery_optimizer_status.
    """
    log.info("=== Batterie-Optimierung gestartet (v3.2 - FIXED Timezones) ===")

    # Bail out early when the optimizer is switched off
    if state.get('input_boolean.battery_optimizer_enabled') != 'on':
        log.info("Optimierung ist deaktiviert")
        input_text.battery_optimizer_status = "Deaktiviert"
        return

    try:
        # Load configuration
        config = load_configuration()
        log.info(f"Konfiguration geladen: SOC {config['min_soc']}-{config['max_soc']}%, Max {config['max_charge_power']}W")

        # Load electricity prices (including tomorrow when available)
        price_data = get_electricity_prices()
        if not price_data:
            log.error("Keine Strompreis-Daten verfügbar")
            input_text.battery_optimizer_status = "Fehler: Keine Preisdaten"
            return

        # Does the price list contain entries for tomorrow?
        # (list comprehension rather than a generator expression)
        has_tomorrow = any([p.get('is_tomorrow', False) for p in price_data])
        log.info(f"Strompreise geladen: {len(price_data)} Stunden (Tomorrow: {has_tomorrow})")

        # Load PV forecast
        pv_forecast = get_pv_forecast()
        pv_today = pv_forecast.get('today', 0)
        pv_tomorrow = pv_forecast.get('tomorrow', 0)
        log.info(f"PV-Prognose: Heute {pv_today:.1f} kWh, Morgen {pv_tomorrow:.1f} kWh")

        # Battery state; a missing or non-numeric reading falls back as well
        current_soc = _state_as_float('sensor.esssoc', FALLBACK_SOC)

        # Plausibility check (filters invalid values such as 65535%).
        # Written as a negated range test so that NaN is rejected too.
        if not 0 <= current_soc <= 100:
            log.warning(f"⚠ Ungültiger SOC-Wert erkannt: {current_soc}%. Verwende Fallback-Wert {FALLBACK_SOC}%.")
            current_soc = FALLBACK_SOC

        log.info(f"Aktueller SOC: {current_soc}%")

        # Run the optimization
        schedule = optimize_charging(
            price_data=price_data,
            pv_forecast=pv_forecast,
            current_soc=current_soc,
            config=config
        )

        # Persist the plan
        save_schedule(schedule, has_tomorrow)

        # Print statistics
        log_statistics(schedule, price_data, has_tomorrow)

        log.info("=== Optimierung abgeschlossen ===")

    except Exception as e:
        # Top-level boundary of the service: report and surface in the UI
        log.error(f"Fehler bei Optimierung: {e}")
        import traceback
        log.error(f"Traceback: {traceback.format_exc()}")
        input_text.battery_optimizer_status = f"Fehler: {str(e)[:100]}"


def load_configuration():
    """Load all configuration parameters from the input helpers.

    Returns:
        dict with the keys 'battery_capacity' (Wh), 'min_soc', 'max_soc',
        'max_charge_power', 'price_threshold', 'reserve_capacity' (Wh) and
        'pv_threshold'. A helper that is missing, empty or non-numeric
        yields the built-in default for that key.
    """
    return {
        # Helper holds kWh, the optimizer works in Wh
        'battery_capacity': _state_as_float('input_number.battery_capacity_kwh', 10) * 1000,
        'min_soc': _state_as_float('input_number.battery_optimizer_min_soc', 20),
        'max_soc': _state_as_float('input_number.battery_optimizer_max_soc', 100),
        'max_charge_power': _state_as_float('input_number.battery_optimizer_max_charge_power', 5000),
        'price_threshold': _state_as_float('input_number.battery_optimizer_price_threshold', 28),
        # Helper holds kWh, the optimizer works in Wh
        'reserve_capacity': _state_as_float('input_number.battery_optimizer_reserve_capacity', 2) * 1000,
        'pv_threshold': _state_as_float('input_number.battery_optimizer_pv_threshold', 500),  # in Wh
    }


def _parse_price_rows(prices, timestamps, is_tomorrow, label):
    """Convert parallel price/timestamp lists into price entries.

    Args:
        prices: Price values, one per hour.
        timestamps: Timestamp strings in PRICE_DATETIME_FORMAT; must have
            the same length as *prices* (checked by the caller).
        is_tomorrow: Flag stored in every resulting entry.
        label: Day label used in the warning for a bad row
            ('Heute' or 'Morgen').

    Returns:
        List of dicts with the keys 'datetime', 'hour', 'date', 'price'
        and 'is_tomorrow'. Rows that cannot be parsed are skipped with a
        warning.
    """
    rows = []
    for i, price in enumerate(prices):
        try:
            # Parse as naive datetime, then attach Europe/Berlin
            naive_dt = datetime.strptime(timestamps[i], PRICE_DATETIME_FORMAT)
            dt = naive_dt.replace(tzinfo=TIMEZONE)
            rows.append({
                'datetime': dt,
                'hour': dt.hour,
                'date': dt.date(),
                'price': float(price),
                'is_tomorrow': is_tomorrow
            })
        except (TypeError, ValueError) as e:
            log.warning(f"Fehler beim Verarbeiten von {label}-Preis {i}: {e}")
    return rows


def get_electricity_prices():
    """
    Fetch electricity prices from haStrom FLEX PRO Extended.

    The extended sensor (which also carries tomorrow's prices) is
    preferred; the old sensor is used as fallback. All resulting datetimes
    are timezone-aware in Europe/Berlin.

    Returns:
        List of price entries (see _parse_price_rows), today's hours first
        and tomorrow's hours after them, or None when no usable price data
        is available.
    """
    # Try the extended sensor first (with tomorrow data)
    price_entity_ext = 'sensor.hastrom_flex_pro_ext'
    prices_attr_ext = state.getattr(price_entity_ext)

    # Fallback: the old sensor
    price_entity_old = 'sensor.hastrom_flex_pro'
    prices_attr_old = state.getattr(price_entity_old)

    # Pick the best sensor that actually delivers prices
    if prices_attr_ext and prices_attr_ext.get('prices_today'):
        price_entity = price_entity_ext
        prices_attr = prices_attr_ext
        log.info(f"✓ Nutze Extended-Sensor: {price_entity}")
    elif prices_attr_old and prices_attr_old.get('prices_today'):
        price_entity = price_entity_old
        prices_attr = prices_attr_old
        log.warning(f"⚠ Extended-Sensor nicht verfügbar, nutze alten Sensor: {price_entity}")
    else:
        log.error("Keine Strompreis-Sensoren verfügbar")
        return None

    # Today
    prices_today = prices_attr.get('prices_today', [])
    datetime_today = prices_attr.get('datetime_today', [])

    # Tomorrow (extended sensor only)
    prices_tomorrow = prices_attr.get('prices_tomorrow', [])
    datetime_tomorrow = prices_attr.get('datetime_tomorrow', [])
    tomorrow_available = prices_attr.get('tomorrow_available', False)

    if not prices_today or not datetime_today:
        log.error(f"Keine Preis-Daten in {price_entity}")
        return None

    if len(prices_today) != len(datetime_today):
        log.error(f"Preis-Array und DateTime-Array haben unterschiedliche Längen")
        return None

    # Convert to one uniform format with the LOCAL time zone
    price_data = _parse_price_rows(prices_today, datetime_today, False, "Heute")

    # Tomorrow, only when available
    if tomorrow_available and prices_tomorrow and datetime_tomorrow:
        if len(prices_tomorrow) == len(datetime_tomorrow):
            log.info(f"✓ Tomorrow-Daten verfügbar: {len(prices_tomorrow)} Stunden")
            price_data.extend(
                _parse_price_rows(prices_tomorrow, datetime_tomorrow, True, "Morgen")
            )
        else:
            log.warning("Tomorrow-Arrays haben unterschiedliche Längen")
    else:
        log.info("Tomorrow-Daten noch nicht verfügbar (normal vor 14 Uhr)")

    return price_data


def get_pv_forecast():
    """
    Fetch the PV forecast from Forecast.Solar (east + west array).

    Returns:
        dict with 'today' and 'tomorrow' (sum of both arrays; 0 when the
        sensors of that day cannot be read) and 'hourly', which is
        currently always an empty dict.
    """
    sensor_east = 'sensor.energy_production_today'
    sensor_west = 'sensor.energy_production_today_2'
    sensor_east_tomorrow = 'sensor.energy_production_tomorrow'
    sensor_west_tomorrow = 'sensor.energy_production_tomorrow_2'

    # Today: a failure of either sensor discards the whole day's value
    try:
        east_today = float(state.get(sensor_east) or 0)
        west_today = float(state.get(sensor_west) or 0)
        pv_today = east_today + west_today
    except Exception:
        pv_today = 0
        log.warning("Konnte PV-Heute nicht laden")

    # Tomorrow: same best-effort handling
    try:
        east_tomorrow = float(state.get(sensor_east_tomorrow) or 0)
        west_tomorrow = float(state.get(sensor_west_tomorrow) or 0)
        pv_tomorrow = east_tomorrow + west_tomorrow
    except Exception:
        pv_tomorrow = 0
        log.warning("Konnte PV-Morgen nicht laden")

    # Hourly values (simplified: not populated yet)
    pv_wh_per_hour = {}

    return {
        'today': pv_today,
        'tomorrow': pv_tomorrow,
        'hourly': pv_wh_per_hour
    }


def optimize_charging(price_data, pv_forecast, current_soc, config):
    """
    Ranking-based optimization.

    Strategy:
    1. Compute the number of charging hours needed for the free capacity
    2. Rank ALL future hours by price (minus a small PV bonus)
    3. Select the N cheapest ones
    4. Emit the plan in chronological order

    Args:
        price_data: Price entries as produced by get_electricity_prices().
        pv_forecast: dict as produced by get_pv_forecast(); only 'hourly'
            (hour -> Wh) is used here.
        current_soc: Current state of charge in percent.
        config: dict as produced by load_configuration().

    Returns:
        List of schedule entries (one per future hour, in the order of
        *price_data*) with 'action' set to 'charge' or 'auto'. Empty list
        when no future prices exist.

    Raises:
        ValueError: if charging is needed but config['max_charge_power']
            is not positive.
    """
    now = get_local_now()
    log.info(f"Lokale Zeit: {now.strftime('%Y-%m-%d %H:%M:%S %Z')}")

    # Keep future hours only. Full datetimes are compared, so at 14:30 the
    # entry for 14:00 is already excluded.
    future_price_data = [p for p in price_data if p['datetime'] > now]

    log.info(f"Planungsfenster: {len(future_price_data)} Stunden (ab jetzt)")

    if not future_price_data:
        log.error("Keine zukünftigen Preise verfügbar")
        return []

    # Price statistics
    all_prices = [p['price'] for p in future_price_data]
    min_price = min(all_prices)
    max_price = max(all_prices)
    avg_price = sum(all_prices) / len(all_prices)

    log.info(f"Preise: Min={min_price:.2f}, Max={max_price:.2f}, Avg={avg_price:.2f} ct/kWh")

    # Capacity that may still be charged
    available_capacity_wh = (config['max_soc'] - current_soc) / 100 * config['battery_capacity']
    available_capacity_wh -= config['reserve_capacity']

    if available_capacity_wh <= 0:
        log.info("Batterie ist voll oder Reserve erreicht - keine Ladung nötig")
        return create_auto_only_schedule(future_price_data)

    log.info(f"Verfügbare Ladekapazität: {available_capacity_wh/1000:.2f} kWh")

    # ==========================================
    # Number of charging hours needed
    # ==========================================
    max_charge_per_hour = config['max_charge_power']  # Wh per hour
    if max_charge_per_hour <= 0:
        # Zero would divide by zero below; a negative value would flip the
        # sign of every planned power value.
        raise ValueError(f"Ungültige maximale Ladeleistung: {max_charge_per_hour}W (muss > 0 sein)")

    needed_hours = int((available_capacity_wh + max_charge_per_hour - 1) / max_charge_per_hour)  # round up

    # At least one hour, at most the hours available
    needed_hours = max(1, min(needed_hours, len(future_price_data)))

    log.info(f"🎯 Benötigte Ladestunden: {needed_hours} (bei {max_charge_per_hour}W pro Stunde)")

    # ==========================================
    # RANKING: build the candidate list
    # ==========================================
    hourly_pv = pv_forecast['hourly']
    charging_candidates = []
    for p in future_price_data:
        # PV forecast for this hour
        pv_wh = hourly_pv.get(p['hour'], 0)

        charging_candidates.append({
            'datetime': p['datetime'],
            'hour': p['hour'],
            'date': p['date'],
            'price': p['price'],
            'pv_wh': pv_wh,
            'is_tomorrow': p.get('is_tomorrow', False),
            # Lower is better: price is the main factor, PV gives a small bonus
            'score': p['price'] - (pv_wh / 1000)
        })

    # Best score first
    charging_candidates.sort(key=lambda x: x['score'])

    # The N best hours
    selected_hours = charging_candidates[:needed_hours]

    # ==========================================
    # Logging: show the selection
    # ==========================================
    if selected_hours:
        prices_selected = [h['price'] for h in selected_hours]
        avg_selected = sum(prices_selected) / len(prices_selected)
        log.info(f"✓ Top {needed_hours} günstigste Stunden ausgewählt:")
        log.info(f" - Preise: {min(prices_selected):.2f} - {max(prices_selected):.2f} ct/kWh")
        log.info(f" - Durchschnitt: {avg_selected:.2f} ct/kWh")
        log.info(f" - Ersparnis vs. Durchschnitt: {(avg_price - avg_selected):.2f} ct/kWh")

        # How many of the selected hours are tomorrow
        tomorrow_count = len([h for h in selected_hours if h['is_tomorrow']])
        log.info(f" - Davon morgen: {tomorrow_count}")

        # Selected times in chronological order
        times = []
        for h in sorted(selected_hours, key=lambda x: x['datetime']):
            day_str = "morgen" if h['is_tomorrow'] else "heute"
            times.append(f"{h['hour']:02d}:00 {day_str} ({h['price']:.2f}ct)")
        log.info(f" - Zeiten: {', '.join(times[:5])}")  # first five only

    # ==========================================
    # Build the charging plan
    # ==========================================
    # Lookup tables keyed by datetime. setdefault keeps the first (best
    # ranked) candidate if a datetime ever occurs twice.
    rank_by_datetime = {}
    for position, c in enumerate(charging_candidates, start=1):
        rank_by_datetime.setdefault(c['datetime'], position)

    selected_by_datetime = {}
    for h in selected_hours:
        selected_by_datetime.setdefault(h['datetime'], h)

    schedule = []
    remaining_capacity = available_capacity_wh

    for p in future_price_data:
        slot = p['datetime']
        is_tomorrow = p.get('is_tomorrow', False)
        candidate = selected_by_datetime.get(slot)

        if candidate is not None and remaining_capacity > 0:
            # Charge during this hour, capped by what is still missing
            charge_wh = min(config['max_charge_power'], remaining_capacity)
            day_str = "morgen" if is_tomorrow else "heute"
            rank = rank_by_datetime[slot]

            schedule.append({
                'datetime': slot.isoformat(),
                'hour': p['hour'],
                'date': p['date'].isoformat(),
                'action': 'charge',
                'power_w': -int(charge_wh),  # charging = negative
                'price': p['price'],
                'pv_wh': candidate['pv_wh'],
                'is_tomorrow': is_tomorrow,
                'reason': f"Rang {rank}/{len(charging_candidates)}: {p['price']:.2f}ct [{day_str}]"
            })

            remaining_capacity -= charge_wh
        else:
            # Auto mode
            reason = "Automatik"

            # Debugging aid: why is this hour not charged?
            if candidate is None:
                rank = rank_by_datetime.get(slot)
                if rank is not None:
                    reason = f"Rang {rank} (nicht unter Top {needed_hours})"

            schedule.append({
                'datetime': slot.isoformat(),
                'hour': p['hour'],
                'date': p['date'].isoformat(),
                'action': 'auto',
                'power_w': 0,
                'price': p['price'],
                'is_tomorrow': is_tomorrow,
                'reason': reason
            })

    return schedule


def create_auto_only_schedule(price_data):
    """Build a plan that contains auto mode only (no charging).

    Args:
        price_data: Price entries with 'datetime', 'hour', 'date', 'price'
            and optionally 'is_tomorrow'.

    Returns:
        One 'auto' schedule entry per price entry, in the same order.
    """
    return [
        {
            'datetime': p['datetime'].isoformat(),
            'hour': p['hour'],
            'date': p['date'].isoformat(),
            'action': 'auto',
            'power_w': 0,
            'price': p['price'],
            'is_tomorrow': p.get('is_tomorrow', False),
            'reason': "Keine Ladung nötig (Batterie voll)"
        }
        for p in price_data
    ]


def save_schedule(schedule, has_tomorrow):
    """Store the schedule as pyscript state with attributes.

    Writes 'pyscript.battery_charging_schedule' (value 'active', the plan
    and summary figures as attributes) and updates
    input_text.battery_optimizer_status. An empty schedule is not stored.

    Args:
        schedule: List of schedule entries from optimize_charging().
        has_tomorrow: Whether the price data contained entries for tomorrow.
    """
    if not schedule:
        log.warning("Leerer Schedule - nichts zu speichern")
        return

    # Summary figures
    charges = [s for s in schedule if s['action'] == 'charge']
    charges_tomorrow = [s for s in charges if s.get('is_tomorrow', False)]

    num_charges = len(charges)
    num_charges_tomorrow = len(charges_tomorrow)
    total_energy = sum([abs(s['power_w']) for s in charges])
    total_price = sum([s['price'] for s in charges])
    first_charge = charges[0]['datetime'] if charges else None
    first_charge_tomorrow = charges_tomorrow[0]['datetime'] if charges_tomorrow else None

    total_energy_kwh = total_energy / 1000
    avg_price = total_price / num_charges if num_charges > 0 else 0

    # Store as pyscript state
    state.set(
        'pyscript.battery_charging_schedule',
        value='active',
        new_attributes={
            'schedule': schedule,
            'last_update': get_local_now().isoformat(),
            'num_hours': len(schedule),
            'num_charges': num_charges,
            'num_charges_tomorrow': num_charges_tomorrow,
            'total_energy_kwh': round(total_energy_kwh, 2),
            'avg_charge_price': round(avg_price, 2),
            'first_charge_time': first_charge,
            'first_charge_tomorrow': first_charge_tomorrow,
            'has_tomorrow_data': has_tomorrow,
            'estimated_savings': 0
        }
    )

    log.info(f"✓ Ladeplan gespeichert: {len(schedule)} Stunden, {num_charges} Ladungen ({num_charges_tomorrow} morgen)")

    # Update the status text
    status_parts = []
    if num_charges > 0:
        status_parts.append(f"{num_charges} Ladungen")
        if num_charges_tomorrow > 0:
            status_parts.append(f"({num_charges_tomorrow} morgen)")
    else:
        status_parts.append("Keine Ladung")

    input_text.battery_optimizer_status = " ".join(status_parts)


def _log_energy_summary(day_label, charges):
    """Log total energy and average price of *charges* for one day.

    Args:
        day_label: Word inserted into the log line ('heute' or 'morgen').
        charges: Non-empty list of 'charge' schedule entries.
    """
    total_energy_kwh = sum([abs(s['power_w']) for s in charges]) / 1000
    avg_price = sum([s['price'] for s in charges]) / len(charges)
    log.info(f" - Energie {day_label}: {total_energy_kwh:.1f} kWh @ {avg_price:.2f} ct/kWh")


def log_statistics(schedule, price_data, has_tomorrow):
    """Log statistics about the generated plan.

    Args:
        schedule: List of schedule entries from optimize_charging().
        price_data: Unused; kept so that existing callers keep working.
        has_tomorrow: Whether the price data contained entries for tomorrow.
    """
    # Split the charging hours by day
    charges_today = []
    charges_tomorrow = []
    for s in schedule:
        if s['action'] != 'charge':
            continue
        if s.get('is_tomorrow', False):
            charges_tomorrow.append(s)
        else:
            charges_today.append(s)

    log.info(f"📊 Statistik:")
    log.info(f" - Planungsfenster: {len(schedule)} Stunden")
    log.info(f" - Tomorrow-Daten: {'✓ Ja' if has_tomorrow else '✗ Nein'}")
    log.info(f" - Ladungen heute: {len(charges_today)}")
    log.info(f" - Ladungen morgen: {len(charges_tomorrow)}")

    if charges_today:
        _log_energy_summary("heute", charges_today)

    if charges_tomorrow:
        _log_energy_summary("morgen", charges_tomorrow)


def _find_schedule_entry(schedule, moment):
    """Return the schedule entry for the date and hour of *moment*.

    Schedule timestamps are written from Europe/Berlin datetimes, so their
    wall-clock date and hour are compared directly with the local *moment*.

    Args:
        schedule: List of schedule entries with an ISO 'datetime' string.
        moment: Aware local datetime to look up.

    Returns:
        The first matching entry, or None when there is none.
    """
    wanted_date = moment.date()
    wanted_hour = moment.hour
    for entry in schedule:
        entry_dt = datetime.fromisoformat(entry['datetime'])
        # Exact match on date + hour
        if entry_dt.date() == wanted_date and entry_dt.hour == wanted_hour:
            return entry
    return None


def _switch_to_auto_mode():
    """Turn manual charging off if it is currently on.

    Switching the boolean off triggers the automation
    "manuelle_speicherbeladung_deaktivieren", which:
    1. disables the keep-alive automation
    2. switches the ESS back to INTERNAL mode
    """
    if state.get('input_boolean.goodwe_manual_control') == 'on':
        log.info("🔄 DEAKTIVIERE manuelles Laden → Auto-Modus")
        input_boolean.goodwe_manual_control = "off"
        log.info("✓ Auto-Modus aktiviert - OpenEMS steuert Batterie")
    else:
        log.info("✓ Auto-Modus bereits aktiv")


@service
def execute_charging_schedule():
    """
    Execute the current charging plan (called hourly).

    Looks up the entry for the current local date and hour and either
    activates manual charging with the planned power or returns to auto
    mode. Does nothing when the optimizer is disabled or the manual
    override is active. When the plan has no entry for the current hour,
    the battery is returned to auto mode so that a charge command from the
    previous hour does not stay active.
    """
    # Is the optimizer enabled?
    if state.get('input_boolean.battery_optimizer_enabled') != 'on':
        return

    # Respect the manual override
    if state.get('input_boolean.battery_optimizer_manual_override') == 'on':
        log.info("Manuelle Überschreibung aktiv - überspringe Ausführung")
        return

    # Load the schedule
    schedule_attr = state.getattr('pyscript.battery_charging_schedule')
    if not schedule_attr or 'schedule' not in schedule_attr:
        log.warning("Kein Ladeplan vorhanden")
        return

    schedule = schedule_attr['schedule']

    # Current time in the local time zone
    now = get_local_now()
    current_hour = now.hour
    current_date = now.date()

    log.info(f"Suche Ladeplan für {current_date} {current_hour}:00 Uhr (lokal)")

    # Find the matching entry
    current_entry = _find_schedule_entry(schedule, now)

    if not current_entry:
        log.warning(f"⚠ Keine Daten für {current_date} {current_hour}:00")
        # Fail-safe: the planner only stores hours that start after the
        # planning run, so the running hour can be missing. Do not keep
        # charging on the previous hour's command.
        _switch_to_auto_mode()
        return

    log.info(f"✓ Gefunden: {current_entry['datetime']}")

    # Execute the planned action
    action = current_entry['action']
    power_w = current_entry['power_w']
    price = current_entry['price']
    reason = current_entry.get('reason', '')
    is_tomorrow = current_entry.get('is_tomorrow', False)
    day_str = "morgen" if is_tomorrow else "heute"

    log.info(f"⚡ Stunde {current_hour}:00 [{day_str}]: action={action}, power={power_w}W, price={price:.2f}ct")
    log.info(f" Grund: {reason}")

    if action == 'charge':
        log.info(f"🔋 AKTIVIERE LADEN mit {abs(power_w)}W")

        # Target power as a NEGATIVE value (charging = negative).
        # The keep-alive automation reads this value and passes it
        # straight to ess_set_power.
        charging_power = -abs(power_w)
        input_number.charge_power_battery = float(charging_power)

        log.info(f"⚡ Ladeleistung gesetzt: {charging_power}W")

        # Activate manual mode. This triggers the automation
        # "manuelle_speicherbeladung_aktivieren", which:
        # 1. switches the ESS to REMOTE mode
        # 2. enables the keep-alive automation (Modbus command every 30s)
        input_boolean.goodwe_manual_control = "on"

        log.info("✓ Manuelles Laden aktiviert - Keep-Alive Automation übernimmt")

    elif action == 'auto':
        # Deactivate manual charging
        _switch_to_auto_mode()


# ====================
# NO time triggers in this script!
# ====================
# Triggers are driven by Home Assistant automations:
# - Daily calculation at 14:05 → pyscript.calculate_charging_schedule
# - Hourly execution at xx:05 → pyscript.execute_charging_schedule
# - After HA restart → pyscript.calculate_charging_schedule
# - Midnight (optional) → pyscript.calculate_charging_schedule
#
# See: v3/battery_optimizer_automations.yaml