193 lines
8.9 KiB
C#
193 lines
8.9 KiB
C#
using System.Net;
|
|
using System.Net.Sockets;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using InnovEnergy.App.Backend.Database;
|
|
using InnovEnergy.App.Backend.DataTypes;
|
|
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
|
|
namespace InnovEnergy.App.Backend.Websockets;
|
|
|
|
public static class RabbitMqManager
|
|
{
|
|
|
|
public static ConnectionFactory Factory = null!;
|
|
public static IConnection Connection = null!;
|
|
public static IModel Channel = null!;
|
|
|
|
|
|
public static void InitializeEnvironment()
|
|
{
|
|
|
|
//string vpnServerIp = "194.182.190.208";
|
|
string vpnServerIp = "10.2.0.11";
|
|
|
|
//Subscribe to RabbitMq queue as a consumer
|
|
Factory = new ConnectionFactory
|
|
{
|
|
HostName = vpnServerIp,
|
|
Port = 5672,
|
|
VirtualHost = "/",
|
|
UserName = "consumer",
|
|
Password = "faceaddb5005815199f8366d3d15ff8a",
|
|
|
|
};
|
|
|
|
Connection = Factory.CreateConnection();
|
|
Channel = Connection.CreateModel();
|
|
Console.WriteLine("Middleware subscribed to RabbitMQ queue, ready for receiving messages");
|
|
Channel.QueueDeclare(queue: "statusQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
|
|
}
|
|
|
|
public static async Task StartRabbitMqConsumer()
|
|
{
|
|
//Wait to receive a message from an installation
|
|
var consumer = new EventingBasicConsumer(Channel);
|
|
consumer.Received += (_, ea) =>
|
|
{
|
|
var body = ea.Body.ToArray();
|
|
var message = Encoding.UTF8.GetString(body);
|
|
//A message can be an alarm, a warning or a heartbit
|
|
StatusMessage? receivedStatusMessage = JsonSerializer.Deserialize<StatusMessage>(message);
|
|
|
|
lock (WebsocketManager.InstallationConnections)
|
|
{
|
|
//Consumer received a message
|
|
if (receivedStatusMessage != null)
|
|
{
|
|
Console.WriteLine("----------------------------------------------");
|
|
Console.WriteLine("Received a message from installation: " + receivedStatusMessage.InstallationId + " and status is: " + receivedStatusMessage.Status);
|
|
var installationId = receivedStatusMessage.InstallationId;
|
|
|
|
//This is a heartbit message, just update the timestamp for this installation.
|
|
//There is no need to notify the corresponding front-ends.
|
|
//Every 15 iterations(30 seconds), the installation sends a heartbit message to the queue
|
|
if (receivedStatusMessage.Type == MessageType.Heartbit)
|
|
{
|
|
Console.WriteLine("This is a heartbit message from installation: " + installationId);
|
|
}
|
|
else
|
|
{
|
|
//Traverse the Warnings list, and store each of them to the database
|
|
if (receivedStatusMessage.Warnings != null)
|
|
{
|
|
foreach (var warning in receivedStatusMessage.Warnings)
|
|
{
|
|
Warning newWarning = new Warning
|
|
{
|
|
InstallationId = receivedStatusMessage.InstallationId,
|
|
Description = warning.Description,
|
|
Date = warning.Date,
|
|
Time = warning.Time,
|
|
DeviceCreatedTheMessage = warning.CreatedBy,
|
|
Seen = false
|
|
};
|
|
//Create a new warning and add it to the database
|
|
Db.HandleWarning(newWarning, receivedStatusMessage.InstallationId);
|
|
}
|
|
}
|
|
|
|
|
|
//Traverse the Alarm list, and store each of them to the database
|
|
if (receivedStatusMessage.Alarms != null)
|
|
{
|
|
Console.WriteLine("Add an alarm for installation "+receivedStatusMessage.InstallationId);
|
|
foreach (var alarm in receivedStatusMessage.Alarms)
|
|
{
|
|
Error newError = new Error
|
|
{
|
|
InstallationId = receivedStatusMessage.InstallationId,
|
|
Description = alarm.Description,
|
|
Date = alarm.Date,
|
|
Time = alarm.Time,
|
|
DeviceCreatedTheMessage = alarm.CreatedBy,
|
|
Seen = false
|
|
};
|
|
//Create a new error and add it to the database
|
|
Db.HandleError(newError, receivedStatusMessage.InstallationId);
|
|
}
|
|
}
|
|
}
|
|
|
|
var prevStatus = 0;
|
|
|
|
//This installation id does not exist in our data structure, add it.
|
|
if (!WebsocketManager.InstallationConnections.ContainsKey(installationId))
|
|
{
|
|
prevStatus = -2;
|
|
Console.WriteLine("Create new empty list for installation: " + installationId);
|
|
WebsocketManager.InstallationConnections[installationId] = new InstallationInfo
|
|
{
|
|
Status = receivedStatusMessage.Status,
|
|
Timestamp = DateTime.Now
|
|
};
|
|
}
|
|
else
|
|
{
|
|
prevStatus = WebsocketManager.InstallationConnections[installationId].Status;
|
|
WebsocketManager.InstallationConnections[installationId].Status = receivedStatusMessage.Status;
|
|
WebsocketManager.InstallationConnections[installationId].Timestamp = DateTime.Now;
|
|
}
|
|
|
|
//Console.WriteLine("----------------------------------------------");
|
|
//If the status has changed, update all the connected front-ends regarding this installation
|
|
if(prevStatus != receivedStatusMessage.Status && WebsocketManager.InstallationConnections[installationId].Connections.Count > 0)
|
|
{
|
|
WebsocketManager.InformWebsocketsForInstallation(installationId);
|
|
}
|
|
|
|
}
|
|
}
|
|
};
|
|
Channel.BasicConsume(queue: "statusQueue", autoAck: true, consumer: consumer);
|
|
}
|
|
|
|
public static void InformInstallationsToSubscribeToRabbitMq()
|
|
{
|
|
var installationIps = Db.Installations.Select(inst => inst.VpnIp).ToList();
|
|
Console.WriteLine("Count is "+installationIps.Count);
|
|
var maxRetransmissions = 2;
|
|
|
|
UdpClient udpClient = new UdpClient();
|
|
udpClient.Client.ReceiveTimeout = 2000;
|
|
int port = 9000;
|
|
//Send a message to each installation and tell it to subscribe to the queue
|
|
using (udpClient)
|
|
{
|
|
for (int i = 0; i < installationIps.Count; i++)
|
|
{
|
|
if(installationIps[i]==""){continue;}
|
|
Console.WriteLine("-----------------------------------------------------------");
|
|
Console.WriteLine("Trying to reach installation with IP: " + installationIps[i]);
|
|
//Try at most MAX_RETRANSMISSIONS times to reach an installation.
|
|
for (int j = 0; j < maxRetransmissions; j++)
|
|
{
|
|
string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue";
|
|
byte[] data = Encoding.UTF8.GetBytes(message);
|
|
udpClient.Send(data, data.Length, installationIps[i], port);
|
|
|
|
Console.WriteLine($"Sent UDP message to {installationIps[i]}:{port}: {message}");
|
|
IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(installationIps[i]), port);
|
|
|
|
try
|
|
{
|
|
byte[] replyData = udpClient.Receive(ref remoteEndPoint);
|
|
string replyMessage = Encoding.UTF8.GetString(replyData);
|
|
Console.WriteLine("Received " + replyMessage + " from installation " + installationIps[i]);
|
|
break;
|
|
}
|
|
catch (SocketException ex)
|
|
{
|
|
if (ex.SocketErrorCode == SocketError.TimedOut){Console.WriteLine("Timed out waiting for a response. Retry...");}
|
|
else{Console.WriteLine("Error: " + ex.Message);}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Console.WriteLine("Start RabbitMQ Consumer");
|
|
|
|
}
|
|
|
|
} |