using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Core.Storage;
using ZB.MOM.WW.CBDDC.Network;
using ZB.MOM.WW.CBDDC.Network.Security;
using ZB.MOM.WW.CBDDC.Persistence.BLite;

namespace ZB.MOM.WW.CBDDC.E2E.Tests;

/// <summary>
/// End-to-end test that starts two peer nodes in-process on loopback TCP ports, each configured
/// with the other as a known peer, and checks that user records written on one node become
/// readable on the other.
/// </summary>
public class ClusterCrudSyncE2ETests
{
    /// <summary>
    /// Verifies two real peers replicate create, update, and delete operations in both directions.
    /// </summary>
    [Fact]
    public async Task TwoPeers_ShouldReplicateCrudBidirectionally()
    {
        var clusterToken = Guid.NewGuid().ToString("N");
        var nodeAPort = GetAvailableTcpPort();
        var nodeBPort = GetAvailableTcpPort();
        while (nodeBPort == nodeAPort)
        {
            nodeBPort = GetAvailableTcpPort();
        }

        await using var nodeA = TestPeerNode.Create(
            nodeId: "node-a",
            tcpPort: nodeAPort,
            authToken: clusterToken,
            knownPeers:
            [
                new KnownPeerConfiguration { NodeId = "node-b", Host = "127.0.0.1", Port = nodeBPort }
            ]);

        await using var nodeB = TestPeerNode.Create(
            nodeId: "node-b",
            tcpPort: nodeBPort,
            authToken: clusterToken,
            knownPeers:
            [
                new KnownPeerConfiguration { NodeId = "node-a", Host = "127.0.0.1", Port = nodeAPort }
            ]);

        await nodeA.StartAsync();
        await nodeB.StartAsync();

        const int timeoutSeconds = 45;
        Func<string> diagnostics = () => BuildDiagnostics(nodeA, nodeB);

        // ---- Node A -> Node B ----
        var nodeAUserId = "user-from-a";

        await nodeA.UpsertUserAsync(new User
        {
            Id = nodeAUserId,
            Name = "Alice",
            Age = 30,
            Address = new Address { City = "Austin" }
        });

        await AssertEventuallyAsync(
            () => MatchesUser(nodeB.ReadUser(nodeAUserId), "Alice", 30, "Austin"),
            timeoutSeconds,
            "Node B did not receive create from node A.",
            diagnostics);

        await AssertEventuallyAsync(
            () => nodeA.ReadUser(nodeAUserId) is not null,
            timeoutSeconds,
            "Node A could not read back its own created user.",
            diagnostics);

        // NOTE(review): the "update" step deletes the record before upserting it, so what is
        // replicated is a delete followed by a put rather than an in-place update. Confirm this
        // is intentional.
        await nodeA.DeleteUserAsync(nodeAUserId);
        await nodeA.UpsertUserAsync(new User
        {
            Id = nodeAUserId,
            Name = "Alice Updated",
            Age = 31,
            Address = new Address { City = "Dallas" }
        });

        await AssertEventuallyAsync(
            () => MatchesUser(nodeB.ReadUser(nodeAUserId), "Alice Updated", 31, "Dallas"),
            timeoutSeconds,
            "Node B did not receive update from node A.",
            diagnostics);

        await nodeA.DeleteUserAsync(nodeAUserId);

        await AssertEventuallyAsync(
            () => nodeB.ReadUser(nodeAUserId) is null,
            timeoutSeconds,
            "Node B did not receive delete from node A.",
            diagnostics);

        // ---- Node B -> Node A ----
        var nodeBUserId = "user-from-b";

        await nodeB.UpsertUserAsync(new User
        {
            Id = nodeBUserId,
            Name = "Bob",
            Age = 40,
            Address = new Address { City = "Boston" }
        });

        await AssertEventuallyAsync(
            () => MatchesUser(nodeA.ReadUser(nodeBUserId), "Bob", 40, "Boston"),
            timeoutSeconds,
            "Node A did not receive create from node B.",
            diagnostics);

        await AssertEventuallyAsync(
            () => nodeB.ReadUser(nodeBUserId) is not null,
            timeoutSeconds,
            "Node B could not read back its own created user.",
            diagnostics);

        // NOTE(review): same delete-then-upsert shape as the node A "update" step above.
        await nodeB.DeleteUserAsync(nodeBUserId);
        await nodeB.UpsertUserAsync(new User
        {
            Id = nodeBUserId,
            Name = "Bob Updated",
            Age = 41,
            Address = new Address { City = "Denver" }
        });

        await AssertEventuallyAsync(
            () => MatchesUser(nodeA.ReadUser(nodeBUserId), "Bob Updated", 41, "Denver"),
            timeoutSeconds,
            "Node A did not receive update from node B.",
            diagnostics);

        await nodeB.DeleteUserAsync(nodeBUserId);

        await AssertEventuallyAsync(
            () => nodeA.ReadUser(nodeBUserId) is null,
            timeoutSeconds,
            "Node A did not receive delete from node B.",
            diagnostics);
    }

