diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/FeatureFlagOplogStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/FeatureFlagOplogStore.cs
index feb83f6..c3c9489 100644
--- a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/FeatureFlagOplogStore.cs
+++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/FeatureFlagOplogStore.cs
@@ -24,6 +24,11 @@ public sealed class FeatureFlagOplogStore : IOplogStore
/// <summary>
/// Initializes a new instance of the <see cref="FeatureFlagOplogStore"/> class.
/// </summary>
+ /// <param name="surreal">The Surreal-backed oplog store.</param>
+ /// <param name="lmdb">The LMDB-backed oplog store.</param>
+ /// <param name="featureFlags">Feature flags controlling migration and routing behavior.</param>
+ /// <param name="telemetry">Optional telemetry collector for migration metrics.</param>
+ /// <param name="logger">Optional logger for routing and fallback diagnostics.</param>
public FeatureFlagOplogStore(
SurrealOplogStore surreal,
LmdbOplogStore lmdb,
diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogBackfillTool.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogBackfillTool.cs
index 9b7cf24..d717a79 100644
--- a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogBackfillTool.cs
+++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogBackfillTool.cs
@@ -18,6 +18,9 @@ public sealed class LmdbOplogBackfillTool
/// <summary>
/// Initializes a new instance of the <see cref="LmdbOplogBackfillTool"/> class.
/// </summary>
+ /// <param name="source">The Surreal oplog source.</param>
+ /// <param name="destination">The LMDB oplog destination.</param>
+ /// <param name="logger">Optional logger instance.</param>
public LmdbOplogBackfillTool(
SurrealOplogStore source,
LmdbOplogStore destination,
@@ -31,6 +34,8 @@ public sealed class LmdbOplogBackfillTool
/// <summary>
/// Backfills one dataset from Surreal to LMDB and validates parity.
/// </summary>
+ /// <param name="datasetId">Dataset identifier to migrate.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
public async Task BackfillAsync(
string datasetId,
CancellationToken cancellationToken = default)
@@ -61,6 +66,8 @@ public sealed class LmdbOplogBackfillTool
/// <summary>
/// Validates parity only without running a backfill merge.
/// </summary>
+ /// <param name="datasetId">Dataset identifier to validate.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
public async Task ValidateParityAsync(
string datasetId,
CancellationToken cancellationToken = default)
@@ -73,6 +80,8 @@ public sealed class LmdbOplogBackfillTool
/// <summary>
/// Backfills and throws when parity validation fails.
/// </summary>
+ /// <param name="datasetId">Dataset identifier to backfill.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
public async Task BackfillOrThrowAsync(
string datasetId,
CancellationToken cancellationToken = default)
diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogStore.cs
index 821df3d..75a38d9 100644
--- a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogStore.cs
+++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogStore.cs
@@ -35,6 +35,12 @@ public sealed class LmdbOplogStore : OplogStore, IDisposable
///
/// Initializes a new instance of the class.
///
+ /// Document store used for snapshot metadata persistence.
+ /// Conflict resolver used for resolving oplog merges.
+ /// Vector-clock service used for dataset ordering.
+ /// Configuration options for LMDB storage.
+ /// Optional snapshot metadata store.
+ /// Optional logger for LMDB diagnostics.
public LmdbOplogStore(
IDocumentStore documentStore,
IConflictResolver conflictResolver,
@@ -342,6 +348,8 @@ public sealed class LmdbOplogStore : OplogStore, IDisposable
/// <summary>
/// Drops all oplog data for the specified dataset.
/// </summary>
+ /// <param name="datasetId">The dataset identifier.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
public async Task DropAsync(string datasetId, CancellationToken cancellationToken = default)
{
string normalizedDatasetId = NormalizeDatasetId(datasetId);
@@ -373,6 +381,8 @@ public sealed class LmdbOplogStore : OplogStore, IDisposable
/// <summary>
/// Exports all oplog entries for a dataset.
/// </summary>
+ /// <param name="datasetId">The dataset identifier.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
public Task> ExportAsync(string datasetId, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
@@ -408,6 +418,9 @@ public sealed class LmdbOplogStore : OplogStore, IDisposable
/// <summary>
/// Imports oplog entries for a dataset (upsert semantics).
/// </summary>
+ /// <param name="items">Entries to import.</param>
+ /// <param name="datasetId">The dataset identifier.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
public Task ImportAsync(
IEnumerable items,
string datasetId,
@@ -425,6 +438,9 @@ public sealed class LmdbOplogStore : OplogStore, IDisposable
/// <summary>
/// Merges oplog entries into a dataset (dedupe by hash).
/// </summary>
+ /// <param name="items">Entries to merge.</param>
+ /// <param name="datasetId">The dataset identifier.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
public Task MergeAsync(
IEnumerable items,
string datasetId,
@@ -626,6 +642,8 @@ public sealed class LmdbOplogStore : OplogStore, IDisposable
/// <summary>
/// Returns index-level diagnostics for a dataset, useful in contract/unit tests.
/// </summary>
+ /// <param name="datasetId">The dataset identifier.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
public Task GetIndexDiagnosticsAsync(
string datasetId,
CancellationToken cancellationToken = default)
@@ -1360,15 +1378,54 @@ public sealed class LmdbOplogStore : OplogStore, IDisposable
private sealed class OplogEntryDto
{
+ /// <summary>
+ /// Dataset identifier for the serialized oplog entry.
+ /// </summary>
public string DatasetId { get; set; } = global::ZB.MOM.WW.CBDDC.Core.DatasetId.Primary;
+
+ /// <summary>
+ /// Entry collection name.
+ /// </summary>
public string Collection { get; set; } = string.Empty;
+
+ /// <summary>
+ /// Entry key within the collection.
+ /// </summary>
public string Key { get; set; } = string.Empty;
+
+ /// <summary>
+ /// Operation performed for this entry.
+ /// </summary>
public OperationType Operation { get; set; }
+
+ /// <summary>
+ /// Serialized payload for the entry.
+ /// </summary>
public JsonElement? Payload { get; set; }
+
+ /// <summary>
+ /// Physical time component of the HLC timestamp.
+ /// </summary>
public long PhysicalTime { get; set; }
+
+ /// <summary>
+ /// Logical counter component of the HLC timestamp.
+ /// </summary>
public int LogicalCounter { get; set; }
+
+ /// <summary>
+ /// Logical-clock node identifier.
+ /// </summary>
public string NodeId { get; set; } = string.Empty;
+
+ /// <summary>
+ /// Previous hash in the oplog chain.
+ /// </summary>
public string PreviousHash { get; set; } = string.Empty;
+
+ /// <summary>
+ /// Entry hash.
+ /// </summary>
public string Hash { get; set; } = string.Empty;
}
}
@@ -1376,10 +1433,60 @@ public sealed class LmdbOplogStore : OplogStore, IDisposable
///
/// Dataset-scoped LMDB oplog index counts.
///
-public readonly record struct LmdbOplogIndexDiagnostics(
- string DatasetId,
- long OplogByHashCount,
- long OplogByHlcCount,
- long OplogByNodeHlcCount,
- long OplogPrevToHashCount,
- long OplogNodeHeadCount);
+public readonly record struct LmdbOplogIndexDiagnostics
+{
+ /// <summary>
+ /// The dataset identifier for these diagnostics.
+ /// </summary>
+ public string DatasetId { get; init; }
+
+ /// <summary>
+ /// Count of entries in the hash index.
+ /// </summary>
+ public long OplogByHashCount { get; init; }
+
+ /// <summary>
+ /// Count of entries in the HLC index.
+ /// </summary>
+ public long OplogByHlcCount { get; init; }
+
+ /// <summary>
+ /// Count of entries in the per-node HLC index.
+ /// </summary>
+ public long OplogByNodeHlcCount { get; init; }
+
+ /// <summary>
+ /// Count of entries in the previous-hash index.
+ /// </summary>
+ public long OplogPrevToHashCount { get; init; }
+
+ /// <summary>
+ /// Count of entries tracked in the node head index.
+ /// </summary>
+ public long OplogNodeHeadCount { get; init; }
+
+ /// <summary>
+ /// Initializes a diagnostics snapshot for a dataset.
+ /// </summary>
+ /// <param name="datasetId">Dataset identifier for the diagnostics.</param>
+ /// <param name="oplogByHashCount">Count of entries in the hash index.</param>
+ /// <param name="oplogByHlcCount">Count of entries in the HLC index.</param>
+ /// <param name="oplogByNodeHlcCount">Count of entries in the per-node HLC index.</param>
+ /// <param name="oplogPrevToHashCount">Count of entries in the previous-hash index.</param>
+ /// <param name="oplogNodeHeadCount">Count of node-head entries.</param>
+ public LmdbOplogIndexDiagnostics(
+ string datasetId,
+ long oplogByHashCount,
+ long oplogByHlcCount,
+ long oplogByNodeHlcCount,
+ long oplogPrevToHashCount,
+ long oplogNodeHeadCount)
+ {
+ DatasetId = datasetId;
+ OplogByHashCount = oplogByHashCount;
+ OplogByHlcCount = oplogByHlcCount;
+ OplogByNodeHlcCount = oplogByNodeHlcCount;
+ OplogPrevToHashCount = oplogPrevToHashCount;
+ OplogNodeHeadCount = oplogNodeHeadCount;
+ }
+}
diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/OplogMigrationTelemetry.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/OplogMigrationTelemetry.cs
index bb6e9f5..ad1c2aa 100644
--- a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/OplogMigrationTelemetry.cs
+++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/OplogMigrationTelemetry.cs
@@ -25,6 +25,7 @@ public sealed class OplogMigrationTelemetry
/// <summary>
/// Records the outcome of one shadow comparison.
/// </summary>
+ /// <param name="isMatch"><see langword="true"/> when source and LMDB entries matched.</param>
public void RecordShadowComparison(bool isMatch)
{
Interlocked.Increment(ref _shadowComparisons);
@@ -42,6 +43,8 @@ public sealed class OplogMigrationTelemetry
/// <summary>
/// Records one reconciliation/backfill run for a dataset.
/// </summary>
+ /// <param name="datasetId">Dataset identifier for this reconciliation.</param>
+ /// <param name="entriesMerged">Count of entries merged into LMDB.</param>
public void RecordReconciliation(string datasetId, int entriesMerged)
{
string normalizedDatasetId = DatasetId.Normalize(datasetId);
diff --git a/tests/ZB.MOM.WW.CBDDC.Core.Tests/DatasetAwareModelTests.cs b/tests/ZB.MOM.WW.CBDDC.Core.Tests/DatasetAwareModelTests.cs
index 9d0f130..b5a6c49 100644
--- a/tests/ZB.MOM.WW.CBDDC.Core.Tests/DatasetAwareModelTests.cs
+++ b/tests/ZB.MOM.WW.CBDDC.Core.Tests/DatasetAwareModelTests.cs
@@ -5,6 +5,9 @@ namespace ZB.MOM.WW.CBDDC.Core.Tests;
public class DatasetAwareModelTests
{
+ /// <summary>
+ /// Verifies <see cref="DocumentMetadata"/> defaults to the primary dataset.
+ /// </summary>
[Fact]
public void DocumentMetadata_ShouldDefaultDatasetId_ToPrimary()
{
@@ -13,6 +16,9 @@ public class DatasetAwareModelTests
metadata.DatasetId.ShouldBe(DatasetId.Primary);
}
+ /// <summary>
+ /// Verifies dataset identifiers survive JSON round-trips for <see cref="DocumentMetadata"/>.
+ /// </summary>
[Fact]
public void DocumentMetadata_SerializationRoundTrip_ShouldPreserveDatasetId()
{
@@ -25,6 +31,9 @@ public class DatasetAwareModelTests
restored.DatasetId.ShouldBe("logs");
}
+ /// <summary>
+ /// Verifies <see cref="SnapshotMetadata"/> defaults to the primary dataset.
+ /// </summary>
[Fact]
public void SnapshotMetadata_ShouldDefaultDatasetId_ToPrimary()
{
@@ -33,6 +42,9 @@ public class DatasetAwareModelTests
metadata.DatasetId.ShouldBe(DatasetId.Primary);
}
+ /// <summary>
+ /// Verifies <see cref="PeerOplogConfirmation"/> defaults to the primary dataset.
+ /// </summary>
[Fact]
public void PeerOplogConfirmation_ShouldDefaultDatasetId_ToPrimary()
{
diff --git a/tests/ZB.MOM.WW.CBDDC.Network.Tests/MultiDatasetRegistrationTests.cs b/tests/ZB.MOM.WW.CBDDC.Network.Tests/MultiDatasetRegistrationTests.cs
index 0d25786..cae6f12 100644
--- a/tests/ZB.MOM.WW.CBDDC.Network.Tests/MultiDatasetRegistrationTests.cs
+++ b/tests/ZB.MOM.WW.CBDDC.Network.Tests/MultiDatasetRegistrationTests.cs
@@ -6,6 +6,9 @@ namespace ZB.MOM.WW.CBDDC.Network.Tests;
public class MultiDatasetRegistrationTests
{
+ ///
+ /// Verifies CBDDC network registration replaces the default orchestrator with multi-dataset wiring.
+ ///
[Fact]
public void AddCBDDCMultiDataset_ShouldRegisterCoordinatorAndReplaceSyncOrchestrator()
{
@@ -28,12 +31,18 @@ public class MultiDatasetRegistrationTests
private sealed class TestPeerNodeConfigurationProvider : IPeerNodeConfigurationProvider
{
+ ///
+ /// Raised when the peer configuration changes.
+ ///
public event PeerNodeConfigurationChangedEventHandler? ConfigurationChanged
{
add { }
remove { }
}
+ ///
+ /// Returns the current peer configuration.
+ ///
public Task GetConfiguration()
{
return Task.FromResult(new PeerNodeConfiguration
diff --git a/tests/ZB.MOM.WW.CBDDC.Network.Tests/MultiDatasetSyncOrchestratorTests.cs b/tests/ZB.MOM.WW.CBDDC.Network.Tests/MultiDatasetSyncOrchestratorTests.cs
index 357022b..0661211 100644
--- a/tests/ZB.MOM.WW.CBDDC.Network.Tests/MultiDatasetSyncOrchestratorTests.cs
+++ b/tests/ZB.MOM.WW.CBDDC.Network.Tests/MultiDatasetSyncOrchestratorTests.cs
@@ -7,6 +7,9 @@ namespace ZB.MOM.WW.CBDDC.Network.Tests;
public class MultiDatasetSyncOrchestratorTests
{
+ ///
+ /// Verifies multi-dataset sync is disabled, only the primary context is created.
+ ///
[Fact]
public void Constructor_WhenMultiDatasetDisabled_ShouldOnlyCreatePrimaryContext()
{
@@ -27,6 +30,9 @@ public class MultiDatasetSyncOrchestratorTests
datasetIds[0].ShouldBe(DatasetId.Primary);
}
+ ///
+ /// Verifies that failures in one orchestrator do not prevent remaining contexts from starting and stopping.
+ ///
[Fact]
public async Task StartStop_WhenOneDatasetThrows_ShouldContinueOtherDatasets()
{
@@ -82,9 +88,19 @@ public class MultiDatasetSyncOrchestratorTests
private sealed class TrackingSyncOrchestrator(Exception? startException = null, Exception? stopException = null)
: ISyncOrchestrator
{
+ ///
+ /// Number of times has been called.
+ ///
public int StartCalls { get; private set; }
+
+ ///
+ /// Number of times has been called.
+ ///
public int StopCalls { get; private set; }
+ ///
+ /// Starts the orchestrator.
+ ///
public Task Start()
{
StartCalls++;
@@ -92,6 +108,9 @@ public class MultiDatasetSyncOrchestratorTests
return Task.CompletedTask;
}
+ ///
+ /// Stops the orchestrator.
+ ///
public Task Stop()
{
StopCalls++;
diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogMigrationTests.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogMigrationTests.cs
index f10c792..50e5433 100644
--- a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogMigrationTests.cs
+++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogMigrationTests.cs
@@ -12,6 +12,9 @@ namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests;
public class LmdbOplogMigrationTests
{
+ ///
+ /// Verifies dual-write mode writes each entry to both Surreal and LMDB stores.
+ ///
[Fact]
public async Task FeatureFlags_DualWrite_WritesToBothStores()
{
@@ -39,6 +42,9 @@ public class LmdbOplogMigrationTests
(await lmdbStore.GetEntryByHashAsync(entry.Hash)).ShouldNotBeNull();
}
+ ///
+ /// Verifies preferred LMDB reads reconcile missing LMDB data from Surreal.
+ ///
[Fact]
public async Task FeatureFlags_PreferLmdbReads_ReconcilesFromSurrealWhenLmdbMissingEntries()
{
@@ -78,6 +84,9 @@ public class LmdbOplogMigrationTests
telemetry.ReconciledEntries.ShouldBeGreaterThanOrEqualTo(1);
}
+ ///
+ /// Verifies shadow validation records mismatches when LMDB and Surreal diverge.
+ ///
[Fact]
public async Task FeatureFlags_ShadowValidation_RecordsMismatchTelemetry()
{
@@ -112,6 +121,9 @@ public class LmdbOplogMigrationTests
snapshot.ShadowMismatches.ShouldBe(1);
}
+ ///
+ /// Verifies rollback to Surreal when dual-write is disabled uses Surreal for all writes and reads.
+ ///
[Fact]
public async Task FeatureFlags_RollbackToSurreal_UsesSurrealForWritesAndReads()
{
@@ -143,6 +155,9 @@ public class LmdbOplogMigrationTests
routedRead.Hash.ShouldBe(entry.Hash);
}
+ ///
+ /// Verifies backfill succeeds and records matching dataset counts.
+ ///
[Fact]
public async Task BackfillTool_BackfillAndValidate_ReportsSuccess()
{
@@ -173,6 +188,9 @@ public class LmdbOplogMigrationTests
report.DestinationCount.ShouldBe(4);
}
+ ///
+ /// Verifies backfill can target non-primary datasets successfully.
+ ///
[Fact]
public async Task BackfillTool_BackfillAndValidate_WorksPerDataset()
{
diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs
index 9943633..aea204f 100644
--- a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs
+++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs
@@ -12,6 +12,7 @@ namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests;
public class SurrealOplogStoreContractParityTests : OplogStoreContractTestBase
{
+ /// <inheritdoc />
protected override Task CreateHarnessAsync()
{
return Task.FromResult(new SurrealOplogStoreContractHarness());
@@ -20,11 +21,15 @@ public class SurrealOplogStoreContractParityTests : OplogStoreContractTestBase
public class LmdbOplogStoreContractTests : OplogStoreContractTestBase
{
+ /// <inheritdoc />
protected override Task CreateHarnessAsync()
{
return Task.FromResult(new LmdbOplogStoreContractHarness());
}
+ ///
+ /// Verifies prune operations clear index tables as expected.
+ ///
[Fact]
public async Task Lmdb_IndexConsistency_InsertPopulatesAndPruneRemovesIndexes()
{
@@ -54,6 +59,9 @@ public class LmdbOplogStoreContractTests : OplogStoreContractTestBase
after.OplogNodeHeadCount.ShouldBe(0);
}
+ ///
+ /// Verifies prune retains newer entries while removing qualifying stale records.
+ ///
[Fact]
public async Task Lmdb_Prune_RemovesAtOrBeforeCutoff_AndKeepsNewerInterleavedEntries()
{
@@ -79,6 +87,9 @@ public class LmdbOplogStoreContractTests : OplogStoreContractTestBase
remaining.Contains(nodeANew.Hash).ShouldBeTrue();
}
+ ///
+ /// Verifies node head values recompute correctly after prune operations.
+ ///
[Fact]
public async Task Lmdb_NodeHead_AdvancesAndRecomputesAcrossPrune()
{
@@ -100,6 +111,9 @@ public class LmdbOplogStoreContractTests : OplogStoreContractTestBase
(await store.GetLastEntryHashAsync("node-a")).ShouldBeNull();
}
+ ///
+ /// Verifies durable persistence preserves node head after reopen.
+ ///
[Fact]
public async Task Lmdb_RestartDurability_PreservesHeadAndScans()
{
@@ -121,6 +135,9 @@ public class LmdbOplogStoreContractTests : OplogStoreContractTestBase
after[1].Hash.ShouldBe(entry2.Hash);
}
+ ///
+ /// Verifies appending duplicate entries remains idempotent.
+ ///
[Fact]
public async Task Lmdb_Dedupe_DuplicateHashAppendIsIdempotent()
{
@@ -137,6 +154,9 @@ public class LmdbOplogStoreContractTests : OplogStoreContractTestBase
exported[0].Hash.ShouldBe(entry.Hash);
}
+ ///
+ /// Verifies prune performance remains bounded under large synthetic datasets.
+ ///
[Fact]
public async Task Lmdb_PrunePerformanceSmoke_LargeSyntheticWindow_CompletesWithinGenerousBudget()
{
@@ -164,30 +184,53 @@ internal sealed class SurrealOplogStoreContractHarness : IOplogStoreContractHarn
{
private readonly SurrealTestHarness _harness;
+ ///
+ /// Initializes a new surrogate Surreal contract harness.
+ ///
public SurrealOplogStoreContractHarness()
{
_harness = new SurrealTestHarness();
Store = _harness.CreateOplogStore();
}
+ ///
+ /// Gets the active store instance.
+ ///
public IOplogStore Store { get; private set; }
+ ///
+ /// Reopens the Surreal store and returns a fresh harness handle.
+ ///
public IOplogStore ReopenStore()
{
Store = _harness.CreateOplogStore();
return Store;
}
+ ///
+ /// Appends an entry into the Surreal store for a dataset.
+ ///
+ /// The oplog entry to append.
+ /// The dataset identifier for the append operation.
+ /// The cancellation token.
public Task AppendOplogEntryAsync(OplogEntry entry, string datasetId, CancellationToken cancellationToken = default)
{
return ((SurrealOplogStore)Store).AppendOplogEntryAsync(entry, datasetId, cancellationToken);
}
+ ///
+ /// Exports all entries for a dataset from the Surreal store.
+ ///
+ /// The dataset identifier to export.
+ /// The cancellation token.
public Task> ExportAsync(string datasetId, CancellationToken cancellationToken = default)
{
return ((SurrealOplogStore)Store).ExportAsync(datasetId, cancellationToken);
}
+ ///
+ /// Disposes Surreal harness resources.
+ ///
public ValueTask DisposeAsync()
{
return _harness.DisposeAsync();
@@ -199,6 +242,9 @@ internal sealed class LmdbOplogStoreContractHarness : IOplogStoreContractHarness
private readonly string _rootPath;
private LmdbOplogStore? _store;
+ ///
+ /// Initializes a new LMDB contract harness and backing store.
+ ///
public LmdbOplogStoreContractHarness()
{
_rootPath = Path.Combine(Path.GetTempPath(), "cbddc-lmdb-tests", Guid.NewGuid().ToString("N"));
@@ -206,8 +252,14 @@ internal sealed class LmdbOplogStoreContractHarness : IOplogStoreContractHarness
_store = CreateStore();
}
+ ///
+ /// Gets the active LMDB store.
+ ///
public IOplogStore Store => _store ?? throw new ObjectDisposedException(nameof(LmdbOplogStoreContractHarness));
+ ///
+ /// Recreates the LMDB store instance and returns it.
+ ///
public IOplogStore ReopenStore()
{
_store?.Dispose();
@@ -215,18 +267,32 @@ internal sealed class LmdbOplogStoreContractHarness : IOplogStoreContractHarness
return _store;
}
+ ///
+ /// Appends an entry into the LMDB store for a dataset.
+ ///
+ /// The oplog entry to append.
+ /// The dataset identifier for the append operation.
+ /// The cancellation token.
public Task AppendOplogEntryAsync(OplogEntry entry, string datasetId, CancellationToken cancellationToken = default)
{
return (_store ?? throw new ObjectDisposedException(nameof(LmdbOplogStoreContractHarness)))
.AppendOplogEntryAsync(entry, datasetId, cancellationToken);
}
+ ///
+ /// Exports all entries for a dataset from the LMDB store.
+ ///
+ /// The dataset identifier to export.
+ /// The cancellation token.
public Task> ExportAsync(string datasetId, CancellationToken cancellationToken = default)
{
return (_store ?? throw new ObjectDisposedException(nameof(LmdbOplogStoreContractHarness)))
.ExportAsync(datasetId, cancellationToken);
}
+ ///
+ /// Disposes LMDB harness resources.
+ ///
public async ValueTask DisposeAsync()
{
_store?.Dispose();
diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/MultiDatasetConfigParsingTests.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/MultiDatasetConfigParsingTests.cs
index 6cdc6d5..53954d7 100644
--- a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/MultiDatasetConfigParsingTests.cs
+++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/MultiDatasetConfigParsingTests.cs
@@ -6,6 +6,9 @@ namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests;
public class MultiDatasetConfigParsingTests
{
+ ///
+ /// Verifies multi-dataset section binds runtime options from JSON.
+ ///
[Fact]
public void MultiDatasetSection_ShouldBindRuntimeOptions()
{
diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/OplogStoreContractTestBase.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/OplogStoreContractTestBase.cs
index 9b6efa3..478f091 100644
--- a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/OplogStoreContractTestBase.cs
+++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/OplogStoreContractTestBase.cs
@@ -6,6 +6,9 @@ namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests;
public abstract class OplogStoreContractTestBase
{
+ ///
+ /// Verifies append, merge, and drop behavior across query, chain, and restart scenarios.
+ ///
[Fact]
public async Task OplogStore_AppendQueryMergeDrop_AndLastHash_Works()
{
@@ -46,6 +49,9 @@ public abstract class OplogStoreContractTestBase
(await rehydratedStore.ExportAsync()).ShouldBeEmpty();
}
+ ///
+ /// Verifies dataset isolation between primary and secondary stores.
+ ///
[Fact]
public async Task OplogStore_DatasetIsolation_Works()
{
@@ -68,6 +74,9 @@ public abstract class OplogStoreContractTestBase
logs[0].DatasetId.ShouldBe(DatasetId.Logs);
}
+ ///
+ /// Verifies chain range queries return ordered linked entries.
+ ///
[Fact]
public async Task OplogStore_GetChainRangeAsync_ReturnsOrderedLinkedRange()
{
@@ -89,8 +98,20 @@ public abstract class OplogStoreContractTestBase
range[1].Hash.ShouldBe(entry3.Hash);
}
+ /// <summary>
+ /// Creates the contract harness for this test class.
+ /// </summary>
protected abstract Task CreateHarnessAsync();
+ /// <summary>
+ /// Creates a reusable oplog entry with deterministic timestamps.
+ /// </summary>
+ /// <param name="collection">The collection name.</param>
+ /// <param name="key">The entry key.</param>
+ /// <param name="nodeId">The node identifier generating the entry.</param>
+ /// <param name="physicalTime">The wall-clock component of the HLC timestamp.</param>
+ /// <param name="logicalCounter">The logical clock component of the HLC timestamp.</param>
+ /// <param name="previousHash">The previous entry hash.</param>
protected static OplogEntry CreateOplogEntry(
string collection,
string key,
@@ -111,11 +132,28 @@ public abstract class OplogStoreContractTestBase
public interface IOplogStoreContractHarness : IAsyncDisposable
{
+ /// <summary>
+ /// Gets the active contract store.
+ /// </summary>
IOplogStore Store { get; }
+ /// <summary>
+ /// Reopens the harness storage.
+ /// </summary>
IOplogStore ReopenStore();
+ /// <summary>
+ /// Appends an entry for the specified dataset.
+ /// </summary>
+ /// <param name="entry">The oplog entry to append.</param>
+ /// <param name="datasetId">The dataset identifier.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
Task AppendOplogEntryAsync(OplogEntry entry, string datasetId, CancellationToken cancellationToken = default);
+ /// <summary>
+ /// Exports entries for the specified dataset.
+ /// </summary>
+ /// <param name="datasetId">The dataset identifier.</param>
+ /// <param name="cancellationToken">The cancellation token.</param>
Task> ExportAsync(string datasetId, CancellationToken cancellationToken = default);
}
diff --git a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/BenchmarkPeerNode.cs b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/BenchmarkPeerNode.cs
index b44d2dc..267d165 100644
--- a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/BenchmarkPeerNode.cs
+++ b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/BenchmarkPeerNode.cs
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Network;
using ZB.MOM.WW.CBDDC.Network.Security;
+using ZB.MOM.WW.CBDDC.Persistence.Lmdb;
using ZB.MOM.WW.CBDDC.Persistence.Surreal;
using ZB.MOM.WW.CBDDC.Sample.Console;
@@ -15,6 +16,18 @@ internal sealed class BenchmarkPeerNode : IAsyncDisposable
private readonly string _workDir;
private bool _started;
+ /// <summary>
+ /// Gets the active EF/Core context for generated users.
+ /// </summary>
+ public SampleDbContext Context { get; }
+
+ /// <summary>
+ /// Creates and initializes a benchmark peer node.
+ /// </summary>
+ /// <param name="serviceProvider">Service provider containing node dependencies.</param>
+ /// <param name="node">Benchmark node abstraction.</param>
+ /// <param name="context">Live data context used by the benchmark.</param>
+ /// <param name="workDir">Temporary working directory for this node.</param>
private BenchmarkPeerNode(
ServiceProvider serviceProvider,
ICBDDCNode node,
@@ -27,8 +40,13 @@ internal sealed class BenchmarkPeerNode : IAsyncDisposable
_workDir = workDir;
}
- public SampleDbContext Context { get; }
-
+ ///
+ /// Creates and starts a benchmark peer node from configuration.
+ ///
+ /// Unique peer identifier.
+ /// Local TCP port for the node.
+ /// Authentication token shared across peers.
+ /// Known peers to connect to at startup.
public static BenchmarkPeerNode Create(
string nodeId,
int tcpPort,
@@ -58,7 +76,13 @@ internal sealed class BenchmarkPeerNode : IAsyncDisposable
services.AddSingleton();
services.AddSingleton();
- services.AddCBDDCCore()
+ bool useLmdb = GetBoolEnv("CBDDC_BENCH_USE_LMDB", defaultValue: true);
+ bool dualWrite = GetBoolEnv("CBDDC_BENCH_DUAL_WRITE", defaultValue: true);
+ bool preferLmdbReads = GetBoolEnv("CBDDC_BENCH_PREFER_LMDB_READS", defaultValue: true);
+ bool enableShadowValidation = GetBoolEnv("CBDDC_BENCH_SHADOW_READ_VALIDATE", defaultValue: false);
+ int reconcileIntervalMs = GetIntEnv("CBDDC_BENCH_RECONCILE_INTERVAL_MS", defaultValue: 0);
+
+ var registration = services.AddCBDDCCore()
.AddCBDDCSurrealEmbedded(_ => new CBDDCSurrealEmbeddedOptions
{
Endpoint = "rocksdb://local",
@@ -72,8 +96,27 @@ internal sealed class BenchmarkPeerNode : IAsyncDisposable
PollingInterval = TimeSpan.FromMilliseconds(50),
EnableLiveSelectAccelerator = true
}
- })
- .AddCBDDCNetwork(false);
+ });
+
+ if (useLmdb)
+ registration.AddCBDDCLmdbOplog(
+ _ => new LmdbOplogOptions
+ {
+ EnvironmentPath = Path.Combine(workDir, "oplog-lmdb"),
+ MapSizeBytes = 256L * 1024 * 1024,
+ MaxDatabases = 16,
+ PruneBatchSize = 512
+ },
+ flags =>
+ {
+ flags.UseLmdbOplog = true;
+ flags.DualWriteOplog = dualWrite;
+ flags.PreferLmdbReads = preferLmdbReads;
+ flags.EnableReadShadowValidation = enableShadowValidation;
+ flags.ReconciliationInterval = TimeSpan.FromMilliseconds(Math.Max(0, reconcileIntervalMs));
+ });
+
+ registration.AddCBDDCNetwork(false);
// Benchmark runs use explicit known peers; disable UDP discovery and handshake overhead.
services.AddSingleton();
@@ -86,6 +129,9 @@ internal sealed class BenchmarkPeerNode : IAsyncDisposable
return new BenchmarkPeerNode(provider, node, context, workDir);
}
+ ///
+ /// Starts the node asynchronously.
+ ///
public async Task StartAsync()
{
if (_started) return;
@@ -93,6 +139,9 @@ internal sealed class BenchmarkPeerNode : IAsyncDisposable
_started = true;
}
+ ///
+ /// Stops the node asynchronously.
+ ///
public async Task StopAsync()
{
if (!_started) return;
@@ -111,22 +160,37 @@ internal sealed class BenchmarkPeerNode : IAsyncDisposable
_started = false;
}
+ ///
+ /// Inserts or updates a user record.
+ ///
+ /// User payload.
public async Task UpsertUserAsync(User user)
{
await Context.Users.UpdateAsync(user);
await Context.SaveChangesAsync();
}
+ ///
+ /// Returns whether a user identifier exists.
+ ///
+ /// Target user identifier.
public bool ContainsUser(string userId)
{
return Context.Users.Find(u => u.Id == userId).Any();
}
+ ///
+ /// Counts user identifiers matching the provided prefix.
+ ///
+ /// User identifier prefix.
public int CountUsersWithPrefix(string prefix)
{
return Context.Users.FindAll().Count(u => u.Id.StartsWith(prefix, StringComparison.Ordinal));
}
+ ///
+ /// Disposes the node and any unmanaged resources.
+ ///
public async ValueTask DisposeAsync()
{
try
@@ -156,18 +220,43 @@ internal sealed class BenchmarkPeerNode : IAsyncDisposable
}
}
+ private static bool GetBoolEnv(string key, bool defaultValue)
+ {
+ string? raw = Environment.GetEnvironmentVariable(key);
+ if (string.IsNullOrWhiteSpace(raw)) return defaultValue;
+ if (bool.TryParse(raw, out bool parsed)) return parsed;
+ return defaultValue;
+ }
+
+ private static int GetIntEnv(string key, int defaultValue)
+ {
+ string? raw = Environment.GetEnvironmentVariable(key);
+ if (string.IsNullOrWhiteSpace(raw)) return defaultValue;
+ if (int.TryParse(raw, out int parsed)) return parsed;
+ return defaultValue;
+ }
+
private sealed class PassiveDiscoveryService : IDiscoveryService
{
+ ///
+ /// Gets the current list of active peers.
+ ///
public IEnumerable GetActivePeers()
{
return Array.Empty();
}
+ ///
+ /// Starts discovery.
+ ///
public Task Start()
{
return Task.CompletedTask;
}
+ ///
+ /// Stops discovery.
+ ///
public Task Stop()
{
return Task.CompletedTask;
diff --git a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/E2EThroughputBenchmarks.cs b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/E2EThroughputBenchmarks.cs
index 6a01d65..4e62e79 100644
--- a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/E2EThroughputBenchmarks.cs
+++ b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/E2EThroughputBenchmarks.cs
@@ -15,6 +15,9 @@ public class E2EThroughputBenchmarks
private BenchmarkPeerNode _nodeB = null!;
private int _sequence;
+ ///
+ /// Sets up benchmark nodes and prepares the cluster.
+ ///
[GlobalSetup]
public async Task GlobalSetupAsync()
{
@@ -58,6 +61,9 @@ public class E2EThroughputBenchmarks
await Task.Delay(500);
}
+ ///
+ /// Handles benchmark teardown for the throughput test suite.
+ ///
[GlobalCleanup]
public Task GlobalCleanupAsync()
{
@@ -66,6 +72,9 @@ public class E2EThroughputBenchmarks
return Task.CompletedTask;
}
+ ///
+ /// Measures local write throughput against a single node.
+ ///
[Benchmark(Description = "Local write throughput", OperationsPerInvoke = BatchSize)]
public async Task LocalWriteThroughput()
{
@@ -74,6 +83,9 @@ public class E2EThroughputBenchmarks
await _nodeA.UpsertUserAsync(CreateUser(userId));
}
+ ///
+ /// Measures replicated write throughput across two nodes.
+ ///
[Benchmark(Description = "Cross-node replicated throughput", OperationsPerInvoke = BatchSize)]
public async Task ReplicatedWriteThroughput()
{
diff --git a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/OfflineResyncThroughputBenchmarks.cs b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/OfflineResyncThroughputBenchmarks.cs
index 8cc07f5..5ea2e04 100644
--- a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/OfflineResyncThroughputBenchmarks.cs
+++ b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/OfflineResyncThroughputBenchmarks.cs
@@ -10,18 +10,24 @@ namespace ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests;
[SimpleJob(launchCount: 1, warmupCount: 0, iterationCount: 1)]
public class OfflineResyncThroughputBenchmarks
{
- private const int BacklogOperationCount = 10_000;
+ private const int BacklogOperationCount = 100_000;
private BenchmarkPeerNode _onlineNode = null!;
private BenchmarkPeerNode _offlineNode = null!;
private int _runSequence;
private string _currentPrefix = string.Empty;
+ ///
+ /// Sets up benchmark resources for offline resync scenarios.
+ ///
[GlobalSetup]
public Task GlobalSetupAsync()
{
return Task.CompletedTask;
}
+ ///
+ /// Handles benchmark teardown for offline resync scenarios.
+ ///
[GlobalCleanup]
public Task GlobalCleanupAsync()
{
@@ -30,6 +36,9 @@ public class OfflineResyncThroughputBenchmarks
return Task.CompletedTask;
}
+ ///
+ /// Prepares write-only workload state for the 100K throughput benchmark.
+ ///
[IterationSetup(Target = nameof(OfflineBacklogWriteThroughput100k))]
public void SetupOfflineWriteThroughput()
{
@@ -37,12 +46,18 @@ public class OfflineResyncThroughputBenchmarks
InitializeIterationNodesAsync().GetAwaiter().GetResult();
}
- [Benchmark(Description = "Offline backlog write throughput (10K ops)", OperationsPerInvoke = BacklogOperationCount)]
+ ///
+ /// Measures offline backlog write throughput for 100K operations.
+ ///
+ [Benchmark(Description = "Offline backlog write throughput (100K ops)", OperationsPerInvoke = BacklogOperationCount)]
public async Task OfflineBacklogWriteThroughput100k()
{
await WriteBatchAsync(_currentPrefix, BacklogOperationCount);
}
+ ///
+ /// Prepares nodes and backlog before the re-sync benchmark iteration.
+ ///
[IterationSetup(Target = nameof(OfflineNodeResyncDurationAfter100kBacklog))]
public void SetupOfflineResyncBenchmark()
{
@@ -51,7 +66,10 @@ public class OfflineResyncThroughputBenchmarks
WriteBatchAsync(_currentPrefix, BacklogOperationCount).GetAwaiter().GetResult();
}
- [Benchmark(Description = "Offline node re-sync duration after 10K backlog")]
+ ///
+ /// Measures re-sync duration after processing a 100K-entry offline backlog.
+ ///
+ [Benchmark(Description = "Offline node re-sync duration after 100K backlog")]
public async Task OfflineNodeResyncDurationAfter100kBacklog()
{
await _offlineNode.StartAsync();
diff --git a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/SurrealLogStorageBenchmarks.cs b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/SurrealLogStorageBenchmarks.cs
index 764cb9f..5fe6e17 100644
--- a/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/SurrealLogStorageBenchmarks.cs
+++ b/tests/ZB.MOM.WW.CDBBC.E2E.Benchmark.Tests/SurrealLogStorageBenchmarks.cs
@@ -57,6 +57,9 @@ public class SurrealLogStorageBenchmarks
private string _contextNumericKeyQueryValue = string.Empty;
private int _contextNumericValueQueryValue;
+ ///
+ /// Initializes the benchmark environment and seeds the dataset.
+ ///
[GlobalSetup]
public async Task GlobalSetupAsync()
{
@@ -97,6 +100,9 @@ public class SurrealLogStorageBenchmarks
$"RocksDB size: {sizeBytes / (1024d * 1024d):F2} MiB ({sizeBytes:N0} bytes). Path: {_databasePath}");
}
+ ///
+ /// Handles benchmark teardown for surreal log storage scenarios.
+ ///
[GlobalCleanup]
public Task GlobalCleanupAsync()
{
@@ -104,6 +110,9 @@ public class SurrealLogStorageBenchmarks
return Task.CompletedTask;
}
+ ///
+ /// Queries the latest log rows by context identifier.
+ ///
[Benchmark(Description = "Query by contextId (latest 200 rows)")]
public async Task QueryByContextIdAsync()
{
@@ -117,6 +126,9 @@ public class SurrealLogStorageBenchmarks
new Dictionary { ["contextId"] = _contextIdQueryValue });
}
+ ///
+ /// Queries the latest rows by logger and timestamp range.
+ ///
[Benchmark(Description = "Query by loggerName + timestamp range (latest 200 rows)")]
public async Task QueryByLoggerAndTimestampAsync()
{
@@ -137,6 +149,9 @@ public class SurrealLogStorageBenchmarks
});
}
+ ///
+ /// Queries rows by logger and context key/value pairs.
+ ///
[Benchmark(Description = "Query by loggerName + timestamp + arbitrary context string key/value")]
public async Task QueryByLoggerTimestampAndContextKeyAsync()
{
@@ -169,6 +184,9 @@ public class SurrealLogStorageBenchmarks
});
}
+ ///
+ /// Queries rows by logger and numeric context key/value pairs.
+ ///
[Benchmark(Description = "Query by loggerName + timestamp + arbitrary context number key/value")]
public async Task QueryByLoggerTimestampAndNumericContextKeyAsync()
{
@@ -201,6 +219,9 @@ public class SurrealLogStorageBenchmarks
});
}
+ ///
+ /// Reports RocksDB size for the seeded benchmark database.
+ ///
[Benchmark(Description = "RocksDB size (bytes)")]
public long GetDatabaseFileSizeBytes()
{