feat(runtime): NodeId->driver reverse routing + primary-gated RouteNodeWrite

This commit is contained in:
Joseph Doherty
2026-06-13 11:44:26 -04:00
parent b031a6ceef
commit f8f1027287
2 changed files with 384 additions and 0 deletions
@@ -10,6 +10,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
@@ -96,6 +97,44 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
/// </summary>
private readonly Dictionary<(string DriverInstanceId, string FullName), List<string>> _nodeIdByDriverRef = new();
/// <summary>
/// Inverse of <see cref="_nodeIdByDriverRef"/>: <c>folder-scoped equipment NodeId →
/// (DriverInstanceId, FullName)</c>. Rebuilt every apply by <see cref="PushDesiredSubscriptions"/>
/// from the same <c>EquipmentTags</c> pass, and resolved by <see cref="HandleRouteNodeWrite"/> so an
/// inbound operator write targeting an equipment variable's NodeId is forwarded to the owning
/// driver child as a write of its wire-ref <c>FullName</c>. Each NodeId maps to exactly one driver
/// ref (a variable is backed by a single driver attribute), so this is a flat 1:1 map (the forward
/// map fans out 1:N because one ref can back several variables).
/// </summary>
private readonly Dictionary<string, (string DriverInstanceId, string FullName)> _driverRefByNodeId =
new(StringComparer.Ordinal);
/// <summary>
/// Cached local <see cref="RedundancyRole"/> from the latest <see cref="RedundancyStateChanged"/>
/// snapshot (null = unknown until the first snapshot arrives, or no local node match). The inbound
/// write gate in <see cref="HandleRouteNodeWrite"/> reuses this signal — the SAME one the
/// scripted-alarm emit gate uses (<c>ScriptedAlarmHostActor._localRole</c>): only the Primary
/// services writes, default-allow while unknown so single-node deploys + the boot window never
/// reject (a node is the sole Primary until told otherwise).
/// </summary>
private RedundancyRole? _localRole;
/// <summary>
/// Routes an inbound operator write (resolved from the OPC UA node-manager side, Task 11) to the
/// owning driver child. <see cref="HandleRouteNodeWrite"/> gates on the local node being the
/// driver Primary, resolves the <see cref="_driverRefByNodeId"/> reverse map, and Asks the child a
/// <see cref="DriverInstanceActor.WriteAttribute"/> carrying the driver-side <see cref="FullName"/>.
/// </summary>
/// <param name="NodeId">The folder-scoped equipment-variable NodeId the operator wrote to.</param>
/// <param name="Value">The value to write (the driver coerces it to the attribute's data type).</param>
public sealed record RouteNodeWrite(string NodeId, object? Value);
/// <summary>Reply to <see cref="RouteNodeWrite"/>: the outcome of forwarding the write to the driver
/// (or a gate/lookup failure that never reached the driver).</summary>
/// <param name="Success">True when the driver accepted the write.</param>
/// <param name="Reason">Failure detail when <paramref name="Success"/> is false; null on success.</param>
public sealed record NodeWriteResult(bool Success, string? Reason);
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
{
// Convenience accessors for sites that don't need the full spec.
@@ -218,6 +257,11 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self));
// Subscribe to driver-control topic so AdminUI Reconnect/Restart commands land here.
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DriverControlTopic, Self));
// Subscribe to the redundancy-state topic so cluster role changes cache this node's role — the
// inbound-write gate in HandleRouteNodeWrite reuses the SAME signal the scripted-alarm emit gate
// uses so only the Primary services operator writes (the secondary keeps state warm for failover).
DistributedPubSub.Get(Context.System).Mediator.Tell(
new Subscribe(ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RedundancyStateTopic, Self));
// Spawn the VirtualTag host BEFORE Bootstrap so the bootstrap-restore path (which routes
// through PushDesiredSubscriptions and Tells ApplyVirtualTags) has a live host to target.
SpawnVirtualTagHost();
@@ -363,6 +407,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
Receive<RestartDriver>(HandleRestartDriver);
Receive<ReconnectDriver>(HandleReconnectDriver);
Receive<RouteNodeWrite>(HandleRouteNodeWrite);
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
@@ -384,6 +430,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
Receive<RestartDriver>(HandleRestartDriver);
Receive<ReconnectDriver>(HandleReconnectDriver);
Receive<RouteNodeWrite>(HandleRouteNodeWrite);
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
@@ -417,6 +465,75 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
}
}
/// <summary>
/// Routes an inbound operator write (Task 11 Asks this from the OPC UA node-manager side) to the
/// owning driver child. Order matters:
/// <list type="number">
/// <item>PRIMARY gate FIRST — reuses the same <see cref="_localRole"/> redundancy signal the
/// scripted-alarm emit gate uses. Only the Primary services writes (default-allow while the
/// role is unknown, so single-node deploys + the boot window never reject). A Secondary/Detached
/// node keeps its address space + driver state warm for failover but must NOT push writes to the
/// shared field device.</item>
/// <item>Resolve the <see cref="_driverRefByNodeId"/> reverse map to the owning
/// <c>(DriverInstanceId, FullName)</c>.</item>
/// <item>Resolve the running driver child.</item>
/// <item>Ask the child a bounded <see cref="DriverInstanceActor.WriteAttribute"/> of the driver-side
/// <see cref="DriverInstanceActor.WriteAttribute.TagId">FullName</see> and pipe the translated
/// result back to the asker.</item>
/// </list>
/// Every branch replies the asker a <see cref="NodeWriteResult"/> exactly once.
/// </summary>
private void HandleRouteNodeWrite(RouteNodeWrite msg)
{
// PRIMARY GATE FIRST — only the Primary services operator writes (same signal as the alarm-emit
// gate; unknown role ⇒ treated as Primary so single-node deploys + the boot window aren't blocked).
if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
{
Sender.Tell(new NodeWriteResult(false, "not primary"));
return;
}
if (!_driverRefByNodeId.TryGetValue(msg.NodeId, out var target))
{
Sender.Tell(new NodeWriteResult(false, $"no driver mapping for node {msg.NodeId}"));
return;
}
if (!_children.TryGetValue(target.DriverInstanceId, out var entry))
{
Sender.Tell(new NodeWriteResult(false, "driver not running"));
return;
}
// Ask the child a bounded write — DriverInstanceActor.HandleWriteAsync already bounds the backend
// call to 5s, so the 8s Ask is a safety net for an actor that never replies. Capture Sender before
// the continuation: it runs off the actor thread, where raw Sender is unsafe to read.
var replyTo = Sender;
entry.Actor.Ask<DriverInstanceActor.WriteAttributeResult>(
new DriverInstanceActor.WriteAttribute(target.FullName, msg.Value!), TimeSpan.FromSeconds(8))
.ContinueWith(
t => t.IsCompletedSuccessfully
? new NodeWriteResult(t.Result.Success, t.Result.Reason)
: new NodeWriteResult(false, "write timeout"),
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default)
.PipeTo(replyTo);
}
/// <summary>Caches this node's <see cref="RedundancyRole"/> from a cluster redundancy snapshot so
/// <see cref="HandleRouteNodeWrite"/> can gate inbound writes to the Primary. A snapshot that doesn't
/// mention this node leaves the cached role unchanged ⇒ default-allow. Mirrors
/// <c>ScriptedAlarmHostActor.OnRedundancyStateChanged</c> / <c>OpcUaPublishActor</c>.</summary>
private void OnRedundancyStateChanged(RedundancyStateChanged msg)
{
var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode);
if (local is not null)
{
_localRole = local.Role;
}
}
private void Stale()
{
Receive<DispatchDeployment>(_ =>
@@ -427,6 +544,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
Receive<RetryConfigDbConnection>(_ => TryRecoverFromStale());
Receive<RestartDriver>(HandleRestartDriver);
Receive<ReconnectDriver>(HandleReconnectDriver);
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
}
@@ -623,6 +741,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
// can land driver values on the right node. Clear-and-repopulate every apply so renames
// (Name/FolderPath/EquipmentId changes) and removals are reflected.
_nodeIdByDriverRef.Clear();
// Inverse map for the inbound operator-write path (NodeId → (DriverInstanceId, FullName)): an
// operator writes to the variable's folder-scoped NodeId, but the driver writes by its wire-ref
// FullName. Cleared + repopulated from the SAME EquipmentTags pass so renames/removals are
// reflected. Each NodeId maps to exactly one driver ref (a variable is backed by a single driver
// attribute), so last-writer-wins on the rare duplicate is harmless.
_driverRefByNodeId.Clear();
foreach (var t in composition.EquipmentTags)
{
var key = (t.DriverInstanceId, t.FullName);
@@ -630,6 +754,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
if (!_nodeIdByDriverRef.TryGetValue(key, out var list))
_nodeIdByDriverRef[key] = list = new List<string>();
if (!list.Contains(nodeId)) list.Add(nodeId);
_driverRefByNodeId[nodeId] = key;
}
var total = 0;