change Salidomo siganl name to match with Salimax for Venus

This commit is contained in:
Yinyin Liu 2024-06-19 13:35:00 +02:00
parent 919cc67ae3
commit 10b7c7b267
61 changed files with 2378 additions and 649 deletions

View File

@ -1,644 +0,0 @@
#!/usr/bin/python -u
# coding=utf-8
import logging
import os
import time
import states as State
import target_type as TargetType
from random import randint
from python_libs.ie_dbus.dbus_service import DBusService
from python_libs.ie_utils.main_loop import run_on_main_loop
# noinspection PyUnreachableCode
if False:
from typing import NoReturn, Optional, Any, Iterable, List
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)
VERSION = '1.0.0'
PRODUCT = 'Controller'
GRID_SERVICE_PREFIX = 'com.victronenergy.grid.'
BATTERY_SERVICE_PREFIX = 'com.victronenergy.battery.'
INVERTER_SERVICE_PREFIX = 'com.victronenergy.vebus.'
SYSTEM_SERVICE_PREFIX = 'com.victronenergy.system'
HUB4_SERVICE_PREFIX = 'com.victronenergy.hub4'
SETTINGS_SERVICE_PREFIX = 'com.victronenergy.settings'
UPDATE_PERIOD_MS = 2000
MAX_POWER_PER_BATTERY = 2500
MAX_DAYS_WITHOUT_EOC = 7
SECONDS_PER_DAY = 24 * 60 * 60
GRID_SET_POINT_SETTING = PRODUCT + '/GridSetPoint'
LAST_EOC_SETTING = PRODUCT + '/LastEOC'
CALIBRATION_CHARGE_START_TIME_OF_DAY_SETTING = PRODUCT + '/CalibrationChargeStartTime'
HEAT_LOSS = 150 # W
P_CONST = 0.5 # W/W
Epoch = int
Seconds = int
def time_now():
return int(time.time())
class Controller(object):
def __init__(self, measurement, target, target_type, state):
# type: (float, float, int, int) -> NoReturn
self.target_type = target_type
self.target = target
self.measurement = measurement
self.state = state
d_p = target - measurement
self.delta = d_p * P_CONST
@staticmethod
def min(controllers):
# type: (Iterable[Controller]) -> Controller
return min(controllers, key=lambda c: c.delta)
@staticmethod
def max(controllers):
# type: (Iterable[Controller]) -> Controller
return max(controllers, key=lambda c: c.delta)
def clamp(self, lower_limit_controllers, upper_limit_controllers):
# type: (List[Controller],List[Controller]) -> Controller
c_min = Controller.min(upper_limit_controllers + [self])
return Controller.max(lower_limit_controllers + [c_min])
# noinspection PyMethodMayBeStatic
class InnovEnergyController(DBusService):
def __init__(self):
super(InnovEnergyController, self).__init__(PRODUCT.lower())
self.settings.add_setting(path=LAST_EOC_SETTING, default_value=0) # unix epoch timestamp
self.settings.add_setting(path=GRID_SET_POINT_SETTING, default_value=0) # grid setpoint, Watts
self.settings.add_setting(path=CALIBRATION_CHARGE_START_TIME_OF_DAY_SETTING, default_value=32400) # 09:00
self.own_properties.set('/ProductName', PRODUCT)
self.own_properties.set('/Mgmt/ProcessName', __file__)
self.own_properties.set('/Mgmt/ProcessVersion', VERSION)
self.own_properties.set('/Mgmt/Connection', 'dbus')
self.own_properties.set('/ProductId', PRODUCT)
self.own_properties.set('/FirmwareVersion', VERSION)
self.own_properties.set('/HardwareVersion', VERSION)
self.own_properties.set('/Connected', 1)
self.own_properties.set('/TimeToCalibrationCharge', 'unknown')
self.own_properties.set('/State', 0)
self.phases = [
p for p in ['/Hub4/L1/AcPowerSetpoint', '/Hub4/L2/AcPowerSetpoint', '/Hub4/L3/AcPowerSetpoint']
if self.remote_properties.exists(self.inverter_service + p)
]
self.n_phases = len(self.phases)
print ('The system has ' + str(self.n_phases) + ' phase' + ('s' if self.n_phases != 1 else ''))
self.max_inverter_power = 32700
# ^ defined in https://github.com/victronenergy/dbus_modbustcp/blob/master/CCGX-Modbus-TCP-register-list.xlsx
def clamp_power_command(self, value):
# type: (float) -> int
value = max(value, -self.max_inverter_power)
value = min(value, self.max_inverter_power)
return int(value)
def get_service(self, prefix):
# type: (str) -> Optional[unicode]
service = next((s for s in self.available_services if s.startswith(prefix)), None)
if service is None:
raise Exception('no service matching ' + prefix + '* available')
return service
def is_service_available(self, prefix):
# type: (str) -> bool
return next((True for s in self.available_services if s.startswith(prefix)), False)
@property
def battery_service(self):
# type: () -> Optional[unicode]
return self.get_service(BATTERY_SERVICE_PREFIX)
@property
def battery_available(self):
# type: () -> bool
return self.is_service_available(BATTERY_SERVICE_PREFIX)
@property
def grid_service(self):
# type: () -> Optional[unicode]
return self.get_service(GRID_SERVICE_PREFIX)
@property
def grid_meter_available(self):
# type: () -> bool
return self.is_service_available(GRID_SERVICE_PREFIX)
@property
def inverter_service(self):
# type: () -> Optional[unicode]
return self.get_service(INVERTER_SERVICE_PREFIX)
@property
def inverter_available(self):
# type: () -> bool
return self.is_service_available(INVERTER_SERVICE_PREFIX)
@property
def system_service(self):
# type: () -> Optional[unicode]
return self.get_service(SYSTEM_SERVICE_PREFIX)
@property
def system_service_available(self):
# type: () -> bool
return self.is_service_available(SYSTEM_SERVICE_PREFIX)
@property
def hub4_service(self):
# type: () -> Optional[unicode]
return self.get_service(HUB4_SERVICE_PREFIX)
@property
def hub4_service_available(self):
# type: () -> bool
return self.is_service_available(HUB4_SERVICE_PREFIX)
@property
def inverter_power_setpoint(self):
# type: () -> float
return sum((self.get_inverter_prop(p) for p in self.phases))
def get_battery_prop(self, dbus_path):
# type: (str) -> Any
battery_service = self.battery_service
return self.remote_properties.get(battery_service + dbus_path).value
def get_grid_prop(self, dbus_path):
# type: (str) -> Any
return self.remote_properties.get(self.grid_service + dbus_path).value
def get_inverter_prop(self, dbus_path):
# type: (str) -> Any
return self.remote_properties.get(self.inverter_service + dbus_path).value
def get_system_prop(self, dbus_path):
# type: (str) -> Any
system_service = self.system_service
return self.remote_properties.get(system_service + dbus_path).value
def get_hub4_prop(self, dbus_path):
# type: (str) -> Any
hub4_service = self.hub4_service
return self.remote_properties.get(hub4_service + dbus_path).value
def set_settings_prop(self, dbus_path, value):
# type: (str, Any) -> bool
return self.remote_properties.set(SETTINGS_SERVICE_PREFIX + dbus_path, value)
def set_inverter_prop(self, dbus_path, value):
# type: (str, Any) -> bool
inverter_service = self.inverter_service
return self.remote_properties.set(inverter_service + dbus_path, value)
@property
def max_battery_charge_power(self):
# type: () -> int
return self.get_battery_prop('/Info/MaxChargePower')
@property
def max_battery_discharge_power(self):
# type: () -> int
return self.get_battery_prop('/Info/MaxDischargePower')
@property
def max_configured_charge_power(self):
# type: () -> Optional[int]
max_power = self.settings.get('/Settings/CGwacs/MaxChargePower')
return max_power if max_power >= 0 else None
@property
def max_configured_discharge_power(self): # unsigned
# type: () -> Optional[int]
max_power = self.settings.get('/Settings/CGwacs/MaxDischargePower')
return max_power if max_power >= 0 else None
@property
def max_charge_power(self):
# type: () -> int
if self.max_configured_charge_power is None:
return self.max_battery_charge_power
else:
return min(self.max_battery_charge_power, self.max_configured_charge_power)
@property
def max_discharge_power(self): # unsigned
# type: () -> int
if self.max_configured_discharge_power is None:
return self.max_battery_discharge_power
else:
return min(self.max_battery_discharge_power, self.max_configured_discharge_power)
def set_inverter_power_setpoint(self, power):
# type: (float) -> NoReturn
if self.settings.get('/Settings/CGwacs/BatteryLife/State') == 9:
self.settings.set('/Settings/CGwacs/BatteryLife/State', 0) # enables scheduled charge
self.settings.set('/Settings/CGwacs/Hub4Mode', 3) # disable hub4
self.set_inverter_prop('/Hub4/DisableCharge', 0)
self.set_inverter_prop('/Hub4/DisableFeedIn', 0)
power = self.clamp_power_command(power / self.n_phases)
for p in self.phases:
self.set_inverter_prop(p, power + randint(-1, 1)) # use randint to force dbus re-send
def set_controller_state(self, state):
# type: (int) -> NoReturn
self.own_properties.set('/State', state)
@property
def grid_power(self):
# type: () -> Optional[float]
try:
return self.get_grid_prop('/Ac/Power')
except:
return None
@property
def battery_cold(self):
# type: () -> bool
return self.get_battery_prop('/IoStatus/BatteryCold') == 1
@property
def eoc_reached(self):
# type: () -> bool
if not self.battery_available:
return False
return min(self.get_battery_prop('/EOCReached')) == 1
@property
def battery_power(self):
# type: () -> float
return self.get_battery_prop('/Dc/0/Power')
@property
def inverter_ac_in_power(self):
# type: () -> float
return self.get_inverter_prop('/Ac/ActiveIn/P')
@property
def inverter_ac_out_power(self):
# type: () -> float
return self.get_inverter_prop('/Ac/Out/P')
@property
def soc(self):
# type: () -> float
return self.get_battery_prop('/Soc')
@property
def n_batteries(self):
# type: () -> int
return self.get_battery_prop('/NbOfBatteries')
@property
def min_soc(self):
# type: () -> float
return self.settings.get('/Settings/CGwacs/BatteryLife/MinimumSocLimit')
@property
def should_hold_min_soc(self):
# type: () -> bool
return self.min_soc <= self.soc <= self.min_soc + 5
@property
def utc_offset(self):
# type: () -> int
# stackoverflow.com/a/1301528
# stackoverflow.com/a/3168394
os.environ['TZ'] = self.settings.get('/Settings/System/TimeZone')
time.tzset()
is_dst = time.daylight and time.localtime().tm_isdst > 0
return -(time.altzone if is_dst else time.timezone)
@property
def grid_set_point(self):
# type: () -> float
return self.settings.get('/Settings/CGwacs/AcPowerSetPoint')
@property
def time_to_calibration_charge_str(self):
# type: () -> str
return self.own_properties.get('/TimeToCalibrationCharge').text
@property
def calibration_charge_deadline(self):
# type: () -> Epoch
utc_offset = self.utc_offset
ultimate_deadline = self.settings.get(LAST_EOC_SETTING) + MAX_DAYS_WITHOUT_EOC * SECONDS_PER_DAY
midnight_before_udl = int((ultimate_deadline + utc_offset) / SECONDS_PER_DAY) * SECONDS_PER_DAY - utc_offset # round off to last midnight
dead_line = midnight_before_udl + self.calibration_charge_start_time_of_day
while dead_line > ultimate_deadline: # should fire at most once, but let's be defensive...
dead_line -= SECONDS_PER_DAY # too late, advance one day
return dead_line
@property
def time_to_calibration_charge(self):
# type: () -> Seconds
return self.calibration_charge_deadline - time_now()
@property
def grid_blackout(self):
# type: () -> bool
return self.get_inverter_prop('/Leds/Mains') < 1
@property
def scheduled_charge(self):
# type: () -> bool
return self.get_hub4_prop('/Overrides/ForceCharge') != 0
@property
def calibration_charge_start_time_of_day(self):
# type: () -> Seconds
return self.settings.get(CALIBRATION_CHARGE_START_TIME_OF_DAY_SETTING) # seconds since midnight
@property
def must_do_calibration_charge(self):
# type: () -> bool
return self.time_to_calibration_charge <= 0
def controller_charge_to_min_soc(self):
# type: () -> Controller
return Controller(
measurement=self.battery_power,
target=self.max_charge_power,
target_type=TargetType.BATTERY_DC,
state=State.CHARGE_TO_MIN_SOC
)
def controller_hold_min_soc(self):
# type: () -> Controller
# TODO: explain
a = -4 * HEAT_LOSS * self.n_batteries
b = -a * (self.min_soc + .5)
target_dc_power = a * self.soc + b
return Controller(
measurement = self.battery_power,
target = target_dc_power,
target_type = TargetType.BATTERY_DC,
state = State.HOLD_MIN_SOC
)
def controller_calibration_charge(self):
# type: () -> Controller
return Controller(
measurement = self.battery_power,
target = self.max_charge_power,
target_type = TargetType.BATTERY_DC,
state = State.CALIBRATION_CHARGE
)
def controller_limit_discharge_power(self): # signed
# type: () -> Controller
return Controller(
measurement = self.battery_power,
target = -self.max_discharge_power, # add sign!
target_type = TargetType.BATTERY_DC,
state = State.LIMIT_DISCHARGE_POWER
)
def controller_limit_charge_power(self):
# type: () -> Controller
return Controller(
measurement = self.battery_power,
target = self.max_charge_power,
target_type = TargetType.BATTERY_DC,
state = State.LIMIT_CHARGE_POWER
)
def controller_optimize_self_consumption(self):
# type: () -> Controller
return Controller(
measurement = self.grid_power,
target = self.grid_set_point,
target_type = TargetType.GRID_AC,
state = State.OPTIMIZE_SELF_CONSUMPTION
)
def controller_heating(self):
# type: () -> Controller
return Controller(
measurement = self.battery_power,
target = self.max_charge_power,
target_type = TargetType.BATTERY_DC,
state = State.HEATING
)
def controller_scheduled_charge(self):
# type: () -> Controller
return Controller(
measurement = self.battery_power,
target = self.max_charge_power,
target_type = TargetType.BATTERY_DC,
state = State.SCHEDULED_CHARGE
)
def controller_no_grid_meter(self):
# type: () -> Controller
return Controller(
measurement = self.battery_power,
target = self.max_charge_power,
target_type = TargetType.BATTERY_DC,
state = State.NO_GRID_METER_AVAILABLE
)
def controller_no_battery(self):
# type: () -> Controller
return Controller(
measurement = self.inverter_ac_in_power,
target = 0,
target_type = TargetType.INVERTER_AC_IN,
state = State.NO_BATTERY_AVAILABLE
)
def controller_bridge_grid_blackout(self):
# type: () -> Controller
return Controller(
measurement = 0,
target = 0,
target_type = TargetType.GRID_AC,
state = State.BRIDGE_GRID_BLACKOUT
)
def update_eoc(self):
if self.eoc_reached:
print('battery has reached EOC')
self.settings.set(LAST_EOC_SETTING, time_now())
self.publish_time_to_calibration_charge()
def publish_time_to_calibration_charge(self):
total_seconds = self.time_to_calibration_charge
if total_seconds <= 0:
time_to_eoc_str = 'now'
else:
total_minutes, seconds = divmod(total_seconds, 60)
total_hours, minutes = divmod(total_minutes, 60)
total_days, hours = divmod(total_hours, 24)
days_str = (str(total_days) + 'd') if total_days > 0 else ''
hours_str = (str(hours) + 'h') if total_hours > 0 else ''
minutes_str = (str(minutes) + 'm') if total_days == 0 else ''
time_to_eoc_str = "{0} {1} {2}".format(days_str, hours_str, minutes_str).strip()
self.own_properties.set('/TimeToCalibrationCharge', time_to_eoc_str)
def print_system_stats(self, controller):
# type: (Controller) -> NoReturn
def soc_setpoint():
if controller.state == State.CALIBRATION_CHARGE or controller.state == State.NO_GRID_METER_AVAILABLE:
return ' => 100%'
if controller.state == State.CHARGE_TO_MIN_SOC:
return ' => ' + str(int(self.min_soc)) + '%'
return ''
def setpoint(target_type):
if target_type != controller.target_type:
return ''
return ' => ' + str(int(controller.target)) + 'W'
def p(power):
# type: (Optional[float]) -> str
if power is None:
return ' --- W'
else:
return str(int(power)) + 'W'
ac_loads = None if self.grid_power is None else self.grid_power - self.inverter_ac_in_power
delta = p(controller.delta) if controller.delta < 0 else '+' + p(controller.delta)
battery_power = self.battery_power if self.battery_available else None
soc_ = str(self.soc) + '%' if self.battery_available else '---'
print (State.name_of[controller.state])
print ('')
print ('time to CC: ' + self.time_to_calibration_charge_str)
print (' SOC: ' + soc_ + soc_setpoint())
print (' grid: ' + p(self.grid_power) + setpoint(TargetType.GRID_AC))
print (' battery: ' + p(battery_power) + setpoint(TargetType.BATTERY_DC))
print (' AC in: ' + p(self.inverter_ac_in_power) + ' ' + delta)
print (' AC out: ' + p(self.inverter_ac_out_power))
print (' AC loads: ' + p(ac_loads))
def choose_controller(self):
# type: () -> Controller
if self.grid_blackout:
return self.controller_bridge_grid_blackout()
if not self.battery_available:
return self.controller_no_battery()
if self.battery_cold:
return self.controller_heating()
if self.scheduled_charge:
return self.controller_scheduled_charge()
if self.must_do_calibration_charge:
return self.controller_calibration_charge()
if self.soc < self.min_soc:
return self.controller_charge_to_min_soc()
if not self.grid_meter_available:
return self.controller_no_grid_meter()
hold_min_soc = self.controller_hold_min_soc()
limit_discharge_power = self.controller_limit_discharge_power() # signed
lower_limit = [limit_discharge_power, hold_min_soc]
# No upper limit. We no longer actively limit charge power. DC/DC Charger inside the BMS will do that for us.
upper_limit = []
optimize_self_consumption = self.controller_optimize_self_consumption()
return optimize_self_consumption.clamp(lower_limit, upper_limit)
def update(self):
print('iteration started\n')
self.update_eoc()
if self.inverter_available:
controller = self.choose_controller()
power = self.inverter_ac_in_power + controller.delta
self.set_inverter_power_setpoint(power)
self.set_controller_state(controller.state)
self.print_system_stats(controller) # for debug
else:
self.set_controller_state(State.NO_INVERTER_AVAILABLE)
print('inverter not available!')
print('\niteration finished\n')
def main():
print('starting ' + __file__)
with InnovEnergyController() as service:
run_on_main_loop(service.update, UPDATE_PERIOD_MS)
print(__file__ + ' has shut down')
if __name__ == '__main__':
main()

