From 0135a6b2a6e5cb26bf9ff01af2bd72f1e0b83f3b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 17 May 2026 03:18:41 -0400 Subject: [PATCH] =?UTF-8?q?fix(store-and-forward):=20resolve=20StoreAndFor?= =?UTF-8?q?ward-015..017=20=E2=80=94=20document=20maxRetries=3D0=20contrac?= =?UTF-8?q?t,=20replicate=20operator=20retry/discard,=20real=20category=20?= =?UTF-8?q?in=20activity=20log?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- code-reviews/StoreAndForward/findings.md | 45 +++++++-- .../ReplicationService.cs | 32 +++++- .../StoreAndForwardMessage.cs | 7 +- .../StoreAndForwardService.cs | 59 ++++++++++- .../StoreAndForwardReplicationTests.cs | 98 +++++++++++++++++++ .../StoreAndForwardServiceTests.cs | 54 ++++++++++ 6 files changed, 283 insertions(+), 12 deletions(-) diff --git a/code-reviews/StoreAndForward/findings.md b/code-reviews/StoreAndForward/findings.md index b1cd34d..5169d36 100644 --- a/code-reviews/StoreAndForward/findings.md +++ b/code-reviews/StoreAndForward/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-17 | | Reviewer | claude-agent | | Commit reviewed | `39d737e` | -| Open findings | 3 (3 Deferred: 002, 011, 012 — see notes) | +| Open findings | 0 (3 Deferred: 002, 011, 012 — see notes) | ## Summary @@ -735,7 +735,7 @@ all six `SiteActorPathTests` now pass. Fixed by the commit whose message referen |--|--| | Severity | Medium | | Category | Documentation & comments | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:114`–`:130`, `:285` | **Description** @@ -784,7 +784,17 @@ public `EnqueueAsync` contract must state the chosen meaning; today it states no **Resolution** -_Unresolved._ +Resolved 2026-05-17. Documentation-only fix — retry semantics confirmed correct and +left unchanged. 
Root cause verified against the source: `EnqueueAsync`'s `maxRetries` +parameter had no `<param>` documentation and the method/class summaries described only +the "park on max retries" path, never the `0 = no limit / retry forever` special case +that `RetryMessageAsync`'s `MaxRetries > 0` guard actually enforces. Added an explicit +`<param>` tag for every `EnqueueAsync` parameter — `maxRetries` now states in bold that +`0` means "no limit, never parked for retry exhaustion" and is **not** a "never retry" +value — extended the method summary with a retry-count lifecycle paragraph, updated the +class-level lifecycle bullet, and tightened the `StoreAndForwardMessage.MaxRetries` field +doc to the same wording. No behavioural code touched; an XML comment is not +test-observable so no regression test was added. ### StoreAndForward-016 — Operator-initiated parked-message retry and discard are not replicated to the standby @@ -792,7 +802,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Error handling & resilience | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:339`–`:362`; `src/ScadaLink.StoreAndForward/ReplicationService.cs:131`–`:136` | **Description** @@ -842,7 +852,18 @@ paths). **Resolution** -_Unresolved._ +Resolved 2026-05-17. Root cause confirmed: the two operator paths +(`RetryParkedMessageAsync`, `DiscardParkedMessageAsync`) changed local SQLite state but +never touched `_replication`, so a failover after an operator action diverged the +standby buffer. `DiscardParkedMessageAsync` now calls `_replication?.ReplicateRemove` +after a successful local delete (the existing `Remove` op deletes on the standby). A new +`ReplicationOperationType.Requeue` was added; `RetryParkedMessageAsync` re-loads the +requeued row and calls `_replication?.ReplicateRequeue`, and the standby's +`ApplyReplicatedOperationAsync` `Requeue` case resets its matching row to `Pending` with +`retry_count = 0`. 
Regression tests `DiscardingAParkedMessage_ReplicatesARemoveOperation`, +`RetryingAParkedMessage_ReplicatesARequeueOperation` and +`ApplyReplicatedOperation_Requeue_MovesStandbyRowBackToPending` (in +`StoreAndForwardReplicationTests`) all pass; the first two fail against the pre-fix code. ### StoreAndForward-017 — Retry/Discard activity-log entries hard-code the `ExternalSystem` category @@ -850,7 +871,7 @@ _Unresolved._ |--|--| | Severity | Low | | Category | Correctness & logic bugs | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:344`, `:358` | **Description** @@ -882,4 +903,14 @@ allow a nullable category for management actions rather than asserting a false o **Resolution** -_Unresolved._ +Resolved 2026-05-17. Root cause confirmed: `RetryParkedMessageAsync` and +`DiscardParkedMessageAsync` raised activity notifications with a hard-coded +`StoreAndForwardCategory.ExternalSystem`, mislabelling parked `Notification` and +`CachedDbWrite` messages in the site event log. Both methods now obtain the message's +real category — `DiscardParkedMessageAsync` loads the row via `GetMessageByIdAsync` +before the delete, `RetryParkedMessageAsync` re-loads the requeued row (also used for +the StoreAndForward-016 replication) — and pass it to `RaiseActivity` (falling back to +`ExternalSystem` only if the row is unexpectedly absent). Regression tests +`RetryParkedMessageAsync_ActivityUsesMessageRealCategory` and +`DiscardParkedMessageAsync_ActivityUsesMessageRealCategory` assert the activity carries +`Notification` / `CachedDbWrite` respectively; both fail against the pre-fix code. 
diff --git a/src/ScadaLink.StoreAndForward/ReplicationService.cs b/src/ScadaLink.StoreAndForward/ReplicationService.cs index 4c687f1..9a0af9e 100644 --- a/src/ScadaLink.StoreAndForward/ReplicationService.cs +++ b/src/ScadaLink.StoreAndForward/ReplicationService.cs @@ -73,6 +73,22 @@ public class ReplicationService message)); } + /// + /// WP-11 / StoreAndForward-016: Replicates an operator-initiated requeue (a parked + /// message moved back to the pending queue) to standby (fire-and-forget). The + /// carried message reflects the active node's post-requeue state (Pending, + /// retry_count = 0) so the standby's copy can be brought into sync. + /// + public void ReplicateRequeue(StoreAndForwardMessage message) + { + if (!_options.ReplicationEnabled || _replicationHandler == null) return; + + FireAndForget(new ReplicationOperation( + ReplicationOperationType.Requeue, + message.Id, + message)); + } + /// /// WP-11: Applies a replicated operation received from the active node. /// Used by the standby node to keep its SQLite in sync. @@ -95,6 +111,15 @@ public class ReplicationService operation.Message.Status = StoreAndForwardMessageStatus.Parked; await storage.UpdateMessageAsync(operation.Message); break; + + case ReplicationOperationType.Requeue when operation.Message != null: + // StoreAndForward-016: an operator retried a parked message on the + // active node; mirror that on the standby by moving its row back to + // Pending with retry_count = 0 so a failover preserves the retry. + operation.Message.Status = StoreAndForwardMessageStatus.Pending; + operation.Message.RetryCount = 0; + await storage.UpdateMessageAsync(operation.Message); + break; } } @@ -132,5 +157,10 @@ public enum ReplicationOperationType { Add, Remove, - Park + Park, + /// + /// StoreAndForward-016: an operator moved a parked message back to the pending + /// queue. The standby resets its matching row to Pending with retry_count = 0. 
+ /// + Requeue } diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs index 5a62445..5695ed2 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs @@ -27,7 +27,12 @@ public class StoreAndForwardMessage /// public int RetryCount { get; set; } - /// Maximum retry-sweep attempts before parking (0 = no limit). + /// + /// Maximum retry-sweep attempts before the message is parked. + /// 0 = no limit — the message is retried on every sweep until delivered + /// and is never parked for exhausting retries. This is not a "never retry" + /// value; a positive value is required to bound delivery attempts. + /// public int MaxRetries { get; set; } /// Retry interval in milliseconds. diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs index 36ea730..ab2618c 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs @@ -10,7 +10,8 @@ namespace ScadaLink.StoreAndForward; /// 1. Caller attempts immediate delivery via IDeliveryHandler /// 2. On transient failure → buffer in SQLite → retry loop /// 3. On success → remove from buffer -/// 4. On max retries → park +/// 4. On reaching MaxRetries → park (a MaxRetries of 0 means "no limit" — the +/// message is retried until delivered and is never parked for retry exhaustion) /// 5. Permanent failures are returned to caller immediately (never buffered) /// /// WP-10: Fixed retry interval (not exponential). Per-source-entity retry settings. @@ -116,10 +117,38 @@ public class StoreAndForwardService /// Attempts immediate delivery first. On transient failure, buffers for retry. /// On permanent failure (handler returns false), returns false immediately. 
/// + /// WP-10: Retry-count lifecycle — the immediate (or caller-made) delivery attempt + /// is attempt 0 and is not counted; the background retry sweep increments + /// on each retry. A buffered + /// message is parked once RetryCount reaches + /// — but only when is greater than 0. A + /// of 0 means no limit: the message is + /// retried on every sweep until it is delivered and is never parked on a + /// retry-count basis. It is therefore not a "do not retry" value — callers + /// that want delivery abandoned after a bounded number of attempts must pass a + /// positive . + /// /// WP-15: CachedCall idempotency note — this method does not deduplicate. /// The caller (e.g., ExternalSystem.CachedCall()) is responsible for ensuring /// that the remote system can handle duplicate deliveries safely. /// + /// Message category (selects the delivery handler). + /// Target system name (external system / notification list / DB connection). + /// JSON-serialized call payload, treated opaquely. + /// Instance that originated the message (WP-13: survives instance deletion). + /// + /// Maximum background retry-sweep attempts before the message is parked. + /// 0 = no limit — the message is retried on every sweep until + /// delivered and is never parked for exhausting retries; it is not a + /// "never retry" value. null uses . + /// Must be positive to bound delivery attempts. Mirrors the + /// contract. + /// + /// Fixed interval between retry sweeps for this message; null uses the configured default. + /// + /// When false, the caller has already made its own delivery attempt and the + /// message is buffered directly for the retry sweep (the handler is not invoked here). + /// public async Task EnqueueAsync( StoreAndForwardCategory category, string target, @@ -335,13 +364,27 @@ public class StoreAndForwardService /// /// WP-12: Retries a parked message (moves back to pending queue). 
+ /// + /// StoreAndForward-016: an operator requeue is a buffer state change and is + /// replicated to the standby (as a ) + /// so a failover preserves the operator's retry intent. + /// StoreAndForward-017: the activity-log entry carries the message's true + /// category rather than a hard-coded one. /// public async Task RetryParkedMessageAsync(string messageId) { var success = await _storage.RetryParkedMessageAsync(messageId); if (success) { - RaiseActivity("Retry", StoreAndForwardCategory.ExternalSystem, + // Re-load the requeued row so the activity log gets the real category + // and the standby gets the post-requeue state (Pending, retry_count = 0). + var message = await _storage.GetMessageByIdAsync(messageId); + var category = message?.Category ?? StoreAndForwardCategory.ExternalSystem; + if (message != null) + { + _replication?.ReplicateRequeue(message); + } + RaiseActivity("Retry", category, $"Parked message {messageId} moved back to queue"); } return success; @@ -349,13 +392,23 @@ public class StoreAndForwardService /// /// WP-12: Permanently discards a parked message. + /// + /// StoreAndForward-016: an operator discard is a buffer removal and is replicated + /// to the standby (as a ) so the + /// discarded message does not reappear after a failover. + /// StoreAndForward-017: the activity-log entry carries the message's true + /// category rather than a hard-coded one. /// public async Task DiscardParkedMessageAsync(string messageId) { + // Capture the category before the row is deleted so the activity log is + // labelled correctly. + var message = await _storage.GetMessageByIdAsync(messageId); var success = await _storage.DiscardParkedMessageAsync(messageId); if (success) { - RaiseActivity("Discard", StoreAndForwardCategory.ExternalSystem, + _replication?.ReplicateRemove(messageId); + RaiseActivity("Discard", message?.Category ?? 
StoreAndForwardCategory.ExternalSystem, $"Parked message {messageId} discarded"); } return success; diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardReplicationTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardReplicationTests.cs index 9b60c1f..52629a3 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardReplicationTests.cs +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardReplicationTests.cs @@ -105,4 +105,102 @@ public class StoreAndForwardReplicationTests : IAsyncLifetime, IDisposable Assert.Contains(ops, o => o.OperationType == ReplicationOperationType.Park && o.MessageId == result.MessageId); } + + /// + /// StoreAndForward-016: an operator discarding a parked message must replicate + /// a Remove so the standby's copy is also deleted (otherwise the discarded + /// message reappears in the parked list after a failover). + /// + [Fact] + public async Task DiscardingAParkedMessage_ReplicatesARemoveOperation() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("always fails")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1); + await _service.RetryPendingMessagesAsync(); // -> parked + await WaitForReplicationAsync(2); + + var discarded = await _service.DiscardParkedMessageAsync(result.MessageId); + Assert.True(discarded); + + var ops = await WaitForReplicationAsync(3); + Assert.Contains(ops, o => + o.OperationType == ReplicationOperationType.Remove && o.MessageId == result.MessageId); + } + + /// + /// StoreAndForward-016: an operator retrying a parked message must replicate a + /// Requeue so the standby's copy moves back to Pending (otherwise it stays + /// Parked on the standby and the operator's retry is lost across a failover). 
+ /// + [Fact] + public async Task RetryingAParkedMessage_ReplicatesARequeueOperation() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("always fails")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1); + await _service.RetryPendingMessagesAsync(); // -> parked + await WaitForReplicationAsync(2); + + var retried = await _service.RetryParkedMessageAsync(result.MessageId); + Assert.True(retried); + + var ops = await WaitForReplicationAsync(3); + var requeue = ops.SingleOrDefault(o => + o.OperationType == ReplicationOperationType.Requeue && o.MessageId == result.MessageId); + Assert.NotNull(requeue); + Assert.NotNull(requeue!.Message); + Assert.Equal(StoreAndForwardMessageStatus.Pending, requeue.Message!.Status); + } + + /// + /// StoreAndForward-016: the standby applies a Requeue by moving its row back to + /// Pending with retry_count = 0, mirroring the active node's local state. 
+ /// + [Fact] + public async Task ApplyReplicatedOperation_Requeue_MovesStandbyRowBackToPending() + { + var replication = new ReplicationService( + new StoreAndForwardOptions { ReplicationEnabled = true }, + NullLogger.Instance); + + var parked = new StoreAndForwardMessage + { + Id = "requeue1", + Category = StoreAndForwardCategory.ExternalSystem, + Target = "api", + PayloadJson = "{}", + RetryCount = 5, + MaxRetries = 1, + RetryIntervalMs = 0, + CreatedAt = DateTimeOffset.UtcNow, + Status = StoreAndForwardMessageStatus.Parked, + }; + await _storage.EnqueueAsync(parked); + + var requeued = new StoreAndForwardMessage + { + Id = parked.Id, + Category = parked.Category, + Target = parked.Target, + PayloadJson = parked.PayloadJson, + RetryCount = 0, + MaxRetries = parked.MaxRetries, + RetryIntervalMs = parked.RetryIntervalMs, + CreatedAt = parked.CreatedAt, + Status = StoreAndForwardMessageStatus.Pending, + }; + await replication.ApplyReplicatedOperationAsync( + new ReplicationOperation(ReplicationOperationType.Requeue, parked.Id, requeued), + _storage); + + var row = await _storage.GetMessageByIdAsync(parked.Id); + Assert.NotNull(row); + Assert.Equal(StoreAndForwardMessageStatus.Pending, row!.Status); + Assert.Equal(0, row.RetryCount); + } } diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs index 313abbd..f5b1106 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs @@ -267,6 +267,60 @@ public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable Assert.Equal(0, msg.RetryCount); } + /// + /// StoreAndForward-017: the Retry activity-log entry must carry the parked + /// message's true category, not a hard-coded ExternalSystem. 
+ /// + [Fact] + public async Task RetryParkedMessageAsync_ActivityUsesMessageRealCategory() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification, + _ => throw new HttpRequestException("fail")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.Notification, "ops-list", """{}""", + maxRetries: 1); + await _service.RetryPendingMessagesAsync(); // -> parked + + var categories = new List(); + _service.OnActivity += (action, category, _) => + { + if (action == "Retry") categories.Add(category); + }; + + var retried = await _service.RetryParkedMessageAsync(result.MessageId); + Assert.True(retried); + + Assert.Equal(new[] { StoreAndForwardCategory.Notification }, categories); + } + + /// + /// StoreAndForward-017: the Discard activity-log entry must carry the parked + /// message's true category, not a hard-coded ExternalSystem. + /// + [Fact] + public async Task DiscardParkedMessageAsync_ActivityUsesMessageRealCategory() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite, + _ => throw new HttpRequestException("fail")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.CachedDbWrite, "site-db", """{}""", + maxRetries: 1); + await _service.RetryPendingMessagesAsync(); // -> parked + + var categories = new List(); + _service.OnActivity += (action, category, _) => + { + if (action == "Discard") categories.Add(category); + }; + + var discarded = await _service.DiscardParkedMessageAsync(result.MessageId); + Assert.True(discarded); + + Assert.Equal(new[] { StoreAndForwardCategory.CachedDbWrite }, categories); + } + [Fact] public async Task DiscardParkedMessageAsync_PermanentlyRemoves() {