Files
CBDDC/tests/ZB.MOM.WW.CBDDC.Sample.Console.Tests/SurrealCdcDurabilityTests.cs
Joseph Doherty bd10914828
All checks were successful
NuGet Package Publish / nuget (push) Successful in 1m17s
Harden Surreal migration with retry/coverage fixes and XML docs cleanup
2026-02-22 05:39:00 -05:00

645 lines
24 KiB
C#

using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Core.Storage;
using ZB.MOM.WW.CBDDC.Core.Sync;
using ZB.MOM.WW.CBDDC.Persistence;
using ZB.MOM.WW.CBDDC.Persistence.Surreal;
namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests;
[Collection("SurrealCdcDurability")]
public class SurrealCdcDurabilityTests
{
    /// <summary>
    /// Verifies checkpoints persist latest local changes per consumer across restarts.
    /// </summary>
    [Fact]
    public async Task CheckpointPersistence_ShouldTrackLatestLocalChange_AndPersistPerConsumer()
    {
        string dbPath = CreateTemporaryDatabasePath();
        const string nodeId = "node-checkpoint";
        const string defaultConsumer = "consumer-default";
        const string secondaryConsumer = "consumer-secondary";
        try
        {
            HlcTimestamp expectedTimestamp = default;
            string expectedHash = "";
            DateTimeOffset previousUpdatedUtc = DateTimeOffset.MinValue;
            await using (var harness = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, defaultConsumer))
            {
                var user = CreateUser("checkpoint-user", "Alice", 30, "Austin");
                await harness.Context.Users.InsertAsync(user);
                await harness.Context.SaveChangesAsync();
                await harness.PollAsync();

                user.Age = 31;
                user.Address = new Address { City = "Dallas" };
                await harness.Context.Users.UpdateAsync(user);
                await harness.Context.SaveChangesAsync();
                await harness.PollAsync();

                await WaitForConditionAsync(
                    async () => (await harness.GetEntriesByKeyAsync("Users", "checkpoint-user")).Count >= 2,
                    "Timed out waiting for checkpoint-user oplog entries.");
                var entries = await harness.GetEntriesByKeyAsync("Users", "checkpoint-user");
                entries.Count.ShouldBe(2);
                // The default consumer's checkpoint should track the latest entry.
                expectedTimestamp = entries[^1].Timestamp;
                expectedHash = entries[^1].Hash;

                var checkpoint = await harness.CheckpointPersistence.GetCheckpointAsync();
                checkpoint.ShouldNotBeNull();
                checkpoint!.Timestamp.ShouldBe(expectedTimestamp);
                checkpoint.LastHash.ShouldBe(expectedHash);
                previousUpdatedUtc = checkpoint.UpdatedUtc;

                // Pin a second consumer to the first entry so the two consumers
                // restore to distinct positions after restart.
                await harness.CheckpointPersistence.UpsertCheckpointAsync(
                    entries[0].Timestamp,
                    entries[0].Hash,
                    secondaryConsumer);
                var secondary = await harness.CheckpointPersistence.GetCheckpointAsync(secondaryConsumer);
                secondary.ShouldNotBeNull();
                secondary!.Timestamp.ShouldBe(entries[0].Timestamp);
                secondary.LastHash.ShouldBe(entries[0].Hash);
            }

            await using (var restarted = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, defaultConsumer))
            {
                var restoredDefault = await restarted.CheckpointPersistence.GetCheckpointAsync();
                restoredDefault.ShouldNotBeNull();
                restoredDefault!.Timestamp.ShouldBe(expectedTimestamp);
                restoredDefault.LastHash.ShouldBe(expectedHash);
                restoredDefault.UpdatedUtc.ShouldBe(previousUpdatedUtc);

                var restoredSecondary = await restarted.CheckpointPersistence.GetCheckpointAsync(secondaryConsumer);
                restoredSecondary.ShouldNotBeNull();
                restoredSecondary!.LastHash.ShouldNotBe(restoredDefault.LastHash);
            }
        }
        finally
        {
            await DeleteDirectoryWithRetriesAsync(dbPath);
        }
    }

    /// <summary>
    /// Verifies recovery resumes from a persisted checkpoint and advances after catch-up.
    /// </summary>
    [Fact]
    public async Task RestartRecovery_ShouldResumeCatchUpFromPersistedCheckpoint_InRocksDb()
    {
        string dbPath = CreateTemporaryDatabasePath();
        const string nodeId = "node-resume";
        const string consumerId = "consumer-resume";
        HlcTimestamp resumeTimestamp = default;
        string resumeHash = "";
        string expectedFinalHash = "";
        try
        {
            await using (var initial = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, consumerId))
            {
                await initial.Context.Users.InsertAsync(CreateUser("resume-1", "User One", 18, "Rome"));
                await initial.Context.SaveChangesAsync();
                await initial.PollAsync();

                await initial.Context.Users.InsertAsync(CreateUser("resume-2", "User Two", 19, "Milan"));
                await initial.Context.SaveChangesAsync();
                await initial.PollAsync();

                await WaitForConditionAsync(
                    async () => (await initial.GetEntriesByCollectionAsync("Users")).Count >= 2,
                    "Timed out waiting for resume oplog entries.");
                var entries = await initial.GetEntriesByCollectionAsync("Users");
                entries.Count.ShouldBe(2);
                resumeTimestamp = entries[0].Timestamp;
                resumeHash = entries[0].Hash;
                expectedFinalHash = entries[1].Hash;

                // Rewind the checkpoint to the first entry so the restarted node
                // has exactly one entry to catch up on.
                await initial.CheckpointPersistence.UpsertCheckpointAsync(resumeTimestamp, resumeHash);
            }

            await using (var restarted = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, consumerId))
            {
                var checkpoint = await restarted.CheckpointPersistence.GetCheckpointAsync();
                checkpoint.ShouldNotBeNull();
                checkpoint!.Timestamp.ShouldBe(resumeTimestamp);
                checkpoint.LastHash.ShouldBe(resumeHash);

                var catchUp = (await restarted.OplogStore.GetOplogAfterAsync(checkpoint.Timestamp))
                    .OrderBy(e => e.Timestamp.PhysicalTime)
                    .ThenBy(e => e.Timestamp.LogicalCounter)
                    .ToList();
                catchUp.Count.ShouldBe(1);
                catchUp[0].Hash.ShouldBe(expectedFinalHash);
                await restarted.CheckpointPersistence.AdvanceCheckpointAsync(catchUp[0]);
            }

            await using (var recovered = await CdcTestHarness.OpenWithRetriesAsync(dbPath, nodeId, consumerId))
            {
                var finalCheckpoint = await recovered.CheckpointPersistence.GetCheckpointAsync();
                finalCheckpoint.ShouldNotBeNull();
                finalCheckpoint!.LastHash.ShouldBe(expectedFinalHash);
                var remaining = await recovered.OplogStore.GetOplogAfterAsync(finalCheckpoint.Timestamp);
                remaining.ShouldBeEmpty();
            }
        }
        finally
        {
            await DeleteDirectoryWithRetriesAsync(dbPath);
        }
    }

    /// <summary>
    /// Verifies duplicate remote apply windows are idempotent without loopback entries.
    /// </summary>
    [Fact]
    public async Task RemoteApply_ShouldBeIdempotentAcrossDuplicateWindow_WithoutLoopbackEntries()
    {
        string dbPath = CreateTemporaryDatabasePath();
        const string localNodeId = "node-local";
        const string remoteNodeId = "node-remote";
        try
        {
            await using var harness = await CdcTestHarness.OpenWithRetriesAsync(
                dbPath,
                localNodeId,
                "consumer-loopback");
            await harness.Context.Users.InsertAsync(CreateUser("loopback-user", "Loopback", 40, "Boston"));
            await harness.Context.SaveChangesAsync();
            await harness.PollAsync();

            await WaitForConditionAsync(
                async () => (await harness.GetEntriesByKeyAsync("Users", "loopback-user")).Count >= 1,
                "Timed out waiting for loopback-user insert oplog entry.");
            var localEntries = await harness.GetEntriesByKeyAsync("Users", "loopback-user");
            localEntries.Count.ShouldBe(1);
            localEntries[0].Operation.ShouldBe(OperationType.Put);
            localEntries[0].Timestamp.NodeId.ShouldBe(localNodeId);

            // Craft a remote delete strictly newer than the local insert, then
            // apply it twice in a duplicated window and twice again as a batch.
            var remoteDelete = new OplogEntry(
                "Users",
                "loopback-user",
                OperationType.Delete,
                null,
                new HlcTimestamp(localEntries[0].Timestamp.PhysicalTime + 10, 0, remoteNodeId),
                localEntries[0].Hash);
            var duplicateWindow = new[] { remoteDelete, remoteDelete };
            await harness.OplogStore.ApplyBatchAsync(duplicateWindow);
            await harness.OplogStore.ApplyBatchAsync(duplicateWindow);

            harness.Context.Users.FindById("loopback-user").ShouldBeNull();
            var allEntries = await harness.GetEntriesByKeyAsync("Users", "loopback-user");
            // Exactly one copy of the remote delete, and no locally-stamped
            // loopback delete should have been generated.
            allEntries.Count(e => e.Hash == remoteDelete.Hash).ShouldBe(1);
            allEntries.Count(e => e.Operation == OperationType.Delete && e.Timestamp.NodeId == localNodeId)
                .ShouldBe(0);
            allEntries.Count(e => e.Operation == OperationType.Delete && e.Timestamp.NodeId == remoteNodeId)
                .ShouldBe(1);
        }
        finally
        {
            await DeleteDirectoryWithRetriesAsync(dbPath);
        }
    }

    /// <summary>
    /// Verifies local deletes persist tombstone metadata and advance checkpoints.
    /// </summary>
    [Fact]
    public async Task LocalDelete_ShouldPersistTombstoneMetadata_AndAdvanceCheckpoint()
    {
        string dbPath = CreateTemporaryDatabasePath();
        const string nodeId = "node-tombstone";
        try
        {
            await using var harness = await CdcTestHarness.OpenWithRetriesAsync(
                dbPath,
                nodeId,
                "consumer-tombstone");
            await harness.Context.Users.InsertAsync(CreateUser("tombstone-user", "Before Delete", 28, "Turin"));
            await harness.Context.SaveChangesAsync();
            await harness.PollAsync();

            await harness.Context.Users.DeleteAsync("tombstone-user");
            await harness.Context.SaveChangesAsync();
            await harness.PollAsync();
            harness.Context.Users.FindById("tombstone-user").ShouldBeNull();

            await WaitForConditionAsync(
                async () => (await harness.GetEntriesByKeyAsync("Users", "tombstone-user")).Count >= 2,
                "Timed out waiting for tombstone-user oplog entries.");
            var entries = await harness.GetEntriesByKeyAsync("Users", "tombstone-user");
            entries.Count.ShouldBe(2);
            var deleteEntry = entries.Last(e => e.Operation == OperationType.Delete);

            var metadata = await harness.MetadataStore.GetMetadataAsync("Users", "tombstone-user");
            metadata.ShouldNotBeNull();
            metadata!.IsDeleted.ShouldBeTrue();
            metadata.UpdatedAt.ShouldBe(deleteEntry.Timestamp);

            var checkpoint = await harness.CheckpointPersistence.GetCheckpointAsync();
            checkpoint.ShouldNotBeNull();
            checkpoint!.LastHash.ShouldBe(deleteEntry.Hash);
            checkpoint.Timestamp.ShouldBe(deleteEntry.Timestamp);
        }
        finally
        {
            await DeleteDirectoryWithRetriesAsync(dbPath);
        }
    }

    /// <summary>
    /// Creates a sample <see cref="User"/> with the supplied identity and address city.
    /// </summary>
    private static User CreateUser(string id, string name, int age, string city)
    {
        return new User
        {
            Id = id,
            Name = name,
            Age = age,
            Address = new Address { City = city }
        };
    }

    /// <summary>
    /// Builds a unique temporary directory path for a per-test RocksDB database.
    /// </summary>
    private static string CreateTemporaryDatabasePath()
    {
        return Path.Combine(Path.GetTempPath(), $"cbddc-cdc-{Guid.NewGuid():N}.rocksdb");
    }

    /// <summary>
    /// Deletes the database directory, retrying to absorb transient failures
    /// (e.g. file locks that linger briefly after disposal).
    /// </summary>
    /// <param name="path">Directory to delete.</param>
    private static async Task DeleteDirectoryWithRetriesAsync(string path)
    {
        for (var attempt = 0; attempt < 5; attempt++)
        {
            try
            {
                if (Directory.Exists(path))
                {
                    Directory.Delete(path, true);
                }

                return;
            }
            catch when (attempt < 4)
            {
                // Best-effort cleanup: back off briefly and retry. The filter
                // lets the exception surface on the final attempt.
                await Task.Delay(50);
            }
        }
    }

    /// <summary>
    /// Polls <paramref name="predicate"/> until it returns true or the timeout elapses.
    /// </summary>
    /// <param name="predicate">Asynchronous condition to await.</param>
    /// <param name="failureMessage">Message used for the timeout exception.</param>
    /// <param name="timeoutMs">Overall timeout in milliseconds.</param>
    /// <param name="pollMs">Delay between polls in milliseconds.</param>
    /// <exception cref="TimeoutException">Thrown when the condition never holds.</exception>
    private static async Task WaitForConditionAsync(
        Func<Task<bool>> predicate,
        string failureMessage,
        int timeoutMs = 6000,
        int pollMs = 50)
    {
        DateTimeOffset deadline = DateTimeOffset.UtcNow.AddMilliseconds(timeoutMs);
        while (DateTimeOffset.UtcNow < deadline)
        {
            if (await predicate())
            {
                return;
            }

            await Task.Delay(pollMs);
        }

        // Check once more after the deadline: the condition may have become true
        // during the final poll delay (or the deadline may have elapsed before the
        // first check), which would otherwise be misreported as a timeout.
        if (await predicate())
        {
            return;
        }

        throw new TimeoutException(failureMessage);
    }
}
/// <summary>
/// xUnit collection definition that disables parallelization so tests in the
/// "SurrealCdcDurability" collection run sequentially — the harness retries on
/// database lock contention, so concurrent access to the same store must be avoided.
/// </summary>
[CollectionDefinition("SurrealCdcDurability", DisableParallelization = true)]
public sealed class SurrealCdcDurabilityCollection;
/// <summary>
/// Test harness that opens a <see cref="SampleDbContext"/> and wires checkpoint
/// persistence, a checkpointed document store, an oplog store, and a metadata
/// store against it for CDC durability tests.
/// </summary>
internal sealed class CdcTestHarness : IAsyncDisposable
{
// Shared vector clock service passed to both the document store and oplog store.
private readonly VectorClockService _vectorClock;
// Surreal embedded options with CDC enabled for the configured consumer.
private readonly CBDDCSurrealEmbeddedOptions _options;
/// <summary>
/// Initializes the harness. Note the wiring order: checkpoint persistence is
/// created first, then the document store (which consumes it), then the oplog
/// store (which consumes the document store).
/// </summary>
/// <param name="databasePath">Database directory path.</param>
/// <param name="nodeId">Node identifier reported by the stubbed peer configuration.</param>
/// <param name="consumerId">CDC consumer identifier.</param>
private CdcTestHarness(string databasePath, string nodeId, string consumerId)
{
_options = new CBDDCSurrealEmbeddedOptions
{
Cdc = new CBDDCSurrealCdcOptions
{
Enabled = true,
ConsumerId = consumerId,
CheckpointTable = "cbddc_cdc_checkpoint"
}
};
Context = new SampleDbContext(databasePath);
_vectorClock = new VectorClockService();
// Stub the peer configuration; TcpPort 0 since no networking is exercised here.
var configProvider = Substitute.For<IPeerNodeConfigurationProvider>();
configProvider.GetConfiguration().Returns(new PeerNodeConfiguration
{
NodeId = nodeId,
AuthToken = "test-token",
TcpPort = 0
});
CheckpointPersistence = new SurrealCdcCheckpointPersistence(
Context.SurrealEmbeddedClient,
Context.SchemaInitializer,
_options);
DocumentStore = new CheckpointedSampleDocumentStore(
Context,
configProvider,
_vectorClock,
CheckpointPersistence,
_options,
NullLogger<CheckpointedSampleDocumentStore>.Instance);
OplogStore = new SurrealOplogStore(
Context.SurrealEmbeddedClient,
Context.SchemaInitializer,
DocumentStore,
new LastWriteWinsConflictResolver(),
_vectorClock,
null,
NullLogger<SurrealOplogStore>.Instance);
MetadataStore = new SurrealDocumentMetadataStore(
Context.SurrealEmbeddedClient,
Context.SchemaInitializer,
NullLogger<SurrealDocumentMetadataStore>.Instance);
}
/// <summary>
/// Gets the sample database context.
/// </summary>
public SampleDbContext Context { get; }
/// <summary>
/// Gets the checkpointed sample document store.
/// </summary>
public CheckpointedSampleDocumentStore DocumentStore { get; }
/// <summary>
/// Gets the oplog store used by the harness.
/// </summary>
public SurrealOplogStore OplogStore { get; }
/// <summary>
/// Gets the document metadata store.
/// </summary>
public SurrealDocumentMetadataStore MetadataStore { get; }
/// <summary>
/// Gets checkpoint persistence used for CDC progress tracking.
/// </summary>
public ISurrealCdcCheckpointPersistence CheckpointPersistence { get; }
/// <summary>
/// Polls CDC once through the document store.
/// </summary>
public async Task PollAsync()
{
await DocumentStore.PollCdcOnceAsync();
}
/// <summary>
/// Creates a harness instance with retries for transient RocksDB lock contention.
/// </summary>
/// <param name="databasePath">Database directory path.</param>
/// <param name="nodeId">Node identifier.</param>
/// <param name="consumerId">CDC consumer identifier.</param>
/// <returns>Initialized test harness.</returns>
public static async Task<CdcTestHarness> OpenWithRetriesAsync(
string databasePath,
string nodeId,
string consumerId)
{
for (var attempt = 0; attempt < 8; attempt++)
try
{
return new CdcTestHarness(databasePath, nodeId, consumerId);
}
catch (Exception ex) when (IsLockContention(ex) && attempt < 7)
{
// Transient lock contention: back off briefly and retry.
await Task.Delay(75);
}
// Unreachable in practice (the last attempt either returns or lets the
// exception propagate, since the filter above rejects attempt 7), but the
// compiler requires a terminating statement on this path.
throw new InvalidOperationException("Unable to acquire RocksDB lock for test harness.");
}
/// <summary>
/// Gets oplog entries for a collection ordered by timestamp.
/// </summary>
/// <param name="collection">Collection name.</param>
/// <returns>Ordered oplog entries.</returns>
public async Task<List<OplogEntry>> GetEntriesByCollectionAsync(string collection)
{
return (await OplogStore.ExportAsync())
.Where(e => string.Equals(e.Collection, collection, StringComparison.Ordinal))
.OrderBy(e => e.Timestamp.PhysicalTime)
.ThenBy(e => e.Timestamp.LogicalCounter)
.ToList();
}
/// <summary>
/// Gets oplog entries for a collection key ordered by timestamp.
/// </summary>
/// <param name="collection">Collection name.</param>
/// <param name="key">Document key.</param>
/// <returns>Ordered oplog entries.</returns>
public async Task<List<OplogEntry>> GetEntriesByKeyAsync(string collection, string key)
{
return (await OplogStore.ExportAsync())
.Where(e => string.Equals(e.Collection, collection, StringComparison.Ordinal) &&
string.Equals(e.Key, key, StringComparison.Ordinal))
.OrderBy(e => e.Timestamp.PhysicalTime)
.ThenBy(e => e.Timestamp.LogicalCounter)
.ToList();
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
DocumentStore.Dispose();
Context.Dispose();
// Brief delay after disposal — presumably gives the embedded database time to
// release its file lock before another harness reopens the same path; TODO confirm.
await Task.Delay(75);
}
/// <summary>
/// Returns true when the exception text contains the RocksDB "No locks available"
/// message, which <see cref="OpenWithRetriesAsync"/> treats as transient.
/// </summary>
private static bool IsLockContention(Exception exception)
{
return exception.ToString().Contains("No locks available", StringComparison.OrdinalIgnoreCase);
}
}
internal sealed class CheckpointedSampleDocumentStore : SurrealDocumentStore<SampleDbContext>
{
    private const string UsersCollection = "Users";
    private const string TodoListsCollection = "TodoLists";

