using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Core.Storage;
using ZB.MOM.WW.CBDDC.Core.Sync;
using ZB.MOM.WW.CBDDC.Persistence;
using ZB.MOM.WW.CBDDC.Persistence.Surreal;

namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests;

// NOTE(review): this file had all `<...>` generic type arguments and XML-doc tags stripped
// (markup-tag removal artifact). Generic arguments below are reconstructed from call sites;
// spots where the reconstruction is not fully certain carry a review note.

/// <summary>
/// Durability tests for Surreal-backed CDC: checkpoint persistence across restarts,
/// catch-up resume, idempotent remote apply, and tombstone metadata for local deletes.
/// Runs serially (RocksDB directory lock) via the "SurrealCdcDurability" collection.
/// </summary>
[Collection("SurrealCdcDurability")]
public class SurrealCdcDurabilityTests
{
    /// <summary>
    /// Verifies checkpoints persist latest local changes per consumer across restarts.
    /// </summary>
    [Fact]
    public async Task CheckpointPersistence_ShouldTrackLatestLocalChange_AndPersistPerConsumer()
    {
        string dbPath = CreateTemporaryDatabasePath();
        const string nodeId = "node-checkpoint";
        const string defaultConsumer = "consumer-default";
        const string secondaryConsumer = "consumer-secondary";

        try
        {
            HlcTimestamp expectedTimestamp = default;
            string expectedHash = "";
            DateTimeOffset previousUpdatedUtc = DateTimeOffset.MinValue;

            await using (var harness = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, defaultConsumer))
            {
                // Insert then update the same document so two oplog entries exist for one key.
                var user = CreateUser("checkpoint-user", "Alice", 30, "Austin");
                await harness.Context.Users.InsertAsync(user);
                await harness.Context.SaveChangesAsync();
                await harness.PollAsync();

                user.Age = 31;
                user.Address = new Address { City = "Dallas" };
                await harness.Context.Users.UpdateAsync(user);
                await harness.Context.SaveChangesAsync();
                await harness.PollAsync();

                await WaitForConditionAsync(
                    async () => (await harness.GetEntriesByKeyAsync("Users", "checkpoint-user")).Count >= 2,
                    "Timed out waiting for checkpoint-user oplog entries.");

                var entries = await harness.GetEntriesByKeyAsync("Users", "checkpoint-user");
                entries.Count.ShouldBe(2);

                // The default consumer's checkpoint must track the newest entry.
                expectedTimestamp = entries[^1].Timestamp;
                expectedHash = entries[^1].Hash;

                var checkpoint = await harness.CheckpointPersistence.GetCheckpointAsync();
                checkpoint.ShouldNotBeNull();
                checkpoint!.Timestamp.ShouldBe(expectedTimestamp);
                checkpoint.LastHash.ShouldBe(expectedHash);
                previousUpdatedUtc = checkpoint.UpdatedUtc;

                // A second consumer pinned to the OLDER entry must be tracked independently.
                await harness.CheckpointPersistence.UpsertCheckpointAsync(
                    entries[0].Timestamp, entries[0].Hash, secondaryConsumer);
                var secondary = await harness.CheckpointPersistence.GetCheckpointAsync(secondaryConsumer);
                secondary.ShouldNotBeNull();
                secondary!.Timestamp.ShouldBe(entries[0].Timestamp);
                secondary.LastHash.ShouldBe(entries[0].Hash);
            }

            // Reopen the same database directory: both checkpoints must survive the restart.
            await using (var restarted = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, defaultConsumer))
            {
                var restoredDefault = await restarted.CheckpointPersistence.GetCheckpointAsync();
                restoredDefault.ShouldNotBeNull();
                restoredDefault!.Timestamp.ShouldBe(expectedTimestamp);
                restoredDefault.LastHash.ShouldBe(expectedHash);
                // UpdatedUtc must be the persisted value, not refreshed on reopen.
                restoredDefault.UpdatedUtc.ShouldBe(previousUpdatedUtc);

                var restoredSecondary = await restarted.CheckpointPersistence.GetCheckpointAsync(secondaryConsumer);
                restoredSecondary.ShouldNotBeNull();
                restoredSecondary!.LastHash.ShouldNotBe(restoredDefault.LastHash);
            }
        }
        finally
        {
            await DeleteDirectoryWithRetriesAsync(dbPath);
        }
    }

    /// <summary>
    /// Verifies recovery resumes from a persisted checkpoint and advances after catch-up.
    /// </summary>
    [Fact]
    public async Task RestartRecovery_ShouldResumeCatchUpFromPersistedCheckpoint_InRocksDb()
    {
        string dbPath = CreateTemporaryDatabasePath();
        const string nodeId = "node-resume";
        const string consumerId = "consumer-resume";
        HlcTimestamp resumeTimestamp = default;
        string resumeHash = "";
        string expectedFinalHash = "";

        try
        {
            await using (var initial = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, consumerId))
            {
                await initial.Context.Users.InsertAsync(CreateUser("resume-1", "User One", 18, "Rome"));
                await initial.Context.SaveChangesAsync();
                await initial.PollAsync();

                await initial.Context.Users.InsertAsync(CreateUser("resume-2", "User Two", 19, "Milan"));
                await initial.Context.SaveChangesAsync();
                await initial.PollAsync();

                await WaitForConditionAsync(
                    async () => (await initial.GetEntriesByCollectionAsync("Users")).Count >= 2,
                    "Timed out waiting for resume oplog entries.");

                var entries = await initial.GetEntriesByCollectionAsync("Users");
                entries.Count.ShouldBe(2);

                // Pin the checkpoint at the FIRST entry so exactly one entry is left to catch up.
                resumeTimestamp = entries[0].Timestamp;
                resumeHash = entries[0].Hash;
                expectedFinalHash = entries[1].Hash;

                await initial.CheckpointPersistence.UpsertCheckpointAsync(resumeTimestamp, resumeHash);
            }

            await using (var restarted = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, consumerId))
            {
                var checkpoint = await restarted.CheckpointPersistence.GetCheckpointAsync();
                checkpoint.ShouldNotBeNull();
                checkpoint!.Timestamp.ShouldBe(resumeTimestamp);
                checkpoint.LastHash.ShouldBe(resumeHash);

                // Catch-up window: everything after the checkpoint, in HLC order.
                var catchUp = (await restarted.OplogStore.GetOplogAfterAsync(checkpoint.Timestamp))
                    .OrderBy(e => e.Timestamp.PhysicalTime)
                    .ThenBy(e => e.Timestamp.LogicalCounter)
                    .ToList();
                catchUp.Count.ShouldBe(1);
                catchUp[0].Hash.ShouldBe(expectedFinalHash);

                await restarted.CheckpointPersistence.AdvanceCheckpointAsync(catchUp[0]);
            }

            await using (var recovered = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, consumerId))
            {
                var finalCheckpoint = await recovered.CheckpointPersistence.GetCheckpointAsync();
                finalCheckpoint.ShouldNotBeNull();
                finalCheckpoint!.LastHash.ShouldBe(expectedFinalHash);

                // After advancing past the last entry, nothing remains to replay.
                var remaining = await recovered.OplogStore.GetOplogAfterAsync(finalCheckpoint.Timestamp);
                remaining.ShouldBeEmpty();
            }
        }
        finally
        {
            await DeleteDirectoryWithRetriesAsync(dbPath);
        }
    }

    /// <summary>
    /// Verifies duplicate remote apply windows are idempotent without loopback entries.
    /// </summary>
    [Fact]
    public async Task RemoteApply_ShouldBeIdempotentAcrossDuplicateWindow_WithoutLoopbackEntries()
    {
        string dbPath = CreateTemporaryDatabasePath();
        const string localNodeId = "node-local";
        const string remoteNodeId = "node-remote";

        try
        {
            await using var harness = await CdcTestHarness.OpenWithRetriesAsync(
                dbPath, localNodeId, "consumer-loopback");

            await harness.Context.Users.InsertAsync(CreateUser("loopback-user", "Loopback", 40, "Boston"));
            await harness.Context.SaveChangesAsync();
            await harness.PollAsync();

            await WaitForConditionAsync(
                async () => (await harness.GetEntriesByKeyAsync("Users", "loopback-user")).Count >= 1,
                "Timed out waiting for loopback-user insert oplog entry.");

            var localEntries = await harness.GetEntriesByKeyAsync("Users", "loopback-user");
            localEntries.Count.ShouldBe(1);
            localEntries[0].Operation.ShouldBe(OperationType.Put);
            localEntries[0].Timestamp.NodeId.ShouldBe(localNodeId);

            // A remote delete stamped 10 ticks ahead of the local insert so it wins LWW.
            var remoteDelete = new OplogEntry(
                "Users",
                "loopback-user",
                OperationType.Delete,
                null,
                new HlcTimestamp(localEntries[0].Timestamp.PhysicalTime + 10, 0, remoteNodeId),
                localEntries[0].Hash);

            // Apply the same window twice, with the entry duplicated inside each window.
            var duplicateWindow = new[] { remoteDelete, remoteDelete };
            await harness.OplogStore.ApplyBatchAsync(duplicateWindow);
            await harness.OplogStore.ApplyBatchAsync(duplicateWindow);

            harness.Context.Users.FindById("loopback-user").ShouldBeNull();

            var allEntries = await harness.GetEntriesByKeyAsync("Users", "loopback-user");
            // The remote entry is stored exactly once despite four deliveries...
            allEntries.Count(e => e.Hash == remoteDelete.Hash).ShouldBe(1);
            // ...and applying it must not generate a locally-stamped (loopback) delete.
            allEntries.Count(e => e.Operation == OperationType.Delete && e.Timestamp.NodeId == localNodeId)
                .ShouldBe(0);
            allEntries.Count(e => e.Operation == OperationType.Delete && e.Timestamp.NodeId == remoteNodeId)
                .ShouldBe(1);
        }
        finally
        {
            await DeleteDirectoryWithRetriesAsync(dbPath);
        }
    }

    /// <summary>
    /// Verifies local deletes persist tombstone metadata and advance checkpoints.
    /// </summary>
    [Fact]
    public async Task LocalDelete_ShouldPersistTombstoneMetadata_AndAdvanceCheckpoint()
    {
        string dbPath = CreateTemporaryDatabasePath();
        const string nodeId = "node-tombstone";

        try
        {
            await using var harness = await CdcTestHarness.OpenWithRetriesAsync(
                dbPath, nodeId, "consumer-tombstone");

            await harness.Context.Users.InsertAsync(CreateUser("tombstone-user", "Before Delete", 28, "Turin"));
            await harness.Context.SaveChangesAsync();
            await harness.PollAsync();

            await harness.Context.Users.DeleteAsync("tombstone-user");
            await harness.Context.SaveChangesAsync();
            await harness.PollAsync();

            harness.Context.Users.FindById("tombstone-user").ShouldBeNull();

            await WaitForConditionAsync(
                async () => (await harness.GetEntriesByKeyAsync("Users", "tombstone-user")).Count >= 2,
                "Timed out waiting for tombstone-user oplog entries.");

            var entries = await harness.GetEntriesByKeyAsync("Users", "tombstone-user");
            entries.Count.ShouldBe(2);
            var deleteEntry = entries.Last(e => e.Operation == OperationType.Delete);

            // Metadata must record the tombstone with the delete entry's timestamp.
            var metadata = await harness.MetadataStore.GetMetadataAsync("Users", "tombstone-user");
            metadata.ShouldNotBeNull();
            metadata!.IsDeleted.ShouldBeTrue();
            metadata.UpdatedAt.ShouldBe(deleteEntry.Timestamp);

            // And the consumer checkpoint must have advanced to the delete entry.
            var checkpoint = await harness.CheckpointPersistence.GetCheckpointAsync();
            checkpoint.ShouldNotBeNull();
            checkpoint!.LastHash.ShouldBe(deleteEntry.Hash);
            checkpoint.Timestamp.ShouldBe(deleteEntry.Timestamp);
        }
        finally
        {
            await DeleteDirectoryWithRetriesAsync(dbPath);
        }
    }

    /// <summary>
    /// Builds a sample user with a one-city address.
    /// </summary>
    private static User CreateUser(string id, string name, int age, string city)
    {
        return new User
        {
            Id = id,
            Name = name,
            Age = age,
            Address = new Address { City = city }
        };
    }

    /// <summary>
    /// Creates a unique RocksDB directory path under the system temp folder.
    /// </summary>
    private static string CreateTemporaryDatabasePath()
    {
        return Path.Combine(Path.GetTempPath(), $"cbddc-cdc-{Guid.NewGuid():N}.rocksdb");
    }

    /// <summary>
    /// Deletes a database directory, retrying to ride out transient file locks
    /// (RocksDB may still hold handles briefly after dispose). Best-effort: the
    /// final attempt's failure propagates.
    /// </summary>
    private static async Task DeleteDirectoryWithRetriesAsync(string path)
    {
        for (var attempt = 0; attempt < 5; attempt++)
        {
            try
            {
                if (Directory.Exists(path))
                    Directory.Delete(path, true);
                return;
            }
            catch when (attempt < 4)
            {
                await Task.Delay(50);
            }
        }
    }

    /// <summary>
    /// Polls <paramref name="predicate"/> until it returns true or the timeout elapses.
    /// </summary>
    /// <param name="predicate">Condition to await.</param>
    /// <param name="failureMessage">Message for the timeout exception.</param>
    /// <param name="timeoutMs">Overall timeout in milliseconds.</param>
    /// <param name="pollMs">Delay between polls in milliseconds.</param>
    /// <exception cref="TimeoutException">Thrown when the condition never holds.</exception>
    private static async Task WaitForConditionAsync(
        Func<Task<bool>> predicate,
        string failureMessage,
        int timeoutMs = 6000,
        int pollMs = 50)
    {
        DateTimeOffset deadline = DateTimeOffset.UtcNow.AddMilliseconds(timeoutMs);
        while (DateTimeOffset.UtcNow < deadline)
        {
            if (await predicate())
                return;
            await Task.Delay(pollMs);
        }

        throw new TimeoutException(failureMessage);
    }
}

