Innovenergy_trunk/NodeRed/NodeRedFiles/dbus-fzsonick-48tl-Ivo/python_libs/ie_utils/mixins.py

116 lines
2.7 KiB
Python
Raw Normal View History

2024-06-05 10:33:44 +00:00
from logging import getLogger
from _dbus_glib_bindings import DBusGMainLoop
# noinspection PyUnreachableCode
if False:
from typing import Callable, NoReturn, List, AnyStr, Optional, Union
_log = getLogger(__name__)
def nop(*_args):
pass
def memoize(fn):
attr_name = '_memoized_' + fn.__name__
def _memoized(self):
if not hasattr(self, attr_name):
setattr(self, attr_name, fn(self))
return getattr(self, attr_name)
return _memoized
# noinspection PyAttributeOutsideInit
class Disposable(object):
_dispose_actions = None # type: List[Callable[[],NoReturn]]
def __enter__(self):
return self
def __exit__(self, typ, value, tb):
self.dispose()
def dispose(self):
# type: () -> NoReturn
while self._dispose_actions:
dispose = self._dispose_actions.pop()
dispose()
for k, v in self.__dict__.iteritems():
if isinstance(v, Disposable) and v._dispose_actions:
_log.debug('disposing ' + type(self).__name__ + '.' + k)
v.dispose()
def chain_disposable(self, dispose, message=None):
# type: (Union[Callable[[],None],Disposable], Optional[AnyStr]) -> NoReturn
if self._dispose_actions is None:
self._dispose_actions = []
if isinstance(dispose, Disposable):
dispose = dispose.dispose
if message is None:
self._dispose_actions.append(dispose)
return
def dispose_with_log_msg():
_log.debug('disposing ' + message)
dispose()
# _log.debug('new disposable ' + message)
self._dispose_actions.append(dispose_with_log_msg)
@classmethod
def create(cls, dispose_action, message=None):
# type: (Union[Callable[[],None],Disposable], Optional[AnyStr]) -> Disposable
disposable = Disposable()
disposable.chain_disposable(dispose_action, message)
return disposable
def create_dependent_disposable(self, dispose_action, message=None):
# type: (Union[Callable[[],None],Disposable], Optional[AnyStr]) -> Disposable
disposable = Disposable.create(dispose_action, message)
self.chain_disposable(disposable)
return disposable
class Record(object):
@memoize
def __str__(self):
return self.__class__.__name__ + ' ' + unicode(vars(self))
def __repr__(self):
return self.__str__()
@memoize
def __hash__(self):
return self.__str__().__hash__()
def __eq__(self, other):
# TODO: improve, iterable vars are not correctly handled
return str(other) == str(self)
# make readonly
def __setattr__(self, key, value):
# type: (str, any) -> NoReturn
if not key.startswith('_') and hasattr(self, key): # disallow redefining
raise ValueError(key + ' is read-only' + str(dir()))
super(Record, self).__setattr__(key, value)
class RequiresMainLoop(object):
main_loop = DBusGMainLoop(set_as_default=True) # initialized only once for all subclasses that need it