# Source: battery-charging-optimizer/pyscripts/battery_charging_optimizer.py
# (repository listing: 669 lines, 24 KiB, Python)
"""
Battery Charging Optimizer für OpenEMS + GoodWe
Nutzt das bestehende manuelle Steuerungssystem
Speicherort: /config/pyscript/battery_charging_optimizer.py
Version: 3.3.1 - FIXED: SOC-Plausibilitäts-Check (filtert 65535% Spikes beim Modus-Wechsel)
Setzt charge_power_battery als negativen Wert (Laden = negativ)
Nutzt bestehende Automations für ESS-Modus und Keep-Alive
"""
import json
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
# Local time zone applied to the timestamps handled in this module.
TIMEZONE = ZoneInfo("Europe/Berlin")


def get_local_now():
    """Return the current time as a timezone-aware datetime in ``TIMEZONE``."""
    return datetime.now(tz=TIMEZONE)
@service
def calculate_charging_schedule():
    """
    Compute the charging plan for the upcoming hours and publish it.

    Workflow:
      1. Abort early when ``input_boolean.battery_optimizer_enabled`` is not 'on'.
      2. Load configuration, electricity prices and the PV forecast.
      3. Read the battery state of charge from ``sensor.esssoc`` and replace
         implausible readings with a 50 % fallback.
      4. Run ``optimize_charging`` and hand the result to ``save_schedule``
         and ``log_statistics``.

    Any exception is logged together with its traceback and mirrored
    (truncated to 100 characters) into ``input_text.battery_optimizer_status``.
    The function returns nothing; it is meant to be called as a pyscript service.
    """
    log.info("=== Batterie-Optimierung gestartet (v3.3.1) ===")

    # Respect the global enable switch.
    if state.get('input_boolean.battery_optimizer_enabled') != 'on':
        log.info("Optimierung ist deaktiviert")
        input_text.battery_optimizer_status = "Deaktiviert"
        return

    try:
        # Load configuration
        config = load_configuration()
        log.info(f"Konfiguration geladen: SOC {config['min_soc']}-{config['max_soc']}%, Max {config['max_charge_power']}W")

        # Load electricity prices (may include tomorrow's prices)
        price_data = get_electricity_prices()
        if not price_data:
            log.error("Keine Strompreis-Daten verfügbar")
            input_text.battery_optimizer_status = "Fehler: Keine Preisdaten"
            return

        # Remember whether any entry belongs to tomorrow.
        has_tomorrow = any([p.get('is_tomorrow', False) for p in price_data])
        log.info(f"Strompreise geladen: {len(price_data)} Stunden (Tomorrow: {has_tomorrow})")

        # Load PV forecast
        pv_forecast = get_pv_forecast()
        pv_today = pv_forecast.get('today', 0)
        pv_tomorrow = pv_forecast.get('tomorrow', 0)
        log.info(f"PV-Prognose: Heute {pv_today:.1f} kWh, Morgen {pv_tomorrow:.1f} kWh")

        # Read the battery SOC. A non-numeric state (for example 'unavailable')
        # must not abort the whole optimisation, so fall back to 50 %.
        raw_soc = state.get('sensor.esssoc')
        try:
            current_soc = float(raw_soc or 50)
        except (TypeError, ValueError):
            log.warning(f"⚠ SOC-Wert nicht numerisch: '{raw_soc}'. Verwende Fallback-Wert 50%.")
            current_soc = 50

        # Plausibility check (filters spikes such as 65535 %). Written as a
        # negated range test so that NaN is rejected as well.
        if not (current_soc >= 0 and current_soc <= 100):
            log.warning(f"⚠ Ungültiger SOC-Wert erkannt: {current_soc}%. Verwende Fallback-Wert 50%.")
            current_soc = 50
        log.info(f"Aktueller SOC: {current_soc}%")

        # Run the optimisation
        schedule = optimize_charging(
            price_data=price_data,
            pv_forecast=pv_forecast,
            current_soc=current_soc,
            config=config
        )

        # Persist the plan and print statistics
        save_schedule(schedule, has_tomorrow)
        log_statistics(schedule, price_data, has_tomorrow)
        log.info("=== Optimierung abgeschlossen ===")
    except Exception as e:
        log.error(f"Fehler bei Optimierung: {e}")
        import traceback
        log.error(f"Traceback: {traceback.format_exc()}")
        input_text.battery_optimizer_status = f"Fehler: {str(e)[:100]}"
