using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.StoreAndForward;

namespace ScadaLink.IntegrationTests;

/// <summary>
/// WP-3 (Phase 8): Dual-node failure recovery.
/// Both nodes down, first up forms cluster, rebuilds from persistent storage.
/// Tests for both central and site topologies.
/// </summary>
public class DualNodeRecoveryTests
{
    /// <summary>
    /// Verifies that messages enqueued before a simulated dual-node crash are all
    /// returned for retry, oldest first, by a fresh storage instance opened on the
    /// same SQLite file.
    /// </summary>
    [Trait("Category", "Integration")]
    [Fact]
    public async Task SiteTopology_BothNodesDown_FirstNodeRebuildsFromSQLite()
    {
        // Scenario: both site nodes crash. First node to restart opens the existing
        // SQLite database and finds all buffered S&F messages intact.
        var dbPath = NewTempDbPath("sf_dual");

        try
        {
            // Setup: populate SQLite with messages (simulating pre-crash state)
            var storage = await OpenStorageAsync(dbPath);

            var messageIds = new List<string>();
            for (var i = 0; i < 10; i++)
            {
                var msg = new StoreAndForwardMessage
                {
                    Id = Guid.NewGuid().ToString("N"),
                    Category = StoreAndForwardCategory.ExternalSystem,
                    Target = $"api-{i % 3}",
                    PayloadJson = $$"""{"index":{{i}}}""",
                    RetryCount = i,
                    MaxRetries = 50,
                    RetryIntervalMs = 30000,
                    CreatedAt = DateTimeOffset.UtcNow.AddMinutes(-i),
                    Status = StoreAndForwardMessageStatus.Pending,
                    OriginInstanceName = $"instance-{i % 2}"
                };
                await storage.EnqueueAsync(msg);
                messageIds.Add(msg.Id);
            }

            // Both nodes down — simulate by creating a fresh storage instance
            // (new process connecting to same SQLite file)
            var recoveryStorage = await OpenStorageAsync(dbPath);

            // Verify all messages are available for retry
            var pending = await recoveryStorage.GetMessagesForRetryAsync();
            Assert.Equal(10, pending.Count);

            // Verify every enqueued message (by id) survived, not just the count
            Assert.All(messageIds, id => Assert.Contains(pending, m => m.Id == id));

            // Verify messages are ordered by creation time (oldest first)
            for (var i = 1; i < pending.Count; i++)
            {
                Assert.True(pending[i].CreatedAt >= pending[i - 1].CreatedAt);
            }

            // Verify per-instance message counts (i % 2 splits the 10 messages 5/5)
            var instance0Count = await recoveryStorage.GetMessageCountByOriginInstanceAsync("instance-0");
            var instance1Count = await recoveryStorage.GetMessageCountByOriginInstanceAsync("instance-1");
            Assert.Equal(5, instance0Count);
            Assert.Equal(5, instance1Count);
        }
        finally
        {
            TryDeleteDatabase(dbPath);
        }
    }

    /// <summary>
    /// Verifies that both pending and parked messages are still counted correctly by a
    /// fresh storage instance, and that a parked message can be retried afterwards.
    /// </summary>
    [Trait("Category", "Integration")]
    [Fact]
    public async Task SiteTopology_DualCrash_ParkedMessagesPreserved()
    {
        var dbPath = NewTempDbPath("sf_dual_parked");

        try
        {
            var storage = await OpenStorageAsync(dbPath);

            // Mix of pending and parked messages
            await storage.EnqueueAsync(new StoreAndForwardMessage
            {
                Id = "pending-1",
                Category = StoreAndForwardCategory.ExternalSystem,
                Target = "api",
                PayloadJson = "{}",
                MaxRetries = 50,
                RetryIntervalMs = 30000,
                CreatedAt = DateTimeOffset.UtcNow,
                Status = StoreAndForwardMessageStatus.Pending,
            });

            await storage.EnqueueAsync(new StoreAndForwardMessage
            {
                Id = "parked-1",
                Category = StoreAndForwardCategory.Notification,
                Target = "alerts",
                PayloadJson = "{}",
                MaxRetries = 3,
                RetryIntervalMs = 10000,
                CreatedAt = DateTimeOffset.UtcNow.AddHours(-2),
                RetryCount = 3,
                Status = StoreAndForwardMessageStatus.Parked,
                LastError = "SMTP unreachable"
            });

            // Dual crash recovery
            var recoveryStorage = await OpenStorageAsync(dbPath);

            var pendingCount = await recoveryStorage.GetMessageCountByStatusAsync(StoreAndForwardMessageStatus.Pending);
            var parkedCount = await recoveryStorage.GetMessageCountByStatusAsync(StoreAndForwardMessageStatus.Parked);
            Assert.Equal(1, pendingCount);
            Assert.Equal(1, parkedCount);

            // Parked message can be retried after recovery
            var success = await recoveryStorage.RetryParkedMessageAsync("parked-1");
            Assert.True(success);

            // After the retry the parked message is counted as pending
            pendingCount = await recoveryStorage.GetMessageCountByStatusAsync(StoreAndForwardMessageStatus.Pending);
            parkedCount = await recoveryStorage.GetMessageCountByStatusAsync(StoreAndForwardMessageStatus.Parked);
            Assert.Equal(2, pendingCount);
            Assert.Equal(0, parkedCount);
        }
        finally
        {
            TryDeleteDatabase(dbPath);
        }
    }

