Closes Stream C per docs/v2/implementation/phase-6-1-resilience-and-observability.md. Core.Observability (new namespace): - DriverHealthReport — pure-function aggregation over DriverHealthSnapshot list. Empty fleet = Healthy. Any Faulted = Faulted. Any Unknown/Initializing (no Faulted) = NotReady. Any Degraded or Reconnecting (no Faulted, no NotReady) = Degraded. Else Healthy. HttpStatus(verdict) maps to the Stream C.1 state matrix: Healthy/Degraded → 200, NotReady/Faulted → 503. - LogContextEnricher — Serilog LogContext wrapper. Push(id, type, capability, correlationId) returns an IDisposable scope; inner log calls carry DriverInstanceId / DriverType / CapabilityName / CorrelationId structured properties automatically. NewCorrelationId = 12-hex-char GUID slice for cases where no OPC UA RequestHeader.RequestHandle is in flight. CapabilityInvoker — now threads LogContextEnricher around every ExecuteAsync / ExecuteWriteAsync call site. OtOpcUaServer passes driver.DriverType through so logs correlate to the driver type too. Every capability call emits structured fields per the Stream C.4 compliance check. Server.Observability: - HealthEndpointsHost — standalone HttpListener on http://localhost:4841/ (loopback avoids Windows URL-ACL elevation; remote probing via reverse proxy or explicit netsh urlacl grant). Routes: /healthz → 200 when (configDbReachable OR usingStaleConfig); 503 otherwise. Body: status, uptimeSeconds, configDbReachable, usingStaleConfig. /readyz → DriverHealthReport.Aggregate + HttpStatus mapping. Body: verdict, drivers[], degradedDrivers[], uptimeSeconds. anything else → 404. Disposal cooperative with the HttpListener shutdown. - OpcUaApplicationHost starts the health host after the OPC UA server comes up and disposes it on shutdown. New OpcUaServerOptions knobs: HealthEndpointsEnabled (default true), HealthEndpointsPrefix (default http://localhost:4841/). 
Program.cs: - Serilog pipeline adds Enrich.FromLogContext + opt-in JSON file sink via `Serilog:WriteJson = true` appsetting. Uses Serilog.Formatting.Compact's CompactJsonFormatter (one JSON object per line — SIEMs like Splunk, Datadog, Graylog ingest without a regex parser). Server.Tests: - Existing 3 OpcUaApplicationHost integration tests now set HealthEndpointsEnabled=false to avoid port :4841 collisions under parallel execution. - New HealthEndpointsHostTests (9): /healthz healthy empty fleet; stale-config returns 200 with flag; unreachable+no-cache returns 503; /readyz empty/ Healthy/Faulted/Degraded/Initializing drivers return correct status and bodies; unknown path → 404. Uses ephemeral ports via Interlocked counter. Core.Tests: - DriverHealthReportTests (8): empty fleet, all-healthy, any-Faulted trumps, any-NotReady without Faulted, Degraded without Faulted/NotReady, HttpStatus per-verdict theory. - LogContextEnricherTests (8): all 4 properties attach; scope disposes cleanly; NewCorrelationId shape; null/whitespace driverInstanceId throws. - CapabilityInvokerEnrichmentTests (2): inner logs carry structured properties; no context leak outside the call site. Full solution dotnet test: 1016 passing (baseline 906, +110 for Phase 6.1 so far across Streams A+B+C). Pre-existing Client.CLI Subscribe flake unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
182 lines
6.5 KiB
C#
182 lines
6.5 KiB
C#
using System.Net;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using Microsoft.Extensions.Logging;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Observability;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Server.Observability;
|
|
|
|
/// <summary>
/// Standalone <see cref="HttpListener"/> host for <c>/healthz</c> and <c>/readyz</c>
/// separate from the OPC UA binding. Per <c>docs/v2/implementation/phase-6-1-resilience-
/// and-observability.md</c> §Stream C.1.
/// </summary>
/// <remarks>
/// <para>
/// Binds to <c>http://localhost:4841</c> by default — loopback avoids the Windows URL-ACL
/// elevation requirement that binding to <c>http://+:4841</c> (wildcard) would impose.
/// When a deployment needs remote probing, a reverse proxy or explicit netsh urlacl grant
/// is the expected path; documented in <c>docs/v2/Server-Deployment.md</c> in a follow-up.
/// </para>
/// <para>
/// Routes: <c>/healthz</c> reports process/config liveness, <c>/readyz</c> reports the
/// aggregated driver verdict, and any other path answers 404 with an empty body.
/// Paths are matched exactly (case-sensitive, no trailing slash).
/// </para>
/// </remarks>
public sealed class HealthEndpointsHost : IAsyncDisposable
{
    // Pause after an unexpected accept failure so a persistent error cannot spin the loop.
    private const int AcceptFailureBackoffMs = 100;

    private readonly string _prefix;
    private readonly DriverHost _driverHost;
    private readonly Func<bool> _configDbHealthy;
    private readonly Func<bool> _usingStaleConfig;
    private readonly ILogger<HealthEndpointsHost> _logger;
    private readonly HttpListener _listener = new();

