238 lines
11 KiB
C#
238 lines
11 KiB
C#
using System.Diagnostics.CodeAnalysis;
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
using System.Net.WebSockets;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
|
|
namespace InnovEnergy.App.Backend.Websockets;
|
|
|
|
public static class WebsocketManager
|
|
{
|
|
public static readonly Dictionary<int, InstallationInfo> InstallationConnections = new Dictionary<int, InstallationInfo>();
|
|
private static ConnectionFactory _factory = null!;
|
|
private static IConnection _connection = null!;
|
|
private static IModel _channel = null!;
|
|
|
|
public static void InformInstallationsToSubscribeToRabbitMq()
|
|
{
|
|
var installationsIds = new List<int> { 1 };
|
|
var installationIps = new List<string> { "10.2.3.115" };
|
|
var maxRetransmissions = 2;
|
|
|
|
StartRabbitMqConsumer();
|
|
|
|
UdpClient udpClient = new UdpClient();
|
|
udpClient.Client.ReceiveTimeout = 2000;
|
|
int port = 9000;
|
|
//Send a message to each installation and tell it to subscribe to the queue
|
|
for (int i = 0; i < installationsIds.Count; i++)
|
|
{
|
|
using (udpClient)
|
|
{
|
|
//Try at most MAX_RETRANSMISSIONS times to reach an installation.
|
|
for (int j = 0; j < maxRetransmissions; j++)
|
|
{
|
|
string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue";
|
|
byte[] data = Encoding.UTF8.GetBytes(message);
|
|
udpClient.Send(data, data.Length, installationIps[i], port);
|
|
|
|
Console.WriteLine($"Sent UDP message to {installationIps[i]}:{port}: {message}");
|
|
IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(installationIps[i]), port);
|
|
|
|
try
|
|
{
|
|
byte[] replyData = udpClient.Receive(ref remoteEndPoint);
|
|
string replyMessage = Encoding.UTF8.GetString(replyData);
|
|
Console.WriteLine("Received "+replyMessage +" from installation " + installationsIds[i]);
|
|
break;
|
|
}
|
|
catch (SocketException ex)
|
|
{
|
|
if (ex.SocketErrorCode == SocketError.TimedOut)
|
|
{
|
|
Console.WriteLine("Timed out waiting for a response. Retry...");
|
|
}
|
|
else
|
|
{
|
|
Console.WriteLine("Error: " + ex.Message);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
public static void StartRabbitMqConsumer()
|
|
{
|
|
string vpnServerIp = "194.182.190.208";
|
|
//string vpnServerIp = "127.0.0.1";
|
|
_factory = new ConnectionFactory { HostName = vpnServerIp};
|
|
_connection = _factory.CreateConnection();
|
|
_channel = _connection.CreateModel();
|
|
Console.WriteLine("Middleware subscribed to RabbitMQ queue, ready for receiving messages");
|
|
_channel.QueueDeclare(queue: "statusQueue", durable: false, exclusive: false, autoDelete: false, arguments: null);
|
|
|
|
var consumer = new EventingBasicConsumer(_channel);
|
|
consumer.Received += (_, ea) => CallbackReceiveMessageFromQueue(ea);
|
|
_channel.BasicConsume(queue: "statusQueue", autoAck: true, consumer: consumer);
|
|
}
|
|
|
|
[UnconditionalSuppressMessage("Trimming", "IL2026:Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code", Justification = "<Pending>")]
|
|
private static void CallbackReceiveMessageFromQueue(BasicDeliverEventArgs ea)
|
|
{
|
|
var body = ea.Body.ToArray();
|
|
var message = Encoding.UTF8.GetString(body);
|
|
StatusMessage? receivedStatusMessage = JsonSerializer.Deserialize<StatusMessage>(message);
|
|
|
|
lock (InstallationConnections)
|
|
{
|
|
// Process the received message
|
|
if (receivedStatusMessage != null)
|
|
{
|
|
Console.WriteLine("Received a message from installation: " + receivedStatusMessage.InstallationId + " and status is: " + receivedStatusMessage.Status);
|
|
Console.WriteLine("----------------------------------------------");
|
|
Console.WriteLine("Update installation connection table");
|
|
var installationId = receivedStatusMessage.InstallationId;
|
|
|
|
if (!InstallationConnections.ContainsKey(installationId))
|
|
{
|
|
Console.WriteLine("Create new empty list for installation: " + installationId);
|
|
InstallationConnections[installationId] = new InstallationInfo
|
|
{
|
|
Status = receivedStatusMessage.Status
|
|
};
|
|
}
|
|
|
|
Console.WriteLine("----------------------------------------------");
|
|
|
|
foreach (var installationConnection in InstallationConnections)
|
|
{
|
|
if (installationConnection.Key == installationId && installationConnection.Value.Connections.Count > 0)
|
|
{
|
|
Console.WriteLine("Update all the connected websockets for installation " + installationId);
|
|
installationConnection.Value.Status = receivedStatusMessage.Status;
|
|
|
|
var jsonObject = new
|
|
{
|
|
id = installationId,
|
|
status = receivedStatusMessage.Status
|
|
};
|
|
|
|
string jsonString = JsonSerializer.Serialize(jsonObject);
|
|
byte[] dataToSend = Encoding.UTF8.GetBytes(jsonString);
|
|
|
|
foreach (var connection in installationConnection.Value.Connections)
|
|
{
|
|
connection.SendAsync(
|
|
new ArraySegment<byte>(dataToSend, 0, dataToSend.Length),
|
|
WebSocketMessageType.Text,
|
|
true, // Indicates that this is the end of the message
|
|
CancellationToken.None
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
[UnconditionalSuppressMessage("Trimming", "IL2026:Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code", Justification = "<Pending>")]
|
|
public static async Task HandleWebSocketConnection(WebSocket currentWebSocket)
|
|
{
|
|
var buffer = new byte[4096];
|
|
try
|
|
{
|
|
while (currentWebSocket.State == WebSocketState.Open)
|
|
{
|
|
//Listen for incoming messages on this WebSocket
|
|
var result = await currentWebSocket.ReceiveAsync(buffer, CancellationToken.None);
|
|
Console.WriteLine("Received a new message from websocket");
|
|
if (result.MessageType != WebSocketMessageType.Text)
|
|
continue;
|
|
|
|
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
|
|
var installationIds = JsonSerializer.Deserialize<int[]>(message);
|
|
|
|
lock (InstallationConnections)
|
|
{
|
|
//Each front-end will send the list of the installations it wants to access
|
|
//If this is a new key (installation id), initialize the list for this key and then add the websocket object for this client
|
|
//Then, report the status of each requested installation to the front-end that created the websocket connection
|
|
if (installationIds != null)
|
|
foreach (var installationId in installationIds)
|
|
{
|
|
if (!InstallationConnections.ContainsKey(installationId))
|
|
{
|
|
Console.WriteLine("Create new empty list for installation id " + installationId);
|
|
InstallationConnections[installationId] = new InstallationInfo
|
|
{
|
|
Status = -2
|
|
};
|
|
}
|
|
|
|
InstallationConnections[installationId].Connections.Add(currentWebSocket);
|
|
|
|
var jsonObject = new
|
|
{
|
|
id = installationId,
|
|
status = InstallationConnections[installationId].Status
|
|
};
|
|
|
|
var jsonString = JsonSerializer.Serialize(jsonObject);
|
|
var dataToSend = Encoding.UTF8.GetBytes(jsonString);
|
|
|
|
|
|
currentWebSocket.SendAsync(dataToSend,
|
|
WebSocketMessageType.Text,
|
|
true, // Indicates that this is the end of the message
|
|
CancellationToken.None
|
|
);
|
|
}
|
|
|
|
Console.WriteLine("Printing installation connection list");
|
|
Console.WriteLine("----------------------------------------------");
|
|
foreach (var installationConnection in InstallationConnections)
|
|
{
|
|
Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count);
|
|
}
|
|
Console.WriteLine("----------------------------------------------");
|
|
}
|
|
}
|
|
|
|
lock (InstallationConnections)
|
|
{
|
|
//When the front-end terminates the connection, the following code will be executed
|
|
Console.WriteLine("The connection has been terminated");
|
|
foreach (var installationConnection in InstallationConnections)
|
|
{
|
|
if (installationConnection.Value.Connections.Contains(currentWebSocket))
|
|
{
|
|
installationConnection.Value.Connections.Remove(currentWebSocket);
|
|
}
|
|
}
|
|
}
|
|
|
|
await currentWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Connection closed by server", CancellationToken.None);
|
|
lock (InstallationConnections)
|
|
{
|
|
//Print the installationConnections dictionary after deleting a websocket
|
|
Console.WriteLine("Print the installation connections list after deleting a websocket");
|
|
Console.WriteLine("----------------------------------------------");
|
|
foreach (var installationConnection in InstallationConnections)
|
|
{
|
|
Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count);
|
|
}
|
|
|
|
Console.WriteLine("----------------------------------------------");
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Console.WriteLine("WebSocket error: " + ex.Message);
|
|
}
|
|
}
|
|
|
|
} |