using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging.Abstractions; using ScadaLink.Commons.Types.Enums; namespace ScadaLink.StoreAndForward.Tests; /// /// WP-10/12/13/14: Tests for the StoreAndForwardService retry engine and management. /// public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable { private readonly SqliteConnection _keepAlive; private readonly StoreAndForwardStorage _storage; private readonly StoreAndForwardService _service; private readonly StoreAndForwardOptions _options; public StoreAndForwardServiceTests() { var dbName = $"SvcTests_{Guid.NewGuid():N}"; var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; _keepAlive = new SqliteConnection(connStr); _keepAlive.Open(); _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); _options = new StoreAndForwardOptions { DefaultRetryInterval = TimeSpan.Zero, DefaultMaxRetries = 3, RetryTimerInterval = TimeSpan.FromMinutes(10) }; _service = new StoreAndForwardService( _storage, _options, NullLogger.Instance); } public async Task InitializeAsync() => await _storage.InitializeAsync(); public Task DisposeAsync() => Task.CompletedTask; public void Dispose() => _keepAlive.Dispose(); // ── WP-10: Immediate delivery ── [Fact] public async Task EnqueueAsync_ImmediateDeliverySuccess_ReturnsAcceptedNotBuffered() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api.example.com", """{"method":"Test"}""", "Pump1"); Assert.True(result.Accepted); Assert.False(result.WasBuffered); } [Fact] public async Task EnqueueAsync_PermanentFailure_ReturnsNotAccepted() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(false)); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api.example.com", """{"method":"Test"}"""); Assert.False(result.Accepted); Assert.False(result.WasBuffered); } [Fact] public async Task EnqueueAsync_TransientFailure_BuffersForRetry() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("Connection refused")); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api.example.com", """{"method":"Test"}""", "Pump1"); Assert.True(result.Accepted); Assert.True(result.WasBuffered); var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.NotNull(msg); Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status); // StoreAndForward-003: RetryCount counts sweep retries only; the immediate // attempt is attempt 0, so a freshly buffered message has RetryCount 0. Assert.Equal(0, msg.RetryCount); } [Fact] public async Task EnqueueAsync_NoHandler_BuffersForLater() { var result = await _service.EnqueueAsync( StoreAndForwardCategory.Notification, "alerts@company.com", """{"subject":"Alert"}"""); Assert.True(result.Accepted); Assert.True(result.WasBuffered); } // ── WP-10: Retry engine ── [Fact] public async Task RetryPendingMessagesAsync_SuccessfulRetry_RemovesMessage() { int callCount = 0; _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => { callCount++; if (callCount == 1) throw new HttpRequestException("fail"); return Task.FromResult(true); }); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}"""); Assert.True(result.WasBuffered); await _service.RetryPendingMessagesAsync(); var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Null(msg); } [Fact] public async Task RetryPendingMessagesAsync_MaxRetriesReached_ParksMessage() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("always fails")); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 2); // StoreAndForward-003: MaxRetries bounds sweep retries (not the immediate // attempt), so a message with MaxRetries=2 needs two retry sweeps to park. await _service.RetryPendingMessagesAsync(); var afterFirst = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Equal(StoreAndForwardMessageStatus.Pending, afterFirst!.Status); await _service.RetryPendingMessagesAsync(); var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.NotNull(msg); Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status); } // ── StoreAndForward-003: retry-count accounting ── [Fact] public async Task RetryPendingMessagesAsync_MaxRetriesOne_PerformsExactlyOneRetryBeforeParking() { // The immediate attempt is attempt 0; MaxRetries=1 must allow exactly one // retry sweep before parking. The pre-fix off-by-one parked with zero retries. var attempts = 0; _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => { Interlocked.Increment(ref attempts); throw new HttpRequestException("always fails"); }); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1); // After the immediate failed attempt the message is buffered, not parked. var buffered = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Equal(StoreAndForwardMessageStatus.Pending, buffered!.Status); Assert.Equal(1, attempts); // only the immediate attempt so far await _service.RetryPendingMessagesAsync(); var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status); Assert.Equal(2, attempts); // immediate attempt + exactly one retry Assert.Equal(1, msg.RetryCount); // one sweep retry recorded } // ── StoreAndForward-005: sweep-vs-management race hardening ── [Fact] public async Task RetryMessageAsync_StatusChangedDuringDelivery_SweepParkWriteIsSkipped() { // StoreAndForward-005: the retry sweep's state-changing writes must be // conditional on the status it observed, so a concurrent operator action that // moved the row out of Pending (e.g. between the sweep's snapshot load and its // park write) is not silently overwritten by the sweep's stale view. var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}""", attemptImmediateDelivery: false, maxRetries: 1); _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, async msg => { // Simulate an operator action winning the race: the row leaves Pending // (here: parked) while the sweep is still mid-delivery. The sweep would // otherwise unconditionally re-write this row from its stale snapshot. var parkedOutFromUnderTheSweep = new StoreAndForwardMessage { Id = msg.Id, Category = msg.Category, Target = msg.Target, PayloadJson = msg.PayloadJson, RetryCount = 7, MaxRetries = msg.MaxRetries, RetryIntervalMs = msg.RetryIntervalMs, CreatedAt = msg.CreatedAt, LastAttemptAt = DateTimeOffset.UtcNow, Status = StoreAndForwardMessageStatus.Parked, LastError = "operator/other writer" }; await _storage.UpdateMessageAsync(parkedOutFromUnderTheSweep); throw new HttpRequestException("transient — sweep will try to park"); }); await _service.RetryPendingMessagesAsync(); // The sweep observed Pending; the row is now Parked with the other writer's // RetryCount (7), not the sweep's (1). The sweep's conditional write was skipped. var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.NotNull(msg); Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status); Assert.Equal(7, msg.RetryCount); Assert.Equal("operator/other writer", msg.LastError); } [Fact] public async Task RetryPendingMessagesAsync_PermanentFailureOnRetry_ParksMessage() { int callCount = 0; _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => { callCount++; if (callCount == 1) throw new HttpRequestException("transient"); return Task.FromResult(false); }); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}"""); await _service.RetryPendingMessagesAsync(); var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.NotNull(msg); Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status); } // ── WP-12: Parked message management ── [Fact] public async Task RetryParkedMessageAsync_MovesBackToQueue() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("fail")); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1); await _service.RetryPendingMessagesAsync(); var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status); var retried = await _service.RetryParkedMessageAsync(result.MessageId); Assert.True(retried); msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status); Assert.Equal(0, msg.RetryCount); } /// /// StoreAndForward-017: the Retry activity-log entry must carry the parked /// message's true category, not a hard-coded ExternalSystem. /// [Fact] public async Task RetryParkedMessageAsync_ActivityUsesMessageRealCategory() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification, _ => throw new HttpRequestException("fail")); var result = await _service.EnqueueAsync( StoreAndForwardCategory.Notification, "ops-list", """{}""", maxRetries: 1); await _service.RetryPendingMessagesAsync(); // -> parked var categories = new List(); _service.OnActivity += (action, category, _) => { if (action == "Retry") categories.Add(category); }; var retried = await _service.RetryParkedMessageAsync(result.MessageId); Assert.True(retried); Assert.Equal(new[] { StoreAndForwardCategory.Notification }, categories); } /// /// StoreAndForward-017: the Discard activity-log entry must carry the parked /// message's true category, not a hard-coded ExternalSystem. /// [Fact] public async Task DiscardParkedMessageAsync_ActivityUsesMessageRealCategory() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite, _ => throw new HttpRequestException("fail")); var result = await _service.EnqueueAsync( StoreAndForwardCategory.CachedDbWrite, "site-db", """{}""", maxRetries: 1); await _service.RetryPendingMessagesAsync(); // -> parked var categories = new List(); _service.OnActivity += (action, category, _) => { if (action == "Discard") categories.Add(category); }; var discarded = await _service.DiscardParkedMessageAsync(result.MessageId); Assert.True(discarded); Assert.Equal(new[] { StoreAndForwardCategory.CachedDbWrite }, categories); } [Fact] public async Task DiscardParkedMessageAsync_PermanentlyRemoves() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("fail")); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1); await _service.RetryPendingMessagesAsync(); var discarded = await _service.DiscardParkedMessageAsync(result.MessageId); Assert.True(discarded); var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Null(msg); } [Fact] public async Task GetParkedMessagesAsync_ReturnsPaginatedResults() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("fail")); for (int i = 0; i < 3; i++) { await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, $"api{i}", """{}""", maxRetries: 1); } await _service.RetryPendingMessagesAsync(); var (messages, total) = await _service.GetParkedMessagesAsync( StoreAndForwardCategory.ExternalSystem, 1, 2); Assert.Equal(2, messages.Count); Assert.True(total >= 3); } // ── WP-13: Messages survive instance deletion ── [Fact] public async Task MessagesForInstance_SurviveAfterDeletion() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("fail")); await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}""", "Pump1"); await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api2", """{}""", "Pump1"); var count = await _service.GetMessageCountForInstanceAsync("Pump1"); Assert.Equal(2, count); } // ── WP-14: Health metrics ── [Fact] public async Task GetBufferDepthAsync_ReturnsCorrectDepth() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("fail")); _service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification, _ => throw new HttpRequestException("fail")); await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api1", """{}"""); await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api2", """{}"""); await _service.EnqueueAsync(StoreAndForwardCategory.Notification, "email", """{}"""); var depth = await _service.GetBufferDepthAsync(); Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.ExternalSystem) >= 2); Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.Notification) >= 1); } [Fact] public async Task OnActivity_RaisedOnEnqueue() { var activities = new List(); _service.OnActivity += (action, _, _) => activities.Add(action); _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api", """{}"""); Assert.Contains("Delivered", activities); } [Fact] public async Task OnActivity_RaisedOnBuffer() { var activities = new List(); _service.OnActivity += (action, _, _) => activities.Add(action); _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("fail")); await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api", """{}"""); Assert.Contains("Queued", activities); } // ── StoreAndForward-009: faulting activity subscriber must not corrupt delivery ── [Fact] public async Task EnqueueAsync_ImmediateDeliverySuccess_FaultingActivitySubscriber_StillReportsDelivered() { // StoreAndForward-009: a throwing OnActivity subscriber (e.g. the site event // log) must not be misclassified as a transient delivery failure. Pre-fix the // subscriber's exception escaped RaiseActivity, was caught by EnqueueAsync's // transient-failure handler, and a successfully delivered message was buffered. _service.OnActivity += (_, _, _) => throw new InvalidOperationException("logging blew up"); _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}"""); Assert.True(result.Accepted); Assert.False(result.WasBuffered); // delivered, NOT buffered var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Null(msg); // nothing left in the buffer } [Fact] public async Task RetryMessageAsync_FaultingActivitySubscriber_DoesNotIncrementRetryCount() { // StoreAndForward-009: a throwing subscriber raised after a successful retry // delivery must not be caught by the retry-failure handler and counted as a // transient failure. var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}""", attemptImmediateDelivery: false, maxRetries: 5); _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); _service.OnActivity += (_, _, _) => throw new InvalidOperationException("logging blew up"); await _service.RetryPendingMessagesAsync(); // The retry succeeded; the message must be gone, not re-buffered with a bumped count. var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Null(msg); } // ── WP-10: Per-source-entity retry settings ── [Fact] public async Task EnqueueAsync_CustomRetrySettings_Respected() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("fail")); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 100, retryInterval: TimeSpan.FromSeconds(60)); var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.Equal(100, msg!.MaxRetries); Assert.Equal(60000, msg.RetryIntervalMs); } // ── attemptImmediateDelivery: false — caller already attempted delivery ── [Fact] public async Task EnqueueAsync_AttemptImmediateDeliveryFalse_BuffersWithoutInvokingHandler() { // A caller that has already made its own delivery attempt passes // attemptImmediateDelivery: false so the request is not dispatched twice. var handlerCalls = 0; _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => { Interlocked.Increment(ref handlerCalls); return Task.FromResult(true); }); var result = await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "api", """{}""", attemptImmediateDelivery: false); Assert.Equal(0, handlerCalls); // handler NOT invoked at enqueue time Assert.True(result.WasBuffered); var msg = await _storage.GetMessageByIdAsync(result.MessageId); Assert.NotNull(msg); Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status); // StoreAndForward-003: the caller's own attempt is attempt 0; RetryCount // counts only sweep retries, so a freshly buffered message has RetryCount 0. Assert.Equal(0, msg.RetryCount); } }