using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Core.Storage;
using ZB.MOM.WW.CBDDC.Core.Sync;
using ZB.MOM.WW.CBDDC.Network;
using ZB.MOM.WW.CBDDC.Network.Security;
using ZB.MOM.WW.CBDDC.Persistence.Surreal;

namespace ZB.MOM.WW.CBDDC.E2E.Tests;

/// <summary>
/// End-to-end tests that run real peer nodes over loopback TCP and verify
/// CRUD replication, offline catch-up, burst convergence, and crash recovery.
/// </summary>
public class ClusterCrudSyncE2ETests
{
    /// <summary>
    /// Verifies two real peers replicate create, update, and delete operations in both directions.
    /// </summary>
    [Fact]
    public async Task TwoPeers_ShouldReplicateCrudBidirectionally()
    {
        var clusterToken = Guid.NewGuid().ToString("N");
        int nodeAPort = GetAvailableTcpPort();
        int nodeBPort = GetAvailableTcpPort();
        while (nodeBPort == nodeAPort)
            nodeBPort = GetAvailableTcpPort();

        // FIX: nodeA previously lacked "await using" (unlike nodeB), so its service
        // provider, listener socket, and temp work directory leaked on every run.
        await using var nodeA = TestPeerNode.Create(
            "node-a",
            nodeAPort,
            clusterToken,
            [new KnownPeerConfiguration { NodeId = "node-b", Host = "127.0.0.1", Port = nodeBPort }]);
        await using var nodeB = TestPeerNode.Create(
            "node-b",
            nodeBPort,
            clusterToken,
            [new KnownPeerConfiguration { NodeId = "node-a", Host = "127.0.0.1", Port = nodeAPort }]);

        await nodeA.StartAsync();
        await nodeB.StartAsync();

        const int timeoutSeconds = 45;

        // A -> B: create.
        var nodeAUserId = "user-from-a";
        await nodeA.UpsertUserAsync(new User { Id = nodeAUserId, Name = "Alice", Age = 30, Address = new Address { City = "Austin" } });
        await AssertEventuallyAsync(() =>
        {
            var replicated = nodeB.ReadUser(nodeAUserId);
            return replicated is not null
                && replicated.Name == "Alice"
                && replicated.Age == 30
                && replicated.Address?.City == "Austin";
        }, timeoutSeconds, "Node B did not receive create from node A.", () => BuildDiagnostics(nodeA, nodeB));
        await AssertEventuallyAsync(
            () => nodeA.ReadUser(nodeAUserId) is not null,
            timeoutSeconds,
            "Node A could not read back its own created user.",
            () => BuildDiagnostics(nodeA, nodeB));

        // A -> B: "update" is intentionally modeled as delete + re-create of the same key.
        await nodeA.DeleteUserAsync(nodeAUserId);
        await nodeA.UpsertUserAsync(new User { Id = nodeAUserId, Name = "Alice Updated", Age = 31, Address = new Address { City = "Dallas" } });
        await AssertEventuallyAsync(() =>
        {
            var replicated = nodeB.ReadUser(nodeAUserId);
            return replicated is not null
                && replicated.Name == "Alice Updated"
                && replicated.Age == 31
                && replicated.Address?.City == "Dallas";
        }, timeoutSeconds, "Node B did not receive update from node A.", () => BuildDiagnostics(nodeA, nodeB));

        // A -> B: delete.
        await nodeA.DeleteUserAsync(nodeAUserId);
        await AssertEventuallyAsync(
            () => nodeB.ReadUser(nodeAUserId) is null,
            timeoutSeconds,
            "Node B did not receive delete from node A.",
            () => BuildDiagnostics(nodeA, nodeB));

        // B -> A: create.
        var nodeBUserId = "user-from-b";
        await nodeB.UpsertUserAsync(new User { Id = nodeBUserId, Name = "Bob", Age = 40, Address = new Address { City = "Boston" } });
        await AssertEventuallyAsync(() =>
        {
            var replicated = nodeA.ReadUser(nodeBUserId);
            return replicated is not null
                && replicated.Name == "Bob"
                && replicated.Age == 40
                && replicated.Address?.City == "Boston";
        }, timeoutSeconds, "Node A did not receive create from node B.", () => BuildDiagnostics(nodeA, nodeB));
        await AssertEventuallyAsync(
            () => nodeB.ReadUser(nodeBUserId) is not null,
            timeoutSeconds,
            "Node B could not read back its own created user.",
            () => BuildDiagnostics(nodeA, nodeB));

        // B -> A: update via delete + re-create.
        await nodeB.DeleteUserAsync(nodeBUserId);
        await nodeB.UpsertUserAsync(new User { Id = nodeBUserId, Name = "Bob Updated", Age = 41, Address = new Address { City = "Denver" } });
        await AssertEventuallyAsync(() =>
        {
            var replicated = nodeA.ReadUser(nodeBUserId);
            return replicated is not null
                && replicated.Name == "Bob Updated"
                && replicated.Age == 41
                && replicated.Address?.City == "Denver";
        }, timeoutSeconds, "Node A did not receive update from node B.", () => BuildDiagnostics(nodeA, nodeB));

        // B -> A: delete.
        await nodeB.DeleteUserAsync(nodeBUserId);
        await AssertEventuallyAsync(
            () => nodeA.ReadUser(nodeBUserId) is null,
            timeoutSeconds,
            "Node A did not receive delete from node B.",
            () => BuildDiagnostics(nodeA, nodeB));
    }

