using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Core.Storage;
using ZB.MOM.WW.CBDDC.Core.Sync;
using ZB.MOM.WW.CBDDC.Persistence.BLite;
using Microsoft.Extensions.Logging.Abstractions;
using System.Text.Json;
using System.Text.Json.Nodes;
using Xunit;
using ZB.MOM.WW.CBDDC.Persistence;

namespace ZB.MOM.WW.CBDDC.Sample.Console.Tests;

/// <summary>
/// Tests snapshot creation, replacement and merging through <see cref="SnapshotStore"/>,
/// using stores wired against a temporary database file that is removed on disposal.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments on the BLite stores and loggers are written as
/// <see cref="SampleDbContext"/> because that is the context instance passed to each store;
/// confirm against the store declarations.
/// </remarks>
public class SnapshotStoreTests : IDisposable
{
    private readonly string _testDbPath;
    private readonly SampleDbContext _context;
    private readonly SampleDocumentStore _documentStore;
    private readonly BLiteOplogStore<SampleDbContext> _oplogStore;
    private readonly BLitePeerConfigurationStore<SampleDbContext> _peerConfigStore;
    private readonly BLitePeerOplogConfirmationStore<SampleDbContext> _peerConfirmationStore;
    private readonly SnapshotStore _snapshotStore;
    private readonly IPeerNodeConfigurationProvider _configProvider;

    /// <summary>
    /// Initializes a new instance of the <see cref="SnapshotStoreTests"/> class.
    /// </summary>
    public SnapshotStoreTests()
    {
        _testDbPath = Path.Combine(Path.GetTempPath(), $"test-snapshot-{Guid.NewGuid()}.blite");
        _context = new SampleDbContext(_testDbPath);
        _configProvider = CreateConfigProvider("test-node");

        var vectorClock = new VectorClockService();
        _documentStore = new SampleDocumentStore(
            _context,
            _configProvider,
            vectorClock,
            NullLogger<SampleDocumentStore>.Instance);

        var snapshotMetadataStore = new BLiteSnapshotMetadataStore<SampleDbContext>(
            _context,
            NullLogger<BLiteSnapshotMetadataStore<SampleDbContext>>.Instance);

        _oplogStore = new BLiteOplogStore<SampleDbContext>(
            _context,
            _documentStore,
            new LastWriteWinsConflictResolver(),
            vectorClock,
            snapshotMetadataStore,
            NullLogger<BLiteOplogStore<SampleDbContext>>.Instance);

        _peerConfigStore = new BLitePeerConfigurationStore<SampleDbContext>(
            _context,
            NullLogger<BLitePeerConfigurationStore<SampleDbContext>>.Instance);

        _peerConfirmationStore = new BLitePeerOplogConfirmationStore<SampleDbContext>(
            _context,
            NullLogger<BLitePeerOplogConfirmationStore<SampleDbContext>>.Instance);

        _snapshotStore = new SnapshotStore(
            _documentStore,
            _peerConfigStore,
            _oplogStore,
            new LastWriteWinsConflictResolver(),
            NullLogger<SnapshotStore>.Instance,
            _peerConfirmationStore);
    }

    /// <summary>
    /// Verifies that creating a snapshot writes valid JSON to the output stream.
    /// </summary>
    [Fact]
    public async Task CreateSnapshotAsync_WritesValidJsonToStream()
    {
        // Arrange - Add some data
        var user = new User { Id = "user-1", Name = "Alice", Age = 30 };
        await _context.Users.InsertAsync(user);
        await _context.SaveChangesAsync();

        // Act - Create snapshot
        using var stream = new MemoryStream();
        await _snapshotStore.CreateSnapshotAsync(stream);

        // Assert - Stream should contain valid JSON
        (stream.Length > 0).ShouldBeTrue("Snapshot stream should not be empty");

        var json = await ReadAllTextAsync(stream);
        string.IsNullOrWhiteSpace(json).ShouldBeFalse("Snapshot JSON should not be empty");
        json.Trim().ShouldStartWith("{");

        // Verify it's valid JSON by parsing
        using var doc = JsonDocument.Parse(json);
        doc.ShouldNotBeNull();

        // Verify structure
        doc.RootElement.TryGetProperty("Version", out _).ShouldBeTrue("Should have Version property");
        doc.RootElement.TryGetProperty("Documents", out _).ShouldBeTrue("Should have Documents property");
        doc.RootElement.TryGetProperty("Oplog", out _).ShouldBeTrue("Should have Oplog property");
        doc.RootElement.TryGetProperty("PeerConfirmations", out _).ShouldBeTrue("Should have PeerConfirmations property");
    }

