feat(alarms): route inbound Part 9 alarm methods through AlarmAck gate (T18)
Wire the materialised AlarmConditionState method handlers so a client calling Acknowledge/Confirm/Shelve/AddComment is gated on the AlarmAck data-plane role and, when allowed, routed back to the scripted-alarm engine via a new `alarm-commands` DistributedPubSub topic. - Commons: new AlarmCommand DTO (AlarmId/Operation/User/Comment/UnshelveAtUtc). - ScriptedAlarmHostActor: add AlarmCommandsTopic const. - OtOpcUaNodeManager: settable AlarmCommandRouter + wire OnAcknowledge/OnConfirm/ OnAddComment/OnShelve/OnTimedUnshelve. Each resolves the principal off ISessionOperationContext.UserIdentity as RoleCarryingUserIdentity, fails closed (BadUserAccessDenied) when the AlarmAck role is absent or no identity, else maps + routes an AlarmCommand and returns Good. OnShelve discriminates OneShotShelve/ TimedShelve/Unshelve from the SDK flags; TimedShelve expiry = UtcNow + ms. No Akka/IActorRef handle — only the Action<AlarmCommand> delegate. T20 de-dup note left; WriteAlarmCondition untouched. - OpcUaServer.Security: OpcUaDataPlaneRoles.AlarmAck shared const (the role was a bare string everywhere; introduced one symbol for the gate + tests). - OtOpcUaSdkServer: SetAlarmCommandRouter pass-through. - Host: boot wiring publishes each command via mediator.Tell(Publish(...)) using a lazy ActorSystem accessor (mirrors DpsScriptLogPublisher). - Tests: 11 new gate + mapping tests (OpcUaServer.Tests 88->99, all green).
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer.Security;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Host.OpcUa;
|
||||
|
||||
@@ -24,6 +27,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
private readonly DeferredAddressSpaceSink _deferredSink;
|
||||
private readonly DeferredServiceLevelPublisher _deferredServiceLevel;
|
||||
private readonly IOpcUaUserAuthenticator _userAuthenticator;
|
||||
private readonly Func<ActorSystem> _actorSystemAccessor;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly ILogger<OtOpcUaServerHostedService> _logger;
|
||||
|
||||
@@ -37,18 +41,23 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
/// <param name="deferredSink">The deferred address space sink that receives the real sink once the server is ready.</param>
|
||||
/// <param name="deferredServiceLevel">The deferred service level publisher that receives the real publisher once the server is ready.</param>
|
||||
/// <param name="userAuthenticator">The OPC UA user authenticator.</param>
|
||||
/// <param name="actorSystemAccessor">Lazy accessor for the running <see cref="ActorSystem"/>, used to
|
||||
/// resolve the DistributedPubSub mediator the inbound alarm-command router publishes through. Resolved
|
||||
/// lazily (mirroring <c>DpsScriptLogPublisher</c>) so construction never races Akka startup.</param>
|
||||
/// <param name="loggerFactory">The logger factory for creating loggers.</param>
|
||||
public OtOpcUaServerHostedService(
|
||||
IOptions<OpcUaApplicationHostOptions> options,
|
||||
DeferredAddressSpaceSink deferredSink,
|
||||
DeferredServiceLevelPublisher deferredServiceLevel,
|
||||
IOpcUaUserAuthenticator userAuthenticator,
|
||||
Func<ActorSystem> actorSystemAccessor,
|
||||
ILoggerFactory loggerFactory)
|
||||
{
|
||||
_options = options.Value;
|
||||
_deferredSink = deferredSink;
|
||||
_deferredServiceLevel = deferredServiceLevel;
|
||||
_userAuthenticator = userAuthenticator;
|
||||
_actorSystemAccessor = actorSystemAccessor;
|
||||
_loggerFactory = loggerFactory;
|
||||
_logger = loggerFactory.CreateLogger<OtOpcUaServerHostedService>();
|
||||
}
|
||||
@@ -88,6 +97,30 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
|
||||
_deferredSink.SetSink(new SdkAddressSpaceSink(_server.NodeManager));
|
||||
|
||||
// Wire the reverse-path inbound-alarm-command router: a client Acknowledge/Confirm/Shelve that
|
||||
// passes the node manager's AlarmAck gate publishes the mapped AlarmCommand onto the cluster
|
||||
// `alarm-commands` topic (same DistributedPubSub mediator the `alerts`/`script-logs` topics use).
|
||||
// The Tell is fire-and-forget so the handler — which runs under the SDK's Lock — never blocks.
|
||||
// The mediator is resolved per-publish via the lazy ActorSystem accessor so a transient cluster
|
||||
// condition is tolerated and construction never raced Akka startup.
|
||||
_server.SetAlarmCommandRouter(cmd =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var mediator = DistributedPubSub.Get(_actorSystemAccessor()).Mediator;
|
||||
mediator.Tell(new Publish(ScriptedAlarmHostActor.AlarmCommandsTopic, cmd));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// The router runs under the SDK Lock on a server thread; a cluster hiccup must not
|
||||
// escape into the SDK's Call path. Log + drop — the client still gets Good for the
|
||||
// node-state change; the missed command surfaces as a non-applied engine transition.
|
||||
_logger.LogWarning(ex,
|
||||
"OtOpcUaServerHostedService: failed to route inbound alarm command {Operation} for {AlarmId}",
|
||||
cmd.Operation, cmd.AlarmId);
|
||||
}
|
||||
});
|
||||
|
||||
// ServiceLevel publisher needs IServerInternal — only available after Start.
|
||||
if (_server.CurrentInstance is { } serverInternal)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user