Reformat/cleanup
All checks were successful
NuGet Package Publish / nuget (push) Successful in 1m10s

This commit is contained in:
Joseph Doherty
2026-02-21 07:53:53 -05:00
parent c6f6d9329a
commit 7ebc2cb567
160 changed files with 7258 additions and 7262 deletions

View File

@@ -1,6 +1,6 @@
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Collections.Concurrent;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -16,24 +16,20 @@ namespace ZB.MOM.WW.CBDDC.E2E.Tests;
public class ClusterCrudSyncE2ETests
{
/// <summary>
/// Verifies two real peers replicate create, update, and delete operations in both directions.
/// Verifies two real peers replicate create, update, and delete operations in both directions.
/// </summary>
[Fact]
public async Task TwoPeers_ShouldReplicateCrudBidirectionally()
{
var clusterToken = Guid.NewGuid().ToString("N");
var nodeAPort = GetAvailableTcpPort();
var nodeBPort = GetAvailableTcpPort();
while (nodeBPort == nodeAPort)
{
nodeBPort = GetAvailableTcpPort();
}
int nodeAPort = GetAvailableTcpPort();
int nodeBPort = GetAvailableTcpPort();
while (nodeBPort == nodeAPort) nodeBPort = GetAvailableTcpPort();
await using var nodeA = TestPeerNode.Create(
nodeId: "node-a",
tcpPort: nodeAPort,
authToken: clusterToken,
knownPeers:
"node-a",
nodeAPort,
clusterToken,
[
new KnownPeerConfiguration
{
@@ -44,10 +40,9 @@ public class ClusterCrudSyncE2ETests
]);
await using var nodeB = TestPeerNode.Create(
nodeId: "node-b",
tcpPort: nodeBPort,
authToken: clusterToken,
knownPeers:
"node-b",
nodeBPort,
clusterToken,
[
new KnownPeerConfiguration
{
@@ -75,9 +70,9 @@ public class ClusterCrudSyncE2ETests
{
var replicated = nodeB.ReadUser(nodeAUserId);
return replicated is not null
&& replicated.Name == "Alice"
&& replicated.Age == 30
&& replicated.Address?.City == "Austin";
&& replicated.Name == "Alice"
&& replicated.Age == 30
&& replicated.Address?.City == "Austin";
}, timeoutSeconds, "Node B did not receive create from node A.", () => BuildDiagnostics(nodeA, nodeB));
await AssertEventuallyAsync(
@@ -100,9 +95,9 @@ public class ClusterCrudSyncE2ETests
{
var replicated = nodeB.ReadUser(nodeAUserId);
return replicated is not null
&& replicated.Name == "Alice Updated"
&& replicated.Age == 31
&& replicated.Address?.City == "Dallas";
&& replicated.Name == "Alice Updated"
&& replicated.Age == 31
&& replicated.Address?.City == "Dallas";
}, timeoutSeconds, "Node B did not receive update from node A.", () => BuildDiagnostics(nodeA, nodeB));
await nodeA.DeleteUserAsync(nodeAUserId);
@@ -126,9 +121,9 @@ public class ClusterCrudSyncE2ETests
{
var replicated = nodeA.ReadUser(nodeBUserId);
return replicated is not null
&& replicated.Name == "Bob"
&& replicated.Age == 40
&& replicated.Address?.City == "Boston";
&& replicated.Name == "Bob"
&& replicated.Age == 40
&& replicated.Address?.City == "Boston";
}, timeoutSeconds, "Node A did not receive create from node B.", () => BuildDiagnostics(nodeA, nodeB));
await AssertEventuallyAsync(
@@ -151,9 +146,9 @@ public class ClusterCrudSyncE2ETests
{
var replicated = nodeA.ReadUser(nodeBUserId);
return replicated is not null
&& replicated.Name == "Bob Updated"
&& replicated.Age == 41
&& replicated.Address?.City == "Denver";
&& replicated.Name == "Bob Updated"
&& replicated.Age == 41
&& replicated.Address?.City == "Denver";
}, timeoutSeconds, "Node A did not receive update from node B.", () => BuildDiagnostics(nodeA, nodeB));
await nodeB.DeleteUserAsync(nodeBUserId);
@@ -175,36 +170,35 @@ public class ClusterCrudSyncE2ETests
var startedAt = DateTime.UtcNow;
while (DateTime.UtcNow - startedAt < timeout)
{
if (predicate())
{
return;
}
if (predicate()) return;
await Task.Delay(250);
}
var suffix = diagnostics is null ? string.Empty : $"{Environment.NewLine}{diagnostics()}";
throw new Shouldly.ShouldAssertException($"{failureMessage}{suffix}");
string suffix = diagnostics is null ? string.Empty : $"{Environment.NewLine}{diagnostics()}";
throw new ShouldAssertException($"{failureMessage}{suffix}");
}
private static string BuildDiagnostics(TestPeerNode nodeA, TestPeerNode nodeB)
{
var nodeAUserCount = nodeA.Context.Users.FindAll().Count();
var nodeBUserCount = nodeB.Context.Users.FindAll().Count();
var nodeAOplogCount = nodeA.Context.OplogEntries.FindAll().Count();
var nodeBOplogCount = nodeB.Context.OplogEntries.FindAll().Count();
var nodeAOplogByAuthor = string.Join(
int nodeAUserCount = nodeA.Context.Users.FindAll().Count();
int nodeBUserCount = nodeB.Context.Users.FindAll().Count();
int nodeAOplogCount = nodeA.Context.OplogEntries.FindAll().Count();
int nodeBOplogCount = nodeB.Context.OplogEntries.FindAll().Count();
string nodeAOplogByAuthor = string.Join(
", ",
nodeA.Context.OplogEntries.FindAll()
.GroupBy(e => e.TimestampNodeId)
.Select(g => $"{g.Key}:{g.Count()}"));
var nodeBOplogByAuthor = string.Join(
string nodeBOplogByAuthor = string.Join(
", ",
nodeB.Context.OplogEntries.FindAll()
.GroupBy(e => e.TimestampNodeId)
.Select(g => $"{g.Key}:{g.Count()}"));
var nodeAUsers = string.Join(", ", nodeA.Context.Users.FindAll().Select(u => $"{u.Id}:{u.Name}:{u.Age}:{u.Address?.City}"));
var nodeBUsers = string.Join(", ", nodeB.Context.Users.FindAll().Select(u => $"{u.Id}:{u.Name}:{u.Age}:{u.Address?.City}"));
string nodeAUsers = string.Join(", ",
nodeA.Context.Users.FindAll().Select(u => $"{u.Id}:{u.Name}:{u.Age}:{u.Address?.City}"));
string nodeBUsers = string.Join(", ",
nodeB.Context.Users.FindAll().Select(u => $"{u.Id}:{u.Name}:{u.Age}:{u.Address?.City}"));
return string.Join(
Environment.NewLine,
@@ -230,20 +224,15 @@ public class ClusterCrudSyncE2ETests
private sealed class TestPeerNode : IAsyncDisposable
{
private readonly ServiceProvider _services;
private readonly ICBDDCNode _node;
private readonly IOplogStore _oplogStore;
private readonly string _nodeId;
private readonly string _workDir;
private readonly InMemoryLogSink _logSink;
private bool _started;
private readonly ICBDDCNode _node;
private readonly string _nodeId;
private readonly IOplogStore _oplogStore;
private readonly ServiceProvider _services;
private readonly string _workDir;
private long _lastPhysicalTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
private int _logicalCounter;
/// <summary>
/// Gets the LiteDB-backed context used by this test peer.
/// </summary>
public SampleDbContext Context { get; }
private bool _started;
private TestPeerNode(
ServiceProvider services,
@@ -264,82 +253,9 @@ public class ClusterCrudSyncE2ETests
}
/// <summary>
/// Creates a test peer node and wires all required services.
/// Gets the LiteDB-backed context used by this test peer.
/// </summary>
/// <param name="nodeId">The unique node identifier.</param>
/// <param name="tcpPort">The TCP port used by the node listener.</param>
/// <param name="authToken">The cluster authentication token.</param>
/// <param name="knownPeers">The known peers this node can connect to.</param>
/// <returns>A configured <see cref="TestPeerNode"/> instance.</returns>
public static TestPeerNode Create(
string nodeId,
int tcpPort,
string authToken,
IReadOnlyList<KnownPeerConfiguration> knownPeers)
{
var workDir = Path.Combine(Path.GetTempPath(), $"cbddc-e2e-{nodeId}-{Guid.NewGuid():N}");
Directory.CreateDirectory(workDir);
var dbPath = Path.Combine(workDir, "node.blite");
var configProvider = new StaticPeerNodeConfigurationProvider(new PeerNodeConfiguration
{
NodeId = nodeId,
TcpPort = tcpPort,
AuthToken = authToken,
KnownPeers = knownPeers.ToList()
});
var services = new ServiceCollection();
services.AddSingleton(new InMemoryLogSink(nodeId));
services.AddSingleton<ILoggerProvider, InMemoryLoggerProvider>();
services.AddLogging(builder => builder.SetMinimumLevel(LogLevel.Debug));
services.AddSingleton(configProvider);
services.AddSingleton<IPeerNodeConfigurationProvider>(configProvider);
services.AddCBDDCCore()
.AddCBDDCBLite<SampleDbContext, SampleDocumentStore>(_ => new SampleDbContext(dbPath))
.AddCBDDCNetwork<StaticPeerNodeConfigurationProvider>(useHostedService: false);
// Deterministic tests: sync uses explicit known peers, so disable UDP discovery.
services.AddSingleton<IDiscoveryService, PassiveDiscoveryService>();
services.AddSingleton<IPeerHandshakeService, NoOpHandshakeService>();
var provider = services.BuildServiceProvider();
var node = provider.GetRequiredService<ICBDDCNode>();
var oplogStore = provider.GetRequiredService<IOplogStore>();
var context = provider.GetRequiredService<SampleDbContext>();
var logSink = provider.GetRequiredService<InMemoryLogSink>();
return new TestPeerNode(provider, node, oplogStore, context, logSink, workDir, nodeId);
}
/// <summary>
/// Starts the underlying node when it has not been started yet.
/// </summary>
/// <returns>A task that represents the asynchronous operation.</returns>
public async Task StartAsync()
{
if (_started)
{
return;
}
await _node.Start();
_started = true;
}
/// <summary>
/// Stops the underlying node when it is currently running.
/// </summary>
/// <returns>A task that represents the asynchronous operation.</returns>
public async Task StopAsync()
{
if (!_started)
{
return;
}
await _node.Stop();
_started = false;
}
public SampleDbContext Context { get; }
/// <inheritdoc />
public async ValueTask DisposeAsync()
@@ -357,17 +273,89 @@ public class ClusterCrudSyncE2ETests
}
/// <summary>
/// Reads a user document by identifier.
/// Creates a test peer node and wires all required services.
/// </summary>
/// <param name="nodeId">The unique node identifier.</param>
/// <param name="tcpPort">The TCP port used by the node listener.</param>
/// <param name="authToken">The cluster authentication token.</param>
/// <param name="knownPeers">The known peers this node can connect to.</param>
/// <returns>A configured <see cref="TestPeerNode" /> instance.</returns>
public static TestPeerNode Create(
string nodeId,
int tcpPort,
string authToken,
IReadOnlyList<KnownPeerConfiguration> knownPeers)
{
string workDir = Path.Combine(Path.GetTempPath(), $"cbddc-e2e-{nodeId}-{Guid.NewGuid():N}");
Directory.CreateDirectory(workDir);
string dbPath = Path.Combine(workDir, "node.blite");
var configProvider = new StaticPeerNodeConfigurationProvider(new PeerNodeConfiguration
{
NodeId = nodeId,
TcpPort = tcpPort,
AuthToken = authToken,
KnownPeers = knownPeers.ToList()
});
var services = new ServiceCollection();
services.AddSingleton(new InMemoryLogSink(nodeId));
services.AddSingleton<ILoggerProvider, InMemoryLoggerProvider>();
services.AddLogging(builder => builder.SetMinimumLevel(LogLevel.Debug));
services.AddSingleton(configProvider);
services.AddSingleton<IPeerNodeConfigurationProvider>(configProvider);
services.AddCBDDCCore()
.AddCBDDCBLite<SampleDbContext, SampleDocumentStore>(_ => new SampleDbContext(dbPath))
.AddCBDDCNetwork<StaticPeerNodeConfigurationProvider>(false);
// Deterministic tests: sync uses explicit known peers, so disable UDP discovery.
services.AddSingleton<IDiscoveryService, PassiveDiscoveryService>();
services.AddSingleton<IPeerHandshakeService, NoOpHandshakeService>();
var provider = services.BuildServiceProvider();
var node = provider.GetRequiredService<ICBDDCNode>();
var oplogStore = provider.GetRequiredService<IOplogStore>();
var context = provider.GetRequiredService<SampleDbContext>();
var logSink = provider.GetRequiredService<InMemoryLogSink>();
return new TestPeerNode(provider, node, oplogStore, context, logSink, workDir, nodeId);
}
/// <summary>
/// Starts the underlying node when it has not been started yet.
/// </summary>
/// <returns>A task that represents the asynchronous operation.</returns>
public async Task StartAsync()
{
if (_started) return;
await _node.Start();
_started = true;
}
/// <summary>
/// Stops the underlying node when it is currently running.
/// </summary>
/// <returns>A task that represents the asynchronous operation.</returns>
public async Task StopAsync()
{
if (!_started) return;
await _node.Stop();
_started = false;
}
/// <summary>
/// Reads a user document by identifier.
/// </summary>
/// <param name="userId">The identifier of the user to read.</param>
/// <returns>The matching user when found; otherwise <see langword="null"/>.</returns>
/// <returns>The matching user when found; otherwise <see langword="null" />.</returns>
public User? ReadUser(string userId)
{
return Context.Users.Find(u => u.Id == userId).FirstOrDefault();
}
/// <summary>
/// Inserts or updates a user and persists the matching oplog entry.
/// Inserts or updates a user and persists the matching oplog entry.
/// </summary>
/// <param name="user">The user payload to upsert.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
@@ -381,20 +369,16 @@ public class ClusterCrudSyncE2ETests
{
var existing = Context.Users.Find(u => u.Id == user.Id).FirstOrDefault();
if (existing == null)
{
await Context.Users.InsertAsync(user);
}
else
{
await Context.Users.UpdateAsync(user);
}
await Context.SaveChangesAsync();
});
}
/// <summary>
/// Deletes a user and persists the matching oplog entry.
/// Deletes a user and persists the matching oplog entry.
/// </summary>
/// <param name="userId">The identifier of the user to delete.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
@@ -403,7 +387,7 @@ public class ClusterCrudSyncE2ETests
await PersistUserMutationWithOplogFallbackAsync(
userId,
OperationType.Delete,
payload: null,
null,
async () =>
{
await Context.Users.DeleteAsync(userId);
@@ -412,7 +396,7 @@ public class ClusterCrudSyncE2ETests
}
/// <summary>
/// Gets recent in-memory logs captured for this node.
/// Gets recent in-memory logs captured for this node.
/// </summary>
/// <param name="max">The maximum number of log entries to return.</param>
/// <returns>A newline-delimited string of recent log entries.</returns>
@@ -427,29 +411,26 @@ public class ClusterCrudSyncE2ETests
JsonElement? payload,
Func<Task> mutation)
{
var oplogCountBefore = Context.OplogEntries.FindAll().Count();
int oplogCountBefore = Context.OplogEntries.FindAll().Count();
await mutation();
// Prefer native CDC path; fallback only when CDC fails to emit.
var deadline = DateTime.UtcNow.AddSeconds(3);
while (DateTime.UtcNow < deadline)
{
if (Context.OplogEntries.FindAll().Count() > oplogCountBefore)
{
return;
}
if (Context.OplogEntries.FindAll().Count() > oplogCountBefore) return;
await Task.Delay(50);
}
var previousHash = await _oplogStore.GetLastEntryHashAsync(_nodeId) ?? string.Empty;
string previousHash = await _oplogStore.GetLastEntryHashAsync(_nodeId) ?? string.Empty;
var fallbackEntry = new OplogEntry(
collection: "Users",
key: userId,
operation: operationType,
payload: payload,
timestamp: NextTimestamp(),
previousHash: previousHash);
"Users",
userId,
operationType,
payload,
NextTimestamp(),
previousHash);
await _oplogStore.AppendOplogEntryAsync(fallbackEntry);
await Context.SaveChangesAsync();
@@ -457,7 +438,7 @@ public class ClusterCrudSyncE2ETests
private HlcTimestamp NextTimestamp()
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (now > _lastPhysicalTime)
{
_lastPhysicalTime = now;
@@ -473,14 +454,11 @@ public class ClusterCrudSyncE2ETests
private static void TryDeleteDirectory(string path)
{
if (!Directory.Exists(path))
{
return;
}
if (!Directory.Exists(path)) return;
try
{
Directory.Delete(path, recursive: true);
Directory.Delete(path, true);
}
catch
{
@@ -514,7 +492,7 @@ public class ClusterCrudSyncE2ETests
private PeerNodeConfiguration _configuration;
/// <summary>
/// Initializes a new instance of the <see cref="StaticPeerNodeConfigurationProvider"/> class.
/// Initializes a new instance of the <see cref="StaticPeerNodeConfigurationProvider" /> class.
/// </summary>
/// <param name="configuration">The initial peer node configuration.</param>
public StaticPeerNodeConfigurationProvider(PeerNodeConfiguration configuration)
@@ -545,7 +523,7 @@ public class ClusterCrudSyncE2ETests
private readonly string _nodeId;
/// <summary>
/// Initializes a new instance of the <see cref="InMemoryLogSink"/> class.
/// Initializes a new instance of the <see cref="InMemoryLogSink" /> class.
/// </summary>
/// <param name="nodeId">The node identifier associated with emitted logs.</param>
public InMemoryLogSink(string nodeId)
@@ -554,7 +532,7 @@ public class ClusterCrudSyncE2ETests
}
/// <summary>
/// Adds a log entry to the in-memory sink.
/// Adds a log entry to the in-memory sink.
/// </summary>
/// <param name="category">The log category.</param>
/// <param name="level">The log level.</param>
@@ -563,10 +541,7 @@ public class ClusterCrudSyncE2ETests
public void Add(string category, LogLevel level, string message, Exception? exception)
{
var text = $"[{DateTime.UtcNow:O}] {_nodeId} {level} {category}: {message}";
if (exception is not null)
{
text = $"{text}{Environment.NewLine}{exception}";
}
if (exception is not null) text = $"{text}{Environment.NewLine}{exception}";
_entries.Enqueue(text);
while (_entries.Count > 500 && _entries.TryDequeue(out _))
@@ -575,17 +550,14 @@ public class ClusterCrudSyncE2ETests
}
/// <summary>
/// Gets the most recent log entries from the sink.
/// Gets the most recent log entries from the sink.
/// </summary>
/// <param name="max">The maximum number of entries to return.</param>
/// <returns>A newline-delimited string of recent log entries, or a placeholder when none exist.</returns>
public string GetRecent(int max)
{
var entries = _entries.ToArray();
if (entries.Length == 0)
{
return "<no logs>";
}
string[] entries = _entries.ToArray();
if (entries.Length == 0) return "<no logs>";
return string.Join(Environment.NewLine, entries.TakeLast(max));
}
@@ -596,7 +568,7 @@ public class ClusterCrudSyncE2ETests
private readonly InMemoryLogSink _sink;
/// <summary>
/// Initializes a new instance of the <see cref="InMemoryLoggerProvider"/> class.
/// Initializes a new instance of the <see cref="InMemoryLoggerProvider" /> class.
/// </summary>
/// <param name="sink">The shared sink used to capture log messages.</param>
public InMemoryLoggerProvider(InMemoryLogSink sink)
@@ -622,7 +594,7 @@ public class ClusterCrudSyncE2ETests
private readonly InMemoryLogSink _sink;
/// <summary>
/// Initializes a new instance of the <see cref="InMemoryLogger"/> class.
/// Initializes a new instance of the <see cref="InMemoryLogger" /> class.
/// </summary>
/// <param name="categoryName">The logger category name.</param>
/// <param name="sink">The sink that stores emitted log messages.</param>
@@ -665,4 +637,4 @@ public class ClusterCrudSyncE2ETests
{
}
}
}
}

View File

@@ -1,2 +1,2 @@
global using Shouldly;
global using ZB.MOM.WW.CBDDC.Sample.Console;
global using ZB.MOM.WW.CBDDC.Sample.Console;

View File

@@ -1,33 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<AssemblyName>ZB.MOM.WW.CBDDC.E2E.Tests</AssemblyName>
<RootNamespace>ZB.MOM.WW.CBDDC.E2E.Tests</RootNamespace>
<PackageId>ZB.MOM.WW.CBDDC.E2E.Tests</PackageId>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<NoWarn>$(NoWarn);xUnit1031;xUnit1051</NoWarn>
<IsPackable>false</IsPackable>
</PropertyGroup>
<PropertyGroup>
<AssemblyName>ZB.MOM.WW.CBDDC.E2E.Tests</AssemblyName>
<RootNamespace>ZB.MOM.WW.CBDDC.E2E.Tests</RootNamespace>
<PackageId>ZB.MOM.WW.CBDDC.E2E.Tests</PackageId>
<TargetFramework>net10.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<NoWarn>$(NoWarn);xUnit1031;xUnit1051</NoWarn>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.4" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageReference Include="Shouldly" Version="4.3.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.4" />
<PackageReference Include="xunit.v3" Version="3.2.0" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.4"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1"/>
<PackageReference Include="Shouldly" Version="4.3.0"/>
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.4"/>
<PackageReference Include="xunit.v3" Version="3.2.0"/>
</ItemGroup>
<ItemGroup>
<Using Include="Xunit" />
</ItemGroup>
<ItemGroup>
<Using Include="Xunit"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\samples\ZB.MOM.WW.CBDDC.Sample.Console\ZB.MOM.WW.CBDDC.Sample.Console.csproj" />
<ProjectReference Include="..\..\src\ZB.MOM.WW.CBDDC.Core\ZB.MOM.WW.CBDDC.Core.csproj" />
<ProjectReference Include="..\..\src\ZB.MOM.WW.CBDDC.Network\ZB.MOM.WW.CBDDC.Network.csproj" />
<ProjectReference Include="..\..\src\ZB.MOM.WW.CBDDC.Persistence\ZB.MOM.WW.CBDDC.Persistence.csproj" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\samples\ZB.MOM.WW.CBDDC.Sample.Console\ZB.MOM.WW.CBDDC.Sample.Console.csproj"/>
<ProjectReference Include="..\..\src\ZB.MOM.WW.CBDDC.Core\ZB.MOM.WW.CBDDC.Core.csproj"/>
<ProjectReference Include="..\..\src\ZB.MOM.WW.CBDDC.Network\ZB.MOM.WW.CBDDC.Network.csproj"/>
<ProjectReference Include="..\..\src\ZB.MOM.WW.CBDDC.Persistence\ZB.MOM.WW.CBDDC.Persistence.csproj"/>
</ItemGroup>
</Project>