/// <summary>
/// Serializes all tests in the collection; they share RocksDB directories on disk.
/// </summary>
[CollectionDefinition("SurrealCdcDurability", DisableParallelization = true)]
public sealed class SurrealCdcDurabilityCollection;

/// <summary>
/// Owns a <see cref="SampleDbContext"/> plus the CDC stores wired against it,
/// giving tests direct access to the oplog, metadata, and checkpoint layers.
/// </summary>
internal sealed class CdcTestHarness : IAsyncDisposable
{
    private readonly VectorClockService _vectorClock;
    private readonly CBDDCSurrealEmbeddedOptions _options;

    private CdcTestHarness(string databasePath, string nodeId, string consumerId)
    {
        _options = new CBDDCSurrealEmbeddedOptions
        {
            Cdc = new CBDDCSurrealCdcOptions
            {
                Enabled = true,
                ConsumerId = consumerId,
                CheckpointTable = "cbddc_cdc_checkpoint"
            }
        };
        Context = new SampleDbContext(databasePath);
        _vectorClock = new VectorClockService();

        var configProvider = Substitute.For<IPeerNodeConfigurationProvider>();
        configProvider.GetConfiguration().Returns(new PeerNodeConfiguration
        {
            NodeId = nodeId,
            AuthToken = "test-token",
            TcpPort = 0
        });

        CheckpointPersistence = new SurrealCdcCheckpointPersistence(
            Context.SurrealEmbeddedClient, Context.SchemaInitializer, _options);
        DocumentStore = new CheckpointedSampleDocumentStore(
            Context, configProvider, _vectorClock, CheckpointPersistence, _options, NullLogger.Instance);
        OplogStore = new SurrealOplogStore(
            Context.SurrealEmbeddedClient,
            Context.SchemaInitializer,
            DocumentStore,
            new LastWriteWinsConflictResolver(),
            _vectorClock,
            null,
            NullLogger.Instance);
        MetadataStore = new SurrealDocumentMetadataStore(
            Context.SurrealEmbeddedClient, Context.SchemaInitializer, NullLogger.Instance);
    }