    /// <summary>
    /// Verifies a reconnecting peer catches up mutations that happened while it was offline.
    /// </summary>
    [Fact]
    public async Task PeerReconnect_ShouldCatchUpMissedChanges()
    {
        var clusterToken = Guid.NewGuid().ToString("N");
        int nodeAPort = GetAvailableTcpPort();
        int nodeBPort = GetAvailableTcpPort();
        while (nodeBPort == nodeAPort)
            nodeBPort = GetAvailableTcpPort();

        // FIX: nodeA previously lacked "await using" (unlike nodeB), leaking its
        // services and temp work directory.
        await using var nodeA = TestPeerNode.Create(
            "node-a",
            nodeAPort,
            clusterToken,
            [new KnownPeerConfiguration { NodeId = "node-b", Host = "127.0.0.1", Port = nodeBPort }]);
        await using var nodeB = TestPeerNode.Create(
            "node-b",
            nodeBPort,
            clusterToken,
            [new KnownPeerConfiguration { NodeId = "node-a", Host = "127.0.0.1", Port = nodeAPort }]);

        await nodeA.StartAsync();
        await nodeB.StartAsync();

        // Take node B offline, mutate on node A, then bring B back and expect catch-up.
        await nodeB.StopAsync();

        const string userId = "reconnect-user";
        await nodeA.UpsertUserAsync(new User { Id = userId, Name = "Offline Create", Age = 20, Address = new Address { City = "Rome" } });
        await nodeA.UpsertUserAsync(new User { Id = userId, Name = "Offline Update", Age = 21, Address = new Address { City = "Milan" } });
        await nodeA.UpsertUserAsync(new User { Id = userId, Name = "Offline Final", Age = 22, Address = new Address { City = "Turin" } });

        await nodeB.StartAsync();
        await AssertEventuallyAsync(() =>
        {
            var replicated = nodeB.ReadUser(userId);
            return replicated is not null
                && replicated.Name == "Offline Final"
                && replicated.Age == 22
                && replicated.Address?.City == "Turin";
        }, 60, "Node B did not catch up missed reconnect mutations.", () => BuildDiagnostics(nodeA, nodeB));
    }

    /// <summary>
    /// Verifies a burst of rapid multi-node mutations converges to a deterministic final state.
    /// </summary>
    [Fact]
    public async Task MultiChangeBurst_ShouldConvergeDeterministically()
    {
        var clusterToken = Guid.NewGuid().ToString("N");
        int nodeAPort = GetAvailableTcpPort();
        int nodeBPort = GetAvailableTcpPort();
        while (nodeBPort == nodeAPort)
            nodeBPort = GetAvailableTcpPort();

        await using var nodeA = TestPeerNode.Create(
            "node-a",
            nodeAPort,
            clusterToken,
            [new KnownPeerConfiguration { NodeId = "node-b", Host = "127.0.0.1", Port = nodeBPort }]);
        await using var nodeB = TestPeerNode.Create(
            "node-b",
            nodeBPort,
            clusterToken,
            [new KnownPeerConfiguration { NodeId = "node-a", Host = "127.0.0.1", Port = nodeAPort }]);

        await nodeA.StartAsync();
        await nodeB.StartAsync();

        // Interleave writes on both nodes to stress concurrent replication.
        const int burstCount = 8;
        for (var i = 0; i < burstCount; i++)
        {
            string aId = $"burst-a-{i:D2}";
            string bId = $"burst-b-{i:D2}";
            await nodeA.UpsertUserAsync(new User { Id = aId, Name = $"A-{i}", Age = 30 + i, Address = new Address { City = $"CityA-{i}" } });
            await nodeB.UpsertUserAsync(new User { Id = bId, Name = $"B-{i}", Age = 40 + i, Address = new Address { City = $"CityB-{i}" } });
        }

        // First converge on document counts, then verify per-document content.
        await AssertEventuallyAsync(
            () => nodeA.Context.Users.FindAll().Count() == burstCount * 2
                && nodeB.Context.Users.FindAll().Count() == burstCount * 2,
            60,
            "Burst convergence did not reach expected document counts.",
            () => BuildDiagnostics(nodeA, nodeB));
        await AssertEventuallyAsync(() =>
        {
            for (var i = 0; i < burstCount; i++)
            {
                var aOnB = nodeB.ReadUser($"burst-a-{i:D2}");
                var bOnA = nodeA.ReadUser($"burst-b-{i:D2}");
                if (aOnB is null || bOnA is null)
                    return false;
                if (aOnB.Name != $"A-{i}" || bOnA.Name != $"B-{i}")
                    return false;
            }

            return true;
        }, 60, "Burst convergence content mismatch.", () => BuildDiagnostics(nodeA, nodeB));
    }

