Innovenergy_trunk/csharp/Lib/Protocols/DBus/Daemon/DBusDaemonConnection.cs

172 lines
5.7 KiB
C#

using System.Reactive.Concurrency;
using System.Reactive.Linq;
using InnovEnergy.Lib.Protocols.DBus.Protocol;
using InnovEnergy.Lib.Protocols.DBus.Protocol.DataTypes;
using InnovEnergy.Lib.Protocols.DBus.Utils;
using static InnovEnergy.Lib.Protocols.DBus.Daemon.DBusDaemonApi;
namespace InnovEnergy.Lib.Protocols.DBus.Daemon;
/// <summary>
/// A <see cref="DBusMessageStream"/> that additionally talks to the bus daemon:
/// it performs the <c>Hello</c> handshake on construction and exposes the daemon's
/// methods (name handling, match rules, ...) as task-returning calls.
/// </summary>
public class DBusDaemonConnection : DBusMessageStream
{
    /// <summary>The value the daemon returned from the <c>Hello</c> call made in the constructor.</summary>
    public String LocalName { get; }

    /// <summary>
    /// Stream produced by <c>ObserveServices</c>. It is created lazily on first subscription,
    /// subscribed on the task pool, shared between subscribers (ref-counted) and replays the
    /// latest element to late subscribers.
    /// </summary>
    public IObservable<ServiceChanged> Services { get; }

    /// <summary>
    /// Opens the connection on <paramref name="bus"/> and says <c>Hello</c> to the daemon.
    /// </summary>
    /// <param name="bus">The bus to connect to; forwarded to the base class.</param>
    /// <remarks>
    /// Blocks until the daemon has answered the <c>Hello</c> call. Because the result is read
    /// through <see cref="Task{TResult}.Result"/>, a failing call surfaces as an
    /// <see cref="AggregateException"/>.
    /// </remarks>
    public DBusDaemonConnection(Bus bus) : base(bus)
    {
        LocalName = Hello().Result; // Note: blocking (no async constructors)

        Services = Observable
                  .Defer(this.ObserveServices)
                  .SubscribeOn(TaskPoolScheduler.Default) // ObserveServices blocks
                  .Replay(1)
                  .RefCount();
    }

    /// <summary>Calls the daemon's <c>Hello</c> method.</summary>
    private Task<String> Hello() => CallDaemon<String>("Hello");

    /// <summary>Calls the daemon's <c>RequestName</c> method.</summary>
    /// <param name="name">The bus name to request.</param>
    /// <param name="options">Request flags; sent to the daemon as a <see cref="UInt32"/>.</param>
    /// <returns>The daemon's reply.</returns>
    public Task<RequestNameReply> RequestName(String name, RequestNameOptions options = RequestNameOptions.DoNotQueue)
    {
        return CallDaemon<RequestNameReply>("RequestName", (name, (UInt32)options));
    }

    /// <summary>Calls the daemon's <c>ReleaseName</c> method.</summary>
    /// <param name="name">The bus name to release.</param>
    /// <returns>The daemon's reply.</returns>
    public Task<ReleaseNameReply> ReleaseName(String name)
    {
        return CallDaemon<ReleaseNameReply>("ReleaseName", name);
    }

    /// <summary>Calls the daemon's <c>ListNames</c> method.</summary>
    /// <returns>The names returned by the daemon.</returns>
    public Task<IReadOnlyList<String>> ListNames()
    {
        return CallDaemon<IReadOnlyList<String>>("ListNames");
    }

    /// <summary>Calls the daemon's <c>GetNameOwner</c> method.</summary>
    /// <param name="name">The bus name to look up.</param>
    /// <returns>The owner as returned by the daemon.</returns>
    public Task<String> GetNameOwner(String name)
    {
        return CallDaemon<String>("GetNameOwner", name);
    }

    /// <summary>Calls the daemon's <c>GetConnectionUnixUser</c> method.</summary>
    /// <param name="name">The bus name of the connection to query.</param>
    public Task<UInt32> GetConnectionUnixUser(String name)
    {
        return CallDaemon<UInt32>("GetConnectionUnixUser", name);
    }

    /// <summary>Calls the daemon's <c>GetConnectionUnixProcessID</c> method.</summary>
    /// <param name="name">The bus name of the connection to query.</param>
    public Task<UInt32> GetConnectionUnixProcessId(String name)
    {
        return CallDaemon<UInt32>("GetConnectionUnixProcessID", name);
    }

    /// <summary>Calls the daemon's <c>AddMatch</c> method with the rule's string form.</summary>
    /// <param name="matchRule">The rule to register; sent as <c>matchRule.ToString()</c>.</param>
    public Task AddMatch(IMatchRule matchRule)
    {
        return CallDaemon<Object>("AddMatch", matchRule.ToString());
    }

    /// <summary>Calls the daemon's <c>RemoveMatch</c> method with the rule's string form.</summary>
    /// <param name="matchRule">The rule to remove; sent as <c>matchRule.ToString()</c>.</param>
    public Task RemoveMatch(IMatchRule matchRule)
    {
        return CallDaemon<Object>("RemoveMatch", matchRule.ToString());
    }

