Innovenergy_trunk/csharp/Lib/Utils/AsyncEnumerableEx.cs

91 lines
2.3 KiB
C#

using System.Runtime.CompilerServices;
namespace InnovEnergy.Lib.Utils;
/// <summary>
/// Extension methods for producing and transforming <see cref="IAsyncEnumerable{T}"/> sequences.
/// </summary>
public static class AsyncEnumerableEx
{
    /// <summary>
    /// Projects every element of <paramref name="ts"/> to a synchronous sequence and flattens
    /// the results into a single asynchronous sequence.
    /// </summary>
    /// <param name="ts">The source sequence.</param>
    /// <param name="map">Maps one source element to the elements it expands to.</param>
    /// <returns>The concatenation of all mapped sequences, in source order.</returns>
    public static IAsyncEnumerable<R> SelectMany<T, R>(this IAsyncEnumerable<T> ts, Func<T, IEnumerable<R>> map)
        // Adapts each mapped IEnumerable and delegates to the SelectMany overload that
        // takes an IAsyncEnumerable-returning selector.
        => ts.SelectMany<T, R>(t => map(t).ToAsyncEnumerable());

    /// <summary>
    /// Emits an element of <paramref name="ts"/> only after no newer element has arrived for
    /// <paramref name="dueTime"/>; an element that is followed by another one inside that
    /// window is dropped in favour of the newer one.
    /// </summary>
    /// <remarks>
    /// <para>
    /// When the source completes while an element is still waiting for its quiet window to
    /// elapse, that element is emitted immediately and the sequence ends.
    /// </para>
    /// <para>
    /// A cancellation observed while an element is waiting inside its window ends the sequence
    /// without an exception. A cancellation observed after the waiting element has already been
    /// emitted surfaces as the exception thrown by the pending <c>MoveNextAsync</c> call.
    /// Any other exception thrown by the source is propagated to the consumer.
    /// </para>
    /// </remarks>
    /// <param name="ts">The source sequence.</param>
    /// <param name="dueTime">Length of the quiet window that must pass before an element is emitted.</param>
    /// <param name="cancel">Token handed to the source enumerator.</param>
    /// <returns>The throttled sequence.</returns>
    public static async IAsyncEnumerable<T> Throttle<T>(this IAsyncEnumerable<T> ts,
                                                        TimeSpan dueTime,
                                                        [EnumeratorCancellation] CancellationToken cancel = default)
    {
        var it = ts.GetAsyncEnumerator(cancel);

        // Most recently started MoveNextAsync call. Starts out as an already completed
        // placeholder so the finally block can always inspect it.
        Task<Boolean> next = Task.FromResult(true);

        try
        {
            if (!await it.MoveNextAsync())
                yield break;

            while (true)
            {
                var current = it.Current;

                using var delayCts = new CancellationTokenSource();
                var delay = Task.Delay(dueTime, delayCts.Token);
                next = it.MoveNextAsync().AsTask();

                await Task.WhenAny(delay, next);

                if (delay.IsCompletedSuccessfully)
                {
                    // The quiet window elapsed without being interrupted: emit.
                    yield return current;

                    if (await next)
                        continue;

                    // The source finished after its last element was emitted.
                    yield break;
                }

                // The source produced an outcome inside the window; the timer is obsolete.
                delayCts.Cancel();

                if (next.IsCanceled)
                    yield break;

                // Awaiting rethrows a source fault instead of silently looping on a dead enumerator.
                if (await next)
                    continue; // a newer element supersedes 'current'

                // The source completed inside the window: flush the pending element and stop.
                yield return current;
                yield break;
            }
        }
        finally
        {
            // Disposing an enumerator while one of its MoveNextAsync calls is still running is
            // not supported. That can only be the case when the consumer abandons the sequence
            // right after an emit, so disposal is then deferred until the call has settled.
            if (next.IsCompleted)
                await it.DisposeAsync();
            else
                _ = DisposeAfter(it, next);
        }
    }

    /// <summary>Disposes <paramref name="enumerator"/> once <paramref name="pendingMove"/> has finished.</summary>
    private static async Task DisposeAfter<T>(IAsyncEnumerator<T> enumerator, Task pendingMove)
    {
        try
        {
            await pendingMove.ConfigureAwait(false);
        }
        catch (Exception)
        {
            // The consumer has already left; the outcome of the abandoned call is irrelevant
            // and only awaited so that the enumerator is idle before it is disposed.
        }

        await enumerator.DisposeAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Produces the counter values 0, 1, 2, ... with a pause of <paramref name="period"/>
    /// after each value. The first value is produced without any initial pause.
    /// </summary>
    /// <remarks>
    /// The counter wraps around on overflow instead of throwing. The loop ends quietly when
    /// cancellation is detected before a value is produced; a cancellation that happens during
    /// the pause surfaces as the exception thrown by <see cref="Task.Delay(TimeSpan, CancellationToken)"/>.
    /// </remarks>
    /// <param name="period">Pause between two consecutive values.</param>
    /// <param name="ct">Token that stops the sequence.</param>
    public static async IAsyncEnumerable<Int64> Periodic(this TimeSpan period, [EnumeratorCancellation] CancellationToken ct = default)
    {
        Int64 i = 0;

        while (!ct.IsCancellationRequested)
        {
            yield return i;
            unchecked { i++; }
            await Task.Delay(period, ct);
        }
    }

    /// <summary>
    /// Starts consuming <paramref name="ts"/> and invokes <paramref name="action"/> for every element.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this is an <c>async void</c> method. The caller gets control back at the
    /// first incomplete await, cannot wait for the enumeration to finish, and cannot catch
    /// exceptions thrown by the source or by <paramref name="action"/>. The signature is kept
    /// because existing callers depend on it.
    /// </remarks>
    /// <param name="ts">The sequence to consume.</param>
    /// <param name="action">Callback invoked once per element, in order.</param>
    public static async void ForEach<T>(this IAsyncEnumerable<T> ts, Action<T> action)
    {
        await foreach (var t in ts)
        {
            action(t);
        }
    }

    // CS1998: Repeat is an async iterator that intentionally contains no await.
#pragma warning disable 1998
    /// <summary>
    /// Produces <paramref name="t"/> over and over until cancellation is requested on <paramref name="ct"/>.
    /// </summary>
    /// <param name="t">The value to repeat.</param>
    /// <param name="ct">Token that ends the sequence; it is checked before every element.</param>
    public static async IAsyncEnumerable<T> Repeat<T>(this T t, [EnumeratorCancellation] CancellationToken ct = default)
    {
        while (!ct.IsCancellationRequested)
        {
            yield return t;
        }
    }
}