using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using NSubstitute;
using Shouldly;
using Xunit;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Core.Storage;
namespace ZB.MOM.WW.CBDDC.Network.Tests;
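/// <summary>
/// Exercises vector-clock-driven sync selection between peer oplog stores: pull and push
/// node selection, split-brain reconciliation, equal clocks, and new-node detection.
/// </summary>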
public class VectorClockSyncTests
{
/// <summary>
/// Verifies sync pull selection includes only nodes where the remote clock is ahead.
/// </summary>
[Fact]
public async Task VectorClockSync_ShouldPullOnlyNodesWithUpdates()
{
// Arrange
var (localStore, localVectorClock, _) = CreatePeerStore();
var (remoteStore, remoteVectorClock, remoteOplogEntries) = CreatePeerStore();
// Local knows about node1 and node2
localVectorClock.SetTimestamp("node1", new HlcTimestamp(100, 1, "node1"));
localVectorClock.SetTimestamp("node2", new HlcTimestamp(100, 1, "node2"));
// Remote has updates for node1 only
remoteVectorClock.SetTimestamp("node1", new HlcTimestamp(200, 5, "node1"));
remoteVectorClock.SetTimestamp("node2", new HlcTimestamp(100, 1, "node2"));
// Add oplog entries for node1 in remote
remoteOplogEntries.Add(new OplogEntry(
"users", "user1", OperationType.Put,
JsonSerializer.Deserialize<JsonElement>("{\"name\":\"Alice\"}"),
new HlcTimestamp(150, 2, "node1"), "", "hash1"
));
remoteOplogEntries.Add(new OplogEntry(
"users", "user2", OperationType.Put,
JsonSerializer.Deserialize<JsonElement>("{\"name\":\"Bob\"}"),
new HlcTimestamp(200, 5, "node1"), "hash1", "hash2"
));
// Act
var localVC = await localStore.GetVectorClockAsync(default);
var remoteVC = remoteVectorClock;
var nodesToPull = localVC.GetNodesWithUpdates(remoteVC).ToList();
// Assert
nodesToPull.Count.ShouldBe(1);
nodesToPull.ShouldContain("node1");
// Simulate pull
foreach (var nodeId in nodesToPull)
{
var localTs = localVC.GetTimestamp(nodeId);
var changes = await remoteStore.GetOplogForNodeAfterAsync(nodeId, localTs, default);
changes.Count().ShouldBe(2);
}
}
/// <summary>
/// Verifies sync push selection includes only nodes where the local clock is ahead.
/// </summary>
[Fact]
public async Task VectorClockSync_ShouldPushOnlyNodesWithLocalUpdates()
{
// Arrange
var (localStore, localVectorClock, localOplogEntries) = CreatePeerStore();
var (_, remoteVectorClock, _) = CreatePeerStore();
// Local has updates for node1
localVectorClock.SetTimestamp("node1", new HlcTimestamp(200, 5, "node1"));
localVectorClock.SetTimestamp("node2", new HlcTimestamp(100, 1, "node2"));
// Remote is behind on node1
remoteVectorClock.SetTimestamp("node1", new HlcTimestamp(100, 1, "node1"));
remoteVectorClock.SetTimestamp("node2", new HlcTimestamp(100, 1, "node2"));
// Add oplog entries for node1 in local
localOplogEntries.Add(new OplogEntry(
"users", "user1", OperationType.Put,
JsonSerializer.Deserialize<JsonElement>("{\"name\":\"Charlie\"}"),
new HlcTimestamp(150, 2, "node1"), "", "hash1"
));
// Act
var localVC = localVectorClock;
var remoteVC = remoteVectorClock;
var nodesToPush = localVC.GetNodesToPush(remoteVC).ToList();
// Assert
nodesToPush.Count.ShouldBe(1);
nodesToPush.ShouldContain("node1");
// Simulate push
foreach (var nodeId in nodesToPush)
{
var remoteTs = remoteVC.GetTimestamp(nodeId);
var changes = await localStore.GetOplogForNodeAfterAsync(nodeId, remoteTs, default);
changes.Count().ShouldBe(1);
}
}
/// <summary>
/// Verifies split-brain clocks result in bidirectional synchronization requirements.
/// </summary>
[Fact]
public async Task VectorClockSync_SplitBrain_ShouldSyncBothDirections()
{
// Arrange - Simulating split brain
var (partition1Store, partition1VectorClock, partition1OplogEntries) = CreatePeerStore();
var (partition2Store, partition2VectorClock, partition2OplogEntries) = CreatePeerStore();
// Partition 1 has node1 and node2 updates
partition1VectorClock.SetTimestamp("node1", new HlcTimestamp(300, 5, "node1"));
partition1VectorClock.SetTimestamp("node2", new HlcTimestamp(200, 3, "node2"));
partition1VectorClock.SetTimestamp("node3", new HlcTimestamp(50, 1, "node3"));
// Partition 2 has node3 updates
partition2VectorClock.SetTimestamp("node1", new HlcTimestamp(100, 1, "node1"));
partition2VectorClock.SetTimestamp("node2", new HlcTimestamp(100, 1, "node2"));
partition2VectorClock.SetTimestamp("node3", new HlcTimestamp(400, 8, "node3"));
partition1OplogEntries.Add(new OplogEntry(
"users", "user1", OperationType.Put,
JsonSerializer.Deserialize<JsonElement>("{\"name\":\"P1User\"}"),
new HlcTimestamp(300, 5, "node1"), "", "hash_p1"
));
partition2OplogEntries.Add(new OplogEntry(
"users", "user2", OperationType.Put,
JsonSerializer.Deserialize<JsonElement>("{\"name\":\"P2User\"}"),
new HlcTimestamp(400, 8, "node3"), "", "hash_p2"
));
// Act
var vc1 = partition1VectorClock;
var vc2 = partition2VectorClock;
var relation = vc1.CompareTo(vc2);
var partition1NeedsToPull = vc1.GetNodesWithUpdates(vc2).ToList();
var partition1NeedsToPush = vc1.GetNodesToPush(vc2).ToList();
// Assert
relation.ShouldBe(CausalityRelation.Concurrent);
// Partition 1 needs to pull node3
partition1NeedsToPull.Count.ShouldBe(1);
partition1NeedsToPull.ShouldContain("node3");
// Partition 1 needs to push node1 and node2
partition1NeedsToPush.Count.ShouldBe(2);
partition1NeedsToPush.ShouldContain("node1");
partition1NeedsToPush.ShouldContain("node2");
// Verify data can be synced
var changesToPull = await partition2Store.GetOplogForNodeAfterAsync("node3", vc1.GetTimestamp("node3"), default);
changesToPull.Count().ShouldBe(1);
var changesToPush = await partition1Store.GetOplogForNodeAfterAsync("node1", vc2.GetTimestamp("node1"), default);
changesToPush.Count().ShouldBe(1);
}
/// <summary>
/// Verifies no pull or push is required when vector clocks are equal.
/// </summary>
[Fact]
public void VectorClockSync_EqualClocks_ShouldNotSync()
{
// Arrange
var vc1 = new VectorClock();
vc1.SetTimestamp("node1", new HlcTimestamp(100, 1, "node1"));
vc1.SetTimestamp("node2", new HlcTimestamp(200, 2, "node2"));
var vc2 = new VectorClock();
vc2.SetTimestamp("node1", new HlcTimestamp(100, 1, "node1"));
vc2.SetTimestamp("node2", new HlcTimestamp(200, 2, "node2"));
// Act
var relation = vc1.CompareTo(vc2);
var nodesToPull = vc1.GetNodesWithUpdates(vc2).ToList();
var nodesToPush = vc1.GetNodesToPush(vc2).ToList();
// Assert
relation.ShouldBe(CausalityRelation.Equal);
nodesToPull.ShouldBeEmpty();
nodesToPush.ShouldBeEmpty();
}
/// <summary>
/// Verifies a newly observed node is detected as a required pull source.
/// </summary>
[Fact]
public async Task VectorClockSync_NewNodeJoins_ShouldBeDetected()
{
// Arrange - Simulating a new node joining the cluster
var (_, existingNodeVectorClock, _) = CreatePeerStore();
existingNodeVectorClock.SetTimestamp("node1", new HlcTimestamp(100, 1, "node1"));
existingNodeVectorClock.SetTimestamp("node2", new HlcTimestamp(100, 1, "node2"));
var (newNodeStore, newNodeVectorClock, newNodeOplogEntries) = CreatePeerStore();
newNodeVectorClock.SetTimestamp("node1", new HlcTimestamp(100, 1, "node1"));
newNodeVectorClock.SetTimestamp("node2", new HlcTimestamp(100, 1, "node2"));
newNodeVectorClock.SetTimestamp("node3", new HlcTimestamp(50, 1, "node3")); // New node
newNodeOplogEntries.Add(new OplogEntry(
"users", "user3", OperationType.Put,
JsonSerializer.Deserialize<JsonElement>("{\"name\":\"NewNode\"}"),
new HlcTimestamp(50, 1, "node3"), "", "hash_new"
));
// Act
var existingVC = existingNodeVectorClock;
var newNodeVC = newNodeVectorClock;
var nodesToPull = existingVC.GetNodesWithUpdates(newNodeVC).ToList();
// Assert
nodesToPull.Count.ShouldBe(1);
nodesToPull.ShouldContain("node3");
var changes = await newNodeStore.GetOplogForNodeAfterAsync("node3", existingVC.GetTimestamp("node3"), default);
changes.Count().ShouldBe(1);
}
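/// <summary>
/// Creates an in-memory peer: an <see cref="IOplogStore"/> substitute backed by a shared
/// <see cref="VectorClock"/> and oplog entry list that tests can seed directly. The substituted
/// oplog query filters entries by authoring node, timestamp, and (optionally) collection.
/// </summary>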
private static (IOplogStore Store, VectorClock VectorClock, List<OplogEntry> OplogEntries) CreatePeerStore()
{
var vectorClock = new VectorClock();
var oplogEntries = new List<OplogEntry>();
var store = Substitute.For<IOplogStore>();
store.GetVectorClockAsync(Arg.Any<CancellationToken>())
.Returns(Task.FromResult(vectorClock));
store.GetOplogForNodeAfterAsync(
Arg.Any<string>(),
Arg.Any<HlcTimestamp>(),
Arg.Any<IEnumerable<string>?>(),
Arg.Any<CancellationToken>())
.Returns(callInfo =>
{
var nodeId = callInfo.ArgAt<string>(0);
var since = callInfo.ArgAt<HlcTimestamp>(1);
var collections = callInfo.ArgAt<IEnumerable<string>?>(2)?.ToList();
// Return only entries authored by the requested node that are newer than the caller's timestamp.
IEnumerable<OplogEntry> query = oplogEntries
.Where(e => e.Timestamp.NodeId == nodeId && e.Timestamp.CompareTo(since) > 0);
if (collections is { Count: > 0 })
{
query = query.Where(e => collections.Contains(e.Collection));
}
return Task.FromResult<IReadOnlyList<OplogEntry>>(query.OrderBy(e => e.Timestamp).ToList());
});
return (store, vectorClock, oplogEntries);
}
}