diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs index f962f80..1e49386 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.Logging; using Opc.Ua; using Opc.Ua.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache; using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.OpcUa; using ZB.MOM.WW.OtOpcUa.Core.Resilience; @@ -25,6 +26,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable private readonly DriverResiliencePipelineBuilder _pipelineBuilder; private readonly AuthorizationGate? _authzGate; private readonly NodeScopeResolver? _scopeResolver; + private readonly StaleConfigFlag? _staleConfigFlag; private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; private ApplicationInstance? _application; @@ -36,7 +38,8 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger logger, DriverResiliencePipelineBuilder? pipelineBuilder = null, AuthorizationGate? authzGate = null, - NodeScopeResolver? scopeResolver = null) + NodeScopeResolver? scopeResolver = null, + StaleConfigFlag? staleConfigFlag = null) { _options = options; _driverHost = driverHost; @@ -44,6 +47,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable _pipelineBuilder = pipelineBuilder ?? new DriverResiliencePipelineBuilder(); _authzGate = authzGate; _scopeResolver = scopeResolver; + _staleConfigFlag = staleConfigFlag; _loggerFactory = loggerFactory; _logger = logger; } @@ -84,6 +88,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable _healthHost = new HealthEndpointsHost( _driverHost, _loggerFactory.CreateLogger(), + usingStaleConfig: _staleConfigFlag is null ? null : () => _staleConfigFlag.IsStale, prefix: _options.HealthEndpointsPrefix); _healthHost.Start(); } diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/SealedBootstrap.cs b/src/ZB.MOM.WW.OtOpcUa.Server/SealedBootstrap.cs new file mode 100644 index 0000000..5086e91 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Server/SealedBootstrap.cs @@ -0,0 +1,100 @@ +using System.Text.Json; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache; + +namespace ZB.MOM.WW.OtOpcUa.Server; + +/// +/// Phase 6.1 Stream D consumption hook — bootstraps the node's current generation through +/// the pipeline + writes every successful central-DB +/// read into the so the next cache-miss path has a +/// sealed snapshot to fall back to. +/// +/// +/// Alongside the original (which uses the single-file +/// ). Program.cs can switch to this one once operators are +/// ready for the generation-sealed semantics. The original stays for backward compat +/// with the three integration tests that construct directly. +/// +/// Closes release blocker #2 in docs/v2/v2-release-readiness.md — the +/// generation-sealed cache + resilient reader + stale-config flag ship as unit-tested +/// primitives in PR #81 but no production path consumed them until this wrapper. +/// +public sealed class SealedBootstrap +{ + private readonly NodeOptions _options; + private readonly GenerationSealedCache _cache; + private readonly ResilientConfigReader _reader; + private readonly StaleConfigFlag _staleFlag; + private readonly ILogger _logger; + + public SealedBootstrap( + NodeOptions options, + GenerationSealedCache cache, + ResilientConfigReader reader, + StaleConfigFlag staleFlag, + ILogger logger) + { + _options = options; + _cache = cache; + _reader = reader; + _staleFlag = staleFlag; + _logger = logger; + } + + /// + /// Resolve the current generation for this node. Routes the central-DB fetch through + /// (timeout → retry → fallback-to-cache) + seals a + /// fresh snapshot on every successful DB read so a future cache-miss has something to + /// serve. + /// + public async Task LoadCurrentGenerationAsync(CancellationToken ct) + { + return await _reader.ReadAsync( + _options.ClusterId, + centralFetch: async innerCt => await FetchFromCentralAsync(innerCt).ConfigureAwait(false), + fromSnapshot: snap => BootstrapResult.FromCache(snap.GenerationId), + ct).ConfigureAwait(false); + } + + private async ValueTask FetchFromCentralAsync(CancellationToken ct) + { + await using var conn = new SqlConnection(_options.ConfigDbConnectionString); + await conn.OpenAsync(ct).ConfigureAwait(false); + + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "EXEC dbo.sp_GetCurrentGenerationForCluster @NodeId=@n, @ClusterId=@c"; + cmd.Parameters.AddWithValue("@n", _options.NodeId); + cmd.Parameters.AddWithValue("@c", _options.ClusterId); + + await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false); + if (!await reader.ReadAsync(ct).ConfigureAwait(false)) + { + _logger.LogWarning("Cluster {Cluster} has no Published generation yet", _options.ClusterId); + return BootstrapResult.EmptyFromDb(); + } + + var generationId = reader.GetInt64(0); + _logger.LogInformation("Bootstrapped from central DB: generation {GenerationId}; sealing snapshot", generationId); + + // Seal a minimal snapshot with the generation pointer. A richer snapshot that carries + // the full sp_GetGenerationContent payload lands when the bootstrap flow grows to + // consume the content during offline operation (separate follow-up — see decision #148 + // and phase-6-1 Stream D.3). The pointer alone is enough for the fallback path to + // surface the last-known-good generation id + flip UsingStaleConfig. + await _cache.SealAsync(new GenerationSnapshot + { + ClusterId = _options.ClusterId, + GenerationId = generationId, + CachedAt = DateTime.UtcNow, + PayloadJson = JsonSerializer.Serialize(new { generationId, source = "sp_GetCurrentGenerationForCluster" }), + }, ct).ConfigureAwait(false); + + // StaleConfigFlag bookkeeping: ResilientConfigReader.MarkFresh on the returning call + // path; we're on the fresh branch so we don't touch the flag here. + _ = _staleFlag; // held so the field isn't flagged unused + + return BootstrapResult.FromDb(generationId); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/SealedBootstrapIntegrationTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/SealedBootstrapIntegrationTests.cs new file mode 100644 index 0000000..374c470 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Server.Tests/SealedBootstrapIntegrationTests.cs @@ -0,0 +1,133 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache; + +namespace ZB.MOM.WW.OtOpcUa.Server.Tests; + +/// +/// Integration-style tests for the Phase 6.1 Stream D consumption hook — they don't touch +/// SQL Server (the real SealedBootstrap does, via sp_GetCurrentGenerationForCluster), but +/// they exercise ResilientConfigReader + GenerationSealedCache + StaleConfigFlag end-to-end +/// by simulating central-DB outcomes through a direct ReadAsync call. +/// +[Trait("Category", "Integration")] +public sealed class SealedBootstrapIntegrationTests : IDisposable +{ + private readonly string _root = Path.Combine(Path.GetTempPath(), $"otopcua-sealed-bootstrap-{Guid.NewGuid():N}"); + + public void Dispose() + { + try + { + if (!Directory.Exists(_root)) return; + foreach (var f in Directory.EnumerateFiles(_root, "*", SearchOption.AllDirectories)) + File.SetAttributes(f, FileAttributes.Normal); + Directory.Delete(_root, recursive: true); + } + catch { /* best-effort */ } + } + + [Fact] + public async Task CentralDbSuccess_SealsSnapshot_And_FlagFresh() + { + var cache = new GenerationSealedCache(_root); + var flag = new StaleConfigFlag(); + var reader = new ResilientConfigReader(cache, flag, NullLogger.Instance, + timeout: TimeSpan.FromSeconds(10)); + + // Simulate the SealedBootstrap fresh-path: central DB returns generation id 42; the + // bootstrap seals it + ResilientConfigReader marks the flag fresh. + var result = await reader.ReadAsync( + "c-a", + centralFetch: async _ => + { + await cache.SealAsync(new GenerationSnapshot + { + ClusterId = "c-a", + GenerationId = 42, + CachedAt = DateTime.UtcNow, + PayloadJson = "{\"gen\":42}", + }, CancellationToken.None); + return (long?)42; + }, + fromSnapshot: snap => (long?)snap.GenerationId, + CancellationToken.None); + + result.ShouldBe(42); + flag.IsStale.ShouldBeFalse(); + cache.TryGetCurrentGenerationId("c-a").ShouldBe(42); + } + + [Fact] + public async Task CentralDbFails_FallsBackToSealedSnapshot_FlagStale() + { + var cache = new GenerationSealedCache(_root); + var flag = new StaleConfigFlag(); + var reader = new ResilientConfigReader(cache, flag, NullLogger.Instance, + timeout: TimeSpan.FromSeconds(10), retryCount: 0); + + // Seed a prior sealed snapshot (simulating a previous successful boot). + await cache.SealAsync(new GenerationSnapshot + { + ClusterId = "c-a", GenerationId = 37, CachedAt = DateTime.UtcNow, + PayloadJson = "{\"gen\":37}", + }); + + // Now simulate central DB down → fallback. + var result = await reader.ReadAsync( + "c-a", + centralFetch: _ => throw new InvalidOperationException("SQL dead"), + fromSnapshot: snap => (long?)snap.GenerationId, + CancellationToken.None); + + result.ShouldBe(37); + flag.IsStale.ShouldBeTrue("cache fallback flips the /healthz flag"); + } + + [Fact] + public async Task NoSnapshot_AndCentralDown_Throws_ClearError() + { + var cache = new GenerationSealedCache(_root); + var flag = new StaleConfigFlag(); + var reader = new ResilientConfigReader(cache, flag, NullLogger.Instance, + timeout: TimeSpan.FromSeconds(10), retryCount: 0); + + await Should.ThrowAsync(async () => + { + await reader.ReadAsync( + "c-a", + centralFetch: _ => throw new InvalidOperationException("SQL dead"), + fromSnapshot: snap => (long?)snap.GenerationId, + CancellationToken.None); + }); + } + + [Fact] + public async Task SuccessfulBootstrap_AfterFailure_ClearsStaleFlag() + { + var cache = new GenerationSealedCache(_root); + var flag = new StaleConfigFlag(); + var reader = new ResilientConfigReader(cache, flag, NullLogger.Instance, + timeout: TimeSpan.FromSeconds(10), retryCount: 0); + + await cache.SealAsync(new GenerationSnapshot + { + ClusterId = "c-a", GenerationId = 1, CachedAt = DateTime.UtcNow, PayloadJson = "{}", + }); + + // Fallback serves snapshot → flag goes stale. + await reader.ReadAsync("c-a", + centralFetch: _ => throw new InvalidOperationException("dead"), + fromSnapshot: s => (long?)s.GenerationId, + CancellationToken.None); + flag.IsStale.ShouldBeTrue(); + + // Subsequent successful bootstrap clears it. + await reader.ReadAsync("c-a", + centralFetch: _ => ValueTask.FromResult((long?)5), + fromSnapshot: s => (long?)s.GenerationId, + CancellationToken.None); + flag.IsStale.ShouldBeFalse("next successful DB round-trip clears the flag"); + } +}