View File

@ -54,6 +54,6 @@ INNOVENERGY_PROTOCOL_VERSION = '48TL200V3'
# S3 Credentials # S3 Credentials
S3BUCKET = "5-c0436b6a-d276-4cd8-9c44-1eae86cf5d0e" S3BUCKET = "10-c0436b6a-d276-4cd8-9c44-1eae86cf5d0e"
S3KEY = "EXO6bb63d9bbe5f938a68fa444b" S3KEY = "EXOa8cc58d2e51e389fed9ccbfa"
S3SECRET = "A4-5wIjIlAqn-p0cUkQu0f9fBIrX1V5PGTBDwjsrlR8" S3SECRET = "hofDGMmSSN1OACYXHWRUGdG61mFjBxKC18sF0VpMQgY"

Binary file not shown.

View File

@ -169,6 +169,26 @@ def read_bitmap(register):
return get_value return get_value
def read_limb_string(register):
# type: (int) -> Callable[[BatteryStatus], bitmap]
def get_value(status):
# type: (BatteryStatus) -> bitmap
value = status.modbus_data[register - cfg.BASE_ADDRESS]
string1_disabled = int((value & 0b00001) != 0)
string2_disabled = int((value & 0b00010) != 0)
string3_disabled = int((value & 0b00100) != 0)
string4_disabled = int((value & 0b01000) != 0)
string5_disabled = int((value & 0b10000) != 0)
n_limb_strings = string1_disabled+string2_disabled+string3_disabled+string4_disabled+string5_disabled
if n_limb_strings>=2:
return True
else:
return False
return get_value
def return_in_list(ts): def return_in_list(ts):
return ts return ts

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,54 @@
from logging import getLogger
from python_libs.ie_utils.mixins import Disposable, RequiresMainLoop, Record
from python_libs.ie_dbus.private.dbus_daemon import DBusDaemon
from python_libs.ie_dbus.private.own_properties import OwnProperties
from python_libs.ie_dbus.private.remote_properties import RemoteProperties
from python_libs.ie_dbus.private.ve_constants import SERVICE_PREFIX
from python_libs.ie_dbus.private.settings import Settings
_log = getLogger(__name__)
# noinspection PyUnreachableCode
if False:
from typing import Union, AnyStr, NoReturn, List
def _enforce_ve_prefix(service_name_filter):
if not service_name_filter.startswith(SERVICE_PREFIX):
raise ValueError('service_name_filter must start with ' + SERVICE_PREFIX)
SESSION_BUS = 0
SYSTEM_BUS = 1
class DBusService(Record, Disposable, RequiresMainLoop):
def __init__(self, service_name=None, device_instance=1, connection_type_or_address=SYSTEM_BUS):
# type: (str, int, Union[int, AnyStr]) -> NoReturn
service_name = service_name if service_name.startswith(SERVICE_PREFIX) else SERVICE_PREFIX + service_name
self._daemon = DBusDaemon(connection_type_or_address)
self.remote_properties = RemoteProperties(self._daemon)
self.own_properties = OwnProperties(self._daemon)
self.own_properties.set('/DeviceInstance', device_instance) # must be set before request_name, sigh
self.settings = Settings(self._daemon, self.remote_properties)
self.name = service_name
if service_name is not None:
self._bus_name = self._daemon.request_name(service_name)
_log.info('service name is ' + service_name)
_log.info('id is ' + self.bus_id)
@property
def available_services(self):
# type: () -> List[unicode]
return [s.name for s in self._daemon.services]
@property
def bus_id(self):
# type: () -> unicode
return self._daemon.bus_id

