From de77d42eabaf1f9d2a2a603827f2ee668afa325e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 24 Apr 2026 14:53:38 -0400 Subject: [PATCH] =?UTF-8?q?Phase=206.3=20Stream=20B=20=E2=80=94=20peer-pro?= =?UTF-8?q?be=20HostedServices=20populating=20PeerReachabilityTracker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes task #116 (GA hardening backlog). Before this commit the RedundancyStatePublisher saw PeerReachability.Unknown for every peer because the tracker had no writers — every healthy peer got degraded to the Isolated-Primary band (230) even when fully reachable. Not release-blocking (safe default), but not the full non-transparent-redundancy UX either. Two-layer probe model per docs/v2/implementation/phase-6-3-redundancy-runtime.md §Stream B: - PeerHttpProbeLoop (Stream B.1) — fast-fail layer at 2 s / 1 s timeout. Hits each peer's http://{Host}:{DashboardPort}/healthz via an injected IHttpClientFactory. Writes the HTTP bit of PeerReachability while preserving the UA bit from the last UA probe so a transient HTTP blip doesn't clobber the authoritative UA reading. - PeerUaProbeLoop (Stream B.2) — authoritative layer at 10 s / 5 s timeout. Calls DiscoveryClient.GetEndpoints against opc.tcp://{Host}:{OpcUaPort} — cheap compared to a full Session.Create, no cert trust required. Short-circuits when the HTTP probe last reported the peer unhealthy (no wasted handshakes on a known-dead endpoint), clearing the stale UaHealthy bit in that case. Both inherit from BackgroundService, follow the tick/delay/catch pattern RedundancyPublisherHostedService + ResilienceStatusPublisherHostedService established, and expose TickAsync() as internal for test drive-through. New PeerProbeOptions class carries the four intervals/timeouts so operators can tune cadence per site. 
Registered as singleton in Program.cs; HTTP client registered by name so the OtOpcUa handler chain (Serilog enrichers, potential future OpenTelemetry instrumentation) isn't bypassed. Tests — 9 new unit tests across PeerHttpProbeLoopTests (5) and PeerUaProbeLoopTests (4). All pass. Server.Tests total 243 → 252. Full solution build clean. Docs: v2-release-readiness.md Phase 6.3 follow-ups list marks the peer-probe bullet struck-through with a close-out note. Still deferred in Phase 6.3: - OPC UA variable-node binding (task #117 — ServiceLevel + ServerUriArray) - sp_PublishGeneration lease wrap (task #118) - Client interop matrix (task #119) Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/v2/v2-release-readiness.md | 2 +- .../Hosting/PeerHttpProbeLoop.cs | 103 +++++++++++ .../Hosting/PeerProbeOptions.cs | 27 +++ .../Hosting/PeerUaProbeLoop.cs | 133 +++++++++++++++ src/ZB.MOM.WW.OtOpcUa.Server/Program.cs | 9 + .../ZB.MOM.WW.OtOpcUa.Server.csproj | 1 + .../PeerHttpProbeLoopTests.cs | 161 ++++++++++++++++++ .../PeerUaProbeLoopTests.cs | 146 ++++++++++++++++ 8 files changed, 581 insertions(+), 1 deletion(-) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerHttpProbeLoop.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerProbeOptions.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerUaProbeLoop.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Server.Tests/PeerHttpProbeLoopTests.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Server.Tests/PeerUaProbeLoopTests.cs diff --git a/docs/v2/v2-release-readiness.md b/docs/v2/v2-release-readiness.md index dc7bf6e..173124c 100644 --- a/docs/v2/v2-release-readiness.md +++ b/docs/v2/v2-release-readiness.md @@ -56,7 +56,7 @@ Remaining follow-ups (hardening): Remaining Phase 6.3 surfaces (hardening, not release-blocking): -- `PeerHttpProbeLoop` + `PeerUaProbeLoop` HostedServices populating `PeerReachabilityTracker` on each tick. 
Without these the publisher sees `PeerReachability.Unknown` → Isolated-Primary band (230). Safe default but not the full non-transparent-redundancy UX. +- ~~`PeerHttpProbeLoop` + `PeerUaProbeLoop` HostedServices populating `PeerReachabilityTracker` on each tick.~~ **Closed 2026-04-24.** Two-layer probe model shipped: HTTP probe at 2 s / 1 s timeout against `/healthz`; OPC UA probe at 10 s / 5 s timeout via `DiscoveryClient.GetEndpoints`, short-circuiting when HTTP reports the peer unhealthy. Registered on the Server as `AddHostedService<PeerHttpProbeLoop>` + `AddHostedService<PeerUaProbeLoop>`. Publisher now sees accurate `PeerReachability` per peer instead of degrading to `Unknown` → Isolated-Primary band (230). - OPC UA variable-node wiring: bind `ServiceLevel` Byte + `ServerUriArray` String[] to the publisher's events via `BaseDataVariable.OnReadValue` / direct value push. - `sp_PublishGeneration` wraps its apply in `await using var lease = coordinator.BeginApplyLease(...)` so the `PrimaryMidApply` band (200) fires during actual publishes (task #148 part 2). - Client interop matrix — Ignition / Kepware / Aveva OI Gateway (Stream F, task #150). Manual + doc-only. diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerHttpProbeLoop.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerHttpProbeLoop.cs new file mode 100644 index 0000000..5120079 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerHttpProbeLoop.cs @@ -0,0 +1,103 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +namespace ZB.MOM.WW.OtOpcUa.Server.Hosting; + +/// +/// Phase 6.3 Stream B.1 — HTTP peer-probe loop. Polls every configured peer's +/// /healthz endpoint on a fast cadence (default 2 s) with a short timeout +/// (default 1 s) and writes the result to <see cref="PeerReachabilityTracker"/>. 
/// <summary>
/// Phase 6.3 Stream B.1 — HTTP peer-probe loop. Polls every configured peer's
/// <c>/healthz</c> endpoint on a fast cadence (default 2 s) with a short timeout
/// (default 1 s) and writes the result to <see cref="PeerReachabilityTracker"/>.
/// </summary>
/// <remarks>
/// <para>
/// Fast-fail layer — the UA probe short-circuits when HTTP says dead, so a failing
/// peer is detected within ~2 s without paying the cost of an OPC UA handshake on
/// every tick.
/// </para>
/// <para>
/// Writes preserve the last UA-health bit so a transient HTTP blip doesn't stomp the
/// authoritative UA reading until the next UA tick. <see cref="PeerReachability"/>
/// is a record; we compose a new one per update.
/// </para>
/// <para>
/// Probe target is derived as <c>http://{peer.Host}:{peer.DashboardPort}/healthz</c>.
/// </para>
/// </remarks>
public sealed class PeerHttpProbeLoop(
    RedundancyCoordinator coordinator,
    PeerReachabilityTracker tracker,
    IHttpClientFactory httpClientFactory,
    ILogger<PeerHttpProbeLoop> logger,
    PeerProbeOptions? options = null) : BackgroundService
{
    private readonly PeerProbeOptions _options = options ?? new PeerProbeOptions();

    /// <summary>Name under which the probe's <see cref="HttpClient"/> is requested from the factory.</summary>
    internal const string HttpClientName = "PeerHttpProbe";

    /// <summary>
    /// Runs <see cref="TickAsync"/> every <see cref="PeerProbeOptions.HttpProbeInterval"/>
    /// until <paramref name="stoppingToken"/> is cancelled. A failing tick is logged as a
    /// warning and does not stop the loop.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation(
            "PeerHttpProbeLoop running — probe every {Interval}ms, timeout {Timeout}ms",
            _options.HttpProbeInterval.TotalMilliseconds, _options.HttpProbeTimeout.TotalMilliseconds);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await TickAsync(stoppingToken).ConfigureAwait(false);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                logger.LogWarning(ex, "PeerHttpProbeLoop tick failed");
            }

            try { await Task.Delay(_options.HttpProbeInterval, stoppingToken).ConfigureAwait(false); }
            catch (OperationCanceledException) { break; }
        }
    }

    // internal for tests — lets a unit test drive a single tick without the loop.
    internal async Task TickAsync(CancellationToken cancellationToken)
    {
        var topology = coordinator.Current;
        if (topology is null || topology.Peers.Count == 0) return;

        // Probe every peer in parallel — one slow peer shouldn't block the cadence for others.
        var probes = topology.Peers.Select(p => ProbeAsync(p, cancellationToken)).ToArray();
        await Task.WhenAll(probes).ConfigureAwait(false);
    }

    /// <summary>
    /// Issues one GET against the peer's <c>/healthz</c> URL and records whether it
    /// returned a success status code. The tracker entry is read immediately before the
    /// write so only the HTTP bit is replaced.
    /// </summary>
    private async Task ProbeAsync(RedundancyPeer peer, CancellationToken cancellationToken)
    {
        var url = $"http://{peer.Host}:{peer.DashboardPort}/healthz";
        var healthy = false;
        try
        {
            using var client = httpClientFactory.CreateClient(HttpClientName);
            client.Timeout = _options.HttpProbeTimeout;
            using var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false);
            healthy = response.IsSuccessStatusCode;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            return; // shutdown — drop the result rather than writing a false-unhealthy
        }
        catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException or OperationCanceledException)
        {
            // Any transport-level failure counts as unhealthy — connection refused, timeout,
            // DNS fail, TLS fail. Swallow + mark unhealthy; don't log every tick, only when
            // state transitions.
            healthy = false;
        }

        var previous = tracker.Get(peer.NodeId);
        if (previous.HttpHealthy != healthy)
        {
            logger.LogInformation("Peer {NodeId} HTTP probe {Transition} ({Url})",
                peer.NodeId, healthy ? "Healthy" : "Unhealthy", url);
        }
        tracker.Update(peer.NodeId, previous with { HttpHealthy = healthy });
    }
}

