using System.Diagnostics;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace InnovEnergy.Lib.Utils;

/// <summary>
/// Helper and extension methods for composing <see cref="IObservable{T}"/> sequences.
/// </summary>
public static class ObservableUtils
{
    /// <summary>
    /// Projects each element of <paramref name="source"/> through <paramref name="map"/>,
    /// silently dropping every element for which <paramref name="map"/> throws.
    /// </summary>
    /// <param name="source">The sequence to project.</param>
    /// <param name="map">The projection; exceptions thrown by it are swallowed.</param>
    /// <returns>The successfully projected elements; errors and completion of the source are forwarded.</returns>
    public static IObservable<R> TrySelect<T, R>(this IObservable<T> source, Func<T, R> map)
    {
        return Observable.Create<R>(o =>
        {
            void OnNext(T next)
            {
                R result;

                try
                {
                    result = map(next);
                }
                catch
                {
                    // Deliberate best-effort: an element that cannot be mapped is skipped.
                    return;
                }

                // Kept outside the try block so that exceptions thrown by
                // downstream observers are not swallowed together with mapping errors.
                o.OnNext(result);
            }

            return source.Subscribe(OnNext, o.OnError, o.OnCompleted);
        });
    }

    /// <summary>
    /// Merges the errors (see <see cref="SelectError{TSource}"/>) of all <paramref name="sources"/>
    /// and yields at most the first one.
    /// </summary>
    /// <remarks>
    /// Every source is handled with <c>TSource = Object</c>, so only its OnError
    /// notification is picked up, never its elements.
    /// </remarks>
    public static IObservable<Exception> MergeErrors(params IObservable<Object>[] sources)
    {
        return Observable.Merge(sources.Select(s => s.SelectError())).Take(1);
    }

    /// <summary>
    /// Merges the errors (see <see cref="SelectError{TSource}"/>) of <paramref name="fst"/> and
    /// <paramref name="snd"/> and yields at most the first one.
    /// </summary>
    public static IObservable<Exception> MergeErrors<L, R>(this IObservable<L> fst, IObservable<R> snd)
    {
        return Observable.Merge(fst.SelectError(), snd.SelectError()).Take(1);
    }

    /// <summary>
    /// Flattens a sequence of exceptions so that an OnError notification of
    /// <paramref name="source"/> is delivered as a regular element as well.
    /// </summary>
    /// <returns>
    /// All elements of <paramref name="source"/>, followed by its error (if any) as an element.
    /// OnCompleted notifications are filtered out.
    /// </returns>
    public static IObservable<Exception> SelectErrors(this IObservable<Exception> source)
    {
        return source
              .Materialize()
              .Where(n => n.Kind != NotificationKind.OnCompleted)
              .Select(n => n.Kind == NotificationKind.OnError
                         ? n.Exception!
                         : n.Value);
    }

    /// <summary>
    /// Turns <paramref name="source"/> into a sequence of exceptions.
    /// </summary>
    /// <remarks>
    /// If <typeparamref name="TSource"/> is itself an exception type, the first element is
    /// written to the console and returned (an OnError of the source is forwarded unchanged).
    /// Otherwise the elements are ignored and only the OnError notification is emitted as an element.
    /// </remarks>
    public static IObservable<Exception> SelectError<TSource>(this IObservable<TSource> source)
    {
        if (typeof(TSource).IsAssignableTo(typeof(Exception)))
            return source.Select(s =>
            {
                // NOTE(review): writing to the console from a library operator looks like a
                // debugging leftover; kept to preserve behavior -- confirm and remove.
                Console.WriteLine(s);
                return (s as Exception)!;
            }).Take(1);

        return source
              .Materialize()
              .Where(n => n.Kind == NotificationKind.OnError)
              .Select(n => n.Exception!);
    }