def load_configuration():
    """
    Read all optimizer parameters from the Home Assistant input helpers.

    Returns a dict with the keys ``battery_capacity`` (Wh), ``min_soc``,
    ``max_soc``, ``max_charge_power``, ``price_threshold``,
    ``reserve_capacity`` (Wh) and ``pv_threshold``. A helper whose state is
    falsy is replaced by the listed fallback value.
    """
    # (config key, helper entity, fallback value, multiplier applied after float())
    settings = (
        ('battery_capacity', 'input_number.battery_capacity_kwh', 10, 1000),  # kWh -> Wh
        ('min_soc', 'input_number.battery_optimizer_min_soc', 20, 1),
        ('max_soc', 'input_number.battery_optimizer_max_soc', 100, 1),
        ('max_charge_power', 'input_number.battery_optimizer_max_charge_power', 5000, 1),
        ('price_threshold', 'input_number.battery_optimizer_price_threshold', 28, 1),
        ('reserve_capacity', 'input_number.battery_optimizer_reserve_capacity', 2, 1000),  # kWh -> Wh
        ('pv_threshold', 'input_number.battery_optimizer_pv_threshold', 500, 1),
    )

    configuration = {}
    for key, entity_id, fallback, factor in settings:
        configuration[key] = float(state.get(entity_id) or fallback) * factor
    return configuration
def _parse_price_series(prices, timestamps, is_tomorrow, label):
    """Turn parallel price/timestamp lists into price-entry dicts localized to TIMEZONE."""
    entries = []
    for i, price in enumerate(prices):
        try:
            # Timestamps are parsed as naive local time and then tagged with TIMEZONE.
            dt_naive = datetime.strptime(timestamps[i], "%Y-%m-%d %H:%M:%S")
            dt = dt_naive.replace(tzinfo=TIMEZONE)
            entries.append({
                'datetime': dt,
                'hour': dt.hour,
                'date': dt.date(),
                'price': float(price),
                'is_tomorrow': is_tomorrow
            })
        except Exception as e:
            # A single bad row is skipped; the remaining rows are still used.
            log.warning(f"Fehler beim Verarbeiten von {label}-Preis {i}: {e}")
            continue
    return entries


