using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.AspNetCore.SignalR;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Drivers;

namespace ZB.MOM.WW.OtOpcUa.AdminUI.Hubs;

/// <summary>
/// Akka actor that subscribes to the driver-health DistributedPubSub topic and
/// forwards every <see cref="DriverHealthChanged"/> snapshot to (a) the in-memory
/// <see cref="IDriverStatusSnapshotStore"/> snapshot store and (b) all SignalR clients
/// connected to <see cref="DriverStatusHub"/>, grouped by
/// <see cref="DriverHealthChanged.DriverInstanceId"/>. Spawned on admin-role nodes by
/// <c>AddOtOpcUaSignalRBridges</c>.
/// </summary>
public sealed class DriverStatusSignalRBridge : ReceiveActor
{
    /// <summary>DistributedPubSub topic this bridge subscribes to.</summary>
    public const string TopicName = DriverHealthChanged.TopicName;

    private readonly IHubContext<DriverStatusHub> _hub;
    private readonly IDriverStatusSnapshotStore _store;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>Creates actor props for a <see cref="DriverStatusSignalRBridge"/>.</summary>
    /// <param name="hub">The SignalR hub context for pushing snapshots to grouped clients.</param>
    /// <param name="store">Snapshot store updated before each SignalR push.</param>
    /// <returns>Props that construct the bridge with the supplied dependencies.</returns>
    public static Props Props(IHubContext<DriverStatusHub> hub, IDriverStatusSnapshotStore store) =>
        Akka.Actor.Props.Create(() => new DriverStatusSignalRBridge(hub, store));

    /// <summary>Initializes a new instance of <see cref="DriverStatusSignalRBridge"/>.</summary>
    /// <param name="hub">The SignalR hub context for pushing snapshots to grouped clients.</param>
    /// <param name="store">Snapshot store updated before each SignalR push.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="hub"/> or <paramref name="store"/> is <see langword="null"/>.
    /// </exception>
    public DriverStatusSignalRBridge(IHubContext<DriverStatusHub> hub, IDriverStatusSnapshotStore store)
    {
        // Fail fast: a null dependency would otherwise surface as a NullReferenceException
        // on every message, caught and logged below as a misleading "SignalR push failed".
        _hub = hub ?? throw new ArgumentNullException(nameof(hub));
        _store = store ?? throw new ArgumentNullException(nameof(store));

        // The message type must be spelled out: it cannot be inferred from a method group
        // or from an untyped lambda.
        ReceiveAsync<DriverHealthChanged>(ForwardAsync);
        Receive<SubscribeAck>(_ => { /* DPS confirmation */ });
    }

    /// <inheritdoc />
    protected override void PreStart() =>
        DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(TopicName, Self));

    /// <summary>
    /// Records <paramref name="msg"/> in the snapshot store, then pushes it to the SignalR
    /// group for its driver instance.
    /// </summary>
    /// <param name="msg">The driver-health snapshot received from the topic.</param>
    /// <returns>A task that completes once the push has been attempted.</returns>
    /// <remarks>
    /// Any exception from the store update or the push is logged at warning level and not
    /// rethrown. If the store update throws, the push is skipped.
    /// </remarks>
    private async Task ForwardAsync(DriverHealthChanged msg)
    {
        try
        {
            // Store first, so the snapshot store is updated before clients are notified.
            _store.Upsert(msg);

            await _hub.Clients
                .Group(DriverStatusHub.GroupName(msg.DriverInstanceId))
                .SendAsync(DriverStatusHub.MethodName, msg);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverStatusSignalRBridge: SignalR push failed (instance={Instance})", msg.DriverInstanceId);
        }
    }
}