Files
battery-charging-optimizer/legacy/v1/battery_charging_optimizer.py

429 lines
14 KiB
Python

"""
Battery Charging Optimizer für OpenEMS + GoodWe
Optimiert die Batterieladung basierend auf Strompreisen und PV-Prognose
Speicherort: /config/pyscript/battery_charging_optimizer.py
"""
import json
from datetime import datetime, timedelta
@service
def calculate_charging_schedule():
"""
Berechnet den optimalen Ladeplan für die nächsten 24 Stunden
Berücksichtigt: Strompreise, PV-Prognose, Batterie-SOC, Verbrauch
"""
log.info("=== Batterie-Optimierung gestartet ===")
# Konfiguration laden
config = load_configuration()
if not config['enabled']:
log.info("Optimierung ist deaktiviert")
return
# Daten sammeln
price_data = get_price_data()
pv_forecast = get_pv_forecast()
battery_state = get_battery_state()
if not price_data:
log.error("Keine Strompreis-Daten verfügbar")
return
# Optimierung durchführen
schedule = optimize_charging_schedule(
price_data=price_data,
pv_forecast=pv_forecast,
battery_state=battery_state,
config=config
)
# Plan speichern
save_schedule(schedule)
# Statistiken loggen
log_schedule_statistics(schedule, price_data)
log.info("=== Optimierung abgeschlossen ===")
def load_configuration():
"""Lädt die Konfiguration aus den Input-Helpern"""
return {
'enabled': state.get('input_boolean.battery_optimizer_enabled') == 'on',
'min_soc': float(state.get('input_number.battery_optimizer_min_soc') or 20),
'max_soc': float(state.get('input_number.battery_optimizer_max_soc') or 100),
'price_threshold': float(state.get('input_number.battery_optimizer_price_threshold') or 28),
'max_charge_power': float(state.get('input_number.battery_optimizer_max_charge_power') or 10000),
'reserve_capacity': float(state.get('input_number.battery_optimizer_reserve_capacity') or 2),
'strategy': state.get('input_select.battery_optimizer_strategy') or 'Konservativ (nur sehr günstig)',
'battery_capacity': 10.0, # kWh
}
def get_price_data():
"""Holt die Strompreis-Daten für heute und morgen"""
prices_today = state.getattr('sensor.hastrom_flex_pro')['prices_today']
datetime_today = state.getattr('sensor.hastrom_flex_pro')['datetime_today']
if not prices_today or not datetime_today:
return None
# Kombiniere Datum und Preis
price_schedule = {}
for i, (dt_str, price) in enumerate(zip(datetime_today, prices_today)):
dt = datetime.fromisoformat(dt_str)
price_schedule[dt] = price
log.info(f"Strompreise geladen: {len(price_schedule)} Stunden")
return price_schedule
def get_pv_forecast():
"""Holt die PV-Prognose für beide Dächer (Ost + West)"""
# Tagesertrag Prognosen
pv_today_east = float(state.get('sensor.energy_production_today') or 0)
pv_tomorrow_east = float(state.get('sensor.energy_production_tomorrow') or 0)
pv_today_west = float(state.get('sensor.energy_production_today_2') or 0)
pv_tomorrow_west = float(state.get('sensor.energy_production_tomorrow_2') or 0)
pv_today_total = pv_today_east + pv_today_west
pv_tomorrow_total = pv_tomorrow_east + pv_tomorrow_west
log.info(f"PV-Prognose: Heute {pv_today_total:.1f} kWh, Morgen {pv_tomorrow_total:.1f} kWh")
# Vereinfachte stündliche Verteilung (kann später verfeinert werden)
# Annahme: Typische PV-Kurve mit Peak um 12-13 Uhr
hourly_distribution = create_pv_distribution(pv_today_total, pv_tomorrow_total)
return hourly_distribution
def create_pv_distribution(today_kwh, tomorrow_kwh):
"""
Erstellt eine vereinfachte stündliche PV-Verteilung
Basierend auf typischer Einstrahlungskurve
"""
distribution = {}
now = datetime.now()
# Verteilungsfaktoren für jede Stunde (0-23)
# Vereinfachte Gauß-Kurve mit Peak um 12 Uhr
factors = [
0.00, 0.00, 0.00, 0.00, 0.00, 0.01, # 0-5 Uhr
0.03, 0.06, 0.10, 0.14, 0.17, 0.18, # 6-11 Uhr
0.18, 0.17, 0.14, 0.10, 0.06, 0.03, # 12-17 Uhr
0.01, 0.00, 0.00, 0.00, 0.00, 0.00 # 18-23 Uhr
]
# Heute
for hour in range(24):
dt = datetime(now.year, now.month, now.day, hour, 0, 0)
if dt >= now:
distribution[dt] = today_kwh * factors[hour]
# Morgen
tomorrow = now + timedelta(days=1)
for hour in range(24):
dt = datetime(tomorrow.year, tomorrow.month, tomorrow.day, hour, 0, 0)
distribution[dt] = tomorrow_kwh * factors[hour]
return distribution
def get_battery_state():
"""Holt den aktuellen Batterie-Zustand"""
soc = float(state.get('sensor.battery_state_of_charge') or 0)
power = float(state.get('sensor.battery_power') or 0)
return {
'soc': soc,
'power': power,
'capacity_kwh': 10.0
}
def optimize_charging_schedule(price_data, pv_forecast, battery_state, config):
"""
Hauptoptimierungs-Algorithmus
Strategie: Konservativ (nur sehr günstig laden)
- Lade nur in den günstigsten Stunden
- Berücksichtige PV-Prognose (nicht laden wenn viel PV erwartet)
- Halte Reserve für Eigenverbrauch
"""
schedule = {}
# Sortiere Preise nach Höhe
sorted_prices = sorted(price_data.items(), key=lambda x: x[1])
# Berechne Schwellwert basierend auf Strategie
threshold = calculate_price_threshold(price_data, config)
log.info(f"Preis-Schwellwert: {threshold:.2f} ct/kWh")
# Aktuelle Batterie-Energie in kWh
current_energy_kwh = (battery_state['soc'] / 100.0) * config['battery_capacity']
# Simuliere die nächsten 24 Stunden
for dt, price in sorted(price_data.items()):
# Berücksichtige alle zukünftigen Stunden UND die aktuelle Stunde
# (auch wenn wir schon ein paar Minuten drin sind)
current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
if dt < current_hour:
continue # Nur vergangene Stunden überspringen
# PV-Prognose für diese Stunde
pv_kwh = pv_forecast.get(dt, 0)
# Entscheidung: Laden oder nicht?
action = 'auto'
power_w = 0
reason = []
# Prüfe ob Laden sinnvoll ist
if price <= threshold:
# Prüfe ob genug Kapazität vorhanden
max_capacity_kwh = (config['max_soc'] / 100.0) * config['battery_capacity']
available_capacity = max_capacity_kwh - current_energy_kwh - config['reserve_capacity']
if available_capacity > 0.5: # Mindestens 0.5 kWh Platz
# Prüfe PV-Prognose
if pv_kwh < 1.0: # Wenig PV erwartet
action = 'charge'
# Ladeleistung: Max oder was noch Platz hat
charge_kwh = min(available_capacity, config['max_charge_power'] / 1000.0)
power_w = -int(charge_kwh * 1000) # Negativ = Laden
current_energy_kwh += charge_kwh
reason.append(f"Günstiger Preis ({price:.2f} ct)")
reason.append(f"Wenig PV ({pv_kwh:.1f} kWh)")
else:
reason.append(f"Viel PV erwartet ({pv_kwh:.1f} kWh)")
else:
reason.append("Batterie bereits voll")
else:
reason.append(f"Preis zu hoch ({price:.2f} > {threshold:.2f} ct)")
schedule[dt.isoformat()] = {
'action': action,
'power_w': power_w,
'price': price,
'pv_forecast': pv_kwh,
'reason': ', '.join(reason)
}
return schedule
def calculate_price_threshold(price_data, config):
"""Berechnet den Preis-Schwellwert basierend auf Strategie"""
prices = list(price_data.values())
avg_price = sum(prices) / len(prices)
min_price = min(prices)
max_price = max(prices)
strategy = config['strategy']
if 'Konservativ' in strategy:
# Nur die günstigsten 20% der Stunden
threshold = min_price + (avg_price - min_price) * 0.3
elif 'Moderat' in strategy:
# Alle Stunden unter Durchschnitt
threshold = avg_price
elif 'Aggressiv' in strategy:
# Alles unter 70% des Durchschnitts
threshold = avg_price * 0.7
else:
# Fallback: Konfigurierter Schwellwert
threshold = config['price_threshold']
# Nie über den konfigurierten Max-Wert
threshold = min(threshold, config['price_threshold'])
return threshold
def save_schedule(schedule):
"""
Speichert den Ladeplan als PyScript State mit Attribut
PyScript kann States mit beliebig großen Attributen erstellen!
Kein 255-Zeichen Limit wie bei input_text.
"""
# Erstelle einen PyScript State mit dem Schedule als Attribut
state.set(
'pyscript.battery_charging_schedule',
value='active', # State-Wert (beliebig)
new_attributes={
'schedule': schedule, # Das komplette Schedule-Dict
'last_update': datetime.now().isoformat(),
'num_hours': len(schedule)
}
)
log.info(f"Ladeplan gespeichert: {len(schedule)} Stunden als Attribut")
def log_schedule_statistics(schedule, price_data):
"""Loggt Statistiken über den erstellten Plan"""
charging_hours = [h for h, d in schedule.items() if d['action'] == 'charge']
if charging_hours:
total_charge_kwh = sum([abs(d['power_w']) / 1000.0 for d in schedule.values() if d['action'] == 'charge'])
avg_charge_price = sum([price_data.get(datetime.fromisoformat(h), 0) for h in charging_hours]) / len(charging_hours)
log.info(f"Geplante Ladungen: {len(charging_hours)} Stunden")
log.info(f"Gesamte Lademenge: {total_charge_kwh:.1f} kWh")
log.info(f"Durchschnittspreis beim Laden: {avg_charge_price:.2f} ct/kWh")
log.info(f"Erste Ladung: {min(charging_hours)}")
else:
log.info("Keine Ladungen geplant")
@service
def execute_current_schedule():
"""
Führt den aktuellen Ladeplan für die aktuelle Stunde aus
Wird stündlich durch Automation aufgerufen
"""
# Prüfe ob manuelle Steuerung aktiv
if state.get('input_boolean.battery_optimizer_manual_override') == 'on':
log.info("Manuelle Steuerung aktiv - keine automatische Ausführung")
return
# Prüfe ob Optimierung aktiv
if state.get('input_boolean.battery_optimizer_enabled') != 'on':
log.info("Optimierung deaktiviert")
return
# Lade aktuellen Plan aus PyScript State Attribut
schedule = state.getattr('pyscript.battery_charging_schedule').get('schedule')
if not schedule:
log.warning("Kein Ladeplan vorhanden")
return
# Finde Eintrag für aktuelle Stunde
now = datetime.now()
current_hour = now.replace(minute=0, second=0, microsecond=0)
log.info(f"Suche Ladeplan für Stunde: {current_hour.isoformat()}")
# Suche nach passenden Zeitstempel
# Toleranz: -10 Minuten bis +50 Minuten (um ganze Stunde zu matchen)
hour_data = None
matched_hour = None
for hour_key, data in schedule.items():
try:
hour_dt = datetime.fromisoformat(hour_key)
time_diff = (hour_dt - current_hour).total_seconds()
# Match wenn innerhalb von -10min bis +50min
# (erlaubt Ausführung zwischen xx:00 und xx:50)
if -600 <= time_diff <= 3000:
hour_data = data
matched_hour = hour_key
log.info(f"Gefunden: {hour_key} (Abweichung: {time_diff/60:.1f} min)")
break
except Exception as e:
log.warning(f"Fehler beim Parsen von {hour_key}: {e}")
continue
if not hour_data:
log.info(f"Keine Daten für aktuelle Stunde {current_hour.isoformat()}")
log.debug(f"Verfügbare Stunden im Plan: {list(schedule.keys())}")
return
action = hour_data.get('action', 'auto')
power_w = hour_data.get('power_w', 0)
price = hour_data.get('price', 0)
reason = hour_data.get('reason', '')
log.info(f"Stunde {matched_hour}: Aktion={action}, Leistung={power_w}W, Preis={price} ct")
log.info(f"Grund: {reason}")
if action == 'charge' and power_w != 0:
# Aktiviere Laden
activate_charging(power_w)
else:
# Deaktiviere manuelle Steuerung, zurück zu Auto
deactivate_charging()
def activate_charging(power_w):
"""
Aktiviert das Batterieladen mit der angegebenen Leistung
Ablauf:
1. ESS → REMOTE Mode
2. Leistung über Modbus setzen (Register 706)
3. Status für Keep-Alive setzen
"""
log.info(f"Aktiviere Laden: {power_w}W")
try:
# 1. ESS in REMOTE Mode setzen
# WICHTIG: VOR dem Schreiben der Leistung!
service.call('rest_command', 'set_ess_remote_mode')
task.sleep(1.0) # Warte auf Modusänderung
# 2. Ladeleistung setzen über korrigierte Modbus-Funktion
service.call('pyscript', 'set_battery_power_modbus',
power_w=float(power_w),
hub="openEMS",
slave=1)
# 3. Status für Keep-Alive setzen
state.set('pyscript.battery_charging_active', True)
state.set('pyscript.battery_charging_power', power_w)
state.set('pyscript.battery_charging_hub', "openEMS")
state.set('pyscript.battery_charging_slave', 1)
log.info("Laden aktiviert (ESS in REMOTE Mode)")
except Exception as e:
log.error(f"Fehler beim Aktivieren: {e}")
def deactivate_charging():
"""
Deaktiviert manuelles Laden und aktiviert automatischen Betrieb
Ablauf:
1. ESS → INTERNAL Mode
2. Status zurücksetzen
"""
log.info("Deaktiviere manuelles Laden, aktiviere Auto-Modus")
try:
# 1. ESS zurück in INTERNAL Mode
# WICHTIG: Nach dem Laden, um wieder auf Automatik zu schalten!
service.call('rest_command', 'set_ess_internal_mode')
task.sleep(1.0) # Warte auf Modusänderung
# 2. Status zurücksetzen
state.set('pyscript.battery_charging_active', False)
state.set('pyscript.battery_charging_power', 0)
log.info("Auto-Modus aktiviert (ESS in INTERNAL Mode)")
except Exception as e:
log.error(f"Fehler beim Deaktivieren: {e}")