Merge remote-tracking branch 'origin/main'

This commit is contained in:
Kim 2023-11-06 15:17:47 +01:00
commit 6458757222
4 changed files with 80 additions and 61 deletions

View File

@ -9,38 +9,58 @@ internal class Program
{ {
public static readonly object SharedDataLock = new object(); public static readonly object SharedDataLock = new object();
public static void Main(string[] args) public static async Task Main(string[] args)
{ {
//For each installation id, we maintain a list of the connected clients //For each installation id, we maintain a list of the connected clients
var installationConnections = new Dictionary<int, InstallationInfo>(); var installationConnections = new Dictionary<int, InstallationInfo>();
var installationsIds = new List<int> {1}; var installationsIds = new List<int> {1};
var installationIps = new List<string> {"10.2.3.115"}; var installationIps = new List<string> {"10.2.3.115"};
var MAX_RETRANSMISSIONS = 2;
RabbitMqConsumer.StartRabbitMqConsumer(installationConnections,SharedDataLock); RabbitMqConsumer.StartRabbitMqConsumer(installationConnections,SharedDataLock);
UdpClient udpClient = new UdpClient(); UdpClient udpClient = new UdpClient();
udpClient.Client.ReceiveTimeout = 2000;
int port = 9000; int port = 9000;
//Send a message to each installation and tell it to subscribe to the queue
for (int i = 0; i < installationsIds.Count; i++) for (int i = 0; i < installationsIds.Count; i++)
{ {
using (udpClient) using (udpClient)
{ {
// Convert the message to bytes (UTF-8 encoding is used here) //Try at most MAX_RETRANSMISSIONS times to reach an installation.
for (int j = 0; j < MAX_RETRANSMISSIONS; j++)
{
string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue"; string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue";
byte[] data = Encoding.UTF8.GetBytes(message); byte[] data = Encoding.UTF8.GetBytes(message);
// Send the UDP message to the specified IP address and port
udpClient.Send(data, data.Length, installationIps[i], port); udpClient.Send(data, data.Length, installationIps[i], port);
Console.WriteLine($"Sent UDP message to {installationIps[i]}:{port}: {message}"); Console.WriteLine($"Sent UDP message to {installationIps[i]}:{port}: {message}");
IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(installationIps[i]), port); IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(installationIps[i]), port);
try
{
byte[] replyData = udpClient.Receive(ref remoteEndPoint); byte[] replyData = udpClient.Receive(ref remoteEndPoint);
string replyMessage = Encoding.UTF8.GetString(replyData); string replyMessage = Encoding.UTF8.GetString(replyData);
Console.WriteLine("Received message from installation " + installationsIds[i]); Console.WriteLine("Received message from installation " + installationsIds[i]);
break;
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.TimedOut)
{
Console.WriteLine("Timed out waiting for a response. Retry...");
}
else
{
Console.WriteLine("Error: " + ex.Message);
}
}
}
} }
} }
WebSocketListener.StartServerAsync(installationConnections,SharedDataLock);
Console.WriteLine("WebSocket server is running. Press Enter to exit."); Console.WriteLine("WebSocket server is running. Press Enter to exit.");
Console.ReadLine(); Console.WriteLine("WebSocket server is running. Press Enter to exit.");
await WebSocketListener.StartServerAsync(installationConnections,SharedDataLock);
} }
} }

View File

