Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9d86287d08 | |||
| 2697af31d1 |
@@ -0,0 +1,19 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
|
||||
/// <summary>
|
||||
/// Late-binding adapter that holds an inner <see cref="IServiceLevelPublisher"/> reference
|
||||
/// swappable at runtime. Mirrors <see cref="DeferredAddressSpaceSink"/>: Akka actors resolve
|
||||
/// the publisher at DI time, but the production <c>SdkServiceLevelPublisher</c> only exists
|
||||
/// after <c>StandardServer.Start</c>. The Host's hosted service swaps the inner once the SDK
|
||||
/// is up; until then writes route through <see cref="NullServiceLevelPublisher"/>.
|
||||
/// </summary>
|
||||
public sealed class DeferredServiceLevelPublisher : IServiceLevelPublisher
|
||||
{
|
||||
private volatile IServiceLevelPublisher _inner = NullServiceLevelPublisher.Instance;
|
||||
|
||||
/// <summary>Swap the underlying publisher. Pass null to revert to the Null no-op.</summary>
|
||||
public void SetInner(IServiceLevelPublisher? inner) =>
|
||||
_inner = inner ?? NullServiceLevelPublisher.Instance;
|
||||
|
||||
public void Publish(byte serviceLevel) => _inner.Publish(serviceLevel);
|
||||
}
|
||||
@@ -22,6 +22,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
{
|
||||
private readonly IConfiguration _configuration;
|
||||
private readonly DeferredAddressSpaceSink _deferredSink;
|
||||
private readonly DeferredServiceLevelPublisher _deferredServiceLevel;
|
||||
private readonly IOpcUaUserAuthenticator _userAuthenticator;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly ILogger<OtOpcUaServerHostedService> _logger;
|
||||
@@ -32,11 +33,13 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
public OtOpcUaServerHostedService(
|
||||
IConfiguration configuration,
|
||||
DeferredAddressSpaceSink deferredSink,
|
||||
DeferredServiceLevelPublisher deferredServiceLevel,
|
||||
IOpcUaUserAuthenticator userAuthenticator,
|
||||
ILoggerFactory loggerFactory)
|
||||
{
|
||||
_configuration = configuration;
|
||||
_deferredSink = deferredSink;
|
||||
_deferredServiceLevel = deferredServiceLevel;
|
||||
_userAuthenticator = userAuthenticator;
|
||||
_loggerFactory = loggerFactory;
|
||||
_logger = loggerFactory.CreateLogger<OtOpcUaServerHostedService>();
|
||||
@@ -75,14 +78,24 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
}
|
||||
|
||||
_deferredSink.SetSink(new SdkAddressSpaceSink(_server.NodeManager));
|
||||
_logger.LogInformation("OtOpcUaServerHostedService: SDK started, address-space sink bound");
|
||||
|
||||
// ServiceLevel publisher needs IServerInternal — only available after Start.
|
||||
if (_server.CurrentInstance is { } serverInternal)
|
||||
{
|
||||
_deferredServiceLevel.SetInner(new SdkServiceLevelPublisher(
|
||||
serverInternal,
|
||||
_loggerFactory.CreateLogger<SdkServiceLevelPublisher>()));
|
||||
}
|
||||
|
||||
_logger.LogInformation("OtOpcUaServerHostedService: SDK started, address-space + ServiceLevel sinks bound");
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
// Revert to Null sink so any in-flight writes from a poison-pilled actor don't hit a
|
||||
// Revert to Null adapters so any in-flight writes from a poison-pilled actor don't hit a
|
||||
// half-disposed NodeManager.
|
||||
_deferredSink.SetSink(null);
|
||||
_deferredServiceLevel.SetInner(null);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
|
||||
@@ -60,6 +60,12 @@ if (hasDriver)
|
||||
builder.Services.AddSingleton<IOpcUaAddressSpaceSink>(sp =>
|
||||
sp.GetRequiredService<DeferredAddressSpaceSink>());
|
||||
|
||||
// Same late-binding pattern for the ServiceLevel publisher — actor wants it at ctor time,
|
||||
// production SdkServiceLevelPublisher needs IServerInternal which only exists after Start.
|
||||
builder.Services.AddSingleton<DeferredServiceLevelPublisher>();
|
||||
builder.Services.AddSingleton<IServiceLevelPublisher>(sp =>
|
||||
sp.GetRequiredService<DeferredServiceLevelPublisher>());
|
||||
|
||||
// F13c — bind UserName tokens to the same LDAP backend the Admin cookie/JWT flows use.
|
||||
// ILdapAuthService is registered by AddOtOpcUaAuth on admin nodes; on driver-only nodes
|
||||
// it isn't, so we register the LDAP options + service unconditionally for driver hosts
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Opc.Ua;
|
||||
using Opc.Ua.Server;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
|
||||
/// <summary>
|
||||
/// Production <see cref="IServiceLevelPublisher"/> that writes the OPC UA Server object's
|
||||
/// <c>ServiceLevel</c> Variable through the SDK. Clients reading
|
||||
/// <c>VariableIds.Server_ServiceLevel</c> see the live value updated whenever the redundancy
|
||||
/// state changes — that's the standard OPC UA non-transparent-redundancy signal callers use
|
||||
/// to pick a primary.
|
||||
///
|
||||
/// Uses <see cref="IServerInternal.ServerObject"/> (a <see cref="ServerObjectState"/>) and
|
||||
/// its <see cref="ServerObjectState.ServiceLevel"/> child variable, which the SDK populates
|
||||
/// automatically during <see cref="DiagnosticsNodeManager"/> initialization. Writes are
|
||||
/// guarded by <see cref="IServerInternal.DiagnosticsLock"/> so concurrent diagnostics scans
|
||||
/// from the SDK don't fight with our update.
|
||||
/// </summary>
|
||||
public sealed class SdkServiceLevelPublisher : IServiceLevelPublisher
|
||||
{
|
||||
private readonly IServerInternal _serverInternal;
|
||||
private readonly ILogger<SdkServiceLevelPublisher> _logger;
|
||||
|
||||
public SdkServiceLevelPublisher(IServerInternal serverInternal, ILogger<SdkServiceLevelPublisher> logger)
|
||||
{
|
||||
_serverInternal = serverInternal;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public void Publish(byte serviceLevel)
|
||||
{
|
||||
var node = _serverInternal.ServerObject?.ServiceLevel;
|
||||
if (node is null)
|
||||
{
|
||||
_logger.LogWarning("SdkServiceLevelPublisher: ServerObject.ServiceLevel unavailable; skipping write");
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
lock (_serverInternal.DiagnosticsLock)
|
||||
{
|
||||
node.Value = serviceLevel;
|
||||
node.Timestamp = DateTime.UtcNow;
|
||||
node.StatusCode = StatusCodes.Good;
|
||||
node.ClearChangeMasks(_serverInternal.DefaultSystemContext, includeChildren: false);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "SdkServiceLevelPublisher: write to Server.ServiceLevel threw");
|
||||
}
|
||||
}
|
||||
}
|
||||
+48
@@ -0,0 +1,48 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
|
||||
|
||||
public sealed class DeferredServiceLevelPublisherTests
|
||||
{
|
||||
[Fact]
|
||||
public void Publish_before_SetInner_is_a_safe_noop()
|
||||
{
|
||||
var deferred = new DeferredServiceLevelPublisher();
|
||||
|
||||
Should.NotThrow(() => deferred.Publish(123));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Publish_after_SetInner_routes_to_the_inner()
|
||||
{
|
||||
var recording = new RecordingPublisher();
|
||||
var deferred = new DeferredServiceLevelPublisher();
|
||||
deferred.SetInner(recording);
|
||||
|
||||
deferred.Publish(200);
|
||||
|
||||
recording.LastValue.ShouldBe((byte)200);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetInner_null_reverts_to_Null_publisher()
|
||||
{
|
||||
var recording = new RecordingPublisher();
|
||||
var deferred = new DeferredServiceLevelPublisher();
|
||||
deferred.SetInner(recording);
|
||||
deferred.Publish(50);
|
||||
|
||||
deferred.SetInner(null);
|
||||
deferred.Publish(99);
|
||||
|
||||
recording.LastValue.ShouldBe((byte)50, "writes after SetInner(null) must not reach the previous inner");
|
||||
}
|
||||
|
||||
private sealed class RecordingPublisher : IServiceLevelPublisher
|
||||
{
|
||||
public byte? LastValue { get; private set; }
|
||||
public void Publish(byte serviceLevel) => LastValue = serviceLevel;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Opc.Ua.Server;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// #81 residual — verifies <see cref="SdkServiceLevelPublisher"/> locates the standard
|
||||
/// <c>VariableIds.Server_ServiceLevel</c> node through the SDK's DiagnosticsNodeManager and
|
||||
/// writes the byte value. Boots a real <see cref="StandardServer"/> on a free port so the
|
||||
/// SDK populates its predefined diagnostics nodes — that's what production sees.
|
||||
/// </summary>
|
||||
public sealed class SdkServiceLevelPublisherTests : IDisposable
|
||||
{
|
||||
private static CancellationToken Ct => TestContext.Current.CancellationToken;
|
||||
|
||||
private readonly string _pkiRoot = Path.Combine(
|
||||
Path.GetTempPath(),
|
||||
$"otopcua-pki-{Guid.NewGuid():N}");
|
||||
|
||||
[Fact]
|
||||
public async Task Publish_writes_value_to_Server_ServiceLevel_variable()
|
||||
{
|
||||
var server = new StandardServer();
|
||||
await using var host = new OpcUaApplicationHost(
|
||||
new OpcUaApplicationHostOptions
|
||||
{
|
||||
ApplicationName = "OtOpcUa.SvcLevel",
|
||||
ApplicationUri = $"urn:OtOpcUa.SvcLevel:{Guid.NewGuid():N}",
|
||||
OpcUaPort = AllocateFreePort(),
|
||||
PublicHostname = "localhost",
|
||||
PkiStoreRoot = _pkiRoot,
|
||||
},
|
||||
NullLogger<OpcUaApplicationHost>.Instance);
|
||||
|
||||
await host.StartAsync(server, Ct);
|
||||
|
||||
var publisher = new SdkServiceLevelPublisher(
|
||||
server.CurrentInstance,
|
||||
NullLogger<SdkServiceLevelPublisher>.Instance);
|
||||
|
||||
publisher.Publish(200);
|
||||
|
||||
var variable = server.CurrentInstance.ServerObject.ServiceLevel;
|
||||
variable.ShouldNotBeNull("Server.ServiceLevel must be present in the address space");
|
||||
variable.Value.ShouldBe((byte)200);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Publish_is_idempotent_when_called_multiple_times()
|
||||
{
|
||||
var server = new StandardServer();
|
||||
await using var host = new OpcUaApplicationHost(
|
||||
new OpcUaApplicationHostOptions
|
||||
{
|
||||
ApplicationName = "OtOpcUa.SvcLevel.Idem",
|
||||
ApplicationUri = $"urn:OtOpcUa.SvcLevel.Idem:{Guid.NewGuid():N}",
|
||||
OpcUaPort = AllocateFreePort(),
|
||||
PublicHostname = "localhost",
|
||||
PkiStoreRoot = _pkiRoot,
|
||||
},
|
||||
NullLogger<OpcUaApplicationHost>.Instance);
|
||||
|
||||
await host.StartAsync(server, Ct);
|
||||
var publisher = new SdkServiceLevelPublisher(
|
||||
server.CurrentInstance,
|
||||
NullLogger<SdkServiceLevelPublisher>.Instance);
|
||||
|
||||
publisher.Publish(100);
|
||||
publisher.Publish(150);
|
||||
publisher.Publish(240);
|
||||
|
||||
server.CurrentInstance.ServerObject.ServiceLevel.Value.ShouldBe((byte)240);
|
||||
}
|
||||
|
||||
private static int AllocateFreePort()
|
||||
{
|
||||
using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
|
||||
listener.Stop();
|
||||
return port;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (Directory.Exists(_pkiRoot))
|
||||
{
|
||||
try { Directory.Delete(_pkiRoot, recursive: true); }
|
||||
catch { /* best-effort */ }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,147 @@
|
||||
using Akka.Actor;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Opc.Ua.Server;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.OpcUa;
|
||||
|
||||
/// <summary>
|
||||
/// Task 60 / #81 — verifies the full path from cluster redundancy state to OPC UA
|
||||
/// <c>Server.ServiceLevel</c> visible on the wire. Boots a real <see cref="StandardServer"/>,
|
||||
/// wires <see cref="SdkServiceLevelPublisher"/> into a <see cref="DeferredServiceLevelPublisher"/>
|
||||
/// (the production binding pattern), spawns <see cref="OpcUaPublishActor"/> against the
|
||||
/// deferred publisher, and sends a <see cref="RedundancyStateChanged"/> snapshot. Asserts
|
||||
/// <c>ServerObject.ServiceLevel.Value</c> reflects the role-derived byte.
|
||||
/// </summary>
|
||||
public sealed class ServiceLevelEndToEndTests : RuntimeActorTestBase
|
||||
{
|
||||
private static CancellationToken Ct => CancellationToken.None;
|
||||
|
||||
[Fact]
|
||||
public async Task Primary_leader_drives_Server_ServiceLevel_to_240()
|
||||
{
|
||||
var pkiRoot = AllocatePkiRoot();
|
||||
try
|
||||
{
|
||||
var server = new StandardServer();
|
||||
await using var host = new OpcUaApplicationHost(
|
||||
BuildOptions("PrimaryLeader", pkiRoot),
|
||||
NullLogger<OpcUaApplicationHost>.Instance);
|
||||
await host.StartAsync(server, Ct);
|
||||
|
||||
var deferred = new DeferredServiceLevelPublisher();
|
||||
deferred.SetInner(new SdkServiceLevelPublisher(
|
||||
server.CurrentInstance,
|
||||
NullLogger<SdkServiceLevelPublisher>.Instance));
|
||||
|
||||
var localNode = NodeId.Parse("node-A");
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
|
||||
serviceLevel: deferred,
|
||||
subscribeRedundancyTopic: false,
|
||||
localNode: localNode));
|
||||
|
||||
actor.Tell(new RedundancyStateChanged(
|
||||
Nodes: new[]
|
||||
{
|
||||
new NodeRedundancyState(localNode, RedundancyRole.Primary, IsClusterLeader: true, IsRoleLeaderForDriver: true, AsOfUtc: DateTime.UtcNow),
|
||||
},
|
||||
CorrelationId: CorrelationId.NewId()));
|
||||
|
||||
AwaitAssertion(() =>
|
||||
server.CurrentInstance.ServerObject.ServiceLevel.Value.ShouldBe((byte)240));
|
||||
}
|
||||
finally
|
||||
{
|
||||
DeletePkiRoot(pkiRoot);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Secondary_drives_Server_ServiceLevel_to_100()
|
||||
{
|
||||
var pkiRoot = AllocatePkiRoot();
|
||||
try
|
||||
{
|
||||
var server = new StandardServer();
|
||||
await using var host = new OpcUaApplicationHost(
|
||||
BuildOptions("Secondary", pkiRoot),
|
||||
NullLogger<OpcUaApplicationHost>.Instance);
|
||||
await host.StartAsync(server, Ct);
|
||||
|
||||
var deferred = new DeferredServiceLevelPublisher();
|
||||
deferred.SetInner(new SdkServiceLevelPublisher(
|
||||
server.CurrentInstance,
|
||||
NullLogger<SdkServiceLevelPublisher>.Instance));
|
||||
|
||||
var localNode = NodeId.Parse("node-B");
|
||||
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
|
||||
serviceLevel: deferred,
|
||||
subscribeRedundancyTopic: false,
|
||||
localNode: localNode));
|
||||
|
||||
actor.Tell(new RedundancyStateChanged(
|
||||
Nodes: new[]
|
||||
{
|
||||
new NodeRedundancyState(localNode, RedundancyRole.Secondary, IsClusterLeader: false, IsRoleLeaderForDriver: false, AsOfUtc: DateTime.UtcNow),
|
||||
},
|
||||
CorrelationId: CorrelationId.NewId()));
|
||||
|
||||
AwaitAssertion(() =>
|
||||
server.CurrentInstance.ServerObject.ServiceLevel.Value.ShouldBe((byte)100));
|
||||
}
|
||||
finally
|
||||
{
|
||||
DeletePkiRoot(pkiRoot);
|
||||
}
|
||||
}
|
||||
|
||||
private static OpcUaApplicationHostOptions BuildOptions(string name, string pkiRoot) =>
|
||||
new()
|
||||
{
|
||||
ApplicationName = $"OtOpcUa.E2E.{name}",
|
||||
ApplicationUri = $"urn:OtOpcUa.E2E.{name}:{Guid.NewGuid():N}",
|
||||
OpcUaPort = AllocateFreePort(),
|
||||
PublicHostname = "localhost",
|
||||
PkiStoreRoot = pkiRoot,
|
||||
};
|
||||
|
||||
private static string AllocatePkiRoot() =>
|
||||
Path.Combine(Path.GetTempPath(), $"otopcua-pki-{Guid.NewGuid():N}");
|
||||
|
||||
private static void DeletePkiRoot(string root)
|
||||
{
|
||||
if (Directory.Exists(root))
|
||||
{
|
||||
try { Directory.Delete(root, recursive: true); }
|
||||
catch { /* best-effort */ }
|
||||
}
|
||||
}
|
||||
|
||||
private static int AllocateFreePort()
|
||||
{
|
||||
using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0);
|
||||
listener.Start();
|
||||
var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
|
||||
listener.Stop();
|
||||
return port;
|
||||
}
|
||||
|
||||
private void AwaitAssertion(Action assertion)
|
||||
{
|
||||
var deadline = DateTime.UtcNow.AddSeconds(3);
|
||||
Exception? last = null;
|
||||
while (DateTime.UtcNow < deadline)
|
||||
{
|
||||
try { assertion(); return; }
|
||||
catch (Exception ex) { last = ex; Thread.Sleep(30); }
|
||||
}
|
||||
if (last is not null) throw last;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user