Innovenergy_trunk/csharp/Lib/Utils/WIP/ObservablePipeTarget.cs

65 lines
1.8 KiB
C#
Raw Normal View History

using System.Buffers;
using System.Reactive.Subjects;
using CliWrap;
namespace InnovEnergy.Lib.Utils.WIP;
using Data = IReadOnlyList<Byte>;
public class ObservablePipeTarget : PipeTarget, IObservable<Data>
{
private readonly Subject<Data> _Data = new();
public override async Task CopyFromAsync(Stream source, CancellationToken ct = new CancellationToken())
{
using var buffer = MemoryPool<Byte>.Shared.Rent(81920);
while (true)
{
Int32 bytesRead;
try
{
bytesRead = await source
.ReadAsync(buffer.Memory, ct)
.ConfigureAwait(false);
}
catch (Exception e)
{
_Data.OnError(e);
break;
}
if (ct.IsCancellationRequested || bytesRead == 0)
{
_Data.OnCompleted();
break;
}
buffer.Memory[..bytesRead]
.ToArray()
.Apply(_Data.OnNext);
}
// var memory = buffer.Memory;
//
// return Observable
// .Repeat(Unit.Default)
// .TakeWhile(_ => !ct.IsCancellationRequested)
// .Select(async _ =>
// {
// var bytesRead = await source.ReadAsync(memory, ct);
// _Data.OnNext(memory[..bytesRead]);
// })
// .Select(t => t.ToObservable())
// .Merge(maxConcurrent: 1)
// .ToTask(ct);
}
public IDisposable Subscribe(IObserver<IReadOnlyList<Byte>> observer) => _Data.Subscribe(observer);
//public IDisposable Subscribe(IObserver<String> observer) => _Lines.Subscribe(observer);
}