using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Types.Enums;

namespace ScadaLink.StoreAndForward.Tests;

/// <summary>
/// StoreAndForward-001: the active node must forward every buffer operation
/// (add / remove / park) to the standby via the ReplicationService, so a
/// failover does not lose the buffer.
/// </summary>
public class StoreAndForwardReplicationTests : IAsyncLifetime, IDisposable
{
    private readonly SqliteConnection _keepAlive;
    private readonly StoreAndForwardStorage _storage;
    private readonly StoreAndForwardService _service;

    // Every operation handed to the replication handler, in arrival order.
    // Guarded by lock (_replicated) because the handler runs off the test thread.
    private readonly List<ReplicationOperation> _replicated = new();

    /// <summary>
    /// Builds a service backed by a uniquely named shared in-memory SQLite
    /// database and a replication handler that records every operation.
    /// </summary>
    public StoreAndForwardReplicationTests()
    {
        // A shared-cache in-memory database only exists while at least one
        // connection to it is open, so one connection is held for the test's lifetime.
        var connStr = $"Data Source=ReplTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";
        _keepAlive = new SqliteConnection(connStr);
        _keepAlive.Open();

        // NOTE(review): the logger type arguments below follow the usual
        // ILogger<T> convention; NullLogger<T> also satisfies a plain ILogger
        // parameter. Confirm against the constructor signatures.
        _storage = new StoreAndForwardStorage(
            connStr, NullLogger<StoreAndForwardStorage>.Instance);

        // Retries are driven explicitly by the tests via RetryPendingMessagesAsync;
        // the long timer interval presumably keeps the background timer out of the way.
        var options = new StoreAndForwardOptions
        {
            DefaultRetryInterval = TimeSpan.Zero,
            DefaultMaxRetries = 1,
            RetryTimerInterval = TimeSpan.FromMinutes(10),
            ReplicationEnabled = true,
        };

        var replication = new ReplicationService(
            options, NullLogger<ReplicationService>.Instance);
        replication.SetReplicationHandler(op =>
        {
            lock (_replicated)
            {
                _replicated.Add(op);
            }

            return Task.CompletedTask;
        });

        _service = new StoreAndForwardService(
            _storage,
            options,
            NullLogger<StoreAndForwardService>.Instance,
            replication);
    }

    /// <summary>Initializes the storage before each test.</summary>
    public async Task InitializeAsync() => await _storage.InitializeAsync();

    /// <summary>No asynchronous teardown is needed; see <see cref="Dispose"/>.</summary>
    public Task DisposeAsync() => Task.CompletedTask;

    /// <summary>Closes the keep-alive connection held for the in-memory database.</summary>
    public void Dispose() => _keepAlive.Dispose();

    /// <summary>
    /// Replication is fire-and-forget (Task.Run); poll until the expected ops arrive.
    /// </summary>
    /// <param name="count">Minimum number of recorded operations to wait for.</param>
    /// <returns>
    /// A snapshot of the recorded operations. If fewer than <paramref name="count"/>
    /// arrive within 100 polls of 20 ms each, the snapshot taken after the last
    /// poll is returned and the caller's assertions decide the outcome.
    /// </returns>
    private async Task<List<ReplicationOperation>> WaitForReplicationAsync(int count)
    {
        for (var i = 0; i < 100; i++)
        {
            lock (_replicated)
            {
                if (_replicated.Count >= count)
                {
                    return _replicated.ToList();
                }
            }

            await Task.Delay(20);
        }

        lock (_replicated)
        {
            return _replicated.ToList();
        }
    }

    [Fact]
    public async Task BufferingAMessage_ReplicatesAnAddOperation()
    {
        // No handler registered → message is buffered → an Add is replicated.
        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""");

        Assert.True(result.WasBuffered);

        var ops = await WaitForReplicationAsync(1);
        Assert.Contains(ops, o =>
            o.OperationType == ReplicationOperationType.Add &&
            o.MessageId == result.MessageId);
    }

    [Fact]
    public async Task SuccessfulRetry_ReplicatesARemoveOperation()
    {
        // The first delivery attempt throws; every later attempt succeeds.
        var calls = 0;
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => ++calls == 1
                ? throw new HttpRequestException("transient")
                : Task.FromResult(true));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""");
        await _service.RetryPendingMessagesAsync();