    /// <summary>
    /// Returns true when <paramref name="user"/> exists and its name, age and city equal the
    /// expected values.
    /// </summary>
    private static bool MatchesUser(User? user, string name, int age, string city)
    {
        return user is not null
            && user.Name == name
            && user.Age == age
            && user.Address?.City == city;
    }

    /// <summary>
    /// Polls <paramref name="predicate"/> every 250 ms until it returns true or the timeout elapses.
    /// </summary>
    /// <param name="predicate">Condition that must eventually hold.</param>
    /// <param name="timeoutSeconds">Maximum time to keep polling, in seconds.</param>
    /// <param name="failureMessage">Message of the assertion exception thrown on timeout.</param>
    /// <param name="diagnostics">Optional callback whose text is appended to the failure message.</param>
    /// <exception cref="Shouldly.ShouldAssertException">The condition did not hold within the timeout.</exception>
    private static async Task AssertEventuallyAsync(
        Func<bool> predicate,
        int timeoutSeconds,
        string failureMessage,
        Func<string>? diagnostics = null)
    {
        var timeout = TimeSpan.FromSeconds(timeoutSeconds);

        // Stopwatch is monotonic, so a system clock adjustment cannot shorten or extend the wait.
        var elapsed = System.Diagnostics.Stopwatch.StartNew();

        while (elapsed.Elapsed < timeout)
        {
            if (predicate())
            {
                return;
            }

            await Task.Delay(250);
        }

        // The condition may have become true during the final delay; check once more before failing.
        if (predicate())
        {
            return;
        }

        throw new Shouldly.ShouldAssertException($"{failureMessage}{DescribeFailureContext(diagnostics)}");
    }

    /// <summary>
    /// Produces the text appended to an assertion message. A failing diagnostics callback is
    /// reported inline so it cannot hide the assertion failure itself.
    /// </summary>
    private static string DescribeFailureContext(Func<string>? diagnostics)
    {
        if (diagnostics is null)
        {
            return string.Empty;
        }

        try
        {
            return $"{Environment.NewLine}{diagnostics()}";
        }
        catch (Exception ex)
        {
            return $"{Environment.NewLine}Diagnostics unavailable: {ex.GetType().Name}: {ex.Message}";
        }
    }

    /// <summary>
    /// Builds a multi-line report with user counts, user details, oplog counts grouped by
    /// <c>TimestampNodeId</c>, and the recent log lines of both nodes.
    /// </summary>
    private static string BuildDiagnostics(TestPeerNode nodeA, TestPeerNode nodeB)
    {
        string[] lines =
        [
            "Diagnostics:",
            .. DescribeNodeState("NodeA", nodeA),
            .. DescribeNodeState("NodeB", nodeB),
            "NodeA logs:",
            nodeA.GetRecentLogs(),
            "NodeB logs:",
            nodeB.GetRecentLogs()
        ];

        return string.Join(Environment.NewLine, lines);
    }

    /// <summary>
    /// Describes one node's users and oplog. Each collection is read once so the reported counts
    /// and details come from the same snapshot.
    /// </summary>
    private static string[] DescribeNodeState(string label, TestPeerNode node)
    {
        var users = node.Context.Users.FindAll().ToList();
        var oplog = node.Context.OplogEntries.FindAll().ToList();

        var userDetails = string.Join(
            ", ",
            users.Select(u => $"{u.Id}:{u.Name}:{u.Age}:{u.Address?.City}"));

        var oplogByAuthor = string.Join(
            ", ",
            oplog.GroupBy(e => e.TimestampNodeId).Select(g => $"{g.Key}:{g.Count()}"));

        return
        [
            $"{label} users={users.Count}, oplog={oplog.Count}",
            $"{label} users detail={userDetails}",
            $"{label} oplog by author={oplogByAuthor}"
        ];
    }

