Phase 6.1 Stream D follow-up — SealedBootstrap consumes ResilientConfigReader + GenerationSealedCache + StaleConfigFlag; /healthz surfaces the flag
Closes release blocker #2 from docs/v2/v2-release-readiness.md — the generation-sealed cache, resilient reader, and stale-config flag shipped as unit-tested primitives in PR #81, but no production path consumed them until now. This PR wires them end to end.

Server additions:

- SealedBootstrap — the Phase 6.1 Stream D consumption hook (sketched below). It resolves the node's current generation through ResilientConfigReader's timeout → retry → fallback-to-sealed pipeline, and on every successful central-DB fetch it seals a fresh snapshot to <cache-root>/<cluster>/<generationId>.db so a future cache miss has a known-good fallback. It sits alongside the original NodeBootstrap (which still uses the single-file ILocalConfigCache); Program.cs can switch between them once operators are ready for the generation-sealed semantics.
- OpcUaApplicationHost — new optional staleConfigFlag ctor parameter. When wired, HealthEndpointsHost consumes `flag.IsStale` via the existing usingStaleConfig Func<bool> hook, so `/healthz` actually reports `usingStaleConfig: true` whenever a read fell back to the sealed cache. This closes the loop between Stream D's flag and Stream C's /healthz body shape (see the wiring sketch below).

Tests (4 new SealedBootstrapIntegrationTests, all passing):

- Central-DB success path seals a snapshot and the flag stays fresh.
- Central-DB failure falls back to the sealed snapshot and the flag flips stale (the SQL-kill scenario from Phase 6.1 Stream D.4.a).
- No snapshot + central down throws GenerationCacheUnavailableException with a clear error (the first-boot scenario from D.4.c).
- The next successful bootstrap after a fallback clears the stale flag.

Full-solution dotnet test: 1168 passing (was 1164, +4). The pre-existing Client.CLI Subscribe flake is unchanged.

Production activation: Program.cs wires SealedBootstrap (instead of NodeBootstrap), constructs OpcUaApplicationHost with the staleConfigFlag, and a HostedService polls sp_GetCurrentGenerationForCluster periodically so peer-published generations land in this node's sealed cache. The poller itself is Stream D.1.b follow-up (sketched below). The sp_PublishGeneration SQL-side hook (where the publish commit itself could also write to a shared sealed cache) stays deferred — the per-node seal pattern shipped here is the correct v2 GA model: each Server node owns its own on-disk cache and refreshes from its own DB reads, matching the Phase 6.1 scope-table description.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
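For reviewers, a minimal sketch of the consumption-hook shape described above; the SealedBootstrap source lives in the Server project and is not part of this test-file diff. Only ReadAsync, SealAsync, GenerationSnapshot, and the sp_GetCurrentGenerationForCluster round-trip come from the source here; the class name suffix, ResolveCurrentGenerationAsync, and the two Fetch* helpers are hypothetical stand-ins.

```csharp
using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;

// Hypothetical sketch, not the shipped SealedBootstrap. Assumes ImplicitUsings
// (System, System.Threading, System.Threading.Tasks), as the test file below does.
public sealed class SealedBootstrapSketch
{
    private readonly ResilientConfigReader _reader;
    private readonly GenerationSealedCache _cache;

    public SealedBootstrapSketch(ResilientConfigReader reader, GenerationSealedCache cache)
    {
        _reader = reader;
        _cache = cache;
    }

    public async Task<long?> ResolveCurrentGenerationAsync(string clusterId, CancellationToken ct)
    {
        // Timeout → retry → fallback-to-sealed lives inside ResilientConfigReader;
        // the bootstrap only supplies the central fetch and the snapshot projection.
        return await _reader.ReadAsync(
            clusterId,
            centralFetch: async token =>
            {
                // Stand-in for the sp_GetCurrentGenerationForCluster round-trip.
                long generationId = await FetchGenerationFromCentralDbAsync(clusterId, token);

                // Seal every successful fetch so the next cache miss finds a known-good
                // <cache-root>/<cluster>/<generationId>.db to fall back on.
                await _cache.SealAsync(new GenerationSnapshot
                {
                    ClusterId = clusterId,
                    GenerationId = generationId,
                    CachedAt = DateTime.UtcNow,
                    PayloadJson = await FetchGenerationPayloadAsync(clusterId, generationId, token),
                }, token);

                return (long?)generationId;
            },
            fromSnapshot: snap => (long?)snap.GenerationId,
            ct);
    }

    private Task<long> FetchGenerationFromCentralDbAsync(string clusterId, CancellationToken ct)
        => throw new NotImplementedException("central-DB round-trip elided in this sketch");

    private Task<string> FetchGenerationPayloadAsync(string clusterId, long generationId, CancellationToken ct)
        => throw new NotImplementedException("payload query elided in this sketch");
}
```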
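The /healthz hookup is then a single delegate. A wiring sketch under the same caveat: constructor argument lists other than staleConfigFlag / usingStaleConfig are elided (they predate this PR), and the response body is abbreviated to the one field this change touches.

```csharp
// Wiring sketch (Program.cs side). Elided pre-existing arguments are marked "...".
var flag = new StaleConfigFlag();

// Program.cs passes the flag into the application host:
//   var host = new OpcUaApplicationHost(..., staleConfigFlag: flag);

// Inside the host, the flag feeds the pre-existing Stream C hook:
//   var health = new HealthEndpointsHost(..., usingStaleConfig: () => flag.IsStale);

// After a sealed-cache fallback, /healthz then carries (other fields elided):
//   { "usingStaleConfig": true }
```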
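Finally, the periodic refresh could take roughly this shape. A sketch only, since the poller is explicitly Stream D.1.b follow-up: GenerationRefreshService, the 30-second interval, and ResolveCurrentGenerationAsync (from the sketch above) are all assumptions, not shipped code.

```csharp
using Microsoft.Extensions.Hosting;
using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;

// Hypothetical Stream D.1.b poller, not part of this PR.
public sealed class GenerationRefreshService : BackgroundService
{
    private readonly SealedBootstrapSketch _bootstrap;
    private readonly string _clusterId;

    public GenerationRefreshService(SealedBootstrapSketch bootstrap, string clusterId)
    {
        _bootstrap = bootstrap;
        _clusterId = clusterId;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(30));
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            try
            {
                // A successful round-trip seals any peer-published generation into this
                // node's cache and clears the stale flag; a failed one falls back to the
                // sealed snapshot and flips the flag, which /healthz then reports.
                await _bootstrap.ResolveCurrentGenerationAsync(_clusterId, stoppingToken);
            }
            catch (GenerationCacheUnavailableException)
            {
                // First-boot scenario (no snapshot + central down): keep polling
                // rather than crash the host; the next successful tick recovers.
            }
        }
    }
}
```

Registration would go through the factory overload, e.g. `services.AddHostedService(sp => new GenerationRefreshService(...))`, so the poller starts and stops with the host.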
@@ -0,0 +1,133 @@
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;

namespace ZB.MOM.WW.OtOpcUa.Server.Tests;

/// <summary>
/// Integration-style tests for the Phase 6.1 Stream D consumption hook — they don't touch
/// SQL Server (the real SealedBootstrap does, via sp_GetCurrentGenerationForCluster), but
/// they exercise ResilientConfigReader + GenerationSealedCache + StaleConfigFlag end-to-end
/// by simulating central-DB outcomes through a direct ReadAsync call.
/// </summary>
[Trait("Category", "Integration")]
public sealed class SealedBootstrapIntegrationTests : IDisposable
{
    private readonly string _root = Path.Combine(Path.GetTempPath(), $"otopcua-sealed-bootstrap-{Guid.NewGuid():N}");

    public void Dispose()
    {
        try
        {
            if (!Directory.Exists(_root)) return;
            foreach (var f in Directory.EnumerateFiles(_root, "*", SearchOption.AllDirectories))
                File.SetAttributes(f, FileAttributes.Normal);
            Directory.Delete(_root, recursive: true);
        }
        catch { /* best-effort */ }
    }

    [Fact]
    public async Task CentralDbSuccess_SealsSnapshot_And_FlagFresh()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10));

        // Simulate the SealedBootstrap fresh-path: central DB returns generation id 42; the
        // bootstrap seals it + ResilientConfigReader marks the flag fresh.
        var result = await reader.ReadAsync(
            "c-a",
            centralFetch: async _ =>
            {
                await cache.SealAsync(new GenerationSnapshot
                {
                    ClusterId = "c-a",
                    GenerationId = 42,
                    CachedAt = DateTime.UtcNow,
                    PayloadJson = "{\"gen\":42}",
                }, CancellationToken.None);
                return (long?)42;
            },
            fromSnapshot: snap => (long?)snap.GenerationId,
            CancellationToken.None);

        result.ShouldBe(42);
        flag.IsStale.ShouldBeFalse();
        cache.TryGetCurrentGenerationId("c-a").ShouldBe(42);
    }

    [Fact]
    public async Task CentralDbFails_FallsBackToSealedSnapshot_FlagStale()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10), retryCount: 0);

        // Seed a prior sealed snapshot (simulating a previous successful boot).
        await cache.SealAsync(new GenerationSnapshot
        {
            ClusterId = "c-a", GenerationId = 37, CachedAt = DateTime.UtcNow,
            PayloadJson = "{\"gen\":37}",
        });

        // Now simulate central DB down → fallback.
        var result = await reader.ReadAsync(
            "c-a",
            centralFetch: _ => throw new InvalidOperationException("SQL dead"),
            fromSnapshot: snap => (long?)snap.GenerationId,
            CancellationToken.None);

        result.ShouldBe(37);
        flag.IsStale.ShouldBeTrue("cache fallback flips the /healthz flag");
    }

    [Fact]
    public async Task NoSnapshot_AndCentralDown_Throws_ClearError()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10), retryCount: 0);

        await Should.ThrowAsync<GenerationCacheUnavailableException>(async () =>
        {
            await reader.ReadAsync<long?>(
                "c-a",
                centralFetch: _ => throw new InvalidOperationException("SQL dead"),
                fromSnapshot: snap => (long?)snap.GenerationId,
                CancellationToken.None);
        });
    }

    [Fact]
    public async Task SuccessfulBootstrap_AfterFailure_ClearsStaleFlag()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10), retryCount: 0);

        await cache.SealAsync(new GenerationSnapshot
        {
            ClusterId = "c-a", GenerationId = 1, CachedAt = DateTime.UtcNow, PayloadJson = "{}",
        });

        // Fallback serves snapshot → flag goes stale.
        await reader.ReadAsync("c-a",
            centralFetch: _ => throw new InvalidOperationException("dead"),
            fromSnapshot: s => (long?)s.GenerationId,
            CancellationToken.None);
        flag.IsStale.ShouldBeTrue();

        // Subsequent successful bootstrap clears it.
        await reader.ReadAsync("c-a",
            centralFetch: _ => ValueTask.FromResult((long?)5),
            fromSnapshot: s => (long?)s.GenerationId,
            CancellationToken.None);
        flag.IsStale.ShouldBeFalse("next successful DB round-trip clears the flag");
    }
}