Closes Stream D per docs/v2/implementation/phase-6-1-resilience-and-observability.md. New Configuration.LocalCache types (alongside the existing single-file LiteDbConfigCache): - GenerationSealedCache — file-per-generation sealed snapshots per decision #148. Each SealAsync writes <cache-root>/<clusterId>/<generationId>.db as a read-only LiteDB file, then atomically publishes the CURRENT pointer via temp-file + File.Replace. Prior-generation files stay on disk for audit. Mixed-generation reads are structurally impossible: ReadCurrentAsync opens the single file named by CURRENT. Corruption of the pointer or the sealed file raises GenerationCacheUnavailableException — fails closed, never falls back silently to an older generation. TryGetCurrentGenerationId returns the pointer value or null for diagnostics. - StaleConfigFlag — thread-safe (Volatile.Read/Write) bool. MarkStale when a read fell back to the cache; MarkFresh when a central-DB read succeeded. Surfaced on /healthz body and Admin /hosts (Stream C wiring already in place). - ResilientConfigReader — wraps a central-DB fetch function with the Stream D.2 pipeline: timeout 2 s → retry N× jittered (skipped when retryCount=0) → fallback to the sealed cache. Toggles StaleConfigFlag per outcome. Read path only — the write path is expected to bypass this wrapper and fail hard on DB outage so inconsistent writes never land. Cancellation passes through and is NOT retried. Configuration.csproj: - Polly.Core 8.6.6 + Microsoft.Extensions.Logging.Abstractions added. Tests (17 new, all pass): - GenerationSealedCacheTests (10): first-boot-no-snapshot throws GenerationCacheUnavailableException (D.4 scenario C), seal-then-read round trip, sealed file is ReadOnly on disk, pointer advances to latest, prior generation file preserved, corrupt sealed file fails closed, missing sealed file fails closed, corrupt pointer fails closed (D.4 scenario B), same generation sealed twice is idempotent, independent clusters don't interfere. 
- ResilientConfigReaderTests (4): central-DB success returns value + marks fresh; central-DB failure exhausts retries + falls back to cache + marks stale (D.4 scenario A); central-DB + cache both unavailable throws; cancellation not retried. - StaleConfigFlagTests (3): default is fresh; toggles; concurrent writes converge. Full solution dotnet test: 1033 passing (baseline 906, +127 net across Phase 6.1 Streams A/B/C/D). Pre-existing Client.CLI Subscribe flake unchanged. Integration into Configuration read paths (DriverInstance enumeration, LdapGroupRoleMapping fetches, etc.) + the sp_PublishGeneration hook that writes sealed files lands in the Phase 6.1 Stream E / Admin-refresh PR where the DB integration surfaces are already touched. Existing LiteDbConfigCache continues serving its single-file role for the NodeBootstrap path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
171 lines
7.7 KiB
C#
171 lines
7.7 KiB
C#
using LiteDB;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
|
|
|
|
/// <summary>
/// Generation-sealed LiteDB cache per <c>docs/v2/plan.md</c> decision #148 and Phase 6.1
/// Stream D.1. Each published generation writes one <b>read-only</b> LiteDB file under
/// <c>{cache-root}/{clusterId}/{generationId}.db</c>. A per-cluster
/// <c>CURRENT</c> text file holds the currently-active generation id; it is updated
/// atomically (temp file + <see cref="File.Replace(string, string, string?)"/>) only after
/// the sealed file is fully written.
/// </summary>
/// <remarks>
/// <para>Mixed-generation reads are impossible: any read opens the single file pointed to
/// by <c>CURRENT</c>, which is a coherent snapshot. Corruption of the CURRENT file or the
/// sealed file surfaces as <see cref="GenerationCacheUnavailableException"/> — the reader
/// fails closed rather than silently falling back to an older generation. Recovery path
/// is to re-fetch from the central DB (and the Phase 6.1 Stream C <c>UsingStaleConfig</c>
/// flag goes true until that succeeds).</para>
///
/// <para>This cache is the read-path fallback when the central DB is unreachable. The
/// write path (draft edits, publish) bypasses the cache and fails hard on DB outage per
/// Stream D.2 — inconsistent writes are worse than a temporary inability to edit.</para>
/// </remarks>
public sealed class GenerationSealedCache
{
    /// <summary>Name of the LiteDB collection that holds the single snapshot row of a sealed file.</summary>
    private const string CollectionName = "generation";

    /// <summary>File name of the per-cluster pointer that stores the active generation id as text.</summary>
    private const string CurrentPointerFileName = "CURRENT";

    private readonly string _cacheRoot;

    /// <summary>Root directory for all clusters' sealed caches.</summary>
    public string CacheRoot => _cacheRoot;