    // TODO: ListQueuedOwners
    // TODO: ListActivatableNames
    // TODO: NameHasOwner
    // TODO: StartServiceByName
    // TODO: UpdateActivationEnvironment
    // TODO: GetConnectionCredentials
    // TODO: GetAdtAuditSessionData
    // TODO: GetConnectionSELinuxSecurityContext
    // TODO: GetId
    // TODO: BecomeMonitor
    // TODO: Features

    /// <summary>
    /// Builds a method call addressed to the daemon (destination, object path and interface
    /// are taken from <see cref="DBusDaemonApi"/>) and sends it, expecting a reply.
    /// </summary>
    /// <typeparam name="T">The type the reply is returned as.</typeparam>
    /// <param name="member">Name of the daemon method to call.</param>
    /// <param name="payload">Optional method arguments.</param>
    private Task<T> CallDaemon<T>(String member, Object? payload = null)
    {
        var msg = Message.MethodCall
        (
            member       : member,
            payload      : payload,
            replyExpected: true,
            destination  : ServiceName,
            objectPath   : DBusDaemonApi.ObjectPath,
            @interface   : Interface
        );

        return CallMethod<T>(msg);
    }

    /// <summary>
    /// Lists the names accepted by <see cref="DBusDaemonApi.IsBusName"/> together with their owner.
    /// </summary>
    /// <returns>
    /// A lazily evaluated sequence of (name, owner id) pairs. Names whose owner lookup did not
    /// complete successfully are left out.
    /// </returns>
    /// <remarks>
    /// <see cref="ListNames"/> is awaited synchronously when this method is called. The owner of
    /// each name is looked up, again blocking, while the returned sequence is being enumerated,
    /// so enumerating it twice queries the daemon twice.
    /// </remarks>
    public IEnumerable<(String name, String id)> GetNameOwners()
    {
        var names = ListNames().Result;

        return names
              .Where(DBusDaemonApi.IsBusName)
              .Select(name => (name, id: GetOwner(name)))
              .Where(nid => nid.id is not null)!;

        // A lookup that faults or is cancelled yields null instead of throwing.
        String? GetOwner(String busName)
        {
            return GetNameOwner(busName)
                  .ContinueWith(t => t.IsCompletedSuccessfully ? t.Result : null)
                  .Result;
        }
    }

    // TODO: ObserveSignalMessages without Daemon (implement one class up, then override here)

    /// <summary>
    /// Observes signal messages that match the given filters. Every subscription opens its own
    /// <see cref="DBusDaemonConnection"/> on the same bus, registers a signal match rule on it and
    /// streams that connection's incoming messages. The rule is removed through a dispose action
    /// of that connection.
    /// </summary>
    /// <param name="sender">if present, filter by sender name or id</param>
    /// <param name="interface">if present, only accept messages matching interface</param>
    /// <param name="objectPath">if present, only accept messages with matching object path. The objectPath parameter
    /// MAY end with a *. In this case messages whose object path starts with the string preceding the * will match.</param>
    /// <param name="member">if present, only accept messages matching member</param>
    /// <returns>
    /// The matching messages. Unless <paramref name="sender"/> is the daemon's own service name,
    /// messages sent by the daemon are filtered out.
    /// </returns>
    public IObservable<Message> ObserveSignalMessages(String?     sender     = default,
                                                      String?     @interface = default,
                                                      ObjectPath? objectPath = default,
                                                      String?     member     = default)
    {
        // A trailing '*' turns the path into a namespace (prefix) match.
        // The char overload compares ordinally, consistent with TrimEnd('*') below.
        // NOTE(review): a pattern such as "/a/b/*" yields the namespace "/a/b/"; the D-Bus
        // specification does not allow a trailing '/' on non-root object paths. Confirm that
        // ObjectPath / SignalMatchRule cope with this.
        var prefix = objectPath.HasValue && objectPath.Value.Path.EndsWith('*')
                   ? new ObjectPath(objectPath.Value.Path.TrimEnd('*'))
                   : (ObjectPath?)null;

        // Exact path match only when no namespace match was requested.
        var path = prefix is null
                 ? objectPath
                 : null;

        IMatchRule matchRule = new SignalMatchRule
        {
            Path          = path,
            Member        = member,
            Sender        = sender,
            Interface     = @interface,
            PathNamespace = prefix
        };

        DBusDaemonConnection Connect()
        {
            var connection = new DBusDaemonConnection(Bus); // clone connection, note: blocking

            try
            {
                connection.AddMatch(matchRule);                                       // fire and forget TODO: bubble exception
                connection.AddDisposeAction(() => connection.RemoveMatch(matchRule)); // fire and forget
                return connection;
            }
            catch
            {
                // Observable.Using only disposes the resource the factory returns;
                // without this the freshly opened connection would leak.
                ((IDisposable)connection).Dispose();
                throw;
            }
        }

        var signals = Observable
                     .Using(Connect, c => c.IncomingMessages)
                     .DumpErrors()
                     .SubscribeOn(TaskPoolScheduler.Default); // avoid blocking subscribe

        const String daemon = ServiceName;

        return sender is daemon
             ? signals                                  // return ALL signals
             : signals.Where(s => s.Sender != daemon);  // filter out signals from daemon, NameOwnerChanged etc.
    }
}