    // Monotonic millisecond marker taken at construction. Unlike a DateTime.UtcNow delta,
    // it is unaffected by wall-clock adjustments, so reported uptime never goes backwards.
    private readonly long _startedTickMs = Environment.TickCount64;

    private CancellationTokenSource? _cts;
    private Task? _acceptLoop;
    private bool _disposed;

    /// <summary>
    /// Creates the host and registers <paramref name="prefix"/> with the listener.
    /// Nothing is bound until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="driverHost">Source of the registered drivers queried by <c>/readyz</c>.</param>
    /// <param name="logger">Logger for start-up and handler-failure messages.</param>
    /// <param name="configDbHealthy">
    /// Probe evaluated on every <c>/healthz</c> request; defaults to always <c>true</c>.
    /// </param>
    /// <param name="usingStaleConfig">
    /// Probe evaluated on every <c>/healthz</c> request; defaults to always <c>false</c>.
    /// </param>
    /// <param name="prefix">
    /// <see cref="HttpListener"/> URI prefix. A trailing <c>/</c> is appended when missing.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="driverHost"/>, <paramref name="logger"/> or <paramref name="prefix"/> is null.
    /// </exception>
    public HealthEndpointsHost(
        DriverHost driverHost,
        ILogger<HealthEndpointsHost> logger,
        Func<bool>? configDbHealthy = null,
        Func<bool>? usingStaleConfig = null,
        string prefix = "http://localhost:4841/")
    {
        // Fail at construction rather than on the first probe request.
        ArgumentNullException.ThrowIfNull(driverHost);
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentNullException.ThrowIfNull(prefix);

        _driverHost = driverHost;
        _logger = logger;
        _configDbHealthy = configDbHealthy ?? (() => true);
        _usingStaleConfig = usingStaleConfig ?? (() => false);
        _prefix = prefix.EndsWith('/') ? prefix : prefix + "/";
        _listener.Prefixes.Add(_prefix);
    }

    /// <summary>Seconds elapsed since this instance was constructed, truncated toward zero.</summary>
    private int UptimeSeconds => (int)((Environment.TickCount64 - _startedTickMs) / 1000);

    /// <summary>
    /// Starts the listener and the background accept loop. Calling it again while already
    /// started is a no-op.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The host has already been disposed.</exception>
    public void Start()
    {
        if (_disposed) throw new ObjectDisposedException(nameof(HealthEndpointsHost));

        // A second Start would otherwise leak the previous CTS and spawn a second accept loop.
        if (_acceptLoop is not null) return;

        _listener.Start();

        var cts = new CancellationTokenSource();
        _cts = cts;

        // Capture the token now: reading _cts.Token lazily inside the lambda can throw
        // ObjectDisposedException if DisposeAsync wins the race with the thread pool.
        var token = cts.Token;
        _acceptLoop = Task.Run(() => AcceptLoopAsync(token));

        _logger.LogInformation("Health endpoints listening on {Prefix}", _prefix);
    }

    /// <summary>
    /// Accepts requests until cancellation is requested or the listener is stopped, handing
    /// each accepted context to <see cref="HandleAsync"/> on the thread pool.
    /// </summary>
    /// <param name="ct">Signalled by <see cref="DisposeAsync"/> to end the loop.</param>
    private async Task AcceptLoopAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            HttpListenerContext ctx;
            try
            {
                ctx = await _listener.GetContextAsync().ConfigureAwait(false);
            }
            catch (ObjectDisposedException) { break; }
            catch (InvalidOperationException) when (ct.IsCancellationRequested || !_listener.IsListening)
            {
                // Listener was stopped between the loop check and the accept call.
                break;
            }
            catch (HttpListenerException ex)
            {
                if (ct.IsCancellationRequested || !_listener.IsListening) break;

                // A transient accept error must not take the probes down for the rest of
                // the process lifetime; log it, back off briefly, and keep accepting.
                _logger.LogWarning(ex, "Health endpoint accept failure; continuing to listen");
                await Task.Delay(AcceptFailureBackoffMs).ConfigureAwait(false);
                continue;
            }