/// <summary>
/// Configuration for the Phase 6.3 Stream B peer-probe HostedServices
/// (<see cref="PeerHttpProbeLoop"/> + <see cref="PeerUaProbeLoop"/>). Drives cadence +
/// timeout for the two-layer probe model. Defaults match the spec in
/// <c>docs/v2/implementation/phase-6-3-redundancy-runtime.md</c> §Stream B.
/// </summary>
/// <remarks>
/// HTTP layer is the fast-fail at 2 s / 1 s timeout; UA layer is authoritative at 10 s /
/// 5 s timeout. The UA probe short-circuits when the HTTP probe last reported the peer
/// unhealthy, to avoid burning TCP sessions on a known-dead endpoint.
/// </remarks>
public sealed class PeerProbeOptions
{
    /// <summary>How often <see cref="PeerHttpProbeLoop"/> ticks. Default 2 s.</summary>
    public TimeSpan HttpProbeInterval { get; init; } = TimeSpan.FromSeconds(2);

    /// <summary>Per-request timeout for the HTTP <c>/healthz</c> probe. Default 1 s.</summary>
    public TimeSpan HttpProbeTimeout { get; init; } = TimeSpan.FromSeconds(1);

    /// <summary>How often <see cref="PeerUaProbeLoop"/> ticks. Default 10 s.</summary>
    public TimeSpan UaProbeInterval { get; init; } = TimeSpan.FromSeconds(10);

