using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Core.Storage;

namespace ZB.MOM.WW.CBDDC.Network.Tests;

// NOTE(review): the calls written here as Substitute.For(), Arg.Any(), Array.Empty() and
// new List { ... } carry no generic type arguments. They look like generic calls whose
// type arguments were lost in this copy of the file - confirm against version control
// before building.
// NOTE(review): Substitute / Arg / ShouldBe are not covered by the using directives above;
// presumably they come from global usings - verify.

/// <summary>
/// Tests for the peer-registration and confirmation-advancement members of
/// <see cref="SyncOrchestrator"/>: <c>BuildMergedPeerList</c>, <c>EnsurePeersRegisteredAsync</c>,
/// <c>AdvanceConfirmationsFromVectorClockAsync</c> and <c>AdvanceConfirmationForPushedBatchAsync</c>.
/// Every test builds its orchestrator through <c>CreateOrchestrator</c> with a substitute
/// oplog store and a substitute confirmation store, then asserts on the calls the
/// confirmation store received.
/// </summary>
public class SyncOrchestratorConfirmationTests
{
    /// <summary>
    /// Verifies that merged peers are registered and the local node is skipped.
    /// </summary>
    [Fact]
    public async Task EnsurePeersRegisteredAsync_ShouldRegisterMergedPeers_AndSkipLocalNode()
    {
        var oplogStore = Substitute.For();
        var confirmationStore = Substitute.For();
        var orchestrator = CreateOrchestrator(oplogStore, confirmationStore);
        var now = DateTimeOffset.UtcNow;

        // The discovered list contains the local node itself plus "peer-a".
        var discoveredPeers = new List
        {
            new("local", "127.0.0.1:9000", now, PeerType.LanDiscovered),
            new("peer-a", "10.0.0.1:9000", now, PeerType.LanDiscovered)
        };

        // "peer-a" is repeated here with a different address and peer type, so the
        // assertions below can tell which of the two entries was used for it.
        var knownPeers = new List
        {
            new("peer-a", "10.99.0.1:9000", now, PeerType.StaticRemote),
            new("peer-b", "10.0.0.2:9010", now, PeerType.StaticRemote)
        };

        var mergedPeers = SyncOrchestrator.BuildMergedPeerList(discoveredPeers, knownPeers, "local");

        // Expected: exactly "peer-a" and "peer-b" (no duplicate "peer-a", no "local").
        mergedPeers.Count.ShouldBe(2);

        await orchestrator.EnsurePeersRegisteredAsync(mergedPeers, "local", CancellationToken.None);

        // "peer-a" is expected with the address and type from the discovered entry,
        // not the ones from the known entry.
        await confirmationStore.Received(1).EnsurePeerRegisteredAsync(
            "peer-a", "10.0.0.1:9000", PeerType.LanDiscovered, Arg.Any());
        await confirmationStore.Received(1).EnsurePeerRegisteredAsync(
            "peer-b", "10.0.0.2:9010", PeerType.StaticRemote, Arg.Any());

        // The local node must never be registered as a peer.
        await confirmationStore.DidNotReceive().EnsurePeerRegisteredAsync(
            "local", Arg.Any(), Arg.Any(), Arg.Any());
    }

    /// <summary>
    /// Verifies that a newly discovered node is auto-registered when peer lists are refreshed.
    /// </summary>
    [Fact]
    public async Task EnsurePeersRegisteredAsync_WhenNewNodeJoins_ShouldAutoRegisterJoinedNode()
    {
        var oplogStore = Substitute.For();
        var confirmationStore = Substitute.For();
        var orchestrator = CreateOrchestrator(oplogStore, confirmationStore);
        var now = DateTimeOffset.UtcNow;

        var knownPeers = new List
        {
            new("peer-static", "10.0.0.10:9000", now, PeerType.StaticRemote)
        };

        // First refresh: only the already-known static peer is discovered.
        var firstDiscovered = new List
        {
            new("peer-static", "10.0.0.10:9000", now, PeerType.StaticRemote)
        };
        var firstMerged = SyncOrchestrator.BuildMergedPeerList(firstDiscovered, knownPeers, "local");
        await orchestrator.EnsurePeersRegisteredAsync(firstMerged, "local", CancellationToken.None);

        // Second refresh: "peer-new" shows up alongside the static peer.
        var secondDiscovered = new List
        {
            new("peer-static", "10.0.0.10:9000", now, PeerType.StaticRemote),
            new("peer-new", "10.0.0.25:9010", now, PeerType.LanDiscovered)
        };
        var secondMerged = SyncOrchestrator.BuildMergedPeerList(secondDiscovered, knownPeers, "local");
        await orchestrator.EnsurePeersRegisteredAsync(secondMerged, "local", CancellationToken.None);

        // Only the newcomer is asserted on; calls made for "peer-static" are not checked here.
        await confirmationStore.Received(1).EnsurePeerRegisteredAsync(
            "peer-new", "10.0.0.25:9010", PeerType.LanDiscovered, Arg.Any());
    }

