Innovenergy_trunk/csharp/App/Backend/Websockets/WebsockerManager.cs

292 lines
13 KiB
C#

using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using InnovEnergy.App.Backend.Database;
using InnovEnergy.App.Backend.DataTypes;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace InnovEnergy.App.Backend.Websockets;
public static class WebsocketManager
{
public static Dictionary<int, InstallationInfo> InstallationConnections = new Dictionary<int, InstallationInfo>();
public static ConnectionFactory Factory = null!;
public static IConnection Connection = null!;
public static IModel Channel = null!;
public static void InformInstallationsToSubscribeToRabbitMq()
{
var installationIps = Db.Installations.Select(inst => inst.VpnIp).ToList();
Console.WriteLine("Count is "+installationIps.Count);
var maxRetransmissions = 2;
UdpClient udpClient = new UdpClient();
udpClient.Client.ReceiveTimeout = 2000;
int port = 9000;
//Send a message to each installation and tell it to subscribe to the queue
using (udpClient)
{
for (int i = 0; i < installationIps.Count; i++)
{
if(installationIps[i]==""){continue;}
Console.WriteLine("-----------------------------------------------------------");
Console.WriteLine("Trying to reach installation with IP: " + installationIps[i]);
//Try at most MAX_RETRANSMISSIONS times to reach an installation.
for (int j = 0; j < maxRetransmissions; j++)
{
string message = "This is a message from RabbitMQ server, you can subscribe to the RabbitMQ queue";
byte[] data = Encoding.UTF8.GetBytes(message);
udpClient.Send(data, data.Length, installationIps[i], port);
Console.WriteLine($"Sent UDP message to {installationIps[i]}:{port}: {message}");
IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Parse(installationIps[i]), port);
try
{
byte[] replyData = udpClient.Receive(ref remoteEndPoint);
string replyMessage = Encoding.UTF8.GetString(replyData);
Console.WriteLine("Received " + replyMessage + " from installation " + installationIps[i]);
break;
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.TimedOut)
{
Console.WriteLine("Timed out waiting for a response. Retry...");
}
else
{
Console.WriteLine("Error: " + ex.Message);
}
}
}
}
}
Console.WriteLine("Start RabbitMQ Consumer");
}
public static async Task StartRabbitMqConsumer()
{
var consumer = new EventingBasicConsumer(Channel);
consumer.Received += (_, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
StatusMessage? receivedStatusMessage = JsonSerializer.Deserialize<StatusMessage>(message);
lock (InstallationConnections)
{
//Consumer received a message
if (receivedStatusMessage != null)
{
Console.WriteLine("Received a message from installation: " + receivedStatusMessage.InstallationId + " and status is: " + receivedStatusMessage.Status);
Console.WriteLine("----------------------------------------------");
Console.WriteLine("Update installation connection table");
var installationId = receivedStatusMessage.InstallationId;
//This is an error message
if (receivedStatusMessage.Status==2)
{
Console.WriteLine("-----------------------New error-----------------------");
Error newError = new Error
{
InstallationId = receivedStatusMessage.InstallationId,
Description = receivedStatusMessage.Description,
CreatedAt = receivedStatusMessage.CreatedAt,
DeviceCreatedTheMessage = receivedStatusMessage.CreatedBy,
Seen = false
};
//Create a new error and add it to the database
Db.HandleError(newError,receivedStatusMessage.InstallationId);
}
else if (receivedStatusMessage.Status==1)
{
Console.WriteLine("-----------------------New warning-----------------------");
Warning newWarning = new Warning
{
InstallationId = receivedStatusMessage.InstallationId,
Description = receivedStatusMessage.Description,
CreatedAt = receivedStatusMessage.CreatedAt,
DeviceCreatedTheMessage = receivedStatusMessage.CreatedBy,
Seen = false
};
//Create a new error and add it to the database
Db.HandleWarning(newWarning,receivedStatusMessage.InstallationId);
}
if (!InstallationConnections.ContainsKey(installationId))
{
Console.WriteLine("Create new empty list for installation: " + installationId);
InstallationConnections[installationId] = new InstallationInfo
{
Status = receivedStatusMessage.Status
};
}
else
{
InstallationConnections[installationId].Status = receivedStatusMessage.Status;
}
Console.WriteLine("----------------------------------------------");
foreach (var installationConnection in InstallationConnections)
{
if (installationConnection.Key == installationId && installationConnection.Value.Connections.Count > 0)
{
Console.WriteLine("Update all the connected websockets for installation " + installationId);
installationConnection.Value.Status = receivedStatusMessage.Status;
var jsonObject = new
{
id = installationId,
status = receivedStatusMessage.Status
};
string jsonString = JsonSerializer.Serialize(jsonObject);
byte[] dataToSend = Encoding.UTF8.GetBytes(jsonString);
foreach (var connection in installationConnection.Value.Connections)
{
connection.SendAsync(
new ArraySegment<byte>(dataToSend, 0, dataToSend.Length),
WebSocketMessageType.Text,
true, // Indicates that this is the end of the message
CancellationToken.None
);
}
}
}
}
}
};
Channel.BasicConsume(queue: "statusQueue", autoAck: true, consumer: consumer);
}
public static async Task HandleWebSocketConnection(WebSocket currentWebSocket)
{
var buffer = new byte[4096];
try
{
while (currentWebSocket.State == WebSocketState.Open)
{
//Listen for incoming messages on this WebSocket
var result = await currentWebSocket.ReceiveAsync(buffer, CancellationToken.None);
if (result.MessageType != WebSocketMessageType.Text)
continue;
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
var installationIds = JsonSerializer.Deserialize<int[]>(message);
//This is a ping message to keep the connection alive, reply with a pong
if (installationIds[0] == -1)
{
var jsonObject = new
{
id = -1,
status = -1
};
var jsonString = JsonSerializer.Serialize(jsonObject);
var dataToSend = Encoding.UTF8.GetBytes(jsonString);
currentWebSocket.SendAsync(dataToSend,
WebSocketMessageType.Text,
true,
CancellationToken.None
);
continue;
}
Console.WriteLine("Received a new message from websocket");
lock (InstallationConnections)
{
//Each front-end will send the list of the installations it wants to access
//If this is a new key (installation id), initialize the list for this key and then add the websocket object for this client
//Then, report the status of each requested installation to the front-end that created the websocket connection
if (installationIds != null)
foreach (var installationId in installationIds)
{
if (!InstallationConnections.ContainsKey(installationId))
{
Console.WriteLine("Create new empty list for installation id " + installationId);
InstallationConnections[installationId] = new InstallationInfo
{
Status = -1
};
}
InstallationConnections[installationId].Connections.Add(currentWebSocket);
var jsonObject = new
{
id = installationId,
status = InstallationConnections[installationId].Status
};
var jsonString = JsonSerializer.Serialize(jsonObject);
var dataToSend = Encoding.UTF8.GetBytes(jsonString);
currentWebSocket.SendAsync(dataToSend,
WebSocketMessageType.Text,
true, // Indicates that this is the end of the message
CancellationToken.None
);
}
Console.WriteLine("Printing installation connection list");
Console.WriteLine("----------------------------------------------");
foreach (var installationConnection in InstallationConnections)
{
Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count);
}
Console.WriteLine("----------------------------------------------");
}
}
lock (InstallationConnections)
{
//When the front-end terminates the connection, the following code will be executed
Console.WriteLine("The connection has been terminated");
foreach (var installationConnection in InstallationConnections)
{
if (installationConnection.Value.Connections.Contains(currentWebSocket))
{
installationConnection.Value.Connections.Remove(currentWebSocket);
}
}
}
await currentWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Connection closed by server", CancellationToken.None);
lock (InstallationConnections)
{
//Print the installationConnections dictionary after deleting a websocket
Console.WriteLine("Print the installation connections list after deleting a websocket");
Console.WriteLine("----------------------------------------------");
foreach (var installationConnection in InstallationConnections)
{
Console.WriteLine("Installation ID: " + installationConnection.Key + " Number of Connections: " + installationConnection.Value.Connections.Count);
}
Console.WriteLine("----------------------------------------------");
}
}
catch (Exception ex)
{
Console.WriteLine("WebSocket error: " + ex.Message);
}
}
}