85 lines
2.5 KiB
C#
85 lines
2.5 KiB
C#
using System.Reactive.Concurrency;
|
|
using System.Reactive.Linq;
|
|
using System.Reactive.Subjects;
|
|
using System.Reactive.Threading.Tasks;
|
|
using InnovEnergy.Lib.Protocols.DBus.Protocol;
|
|
using InnovEnergy.Lib.Protocols.DBus.Transport;
|
|
using InnovEnergy.Lib.Protocols.DBus.Utils;
|
|
|
|
namespace InnovEnergy.Lib.Protocols.DBus;
|
|
|
|
using static Protocol.Header.MessageType;
|
|
|
|
public class DBusMessageStream : DBusSocket
|
|
{
|
|
public IObserver<Message> OutgoingMessages { get; }
|
|
public IObservable<Message> IncomingMessages { get; }
|
|
|
|
private Boolean Disposed { get; set; } = false;
|
|
|
|
public DBusMessageStream(Bus bus) : base(bus)
|
|
{
|
|
// Those don't need a scheduler/locking because sockets are thread-safe
|
|
OutgoingMessages = InitOutgoing();
|
|
IncomingMessages = InitIncoming();
|
|
}
|
|
|
|
protected void DispatchMessage(Message message) => OutgoingMessages.OnNext(message);
|
|
|
|
protected async Task<T> CallMethod<T>(Message msg)
|
|
{
|
|
var reply = await SendMessageAndAwaitReply(msg);
|
|
|
|
if (reply.Type != Error)
|
|
return (T) reply.Payload!;
|
|
|
|
var error = !reply.ErrorName.IsNullOrEmpty() ? reply.ErrorName
|
|
: reply.Payload is String err && !err.IsNullOrEmpty() ? err
|
|
: "Got an error reply";
|
|
|
|
throw new Exception(error);
|
|
}
|
|
|
|
private Task<Message> SendMessageAndAwaitReply(Message msg)
|
|
{
|
|
// TODO: timeout?
|
|
|
|
if (msg.Type != MethodCall)
|
|
throw new ArgumentException($"Expected a message of type {MethodCall}, got {msg.Type}", nameof(Message.Type));
|
|
|
|
var reply = IncomingMessages
|
|
.FirstAsync(m => m.ReplySerial == msg.Serial &&
|
|
m.Type is MethodReturn or Error);
|
|
|
|
DispatchMessage(msg);
|
|
|
|
return reply.ToTask();
|
|
}
|
|
|
|
private IConnectableObservable<Message> InitIncoming()
|
|
{
|
|
var incomingMessages = Observable
|
|
.Repeat(0, TaskPoolScheduler.Default)
|
|
.TakeWhile(_ => Connected)
|
|
.Select(_ => ReadMessage())
|
|
.Publish();
|
|
|
|
incomingMessages.Connect().Apply(AddDisposable);
|
|
|
|
return incomingMessages;
|
|
}
|
|
|
|
private Subject<Message> InitOutgoing()
|
|
{
|
|
var outgoing = new Subject<Message>();
|
|
|
|
outgoing.Synchronize()
|
|
.ObserveOn(TaskPoolScheduler.Default)
|
|
.DumpErrors() // $$$
|
|
.Subscribe(SendMessage, e => throw e)
|
|
.Apply(AddDisposable);
|
|
|
|
return outgoing;
|
|
}
|
|
|
|
} |