using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using InnovEnergy.App.Backend.Database;
using InnovEnergy.App.Backend.DataTypes;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace InnovEnergy.App.Backend.Websockets;

/// <summary>
/// Connects the backend to the RabbitMQ status queue, consumes the status messages
/// published by installations, and notifies installations over UDP that they may
/// subscribe to the queue.
/// </summary>
public static class RabbitMqManager
{
    /// <summary>Name of the queue that is declared and consumed by this class.</summary>
    private const string StatusQueueName = "statusQueue";

    public static ConnectionFactory Factory = null!;
    public static IConnection Connection = null!;
    public static IModel Channel = null!;

    /// <summary>
    /// Creates the connection factory, opens the connection and channel, and declares
    /// the durable status queue. Must run before <see cref="StartRabbitMqConsumer"/>,
    /// which uses <see cref="Channel"/>.
    /// </summary>
    public static void InitializeEnvironment()
    {
        // Previously used broker address: 194.182.190.208
        string vpnServerIp = "10.2.0.11";

        // NOTE(review): broker credentials are hard-coded in source; they should be
        // moved to configuration / a secret store.
        Factory = new ConnectionFactory
        {
            HostName = vpnServerIp,
            Port = 5672,
            VirtualHost = "/",
            UserName = "consumer",
            Password = "faceaddb5005815199f8366d3d15ff8a",
        };

        Connection = Factory.CreateConnection();
        Channel = Connection.CreateModel();
        Console.WriteLine("Middleware subscribed to RabbitMQ queue, ready for receiving messages");
        Channel.QueueDeclare(queue: StatusQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
    }

    /// <summary>
    /// Registers a consumer on the status queue (auto-acknowledge). Each received message
    /// is decoded as UTF-8 JSON into a <see cref="StatusMessage"/> and processed while
    /// holding the lock on <c>WebsocketManager.InstallationConnections</c>.
    /// </summary>
    /// <returns>A task that completes once the consumer has been registered.</returns>
    // NOTE(review): the method is declared async without awaiting anything; the modifier
    // is kept so that exceptions are still surfaced through the returned task.
    public static async Task StartRabbitMqConsumer()
    {
        var consumer = new EventingBasicConsumer(Channel);

        consumer.Received += (_, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());

            // A message can be an alarm, a warning or a heartbeat.
            StatusMessage? receivedStatusMessage;
            try
            {
                receivedStatusMessage = JsonSerializer.Deserialize<StatusMessage>(message);
            }
            catch (JsonException ex)
            {
                // The payload comes from the network; a malformed message is dropped
                // instead of throwing out of the consumer callback.
                Console.WriteLine("Discarding malformed status message: " + ex.Message);
                return;
            }

            if (receivedStatusMessage is null)
            {
                return;
            }

            lock (WebsocketManager.InstallationConnections)
            {
                ProcessStatusMessage(receivedStatusMessage);
            }
        };

        Channel.BasicConsume(queue: StatusQueueName, autoAck: true, consumer: consumer);
    }

    /// <summary>
    /// Sends a UDP datagram to port 9000 of every installation VPN address stored in the
    /// database and waits up to two seconds for a reply, trying at most twice per address.
    /// Socket errors for one installation are logged and do not stop the loop.
    /// </summary>
    public static void InformInstallationsToSubscribeToRabbitMq()
    {
        var installationIps = Db.Installations.Select(inst => inst.VpnIp).ToList();
        Console.WriteLine("Count is " + installationIps.Count);

        const int maxRetransmissions = 2;
        const int port = 9000;
        const string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue";
        byte[] data = Encoding.UTF8.GetBytes(message);

        // Send a message to each installation and tell it to subscribe to the queue.
        using (var udpClient = new UdpClient())
        {
            udpClient.Client.ReceiveTimeout = 2000;

            foreach (var installationIp in installationIps)
            {
                if (string.IsNullOrWhiteSpace(installationIp))
                {
                    continue;
                }

                Console.WriteLine("-----------------------------------------------------------");
                Console.WriteLine("Trying to reach installation with IP: " + installationIp);

                // Try at most maxRetransmissions times to reach an installation.
                for (int attempt = 0; attempt < maxRetransmissions; attempt++)
                {
                    try
                    {
                        // Send is inside the try so that an unreachable or unresolvable
                        // address is logged instead of aborting the remaining installations.
                        udpClient.Send(data, data.Length, installationIp, port);
                        Console.WriteLine($"Sent UDP message to {installationIp}:{port}: {message}");

                        // Receive overwrites this endpoint with the actual sender, so the
                        // target address does not need to be parsed here.
                        var remoteEndPoint = new IPEndPoint(IPAddress.Any, 0);
                        byte[] replyData = udpClient.Receive(ref remoteEndPoint);
                        string replyMessage = Encoding.UTF8.GetString(replyData);
                        Console.WriteLine("Received " + replyMessage + " from installation " + installationIp);
                        break;
                    }
                    catch (SocketException ex)
                    {
                        if (ex.SocketErrorCode == SocketError.TimedOut)
                        {
                            Console.WriteLine("Timed out waiting for a response. Retry...");
                        }
                        else
                        {
                            Console.WriteLine("Error: " + ex.Message);
                        }
                    }
                }
            }
        }

        Console.WriteLine("Start RabbitMQ Consumer");
    }