View File

@ -0,0 +1,22 @@
from logging import getLogger
from python_libs.ie_utils.mixins import Record
_log = getLogger(__name__)
# noinspection PyUnreachableCode
if False:
from typing import AnyStr
class ServiceInfo(Record):
# noinspection PyShadowingBuiltins
def __init__(self, name, id, pid, proc_name, cmd):
# type: (AnyStr, AnyStr, int, str, str) -> ServiceInfo
self.proc_name = proc_name
self.name = name
self.id = id
self.cmd = cmd
self.pid = pid

View File

@ -0,0 +1,185 @@
from logging import getLogger
from _dbus_bindings import Connection, MethodCallMessage, SignalMessage, BUS_DAEMON_NAME, \
BUS_DAEMON_PATH, BUS_DAEMON_IFACE, NAME_FLAG_DO_NOT_QUEUE, Message, HANDLER_RESULT_HANDLED
from python_libs.ie_dbus.private.dbus_types import dbus_string, dbus_uint32
from python_libs.ie_dbus.private.message_types import DBusException
from python_libs.ie_utils.mixins import Disposable
_log = getLogger(__name__)
# noinspection PyUnreachableCode
if False:
from typing import List, Optional, Iterable, Callable, Union, NoReturn, AnyStr, Any
from python_libs.ie_dbus.private.dbus_types import DbusType
class DbusConnection(Disposable):
"""
A collection of stateless functions operating on a Connection object
"""
def __init__(self, connection_type_or_address):
# type: (Union[int, AnyStr]) -> NoReturn
self._address = connection_type_or_address
# noinspection PyProtectedMember
self._connection = Connection._new_for_bus(connection_type_or_address) # it's not disposable
self.chain_disposable(self._connection.close, 'connection ' + self._connection.get_unique_name())
@property
def bus_id(self):
return self._connection.get_unique_name()
def fork(self):
return DbusConnection(self._address)
def get_ids_and_service_names(self):
# type: () -> Iterable[unicode]
# noinspection PyTypeChecker
return map(unicode, self.call_daemon_method('ListNames')[0])
def get_service_names(self):
# type: () -> Iterable[AnyStr]
return (
unicode(name)
for name
in self.get_ids_and_service_names()
if not name.startswith(':')
)
def get_service_ids(self):
# type: () -> Iterable[AnyStr]
return (
name
for name in self.get_ids_and_service_names() if name.startswith(':'))
# noinspection PyBroadException
def get_pid_of_service(self, service_name):
# type: (AnyStr) -> Optional[int]
try:
reply = self.call_daemon_method('GetConnectionUnixProcessID', dbus_string(service_name))
return int(reply[0])
except:
return None
def get_id_of_service(self, service_name):
# type: (AnyStr) -> AnyStr
reply = self.call_daemon_method('GetNameOwner', dbus_string(service_name))
return unicode(reply[0])
def call_method(self, service_name, object_path, interface, member, *args):
# type: (AnyStr, AnyStr, Optional[str], str, List[Any]) -> List[Any]
msg = MethodCallMessage(service_name, object_path, interface, member)
for arg in args:
msg.append(arg)
reply = self._connection.send_message_with_reply_and_block(msg) # with py3 we could use asyncio here
DBusException.raise_if_error_reply(reply)
return reply.get_args_list() # TODO: utf8_strings=True ?
def send_message(self, msg):
# type: (Message) -> NoReturn
self._connection.send_message(msg)
def call_daemon_method(self, method_name, *args):
# type: (AnyStr, Iterable[DbusType])-> List[any]
return self.call_method(BUS_DAEMON_NAME, BUS_DAEMON_PATH, BUS_DAEMON_IFACE, method_name, *args)
def request_name(self, service_name):
# type: (AnyStr) -> Disposable
_log.debug('requesting bus name ' + service_name)
self.call_daemon_method('RequestName', dbus_string(service_name), dbus_uint32(NAME_FLAG_DO_NOT_QUEUE))
def dispose():
self.call_daemon_method('ReleaseName', dbus_string(service_name))
return self.create_dependent_disposable(dispose, 'bus name ' + service_name)
def broadcast_signal(self, object_path, interface, member, *args):
# type: (AnyStr, AnyStr, AnyStr, List[Any]) -> NoReturn
msg = SignalMessage(object_path, interface, member)
for arg in args:
msg.append(arg)
self._connection.send_message(msg)
def add_message_callback(self, callback, filter_rule, fork=True):
# type: (Callable[[Message], NoReturn], AnyStr, Optional[bool]) -> Disposable
if fork:
return self._add_message_callback_fork(callback, filter_rule)
else:
return self._add_message_callback_no_fork(callback, filter_rule)
def _add_message_callback_no_fork(self, callback, filter_rule): # TODO: forking for incoming method calls
# type: (Callable[[Message], NoReturn], AnyStr) -> Disposable
def dispatch(_, msg):
# type: (Connection, Message) -> int
#_log.info(' ####### got message type=' + str(msg.get_type()) + ' ' + msg.get_path() + '/' + msg.get_member())
callback(msg)
#_log.debug('DONE')
return HANDLER_RESULT_HANDLED
msg_filter = self._add_message_filter(dispatch)
match = self._add_match(filter_rule)
def dispose():
match.dispose()
msg_filter.dispose()
return self.create_dependent_disposable(dispose)
def _add_message_callback_fork(self, callback, filter_rule):
# type: (Callable[[Message], NoReturn], AnyStr) -> Disposable
forked = self.fork()
_log.debug('forked connection ' + forked.bus_id)
def dispatch(_, msg):
# type: (Connection, Message) -> int
# _log.debug('got message type=' + str(msg.get_type()) + ' ' + msg.get_path() + '/' + msg.get_member())
callback(msg)
return HANDLER_RESULT_HANDLED
forked._add_message_filter(dispatch)
forked._add_match(filter_rule)
return self.create_dependent_disposable(forked)
def _add_message_filter(self, callback):
# type: (Callable[[Connection, Message], int]) -> Disposable
_log.debug('added filter on ' + self.bus_id)
self._connection.add_message_filter(callback)
def dispose():
self._connection.remove_message_filter(callback)
return self.create_dependent_disposable(dispose, 'message filter on ' + self.bus_id)
def _add_match(self, filter_rule):
# type: (AnyStr) -> Disposable
self.call_daemon_method('AddMatch', dbus_string(filter_rule))
_log.debug('added match_rule: ' + filter_rule)
def dispose():
self.call_daemon_method('RemoveMatch', dbus_string(filter_rule))
return self.create_dependent_disposable(dispose, 'Match ' + filter_rule)

View File

