Files
battery-charging-optimizer/legacy/v2/battery_optimizer.py

552 lines
19 KiB
Python

"""
Battery Charging Optimizer für OpenEMS + GoodWe
Nutzt das bestehende manuelle Steuerungssystem
Speicherort: /config/pyscript/battery_optimizer.py
Version: 2.0.0
"""
import json
from datetime import datetime, timedelta
@service
def calculate_charging_schedule():
"""
Berechnet den optimalen Ladeplan für die nächsten 24-36 Stunden
Wird täglich um 14:05 Uhr nach Strompreis-Update ausgeführt
"""
log.info("=== Batterie-Optimierung gestartet ===")
# Prüfe ob Optimierung aktiviert ist
if state.get('input_boolean.battery_optimizer_enabled') != 'on':
log.info("Optimierung ist deaktiviert")
input_text.battery_optimizer_status = "Deaktiviert"
return
try:
# Konfiguration laden
config = load_configuration()
log.info(f"Konfiguration geladen: SOC {config['min_soc']}-{config['max_soc']}%, Max {config['max_charge_power']}W")
# Strompreise laden
price_data = get_electricity_prices()
if not price_data:
log.error("Keine Strompreis-Daten verfügbar")
input_text.battery_optimizer_status = "Fehler: Keine Preisdaten"
return
log.info(f"Strompreise geladen: {len(price_data)} Stunden")
# PV-Prognose laden
pv_forecast = get_pv_forecast()
pv_today = pv_forecast.get('today', 0)
pv_tomorrow = pv_forecast.get('tomorrow', 0)
log.info(f"PV-Prognose: Heute {pv_today} kWh, Morgen {pv_tomorrow} kWh")
# Batterie-Status laden
current_soc = float(state.get('sensor.openems_ess0_soc') or 50)
log.info(f"Aktueller SOC: {current_soc}%")
# Optimierung durchführen
schedule = optimize_charging(
price_data=price_data,
pv_forecast=pv_forecast,
current_soc=current_soc,
config=config
)
# Plan speichern
save_schedule(schedule)
# Statistiken ausgeben
log_statistics(schedule, price_data)
log.info("=== Optimierung abgeschlossen ===")
except Exception as e:
log.error(f"Fehler bei Optimierung: {e}")
input_text.battery_optimizer_status = f"Fehler: {str(e)[:100]}"
def load_configuration():
"""Lädt alle Konfigurations-Parameter aus Input Helpern"""
return {
'battery_capacity': float(state.get('input_number.battery_capacity_kwh') or 10) * 1000, # in Wh
'min_soc': float(state.get('input_number.battery_optimizer_min_soc') or 20),
'max_soc': float(state.get('input_number.battery_optimizer_max_soc') or 100),
'max_charge_power': float(state.get('input_number.battery_optimizer_max_charge_power') or 5000),
'price_threshold': float(state.get('input_number.battery_optimizer_price_threshold') or 28),
'reserve_capacity': float(state.get('input_number.battery_optimizer_reserve_capacity') or 2) * 1000, # in Wh
'pv_threshold': float(state.get('input_number.battery_optimizer_pv_threshold') or 500), # in Wh
}
def get_electricity_prices():
"""
Holt Strompreise von haStrom FLEX PRO
Erwartet Attribute 'prices_today', 'datetime_today' (und optional tomorrow)
"""
from datetime import datetime
price_entity = 'sensor.hastrom_flex_pro'
prices_attr = state.getattr(price_entity)
if not prices_attr:
log.error(f"Sensor {price_entity} nicht gefunden")
return None
# Heute
prices_today = prices_attr.get('prices_today', [])
datetime_today = prices_attr.get('datetime_today', [])
# Morgen (falls verfügbar)
prices_tomorrow = prices_attr.get('prices_tomorrow', [])
datetime_tomorrow = prices_attr.get('datetime_tomorrow', [])
if not prices_today or not datetime_today:
log.error(f"Keine Preis-Daten in {price_entity} (prices_today oder datetime_today fehlt)")
return None
if len(prices_today) != len(datetime_today):
log.error(f"Preis-Array und DateTime-Array haben unterschiedliche Längen")
return None
# Konvertiere zu einheitlichem Format
price_data = []
# Heute
for i, price in enumerate(prices_today):
try:
# datetime_today enthält Strings wie "2025-11-09 00:00:00"
dt_str = datetime_today[i]
dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
price_data.append({
'datetime': dt,
'hour': dt.hour,
'date': dt.date(),
'price': float(price)
})
except Exception as e:
log.warning(f"Fehler beim Verarbeiten von Preis {i}: {e}")
continue
# Morgen (falls vorhanden)
if prices_tomorrow and datetime_tomorrow and len(prices_tomorrow) == len(datetime_tomorrow):
for i, price in enumerate(prices_tomorrow):
try:
dt_str = datetime_tomorrow[i]
dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
price_data.append({
'datetime': dt,
'hour': dt.hour,
'date': dt.date(),
'price': float(price)
})
except Exception as e:
log.warning(f"Fehler beim Verarbeiten von Morgen-Preis {i}: {e}")
continue
return price_data
def get_pv_forecast():
"""
Holt PV-Prognose von Forecast.Solar (Ost + West Array)
"""
# Sensor-Namen für deine beiden Arrays
sensor_east = 'sensor.energy_production_today'
sensor_west = 'sensor.energy_production_today_2'
sensor_east_tomorrow = 'sensor.energy_production_tomorrow'
sensor_west_tomorrow = 'sensor.energy_production_tomorrow_2'
# Heute
east_today_attr = state.getattr(sensor_east) or {}
west_today_attr = state.getattr(sensor_west) or {}
pv_today = (float(east_today_attr.get('wh_period', 0)) +
float(west_today_attr.get('wh_period', 0)))
# Morgen
east_tomorrow_attr = state.getattr(sensor_east_tomorrow) or {}
west_tomorrow_attr = state.getattr(sensor_west_tomorrow) or {}
pv_tomorrow = (float(east_tomorrow_attr.get('wh_period', 0)) +
float(west_tomorrow_attr.get('wh_period', 0)))
# Stündliche Werte kombinieren
pv_wh_per_hour = {}
# Ost-Array
for hour_str, wh in (east_today_attr.get('wh_hours', {}) or {}).items():
pv_wh_per_hour[int(hour_str)] = wh
# West-Array addieren
for hour_str, wh in (west_today_attr.get('wh_hours', {}) or {}).items():
hour = int(hour_str)
pv_wh_per_hour[hour] = pv_wh_per_hour.get(hour, 0) + wh
# Morgen-Werte (24+ Stunden)
for hour_str, wh in (east_tomorrow_attr.get('wh_hours', {}) or {}).items():
hour = int(hour_str) + 24
pv_wh_per_hour[hour] = wh
for hour_str, wh in (west_tomorrow_attr.get('wh_hours', {}) or {}).items():
hour = int(hour_str) + 24
pv_wh_per_hour[hour] = pv_wh_per_hour.get(hour, 0) + wh
return {
'today': pv_today / 1000, # in kWh
'tomorrow': pv_tomorrow / 1000, # in kWh
'hourly': pv_wh_per_hour # in Wh per Stunde
}
def optimize_charging(price_data, pv_forecast, current_soc, config):
"""
Kern-Optimierungs-Algorithmus
Strategie:
1. Finde Stunden mit niedrigen Preisen UND wenig PV
2. Berechne verfügbare Ladekapazität
3. Erstelle Ladeplan für günstigste Stunden
"""
# Berechne dynamischen Preis-Schwellwert (90. Perzentil)
all_prices = [p['price'] for p in price_data]
all_prices.sort()
threshold_index = int(len(all_prices) * 0.9)
price_threshold = all_prices[threshold_index] if all_prices else config['price_threshold']
log.info(f"Preis-Schwellwert: {price_threshold:.2f} ct/kWh")
# Verfügbare Ladekapazität berechnen
available_capacity_wh = (config['max_soc'] - current_soc) / 100 * config['battery_capacity']
available_capacity_wh -= config['reserve_capacity'] # Reserve abziehen
if available_capacity_wh <= 0:
log.info("Batterie ist voll oder Reserve erreicht - keine Ladung nötig")
# Erstelle Plan nur mit "auto" Einträgen
return create_auto_only_schedule(price_data)
log.info(f"Verfügbare Ladekapazität: {available_capacity_wh/1000:.2f} kWh")
# Finde günstige Lade-Gelegenheiten
charging_opportunities = []
current_hour = datetime.now().hour
current_date = datetime.now().date()
for price_hour in price_data:
hour = price_hour['hour']
hour_date = price_hour['date']
price = price_hour['price']
# Berechne absolute Stunde (0-47 für heute+morgen)
if hour_date == current_date:
abs_hour = hour
elif hour_date > current_date:
abs_hour = hour + 24
else:
continue # Vergangenheit ignorieren
# Nur zukünftige Stunden
if abs_hour < current_hour:
continue
# PV-Prognose für diese Stunde
pv_wh = pv_forecast['hourly'].get(abs_hour, 0)
# Kriterien: Günstiger Preis UND wenig PV
if price < price_threshold and pv_wh < config['pv_threshold']:
charging_opportunities.append({
'datetime': price_hour['datetime'],
'hour': hour,
'abs_hour': abs_hour,
'price': price,
'pv_wh': pv_wh,
'score': price - (pv_wh / 1000) # Je niedriger, desto besser
})
# Sortiere nach Score (beste zuerst)
charging_opportunities.sort(key=lambda x: x['score'])
log.info(f"Gefundene Lade-Gelegenheiten: {len(charging_opportunities)}")
# Erstelle Ladeplan
schedule = []
remaining_capacity = available_capacity_wh
total_charge_energy = 0
total_charge_cost = 0
for price_hour in price_data:
hour = price_hour['hour']
abs_hour = price_hour['hour']
hour_date = price_hour['date']
# Berechne absolute Stunde
if hour_date == current_date:
abs_hour = hour
elif hour_date > current_date:
abs_hour = hour + 24
else:
continue
# Nur zukünftige Stunden (inkl. aktuelle!)
if abs_hour < current_hour:
continue
# Prüfe ob diese Stunde zum Laden vorgesehen ist
should_charge = False
charge_opportunity = None
for opp in charging_opportunities:
if opp['abs_hour'] == abs_hour:
should_charge = True
charge_opportunity = opp
break
if should_charge and remaining_capacity > 0:
# Ladeleistung berechnen
charge_wh = min(config['max_charge_power'], remaining_capacity)
schedule.append({
'datetime': price_hour['datetime'].isoformat(),
'hour': hour,
'action': 'charge',
'power_w': -int(charge_wh), # Negativ = Laden
'price': price_hour['price'],
'pv_wh': charge_opportunity['pv_wh'],
'reason': f"Günstig ({price_hour['price']:.2f} ct) + wenig PV ({charge_opportunity['pv_wh']} Wh)"
})
remaining_capacity -= charge_wh
total_charge_energy += charge_wh / 1000 # kWh
total_charge_cost += (charge_wh / 1000) * (price_hour['price'] / 100) # Euro
else:
# Auto-Modus (Standard-Betrieb)
reason = "Automatik"
if not should_charge and abs_hour < current_hour + 24:
if price_hour['price'] >= price_threshold:
reason = f"Preis zu hoch ({price_hour['price']:.2f} > {price_threshold:.2f} ct)"
else:
pv_wh = pv_forecast['hourly'].get(abs_hour, 0)
if pv_wh >= config['pv_threshold']:
reason = f"Genug PV ({pv_wh} Wh)"
schedule.append({
'datetime': price_hour['datetime'].isoformat(),
'hour': hour,
'action': 'auto',
'power_w': 0,
'price': price_hour['price'],
'reason': reason
})
# Berechne Anzahl Ladungen für Log
num_charges = 0
for s in schedule:
if s['action'] == 'charge':
num_charges += 1
log.info(f"Ladeplan erstellt: {len(schedule)} Stunden, davon {num_charges} Ladungen")
log.info(f"Gesamte Ladeenergie: {total_charge_energy:.2f} kWh, Kosten: {total_charge_cost:.2f}")
return schedule
def create_auto_only_schedule(price_data):
"""Erstellt einen Plan nur mit Auto-Modus (keine Ladung)"""
schedule = []
current_hour = datetime.now().hour
for price_hour in price_data:
if price_hour['hour'] >= current_hour:
schedule.append({
'datetime': price_hour['datetime'].isoformat(),
'hour': price_hour['hour'],
'action': 'auto',
'power_w': 0,
'price': price_hour['price'],
'reason': "Keine Ladung nötig (Batterie voll)"
})
return schedule
def save_schedule(schedule):
"""
Speichert den Schedule als PyScript State mit Attributen
"""
if not schedule:
log.warning("Leerer Schedule - nichts zu speichern")
return
# Berechne Statistiken
num_charges = 0
total_energy = 0
total_price = 0
first_charge = None
for s in schedule:
if s['action'] == 'charge':
num_charges += 1
total_energy += abs(s['power_w'])
total_price += s['price']
if first_charge is None:
first_charge = s['datetime']
total_energy = total_energy / 1000 # kWh
avg_price = total_price / num_charges if num_charges > 0 else 0
# Speichere als PyScript State
state.set(
'pyscript.battery_charging_schedule',
value='active',
new_attributes={
'schedule': schedule,
'last_update': datetime.now().isoformat(),
'num_hours': len(schedule),
'num_charges': num_charges,
'total_energy_kwh': round(total_energy, 2),
'avg_charge_price': round(avg_price, 2),
'first_charge_time': first_charge,
'estimated_savings': 0 # Wird später berechnet
}
)
log.info(f"Ladeplan gespeichert: {len(schedule)} Stunden als Attribut")
# Status aktualisieren
if num_charges > 0:
input_text.battery_optimizer_status = f"{num_charges} Ladungen geplant, ab {first_charge}"
else:
input_text.battery_optimizer_status = "Keine Ladung nötig"
def log_statistics(schedule, price_data):
"""Gibt Statistiken über den erstellten Plan aus"""
# Filter charges
charges = []
for s in schedule:
if s['action'] == 'charge':
charges.append(s)
if not charges:
log.info("Keine Ladungen geplant")
return
total_energy = 0
total_price = 0
for s in charges:
total_energy += abs(s['power_w'])
total_price += s['price']
total_energy = total_energy / 1000 # kWh
avg_price = total_price / len(charges)
first_charge = charges[0]['datetime']
log.info(f"Geplante Ladungen: {len(charges)} Stunden")
log.info(f"Gesamte Lademenge: {total_energy:.1f} kWh")
log.info(f"Durchschnittspreis beim Laden: {avg_price:.2f} ct/kWh")
log.info(f"Erste Ladung: {first_charge}")
@service
def execute_charging_schedule():
"""
Führt den aktuellen Ladeplan aus (stündlich aufgerufen)
Nutzt das bestehende manuelle Steuerungs-System
"""
# Prüfe ob Optimierung aktiv ist
if state.get('input_boolean.battery_optimizer_enabled') != 'on':
return
# Prüfe auf manuelle Überschreibung
if state.get('input_boolean.battery_optimizer_manual_override') == 'on':
log.info("Manuelle Überschreibung aktiv - überspringe Ausführung")
return
# Lade Schedule
schedule_attr = state.getattr('pyscript.battery_charging_schedule')
if not schedule_attr or 'schedule' not in schedule_attr:
log.warning("Kein Ladeplan vorhanden")
return
schedule = schedule_attr['schedule']
# Aktuelle Stunde ermitteln
now = datetime.now()
current_hour_dt = now.replace(minute=0, second=0, microsecond=0)
log.info(f"Suche Ladeplan für Stunde: {current_hour_dt.isoformat()}")
# Finde passenden Eintrag im Schedule
current_entry = None
for entry in schedule:
entry_dt = datetime.fromisoformat(entry['datetime'])
entry_hour = entry_dt.replace(minute=0, second=0, microsecond=0)
# Prüfe ob diese Stunde passt (mit 30 Min Toleranz)
time_diff = abs((entry_hour - current_hour_dt).total_seconds() / 60)
if time_diff < 30: # Innerhalb 30 Minuten
current_entry = entry
log.info(f"Gefunden: {entry_dt.isoformat()} (Abweichung: {time_diff:.0f} min)")
break
if not current_entry:
log.warning(f"Keine Daten für aktuelle Stunde {current_hour_dt.hour}:00")
return
# Führe Aktion aus
action = current_entry['action']
power_w = current_entry['power_w']
price = current_entry['price']
reason = current_entry.get('reason', '')
log.info(f"Stunde {current_hour_dt.isoformat()}: Aktion={action}, Leistung={power_w}W, Preis={price:.2f} ct")
log.info(f"Grund: {reason}")
if action == 'charge':
# Aktiviere Laden über bestehendes System
log.info(f"Aktiviere Laden mit {power_w}W")
# Setze Ziel-Leistung
input_number.charge_power_battery = float(power_w)
# Aktiviere manuellen Modus (triggert deine Automationen)
input_boolean.goodwe_manual_control = "on"
log.info("Manuelles Laden aktiviert")
elif action == 'auto':
# Deaktiviere manuelles Laden, zurück zu Auto-Modus
if state.get('input_boolean.goodwe_manual_control') == 'on':
log.info("Deaktiviere manuelles Laden, aktiviere Auto-Modus")
input_boolean.goodwe_manual_control = "off"
else:
log.info("Auto-Modus bereits aktiv")
# ====================
# Zeit-Trigger
# ====================
@time_trigger("cron(5 14 * * *)")
def daily_optimization():
"""Tägliche Berechnung um 14:05 Uhr (nach haStrom Preis-Update)"""
log.info("=== Tägliche Optimierungs-Berechnung gestartet ===")
pyscript.calculate_charging_schedule()
@time_trigger("cron(5 * * * *)")
def hourly_execution():
"""Stündliche Ausführung des Plans um xx:05 Uhr"""
pyscript.execute_charging_schedule()