            // Deliberately no cancellation token here: a token cancelled before the work
            // item starts would skip HandleAsync and leave the accepted context unclosed.
            // HandleAsync catches its own exceptions, so fire-and-forget is safe.
            _ = Task.Run(() => HandleAsync(ctx));
        }
    }

    /// <summary>
    /// Routes one request by exact absolute path and always closes the response.
    /// Handler exceptions are logged and answered with 500 on a best-effort basis.
    /// </summary>
    /// <param name="ctx">The accepted request/response pair.</param>
    private async Task HandleAsync(HttpListenerContext ctx)
    {
        try
        {
            var path = ctx.Request.Url?.AbsolutePath ?? "/";
            switch (path)
            {
                case "/healthz":
                    await WriteHealthzAsync(ctx).ConfigureAwait(false);
                    break;
                case "/readyz":
                    await WriteReadyzAsync(ctx).ConfigureAwait(false);
                    break;
                default:
                    ctx.Response.StatusCode = 404;
                    break;
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Health endpoint handler failure");
            // Setting the status throws once headers have been sent; nothing more can be done then.
            try { ctx.Response.StatusCode = 500; } catch { /* ignore */ }
        }
        finally
        {
            try { ctx.Response.Close(); } catch { /* ignore */ }
        }
    }

    /// <summary>
    /// Writes the <c>/healthz</c> response: 200 when the config DB probe or the stale-config
    /// probe returns <c>true</c>, 503 otherwise. The JSON body carries <c>status</c>,
    /// <c>uptimeSeconds</c>, <c>configDbReachable</c> and <c>usingStaleConfig</c>.
    /// </summary>
    /// <param name="ctx">The request being answered.</param>
    private async Task WriteHealthzAsync(HttpListenerContext ctx)
    {
        var configHealthy = _configDbHealthy();
        var staleConfig = _usingStaleConfig();

        // /healthz is 200 when process alive + (config DB reachable OR cache-warm).
        // Stale-config still serves 200 so the process isn't flagged dead when the DB
        // blips; the body surfaces the stale flag for operators.
        var healthy = configHealthy || staleConfig;
        ctx.Response.StatusCode = healthy ? 200 : 503;

        var body = JsonSerializer.Serialize(new
        {
            status = healthy ? "healthy" : "unhealthy",
            uptimeSeconds = UptimeSeconds,
            configDbReachable = configHealthy,
            usingStaleConfig = staleConfig,
        });
        await WriteBodyAsync(ctx, body).ConfigureAwait(false);
    }

    /// <summary>
    /// Writes the <c>/readyz</c> response. The status code comes from
    /// <see cref="DriverHealthReport.HttpStatus"/> applied to the aggregated verdict; the JSON
    /// body carries <c>verdict</c>, <c>uptimeSeconds</c>, <c>drivers</c> and <c>degradedDrivers</c>.
    /// </summary>
    /// <param name="ctx">The request being answered.</param>
    private async Task WriteReadyzAsync(HttpListenerContext ctx)
    {
        var snapshots = BuildSnapshots();
        var verdict = DriverHealthReport.Aggregate(snapshots);
        ctx.Response.StatusCode = DriverHealthReport.HttpStatus(verdict);

        var body = JsonSerializer.Serialize(new
        {
            verdict = verdict.ToString(),
            uptimeSeconds = UptimeSeconds,
            drivers = snapshots.Select(d => new
            {
                id = d.DriverInstanceId,
                state = d.State.ToString(),
                detail = d.DetailMessage,
            }).ToArray(),
            // Ids of drivers currently in the Degraded or Reconnecting state.
            degradedDrivers = snapshots
                .Where(d => d.State == DriverState.Degraded || d.State == DriverState.Reconnecting)
                .Select(d => d.DriverInstanceId)
                .ToArray(),
        });
        await WriteBodyAsync(ctx, body).ConfigureAwait(false);
    }

    /// <summary>
    /// Captures one health snapshot per registered driver. Ids for which the driver host
    /// returns no driver are skipped.
    /// </summary>
    /// <returns>Snapshots in the order the driver host enumerates its registered ids.</returns>
    private IReadOnlyList<DriverHealthSnapshot> BuildSnapshots()
    {
        var list = new List<DriverHealthSnapshot>();
        foreach (var id in _driverHost.RegisteredDriverIds)
        {
            var driver = _driverHost.GetDriver(id);
            if (driver is null) continue;

            var health = driver.GetHealth();
            list.Add(new DriverHealthSnapshot(driver.DriverInstanceId, health.State, health.LastError));
        }
        return list;
    }

    /// <summary>
    /// Sends <paramref name="body"/> as UTF-8 JSON with an explicit content length.
    /// The caller is responsible for the status code and for closing the response.
    /// </summary>
    /// <param name="ctx">The request being answered.</param>
    /// <param name="body">Already-serialized JSON text.</param>
    private static async Task WriteBodyAsync(HttpListenerContext ctx, string body)
    {
        var bytes = Encoding.UTF8.GetBytes(body);
        ctx.Response.ContentType = "application/json; charset=utf-8";
        ctx.Response.ContentLength64 = bytes.LongLength;
        await ctx.Response.OutputStream.WriteAsync(bytes).ConfigureAwait(false);
    }

    /// <summary>
    /// Cancels the accept loop, stops the listener, waits for the loop to exit, then releases
    /// the listener and the cancellation source. Safe to call more than once; never throws.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed) return;
        _disposed = true;

        // Cancel first so the accept loop treats the Stop-induced failure as a normal shutdown.
        _cts?.Cancel();
        try { _listener.Stop(); } catch { /* ignore */ }

        if (_acceptLoop is not null)
        {
            try
            {
                await _acceptLoop.ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Shutdown must not throw, but an unexpected loop fault is worth a trace.
                _logger.LogDebug(ex, "Health endpoint accept loop ended with an error");
            }
        }

        try { _listener.Close(); } catch { /* ignore */ }
        _cts?.Dispose();
    }
}
|