@ -0,0 +1,273 @@
from logging import getLogger
from _dbus_bindings import Message, ErrorMessage, BUS_DAEMON_NAME, BUS_DAEMON_PATH, BUS_DAEMON_IFACE
from python_libs.ie_dbus.private.datatypes import ServiceInfo
from python_libs.ie_dbus.private.dbus_connection import DbusConnection
from python_libs.ie_dbus.private.message_types import MatchedMessage, MessageFilter, ResolvedMessage
from python_libs.ie_utils.mixins import Disposable, RequiresMainLoop
_log = getLogger(__name__)
NONE = '<none>'
# noinspection PyUnreachableCode
if False:
from typing import Callable, List, Optional, Iterable, Union, AnyStr, NoReturn, Any, Dict
from python_libs.ie_dbus.private.dbus_types import DbusType
class DBusDaemon(Disposable, RequiresMainLoop):
_services = None # type: Dict[str, ServiceInfo]
def __init__(self, connection_type_or_address):
# type: (Union[int, AnyStr]) -> NoReturn
self._dbus = DbusConnection(connection_type_or_address)
# self._dbus.add_message_callback(lambda _: None, 'type=method_call', fork=False) # sink method calls, TODO
self._name_changed = self.subscribe_to_signal_message(
self._on_name_owner_changed,
sender_id=BUS_DAEMON_NAME,
object_path=BUS_DAEMON_PATH,
interface=BUS_DAEMON_IFACE,
member='NameOwnerChanged')
self._services = self._init_services()
@property
def bus_id(self):
# type: () -> AnyStr
return self._dbus.bus_id
@property
def services(self):
# type: () -> Iterable[ServiceInfo]
return self._services.itervalues()
def subscribe_to_signal_message(
self,
callback,
sender_id='*',
sender_name='*',
object_path='*',
interface='*',
member='*',
signature='*'):
# type: (Callable[[MatchedMessage], None], Optional[AnyStr], Optional[AnyStr], Optional[AnyStr], Optional[AnyStr], Optional[AnyStr], Optional[AnyStr]) -> Disposable
message_filter = MessageFilter(
message_type='signal',
sender_id=sender_id,
sender_name=sender_name,
object_path=object_path,
interface=interface,
member=member,
signature=signature)
def dispatch(msg):
# type: (Message) -> NoReturn
resolved_msg = self._resolve_message(msg)
matched = message_filter.match_message(resolved_msg)
if matched is not None:
callback(matched)
return self._dbus.add_message_callback(dispatch, message_filter.filter_rule)
def subscribe_to_method_call_message(
self,
callback,
sender_id='*',
sender_name='*',
object_path='*',
interface='*',
member='*',
signature='*',
destination_id='*',
destination_name='*'):
# type: (Callable[[MatchedMessage], Any], Optional[AnyStr], Optional[AnyStr], Optional[AnyStr], Optional[AnyStr], Optional[AnyStr], Optional[AnyStr], Optional[AnyStr], Optional[bool]) -> Disposable
message_filter = MessageFilter(
message_type='method_call',
sender_id=sender_id,
sender_name=sender_name,
object_path=object_path,
interface=interface,
member=member,
signature=signature,
destination_id=destination_id,
destination_name=destination_name) # TODO: eavesdrop logic
def dispatch(msg):
# type: (Message) -> NoReturn
if msg.get_type() != 1:
return
resolved_msg = self._resolve_message(msg)
matched = message_filter.match_message(resolved_msg)
if matched is None:
reply = ErrorMessage(msg, 'com.victronenergy.method_call_refused', 'refused')
else:
try:
result = callback(matched)
except Exception as e:
# _log.debug('method_call threw an exception ' + str(e))
# traceback.print_exc()
reply = matched.create_error_reply(e)
else:
reply = matched.create_method_reply(result)
self._dbus.send_message(reply)
return self._dbus.add_message_callback(dispatch, message_filter.filter_rule, fork=False)
def request_name(self, service_name):
# type: (AnyStr) -> Disposable
return self._dbus.request_name(service_name)
def call_method(self, service_name, object_path, interface, member, *args):
# type: (AnyStr, AnyStr, AnyStr, AnyStr, Iterable[DbusType]) -> List[Any]
return self._dbus.call_method(service_name, object_path, interface, member, *args)
def broadcast_signal(self, object_path, interface, member, *args):
# type: (AnyStr, AnyStr, AnyStr, List[DbusType]) -> NoReturn
self._dbus.broadcast_signal(object_path, interface, member, *args)
def get_service_names_of_id(self, service_id):
# type: (str) -> List[AnyStr]
if service_id is None:
return []
return [
s.name
for s in self.services
if s.id == service_id
]
def get_id_for_service_name(self, service_name):
# type: (AnyStr) -> Optional[AnyStr]
return next((s.id for s in self.services if s.name == service_name), None)
def exists_service_with_name(self, service_name):
# type: (AnyStr) -> bool
return self.get_id_for_service_name(service_name) is not None
def _resolve_message(self, msg):
# type: (Message) -> ResolvedMessage
sender_id, sender_names = self._resolve_name(msg.get_sender())
destination_id, destination_names = self._resolve_name(msg.get_destination())
return ResolvedMessage(msg, sender_id, sender_names, destination_id, destination_names)
# noinspection PyShadowingBuiltins
def _resolve_name(self, name):
# type: (str) -> (str, List[str])
if name is None:
id = NONE
names = []
elif name.startswith(':'):
id = name
names = self.get_service_names_of_id(name)
else:
id = self.get_id_for_service_name(name)
names = [name]
return id, names
def _on_name_owner_changed(self, msg):
# type: (MatchedMessage) -> NoReturn
(name, old_id, new_id) = msg.arguments
old_id = old_id.strip()
new_id = new_id.strip()
name = name.strip()
if name.startswith(':'):
name = None
added = old_id == '' and new_id != ''
changed = old_id != '' and new_id != ''
removed = old_id != '' and new_id == ''
# 'changed' is dispatched as 'removed' followed by 'added'
if removed or changed:
self._services.pop(old_id, None)
if added or changed:
service = self._create_service(name, new_id)
self._services[new_id] = service
# noinspection PyShadowingBuiltins
def _init_services(self):
# type: () -> Dict[str, ServiceInfo]
services = dict()
names_and_ids = self._dbus.get_ids_and_service_names()
ids = set([i for i in names_and_ids if i.startswith(':')])
names = [n for n in names_and_ids if not n.startswith(':')]
for service_name in names:
service = self._create_service(service_name)
services[service.id] = service
ids.discard(service.id)
self._services = services # UGLY, because _create_service below references it.
for id in ids:
services[id] = self._create_service(id=id)
return services
def _search_service_name_by_pid(self, pid):
# type: (int) -> Optional[AnyStr]
return next((s.name for s in self.services if s.pid == pid and s.name != NONE), NONE)
# noinspection PyShadowingBuiltins
def _create_service(self, name=None, id=None):
# type: (Optional[AnyStr], Optional[AnyStr]) -> ServiceInfo
id = id or self._dbus.get_id_of_service(name)
pid = self._dbus.get_pid_of_service(id)
proc = self._get_process_name_of_pid(pid)
cmd = self._get_commandline_of_pid(pid)
name = name or self._search_service_name_by_pid(pid)
return ServiceInfo(name, id, pid, proc, cmd)
# noinspection PyBroadException
@staticmethod
def _get_process_name_of_pid(service_pid):
# type: (int) -> str
try:
with open('/proc/{0}/comm'.format(service_pid)) as proc:
return proc.read().replace('\0', ' ').rstrip()
except Exception as _:
return '<unknown>'
# noinspection PyBroadException
@staticmethod
def _get_commandline_of_pid(service_pid):
# type: (int) -> str
try:
with open('/proc/{0}/cmdline'.format(service_pid)) as proc:
return proc.read().replace('\0', ' ').rstrip()
except Exception as _:
return '<unknown>'

View File

@ -0,0 +1,259 @@
from fnmatch import fnmatch as glob
from logging import getLogger
from _dbus_bindings import ErrorMessage, Message, MethodReturnMessage
from python_libs.ie_utils.mixins import Record
_log = getLogger(__name__)
# noinspection PyUnreachableCode
if False:
from typing import List, Optional, Iterable, AnyStr, NoReturn, Any
class MessageType(object):
invalid = 0
method_call = 1
method_return = 2
error = 3
signal = 4
@staticmethod
def parse(message_type):
# type: (int) -> str
if message_type == 1:
return 'method_call'
if message_type == 2:
return 'method_return'
if message_type == 3:
return 'error'
if message_type == 4:
return 'signal'
return 'invalid'
class DBusMessage(Record):
def __init__(self, msg, sender_id, destination_id):
# type: (Message, str, str) -> NoReturn
self.sender_id = sender_id
self.destination_id = destination_id
self._msg = msg
@property
def expects_reply(self):
# type: () -> bool
return not self._msg.get_no_reply()
@property
def message_type(self):
# type: () -> int
return int(self._msg.get_type())
@property
def reply_serial(self):
# type: () -> int
return int(self._msg.get_reply_serial())
@property
def object_path(self):
# type: () -> str
return str(self._msg.get_path())
@property
def interface(self):
# type: () -> str
return str(self._msg.get_interface())
@property
def arguments(self):
# type: () -> List[Any]
return self._msg.get_args_list(utf8_strings=True)
@property
def signature(self):
# type: () -> str
return str(self._msg.get_signature())
@property
def serial(self):
# type: () -> int
return int(self._msg.get_serial())
@property
def member(self):
# type: () -> str
return str(self._msg.get_member())
def create_method_reply(self, *args):
# type: (List[any]) -> MethodReturnMessage
if self.message_type != MessageType.method_call:
raise Exception('cannot create a reply for a message that is not a method call')
reply = MethodReturnMessage(self._msg)
for arg in args:
reply.append(arg)
return reply
def create_error_reply(self, exception):
# type: (Exception) -> ErrorMessage
if self.message_type != MessageType.method_call:
raise Exception('cannot create an error reply for a message that is not a method call')
return ErrorMessage(self._msg, 'com.victronenergy.' + exception.__class__.__name__, exception.message) # TODO prefix
class ResolvedMessage(DBusMessage):
def __init__(self, msg, sender_id, sender_names, destination_id, destination_names):
# type: (Message, str, List[str], str, List[str]) -> NoReturn
super(ResolvedMessage, self).__init__(msg, sender_id, destination_id)
self.sender_names = sender_names
self.destination_names = destination_names
class MatchedMessage(DBusMessage):
def __init__(self, resolved_msg, sender_name, destination_name):
# type: (ResolvedMessage, str, str) -> NoReturn
super(MatchedMessage, self).__init__(resolved_msg._msg, resolved_msg.sender_id, resolved_msg.destination_id)
self.sender_name = sender_name
self.destination_name = destination_name
class MessageFilter(Record):
def __init__(
self,
message_type='*',
sender_id='*',
sender_name='*',
object_path='*',
interface='*',
member='*',
signature='*',
destination_id='*',
destination_name='*',
eavesdrop=False):
# type: (Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[bool]) -> NoReturn
self.signature = signature
self.message_type = message_type
self.member = member
self.interface = interface
self.object_path = object_path
self.sender_id = sender_id
self.sender_name = sender_name
self.destination_id = destination_id
self.destination_name = destination_name
self.eavesdrop = eavesdrop
@staticmethod
def create_filter_rule(
message_type='*',
sender_id='*',
sender_name='*',
object_path='*',
interface='*',
member='*',
destination_id='*',
eavesdrop=False):
# type: (Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],Optional[AnyStr],bool) -> AnyStr
rules = []
def rule(key, value):
if '*' not in value and '?' not in value:
rules.append("%s='%s'" % (key, value))
rule('type', message_type)
rule('sender', sender_id if sender_name == '*' and sender_id != '*' else sender_name)
rule('destination', destination_id)
rule('eavesdrop', 'true' if eavesdrop else 'false')
rule('path', object_path) # TODO: endswith *, object namespace
rule('interface', interface)
rule('member', member)
return ','.join(rules)
@property
def filter_rule(self):
# type: () -> AnyStr
return self.create_filter_rule(
message_type=self.message_type,
sender_id=self.sender_id,
sender_name=self.sender_name,
object_path=self.object_path,
interface=self.interface,
member=self.member,
destination_id=self.destination_id,
eavesdrop=self.eavesdrop)
@staticmethod
def _get_matching_name(names, name_filter):
# type: (Iterable[AnyStr], AnyStr) -> Optional[AnyStr]
matching_names = (
name
for name
in names
if glob(name, name_filter)
)
return next(matching_names, None)
def match_message(self, msg):
# type: (ResolvedMessage) -> Optional[MatchedMessage]
match = \
glob(msg.object_path, self.object_path) and \
glob(msg.interface or '<none>', self.interface) and \
glob(msg.member, self.member) and \
glob(msg.signature, self.signature) and \
glob(msg.sender_id, self.sender_id) and \
glob(msg.destination_id or '<none>', self.destination_id)
if not match:
return None
sender_name = self._get_matching_name(msg.sender_names, self.sender_name)
if sender_name is None and self.sender_name != '*': # sender might not have a well known name
return None
destination_name = self._get_matching_name(msg.destination_names, self.destination_name)
if destination_name is None and self.destination_name != '*':
return None
return MatchedMessage(msg, sender_name, destination_name)
class DBusException(Exception):
def __init__(self, message):
super(Exception, self).__init__(message)
@classmethod
def raise_if_error_reply(cls, reply):
# type: (Message) -> Message
if isinstance(reply, ErrorMessage):
raise DBusException(reply.get_error_name())
else:
return reply

View File

