using Akka.Actor;
using Microsoft.Extensions.Logging.Abstractions;
using Opc.Ua.Server;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.OpcUa;
///
/// Task 60 / #81 — verifies the full path from cluster redundancy state to OPC UA
/// Server.ServiceLevel visible on the wire. Boots a real ,
/// wires into a
/// (the production binding pattern), spawns against the
/// deferred publisher, and sends a snapshot. Asserts
/// ServerObject.ServiceLevel.Value reflects the role-derived byte.
///
public sealed class ServiceLevelEndToEndTests : RuntimeActorTestBase
{
private static CancellationToken Ct => CancellationToken.None;
[Fact]
public async Task Primary_leader_drives_Server_ServiceLevel_to_240()
{
var pkiRoot = AllocatePkiRoot();
try
{
var server = new StandardServer();
await using var host = new OpcUaApplicationHost(
BuildOptions("PrimaryLeader", pkiRoot),
NullLogger.Instance);
await host.StartAsync(server, Ct);
var deferred = new DeferredServiceLevelPublisher();
deferred.SetInner(new SdkServiceLevelPublisher(
server.CurrentInstance,
NullLogger.Instance));
var localNode = NodeId.Parse("node-A");
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
serviceLevel: deferred,
subscribeRedundancyTopic: false,
localNode: localNode));
actor.Tell(new RedundancyStateChanged(
Nodes: new[]
{
new NodeRedundancyState(localNode, RedundancyRole.Primary, IsClusterLeader: true, IsRoleLeaderForDriver: true, AsOfUtc: DateTime.UtcNow),
},
CorrelationId: CorrelationId.NewId()));
AwaitAssertion(() =>
server.CurrentInstance.ServerObject.ServiceLevel.Value.ShouldBe((byte)240));
}
finally
{
DeletePkiRoot(pkiRoot);
}
}
[Fact]
public async Task Secondary_drives_Server_ServiceLevel_to_100()
{
var pkiRoot = AllocatePkiRoot();
try
{
var server = new StandardServer();
await using var host = new OpcUaApplicationHost(
BuildOptions("Secondary", pkiRoot),
NullLogger.Instance);
await host.StartAsync(server, Ct);
var deferred = new DeferredServiceLevelPublisher();
deferred.SetInner(new SdkServiceLevelPublisher(
server.CurrentInstance,
NullLogger.Instance));
var localNode = NodeId.Parse("node-B");
var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests(
serviceLevel: deferred,
subscribeRedundancyTopic: false,
localNode: localNode));
actor.Tell(new RedundancyStateChanged(
Nodes: new[]
{
new NodeRedundancyState(localNode, RedundancyRole.Secondary, IsClusterLeader: false, IsRoleLeaderForDriver: false, AsOfUtc: DateTime.UtcNow),
},
CorrelationId: CorrelationId.NewId()));
AwaitAssertion(() =>
server.CurrentInstance.ServerObject.ServiceLevel.Value.ShouldBe((byte)100));
}
finally
{
DeletePkiRoot(pkiRoot);
}
}
private static OpcUaApplicationHostOptions BuildOptions(string name, string pkiRoot) =>
new()
{
ApplicationName = $"OtOpcUa.E2E.{name}",
ApplicationUri = $"urn:OtOpcUa.E2E.{name}:{Guid.NewGuid():N}",
OpcUaPort = AllocateFreePort(),
PublicHostname = "localhost",
PkiStoreRoot = pkiRoot,
};
private static string AllocatePkiRoot() =>
Path.Combine(Path.GetTempPath(), $"otopcua-pki-{Guid.NewGuid():N}");
private static void DeletePkiRoot(string root)
{
if (Directory.Exists(root))
{
try { Directory.Delete(root, recursive: true); }
catch { /* best-effort */ }
}
}
private static int AllocateFreePort()
{
using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0);
listener.Start();
var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
listener.Stop();
return port;
}
private void AwaitAssertion(Action assertion)
{
var deadline = DateTime.UtcNow.AddSeconds(3);
Exception? last = null;
while (DateTime.UtcNow < deadline)
{
try { assertion(); return; }
catch (Exception ex) { last = ex; Thread.Sleep(30); }
}
if (last is not null) throw last;
}
}