288 lines
14 KiB
C#
Executable File
288 lines
14 KiB
C#
Executable File
using System;
|
|
using System.Collections.Generic;
|
|
using System.IO;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using ZB.MOM.WW.CBDDC.Core;
|
|
using ZB.MOM.WW.CBDDC.Core.Network;
|
|
using ZB.MOM.WW.CBDDC.Core.Storage;
|
|
using ZB.MOM.WW.CBDDC.Network;
|
|
using ZB.MOM.WW.CBDDC.Network.Security;
|
|
using ZB.MOM.WW.CBDDC.Network.Telemetry;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Xunit;
|
|
|
|
namespace ZB.MOM.WW.CBDDC.Network.Tests
|
|
{
|
|
public class SnapshotReconnectRegressionTests
|
|
{
|
|
// Subclass to expose private method
|
|
private class TestableSyncOrchestrator : SyncOrchestrator
|
|
{
|
|
/// <summary>
|
|
/// Initializes a new instance of the <see cref="TestableSyncOrchestrator"/> class.
|
|
/// </summary>
|
|
/// <param name="discovery">The discovery service.</param>
|
|
/// <param name="oplogStore">The oplog store.</param>
|
|
/// <param name="documentStore">The document store.</param>
|
|
/// <param name="snapshotMetadataStore">The snapshot metadata store.</param>
|
|
/// <param name="snapshotService">The snapshot service.</param>
|
|
/// <param name="peerNodeConfigurationProvider">The peer node configuration provider.</param>
|
|
/// <param name="peerOplogConfirmationStore">The peer oplog confirmation store.</param>
|
|
public TestableSyncOrchestrator(
|
|
IDiscoveryService discovery,
|
|
IOplogStore oplogStore,
|
|
IDocumentStore documentStore,
|
|
ISnapshotMetadataStore snapshotMetadataStore,
|
|
ISnapshotService snapshotService,
|
|
IPeerNodeConfigurationProvider peerNodeConfigurationProvider,
|
|
IPeerOplogConfirmationStore peerOplogConfirmationStore)
|
|
: base(
|
|
discovery,
|
|
oplogStore,
|
|
documentStore,
|
|
snapshotMetadataStore,
|
|
snapshotService,
|
|
peerNodeConfigurationProvider,
|
|
NullLoggerFactory.Instance,
|
|
peerOplogConfirmationStore)
|
|
{
|
|
}
|
|
|
|
/// <summary>
|
|
/// Invokes the inbound batch processing path through reflection for regression testing.
|
|
/// </summary>
|
|
/// <param name="client">The peer client.</param>
|
|
/// <param name="peerNodeId">The peer node identifier.</param>
|
|
/// <param name="changes">The incoming oplog changes.</param>
|
|
/// <param name="token">The cancellation token.</param>
|
|
public async Task<string> TestProcessInboundBatchAsync(
|
|
TcpPeerClient client,
|
|
string peerNodeId,
|
|
IList<OplogEntry> changes,
|
|
CancellationToken token)
|
|
{
|
|
// Reflection to invoke private method since it's private not protected
|
|
var method = typeof(SyncOrchestrator).GetMethod(
|
|
"ProcessInboundBatchAsync",
|
|
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
|
|
|
|
if (method == null)
|
|
throw new InvalidOperationException("ProcessInboundBatchAsync method not found.");
|
|
|
|
try
|
|
{
|
|
var task = (Task)method.Invoke(this, new object[] { client, peerNodeId, changes, token })!;
|
|
await task.ConfigureAwait(false);
|
|
|
|
// Access .Result via reflection because generic type is private
|
|
var resultProp = task.GetType().GetProperty("Result");
|
|
var result = resultProp?.GetValue(task);
|
|
|
|
return result?.ToString() ?? "null";
|
|
}
|
|
catch (System.Reflection.TargetInvocationException ex)
|
|
{
|
|
if (ex.InnerException != null) throw ex.InnerException;
|
|
throw;
|
|
}
|
|
}
|
|
}
|
|
|
|
private static ISnapshotMetadataStore CreateSnapshotMetadataStore()
|
|
{
|
|
var snapshotMetadataStore = Substitute.For<ISnapshotMetadataStore>();
|
|
snapshotMetadataStore.GetSnapshotMetadataAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns((SnapshotMetadata?)null);
|
|
snapshotMetadataStore.GetSnapshotHashAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns((string?)null);
|
|
snapshotMetadataStore.GetAllSnapshotMetadataAsync(Arg.Any<CancellationToken>())
|
|
.Returns(Array.Empty<SnapshotMetadata>());
|
|
return snapshotMetadataStore;
|
|
}
|
|
|
|
private static ISnapshotService CreateSnapshotService()
|
|
{
|
|
var snapshotService = Substitute.For<ISnapshotService>();
|
|
snapshotService.CreateSnapshotAsync(Arg.Any<Stream>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
snapshotService.ReplaceDatabaseAsync(Arg.Any<Stream>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
snapshotService.MergeSnapshotAsync(Arg.Any<Stream>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
return snapshotService;
|
|
}
|
|
|
|
private static IDocumentStore CreateDocumentStore()
|
|
{
|
|
var documentStore = Substitute.For<IDocumentStore>();
|
|
documentStore.InterestedCollection.Returns(["Users", "TodoLists"]);
|
|
documentStore.GetDocumentAsync(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns((Document?)null);
|
|
documentStore.GetDocumentsByCollectionAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(Array.Empty<Document>());
|
|
documentStore.GetDocumentsAsync(Arg.Any<List<(string Collection, string Key)>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Array.Empty<Document>());
|
|
documentStore.PutDocumentAsync(Arg.Any<Document>(), Arg.Any<CancellationToken>())
|
|
.Returns(true);
|
|
documentStore.InsertBatchDocumentsAsync(Arg.Any<IEnumerable<Document>>(), Arg.Any<CancellationToken>())
|
|
.Returns(true);
|
|
documentStore.UpdateBatchDocumentsAsync(Arg.Any<IEnumerable<Document>>(), Arg.Any<CancellationToken>())
|
|
.Returns(true);
|
|
documentStore.DeleteDocumentAsync(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(true);
|
|
documentStore.DeleteBatchDocumentsAsync(Arg.Any<IEnumerable<string>>(), Arg.Any<CancellationToken>())
|
|
.Returns(true);
|
|
documentStore.MergeAsync(Arg.Any<Document>(), Arg.Any<CancellationToken>())
|
|
.Returns(ci => ci.ArgAt<Document>(0));
|
|
documentStore.DropAsync(Arg.Any<CancellationToken>()).Returns(Task.CompletedTask);
|
|
documentStore.ExportAsync(Arg.Any<CancellationToken>()).Returns(Array.Empty<Document>());
|
|
documentStore.ImportAsync(Arg.Any<IEnumerable<Document>>(), Arg.Any<CancellationToken>()).Returns(Task.CompletedTask);
|
|
documentStore.MergeAsync(Arg.Any<IEnumerable<Document>>(), Arg.Any<CancellationToken>()).Returns(Task.CompletedTask);
|
|
return documentStore;
|
|
}
|
|
|
|
private static IOplogStore CreateOplogStore(string? localHeadHash)
|
|
{
|
|
var oplogStore = Substitute.For<IOplogStore>();
|
|
oplogStore.GetLastEntryHashAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(localHeadHash);
|
|
oplogStore.ApplyBatchAsync(Arg.Any<IEnumerable<OplogEntry>>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
return oplogStore;
|
|
}
|
|
|
|
private static TcpPeerClient CreateSnapshotRequiredClient()
|
|
{
|
|
var logger = Substitute.For<ILogger<TcpPeerClient>>();
|
|
var client = Substitute.For<TcpPeerClient>(
|
|
"127.0.0.1:0",
|
|
logger,
|
|
(IPeerHandshakeService?)null,
|
|
(INetworkTelemetryService?)null);
|
|
client.GetChainRangeAsync(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(_ => Task.FromException<List<OplogEntry>>(new SnapshotRequiredException()));
|
|
return client;
|
|
}
|
|
|
|
private static IDiscoveryService CreateDiscovery()
|
|
{
|
|
var discovery = Substitute.For<IDiscoveryService>();
|
|
discovery.GetActivePeers().Returns(Array.Empty<PeerNode>());
|
|
discovery.Start().Returns(Task.CompletedTask);
|
|
discovery.Stop().Returns(Task.CompletedTask);
|
|
return discovery;
|
|
}
|
|
|
|
private static IPeerNodeConfigurationProvider CreateConfig()
|
|
{
|
|
var configProvider = Substitute.For<IPeerNodeConfigurationProvider>();
|
|
configProvider.GetConfiguration().Returns(new PeerNodeConfiguration { NodeId = "local" });
|
|
return configProvider;
|
|
}
|
|
|
|
private static IPeerOplogConfirmationStore CreatePeerOplogConfirmationStore()
|
|
{
|
|
var store = Substitute.For<IPeerOplogConfirmationStore>();
|
|
store.EnsurePeerRegisteredAsync(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<PeerType>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
store.UpdateConfirmationAsync(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<HlcTimestamp>(), Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
store.GetConfirmationsAsync(Arg.Any<CancellationToken>()).Returns(Array.Empty<PeerOplogConfirmation>());
|
|
store.GetConfirmationsForPeerAsync(Arg.Any<string>(), Arg.Any<CancellationToken>()).Returns(Array.Empty<PeerOplogConfirmation>());
|
|
store.RemovePeerTrackingAsync(Arg.Any<string>(), Arg.Any<CancellationToken>()).Returns(Task.CompletedTask);
|
|
store.GetActiveTrackedPeersAsync(Arg.Any<CancellationToken>()).Returns(Array.Empty<string>());
|
|
store.ExportAsync(Arg.Any<CancellationToken>()).Returns(Array.Empty<PeerOplogConfirmation>());
|
|
store.ImportAsync(Arg.Any<IEnumerable<PeerOplogConfirmation>>(), Arg.Any<CancellationToken>()).Returns(Task.CompletedTask);
|
|
store.MergeAsync(Arg.Any<IEnumerable<PeerOplogConfirmation>>(), Arg.Any<CancellationToken>()).Returns(Task.CompletedTask);
|
|
return store;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Verifies that gap recovery is skipped when an inbound entry already matches the snapshot boundary hash.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task ProcessInboundBatch_ShouldSkipGapRecovery_WhenEntryMatchesSnapshotBoundary()
|
|
{
|
|
// Arrange
|
|
var oplogStore = CreateOplogStore("snapshot-boundary-hash");
|
|
var snapshotMetadataStore = CreateSnapshotMetadataStore();
|
|
snapshotMetadataStore.GetSnapshotHashAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns("snapshot-boundary-hash");
|
|
var snapshotService = CreateSnapshotService();
|
|
|
|
var orch = new TestableSyncOrchestrator(
|
|
CreateDiscovery(),
|
|
oplogStore,
|
|
CreateDocumentStore(),
|
|
snapshotMetadataStore,
|
|
snapshotService,
|
|
CreateConfig(),
|
|
CreatePeerOplogConfirmationStore());
|
|
|
|
using var client = CreateSnapshotRequiredClient();
|
|
|
|
// Incoming entry that connects to snapshot boundary
|
|
var entries = new List<OplogEntry>
|
|
{
|
|
new OplogEntry(
|
|
"col", "key", OperationType.Put, null,
|
|
new HlcTimestamp(100, 1, "remote-node"),
|
|
"snapshot-boundary-hash" // PreviousHash matches SnapshotHash!
|
|
)
|
|
};
|
|
|
|
// Act
|
|
var result = await orch.TestProcessInboundBatchAsync(client, "remote-node", entries, CancellationToken.None);
|
|
|
|
// Assert
|
|
result.ShouldBe("Success");
|
|
await client.DidNotReceive().GetChainRangeAsync(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<CancellationToken>());
|
|
}
|
|
|
|
/// <summary>
|
|
/// Verifies that gap recovery is attempted when the inbound entry does not match the snapshot boundary hash.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task ProcessInboundBatch_ShouldTryRecovery_WhenSnapshotMismatch()
|
|
{
|
|
// Arrange
|
|
var oplogStore = CreateOplogStore("some-old-hash");
|
|
var snapshotMetadataStore = CreateSnapshotMetadataStore();
|
|
snapshotMetadataStore.GetSnapshotHashAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
|
.Returns("snapshot-boundary-hash");
|
|
var snapshotService = CreateSnapshotService();
|
|
|
|
var orch = new TestableSyncOrchestrator(
|
|
CreateDiscovery(),
|
|
oplogStore,
|
|
CreateDocumentStore(),
|
|
snapshotMetadataStore,
|
|
snapshotService,
|
|
CreateConfig(),
|
|
CreatePeerOplogConfirmationStore());
|
|
using var client = CreateSnapshotRequiredClient();
|
|
|
|
var entries = new List<OplogEntry>
|
|
{
|
|
new OplogEntry(
|
|
"col", "key", OperationType.Put, null,
|
|
new HlcTimestamp(100, 1, "remote-node"),
|
|
"different-hash" // Mismatch!
|
|
)
|
|
};
|
|
|
|
// Act & Assert
|
|
// When gap recovery triggers, the client throws SnapshotRequiredException.
|
|
// SyncOrchestrator catches SnapshotRequiredException and re-throws it to trigger full sync
|
|
// So we expect SnapshotRequiredException to bubble up (wrapped in TargetInvocationException/AggregateException if not unwrapped by helper)
|
|
|
|
await Should.ThrowAsync<SnapshotRequiredException>(async () =>
|
|
await orch.TestProcessInboundBatchAsync(client, "remote-node", entries, CancellationToken.None));
|
|
|
|
await client.Received(1).GetChainRangeAsync(Arg.Any<string>(), Arg.Any<string>(), Arg.Any<CancellationToken>());
|
|
}
|
|
}
|
|
}
|