feat(server): inbound operator-write pipeline — OnWriteValue authz gate + node-write router

This commit is contained in:
Joseph Doherty
2026-06-13 12:02:34 -04:00
parent a23fb2b82e
commit bb5832e900
5 changed files with 266 additions and 3 deletions
@@ -1,11 +1,14 @@
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Hosting;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.OpcUaServer.Security;
using ZB.MOM.WW.OtOpcUa.Runtime;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
namespace ZB.MOM.WW.OtOpcUa.Host.OpcUa;
@@ -28,6 +31,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
private readonly DeferredServiceLevelPublisher _deferredServiceLevel;
private readonly IOpcUaUserAuthenticator _userAuthenticator;
private readonly Func<ActorSystem> _actorSystemAccessor;
private readonly ActorRegistry _actorRegistry;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<OtOpcUaServerHostedService> _logger;
@@ -44,6 +48,9 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
/// <param name="actorSystemAccessor">Lazy accessor for the running <see cref="ActorSystem"/>, used to
/// resolve the DistributedPubSub mediator the inbound alarm-command router publishes through. Resolved
/// lazily (mirroring <c>DpsScriptLogPublisher</c>) so construction never races Akka startup.</param>
/// <param name="actorRegistry">The Akka.Hosting actor registry, used to resolve the local
/// <c>DriverHostActor</c> ref (<c>DriverHostActorKey</c>) the inbound node-write router Asks. Resolved
/// in <see cref="StartAsync"/> after the runtime actors have been registered.</param>
/// <param name="loggerFactory">The logger factory for creating loggers.</param>
public OtOpcUaServerHostedService(
IOptions<OpcUaApplicationHostOptions> options,
@@ -51,6 +58,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
DeferredServiceLevelPublisher deferredServiceLevel,
IOpcUaUserAuthenticator userAuthenticator,
Func<ActorSystem> actorSystemAccessor,
ActorRegistry actorRegistry,
ILoggerFactory loggerFactory)
{
_options = options.Value;
@@ -58,6 +66,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
_deferredServiceLevel = deferredServiceLevel;
_userAuthenticator = userAuthenticator;
_actorSystemAccessor = actorSystemAccessor;
_actorRegistry = actorRegistry;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<OtOpcUaServerHostedService>();
}
@@ -121,6 +130,36 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
}
});
// Wire the reverse-path inbound operator-write router: a client write to a writable equipment-tag
// node that passes the node manager's WriteOperate gate routes the write to the owning driver child
// (RouteNodeWrite → NodeWriteResult) via the local DriverHostActor. This dispatch is FIRE-AND-FORGET
// (just like the alarm router): the SDK's CustomNodeManager2.Write holds the node-manager Lock while
// invoking OnWriteValue, so a blocking Ask here would freeze ALL address-space operations (reads,
// subscription notifications, the publish path) for up to the Ask timeout. We kick off the Ask and
// log failures from a continuation; the write reaches the device asynchronously and the value
// self-corrects on the next driver poll (standard OPC UA optimistic-write semantics). The
// DriverHostActor ref is resolved LAZILY per write — this hosted service's StartAsync runs before
// the Akka DriverHostActor registers, so a one-shot resolve here would always miss and leave every
// write unavailable. By write time (long after startup) the registry has it; a node that genuinely
// has no driver-host (admin-only, no writable driver nodes materialised) logs + drops the write.
_server.SetNodeWriteRouter((nodeId, value) =>
{
if (!_actorRegistry.TryGet<DriverHostActorKey>(out var driverHost))
{
_logger.LogWarning("Inbound write to {NodeId} dropped: no DriverHostActor registered", nodeId);
return;
}
driverHost.Ask<DriverHostActor.NodeWriteResult>(
new DriverHostActor.RouteNodeWrite(nodeId, value), TimeSpan.FromSeconds(10))
.ContinueWith(t =>
{
if (!t.IsCompletedSuccessfully)
_logger.LogWarning("Operator write to {NodeId} failed or timed out", nodeId);
else if (!t.Result.Success)
_logger.LogWarning("Operator write to {NodeId} rejected: {Reason}", nodeId, t.Result.Reason);
}, TaskScheduler.Default);
});
// ServiceLevel publisher needs IServerInternal — only available after Start.
if (_server.CurrentInstance is { } serverInternal)
{
@@ -142,6 +181,8 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
// half-disposed NodeManager.
_deferredSink.SetSink(null);
_deferredServiceLevel.SetInner(null);
// Drop the inbound-write router too so a late client write doesn't Ask a stopping DriverHostActor.
_server?.SetNodeWriteRouter(null);
return Task.CompletedTask;
}