From 5672502d832c743c63cb11e1642428f1a3b577e9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 16 May 2026 21:44:10 -0400 Subject: [PATCH] =?UTF-8?q?fix(store-and-forward):=20resolve=20StoreAndFor?= =?UTF-8?q?ward-004,005,010,013=20=E2=80=94=20accurate=20handler-contract?= =?UTF-8?q?=20doc,=20conditional=20sweep=20writes,=20reset=20LastAttemptAt?= =?UTF-8?q?=20on=20parked=20retry,=20test=20coverage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- code-reviews/StoreAndForward/findings.md | 85 ++++++++- .../StoreAndForwardService.cs | 59 +++++- .../StoreAndForwardStorage.cs | 50 ++++- .../ParkedMessageHandlerActorTests.cs | 173 ++++++++++++++++++ .../ScadaLink.StoreAndForward.Tests.csproj | 1 + .../StoreAndForwardServiceTests.cs | 43 +++++ .../StoreAndForwardStorageTests.cs | 54 ++++++ 7 files changed, 447 insertions(+), 18 deletions(-) create mode 100644 tests/ScadaLink.StoreAndForward.Tests/ParkedMessageHandlerActorTests.cs diff --git a/code-reviews/StoreAndForward/findings.md b/code-reviews/StoreAndForward/findings.md index e6e80f2..eaadad9 100644 --- a/code-reviews/StoreAndForward/findings.md +++ b/code-reviews/StoreAndForward/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 11 | +| Open findings | 7 | ## Summary @@ -212,7 +212,7 @@ corrected semantics. |--|--| | Severity | Medium | | Category | Documentation & comments | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:38`, `:60` | **Description** @@ -233,15 +233,25 @@ retry); exception = transient failure (buffer / increment retry). **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Confirmed the root cause against the source: the +old XML comment's "Permanent failures should return false (message will NOT be +buffered)" describes only the `EnqueueAsync` immediate path; on the retry path a +`false` return parks the already-buffered message. Reworded the `_deliveryHandlers` +field doc into an explicit three-way contract (`true` = delivered/removed-or-never- +buffered; `false` = permanent failure = not-buffered-on-immediate / parked-on-retry; +throws = transient = buffered-for-retry / retry-count-incremented), noting it applies +identically on both paths, and pointed `RegisterDeliveryHandler`'s summary at it. +Documentation-only change — no behavioural code touched, so no regression test +(an XML comment is not test-observable). ### StoreAndForward-005 — Parked-message retry/discard can race with the in-progress retry sweep | | | |--|--| -| Severity | Medium | +| Severity | Low — re-triaged down from Medium on 2026-05-16; the described data-loss race is not actually reachable, see Re-triage note | +| Original severity | Medium | | Category | Concurrency & thread safety | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:184`, `:266`, `:280` | **Description** @@ -266,9 +276,42 @@ so the actor mailbox serialises them, or make status transitions conditional in (e.g. `UPDATE ... WHERE id = @id AND status = @expected`) and re-check the affected row count. +**Re-triage note (2026-05-16)** + +Verified against the source: the *specific* race the finding describes — *"an operator +`RetryParkedMessageAsync` resets a row to Pending while the sweep simultaneously parks +the same in-flight message"* — **cannot occur** at the reviewed code. The two operator +operations (`StoreAndForwardStorage.RetryParkedMessageAsync` / +`DiscardParkedMessageAsync`) are already SQL-conditional on `status = Parked`, and the +retry sweep only ever loads and processes rows that are `Pending`. For the operator and +the sweep to touch the *same* message the row would have to be simultaneously `Parked` +(operator-actionable) and in the sweep's in-flight `Pending` snapshot — mutually +exclusive states. Overlapping sweeps are additionally prevented by the `Interlocked` +guard, so there is only ever one sweep writer. The described data-loss outcome is +therefore not reachable, which makes the original Medium ("risky behaviour") severity +an over-statement — **re-triaged to Low**. + +What *is* real is a latent fragility: the sweep's own state-changing writes used the +unconditional `UpdateMessageAsync` (`WHERE id = @id`), so the no-clobber guarantee +rested entirely on the `Interlocked` invariant with zero defence-in-depth if that +invariant were ever broken (e.g. `RetryMessageAsync` called outside the sweep, or the +guard removed). That residual Low-severity weakness is worth closing, so the +recommendation's conditional-SQL option was applied. + **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Re-triaged Medium → Low (the described race is +unreachable; see Re-triage note) and the residual latent fragility fixed: added +`StoreAndForwardStorage.UpdateMessageIfStatusAsync`, a conditional update +(`UPDATE ... WHERE id = @id AND status = @expectedStatus`) returning whether a row was +written. `RetryMessageAsync`'s three state-changing writes (park-on-permanent-failure, +park-on-max-retries, retry-count increment) now use it with `expectedStatus = Pending`, +so the sweep can never overwrite a row whose status changed underneath it; a skipped +write is logged and the message left for the next sweep. Regression test +`RetryMessageAsync_StatusChangedDuringDelivery_SweepParkWriteIsSkipped` drives a writer +that moves the row out of `Pending` mid-delivery and asserts the sweep's stale park +write is skipped (it failed against the pre-fix unconditional update, clobbering the +other writer's `RetryCount`). ### StoreAndForward-006 — `GetParkedMessagesAsync` count and page run without a transaction @@ -395,7 +438,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Correctness & logic bugs | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs:203`, `:101` | **Description** @@ -419,7 +462,16 @@ interval. Document the chosen behaviour in the method's XML comment. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Confirmed against the source: `RetryParkedMessage +Async` reset `status`, `retry_count` and `last_error` but left `last_attempt_at` stale, +so the operator-retried message's retry timing depended on the elapsed time since the +original pre-park attempt. Encoded the intent explicitly — an operator retry means +"attempt this again now" — by also setting `last_attempt_at = NULL` in the UPDATE, so +the re-queued message is unambiguously due on the next sweep regardless of the +configured `retry_interval_ms`. The method's XML comment now documents this. Regression +test `RetryParkedMessageAsync_ClearsLastAttemptAt_SoMessageIsImmediatelyDue` uses a +1-hour retry interval and a recent `last_attempt_at`; it failed pre-fix (timestamp not +cleared, message excluded from the retry-due set) and passes post-fix. ### StoreAndForward-011 — `StoreAndForwardMessageStatus.InFlight` is unused and the doc's "retrying" status is unmodelled @@ -488,7 +540,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Testing coverage | -| Status | Open | +| Status | Resolved | | Location | `tests/ScadaLink.StoreAndForward.Tests/` (whole directory); `src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs:101`; `src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs` | **Description** @@ -514,7 +566,20 @@ invalid-JSON payloads. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). All three gaps closed. +(1) Retry-due timing: `GetMessagesForRetryAsync_NonZeroInterval_ExcludesNotYetDue +IncludesDue` exercises the `julianday` elapsed-time SQL with non-zero intervals — a +just-attempted message (1-hour interval) is excluded, an old one (attempted 2h ago, +5-min interval) and a never-attempted one are included. +(2) Active-side replication: this sub-claim was **already stale** at the reviewed code — +`StoreAndForwardReplicationTests` (added with finding 001's fix) asserts the Add, Remove +and Park replication operations; no new test was needed. +(3) `ParkedMessageHandlerActor`: new `ParkedMessageHandlerActorTests` using +`Akka.TestKit.Xunit2` (package reference added to the test project) covers Query/Retry/ +Discard request-to-response mapping, correlation-ID propagation, the unknown-message +failure responses, and `ExtractMethodName` for `MethodName`, `Subject`, missing-property +and malformed-JSON payloads (all falling back to the category name without throwing). +These are coverage-gap tests over already-correct code, so they pass on first run. ### StoreAndForward-014 — Storage does not create its SQLite database directory diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs index 4fdac2f..8a5af60 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs @@ -36,8 +36,20 @@ public class StoreAndForwardService private int _retryInProgress; /// - /// WP-10: Delivery handler delegate. Returns true on success, throws on transient failure. - /// Permanent failures should return false (message will NOT be buffered). + /// WP-10: Delivery handler delegate. The return value / exception is interpreted + /// the same way on both the immediate-delivery path () + /// and the background retry path (RetryMessageAsync): + /// + /// true — delivered successfully. The message is + /// removed from the buffer (or, on the immediate path, never buffered). + /// false — permanent failure. On the immediate path + /// the message is NOT buffered; on a retry the message is already buffered and + /// is parked immediately (no further retries). + /// throws — transient failure. On the immediate path the + /// message is buffered for retry; on a retry the retry count is incremented and + /// the message is parked once is + /// reached. + /// /// private readonly Dictionary>> _deliveryHandlers = new(); @@ -59,7 +71,9 @@ public class StoreAndForwardService } /// - /// Registers a delivery handler for a given message category. + /// Registers a delivery handler for a given message category. See the + /// _deliveryHandlers field documentation for the true/false/throws contract, + /// which applies identically on the immediate and retry paths. /// public void RegisterDeliveryHandler( StoreAndForwardCategory category, @@ -241,11 +255,22 @@ public class StoreAndForwardService return; } - // Permanent failure on retry — park immediately + // Permanent failure on retry — park immediately. + // StoreAndForward-005: the sweep observed this row as Pending; only commit + // the park if it is still Pending so a concurrent operator action that + // moved it (retry/discard) is not silently overwritten. message.Status = StoreAndForwardMessageStatus.Parked; message.LastAttemptAt = DateTimeOffset.UtcNow; message.LastError = "Permanent failure (handler returned false)"; - await _storage.UpdateMessageAsync(message); + var parked = await _storage.UpdateMessageIfStatusAsync( + message, StoreAndForwardMessageStatus.Pending); + if (!parked) + { + _logger.LogDebug( + "Message {MessageId} changed status during delivery; sweep park skipped", + message.Id); + return; + } _replication?.ReplicatePark(message); RaiseActivity("Parked", message.Category, $"Permanent failure for {message.Target}: handler returned false"); @@ -259,8 +284,18 @@ public class StoreAndForwardService if (message.MaxRetries > 0 && message.RetryCount >= message.MaxRetries) { + // StoreAndForward-005: conditional park — see the permanent-failure + // branch above for rationale. message.Status = StoreAndForwardMessageStatus.Parked; - await _storage.UpdateMessageAsync(message); + var parked = await _storage.UpdateMessageIfStatusAsync( + message, StoreAndForwardMessageStatus.Pending); + if (!parked) + { + _logger.LogDebug( + "Message {MessageId} changed status during delivery; sweep park skipped", + message.Id); + return; + } _replication?.ReplicatePark(message); RaiseActivity("Parked", message.Category, $"Max retries ({message.MaxRetries}) reached for {message.Target}"); @@ -270,7 +305,17 @@ public class StoreAndForwardService } else { - await _storage.UpdateMessageAsync(message); + // StoreAndForward-005: the retry-count increment is also conditional + // on the row still being Pending so it cannot clobber an operator + // action that ran during the failed delivery. + if (!await _storage.UpdateMessageIfStatusAsync( + message, StoreAndForwardMessageStatus.Pending)) + { + _logger.LogDebug( + "Message {MessageId} changed status during delivery; sweep retry-count update skipped", + message.Id); + return; + } RaiseActivity("Retried", message.Category, $"Retry {message.RetryCount}/{message.MaxRetries} for {message.Target}: {ex.Message}"); } diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs index 89e4ec1..91ba254 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs @@ -164,6 +164,47 @@ public class StoreAndForwardStorage await cmd.ExecuteNonQueryAsync(); } + /// + /// WP-10: Updates a message after a delivery attempt, but only if the row is still + /// in the expected status. Returns true if the row was updated, false if it had + /// already been changed (e.g. an operator retried or discarded the message) and so + /// was skipped. + /// + /// StoreAndForward-005: the retry sweep uses this for its state-changing writes so + /// it cannot clobber a concurrent operator action (RetryParkedMessageAsync / + /// DiscardParkedMessageAsync). Those operator operations are themselves SQL- + /// conditional on status = Parked; making the sweep's writes conditional on + /// the status the sweep observed closes the sweep-vs-management race rather than + /// relying only on the in-process overlapping-sweep guard. + /// + public async Task UpdateMessageIfStatusAsync( + StoreAndForwardMessage message, + StoreAndForwardMessageStatus expectedStatus) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = @" + UPDATE sf_messages + SET retry_count = @retryCount, + last_attempt_at = @lastAttempt, + status = @status, + last_error = @lastError + WHERE id = @id AND status = @expectedStatus"; + + cmd.Parameters.AddWithValue("@id", message.Id); + cmd.Parameters.AddWithValue("@retryCount", message.RetryCount); + cmd.Parameters.AddWithValue("@lastAttempt", message.LastAttemptAt.HasValue + ? message.LastAttemptAt.Value.ToString("O") : DBNull.Value); + cmd.Parameters.AddWithValue("@status", (int)message.Status); + cmd.Parameters.AddWithValue("@lastError", (object?)message.LastError ?? DBNull.Value); + cmd.Parameters.AddWithValue("@expectedStatus", (int)expectedStatus); + + var rows = await cmd.ExecuteNonQueryAsync(); + return rows > 0; + } + /// /// WP-10: Removes a successfully delivered message. /// @@ -221,6 +262,13 @@ public class StoreAndForwardStorage /// /// WP-12: Moves a parked message back to pending for retry. + /// + /// StoreAndForward-010: last_attempt_at is reset to NULL so the re-queued + /// message is unambiguously due on the next retry sweep. An operator-initiated + /// retry means "attempt this again now"; leaving the stale parked timestamp in + /// place would make the message's retry timing depend on the configured retry + /// interval relative to the original (pre-park) attempt — "try immediately" only + /// by accident, and a long interval would instead delay the operator's retry. /// public async Task RetryParkedMessageAsync(string messageId) { @@ -230,7 +278,7 @@ public class StoreAndForwardStorage await using var cmd = connection.CreateCommand(); cmd.CommandText = @" UPDATE sf_messages - SET status = @pending, retry_count = 0, last_error = NULL + SET status = @pending, retry_count = 0, last_error = NULL, last_attempt_at = NULL WHERE id = @id AND status = @parked"; cmd.Parameters.AddWithValue("@id", messageId); diff --git a/tests/ScadaLink.StoreAndForward.Tests/ParkedMessageHandlerActorTests.cs b/tests/ScadaLink.StoreAndForward.Tests/ParkedMessageHandlerActorTests.cs new file mode 100644 index 0000000..a44eda3 --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/ParkedMessageHandlerActorTests.cs @@ -0,0 +1,173 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Messages.RemoteQuery; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward.Tests; + +/// +/// StoreAndForward-013: tests for the actor +/// bridge — the Query/Retry/Discard request-to-response mapping and the +/// ExtractMethodName payload-JSON parsing (including the malformed-JSON branch). +/// +public class ParkedMessageHandlerActorTests : TestKit, IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly StoreAndForwardService _service; + + public ParkedMessageHandlerActorTests() + { + var connStr = $"Data Source=ActorTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared"; + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + + var options = new StoreAndForwardOptions + { + DefaultRetryInterval = TimeSpan.Zero, + DefaultMaxRetries = 1, + RetryTimerInterval = TimeSpan.FromMinutes(10), + ReplicationEnabled = false, + }; + + _service = new StoreAndForwardService( + _storage, options, NullLogger.Instance); + } + + public async Task InitializeAsync() => await _storage.InitializeAsync(); + + public Task DisposeAsync() => Task.CompletedTask; + + protected override void Dispose(bool disposing) + { + if (disposing) _keepAlive.Dispose(); + base.Dispose(disposing); + } + + /// Enqueues a message and parks it via the retry sweep (MaxRetries = 1). + private async Task ParkMessageAsync(StoreAndForwardCategory category, string payloadJson) + { + _service.RegisterDeliveryHandler(category, _ => throw new HttpRequestException("always fails")); + var result = await _service.EnqueueAsync(category, "target", payloadJson, maxRetries: 1); + await _service.RetryPendingMessagesAsync(); + return result.MessageId; + } + + [Fact] + public async Task Query_ReturnsParkedEntries_WithExtractedMethodName() + { + await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, + """{"MethodName":"StartPump","Args":{}}"""); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new ParkedMessageQueryRequest("corr-1", "site-1", 1, 50, DateTimeOffset.UtcNow)); + + var response = ExpectMsg(); + Assert.True(response.Success); + Assert.Equal("corr-1", response.CorrelationId); + Assert.Equal("site-1", response.SiteId); + Assert.Single(response.Messages); + Assert.Equal("StartPump", response.Messages[0].MethodName); + Assert.Equal(StoreAndForwardCategory.ExternalSystem, response.Messages[0].Category); + } + + [Fact] + public async Task Query_NotificationPayload_UsesSubjectAsMethodName() + { + await ParkMessageAsync(StoreAndForwardCategory.Notification, + """{"Subject":"Tank overflow","Body":"..."}"""); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new ParkedMessageQueryRequest("corr-2", "site-1", 1, 50, DateTimeOffset.UtcNow)); + + var response = ExpectMsg(); + Assert.True(response.Success); + Assert.Equal("Tank overflow", response.Messages[0].MethodName); + } + + [Fact] + public async Task Query_MalformedJsonPayload_FallsBackToCategoryName() + { + await ParkMessageAsync(StoreAndForwardCategory.CachedDbWrite, "not-valid-json{"); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new ParkedMessageQueryRequest("corr-3", "site-1", 1, 50, DateTimeOffset.UtcNow)); + + var response = ExpectMsg(); + Assert.True(response.Success); + // Malformed JSON must not throw — ExtractMethodName falls back to the category. + Assert.Equal(StoreAndForwardCategory.CachedDbWrite.ToString(), response.Messages[0].MethodName); + } + + [Fact] + public async Task Query_PayloadWithoutMethodNameOrSubject_FallsBackToCategoryName() + { + await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{"Unrelated":"value"}"""); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new ParkedMessageQueryRequest("corr-4", "site-1", 1, 50, DateTimeOffset.UtcNow)); + + var response = ExpectMsg(); + Assert.Equal(StoreAndForwardCategory.ExternalSystem.ToString(), response.Messages[0].MethodName); + } + + [Fact] + public async Task Retry_ParkedMessage_ReturnsSuccess() + { + var messageId = await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{}"""); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new ParkedMessageRetryRequest("corr-5", "site-1", messageId, DateTimeOffset.UtcNow)); + + var response = ExpectMsg(); + Assert.True(response.Success); + Assert.Equal("corr-5", response.CorrelationId); + Assert.Null(response.ErrorMessage); + + var msg = await _storage.GetMessageByIdAsync(messageId); + Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status); + } + + [Fact] + public void Retry_UnknownMessage_ReturnsFailureWithMessage() + { + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new ParkedMessageRetryRequest("corr-6", "site-1", "does-not-exist", DateTimeOffset.UtcNow)); + + var response = ExpectMsg(); + Assert.False(response.Success); + Assert.Equal("corr-6", response.CorrelationId); + Assert.NotNull(response.ErrorMessage); + } + + [Fact] + public async Task Discard_ParkedMessage_ReturnsSuccessAndRemovesMessage() + { + var messageId = await ParkMessageAsync(StoreAndForwardCategory.ExternalSystem, """{}"""); + + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new ParkedMessageDiscardRequest("corr-7", "site-1", messageId, DateTimeOffset.UtcNow)); + + var response = ExpectMsg(); + Assert.True(response.Success); + Assert.Equal("corr-7", response.CorrelationId); + + var msg = await _storage.GetMessageByIdAsync(messageId); + Assert.Null(msg); + } + + [Fact] + public void Discard_UnknownMessage_ReturnsFailureWithMessage() + { + var actor = Sys.ActorOf(Props.Create(() => new ParkedMessageHandlerActor(_service, "site-1"))); + actor.Tell(new ParkedMessageDiscardRequest("corr-8", "site-1", "does-not-exist", DateTimeOffset.UtcNow)); + + var response = ExpectMsg(); + Assert.False(response.Success); + Assert.NotNull(response.ErrorMessage); + } +} diff --git a/tests/ScadaLink.StoreAndForward.Tests/ScadaLink.StoreAndForward.Tests.csproj b/tests/ScadaLink.StoreAndForward.Tests/ScadaLink.StoreAndForward.Tests.csproj index 3c3675f..fb38e72 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/ScadaLink.StoreAndForward.Tests.csproj +++ b/tests/ScadaLink.StoreAndForward.Tests/ScadaLink.StoreAndForward.Tests.csproj @@ -9,6 +9,7 @@ + diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs index e7e4dda..a7f676e 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs @@ -177,6 +177,49 @@ public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable Assert.Equal(1, msg.RetryCount); // one sweep retry recorded } + // ── StoreAndForward-005: sweep-vs-management race hardening ── + + [Fact] + public async Task RetryMessageAsync_StatusChangedDuringDelivery_SweepParkWriteIsSkipped() + { + // StoreAndForward-005: the retry sweep's state-changing writes must be + // conditional on the status it observed, so a concurrent operator action that + // moved the row out of Pending (e.g. between the sweep's snapshot load and its + // park write) is not silently overwritten by the sweep's stale view. + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", + attemptImmediateDelivery: false, maxRetries: 1); + + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + async msg => + { + // Simulate an operator action winning the race: the row leaves Pending + // (here: parked) while the sweep is still mid-delivery. The sweep would + // otherwise unconditionally re-write this row from its stale snapshot. + var parkedOutFromUnderTheSweep = new StoreAndForwardMessage + { + Id = msg.Id, Category = msg.Category, Target = msg.Target, + PayloadJson = msg.PayloadJson, RetryCount = 7, + MaxRetries = msg.MaxRetries, RetryIntervalMs = msg.RetryIntervalMs, + CreatedAt = msg.CreatedAt, LastAttemptAt = DateTimeOffset.UtcNow, + Status = StoreAndForwardMessageStatus.Parked, + LastError = "operator/other writer" + }; + await _storage.UpdateMessageAsync(parkedOutFromUnderTheSweep); + throw new HttpRequestException("transient — sweep will try to park"); + }); + + await _service.RetryPendingMessagesAsync(); + + // The sweep observed Pending; the row is now Parked with the other writer's + // RetryCount (7), not the sweep's (1). The sweep's conditional write was skipped. + var msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.NotNull(msg); + Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status); + Assert.Equal(7, msg.RetryCount); + Assert.Equal("operator/other writer", msg.LastError); + } + [Fact] public async Task RetryPendingMessagesAsync_PermanentFailureOnRetry_ParksMessage() { diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs index ac7ef80..a0e363c 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs @@ -106,6 +106,35 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable Assert.All(forRetry, m => Assert.Equal(StoreAndForwardMessageStatus.Pending, m.Status)); } + [Fact] + public async Task GetMessagesForRetryAsync_NonZeroInterval_ExcludesNotYetDueIncludesDue() + { + // StoreAndForward-013: exercise the julianday elapsed-time comparison with a + // non-zero retry interval. A message attempted just now must NOT be due; one + // attempted long ago must be due. + var notDue = CreateMessage("notdue", StoreAndForwardCategory.ExternalSystem); + notDue.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds; + notDue.LastAttemptAt = DateTimeOffset.UtcNow; + await _storage.EnqueueAsync(notDue); + + var due = CreateMessage("due", StoreAndForwardCategory.ExternalSystem); + due.RetryIntervalMs = (long)TimeSpan.FromMinutes(5).TotalMilliseconds; + due.LastAttemptAt = DateTimeOffset.UtcNow.AddHours(-2); + await _storage.EnqueueAsync(due); + + var neverAttempted = CreateMessage("never", StoreAndForwardCategory.ExternalSystem); + neverAttempted.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds; + neverAttempted.LastAttemptAt = null; + await _storage.EnqueueAsync(neverAttempted); + + var forRetry = await _storage.GetMessagesForRetryAsync(); + var ids = forRetry.Select(m => m.Id).ToHashSet(); + + Assert.DoesNotContain("notdue", ids); + Assert.Contains("due", ids); + Assert.Contains("never", ids); + } + [Fact] public async Task GetParkedMessagesAsync_ReturnsParkedOnly() { @@ -136,6 +165,31 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable Assert.Equal(0, retrieved.RetryCount); } + [Fact] + public async Task RetryParkedMessageAsync_ClearsLastAttemptAt_SoMessageIsImmediatelyDue() + { + // StoreAndForward-010: a re-queued parked message must be unambiguously due + // for the next sweep regardless of its (stale) last_attempt_at. Use a large + // retry interval so a leftover timestamp would otherwise exclude the message. + var msg = CreateMessage("requeue1", StoreAndForwardCategory.ExternalSystem); + msg.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds; + msg.LastAttemptAt = DateTimeOffset.UtcNow; // recent attempt + msg.Status = StoreAndForwardMessageStatus.Parked; + await _storage.EnqueueAsync(msg); + await _storage.UpdateMessageAsync(msg); + + var requeued = await _storage.RetryParkedMessageAsync("requeue1"); + Assert.True(requeued); + + var retrieved = await _storage.GetMessageByIdAsync("requeue1"); + Assert.Null(retrieved!.LastAttemptAt); + + // It must appear in the retry-due set even though the configured interval + // (1 hour) has not elapsed since the original attempt. + var due = await _storage.GetMessagesForRetryAsync(); + Assert.Contains(due, m => m.Id == "requeue1"); + } + [Fact] public async Task DiscardParkedMessageAsync_RemovesMessage() {