using System.Buffers;
using System.Reactive.Subjects;
using CliWrap;

namespace InnovEnergy.Lib.Utils.WIP;

using Data = IReadOnlyList<Byte>;

/// <summary>
/// A CliWrap <see cref="PipeTarget"/> that republishes everything read from the piped
/// stream as an observable sequence of byte chunks.
/// </summary>
/// <remarks>
/// Each successful read is copied into a fresh array and pushed to subscribers.
/// The sequence completes when the stream ends or the copy is cancelled, and it
/// terminates with an error when reading the stream throws for any other reason.
/// </remarks>
public class ObservablePipeTarget : PipeTarget, IObservable<Data>
{
    // Minimum size of the rented read buffer (the pool may hand out a larger one).
    private const Int32 BufferSize = 81920;

    private readonly Subject<Data> _Data = new();

    /// <summary>
    /// Reads <paramref name="source"/> until it ends, the token is cancelled or a read fails,
    /// forwarding every chunk read to the subscribers.
    /// </summary>
    /// <param name="source">The stream to read from.</param>
    /// <param name="ct">Token that stops the copy; cancellation completes the sequence.</param>
    /// <returns>
    /// A task that finishes once the sequence has been terminated. Read failures are
    /// reported to subscribers via <c>OnError</c> and are not rethrown from this method.
    /// </returns>
    public override async Task CopyFromAsync(Stream source, CancellationToken ct = default)
    {
        using var buffer = MemoryPool<Byte>.Shared.Rent(BufferSize);

        while (true)
        {
            Int32 bytesRead;

            try
            {
                bytesRead = await source
                                 .ReadAsync(buffer.Memory, ct)
                                 .ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // A cancelled read is treated exactly like the cancellation check below:
                // the sequence completes instead of failing, regardless of timing.
                _Data.OnCompleted();
                break;
            }
            catch (Exception e)
            {
                _Data.OnError(e);
                break;
            }

            // bytesRead == 0 signals the end of the stream.
            if (ct.IsCancellationRequested || bytesRead == 0)
            {
                _Data.OnCompleted();
                break;
            }

            // The rented buffer is overwritten by the next read, so subscribers get a copy.
            _Data.OnNext(buffer.Memory[..bytesRead].ToArray());
        }
    }

    /// <summary>Subscribes <paramref name="observer"/> to the chunks read from the pipe.</summary>
    /// <param name="observer">The observer that receives the byte chunks.</param>
    /// <returns>A handle that removes the subscription when disposed.</returns>
    public IDisposable Subscribe(IObserver<Data> observer) => _Data.Subscribe(observer);
}