Innovenergy_trunk/firmware/opt/victronenergy/dbus-systemcalc-py/delegates/schedule.py

231 lines
7.2 KiB
Python
Raw Normal View History

2023-02-16 12:57:06 +00:00
import logging
import gobject
from datetime import datetime, timedelta, time
from itertools import izip, imap
from functools import partial
# Victron packages
from ve_utils import exit_on_error
from delegates.base import SystemCalcDelegate
from delegates.batterylife import BatteryLife, BLPATH
from delegates.batterylife import State as BatteryLifeState
from delegates.dvcc import Dvcc
HUB4_SERVICE = 'com.victronenergy.hub4'
# Number of scheduled charge slots
NUM_SCHEDULES = 5
def prev_week_day(adate, w):
""" finds the previous w-day of the week before adate.
Sun=0 or 7, Sat=6, that is what unix uses. """
w %= 7
return adate - timedelta(days=(adate.weekday()+7-w) % 7 + 1)
def next_week_day(adate, w):
""" Finds the next w-day after or equal to adate.
Sun=0 or 7, Sat=6, that is what unix uses. """
w %= 7
return adate + timedelta(days=(w - adate.weekday() - 1) % 7)
def next_schedule_day(adate, w):
if w < 7:
# A specific week day
return next_week_day(adate, w)
elif w == 7:
# 7 days a week
return adate
elif w == 8:
# week days
if adate.weekday() > 4:
return next_week_day(adate, 1) # Monday
return adate
# w >=9, weekend days
if adate.weekday() < 5:
return next_week_day(adate, 6) # Saturday
return adate
def prev_schedule_day(adate, w):
if w < 7:
# A specific week day
return prev_week_day(adate, w)
elif w == 7:
# 7 days a week
return adate - timedelta(days=1)
elif w == 8:
# week days
if adate.weekday() in (0, 6):
return prev_week_day(adate, 5) # Mon,Sun preceded by Friday
return adate - timedelta(days=1)
# w >= 9, weekend days
if 0 < adate.weekday() < 5:
return prev_week_day(adate, 0) # Sunday
return adate - timedelta(days=1)
class ScheduledWindow(object):
def __init__(self, starttime, duration):
self.start = starttime
self.stop = self.start + timedelta(seconds=duration)
def __contains__(self, t):
return self.start <= t < self.stop
def __eq__(self, other):
return self.start == other.start and self.stop == other.stop
def __repr__(self):
return "Start: {}, Stop: {}".format(self.start, self.stop)
class ScheduledChargeWindow(ScheduledWindow):
def __init__(self, starttime, duration, soc):
super(ScheduledChargeWindow, self).__init__(starttime, duration)
self.soc = soc
def soc_reached(self, s):
return s >= self.soc
def __repr__(self):
return "Start charge: {}, Stop: {}, Soc: {}".format(
self.start, self.stop, self.soc)
class ScheduledCharging(SystemCalcDelegate):
""" Let the system do other things based on time schedule. """
_get_time = datetime.now
def __init__(self):
super(ScheduledCharging, self).__init__()
self.soc = None
self.pvpower = 0
self.active = False
self.hysteresis = True
self.socreached = False
self._timer = gobject.timeout_add(5000, exit_on_error, self._on_timer)
def set_sources(self, dbusmonitor, settings, dbusservice):
SystemCalcDelegate.set_sources(self, dbusmonitor, settings, dbusservice)
self._dbusservice.add_path('/Control/ScheduledCharge', value=0)
def get_input(self):
return [
(HUB4_SERVICE, ['/Overrides/ForceCharge', '/Overrides/MaxDischargePower'])
]
def settings_changed(self, setting, oldvalue, newvalue):
if setting.startswith("schedule_soc_"):
# target SOC was modified. Disable the hysteresis on the next
# run.
self.hysteresis = False
def get_settings(self):
settings = []
# Paths for scheduled charging. We'll allow 5 for now.
for i in range(NUM_SCHEDULES):
settings.append(("schedule_day_{}".format(i),
BLPATH + "/Schedule/Charge/{}/Day".format(i), -7, -10, 9))
settings.append(("schedule_start_{}".format(i),
BLPATH + "/Schedule/Charge/{}/Start".format(i), 0, 0, 0))
settings.append(("schedule_duration_{}".format(i),
BLPATH + "/Schedule/Charge/{}/Duration".format(i), 0, 0, 0))
settings.append(("schedule_soc_{}".format(i),
BLPATH + "/Schedule/Charge/{}/Soc".format(i), 100, 0, 100))
return settings
@classmethod
def _charge_windows(klass, today, days, starttimes, durations, stopsocs):
starttimes = (time(x/3600, x/60 % 60, x % 60) for x in starttimes)
for d, starttime, duration, soc in izip(days, starttimes, durations, stopsocs):
if d >= 0:
d0 = prev_schedule_day(today, d)
d1 = next_schedule_day(today, d)
yield ScheduledChargeWindow(
datetime.combine(d0, starttime), duration, soc)
yield ScheduledChargeWindow(
datetime.combine(d1, starttime), duration, soc)
def charge_windows(self, today):
days = (self._settings['schedule_day_{}'.format(i)] for i in range(NUM_SCHEDULES))
starttimes = (self._settings['schedule_start_{}'.format(i)] for i in range(NUM_SCHEDULES))
durations = (self._settings['schedule_duration_{}'.format(i)] for i in range(NUM_SCHEDULES))
stopsocs = (self._settings['schedule_soc_{}'.format(i)] for i in range(NUM_SCHEDULES))
return self._charge_windows(today, days, starttimes, durations, stopsocs)
@property
def forcecharge(self):
return self._dbusmonitor.get_value(HUB4_SERVICE, '/Overrides/ForceCharge')
@forcecharge.setter
def forcecharge(self, v):
return self._dbusmonitor.set_value_async(HUB4_SERVICE,
'/Overrides/ForceCharge', 1 if v else 0)
@property
def maxdischargepower(self):
return self._dbusmonitor.get_value(HUB4_SERVICE, '/Overrides/MaxDischargePower')
@maxdischargepower.setter
def maxdischargepower(self, v):
return self._dbusmonitor.set_value_async(HUB4_SERVICE, '/Overrides/MaxDischargePower', v)
def _on_timer(self):
if self.soc is None:
return True
if not Dvcc.instance.has_ess_assistant:
return True
if BatteryLife.instance.state == BatteryLifeState.KeepCharged:
self._dbusservice['/Control/ScheduledCharge'] = 0
return True
now = self._get_time()
today = now.date()
for w in self.charge_windows(today):
if now in w:
if w.soc_reached(self.soc):
self.forcecharge = False
self.socreached = True
# elif self.hysteresis and w.soc_reached(self.soc + 5):
# If we are within 5%, keep it the same, but write it to
# avoid a timeout.
#self.forcecharge = self.forcecharge
elif not self.socreached:
# SoC not reached yet
# Note: soc_reached always returns False for a target of
# 100%, so this is the only branch that is ever excuted
# in those cases.
self.forcecharge = True
# Signal that scheduled charging is active
self.active = True
# The discharge is limited to 1W or whatever is available
# from PV. 1W essentially disables discharge without
# disabling feed-in, so Power-Assist and feeding in
# the excess continues to work. Setting this to the pv-power
# causes it to directly consume the PV for loads and charge
# only with the excess. Scale it between 80% and 93%
# of PV-power depending on the SOC.
scale = 0.8 + min(max(0, self.soc - w.soc), 1.3)*0.1
self.maxdischargepower = max(1, round(self.pvpower*scale))
break
else:
self.forcecharge = False
self.maxdischargepower = -1
self.active = False
self.socreached=False
self._dbusservice['/Control/ScheduledCharge'] = int(self.active)
self.hysteresis = True
return True
def update_values(self, newvalues):
self.soc = newvalues.get('/Dc/Battery/Soc')
self.pvpower = max(newvalues.get('/Dc/Pv/Power'), 0)