using System.Net; using System.Net.Sockets; using System.Net.WebSockets; using System.Text; using System.Text.Json; using InnovEnergy.App.Backend.Database; using InnovEnergy.App.Backend.DataTypes; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace InnovEnergy.App.Backend.Websockets; public static class WebsocketManager { public static Dictionary InstallationConnections = new Dictionary(); public static ConnectionFactory Factory = null!; public static IConnection Connection = null!; public static IModel Channel = null!; public static void InformInstallationsToSubscribeToRabbitMq() { var installationIps = Db.Installations.Select(inst => inst.VpnIp).ToList(); Console.WriteLine("Count is "+installationIps.Count); var maxRetransmissions = 2; UdpClient udpClient = new UdpClient(); udpClient.Client.ReceiveTimeout = 2000; int port = 9000; //Send a message to each installation and tell it to subscribe to the queue using (udpClient) { for (int i = 0; i < installationIps.Count; i++) { if(installationIps[i]==""){continue;} Console.WriteLine("-----------------------------------------------------------"); Console.WriteLine("Trying to reach installation with IP: " + installationIps[i]); //Try at most MAX_RETRANSMISSIONS times to reach an installation. for (int j = 0; j < maxRetransmissions; j++) { string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue"; byte[] data = Encoding.UTF8.GetBytes(message); udpClient.Send(data, data.Length, installationIps[i], port); Console.WriteLine($"Sent UDP message to {installationIps[i]}:{port}: {message}"); IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(installationIps[i]), port); try { byte[] replyData = udpClient.Receive(ref remoteEndPoint); string replyMessage = Encoding.UTF8.GetString(replyData); Console.WriteLine("Received " + replyMessage + " from installation " + installationIps[i]); break; } catch (SocketException ex) { if (ex.SocketErrorCode == SocketError.TimedOut){Console.WriteLine("Timed out waiting for a response. Retry...");} else{Console.WriteLine("Error: " + ex.Message);} } } } } Console.WriteLine("Start RabbitMQ Consumer"); } //Every 1 minute, check the timestamp of the latest received message for every installation. //If the difference between the two timestamps is more than one minute, we consider this installation unavailable. public static async Task MonitorInstallationTable() { while (true){ lock (InstallationConnections){ foreach (var installationConnection in InstallationConnections){ if ((DateTime.Now - installationConnection.Value.Timestamp) > TimeSpan.FromMinutes(1)){ installationConnection.Value.Status = -1; if (installationConnection.Value.Connections.Count > 0){InformWebsocketsForInstallation(installationConnection.Key);} } } } await Task.Delay(TimeSpan.FromMinutes(1)); } } public static async Task StartRabbitMqConsumer() { var consumer = new EventingBasicConsumer(Channel); consumer.Received += (_, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); StatusMessage? receivedStatusMessage = JsonSerializer.Deserialize(message); lock (InstallationConnections) { //Consumer received a message if (receivedStatusMessage != null) { Console.WriteLine("Received a message from installation: " + receivedStatusMessage.InstallationId + " and status is: " + receivedStatusMessage.Status); Console.WriteLine("----------------------------------------------"); Console.WriteLine("Update installation connection table"); var installationId = receivedStatusMessage.InstallationId; //This is a heartbit message, just update the timestamp for this installation. //There is no need to notify the corresponding front-ends. if (receivedStatusMessage.Type == MessageType.Heartbit) { InstallationConnections[installationId].Timestamp = DateTime.Now; } else { //Traverse the Warnings list, and store each of them to the database if (receivedStatusMessage.Warnings != null) { foreach (var warning in receivedStatusMessage.Warnings) { Warning newWarning = new Warning { InstallationId = receivedStatusMessage.InstallationId, Description = warning.Description, Date = warning.Date, Time = warning.Time, DeviceCreatedTheMessage = warning.CreatedBy, Seen = false }; //Create a new warning and add it to the database Db.HandleWarning(newWarning, receivedStatusMessage.InstallationId); } } //Traverse the Alarm list, and store each of them to the database if (receivedStatusMessage.Alarms != null) { foreach (var alarm in receivedStatusMessage.Alarms) { Error newError = new Error { InstallationId = receivedStatusMessage.InstallationId, Description = alarm.Description, Date = alarm.Date, Time = alarm.Time, DeviceCreatedTheMessage = alarm.CreatedBy, Seen = false }; //Create a new error and add it to the database Db.HandleError(newError, receivedStatusMessage.InstallationId); } } //This installation id does not exist in our data structure, add it. if (!InstallationConnections.ContainsKey(installationId)) { Console.WriteLine("Create new empty list for installation: " + installationId); InstallationConnections[installationId] = new InstallationInfo { Status = receivedStatusMessage.Status, Timestamp = DateTime.Now }; } else { InstallationConnections[installationId].Status = receivedStatusMessage.Status; InstallationConnections[installationId].Timestamp = DateTime.Now; } Console.WriteLine("----------------------------------------------"); //Update all the connected front-ends regarding this installation if(InstallationConnections[installationId].Connections.Count > 0) { InformWebsocketsForInstallation(installationId); } } } } }; Channel.BasicConsume(queue: "statusQueue", autoAck: true, consumer: consumer); } //Inform all the connected websockets regarding installation "installationId" public static void InformWebsocketsForInstallation(int installationId) { var installationConnection = InstallationConnections[installationId]; Console.WriteLine("Update all the connected websockets for installation " + installationId); var jsonObject = new { id = installationId, status = installationConnection.Status }; string jsonString = JsonSerializer.Serialize(jsonObject); byte[] dataToSend = Encoding.UTF8.GetBytes(jsonString); foreach (var connection in installationConnection.Connections) { connection.SendAsync( new ArraySegment(dataToSend, 0, dataToSend.Length), WebSocketMessageType.Text, true, // Indicates that this is the end of the message CancellationToken.None ); } } public static async Task HandleWebSocketConnection(WebSocket currentWebSocket) { var buffer = new byte[4096]; try { while (currentWebSocket.State == WebSocketState.Open) { //Listen for incoming messages on this WebSocket var result = await currentWebSocket.ReceiveAsync(buffer, CancellationToken.None); if (result.MessageType != WebSocketMessageType.Text) continue; var message = Encoding.UTF8.GetString(buffer, 0, result.Count); var installationIds = JsonSerializer.Deserialize(message); //This is a ping message to keep the connection alive, reply with a pong if (installationIds[0] == -1) { var jsonObject = new { id = -1, status = -1 }; var jsonString = JsonSerializer.Serialize(jsonObject); var dataToSend = Encoding.UTF8.GetBytes(jsonString); currentWebSocket.SendAsync(dataToSend, WebSocketMessageType.Text, true, CancellationToken.None ); continue; } Console.WriteLine("Received a new message from websocket"); lock (InstallationConnections) { //Each front-end will send the list of the installations it wants to access //If this is a new key (installation id), initialize the list for this key and then add the websocket object for this client //Then, report the status of each requested installation to the front-end that created the websocket connection foreach (var installationId in installationIds) { if (!InstallationConnections.ContainsKey(installationId)) { Console.WriteLine("Create new empty list for installation id " + installationId); InstallationConnections[installationId] = new InstallationInfo { Status = -1 }; } InstallationConnections[installationId].Connections.Add(currentWebSocket); var jsonObject = new { id = installationId, status = InstallationConnections[installationId].Status }; var jsonString = JsonSerializer.Serialize(jsonObject); var dataToSend = Encoding.UTF8.GetBytes(jsonString); currentWebSocket.SendAsync(dataToSend, WebSocketMessageType.Text, true, // Indicates that this is the end of the message CancellationToken.None ); } Console.WriteLine("Printing installation connection list"); Console.WriteLine("----------------------------------------------"); foreach (var installationConnection in InstallationConnections) { Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count); } Console.WriteLine("----------------------------------------------"); } } lock (InstallationConnections) { //When the front-end terminates the connection, the following code will be executed Console.WriteLine("The connection has been terminated"); foreach (var installationConnection in InstallationConnections) { if (installationConnection.Value.Connections.Contains(currentWebSocket)) { installationConnection.Value.Connections.Remove(currentWebSocket); } } } await currentWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Connection closed by server", CancellationToken.None); lock (InstallationConnections) { //Print the installationConnections dictionary after deleting a websocket Console.WriteLine("Print the installation connections list after deleting a websocket"); Console.WriteLine("----------------------------------------------"); foreach (var installationConnection in InstallationConnections) { Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count); } Console.WriteLine("----------------------------------------------"); } } catch (Exception ex) { Console.WriteLine("WebSocket error: " + ex.Message); } } }