diff --git a/src/ZB.MOM.WW.OtOpcUa.Configuration/LocalCache/GenerationSealedCache.cs b/src/ZB.MOM.WW.OtOpcUa.Configuration/LocalCache/GenerationSealedCache.cs
new file mode 100644
index 0000000..78408c9
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Configuration/LocalCache/GenerationSealedCache.cs
@@ -0,0 +1,177 @@
+using LiteDB;
+
+namespace ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
+
+/// <summary>
+/// Generation-sealed LiteDB cache per docs/v2/plan.md decision #148 and Phase 6.1
+/// Stream D.1. Each published generation writes one read-only LiteDB file under
+/// <c>&lt;cache-root&gt;/&lt;clusterId&gt;/&lt;generationId&gt;.db</c>. A per-cluster
+/// <c>CURRENT</c> text file holds the currently-active generation id; it is updated
+/// atomically (temp file + <see cref="File.Replace(string, string, string)"/>) only after
+/// the sealed file is fully written.
+/// </summary>
+/// <remarks>
+/// <para>Mixed-generation reads are impossible: any read opens the single file pointed to
+/// by <c>CURRENT</c>, which is a coherent snapshot. Corruption of the CURRENT file or the
+/// sealed file surfaces as <see cref="GenerationCacheUnavailableException"/> — the reader
+/// fails closed rather than silently falling back to an older generation. Recovery path
+/// is to re-fetch from the central DB (and the Phase 6.1 Stream C <c>UsingStaleConfig</c>
+/// flag goes true until that succeeds).</para>
+///
+/// <para>This cache is the read-path fallback when the central DB is unreachable. The
+/// write path (draft edits, publish) bypasses the cache and fails hard on DB outage per
+/// Stream D.2 — inconsistent writes are worse than a temporary inability to edit.</para>
+/// </remarks>
+public sealed class GenerationSealedCache
+{
+    private const string CollectionName = "generation";
+    private const string CurrentPointerFileName = "CURRENT";
+    private readonly string _cacheRoot;
+
+    /// <summary>Root directory for all clusters' sealed caches.</summary>
+    public string CacheRoot => _cacheRoot;
+
+    /// <summary>Create the cache rooted at <paramref name="cacheRoot"/>, creating the directory if needed.</summary>
+    public GenerationSealedCache(string cacheRoot)
+    {
+        ArgumentException.ThrowIfNullOrWhiteSpace(cacheRoot);
+        _cacheRoot = cacheRoot;
+        Directory.CreateDirectory(_cacheRoot);
+    }
+
+    /// <summary>
+    /// Seal a generation: write the snapshot to <c>&lt;cluster&gt;/&lt;generationId&gt;.db</c>,
+    /// mark the file read-only, then atomically publish the <c>CURRENT</c> pointer. Existing
+    /// sealed files for prior generations are preserved (prune separately).
+    /// </summary>
+    public async Task SealAsync(GenerationSnapshot snapshot, CancellationToken ct = default)
+    {
+        ArgumentNullException.ThrowIfNull(snapshot);
+        ct.ThrowIfCancellationRequested();
+
+        var clusterDir = Path.Combine(_cacheRoot, snapshot.ClusterId);
+        Directory.CreateDirectory(clusterDir);
+        var sealedPath = Path.Combine(clusterDir, $"{snapshot.GenerationId}.db");
+
+        if (File.Exists(sealedPath))
+        {
+            // Already sealed — idempotent. Treat as no-op + update pointer in case an earlier
+            // seal succeeded but the pointer update failed (crash recovery).
+            WritePointerAtomically(clusterDir, snapshot.GenerationId);
+            return;
+        }
+
+        var tmpPath = sealedPath + ".tmp";
+        try
+        {
+            // A crash during an earlier seal can leave a partially written temp file behind.
+            // LiteDB opens an existing file instead of truncating it, so remove any leftover
+            // first; File.Delete is a no-op when the file does not exist.
+            File.Delete(tmpPath);
+
+            using (var db = new LiteDatabase(new ConnectionString { Filename = tmpPath, Upgrade = false }))
+            {
+                var col = db.GetCollection<GenerationSnapshot>(CollectionName);
+                col.Insert(snapshot);
+            }
+
+            File.Move(tmpPath, sealedPath);
+            File.SetAttributes(sealedPath, File.GetAttributes(sealedPath) | FileAttributes.ReadOnly);
+            WritePointerAtomically(clusterDir, snapshot.GenerationId);
+        }
+        catch
+        {
+            try { if (File.Exists(tmpPath)) File.Delete(tmpPath); } catch { /* best-effort */ }
+            throw;
+        }
+
+        await Task.CompletedTask;
+    }
+
+    /// <summary>
+    /// Read the current sealed snapshot for <paramref name="clusterId"/>. Throws
+    /// <see cref="GenerationCacheUnavailableException"/> when the pointer is missing
+    /// (first-boot-no-snapshot case) or when the sealed file is corrupt. Never silently
+    /// falls back to a prior generation.
+    /// </summary>
+    public Task<GenerationSnapshot> ReadCurrentAsync(string clusterId, CancellationToken ct = default)
+    {
+        ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
+        ct.ThrowIfCancellationRequested();
+
+        var clusterDir = Path.Combine(_cacheRoot, clusterId);
+        var pointerPath = Path.Combine(clusterDir, CurrentPointerFileName);
+        if (!File.Exists(pointerPath))
+            throw new GenerationCacheUnavailableException(
+                $"No sealed generation for cluster '{clusterId}' at '{clusterDir}'. First-boot case: the central DB must be reachable at least once before cache fallback is possible.");
+
+        long generationId;
+        try
+        {
+            var text = File.ReadAllText(pointerPath).Trim();
+            generationId = long.Parse(text, System.Globalization.CultureInfo.InvariantCulture);
+        }
+        catch (Exception ex)
+        {
+            throw new GenerationCacheUnavailableException(
+                $"CURRENT pointer at '{pointerPath}' is corrupt or unreadable.", ex);
+        }
+
+        var sealedPath = Path.Combine(clusterDir, $"{generationId}.db");
+        if (!File.Exists(sealedPath))
+            throw new GenerationCacheUnavailableException(
+                $"CURRENT points at generation {generationId} but '{sealedPath}' is missing — fails closed rather than serving an older generation.");
+
+        try
+        {
+            using var db = new LiteDatabase(new ConnectionString { Filename = sealedPath, ReadOnly = true });
+            var col = db.GetCollection<GenerationSnapshot>(CollectionName);
+            var snapshot = col.FindAll().FirstOrDefault()
+                ?? throw new GenerationCacheUnavailableException(
+                    $"Sealed file '{sealedPath}' contains no snapshot row — file is corrupt.");
+            return Task.FromResult(snapshot);
+        }
+        catch (GenerationCacheUnavailableException) { throw; }
+        catch (Exception ex) when (ex is LiteException or InvalidDataException or IOException
+            or NotSupportedException or FormatException or UnauthorizedAccessException)
+        {
+            throw new GenerationCacheUnavailableException(
+                $"Sealed file '{sealedPath}' is corrupt or unreadable — fails closed rather than falling back to an older generation.", ex);
+        }
+    }
+
+    /// <summary>Return the generation id the CURRENT pointer points at, or null if no pointer exists.</summary>
+    public long? TryGetCurrentGenerationId(string clusterId)
+    {
+        ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
+        var pointerPath = Path.Combine(_cacheRoot, clusterId, CurrentPointerFileName);
+        if (!File.Exists(pointerPath)) return null;
+        try
+        {
+            return long.Parse(File.ReadAllText(pointerPath).Trim(), System.Globalization.CultureInfo.InvariantCulture);
+        }
+        catch
+        {
+            return null;
+        }
+    }
+
+    /// <summary>Write the <c>CURRENT</c> pointer to a temp file, then swap it into place with a single replace/move.</summary>
+    private static void WritePointerAtomically(string clusterDir, long generationId)
+    {
+        var pointerPath = Path.Combine(clusterDir, CurrentPointerFileName);
+        var tmpPath = pointerPath + ".tmp";
+        File.WriteAllText(tmpPath, generationId.ToString(System.Globalization.CultureInfo.InvariantCulture));
+        if (File.Exists(pointerPath))
+            File.Replace(tmpPath, pointerPath, destinationBackupFileName: null);
+        else
+            File.Move(tmpPath, pointerPath);
+    }
+}
+
+/// <summary>Sealed cache is unreachable — caller must fail closed.</summary>
+public sealed class GenerationCacheUnavailableException : Exception
+{
+    public GenerationCacheUnavailableException(string message) : base(message) { }
+    public GenerationCacheUnavailableException(string message, Exception inner) : base(message, inner) { }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Configuration/LocalCache/ResilientConfigReader.cs b/src/ZB.MOM.WW.OtOpcUa.Configuration/LocalCache/ResilientConfigReader.cs
new file mode 100644
index 0000000..7f81378
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Configuration/LocalCache/ResilientConfigReader.cs
@@ -0,0 +1,100 @@
+using Microsoft.Extensions.Logging;
+using Polly;
+using Polly.Retry;
+using Polly.Timeout;
+
+namespace ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
+
+/// <summary>
+/// Wraps a central-DB fetch function with Phase 6.1 Stream D.2 resilience:
+/// timeout 2 s → retry 3× jittered → fallback to sealed cache. Maintains the
+/// <see cref="StaleConfigFlag"/> — fresh on central-DB success, stale on cache fallback.
+/// </summary>
+/// <remarks>
+/// <para>Read-path only per plan. The write path (draft save, publish) bypasses this
+/// wrapper entirely and fails hard on DB outage so inconsistent writes never land.</para>
+///
+/// <para>Fallback is triggered by <b>any exception</b> the fetch raises (central-DB
+/// unreachable, SqlException, timeout). If the sealed cache also fails (no pointer,
+/// corrupt file, etc.), <see cref="GenerationCacheUnavailableException"/> surfaces — caller
+/// must fail the current request (InitializeAsync for a driver, etc.).</para>
+/// </remarks>
+public sealed class ResilientConfigReader
+{
+    private readonly GenerationSealedCache _cache;
+    private readonly StaleConfigFlag _staleFlag;
+    private readonly ResiliencePipeline _pipeline;
+    private readonly ILogger<ResilientConfigReader> _logger;
+
+    /// <summary>Create a reader over <paramref name="cache"/> with the given timeout and retry budget.</summary>
+    /// <param name="timeout">Timeout budget for the central-DB fetch, retries included; defaults to 2 s.</param>
+    /// <param name="retryCount">Maximum retry attempts; zero or negative disables retries.</param>
+    public ResilientConfigReader(
+        GenerationSealedCache cache,
+        StaleConfigFlag staleFlag,
+        ILogger<ResilientConfigReader> logger,
+        TimeSpan? timeout = null,
+        int retryCount = 3)
+    {
+        ArgumentNullException.ThrowIfNull(cache);
+        ArgumentNullException.ThrowIfNull(staleFlag);
+        ArgumentNullException.ThrowIfNull(logger);
+
+        _cache = cache;
+        _staleFlag = staleFlag;
+        _logger = logger;
+
+        // Polly runs strategies in registration order, outermost first: the timeout added
+        // here wraps the retry strategy below, so it bounds the whole retry sequence.
+        var builder = new ResiliencePipelineBuilder()
+            .AddTimeout(new TimeoutStrategyOptions { Timeout = timeout ?? TimeSpan.FromSeconds(2) });
+
+        if (retryCount > 0)
+        {
+            builder.AddRetry(new RetryStrategyOptions
+            {
+                MaxRetryAttempts = retryCount,
+                BackoffType = DelayBackoffType.Exponential,
+                UseJitter = true,
+                Delay = TimeSpan.FromMilliseconds(100),
+                MaxDelay = TimeSpan.FromSeconds(1),
+                ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
+            });
+        }
+
+        _pipeline = builder.Build();
+    }
+
+    /// <summary>
+    /// Execute <paramref name="centralFetch"/> through the resilience pipeline. On full failure
+    /// (post-retry), reads the sealed cache for <paramref name="clusterId"/> and passes the
+    /// snapshot to <paramref name="fromSnapshot"/> to extract the requested shape.
+    /// </summary>
+    public async ValueTask<T> ReadAsync<T>(
+        string clusterId,
+        Func<CancellationToken, ValueTask<T>> centralFetch,
+        Func<GenerationSnapshot, T> fromSnapshot,
+        CancellationToken cancellationToken)
+    {
+        ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
+        ArgumentNullException.ThrowIfNull(centralFetch);
+        ArgumentNullException.ThrowIfNull(fromSnapshot);
+
+        try
+        {
+            var result = await _pipeline.ExecuteAsync(centralFetch, cancellationToken).ConfigureAwait(false);
+            _staleFlag.MarkFresh();
+            return result;
+        }
+        catch (Exception ex) when (ex is not OperationCanceledException)
+        {
+            _logger.LogWarning(ex, "Central-DB read failed after retries; falling back to sealed cache for cluster {ClusterId}", clusterId);
+            // GenerationCacheUnavailableException surfaces intentionally — fails the caller's
+            // operation. StaleConfigFlag stays unchanged; the flag only flips when we actually
+            // served a cache snapshot.
+            var snapshot = await _cache.ReadCurrentAsync(clusterId, cancellationToken).ConfigureAwait(false);
+            _staleFlag.MarkStale();
+            return fromSnapshot(snapshot);
+        }
+    }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Configuration/LocalCache/StaleConfigFlag.cs b/src/ZB.MOM.WW.OtOpcUa.Configuration/LocalCache/StaleConfigFlag.cs
new file mode 100644
index 0000000..35e7e23
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Configuration/LocalCache/StaleConfigFlag.cs
@@ -0,0 +1,20 @@
+namespace ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
+
+/// <summary>
+/// Thread-safe <c>UsingStaleConfig</c> signal per Phase 6.1 Stream D.3. Flips true whenever
+/// a read falls back to a sealed cache snapshot; flips false on the next successful central-DB
+/// round-trip. Surfaced on <c>/healthz</c> body and on the Admin <c>/hosts</c> page.
+/// </summary>
+public sealed class StaleConfigFlag
+{
+    private int _stale;
+
+    /// <summary>True when the last config read was served from the sealed cache, not the central DB.</summary>
+    public bool IsStale => Volatile.Read(ref _stale) != 0;
+
+    /// <summary>Mark the current config as stale (a read fell back to the cache).</summary>
+    public void MarkStale() => Volatile.Write(ref _stale, 1);
+
+    /// <summary>Mark the current config as fresh (a central-DB read succeeded).</summary>
+    public void MarkFresh() => Volatile.Write(ref _stale, 0);
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Configuration/ZB.MOM.WW.OtOpcUa.Configuration.csproj b/src/ZB.MOM.WW.OtOpcUa.Configuration/ZB.MOM.WW.OtOpcUa.Configuration.csproj
index 33a1e4a..1d64ef1 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Configuration/ZB.MOM.WW.OtOpcUa.Configuration.csproj
+++ b/src/ZB.MOM.WW.OtOpcUa.Configuration/ZB.MOM.WW.OtOpcUa.Configuration.csproj
@@ -19,7 +19,9 @@ runtime; build; native; contentfiles; analyzers; buildtransitive + +
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests/GenerationSealedCacheTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests/GenerationSealedCacheTests.cs
new file mode 100644
index 0000000..789581f
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests/GenerationSealedCacheTests.cs
@@ -0,0 +1,157 @@
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
+
+namespace ZB.MOM.WW.OtOpcUa.Configuration.Tests;
+
+[Trait("Category", "Unit")]
+public sealed class GenerationSealedCacheTests : IDisposable
+{
+    private readonly string _root = Path.Combine(Path.GetTempPath(), $"otopcua-sealed-{Guid.NewGuid():N}");
+
+    public void Dispose()
+    {
+        try
+        {
+            if (!Directory.Exists(_root)) return;
+            // Remove ReadOnly attribute first so Directory.Delete can clean sealed files.
+            foreach (var f in Directory.EnumerateFiles(_root, "*", SearchOption.AllDirectories))
+                File.SetAttributes(f, FileAttributes.Normal);
+            Directory.Delete(_root, recursive: true);
+        }
+        catch { /* best-effort cleanup */ }
+    }
+
+    private GenerationSnapshot MakeSnapshot(string clusterId, long generationId, string payload = "{\"sample\":true}") =>
+        new()
+        {
+            ClusterId = clusterId,
+            GenerationId = generationId,
+            CachedAt = DateTime.UtcNow,
+            PayloadJson = payload,
+        };
+
+    [Fact]
+    public async Task FirstBoot_NoSnapshot_ReadThrows()
+    {
+        var cache = new GenerationSealedCache(_root);
+
+        await Should.ThrowAsync<GenerationCacheUnavailableException>(
+            () => cache.ReadCurrentAsync("cluster-a"));
+    }
+
+    [Fact]
+    public async Task SealThenRead_RoundTrips()
+    {
+        var cache = new GenerationSealedCache(_root);
+        var snapshot = MakeSnapshot("cluster-a", 42, "{\"hello\":\"world\"}");
+
+        await cache.SealAsync(snapshot);
+
+        var read = await cache.ReadCurrentAsync("cluster-a");
+        read.GenerationId.ShouldBe(42);
+        read.ClusterId.ShouldBe("cluster-a");
+        read.PayloadJson.ShouldBe("{\"hello\":\"world\"}");
+    }
+
+    [Fact]
+    public async Task SealedFile_IsReadOnly_OnDisk()
+    {
+        var cache = new GenerationSealedCache(_root);
+        await cache.SealAsync(MakeSnapshot("cluster-a", 5));
+
+        var sealedPath = Path.Combine(_root, "cluster-a", "5.db");
+        File.Exists(sealedPath).ShouldBeTrue();
+        var attrs = File.GetAttributes(sealedPath);
+        attrs.HasFlag(FileAttributes.ReadOnly).ShouldBeTrue("sealed file must be read-only");
+    }
+
+    [Fact]
+    public async Task SealingTwoGenerations_PointerAdvances_ToLatest()
+    {
+        var cache = new GenerationSealedCache(_root);
+        await cache.SealAsync(MakeSnapshot("cluster-a", 1));
+        await cache.SealAsync(MakeSnapshot("cluster-a", 2));
+
+        cache.TryGetCurrentGenerationId("cluster-a").ShouldBe(2);
+        var read = await cache.ReadCurrentAsync("cluster-a");
+        read.GenerationId.ShouldBe(2);
+    }
+
+    [Fact]
+    public async Task PriorGenerationFile_Survives_AfterNewSeal()
+    {
+        var cache = new GenerationSealedCache(_root);
+        await cache.SealAsync(MakeSnapshot("cluster-a", 1));
+        await cache.SealAsync(MakeSnapshot("cluster-a", 2));
+
+        File.Exists(Path.Combine(_root, "cluster-a", "1.db")).ShouldBeTrue(
+            "prior generations preserved for audit; pruning is separate");
+        File.Exists(Path.Combine(_root, "cluster-a", "2.db")).ShouldBeTrue();
+    }
+
+    [Fact]
+    public async Task CorruptSealedFile_ReadFailsClosed()
+    {
+        var cache = new GenerationSealedCache(_root);
+        await cache.SealAsync(MakeSnapshot("cluster-a", 7));
+
+        // Corrupt the sealed file: clear read-only, truncate, leave pointer intact.
+        var sealedPath = Path.Combine(_root, "cluster-a", "7.db");
+        File.SetAttributes(sealedPath, FileAttributes.Normal);
+        File.WriteAllBytes(sealedPath, [0x00, 0x01, 0x02]);
+
+        await Should.ThrowAsync<GenerationCacheUnavailableException>(
+            () => cache.ReadCurrentAsync("cluster-a"));
+    }
+
+    [Fact]
+    public async Task MissingSealedFile_ReadFailsClosed()
+    {
+        var cache = new GenerationSealedCache(_root);
+        await cache.SealAsync(MakeSnapshot("cluster-a", 3));
+
+        // Delete the sealed file but leave the pointer — corruption scenario.
+        var sealedPath = Path.Combine(_root, "cluster-a", "3.db");
+        File.SetAttributes(sealedPath, FileAttributes.Normal);
+        File.Delete(sealedPath);
+
+        await Should.ThrowAsync<GenerationCacheUnavailableException>(
+            () => cache.ReadCurrentAsync("cluster-a"));
+    }
+
+    [Fact]
+    public async Task CorruptPointerFile_ReadFailsClosed()
+    {
+        var cache = new GenerationSealedCache(_root);
+        await cache.SealAsync(MakeSnapshot("cluster-a", 9));
+
+        var pointerPath = Path.Combine(_root, "cluster-a", "CURRENT");
+        File.WriteAllText(pointerPath, "not-a-number");
+
+        await Should.ThrowAsync<GenerationCacheUnavailableException>(
+            () => cache.ReadCurrentAsync("cluster-a"));
+    }
+
+    [Fact]
+    public async Task SealSameGenerationTwice_IsIdempotent()
+    {
+        var cache = new GenerationSealedCache(_root);
+        await cache.SealAsync(MakeSnapshot("cluster-a", 11));
+        await cache.SealAsync(MakeSnapshot("cluster-a", 11, "{\"v\":2}"));
+
+        var read = await cache.ReadCurrentAsync("cluster-a");
+        read.PayloadJson.ShouldBe("{\"sample\":true}", "sealed file is immutable; second seal no-ops");
+    }
+
+    [Fact]
+    public async Task IndependentClusters_DoNotInterfere()
+    {
+        var cache = new GenerationSealedCache(_root);
+        await cache.SealAsync(MakeSnapshot("cluster-a", 1));
+        await cache.SealAsync(MakeSnapshot("cluster-b", 10));
+
+        (await cache.ReadCurrentAsync("cluster-a")).GenerationId.ShouldBe(1);
+        (await cache.ReadCurrentAsync("cluster-b")).GenerationId.ShouldBe(10);
+    }
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests/ResilientConfigReaderTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests/ResilientConfigReaderTests.cs
new file mode 100644
index 0000000..7e05565
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Configuration.Tests/ResilientConfigReaderTests.cs
@@ -0,0 +1,154 @@
+using Microsoft.Extensions.Logging.Abstractions;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
+
+namespace ZB.MOM.WW.OtOpcUa.Configuration.Tests;
+
+[Trait("Category", "Unit")]
+public sealed class ResilientConfigReaderTests : IDisposable
+{
+    private readonly string _root = Path.Combine(Path.GetTempPath(), $"otopcua-reader-{Guid.NewGuid():N}");
+
+    public void Dispose()
+    {
+        try
+        {
+            if (!Directory.Exists(_root)) return;
+            foreach (var f in Directory.EnumerateFiles(_root, "*", SearchOption.AllDirectories))
+                File.SetAttributes(f, FileAttributes.Normal);
+            Directory.Delete(_root, recursive: true);
+        }
+        catch { /* best-effort */ }
+    }
+
+    [Fact]
+    public async Task CentralDbSucceeds_ReturnsValue_MarksFresh()
+    {
+        var cache = new GenerationSealedCache(_root);
+        var flag = new StaleConfigFlag();
+        flag.MarkStale(); // pre-existing stale state
+        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance);
+
+        var result = await reader.ReadAsync(
+            "cluster-a",
+            _ => ValueTask.FromResult("fresh-from-db"),
+            _ => "from-cache",
+            CancellationToken.None);
+
+        result.ShouldBe("fresh-from-db");
+        flag.IsStale.ShouldBeFalse("successful central-DB read clears stale flag");
+    }
+
+    [Fact]
+    public async Task CentralDbFails_ExhaustsRetries_FallsBackToCache_MarksStale()
+    {
+        var cache = new GenerationSealedCache(_root);
+        await cache.SealAsync(new GenerationSnapshot
+        {
+            ClusterId = "cluster-a", GenerationId = 99, CachedAt = DateTime.UtcNow,
+            PayloadJson = "{\"cached\":true}",
+        });
+        var flag = new StaleConfigFlag();
+        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
+            timeout: TimeSpan.FromSeconds(10), retryCount: 2);
+        var attempts = 0;
+
+        var result = await reader.ReadAsync(
+            "cluster-a",
+            _ =>
+            {
+                attempts++;
+                throw new InvalidOperationException("SQL dead");
+#pragma warning disable CS0162
+                return ValueTask.FromResult("never");
+#pragma warning restore CS0162
+            },
+            snap => snap.PayloadJson,
+            CancellationToken.None);
+
+        attempts.ShouldBe(3, "1 initial + 2 retries = 3 attempts");
+        result.ShouldBe("{\"cached\":true}");
+        flag.IsStale.ShouldBeTrue("cache fallback flips stale flag true");
+    }
+
+    [Fact]
+    public async Task CentralDbFails_AndCacheAlsoUnavailable_Throws()
+    {
+        var cache = new GenerationSealedCache(_root);
+        var flag = new StaleConfigFlag();
+        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
+            timeout: TimeSpan.FromSeconds(10), retryCount: 0);
+
+        await Should.ThrowAsync<GenerationCacheUnavailableException>(async () =>
+        {
+            await reader.ReadAsync(
+                "cluster-a",
+                _ => throw new InvalidOperationException("SQL dead"),
+                _ => "never",
+                CancellationToken.None);
+        });
+
+        flag.IsStale.ShouldBeFalse("no snapshot ever served, so flag stays whatever it was");
+    }
+
+    [Fact]
+    public async Task Cancellation_NotRetried()
+    {
+        var cache = new GenerationSealedCache(_root);
+        var flag = new StaleConfigFlag();
+        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
+            timeout: TimeSpan.FromSeconds(10), retryCount: 5);
+        using var cts = new CancellationTokenSource();
+        cts.Cancel();
+        var attempts = 0;
+
+        await Should.ThrowAsync<OperationCanceledException>(async () =>
+        {
+            await reader.ReadAsync(
+                "cluster-a",
+                ct =>
+                {
+                    attempts++;
+                    ct.ThrowIfCancellationRequested();
+                    return ValueTask.FromResult("ok");
+                },
+                _ => "cache",
+                cts.Token);
+        });
+
+        attempts.ShouldBeLessThanOrEqualTo(1);
+    }
+}
+
+[Trait("Category", "Unit")]
+public sealed class StaleConfigFlagTests
+{
+    [Fact]
+    public void Default_IsFresh()
+    {
+        new StaleConfigFlag().IsStale.ShouldBeFalse();
+    }
+
+    [Fact]
+    public void MarkStale_ThenFresh_Toggles()
+    {
+        var flag = new StaleConfigFlag();
+        flag.MarkStale();
+        flag.IsStale.ShouldBeTrue();
+        flag.MarkFresh();
+        flag.IsStale.ShouldBeFalse();
+    }
+
+    [Fact]
+    public void ConcurrentWrites_Converge()
+    {
+        var flag = new StaleConfigFlag();
+        Parallel.For(0, 1000, i =>
+        {
+            if (i % 2 == 0) flag.MarkStale(); else flag.MarkFresh();
+        });
+        flag.MarkFresh();
+        flag.IsStale.ShouldBeFalse();
+    }
+}