    /// <summary>
    /// Verifies that snapshot creation includes all persisted documents.
    /// </summary>
    [Fact]
    public async Task CreateSnapshotAsync_IncludesAllDocuments()
    {
        // Arrange - Add multiple documents
        await _context.Users.InsertAsync(new User { Id = "u1", Name = "User 1", Age = 20 });
        await _context.Users.InsertAsync(new User { Id = "u2", Name = "User 2", Age = 25 });
        await _context.TodoLists.InsertAsync(new TodoList
        {
            Id = "t1",
            Name = "My List",
            Items = [new TodoItem { Task = "Task 1", Completed = false }]
        });
        await _context.SaveChangesAsync();

        // Act
        using var stream = new MemoryStream();
        await _snapshotStore.CreateSnapshotAsync(stream);

        // Assert
        var json = await ReadAllTextAsync(stream);
        using var doc = JsonDocument.Parse(json);

        var documents = doc.RootElement.GetProperty("Documents");
        documents.GetArrayLength().ShouldBe(3);
    }

    /// <summary>
    /// Verifies that creating and replacing a snapshot preserves document data.
    /// </summary>
    [Fact]
    public async Task RoundTrip_CreateAndReplace_PreservesData()
    {
        // Arrange - Add data to source
        var originalUser = new User { Id = "user-rt", Name = "RoundTrip User", Age = 42 };
        await _context.Users.InsertAsync(originalUser);
        await _peerConfirmationStore.UpdateConfirmationAsync(
            "peer-rt",
            "source-rt",
            new HlcTimestamp(500, 2, "source-rt"),
            "hash-rt");
        await _context.SaveChangesAsync();

        // Create snapshot
        using var snapshotStream = new MemoryStream();
        await _snapshotStore.CreateSnapshotAsync(snapshotStream);

        var snapshotJson = await ReadAllTextAsync(snapshotStream);
        using var snapshotDoc = JsonDocument.Parse(snapshotJson);
        snapshotDoc.RootElement.GetProperty("PeerConfirmations").GetArrayLength().ShouldBe(1);

        // Rewind so the target store reads the snapshot from the beginning.
        snapshotStream.Position = 0;

        // Create a new context/stores (simulating a different node)
        var newDbPath = Path.Combine(Path.GetTempPath(), $"test-snapshot-target-{Guid.NewGuid()}.blite");
        try
        {
            using var newContext = new SampleDbContext(newDbPath);
            var newConfigProvider = CreateConfigProvider("test-new-node");
            var newVectorClock = new VectorClockService();
            var newDocStore = new SampleDocumentStore(
                newContext,
                newConfigProvider,
                newVectorClock,
                NullLogger<SampleDocumentStore>.Instance);
            var newSnapshotMetaStore = new BLiteSnapshotMetadataStore<SampleDbContext>(
                newContext,
                NullLogger<BLiteSnapshotMetadataStore<SampleDbContext>>.Instance);
            var newOplogStore = new BLiteOplogStore<SampleDbContext>(
                newContext,
                newDocStore,
                new LastWriteWinsConflictResolver(),
                newVectorClock,
                newSnapshotMetaStore,
                NullLogger<BLiteOplogStore<SampleDbContext>>.Instance);
            var newPeerStore = new BLitePeerConfigurationStore<SampleDbContext>(
                newContext,
                NullLogger<BLitePeerConfigurationStore<SampleDbContext>>.Instance);
            var newPeerConfirmationStore = new BLitePeerOplogConfirmationStore<SampleDbContext>(
                newContext,
                NullLogger<BLitePeerOplogConfirmationStore<SampleDbContext>>.Instance);

            var newSnapshotStore = new SnapshotStore(
                newDocStore,
                newPeerStore,
                newOplogStore,
                new LastWriteWinsConflictResolver(),
                NullLogger<SnapshotStore>.Instance,
                newPeerConfirmationStore);

            // Act - Replace database with snapshot
            await newSnapshotStore.ReplaceDatabaseAsync(snapshotStream);

            // Assert - Data should be restored
            var restoredUser = newContext.Users.FindById("user-rt");
            restoredUser.ShouldNotBeNull();
            restoredUser.Name.ShouldBe("RoundTrip User");
            restoredUser.Age.ShouldBe(42);

            var restoredConfirmations = (await newPeerConfirmationStore.GetConfirmationsAsync()).ToList();
            restoredConfirmations.Count.ShouldBe(1);
            restoredConfirmations[0].PeerNodeId.ShouldBe("peer-rt");
            restoredConfirmations[0].SourceNodeId.ShouldBe("source-rt");
            restoredConfirmations[0].ConfirmedWall.ShouldBe(500);
            restoredConfirmations[0].ConfirmedLogic.ShouldBe(2);
            restoredConfirmations[0].ConfirmedHash.ShouldBe("hash-rt");
        }
        finally
        {
            TryDeleteFile(newDbPath);
        }
    }

