#!/usr/bin/python -u
# coding=utf-8

import logging
import os
import time

import states as State
import target_type as TargetType

from random import randint
from python_libs.ie_dbus.dbus_service import DBusService
from python_libs.ie_utils.main_loop import run_on_main_loop

# The typing names are only needed by the "# type:" comments below, so the
# import is kept behind a never-taken branch and is not executed at runtime.
# noinspection PyUnreachableCode
if False:
    from typing import NoReturn, Optional, Any, Iterable, List

logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)

VERSION = '1.0.0'
PRODUCT = 'Controller'

# D-Bus service name prefixes used to look up the remote services.
GRID_SERVICE_PREFIX = 'com.victronenergy.grid.'
BATTERY_SERVICE_PREFIX = 'com.victronenergy.battery.'
INVERTER_SERVICE_PREFIX = 'com.victronenergy.vebus.'
SYSTEM_SERVICE_PREFIX = 'com.victronenergy.system'
HUB4_SERVICE_PREFIX = 'com.victronenergy.hub4'
SETTINGS_SERVICE_PREFIX = 'com.victronenergy.settings'

UPDATE_PERIOD_MS = 2000  # period handed to run_on_main_loop() in main()
MAX_POWER_PER_BATTERY = 2500

MAX_DAYS_WITHOUT_EOC = 7
SECONDS_PER_DAY = 24 * 60 * 60

# Paths of the settings this service registers for itself.
GRID_SET_POINT_SETTING = PRODUCT + '/GridSetPoint'
LAST_EOC_SETTING = PRODUCT + '/LastEOC'
CALIBRATION_CHARGE_START_TIME_OF_DAY_SETTING = PRODUCT + '/CalibrationChargeStartTime'

HEAT_LOSS = 150  # W
P_CONST = 0.5  # W/W

# Aliases that make the "# type:" comments more descriptive.
Epoch = int
Seconds = int


def time_now():
    """Return the current unix time truncated to whole seconds."""
    return int(time.time())


class Controller(object):
    """One proportional control step.

    Stores the measured value, the target it should reach, what kind of
    quantity the target refers to (target_type), the state that produced the
    controller, and the resulting correction ``delta``.
    """

    def __init__(self, measurement, target, target_type, state):
        # type: (float, float, int, int) -> NoReturn
        """Record the inputs and derive ``delta`` = (target - measurement) * P_CONST."""
        self.state = state
        self.measurement = measurement
        self.target = target
        self.target_type = target_type

        # proportional term only: the control error scaled by P_CONST
        self.delta = (target - measurement) * P_CONST

    @staticmethod
    def min(controllers):
        # type: (Iterable[Controller]) -> Controller
        """Return the controller with the smallest ``delta``."""

        def by_delta(controller):
            return controller.delta

        return min(controllers, key=by_delta)

    @staticmethod
    def max(controllers):
        # type: (Iterable[Controller]) -> Controller
        """Return the controller with the largest ``delta``."""

        def by_delta(controller):
            return controller.delta

        return max(controllers, key=by_delta)

    def clamp(self, lower_limit_controllers, upper_limit_controllers):
        # type: (List[Controller],List[Controller]) -> Controller
        """Return the controller whose ``delta`` is this one's, bounded by the limits.

        The upper limits are applied first (smallest delta among them and
        self), then the lower limits (largest delta among them and that
        intermediate result), so a lower limit wins over an upper limit.
        """
        upper_bounded = Controller.min(upper_limit_controllers + [self])
        candidates = lower_limit_controllers + [upper_bounded]
        return Controller.max(candidates)
