Phase 6.3 Stream B — peer-probe HostedServices populating PeerReachabilityTracker
Closes task #116 (GA hardening backlog). Before this commit the RedundancyStatePublisher saw PeerReachability.Unknown for every peer because the tracker had no writers — every healthy peer got degraded to the Isolated-Primary band (230) even when fully reachable. Not release-blocking (safe default), but not the full non-transparent- redundancy UX either. Two-layer probe model per docs/v2/implementation/phase-6-3-redundancy-runtime.md §Stream B: - PeerHttpProbeLoop (Stream B.1) — fast-fail layer at 2 s / 1 s timeout. Hits each peer's http://{Host}:{DashboardPort}/healthz via an injected IHttpClientFactory. Writes the HTTP bit of PeerReachability while preserving the UA bit from the last UA probe so a transient HTTP blip doesn't clobber the authoritative UA reading. - PeerUaProbeLoop (Stream B.2) — authoritative layer at 10 s / 5 s timeout. Calls DiscoveryClient.GetEndpoints against opc.tcp://{Host}: {OpcUaPort} — cheap compared to a full Session.Create, no cert trust required. Short-circuits when the HTTP probe last reported the peer unhealthy (no wasted handshakes on a known-dead endpoint), clearing the stale UaHealthy bit in that case. Both inherit from BackgroundService, follow the tick/delay/catch pattern RedundancyPublisherHostedService + ResilienceStatusPublisherHostedService established, and expose TickAsync() as internal for test drive-through. New PeerProbeOptions class carries the four intervals/timeouts so operators can tune cadence per site. Registered as singleton in Program.cs; HTTP client registered by name so the OtOpcUa handler chain (Serilog enrichers, potential future OpenTelemetry instrumentation) isn't bypassed. Tests — 9 new unit tests across PeerHttpProbeLoopTests (5) and PeerUaProbeLoopTests (4). All pass. Server.Tests total 243 → 252. Full solution build clean. Docs: v2-release-readiness.md Phase 6.3 follow-ups list marks the peer-probe bullet struck-through with a close-out note. Still deferred in Phase 6.3: - OPC UA variable-node binding (task #117 — ServiceLevel + ServerUriArray) - sp_PublishGeneration lease wrap (task #118) - Client interop matrix (task #119) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
103
src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerHttpProbeLoop.cs
Normal file
103
src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerHttpProbeLoop.cs
Normal file
@@ -0,0 +1,103 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Redundancy;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Hosting;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 6.3 Stream B.1 — HTTP peer-probe loop. Polls every configured peer's
|
||||
/// <c>/healthz</c> endpoint on a fast cadence (default 2 s) with a short timeout
|
||||
/// (default 1 s) and writes the result to <see cref="PeerReachabilityTracker"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Fast-fail layer — the UA probe short-circuits when HTTP says dead, so a failing
|
||||
/// peer is detected within ~2 s without paying the cost of a full OPC UA session
|
||||
/// setup on every tick.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Writes preserve the last UA-health bit so a transient HTTP blip doesn't stomp the
|
||||
/// authoritative UA reading until the next UA tick. <see cref="PeerReachability"/>
|
||||
/// is a record; we compose a new one per update.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Probe target is derived as <c>http://{peer.Host}:{peer.DashboardPort}/healthz</c>.
|
||||
/// The server's own health-endpoints host serves <c>/healthz</c> on the dashboard
|
||||
/// port, so this is symmetric with what peers expect to be probed.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class PeerHttpProbeLoop(
|
||||
RedundancyCoordinator coordinator,
|
||||
PeerReachabilityTracker tracker,
|
||||
IHttpClientFactory httpClientFactory,
|
||||
ILogger<PeerHttpProbeLoop> logger,
|
||||
PeerProbeOptions? options = null) : BackgroundService
|
||||
{
|
||||
private readonly PeerProbeOptions _options = options ?? new PeerProbeOptions();
|
||||
internal const string HttpClientName = "PeerHttpProbe";
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
logger.LogInformation(
|
||||
"PeerHttpProbeLoop running — probe every {Interval}ms, timeout {Timeout}ms",
|
||||
_options.HttpProbeInterval.TotalMilliseconds, _options.HttpProbeTimeout.TotalMilliseconds);
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await TickAsync(stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex) when (ex is not OperationCanceledException)
|
||||
{
|
||||
logger.LogWarning(ex, "PeerHttpProbeLoop tick failed");
|
||||
}
|
||||
|
||||
try { await Task.Delay(_options.HttpProbeInterval, stoppingToken).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { break; }
|
||||
}
|
||||
}
|
||||
|
||||
// internal for tests — lets a unit test drive a single tick synchronously without the loop.
|
||||
internal async Task TickAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var topology = coordinator.Current;
|
||||
if (topology is null || topology.Peers.Count == 0) return;
|
||||
|
||||
// Probe every peer in parallel — one slow peer shouldn't block the cadence for others.
|
||||
var probes = topology.Peers.Select(p => ProbeAsync(p, cancellationToken)).ToArray();
|
||||
await Task.WhenAll(probes).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task ProbeAsync(RedundancyPeer peer, CancellationToken cancellationToken)
|
||||
{
|
||||
var url = $"http://{peer.Host}:{peer.DashboardPort}/healthz";
|
||||
var healthy = false;
|
||||
try
|
||||
{
|
||||
using var client = httpClientFactory.CreateClient(HttpClientName);
|
||||
client.Timeout = _options.HttpProbeTimeout;
|
||||
using var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false);
|
||||
healthy = response.IsSuccessStatusCode;
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
return; // shutdown — drop the result rather than writing a false-unhealthy
|
||||
}
|
||||
catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException or OperationCanceledException)
|
||||
{
|
||||
// Any transport-level failure counts as unhealthy — connection refused, timeout,
|
||||
// DNS fail, TLS fail. Swallow + mark unhealthy; don't log every tick, only when
|
||||
// state transitions.
|
||||
healthy = false;
|
||||
}
|
||||
|
||||
var previous = tracker.Get(peer.NodeId);
|
||||
if (previous.HttpHealthy != healthy)
|
||||
{
|
||||
logger.LogInformation("Peer {NodeId} HTTP probe {Transition} ({Url})",
|
||||
peer.NodeId, healthy ? "Healthy" : "Unhealthy", url);
|
||||
}
|
||||
tracker.Update(peer.NodeId, previous with { HttpHealthy = healthy });
|
||||
}
|
||||
}
|
||||
27
src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerProbeOptions.cs
Normal file
27
src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerProbeOptions.cs
Normal file
@@ -0,0 +1,27 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Hosting;
|
||||
|
||||
/// <summary>
|
||||
/// Configuration for the Phase 6.3 Stream B peer-probe HostedServices
|
||||
/// (<see cref="PeerHttpProbeLoop"/> + <see cref="PeerUaProbeLoop"/>). Drives cadence +
|
||||
/// timeout for the two-layer probe model. Defaults match the spec in
|
||||
/// <c>docs/v2/implementation/phase-6-3-redundancy-runtime.md</c> §Stream B.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// HTTP layer is the fast-fail at 2 s / 1 s timeout; UA layer is authoritative at 10 s /
|
||||
/// 5 s timeout. The UA probe short-circuits when the HTTP probe last reported the peer
|
||||
/// unhealthy, to avoid burning TCP sessions on a known-dead endpoint.
|
||||
/// </remarks>
|
||||
public sealed class PeerProbeOptions
|
||||
{
|
||||
/// <summary>How often <see cref="PeerHttpProbeLoop"/> ticks. Default 2 s.</summary>
|
||||
public TimeSpan HttpProbeInterval { get; init; } = TimeSpan.FromSeconds(2);
|
||||
|
||||
/// <summary>Per-request timeout for the HTTP <c>/healthz</c> probe. Default 1 s.</summary>
|
||||
public TimeSpan HttpProbeTimeout { get; init; } = TimeSpan.FromSeconds(1);
|
||||
|
||||
/// <summary>How often <see cref="PeerUaProbeLoop"/> ticks. Default 10 s.</summary>
|
||||
public TimeSpan UaProbeInterval { get; init; } = TimeSpan.FromSeconds(10);
|
||||
|
||||
/// <summary>Per-request timeout for the OPC UA endpoint discovery probe. Default 5 s.</summary>
|
||||
public TimeSpan UaProbeTimeout { get; init; } = TimeSpan.FromSeconds(5);
|
||||
}
|
||||
133
src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerUaProbeLoop.cs
Normal file
133
src/ZB.MOM.WW.OtOpcUa.Server/Hosting/PeerUaProbeLoop.cs
Normal file
@@ -0,0 +1,133 @@
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Opc.Ua;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Redundancy;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Hosting;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 6.3 Stream B.2 — OPC UA peer-probe loop. Opens a minimal discovery session to
|
||||
/// each peer's OPC UA endpoint on a slow cadence (default 10 s) and records
|
||||
/// <see cref="PeerReachability.UaHealthy"/> in the tracker.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Authoritative layer — the OPC UA discovery call verifies the endpoint actually
|
||||
/// serves UA traffic (not just that the host OS answers a TCP connect on 4840).
|
||||
/// If the peer passes UA discovery, it can serve real client sessions.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Short-circuits when the HTTP probe (<see cref="PeerHttpProbeLoop"/>) last marked
|
||||
/// the peer unhealthy — no point burning a full TCP+OPC UA handshake on a peer the
|
||||
/// fast-fail probe already says is dead. In that case <see cref="PeerReachability.UaHealthy"/>
|
||||
/// is cleared (stale-UA-state protection) so a sustained HTTP outage doesn't leave
|
||||
/// an ancient UaHealthy=true reading feeding the ServiceLevel calculator.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Implementation uses <c>DiscoveryClient.GetEndpoints</c> rather than opening a
|
||||
/// full authenticated <c>Session</c> — the discovery endpoint is server-side cheap
|
||||
/// (no session state), needs no certificate trust, and is specifically designed for
|
||||
/// availability pinging. Timeout bounded by <see cref="PeerProbeOptions.UaProbeTimeout"/>.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class PeerUaProbeLoop(
|
||||
RedundancyCoordinator coordinator,
|
||||
PeerReachabilityTracker tracker,
|
||||
ILogger<PeerUaProbeLoop> logger,
|
||||
PeerProbeOptions? options = null,
|
||||
Func<string, TimeSpan, CancellationToken, Task<bool>>? endpointProbe = null) : BackgroundService
|
||||
{
|
||||
private readonly PeerProbeOptions _options = options ?? new PeerProbeOptions();
|
||||
private readonly Func<string, TimeSpan, CancellationToken, Task<bool>> _endpointProbe
|
||||
= endpointProbe ?? DefaultEndpointProbeAsync;
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
logger.LogInformation(
|
||||
"PeerUaProbeLoop running — probe every {Interval}ms, timeout {Timeout}ms",
|
||||
_options.UaProbeInterval.TotalMilliseconds, _options.UaProbeTimeout.TotalMilliseconds);
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await TickAsync(stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex) when (ex is not OperationCanceledException)
|
||||
{
|
||||
logger.LogWarning(ex, "PeerUaProbeLoop tick failed");
|
||||
}
|
||||
|
||||
try { await Task.Delay(_options.UaProbeInterval, stoppingToken).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { break; }
|
||||
}
|
||||
}
|
||||
|
||||
// internal for tests — single-tick entry point.
|
||||
internal async Task TickAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var topology = coordinator.Current;
|
||||
if (topology is null || topology.Peers.Count == 0) return;
|
||||
|
||||
var probes = topology.Peers.Select(p => ProbeAsync(p, cancellationToken)).ToArray();
|
||||
await Task.WhenAll(probes).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task ProbeAsync(RedundancyPeer peer, CancellationToken cancellationToken)
|
||||
{
|
||||
var previous = tracker.Get(peer.NodeId);
|
||||
|
||||
// Short-circuit: don't waste a UA handshake when HTTP says the peer is down. Clear
|
||||
// the UA bit so the publisher doesn't see a stale "UA still healthy" reading.
|
||||
if (!previous.HttpHealthy)
|
||||
{
|
||||
if (previous.UaHealthy)
|
||||
{
|
||||
tracker.Update(peer.NodeId, previous with { UaHealthy = false });
|
||||
logger.LogInformation("Peer {NodeId} UA probe cleared (HTTP unhealthy)", peer.NodeId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
var endpoint = $"opc.tcp://{peer.Host}:{peer.OpcUaPort}";
|
||||
var healthy = await _endpointProbe(endpoint, _options.UaProbeTimeout, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (previous.UaHealthy != healthy)
|
||||
{
|
||||
logger.LogInformation("Peer {NodeId} UA probe {Transition} ({Endpoint})",
|
||||
peer.NodeId, healthy ? "Healthy" : "Unhealthy", endpoint);
|
||||
}
|
||||
tracker.Update(peer.NodeId, previous with { UaHealthy = healthy });
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Default probe — <c>DiscoveryClient.GetEndpoints</c> against the peer's OPC UA
|
||||
/// endpoint. Lightweight (no session, no certificate trust). Returns <c>true</c>
|
||||
/// iff the call returns at least one advertised endpoint within the supplied
|
||||
/// timeout; any transport, protocol, or timeout failure counts as unhealthy.
|
||||
/// </summary>
|
||||
internal static async Task<bool> DefaultEndpointProbeAsync(
|
||||
string endpointUrl, TimeSpan timeout, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
using var linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
linked.CancelAfter(timeout);
|
||||
|
||||
var config = EndpointConfiguration.Create();
|
||||
config.OperationTimeout = (int)timeout.TotalMilliseconds;
|
||||
|
||||
using var discoveryClient = DiscoveryClient.Create(new Uri(endpointUrl), config);
|
||||
var endpoints = await Task.Run(() => discoveryClient.GetEndpoints(null), linked.Token).ConfigureAwait(false);
|
||||
return endpoints is { Count: > 0 };
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -164,6 +164,15 @@ builder.Services.AddSingleton(sp => new RedundancyStatePublisher(
|
||||
sp.GetRequiredService<PeerReachabilityTracker>()));
|
||||
builder.Services.AddHostedService<RedundancyPublisherHostedService>();
|
||||
|
||||
// Phase 6.3 Stream B — two-layer peer-probe loops populating PeerReachabilityTracker.
|
||||
// Without these the publisher sees PeerReachability.Unknown for every peer and degrades
|
||||
// to the Isolated-Primary band (230) even when the peer is up. Safe default but not the
|
||||
// full non-transparent-redundancy UX.
|
||||
builder.Services.AddSingleton<PeerProbeOptions>();
|
||||
builder.Services.AddHttpClient(PeerHttpProbeLoop.HttpClientName);
|
||||
builder.Services.AddHostedService<PeerHttpProbeLoop>();
|
||||
builder.Services.AddHostedService<PeerUaProbeLoop>();
|
||||
|
||||
// Phase 7 follow-up #246 — historian sink + engine composer. NullAlarmHistorianSink
|
||||
// is the default until the Galaxy.Host SqliteStoreAndForwardSink writer adapter
|
||||
// lands (task #248). The composer reads Script/VirtualTag/ScriptedAlarm rows on
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.0"/>
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting.WindowsServices" Version="10.0.0"/>
|
||||
<PackageReference Include="Microsoft.Extensions.Http" Version="10.0.0"/>
|
||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="10.0.0"/>
|
||||
<PackageReference Include="Serilog.Extensions.Hosting" Version="9.0.0"/>
|
||||
<PackageReference Include="Serilog.Settings.Configuration" Version="9.0.0"/>
|
||||
|
||||
Reference in New Issue
Block a user