Innovenergy_trunk/csharp/Lib/Utils/WIP/ObserverPipeSource.cs

55 lines
1.4 KiB
C#

using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using CliWrap;
namespace InnovEnergy.Lib.Utils.WIP;
using Data = IReadOnlyList<Byte>;
public class ObserverPipeSource : PipeSource, IObserver<Data>
{
private readonly IObservable<Data> _Observable;
private readonly IObserver<Data> _Observer;
public ObserverPipeSource()
{
var subject = new Subject<Data>();
_Observer = subject;
_Observable = subject
.Synchronize() // make thread-safe
.Publish()
.RefCount();
}
public override Task CopyToAsync(Stream dest, CancellationToken ct = new CancellationToken())
{
return _Observable
.Select(Push)
.Concat()
.ToTask(ct);
IObservable<Unit> Push(Data data) => Observable.FromAsync(async () =>
{
if (ct.IsCancellationRequested)
throw new Exception("Broken Pipe");
await dest.WriteAsync(data.ToArray(), ct);
await dest.FlushAsync(ct);
return Unit.Default;
});
}
public void OnCompleted() => _Observer.OnCompleted();
public void OnError(Exception error) => _Observer.OnError(error);
public void OnNext (Data value) => _Observer.OnNext(value);
}