def get_electricity_prices():
    """
    Load electricity prices from the haStrom FLEX PRO sensors.

    The extended sensor ``sensor.hastrom_flex_pro_ext`` is preferred; if it
    has no ``prices_today`` attribute, ``sensor.hastrom_flex_pro`` is used
    instead. The ``prices_today`` / ``datetime_today`` attributes are required,
    the corresponding ``*_tomorrow`` attributes are used only when
    ``tomorrow_available`` is truthy and both lists have equal length.

    Returns:
        A list of dicts with the keys ``datetime`` (timezone-aware, TIMEZONE),
        ``hour``, ``date``, ``price`` (float) and ``is_tomorrow``; or ``None``
        when no usable price data is available.
    """
    # Try the extended sensor first (it can carry tomorrow's prices).
    price_entity_ext = 'sensor.hastrom_flex_pro_ext'
    prices_attr_ext = state.getattr(price_entity_ext)

    # Fallback: the older sensor.
    price_entity_old = 'sensor.hastrom_flex_pro'
    prices_attr_old = state.getattr(price_entity_old)

    # Pick the best available sensor.
    if prices_attr_ext and prices_attr_ext.get('prices_today'):
        price_entity = price_entity_ext
        prices_attr = prices_attr_ext
        log.info(f"✓ Nutze Extended-Sensor: {price_entity}")
    elif prices_attr_old and prices_attr_old.get('prices_today'):
        price_entity = price_entity_old
        prices_attr = prices_attr_old
        log.warning(f"⚠ Extended-Sensor nicht verfügbar, nutze alten Sensor: {price_entity}")
    else:
        log.error("Keine Strompreis-Sensoren verfügbar")
        return None

    # Today (required)
    prices_today = prices_attr.get('prices_today', [])
    datetime_today = prices_attr.get('datetime_today', [])

    # Tomorrow (optional)
    prices_tomorrow = prices_attr.get('prices_tomorrow', [])
    datetime_tomorrow = prices_attr.get('datetime_tomorrow', [])
    tomorrow_available = prices_attr.get('tomorrow_available', False)

    if not prices_today or not datetime_today:
        log.error(f"Keine Preis-Daten in {price_entity}")
        return None
    if len(prices_today) != len(datetime_today):
        log.error("Preis-Array und DateTime-Array haben unterschiedliche Längen")
        return None

    # Convert everything to one uniform format in the local time zone.
    price_data = _parse_price_series(prices_today, datetime_today, False, "Heute")

    if tomorrow_available and prices_tomorrow and datetime_tomorrow:
        if len(prices_tomorrow) == len(datetime_tomorrow):
            log.info(f"✓ Tomorrow-Daten verfügbar: {len(prices_tomorrow)} Stunden")
            price_data.extend(
                _parse_price_series(prices_tomorrow, datetime_tomorrow, True, "Morgen")
            )
        else:
            log.warning("Tomorrow-Arrays haben unterschiedliche Längen")
    else:
        log.info("Tomorrow-Daten noch nicht verfügbar (normal vor 14 Uhr)")

    return price_data
def get_pv_forecast():
    """
    Read the PV production forecast for the east and west arrays.

    The daily totals of both arrays are summed per day. If reading or
    converting a value fails, that day's total falls back to 0 and a warning
    is logged.

    Returns:
        dict with ``today`` and ``tomorrow`` (sum of both sensors) and
        ``hourly``, which is currently always an empty dict (no hourly
        breakdown is produced here).
    """
    sensor_east = 'sensor.energy_production_today'
    sensor_west = 'sensor.energy_production_today_2'
    sensor_east_tomorrow = 'sensor.energy_production_tomorrow'
    sensor_west_tomorrow = 'sensor.energy_production_tomorrow_2'

    # Today. ``except Exception`` keeps the best-effort behaviour but, unlike a
    # bare ``except``, does not swallow BaseException (e.g. task cancellation).
    try:
        east_today = float(state.get(sensor_east) or 0)
        west_today = float(state.get(sensor_west) or 0)
        pv_today = east_today + west_today
    except Exception as err:
        pv_today = 0
        log.warning(f"Konnte PV-Heute nicht laden: {err}")

    # Tomorrow
    try:
        east_tomorrow = float(state.get(sensor_east_tomorrow) or 0)
        west_tomorrow = float(state.get(sensor_west_tomorrow) or 0)
        pv_tomorrow = east_tomorrow + west_tomorrow
    except Exception as err:
        pv_tomorrow = 0
        log.warning(f"Konnte PV-Morgen nicht laden: {err}")

    # Hourly values (simplified: intentionally left empty)
    pv_wh_per_hour = {}

    return {
        'today': pv_today,
        'tomorrow': pv_tomorrow,
        'hourly': pv_wh_per_hour
    }