    /// <summary>
    /// Verifies recovery safety when a process crashes after oplog commit but before checkpoint advance.
    /// </summary>
    [Fact]
    public async Task CrashBetweenOplogAndCheckpoint_ShouldReplaySafelyOnRestart()
    {
        var clusterToken = Guid.NewGuid().ToString("N");
        int nodeAPort = GetAvailableTcpPort();
        int nodeBPort = GetAvailableTcpPort();
        while (nodeBPort == nodeAPort)
            nodeBPort = GetAvailableTcpPort();

        // The work dir is shared between the crashed and the recovered node A instance
        // so the recovered instance replays the same RocksDB state.
        string sharedWorkDir = Path.Combine(Path.GetTempPath(), $"cbddc-e2e-crash-{Guid.NewGuid():N}");
        Directory.CreateDirectory(sharedWorkDir);

        await using var nodeA = TestPeerNode.Create(
            "node-a",
            nodeAPort,
            clusterToken,
            [new KnownPeerConfiguration { NodeId = "node-b", Host = "127.0.0.1", Port = nodeBPort }],
            workDirOverride: sharedWorkDir,
            preserveWorkDirOnDispose: true,
            useFaultInjectedCheckpointStore: true);
        bool nodeADisposed = false;
        try
        {
            await using var nodeB = TestPeerNode.Create(
                "node-b",
                nodeBPort,
                clusterToken,
                [new KnownPeerConfiguration { NodeId = "node-a", Host = "127.0.0.1", Port = nodeAPort }]);
            await nodeA.StartAsync();
            await nodeB.StartAsync();

            const string userId = "crash-window-user";
            var payload = new User { Id = userId, Name = "Crash Recovered", Age = 45, Address = new Address { City = "Naples" } };

            // The injected checkpoint store throws after the oplog commit, simulating a
            // crash in the window between oplog append and checkpoint advance.
            // NOTE(review): the generic exception argument was lost in extraction;
            // InvalidOperationException matches the injected fault — confirm against VCS.
            await Should.ThrowAsync<InvalidOperationException>(() => nodeA.UpsertUserAsync(payload));
            nodeA.ReadUser(userId).ShouldNotBeNull();
            nodeA.GetLocalOplogCountForKey("Users", userId).ShouldBe(1);

            await nodeA.StopAsync();
            await nodeA.DisposeAsync();
            nodeADisposed = true;

            // RocksDB may briefly hold its file lock after dispose; retry node creation.
            TestPeerNode? recoveredNodeA = null;
            for (var attempt = 0; attempt < 10; attempt++)
            {
                try
                {
                    recoveredNodeA = TestPeerNode.Create(
                        "node-a",
                        nodeAPort,
                        clusterToken,
                        [new KnownPeerConfiguration { NodeId = "node-b", Host = "127.0.0.1", Port = nodeBPort }],
                        workDirOverride: sharedWorkDir);
                    break;
                }
                catch (Exception ex) when (IsRocksDbLockContention(ex) && attempt < 9)
                {
                    await Task.Delay(100);
                }
            }

            recoveredNodeA.ShouldNotBeNull();
            await using (recoveredNodeA)
            {
                await recoveredNodeA.StartAsync();
                await AssertEventuallyAsync(() =>
                {
                    var replicated = nodeB.ReadUser(userId);
                    return replicated is not null
                        && replicated.Name == payload.Name
                        && replicated.Age == payload.Age
                        && replicated.Address?.City == payload.Address?.City;
                }, 60, "Node B did not converge after crash-window recovery.", () => BuildDiagnostics(recoveredNodeA, nodeB));
                await AssertEventuallyAsync(
                    () => recoveredNodeA.GetOplogCountForKey("Users", userId) == 1
                        && nodeB.GetOplogCountForKey("Users", userId) == 1,
                    60,
                    "Crash-window recovery created duplicate oplog entries.",
                    () => BuildDiagnostics(recoveredNodeA, nodeB));
            }
        }
        finally
        {
            // Defensive early dispose when the mid-test dispose did not happen; the
            // "await using" on nodeA also disposes at scope exit (DisposeAsync is safe
            // to call twice here: StopAsync is guarded and ServiceProvider.Dispose is idempotent).
            if (!nodeADisposed)
                await nodeA.DisposeAsync();
        }
    }

    /// <summary>
    /// Polls <paramref name="predicate"/> every 250 ms until it returns true or the
    /// timeout elapses, then throws a <see cref="ShouldAssertException"/> with diagnostics.
    /// </summary>
    /// <param name="predicate">The condition that must eventually hold.</param>
    /// <param name="timeoutSeconds">The maximum number of seconds to wait.</param>
    /// <param name="failureMessage">The message used when the condition never holds.</param>
    /// <param name="diagnostics">Optional factory producing extra failure context.</param>
    private static async Task AssertEventuallyAsync(
        Func<bool> predicate,
        int timeoutSeconds,
        string failureMessage,
        Func<string>? diagnostics = null)
    {
        var timeout = TimeSpan.FromSeconds(timeoutSeconds);
        var startedAt = DateTime.UtcNow;
        while (DateTime.UtcNow - startedAt < timeout)
        {
            if (predicate())
                return;
            await Task.Delay(250);
        }

        string suffix = diagnostics is null ? string.Empty : $"{Environment.NewLine}{diagnostics()}";
        throw new ShouldAssertException($"{failureMessage}{suffix}");
    }

