using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Core.Storage;
using ZB.MOM.WW.CBDDC.Network;
using ZB.MOM.WW.CBDDC.Network.Security;
using ZB.MOM.WW.CBDDC.Network.Telemetry;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;
namespace ZB.MOM.WW.CBDDC.Network.Tests
{
public class SnapshotReconnectRegressionTests
{
// Subclass to expose private method
private class TestableSyncOrchestrator : SyncOrchestrator
{
///
/// Initializes a new instance of the class.
///
/// The discovery service.
/// The oplog store.
/// The document store.
/// The snapshot metadata store.
/// The snapshot service.
/// The peer node configuration provider.
/// The peer oplog confirmation store.
public TestableSyncOrchestrator(
IDiscoveryService discovery,
IOplogStore oplogStore,
IDocumentStore documentStore,
ISnapshotMetadataStore snapshotMetadataStore,
ISnapshotService snapshotService,
IPeerNodeConfigurationProvider peerNodeConfigurationProvider,
IPeerOplogConfirmationStore peerOplogConfirmationStore)
: base(
discovery,
oplogStore,
documentStore,
snapshotMetadataStore,
snapshotService,
peerNodeConfigurationProvider,
NullLoggerFactory.Instance,
peerOplogConfirmationStore)
{
}
///
/// Invokes the inbound batch processing path through reflection for regression testing.
///
/// The peer client.
/// The peer node identifier.
/// The incoming oplog changes.
/// The cancellation token.
public async Task TestProcessInboundBatchAsync(
TcpPeerClient client,
string peerNodeId,
IList changes,
CancellationToken token)
{
// Reflection to invoke private method since it's private not protected
var method = typeof(SyncOrchestrator).GetMethod(
"ProcessInboundBatchAsync",
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
if (method == null)
throw new InvalidOperationException("ProcessInboundBatchAsync method not found.");
try
{
var task = (Task)method.Invoke(this, new object[] { client, peerNodeId, changes, token })!;
await task.ConfigureAwait(false);
// Access .Result via reflection because generic type is private
var resultProp = task.GetType().GetProperty("Result");
var result = resultProp?.GetValue(task);
return result?.ToString() ?? "null";
}
catch (System.Reflection.TargetInvocationException ex)
{
if (ex.InnerException != null) throw ex.InnerException;
throw;
}
}
}
private static ISnapshotMetadataStore CreateSnapshotMetadataStore()
{
var snapshotMetadataStore = Substitute.For();
snapshotMetadataStore.GetSnapshotMetadataAsync(Arg.Any(), Arg.Any())
.Returns((SnapshotMetadata?)null);
snapshotMetadataStore.GetSnapshotHashAsync(Arg.Any(), Arg.Any())
.Returns((string?)null);
snapshotMetadataStore.GetAllSnapshotMetadataAsync(Arg.Any())
.Returns(Array.Empty());
return snapshotMetadataStore;
}
private static ISnapshotService CreateSnapshotService()
{
var snapshotService = Substitute.For();
snapshotService.CreateSnapshotAsync(Arg.Any(), Arg.Any())
.Returns(Task.CompletedTask);
snapshotService.ReplaceDatabaseAsync(Arg.Any(), Arg.Any())
.Returns(Task.CompletedTask);
snapshotService.MergeSnapshotAsync(Arg.Any(), Arg.Any())
.Returns(Task.CompletedTask);
return snapshotService;
}
private static IDocumentStore CreateDocumentStore()
{
var documentStore = Substitute.For();
documentStore.InterestedCollection.Returns(["Users", "TodoLists"]);
documentStore.GetDocumentAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns((Document?)null);
documentStore.GetDocumentsByCollectionAsync(Arg.Any(), Arg.Any())
.Returns(Array.Empty());
documentStore.GetDocumentsAsync(Arg.Any>(), Arg.Any())
.Returns(Array.Empty());
documentStore.PutDocumentAsync(Arg.Any(), Arg.Any())
.Returns(true);
documentStore.InsertBatchDocumentsAsync(Arg.Any>(), Arg.Any())
.Returns(true);
documentStore.UpdateBatchDocumentsAsync(Arg.Any>(), Arg.Any())
.Returns(true);
documentStore.DeleteDocumentAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(true);
documentStore.DeleteBatchDocumentsAsync(Arg.Any>(), Arg.Any())
.Returns(true);
documentStore.MergeAsync(Arg.Any(), Arg.Any())
.Returns(ci => ci.ArgAt(0));
documentStore.DropAsync(Arg.Any()).Returns(Task.CompletedTask);
documentStore.ExportAsync(Arg.Any()).Returns(Array.Empty());
documentStore.ImportAsync(Arg.Any>(), Arg.Any()).Returns(Task.CompletedTask);
documentStore.MergeAsync(Arg.Any>(), Arg.Any()).Returns(Task.CompletedTask);
return documentStore;
}
private static IOplogStore CreateOplogStore(string? localHeadHash)
{
var oplogStore = Substitute.For();
oplogStore.GetLastEntryHashAsync(Arg.Any(), Arg.Any())
.Returns(localHeadHash);
oplogStore.ApplyBatchAsync(Arg.Any>(), Arg.Any())
.Returns(Task.CompletedTask);
return oplogStore;
}
private static TcpPeerClient CreateSnapshotRequiredClient()
{
var logger = Substitute.For>();
var client = Substitute.For(
"127.0.0.1:0",
logger,
(IPeerHandshakeService?)null,
(INetworkTelemetryService?)null);
client.GetChainRangeAsync(Arg.Any(), Arg.Any(), Arg.Any())
.Returns(_ => Task.FromException>(new SnapshotRequiredException()));
return client;
}
private static IDiscoveryService CreateDiscovery()
{
var discovery = Substitute.For();
discovery.GetActivePeers().Returns(Array.Empty());
discovery.Start().Returns(Task.CompletedTask);
discovery.Stop().Returns(Task.CompletedTask);
return discovery;
}
private static IPeerNodeConfigurationProvider CreateConfig()
{
var configProvider = Substitute.For();
configProvider.GetConfiguration().Returns(new PeerNodeConfiguration { NodeId = "local" });
return configProvider;
}
private static IPeerOplogConfirmationStore CreatePeerOplogConfirmationStore()
{
var store = Substitute.For();
store.EnsurePeerRegisteredAsync(Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any())
.Returns(Task.CompletedTask);
store.UpdateConfirmationAsync(Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any())
.Returns(Task.CompletedTask);
store.GetConfirmationsAsync(Arg.Any()).Returns(Array.Empty());
store.GetConfirmationsForPeerAsync(Arg.Any(), Arg.Any()).Returns(Array.Empty());
store.RemovePeerTrackingAsync(Arg.Any(), Arg.Any()).Returns(Task.CompletedTask);
store.GetActiveTrackedPeersAsync(Arg.Any()).Returns(Array.Empty());
store.ExportAsync(Arg.Any()).Returns(Array.Empty());
store.ImportAsync(Arg.Any>(), Arg.Any()).Returns(Task.CompletedTask);
store.MergeAsync(Arg.Any>(), Arg.Any()).Returns(Task.CompletedTask);
return store;
}
///
/// Verifies that gap recovery is skipped when an inbound entry already matches the snapshot boundary hash.
///
[Fact]
public async Task ProcessInboundBatch_ShouldSkipGapRecovery_WhenEntryMatchesSnapshotBoundary()
{
// Arrange
var oplogStore = CreateOplogStore("snapshot-boundary-hash");
var snapshotMetadataStore = CreateSnapshotMetadataStore();
snapshotMetadataStore.GetSnapshotHashAsync(Arg.Any(), Arg.Any())
.Returns("snapshot-boundary-hash");
var snapshotService = CreateSnapshotService();
var orch = new TestableSyncOrchestrator(
CreateDiscovery(),
oplogStore,
CreateDocumentStore(),
snapshotMetadataStore,
snapshotService,
CreateConfig(),
CreatePeerOplogConfirmationStore());
using var client = CreateSnapshotRequiredClient();
// Incoming entry that connects to snapshot boundary
var entries = new List
{
new OplogEntry(
"col", "key", OperationType.Put, null,
new HlcTimestamp(100, 1, "remote-node"),
"snapshot-boundary-hash" // PreviousHash matches SnapshotHash!
)
};
// Act
var result = await orch.TestProcessInboundBatchAsync(client, "remote-node", entries, CancellationToken.None);
// Assert
result.ShouldBe("Success");
await client.DidNotReceive().GetChainRangeAsync(Arg.Any(), Arg.Any(), Arg.Any());
}
///
/// Verifies that gap recovery is attempted when the inbound entry does not match the snapshot boundary hash.
///
[Fact]
public async Task ProcessInboundBatch_ShouldTryRecovery_WhenSnapshotMismatch()
{
// Arrange
var oplogStore = CreateOplogStore("some-old-hash");
var snapshotMetadataStore = CreateSnapshotMetadataStore();
snapshotMetadataStore.GetSnapshotHashAsync(Arg.Any(), Arg.Any())
.Returns("snapshot-boundary-hash");
var snapshotService = CreateSnapshotService();
var orch = new TestableSyncOrchestrator(
CreateDiscovery(),
oplogStore,
CreateDocumentStore(),
snapshotMetadataStore,
snapshotService,
CreateConfig(),
CreatePeerOplogConfirmationStore());
using var client = CreateSnapshotRequiredClient();
var entries = new List
{
new OplogEntry(
"col", "key", OperationType.Put, null,
new HlcTimestamp(100, 1, "remote-node"),
"different-hash" // Mismatch!
)
};
// Act & Assert
// When gap recovery triggers, the client throws SnapshotRequiredException.
// SyncOrchestrator catches SnapshotRequiredException and re-throws it to trigger full sync
// So we expect SnapshotRequiredException to bubble up (wrapped in TargetInvocationException/AggregateException if not unwrapped by helper)
await Should.ThrowAsync(async () =>
await orch.TestProcessInboundBatchAsync(client, "remote-node", entries, CancellationToken.None));
await client.Received(1).GetChainRangeAsync(Arg.Any(), Arg.Any(), Arg.Any());
}
}
}