using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Types.Enums;

namespace ScadaLink.StoreAndForward.Tests;

/// <summary>
/// WP-10/12/13/14: Tests for the StoreAndForwardService retry engine and management.
/// </summary>
/// <remarks>
/// Each test runs against its own uniquely named in-memory SQLite database, so
/// messages enqueued by one test are never visible to another.
/// </remarks>
public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable
{
    // Held open for the lifetime of the test: a shared-cache in-memory SQLite
    // database only exists while at least one connection to it is open.
    private readonly SqliteConnection _keepAlive;
    private readonly StoreAndForwardStorage _storage;
    private readonly StoreAndForwardService _service;
    private readonly StoreAndForwardOptions _options;

    /// <summary>
    /// Builds the storage and the service under test on top of a fresh in-memory database.
    /// </summary>
    public StoreAndForwardServiceTests()
    {
        var dbName = $"SvcTests_{Guid.NewGuid():N}";
        var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";

        _keepAlive = new SqliteConnection(connStr);
        _keepAlive.Open();

        // NullLogger<T>.Instance satisfies both ILogger and ILogger<T> parameters.
        _storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);

        // NOTE(review): a zero retry interval presumably makes buffered messages
        // immediately due for an explicit sweep, and the long timer interval
        // presumably keeps any background timer from firing mid-test — confirm
        // against StoreAndForwardService.
        _options = new StoreAndForwardOptions
        {
            DefaultRetryInterval = TimeSpan.Zero,
            DefaultMaxRetries = 3,
            RetryTimerInterval = TimeSpan.FromMinutes(10)
        };

        _service = new StoreAndForwardService(
            _storage, _options, NullLogger<StoreAndForwardService>.Instance);
    }

    /// <summary>Initializes the storage before each test runs.</summary>
    public async Task InitializeAsync() => await _storage.InitializeAsync();

    /// <summary>No asynchronous teardown is required; see <see cref="Dispose"/>.</summary>
    public Task DisposeAsync() => Task.CompletedTask;

    /// <summary>Closes the keep-alive connection held for the in-memory database.</summary>
    public void Dispose() => _keepAlive.Dispose();

    // ── WP-10: Immediate delivery ──

    [Fact]
    public async Task EnqueueAsync_ImmediateDeliverySuccess_ReturnsAcceptedNotBuffered()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => Task.FromResult(true));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api.example.com",
            """{"method":"Test"}""", "Pump1");

        Assert.True(result.Accepted);
        Assert.False(result.WasBuffered);
    }

    [Fact]
    public async Task EnqueueAsync_PermanentFailure_ReturnsNotAccepted()
    {
        // A handler returning false (rather than throwing) is a permanent failure.
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => Task.FromResult(false));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api.example.com",
            """{"method":"Test"}""");

        Assert.False(result.Accepted);
        Assert.False(result.WasBuffered);
    }

    [Fact]
    public async Task EnqueueAsync_TransientFailure_BuffersForRetry()
    {
        // A handler that throws is a transient failure.
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("Connection refused"));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api.example.com",
            """{"method":"Test"}""", "Pump1");

        Assert.True(result.Accepted);
        Assert.True(result.WasBuffered);

        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(msg);
        Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status);
        // StoreAndForward-003: RetryCount counts sweep retries only; the immediate
        // attempt is attempt 0, so a freshly buffered message has RetryCount 0.
        Assert.Equal(0, msg.RetryCount);
    }

    [Fact]
    public async Task EnqueueAsync_NoHandler_BuffersForLater()
    {
        // No handler is registered for the Notification category in this test.
        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.Notification, "alerts@company.com",
            """{"subject":"Alert"}""");

        Assert.True(result.Accepted);
        Assert.True(result.WasBuffered);
    }

    // ── WP-10: Retry engine ──

    [Fact]
    public async Task RetryPendingMessagesAsync_SuccessfulRetry_RemovesMessage()
    {
        // First call (the immediate attempt) fails transiently; the retry succeeds.
        int callCount = 0;
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ =>
            {
                callCount++;
                if (callCount == 1)
                {
                    throw new HttpRequestException("fail");
                }

                return Task.FromResult(true);
            });

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""");
        Assert.True(result.WasBuffered);

        await _service.RetryPendingMessagesAsync();

        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.Null(msg);
    }

    [Fact]
    public async Task RetryPendingMessagesAsync_MaxRetriesReached_ParksMessage()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("always fails"));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""",
            maxRetries: 2);

        // StoreAndForward-003: MaxRetries bounds sweep retries (not the immediate
        // attempt), so a message with MaxRetries=2 needs two retry sweeps to park.
        await _service.RetryPendingMessagesAsync();
        var afterFirst = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(afterFirst);
        Assert.Equal(StoreAndForwardMessageStatus.Pending, afterFirst!.Status);

        await _service.RetryPendingMessagesAsync();

        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(msg);
        Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
    }

    // ── StoreAndForward-003: retry-count accounting ──

    [Fact]
    public async Task RetryPendingMessagesAsync_MaxRetriesOne_PerformsExactlyOneRetryBeforeParking()
    {
        // The immediate attempt is attempt 0; MaxRetries=1 must allow exactly one
        // retry sweep before parking. The pre-fix off-by-one parked with zero retries.
        var attempts = 0;
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ =>
            {
                Interlocked.Increment(ref attempts);
                throw new HttpRequestException("always fails");
            });

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""",
            maxRetries: 1);

        // After the immediate failed attempt the message is buffered, not parked.
        var buffered = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(buffered);
        Assert.Equal(StoreAndForwardMessageStatus.Pending, buffered!.Status);
        Assert.Equal(1, attempts); // only the immediate attempt so far

        await _service.RetryPendingMessagesAsync();

        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(msg);
        Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
        Assert.Equal(2, attempts); // immediate attempt + exactly one retry
        Assert.Equal(1, msg.RetryCount); // one sweep retry recorded
    }

    // ── StoreAndForward-005: sweep-vs-management race hardening ──

    [Fact]
    public async Task RetryMessageAsync_StatusChangedDuringDelivery_SweepParkWriteIsSkipped()
    {
        // StoreAndForward-005: the retry sweep's state-changing writes must be
        // conditional on the status it observed, so a concurrent operator action that
        // moved the row out of Pending (e.g. between the sweep's snapshot load and its
        // park write) is not silently overwritten by the sweep's stale view.
        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""",
            attemptImmediateDelivery: false, maxRetries: 1);

        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            async inFlight =>
            {
                // Simulate an operator action winning the race: the row leaves Pending
                // (here: parked) while the sweep is still mid-delivery. The sweep would
                // otherwise unconditionally re-write this row from its stale snapshot.
                var parkedOutFromUnderTheSweep = new StoreAndForwardMessage
                {
                    Id = inFlight.Id,
                    Category = inFlight.Category,
                    Target = inFlight.Target,
                    PayloadJson = inFlight.PayloadJson,
                    RetryCount = 7,
                    MaxRetries = inFlight.MaxRetries,
                    RetryIntervalMs = inFlight.RetryIntervalMs,
                    CreatedAt = inFlight.CreatedAt,
                    LastAttemptAt = DateTimeOffset.UtcNow,
                    Status = StoreAndForwardMessageStatus.Parked,
                    LastError = "operator/other writer"
                };
                await _storage.UpdateMessageAsync(parkedOutFromUnderTheSweep);

                throw new HttpRequestException("transient — sweep will try to park");
            });

        await _service.RetryPendingMessagesAsync();

        // The sweep observed Pending; the row is now Parked with the other writer's
        // RetryCount (7), not the sweep's (1). The sweep's conditional write was skipped.
        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(msg);
        Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
        Assert.Equal(7, msg.RetryCount);
        Assert.Equal("operator/other writer", msg.LastError);
    }

    [Fact]
    public async Task RetryPendingMessagesAsync_PermanentFailureOnRetry_ParksMessage()
    {
        // Immediate attempt fails transiently (buffered); the retry then reports a
        // permanent failure by returning false.
        int callCount = 0;
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ =>
            {
                callCount++;
                if (callCount == 1)
                {
                    throw new HttpRequestException("transient");
                }

                return Task.FromResult(false);
            });

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""");

        await _service.RetryPendingMessagesAsync();

        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(msg);
        Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);
    }

    // ── WP-12: Parked message management ──

    [Fact]
    public async Task RetryParkedMessageAsync_MovesBackToQueue()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("fail"));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""",
            maxRetries: 1);

        // One sweep exhausts MaxRetries=1 and parks the message.
        await _service.RetryPendingMessagesAsync();

        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(msg);
        Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status);

        var retried = await _service.RetryParkedMessageAsync(result.MessageId);
        Assert.True(retried);

        msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(msg);
        Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status);
        Assert.Equal(0, msg.RetryCount);
    }

    [Fact]
    public async Task DiscardParkedMessageAsync_PermanentlyRemoves()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("fail"));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""",
            maxRetries: 1);

        await _service.RetryPendingMessagesAsync();

        var discarded = await _service.DiscardParkedMessageAsync(result.MessageId);
        Assert.True(discarded);

        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.Null(msg);
    }

    [Fact]
    public async Task GetParkedMessagesAsync_ReturnsPaginatedResults()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("fail"));

        for (int i = 0; i < 3; i++)
        {
            await _service.EnqueueAsync(
                StoreAndForwardCategory.ExternalSystem, $"api{i}", """{}""",
                maxRetries: 1);
        }

        await _service.RetryPendingMessagesAsync();

        // Requests 2 items with page argument 1: the page is capped at 2 while the
        // total still reflects all parked messages.
        var (messages, total) = await _service.GetParkedMessagesAsync(
            StoreAndForwardCategory.ExternalSystem, 1, 2);

        Assert.Equal(2, messages.Count);
        Assert.True(total >= 3);
    }

    // ── WP-13: Messages survive instance deletion ──

    [Fact]
    public async Task MessagesForInstance_SurviveAfterDeletion()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("fail"));

        await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""", "Pump1");
        await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api2", """{}""", "Pump1");

        // NOTE(review): no instance deletion is performed here; this only verifies
        // that both buffered messages are counted against "Pump1".
        var count = await _service.GetMessageCountForInstanceAsync("Pump1");
        Assert.Equal(2, count);
    }

    // ── WP-14: Health metrics ──

    [Fact]
    public async Task GetBufferDepthAsync_ReturnsCorrectDepth()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("fail"));
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification,
            _ => throw new HttpRequestException("fail"));

        await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api1", """{}""");
        await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api2", """{}""");
        await _service.EnqueueAsync(StoreAndForwardCategory.Notification, "email", """{}""");

        var depth = await _service.GetBufferDepthAsync();

        Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.ExternalSystem) >= 2);
        Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.Notification) >= 1);
    }

    [Fact]
    public async Task OnActivity_RaisedOnEnqueue()
    {
        // Collect the action name (first event argument) of every raised activity.
        var activities = new List<string>();
        _service.OnActivity += (action, _, _) => activities.Add(action);

        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => Task.FromResult(true));

        await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api", """{}""");

        Assert.Contains("Delivered", activities);
    }

    [Fact]
    public async Task OnActivity_RaisedOnBuffer()
    {
        // Collect the action name (first event argument) of every raised activity.
        var activities = new List<string>();
        _service.OnActivity += (action, _, _) => activities.Add(action);

        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("fail"));

        await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api", """{}""");

        Assert.Contains("Queued", activities);
    }

    // ── WP-10: Per-source-entity retry settings ──

    [Fact]
    public async Task EnqueueAsync_CustomRetrySettings_Respected()
    {
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ => throw new HttpRequestException("fail"));

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""",
            maxRetries: 100,
            retryInterval: TimeSpan.FromSeconds(60));

        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(msg);
        Assert.Equal(100, msg!.MaxRetries);
        Assert.Equal(60000, msg.RetryIntervalMs); // 60 s persisted as milliseconds
    }

    // ── attemptImmediateDelivery: false — caller already attempted delivery ──

    [Fact]
    public async Task EnqueueAsync_AttemptImmediateDeliveryFalse_BuffersWithoutInvokingHandler()
    {
        // A caller that has already made its own delivery attempt passes
        // attemptImmediateDelivery: false so the request is not dispatched twice.
        var handlerCalls = 0;
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ =>
            {
                Interlocked.Increment(ref handlerCalls);
                return Task.FromResult(true);
            });

        var result = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""",
            attemptImmediateDelivery: false);

        Assert.Equal(0, handlerCalls); // handler NOT invoked at enqueue time
        Assert.True(result.WasBuffered);

        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
        Assert.NotNull(msg);
        Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status);
        // StoreAndForward-003: the caller's own attempt is attempt 0; RetryCount
        // counts only sweep retries, so a freshly buffered message has RetryCount 0.
        Assert.Equal(0, msg.RetryCount);
    }
}