using System.Reactive.Linq;
using InnovEnergy.Lib.Utils;

namespace InnovEnergy.Lib.Protocols.Modbus.Channels;

/// <summary>
/// Adapts a push-based <see cref="IObservable{T}"/> into a pull-based, awaitable read function.
/// </summary>
public static class PushToPullHelper
{
    // TODO: this is incredibly hacky, improve

    /// <summary>
    /// Buffers every element pushed by <paramref name="src"/> in an internal FIFO queue and
    /// returns a function that asynchronously pulls a requested number of elements from it.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <param name="src">The push-based source whose elements are buffered.</param>
    /// <returns>
    /// A function <c>Read(n)</c> that completes with exactly <c>n</c> elements, in arrival order,
    /// as soon as at least <c>n</c> elements are buffered. The returned task faults with
    /// <see cref="System.IO.IOException"/> ("Connection closed") if the source completes before
    /// enough elements arrived, with <see cref="ArgumentOutOfRangeException"/> if <c>n</c> is
    /// negative, and with the source's own error if the source fails while a read is waiting.
    /// </returns>
    /// <remarks>
    /// NOTE(review): the buffer is a plain <see cref="Queue{T}"/> without synchronization. This
    /// assumes that notifications from <paramref name="src"/> and calls to the returned function
    /// never run concurrently, and that at most one read is pending at a time - confirm against
    /// the callers.
    /// NOTE(review): the container type of the result (<see cref="IReadOnlyList{T}"/>) should be
    /// confirmed against the call sites.
    /// </remarks>
    public static Func<Int32, Task<IReadOnlyList<T>>> PushToPull<T>(this IObservable<T> src)
    {
        var buffer = new Queue<T>();

        // Every element is enqueued first; the number of buffered elements is then published.
        // Publish().RefCount() shares a single subscription to src between all subscribers.
        var nAvailable = src
            .Do(buffer.Enqueue)
            .Select(_ => buffer.Count)
            .Publish()
            .RefCount();

        // This subscription is never disposed. Presumably (SelectError / WriteLine are helpers
        // from InnovEnergy.Lib.Utils, not visible here) it logs errors raised by src; it also
        // keeps the ref-counted connection to src alive while no read is waiting, so that
        // elements keep being buffered in between reads.
        nAvailable.SelectError()
                  .Subscribe(e => e.WriteLine());

        async Task<IReadOnlyList<T>> Read(Int32 n)
        {
            Console.WriteLine($"requesting {n}");

            if (n < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(n), n, "The number of elements to read must not be negative.");
            }

            var available = buffer.Count;

            if (available < n)
            {
                // Wait for the first notification that reports enough buffered elements.
                // FirstOrDefaultAsync yields default(Int32) == 0 when the sequence completes
                // without such a notification, which is detected by the check below.
                available = await nAvailable.FirstOrDefaultAsync(a => a >= n);
            }

            if (available < n)
            {
                throw new System.IO.IOException("Connection closed");
            }

            // Drain exactly n elements in FIFO order.
            var items = new T[n];

            for (var i = 0; i < n; i++)
            {
                items[i] = buffer.Dequeue();
            }

            return items;
        }

        return Read;
    }
}