    /// <summary>Per-request timeout for the OPC UA endpoint discovery probe. Default 5 s.</summary>
    public TimeSpan UaProbeTimeout { get; init; } = TimeSpan.FromSeconds(5);
}

/// <summary>
/// Phase 6.3 Stream B.2 — OPC UA peer-probe loop. Runs a discovery call against each
/// peer's OPC UA endpoint on a slow cadence (default 10 s) and records
/// <see cref="PeerReachability.UaHealthy"/> in the tracker.
/// </summary>
/// <remarks>
/// <para>
/// Short-circuits when the HTTP probe (<see cref="PeerHttpProbeLoop"/>) last marked
/// the peer unhealthy. In that case <see cref="PeerReachability.UaHealthy"/> is
/// cleared so a sustained HTTP outage doesn't leave an old <c>UaHealthy=true</c>
/// reading in the tracker.
/// </para>
/// <para>
/// The tracker entry is re-read after the probe completes: the probe may take up to
/// <see cref="PeerProbeOptions.UaProbeTimeout"/>, during which the HTTP loop can have
/// updated the same entry. Writing back the pre-probe snapshot would revert that update.
/// </para>
/// <para>
/// The probe itself is injectable (constructor parameter <c>endpointProbe</c>);
/// the default is <see cref="DefaultEndpointProbeAsync"/>.
/// </para>
/// </remarks>
public sealed class PeerUaProbeLoop(
    RedundancyCoordinator coordinator,
    PeerReachabilityTracker tracker,
    ILogger<PeerUaProbeLoop> logger,
    PeerProbeOptions? options = null,
    Func<string, TimeSpan, CancellationToken, Task<bool>>? endpointProbe = null) : BackgroundService
{
    private readonly PeerProbeOptions _options = options ?? new PeerProbeOptions();
    private readonly Func<string, TimeSpan, CancellationToken, Task<bool>> _endpointProbe
        = endpointProbe ?? DefaultEndpointProbeAsync;

    /// <summary>
    /// Runs <see cref="TickAsync"/> every <see cref="PeerProbeOptions.UaProbeInterval"/>
    /// until <paramref name="stoppingToken"/> is cancelled. A failing tick is logged as a
    /// warning and does not stop the loop.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation(
            "PeerUaProbeLoop running — probe every {Interval}ms, timeout {Timeout}ms",
            _options.UaProbeInterval.TotalMilliseconds, _options.UaProbeTimeout.TotalMilliseconds);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await TickAsync(stoppingToken).ConfigureAwait(false);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                logger.LogWarning(ex, "PeerUaProbeLoop tick failed");
            }

            try { await Task.Delay(_options.UaProbeInterval, stoppingToken).ConfigureAwait(false); }
            catch (OperationCanceledException) { break; }
        }
    }

    // internal for tests — single-tick entry point.
    internal async Task TickAsync(CancellationToken cancellationToken)
    {
        var topology = coordinator.Current;
        if (topology is null || topology.Peers.Count == 0) return;

        var probes = topology.Peers.Select(p => ProbeAsync(p, cancellationToken)).ToArray();
        await Task.WhenAll(probes).ConfigureAwait(false);
    }

    /// <summary>
    /// Probes one peer and writes the UA bit, leaving the HTTP bit as it is at write time.
    /// </summary>
    private async Task ProbeAsync(RedundancyPeer peer, CancellationToken cancellationToken)
    {
        var before = tracker.Get(peer.NodeId);

        // Short-circuit: don't waste a UA handshake when HTTP says the peer is down. Clear
        // the UA bit so the publisher doesn't see a stale "UA still healthy" reading.
        if (!before.HttpHealthy)
        {
            if (before.UaHealthy)
            {
                tracker.Update(peer.NodeId, before with { UaHealthy = false });
                logger.LogInformation("Peer {NodeId} UA probe cleared (HTTP unhealthy)", peer.NodeId);
            }
            return;
        }

        var endpoint = $"opc.tcp://{peer.Host}:{peer.OpcUaPort}";
        var healthy = await _endpointProbe(endpoint, _options.UaProbeTimeout, cancellationToken).ConfigureAwait(false);

        // Shutdown — the probe reports false when cancelled; drop the result rather than
        // writing a false-unhealthy (same policy as the HTTP loop).
        if (cancellationToken.IsCancellationRequested) return;

        // Re-read: the HTTP loop may have changed HttpHealthy while the probe was in
        // flight. Composing from the pre-probe snapshot would overwrite that newer value.
        var current = tracker.Get(peer.NodeId);
        if (current.UaHealthy != healthy)
        {
            logger.LogInformation("Peer {NodeId} UA probe {Transition} ({Endpoint})",
                peer.NodeId, healthy ? "Healthy" : "Unhealthy", endpoint);
        }
        tracker.Update(peer.NodeId, current with { UaHealthy = healthy });
    }

    /// <summary>
    /// Default probe — <c>DiscoveryClient.GetEndpoints</c> against the peer's OPC UA
    /// endpoint. Returns <c>true</c> iff the call returns at least one advertised
    /// endpoint before <paramref name="timeout"/> elapses or
    /// <paramref name="cancellationToken"/> is cancelled; every failure, timeout, or
    /// cancellation yields <c>false</c>. Never throws.
    /// </summary>
    /// <param name="endpointUrl">Discovery URL, e.g. <c>opc.tcp://host:port</c>.</param>
    /// <param name="timeout">Upper bound for the whole probe.</param>
    /// <param name="cancellationToken">Shutdown token.</param>
    internal static async Task<bool> DefaultEndpointProbeAsync(
        string endpointUrl, TimeSpan timeout, CancellationToken cancellationToken)
    {
        try
        {
            using var linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            linked.CancelAfter(timeout);

            // Opc.Ua types are fully qualified so this member does not depend on a
            // using directive for that namespace.
            var config = global::Opc.Ua.EndpointConfiguration.Create();
            config.OperationTimeout = (int)timeout.TotalMilliseconds;

            using var discoveryClient = global::Opc.Ua.DiscoveryClient.Create(new Uri(endpointUrl), config);

            // The token handed to Task.Run only prevents the delegate from starting; it
            // cannot interrupt a synchronous GetEndpoints already in progress. WaitAsync
            // lets this method return on timeout/shutdown regardless.
            var call = Task.Run(() => discoveryClient.GetEndpoints(null), linked.Token);

            // If we stop waiting, the call may still fault later (e.g. once the client is
            // disposed); touch the exception so it is not reported as unobserved.
            _ = call.ContinueWith(
                static t => _ = t.Exception,
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);

            var endpoints = await call.WaitAsync(linked.Token).ConfigureAwait(false);
            return endpoints is { Count: > 0 };
        }
        catch
        {
            // Deliberate best-effort: transport, protocol, URI, timeout and cancellation
            // failures all mean "not reachable" to the caller.
            return false;
        }
    }
}
@@ builder.Services.AddSingleton(sp => new RedundancyStatePublisher( sp.GetRequiredService())); builder.Services.AddHostedService(); +// Phase 6.3 Stream B — two-layer peer-probe loops populating PeerReachabilityTracker. +// Without these the publisher sees PeerReachability.Unknown for every peer and degrades +// to the Isolated-Primary band (230) even when the peer is up. Safe default but not the +// full non-transparent-redundancy UX. +builder.Services.AddSingleton(); +builder.Services.AddHttpClient(PeerHttpProbeLoop.HttpClientName); +builder.Services.AddHostedService(); +builder.Services.AddHostedService(); + // Phase 7 follow-up #246 — historian sink + engine composer. NullAlarmHistorianSink // is the default until the Galaxy.Host SqliteStoreAndForwardSink writer adapter // lands (task #248). The composer reads Script/VirtualTag/ScriptedAlarm rows on diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/ZB.MOM.WW.OtOpcUa.Server.csproj b/src/ZB.MOM.WW.OtOpcUa.Server/ZB.MOM.WW.OtOpcUa.Server.csproj index 21c2a00..e013286 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/ZB.MOM.WW.OtOpcUa.Server.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Server/ZB.MOM.WW.OtOpcUa.Server.csproj @@ -16,6 +16,7 @@ + diff --git a/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/PeerHttpProbeLoopTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/PeerHttpProbeLoopTests.cs new file mode 100644 index 0000000..da33a2e --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/PeerHttpProbeLoopTests.cs @@ -0,0 +1,161 @@ +using System.Net; +using System.Net.Http; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.Server.Hosting; +using ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +namespace ZB.MOM.WW.OtOpcUa.Server.Tests; + +/// +/// Unit tests for . 
/// <summary>
/// Unit tests for <see cref="PeerHttpProbeLoop"/>. Drives <c>TickAsync</c> directly
/// via an <see cref="IHttpClientFactory"/> test double so we don't race the loop's
/// <c>Task.Delay</c>.
/// </summary>
[Trait("Category", "Unit")]
public sealed class PeerHttpProbeLoopTests : IDisposable
{
    private readonly OtOpcUaConfigDbContext _db;
    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;

