Innovenergy_trunk/firmware/opt/innovenergy/scripts/lsdbus

98 lines
3.2 KiB
Plaintext
Raw Normal View History

2023-02-16 12:57:06 +00:00
#!/usr/bin/python -u
# coding=utf-8
from argparse import ArgumentParser
from time import sleep
import dbus
from _dbus_bindings import Connection, Message, MethodCallMessage, \
BUS_DAEMON_NAME, BUS_DAEMON_PATH, BUS_DAEMON_IFACE
from os import environ, system
def parse_args():
    # type: () -> argparse.Namespace
    """Parse the lsdbus command line (taken from sys.argv).

    Returns:
        The argparse namespace with four boolean attributes, all False
        unless the matching switch was given: ``process_id``,
        ``cmd_path``, ``continuously`` and ``unnamed``.
    """
    cli = ArgumentParser(description='lsdbus', add_help=True)
    group = cli.add_argument_group(
        title='process id',
        description='if none of these are specified, all messages will be shown',
    )

    # (option strings, help text) -- every option is a plain on/off switch.
    switches = (
        (('-p', '--process_id'), 'show process ids'),
        (('-c', '--cmd_path'), 'show command line path'),
        (('--continuously',), 'process is run continuously'),
        (('-a', '--unnamed'), 'show unnamed processes as well'),
    )
    for option_strings, help_text in switches:
        group.add_argument(*option_strings, action="store_true", help=help_text)

    return cli.parse_args()
# lsdbus prints one line per name on the bus: the unique connection name and
# the well-known name, optionally followed by the command line and process id.
#   -h, --help         show the argparse help
#   -p, --process_id   show process ids
#   -c, --cmd_path     show command line path
#   --continuously     redraw the listing every 2 seconds
#   -a, --unnamed      show connections without a well-known name as well
#
# Use the session bus when DBUS_SESSION_BUS_ADDRESS is set in the environment,
# otherwise fall back to the system bus.
if 'DBUS_SESSION_BUS_ADDRESS' in environ:
    bus = dbus.SessionBus()
else:
    bus = dbus.SystemBus()
def _read_cmdline(pid):
    """Return the command line of process *pid* as one space-separated string."""
    # /proc/<pid>/cmdline separates the arguments with NUL bytes.
    with open('/proc/' + pid + '/cmdline') as cmdline_file:
        return cmdline_file.read().replace('\0', ' ').rstrip()


def _collect_rows(args):
    """Query the bus and build one row (a list of strings) per listed name.

    Each row starts with ``[unique_name, well_known_name]``; the command line
    and/or the process id are appended when requested through *args*.
    """
    names = bus.list_names()

    # One row per well-known name, the bus daemon's own name excluded.
    rows = [
        [bus.get_name_owner(name), name]
        for name in names
        if not name.startswith(':') and name != 'org.freedesktop.DBus'
    ]

    if args.unnamed:
        # Add the connections that own no well-known name.  The owners are
        # collected first so the result does not depend on the order in
        # which the bus lists its names (a connection that owns a name must
        # never be reported as "Unnamed").
        owners = set(row[0] for row in rows)
        rows.extend(
            [name, 'Unnamed']
            for name in names
            if name.startswith(':') and name not in owners
        )

    if args.cmd_path or args.process_id:
        for row in rows:
            # Ask the bus daemon once per row; both optional columns need it.
            reply = call_daemon_blocking(bus, 'GetConnectionUnixProcessID', row[0])
            pid = str(reply.get_args_list()[0])
            # Column order: command line first, then process id.
            if args.cmd_path:
                row.append(_read_cmdline(pid))
            if args.process_id:
                row.append(pid)

    return rows


def _print_table(rows):
    """Print *rows* as left-aligned columns padded to the widest cell + 2."""
    # zip(*rows) yields the columns; it is empty when there are no rows.
    widths = [max(len(cell) for cell in column) + 2 for column in zip(*rows)]
    for row in rows:
        print(' '.join(cell.ljust(width) for cell, width in zip(row, widths)))


def get_current_name_owners(args):
    """Print the names currently present on the bus together with their owners.

    Args:
        args: parsed command line; the attributes read are ``unnamed``
            (also list connections without a well-known name),
            ``cmd_path`` (append the owner's command line),
            ``process_id`` (append the owner's process id) and
            ``continuously`` (clear the screen and redraw every 2 seconds
            instead of returning after the first listing).

    Returns:
        None.  The listing is written to stdout, sorted by unique name.
    """
    # A loop rather than self-recursion: continuous mode must be able to run
    # indefinitely without exhausting the interpreter's recursion limit.
    while True:
        rows = _collect_rows(args)
        rows.sort(key=lambda row: row[0])
        _print_table(rows)

        if not args.continuously:
            return
        sleep(2)
        system('clear')
def call_daemon_blocking(con, member, *args):
    # type: (Connection, str, *Any) -> Message
    """Call a method on the bus daemon and block until the reply arrives.

    Args:
        con: connection the method call is sent over.
        member: name of the bus daemon method to call.
        *args: call arguments, appended to the message in the given order.

    Returns:
        The reply returned by ``con.send_message_with_reply_and_block``.
    """
    request = MethodCallMessage(
        BUS_DAEMON_NAME, BUS_DAEMON_PATH, BUS_DAEMON_IFACE, member)
    for value in args:
        request.append(value)
    reply = con.send_message_with_reply_and_block(request)
    return reply
# Script entry point: parse the command line, then print the bus listing
# according to the parsed switches.
argv = parse_args()
get_current_name_owners(argv)
# while True:
# system('clear')
# get_current_name_owners()
# sleep(1)