#!/usr/bin/python3 -u # coding=utf-8 from argparse import ArgumentParser from time import sleep import dbus from _dbus_bindings import Connection, Message, MethodCallMessage, \ BUS_DAEMON_NAME, BUS_DAEMON_PATH, BUS_DAEMON_IFACE from os import environ, system def parse_args(): # type: () -> Args parser = ArgumentParser(description='lsdbus', add_help=True) lsdbus = parser.add_argument_group(title = 'process id', description = 'if none of these are specified, all messages will be shown') lsdbus.add_argument('-p', '--process_id', action="store_true", help='show process ids') # lsdbus.add_argument('-h', '--help', action="store_true", help='shows this help') lsdbus.add_argument('-c', '--cmd_path', action="store_true", help='show command line path') lsdbus.add_argument('--continuously', action="store_true", help='process is run continuously') lsdbus.add_argument('-a', '--unnamed', action="store_true", help='show unnamed processes as well') return parser.parse_args() # ARGS: -h help, -n names bus = dbus.SessionBus() if 'DBUS_SESSION_BUS_ADDRESS' in environ else dbus.SystemBus() # documentation="This script displays information about processes running on dbus. nid, name, pid and if requested path" \ # " Args: -h display this help" \ # " -c continously update 1/s" \ # " -n display path" def get_current_name_owners(args): # # args = parse_args() # fake tuples for appending later if args.unnamed: tuples = [] for name in bus.list_names(): if name.startswith(':') and not name in [tuples[i][0] for i in range(0, len(tuples))]: tuples.append([bus.get_name_owner(name) if not name.startswith(':') else name, name if not name.startswith(':') else "Unnamed"]) elif not name.startswith(':') and not bus.get_name_owner(name) in [tuples[i][0] for i in range(0, len(tuples))] and name != 'org.freedesktop.DBus': tuples.append([bus.get_name_owner(name), name]) else: tuples = [ [bus.get_name_owner(name), name] for name in bus.list_names() if not name.startswith(':') and name != 'org.freedesktop.DBus' ] if args.cmd_path: for tup in tuples: tup.append(open(("/proc/" + str( call_daemon_blocking(bus, 'GetConnectionUnixProcessID', tup[0]).get_args_list()[0]) + '/cmdline').format( str(tup[0]))).read().replace('\0', ' ').rstrip()) if args.process_id: for tup in tuples: tup.append(str(call_daemon_blocking(bus, 'GetConnectionUnixProcessID', tup[0]).get_args_list()[0])) # length = max([len(x[0]) for x in tuples]) + 2 tuples.sort(key=lambda t: t[0]) for tup in tuples: print(' '.join([x.ljust(max([len(x[i]) for x in tuples]) + 2) for i, x in enumerate(tup)])) # TODO PYTHON3 print(tup, sep=' ', end='', flush=True,) if args.continuously: sleep(2) system('clear') get_current_name_owners(args) def call_daemon_blocking(con, member, *args): # type: (Connection, str, List[Any]) -> Message msg = MethodCallMessage(BUS_DAEMON_NAME, BUS_DAEMON_PATH, BUS_DAEMON_IFACE, member) for arg in args: msg.append(arg) return con.send_message_with_reply_and_block(msg) argv = parse_args() get_current_name_owners(argv) # while True: # system('clear') # get_current_name_owners() # sleep(1)