    /// <summary>
    /// Verifies that confirmations advance only for nodes where remote vector-clock entries are at or ahead.
    /// </summary>
    [Fact]
    public async Task AdvanceConfirmationsFromVectorClockAsync_ShouldAdvanceOnlyForRemoteAtOrAhead()
    {
        var oplogStore = Substitute.For();
        var confirmationStore = Substitute.For();
        var orchestrator = CreateOrchestrator(oplogStore, confirmationStore);

        // One entry per scenario; each node id names the relationship between the
        // local entry and the remote entry for that node.
        var local = new VectorClock();
        local.SetTimestamp("node-equal", new HlcTimestamp(100, 1, "node-equal"));
        local.SetTimestamp("node-ahead", new HlcTimestamp(200, 0, "node-ahead"));
        local.SetTimestamp("node-behind", new HlcTimestamp(300, 0, "node-behind"));
        local.SetTimestamp("node-local-only", new HlcTimestamp(150, 0, "node-local-only"));

        var remote = new VectorClock();
        remote.SetTimestamp("node-equal", new HlcTimestamp(100, 1, "node-equal"));
        remote.SetTimestamp("node-ahead", new HlcTimestamp(250, 0, "node-ahead"));
        // (299, 9) against local (300, 0): this test treats the remote as behind.
        remote.SetTimestamp("node-behind", new HlcTimestamp(299, 9, "node-behind"));
        remote.SetTimestamp("node-remote-only", new HlcTimestamp(900, 0, "node-remote-only"));

        // "node-equal" has a stored hash; "node-ahead" deliberately returns null so the
        // null-hash case is covered by the assertions below.
        oplogStore.GetLastEntryHashAsync("node-equal", Arg.Any())
            .Returns("hash-equal");
        oplogStore.GetLastEntryHashAsync("node-ahead", Arg.Any())
            .Returns((string?)null);

        await orchestrator.AdvanceConfirmationsFromVectorClockAsync("peer-1", local, remote, CancellationToken.None);

        await confirmationStore.Received(1).UpdateConfirmationAsync(
            "peer-1", "node-equal", new HlcTimestamp(100, 1, "node-equal"), "hash-equal", Arg.Any());

        // The expected timestamp is the local one (200, 0), not the remote's (250, 0),
        // and the null hash is expected to be passed on as string.Empty.
        await confirmationStore.Received(1).UpdateConfirmationAsync(
            "peer-1", "node-ahead", new HlcTimestamp(200, 0, "node-ahead"), string.Empty, Arg.Any());

        // No update is expected when the remote is behind or when the node appears
        // in only one of the two clocks.
        await confirmationStore.DidNotReceive().UpdateConfirmationAsync(
            "peer-1", "node-behind", Arg.Any(), Arg.Any(), Arg.Any());
        await confirmationStore.DidNotReceive().UpdateConfirmationAsync(
            "peer-1", "node-local-only", Arg.Any(), Arg.Any(), Arg.Any());
        await confirmationStore.DidNotReceive().UpdateConfirmationAsync(
            "peer-1", "node-remote-only", Arg.Any(), Arg.Any(), Arg.Any());
    }

