diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/DeferredAddressSpaceSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/DeferredAddressSpaceSink.cs new file mode 100644 index 0000000..f79258f --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/OpcUa/DeferredAddressSpaceSink.cs @@ -0,0 +1,31 @@ +namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +/// +/// Wrapper that defers to an inner sink swapped in at +/// runtime. Needed because the production sink (SdkAddressSpaceSink) wraps an +/// OtOpcUaNodeManager that only exists after the SDK StandardServer has +/// started — but Akka actors resolve their sink dependency at construction time, before +/// the hosted service has booted the SDK. +/// +/// Bound as a singleton in DI on driver-role hosts; the OPC UA hosted service calls +/// once the server is up. Until that swap happens, every call is a +/// no-op against , so the actor stays safe to +/// receive messages from the moment it boots. +/// +public sealed class DeferredAddressSpaceSink : IOpcUaAddressSpaceSink +{ + private volatile IOpcUaAddressSpaceSink _inner = NullOpcUaAddressSpaceSink.Instance; + + /// Swap in the production sink. Pass null to revert to the null sink + /// (used during graceful shutdown so post-stop writes don't hit a half-disposed manager). + public void SetSink(IOpcUaAddressSpaceSink? sink) => + _inner = sink ?? NullOpcUaAddressSpaceSink.Instance; + + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) + => _inner.WriteValue(nodeId, value, quality, sourceTimestampUtc); + + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc) + => _inner.WriteAlarmState(alarmNodeId, active, acknowledged, sourceTimestampUtc); + + public void RebuildAddressSpace() => _inner.RebuildAddressSpace(); +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs new file mode 100644 index 0000000..5e58088 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs @@ -0,0 +1,86 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; + +namespace ZB.MOM.WW.OtOpcUa.Host.OpcUa; + +/// +/// Owns the OPC UA SDK lifecycle on driver-role hosts. Reads +/// from the OpcUa config section, boots +/// an through , then +/// swaps a real into the +/// singleton so OpcUaPublishActor's writes +/// start landing in the real address space. +/// +/// Tests boot the OPC UA server directly via ; this +/// hosted service is the production wiring. +/// +public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposable +{ + private readonly IConfiguration _configuration; + private readonly DeferredAddressSpaceSink _deferredSink; + private readonly ILoggerFactory _loggerFactory; + private readonly ILogger _logger; + + private OpcUaApplicationHost? _appHost; + private OtOpcUaSdkServer? _server; + + public OtOpcUaServerHostedService( + IConfiguration configuration, + DeferredAddressSpaceSink deferredSink, + ILoggerFactory loggerFactory) + { + _configuration = configuration; + _deferredSink = deferredSink; + _loggerFactory = loggerFactory; + _logger = loggerFactory.CreateLogger(); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + var options = new OpcUaApplicationHostOptions(); + _configuration.GetSection("OpcUa").Bind(options); + + _server = new OtOpcUaSdkServer(); + _appHost = new OpcUaApplicationHost(options, _loggerFactory.CreateLogger()); + + try + { + await _appHost.StartAsync(_server, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogError(ex, + "OtOpcUaServerHostedService: SDK start failed; OpcUaPublishActor writes will continue to no-op"); + // Don't rethrow — the rest of the host (admin UI, driver actors, etc.) can still boot. + // Operators see the failure via the logs + can correct config without a process bounce + // of the whole binary. + return; + } + + if (_server.NodeManager is null) + { + _logger.LogWarning( + "OtOpcUaServerHostedService: SDK reported started but NodeManager is null; sink stays Null"); + return; + } + + _deferredSink.SetSink(new SdkAddressSpaceSink(_server.NodeManager)); + _logger.LogInformation("OtOpcUaServerHostedService: SDK started, address-space sink bound"); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + // Revert to Null sink so any in-flight writes from a poison-pilled actor don't hit a + // half-disposed NodeManager. + _deferredSink.SetSink(null); + return Task.CompletedTask; + } + + public async ValueTask DisposeAsync() + { + if (_appHost is not null) await _appHost.DisposeAsync(); + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs index a847d9e..fcaab4b 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/Program.cs @@ -7,9 +7,11 @@ using ZB.MOM.WW.OtOpcUa.AdminUI.Hubs; using ZB.MOM.WW.OtOpcUa.Cluster; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.ControlPlane; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Host; using ZB.MOM.WW.OtOpcUa.Host.Drivers; using ZB.MOM.WW.OtOpcUa.Host.Health; +using ZB.MOM.WW.OtOpcUa.Host.OpcUa; using ZB.MOM.WW.OtOpcUa.Runtime; using ZB.MOM.WW.OtOpcUa.Security; using ZB.MOM.WW.OtOpcUa.Security.Endpoints; @@ -47,6 +49,14 @@ if (hasDriver) // the F7-default NullDriverFactory with a real DriverFactoryRegistryAdapter so DriverHostActor // can materialise real IDriver instances on deploy. builder.Services.AddOtOpcUaDriverFactories(); + + // Deferred sink so Akka actors can resolve IOpcUaAddressSpaceSink at construction time — + // the OPC UA hosted service swaps in a real SdkAddressSpaceSink once StandardServer has + // started. Until then writes route through NullOpcUaAddressSpaceSink. + builder.Services.AddSingleton(); + builder.Services.AddSingleton(sp => + sp.GetRequiredService()); + builder.Services.AddHostedService(); } // Akka cluster bootstrap. Role-specific singletons are registered on the AkkaConfigurationBuilder diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs index 3fab542..5d93627 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs @@ -3,13 +3,18 @@ using Akka.Hosting; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; +using ZB.MOM.WW.OtOpcUa.OpcUaServer; using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; using ZB.MOM.WW.OtOpcUa.Runtime.Health; using ZB.MOM.WW.OtOpcUa.Runtime.Historian; +using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; namespace ZB.MOM.WW.OtOpcUa.Runtime; @@ -22,6 +27,7 @@ public static class ServiceCollectionExtensions public const string DbHealthProbeActorName = "db-health"; public const string HistorianAdapterActorName = "historian-adapter"; public const string DependencyMuxActorName = "dependency-mux"; + public const string OpcUaPublishActorName = "opcua-publish"; /// /// Registers shared runtime services. Currently binds @@ -33,6 +39,8 @@ public static class ServiceCollectionExtensions { services.TryAddSingleton(NullAlarmHistorianSink.Instance); services.TryAddSingleton(NullDriverFactory.Instance); + services.TryAddSingleton(NullOpcUaAddressSpaceSink.Instance); + services.TryAddSingleton(NullServiceLevelPublisher.Instance); return services; } @@ -54,6 +62,19 @@ public static class ServiceCollectionExtensions /// public static AkkaConfigurationBuilder WithOtOpcUaRuntimeActors(this AkkaConfigurationBuilder builder) { + // Production cluster HOCON (akka.conf) carries this dispatcher block, but consumers that + // bootstrap their own HOCON (e.g. ServiceCollectionExtensionsTests) wouldn't pick it up + // — OpcUaPublishActor.Props pins itself to opcua-synchronized-dispatcher and Akka throws + // ConfigurationException if it doesn't exist. Prepend a fallback so the runtime extension + // is self-contained. + builder.AddHocon(@" + opcua-synchronized-dispatcher { + type = ""PinnedDispatcher"" + executor = ""thread-pool-executor"" + throughput = 1 + } + ", HoconAddMode.Prepend); + builder.WithActors((system, registry, resolver) => { var dbFactory = resolver.GetService>(); @@ -61,6 +82,9 @@ public static class ServiceCollectionExtensions // Fallback to Null* if AddOtOpcUaRuntime wasn't called (e.g., test harnesses). var historianSink = resolver.GetService() ?? NullAlarmHistorianSink.Instance; var driverFactory = resolver.GetService() ?? NullDriverFactory.Instance; + var addressSpaceSink = resolver.GetService() ?? NullOpcUaAddressSpaceSink.Instance; + var serviceLevel = resolver.GetService() ?? NullServiceLevelPublisher.Instance; + var loggerFactory = resolver.GetService() ?? NullLoggerFactory.Instance; var dbHealth = system.ActorOf( DbHealthProbeActor.Props(dbFactory), @@ -72,10 +96,25 @@ public static class ServiceCollectionExtensions var mux = system.ActorOf(DependencyMuxActor.Props(), DependencyMuxActorName); registry.Register(mux); + // OPC UA publish actor — pinned dispatcher, owns the address-space side of the + // pipeline. Phase7Applier is constructed here so the actor + applier share the + // same sink reference (when DeferredAddressSpaceSink swaps later, both see it). + var applier = new Phase7Applier(addressSpaceSink, loggerFactory.CreateLogger()); + var publishActor = system.ActorOf( + OpcUaPublishActor.Props( + sink: addressSpaceSink, + serviceLevel: serviceLevel, + localNode: roleInfo.LocalNode, + dbFactory: dbFactory, + applier: applier), + OpcUaPublishActorName); + registry.Register(publishActor); + var driverHost = system.ActorOf( DriverHostActor.Props(dbFactory, roleInfo.LocalNode, coordinator: null, driverFactory: driverFactory, localRoles: roleInfo.LocalRoles, - dependencyMux: mux), + dependencyMux: mux, + opcUaPublishActor: publishActor), DriverHostActorName); registry.Register(driverHost); @@ -94,3 +133,4 @@ public sealed class DriverHostActorKey { } public sealed class DbHealthProbeActorKey { } public sealed class HistorianAdapterActorKey { } public sealed class DependencyMuxActorKey { } +public sealed class OpcUaPublishActorKey { } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/DeferredAddressSpaceSinkTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/DeferredAddressSpaceSinkTests.cs new file mode 100644 index 0000000..84b5996 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/DeferredAddressSpaceSinkTests.cs @@ -0,0 +1,77 @@ +using System.Collections.Concurrent; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests; + +public sealed class DeferredAddressSpaceSinkTests +{ + [Fact] + public void Default_inner_is_null_sink_so_calls_before_SetSink_are_safe() + { + var deferred = new DeferredAddressSpaceSink(); + + // No throw, no observable side effect. + deferred.WriteValue("x", 1, OpcUaQuality.Good, DateTime.UtcNow); + deferred.WriteAlarmState("a", true, false, DateTime.UtcNow); + deferred.RebuildAddressSpace(); + } + + [Fact] + public void Calls_after_SetSink_are_forwarded_to_the_inner() + { + var deferred = new DeferredAddressSpaceSink(); + var inner = new RecordingSink(); + deferred.SetSink(inner); + + deferred.WriteValue("x", 42, OpcUaQuality.Good, DateTime.UtcNow); + deferred.WriteAlarmState("a-1", true, false, DateTime.UtcNow); + deferred.RebuildAddressSpace(); + + inner.Calls.ShouldBe(new[] { "WV:x", "WA:a-1", "RB" }); + } + + [Fact] + public void SetSink_to_null_reverts_to_null_sink() + { + var deferred = new DeferredAddressSpaceSink(); + var inner = new RecordingSink(); + deferred.SetSink(inner); + deferred.WriteValue("x", 1, OpcUaQuality.Good, DateTime.UtcNow); + inner.Calls.Count.ShouldBe(1); + + deferred.SetSink(null); + deferred.WriteValue("y", 2, OpcUaQuality.Good, DateTime.UtcNow); // dropped + inner.Calls.Count.ShouldBe(1); + } + + [Fact] + public void SetSink_can_swap_between_implementations() + { + var deferred = new DeferredAddressSpaceSink(); + var first = new RecordingSink(); + var second = new RecordingSink(); + + deferred.SetSink(first); + deferred.WriteValue("a", 1, OpcUaQuality.Good, DateTime.UtcNow); + + deferred.SetSink(second); + deferred.WriteValue("b", 2, OpcUaQuality.Good, DateTime.UtcNow); + + first.Calls.Single().ShouldBe("WV:a"); + second.Calls.Single().ShouldBe("WV:b"); + } + + private sealed class RecordingSink : IOpcUaAddressSpaceSink + { + public ConcurrentQueue CallQueue { get; } = new(); + public List Calls => CallQueue.ToList(); + + public void WriteValue(string nodeId, object? value, OpcUaQuality quality, DateTime sourceTimestampUtc) + => CallQueue.Enqueue($"WV:{nodeId}"); + public void WriteAlarmState(string alarmNodeId, bool active, bool acknowledged, DateTime sourceTimestampUtc) + => CallQueue.Enqueue($"WA:{alarmNodeId}"); + public void RebuildAddressSpace() => CallQueue.Enqueue("RB"); + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs index 0d5d2a1..dec9c37 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ServiceCollectionExtensionsTests.cs @@ -47,13 +47,19 @@ public sealed class ServiceCollectionExtensionsTests var driverHost = host.Services.GetRequiredService>(); var dbHealth = host.Services.GetRequiredService>(); var historian = host.Services.GetRequiredService>(); + var mux = host.Services.GetRequiredService>(); + var publish = host.Services.GetRequiredService>(); driverHost.ActorRef.ShouldNotBeNull(); dbHealth.ActorRef.ShouldNotBeNull(); historian.ActorRef.ShouldNotBeNull(); + mux.ActorRef.ShouldNotBeNull(); + publish.ActorRef.ShouldNotBeNull(); driverHost.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DriverHostActorName); dbHealth.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DbHealthProbeActorName); historian.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.HistorianAdapterActorName); + mux.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.DependencyMuxActorName); + publish.ActorRef.Path.Name.ShouldBe(ServiceCollectionExtensions.OpcUaPublishActorName); } finally {