    /// <summary>
    /// Builds a human-readable dump of both nodes' user documents, oplog counts
    /// (grouped by authoring node), and recent logs for failure diagnostics.
    /// </summary>
    private static string BuildDiagnostics(TestPeerNode nodeA, TestPeerNode nodeB)
    {
        int nodeAUserCount = nodeA.Context.Users.FindAll().Count();
        int nodeBUserCount = nodeB.Context.Users.FindAll().Count();
        int nodeAOplogCount = nodeA.Context.OplogEntries.FindAll().Count();
        int nodeBOplogCount = nodeB.Context.OplogEntries.FindAll().Count();
        string nodeAOplogByAuthor = string.Join(
            ", ",
            nodeA.Context.OplogEntries.FindAll()
                .GroupBy(e => e.TimestampNodeId)
                .Select(g => $"{g.Key}:{g.Count()}"));
        string nodeBOplogByAuthor = string.Join(
            ", ",
            nodeB.Context.OplogEntries.FindAll()
                .GroupBy(e => e.TimestampNodeId)
                .Select(g => $"{g.Key}:{g.Count()}"));
        string nodeAUsers = string.Join(", ", nodeA.Context.Users.FindAll().Select(u => $"{u.Id}:{u.Name}:{u.Age}:{u.Address?.City}"));
        string nodeBUsers = string.Join(", ", nodeB.Context.Users.FindAll().Select(u => $"{u.Id}:{u.Name}:{u.Age}:{u.Address?.City}"));
        return string.Join(
            Environment.NewLine,
            "Diagnostics:",
            $"NodeA users={nodeAUserCount}, oplog={nodeAOplogCount}",
            $"NodeA users detail={nodeAUsers}",
            $"NodeA oplog by author={nodeAOplogByAuthor}",
            $"NodeB users={nodeBUserCount}, oplog={nodeBOplogCount}",
            $"NodeB users detail={nodeBUsers}",
            $"NodeB oplog by author={nodeBOplogByAuthor}",
            "NodeA logs:",
            nodeA.GetRecentLogs(),
            "NodeB logs:",
            nodeB.GetRecentLogs());
    }

    /// <summary>
    /// Finds a currently free loopback TCP port by binding port 0 and reading the assignment.
    /// </summary>
    /// <remarks>
    /// The port is released before returning, so a small race window exists; callers
    /// tolerate collisions by retrying (see the while loops in the tests).
    /// </remarks>
    private static int GetAvailableTcpPort()
    {
        using var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        return ((IPEndPoint)listener.LocalEndpoint).Port;
    }

