using System.Diagnostics;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace InnovEnergy.Lib.Utils;

// NOTE(review): the generic parameter lists in this file were reconstructed from how
// each method body uses its types — confirm arity and order against existing callers.

/// <summary>
/// Helper and extension methods for composing <see cref="IObservable{T}"/> sequences.
/// </summary>
public static class ObservableUtils
{
    /// <summary>
    /// Projects each element with <paramref name="map"/>; elements for which
    /// <paramref name="map"/> throws are dropped instead of faulting the sequence.
    /// </summary>
    /// <param name="source">The sequence to project.</param>
    /// <param name="map">The projection; any exception it throws is swallowed.</param>
    /// <returns>The successfully projected elements; errors and completion of
    /// <paramref name="source"/> are forwarded unchanged.</returns>
    public static IObservable<R> TrySelect<T, R>(this IObservable<T> source, Func<T, R> map)
    {
        return Observable.Create<R>(o =>
        {
            void OnNext(T next)
            {
                R result;

                try
                {
                    result = map(next);
                }
                catch
                {
                    // Deliberate best effort: a failing projection skips this element.
                    return;
                }

                // Kept outside the try so exceptions raised downstream are not swallowed.
                o.OnNext(result);
            }

            return source.Subscribe(OnNext, o.OnError, o.OnCompleted);
        });
    }

    /// <summary>
    /// Yields the first error reported by any of <paramref name="sources"/>
    /// (see <see cref="SelectError{TSource}"/>), then completes.
    /// </summary>
    public static IObservable<Exception> MergeErrors<T>(params IObservable<T>[] sources)
    {
        return Observable
              .Merge(sources.Select(s => s.SelectError()))
              .Take(1);
    }

    /// <summary>
    /// Yields the first error reported by either <paramref name="fst"/> or
    /// <paramref name="snd"/> (see <see cref="SelectError{TSource}"/>), then completes.
    /// </summary>
    public static IObservable<Exception> MergeErrors<T1, T2>(this IObservable<T1> fst, IObservable<T2> snd)
    {
        return Observable
              .Merge(fst.SelectError(), snd.SelectError())
              .Take(1);
    }

    /// <summary>
    /// Turns a sequence of exceptions into one that also delivers its terminal
    /// error as a regular element. The completion notification is filtered out of the
    /// materialized stream, so only values and the error are emitted.
    /// </summary>
    public static IObservable<Exception> SelectErrors(this IObservable<Exception> source)
    {
        return source
              .Materialize()
              .Where(n => n.Kind != NotificationKind.OnCompleted)
              .Select(n => n.Kind == NotificationKind.OnError
                         ? n.Exception!
                         : n.Value);
    }

    /// <summary>
    /// Extracts a single error from <paramref name="source"/>.
    /// If <typeparamref name="TSource"/> is itself an exception type, the first element
    /// is taken as the error; otherwise the terminal OnError notification is
    /// delivered as an element.
    /// </summary>
    public static IObservable<Exception> SelectError<TSource>(this IObservable<TSource> source)
    {
        if (typeof(TSource).IsAssignableTo(typeof(Exception)))
        {
            // The elements already are exceptions: the first one is "the" error.
            return source
                  .Select(s => (s as Exception)!)
                  .Take(1);
        }

        return source
              .Materialize()
              .Where(n => n.Kind == NotificationKind.OnError)
              .Select(n => n.Exception!);
    }

    /// <summary>
    /// Runs <paramref name="onDispose"/> each time a subscription to the returned
    /// sequence is disposed, right after the underlying subscription has been disposed.
    /// </summary>
    /// <param name="source">The sequence to wrap.</param>
    /// <param name="onDispose">Callback invoked on disposal. Invocations are serialized.</param>
    public static IObservable<T> OnDispose<T>(this IObservable<T> source, Action onDispose)
    {
        // Private gate: never lock on an object handed in by the caller, since
        // outside code could lock on the same instance.
        var gate = new Object();

        return Observable.Create<T>(observer =>
        {
            var subscription = source.Subscribe(observer);

            return () =>
            {
                lock (gate)
                {
                    subscription.Dispose();
                    onDispose();
                }
            };
        });
    }

    /// <summary>
    /// Runs <paramref name="onSubscribe"/> for every subscription to the returned sequence.
    /// The callback is invoked after the subscription to <paramref name="source"/>
    /// has been made, so elements emitted synchronously during subscribe precede it.
    /// </summary>
    public static IObservable<T> OnSubscribe<T>(this IObservable<T> source, Action onSubscribe)
    {
        return Observable.Create<T>(observer =>
        {
            var subscription = source.Subscribe(observer);
            onSubscribe();

            return () => subscription.Dispose();
        });
    }

    /// <summary>
    /// Produces an endless sequence by invoking <paramref name="func"/> once per
    /// iteration of an infinite repeat driven by <paramref name="scheduler"/>.
    /// </summary>
    public static IObservable<T> Repeat<T>(this IScheduler scheduler, Func<T> func)
    {
        return Observable
              .Repeat(0, scheduler)   // the repeated value is a dummy tick
              .Select(_ => func());
    }