    /// <summary>
    /// Gets the sample database context.
    /// </summary>
    public SampleDbContext Context { get; }

    /// <summary>
    /// Gets the checkpointed sample document store.
    /// </summary>
    public CheckpointedSampleDocumentStore DocumentStore { get; }

    /// <summary>
    /// Gets the oplog store used by the harness.
    /// </summary>
    public SurrealOplogStore OplogStore { get; }

    /// <summary>
    /// Gets the document metadata store.
    /// </summary>
    public SurrealDocumentMetadataStore MetadataStore { get; }

    /// <summary>
    /// Gets checkpoint persistence used for CDC progress tracking.
    /// </summary>
    public ISurrealCdcCheckpointPersistence CheckpointPersistence { get; }

    /// <summary>
    /// Polls CDC once through the document store.
    /// </summary>
    public async Task PollAsync()
    {
        await DocumentStore.PollCdcOnceAsync();
    }

    /// <summary>
    /// Creates a harness instance with retries for transient RocksDB lock contention.
    /// </summary>
    /// <param name="databasePath">Database directory path.</param>
    /// <param name="nodeId">Node identifier.</param>
    /// <param name="consumerId">CDC consumer identifier.</param>
    /// <returns>Initialized test harness.</returns>
    /// <exception cref="InvalidOperationException">Thrown when the lock never frees up.</exception>
    public static async Task<CdcTestHarness> OpenWithRetriesAsync(
        string databasePath, string nodeId, string consumerId)
    {
        for (var attempt = 0; attempt < 8; attempt++)
        {
            try
            {
                return new CdcTestHarness(databasePath, nodeId, consumerId);
            }
            catch (Exception ex) when (IsLockContention(ex) && attempt < 7)
            {
                await Task.Delay(75);
            }
        }

        throw new InvalidOperationException("Unable to acquire RocksDB lock for test harness.");
    }

