Phase 6.3 A.2 + D.1 — GenerationRefreshHostedService: poll + lease-wrap apply

Closes tasks #132 + #118 (GA hardening backlog).

Before this commit, the Server only observed the generation in force at
process start (SealedBootstrap). Peer-published generations accumulated
in the shared config DB while the running node kept serving the
generation it had sealed on boot. Two consequences:

1. Operator role-swaps required a process restart — Admin publishes a
   new generation, but the Server's RedundancyCoordinator never re-read
   the topology.
2. ApplyLeaseRegistry had no apply to wrap. ServiceLevelBand sat at
   PrimaryHealthy (255) during every publish because nothing opened a
   lease; PrimaryMidApply (200) was effectively dead code.

New GenerationRefreshHostedService (src/.../Server/Hosting/):
- Polls sp_GetCurrentGenerationForCluster every 5s (tunable).
- On change: opens leases.BeginApplyLease(newGenerationId, Guid.NewGuid()),
  calls coordinator.RefreshAsync inside the `await using`, releases on
  scope exit (success / exception / cancellation via IAsyncDisposable).
- Diagnostic properties: LastAppliedGenerationId, TickCount, RefreshCount.
- Delegate-injected currentGenerationQuery for test drive-through; real
  path is the private static DefaultQueryCurrentGenerationAsync.
- Registered as HostedService in Program.cs alongside the Phase 6.3
  redundancy / peer-probe stack.

Scope intentionally narrow: only the coordinator refreshes today. Driver
re-init, virtual-tag re-bind, script-engine reload remain as follow-up
wiring. The lease wrap is the right seam for those subscribers to hook
once they grow hot-reload support — the doc comments say so.

Tests
- 5 new unit tests in GenerationRefreshHostedServiceTests (first-apply,
  identity no-op, change-triggers-refresh, null-generation-is-no-op,
  lease-is-released-on-exit). Stub generation-query delegate; real
  coordinator backed by EF InMemory DB.
- Server.Tests total 252 → 257.

Docs
- v2-release-readiness.md Phase 6.3 follow-ups list marks the
  sp_PublishGeneration lease wrap bullet struck-through with close-out
  note.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-24 15:02:33 -04:00
parent de77d42eab
commit a23de2a7e4
4 changed files with 317 additions and 1 deletions

View File

