98 lines
3.2 KiB
Plaintext
98 lines
3.2 KiB
Plaintext
|
#!/usr/bin/python -u
|
||
|
# coding=utf-8
|
||
|
from argparse import ArgumentParser
|
||
|
from time import sleep
|
||
|
import dbus
|
||
|
from _dbus_bindings import Connection, Message, MethodCallMessage, \
|
||
|
BUS_DAEMON_NAME, BUS_DAEMON_PATH, BUS_DAEMON_IFACE
|
||
|
|
||
|
from os import environ, system
|
||
|
|
||
|
|
||
|
|
||
|
def parse_args(argv=None):
    # type: (Optional[List[str]]) -> Namespace
    """Parse the lsdbus command-line options.

    :param argv: argument strings to parse. ``None`` (the default) lets
        argparse fall back to ``sys.argv[1:]``, which is what the script
        itself relies on; passing a list makes the parser usable from other
        code and from tests.
    :return: an ``argparse.Namespace`` carrying the boolean attributes
        ``process_id``, ``cmd_path``, ``continuously`` and ``unnamed``
        (all ``False`` unless the matching switch was given).
    """
    # add_help=True already registers -h/--help, so no explicit help option
    # is declared below.
    parser = ArgumentParser(description='lsdbus', add_help=True)

    # NOTE(review): the group title and description do not describe what
    # these switches do (they select extra columns / modes); the strings are
    # kept verbatim here so the --help output stays unchanged.
    options = parser.add_argument_group(
        title='process id',
        description='if none of these are specified, all messages will be shown')
    options.add_argument('-p', '--process_id', action="store_true",
                         help='show process ids')
    options.add_argument('-c', '--cmd_path', action="store_true",
                         help='show command line path')
    options.add_argument('--continuously', action="store_true",
                         help='process is run continuously')
    options.add_argument('-a', '--unnamed', action="store_true",
                         help='show unnamed processes as well')

    return parser.parse_args(argv)
|
||
|
|
||
|
# ARGS: -h help, -p process ids, -c command-line path, -a include unnamed connections, --continuously keep refreshing
|
||
|
|
||
|
# Use the session bus when the environment advertises one; otherwise fall
# back to the system bus.
if 'DBUS_SESSION_BUS_ADDRESS' in environ:
    bus = dbus.SessionBus()
else:
    bus = dbus.SystemBus()
|
||
|
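# documentation="This script displays information about processes connected to D-Bus: unique connection name, well-known name and, if requested, PID and command line." \
#               " Args: -h             display this help" \
#               "       -p             display process ids" \
#               "       -c             display command-line path" \
#               "       -a             also list connections without a well-known name" \
#               "       --continuously refresh the listing every 2 s"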
|
||
|
|
||
|
def _lookup_pid(owner):
    # type: (str) -> Optional[int]
    """Return the PID behind bus connection *owner*, or None if unavailable."""
    try:
        reply = call_daemon_blocking(bus, 'GetConnectionUnixProcessID', owner)
    except dbus.DBusException:
        # The connection may have disconnected after the names were listed.
        return None
    return reply.get_args_list()[0]


def _read_cmdline(pid):
    # type: (Optional[int]) -> str
    """Return the command line of *pid* from /proc, or '' if unreadable."""
    if pid is None:
        return ''
    try:
        # 'with' guarantees the handle is closed even if read() fails.
        with open('/proc/' + str(pid) + '/cmdline') as handle:
            raw = handle.read()
    except (IOError, OSError):
        # The process exited or /proc denies access.
        return ''
    # Arguments in /proc/<pid>/cmdline are NUL-separated.
    return raw.replace('\0', ' ').rstrip()


def _collect_rows(args):
    # type: (Namespace) -> List[List[str]]
    """Build the sorted table rows: [owner, name(, cmdline)(, pid)]."""
    names = bus.list_names()
    rows = []
    seen_owners = set()

    # Pass 1: well-known names. Handling them before the unique names makes
    # the result independent of the order in which the daemon lists names,
    # so a connection that owns a well-known name is never shown as "Unnamed".
    for name in names:
        if name.startswith(':') or name == 'org.freedesktop.DBus':
            continue
        try:
            owner = bus.get_name_owner(name)
        except dbus.DBusException:
            # The name was released between ListNames and GetNameOwner.
            continue
        # With --unnamed the table holds one row per connection, so further
        # well-known names of an already listed owner are skipped.
        if args.unnamed and owner in seen_owners:
            continue
        seen_owners.add(owner)
        rows.append([owner, name])

    # Pass 2 (only with --unnamed): connections without any well-known name.
    if args.unnamed:
        for name in names:
            if name.startswith(':') and name not in seen_owners:
                seen_owners.add(name)
                rows.append([name, "Unnamed"])

    # Optional columns; the PID is looked up once and shared by both.
    if args.cmd_path or args.process_id:
        for row in rows:
            pid = _lookup_pid(row[0])
            if args.cmd_path:
                row.append(_read_cmdline(pid))
            if args.process_id:
                row.append('?' if pid is None else str(pid))

    rows.sort(key=lambda row: row[0])
    return rows


def _print_table(rows):
    # type: (List[List[str]]) -> None
    """Print *rows* as left-aligned columns padded to the widest cell + 2."""
    if not rows:
        return
    # Every row has the same number of cells, so zip(*rows) yields columns.
    widths = [max(len(cell) for cell in column) + 2 for column in zip(*rows)]
    for row in rows:
        print(' '.join(cell.ljust(width) for cell, width in zip(row, widths)))


def get_current_name_owners(args):
    # type: (Namespace) -> None
    """Print a table of the connections currently present on the bus.

    Each row starts with the unique connection name and the well-known name
    it owns ("Unnamed" for connections without one, listed only when
    ``args.unnamed`` is set). ``args.cmd_path`` appends the owning process's
    command line and ``args.process_id`` appends its PID; a PID that cannot
    be determined is shown as ``?`` and an unreadable command line as an
    empty cell. Rows are sorted by unique connection name.

    When ``args.continuously`` is set the table is reprinted every two
    seconds (after clearing the terminal) until the process is interrupted.
    This is done with a loop rather than recursion so that long runs do not
    exhaust the interpreter's recursion limit.

    :param args: namespace with the boolean attributes ``unnamed``,
        ``cmd_path``, ``process_id`` and ``continuously``.
    :return: None; output goes to stdout.
    """
    while True:
        _print_table(_collect_rows(args))
        if not args.continuously:
            return
        sleep(2)
        system('clear')
|
||
|
|
||
|
def call_daemon_blocking(con, member, *args):
    # type: (Connection, str, *Any) -> Message
    """Call method *member* on the bus daemon and return the reply message.

    The request is addressed to BUS_DAEMON_NAME / BUS_DAEMON_PATH /
    BUS_DAEMON_IFACE, each positional argument in *args* is appended to it in
    order, and it is sent over *con* with
    ``send_message_with_reply_and_block``.

    :param con: connection used to send the request.
    :param member: name of the bus-daemon method to invoke.
    :param args: arguments for the method call.
    :return: whatever ``con.send_message_with_reply_and_block`` returns.
    """
    request = MethodCallMessage(
        BUS_DAEMON_NAME, BUS_DAEMON_PATH, BUS_DAEMON_IFACE, member)

    for argument in args:
        request.append(argument)

    reply = con.send_message_with_reply_and_block(request)
    return reply
|
||
|
|
||
|
|
||
|
# Script entry point: parse the command line and print the table.  Repeated
# refreshing for --continuously is handled inside get_current_name_owners.
argv = parse_args()
try:
    get_current_name_owners(argv)
except KeyboardInterrupt:
    # Ctrl-C is the only way to leave --continuously mode; exit quietly with
    # the conventional 128 + SIGINT status instead of printing a traceback.
    raise SystemExit(130)
|