feat(opcua,host): #81 ServiceLevel SDK publisher

SdkServiceLevelPublisher writes Server.ServiceLevel through the SDK's
ServerObjectState — the standard OPC UA non-transparent-redundancy signal
clients use to pick a primary. Writes are guarded by DiagnosticsLock so
concurrent SDK diagnostics scans don't fight with our updates.

DeferredServiceLevelPublisher mirrors the DeferredAddressSpaceSink late-
binding pattern: Akka actors resolve IServiceLevelPublisher at construction,
hosted service swaps the SDK publisher in after StandardServer.Start. Host
Program.cs registers DeferredServiceLevelPublisher as the singleton bound
to IServiceLevelPublisher; OtOpcUaServerHostedService gets it injected and
fills it once IServerInternal is available.

Tests boot a real StandardServer on a free port (cross-platform), call
Publish, then verify ServerObject.ServiceLevel.Value reflects the write.
5 new tests; OpcUaServer suite now 45/45 green (was 40, +5).

Closes #81 residual. Unblocks Task 60 (OPC UA dual-endpoint + ServiceLevel
tests).
This commit is contained in:
Joseph Doherty
2026-05-26 10:37:42 -04:00
parent 52997ee164
commit 2697af31d1
6 changed files with 238 additions and 2 deletions

View File

@@ -0,0 +1,19 @@
namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
/// <summary>
/// Late-binding adapter that holds an inner <see cref="IServiceLevelPublisher"/> reference
/// swappable at runtime. Mirrors <see cref="DeferredAddressSpaceSink"/>: Akka actors resolve
/// the publisher at DI time, but the production <c>SdkServiceLevelPublisher</c> only exists
/// after <c>StandardServer.Start</c>. The Host's hosted service swaps the inner once the SDK
/// is up; until then writes route through <see cref="NullServiceLevelPublisher"/>.
/// </summary>
public sealed class DeferredServiceLevelPublisher : IServiceLevelPublisher
{
private volatile IServiceLevelPublisher _inner = NullServiceLevelPublisher.Instance;
/// <summary>Swap the underlying publisher. Pass null to revert to the Null no-op.</summary>
public void SetInner(IServiceLevelPublisher? inner) =>
_inner = inner ?? NullServiceLevelPublisher.Instance;
public void Publish(byte serviceLevel) => _inner.Publish(serviceLevel);
}

View File

@@ -22,6 +22,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
{
private readonly IConfiguration _configuration;
private readonly DeferredAddressSpaceSink _deferredSink;
private readonly DeferredServiceLevelPublisher _deferredServiceLevel;
private readonly IOpcUaUserAuthenticator _userAuthenticator;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<OtOpcUaServerHostedService> _logger;
@@ -32,11 +33,13 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
public OtOpcUaServerHostedService(
IConfiguration configuration,
DeferredAddressSpaceSink deferredSink,
DeferredServiceLevelPublisher deferredServiceLevel,
IOpcUaUserAuthenticator userAuthenticator,
ILoggerFactory loggerFactory)
{
_configuration = configuration;
_deferredSink = deferredSink;
_deferredServiceLevel = deferredServiceLevel;
_userAuthenticator = userAuthenticator;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<OtOpcUaServerHostedService>();
@@ -75,14 +78,24 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
}
_deferredSink.SetSink(new SdkAddressSpaceSink(_server.NodeManager));
_logger.LogInformation("OtOpcUaServerHostedService: SDK started, address-space sink bound");
// ServiceLevel publisher needs IServerInternal — only available after Start.
if (_server.CurrentInstance is { } serverInternal)
{
_deferredServiceLevel.SetInner(new SdkServiceLevelPublisher(
serverInternal,
_loggerFactory.CreateLogger<SdkServiceLevelPublisher>()));
}
_logger.LogInformation("OtOpcUaServerHostedService: SDK started, address-space + ServiceLevel sinks bound");
}
public Task StopAsync(CancellationToken cancellationToken)
{
// Revert to Null sink so any in-flight writes from a poison-pilled actor don't hit a
// Revert to Null adapters so any in-flight writes from a poison-pilled actor don't hit a
// half-disposed NodeManager.
_deferredSink.SetSink(null);
_deferredServiceLevel.SetInner(null);
return Task.CompletedTask;
}

View File

@@ -60,6 +60,12 @@ if (hasDriver)
builder.Services.AddSingleton<IOpcUaAddressSpaceSink>(sp =>
sp.GetRequiredService<DeferredAddressSpaceSink>());
// Same late-binding pattern for the ServiceLevel publisher — actor wants it at ctor time,
// production SdkServiceLevelPublisher needs IServerInternal which only exists after Start.
builder.Services.AddSingleton<DeferredServiceLevelPublisher>();
builder.Services.AddSingleton<IServiceLevelPublisher>(sp =>
sp.GetRequiredService<DeferredServiceLevelPublisher>());
// F13c — bind UserName tokens to the same LDAP backend the Admin cookie/JWT flows use.
// ILdapAuthService is registered by AddOtOpcUaAuth on admin nodes; on driver-only nodes
// it isn't, so we register the LDAP options + service unconditionally for driver hosts

View File