@ -0,0 +1,177 @@
from logging import getLogger
import dbus
from python_libs.ie_dbus.private.dbus_types import dbus_variant, dbus_string
from python_libs.ie_dbus.private.dbus_daemon import DBusDaemon
from python_libs.ie_dbus.private.message_types import MatchedMessage
from python_libs.ie_dbus.private.ve_constants import GET_TEXT, INTERFACE_BUS_ITEM, PROPERTIES_CHANGED, GET_VALUE, SET_VALUE
from python_libs.ie_utils.mixins import Disposable, Record
_log = getLogger(__name__)
# noinspection PyUnreachableCode
if False:
from typing import Optional, AnyStr, NoReturn, Dict, Any
from python_libs.ie_dbus.private.dbus_types import DbusVariant, DbusString, DbusVariantDict, DbusType
class OwnProperty(Record):
def __init__(self, value, unit='', writable=False):
str_value = round(value, 2) if isinstance(value, float) else value
self.text = unicode(str_value) + unit
self.value = value
self.unit = unit
self.writable = writable
@property
def dbus_dict(self):
# type: () -> dbus.Dictionary
d = {
dbus.String('Text'): dbus_variant(self.text),
dbus.String('Value'): dbus_variant(self.value)
}
return dbus.Dictionary(d, signature='sv')
@property
def dbus_value(self):
# type: () -> DbusVariant
return dbus_variant(self.value)
@property
def dbus_text(self):
# type: () -> DbusString
return dbus_string(self.text)
def update_value(self, value):
# type: (any) -> OwnProperty
return OwnProperty(value, self.unit, self.writable)
def __iter__(self):
yield self.value
yield self.text
class OwnProperties(Disposable):
_own_properties = None # type: Dict[AnyStr, OwnProperty]
# noinspection PyProtectedMember
def __init__(self, daemon):
# type: (DBusDaemon) -> NoReturn
self._daemon = daemon
self._own_properties = dict()
self._method_call_subs = self._daemon.subscribe_to_method_call_message(self._on_method_called) # no filter whatsoever
def get(self, object_path):
# type: (AnyStr) -> OwnProperty
return self._own_properties[object_path]
def set(self, object_path, value, unit='', writable=False):
# type: (AnyStr, any, Optional[AnyStr], Optional[bool]) -> bool
prop = OwnProperty(value, unit, writable)
if object_path in self._own_properties:
if self._own_properties[object_path] == prop:
return False
self._own_properties[object_path] = prop
# object_path, interface, member, *args):
self._daemon.broadcast_signal(
object_path,
INTERFACE_BUS_ITEM,
PROPERTIES_CHANGED,
prop.dbus_dict)
return True
def _on_method_called(self, message):
# type: (MatchedMessage) -> Any
# _log.info(str(message.sender_name) + '(' + str(message.sender_id) + ') asked ' + message.member + ' ' + message.object_path)
if message.member == GET_VALUE:
return self._on_get_value_called(message)
elif message.member == GET_TEXT:
return self._on_get_text_called(message)
elif message.member == SET_VALUE:
return self._on_set_value_called(message)
def _on_set_value_called(self, message):
# type: (MatchedMessage) -> bool
path = message.object_path
if path not in self._own_properties:
raise Exception('property ' + path + ' does not exist')
prop = self._own_properties[path]
if not prop.writable:
raise Exception('property ' + path + ' is read-only')
value = message.arguments[0]
if prop.value == value:
return False
prop = prop.update_value(value)
self._own_properties[path] = prop
# object_path, interface, member, *args):
self._daemon.broadcast_signal(
path,
INTERFACE_BUS_ITEM,
PROPERTIES_CHANGED,
prop.dbus_dict)
return True
def _on_get_value_called(self, message):
# type: (MatchedMessage) -> DbusType
path = message.object_path
if path in self._own_properties:
return self._own_properties[path].dbus_value
if path.endswith('/'): # "Tree Export"
values = {
dbus.String(k.lstrip('/')): dbus_variant(p.value)
for (k, p)
in self._own_properties.iteritems()
if k.startswith(path)
}
return dbus.Dictionary(values, signature='sv', variant_level=1) # variant for tree export !!
raise Exception('property ' + path + ' does not exist')
def _on_get_text_called(self, message):
# type: (MatchedMessage) -> DbusType
path = message.object_path
if path in self._own_properties:
return self._own_properties[message.object_path].dbus_text
if path.endswith('/'): # "Tree Export"
values = {
dbus.String(k.lstrip('/')): dbus.String(p.text)
for (k, p)
in self._own_properties.iteritems()
if k.startswith(path)
}
return dbus.Dictionary(values, signature='ss', variant_level=1) # variant for tree export !!
raise Exception('property ' + path + ' does not exist')
def __contains__(self, object_path):
# type: (AnyStr) -> bool
return object_path in self._own_properties

View File

@ -0,0 +1,166 @@
from logging import getLogger
from python_libs.ie_dbus.private.dbus_types import dbus_variant
from python_libs.ie_utils.mixins import Disposable, Record
from python_libs.ie_dbus.private.dbus_daemon import DBusDaemon
from python_libs.ie_dbus.private.message_types import MatchedMessage
from python_libs.ie_dbus.private.ve_constants import GET_TEXT, INTERFACE_BUS_ITEM, PROPERTIES_CHANGED, GET_VALUE, SERVICE_PREFIX, SET_VALUE
_log = getLogger(__name__)
_UNKNOWN_TEXT = '<UNKNOWN_TEXT>'
# noinspection PyUnreachableCode
if False:
from typing import List, AnyStr, NoReturn, Dict, Any
class RemoteProperty(Record):
def __init__(self, value, text):
self.text = text
self.value = value
@staticmethod
def from_dbus_dict(dbus_dict):
value = dbus_dict['Value']
text = dbus_dict['Text']
return RemoteProperty(value, text)
class RemoteProperties(Disposable):
_remote_properties = None # type: Dict[AnyStr, RemoteProperty]
def __init__(self, daemon):
# type: (DBusDaemon) -> NoReturn
self._daemon = daemon
self._remote_properties = dict()
# noinspection PyBroadException
def available_properties(self, service_name):
# type: (unicode) -> List[unicode]
if not self._daemon.exists_service_with_name(service_name):
return []
try:
paths = self._call_remote(service_name=service_name, object_path='/', member=GET_TEXT)[0].keys()
except Exception as _:
return []
else:
return ['/' + str(path) for path in paths]
def exists(self, combined_path):
# type: (AnyStr) -> bool
service_name, object_path, combined_path = self._parse_combined_path(combined_path)
return object_path in self.available_properties(service_name)
def get(self, combined_path):
# type: (AnyStr) -> RemoteProperty
service_name, object_path, combined_path = self._parse_combined_path(combined_path)
if combined_path in self._remote_properties:
cached = self._remote_properties[combined_path]
# a cached prop might have an unknown text, because its value has been written before,
# but it has never read or updated via property-changed
if cached.text != _UNKNOWN_TEXT:
return cached
text = self._get_text(service_name, object_path)
self._remote_properties[combined_path] = RemoteProperty(cached.value, text)
return self._remote_properties[combined_path]
prop = self._get_property(service_name, object_path)
self._remote_properties[combined_path] = prop
self._subscribe_to_property_changed(service_name, object_path)
return prop
def set(self, combined_path, value):
# type: (AnyStr, any) -> bool
service_name, object_path, combined_path = self._parse_combined_path(combined_path)
if combined_path in self._remote_properties:
if self._remote_properties[combined_path].value == value:
return False # property already has the requested value => nothing to do
else:
self._subscribe_to_property_changed(service_name, object_path)
result = self._call_remote(service_name, object_path, SET_VALUE, dbus_variant(value))[0]
if result != 0:
raise Exception(service_name + ' refused to set value of ' + object_path + ' to ' + str(value))
self._remote_properties[combined_path] = RemoteProperty(value, _UNKNOWN_TEXT)
return True
def _subscribe_to_property_changed(self, service_name, object_path):
# type: (unicode, unicode) -> NoReturn
def callback(msg):
# type: (MatchedMessage) -> NoReturn
prop = RemoteProperty.from_dbus_dict(msg.arguments[0])
key = msg.sender_name+msg.object_path
self._remote_properties[key] = prop
signal = self._daemon.subscribe_to_signal_message(
callback=callback,
sender_name=service_name,
object_path=object_path,
interface=INTERFACE_BUS_ITEM, # TODO: <- this could be removed to make it more robust, in theory
member=PROPERTIES_CHANGED) # TODO: OTOH, don't fix if it is not broken
self.chain_disposable(signal, 'signal subscription on ' + self._daemon.bus_id + ' ' + service_name + object_path)
def _get_value(self, service_name, object_path):
# type: (unicode, unicode) -> any
return self._call_remote(service_name, object_path, GET_VALUE)[0]
def _get_text(self, service_name, object_path):
# type: (unicode, unicode) -> unicode
result = self._call_remote(service_name, object_path, GET_TEXT)[0]
return unicode(result)
def _get_property(self, service_name, object_path):
# type: (unicode, unicode) -> RemoteProperty
value = self._get_value(service_name, object_path)
text = self._get_text(service_name, object_path)
return RemoteProperty(value, text)
def _call_remote(self, service_name, object_path, member, *args):
# type: (unicode, unicode, unicode, List[Any]) -> List[Any]
return self._daemon.call_method(service_name, object_path, INTERFACE_BUS_ITEM, member, *args)
def _parse_combined_path(self, combined_path):
# type: (str) -> (unicode,unicode,unicode)
service_name, object_path = combined_path.lstrip('/').split('/', 1)
if service_name == '':
raise Exception('Failed to parse service name. \ncombined_path must be of the form "service_name/path/to/property"')
if object_path == '':
raise Exception('Failed to parse object path. \ncombined_path must be of the form "service_name/path/to/property"')
service_name = service_name if service_name.startswith(SERVICE_PREFIX) else SERVICE_PREFIX + service_name
if not self._daemon.exists_service_with_name(service_name):
raise Exception('there is no service with the name "' + service_name + '" on the bus')
object_path = '/' + object_path
return unicode(service_name), unicode(object_path), unicode(service_name + object_path)

View File

@ -0,0 +1,89 @@
from logging import getLogger
from python_libs.ie_dbus.private.dbus_types import dbus_string, dbus_int_variant, dbus_string_variant, dbus_double_variant, dbus_variant
from python_libs.ie_utils.mixins import Record
from python_libs.ie_dbus.private.dbus_daemon import DBusDaemon
from python_libs.ie_dbus.private.remote_properties import RemoteProperties
from python_libs.ie_dbus.private.ve_constants import SETTINGS_SERVICE, SETTINGS_INTERFACE, SETTINGS_PREFIX
_log = getLogger(__name__)
# noinspection PyUnreachableCode
if False:
from typing import Union, NoReturn, Optional, AnyStr
def prepend_settings_prefix(path):
# type: (AnyStr) -> any
path = '/' + path.lstrip('/')
path = path if path.startswith(SETTINGS_PREFIX) else SETTINGS_PREFIX + path
return path
class Settings(Record):
# noinspection PyProtectedMember
def __init__(self, daemon, remote_properties):
# type: (DBusDaemon, RemoteProperties) -> NoReturn
self._daemon = daemon
self._remote_properties = remote_properties
# noinspection PyShadowingBuiltins
def add_setting(self, path, default_value, min=None, max=None, silent=False):
# type: (AnyStr, Union[unicode, int, float], Union[int, float, None], Union[int, float, None], Optional[bool]) -> NoReturn
path = prepend_settings_prefix(path)
if isinstance(default_value, int):
item_type = 'i'
elif isinstance(default_value, float):
item_type = 'f'
elif isinstance(default_value, (str, unicode)):
item_type = 's'
else:
raise Exception('Unsupported Settings Type')
reply = self._daemon.call_method(
SETTINGS_SERVICE, # service_name
'/', # object_path
SETTINGS_INTERFACE, # interface
'AddSilentSetting' if silent else 'AddSetting', # member,
dbus_string(''), # "group", not used
dbus_string(path),
dbus_variant(default_value),
dbus_string(item_type),
dbus_int_variant(min or 0),
dbus_int_variant(max or 0))
if reply[0] != 0:
raise Exception('failed to add setting ' + path)
def exists(self, path):
# type: (unicode) -> bool
path = prepend_settings_prefix(path)
return path in self.available_settings
def get(self, path):
# type: (unicode) -> Union[unicode, int, float]
path = prepend_settings_prefix(path)
return self._remote_properties.get(SETTINGS_SERVICE + path).value
def set(self, path, value):
# type: (unicode, Union[unicode, int, float]) -> NoReturn
path = prepend_settings_prefix(path)
self._remote_properties.set(SETTINGS_SERVICE + path, value)
@property
def available_settings(self):
# type: () -> [unicode]
return self._remote_properties.available_properties(SETTINGS_SERVICE)
def __contains__(self, path):
# type: (unicode) -> bool
return self.exists(path)