    /// <summary>
    /// Initializes a new instance of the <see cref="CheckpointedSampleDocumentStore"/> class.
    /// </summary>
    /// <param name="context">Sample database context.</param>
    /// <param name="configProvider">Peer configuration provider.</param>
    /// <param name="vectorClockService">Vector clock service.</param>
    /// <param name="checkpointPersistence">Checkpoint persistence implementation.</param>
    /// <param name="surrealOptions">Optional Surreal embedded options.</param>
    /// <param name="logger">Optional logger.</param>
    public CheckpointedSampleDocumentStore(
        SampleDbContext context,
        IPeerNodeConfigurationProvider configProvider,
        IVectorClockService vectorClockService,
        ISurrealCdcCheckpointPersistence checkpointPersistence,
        CBDDCSurrealEmbeddedOptions? surrealOptions = null,
        ILogger<CheckpointedSampleDocumentStore>? logger = null)
        : base(
            context,
            context.SurrealEmbeddedClient,
            context.SchemaInitializer,
            configProvider,
            vectorClockService,
            new LastWriteWinsConflictResolver(),
            checkpointPersistence,
            BuildPollingOptions(surrealOptions),
            logger)
    {
        // In-memory event subscription is disabled: the tests drive CDC
        // explicitly via polling, so only the watch registration is needed.
        WatchCollection(UsersCollection, context.Users, u => u.Id, subscribeForInMemoryEvents: false);
        WatchCollection(TodoListsCollection, context.TodoLists, t => t.Id, subscribeForInMemoryEvents: false);
    }

