diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs new file mode 100644 index 0000000..f02a0c0 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs @@ -0,0 +1,70 @@ +using Akka.Actor; +using Akka.Event; +using ZB.MOM.WW.OtOpcUa.Commons.Types; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; + +/// +/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on +/// the pinned opcua-synchronized-dispatcher (Task 19 HOCON) so the OPC UA SDK sees +/// only one thread per actor instance — its session/subscription locks expect strict +/// single-threaded access. +/// +/// Engine wiring (call into OpcUaApplicationHost address-space writes, manage +/// ServiceLevel + ServerUriArray nodes, subscribe to the redundancy-state +/// DistributedPubSub topic) is staged for follow-up F10. This skeleton compiles + exposes the +/// message contracts so producers (DriverInstance, VirtualTag, ScriptedAlarm) can target it. +/// +public sealed class OpcUaPublishActor : ReceiveActor +{ + public const string DispatcherId = "opcua-synchronized-dispatcher"; + + public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc); + public sealed record AlarmStateUpdate(string AlarmNodeId, bool Active, bool Acknowledged, DateTime TimestampUtc); + public sealed record RebuildAddressSpace(CorrelationId Correlation); + public sealed record ServiceLevelChanged(byte ServiceLevel); + + public enum OpcUaQuality { Good, Uncertain, Bad } + + private readonly ILoggingAdapter _log = Context.GetLogger(); + private int _writes; + + /// + /// Returns Props pre-configured to use the opcua-synchronized-dispatcher. Caller can + /// still override by chaining .WithDispatcher(otherId) for unit tests. + /// + public static Props Props() => + Akka.Actor.Props.Create(() => new OpcUaPublishActor()).WithDispatcher(DispatcherId); + + /// Test-only Props that omits the pinned dispatcher requirement. + public static Props PropsForTests() => + Akka.Actor.Props.Create(() => new OpcUaPublishActor()); + + public int WriteCount => _writes; + + public OpcUaPublishActor() + { + Receive(msg => + { + // F10: call into OpcUaApplicationHost to write the address-space node. + Interlocked.Increment(ref _writes); + _log.Debug("OpcUaPublish: queued AttributeValueUpdate for {Node} ({Quality}) (write staged for F10)", + msg.NodeId, msg.Quality); + }); + Receive(msg => + { + Interlocked.Increment(ref _writes); + _log.Debug("OpcUaPublish: queued AlarmStateUpdate for {Node} (active={Active})", + msg.AlarmNodeId, msg.Active); + }); + Receive(msg => + { + _log.Info("OpcUaPublish: address-space rebuild requested (correlation={Correlation}); F10 wires the SDK call", + msg.Correlation); + }); + Receive(msg => + { + _log.Debug("OpcUaPublish: ServiceLevel={Level} (write staged for F10)", msg.ServiceLevel); + }); + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs new file mode 100644 index 0000000..3d44bda --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs @@ -0,0 +1,31 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.OpcUa; + +public sealed class OpcUaPublishActorTests : RuntimeActorTestBase +{ + [Fact] + public void Accepts_message_contracts_without_pinned_dispatcher_in_tests() + { + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests()); + actor.Tell(new OpcUaPublishActor.AttributeValueUpdate("ns=2;s=Tag1", 42.0, OpcUaPublishActor.OpcUaQuality.Good, DateTime.UtcNow)); + actor.Tell(new OpcUaPublishActor.AlarmStateUpdate("ns=2;s=Alarm1", true, false, DateTime.UtcNow)); + actor.Tell(new OpcUaPublishActor.RebuildAddressSpace(CorrelationId.NewId())); + actor.Tell(new OpcUaPublishActor.ServiceLevelChanged(240)); + + // Actor stays alive; no exceptions surface. + ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + } + + [Fact] + public void Production_Props_targets_opcua_synchronized_dispatcher() + { + var props = OpcUaPublishActor.Props(); + props.Dispatcher.ShouldBe(OpcUaPublishActor.DispatcherId); + } +}