64e3fbe035
v2-ci / build (push) Failing after 1m43s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
Adds <summary>, <param>, <typeparam>, and <inheritdoc/> tags to public members surfaced by commentchecker — resolves 5,847 of 5,869 issues (99.6%) across three /fixdocs passes.
122 lines
5.2 KiB
C#
122 lines
5.2 KiB
C#
using LiteDB;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
|
|
|
|
/// <summary>
/// LiteDB-backed <see cref="ILocalConfigCache"/>. One file per node (default
/// <c>config_cache.db</c>); every snapshot is stored as a document in the single
/// <c>generations</c> collection. Corruption detected while opening the file surfaces as
/// <see cref="LocalConfigCacheCorruptException"/> from the constructor — callers should
/// delete the file and re-fetch from the central DB (decision #80).
/// </summary>
/// <remarks>
/// NOTE(review): only the constructor translates LiteDB failures into
/// <see cref="LocalConfigCacheCorruptException"/>; the read/write methods let the underlying
/// exception propagate unchanged. Confirm whether callers expect translation on read as well.
/// </remarks>
public sealed class LiteDbConfigCache : ILocalConfigCache, IDisposable
{
    private const string CollectionName = "generations";

    private readonly LiteDatabase _db;
    private readonly ILiteCollection<GenerationSnapshot> _col;

    // PutAsync is a find-then-insert/update; without serialization, two concurrent puts for the
    // same (ClusterId, GenerationId) can both observe `existing is null` and both Insert,
    // producing duplicate rows (Configuration-005). Serialize writes through this semaphore so
    // the read-modify-write block is atomic for a given instance. LiteDB's own locking covers
    // each individual call, not the window between the find and the insert.
    private readonly SemaphoreSlim _writeGate = new(initialCount: 1, maxCount: 1);

    /// <summary>Initializes a new instance of the <see cref="LiteDbConfigCache"/> class.</summary>
    /// <param name="dbPath">Path to the LiteDB database file.</param>
    /// <exception cref="LocalConfigCacheCorruptException">
    /// Opening or probing the database failed with one of the exception types treated as
    /// "corrupt or unreadable" (LiteDB, I/O, format and access errors).
    /// </exception>
    public LiteDbConfigCache(string dbPath)
    {
        // LiteDB can be tolerant of header-only corruption at construction time (it may overwrite
        // the header and "recover"), so we force a write + read probe to fail fast on real corruption.
        LiteDatabase? db = null;
        try
        {
            db = new LiteDatabase(new ConnectionString { Filename = dbPath, Upgrade = true });
            var col = db.GetCollection<GenerationSnapshot>(CollectionName);
            col.EnsureIndex(s => s.ClusterId);
            col.EnsureIndex(s => s.GenerationId);
            _ = col.Count();

            _db = db;
            _col = col;
        }
        catch (Exception ex)
        {
            // Always release the file handle, whatever failed: a half-constructed instance can
            // never be disposed by the caller, and a lingering handle would block the
            // "delete the file and refetch" recovery path.
            db?.Dispose();

            if (ex is LiteException or InvalidDataException or IOException
                or NotSupportedException or UnauthorizedAccessException
                or ArgumentOutOfRangeException or FormatException)
            {
                throw new LocalConfigCacheCorruptException(
                    $"LiteDB cache at '{dbPath}' is corrupt or unreadable — delete the file and refetch from the central DB.",
                    ex);
            }

            // Anything else is not classified as corruption; rethrow it untouched.
            throw;
        }
    }

    /// <summary>Gets the most recent snapshot for the specified cluster.</summary>
    /// <param name="clusterId">The cluster ID.</param>
    /// <param name="ct">Cancellation token; checked once before the lookup starts.</param>
    /// <returns>
    /// The snapshot with the highest <c>GenerationId</c> for <paramref name="clusterId"/>, or
    /// <see langword="null"/> when nothing is cached for that cluster.
    /// </returns>
    public Task<GenerationSnapshot?> GetMostRecentAsync(string clusterId, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        var snapshot = _col
            .Find(s => s.ClusterId == clusterId)
            .OrderByDescending(s => s.GenerationId)
            .FirstOrDefault();

        return Task.FromResult<GenerationSnapshot?>(snapshot);
    }

    /// <summary>
    /// Stores a snapshot in the cache, replacing any existing entry with the same
    /// (<c>ClusterId</c>, <c>GenerationId</c>) pair.
    /// </summary>
    /// <param name="snapshot">The snapshot to store.</param>
    /// <param name="ct">Cancellation token; also honoured while waiting for the write gate.</param>
    /// <returns>A task that completes once the snapshot has been inserted or updated.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="snapshot"/> is <see langword="null"/>.</exception>
    public async Task PutAsync(GenerationSnapshot snapshot, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(snapshot);
        ct.ThrowIfCancellationRequested();

        // Serialize the find-then-insert/update so concurrent callers do not observe a stale
        // `existing is null` and both Insert (Configuration-005). LiteDB's per-call lock is
        // not enough — the read and the write are independent calls.
        await _writeGate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            // Upsert by (ClusterId, GenerationId) — replace in place if already cached.
            var existing = _col
                .Find(s => s.ClusterId == snapshot.ClusterId && s.GenerationId == snapshot.GenerationId)
                .FirstOrDefault();

            if (existing is null)
            {
                _col.Insert(snapshot);
            }
            else
            {
                // Reuse the stored document id so Update overwrites the cached row.
                snapshot.Id = existing.Id;
                _col.Update(snapshot);
            }
        }
        finally
        {
            _writeGate.Release();
        }
    }

    /// <summary>Removes old generation snapshots, keeping only the latest ones.</summary>
    /// <param name="clusterId">The cluster ID.</param>
    /// <param name="keepLatest">
    /// Number of latest generations (by descending <c>GenerationId</c>) to keep. Zero removes
    /// every snapshot for the cluster; negative values are rejected.
    /// </param>
    /// <param name="ct">Cancellation token; checked once before pruning starts.</param>
    /// <returns>A completed task once the surplus snapshots have been deleted.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="keepLatest"/> is negative.</exception>
    public Task PruneOldGenerationsAsync(string clusterId, int keepLatest = 10, CancellationToken ct = default)
    {
        // Enumerable.Skip treats a negative count as zero, so a negative keepLatest would
        // silently wipe every cached generation for the cluster. Reject it instead.
        if (keepLatest < 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(keepLatest), keepLatest, "The number of generations to keep cannot be negative.");
        }

        ct.ThrowIfCancellationRequested();

        // Materialize the ids first so the collection is not modified while being enumerated.
        var doomed = _col
            .Find(s => s.ClusterId == clusterId)
            .OrderByDescending(s => s.GenerationId)
            .Skip(keepLatest)
            .Select(s => s.Id)
            .ToList();

        foreach (var id in doomed)
        {
            _col.Delete(id);
        }

        return Task.CompletedTask;
    }

    /// <summary>Releases all resources used by the cache.</summary>
    public void Dispose()
    {
        _writeGate.Dispose();
        _db.Dispose();
    }
}
|
|
|
|
/// <summary>
/// Thrown when the local configuration cache file cannot be opened or read and should be
/// treated as corrupt.
/// </summary>
public sealed class LocalConfigCacheCorruptException : Exception
{
    /// <summary>
    /// Initializes a new instance of the <see cref="LocalConfigCacheCorruptException"/> class.
    /// </summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="inner">The underlying exception that triggered this one.</param>
    public LocalConfigCacheCorruptException(string message, Exception inner)
        : base(message, inner)
    {
    }
}
|