Phase 6.1 Stream E.2 partial — ResilienceStatusPublisherHostedService persists tracker snapshots to DB

Closes the HostedService half of Phase 6.1 Stream E.2 flagged as a follow-up
when the DriverResilienceStatusTracker shipped in PR #82. The Admin /hosts
column refresh + SignalR push + red-badge visual (Stream E.3) remain
deferred to the visual-compliance pass — this PR owns the persistence
story alone.

Server.Hosting:
- ResilienceStatusPublisherHostedService : BackgroundService. Samples the
  DriverResilienceStatusTracker every TickInterval (default 5 s) and upserts
  each (DriverInstanceId, HostName) counter pair into
  DriverInstanceResilienceStatus via EF. New rows on first sight; in-place
  updates on subsequent ticks.
- PersistOnceAsync extracted public so tests drive one tick directly —
  matches the ScheduledRecycleHostedService pattern for deterministic
  timing.
- Best-effort persistence: a DB outage logs a warning + continues; the next
  tick retries. Never crashes the app on sample failure. Cancellation
  propagates through cleanly.
- Tracks the bulkhead depth / recycle / footprint columns the entity was
  designed for. CurrentBulkheadDepth currently persisted as 0 — the tracker
  doesn't yet expose live bulkhead depth; a narrower follow-up wires the
  Polly bulkhead-depth observer into the tracker.

Tests (6 new in ResilienceStatusPublisherHostedServiceTests):
- Empty tracker → tick is a no-op, zero rows written.
- Single-host counters → upsert a new row with ConsecutiveFailures + breaker
  timestamp + sampled timestamp.
- Second tick updates the existing row in place (not a second insert).
- Multi-host pairs persist independently.
- Footprint counters (Baseline + Current) round-trip.
- TickCount advances on every PersistOnceAsync call.

Full solution dotnet test: 1225 passing (was 1219, +6). Pre-existing
Client.CLI Subscribe flake unchanged.

Production wiring (Program.cs) example:
  builder.Services.AddSingleton<DriverResilienceStatusTracker>();
  builder.Services.AddHostedService<ResilienceStatusPublisherHostedService>();
  // Tracker gets wired into CapabilityInvoker via OtOpcUaServer resolution
  // + the existing Phase 6.1 layer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-19 14:36:00 -04:00
parent 244a36e03e
commit 016122841b
2 changed files with 299 additions and 0 deletions

View File

@@ -0,0 +1,138 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
namespace ZB.MOM.WW.OtOpcUa.Server.Hosting;
/// <summary>
/// Samples <see cref="DriverResilienceStatusTracker"/> at a fixed tick + upserts each
/// <c>(DriverInstanceId, HostName)</c> snapshot into <see cref="DriverInstanceResilienceStatus"/>
/// so Admin <c>/hosts</c> can render live resilience counters across restarts.
/// </summary>
/// <remarks>
/// <para>Closes the HostedService piece of Phase 6.1 Stream E.2 flagged as a follow-up
/// when the tracker shipped in PR #82. The Admin UI column-refresh piece (red badge when
/// ConsecutiveFailures &gt; breakerThreshold / 2 + SignalR push) is still deferred to
/// the visual-compliance pass — this service owns the persistence half alone.</para>
///
/// <para>Tick interval defaults to 5 s. Persistence is best-effort: a DB outage during
/// a tick logs + continues; the next tick tries again with the latest snapshots. The
/// hosted service never crashes the app on sample failure.</para>
///
/// <para><see cref="PersistOnceAsync"/> factored as a public method so tests can drive
/// it directly, matching the <see cref="ScheduledRecycleHostedService.TickOnceAsync"/>
/// pattern for deterministic unit-test timing.</para>
/// </remarks>
public sealed class ResilienceStatusPublisherHostedService : BackgroundService
{
private readonly DriverResilienceStatusTracker _tracker;
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbContextFactory;
private readonly ILogger<ResilienceStatusPublisherHostedService> _logger;
private readonly TimeProvider _timeProvider;
/// <summary>Tick interval — how often the tracker snapshot is persisted.</summary>
public TimeSpan TickInterval { get; }
/// <summary>Snapshot of the tick count for diagnostics + test assertions.</summary>
public int TickCount { get; private set; }
public ResilienceStatusPublisherHostedService(
DriverResilienceStatusTracker tracker,
IDbContextFactory<OtOpcUaConfigDbContext> dbContextFactory,
ILogger<ResilienceStatusPublisherHostedService> logger,
TimeProvider? timeProvider = null,
TimeSpan? tickInterval = null)
{
ArgumentNullException.ThrowIfNull(tracker);
ArgumentNullException.ThrowIfNull(dbContextFactory);
_tracker = tracker;
_dbContextFactory = dbContextFactory;
_logger = logger;
_timeProvider = timeProvider ?? TimeProvider.System;
TickInterval = tickInterval ?? TimeSpan.FromSeconds(5);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
"ResilienceStatusPublisherHostedService starting — tick interval = {Interval}",
TickInterval);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(TickInterval, _timeProvider, stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
await PersistOnceAsync(stoppingToken).ConfigureAwait(false);
}
_logger.LogInformation("ResilienceStatusPublisherHostedService stopping after {TickCount} tick(s).", TickCount);
}
/// <summary>
/// Take one snapshot of the tracker + upsert each pair into the persistence table.
/// Swallows transient exceptions + logs them; never throws from a sample failure.
/// </summary>
public async Task PersistOnceAsync(CancellationToken cancellationToken)
{
TickCount++;
var snapshot = _tracker.Snapshot();
if (snapshot.Count == 0) return;
try
{
await using var db = await _dbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);
var now = _timeProvider.GetUtcNow().UtcDateTime;
foreach (var (driverInstanceId, hostName, counters) in snapshot)
{
var existing = await db.DriverInstanceResilienceStatuses
.FirstOrDefaultAsync(x => x.DriverInstanceId == driverInstanceId && x.HostName == hostName, cancellationToken)
.ConfigureAwait(false);
if (existing is null)
{
db.DriverInstanceResilienceStatuses.Add(new DriverInstanceResilienceStatus
{
DriverInstanceId = driverInstanceId,
HostName = hostName,
LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc,
ConsecutiveFailures = counters.ConsecutiveFailures,
CurrentBulkheadDepth = 0, // Phase 6.1 Stream A tracker doesn't emit bulkhead depth yet
LastRecycleUtc = counters.LastRecycleUtc,
BaselineFootprintBytes = counters.BaselineFootprintBytes,
CurrentFootprintBytes = counters.CurrentFootprintBytes,
LastSampledUtc = now,
});
}
else
{
existing.LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc;
existing.ConsecutiveFailures = counters.ConsecutiveFailures;
existing.LastRecycleUtc = counters.LastRecycleUtc;
existing.BaselineFootprintBytes = counters.BaselineFootprintBytes;
existing.CurrentFootprintBytes = counters.CurrentFootprintBytes;
existing.LastSampledUtc = now;
}
}
await db.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
_logger.LogWarning(ex,
"ResilienceStatusPublisher persistence tick failed; next tick will retry with latest snapshots.");
}
}
}