    /// <summary>
    /// Structural check that deployment status responses carry an independent status
    /// per instance. No cluster is started by this test.
    /// </summary>
    [Trait("Category", "Integration")]
    [Fact]
    public void CentralTopology_BothNodesDown_FirstNodeFormsSingleNodeCluster()
    {
        // Structural verification: Akka.NET cluster config uses min-nr-of-members = 1,
        // so a single node can form a cluster. The keep-oldest split-brain resolver
        // with down-if-alone handles the partition scenario.
        //
        // When both central nodes crash, the first node to restart:
        //   1. Forms a single-node cluster (min-nr-of-members = 1)
        //   2. Connects to SQL Server (which persists all deployment state)
        //   3. Becomes the active node and accepts traffic
        //
        // The second node joins the existing cluster when it starts.

        // Verify the deployment status model supports recovery from SQL Server
        var statuses = new[]
        {
            new Commons.Messages.Deployment.DeploymentStatusResponse(
                "dep-1", "inst-1", DeploymentStatus.Success, null, DateTimeOffset.UtcNow),
            new Commons.Messages.Deployment.DeploymentStatusResponse(
                "dep-1", "inst-2", DeploymentStatus.InProgress, null, DateTimeOffset.UtcNow),
        };

        // Each instance has independent status — recovery reads from DB
        Assert.Equal(DeploymentStatus.Success, statuses[0].Status);
        Assert.Equal(DeploymentStatus.InProgress, statuses[1].Status);
    }

    /// <summary>
    /// Verifies that calling <c>InitializeAsync</c> a second time against an existing
    /// database file does not lose a previously enqueued message.
    /// </summary>
    [Trait("Category", "Integration")]
    [Fact]
    public async Task SQLiteStorage_InitializeIdempotent_SafeOnRecovery()
    {
        // CREATE TABLE IF NOT EXISTS is idempotent — safe to call on recovery
        var dbPath = NewTempDbPath("sf_idempotent");

        try
        {
            var storage1 = await OpenStorageAsync(dbPath);
            await storage1.EnqueueAsync(new StoreAndForwardMessage
            {
                Id = "test-1",
                Category = StoreAndForwardCategory.ExternalSystem,
                Target = "api",
                PayloadJson = "{}",
                MaxRetries = 50,
                RetryIntervalMs = 30000,
                CreatedAt = DateTimeOffset.UtcNow,
                Status = StoreAndForwardMessageStatus.Pending,
            });

            // Second InitializeAsync on same DB should be safe (no data loss)
            var storage2 = await OpenStorageAsync(dbPath);

            var msg = await storage2.GetMessageByIdAsync("test-1");
            Assert.NotNull(msg);
            Assert.Equal("api", msg!.Target);
        }
        finally
        {
            TryDeleteDatabase(dbPath);
        }
    }

    /// <summary>
    /// Builds a unique database file path in the system temp directory.
    /// </summary>
    /// <param name="prefix">File-name prefix identifying the calling test.</param>
    /// <returns>A path of the form <c>{temp}/{prefix}_{guid}.db</c>.</returns>
    private static string NewTempDbPath(string prefix) =>
        Path.Combine(Path.GetTempPath(), $"{prefix}_{Guid.NewGuid():N}.db");

    /// <summary>
    /// Creates a new storage instance over the given SQLite file and initializes it.
    /// Calling this twice with the same path models a process restart.
    /// </summary>
    /// <param name="dbPath">Path of the SQLite database file.</param>
    /// <returns>The initialized storage instance.</returns>
    private static async Task<StoreAndForwardStorage> OpenStorageAsync(string dbPath)
    {
        // NOTE(review): the logger's generic argument appears to have been lost from this
        // file; NullLogger<T> is assignable to both ILogger and ILogger<T>, so it fits
        // either constructor shape — confirm against StoreAndForwardStorage's constructor.
        var storage = new StoreAndForwardStorage(
            $"Data Source={dbPath}",
            NullLogger<StoreAndForwardStorage>.Instance);
        await storage.InitializeAsync();
        return storage;
    }

    /// <summary>
    /// Best-effort removal of a test database file.
    /// </summary>
    /// <param name="dbPath">Path of the file to delete, if it exists.</param>
    private static void TryDeleteDatabase(string dbPath)
    {
        try
        {
            if (File.Exists(dbPath))
            {
                File.Delete(dbPath);
            }
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Runs inside a finally block: letting a cleanup failure propagate would
            // replace the test's real assertion failure. A leftover temp file is harmless.
        }
    }
}