using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Types.Enums;

namespace ScadaLink.StoreAndForward.Tests;

/// <summary>
/// StoreAndForward-001: the active node must forward every buffer operation
/// (add / remove / park) to the standby via the ReplicationService, so a
/// failover does not lose the buffer.
/// </summary>
public class StoreAndForwardReplicationTests : IAsyncLifetime, IDisposable
{
    // Held open for the lifetime of the fixture: a shared-cache in-memory SQLite
    // database is discarded as soon as its last connection closes.
    private readonly SqliteConnection _keepAlive;
    private readonly StoreAndForwardStorage _storage;
    private readonly StoreAndForwardService _service;

    // Operations captured by the replication handler. Also used as the lock object,
    // because the handler runs on a background task.
    // NOTE(review): element type assumed to be ReplicationOperation (the argument
    // type of the replication handler) — confirm against
    // ReplicationService.SetReplicationHandler.
    private readonly List<ReplicationOperation> _replicated = new();

    /// <summary>
    /// Builds a service backed by a uniquely named in-memory database and a
    /// replication handler that records every operation it is handed.
    /// </summary>
    public StoreAndForwardReplicationTests()
    {
        // Unique name per test instance so tests never share a database.
        var connStr = $"Data Source=ReplTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";
        _keepAlive = new SqliteConnection(connStr);
        _keepAlive.Open();

        _storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);

        var options = new StoreAndForwardOptions
        {
            DefaultRetryInterval = TimeSpan.Zero,
            DefaultMaxRetries = 1,
            // Long interval: the tests drive retries explicitly via RetryPendingMessagesAsync.
            RetryTimerInterval = TimeSpan.FromMinutes(10),
            ReplicationEnabled = true,
        };

        var replication = new ReplicationService(options, NullLogger<ReplicationService>.Instance);
        replication.SetReplicationHandler(op =>
        {
            lock (_replicated)
            {
                _replicated.Add(op);
            }

            return Task.CompletedTask;
        });

        _service = new StoreAndForwardService(
            _storage,
            options,
            NullLogger<StoreAndForwardService>.Instance,
            replication);
    }

    /// <summary>Initializes the storage before each test runs.</summary>
    public async Task InitializeAsync() => await _storage.InitializeAsync();

    /// <summary>No asynchronous teardown is required; cleanup happens in <see cref="Dispose"/>.</summary>
    public Task DisposeAsync() => Task.CompletedTask;

    /// <summary>Closes the keep-alive connection opened by the constructor.</summary>
    public void Dispose() => _keepAlive.Dispose();

    /// <summary>
    /// Replication is fire-and-forget (Task.Run); poll until the expected ops arrive.
    /// </summary>
    /// <param name="count">Minimum number of captured operations to wait for.</param>
    /// <returns>
    /// A snapshot of the captured operations. If fewer than <paramref name="count"/>
    /// arrive within 100 polls of 20 ms each, the snapshot taken after the last poll
    /// is returned anyway so the caller's assertions report the failure.
    /// </returns>
    private async Task<List<ReplicationOperation>> WaitForReplicationAsync(int count)
    {
        for (var attempt = 0; attempt < 100; attempt++)
        {
            lock (_replicated)
            {
                if (_replicated.Count >= count)
                {
                    return _replicated.ToList();
                }
            }

            // Delay outside the lock so the handler can keep appending.
            await Task.Delay(20);
        }

        lock (_replicated)
        {
            return _replicated.ToList();
        }
    }

    [Fact]
    public async Task BufferingAMessage_ReplicatesAnAddOperation()
    {
        // No handler registered → message is buffered → an Add is replicated.
        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""");

        Assert.True(result.WasBuffered);

        var ops = await WaitForReplicationAsync(1);
        Assert.Contains(ops, o =>
            o.OperationType == ReplicationOperationType.Add &&
            o.MessageId == result.MessageId);
    }

    [Fact]
    public async Task SuccessfulRetry_ReplicatesARemoveOperation()
    {
        // The first handler call throws; every later call reports success.
        var calls = 0;
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => ++calls == 1
                ? throw new HttpRequestException("transient")
                : Task.FromResult(true));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""");

        await _service.RetryPendingMessagesAsync();

        var ops = await WaitForReplicationAsync(2);
        Assert.Contains(ops, o => o.OperationType == ReplicationOperationType.Add);
        Assert.Contains(ops, o =>
            o.OperationType == ReplicationOperationType.Remove &&
            o.MessageId == result.MessageId);
    }

    [Fact]
    public async Task ParkedMessage_ReplicatesAParkOperation()
    {
        // The handler throws on every call and maxRetries is 1.
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("always fails"));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);

        await _service.RetryPendingMessagesAsync();

        var ops = await WaitForReplicationAsync(2);
        Assert.Contains(ops, o =>
            o.OperationType == ReplicationOperationType.Park &&
            o.MessageId == result.MessageId);
    }
}