@@ -0,0 +1,56 @@
using Microsoft.Extensions.Logging;
using Opc.Ua;
using Opc.Ua.Server;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
/// <summary>
/// Production <see cref="IServiceLevelPublisher"/> that writes the OPC UA Server object's
/// <c>ServiceLevel</c> Variable through the SDK. Clients reading
/// <c>VariableIds.Server_ServiceLevel</c> see the live value updated whenever the redundancy
/// state changes — that's the standard OPC UA non-transparent-redundancy signal callers use
/// to pick a primary.
///
/// Uses <see cref="IServerInternal.ServerObject"/> (a <see cref="ServerObjectState"/>) and
/// its <see cref="ServerObjectState.ServiceLevel"/> child variable, which the SDK populates
/// automatically during <see cref="DiagnosticsNodeManager"/> initialization. Writes are
/// guarded by <see cref="IServerInternal.DiagnosticsLock"/> so concurrent diagnostics scans
/// from the SDK don't fight with our update.
/// </summary>
public sealed class SdkServiceLevelPublisher : IServiceLevelPublisher
{
private readonly IServerInternal _serverInternal;
private readonly ILogger<SdkServiceLevelPublisher> _logger;
public SdkServiceLevelPublisher(IServerInternal serverInternal, ILogger<SdkServiceLevelPublisher> logger)
{
_serverInternal = serverInternal;
_logger = logger;
}
public void Publish(byte serviceLevel)
{
var node = _serverInternal.ServerObject?.ServiceLevel;
if (node is null)
{
_logger.LogWarning("SdkServiceLevelPublisher: ServerObject.ServiceLevel unavailable; skipping write");
return;
}
try
{
lock (_serverInternal.DiagnosticsLock)
{
node.Value = serviceLevel;
node.Timestamp = DateTime.UtcNow;
node.StatusCode = StatusCodes.Good;
node.ClearChangeMasks(_serverInternal.DefaultSystemContext, includeChildren: false);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "SdkServiceLevelPublisher: write to Server.ServiceLevel threw");
}
}
}

View File

@@ -0,0 +1,48 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
public sealed class DeferredServiceLevelPublisherTests
{
[Fact]
public void Publish_before_SetInner_is_a_safe_noop()
{
var deferred = new DeferredServiceLevelPublisher();
Should.NotThrow(() => deferred.Publish(123));
}
[Fact]
public void Publish_after_SetInner_routes_to_the_inner()
{
var recording = new RecordingPublisher();
var deferred = new DeferredServiceLevelPublisher();
deferred.SetInner(recording);
deferred.Publish(200);
recording.LastValue.ShouldBe((byte)200);
}
[Fact]
public void SetInner_null_reverts_to_Null_publisher()
{
var recording = new RecordingPublisher();
var deferred = new DeferredServiceLevelPublisher();
deferred.SetInner(recording);
deferred.Publish(50);
deferred.SetInner(null);
deferred.Publish(99);
recording.LastValue.ShouldBe((byte)50, "writes after SetInner(null) must not reach the previous inner");
}
private sealed class RecordingPublisher : IServiceLevelPublisher
{
public byte? LastValue { get; private set; }
public void Publish(byte serviceLevel) => LastValue = serviceLevel;
}
}

View File

@@ -0,0 +1,94 @@
using Microsoft.Extensions.Logging.Abstractions;
using Opc.Ua.Server;
using Shouldly;
using Xunit;
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
/// <summary>
/// #81 residual — verifies <see cref="SdkServiceLevelPublisher"/> locates the standard
/// <c>VariableIds.Server_ServiceLevel</c> node through the SDK's DiagnosticsNodeManager and
/// writes the byte value. Boots a real <see cref="StandardServer"/> on a free port so the
/// SDK populates its predefined diagnostics nodes — that's what production sees.
/// </summary>
public sealed class SdkServiceLevelPublisherTests : IDisposable
{
private static CancellationToken Ct => TestContext.Current.CancellationToken;
private readonly string _pkiRoot = Path.Combine(
Path.GetTempPath(),
$"otopcua-pki-{Guid.NewGuid():N}");
[Fact]
public async Task Publish_writes_value_to_Server_ServiceLevel_variable()
{
var server = new StandardServer();
await using var host = new OpcUaApplicationHost(
new OpcUaApplicationHostOptions
{
ApplicationName = "OtOpcUa.SvcLevel",
ApplicationUri = $"urn:OtOpcUa.SvcLevel:{Guid.NewGuid():N}",
OpcUaPort = AllocateFreePort(),
PublicHostname = "localhost",
PkiStoreRoot = _pkiRoot,
},
NullLogger<OpcUaApplicationHost>.Instance);
await host.StartAsync(server, Ct);
var publisher = new SdkServiceLevelPublisher(
server.CurrentInstance,
NullLogger<SdkServiceLevelPublisher>.Instance);
publisher.Publish(200);
var variable = server.CurrentInstance.ServerObject.ServiceLevel;
variable.ShouldNotBeNull("Server.ServiceLevel must be present in the address space");
variable.Value.ShouldBe((byte)200);
}
[Fact]
public async Task Publish_is_idempotent_when_called_multiple_times()
{
var server = new StandardServer();
await using var host = new OpcUaApplicationHost(
new OpcUaApplicationHostOptions
{
ApplicationName = "OtOpcUa.SvcLevel.Idem",
ApplicationUri = $"urn:OtOpcUa.SvcLevel.Idem:{Guid.NewGuid():N}",
OpcUaPort = AllocateFreePort(),
PublicHostname = "localhost",
PkiStoreRoot = _pkiRoot,
},
NullLogger<OpcUaApplicationHost>.Instance);
await host.StartAsync(server, Ct);
var publisher = new SdkServiceLevelPublisher(
server.CurrentInstance,
NullLogger<SdkServiceLevelPublisher>.Instance);
publisher.Publish(100);
publisher.Publish(150);
publisher.Publish(240);
server.CurrentInstance.ServerObject.ServiceLevel.Value.ShouldBe((byte)240);
}
private static int AllocateFreePort()
{
using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0);
listener.Start();
var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
listener.Stop();
return port;
}
public void Dispose()
{
if (Directory.Exists(_pkiRoot))
{
try { Directory.Delete(_pkiRoot, recursive: true); }
catch { /* best-effort */ }
}
}
}