273 lines
7.5 KiB
C#
273 lines
7.5 KiB
C#
|
using System.Diagnostics;
|
||
|
using System.Reactive;
|
||
|
using System.Reactive.Concurrency;
|
||
|
using System.Reactive.Linq;
|
||
|
|
||
|
namespace InnovEnergy.Lib.Utils;
|
||
|
|
||
|
public static class ObservableUtils
|
||
|
{
|
||
|
|
||
|
public static IObservable<R> TrySelect<T, R>(this IObservable<T> source, Func<T, R> map)
|
||
|
{
|
||
|
|
||
|
return Observable.Create<R>(o =>
|
||
|
{
|
||
|
void OnNext(T next)
|
||
|
{
|
||
|
var success = false;
|
||
|
R result = default!;
|
||
|
|
||
|
try
|
||
|
{
|
||
|
result = map(next);
|
||
|
success = true;
|
||
|
}
|
||
|
catch
|
||
|
{
|
||
|
// ignored
|
||
|
}
|
||
|
|
||
|
if (success)
|
||
|
o.OnNext(result!);
|
||
|
}
|
||
|
|
||
|
return source.Subscribe(OnNext, o.OnError, o.OnCompleted);
|
||
|
|
||
|
});
|
||
|
}
|
||
|
|
||
|
public static IObservable<Exception> MergeErrors(params IObservable<Object>[] sources)
|
||
|
{
|
||
|
return Observable.Merge(sources.Select(s=>s.SelectError())).Take(1);
|
||
|
}
|
||
|
|
||
|
public static IObservable<Exception> MergeErrors<L,R>(this IObservable<L> fst, IObservable<R> snd)
|
||
|
{
|
||
|
return Observable.Merge(fst.SelectError(), snd.SelectError()).Take(1);
|
||
|
}
|
||
|
|
||
|
public static IObservable<Exception> SelectErrors(this IObservable<Exception> source)
|
||
|
{
|
||
|
return source
|
||
|
.Materialize()
|
||
|
.Where(n => n.Kind != NotificationKind.OnCompleted)
|
||
|
.Select(n => n.Kind == NotificationKind.OnError ? n.Exception! : n.Value);
|
||
|
}
|
||
|
|
||
|
public static IObservable<Exception> SelectError<TSource>(this IObservable<TSource> source)
|
||
|
{
|
||
|
if (typeof(TSource).IsAssignableTo(typeof(Exception)))
|
||
|
return source.Select(s =>
|
||
|
{
|
||
|
Console.WriteLine(s);
|
||
|
return (s as Exception)!;
|
||
|
}).Take(1);
|
||
|
|
||
|
return source
|
||
|
.Materialize()
|
||
|
.Where(n => n.Kind == NotificationKind.OnError)
|
||
|
.Select(n => n.Exception!);
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
public static IObservable<TSource> OnDispose<TSource>(this IObservable<TSource> source, Action onDispose)
|
||
|
{
|
||
|
return Observable
|
||
|
.Create<TSource>(observer =>
|
||
|
{
|
||
|
var subscription = source.Subscribe(observer);
|
||
|
return () =>
|
||
|
{
|
||
|
lock (onDispose)
|
||
|
{
|
||
|
subscription.Dispose();
|
||
|
onDispose();
|
||
|
}
|
||
|
};
|
||
|
});
|
||
|
}
|
||
|
|
||
|
public static IObservable<TSource> OnSubscribe<TSource>(this IObservable<TSource> source, Action onSubscribe)
|
||
|
{
|
||
|
return Observable
|
||
|
.Create<TSource>(observer =>
|
||
|
{
|
||
|
var subscription = source.Subscribe(observer);
|
||
|
onSubscribe();
|
||
|
return () => subscription.Dispose();
|
||
|
});
|
||
|
}
|
||
|
|
||
|
|
||
|
public static IObservable<T> Repeat<T>(this IScheduler scheduler, Func<T> func)
|
||
|
{
|
||
|
return Observable
|
||
|
.Repeat(0, scheduler)
|
||
|
.Select(_ => func());
|
||
|
}
|
||
|
|
||
|
|
||
|
public static IObservable<T> LazyAsync<T>(Func<IObservable<T>> observableFactory)
|
||
|
{
|
||
|
IObservable<T> FromObservable() => observableFactory().FirstAsync();
|
||
|
|
||
|
return Observable
|
||
|
.Defer(FromObservable)
|
||
|
.Replay(1)
|
||
|
.RefCount();
|
||
|
}
|
||
|
|
||
|
|
||
|
public static IObservable<T> LazyAsync<T>(Func<Task<T>> taskFactory)
|
||
|
{
|
||
|
IObservable<T> FromAsync() => Observable.FromAsync(taskFactory);
|
||
|
|
||
|
return Observable
|
||
|
.Defer(FromAsync)
|
||
|
.Replay(1)
|
||
|
.RefCount();
|
||
|
}
|
||
|
|
||
|
// public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
|
||
|
// TimeSpan duration,
|
||
|
// IScheduler scheduler)
|
||
|
// {
|
||
|
// scheduler ??= Scheduler.Default;
|
||
|
//
|
||
|
// return source
|
||
|
// .GroupByUntil(k => k, _ => Observable.Timer(duration, scheduler))
|
||
|
// .SelectMany(Observable.FirstAsync);
|
||
|
// }
|
||
|
//
|
||
|
//
|
||
|
// public static IObservable<T> ThrottleDistinct<T,K>(this IObservable<T> source,
|
||
|
// Func<T,K> keySelector,
|
||
|
// TimeSpan duration,
|
||
|
// IScheduler scheduler)
|
||
|
// {
|
||
|
//
|
||
|
//
|
||
|
// scheduler ??= Scheduler.Default;
|
||
|
//
|
||
|
// return source
|
||
|
// .GroupByUntil(keySelector, _ => Observable.Timer(duration, scheduler))
|
||
|
// .SelectMany(Observable.FirstAsync);
|
||
|
// }
|
||
|
|
||
|
public static IObservable<T> ThrottleDistinct<T, K>(this IObservable<T> source,
|
||
|
Func<T, K> keySelector,
|
||
|
TimeSpan duration,
|
||
|
IScheduler? scheduler = null)
|
||
|
{
|
||
|
scheduler ??= Scheduler.Default;
|
||
|
|
||
|
var hs = new HashSet<K>();
|
||
|
|
||
|
Boolean Add(K key)
|
||
|
{
|
||
|
lock (hs)
|
||
|
return hs.Add(key);
|
||
|
}
|
||
|
|
||
|
void Remove(K key)
|
||
|
{
|
||
|
lock (hs)
|
||
|
{
|
||
|
var removed = hs.Remove(key);
|
||
|
Debug.Assert(removed);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
Boolean Filter(K key)
|
||
|
{
|
||
|
var added = Add(key);
|
||
|
|
||
|
if (added)
|
||
|
scheduler.Schedule(duration, _ => Remove(key));
|
||
|
|
||
|
return added;
|
||
|
}
|
||
|
|
||
|
return source.Where(t => Filter(keySelector(t)));
|
||
|
}
|
||
|
|
||
|
|
||
|
public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
|
||
|
TimeSpan duration,
|
||
|
IScheduler? scheduler = null)
|
||
|
{
|
||
|
scheduler ??= Scheduler.Default;
|
||
|
|
||
|
var hs = new HashSet<T>();
|
||
|
|
||
|
Boolean Add(T key)
|
||
|
{
|
||
|
lock (hs)
|
||
|
return hs.Add(key);
|
||
|
}
|
||
|
|
||
|
void Remove(T key)
|
||
|
{
|
||
|
lock (hs)
|
||
|
{
|
||
|
var removed = hs.Remove(key);
|
||
|
Debug.Assert(removed);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
Boolean Filter(T key)
|
||
|
{
|
||
|
var added = Add(key);
|
||
|
|
||
|
if (added)
|
||
|
scheduler.Schedule(duration, _ => Remove(key));
|
||
|
|
||
|
return added;
|
||
|
}
|
||
|
|
||
|
return source.Where(Filter);
|
||
|
}
|
||
|
|
||
|
|
||
|
public static IObservable<IList<T>> BufferBy<T, X>(this IObservable<T> src, Func<T, X> keySelector)
|
||
|
{
|
||
|
return src.Publish(s =>
|
||
|
{
|
||
|
var boundaries = s.Select(keySelector).DistinctUntilChanged();
|
||
|
return s.Buffer(boundaries);
|
||
|
});
|
||
|
}
|
||
|
|
||
|
public static IObservable<Double> Integrate(this IObservable<Double> source, Double initConst = 0)
|
||
|
{
|
||
|
return source.Scan(initConst, (x, y) => x + y);
|
||
|
}
|
||
|
|
||
|
public static IObservable<Double> IntegrateNormalize(this IObservable<Double> source, Double initConst = 0)
|
||
|
{
|
||
|
return source.Scan(initConst, (x, y) => Math.Tanh(x + y));
|
||
|
}
|
||
|
|
||
|
|
||
|
// TODO
|
||
|
// public static IObservable<Double> MovingAverage(this IObservable<Double> source, Int32 windowSize)
|
||
|
// {
|
||
|
// if (windowSize < 1)
|
||
|
// throw new ArgumentException(nameof(windowSize) + " must be positive", nameof(windowSize));
|
||
|
//
|
||
|
// using var e = source.GetEnumerator();
|
||
|
//
|
||
|
// if (!e.MoveNext()) yield break;
|
||
|
//
|
||
|
// var state = e.Current;
|
||
|
//
|
||
|
// do
|
||
|
// {
|
||
|
// state = ((windowSize - 1) * state + e.Current) / windowSize;
|
||
|
// yield return state;
|
||
|
// }
|
||
|
// while (e.MoveNext());
|
||
|
// }
|
||
|
}
|