    /// <summary>
    /// Gets oplog entries for a collection ordered by timestamp.
    /// </summary>
    /// <param name="collection">Collection name.</param>
    /// <returns>Ordered oplog entries.</returns>
    public async Task<List<OplogEntry>> GetEntriesByCollectionAsync(string collection)
    {
        return (await OplogStore.ExportAsync())
            .Where(e => string.Equals(e.Collection, collection, StringComparison.Ordinal))
            .OrderBy(e => e.Timestamp.PhysicalTime)
            .ThenBy(e => e.Timestamp.LogicalCounter)
            .ToList();
    }

    /// <summary>
    /// Gets oplog entries for a collection key ordered by timestamp.
    /// </summary>
    /// <param name="collection">Collection name.</param>
    /// <param name="key">Document key.</param>
    /// <returns>Ordered oplog entries.</returns>
    public async Task<List<OplogEntry>> GetEntriesByKeyAsync(string collection, string key)
    {
        return (await OplogStore.ExportAsync())
            .Where(e => string.Equals(e.Collection, collection, StringComparison.Ordinal)
                && string.Equals(e.Key, key, StringComparison.Ordinal))
            .OrderBy(e => e.Timestamp.PhysicalTime)
            .ThenBy(e => e.Timestamp.LogicalCounter)
            .ToList();
    }