    /// <summary>
    /// Asks the OS for a free loopback TCP port by binding a listener to port 0 and reading back
    /// the assigned port. The listener is released on return, so the port is only likely, not
    /// guaranteed, to still be free when the caller binds it.
    /// </summary>
    private static int GetAvailableTcpPort()
    {
        using var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        return ((IPEndPoint)listener.LocalEndpoint).Port;
    }

    /// <summary>
    /// One peer under test: owns its own service provider, database file in a temp directory,
    /// and in-memory log sink.
    /// </summary>
    private sealed class TestPeerNode : IAsyncDisposable
    {
        private readonly ServiceProvider _services;
        private readonly ICBDDCNode _node;
        private readonly IOplogStore _oplogStore;
        private readonly string _nodeId;
        private readonly string _workDir;
        private readonly InMemoryLogSink _logSink;
        private bool _started;
        private long _lastPhysicalTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        private int _logicalCounter;

        /// <summary>Database context of this node, exposed for assertions and diagnostics.</summary>
        public SampleDbContext Context { get; }

        private TestPeerNode(
            ServiceProvider services,
            ICBDDCNode node,
            IOplogStore oplogStore,
            SampleDbContext context,
            InMemoryLogSink logSink,
            string workDir,
            string nodeId)
        {
            _services = services;
            _node = node;
            _oplogStore = oplogStore;
            Context = context;
            _logSink = logSink;
            _workDir = workDir;
            _nodeId = nodeId;
        }

        /// <summary>
        /// Creates a node with a fresh temp working directory and a service provider wired with
        /// the static configuration, in-memory logging and the passive discovery service.
        /// </summary>
        /// <param name="nodeId">Identifier of the node.</param>
        /// <param name="tcpPort">TCP port placed in the node configuration.</param>
        /// <param name="authToken">Auth token placed in the node configuration.</param>
        /// <param name="knownPeers">Peers placed in the node configuration.</param>
        public static TestPeerNode Create(
            string nodeId,
            int tcpPort,
            string authToken,
            IReadOnlyList<KnownPeerConfiguration> knownPeers)
        {
            var workDir = Path.Combine(Path.GetTempPath(), $"cbddc-e2e-{nodeId}-{Guid.NewGuid():N}");
            Directory.CreateDirectory(workDir);
            var dbPath = Path.Combine(workDir, "node.blite");

            var configProvider = new StaticPeerNodeConfigurationProvider(new PeerNodeConfiguration
            {
                NodeId = nodeId,
                TcpPort = tcpPort,
                AuthToken = authToken,
                KnownPeers = knownPeers.ToList()
            });

            var services = new ServiceCollection();
            services.AddSingleton(new InMemoryLogSink(nodeId));
            services.AddSingleton<ILoggerProvider, InMemoryLoggerProvider>();
            services.AddLogging(builder => builder.SetMinimumLevel(LogLevel.Debug));

            // Register the same instance under both its concrete type and its interface.
            services.AddSingleton(configProvider);
            services.AddSingleton<IPeerNodeConfigurationProvider>(configProvider);

            // NOTE(review): if AddCBDDCBLite takes explicit type arguments, they are not visible in
            // the reviewed listing - confirm against version control.
            services.AddCBDDCCore()
                .AddCBDDCBLite(_ => new SampleDbContext(dbPath))
                .AddCBDDCNetwork(useHostedService: false);

            // Deterministic tests: sync uses explicit known peers, so disable UDP discovery.
            services.AddSingleton<IDiscoveryService, PassiveDiscoveryService>();

            // NOTE(review): the type arguments of this registration are not visible in the reviewed
            // listing and cannot be derived from the rest of the file (presumably a type from
            // ZB.MOM.WW.CBDDC.Network.Security, whose using directive is otherwise unused).
            // Restore them from version control before merging.
            services.AddSingleton();

            var provider = services.BuildServiceProvider();
            var node = provider.GetRequiredService<ICBDDCNode>();
            var oplogStore = provider.GetRequiredService<IOplogStore>();
            var context = provider.GetRequiredService<SampleDbContext>();
            var logSink = provider.GetRequiredService<InMemoryLogSink>();

            return new TestPeerNode(provider, node, oplogStore, context, logSink, workDir, nodeId);
        }

        /// <summary>Starts the node; does nothing if it is already started.</summary>
        public async Task StartAsync()
        {
            if (_started)
            {
                return;
            }

            await _node.Start();
            _started = true;
        }

