using System.Runtime.CompilerServices;

namespace InnovEnergy.Lib.Utils;

/// <summary>
/// Extension methods that create and transform <see cref="IAsyncEnumerable{T}"/> sequences.
/// </summary>
public static class AsyncEnumerableEx
{
    /// <summary>
    /// Projects every element of <paramref name="ts"/> to a synchronous sequence and
    /// flattens the results into a single asynchronous sequence.
    /// </summary>
    /// <param name="ts">The asynchronous source sequence.</param>
    /// <param name="map">Maps one source element to the synchronous sequence of results it expands to.</param>
    /// <returns>The concatenation of the mapped sequences, in source order.</returns>
    public static IAsyncEnumerable<R> SelectMany<T, R>(this IAsyncEnumerable<T> ts, Func<T, IEnumerable<R>> map)
    {
        // The lambda returns an IAsyncEnumerable, so this call binds to a SelectMany overload
        // taking an asynchronous inner sequence (defined outside this file, presumably
        // System.Linq.Async) and does not recurse into this method.
        return ts.SelectMany(t => map(t).ToAsyncEnumerable());
    }

    /// <summary>
    /// Forwards an element only if no further element arrives within <paramref name="dueTime"/>
    /// after it; an element that is followed by another one inside that window is dropped.
    /// </summary>
    /// <remarks>
    /// When the source ends while an element is still waiting for its quiet period, that element
    /// is emitted immediately and the sequence completes. An exception thrown by the source is
    /// propagated to the consumer.
    /// </remarks>
    /// <param name="ts">The asynchronous source sequence.</param>
    /// <param name="dueTime">Quiet period that has to elapse after an element before it is forwarded.</param>
    /// <param name="cancel">Token handed to the source enumerator.</param>
    /// <returns>The throttled sequence.</returns>
    public static async IAsyncEnumerable<T> Throttle<T>(this IAsyncEnumerable<T> ts,
                                                        TimeSpan dueTime,
                                                        [EnumeratorCancellation] CancellationToken cancel = default)
    {
        var it = ts.GetAsyncEnumerator(cancel);

        // Tracks the most recent MoveNextAsync call. Starts out as an already completed task so
        // the finally block can tell "nothing in flight" without needing a null check.
        Task<Boolean> next = Task.FromResult(false);

        try
        {
            if (!await it.MoveNextAsync())
            {
                yield break;
            }

            while (true)
            {
                var current = it.Current;

                using var delayCts = new CancellationTokenSource();
                var delay = Task.Delay(dueTime, delayCts.Token);
                next = it.MoveNextAsync().AsTask();

                await Task.WhenAny(delay, next);

                if (delay.IsCompletedSuccessfully)
                {
                    // The quiet period elapsed: forward the element, then wait for the next one.
                    yield return current;

                    if (await next)
                    {
                        continue;
                    }

                    yield break; // source is exhausted
                }

                // The source produced a result before the quiet period elapsed,
                // so the timer is no longer needed.
                delayCts.Cancel();

                if (next.IsCanceled)
                {
                    yield break;
                }

                // Awaiting rethrows a source failure instead of silently ignoring it.
                if (!await next)
                {
                    // The source ended: flush the pending element and stop. Looping on would call
                    // MoveNextAsync on a finished enumerator over and over without ever returning.
                    yield return current;
                    yield break;
                }

                // Otherwise `current` has been superseded by a newer element; start over with it.
            }
        }
        finally
        {
            // If the consumer abandons the sequence while a MoveNextAsync call is still pending,
            // disposing is skipped: compiler-generated async iterators do not support DisposeAsync
            // while MoveNextAsync is in flight.
            if (next.IsCompleted)
            {
                await it.DisposeAsync();
            }
        }
    }

    /// <summary>
    /// Produces the numbers 0, 1, 2, ... and waits <paramref name="period"/> after each one.
    /// </summary>
    /// <remarks>
    /// The counter wraps around on overflow instead of throwing. If <paramref name="ct"/> is
    /// cancelled during the wait, the exception raised by <see cref="Task.Delay(TimeSpan, CancellationToken)"/>
    /// is propagated to the consumer.
    /// </remarks>
    /// <param name="period">Pause inserted after every emitted value.</param>
    /// <param name="ct">Token that stops the sequence.</param>
    /// <returns>An endless sequence of increasing counter values.</returns>
    public static async IAsyncEnumerable<Int64> Periodic(this TimeSpan period,
                                                         [EnumeratorCancellation] CancellationToken ct = default)
    {
        Int64 i = 0;

        while (!ct.IsCancellationRequested)
        {
            yield return i;

            unchecked
            {
                i++;
            }

            await Task.Delay(period, ct);
        }
    }

    /// <summary>
    /// Invokes <paramref name="action"/> for every element of <paramref name="ts"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this is an <c>async void</c> method. It returns to the caller as soon as the
    /// enumeration first has to wait, the caller cannot await its completion, and an exception
    /// thrown by the sequence or by <paramref name="action"/> cannot be caught by the caller.
    /// The signature is left untouched to stay compatible with existing call sites.
    /// </remarks>
    /// <param name="ts">The asynchronous source sequence.</param>
    /// <param name="action">Callback invoked once per element, in source order.</param>
    public static async void ForEach<T>(this IAsyncEnumerable<T> ts, Action<T> action)
    {
        await foreach (var t in ts)
        {
            action(t);
        }
    }

    // Repeat contains no await on purpose; silence the corresponding compiler warning for it only.
#pragma warning disable 1998
    /// <summary>
    /// Produces <paramref name="t"/> over and over until <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="t">The value to repeat.</param>
    /// <param name="ct">Token that ends the sequence; it is checked before every element.</param>
    /// <returns>A sequence that yields <paramref name="t"/> until cancellation is requested.</returns>
    public static async IAsyncEnumerable<T> Repeat<T>(this T t,
                                                      [EnumeratorCancellation] CancellationToken ct = default)
    {
        while (!ct.IsCancellationRequested)
        {
            yield return t;
        }
    }
#pragma warning restore 1998
}