diff --git a/docs/v2/v2-release-readiness.md b/docs/v2/v2-release-readiness.md index 173124c..c950d5e 100644 --- a/docs/v2/v2-release-readiness.md +++ b/docs/v2/v2-release-readiness.md @@ -58,7 +58,7 @@ Remaining Phase 6.3 surfaces (hardening, not release-blocking): - ~~`PeerHttpProbeLoop` + `PeerUaProbeLoop` HostedServices populating `PeerReachabilityTracker` on each tick.~~ **Closed 2026-04-24.** Two-layer probe model shipped: HTTP probe at 2 s / 1 s timeout against `/healthz`; OPC UA probe at 10 s / 5 s timeout via `DiscoveryClient.GetEndpoints`, short-circuiting when HTTP reports the peer unhealthy. Registered on the Server as `AddHostedService<PeerHttpProbeLoop>` + `AddHostedService<PeerUaProbeLoop>`. Publisher now sees accurate `PeerReachability` per peer instead of degrading to `Unknown` → Isolated-Primary band (230). - OPC UA variable-node wiring: bind `ServiceLevel` Byte + `ServerUriArray` String[] to the publisher's events via `BaseDataVariable.OnReadValue` / direct value push. -- `sp_PublishGeneration` wraps its apply in `await using var lease = coordinator.BeginApplyLease(...)` so the `PrimaryMidApply` band (200) fires during actual publishes (task #148 part 2). +- ~~`sp_PublishGeneration` wraps its apply in `await using var lease = coordinator.BeginApplyLease(...)` so the `PrimaryMidApply` band (200) fires during actual publishes (task #148 part 2).~~ **Closed 2026-04-24.** The apply loop now lives in `GenerationRefreshHostedService` — polls `sp_GetCurrentGenerationForCluster` every 5s, opens a lease when a new generation is detected, calls `RedundancyCoordinator.RefreshAsync` inside the `await using`, releases the lease on all exit paths. Replaces the previous "topology never refreshes without a process restart" behaviour. - Client interop matrix — Ignition / Kepware / Aveva OI Gateway (Stream F, task #150). Manual + doc-only. 
### ~~Phase 5 driver complement~~ (task #120 — **CLOSED** 2026-04-24) diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/GenerationRefreshHostedService.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/GenerationRefreshHostedService.cs new file mode 100644 index 0000000..c0a401a --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/Hosting/GenerationRefreshHostedService.cs @@ -0,0 +1,160 @@ +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Server.Redundancy; + +namespace ZB.MOM.WW.OtOpcUa.Server.Hosting; + +/// +/// Phase 6.3 A.2 + Phase 6.1 Stream D follow-up — polls +/// sp_GetCurrentGenerationForCluster on a cadence and, when a newer generation +/// is detected, wraps the apply in an <see cref="ApplyLeaseRegistry"/> lease +/// (flipping ServiceLevel to <c>PrimaryMidApply</c>) and +/// refreshes the <see cref="RedundancyCoordinator"/> so operator role-swaps take +/// effect without a process restart. +/// +/// +/// +/// Before this service shipped, the Server only ever saw the generation in force +/// at process start. Peer-published generations +/// silently accumulated in the shared config DB; the running node kept serving +/// the generation it had sealed on boot until the operator restarted it. +/// +/// +/// Closes the Phase 6.3 D.1 design hole around PrimaryMidApply: the +/// ApplyLeaseRegistry.BeginApplyLease(...) wrap now encloses an actual apply +/// (the coordinator refresh + future subscriber fan-out). Lease dispose fires +/// on every exit path — success, exception, cancellation — so +/// ApplyLeaseRegistry can never pin a crashed refresh at +/// PrimaryMidApply. +/// +/// +/// Deliberately narrow scope: refreshes the <see cref="RedundancyCoordinator"/> +/// only. Driver re-init, virtual-tag re-bind, script-engine reload, etc. remain +/// as follow-up wiring — add subscribers to this service's apply path as those +/// components grow hot-reload support. The lease wrap is the right seam for +/// those subscribers to hook. 
+///
+///
+public sealed class GenerationRefreshHostedService(
+    NodeOptions options,
+    ApplyLeaseRegistry leases,
+    RedundancyCoordinator coordinator,
+    ILogger<GenerationRefreshHostedService> logger,
+    TimeSpan? tickInterval = null,
+    Func<CancellationToken, Task<long?>>? currentGenerationQuery = null) : BackgroundService
+{
+    // Generation lookup used by every tick: the injected delegate when one is supplied
+    // (tests), otherwise the SQL-backed default at the bottom of this class.
+    private readonly Func<CancellationToken, Task<long?>> _generationQuery = currentGenerationQuery
+        ?? new Func<CancellationToken, Task<long?>>(ct => DefaultQueryCurrentGenerationAsync(options, logger, ct));
+
+    /// <summary>
+    /// How often the service polls <c>sp_GetCurrentGenerationForCluster</c>. Default 5 s —
+    /// low enough that operator publishes take effect promptly, high enough that the
+    /// overhead on the central DB is negligible even across a 100-node fleet.
+    /// </summary>
+    public TimeSpan TickInterval { get; } = tickInterval ?? TimeSpan.FromSeconds(5);
+
+    /// <summary>
+    /// Newest generation the service has applied. Exposed for diagnostics and health
+    /// surfaces. <c>null</c> until the first refresh has completed — it is assigned
+    /// only after the coordinator refresh returns without throwing.
+    /// </summary>
+    public long? LastAppliedGenerationId { get; private set; }
+
+    /// <summary>
+    /// Ticks whose generation query completed — whether or not a generation change was
+    /// detected. Incremented before the refresh runs, so a tick whose refresh later
+    /// throws is still counted here.
+    /// </summary>
+    public int TickCount { get; private set; }
+
+    /// <summary>Ticks that observed a generation change and completed a refresh.</summary>
+    public int RefreshCount { get; private set; }
+
+    /// <summary>
+    /// Poll loop: runs <see cref="TickAsync"/>, waits <see cref="TickInterval"/>, and
+    /// repeats until <paramref name="stoppingToken"/> is cancelled.
+    /// </summary>
+    /// <param name="stoppingToken">Signalled by the host on shutdown.</param>
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        logger.LogInformation(
+            "GenerationRefreshHostedService running — polling every {Tick}s",
+            TickInterval.TotalSeconds);
+
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            try
+            {
+                await TickAsync(stoppingToken).ConfigureAwait(false);
+            }
+            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+            {
+                // Host shutdown — leave the loop quietly instead of faulting the task.
+                break;
+            }
+            catch (Exception ex)
+            {
+                // Includes OperationCanceledException that did NOT come from host shutdown
+                // (e.g. an inner timeout surfacing as a cancellation). Letting that escape
+                // would end the poll loop permanently, so it is logged and the next tick
+                // retries like any other failure.
+                logger.LogWarning(ex, "GenerationRefreshHostedService tick failed");
+            }
+
+            try
+            {
+                await Task.Delay(TickInterval, stoppingToken).ConfigureAwait(false);
+            }
+            catch (OperationCanceledException)
+            {
+                break;
+            }
+        }
+    }
+
+    // internal for tests — single-tick entry point.
+    /// <summary>
+    /// Runs one poll: queries the current generation and, when it differs from
+    /// <see cref="LastAppliedGenerationId"/>, refreshes the coordinator while holding an
+    /// apply lease. A <c>null</c> query result or an unchanged generation is a no-op.
+    /// </summary>
+    /// <param name="cancellationToken">Flowed to the generation query and the refresh.</param>
+    internal async Task TickAsync(CancellationToken cancellationToken)
+    {
+        var current = await _generationQuery(cancellationToken).ConfigureAwait(false);
+        TickCount++;
+        if (current is null)
+        {
+            return; // nothing to apply (no generation reported, or the query failed)
+        }
+
+        if (LastAppliedGenerationId is long last && current == last)
+        {
+            return; // no change
+        }
+
+        logger.LogInformation(
+            "Generation change detected — {Previous} → {Current}; applying",
+            LastAppliedGenerationId?.ToString() ?? "(none)", current);
+
+        // Lease wraps the apply window: ServiceLevelCalculator reads
+        // ApplyLeaseRegistry.IsApplyInProgress and returns PrimaryMidApply (200) while any
+        // lease is open. Publisher ticks in parallel (1s cadence) will observe the band
+        // transition and push it onto the OPC UA Server.ServiceLevel node.
+        var publishRequestId = Guid.NewGuid();
+        await using (leases.BeginApplyLease(current.Value, publishRequestId))
+        {
+            await coordinator.RefreshAsync(cancellationToken).ConfigureAwait(false);
+            // Future: fire a domain event that driver hosts / virtual-tag engine /
+            // scripted-alarm engine subscribe to. For now the topology refresh is the
+            // only thing we rewire — everything else still requires a process restart.
+        }
+
+        // Only reached when the refresh did not throw, so a failed refresh is retried
+        // on the next tick with the same generation.
+        LastAppliedGenerationId = current;
+        RefreshCount++;
+    }
+
+    /// <summary>
+    /// Default generation-query implementation — reads via
+    /// <c>sp_GetCurrentGenerationForCluster</c>. Returns <c>null</c> when the procedure
+    /// returns no row, when the first column is NULL, or when the DB call fails (logged
+    /// at Warning; next tick retries). Tests inject a stub via the
+    /// <c>currentGenerationQuery</c> constructor parameter instead.
+    /// </summary>
+    /// <param name="options">Supplies the connection string, node id and cluster id.</param>
+    /// <param name="logger">Receives the warning when the call fails.</param>
+    /// <param name="cancellationToken">Cancellation is rethrown, not swallowed.</param>
+    private static async Task<long?> DefaultQueryCurrentGenerationAsync(
+        NodeOptions options,
+        ILogger logger,
+        CancellationToken cancellationToken)
+    {
+        try
+        {
+            await using var conn = new SqlConnection(options.ConfigDbConnectionString);
+            await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
+
+            await using var cmd = conn.CreateCommand();
+            cmd.CommandText = "EXEC dbo.sp_GetCurrentGenerationForCluster @NodeId=@n, @ClusterId=@c";
+            cmd.Parameters.AddWithValue("@n", options.NodeId);
+            cmd.Parameters.AddWithValue("@c", options.ClusterId);
+
+            await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
+            if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
+            {
+                return null;
+            }
+
+            // Defensive: the procedure's result shape is not visible from this file, so a
+            // NULL first column is treated like "no row" rather than letting GetInt64
+            // throw and be reported as a DB failure on every tick.
+            if (reader.IsDBNull(0))
+            {
+                return null;
+            }
+
+            return reader.GetInt64(0);
+        }
+        catch (Exception ex) when (ex is not OperationCanceledException)
+        {
+            logger.LogWarning(ex, "sp_GetCurrentGenerationForCluster failed — will retry");
+            return null;
+        }
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/Program.cs b/src/ZB.MOM.WW.OtOpcUa.Server/Program.cs
index aef4d44..3239146 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Server/Program.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Server/Program.cs
@@ -173,6 +173,12 @@ builder.Services.AddHttpClient(PeerHttpProbeLoop.HttpClientName);
 builder.Services.AddHostedService<PeerHttpProbeLoop>();
 builder.Services.AddHostedService<PeerUaProbeLoop>();
 
+// Phase 6.3 A.2 + 6.1 Stream D — periodic generation refresh. Detects peer-published
+// generations, opens an ApplyLeaseRegistry lease during the refresh window (so the
+// publisher surfaces PrimaryMidApply=200 instead of sitting at PrimaryHealthy=255
+// through the apply), and calls coordinator.RefreshAsync to pick up topology changes.
+builder.Services.AddHostedService<GenerationRefreshHostedService>();
+
 // Phase 7 follow-up #246 — historian sink + engine composer. NullAlarmHistorianSink
 // is the default until the Galaxy.Host SqliteStoreAndForwardSink writer adapter
 // lands (task #248).
The composer reads Script/VirtualTag/ScriptedAlarm rows on
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/GenerationRefreshHostedServiceTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/GenerationRefreshHostedServiceTests.cs
new file mode 100644
index 0000000..82e9649
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/GenerationRefreshHostedServiceTests.cs
@@ -0,0 +1,150 @@
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.Logging.Abstractions;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Configuration;
+using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
+using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
+using ZB.MOM.WW.OtOpcUa.Server.Hosting;
+using ZB.MOM.WW.OtOpcUa.Server.Redundancy;
+
+namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
+
+/// <summary>
+/// Unit tests for <see cref="GenerationRefreshHostedService"/>. Exercises the
+/// lease-around-refresh semantics via a stub generation-query delegate — the real
+/// DB path is exercised end-to-end by the Phase 6.3 compliance script.
+/// </summary>
+[Trait("Category", "Unit")]
+public sealed class GenerationRefreshHostedServiceTests : IDisposable
+{
+    // One uniquely named in-memory database per test-class instance, so tests never
+    // share seeded rows.
+    private readonly OtOpcUaConfigDbContext _db;
+    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
+
+    public GenerationRefreshHostedServiceTests()
+    {
+        var opts = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
+            .UseInMemoryDatabase($"gen-refresh-{Guid.NewGuid():N}")
+            .Options;
+        _db = new OtOpcUaConfigDbContext(opts);
+        _dbFactory = new DbContextFactory(opts);
+    }
+
+    public void Dispose() => _db.Dispose();
+
+    [Fact]
+    public async Task First_tick_applies_current_generation_and_closes_the_lease()
+    {
+        var coordinator = await SeedCoordinatorAsync();
+        var leases = new ApplyLeaseRegistry();
+        var service = NewService(coordinator, leases, currentGeneration: () => 42);
+
+        leases.IsApplyInProgress.ShouldBeFalse("no lease before first tick");
+        await service.TickAsync(CancellationToken.None);
+
+        service.LastAppliedGenerationId.ShouldBe(42);
+        service.TickCount.ShouldBe(1);
+        service.RefreshCount.ShouldBe(1);
+        leases.IsApplyInProgress.ShouldBeFalse("lease must be disposed after the apply window");
+    }
+
+    [Fact]
+    public async Task Subsequent_tick_with_same_generation_is_a_no_op()
+    {
+        var coordinator = await SeedCoordinatorAsync();
+        var leases = new ApplyLeaseRegistry();
+        var service = NewService(coordinator, leases, currentGeneration: () => 42);
+
+        await service.TickAsync(CancellationToken.None);
+        await service.TickAsync(CancellationToken.None);
+
+        service.TickCount.ShouldBe(2);
+        service.RefreshCount.ShouldBe(1, "second identical tick must skip the refresh");
+        leases.IsApplyInProgress.ShouldBeFalse();
+    }
+
+    [Fact]
+    public async Task Generation_change_triggers_new_refresh()
+    {
+        var coordinator = await SeedCoordinatorAsync();
+        var leases = new ApplyLeaseRegistry();
+        var current = 42L;
+        // The lambda captures `current`, so reassigning it below changes what the next
+        // tick's query reports.
+        var service = NewService(coordinator, leases, currentGeneration: () => current);
+
+        await service.TickAsync(CancellationToken.None);
+        current = 43L;
+        await service.TickAsync(CancellationToken.None);
+
+        service.LastAppliedGenerationId.ShouldBe(43);
+        service.RefreshCount.ShouldBe(2);
+    }
+
+    [Fact]
+    public async Task Null_generation_means_no_published_config_yet_and_does_not_apply()
+    {
+        var coordinator = await SeedCoordinatorAsync();
+        var leases = new ApplyLeaseRegistry();
+        var service = NewService(coordinator, leases, currentGeneration: () => null);
+
+        await service.TickAsync(CancellationToken.None);
+
+        service.LastAppliedGenerationId.ShouldBeNull();
+        service.RefreshCount.ShouldBe(0);
+        service.TickCount.ShouldBe(1);
+    }
+
+    [Fact]
+    public async Task Lease_is_opened_during_the_refresh_window()
+    {
+        // Limitation: the only stub seam is the generation-query delegate, and it runs
+        // BEFORE BeginApplyLease — so it cannot observe IsApplyInProgress=true from
+        // inside the lease window. What this test actually pins is the release side:
+        // after the tick completes, OpenLeaseCount must be back to 0. A mis-disposed
+        // `await using` would leave one lease dangling.
+        var coordinator = await SeedCoordinatorAsync();
+        var leases = new ApplyLeaseRegistry();
+        var service = NewService(coordinator, leases, currentGeneration: () => 42);
+
+        await service.TickAsync(CancellationToken.None);
+
+        leases.OpenLeaseCount.ShouldBe(0, "IAsyncDisposable dispose must fire regardless of outcome");
+    }
+
+    // ---- fixture helpers ---------------------------------------------------
+
+    // Seeds a single-node cluster ("c1" / node "A", primary, no redundancy) and returns
+    // a coordinator already initialized against it.
+    private async Task<RedundancyCoordinator> SeedCoordinatorAsync()
+    {
+        _db.ServerClusters.Add(new ServerCluster
+        {
+            ClusterId = "c1", Name = "W", Enterprise = "zb", Site = "w",
+            RedundancyMode = RedundancyMode.None, CreatedBy = "test",
+        });
+        _db.ClusterNodes.Add(new ClusterNode
+        {
+            NodeId = "A", ClusterId = "c1",
+            RedundancyRole = RedundancyRole.Primary, Host = "a",
+            ApplicationUri = "urn:A", CreatedBy = "test",
+        });
+        await _db.SaveChangesAsync();
+
+        var coordinator = new RedundancyCoordinator(
+            _dbFactory, NullLogger<RedundancyCoordinator>.Instance, "A", "c1");
+        await coordinator.InitializeAsync(CancellationToken.None);
+        return coordinator;
+    }
+
+    // Builds the service with a stubbed generation query so no SQL connection is ever
+    // opened; the connection string is a placeholder for that reason.
+    private static GenerationRefreshHostedService NewService(
+        RedundancyCoordinator coordinator,
+        ApplyLeaseRegistry leases,
+        Func<long?> currentGeneration) =>
+        new(new NodeOptions { NodeId = "A", ClusterId = "c1", ConfigDbConnectionString = "unused" },
+            leases, coordinator, NullLogger<GenerationRefreshHostedService>.Instance,
+            tickInterval: TimeSpan.FromSeconds(1),
+            currentGenerationQuery: _ => Task.FromResult(currentGeneration()));
+
+    // Minimal factory handing out fresh contexts over the same in-memory options.
+    private sealed class DbContextFactory(DbContextOptions<OtOpcUaConfigDbContext> options)
+        : IDbContextFactory<OtOpcUaConfigDbContext>
+    {
+        public OtOpcUaConfigDbContext CreateDbContext() => new(options);
+    }
+}