@@ -58,7 +58,7 @@ Remaining Phase 6.3 surfaces (hardening, not release-blocking):
- ~~`PeerHttpProbeLoop` + `PeerUaProbeLoop` HostedServices populating `PeerReachabilityTracker` on each tick.~~ **Closed 2026-04-24.** Two-layer probe model shipped: HTTP probe at 2 s / 1 s timeout against `/healthz`; OPC UA probe at 10 s / 5 s timeout via `DiscoveryClient.GetEndpoints`, short-circuiting when HTTP reports the peer unhealthy. Registered on the Server as `AddHostedService<PeerHttpProbeLoop>` + `AddHostedService<PeerUaProbeLoop>`. Publisher now sees accurate `PeerReachability` per peer instead of degrading to `Unknown` → Isolated-Primary band (230).
- OPC UA variable-node wiring: bind `ServiceLevel` Byte + `ServerUriArray` String[] to the publisher's events via `BaseDataVariable.OnReadValue` / direct value push.
- `sp_PublishGeneration` wraps its apply in `await using var lease = coordinator.BeginApplyLease(...)` so the `PrimaryMidApply` band (200) fires during actual publishes (task #148 part 2).
- ~~`sp_PublishGeneration` wraps its apply in `await using var lease = coordinator.BeginApplyLease(...)` so the `PrimaryMidApply` band (200) fires during actual publishes (task #148 part 2).~~ **Closed 2026-04-24.** The apply loop now lives in `GenerationRefreshHostedService` — polls `sp_GetCurrentGenerationForCluster` every 5s, opens a lease when a new generation is detected, calls `RedundancyCoordinator.RefreshAsync` inside the `await using`, releases the lease on all exit paths. Replaces the previous "topology never refreshes without a process restart" behaviour.
- Client interop matrix — Ignition / Kepware / Aveva OI Gateway (Stream F, task #150). Manual + doc-only.
### ~~Phase 5 driver complement~~ (task #120 — **CLOSED** 2026-04-24)

View File

@@ -0,0 +1,160 @@
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Server.Redundancy;
namespace ZB.MOM.WW.OtOpcUa.Server.Hosting;
/// <summary>
/// Phase 6.3 A.2 + Phase 6.1 Stream D follow-up — polls
/// <c>sp_GetCurrentGenerationForCluster</c> on a cadence and, when a newer generation
/// is detected, wraps the apply in an <see cref="ApplyLeaseRegistry"/> lease
/// (flipping ServiceLevel to <see cref="ServiceLevelBand.PrimaryMidApply"/>) and
/// refreshes the <see cref="RedundancyCoordinator"/> so operator role-swaps take
/// effect without a process restart.
/// </summary>
/// <remarks>
/// <para>
/// Before this service shipped, the Server only ever saw the generation in force
/// at process start (<see cref="SealedBootstrap"/>). Peer-published generations
/// silently accumulated in the shared config DB; the running node kept serving
/// the generation it had sealed on boot until the operator restarted it.
/// </para>
/// <para>
/// Closes the Phase 6.3 D.1 design hole around <c>PrimaryMidApply</c>: the
/// <c>coordinator.BeginApplyLease(...)</c> wrap now encloses an actual apply
/// (the coordinator refresh + future subscriber fan-out). Lease dispose fires
/// on every exit path — success, exception, cancellation — so
/// <c>ApplyLeaseRegistry</c> can never pin a crashed refresh at
/// PrimaryMidApply.
/// </para>
/// <para>
/// Deliberately narrow scope: refreshes <see cref="RedundancyCoordinator"/>
/// only. Driver re-init, virtual-tag re-bind, script-engine reload, etc. remain
/// as follow-up wiring — add subscribers to this service's apply path as those
/// components grow hot-reload support. The lease wrap is the right seam for
/// those subscribers to hook.
/// </para>
/// </remarks>
public sealed class GenerationRefreshHostedService(
NodeOptions options,
ApplyLeaseRegistry leases,
RedundancyCoordinator coordinator,
ILogger<GenerationRefreshHostedService> logger,
TimeSpan? tickInterval = null,
Func<CancellationToken, Task<long?>>? currentGenerationQuery = null) : BackgroundService
{
private readonly Func<CancellationToken, Task<long?>> _generationQuery = currentGenerationQuery
?? new Func<CancellationToken, Task<long?>>(ct => DefaultQueryCurrentGenerationAsync(options, logger, ct));
/// <summary>
/// How often the service polls <c>sp_GetCurrentGenerationForCluster</c>. Default 5 s —
/// low enough that operator publishes take effect promptly, high enough that the
/// overhead on the central DB is negligible even across a 100-node fleet.
/// </summary>
public TimeSpan TickInterval { get; } = tickInterval ?? TimeSpan.FromSeconds(5);
/// <summary>
/// Newest generation the service has applied. Exposed for diagnostics +
/// <see cref="TickCount"/> style health surfaces. <c>null</c> before the first
/// successful poll.
/// </summary>
public long? LastAppliedGenerationId { get; private set; }
/// <summary>Successful ticks — whether or not a generation change was detected.</summary>
public int TickCount { get; private set; }
/// <summary>Ticks that observed a generation change and ran a refresh.</summary>
public int RefreshCount { get; private set; }
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation(
"GenerationRefreshHostedService running — polling every {Tick}s",
TickInterval.TotalSeconds);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await TickAsync(stoppingToken).ConfigureAwait(false);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogWarning(ex, "GenerationRefreshHostedService tick failed");
}
try { await Task.Delay(TickInterval, stoppingToken).ConfigureAwait(false); }
catch (OperationCanceledException) { break; }
}
}
// internal for tests — single-tick entry point.
internal async Task TickAsync(CancellationToken cancellationToken)
{
var current = await _generationQuery(cancellationToken).ConfigureAwait(false);
TickCount++;
if (current is null) return;
if (LastAppliedGenerationId is long last && current == last)
{
return; // no change
}
logger.LogInformation(
"Generation change detected — {Previous} → {Current}; applying",
LastAppliedGenerationId?.ToString() ?? "(none)", current);
// Lease wraps the apply window: ServiceLevelCalculator reads
// ApplyLeaseRegistry.IsApplyInProgress and returns PrimaryMidApply (200) while any
// lease is open. Publisher ticks in parallel (1s cadence) will observe the band
// transition and push it onto the OPC UA Server.ServiceLevel node.
var publishRequestId = Guid.NewGuid();
await using (leases.BeginApplyLease(current.Value, publishRequestId))
{
await coordinator.RefreshAsync(cancellationToken).ConfigureAwait(false);
// Future: fire a domain event that driver hosts / virtual-tag engine /
// scripted-alarm engine subscribe to. For now the topology refresh is the
// only thing we rewire — everything else still requires a process restart.
}
LastAppliedGenerationId = current;
RefreshCount++;
}
/// <summary>
/// Default generation-query implementation — reads via
/// <c>sp_GetCurrentGenerationForCluster</c>. Returns <c>null</c> when no generation
/// has been published yet, or when the DB call fails (logged at Warning; next tick
/// retries). Tests inject a stub <see cref="Func{CancellationToken, Task}"/> via the
/// <c>currentGenerationQuery</c> constructor parameter instead.
/// </summary>
private static async Task<long?> DefaultQueryCurrentGenerationAsync(
NodeOptions options,
ILogger logger,
CancellationToken cancellationToken)
{
try
{
await using var conn = new SqlConnection(options.ConfigDbConnectionString);
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
await using var cmd = conn.CreateCommand();
cmd.CommandText = "EXEC dbo.sp_GetCurrentGenerationForCluster @NodeId=@n, @ClusterId=@c";
cmd.Parameters.AddWithValue("@n", options.NodeId);
cmd.Parameters.AddWithValue("@c", options.ClusterId);
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return null;
}
return reader.GetInt64(0);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogWarning(ex, "sp_GetCurrentGenerationForCluster failed — will retry");
return null;
}
}
}