    /// <summary>
    /// Verifies that pushed-batch confirmation uses the maximum timestamp and its matching hash.
    /// </summary>
    [Fact]
    public async Task AdvanceConfirmationForPushedBatchAsync_ShouldUseMaxTimestampAndHash()
    {
        var oplogStore = Substitute.For();
        var confirmationStore = Substitute.For();
        var orchestrator = CreateOrchestrator(oplogStore, confirmationStore);

        // The entry with the largest wall value (120) sits in the middle of the batch, and
        // the last entry has the largest logic value (5) but a smaller wall value (110).
        var pushedChanges = new List
        {
            CreateEntry("source-1", 100, 0, "hash-100"),
            CreateEntry("source-1", 120, 1, "hash-120"),
            CreateEntry("source-1", 110, 5, "hash-110")
        };

        await orchestrator.AdvanceConfirmationForPushedBatchAsync("peer-1", "source-1", pushedChanges, CancellationToken.None);

        // Exactly one update is expected, carrying the (120, 1) timestamp with that entry's hash.
        await confirmationStore.Received(1).UpdateConfirmationAsync(
            "peer-1", "source-1", new HlcTimestamp(120, 1, "source-1"), "hash-120", Arg.Any());
    }

    /// <summary>
    /// Verifies that no confirmation update occurs when a pushed batch is empty.
    /// </summary>
    [Fact]
    public async Task AdvanceConfirmationForPushedBatchAsync_ShouldSkipEmptyBatch()
    {
        var oplogStore = Substitute.For();
        var confirmationStore = Substitute.For();
        var orchestrator = CreateOrchestrator(oplogStore, confirmationStore);

        await orchestrator.AdvanceConfirmationForPushedBatchAsync(
            "peer-1", "source-1", Array.Empty(), CancellationToken.None);

        // No update of any kind, for any peer or source, may have been issued.
        await confirmationStore.DidNotReceive().UpdateConfirmationAsync(
            Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any());
    }

    /// <summary>
    /// Builds a <see cref="SyncOrchestrator"/> for a test. The two stores supplied by the
    /// caller are passed through; every other dependency is a fresh substitute, except the
    /// logger factory, which is <see cref="NullLoggerFactory.Instance"/>.
    /// </summary>
    /// <param name="oplogStore">Oplog store handed to the orchestrator.</param>
    /// <param name="confirmationStore">Confirmation store handed to the orchestrator.</param>
    /// <returns>An orchestrator whose configuration provider reports the node id "local".</returns>
    private static SyncOrchestrator CreateOrchestrator(IOplogStore oplogStore, IPeerOplogConfirmationStore confirmationStore)
    {
        // Discovery reports no active peers.
        var discovery = Substitute.For();
        discovery.GetActivePeers().Returns(Array.Empty());

        // The document store reports an empty InterestedCollection.
        var documentStore = Substitute.For();
        documentStore.InterestedCollection.Returns(Array.Empty());

        var snapshotMetadataStore = Substitute.For();
        var snapshotService = Substitute.For();

        // Matches the "local" node id that the tests pass to the orchestrator methods.
        var configProvider = Substitute.For();
        configProvider.GetConfiguration().Returns(new PeerNodeConfiguration { NodeId = "local" });

        return new SyncOrchestrator(
            discovery,
            oplogStore,
            documentStore,
            snapshotMetadataStore,
            snapshotService,
            configProvider,
            NullLoggerFactory.Instance,
            confirmationStore);
    }

    /// <summary>
    /// Creates a Put <see cref="OplogEntry"/> with a null payload for use in pushed-batch tests.
    /// </summary>
    /// <param name="nodeId">Node id used in the entry's timestamp and as the first part of its second argument.</param>
    /// <param name="wall">First component passed to the <see cref="HlcTimestamp"/> constructor.</param>
    /// <param name="logic">Second component passed to the <see cref="HlcTimestamp"/> constructor.</param>
    /// <param name="hash">Value passed as the entry's <c>hash</c> argument.</param>
    /// <returns>The constructed entry, with an empty <c>previousHash</c>.</returns>
    private static OplogEntry CreateEntry(string nodeId, long wall, int logic, string hash)
    {
        return new OplogEntry(
            "users",
            // Built from node id, wall and logic, so it differs for every entry a test creates.
            $"{nodeId}-{wall}-{logic}",
            OperationType.Put,
            payload: null,
            timestamp: new HlcTimestamp(wall, logic, nodeId),
            previousHash: string.Empty,
            hash: hash);
    }
}