using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using CliWrap;

namespace InnovEnergy.Lib.Utils.WIP;

using Data = IReadOnlyList<Byte>;


public class ObserverPipeSource : PipeSource, IObserver<Data>
{
    private readonly IObservable<Data> _Observable;
    private readonly IObserver<Data>   _Observer;

    public ObserverPipeSource()
    {
        var subject = new Subject<Data>();

        _Observer   = subject;
        _Observable = subject
                     .Synchronize() // make thread-safe
                     .Publish()
                     .RefCount();
    }

    public override Task CopyToAsync(Stream dest, CancellationToken ct = new CancellationToken())
    {
        return _Observable
               .Select(Push)
               .Concat()
               .ToTask(ct);

        IObservable<Unit> Push(Data data) => Observable.FromAsync(async () =>
        {
            if (ct.IsCancellationRequested)
                throw new Exception("Broken Pipe");

            await dest.WriteAsync(data.ToArray(), ct);
            await dest.FlushAsync(ct);
            
            return Unit.Default;
        });
    }

    

    public void OnCompleted() => _Observer.OnCompleted();

    public void OnError(Exception error) => _Observer.OnError(error);
    
    public void OnNext (Data value) => _Observer.OnNext(value);

}