From 30a3a05de0f704cd2f90dbd4028438ba93d8b207 Mon Sep 17 00:00:00 2001 From: Noe Date: Mon, 6 Nov 2023 14:26:15 +0100 Subject: [PATCH] Fixed websockets, updated Salimax controller code --- csharp/App/Middleware/Program.cs | 50 +++++++++++++++------- csharp/App/Middleware/RabbitMQConsumer.cs | 4 +- csharp/App/Middleware/WebSocketListener.cs | 24 +++-------- csharp/App/SaliMax/src/Program.cs | 42 ++++++++++-------- 4 files changed, 66 insertions(+), 54 deletions(-) diff --git a/csharp/App/Middleware/Program.cs b/csharp/App/Middleware/Program.cs index 091e76a0f..65bf0efb9 100644 --- a/csharp/App/Middleware/Program.cs +++ b/csharp/App/Middleware/Program.cs @@ -9,38 +9,58 @@ internal class Program { public static readonly object SharedDataLock = new object(); - public static void Main(string[] args) + public static async Task Main(string[] args) { //For each installation id, we maintain a list of the connected clients var installationConnections = new Dictionary(); var installationsIds = new List {1}; var installationIps = new List {"10.2.3.115"}; + var MAX_RETRANSMISSIONS = 2; RabbitMqConsumer.StartRabbitMqConsumer(installationConnections,SharedDataLock); - + UdpClient udpClient = new UdpClient(); + udpClient.Client.ReceiveTimeout = 2000; int port = 9000; + //Send a message to each installation and tell it to subscribe to the queue for (int i = 0; i < installationsIds.Count; i++) { using (udpClient) { - // Convert the message to bytes (UTF-8 encoding is used here) - string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue"; - byte[] data = Encoding.UTF8.GetBytes(message); + //Try at most MAX_RETRANSMISSIONS times to reach an installation. + for (int j = 0; j < MAX_RETRANSMISSIONS; j++) + { + string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue"; + byte[] data = Encoding.UTF8.GetBytes(message); + udpClient.Send(data, data.Length, installationIps[i], port); - // Send the UDP message to the specified IP address and port - udpClient.Send(data, data.Length, installationIps[i], port); - - Console.WriteLine($"Sent UDP message to {installationIps[i]}:{port}: {message}"); - IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(installationIps[i]), port); - byte[] replyData = udpClient.Receive(ref remoteEndPoint); - string replyMessage = Encoding.UTF8.GetString(replyData); - Console.WriteLine("Received message from installation "+installationsIds[i]); + Console.WriteLine($"Sent UDP message to {installationIps[i]}:{port}: {message}"); + IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(installationIps[i]), port); + + try + { + byte[] replyData = udpClient.Receive(ref remoteEndPoint); + string replyMessage = Encoding.UTF8.GetString(replyData); + Console.WriteLine("Received message from installation " + installationsIds[i]); + break; + } + catch (SocketException ex) + { + if (ex.SocketErrorCode == SocketError.TimedOut) + { + Console.WriteLine("Timed out waiting for a response. Retry..."); + } + else + { + Console.WriteLine("Error: " + ex.Message); + } + } + } } } - WebSocketListener.StartServerAsync(installationConnections,SharedDataLock); Console.WriteLine("WebSocket server is running. Press Enter to exit."); - Console.ReadLine(); + Console.WriteLine("WebSocket server is running. Press Enter to exit."); + await WebSocketListener.StartServerAsync(installationConnections,SharedDataLock); } } \ No newline at end of file diff --git a/csharp/App/Middleware/RabbitMQConsumer.cs b/csharp/App/Middleware/RabbitMQConsumer.cs index b3379ea33..90967731e 100644 --- a/csharp/App/Middleware/RabbitMQConsumer.cs +++ b/csharp/App/Middleware/RabbitMQConsumer.cs @@ -1,6 +1,5 @@ using System.Net.WebSockets; using System.Text.Json; - namespace InnovEnergy.App.Middleware; using System.Text; using RabbitMQ.Client; @@ -27,14 +26,13 @@ public static class RabbitMqConsumer { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); - StatusMessage receivedStatusMessage = JsonSerializer.Deserialize(message); + StatusMessage? receivedStatusMessage = JsonSerializer.Deserialize(message); lock (sharedDataLock) { // Process the received message if (receivedStatusMessage != null) { - Console.WriteLine("Received a message from installation:"+receivedStatusMessage); Console.WriteLine("Received a message from installation: " + receivedStatusMessage.InstallationId + " and status is: " + receivedStatusMessage.Status); Console.WriteLine("----------------------------------------------"); Console.WriteLine("Update installation connection table"); diff --git a/csharp/App/Middleware/WebSocketListener.cs b/csharp/App/Middleware/WebSocketListener.cs index acb7f0857..5abf6c263 100644 --- a/csharp/App/Middleware/WebSocketListener.cs +++ b/csharp/App/Middleware/WebSocketListener.cs @@ -7,15 +7,13 @@ namespace InnovEnergy.App.Middleware; public static class WebSocketListener { - //For each installation id, we maintain a list of the connected clients public static async Task StartServerAsync(Dictionary installationConnections, Object sharedDataLock) { var listener = new HttpListener(); - listener.Prefixes.Add("http://localhost:8080/websocket/"); - listener.Start(); + listener.Prefixes.Add("http://127.0.0.1:8080/"); - Console.WriteLine("WebSocket server listening on http://localhost:8080/websocket/"); + listener.Start(); //Http listener listens for connections. When it accepts a new connection, it creates a new Task to handle this connection while (true) @@ -37,16 +35,11 @@ public static class WebSocketListener } } + //We have a task per websocket connection async Task HandleWebSocketConnection(WebSocket currentWebSocket, Dictionary installationConnections) { var buffer = new byte[4096]; - // Console.WriteLine("This is a new Task, the installation connections table is the following: "); - // foreach (var installationConnection in installationConnections) - // { - // Console.WriteLine($"Installation ID: {installationConnection.Key}, Number of Connections: {installationConnection.Value.Count}"); - // } - // Console.WriteLine("----------------------------------------------"); try { @@ -60,18 +53,17 @@ public static class WebSocketListener var message = Encoding.UTF8.GetString(buffer, 0, result.Count); var installationIds = JsonSerializer.Deserialize(message); - //Console.WriteLine($"length is {installationIds.Count()}"); lock (sharedDataLock) { //Each front-end will send the list of the installations it wants to access - //If this is a new key (installation id), initialize the list for this key and then add the websocket object of this client + //If this is a new key (installation id), initialize the list for this key and then add the websocket object for this client + //Then, report the status of each requested installation to the front-end that created the websocket connection foreach (var installationId in installationIds) { - Console.WriteLine(installationId); if (!installationConnections.ContainsKey(installationId)) { - Console.WriteLine("Create new empty list for this installation id"); + Console.WriteLine("Create new empty list for installation id "+installationId); installationConnections[installationId] = new InstallationInfo { Status = -2 @@ -79,7 +71,6 @@ public static class WebSocketListener } installationConnections[installationId].Connections.Add(currentWebSocket); - //Console.WriteLine($"ADD a new websocket, Installation ID: {installationId}, Number of Connections: {installationConnections[installationId].Count}"); var jsonObject = new { @@ -99,20 +90,17 @@ public static class WebSocketListener } Console.WriteLine("Printing installation connection list"); - //Print the installationConnections dictionary after inserting a websocket Console.WriteLine("----------------------------------------------"); foreach (var installationConnection in installationConnections) { Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count); } - Console.WriteLine("----------------------------------------------"); } } //When the front-end terminates the connection, the following code will be executed Console.WriteLine("The connection has been terminated"); - foreach (var installationConnection in installationConnections) { if (installationConnection.Value.Connections.Contains(currentWebSocket)) diff --git a/csharp/App/SaliMax/src/Program.cs b/csharp/App/SaliMax/src/Program.cs index 07ad3c078..32e391b31 100644 --- a/csharp/App/SaliMax/src/Program.cs +++ b/csharp/App/SaliMax/src/Program.cs @@ -57,6 +57,7 @@ internal static class Program private static IConnection ? _connection; private static IModel? _channel; private static Boolean _subscribedToQueue = false; + private static Boolean _subscribeToQueueForTheFirstTime = false; private static SalimaxAlarmState _prevSalimaxState = SalimaxAlarmState.Green; static Program() @@ -220,6 +221,7 @@ internal static class Program var record = ReadStatus(); var currentSalimaxState = GetSalimaxStateAlarm(record); + SendSalimaxStateAlarm(currentSalimaxState); record.ControlConstants(); @@ -257,6 +259,13 @@ internal static class Program private static void SendSalimaxStateAlarm(SalimaxAlarmState currentSalimaxState) { var s3Bucket = Config.Load().S3?.Bucket; + + //When the controller boots, it tries to subscribe to the queue + if (_subscribeToQueueForTheFirstTime==false) + { + _subscribeToQueueForTheFirstTime = true; + SubscribeToQueue(currentSalimaxState, s3Bucket); + } //If already subscribed to the queue and the status has been changed, update the queue if (_subscribedToQueue && currentSalimaxState != _prevSalimaxState) @@ -266,37 +275,34 @@ internal static class Program InformMiddleware(s3Bucket, (Int32)currentSalimaxState); } - //If there is an available message, subscribe to the queue + //If there is an available message from the RabbitMQ Broker, subscribe to the queue if (_udpListener.Available > 0) { + IPEndPoint? serverEndpoint = null; + + var replyMessage = "ACK"; + var replyData = Encoding.UTF8.GetBytes(replyMessage); + + var udpMessage = _udpListener.Receive(ref serverEndpoint); + var message = Encoding.UTF8.GetString(udpMessage); + + Console.WriteLine($"Received a message: {message}"); + + // Send the reply to the sender's endpoint + _udpListener.Send(replyData, replyData.Length, serverEndpoint); + Console.WriteLine($"Replied to {serverEndpoint}: {replyMessage}"); + SubscribeToQueue(currentSalimaxState, s3Bucket); } } private static void SubscribeToQueue(SalimaxAlarmState currentSalimaxState, String? s3Bucket) { - IPEndPoint? serverEndpoint = null; - - var replyMessage = "ACK"; - var replyData = Encoding.UTF8.GetBytes(replyMessage); - - var udpMessage = _udpListener.Receive(ref serverEndpoint); - var message = Encoding.UTF8.GetString(udpMessage); - - Console.WriteLine($"Received a message: {message}"); - - // Send the reply to the sender's endpoint - _udpListener.Send(replyData, replyData.Length, serverEndpoint); - Console.WriteLine($"Replied to {serverEndpoint}: {replyMessage}"); - _factory = new ConnectionFactory { HostName = VpnServerIp }; _connection = _factory.CreateConnection(); _channel = _connection.CreateModel(); - _channel.QueueDeclare(queue: "statusQueue", durable: false, exclusive: false, autoDelete: false, arguments: null); - Console.WriteLine("The controller sends its status to the middleware for the first time"); - if (s3Bucket != null) InformMiddleware(s3Bucket, (Int32)currentSalimaxState); _subscribedToQueue = true;