        /// <summary>Stops the node; does nothing if it is not started.</summary>
        public async Task StopAsync()
        {
            if (!_started)
            {
                return;
            }

            await _node.Stop();
            _started = false;
        }

        /// <summary>
        /// Stops the node (best effort), disposes the service provider asynchronously and removes
        /// the working directory.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            try
            {
                await StopAsync();
            }
            catch (Exception)
            {
                // Best effort: a failing stop must not prevent releasing the container and files.
            }

            try
            {
                // Async disposal also covers services that only implement IAsyncDisposable.
                await _services.DisposeAsync();
            }
            finally
            {
                // Runs after the container is disposed, and even when disposal throws.
                TryDeleteDirectory(_workDir);
            }
        }

        /// <summary>Returns the user with the given id, or null when there is none.</summary>
        public User? ReadUser(string userId)
        {
            return Context.Users.Find(u => u.Id == userId).FirstOrDefault();
        }

        /// <summary>
        /// Inserts the user, or updates it when a user with the same id already exists, and makes
        /// sure an oplog entry exists for the change.
        /// </summary>
        public async Task UpsertUserAsync(User user)
        {
            await PersistUserMutationWithOplogFallbackAsync(
                user.Id,
                OperationType.Put,
                JsonSerializer.SerializeToElement(user),
                async () =>
                {
                    var existing = Context.Users.Find(u => u.Id == user.Id).FirstOrDefault();
                    if (existing == null)
                    {
                        await Context.Users.InsertAsync(user);
                    }
                    else
                    {
                        await Context.Users.UpdateAsync(user);
                    }

                    await Context.SaveChangesAsync();
                });
        }

        /// <summary>
        /// Deletes the user with the given id and makes sure an oplog entry exists for the change.
        /// </summary>
        public async Task DeleteUserAsync(string userId)
        {
            await PersistUserMutationWithOplogFallbackAsync(
                userId,
                OperationType.Delete,
                payload: null,
                async () =>
                {
                    await Context.Users.DeleteAsync(userId);
                    await Context.SaveChangesAsync();
                });
        }

        /// <summary>Returns up to <paramref name="max"/> of the most recent captured log lines.</summary>
        public string GetRecentLogs(int max = 50)
        {
            return _logSink.GetRecent(max);
        }

        /// <summary>
        /// Runs <paramref name="mutation"/>, then waits up to three seconds for the oplog entry
        /// count to grow. If it does not, appends an oplog entry for the change by hand, chained
        /// to this node's last entry hash.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the check compares total oplog counts, so presumably an entry replicated
        /// from the peer during the wait would also satisfy it - verify whether that matters.
        /// </remarks>
        private async Task PersistUserMutationWithOplogFallbackAsync(
            string userId,
            OperationType operationType,
            JsonElement? payload,
            Func<Task> mutation)
        {
            var oplogCountBefore = Context.OplogEntries.FindAll().Count();
            await mutation();

            // Prefer native CDC path; fallback only when CDC fails to emit.
            var deadline = DateTime.UtcNow.AddSeconds(3);
            while (DateTime.UtcNow < deadline)
            {
                if (Context.OplogEntries.FindAll().Count() > oplogCountBefore)
                {
                    return;
                }

                await Task.Delay(50);
            }

            var previousHash = await _oplogStore.GetLastEntryHashAsync(_nodeId) ?? string.Empty;
            var fallbackEntry = new OplogEntry(
                collection: "Users",
                key: userId,
                operation: operationType,
                payload: payload,
                timestamp: NextTimestamp(),
                previousHash: previousHash);

            await _oplogStore.AppendOplogEntryAsync(fallbackEntry);
            await Context.SaveChangesAsync();
        }

        /// <summary>
        /// Produces the next timestamp for fallback entries: the physical part advances with the
        /// wall clock and resets the logical counter; otherwise the logical counter increments.
        /// </summary>
        private HlcTimestamp NextTimestamp()
        {
            var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            if (now > _lastPhysicalTime)
            {
                _lastPhysicalTime = now;
                _logicalCounter = 0;
            }
            else
            {
                _logicalCounter++;
            }

            return new HlcTimestamp(_lastPhysicalTime, _logicalCounter, _nodeId);
        }

