From cce24fa8f329984c03ab511432e01c8700f040bc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 17:44:57 -0500 Subject: [PATCH] Add LMDB oplog migration path with dual-write cutover support Introduce LMDB oplog store, migration flags, telemetry/backfill tooling, and parity tests to enable staged Surreal-to-LMDB rollout with rollback coverage. --- docs/adr/0002-lmdb-oplog-migration.md | 55 + docs/persistence-providers.md | 77 + lmdbop.md | 307 ++++ .../Lmdb/CBDDCLmdbOplogExtensions.cs | 54 + .../Lmdb/FeatureFlagOplogStore.cs | 497 ++++++ .../Lmdb/LmdbOplogBackfillTool.cs | 277 ++++ .../Lmdb/LmdbOplogFeatureFlags.cs | 32 + .../Lmdb/LmdbOplogOptions.cs | 68 + .../Lmdb/LmdbOplogStore.cs | 1385 +++++++++++++++++ .../Lmdb/OplogMigrationTelemetry.cs | 89 ++ .../Surreal/CBDDCSurrealEmbeddedExtensions.cs | 3 +- .../ZB.MOM.WW.CBDDC.Persistence.csproj | 1 + .../ClusterCrudSyncE2ETests.cs | 137 +- .../LmdbOplogMigrationTests.cs | 237 +++ .../LmdbOplogStoreContractTests.cs | 267 ++++ .../OplogStoreContractTestBase.cs | 121 ++ 16 files changed, 3601 insertions(+), 6 deletions(-) create mode 100644 docs/adr/0002-lmdb-oplog-migration.md create mode 100644 lmdbop.md create mode 100644 src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/CBDDCLmdbOplogExtensions.cs create mode 100644 src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/FeatureFlagOplogStore.cs create mode 100644 src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogBackfillTool.cs create mode 100644 src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogFeatureFlags.cs create mode 100644 src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogOptions.cs create mode 100644 src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogStore.cs create mode 100644 src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/OplogMigrationTelemetry.cs create mode 100644 tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogMigrationTests.cs create mode 100644 tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs create mode 100644 
tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/OplogStoreContractTestBase.cs diff --git a/docs/adr/0002-lmdb-oplog-migration.md b/docs/adr/0002-lmdb-oplog-migration.md new file mode 100644 index 0000000..bbd8bf8 --- /dev/null +++ b/docs/adr/0002-lmdb-oplog-migration.md @@ -0,0 +1,55 @@ +# ADR 0002: LMDB Oplog Migration + +## Status + +Accepted + +## Context + +The existing oplog persistence layer is Surreal-backed and tightly coupled to local CDC transaction boundaries. We need an LMDB-backed oplog path to improve prune efficiency and reduce query latency while preserving existing `IOplogStore` semantics and low-risk rollback. + +Key constraints: +- Dedupe by hash. +- Hash lookup, node/time scans, and chain reconstruction. +- Cutoff-based prune (not strict FIFO) with interleaved late arrivals. +- Dataset isolation in all indexes. +- Explicit handling for cross-engine atomicity when document metadata remains Surreal-backed. + +## Decision + +Adopt an LMDB oplog provider (`LmdbOplogStore`) with feature-flag controlled migration: +- `UseLmdbOplog` +- `DualWriteOplog` +- `PreferLmdbReads` + +### LMDB index schema + +Single environment with named DBIs: +- `oplog_by_hash`: `{datasetId}|{hash}` -> serialized `OplogEntry` +- `oplog_by_hlc`: `{datasetId}|{wall}|{logic}|{nodeId}|{hash}` -> marker +- `oplog_by_node_hlc`: `{datasetId}|{nodeId}|{wall}|{logic}|{hash}` -> marker +- `oplog_prev_to_hash` (`DUPSORT`): `{datasetId}|{previousHash}` -> `{hash}` +- `oplog_node_head`: `{datasetId}|{nodeId}` -> `{wall,logic,hash}` +- `oplog_meta`: schema/version markers + dataset prune watermark + +Composite keys use deterministic byte encodings with dataset prefixes on every index. + +### Prune algorithm + +Prune scans `oplog_by_hlc` up to cutoff and removes each candidate from all indexes, then recomputes touched node-head entries. Deletes run in bounded batches (`PruneBatchSize`) inside write transactions. 
+ +### Consistency model + +Phase-1 consistency model is Option A (eventual cross-engine atomicity): +- Surreal local CDC writes remain authoritative for atomic document+metadata+checkpoint transactions. +- LMDB is backfilled/reconciled from Surreal when LMDB reads are preferred and gaps are detected. +- Dual-write is available for sync-path writes to accelerate cutover confidence. + +## Consequences + +- Enables staged rollout (dual-write and read shadow validation before cutover). +- Improves prune/query performance characteristics via ordered LMDB indexes. +- Keeps rollback low-risk by retaining Surreal source-of-truth during migration windows. +- Requires reconciliation logic and operational monitoring of mismatch counters/logs during migration. +- Includes a dedicated backfill utility (`LmdbOplogBackfillTool`) with parity report output. +- Exposes migration telemetry counters (`OplogMigrationTelemetry`) for mismatch/reconciliation tracking. diff --git a/docs/persistence-providers.md b/docs/persistence-providers.md index 2679647..3212b5d 100755 --- a/docs/persistence-providers.md +++ b/docs/persistence-providers.md @@ -237,6 +237,83 @@ Surreal persistence now stores `datasetId` on oplog, metadata, snapshot metadata 4. **Delete durability**: deletes persist as oplog delete operations plus tombstone metadata. 5. **Remote apply behavior**: remote sync applies documents without generating local loopback CDC entries. +## LMDB Oplog Migration Mode + +CBDDC now supports an LMDB-backed oplog provider for staged cutover from Surreal oplog tables. 
+ +### Registration + +```csharp +services.AddCBDDCCore() + .AddCBDDCSurrealEmbedded(optionsFactory) + .AddCBDDCLmdbOplog( + _ => new LmdbOplogOptions + { + EnvironmentPath = "/var/lib/cbddc/oplog-lmdb", + MapSizeBytes = 256L * 1024 * 1024, + MaxDatabases = 16, + PruneBatchSize = 512 + }, + flags => + { + flags.UseLmdbOplog = true; + flags.DualWriteOplog = true; + flags.PreferLmdbReads = false; + }); +``` + +### Feature Flags + +- `UseLmdbOplog`: enables LMDB migration path. +- `DualWriteOplog`: mirrors writes to Surreal + LMDB. +- `PreferLmdbReads`: cuts reads over to LMDB. +- `EnableReadShadowValidation`: compares Surreal/LMDB read results and logs mismatches. + +### Consistency Model + +The initial migration model is eventual cross-engine atomicity (Option A): + +- Surreal local CDC transactions remain authoritative for atomic document + metadata persistence. +- LMDB is backfilled/reconciled when LMDB reads are preferred and LMDB is missing recent Surreal writes. +- During rollout, keep dual-write enabled until mismatch logs remain stable. + +### Backfill Utility + +`LmdbOplogBackfillTool` performs Surreal -> LMDB oplog backfill and parity validation per dataset: + +```csharp +var backfill = provider.GetRequiredService(); +LmdbOplogBackfillReport report = await backfill.BackfillOrThrowAsync(DatasetId.Primary); +``` + +Validation includes: +- total entry counts +- per-node entry counts +- latest hash per node +- hash spot checks +- chain-range spot checks + +### Migration Telemetry + +`FeatureFlagOplogStore` records migration counters through `OplogMigrationTelemetry`: + +- shadow comparisons +- shadow mismatches +- LMDB preferred-read fallbacks to Surreal +- reconciliation runs and reconciled entry counts (global + per dataset) + +You can resolve `OplogMigrationTelemetry` from DI or call `GetTelemetrySnapshot()` on `FeatureFlagOplogStore`. 
+ +### Rollback Path + +To roll back read/write behavior to Surreal during migration: + +- set `PreferLmdbReads = false` +- set `DualWriteOplog = false` + +With `UseLmdbOplog = true`, this keeps LMDB services available while routing reads/writes to Surreal only. +If LMDB should be fully disabled, set `UseLmdbOplog = false`. + ## Feature Comparison | Feature | SQLite (Direct) | EF Core | PostgreSQL | Surreal Embedded | diff --git a/lmdbop.md b/lmdbop.md new file mode 100644 index 0000000..148f6b7 --- /dev/null +++ b/lmdbop.md @@ -0,0 +1,307 @@ +# LMDB Oplog Migration Plan + +## 1. Goal + +Move `IOplogStore` persistence from Surreal-backed oplog tables to an LMDB-backed store while preserving current sync behavior and improving prune efficiency. + +Primary outcomes: +- Keep existing `IOplogStore` contract semantics. +- Make `PruneOplogAsync` efficient and safe under current timestamp-based cutoff behavior. +- Keep roll-forward and rollback low risk via feature flags and verification steps. + +## 2. Current Constraints That Must Be Preserved + +The oplog is not just a queue; the implementation must support: +- append + dedupe by hash +- lookup by hash +- node/time range scans +- chain-range reconstruction by hash linkage +- prune by cutoff timestamp +- per-dataset isolation + +Key references: +- `/Users/dohertj2/Desktop/CBDDC/src/ZB.MOM.WW.CBDDC.Core/Storage/IOplogStore.cs` +- `/Users/dohertj2/Desktop/CBDDC/src/ZB.MOM.WW.CBDDC.Network/SyncOrchestrator.cs` +- `/Users/dohertj2/Desktop/CBDDC/src/ZB.MOM.WW.CBDDC.Network/TcpSyncServer.cs` + +Important behavior notes: +- Prune is cutoff-based, not pure FIFO dequeue. +- Late-arriving remote entries can have older timestamps than recent local writes, so prune can be non-contiguous in append order. +- Current Surreal local write path performs atomic oplog+metadata(+checkpoint) persistence inside one transaction; cross-engine behavior must be handled intentionally. + +## 3. 
Target Design (LMDB) + +## 3.1 New Provider + +Create an LMDB oplog provider in persistence: +- New class: `LmdbOplogStore : OplogStore` +- New options class: `LmdbOplogOptions` +- New DI extension, e.g. `AddCBDDCLmdbOplog(...)` + +Suggested options: +- `EnvironmentPath` +- `MapSizeBytes` +- `MaxDatabases` +- `SyncMode` (durability/perf) +- `PruneBatchSize` +- `EnableCompactionCopy` (for optional file shrink operation) + +## 3.2 LMDB Data Layout + +Use multiple named DBIs (single environment): + +1. `oplog_by_hash` +Key: `{datasetId}|{hash}` +Value: serialized `OplogEntry` (compact binary or UTF-8 JSON) + +2. `oplog_by_hlc` +Key: `{datasetId}|{wall:big-endian}|{logic:big-endian}|{nodeId}|{hash}` +Value: empty or small marker +Purpose: `GetOplogAfterAsync`, prune range scan + +3. `oplog_by_node_hlc` +Key: `{datasetId}|{nodeId}|{wall}|{logic}|{hash}` +Value: empty or marker +Purpose: `GetOplogForNodeAfterAsync`, fast node head updates + +4. `oplog_prev_to_hash` (duplicate-allowed) +Key: `{datasetId}|{previousHash}` +Value: `{hash}` +Purpose: chain traversal support for `GetChainRangeAsync` + +5. `oplog_node_head` +Key: `{datasetId}|{nodeId}` +Value: `{wall, logic, hash}` +Purpose: O(1) `GetLastEntryHashAsync` + +6. `oplog_meta` +Stores schema version, migration markers, and optional prune watermark per dataset. + +Notes: +- Use deterministic byte encoding for composite keys to preserve lexical order. +- Keep dataset prefix in every index key to guarantee dataset isolation. + +## 3.3 Write Transaction Rules + +`AppendOplogEntryAsync` transaction: +1. Check dedupe in `oplog_by_hash`. +2. If absent: insert `oplog_by_hash` + all secondary indexes. +3. Update `oplog_node_head` only if incoming timestamp > current head timestamp for that node. +4. Commit once. + +`MergeAsync`/`ImportAsync`: +- Reuse same insert routine in loops with write batching. +- Dedupe strictly by hash. + +## 3.4 Prune Strategy + +Base prune operation (must-have): +1. 
Cursor-scan `oplog_by_hlc` up to cutoff key for target dataset. +2. For each candidate hash: + - delete from `oplog_by_hash` + - delete node index key + - delete prev->hash duplicate mapping + - delete hlc index key +3. Recompute affected `oplog_node_head` entries lazily (on read) or eagerly for touched nodes. + +Efficiency enhancements (recommended): +- Process deletes in batches (`PruneBatchSize`) inside bounded write txns. +- Keep optional per-node dirty set during prune to limit head recomputation. +- Optional periodic LMDB compact copy if physical file shrink is needed (LMDB naturally reuses freed pages, but does not always shrink file immediately). + +## 3.5 Atomicity with Document Metadata + +Decision required (explicit in implementation review): + +Option A (phase 1 recommended): +- Accept cross-engine eventual atomicity. +- Keep current document write flow. +- Add reconciliation/repair on startup: + - detect metadata entries missing oplog hash for recent writes + - rebuild node-head and index consistency from `oplog_by_hash`. + +Option B (hard mode): +- Introduce durable outbox pattern to guarantee atomic handoff across engines. +- Higher complexity; schedule after functional cutover. + +Plan uses Option A first for lower migration risk. + +## 4. Phased Execution Plan + +## Phase 0: Prep and Design Freeze +- Add ADR documenting: + - key encoding format + - index schema + - prune algorithm + - consistency model (Option A above) +- Add config model and feature flags: + - `UseLmdbOplog` + - `DualWriteOplog` + - `PreferLmdbReads` + +Exit criteria: +- ADR approved. +- Configuration contract approved. + +## Phase 1: LMDB Store Skeleton +- Add package reference `LightningDB`. +- Implement `LmdbOplogStore` with: + - `AppendOplogEntryAsync` + - `GetEntryByHashAsync` + - `GetLastEntryHashAsync` + - `GetOplogAfterAsync` + - `GetOplogForNodeAfterAsync` + - `GetChainRangeAsync` + - `PruneOplogAsync` + - snapshot import/export/drop/merge APIs. 
+- Implement startup/open/close lifecycle and map-size handling. + +Exit criteria: +- Local contract tests pass for LMDB store. + +## Phase 2: Dual-Write + Read Shadow Validation +- Keep Surreal oplog as source of truth. +- Write every oplog mutation to both stores (`DualWriteOplog=true`). +- Read-path comparison mode in non-prod: + - query both stores + - assert same hashes/order for key APIs + - log mismatches. + +Exit criteria: +- Zero mismatches in soak tests. + +## Phase 3: Cutover +- Set `PreferLmdbReads=true` in staging first. +- Keep dual-write enabled for one release window. +- Monitor: + - prune duration + - oplog query latency + - mismatch counters + - restart recovery behavior. + +Exit criteria: +- Stable staging and production canary. + +## Phase 4: Cleanup +- Disable Surreal oplog writes. +- Keep migration utility for rollback window. +- Remove dual-compare instrumentation after confidence period. + +## 5. Data Migration / Backfill + +Backfill tool steps: +1. Read dataset-scoped Surreal oplog export. +2. Bulk import into LMDB by HLC order. +3. Rebuild node-head table. +4. Validate: + - counts per dataset + - counts per node + - latest hash per node + - random hash spot checks + - chain-range spot checks. + +Rollback: +- Keep Surreal oplog untouched during dual-write window. +- Flip feature flags back to Surreal reads. + +## 6. Unit Test Update Instructions + +## 6.1 Reuse Existing Oplog Contract Tests + +Use these as baseline parity requirements: +- `/Users/dohertj2/Desktop/CBDDC/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealStoreContractTests.cs` +- `/Users/dohertj2/Desktop/CBDDC/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/BLiteStoreExportImportTests.cs` (class `SurrealStoreExportImportTests`) + +Actions: +1. Extract oplog contract cases into shared test base (provider-agnostic). +2. 
Run same suite against: + - Surreal store + - new LMDB store + +Minimum parity cases: +- append/query/merge/drop +- dataset isolation +- legacy/default dataset behavior (if supported) +- `GetChainRangeAsync` correctness +- `GetLastEntryHashAsync` persistence across restart + +## 6.2 Add LMDB-Specific Unit Tests + +Create new file: +- `/Users/dohertj2/Desktop/CBDDC/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs` + +Add tests for: +1. Index consistency: + - inserting one entry populates all indexes + - deleting/pruning removes all index records +2. Prune correctness: + - removes `<= cutoff` + - does not remove `> cutoff` + - handles interleaved node timestamps + - handles late-arriving old timestamp entry safely +3. Node-head maintenance: + - head advances on newer entry + - prune invalidates/recomputes correctly +4. Restart durability: + - reopen LMDB env and verify last-hash + scans +5. Dedupe: + - duplicate hash append is idempotent + +## 6.3 Update Integration/E2E Coverage + +Files to touch: +- `/Users/dohertj2/Desktop/CBDDC/tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs` +- `/Users/dohertj2/Desktop/CBDDC/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcDurabilityTests.cs` (or LMDB-focused equivalent) + +Add/adjust scenarios: +1. Gap recovery still works with LMDB oplog backend. +2. Peer-confirmed prune still blocks/allows correctly. +3. Crash between document commit and oplog write (Option A behavior) is detected/repaired by startup reconciliation. +4. Prune performance smoke test (large synthetic oplog, bounded runtime threshold with generous margin). 
+ +## 6.4 Keep Existing Network Unit Tests Intact + +Most network tests mock `IOplogStore` and should remain unchanged: +- `/Users/dohertj2/Desktop/CBDDC/tests/ZB.MOM.WW.CBDDC.Network.Tests/SyncOrchestratorMaintenancePruningTests.cs` +- `/Users/dohertj2/Desktop/CBDDC/tests/ZB.MOM.WW.CBDDC.Network.Tests/SyncOrchestratorConfirmationTests.cs` +- `/Users/dohertj2/Desktop/CBDDC/tests/ZB.MOM.WW.CBDDC.Network.Tests/SnapshotReconnectRegressionTests.cs` + +Only update if method behavior/ordering contracts are intentionally changed. + +## 7. Performance and Observability Plan + +Track and compare (Surreal vs LMDB): +- `AppendOplogEntryAsync` latency p50/p95/p99 +- `GetOplogForNodeAfterAsync` latency +- prune duration and entries/sec deleted +- LMDB env file size and reclaimed free-page ratio +- mismatch counters in dual-read compare mode + +Add logs/metrics: +- prune batches processed +- dirty nodes recomputed +- startup repair actions and counts + +## 8. Risks and Mitigations + +1. Cross-engine consistency gaps (document metadata vs oplog) +- Mitigation: startup reconciliation + dual-write shadow period. + +2. Incorrect composite key encoding +- Mitigation: explicit encoding helper + property tests for sort/order invariants. + +3. Prune causing stale node-head values +- Mitigation: touched-node tracking and lazy/eager recompute tests. + +4. LMDB map-size exhaustion +- Mitigation: configurable mapsize, monitoring, and operational runbook for resize. + +## 9. Review Checklist + +- [x] ADR approved for LMDB key/index schema. +- [x] Feature flags merged (`UseLmdbOplog`, `DualWriteOplog`, `PreferLmdbReads`). +- [x] LMDB contract tests passing. +- [x] Dual-write mismatch telemetry in place. +- [x] Backfill tool implemented and validated in automated tests (staging execution ready). +- [x] Prune correctness + efficiency tests passing. +- [x] Rollback path documented and tested. 
diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/CBDDCLmdbOplogExtensions.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/CBDDCLmdbOplogExtensions.cs new file mode 100644 index 0000000..83ffce3 --- /dev/null +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/CBDDCLmdbOplogExtensions.cs @@ -0,0 +1,54 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using ZB.MOM.WW.CBDDC.Core.Storage; +using ZB.MOM.WW.CBDDC.Persistence.Surreal; + +namespace ZB.MOM.WW.CBDDC.Persistence.Lmdb; + +/// +/// Extension methods for adding the LMDB oplog provider and migration feature flags. +/// +public static class CBDDCLmdbOplogExtensions +{ + /// + /// Registers LMDB oplog services and replaces with a feature-flag migration router. + /// + /// The service collection. + /// Factory creating LMDB environment options. + /// Optional migration feature-flag configuration. + /// The service collection. + public static IServiceCollection AddCBDDCLmdbOplog( + this IServiceCollection services, + Func optionsFactory, + Action? 
configureFlags = null) + { + if (services == null) throw new ArgumentNullException(nameof(services)); + if (optionsFactory == null) throw new ArgumentNullException(nameof(optionsFactory)); + + services.TryAddSingleton(optionsFactory); + + var flags = new LmdbOplogFeatureFlags + { + UseLmdbOplog = true, + DualWriteOplog = true, + PreferLmdbReads = false, + EnableReadShadowValidation = false + }; + configureFlags?.Invoke(flags); + services.TryAddSingleton(flags); + + services.TryAddSingleton(); + services.TryAddSingleton(); + + bool surrealRegistered = services.Any(descriptor => descriptor.ServiceType == typeof(SurrealOplogStore)); + if (surrealRegistered) + { + services.TryAddSingleton(); + services.Replace(ServiceDescriptor.Singleton()); + } + else + services.Replace(ServiceDescriptor.Singleton(sp => sp.GetRequiredService())); + + return services; + } +} diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/FeatureFlagOplogStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/FeatureFlagOplogStore.cs new file mode 100644 index 0000000..feb83f6 --- /dev/null +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/FeatureFlagOplogStore.cs @@ -0,0 +1,497 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.CBDDC.Core; +using ZB.MOM.WW.CBDDC.Core.Storage; +using ZB.MOM.WW.CBDDC.Persistence.Surreal; + +namespace ZB.MOM.WW.CBDDC.Persistence.Lmdb; + +/// +/// Feature-flag controlled oplog router supporting Surreal source-of-truth, LMDB dual-write, +/// cutover reads, and shadow validation. 
+/// +public sealed class FeatureFlagOplogStore : IOplogStore +{ + private readonly SemaphoreSlim _reconcileGate = new(1, 1); + private readonly ConcurrentDictionary _reconcileWatermarks = new(StringComparer.Ordinal); + private readonly LmdbOplogFeatureFlags _flags; + private readonly LmdbOplogStore _lmdb; + private readonly ILogger _logger; + private readonly OplogMigrationTelemetry _telemetry; + private readonly SurrealOplogStore _surreal; + + /// + /// Initializes a new instance of the class. + /// + public FeatureFlagOplogStore( + SurrealOplogStore surreal, + LmdbOplogStore lmdb, + LmdbOplogFeatureFlags flags, + OplogMigrationTelemetry? telemetry = null, + ILogger? logger = null) + { + _surreal = surreal ?? throw new ArgumentNullException(nameof(surreal)); + _lmdb = lmdb ?? throw new ArgumentNullException(nameof(lmdb)); + _flags = flags ?? throw new ArgumentNullException(nameof(flags)); + _telemetry = telemetry ?? new OplogMigrationTelemetry(); + _logger = logger ?? NullLogger.Instance; + } + + /// + public event EventHandler? ChangesApplied; + + /// + public async Task AppendOplogEntryAsync(OplogEntry entry, CancellationToken cancellationToken = default) + { + await WriteAsync( + s => s.AppendOplogEntryAsync(entry, cancellationToken), + l => l.AppendOplogEntryAsync(entry, cancellationToken)); + } + + /// + public async Task AppendOplogEntryAsync( + OplogEntry entry, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + await WriteAsync( + s => s.AppendOplogEntryAsync(entry, normalizedDatasetId, cancellationToken), + l => l.AppendOplogEntryAsync(entry, normalizedDatasetId, cancellationToken)); + } + + /// + public Task> GetOplogAfterAsync( + HlcTimestamp timestamp, + IEnumerable? 
collections = null, + CancellationToken cancellationToken = default) + { + return GetOplogAfterAsync(timestamp, DatasetId.Primary, collections, cancellationToken); + } + + /// + public Task> GetOplogAfterAsync( + HlcTimestamp timestamp, + string datasetId, + IEnumerable? collections = null, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return ReadWithOptionalShadowCompareAsync( + normalizedDatasetId, + s => s.GetOplogAfterAsync(timestamp, normalizedDatasetId, collections, cancellationToken), + l => l.GetOplogAfterAsync(timestamp, normalizedDatasetId, collections, cancellationToken), + SequencesEquivalent, + "GetOplogAfterAsync"); + } + + /// + public Task GetLatestTimestampAsync(CancellationToken cancellationToken = default) + { + return GetLatestTimestampAsync(DatasetId.Primary, cancellationToken); + } + + /// + public Task GetLatestTimestampAsync(string datasetId, CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return ReadWithOptionalShadowCompareAsync( + normalizedDatasetId, + s => s.GetLatestTimestampAsync(normalizedDatasetId, cancellationToken), + l => l.GetLatestTimestampAsync(normalizedDatasetId, cancellationToken), + (a, b) => a.Equals(b), + "GetLatestTimestampAsync"); + } + + /// + public Task GetVectorClockAsync(CancellationToken cancellationToken = default) + { + return GetVectorClockAsync(DatasetId.Primary, cancellationToken); + } + + /// + public Task GetVectorClockAsync(string datasetId, CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return ReadWithOptionalShadowCompareAsync( + normalizedDatasetId, + s => s.GetVectorClockAsync(normalizedDatasetId, cancellationToken), + l => l.GetVectorClockAsync(normalizedDatasetId, cancellationToken), + VectorClocksEquivalent, + "GetVectorClockAsync"); + } + + /// + public Task> GetOplogForNodeAfterAsync( + string 
nodeId, + HlcTimestamp since, + IEnumerable? collections = null, + CancellationToken cancellationToken = default) + { + return GetOplogForNodeAfterAsync(nodeId, since, DatasetId.Primary, collections, cancellationToken); + } + + /// + public Task> GetOplogForNodeAfterAsync( + string nodeId, + HlcTimestamp since, + string datasetId, + IEnumerable? collections = null, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return ReadWithOptionalShadowCompareAsync( + normalizedDatasetId, + s => s.GetOplogForNodeAfterAsync(nodeId, since, normalizedDatasetId, collections, cancellationToken), + l => l.GetOplogForNodeAfterAsync(nodeId, since, normalizedDatasetId, collections, cancellationToken), + SequencesEquivalent, + "GetOplogForNodeAfterAsync"); + } + + /// + public Task GetLastEntryHashAsync(string nodeId, CancellationToken cancellationToken = default) + { + return GetLastEntryHashAsync(nodeId, DatasetId.Primary, cancellationToken); + } + + /// + public Task GetLastEntryHashAsync( + string nodeId, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return ReadWithOptionalShadowCompareAsync( + normalizedDatasetId, + s => s.GetLastEntryHashAsync(nodeId, normalizedDatasetId, cancellationToken), + l => l.GetLastEntryHashAsync(nodeId, normalizedDatasetId, cancellationToken), + (a, b) => string.Equals(a, b, StringComparison.Ordinal), + "GetLastEntryHashAsync"); + } + + /// + public Task> GetChainRangeAsync( + string startHash, + string endHash, + CancellationToken cancellationToken = default) + { + return GetChainRangeAsync(startHash, endHash, DatasetId.Primary, cancellationToken); + } + + /// + public Task> GetChainRangeAsync( + string startHash, + string endHash, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return 
ReadWithOptionalShadowCompareAsync( + normalizedDatasetId, + s => s.GetChainRangeAsync(startHash, endHash, normalizedDatasetId, cancellationToken), + l => l.GetChainRangeAsync(startHash, endHash, normalizedDatasetId, cancellationToken), + SequencesEquivalent, + "GetChainRangeAsync"); + } + + /// + public Task GetEntryByHashAsync(string hash, CancellationToken cancellationToken = default) + { + return GetEntryByHashAsync(hash, DatasetId.Primary, cancellationToken); + } + + /// + public Task GetEntryByHashAsync( + string hash, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return ReadWithOptionalShadowCompareAsync( + normalizedDatasetId, + s => s.GetEntryByHashAsync(hash, normalizedDatasetId, cancellationToken), + l => l.GetEntryByHashAsync(hash, normalizedDatasetId, cancellationToken), + EntriesEquivalent, + "GetEntryByHashAsync"); + } + + /// + public async Task ApplyBatchAsync(IEnumerable oplogEntries, CancellationToken cancellationToken = default) + { + var entries = oplogEntries.ToList(); + await WriteAsync( + s => s.ApplyBatchAsync(entries, cancellationToken), + l => l.ApplyBatchAsync(entries, cancellationToken)); + + ChangesApplied?.Invoke(this, new ChangesAppliedEventArgs(entries)); + } + + /// + public async Task ApplyBatchAsync( + IEnumerable oplogEntries, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + var entries = oplogEntries.ToList(); + + await WriteAsync( + s => s.ApplyBatchAsync(entries, normalizedDatasetId, cancellationToken), + l => l.ApplyBatchAsync(entries, normalizedDatasetId, cancellationToken)); + + ChangesApplied?.Invoke(this, new ChangesAppliedEventArgs(entries)); + } + + /// + public Task PruneOplogAsync(HlcTimestamp cutoff, CancellationToken cancellationToken = default) + { + return PruneOplogAsync(cutoff, DatasetId.Primary, cancellationToken); + } + + /// + 
public Task PruneOplogAsync(HlcTimestamp cutoff, string datasetId, CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return WriteAsync( + s => s.PruneOplogAsync(cutoff, normalizedDatasetId, cancellationToken), + l => l.PruneOplogAsync(cutoff, normalizedDatasetId, cancellationToken)); + } + + /// + public Task DropAsync(CancellationToken cancellationToken = default) + { + return DropAsync(DatasetId.Primary, cancellationToken); + } + + /// + public Task DropAsync(string datasetId, CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return WriteAsync( + s => s.DropAsync(normalizedDatasetId, cancellationToken), + l => l.DropAsync(normalizedDatasetId, cancellationToken)); + } + + /// + public Task> ExportAsync(CancellationToken cancellationToken = default) + { + return ExportAsync(DatasetId.Primary, cancellationToken); + } + + /// + public Task> ExportAsync(string datasetId, CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + return ReadWithOptionalShadowCompareAsync( + normalizedDatasetId, + s => s.ExportAsync(normalizedDatasetId, cancellationToken), + l => l.ExportAsync(normalizedDatasetId, cancellationToken), + SequencesEquivalent, + "ExportAsync"); + } + + /// + public Task ImportAsync(IEnumerable items, CancellationToken cancellationToken = default) + { + return ImportAsync(items, DatasetId.Primary, cancellationToken); + } + + /// + public Task ImportAsync( + IEnumerable items, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + var entries = items.ToList(); + return WriteAsync( + s => s.ImportAsync(entries, normalizedDatasetId, cancellationToken), + l => l.ImportAsync(entries, normalizedDatasetId, cancellationToken)); + } + + /// + public Task MergeAsync(IEnumerable items, CancellationToken 
cancellationToken = default) + { + return MergeAsync(items, DatasetId.Primary, cancellationToken); + } + + /// + public Task MergeAsync( + IEnumerable items, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + var entries = items.ToList(); + return WriteAsync( + s => s.MergeAsync(entries, normalizedDatasetId, cancellationToken), + l => l.MergeAsync(entries, normalizedDatasetId, cancellationToken)); + } + + /// + /// Returns current migration telemetry counters. + /// + public OplogMigrationTelemetrySnapshot GetTelemetrySnapshot() + { + return _telemetry.GetSnapshot(); + } + + private bool IsLmdbEnabled => _flags.UseLmdbOplog; + + private bool ShouldShadowCompare => + _flags.UseLmdbOplog && _flags.DualWriteOplog && _flags.EnableReadShadowValidation; + + private bool PreferLmdbReads => _flags.UseLmdbOplog && _flags.PreferLmdbReads; + + private static string NormalizeDatasetId(string? datasetId) + { + return DatasetId.Normalize(datasetId); + } + + private async Task ReadWithOptionalShadowCompareAsync( + string datasetId, + Func> surrealRead, + Func> lmdbRead, + Func equals, + string operationName) + { + bool preferLmdb = PreferLmdbReads; + + T primary; + if (preferLmdb) + { + try + { + await EnsureReconciledAsync(datasetId); + primary = await lmdbRead(_lmdb); + } + catch (Exception exception) + { + _telemetry.RecordPreferredReadFallback(); + _logger.LogWarning( + exception, + "LMDB preferred read fallback to Surreal for {Operation} dataset {DatasetId}.", + operationName, + datasetId); + primary = await surrealRead(_surreal); + preferLmdb = false; + } + } + else + { + primary = await surrealRead(_surreal); + } + + if (!ShouldShadowCompare) return primary; + + T secondary; + if (preferLmdb) + { + secondary = await surrealRead(_surreal); + } + else + { + await EnsureReconciledAsync(datasetId); + secondary = await lmdbRead(_lmdb); + } + + bool isMatch = equals(primary, secondary); + 
_telemetry.RecordShadowComparison(isMatch); + + if (!isMatch) + _logger.LogWarning( + "Oplog read shadow mismatch in {Operation} for dataset {DatasetId}. PreferLmdbReads={PreferLmdbReads}", + operationName, + datasetId, + PreferLmdbReads); + + return primary; + } + + private async Task WriteAsync( + Func surrealWrite, + Func lmdbWrite) + { + if (!IsLmdbEnabled) + { + await surrealWrite(_surreal); + return; + } + + if (_flags.DualWriteOplog) + { + await surrealWrite(_surreal); + await lmdbWrite(_lmdb); + return; + } + + if (_flags.PreferLmdbReads) + await lmdbWrite(_lmdb); + else + await surrealWrite(_surreal); + } + + private async Task EnsureReconciledAsync(string datasetId) + { + if (!PreferLmdbReads) return; + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + DateTimeOffset now = DateTimeOffset.UtcNow; + + if (_reconcileWatermarks.TryGetValue(normalizedDatasetId, out DateTimeOffset watermark) && + now - watermark < _flags.ReconciliationInterval) + return; + + await _reconcileGate.WaitAsync(); + try + { + now = DateTimeOffset.UtcNow; + if (_reconcileWatermarks.TryGetValue(normalizedDatasetId, out watermark) && + now - watermark < _flags.ReconciliationInterval) + return; + + var source = (await _surreal.ExportAsync(normalizedDatasetId)).ToList(); + await _lmdb.MergeAsync(source, normalizedDatasetId); + _telemetry.RecordReconciliation(normalizedDatasetId, source.Count); + _reconcileWatermarks[normalizedDatasetId] = DateTimeOffset.UtcNow; + + _logger.LogInformation( + "Reconciled LMDB oplog for dataset {DatasetId} by merging {MergedCount} Surreal entries.", + normalizedDatasetId, + source.Count); + } + finally + { + _reconcileGate.Release(); + } + } + + private static bool EntriesEquivalent(OplogEntry? left, OplogEntry? 
right) + { + if (left is null && right is null) return true; + if (left is null || right is null) return false; + + return string.Equals(left.Hash, right.Hash, StringComparison.Ordinal) && + string.Equals(left.DatasetId, right.DatasetId, StringComparison.Ordinal); + } + + private static bool SequencesEquivalent(IEnumerable left, IEnumerable right) + { + string[] leftHashes = left.Select(e => e.Hash).ToArray(); + string[] rightHashes = right.Select(e => e.Hash).ToArray(); + return leftHashes.SequenceEqual(rightHashes, StringComparer.Ordinal); + } + + private static bool VectorClocksEquivalent(VectorClock left, VectorClock right) + { + var nodeIds = new HashSet(left.NodeIds, StringComparer.Ordinal); + foreach (string nodeId in right.NodeIds) nodeIds.Add(nodeId); + + foreach (string nodeId in nodeIds) + if (!left.GetTimestamp(nodeId).Equals(right.GetTimestamp(nodeId))) + return false; + + return true; + } +} diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogBackfillTool.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogBackfillTool.cs new file mode 100644 index 0000000..9b7cf24 --- /dev/null +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogBackfillTool.cs @@ -0,0 +1,277 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.CBDDC.Core; +using ZB.MOM.WW.CBDDC.Core.Storage; +using ZB.MOM.WW.CBDDC.Persistence.Surreal; + +namespace ZB.MOM.WW.CBDDC.Persistence.Lmdb; + +/// +/// Backfills LMDB oplog content from Surreal and validates parity. +/// +public sealed class LmdbOplogBackfillTool +{ + private readonly LmdbOplogStore _destination; + private readonly ILogger _logger; + private readonly SurrealOplogStore _source; + + /// + /// Initializes a new instance of the class. + /// + public LmdbOplogBackfillTool( + SurrealOplogStore source, + LmdbOplogStore destination, + ILogger? logger = null) + { + _source = source ?? throw new ArgumentNullException(nameof(source)); + _destination = destination ?? 
throw new ArgumentNullException(nameof(destination)); + _logger = logger ?? NullLogger.Instance; + } + + /// + /// Backfills one dataset from Surreal to LMDB and validates parity. + /// + public async Task BackfillAsync( + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = DatasetId.Normalize(datasetId); + var sourceEntries = (await _source.ExportAsync(normalizedDatasetId, cancellationToken)) + .OrderBy(entry => entry.Timestamp.PhysicalTime) + .ThenBy(entry => entry.Timestamp.LogicalCounter) + .ThenBy(entry => entry.Timestamp.NodeId, StringComparer.Ordinal) + .ThenBy(entry => entry.Hash, StringComparer.Ordinal) + .ToList(); + + await _destination.MergeAsync(sourceEntries, normalizedDatasetId, cancellationToken); + LmdbOplogBackfillReport report = await ValidateParityAsync(normalizedDatasetId, sourceEntries, cancellationToken); + + _logger.LogInformation( + "LMDB oplog backfill {Result} for dataset {DatasetId}. Source={SourceCount}, Destination={DestinationCount}, HashSpotChecks={HashSpotChecks}, ChainSpotChecks={ChainSpotChecks}.", + report.IsSuccess ? "succeeded" : "failed", + report.DatasetId, + report.SourceCount, + report.DestinationCount, + report.HashSpotCheckCount, + report.ChainSpotCheckCount); + + return report; + } + + /// + /// Validates parity only without running a backfill merge. + /// + public async Task ValidateParityAsync( + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = DatasetId.Normalize(datasetId); + var sourceEntries = (await _source.ExportAsync(normalizedDatasetId, cancellationToken)).ToList(); + return await ValidateParityAsync(normalizedDatasetId, sourceEntries, cancellationToken); + } + + /// + /// Backfills and throws when parity validation fails. 
+ /// + public async Task BackfillOrThrowAsync( + string datasetId, + CancellationToken cancellationToken = default) + { + LmdbOplogBackfillReport report = await BackfillAsync(datasetId, cancellationToken); + if (report.IsSuccess) return report; + + throw new InvalidOperationException( + $"LMDB oplog backfill parity failed for dataset '{report.DatasetId}'. " + + $"Source={report.SourceCount}, Destination={report.DestinationCount}, " + + $"CountsMatch={report.CountsMatch}, CountsPerNodeMatch={report.CountsPerNodeMatch}, " + + $"LatestHashPerNodeMatch={report.LatestHashPerNodeMatch}, HashSpotChecksPassed={report.HashSpotChecksPassed}, " + + $"ChainSpotChecksPassed={report.ChainSpotChecksPassed}."); + } + + private async Task ValidateParityAsync( + string datasetId, + List sourceEntries, + CancellationToken cancellationToken) + { + var sourceOrdered = sourceEntries + .OrderBy(entry => entry.Timestamp.PhysicalTime) + .ThenBy(entry => entry.Timestamp.LogicalCounter) + .ThenBy(entry => entry.Timestamp.NodeId, StringComparer.Ordinal) + .ThenBy(entry => entry.Hash, StringComparer.Ordinal) + .ToList(); + + var destinationOrdered = (await _destination.ExportAsync(datasetId, cancellationToken)) + .OrderBy(entry => entry.Timestamp.PhysicalTime) + .ThenBy(entry => entry.Timestamp.LogicalCounter) + .ThenBy(entry => entry.Timestamp.NodeId, StringComparer.Ordinal) + .ThenBy(entry => entry.Hash, StringComparer.Ordinal) + .ToList(); + + bool countsMatch = sourceOrdered.Count == destinationOrdered.Count; + IReadOnlyDictionary sourceCountByNode = CountByNode(sourceOrdered); + IReadOnlyDictionary destinationCountByNode = CountByNode(destinationOrdered); + bool countsPerNodeMatch = DictionaryEqual(sourceCountByNode, destinationCountByNode); + + IReadOnlyDictionary sourceLatestHashByNode = LatestHashByNode(sourceOrdered); + IReadOnlyDictionary destinationLatestHashByNode = LatestHashByNode(destinationOrdered); + bool latestHashPerNodeMatch = DictionaryEqual(sourceLatestHashByNode, 
destinationLatestHashByNode); + + (bool hashSpotChecksPassed, int hashSpotCheckCount) = await RunHashSpotChecksAsync( + datasetId, + sourceOrdered, + cancellationToken); + (bool chainSpotChecksPassed, int chainSpotCheckCount) = await RunChainSpotChecksAsync( + datasetId, + sourceOrdered, + cancellationToken); + + return new LmdbOplogBackfillReport( + datasetId, + sourceOrdered.Count, + destinationOrdered.Count, + sourceCountByNode, + destinationCountByNode, + sourceLatestHashByNode, + destinationLatestHashByNode, + hashSpotCheckCount, + chainSpotCheckCount, + countsMatch, + countsPerNodeMatch, + latestHashPerNodeMatch, + hashSpotChecksPassed, + chainSpotChecksPassed); + } + + private async Task<(bool Passed, int Count)> RunHashSpotChecksAsync( + string datasetId, + IReadOnlyList sourceEntries, + CancellationToken cancellationToken) + { + if (sourceEntries.Count == 0) return (true, 0); + + var sampleIndexes = BuildSampleIndexes(sourceEntries.Count, Math.Min(10, sourceEntries.Count)); + foreach (int index in sampleIndexes) + { + string hash = sourceEntries[index].Hash; + OplogEntry? 
destinationEntry = await _destination.GetEntryByHashAsync(hash, datasetId, cancellationToken); + if (destinationEntry == null) return (false, sampleIndexes.Count); + } + + return (true, sampleIndexes.Count); + } + + private async Task<(bool Passed, int Count)> RunChainSpotChecksAsync( + string datasetId, + IReadOnlyList sourceEntries, + CancellationToken cancellationToken) + { + if (sourceEntries.Count < 2) return (true, 0); + + var sourceByHash = sourceEntries.ToDictionary(entry => entry.Hash, StringComparer.Ordinal); + var checks = sourceEntries + .Where(entry => !string.IsNullOrWhiteSpace(entry.PreviousHash) && + sourceByHash.ContainsKey(entry.PreviousHash)) + .Take(5) + .Select(entry => (StartHash: entry.PreviousHash, EndHash: entry.Hash)) + .ToList(); + + foreach (var check in checks) + { + string[] sourceChain = (await _source.GetChainRangeAsync(check.StartHash, check.EndHash, datasetId, cancellationToken)) + .Select(entry => entry.Hash) + .ToArray(); + string[] destinationChain = + (await _destination.GetChainRangeAsync(check.StartHash, check.EndHash, datasetId, cancellationToken)) + .Select(entry => entry.Hash) + .ToArray(); + + if (!sourceChain.SequenceEqual(destinationChain, StringComparer.Ordinal)) + return (false, checks.Count); + } + + return (true, checks.Count); + } + + private static IReadOnlyDictionary CountByNode(IEnumerable entries) + { + return entries + .Where(entry => !string.IsNullOrWhiteSpace(entry.Timestamp.NodeId)) + .GroupBy(entry => entry.Timestamp.NodeId, StringComparer.Ordinal) + .ToDictionary(group => group.Key, group => group.Count(), StringComparer.Ordinal); + } + + private static IReadOnlyDictionary LatestHashByNode(IEnumerable entries) + { + return entries + .Where(entry => !string.IsNullOrWhiteSpace(entry.Timestamp.NodeId)) + .GroupBy(entry => entry.Timestamp.NodeId, StringComparer.Ordinal) + .ToDictionary( + group => group.Key, + group => group + .OrderByDescending(entry => entry.Timestamp.PhysicalTime) + .ThenByDescending(entry 
=> entry.Timestamp.LogicalCounter) + .ThenByDescending(entry => entry.Hash, StringComparer.Ordinal) + .First() + .Hash, + StringComparer.Ordinal); + } + + private static bool DictionaryEqual( + IReadOnlyDictionary left, + IReadOnlyDictionary right) + { + if (left.Count != right.Count) return false; + foreach (var pair in left) + { + if (!right.TryGetValue(pair.Key, out T? rightValue)) return false; + if (!EqualityComparer.Default.Equals(pair.Value, rightValue)) return false; + } + + return true; + } + + private static List BuildSampleIndexes(int totalCount, int sampleCount) + { + if (sampleCount <= 0 || totalCount <= 0) return []; + if (sampleCount >= totalCount) return Enumerable.Range(0, totalCount).ToList(); + + var indexes = new HashSet(); + for (var i = 0; i < sampleCount; i++) + { + int index = (int)Math.Round(i * (totalCount - 1d) / (sampleCount - 1d)); + indexes.Add(Math.Clamp(index, 0, totalCount - 1)); + } + + return indexes.OrderBy(value => value).ToList(); + } +} + +/// +/// Parity report produced by the LMDB backfill tool. +/// +public sealed record LmdbOplogBackfillReport( + string DatasetId, + int SourceCount, + int DestinationCount, + IReadOnlyDictionary SourceCountByNode, + IReadOnlyDictionary DestinationCountByNode, + IReadOnlyDictionary SourceLatestHashByNode, + IReadOnlyDictionary DestinationLatestHashByNode, + int HashSpotCheckCount, + int ChainSpotCheckCount, + bool CountsMatch, + bool CountsPerNodeMatch, + bool LatestHashPerNodeMatch, + bool HashSpotChecksPassed, + bool ChainSpotChecksPassed) +{ + /// + /// Gets a value indicating whether parity validation passed all checks. 
+ /// + public bool IsSuccess => + CountsMatch && + CountsPerNodeMatch && + LatestHashPerNodeMatch && + HashSpotChecksPassed && + ChainSpotChecksPassed; +} diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogFeatureFlags.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogFeatureFlags.cs new file mode 100644 index 0000000..ae60a2b --- /dev/null +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogFeatureFlags.cs @@ -0,0 +1,32 @@ +namespace ZB.MOM.WW.CBDDC.Persistence.Lmdb; + +/// +/// Runtime feature flags controlling Surreal/LMDB oplog migration behavior. +/// +public sealed class LmdbOplogFeatureFlags +{ + /// + /// Gets or sets a value indicating whether LMDB oplog support is enabled. + /// + public bool UseLmdbOplog { get; set; } + + /// + /// Gets or sets a value indicating whether writes should be mirrored to both Surreal and LMDB oplogs. + /// + public bool DualWriteOplog { get; set; } + + /// + /// Gets or sets a value indicating whether reads should prefer LMDB when LMDB is enabled. + /// + public bool PreferLmdbReads { get; set; } + + /// + /// Gets or sets a value indicating whether read results should be shadow-compared between stores. + /// + public bool EnableReadShadowValidation { get; set; } + + /// + /// Gets or sets the minimum interval between reconciliation backfills when LMDB reads are preferred. + /// + public TimeSpan ReconciliationInterval { get; set; } = TimeSpan.FromSeconds(2); +} diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogOptions.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogOptions.cs new file mode 100644 index 0000000..75a48c4 --- /dev/null +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogOptions.cs @@ -0,0 +1,68 @@ +namespace ZB.MOM.WW.CBDDC.Persistence.Lmdb; + +/// +/// Configuration for the LMDB-backed oplog environment. +/// +public sealed class LmdbOplogOptions +{ + /// + /// Gets or sets the LMDB environment directory path. 
+ /// + public string EnvironmentPath { get; set; } = Path.Combine(AppContext.BaseDirectory, "cbddc-oplog-lmdb"); + + /// + /// Gets or sets the LMDB map size in bytes. + /// + public long MapSizeBytes { get; set; } = 256L * 1024 * 1024; + + /// + /// Gets or sets the maximum number of named databases in the LMDB environment. + /// + public int MaxDatabases { get; set; } = 16; + + /// + /// Gets or sets the maximum concurrent reader slots. + /// + public int MaxReaders { get; set; } = 256; + + /// + /// Gets or sets sync mode balancing durability and write throughput. + /// + public LmdbSyncMode SyncMode { get; set; } = LmdbSyncMode.Full; + + /// + /// Gets or sets the number of entries to process per prune batch transaction. + /// + public int PruneBatchSize { get; set; } = 512; + + /// + /// Gets or sets a value indicating whether prune operations may run compact-copy backup passes. + /// + public bool EnableCompactionCopy { get; set; } +} + +/// +/// LMDB durability modes. +/// +public enum LmdbSyncMode +{ + /// + /// Full durability semantics. + /// + Full, + + /// + /// Skip metadata sync on each commit. + /// + NoMetaSync, + + /// + /// Skip fsync on commit for highest throughput. + /// + NoSync, + + /// + /// Write-mapped asynchronous flush mode. + /// + MapAsync +} diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogStore.cs new file mode 100644 index 0000000..821df3d --- /dev/null +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/LmdbOplogStore.cs @@ -0,0 +1,1385 @@ +using System.Buffers.Binary; +using System.Text; +using System.Text.Json; +using LightningDB; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.CBDDC.Core; +using ZB.MOM.WW.CBDDC.Core.Storage; +using ZB.MOM.WW.CBDDC.Core.Sync; + +namespace ZB.MOM.WW.CBDDC.Persistence.Lmdb; + +/// +/// LMDB-backed oplog store implementation. 
+/// +public sealed class LmdbOplogStore : OplogStore, IDisposable +{ + private const string PrimaryDatasetId = DatasetId.Primary; + private static readonly byte[] MarkerValue = [1]; + private static readonly byte[] SchemaVersionKey = Encoding.UTF8.GetBytes("schema_version"); + private static readonly byte[] SchemaVersionValue = Encoding.UTF8.GetBytes("1"); + + private readonly ILogger _logger; + private readonly LmdbOplogOptions _options; + private readonly SemaphoreSlim _writeGate = new(1, 1); + + private LightningEnvironment? _environment; + private LightningDatabase? _oplogByHash; + private LightningDatabase? _oplogByHlc; + private LightningDatabase? _oplogByNodeHlc; + private LightningDatabase? _oplogPrevToHash; + private LightningDatabase? _oplogNodeHead; + private LightningDatabase? _oplogMeta; + + /// + /// Initializes a new instance of the class. + /// + public LmdbOplogStore( + IDocumentStore documentStore, + IConflictResolver conflictResolver, + IVectorClockService vectorClockService, + LmdbOplogOptions options, + ISnapshotMetadataStore? snapshotMetadataStore = null, + ILogger? logger = null) + : base(documentStore, conflictResolver, vectorClockService, snapshotMetadataStore) + { + _options = NormalizeOptions(options ?? throw new ArgumentNullException(nameof(options))); + _logger = logger ?? 
NullLogger.Instance; + + string environmentPath = Path.GetFullPath(_options.EnvironmentPath); + Directory.CreateDirectory(environmentPath); + + var environmentConfig = new EnvironmentConfiguration + { + MapSize = _options.MapSizeBytes, + MaxDatabases = _options.MaxDatabases, + MaxReaders = _options.MaxReaders, + AutoReduceMapSizeIn32BitProcess = true + }; + + _environment = new LightningEnvironment(environmentPath, environmentConfig); + _environment.Open( + MapSyncModeToOpenFlags(_options.SyncMode), + UnixAccessMode.OwnerRead | UnixAccessMode.OwnerWrite | UnixAccessMode.GroupRead | UnixAccessMode.GroupWrite); + + using (var tx = BeginWriteTransaction()) + { + _oplogByHash = tx.OpenDatabase("oplog_by_hash", + new DatabaseConfiguration { Flags = DatabaseOpenFlags.Create }, + closeOnDispose: false); + _oplogByHlc = tx.OpenDatabase("oplog_by_hlc", + new DatabaseConfiguration { Flags = DatabaseOpenFlags.Create }, + closeOnDispose: false); + _oplogByNodeHlc = tx.OpenDatabase("oplog_by_node_hlc", + new DatabaseConfiguration { Flags = DatabaseOpenFlags.Create }, + closeOnDispose: false); + _oplogPrevToHash = tx.OpenDatabase("oplog_prev_to_hash", + new DatabaseConfiguration { Flags = DatabaseOpenFlags.Create | DatabaseOpenFlags.DuplicatesSort }, + closeOnDispose: false); + _oplogNodeHead = tx.OpenDatabase("oplog_node_head", + new DatabaseConfiguration { Flags = DatabaseOpenFlags.Create }, + closeOnDispose: false); + _oplogMeta = tx.OpenDatabase("oplog_meta", + new DatabaseConfiguration { Flags = DatabaseOpenFlags.Create }, + closeOnDispose: false); + + tx.Put(_oplogMeta, SchemaVersionKey, SchemaVersionValue).ThrowOnError(); + tx.Commit().ThrowOnError(); + } + + _vectorClock.Invalidate(); + InitializeVectorClock(); + } + + /// + public override async Task> GetChainRangeAsync( + string startHash, + string endHash, + CancellationToken cancellationToken = default) + { + return await GetChainRangeAsync(startHash, endHash, PrimaryDatasetId, cancellationToken); + } + + /// + 
public async Task> GetChainRangeAsync( + string startHash, + string endHash, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + OplogEntry? start = await GetEntryByHashAsync(startHash, normalizedDatasetId, cancellationToken); + if (start == null) return []; + + OplogEntry? end = await GetEntryByHashAsync(endHash, normalizedDatasetId, cancellationToken); + if (end == null) return []; + + var reverse = new List(); + string currentHash = endHash; + + for (var guard = 0; guard < 100_000; guard++) + { + if (string.Equals(currentHash, startHash, StringComparison.Ordinal)) + { + reverse.Reverse(); + return reverse; + } + + OplogEntry? entry = await GetEntryByHashAsync(currentHash, normalizedDatasetId, cancellationToken); + if (entry == null) return []; + + reverse.Add(entry); + if (string.IsNullOrWhiteSpace(entry.PreviousHash)) return []; + currentHash = entry.PreviousHash; + } + + _logger.LogWarning( + "Chain range traversal exceeded guard limit for dataset {DatasetId}. start={StartHash}, end={EndHash}.", + normalizedDatasetId, + startHash, + endHash); + return []; + } + + /// + public override Task GetEntryByHashAsync(string hash, CancellationToken cancellationToken = default) + { + return GetEntryByHashAsync(hash, PrimaryDatasetId, cancellationToken); + } + + /// + public Task GetEntryByHashAsync( + string hash, + string datasetId, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] key = BuildHashKey(normalizedDatasetId, hash); + + using var tx = BeginReadTransaction(); + if (!tx.TryGet(OplogByHashDb, key, out byte[]? payload) || payload == null) + return Task.FromResult(null); + + var entry = DeserializeEntry(payload); + return Task.FromResult(entry); + } + + /// + public override Task> GetOplogAfterAsync( + HlcTimestamp timestamp, + IEnumerable? 
collections = null, + CancellationToken cancellationToken = default) + { + return GetOplogAfterAsync(timestamp, PrimaryDatasetId, collections, cancellationToken); + } + + /// + public Task> GetOplogAfterAsync( + HlcTimestamp timestamp, + string datasetId, + IEnumerable? collections = null, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] prefix = BuildDatasetPrefix(normalizedDatasetId); + HashSet? collectionSet = collections == null ? null : new HashSet(collections, StringComparer.Ordinal); + + using var tx = BeginReadTransaction(); + using var cursor = tx.CreateCursor(OplogByHlcDb); + + var entries = new List(); + if (cursor.SetRange(prefix) != MDBResultCode.Success) return Task.FromResult>(entries); + + foreach (var (key, _) in EnumerateFromCurrent(cursor)) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!HasPrefix(key, prefix)) break; + + if (!TryDecodeHlcKey(key, out DecodedHlcKey decoded)) continue; + if (!IsStrictlyAfter(decoded.Wall, decoded.Logic, timestamp)) continue; + + OplogEntry? entry = TryGetEntryByHash(tx, normalizedDatasetId, decoded.Hash); + if (entry == null) continue; + if (collectionSet != null && !collectionSet.Contains(entry.Collection)) continue; + + entries.Add(entry); + } + + return Task.FromResult>(entries); + } + + /// + public override Task> GetOplogForNodeAfterAsync( + string nodeId, + HlcTimestamp since, + IEnumerable? collections = null, + CancellationToken cancellationToken = default) + { + return GetOplogForNodeAfterAsync(nodeId, since, PrimaryDatasetId, collections, cancellationToken); + } + + /// + public Task> GetOplogForNodeAfterAsync( + string nodeId, + HlcTimestamp since, + string datasetId, + IEnumerable? 
collections = null, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] prefix = BuildNodePrefix(normalizedDatasetId, nodeId); + HashSet? collectionSet = collections == null ? null : new HashSet(collections, StringComparer.Ordinal); + + using var tx = BeginReadTransaction(); + using var cursor = tx.CreateCursor(OplogByNodeHlcDb); + + var entries = new List(); + if (cursor.SetRange(prefix) != MDBResultCode.Success) return Task.FromResult>(entries); + + foreach (var (key, _) in EnumerateFromCurrent(cursor)) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!HasPrefix(key, prefix)) break; + + if (!TryDecodeNodeHlcKey(key, out DecodedNodeHlcKey decoded)) continue; + if (!IsStrictlyAfter(decoded.Wall, decoded.Logic, since)) continue; + + OplogEntry? entry = TryGetEntryByHash(tx, normalizedDatasetId, decoded.Hash); + if (entry == null) continue; + if (collectionSet != null && !collectionSet.Contains(entry.Collection)) continue; + + entries.Add(entry); + } + + return Task.FromResult>(entries); + } + + /// + public override Task PruneOplogAsync(HlcTimestamp cutoff, CancellationToken cancellationToken = default) + { + return PruneOplogAsync(cutoff, PrimaryDatasetId, cancellationToken); + } + + /// + public async Task PruneOplogAsync( + HlcTimestamp cutoff, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[]? 
resumeKey = null; + var totalPruned = 0; + + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + var batch = ReadPruneBatch(normalizedDatasetId, cutoff, resumeKey, _options.PruneBatchSize, cancellationToken); + if (batch.Count == 0) break; + + var touchedNodeCount = 0; + await ExecuteWriteTransactionAsync(tx => + { + var touchedNodes = new HashSet(StringComparer.Ordinal); + + foreach (PruneCandidate candidate in batch) + { + RemoveEntryFromIndexes(tx, candidate.Entry, candidate.HlcKey); + if (!string.IsNullOrWhiteSpace(candidate.Entry.Timestamp.NodeId)) + touchedNodes.Add(candidate.Entry.Timestamp.NodeId); + } + + foreach (string touchedNode in touchedNodes) RecomputeNodeHead(tx, normalizedDatasetId, touchedNode); + touchedNodeCount = touchedNodes.Count; + + // Keep a lightweight per-dataset prune watermark for startup diagnostics/reconciliation. + byte[] watermarkKey = BuildPruneWatermarkKey(normalizedDatasetId); + byte[] watermarkValue = EncodePruneWatermark(cutoff); + tx.Put(OplogMetaDb, watermarkKey, watermarkValue).ThrowOnError(); + }, cancellationToken); + + totalPruned += batch.Count; + resumeKey = batch[^1].HlcKey; + + _logger.LogDebug( + "Prune batch processed {BatchCount} entries for dataset {DatasetId}; touched {TouchedNodeCount} node heads.", + batch.Count, + normalizedDatasetId, + touchedNodeCount); + } + + if (totalPruned > 0) + { + if (string.Equals(normalizedDatasetId, PrimaryDatasetId, StringComparison.Ordinal)) + { + _vectorClock.Invalidate(); + InitializeVectorClock(); + } + + _logger.LogInformation( + "Pruned {PrunedCount} oplog entries for dataset {DatasetId} at cutoff {Wall}/{Logic}.", + totalPruned, + normalizedDatasetId, + cutoff.PhysicalTime, + cutoff.LogicalCounter); + } + } + + /// + public override async Task DropAsync(CancellationToken cancellationToken = default) + { + await DropAsync(PrimaryDatasetId, cancellationToken); + _vectorClock.Invalidate(); + } + + /// + /// Drops all oplog data for the specified dataset. 
+ /// + public async Task DropAsync(string datasetId, CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] datasetPrefix = BuildDatasetPrefix(normalizedDatasetId); + + await ExecuteWriteTransactionAsync(tx => + { + DeleteKeysByPrefix(tx, OplogByHashDb, datasetPrefix); + DeleteKeysByPrefix(tx, OplogByHlcDb, datasetPrefix); + DeleteKeysByPrefix(tx, OplogByNodeHlcDb, datasetPrefix); + DeleteKeysByPrefix(tx, OplogNodeHeadDb, datasetPrefix); + DeleteKeysByPrefix(tx, OplogPrevToHashDb, datasetPrefix); + tx.Delete(OplogMetaDb, BuildPruneWatermarkKey(normalizedDatasetId)); + }, cancellationToken); + + if (string.Equals(normalizedDatasetId, PrimaryDatasetId, StringComparison.Ordinal)) + { + _vectorClock.Invalidate(); + InitializeVectorClock(); + } + } + + /// + public override Task> ExportAsync(CancellationToken cancellationToken = default) + { + return ExportAsync(PrimaryDatasetId, cancellationToken); + } + + /// + /// Exports all oplog entries for a dataset. + /// + public Task> ExportAsync(string datasetId, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] prefix = BuildDatasetPrefix(normalizedDatasetId); + var entries = new List(); + + using var tx = BeginReadTransaction(); + using var cursor = tx.CreateCursor(OplogByHlcDb); + + if (cursor.SetRange(prefix) != MDBResultCode.Success) return Task.FromResult>(entries); + + foreach (var (key, _) in EnumerateFromCurrent(cursor)) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!HasPrefix(key, prefix)) break; + + if (!TryDecodeHlcKey(key, out DecodedHlcKey decoded)) continue; + OplogEntry? 
entry = TryGetEntryByHash(tx, normalizedDatasetId, decoded.Hash); + if (entry != null) entries.Add(entry); + } + + return Task.FromResult>(entries); + } + + /// + public override Task ImportAsync(IEnumerable items, CancellationToken cancellationToken = default) + { + return ImportAsync(items, PrimaryDatasetId, cancellationToken); + } + + /// + /// Imports oplog entries for a dataset (upsert semantics). + /// + public Task ImportAsync( + IEnumerable items, + string datasetId, + CancellationToken cancellationToken = default) + { + return UpsertEntriesAsync(items, NormalizeDatasetId(datasetId), overwriteExisting: true, cancellationToken); + } + + /// + public override Task MergeAsync(IEnumerable items, CancellationToken cancellationToken = default) + { + return MergeAsync(items, PrimaryDatasetId, cancellationToken); + } + + /// + /// Merges oplog entries into a dataset (dedupe by hash). + /// + public Task MergeAsync( + IEnumerable items, + string datasetId, + CancellationToken cancellationToken = default) + { + return UpsertEntriesAsync(items, NormalizeDatasetId(datasetId), overwriteExisting: false, cancellationToken); + } + + /// + protected override void InitializeVectorClock() + { + if (_vectorClock.IsInitialized) return; + if (_environment == null || _oplogNodeHead == null) + { + _vectorClock.IsInitialized = true; + return; + } + + if (_snapshotMetadataStore != null) + try + { + var snapshots = _snapshotMetadataStore.GetAllSnapshotMetadataAsync().GetAwaiter().GetResult(); + foreach (SnapshotMetadata snapshot in snapshots) + { + if (!string.Equals(NormalizeDatasetId(snapshot.DatasetId), PrimaryDatasetId, StringComparison.Ordinal)) + continue; + + _vectorClock.UpdateNode( + snapshot.NodeId, + new HlcTimestamp( + snapshot.TimestampPhysicalTime, + snapshot.TimestampLogicalCounter, + snapshot.NodeId), + snapshot.Hash ?? string.Empty); + } + } + catch + { + // Ignore snapshot bootstrap failures; oplog index scan remains authoritative. 
+ } + + using var tx = BeginReadTransaction(); + byte[] primaryPrefix = BuildDatasetPrefix(PrimaryDatasetId); + using var cursor = tx.CreateCursor(OplogNodeHeadDb); + + if (cursor.SetRange(primaryPrefix) == MDBResultCode.Success) + foreach (var (key, value) in EnumerateFromCurrent(cursor)) + { + if (!HasPrefix(key, primaryPrefix)) break; + + if (!TryDecodeNodeHeadKey(key, out string nodeId)) continue; + if (!TryDecodeNodeHeadValue(value, out NodeHeadValue headValue)) continue; + + _vectorClock.UpdateNode( + nodeId, + new HlcTimestamp(headValue.Wall, headValue.Logic, nodeId), + headValue.Hash); + } + + _vectorClock.IsInitialized = true; + } + + /// + protected override Task InsertOplogEntryAsync(OplogEntry entry, CancellationToken cancellationToken = default) + { + OplogEntry normalizedEntry = NormalizeEntryDataset(entry, NormalizeDatasetId(entry.DatasetId)); + return ExecuteWriteTransactionAsync(tx => InsertIfMissing(tx, normalizedEntry), cancellationToken); + } + + /// + protected override Task QueryLastHashForNodeAsync( + string nodeId, + CancellationToken cancellationToken = default) + { + return QueryLastHashForNodeAsync(nodeId, PrimaryDatasetId, cancellationToken); + } + + /// + protected override Task<(long Wall, int Logic)?> QueryLastHashTimestampFromOplogAsync( + string hash, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + using var tx = BeginReadTransaction(); + OplogEntry? 
entry = TryGetEntryByHash(tx, PrimaryDatasetId, hash); + if (entry == null) return Task.FromResult<(long Wall, int Logic)?>(null); + return Task.FromResult<(long Wall, int Logic)?>((entry.Timestamp.PhysicalTime, entry.Timestamp.LogicalCounter)); + } + + /// <inheritdoc/> + public async Task AppendOplogEntryAsync( + OplogEntry entry, + string datasetId, + CancellationToken cancellationToken = default) + { + string normalizedDatasetId = NormalizeDatasetId(datasetId); + OplogEntry normalizedEntry = NormalizeEntryDataset(entry, normalizedDatasetId); + + await ExecuteWriteTransactionAsync(tx => InsertIfMissing(tx, normalizedEntry), cancellationToken); + if (string.Equals(normalizedDatasetId, PrimaryDatasetId, StringComparison.Ordinal)) + _vectorClock.Update(normalizedEntry); + } + + /// <inheritdoc/> + public Task<HlcTimestamp> GetLatestTimestampAsync(string datasetId, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] prefix = BuildDatasetPrefix(normalizedDatasetId); + + using var tx = BeginReadTransaction(); + using var cursor = tx.CreateCursor(OplogByHlcDb); + + DecodedHlcKey? latest = null; + + if (cursor.SetRange(prefix) == MDBResultCode.Success) + foreach (var (key, _) in EnumerateFromCurrent(cursor)) + { + if (!HasPrefix(key, prefix)) break; + + if (TryDecodeHlcKey(key, out DecodedHlcKey decoded)) latest = decoded; + } + + HlcTimestamp timestamp = latest == null + ?
new HlcTimestamp(0, 0, string.Empty) + : new HlcTimestamp(latest.Value.Wall, latest.Value.Logic, latest.Value.NodeId); + + return Task.FromResult(timestamp); + } + + /// <inheritdoc/> + public Task<VectorClock> GetVectorClockAsync(string datasetId, CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] prefix = BuildDatasetPrefix(normalizedDatasetId); + var vectorClock = new VectorClock(); + + using var tx = BeginReadTransaction(); + using var cursor = tx.CreateCursor(OplogNodeHeadDb); + + if (cursor.SetRange(prefix) != MDBResultCode.Success) + return Task.FromResult(vectorClock); + + foreach (var (key, value) in EnumerateFromCurrent(cursor)) + { + if (!HasPrefix(key, prefix)) break; + + if (!TryDecodeNodeHeadKey(key, out string nodeId)) continue; + if (!TryDecodeNodeHeadValue(value, out NodeHeadValue nodeHead)) continue; + + vectorClock.SetTimestamp(nodeId, new HlcTimestamp(nodeHead.Wall, nodeHead.Logic, nodeId)); + } + + return Task.FromResult(vectorClock); + } + + /// <inheritdoc/> + public Task<string?> GetLastEntryHashAsync( + string nodeId, + string datasetId, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] nodeHeadKey = BuildNodeHeadKey(normalizedDatasetId, nodeId); + + using var tx = BeginReadTransaction(); + var (resultCode, _, value) = tx.Get(OplogNodeHeadDb, nodeHeadKey); + if (resultCode == MDBResultCode.Success) + { + NodeHeadValue head = DecodeNodeHeadValue(value.CopyToNewArray()); + return Task.FromResult(head.Hash); + } + + if (_snapshotMetadataStore == null) return Task.FromResult<string?>(null); + + return _snapshotMetadataStore.GetSnapshotHashAsync(nodeId, normalizedDatasetId, cancellationToken); + } + + /// <inheritdoc/> + public Task ApplyBatchAsync( + IEnumerable<OplogEntry> oplogEntries, + string datasetId, + CancellationToken cancellationToken = default) + { + string
normalizedDatasetId = NormalizeDatasetId(datasetId); + var normalizedEntries = oplogEntries + .Select(entry => NormalizeEntryDataset(entry, normalizedDatasetId)) + .ToList(); + + return ApplyBatchAsync(normalizedEntries, cancellationToken); + } + + /// + /// Returns index-level diagnostics for a dataset, useful in contract/unit tests. + /// + public Task GetIndexDiagnosticsAsync( + string datasetId, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] prefix = BuildDatasetPrefix(normalizedDatasetId); + + using var tx = BeginReadTransaction(); + long byHashCount = CountEntriesByPrefix(tx, OplogByHashDb, prefix); + long byHlcCount = CountEntriesByPrefix(tx, OplogByHlcDb, prefix); + long byNodeHlcCount = CountEntriesByPrefix(tx, OplogByNodeHlcDb, prefix); + long nodeHeadCount = CountEntriesByPrefix(tx, OplogNodeHeadDb, prefix); + long prevToHashCount = CountEntriesByPrefix(tx, OplogPrevToHashDb, prefix); + + var diagnostics = new LmdbOplogIndexDiagnostics( + normalizedDatasetId, + byHashCount, + byHlcCount, + byNodeHlcCount, + prevToHashCount, + nodeHeadCount); + + return Task.FromResult(diagnostics); + } + + /// + public void Dispose() + { + _writeGate.Dispose(); + + _oplogMeta?.Dispose(); + _oplogNodeHead?.Dispose(); + _oplogPrevToHash?.Dispose(); + _oplogByNodeHlc?.Dispose(); + _oplogByHlc?.Dispose(); + _oplogByHash?.Dispose(); + + _oplogMeta = null; + _oplogNodeHead = null; + _oplogPrevToHash = null; + _oplogByNodeHlc = null; + _oplogByHlc = null; + _oplogByHash = null; + + _environment?.Dispose(); + _environment = null; + } + + private async Task UpsertEntriesAsync( + IEnumerable items, + string datasetId, + bool overwriteExisting, + CancellationToken cancellationToken) + { + var normalizedItems = items + .Select(item => NormalizeEntryDataset(item, datasetId)) + .ToList(); + + if (normalizedItems.Count == 0) return; + + await 
ExecuteWriteTransactionAsync(tx => + { + foreach (OplogEntry entry in normalizedItems) + { + byte[] hashKey = BuildHashKey(datasetId, entry.Hash); + OplogEntry? existing = TryGetEntryByHash(tx, datasetId, entry.Hash); + if (existing != null) + { + if (!overwriteExisting) continue; + RemoveEntryFromIndexes(tx, existing, BuildHlcKey(existing)); + } + + InsertEntry(tx, entry, hashKey); + } + }, cancellationToken); + + } + + private void InsertIfMissing(LightningTransaction tx, OplogEntry entry) + { + byte[] hashKey = BuildHashKey(entry.DatasetId, entry.Hash); + if (tx.ContainsKey(OplogByHashDb, hashKey)) return; + + InsertEntry(tx, entry, hashKey); + } + + private void InsertEntry(LightningTransaction tx, OplogEntry entry, byte[] hashKey) + { + byte[] payload = SerializeEntry(entry); + tx.Put(OplogByHashDb, hashKey, payload).ThrowOnError(); + + byte[] hlcKey = BuildHlcKey(entry); + tx.Put(OplogByHlcDb, hlcKey, MarkerValue).ThrowOnError(); + + byte[] nodeHlcKey = BuildNodeHlcKey(entry); + tx.Put(OplogByNodeHlcDb, nodeHlcKey, MarkerValue).ThrowOnError(); + + if (!string.IsNullOrWhiteSpace(entry.PreviousHash)) + { + byte[] prevKey = BuildPrevToHashKey(entry.DatasetId, entry.PreviousHash); + byte[] hashValue = EncodeString(entry.Hash); + tx.Put(OplogPrevToHashDb, prevKey, hashValue).ThrowOnError(); + } + + UpsertNodeHead(tx, entry); + } + + private void RemoveEntryFromIndexes(LightningTransaction tx, OplogEntry entry, byte[] hlcKey) + { + byte[] hashKey = BuildHashKey(entry.DatasetId, entry.Hash); + IgnoreNotFound(tx.Delete(OplogByHashDb, hashKey)); + + IgnoreNotFound(tx.Delete(OplogByHlcDb, hlcKey)); + + byte[] nodeHlcKey = BuildNodeHlcKey(entry); + IgnoreNotFound(tx.Delete(OplogByNodeHlcDb, nodeHlcKey)); + + if (!string.IsNullOrWhiteSpace(entry.PreviousHash)) + { + byte[] prevKey = BuildPrevToHashKey(entry.DatasetId, entry.PreviousHash); + byte[] hashValue = EncodeString(entry.Hash); + IgnoreNotFound(tx.Delete(OplogPrevToHashDb, prevKey, hashValue)); + } + } + + private 
void UpsertNodeHead(LightningTransaction tx, OplogEntry entry) + { + byte[] nodeHeadKey = BuildNodeHeadKey(entry.DatasetId, entry.Timestamp.NodeId); + var (resultCode, _, value) = tx.Get(OplogNodeHeadDb, nodeHeadKey); + + if (resultCode == MDBResultCode.Success) + { + NodeHeadValue existing = DecodeNodeHeadValue(value.CopyToNewArray()); + if (!IsStrictlyAfter(entry.Timestamp.PhysicalTime, entry.Timestamp.LogicalCounter, existing.Wall, existing.Logic)) + return; + } + + tx.Put(OplogNodeHeadDb, nodeHeadKey, EncodeNodeHeadValue(entry)).ThrowOnError(); + } + + private void RecomputeNodeHead(LightningTransaction tx, string datasetId, string nodeId) + { + byte[] nodePrefix = BuildNodePrefix(datasetId, nodeId); + using var cursor = tx.CreateCursor(OplogByNodeHlcDb); + + byte[]? lastKey = null; + if (cursor.SetRange(nodePrefix) == MDBResultCode.Success) + foreach (var (key, _) in EnumerateFromCurrent(cursor)) + { + if (!HasPrefix(key, nodePrefix)) break; + lastKey = key; + } + + byte[] nodeHeadKey = BuildNodeHeadKey(datasetId, nodeId); + if (lastKey == null) + { + IgnoreNotFound(tx.Delete(OplogNodeHeadDb, nodeHeadKey)); + return; + } + + if (!TryDecodeNodeHlcKey(lastKey, out DecodedNodeHlcKey decoded)) + { + IgnoreNotFound(tx.Delete(OplogNodeHeadDb, nodeHeadKey)); + return; + } + + byte[] nodeHeadValue = EncodeNodeHeadValue(decoded.Wall, decoded.Logic, decoded.Hash); + tx.Put(OplogNodeHeadDb, nodeHeadKey, nodeHeadValue).ThrowOnError(); + } + + private List ReadPruneBatch( + string datasetId, + HlcTimestamp cutoff, + byte[]? resumeKey, + int batchSize, + CancellationToken cancellationToken) + { + byte[] prefix = BuildDatasetPrefix(datasetId); + var batch = new List(batchSize); + + using var tx = BeginReadTransaction(); + using var cursor = tx.CreateCursor(OplogByHlcDb); + + byte[] startKey = resumeKey ?? 
prefix; + if (cursor.SetRange(startKey) != MDBResultCode.Success) return batch; + + bool skipResume = resumeKey != null; + + foreach (var (key, _) in EnumerateFromCurrent(cursor)) + { + cancellationToken.ThrowIfCancellationRequested(); + if (!HasPrefix(key, prefix)) break; + + if (skipResume) + { + skipResume = false; + if (key.AsSpan().SequenceEqual(resumeKey)) continue; + } + + if (!TryDecodeHlcKey(key, out DecodedHlcKey decoded)) continue; + if (IsStrictlyAfter(decoded.Wall, decoded.Logic, cutoff.PhysicalTime, cutoff.LogicalCounter)) break; + + OplogEntry? entry = TryGetEntryByHash(tx, datasetId, decoded.Hash); + if (entry == null) + { + // In case of index drift, synthesize a minimal entry so all index records are still removed. + entry = new OplogEntry( + string.Empty, + string.Empty, + OperationType.Put, + null, + new HlcTimestamp(decoded.Wall, decoded.Logic, decoded.NodeId), + string.Empty, + decoded.Hash, + datasetId); + } + + batch.Add(new PruneCandidate(key, entry)); + if (batch.Count >= batchSize) break; + } + + return batch; + } + + private void DeleteKeysByPrefix(LightningTransaction tx, LightningDatabase database, byte[] prefix) + { + var keys = new List(); + using var cursor = tx.CreateCursor(database); + + if (cursor.SetRange(prefix) != MDBResultCode.Success) return; + + foreach (var (key, _) in EnumerateFromCurrent(cursor)) + { + if (!HasPrefix(key, prefix)) break; + keys.Add(key); + } + + foreach (byte[] key in keys) IgnoreNotFound(tx.Delete(database, key)); + } + + private static long CountEntriesByPrefix(LightningTransaction tx, LightningDatabase database, byte[] prefix) + { + long count = 0; + using var cursor = tx.CreateCursor(database); + if (cursor.SetRange(prefix) != MDBResultCode.Success) return count; + + foreach (var (key, _) in EnumerateFromCurrent(cursor)) + { + if (!HasPrefix(key, prefix)) break; + count++; + } + + return count; + } + + private OplogEntry? 
TryGetEntryByHash(LightningTransaction tx, string datasetId, string hash) + { + byte[] hashKey = BuildHashKey(datasetId, hash); + if (!tx.TryGet(OplogByHashDb, hashKey, out byte[]? payload) || payload == null) + return null; + + return DeserializeEntry(payload); + } + + private Task QueryLastHashForNodeAsync(string nodeId, string datasetId, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + string normalizedDatasetId = NormalizeDatasetId(datasetId); + byte[] key = BuildNodeHeadKey(normalizedDatasetId, nodeId); + + using var tx = BeginReadTransaction(); + var (resultCode, _, value) = tx.Get(OplogNodeHeadDb, key); + if (resultCode != MDBResultCode.Success) return Task.FromResult(null); + + NodeHeadValue head = DecodeNodeHeadValue(value.CopyToNewArray()); + return Task.FromResult(head.Hash); + } + + private async Task ExecuteWriteTransactionAsync( + Action operation, + CancellationToken cancellationToken) + { + await _writeGate.WaitAsync(cancellationToken); + try + { + for (var attempt = 0; attempt < 2; attempt++) + try + { + cancellationToken.ThrowIfCancellationRequested(); + + using var tx = BeginWriteTransaction(); + operation(tx); + tx.Commit().ThrowOnError(); + return; + } + catch (LightningException exception) when ( + exception.StatusCode == (int)MDBResultCode.MapFull && + attempt == 0) + { + GrowMapSize(); + } + } + finally + { + _writeGate.Release(); + } + } + + private void GrowMapSize() + { + if (_environment == null) return; + + long currentSize = _environment.MapSize; + long increase = Math.Max(currentSize / 2, 64L * 1024 * 1024); + long newSize = checked(currentSize + increase); + _environment.MapSize = newSize; + + _logger.LogWarning( + "LMDB map was full; grew map size from {OldMapSize} bytes to {NewMapSize} bytes.", + currentSize, + newSize); + } + + private LightningTransaction BeginReadTransaction() + { + return Environment.BeginTransaction(TransactionBeginFlags.ReadOnly); + } + + private 
LightningTransaction BeginWriteTransaction() + { + return Environment.BeginTransaction(TransactionBeginFlags.None); + } + + private static LmdbOplogOptions NormalizeOptions(LmdbOplogOptions options) + { + options.EnvironmentPath = string.IsNullOrWhiteSpace(options.EnvironmentPath) + ? Path.Combine(AppContext.BaseDirectory, "cbddc-oplog-lmdb") + : options.EnvironmentPath; + + if (options.MapSizeBytes <= 0) options.MapSizeBytes = 256L * 1024 * 1024; + if (options.MaxDatabases <= 0) options.MaxDatabases = 16; + if (options.MaxReaders <= 0) options.MaxReaders = 256; + if (options.PruneBatchSize <= 0) options.PruneBatchSize = 512; + + return options; + } + + private static EnvironmentOpenFlags MapSyncModeToOpenFlags(LmdbSyncMode syncMode) + { + return syncMode switch + { + LmdbSyncMode.NoMetaSync => EnvironmentOpenFlags.NoMetaSync, + LmdbSyncMode.NoSync => EnvironmentOpenFlags.NoSync, + LmdbSyncMode.MapAsync => EnvironmentOpenFlags.WriteMap | EnvironmentOpenFlags.MapAsync, + _ => EnvironmentOpenFlags.None + }; + } + + private static string NormalizeDatasetId(string? datasetId) + { + return DatasetId.Normalize(datasetId); + } + + private static OplogEntry NormalizeEntryDataset(OplogEntry entry, string datasetId) + { + return new OplogEntry( + entry.Collection, + entry.Key, + entry.Operation, + entry.Payload, + entry.Timestamp, + entry.PreviousHash, + entry.Hash, + datasetId); + } + + private static bool IsStrictlyAfter(long wall, int logic, HlcTimestamp threshold) + { + return IsStrictlyAfter(wall, logic, threshold.PhysicalTime, threshold.LogicalCounter); + } + + private static bool IsStrictlyAfter(long wall, int logic, long otherWall, int otherLogic) + { + return wall > otherWall || + (wall == otherWall && logic > otherLogic); + } + + private static byte[] EncodeString(string value) + { + return Encoding.UTF8.GetBytes(value ?? 
string.Empty); + } + + private static string DecodeString(ReadOnlySpan value) + { + return Encoding.UTF8.GetString(value); + } + + private static byte[] BuildDatasetPrefix(string datasetId) + { + byte[] datasetBytes = EncodeString(datasetId); + var key = new byte[datasetBytes.Length + 1]; + Buffer.BlockCopy(datasetBytes, 0, key, 0, datasetBytes.Length); + key[^1] = 0; + return key; + } + + private static byte[] BuildHashKey(string datasetId, string hash) + { + byte[] datasetBytes = EncodeString(datasetId); + byte[] hashBytes = EncodeString(hash); + + var key = new byte[datasetBytes.Length + 1 + hashBytes.Length]; + Buffer.BlockCopy(datasetBytes, 0, key, 0, datasetBytes.Length); + key[datasetBytes.Length] = 0; + Buffer.BlockCopy(hashBytes, 0, key, datasetBytes.Length + 1, hashBytes.Length); + return key; + } + + private static byte[] BuildHlcKey(OplogEntry entry) + { + byte[] datasetBytes = EncodeString(entry.DatasetId); + byte[] wallBytes = EncodeLongSortable(entry.Timestamp.PhysicalTime); + byte[] logicBytes = EncodeIntSortable(entry.Timestamp.LogicalCounter); + byte[] nodeBytes = EncodeString(entry.Timestamp.NodeId); + byte[] hashBytes = EncodeString(entry.Hash); + + var key = new byte[ + datasetBytes.Length + 1 + + wallBytes.Length + logicBytes.Length + 1 + + nodeBytes.Length + 1 + + hashBytes.Length]; + + var offset = 0; + Buffer.BlockCopy(datasetBytes, 0, key, offset, datasetBytes.Length); + offset += datasetBytes.Length; + key[offset++] = 0; + + Buffer.BlockCopy(wallBytes, 0, key, offset, wallBytes.Length); + offset += wallBytes.Length; + Buffer.BlockCopy(logicBytes, 0, key, offset, logicBytes.Length); + offset += logicBytes.Length; + + key[offset++] = 0; + Buffer.BlockCopy(nodeBytes, 0, key, offset, nodeBytes.Length); + offset += nodeBytes.Length; + + key[offset++] = 0; + Buffer.BlockCopy(hashBytes, 0, key, offset, hashBytes.Length); + + return key; + } + + private static byte[] BuildNodePrefix(string datasetId, string nodeId) + { + byte[] datasetBytes = 
EncodeString(datasetId); + byte[] nodeBytes = EncodeString(nodeId); + + var key = new byte[datasetBytes.Length + 1 + nodeBytes.Length + 1]; + Buffer.BlockCopy(datasetBytes, 0, key, 0, datasetBytes.Length); + key[datasetBytes.Length] = 0; + Buffer.BlockCopy(nodeBytes, 0, key, datasetBytes.Length + 1, nodeBytes.Length); + key[^1] = 0; + return key; + } + + private static byte[] BuildNodeHlcKey(OplogEntry entry) + { + byte[] datasetBytes = EncodeString(entry.DatasetId); + byte[] nodeBytes = EncodeString(entry.Timestamp.NodeId); + byte[] wallBytes = EncodeLongSortable(entry.Timestamp.PhysicalTime); + byte[] logicBytes = EncodeIntSortable(entry.Timestamp.LogicalCounter); + byte[] hashBytes = EncodeString(entry.Hash); + + var key = new byte[ + datasetBytes.Length + 1 + + nodeBytes.Length + 1 + + wallBytes.Length + logicBytes.Length + + hashBytes.Length]; + + var offset = 0; + Buffer.BlockCopy(datasetBytes, 0, key, offset, datasetBytes.Length); + offset += datasetBytes.Length; + key[offset++] = 0; + + Buffer.BlockCopy(nodeBytes, 0, key, offset, nodeBytes.Length); + offset += nodeBytes.Length; + key[offset++] = 0; + + Buffer.BlockCopy(wallBytes, 0, key, offset, wallBytes.Length); + offset += wallBytes.Length; + + Buffer.BlockCopy(logicBytes, 0, key, offset, logicBytes.Length); + offset += logicBytes.Length; + + Buffer.BlockCopy(hashBytes, 0, key, offset, hashBytes.Length); + return key; + } + + private static byte[] BuildPrevToHashKey(string datasetId, string previousHash) + { + byte[] datasetBytes = EncodeString(datasetId); + byte[] prevBytes = EncodeString(previousHash); + + var key = new byte[datasetBytes.Length + 1 + prevBytes.Length]; + Buffer.BlockCopy(datasetBytes, 0, key, 0, datasetBytes.Length); + key[datasetBytes.Length] = 0; + Buffer.BlockCopy(prevBytes, 0, key, datasetBytes.Length + 1, prevBytes.Length); + return key; + } + + private static byte[] BuildNodeHeadKey(string datasetId, string nodeId) + { + byte[] datasetBytes = EncodeString(datasetId); + byte[] 
nodeBytes = EncodeString(nodeId); + + var key = new byte[datasetBytes.Length + 1 + nodeBytes.Length]; + Buffer.BlockCopy(datasetBytes, 0, key, 0, datasetBytes.Length); + key[datasetBytes.Length] = 0; + Buffer.BlockCopy(nodeBytes, 0, key, datasetBytes.Length + 1, nodeBytes.Length); + return key; + } + + private static byte[] BuildPruneWatermarkKey(string datasetId) + { + return EncodeString($"{datasetId}\u0000prune_watermark"); + } + + private static byte[] EncodePruneWatermark(HlcTimestamp cutoff) + { + var value = new byte[12]; + BinaryPrimitives.WriteInt64BigEndian(value.AsSpan(0, 8), cutoff.PhysicalTime); + BinaryPrimitives.WriteInt32BigEndian(value.AsSpan(8, 4), cutoff.LogicalCounter); + return value; + } + + private static byte[] EncodeNodeHeadValue(OplogEntry entry) + { + return EncodeNodeHeadValue(entry.Timestamp.PhysicalTime, entry.Timestamp.LogicalCounter, entry.Hash); + } + + private static byte[] EncodeNodeHeadValue(long wall, int logic, string hash) + { + byte[] hashBytes = EncodeString(hash); + var value = new byte[8 + 4 + hashBytes.Length]; + + BinaryPrimitives.WriteInt64BigEndian(value.AsSpan(0, 8), wall); + BinaryPrimitives.WriteInt32BigEndian(value.AsSpan(8, 4), logic); + Buffer.BlockCopy(hashBytes, 0, value, 12, hashBytes.Length); + + return value; + } + + private static NodeHeadValue DecodeNodeHeadValue(byte[] value) + { + if (!TryDecodeNodeHeadValue(value, out NodeHeadValue headValue)) + throw new InvalidOperationException("Invalid node-head value encoding."); + + return headValue; + } + + private static bool TryDecodeNodeHeadValue(byte[] value, out NodeHeadValue headValue) + { + headValue = default; + if (value.Length < 12) return false; + + long wall = BinaryPrimitives.ReadInt64BigEndian(value.AsSpan(0, 8)); + int logic = BinaryPrimitives.ReadInt32BigEndian(value.AsSpan(8, 4)); + string hash = DecodeString(value.AsSpan(12)); + headValue = new NodeHeadValue(wall, logic, hash); + return true; + } + + private static bool 
TryDecodeNodeHeadKey(byte[] key, out string nodeId) + { + nodeId = string.Empty; + + int separator = Array.IndexOf(key, (byte)0); + if (separator < 0 || separator + 1 >= key.Length) return false; + + nodeId = DecodeString(key.AsSpan(separator + 1)); + return true; + } + + private static bool TryDecodeHlcKey(byte[] key, out DecodedHlcKey decoded) + { + decoded = default; + + int datasetSeparator = Array.IndexOf(key, (byte)0); + if (datasetSeparator < 0) return false; + + int fixedStart = datasetSeparator + 1; + if (fixedStart + 12 >= key.Length) return false; + + int nodeSeparator = Array.IndexOf(key, (byte)0, fixedStart + 12); + if (nodeSeparator < 0) return false; + + int hashStart = nodeSeparator + 1; + int hashSeparator = Array.IndexOf(key, (byte)0, hashStart); + if (hashSeparator < 0 || hashSeparator + 1 > key.Length) return false; + + long wall = DecodeLongSortable(key.AsSpan(fixedStart, 8)); + int logic = DecodeIntSortable(key.AsSpan(fixedStart + 8, 4)); + + string nodeId = DecodeString(key.AsSpan(hashStart, hashSeparator - hashStart)); + string hash = DecodeString(key.AsSpan(hashSeparator + 1)); + decoded = new DecodedHlcKey(wall, logic, nodeId, hash); + return true; + } + + private static bool TryDecodeNodeHlcKey(byte[] key, out DecodedNodeHlcKey decoded) + { + decoded = default; + + int datasetSeparator = Array.IndexOf(key, (byte)0); + if (datasetSeparator < 0) return false; + + int nodeSeparator = Array.IndexOf(key, (byte)0, datasetSeparator + 1); + if (nodeSeparator < 0) return false; + + int fixedStart = nodeSeparator + 1; + if (fixedStart + 12 > key.Length) return false; + + long wall = DecodeLongSortable(key.AsSpan(fixedStart, 8)); + int logic = DecodeIntSortable(key.AsSpan(fixedStart + 8, 4)); + + string hash = DecodeString(key.AsSpan(fixedStart + 12)); + decoded = new DecodedNodeHlcKey(wall, logic, hash); + return true; + } + + private static byte[] EncodeLongSortable(long value) + { + long sortable = value ^ long.MinValue; + var bytes = new 
byte[8]; + BinaryPrimitives.WriteInt64BigEndian(bytes, sortable); + return bytes; + } + + private static long DecodeLongSortable(ReadOnlySpan<byte> bytes) + { + long sortable = BinaryPrimitives.ReadInt64BigEndian(bytes); + return sortable ^ long.MinValue; + } + + private static byte[] EncodeIntSortable(int value) + { + int sortable = value ^ int.MinValue; + var bytes = new byte[4]; + BinaryPrimitives.WriteInt32BigEndian(bytes, sortable); + return bytes; + } + + private static int DecodeIntSortable(ReadOnlySpan<byte> bytes) + { + int sortable = BinaryPrimitives.ReadInt32BigEndian(bytes); + return sortable ^ int.MinValue; + } + + private static bool HasPrefix(byte[] value, byte[] prefix) + { + return value.AsSpan().StartsWith(prefix); + } + + private static IEnumerable<(byte[] Key, byte[] Value)> EnumerateFromCurrent(LightningCursor cursor) + { + var current = cursor.GetCurrent(); + while (current.resultCode == MDBResultCode.Success) + { + yield return (current.key.CopyToNewArray(), current.value.CopyToNewArray()); + current = cursor.Next(); + } + } + + private static byte[] SerializeEntry(OplogEntry entry) + { + var dto = new OplogEntryDto + { + DatasetId = entry.DatasetId, + Collection = entry.Collection, + Key = entry.Key, + Operation = entry.Operation, + Payload = entry.Payload, + PhysicalTime = entry.Timestamp.PhysicalTime, + LogicalCounter = entry.Timestamp.LogicalCounter, + NodeId = entry.Timestamp.NodeId, + PreviousHash = entry.PreviousHash, + Hash = entry.Hash + }; + + return JsonSerializer.SerializeToUtf8Bytes(dto); + } + + private static OplogEntry DeserializeEntry(byte[] payload) + { + OplogEntryDto dto = JsonSerializer.Deserialize<OplogEntryDto>(payload) + ?? throw new InvalidOperationException("Failed to deserialize oplog entry payload."); + + JsonElement?
clonedPayload = dto.Payload?.Clone(); + return new OplogEntry( + dto.Collection, + dto.Key, + dto.Operation, + clonedPayload, + new HlcTimestamp(dto.PhysicalTime, dto.LogicalCounter, dto.NodeId), + dto.PreviousHash, + dto.Hash, + dto.DatasetId); + } + + private static void IgnoreNotFound(MDBResultCode resultCode) + { + if (resultCode == MDBResultCode.Success || resultCode == MDBResultCode.NotFound) return; + resultCode.ThrowOnError(); + } + + private LightningEnvironment Environment => _environment ?? throw new ObjectDisposedException(nameof(LmdbOplogStore)); + + private LightningDatabase OplogByHashDb => _oplogByHash ?? throw new ObjectDisposedException(nameof(LmdbOplogStore)); + private LightningDatabase OplogByHlcDb => _oplogByHlc ?? throw new ObjectDisposedException(nameof(LmdbOplogStore)); + private LightningDatabase OplogByNodeHlcDb => _oplogByNodeHlc ?? throw new ObjectDisposedException(nameof(LmdbOplogStore)); + private LightningDatabase OplogPrevToHashDb => _oplogPrevToHash ?? throw new ObjectDisposedException(nameof(LmdbOplogStore)); + private LightningDatabase OplogNodeHeadDb => _oplogNodeHead ?? throw new ObjectDisposedException(nameof(LmdbOplogStore)); + private LightningDatabase OplogMetaDb => _oplogMeta ?? throw new ObjectDisposedException(nameof(LmdbOplogStore)); + + private readonly record struct DecodedHlcKey(long Wall, int Logic, string NodeId, string Hash); + private readonly record struct DecodedNodeHlcKey(long Wall, int Logic, string Hash); + private readonly record struct NodeHeadValue(long Wall, int Logic, string Hash); + private readonly record struct PruneCandidate(byte[] HlcKey, OplogEntry Entry); + + private sealed class OplogEntryDto + { + public string DatasetId { get; set; } = global::ZB.MOM.WW.CBDDC.Core.DatasetId.Primary; + public string Collection { get; set; } = string.Empty; + public string Key { get; set; } = string.Empty; + public OperationType Operation { get; set; } + public JsonElement? 
Payload { get; set; } + public long PhysicalTime { get; set; } + public int LogicalCounter { get; set; } + public string NodeId { get; set; } = string.Empty; + public string PreviousHash { get; set; } = string.Empty; + public string Hash { get; set; } = string.Empty; + } +} + +/// +/// Dataset-scoped LMDB oplog index counts. +/// +public readonly record struct LmdbOplogIndexDiagnostics( + string DatasetId, + long OplogByHashCount, + long OplogByHlcCount, + long OplogByNodeHlcCount, + long OplogPrevToHashCount, + long OplogNodeHeadCount); diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/OplogMigrationTelemetry.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/OplogMigrationTelemetry.cs new file mode 100644 index 0000000..bb6e9f5 --- /dev/null +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Lmdb/OplogMigrationTelemetry.cs @@ -0,0 +1,89 @@ +using System.Collections.Concurrent; +using System.Collections.ObjectModel; +using System.Threading; +using ZB.MOM.WW.CBDDC.Core; + +namespace ZB.MOM.WW.CBDDC.Persistence.Lmdb; + +/// +/// Tracks LMDB migration telemetry for shadow comparisons, reconciliation, and fallback behavior. +/// +public sealed class OplogMigrationTelemetry +{ + private readonly ConcurrentDictionary _reconciliationRunsByDataset = + new(StringComparer.Ordinal); + + private readonly ConcurrentDictionary _reconciledEntriesByDataset = + new(StringComparer.Ordinal); + + private long _shadowComparisons; + private long _shadowMismatches; + private long _preferredReadFallbacks; + private long _reconciliationRuns; + private long _reconciledEntries; + + /// + /// Records the outcome of one shadow comparison. + /// + public void RecordShadowComparison(bool isMatch) + { + Interlocked.Increment(ref _shadowComparisons); + if (!isMatch) Interlocked.Increment(ref _shadowMismatches); + } + + /// + /// Records a fallback from preferred LMDB reads back to Surreal. 
+ /// </summary> + public void RecordPreferredReadFallback() + { + Interlocked.Increment(ref _preferredReadFallbacks); + } + + /// <summary> + /// Records one reconciliation/backfill run for a dataset. + /// </summary> + public void RecordReconciliation(string datasetId, int entriesMerged) + { + string normalizedDatasetId = DatasetId.Normalize(datasetId); + + Interlocked.Increment(ref _reconciliationRuns); + Interlocked.Add(ref _reconciledEntries, Math.Max(0, entriesMerged)); + _reconciliationRunsByDataset.AddOrUpdate(normalizedDatasetId, 1, (_, current) => current + 1); + _reconciledEntriesByDataset.AddOrUpdate( + normalizedDatasetId, + Math.Max(0, entriesMerged), + (_, current) => current + Math.Max(0, entriesMerged)); + } + + /// <summary> + /// Returns an immutable snapshot of current counters. + /// </summary> + public OplogMigrationTelemetrySnapshot GetSnapshot() + { + var runsByDataset = new ReadOnlyDictionary<string, long>( + _reconciliationRunsByDataset.ToDictionary(pair => pair.Key, pair => pair.Value, StringComparer.Ordinal)); + var entriesByDataset = new ReadOnlyDictionary<string, long>( + _reconciledEntriesByDataset.ToDictionary(pair => pair.Key, pair => pair.Value, StringComparer.Ordinal)); + + return new OplogMigrationTelemetrySnapshot( + Interlocked.Read(ref _shadowComparisons), + Interlocked.Read(ref _shadowMismatches), + Interlocked.Read(ref _preferredReadFallbacks), + Interlocked.Read(ref _reconciliationRuns), + Interlocked.Read(ref _reconciledEntries), + runsByDataset, + entriesByDataset); + } +} + +/// <summary> +/// Immutable snapshot for LMDB migration telemetry counters. 
+/// </summary> +public readonly record struct OplogMigrationTelemetrySnapshot( + long ShadowComparisons, + long ShadowMismatches, + long PreferredReadFallbacks, + long ReconciliationRuns, + long ReconciledEntries, + IReadOnlyDictionary<string, long> ReconciliationRunsByDataset, + IReadOnlyDictionary<string, long> ReconciledEntriesByDataset); diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/CBDDCSurrealEmbeddedExtensions.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/CBDDCSurrealEmbeddedExtensions.cs index d763ccd..ff01354 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/CBDDCSurrealEmbeddedExtensions.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/CBDDCSurrealEmbeddedExtensions.cs @@ -124,7 +124,8 @@ public static class CBDDCSurrealEmbeddedExtensions services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); - services.TryAddSingleton(); + services.TryAddSingleton(); + services.TryAddSingleton(sp => sp.GetRequiredService()); // SnapshotStore registration matches the other provider extension patterns. 
services.TryAddSingleton(); diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/ZB.MOM.WW.CBDDC.Persistence.csproj b/src/ZB.MOM.WW.CBDDC.Persistence/ZB.MOM.WW.CBDDC.Persistence.csproj index b4f4a0c..41ca74d 100755 --- a/src/ZB.MOM.WW.CBDDC.Persistence/ZB.MOM.WW.CBDDC.Persistence.csproj +++ b/src/ZB.MOM.WW.CBDDC.Persistence/ZB.MOM.WW.CBDDC.Persistence.csproj @@ -20,6 +20,7 @@ + diff --git a/tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs b/tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs index 2a15da5..7768f2e 100644 --- a/tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs +++ b/tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs @@ -10,6 +10,7 @@ using ZB.MOM.WW.CBDDC.Core.Storage; using ZB.MOM.WW.CBDDC.Core.Sync; using ZB.MOM.WW.CBDDC.Network; using ZB.MOM.WW.CBDDC.Network.Security; +using ZB.MOM.WW.CBDDC.Persistence.Lmdb; using ZB.MOM.WW.CBDDC.Persistence.Surreal; namespace ZB.MOM.WW.CBDDC.E2E.Tests; @@ -240,6 +241,92 @@ public class ClusterCrudSyncE2ETests }, 60, "Node B did not catch up missed reconnect mutations.", () => BuildDiagnostics(nodeA, nodeB)); } + /// + /// Verifies reconnect catch-up still works when reads are cut over to LMDB with dual-write enabled. 
+ /// + [Fact] + public async Task PeerReconnect_ShouldCatchUpMissedChanges_WithLmdbPreferredReads() + { + var clusterToken = Guid.NewGuid().ToString("N"); + int nodeAPort = GetAvailableTcpPort(); + int nodeBPort = GetAvailableTcpPort(); + while (nodeBPort == nodeAPort) nodeBPort = GetAvailableTcpPort(); + + await using var nodeA = TestPeerNode.Create( + "node-a", + nodeAPort, + clusterToken, + [ + new KnownPeerConfiguration + { + NodeId = "node-b", + Host = "127.0.0.1", + Port = nodeBPort + } + ], + useLmdbOplog: true, + dualWriteOplog: true, + preferLmdbReads: true); + + await using var nodeB = TestPeerNode.Create( + "node-b", + nodeBPort, + clusterToken, + [ + new KnownPeerConfiguration + { + NodeId = "node-a", + Host = "127.0.0.1", + Port = nodeAPort + } + ], + useLmdbOplog: true, + dualWriteOplog: true, + preferLmdbReads: true); + + await nodeA.StartAsync(); + await nodeB.StartAsync(); + + await nodeB.StopAsync(); + + const string userId = "reconnect-lmdb-user"; + await nodeA.UpsertUserAsync(new User + { + Id = userId, + Name = "Offline Create", + Age = 20, + Address = new Address { City = "Rome" } + }); + + await nodeA.UpsertUserAsync(new User + { + Id = userId, + Name = "Offline Update", + Age = 21, + Address = new Address { City = "Milan" } + }); + + await nodeA.UpsertUserAsync(new User + { + Id = userId, + Name = "Offline Final", + Age = 22, + Address = new Address { City = "Turin" } + }); + + await nodeB.StartAsync(); + + await AssertEventuallyAsync(() => + { + var replicated = nodeB.ReadUser(userId); + return replicated is not null && + replicated.Name == "Offline Final" && + replicated.Age == 22 && + replicated.Address?.City == "Turin"; + }, 60, "Node B did not catch up missed reconnect mutations with LMDB preferred reads.", + () => BuildDiagnostics(nodeA, nodeB)); + } + /// /// Verifies a burst of rapid multi-node mutations converges to a deterministic final state. 
/// @@ -572,6 +659,9 @@ public class ClusterCrudSyncE2ETests /// An optional working directory override for test artifacts. /// A value indicating whether to preserve the working directory on dispose. /// A value indicating whether to inject a checkpoint persistence that fails once. + /// A value indicating whether to enable the LMDB oplog migration path. + /// A value indicating whether oplog writes should be mirrored to Surreal + LMDB. + /// A value indicating whether reads should prefer LMDB. /// A configured instance. public static TestPeerNode Create( string nodeId, @@ -580,7 +670,10 @@ public class ClusterCrudSyncE2ETests IReadOnlyList knownPeers, string? workDirOverride = null, bool preserveWorkDirOnDispose = false, - bool useFaultInjectedCheckpointStore = false) + bool useFaultInjectedCheckpointStore = false, + bool useLmdbOplog = false, + bool dualWriteOplog = true, + bool preferLmdbReads = false) { string workDir = workDirOverride ?? Path.Combine(Path.GetTempPath(), $"cbddc-e2e-{nodeId}-{Guid.NewGuid():N}"); Directory.CreateDirectory(workDir); @@ -620,13 +713,47 @@ public class ClusterCrudSyncE2ETests if (useFaultInjectedCheckpointStore) { services.AddSingleton(); - coreBuilder.AddCBDDCSurrealEmbedded(surrealOptionsFactory) - .AddCBDDCNetwork(false); + var registration = coreBuilder.AddCBDDCSurrealEmbedded(surrealOptionsFactory); + if (useLmdbOplog) + registration.AddCBDDCLmdbOplog( + _ => new LmdbOplogOptions + { + EnvironmentPath = Path.Combine(workDir, "oplog-lmdb"), + MapSizeBytes = 128L * 1024 * 1024, + MaxDatabases = 16, + PruneBatchSize = 256 + }, + flags => + { + flags.UseLmdbOplog = true; + flags.DualWriteOplog = dualWriteOplog; + flags.PreferLmdbReads = preferLmdbReads; + flags.ReconciliationInterval = TimeSpan.Zero; + }); + + registration.AddCBDDCNetwork(false); } else { - coreBuilder.AddCBDDCSurrealEmbedded(surrealOptionsFactory) - .AddCBDDCNetwork(false); + var registration = coreBuilder.AddCBDDCSurrealEmbedded(surrealOptionsFactory); + if 
(useLmdbOplog) + registration.AddCBDDCLmdbOplog( + _ => new LmdbOplogOptions + { + EnvironmentPath = Path.Combine(workDir, "oplog-lmdb"), + MapSizeBytes = 128L * 1024 * 1024, + MaxDatabases = 16, + PruneBatchSize = 256 + }, + flags => + { + flags.UseLmdbOplog = true; + flags.DualWriteOplog = dualWriteOplog; + flags.PreferLmdbReads = preferLmdbReads; + flags.ReconciliationInterval = TimeSpan.Zero; + }); + + registration.AddCBDDCNetwork(false); } // Deterministic tests: sync uses explicit known peers, so disable UDP discovery. diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogMigrationTests.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogMigrationTests.cs new file mode 100644 index 0000000..f10c792 --- /dev/null +++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogMigrationTests.cs @@ -0,0 +1,237 @@ +using System.Text.Json; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ZB.MOM.WW.CBDDC.Core; +using ZB.MOM.WW.CBDDC.Core.Storage; +using ZB.MOM.WW.CBDDC.Core.Sync; +using ZB.MOM.WW.CBDDC.Persistence; +using ZB.MOM.WW.CBDDC.Persistence.Lmdb; +using ZB.MOM.WW.CBDDC.Persistence.Surreal; + +namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests; + +public class LmdbOplogMigrationTests +{ + [Fact] + public async Task FeatureFlags_DualWrite_WritesToBothStores() + { + await using var surrealHarness = new SurrealTestHarness(); + var surrealStore = surrealHarness.CreateOplogStore(); + using var lmdbStore = CreateLmdbStore(); + + var flags = new LmdbOplogFeatureFlags + { + UseLmdbOplog = true, + DualWriteOplog = true, + PreferLmdbReads = false + }; + + var store = new FeatureFlagOplogStore( + surrealStore, + lmdbStore, + flags, + logger: NullLogger.Instance); + + var entry = CreateEntry("Users", "dual-write", "node-a", 100, 0, ""); + await store.AppendOplogEntryAsync(entry); + + (await surrealStore.GetEntryByHashAsync(entry.Hash)).ShouldNotBeNull(); + (await lmdbStore.GetEntryByHashAsync(entry.Hash)).ShouldNotBeNull(); + } + + 
[Fact] + public async Task FeatureFlags_PreferLmdbReads_ReconcilesFromSurrealWhenLmdbMissingEntries() + { + await using var surrealHarness = new SurrealTestHarness(); + var surrealStore = surrealHarness.CreateOplogStore(); + using var lmdbStore = CreateLmdbStore(); + + var flags = new LmdbOplogFeatureFlags + { + UseLmdbOplog = true, + DualWriteOplog = false, + PreferLmdbReads = true, + ReconciliationInterval = TimeSpan.Zero + }; + + var entry = CreateEntry("Users", "reconcile-1", "node-a", 200, 0, ""); + + // Simulate crash window where only Surreal persisted before LMDB migration store starts. + await surrealStore.AppendOplogEntryAsync(entry); + (await lmdbStore.GetEntryByHashAsync(entry.Hash)).ShouldBeNull(); + + var store = new FeatureFlagOplogStore( + surrealStore, + lmdbStore, + flags, + logger: NullLogger.Instance); + + OplogEntry? resolved = await store.GetEntryByHashAsync(entry.Hash); + resolved.ShouldNotBeNull(); + resolved.Hash.ShouldBe(entry.Hash); + + // Reconciliation should have backfilled LMDB. 
+ (await lmdbStore.GetEntryByHashAsync(entry.Hash)).ShouldNotBeNull(); + + OplogMigrationTelemetrySnapshot telemetry = store.GetTelemetrySnapshot(); + telemetry.ReconciliationRuns.ShouldBeGreaterThanOrEqualTo(1); + telemetry.ReconciledEntries.ShouldBeGreaterThanOrEqualTo(1); + } + + [Fact] + public async Task FeatureFlags_ShadowValidation_RecordsMismatchTelemetry() + { + await using var surrealHarness = new SurrealTestHarness(); + var surrealStore = surrealHarness.CreateOplogStore(); + using var lmdbStore = CreateLmdbStore(); + var telemetry = new OplogMigrationTelemetry(); + + var flags = new LmdbOplogFeatureFlags + { + UseLmdbOplog = true, + DualWriteOplog = true, + PreferLmdbReads = false, + EnableReadShadowValidation = true + }; + + var store = new FeatureFlagOplogStore( + surrealStore, + lmdbStore, + flags, + telemetry, + NullLogger.Instance); + + var entry = CreateEntry("Users", "shadow-mismatch-1", "node-a", 210, 0, ""); + await surrealStore.AppendOplogEntryAsync(entry); + + OplogEntry? 
resolved = await store.GetEntryByHashAsync(entry.Hash); + resolved.ShouldNotBeNull(); + + OplogMigrationTelemetrySnapshot snapshot = store.GetTelemetrySnapshot(); + snapshot.ShadowComparisons.ShouldBe(1); + snapshot.ShadowMismatches.ShouldBe(1); + } + + [Fact] + public async Task FeatureFlags_RollbackToSurreal_UsesSurrealForWritesAndReads() + { + await using var surrealHarness = new SurrealTestHarness(); + var surrealStore = surrealHarness.CreateOplogStore(); + using var lmdbStore = CreateLmdbStore(); + + var flags = new LmdbOplogFeatureFlags + { + UseLmdbOplog = true, + DualWriteOplog = false, + PreferLmdbReads = false + }; + + var store = new FeatureFlagOplogStore( + surrealStore, + lmdbStore, + flags, + logger: NullLogger.Instance); + + var entry = CreateEntry("Users", "rollback-1", "node-a", 220, 0, ""); + await store.AppendOplogEntryAsync(entry); + + (await surrealStore.GetEntryByHashAsync(entry.Hash)).ShouldNotBeNull(); + (await lmdbStore.GetEntryByHashAsync(entry.Hash)).ShouldBeNull(); + + OplogEntry? 
routedRead = await store.GetEntryByHashAsync(entry.Hash); + routedRead.ShouldNotBeNull(); + routedRead.Hash.ShouldBe(entry.Hash); + } + + [Fact] + public async Task BackfillTool_BackfillAndValidate_ReportsSuccess() + { + await using var surrealHarness = new SurrealTestHarness(); + var surrealStore = surrealHarness.CreateOplogStore(); + using var lmdbStore = CreateLmdbStore(); + var tool = new LmdbOplogBackfillTool(surrealStore, lmdbStore, NullLogger.Instance); + + var first = CreateEntry("Users", "backfill-1", "node-a", 300, 0, ""); + var second = CreateEntry("Users", "backfill-2", "node-a", 301, 0, first.Hash); + var third = CreateEntry("Users", "backfill-3", "node-b", 302, 0, ""); + var fourth = CreateEntry("Users", "backfill-4", "node-b", 303, 0, third.Hash); + + await surrealStore.AppendOplogEntryAsync(first); + await surrealStore.AppendOplogEntryAsync(second); + await surrealStore.AppendOplogEntryAsync(third); + await surrealStore.AppendOplogEntryAsync(fourth); + + LmdbOplogBackfillReport report = await tool.BackfillAsync(DatasetId.Primary); + + report.IsSuccess.ShouldBeTrue(); + report.CountsMatch.ShouldBeTrue(); + report.CountsPerNodeMatch.ShouldBeTrue(); + report.LatestHashPerNodeMatch.ShouldBeTrue(); + report.HashSpotChecksPassed.ShouldBeTrue(); + report.ChainSpotChecksPassed.ShouldBeTrue(); + report.SourceCount.ShouldBe(4); + report.DestinationCount.ShouldBe(4); + } + + [Fact] + public async Task BackfillTool_BackfillAndValidate_WorksPerDataset() + { + await using var surrealHarness = new SurrealTestHarness(); + var surrealStore = surrealHarness.CreateOplogStore(); + using var lmdbStore = CreateLmdbStore(); + var tool = new LmdbOplogBackfillTool(surrealStore, lmdbStore, NullLogger.Instance); + + var logsEntryA = CreateEntry("Logs", "log-1", "node-a", 400, 0, ""); + var logsEntryB = CreateEntry("Logs", "log-2", "node-a", 401, 0, logsEntryA.Hash); + var primaryEntry = CreateEntry("Users", "primary-1", "node-a", 500, 0, ""); + + await 
surrealStore.AppendOplogEntryAsync(logsEntryA, DatasetId.Logs); + await surrealStore.AppendOplogEntryAsync(logsEntryB, DatasetId.Logs); + await surrealStore.AppendOplogEntryAsync(primaryEntry, DatasetId.Primary); + + LmdbOplogBackfillReport logsReport = await tool.BackfillAsync(DatasetId.Logs); + logsReport.IsSuccess.ShouldBeTrue(); + logsReport.SourceCount.ShouldBe(2); + logsReport.DestinationCount.ShouldBe(2); + + (await lmdbStore.GetEntryByHashAsync(logsEntryA.Hash, DatasetId.Logs)).ShouldNotBeNull(); + (await lmdbStore.GetEntryByHashAsync(logsEntryB.Hash, DatasetId.Logs)).ShouldNotBeNull(); + (await lmdbStore.GetEntryByHashAsync(primaryEntry.Hash, DatasetId.Primary)).ShouldBeNull(); + } + + private static LmdbOplogStore CreateLmdbStore() + { + string rootPath = Path.Combine(Path.GetTempPath(), "cbddc-lmdb-migration", Guid.NewGuid().ToString("N")); + Directory.CreateDirectory(rootPath); + + return new LmdbOplogStore( + Substitute.For(), + new LastWriteWinsConflictResolver(), + new VectorClockService(), + new LmdbOplogOptions + { + EnvironmentPath = rootPath, + MapSizeBytes = 64L * 1024 * 1024, + MaxDatabases = 16 + }, + null, + NullLogger.Instance); + } + + private static OplogEntry CreateEntry( + string collection, + string key, + string nodeId, + long wall, + int logic, + string previousHash) + { + return new OplogEntry( + collection, + key, + OperationType.Put, + JsonSerializer.SerializeToElement(new { key }), + new HlcTimestamp(wall, logic, nodeId), + previousHash); + } +} diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs new file mode 100644 index 0000000..9943633 --- /dev/null +++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/LmdbOplogStoreContractTests.cs @@ -0,0 +1,267 @@ +using System.Diagnostics; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using ZB.MOM.WW.CBDDC.Core; +using ZB.MOM.WW.CBDDC.Core.Storage; +using 
ZB.MOM.WW.CBDDC.Core.Sync; +using ZB.MOM.WW.CBDDC.Persistence; +using ZB.MOM.WW.CBDDC.Persistence.Lmdb; +using ZB.MOM.WW.CBDDC.Persistence.Surreal; + +namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests; + +public class SurrealOplogStoreContractParityTests : OplogStoreContractTestBase +{ + protected override Task CreateHarnessAsync() + { + return Task.FromResult(new SurrealOplogStoreContractHarness()); + } +} + +public class LmdbOplogStoreContractTests : OplogStoreContractTestBase +{ + protected override Task CreateHarnessAsync() + { + return Task.FromResult(new LmdbOplogStoreContractHarness()); + } + + [Fact] + public async Task Lmdb_IndexConsistency_InsertPopulatesAndPruneRemovesIndexes() + { + await using var harness = new LmdbOplogStoreContractHarness(); + var store = (LmdbOplogStore)harness.Store; + + var entry1 = CreateOplogEntry("Users", "i1", "node-a", 100, 0, ""); + var entry2 = CreateOplogEntry("Users", "i2", "node-a", 101, 0, entry1.Hash); + + await store.AppendOplogEntryAsync(entry1); + await store.AppendOplogEntryAsync(entry2); + + LmdbOplogIndexDiagnostics before = await store.GetIndexDiagnosticsAsync(DatasetId.Primary); + before.OplogByHashCount.ShouldBe(2); + before.OplogByHlcCount.ShouldBe(2); + before.OplogByNodeHlcCount.ShouldBe(2); + before.OplogPrevToHashCount.ShouldBe(1); + before.OplogNodeHeadCount.ShouldBe(1); + + await store.PruneOplogAsync(new HlcTimestamp(101, 0, "node-a")); + + LmdbOplogIndexDiagnostics after = await store.GetIndexDiagnosticsAsync(DatasetId.Primary); + after.OplogByHashCount.ShouldBe(0); + after.OplogByHlcCount.ShouldBe(0); + after.OplogByNodeHlcCount.ShouldBe(0); + after.OplogPrevToHashCount.ShouldBe(0); + after.OplogNodeHeadCount.ShouldBe(0); + } + + [Fact] + public async Task Lmdb_Prune_RemovesAtOrBeforeCutoff_AndKeepsNewerInterleavedEntries() + { + await using var harness = new LmdbOplogStoreContractHarness(); + IOplogStore store = harness.Store; + + var nodeAOld = CreateOplogEntry("Users", "a-old", "node-a", 100, 0, ""); 
+ var nodeBKeep = CreateOplogEntry("Users", "b-keep", "node-b", 105, 0, ""); + var nodeANew = CreateOplogEntry("Users", "a-new", "node-a", 110, 0, nodeAOld.Hash); + var lateOld = CreateOplogEntry("Users", "late-old", "node-c", 90, 0, ""); + + await store.AppendOplogEntryAsync(nodeAOld); + await store.AppendOplogEntryAsync(nodeBKeep); + await store.AppendOplogEntryAsync(nodeANew); + await store.AppendOplogEntryAsync(lateOld); + + await store.PruneOplogAsync(new HlcTimestamp(100, 0, "node-a")); + + var remaining = (await store.ExportAsync()).Select(e => e.Hash).ToHashSet(StringComparer.Ordinal); + remaining.Contains(nodeAOld.Hash).ShouldBeFalse(); + remaining.Contains(lateOld.Hash).ShouldBeFalse(); + remaining.Contains(nodeBKeep.Hash).ShouldBeTrue(); + remaining.Contains(nodeANew.Hash).ShouldBeTrue(); + } + + [Fact] + public async Task Lmdb_NodeHead_AdvancesAndRecomputesAcrossPrune() + { + await using var harness = new LmdbOplogStoreContractHarness(); + IOplogStore store = harness.Store; + + var older = CreateOplogEntry("Users", "n1", "node-a", 100, 0, ""); + var newer = CreateOplogEntry("Users", "n2", "node-a", 120, 0, older.Hash); + + await store.AppendOplogEntryAsync(older); + await store.AppendOplogEntryAsync(newer); + + (await store.GetLastEntryHashAsync("node-a")).ShouldBe(newer.Hash); + + await store.PruneOplogAsync(new HlcTimestamp(110, 0, "node-a")); + (await store.GetLastEntryHashAsync("node-a")).ShouldBe(newer.Hash); + + await store.PruneOplogAsync(new HlcTimestamp(130, 0, "node-a")); + (await store.GetLastEntryHashAsync("node-a")).ShouldBeNull(); + } + + [Fact] + public async Task Lmdb_RestartDurability_PreservesHeadAndScans() + { + await using var harness = new LmdbOplogStoreContractHarness(); + IOplogStore store = harness.Store; + + var entry1 = CreateOplogEntry("Users", "r1", "node-a", 100, 0, ""); + var entry2 = CreateOplogEntry("Users", "r2", "node-a", 101, 0, entry1.Hash); + + await store.AppendOplogEntryAsync(entry1); + await 
store.AppendOplogEntryAsync(entry2); + + IOplogStore reopened = harness.ReopenStore(); + (await reopened.GetLastEntryHashAsync("node-a")).ShouldBe(entry2.Hash); + + var after = (await reopened.GetOplogAfterAsync(new HlcTimestamp(0, 0, string.Empty))).ToList(); + after.Count.ShouldBe(2); + after[0].Hash.ShouldBe(entry1.Hash); + after[1].Hash.ShouldBe(entry2.Hash); + } + + [Fact] + public async Task Lmdb_Dedupe_DuplicateHashAppendIsIdempotent() + { + await using var harness = new LmdbOplogStoreContractHarness(); + IOplogStore store = harness.Store; + + var entry = CreateOplogEntry("Users", "d1", "node-a", 100, 0, ""); + + await store.AppendOplogEntryAsync(entry); + await store.AppendOplogEntryAsync(entry); + + var exported = (await store.ExportAsync()).ToList(); + exported.Count.ShouldBe(1); + exported[0].Hash.ShouldBe(entry.Hash); + } + + [Fact] + public async Task Lmdb_PrunePerformanceSmoke_LargeSyntheticWindow_CompletesWithinGenerousBudget() + { + await using var harness = new LmdbOplogStoreContractHarness(); + IOplogStore store = harness.Store; + + string previousHash = string.Empty; + for (var i = 0; i < 5000; i++) + { + var entry = CreateOplogEntry("Users", $"p-{i:D5}", "node-a", 1_000 + i, 0, previousHash); + await store.AppendOplogEntryAsync(entry); + previousHash = entry.Hash; + } + + var sw = Stopwatch.StartNew(); + await store.PruneOplogAsync(new HlcTimestamp(6_000, 0, "node-a")); + sw.Stop(); + + sw.ElapsedMilliseconds.ShouldBeLessThan(30_000L); + (await store.ExportAsync()).ShouldBeEmpty(); + } +} + +internal sealed class SurrealOplogStoreContractHarness : IOplogStoreContractHarness +{ + private readonly SurrealTestHarness _harness; + + public SurrealOplogStoreContractHarness() + { + _harness = new SurrealTestHarness(); + Store = _harness.CreateOplogStore(); + } + + public IOplogStore Store { get; private set; } + + public IOplogStore ReopenStore() + { + Store = _harness.CreateOplogStore(); + return Store; + } + + public Task 
AppendOplogEntryAsync(OplogEntry entry, string datasetId, CancellationToken cancellationToken = default) + { + return ((SurrealOplogStore)Store).AppendOplogEntryAsync(entry, datasetId, cancellationToken); + } + + public Task> ExportAsync(string datasetId, CancellationToken cancellationToken = default) + { + return ((SurrealOplogStore)Store).ExportAsync(datasetId, cancellationToken); + } + + public ValueTask DisposeAsync() + { + return _harness.DisposeAsync(); + } +} + +internal sealed class LmdbOplogStoreContractHarness : IOplogStoreContractHarness +{ + private readonly string _rootPath; + private LmdbOplogStore? _store; + + public LmdbOplogStoreContractHarness() + { + _rootPath = Path.Combine(Path.GetTempPath(), "cbddc-lmdb-tests", Guid.NewGuid().ToString("N")); + Directory.CreateDirectory(_rootPath); + _store = CreateStore(); + } + + public IOplogStore Store => _store ?? throw new ObjectDisposedException(nameof(LmdbOplogStoreContractHarness)); + + public IOplogStore ReopenStore() + { + _store?.Dispose(); + _store = CreateStore(); + return _store; + } + + public Task AppendOplogEntryAsync(OplogEntry entry, string datasetId, CancellationToken cancellationToken = default) + { + return (_store ?? throw new ObjectDisposedException(nameof(LmdbOplogStoreContractHarness))) + .AppendOplogEntryAsync(entry, datasetId, cancellationToken); + } + + public Task> ExportAsync(string datasetId, CancellationToken cancellationToken = default) + { + return (_store ?? 
throw new ObjectDisposedException(nameof(LmdbOplogStoreContractHarness))) + .ExportAsync(datasetId, cancellationToken); + } + + public async ValueTask DisposeAsync() + { + _store?.Dispose(); + _store = null; + + for (var attempt = 0; attempt < 5; attempt++) + try + { + if (Directory.Exists(_rootPath)) + Directory.Delete(_rootPath, true); + break; + } + catch when (attempt < 4) + { + await Task.Delay(50); + } + } + + private LmdbOplogStore CreateStore() + { + string lmdbPath = Path.Combine(_rootPath, "lmdb-oplog"); + Directory.CreateDirectory(lmdbPath); + + return new LmdbOplogStore( + Substitute.For(), + new LastWriteWinsConflictResolver(), + new VectorClockService(), + new LmdbOplogOptions + { + EnvironmentPath = lmdbPath, + MapSizeBytes = 64L * 1024 * 1024, + MaxDatabases = 16, + PruneBatchSize = 128 + }, + null, + NullLogger.Instance); + } +} diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/OplogStoreContractTestBase.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/OplogStoreContractTestBase.cs new file mode 100644 index 0000000..9b6efa3 --- /dev/null +++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/OplogStoreContractTestBase.cs @@ -0,0 +1,121 @@ +using System.Text.Json; +using ZB.MOM.WW.CBDDC.Core; +using ZB.MOM.WW.CBDDC.Core.Storage; + +namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests; + +public abstract class OplogStoreContractTestBase +{ + [Fact] + public async Task OplogStore_AppendQueryMergeDrop_AndLastHash_Works() + { + await using var harness = await CreateHarnessAsync(); + IOplogStore store = harness.Store; + + var entry1 = CreateOplogEntry("Users", "u1", "node-a", 100, 0, ""); + var entry2 = CreateOplogEntry("Users", "u2", "node-a", 110, 0, entry1.Hash); + var entry3 = CreateOplogEntry("Users", "u3", "node-a", 120, 1, entry2.Hash); + var otherNode = CreateOplogEntry("Users", "u4", "node-b", 115, 0, ""); + + await store.AppendOplogEntryAsync(entry1); + await store.AppendOplogEntryAsync(entry2); + await store.AppendOplogEntryAsync(entry3); + await 
store.AppendOplogEntryAsync(otherNode); + + var chainRange = (await store.GetChainRangeAsync(entry1.Hash, entry3.Hash)).ToList(); + chainRange.Select(x => x.Hash).ToList().ShouldBe(new[] { entry2.Hash, entry3.Hash }); + + var after = (await store.GetOplogAfterAsync(new HlcTimestamp(100, 0, "node-a"))).ToList(); + after.Select(x => x.Hash).ToList().ShouldBe(new[] { entry2.Hash, otherNode.Hash, entry3.Hash }); + + var mergedEntry = CreateOplogEntry("Users", "u5", "node-a", 130, 0, entry3.Hash); + await store.MergeAsync(new[] { entry2, mergedEntry }); + + var exported = (await store.ExportAsync()).ToList(); + exported.Count.ShouldBe(5); + exported.Count(x => x.Hash == entry2.Hash).ShouldBe(1); + + string? cachedLastNodeAHash = await store.GetLastEntryHashAsync("node-a"); + cachedLastNodeAHash.ShouldBe(entry3.Hash); + + IOplogStore rehydratedStore = harness.ReopenStore(); + string? persistedLastNodeAHash = await rehydratedStore.GetLastEntryHashAsync("node-a"); + persistedLastNodeAHash.ShouldBe(mergedEntry.Hash); + + await rehydratedStore.DropAsync(); + (await rehydratedStore.ExportAsync()).ShouldBeEmpty(); + } + + [Fact] + public async Task OplogStore_DatasetIsolation_Works() + { + await using var harness = await CreateHarnessAsync(); + IOplogStore store = harness.Store; + + var primaryEntry = CreateOplogEntry("Users", "p1", "node-a", 100, 0, ""); + var logsEntry = CreateOplogEntry("Users", "l1", "node-a", 100, 1, ""); + + await harness.AppendOplogEntryAsync(primaryEntry, DatasetId.Primary); + await harness.AppendOplogEntryAsync(logsEntry, DatasetId.Logs); + + var primary = (await harness.ExportAsync(DatasetId.Primary)).ToList(); + var logs = (await harness.ExportAsync(DatasetId.Logs)).ToList(); + + primary.Count.ShouldBe(1); + primary[0].DatasetId.ShouldBe(DatasetId.Primary); + + logs.Count.ShouldBe(1); + logs[0].DatasetId.ShouldBe(DatasetId.Logs); + } + + [Fact] + public async Task OplogStore_GetChainRangeAsync_ReturnsOrderedLinkedRange() + { + await using var 
harness = await CreateHarnessAsync(); + IOplogStore store = harness.Store; + + var entry1 = CreateOplogEntry("Users", "k1", "node1", 1000, 0, ""); + var entry2 = CreateOplogEntry("Users", "k2", "node1", 2000, 0, entry1.Hash); + var entry3 = CreateOplogEntry("Users", "k3", "node1", 3000, 0, entry2.Hash); + + await store.AppendOplogEntryAsync(entry1); + await store.AppendOplogEntryAsync(entry2); + await store.AppendOplogEntryAsync(entry3); + + var range = (await store.GetChainRangeAsync(entry1.Hash, entry3.Hash)).ToList(); + + range.Count.ShouldBe(2); + range[0].Hash.ShouldBe(entry2.Hash); + range[1].Hash.ShouldBe(entry3.Hash); + } + + protected abstract Task CreateHarnessAsync(); + + protected static OplogEntry CreateOplogEntry( + string collection, + string key, + string nodeId, + long wall, + int logic, + string previousHash) + { + return new OplogEntry( + collection, + key, + OperationType.Put, + JsonSerializer.SerializeToElement(new { key }), + new HlcTimestamp(wall, logic, nodeId), + previousHash); + } +} + +public interface IOplogStoreContractHarness : IAsyncDisposable +{ + IOplogStore Store { get; } + + IOplogStore ReopenStore(); + + Task AppendOplogEntryAsync(OplogEntry entry, string datasetId, CancellationToken cancellationToken = default); + + Task> ExportAsync(string datasetId, CancellationToken cancellationToken = default); +}