feat(runtime): PeerOpcUaProbeActor real TCP-connect probe (F12)

Replaces the Ok=true stub with a TCP connect to the peer's OPC UA port (4840
default) with a 2s timeout. A successful connect indicates the OPC UA server
process is up + accepting connections — enough for the redundancy calculator
to treat the peer as live. A full secure-channel Hello/Acknowledge handshake
is overkill for what the redundancy calc consumes and would pull in the OPC
UA Client SDK + a PKI setup. Upgrade later if a deeper liveness signal is ever
required.

Probe extracts the host from NodeId by stripping the :port suffix (commit
5cfbe8b encoded host:port into NodeId for cluster-member identity).

Tests: 2 new tests — Ok=true against a live TcpListener on a chosen port,
Ok=false against an unreachable endpoint. All 17 Runtime tests pass (was 16
covering only the message-contract surface).
This commit is contained in:
Joseph Doherty
2026-05-26 06:54:51 -04:00
parent f57f61deac
commit b06e3ae740
3 changed files with 96 additions and 16 deletions

View File

@@ -77,8 +77,8 @@
{"id": 65, "subject": "Task 65: New v2 docs (Architecture-v2, Cluster, ControlPlane, Runtime)", "status": "completed", "classification": "standard", "estMinutes": 5, "parallelizableWith": [62,63,64], "blockedBy": [57], "commit": "1689901"}, {"id": 65, "subject": "Task 65: New v2 docs (Architecture-v2, Cluster, ControlPlane, Runtime)", "status": "completed", "classification": "standard", "estMinutes": 5, "parallelizableWith": [62,63,64], "blockedBy": [57], "commit": "1689901"},
{"id": "F1", "subject": "Follow-up: AuthEndpoints integration tests against fused Host", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": ["F2"], "blockedBy": [53], "commit": "463512d", "origin": "Deviation from Task 29 (commit 38ea0c5) — deferred until Task 53 wires AddOtOpcUaAuth/MapOtOpcUaAuth in Program. Add WebApplicationFactory<OtOpcUa.Host.Program> tests for /auth/login (204/401/503), /auth/ping (401/200), /auth/token (200+JWT), /auth/logout (204+cookie clear) using a stub ILdapAuthService.", "deviation": "Used HostBuilder + TestServer directly (Security.Tests/AuthEndpointsIntegrationTests) instead of WebApplicationFactory<Program> — Host needs Akka cluster bootstrap that's out of scope for this contract test. Cluster-mode auth coverage belongs in Task 58."}, {"id": "F1", "subject": "Follow-up: AuthEndpoints integration tests against fused Host", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": ["F2"], "blockedBy": [53], "commit": "463512d", "origin": "Deviation from Task 29 (commit 38ea0c5) — deferred until Task 53 wires AddOtOpcUaAuth/MapOtOpcUaAuth in Program. Add WebApplicationFactory<OtOpcUa.Host.Program> tests for /auth/login (204/401/503), /auth/ping (401/200), /auth/token (200+JWT), /auth/logout (204+cookie clear) using a stub ILdapAuthService.", "deviation": "Used HostBuilder + TestServer directly (Security.Tests/AuthEndpointsIntegrationTests) instead of WebApplicationFactory<Program> — Host needs Akka cluster bootstrap that's out of scope for this contract test. Cluster-mode auth coverage belongs in Task 58."},
{"id": "F2", "subject": "Follow-up: Replace JwtBearer BuildServiceProvider antipattern with IPostConfigureOptions", "status": "completed", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F1"], "blockedBy": [], "commit": "45a8c79", "origin": "Deviation from Task 26 (commit 207fc6a) — AddOtOpcUaAuth uses services.BuildServiceProvider().CreateScope() inside .AddJwtBearer lambda (ASP0000). Refactor to IPostConfigureOptions<JwtBearerOptions> so validation parameters resolve lazily from the real request provider."}, {"id": "F2", "subject": "Follow-up: Replace JwtBearer BuildServiceProvider antipattern with IPostConfigureOptions", "status": "completed", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F1"], "blockedBy": [], "commit": "45a8c79", "origin": "Deviation from Task 26 (commit 207fc6a) — AddOtOpcUaAuth uses services.BuildServiceProvider().CreateScope() inside .AddJwtBearer lambda (ASP0000). Refactor to IPostConfigureOptions<JwtBearerOptions> so validation parameters resolve lazily from the real request provider."},
{"id": "F3", "subject": "Follow-up: Add EventId unique column to ConfigAuditLog for cross-restart audit idempotency", "status": "pending", "classification": "small", "estMinutes": 15, "parallelizableWith": ["F4"], "blockedBy": [], "origin": "Deviation from Task 33 — AuditWriterActor only dedups in-buffer; ConfigAuditLog lacks EventId column so a duplicate AuditEvent that arrives after a flush becomes a duplicate row. Add nullable EventId Guid + filtered unique index, migration, and refactor AuditWriterActor.WrapDetails away."}, {"id": "F3", "subject": "Follow-up: Add EventId unique column to ConfigAuditLog for cross-restart audit idempotency", "status": "completed", "classification": "small", "estMinutes": 15, "parallelizableWith": ["F4"], "blockedBy": [], "commit": "f57f61d", "origin": "Deviation from Task 33 — AuditWriterActor only dedups in-buffer; ConfigAuditLog lacks EventId column so a duplicate AuditEvent that arrives after a flush becomes a duplicate row. Add nullable EventId Guid + filtered unique index, migration, and refactor AuditWriterActor.WrapDetails away."},
{"id": "F4", "subject": "Follow-up: Harden AuditWriterActor.WrapDetails JSON synthesis with System.Text.Json", "status": "pending", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F3"], "blockedBy": [], "origin": "Self-review of Task 33 — WrapDetails uses string concat; malformed caller DetailsJson would produce invalid JSON and trip the CK_ConfigAuditLog_DetailsJson_IsJson constraint, killing the entire flush batch. Discard this task if F3 lands first (F3 removes WrapDetails entirely)."}, {"id": "F4", "subject": "Follow-up: Harden AuditWriterActor.WrapDetails JSON synthesis with System.Text.Json", "status": "completed", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F3"], "blockedBy": [], "commit": "f57f61d", "deviation": "Moot — F3 deleted WrapDetails entirely (EventId/CorrelationId now live in dedicated columns).", "origin": "Self-review of Task 33 — WrapDetails uses string concat; malformed caller DetailsJson would produce invalid JSON and trip the CK_ConfigAuditLog_DetailsJson_IsJson constraint, killing the entire flush batch. Discard this task if F3 lands first (F3 removes WrapDetails entirely)."},
{"id": "F5", "subject": "Follow-up: ConfigPublishCoordinator multi-node happy-path test", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 30 — single-ActorSystem TestKit can't simulate the plan's 'dispatch to N driver nodes, all ack, seals' happy path because DiscoverDriverNodes() needs real cluster membership. Add a multi-system test (two ActorSystems joined into one cluster, driver-role on the second)."}, {"id": "F5", "subject": "Follow-up: ConfigPublishCoordinator multi-node happy-path test", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 30 — single-ActorSystem TestKit can't simulate the plan's 'dispatch to N driver nodes, all ack, seals' happy path because DiscoverDriverNodes() needs real cluster membership. Add a multi-system test (two ActorSystems joined into one cluster, driver-role on the second)."},
{"id": "F6", "subject": "Follow-up: RedundancyStateActor publisher abstraction so tests don't need DPS bootstrap", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": [], "blockedBy": [], "commit": "dfc143c", "origin": "Self-review of Task 35 — RedundancyStateActorTests are skipped because single-node DistributedPubSub bootstrap is unreliable in TestKit. Inject an Action<object> broadcast so tests can replace it with a probe; un-skip both tests."}, {"id": "F6", "subject": "Follow-up: RedundancyStateActor publisher abstraction so tests don't need DPS bootstrap", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": [], "blockedBy": [], "commit": "dfc143c", "origin": "Self-review of Task 35 — RedundancyStateActorTests are skipped because single-node DistributedPubSub bootstrap is unreliable in TestKit. Inject an Action<object> broadcast so tests can replace it with a probe; un-skip both tests."},
{"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "pending", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands."}, {"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "pending", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands."},

View File

@@ -1,3 +1,4 @@
using System.Net.Sockets;
using Akka.Actor; using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe; using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event; using Akka.Event;
@@ -8,7 +9,13 @@ namespace ZB.MOM.WW.OtOpcUa.Runtime.Health;
/// <summary> /// <summary>
/// Periodically pings a peer node's OPC UA endpoint (<c>opc.tcp://peer:4840</c>) and publishes /// Periodically pings a peer node's OPC UA endpoint (<c>opc.tcp://peer:4840</c>) and publishes
/// the result on the cluster's redundancy-state input topic so the admin <c>RedundancyStateActor</c> /// the result on the cluster's redundancy-state input topic so the admin <c>RedundancyStateActor</c>
/// can react. Real OPC UA probe call is staged for follow-up F12. /// can react.
///
/// The probe is a plain TCP connect to the OPC UA port with a short timeout — enough to detect
/// "is the OPC UA server process up and accepting connections?" A full secure-channel handshake
/// (Hello / Acknowledge) needs the OPC UA Client SDK and a session/PKI setup, which is more than
/// what the redundancy calculator needs. Upgrade to a real Hello probe if a deeper liveness signal
/// is ever required.
/// </summary> /// </summary>
public sealed class PeerOpcUaProbeActor : ReceiveActor, IWithTimers public sealed class PeerOpcUaProbeActor : ReceiveActor, IWithTimers
{ {
@@ -17,38 +24,87 @@ public sealed class PeerOpcUaProbeActor : ReceiveActor, IWithTimers
public const string RedundancyStateTopic = "redundancy-state"; public const string RedundancyStateTopic = "redundancy-state";
public static readonly TimeSpan DefaultProbeInterval = TimeSpan.FromSeconds(10); public static readonly TimeSpan DefaultProbeInterval = TimeSpan.FromSeconds(10);
public static readonly TimeSpan DefaultConnectTimeout = TimeSpan.FromSeconds(2);
public const int DefaultOpcUaPort = 4840;
public sealed record OpcUaProbeResult(NodeId NodeId, bool Ok); public sealed record OpcUaProbeResult(NodeId NodeId, bool Ok);
public sealed class Tick { public static readonly Tick Instance = new(); private Tick() { } } public sealed class Tick { public static readonly Tick Instance = new(); private Tick() { } }
private readonly NodeId _peer; private readonly NodeId _peer;
private readonly TimeSpan _interval; private readonly TimeSpan _interval;
private readonly TimeSpan _connectTimeout;
private readonly int _opcUaPort;
private readonly Action<object>? _broadcastOverride; private readonly Action<object>? _broadcastOverride;
private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly ILoggingAdapter _log = Context.GetLogger();
public ITimerScheduler Timers { get; set; } = null!; public ITimerScheduler Timers { get; set; } = null!;
public static Props Props(NodeId peer, TimeSpan? interval = null, Action<object>? broadcast = null) => public static Props Props(
Akka.Actor.Props.Create(() => new PeerOpcUaProbeActor(peer, interval ?? DefaultProbeInterval, broadcast)); NodeId peer,
TimeSpan? interval = null,
TimeSpan? connectTimeout = null,
int opcUaPort = DefaultOpcUaPort,
Action<object>? broadcast = null) =>
Akka.Actor.Props.Create(() => new PeerOpcUaProbeActor(
peer,
interval ?? DefaultProbeInterval,
connectTimeout ?? DefaultConnectTimeout,
opcUaPort,
broadcast));
public PeerOpcUaProbeActor(NodeId peer, TimeSpan interval, Action<object>? broadcastOverride) public PeerOpcUaProbeActor(
NodeId peer,
TimeSpan interval,
TimeSpan connectTimeout,
int opcUaPort,
Action<object>? broadcastOverride)
{ {
_peer = peer; _peer = peer;
_interval = interval; _interval = interval;
_connectTimeout = connectTimeout;
_opcUaPort = opcUaPort;
_broadcastOverride = broadcastOverride; _broadcastOverride = broadcastOverride;
Receive<Tick>(_ => RunProbe()); ReceiveAsync<Tick>(_ => RunProbeAsync());
} }
protected override void PreStart() => protected override void PreStart() =>
Timers.StartPeriodicTimer("probe", Tick.Instance, _interval); Timers.StartPeriodicTimer("probe", Tick.Instance, _interval);
private void RunProbe() private async Task RunProbeAsync()
{ {
// F12: actual opc.tcp ping. Assume Ok=true until the probe is wired. var host = ExtractHost(_peer);
var msg = new OpcUaProbeResult(_peer, Ok: true); var ok = await TryTcpConnectAsync(host, _opcUaPort, _connectTimeout);
var msg = new OpcUaProbeResult(_peer, ok);
if (_broadcastOverride is not null) _broadcastOverride(msg); if (_broadcastOverride is not null) _broadcastOverride(msg);
else DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(RedundancyStateTopic, msg)); else DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(RedundancyStateTopic, msg));
_log.Debug("PeerOpcUaProbe: pinged {Peer} (probe staged for F12)", _peer); _log.Debug("PeerOpcUaProbe: pinged {Peer} ({Host}:{Port}) → ok={Ok}",
_peer, host, _opcUaPort, ok);
}
/// <summary>
/// <see cref="NodeId"/> values are derived as <c>host:port</c> (see <c>ClusterRoleInfo</c>).
/// Strip the port suffix to get the hostname for the TCP connect.
/// </summary>
private static string ExtractHost(NodeId nodeId)
{
var s = nodeId.Value;
var colon = s.LastIndexOf(':');
return colon > 0 ? s[..colon] : s;
}
private static async Task<bool> TryTcpConnectAsync(string host, int port, TimeSpan timeout)
{
try
{
using var client = new TcpClient();
using var cts = new CancellationTokenSource(timeout);
await client.ConnectAsync(host, port, cts.Token);
return client.Connected;
}
catch (Exception)
{
return false;
}
} }
} }