    /// <summary>
    /// Defers <paramref name="observableFactory"/> until subscription, takes the first
    /// element of the sequence it returns, and shares it through
    /// Replay(1).RefCount().
    /// </summary>
    public static IObservable<T> LazyAsync<T>(Func<IObservable<T>> observableFactory)
    {
        IObservable<T> FromObservable() => observableFactory().FirstAsync();

        return Observable
              .Defer(FromObservable)
              .Replay(1)
              .RefCount();
    }

    /// <summary>
    /// Defers <paramref name="taskFactory"/> until subscription and shares the task's
    /// result through Replay(1).RefCount().
    /// </summary>
    public static IObservable<T> LazyAsync<T>(Func<Task<T>> taskFactory)
    {
        IObservable<T> FromAsync() => Observable.FromAsync(taskFactory);

        return Observable
              .Defer(FromAsync)
              .Replay(1)
              .RefCount();
    }

    // public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
    //                                                  TimeSpan duration,
    //                                                  IScheduler scheduler)
    // {
    //     scheduler ??= Scheduler.Default;
    //
    //     return source
    //           .GroupByUntil(k => k, _ => Observable.Timer(duration, scheduler))
    //           .SelectMany(Observable.FirstAsync);
    // }
    //
    //
    // public static IObservable<T> ThrottleDistinct<T, K>(this IObservable<T> source,
    //                                                     Func<T, K> keySelector,
    //                                                     TimeSpan duration,
    //                                                     IScheduler scheduler)
    // {
    //     scheduler ??= Scheduler.Default;
    //
    //     return source
    //           .GroupByUntil(keySelector, _ => Observable.Timer(duration, scheduler))
    //           .SelectMany(Observable.FirstAsync);
    // }

    /// <summary>
    /// Suppresses elements whose key was already let through less than
    /// <paramref name="duration"/> ago. An element passes only if its key is not
    /// currently remembered; the key is then remembered for <paramref name="duration"/>.
    /// </summary>
    /// <param name="source">The sequence to filter.</param>
    /// <param name="keySelector">Computes the key used to detect repeats.</param>
    /// <param name="duration">How long a key stays remembered after it passed.</param>
    /// <param name="scheduler">Scheduler used to forget keys; defaults to
    /// <see cref="Scheduler.Default"/>.</param>
    public static IObservable<T> ThrottleDistinct<T, K>(this IObservable<T> source,
                                                        Func<T, K> keySelector,
                                                        TimeSpan duration,
                                                        IScheduler? scheduler = null)
    {
        var sched = scheduler ?? Scheduler.Default;

        // Defer so that every subscription gets its own key set. With a single set
        // shared by all subscribers, each element would be let through for the first
        // subscriber only and suppressed for all the others.
        return Observable.Defer(() =>
        {
            var seen = new HashSet<K>();

            Boolean Add(K key)
            {
                lock (seen)
                    return seen.Add(key);
            }

            void Remove(K key)
            {
                lock (seen)
                {
                    var removed = seen.Remove(key);
                    Debug.Assert(removed);
                }
            }

            Boolean Filter(K key)
            {
                var added = Add(key);

                if (added)
                    sched.Schedule(duration, _ => Remove(key)); // forget the key once the window elapsed

                return added;
            }

            return source.Where(t => Filter(keySelector(t)));
        });
    }

    /// <summary>
    /// Suppresses elements that are equal to one already let through less than
    /// <paramref name="duration"/> ago; the element itself is used as the key.
    /// </summary>
    public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
                                                     TimeSpan duration,
                                                     IScheduler? scheduler = null)
    {
        return source.ThrottleDistinct(t => t, duration, scheduler);
    }

    /// <summary>
    /// Splits <paramref name="src"/> into buffers, closing the current buffer whenever
    /// the key computed by <paramref name="keySelector"/> differs from the previous one.
    /// </summary>
    public static IObservable<IList<T>> BufferBy<T, K>(this IObservable<T> src, Func<T, K> keySelector)
    {
        // Publish shares one subscription to src between the boundary
        // detection and the buffering.
        return src.Publish(s =>
        {
            var boundaries = s.Select(keySelector).DistinctUntilChanged();
            return s.Buffer(boundaries);
        });
    }

    /// <summary>
    /// Emits the running sum of <paramref name="source"/>, starting from
    /// <paramref name="initConst"/>.
    /// </summary>
    public static IObservable<Double> Integrate(this IObservable<Double> source, Double initConst = 0)
    {
        return source.Scan(initConst, (x, y) => x + y);
    }

    /// <summary>
    /// Like <see cref="Integrate"/>, but the accumulated value is passed through
    /// <see cref="Math.Tanh(Double)"/> after every step, which keeps every emitted
    /// value within [-1, 1].
    /// </summary>
    public static IObservable<Double> IntegrateNormalize(this IObservable<Double> source, Double initConst = 0)
    {
        return source.Scan(initConst, (x, y) => Math.Tanh(x + y));
    }

    // TODO
    // public static IObservable<Double> MovingAverage(this IObservable<Double> source, Int32 windowSize)
    // {
    //     if (windowSize < 1)
    //         throw new ArgumentException(nameof(windowSize) + " must be positive", nameof(windowSize));
    //
    //     using var e = source.GetEnumerator();
    //
    //     if (!e.MoveNext()) yield break;
    //
    //     var state = e.Current;
    //
    //     do
    //     {
    //         state = ((windowSize - 1) * state + e.Current) / windowSize;
    //         yield return state;
    //     }
    //     while (e.MoveNext());
    // }
}