    /// <summary>
    /// Verifies that merging a snapshot preserves existing data and adds new data.
    /// </summary>
    [Fact]
    public async Task MergeSnapshotAsync_MergesWithExistingData()
    {
        // Arrange - Add initial data
        await _context.Users.InsertAsync(new User { Id = "existing", Name = "Existing User", Age = 30 });
        await _peerConfirmationStore.UpdateConfirmationAsync(
            "peer-merge",
            "source-a",
            new HlcTimestamp(100, 0, "source-a"),
            "target-hash-old");
        await _peerConfirmationStore.UpdateConfirmationAsync(
            "peer-local-only",
            "source-local",
            new HlcTimestamp(50, 0, "source-local"),
            "target-local-hash");
        await _context.SaveChangesAsync();

        // Create snapshot with different data
        var sourceDbPath = Path.Combine(Path.GetTempPath(), $"test-snapshot-source-{Guid.NewGuid()}.blite");
        using var snapshotStream = new MemoryStream();
        try
        {
            using var sourceContext = new SampleDbContext(sourceDbPath);
            await sourceContext.Users.InsertAsync(new User { Id = "new-user", Name = "New User", Age = 25 });
            await sourceContext.SaveChangesAsync();

            var sourceConfigProvider = CreateConfigProvider("test-source-node");
            var sourceVectorClock = new VectorClockService();
            var sourceDocStore = new SampleDocumentStore(
                sourceContext,
                sourceConfigProvider,
                sourceVectorClock,
                NullLogger<SampleDocumentStore>.Instance);
            var sourceSnapshotMetaStore = new BLiteSnapshotMetadataStore<SampleDbContext>(
                sourceContext,
                NullLogger<BLiteSnapshotMetadataStore<SampleDbContext>>.Instance);
            var sourceOplogStore = new BLiteOplogStore<SampleDbContext>(
                sourceContext,
                sourceDocStore,
                new LastWriteWinsConflictResolver(),
                sourceVectorClock,
                sourceSnapshotMetaStore,
                NullLogger<BLiteOplogStore<SampleDbContext>>.Instance);
            var sourcePeerStore = new BLitePeerConfigurationStore<SampleDbContext>(
                sourceContext,
                NullLogger<BLitePeerConfigurationStore<SampleDbContext>>.Instance);
            var sourcePeerConfirmationStore = new BLitePeerOplogConfirmationStore<SampleDbContext>(
                sourceContext,
                NullLogger<BLitePeerOplogConfirmationStore<SampleDbContext>>.Instance);
            await sourcePeerConfirmationStore.UpdateConfirmationAsync(
                "peer-merge",
                "source-a",
                new HlcTimestamp(200, 1, "source-a"),
                "source-hash-new");
            await sourcePeerConfirmationStore.UpdateConfirmationAsync(
                "peer-merge",
                "source-b",
                new HlcTimestamp(300, 0, "source-b"),
                "source-hash-b");

            var sourceSnapshotStore = new SnapshotStore(
                sourceDocStore,
                sourcePeerStore,
                sourceOplogStore,
                new LastWriteWinsConflictResolver(),
                NullLogger<SnapshotStore>.Instance,
                sourcePeerConfirmationStore);

            await sourceSnapshotStore.CreateSnapshotAsync(snapshotStream);
            snapshotStream.Position = 0;
        }
        finally
        {
            TryDeleteFile(sourceDbPath);
        }

        // Act - Merge snapshot into existing data
        await _snapshotStore.MergeSnapshotAsync(snapshotStream);

        // Assert - Both users should exist
        var existingUser = _context.Users.FindById("existing");
        var newUser = _context.Users.FindById("new-user");

        existingUser.ShouldNotBeNull();
        newUser.ShouldNotBeNull();
        existingUser.Name.ShouldBe("Existing User");
        newUser.Name.ShouldBe("New User");

        // Sort so the positional assertions below do not depend on store ordering.
        var confirmations = (await _peerConfirmationStore.GetConfirmationsAsync())
            .OrderBy(c => c.PeerNodeId)
            .ThenBy(c => c.SourceNodeId)
            .ToList();

        confirmations.Count.ShouldBe(3);

        confirmations[0].PeerNodeId.ShouldBe("peer-local-only");
        confirmations[0].SourceNodeId.ShouldBe("source-local");
        confirmations[0].ConfirmedWall.ShouldBe(50);
        confirmations[0].ConfirmedHash.ShouldBe("target-local-hash");

        confirmations[1].PeerNodeId.ShouldBe("peer-merge");
        confirmations[1].SourceNodeId.ShouldBe("source-a");
        confirmations[1].ConfirmedWall.ShouldBe(200);
        confirmations[1].ConfirmedLogic.ShouldBe(1);
        confirmations[1].ConfirmedHash.ShouldBe("source-hash-new");

        confirmations[2].PeerNodeId.ShouldBe("peer-merge");
        confirmations[2].SourceNodeId.ShouldBe("source-b");
        confirmations[2].ConfirmedWall.ShouldBe(300);
        confirmations[2].ConfirmedHash.ShouldBe("source-hash-b");
    }

