86 lines
3.9 KiB
C#
86 lines
3.9 KiB
C#
using System.Net.WebSockets;
|
|
using System.Text.Json;
|
|
namespace InnovEnergy.App.Middleware;
|
|
using System.Text;
|
|
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
|
|
/// <summary>
/// Consumes installation status messages from the RabbitMQ queue "statusQueue",
/// records the latest status per installation in a shared table and forwards the
/// status as a JSON text frame to every websocket registered for that installation.
/// </summary>
public static class RabbitMqConsumer
{
    // Host name handed to the RabbitMQ ConnectionFactory.
    private const String BrokerHostName = "194.182.190.208";

    // Name of the queue that is declared and consumed.
    private const String StatusQueueName = "statusQueue";

    // Kept in static fields so the connection and channel stay referenced
    // for as long as the consumer is expected to receive messages.
    private static ConnectionFactory _factory = null!;
    private static IConnection _connection = null!;
    private static IModel _channel = null!;

    /// <summary>
    /// Connects to the broker, declares the status queue and registers a consumer
    /// (auto-acknowledging) that processes every delivered message.
    /// </summary>
    /// <param name="installationConnections">
    /// Table keyed by installation id; it is read and modified by the message handler.
    /// </param>
    /// <param name="sharedDataLock">
    /// Monitor object held while <paramref name="installationConnections"/> is accessed.
    /// </param>
    public static void StartRabbitMqConsumer(Dictionary<Int32, InstallationInfo> installationConnections, Object sharedDataLock)
    {
        _factory = new ConnectionFactory { HostName = BrokerHostName };
        _connection = _factory.CreateConnection();
        _channel = _connection.CreateModel();
        Console.WriteLine("Middleware subscribed to RabbitMQ queue, ready for receiving messages");
        _channel.QueueDeclare(queue: StatusQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (_, ea) =>
        {
            var receivedStatusMessage = TryParseStatusMessage(ea.Body.ToArray());
            if (receivedStatusMessage is null)
            {
                return;
            }

            lock (sharedDataLock)
            {
                ApplyStatusMessage(installationConnections, receivedStatusMessage);
            }
        };

        _channel.BasicConsume(queue: StatusQueueName, autoAck: true, consumer: consumer);
    }

    /// <summary>
    /// Decodes a UTF-8 JSON payload into a <see cref="StatusMessage"/>.
    /// </summary>
    /// <returns>
    /// The deserialized message, or <c>null</c> when the payload is the JSON literal
    /// <c>null</c> or is not valid JSON for the target type.
    /// </returns>
    private static StatusMessage? TryParseStatusMessage(Byte[] body)
    {
        var message = Encoding.UTF8.GetString(body);

        try
        {
            return JsonSerializer.Deserialize<StatusMessage>(message);
        }
        catch (JsonException e)
        {
            // A single malformed message must not throw out of the consumer callback.
            Console.WriteLine("Discarding malformed status message: " + e.Message);
            return null;
        }
    }

    /// <summary>
    /// Stores the status carried by <paramref name="receivedStatusMessage"/> in the table
    /// and broadcasts it to the installation's websockets, if there are any.
    /// The caller is expected to hold the lock protecting the table.
    /// </summary>
    private static void ApplyStatusMessage(Dictionary<Int32, InstallationInfo> installationConnections, StatusMessage receivedStatusMessage)
    {
        var installationId = receivedStatusMessage.InstallationId;

        Console.WriteLine("Received a message from installation: " + installationId + " and status is: " + receivedStatusMessage.Status);
        Console.WriteLine("----------------------------------------------");
        Console.WriteLine("Update installation connection table");

        if (!installationConnections.TryGetValue(installationId, out var installation))
        {
            Console.WriteLine("Create new empty list for installation: " + installationId);
            installation = new InstallationInfo { Status = receivedStatusMessage.Status };
            installationConnections[installationId] = installation;
        }
        else
        {
            // Always remember the latest status, even while no websocket is attached,
            // so the table never holds a stale value for a known installation.
            installation.Status = receivedStatusMessage.Status;
        }

        Console.WriteLine("----------------------------------------------");

        if (installation.Connections.Count == 0)
        {
            return;
        }

        Console.WriteLine("Update all the connected websockets for installation " + installationId);

        var jsonObject = new
        {
            id = installationId,
            status = receivedStatusMessage.Status
        };

        var jsonString = JsonSerializer.Serialize(jsonObject);
        var dataToSend = Encoding.UTF8.GetBytes(jsonString);
        var payload = new ArraySegment<Byte>(dataToSend, 0, dataToSend.Length);

        foreach (var connection in installation.Connections)
        {
            // Sockets that are not open cannot accept a frame; skip them.
            if (connection.State != WebSocketState.Open)
            {
                continue;
            }

            try
            {
                // The send is intentionally not awaited (we are inside a lock in a
                // synchronous callback); the continuation only observes and logs a fault.
                connection
                    .SendAsync(
                        payload,
                        WebSocketMessageType.Text,
                        true, // Indicates that this is the end of the message
                        CancellationToken.None)
                    .ContinueWith(
                        t => Console.WriteLine("Failed to send status update to a websocket: " + t.Exception?.GetBaseException().Message),
                        TaskContinuationOptions.OnlyOnFaulted);
            }
            catch (Exception e) when (e is WebSocketException or InvalidOperationException)
            {
                // One broken socket must not prevent the remaining sockets from being updated.
                Console.WriteLine("Failed to send status update to a websocket: " + e.Message);
            }
        }
    }
}