Skip to content

Commit

Permalink
show agent status from websocket on frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
mlapaglia committed Nov 8, 2023
1 parent 5e01826 commit c8f30e7
Show file tree
Hide file tree
Showing 16 changed files with 472 additions and 56 deletions.
25 changes: 25 additions & 0 deletions OpenAlprWebhookProcessor/Settings/AgentStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace OpenAlprWebhookProcessor.Settings
{
public class AgentStatus
{
public bool IsConnected { get; set; }

public string Hostname { get; set; }

public string Version { get; set; }

public int CpuCores { get; set; }

public int CpuUsagePercent { get; set; }

public long DaemonUptimeSeconds { get; set; }

public long DiskFreeBytes { get; set; }

public long SystemUptimeSeconds { get; set; }

public long AgentEpochMs { get; set; }

public bool AlprdActive { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Microsoft.EntityFrameworkCore;
using OpenAlprWebhookProcessor.Data;
using OpenAlprWebhookProcessor.WebhookProcessor.OpenAlprWebsocket;
using System.Threading;
using System.Threading.Tasks;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using Microsoft.EntityFrameworkCore;
using OpenAlprWebhookProcessor.Data;
using OpenAlprWebhookProcessor.WebhookProcessor.OpenAlprWebsocket;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace OpenAlprWebhookProcessor.Settings
{
public class GetAgentStatusRequestHandler
{
private readonly ProcessorContext _processorContext;

private readonly WebsocketClientOrganizer _websocketClientOrganizer;

public GetAgentStatusRequestHandler(
ProcessorContext processorContext,
WebsocketClientOrganizer websocketClientOrganizer)
{
_processorContext = processorContext;
_websocketClientOrganizer = websocketClientOrganizer;
}

public async Task<AgentStatus> HandleAsync(CancellationToken cancellationToken)
{
var agentUid = await _processorContext.Agents
.AsNoTracking()
.Select(x => x.Uid)
.FirstOrDefaultAsync(cancellationToken);

if (agentUid == null)
{
return new AgentStatus()
{
IsConnected = false,
};
}

var agentStatus = await _websocketClientOrganizer.GetAgentStatusAsync(agentUid, cancellationToken);

if (agentStatus == null)
{
return new AgentStatus()
{
IsConnected = false,
};
}

return new AgentStatus()
{
AgentEpochMs = agentStatus.AgentEpochMs,
AlprdActive = agentStatus.AgentStatus.AlprdActive,
Hostname = agentStatus.AgentStatus.AgentHostname,
IsConnected = true,
CpuCores = agentStatus.AgentStatus.CpuCores,
CpuUsagePercent = agentStatus.AgentStatus.CpuUsagePercent,
DaemonUptimeSeconds = agentStatus.AgentStatus.DaemonUptimeSeconds,
DiskFreeBytes = agentStatus.AgentStatus.DiskDriveFreeBytes,
Version = agentStatus.Version,
SystemUptimeSeconds = agentStatus.AgentStatus.SystemUptimeSeconds,
};
}
}
}
12 changes: 11 additions & 1 deletion OpenAlprWebhookProcessor/Settings/SettingsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class SettingsController : ControllerBase
{
private readonly GetAgentRequestHandler _getAgentRequestHandler;

private readonly GetAgentStatusRequestHandler _getAgentStatusRequestHandler;

private readonly UpsertAgentRequestHandler _upsertAgentRequestHandler;

private readonly GetIgnoresRequestHandler _getIgnoresRequestHandler;
Expand Down Expand Up @@ -55,7 +57,8 @@ public SettingsController(
UpsertEnricherRequestHandler upsertEnricherRequestHandler,
TestEnricherRequestHandler testEnricherRequestHandler,
GetDebugPlateGroupRequestHandler getDebugPlateGroupHandler,
DeleteDebugPlateGroupRequestHandler deleteDebugPlateGroupRequestHandler)
DeleteDebugPlateGroupRequestHandler deleteDebugPlateGroupRequestHandler,
GetAgentStatusRequestHandler getAgentStatusRequestHandler)
{
_getAgentRequestHandler = getAgentRequestHandler;
_upsertAgentRequestHandler = upsertAgentRequestHandler;
Expand All @@ -69,6 +72,7 @@ public SettingsController(
_testEnricherRequestHandler = testEnricherRequestHandler;
_getDebugPlateGroupHandler = getDebugPlateGroupHandler;
_deleteDebugPlateGroupRequestHandler = deleteDebugPlateGroupRequestHandler;
_getAgentStatusRequestHandler = getAgentStatusRequestHandler;
}

[HttpGet("agent")]
Expand All @@ -77,6 +81,12 @@ public async Task<Agent> GetAgent(CancellationToken cancellationToken)
return await _getAgentRequestHandler.HandleAsync(cancellationToken);
}

[HttpGet("agent/status")]
public async Task<AgentStatus> GetAgentStatus(CancellationToken cancellationToken)
{
return await _getAgentStatusRequestHandler.HandleAsync(cancellationToken);
}

[HttpPost("agent")]
public async Task UpsertAgent([FromBody] Agent agent)
{
Expand Down
5 changes: 5 additions & 0 deletions OpenAlprWebhookProcessor/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
using OpenAlprWebhookProcessor.WebPushSubscriptions;

Check warning on line 50 in OpenAlprWebhookProcessor/Startup.cs

View workflow job for this annotation

GitHub Actions / windows-build-push

The using directive for 'OpenAlprWebhookProcessor.WebPushSubscriptions' appeared previously in this namespace
using OpenAlprWebhookProcessor.Alerts.WebPush;
using Microsoft.AspNetCore.Http;
using OpenAlprWebhookProcessor.WebhookProcessor.OpenAlprWebsocket;

namespace OpenAlprWebhookProcessor
{
Expand Down Expand Up @@ -147,6 +148,7 @@ public void ConfigureServices(IServiceCollection services)
services.AddScoped<GroupWebhookHandler>();
services.AddScoped<SinglePlateWebhookHandler>();
services.AddScoped<GetAgentRequestHandler>();
services.AddScoped<GetAgentStatusRequestHandler>();
services.AddScoped<GetCameraRequestHandler>();
services.AddScoped<SetZoomAndFocusHandler>();
services.AddScoped<GetZoomAndFocusHandler>();
Expand Down Expand Up @@ -194,6 +196,9 @@ public void ConfigureServices(IServiceCollection services)
services.AddSingleton<WebPushNotificationProducer>();
services.AddHostedService<WebPushNotificationProducer>();

services.AddSingleton<WebsocketClientOrganizer>();
services.AddHostedService<WebsocketClientOrganizer>();

services.AddSingleton<CameraUpdateService.CameraUpdateService>();
services.AddSingleton<IHostedService>(p => p.GetService<CameraUpdateService.CameraUpdateService>());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public async Task HandleWebhookAsync(
_processorContext.PlateGroups.RemoveRange(previousPreviewGroups.Skip(1));

_logger.LogInformation("Previous preview plate exists: {plateNumber}, overwriting", plateGroup.BestNumber);

}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace OpenAlprWebhookProcessor.WebhookProcessor.OpenAlprWebsocket
{
public class AccountInfo
public class AccountInfoResponse
{
[JsonPropertyName("company_id")]
public string CompanyId { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class AgentStatus
public bool LicenseValid { get; set; }

[JsonPropertyName("memory_consumed_bytes")]
public long memoryConsumedBytes { get; set; }
public long MemoryConsumedBytes { get; set; }

[JsonPropertyName("memory_last_update")]
public long MemoryLastUpdate { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,107 @@
using System;
using System.Text;
using System.Linq;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace OpenAlprWebhookProcessor.WebhookProcessor.OpenAlprWebsocket
{
public static class OpenAlprWebsocketClient
public partial class OpenAlprWebsocketClient
{
public static async Task Echo(WebSocket webSocket, ILogger logger)
private readonly ConcurrentDictionary<Guid, string> _availableResponses;

private WebSocket _webSocket;

private readonly string _agentId;

public OpenAlprWebsocketClient(
string agentId,
WebSocket webSocket)
{
_agentId = agentId;
_webSocket = webSocket;
_availableResponses = new ConcurrentDictionary<Guid, string>();
}

public async Task ConsumeMessagesAsync(CancellationToken cancellationToken)
{
var buffer = new byte[4096 * 4];
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
var receiveResult = await _webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), cancellationToken);

while (!receiveResult.CloseStatus.HasValue)
{
string message = Encoding.UTF8.GetString(buffer.ToArray());
var rawMessage = Encoding.UTF8.GetString(buffer.ToArray(), 0, Array.FindLastIndex(buffer, b => b != 0) + 1);

logger.LogInformation("websocket message received: {message", message);
var transactionMatch = TransactionIdRegex().Match(rawMessage);

receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer), CancellationToken.None);
if (transactionMatch.Success)
{
_availableResponses.TryAdd(Guid.Parse(transactionMatch.Groups[1].Value), rawMessage);
}

receiveResult = await _webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer),
cancellationToken);
}
}

public async Task SendGetAgentStatusRequestAsync(
Guid transactionId,
CancellationToken cancellationToken)
{
var agentStatusRequest = new ConfigInfoRequest()
{
AgentId = _agentId,
Direction = "request",
TransactionId = transactionId.ToString(),
RequestType = RequestType.GetRequestType(OpenAlprRequestType.agent_status),
Version = 1,
};

var message = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(agentStatusRequest));

logger.LogInformation("connection closed");
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None);
await _webSocket.SendAsync(
new ArraySegment<byte>(message, 0, message.Length),
WebSocketMessageType.Binary,
true,
cancellationToken);
}

public bool TryGetAgentStatusResponse(
Guid transactionId,
out AgentStatusResponse agentStatusResponse)
{
if (_availableResponses.TryRemove(transactionId, out string message))
{
try
{
agentStatusResponse = JsonSerializer.Deserialize<AgentStatusResponse>(message);

return true;
}
catch (Exception ex)
{
agentStatusResponse = null;
return false;
}
}

agentStatusResponse = null;
return false;
}

public async Task CloseConnectionAsync(CancellationToken cancellationToken)
{
await _webSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"goodbye.",
cancellationToken);
}

[GeneratedRegex("transaction_id\":\"(.*?)\"")]
private static partial Regex TransactionIdRegex();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace OpenAlprWebhookProcessor.WebhookProcessor.OpenAlprWebsocket
{
public class WebsocketClientOrganizer : BackgroundService
{
private readonly ConcurrentDictionary<string, OpenAlprWebsocketClient> _connectedClients;

private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

private readonly ILogger<WebsocketClientOrganizer> _logger;

public WebsocketClientOrganizer(ILogger<WebsocketClientOrganizer> logger)
{
_logger = logger;
_connectedClients = new ConcurrentDictionary<string, OpenAlprWebsocketClient>();
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.CompletedTask;
}

public bool AddAgent(
string agentId,
OpenAlprWebsocketClient webSocketClient)
{
return _connectedClients.TryAdd(
agentId,
webSocketClient);
}

public async Task RemoveAgentAsync(
string agentId,
CancellationToken cancellationToken)
{
if (_connectedClients.TryRemove(agentId, out var webSocketClient))
{
await webSocketClient.CloseConnectionAsync(cancellationToken);
}
}

public async Task<AgentStatusResponse> GetAgentStatusAsync(
string agentId,
CancellationToken cancellationToken)
{
var agentExists = _connectedClients.TryGetValue(agentId, out var webSocketClient);

if (!agentExists)
{
throw new ArgumentException("AgentId is not connected.");
}

var transactionId = Guid.NewGuid();

await webSocketClient.SendGetAgentStatusRequestAsync(transactionId, cancellationToken);

var stopwatch = new Stopwatch();
stopwatch.Start();

while (stopwatch.ElapsedMilliseconds < 100000)
{
if (webSocketClient.TryGetAgentStatusResponse(transactionId, out var agentStatusResponse))
{
return agentStatusResponse;
}

await Task.Delay(1000, cancellationToken);
}

throw new TimeoutException("Agent did not respond to request.");
}
}
}
Loading

0 comments on commit c8f30e7

Please sign in to comment.