From f8f1027287f90b00667027cce4ceb0894292f5b5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 13 Jun 2026 11:44:26 -0400 Subject: [PATCH] feat(runtime): NodeId->driver reverse routing + primary-gated RouteNodeWrite --- .../Drivers/DriverHostActor.cs | 125 +++++++++ .../DriverHostActorWriteRoutingTests.cs | 259 ++++++++++++++++++ 2 files changed, 384 insertions(+) create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorWriteRoutingTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index b8b70427..de4fbd2b 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -10,6 +10,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.Observability; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; @@ -96,6 +97,44 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// private readonly Dictionary<(string DriverInstanceId, string FullName), List> _nodeIdByDriverRef = new(); + /// + /// Inverse of : folder-scoped equipment NodeId → + /// (DriverInstanceId, FullName). Rebuilt every apply by + /// from the same EquipmentTags pass, and resolved by so an + /// inbound operator write targeting an equipment variable's NodeId is forwarded to the owning + /// driver child as a write of its wire-ref FullName. Each NodeId maps to exactly one driver + /// ref (a variable is backed by a single driver attribute), so this is a flat 1:1 map (the forward + /// map fans out 1:N because one ref can back several variables). + /// + private readonly Dictionary _driverRefByNodeId = + new(StringComparer.Ordinal); + + /// + /// Cached local from the latest + /// snapshot (null = unknown until the first snapshot arrives, or no local node match). The inbound + /// write gate in reuses this signal — the SAME one the + /// scripted-alarm emit gate uses (ScriptedAlarmHostActor._localRole): only the Primary + /// services writes, default-allow while unknown so single-node deploys + the boot window never + /// reject (a node is the sole Primary until told otherwise). + /// + private RedundancyRole? _localRole; + + /// + /// Routes an inbound operator write (resolved from the OPC UA node-manager side, Task 11) to the + /// owning driver child. gates on the local node being the + /// driver Primary, resolves the reverse map, and Asks the child a + /// carrying the driver-side . + /// + /// The folder-scoped equipment-variable NodeId the operator wrote to. + /// The value to write (the driver coerces it to the attribute's data type). + public sealed record RouteNodeWrite(string NodeId, object? Value); + + /// Reply to : the outcome of forwarding the write to the driver + /// (or a gate/lookup failure that never reached the driver). + /// True when the driver accepted the write. + /// Failure detail when is false; null on success. + public sealed record NodeWriteResult(bool Success, string? Reason); + private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed) { // Convenience accessors for sites that don't need the full spec. @@ -218,6 +257,11 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self)); // Subscribe to driver-control topic so AdminUI Reconnect/Restart commands land here. DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DriverControlTopic, Self)); + // Subscribe to the redundancy-state topic so cluster role changes cache this node's role — the + // inbound-write gate in HandleRouteNodeWrite reuses the SAME signal the scripted-alarm emit gate + // uses so only the Primary services operator writes (the secondary keeps state warm for failover). + DistributedPubSub.Get(Context.System).Mediator.Tell( + new Subscribe(ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RedundancyStateTopic, Self)); // Spawn the VirtualTag host BEFORE Bootstrap so the bootstrap-restore path (which routes // through PushDesiredSubscriptions and Tells ApplyVirtualTags) has a live host to target. SpawnVirtualTagHost(); @@ -363,6 +407,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers Receive(ForwardToMux); Receive(HandleRestartDriver); Receive(HandleReconnectDriver); + Receive(HandleRouteNodeWrite); + Receive(OnRedundancyStateChanged); Receive(_ => { /* PubSub ack */ }); } @@ -384,6 +430,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers Receive(ForwardToMux); Receive(HandleRestartDriver); Receive(HandleReconnectDriver); + Receive(HandleRouteNodeWrite); + Receive(OnRedundancyStateChanged); Receive(_ => { /* PubSub ack */ }); } @@ -417,6 +465,75 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers } } + /// + /// Routes an inbound operator write (Task 11 Asks this from the OPC UA node-manager side) to the + /// owning driver child. Order matters: + /// + /// PRIMARY gate FIRST — reuses the same redundancy signal the + /// scripted-alarm emit gate uses. Only the Primary services writes (default-allow while the + /// role is unknown, so single-node deploys + the boot window never reject). A Secondary/Detached + /// node keeps its address space + driver state warm for failover but must NOT push writes to the + /// shared field device. + /// Resolve the reverse map to the owning + /// (DriverInstanceId, FullName). + /// Resolve the running driver child. + /// Ask the child a bounded of the driver-side + /// FullName and pipe the translated + /// result back to the asker. + /// + /// Every branch replies the asker a exactly once. + /// + private void HandleRouteNodeWrite(RouteNodeWrite msg) + { + // PRIMARY GATE FIRST — only the Primary services operator writes (same signal as the alarm-emit + // gate; unknown role ⇒ treated as Primary so single-node deploys + the boot window aren't blocked). + if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached) + { + Sender.Tell(new NodeWriteResult(false, "not primary")); + return; + } + + if (!_driverRefByNodeId.TryGetValue(msg.NodeId, out var target)) + { + Sender.Tell(new NodeWriteResult(false, $"no driver mapping for node {msg.NodeId}")); + return; + } + + if (!_children.TryGetValue(target.DriverInstanceId, out var entry)) + { + Sender.Tell(new NodeWriteResult(false, "driver not running")); + return; + } + + // Ask the child a bounded write — DriverInstanceActor.HandleWriteAsync already bounds the backend + // call to 5s, so the 8s Ask is a safety net for an actor that never replies. Capture Sender before + // the continuation: it runs off the actor thread, where raw Sender is unsafe to read. + var replyTo = Sender; + entry.Actor.Ask( + new DriverInstanceActor.WriteAttribute(target.FullName, msg.Value!), TimeSpan.FromSeconds(8)) + .ContinueWith( + t => t.IsCompletedSuccessfully + ? new NodeWriteResult(t.Result.Success, t.Result.Reason) + : new NodeWriteResult(false, "write timeout"), + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default) + .PipeTo(replyTo); + } + + /// Caches this node's from a cluster redundancy snapshot so + /// can gate inbound writes to the Primary. A snapshot that doesn't + /// mention this node leaves the cached role unchanged ⇒ default-allow. Mirrors + /// ScriptedAlarmHostActor.OnRedundancyStateChanged / OpcUaPublishActor. + private void OnRedundancyStateChanged(RedundancyStateChanged msg) + { + var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode); + if (local is not null) + { + _localRole = local.Role; + } + } + private void Stale() { Receive(_ => @@ -427,6 +544,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers Receive(_ => TryRecoverFromStale()); Receive(HandleRestartDriver); Receive(HandleReconnectDriver); + Receive(OnRedundancyStateChanged); Receive(_ => { /* PubSub ack */ }); Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval); } @@ -623,6 +741,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers // can land driver values on the right node. Clear-and-repopulate every apply so renames // (Name/FolderPath/EquipmentId changes) and removals are reflected. _nodeIdByDriverRef.Clear(); + // Inverse map for the inbound operator-write path (NodeId → (DriverInstanceId, FullName)): an + // operator writes to the variable's folder-scoped NodeId, but the driver writes by its wire-ref + // FullName. Cleared + repopulated from the SAME EquipmentTags pass so renames/removals are + // reflected. Each NodeId maps to exactly one driver ref (a variable is backed by a single driver + // attribute), so last-writer-wins on the rare duplicate is harmless. + _driverRefByNodeId.Clear(); foreach (var t in composition.EquipmentTags) { var key = (t.DriverInstanceId, t.FullName); @@ -630,6 +754,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers if (!_nodeIdByDriverRef.TryGetValue(key, out var list)) _nodeIdByDriverRef[key] = list = new List(); if (!list.Contains(nodeId)) list.Add(nodeId); + _driverRefByNodeId[nodeId] = key; } var total = 0; diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorWriteRoutingTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorWriteRoutingTests.cs new file mode 100644 index 00000000..91f751bd --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorWriteRoutingTests.cs @@ -0,0 +1,259 @@ +using System.Collections.Concurrent; +using System.Text.Json; +using Akka.Actor; +using Microsoft.EntityFrameworkCore; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; + +/// +/// Verifies the inbound operator-write routing wired into : a +/// for a materialised equipment-variable NodeId resolves +/// the NodeId → (DriverInstanceId, FullName) reverse map (built alongside the forward map in +/// PushDesiredSubscriptions), gates on this node being the driver PRIMARY (reusing the same +/// RedundancyStateChanged signal the alarm-emit gate uses), forwards a +/// carrying the driver-side FullName to the +/// right driver child, and replies a to the asker. +/// +/// +/// Drives a real apply through the existing harness (same artifact shape as +/// DriverHostActorLiveValueTests) so the reverse map is populated authentically and a real +/// (non-stubbed) child is spawned. The child is backed by a +/// recording driver so the test can observe the forwarded write and assert +/// the no-write case on the secondary. There is no test seam to inject a TestProbe as a +/// driver child, so this end-to-end approach (recording driver) is the closest faithful test the +/// harness allows. +/// +/// +public sealed class DriverHostActorWriteRoutingTests : RuntimeActorTestBase +{ + private static readonly NodeId TestNode = NodeId.Parse("driver-wr-test"); + private static readonly RevisionHash RevA = RevisionHash.Parse(new string('a', 64)); + private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5); + + /// On the PRIMARY, a RouteNodeWrite for a mapped NodeId forwards the driver-side FullName to + /// the right driver child (observed via the recording driver) and replies NodeWriteResult(true). + [Fact] + public void Primary_routes_write_to_driver_by_full_name_and_replies_success() + { + var db = NewInMemoryDbFactory(); + var recorder = new RecordingDriverFactory("Modbus"); + // One equipment tag: eq-1, drv-1, FullName "40001", no folder, Name "speed" → NodeId "eq-1/speed". + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "eq-1", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed")); + + var actor = SpawnHostAndApply(db, deploymentId, recorder); + + // Local role unknown ⇒ treated as Primary ⇒ write allowed (default-emit semantics). + var asker = CreateTestProbe(); + actor.Tell(new DriverHostActor.RouteNodeWrite("eq-1/speed", 123.0), asker.Ref); + + var result = asker.ExpectMsg(Timeout); + result.Success.ShouldBeTrue(); + + // The driver received exactly the write, keyed by its wire-ref FullName (not the NodeId). + AwaitAssert(() => + { + recorder.Writes.Count.ShouldBe(1); + recorder.Writes[0].FullReference.ShouldBe("40001"); + recorder.Writes[0].Value.ShouldBe(123.0); + }, duration: Timeout); + } + + /// On a SECONDARY, RouteNodeWrite replies NodeWriteResult(false, "not primary") and the + /// driver receives NO write — the primary gate fires before the reverse-map lookup. + [Fact] + public void Secondary_rejects_write_and_does_not_forward_to_driver() + { + var db = NewInMemoryDbFactory(); + var recorder = new RecordingDriverFactory("Modbus"); + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "eq-1", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed")); + + var actor = SpawnHostAndApply(db, deploymentId, recorder); + + // Force this node Secondary so the primary gate rejects. + actor.Tell(new RedundancyStateChanged( + new[] + { + new NodeRedundancyState(TestNode, RedundancyRole.Secondary, + IsClusterLeader: false, IsRoleLeaderForDriver: false, AsOfUtc: DateTime.UtcNow), + }, + CorrelationId.NewId())); + + var asker = CreateTestProbe(); + actor.Tell(new DriverHostActor.RouteNodeWrite("eq-1/speed", 123.0), asker.Ref); + + var result = asker.ExpectMsg(Timeout); + result.Success.ShouldBeFalse(); + result.Reason.ShouldBe("not primary"); + + // No write reached the driver — the gate short-circuited before the reverse-map lookup. + recorder.Writes.ShouldBeEmpty(); + } + + /// An unknown NodeId (no reverse-map entry) replies NodeWriteResult(false) and writes nothing. + [Fact] + public void Unknown_node_id_replies_failure() + { + var db = NewInMemoryDbFactory(); + var recorder = new RecordingDriverFactory("Modbus"); + var deploymentId = SeedDeploymentWithEquipmentTags(db, RevA, + (Equip: "eq-1", Driver: "drv-1", FullName: "40001", Folder: null, Name: "speed")); + + var actor = SpawnHostAndApply(db, deploymentId, recorder); + + var asker = CreateTestProbe(); + actor.Tell(new DriverHostActor.RouteNodeWrite("eq-1/does-not-exist", 123.0), asker.Ref); + + var result = asker.ExpectMsg(Timeout); + result.Success.ShouldBeFalse(); + result.Reason.ShouldNotBeNull(); + recorder.Writes.ShouldBeEmpty(); + } + + /// Spawns the host with the recording driver factory, dispatches the deployment, and waits + /// for the Applied ACK so the apply (and thus the reverse-map build in PushDesiredSubscriptions) has + /// completed before the test routes a write. No OPC UA / mux probes are wired — this test exercises + /// only the write path, which doesn't depend on the publish actor. + private IActorRef SpawnHostAndApply( + IDbContextFactory db, DeploymentId deploymentId, IDriverFactory factory) + { + var coordinator = CreateTestProbe(); + var actor = Sys.ActorOf(DriverHostActor.Props( + db, TestNode, coordinator.Ref, + driverFactory: factory, + localRoles: new HashSet { "driver" })); + + actor.Tell(new DispatchDeployment(deploymentId, RevA, CorrelationId.NewId())); + coordinator.ExpectMsg(Timeout).Outcome.ShouldBe(ApplyAckOutcome.Applied); + + return actor; + } + + /// + /// Seeds a Sealed deployment whose artifact carries the minimal arrays + /// DeploymentArtifact.BuildEquipmentTagPlans needs to project equipment tags. Mirrors + /// DriverHostActorLiveValueTests.SeedDeploymentWithEquipmentTags but also carries a + /// DriverInstances row with a non-Windows-only DriverType ("Modbus") + Enabled flag + /// so a REAL (non-stubbed) child is spawned for the write path. + /// + private static DeploymentId SeedDeploymentWithEquipmentTags( + IDbContextFactory db, RevisionHash rev, + params (string Equip, string Driver, string FullName, string? Folder, string Name)[] tags) + { + var driverIds = tags.Select(t => t.Driver).Distinct(StringComparer.Ordinal).ToArray(); + + var artifact = JsonSerializer.SerializeToUtf8Bytes(new + { + Namespaces = new[] + { + new { NamespaceId = "ns-eq", Kind = 0 }, // NamespaceKind.Equipment = 0 + }, + DriverInstances = driverIds.Select(d => new + { + DriverInstanceRowId = Guid.NewGuid(), + DriverInstanceId = d, + Name = d, + DriverType = "Modbus", // not Windows-only ⇒ a real child is spawned (not stubbed) + Enabled = true, + DriverConfig = "{}", + NamespaceId = "ns-eq", + }).ToArray(), + Tags = tags.Select((t, i) => new + { + TagId = $"tag-{i}", + EquipmentId = t.Equip, + DriverInstanceId = t.Driver, + Name = t.Name, + FolderPath = t.Folder, + DataType = "Double", + TagConfig = JsonSerializer.Serialize(new { FullName = t.FullName }), + }).ToArray(), + }); + + var id = DeploymentId.NewId(); + using var ctx = db.CreateDbContext(); + ctx.Deployments.Add(new Deployment + { + DeploymentId = id.Value, + RevisionHash = rev.Value, + Status = DeploymentStatus.Sealed, + CreatedBy = "test", + SealedAtUtc = DateTime.UtcNow, + ArtifactBlob = artifact, + }); + ctx.SaveChanges(); + return id; + } + + /// Factory producing a single for the supported type, whose + /// recorded write list is exposed for assertions. + private sealed class RecordingDriverFactory : IDriverFactory + { + private readonly string _supportedType; + private readonly RecordingDriver _driver = new(); + public RecordingDriverFactory(string supportedType) { _supportedType = supportedType; } + + /// The writes the spawned driver received (thread-safe — WriteAsync runs off the actor thread). + public IReadOnlyList Writes => _driver.Writes; + + /// + public IDriver? TryCreate(string driverType, string driverInstanceId, string driverConfigJson) + { + if (!string.Equals(driverType, _supportedType, StringComparison.Ordinal)) return null; + _driver.Bind(driverInstanceId, driverType); + return _driver; + } + + /// + public IReadOnlyCollection SupportedTypes => new[] { _supportedType }; + } + + /// An + that records every write and returns Good. + private sealed class RecordingDriver : IDriver, IWritable + { + private readonly ConcurrentQueue _writes = new(); + /// + public string DriverInstanceId { get; private set; } = string.Empty; + /// + public string DriverType { get; private set; } = string.Empty; + /// The writes received so far. + public IReadOnlyList Writes => _writes.ToArray(); + /// Sets the identity once the factory is asked to create it. + public void Bind(string id, string type) { DriverInstanceId = id; DriverType = type; } + /// + public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask; + /// + public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask; + /// + public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask; + /// + public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, LastError: null); + /// + public long GetMemoryFootprint() => 0; + /// + public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; + /// + public Task> WriteAsync( + IReadOnlyList writes, CancellationToken cancellationToken) + { + foreach (var w in writes) _writes.Enqueue(w); + return Task.FromResult>( + writes.Select(_ => new WriteResult(0u)).ToArray()); // 0x0 = Good + } + } +}