From 50787823d3aaa84c5a13e8cc75c5dfda4cfea7ef Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 10:02:15 -0400 Subject: [PATCH] =?UTF-8?q?feat(host,runtime):=20#108=20Host=20DI=20bindin?= =?UTF-8?q?gs=20=E2=80=94=20OPC=20UA=20server=20+=20deferred=20sink?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the OPC UA SDK into the fused Host's lifecycle on driver-role nodes + spawns OpcUaPublishActor with the proper sink/publisher/dbFactory/ applier resolution. The full read+write data path is now live in production: Deploy → DriverHost → OpcUaPublish → SDK NodeManager → subscribed OPC UA clients. DeferredAddressSpaceSink (Commons.OpcUa): - Thread-safe wrapper IOpcUaAddressSpaceSink that delegates to an inner sink swapped in at runtime. Needed because Akka actors resolve the sink at construction time, but the production sink (SdkAddressSpaceSink wrapping OtOpcUaNodeManager) only exists after the SDK StandardServer has started. - Defaults to NullOpcUaAddressSpaceSink so calls before swap are safe; SetSink(null) reverts (for graceful shutdown). OtOpcUaServerHostedService (Host.OpcUa): - IHostedService that owns the OPC UA SDK lifecycle. Reads OpcUaApplicationHostOptions from the 'OpcUa' config section, creates an OtOpcUaSdkServer, boots it through OpcUaApplicationHost, then swaps a real SdkAddressSpaceSink into the DeferredAddressSpaceSink singleton. - SDK boot failure is logged + non-fatal — the rest of the host (admin UI, driver actors) keeps running. Stop reverts to null sink. WithOtOpcUaRuntimeActors (Runtime): - Now spawns OpcUaPublishActor (new actor) + threads its ActorRef into DriverHostActor's Props so successful applies trigger the address-space rebuild pipeline. - Phase7Applier is constructed here from the resolved sink + a logger; OpcUaPublishActor takes both. - Prepends the opcua-synchronized-dispatcher HOCON so the extension is self-contained — consumers (Host, tests) don't need to redeclare the dispatcher block. - New OpcUaPublishActorKey + OpcUaPublishActorName for actor-registry resolution. - AddOtOpcUaRuntime now also TryAddSingleton's NullOpcUaAddressSpaceSink + NullServiceLevelPublisher so admin-only nodes (or tests that don't bind the Deferred sink) stay safe. Host.Program.cs (driver-role only): - Binds DeferredAddressSpaceSink as singleton + as IOpcUaAddressSpaceSink - AddHostedService() Tests: OpcUaServer 24 -> 28 (+4 DeferredAddressSpaceSink unit tests), Runtime 69 -> 69 (existing ServiceCollectionExtensionsTests extended to verify the new mux + publish actor registration). All 6 v2 test suites green: 177 tests passing. Closes #108. Engine-wiring is now production-bound end-to-end on driver-role nodes — Deploy reaches real OPC UA Variable nodes that subscribed clients see. --- .../OpcUa/DeferredAddressSpaceSink.cs | 31 +++++++ .../OpcUa/OtOpcUaServerHostedService.cs | 86 +++++++++++++++++++ src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs | 10 +++ .../ServiceCollectionExtensions.cs | 42 ++++++++- .../DeferredAddressSpaceSinkTests.cs | 77 +++++++++++++++++ .../ServiceCollectionExtensionsTests.cs | 6 ++ 6 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/DeferredAddressSpaceSink.cs create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/DeferredAddressSpaceSinkTests.cs diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/DeferredAddressSpaceSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/DeferredAddressSpaceSink.cs new file mode 100644 index 0000000..f79258f --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/DeferredAddressSpaceSink.cs @@ -0,0 +1,31 @@ +namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +/// +/// Wrapper that defers to an inner sink swapped in at +/// runtime. Needed because the production sink (SdkAddressSpaceSink) wraps an +/// OtOpcUaNodeManager that only exists after the SDK StandardServer has +/// started — but Akka actors resolve their sink dependency at construction time, before +/// the hosted service has booted the SDK. +/// +/// Bound as a singleton in DI on driver-role hosts; the OPC UA hosted service calls +/// once the server is up. Until that swap happens, every call is a +/// no-op against , so the actor stays safe to +/// receive messages from the moment it boots. +/// +public sealed class DeferredAddressSpaceSink : IOpcUaAddressSpaceSink +{ + private volatile IOpcUaAddressSpaceSink _inner = NullOpcUaAddressSpaceSink.Instance; + + /// Swap in the production sink. Pass null to revert to the null sink + /// (used during graceful shutdown so post-stop writes don't hit a half-disposed manager). + public void SetSink(IOpcUaAddressSpaceSink? sink) => + _inner = sink ?? NullOpcUaAddressSpaceSink.Instance; + + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) + => _inner.WriteValue(nodeId, value, quality, sourceTimestampUtc); + + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc) + => _inner.WriteAlarmState(alarmNodeId, active, acknowledged, sourceTimestampUtc); + + public void RebuildAddressSpace() => _inner.RebuildAddressSpace(); +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs new file mode 100644 index 0000000..5e58088 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs @@ -0,0 +1,86 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; + +namespace ZB.MOM.WW.OtOpcUa.Host.OpcUa; + +/// +/// Owns the OPC UA SDK lifecycle on driver-role hosts. Reads +/// from the OpcUa config section, boots +/// an through , then +/// swaps a real into the +/// singleton so OpcUaPublishActor's writes +/// start landing in the real address space. +/// +/// Tests boot the OPC UA server directly via ; this +/// hosted service is the production wiring. +/// +public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposable +{ + private readonly IConfiguration _configuration; + private readonly DeferredAddressSpaceSink _deferredSink; + private readonly ILoggerFactory _loggerFactory; + private readonly ILogger _logger; + + private OpcUaApplicationHost? _appHost; + private OtOpcUaSdkServer? _server; + + public OtOpcUaServerHostedService( + IConfiguration configuration, + DeferredAddressSpaceSink deferredSink, + ILoggerFactory loggerFactory) + { + _configuration = configuration; + _deferredSink = deferredSink; + _loggerFactory = loggerFactory; + _logger = loggerFactory.CreateLogger(); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + var options = new OpcUaApplicationHostOptions(); + _configuration.GetSection("OpcUa").Bind(options); + + _server = new OtOpcUaSdkServer(); + _appHost = new OpcUaApplicationHost(options, _loggerFactory.CreateLogger()); + + try + { + await _appHost.StartAsync(_server, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, + "OtOpcUaServerHostedService: SDK start failed; OpcUaPublishActor writes will continue to no-op"); + // Don't rethrow — the rest of the host (admin UI, driver actors, etc.) can still boot. + // Operators see the failure via the logs + can correct config without a process bounce + // of the whole binary. + return; + } + + if (_server.NodeManager is null) + { + _logger.LogWarning( + "OtOpcUaServerHostedService: SDK reported started but NodeManager is null; sink stays Null"); + return; + } + + _deferredSink.SetSink(new SdkAddressSpaceSink(_server.NodeManager)); + _logger.LogInformation("OtOpcUaServerHostedService: SDK started, address-space sink bound"); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + // Revert to Null sink so any in-flight writes from a poison-pilled actor don't hit a + // half-disposed NodeManager. + _deferredSink.SetSink(null); + return Task.CompletedTask; + } + + public async ValueTask DisposeAsync() + { + if (_appHost is not null) await _appHost.DisposeAsync(); + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs index a847d9e..fcaab4b 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs @@ -7,9 +7,11 @@ using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs; using ZB.MOM.WW.OtOpcUa.Cluster; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.ControlPlane; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Host; using ZB.MOM.WW.OtOpcUa.Host.Drivers; using ZB.MOM.WW.OtOpcUa.Host.Health; +using ZB.MOM.WW.OtOpcUa.Host.OpcUa; using ZB.MOM.WW.OtOpcUa.Runtime; using ZB.MOM.WW.OtOpcUa.Security; using ZB.MOM.WW.OtOpcUa.Security.Endpoints; @@ -47,6 +49,14 @@ if (hasDriver) // the F7-default NullDriverFactory with a real DriverFactoryRegistryAdapter so DriverHostActor // can materialise real IDriver instances on deploy. builder.Services.AddOtOpcUaDriverFactories(); + + // Deferred sink so Akka actors can resolve IOpcUaAddressSpaceSink at construction time — + // the OPC UA hosted service swaps in a real SdkAddressSpaceSink once StandardServer has + // started. Until then writes route through NullOpcUaAddressSpaceSink. + builder.Services.AddSingleton(); + builder.Services.AddSingleton(sp => + sp.GetRequiredService()); + builder.Services.AddHostedService(); } // Akka cluster bootstrap. Role-specific singletons are registered on the AkkaConfigurationBuilder diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs index 3fab542..5d93627 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs @@ -3,13 +3,18 @@ using Akka.Hosting; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; using ZB.MOM.WW.OtOpcUa.Runtime.Health; using ZB.MOM.WW.OtOpcUa.Runtime.Historian; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; namespace ZB.MOM.WW.OtOpcUa.Runtime; @@ -22,6 +27,7 @@ public static class ServiceCollectionExtensions public const string DbHealthProbeActorName = "db-health"; public const string HistorianAdapterActorName = "historian-adapter"; public const string DependencyMuxActorName = "dependency-mux"; + public const string OpcUaPublishActorName = "opcua-publish"; /// /// Registers shared runtime services. Currently binds @@ -33,6 +39,8 @@ public static class ServiceCollectionExtensions { services.TryAddSingleton(NullAlarmHistorianSink.Instance); services.TryAddSingleton(NullDriverFactory.Instance); + services.TryAddSingleton(NullOpcUaAddressSpaceSink.Instance); + services.TryAddSingleton(NullServiceLevelPublisher.Instance); return services; } @@ -54,6 +62,19 @@ public static class ServiceCollectionExtensions /// public static AkkaConfigurationBuilder WithOtOpcUaRuntimeActors(this AkkaConfigurationBuilder builder) { + // Production cluster HOCON (akka.conf) carries this dispatcher block, but consumers that + // bootstrap their own HOCON (e.g. ServiceCollectionExtensionsTests) wouldn't pick it up + // — OpcUaPublishActor.Props pins itself to opcua-synchronized-dispatcher and Akka throws + // ConfigurationException if it doesn't exist. Prepend a fallback so the runtime extension + // is self-contained. + builder.AddHocon(@" + opcua-synchronized-dispatcher { + type = ""PinnedDispatcher"" + executor = ""thread-pool-executor"" + throughput = 1 + } + ", HoconAddMode.Prepend); + builder.WithActors((system, registry, resolver) => { var dbFactory = resolver.GetService>(); @@ -61,6 +82,9 @@ public static class ServiceCollectionExtensions // Fallback to Null* if AddOtOpcUaRuntime wasn't called (e.g., test harnesses). var historianSink = resolver.GetService() ?? NullAlarmHistorianSink.Instance; var driverFactory = resolver.GetService() ?? NullDriverFactory.Instance; + var addressSpaceSink = resolver.GetService() ?? NullOpcUaAddressSpaceSink.Instance; + var serviceLevel = resolver.GetService() ?? NullServiceLevelPublisher.Instance; + var loggerFactory = resolver.GetService() ?? NullLoggerFactory.Instance; var dbHealth = system.ActorOf( DbHealthProbeActor.Props(dbFactory), @@ -72,10 +96,25 @@ public static class ServiceCollectionExtensions var mux = system.ActorOf(DependencyMuxActor.Props(), DependencyMuxActorName); registry.Register(mux); + // OPC UA publish actor — pinned dispatcher, owns the address-space side of the + // pipeline. Phase7Applier is constructed here so the actor + applier share the + // same sink reference (when DeferredAddressSpaceSink swaps later, both see it). + var applier = new Phase7Applier(addressSpaceSink, loggerFactory.CreateLogger()); + var publishActor = system.ActorOf( + OpcUaPublishActor.Props( + sink: addressSpaceSink, + serviceLevel: serviceLevel, + localNode: roleInfo.LocalNode, + dbFactory: dbFactory, + applier: applier), + OpcUaPublishActorName); + registry.Register(publishActor); + var driverHost = system.ActorOf( DriverHostActor.Props(dbFactory, roleInfo.LocalNode, coordinator: null, driverFactory: driverFactory, localRoles: roleInfo.LocalRoles, - dependencyMux: mux), + dependencyMux: mux, + opcUaPublishActor: publishActor), DriverHostActorName); registry.Register(driverHost); @@ -94,3 +133,4 @@ public sealed class DriverHostActorKey { } public sealed class DbHealthProbeActorKey { } public sealed class HistorianAdapterActorKey { } public sealed class DependencyMuxActorKey { } +public sealed class OpcUaPublishActorKey { } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/DeferredAddressSpaceSinkTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/DeferredAddressSpaceSinkTests.cs new file mode 100644 index 0000000..84b5996 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/DeferredAddressSpaceSinkTests.cs @@ -0,0 +1,77 @@ +using System.Collections.Concurrent; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests; + +public sealed class DeferredAddressSpaceSinkTests +{ + [Fact] + public void Default_inner_is_null_sink_so_calls_before_SetSink_are_safe() + { + var deferred = new DeferredAddressSpaceSink(); + + // No throw, no observable side effect. + deferred.WriteValue("x", 1, OpcUaQuality.Good, DateTime.UtcNow); + deferred.WriteAlarmState("a", true, false, DateTime.UtcNow); + deferred.RebuildAddressSpace(); + } + + [Fact] + public void Calls_after_SetSink_are_forwarded_to_the_inner() + { + var deferred = new DeferredAddressSpaceSink(); + var inner = new RecordingSink(); + deferred.SetSink(inner); + + deferred.WriteValue("x", 42, OpcUaQuality.Good, DateTime.UtcNow); + deferred.WriteAlarmState("a-1", true, false, DateTime.UtcNow); + deferred.RebuildAddressSpace(); + + inner.Calls.ShouldBe(new[] { "WV:x", "WA:a-1", "RB" }); + } + + [Fact] + public void SetSink_to_null_reverts_to_null_sink() + { + var deferred = new DeferredAddressSpaceSink(); + var inner = new RecordingSink(); + deferred.SetSink(inner); + deferred.WriteValue("x", 1, OpcUaQuality.Good, DateTime.UtcNow); + inner.Calls.Count.ShouldBe(1); + + deferred.SetSink(null); + deferred.WriteValue("y", 2, OpcUaQuality.Good, DateTime.UtcNow); // dropped + inner.Calls.Count.ShouldBe(1); + } + + [Fact] + public void SetSink_can_swap_between_implementations() + { + var deferred = new DeferredAddressSpaceSink(); + var first = new RecordingSink(); + var second = new RecordingSink(); + + deferred.SetSink(first); + deferred.WriteValue("a", 1, OpcUaQuality.Good, DateTime.UtcNow); + + deferred.SetSink(second); + deferred.WriteValue("b", 2, OpcUaQuality.Good, DateTime.UtcNow); + + first.Calls.Single().ShouldBe("WV:a"); + second.Calls.Single().ShouldBe("WV:b"); + } + + private sealed class RecordingSink : IOpcUaAddressSpaceSink + { + public ConcurrentQueue CallQueue { get; } = new(); + public List Calls => CallQueue.ToList(); + + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) + => CallQueue.Enqueue($"WV:{nodeId}"); + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc) + => CallQueue.Enqueue($"WA:{alarmNodeId}"); + public void RebuildAddressSpace() => CallQueue.Enqueue("RB"); + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs index 0d5d2a1..dec9c37 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs @@ -47,13 +47,19 @@ public sealed class ServiceCollectionExtensionsTests var driverHost = host.Services.GetRequiredService>(); var dbHealth = host.Services.GetRequiredService>(); var historian = host.Services.GetRequiredService>(); + var mux = host.Services.GetRequiredService>(); + var publish = host.Services.GetRequiredService>(); driverHost.ActorRef.ShouldNotBeNull(); dbHealth.ActorRef.ShouldNotBeNull(); historian.ActorRef.ShouldNotBeNull(); + mux.ActorRef.ShouldNotBeNull(); + publish.ActorRef.ShouldNotBeNull(); driverHost.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DriverHostActorName); dbHealth.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DbHealthProbeActorName); historian.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.HistorianAdapterActorName); + mux.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DependencyMuxActorName); + publish.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.OpcUaPublishActorName); } finally {