def optimize_charging(price_data, pv_forecast, current_soc, config):
    """
    Ranking-based charging optimisation.

    Strategy:
      1. Keep only price slots that start after the current local time.
      2. Derive the chargeable energy from ``max_soc``, ``current_soc``,
         ``battery_capacity`` and ``reserve_capacity``.
      3. Compute how many slots are needed at ``max_charge_power`` per slot.
      4. Rank all future slots by score (price minus a small PV bonus) and
         select the N cheapest.
      5. Emit one schedule entry per future slot in chronological order:
         'charge' (negative ``power_w``) for selected slots, 'auto' otherwise.

    Args:
        price_data: list of dicts as returned by ``get_electricity_prices``.
        pv_forecast: dict with an ``hourly`` mapping (hour -> Wh).
        current_soc: battery state of charge in percent.
        config: dict as returned by ``load_configuration``.

    Returns:
        A list of schedule dicts; empty when no future price is available.

    Raises:
        ValueError: if ``config['max_charge_power']`` is not positive
            (previously this surfaced as a ZeroDivisionError).
    """
    now = get_local_now()
    log.info(f"Lokale Zeit: {now.strftime('%Y-%m-%d %H:%M:%S %Z')}")

    # Only slots that start in the future; the slot of the running hour is
    # excluded because its start time already lies in the past.
    future_price_data = [p for p in price_data if p['datetime'] > now]
    log.info(f"Planungsfenster: {len(future_price_data)} Stunden (ab jetzt)")
    if not future_price_data:
        log.error("Keine zukünftigen Preise verfügbar")
        return []

    # Price statistics
    all_prices = [p['price'] for p in future_price_data]
    min_price = min(all_prices)
    max_price = max(all_prices)
    avg_price = sum(all_prices) / len(all_prices)
    log.info(f"Preise: Min={min_price:.2f}, Max={max_price:.2f}, Avg={avg_price:.2f} ct/kWh")

    # Chargeable energy
    available_capacity_wh = (config['max_soc'] - current_soc) / 100 * config['battery_capacity']
    available_capacity_wh -= config['reserve_capacity']
    if available_capacity_wh <= 0:
        log.info("Batterie ist voll oder Reserve erreicht - keine Ladung nötig")
        return create_auto_only_schedule(future_price_data)
    log.info(f"Verfügbare Ladekapazität: {available_capacity_wh/1000:.2f} kWh")

    # ==========================================
    # Number of charging slots required
    # ==========================================
    max_charge_per_hour = config['max_charge_power']  # W for one hour == Wh per slot
    if max_charge_per_hour <= 0:
        # Fail with a clear message instead of a division-by-zero error.
        raise ValueError(
            f"Ungültige maximale Ladeleistung: {max_charge_per_hour} W (muss > 0 sein)"
        )
    # Integer-style ceiling division
    needed_hours = int((available_capacity_wh + max_charge_per_hour - 1) / max_charge_per_hour)
    # At least one slot, at most the number of available slots
    needed_hours = max(1, min(needed_hours, len(future_price_data)))
    log.info(f"🎯 Benötigte Ladestunden: {needed_hours} (bei {max_charge_per_hour}W pro Stunde)")

    # ==========================================
    # RANKING: build the candidate list
    # ==========================================
    charging_candidates = []
    for p in future_price_data:
        pv_wh = pv_forecast['hourly'].get(p['hour'], 0)
        # Lower score is better: price is the main factor, PV lowers it slightly.
        score = p['price'] - (pv_wh / 1000)
        charging_candidates.append({
            'datetime': p['datetime'],
            'hour': p['hour'],
            'date': p['date'],
            'price': p['price'],
            'pv_wh': pv_wh,
            'is_tomorrow': p.get('is_tomorrow', False),
            'score': score
        })

    # Best (lowest score) first
    charging_candidates.sort(key=lambda x: x['score'])
    selected_hours = charging_candidates[:needed_hours]

    # 1-based rank per start time; setdefault keeps the first (best) position
    # if the same start time appears more than once.
    rank_by_datetime = {}
    for position, candidate in enumerate(charging_candidates, start=1):
        rank_by_datetime.setdefault(candidate['datetime'], position)

    # Selected candidates keyed by start time (first match wins).
    selected_by_datetime = {}
    for candidate in selected_hours:
        selected_by_datetime.setdefault(candidate['datetime'], candidate)

    # ==========================================
    # Logging: show the selection
    # ==========================================
    if selected_hours:
        prices_selected = [h['price'] for h in selected_hours]
        avg_selected = sum(prices_selected) / len(prices_selected)
        log.info(f"✓ Top {needed_hours} günstigste Stunden ausgewählt:")
        log.info(f" - Preise: {min(prices_selected):.2f} - {max(prices_selected):.2f} ct/kWh")
        log.info(f" - Durchschnitt: {avg_selected:.2f} ct/kWh")
        log.info(f" - Ersparnis vs. Durchschnitt: {(avg_price - avg_selected):.2f} ct/kWh")

        tomorrow_count = len([h for h in selected_hours if h['is_tomorrow']])
        log.info(f" - Davon morgen: {tomorrow_count}")

        times = []
        for h in sorted(selected_hours, key=lambda x: x['datetime']):
            day_str = "morgen" if h['is_tomorrow'] else "heute"
            times.append(f"{h['hour']:02d}:00 {day_str} ({h['price']:.2f}ct)")
        log.info(f" - Zeiten: {', '.join(times[:5])}")  # show the first five only

    # ==========================================
    # Build the schedule (chronological order of future_price_data)
    # ==========================================
    schedule = []
    remaining_capacity = available_capacity_wh
    total_candidates = len(charging_candidates)

    for p in future_price_data:
        slot_dt = p['datetime']
        is_selected = slot_dt in selected_by_datetime
        is_tomorrow = p.get('is_tomorrow', False)

        if is_selected and remaining_capacity > 0:
            # Charge in this slot; the last slot may be a partial charge.
            charge_wh = min(config['max_charge_power'], remaining_capacity)
            day_str = "morgen" if is_tomorrow else "heute"
            rank = rank_by_datetime.get(slot_dt, total_candidates + 1)
            schedule.append({
                'datetime': slot_dt.isoformat(),
                'hour': p['hour'],
                'date': p['date'].isoformat(),
                'action': 'charge',
                'power_w': -int(charge_wh),  # charging is expressed as negative power
                'price': p['price'],
                'pv_wh': selected_by_datetime[slot_dt]['pv_wh'],
                'is_tomorrow': is_tomorrow,
                'reason': f"Rang {rank}/{total_candidates}: {p['price']:.2f}ct [{day_str}]"
            })
            remaining_capacity -= charge_wh
        else:
            # Automatic mode; record why the slot was not chosen.
            reason = "Automatik"
            if not is_selected:
                rank = rank_by_datetime.get(slot_dt)
                if rank is not None:
                    reason = f"Rang {rank} (nicht unter Top {needed_hours})"
            schedule.append({
                'datetime': slot_dt.isoformat(),
                'hour': p['hour'],
                'date': p['date'].isoformat(),
                'action': 'auto',
                'power_w': 0,
                'price': p['price'],
                'is_tomorrow': is_tomorrow,
                'reason': reason
            })

    return schedule