    /// <summary>
    /// Detects the RocksDB "No locks available" failure produced while a previous
    /// process instance still holds the database file lock.
    /// </summary>
    private static bool IsRocksDbLockContention(Exception exception)
    {
        return exception.ToString().Contains("No locks available", StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// A fully wired in-process peer node used by the E2E tests: its own DI container,
    /// Surreal/RocksDB storage under a temp work dir, and an in-memory log sink.
    /// </summary>
    private sealed class TestPeerNode : IAsyncDisposable
    {
        private readonly InMemoryLogSink _logSink;
        private readonly ICBDDCNode _node;
        private readonly string _nodeId;
        private readonly IOplogStore _oplogStore;
        private readonly ServiceProvider _services;
        private readonly string _workDir;
        private readonly bool _preserveWorkDirOnDispose;
        // State for the manual HLC fallback clock (see NextTimestamp).
        private long _lastPhysicalTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        private int _logicalCounter;
        private bool _started;

        private TestPeerNode(
            ServiceProvider services,
            ICBDDCNode node,
            IOplogStore oplogStore,
            SampleDbContext context,
            InMemoryLogSink logSink,
            string workDir,
            string nodeId,
            bool preserveWorkDirOnDispose)
        {
            _services = services;
            _node = node;
            _oplogStore = oplogStore;
            Context = context;
            _logSink = logSink;
            _workDir = workDir;
            _nodeId = nodeId;
            _preserveWorkDirOnDispose = preserveWorkDirOnDispose;
        }

        /// <summary>
        /// Gets the Surreal-backed context used by this test peer.
        /// </summary>
        public SampleDbContext Context { get; }

        /// <inheritdoc />
        public async ValueTask DisposeAsync()
        {
            try
            {
                await StopAsync();
            }
            catch
            {
                // Best-effort stop; disposal must never throw.
            }

            _services.Dispose();
            if (!_preserveWorkDirOnDispose)
                TryDeleteDirectory(_workDir);
        }

        /// <summary>
        /// Creates a test peer node and wires all required services.
        /// </summary>
        /// <param name="nodeId">The unique node identifier.</param>
        /// <param name="tcpPort">The TCP port used by the node listener.</param>
        /// <param name="authToken">The cluster authentication token.</param>
        /// <param name="knownPeers">The known peers this node can connect to.</param>
        /// <param name="workDirOverride">Optional work directory to reuse (crash-recovery tests).</param>
        /// <param name="preserveWorkDirOnDispose">When true, the work dir survives disposal.</param>
        /// <param name="useFaultInjectedCheckpointStore">When true, wires the crash-injecting checkpoint persistence.</param>
        /// <returns>A configured <see cref="TestPeerNode"/> instance.</returns>
        public static TestPeerNode Create(
            string nodeId,
            int tcpPort,
            string authToken,
            IReadOnlyList<KnownPeerConfiguration> knownPeers,
            string? workDirOverride = null,
            bool preserveWorkDirOnDispose = false,
            bool useFaultInjectedCheckpointStore = false)
        {
            string workDir = workDirOverride ?? Path.Combine(Path.GetTempPath(), $"cbddc-e2e-{nodeId}-{Guid.NewGuid():N}");
            Directory.CreateDirectory(workDir);
            string dbPath = Path.Combine(workDir, "node.rocksdb");
            // Surreal database names cannot contain dashes.
            string surrealDatabase = nodeId.Replace("-", "_", StringComparison.Ordinal);

            var configProvider = new StaticPeerNodeConfigurationProvider(new PeerNodeConfiguration
            {
                NodeId = nodeId,
                TcpPort = tcpPort,
                AuthToken = authToken,
                KnownPeers = knownPeers.ToList()
            });

            var services = new ServiceCollection();
            services.AddSingleton(new InMemoryLogSink(nodeId));
            // NOTE(review): the generic arguments on several registrations below were
            // lost during source extraction (they rendered as bare AddSingleton()).
            // This one most plausibly wires the in-memory logger provider — confirm against VCS.
            services.AddSingleton<ILoggerProvider, InMemoryLoggerProvider>();
            services.AddLogging(builder => builder.SetMinimumLevel(LogLevel.Debug));
            services.AddSingleton<IPeerNodeConfigurationProvider>(configProvider);
            services.AddSingleton(configProvider);
            // NOTE(review): the type arguments of the next two registrations were lost
            // in extraction — restore the originals from version control.
            services.AddSingleton();
            services.AddSingleton();

            var surrealOptionsFactory = new Func<IServiceProvider, CBDDCSurrealEmbeddedOptions>(_ => new CBDDCSurrealEmbeddedOptions
            {
                Endpoint = "rocksdb://local",
                DatabasePath = dbPath,
                Namespace = "cbddc_e2e",
                Database = surrealDatabase,
                Cdc = new CBDDCSurrealCdcOptions { Enabled = true, ConsumerId = $"{nodeId}-main" }
            });

            var coreBuilder = services.AddCBDDCCore();
            if (useFaultInjectedCheckpointStore)
            {
                // NOTE(review): type arguments lost in extraction; registering the
                // crash-injecting checkpoint persistence matches the test's intent — confirm.
                services.AddSingleton<ISurrealCdcCheckpointPersistence, CrashAfterFirstAdvanceCheckpointPersistence>();
            }

            // FIX: the original if/else duplicated this identical chain in both branches.
            coreBuilder.AddCBDDCSurrealEmbedded(surrealOptionsFactory)
                .AddCBDDCNetwork(false);

            // Deterministic tests: sync uses explicit known peers, so disable UDP discovery.
            services.AddSingleton<IDiscoveryService, PassiveDiscoveryService>();
            // NOTE(review): a second registration here lost its type arguments in extraction.
            services.AddSingleton();

            var provider = services.BuildServiceProvider();
            var node = provider.GetRequiredService<ICBDDCNode>();
            var oplogStore = provider.GetRequiredService<IOplogStore>();
            var context = provider.GetRequiredService<SampleDbContext>();
            var logSink = provider.GetRequiredService<InMemoryLogSink>();

            return new TestPeerNode(
                provider, node, oplogStore, context, logSink, workDir, nodeId, preserveWorkDirOnDispose);
        }

        /// <summary>
        /// Starts the underlying node when it has not been started yet.
        /// </summary>
        /// <returns>A task that represents the asynchronous operation.</returns>
        public async Task StartAsync()
        {
            if (_started)
                return;
            await _node.Start();
            _started = true;
        }

        /// <summary>
        /// Stops the underlying node when it is currently running.
        /// </summary>
        /// <returns>A task that represents the asynchronous operation.</returns>
        public async Task StopAsync()
        {
            if (!_started)
                return;
            try
            {
                await _node.Stop();
            }
            catch (ObjectDisposedException)
            {
                // Node internals may already be torn down; stopping twice is benign.
            }
            catch (AggregateException ex) when (ex.InnerExceptions.All(e => e is ObjectDisposedException))
            {
            }

            _started = false;
        }

        /// <summary>
        /// Reads a user document by identifier.
        /// </summary>
        /// <param name="userId">The identifier of the user to read.</param>
        /// <returns>The matching user when found; otherwise <c>null</c>.</returns>
        public User? ReadUser(string userId)
        {
            return Context.Users.Find(u => u.Id == userId).FirstOrDefault();
        }

        /// <summary>
        /// Counts oplog entries for a document key that were authored by this node.
        /// </summary>
        public int GetLocalOplogCountForKey(string collection, string key)
        {
            return Context.OplogEntries.FindAll()
                .Count(e => string.Equals(e.Collection, collection, StringComparison.Ordinal)
                    && string.Equals(e.Key, key, StringComparison.Ordinal)
                    && string.Equals(e.TimestampNodeId, _nodeId, StringComparison.Ordinal));
        }

        /// <summary>
        /// Counts oplog entries for a document key regardless of authoring node.
        /// </summary>
        public int GetOplogCountForKey(string collection, string key)
        {
            return Context.OplogEntries.FindAll()
                .Count(e => string.Equals(e.Collection, collection, StringComparison.Ordinal)
                    && string.Equals(e.Key, key, StringComparison.Ordinal));
        }

        /// <summary>
        /// Inserts or updates a user and persists the matching oplog entry.
        /// </summary>
        /// <param name="user">The user payload to upsert.</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        public async Task UpsertUserAsync(User user)
        {
            await PersistUserMutationWithOplogFallbackAsync(
                user.Id,
                OperationType.Put,
                JsonSerializer.SerializeToElement(user),
                async () =>
                {
                    var existing = Context.Users.Find(u => u.Id == user.Id).FirstOrDefault();
                    if (existing == null)
                        await Context.Users.InsertAsync(user);
                    else
                        await Context.Users.UpdateAsync(user);
                    await Context.SaveChangesAsync();
                });
        }

        /// <summary>
        /// Deletes a user and persists the matching oplog entry.
        /// </summary>
        /// <param name="userId">The identifier of the user to delete.</param>
        /// <returns>A task that represents the asynchronous operation.</returns>
        public async Task DeleteUserAsync(string userId)
        {
            await PersistUserMutationWithOplogFallbackAsync(
                userId,
                OperationType.Delete,
                null,
                async () =>
                {
                    await Context.Users.DeleteAsync(userId);
                    await Context.SaveChangesAsync();
                });
        }

        /// <summary>
        /// Gets recent in-memory logs captured for this node.
        /// </summary>
        /// <param name="max">The maximum number of log entries to return.</param>
        /// <returns>A newline-delimited string of recent log entries.</returns>
        public string GetRecentLogs(int max = 50)
        {
            return _logSink.GetRecent(max);
        }

        /// <summary>
        /// Runs a mutation, waits up to 3 seconds for the native CDC pipeline to emit
        /// an oplog entry, and appends a manually timestamped entry only when CDC
        /// fails to emit one.
        /// </summary>
        private async Task PersistUserMutationWithOplogFallbackAsync(
            string userId,
            OperationType operationType,
            JsonElement? payload,
            Func<Task> mutation)
        {
            int oplogCountBefore = Context.OplogEntries.FindAll().Count();
            await mutation();

            // Prefer native CDC path; fallback only when CDC fails to emit.
            var deadline = DateTime.UtcNow.AddSeconds(3);
            while (DateTime.UtcNow < deadline)
            {
                if (Context.OplogEntries.FindAll().Count() > oplogCountBefore)
                    return;
                await Task.Delay(50);
            }

            string previousHash = await _oplogStore.GetLastEntryHashAsync(_nodeId) ?? string.Empty;
            var fallbackEntry = new OplogEntry(
                "Users", userId, operationType, payload, NextTimestamp(), previousHash);
            await _oplogStore.AppendOplogEntryAsync(fallbackEntry);
            await Context.SaveChangesAsync();
        }

        /// <summary>
        /// Produces a monotonically increasing hybrid-logical-clock timestamp: the
        /// logical counter only advances while the physical millisecond stands still.
        /// </summary>
        private HlcTimestamp NextTimestamp()
        {
            long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            if (now > _lastPhysicalTime)
            {
                _lastPhysicalTime = now;
                _logicalCounter = 0;
            }
            else
            {
                _logicalCounter++;
            }

            return new HlcTimestamp(_lastPhysicalTime, _logicalCounter, _nodeId);
        }

        /// <summary>
        /// Deletes a directory recursively, swallowing failures (e.g. lingering file locks).
        /// </summary>
        private static void TryDeleteDirectory(string path)
        {
            if (!Directory.Exists(path))
                return;
            try
            {
                Directory.Delete(path, true);
            }
            catch
            {
                // Best-effort cleanup only.
            }
        }
    }

    /// <summary>
    /// Document store that maps sync payloads onto the sample Users/TodoLists
    /// collections; used with the crash-injecting checkpoint persistence.
    /// </summary>
    private sealed class FaultInjectedSampleDocumentStore : SurrealDocumentStore
    {
        private const string UsersCollection = "Users";
        private const string TodoListsCollection = "TodoLists";

        public FaultInjectedSampleDocumentStore(
            SampleDbContext context,
            IPeerNodeConfigurationProvider configProvider,
            IVectorClockService vectorClockService,
            ISurrealCdcCheckpointPersistence checkpointPersistence,
            ILogger? logger = null)
            : base(
                context,
                context.SurrealEmbeddedClient,
                context.SchemaInitializer,
                configProvider,
                vectorClockService,
                new LastWriteWinsConflictResolver(),
                checkpointPersistence,
                // Polling and LIVE SELECT are disabled so the injected checkpoint
                // fault path is the only CDC driver in play.
                new SurrealCdcPollingOptions { Enabled = false, EnableLiveSelectAccelerator = false },
                logger)
        {
            WatchCollection(UsersCollection, context.Users, u => u.Id);
            WatchCollection(TodoListsCollection, context.TodoLists, t => t.Id);
        }

        protected override async Task ApplyContentToEntityAsync(
            string collection, string key, JsonElement content, CancellationToken cancellationToken)
        {
            await UpsertEntityAsync(collection, key, content, cancellationToken);
        }

        protected override async Task ApplyContentToEntitiesBatchAsync(
            IEnumerable<(string Collection, string Key, JsonElement Content)> documents,
            CancellationToken cancellationToken)
        {
            foreach ((string collection, string key, var content) in documents)
                await UpsertEntityAsync(collection, key, content, cancellationToken);
        }

        // NOTE(review): the return type argument was lost in extraction;
        // Task<JsonElement?> matches SerializeEntity and the null arm — confirm against base.
        protected override async Task<JsonElement?> GetEntityAsJsonAsync(
            string collection, string key, CancellationToken cancellationToken)
        {
            return collection switch
            {
                UsersCollection => SerializeEntity(await _context.Users.FindByIdAsync(key, cancellationToken)),
                TodoListsCollection => SerializeEntity(await _context.TodoLists.FindByIdAsync(key, cancellationToken)),
                _ => null
            };
        }

        protected override async Task RemoveEntityAsync(
            string collection, string key, CancellationToken cancellationToken)
        {
            await DeleteEntityAsync(collection, key, cancellationToken);
        }

        protected override async Task RemoveEntitiesBatchAsync(
            IEnumerable<(string Collection, string Key)> documents, CancellationToken cancellationToken)
        {
            foreach ((string collection, string key) in documents)
                await DeleteEntityAsync(collection, key, cancellationToken);
        }

        // NOTE(review): the return type argument was lost in extraction; inferred
        // from the tuple projections and the "[]" default arm — confirm against base.
        protected override async Task<IReadOnlyList<(string Key, JsonElement Content)>> GetAllEntitiesAsJsonAsync(
            string collection, CancellationToken cancellationToken)
        {
            return collection switch
            {
                UsersCollection => (await _context.Users.FindAllAsync(cancellationToken))
                    .Select(u => (u.Id, SerializeEntity(u)!.Value))
                    .ToList(),
                TodoListsCollection => (await _context.TodoLists.FindAllAsync(cancellationToken))
                    .Select(t => (t.Id, SerializeEntity(t)!.Value))
                    .ToList(),
                _ => []
            };
        }

        // Deserializes the payload and inserts or updates the matching entity.
        private async Task UpsertEntityAsync(
            string collection, string key, JsonElement content, CancellationToken cancellationToken)
        {
            switch (collection)
            {
                case UsersCollection:
                    var user = content.Deserialize<User>()
                        ?? throw new InvalidOperationException("Failed to deserialize user.");
                    // The document key is authoritative over any Id inside the payload.
                    user.Id = key;
                    if (await _context.Users.FindByIdAsync(key, cancellationToken) == null)
                        await _context.Users.InsertAsync(user, cancellationToken);
                    else
                        await _context.Users.UpdateAsync(user, cancellationToken);
                    break;
                case TodoListsCollection:
                    // NOTE(review): the Deserialize type argument was lost in extraction;
                    // TodoList is inferred from context.TodoLists — confirm the entity type name.
                    var todo = content.Deserialize<TodoList>()
                        ?? throw new InvalidOperationException("Failed to deserialize todo list.");
                    todo.Id = key;
                    if (await _context.TodoLists.FindByIdAsync(key, cancellationToken) == null)
                        await _context.TodoLists.InsertAsync(todo, cancellationToken);
                    else
                        await _context.TodoLists.UpdateAsync(todo, cancellationToken);
                    break;
                default:
                    throw new NotSupportedException($"Collection '{collection}' is not supported for sync.");
            }
        }

        // Removes the entity for the key; unknown collections are silently ignored
        // (unlike upsert, which throws) — preserved as-is from the original.
        private async Task DeleteEntityAsync(string collection, string key, CancellationToken cancellationToken)
        {
            switch (collection)
            {
                case UsersCollection:
                    await _context.Users.DeleteAsync(key, cancellationToken);
                    break;
                case TodoListsCollection:
                    await _context.TodoLists.DeleteAsync(key, cancellationToken);
                    break;
            }
        }

        private static JsonElement? SerializeEntity<T>(T? entity) where T : class
        {
            return entity == null ? null : JsonSerializer.SerializeToElement(entity);
        }
    }

    /// <summary>
    /// Checkpoint persistence that throws exactly once on the first advance, simulating
    /// a crash after oplog commit but before checkpoint advance.
    /// </summary>
    private sealed class CrashAfterFirstAdvanceCheckpointPersistence : ISurrealCdcCheckpointPersistence
    {
        // 1 = next AdvanceCheckpointAsync call throws; swapped to 0 atomically.
        private int _failOnNextAdvance = 1;

        // NOTE(review): the result type argument was lost in extraction; the original
        // returned a null checkpoint — restore the exact type from the interface.
        public Task<SurrealCdcCheckpoint?> GetCheckpointAsync(
            string? consumerId = null, CancellationToken cancellationToken = default)
        {
            return Task.FromResult<SurrealCdcCheckpoint?>(null);
        }

        public Task UpsertCheckpointAsync(
            HlcTimestamp timestamp,
            string lastHash,
            string? consumerId = null,
            CancellationToken cancellationToken = default,
            long? versionstampCursor = null)
        {
            return Task.CompletedTask;
        }

        public Task AdvanceCheckpointAsync(
            OplogEntry entry, string? consumerId = null, CancellationToken cancellationToken = default)
        {
            // Interlocked.Exchange guarantees the fault fires exactly once even
            // under concurrent advances.
            if (Interlocked.Exchange(ref _failOnNextAdvance, 0) == 1)
                throw new InvalidOperationException("Injected crash between oplog commit and checkpoint advance.");
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Discovery service that never discovers anything; tests rely solely on
    /// explicitly configured known peers.
    /// </summary>
    private sealed class PassiveDiscoveryService : IDiscoveryService
    {
        /// <inheritdoc />
        // NOTE(review): the element type argument was lost in extraction — restore
        // the exact IEnumerable<T>/Array.Empty<T>() type from IDiscoveryService.
        public IEnumerable<PeerInfo> GetActivePeers()
        {
            return Array.Empty<PeerInfo>();
        }

        /// <inheritdoc />
        public Task Start()
        {
            return Task.CompletedTask;
        }

        /// <inheritdoc />
        public Task Stop()
        {
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Configuration provider backed by a fixed in-memory configuration.
    /// </summary>
    private sealed class StaticPeerNodeConfigurationProvider : IPeerNodeConfigurationProvider
    {
        private PeerNodeConfiguration _configuration;

        /// <summary>
        /// Initializes a new instance of the <see cref="StaticPeerNodeConfigurationProvider"/> class.
        /// </summary>
        /// <param name="configuration">The initial peer node configuration.</param>
        public StaticPeerNodeConfigurationProvider(PeerNodeConfiguration configuration)
        {
            _configuration = configuration;
        }

        /// <inheritdoc />
        public event PeerNodeConfigurationChangedEventHandler? ConfigurationChanged;

        /// <inheritdoc />
        public Task<PeerNodeConfiguration> GetConfiguration()
        {
            return Task.FromResult(_configuration);
        }

        /// <inheritdoc />
        public void Update(PeerNodeConfiguration configuration)
        {
            _configuration = configuration;
            ConfigurationChanged?.Invoke(this, configuration);
        }
    }

    /// <summary>
    /// Bounded in-memory log sink (last 500 entries) shared by all loggers of one node.
    /// </summary>
    private sealed class InMemoryLogSink
    {
        private readonly ConcurrentQueue<string> _entries = new();
        private readonly string _nodeId;

        /// <summary>
        /// Initializes a new instance of the <see cref="InMemoryLogSink"/> class.
        /// </summary>
        /// <param name="nodeId">The node identifier associated with emitted logs.</param>
        public InMemoryLogSink(string nodeId)
        {
            _nodeId = nodeId;
        }

        /// <summary>
        /// Adds a log entry to the in-memory sink.
        /// </summary>
        /// <param name="category">The log category.</param>
        /// <param name="level">The log level.</param>
        /// <param name="message">The formatted log message.</param>
        /// <param name="exception">The optional exception associated with the log entry.</param>
        public void Add(string category, LogLevel level, string message, Exception? exception)
        {
            var text = $"[{DateTime.UtcNow:O}] {_nodeId} {level} {category}: {message}";
            if (exception is not null)
                text = $"{text}{Environment.NewLine}{exception}";
            _entries.Enqueue(text);
            // Trim the queue back under the 500-entry cap.
            while (_entries.Count > 500 && _entries.TryDequeue(out _))
            {
            }
        }

        /// <summary>
        /// Gets the most recent log entries from the sink.
        /// </summary>
        /// <param name="max">The maximum number of entries to return.</param>
        /// <returns>A newline-delimited string of recent log entries.</returns>
        public string GetRecent(int max)
        {
            string[] entries = _entries.ToArray();
            if (entries.Length == 0)
                // NOTE(review): extraction may have eaten an angle-bracketed placeholder
                // literal here (e.g. "<no logs>") — the surviving text returns "".
                return "";
            return string.Join(Environment.NewLine, entries.TakeLast(max));
        }
    }

    /// <summary>
    /// Logger provider that routes all categories into the shared in-memory sink.
    /// </summary>
    private sealed class InMemoryLoggerProvider : ILoggerProvider
    {
        private readonly InMemoryLogSink _sink;

        /// <summary>
        /// Initializes a new instance of the <see cref="InMemoryLoggerProvider"/> class.
        /// </summary>
        /// <param name="sink">The shared sink used to capture log messages.</param>
        public InMemoryLoggerProvider(InMemoryLogSink sink)
        {
            _sink = sink;
        }

        /// <inheritdoc />
        public ILogger CreateLogger(string categoryName)
        {
            return new InMemoryLogger(categoryName, _sink);
        }

        /// <inheritdoc />
        public void Dispose()
        {
        }
    }

    /// <summary>
    /// Logger that forwards every message, at every level, to the in-memory sink.
    /// </summary>
    private sealed class InMemoryLogger : ILogger
    {
        private readonly string _categoryName;
        private readonly InMemoryLogSink _sink;

        /// <summary>
        /// Initializes a new instance of the <see cref="InMemoryLogger"/> class.
        /// </summary>
        /// <param name="categoryName">The logger category name.</param>
        /// <param name="sink">The sink that stores emitted log messages.</param>
        public InMemoryLogger(string categoryName, InMemoryLogSink sink)
        {
            _categoryName = categoryName;
            _sink = sink;
        }

        /// <inheritdoc />
        public IDisposable BeginScope<TState>(TState state) where TState : notnull
        {
            return NullScope.Instance;
        }

        /// <inheritdoc />
        public bool IsEnabled(LogLevel logLevel)
        {
            return true;
        }

        /// <inheritdoc />
        public void Log<TState>(
            LogLevel logLevel,
            EventId eventId,
            TState state,
            Exception? exception,
            Func<TState, Exception?, string> formatter)
        {
            _sink.Add(_categoryName, logLevel, formatter(state, exception), exception);
        }
    }

    /// <summary>
    /// A no-op disposable used as the logger scope token.
    /// </summary>
    private sealed class NullScope : IDisposable
    {
        public static readonly NullScope Instance = new();

        /// <inheritdoc />
        public void Dispose()
        {
        }
    }
}