using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using InnovEnergy.App.Backend.Database;
using InnovEnergy.App.Backend.DataTypes;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using InnovEnergy.Lib.Mailer;

namespace InnovEnergy.App.Backend.Websockets;

/// <summary>
/// Connects the backend to the RabbitMQ status queue and processes the status
/// messages (heartbits, warnings, alarms) published by installations.
/// </summary>
public static class RabbitMqManager
{
    // Name of the queue that is declared in InitializeEnvironment and consumed in StartRabbitMqConsumer.
    private const string StatusQueueName = "statusQueue";

    public static ConnectionFactory Factory = null!;
    public static IConnection Connection = null!;
    public static IModel Channel = null!;

    /// <summary>
    /// Creates the connection factory, opens the connection and the channel, and
    /// declares the durable status queue. Must run before <see cref="StartRabbitMqConsumer"/>,
    /// which uses <see cref="Channel"/>.
    /// </summary>
    public static void InitializeEnvironment()
    {
        //string vpnServerIp = "194.182.190.208";
        string vpnServerIp = "10.2.0.11";

        // Subscribe to the RabbitMQ queue as a consumer.
        // NOTE(review): the broker credentials are hard-coded in source; consider moving them
        // to configuration or a secret store.
        Factory = new ConnectionFactory
        {
            HostName = vpnServerIp,
            Port = 5672,
            VirtualHost = "/",
            UserName = "consumer",
            Password = "faceaddb5005815199f8366d3d15ff8a",
        };

        Connection = Factory.CreateConnection();
        Channel = Connection.CreateModel();
        Console.WriteLine("Middleware subscribed to RabbitMQ queue, ready for receiving messages");
        Channel.QueueDeclare(queue: StatusQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
    }

    /// <summary>
    /// Registers a consumer on the status queue. Every received message is decoded as UTF-8 JSON,
    /// deserialized into a <see cref="StatusMessage"/> and processed while holding the lock on
    /// <c>WebsocketManager.InstallationConnections</c>.
    /// </summary>
    /// <returns>A task that completes once the consumer has been registered.</returns>
    // The method contains no await; `async` is kept so that a failure while registering the
    // consumer is still reported through the returned Task, exactly as before.
    public static async Task StartRabbitMqConsumer()
    {
        // Wait to receive a message from an installation.
        var consumer = new EventingBasicConsumer(Channel);

        consumer.Received += (_, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());

            // A message can be an alarm, a warning or a heartbit.
            StatusMessage? receivedStatusMessage = JsonSerializer.Deserialize<StatusMessage>(message);
            if (receivedStatusMessage is null)
            {
                return;
            }

            lock (WebsocketManager.InstallationConnections)
            {
                HandleStatusMessage(receivedStatusMessage);
            }
        };

        Channel.BasicConsume(queue: StatusQueueName, autoAck: true, consumer: consumer);
    }

    /// <summary>
    /// Processes one status message: stores its warnings and alarms (unless it is a heartbit)
    /// and then refreshes the cached status of the installation that sent it.
    /// The caller holds the lock on <c>WebsocketManager.InstallationConnections</c>.
    /// </summary>
    private static void HandleStatusMessage(StatusMessage receivedStatusMessage)
    {
        Installation? installation = Db.Installations.FirstOrDefault(f => f.Product == receivedStatusMessage.Product && f.S3BucketId == receivedStatusMessage.InstallationId);

        // No installation matches this product / bucket id: drop the message instead of
        // failing with a NullReferenceException inside the consumer callback.
        if (installation is null)
        {
            Console.WriteLine($"Ignoring status message for unknown installation (product: {receivedStatusMessage.Product}, S3 bucket id: {receivedStatusMessage.InstallationId})");
            return;
        }

        int installationId = (int)installation.Id;

        if (receivedStatusMessage.Type == MessageType.Heartbit)
        {
            // Heartbit messages carry no warnings or alarms to store. According to the original
            // author, an installation sends one every 15 iterations (30 seconds).
            if (installation.Product == 1 && installation.Device == 2)
            {
                Console.WriteLine("This is a heartbit message from installation: " + installationId + " Name of the file is " + receivedStatusMessage.Timestamp);
            }
        }
        else
        {
            StoreWarnings(receivedStatusMessage, installationId);
            StoreAlarms(receivedStatusMessage, installation, installationId);
        }

        // Runs for every message type, heartbits included.
        UpdateInstallationStatus(receivedStatusMessage, installation, installationId);
    }