def create_auto_only_schedule(price_data):
    """
    Build a schedule that keeps every slot in automatic mode (no charging).

    Args:
        price_data: iterable of price dicts with ``datetime``, ``hour``,
            ``date`` and ``price``; ``is_tomorrow`` is optional.

    Returns:
        A list with one 'auto' entry (``power_w`` 0) per input slot, with
        ``datetime`` and ``date`` serialised via ``isoformat()``.
    """
    return [
        {
            'datetime': p['datetime'].isoformat(),
            'hour': p['hour'],
            'date': p['date'].isoformat(),
            'action': 'auto',
            'power_w': 0,
            'price': p['price'],
            'is_tomorrow': p.get('is_tomorrow', False),
            'reason': "Keine Ladung nötig (Batterie voll)"
        }
        for p in price_data
    ]
def save_schedule(schedule, has_tomorrow):
    """
    Publish the schedule as ``pyscript.battery_charging_schedule``.

    The state value is 'active'; the schedule itself and summary figures
    (number of charge slots, energy, average charge price, first charge
    times) are stored as attributes. Afterwards a short summary is written
    to ``input_text.battery_optimizer_status``. An empty schedule is not
    stored; only a warning is logged.
    """
    if not schedule:
        log.warning("Leerer Schedule - nichts zu speichern")
        return

    # Summary figures over all 'charge' slots
    charge_slots = [slot for slot in schedule if slot['action'] == 'charge']
    tomorrow_slots = [slot for slot in charge_slots if slot.get('is_tomorrow', False)]
    num_charges = len(charge_slots)
    num_charges_tomorrow = len(tomorrow_slots)

    total_energy_kwh = sum([abs(slot['power_w']) for slot in charge_slots]) / 1000
    price_sum = sum([slot['price'] for slot in charge_slots])
    avg_price = price_sum / num_charges if num_charges > 0 else 0

    first_charge = charge_slots[0]['datetime'] if charge_slots else None
    first_charge_tomorrow = tomorrow_slots[0]['datetime'] if tomorrow_slots else None

    # Store as a pyscript state with attributes
    attributes = {
        'schedule': schedule,
        'last_update': get_local_now().isoformat(),
        'num_hours': len(schedule),
        'num_charges': num_charges,
        'num_charges_tomorrow': num_charges_tomorrow,
        'total_energy_kwh': round(total_energy_kwh, 2),
        'avg_charge_price': round(avg_price, 2),
        'first_charge_time': first_charge,
        'first_charge_tomorrow': first_charge_tomorrow,
        'has_tomorrow_data': has_tomorrow,
        'estimated_savings': 0
    }
    state.set(
        'pyscript.battery_charging_schedule',
        value='active',
        new_attributes=attributes
    )
    log.info(f"✓ Ladeplan gespeichert: {len(schedule)} Stunden, {num_charges} Ladungen ({num_charges_tomorrow} morgen)")

    # Update the human-readable status
    if num_charges > 0:
        status = f"{num_charges} Ladungen"
        if num_charges_tomorrow > 0:
            status += f" ({num_charges_tomorrow} morgen)"
    else:
        status = "Keine Ladung"
    input_text.battery_optimizer_status = status
