Innovenergy_trunk/NodeRed/NodeRedFiles/dbus-fzsonick-48tl-Ivo/ext/velib_python/ve_utils.py

203 lines
6.1 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from traceback import print_exc
from os import _exit as os_exit
from os import statvfs
import logging
from functools import update_wrapper
import dbus
logger = logging.getLogger(__name__)
VEDBUS_INVALID = dbus.Array([], signature=dbus.Signature('i'), variant_level=1)
# Use this function to make sure the code quits on an unexpected exception. Make sure to use it
# when using gobject.idle_add and also gobject.timeout_add.
# Without this, the code will just keep running, since gobject does not stop the mainloop on an
# exception.
# Example: gobject.idle_add(exit_on_error, myfunc, arg1, arg2)
def exit_on_error(func, *args, **kwargs):
try:
return func(*args, **kwargs)
except:
try:
print 'exit_on_error: there was an exception. Printing stacktrace will be tryed and then exit'
print_exc()
except:
pass
# sys.exit() is not used, since that throws an exception, which does not lead to a program
# halt when used in a dbus callback, see connection.py in the Python/Dbus libraries, line 230.
os_exit(1)
__vrm_portal_id = None
def get_vrm_portal_id():
# For the CCGX, the definition of the VRM Portal ID is that it is the mac address of the onboard-
# ethernet port (eth0), stripped from its colons (:) and lower case.
# nice coincidence is that this also works fine when running on your (linux) development computer.
global __vrm_portal_id
if __vrm_portal_id:
return __vrm_portal_id
# Assume we are on linux
import fcntl, socket, struct
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927, struct.pack('256s', 'eth0'[:15]))
__vrm_portal_id = ''.join(['%02x' % ord(char) for char in info[18:24]])
return __vrm_portal_id
# See VE.Can registers - public.docx for definition of this conversion
def convert_vreg_version_to_readable(version):
def str_to_arr(x, length):
a = []
for i in range(0, len(x), length):
a.append(x[i:i+length])
return a
x = "%x" % version
x = x.upper()
if len(x) == 5 or len(x) == 3 or len(x) == 1:
x = '0' + x
a = str_to_arr(x, 2);
# remove the first 00 if there are three bytes and it is 00
if len(a) == 3 and a[0] == '00':
a.remove(0);
# if we have two or three bytes now, and the first character is a 0, remove it
if len(a) >= 2 and a[0][0:1] == '0':
a[0] = a[0][1];
result = ''
for item in a:
result += ('.' if result != '' else '') + item
result = 'v' + result
return result
def get_free_space(path):
result = -1
try:
s = statvfs(path)
result = s.f_frsize * s.f_bavail # Number of free bytes that ordinary users
except Exception, ex:
logger.info("Error while retrieving free space for path %s: %s" % (path, ex))
return result
def get_load_averages():
c = read_file('/proc/loadavg')
return c.split(' ')[:3]
# Returns False if it cannot find a machine name. Otherwise returns the string
# containing the name
def get_machine_name():
c = read_file('/proc/device-tree/model')
if c != False:
return c.strip('\x00')
return read_file('/etc/venus/machine')
# Returns False if it cannot open the file. Otherwise returns its rstripped contents
def read_file(path):
content = False
try:
with open(path, 'r') as f:
content = f.read().rstrip()
except Exception, ex:
logger.debug("Error while reading %s: %s" % (path, ex))
return content
def wrap_dbus_value(value):
if value is None:
return VEDBUS_INVALID
if isinstance(value, float):
return dbus.Double(value, variant_level=1)
if isinstance(value, bool):
return dbus.Boolean(value, variant_level=1)
if isinstance(value, int):
return dbus.Int32(value, variant_level=1)
if isinstance(value, str):
return dbus.String(value, variant_level=1)
if isinstance(value, unicode):
return dbus.String(value, variant_level=1)
if isinstance(value, list):
if len(value) == 0:
# If the list is empty we cannot infer the type of the contents. So assume unsigned integer.
# A (signed) integer is dangerous, because an empty list of signed integers is used to encode
# an invalid value.
return dbus.Array([], signature=dbus.Signature('u'), variant_level=1)
return dbus.Array([wrap_dbus_value(x) for x in value], variant_level=1)
if isinstance(value, long):
return dbus.Int64(value, variant_level=1)
if isinstance(value, dict):
# Wrapping the keys of the dictionary causes D-Bus errors like:
# 'arguments to dbus_message_iter_open_container() were incorrect,
# assertion "(type == DBUS_TYPE_ARRAY && contained_signature &&
# *contained_signature == DBUS_DICT_ENTRY_BEGIN_CHAR) || (contained_signature == NULL ||
# _dbus_check_is_valid_signature (contained_signature))" failed in file ...'
return dbus.Dictionary({(k, wrap_dbus_value(v)) for k, v in value.items()}, variant_level=1)
return value
dbus_int_types = (dbus.Int32, dbus.UInt32, dbus.Byte, dbus.Int16, dbus.UInt16, dbus.UInt32, dbus.Int64, dbus.UInt64)
def unwrap_dbus_value(val):
"""Converts D-Bus values back to the original type. For example if val is of type DBus.Double,
a float will be returned."""
if isinstance(val, dbus_int_types):
return int(val)
if isinstance(val, dbus.Double):
return float(val)
if isinstance(val, dbus.Array):
v = [unwrap_dbus_value(x) for x in val]
return None if len(v) == 0 else v
if isinstance(val, (dbus.Signature, dbus.String)):
return unicode(val)
# Python has no byte type, so we convert to an integer.
if isinstance(val, dbus.Byte):
return int(val)
if isinstance(val, dbus.ByteArray):
return "".join([str(x) for x in val])
if isinstance(val, (list, tuple)):
return [unwrap_dbus_value(x) for x in val]
if isinstance(val, (dbus.Dictionary, dict)):
# Do not unwrap the keys, see comment in wrap_dbus_value
return dict([(x, unwrap_dbus_value(y)) for x, y in val.items()])
if isinstance(val, dbus.Boolean):
return bool(val)
return val
class reify(object):
""" Decorator to replace a property of an object with the calculated value,
to make it concrete. """
def __init__(self, wrapped):
self.wrapped = wrapped
update_wrapper(self, wrapped)
def __get__(self, inst, objtype=None):
if inst is None:
return self
v = self.wrapped(inst)
setattr(inst, self.wrapped.__name__, v)
return v