    /// <summary>Stores every warning contained in the message in the database.</summary>
    private static void StoreWarnings(StatusMessage receivedStatusMessage, int installationId)
    {
        if (receivedStatusMessage.Warnings == null)
        {
            return;
        }

        foreach (var warning in receivedStatusMessage.Warnings)
        {
            Warning newWarning = new Warning
            {
                InstallationId = installationId,
                Description = warning.Description,
                Date = warning.Date,
                Time = warning.Time,
                DeviceCreatedTheMessage = warning.CreatedBy,
                Seen = false
            };

            // Create a new warning and add it to the database.
            Console.WriteLine("Add a warning for installation " + installationId);
            Db.HandleWarning(newWarning, installationId);
        }
    }

    /// <summary>
    /// Stores every alarm contained in the message in the database. For the alarm
    /// "2 or more string are disabled" a support e-mail is composed; sending it is currently disabled.
    /// </summary>
    private static void StoreAlarms(StatusMessage receivedStatusMessage, Installation installation, int installationId)
    {
        if (receivedStatusMessage.Alarms == null)
        {
            return;
        }

        // Product 0 uses the "installations" monitor route, every other product the "salidomo_installations" route.
        string monitorLink = installation.Product == 0
            ? $"https://monitor.innov.energy/installations/list/installation/{installation.S3BucketId}/batteryview"
            : $"https://monitor.innov.energy/salidomo_installations/list/installation/{installation.S3BucketId}/batteryview";

        foreach (var alarm in receivedStatusMessage.Alarms)
        {
            Error newError = new Error
            {
                InstallationId = installation.Id,
                Description = alarm.Description,
                Date = alarm.Date,
                Time = alarm.Time,
                DeviceCreatedTheMessage = alarm.CreatedBy,
                Seen = false
            };

            // Send replace battery email to support team if this alarm is "NeedToReplaceBattery".
            if (alarm.Description == "2 or more string are disabled")
            {
                Console.WriteLine("Send replace battery email to the support team for installation " + installationId);

                string recipient = "support@innov.energy";
                string subject = $"Battery Alarm from {installation.InstallationName}: 2 or more strings broken";
                string text = $"Dear InnovEnergy Support Team,\n" +
                              $"\n" +
                              $"Installation Name: {installation.InstallationName}\n" +
                              $"\n" +
                              $"Installation Monitor Link: {monitorLink}\n" +
                              $"\n" +
                              $"Please exchange: {alarm.CreatedBy}\n" +
                              $"\n" +
                              $"Error created date and time: {alarm.Date} {alarm.Time}\n" +
                              $"\n" +
                              $"Thank you for your great support:)";

                // Sending is disabled for now; the composed mail is kept so that it can be re-enabled.
                //Mailer.Send("InnovEnergy Support Team", recipient, subject, text);
            }

            // Create a new error and add it to the database.
            Db.HandleError(newError, installationId);
        }
    }

    /// <summary>
    /// Records the status and reception time of the installation in
    /// <c>WebsocketManager.InstallationConnections</c> and informs the connected front-ends
    /// when the status differs from the previously stored one.
    /// </summary>
    private static void UpdateInstallationStatus(StatusMessage receivedStatusMessage, Installation installation, int installationId)
    {
        var installationConnections = WebsocketManager.InstallationConnections;
        int prevStatus;

        if (installationConnections.TryGetValue(installationId, out var info))
        {
            prevStatus = info.Status;
            info.Status = receivedStatusMessage.Status;
            info.Timestamp = DateTime.Now;
        }
        else
        {
            // This installation id does not exist in our data structure yet, add it.
            // -2 is the placeholder the original code uses as the previous status of a new entry.
            prevStatus = -2;
            info = new InstallationInfo
            {
                Status = receivedStatusMessage.Status,
                Timestamp = DateTime.Now,
                Product = installation.Product
            };
            installationConnections[installationId] = info;
        }

        // If the status has changed, update all the connected front-ends regarding this installation.
        if (prevStatus != receivedStatusMessage.Status && info.Connections.Count > 0)
        {
            WebsocketManager.InformWebsocketsForInstallation(installationId);
        }
    }
}