View File

@ -0,0 +1,11 @@
SERVICE_PREFIX = 'com.victronenergy.'
VE_SERVICE_FILTER = SERVICE_PREFIX + '*'
INTERFACE_BUS_ITEM = SERVICE_PREFIX + 'BusItem'
PROPERTIES_CHANGED = 'PropertiesChanged'
GET_VALUE = 'GetValue'
SET_VALUE = 'SetValue'
GET_TEXT = 'GetText'
SETTINGS_SERVICE = 'com.victronenergy.settings'
SETTINGS_INTERFACE = 'com.victronenergy.Settings'
SETTINGS_PREFIX = '/Settings'

View File

@ -0,0 +1,73 @@
from logging import getLogger
# noinspection PyUnreachableCode
if False:
from typing import NoReturn, Optional
_log = getLogger(__name__)
class MovingAverageFilter(object):
def __init__(self, length=30, initial_value=0):
# type: (int, float) -> NoReturn
self.value = initial_value
self.length = length
def update(self, value, length=None):
# type: (float, int) -> float
if length is not None:
self.length = length
self.value = (self.value * self.length + value) / (self.length + 1)
_log.debug('real value: ' + str(value) + ', filtered value: ' + str(self.value))
return self.value
class DebounceFilter(object):
def __init__(self, initial_state=None, max_inertia=10):
# type: (Optional[bool], Optional[int]) -> NoReturn
self._max_inertia = max_inertia
self._inertia = max_inertia
self._state = initial_state
def reset(self, state=None, max_inertia=None):
# type: (Optional[bool], Optional[int]) -> bool
self._max_inertia = max_inertia or self._max_inertia
self._inertia = self._max_inertia
self._state = state or self._state
_log.debug('debounce filter reset: state={0}, inertia={1}'.format(self._state, self._inertia))
return self._state
def flip(self):
# type: () -> bool
self._state = not self._state
self._inertia = self._max_inertia
return self._state
def update(self, new_state, max_inertia=None):
# type: (bool, int) -> bool
if max_inertia is not None and max_inertia != self._max_inertia:
return self.reset(new_state, max_inertia)
if new_state != self._state:
if self._inertia > 0:
self._inertia = self._inertia - 1
else:
self.flip()
else:
self._inertia = min(self._inertia + 1, self._max_inertia)
_log.debug('debounce filter update: state={0}, inertia={1}'.format(self._state, self._inertia))
return self._state

View File

@ -0,0 +1,30 @@
from logging import getLogger
import traceback
import gobject
# noinspection PyUnreachableCode
if False:
from typing import Callable, NoReturn
_log = getLogger(__name__)
def run_on_main_loop(update_action, update_period):
# type: (Callable[[],NoReturn], int) -> NoReturn
main_loop = gobject.MainLoop()
def update(*args, **kwargs):
try:
update_action()
return True
except Exception as e:
_log.error(e.message)
traceback.print_exc()
main_loop.quit()
return False
gobject.timeout_add(update_period, update)
main_loop.run()

View File

@ -0,0 +1,115 @@
from logging import getLogger
from _dbus_glib_bindings import DBusGMainLoop
# noinspection PyUnreachableCode
if False:
from typing import Callable, NoReturn, List, AnyStr, Optional, Union
_log = getLogger(__name__)
def nop(*_args):
pass
def memoize(fn):
attr_name = '_memoized_' + fn.__name__
def _memoized(self):
if not hasattr(self, attr_name):
setattr(self, attr_name, fn(self))
return getattr(self, attr_name)
return _memoized
# noinspection PyAttributeOutsideInit
class Disposable(object):
_dispose_actions = None # type: List[Callable[[],NoReturn]]
def __enter__(self):
return self
def __exit__(self, typ, value, tb):
self.dispose()
def dispose(self):
# type: () -> NoReturn
while self._dispose_actions:
dispose = self._dispose_actions.pop()
dispose()
for k, v in self.__dict__.iteritems():
if isinstance(v, Disposable) and v._dispose_actions:
_log.debug('disposing ' + type(self).__name__ + '.' + k)
v.dispose()
def chain_disposable(self, dispose, message=None):
# type: (Union[Callable[[],None],Disposable], Optional[AnyStr]) -> NoReturn
if self._dispose_actions is None:
self._dispose_actions = []
if isinstance(dispose, Disposable):
dispose = dispose.dispose
if message is None:
self._dispose_actions.append(dispose)
return
def dispose_with_log_msg():
_log.debug('disposing ' + message)
dispose()
# _log.debug('new disposable ' + message)
self._dispose_actions.append(dispose_with_log_msg)
@classmethod
def create(cls, dispose_action, message=None):
# type: (Union[Callable[[],None],Disposable], Optional[AnyStr]) -> Disposable
disposable = Disposable()
disposable.chain_disposable(dispose_action, message)
return disposable
def create_dependent_disposable(self, dispose_action, message=None):
# type: (Union[Callable[[],None],Disposable], Optional[AnyStr]) -> Disposable
disposable = Disposable.create(dispose_action, message)
self.chain_disposable(disposable)
return disposable
class Record(object):
@memoize
def __str__(self):
return self.__class__.__name__ + ' ' + unicode(vars(self))
def __repr__(self):
return self.__str__()
@memoize
def __hash__(self):
return self.__str__().__hash__()
def __eq__(self, other):
# TODO: improve, iterable vars are not correctly handled
return str(other) == str(self)
# make readonly
def __setattr__(self, key, value):
# type: (str, any) -> NoReturn
if not key.startswith('_') and hasattr(self, key): # disallow redefining
raise ValueError(key + ' is read-only' + str(dir()))
super(Record, self).__setattr__(key, value)
class RequiresMainLoop(object):
main_loop = DBusGMainLoop(set_as_default=True) # initialized only once for all subclasses that need it

View File

@ -0,0 +1,44 @@
from logging import getLogger
import re
# noinspection PyUnreachableCode
if False:
from typing import Dict
_log = getLogger(__name__)
def make2way(dic):
# type: (Dict) -> Dict
for k, v in dic.items():
dic[v] = k
return dic
def invert_dict(src_dic):
# type: (Dict) -> Dict
dic = dict()
for k, v in src_dic.items():
dic[v] = k
return dic
def enum_file_name_of(path):
# type: (str) -> Dict[int,str]
"""
This is kinda hacky, but it works :)
The enum file must contain a single enum however!
"""
path = path[0:-1] if path.endswith('.pyc') else path
pattern = re.compile(r"^\s*(\w+)\s*=\s*(\d+)", re.M)
with open(path, "r") as f:
return {
int(m[1]): m[0]
for m
in pattern.findall(f.read())
}

View File

@ -0,0 +1,30 @@
# Copyright 2019 Ram Rachum and collaborators.
# This program is distributed under the MIT license.
'''
PySnooper - Never use print for debugging again
Usage:
import pysnooper
@pysnooper.snoop()
def your_function(x):
...
A log will be written to stderr showing the lines executed and variables
changed in the decorated function.
For more information, see https://github.com/cool-RR/PySnooper
'''
from .tracer import Tracer as snoop
from .variables import Attrs, Exploding, Indices, Keys
import collections
__VersionInfo = collections.namedtuple('VersionInfo',
('major', 'minor', 'micro'))
__version__ = '0.4.0'
__version_info__ = __VersionInfo(*(map(int, __version__.split('.'))))
del collections, __VersionInfo # Avoid polluting the namespace

View File

@ -0,0 +1,95 @@
# Copyright 2019 Ram Rachum and collaborators.
# This program is distributed under the MIT license.
'''Python 2/3 compatibility'''
import abc
import os
import inspect
import sys
import datetime as datetime_module
PY3 = (sys.version_info[0] == 3)
PY2 = not PY3
if hasattr(abc, 'ABC'):
ABC = abc.ABC
else:
class ABC(object):
"""Helper class that provides a standard way to create an ABC using
inheritance.
"""
__metaclass__ = abc.ABCMeta
__slots__ = ()
if hasattr(os, 'PathLike'):
PathLike = os.PathLike
else:
class PathLike(ABC):
"""Abstract base class for implementing the file system path protocol."""
@abc.abstractmethod
def __fspath__(self):
"""Return the file system path representation of the object."""
raise NotImplementedError
@classmethod
def __subclasshook__(cls, subclass):
return (
hasattr(subclass, '__fspath__') or
# Make a concession for older `pathlib` versions:g
(hasattr(subclass, 'open') and
'path' in subclass.__name__.lower())
)
try:
iscoroutinefunction = inspect.iscoroutinefunction
except AttributeError:
iscoroutinefunction = lambda whatever: False # Lolz
try:
isasyncgenfunction = inspect.isasyncgenfunction
except AttributeError:
isasyncgenfunction = lambda whatever: False # Lolz
if PY3:
string_types = (str,)
text_type = str
else:
string_types = (basestring,)
text_type = unicode
try:
from collections import abc as collections_abc
except ImportError: # Python 2.7
import collections as collections_abc
if sys.version_info[:2] >= (3, 6):
time_isoformat = datetime_module.time.isoformat
else:
def time_isoformat(time, timespec='microseconds'):
assert isinstance(time, datetime_module.time)
if timespec != 'microseconds':
raise NotImplementedError
result = '{:02d}:{:02d}:{:02d}.{:06d}'.format(
time.hour, time.minute, time.second, time.microsecond
)
assert len(result) == 15
return result
def timedelta_format(timedelta):
time = (datetime_module.datetime.min + timedelta).time()
return time_isoformat(time, timespec='microseconds')
def timedelta_parse(s):
hours, minutes, seconds, microseconds = map(
int,
s.replace('.', ':').split(':')
)
return datetime_module.timedelta(hours=hours, minutes=minutes,
seconds=seconds,
microseconds=microseconds)

