Files
battery-charging-optimizer/legacy/v1/PHASE2_INFLUXDB.md

11 KiB

Phase 2: InfluxDB Integration - Roadmap

Ziel

Nutzung historischer Verbrauchsdaten aus InfluxDB2 für:

  • Bessere Verbrauchsprognosen
  • Optimierte Ladeplanung
  • Lernender Algorithmus

Datenquellen in InfluxDB

Zu analysierende Daten

Verbrauch

  • sensor.house_consumption (Hausverbrauch in W)
  • sensor.totay_load (Tages-Gesamtverbrauch)
  • sensor.bought_from_grid_today (Netzbezug)

Erzeugung

  • sensor.pv_power (PV-Leistung)
  • sensor.today_s_pv_generation (Tagesertrag)

Batterie

  • sensor.battery_power (Ladung/Entladung)
  • sensor.battery_state_of_charge (SOC)
  • sensor.today_battery_charge (Geladen heute)
  • sensor.today_battery_discharge (Entladen heute)

Netz

  • sensor.gw_netzbezug (Bezug)
  • sensor.gw_netzeinspeisung (Einspeisung)

Implementierungsschritte

Schritt 1: InfluxDB Verbindung in PyScript

"""
InfluxDB Connector für historische Daten
Speicherort: /config/pyscript/influxdb_connector.py
"""

from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta

# Konfiguration (später in configuration.yaml)
INFLUXDB_URL = "http://your-influxdb-server:8086"
INFLUXDB_TOKEN = "your-token"
INFLUXDB_ORG = "your-org"
INFLUXDB_BUCKET = "home_assistant"

@service
def get_historical_consumption(days: int = 30):
    """
    Holt historische Verbrauchsdaten aus InfluxDB
    
    Args:
        days: Anzahl vergangener Tage
        
    Returns:
        Dict mit stündlichen Durchschnittswerten
    """
    
    client = InfluxDBClient(
        url=INFLUXDB_URL,
        token=INFLUXDB_TOKEN,
        org=INFLUXDB_ORG
    )
    
    query_api = client.query_api()
    
    # Flux Query für stündliche Durchschnittswerte
    query = f'''
    from(bucket: "{INFLUXDB_BUCKET}")
      |> range(start: -{days}d)
      |> filter(fn: (r) => r["entity_id"] == "house_consumption")
      |> filter(fn: (r) => r["_field"] == "value")
      |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
      |> yield(name: "mean")
    '''
    
    result = query_api.query(query)
    
    # Verarbeite Ergebnisse nach Wochentag und Stunde
    consumption_by_hour = {}
    
    for table in result:
        for record in table.records:
            timestamp = record.get_time()
            value = record.get_value()
            
            weekday = timestamp.weekday()  # 0=Montag, 6=Sonntag
            hour = timestamp.hour
            
            key = f"{weekday}_{hour}"
            if key not in consumption_by_hour:
                consumption_by_hour[key] = []
            consumption_by_hour[key].append(value)
    
    # Berechne Durchschnittswerte
    avg_consumption = {}
    for key, values in consumption_by_hour.items():
        avg_consumption[key] = sum(values) / len(values)
    
    client.close()
    
    log.info(f"Historische Daten geladen: {len(avg_consumption)} Stunden-Profile")
    
    return avg_consumption

Schritt 2: Erweiterte Verbrauchsprognose

def predict_consumption(start_time, hours=24):
    """
    Prognostiziert Verbrauch basierend auf historischen Daten
    
    Args:
        start_time: Startzeit der Prognose
        hours: Anzahl Stunden
        
    Returns:
        Dict mit stündlichen Verbrauchsprognosen
    """
    
    # Lade historische Daten (gecacht)
    if not hasattr(predict_consumption, 'historical_data'):
        predict_consumption.historical_data = get_historical_consumption(30)
    
    historical = predict_consumption.historical_data
    
    forecast = {}
    
    for h in range(hours):
        dt = start_time + timedelta(hours=h)
        weekday = dt.weekday()
        hour = dt.hour
        
        key = f"{weekday}_{hour}"
        
        # Durchschnittlicher Verbrauch für diese Wochentag/Stunde
        avg_consumption = historical.get(key, 800)  # Fallback 800W
        
        # Saisonale Anpassungen
        month = dt.month
        if month in [12, 1, 2]:  # Winter
            avg_consumption *= 1.2
        elif month in [6, 7, 8]:  # Sommer
            avg_consumption *= 0.9
        
        forecast[dt] = avg_consumption
    
    return forecast

