Files
CBDDC/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs
Joseph Doherty 6c4714f666
All checks were successful
NuGet Package Publish / nuget (push) Successful in 1m13s
Add XML docs required by CommentChecker fixes
2026-02-23 04:39:25 -05:00

334 lines
12 KiB
C#

using System.Diagnostics;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Storage;
using ZB.MOM.WW.CBDDC.Core.Sync;
using ZB.MOM.WW.CBDDC.Persistence;
using ZB.MOM.WW.CBDDC.Persistence.Lmdb;
using ZB.MOM.WW.CBDDC.Persistence.Surreal;
namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests;
public class SurrealOplogStoreContractParityTests : OplogStoreContractTestBase
{
/// <inheritdoc />
protected override Task<IOplogStoreContractHarness> CreateHarnessAsync()
{
return Task.FromResult<IOplogStoreContractHarness>(new SurrealOplogStoreContractHarness());
}
}
public class LmdbOplogStoreContractTests : OplogStoreContractTestBase
{
/// <inheritdoc />
protected override Task<IOplogStoreContractHarness> CreateHarnessAsync()
{
return Task.FromResult<IOplogStoreContractHarness>(new LmdbOplogStoreContractHarness());
}
/// <summary>
/// Verifies prune operations clear index tables as expected.
/// </summary>
[Fact]
public async Task Lmdb_IndexConsistency_InsertPopulatesAndPruneRemovesIndexes()
{
await using var harness = new LmdbOplogStoreContractHarness();
var store = (LmdbOplogStore)harness.Store;
var entry1 = CreateOplogEntry("Users", "i1", "node-a", 100, 0, "");
var entry2 = CreateOplogEntry("Users", "i2", "node-a", 101, 0, entry1.Hash);
await store.AppendOplogEntryAsync(entry1);
await store.AppendOplogEntryAsync(entry2);
LmdbOplogIndexDiagnostics before = await store.GetIndexDiagnosticsAsync(DatasetId.Primary);
before.OplogByHashCount.ShouldBe(2);
before.OplogByHlcCount.ShouldBe(2);
before.OplogByNodeHlcCount.ShouldBe(2);
before.OplogPrevToHashCount.ShouldBe(1);
before.OplogNodeHeadCount.ShouldBe(1);
await store.PruneOplogAsync(new HlcTimestamp(101, 0, "node-a"));
LmdbOplogIndexDiagnostics after = await store.GetIndexDiagnosticsAsync(DatasetId.Primary);
after.OplogByHashCount.ShouldBe(0);
after.OplogByHlcCount.ShouldBe(0);
after.OplogByNodeHlcCount.ShouldBe(0);
after.OplogPrevToHashCount.ShouldBe(0);
after.OplogNodeHeadCount.ShouldBe(0);
}
/// <summary>
/// Verifies prune retains newer entries while removing qualifying stale records.
/// </summary>
[Fact]
public async Task Lmdb_Prune_RemovesAtOrBeforeCutoff_AndKeepsNewerInterleavedEntries()
{
await using var harness = new LmdbOplogStoreContractHarness();
IOplogStore store = harness.Store;
var nodeAOld = CreateOplogEntry("Users", "a-old", "node-a", 100, 0, "");
var nodeBKeep = CreateOplogEntry("Users", "b-keep", "node-b", 105, 0, "");
var nodeANew = CreateOplogEntry("Users", "a-new", "node-a", 110, 0, nodeAOld.Hash);
var lateOld = CreateOplogEntry("Users", "late-old", "node-c", 90, 0, "");
await store.AppendOplogEntryAsync(nodeAOld);
await store.AppendOplogEntryAsync(nodeBKeep);
await store.AppendOplogEntryAsync(nodeANew);
await store.AppendOplogEntryAsync(lateOld);
await store.PruneOplogAsync(new HlcTimestamp(100, 0, "node-a"));
var remaining = (await store.ExportAsync()).Select(e => e.Hash).ToHashSet(StringComparer.Ordinal);
remaining.Contains(nodeAOld.Hash).ShouldBeFalse();
remaining.Contains(lateOld.Hash).ShouldBeFalse();
remaining.Contains(nodeBKeep.Hash).ShouldBeTrue();
remaining.Contains(nodeANew.Hash).ShouldBeTrue();
}
/// <summary>
/// Verifies node head values recompute correctly after prune operations.
/// </summary>
[Fact]
public async Task Lmdb_NodeHead_AdvancesAndRecomputesAcrossPrune()
{
await using var harness = new LmdbOplogStoreContractHarness();
IOplogStore store = harness.Store;
var older = CreateOplogEntry("Users", "n1", "node-a", 100, 0, "");
var newer = CreateOplogEntry("Users", "n2", "node-a", 120, 0, older.Hash);
await store.AppendOplogEntryAsync(older);
await store.AppendOplogEntryAsync(newer);
(await store.GetLastEntryHashAsync("node-a")).ShouldBe(newer.Hash);
await store.PruneOplogAsync(new HlcTimestamp(110, 0, "node-a"));
(await store.GetLastEntryHashAsync("node-a")).ShouldBe(newer.Hash);
await store.PruneOplogAsync(new HlcTimestamp(130, 0, "node-a"));
(await store.GetLastEntryHashAsync("node-a")).ShouldBeNull();
}
/// <summary>
/// Verifies durable persistence preserves node head after reopen.
/// </summary>
[Fact]
public async Task Lmdb_RestartDurability_PreservesHeadAndScans()
{
await using var harness = new LmdbOplogStoreContractHarness();
IOplogStore store = harness.Store;
var entry1 = CreateOplogEntry("Users", "r1", "node-a", 100, 0, "");
var entry2 = CreateOplogEntry("Users", "r2", "node-a", 101, 0, entry1.Hash);
await store.AppendOplogEntryAsync(entry1);
await store.AppendOplogEntryAsync(entry2);
IOplogStore reopened = harness.ReopenStore();
(await reopened.GetLastEntryHashAsync("node-a")).ShouldBe(entry2.Hash);
var after = (await reopened.GetOplogAfterAsync(new HlcTimestamp(0, 0, string.Empty))).ToList();
after.Count.ShouldBe(2);
after[0].Hash.ShouldBe(entry1.Hash);
after[1].Hash.ShouldBe(entry2.Hash);
}
/// <summary>
/// Verifies appending duplicate entries remains idempotent.
/// </summary>
[Fact]
public async Task Lmdb_Dedupe_DuplicateHashAppendIsIdempotent()
{
await using var harness = new LmdbOplogStoreContractHarness();
IOplogStore store = harness.Store;
var entry = CreateOplogEntry("Users", "d1", "node-a", 100, 0, "");
await store.AppendOplogEntryAsync(entry);
await store.AppendOplogEntryAsync(entry);
var exported = (await store.ExportAsync()).ToList();
exported.Count.ShouldBe(1);
exported[0].Hash.ShouldBe(entry.Hash);
}
/// <summary>
/// Verifies prune performance remains bounded under large synthetic datasets.
/// </summary>
[Fact]
public async Task Lmdb_PrunePerformanceSmoke_LargeSyntheticWindow_CompletesWithinGenerousBudget()
{
await using var harness = new LmdbOplogStoreContractHarness();
IOplogStore store = harness.Store;
string previousHash = string.Empty;
for (var i = 0; i < 5000; i++)
{
var entry = CreateOplogEntry("Users", $"p-{i:D5}", "node-a", 1_000 + i, 0, previousHash);
await store.AppendOplogEntryAsync(entry);
previousHash = entry.Hash;
}
var sw = Stopwatch.StartNew();
await store.PruneOplogAsync(new HlcTimestamp(6_000, 0, "node-a"));
sw.Stop();
sw.ElapsedMilliseconds.ShouldBeLessThan(30_000L);
(await store.ExportAsync()).ShouldBeEmpty();
}
}
internal sealed class SurrealOplogStoreContractHarness : IOplogStoreContractHarness
{
private readonly SurrealTestHarness _harness;
/// <summary>
/// Initializes a new surrogate Surreal contract harness.
/// </summary>
public SurrealOplogStoreContractHarness()
{
_harness = new SurrealTestHarness();
Store = _harness.CreateOplogStore();
}
/// <summary>
/// Gets the active store instance.
/// </summary>
public IOplogStore Store { get; private set; }
/// <summary>
/// Reopens the Surreal store and returns a fresh harness handle.
/// </summary>
public IOplogStore ReopenStore()
{
Store = _harness.CreateOplogStore();
return Store;
}
/// <summary>
/// Appends an entry into the Surreal store for a dataset.
/// </summary>
/// <param name="entry">The oplog entry to append.</param>
/// <param name="datasetId">The dataset identifier for the append operation.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public Task AppendOplogEntryAsync(OplogEntry entry, string datasetId, CancellationToken cancellationToken = default)
{
return ((SurrealOplogStore)Store).AppendOplogEntryAsync(entry, datasetId, cancellationToken);
}
/// <summary>
/// Exports all entries for a dataset from the Surreal store.
/// </summary>
/// <param name="datasetId">The dataset identifier to export.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public Task<IEnumerable<OplogEntry>> ExportAsync(string datasetId, CancellationToken cancellationToken = default)
{
return ((SurrealOplogStore)Store).ExportAsync(datasetId, cancellationToken);
}
/// <summary>
/// Disposes Surreal harness resources.
/// </summary>
public ValueTask DisposeAsync()
{
return _harness.DisposeAsync();
}
}
internal sealed class LmdbOplogStoreContractHarness : IOplogStoreContractHarness
{
private readonly string _rootPath;
private LmdbOplogStore? _store;
/// <summary>
/// Initializes a new LMDB contract harness and backing store.
/// </summary>
public LmdbOplogStoreContractHarness()
{
_rootPath = Path.Combine(Path.GetTempPath(), "cbddc-lmdb-tests", Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(_rootPath);
_store = CreateStore();
}
/// <summary>
/// Gets the active LMDB store.
/// </summary>
public IOplogStore Store => _store ?? throw new ObjectDisposedException(nameof(LmdbOplogStoreContractHarness));
/// <summary>
/// Recreates the LMDB store instance and returns it.
/// </summary>
public IOplogStore ReopenStore()
{
_store?.Dispose();
_store = CreateStore();
return _store;
}
/// <summary>
/// Appends an entry into the LMDB store for a dataset.
/// </summary>
/// <param name="entry">The oplog entry to append.</param>
/// <param name="datasetId">The dataset identifier for the append operation.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public Task AppendOplogEntryAsync(OplogEntry entry, string datasetId, CancellationToken cancellationToken = default)
{
return (_store ?? throw new ObjectDisposedException(nameof(LmdbOplogStoreContractHarness)))
.AppendOplogEntryAsync(entry, datasetId, cancellationToken);
}
/// <summary>
/// Exports all entries for a dataset from the LMDB store.
/// </summary>
/// <param name="datasetId">The dataset identifier to export.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public Task<IEnumerable<OplogEntry>> ExportAsync(string datasetId, CancellationToken cancellationToken = default)
{
return (_store ?? throw new ObjectDisposedException(nameof(LmdbOplogStoreContractHarness)))
.ExportAsync(datasetId, cancellationToken);
}
/// <summary>
/// Disposes LMDB harness resources.
/// </summary>
public async ValueTask DisposeAsync()
{
_store?.Dispose();
_store = null;
for (var attempt = 0; attempt < 5; attempt++)
try
{
if (Directory.Exists(_rootPath))
Directory.Delete(_rootPath, true);
break;
}
catch when (attempt < 4)
{
await Task.Delay(50);
}
}
private LmdbOplogStore CreateStore()
{
string lmdbPath = Path.Combine(_rootPath, "lmdb-oplog");
Directory.CreateDirectory(lmdbPath);
return new LmdbOplogStore(
Substitute.For<IDocumentStore>(),
new LastWriteWinsConflictResolver(),
new VectorClockService(),
new LmdbOplogOptions
{
EnvironmentPath = lmdbPath,
MapSizeBytes = 64L * 1024 * 1024,
MaxDatabases = 16,
PruneBatchSize = 128
},
null,
NullLogger<LmdbOplogStore>.Instance);
}
}