using System.Net.WebSockets; using System.Text.Json; namespace InnovEnergy.App.Middleware; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; public static class RabbitMqConsumer { private static ConnectionFactory _factory = null!; private static IConnection _connection = null!; private static IModel _channel= null!; public static void StartRabbitMqConsumer(Dictionary installationConnections, Object sharedDataLock) { string vpnServerIp = "194.182.190.208"; _factory = new ConnectionFactory { HostName = vpnServerIp }; _connection = _factory.CreateConnection(); _channel = _connection.CreateModel(); Console.WriteLine("Middleware subscribed to RabbitMQ queue, ready for receiving messages"); _channel.QueueDeclare(queue: "statusQueue", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); StatusMessage receivedStatusMessage = JsonSerializer.Deserialize(message); lock (sharedDataLock) { // Process the received message if (receivedStatusMessage != null) { Console.WriteLine("Received a message from installation:"+receivedStatusMessage); Console.WriteLine("Received a message from installation: " + receivedStatusMessage.InstallationId + " and status is: " + receivedStatusMessage.Status); Console.WriteLine("----------------------------------------------"); Console.WriteLine("Update installation connection table"); var installationId = receivedStatusMessage.InstallationId; if (!installationConnections.ContainsKey(installationId)) { Console.WriteLine("Create new empty list for installation: " + installationId); installationConnections[installationId] = new InstallationInfo { Status = receivedStatusMessage.Status }; } Console.WriteLine("----------------------------------------------"); foreach (var installationConnection in installationConnections) { if (installationConnection.Key == installationId && installationConnection.Value.Connections.Count > 0) { Console.WriteLine("Update all the connected websockets for installation " + installationId); installationConnection.Value.Status = receivedStatusMessage.Status; var jsonObject = new { id = installationId, status = receivedStatusMessage.Status }; string jsonString = JsonSerializer.Serialize(jsonObject); byte[] dataToSend = Encoding.UTF8.GetBytes(jsonString); foreach (var connection in installationConnection.Value.Connections) { connection.SendAsync( new ArraySegment(dataToSend, 0, dataToSend.Length), WebSocketMessageType.Text, true, // Indicates that this is the end of the message CancellationToken.None ); } } } } } }; _channel.BasicConsume(queue: "statusQueue", autoAck: true, consumer: consumer); } }