Schritt 3: Optimierung mit Verbrauchsprognose

def optimize_charging_schedule_v2(price_data, pv_forecast, battery_state, config):
    """
    Erweiterte Optimierung mit Verbrauchsprognose
    """
    
    schedule = {}
    
    # NEU: Verbrauchsprognose holen
    consumption_forecast = predict_consumption(datetime.now(), hours=48)
    
    # Sortiere Preise
    sorted_prices = sorted(price_data.items(), key=lambda x: x[1])
    threshold = calculate_price_threshold(price_data, config)
    
    # Batterie-Simulation
    current_energy_kwh = (battery_state['soc'] / 100.0) * config['battery_capacity']
    
    for dt, price in sorted(price_data.items()):
        if dt <= datetime.now():
            continue
        
        # PV und Verbrauch für diese Stunde
        pv_kwh = pv_forecast.get(dt, 0)
        consumption_w = consumption_forecast.get(dt, 800)
        consumption_kwh = consumption_w / 1000.0
        
        # Berechne Energie-Bilanz
        net_energy = pv_kwh - consumption_kwh
        
        # Entscheidung: Laden oder nicht?
        action = 'auto'
        power_w = 0
        reason = []
        
        if price <= threshold:
            # Prüfe ob Batterie-Kapazität benötigt wird
            max_capacity_kwh = (config['max_soc'] / 100.0) * config['battery_capacity']
            available_capacity = max_capacity_kwh - current_energy_kwh
            
            # Erwartetes Defizit in den nächsten 6 Stunden
            future_deficit = calculate_future_deficit(
                dt, consumption_forecast, pv_forecast, hours=6
            )
            
            # Lade wenn:
            # 1. Günstiger Preis
            # 2. Defizit erwartet
            # 3. Kapazität vorhanden
            if future_deficit > 0.5 and available_capacity > 0.5:
                action = 'charge'
                charge_kwh = min(available_capacity, future_deficit, 
                               config['max_charge_power'] / 1000.0)
                power_w = -int(charge_kwh * 1000)
                current_energy_kwh += charge_kwh
                reason.append(f"Defizit erwartet: {future_deficit:.1f} kWh")
        
        # Update Batterie-Stand für nächste Iteration
        current_energy_kwh += net_energy
        current_energy_kwh = max(
            (config['min_soc'] / 100.0) * config['battery_capacity'],
            min(current_energy_kwh, max_capacity_kwh)
        )
        
        schedule[dt.isoformat()] = {
            'action': action,
            'power_w': power_w,
            'price': price,
            'pv_forecast': pv_kwh,
            'consumption_forecast': consumption_kwh,
            'net_energy': net_energy,
            'battery_soc_forecast': (current_energy_kwh / config['battery_capacity']) * 100,
            'reason': ', '.join(reason)
        }
    
    return schedule


def calculate_future_deficit(start_dt, consumption_forecast, pv_forecast, hours=6):
    """
    Berechnet erwartetes Energie-Defizit in den nächsten X Stunden
    """
    
    total_deficit = 0
    
    for h in range(hours):
        dt = start_dt + timedelta(hours=h)
        consumption_w = consumption_forecast.get(dt, 800)
        pv_kwh = pv_forecast.get(dt, 0)
        
        consumption_kwh = consumption_w / 1000.0
        net = consumption_kwh - pv_kwh
        
        if net > 0:
            total_deficit += net
    
    return total_deficit

Schritt 4: Konfiguration erweitern

# Neue Input Helper für InfluxDB

