diff --git a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json index 55c7ebf..982c8cd 100644 --- a/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json +++ b/docs/plans/2026-05-26-akka-hosting-alignment-plan.md.tasks.json @@ -77,8 +77,8 @@ {"id": 65, "subject": "Task 65: New v2 docs (Architecture-v2, Cluster, ControlPlane, Runtime)", "status": "completed", "classification": "standard", "estMinutes": 5, "parallelizableWith": [62,63,64], "blockedBy": [57], "commit": "1689901"}, {"id": "F1", "subject": "Follow-up: AuthEndpoints integration tests against fused Host", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": ["F2"], "blockedBy": [53], "commit": "463512d", "origin": "Deviation from Task 29 (commit 38ea0c5) — deferred until Task 53 wires AddOtOpcUaAuth/MapOtOpcUaAuth in Program. Add WebApplicationFactory tests for /auth/login (204/401/503), /auth/ping (401/200), /auth/token (200+JWT), /auth/logout (204+cookie clear) using a stub ILdapAuthService.", "deviation": "Used HostBuilder + TestServer directly (Security.Tests/AuthEndpointsIntegrationTests) instead of WebApplicationFactory — Host needs Akka cluster bootstrap that's out of scope for this contract test. Cluster-mode auth coverage belongs in Task 58."}, {"id": "F2", "subject": "Follow-up: Replace JwtBearer BuildServiceProvider antipattern with IPostConfigureOptions", "status": "completed", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F1"], "blockedBy": [], "commit": "45a8c79", "origin": "Deviation from Task 26 (commit 207fc6a) — AddOtOpcUaAuth uses services.BuildServiceProvider().CreateScope() inside .AddJwtBearer lambda (ASP0000). Refactor to IPostConfigureOptions so validation parameters resolve lazily from the real request provider."}, - {"id": "F3", "subject": "Follow-up: Add EventId unique column to ConfigAuditLog for cross-restart audit idempotency", "status": "pending", "classification": "small", "estMinutes": 15, "parallelizableWith": ["F4"], "blockedBy": [], "origin": "Deviation from Task 33 — AuditWriterActor only dedups in-buffer; ConfigAuditLog lacks EventId column so a duplicate AuditEvent that arrives after a flush becomes a duplicate row. Add nullable EventId Guid + filtered unique index, migration, and refactor AuditWriterActor.WrapDetails away."}, - {"id": "F4", "subject": "Follow-up: Harden AuditWriterActor.WrapDetails JSON synthesis with System.Text.Json", "status": "pending", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F3"], "blockedBy": [], "origin": "Self-review of Task 33 — WrapDetails uses string concat; malformed caller DetailsJson would produce invalid JSON and trip the CK_ConfigAuditLog_DetailsJson_IsJson constraint, killing the entire flush batch. Discard this task if F3 lands first (F3 removes WrapDetails entirely)."}, + {"id": "F3", "subject": "Follow-up: Add EventId unique column to ConfigAuditLog for cross-restart audit idempotency", "status": "completed", "classification": "small", "estMinutes": 15, "parallelizableWith": ["F4"], "blockedBy": [], "commit": "f57f61d", "origin": "Deviation from Task 33 — AuditWriterActor only dedups in-buffer; ConfigAuditLog lacks EventId column so a duplicate AuditEvent that arrives after a flush becomes a duplicate row. Add nullable EventId Guid + filtered unique index, migration, and refactor AuditWriterActor.WrapDetails away."}, + {"id": "F4", "subject": "Follow-up: Harden AuditWriterActor.WrapDetails JSON synthesis with System.Text.Json", "status": "completed", "classification": "small", "estMinutes": 5, "parallelizableWith": ["F3"], "blockedBy": [], "commit": "f57f61d", "deviation": "Moot — F3 deleted WrapDetails entirely (EventId/CorrelationId now live in dedicated columns).", "origin": "Self-review of Task 33 — WrapDetails uses string concat; malformed caller DetailsJson would produce invalid JSON and trip the CK_ConfigAuditLog_DetailsJson_IsJson constraint, killing the entire flush batch. Discard this task if F3 lands first (F3 removes WrapDetails entirely)."}, {"id": "F5", "subject": "Follow-up: ConfigPublishCoordinator multi-node happy-path test", "status": "pending", "classification": "standard", "estMinutes": 30, "parallelizableWith": [], "blockedBy": [], "origin": "Self-review of Task 30 — single-ActorSystem TestKit can't simulate the plan's 'dispatch to N driver nodes, all ack, seals' happy path because DiscoverDriverNodes() needs real cluster membership. Add a multi-system test (two ActorSystems joined into one cluster, driver-role on the second)."}, {"id": "F6", "subject": "Follow-up: RedundancyStateActor publisher abstraction so tests don't need DPS bootstrap", "status": "completed", "classification": "small", "estMinutes": 10, "parallelizableWith": [], "blockedBy": [], "commit": "dfc143c", "origin": "Self-review of Task 35 — RedundancyStateActorTests are skipped because single-node DistributedPubSub bootstrap is unreliable in TestKit. Inject an Action broadcast so tests can replace it with a probe; un-skip both tests."}, {"id": "F7", "subject": "Follow-up: DriverInstanceActor full engine wiring (subscriptions, writes, ApplyDelta diff)", "status": "pending", "classification": "standard", "estMinutes": 45, "parallelizableWith": [], "blockedBy": [44], "origin": "Self-review of Task 41 — subscription publishing, ApplyDelta diffing, bad-quality-on-disconnect, write path, and supervisor backoff are stubbed. Wire after OpcUaPublishActor lands."}, diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerOpcUaProbeActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerOpcUaProbeActor.cs index 7e8c157..ba4258a 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerOpcUaProbeActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Health/PeerOpcUaProbeActor.cs @@ -1,3 +1,4 @@ +using System.Net.Sockets; using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; @@ -8,7 +9,13 @@ namespace ZB.MOM.WW.OtOpcUa.Runtime.Health; /// /// Periodically pings a peer node's OPC UA endpoint (opc.tcp://peer:4840) and publishes /// the result on the cluster's redundancy-state input topic so the admin RedundancyStateActor -/// can react. Real OPC UA probe call is staged for follow-up F12. +/// can react. +/// +/// The probe is a plain TCP connect to the OPC UA port with a short timeout — enough to detect +/// "is the OPC UA server process up and accepting connections?" A full secure-channel handshake +/// (Hello / Acknowledge) needs the OPC UA Client SDK and a session/PKI setup, which is more than +/// what the redundancy calculator needs. Upgrade to a real Hello probe if a deeper liveness signal +/// is ever required. /// public sealed class PeerOpcUaProbeActor : ReceiveActor, IWithTimers { @@ -17,38 +24,87 @@ public sealed class PeerOpcUaProbeActor : ReceiveActor, IWithTimers public const string RedundancyStateTopic = "redundancy-state"; public static readonly TimeSpan DefaultProbeInterval = TimeSpan.FromSeconds(10); + public static readonly TimeSpan DefaultConnectTimeout = TimeSpan.FromSeconds(2); + public const int DefaultOpcUaPort = 4840; public sealed record OpcUaProbeResult(NodeId NodeId, bool Ok); public sealed class Tick { public static readonly Tick Instance = new(); private Tick() { } } private readonly NodeId _peer; private readonly TimeSpan _interval; + private readonly TimeSpan _connectTimeout; + private readonly int _opcUaPort; private readonly Action? _broadcastOverride; private readonly ILoggingAdapter _log = Context.GetLogger(); public ITimerScheduler Timers { get; set; } = null!; - public static Props Props(NodeId peer, TimeSpan? interval = null, Action? broadcast = null) => - Akka.Actor.Props.Create(() => new PeerOpcUaProbeActor(peer, interval ?? DefaultProbeInterval, broadcast)); + public static Props Props( + NodeId peer, + TimeSpan? interval = null, + TimeSpan? connectTimeout = null, + int opcUaPort = DefaultOpcUaPort, + Action? broadcast = null) => + Akka.Actor.Props.Create(() => new PeerOpcUaProbeActor( + peer, + interval ?? DefaultProbeInterval, + connectTimeout ?? DefaultConnectTimeout, + opcUaPort, + broadcast)); - public PeerOpcUaProbeActor(NodeId peer, TimeSpan interval, Action? broadcastOverride) + public PeerOpcUaProbeActor( + NodeId peer, + TimeSpan interval, + TimeSpan connectTimeout, + int opcUaPort, + Action? broadcastOverride) { _peer = peer; _interval = interval; + _connectTimeout = connectTimeout; + _opcUaPort = opcUaPort; _broadcastOverride = broadcastOverride; - Receive(_ => RunProbe()); + ReceiveAsync(_ => RunProbeAsync()); } protected override void PreStart() => Timers.StartPeriodicTimer("probe", Tick.Instance, _interval); - private void RunProbe() + private async Task RunProbeAsync() { - // F12: actual opc.tcp ping. Assume Ok=true until the probe is wired. - var msg = new OpcUaProbeResult(_peer, Ok: true); + var host = ExtractHost(_peer); + var ok = await TryTcpConnectAsync(host, _opcUaPort, _connectTimeout); + var msg = new OpcUaProbeResult(_peer, ok); if (_broadcastOverride is not null) _broadcastOverride(msg); else DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(RedundancyStateTopic, msg)); - _log.Debug("PeerOpcUaProbe: pinged {Peer} (probe staged for F12)", _peer); + _log.Debug("PeerOpcUaProbe: pinged {Peer} ({Host}:{Port}) → ok={Ok}", + _peer, host, _opcUaPort, ok); + } + + /// + /// values are derived as host:port (see ClusterRoleInfo). + /// Strip the port suffix to get the hostname for the TCP connect. + /// + private static string ExtractHost(NodeId nodeId) + { + var s = nodeId.Value; + var colon = s.LastIndexOf(':'); + return colon > 0 ? s[..colon] : s; + } + + private static async Task TryTcpConnectAsync(string host, int port, TimeSpan timeout) + { + try + { + using var client = new TcpClient(); + using var cts = new CancellationTokenSource(timeout); + await client.ConnectAsync(host, port, cts.Token); + return client.Connected; + } + catch (Exception) + { + return false; + } } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs index fc7c910..5b2e618 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Health/HealthProbeActorTests.cs @@ -1,3 +1,5 @@ +using System.Net; +using System.Net.Sockets; using Akka.Actor; using Shouldly; using Xunit; @@ -24,16 +26,38 @@ public sealed class HealthProbeActorTests : RuntimeActorTestBase } [Fact] - public void PeerOpcUaProbeActor_publishes_probe_result_at_each_tick() + public void PeerOpcUaProbeActor_reports_Ok_true_against_a_live_listener() { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + var received = new System.Collections.Generic.List(); - var actor = Sys.ActorOf(PeerOpcUaProbeActor.Props( - NodeId.Parse("peer-1"), + Sys.ActorOf(PeerOpcUaProbeActor.Props( + NodeId.Parse($"127.0.0.1:{port}"), interval: TimeSpan.FromMilliseconds(50), + connectTimeout: TimeSpan.FromMilliseconds(500), + opcUaPort: port, broadcast: msg => received.Add(msg))); - AwaitCondition(() => received.Count >= 2, TimeSpan.FromSeconds(2)); - received.OfType().ShouldNotBeEmpty(); + AwaitCondition(() => received.OfType().Any(r => r.Ok), + TimeSpan.FromSeconds(3)); + } + + [Fact] + public void PeerOpcUaProbeActor_reports_Ok_false_against_an_unreachable_endpoint() + { + // Port 1 is reserved (tcpmux) and almost never bound on dev machines, so the connect fails fast. + var received = new System.Collections.Generic.List(); + Sys.ActorOf(PeerOpcUaProbeActor.Props( + NodeId.Parse("127.0.0.1:1"), + interval: TimeSpan.FromMilliseconds(50), + connectTimeout: TimeSpan.FromMilliseconds(300), + opcUaPort: 1, + broadcast: msg => received.Add(msg))); + + AwaitCondition(() => received.OfType().Any(r => !r.Ok), + TimeSpan.FromSeconds(3)); } [Fact]