feat(opcua): write-outcome self-correction — capture prior + compare-and-revert on failure

This commit is contained in:
Joseph Doherty
2026-06-14 01:30:20 -04:00
parent 526ddb6a57
commit 10efcf4517
4 changed files with 224 additions and 133 deletions
@@ -130,35 +130,23 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
}
});
// Wire the reverse-path inbound operator-write router: a client write to a writable equipment-tag
// Wire the reverse-path inbound operator-write gateway: a client write to a writable equipment-tag
// node that passes the node manager's WriteOperate gate routes the write to the owning driver child
// (RouteNodeWrite → NodeWriteResult) via the local DriverHostActor. This dispatch is FIRE-AND-FORGET
// (just like the alarm router): the SDK's CustomNodeManager2.Write holds the node-manager Lock while
// invoking OnWriteValue, so a blocking Ask here would freeze ALL address-space operations (reads,
// subscription notifications, the publish path) for up to the Ask timeout. We kick off the Ask and
// log failures from a continuation; the write reaches the device asynchronously and the value
// self-corrects on the next driver poll (standard OPC UA optimistic-write semantics). The
// DriverHostActor ref is resolved LAZILY per write — this hosted service's StartAsync runs before
// the Akka DriverHostActor registers, so a one-shot resolve here would always miss and leave every
// write unavailable. By write time (long after startup) the registry has it; a node that genuinely
// has no driver-host (admin-only, no writable driver nodes materialised) logs + drops the write.
_server.SetNodeWriteRouter((nodeId, value) =>
{
if (!_actorRegistry.TryGet<DriverHostActorKey>(out var driverHost))
{
_logger.LogWarning("Inbound write to {NodeId} dropped: no DriverHostActor registered", nodeId);
return;
}
driverHost.Ask<DriverHostActor.NodeWriteResult>(
new DriverHostActor.RouteNodeWrite(nodeId, value), TimeSpan.FromSeconds(10))
.ContinueWith(t =>
{
if (!t.IsCompletedSuccessfully)
_logger.LogWarning("Operator write to {NodeId} failed or timed out", nodeId);
else if (!t.Result.Success)
_logger.LogWarning("Operator write to {NodeId} rejected: {Reason}", nodeId, t.Result.Reason);
}, TaskScheduler.Default);
});
// (RouteNodeWrite → NodeWriteResult) via the local DriverHostActor. The node manager calls the
// gateway's WriteAsync FIRE-AND-FORGET: the SDK's CustomNodeManager2.Write holds the node-manager
// Lock while invoking OnWriteValue, so a blocking Ask here would freeze ALL address-space operations
// (reads, subscription notifications, the publish path) for up to the Ask timeout. The gateway kicks
// off the Ask and resolves a NodeWriteOutcome; the node manager applies the client value optimistically
// and self-corrects (reverts to the pre-write value) when the device write comes back FAILED — but only
// while the node still holds the optimistic value, so a fresh driver poll is not clobbered. The
// DriverHostActor ref is resolved LAZILY per write (inside the gateway) — this hosted service's
// StartAsync runs before the Akka DriverHostActor registers, so a one-shot resolve here would always
// miss and leave every write unavailable. By write time (long after startup) the registry has it; a
// node that genuinely has no driver-host (admin-only, no writable driver nodes materialised) logs +
// resolves the write to "writes unavailable".
_server.SetNodeWriteGateway(new ActorNodeWriteGateway(
resolveDriverHost: () => _actorRegistry.TryGet<DriverHostActorKey>(out var driverHost) ? driverHost : null,
logger: _loggerFactory.CreateLogger<ActorNodeWriteGateway>()));
// ServiceLevel publisher needs IServerInternal — only available after Start.
if (_server.CurrentInstance is { } serverInternal)
@@ -181,8 +169,8 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
// half-disposed NodeManager.
_deferredSink.SetSink(null);
_deferredServiceLevel.SetInner(null);
// Drop the inbound-write router too so a late client write doesn't Ask a stopping DriverHostActor.
_server?.SetNodeWriteRouter(null);
// Restore the Null write gateway so a late client write doesn't Ask a stopping DriverHostActor.
_server?.SetNodeWriteGateway(null);
return Task.CompletedTask;
}
@@ -70,37 +70,40 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
/// </summary>
public Action<AlarmCommand>? AlarmCommandRouter { get; set; }
private volatile Action<string, object?>? _nodeWriteRouter;
private volatile IOpcUaNodeWriteGateway _nodeWriteGateway = NullOpcUaNodeWriteGateway.Instance;
/// <summary>
/// Reverse-path sink for inbound OPC UA operator writes to a writable equipment-tag variable node.
/// Reverse-path gateway for inbound OPC UA operator writes to a writable equipment-tag variable node.
/// When a client writes such a node, the node's <see cref="BaseDataVariableState.OnWriteValue"/>
/// handler (<see cref="OnEquipmentTagWrite"/>, attached by <see cref="EnsureVariable"/> when the
/// variable is writable) first gates on the caller's <see cref="OpcUaDataPlaneRoles.WriteOperate"/>
/// role and, when allowed, invokes this delegate with the node's string id + the written value to
/// route the write to the backing driver.
/// role and, when allowed, calls <see cref="IOpcUaNodeWriteGateway.WriteAsync"/> with the node's
/// string id + the written value to route the write to the backing driver.
/// <para>
/// This is the write-side twin of <see cref="AlarmCommandRouter"/> — a plain
/// <see cref="Action{T1,T2}"/> (no Akka / <c>IActorRef</c> / DI handle) so this assembly stays
/// Akka-free. The handler delegates run under the node-manager <c>Lock</c> (the OPC UA SDK's
/// <c>CustomNodeManager2.Write</c> holds <c>Lock</c> while invoking <c>OnWriteValue</c>), so this
/// dispatch MUST be non-blocking and fire-and-forget — exactly like the alarm router. The host
/// sets it at boot to a lambda that kicks off an unawaited bounded <c>Ask</c> of the local
/// <c>DriverHostActor</c> (<c>RouteNodeWrite</c> → <c>NodeWriteResult</c>) and logs failures from
/// a continuation; the write reaches the device asynchronously and the value self-corrects on the
/// next driver poll (standard OPC UA optimistic-write semantics). Null (the default) makes every
/// write resolve to a "writes unavailable" failure.
/// This is the write-side twin of <see cref="AlarmCommandRouter"/>; the gateway abstraction keeps
/// this assembly Akka-free (the host wires an <c>ActorNodeWriteGateway</c> that Asks the local
/// <c>DriverHostActor</c>). The handler delegates run under the node-manager <c>Lock</c> (the OPC
/// UA SDK's <c>CustomNodeManager2.Write</c> holds <c>Lock</c> while invoking <c>OnWriteValue</c>),
/// so the dispatch is FIRE-AND-FORGET — the handler kicks off <c>WriteAsync</c> and returns
/// <c>Good</c> immediately so the SDK applies the client value optimistically; it MUST NOT block
/// on the device round-trip. When the asynchronous <see cref="NodeWriteOutcome"/> comes back
/// FAILED, an off-Lock continuation self-corrects: it re-takes <c>Lock</c> and reverts the node to
/// its real pre-write value — but only while the node still holds the optimistic value, so a fresh
/// driver poll that has already moved the node on is not clobbered (see
/// <see cref="ShouldRevert"/> / <see cref="RevertOptimisticWriteIfNeeded"/>).
/// </para>
/// <para>
/// Backed by a <c>volatile</c> field (auto-properties can't be volatile) to make the
/// Set by the host at <c>StartAsync</c>; the <see cref="NullOpcUaNodeWriteGateway"/> default
/// (assigning <c>null</c> restores it) makes every write resolve to a "writes unavailable"
/// failure. Backed by a <c>volatile</c> field (auto-properties can't be volatile) to make the
/// cross-thread hand-off explicit: the host assigns it on the start thread at boot (and resets it to
/// the Null default while stopping), and the SDK reads it on Write request threads.
/// </para>
/// </summary>
public Action<string, object?>? NodeWriteRouter
public IOpcUaNodeWriteGateway NodeWriteGateway
{
get => _nodeWriteRouter;
set => _nodeWriteRouter = value;
get => _nodeWriteGateway;
set => _nodeWriteGateway = value ?? NullOpcUaNodeWriteGateway.Instance;
}
/// <summary>Look up a materialised Part 9 alarm-condition node by its alarm node id (the
@@ -589,16 +592,27 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
/// <see cref="EnsureVariable"/> (Task 11). The OPC UA SDK invokes it when a client writes the
/// node's Value. It resolves the calling principal off the SDK <paramref name="context"/> the
/// SAME way <see cref="HandleAlarmCommand"/> does, gates on the
/// <see cref="OpcUaDataPlaneRoles.WriteOperate"/> role (<b>fails closed</b>: a missing identity or
/// missing role is denied), and on pass dispatches the value through <see cref="NodeWriteRouter"/>.
/// <see cref="OpcUaDataPlaneRoles.WriteOperate"/> role + the gateway being wired
/// (<b>fails closed</b>: a missing identity / missing role ⇒ <c>BadUserAccessDenied</c>; no gateway ⇒
/// <c>BadNotWritable</c>) via the pure <see cref="EvaluateEquipmentWriteGate"/>, and on pass dispatches
/// the value through <see cref="NodeWriteGateway"/>.
/// <para>
/// The dispatch is FIRE-AND-FORGET: the SDK's <c>CustomNodeManager2.Write</c> holds the node
/// manager <c>Lock</c> while invoking this handler, so a blocking driver round-trip here would
/// freeze every address-space operation (reads, subscription notifications, the publish path) for
/// the duration. The router only kicks off the asynchronous route. Returning
/// <see cref="ServiceResult.Good"/> lets the SDK apply the written value optimistically; the next
/// driver poll republishes the confirmed register value over the optimistic one via the normal
/// <see cref="WriteValue"/> path.
/// the duration. The gateway only kicks off the asynchronous route. Returning
/// <see cref="ServiceResult.Good"/> lets the SDK apply the written value optimistically.
/// </para>
/// <para>
/// <b>Write-outcome self-correction.</b> Before returning Good (which makes the SDK overwrite the
/// node with <paramref name="value"/>) we capture both the optimistic value AND the node's REAL
/// prior value/status — at handler entry the node still holds the prior value. An off-Lock
/// continuation on the <see cref="NodeWriteOutcome"/> then reverts the node to that prior
/// value/status on a FAILED outcome, but ONLY while the node still holds the optimistic value, so a
/// fresh driver poll that already republished the confirmed register value is not clobbered
/// (<see cref="RevertOptimisticWriteIfNeeded"/> / <see cref="ShouldRevert"/>). On success the
/// optimistic value stands and the next poll re-confirms it via the normal <see cref="WriteValue"/>
/// path.
/// </para>
/// </summary>
private ServiceResult OnEquipmentTagWrite(
@@ -606,36 +620,54 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
ref object value, ref StatusCode statusCode, ref DateTime timestamp)
{
var identity = (context as ISessionOperationContext)?.UserIdentity as RoleCarryingUserIdentity;
// Capture the value into a local so the route thunk (a lambda) can close over it — a ref parameter
// can't be captured by a lambda. The handler does not mutate the ref value: returning Good lets the
// SDK apply the client's value as-is.
var writtenValue = value;
var gateway = _nodeWriteGateway;
var gate = EvaluateEquipmentWriteGate(identity, gateway is not NullOpcUaNodeWriteGateway);
if (gate is not null) return gate;
// Capture the optimistic value + the REAL prior value/status BEFORE the SDK applies the write
// (at handler entry the node still holds the prior value; returning Good makes the SDK apply `value`).
var optimisticValue = value;
var nodeKey = node.NodeId.Identifier?.ToString() ?? string.Empty;
var router = _nodeWriteRouter;
Action? route = router is { } r ? () => r(nodeKey, writtenValue) : null;
return EvaluateEquipmentWrite(identity, route);
object? priorValue = null;
StatusCode priorStatus = StatusCodes.Good;
if (node is BaseDataVariableState variable)
{
priorValue = variable.Value;
priorStatus = variable.StatusCode;
}
// Fire-and-forget — MUST NOT block under Lock. On a FAILED outcome, compare-and-revert (off-Lock
// continuation). A faulted/cancelled WriteAsync is treated as a failure so the optimistic value never
// sticks when the route never resolved a real outcome.
_ = gateway.WriteAsync(nodeKey, optimisticValue, CancellationToken.None)
.ContinueWith(
t =>
{
var outcome = t.IsCompletedSuccessfully ? t.Result : new NodeWriteOutcome(false, "write dispatch faulted");
RevertOptimisticWriteIfNeeded(nodeKey, outcome, optimisticValue, priorValue, priorStatus);
},
CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default);
return ServiceResult.Good;
}
/// <summary>
/// Pure decision for an inbound equipment-tag write: the <see cref="OpcUaDataPlaneRoles.WriteOperate"/>
/// role gate + the fire-and-forget dispatch, extracted off <see cref="OnEquipmentTagWrite"/> so it is
/// unit-testable without booting an SDK server. The gate fails closed (null identity or missing role
/// <c>BadUserAccessDenied</c>, <paramref name="route"/> NOT invoked). When the gate passes but no
/// router is wired (<paramref name="route"/> is null), the write resolves to <c>BadNotWritable</c>
/// ("writes unavailable"). Otherwise it invokes <paramref name="route"/> exactly once (fire-and-forget
/// — the actual driver round-trip happens asynchronously off the router lambda) and returns
/// <see cref="ServiceResult.Good"/> so the SDK applies the client value optimistically. Role
/// comparison is case-insensitive (the role set is built with
/// <see cref="StringComparer.OrdinalIgnoreCase"/>), matching the alarm gate.
/// Pure role + availability gate for an inbound equipment-tag write, extracted off
/// <see cref="OnEquipmentTagWrite"/> so it is unit-testable without booting an SDK server. Fails closed:
/// a null identity or an identity missing the <see cref="OpcUaDataPlaneRoles.WriteOperate"/> role ⇒
/// <c>BadUserAccessDenied</c>. When the gate passes but no real gateway is wired
/// (<paramref name="gatewayWired"/> is false) ⇒ <c>BadNotWritable</c> ("writes unavailable"). A
/// <c>null</c> return means "proceed" (the caller dispatches + returns Good). Role comparison is
/// case-insensitive (the role set is built with <see cref="StringComparer.OrdinalIgnoreCase"/>),
/// matching the alarm gate.
/// </summary>
/// <param name="identity">The role-carrying identity extracted off the SDK context, or null when the
/// session is anonymous / carries no role-carrying identity.</param>
/// <param name="route">A thunk that kicks off the asynchronous route of the write to the driver; invoked
/// only when the gate passes. Null when no router is wired (e.g. admin-only nodes).</param>
/// <returns><see cref="ServiceResult.Good"/> on an allowed write (dispatch started); <c>BadUserAccessDenied</c>
/// when the gate vetoes; <c>BadNotWritable</c> ("writes unavailable") when no router is wired.</returns>
internal static ServiceResult EvaluateEquipmentWrite(
RoleCarryingUserIdentity? identity, Action? route)
/// <param name="gatewayWired">True when a non-Null <see cref="IOpcUaNodeWriteGateway"/> is wired; false
/// for the Null default (no route — e.g. admin-only nodes / pre-boot).</param>
/// <returns><c>null</c> to proceed (gate passed); otherwise the veto <see cref="ServiceResult"/>
/// (<c>BadUserAccessDenied</c> on a failed role gate, <c>BadNotWritable</c> when no gateway is wired).</returns>
internal static ServiceResult? EvaluateEquipmentWriteGate(RoleCarryingUserIdentity? identity, bool gatewayWired)
{
if (identity is null || !identity.Roles.Contains(OpcUaDataPlaneRoles.WriteOperate, StringComparer.OrdinalIgnoreCase))
{
@@ -644,16 +676,56 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
return new ServiceResult(StatusCodes.BadUserAccessDenied);
}
if (route is null)
if (!gatewayWired)
{
// Gate passed but no router wired (admin-only nodes / pre-boot) ⇒ writes unavailable.
// Gate passed but no gateway wired (admin-only nodes / pre-boot) ⇒ writes unavailable.
return new ServiceResult(StatusCodes.BadNotWritable, "writes unavailable");
}
// Fire-and-forget: kick off the asynchronous route and return Good immediately. The SDK holds the
// node-manager Lock here, so we must NOT block on the driver round-trip.
route();
return ServiceResult.Good;
return null;
}
/// <summary>
/// Pure decision for the write-outcome self-correction: revert the node to its pre-write value ONLY on
/// a FAILED outcome AND only while the node still holds the optimistic value. The
/// still-holds-the-optimistic-value check is what stops a revert from clobbering a fresh driver poll
/// that already republished the confirmed register value over the optimistic write. Pure, so it is
/// unit-testable without an SDK server.
/// <para>
/// Values are compared with <see cref="object.Equals(object?, object?)"/> first. Because array
/// <c>Equals</c> is reference equality, two distinct single-dimensional arrays are additionally
/// compared element by element, so an array-valued (or byte-string) tag is still recognised as
/// "holding the optimistic value" when the node stores an equal copy rather than the very instance
/// the client wrote. NOTE(review): whether the SDK stores the written instance or a copy is not
/// visible from this method — confirm against the node's copy policy.
/// </para>
/// </summary>
/// <param name="outcome">The device-write outcome routed back by the gateway.</param>
/// <param name="currentNodeValue">The node's current Value at revert time.</param>
/// <param name="optimisticValue">The value the SDK optimistically applied on the write.</param>
/// <returns><c>true</c> to revert (failed outcome and node unchanged since the optimistic write);
/// <c>false</c> on success, or when a poll has already moved the node off the optimistic value.</returns>
internal static bool ShouldRevert(NodeWriteOutcome outcome, object? currentNodeValue, object? optimisticValue)
{
    // A successful device write never reverts: the optimistic value stands.
    if (outcome.Success) return false;

    // Scalars, strings, both-null, and same-instance arrays are settled here.
    if (Equals(currentNodeValue, optimisticValue)) return true;

    // Distinct array instances: fall back to an element-wise comparison. Restricted to rank-1 arrays
    // on both sides so the structural comparer is only ever asked to walk single-index arrays.
    return currentNodeValue is Array { Rank: 1 } current
        && optimisticValue is Array { Rank: 1 } optimistic
        && System.Collections.StructuralComparisons.StructuralEqualityComparer.Equals(current, optimistic);
}
/// <summary>
/// Body of the continuation that runs after an operator write's <see cref="NodeWriteOutcome"/> is
/// known. It acquires <c>Lock</c>, looks the node up by its string id and asks
/// <see cref="ShouldRevert"/> whether the optimistic value must be undone. If so, the node's Value and
/// StatusCode are put back to the captured pre-write value/status, the Timestamp is set to the current
/// UTC time, and the change masks are cleared so subscribers are notified. Nothing happens when the
/// node is no longer registered under <paramref name="nodeId"/> or when <see cref="ShouldRevert"/>
/// declines (successful outcome, or the node no longer holds the optimistic value). Nothing is logged
/// here.
/// </summary>
/// <param name="nodeId">The string id of the written variable node.</param>
/// <param name="outcome">The device-write outcome routed back by the gateway.</param>
/// <param name="optimisticValue">The value the SDK optimistically applied on the write.</param>
/// <param name="priorValue">The node's real value captured before the optimistic write.</param>
/// <param name="priorStatus">The node's real status captured before the optimistic write.</param>
private void RevertOptimisticWriteIfNeeded(
    string nodeId, NodeWriteOutcome outcome, object? optimisticValue, object? priorValue, StatusCode priorStatus)
{
    lock (Lock)
    {
        // Lookup first, then the pure decision; either one failing leaves the node untouched.
        if (_variables.TryGetValue(nodeId, out var target)
            && ShouldRevert(outcome, target.Value, optimisticValue))
        {
            // Restore the pre-write state and publish it to subscribers.
            target.Value = priorValue;
            target.StatusCode = priorStatus;
            target.Timestamp = DateTime.UtcNow;
            target.ClearChangeMasks(SystemContext, includeChildren: false);
        }
    }
}
/// <summary>Map our domain <c>AlarmType</c> string to the matching SDK condition subtype. Script
@@ -37,22 +37,25 @@ public sealed class OtOpcUaSdkServer : StandardServer
}
/// <summary>
/// Wire the reverse-path sink for inbound operator writes to writable equipment-tag nodes onto the
/// created <see cref="OtOpcUaNodeManager"/>. The host calls this after start with a fire-and-forget
/// lambda that kicks off a bounded <c>Ask</c> of the local <c>DriverHostActor</c>
/// (<c>RouteNodeWrite</c>) and logs failures from a continuation — it must NOT block, because the
/// handler runs under the node-manager <c>Lock</c> (mirrors <see cref="SetAlarmCommandRouter"/>).
/// No-op (returns <c>false</c>) when the node manager has not been created yet, so the caller can
/// detect a too-early call.
/// Wire the reverse-path gateway for inbound operator writes to writable equipment-tag nodes onto the
/// created <see cref="OtOpcUaNodeManager"/>. The host calls this after start with an
/// <c>ActorNodeWriteGateway</c> whose <c>WriteAsync</c> kicks off a bounded <c>Ask</c> of the local
/// <c>DriverHostActor</c> (<c>RouteNodeWrite</c>); the node manager calls it fire-and-forget and uses
/// the resolved <c>NodeWriteOutcome</c> to self-correct the node on a failed device write (mirrors
/// <see cref="SetAlarmCommandRouter"/>). Passing <c>null</c> restores the
/// <c>NullOpcUaNodeWriteGateway</c> default (writes unavailable). No-op (returns <c>false</c>) when the
/// node manager has not been created yet, so the caller can detect a too-early call.
/// </summary>
/// <param name="router">The router invoked by the writable node's <c>OnWriteValue</c> handler once the
/// <c>WriteOperate</c> gate passes; may be <c>null</c> to clear it.</param>
/// <returns><c>true</c> when the router was set on a live node manager; <c>false</c> when no node
/// <param name="gateway">The gateway invoked by the writable node's <c>OnWriteValue</c> handler once the
/// <c>WriteOperate</c> gate passes; may be <c>null</c> to restore the Null default.</param>
/// <returns><c>true</c> when the gateway was set on a live node manager; <c>false</c> when no node
/// manager exists yet.</returns>
public bool SetNodeWriteRouter(Action<string, object?>? router)
public bool SetNodeWriteGateway(IOpcUaNodeWriteGateway? gateway)
{
if (_otOpcUaNodeManager is null) return false;
_otOpcUaNodeManager.NodeWriteRouter = router;
// The NodeWriteGateway setter null-coalesces to the Null default, so a null gateway is intentional
// (restores "writes unavailable"); the null-forgiving operator below only silences the nullable warning.
_otOpcUaNodeManager.NodeWriteGateway = gateway!;
return true;
}