    /// <summary>Creates the cache and ensures <paramref name="cacheRoot"/> exists on disk.</summary>
    /// <param name="cacheRoot">Directory under which one sub-directory per cluster is created.</param>
    /// <exception cref="ArgumentException"><paramref name="cacheRoot"/> is null, empty or whitespace.</exception>
    public GenerationSealedCache(string cacheRoot)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(cacheRoot);
        _cacheRoot = cacheRoot;
        Directory.CreateDirectory(_cacheRoot);
    }

    /// <summary>
    /// Seal a generation: write the snapshot to <c>{cluster}/{generationId}.db</c>,
    /// mark the file read-only, then atomically publish the <c>CURRENT</c> pointer. Existing
    /// sealed files for prior generations are preserved (prune separately).
    /// </summary>
    /// <param name="snapshot">Snapshot to persist; its cluster id and generation id determine the file path.</param>
    /// <param name="ct">Checked once before any file-system work starts.</param>
    /// <exception cref="ArgumentNullException"><paramref name="snapshot"/> is null.</exception>
    /// <exception cref="ArgumentException">The snapshot's cluster id is null, empty or whitespace.</exception>
    /// <remarks>
    /// Sealing a generation whose file already exists does not rewrite the file; it only
    /// re-asserts the read-only attribute and re-publishes the pointer.
    /// </remarks>
    public async Task SealAsync(GenerationSnapshot snapshot, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(snapshot);

        // The read methods reject blank cluster ids, so a snapshot sealed under one could never
        // be read back — and Path.Combine with an empty segment would write into the cache root.
        if (string.IsNullOrWhiteSpace(snapshot.ClusterId))
        {
            throw new ArgumentException("Snapshot must carry a non-blank cluster id.", nameof(snapshot));
        }

        ct.ThrowIfCancellationRequested();

        var clusterDir = Path.Combine(_cacheRoot, snapshot.ClusterId);
        Directory.CreateDirectory(clusterDir);
        var sealedPath = Path.Combine(clusterDir, $"{snapshot.GenerationId}.db");

        if (File.Exists(sealedPath))
        {
            // Already sealed — idempotent. An earlier seal may have moved the file into place
            // and then failed before marking it read-only or before updating the pointer
            // (crash recovery), so both of those steps are repeated here.
            MarkReadOnly(sealedPath);
            WritePointerAtomically(clusterDir, snapshot.GenerationId);
            return;
        }

        var tmpPath = sealedPath + ".tmp";

        // A temp file left behind by an interrupted seal may be partially written or may already
        // hold a row; always start from a fresh database. File.Delete is a no-op when the file
        // does not exist.
        File.Delete(tmpPath);

        try
        {
            using (var db = new LiteDatabase(new ConnectionString { Filename = tmpPath, Upgrade = false }))
            {
                var col = db.GetCollection<GenerationSnapshot>(CollectionName);
                col.Insert(snapshot);
            }

            // Publish order matters: the sealed file must be complete and in place before the
            // pointer is allowed to reference it.
            File.Move(tmpPath, sealedPath);
            MarkReadOnly(sealedPath);
            WritePointerAtomically(clusterDir, snapshot.GenerationId);
        }
        catch
        {
            try { if (File.Exists(tmpPath)) File.Delete(tmpPath); } catch { /* best-effort */ }
            throw;
        }

        // All work above is synchronous; this await only keeps the async method warning-free.
        await Task.CompletedTask;
    }

    /// <summary>
    /// Read the current sealed snapshot for <paramref name="clusterId"/>. Throws
    /// <see cref="GenerationCacheUnavailableException"/> when the pointer is missing
    /// (first-boot-no-snapshot case), when the pointer cannot be read or parsed, when the
    /// sealed file it names is missing, or when the sealed file is corrupt or unreadable.
    /// Never silently falls back to a prior generation.
    /// </summary>
    /// <param name="clusterId">Cluster whose <c>CURRENT</c> pointer is followed.</param>
    /// <param name="ct">Checked once before any file-system work starts.</param>
    /// <returns>An already-completed task holding the first snapshot row of the sealed file.</returns>
    /// <exception cref="ArgumentException"><paramref name="clusterId"/> is null, empty or whitespace.</exception>
    /// <exception cref="GenerationCacheUnavailableException">The cache cannot supply a snapshot.</exception>
    /// <remarks>
    /// The method is not <c>async</c>: the read runs synchronously and failures are thrown from
    /// the call itself rather than delivered through the returned task.
    /// </remarks>
    public Task<GenerationSnapshot> ReadCurrentAsync(string clusterId, CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
        ct.ThrowIfCancellationRequested();

        var clusterDir = Path.Combine(_cacheRoot, clusterId);
        var pointerPath = Path.Combine(clusterDir, CurrentPointerFileName);
        if (!File.Exists(pointerPath))
        {
            throw new GenerationCacheUnavailableException(
                $"No sealed generation for cluster '{clusterId}' at '{clusterDir}'. First-boot case: the central DB must be reachable at least once before cache fallback is possible.");
        }

        long generationId;
        try
        {
            var text = File.ReadAllText(pointerPath).Trim();
            generationId = long.Parse(text, System.Globalization.CultureInfo.InvariantCulture);
        }
        catch (Exception ex)
        {
            // Any failure to read or parse the pointer is reported as cache-unavailable, with
            // the original exception preserved as the inner exception.
            throw new GenerationCacheUnavailableException(
                $"CURRENT pointer at '{pointerPath}' is corrupt or unreadable.", ex);
        }

        var sealedPath = Path.Combine(clusterDir, $"{generationId}.db");
        if (!File.Exists(sealedPath))
        {
            throw new GenerationCacheUnavailableException(
                $"CURRENT points at generation {generationId} but '{sealedPath}' is missing — fails closed rather than serving an older generation.");
        }

        try
        {
            using var db = new LiteDatabase(new ConnectionString { Filename = sealedPath, ReadOnly = true });
            var col = db.GetCollection<GenerationSnapshot>(CollectionName);
            var snapshot = col.FindAll().FirstOrDefault()
                ?? throw new GenerationCacheUnavailableException(
                    $"Sealed file '{sealedPath}' contains no snapshot row — file is corrupt.");
            return Task.FromResult(snapshot);
        }
        catch (GenerationCacheUnavailableException) { throw; }
        // UnauthorizedAccessException does not derive from IOException; without it an unreadable
        // sealed file would escape as a raw exception instead of the documented fail-closed type.
        catch (Exception ex) when (ex is LiteException or InvalidDataException or IOException
                                       or UnauthorizedAccessException
                                       or NotSupportedException or FormatException)
        {
            throw new GenerationCacheUnavailableException(
                $"Sealed file '{sealedPath}' is corrupt or unreadable — fails closed rather than falling back to an older generation.", ex);
        }
    }

    /// <summary>
    /// Return the generation id the <c>CURRENT</c> pointer points at, or null if no pointer
    /// exists or the pointer cannot be read or parsed.
    /// </summary>
    /// <param name="clusterId">Cluster whose pointer is inspected.</param>
    /// <exception cref="ArgumentException"><paramref name="clusterId"/> is null, empty or whitespace.</exception>
    public long? TryGetCurrentGenerationId(string clusterId)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);

        var pointerPath = Path.Combine(_cacheRoot, clusterId, CurrentPointerFileName);
        if (!File.Exists(pointerPath))
        {
            return null;
        }

        string text;
        try
        {
            text = File.ReadAllText(pointerPath);
        }
        catch (Exception)
        {
            // Deliberate best-effort: an unreadable pointer is reported as "no value" rather
            // than as an error.
            return null;
        }

        // TryParse covers the expected "corrupt pointer" case without using exceptions for
        // control flow; NumberStyles.Integer matches what long.Parse accepts by default.
        return long.TryParse(
                text.Trim(),
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var generationId)
            ? generationId
            : (long?)null;
    }

    /// <summary>Adds the read-only attribute to <paramref name="path"/>, keeping its other attributes.</summary>
    private static void MarkReadOnly(string path)
        => File.SetAttributes(path, File.GetAttributes(path) | FileAttributes.ReadOnly);

    /// <summary>
    /// Writes <paramref name="generationId"/> to a temp file next to the pointer and then swaps
    /// it into place, so the pointer file is replaced as a whole rather than rewritten in place.
    /// </summary>
    /// <param name="clusterDir">Existing per-cluster directory that holds the pointer.</param>
    /// <param name="generationId">Generation id to publish, stored as invariant-culture text.</param>
    private static void WritePointerAtomically(string clusterDir, long generationId)
    {
        var pointerPath = Path.Combine(clusterDir, CurrentPointerFileName);
        var tmpPath = pointerPath + ".tmp";
        File.WriteAllText(tmpPath, generationId.ToString(System.Globalization.CultureInfo.InvariantCulture));

        // File.Replace needs an existing destination, so the very first publish uses a plain move.
        if (File.Exists(pointerPath))
        {
            File.Replace(tmpPath, pointerPath, destinationBackupFileName: null);
        }
        else
        {
            File.Move(tmpPath, pointerPath);
        }
    }
}
|
|
|
|
/// <summary>
/// Signals that the sealed generation cache cannot be reached or used; the caller is
/// expected to fail closed.
/// </summary>
public sealed class GenerationCacheUnavailableException : Exception
{
    /// <summary>Creates the exception with a description of the failure.</summary>
    /// <param name="message">Text describing why the cache is unavailable.</param>
    public GenerationCacheUnavailableException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a description and the underlying cause.</summary>
    /// <param name="message">Text describing why the cache is unavailable.</param>
    /// <param name="inner">Exception that triggered this failure; exposed as the inner exception.</param>
    public GenerationCacheUnavailableException(string message, Exception inner)
        : base(message, inner)
    {
    }
}
|