    /// <summary>
    /// Invokes <paramref name="onDispose"/> after a subscription to <paramref name="source"/>
    /// has been disposed.
    /// </summary>
    /// <param name="source">The sequence to wrap.</param>
    /// <param name="onDispose">Callback run right after the inner subscription is disposed.</param>
    public static IObservable<TSource> OnDispose<TSource>(this IObservable<TSource> source, Action onDispose)
    {
        return Observable.Create<TSource>(observer =>
        {
            var subscription = source.Subscribe(observer);

            return () =>
            {
                // NOTE(review): locking on the caller-supplied delegate is fragile (outside code
                // can lock on the same instance). It is kept because it serializes the teardown of
                // all subscriptions sharing this callback -- consider a private gate object.
                lock (onDispose)
                {
                    subscription.Dispose();
                    onDispose();
                }
            };
        });
    }

    /// <summary>
    /// Invokes <paramref name="onSubscribe"/> each time an observer has subscribed to
    /// <paramref name="source"/>.
    /// </summary>
    /// <remarks>
    /// The callback runs after the subscription to <paramref name="source"/> has been made.
    /// If it throws, that subscription is disposed before the exception propagates.
    /// </remarks>
    public static IObservable<TSource> OnSubscribe<TSource>(this IObservable<TSource> source, Action onSubscribe)
    {
        return Observable.Create<TSource>(observer =>
        {
            var subscription = source.Subscribe(observer);

            try
            {
                onSubscribe();
            }
            catch
            {
                // Without this the inner subscription would never be handed back to
                // Observable.Create and could therefore never be disposed.
                subscription.Dispose();
                throw;
            }

            return () => subscription.Dispose();
        });
    }

    /// <summary>
    /// Creates an unbounded sequence whose elements are produced by repeatedly calling
    /// <paramref name="func"/>, paced by <paramref name="scheduler"/>.
    /// </summary>
    public static IObservable<T> Repeat<T>(this IScheduler scheduler, Func<T> func)
    {
        return Observable
              .Repeat(0, scheduler)
              .Select(_ => func());
    }

    /// <summary>
    /// Defers <paramref name="observableFactory"/> until subscription, takes the first element of
    /// the observable it returns and shares it through <c>Replay(1).RefCount()</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): RefCount disconnects once the last subscriber is gone; verify that a later
    /// re-subscription invoking the factory again is acceptable for the callers.
    /// </remarks>
    public static IObservable<T> LazyAsync<T>(Func<IObservable<T>> observableFactory)
    {
        IObservable<T> FromObservable() => observableFactory().FirstAsync();

        return Observable
              .Defer(FromObservable)
              .Replay(1)
              .RefCount();
    }

    /// <summary>
    /// Defers <paramref name="taskFactory"/> until subscription and shares the task's result
    /// through <c>Replay(1).RefCount()</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): RefCount disconnects once the last subscriber is gone; verify that a later
    /// re-subscription invoking the factory again is acceptable for the callers.
    /// </remarks>
    public static IObservable<T> LazyAsync<T>(Func<Task<T>> taskFactory)
    {
        IObservable<T> FromAsync() => Observable.FromAsync(taskFactory);

        return Observable
              .Defer(FromAsync)
              .Replay(1)
              .RefCount();
    }

    // Alternative implementation based on GroupByUntil, currently disabled:
    //
    // public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
    //                                                   TimeSpan duration,
    //                                                   IScheduler scheduler)
    // {
    //     scheduler ??= Scheduler.Default;
    //
    //     return source
    //           .GroupByUntil(k => k, _ => Observable.Timer(duration, scheduler))
    //           .SelectMany(Observable.FirstAsync);
    // }
    //
    //
    // public static IObservable<T> ThrottleDistinct<T,K>(this IObservable<T> source,
    //                                                     Func<T,K> keySelector,
    //                                                     TimeSpan duration,
    //                                                     IScheduler scheduler)
    // {
    //     scheduler ??= Scheduler.Default;
    //
    //     return source
    //           .GroupByUntil(keySelector, _ => Observable.Timer(duration, scheduler))
    //           .SelectMany(Observable.FirstAsync);
    // }

