Innovenergy_trunk/csharp/Lib/Utils/DictObservables.cs

83 lines
2.1 KiB
C#

using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace InnovEnergy.Lib.Utils;
public static class DictObservables
{
public static IObservable<IReadOnlyDictionary<K, R>> DictObserve<K, V, R>
(
this IObservable<IReadOnlyDictionary<K, V>> src,
Func<IObservable<V>, IObservable<R>> map
)
where K : notnull
{
var subjects = new Dictionary<K, IObserver<V>>();
var results = new Dictionary<K, R>();
return src.Select(Map);
// TODO: thread safety
IReadOnlyDictionary<K,R> Map(IReadOnlyDictionary<K, V> inDict)
{
results.Clear();
foreach (var (key, value) in inDict)
{
if (!subjects.ContainsKey(key))
{
var subject = new Subject<V>();
map(subject).Subscribe(r => results[key] = r);
subjects[key] = subject;
}
subjects[key].OnNext(value);
}
return results;
}
}
public static IObservable<IReadOnlyDictionary<K, V>> DictWhere<K, V>
(
this IObservable<IReadOnlyDictionary<K, V>> src,
Func<V, Boolean> predicate
)
where K : notnull
{
return from dict in src
select dict
.Where(e => predicate(e.Value))
.ToDictionary(e => e.Key, e => e.Value);
}
public static IObservable<IReadOnlyDictionary<K, R>> DictSelect<K, V, R>
(
this IObservable<IReadOnlyDictionary<K, V>> src,
Func<V, R> map
)
where K : notnull
{
return from dict in src
select dict.ToDictionary(e => e.Key, e => map(e.Value));
}
public static IObservable<IReadOnlyDictionary<K2, V2>> DictSelect<K1, V1, K2, V2>
(
this IObservable<IReadOnlyDictionary<K1, V1>> src,
Func<KeyValuePair<K1,V1>, KeyValuePair<K2,V2>> map
)
where K1 : notnull
where K2 : notnull
{
return from dict in src
select dict
.Select(map)
.ToDictionary(e => e.Key, e => e.Value);
}
}