View File

@ -0,0 +1,498 @@
# Copyright 2019 Ram Rachum and collaborators.
# This program is distributed under the MIT license.
import functools
import inspect
import opcode
import os
import sys
import re
import collections
import datetime as datetime_module
import itertools
import threading
import traceback
from .variables import CommonVariable, Exploding, BaseVariable
from . import utils, pycompat
if pycompat.PY2:
from io import open
ipython_filename_pattern = re.compile('^<ipython-input-([0-9]+)-.*>$')
def get_local_reprs(frame, watch=(), custom_repr=(), max_length=None, normalize=False):
code = frame.f_code
vars_order = (code.co_varnames + code.co_cellvars + code.co_freevars +
tuple(frame.f_locals.keys()))
result_items = [(key, utils.get_shortish_repr(value, custom_repr,
max_length, normalize))
for key, value in frame.f_locals.items()]
result_items.sort(key=lambda key_value: vars_order.index(key_value[0]))
result = collections.OrderedDict(result_items)
for variable in watch:
result.update(sorted(variable.items(frame, normalize)))
return result
class UnavailableSource(object):
def __getitem__(self, i):
return u'SOURCE IS UNAVAILABLE'
source_and_path_cache = {}
def get_path_and_source_from_frame(frame):
globs = frame.f_globals or {}
module_name = globs.get('__name__')
file_name = frame.f_code.co_filename
cache_key = (module_name, file_name)
try:
return source_and_path_cache[cache_key]
except KeyError:
pass
loader = globs.get('__loader__')
source = None
if hasattr(loader, 'get_source'):
try:
source = loader.get_source(module_name)
except ImportError:
pass
if source is not None:
source = source.splitlines()
if source is None:
ipython_filename_match = ipython_filename_pattern.match(file_name)
if ipython_filename_match:
entry_number = int(ipython_filename_match.group(1))
try:
import IPython
ipython_shell = IPython.get_ipython()
((_, _, source_chunk),) = ipython_shell.history_manager. \
get_range(0, entry_number, entry_number + 1)
source = source_chunk.splitlines()
except Exception:
pass
else:
try:
with open(file_name, 'rb') as fp:
source = fp.read().splitlines()
except utils.file_reading_errors:
pass
if not source:
# We used to check `if source is None` but I found a rare bug where it
# was empty, but not `None`, so now we check `if not source`.
source = UnavailableSource()
# If we just read the source from a file, or if the loader did not
# apply tokenize.detect_encoding to decode the source into a
# string, then we should do that ourselves.
if isinstance(source[0], bytes):
encoding = 'utf-8'
for line in source[:2]:
# File coding may be specified. Match pattern from PEP-263
# (https://www.python.org/dev/peps/pep-0263/)
match = re.search(br'coding[:=]\s*([-\w.]+)', line)
if match:
encoding = match.group(1).decode('ascii')
break
source = [pycompat.text_type(sline, encoding, 'replace') for sline in
source]
result = (file_name, source)
source_and_path_cache[cache_key] = result
return result
def get_write_function(output, overwrite):
is_path = isinstance(output, (pycompat.PathLike, str))
if overwrite and not is_path:
raise Exception('`overwrite=True` can only be used when writing '
'content to file.')
if output is None:
def write(s):
stderr = sys.stderr
try:
stderr.write(s)
except UnicodeEncodeError:
# God damn Python 2
stderr.write(utils.shitcode(s))
elif is_path:
return FileWriter(output, overwrite).write
elif callable(output):
write = output
else:
assert isinstance(output, utils.WritableStream)
def write(s):
output.write(s)
return write
class FileWriter(object):
def __init__(self, path, overwrite):
self.path = pycompat.text_type(path)
self.overwrite = overwrite
def write(self, s):
with open(self.path, 'w' if self.overwrite else 'a',
encoding='utf-8') as output_file:
output_file.write(s)
self.overwrite = False
thread_global = threading.local()
DISABLED = bool(os.getenv('PYSNOOPER_DISABLED', ''))
class Tracer:
'''
Snoop on the function, writing everything it's doing to stderr.
This is useful for debugging.
When you decorate a function with `@pysnooper.snoop()`
or wrap a block of code in `with pysnooper.snoop():`, you'll get a log of
every line that ran in the function and a play-by-play of every local
variable that changed.
If stderr is not easily accessible for you, you can redirect the output to
a file::
@pysnooper.snoop('/my/log/file.log')
See values of some expressions that aren't local variables::
@pysnooper.snoop(watch=('foo.bar', 'self.x["whatever"]'))
Expand values to see all their attributes or items of lists/dictionaries:
@pysnooper.snoop(watch_explode=('foo', 'self'))
(see Advanced Usage in the README for more control)
Show snoop lines for functions that your function calls::
@pysnooper.snoop(depth=2)
Start all snoop lines with a prefix, to grep for them easily::
@pysnooper.snoop(prefix='ZZZ ')
On multi-threaded apps identify which thread are snooped in output::
@pysnooper.snoop(thread_info=True)
Customize how values are represented as strings::
@pysnooper.snoop(custom_repr=((type1, custom_repr_func1),
(condition2, custom_repr_func2), ...))
Variables and exceptions get truncated to 100 characters by default. You
can customize that:
@pysnooper.snoop(max_variable_length=200)
You can also use `max_variable_length=None` to never truncate them.
Show timestamps relative to start time rather than wall time::
@pysnooper.snoop(relative_time=True)
'''
def __init__(self, output=None, watch=(), watch_explode=(), depth=1,
prefix='', overwrite=False, thread_info=False, custom_repr=(),
max_variable_length=100, normalize=False, relative_time=False):
self._write = get_write_function(output, overwrite)
self.watch = [
v if isinstance(v, BaseVariable) else CommonVariable(v)
for v in utils.ensure_tuple(watch)
] + [
v if isinstance(v, BaseVariable) else Exploding(v)
for v in utils.ensure_tuple(watch_explode)
]
self.frame_to_local_reprs = {}
self.start_times = {}
self.depth = depth
self.prefix = prefix
self.thread_info = thread_info
self.thread_info_padding = 0
assert self.depth >= 1
self.target_codes = set()
self.target_frames = set()
self.thread_local = threading.local()
if len(custom_repr) == 2 and not all(isinstance(x,
pycompat.collections_abc.Iterable) for x in custom_repr):
custom_repr = (custom_repr,)
self.custom_repr = custom_repr
self.last_source_path = None
self.max_variable_length = max_variable_length
self.normalize = normalize
self.relative_time = relative_time
def __call__(self, function_or_class):
if DISABLED:
return function_or_class
if inspect.isclass(function_or_class):
return self._wrap_class(function_or_class)
else:
return self._wrap_function(function_or_class)
def _wrap_class(self, cls):
for attr_name, attr in cls.__dict__.items():
# Coroutines are functions, but snooping them is not supported
# at the moment
if pycompat.iscoroutinefunction(attr):
continue
if inspect.isfunction(attr):
setattr(cls, attr_name, self._wrap_function(attr))
return cls
def _wrap_function(self, function):
self.target_codes.add(function.__code__)
@functools.wraps(function)
def simple_wrapper(*args, **kwargs):
with self:
return function(*args, **kwargs)
@functools.wraps(function)
def generator_wrapper(*args, **kwargs):
gen = function(*args, **kwargs)
method, incoming = gen.send, None
while True:
with self:
try:
outgoing = method(incoming)
except StopIteration:
return
try:
method, incoming = gen.send, (yield outgoing)
except Exception as e:
method, incoming = gen.throw, e
if pycompat.iscoroutinefunction(function):
raise NotImplementedError
if pycompat.isasyncgenfunction(function):
raise NotImplementedError
elif inspect.isgeneratorfunction(function):
return generator_wrapper
else:
return simple_wrapper
def write(self, s):
s = u'{self.prefix}{s}\n'.format(**locals())
self._write(s)
def __enter__(self):
if DISABLED:
return
calling_frame = inspect.currentframe().f_back
if not self._is_internal_frame(calling_frame):
calling_frame.f_trace = self.trace
self.target_frames.add(calling_frame)
stack = self.thread_local.__dict__.setdefault(
'original_trace_functions', []
)
stack.append(sys.gettrace())
self.start_times[calling_frame] = datetime_module.datetime.now()
sys.settrace(self.trace)
def __exit__(self, exc_type, exc_value, exc_traceback):
if DISABLED:
return
stack = self.thread_local.original_trace_functions
sys.settrace(stack.pop())
calling_frame = inspect.currentframe().f_back
self.target_frames.discard(calling_frame)
self.frame_to_local_reprs.pop(calling_frame, None)
### Writing elapsed time: #############################################
# #
start_time = self.start_times.pop(calling_frame)
duration = datetime_module.datetime.now() - start_time
elapsed_time_string = pycompat.timedelta_format(duration)
indent = ' ' * 4 * (thread_global.depth + 1)
self.write(
'{indent}Elapsed time: {elapsed_time_string}'.format(**locals())
)
# #
### Finished writing elapsed time. ####################################
def _is_internal_frame(self, frame):
return frame.f_code.co_filename == Tracer.__enter__.__code__.co_filename
def set_thread_info_padding(self, thread_info):
current_thread_len = len(thread_info)
self.thread_info_padding = max(self.thread_info_padding,
current_thread_len)
return thread_info.ljust(self.thread_info_padding)
def trace(self, frame, event, arg):
### Checking whether we should trace this line: #######################
# #
# We should trace this line either if it's in the decorated function,
# or the user asked to go a few levels deeper and we're within that
# number of levels deeper.
if not (frame.f_code in self.target_codes or frame in self.target_frames):
if self.depth == 1:
# We did the most common and quickest check above, because the
# trace function runs so incredibly often, therefore it's
# crucial to hyper-optimize it for the common case.
return None
elif self._is_internal_frame(frame):
return None
else:
_frame_candidate = frame
for i in range(1, self.depth):
_frame_candidate = _frame_candidate.f_back
if _frame_candidate is None:
return None
elif _frame_candidate.f_code in self.target_codes or _frame_candidate in self.target_frames:
break
else:
return None
thread_global.__dict__.setdefault('depth', -1)
if event == 'call':
thread_global.depth += 1
indent = ' ' * 4 * thread_global.depth
# #
### Finished checking whether we should trace this line. ##############
### Making timestamp: #################################################
# #
if self.normalize:
timestamp = ' ' * 15
elif self.relative_time:
try:
start_time = self.start_times[frame]
except KeyError:
start_time = self.start_times[frame] = \
datetime_module.datetime.now()
duration = datetime_module.datetime.now() - start_time
timestamp = pycompat.timedelta_format(duration)
else:
timestamp = pycompat.time_isoformat(
datetime_module.datetime.now().time(),
timespec='microseconds'
)
# #
### Finished making timestamp. ########################################
line_no = frame.f_lineno
source_path, source = get_path_and_source_from_frame(frame)
source_path = source_path if not self.normalize else os.path.basename(source_path)
if self.last_source_path != source_path:
self.write(u'{indent}Source path:... {source_path}'.
format(**locals()))
self.last_source_path = source_path
source_line = source[line_no - 1]
thread_info = ""
if self.thread_info:
if self.normalize:
raise NotImplementedError("normalize is not supported with "
"thread_info")
current_thread = threading.current_thread()
thread_info = "{ident}-{name} ".format(
ident=current_thread.ident, name=current_thread.getName())
thread_info = self.set_thread_info_padding(thread_info)
### Reporting newish and modified variables: ##########################
# #
old_local_reprs = self.frame_to_local_reprs.get(frame, {})
self.frame_to_local_reprs[frame] = local_reprs = \
get_local_reprs(frame,
watch=self.watch, custom_repr=self.custom_repr,
max_length=self.max_variable_length,
normalize=self.normalize,
)
newish_string = ('Starting var:.. ' if event == 'call' else
'New var:....... ')
for name, value_repr in local_reprs.items():
if name not in old_local_reprs:
self.write('{indent}{newish_string}{name} = {value_repr}'.format(
**locals()))
elif old_local_reprs[name] != value_repr:
self.write('{indent}Modified var:.. {name} = {value_repr}'.format(
**locals()))
# #
### Finished newish and modified variables. ###########################
### Dealing with misplaced function definition: #######################
# #
if event == 'call' and source_line.lstrip().startswith('@'):
# If a function decorator is found, skip lines until an actual
# function definition is found.
for candidate_line_no in itertools.count(line_no):
try:
candidate_source_line = source[candidate_line_no - 1]
except IndexError:
# End of source file reached without finding a function
# definition. Fall back to original source line.
break
if candidate_source_line.lstrip().startswith('def'):
# Found the def line!
line_no = candidate_line_no
source_line = candidate_source_line
break
# #
### Finished dealing with misplaced function definition. ##############
# If a call ends due to an exception, we still get a 'return' event
# with arg = None. This seems to be the only way to tell the difference
# https://stackoverflow.com/a/12800909/2482744
code_byte = frame.f_code.co_code[frame.f_lasti]
if not isinstance(code_byte, int):
code_byte = ord(code_byte)
ended_by_exception = (
event == 'return'
and arg is None
and (opcode.opname[code_byte]
not in ('RETURN_VALUE', 'YIELD_VALUE'))
)
if ended_by_exception:
self.write('{indent}Call ended by exception'.
format(**locals()))
else:
self.write(u'{indent}{timestamp} {thread_info}{event:9} '
u'{line_no:4} {source_line}'.format(**locals()))
if event == 'return':
self.frame_to_local_reprs.pop(frame, None)
self.start_times.pop(frame, None)
thread_global.depth -= 1
if not ended_by_exception:
return_value_repr = utils.get_shortish_repr(arg,
custom_repr=self.custom_repr,
max_length=self.max_variable_length,
normalize=self.normalize,
)
self.write('{indent}Return value:.. {return_value_repr}'.
format(**locals()))
if event == 'exception':
exception = '\n'.join(traceback.format_exception_only(*arg[:2])).strip()
if self.max_variable_length:
exception = utils.truncate(exception, self.max_variable_length)
self.write('{indent}{exception}'.
format(**locals()))
return self.trace