    /// <summary>
    /// Handles one decoded status message: resolves the installation id, stores warnings
    /// and alarms for non-heartbeat messages, updates the cached status/timestamp, and
    /// informs connected front-ends when the status changed.
    /// The caller must hold the lock on <c>WebsocketManager.InstallationConnections</c>.
    /// </summary>
    private static void ProcessStatusMessage(StatusMessage statusMessage)
    {
        Console.WriteLine("----------------------------------------------");

        // NOTE(review): when no installation matches, FirstOrDefault yields the default
        // id and processing continues with it — confirm whether such messages should be dropped.
        int installationId = (int)Db.Installations
            .Where(f => f.Product == statusMessage.Product && f.S3BucketId == statusMessage.InstallationId)
            .Select(f => f.Id)
            .FirstOrDefault();

        Console.WriteLine("Received a message from installation: " + installationId + " , product is: " + statusMessage.Product + " and status is: " + statusMessage.Status);

        // A heartbeat message only refreshes the timestamp of the installation.
        // Every 15 iterations (30 seconds), the installation sends a heartbeat message to the queue.
        if (statusMessage.Type == MessageType.Heartbit)
        {
            Console.WriteLine("This is a heartbit message from installation: " + installationId);
        }
        else
        {
            StoreWarningsAndAlarms(statusMessage, installationId);
        }

        int previousStatus = UpdateInstallationStatus(installationId, statusMessage.Status, out var info);

        // If the status has changed, update all the connected front-ends regarding this installation.
        if (previousStatus != statusMessage.Status && info.Connections.Count > 0)
        {
            WebsocketManager.InformWebsocketsForInstallation(installationId);
        }
    }

    /// <summary>
    /// Passes every warning and every alarm carried by the message to
    /// <c>Db.HandleWarning</c> / <c>Db.HandleError</c> as unseen entries.
    /// </summary>
    private static void StoreWarningsAndAlarms(StatusMessage statusMessage, int installationId)
    {
        if (statusMessage.Warnings != null)
        {
            foreach (var warning in statusMessage.Warnings)
            {
                var newWarning = new Warning
                {
                    InstallationId = installationId,
                    Description = warning.Description,
                    Date = warning.Date,
                    Time = warning.Time,
                    DeviceCreatedTheMessage = warning.CreatedBy,
                    Seen = false
                };

                Db.HandleWarning(newWarning, installationId);
            }
        }

        if (statusMessage.Alarms != null)
        {
            Console.WriteLine("Add an alarm for installation " + installationId);
            foreach (var alarm in statusMessage.Alarms)
            {
                var newError = new Error
                {
                    InstallationId = installationId,
                    Description = alarm.Description,
                    Date = alarm.Date,
                    Time = alarm.Time,
                    DeviceCreatedTheMessage = alarm.CreatedBy,
                    Seen = false
                };

                Db.HandleError(newError, installationId);
            }
        }
    }

    /// <summary>
    /// Records the new status and the current time for the installation, creating its
    /// entry in <c>WebsocketManager.InstallationConnections</c> when it is missing.
    /// </summary>
    /// <returns>The status stored before the update, or -2 when the entry was just created.</returns>
    private static int UpdateInstallationStatus(int installationId, int newStatus, out InstallationInfo info)
    {
        var connections = WebsocketManager.InstallationConnections;

        if (connections.TryGetValue(installationId, out var existing))
        {
            int previousStatus = existing.Status;
            existing.Status = newStatus;
            existing.Timestamp = DateTime.Now;
            info = existing;
            return previousStatus;
        }

        // This installation id does not exist in our data structure, add it.
        Console.WriteLine("Create new empty list for installation: " + installationId);
        info = new InstallationInfo
        {
            Status = newStatus,
            Timestamp = DateTime.Now
        };
        connections[installationId] = info;
        return -2;
    }
}