// 2023-11-27 15:43:42 +00:00
using System.Net ;
using System.Net.Sockets ;
using System.Text ;
using System.Text.Json ;
using InnovEnergy.App.Backend.Database ;
using InnovEnergy.App.Backend.DataTypes ;
using RabbitMQ.Client ;
using RabbitMQ.Client.Events ;
namespace InnovEnergy.App.Backend.Websockets ;
/// <summary>
/// Owns the backend's RabbitMQ connection: declares and consumes the "statusQueue" queue,
/// stores the warnings/alarms carried by incoming status messages, keeps
/// <c>WebsocketManager.InstallationConnections</c> up to date, and can notify installations
/// over UDP that the queue is available.
/// </summary>
public static class RabbitMqManager
{
    public static ConnectionFactory Factory    = null!;
    public static IConnection       Connection = null!;
    public static IModel            Channel    = null!;

    // Previous-status value used for an installation that was not tracked yet.
    // NOTE(review): presumably no real status equals -2, so the first message always
    // counts as a status change - confirm against the status value range.
    private const int NoPreviousStatus = -2;

    /// <summary>
    /// Creates the connection factory, opens the connection and channel, and declares the
    /// durable, non-exclusive, non-auto-delete queue "statusQueue".
    /// Must run before <see cref="StartRabbitMqConsumer"/>, which uses <see cref="Channel"/>.
    /// </summary>
    public static void InitializeEnvironment()
    {
        //string vpnServerIp = "194.182.190.208";
        string vpnServerIp = "10.2.0.11";

        // Subscribe to the RabbitMQ queue as a consumer.
        // NOTE(review): the broker credentials are hard-coded in source; they should be
        // moved to configuration or a secret store.
        Factory = new ConnectionFactory
        {
            HostName    = vpnServerIp,
            Port        = 5672,
            VirtualHost = "/",
            UserName    = "consumer",
            Password    = "faceaddb5005815199f8366d3d15ff8a",
        };

        Connection = Factory.CreateConnection();
        Channel    = Connection.CreateModel();
        Console.WriteLine("Middleware subscribed to RabbitMQ queue, ready for receiving messages");
        Channel.QueueDeclare(queue: "statusQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
    }

    /// <summary>
    /// Registers an event-based consumer on "statusQueue" (auto-acknowledge mode) whose
    /// handler processes each delivered status message.
    /// </summary>
    /// <returns>
    /// A task that is already complete once the consumer is registered; the method contains
    /// no awaits and message handling happens later in the consumer callback.
    /// </returns>
    public static async Task StartRabbitMqConsumer()
    {
        // Wait to receive a message from an installation.
        var consumer = new EventingBasicConsumer(Channel);
        consumer.Received += (_, ea) => HandleDelivery(ea.Body.ToArray());

        Channel.BasicConsume(queue: "statusQueue", autoAck: true, consumer: consumer);
    }

    /// <summary>
    /// Decodes one queue delivery as UTF-8 JSON and, if it is a valid status message from a
    /// known installation, stores its warnings/alarms and updates the tracked status.
    /// </summary>
    /// <param name="body">Raw message payload.</param>
    private static void HandleDelivery(byte[] body)
    {
        var message = Encoding.UTF8.GetString(body);

        // A message can be an alarm, a warning or a heartbeat.
        StatusMessage? receivedStatusMessage;
        try
        {
            receivedStatusMessage = JsonSerializer.Deserialize<StatusMessage>(message);
        }
        catch (JsonException ex)
        {
            // The payload comes from the network; one malformed message must not
            // throw out of the consumer callback.
            Console.WriteLine("Discarding malformed status message: " + ex.Message);
            return;
        }

        if (receivedStatusMessage is null)
        {
            return;
        }

        // The same object is used as the lock here as in the original code, so this stays
        // mutually exclusive with any other code that locks on the dictionary.
        lock (WebsocketManager.InstallationConnections)
        {
            Console.WriteLine("----------------------------------------------");

            // Materialise at most one id so that "no match" can be told apart from a real id
            // (FirstOrDefault would silently yield 0 for an unknown installation).
            var matchingIds = Db.Installations
                                .Where(f => f.Product == receivedStatusMessage.Product && f.S3BucketId == receivedStatusMessage.InstallationId)
                                .Select(f => f.Id)
                                .Take(1)
                                .ToList();

            if (matchingIds.Count == 0)
            {
                Console.WriteLine("Ignoring message from unknown installation: " + receivedStatusMessage.InstallationId + " , product is: " + receivedStatusMessage.Product);
                return;
            }

            int installationId = (int) matchingIds[0];
            Console.WriteLine("Received a message from installation: " + installationId + " , product is: " + receivedStatusMessage.Product + " and status is: " + receivedStatusMessage.Status);

            // A heartbeat only refreshes the timestamp of the installation (done below);
            // it carries nothing to store.
            // Every 15 iterations (30 seconds), the installation sends a heartbeat message to the queue.
            if (receivedStatusMessage.Type == MessageType.Heartbit)
            {
                Console.WriteLine("This is a heartbit message from installation: " + installationId);
            }
            else
            {
                StoreWarningsAndAlarms(receivedStatusMessage, installationId);
            }

            UpdateInstallationStatus(receivedStatusMessage, installationId);
        }
    }

    /// <summary>
    /// Persists every warning and alarm contained in <paramref name="statusMessage"/> through
    /// <c>Db.HandleWarning</c> / <c>Db.HandleError</c>, marked as not yet seen.
    /// </summary>
    private static void StoreWarningsAndAlarms(StatusMessage statusMessage, int installationId)
    {
        // Traverse the Warnings list, and store each of them to the database.
        if (statusMessage.Warnings != null)
        {
            foreach (var warning in statusMessage.Warnings)
            {
                var newWarning = new Warning
                {
                    InstallationId          = installationId,
                    Description             = warning.Description,
                    Date                    = warning.Date,
                    Time                    = warning.Time,
                    DeviceCreatedTheMessage = warning.CreatedBy,
                    Seen                    = false
                };

                Console.WriteLine("Add a warning for installation " + installationId);
                Db.HandleWarning(newWarning, installationId);
            }
        }

        // Traverse the Alarms list, and store each of them to the database.
        if (statusMessage.Alarms != null)
        {
            foreach (var alarm in statusMessage.Alarms)
            {
                var newError = new Error
                {
                    InstallationId          = installationId,
                    Description             = alarm.Description,
                    Date                    = alarm.Date,
                    Time                    = alarm.Time,
                    DeviceCreatedTheMessage = alarm.CreatedBy,
                    Seen                    = false
                };

                Console.WriteLine("Add an alarm for installation " + installationId);
                Db.HandleError(newError, installationId);
            }
        }
    }

    /// <summary>
    /// Records the new status and timestamp for the installation and, when the status differs
    /// from the previous one and at least one connection is registered, informs the websockets.
    /// The caller must hold the lock on <c>WebsocketManager.InstallationConnections</c>.
    /// </summary>
    private static void UpdateInstallationStatus(StatusMessage statusMessage, int installationId)
    {
        int prevStatus;

        if (WebsocketManager.InstallationConnections.TryGetValue(installationId, out var installationInfo))
        {
            prevStatus                 = installationInfo.Status;
            installationInfo.Status    = statusMessage.Status;
            installationInfo.Timestamp = DateTime.Now;
        }
        else
        {
            // This installation id does not exist in our data structure, add it.
            prevStatus = NoPreviousStatus;
            Console.WriteLine("Create new empty list for installation: " + installationId);

            installationInfo = new InstallationInfo
            {
                Status    = statusMessage.Status,
                Timestamp = DateTime.Now
            };
            WebsocketManager.InstallationConnections[installationId] = installationInfo;
        }

        // If the status has changed, update all the connected front-ends regarding this installation.
        if (prevStatus != statusMessage.Status && installationInfo.Connections.Count > 0)
        {
            WebsocketManager.InformWebsocketsForInstallation(installationId);
        }
    }

    /// <summary>
    /// Sends a UDP datagram to port 9000 of every installation VPN address found in the
    /// database, telling it that it can subscribe to the RabbitMQ queue, and waits up to
    /// 2 seconds for a reply. Each installation is tried at most twice; installations with
    /// an empty or unparsable address are skipped.
    /// </summary>
    public static void InformInstallationsToSubscribeToRabbitMq()
    {
        var installationIps = Db.Installations.Select(inst => inst.VpnIp).ToList();
        Console.WriteLine("Count is " + installationIps.Count);

        const int    maxRetransmissions = 2;
        const int    port               = 9000;
        const string message            = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue";

        // The payload is identical for every installation and every attempt.
        byte[] data = Encoding.UTF8.GetBytes(message);

        // Send a message to each installation and tell it to subscribe to the queue.
        using (var udpClient = new UdpClient())
        {
            udpClient.Client.ReceiveTimeout = 2000;

            foreach (var installationIp in installationIps)
            {
                if (string.IsNullOrWhiteSpace(installationIp))
                {
                    continue;
                }

                Console.WriteLine("-----------------------------------------------------------");
                Console.WriteLine("Trying to reach installation with IP: " + installationIp);

                // One bad address must not abort the notification of the remaining installations.
                if (!IPAddress.TryParse(installationIp, out var address))
                {
                    Console.WriteLine("Skipping installation with invalid IP address: " + installationIp);
                    continue;
                }

                var targetEndPoint = new IPEndPoint(address, port);

                // Try at most maxRetransmissions times to reach an installation.
                for (var attempt = 0; attempt < maxRetransmissions; attempt++)
                {
                    try
                    {
                        udpClient.Send(data, data.Length, targetEndPoint);
                        Console.WriteLine($"Sent UDP message to {installationIp}:{port}: {message}");

                        // Receive overwrites this endpoint with the address of the actual sender.
                        var remoteEndPoint = new IPEndPoint(address, port);
                        byte[] replyData    = udpClient.Receive(ref remoteEndPoint);
                        string replyMessage = Encoding.UTF8.GetString(replyData);
                        Console.WriteLine("Received " + replyMessage + " from installation " + installationIp);
                        break;
                    }
                    catch (SocketException ex)
                    {
                        if (ex.SocketErrorCode == SocketError.TimedOut)
                        {
                            Console.WriteLine("Timed out waiting for a response. Retry...");
                        }
                        else
                        {
                            Console.WriteLine("Error: " + ex.Message);
                        }
                    }
                }
            }
        }

        Console.WriteLine("Start RabbitMQ Consumer");
    }
}