    /// <inheritdoc />
    protected override async Task ApplyContentToEntityAsync(
        string collection,
        string key,
        JsonElement content,
        CancellationToken cancellationToken)
    {
        await UpsertEntityAsync(collection, key, content, cancellationToken);
    }

    /// <inheritdoc />
    protected override async Task ApplyContentToEntitiesBatchAsync(
        IEnumerable<(string Collection, string Key, JsonElement Content)> documents,
        CancellationToken cancellationToken)
    {
        foreach ((string collection, string key, var content) in documents)
        {
            await UpsertEntityAsync(collection, key, content, cancellationToken);
        }
    }

    /// <inheritdoc />
    protected override async Task<JsonElement?> GetEntityAsJsonAsync(
        string collection,
        string key,
        CancellationToken cancellationToken)
    {
        return collection switch
        {
            UsersCollection => SerializeEntity(await _context.Users.FindByIdAsync(key, cancellationToken)),
            TodoListsCollection => SerializeEntity(await _context.TodoLists.FindByIdAsync(key, cancellationToken)),
            _ => null
        };
    }

    /// <inheritdoc />
    protected override async Task RemoveEntityAsync(
        string collection,
        string key,
        CancellationToken cancellationToken)
    {
        await DeleteEntityAsync(collection, key, cancellationToken);
    }