    /// <summary>
    /// Verifies that replace can consume legacy snapshots that do not include peer confirmations.
    /// </summary>
    [Fact]
    public async Task ReplaceDatabaseAsync_LegacySnapshotWithoutPeerConfirmations_IsSupported()
    {
        // Arrange
        await _context.Users.InsertAsync(new User { Id = "legacy-user", Name = "Legacy User", Age = 33 });
        await _context.SaveChangesAsync();

        using var snapshotStream = new MemoryStream();
        await _snapshotStore.CreateSnapshotAsync(snapshotStream);
        var snapshotJson = await ReadAllTextAsync(snapshotStream);

        // Strip the PeerConfirmations section to emulate the legacy snapshot shape.
        var legacySnapshot = JsonNode.Parse(snapshotJson)!.AsObject();
        legacySnapshot.Remove("PeerConfirmations");

        using var legacyStream = new MemoryStream();
        await using (var writer = new Utf8JsonWriter(legacyStream))
        {
            legacySnapshot.WriteTo(writer);
        }

        legacyStream.Position = 0;

        // Act
        await _snapshotStore.ReplaceDatabaseAsync(legacyStream);

        // Assert
        _context.Users.FindById("legacy-user").ShouldNotBeNull();
        (await _peerConfirmationStore.GetConfirmationsAsync()).Count().ShouldBe(0);
    }