def log_statistics(schedule, price_data, has_tomorrow):
    """
    Log a summary of the generated plan.

    Reports the size of the planning window, whether tomorrow's data was
    present and, separately for today and tomorrow, the number of charge
    slots plus their total energy and mean price. ``price_data`` is accepted
    but not used.
    """
    # Split the charge slots by day
    charge_slots = [slot for slot in schedule if slot['action'] == 'charge']
    charges_today = [slot for slot in charge_slots if not slot.get('is_tomorrow', False)]
    charges_tomorrow = [slot for slot in charge_slots if slot.get('is_tomorrow', False)]

    log.info("📊 Statistik:")
    log.info(f" - Planungsfenster: {len(schedule)} Stunden")
    log.info(f" - Tomorrow-Daten: {'✓ Ja' if has_tomorrow else '✗ Nein'}")
    log.info(f" - Ladungen heute: {len(charges_today)}")
    log.info(f" - Ladungen morgen: {len(charges_tomorrow)}")

    # Energy and mean price per day, only for days that have charge slots
    for label, entries in (("heute", charges_today), ("morgen", charges_tomorrow)):
        if not entries:
            continue
        energy_kwh = sum([abs(entry['power_w']) for entry in entries]) / 1000
        mean_price = sum([entry['price'] for entry in entries]) / len(entries)
        log.info(f" - Energie {label}: {energy_kwh:.1f} kWh @ {mean_price:.2f} ct/kWh")
