429 lines
14 KiB
Python
429 lines
14 KiB
Python
"""
|
|
Battery Charging Optimizer für OpenEMS + GoodWe
|
|
Optimiert die Batterieladung basierend auf Strompreisen und PV-Prognose
|
|
|
|
Speicherort: /config/pyscript/battery_charging_optimizer.py
|
|
"""
|
|
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
|
|
@service
|
|
def calculate_charging_schedule():
|
|
"""
|
|
Berechnet den optimalen Ladeplan für die nächsten 24 Stunden
|
|
Berücksichtigt: Strompreise, PV-Prognose, Batterie-SOC, Verbrauch
|
|
"""
|
|
|
|
log.info("=== Batterie-Optimierung gestartet ===")
|
|
|
|
# Konfiguration laden
|
|
config = load_configuration()
|
|
if not config['enabled']:
|
|
log.info("Optimierung ist deaktiviert")
|
|
return
|
|
|
|
# Daten sammeln
|
|
price_data = get_price_data()
|
|
pv_forecast = get_pv_forecast()
|
|
battery_state = get_battery_state()
|
|
|
|
if not price_data:
|
|
log.error("Keine Strompreis-Daten verfügbar")
|
|
return
|
|
|
|
# Optimierung durchführen
|
|
schedule = optimize_charging_schedule(
|
|
price_data=price_data,
|
|
pv_forecast=pv_forecast,
|
|
battery_state=battery_state,
|
|
config=config
|
|
)
|
|
|
|
# Plan speichern
|
|
save_schedule(schedule)
|
|
|
|
# Statistiken loggen
|
|
log_schedule_statistics(schedule, price_data)
|
|
|
|
log.info("=== Optimierung abgeschlossen ===")
|
|
|
|
|
|
def load_configuration():
|
|
"""Lädt die Konfiguration aus den Input-Helpern"""
|
|
return {
|
|
'enabled': state.get('input_boolean.battery_optimizer_enabled') == 'on',
|
|
'min_soc': float(state.get('input_number.battery_optimizer_min_soc') or 20),
|
|
'max_soc': float(state.get('input_number.battery_optimizer_max_soc') or 100),
|
|
'price_threshold': float(state.get('input_number.battery_optimizer_price_threshold') or 28),
|
|
'max_charge_power': float(state.get('input_number.battery_optimizer_max_charge_power') or 10000),
|
|
'reserve_capacity': float(state.get('input_number.battery_optimizer_reserve_capacity') or 2),
|
|
'strategy': state.get('input_select.battery_optimizer_strategy') or 'Konservativ (nur sehr günstig)',
|
|
'battery_capacity': 10.0, # kWh
|
|
}
|
|
|
|
|
|
def get_price_data():
|
|
"""Holt die Strompreis-Daten für heute und morgen"""
|
|
prices_today = state.getattr('sensor.hastrom_flex_pro')['prices_today']
|
|
datetime_today = state.getattr('sensor.hastrom_flex_pro')['datetime_today']
|
|
|
|
if not prices_today or not datetime_today:
|
|
return None
|
|
|
|
# Kombiniere Datum und Preis
|
|
price_schedule = {}
|
|
for i, (dt_str, price) in enumerate(zip(datetime_today, prices_today)):
|
|
dt = datetime.fromisoformat(dt_str)
|
|
price_schedule[dt] = price
|
|
|
|
log.info(f"Strompreise geladen: {len(price_schedule)} Stunden")
|
|
|
|
return price_schedule
|
|
|
|
|
|
def get_pv_forecast():
|
|
"""Holt die PV-Prognose für beide Dächer (Ost + West)"""
|
|
|
|
# Tagesertrag Prognosen
|
|
pv_today_east = float(state.get('sensor.energy_production_today') or 0)
|
|
pv_tomorrow_east = float(state.get('sensor.energy_production_tomorrow') or 0)
|
|
pv_today_west = float(state.get('sensor.energy_production_today_2') or 0)
|
|
pv_tomorrow_west = float(state.get('sensor.energy_production_tomorrow_2') or 0)
|
|
|
|
pv_today_total = pv_today_east + pv_today_west
|
|
pv_tomorrow_total = pv_tomorrow_east + pv_tomorrow_west
|
|
|
|
log.info(f"PV-Prognose: Heute {pv_today_total:.1f} kWh, Morgen {pv_tomorrow_total:.1f} kWh")
|
|
|
|
# Vereinfachte stündliche Verteilung (kann später verfeinert werden)
|
|
# Annahme: Typische PV-Kurve mit Peak um 12-13 Uhr
|
|
hourly_distribution = create_pv_distribution(pv_today_total, pv_tomorrow_total)
|
|
|
|
return hourly_distribution
|
|
|
|
|
|
def create_pv_distribution(today_kwh, tomorrow_kwh):
|
|
"""
|
|
Erstellt eine vereinfachte stündliche PV-Verteilung
|
|
Basierend auf typischer Einstrahlungskurve
|
|
"""
|
|
distribution = {}
|
|
now = datetime.now()
|
|
|
|
# Verteilungsfaktoren für jede Stunde (0-23)
|
|
# Vereinfachte Gauß-Kurve mit Peak um 12 Uhr
|
|
factors = [
|
|
0.00, 0.00, 0.00, 0.00, 0.00, 0.01, # 0-5 Uhr
|
|
0.03, 0.06, 0.10, 0.14, 0.17, 0.18, # 6-11 Uhr
|
|
0.18, 0.17, 0.14, 0.10, 0.06, 0.03, # 12-17 Uhr
|
|
0.01, 0.00, 0.00, 0.00, 0.00, 0.00 # 18-23 Uhr
|
|
]
|
|
|
|
# Heute
|
|
for hour in range(24):
|
|
dt = datetime(now.year, now.month, now.day, hour, 0, 0)
|
|
if dt >= now:
|
|
distribution[dt] = today_kwh * factors[hour]
|
|
|
|
# Morgen
|
|
tomorrow = now + timedelta(days=1)
|
|
for hour in range(24):
|
|
dt = datetime(tomorrow.year, tomorrow.month, tomorrow.day, hour, 0, 0)
|
|
distribution[dt] = tomorrow_kwh * factors[hour]
|
|
|
|
return distribution
|
|
|
|
|
|
def get_battery_state():
|
|
"""Holt den aktuellen Batterie-Zustand"""
|
|
soc = float(state.get('sensor.battery_state_of_charge') or 0)
|
|
power = float(state.get('sensor.battery_power') or 0)
|
|
|
|
return {
|
|
'soc': soc,
|
|
'power': power,
|
|
'capacity_kwh': 10.0
|
|
}
|
|
|
|
|
|
def optimize_charging_schedule(price_data, pv_forecast, battery_state, config):
|
|
"""
|
|
Hauptoptimierungs-Algorithmus
|
|
|
|
Strategie: Konservativ (nur sehr günstig laden)
|
|
- Lade nur in den günstigsten Stunden
|
|
- Berücksichtige PV-Prognose (nicht laden wenn viel PV erwartet)
|
|
- Halte Reserve für Eigenverbrauch
|
|
"""
|
|
|
|
schedule = {}
|
|
|
|
# Sortiere Preise nach Höhe
|
|
sorted_prices = sorted(price_data.items(), key=lambda x: x[1])
|
|
|
|
# Berechne Schwellwert basierend auf Strategie
|
|
threshold = calculate_price_threshold(price_data, config)
|
|
|
|
log.info(f"Preis-Schwellwert: {threshold:.2f} ct/kWh")
|
|
|
|
# Aktuelle Batterie-Energie in kWh
|
|
current_energy_kwh = (battery_state['soc'] / 100.0) * config['battery_capacity']
|
|
|
|
# Simuliere die nächsten 24 Stunden
|
|
for dt, price in sorted(price_data.items()):
|
|
|
|
# Berücksichtige alle zukünftigen Stunden UND die aktuelle Stunde
|
|
# (auch wenn wir schon ein paar Minuten drin sind)
|
|
current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
|
|
if dt < current_hour:
|
|
continue # Nur vergangene Stunden überspringen
|
|
|
|
# PV-Prognose für diese Stunde
|
|
pv_kwh = pv_forecast.get(dt, 0)
|
|
|
|
# Entscheidung: Laden oder nicht?
|
|
action = 'auto'
|
|
power_w = 0
|
|
reason = []
|
|
|
|
# Prüfe ob Laden sinnvoll ist
|
|
if price <= threshold:
|
|
|
|
# Prüfe ob genug Kapazität vorhanden
|
|
max_capacity_kwh = (config['max_soc'] / 100.0) * config['battery_capacity']
|
|
available_capacity = max_capacity_kwh - current_energy_kwh - config['reserve_capacity']
|
|
|
|
if available_capacity > 0.5: # Mindestens 0.5 kWh Platz
|
|
|
|
# Prüfe PV-Prognose
|
|
if pv_kwh < 1.0: # Wenig PV erwartet
|
|
action = 'charge'
|
|
# Ladeleistung: Max oder was noch Platz hat
|
|
charge_kwh = min(available_capacity, config['max_charge_power'] / 1000.0)
|
|
power_w = -int(charge_kwh * 1000) # Negativ = Laden
|
|
current_energy_kwh += charge_kwh
|
|
reason.append(f"Günstiger Preis ({price:.2f} ct)")
|
|
reason.append(f"Wenig PV ({pv_kwh:.1f} kWh)")
|
|
else:
|
|
reason.append(f"Viel PV erwartet ({pv_kwh:.1f} kWh)")
|
|
else:
|
|
reason.append("Batterie bereits voll")
|
|
else:
|
|
reason.append(f"Preis zu hoch ({price:.2f} > {threshold:.2f} ct)")
|
|
|
|
schedule[dt.isoformat()] = {
|
|
'action': action,
|
|
'power_w': power_w,
|
|
'price': price,
|
|
'pv_forecast': pv_kwh,
|
|
'reason': ', '.join(reason)
|
|
}
|
|
|
|
return schedule
|
|
|
|
|
|
def calculate_price_threshold(price_data, config):
|
|
"""Berechnet den Preis-Schwellwert basierend auf Strategie"""
|
|
|
|
prices = list(price_data.values())
|
|
avg_price = sum(prices) / len(prices)
|
|
min_price = min(prices)
|
|
max_price = max(prices)
|
|
|
|
strategy = config['strategy']
|
|
|
|
if 'Konservativ' in strategy:
|
|
# Nur die günstigsten 20% der Stunden
|
|
threshold = min_price + (avg_price - min_price) * 0.3
|
|
|
|
elif 'Moderat' in strategy:
|
|
# Alle Stunden unter Durchschnitt
|
|
threshold = avg_price
|
|
|
|
elif 'Aggressiv' in strategy:
|
|
# Alles unter 70% des Durchschnitts
|
|
threshold = avg_price * 0.7
|
|
else:
|
|
# Fallback: Konfigurierter Schwellwert
|
|
threshold = config['price_threshold']
|
|
|
|
# Nie über den konfigurierten Max-Wert
|
|
threshold = min(threshold, config['price_threshold'])
|
|
|
|
return threshold
|
|
|
|
|
|
def save_schedule(schedule):
|
|
"""
|
|
Speichert den Ladeplan als PyScript State mit Attribut
|
|
|
|
PyScript kann States mit beliebig großen Attributen erstellen!
|
|
Kein 255-Zeichen Limit wie bei input_text.
|
|
"""
|
|
|
|
# Erstelle einen PyScript State mit dem Schedule als Attribut
|
|
state.set(
|
|
'pyscript.battery_charging_schedule',
|
|
value='active', # State-Wert (beliebig)
|
|
new_attributes={
|
|
'schedule': schedule, # Das komplette Schedule-Dict
|
|
'last_update': datetime.now().isoformat(),
|
|
'num_hours': len(schedule)
|
|
}
|
|
)
|
|
|
|
log.info(f"Ladeplan gespeichert: {len(schedule)} Stunden als Attribut")
|
|
|
|
|
|
def log_schedule_statistics(schedule, price_data):
|
|
"""Loggt Statistiken über den erstellten Plan"""
|
|
|
|
charging_hours = [h for h, d in schedule.items() if d['action'] == 'charge']
|
|
|
|
if charging_hours:
|
|
total_charge_kwh = sum([abs(d['power_w']) / 1000.0 for d in schedule.values() if d['action'] == 'charge'])
|
|
avg_charge_price = sum([price_data.get(datetime.fromisoformat(h), 0) for h in charging_hours]) / len(charging_hours)
|
|
|
|
log.info(f"Geplante Ladungen: {len(charging_hours)} Stunden")
|
|
log.info(f"Gesamte Lademenge: {total_charge_kwh:.1f} kWh")
|
|
log.info(f"Durchschnittspreis beim Laden: {avg_charge_price:.2f} ct/kWh")
|
|
log.info(f"Erste Ladung: {min(charging_hours)}")
|
|
else:
|
|
log.info("Keine Ladungen geplant")
|
|
|
|
|
|
@service
|
|
def execute_current_schedule():
|
|
"""
|
|
Führt den aktuellen Ladeplan für die aktuelle Stunde aus
|
|
Wird stündlich durch Automation aufgerufen
|
|
"""
|
|
|
|
# Prüfe ob manuelle Steuerung aktiv
|
|
if state.get('input_boolean.battery_optimizer_manual_override') == 'on':
|
|
log.info("Manuelle Steuerung aktiv - keine automatische Ausführung")
|
|
return
|
|
|
|
# Prüfe ob Optimierung aktiv
|
|
if state.get('input_boolean.battery_optimizer_enabled') != 'on':
|
|
log.info("Optimierung deaktiviert")
|
|
return
|
|
|
|
# Lade aktuellen Plan aus PyScript State Attribut
|
|
schedule = state.getattr('pyscript.battery_charging_schedule').get('schedule')
|
|
|
|
if not schedule:
|
|
log.warning("Kein Ladeplan vorhanden")
|
|
return
|
|
|
|
# Finde Eintrag für aktuelle Stunde
|
|
now = datetime.now()
|
|
current_hour = now.replace(minute=0, second=0, microsecond=0)
|
|
|
|
log.info(f"Suche Ladeplan für Stunde: {current_hour.isoformat()}")
|
|
|
|
# Suche nach passenden Zeitstempel
|
|
# Toleranz: -10 Minuten bis +50 Minuten (um ganze Stunde zu matchen)
|
|
hour_data = None
|
|
matched_hour = None
|
|
|
|
for hour_key, data in schedule.items():
|
|
try:
|
|
hour_dt = datetime.fromisoformat(hour_key)
|
|
time_diff = (hour_dt - current_hour).total_seconds()
|
|
|
|
# Match wenn innerhalb von -10min bis +50min
|
|
# (erlaubt Ausführung zwischen xx:00 und xx:50)
|
|
if -600 <= time_diff <= 3000:
|
|
hour_data = data
|
|
matched_hour = hour_key
|
|
log.info(f"Gefunden: {hour_key} (Abweichung: {time_diff/60:.1f} min)")
|
|
break
|
|
except Exception as e:
|
|
log.warning(f"Fehler beim Parsen von {hour_key}: {e}")
|
|
continue
|
|
|
|
if not hour_data:
|
|
log.info(f"Keine Daten für aktuelle Stunde {current_hour.isoformat()}")
|
|
log.debug(f"Verfügbare Stunden im Plan: {list(schedule.keys())}")
|
|
return
|
|
|
|
action = hour_data.get('action', 'auto')
|
|
power_w = hour_data.get('power_w', 0)
|
|
price = hour_data.get('price', 0)
|
|
reason = hour_data.get('reason', '')
|
|
|
|
log.info(f"Stunde {matched_hour}: Aktion={action}, Leistung={power_w}W, Preis={price} ct")
|
|
log.info(f"Grund: {reason}")
|
|
|
|
if action == 'charge' and power_w != 0:
|
|
# Aktiviere Laden
|
|
activate_charging(power_w)
|
|
else:
|
|
# Deaktiviere manuelle Steuerung, zurück zu Auto
|
|
deactivate_charging()
|
|
|
|
|
|
def activate_charging(power_w):
|
|
"""
|
|
Aktiviert das Batterieladen mit der angegebenen Leistung
|
|
|
|
Ablauf:
|
|
1. ESS → REMOTE Mode
|
|
2. Leistung über Modbus setzen (Register 706)
|
|
3. Status für Keep-Alive setzen
|
|
"""
|
|
|
|
log.info(f"Aktiviere Laden: {power_w}W")
|
|
|
|
try:
|
|
# 1. ESS in REMOTE Mode setzen
|
|
# WICHTIG: VOR dem Schreiben der Leistung!
|
|
service.call('rest_command', 'set_ess_remote_mode')
|
|
task.sleep(1.0) # Warte auf Modusänderung
|
|
|
|
# 2. Ladeleistung setzen über korrigierte Modbus-Funktion
|
|
service.call('pyscript', 'set_battery_power_modbus',
|
|
power_w=float(power_w),
|
|
hub="openEMS",
|
|
slave=1)
|
|
|
|
# 3. Status für Keep-Alive setzen
|
|
state.set('pyscript.battery_charging_active', True)
|
|
state.set('pyscript.battery_charging_power', power_w)
|
|
state.set('pyscript.battery_charging_hub', "openEMS")
|
|
state.set('pyscript.battery_charging_slave', 1)
|
|
|
|
log.info("Laden aktiviert (ESS in REMOTE Mode)")
|
|
|
|
except Exception as e:
|
|
log.error(f"Fehler beim Aktivieren: {e}")
|
|
|
|
|
|
def deactivate_charging():
|
|
"""
|
|
Deaktiviert manuelles Laden und aktiviert automatischen Betrieb
|
|
|
|
Ablauf:
|
|
1. ESS → INTERNAL Mode
|
|
2. Status zurücksetzen
|
|
"""
|
|
|
|
log.info("Deaktiviere manuelles Laden, aktiviere Auto-Modus")
|
|
|
|
try:
|
|
# 1. ESS zurück in INTERNAL Mode
|
|
# WICHTIG: Nach dem Laden, um wieder auf Automatik zu schalten!
|
|
service.call('rest_command', 'set_ess_internal_mode')
|
|
task.sleep(1.0) # Warte auf Modusänderung
|
|
|
|
# 2. Status zurücksetzen
|
|
state.set('pyscript.battery_charging_active', False)
|
|
state.set('pyscript.battery_charging_power', 0)
|
|
|
|
log.info("Auto-Modus aktiviert (ESS in INTERNAL Mode)")
|
|
|
|
except Exception as e:
|
|
log.error(f"Fehler beim Deaktivieren: {e}")
|