Innovenergy_trunk/csharp/Lib/Protocols/Modbus/Channels/PushToPullHelper.cs

42 lines
1.2 KiB
C#

using System.Reactive.Linq;
using InnovEnergy.Lib.Utils;
namespace InnovEnergy.Lib.Protocols.Modbus.Channels;
/// <summary>
/// Bridges a push-based <see cref="IObservable{T}"/> to a pull-based, awaitable read function.
/// </summary>
public static class PushToPullHelper
{
    // TODO: still a stop-gap design (unbounded buffer, subscriptions are never disposed,
    //       concurrent readers are not coordinated with each other) - improve
    /// <summary>
    /// Buffers every element pushed by <paramref name="src"/> in a FIFO queue and returns a
    /// function that pulls a requested number of elements out of that queue.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <param name="src">The push-based sequence whose elements are buffered.</param>
    /// <returns>
    /// A function taking the number of elements <c>n</c> to read. The returned task completes
    /// with exactly <c>n</c> elements in arrival order, waiting asynchronously until that many
    /// are buffered. The task faults with <see cref="System.IO.EndOfStreamException"/> when the
    /// source completes before <c>n</c> elements are available, with
    /// <see cref="ArgumentOutOfRangeException"/> when <c>n</c> is negative, and with the
    /// source's own exception when the source fails while the read is waiting.
    /// </returns>
    public static Func<Int32, Task<IReadOnlyList<T>>> PushToPull<T>(this IObservable<T> src)
    {
        // Queue<T> is not thread-safe; the source may push from a different thread than the
        // one calling Read, so every access to the queue goes through this lock.
        var gate   = new Object();
        var buffer = new Queue<T>();

        Int32 Buffered()
        {
            lock (gate)
            {
                return buffer.Count;
            }
        }

        // Enqueue each element and emit the resulting buffer size as one atomic step.
        // Publish + RefCount shares a single upstream subscription between all observers,
        // so each element is enqueued once no matter how many reads are waiting.
        var nAvailable = src
            .Select(item =>
            {
                lock (gate)
                {
                    buffer.Enqueue(item);
                    return buffer.Count;
                }
            })
            .Publish()
            .RefCount();

        // SelectError / WriteLine are project helpers; presumably this logs source errors.
        // NOTE(review): this long-lived subscription also appears to be what keeps the
        // RefCount connection open between reads - confirm before changing it.
        nAvailable.SelectError()
                  .Subscribe(e => e.WriteLine());

        async Task<IReadOnlyList<T>> Read(Int32 n)
        {
            // NOTE(review): looks like leftover trace output; kept so console output is unchanged.
            Console.WriteLine($"requesting {n}");

            if (n < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(n), n, "Number of elements to read must not be negative");
            }

            var available = Buffered();

            if (available < n)
            {
                // Wait for the buffer to reach n elements. The deferred second source samples
                // the buffer size only once it is subscribed, which closes the window where an
                // element arrives after the check above but before we listen to nAvailable
                // (that notification would otherwise be lost and the read could hang).
                // NOTE(review): relies on Merge subscribing to nAvailable before the deferred
                // source - confirm against the Rx version in use.
                // FirstOrDefaultAsync yields 0 when the source completes without reaching n.
                available = await nAvailable
                    .Merge(Observable.Defer(() => Observable.Return(Buffered())))
                    .FirstOrDefaultAsync(a => a >= n);
            }

            if (available < n)
            {
                throw new System.IO.EndOfStreamException("Connection closed");
            }

            var result = new T[n];

            lock (gate)
            {
                for (var i = 0; i < n; i++)
                {
                    result[i] = buffer.Dequeue();
                }
            }

            return result;
        }

        return Read;
    }
}