using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using CliWrap;

namespace InnovEnergy.Lib.Utils.WIP;

using Data = IReadOnlyList<byte>;

/// <summary>
/// A CliWrap <see cref="PipeSource"/> that is fed by pushing byte chunks into it
/// through the <see cref="IObserver{T}"/> interface.
/// </summary>
/// <remarks>
/// Every chunk received via <see cref="OnNext"/> is written to (and flushed on) the
/// destination stream handed to <see cref="CopyToAsync"/>. <see cref="OnCompleted"/>
/// ends the copy normally; <see cref="OnError"/> faults it.
/// NOTE(review): the underlying subject is hot and does not buffer, so chunks pushed
/// while no <see cref="CopyToAsync"/> call is subscribed appear to be dropped —
/// confirm that callers only push after the pipe has been started.
/// </remarks>
public class ObserverPipeSource : PipeSource, IObserver<Data>
{
    // Read side: the shared, serialized view of the subject consumed by CopyToAsync.
    private readonly IObservable<Data> _Observable;

    // Write side: the subject that OnNext/OnError/OnCompleted forward to.
    private readonly IObserver<Data> _Observer;

    /// <summary>
    /// Creates the pipe source and wires the observer (write) side to the
    /// observable (read) side through a single subject.
    /// </summary>
    public ObserverPipeSource()
    {
        var subject = new Subject<Data>();

        _Observer   = subject;
        _Observable = subject
                     .Synchronize() // serialize notifications (make thread-safe)
                     .Publish()     // share one upstream subscription...
                     .RefCount();   // ...connected only while there are subscribers
    }

    /// <summary>
    /// Writes every pushed chunk to <paramref name="dest"/>, in arrival order, until
    /// the source completes, fails, or <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="dest">Stream that receives the pushed bytes; flushed after each chunk.</param>
    /// <param name="ct">Token that cancels the returned task and any pending write.</param>
    /// <returns>
    /// A task that completes when the source has completed and all chunks were written.
    /// It also completes successfully when the source completes without any chunk.
    /// </returns>
    public override Task CopyToAsync(Stream dest, CancellationToken ct = default)
    {
        return _Observable
              .Select(Push)
              .Concat()                      // run the writes strictly one after another
              .DefaultIfEmpty(Unit.Default)  // ToTask faults on an empty sequence; treat "no data" as a clean EOF
              .ToTask(ct);

        // Wraps the write of one chunk in a deferred observable; the write only
        // starts when Concat subscribes to it.
        IObservable<Unit> Push(Data data) => Observable.FromAsync(async () =>
        {
            if (ct.IsCancellationRequested)
                throw new Exception("Broken Pipe");

            await dest.WriteAsync(data.ToArray(), ct);
            await dest.FlushAsync(ct);

            return Unit.Default;
        });
    }

    /// <summary>Signals end of input to the underlying subject.</summary>
    public void OnCompleted() => _Observer.OnCompleted();

    /// <summary>Forwards <paramref name="error"/> to the underlying subject.</summary>
    /// <param name="error">The failure to propagate.</param>
    public void OnError(Exception error) => _Observer.OnError(error);

    /// <summary>Pushes one chunk of bytes into the pipe.</summary>
    /// <param name="value">The bytes to write to the destination stream.</param>
    public void OnNext(Data value) => _Observer.OnNext(value);
}