@service
def execute_charging_schedule():
    """
    Apply the plan entry for the current local hour.

    Does nothing when the optimizer is disabled or when
    ``input_boolean.battery_optimizer_manual_override`` is 'on'. Otherwise the
    entry of ``pyscript.battery_charging_schedule`` whose date and hour match
    the current local time is looked up:

      * 'charge': writes the (negative) charging power to
        ``input_number.charge_power_battery`` and switches
        ``input_boolean.goodwe_manual_control`` on.
      * 'auto': switches ``input_boolean.goodwe_manual_control`` off if it is on.

    Fail-safe: if there is no plan at all or no entry for the current hour,
    the system is returned to automatic mode so that a forced charge started
    in a previous hour cannot keep running unplanned.
    """
    # Respect the global enable switch.
    if state.get('input_boolean.battery_optimizer_enabled') != 'on':
        return

    # Respect a manual override.
    if state.get('input_boolean.battery_optimizer_manual_override') == 'on':
        log.info("Manuelle Überschreibung aktiv - überspringe Ausführung")
        return

    # Load the schedule
    schedule_attr = state.getattr('pyscript.battery_charging_schedule')
    if not schedule_attr or 'schedule' not in schedule_attr:
        log.warning("Kein Ladeplan vorhanden")
        _release_manual_control()
        return
    schedule = schedule_attr['schedule']

    # Current time in the local time zone
    now = get_local_now()
    current_hour = now.hour
    current_date = now.date()
    log.info(f"Suche Ladeplan für {current_date} {current_hour}:00 Uhr (lokal)")

    # Find the entry matching the current date and hour
    current_entry = None
    for entry in schedule:
        try:
            entry_dt = datetime.fromisoformat(entry['datetime'])
        except (KeyError, TypeError, ValueError) as err:
            # One malformed entry must not prevent the lookup of the others.
            log.warning(f"⚠ Ungültiger Plan-Eintrag übersprungen: {err}")
            continue

        # Naive timestamps are interpreted as local time; aware ones are
        # normalised to the local zone before comparing date and hour.
        if entry_dt.tzinfo is None:
            entry_dt = entry_dt.replace(tzinfo=TIMEZONE)
        else:
            entry_dt = entry_dt.astimezone(TIMEZONE)

        if entry_dt.date() == current_date and entry_dt.hour == current_hour:
            current_entry = entry
            log.info(f"✓ Gefunden: {entry['datetime']}")
            break

    if not current_entry:
        log.warning(f"⚠ Keine Daten für {current_date} {current_hour}:00")
        _release_manual_control()
        return

    # Execute the action
    action = current_entry['action']
    power_w = current_entry['power_w']
    price = current_entry['price']
    reason = current_entry.get('reason', '')
    # NOTE: 'is_tomorrow' was set when the plan was computed, not now.
    is_tomorrow = current_entry.get('is_tomorrow', False)
    day_str = "morgen" if is_tomorrow else "heute"
    log.info(f"⚡ Stunde {current_hour}:00 [{day_str}]: action={action}, power={power_w}W, price={price:.2f}ct")
    log.info(f" Grund: {reason}")

    if action == 'charge':
        log.info(f"🔋 AKTIVIERE LADEN mit {abs(power_w)}W")
        # The target power is stored as a NEGATIVE value (charging = negative).
        # Per the original notes, a keep-alive automation reads this value
        # and forwards it to ess_set_power.
        charging_power = -abs(power_w)
        input_number.charge_power_battery = float(charging_power)
        log.info(f"⚡ Ladeleistung gesetzt: {charging_power}W")
        # Switching manual control on is expected to trigger the automation
        # that puts the ESS into REMOTE mode and starts the keep-alive
        # (defined outside this file).
        input_boolean.goodwe_manual_control = "on"
        log.info("✓ Manuelles Laden aktiviert - Keep-Alive Automation übernimmt")
    elif action == 'auto':
        _release_manual_control()
    else:
        log.warning(f"⚠ Unbekannte Aktion im Ladeplan: {action}")


def _release_manual_control():
    """Switch ``input_boolean.goodwe_manual_control`` off if it is currently on."""
    if state.get('input_boolean.goodwe_manual_control') == 'on':
        log.info("🔄 DEAKTIVIERE manuelles Laden → Auto-Modus")
        # Switching manual control off is expected to trigger the automation
        # that stops the keep-alive and returns the ESS to INTERNAL mode
        # (defined outside this file).
        input_boolean.goodwe_manual_control = "off"
        log.info("✓ Auto-Modus aktiviert - OpenEMS steuert Batterie")
    else:
        log.info("✓ Auto-Modus bereits aktiv")
# ====================
# KEINE Zeit-Trigger im Script!
# ====================
# Trigger werden über Home Assistant Automations gesteuert:
# - Tägliche Berechnung um 14:05 → pyscript.calculate_charging_schedule
# - Stündliche Ausführung um xx:05 → pyscript.execute_charging_schedule
# - Nach HA-Neustart → pyscript.calculate_charging_schedule
# - Mitternacht (optional) → pyscript.calculate_charging_schedule
#
# Siehe: v3/battery_optimizer_automations.yaml