using System.Collections.Immutable;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using InnovEnergy.Lib.Protocols.DBus;
using InnovEnergy.Lib.Protocols.DBus.Daemon;
using InnovEnergy.Lib.Protocols.DBus.Protocol.DataTypes;
using InnovEnergy.Lib.Utils;

namespace InnovEnergy.Lib.Victron.VeDBus;

// Snapshot of all properties of one service, keyed by the property's object path.
// NOTE(review): the key type is taken from `props.SetItem(prop.ObjectPath, prop)` below;
// confirm it matches the declared type of VeProperty.ObjectPath.
using VeProps = ImmutableDictionary<ObjectPath, VeProperty>;

/// <summary>A service name together with the current snapshot of its properties.</summary>
public record struct ServiceProperties(String ServiceName, VeProps Properties);

/// <summary>
/// Extension methods on <see cref="DBusConnection"/> for reading, writing, observing and
/// broadcasting properties through the <c>VeDBusApi</c> interface.
/// </summary>
public static class VeDBus
{
    /// <summary>
    /// Observes the property snapshot of the service whose bus name equals
    /// <paramref name="serviceName"/>.
    /// </summary>
    /// <param name="connection">Connection used to watch services and properties.</param>
    /// <param name="serviceName">Bus name that must match exactly.</param>
    /// <returns>
    /// The <c>Properties</c> of the first entry of every list emitted by
    /// <see cref="ObservePropertiesOfServices"/>; empty lists contribute nothing.
    /// </returns>
    public static IObservable<VeProps> ObservePropertiesOfService(this DBusConnection connection, String serviceName)
    {
        return connection
              .ObservePropertiesOfServices(busName => busName == serviceName)
              .SelectMany(s => s.Take(1))
              .Select(s => s.Properties);
    }

    /// <summary>
    /// Observes the property snapshots of all services accepted by
    /// <paramref name="serviceSelector"/>.
    /// </summary>
    /// <param name="connection">Connection used to watch services and properties.</param>
    /// <param name="serviceSelector">Predicate applied to each service's <c>BusNameOrId</c>.</param>
    /// <returns>
    /// An observable of lists holding one <see cref="ServiceProperties"/> per tracked service.
    /// The tracking state is created per subscription.
    /// </returns>
    public static IObservable<IList<ServiceProperties>> ObservePropertiesOfServices(this DBusConnection connection, Func<String, Boolean> serviceSelector)
    {
        // Defer, so that every subscriber gets its own `services` dictionary. Without it the
        // dictionary would be shared by all subscribers of the returned observable, and
        // disposing one subscription would dispose the property subscriptions of the others.
        return Observable.Defer(Observe);

        IObservable<IList<ServiceProperties>> Observe()
        {
            // Subscription management could probably be done with a combination of
            // GroupBy and Switch; it is done by hand here instead.
            var services = new Dictionary<String, (IObservable<VeProps> props, IDisposable subscription)>();

            return connection
                  .Services
                  .Where(c => serviceSelector(c.BusNameOrId))
                  .Where(c => c.Change != ServiceChange.NoChange)
                  .Select(ObserveProps)
                  .Switch()
                  .OnDispose(DisposeSubscriptions);

            // Updates the bookkeeping for one service change and returns a fresh combined
            // observable over all services tracked at that moment.
            IObservable<IList<ServiceProperties>> ObserveProps(ServiceChanged serviceChanged)
            {
                var (change, serviceName, _, _) = serviceChanged;

                lock (services)
                {
                    // Any change invalidates the existing subscription of that service.
                    if (services.TryGetValue(serviceName, out var sv))
                    {
                        sv.subscription.Dispose();
                        services.Remove(serviceName);
                    }

                    if (change is ServiceChange.BusNameAdded or ServiceChange.IdAdded)
                    {
                        // Replay(1) lets each newly built CombineLatest below start from
                        // the latest snapshot of the services that were already tracked.
                        var props = connection
                                   .ObserveAllProperties(serviceName)
                                   .Replay(1);

                        services.Add(serviceName, (props, props.Connect()));
                    }

                    // ToList is important: it snapshots the dictionary inside the lock, so the
                    // lazily evaluated pipeline below never enumerates `services` itself.
                    var observeProps = services.Select(kv => kv.Value.props).ToList();
                    var serviceNames = services.Select(kv => kv.Key).ToList();

                    // NOTE(review): when `services` is empty, CombineLatest is given no sources
                    // and presumably emits nothing, so subscribers would keep the last list
                    // they received - confirm that this is intended.
                    return Observable
                          .CombineLatest(observeProps)
                          .Select(props => Enumerable
                                          .Zip(serviceNames, props, (s, ps) => new ServiceProperties(s, ps))
                                          .ToList());
                }
            }

            // Disposes every per-service subscription and forgets all tracked services.
            void DisposeSubscriptions()
            {
                lock (services)
                {
                    foreach (var service in services.Values)
                        service.subscription.Dispose();

                    services.Clear();
                }
            }
        }
    }