    /// <inheritdoc />
    protected override async Task RemoveEntitiesBatchAsync(
        IEnumerable<(string Collection, string Key)> documents,
        CancellationToken cancellationToken)
    {
        foreach ((string collection, string key) in documents)
        {
            await DeleteEntityAsync(collection, key, cancellationToken);
        }
    }

    /// <inheritdoc />
    protected override async Task<IEnumerable<(string Key, JsonElement Content)>> GetAllEntitiesAsJsonAsync(
        string collection,
        CancellationToken cancellationToken)
    {
        return collection switch
        {
            UsersCollection => (await _context.Users.FindAllAsync(cancellationToken))
                .Select(u => (u.Id, SerializeEntity(u)!.Value))
                .ToList(),
            TodoListsCollection => (await _context.TodoLists.FindAllAsync(cancellationToken))
                .Select(t => (t.Id, SerializeEntity(t)!.Value))
                .ToList(),
            _ => []
        };
    }

    /// <summary>
    /// Inserts or updates the entity for the given collection/key pair from its JSON content.
    /// </summary>
    /// <exception cref="InvalidOperationException">Thrown when the JSON cannot be deserialized.</exception>
    /// <exception cref="NotSupportedException">Thrown for collections not registered for sync.</exception>
    private async Task UpsertEntityAsync(
        string collection,
        string key,
        JsonElement content,
        CancellationToken cancellationToken)
    {
        switch (collection)
        {
            case UsersCollection:
                var user = content.Deserialize<User>() ??
                           throw new InvalidOperationException("Failed to deserialize user.");
                // The oplog key is authoritative; overwrite whatever id the payload carried.
                user.Id = key;
                if (await _context.Users.FindByIdAsync(key, cancellationToken) == null)
                    await _context.Users.InsertAsync(user, cancellationToken);
                else
                    await _context.Users.UpdateAsync(user, cancellationToken);
                break;
            case TodoListsCollection:
                var todo = content.Deserialize<TodoList>() ??
                           throw new InvalidOperationException("Failed to deserialize todo list.");
                todo.Id = key;
                if (await _context.TodoLists.FindByIdAsync(key, cancellationToken) == null)
                    await _context.TodoLists.InsertAsync(todo, cancellationToken);
                else
                    await _context.TodoLists.UpdateAsync(todo, cancellationToken);
                break;
            default:
                throw new NotSupportedException($"Collection '{collection}' is not supported for sync.");
        }
    }