@ -1,6 +1,5 @@
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Text.Json; using System.Text.Json;
namespace InnovEnergy.App.Middleware; namespace InnovEnergy.App.Middleware;
using System.Text; using System.Text;
using RabbitMQ.Client; using RabbitMQ.Client;
@ -27,14 +26,13 @@ public static class RabbitMqConsumer
{ {
var body = ea.Body.ToArray(); var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body); var message = Encoding.UTF8.GetString(body);
StatusMessage receivedStatusMessage = JsonSerializer.Deserialize<StatusMessage>(message); StatusMessage? receivedStatusMessage = JsonSerializer.Deserialize<StatusMessage>(message);
lock (sharedDataLock) lock (sharedDataLock)
{ {
// Process the received message // Process the received message
if (receivedStatusMessage != null) if (receivedStatusMessage != null)
{ {
Console.WriteLine("Received a message from installation:"+receivedStatusMessage);
Console.WriteLine("Received a message from installation: " + receivedStatusMessage.InstallationId + " and status is: " + receivedStatusMessage.Status); Console.WriteLine("Received a message from installation: " + receivedStatusMessage.InstallationId + " and status is: " + receivedStatusMessage.Status);
Console.WriteLine("----------------------------------------------"); Console.WriteLine("----------------------------------------------");
Console.WriteLine("Update installation connection table"); Console.WriteLine("Update installation connection table");

View File

@ -7,15 +7,13 @@ namespace InnovEnergy.App.Middleware;
public static class WebSocketListener public static class WebSocketListener
{ {
//For each installation id, we maintain a list of the connected clients
public static async Task StartServerAsync(Dictionary<Int32, InstallationInfo> installationConnections, Object sharedDataLock) public static async Task StartServerAsync(Dictionary<Int32, InstallationInfo> installationConnections, Object sharedDataLock)
{ {
var listener = new HttpListener(); var listener = new HttpListener();
listener.Prefixes.Add("http://localhost:8080/websocket/"); listener.Prefixes.Add("http://127.0.0.1:8080/");
listener.Start();
Console.WriteLine("WebSocket server listening on http://localhost:8080/websocket/"); listener.Start();
//Http listener listens for connections. When it accepts a new connection, it creates a new Task to handle this connection //Http listener listens for connections. When it accepts a new connection, it creates a new Task to handle this connection
while (true) while (true)
@ -37,16 +35,11 @@ public static class WebSocketListener
} }
} }
//We have a task per websocket connection
async Task HandleWebSocketConnection(WebSocket currentWebSocket, Dictionary<Int32, InstallationInfo> installationConnections) async Task HandleWebSocketConnection(WebSocket currentWebSocket, Dictionary<Int32, InstallationInfo> installationConnections)
{ {
var buffer = new byte[4096]; var buffer = new byte[4096];
// Console.WriteLine("This is a new Task, the installation connections table is the following: ");
// foreach (var installationConnection in installationConnections)
// {
// Console.WriteLine($"Installation ID: {installationConnection.Key}, Number of Connections: {installationConnection.Value.Count}");
// }
// Console.WriteLine("----------------------------------------------");
try try
{ {
@ -60,18 +53,17 @@ public static class WebSocketListener
var message = Encoding.UTF8.GetString(buffer, 0, result.Count); var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
var installationIds = JsonSerializer.Deserialize<int[]>(message); var installationIds = JsonSerializer.Deserialize<int[]>(message);
//Console.WriteLine($"length is {installationIds.Count()}");
lock (sharedDataLock) lock (sharedDataLock)
{ {
//Each front-end will send the list of the installations it wants to access //Each front-end will send the list of the installations it wants to access
//If this is a new key (installation id), initialize the list for this key and then add the websocket object of this client //If this is a new key (installation id), initialize the list for this key and then add the websocket object for this client
//Then, report the status of each requested installation to the front-end that created the websocket connection
foreach (var installationId in installationIds) foreach (var installationId in installationIds)
{ {
Console.WriteLine(installationId);
if (!installationConnections.ContainsKey(installationId)) if (!installationConnections.ContainsKey(installationId))
{ {
Console.WriteLine("Create new empty list for this installation id"); Console.WriteLine("Create new empty list for installation id "+installationId);
installationConnections[installationId] = new InstallationInfo installationConnections[installationId] = new InstallationInfo
{ {
Status = -2 Status = -2
@ -79,7 +71,6 @@ public static class WebSocketListener
} }
installationConnections[installationId].Connections.Add(currentWebSocket); installationConnections[installationId].Connections.Add(currentWebSocket);
//Console.WriteLine($"ADD a new websocket, Installation ID: {installationId}, Number of Connections: {installationConnections[installationId].Count}");
var jsonObject = new var jsonObject = new
{ {
@ -99,20 +90,17 @@ public static class WebSocketListener
} }
Console.WriteLine("Printing installation connection list"); Console.WriteLine("Printing installation connection list");
//Print the installationConnections dictionary after inserting a websocket
Console.WriteLine("----------------------------------------------"); Console.WriteLine("----------------------------------------------");
foreach (var installationConnection in installationConnections) foreach (var installationConnection in installationConnections)
{ {
Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count); Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count);
} }
Console.WriteLine("----------------------------------------------"); Console.WriteLine("----------------------------------------------");
} }
} }
//When the front-end terminates the connection, the following code will be executed //When the front-end terminates the connection, the following code will be executed
Console.WriteLine("The connection has been terminated"); Console.WriteLine("The connection has been terminated");
foreach (var installationConnection in installationConnections) foreach (var installationConnection in installationConnections)
{ {
if (installationConnection.Value.Connections.Contains(currentWebSocket)) if (installationConnection.Value.Connections.Contains(currentWebSocket))

View File

@ -57,6 +57,7 @@ internal static class Program
private static IConnection ? _connection; private static IConnection ? _connection;
private static IModel? _channel; private static IModel? _channel;
private static Boolean _subscribedToQueue = false; private static Boolean _subscribedToQueue = false;
private static Boolean _subscribeToQueueForTheFirstTime = false;
private static SalimaxAlarmState _prevSalimaxState = SalimaxAlarmState.Green; private static SalimaxAlarmState _prevSalimaxState = SalimaxAlarmState.Green;
static Program() static Program()
@ -220,6 +221,7 @@ internal static class Program
var record = ReadStatus(); var record = ReadStatus();
var currentSalimaxState = GetSalimaxStateAlarm(record); var currentSalimaxState = GetSalimaxStateAlarm(record);
SendSalimaxStateAlarm(currentSalimaxState); SendSalimaxStateAlarm(currentSalimaxState);
record.ControlConstants(); record.ControlConstants();
@ -258,6 +260,13 @@ internal static class Program
{ {
var s3Bucket = Config.Load().S3?.Bucket; var s3Bucket = Config.Load().S3?.Bucket;
//When the controller boots, it tries to subscribe to the queue
if (_subscribeToQueueForTheFirstTime==false)
{
_subscribeToQueueForTheFirstTime = true;
SubscribeToQueue(currentSalimaxState, s3Bucket);
}
//If already subscribed to the queue and the status has been changed, update the queue //If already subscribed to the queue and the status has been changed, update the queue
if (_subscribedToQueue && currentSalimaxState != _prevSalimaxState) if (_subscribedToQueue && currentSalimaxState != _prevSalimaxState)
{ {
@ -266,14 +275,8 @@ internal static class Program
InformMiddleware(s3Bucket, (Int32)currentSalimaxState); InformMiddleware(s3Bucket, (Int32)currentSalimaxState);
} }
//If there is an available message, subscribe to the queue //If there is an available message from the RabbitMQ Broker, subscribe to the queue
if (_udpListener.Available > 0) if (_udpListener.Available > 0)
{
SubscribeToQueue(currentSalimaxState, s3Bucket);
}
}
private static void SubscribeToQueue(SalimaxAlarmState currentSalimaxState, String? s3Bucket)
{ {
IPEndPoint? serverEndpoint = null; IPEndPoint? serverEndpoint = null;
@ -289,18 +292,28 @@ internal static class Program
_udpListener.Send(replyData, replyData.Length, serverEndpoint); _udpListener.Send(replyData, replyData.Length, serverEndpoint);
Console.WriteLine($"Replied to {serverEndpoint}: {replyMessage}"); Console.WriteLine($"Replied to {serverEndpoint}: {replyMessage}");
SubscribeToQueue(currentSalimaxState, s3Bucket);
}
}
private static void SubscribeToQueue(SalimaxAlarmState currentSalimaxState, String? s3Bucket)
{
try
{
_factory = new ConnectionFactory { HostName = VpnServerIp }; _factory = new ConnectionFactory { HostName = VpnServerIp };
_connection = _factory.CreateConnection(); _connection = _factory.CreateConnection();
_channel = _connection.CreateModel(); _channel = _connection.CreateModel();
_channel.QueueDeclare(queue: "statusQueue", durable: false, exclusive: false, autoDelete: false, arguments: null); _channel.QueueDeclare(queue: "statusQueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
Console.WriteLine("The controller sends its status to the middleware for the first time"); Console.WriteLine("The controller sends its status to the middleware for the first time");
if (s3Bucket != null) InformMiddleware(s3Bucket, (Int32)currentSalimaxState); if (s3Bucket != null) InformMiddleware(s3Bucket, (Int32)currentSalimaxState);
_subscribedToQueue = true; _subscribedToQueue = true;
} }
catch (Exception ex)
{
Console.WriteLine("An error occurred while connecting to the RabbitMQ queue: " + ex.Message);
}
}
private static IPAddress FindVpnIp() private static IPAddress FindVpnIp()
{ {