experiment "Observable Process", unstable

This commit is contained in:
ig 2023-05-04 09:45:32 +02:00
parent d981a66d59
commit 028ff7dcae
4 changed files with 240 additions and 0 deletions

View File

@ -0,0 +1,66 @@
using System.Buffers;
using System.Reactive.Subjects;
using CliWrap;
namespace InnovEnergy.Lib.Utils.WIP;
using Data = IReadOnlyList<Byte>;
public class ObservablePipeTarget : PipeTarget, IObservable<Data>
{
private readonly Subject<Data> _Data = new();
public override async Task CopyFromAsync(Stream source, CancellationToken ct = new CancellationToken())
{
using var buffer = MemoryPool<Byte>.Shared.Rent(81920);
while (true)
{
Int32 bytesRead;
try
{
bytesRead = await source
.ReadAsync(buffer.Memory, ct)
.ConfigureAwait(false);
}
catch (Exception e)
{
_Data.OnError(e);
break;
}
if (ct.IsCancellationRequested || bytesRead == 0)
{
_Data.OnCompleted();
break;
}
buffer.Memory[..bytesRead]
.ToArray()
.Apply(_Data.OnNext);
}
// var memory = buffer.Memory;
//
// return Observable
// .Repeat(Unit.Default)
// .TakeWhile(_ => !ct.IsCancellationRequested)
// .Select(async _ =>
// {
// var bytesRead = await source.ReadAsync(memory, ct);
// _Data.OnNext(memory[..bytesRead]);
// })
// .Select(t => t.ToObservable())
// .Merge(maxConcurrent: 1)
// .ToTask(ct);
}
public IDisposable Subscribe(IObserver<IReadOnlyList<Byte>> observer) => _Data.Subscribe(observer);
//public IDisposable Subscribe(IObserver<String> observer) => _Lines.Subscribe(observer);
}

View File

@ -0,0 +1,56 @@
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using CliWrap;
namespace InnovEnergy.Lib.Utils.WIP;
using Data = IReadOnlyList<Byte>;
public class ObservableProcess
{
private readonly Command _Command;
private readonly CancellationTokenSource _Kill = new CancellationTokenSource();
private readonly CancellationTokenSource _Interrupt = new CancellationTokenSource();
private readonly ObserverPipeSource _StdIn = new ObserverPipeSource();
private readonly ReadablePipeTarget _StdOut = new ReadablePipeTarget();
private readonly ReadablePipeTarget _StdErr = new ReadablePipeTarget();
public IObserver<Data> StdIn => _StdIn;
private readonly Subject<Int32> _Pid = new Subject<Int32>();
private readonly Subject<Int32> _ExitCode = new Subject<Int32>();
public Task<Int32> Pid { get; }
public Task<Int32> ExitCode { get; }
public Task<Data> Read(Int32 nBytes) => _StdOut.Read(nBytes);
public ObservableProcess(Command command)
{
_Command = command;
Pid = _Pid.ToTask();
ExitCode = _ExitCode.ToTask();
}
public void Start()
{
var commandTask = _Command
.WithStandardInputPipe(_StdIn)
.WithStandardOutputPipe(_StdOut)
.WithStandardErrorPipe(_StdErr)
.ExecuteAsync(_Kill.Token, _Interrupt.Token);
_Pid.OnNext(commandTask.ProcessId);
commandTask.Task
.ToObservable()
.Select(t => t.ExitCode)
.Subscribe(_ExitCode);
}
public void Kill() => _Kill.Cancel();
public void Interrupt() => _Interrupt.Cancel();
}

View File

@ -0,0 +1,55 @@
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using CliWrap;
namespace InnovEnergy.Lib.Utils.WIP;
using Data = IReadOnlyList<Byte>;
public class ObserverPipeSource : PipeSource, IObserver<Data>
{
private readonly IObservable<Data> _Observable;
private readonly IObserver<Data> _Observer;
public ObserverPipeSource()
{
var subject = new Subject<Data>();
_Observer = subject;
_Observable = subject
.Synchronize() // make thread-safe
.Publish()
.RefCount();
}
public override Task CopyToAsync(Stream dest, CancellationToken ct = new CancellationToken())
{
return _Observable
.Select(Push)
.Concat()
.ToTask(ct);
IObservable<Unit> Push(Data data) => Observable.FromAsync(async () =>
{
if (ct.IsCancellationRequested)
throw new Exception("Broken Pipe");
await dest.WriteAsync(data.ToArray(), ct);
await dest.FlushAsync(ct);
return Unit.Default;
});
}
public void OnCompleted() => _Observer.OnCompleted();
public void OnError(Exception error) => _Observer.OnError(error);
public void OnNext (Data value) => _Observer.OnNext(value);
}

View File

@ -0,0 +1,63 @@
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using CliWrap;
namespace InnovEnergy.Lib.Utils.WIP;
using Data = IReadOnlyList<Byte>;
public class ReadablePipeTarget : PipeTarget
{
private readonly ISubject<Int32> _Requests = new ReplaySubject<Int32>();
private readonly Subject<Data> _Replies = new Subject<Data>();
public async Task<Data> Read(Int32 nBytes)
{
var data = _Replies.FirstAsync();
_Requests.OnNext(nBytes);
return await data;
}
public override Task CopyFromAsync(Stream source, CancellationToken ct = new CancellationToken())
{
var replies = _Requests
.Synchronize()
.Select(Pull)
.Concat()
.Publish()
.RefCount();
replies.Subscribe(_Replies, ct);
return replies.ToTask(ct);
IObservable<Data> Pull(Int32 nToRead) => Observable.FromAsync(async () =>
{
if (ct.IsCancellationRequested)
throw new Exception("End of Stream");
var buffer = new Byte[nToRead];
var nReceived = 0;
do
{
var nRemaining = nToRead - nReceived;
var read = await source.ReadAsync(buffer, nReceived, nRemaining, ct);
if (read <= 0 || ct.IsCancellationRequested)
throw new Exception("End of Stream");
nReceived += read;
}
while (nReceived < nToRead);
return buffer;
});
}
}