using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Runtime.ExceptionServices;
using InnovEnergy.Lib.Protocols.DBus.Protocol;
using InnovEnergy.Lib.Protocols.DBus.Transport;
using InnovEnergy.Lib.Protocols.DBus.Utils;

namespace InnovEnergy.Lib.Protocols.DBus;

using static Protocol.Header.MessageType;

/// <summary>
/// Layers a reactive message interface on top of <see cref="DBusSocket"/>:
/// messages pushed into <see cref="OutgoingMessages"/> are handed to <c>SendMessage</c>,
/// and messages obtained from <c>ReadMessage</c> are published on <see cref="IncomingMessages"/>.
/// </summary>
public class DBusMessageStream : DBusSocket
{
    /// <summary>Sink for messages to be sent; each element is passed to <c>SendMessage</c>.</summary>
    public IObserver<Message> OutgoingMessages { get; }

    /// <summary>
    /// Published (hot) stream of messages read from the socket. Subscribers only see
    /// messages that are read after they have subscribed.
    /// </summary>
    public IObservable<Message> IncomingMessages { get; }

    private Boolean Disposed { get; set; } = false;

    /// <summary>Creates the stream on the given bus and starts both message pipelines.</summary>
    /// <param name="bus">Bus forwarded to the <see cref="DBusSocket"/> base constructor.</param>
    public DBusMessageStream(Bus bus) : base(bus)
    {
        // Those don't need a scheduler/locking because sockets are thread-safe
        OutgoingMessages = InitOutgoing();
        IncomingMessages = InitIncoming();
    }

    /// <summary>Queues <paramref name="message"/> for sending via <see cref="OutgoingMessages"/>.</summary>
    protected void DispatchMessage(Message message) => OutgoingMessages.OnNext(message);

    /// <summary>
    /// Sends a method call, awaits the matching reply and returns its payload cast to
    /// <typeparamref name="T"/>.
    /// </summary>
    /// <param name="msg">The message to send; must be of type <c>MethodCall</c>.</param>
    /// <returns>The reply payload cast to <typeparamref name="T"/>.</returns>
    /// <exception cref="ArgumentException"><paramref name="msg"/> is not a <c>MethodCall</c>.</exception>
    /// <exception cref="Exception">
    /// The reply is of type <c>Error</c>. The exception message is the reply's error name if
    /// non-empty, otherwise a non-empty string payload, otherwise a generic text.
    /// </exception>
    protected async Task<T> CallMethod<T>(Message msg)
    {
        var reply = await SendMessageAndAwaitReply(msg);

        if (reply.Type != Error)
            return (T) reply.Payload!;

        var error = !reply.ErrorName.IsNullOrEmpty()
                  ? reply.ErrorName
                  : reply.Payload is String err && !err.IsNullOrEmpty()
                      ? err
                      : "Got an error reply";

        throw new Exception(error);
    }

    /// <summary>
    /// Dispatches <paramref name="msg"/> and returns a task that completes with the first
    /// incoming <c>MethodReturn</c> or <c>Error</c> message whose reply serial equals the
    /// serial of <paramref name="msg"/>.
    /// </summary>
    /// <exception cref="ArgumentException"><paramref name="msg"/> is not a <c>MethodCall</c>.</exception>
    private Task<Message> SendMessageAndAwaitReply(Message msg)
    {
        // TODO: timeout?

        if (msg.Type != MethodCall)
            throw new ArgumentException($"Expected a message of type {MethodCall}, got {msg.Type}", nameof(msg));

        // IncomingMessages is hot: the subscription (made by ToTask) must be in place
        // BEFORE the request is dispatched, otherwise a fast reply could be published
        // before anyone is listening and the returned task would never complete.
        var reply = IncomingMessages
                   .FirstAsync(m => m.ReplySerial == msg.Serial && m.Type is MethodReturn or Error)
                   .ToTask();

        DispatchMessage(msg);

        return reply;
    }

    /// <summary>
    /// Builds the incoming pipeline: on the task pool, keeps calling <c>ReadMessage</c>
    /// for as long as <c>Connected</c> is true and publishes the results. The pipeline is
    /// connected immediately and its connection is registered through <c>AddDisposable</c>.
    /// </summary>
    private IConnectableObservable<Message> InitIncoming()
    {
        var incomingMessages = Observable
                              .Repeat(0, TaskPoolScheduler.Default)
                              .TakeWhile(_ => Connected)
                              .Select(_ => ReadMessage())
                              .Publish();

        incomingMessages.Connect().Apply(AddDisposable);

        return incomingMessages;
    }

    /// <summary>
    /// Builds the outgoing pipeline: a subject whose synchronized elements are observed on
    /// the task pool and passed to <c>SendMessage</c>. The subscription is registered
    /// through <c>AddDisposable</c>.
    /// </summary>
    private Subject<Message> InitOutgoing()
    {
        var outgoing = new Subject<Message>();

        outgoing.Synchronize()
                .ObserveOn(TaskPoolScheduler.Default)
                .DumpErrors() // $$$
                // Rethrow pipeline errors without resetting their original stack trace.
                .Subscribe(SendMessage, e => ExceptionDispatchInfo.Capture(e).Throw())
                .Apply(AddDisposable);

        return outgoing;
    }
}