    /// <inheritdoc/>
    public async ValueTask DisposeAsync()
    {
        DocumentStore.Dispose();
        Context.Dispose();
        // Give RocksDB a moment to release its directory lock before the next open.
        await Task.Delay(75);
    }

    // Matches the RocksDB "No locks available" failure raised while another handle
    // still owns the database directory.
    private static bool IsLockContention(Exception exception)
    {
        return exception.ToString().Contains("No locks available", StringComparison.OrdinalIgnoreCase);
    }
}

/// <summary>
/// Sample-schema document store that bridges CDC content to the Users/TodoLists
/// repositories and reports progress through checkpoint persistence.
/// </summary>
internal sealed class CheckpointedSampleDocumentStore : SurrealDocumentStore
{
    private const string UsersCollection = "Users";
    private const string TodoListsCollection = "TodoLists";

    /// <summary>
    /// Initializes a new instance of the <see cref="CheckpointedSampleDocumentStore"/> class.
    /// </summary>
    /// <param name="context">Sample database context.</param>
    /// <param name="configProvider">Peer configuration provider.</param>
    /// <param name="vectorClockService">Vector clock service.</param>
    /// <param name="checkpointPersistence">Checkpoint persistence implementation.</param>
    /// <param name="surrealOptions">Optional Surreal embedded options.</param>
    /// <param name="logger">Optional logger.</param>
    public CheckpointedSampleDocumentStore(
        SampleDbContext context,
        IPeerNodeConfigurationProvider configProvider,
        IVectorClockService vectorClockService,
        ISurrealCdcCheckpointPersistence checkpointPersistence,
        CBDDCSurrealEmbeddedOptions? surrealOptions = null,
        ILogger? logger = null)
        : base(
            context,
            context.SurrealEmbeddedClient,
            context.SchemaInitializer,
            configProvider,
            vectorClockService,
            new LastWriteWinsConflictResolver(),
            checkpointPersistence,
            BuildPollingOptions(surrealOptions),
            logger)
    {
        // Tests drive polling explicitly, so in-memory event subscriptions stay off.
        WatchCollection(UsersCollection, context.Users, u => u.Id, subscribeForInMemoryEvents: false);
        WatchCollection(TodoListsCollection, context.TodoLists, t => t.Id, subscribeForInMemoryEvents: false);
    }

    /// <inheritdoc/>
    protected override async Task ApplyContentToEntityAsync(
        string collection, string key, JsonElement content, CancellationToken cancellationToken)
    {
        await UpsertEntityAsync(collection, key, content, cancellationToken);
    }