    /// <summary>
    /// Observes the <c>PropertiesChanged</c> signals sent by <paramref name="service"/>.
    /// </summary>
    /// <param name="connection">Connection on which the signals are observed.</param>
    /// <param name="service">Sender used as signal filter.</param>
    /// <param name="objectPath">Optional object path used as signal filter.</param>
    /// <returns>The signals passed through <c>TrySelect(VeProperty.Decode)</c>.</returns>
    public static IObservable<VeProperty> ObservePropertiesChanged(this DBusConnection connection,
                                                                   String service,
                                                                   ObjectPath? objectPath = default)
    {
        // Some VeServices don't set the interface (!), so don't use it in the filter.
        return connection
              .ObserveSignalMessages(sender: service, objectPath: objectPath, member: VeDBusApi.PropertiesChanged)
              .TrySelect(VeProperty.Decode);
    }

    /// <summary>
    /// Observes the complete property snapshot of one service: the initial state is fetched
    /// with <see cref="GetAllProperties"/>, then every observed change is applied to it.
    /// </summary>
    private static IObservable<VeProps> ObserveAllProperties(this DBusConnection connection, String serviceName)
    {
        // A failing initial fetch is replaced by an empty sequence, which makes the
        // initial snapshot empty instead of failing the whole observable.
        var init = connection
                  .GetAllProperties(serviceName)
                  .Catch(Observable.Empty<VeProperty>())
                  .Aggregate(VeProps.Empty, UpdateItem); // produces a single aggregated dict

        // NOTE(review): change signals are only subscribed to once `init` has produced its
        // value; signals sent while the initial fetch is in flight appear to be missed.
        return init
              .Select(ObserveChanges)
              .Switch();

        VeProps UpdateItem(VeProps props, VeProperty prop) => props.SetItem(prop.ObjectPath, prop);

        IObservable<VeProps> ObserveChanges(VeProps initialProps)
        {
            return connection
                  .ObservePropertiesChanged(serviceName)
                  .Scan(initialProps, UpdateItem)
                  .StartWith(initialProps); // Scan does not emit its seed, so publish it explicitly
        }
    }

    /// <summary>
    /// Fetches all values and all texts of a service and emits one
    /// <see cref="VeProperty"/> per value.
    /// </summary>
    /// <param name="connection">Connection used for the method calls.</param>
    /// <param name="serviceName">Destination of the method calls.</param>
    public static IObservable<VeProperty> GetAllProperties(this DBusConnection connection, String serviceName)
    {
        var texts  = connection.GetAllTexts(serviceName).ToObservable();
        var values = connection.GetAllValues(serviceName).ToObservable();

        return Observable
              .Zip(values, texts, Merge)
              .SelectMany(ps => ps);

        // Pairs each value with its text; a value without text falls back to value.ToString().
        static IEnumerable<VeProperty> Merge(IReadOnlyDictionary<String, Object> values,
                                             IReadOnlyDictionary<String, String> texts)
        {
            return from kv in values
                   let path  = kv.Key
                   let value = kv.Value
                   let text  = texts.TryGetValue(path, out var txt) ? txt : value.ToString()
                   select new VeProperty(path, value, text);
        }
    }

    /// <summary>
    /// Calls <c>GetValue</c> on the root object path of <paramref name="destination"/>.
    /// </summary>
    /// <returns>
    /// The unwrapped values by key, or an empty dictionary when the reply does not have
    /// the expected dictionary shape.
    /// </returns>
    public static async Task<IReadOnlyDictionary<String, Object>> GetAllValues(this DBusConnection connection, String destination)
    {
        var result = await connection.CallMethod<Object>(destination: destination,
                                                         @interface:  VeDBusApi.Interface,
                                                         objectPath:  ObjectPath.Root,
                                                         member:      VeDBusApi.GetValue);

        // The reply may be wrapped in one or more variants: unwrap them all.
        while (result is Variant v)
            result = v.Value;

        return result is IReadOnlyDictionary<String, Variant> variantDict
             ? variantDict.ToDictionary(kv => kv.Key, kv => kv.Value.Value)
             : new Dictionary<String, Object>();
    }

