From 22ad7b361200f582705f0c3597e2c86800fe1356 Mon Sep 17 00:00:00 2001 From: Noe Date: Thu, 2 Nov 2023 15:50:04 +0100 Subject: [PATCH] Add Middleware functionality, add middleware connection to the controller --- csharp/App/Middleware/InstallationInfo.cs | 9 ++ csharp/App/Middleware/Middleware.csproj | 18 +++ csharp/App/Middleware/Program.cs | 46 ++++++ csharp/App/Middleware/RabbitMQConsumer.cs | 87 +++++++++++ csharp/App/Middleware/StatusMessage.cs | 8 + csharp/App/Middleware/WebSocketListener.cs | 141 ++++++++++++++++++ csharp/App/SaliMax/SaliMax.csproj | 1 + .../src/MiddlewareClasses/StatusMessage.cs | 8 + csharp/App/SaliMax/src/Program.cs | 122 ++++++++++++++- 9 files changed, 435 insertions(+), 5 deletions(-) create mode 100644 csharp/App/Middleware/InstallationInfo.cs create mode 100644 csharp/App/Middleware/Middleware.csproj create mode 100644 csharp/App/Middleware/Program.cs create mode 100644 csharp/App/Middleware/RabbitMQConsumer.cs create mode 100644 csharp/App/Middleware/StatusMessage.cs create mode 100644 csharp/App/Middleware/WebSocketListener.cs create mode 100644 csharp/App/SaliMax/src/MiddlewareClasses/StatusMessage.cs diff --git a/csharp/App/Middleware/InstallationInfo.cs b/csharp/App/Middleware/InstallationInfo.cs new file mode 100644 index 000000000..2e1f5bd79 --- /dev/null +++ b/csharp/App/Middleware/InstallationInfo.cs @@ -0,0 +1,9 @@ +using System.Net.WebSockets; + +namespace InnovEnergy.App.Middleware; + +public class InstallationInfo +{ + public int Status { get; set; } + public List Connections { get; } = new List(); +} \ No newline at end of file diff --git a/csharp/App/Middleware/Middleware.csproj b/csharp/App/Middleware/Middleware.csproj new file mode 100644 index 000000000..34fc15eab --- /dev/null +++ b/csharp/App/Middleware/Middleware.csproj @@ -0,0 +1,18 @@ + + + + + + InnovEnergy.App.Middleware + + + + + + + + + + + + diff --git a/csharp/App/Middleware/Program.cs b/csharp/App/Middleware/Program.cs new file mode 100644 index 000000000..091e76a0f --- /dev/null +++ b/csharp/App/Middleware/Program.cs @@ -0,0 +1,46 @@ +using InnovEnergy.App.Middleware; +using System; +using System.Net; +using System.Net.Sockets; +using System.Net.WebSockets; +using System.Text; + +internal class Program +{ + public static readonly object SharedDataLock = new object(); + + public static void Main(string[] args) + { + //For each installation id, we maintain a list of the connected clients + var installationConnections = new Dictionary(); + var installationsIds = new List {1}; + var installationIps = new List {"10.2.3.115"}; + RabbitMqConsumer.StartRabbitMqConsumer(installationConnections,SharedDataLock); + + UdpClient udpClient = new UdpClient(); + int port = 9000; + for (int i = 0; i < installationsIds.Count; i++) + { + using (udpClient) + { + // Convert the message to bytes (UTF-8 encoding is used here) + string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue"; + byte[] data = Encoding.UTF8.GetBytes(message); + + // Send the UDP message to the specified IP address and port + udpClient.Send(data, data.Length, installationIps[i], port); + + Console.WriteLine($"Sent UDP message to {installationIps[i]}:{port}: {message}"); + IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(installationIps[i]), port); + byte[] replyData = udpClient.Receive(ref remoteEndPoint); + string replyMessage = Encoding.UTF8.GetString(replyData); + Console.WriteLine("Received message from installation "+installationsIds[i]); + } + } + + WebSocketListener.StartServerAsync(installationConnections,SharedDataLock); + Console.WriteLine("WebSocket server is running. Press Enter to exit."); + + Console.ReadLine(); + } +} \ No newline at end of file diff --git a/csharp/App/Middleware/RabbitMQConsumer.cs b/csharp/App/Middleware/RabbitMQConsumer.cs new file mode 100644 index 000000000..66ea10989 --- /dev/null +++ b/csharp/App/Middleware/RabbitMQConsumer.cs @@ -0,0 +1,87 @@ +using System.Net.WebSockets; +using System.Text.Json; + +namespace InnovEnergy.App.Middleware; +using System.Text; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +public static class RabbitMqConsumer +{ + + private static ConnectionFactory _factory = null!; + private static IConnection _connection = null!; + private static IModel _channel= null!; + + public static void StartRabbitMqConsumer(Dictionary installationConnections, Object sharedDataLock) + { + string vpnServerIp = "10.2.0.11"; + _factory = new ConnectionFactory { HostName = vpnServerIp }; + _connection = _factory.CreateConnection(); + _channel = _connection.CreateModel(); + Console.WriteLine("Middleware subscribed to RabbitMQ queue, ready for receiving messages"); + _channel.QueueDeclare(queue: "statusQueue", durable: false, exclusive: false, autoDelete: false, arguments: null); + + var consumer = new EventingBasicConsumer(_channel); + consumer.Received += (model, ea) => + { + var body = ea.Body.ToArray(); + var message = Encoding.UTF8.GetString(body); + StatusMessage receivedStatusMessage = JsonSerializer.Deserialize(message); + + lock (sharedDataLock) + { + // Process the received message + if (receivedStatusMessage != null) + { + Console.WriteLine("Received a message from installation: " + receivedStatusMessage.InstallationId + " and status is: " + receivedStatusMessage.Status); + Console.WriteLine("----------------------------------------------"); + Console.WriteLine("Update installation connection table"); + var installationId = receivedStatusMessage.InstallationId; + + if (!installationConnections.ContainsKey(installationId)) + { + Console.WriteLine("Create new empty list for installation: " + installationId); + installationConnections[installationId] = new InstallationInfo + { + Status = receivedStatusMessage.Status + }; + } + + Console.WriteLine("----------------------------------------------"); + + foreach (var installationConnection in installationConnections) + { + if (installationConnection.Key == installationId && installationConnection.Value.Connections.Count > 0) + { + Console.WriteLine("Update all the connected websockets for installation " + installationId); + installationConnection.Value.Status = receivedStatusMessage.Status; + + var jsonObject = new + { + id = installationId, + status = receivedStatusMessage.Status + }; + + string jsonString = JsonSerializer.Serialize(jsonObject); + byte[] dataToSend = Encoding.UTF8.GetBytes(jsonString); + + foreach (var connection in installationConnection.Value.Connections) + { + connection.SendAsync( + new ArraySegment(dataToSend, 0, dataToSend.Length), + WebSocketMessageType.Text, + true, // Indicates that this is the end of the message + CancellationToken.None + ); + } + } + } + } + } + }; + + _channel.BasicConsume(queue: "statusQueue", autoAck: true, consumer: consumer); + + } +} \ No newline at end of file diff --git a/csharp/App/Middleware/StatusMessage.cs b/csharp/App/Middleware/StatusMessage.cs new file mode 100644 index 000000000..fd96aa3df --- /dev/null +++ b/csharp/App/Middleware/StatusMessage.cs @@ -0,0 +1,8 @@ + +namespace InnovEnergy.App.Middleware; + +public class StatusMessage +{ + public required int InstallationId { get; init; } + public required int Status { get; init; } +} \ No newline at end of file diff --git a/csharp/App/Middleware/WebSocketListener.cs b/csharp/App/Middleware/WebSocketListener.cs new file mode 100644 index 000000000..acb7f0857 --- /dev/null +++ b/csharp/App/Middleware/WebSocketListener.cs @@ -0,0 +1,141 @@ +using System.Net; +using System.Net.WebSockets; +using System.Text; +using System.Text.Json; + +namespace InnovEnergy.App.Middleware; + +public static class WebSocketListener +{ + //For each installation id, we maintain a list of the connected clients + + public static async Task StartServerAsync(Dictionary installationConnections, Object sharedDataLock) + { + var listener = new HttpListener(); + listener.Prefixes.Add("http://localhost:8080/websocket/"); + listener.Start(); + + Console.WriteLine("WebSocket server listening on http://localhost:8080/websocket/"); + + //Http listener listens for connections. When it accepts a new connection, it creates a new Task to handle this connection + while (true) + { + var context = await listener.GetContextAsync(); + if (context.Request.IsWebSocketRequest) + { + var webSocketContext = await context.AcceptWebSocketAsync(null); + var webSocket = webSocketContext.WebSocket; + + // Add the connected WebSocket to the collection + Console.WriteLine("Accepted a new websocket connection"); + HandleWebSocketConnection(webSocket, installationConnections); + } + else + { + context.Response.StatusCode = 400; + context.Response.Close(); + } + } + + async Task HandleWebSocketConnection(WebSocket currentWebSocket, Dictionary installationConnections) + { + + var buffer = new byte[4096]; + // Console.WriteLine("This is a new Task, the installation connections table is the following: "); + // foreach (var installationConnection in installationConnections) + // { + // Console.WriteLine($"Installation ID: {installationConnection.Key}, Number of Connections: {installationConnection.Value.Count}"); + // } + // Console.WriteLine("----------------------------------------------"); + + try + { + while (currentWebSocket.State == WebSocketState.Open) + { + //Listen for incoming messages on this WebSocket + var result = await currentWebSocket.ReceiveAsync(buffer, CancellationToken.None); + Console.WriteLine("Received a new message from websocket"); + if (result.MessageType != WebSocketMessageType.Text) + continue; + + var message = Encoding.UTF8.GetString(buffer, 0, result.Count); + var installationIds = JsonSerializer.Deserialize(message); + //Console.WriteLine($"length is {installationIds.Count()}"); + + lock (sharedDataLock) + { + //Each front-end will send the list of the installations it wants to access + //If this is a new key (installation id), initialize the list for this key and then add the websocket object of this client + foreach (var installationId in installationIds) + { + Console.WriteLine(installationId); + if (!installationConnections.ContainsKey(installationId)) + { + Console.WriteLine("Create new empty list for this installation id"); + installationConnections[installationId] = new InstallationInfo + { + Status = -2 + }; + } + + installationConnections[installationId].Connections.Add(currentWebSocket); + //Console.WriteLine($"ADD a new websocket, Installation ID: {installationId}, Number of Connections: {installationConnections[installationId].Count}"); + + var jsonObject = new + { + id = installationId, + status = installationConnections[installationId].Status + }; + + var jsonString = JsonSerializer.Serialize(jsonObject); + var dataToSend = Encoding.UTF8.GetBytes(jsonString); + + + currentWebSocket.SendAsync(dataToSend, + WebSocketMessageType.Text, + true, // Indicates that this is the end of the message + CancellationToken.None + ); + } + + Console.WriteLine("Printing installation connection list"); + //Print the installationConnections dictionary after inserting a websocket + Console.WriteLine("----------------------------------------------"); + foreach (var installationConnection in installationConnections) + { + Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count); + } + + Console.WriteLine("----------------------------------------------"); + } + } + + //When the front-end terminates the connection, the following code will be executed + Console.WriteLine("The connection has been terminated"); + + foreach (var installationConnection in installationConnections) + { + if (installationConnection.Value.Connections.Contains(currentWebSocket)) + { + installationConnection.Value.Connections.Remove(currentWebSocket); + } + } + + await currentWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Connection closed by server", CancellationToken.None); + //Print the installationConnections dictionary after deleting a websocket + Console.WriteLine("Print the installation connections list after deleting a websocket"); + Console.WriteLine("----------------------------------------------"); + foreach (var installationConnection in installationConnections) + { + Console.WriteLine("Installation ID: "+ installationConnection.Key+" Number of Connections: "+installationConnection.Value.Connections.Count); + } + Console.WriteLine("----------------------------------------------"); + } + catch (Exception ex) + { + Console.WriteLine("WebSocket error: " + ex.Message); + } + } + } +} + diff --git a/csharp/App/SaliMax/SaliMax.csproj b/csharp/App/SaliMax/SaliMax.csproj index e350ddf37..0afc37933 100644 --- a/csharp/App/SaliMax/SaliMax.csproj +++ b/csharp/App/SaliMax/SaliMax.csproj @@ -8,6 +8,7 @@ + diff --git a/csharp/App/SaliMax/src/MiddlewareClasses/StatusMessage.cs b/csharp/App/SaliMax/src/MiddlewareClasses/StatusMessage.cs new file mode 100644 index 000000000..8af41f9a3 --- /dev/null +++ b/csharp/App/SaliMax/src/MiddlewareClasses/StatusMessage.cs @@ -0,0 +1,8 @@ + +namespace InnovEnergy.App.SaliMax.MiddlewareClasses; + +public class StatusMessage +{ + public required int InstallationId { get; init; } + public required int Status { get; init; } +} \ No newline at end of file diff --git a/csharp/App/SaliMax/src/Program.cs b/csharp/App/SaliMax/src/Program.cs index b3cf2c582..4d4fd2740 100644 --- a/csharp/App/SaliMax/src/Program.cs +++ b/csharp/App/SaliMax/src/Program.cs @@ -1,8 +1,13 @@ +using System.Net; +using System.Net.NetworkInformation; +using System.Net.Sockets; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; +using System.Text; using Flurl.Http; using InnovEnergy.App.SaliMax.Devices; using InnovEnergy.App.SaliMax.Ess; +using InnovEnergy.App.SaliMax.MiddlewareClasses; using InnovEnergy.App.SaliMax.SaliMaxRelays; using InnovEnergy.App.SaliMax.System; using InnovEnergy.App.SaliMax.SystemConfig; @@ -18,6 +23,8 @@ using InnovEnergy.Lib.Devices.Trumpf.TruConvertDc.Control; using InnovEnergy.Lib.Protocols.Modbus.Channels; using InnovEnergy.Lib.Units; using InnovEnergy.Lib.Utils; +using System.Text.Json; +using RabbitMQ.Client; using static InnovEnergy.Lib.Devices.Trumpf.SystemControl.DataTypes.SystemConfig; using DeviceState = InnovEnergy.App.SaliMax.Devices.DeviceState; @@ -29,7 +36,6 @@ namespace InnovEnergy.App.SaliMax; internal static class Program { private static readonly TimeSpan UpdateInterval = TimeSpan.FromSeconds(2); - private static readonly IReadOnlyList BatteryNodes; private static readonly Channel TruConvertAcChannel ; @@ -41,6 +47,15 @@ internal static class Program private static readonly Channel PvOnAcIsland ; private static readonly Channel RelaysChannel ; private static readonly Channel BatteriesChannel ; + + private static IPAddress _controllerIpAddress; + private static UdpClient _udpListener; + private const string VpnServerIp = "10.2.0.11"; + private static ConnectionFactory? _factory ; + private static IConnection ?_connection; + private static IModel? _channel; + private static bool _subscribedToQueue = false; + private static SalimaxAlarmState _prevSalimaxState = SalimaxAlarmState.Orange; static Program() { @@ -70,6 +85,22 @@ internal static class Program public static async Task Main(String[] args) { + + _controllerIpAddress=FindVpnIp(); + if (_controllerIpAddress == null) + { + Console.WriteLine("There is no VPN interface, exiting..."); + return; + } + + + int udpPort = 9000; + IPEndPoint endPoint = new IPEndPoint(_controllerIpAddress, udpPort); + + _udpListener = new UdpClient(); + _udpListener.Client.Blocking = false; + _udpListener.Client.Bind(endPoint); + while (true) { //CreateAverage(); @@ -149,7 +180,6 @@ internal static class Program LoadOnAcGrid = gridBusLoad, LoadOnAcIsland = loadOnAcIsland, LoadOnDc = dcLoad, - StateMachine = StateMachine.Default, EssControl = EssControl.Default, Log = new SystemLog { SalimaxAlarmState = SalimaxAlarmState.Green, Message = null }, //TODO: Put real stuff @@ -183,9 +213,24 @@ internal static class Program { Watchdog.NotifyAlive(); - var record = ReadStatus(); - - var salimaxAlarmsState = GetSalimaxStateAlarm(record); + var record = ReadStatus(); + var currentSalimaxState = GetSalimaxStateAlarm(record); + + //-------------------------------------------------------------------------------------------- + //If already subscribed to the queue and the status has been changed, update the queue + if (_subscribedToQueue && currentSalimaxState != _prevSalimaxState) + { + _prevSalimaxState = currentSalimaxState; + InformMiddleware(Config.Load().S3.Bucket, (int)currentSalimaxState); + } + + //If there is an available message, subscribe to the queue + if (_udpListener.Available > 0) + { + subscribedToQueue(currentSalimaxState); + } + //-------------------------------------------------------------------------------------------- + record.ControlConstants(); record.ControlSystemState(); @@ -219,6 +264,73 @@ internal static class Program // ReSharper disable once FunctionNeverReturns } + private static void subscribedToQueue(SalimaxAlarmState currentSalimaxState) + { + IPEndPoint serverEndpoint = null; + byte[] udpMessage = _udpListener.Receive(ref serverEndpoint); + + string message = Encoding.UTF8.GetString(udpMessage); + Console.WriteLine($"Received a message: {message}"); + + string replyMessage = "ACK"; + byte[] replyData = Encoding.UTF8.GetBytes(replyMessage); + + // Send the reply to the sender's endpoint + _udpListener.Send(replyData, replyData.Length, serverEndpoint); + Console.WriteLine($"Replied to {serverEndpoint}: {replyMessage}"); + + _factory = new ConnectionFactory { HostName = VpnServerIp }; + _connection = _factory.CreateConnection(); + _channel = _connection.CreateModel(); + _channel.QueueDeclare(queue: "statusQueue", durable: false, exclusive: false, autoDelete: false, arguments: null); + + Console.WriteLine("The controller sends its status to the middleware for the first time"); + InformMiddleware(Config.Load().S3.Bucket, (int)currentSalimaxState); + _subscribedToQueue = true; + } + + private static IPAddress FindVpnIp() + { + string interfaceName = "innovenergy"; + IPAddress controllerIpAddress = null; + + NetworkInterface[] networkInterfaces = NetworkInterface.GetAllNetworkInterfaces(); + foreach (NetworkInterface networkInterface in networkInterfaces) + { + if (networkInterface.Name == interfaceName) + { + IPInterfaceProperties ipProps = networkInterface.GetIPProperties(); + UnicastIPAddressInformationCollection unicastIPs = ipProps.UnicastAddresses; + Console.WriteLine("VPN IP is: "+unicastIPs[0].Address); + controllerIpAddress = unicastIPs[0].Address; + break; + } + } + + return (controllerIpAddress); + } + + private static void InformMiddleware(string bucket, int status) + { + int.TryParse(bucket[0].ToString(), out var installationId); + + var jsonObject = new StatusMessage + { + InstallationId = installationId, + Status = status + }; + + var message = JsonSerializer.Serialize(jsonObject); + var body = Encoding.UTF8.GetBytes(message); + + _channel.BasicPublish(exchange: string.Empty, + routingKey: "statusQueue", + basicProperties: null, + body: body); + Console.WriteLine($"Producer sent message: {message}"); + + } + private static SalimaxAlarmState GetSalimaxStateAlarm(StatusRecord record) { var alarmCondition = record.DetectAlarmStates();