input_text:
  influxdb_url:
    name: "InfluxDB URL"
    initial: "http://192.168.xxx.xxx:8086"
    
  influxdb_token:
    name: "InfluxDB Token"
    initial: "your-token"
    
  influxdb_org:
    name: "InfluxDB Organization"
    initial: "homeassistant"
    
  influxdb_bucket:
    name: "InfluxDB Bucket"
    initial: "home_assistant"

input_number:
  historical_data_days:
    name: "Historische Daten (Tage)"
    min: 7
    max: 365
    step: 1
    initial: 30
    icon: mdi:calendar-range

Schritt 5: Neue Automatisierung

automation:
  # Wöchentliches Update der historischen Daten
  - id: battery_optimizer_update_historical_data
    alias: "Batterie Optimierung: Historische Daten aktualisieren"
    description: "Lädt wöchentlich neue historische Daten aus InfluxDB"
    trigger:
      - platform: time
        at: "03:00:00"
      - platform: time_pattern
        # Jeden Sonntag
        days: /7
    action:
      - service: pyscript.get_historical_consumption
        data:
          days: 30
      - service: notify.persistent_notification
        data:
          title: "Batterie-Optimierung"
          message: "Historische Daten aktualisiert"

Metriken & KPIs

Neue Dashboard-Elemente

template:
  - sensor:
      - name: "Verbrauchsprognose Genauigkeit"
        unique_id: consumption_forecast_accuracy
        state: >
          {% set actual = states('sensor.today_load') | float %}
          {% set forecast = state_attr('input_text.battery_charging_schedule', 'total_consumption_forecast') | float %}
          {% if forecast > 0 %}
            {{ ((1 - abs(actual - forecast) / forecast) * 100) | round(1) }}
          {% else %}
            unknown
          {% endif %}
        unit_of_measurement: "%"
        icon: mdi:target
        
      - name: "Optimierungs-Einsparung"
        unique_id: optimizer_savings
        state: >
          # Berechne tatsächliche Kosten vs. ohne Optimierung
          # TODO: Implementierung basierend auf InfluxDB Daten
        unit_of_measurement: "EUR"
        icon: mdi:piggy-bank

Erwartete Verbesserungen

Genauigkeit

  • Verbrauchsprognose: +40% durch historische Daten
  • Ladeplanung: +25% durch bessere Vorhersage
  • ROI: Messbare Einsparungen

Intelligenz

  • Wochenend-Muster erkennen
  • Saisonale Anpassungen
  • Feiertags-Berücksichtigung

Adaptivität

  • System lernt aus Fehlprognosen
  • Automatische Parameter-Anpassung
  • Kontinuierliche Verbesserung

Nächste Schritte

  1. InfluxDB Setup prüfen

    • Sind alle Sensoren geloggt?
    • Retention Policy konfiguriert?
    • Genug historische Daten (min. 30 Tage)?
  2. Connector implementieren

    • InfluxDB Client installieren: pip install influxdb-client
    • Token und Zugangsdaten konfigurieren
    • Erste Testabfrage durchführen
  3. Verbrauchsmuster analysieren

    • Wochentag vs. Wochenende
    • Tagesverlauf typisch?
    • Saisonale Unterschiede?
  4. Integration testen

    • Mit historischen Daten simulieren
    • Prognose-Genauigkeit messen
    • Schrittweise in Produktion nehmen
  5. Dashboard erweitern

    • Prognose vs. Ist-Verbrauch
    • Einsparungen visualisieren
    • Lernkurve anzeigen

Fragen für Phase 2

Bevor wir starten:

  1. InfluxDB: Ist bereits konfiguriert? Zugangsdaten?
  2. Daten-Historie: Wieviel Tage sind verfügbar?
  3. Sensoren: Welche sind in InfluxDB geloggt?
  4. Retention: Wie lange werden Daten behalten?
  5. Performance: Wie groß ist die Datenbank?

Lass uns das in einem nächsten Schritt gemeinsam analysieren!


Status: Phase 1 abgeschlossen Nächster Meilenstein: InfluxDB Integration 🎯