diff --git a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json index 85a9a82..639022e 100644 --- a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json +++ b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json @@ -84,7 +84,7 @@ {"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "completed", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands.", "shipped": "All three pieces landed: (1) spawn lifecycle in DriverHostActor (DriverSpawnPlanner + IDriverFactory seam) — da14149, (2) ISubscribable wiring + OPC UA status-code → OpcUaQuality severity-bit mapping + DetachSubscription on disconnect/PostStop, (3) IWritable.WriteAsync write path with 5s timeout, status-code bubble-up, and AttributeValuePublished published to parent on every OnDataChange — both shipped in the F7-residual batch. Host DI binding (DriverFactoryBootstrap registers AbCip/AbLegacy/FOCAS/Galaxy/Modbus/S7/TwinCAT factories) lives in src/Server/ZB.MOM.WW.OtOpcUa.Host/Drivers/."}, {"id": "F8", "subject": "Follow-up: VirtualTagActor engine wiring (compile expression, subscribe deps, publish result)", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 42 — VirtualTagEngine.Evaluate not called; DependencyValueChanged just buffers.", "shipped": "(1) IVirtualTagEvaluator seam + NullVirtualTagEvaluator default. VirtualTagActor calls evaluator on DependencyValueChanged, dedupes unchanged results, emits EvaluationResult to parent, publishes Warning ScriptLogEntry on failure. (2) DependencyMuxActor in Runtime fans out DriverInstanceActor.AttributeValuePublished from DriverHostActor through to interested VirtualTagActor subscribers. VirtualTagActor takes dependencyRefs + mux ActorRef in Props, registers interest in PreStart, unregisters in PostStop. WithOtOpcUaRuntimeActors spawns the mux + threads it into DriverHostActor. Production binding to Core.VirtualTags.VirtualTagEngine (expression compile + dep extraction) still TODO — split as F8b."}, {"id": "F9", "subject": "Follow-up: ScriptedAlarmActor engine wiring + state persistence", "status": "partial", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 43 — AlarmConditionService not called; PreRestart persistence to ScriptedAlarmState DB not wired; HistorianAdapter rows not emitted.", "shipped": "(1) IScriptedAlarmEvaluator seam + NullScriptedAlarmEvaluator default. ScriptedAlarmActor takes AlarmConfig (id/name/path/severity/predicate), evaluates on DependencyValueChanged, publishes AlarmTransitionEvent + ScriptLogEntry on every transition. (2) IAlarmActorStateStore seam in Commons.Engines + NullAlarmActorStateStore default + EfAlarmActorStateStore production adapter over the ScriptedAlarmState entity. ScriptedAlarmActor PreStart loads + restores; every Transition fires a fire-and-forget save with lastAckUser. Predicate binding to Core.ScriptedAlarms.ScriptedAlarmEngine still TODO — split as F9b."}, - {"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "partial", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction.", "shipped": "IOpcUaAddressSpaceSink + IServiceLevelPublisher seams in Commons.OpcUa with Null* defaults. OpcUaPublishActor routes AttributeValueUpdate/AlarmStateUpdate/RebuildAddressSpace to the sink, dedupes ServiceLevelChanged, subscribes to redundancy-state DPS topic, and maps per-local-node redundancy snapshot to a coarse ServiceLevel (Primary+leader=240, Primary=200, Secondary=100, Detached=0). Production binding to a real SDK NodeManager + Variable nodes still TODO — split as F10b. Task 60 still blocked on F10b."}, + {"id": "F10", "subject": "Follow-up: OpcUaPublishActor SDK integration (address-space writes + ServiceLevel + RebuildAddressSpace)", "status": "partial", "classification": "high-risk", "estMinutes": 60, "parallelizableWith": [], "blockedBy": [47], "origin": "Self-review of Task 44 — SDK calls stubbed; counters only. Wire after Phase 7 OpcUaServer extraction.", "shipped": "(1) IOpcUaAddressSpaceSink + IServiceLevelPublisher seams in Commons.OpcUa with Null* defaults. OpcUaPublishActor routes through the sink, dedupes ServiceLevelChanged, subscribes to redundancy-state DPS topic, maps redundancy snapshot to a coarse ServiceLevel (Primary+leader=240, Primary=200, Secondary=100, Detached=0). (2) OtOpcUaNodeManager (CustomNodeManager2) + OtOpcUaSdkServer (StandardServer subclass) + SdkAddressSpaceSink in OpcUaServer — lazy variable creation on first WriteValue, WriteAlarmState shape, RebuildAddressSpace tear-down. Variable updates propagate via ClearChangeMasks so subscribed OPC UA clients see them. Tests boot a real StandardServer + verify sink writes show up in the manager. Production wiring through OpcUaApplicationHost.StartAsync (default server = OtOpcUaSdkServer) + IServiceLevelPublisher SDK binding + #109 OpcUaPublishActor→Phase7Applier integration are the remaining pieces."}, {"id": "F11", "subject": "Follow-up: HistorianAdapterActor named-pipe IPC + SqliteStoreAndForwardSink wiring", "status": "completed", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "commit": "6861381", "deviationNotes": "Reshaped HistorianAdapterActor around the existing IAlarmHistorianSink abstraction (alarm-event shape, not the original tag-history-row stub). Defaults to NullAlarmHistorianSink; production deployments wire SqliteStoreAndForwardSink + WonderwareHistorianClient via AddOtOpcUaRuntime overrides. Actor now exposes GetStatus returning HistorianSinkStatus for diagnostics. Named-pipe transport implementation lives unchanged in src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Client/WonderwareHistorianClient.cs — the actor is intentionally just a fire-and-forget bridge.", "origin": "Self-review of Task 45 — stub buffers in-memory; named-pipe + SQLite store-and-forward not wired."}, {"id": "F12", "subject": "Follow-up: PeerOpcUaProbeActor real opc.tcp ping (replace Ok=true stub)", "status": "completed", "classification": "small", "estMinutes": 20, "parallelizableWith": [], "blockedBy": [], "commit": "b06e3ae", "deviation": "TCP-connect probe rather than full OPC UA Hello/Acknowledge handshake. Enough for the redundancy calc; deeper liveness signals can layer on later without changing the actor's contract.", "origin": "Self-review of Task 45 — RunProbe always returns Ok=true; replace with OPC UA Client connect."}, {"id": "F13", "subject": "Follow-up: Full OpcUaApplicationHost extraction (security/alarms/history/observability)", "status": "partial", "classification": "high-risk", "estMinutes": 120, "parallelizableWith": [], "blockedBy": [], "commit": "36c4751-partial", "deviationNotes": "F13a (cert auto-creation) shipped in 36c4751. Remaining: endpoint-security wiring (SecurityProfileResolver into ServerConfiguration.SecurityPolicies), LDAP user-token validator (the OPC UA UserNameToken path; HTTP-layer LDAP auth is separate and already in OtOpcUa.Security), scripted-alarm node manager creation, history backend wiring, observability hooks (OpenTelemetry metrics + traces). These are gated by F10's OpcUaPublishActor SDK integration — until F10 lands, nothing instantiates OpcUaApplicationHost so the missing wiring is dead weight.", "origin": "Self-review of Task 46 — facade only boots ApplicationInstance + StandardServer. Legacy 391-line file pulls Server.Security/Alarms/History/Observability. Pull those into thin OpcUaServer interfaces."}, diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs new file mode 100644 index 0000000..b5ec9ea --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs @@ -0,0 +1,147 @@ +using System.Collections.Concurrent; +using Opc.Ua; +using Opc.Ua.Server; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer; + +/// +/// Custom OPC UA that owns the writable address space for +/// the OtOpcUa server. Variable nodes are created lazily on first +/// under the manager's namespace; subsequent writes update the existing node's Value + +/// StatusCode + SourceTimestamp and notify subscribed clients via the standard +/// ClearChangeMasks path. +/// +/// This is the F10b production wiring behind the v2 +/// seam — once a is bound, OpcUaPublishActor's writes +/// materialise as real OPC UA Variable updates that clients can browse + subscribe to. +/// +/// Node-id encoding uses the manager's default namespace + the caller-supplied string id +/// as the identifier portion (e.g. "ns=2;s=eq-1/temp"). Equipment-folder hierarchy +/// and OPC UA type metadata still come from the Phase7Applier / EquipmentNodeWalker +/// integration (F14b, tracked under #85) — this manager treats every id as a flat +/// under the namespace root. +/// +public sealed class OtOpcUaNodeManager : CustomNodeManager2 +{ + public const string DefaultNamespaceUri = "https://zb.com/otopcua/ns"; + + private readonly ConcurrentDictionary _variables = new(StringComparer.Ordinal); + private FolderState? _root; + + public OtOpcUaNodeManager(IServerInternal server, ApplicationConfiguration configuration) + : base(server, configuration, DefaultNamespaceUri) + { + // SystemContext is initialised by the base ctor. + } + + public int VariableCount => _variables.Count; + + /// + /// Apply a value write from . Creates the + /// variable node on first call; subsequent calls update Value + StatusCode + + /// SourceTimestamp and call ClearChangeMasks so subscribed clients see the change. + /// + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) + { + ArgumentException.ThrowIfNullOrEmpty(nodeId); + var variable = _variables.GetOrAdd(nodeId, CreateVariable); + + lock (Lock) + { + variable.Value = value; + variable.StatusCode = StatusFromQuality(quality); + variable.Timestamp = sourceTimestampUtc; + variable.ClearChangeMasks(SystemContext, includeChildren: false); + } + } + + /// Apply an alarm-state write. Surfaced as a two-element Variable carrying + /// [active, acknowledged] — proper AlarmConditionState + event firing + /// comes when the F14b walker integration lands and registers real condition nodes. + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc) + { + ArgumentException.ThrowIfNullOrEmpty(alarmNodeId); + var variable = _variables.GetOrAdd(alarmNodeId, CreateVariable); + + lock (Lock) + { + variable.Value = new[] { active, acknowledged }; + variable.StatusCode = StatusCodes.Good; + variable.Timestamp = sourceTimestampUtc; + variable.ClearChangeMasks(SystemContext, includeChildren: false); + } + } + + /// Clear every registered variable from the address space. Phase7Applier calls this + /// when Equipment/Alarm topology changes; the populator then re-adds via WriteValue on the + /// next pass. + public void RebuildAddressSpace() + { + lock (Lock) + { + foreach (var v in _variables.Values) + { + v.Parent?.RemoveChild(v); + PredefinedNodes?.Remove(v.NodeId); + } + _variables.Clear(); + } + } + + /// + public override void CreateAddressSpace(IDictionary> externalReferences) + { + lock (Lock) + { + base.CreateAddressSpace(externalReferences); + + // Create one root folder under Objects/ for every variable we mint to hang under. + _root = new FolderState(null) + { + NodeId = new NodeId("OtOpcUa", NamespaceIndex), + BrowseName = new QualifiedName("OtOpcUa", NamespaceIndex), + DisplayName = "OtOpcUa", + EventNotifier = EventNotifiers.None, + TypeDefinitionId = ObjectTypeIds.FolderType, + }; + _root.AddReference(ReferenceTypeIds.Organizes, isInverse: true, ObjectIds.ObjectsFolder); + + if (!externalReferences.TryGetValue(ObjectIds.ObjectsFolder, out var refs)) + { + refs = new List(); + externalReferences[ObjectIds.ObjectsFolder] = refs; + } + refs.Add(new NodeStateReference(ReferenceTypeIds.Organizes, isInverse: false, _root.NodeId)); + + AddPredefinedNode(SystemContext, _root); + } + } + + private BaseDataVariableState CreateVariable(string nodeId) + { + var v = new BaseDataVariableState(_root) + { + NodeId = new NodeId(nodeId, NamespaceIndex), + BrowseName = new QualifiedName(nodeId, NamespaceIndex), + DisplayName = nodeId, + TypeDefinitionId = VariableTypeIds.BaseDataVariableType, + ReferenceTypeId = ReferenceTypeIds.Organizes, + DataType = DataTypeIds.BaseDataType, + ValueRank = ValueRanks.Scalar, + AccessLevel = AccessLevels.CurrentRead, + UserAccessLevel = AccessLevels.CurrentRead, + Historizing = false, + }; + _root?.AddChild(v); + AddPredefinedNode(SystemContext, v); + return v; + } + + private static StatusCode StatusFromQuality(OpcUaQuality quality) => quality switch + { + OpcUaQuality.Good => StatusCodes.Good, + OpcUaQuality.Uncertain => StatusCodes.Uncertain, + _ => StatusCodes.Bad, + }; +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaSdkServer.cs b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaSdkServer.cs new file mode 100644 index 0000000..aa62856 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaSdkServer.cs @@ -0,0 +1,27 @@ +using Opc.Ua; +using Opc.Ua.Server; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer; + +/// +/// subclass that wires in the v2 . +/// Exposes the live node manager after start so callers (, +/// the fused Host's DI binding) can wrap it in a and hand +/// it to OpcUaPublishActor. +/// +public sealed class OtOpcUaSdkServer : StandardServer +{ + private OtOpcUaNodeManager? _otOpcUaNodeManager; + + /// The custom node manager once StartAsync has called + /// . Null until the SDK has bootstrapped. + public OtOpcUaNodeManager? NodeManager => _otOpcUaNodeManager; + + /// + protected override MasterNodeManager CreateMasterNodeManager( + IServerInternal server, ApplicationConfiguration configuration) + { + _otOpcUaNodeManager = new OtOpcUaNodeManager(server, configuration); + return new MasterNodeManager(server, configuration, dynamicNamespaceUri: null, _otOpcUaNodeManager); + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/SdkAddressSpaceSink.cs b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/SdkAddressSpaceSink.cs new file mode 100644 index 0000000..cd18d98 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/SdkAddressSpaceSink.cs @@ -0,0 +1,28 @@ +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer; + +/// +/// Production binding for v2 — bridges +/// OpcUaPublishActor's writes to the SDK address space owned by +/// . The host wires this in once the StandardServer has +/// been started (so the node manager exists). +/// +public sealed class SdkAddressSpaceSink : IOpcUaAddressSpaceSink +{ + private readonly OtOpcUaNodeManager _nodeManager; + + public SdkAddressSpaceSink(OtOpcUaNodeManager nodeManager) + { + ArgumentNullException.ThrowIfNull(nodeManager); + _nodeManager = nodeManager; + } + + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) + => _nodeManager.WriteValue(nodeId, value, quality, sourceTimestampUtc); + + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc) + => _nodeManager.WriteAlarmState(alarmNodeId, active, acknowledged, sourceTimestampUtc); + + public void RebuildAddressSpace() => _nodeManager.RebuildAddressSpace(); +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/SdkAddressSpaceSinkTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/SdkAddressSpaceSinkTests.cs new file mode 100644 index 0000000..94a925f --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/SdkAddressSpaceSinkTests.cs @@ -0,0 +1,119 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests; + +/// +/// Integration tests for the F10b production binding: boot a real +/// through , attach a , +/// drive WriteValue/WriteAlarmState/RebuildAddressSpace, and verify the +/// reflects the writes. +/// +public sealed class SdkAddressSpaceSinkTests : IDisposable +{ + private static CancellationToken Ct => TestContext.Current.CancellationToken; + + private readonly string _pkiRoot = Path.Combine( + Path.GetTempPath(), + $"otopcua-sink-{Guid.NewGuid():N}"); + + [Fact] + public async Task WriteValue_creates_and_updates_variable_in_node_manager() + { + var (host, server) = await BootAsync(); + var sink = new SdkAddressSpaceSink(server.NodeManager!); + + sink.WriteValue("eq-1/temp", 22.5, OpcUaQuality.Good, DateTime.UtcNow); + sink.WriteValue("eq-1/temp", 23.1, OpcUaQuality.Good, DateTime.UtcNow); + sink.WriteValue("eq-1/pressure", 100, OpcUaQuality.Uncertain, DateTime.UtcNow); + + server.NodeManager!.VariableCount.ShouldBe(2); + + await host.DisposeAsync(); + } + + [Fact] + public async Task WriteAlarmState_creates_dedicated_node_distinct_from_value_writes() + { + var (host, server) = await BootAsync(); + var sink = new SdkAddressSpaceSink(server.NodeManager!); + + sink.WriteAlarmState("alarm-7", active: true, acknowledged: false, DateTime.UtcNow); + sink.WriteValue("eq-1/temp", 22.5, OpcUaQuality.Good, DateTime.UtcNow); + + server.NodeManager!.VariableCount.ShouldBe(2); + + await host.DisposeAsync(); + } + + [Fact] + public async Task RebuildAddressSpace_clears_all_registered_variables() + { + var (host, server) = await BootAsync(); + var sink = new SdkAddressSpaceSink(server.NodeManager!); + + sink.WriteValue("a", 1, OpcUaQuality.Good, DateTime.UtcNow); + sink.WriteValue("b", 2, OpcUaQuality.Good, DateTime.UtcNow); + sink.WriteAlarmState("alarm-c", true, false, DateTime.UtcNow); + server.NodeManager!.VariableCount.ShouldBe(3); + + sink.RebuildAddressSpace(); + server.NodeManager.VariableCount.ShouldBe(0); + + // After rebuild, subsequent writes start fresh. + sink.WriteValue("a", 99, OpcUaQuality.Good, DateTime.UtcNow); + server.NodeManager.VariableCount.ShouldBe(1); + + await host.DisposeAsync(); + } + + [Fact] + public async Task NullOpcUaAddressSpaceSink_does_not_crash_on_any_call() + { + // Sanity check that the F10 fallback still works — production callers default to + // NullOpcUaAddressSpaceSink when no SDK NodeManager is wired. + var sink = NullOpcUaAddressSpaceSink.Instance; + sink.WriteValue("x", 1, OpcUaQuality.Good, DateTime.UtcNow); + sink.WriteAlarmState("a", true, false, DateTime.UtcNow); + sink.RebuildAddressSpace(); + await Task.CompletedTask; + } + + private async Task<(OpcUaApplicationHost Host, OtOpcUaSdkServer Server)> BootAsync() + { + var host = new OpcUaApplicationHost( + new OpcUaApplicationHostOptions + { + ApplicationName = "OtOpcUa.SinkTest", + ApplicationUri = $"urn:OtOpcUa.SinkTest:{Guid.NewGuid():N}", + OpcUaPort = AllocateFreePort(), + PublicHostname = "localhost", + PkiStoreRoot = _pkiRoot, + }, + NullLogger.Instance); + + var server = new OtOpcUaSdkServer(); + await host.StartAsync(server, Ct); + return (host, server); + } + + private static int AllocateFreePort() + { + using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0); + listener.Start(); + var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port; + listener.Stop(); + return port; + } + + public void Dispose() + { + if (Directory.Exists(_pkiRoot)) + { + try { Directory.Delete(_pkiRoot, recursive: true); } + catch { /* best-effort cleanup */ } + } + } +}