diff --git a/code-reviews/StoreAndForward/findings.md b/code-reviews/StoreAndForward/findings.md
index eaadad9..d59d7b4 100644
--- a/code-reviews/StoreAndForward/findings.md
+++ b/code-reviews/StoreAndForward/findings.md
@@ -8,7 +8,7 @@
 | Last reviewed | 2026-05-16 |
 | Reviewer | claude-agent |
 | Commit reviewed | `9c60592` |
-| Open findings | 7 |
+| Open findings | 0 (3 Deferred: 002, 011, 012 — see notes) |
 
 ## Summary
 
@@ -97,7 +97,7 @@ commit whose message references `StoreAndForward-001`.
 | Severity | Low |
 | Original severity | High (re-triaged down to Low on 2026-05-16 — see Re-triage note) |
 | Category | Error handling & resilience |
-| Status | Open |
+| Status | Deferred |
 | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:162`, `:201` |
 
 **Description**
@@ -152,9 +152,17 @@ should be made deliberately rather than forced here.
 
 **Resolution**
 
-_Open — re-triaged to Low. Premise (no handler registration anywhere) is stale: Host
-now wires all three handlers. Residual gap is minor and the prescribed fix is a
-cross-module contract change needing a design decision._
+_Deferred 2026-05-16 (re-triaged High → Low). Verified again against the source in this
+pass: the finding's premise (no `RegisterDeliveryHandler` caller anywhere) is stale —
+`ScadaLink.Host` wires all three handlers at site startup — so the High-severity
+"engine cannot deliver anything" outcome no longer occurs. The residual gap (a message
+enqueued for a category that genuinely has no handler is buffered then skipped forever)
+is real but minor. The prescribed fix — making `EnqueueAsync` reject when no handler is
+registered — is a behavioural contract change that depends on whether late handler
+registration is supported and requires updating tests in NotificationService and
+ExternalSystemGateway (modules outside this review's edit scope). That is a deliberate
+cross-module design decision, not a localised in-module bug fix, so it is **Deferred**
+pending that decision rather than forced here._
 
 ### StoreAndForward-003 — Off-by-one in retry accounting: immediate failure pre-counts as retry 1
 
@@ -319,7 +327,7 @@ other writer's `RetryCount`).
 |--|--|
 | Severity | Low |
 | Category | Performance & resource management |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs:166`, `:175` |
 
 **Description**
@@ -339,7 +347,22 @@ removes the inconsistency.
 
 **Resolution**
 
-_Unresolved._
+Resolved 2026-05-16 (commit pending). Confirmed the root cause against the source —
+`GetParkedMessagesAsync` issued `COUNT(*)` then a paged `SELECT` as two separate
+commands on the same connection with no surrounding transaction, so a write committed
+between them yields a `TotalCount` inconsistent with the page. Applied the
+recommendation's preferred option: both reads now run inside a single
+`SqliteTransaction` (`BeginTransactionAsync`), and `CommitAsync` is called after the
+page is read; SQLite's deferred read transaction freezes a consistent snapshot on the
+first read so the count and page agree. Regression test
+`GetParkedMessagesAsync_TransactionedReads_CountMatchesFullResultSet` is a functional
+guard that the transaction wiring did not break pagination (reported `TotalCount`
+agrees with the rows assembled across all pages). Note: a true red-then-green TDD test
+of the *race itself* is not achievable deterministically — reproducing it requires a
+concurrent writer to commit in the sub-millisecond window between the two adjacent
+`SELECT`s; a concurrent stress harness passed even against the pre-fix code, so it
+would not be a real regression test. The fix is nonetheless correct and matches the
+finding's recommendation.
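Reviewer sketch for StoreAndForward-006: the count-plus-page read described in the resolution above can be tried in isolation as follows. This is a minimal sketch, not the module's code. It assumes the `Microsoft.Data.Sqlite` package (5.0 or later), a cut-down `sf_messages` table with only `id` and `status`, and an integer encoding of `Parked` as `1`; `ParkedPageSketch` and `ReadParkedPageAsync` are hypothetical names. It passes `deferred: true` explicitly, on the understanding that the library's default `BeginTransaction()` starts an immediate (write-locking) transaction rather than a deferred one; that default is worth verifying against the package version in use.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Data.Sqlite;

public static class ParkedPageSketch
{
    // Hypothetical helper: reads COUNT(*) and one page inside a single transaction
    // so both statements observe the same database state.
    public static async Task<(List<string> Ids, int TotalCount)> ReadParkedPageAsync(
        SqliteConnection connection, int pageNumber, int pageSize)
    {
        // deferred: true requests a plain BEGIN; the read lock (or WAL snapshot)
        // is acquired by the first SELECT and held until commit.
        await using var transaction = connection.BeginTransaction(deferred: true);

        await using var countCmd = connection.CreateCommand();
        countCmd.Transaction = transaction;
        countCmd.CommandText = "SELECT COUNT(*) FROM sf_messages WHERE status = @parked";
        countCmd.Parameters.AddWithValue("@parked", 1); // assumed encoding of Parked
        var totalCount = Convert.ToInt32(await countCmd.ExecuteScalarAsync());

        await using var pageCmd = connection.CreateCommand();
        pageCmd.Transaction = transaction;
        pageCmd.CommandText =
            "SELECT id FROM sf_messages WHERE status = @parked " +
            "ORDER BY id LIMIT @limit OFFSET @offset";
        pageCmd.Parameters.AddWithValue("@parked", 1);
        pageCmd.Parameters.AddWithValue("@limit", pageSize);
        pageCmd.Parameters.AddWithValue("@offset", (pageNumber - 1) * pageSize);

        var ids = new List<string>();
        await using (var reader = await pageCmd.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync()) ids.Add(reader.GetString(0));
        }

        // The reader is disposed before commit so no statement is still active.
        await transaction.CommitAsync();
        return (ids, totalCount);
    }

    public static async Task Main()
    {
        await using var connection = new SqliteConnection("Data Source=:memory:");
        await connection.OpenAsync();

        await using (var setup = connection.CreateCommand())
        {
            setup.CommandText =
                "CREATE TABLE sf_messages (id TEXT PRIMARY KEY, status INTEGER NOT NULL)";
            await setup.ExecuteNonQueryAsync();
        }

        for (var i = 0; i < 5; i++)
        {
            await using var insert = connection.CreateCommand();
            insert.CommandText = "INSERT INTO sf_messages (id, status) VALUES (@id, 1)";
            insert.Parameters.AddWithValue("@id", $"msg-{i}");
            await insert.ExecuteNonQueryAsync();
        }

        var (ids, total) = await ReadParkedPageAsync(connection, pageNumber: 1, pageSize: 3);
        Console.WriteLine($"page rows: {ids.Count}, total: {total}");
    }
}
```

If the default transaction is indeed immediate, the committed fix is still consistent (a write lock also excludes concurrent writers), but it would serialise the read against writers; a deferred transaction avoids that cost for a read-only operation.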
 
 ### StoreAndForward-007 — Async work in `ParkedMessageHandlerActor` uses `ContinueWith` without scheduler/affinity guarantees
 
@@ -347,7 +370,7 @@ _Unresolved._
 |--|--|
 | Severity | Low |
 | Category | Akka.NET conventions |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs:34`, `:68`, `:87` |
 
 **Description**
@@ -370,7 +393,21 @@ off the actor thread safely, and makes the success/failure branches explicit.
 
 **Resolution**
 
-_Unresolved._
+Resolved 2026-05-16 (commit pending). Confirmed the root cause: all three handlers
+(`HandleQuery`, `HandleRetry`, `HandleDiscard`) used `ContinueWith(...).PipeTo(sender)`
+with an `IsCompletedSuccessfully` check standing in for explicit success/failure
+branches. Applied the recommendation exactly — each now uses
+`PipeTo(sender, success: ..., failure: ...)`, the documented Akka pattern: the success
+projection builds the normal response, the failure projection builds the error
+response, and a faulted antecedent unambiguously routes to `failure` rather than
+relying on an `IsCompletedSuccessfully` convention. `Sender` is still captured into a
+local before the await, and the projections touch only locals. This is a
+behaviour-preserving refactor; the existing `ParkedMessageHandlerActorTests` (8 tests
+covering Query/Retry/Discard request-to-response mapping, correlation-ID propagation
+and the unknown-message responses) act as the regression suite and all pass. No new
+test was added because the observable behaviour is unchanged and the `failure`
+projection cannot be exercised without a service that throws — `StoreAndForwardService`
+is a concrete non-virtual type with no failure-injection seam.
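Reviewer sketch for StoreAndForward-007: the `PipeTo(sender, success: ..., failure: ...)` shape described above, reduced to a standalone actor. It assumes the `Akka` NuGet package; `Lookup`, `LookupResponse`, `LookupActor`, and the injected `Func<string, Task<string>>` are hypothetical stand-ins and are not types from this codebase. The delegate is used here only so the failure projection can be triggered; the resolution above notes that the real service offers no such seam.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;

public sealed record Lookup(string Key);
public sealed record LookupResponse(string Key, bool Success, string? Value, string? Error);

public sealed class LookupActor : ReceiveActor
{
    public LookupActor(Func<string, Task<string>> lookup)
    {
        Receive<Lookup>(msg =>
        {
            // Capture Sender now: it is only meaningful while this message is being handled.
            var sender = Sender;

            lookup(msg.Key).PipeTo(
                sender,
                // Runs when the task completes successfully.
                success: value => new LookupResponse(msg.Key, true, value, null),
                // Runs when the task faults; GetBaseException unwraps an AggregateException if present.
                failure: ex => new LookupResponse(
                    msg.Key, false, null, ex.GetBaseException().Message));
        });
    }
}

public static class Program
{
    public static async Task Main()
    {
        using var system = ActorSystem.Create("sketch");

        // Hypothetical service call: fails for the key "boom", upper-cases anything else.
        Func<string, Task<string>> lookup = key => key == "boom"
            ? Task.FromException<string>(new InvalidOperationException("lookup failed"))
            : Task.FromResult(key.ToUpperInvariant());

        var actor = system.ActorOf(Props.Create(() => new LookupActor(lookup)));

        var ok = await actor.Ask<LookupResponse>(new Lookup("a"), TimeSpan.FromSeconds(3));
        var bad = await actor.Ask<LookupResponse>(new Lookup("boom"), TimeSpan.FromSeconds(3));

        Console.WriteLine($"ok: {ok.Success} {ok.Value}");
        Console.WriteLine($"bad: {bad.Success} {bad.Error}");

        await system.Terminate();
    }
}
```

Both projections touch only `msg` and locals captured before the task was started, which is what makes them safe to run off the actor's own message-processing context.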
 
 ### StoreAndForward-008 — A SQLite connection is opened and torn down on every storage call
 
@@ -378,7 +415,7 @@ _Unresolved._
 |--|--|
 | Severity | Low |
 | Category | Performance & resource management |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs:28`, `:61`, `:93`, `:117`, `:144`, `:162`, `:199`, `:221`, `:237`, `:267`, `:285`, `:305`, `:319` |
 
 **Description**
@@ -398,7 +435,18 @@ the design relies on the Sqlite connection pool for acceptable performance.
 
 **Resolution**
 
-_Unresolved._
+Resolved 2026-05-16 (commit pending). Confirmed the finding's analysis is accurate but
+correctly classified as Low/not-a-correctness-bug: Microsoft.Data.Sqlite pools
+connections, so the per-call `OpenAsync` reuses a pooled handle. Applied the "at
+minimum" remedy from the recommendation — the `StoreAndForwardStorage` class XML
+documentation now explicitly records that the connection-per-call style is a deliberate
+trade-off, that the retry sweep's acceptable performance relies on the
+Microsoft.Data.Sqlite connection pool, and that the remedy if profiling ever shows the
+pooled open to be a hot-path bottleneck is a batched sweep API opening one connection
+and transaction per sweep. The larger batched-API refactor was not undertaken because
+it is not warranted at Low severity and the documented design intent removes the
+"silent reliance on the pool" concern. Documentation-only change — no behavioural code
+touched, so no regression test (the connection-pool reliance is not test-observable).
 
 ### StoreAndForward-009 — `OnActivity` event invocation is not thread-safe against concurrent subscribe/unsubscribe
 
@@ -406,7 +454,7 @@ _Unresolved._
 |--|--|
 | Severity | Low |
 | Category | Concurrency & thread safety |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:46`, `:309` |
 
 **Description**
@@ -430,7 +478,21 @@ notifications asynchronously.
 
 **Resolution**
 
-_Unresolved._
+Resolved 2026-05-16 (commit pending). Confirmed — and found the impact is worse than
+the finding states. `RaiseActivity` previously did `OnActivity?.Invoke(...)`; a
+throwing subscriber's exception escaped it. On the `EnqueueAsync` immediate-success
+path the `RaiseActivity("Delivered", ...)` call sits *inside* the delivery `try`, so a
+throwing subscriber was caught by the transient-failure handler — a successfully
+delivered message was then buffered, and because the catch block's own
+`RaiseActivity("Queued", ...)` also threw, the exception escaped `EnqueueAsync`
+entirely. `RaiseActivity` now snapshots `OnActivity`, iterates its invocation list, and
+wraps each subscriber call in `try/catch` (logging and ignoring a fault) — activity
+logging is best-effort and a slow/throwing subscriber can neither abort the caller nor
+be misclassified as a delivery failure. Regression tests:
+`EnqueueAsync_ImmediateDeliverySuccess_FaultingActivitySubscriber_StillReportsDelivered`
+(failed pre-fix — the subscriber exception escaped and the call threw; passes post-fix
+with `WasBuffered == false` and an empty buffer) and
+`RetryMessageAsync_FaultingActivitySubscriber_DoesNotIncrementRetryCount`.
 
 ### StoreAndForward-010 — Retry of a parked message does not reset `LastAttemptAt`, so its retry timing is unspecified
 
@@ -479,7 +541,7 @@ cleared, message excluded from the retry-due set) and passes post-fix.
 |--|--|
 | Severity | Low |
 | Category | Design-document adherence |
-| Status | Open |
+| Status | Deferred |
 | Location | `src/ScadaLink.Commons/Types/Enums/StoreAndForwardMessageStatus.cs:9`; `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:219`, `:235` |
 
 **Description**
@@ -500,7 +562,18 @@ sweep is actively delivering (which would also help with finding 005).
 
 **Resolution**
 
-_Unresolved._
+_Deferred 2026-05-16. Confirmed against the source: `StoreAndForwardMessageStatus`
+defines `Pending, InFlight, Parked, Delivered`; a codebase-wide search shows the
+StoreAndForward module only ever assigns `Pending` and `Parked`, and `InFlight` /
+`Delivered` are never assigned anywhere (delivered messages are deleted, not marked).
+The design doc's `retrying` state is unmodelled. Both options the recommendation offers
+— (a) drop the unused `InFlight`/`Delivered` members, or (b) add a `Retrying` member —
+require editing `StoreAndForwardMessageStatus.cs`, which lives in `src/ScadaLink.Commons`
+(outside this review's edit scope: only `src/ScadaLink.StoreAndForward/**` may be
+changed). The enum is also referenced by IntegrationTests and HealthMonitoring tests, so
+removing members is a cross-module change. The defect is real but cannot be resolved
+in-module; **Deferred** to a change that owns the Commons enum and the design doc
+together._
 
 ### StoreAndForward-012 — `StoreAndForwardMessage` is a persistence entity but lives in the component, not Commons
 
@@ -508,7 +581,7 @@ _Unresolved._
 |--|--|
 | Severity | Low |
 | Category | Code organization & conventions |
-| Status | Open |
+| Status | Deferred |
 | Location | `src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs:9` |
 
 **Description**
@@ -532,7 +605,19 @@ local persistence model. Document the decision.
 
 **Resolution**
 
-_Unresolved._
+_Deferred 2026-05-16. Confirmed: `StoreAndForwardMessage` is a persistence-ignorant POCO
+mapping to `sf_messages` and is also carried across Akka remoting inside
+`ReplicationOperation`, so it doubles as a de-facto wire contract while living in the
+component assembly rather than the Commons `Entities`/`Messages` hierarchy. The
+recommendation's primary remedy — moving `StoreAndForwardMessage` (and
+`ReplicationOperation`) into Commons — crosses module boundaries (it would add a type to
+`src/ScadaLink.Commons`, outside this review's edit scope of
+`src/ScadaLink.StoreAndForward/**`, and change every referencing module). The alternative
+"separate replication DTO" still leaves the persistence entity in the component, so it
+does not actually resolve the finding's core concern (entity placement / contract-
+evolution governance). Resolving this is a deliberate code-organisation decision that
+must own the Commons hierarchy; **Deferred** rather than forced in-module. Flagged for a
+cross-module follow-up._
 
 ### StoreAndForward-013 — Critical paths lack test coverage: retry-due timing, replication-from-active, and the actor bridge
 
diff --git a/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs b/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs
index 8cf5aff..0f922cd 100644
--- a/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs
+++ b/src/ScadaLink.StoreAndForward/ParkedMessageHandlerActor.cs
@@ -31,12 +31,15 @@ public class ParkedMessageHandlerActor : ReceiveActor
         var sender = Sender;
         var siteId = _siteId;
 
+        // StoreAndForward-007: idiomatic PipeTo with explicit success/failure
+        // projections instead of ContinueWith. Both projections touch only locals
+        // (captured before the await), so they are safe to run off the actor thread.
         _service.GetParkedMessagesAsync(category: null, msg.PageNumber, msg.PageSize)
-            .ContinueWith(t =>
-            {
-                if (t.IsCompletedSuccessfully)
+            .PipeTo(
+                sender,
+                success: result =>
                 {
-                    var entries = t.Result.Messages
+                    var entries = result.Messages
                         .Select(m => new ParkedMessageEntry(
                             MessageId: m.Id,
                             TargetSystem: m.Target,
@@ -51,14 +54,12 @@ public class ParkedMessageHandlerActor : ReceiveActor
                         .ToList();
 
                     return new ParkedMessageQueryResponse(
-                        msg.CorrelationId, siteId, entries, t.Result.TotalCount,
+                        msg.CorrelationId, siteId, entries, result.TotalCount,
                         msg.PageNumber, msg.PageSize, true, null, DateTimeOffset.UtcNow);
-                }
-
-                return new ParkedMessageQueryResponse(
+                },
+                failure: ex => new ParkedMessageQueryResponse(
                     msg.CorrelationId, siteId, [], 0, msg.PageNumber, msg.PageSize,
-                    false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
-            }).PipeTo(sender);
+                    false, ex.GetBaseException().Message, DateTimeOffset.UtcNow));
     }
 
     private void HandleRetry(ParkedMessageRetryRequest msg)
@@ -66,18 +67,13 @@ public class ParkedMessageHandlerActor : ReceiveActor
         var sender = Sender;
 
         _service.RetryParkedMessageAsync(msg.MessageId)
-            .ContinueWith(t =>
-            {
-                if (t.IsCompletedSuccessfully)
-                {
-                    return new ParkedMessageRetryResponse(
-                        msg.CorrelationId, t.Result,
-                        t.Result ? null : "Message not found or no longer parked.");
-                }
-
-                return new ParkedMessageRetryResponse(
-                    msg.CorrelationId, false, t.Exception?.GetBaseException().Message);
-            }).PipeTo(sender);
+            .PipeTo(
+                sender,
+                success: retried => new ParkedMessageRetryResponse(
+                    msg.CorrelationId, retried,
+                    retried ? null : "Message not found or no longer parked."),
+                failure: ex => new ParkedMessageRetryResponse(
+                    msg.CorrelationId, false, ex.GetBaseException().Message));
     }
 
     private void HandleDiscard(ParkedMessageDiscardRequest msg)
@@ -85,18 +81,13 @@ public class ParkedMessageHandlerActor : ReceiveActor
         var sender = Sender;
 
         _service.DiscardParkedMessageAsync(msg.MessageId)
-            .ContinueWith(t =>
-            {
-                if (t.IsCompletedSuccessfully)
-                {
-                    return new ParkedMessageDiscardResponse(
-                        msg.CorrelationId, t.Result,
-                        t.Result ? null : "Message not found or no longer parked.");
-                }
-
-                return new ParkedMessageDiscardResponse(
-                    msg.CorrelationId, false, t.Exception?.GetBaseException().Message);
-            }).PipeTo(sender);
+            .PipeTo(
+                sender,
+                success: discarded => new ParkedMessageDiscardResponse(
+                    msg.CorrelationId, discarded,
+                    discarded ? null : "Message not found or no longer parked."),
+                failure: ex => new ParkedMessageDiscardResponse(
+                    msg.CorrelationId, false, ex.GetBaseException().Message));
     }
 
     private static string ExtractMethodName(string payloadJson, Commons.Types.Enums.StoreAndForwardCategory category)
diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs
index 8a5af60..36ea730 100644
--- a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs
+++ b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs
@@ -377,9 +377,34 @@ public class StoreAndForwardService
         return await _storage.GetMessageCountByOriginInstanceAsync(instanceName);
     }
 
+    /// <summary>
+    /// WP-14: Raises the S&F activity notification. StoreAndForward-009: the
+    /// delegate is snapshotted (so a concurrent unsubscribe cannot NRE) and every
+    /// subscriber invocation is wrapped so a slow/throwing subscriber (e.g. the site
+    /// event log) cannot abort the caller. Crucially, a subscriber exception raised
+    /// from EnqueueAsync or RetryMessageAsync must NOT be
+    /// misclassified as a transient delivery failure — pre-fix it escaped into the
+    /// delivery try/catch and caused a successfully delivered message to be buffered
+    /// (or its retry count to be bumped). Activity logging is best-effort.
+    /// </summary>
     private void RaiseActivity(string action, StoreAndForwardCategory category, string detail)
     {
-        OnActivity?.Invoke(action, category, detail);
+        var handlers = OnActivity;
+        if (handlers == null) return;
+
+        foreach (var handler in handlers.GetInvocationList().Cast<Action<string, StoreAndForwardCategory, string>>())
+        {
+            try
+            {
+                handler(action, category, detail);
+            }
+            catch (Exception ex)
+            {
+                _logger.LogWarning(ex,
+                    "Store-and-forward activity subscriber threw for action {Action}; ignored",
+                    action);
+            }
+        }
     }
 }
diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs
index 91ba254..7343f9b 100644
--- a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs
+++ b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs
@@ -8,6 +8,19 @@ namespace ScadaLink.StoreAndForward;
 /// WP-9: SQLite persistence layer for store-and-forward messages.
 /// Uses direct Microsoft.Data.Sqlite (not EF Core) for lightweight site-side storage.
 /// No max buffer size per design decision.
+///
+/// StoreAndForward-008: every method opens a fresh <see cref="SqliteConnection"/> for
+/// the duration of the call rather than holding a long-lived connection. This is a
+/// deliberate trade-off, not an oversight: Microsoft.Data.Sqlite maintains an internal
+/// connection pool keyed on the connection string, so OpenAsync on a previously
+/// used connection string reuses a pooled handle instead of performing a real file
+/// open. The retry sweep therefore relies on that pool for acceptable performance —
+/// it calls /
+/// once per due message, and with no max buffer size (by design) the buffer can grow
+/// large. The connection-per-call style keeps each method self-contained and
+/// transaction-scoped; if profiling ever shows the pooled open to be a bottleneck on
+/// the hot retry path, the remedy is a batched sweep API that opens one connection (and
+/// one transaction) per sweep.
 /// </summary>
 public class StoreAndForwardStorage
 {
@@ -222,6 +235,12 @@ public class StoreAndForwardStorage
 
     /// <summary>
     /// WP-12: Gets all parked messages, optionally filtered by category, with pagination.
+    ///
+    /// StoreAndForward-006: the COUNT(*) and the paged SELECT run inside a single
+    /// transaction so they observe one consistent snapshot. Without it, a concurrent
+    /// enqueue/park/discard arriving between the two statements yields a TotalCount
+    /// inconsistent with the returned page (flickering totals / off-by-one page math
+    /// in the paginated UI).
     /// </summary>
     public async Task<(List<StoreAndForwardMessage> Messages, int TotalCount)> GetParkedMessagesAsync(
         StoreAndForwardCategory? category = null,
@@ -231,8 +250,11 @@ public class StoreAndForwardStorage
         await using var connection = new SqliteConnection(_connectionString);
         await connection.OpenAsync();
 
+        await using var transaction = (SqliteTransaction)await connection.BeginTransactionAsync();
+
         // Count
         await using var countCmd = connection.CreateCommand();
+        countCmd.Transaction = transaction;
         countCmd.CommandText = category.HasValue
             ? "SELECT COUNT(*) FROM sf_messages WHERE status = @parked AND category = @category"
             : "SELECT COUNT(*) FROM sf_messages WHERE status = @parked";
@@ -242,6 +264,7 @@ public class StoreAndForwardStorage
 
         // Page
         await using var pageCmd = connection.CreateCommand();
+        pageCmd.Transaction = transaction;
         var categoryFilter = category.HasValue ? " AND category = @category" : "";
         pageCmd.CommandText = $@"
             SELECT id, category, target, payload_json, retry_count, max_retries,
@@ -257,6 +280,8 @@ public class StoreAndForwardStorage
         pageCmd.Parameters.AddWithValue("@offset", (pageNumber - 1) * pageSize);
 
         var messages = await ReadMessagesAsync(pageCmd);
+
+        await transaction.CommitAsync();
         return (messages, totalCount);
     }
 
diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs
index a7f676e..313abbd 100644
--- a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs
+++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs
@@ -372,6 +372,50 @@ public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable
         Assert.Contains("Queued", activities);
     }
 
+    // ── StoreAndForward-009: faulting activity subscriber must not corrupt delivery ──
+
+    [Fact]
+    public async Task EnqueueAsync_ImmediateDeliverySuccess_FaultingActivitySubscriber_StillReportsDelivered()
+    {
+        // StoreAndForward-009: a throwing OnActivity subscriber (e.g. the site event
+        // log) must not be misclassified as a transient delivery failure. Pre-fix the
+        // subscriber's exception escaped RaiseActivity, was caught by EnqueueAsync's
+        // transient-failure handler, and a successfully delivered message was buffered.
+        _service.OnActivity += (_, _, _) => throw new InvalidOperationException("logging blew up");
+        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
+            _ => Task.FromResult(true));
+
+        var result = await _service.EnqueueAsync(
+            StoreAndForwardCategory.ExternalSystem, "api", """{}""");
+
+        Assert.True(result.Accepted);
+        Assert.False(result.WasBuffered); // delivered, NOT buffered
+
+        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
+        Assert.Null(msg); // nothing left in the buffer
+    }
+
+    [Fact]
+    public async Task RetryMessageAsync_FaultingActivitySubscriber_DoesNotIncrementRetryCount()
+    {
+        // StoreAndForward-009: a throwing subscriber raised after a successful retry
+        // delivery must not be caught by the retry-failure handler and counted as a
+        // transient failure.
+        var result = await _service.EnqueueAsync(
+            StoreAndForwardCategory.ExternalSystem, "api", """{}""",
+            attemptImmediateDelivery: false, maxRetries: 5);
+
+        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
+            _ => Task.FromResult(true));
+        _service.OnActivity += (_, _, _) => throw new InvalidOperationException("logging blew up");
+
+        await _service.RetryPendingMessagesAsync();
+
+        // The retry succeeded; the message must be gone, not re-buffered with a bumped count.
+        var msg = await _storage.GetMessageByIdAsync(result.MessageId);
+        Assert.Null(msg);
+    }
+
     // ── WP-10: Per-source-entity retry settings ──
 
     [Fact]
diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs
index a0e363c..13f103f 100644
--- a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs
+++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs
@@ -250,6 +250,39 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable
         Assert.Equal(2, page2.Count);
     }
 
+    [Fact]
+    public async Task GetParkedMessagesAsync_TransactionedReads_CountMatchesFullResultSet()
+    {
+        // StoreAndForward-006: the COUNT(*) and paged SELECT now run inside one
+        // transaction so they share a consistent snapshot. This functional check
+        // guards the fix — it verifies the transaction wiring did not break paging:
+        // the reported TotalCount and the rows assembled across all pages agree, and
+        // a page wide enough to hold every parked row contains exactly TotalCount rows.
+        for (int i = 0; i < 25; i++)
+        {
+            var m = CreateMessage($"txn-{i}", StoreAndForwardCategory.ExternalSystem);
+            m.Status = StoreAndForwardMessageStatus.Parked;
+            await _storage.EnqueueAsync(m);
+            await _storage.UpdateMessageAsync(m);
+        }
+
+        var (wholePage, wholeTotal) = await _storage.GetParkedMessagesAsync(pageNumber: 1, pageSize: 1000);
+        Assert.Equal(25, wholeTotal);
+        Assert.Equal(wholeTotal, wholePage.Count);
+
+        var collected = new List<string>();
+        int reportedTotal = -1;
+        for (int page = 1; ; page++)
+        {
+            var (rows, total) = await _storage.GetParkedMessagesAsync(pageNumber: page, pageSize: 7);
+            reportedTotal = total;
+            collected.AddRange(rows.Select(r => r.Id));
+            if (rows.Count < 7) break;
+        }
+        Assert.Equal(reportedTotal, collected.Count);
+        Assert.Equal(25, collected.Distinct().Count());
+    }
+
     [Fact]
     public async Task GetMessageCountByStatusAsync_ReturnsAccurateCount()
     {
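
Reviewer sketch for StoreAndForward-009: the subscriber-isolation pattern applied to `RaiseActivity` can be exercised without the service or a logger. This is a minimal sketch under stated assumptions: `ActivitySource` is a hypothetical class, its event takes two strings (the real `OnActivity` also carries a category), and faults are collected in a list in place of the `_logger.LogWarning` call.

```csharp
using System;
using System.Collections.Generic;

public sealed class ActivitySource
{
    // Hypothetical two-argument event standing in for OnActivity.
    public event Action<string, string>? OnActivity;

    // Faults are collected instead of logged, only to keep the sketch dependency-free.
    public List<Exception> SwallowedFaults { get; } = new();

    public void RaiseActivity(string action, string detail)
    {
        // Snapshot the delegate so a concurrent unsubscribe cannot null it mid-call.
        var handlers = OnActivity;
        if (handlers is null) return;

        // Invoke each subscriber separately so one fault cannot skip the rest
        // or propagate to the caller.
        foreach (Action<string, string> handler in handlers.GetInvocationList())
        {
            try
            {
                handler(action, detail);
            }
            catch (Exception ex)
            {
                SwallowedFaults.Add(ex);
            }
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var source = new ActivitySource();
        var seen = new List<string>();

        source.OnActivity += (_, _) => throw new InvalidOperationException("logging blew up");
        source.OnActivity += (action, _) => seen.Add(action);

        // Does not throw, and the second subscriber still runs.
        source.RaiseActivity("Delivered", "api");

        Console.WriteLine($"seen={seen.Count} faults={source.SwallowedFaults.Count}"); // prints "seen=1 faults=1"
    }
}
```

With a plain `OnActivity?.Invoke(...)`, the first subscriber's exception would stop the multicast invocation and reach the caller, which is the misclassification path the two regression tests above guard against.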