    /// <summary>
    /// Deletes the entity for the given collection/key pair.
    /// </summary>
    /// <exception cref="NotSupportedException">Thrown for collections not registered for sync.</exception>
    private async Task DeleteEntityAsync(string collection, string key, CancellationToken cancellationToken)
    {
        switch (collection)
        {
            case UsersCollection:
                await _context.Users.DeleteAsync(key, cancellationToken);
                break;
            case TodoListsCollection:
                await _context.TodoLists.DeleteAsync(key, cancellationToken);
                break;
            default:
                // Previously a silent no-op; reject unknown collections so delete
                // and upsert paths fail consistently instead of masking bad input.
                throw new NotSupportedException($"Collection '{collection}' is not supported for sync.");
        }
    }

    /// <summary>
    /// Serializes an entity to a <see cref="JsonElement"/>, or returns null for a null entity.
    /// </summary>
    private static JsonElement? SerializeEntity<T>(T? entity) where T : class
    {
        return entity == null ? null : JsonSerializer.SerializeToElement(entity);
    }

    /// <summary>
    /// Maps <see cref="CBDDCSurrealEmbeddedOptions"/> CDC settings onto polling options,
    /// or returns null when no options were supplied (base-class defaults apply).
    /// </summary>
    private static SurrealCdcPollingOptions? BuildPollingOptions(CBDDCSurrealEmbeddedOptions? options)
    {
        if (options == null) return null;
        return new SurrealCdcPollingOptions
        {
            Enabled = options.Cdc.Enabled,
            PollInterval = options.Cdc.PollingInterval,
            BatchSize = options.Cdc.BatchSize,
            EnableLiveSelectAccelerator = options.Cdc.EnableLiveSelectAccelerator,
            LiveSelectReconnectDelay = options.Cdc.LiveSelectReconnectDelay
        };
    }
}