Innovenergy_trunk/firmware/opt/innovenergy/relay1/relay1.py

243 lines
6.9 KiB
Python
Executable File

#!/usr/bin/python -u
# coding=utf-8
from python_libs.ie_dbus.dbus_service import DBusService
from python_libs.ie_utils.filters import DebounceFilter
from python_libs.ie_utils.main_loop import run_on_main_loop
from python_libs.ie_utils.mixins import memoize
import logging
import relay_function
import relay_polarity
import test_state
# noinspection PyUnreachableCode
if False:
from typing import NoReturn, Optional
logging.basicConfig(level=logging.DEBUG)
_log = logging.getLogger(__name__)
VERSION = '1.2.0'
PRODUCT = 'Relay1'
UPDATE_PERIOD_MS = 2000
BATTERY_SERVICE_PREFIX = 'com.victronenergy.battery.'
# settings
DebounceFilterStrength = PRODUCT + '/DebounceFilterTime'
ConnectLoadThreshold = PRODUCT + '/ConnectLoadThreshold'
NominalPowerOfLoad = PRODUCT + '/NominalPowerOfLoad'
Relay1Function = PRODUCT + '/Relay1Function'
Relay1Polarity = PRODUCT + '/Relay1Polarity'
MinimumSocLimit = 'CGwacs/BatteryLife/MinimumSocLimit'
# remote properties
Relay1State = 'system/Relay/1/State'
class Relay1Service(DBusService):
def __init__(self):
super(Relay1Service, self).__init__(PRODUCT.lower())
self.settings.add_setting(path=DebounceFilterStrength, default_value=180, min=0, max=10000)
self.settings.add_setting(path=NominalPowerOfLoad, default_value=10000, min=0, max=100000) # watts
self.settings.add_setting(path=ConnectLoadThreshold, default_value=50000, min=0, max=50000) # watts
self.settings.add_setting(path=Relay1Function, default_value=relay_function.NONE)
self.settings.add_setting(path=Relay1Polarity, default_value=relay_polarity.NORMALLY_OPEN)
self.own_properties.set('/ProductName', PRODUCT)
self.own_properties.set('/Mgmt/ProcessName', __file__)
self.own_properties.set('/Mgmt/ProcessVersion', VERSION)
self.own_properties.set('/Mgmt/Connection', 'dbus')
self.own_properties.set('/ProductId', PRODUCT)
self.own_properties.set('/FirmwareVersion', VERSION)
self.own_properties.set('/HardwareVersion', VERSION)
self.own_properties.set('/Connected', 1)
self.own_properties.set('/LogicalState', 0)
self.own_properties.set('/TestRelay1', -1, writable=True)
self.debounce_filter = DebounceFilter(False, self.settings.get(DebounceFilterStrength))
@memoize
def is_relay1_available(self):
# type: () -> bool
return self.remote_properties.exists(Relay1State)
@property
def battery_service(self):
return next((s for s in self.available_services if s.startswith(BATTERY_SERVICE_PREFIX)), None)
@property
def grid_power(self):
# type: () -> float
l1 = self.remote_properties.get('system/Ac/Grid/L1/Power').value
l2 = self.remote_properties.get('system/Ac/Grid/L2/Power').value
l3 = self.remote_properties.get('system/Ac/Grid/L3/Power').value
l1 = l1 if isinstance(l1, (float, int)) else 0.
l2 = l2 if isinstance(l2, (float, int)) else 0.
l3 = l3 if isinstance(l3, (float, int)) else 0.
return -(l1 + l2 + l3)
@property
def test_relay(self):
# type: () -> Optional[bool]
test = self.own_properties.get('/TestRelay1').value
if test == test_state.NO_TEST:
return None
return test == test_state.TEST_RELAY_ON
@property
def connect_load_threshold(self):
# type: () -> int
return self.settings.get(ConnectLoadThreshold)
@property
def disconnect_load_threshold(self):
# type: () -> int
return self.connect_load_threshold - self.settings.get(NominalPowerOfLoad)
@property
def load_relay_target_state(self):
# type: () -> Optional[bool]
if self.grid_power > self.connect_load_threshold:
return True
if self.grid_power < self.disconnect_load_threshold:
return False
else:
return None # dead-band
@property
def battery_has_alarm(self):
# type: () -> bool
return self.remote_properties.get(self.battery_service + '/NumberOfAlarmFlags').value > 0
@property
def soc(self):
# type: () -> float
return self.remote_properties.get(self.battery_service + '/Soc').value
# noinspection PyBroadException
@property
def min_soc_limit(self):
# type: () -> int
try:
return int(self.settings.get(MinimumSocLimit))
except Exception:
return 101 # force alarm when MinimumSocLimit setting is not readable
@property
def alarm_relay_target_state(self):
# type: () -> bool
if self.battery_service is None:
_log.debug('no battery available!')
return True # force alarm
_log.debug('soc = ' + str(self.soc))
_log.debug('min_soc = ' + str(self.min_soc_limit))
_log.debug('battery_alarm = ' + str(self.battery_has_alarm))
return self.soc < self.min_soc_limit or self.battery_has_alarm
@property
def relay1_target_state(self):
# type: () -> Optional[bool]
if self.test_relay is not None:
_log.info('testing ' + self.relay1_function + ' relay')
return self.test_relay
_log.debug('relay1_function is ' + self.relay1_function)
if self.relay1_function == relay_function.ALARM:
return self.alarm_relay_target_state
elif self.relay1_function == relay_function.LOAD:
return self.load_relay_target_state
return None
@property
def relay1_function(self):
# type: () -> str
return self.settings.get(Relay1Function)
@property
def relay1_polarity(self):
# type: () -> str
return self.settings.get(Relay1Polarity)
def display_state(self, state):
# type: (bool) -> str
if self.relay1_function == relay_function.ALARM:
return 'alarm is on' if state else 'alarm is off'
if self.relay1_function == relay_function.LOAD:
return 'load is connected' if state else 'load is disconnected'
return 'relay is disabled'
def set_relay1(self, state):
# type: (bool) -> NoReturn
changed = self.remote_properties.set(Relay1State, 1 if state else 0)
if changed or _log.getEffectiveLevel() <= logging.DEBUG:
_log.info('Relay is now ' + ('energized' if state else 'de-energized'))
def debounce(self, state):
# type: (Optional[bool]) -> Optional[bool]
if state is None:
return None
filter_length = int(self.settings.get(DebounceFilterStrength) * 1000 / UPDATE_PERIOD_MS)
return self.debounce_filter.update(state, filter_length)
def update(self):
if self.relay1_function == relay_function.NONE:
self.set_relay1(False) # make sure relay is de-energized when it is disabled
return
state = self.relay1_target_state # using tri-state logic, True/False/None
if state is None: # None acts as a NOP
return
if self.test_relay is None: # do not debounce when testing
state = self.debounce(state)
self.own_properties.set('/LogicalState', state)
_log.debug(self.display_state(state))
if self.relay1_polarity == relay_polarity.NORMALLY_CLOSED:
_log.debug('inverting command because relay is configured to be normally-closed')
state = not state
self.set_relay1(state)
def main():
_log.info('starting ' + __file__)
with Relay1Service() as service:
run_on_main_loop(service.update, UPDATE_PERIOD_MS)
_log.info(__file__ + ' has shut down')
if __name__ == '__main__':
main()