63 lines
1.6 KiB
C#
63 lines
1.6 KiB
C#
|
using System.Reactive.Linq;
|
||
|
using System.Reactive.Subjects;
|
||
|
using System.Reactive.Threading.Tasks;
|
||
|
using CliWrap;
|
||
|
|
||
|
namespace InnovEnergy.Lib.Utils.WIP;
|
||
|
|
||
|
using Data = IReadOnlyList<Byte>;
|
||
|
|
||
|
|
||
|
/// <summary>
/// A <see cref="PipeTarget"/> that is consumed on demand: each call to <see cref="Read"/>
/// requests an exact number of bytes, and <see cref="CopyFromAsync"/> serves those requests
/// from the piped stream, one at a time and in the order they were issued.
/// </summary>
/// <remarks>
/// Every pending <see cref="Read"/> takes the next reply that is published, so overlapping
/// calls would observe the same reply. Callers are expected to await one read before issuing
/// the next.
/// </remarks>
public class ReadablePipeTarget : PipeTarget
{
    // A ReplaySubject buffers requests, so reads issued before CopyFromAsync has subscribed
    // are still delivered once it does.
    private readonly ISubject<Int32> _Requests = new ReplaySubject<Int32>();
    private readonly Subject<Data>   _Replies  = new Subject<Data>();

    /// <summary>Requests exactly <paramref name="nBytes"/> bytes from the piped stream.</summary>
    /// <param name="nBytes">Number of bytes to read; must not be negative. Zero yields an empty result.</param>
    /// <returns>A task producing the bytes read, with exactly <paramref name="nBytes"/> elements.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="nBytes"/> is negative.</exception>
    /// <exception cref="EndOfStreamException">The stream ended before the request could be satisfied.</exception>
    public async Task<Data> Read(Int32 nBytes)
    {
        // Reject bad sizes here: a failure inside the shared reader would be forwarded to
        // _Replies and terminate it for every later caller.
        if (nBytes < 0)
            throw new ArgumentOutOfRangeException(nameof(nBytes), nBytes, "Number of bytes to read must not be negative.");

        // ToTask() subscribes to _Replies immediately. Awaiting the observable directly would
        // subscribe only at the await, i.e. after the request has been published, so a reply
        // produced in between would be missed (Subject does not replay).
        var reply = _Replies.FirstAsync().ToTask();

        _Requests.OnNext(nBytes);

        return await reply;
    }

    /// <summary>
    /// Serves the queued read requests from <paramref name="source"/> and publishes each
    /// filled buffer as a reply.
    /// </summary>
    /// <param name="source">The stream to read from.</param>
    /// <param name="ct">Cancels the reply subscription, the returned task and any read in flight.</param>
    /// <returns>
    /// A task tracking the reply sequence. Nothing in this class completes the request subject,
    /// so the task ends by faulting (e.g. end of stream) or by cancellation.
    /// </returns>
    public override Task CopyFromAsync(Stream source, CancellationToken ct = new CancellationToken())
    {
        var replies = _Requests
                     .Synchronize()   // serialize requests arriving from different threads
                     .Select(Pull)
                     .Concat()        // run one read at a time, preserving request order
                     .Publish()
                     .RefCount();     // share a single reader between both subscriptions below

        replies.Subscribe(_Replies, ct);

        return replies.ToTask(ct);

        // Reads exactly nToRead bytes from source, looping over partial reads.
        IObservable<Data> Pull(Int32 nToRead) => Observable.FromAsync(async () =>
        {
            if (ct.IsCancellationRequested)
                throw new EndOfStreamException("End of Stream");

            var buffer    = new Byte[nToRead];
            var nReceived = 0;

            // A while loop (not do-while) lets a zero-byte request complete with an empty
            // buffer instead of being mistaken for end of stream.
            while (nReceived < nToRead)
            {
                var nRemaining = nToRead - nReceived;
                var read       = await source.ReadAsync(buffer, nReceived, nRemaining, ct);

                // A non-positive count means the stream has no more data.
                if (read <= 0 || ct.IsCancellationRequested)
                    throw new EndOfStreamException("End of Stream");

                nReceived += read;
            }

            return buffer;
        });
    }
}
|