    /// <inheritdoc/>
    protected override async Task ApplyContentToEntitiesBatchAsync(
        IEnumerable<(string Collection, string Key, JsonElement Content)> documents,
        CancellationToken cancellationToken)
    {
        foreach ((string collection, string key, var content) in documents)
            await UpsertEntityAsync(collection, key, content, cancellationToken);
    }

    /// <inheritdoc/>
    protected override async Task<JsonElement?> GetEntityAsJsonAsync(
        string collection, string key, CancellationToken cancellationToken)
    {
        return collection switch
        {
            UsersCollection => SerializeEntity(await _context.Users.FindByIdAsync(key, cancellationToken)),
            TodoListsCollection => SerializeEntity(await _context.TodoLists.FindByIdAsync(key, cancellationToken)),
            _ => null
        };
    }

    /// <inheritdoc/>
    protected override async Task RemoveEntityAsync(
        string collection, string key, CancellationToken cancellationToken)
    {
        await DeleteEntityAsync(collection, key, cancellationToken);
    }

    /// <inheritdoc/>
    protected override async Task RemoveEntitiesBatchAsync(
        IEnumerable<(string Collection, string Key)> documents, CancellationToken cancellationToken)
    {
        foreach ((string collection, string key) in documents)
            await DeleteEntityAsync(collection, key, cancellationToken);
    }

    /// <inheritdoc/>
    // NOTE(review): return type reconstructed from usage (`.ToList()`, `[]`, tuple of id +
    // serialized JSON) — confirm the exact element names against the base class signature.
    protected override async Task<List<(string Key, JsonElement Content)>> GetAllEntitiesAsJsonAsync(
        string collection, CancellationToken cancellationToken)
    {
        return collection switch
        {
            UsersCollection => (await _context.Users.FindAllAsync(cancellationToken))
                .Select(u => (u.Id, SerializeEntity(u)!.Value))
                .ToList(),
            TodoListsCollection => (await _context.TodoLists.FindAllAsync(cancellationToken))
                .Select(t => (t.Id, SerializeEntity(t)!.Value))
                .ToList(),
            _ => []
        };
    }

    // Insert-or-update a document into the matching repository; unknown collections throw.
    private async Task UpsertEntityAsync(
        string collection, string key, JsonElement content, CancellationToken cancellationToken)
    {
        switch (collection)
        {
            case UsersCollection:
                var user = content.Deserialize<User>()
                    ?? throw new InvalidOperationException("Failed to deserialize user.");
                user.Id = key;
                if (await _context.Users.FindByIdAsync(key, cancellationToken) == null)
                    await _context.Users.InsertAsync(user, cancellationToken);
                else
                    await _context.Users.UpdateAsync(user, cancellationToken);
                break;
            case TodoListsCollection:
                var todo = content.Deserialize<TodoList>()
                    ?? throw new InvalidOperationException("Failed to deserialize todo list.");
                todo.Id = key;
                if (await _context.TodoLists.FindByIdAsync(key, cancellationToken) == null)
                    await _context.TodoLists.InsertAsync(todo, cancellationToken);
                else
                    await _context.TodoLists.UpdateAsync(todo, cancellationToken);
                break;
            default:
                throw new NotSupportedException($"Collection '{collection}' is not supported for sync.");
        }
    }

    // Delete is deliberately best-effort for unknown collections (no default throw),
    // unlike upsert above.
    private async Task DeleteEntityAsync(string collection, string key, CancellationToken cancellationToken)
    {
        switch (collection)
        {
            case UsersCollection:
                await _context.Users.DeleteAsync(key, cancellationToken);
                break;
            case TodoListsCollection:
                await _context.TodoLists.DeleteAsync(key, cancellationToken);
                break;
        }
    }

    // Null stays null; otherwise serialize the entity to a JsonElement.
    private static JsonElement? SerializeEntity<T>(T? entity) where T : class
    {
        return entity == null ? null : JsonSerializer.SerializeToElement(entity);
    }

    // Maps embedded-options CDC settings onto polling options; null means "use defaults".
    private static SurrealCdcPollingOptions? BuildPollingOptions(CBDDCSurrealEmbeddedOptions? options)
    {
        if (options == null)
            return null;

        return new SurrealCdcPollingOptions
        {
            Enabled = options.Cdc.Enabled,
            PollInterval = options.Cdc.PollingInterval,
            BatchSize = options.Cdc.BatchSize,
            EnableLiveSelectAccelerator = options.Cdc.EnableLiveSelectAccelerator,
            LiveSelectReconnectDelay = options.Cdc.LiveSelectReconnectDelay
        };
    }
}