View File

@ -0,0 +1,98 @@
# Copyright 2019 Ram Rachum and collaborators.
# This program is distributed under the MIT license.
import abc
import re
import sys
from .pycompat import ABC, string_types, collections_abc
def _check_methods(C, *methods):
mro = C.__mro__
for method in methods:
for B in mro:
if method in B.__dict__:
if B.__dict__[method] is None:
return NotImplemented
break
else:
return NotImplemented
return True
class WritableStream(ABC):
@abc.abstractmethod
def write(self, s):
pass
@classmethod
def __subclasshook__(cls, C):
if cls is WritableStream:
return _check_methods(C, 'write')
return NotImplemented
file_reading_errors = (
IOError,
OSError,
ValueError # IronPython weirdness.
)
def shitcode(s):
return ''.join(
(c if (0 < ord(c) < 256) else '?') for c in s
)
def get_repr_function(item, custom_repr):
for condition, action in custom_repr:
if isinstance(condition, type):
condition = lambda x, y=condition: isinstance(x, y)
if condition(item):
return action
return repr
DEFAULT_REPR_RE = re.compile(r' at 0x[a-f0-9A-F]{4,}')
def normalize_repr(item_repr):
"""Remove memory address (0x...) from a default python repr"""
return DEFAULT_REPR_RE.sub('', item_repr)
def get_shortish_repr(item, custom_repr=(), max_length=None, normalize=False):
repr_function = get_repr_function(item, custom_repr)
try:
r = repr_function(item)
except Exception:
r = 'REPR FAILED'
r = r.replace('\r', '').replace('\n', '')
if normalize:
r = normalize_repr(r)
if max_length:
r = truncate(r, max_length)
return r
def truncate(string, max_length):
if (max_length is None) or (len(string) <= max_length):
return string
else:
left = (max_length - 3) // 2
right = max_length - 3 - left
return u'{}...{}'.format(string[:left], string[-right:])
def ensure_tuple(x):
if isinstance(x, collections_abc.Iterable) and \
not isinstance(x, string_types):
return tuple(x)
else:
return (x,)

View File

@ -0,0 +1,133 @@
import itertools
import abc
try:
from collections.abc import Mapping, Sequence
except ImportError:
from collections import Mapping, Sequence
from copy import deepcopy
from . import utils
from . import pycompat
def needs_parentheses(source):
def code(s):
return compile(s, '<variable>', 'eval').co_code
return code('{}.x'.format(source)) != code('({}).x'.format(source))
class BaseVariable(pycompat.ABC):
def __init__(self, source, exclude=()):
self.source = source
self.exclude = utils.ensure_tuple(exclude)
self.code = compile(source, '<variable>', 'eval')
if needs_parentheses(source):
self.unambiguous_source = '({})'.format(source)
else:
self.unambiguous_source = source
def items(self, frame, normalize=False):
try:
main_value = eval(self.code, frame.f_globals or {}, frame.f_locals)
except Exception:
return ()
return self._items(main_value, normalize)
@abc.abstractmethod
def _items(self, key, normalize=False):
raise NotImplementedError
@property
def _fingerprint(self):
return (type(self), self.source, self.exclude)
def __hash__(self):
return hash(self._fingerprint)
def __eq__(self, other):
return (isinstance(other, BaseVariable) and
self._fingerprint == other._fingerprint)
class CommonVariable(BaseVariable):
def _items(self, main_value, normalize=False):
result = [(self.source, utils.get_shortish_repr(main_value, normalize=normalize))]
for key in self._safe_keys(main_value):
try:
if key in self.exclude:
continue
value = self._get_value(main_value, key)
except Exception:
continue
result.append((
'{}{}'.format(self.unambiguous_source, self._format_key(key)),
utils.get_shortish_repr(value)
))
return result
def _safe_keys(self, main_value):
try:
for key in self._keys(main_value):
yield key
except Exception:
pass
def _keys(self, main_value):
return ()
def _format_key(self, key):
raise NotImplementedError
def _get_value(self, main_value, key):
raise NotImplementedError
class Attrs(CommonVariable):
def _keys(self, main_value):
return itertools.chain(
getattr(main_value, '__dict__', ()),
getattr(main_value, '__slots__', ())
)
def _format_key(self, key):
return '.' + key
def _get_value(self, main_value, key):
return getattr(main_value, key)
class Keys(CommonVariable):
def _keys(self, main_value):
return main_value.keys()
def _format_key(self, key):
return '[{}]'.format(utils.get_shortish_repr(key))
def _get_value(self, main_value, key):
return main_value[key]
class Indices(Keys):
_slice = slice(None)
def _keys(self, main_value):
return range(len(main_value))[self._slice]
def __getitem__(self, item):
assert isinstance(item, slice)
result = deepcopy(self)
result._slice = item
return result
class Exploding(BaseVariable):
def _items(self, main_value, normalize=False):
if isinstance(main_value, Mapping):
cls = Keys
elif isinstance(main_value, Sequence):
cls = Indices
else:
cls = Attrs
return cls(self.source, self.exclude)._items(main_value, normalize)

View File

@ -1,7 +1,7 @@
# coding=utf-8 # coding=utf-8
import config as cfg import config as cfg
from convert import mean, read_float, read_led_state, read_bool, count_bits, comma_separated, read_bitmap, return_in_list, first, read_hex_string from convert import mean, read_float, read_led_state, read_bool, count_bits, comma_separated, read_bitmap, return_in_list, first, read_hex_string,read_limb_string
from data import BatterySignal, Battery, LedColor, ServiceSignal, BatteryStatus, LedState, CsvSignal from data import BatterySignal, Battery, LedColor, ServiceSignal, BatteryStatus, LedState, CsvSignal
# noinspection PyUnreachableCode # noinspection PyUnreachableCode
@ -140,7 +140,7 @@ def init_battery_signals():
return [ return [
BatterySignal('/TimeToTOCRequest', max, read_float(register=1052)), BatterySignal('/TimeToTOCRequest', max, read_float(register=1052)),
BatterySignal('/EOCReached', return_in_list, read_eoc_reached), BatterySignal('/IoStatus/EocReached', return_in_list, read_eoc_reached),
BatterySignal('/NumOfLimbStrings', return_in_list, limp_strings_value), BatterySignal('/NumOfLimbStrings', return_in_list, limp_strings_value),
BatterySignal('/Dc/0/Voltage', mean, get_value=read_voltage(), unit='V'), BatterySignal('/Dc/0/Voltage', mean, get_value=read_voltage(), unit='V'),
BatterySignal('/Dc/0/Current', sum, get_value=read_current(), unit='A'), BatterySignal('/Dc/0/Current', sum, get_value=read_current(), unit='A'),
@ -371,4 +371,5 @@ def read_warning_and_alarm_flags():
CsvSignal('/Battery/Devices/AlarmFlags/LMPA', read_bool(base_register=1005, bit=45)), CsvSignal('/Battery/Devices/AlarmFlags/LMPA', read_bool(base_register=1005, bit=45)),
CsvSignal('/Battery/Devices/AlarmFlags/HEBT', read_bool(base_register=1005, bit=46)), CsvSignal('/Battery/Devices/AlarmFlags/HEBT', read_bool(base_register=1005, bit=46)),
CsvSignal('/Battery/Devices/AlarmFlags/CURM', read_bool(base_register=1005, bit=48)), CsvSignal('/Battery/Devices/AlarmFlags/CURM', read_bool(base_register=1005, bit=48)),
CsvSignal('/Battery/Devices/AlarmFlags/2 or more string are disabled',read_limb_string(1059)),
] ]

Binary file not shown.