2023-11-27 15:43:42 +00:00
using System.Net ;
using System.Net.Sockets ;
using System.Text ;
using System.Text.Json ;
using InnovEnergy.App.Backend.Database ;
using InnovEnergy.App.Backend.DataTypes ;
using RabbitMQ.Client ;
using RabbitMQ.Client.Events ;
2024-06-18 11:11:58 +00:00
using InnovEnergy.Lib.Mailer ;
2023-11-27 15:43:42 +00:00
namespace InnovEnergy.App.Backend.Websockets ;
public static class RabbitMqManager
{
public static ConnectionFactory Factory = null ! ;
public static IConnection Connection = null ! ;
public static IModel Channel = null ! ;
public static void InitializeEnvironment ( )
{
2023-11-29 20:28:11 +00:00
//string vpnServerIp = "194.182.190.208";
string vpnServerIp = "10.2.0.11" ;
2024-04-02 12:36:43 +00:00
//Subscribe to RabbitMq queue as a consumer
2023-11-29 20:28:11 +00:00
Factory = new ConnectionFactory
{
HostName = vpnServerIp ,
Port = 5672 ,
VirtualHost = "/" ,
UserName = "consumer" ,
Password = "faceaddb5005815199f8366d3d15ff8a" ,
} ;
2023-11-27 15:43:42 +00:00
Connection = Factory . CreateConnection ( ) ;
Channel = Connection . CreateModel ( ) ;
Console . WriteLine ( "Middleware subscribed to RabbitMQ queue, ready for receiving messages" ) ;
Channel . QueueDeclare ( queue : "statusQueue" , durable : true , exclusive : false , autoDelete : false , arguments : null ) ;
}
public static async Task StartRabbitMqConsumer ( )
{
2024-04-02 12:36:43 +00:00
//Wait to receive a message from an installation
2023-11-27 15:43:42 +00:00
var consumer = new EventingBasicConsumer ( Channel ) ;
consumer . Received + = ( _ , ea ) = >
{
var body = ea . Body . ToArray ( ) ;
var message = Encoding . UTF8 . GetString ( body ) ;
2024-04-02 12:36:43 +00:00
//A message can be an alarm, a warning or a heartbit
2023-11-27 15:43:42 +00:00
StatusMessage ? receivedStatusMessage = JsonSerializer . Deserialize < StatusMessage > ( message ) ;
2024-05-21 09:42:24 +00:00
2023-11-27 15:43:42 +00:00
2023-11-29 20:28:11 +00:00
lock ( WebsocketManager . InstallationConnections )
2023-11-27 15:43:42 +00:00
{
//Consumer received a message
if ( receivedStatusMessage ! = null )
{
2024-06-18 14:19:40 +00:00
Installation installation = Db . Installations . FirstOrDefault ( f = > f . Product = = receivedStatusMessage . Product & & f . S3BucketId = = receivedStatusMessage . InstallationId ) ;
int installationId = ( int ) installation . Id ;
2024-05-10 17:16:16 +00:00
Console . WriteLine ( "Received a message from installation: " + installationId + " , product is: " + receivedStatusMessage . Product + " and status is: " + receivedStatusMessage . Status ) ;
2023-11-27 15:43:42 +00:00
//This is a heartbit message, just update the timestamp for this installation.
//There is no need to notify the corresponding front-ends.
2024-04-02 12:36:43 +00:00
//Every 15 iterations(30 seconds), the installation sends a heartbit message to the queue
2023-11-27 15:43:42 +00:00
if ( receivedStatusMessage . Type = = MessageType . Heartbit )
{
Console . WriteLine ( "This is a heartbit message from installation: " + installationId ) ;
}
else
{
//Traverse the Warnings list, and store each of them to the database
if ( receivedStatusMessage . Warnings ! = null )
{
2024-05-21 09:42:24 +00:00
2023-11-27 15:43:42 +00:00
foreach ( var warning in receivedStatusMessage . Warnings )
{
Warning newWarning = new Warning
{
2024-05-10 17:16:16 +00:00
InstallationId = installationId ,
2023-11-27 15:43:42 +00:00
Description = warning . Description ,
Date = warning . Date ,
Time = warning . Time ,
DeviceCreatedTheMessage = warning . CreatedBy ,
Seen = false
} ;
//Create a new warning and add it to the database
2024-05-21 09:42:24 +00:00
Console . WriteLine ( "Add a warning for installation " + installationId ) ;
2024-05-10 17:16:16 +00:00
Db . HandleWarning ( newWarning , installationId ) ;
2023-11-27 15:43:42 +00:00
}
}
//Traverse the Alarm list, and store each of them to the database
if ( receivedStatusMessage . Alarms ! = null )
{
2024-06-18 14:19:40 +00:00
string monitorLink ;
if ( installation . Product = = 0 )
{
monitorLink =
$"https://monitor.innov.energy/installations/list/installation/{installation.S3BucketId}/batteryview" ;
}
else
{
monitorLink =
$"https://monitor.innov.energy/salidomo_installations/list/installation/{installation.S3BucketId}/batteryview" ;
}
2024-05-21 09:42:24 +00:00
2023-11-27 15:43:42 +00:00
foreach ( var alarm in receivedStatusMessage . Alarms )
{
Error newError = new Error
{
2024-06-18 14:19:40 +00:00
InstallationId = installation . Id ,
2023-11-27 15:43:42 +00:00
Description = alarm . Description ,
Date = alarm . Date ,
Time = alarm . Time ,
DeviceCreatedTheMessage = alarm . CreatedBy ,
Seen = false
2024-06-18 14:19:40 +00:00
} ;
Console . WriteLine ( "Add an alarm for installation " + installationId ) ;
2024-06-18 11:11:58 +00:00
// Send replace battery email to support team if this alarm is "NeedToReplaceBattery"
2024-06-19 11:38:27 +00:00
if ( alarm . Description = = "2 or more string are disabled" )
2024-06-18 11:11:58 +00:00
{
Console . WriteLine ( "Send replace battery email to the support team for installation " + installationId ) ;
string recipient = "support@innov.energy" ;
2024-06-19 11:38:27 +00:00
string subject = $"Battery Alarm from {installation.InstallationName}: 2 or more strings broken" ;
2024-06-18 11:11:58 +00:00
string text = $"Dear InnovEnergy Support Team,\n" +
$"\n" +
2024-06-18 14:19:40 +00:00
$"Installation Name: {installation.InstallationName}\n" +
2024-06-18 11:11:58 +00:00
$"\n" +
$"Installation Monitor Link: {monitorLink}\n" +
$"\n" +
$"Please exchange: {alarm.CreatedBy}\n" +
$"\n" +
$"Error created date and time: {alarm.Date} {alarm.Time}\n" +
$"\n" +
$"Thank you for your great support:)" ;
Mailer . Send ( "InnovEnergy Support Team" , recipient , subject , text ) ;
}
2023-11-27 15:43:42 +00:00
//Create a new error and add it to the database
2024-05-10 17:16:16 +00:00
Db . HandleError ( newError , installationId ) ;
2023-11-27 15:43:42 +00:00
}
}
}
2023-11-29 20:28:11 +00:00
var prevStatus = 0 ;
2023-11-27 15:43:42 +00:00
//This installation id does not exist in our data structure, add it.
2023-11-29 20:28:11 +00:00
if ( ! WebsocketManager . InstallationConnections . ContainsKey ( installationId ) )
2023-11-27 15:43:42 +00:00
{
2023-11-29 20:28:11 +00:00
prevStatus = - 2 ;
2023-11-27 15:43:42 +00:00
Console . WriteLine ( "Create new empty list for installation: " + installationId ) ;
2023-11-29 20:28:11 +00:00
WebsocketManager . InstallationConnections [ installationId ] = new InstallationInfo
2023-11-27 15:43:42 +00:00
{
Status = receivedStatusMessage . Status ,
Timestamp = DateTime . Now
} ;
}
else
{
2023-11-29 20:28:11 +00:00
prevStatus = WebsocketManager . InstallationConnections [ installationId ] . Status ;
WebsocketManager . InstallationConnections [ installationId ] . Status = receivedStatusMessage . Status ;
WebsocketManager . InstallationConnections [ installationId ] . Timestamp = DateTime . Now ;
2023-11-27 15:43:42 +00:00
}
//Console.WriteLine("----------------------------------------------");
2024-04-02 12:36:43 +00:00
//If the status has changed, update all the connected front-ends regarding this installation
2023-11-29 20:28:11 +00:00
if ( prevStatus ! = receivedStatusMessage . Status & & WebsocketManager . InstallationConnections [ installationId ] . Connections . Count > 0 )
2023-11-27 15:43:42 +00:00
{
WebsocketManager . InformWebsocketsForInstallation ( installationId ) ;
}
}
2024-05-21 09:42:24 +00:00
2023-11-27 15:43:42 +00:00
}
} ;
Channel . BasicConsume ( queue : "statusQueue" , autoAck : true , consumer : consumer ) ;
}
public static void InformInstallationsToSubscribeToRabbitMq ( )
{
var installationIps = Db . Installations . Select ( inst = > inst . VpnIp ) . ToList ( ) ;
Console . WriteLine ( "Count is " + installationIps . Count ) ;
var maxRetransmissions = 2 ;
UdpClient udpClient = new UdpClient ( ) ;
udpClient . Client . ReceiveTimeout = 2000 ;
int port = 9000 ;
//Send a message to each installation and tell it to subscribe to the queue
using ( udpClient )
{
for ( int i = 0 ; i < installationIps . Count ; i + + )
{
if ( installationIps [ i ] = = "" ) { continue ; }
Console . WriteLine ( "-----------------------------------------------------------" ) ;
Console . WriteLine ( "Trying to reach installation with IP: " + installationIps [ i ] ) ;
//Try at most MAX_RETRANSMISSIONS times to reach an installation.
for ( int j = 0 ; j < maxRetransmissions ; j + + )
{
string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue" ;
byte [ ] data = Encoding . UTF8 . GetBytes ( message ) ;
udpClient . Send ( data , data . Length , installationIps [ i ] , port ) ;
Console . WriteLine ( $"Sent UDP message to {installationIps[i]}:{port}: {message}" ) ;
IPEndPoint remoteEndPoint = new IPEndPoint ( IPAddress . Parse ( installationIps [ i ] ) , port ) ;
try
{
byte [ ] replyData = udpClient . Receive ( ref remoteEndPoint ) ;
string replyMessage = Encoding . UTF8 . GetString ( replyData ) ;
Console . WriteLine ( "Received " + replyMessage + " from installation " + installationIps [ i ] ) ;
break ;
}
catch ( SocketException ex )
{
if ( ex . SocketErrorCode = = SocketError . TimedOut ) { Console . WriteLine ( "Timed out waiting for a response. Retry..." ) ; }
else { Console . WriteLine ( "Error: " + ex . Message ) ; }
}
}
}
}
Console . WriteLine ( "Start RabbitMQ Consumer" ) ;
}
}