diff --git a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json index 1a16ff1..dbfd668 100644 --- a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json +++ b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json @@ -84,7 +84,7 @@ {"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "partial", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands.", "shipped": "Spawn lifecycle in DriverHostActor: artifact parsing, DriverSpawnPlanner pure-diff (spawn/delta/stop), IDriverFactory abstraction in Core.Abstractions with NullDriverFactory + DriverFactoryRegistryAdapter, ApplyDelta forwarded to children. Subscription publishing + write path still stubbed — split into F7-sub (subscribe + write)."}, {"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "IVirtualTagEvaluator seam in Commons.Engines + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes ScriptLogEntry Warning to script-logs DPS topic on evaluator failure. Production binding to Core.VirtualTags.VirtualTagEngine still TODO (compile + ITagUpstreamSource subscribe) — split as F8b."}, {"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "IScriptedAlarmEvaluator seam in Commons.Engines + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), calls evaluator on DependencyValueChanged, emits AlarmTransitionEvent on alerts DPS topic + ScriptLogEntry on script-logs at every transition (Activated/Acknowledged/Cleared with user attribution). Predicate binding to Core.ScriptedAlarms + ScriptedAlarmState DB persistence still TODO — split as F9b."}, - {"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "pending", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction."}, + {"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "partial", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction.", "shipped": "IOpcUaAddressSpaceSink + IServiceLevelPublisher seams in Commons.OpcUa with Null* defaults. OpcUaPublishActor routes AttributeValueUpdate/AlarmStateUpdate/RebuildAddressSpace to the sink, dedupes ServiceLevelChanged, subscribes to redundancy-state DPS topic, and maps per-local-node redundancy snapshot to a coarse ServiceLevel (Primary+leader=240, Primary=200, Secondary=100, Detached=0). Production binding to a real SDK NodeManager + Variable nodes still TODO — split as F10b. Task 60 still blocked on F10b."}, {"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "6861381", "deviationNotes": "Reshaped HistorianAdapterActor around the existing IAlarmHistorianSink abstraction (alarm-event shape, not the original tag-history-row stub). Defaults to NullAlarmHistorianSink; production deployments wire SqliteStoreAndForwardSink + WonderwareHistorianClient via AddOtOpcUaRuntime overrides. Actor now exposes GetStatus returning HistorianSinkStatus for diagnostics. Named-pipe transport implementation lives unchanged in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs — the actor is intentionally just a fire-and-forget bridge.", "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."}, {"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "completed", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "b06e3ae", "deviation": "TCP-connect probe rather than full OPC UA Hello/Acknowledge handshake. Enough for the redundancy calc; deeper liveness signals can layer on later without changing the actor's contract.", "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."}, {"id": "F13", "subject": "Follow-up: Full OpcUaApplicationHost extraction (security/alarms/history/observability)", "status": "partial", "classification": "high-risk", "estMinutes": 120, "parallelizableWith": [], "blockedBy": [], "commit": "36c4751-partial", "deviationNotes": "F13a (cert auto-creation) shipped in 36c4751. Remaining: endpoint-security wiring (SecurityProfileResolver into ServerConfiguration.SecurityPolicies), LDAP user-token validator (the OPC UA UserNameToken path; HTTP-layer LDAP auth is separate and already in OtOpcUa.Security), scripted-alarm node manager creation, history backend wiring, observability hooks (OpenTelemetry metrics + traces). These are gated by F10's OpcUaPublishActor SDK integration — until F10 lands, nothing instantiates OpcUaApplicationHost so the missing wiring is dead weight.", "origin": "Self-review of Task 46 — facade only boots ApplicationInstance + StandardServer. Legacy 391-line file pulls Server.Security/Alarms/History/Observability. Pull those into thin OpcUaServer interfaces."}, diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/IOpcUaAddressSpaceSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/IOpcUaAddressSpaceSink.cs new file mode 100644 index 0000000..76ac52f --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/IOpcUaAddressSpaceSink.cs @@ -0,0 +1,37 @@ +namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +/// +/// Abstraction over the OPC UA SDK's address space. OpcUaPublishActor consumes this +/// so the Runtime project doesn't reference Opc.Ua.Server directly — production +/// binds a real SDK-backed sink in the fused Host's wiring, dev/Mac binds the +/// no-op. +/// +public interface IOpcUaAddressSpaceSink +{ + /// Write a Variable node's current value + quality + source timestamp. + void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc); + + /// Write an alarm-condition Variable's active/acknowledged state. + void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc); + + /// + /// Tear down + repopulate the address space. Called by OpcUaPublishActor after a + /// successful deployment apply so the node manager reflects the new config. Idempotent. + /// + void RebuildAddressSpace(); +} + +/// OPC UA status code projection — Good / Uncertain / Bad. Real SDK has finer-grained +/// codes; the engine actors only need this 3-state classification. +public enum OpcUaQuality { Good, Uncertain, Bad } + +/// No-op sink. Bound by default so the actors are safe to run in dev / Mac / +/// integration tests without a real SDK behind them. +public sealed class NullOpcUaAddressSpaceSink : IOpcUaAddressSpaceSink +{ + public static readonly NullOpcUaAddressSpaceSink Instance = new(); + private NullOpcUaAddressSpaceSink() { } + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) { } + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc) { } + public void RebuildAddressSpace() { } +} diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/IServiceLevelPublisher.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/IServiceLevelPublisher.cs new file mode 100644 index 0000000..67f66e0 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/IServiceLevelPublisher.cs @@ -0,0 +1,22 @@ +namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +/// +/// Writes the OPC UA Server object's ServiceLevel Variable (0–255). Production binds +/// a sink that pokes the SDK's ServiceLevel node; tests + dev mode bind +/// which just records the most recently set level +/// for inspection. +/// +public interface IServiceLevelPublisher +{ + void Publish(byte serviceLevel); +} + +/// No-op default that retains the last-written ServiceLevel in +/// . Used by dev mode + verified by tests. +public sealed class NullServiceLevelPublisher : IServiceLevelPublisher +{ + public static readonly NullServiceLevelPublisher Instance = new(); + private NullServiceLevelPublisher() { } + public byte LastPublished { get; private set; } + public void Publish(byte serviceLevel) => LastPublished = serviceLevel; +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs index f02a0c0..05f8f99 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs @@ -1,70 +1,178 @@ using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; /// -/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on -/// the pinned opcua-synchronized-dispatcher (Task 19 HOCON) so the OPC UA SDK sees -/// only one thread per actor instance — its session/subscription locks expect strict -/// single-threaded access. +/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on +/// the pinned opcua-synchronized-dispatcher (Task 19 HOCON) so the OPC UA SDK sees +/// only one thread per actor instance — its session/subscription locks expect strict +/// single-threaded access. /// -/// Engine wiring (call into OpcUaApplicationHost address-space writes, manage -/// ServiceLevel + ServerUriArray nodes, subscribe to the redundancy-state -/// DistributedPubSub topic) is staged for follow-up F10. This skeleton compiles + exposes the -/// message contracts so producers (DriverInstance, VirtualTag, ScriptedAlarm) can target it. +/// Address-space writes route through ; ServiceLevel +/// writes route through . Production binds SDK-backed +/// implementations; dev/Mac/tests bind the Null* defaults so the actor stays decoupled from +/// Opc.Ua.Server. The remaining piece is wiring those bindings to a real +/// StandardServer address space — tracked as F10b. /// public sealed class OpcUaPublishActor : ReceiveActor { public const string DispatcherId = "opcua-synchronized-dispatcher"; + public const string RedundancyStateTopic = "redundancy-state"; public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc); public sealed record AlarmStateUpdate(string AlarmNodeId, bool Active, bool Acknowledged, DateTime TimestampUtc); public sealed record RebuildAddressSpace(CorrelationId Correlation); public sealed record ServiceLevelChanged(byte ServiceLevel); - public enum OpcUaQuality { Good, Uncertain, Bad } - + private readonly IOpcUaAddressSpaceSink _sink; + private readonly IServiceLevelPublisher _serviceLevel; + private readonly bool _subscribeRedundancyTopic; + private readonly NodeId? _localNode; private readonly ILoggingAdapter _log = Context.GetLogger(); + private int _writes; - - /// - /// Returns Props pre-configured to use the opcua-synchronized-dispatcher. Caller can - /// still override by chaining .WithDispatcher(otherId) for unit tests. - /// - public static Props Props() => - Akka.Actor.Props.Create(() => new OpcUaPublishActor()).WithDispatcher(DispatcherId); - - /// Test-only Props that omits the pinned dispatcher requirement. - public static Props PropsForTests() => - Akka.Actor.Props.Create(() => new OpcUaPublishActor()); + private byte _lastServiceLevel; public int WriteCount => _writes; + public byte LastServiceLevel => _lastServiceLevel; - public OpcUaPublishActor() + /// Production Props — pins the OPC UA dispatcher + subscribes to the + /// redundancy-state DPS topic so cluster transitions drive the local ServiceLevel + /// publish path. + public static Props Props( + IOpcUaAddressSpaceSink? sink = null, + IServiceLevelPublisher? serviceLevel = null, + NodeId? localNode = null) => + Akka.Actor.Props.Create(() => new OpcUaPublishActor( + sink ?? NullOpcUaAddressSpaceSink.Instance, + serviceLevel ?? NullServiceLevelPublisher.Instance, + subscribeRedundancyTopic: true, + localNode)).WithDispatcher(DispatcherId); + + /// Test-only Props that omits the pinned-dispatcher requirement and skips the + /// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster. + public static Props PropsForTests( + IOpcUaAddressSpaceSink? sink = null, + IServiceLevelPublisher? serviceLevel = null, + bool subscribeRedundancyTopic = false, + NodeId? localNode = null) => + Akka.Actor.Props.Create(() => new OpcUaPublishActor( + sink ?? NullOpcUaAddressSpaceSink.Instance, + serviceLevel ?? NullServiceLevelPublisher.Instance, + subscribeRedundancyTopic, + localNode)); + + public OpcUaPublishActor( + IOpcUaAddressSpaceSink sink, + IServiceLevelPublisher serviceLevel, + bool subscribeRedundancyTopic, + NodeId? localNode) { - Receive(msg => + _sink = sink; + _serviceLevel = serviceLevel; + _subscribeRedundancyTopic = subscribeRedundancyTopic; + _localNode = localNode; + + Receive(HandleAttributeUpdate); + Receive(HandleAlarmUpdate); + Receive(HandleRebuild); + Receive(HandleServiceLevelChanged); + Receive(HandleRedundancyStateChanged); + Receive(_ => { /* PubSub ack */ }); + } + + protected override void PreStart() + { + if (_subscribeRedundancyTopic) { - // F10: call into OpcUaApplicationHost to write the address-space node. + DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(RedundancyStateTopic, Self)); + } + } + + private void HandleAttributeUpdate(AttributeValueUpdate msg) + { + try + { + _sink.WriteValue(msg.NodeId, msg.Value, msg.Quality, msg.TimestampUtc); Interlocked.Increment(ref _writes); - _log.Debug("OpcUaPublish: queued AttributeValueUpdate for {Node} ({Quality}) (write staged for F10)", - msg.NodeId, msg.Quality); - }); - Receive(msg => + } + catch (Exception ex) { + _log.Warning(ex, "OpcUaPublish: sink.WriteValue threw for {Node}", msg.NodeId); + } + } + + private void HandleAlarmUpdate(AlarmStateUpdate msg) + { + try + { + _sink.WriteAlarmState(msg.AlarmNodeId, msg.Active, msg.Acknowledged, msg.TimestampUtc); Interlocked.Increment(ref _writes); - _log.Debug("OpcUaPublish: queued AlarmStateUpdate for {Node} (active={Active})", - msg.AlarmNodeId, msg.Active); - }); - Receive(msg => + } + catch (Exception ex) { - _log.Info("OpcUaPublish: address-space rebuild requested (correlation={Correlation}); F10 wires the SDK call", + _log.Warning(ex, "OpcUaPublish: sink.WriteAlarmState threw for {Node}", msg.AlarmNodeId); + } + } + + private void HandleRebuild(RebuildAddressSpace msg) + { + try + { + _sink.RebuildAddressSpace(); + _log.Info("OpcUaPublish: address-space rebuilt (correlation={Correlation})", msg.Correlation); + } + catch (Exception ex) + { + _log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})", msg.Correlation); - }); - Receive(msg => + } + } + + private void HandleServiceLevelChanged(ServiceLevelChanged msg) + { + if (msg.ServiceLevel == _lastServiceLevel) return; + _lastServiceLevel = msg.ServiceLevel; + try { - _log.Debug("OpcUaPublish: ServiceLevel={Level} (write staged for F10)", msg.ServiceLevel); - }); + _serviceLevel.Publish(msg.ServiceLevel); + _log.Debug("OpcUaPublish: ServiceLevel={Level}", msg.ServiceLevel); + } + catch (Exception ex) + { + _log.Warning(ex, "OpcUaPublish: ServiceLevel publisher threw at level {Level}", msg.ServiceLevel); + } + } + + /// + /// Compute a coarse ServiceLevel from the cluster snapshot and forward to the + /// . This is a placeholder for F10b's full health + /// aggregation — for now we surface "primary-leader → 240, secondary → 100, detached → 0" + /// so the local SDK at least reflects role state. The full + /// path (with DB-reachable, OPC UA probe inputs) lives in RedundancyStateActor on + /// admin nodes; this driver-side mirror exists so each node's own SDK exposes a sensible + /// ServiceLevel without round-tripping back through the admin singleton. + /// + private void HandleRedundancyStateChanged(RedundancyStateChanged msg) + { + if (_localNode is null) return; + + var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value); + if (local is null) return; + + byte level = local.Role switch + { + RedundancyRole.Primary when local.IsRoleLeaderForDriver => 240, + RedundancyRole.Primary => 200, + RedundancyRole.Secondary => 100, + RedundancyRole.Detached => 0, + _ => 0, + }; + Self.Tell(new ServiceLevelChanged(level)); } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs index 3d44bda..006a5b1 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs @@ -1,6 +1,9 @@ +using System.Collections.Concurrent; using Akka.Actor; using Shouldly; using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; @@ -13,12 +16,11 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase public void Accepts_message_contracts_without_pinned_dispatcher_in_tests() { var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests()); - actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaPublishActor.OpcUaQuality.Good, DateTime.UtcNow)); + actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaQuality.Good, DateTime.UtcNow)); actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=Alarm1", true, false, DateTime.UtcNow)); actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240)); - // Actor stays alive; no exceptions surface. ExpectNoMsg(TimeSpan.FromMilliseconds(200)); } @@ -28,4 +30,135 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase var props = OpcUaPublishActor.Props(); props.Dispatcher.ShouldBe(OpcUaPublishActor.DispatcherId); } + + [Fact] + public void AttributeValueUpdate_routes_to_sink_WriteValue() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink)); + + actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T1", 3.14, OpcUaQuality.Good, DateTime.UtcNow)); + actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=T2", "abc", OpcUaQuality.Uncertain, DateTime.UtcNow)); + + AwaitAssert(() => + { + sink.Values.Count.ShouldBe(2); + sink.Values[0].NodeId.ShouldBe("ns=2;s=T1"); + sink.Values[0].Value.ShouldBe(3.14); + sink.Values[0].Quality.ShouldBe(OpcUaQuality.Good); + sink.Values[1].Quality.ShouldBe(OpcUaQuality.Uncertain); + }, duration: TimeSpan.FromMilliseconds(500)); + } + + [Fact] + public void AlarmStateUpdate_routes_to_sink_WriteAlarmState() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink)); + + actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=A1", Active: true, Acknowledged: false, DateTime.UtcNow)); + + AwaitAssert(() => + { + sink.Alarms.Count.ShouldBe(1); + sink.Alarms[0].AlarmNodeId.ShouldBe("ns=2;s=A1"); + sink.Alarms[0].Active.ShouldBeTrue(); + sink.Alarms[0].Acknowledged.ShouldBeFalse(); + }, duration: TimeSpan.FromMilliseconds(500)); + } + + [Fact] + public void RebuildAddressSpace_calls_sink_Rebuild() + { + var sink = new RecordingSink(); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(sink: sink)); + + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + + AwaitAssert(() => sink.RebuildCalls.ShouldBe(1), duration: TimeSpan.FromMilliseconds(500)); + } + + [Fact] + public void ServiceLevelChanged_publishes_to_IServiceLevelPublisher_once_per_unique_level() + { + var publisher = new RecordingPublisher(); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher)); + + actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240)); + actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240)); // dedup + actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(100)); + + AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240, 100 }), + duration: TimeSpan.FromMilliseconds(500)); + } + + [Fact] + public void RedundancyStateChanged_drives_local_ServiceLevel_publish_for_primary_leader() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("primary-node"); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local)); + + var snapshot = new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Primary, + IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow), + new NodeRedundancyState(NodeId.Parse("other-node"), RedundancyRole.Secondary, + IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow), + }, + CorrelationId.NewId()); + actor.Tell(snapshot); + + AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 240 }), + duration: TimeSpan.FromMilliseconds(500)); + } + + [Fact] + public void RedundancyStateChanged_for_secondary_publishes_100() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("secondary-node"); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(serviceLevel: publisher, localNode: local)); + + var snapshot = new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Secondary, + IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow), + }, + CorrelationId.NewId()); + actor.Tell(snapshot); + + AwaitAssert(() => publisher.Levels.ShouldBe(new byte[] { 100 }), + duration: TimeSpan.FromMilliseconds(500)); + } + + private sealed class RecordingSink : IOpcUaAddressSpaceSink + { + public ConcurrentQueue<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> ValueQueue { get; } = new(); + public ConcurrentQueue<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> AlarmQueue { get; } = new(); + public int RebuildCalls; + + public List<(string NodeId, object? Value, OpcUaQuality Quality, DateTime Ts)> Values => + ValueQueue.ToList(); + public List<(string AlarmNodeId, bool Active, bool Acknowledged, DateTime Ts)> Alarms => + AlarmQueue.ToList(); + + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime ts) => + ValueQueue.Enqueue((nodeId, value, quality, ts)); + + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime ts) => + AlarmQueue.Enqueue((alarmNodeId, active, acknowledged, ts)); + + public void RebuildAddressSpace() => Interlocked.Increment(ref RebuildCalls); + } + + private sealed class RecordingPublisher : IServiceLevelPublisher + { + private readonly ConcurrentQueue _q = new(); + public byte[] Levels => _q.ToArray(); + public void Publish(byte serviceLevel) => _q.Enqueue(serviceLevel); + } }