View File

@@ -1,3 +1,5 @@
using System.Net;
using System.Net.Sockets;
using Akka.Actor; using Akka.Actor;
using Shouldly; using Shouldly;
using Xunit; using Xunit;
@@ -24,16 +26,38 @@ public sealed class HealthProbeActorTests : RuntimeActorTestBase
} }
[Fact] [Fact]
public void PeerOpcUaProbeActor_publishes_probe_result_at_each_tick() public void PeerOpcUaProbeActor_reports_Ok_true_against_a_live_listener()
{ {
using var listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
var port = ((IPEndPoint)listener.LocalEndpoint).Port;
var received = new System.Collections.Generic.List<object>(); var received = new System.Collections.Generic.List<object>();
var actor = Sys.ActorOf(PeerOpcUaProbeActor.Props( Sys.ActorOf(PeerOpcUaProbeActor.Props(
NodeId.Parse("peer-1"), NodeId.Parse($"127.0.0.1:{port}"),
interval: TimeSpan.FromMilliseconds(50), interval: TimeSpan.FromMilliseconds(50),
connectTimeout: TimeSpan.FromMilliseconds(500),
opcUaPort: port,
broadcast: msg => received.Add(msg))); broadcast: msg => received.Add(msg)));
AwaitCondition(() => received.Count >= 2, TimeSpan.FromSeconds(2)); AwaitCondition(() => received.OfType<PeerOpcUaProbeActor.OpcUaProbeResult>().Any(r => r.Ok),
received.OfType<PeerOpcUaProbeActor.OpcUaProbeResult>().ShouldNotBeEmpty(); TimeSpan.FromSeconds(3));
}
[Fact]
public void PeerOpcUaProbeActor_reports_Ok_false_against_an_unreachable_endpoint()
{
// Port 1 is reserved (tcpmux) and almost never bound on dev machines, so the connect fails fast.
var received = new System.Collections.Generic.List<object>();
Sys.ActorOf(PeerOpcUaProbeActor.Props(
NodeId.Parse("127.0.0.1:1"),
interval: TimeSpan.FromMilliseconds(50),
connectTimeout: TimeSpan.FromMilliseconds(300),
opcUaPort: 1,
broadcast: msg => received.Add(msg)));
AwaitCondition(() => received.OfType<PeerOpcUaProbeActor.OpcUaProbeResult>().Any(r => !r.Ok),
TimeSpan.FromSeconds(3));
} }
[Fact] [Fact]