# noinspection PyMethodMayBeStatic
class InnovEnergyController(DBusService):
    """D-Bus service that periodically picks a Controller and commands the inverter.

    Every call to update() selects a control strategy from the current
    system conditions (see choose_controller) and writes the resulting AC
    power setpoint to the inverter's per-phase setpoint paths.
    """

    def __init__(self):
        """Register own settings and properties and detect the inverter phases.

        Raises:
            Exception: from get_service() when no inverter service is
                available, because the phase detection needs it.
        """
        super(InnovEnergyController, self).__init__(PRODUCT.lower())

        self.settings.add_setting(path=LAST_EOC_SETTING, default_value=0)  # unix epoch timestamp
        self.settings.add_setting(path=GRID_SET_POINT_SETTING, default_value=0)  # grid setpoint, Watts
        self.settings.add_setting(path=CALIBRATION_CHARGE_START_TIME_OF_DAY_SETTING, default_value=32400)  # 09:00

        self.own_properties.set('/ProductName', PRODUCT)
        self.own_properties.set('/Mgmt/ProcessName', __file__)
        self.own_properties.set('/Mgmt/ProcessVersion', VERSION)
        self.own_properties.set('/Mgmt/Connection', 'dbus')
        self.own_properties.set('/ProductId', PRODUCT)
        self.own_properties.set('/FirmwareVersion', VERSION)
        self.own_properties.set('/HardwareVersion', VERSION)
        self.own_properties.set('/Connected', 1)
        self.own_properties.set('/TimeToCalibrationCharge', 'unknown')
        self.own_properties.set('/State', 0)

        # Resolve the inverter service once; it does not change inside the loop.
        inverter_service = self.inverter_service

        # Only the phases whose setpoint path exists on the inverter are used.
        self.phases = [
            p for p in ['/Hub4/L1/AcPowerSetpoint', '/Hub4/L2/AcPowerSetpoint', '/Hub4/L3/AcPowerSetpoint']
            if self.remote_properties.exists(inverter_service + p)
        ]

        self.n_phases = len(self.phases)
        print('The system has ' + str(self.n_phases) + ' phase' + ('s' if self.n_phases != 1 else ''))

        self.max_inverter_power = 32700
        # ^ defined in https://github.com/victronenergy/dbus_modbustcp/blob/master/CCGX-Modbus-TCP-register-list.xlsx

    def clamp_power_command(self, value):
        # type: (float) -> int
        """Limit value to +/- max_inverter_power and truncate it to an int."""
        value = max(value, -self.max_inverter_power)
        value = min(value, self.max_inverter_power)
        return int(value)

    def get_service(self, prefix):
        # type: (str) -> Optional[unicode]
        """Return the first available service whose name starts with prefix.

        Raises:
            Exception: when no available service matches the prefix.
        """
        service = next((s for s in self.available_services if s.startswith(prefix)), None)

        if service is None:
            raise Exception('no service matching ' + prefix + '* available')

        return service

    def is_service_available(self, prefix):
        # type: (str) -> bool
        """Return True when at least one available service name starts with prefix."""
        return any(s.startswith(prefix) for s in self.available_services)

    @property
    def battery_service(self):
        # type: () -> Optional[unicode]
        """Name of the battery service; raises if none is available."""
        return self.get_service(BATTERY_SERVICE_PREFIX)

    @property
    def battery_available(self):
        # type: () -> bool
        """True when a battery service is available."""
        return self.is_service_available(BATTERY_SERVICE_PREFIX)

    @property
    def grid_service(self):
        # type: () -> Optional[unicode]
        """Name of the grid meter service; raises if none is available."""
        return self.get_service(GRID_SERVICE_PREFIX)

    @property
    def grid_meter_available(self):
        # type: () -> bool
        """True when a grid meter service is available."""
        return self.is_service_available(GRID_SERVICE_PREFIX)

    @property
    def inverter_service(self):
        # type: () -> Optional[unicode]
        """Name of the inverter (vebus) service; raises if none is available."""
        return self.get_service(INVERTER_SERVICE_PREFIX)

    @property
    def inverter_available(self):
        # type: () -> bool
        """True when an inverter (vebus) service is available."""
        return self.is_service_available(INVERTER_SERVICE_PREFIX)

    @property
    def system_service(self):
        # type: () -> Optional[unicode]
        """Name of the system service; raises if none is available."""
        return self.get_service(SYSTEM_SERVICE_PREFIX)

    @property
    def system_service_available(self):
        # type: () -> bool
        """True when the system service is available."""
        return self.is_service_available(SYSTEM_SERVICE_PREFIX)

    @property
    def hub4_service(self):
        # type: () -> Optional[unicode]
        """Name of the hub4 service; raises if none is available."""
        return self.get_service(HUB4_SERVICE_PREFIX)

    @property
    def hub4_service_available(self):
        # type: () -> bool
        """True when the hub4 service is available."""
        return self.is_service_available(HUB4_SERVICE_PREFIX)

    @property
    def inverter_power_setpoint(self):
        # type: () -> float
        """Sum of the setpoints currently present on all detected phases."""
        return sum(self.get_inverter_prop(p) for p in self.phases)

    def get_battery_prop(self, dbus_path):
        # type: (str) -> Any
        """Read the value at dbus_path on the battery service."""
        battery_service = self.battery_service
        return self.remote_properties.get(battery_service + dbus_path).value

    def get_grid_prop(self, dbus_path):
        # type: (str) -> Any
        """Read the value at dbus_path on the grid meter service."""
        return self.remote_properties.get(self.grid_service + dbus_path).value

    def get_inverter_prop(self, dbus_path):
        # type: (str) -> Any
        """Read the value at dbus_path on the inverter service."""
        return self.remote_properties.get(self.inverter_service + dbus_path).value

    def get_system_prop(self, dbus_path):
        # type: (str) -> Any
        """Read the value at dbus_path on the system service."""
        system_service = self.system_service
        return self.remote_properties.get(system_service + dbus_path).value

    def get_hub4_prop(self, dbus_path):
        # type: (str) -> Any
        """Read the value at dbus_path on the hub4 service."""
        hub4_service = self.hub4_service
        return self.remote_properties.get(hub4_service + dbus_path).value

    def set_settings_prop(self, dbus_path, value):
        # type: (str, Any) -> bool
        """Write value to dbus_path on the settings service."""
        return self.remote_properties.set(SETTINGS_SERVICE_PREFIX + dbus_path, value)

    def set_inverter_prop(self, dbus_path, value):
        # type: (str, Any) -> bool
        """Write value to dbus_path on the inverter service."""
        inverter_service = self.inverter_service
        return self.remote_properties.set(inverter_service + dbus_path, value)

    @property
    def max_battery_charge_power(self):
        # type: () -> int
        """Charge power limit reported by the battery service."""
        return self.get_battery_prop('/Info/MaxChargePower')

    @property
    def max_battery_discharge_power(self):
        # type: () -> int
        """Discharge power limit reported by the battery service."""
        return self.get_battery_prop('/Info/MaxDischargePower')

    @property
    def max_configured_charge_power(self):
        # type: () -> Optional[int]
        """Configured charge power limit, or None when the setting is negative."""
        max_power = self.settings.get('/Settings/CGwacs/MaxChargePower')
        return max_power if max_power >= 0 else None

    @property
    def max_configured_discharge_power(self):  # unsigned
        # type: () -> Optional[int]
        """Configured discharge power limit, or None when the setting is negative."""
        max_power = self.settings.get('/Settings/CGwacs/MaxDischargePower')
        return max_power if max_power >= 0 else None

    @property
    def max_charge_power(self):
        # type: () -> int
        """Smaller of the battery's and the configured charge limit.

        Falls back to the battery's limit when no limit is configured.
        """
        # Read the setting once so the None check and the min() below cannot
        # see two different values.
        configured = self.max_configured_charge_power
        battery = self.max_battery_charge_power

        if configured is None:
            return battery

        return min(battery, configured)

    @property
    def max_discharge_power(self):  # unsigned
        # type: () -> int
        """Smaller of the battery's and the configured discharge limit.

        Falls back to the battery's limit when no limit is configured.
        """
        # Read the setting once so the None check and the min() below cannot
        # see two different values.
        configured = self.max_configured_discharge_power
        battery = self.max_battery_discharge_power

        if configured is None:
            return battery

        return min(battery, configured)

    def set_inverter_power_setpoint(self, power):
        # type: (float) -> NoReturn
        """Split power evenly over the detected phases and write the setpoints.

        NOTE(review): raises ZeroDivisionError when no phase setpoint path
        was detected in __init__ (n_phases == 0).
        """
        if self.settings.get('/Settings/CGwacs/BatteryLife/State') == 9:
            self.settings.set('/Settings/CGwacs/BatteryLife/State', 0)  # enables scheduled charge

        self.settings.set('/Settings/CGwacs/Hub4Mode', 3)  # disable hub4

        self.set_inverter_prop('/Hub4/DisableCharge', 0)
        self.set_inverter_prop('/Hub4/DisableFeedIn', 0)

        power = self.clamp_power_command(power / self.n_phases)

        for p in self.phases:
            self.set_inverter_prop(p, power + randint(-1, 1))  # use randint to force dbus re-send

    def set_controller_state(self, state):
        # type: (int) -> NoReturn
        """Publish state on this service's own '/State' property."""
        self.own_properties.set('/State', state)

    @property
    def grid_power(self):
        # type: () -> Optional[float]
        """Grid power as read from the grid meter, or None when it cannot be read."""
        try:
            return self.get_grid_prop('/Ac/Power')
        except Exception:
            # Best effort: get_service() raises when no grid meter is present.
            # Catching Exception (rather than everything) lets
            # KeyboardInterrupt and SystemExit propagate.
            return None

    @property
    def battery_cold(self):
        # type: () -> bool
        """True when the battery's '/IoStatus/BatteryCold' flag equals 1."""
        return self.get_battery_prop('/IoStatus/BatteryCold') == 1

    @property
    def eoc_reached(self):
        # type: () -> bool
        """True when the smallest entry of the battery's '/EOCReached' value is 1.

        Returns False when no battery service is available.
        """
        if not self.battery_available:
            return False

        # presumably one flag per battery, so min() == 1 means "all reached" - verify
        return min(self.get_battery_prop('/EOCReached')) == 1

    @property
    def battery_power(self):
        # type: () -> float
        """Battery power as read from '/Dc/0/Power' of the battery service."""
        return self.get_battery_prop('/Dc/0/Power')

    @property
    def inverter_ac_in_power(self):
        # type: () -> float
        """Inverter power as read from '/Ac/ActiveIn/P'."""
        return self.get_inverter_prop('/Ac/ActiveIn/P')

    @property
    def inverter_ac_out_power(self):
        # type: () -> float
        """Inverter power as read from '/Ac/Out/P'."""
        return self.get_inverter_prop('/Ac/Out/P')

    @property
    def soc(self):
        # type: () -> float
        """State of charge as read from the battery service."""
        return self.get_battery_prop('/Soc')

    @property
    def n_batteries(self):
        # type: () -> int
        """Number of batteries as read from the battery service."""
        return self.get_battery_prop('/NbOfBatteries')

    @property
    def min_soc(self):
        # type: () -> float
        """Configured minimum SOC limit."""
        return self.settings.get('/Settings/CGwacs/BatteryLife/MinimumSocLimit')

    @property
    def should_hold_min_soc(self):
        # type: () -> bool
        """True when the SOC lies within [min_soc, min_soc + 5]."""
        return self.min_soc <= self.soc <= self.min_soc + 5

    @property
    def utc_offset(self):
        # type: () -> int
        """Offset of the configured time zone in seconds, positive east of UTC.

        Side effect: sets the process's TZ environment variable to the
        configured time zone and calls time.tzset().
        """
        # stackoverflow.com/a/1301528
        # stackoverflow.com/a/3168394

        os.environ['TZ'] = self.settings.get('/Settings/System/TimeZone')
        time.tzset()
        is_dst = time.daylight and time.localtime().tm_isdst > 0
        # time.altzone / time.timezone are seconds WEST of UTC, hence the sign flip
        return -(time.altzone if is_dst else time.timezone)

    @property
    def grid_set_point(self):
        # type: () -> float
        """Configured grid power setpoint.

        NOTE(review): this reads the CGwacs setpoint, not the
        GRID_SET_POINT_SETTING registered in __init__ - confirm intended.
        """
        return self.settings.get('/Settings/CGwacs/AcPowerSetPoint')

    @property
    def time_to_calibration_charge_str(self):
        # type: () -> str
        """Text of this service's own '/TimeToCalibrationCharge' property."""
        return self.own_properties.get('/TimeToCalibrationCharge').text

    @property
    def calibration_charge_deadline(self):
        # type: () -> Epoch
        """Unix time at which the next calibration charge has to start.

        This is the latest occurrence of the configured start time of day
        that is not later than MAX_DAYS_WITHOUT_EOC days after the last EOC.
        """
        utc_offset = self.utc_offset
        ultimate_deadline = self.settings.get(LAST_EOC_SETTING) + MAX_DAYS_WITHOUT_EOC * SECONDS_PER_DAY

        # round off to last midnight (in local time, hence the utc_offset shuffle)
        midnight_before_udl = int((ultimate_deadline + utc_offset) / SECONDS_PER_DAY) * SECONDS_PER_DAY - utc_offset

        dead_line = midnight_before_udl + self.calibration_charge_start_time_of_day

        while dead_line > ultimate_deadline:  # should fire at most once, but let's be defensive...
            dead_line -= SECONDS_PER_DAY  # too late, go back one day

        return dead_line

    @property
    def time_to_calibration_charge(self):
        # type: () -> Seconds
        """Seconds until the calibration charge deadline; <= 0 when it is due."""
        return self.calibration_charge_deadline - time_now()

    @property
    def grid_blackout(self):
        # type: () -> bool
        """True when the inverter's '/Leds/Mains' value is below 1."""
        return self.get_inverter_prop('/Leds/Mains') < 1

    @property
    def scheduled_charge(self):
        # type: () -> bool
        """True when the hub4 service's '/Overrides/ForceCharge' is non-zero."""
        return self.get_hub4_prop('/Overrides/ForceCharge') != 0

    @property
    def calibration_charge_start_time_of_day(self):
        # type: () -> Seconds
        """Configured calibration charge start time."""
        return self.settings.get(CALIBRATION_CHARGE_START_TIME_OF_DAY_SETTING)  # seconds since midnight

    @property
    def must_do_calibration_charge(self):
        # type: () -> bool
        """True once the calibration charge deadline has been reached."""
        return self.time_to_calibration_charge <= 0

    def controller_charge_to_min_soc(self):
        # type: () -> Controller
        """Drive battery power towards max_charge_power (state CHARGE_TO_MIN_SOC)."""
        return Controller(
            measurement=self.battery_power,
            target=self.max_charge_power,
            target_type=TargetType.BATTERY_DC,
            state=State.CHARGE_TO_MIN_SOC
        )

    def controller_hold_min_soc(self):
        # type: () -> Controller
        """Drive battery power towards a target that is linear in the SOC.

        The target is a * (soc - (min_soc + 0.5)) with
        a = -4 * HEAT_LOSS * n_batteries: zero at min_soc + 0.5 and growing
        (charging) as the SOC drops below that point.
        """
        # TODO: explain
        a = -4 * HEAT_LOSS * self.n_batteries
        b = -a * (self.min_soc + .5)

        target_dc_power = a * self.soc + b

        return Controller(
            measurement=self.battery_power,
            target=target_dc_power,
            target_type=TargetType.BATTERY_DC,
            state=State.HOLD_MIN_SOC
        )

    def controller_calibration_charge(self):
        # type: () -> Controller
        """Drive battery power towards max_charge_power (state CALIBRATION_CHARGE)."""
        return Controller(
            measurement=self.battery_power,
            target=self.max_charge_power,
            target_type=TargetType.BATTERY_DC,
            state=State.CALIBRATION_CHARGE
        )

    def controller_limit_discharge_power(self):  # signed
        # type: () -> Controller
        """Drive battery power towards the negated discharge limit."""
        return Controller(
            measurement=self.battery_power,
            target=-self.max_discharge_power,  # add sign!
            target_type=TargetType.BATTERY_DC,
            state=State.LIMIT_DISCHARGE_POWER
        )

    def controller_limit_charge_power(self):
        # type: () -> Controller
        """Drive battery power towards max_charge_power (state LIMIT_CHARGE_POWER)."""
        return Controller(
            measurement=self.battery_power,
            target=self.max_charge_power,
            target_type=TargetType.BATTERY_DC,
            state=State.LIMIT_CHARGE_POWER
        )

    def controller_optimize_self_consumption(self):
        # type: () -> Controller
        """Drive grid power towards the configured grid setpoint."""
        return Controller(
            measurement=self.grid_power,
            target=self.grid_set_point,
            target_type=TargetType.GRID_AC,
            state=State.OPTIMIZE_SELF_CONSUMPTION
        )

    def controller_heating(self):
        # type: () -> Controller
        """Drive battery power towards max_charge_power (state HEATING)."""
        return Controller(
            measurement=self.battery_power,
            target=self.max_charge_power,
            target_type=TargetType.BATTERY_DC,
            state=State.HEATING
        )

    def controller_scheduled_charge(self):
        # type: () -> Controller
        """Drive battery power towards max_charge_power (state SCHEDULED_CHARGE)."""
        return Controller(
            measurement=self.battery_power,
            target=self.max_charge_power,
            target_type=TargetType.BATTERY_DC,
            state=State.SCHEDULED_CHARGE
        )

    def controller_no_grid_meter(self):
        # type: () -> Controller
        """Drive battery power towards max_charge_power (state NO_GRID_METER_AVAILABLE)."""
        return Controller(
            measurement=self.battery_power,
            target=self.max_charge_power,
            target_type=TargetType.BATTERY_DC,
            state=State.NO_GRID_METER_AVAILABLE
        )

    def controller_no_battery(self):
        # type: () -> Controller
        """Drive the inverter's AC-in power towards zero."""
        return Controller(
            measurement=self.inverter_ac_in_power,
            target=0,
            target_type=TargetType.INVERTER_AC_IN,
            state=State.NO_BATTERY_AVAILABLE
        )

    def controller_bridge_grid_blackout(self):
        # type: () -> Controller
        """Controller with zero measurement and target, hence a zero delta."""
        return Controller(
            measurement=0,
            target=0,
            target_type=TargetType.GRID_AC,
            state=State.BRIDGE_GRID_BLACKOUT
        )

    def update_eoc(self):
        """Store the current time as last EOC if reached, then republish the countdown."""
        if self.eoc_reached:
            print('battery has reached EOC')
            self.settings.set(LAST_EOC_SETTING, time_now())

        self.publish_time_to_calibration_charge()

    def publish_time_to_calibration_charge(self):
        """Publish the time to the next calibration charge as text.

        The text is 'now' once the deadline is reached; otherwise it is built
        from days, hours and minutes, with minutes shown only while less than
        a day remains.
        """
        total_seconds = self.time_to_calibration_charge

        if total_seconds <= 0:
            time_to_eoc_str = 'now'
        else:
            total_minutes, seconds = divmod(total_seconds, 60)
            total_hours, minutes = divmod(total_minutes, 60)
            total_days, hours = divmod(total_hours, 24)

            days_str = (str(total_days) + 'd') if total_days > 0 else ''
            hours_str = (str(hours) + 'h') if total_hours > 0 else ''
            minutes_str = (str(minutes) + 'm') if total_days == 0 else ''

            time_to_eoc_str = "{0} {1} {2}".format(days_str, hours_str, minutes_str).strip()

        self.own_properties.set('/TimeToCalibrationCharge', time_to_eoc_str)

    def print_system_stats(self, controller):
        # type: (Controller) -> NoReturn
        """Print the chosen state and the main power readings (debug aid)."""

        def soc_setpoint():
            if controller.state == State.CALIBRATION_CHARGE or controller.state == State.NO_GRID_METER_AVAILABLE:
                return ' => 100%'
            if controller.state == State.CHARGE_TO_MIN_SOC:
                return ' => ' + str(int(self.min_soc)) + '%'
            return ''

        def setpoint(target_type):
            # only the quantity the controller actually targets gets a setpoint
            if target_type != controller.target_type:
                return ''
            return ' => ' + str(int(controller.target)) + 'W'

        def p(power):
            # type: (Optional[float]) -> str
            if power is None:
                return ' --- W'
            else:
                return str(int(power)) + 'W'

        # Snapshot the readings once: grid_power can turn None between two
        # reads, which would break the subtraction below.
        grid_power = self.grid_power
        ac_in_power = self.inverter_ac_in_power

        ac_loads = None if grid_power is None else grid_power - ac_in_power
        delta = p(controller.delta) if controller.delta < 0 else '+' + p(controller.delta)
        battery_power = self.battery_power if self.battery_available else None
        soc_ = str(self.soc) + '%' if self.battery_available else '---'

        print(State.name_of[controller.state])
        print('')
        print('time to CC: ' + self.time_to_calibration_charge_str)
        print(' SOC: ' + soc_ + soc_setpoint())
        print(' grid: ' + p(grid_power) + setpoint(TargetType.GRID_AC))
        print(' battery: ' + p(battery_power) + setpoint(TargetType.BATTERY_DC))
        print(' AC in: ' + p(ac_in_power) + ' ' + delta)
        print(' AC out: ' + p(self.inverter_ac_out_power))
        print(' AC loads: ' + p(ac_loads))

    def choose_controller(self):
        # type: () -> Controller
        """Pick the controller for the current conditions.

        The checks are ordered by priority; the first one that applies wins.
        If none applies, self-consumption optimisation is used, bounded from
        below by the discharge limit and the hold-min-SOC controller.
        """
        if self.grid_blackout:
            return self.controller_bridge_grid_blackout()

        if not self.battery_available:
            return self.controller_no_battery()

        if self.battery_cold:
            return self.controller_heating()

        if self.scheduled_charge:
            return self.controller_scheduled_charge()

        if self.must_do_calibration_charge:
            return self.controller_calibration_charge()

        if self.soc < self.min_soc:
            return self.controller_charge_to_min_soc()

        if not self.grid_meter_available:
            return self.controller_no_grid_meter()

        hold_min_soc = self.controller_hold_min_soc()
        limit_discharge_power = self.controller_limit_discharge_power()  # signed

        lower_limit = [limit_discharge_power, hold_min_soc]

        # No upper limit. We no longer actively limit charge power. DC/DC Charger inside the BMS will do that for us.
        upper_limit = []

        optimize_self_consumption = self.controller_optimize_self_consumption()

        return optimize_self_consumption.clamp(lower_limit, upper_limit)

    def update(self):
        """Run one control iteration: update EOC, choose a controller, command the inverter."""
        print('iteration started\n')

        self.update_eoc()

        if self.inverter_available:
            controller = self.choose_controller()
            # the new setpoint is the present AC-in power corrected by the controller's delta
            power = self.inverter_ac_in_power + controller.delta

            self.set_inverter_power_setpoint(power)
            self.set_controller_state(controller.state)
            self.print_system_stats(controller)  # for debug
        else:
            self.set_controller_state(State.NO_INVERTER_AVAILABLE)
            print('inverter not available!')

        print('\niteration finished\n')


def main():
    """Create the controller service and run its update() on the main loop."""
    print('starting ' + __file__)

    with InnovEnergyController() as service:
        run_on_main_loop(service.update, UPDATE_PERIOD_MS)

    print(__file__ + ' has shut down')


if __name__ == '__main__':
    main()