diff --git a/samples/ZB.MOM.WW.CBDDC.Sample.Console/ConsoleInteractiveService.cs b/samples/ZB.MOM.WW.CBDDC.Sample.Console/ConsoleInteractiveService.cs index 1dbc701..b2ceeba 100755 --- a/samples/ZB.MOM.WW.CBDDC.Sample.Console/ConsoleInteractiveService.cs +++ b/samples/ZB.MOM.WW.CBDDC.Sample.Console/ConsoleInteractiveService.cs @@ -124,7 +124,9 @@ public class ConsoleInteractiveService : BackgroundService var ts = DateTime.Now.ToString("HH:mm:ss.fff"); var user = new User { - Id = Guid.NewGuid().ToString(), Name = $"User-{ts}", Age = new Random().Next(18, 90), + Id = Guid.NewGuid().ToString(), + Name = $"User-{ts}", + Age = new Random().Next(18, 90), Address = new Address { City = "AutoCity" } }; await _db.Users.InsertAsync(user); @@ -138,7 +140,9 @@ public class ConsoleInteractiveService : BackgroundService var ts = DateTime.Now.ToString("HH:mm:ss.fff"); var user = new User { - Id = Guid.NewGuid().ToString(), Name = $"User-{ts}", Age = new Random().Next(18, 90), + Id = Guid.NewGuid().ToString(), + Name = $"User-{ts}", + Age = new Random().Next(18, 90), Address = new Address { City = "SpamCity" } }; await _db.Users.InsertAsync(user); @@ -158,9 +162,9 @@ public class ConsoleInteractiveService : BackgroundService else if (input.StartsWith("p")) { var alice = new User - { Id = Guid.NewGuid().ToString(), Name = "Alice", Age = 30, Address = new Address { City = "Paris" } }; + { Id = Guid.NewGuid().ToString(), Name = "Alice", Age = 30, Address = new Address { City = "Paris" } }; var bob = new User - { Id = Guid.NewGuid().ToString(), Name = "Bob", Age = 25, Address = new Address { City = "Rome" } }; + { Id = Guid.NewGuid().ToString(), Name = "Bob", Age = 25, Address = new Address { City = "Rome" } }; await _db.Users.InsertAsync(alice); await _db.Users.InsertAsync(bob); await _db.SaveChangesAsync(); diff --git a/samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDbContext.cs b/samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDbContext.cs index 498663b..b6fd508 100755 --- a/samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDbContext.cs +++ b/samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDbContext.cs @@ -14,6 +14,11 @@ public class SampleDbContext : IDisposable private readonly bool _ownsClient; + /// + /// Initializes a new instance of the class. + /// + /// The embedded SurrealDB client. + /// The schema initializer. public SampleDbContext( ICBDDCSurrealEmbeddedClient surrealEmbeddedClient, ICBDDCSurrealSchemaInitializer schemaInitializer) @@ -29,6 +34,10 @@ public class SampleDbContext : IDisposable SchemaInitializer); } + /// + /// Initializes a new instance of the class. + /// + /// The database path used for the embedded store. public SampleDbContext(string databasePath) { string normalizedPath = NormalizeDatabasePath(databasePath); @@ -54,21 +63,41 @@ public class SampleDbContext : IDisposable SchemaInitializer); } + /// + /// Gets the embedded SurrealDB client. + /// public ICBDDCSurrealEmbeddedClient SurrealEmbeddedClient { get; } + /// + /// Gets the schema initializer. + /// public ICBDDCSurrealSchemaInitializer SchemaInitializer { get; private set; } + /// + /// Gets the users collection. + /// public SampleSurrealCollection Users { get; private set; } + /// + /// Gets the todo lists collection. + /// public SampleSurrealCollection TodoLists { get; private set; } + /// + /// Gets the operation log entries collection. + /// public SampleSurrealReadOnlyCollection OplogEntries { get; private set; } + /// + /// Ensures schema changes are applied before persisting updates. + /// + /// A cancellation token. public async Task SaveChangesAsync(CancellationToken cancellationToken = default) { await SchemaInitializer.EnsureInitializedAsync(cancellationToken); } + /// public void Dispose() { Users.Dispose(); @@ -101,11 +130,16 @@ public sealed class SampleSurrealSchemaInitializer : ICBDDCSurrealSchemaInitiali private readonly ICBDDCSurrealEmbeddedClient _client; private int _initialized; + /// + /// Initializes a new instance of the class. + /// + /// The embedded SurrealDB client. public SampleSurrealSchemaInitializer(ICBDDCSurrealEmbeddedClient client) { _client = client ?? throw new ArgumentNullException(nameof(client)); } + /// public async Task EnsureInitializedAsync(CancellationToken cancellationToken = default) { if (Volatile.Read(ref _initialized) == 1) return; @@ -124,6 +158,13 @@ public sealed class SampleSurrealCollection : ISurrealWatchableCollecti private readonly ICBDDCSurrealSchemaInitializer _schemaInitializer; private readonly string _tableName; + /// + /// Initializes a new instance of the class. + /// + /// The backing table name. + /// The key selector for entities. + /// The embedded SurrealDB client. + /// The schema initializer. public SampleSurrealCollection( string tableName, Func keySelector, @@ -139,21 +180,25 @@ public sealed class SampleSurrealCollection : ISurrealWatchableCollecti _schemaInitializer = schemaInitializer ?? throw new ArgumentNullException(nameof(schemaInitializer)); } + /// public IDisposable Subscribe(IObserver> observer) { return _changeFeed.Subscribe(observer); } + /// public async Task InsertAsync(TEntity entity, CancellationToken cancellationToken = default) { await UpsertAsync(entity, cancellationToken); } + /// public async Task UpdateAsync(TEntity entity, CancellationToken cancellationToken = default) { await UpsertAsync(entity, cancellationToken); } + /// public async Task DeleteAsync(string id, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(id)) @@ -164,11 +209,22 @@ public sealed class SampleSurrealCollection : ISurrealWatchableCollecti _changeFeed.PublishDelete(id); } + /// + /// Finds an entity by identifier. + /// + /// The entity identifier. + /// The matching entity when found; otherwise . public TEntity? FindById(string id) { return FindByIdAsync(id).GetAwaiter().GetResult(); } + /// + /// Finds an entity by identifier asynchronously. + /// + /// The entity identifier. + /// A cancellation token. + /// The matching entity when found; otherwise . public async Task FindByIdAsync(string id, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(id)) @@ -179,11 +235,13 @@ public sealed class SampleSurrealCollection : ISurrealWatchableCollecti return record?.Entity; } + /// public IEnumerable FindAll() { return FindAllAsync().GetAwaiter().GetResult(); } + /// public async Task> FindAllAsync(CancellationToken cancellationToken = default) { await EnsureReadyAsync(cancellationToken); @@ -195,12 +253,14 @@ public sealed class SampleSurrealCollection : ISurrealWatchableCollecti ?? []; } + /// public IEnumerable Find(Func predicate) { ArgumentNullException.ThrowIfNull(predicate); return FindAll().Where(predicate); } + /// public void Dispose() { _changeFeed.Dispose(); @@ -235,6 +295,12 @@ public sealed class SampleSurrealReadOnlyCollection private readonly ICBDDCSurrealSchemaInitializer _schemaInitializer; private readonly string _tableName; + /// + /// Initializes a new instance of the class. + /// + /// The backing table name. + /// The embedded SurrealDB client. + /// The schema initializer. public SampleSurrealReadOnlyCollection( string tableName, ICBDDCSurrealEmbeddedClient surrealEmbeddedClient, @@ -248,11 +314,20 @@ public sealed class SampleSurrealReadOnlyCollection _schemaInitializer = schemaInitializer ?? throw new ArgumentNullException(nameof(schemaInitializer)); } + /// + /// Returns all entities from the collection. + /// + /// The entities in the collection. public IEnumerable FindAll() { return FindAllAsync().GetAwaiter().GetResult(); } + /// + /// Returns all entities from the collection asynchronously. + /// + /// A cancellation token. + /// The entities in the collection. public async Task> FindAllAsync(CancellationToken cancellationToken = default) { await _schemaInitializer.EnsureInitializedAsync(cancellationToken); @@ -260,6 +335,11 @@ public sealed class SampleSurrealReadOnlyCollection return rows?.ToList() ?? []; } + /// + /// Returns entities that match the provided predicate. + /// + /// The predicate used to filter entities. + /// The entities that satisfy the predicate. public IEnumerable Find(Func predicate) { ArgumentNullException.ThrowIfNull(predicate); @@ -270,30 +350,54 @@ public sealed class SampleSurrealReadOnlyCollection public sealed class SampleEntityRecord : Record where TEntity : class { + /// + /// Gets or sets the stored entity payload. + /// [JsonPropertyName("entity")] public TEntity? Entity { get; set; } } public sealed class SampleOplogEntry : Record { + /// + /// Gets or sets the collection name. + /// [JsonPropertyName("collection")] public string Collection { get; set; } = ""; + /// + /// Gets or sets the entity key. + /// [JsonPropertyName("key")] public string Key { get; set; } = ""; + /// + /// Gets or sets the operation code. + /// [JsonPropertyName("operation")] public int Operation { get; set; } + /// + /// Gets or sets the node identifier portion of the timestamp. + /// [JsonPropertyName("timestampNodeId")] public string TimestampNodeId { get; set; } = ""; + /// + /// Gets or sets the physical time portion of the timestamp. + /// [JsonPropertyName("timestampPhysicalTime")] public long TimestampPhysicalTime { get; set; } + /// + /// Gets or sets the logical counter portion of the timestamp. + /// [JsonPropertyName("timestampLogicalCounter")] public int TimestampLogicalCounter { get; set; } + /// + /// Gets or sets the hash for the operation entry. + /// [JsonPropertyName("hash")] public string Hash { get; set; } = ""; } diff --git a/samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDocumentStore.cs b/samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDocumentStore.cs index 527d2b3..ef11042 100755 --- a/samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDocumentStore.cs +++ b/samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDocumentStore.cs @@ -15,6 +15,13 @@ public class SampleDocumentStore : SurrealDocumentStore private const string UsersCollection = "Users"; private const string TodoListsCollection = "TodoLists"; + /// + /// Initializes a new instance of the class. + /// + /// Sample database context. + /// Peer configuration provider. + /// Vector clock service. + /// Optional logger. public SampleDocumentStore( SampleDbContext context, IPeerNodeConfigurationProvider configProvider, @@ -35,6 +42,7 @@ public class SampleDocumentStore : SurrealDocumentStore WatchCollection(TodoListsCollection, context.TodoLists, t => t.Id); } + /// protected override async Task ApplyContentToEntityAsync( string collection, string key, @@ -44,6 +52,7 @@ public class SampleDocumentStore : SurrealDocumentStore await UpsertEntityAsync(collection, key, content, cancellationToken); } + /// protected override async Task ApplyContentToEntitiesBatchAsync( IEnumerable<(string Collection, string Key, JsonElement Content)> documents, CancellationToken cancellationToken) @@ -52,6 +61,7 @@ public class SampleDocumentStore : SurrealDocumentStore await UpsertEntityAsync(collection, key, content, cancellationToken); } + /// protected override async Task GetEntityAsJsonAsync( string collection, string key, @@ -65,6 +75,7 @@ public class SampleDocumentStore : SurrealDocumentStore }; } + /// protected override async Task RemoveEntityAsync( string collection, string key, @@ -73,6 +84,7 @@ public class SampleDocumentStore : SurrealDocumentStore await DeleteEntityAsync(collection, key, cancellationToken); } + /// protected override async Task RemoveEntitiesBatchAsync( IEnumerable<(string Collection, string Key)> documents, CancellationToken cancellationToken) @@ -81,6 +93,7 @@ public class SampleDocumentStore : SurrealDocumentStore await DeleteEntityAsync(collection, key, cancellationToken); } + /// protected override async Task> GetAllEntitiesAsJsonAsync( string collection, CancellationToken cancellationToken) diff --git a/src/ZB.MOM.WW.CBDDC.Core/Resilience/RetryPolicy.cs b/src/ZB.MOM.WW.CBDDC.Core/Resilience/RetryPolicy.cs index 6198ed3..414a3fc 100755 --- a/src/ZB.MOM.WW.CBDDC.Core/Resilience/RetryPolicy.cs +++ b/src/ZB.MOM.WW.CBDDC.Core/Resilience/RetryPolicy.cs @@ -55,14 +55,16 @@ public class RetryPolicy : IRetryPolicy return await operation(); } - catch (Exception ex) when (attempt < config.RetryAttempts && IsTransient(ex)) - { - lastException = ex; - int delay = config.RetryDelayMs * attempt; // Exponential backoff - - _logger.LogWarning(ex, - "Operation {Operation} failed (attempt {Attempt}/{Max}). Retrying in {Delay}ms...", - operationName, attempt, config.RetryAttempts, delay); + catch (Exception ex) when (IsTransient(ex)) + { + lastException = ex; + if (attempt >= config.RetryAttempts) break; + + int delay = config.RetryDelayMs * attempt; // Exponential backoff + + _logger.LogWarning(ex, + "Operation {Operation} failed (attempt {Attempt}/{Max}). Retrying in {Delay}ms...", + operationName, attempt, config.RetryAttempts, delay); await Task.Delay(delay, cancellationToken); } @@ -111,4 +113,4 @@ public class RetryPolicy : IRetryPolicy return false; } -} \ No newline at end of file +} diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealEmbeddedClient.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealEmbeddedClient.cs index 403aa1c..3a0c8af 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealEmbeddedClient.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealEmbeddedClient.cs @@ -16,11 +16,15 @@ public interface ICBDDCSurrealEmbeddedClient : IAsyncDisposable, IDisposable /// /// Connects and selects namespace/database exactly once. /// + /// Cancellation token. Task InitializeAsync(CancellationToken cancellationToken = default); /// /// Executes a raw SurrealQL statement. /// + /// The SurrealQL query to execute. + /// Optional named parameters for the query. + /// Cancellation token. Task RawQueryAsync(string query, IReadOnlyDictionary? parameters = null, CancellationToken cancellationToken = default); @@ -28,5 +32,6 @@ public interface ICBDDCSurrealEmbeddedClient : IAsyncDisposable, IDisposable /// /// Checks whether the embedded client responds to health probes. /// + /// Cancellation token. Task HealthAsync(CancellationToken cancellationToken = default); } diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealReadinessProbe.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealReadinessProbe.cs index 91de1d8..bc7afd3 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealReadinessProbe.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealReadinessProbe.cs @@ -8,5 +8,6 @@ public interface ICBDDCSurrealReadinessProbe /// /// Returns true when client initialization, schema initialization, and health checks pass. /// + /// The cancellation token. Task IsReadyAsync(CancellationToken cancellationToken = default); } diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealSchemaInitializer.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealSchemaInitializer.cs index 6335569..d95068a 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealSchemaInitializer.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ICBDDCSurrealSchemaInitializer.cs @@ -8,5 +8,6 @@ public interface ICBDDCSurrealSchemaInitializer /// /// Creates required tables/indexes/checkpoint schema for CBDDC stores. /// + /// Cancellation token. Task EnsureInitializedAsync(CancellationToken cancellationToken = default); } diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ISurrealCdcWorkerLifecycle.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ISurrealCdcWorkerLifecycle.cs index ae1b487..43a995c 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ISurrealCdcWorkerLifecycle.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/ISurrealCdcWorkerLifecycle.cs @@ -13,15 +13,18 @@ public interface ISurrealCdcWorkerLifecycle /// /// Starts the CDC worker. /// + /// The token used to cancel the asynchronous operation. Task StartCdcWorkerAsync(CancellationToken cancellationToken = default); /// /// Executes one CDC polling pass across all watched collections. /// + /// The token used to cancel the asynchronous operation. Task PollCdcOnceAsync(CancellationToken cancellationToken = default); /// /// Stops the CDC worker. /// + /// The token used to cancel the asynchronous operation. Task StopCdcWorkerAsync(CancellationToken cancellationToken = default); } diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealCdcCheckpointPersistence.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealCdcCheckpointPersistence.cs index 5f8390b..1908e6b 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealCdcCheckpointPersistence.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealCdcCheckpointPersistence.cs @@ -150,30 +150,56 @@ public sealed class SurrealCdcCheckpointPersistence : ISurrealCdcCheckpointPersi internal sealed class SurrealCdcCheckpointRecord : Record { + /// + /// Gets or sets the CDC consumer identifier. + /// [JsonPropertyName("consumerId")] public string ConsumerId { get; set; } = ""; + /// + /// Gets or sets the physical time component of the checkpoint timestamp. + /// [JsonPropertyName("timestampPhysicalTime")] public long TimestampPhysicalTime { get; set; } + /// + /// Gets or sets the logical counter component of the checkpoint timestamp. + /// [JsonPropertyName("timestampLogicalCounter")] public int TimestampLogicalCounter { get; set; } + /// + /// Gets or sets the node identifier component of the checkpoint timestamp. + /// [JsonPropertyName("timestampNodeId")] public string TimestampNodeId { get; set; } = ""; + /// + /// Gets or sets the hash associated with the checkpoint. + /// [JsonPropertyName("lastHash")] public string LastHash { get; set; } = ""; + /// + /// Gets or sets the last update time in Unix milliseconds. + /// [JsonPropertyName("updatedUtcMs")] public long UpdatedUtcMs { get; set; } + /// + /// Gets or sets the optional encoded versionstamp cursor. + /// [JsonPropertyName("versionstampCursor")] public long? VersionstampCursor { get; set; } } internal static class SurrealCdcCheckpointRecordMappers { + /// + /// Converts a checkpoint record into the domain checkpoint model. + /// + /// The Surreal checkpoint record. + /// The mapped domain checkpoint instance. public static SurrealCdcCheckpoint ToDomain(this SurrealCdcCheckpointRecord record) { return new SurrealCdcCheckpoint diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentMetadataStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentMetadataStore.cs index 51c3bf2..91f4e19 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentMetadataStore.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentMetadataStore.cs @@ -13,6 +13,12 @@ public class SurrealDocumentMetadataStore : DocumentMetadataStore private readonly ICBDDCSurrealSchemaInitializer _schemaInitializer; private readonly ISurrealDbClient _surrealClient; + /// + /// Initializes a new instance of the class. + /// + /// The embedded Surreal client provider. + /// The schema initializer. + /// Optional logger. public SurrealDocumentMetadataStore( ICBDDCSurrealEmbeddedClient surrealEmbeddedClient, ICBDDCSurrealSchemaInitializer schemaInitializer, @@ -24,6 +30,7 @@ public class SurrealDocumentMetadataStore : DocumentMetadataStore _logger = logger ?? NullLogger.Instance; } + /// public override async Task GetMetadataAsync(string collection, string key, CancellationToken cancellationToken = default) { @@ -31,6 +38,7 @@ public class SurrealDocumentMetadataStore : DocumentMetadataStore return existing?.ToDomain(); } + /// public override async Task> GetMetadataByCollectionAsync(string collection, CancellationToken cancellationToken = default) { @@ -41,6 +49,7 @@ public class SurrealDocumentMetadataStore : DocumentMetadataStore .ToList(); } + /// public override async Task UpsertMetadataAsync(DocumentMetadata metadata, CancellationToken cancellationToken = default) { @@ -55,6 +64,7 @@ public class SurrealDocumentMetadataStore : DocumentMetadataStore cancellationToken); } + /// public override async Task UpsertMetadataBatchAsync(IEnumerable metadatas, CancellationToken cancellationToken = default) { @@ -62,6 +72,7 @@ public class SurrealDocumentMetadataStore : DocumentMetadataStore await UpsertMetadataAsync(metadata, cancellationToken); } + /// public override async Task MarkDeletedAsync(string collection, string key, HlcTimestamp timestamp, CancellationToken cancellationToken = default) { @@ -69,6 +80,7 @@ public class SurrealDocumentMetadataStore : DocumentMetadataStore await UpsertMetadataAsync(metadata, cancellationToken); } + /// public override async Task> GetMetadataAfterAsync(HlcTimestamp since, IEnumerable? collections = null, CancellationToken cancellationToken = default) { @@ -86,24 +98,28 @@ public class SurrealDocumentMetadataStore : DocumentMetadataStore .ToList(); } + /// public override async Task DropAsync(CancellationToken cancellationToken = default) { await EnsureReadyAsync(cancellationToken); await _surrealClient.Delete(CBDDCSurrealSchemaNames.DocumentMetadataTable, cancellationToken); } + /// public override async Task> ExportAsync(CancellationToken cancellationToken = default) { var all = await SelectAllAsync(cancellationToken); return all.Select(m => m.ToDomain()).ToList(); } + /// public override async Task ImportAsync(IEnumerable items, CancellationToken cancellationToken = default) { foreach (var item in items) await UpsertMetadataAsync(item, cancellationToken); } + /// public override async Task MergeAsync(IEnumerable items, CancellationToken cancellationToken = default) { diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentStore.cs index 8213e67..98d9d66 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentStore.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentStore.cs @@ -61,6 +61,15 @@ public abstract class SurrealDocumentStore : IDocumentStore, ISurrealC /// /// Initializes a new instance of the class. /// + /// The application context used by the concrete store. + /// The embedded Surreal client provider. + /// The Surreal schema initializer. + /// The peer node configuration provider. + /// The vector clock service used for local oplog state. + /// Optional conflict resolver; defaults to last-write-wins. + /// Optional CDC checkpoint persistence component. + /// Optional CDC polling options. + /// Optional logger instance. protected SurrealDocumentStore( TContext context, ICBDDCSurrealEmbeddedClient surrealEmbeddedClient, @@ -128,21 +137,28 @@ public abstract class SurrealDocumentStore : IDocumentStore, ISurrealC { private readonly ILogger _inner; + /// + /// Initializes a new instance of the class. + /// + /// The logger instance to forward calls to. public ForwardingLogger(ILogger inner) { _inner = inner; } + /// public IDisposable? BeginScope(TState state) where TState : notnull { return _inner.BeginScope(state); } + /// public bool IsEnabled(LogLevel logLevel) { return _inner.IsEnabled(logLevel); } + /// public void Log( LogLevel logLevel, EventId eventId, @@ -191,6 +207,7 @@ public abstract class SurrealDocumentStore : IDocumentStore, ISurrealC /// Logical collection name used by oplog and metadata records. /// Watchable change source. /// Function used to resolve the entity key. + /// Whether to subscribe to in-memory collection events. protected void WatchCollection( string collectionName, ISurrealWatchableCollection collection, @@ -220,6 +237,12 @@ public abstract class SurrealDocumentStore : IDocumentStore, ISurrealC private readonly Func _keySelector; private readonly SurrealDocumentStore _store; + /// + /// Initializes a new instance of the class. + /// + /// The logical collection name. + /// The key selector for observed entities. + /// The owning document store. public CdcObserver( string collectionName, Func keySelector, @@ -230,6 +253,7 @@ public abstract class SurrealDocumentStore : IDocumentStore, ISurrealC _store = store; } + /// public void OnNext(SurrealCollectionChange changeEvent) { if (_store.IsCdcPollingWorkerActiveForCollection(_collectionName)) return; @@ -267,10 +291,12 @@ public abstract class SurrealDocumentStore : IDocumentStore, ISurrealC .GetAwaiter().GetResult(); } + /// public void OnError(Exception error) { } + /// public void OnCompleted() { } @@ -760,22 +786,58 @@ public abstract class SurrealDocumentStore : IDocumentStore, ISurrealC #region Abstract Methods - Implemented by subclass + /// + /// Applies JSON content to a single entity in the backing store. + /// + /// The collection name. + /// The document key. + /// The JSON payload to persist. + /// The cancellation token. protected abstract Task ApplyContentToEntityAsync( string collection, string key, JsonElement content, CancellationToken cancellationToken); + /// + /// Applies JSON content to multiple entities in the backing store. + /// + /// The documents to persist. + /// The cancellation token. protected abstract Task ApplyContentToEntitiesBatchAsync( IEnumerable<(string Collection, string Key, JsonElement Content)> documents, CancellationToken cancellationToken); + /// + /// Gets a single entity as JSON content. + /// + /// The collection name. + /// The document key. + /// The cancellation token. + /// The JSON content when found; otherwise . protected abstract Task GetEntityAsJsonAsync( string collection, string key, CancellationToken cancellationToken); + /// + /// Removes a single entity from the backing store. + /// + /// The collection name. + /// The document key. + /// The cancellation token. protected abstract Task RemoveEntityAsync( string collection, string key, CancellationToken cancellationToken); + /// + /// Removes multiple entities from the backing store. + /// + /// The documents to remove. + /// The cancellation token. protected abstract Task RemoveEntitiesBatchAsync( IEnumerable<(string Collection, string Key)> documents, CancellationToken cancellationToken); + /// + /// Gets all entities from a collection as JSON content. + /// + /// The collection name. + /// The cancellation token. + /// A sequence of key/content pairs. protected abstract Task> GetAllEntitiesAsJsonAsync( string collection, CancellationToken cancellationToken); @@ -1055,6 +1117,12 @@ public abstract class SurrealDocumentStore : IDocumentStore, ISurrealC /// /// Handles a local collection change and records oplog/metadata when not suppressed. /// + /// The collection name. + /// The document key. + /// The detected operation type. + /// Optional JSON content for non-delete operations. + /// Optional pending cursor checkpoint to persist. + /// The cancellation token. protected async Task OnLocalChangeDetectedAsync( string collection, string key, @@ -1315,11 +1383,16 @@ public abstract class SurrealDocumentStore : IDocumentStore, ISurrealC private readonly SemaphoreSlim _guard; private int _disposed; + /// + /// Initializes a new instance of the class. + /// + /// The guard semaphore to release on dispose. public RemoteSyncScope(SemaphoreSlim guard) { _guard = guard; } + /// public void Dispose() { if (Interlocked.Exchange(ref _disposed, 1) == 1) return; diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentStoreWatch.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentStoreWatch.cs index b438f5f..38e6fba 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentStoreWatch.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentStoreWatch.cs @@ -127,6 +127,11 @@ public sealed class SurrealCollectionChangeFeed : ISurrealWatchableColl private readonly IObserver> _observer; private int _disposed; + /// + /// Initializes a new instance of the class. + /// + /// The owning change feed. + /// The observer to unsubscribe on disposal. public Subscription( SurrealCollectionChangeFeed owner, IObserver> observer) @@ -135,6 +140,7 @@ public sealed class SurrealCollectionChangeFeed : ISurrealWatchableColl _observer = observer; } + /// public void Dispose() { if (Interlocked.Exchange(ref _disposed, 1) == 1) return; diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealOplogStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealOplogStore.cs index e458f58..162b1db 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealOplogStore.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealOplogStore.cs @@ -14,6 +14,16 @@ public class SurrealOplogStore : OplogStore private readonly ICBDDCSurrealSchemaInitializer? _schemaInitializer; private readonly ISurrealDbClient? _surrealClient; + /// + /// Initializes a new instance of the class. + /// + /// The Surreal embedded client provider. + /// The schema initializer used to prepare storage. + /// The document store used for entity operations. + /// The conflict resolver for replicated mutations. + /// The vector clock service for causal ordering. + /// The optional snapshot metadata store. + /// The optional logger instance. public SurrealOplogStore( ICBDDCSurrealEmbeddedClient surrealEmbeddedClient, ICBDDCSurrealSchemaInitializer schemaInitializer, @@ -36,6 +46,7 @@ public class SurrealOplogStore : OplogStore InitializeVectorClock(); } + /// public override async Task> GetChainRangeAsync(string startHash, string endHash, CancellationToken cancellationToken = default) { @@ -61,12 +72,14 @@ public class SurrealOplogStore : OplogStore .ToList(); } + /// public override async Task GetEntryByHashAsync(string hash, CancellationToken cancellationToken = default) { var existing = await FindByHashAsync(hash, cancellationToken); return existing?.ToDomain(); } + /// public override async Task> GetOplogAfterAsync(HlcTimestamp timestamp, IEnumerable? collections = null, CancellationToken cancellationToken = default) { @@ -85,6 +98,7 @@ public class SurrealOplogStore : OplogStore .ToList(); } + /// public override async Task> GetOplogForNodeAfterAsync(string nodeId, HlcTimestamp since, IEnumerable? collections = null, CancellationToken cancellationToken = default) { @@ -104,6 +118,7 @@ public class SurrealOplogStore : OplogStore .ToList(); } + /// public override async Task PruneOplogAsync(HlcTimestamp cutoff, CancellationToken cancellationToken = default) { var all = await SelectAllAsync(cancellationToken); @@ -121,6 +136,7 @@ public class SurrealOplogStore : OplogStore } } + /// public override async Task DropAsync(CancellationToken cancellationToken = default) { await EnsureReadyAsync(cancellationToken); @@ -128,12 +144,14 @@ public class SurrealOplogStore : OplogStore _vectorClock.Invalidate(); } + /// public override async Task> ExportAsync(CancellationToken cancellationToken = default) { var all = await SelectAllAsync(cancellationToken); return all.Select(o => o.ToDomain()).ToList(); } + /// public override async Task ImportAsync(IEnumerable items, CancellationToken cancellationToken = default) { foreach (var item in items) @@ -144,6 +162,7 @@ public class SurrealOplogStore : OplogStore } } + /// public override async Task MergeAsync(IEnumerable items, CancellationToken cancellationToken = default) { foreach (var item in items) @@ -155,6 +174,7 @@ public class SurrealOplogStore : OplogStore } } + /// protected override void InitializeVectorClock() { if (_vectorClock.IsInitialized) return; @@ -206,6 +226,7 @@ public class SurrealOplogStore : OplogStore _vectorClock.IsInitialized = true; } + /// protected override async Task InsertOplogEntryAsync(OplogEntry entry, CancellationToken cancellationToken = default) { var existing = await FindByHashAsync(entry.Hash, cancellationToken); @@ -214,6 +235,7 @@ public class SurrealOplogStore : OplogStore await UpsertAsync(entry, SurrealStoreRecordIds.Oplog(entry.Hash), cancellationToken); } + /// protected override async Task QueryLastHashForNodeAsync(string nodeId, CancellationToken cancellationToken = default) { @@ -226,6 +248,7 @@ public class SurrealOplogStore : OplogStore return lastEntry?.Hash; } + /// protected override async Task<(long Wall, int Logic)?> QueryLastHashTimestampFromOplogAsync(string hash, CancellationToken cancellationToken = default) { diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealPeerConfigurationStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealPeerConfigurationStore.cs index b1f0d35..f102c05 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealPeerConfigurationStore.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealPeerConfigurationStore.cs @@ -12,6 +12,12 @@ public class SurrealPeerConfigurationStore : PeerConfigurationStore private readonly ICBDDCSurrealSchemaInitializer _schemaInitializer; private readonly ISurrealDbClient _surrealClient; + /// + /// Initializes a new instance of the class. + /// + /// The embedded SurrealDB client. + /// The schema initializer. + /// The logger instance. public SurrealPeerConfigurationStore( ICBDDCSurrealEmbeddedClient surrealEmbeddedClient, ICBDDCSurrealSchemaInitializer schemaInitializer, @@ -23,6 +29,7 @@ public class SurrealPeerConfigurationStore : PeerConfigurationStore _logger = logger ?? NullLogger.Instance; } + /// public override async Task> GetRemotePeersAsync( CancellationToken cancellationToken = default) { @@ -30,6 +37,7 @@ public class SurrealPeerConfigurationStore : PeerConfigurationStore return all.Select(p => p.ToDomain()).ToList(); } + /// public override async Task GetRemotePeerAsync(string nodeId, CancellationToken cancellationToken) { @@ -37,6 +45,7 @@ public class SurrealPeerConfigurationStore : PeerConfigurationStore return existing?.ToDomain(); } + /// public override async Task RemoveRemotePeerAsync(string nodeId, CancellationToken cancellationToken = default) { await EnsureReadyAsync(cancellationToken); @@ -52,6 +61,7 @@ public class SurrealPeerConfigurationStore : PeerConfigurationStore _logger.LogInformation("Removed remote peer configuration: {NodeId}", nodeId); } + /// public override async Task SaveRemotePeerAsync(RemotePeerConfiguration peer, CancellationToken cancellationToken = default) { @@ -67,6 +77,7 @@ public class SurrealPeerConfigurationStore : PeerConfigurationStore _logger.LogInformation("Saved remote peer configuration: {NodeId} ({Type})", peer.NodeId, peer.Type); } + /// public override async Task DropAsync(CancellationToken cancellationToken = default) { _logger.LogWarning( @@ -76,6 +87,7 @@ public class SurrealPeerConfigurationStore : PeerConfigurationStore _logger.LogInformation("Peer configuration store dropped successfully."); } + /// public override async Task> ExportAsync( CancellationToken cancellationToken = default) { diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealPeerOplogConfirmationStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealPeerOplogConfirmationStore.cs index 61c749a..66dcf80 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealPeerOplogConfirmationStore.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealPeerOplogConfirmationStore.cs @@ -15,6 +15,12 @@ public class SurrealPeerOplogConfirmationStore : PeerOplogConfirmationStore private readonly ICBDDCSurrealSchemaInitializer _schemaInitializer; private readonly ISurrealDbClient _surrealClient; + /// + /// Initializes a new instance of the class. + /// + /// Embedded Surreal client wrapper. + /// Schema initializer. + /// Optional logger. public SurrealPeerOplogConfirmationStore( ICBDDCSurrealEmbeddedClient surrealEmbeddedClient, ICBDDCSurrealSchemaInitializer schemaInitializer, @@ -26,6 +32,7 @@ public class SurrealPeerOplogConfirmationStore : PeerOplogConfirmationStore _logger = logger ?? NullLogger.Instance; } + /// public override async Task EnsurePeerRegisteredAsync( string peerNodeId, string address, @@ -68,6 +75,7 @@ public class SurrealPeerOplogConfirmationStore : PeerOplogConfirmationStore await UpsertAsync(existing, recordId, cancellationToken); } + /// public override async Task UpdateConfirmationAsync( string peerNodeId, string sourceNodeId, @@ -118,6 +126,7 @@ public class SurrealPeerOplogConfirmationStore : PeerOplogConfirmationStore await UpsertAsync(existing, recordId, cancellationToken); } + /// public override async Task> GetConfirmationsAsync( CancellationToken cancellationToken = default) { @@ -128,6 +137,7 @@ public class SurrealPeerOplogConfirmationStore : PeerOplogConfirmationStore .ToList(); } + /// public override async Task> GetConfirmationsForPeerAsync( string peerNodeId, CancellationToken cancellationToken = default) @@ -143,6 +153,7 @@ public class SurrealPeerOplogConfirmationStore : PeerOplogConfirmationStore .ToList(); } + /// public override async Task RemovePeerTrackingAsync(string peerNodeId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(peerNodeId)) @@ -167,6 +178,7 @@ public class SurrealPeerOplogConfirmationStore : PeerOplogConfirmationStore } } + /// public override async Task> GetActiveTrackedPeersAsync( CancellationToken cancellationToken = default) { @@ -178,18 +190,21 @@ public class SurrealPeerOplogConfirmationStore : PeerOplogConfirmationStore .ToList(); } + /// public override async Task DropAsync(CancellationToken cancellationToken = default) { await EnsureReadyAsync(cancellationToken); await _surrealClient.Delete(CBDDCSurrealSchemaNames.PeerOplogConfirmationsTable, cancellationToken); } + /// public override async Task> ExportAsync(CancellationToken cancellationToken = default) { var all = await SelectAllAsync(cancellationToken); return all.Select(c => c.ToDomain()).ToList(); } + /// public override async Task ImportAsync(IEnumerable items, CancellationToken cancellationToken = default) { @@ -202,6 +217,7 @@ public class SurrealPeerOplogConfirmationStore : PeerOplogConfirmationStore } } + /// public override async Task MergeAsync(IEnumerable items, CancellationToken cancellationToken = default) { diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealShowChangesCborDecoder.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealShowChangesCborDecoder.cs index a20fd8d..9799212 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealShowChangesCborDecoder.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealShowChangesCborDecoder.cs @@ -17,6 +17,12 @@ internal static class SurrealShowChangesCborDecoder { private static readonly string[] PutChangeKinds = ["create", "update", "upsert", "insert", "set", "replace"]; + /// + /// Decodes change rows returned by a SurrealDB show changes query. + /// + /// The CBOR rows to decode. + /// The expected table name used to validate row identifiers. + /// The decoded set of change rows. public static IReadOnlyList DecodeRows( IEnumerable rows, string expectedTableName) diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealSnapshotMetadataStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealSnapshotMetadataStore.cs index 6a97d48..cd20999 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealSnapshotMetadataStore.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealSnapshotMetadataStore.cs @@ -12,6 +12,12 @@ public class SurrealSnapshotMetadataStore : SnapshotMetadataStore private readonly ICBDDCSurrealSchemaInitializer _schemaInitializer; private readonly ISurrealDbClient _surrealClient; + /// + /// Initializes a new instance of the class. + /// + /// The Surreal embedded client provider. + /// The schema initializer used to prepare storage. + /// The optional logger instance. public SurrealSnapshotMetadataStore( ICBDDCSurrealEmbeddedClient surrealEmbeddedClient, ICBDDCSurrealSchemaInitializer schemaInitializer, @@ -23,18 +29,21 @@ public class SurrealSnapshotMetadataStore : SnapshotMetadataStore _logger = logger ?? NullLogger.Instance; } + /// public override async Task DropAsync(CancellationToken cancellationToken = default) { await EnsureReadyAsync(cancellationToken); await _surrealClient.Delete(CBDDCSurrealSchemaNames.SnapshotMetadataTable, cancellationToken); } + /// public override async Task> ExportAsync(CancellationToken cancellationToken = default) { var all = await SelectAllAsync(cancellationToken); return all.Select(m => m.ToDomain()).ToList(); } + /// public override async Task GetSnapshotMetadataAsync(string nodeId, CancellationToken cancellationToken = default) { @@ -42,12 +51,14 @@ public class SurrealSnapshotMetadataStore : SnapshotMetadataStore return existing?.ToDomain(); } + /// public override async Task GetSnapshotHashAsync(string nodeId, CancellationToken cancellationToken = default) { var existing = await FindByNodeIdAsync(nodeId, cancellationToken); return existing?.Hash; } + /// public override async Task ImportAsync(IEnumerable items, CancellationToken cancellationToken = default) { @@ -59,6 +70,7 @@ public class SurrealSnapshotMetadataStore : SnapshotMetadataStore } } + /// public override async Task InsertSnapshotMetadataAsync(SnapshotMetadata metadata, CancellationToken cancellationToken = default) { @@ -67,6 +79,7 @@ public class SurrealSnapshotMetadataStore : SnapshotMetadataStore await UpsertAsync(metadata, recordId, cancellationToken); } + /// public override async Task MergeAsync(IEnumerable items, CancellationToken cancellationToken = default) { foreach (var metadata in items) @@ -88,6 +101,7 @@ public class SurrealSnapshotMetadataStore : SnapshotMetadataStore } } + /// public override async Task UpdateSnapshotMetadataAsync(SnapshotMetadata existingMeta, CancellationToken cancellationToken) { @@ -98,6 +112,7 @@ public class SurrealSnapshotMetadataStore : SnapshotMetadataStore await UpsertAsync(existingMeta, recordId, cancellationToken); } + /// public override async Task> GetAllSnapshotMetadataAsync( CancellationToken cancellationToken = default) { diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealStoreRecords.cs b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealStoreRecords.cs index 502671a..a26234f 100644 --- a/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealStoreRecords.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealStoreRecords.cs @@ -11,11 +11,22 @@ namespace ZB.MOM.WW.CBDDC.Persistence.Surreal; internal static class SurrealStoreRecordIds { + /// + /// Creates the record identifier for an oplog entry. + /// + /// The oplog entry hash. + /// The SurrealDB record identifier. public static RecordId Oplog(string hash) { return RecordId.From(CBDDCSurrealSchemaNames.OplogEntriesTable, hash); } + /// + /// Creates the record identifier for document metadata. + /// + /// The document collection name. + /// The document key. + /// The SurrealDB record identifier. public static RecordId DocumentMetadata(string collection, string key) { return RecordId.From( @@ -23,16 +34,32 @@ internal static class SurrealStoreRecordIds CompositeKey("docmeta", collection, key)); } + /// + /// Creates the record identifier for snapshot metadata. + /// + /// The node identifier. + /// The SurrealDB record identifier. public static RecordId SnapshotMetadata(string nodeId) { return RecordId.From(CBDDCSurrealSchemaNames.SnapshotMetadataTable, nodeId); } + /// + /// Creates the record identifier for a remote peer configuration. + /// + /// The peer node identifier. + /// The SurrealDB record identifier. public static RecordId RemotePeer(string nodeId) { return RecordId.From(CBDDCSurrealSchemaNames.RemotePeerConfigurationsTable, nodeId); } + /// + /// Creates the record identifier for a peer oplog confirmation. + /// + /// The peer node identifier. + /// The source node identifier. + /// The SurrealDB record identifier. public static RecordId PeerOplogConfirmation(string peerNodeId, string sourceNodeId) { return RecordId.From( @@ -49,114 +76,212 @@ internal static class SurrealStoreRecordIds internal sealed class SurrealOplogRecord : Record { + /// + /// Gets or sets the collection name. + /// [JsonPropertyName("collection")] public string Collection { get; set; } = ""; + /// + /// Gets or sets the document key. + /// [JsonPropertyName("key")] public string Key { get; set; } = ""; + /// + /// Gets or sets the operation type value. + /// [JsonPropertyName("operation")] public int Operation { get; set; } + /// + /// Gets or sets the serialized payload JSON. + /// [JsonPropertyName("payloadJson")] public string PayloadJson { get; set; } = ""; + /// + /// Gets or sets the timestamp physical time. + /// [JsonPropertyName("timestampPhysicalTime")] public long TimestampPhysicalTime { get; set; } + /// + /// Gets or sets the timestamp logical counter. + /// [JsonPropertyName("timestampLogicalCounter")] public int TimestampLogicalCounter { get; set; } + /// + /// Gets or sets the timestamp node identifier. + /// [JsonPropertyName("timestampNodeId")] public string TimestampNodeId { get; set; } = ""; + /// + /// Gets or sets the entry hash. + /// [JsonPropertyName("hash")] public string Hash { get; set; } = ""; + /// + /// Gets or sets the previous entry hash. + /// [JsonPropertyName("previousHash")] public string PreviousHash { get; set; } = ""; } internal sealed class SurrealDocumentMetadataRecord : Record { + /// + /// Gets or sets the collection name. + /// [JsonPropertyName("collection")] public string Collection { get; set; } = ""; + /// + /// Gets or sets the document key. + /// [JsonPropertyName("key")] public string Key { get; set; } = ""; + /// + /// Gets or sets the HLC physical time. + /// [JsonPropertyName("hlcPhysicalTime")] public long HlcPhysicalTime { get; set; } + /// + /// Gets or sets the HLC logical counter. + /// [JsonPropertyName("hlcLogicalCounter")] public int HlcLogicalCounter { get; set; } + /// + /// Gets or sets the HLC node identifier. + /// [JsonPropertyName("hlcNodeId")] public string HlcNodeId { get; set; } = ""; + /// + /// Gets or sets a value indicating whether the document is deleted. + /// [JsonPropertyName("isDeleted")] public bool IsDeleted { get; set; } } internal sealed class SurrealRemotePeerRecord : Record { + /// + /// Gets or sets the peer node identifier. + /// [JsonPropertyName("nodeId")] public string NodeId { get; set; } = ""; + /// + /// Gets or sets the peer network address. + /// [JsonPropertyName("address")] public string Address { get; set; } = ""; + /// + /// Gets or sets the peer type value. + /// [JsonPropertyName("type")] public int Type { get; set; } + /// + /// Gets or sets a value indicating whether the peer is enabled. + /// [JsonPropertyName("isEnabled")] public bool IsEnabled { get; set; } + /// + /// Gets or sets the serialized list of collection interests. + /// [JsonPropertyName("interestsJson")] public string InterestsJson { get; set; } = ""; } internal sealed class SurrealPeerOplogConfirmationRecord : Record { + /// + /// Gets or sets the peer node identifier. + /// [JsonPropertyName("peerNodeId")] public string PeerNodeId { get; set; } = ""; + /// + /// Gets or sets the source node identifier. + /// [JsonPropertyName("sourceNodeId")] public string SourceNodeId { get; set; } = ""; + /// + /// Gets or sets the confirmed wall clock component. + /// [JsonPropertyName("confirmedWall")] public long ConfirmedWall { get; set; } + /// + /// Gets or sets the confirmed logical component. + /// [JsonPropertyName("confirmedLogic")] public int ConfirmedLogic { get; set; } + /// + /// Gets or sets the confirmed hash. + /// [JsonPropertyName("confirmedHash")] public string ConfirmedHash { get; set; } = ""; + /// + /// Gets or sets the last confirmation time in Unix milliseconds. + /// [JsonPropertyName("lastConfirmedUtcMs")] public long LastConfirmedUtcMs { get; set; } + /// + /// Gets or sets a value indicating whether the confirmation is active. + /// [JsonPropertyName("isActive")] public bool IsActive { get; set; } } internal sealed class SurrealSnapshotMetadataRecord : Record { + /// + /// Gets or sets the node identifier. + /// [JsonPropertyName("nodeId")] public string NodeId { get; set; } = ""; + /// + /// Gets or sets the timestamp physical time. + /// [JsonPropertyName("timestampPhysicalTime")] public long TimestampPhysicalTime { get; set; } + /// + /// Gets or sets the timestamp logical counter. + /// [JsonPropertyName("timestampLogicalCounter")] public int TimestampLogicalCounter { get; set; } + /// + /// Gets or sets the snapshot hash. + /// [JsonPropertyName("hash")] public string Hash { get; set; } = ""; } internal static class SurrealStoreRecordMappers { + /// + /// Maps a domain oplog entry to a SurrealDB record. + /// + /// The domain oplog entry. + /// The SurrealDB oplog record. public static SurrealOplogRecord ToSurrealRecord(this OplogEntry entry) { return new SurrealOplogRecord @@ -173,6 +298,11 @@ internal static class SurrealStoreRecordMappers }; } + /// + /// Maps a SurrealDB oplog record to a domain oplog entry. + /// + /// The SurrealDB oplog record. + /// The domain oplog entry. public static OplogEntry ToDomain(this SurrealOplogRecord record) { JsonElement? payload = null; @@ -189,6 +319,11 @@ internal static class SurrealStoreRecordMappers record.Hash); } + /// + /// Maps domain document metadata to a SurrealDB record. + /// + /// The domain document metadata. + /// The SurrealDB document metadata record. public static SurrealDocumentMetadataRecord ToSurrealRecord(this DocumentMetadata metadata) { return new SurrealDocumentMetadataRecord @@ -202,6 +337,11 @@ internal static class SurrealStoreRecordMappers }; } + /// + /// Maps a SurrealDB document metadata record to domain document metadata. + /// + /// The SurrealDB document metadata record. + /// The domain document metadata. public static DocumentMetadata ToDomain(this SurrealDocumentMetadataRecord record) { return new DocumentMetadata( @@ -211,6 +351,11 @@ internal static class SurrealStoreRecordMappers record.IsDeleted); } + /// + /// Maps a domain remote peer configuration to a SurrealDB record. + /// + /// The domain remote peer configuration. + /// The SurrealDB remote peer record. public static SurrealRemotePeerRecord ToSurrealRecord(this RemotePeerConfiguration peer) { return new SurrealRemotePeerRecord @@ -225,6 +370,11 @@ internal static class SurrealStoreRecordMappers }; } + /// + /// Maps a SurrealDB remote peer record to a domain remote peer configuration. + /// + /// The SurrealDB remote peer record. + /// The domain remote peer configuration. public static RemotePeerConfiguration ToDomain(this SurrealRemotePeerRecord record) { var result = new RemotePeerConfiguration @@ -242,6 +392,11 @@ internal static class SurrealStoreRecordMappers return result; } + /// + /// Maps a domain peer oplog confirmation to a SurrealDB record. + /// + /// The domain peer oplog confirmation. + /// The SurrealDB peer oplog confirmation record. public static SurrealPeerOplogConfirmationRecord ToSurrealRecord(this PeerOplogConfirmation confirmation) { return new SurrealPeerOplogConfirmationRecord @@ -256,6 +411,11 @@ internal static class SurrealStoreRecordMappers }; } + /// + /// Maps a SurrealDB peer oplog confirmation record to a domain model. + /// + /// The SurrealDB peer oplog confirmation record. + /// The domain peer oplog confirmation. public static PeerOplogConfirmation ToDomain(this SurrealPeerOplogConfirmationRecord record) { return new PeerOplogConfirmation @@ -270,6 +430,11 @@ internal static class SurrealStoreRecordMappers }; } + /// + /// Maps domain snapshot metadata to a SurrealDB record. + /// + /// The domain snapshot metadata. + /// The SurrealDB snapshot metadata record. public static SurrealSnapshotMetadataRecord ToSurrealRecord(this SnapshotMetadata metadata) { return new SurrealSnapshotMetadataRecord @@ -281,6 +446,11 @@ internal static class SurrealStoreRecordMappers }; } + /// + /// Maps a SurrealDB snapshot metadata record to a domain model. + /// + /// The SurrealDB snapshot metadata record. + /// The domain snapshot metadata. public static SnapshotMetadata ToDomain(this SurrealSnapshotMetadataRecord record) { return new SnapshotMetadata diff --git a/surreal.md b/surreal.md new file mode 100644 index 0000000..b3d4229 --- /dev/null +++ b/surreal.md @@ -0,0 +1,360 @@ +# BLite -> SurrealDB (Embedded + RocksDB) Migration Plan + +## 1) Goal and Scope + +Replace all BLite-backed persistence in this repository with SurrealDB embedded using RocksDB persistence, while preserving current CBDDC behavior: + +1. Automatic CDC-driven oplog generation for local writes. +2. Reliable sync across peers (including reconnect and snapshot flows). +3. Existing storage contracts (`IDocumentStore`, `IOplogStore`, `IPeerConfigurationStore`, `IDocumentMetadataStore`, `ISnapshotMetadataStore`, `IPeerOplogConfirmationStore`) and test semantics. +4. Full removal of BLite dependencies, APIs, and documentation references. + +## 2) Current-State Inventory (Repository-Specific) + +Primary BLite implementation and integration points currently live in: + +1. `src/ZB.MOM.WW.CBDDC.Persistence/BLite/CBDDCBLiteExtensions.cs` +2. `src/ZB.MOM.WW.CBDDC.Persistence/BLite/CBDDCDocumentDbContext.cs` +3. `src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLiteDocumentStore.cs` +4. `src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLiteOplogStore.cs` +5. `src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLiteDocumentMetadataStore.cs` +6. `src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLitePeerConfigurationStore.cs` +7. `src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLitePeerOplogConfirmationStore.cs` +8. `src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLiteSnapshotMetadataStore.cs` +9. `samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDbContext.cs` +10. `samples/ZB.MOM.WW.CBDDC.Sample.Console/SampleDocumentStore.cs` +11. `samples/ZB.MOM.WW.CBDDC.Sample.Console/Program.cs` +12. `tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/*.cs` (BLite-focused tests) +13. `tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs` +14. `src/ZB.MOM.WW.CBDDC.Persistence/ZB.MOM.WW.CBDDC.Persistence.csproj` and sample/test package references +15. `README.md` and related docs that currently describe BLite as the embedded provider. + +## 3) Target Architecture + +### 3.1 Provider Surface + +Create a Surreal provider namespace and extension entrypoint that mirrors current integration shape: + +1. Add `AddCBDDCSurrealEmbedded<...>()` in a new file (e.g., `src/ZB.MOM.WW.CBDDC.Persistence/Surreal/CBDDCSurrealExtensions.cs`). +2. Register Surreal-backed implementations for all existing persistence interfaces. +3. Keep singleton lifetime for store services and Surreal client factory (equivalent to current BLite singleton model). +4. Expose options object including: +- RocksDB endpoint/path (`rocksdb://...`) +- Namespace +- Database +- CDC polling interval +- CDC batch size +- CDC retention duration + +### 3.2 Surreal Connection and Embedded Startup + +Use official embedded .NET guidance: + +1. Add Surreal embedded packages. +2. Use `SurrealDbEmbeddedClient`/RocksDB embedded client with `rocksdb://` endpoint. +3. Run `USE NS DB ` at startup. +4. Dispose/close client on host shutdown. + +### 3.3 Table Design (Schema + Indexing) + +Define internal tables as `SCHEMAFULL` and strongly typed fields to reduce runtime drift. + +Proposed tables: + +1. `oplog_entries` +2. `snapshot_metadatas` +3. `remote_peer_configurations` +4. `document_metadatas` +5. `peer_oplog_confirmations` +6. `cdc_checkpoints` (new: durable cursor per watched table) +7. Optional: `cdc_dedup` (new: idempotency window for duplicate/overlapping reads) + +Indexes and IDs: + +1. Prefer deterministic record IDs for point lookups (`table:id`) where possible. +2. Add unique indexes for business keys currently enforced in BLite: +- `oplog_entries.hash` +- `snapshot_metadatas.node_id` +- `(document_metadatas.collection, document_metadatas.key)` +- `(peer_oplog_confirmations.peer_node_id, peer_oplog_confirmations.source_node_id)` +3. Add composite indexes for hot sync queries: +- Oplog by `(timestamp_physical, timestamp_logical)` +- Oplog by `(timestamp_node_id, timestamp_physical, timestamp_logical)` +- Metadata by `(hlc_physical, hlc_logical)` +4. Use `EXPLAIN FULL` during test/benchmark phase to verify index usage. + +### 3.4 CDC Strategy (Durable + Low Latency) + +Implement CDC with Surreal Change Feeds as source of truth and Live Queries as optional accelerators. + +1. Enable `CHANGEFEED ` per watched table (`INCLUDE ORIGINAL` when old values are required for conflict handling/debug). +2. Persist checkpoint cursor (`versionstamp` preferred) in `cdc_checkpoints`. +3. Poll with `SHOW CHANGES FOR TABLE SINCE LIMIT `. +4. Process changes idempotently; tolerate duplicate windows when timestamp cursors overlap. +5. Commit checkpoint only after oplog + metadata writes commit successfully. +6. Optionally run `LIVE SELECT` subscribers for lower-latency wakeups, but never rely on live events alone for durability. +7. On startup/reconnect, always catch up via `SHOW CHANGES` from last persisted cursor. + +### 3.5 Transaction Boundaries + +Use explicit SurrealQL transactions for atomic state transitions: + +1. Local CDC event -> write oplog entry + document metadata + vector clock backing data in one transaction. +2. Remote apply batch -> apply documents + merge oplog + metadata updates atomically in bounded batches. +3. Snapshot replace/merge -> table-level clear/import or merge in deterministic order with rollback on failure. + +## 4) Execution Plan (Phased) + +## Phase 0: Design Freeze and Safety Rails + +1. Finalize data model and table schema DDL. +2. Finalize CDC cursor semantics (`versionstamp` vs timestamp fallback). +3. Freeze shared contracts in `ZB.MOM.WW.CBDDC.Core` (no signature churn during provider port). +4. Add migration feature flag for temporary cutover control (`UseSurrealPersistence`), removed in final cleanup. + +Exit criteria: + +1. Design doc approved. +2. DDL + index plan reviewed. +3. CDC retention value chosen (must exceed maximum offline peer window). + +## Phase 1: Surreal Infrastructure Layer + +1. Add Surreal packages and connection factory. +2. Implement startup initialization: NS/DB selection, table/index creation, capability checks. +3. Introduce provider options and DI extension (`AddCBDDCSurrealEmbedded`). +4. Add health probe for embedded connection and schema readiness. + +Exit criteria: + +1. `dotnet build` succeeds. +2. Basic smoke test can connect, create, read, and delete records in RocksDB-backed embedded Surreal. + +## Phase 2: Port Store Implementations + +Port each BLite store to Surreal while preserving interface behavior: + +1. `BLiteOplogStore` -> `SurrealOplogStore` +2. `BLiteDocumentMetadataStore` -> `SurrealDocumentMetadataStore` +3. `BLitePeerConfigurationStore` -> `SurrealPeerConfigurationStore` +4. `BLitePeerOplogConfirmationStore` -> `SurrealPeerOplogConfirmationStore` +5. `BLiteSnapshotMetadataStore` -> `SurrealSnapshotMetadataStore` + +Implementation requirements: + +1. Keep existing merge/drop/export/import semantics. +2. Preserve ordering guarantees for hash-chain methods. +3. Preserve vector clock bootstrap behavior (snapshot metadata first, oplog second). + +Exit criteria: + +1. Store-level unit tests pass with Surreal backend. +2. No BLite store classes used in DI path. + +## Phase 3: Document Store + CDC Engine + +1. Replace `BLiteDocumentStore` with Surreal-aware document store base. +2. Implement collection registration + watched table catalog. +3. Implement CDC worker: +- Poll `SHOW CHANGES` +- Map CDC events to `OperationType` +- Generate oplog + metadata +- Enforce remote-sync suppression/idempotency +4. Keep equivalent remote apply guard semantics to prevent CDC loopback during sync replay. +5. Add graceful start/stop lifecycle hooks for CDC worker. + +Exit criteria: + +1. Local direct writes produce expected oplog entries. +2. Remote replay does not create duplicate local oplog entries. +3. Restart resumes CDC from persisted checkpoint without missing changes. + +## Phase 4: Sample App and E2E Harness Migration + +1. Replace sample BLite context usage with Surreal-backed sample persistence. +2. Replace `AddCBDDCBLite` usage in sample and tests. +3. Update `ClusterCrudSyncE2ETests` internals that currently access BLite collections directly. +4. Refactor fallback CDC assertion logic to Surreal-based observability hooks. + +Exit criteria: + +1. Sample runs two-node sync with Surreal embedded RocksDB. +2. E2E CRUD bidirectional test passes unchanged in behavior. + +## Phase 5: Data Migration Tooling and Cutover + +1. Build one-time migration utility: +- Read BLite data via existing stores +- Write to Surreal tables +- Preserve hashes/timestamps exactly +2. Add verification routine comparing counts, hashes, and key spot checks. +3. Document migration command and rollback artifacts. + +Exit criteria: + +1. Dry-run migration succeeds on fixture DB. +2. Post-migration parity checks are clean. + +## Phase 6: Remove BLite Completely + +1. Delete `src/ZB.MOM.WW.CBDDC.Persistence/BLite/*` after Surreal parity is proven. +2. Remove BLite package references and BLite source generators from project files. +3. Remove `.blite` path assumptions from sample/tests/docs. +4. Update docs and READMEs to SurrealDB terminology. +5. Ensure `rg -n "BLite|blite|AddCBDDCBLite|CBDDCDocumentDbContext"` returns no functional references (except historical notes if intentionally retained). + +Exit criteria: + +1. Solution builds/tests pass with zero BLite runtime dependency. +2. Docs reflect Surreal-only provider path. + +## 5) Safe Parallel Subagent Plan + +Use parallel subagents only with strict ownership boundaries and integration gates. + +## 5.1 Subagent Work Split + +1. Subagent A (Infrastructure/DI) +- Owns: new Surreal options, connection factory, DI extension, startup schema init. +- Files: new `src/.../Surreal/*` infra files, `*.csproj` package refs. + +2. Subagent B (Core Stores) +- Owns: oplog/document metadata/snapshot metadata/peer config/peer confirmation Surreal stores. +- Files: `src/ZB.MOM.WW.CBDDC.Persistence/Surreal/*Store.cs`. + +3. Subagent C (CDC + DocumentStore) +- Owns: Surreal document store base, CDC poller, checkpoint persistence, suppression loop prevention. +- Files: `src/ZB.MOM.WW.CBDDC.Persistence/Surreal/*DocumentStore*`, CDC worker files. + +4. Subagent D (Tests) +- Owns: unit/integration/E2E tests migrated to Surreal. +- Files: `tests/*` touched by provider swap. + +5. Subagent E (Sample + Docs) +- Owns: sample console migration and doc rewrites. +- Files: `samples/*`, `README.md`, `docs/*` provider docs. + +## 5.2 Parallel Safety Rules + +1. No overlapping file ownership between active subagents. +2. Shared contract files are locked unless explicitly assigned to one subagent. +3. Each subagent must submit: +- changed file list +- rationale +- commands run +- test evidence +4. Integrator rebases/merges sequentially, never blindly squashing conflicting edits. +5. If a subagent encounters unrelated dirty changes, it must stop and escalate before editing. + +## 5.3 Integration Order + +1. Merge A -> B -> C -> D -> E. +2. Run full verification after each merge step, not only at the end. + +## 6) Required Unit/Integration Test Matrix + +## 6.1 Store Contract Tests + +1. Oplog append/export/import/merge/drop parity. +2. `GetChainRangeAsync` correctness by hash chain ordering. +3. `GetLastEntryHashAsync` behavior with oplog hit and snapshot fallback. +4. Pruning respects cutoff and confirmations. +5. Document metadata upsert/mark-deleted/get-after ordering. +6. Peer config save/get/remove/merge semantics. +7. Peer confirmation registration/update/deactivate/merge semantics. +8. Snapshot metadata insert/update/merge and hash lookup. + +## 6.2 CDC Tests + +1. Local write on watched table emits exactly one oplog entry. +2. Delete mutation emits delete oplog + metadata tombstone. +3. Remote apply path does not re-emit local CDC oplog entries. +4. CDC checkpoint persists only after atomic write success. +5. Restart from checkpoint catches missed changes. +6. Duplicate window replay is idempotent. +7. Changefeed retention boundary behavior is explicit and logged. + +## 6.3 Snapshot and Recovery Tests + +1. `CreateSnapshotAsync` includes docs/oplog/peers/confirmations. +2. `ReplaceDatabaseAsync` restores full state. +3. `MergeSnapshotAsync` conflict behavior unchanged. +4. Recovery after process restart retains Surreal RocksDB data. + +## 6.4 E2E Sync Tests + +1. Two peers replicate create/update/delete bidirectionally. +2. Peer reconnect performs incremental catch-up from CDC cursor. +3. Multi-change burst preserves deterministic final state. +4. Optional fault-injection test: crash between oplog write and checkpoint update should replay safely on restart. + +## 7) Verification After Each Subagent Completion + +Run this checklist after each merged subagent contribution: + +1. `dotnet restore` +2. `dotnet build CBDDC.slnx -c Release` +3. Targeted tests for modified projects (fast gate) +4. Full test suite before moving to next major phase: +- `dotnet test CBDDC.slnx -c Release` +5. Regression grep checks: +- `rg -n "BLite|AddCBDDCBLite|\.blite|CBDDCDocumentDbContext" src samples tests README.md docs` +6. Surreal smoke test: +- create temp RocksDB path +- start sample node +- perform write/update/delete +- restart process and verify persisted state +7. CDC durability test: +- stop node +- mutate source +- restart node +- confirm catch-up via `SHOW CHANGES` cursor + +## 8) Rollout and Rollback + +## Rollout + +1. Internal canary branch with Surreal-only provider. +2. Run full CI + extended E2E soak (long-running sync/reconnect). +3. Migrate one test dataset from BLite to Surreal and validate parity. +4. Promote after acceptance criteria are met. + +## Rollback + +1. Keep BLite export snapshots until Surreal cutover is accepted. +2. If severe defect appears, restore from pre-cutover snapshot and redeploy previous BLite-tagged build. +3. Preserve migration logs and parity reports for audit. + +## 9) Definition of Done + +1. No runtime BLite dependency remains. +2. All store contracts pass with Surreal backend. +3. CDC is durable (checkpointed), idempotent, and restart-safe. +4. Sample + E2E prove sync parity. +5. Documentation and onboarding instructions updated to Surreal embedded RocksDB. +6. Migration utility + validation report available for production cutover. + +## 10) SurrealDB Best-Practice Notes Applied in This Plan + +This plan explicitly applies official Surreal guidance: + +1. Embedded .NET with RocksDB endpoint (`rocksdb://`) and explicit NS/DB usage. +2. Schema-first design with strict table/field definitions and typed record references. +3. Query/index discipline (`EXPLAIN FULL`, indexed lookups, avoid broad scans). +4. CDC durability with changefeeds and checkpointed `SHOW CHANGES` replay. +5. Live queries used as low-latency signals, not as sole durable CDC transport. +6. Security hardening (authentication, encryption/backups, restricted capabilities) for any non-embedded server deployments used in tooling/CI. + +## References (Primary Sources) + +1. SurrealDB .NET embedded engine docs: [https://surrealdb.com/docs/surrealdb/embedding/dotnet](https://surrealdb.com/docs/surrealdb/embedding/dotnet) +2. SurrealDB .NET SDK embedding guide: [https://surrealdb.com/docs/sdk/dotnet/embedding](https://surrealdb.com/docs/sdk/dotnet/embedding) +3. SurrealDB connection strings (protocol formats incl. RocksDB): [https://surrealdb.com/docs/surrealdb/reference-guide/connection-strings](https://surrealdb.com/docs/surrealdb/reference-guide/connection-strings) +4. SurrealDB schema best practices: [https://surrealdb.com/docs/surrealdb/reference-guide/schema-creation-best-practices](https://surrealdb.com/docs/surrealdb/reference-guide/schema-creation-best-practices) +5. SurrealDB performance best practices: [https://surrealdb.com/docs/surrealdb/reference-guide/performance-best-practices](https://surrealdb.com/docs/surrealdb/reference-guide/performance-best-practices) +6. SurrealDB real-time/events best practices: [https://surrealdb.com/docs/surrealdb/reference-guide/realtime-best-practices](https://surrealdb.com/docs/surrealdb/reference-guide/realtime-best-practices) +7. SurrealQL `DEFINE TABLE` (changefeed options): [https://surrealdb.com/docs/surrealql/statements/define/table](https://surrealdb.com/docs/surrealql/statements/define/table) +8. SurrealQL `SHOW CHANGES` (durable CDC read): [https://surrealdb.com/docs/surrealql/statements/show](https://surrealdb.com/docs/surrealql/statements/show) +9. SurrealQL `LIVE SELECT` behavior and caveats: [https://surrealdb.com/docs/surrealql/statements/live](https://surrealdb.com/docs/surrealql/statements/live) +10. SurrealDB security best practices: [https://surrealdb.com/docs/surrealdb/security/security-best-practices](https://surrealdb.com/docs/surrealdb/security/security-best-practices) +11. SurrealQL transactions (`BEGIN`/`COMMIT`): [https://surrealdb.com/docs/surrealql/statements/begin](https://surrealdb.com/docs/surrealql/statements/begin), [https://surrealdb.com/docs/surrealql/statements/commit](https://surrealdb.com/docs/surrealql/statements/commit) diff --git a/tests/ZB.MOM.WW.CBDDC.Core.Tests/DocumentCacheTests.cs b/tests/ZB.MOM.WW.CBDDC.Core.Tests/DocumentCacheTests.cs new file mode 100644 index 0000000..0b087ac --- /dev/null +++ b/tests/ZB.MOM.WW.CBDDC.Core.Tests/DocumentCacheTests.cs @@ -0,0 +1,87 @@ +using System.Text.Json; +using ZB.MOM.WW.CBDDC.Core.Cache; +using ZB.MOM.WW.CBDDC.Core.Network; + +namespace ZB.MOM.WW.CBDDC.Core.Tests; + +public class DocumentCacheTests +{ + /// + /// Verifies cache hit/miss statistics after get and set operations. + /// + [Fact] + public async Task GetAndSet_ShouldTrackCacheHitsAndMisses() + { + var cache = new DocumentCache(CreateConfigProvider(maxDocumentCacheSize: 2)); + + Document? missing = await cache.Get("users", "1"); + missing.ShouldBeNull(); + + var document = CreateDocument("users", "1"); + await cache.Set("users", "1", document); + Document? hit = await cache.Get("users", "1"); + + hit.ShouldNotBeNull(); + hit.Key.ShouldBe("1"); + var stats = cache.GetStatistics(); + stats.Hits.ShouldBe(1); + stats.Misses.ShouldBe(1); + stats.Size.ShouldBe(1); + stats.HitRate.ShouldBe(0.5d); + } + + /// + /// Verifies least-recently-used eviction when cache capacity is reached. + /// + [Fact] + public async Task Set_WhenCacheIsFull_EvictsLeastRecentlyUsedEntry() + { + var cache = new DocumentCache(CreateConfigProvider(maxDocumentCacheSize: 2)); + await cache.Set("users", "1", CreateDocument("users", "1")); + await cache.Set("users", "2", CreateDocument("users", "2")); + + // Touch key 1 so key 2 becomes the LRU entry. + (await cache.Get("users", "1")).ShouldNotBeNull(); + + await cache.Set("users", "3", CreateDocument("users", "3")); + + (await cache.Get("users", "2")).ShouldBeNull(); + (await cache.Get("users", "1")).ShouldNotBeNull(); + (await cache.Get("users", "3")).ShouldNotBeNull(); + } + + /// + /// Verifies remove and clear operations delete entries from the cache. + /// + [Fact] + public async Task RemoveAndClear_ShouldDeleteEntriesFromCache() + { + var cache = new DocumentCache(CreateConfigProvider(maxDocumentCacheSize: 3)); + await cache.Set("users", "1", CreateDocument("users", "1")); + await cache.Set("users", "2", CreateDocument("users", "2")); + cache.GetStatistics().Size.ShouldBe(2); + + cache.Remove("users", "1"); + (await cache.Get("users", "1")).ShouldBeNull(); + cache.GetStatistics().Size.ShouldBe(1); + + cache.Clear(); + cache.GetStatistics().Size.ShouldBe(0); + } + + private static Document CreateDocument(string collection, string key) + { + using var json = JsonDocument.Parse("""{"name":"test"}"""); + return new Document(collection, key, json.RootElement.Clone(), new HlcTimestamp(1, 0, "node-a"), false); + } + + private static IPeerNodeConfigurationProvider CreateConfigProvider(int maxDocumentCacheSize) + { + var configProvider = Substitute.For(); + configProvider.GetConfiguration().Returns(new PeerNodeConfiguration + { + MaxDocumentCacheSize = maxDocumentCacheSize + }); + return configProvider; + } +} diff --git a/tests/ZB.MOM.WW.CBDDC.Core.Tests/OfflineQueueTests.cs b/tests/ZB.MOM.WW.CBDDC.Core.Tests/OfflineQueueTests.cs new file mode 100644 index 0000000..05ae8b1 --- /dev/null +++ b/tests/ZB.MOM.WW.CBDDC.Core.Tests/OfflineQueueTests.cs @@ -0,0 +1,92 @@ +using ZB.MOM.WW.CBDDC.Core.Network; +using ZB.MOM.WW.CBDDC.Core.Sync; + +namespace ZB.MOM.WW.CBDDC.Core.Tests; + +public class OfflineQueueTests +{ + /// + /// Verifies that enqueuing beyond capacity drops the oldest operation. + /// + [Fact] + public async Task Enqueue_WhenQueueIsFull_DropsOldestOperation() + { + var queue = new OfflineQueue(CreateConfigProvider(maxQueueSize: 2)); + await queue.Enqueue(CreateOperation("1")); + await queue.Enqueue(CreateOperation("2")); + await queue.Enqueue(CreateOperation("3")); + + var flushed = new List(); + (int successful, int failed) = await queue.FlushAsync(op => + { + flushed.Add(op.Key); + return Task.CompletedTask; + }); + + successful.ShouldBe(2); + failed.ShouldBe(0); + flushed.ShouldBe(["2", "3"]); + } + + /// + /// Verifies that flush continues when an executor throws and returns the failure count. + /// + [Fact] + public async Task FlushAsync_WhenExecutorThrows_ContinuesAndReturnsFailureCount() + { + var queue = new OfflineQueue(CreateConfigProvider(maxQueueSize: 10)); + await queue.Enqueue(CreateOperation("1")); + await queue.Enqueue(CreateOperation("2")); + + (int successful, int failed) = await queue.FlushAsync(op => + { + if (op.Key == "1") throw new InvalidOperationException("boom"); + return Task.CompletedTask; + }); + + successful.ShouldBe(1); + failed.ShouldBe(1); + queue.Count.ShouldBe(0); + } + + /// + /// Verifies that clear removes all queued operations. + /// + [Fact] + public async Task Clear_RemovesAllQueuedOperations() + { + var queue = new OfflineQueue(CreateConfigProvider(maxQueueSize: 10)); + await queue.Enqueue(CreateOperation("1")); + await queue.Enqueue(CreateOperation("2")); + queue.Count.ShouldBe(2); + + await queue.Clear(); + + queue.Count.ShouldBe(0); + (int successful, int failed) = await queue.FlushAsync(_ => Task.CompletedTask); + successful.ShouldBe(0); + failed.ShouldBe(0); + } + + private static PendingOperation CreateOperation(string key) + { + return new PendingOperation + { + Type = "upsert", + Collection = "users", + Key = key, + Data = new { Value = key }, + QueuedAt = DateTime.UtcNow + }; + } + + private static IPeerNodeConfigurationProvider CreateConfigProvider(int maxQueueSize) + { + var configProvider = Substitute.For(); + configProvider.GetConfiguration().Returns(new PeerNodeConfiguration + { + MaxQueueSize = maxQueueSize + }); + return configProvider; + } +} diff --git a/tests/ZB.MOM.WW.CBDDC.Core.Tests/RetryPolicyTests.cs b/tests/ZB.MOM.WW.CBDDC.Core.Tests/RetryPolicyTests.cs new file mode 100644 index 0000000..54f601d --- /dev/null +++ b/tests/ZB.MOM.WW.CBDDC.Core.Tests/RetryPolicyTests.cs @@ -0,0 +1,78 @@ +using ZB.MOM.WW.CBDDC.Core.Exceptions; +using ZB.MOM.WW.CBDDC.Core.Network; +using ZB.MOM.WW.CBDDC.Core.Resilience; + +namespace ZB.MOM.WW.CBDDC.Core.Tests; + +public class RetryPolicyTests +{ + /// + /// Verifies transient failures are retried until a successful result is returned. + /// + [Fact] + public async Task ExecuteAsync_WhenTransientFailureEventuallySucceeds_RetriesAndReturnsResult() + { + var policy = new RetryPolicy(CreateConfigProvider(retryAttempts: 3, retryDelayMs: 1)); + var attempts = 0; + + int result = await policy.ExecuteAsync(async () => + { + attempts++; + if (attempts < 3) throw new NetworkException("transient"); + await Task.CompletedTask; + return 42; + }, "test-op"); + + result.ShouldBe(42); + attempts.ShouldBe(3); + } + + /// + /// Verifies transient failures throw retry exhausted when all retries are consumed. + /// + [Fact] + public async Task ExecuteAsync_WhenTransientFailureExhausted_ThrowsRetryExhaustedException() + { + var policy = new RetryPolicy(CreateConfigProvider(retryAttempts: 2, retryDelayMs: 1)); + var attempts = 0; + + var ex = await Should.ThrowAsync(() => policy.ExecuteAsync(() => + { + attempts++; + throw new NetworkException("still transient"); + }, "test-op")); + + ex.ErrorCode.ShouldBe("RETRY_EXHAUSTED"); + ex.InnerException.ShouldBeOfType(); + attempts.ShouldBe(2); + } + + /// + /// Verifies non-transient failures are not retried. + /// + [Fact] + public async Task ExecuteAsync_WhenFailureIsNonTransient_DoesNotRetry() + { + var policy = new RetryPolicy(CreateConfigProvider(retryAttempts: 3, retryDelayMs: 1)); + var attempts = 0; + + await Should.ThrowAsync(() => policy.ExecuteAsync(() => + { + attempts++; + throw new InvalidOperationException("non-transient"); + }, "test-op")); + + attempts.ShouldBe(1); + } + + private static IPeerNodeConfigurationProvider CreateConfigProvider(int retryAttempts, int retryDelayMs) + { + var configProvider = Substitute.For(); + configProvider.GetConfiguration().Returns(new PeerNodeConfiguration + { + RetryAttempts = retryAttempts, + RetryDelayMs = retryDelayMs + }); + return configProvider; + } +} diff --git a/tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs b/tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs index 06f6e5a..2a15da5 100644 --- a/tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs +++ b/tests/ZB.MOM.WW.CBDDC.E2E.Tests/ClusterCrudSyncE2ETests.cs @@ -428,12 +428,12 @@ public class ClusterCrudSyncE2ETests && replicated.Address?.City == payload.Address?.City; }, 60, "Node B did not converge after crash-window recovery.", () => BuildDiagnostics(recoveredNodeA, nodeB)); - await AssertEventuallyAsync( - () => recoveredNodeA.GetOplogCountForKey("Users", userId) == 1 && - nodeB.GetOplogCountForKey("Users", userId) == 1, - 60, - "Crash-window recovery created duplicate oplog entries.", - () => BuildDiagnostics(recoveredNodeA, nodeB)); + await AssertEventuallyAsync( + () => recoveredNodeA.GetOplogCountForKey("Users", userId) == 1 && + nodeB.GetOplogCountForKey("Users", userId) == 1, + 60, + "Crash-window recovery created duplicate oplog entries.", + () => BuildDiagnostics(recoveredNodeA, nodeB)); } } finally @@ -569,6 +569,9 @@ public class ClusterCrudSyncE2ETests /// The TCP port used by the node listener. /// The cluster authentication token. /// The known peers this node can connect to. + /// An optional working directory override for test artifacts. + /// A value indicating whether to preserve the working directory on dispose. + /// A value indicating whether to inject a checkpoint persistence that fails once. /// A configured instance. public static TestPeerNode Create( string nodeId, @@ -690,6 +693,12 @@ public class ClusterCrudSyncE2ETests return Context.Users.Find(u => u.Id == userId).FirstOrDefault(); } + /// + /// Gets the local oplog entry count for a collection/key pair produced by this node. + /// + /// The collection name. + /// The document key. + /// The number of local oplog entries matching the key. public int GetLocalOplogCountForKey(string collection, string key) { return Context.OplogEntries.FindAll() @@ -699,6 +708,12 @@ public class ClusterCrudSyncE2ETests string.Equals(e.TimestampNodeId, _nodeId, StringComparison.Ordinal)); } + /// + /// Gets the total oplog entry count for a collection/key pair across nodes. + /// + /// The collection name. + /// The document key. + /// The number of oplog entries matching the key. public int GetOplogCountForKey(string collection, string key) { return Context.OplogEntries.FindAll() @@ -824,6 +839,14 @@ public class ClusterCrudSyncE2ETests private const string UsersCollection = "Users"; private const string TodoListsCollection = "TodoLists"; + /// + /// Initializes a new instance of the class. + /// + /// The sample database context. + /// The peer node configuration provider. + /// The vector clock service. + /// The checkpoint persistence implementation. + /// The optional logger instance. public FaultInjectedSampleDocumentStore( SampleDbContext context, IPeerNodeConfigurationProvider configProvider, @@ -849,6 +872,7 @@ public class ClusterCrudSyncE2ETests WatchCollection(TodoListsCollection, context.TodoLists, t => t.Id); } + /// protected override async Task ApplyContentToEntityAsync( string collection, string key, @@ -858,6 +882,7 @@ public class ClusterCrudSyncE2ETests await UpsertEntityAsync(collection, key, content, cancellationToken); } + /// protected override async Task ApplyContentToEntitiesBatchAsync( IEnumerable<(string Collection, string Key, JsonElement Content)> documents, CancellationToken cancellationToken) @@ -866,6 +891,7 @@ public class ClusterCrudSyncE2ETests await UpsertEntityAsync(collection, key, content, cancellationToken); } + /// protected override async Task GetEntityAsJsonAsync( string collection, string key, @@ -879,6 +905,7 @@ public class ClusterCrudSyncE2ETests }; } + /// protected override async Task RemoveEntityAsync( string collection, string key, @@ -887,6 +914,7 @@ public class ClusterCrudSyncE2ETests await DeleteEntityAsync(collection, key, cancellationToken); } + /// protected override async Task RemoveEntitiesBatchAsync( IEnumerable<(string Collection, string Key)> documents, CancellationToken cancellationToken) @@ -895,6 +923,7 @@ public class ClusterCrudSyncE2ETests await DeleteEntityAsync(collection, key, cancellationToken); } + /// protected override async Task> GetAllEntitiesAsJsonAsync( string collection, CancellationToken cancellationToken) @@ -967,6 +996,7 @@ public class ClusterCrudSyncE2ETests { private int _failOnNextAdvance = 1; + /// public Task GetCheckpointAsync( string? consumerId = null, CancellationToken cancellationToken = default) @@ -974,6 +1004,7 @@ public class ClusterCrudSyncE2ETests return Task.FromResult(null); } + /// public Task UpsertCheckpointAsync( HlcTimestamp timestamp, string lastHash, @@ -984,6 +1015,7 @@ public class ClusterCrudSyncE2ETests return Task.CompletedTask; } + /// public Task AdvanceCheckpointAsync( OplogEntry entry, string? consumerId = null, diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcDurabilityTests.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcDurabilityTests.cs index 7f759a3..2d9ede8 100644 --- a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcDurabilityTests.cs +++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcDurabilityTests.cs @@ -13,6 +13,9 @@ namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests; [Collection("SurrealCdcDurability")] public class SurrealCdcDurabilityTests { + /// + /// Verifies checkpoints persist latest local changes per consumer across restarts. + /// [Fact] public async Task CheckpointPersistence_ShouldTrackLatestLocalChange_AndPersistPerConsumer() { @@ -85,6 +88,9 @@ public class SurrealCdcDurabilityTests } } + /// + /// Verifies recovery resumes from a persisted checkpoint and advances after catch-up. + /// [Fact] public async Task RestartRecovery_ShouldResumeCatchUpFromPersistedCheckpoint_InRocksDb() { @@ -153,6 +159,9 @@ public class SurrealCdcDurabilityTests } } + /// + /// Verifies duplicate remote apply windows are idempotent without loopback entries. + /// [Fact] public async Task RemoteApply_ShouldBeIdempotentAcrossDuplicateWindow_WithoutLoopbackEntries() { @@ -207,6 +216,9 @@ public class SurrealCdcDurabilityTests } } + /// + /// Verifies local deletes persist tombstone metadata and advance checkpoints. + /// [Fact] public async Task LocalDelete_ShouldPersistTombstoneMetadata_AndAdvanceCheckpoint() { @@ -358,21 +370,46 @@ internal sealed class CdcTestHarness : IAsyncDisposable NullLogger.Instance); } + /// + /// Gets the sample database context. + /// public SampleDbContext Context { get; } + /// + /// Gets the checkpointed sample document store. + /// public CheckpointedSampleDocumentStore DocumentStore { get; } + /// + /// Gets the oplog store used by the harness. + /// public SurrealOplogStore OplogStore { get; } + /// + /// Gets the document metadata store. + /// public SurrealDocumentMetadataStore MetadataStore { get; } + /// + /// Gets checkpoint persistence used for CDC progress tracking. + /// public ISurrealCdcCheckpointPersistence CheckpointPersistence { get; } + /// + /// Polls CDC once through the document store. + /// public async Task PollAsync() { await DocumentStore.PollCdcOnceAsync(); } + /// + /// Creates a harness instance with retries for transient RocksDB lock contention. + /// + /// Database directory path. + /// Node identifier. + /// CDC consumer identifier. + /// Initialized test harness. public static async Task OpenWithRetriesAsync( string databasePath, string nodeId, @@ -391,6 +428,11 @@ internal sealed class CdcTestHarness : IAsyncDisposable throw new InvalidOperationException("Unable to acquire RocksDB lock for test harness."); } + /// + /// Gets oplog entries for a collection ordered by timestamp. + /// + /// Collection name. + /// Ordered oplog entries. public async Task> GetEntriesByCollectionAsync(string collection) { return (await OplogStore.ExportAsync()) @@ -400,6 +442,12 @@ internal sealed class CdcTestHarness : IAsyncDisposable .ToList(); } + /// + /// Gets oplog entries for a collection key ordered by timestamp. + /// + /// Collection name. + /// Document key. + /// Ordered oplog entries. public async Task> GetEntriesByKeyAsync(string collection, string key) { return (await OplogStore.ExportAsync()) @@ -410,6 +458,7 @@ internal sealed class CdcTestHarness : IAsyncDisposable .ToList(); } + /// public async ValueTask DisposeAsync() { DocumentStore.Dispose(); @@ -428,6 +477,15 @@ internal sealed class CheckpointedSampleDocumentStore : SurrealDocumentStore + /// Initializes a new instance of the class. + /// + /// Sample database context. + /// Peer configuration provider. + /// Vector clock service. + /// Checkpoint persistence implementation. + /// Optional Surreal embedded options. + /// Optional logger. public CheckpointedSampleDocumentStore( SampleDbContext context, IPeerNodeConfigurationProvider configProvider, @@ -450,6 +508,7 @@ internal sealed class CheckpointedSampleDocumentStore : SurrealDocumentStore t.Id, subscribeForInMemoryEvents: false); } + /// protected override async Task ApplyContentToEntityAsync( string collection, string key, @@ -459,6 +518,7 @@ internal sealed class CheckpointedSampleDocumentStore : SurrealDocumentStore protected override async Task ApplyContentToEntitiesBatchAsync( IEnumerable<(string Collection, string Key, JsonElement Content)> documents, CancellationToken cancellationToken) @@ -467,6 +527,7 @@ internal sealed class CheckpointedSampleDocumentStore : SurrealDocumentStore protected override async Task GetEntityAsJsonAsync( string collection, string key, @@ -480,6 +541,7 @@ internal sealed class CheckpointedSampleDocumentStore : SurrealDocumentStore protected override async Task RemoveEntityAsync( string collection, string key, @@ -488,6 +550,7 @@ internal sealed class CheckpointedSampleDocumentStore : SurrealDocumentStore protected override async Task RemoveEntitiesBatchAsync( IEnumerable<(string Collection, string Key)> documents, CancellationToken cancellationToken) @@ -496,6 +559,7 @@ internal sealed class CheckpointedSampleDocumentStore : SurrealDocumentStore protected override async Task> GetAllEntitiesAsJsonAsync( string collection, CancellationToken cancellationToken) diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcMatrixCompletionTests.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcMatrixCompletionTests.cs index 870cc88..9be457d 100644 --- a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcMatrixCompletionTests.cs +++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcMatrixCompletionTests.cs @@ -13,6 +13,11 @@ namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests; public class SurrealCdcMatrixCompletionTests { + /// + /// Verifies retention-boundary classifier behavior across expected exception message patterns. + /// + /// The exception message sample. + /// Expected classifier outcome. [Theory] [InlineData("versionstamp is outside the configured retention window", true)] [InlineData("change feed history since cursor is unavailable", true)] @@ -29,6 +34,9 @@ public class SurrealCdcMatrixCompletionTests actual.ShouldBe(expected); } + /// + /// Verifies a local write produces exactly one oplog entry. + /// [Fact] public async Task LocalWrite_ShouldEmitExactlyOneOplogEntry() { @@ -63,6 +71,9 @@ public class SurrealCdcMatrixCompletionTests } } + /// + /// Verifies checkpoint persistence does not advance when atomic write fails. + /// [Fact] public async Task Checkpoint_ShouldNotAdvance_WhenAtomicWriteFails() { @@ -139,6 +150,14 @@ public class SurrealCdcMatrixCompletionTests internal sealed class FailureInjectedDocumentStore : SurrealDocumentStore { + /// + /// Initializes a new instance of the class. + /// + /// The embedded Surreal client provider. + /// The schema initializer. + /// The node configuration provider. + /// The vector clock service. + /// The CDC checkpoint persistence dependency. public FailureInjectedDocumentStore( ICBDDCSurrealEmbeddedClient surrealEmbeddedClient, ICBDDCSurrealSchemaInitializer schemaInitializer, @@ -158,6 +177,15 @@ internal sealed class FailureInjectedDocumentStore : SurrealDocumentStore + /// Triggers local change handling for testing failure scenarios. + /// + /// The collection name. + /// The document key. + /// The operation type. + /// Optional document content payload. + /// Cancellation token. + /// A task that completes when processing is finished. public Task TriggerLocalChangeAsync( string collection, string key, @@ -174,6 +202,7 @@ internal sealed class FailureInjectedDocumentStore : SurrealDocumentStore protected override Task ApplyContentToEntityAsync( string collection, string key, @@ -183,6 +212,7 @@ internal sealed class FailureInjectedDocumentStore : SurrealDocumentStore protected override Task ApplyContentToEntitiesBatchAsync( IEnumerable<(string Collection, string Key, JsonElement Content)> documents, CancellationToken cancellationToken) @@ -190,6 +220,7 @@ internal sealed class FailureInjectedDocumentStore : SurrealDocumentStore protected override Task GetEntityAsJsonAsync( string collection, string key, @@ -198,11 +229,13 @@ internal sealed class FailureInjectedDocumentStore : SurrealDocumentStore(null); } + /// protected override Task RemoveEntityAsync(string collection, string key, CancellationToken cancellationToken) { return Task.CompletedTask; } + /// protected override Task RemoveEntitiesBatchAsync( IEnumerable<(string Collection, string Key)> documents, CancellationToken cancellationToken) @@ -210,6 +243,7 @@ internal sealed class FailureInjectedDocumentStore : SurrealDocumentStore protected override Task> GetAllEntitiesAsJsonAsync( string collection, CancellationToken cancellationToken) diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealInfrastructureReadinessTests.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealInfrastructureReadinessTests.cs new file mode 100644 index 0000000..ad2d5a0 --- /dev/null +++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealInfrastructureReadinessTests.cs @@ -0,0 +1,98 @@ +using SurrealDb.Net.Models.Response; +using ZB.MOM.WW.CBDDC.Persistence.Surreal; + +namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests; + +public class SurrealInfrastructureReadinessTests +{ + /// + /// Verifies schema initialization runs only once for repeated calls. + /// + [Fact] + public async Task SchemaInitializer_ShouldApplySchemaOnlyOnce() + { + var embeddedClient = Substitute.For(); + embeddedClient.RawQueryAsync( + Arg.Any(), + Arg.Any>(), + Arg.Any()) + .Returns(Task.FromResult(default(SurrealDbResponse)!)); + + var options = new CBDDCSurrealEmbeddedOptions + { + Cdc = new CBDDCSurrealCdcOptions + { + CheckpointTable = "custom_checkpoint", + RetentionDuration = TimeSpan.FromHours(12) + } + }; + + var initializer = new CBDDCSurrealSchemaInitializer(embeddedClient, options); + await initializer.EnsureInitializedAsync(); + await initializer.EnsureInitializedAsync(); + + await embeddedClient.Received(1).RawQueryAsync( + Arg.Is(sql => + sql.Contains("DEFINE TABLE OVERWRITE cbddc_oplog_entries", StringComparison.Ordinal) && + sql.Contains("CHANGEFEED 12h", StringComparison.Ordinal) && + sql.Contains("DEFINE TABLE OVERWRITE custom_checkpoint", StringComparison.Ordinal)), + Arg.Any>(), + Arg.Any()); + } + + /// + /// Verifies invalid checkpoint table identifiers are rejected. + /// + [Fact] + public void SchemaInitializer_WhenCheckpointIdentifierIsInvalid_ThrowsArgumentException() + { + var embeddedClient = Substitute.For(); + var options = new CBDDCSurrealEmbeddedOptions + { + Cdc = new CBDDCSurrealCdcOptions + { + CheckpointTable = "invalid-checkpoint" + } + }; + + Should.Throw(() => new CBDDCSurrealSchemaInitializer(embeddedClient, options)); + } + + /// + /// Verifies readiness returns true when schema init and health checks succeed. + /// + [Fact] + public async Task ReadinessProbe_WhenSchemaAndHealthSucceed_ReturnsTrue() + { + var embeddedClient = Substitute.For(); + embeddedClient.HealthAsync(Arg.Any()).Returns(true); + + var schemaInitializer = Substitute.For(); + schemaInitializer.EnsureInitializedAsync(Arg.Any()).Returns(Task.CompletedTask); + + var probe = new CBDDCSurrealReadinessProbe(embeddedClient, schemaInitializer); + bool isReady = await probe.IsReadyAsync(); + + isReady.ShouldBeTrue(); + await schemaInitializer.Received(1).EnsureInitializedAsync(Arg.Any()); + await embeddedClient.Received(1).HealthAsync(Arg.Any()); + } + + /// + /// Verifies readiness returns false when schema initialization fails. + /// + [Fact] + public async Task ReadinessProbe_WhenSchemaInitializationThrows_ReturnsFalse() + { + var embeddedClient = Substitute.For(); + var schemaInitializer = Substitute.For(); + schemaInitializer.EnsureInitializedAsync(Arg.Any()) + .Returns(Task.FromException(new InvalidOperationException("boom"))); + + var probe = new CBDDCSurrealReadinessProbe(embeddedClient, schemaInitializer); + bool isReady = await probe.IsReadyAsync(); + + isReady.ShouldBeFalse(); + await embeddedClient.DidNotReceive().HealthAsync(Arg.Any()); + } +} diff --git a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealStoreContractTests.cs b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealStoreContractTests.cs index 53bbabd..dfbf6fd 100644 --- a/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealStoreContractTests.cs +++ b/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealStoreContractTests.cs @@ -11,6 +11,9 @@ namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests; public class SurrealOplogStoreContractTests { + /// + /// Verifies append, range query, merge, drop, and last-hash behavior for the oplog store. + /// [Fact] public async Task OplogStore_AppendQueryMergeDrop_AndLastHash_Works() { @@ -71,6 +74,9 @@ public class SurrealOplogStoreContractTests public class SurrealDocumentMetadataStoreContractTests { + /// + /// Verifies upsert, deletion marking, incremental reads, and merge precedence for document metadata. + /// [Fact] public async Task DocumentMetadataStore_UpsertMarkDeletedGetAfterAndMergeNewer_Works() { @@ -108,6 +114,9 @@ public class SurrealDocumentMetadataStoreContractTests public class SurrealPeerConfigurationStoreContractTests { + /// + /// Verifies save, read, remove, and merge behavior for remote peer configuration records. + /// [Fact] public async Task PeerConfigurationStore_SaveGetRemoveAndMerge_Works() { @@ -163,6 +172,9 @@ public class SurrealPeerConfigurationStoreContractTests public class SurrealPeerOplogConfirmationStoreContractTests { + /// + /// Verifies peer registration, confirmation updates, and peer deactivation semantics. + /// [Fact] public async Task PeerOplogConfirmationStore_EnsureUpdateAndDeactivate_Works() { @@ -194,6 +206,9 @@ public class SurrealPeerOplogConfirmationStoreContractTests afterDeactivate.All(x => x.IsActive == false).ShouldBeTrue(); } + /// + /// Verifies merge semantics prefer newer confirmations and preserve active-state transitions. + /// [Fact] public async Task PeerOplogConfirmationStore_Merge_UsesNewerAndActiveStateSemantics() { @@ -262,6 +277,9 @@ public class SurrealPeerOplogConfirmationStoreContractTests public class SurrealSnapshotMetadataStoreContractTests { + /// + /// Verifies insert, update, merge, and hash lookup behavior for snapshot metadata records. + /// [Fact] public async Task SnapshotMetadataStore_InsertUpdateMergeAndHashLookup_Works() { @@ -333,6 +351,9 @@ internal sealed class SurrealTestHarness : IAsyncDisposable private readonly string _rootPath; private readonly ICBDDCSurrealSchemaInitializer _schemaInitializer; + /// + /// Initializes a temporary embedded Surreal environment for contract tests. + /// public SurrealTestHarness() { string suffix = Guid.NewGuid().ToString("N"); @@ -351,6 +372,9 @@ internal sealed class SurrealTestHarness : IAsyncDisposable _schemaInitializer = new TestSurrealSchemaInitializer(_client); } + /// + /// Creates a document metadata store instance bound to the test harness database. + /// public SurrealDocumentMetadataStore CreateDocumentMetadataStore() { return new SurrealDocumentMetadataStore( @@ -359,6 +383,9 @@ internal sealed class SurrealTestHarness : IAsyncDisposable NullLogger.Instance); } + /// + /// Creates an oplog store instance bound to the test harness database. + /// public SurrealOplogStore CreateOplogStore() { return new SurrealOplogStore( @@ -371,6 +398,9 @@ internal sealed class SurrealTestHarness : IAsyncDisposable NullLogger.Instance); } + /// + /// Creates a peer configuration store instance bound to the test harness database. + /// public SurrealPeerConfigurationStore CreatePeerConfigurationStore() { return new SurrealPeerConfigurationStore( @@ -379,6 +409,9 @@ internal sealed class SurrealTestHarness : IAsyncDisposable NullLogger.Instance); } + /// + /// Creates a peer oplog confirmation store instance bound to the test harness database. + /// public SurrealPeerOplogConfirmationStore CreatePeerOplogConfirmationStore() { return new SurrealPeerOplogConfirmationStore( @@ -387,6 +420,9 @@ internal sealed class SurrealTestHarness : IAsyncDisposable NullLogger.Instance); } + /// + /// Creates a snapshot metadata store instance bound to the test harness database. + /// public SurrealSnapshotMetadataStore CreateSnapshotMetadataStore() { return new SurrealSnapshotMetadataStore( @@ -395,6 +431,7 @@ internal sealed class SurrealTestHarness : IAsyncDisposable NullLogger.Instance); } + /// public async ValueTask DisposeAsync() { await _client.DisposeAsync(); @@ -421,11 +458,16 @@ internal sealed class TestSurrealSchemaInitializer : ICBDDCSurrealSchemaInitiali private readonly ICBDDCSurrealEmbeddedClient _client; private int _initialized; + /// + /// Initializes a new instance of the class. + /// + /// The embedded client to initialize. public TestSurrealSchemaInitializer(ICBDDCSurrealEmbeddedClient client) { _client = client; } + /// public async Task EnsureInitializedAsync(CancellationToken cancellationToken = default) { if (Interlocked.Exchange(ref _initialized, 1) == 1) return;