85 lines
2.1 KiB
C#
85 lines
2.1 KiB
C#
using System.Runtime.CompilerServices;
|
|
|
|
namespace InnovEnergy.Lib.Utils;
|
|
|
|
public static class AsyncEnumerableEx
|
|
{
|
|
|
|
public static IAsyncEnumerable<R> SelectMany<T, R>(this IAsyncEnumerable<T> ts, Func<T, IEnumerable<R>> map)
|
|
{
|
|
return ts.SelectMany<T, R>(t => map(t).ToAsyncEnumerable());
|
|
}
|
|
|
|
|
|
public static async IAsyncEnumerable<T> Throttle<T>(this IAsyncEnumerable<T> ts,
|
|
TimeSpan dueTime,
|
|
[EnumeratorCancellation] CancellationToken cancel = default)
|
|
{
|
|
var it = ts.GetAsyncEnumerator(cancel);
|
|
|
|
if(! await it.MoveNextAsync())
|
|
yield break;
|
|
|
|
while (true)
|
|
{
|
|
var current = it.Current;
|
|
|
|
using var delayCts = new CancellationTokenSource();
|
|
|
|
var delay = Task.Delay(dueTime, delayCts.Token);
|
|
var next = it.MoveNextAsync().AsTask();
|
|
|
|
await Task.WhenAny(delay, next);
|
|
|
|
// if (next.IsCompletedSuccessfully)
|
|
// {
|
|
// delayCts.Cancel();
|
|
// }
|
|
// else if (next.IsCanceled)
|
|
// {
|
|
// delayCts.Cancel();
|
|
// yield break;
|
|
// }
|
|
// else
|
|
// {
|
|
// yield return current;
|
|
// await next;
|
|
// }
|
|
|
|
if (delay.IsCompletedSuccessfully)
|
|
{
|
|
yield return current;
|
|
if (await next)
|
|
continue;
|
|
}
|
|
|
|
delayCts.Cancel();
|
|
|
|
if (next.IsCanceled)
|
|
yield break;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
public static async IAsyncEnumerable<Int64> Periodic(this TimeSpan period, [EnumeratorCancellation] CancellationToken ct = default)
|
|
{
|
|
Int64 i = 0;
|
|
while(!ct.IsCancellationRequested)
|
|
{
|
|
yield return i;
|
|
unchecked { i++; }
|
|
await Task.Delay(period, ct);
|
|
}
|
|
}
|
|
|
|
#pragma warning disable 1998
|
|
public static async IAsyncEnumerable<T> Repeat<T>(this T t, [EnumeratorCancellation] CancellationToken ct = default)
|
|
{
|
|
while(!ct.IsCancellationRequested)
|
|
{
|
|
yield return t;
|
|
}
|
|
}
|
|
|
|
} |