        var ops = await WaitForReplicationAsync(2);
        Assert.Contains(ops, o => o.OperationType == ReplicationOperationType.Add);
        Assert.Contains(ops, o =>
            o.OperationType == ReplicationOperationType.Remove &&
            o.MessageId == result.MessageId);
    }

    [Fact]
    public async Task ParkedMessage_ReplicatesAParkOperation()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("always fails"));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
        await _service.RetryPendingMessagesAsync();

        var ops = await WaitForReplicationAsync(2);
        Assert.Contains(ops, o =>
            o.OperationType == ReplicationOperationType.Park &&
            o.MessageId == result.MessageId);
    }

    /// <summary>
    /// StoreAndForward-016: an operator discarding a parked message must replicate
    /// a Remove so the standby's copy is also deleted (otherwise the discarded
    /// message reappears in the parked list after a failover).
    /// </summary>
    [Fact]
    public async Task DiscardingAParkedMessage_ReplicatesARemoveOperation()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("always fails"));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
        await _service.RetryPendingMessagesAsync(); // -> parked
        await WaitForReplicationAsync(2);

        var discarded = await _service.DiscardParkedMessageAsync(result.MessageId);
        Assert.True(discarded);

        var ops = await WaitForReplicationAsync(3);
        Assert.Contains(ops, o =>
            o.OperationType == ReplicationOperationType.Remove &&
            o.MessageId == result.MessageId);
    }

    /// <summary>
    /// StoreAndForward-016: an operator retrying a parked message must replicate a
    /// Requeue so the standby's copy moves back to Pending (otherwise it stays
    /// Parked on the standby and the operator's retry is lost across a failover).
    /// </summary>
    [Fact]
    public async Task RetryingAParkedMessage_ReplicatesARequeueOperation()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("always fails"));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
        await _service.RetryPendingMessagesAsync(); // -> parked
        await WaitForReplicationAsync(2);

        var retried = await _service.RetryParkedMessageAsync(result.MessageId);
        Assert.True(retried);

        var ops = await WaitForReplicationAsync(3);
        var requeue = ops.SingleOrDefault(o =>
            o.OperationType == ReplicationOperationType.Requeue &&
            o.MessageId == result.MessageId);
        Assert.NotNull(requeue);
        Assert.NotNull(requeue!.Message);
        Assert.Equal(StoreAndForwardMessageStatus.Pending, requeue.Message!.Status);
    }

    /// <summary>
    /// StoreAndForward-016: the standby applies a Requeue by moving its row back to
    /// Pending with retry_count = 0, mirroring the active node's local state.
    /// </summary>
    [Fact]
    public async Task ApplyReplicatedOperation_Requeue_MovesStandbyRowBackToPending()
    {
        // NOTE(review): logger type argument restored by convention; confirm
        // against the ReplicationService constructor signature.
        var replication = new ReplicationService(
            new StoreAndForwardOptions { ReplicationEnabled = true },
            NullLogger<ReplicationService>.Instance);

        // Seed the storage with a Parked row that has a non-zero retry count.
        var parked = new StoreAndForwardMessage
        {
            Id = "requeue1",
            Category = StoreAndForwardCategory.ExternalSystem,
            Target = "api",
            PayloadJson = "{}",
            RetryCount = 5,
            MaxRetries = 1,
            RetryIntervalMs = 0,
            CreatedAt = DateTimeOffset.UtcNow,
            Status = StoreAndForwardMessageStatus.Parked,
        };
        await _storage.EnqueueAsync(parked);

        // The same message as carried by the Requeue operation: Pending, retry count reset.
        var requeued = new StoreAndForwardMessage
        {
            Id = parked.Id,
            Category = parked.Category,
            Target = parked.Target,
            PayloadJson = parked.PayloadJson,
            RetryCount = 0,
            MaxRetries = parked.MaxRetries,
            RetryIntervalMs = parked.RetryIntervalMs,
            CreatedAt = parked.CreatedAt,
            Status = StoreAndForwardMessageStatus.Pending,
        };

        await replication.ApplyReplicatedOperationAsync(
            new ReplicationOperation(ReplicationOperationType.Requeue, parked.Id, requeued),
            _storage);

        var row = await _storage.GetMessageByIdAsync(parked.Id);
        Assert.NotNull(row);
        Assert.Equal(StoreAndForwardMessageStatus.Pending, row!.Status);
        Assert.Equal(0, row.RetryCount);
    }
}