    public PeerHttpProbeLoopTests()
    {
        // Unique in-memory database per test instance so seeded clusters don't collide.
        var opts = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
            .UseInMemoryDatabase($"peer-http-{Guid.NewGuid():N}")
            .Options;
        _db = new OtOpcUaConfigDbContext(opts);
        _dbFactory = new DbContextFactory(opts);
    }

    public void Dispose() => _db.Dispose();

    [Fact]
    public async Task Tick_with_no_peers_is_a_no_op()
    {
        var tracker = new PeerReachabilityTracker();
        var coordinator = await SeedAndInitializeAsync("A", ("A", RedundancyRole.Primary, "urn:A"));

        var loop = new PeerHttpProbeLoop(coordinator, tracker,
            new StubHttpClientFactory(_ => new HttpResponseMessage(HttpStatusCode.OK)),
            NullLogger<PeerHttpProbeLoop>.Instance);

        await loop.TickAsync(CancellationToken.None);

        tracker.Get("B").ShouldBe(PeerReachability.Unknown);
    }

    [Fact]
    public async Task Tick_marks_peer_healthy_when_healthz_returns_200()
    {
        var coordinator = await SeedAndInitializeAsync("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var tracker = new PeerReachabilityTracker();
        var factory = new StubHttpClientFactory(req =>
        {
            req.RequestUri!.AbsolutePath.ShouldBe("/healthz");
            return new HttpResponseMessage(HttpStatusCode.OK);
        });

        var loop = new PeerHttpProbeLoop(coordinator, tracker, factory, NullLogger<PeerHttpProbeLoop>.Instance);
        await loop.TickAsync(CancellationToken.None);

        tracker.Get("B").HttpHealthy.ShouldBeTrue();
    }

    [Fact]
    public async Task Tick_marks_peer_unhealthy_when_healthz_throws()
    {
        var coordinator = await SeedAndInitializeAsync("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var tracker = new PeerReachabilityTracker();
        var factory = new StubHttpClientFactory(_ => throw new HttpRequestException("no route to host"));

        var loop = new PeerHttpProbeLoop(coordinator, tracker, factory, NullLogger<PeerHttpProbeLoop>.Instance);
        await loop.TickAsync(CancellationToken.None);

        tracker.Get("B").HttpHealthy.ShouldBeFalse();
    }

    [Fact]
    public async Task Tick_preserves_UaHealthy_bit_when_flipping_HttpHealthy()
    {
        var coordinator = await SeedAndInitializeAsync("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var tracker = new PeerReachabilityTracker();
        tracker.Update("B", new PeerReachability(HttpHealthy: false, UaHealthy: true));

        var factory = new StubHttpClientFactory(_ => new HttpResponseMessage(HttpStatusCode.OK));
        var loop = new PeerHttpProbeLoop(coordinator, tracker, factory, NullLogger<PeerHttpProbeLoop>.Instance);
        await loop.TickAsync(CancellationToken.None);

        var current = tracker.Get("B");
        current.HttpHealthy.ShouldBeTrue();
        current.UaHealthy.ShouldBeTrue("UA bit must not be clobbered by the HTTP probe");
    }

    [Fact]
    public async Task Tick_marks_peer_unhealthy_on_non_2xx_response()
    {
        var coordinator = await SeedAndInitializeAsync("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var tracker = new PeerReachabilityTracker();
        var factory = new StubHttpClientFactory(_ => new HttpResponseMessage(HttpStatusCode.ServiceUnavailable));

        var loop = new PeerHttpProbeLoop(coordinator, tracker, factory, NullLogger<PeerHttpProbeLoop>.Instance);
        await loop.TickAsync(CancellationToken.None);

        tracker.Get("B").HttpHealthy.ShouldBeFalse();
    }

    // ---- fixture helpers ---------------------------------------------------

    /// <summary>
    /// Seeds cluster <c>c1</c> with the given nodes (host = lower-cased node id) and
    /// returns a coordinator initialized as <paramref name="selfNodeId"/>.
    /// </summary>
    private async Task<RedundancyCoordinator> SeedAndInitializeAsync(
        string selfNodeId, params (string id, RedundancyRole role, string appUri)[] nodes)
    {
        _db.ServerClusters.Add(new ServerCluster
        {
            ClusterId = "c1", Name = "Warsaw", Enterprise = "zb", Site = "warsaw",
            RedundancyMode = nodes.Length == 1 ? RedundancyMode.None : RedundancyMode.Warm,
            CreatedBy = "test",
        });
        foreach (var (id, role, appUri) in nodes)
        {
            _db.ClusterNodes.Add(new ClusterNode
            {
                NodeId = id, ClusterId = "c1",
                RedundancyRole = role, Host = id.ToLowerInvariant(),
                ApplicationUri = appUri, CreatedBy = "test",
            });
        }
        await _db.SaveChangesAsync();

        var coordinator = new RedundancyCoordinator(
            _dbFactory, NullLogger<RedundancyCoordinator>.Instance, selfNodeId, "c1");
        await coordinator.InitializeAsync(CancellationToken.None);
        return coordinator;
    }

    /// <summary>Hands out fresh contexts over the shared in-memory options.</summary>
    private sealed class DbContextFactory(DbContextOptions<OtOpcUaConfigDbContext> options)
        : IDbContextFactory<OtOpcUaConfigDbContext>
    {
        public OtOpcUaConfigDbContext CreateDbContext() => new(options);
    }

    /// <summary>
    /// Factory whose clients answer every request through the supplied delegate; the
    /// delegate may throw to simulate a transport failure.
    /// </summary>
    private sealed class StubHttpClientFactory(Func<HttpRequestMessage, HttpResponseMessage> respond) : IHttpClientFactory
    {
        public HttpClient CreateClient(string name) =>
            new(new StubHandler(respond), disposeHandler: true) { Timeout = TimeSpan.FromSeconds(1) };

        private sealed class StubHandler(Func<HttpRequestMessage, HttpResponseMessage> respond) : HttpMessageHandler
        {
            protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
                => Task.FromResult(respond(request));
        }
    }
}
/// <summary>
/// Unit tests for <see cref="PeerUaProbeLoop"/>. Drives <c>TickAsync</c> directly
/// with an injected endpoint-probe delegate so no real OPC UA server is needed.
/// </summary>
[Trait("Category", "Unit")]
public sealed class PeerUaProbeLoopTests : IDisposable
{
    private readonly OtOpcUaConfigDbContext _db;
    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;

    public PeerUaProbeLoopTests()
    {
        // Unique in-memory database per test instance so seeded clusters don't collide.
        var opts = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
            .UseInMemoryDatabase($"peer-ua-{Guid.NewGuid():N}")
            .Options;
        _db = new OtOpcUaConfigDbContext(opts);
        _dbFactory = new DbContextFactory(opts);
    }

    public void Dispose() => _db.Dispose();

    [Fact]
    public async Task Tick_short_circuits_when_HttpHealthy_is_false()
    {
        var coordinator = await SeedAndInitializeAsync("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var tracker = new PeerReachabilityTracker();
        tracker.Update("B", new PeerReachability(HttpHealthy: false, UaHealthy: true));

        var probeCallCount = 0;
        var loop = new PeerUaProbeLoop(coordinator, tracker, NullLogger<PeerUaProbeLoop>.Instance,
            options: null,
            endpointProbe: (_, _, _) => { probeCallCount++; return Task.FromResult(true); });

        await loop.TickAsync(CancellationToken.None);

        probeCallCount.ShouldBe(0, "UA probe must not run when HTTP reports the peer unhealthy");
        var current = tracker.Get("B");
        current.HttpHealthy.ShouldBeFalse();
        current.UaHealthy.ShouldBeFalse("stale UaHealthy=true must be cleared when HTTP says dead");
    }

    [Fact]
    public async Task Tick_marks_UaHealthy_true_when_probe_succeeds()
    {
        var coordinator = await SeedAndInitializeAsync("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var tracker = new PeerReachabilityTracker();
        tracker.Update("B", new PeerReachability(HttpHealthy: true, UaHealthy: false));

        string? calledEndpoint = null;
        var loop = new PeerUaProbeLoop(coordinator, tracker, NullLogger<PeerUaProbeLoop>.Instance,
            options: null,
            endpointProbe: (endpoint, _, _) => { calledEndpoint = endpoint; return Task.FromResult(true); });

        await loop.TickAsync(CancellationToken.None);

        calledEndpoint.ShouldNotBeNull();
        calledEndpoint!.ShouldStartWith("opc.tcp://b:");
        tracker.Get("B").UaHealthy.ShouldBeTrue();
    }

    [Fact]
    public async Task Tick_marks_UaHealthy_false_when_probe_fails()
    {
        var coordinator = await SeedAndInitializeAsync("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var tracker = new PeerReachabilityTracker();
        tracker.Update("B", new PeerReachability(HttpHealthy: true, UaHealthy: true));

        var loop = new PeerUaProbeLoop(coordinator, tracker, NullLogger<PeerUaProbeLoop>.Instance,
            options: null,
            endpointProbe: (_, _, _) => Task.FromResult(false));

        await loop.TickAsync(CancellationToken.None);

        tracker.Get("B").UaHealthy.ShouldBeFalse();
    }

    [Fact]
    public async Task Tick_preserves_HttpHealthy_bit_across_UA_update()
    {
        var coordinator = await SeedAndInitializeAsync("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var tracker = new PeerReachabilityTracker();
        tracker.Update("B", new PeerReachability(HttpHealthy: true, UaHealthy: false));

        var loop = new PeerUaProbeLoop(coordinator, tracker, NullLogger<PeerUaProbeLoop>.Instance,
            options: null,
            endpointProbe: (_, _, _) => Task.FromResult(true));

        await loop.TickAsync(CancellationToken.None);

        var current = tracker.Get("B");
        current.HttpHealthy.ShouldBeTrue("HTTP bit must not be clobbered by the UA probe");
        current.UaHealthy.ShouldBeTrue();
    }

    // ---- fixture helpers ---------------------------------------------------

    /// <summary>
    /// Seeds cluster <c>c1</c> with the given nodes (host = lower-cased node id) and
    /// returns a coordinator initialized as <paramref name="selfNodeId"/>.
    /// </summary>
    private async Task<RedundancyCoordinator> SeedAndInitializeAsync(
        string selfNodeId, params (string id, RedundancyRole role, string appUri)[] nodes)
    {
        _db.ServerClusters.Add(new ServerCluster
        {
            ClusterId = "c1", Name = "Warsaw", Enterprise = "zb", Site = "warsaw",
            RedundancyMode = nodes.Length == 1 ? RedundancyMode.None : RedundancyMode.Warm,
            CreatedBy = "test",
        });
        foreach (var (id, role, appUri) in nodes)
        {
            _db.ClusterNodes.Add(new ClusterNode
            {
                NodeId = id, ClusterId = "c1",
                RedundancyRole = role, Host = id.ToLowerInvariant(),
                ApplicationUri = appUri, CreatedBy = "test",
            });
        }
        await _db.SaveChangesAsync();

        var coordinator = new RedundancyCoordinator(
            _dbFactory, NullLogger<RedundancyCoordinator>.Instance, selfNodeId, "c1");
        await coordinator.InitializeAsync(CancellationToken.None);
        return coordinator;
    }

    /// <summary>Hands out fresh contexts over the shared in-memory options.</summary>
    private sealed class DbContextFactory(DbContextOptions<OtOpcUaConfigDbContext> options)
        : IDbContextFactory<OtOpcUaConfigDbContext>
    {
        public OtOpcUaConfigDbContext CreateDbContext() => new(options);
    }
}