    /// <summary>
    /// Lets an element pass only if no element with the same key has passed within the
    /// preceding <paramref name="duration"/>.
    /// </summary>
    /// <param name="source">The sequence to filter.</param>
    /// <param name="keySelector">Computes the key by which elements are considered equal.</param>
    /// <param name="duration">How long a key stays blocked after an element with that key has passed.</param>
    /// <param name="scheduler">Scheduler used to unblock keys; <c>Scheduler.Default</c> when null.</param>
    /// <remarks>The set of blocked keys is kept per subscription.</remarks>
    public static IObservable<T> ThrottleDistinct<T, K>(this IObservable<T> source,
                                                        Func<T, K> keySelector,
                                                        TimeSpan duration,
                                                        IScheduler? scheduler = null)
    {
        var timer = scheduler ?? Scheduler.Default;

        // Defer gives every subscription its own key set. Sharing a single set would
        // make the first subscriber's keys filter out the elements of all the others.
        return Observable.Defer(() =>
        {
            var blockedKeys = new HashSet<K>();

            Boolean Add(K key)
            {
                lock (blockedKeys)
                    return blockedKeys.Add(key);
            }

            void Remove(K key)
            {
                lock (blockedKeys)
                {
                    var removed = blockedKeys.Remove(key);
                    Debug.Assert(removed);
                }
            }

            Boolean Filter(K key)
            {
                var added = Add(key);

                // Only the element that blocked the key schedules its release.
                if (added)
                    timer.Schedule(duration, () => Remove(key));

                return added;
            }

            return source.Where(t => Filter(keySelector(t)));
        });
    }

    /// <summary>
    /// Lets an element pass only if no equal element has passed within the preceding
    /// <paramref name="duration"/>.
    /// </summary>
    /// <param name="source">The sequence to filter.</param>
    /// <param name="duration">How long a value stays blocked after it has passed.</param>
    /// <param name="scheduler">Scheduler used to unblock values; <c>Scheduler.Default</c> when null.</param>
    public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
                                                     TimeSpan duration,
                                                     IScheduler? scheduler = null)
    {
        // Same algorithm as the keyed overload, with the element itself acting as the key.
        return source.ThrottleDistinct(t => t, duration, scheduler);
    }

    /// <summary>
    /// Splits <paramref name="src"/> into buffers, closing the current buffer whenever the key
    /// computed by <paramref name="keySelector"/> differs from the key of the previous element.
    /// </summary>
    /// <remarks>
    /// The first element also produces a boundary, because DistinctUntilChanged always emits it.
    /// NOTE(review): whether the element that triggers a boundary lands in the closing or in the
    /// opening buffer depends on the subscription order inside Buffer -- confirm against usage.
    /// </remarks>
    public static IObservable<IList<T>> BufferBy<T, X>(this IObservable<T> src, Func<T, X> keySelector)
    {
        // Publish shares one subscription to src between the data and the boundary stream.
        return src.Publish(s =>
        {
            var boundaries = s.Select(keySelector).DistinctUntilChanged();
            return s.Buffer(boundaries);
        });
    }

    /// <summary>
    /// Emits the running sum of <paramref name="source"/>, starting from <paramref name="initConst"/>.
    /// </summary>
    public static IObservable<Double> Integrate(this IObservable<Double> source, Double initConst = 0)
    {
        return source.Scan(initConst, (x, y) => x + y);
    }

    /// <summary>
    /// Like <see cref="Integrate"/>, but passes every accumulated value through
    /// <see cref="Math.Tanh(Double)"/> before it is emitted and carried over to the next step.
    /// </summary>
    public static IObservable<Double> IntegrateNormalize(this IObservable<Double> source, Double initConst = 0)
    {
        return source.Scan(initConst, (x, y) => Math.Tanh(x + y));
    }

    // TODO
    // public static IObservable<Double> MovingAverage(this IObservable<Double> source, Int32 windowSize)
    // {
    //     if (windowSize < 1)
    //         throw new ArgumentException(nameof(windowSize) + " must be positive", nameof(windowSize));
    //
    //     using var e = source.GetEnumerator();
    //
    //     if (!e.MoveNext()) yield break;
    //
    //     var state = e.Current;
    //
    //     do
    //     {
    //         state = ((windowSize - 1) * state + e.Current) / windowSize;
    //         yield return state;
    //     }
    //     while (e.MoveNext());
    // }
}