From 29370fde3c1663632e7e6e8599bd521f58d758ff Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 28 May 2026 10:13:30 -0400 Subject: [PATCH] feat(adminui): add DriverStatusSignalRBridge + InMemory snapshot store --- .../EndpointRouteBuilderExtensions.cs | 2 + .../Hubs/DriverStatusSignalRBridge.cs | 58 +++++++++++++++++++ .../Hubs/HubServiceCollectionExtensions.cs | 19 ++++++ .../Hubs/IDriverStatusSnapshotStore.cs | 15 +++++ .../Hubs/InMemoryDriverStatusSnapshotStore.cs | 21 +++++++ 5 files changed, 115 insertions(+) create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/DriverStatusSignalRBridge.cs create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/IDriverStatusSnapshotStore.cs create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/InMemoryDriverStatusSnapshotStore.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/EndpointRouteBuilderExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/EndpointRouteBuilderExtensions.cs index 945fb146..8b7ff9c2 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/EndpointRouteBuilderExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/EndpointRouteBuilderExtensions.cs @@ -3,6 +3,7 @@ using Microsoft.AspNetCore.Components; using Microsoft.AspNetCore.Components.Web; using Microsoft.AspNetCore.Routing; using Microsoft.Extensions.DependencyInjection; +using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs; namespace ZB.MOM.WW.OtOpcUa.AdminUI; @@ -37,6 +38,7 @@ public static class EndpointRouteBuilderExtensions public static IServiceCollection AddAdminUI(this IServiceCollection services) { services.AddRazorComponents().AddInteractiveServerComponents(); + services.AddOtOpcUaDriverStatusServices(); return services; } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/DriverStatusSignalRBridge.cs b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/DriverStatusSignalRBridge.cs new file mode 100644 index 00000000..0802ef77 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/DriverStatusSignalRBridge.cs @@ -0,0 +1,58 @@ +using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.Event; +using Microsoft.AspNetCore.SignalR; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Drivers; + +namespace ZB.MOM.WW.OtOpcUa.AdminUI.Hubs; + +/// +/// Akka actor that subscribes to the driver-health DistributedPubSub topic and +/// forwards every snapshot to (a) the in-memory snapshot +/// store and (b) all SignalR clients connected to grouped +/// by . Spawned on admin-role nodes by +/// AddOtOpcUaSignalRBridges. +/// +public sealed class DriverStatusSignalRBridge : ReceiveActor +{ + public const string TopicName = "driver-health"; + + private readonly IHubContext _hub; + private readonly IDriverStatusSnapshotStore _store; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + /// Creates actor props for a . + /// The SignalR hub context for pushing snapshots to grouped clients. + /// Snapshot store updated before each SignalR push. + public static Props Props(IHubContext hub, IDriverStatusSnapshotStore store) => + Akka.Actor.Props.Create(() => new DriverStatusSignalRBridge(hub, store)); + + /// Initializes a new instance of . + /// The SignalR hub context for pushing snapshots to grouped clients. + /// Snapshot store updated before each SignalR push. + public DriverStatusSignalRBridge(IHubContext hub, IDriverStatusSnapshotStore store) + { + _hub = hub; + _store = store; + ReceiveAsync(ForwardAsync); + Receive(_ => { /* DPS confirmation */ }); + } + + /// + protected override void PreStart() => + DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(TopicName, Self)); + + private async Task ForwardAsync(DriverHealthChanged msg) + { + try + { + _store.Upsert(msg); + await _hub.Clients.Group(DriverStatusHub.GroupName(msg.DriverInstanceId)) + .SendAsync(DriverStatusHub.MethodName, msg); + } + catch (Exception ex) + { + _log.Warning(ex, "DriverStatusSignalRBridge: SignalR push failed (instance={Instance})", msg.DriverInstanceId); + } + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/HubServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/HubServiceCollectionExtensions.cs index affb5f0a..fc26e05d 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/HubServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/HubServiceCollectionExtensions.cs @@ -10,6 +10,19 @@ public static class HubServiceCollectionExtensions public const string FleetStatusSignalRBridgeName = "fleet-status-signalr-bridge"; public const string AlertSignalRBridgeName = "alert-signalr-bridge"; public const string ScriptLogSignalRBridgeName = "script-log-signalr-bridge"; + public const string DriverStatusSignalRBridgeName = "driver-status-signalr-bridge"; + + /// + /// Registers services required by the driver-status hub pipeline: + /// as a singleton backed by + /// . + /// + /// The service collection. + public static IServiceCollection AddOtOpcUaDriverStatusServices(this IServiceCollection services) + { + services.AddSingleton(); + return services; + } /// /// Spawns the SignalR bridge actors that forward DPS messages to browser-facing SignalR @@ -41,6 +54,11 @@ public static class HubServiceCollectionExtensions var scriptLogHub = resolver.GetService>(); var scriptLogBridge = system.ActorOf(ScriptLogSignalRBridge.Props(scriptLogHub), ScriptLogSignalRBridgeName); registry.Register(scriptLogBridge); + + var driverStatusHub = resolver.GetService>(); + var driverStatusStore = resolver.GetService(); + var driverStatusBridge = system.ActorOf(DriverStatusSignalRBridge.Props(driverStatusHub, driverStatusStore), DriverStatusSignalRBridgeName); + registry.Register(driverStatusBridge); }); return builder; } @@ -50,3 +68,4 @@ public static class HubServiceCollectionExtensions public sealed class FleetStatusSignalRBridgeKey { } public sealed class AlertSignalRBridgeKey { } public sealed class ScriptLogSignalRBridgeKey { } +public sealed class DriverStatusSignalRBridgeKey { } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/IDriverStatusSnapshotStore.cs b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/IDriverStatusSnapshotStore.cs new file mode 100644 index 00000000..23c4ab79 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/IDriverStatusSnapshotStore.cs @@ -0,0 +1,15 @@ +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Drivers; + +namespace ZB.MOM.WW.OtOpcUa.AdminUI.Hubs; + +/// +/// Singleton last-snapshot-per-instance cache. Populated by +/// DriverStatusSignalRBridge as it forwards DPS messages; read by +/// so newly-joined clients see current state +/// without waiting for the next change event. +/// +public interface IDriverStatusSnapshotStore +{ + void Upsert(DriverHealthChanged snapshot); + bool TryGet(string driverInstanceId, out DriverHealthChanged snapshot); +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/InMemoryDriverStatusSnapshotStore.cs b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/InMemoryDriverStatusSnapshotStore.cs new file mode 100644 index 00000000..9f272d78 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Hubs/InMemoryDriverStatusSnapshotStore.cs @@ -0,0 +1,21 @@ +using System.Collections.Concurrent; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Drivers; + +namespace ZB.MOM.WW.OtOpcUa.AdminUI.Hubs; + +/// +/// Thread-safe in-memory implementation of . +/// Keyed by ; last write wins. +/// +public sealed class InMemoryDriverStatusSnapshotStore : IDriverStatusSnapshotStore +{ + private readonly ConcurrentDictionary _byInstance = new(); + + /// + public void Upsert(DriverHealthChanged snapshot) + => _byInstance[snapshot.DriverInstanceId] = snapshot; + + /// + public bool TryGet(string driverInstanceId, out DriverHealthChanged snapshot) + => _byInstance.TryGetValue(driverInstanceId, out snapshot!); +}