        /// <summary>Deletes the directory recursively if it exists; failures are ignored.</summary>
        private static void TryDeleteDirectory(string path)
        {
            if (!Directory.Exists(path))
            {
                return;
            }

            try
            {
                Directory.Delete(path, recursive: true);
            }
            catch (Exception)
            {
                // Best effort: leftover temp files must not fail the test during cleanup.
            }
        }
    }

    /// <summary>Discovery service that never reports peers and has no start/stop work.</summary>
    private sealed class PassiveDiscoveryService : IDiscoveryService
    {
        // NOTE(review): the element type of the returned sequence is not visible in the reviewed
        // listing; restore the generic arguments required by IDiscoveryService from version control.
        public IEnumerable GetActivePeers()
        {
            return Array.Empty();
        }

        public Task Start()
        {
            return Task.CompletedTask;
        }

        public Task Stop()
        {
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Configuration provider that returns a fixed configuration until <see cref="Update"/> replaces it.
    /// </summary>
    private sealed class StaticPeerNodeConfigurationProvider : IPeerNodeConfigurationProvider
    {
        private PeerNodeConfiguration _configuration;

        public StaticPeerNodeConfigurationProvider(PeerNodeConfiguration configuration)
        {
            _configuration = configuration;
        }

        public event PeerNodeConfigurationChangedEventHandler? ConfigurationChanged;

        public Task<PeerNodeConfiguration> GetConfiguration()
        {
            return Task.FromResult(_configuration);
        }

        /// <summary>Replaces the configuration and raises <see cref="ConfigurationChanged"/>.</summary>
        public void Update(PeerNodeConfiguration configuration)
        {
            _configuration = configuration;
            ConfigurationChanged?.Invoke(this, configuration);
        }
    }

    /// <summary>Collects formatted log lines in memory, keeping roughly the latest 500.</summary>
    private sealed class InMemoryLogSink
    {
        private readonly ConcurrentQueue<string> _entries = new();
        private readonly string _nodeId;

        public InMemoryLogSink(string nodeId)
        {
            _nodeId = nodeId;
        }

        /// <summary>Formats and stores one log line, appending the exception text when present.</summary>
        public void Add(string category, LogLevel level, string message, Exception? exception)
        {
            var text = $"[{DateTime.UtcNow:O}] {_nodeId} {level} {category}: {message}";
            if (exception is not null)
            {
                text = $"{text}{Environment.NewLine}{exception}";
            }

            _entries.Enqueue(text);

            // Drop the oldest lines once the queue grows past 500 entries.
            while (_entries.Count > 500 && _entries.TryDequeue(out _))
            {
            }
        }

        /// <summary>Returns the last <paramref name="max"/> lines joined by newlines, or an empty string.</summary>
        public string GetRecent(int max)
        {
            var entries = _entries.ToArray();
            if (entries.Length == 0)
            {
                return "";
            }

            return string.Join(Environment.NewLine, entries.TakeLast(max));
        }
    }

    /// <summary>Logger provider whose loggers write into a shared <see cref="InMemoryLogSink"/>.</summary>
    private sealed class InMemoryLoggerProvider : ILoggerProvider
    {
        private readonly InMemoryLogSink _sink;

        public InMemoryLoggerProvider(InMemoryLogSink sink)
        {
            _sink = sink;
        }

        public ILogger CreateLogger(string categoryName)
        {
            return new InMemoryLogger(categoryName, _sink);
        }

        public void Dispose()
        {
        }
    }

    /// <summary>Logger that forwards every message, at any level, to the sink.</summary>
    private sealed class InMemoryLogger : ILogger
    {
        private readonly string _categoryName;
        private readonly InMemoryLogSink _sink;

        public InMemoryLogger(string categoryName, InMemoryLogSink sink)
        {
            _categoryName = categoryName;
            _sink = sink;
        }

        public IDisposable BeginScope<TState>(TState state) where TState : notnull
        {
            return NullScope.Instance;
        }

        public bool IsEnabled(LogLevel logLevel)
        {
            return true;
        }

        public void Log<TState>(
            LogLevel logLevel,
            EventId eventId,
            TState state,
            Exception? exception,
            Func<TState, Exception?, string> formatter)
        {
            _sink.Add(_categoryName, logLevel, formatter(state, exception), exception);
        }
    }

    /// <summary>No-op scope returned by <see cref="InMemoryLogger.BeginScope{TState}"/>.</summary>
    private sealed class NullScope : IDisposable
    {
        public static readonly NullScope Instance = new();

        public void Dispose()
        {
        }
    }
}