    /// <summary>
    /// Calls <c>GetText</c> on the root object path of <paramref name="destination"/>.
    /// </summary>
    /// <returns>
    /// The texts by key, or an empty dictionary when the reply does not have the
    /// expected dictionary shape.
    /// </returns>
    public static async Task<IReadOnlyDictionary<String, String>> GetAllTexts(this DBusConnection connection, String destination)
    {
        var result = await connection.CallMethod<Object>(destination: destination,
                                                         @interface:  VeDBusApi.Interface,
                                                         objectPath:  ObjectPath.Root,
                                                         member:      VeDBusApi.GetText);

        // The reply may be wrapped in one or more variants: unwrap them all.
        while (result is Variant v)
            result = v.Value;

        return result as IReadOnlyDictionary<String, String> ?? new Dictionary<String, String>();
    }

    /// <summary>
    /// Reads the value at <paramref name="objectPath"/> and casts it to <typeparamref name="T"/>.
    /// </summary>
    /// <exception cref="InvalidCastException">The value is not a <typeparamref name="T"/>.</exception>
    public static async Task<T> GetValue<T>(this DBusConnection connection, String destination, ObjectPath objectPath)
    {
        var value = await connection.GetValue(destination, objectPath);
        return (T)value;
    }

    /// <summary>
    /// Calls <c>GetValue</c> at <paramref name="objectPath"/> and returns the content of
    /// the variant that comes back.
    /// </summary>
    public static async Task<Object> GetValue(this DBusConnection connection, String destination, ObjectPath objectPath)
    {
        var variant = await connection.CallMethod<Variant>(destination: destination,
                                                           @interface:  VeDBusApi.Interface,
                                                           objectPath:  objectPath,
                                                           member:      VeDBusApi.GetValue);
        return variant.Value;
    }

    /// <summary>Calls <c>GetText</c> at <paramref name="objectPath"/>.</summary>
    public static Task<String> GetText(this DBusConnection connection, String destination, ObjectPath objectPath)
    {
        return connection.CallMethod<String>(destination: destination,
                                             @interface:  VeDBusApi.Interface,
                                             objectPath:  objectPath,
                                             member:      VeDBusApi.GetText);
    }

    /// <summary>
    /// Calls <c>SetValue</c> at <paramref name="objectPath"/> with <paramref name="value"/>
    /// wrapped in a variant.
    /// </summary>
    /// <returns><c>true</c> when the call returned 0.</returns>
    public static async Task<Boolean> SetValue<T>(this DBusConnection connection, String destination, ObjectPath objectPath, T value)
    {
        var setValue = await connection.CallMethod<Int32>(destination: destination,
                                                          @interface:  VeDBusApi.Interface,
                                                          objectPath:  objectPath,
                                                          member:      VeDBusApi.SetValue,
                                                          payload:     value!.Variant());
        return setValue == 0;
    }

    /// <summary>
    /// Broadcasts a <c>PropertiesChanged</c> signal for the property described by
    /// <paramref name="path"/>, <paramref name="value"/> and <paramref name="text"/>.
    /// </summary>
    public static void BroadcastPropertiesChanged(this DBusConnection connection, ObjectPath path, Object value, String text)
    {
        var veProp = new VeProperty(path, value, text);
        connection.BroadcastPropertiesChanged(veProp);
    }

    /// <summary>
    /// Broadcasts a <c>PropertiesChanged</c> signal on the property's object path, carrying
    /// its value and text under the keys <c>"Value"</c> and <c>"Text"</c>.
    /// </summary>
    public static void BroadcastPropertiesChanged(this DBusConnection connection, VeProperty property)
    {
        var veProp = new Dictionary<String, Variant>
        {
            { "Value", property.Value.Variant() },
            { "Text" , property.Text.Variant()  }
        };

        connection.BroadcastSignal(VeDBusApi.Interface, property.ObjectPath, VeDBusApi.PropertiesChanged, veProp);
    }
}