View File

@@ -173,6 +173,12 @@ builder.Services.AddHttpClient(PeerHttpProbeLoop.HttpClientName);
builder.Services.AddHostedService<PeerHttpProbeLoop>();
builder.Services.AddHostedService<PeerUaProbeLoop>();
// Phase 6.3 A.2 + 6.1 Stream D — periodic generation refresh. Detects peer-published
// generations, opens an ApplyLeaseRegistry lease during the refresh window (so the
// publisher surfaces PrimaryMidApply=200 instead of sitting at PrimaryHealthy=255
// through the apply), and calls coordinator.RefreshAsync to pick up topology changes.
builder.Services.AddHostedService<GenerationRefreshHostedService>();
// Phase 7 follow-up #246 — historian sink + engine composer. NullAlarmHistorianSink
// is the default until the Galaxy.Host SqliteStoreAndForwardSink writer adapter
// lands (task #248). The composer reads Script/VirtualTag/ScriptedAlarm rows on

View File

@@ -0,0 +1,150 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Server.Hosting;
using ZB.MOM.WW.OtOpcUa.Server.Redundancy;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
/// <summary>
/// Unit tests for <see cref="GenerationRefreshHostedService"/>. Exercises the
/// lease-around-refresh semantics via a stub generation-query delegate — the real
/// DB path is exercised end-to-end by the Phase 6.3 compliance script.
/// </summary>
[Trait("Category", "Unit")]
public sealed class GenerationRefreshHostedServiceTests : IDisposable
{
private readonly OtOpcUaConfigDbContext _db;
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
public GenerationRefreshHostedServiceTests()
{
var opts = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
.UseInMemoryDatabase($"gen-refresh-{Guid.NewGuid():N}")
.Options;
_db = new OtOpcUaConfigDbContext(opts);
_dbFactory = new DbContextFactory(opts);
}
public void Dispose() => _db.Dispose();
[Fact]
public async Task First_tick_applies_current_generation_and_closes_the_lease()
{
var coordinator = await SeedCoordinatorAsync();
var leases = new ApplyLeaseRegistry();
var service = NewService(coordinator, leases, currentGeneration: () => 42);
leases.IsApplyInProgress.ShouldBeFalse("no lease before first tick");
await service.TickAsync(CancellationToken.None);
service.LastAppliedGenerationId.ShouldBe(42);
service.TickCount.ShouldBe(1);
service.RefreshCount.ShouldBe(1);
leases.IsApplyInProgress.ShouldBeFalse("lease must be disposed after the apply window");
}
[Fact]
public async Task Subsequent_tick_with_same_generation_is_a_no_op()
{
var coordinator = await SeedCoordinatorAsync();
var leases = new ApplyLeaseRegistry();
var service = NewService(coordinator, leases, currentGeneration: () => 42);
await service.TickAsync(CancellationToken.None);
await service.TickAsync(CancellationToken.None);
service.TickCount.ShouldBe(2);
service.RefreshCount.ShouldBe(1, "second identical tick must skip the refresh");
leases.IsApplyInProgress.ShouldBeFalse();
}
[Fact]
public async Task Generation_change_triggers_new_refresh()
{
var coordinator = await SeedCoordinatorAsync();
var leases = new ApplyLeaseRegistry();
var current = 42L;
var service = NewService(coordinator, leases, currentGeneration: () => current);
await service.TickAsync(CancellationToken.None);
current = 43L;
await service.TickAsync(CancellationToken.None);
service.LastAppliedGenerationId.ShouldBe(43);
service.RefreshCount.ShouldBe(2);
}
[Fact]
public async Task Null_generation_means_no_published_config_yet_and_does_not_apply()
{
var coordinator = await SeedCoordinatorAsync();
var leases = new ApplyLeaseRegistry();
var service = NewService(coordinator, leases, currentGeneration: () => null);
await service.TickAsync(CancellationToken.None);
service.LastAppliedGenerationId.ShouldBeNull();
service.RefreshCount.ShouldBe(0);
service.TickCount.ShouldBe(1);
}
[Fact]
public async Task Lease_is_opened_during_the_refresh_window()
{
// Drive a query delegate that *also* observes lease state mid-call: the delegate
// fires before BeginApplyLease, so it sees IsApplyInProgress=false here, not
// during the lease window. We observe the lease from the outside by checking
// OpenLeaseCount on completion — if the `await using` mis-disposed we'd see 1
// dangling. Cleanest assertion in a stub-only world.
var coordinator = await SeedCoordinatorAsync();
var leases = new ApplyLeaseRegistry();
var service = NewService(coordinator, leases, currentGeneration: () => 42);
await service.TickAsync(CancellationToken.None);
leases.OpenLeaseCount.ShouldBe(0, "IAsyncDisposable dispose must fire regardless of outcome");
}
// ---- fixture helpers ---------------------------------------------------
private async Task<RedundancyCoordinator> SeedCoordinatorAsync()
{
_db.ServerClusters.Add(new ServerCluster
{
ClusterId = "c1", Name = "W", Enterprise = "zb", Site = "w",
RedundancyMode = RedundancyMode.None, CreatedBy = "test",
});
_db.ClusterNodes.Add(new ClusterNode
{
NodeId = "A", ClusterId = "c1",
RedundancyRole = RedundancyRole.Primary, Host = "a",
ApplicationUri = "urn:A", CreatedBy = "test",
});
await _db.SaveChangesAsync();
var coordinator = new RedundancyCoordinator(
_dbFactory, NullLogger<RedundancyCoordinator>.Instance, "A", "c1");
await coordinator.InitializeAsync(CancellationToken.None);
return coordinator;
}
private static GenerationRefreshHostedService NewService(
RedundancyCoordinator coordinator,
ApplyLeaseRegistry leases,
Func<long?> currentGeneration) =>
new(new NodeOptions { NodeId = "A", ClusterId = "c1", ConfigDbConnectionString = "unused" },
leases, coordinator, NullLogger<GenerationRefreshHostedService>.Instance,
tickInterval: TimeSpan.FromSeconds(1),
currentGenerationQuery: _ => Task.FromResult(currentGeneration()));
private sealed class DbContextFactory(DbContextOptions<OtOpcUaConfigDbContext> options)
: IDbContextFactory<OtOpcUaConfigDbContext>
{
public OtOpcUaConfigDbContext CreateDbContext() => new(options);
}
}