feat(adminui): FleetStatusSignalRBridge — DPS → SignalR forwarding (F16)
Some checks failed
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been cancelled
v2-ci / integration (push) Has been cancelled
v2-ci / build (push) Has been cancelled
Some checks failed
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been cancelled
v2-ci / integration (push) Has been cancelled
v2-ci / build (push) Has been cancelled
New per-admin-node actor that subscribes to the fleet-status DistributedPubSub topic + forwards every FleetStatusChanged snapshot to all SignalR clients connected to FleetStatusHub via IHubContext. Wired via WithOtOpcUaSignalRBridges (new AkkaConfigurationBuilder extension in AdminUI.Hubs) — Program.cs calls it inside the if(hasAdmin) block alongside WithOtOpcUaControlPlaneSingletons. Per-node subscription rather than cluster-singleton: every admin node forwards its own snapshots to its own connected clients. Simpler than singleton coordination + acceptable because the messages are small and SignalR fan-out is per-node anyway.
This commit is contained in:
@@ -0,0 +1,52 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.Cluster.Tools.PublishSubscribe;
|
||||||
|
using Akka.Event;
|
||||||
|
using Microsoft.AspNetCore.SignalR;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.AdminUI.Hubs;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Akka actor that subscribes to the <c>fleet-status</c> DistributedPubSub topic and forwards
|
||||||
|
/// every <see cref="FleetStatusChanged"/> snapshot to all SignalR clients connected to
|
||||||
|
/// <see cref="FleetStatusHub"/>. Spawned on admin-role nodes by
|
||||||
|
/// <c>AddOtOpcUaSignalRBridges</c>.
|
||||||
|
///
|
||||||
|
/// The bridge runs locally on each admin node — every node that hosts the hub also forwards
|
||||||
|
/// snapshots to its own connected clients. That keeps the hub-to-actor wiring simple (no
|
||||||
|
/// cluster-singleton coordination needed) at the cost of duplicated DPS subscriptions on
|
||||||
|
/// multi-admin deployments. Acceptable since the messages are small + the SignalR fan-out
|
||||||
|
/// is per-node anyway.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class FleetStatusSignalRBridge : ReceiveActor
|
||||||
|
{
|
||||||
|
public const string TopicName = "fleet-status";
|
||||||
|
|
||||||
|
private readonly IHubContext<FleetStatusHub> _hub;
|
||||||
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
|
|
||||||
|
public static Props Props(IHubContext<FleetStatusHub> hub) =>
|
||||||
|
Akka.Actor.Props.Create(() => new FleetStatusSignalRBridge(hub));
|
||||||
|
|
||||||
|
public FleetStatusSignalRBridge(IHubContext<FleetStatusHub> hub)
|
||||||
|
{
|
||||||
|
_hub = hub;
|
||||||
|
ReceiveAsync<FleetStatusChanged>(ForwardAsync);
|
||||||
|
Receive<SubscribeAck>(_ => { /* DPS confirmation */ });
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override void PreStart() =>
|
||||||
|
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(TopicName, Self));
|
||||||
|
|
||||||
|
private async Task ForwardAsync(FleetStatusChanged msg)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _hub.Clients.All.SendAsync(FleetStatusHub.MethodName, msg);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_log.Warning(ex, "FleetStatusSignalRBridge: SignalR push failed (count={Count})", msg.Nodes.Count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,39 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.Hosting;
|
||||||
|
using Microsoft.AspNetCore.SignalR;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.AdminUI.Hubs;
|
||||||
|
|
||||||
|
public static class HubServiceCollectionExtensions
|
||||||
|
{
|
||||||
|
public const string FleetStatusSignalRBridgeName = "fleet-status-signalr-bridge";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Spawns the SignalR bridge actors that forward DPS messages to browser-facing SignalR
|
||||||
|
/// hubs. Currently: <see cref="FleetStatusSignalRBridge"/> (DPS <c>fleet-status</c> topic →
|
||||||
|
/// <see cref="FleetStatusHub"/> clients).
|
||||||
|
///
|
||||||
|
/// Call inside the admin-role configurator on the shared <see cref="AkkaConfigurationBuilder"/>:
|
||||||
|
/// <code>
|
||||||
|
/// if (hasAdmin)
|
||||||
|
/// {
|
||||||
|
/// ab.WithOtOpcUaControlPlaneSingletons();
|
||||||
|
/// ab.WithOtOpcUaSignalRBridges();
|
||||||
|
/// }
|
||||||
|
/// </code>
|
||||||
|
/// </summary>
|
||||||
|
public static AkkaConfigurationBuilder WithOtOpcUaSignalRBridges(this AkkaConfigurationBuilder builder)
|
||||||
|
{
|
||||||
|
builder.WithActors((system, registry, resolver) =>
|
||||||
|
{
|
||||||
|
var hub = resolver.GetService<IHubContext<FleetStatusHub>>();
|
||||||
|
var actor = system.ActorOf(FleetStatusSignalRBridge.Props(hub), FleetStatusSignalRBridgeName);
|
||||||
|
registry.Register<FleetStatusSignalRBridgeKey>(actor);
|
||||||
|
});
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Marker key for <see cref="ActorRegistry"/> lookup of the SignalR bridge actor.</summary>
|
||||||
|
public sealed class FleetStatusSignalRBridgeKey { }
|
||||||
@@ -44,7 +44,10 @@ builder.Services.AddAkka("otopcua", (ab, sp) =>
|
|||||||
{
|
{
|
||||||
ab.WithOtOpcUaClusterBootstrap(sp);
|
ab.WithOtOpcUaClusterBootstrap(sp);
|
||||||
if (hasAdmin)
|
if (hasAdmin)
|
||||||
|
{
|
||||||
ab.WithOtOpcUaControlPlaneSingletons();
|
ab.WithOtOpcUaControlPlaneSingletons();
|
||||||
|
ab.WithOtOpcUaSignalRBridges();
|
||||||
|
}
|
||||||
if (hasDriver)
|
if (hasDriver)
|
||||||
ab.WithOtOpcUaRuntimeActors();
|
ab.WithOtOpcUaRuntimeActors();
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user