Innovenergy_trunk/csharp/Lib/Utils/WIP/ReadablePipeTarget.cs

63 lines
1.6 KiB
C#

using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using CliWrap;
namespace InnovEnergy.Lib.Utils.WIP;
using Data = IReadOnlyList<Byte>;
/// <summary>
/// A CliWrap <see cref="PipeTarget"/> that lets a consumer pull an exact number of
/// bytes from the piped stream on demand instead of having the data pushed to it.
/// </summary>
/// <remarks>
/// <see cref="Read"/> queues a request; <see cref="CopyFromAsync"/> serves the queued
/// requests one at a time, in order, from the source stream and publishes every
/// filled buffer as a reply.
/// NOTE(review): each pending <see cref="Read"/> takes the next published reply, so
/// overlapping calls would appear to receive the same buffer — confirm that callers
/// read sequentially.
/// </remarks>
public class ReadablePipeTarget : PipeTarget
{
    // Replaying subject: requests issued before CopyFromAsync subscribes are still served.
    private readonly ISubject<Int32> _Requests = new ReplaySubject<Int32>();

    // Plain subject: a reply only reaches observers that are subscribed when it is published.
    private readonly Subject<Data> _Replies = new Subject<Data>();

    /// <summary>
    /// Requests exactly <paramref name="nBytes"/> bytes from the piped stream.
    /// </summary>
    /// <param name="nBytes">Number of bytes to read; must not be negative.</param>
    /// <returns>A buffer holding exactly <paramref name="nBytes"/> bytes.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="nBytes"/> is negative.
    /// </exception>
    public async Task<Data> Read(Int32 nBytes)
    {
        // Reject bad sizes here: inside the pipeline the failure would terminate
        // the reply stream for every later reader as well.
        if (nBytes < 0)
            throw new ArgumentOutOfRangeException(nameof(nBytes), nBytes, "Number of bytes to read must not be negative.");

        // FirstAsync() is cold and would only subscribe when awaited, i.e. AFTER the
        // request below has been issued. If the stream read completes synchronously,
        // the reply would be published before anyone listens and be lost.
        // ToTask() subscribes right away, so the reply cannot be missed.
        var reply = _Replies.FirstAsync().ToTask();

        _Requests.OnNext(nBytes);

        return await reply;
    }

    /// <summary>
    /// Serves the queued read requests from <paramref name="source"/> and forwards
    /// each filled buffer (or the terminating error) to the pending readers.
    /// </summary>
    /// <param name="source">Stream to read the requested bytes from.</param>
    /// <param name="ct">Token that cancels the reads and the returned task.</param>
    /// <returns>
    /// A task tracking the request pipeline. Nothing in this class completes the
    /// request subject, so the task is expected to end only by fault or cancellation.
    /// </returns>
    public override Task CopyFromAsync(Stream source, CancellationToken ct = new CancellationToken())
    {
        var replies = _Requests
            .Synchronize()   // serialize request notifications
            .Select(Pull)
            .Concat()        // one read at a time, in request order
            .Publish()
            .RefCount();     // both subscriptions below share a single pipeline

        replies.Subscribe(_Replies, ct);
        return replies.ToTask(ct);

        // Reads exactly nToRead bytes from the source; the read starts on subscription.
        IObservable<Data> Pull(Int32 nToRead) => Observable.FromAsync(async () =>
        {
            ct.ThrowIfCancellationRequested();

            var buffer = new Byte[nToRead];
            var nReceived = 0;

            // 'while' rather than 'do/while': a zero-byte request must yield an empty
            // buffer instead of being mistaken for the end of the stream.
            while (nReceived < nToRead)
            {
                var read = await source.ReadAsync(buffer, nReceived, nToRead - nReceived, ct);

                if (read <= 0)
                    throw new EndOfStreamException("End of Stream");

                ct.ThrowIfCancellationRequested();

                nReceived += read;
            }

            return buffer;
        });
    }
}