Innovenergy_trunk/csharp/lib/Utils/ObservableUtils.cs

273 lines
7.5 KiB
C#

using System.Diagnostics;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace InnovEnergy.Lib.Utils;
public static class ObservableUtils
{
public static IObservable<R> TrySelect<T, R>(this IObservable<T> source, Func<T, R> map)
{
return Observable.Create<R>(o =>
{
void OnNext(T next)
{
var success = false;
R result = default!;
try
{
result = map(next);
success = true;
}
catch
{
// ignored
}
if (success)
o.OnNext(result!);
}
return source.Subscribe(OnNext, o.OnError, o.OnCompleted);
});
}
public static IObservable<Exception> MergeErrors(params IObservable<Object>[] sources)
{
return Observable.Merge(sources.Select(s=>s.SelectError())).Take(1);
}
public static IObservable<Exception> MergeErrors<L,R>(this IObservable<L> fst, IObservable<R> snd)
{
return Observable.Merge(fst.SelectError(), snd.SelectError()).Take(1);
}
public static IObservable<Exception> SelectErrors(this IObservable<Exception> source)
{
return source
.Materialize()
.Where(n => n.Kind != NotificationKind.OnCompleted)
.Select(n => n.Kind == NotificationKind.OnError ? n.Exception! : n.Value);
}
public static IObservable<Exception> SelectError<TSource>(this IObservable<TSource> source)
{
if (typeof(TSource).IsAssignableTo(typeof(Exception)))
return source.Select(s =>
{
Console.WriteLine(s);
return (s as Exception)!;
}).Take(1);
return source
.Materialize()
.Where(n => n.Kind == NotificationKind.OnError)
.Select(n => n.Exception!);
}
public static IObservable<TSource> OnDispose<TSource>(this IObservable<TSource> source, Action onDispose)
{
return Observable
.Create<TSource>(observer =>
{
var subscription = source.Subscribe(observer);
return () =>
{
lock (onDispose)
{
subscription.Dispose();
onDispose();
}
};
});
}
public static IObservable<TSource> OnSubscribe<TSource>(this IObservable<TSource> source, Action onSubscribe)
{
return Observable
.Create<TSource>(observer =>
{
var subscription = source.Subscribe(observer);
onSubscribe();
return () => subscription.Dispose();
});
}
public static IObservable<T> Repeat<T>(this IScheduler scheduler, Func<T> func)
{
return Observable
.Repeat(0, scheduler)
.Select(_ => func());
}
public static IObservable<T> LazyAsync<T>(Func<IObservable<T>> observableFactory)
{
IObservable<T> FromObservable() => observableFactory().FirstAsync();
return Observable
.Defer(FromObservable)
.Replay(1)
.RefCount();
}
public static IObservable<T> LazyAsync<T>(Func<Task<T>> taskFactory)
{
IObservable<T> FromAsync() => Observable.FromAsync(taskFactory);
return Observable
.Defer(FromAsync)
.Replay(1)
.RefCount();
}
// public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
// TimeSpan duration,
// IScheduler scheduler)
// {
// scheduler ??= Scheduler.Default;
//
// return source
// .GroupByUntil(k => k, _ => Observable.Timer(duration, scheduler))
// .SelectMany(Observable.FirstAsync);
// }
//
//
// public static IObservable<T> ThrottleDistinct<T,K>(this IObservable<T> source,
// Func<T,K> keySelector,
// TimeSpan duration,
// IScheduler scheduler)
// {
//
//
// scheduler ??= Scheduler.Default;
//
// return source
// .GroupByUntil(keySelector, _ => Observable.Timer(duration, scheduler))
// .SelectMany(Observable.FirstAsync);
// }
public static IObservable<T> ThrottleDistinct<T, K>(this IObservable<T> source,
Func<T, K> keySelector,
TimeSpan duration,
IScheduler? scheduler = null)
{
scheduler ??= Scheduler.Default;
var hs = new HashSet<K>();
Boolean Add(K key)
{
lock (hs)
return hs.Add(key);
}
void Remove(K key)
{
lock (hs)
{
var removed = hs.Remove(key);
Debug.Assert(removed);
}
}
Boolean Filter(K key)
{
var added = Add(key);
if (added)
scheduler.Schedule(duration, _ => Remove(key));
return added;
}
return source.Where(t => Filter(keySelector(t)));
}
public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
TimeSpan duration,
IScheduler? scheduler = null)
{
scheduler ??= Scheduler.Default;
var hs = new HashSet<T>();
Boolean Add(T key)
{
lock (hs)
return hs.Add(key);
}
void Remove(T key)
{
lock (hs)
{
var removed = hs.Remove(key);
Debug.Assert(removed);
}
}
Boolean Filter(T key)
{
var added = Add(key);
if (added)
scheduler.Schedule(duration, _ => Remove(key));
return added;
}
return source.Where(Filter);
}
public static IObservable<IList<T>> BufferBy<T, X>(this IObservable<T> src, Func<T, X> keySelector)
{
return src.Publish(s =>
{
var boundaries = s.Select(keySelector).DistinctUntilChanged();
return s.Buffer(boundaries);
});
}
public static IObservable<Double> Integrate(this IObservable<Double> source, Double initConst = 0)
{
return source.Scan(initConst, (x, y) => x + y);
}
public static IObservable<Double> IntegrateNormalize(this IObservable<Double> source, Double initConst = 0)
{
return source.Scan(initConst, (x, y) => Math.Tanh(x + y));
}
// TODO
// public static IObservable<Double> MovingAverage(this IObservable<Double> source, Int32 windowSize)
// {
// if (windowSize < 1)
// throw new ArgumentException(nameof(windowSize) + " must be positive", nameof(windowSize));
//
// using var e = source.GetEnumerator();
//
// if (!e.MoveNext()) yield break;
//
// var state = e.Current;
//
// do
// {
// state = ((windowSize - 1) * state + e.Current) / windowSize;
// yield return state;
// }
// while (e.MoveNext());
// }
}