From 7fa863f6da28bd01973b9b03a69e28066030f44f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 09:43:06 -0400 Subject: [PATCH] =?UTF-8?q?feat(runtime):=20#113=20DependencyMuxActor=20?= =?UTF-8?q?=E2=80=94=20drivers=20=E2=86=92=20virtual-tag=20fan-out?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit End-to-end data path is now wired on the read side: driver subscriptions fire AttributeValuePublished → DriverHostActor → DependencyMuxActor → DependencyValueChanged to every interested VirtualTagActor. Previously the publish hit a dead-letter at the host. DependencyMuxActor: - Per-node fan-out router. Maintains tagRef → Set with a reverse subscriber → refs index so unregister/replace are O(refs). - Watches subscribers; Terminated triggers automatic unregister so dead virtual-tag actors stop receiving publishes. - Re-register replaces the prior interest set — no stale-ref leaks on actor restart. - Drops publishes for refs with no interested subscribers. VirtualTagActor: - New Props params: dependencyRefs + mux ActorRef. - PreStart sends RegisterInterest to the mux; PostStop sends UnregisterInterest. Default both null so older callers stay quiet. DriverHostActor: - New dependencyMux Props param. Steady + Applying states now receive AttributeValuePublished from their DriverInstance children and forward to the mux. Null mux is a no-op (dev/Mac). ServiceCollectionExtensions: - WithOtOpcUaRuntimeActors spawns DependencyMuxActor before DriverHostActor and threads its ActorRef into the host's Props. New DependencyMuxActorKey + DependencyMuxActorName. Tests: Runtime 57 -> 63 (+6): - Mux forwards to only subscribers interested in each ref - Publish for unregistered ref is dropped silently - Unregister stops forwarding - Re-register replaces prior interest set - VirtualTagActor PreStart registration drives end-to-end eval (uses AwaitAssert to race-safely settle the PreStart Tell) - DriverHostActor forwards AttributeValuePublished through to mux All 6 v2 test suites green: 163 tests passing. F8 (#79) state updated — dep subscribe seam shipped, Core.VirtualTags production engine binding (compile + ITagUpstreamSource subscribe) is the residual. --- ...-akka-hosting-alignment-plan.md.tasks.json | 2 +- .../Drivers/DriverHostActor.cs | 21 ++- .../ServiceCollectionExtensions.cs | 11 +- .../VirtualTags/DependencyMuxActor.cs | 107 ++++++++++++ .../VirtualTags/VirtualTagActor.cs | 29 +++- .../VirtualTags/DependencyMuxActorTests.cs | 155 ++++++++++++++++++ 6 files changed, 317 insertions(+), 8 deletions(-) create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/DependencyMuxActor.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/VirtualTags/DependencyMuxActorTests.cs diff --git a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json index 392c6ec..85a9a82 100644 --- a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json +++ b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json @@ -82,7 +82,7 @@ {"id": "F5", "subject": "Follow-up: ConfigPublishCoordinator multi-node happy-path test", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "5cfbe8b", "deviation": "Delivered by Task 59 — DeployHappyPathTests.StartDeployment_seals_after_both_nodes_apply exercises the exact 'dispatch to N driver nodes, all ack, seals' flow via the real 2-node TwoNodeClusterHarness rather than a multi-system TestKit. Cleaner because it tests the production code path end-to-end.", "origin": "Self-review of Task 30 — single-ActorSystem TestKit can't simulate the plan's 'dispatch to N driver nodes, all ack, seals' happy path because DiscoverDriverNodes() needs real cluster membership. Add a multi-system test (two ActorSystems joined into one cluster, driver-role on the second)."}, {"id": "F6", "subject": "Follow-up: RedundancyStateActor publisher abstraction so tests don't need DPS bootstrap", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": [], "blockedBy": [], "commit": "dfc143c", "origin": "Self-review of Task 35 — RedundancyStateActorTests are skipped because single-node DistributedPubSub bootstrap is unreliable in TestKit. Inject an Action broadcast so tests can replace it with a probe; un-skip both tests."}, {"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "completed", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands.", "shipped": "All three pieces landed: (1) spawn lifecycle in DriverHostActor (DriverSpawnPlanner + IDriverFactory seam) — da14149, (2) ISubscribable wiring + OPC UA status-code → OpcUaQuality severity-bit mapping + DetachSubscription on disconnect/PostStop, (3) IWritable.WriteAsync write path with 5s timeout, status-code bubble-up, and AttributeValuePublished published to parent on every OnDataChange — both shipped in the F7-residual batch. Host DI binding (DriverFactoryBootstrap registers AbCip/AbLegacy/FOCAS/Galaxy/Modbus/S7/TwinCAT factories) lives in src/Server/ZB.MOM.WW.OtOpcUa.Host/Drivers/."}, - {"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "IVirtualTagEvaluator seam in Commons.Engines + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes ScriptLogEntry Warning to script-logs DPS topic on evaluator failure. Production binding to Core.VirtualTags.VirtualTagEngine still TODO (compile + ITagUpstreamSource subscribe) — split as F8b."}, + {"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "(1) IVirtualTagEvaluator seam + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes Warning ScriptLogEntry on failure. (2) DependencyMuxActor in Runtime fans out DriverInstanceActor.AttributeValuePublished from DriverHostActor through to interested VirtualTagActor subscribers. VirtualTagActor takes dependencyRefs + mux ActorRef in Props, registers interest in PreStart, unregisters in PostStop. WithOtOpcUaRuntimeActors spawns the mux + threads it into DriverHostActor. Production binding to Core.VirtualTags.VirtualTagEngine (expression compile + dep extraction) still TODO — split as F8b."}, {"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "(1) IScriptedAlarmEvaluator seam + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), evaluates on DependencyValueChanged, publishes AlarmTransitionEvent + ScriptLogEntry on every transition. (2) IAlarmActorStateStore seam in Commons.Engines + NullAlarmActorStateStore default + EfAlarmActorStateStore production adapter over the ScriptedAlarmState entity. ScriptedAlarmActor PreStart loads + restores; every Transition fires a fire-and-forget save with lastAckUser. Predicate binding to Core.ScriptedAlarms.ScriptedAlarmEngine still TODO — split as F9b."}, {"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "partial", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction.", "shipped": "IOpcUaAddressSpaceSink + IServiceLevelPublisher seams in Commons.OpcUa with Null* defaults. OpcUaPublishActor routes AttributeValueUpdate/AlarmStateUpdate/RebuildAddressSpace to the sink, dedupes ServiceLevelChanged, subscribes to redundancy-state DPS topic, and maps per-local-node redundancy snapshot to a coarse ServiceLevel (Primary+leader=240, Primary=200, Secondary=100, Detached=0). Production binding to a real SDK NodeManager + Variable nodes still TODO — split as F10b. Task 60 still blocked on F10b."}, {"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "6861381", "deviationNotes": "Reshaped HistorianAdapterActor around the existing IAlarmHistorianSink abstraction (alarm-event shape, not the original tag-history-row stub). Defaults to NullAlarmHistorianSink; production deployments wire SqliteStoreAndForwardSink + WonderwareHistorianClient via AddOtOpcUaRuntime overrides. Actor now exposes GetStatus returning HistorianSinkStatus for diagnostics. Named-pipe transport implementation lives unchanged in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs — the actor is intentionally just a fire-and-forget bridge.", "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."}, diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index cffa90e..28af2ae 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -41,6 +41,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers private readonly IActorRef? _coordinatorOverride; private readonly IDriverFactory _driverFactory; private readonly IReadOnlySet _localRoles; + private readonly IActorRef? _dependencyMux; private readonly ILoggingAdapter _log = Context.GetLogger(); private RevisionHash? _currentRevision; @@ -63,21 +64,25 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers CommonsNodeId localNode, IActorRef? coordinator = null, IDriverFactory? driverFactory = null, - IReadOnlySet? localRoles = null) => - Akka.Actor.Props.Create(() => new DriverHostActor(dbFactory, localNode, coordinator, driverFactory, localRoles)); + IReadOnlySet? localRoles = null, + IActorRef? dependencyMux = null) => + Akka.Actor.Props.Create(() => new DriverHostActor( + dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux)); public DriverHostActor( IDbContextFactory dbFactory, CommonsNodeId localNode, IActorRef? coordinator, IDriverFactory? driverFactory = null, - IReadOnlySet? localRoles = null) + IReadOnlySet? localRoles = null, + IActorRef? dependencyMux = null) { _dbFactory = dbFactory; _localNode = localNode; _coordinatorOverride = coordinator; _driverFactory = driverFactory ?? NullDriverFactory.Instance; _localRoles = localRoles ?? new HashSet(StringComparer.Ordinal); + _dependencyMux = dependencyMux; // Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply. Become(Steady); @@ -150,6 +155,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers { Receive(HandleDispatchFromSteady); Receive(HandleGetDiagnostics); + Receive(ForwardToMux); Receive(_ => { /* PubSub ack */ }); } @@ -168,9 +174,18 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers Self.Forward(msg); // re-deliver after we transition back }); Receive(HandleGetDiagnostics); + Receive(ForwardToMux); Receive(_ => { /* PubSub ack */ }); } + private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg) + { + // Pass driver-published values to the dependency mux when one is wired. Without a mux, + // VirtualTagActor evaluation can't fire — values just drop here. That's the dev/Mac path + // (no virtual tags registered); production binds the mux via the RuntimeActors extension. + _dependencyMux?.Tell(msg); + } + private void Stale() { Receive(_ => diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs index 54b897c..3fab542 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs @@ -10,6 +10,7 @@ using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; using ZB.MOM.WW.OtOpcUa.Runtime.Health; using ZB.MOM.WW.OtOpcUa.Runtime.Historian; +using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; namespace ZB.MOM.WW.OtOpcUa.Runtime; @@ -20,6 +21,7 @@ public static class ServiceCollectionExtensions public const string DriverHostActorName = "driver-host"; public const string DbHealthProbeActorName = "db-health"; public const string HistorianAdapterActorName = "historian-adapter"; + public const string DependencyMuxActorName = "dependency-mux"; /// /// Registers shared runtime services. Currently binds @@ -65,9 +67,15 @@ public static class ServiceCollectionExtensions DbHealthProbeActorName); registry.Register(dbHealth); + // Dependency mux must be spawned before DriverHostActor so the host can forward + // AttributeValuePublished into it from the very first driver spawn. + var mux = system.ActorOf(DependencyMuxActor.Props(), DependencyMuxActorName); + registry.Register(mux); + var driverHost = system.ActorOf( DriverHostActor.Props(dbFactory, roleInfo.LocalNode, coordinator: null, - driverFactory: driverFactory, localRoles: roleInfo.LocalRoles), + driverFactory: driverFactory, localRoles: roleInfo.LocalRoles, + dependencyMux: mux), DriverHostActorName); registry.Register(driverHost); @@ -85,3 +93,4 @@ public static class ServiceCollectionExtensions public sealed class DriverHostActorKey { } public sealed class DbHealthProbeActorKey { } public sealed class HistorianAdapterActorKey { } +public sealed class DependencyMuxActorKey { } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/DependencyMuxActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/DependencyMuxActor.cs new file mode 100644 index 0000000..d7c9360 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/DependencyMuxActor.cs @@ -0,0 +1,107 @@ +using Akka.Actor; +using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; + +/// +/// Per-node fan-out router from +/// to interested instances. VirtualTagActor sends +/// on start-up listing the tag refs it depends on; the mux +/// keeps a map of tagRef → Set<IActorRef> and on every AttributeValuePublished +/// forwards a to each interested +/// subscriber. +/// +/// DriverHostActor forwards every AttributeValuePublished it receives from its +/// DriverInstanceActor children to this mux (one mux per driver-role node). The mux is +/// deliberately not a DPS subscriber — virtual-tag evaluation is local to each node and +/// would over-emit if it spanned the cluster. +/// +public sealed class DependencyMuxActor : ReceiveActor +{ + public const string ActorName = "dependency-mux"; + + /// Register a subscriber's interest in a set of tag refs. Idempotent on re-register — + /// the second call replaces the prior interest set for that subscriber. + public sealed record RegisterInterest(IReadOnlyList TagRefs, IActorRef Subscriber); + + /// Unregister every interest held by . Sent on PostStop by + /// the subscriber, or by Terminated handling when the mux watches. + public sealed record UnregisterInterest(IActorRef Subscriber); + + private readonly Dictionary> _byRef = new(StringComparer.Ordinal); + private readonly Dictionary> _bySubscriber = new(); + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public static Props Props() => Akka.Actor.Props.Create(); + + public DependencyMuxActor() + { + Receive(OnRegister); + Receive(msg => RemoveSubscriber(msg.Subscriber)); + Receive(OnAttributeValuePublished); + Receive(msg => RemoveSubscriber(msg.ActorRef)); + } + + private void OnRegister(RegisterInterest msg) + { + // Replace any prior interest set so re-registers on actor restart don't leak old refs. + if (_bySubscriber.TryGetValue(msg.Subscriber, out var priorRefs)) + { + foreach (var r in priorRefs) + { + if (_byRef.TryGetValue(r, out var set)) + { + set.Remove(msg.Subscriber); + if (set.Count == 0) _byRef.Remove(r); + } + } + } + + var refs = new HashSet(msg.TagRefs, StringComparer.Ordinal); + _bySubscriber[msg.Subscriber] = refs; + foreach (var r in refs) + { + if (!_byRef.TryGetValue(r, out var set)) + { + set = new HashSet(); + _byRef[r] = set; + } + set.Add(msg.Subscriber); + } + Context.Watch(msg.Subscriber); + _log.Debug("DependencyMux: subscriber {Sub} registered for {Count} refs", msg.Subscriber, refs.Count); + } + + private void RemoveSubscriber(IActorRef subscriber) + { + if (!_bySubscriber.TryGetValue(subscriber, out var refs)) return; + foreach (var r in refs) + { + if (_byRef.TryGetValue(r, out var set)) + { + set.Remove(subscriber); + if (set.Count == 0) _byRef.Remove(r); + } + } + _bySubscriber.Remove(subscriber); + Context.Unwatch(subscriber); + _log.Debug("DependencyMux: subscriber {Sub} removed", subscriber); + } + + private void OnAttributeValuePublished(DriverInstanceActor.AttributeValuePublished msg) + { + if (!_byRef.TryGetValue(msg.FullReference, out var subscribers) || subscribers.Count == 0) + { + // No virtual tag cares about this ref — drop. Common in normal operation; the address + // space carries thousands of tags and only a fraction feed virtual-tag expressions. + return; + } + var dep = new VirtualTagActor.DependencyValueChanged(msg.FullReference, msg.Value, msg.TimestampUtc); + foreach (var sub in subscribers) + { + sub.Tell(dep); + } + } + +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs index c2cb1d9..01194a8 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/VirtualTags/VirtualTagActor.cs @@ -26,6 +26,8 @@ public sealed class VirtualTagActor : ReceiveActor private readonly string _expression; private readonly IVirtualTagEvaluator _evaluator; private readonly Func? _publisherFactory; + private readonly IReadOnlyList _dependencyRefs; + private readonly IActorRef? _mux; private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly Dictionary _dependencies = new(StringComparer.Ordinal); @@ -37,29 +39,50 @@ public sealed class VirtualTagActor : ReceiveActor string expression, IVirtualTagEvaluator? evaluator = null, string? scriptId = null, - Func? publisherFactory = null) => + Func? publisherFactory = null, + IReadOnlyList? dependencyRefs = null, + IActorRef? mux = null) => Akka.Actor.Props.Create(() => new VirtualTagActor( virtualTagId, expression, evaluator ?? NullVirtualTagEvaluator.Instance, scriptId ?? virtualTagId, - publisherFactory)); + publisherFactory, + dependencyRefs ?? Array.Empty(), + mux)); public VirtualTagActor( string virtualTagId, string expression, IVirtualTagEvaluator evaluator, string scriptId, - Func? publisherFactory) + Func? publisherFactory, + IReadOnlyList dependencyRefs, + IActorRef? mux) { _virtualTagId = virtualTagId; _scriptId = scriptId; _expression = expression; _evaluator = evaluator; _publisherFactory = publisherFactory; + _dependencyRefs = dependencyRefs; + _mux = mux; Receive(OnDependencyChanged); } + protected override void PreStart() + { + if (_mux is not null && _dependencyRefs.Count > 0) + { + _mux.Tell(new DependencyMuxActor.RegisterInterest(_dependencyRefs, Self)); + } + } + + protected override void PostStop() + { + _mux?.Tell(new DependencyMuxActor.UnregisterInterest(Self)); + } + private void OnDependencyChanged(DependencyValueChanged msg) { _dependencies[msg.TagId] = msg.Value; diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/VirtualTags/DependencyMuxActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/VirtualTags/DependencyMuxActorTests.cs new file mode 100644 index 0000000..bbb3e7f --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/VirtualTags/DependencyMuxActorTests.cs @@ -0,0 +1,155 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; +using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.VirtualTags; + +public sealed class DependencyMuxActorTests : RuntimeActorTestBase +{ + [Fact] + public void AttributeValuePublished_is_forwarded_only_to_subscribers_interested_in_that_ref() + { + var mux = Sys.ActorOf(DependencyMuxActor.Props()); + var subA = CreateTestProbe(); + var subB = CreateTestProbe(); + + mux.Tell(new DependencyMuxActor.RegisterInterest(new[] { "tag-1", "tag-2" }, subA.Ref)); + mux.Tell(new DependencyMuxActor.RegisterInterest(new[] { "tag-2", "tag-3" }, subB.Ref)); + + mux.Tell(new DriverInstanceActor.AttributeValuePublished("tag-1", 10, OpcUaQuality.Good, DateTime.UtcNow)); + mux.Tell(new DriverInstanceActor.AttributeValuePublished("tag-3", 30, OpcUaQuality.Good, DateTime.UtcNow)); + mux.Tell(new DriverInstanceActor.AttributeValuePublished("tag-2", 20, OpcUaQuality.Good, DateTime.UtcNow)); + + // subA hears tag-1 + tag-2. + var aMsgs = new[] + { + subA.ExpectMsg(), + subA.ExpectMsg(), + }.OrderBy(m => m.TagId).ToList(); + aMsgs.Select(m => m.TagId).ShouldBe(new[] { "tag-1", "tag-2" }); + + // subB hears tag-3 + tag-2. + var bMsgs = new[] + { + subB.ExpectMsg(), + subB.ExpectMsg(), + }.OrderBy(m => m.TagId).ToList(); + bMsgs.Select(m => m.TagId).ShouldBe(new[] { "tag-2", "tag-3" }); + + subA.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + subB.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + } + + [Fact] + public void Publish_for_unregistered_ref_is_silently_dropped() + { + var mux = Sys.ActorOf(DependencyMuxActor.Props()); + var sub = CreateTestProbe(); + + mux.Tell(new DependencyMuxActor.RegisterInterest(new[] { "tag-1" }, sub.Ref)); + mux.Tell(new DriverInstanceActor.AttributeValuePublished("nobody-cares", 99, OpcUaQuality.Good, DateTime.UtcNow)); + + sub.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + } + + [Fact] + public void UnregisterInterest_stops_forwarding() + { + var mux = Sys.ActorOf(DependencyMuxActor.Props()); + var sub = CreateTestProbe(); + + mux.Tell(new DependencyMuxActor.RegisterInterest(new[] { "tag-1" }, sub.Ref)); + mux.Tell(new DriverInstanceActor.AttributeValuePublished("tag-1", 10, OpcUaQuality.Good, DateTime.UtcNow)); + sub.ExpectMsg(); + + mux.Tell(new DependencyMuxActor.UnregisterInterest(sub.Ref)); + mux.Tell(new DriverInstanceActor.AttributeValuePublished("tag-1", 20, OpcUaQuality.Good, DateTime.UtcNow)); + sub.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + } + + [Fact] + public void Re_register_replaces_prior_interest_set() + { + var mux = Sys.ActorOf(DependencyMuxActor.Props()); + var sub = CreateTestProbe(); + + mux.Tell(new DependencyMuxActor.RegisterInterest(new[] { "tag-1" }, sub.Ref)); + mux.Tell(new DependencyMuxActor.RegisterInterest(new[] { "tag-2" }, sub.Ref)); // replaces tag-1 + + mux.Tell(new DriverInstanceActor.AttributeValuePublished("tag-1", 10, OpcUaQuality.Good, DateTime.UtcNow)); + sub.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + + mux.Tell(new DriverInstanceActor.AttributeValuePublished("tag-2", 20, OpcUaQuality.Good, DateTime.UtcNow)); + sub.ExpectMsg().TagId.ShouldBe("tag-2"); + } + + [Fact] + public void VirtualTagActor_PreStart_registers_deps_with_mux_and_eval_fires_end_to_end() + { + var mux = Sys.ActorOf(DependencyMuxActor.Props()); + var parent = CreateTestProbe(); + var evaluator = new EchoSumEvaluator(); + var actor = parent.ChildActorOf(VirtualTagActor.Props( + "vt-1", "a+b", + evaluator: evaluator, + dependencyRefs: new[] { "ref-a", "ref-b" }, + mux: mux)); + + // Race-safe end-to-end check: AwaitAssert retries until the PreStart RegisterInterest + // has actually landed at the mux + the publish has fanned out. Until then the publish + // gets dropped (no subscriber for "ref-a" yet), so we re-publish each pass. + AwaitAssert(() => + { + mux.Tell(new DriverInstanceActor.AttributeValuePublished( + "ref-a", 10, OpcUaQuality.Good, DateTime.UtcNow)); + parent.ExpectMsg(TimeSpan.FromMilliseconds(200)) + .Value.ShouldBe(10); + }, duration: TimeSpan.FromSeconds(3)); + + // From here the actor is wired — second publish drives the sum. + mux.Tell(new DriverInstanceActor.AttributeValuePublished("ref-b", 32, OpcUaQuality.Good, DateTime.UtcNow)); + parent.ExpectMsg(TimeSpan.FromSeconds(2)).Value.ShouldBe(42); + + // Unrelated ref shouldn't fire eval. + mux.Tell(new DriverInstanceActor.AttributeValuePublished("ref-unrelated", 99, OpcUaQuality.Good, DateTime.UtcNow)); + parent.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + } + + [Fact] + public void DriverHostActor_forwards_AttributeValuePublished_through_to_mux() + { + // Spin a mux + a stand-in for DriverHostActor that wraps the real DriverHostActor's + // forward path. We feed it AttributeValuePublished directly to verify the routing — + // exercising the actual DriverHost spawn would require a deployment artifact + DI. + var mux = Sys.ActorOf(DependencyMuxActor.Props()); + var subscriber = CreateTestProbe(); + mux.Tell(new DependencyMuxActor.RegisterInterest(new[] { "ref-1" }, subscriber.Ref)); + + var hostProbe = CreateTestProbe(); + var hostActor = Sys.ActorOf(DriverHostActor.Props( + dbFactory: NewInMemoryDbFactory(), + localNode: ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId.Parse("host-1"), + coordinator: hostProbe.Ref, + dependencyMux: mux)); + + // Tell the host an AttributeValuePublished — it should fan out to the mux + subscriber. + hostActor.Tell(new DriverInstanceActor.AttributeValuePublished( + "ref-1", 42, OpcUaQuality.Good, DateTime.UtcNow)); + + subscriber.ExpectMsg().TagId.ShouldBe("ref-1"); + } + + private sealed class EchoSumEvaluator : ZB.MOM.WW.OtOpcUa.Commons.Engines.IVirtualTagEvaluator + { + public ZB.MOM.WW.OtOpcUa.Commons.Engines.VirtualTagEvalResult Evaluate( + string id, string expression, IReadOnlyDictionary deps) + { + var sum = deps.Values.OfType().Sum(); + return ZB.MOM.WW.OtOpcUa.Commons.Engines.VirtualTagEvalResult.Ok(sum); + } + } +}