    /// <summary>
    /// Verifies that snapshot creation succeeds for an empty database.
    /// </summary>
    [Fact]
    public async Task CreateSnapshotAsync_HandlesEmptyDatabase()
    {
        // Act - Create snapshot from empty database
        using var stream = new MemoryStream();
        await _snapshotStore.CreateSnapshotAsync(stream);

        // Assert - Should still produce valid JSON
        (stream.Length > 0).ShouldBeTrue();

        var json = await ReadAllTextAsync(stream);
        using var doc = JsonDocument.Parse(json);

        var documents = doc.RootElement.GetProperty("Documents");
        documents.GetArrayLength().ShouldBe(0);
    }

    /// <summary>
    /// Verifies that snapshot creation includes oplog entries.
    /// </summary>
    [Fact]
    public async Task CreateSnapshotAsync_IncludesOplogEntries()
    {
        // Arrange - Create some oplog entries via document changes
        await _context.Users.InsertAsync(new User { Id = "op-user", Name = "Oplog User", Age = 20 });
        await _context.SaveChangesAsync();

        // Manually add an oplog entry to ensure it's captured.
        // The element is cloned so it stays valid independently of the parsed document's lifetime.
        using var payloadDocument = JsonDocument.Parse("{\"test\": true}");
        var oplogEntry = new OplogEntry(
            "Users",
            "manual-key",
            OperationType.Put,
            payloadDocument.RootElement.Clone(),
            new HlcTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), 0, "test-node"),
            "");
        await _oplogStore.AppendOplogEntryAsync(oplogEntry);

        // Act
        using var stream = new MemoryStream();
        await _snapshotStore.CreateSnapshotAsync(stream);

        // Assert
        var json = await ReadAllTextAsync(stream);
        using var doc = JsonDocument.Parse(json);

        var oplog = doc.RootElement.GetProperty("Oplog");
        (oplog.GetArrayLength() >= 1).ShouldBeTrue("Should have at least one oplog entry");
    }

    /// <summary>
    /// Releases resources created for test execution.
    /// </summary>
    public void Dispose()
    {
        // Dispose the document store before the context it was constructed with.
        _documentStore?.Dispose();
        _context?.Dispose();

        TryDeleteFile(_testDbPath);
    }

    /// <summary>
    /// Creates a substitute configuration provider that returns a fixed configuration for the given node.
    /// </summary>
    /// <param name="nodeId">The node identifier placed in the returned configuration.</param>
    /// <returns>A configured <see cref="IPeerNodeConfigurationProvider"/> substitute.</returns>
    private static IPeerNodeConfigurationProvider CreateConfigProvider(string nodeId)
    {
        var configProvider = Substitute.For<IPeerNodeConfigurationProvider>();
        configProvider.GetConfiguration().Returns(new PeerNodeConfiguration
        {
            NodeId = nodeId,
            TcpPort = 5000,
            AuthToken = "test-token",
            OplogRetentionHours = 24,
            MaintenanceIntervalMinutes = 60
        });
        return configProvider;
    }

    /// <summary>
    /// Rewinds the stream and reads its entire content as text, leaving the stream open.
    /// </summary>
    /// <param name="stream">A seekable stream; its position is at the end after this call.</param>
    /// <returns>The full text content of the stream.</returns>
    private static async Task<string> ReadAllTextAsync(Stream stream)
    {
        stream.Position = 0;

        // leaveOpen keeps the caller's stream usable after the reader is disposed.
        using var reader = new StreamReader(stream, leaveOpen: true);
        return await reader.ReadToEndAsync();
    }

    /// <summary>
    /// Deletes the file at the given path if it exists, ignoring I/O and access failures.
    /// </summary>
    /// <param name="path">The path of the temporary file to remove.</param>
    private static void TryDeleteFile(string path)
    {
        if (!File.Exists(path))
        {
            return;
        }

        try
        {
            File.Delete(path);
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Best-effort cleanup: a leftover temp file must not fail the test or throw from Dispose.
        }
    }
}