4 Commits

Author SHA1 Message Date
Joseph Doherty 61253e3269 fix(store-and-forward): resolve S&F delivery + replication wiring (3 Critical findings)
Resolves StoreAndForward-001, ExternalSystemGateway-001, NotificationService-001
— one systemic gap where buffered messages were persisted but never delivered,
and the active node never replicated its buffer to the standby.

Delivery handlers (ExternalSystemGateway-001 / NotificationService-001):
- AkkaHostedService registers delivery handlers for the ExternalSystem,
  CachedDbWrite and Notification categories after StoreAndForwardService starts;
  each resolves its scoped consumer in a fresh DI scope.
- ExternalSystemClient, DatabaseGateway and NotificationDeliveryService each
  gain a DeliverBufferedAsync method: re-resolve the target and re-attempt
  delivery, returning true on success, false on permanent failure (park), and
  throwing on transient failure so the engine retries.
- EnqueueAsync gains an attemptImmediateDelivery flag; CachedCallAsync and
  NotificationDeliveryService.SendAsync pass false (they already attempted
  delivery themselves) so registering a handler does not dispatch twice.
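
The handler contract described above (true, false, or throw) could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the project's code: `BufferedMessage`, `SweepOutcome` and `RetrySweep` are hypothetical names, and the real engine's retry counting and persistence are left out.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a persisted store-and-forward entry.
public sealed record BufferedMessage(string Id, string Payload);

// Hypothetical outcome of applying the contract to one message.
public enum SweepOutcome { Delivered, Parked, RetryLater }

public static class RetrySweep
{
    // Applies the handler contract to a single buffered message:
    //   returns true  -> delivered, the entry can be removed
    //   returns false -> permanent failure, the entry is parked
    //   throws        -> transient failure, the entry stays buffered for a later sweep
    public static async Task<SweepOutcome> ProcessAsync(
        BufferedMessage message,
        Func<BufferedMessage, Task<bool>> handler)
    {
        try
        {
            return await handler(message) ? SweepOutcome.Delivered : SweepOutcome.Parked;
        }
        catch (Exception)
        {
            return SweepOutcome.RetryLater;
        }
    }
}
```

Keeping "park" and "retry" as distinct signals lets a handler say that a target no longer exists without the engine retrying it indefinitely.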

Replication (StoreAndForward-001):
- ReplicationService is injected into StoreAndForwardService; a new BufferAsync
  helper replicates every enqueue, and removals after a successful retry and
  parks are replicated too. Replication is fire-and-forget and a no-op when
  disabled.

Tests: StoreAndForwardReplicationTests (Add/Remove/Park observed),
attemptImmediateDelivery behaviour, and DeliverBufferedAsync paths for each
consumer. Full solution builds; StoreAndForward/ExternalSystemGateway/
NotificationService suites green.
2026-05-16 18:58:11 -04:00
Joseph Doherty a9bd7ee37c fix(central-ui): resolve CentralUI-001 — enforce script trust model before sandbox execution
ScriptAnalysisService.RunInSandboxAsync compiled and executed arbitrary
user C# in the central host process with no trust-model enforcement — the
forbidden-API set was only a Monaco editor diagnostic. A Design-role user
could run System.IO/Process/Reflection/network code on the central node.

Added a Roslyn semantic gate (EnforceTrustModel) invoked after compilation
and before script.RunAsync, and on nested shared scripts in callSharedFunc;
a script referencing any forbidden API is rejected before it runs.

Reworked FindForbiddenApiUsages: it now resolves every identifier against
the semantic model and checks types and members, so a fully-qualified call
(System.IO.File.WriteAllText) is caught — the pre-fix check only inspected
the leftmost identifier and missed that shape. This is a static semantic
gate, not a process sandbox.

Adds gate regression tests that fail against the pre-fix code, plus a
clean-script test guarding against over-blocking.
2026-05-16 18:41:12 -04:00
Joseph Doherty a9ceba00d0 fix(communication): resolve Communication-001 — early stream termination handling
DebugStreamService.StartStreamAsync awaited the initial debug snapshot inside
a try whose only handler was catch (OperationCanceledException). When the
stream terminated before the snapshot arrived, onTerminatedWrapper completed
the await with an InvalidOperationException that escaped the catch — the
caller got a raw, untranslated exception and the service did no teardown of
its own on that path.

Replaced with catch (Exception): it removes the session entry, sends
StopDebugStream to the bridge actor via the local reference (deterministic
teardown, idempotent), and throws a descriptive exception — TimeoutException
for the 30s timeout, otherwise an InvalidOperationException naming the
instance/site and wrapping the cause.
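
The catch-all-then-translate shape described above might look roughly like the sketch below. It is an assumption-laden illustration, not the actual `StartStreamAsync`: `SnapshotWait`, `AwaitSnapshotAsync` and the `teardown` callback are hypothetical, and the timeout is modelled with `Task.WaitAsync` (.NET 6+), which may differ from how the real service implements its 30s timeout.

```csharp
using System;
using System.Threading.Tasks;

public static class SnapshotWait
{
    // Awaits the initial snapshot with a timeout. On ANY failure it runs the
    // teardown callback once and rethrows a descriptive exception that wraps
    // the original cause. `teardown` stands in for removing the session entry
    // and telling the bridge actor to stop.
    public static async Task<T> AwaitSnapshotAsync<T>(
        Task<T> snapshot, TimeSpan timeout, string instance, Action teardown)
    {
        try
        {
            // Task.WaitAsync throws TimeoutException when the timeout elapses.
            return await snapshot.WaitAsync(timeout);
        }
        catch (Exception ex)
        {
            teardown();
            if (ex is TimeoutException)
                throw new TimeoutException(
                    $"Timed out after {timeout.TotalSeconds:0}s waiting for the debug snapshot of '{instance}'.", ex);
            throw new InvalidOperationException(
                $"Debug stream for '{instance}' terminated before the snapshot arrived.", ex);
        }
    }
}
```

Because the teardown runs in the catch block rather than depending on which exception type arrived, the early-termination path and the timeout path are cleaned up the same way.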

Re-triaged Critical -> Medium: the originally-claimed multi-minute site-side
resource leak does not occur (the bridge actor self-terminates on every
onTerminated path). Adds the first DebugStreamService test, which fails
against the pre-fix code.
2026-05-16 18:32:52 -04:00
Joseph Doherty 239bee3bc4 fix(data-connection): resolve DataConnectionLayer-001 — off-thread actor state mutation
HandleSubscribe spawned a Task.Run that mutated DataConnectionActor private
state (_subscriptionIds, _subscriptionsByInstance, _totalSubscribed,
_resolvedTags, _unresolvedTags) from a thread-pool thread, racing the actor's
own message loop — a data race on non-thread-safe Dictionary/HashSet and
non-atomic counters.

Restructured HandleSubscribe to follow the actor's existing PipeTo(Self)
pattern: the background task now performs only adapter I/O and pipes a
SubscribeCompleted message to Self; all subscription-state mutation happens
in the new HandleSubscribeCompleted handler on the actor thread (wired into
the Connected, Connecting and Reconnecting states).
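
The idea behind the restructuring (background work does I/O only, and all state changes happen in one message handler) can be illustrated without Akka.NET. The sketch below swaps in a `System.Threading.Channels` mailbox with a single consumer as a stand-in for the actor's message loop; it is an analogy, not the `DataConnectionActor` code, and `SubscriptionMailbox` and its members are hypothetical names. Only the `SubscribeCompleted` message name is taken from the commit.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Immutable result message, posted once the background I/O has finished.
public sealed record SubscribeCompleted(string InstanceId, IReadOnlyList<string> ResolvedTags);

public sealed class SubscriptionMailbox
{
    private readonly Channel<SubscribeCompleted> _mailbox =
        Channel.CreateUnbounded<SubscribeCompleted>();

    // Plain, non-thread-safe state: touched only inside the RunAsync loop.
    private readonly Dictionary<string, int> _subscriptionsByInstance = new();
    public int TotalSubscribed { get; private set; }

    // Background work performs adapter I/O only, then posts a message.
    // It never touches the dictionary or the counter.
    public Task SubscribeAsync(string instanceId, Func<Task<IReadOnlyList<string>>> adapterIo) =>
        Task.Run(async () =>
        {
            var tags = await adapterIo();
            await _mailbox.Writer.WriteAsync(new SubscribeCompleted(instanceId, tags));
        });

    // Signals that no more messages will be posted, so RunAsync can finish.
    public void Complete() => _mailbox.Writer.Complete();

    // The single consumer: messages are applied one at a time, so the
    // state mutation below never runs concurrently with itself.
    public async Task RunAsync()
    {
        await foreach (var msg in _mailbox.Reader.ReadAllAsync())
        {
            _subscriptionsByInstance.TryGetValue(msg.InstanceId, out var count);
            _subscriptionsByInstance[msg.InstanceId] = count + msg.ResolvedTags.Count;
            TotalSubscribed += msg.ResolvedTags.Count;
        }
    }
}
```

In Akka.NET the same role is played by the actor's mailbox: piping the task result to `Self` turns the completion into an ordinary message, so the handler runs with the usual one-message-at-a-time guarantee.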

Adds DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters (30x30
concurrent subscribes) which fails against the pre-fix code and passes after.
2026-05-16 18:26:43 -04:00
24 changed files with 1068 additions and 143 deletions
+18 -3
@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-16 | | Last reviewed | 2026-05-16 |
| Reviewer | claude-agent | | Reviewer | claude-agent |
| Commit reviewed | `9c60592` | | Commit reviewed | `9c60592` |
| Open findings | 19 | | Open findings | 18 |
## Summary ## Summary
@@ -55,7 +55,7 @@ pages and the auth bridge are untested.
|--|--| |--|--|
| Severity | Critical | | Severity | Critical |
| Category | Security | | Category | Security |
| Status | Open | | Status | Resolved |
| Location | `src/ScadaLink.CentralUI/ScriptAnalysis/ScriptAnalysisService.cs:171-424` | | Location | `src/ScadaLink.CentralUI/ScriptAnalysis/ScriptAnalysisService.cs:171-424` |
**Description** **Description**
@@ -85,7 +85,22 @@ an editor hint.
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16. A Roslyn semantic trust-model gate was added. `RunInSandboxAsync`
now calls `EnforceTrustModel` after compilation and before `script.RunAsync`; if the
script references any forbidden API the run is rejected (`SandboxErrorKind.CompileError`)
with the offending markers, and the same gate is applied to nested shared scripts in
`callSharedFunc`. `FindForbiddenApiUsages` was reworked so it resolves every identifier
(not just the leftmost) against the semantic model and checks types **and** members —
so a fully-qualified call such as `System.IO.File.WriteAllText(...)` is now caught, not
only `using`-directive or bare-type forms. This is a static semantic gate consistent
with the documented trust model; it is not a process sandbox — reflection-based
indirection remains out of its reach, and full isolation would require running scripts
in a separate constrained process (a larger change deliberately not taken here).
Regression tests `RunInSandbox_FullyQualifiedForbiddenApi_IsBlockedBeforeExecution`,
`RunInSandbox_ForbiddenUsingDirective_IsBlockedBeforeExecution` and
`Diagnose_FullyQualifiedForbiddenCall_RaisesSCADA002` fail against the pre-fix code and
pass after; `RunInSandbox_CleanScript_StillRuns` guards against over-blocking. Fixed by
the commit whose message references `CentralUI-001`.
### CentralUI-002 — Site-scoped Deployment permissions are issued but never enforced ### CentralUI-002 — Site-scoped Deployment permissions are issued but never enforced
+39 -29
@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-16 | | Last reviewed | 2026-05-16 |
| Reviewer | claude-agent | | Reviewer | claude-agent |
| Commit reviewed | `9c60592` | | Commit reviewed | `9c60592` |
| Open findings | 11 | | Open findings | 10 |
## Summary ## Summary
@@ -16,9 +16,7 @@ The Communication module is generally well-structured and matches the design doc
two-transport model (ClusterClient for command/control, gRPC server-streaming for two-transport model (ClusterClient for command/control, gRPC server-streaming for
real-time data). The actors keep mutable state on the actor thread, use `PipeTo` for real-time data). The actors keep mutable state on the actor thread, use `PipeTo` for
async work, and the gRPC server/client lifecycle is mostly disciplined. However the async work, and the gRPC server/client lifecycle is mostly disciplined. However the
review found one Critical issue (a `TimeoutException` from `DebugStreamService` leaves review found several High and Medium issues clustered around two themes:
an orphaned bridge actor and an active site-side subscription, leaking resources on
every snapshot timeout) and several High/Medium issues clustered around two themes:
**(a) gRPC subscription bookkeeping races** — `SiteStreamGrpcClient` overwrites and **(a) gRPC subscription bookkeeping races** — `SiteStreamGrpcClient` overwrites and
removes subscription entries by correlation ID without disposal or ownership checks, removes subscription entries by correlation ID without disposal or ownership checks,
so reconnect cycles leak `CancellationTokenSource`es and can cancel the wrong stream; so reconnect cycles leak `CancellationTokenSource`es and can cancel the wrong stream;
@@ -44,43 +42,55 @@ mutation races, and the snapshot-timeout cleanup path.
## Findings ## Findings
### Communication-001 — Snapshot timeout leaves orphaned bridge actor and site subscription ### Communication-001 — Early stream termination escapes StartStreamAsync's narrow exception handling
| | | | | |
|--|--| |--|--|
| Severity | Critical | | Severity | Medium |
| Category | Performance & resource management | | Category | Error handling & resilience |
| Status | Open | | Status | Resolved |
| Location | `src/ScadaLink.Communication/DebugStreamService.cs:139`, `src/ScadaLink.Communication/DebugStreamService.cs:149` | | Location | `src/ScadaLink.Communication/DebugStreamService.cs:130-143` |
**Re-triaged 2026-05-16:** originally filed Critical, claiming an orphaned bridge actor
and a multi-minute site-side resource leak on every snapshot timeout. On verification
that impact does **not** occur: `DebugStreamBridgeActor` calls `CleanupGrpc()` and
`Context.Stop(Self)` on every path that invokes `onTerminated` (site disconnect, gRPC
max-retries, `ReceiveTimeout`), so it always self-terminates and releases its gRPC
subscription; and the pure-timeout path does reach `StopStream`, which also stops it.
The genuine defect described below is an error-handling gap, not a leak — severity
corrected to Medium.
**Description** **Description**
When `StartStreamAsync` times out waiting for the initial snapshot it calls `StartStreamAsync` awaits the initial snapshot inside a `try` whose only handler is
`StopStream(sessionId)` and throws. `StopStream` only sends `StopDebugStream` to the `catch (OperationCanceledException)`. When the stream terminates before the snapshot
bridge actor **if the session is still in `_sessions`**. But the bridge actor was added arrives, `onTerminatedWrapper` completes the await via
to `_sessions` at line 124 and is only removed by `onTerminatedWrapper`. The serious `snapshotTcs.TrySetException(new InvalidOperationException(...))`. That
case is the race where `onTerminatedWrapper` fires first (e.g. site disconnect arrives `InvalidOperationException` is not an `OperationCanceledException`, so it escapes the
during the wait): `snapshotTcs.TrySetException` completes the await with an catch entirely: the caller (Blazor debug view / SignalR hub) receives a raw,
`InvalidOperationException` rather than `OperationCanceledException`, which is **not** untranslated exception, and `StartStreamAsync` performs no teardown of its own on that
caught by the `catch (OperationCanceledException)` block. The exception propagates path — it relies implicitly on the bridge actor self-terminating. Cleanup from the
uncaught, `StopStream` is never reached, and if the bridge actor is instead orphaned service side is therefore not deterministic, and the failure surfaced to the caller is
(snapshot never arrives, site silent, no terminate) the only cleanup is the 5-minute not a meaningful, documented result.
`ReceiveTimeout` in the actor — meaning a site-side `StreamRelayActor` and gRPC stream
can stay alive for up to 5 minutes after the central caller has given up. Combined with
the 30s timeout, every transient snapshot delay leaks site resources for minutes.
**Recommendation** **Recommendation**
In `StartStreamAsync`, wrap the `await` so that *any* failure or cancellation In `StartStreamAsync`, catch any exception from the snapshot await, deterministically
deterministically calls `StopStream(sessionId)` (e.g. `try/catch (Exception)` or a tear down the bridge actor (`Tell(StopDebugStream)` via the local actor reference, since
`finally` that stops the session when the result was not returned). Ensure a racing `onTerminatedWrapper` may already have removed the session entry), and translate
`StopStream` is idempotent and always sends `StopDebugStream` even if the session was the failure into a meaningful exception for the caller.
already removed, so the bridge actor (and its site-side subscription) is torn down
promptly rather than waiting for the orphan `ReceiveTimeout`.
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16. The `catch (OperationCanceledException)`-only block in
`StartStreamAsync` was replaced with `catch (Exception)`: it removes the session entry,
sends `StopDebugStream` to the bridge actor via the local reference (idempotent — the
actor may already be stopping itself), and throws a descriptive exception —
`TimeoutException` for the 30s timeout, otherwise an `InvalidOperationException` that
names the instance/site and wraps the underlying cause. Regression test
`DebugStreamServiceTests.StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException`
fails against the pre-fix code and passes after. Fixed by the commit whose message
references `Communication-001`.
### Communication-002 — gRPC reconnect does not unsubscribe the previous stream, leaking site-side relay actors ### Communication-002 — gRPC reconnect does not unsubscribe the previous stream, leaking site-side relay actors
+14 -3
@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-16 | | Last reviewed | 2026-05-16 |
| Reviewer | claude-agent | | Reviewer | claude-agent |
| Commit reviewed | `9c60592` | | Commit reviewed | `9c60592` |
| Open findings | 13 | | Open findings | 12 |
## Summary ## Summary
@@ -53,7 +53,7 @@ tag-resolution retry, disconnect/re-subscribe, and concurrency around `HandleSub
|--|--| |--|--|
| Severity | Critical | | Severity | Critical |
| Category | Concurrency & thread safety | | Category | Concurrency & thread safety |
| Status | Open | | Status | Resolved |
| Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:473-538` | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:473-538` |
**Description** **Description**
@@ -82,7 +82,18 @@ handler too.
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16. `HandleSubscribe` was restructured to follow the actor's own
`PipeTo(Self)` pattern (the one already used by `HandleRetryTagResolution`): the
background `Task.Run` now performs only adapter I/O (`SubscribeAsync`/`ReadAsync`),
collects per-tag outcomes into an immutable `SubscribeCompleted` message, and pipes
that to `Self`. All mutation of `_subscriptionIds`, `_subscriptionsByInstance`,
`_totalSubscribed`, `_resolvedTags` and `_unresolvedTags` now happens in the new
`HandleSubscribeCompleted` handler on the actor thread; it is wired into the
Connected, Connecting and Reconnecting states so an in-flight subscribe is applied
regardless of state transitions. Regression test
`DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters` (30×30 concurrent
subscribes) fails against the pre-fix code and passes after. Fixed by the commit
whose message references `DataConnectionLayer-001`.
### DataConnectionLayer-002 — `Restart` supervision discards all subscription state on connection-actor crash ### DataConnectionLayer-002 — `Restart` supervision discards all subscription state on connection-actor crash
+15 -3
@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-16 | | Last reviewed | 2026-05-16 |
| Reviewer | claude-agent | | Reviewer | claude-agent |
| Commit reviewed | `9c60592` | | Commit reviewed | `9c60592` |
| Open findings | 14 | | Open findings | 13 |
## Summary ## Summary
@@ -53,7 +53,7 @@ requirements (timeout, retry settings) that are declared but not implemented.
|--|--| |--|--|
| Severity | Critical | | Severity | Critical |
| Category | Error handling & resilience | | Category | Error handling & resilience |
| Status | Open | | Status | Resolved |
| Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:109`, `src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs:81` | | Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:109`, `src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs:81` |
**Description** **Description**
@@ -89,7 +89,19 @@ verifies it is delivered by a retry sweep.
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16. Delivery handlers for `StoreAndForwardCategory.ExternalSystem` and
`CachedDbWrite` are now registered at site startup in `AkkaHostedService`, after
`StoreAndForwardService.StartAsync()`. Each handler resolves its consumer in a fresh DI
scope and calls a new `DeliverBufferedAsync`: `ExternalSystemClient.DeliverBufferedAsync`
re-resolves the system/method and re-invokes `InvokeHttpAsync`, and
`DatabaseGateway.DeliverBufferedAsync` executes the buffered SQL — each returning `true`
on success, `false` (park) when the target no longer exists or fails permanently, and
throwing on transient failure so the engine retries. `EnqueueAsync` gained an
`attemptImmediateDelivery` parameter; `CachedCallAsync` passes `false` so registering the
handler does not dispatch the request twice (the double-dispatch noted in
`ExternalSystemGateway-003`). Regression tests cover the success, target-removed and
transient-retry paths. Fixed by the commit whose message references
`ExternalSystemGateway-001`.
### ExternalSystemGateway-002 — Per-system call timeout is never applied to HTTP requests ### ExternalSystemGateway-002 — Per-system call timeout is never applied to HTTP requests
+11 -3
@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-16 | | Last reviewed | 2026-05-16 |
| Reviewer | claude-agent | | Reviewer | claude-agent |
| Commit reviewed | `9c60592` | | Commit reviewed | `9c60592` |
| Open findings | 12 | | Open findings | 11 |
## Summary ## Summary
@@ -53,7 +53,7 @@ fallback in `DeliverAsync`, and concurrency on the token cache.
|--|--| |--|--|
| Severity | Critical | | Severity | Critical |
| Category | Error handling & resilience | | Category | Error handling & resilience |
| Status | Open | | Status | Resolved |
| Location | `src/ScadaLink.NotificationService/NotificationDeliveryService.cs:96`, `src/ScadaLink.NotificationService/ServiceCollectionExtensions.cs:8` | | Location | `src/ScadaLink.NotificationService/NotificationDeliveryService.cs:96`, `src/ScadaLink.NotificationService/ServiceCollectionExtensions.cs:8` |
**Description** **Description**
@@ -66,7 +66,15 @@ Register a delivery handler for `StoreAndForwardCategory.Notification` during st
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16. A delivery handler for `StoreAndForwardCategory.Notification` is now
registered at site startup in `AkkaHostedService`. The handler resolves
`NotificationDeliveryService` in a fresh DI scope and calls the new `DeliverBufferedAsync`,
which re-resolves the list, recipients and SMTP config and re-attempts delivery —
returning `true` on success, `false` (park) on permanent failure or missing
configuration, and throwing on transient failure so the engine retries. `SendAsync` now
buffers with `attemptImmediateDelivery: false` so registering the handler does not send
the notification twice. Regression tests cover the happy path and the list-removed park
path. Fixed by the commit whose message references `NotificationService-001`.
### NotificationService-002 — `TimeoutException`/`OperationCanceledException` misclassified as transient ### NotificationService-002 — `TimeoutException`/`OperationCanceledException` misclassified as transient
+32 -36
@@ -28,56 +28,52 @@ code-reviews/
## Baseline review — 2026-05-16 ## Baseline review — 2026-05-16
All 19 modules were reviewed at commit `9c60592`. This established the baseline below. All 19 modules were reviewed at commit `9c60592` (241 findings: 6 Critical, 46 High,
100 Medium, 89 Low). The tables below track what remains **open** as findings are
resolved and re-triaged.
| Severity | Open findings | | Severity | Open findings |
|----------|---------------| |----------|---------------|
| Critical | 6 | | Critical | 0 |
| High | 46 | | High | 46 |
| Medium | 100 | | Medium | 100 |
| Low | 89 | | Low | 89 |
| **Total** | **241** | | **Total** | **235** |
## Module Status ## Module Status
| Module | Review status | Last reviewed | Commit | Open (C/H/M/L) | Total | | Module | Last reviewed | Commit | Open (C/H/M/L) | Open | Total |
|--------|---------------|---------------|--------|----------------|-------| |--------|---------------|--------|----------------|------|-------|
| [CentralUI](CentralUI/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/3/10/5 | 19 | | [CentralUI](CentralUI/findings.md) | 2026-05-16 | `9c60592` | 0/3/10/5 | 18 | 19 |
| [CLI](CLI/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/6/6 | 13 | | [CLI](CLI/findings.md) | 2026-05-16 | `9c60592` | 0/1/6/6 | 13 | 13 |
| [ClusterInfrastructure](ClusterInfrastructure/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/4/3 | 8 | | [ClusterInfrastructure](ClusterInfrastructure/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/3 | 8 | 8 |
| [Commons](Commons/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | | [Commons](Commons/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | 12 |
| [Communication](Communication/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/2/5/3 | 11 | | [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/3 | 10 | 11 |
| [ConfigurationDatabase](ConfigurationDatabase/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 | | [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 | 11 |
| [DataConnectionLayer](DataConnectionLayer/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/4/6/2 | 13 | | [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/4/6/2 | 12 | 13 |
| [DeploymentManager](DeploymentManager/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | | [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | 14 |
| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/2/7/4 | 14 | | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | 2026-05-16 | `9c60592` | 0/2/7/4 | 13 | 14 |
| [HealthMonitoring](HealthMonitoring/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/2/5/5 | 12 | | [HealthMonitoring](HealthMonitoring/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/5 | 12 | 12 |
| [Host](Host/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/3/7 | 11 | | [Host](Host/findings.md) | 2026-05-16 | `9c60592` | 0/1/3/7 | 11 | 11 |
| [InboundAPI](InboundAPI/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | | [InboundAPI](InboundAPI/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
| [ManagementService](ManagementService/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | | [ManagementService](ManagementService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
| [NotificationService](NotificationService/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/3/5/3 | 12 | | [NotificationService](NotificationService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/3 | 11 | 12 |
| [Security](Security/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/4/4 | 11 | | [Security](Security/findings.md) | 2026-05-16 | `9c60592` | 0/3/4/4 | 11 | 11 |
| [SiteEventLogging](SiteEventLogging/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/4/4/3 | 11 | | [SiteEventLogging](SiteEventLogging/findings.md) | 2026-05-16 | `9c60592` | 0/4/4/3 | 11 | 11 |
| [SiteRuntime](SiteRuntime/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/8/5 | 16 | | [SiteRuntime](SiteRuntime/findings.md) | 2026-05-16 | `9c60592` | 0/3/8/5 | 16 | 16 |
| [StoreAndForward](StoreAndForward/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/2/4/6 | 13 | | [StoreAndForward](StoreAndForward/findings.md) | 2026-05-16 | `9c60592` | 0/2/4/6 | 12 | 13 |
| [TemplateEngine](TemplateEngine/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/5/5/4 | 14 | | [TemplateEngine](TemplateEngine/findings.md) | 2026-05-16 | `9c60592` | 0/5/5/4 | 14 | 14 |
## Pending Findings ## Pending Findings
All findings are currently `Open`. As findings are resolved, remove them from the Every `Open` / `In Progress` finding across all modules, highest severity first.
tables below (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §5). Full detail for each Resolved findings drop off this list but remain recorded in their module's
finding — description, location, recommendation — lives in the module's `findings.md`. `findings.md` (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §4–§5). Full detail —
description, location, recommendation — lives in the module's `findings.md`.
### Critical (6) ### Critical (0)
| ID | Module | Title | _None open._
|----|--------|-------|
| CentralUI-001 | [CentralUI](CentralUI/findings.md) | Test Run sandbox executes arbitrary C# with no trust-model enforcement |
| Communication-001 | [Communication](Communication/findings.md) | Snapshot timeout leaves orphaned bridge actor and site subscription |
| DataConnectionLayer-001 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `Task.Run` in `HandleSubscribe` mutates actor state off the actor thread |
| ExternalSystemGateway-001 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | No S&F delivery handler registered; cached calls and writes can never be delivered |
| NotificationService-001 | [NotificationService](NotificationService/findings.md) | Buffered notifications are never retried (no S&F delivery handler) |
| StoreAndForward-001 | [StoreAndForward](StoreAndForward/findings.md) | Replication to standby is never triggered by the active node |
### High (46) ### High (46)
+10 -3
@@ -8,7 +8,7 @@
| Last reviewed | 2026-05-16 | | Last reviewed | 2026-05-16 |
| Reviewer | claude-agent | | Reviewer | claude-agent |
| Commit reviewed | `9c60592` | | Commit reviewed | `9c60592` |
| Open findings | 13 | | Open findings | 12 |
## Summary ## Summary
@@ -53,7 +53,7 @@ replication and retry-count issues are functional defects against the design.
|--|--| |--|--|
| Severity | Critical | | Severity | Critical |
| Category | Error handling & resilience | | Category | Error handling & resilience |
| Status | Open | | Status | Resolved |
| Location | `src/ScadaLink.StoreAndForward/ReplicationService.cs:40`, `:53`, `:66`; `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:155`, `:212`, `:222`, `:236` | | Location | `src/ScadaLink.StoreAndForward/ReplicationService.cs:40`, `:53`, `:66`; `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:155`, `:212`, `:222`, `:236` |
**Description** **Description**
@@ -81,7 +81,14 @@ asserts the replication handler observes each operation type.
**Resolution** **Resolution**
_Unresolved._ Resolved 2026-05-16. `ReplicationService` is now injected into `StoreAndForwardService`
(wired in `AddStoreAndForward`), and every buffer operation is forwarded to the standby:
a new `BufferAsync` helper calls `ReplicateEnqueue` after each persist, `ReplicateRemove`
runs after a successful retry removes a message, and `ReplicatePark` runs on both park
paths. Replication stays fire-and-forget and is a no-op when `ReplicationEnabled` is
false or no handler is wired. Regression tests `StoreAndForwardReplicationTests` assert
the replication handler observes the Add, Remove and Park operations. Fixed by the
commit whose message references `StoreAndForward-001`.
### StoreAndForward-002 — Messages enqueued with no registered handler are buffered but never deliverable ### StoreAndForward-002 — Messages enqueued with no registered handler are buffered but never deliverable
@@ -220,6 +220,20 @@ public class ScriptAnalysisService
SandboxErrorKind.CompileError, 0, markers); SandboxErrorKind.CompileError, 0, markers);
} }
// Trust-model gate (CentralUI-001): the documented forbidden-API set is
// enforced HERE, before execution — not merely surfaced as an editor hint.
// Without this, a Design-role user could run arbitrary file/process/
// reflection/network code in the central host process.
var trustViolations = EnforceTrustModel(script.GetCompilation());
if (trustViolations.Count > 0)
{
return new SandboxRunResult(false, null, null, "",
"Script blocked by the trust model — it references forbidden APIs "
+ "(System.IO, System.Diagnostics, System.Reflection, System.Net, threading). "
+ "See the highlighted diagnostics.",
SandboxErrorKind.CompileError, 0, trustViolations);
}
var parameters = ConvertJsonParameters(request.Parameters); var parameters = ConvertJsonParameters(request.Parameters);
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds)); using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds));
@@ -311,6 +325,13 @@ public class ScriptAnalysisService
throw new ScriptSandboxException( throw new ScriptSandboxException(
$"Scripts.CallShared(\"{name}\") compile failed: {string.Join("; ", nestedErrors.Select(d => d.GetMessage()))}"); $"Scripts.CallShared(\"{name}\") compile failed: {string.Join("; ", nestedErrors.Select(d => d.GetMessage()))}");
// Trust-model gate (CentralUI-001) — a nested shared script runs
// arbitrary code too, so it must clear the same forbidden-API gate.
if (EnforceTrustModel(built.GetCompilation()).Count > 0)
throw new ScriptSandboxException(
$"Scripts.CallShared(\"{name}\") is blocked by the script trust model — "
+ "the shared script references forbidden APIs.");
lock (compileCacheLock) lock (compileCacheLock)
{ {
if (!compileCache.TryGetValue(name, out compiled)) if (!compileCache.TryGetValue(name, out compiled))
@@ -1086,15 +1107,25 @@ public class ScriptAnalysisService
return new(AttributeContextKind.None, null); return new(AttributeContextKind.None, null);
} }
/// <summary>
/// Finds every reference to a forbidden API — the documented script trust model,
/// see <see cref="ForbiddenNamespacePrefixes"/>. Identifiers are resolved against
/// the semantic model, so a forbidden type or member is caught however it is
/// written: bare (<c>File</c>), fully qualified
/// (<c>System.IO.File.WriteAllText</c>), or via an alias — while a user identifier
/// that merely shares a name with a forbidden type (<c>var File = …</c>) does not
/// false-positive. Used both for editor diagnostics and as the pre-execution
/// trust-model gate (see <see cref="EnforceTrustModel"/>).
/// </summary>
private static IEnumerable<DiagnosticMarker> FindForbiddenApiUsages(SyntaxTree tree, SemanticModel model) private static IEnumerable<DiagnosticMarker> FindForbiddenApiUsages(SyntaxTree tree, SemanticModel model)
{ {
var root = tree.GetRoot(); var root = tree.GetRoot();
// Banned using directives — pure namespace string match is fine here. // Banned using directives.
foreach (var u in root.DescendantNodes().OfType<UsingDirectiveSyntax>()) foreach (var u in root.DescendantNodes().OfType<UsingDirectiveSyntax>())
{ {
var name = u.Name?.ToString() ?? ""; var name = u.Name?.ToString() ?? "";
if (ForbiddenNamespacePrefixes.Any(p => name == p || name.StartsWith(p + "."))) if (IsForbiddenName(name))
{ {
var span = u.GetLocation().GetLineSpan().Span; var span = u.GetLocation().GetLineSpan().Span;
yield return new DiagnosticMarker( yield return new DiagnosticMarker(
@@ -1108,20 +1139,14 @@ public class ScriptAnalysisService
} }
} }
// Banned type usages — resolved via the semantic model so a user // Banned type / member references, resolved via the semantic model. Every
// identifier named "File" or "Thread" does NOT trigger the diagnostic // identifier is checked — including the right-hand side of a member access —
// unless it actually resolves to a forbidden type. // so a fully-qualified forbidden call (System.IO.File.WriteAllText) cannot
// slip past by avoiding a `using` directive or a bare type name.
foreach (var ident in root.DescendantNodes().OfType<IdentifierNameSyntax>()) foreach (var ident in root.DescendantNodes().OfType<IdentifierNameSyntax>())
{ {
// Skip the identifier on the right side of a member access — only var forbidden = ForbiddenNameFor(model.GetSymbolInfo(ident).Symbol);
// the leftmost (the type or qualifier) is what we want to check. if (forbidden == null) continue;
if (ident.Parent is MemberAccessExpressionSyntax m && m.Name == ident) continue;
var symbol = model.GetSymbolInfo(ident).Symbol;
if (symbol is not INamedTypeSymbol type) continue;
var ns = type.ContainingNamespace?.ToDisplayString() ?? "";
if (!ForbiddenNamespacePrefixes.Any(p => ns == p || ns.StartsWith(p + "."))) continue;
var span = ident.GetLocation().GetLineSpan().Span; var span = ident.GetLocation().GetLineSpan().Span;
yield return new DiagnosticMarker( yield return new DiagnosticMarker(
@@ -1130,11 +1155,75 @@ public class ScriptAnalysisService
StartColumn: span.Start.Character + 1, StartColumn: span.Start.Character + 1,
EndLineNumber: span.End.Line + 1, EndLineNumber: span.End.Line + 1,
EndColumn: span.End.Character + 1, EndColumn: span.End.Character + 1,
Message: $"Type '{type.Name}' from forbidden namespace '{ns}' is not allowed in scripts.", Message: $"'{ident.Identifier.ValueText}' resolves to forbidden API '{forbidden}', " +
"which is not allowed in scripts (script trust model).",
Code: "SCADA002"); Code: "SCADA002");
} }
} }
/// <summary>
/// The forbidden namespace/type a symbol implicates, or null if it is allowed.
/// Checks the symbol's namespace and, for a type or member, the type's full
/// name, so an entry like <c>System.Threading.Thread</c> bans that exact type
/// while the rest of <c>System.Threading</c> (e.g. <c>CancellationToken</c>) stays allowed.
/// </summary>
private static string? ForbiddenNameFor(ISymbol? symbol)
{
if (symbol == null) return null;
foreach (var name in QualifiedNamesOf(symbol))
if (IsForbiddenName(name))
return name;
return null;
}
/// <summary>Fully-qualified names a symbol reference implicates for trust-model checking.</summary>
private static IEnumerable<string> QualifiedNamesOf(ISymbol symbol)
{
switch (symbol)
{
case INamespaceSymbol { IsGlobalNamespace: false } ns:
yield return ns.ToDisplayString();
break;
case ITypeSymbol type:
if (type.ContainingNamespace is { IsGlobalNamespace: false } tn)
yield return tn.ToDisplayString();
yield return FullTypeName(type);
break;
default:
if (symbol.ContainingType is { } ct)
{
if (ct.ContainingNamespace is { IsGlobalNamespace: false } cn)
yield return cn.ToDisplayString();
yield return FullTypeName(ct);
}
break;
}
}
private static string FullTypeName(ITypeSymbol type) =>
type.ContainingNamespace is { IsGlobalNamespace: false } ns
? ns.ToDisplayString() + "." + type.Name
: type.Name;
private static bool IsForbiddenName(string qualifiedName) =>
ForbiddenNamespacePrefixes.Any(p =>
qualifiedName == p || qualifiedName.StartsWith(p + ".", StringComparison.Ordinal));
/// <summary>
/// Pre-execution trust-model gate (CentralUI-001). Returns the forbidden-API
/// markers (SCADA001/SCADA002) for a compiled script; an empty list means the
/// script is clear to run. This is a static semantic gate, not a process
/// sandbox — reflection-based indirection is still out of its reach; full
/// isolation would require running scripts in a separate constrained process.
/// </summary>
private static IReadOnlyList<DiagnosticMarker> EnforceTrustModel(Compilation compilation)
{
var tree = compilation.SyntaxTrees.FirstOrDefault();
if (tree == null) return Array.Empty<DiagnosticMarker>();
var model = compilation.GetSemanticModel(tree);
return FindForbiddenApiUsages(tree, model).ToList();
}
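The dot-boundary prefix rule used by IsForbiddenName above can be tried in isolation. The following is a minimal sketch, not the service's code: the prefix list is hypothetical (the real ForbiddenNamespacePrefixes contents are not shown in this diff), and it is written as top-level statements so it runs on its own.

```csharp
using System;
using System.Linq;

// Hypothetical prefix list, for illustration only.
string[] forbiddenPrefixes = { "System.IO", "System.Diagnostics", "System.Threading.Thread" };

// Same rule as IsForbiddenName: an exact match, or a match that ends on a '.' boundary.
bool IsForbiddenName(string qualifiedName) =>
    forbiddenPrefixes.Any(p =>
        qualifiedName == p || qualifiedName.StartsWith(p + ".", StringComparison.Ordinal));

Console.WriteLine(IsForbiddenName("System.IO.File"));                     // True
Console.WriteLine(IsForbiddenName("System.Threading.Thread"));            // True
Console.WriteLine(IsForbiddenName("System.Threading.CancellationToken")); // False
Console.WriteLine(IsForbiddenName("System.IOExtensions.Helper"));         // False
```

Appending the "." before the StartsWith check is what keeps a namespace that merely begins with the same characters (the last case) from matching.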
private static CompletionItem ToCompletionItem(ISymbol symbol) private static CompletionItem ToCompletionItem(ISymbol symbol)
{ {
var kind = symbol.Kind switch var kind = symbol.Kind switch
@@ -127,21 +127,37 @@ public class DebugStreamService
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct); using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(30)); timeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
DebugViewSnapshot snapshot;
try try
{ {
var snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token); snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
}
catch (Exception ex)
{
// Any failure before the snapshot arrives — the 30s timeout, or the stream
// terminating early (site disconnect / gRPC failure, surfaced by
// onTerminatedWrapper as an InvalidOperationException) — must deterministically
// tear down the bridge actor and its site-side subscription. Use the local
// actor reference: a racing onTerminatedWrapper may already have removed the
// session, which would make StopStream a no-op. StopDebugStream is idempotent
// (the actor may already be stopping itself).
_sessions.TryRemove(sessionId, out _);
bridgeActor.Tell(new StopDebugStream());
if (ex is OperationCanceledException)
throw new TimeoutException(
$"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
throw new InvalidOperationException(
$"Debug stream for {instanceUniqueName} on site {siteIdentifier} terminated before a snapshot was received.",
ex);
}
_logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}", _logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}",
sessionId, instanceUniqueName, siteIdentifier); sessionId, instanceUniqueName, siteIdentifier);
return new DebugStreamSession(sessionId, snapshot); return new DebugStreamSession(sessionId, snapshot);
} }
catch (OperationCanceledException)
{
StopStream(sessionId);
throw new TimeoutException($"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
}
}
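The reworked wait-then-clean-up flow above can be sketched outside the service. This is an illustration under assumptions: WaitForSnapshotAsync and its cleanup callback are hypothetical names, with cleanup standing in for the `_sessions.TryRemove` and `bridgeActor.Tell(new StopDebugStream())` pair, and the snapshot is a plain string.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: waits for the snapshot, runs cleanup on ANY failure,
// then translates the failure the same way the hunk above does.
async Task<string> WaitForSnapshotAsync(
    Task<string> snapshotTask, TimeSpan timeout, Action cleanup, CancellationToken ct)
{
    using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    timeoutCts.CancelAfter(timeout);
    try
    {
        return await snapshotTask.WaitAsync(timeoutCts.Token);
    }
    catch (Exception ex)
    {
        cleanup(); // tear down before rethrowing, whatever the cause
        if (ex is OperationCanceledException)
            throw new TimeoutException("Timed out waiting for the snapshot.");
        throw new InvalidOperationException(
            "Stream terminated before a snapshot was received.", ex);
    }
}

// A snapshot that never arrives: the wait times out and cleanup runs once.
var cleanups = 0;
var pending = new TaskCompletionSource<string>();
try
{
    await WaitForSnapshotAsync(
        pending.Task, TimeSpan.FromMilliseconds(50), () => cleanups++, CancellationToken.None);
}
catch (TimeoutException)
{
    Console.WriteLine($"timed out, cleanups = {cleanups}");
}
```

As in the diff, cancellation of the caller's own token is also reported as a TimeoutException; distinguishing the two would need an extra check on `ct.IsCancellationRequested`.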
/// <summary> /// <summary>
/// Stops an active debug stream session. /// Stops an active debug stream session.
@@ -171,6 +171,11 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
case UnsubscribeTagsRequest: case UnsubscribeTagsRequest:
Stash.Stash(); Stash.Stash();
break; break;
case SubscribeCompleted sc:
// A subscribe started while Connected can complete after a transition;
// apply it so its state survives into the next ReSubscribeAll.
HandleSubscribeCompleted(sc);
break;
case GetHealthReport: case GetHealthReport:
ReplyWithHealthReport(); ReplyWithHealthReport();
break; break;
@@ -207,6 +212,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
case SubscribeTagsRequest req: case SubscribeTagsRequest req:
HandleSubscribe(req); HandleSubscribe(req);
break; break;
case SubscribeCompleted sc:
HandleSubscribeCompleted(sc);
break;
case UnsubscribeTagsRequest req: case UnsubscribeTagsRequest req:
HandleUnsubscribe(req); HandleUnsubscribe(req);
break; break;
@@ -338,6 +346,11 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
case TagResolutionFailed: case TagResolutionFailed:
// Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect // Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect
break; break;
case SubscribeCompleted sc:
// A subscribe started while Connected can complete after a transition;
// apply it so its state survives into the next ReSubscribeAll.
HandleSubscribeCompleted(sc);
break;
case GetHealthReport: case GetHealthReport:
ReplyWithHealthReport(); ReplyWithHealthReport();
break; break;
@@ -466,18 +479,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
if (!_subscriptionsByInstance.ContainsKey(request.InstanceUniqueName)) if (!_subscriptionsByInstance.ContainsKey(request.InstanceUniqueName))
_subscriptionsByInstance[request.InstanceUniqueName] = new HashSet<string>(); _subscriptionsByInstance[request.InstanceUniqueName] = new HashSet<string>();
var instanceTags = _subscriptionsByInstance[request.InstanceUniqueName];
var self = Self; var self = Self;
var sender = Sender; var sender = Sender;
// Snapshot the already-subscribed tag set on the actor thread. The background
// task below must NOT read or mutate actor state — it performs only adapter
// I/O and reports results back via a SubscribeCompleted message, which is
// applied to actor state on the actor thread (see HandleSubscribeCompleted).
var alreadySubscribed = new HashSet<string>(_subscriptionIds.Keys);
Task.Run(async () => Task.Run(async () =>
{ {
var results = new List<SubscribeTagResult>(request.TagPaths.Count);
var tagsToSeed = new List<string>();
foreach (var tagPath in request.TagPaths) foreach (var tagPath in request.TagPaths)
{ {
if (_subscriptionIds.ContainsKey(tagPath)) if (alreadySubscribed.Contains(tagPath))
{ {
// Already subscribed — just track for this instance // Already subscribed by another instance — just track for this one.
instanceTags.Add(tagPath); results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: true, Success: true, null, null));
tagsToSeed.Add(tagPath);
continue; continue;
} }
@@ -487,27 +509,21 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
{ {
self.Tell(new TagValueReceived(path, value)); self.Tell(new TagValueReceived(path, value));
}); });
_subscriptionIds[tagPath] = subId; results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: true, subId, null));
instanceTags.Add(tagPath); tagsToSeed.Add(tagPath);
_totalSubscribed++;
_resolvedTags++;
} }
catch (Exception ex) catch (Exception ex)
{ {
// WP-12: Tag path resolution failure — mark as unresolved, retry later // WP-12: Tag path resolution failure — reported back as unresolved.
_unresolvedTags.Add(tagPath); results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: false, null, ex.Message));
instanceTags.Add(tagPath);
_totalSubscribed++;
self.Tell(new TagResolutionFailed(tagPath, ex.Message));
} }
} }
// Initial read — seed current values for all resolved tags so the Instance Actor // Initial read — seed current values for resolved tags so the Instance Actor
// doesn't stay Uncertain until the next OPC UA data change notification // doesn't stay Uncertain until the next OPC UA data change notification.
foreach (var tagPath in instanceTags) // Tell is thread-safe, so seeded values are delivered directly as messages.
foreach (var tagPath in tagsToSeed)
{ {
if (_unresolvedTags.Contains(tagPath)) continue;
try try
{ {
var readResult = await _adapter.ReadAsync(tagPath); var readResult = await _adapter.ReadAsync(tagPath);
@@ -522,11 +538,51 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
} }
} }
return new SubscribeTagsResponse( return new SubscribeCompleted(request, sender, results);
request.CorrelationId, request.InstanceUniqueName, true, null, DateTimeOffset.UtcNow); }).PipeTo(self);
}).PipeTo(sender); }
// Start tag resolution retry timer if we have unresolved tags /// <summary>
/// Applies the result of an asynchronous subscribe on the actor thread. ALL mutation
/// of subscription state and counters happens here — never on the background task —
/// so the actor model's single-threaded state guarantee holds.
/// </summary>
private void HandleSubscribeCompleted(SubscribeCompleted msg)
{
var instanceName = msg.Request.InstanceUniqueName;
if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags))
{
// The instance was unsubscribed while the subscribe I/O was in flight;
// re-create its tag set so the completed results are still tracked.
instanceTags = new HashSet<string>();
_subscriptionsByInstance[instanceName] = instanceTags;
}
foreach (var result in msg.Results)
{
instanceTags.Add(result.TagPath);
// Re-check against current state: another subscribe may have resolved the
// same tag while this request's I/O was in flight.
if (result.AlreadySubscribed || _subscriptionIds.ContainsKey(result.TagPath))
continue;
if (result.Success)
{
_subscriptionIds[result.TagPath] = result.SubscriptionId!;
_totalSubscribed++;
_resolvedTags++;
}
else
{
// WP-12: mark unresolved so the periodic retry timer picks it up.
_unresolvedTags.Add(result.TagPath);
_totalSubscribed++;
_log.Debug("[{0}] Tag resolution failed for {1}: {2}",
_connectionName, result.TagPath, result.Error);
}
}
// Start the tag-resolution retry timer if any tags are unresolved.
if (_unresolvedTags.Count > 0) if (_unresolvedTags.Count > 0)
{ {
Timers.StartPeriodicTimer( Timers.StartPeriodicTimer(
@@ -535,6 +591,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_options.TagResolutionRetryInterval, _options.TagResolutionRetryInterval,
_options.TagResolutionRetryInterval); _options.TagResolutionRetryInterval);
} }
msg.ReplyTo.Tell(new SubscribeTagsResponse(
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
} }
private void HandleUnsubscribe(UnsubscribeTagsRequest request) private void HandleUnsubscribe(UnsubscribeTagsRequest request)
@@ -764,5 +823,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
internal record TagResolutionFailed(string TagPath, string Error); internal record TagResolutionFailed(string TagPath, string Error);
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId); internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
internal record RetryTagResolution; internal record RetryTagResolution;
internal record SubscribeTagResult(
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error);
internal record SubscribeCompleted(
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
public record GetHealthReport; public record GetHealthReport;
} }
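The snapshot / background I/O / apply-on-owner pattern introduced in DataConnectionActor can be illustrated without Akka. The sketch below swaps in a System.Threading.Channels mailbox for the actor mailbox, so it is an analogy rather than the actor's code; StartSubscribe, the tuple shape and the fake subscribe delay are all assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// State owned by the consumer loop only (it plays the role of the actor thread).
var subscriptionIds = new Dictionary<string, string>();
var mailbox = Channel.CreateUnbounded<List<(string Tag, bool AlreadySubscribed, string SubId)>>();

// Hypothetical stand-in for HandleSubscribe: snapshot state, then do I/O off-thread.
Task StartSubscribe(string[] tags)
{
    var alreadySubscribed = new HashSet<string>(subscriptionIds.Keys); // snapshot taken by the owner
    return Task.Run(async () =>
    {
        var results = new List<(string Tag, bool AlreadySubscribed, string SubId)>();
        foreach (var tag in tags)
        {
            if (alreadySubscribed.Contains(tag)) { results.Add((tag, true, "")); continue; }
            await Task.Delay(10); // stand-in for the adapter's subscribe call
            results.Add((tag, false, Guid.NewGuid().ToString("N")));
        }
        await mailbox.Writer.WriteAsync(results); // report back; never touch owner state here
    });
}

// Two overlapping requests start before either completes, so both see an empty snapshot.
var work = Task.WhenAll(StartSubscribe(new[] { "A", "B" }), StartSubscribe(new[] { "B", "C" }));

// Apply step: the only place state is mutated.
for (var i = 0; i < 2; i++)
{
    var batch = await mailbox.Reader.ReadAsync();
    foreach (var r in batch)
    {
        // Re-check against current state: the other request may have resolved this tag first.
        if (r.AlreadySubscribed || subscriptionIds.ContainsKey(r.Tag)) continue;
        subscriptionIds[r.Tag] = r.SubId;
    }
}
await work;
Console.WriteLine(string.Join(",", subscriptionIds.Keys.OrderBy(k => k))); // A,B,C
```

Both requests subscribe to "B" because their snapshots were taken before either result was applied; the re-check in the apply step is what keeps the second result from overwriting the first.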
@@ -87,6 +87,64 @@ public class DatabaseGateway : IDatabaseGateway
definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null); definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null);
} }
/// <summary>
/// WP-9/10: Delivers a buffered CachedDbWrite during a store-and-forward retry
/// sweep by executing the SQL against the named connection. Returns true on
/// success; false if the payload is unreadable or the connection no longer
/// exists (the message is parked); throws on any execution error so the
/// engine retries.
/// </summary>
public async Task<bool> DeliverBufferedAsync(
StoreAndForwardMessage message, CancellationToken cancellationToken = default)
{
var payload = JsonSerializer.Deserialize<CachedWritePayload>(message.PayloadJson);
if (payload == null || string.IsNullOrEmpty(payload.ConnectionName) || string.IsNullOrEmpty(payload.Sql))
{
_logger.LogError("Buffered CachedDbWrite message {Id} has an unreadable payload; parking.", message.Id);
return false;
}
var definition = await ResolveConnectionAsync(payload.ConnectionName, cancellationToken);
if (definition == null)
{
_logger.LogError(
"Buffered DB write to '{Connection}' cannot be delivered — the connection no longer exists; parking.",
payload.ConnectionName);
return false;
}
await using var connection = new SqlConnection(definition.ConnectionString);
await connection.OpenAsync(cancellationToken);
using var command = connection.CreateCommand();
command.CommandText = payload.Sql;
if (payload.Parameters != null)
{
foreach (var (key, value) in payload.Parameters)
{
var parameter = command.CreateParameter();
parameter.ParameterName = key.StartsWith('@') ? key : "@" + key;
parameter.Value = JsonElementToParameterValue(value);
command.Parameters.Add(parameter);
}
}
await command.ExecuteNonQueryAsync(cancellationToken);
return true;
}
private static object JsonElementToParameterValue(JsonElement element) => element.ValueKind switch
{
JsonValueKind.String => (object?)element.GetString() ?? DBNull.Value,
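// Cast to object: a bare "long : double" conditional takes the natural type
// double, which would box every integer as a double.
JsonValueKind.Number => element.TryGetInt64(out var l) ? (object)l : element.GetDouble(),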
JsonValueKind.True => true,
JsonValueKind.False => false,
JsonValueKind.Null or JsonValueKind.Undefined => DBNull.Value,
_ => element.GetRawText()
};
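The JsonElement-to-parameter mapping above is easy to check in isolation. The sketch below uses a hypothetical ToParameterValue local function with the same arms; it casts the integer branch to object explicitly, because a conditional whose branches are long and double otherwise has the natural type double in C#.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for JsonElementToParameterValue.
object ToParameterValue(JsonElement element) => element.ValueKind switch
{
    JsonValueKind.String => (object?)element.GetString() ?? DBNull.Value,
    // (object) keeps integers as long instead of widening them to double.
    JsonValueKind.Number => element.TryGetInt64(out var l) ? (object)l : element.GetDouble(),
    JsonValueKind.True => true,
    JsonValueKind.False => false,
    JsonValueKind.Null or JsonValueKind.Undefined => DBNull.Value,
    _ => element.GetRawText() // objects and arrays are passed through as raw JSON text
};

using var doc = JsonDocument.Parse(
    "{\"id\": 42, \"ratio\": 1.5, \"name\": \"pump\", \"note\": null, \"tags\": [1, 2]}");
foreach (var prop in doc.RootElement.EnumerateObject())
{
    var value = ToParameterValue(prop.Value);
    Console.WriteLine($"{prop.Name}: {value.GetType().Name}");
}
```

With the cast in place, whole numbers reach the SQL parameter as long and only fractional values become double.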
private sealed record CachedWritePayload(
string ConnectionName,
string Sql,
Dictionary<string, JsonElement>? Parameters);
private async Task<DatabaseConnectionDefinition?> ResolveConnectionAsync( private async Task<DatabaseConnectionDefinition?> ResolveConnectionAsync(
string connectionName, string connectionName,
CancellationToken cancellationToken) CancellationToken cancellationToken)
@@ -106,18 +106,67 @@ public class ExternalSystemClient : IExternalSystemClient
Parameters = parameters Parameters = parameters
}); });
var sfResult = await _storeAndForward.EnqueueAsync( // attemptImmediateDelivery: false — this method already made the HTTP
// attempt above; letting EnqueueAsync re-invoke the handler would
// dispatch the same request a second time.
await _storeAndForward.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, StoreAndForwardCategory.ExternalSystem,
systemName, systemName,
payload, payload,
originInstanceName, originInstanceName,
system.MaxRetries > 0 ? system.MaxRetries : null, system.MaxRetries > 0 ? system.MaxRetries : null,
system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null); system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null,
attemptImmediateDelivery: false);
return new ExternalCallResult(true, null, null, WasBuffered: true); return new ExternalCallResult(true, null, null, WasBuffered: true);
} }
} }
/// <summary>
/// WP-7/10: Delivers a buffered ExternalSystem call during a store-and-forward
/// retry sweep. Returns true on success, false on permanent failure (the message
/// is parked); throws <see cref="TransientExternalSystemException"/> on a
/// transient failure so the engine retries.
/// </summary>
public async Task<bool> DeliverBufferedAsync(
StoreAndForwardMessage message, CancellationToken cancellationToken = default)
{
var payload = JsonSerializer.Deserialize<CachedCallPayload>(message.PayloadJson);
if (payload == null || string.IsNullOrEmpty(payload.SystemName) || string.IsNullOrEmpty(payload.MethodName))
{
_logger.LogError("Buffered ExternalSystem message {Id} has an unreadable payload; parking.", message.Id);
return false;
}
var (system, method) = await ResolveSystemAndMethodAsync(
payload.SystemName, payload.MethodName, cancellationToken);
if (system == null || method == null)
{
_logger.LogError(
"Buffered call to '{System}'/'{Method}' cannot be delivered — the system or method no longer exists; parking.",
payload.SystemName, payload.MethodName);
return false;
}
var parameters = payload.Parameters?.ToDictionary(kv => kv.Key, kv => (object?)kv.Value);
try
{
await InvokeHttpAsync(system, method, parameters, cancellationToken);
return true;
}
catch (PermanentExternalSystemException ex)
{
_logger.LogError(ex, "Buffered call to '{System}' failed permanently; parking.", payload.SystemName);
return false;
}
// TransientExternalSystemException propagates — the S&F engine retries.
}
private sealed record CachedCallPayload(
string SystemName,
string MethodName,
Dictionary<string, JsonElement>? Parameters);
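The true / false / throw contract that each DeliverBufferedAsync follows can be summarised as a small decision function. This is a sketch under assumptions: AttemptAsync and the outcome strings are hypothetical, and the exact max-retries comparison used by the real sweep is not shown in this diff.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical interpretation of one delivery attempt by a retry sweep.
async Task<string> AttemptAsync(Func<Task<bool>> handler, int retryCount, int maxRetries)
{
    try
    {
        // true = delivered (remove from buffer); false = permanent failure (park).
        return (await handler()) ? "Remove" : "Park";
    }
    catch (Exception)
    {
        // A thrown exception means a transient failure: keep retrying until the
        // budget is spent, then park. The ">=" comparison is an assumption.
        return retryCount + 1 >= maxRetries ? "Park" : "Retry";
    }
}

Console.WriteLine(await AttemptAsync(() => Task.FromResult(true), 0, 3));             // Remove
Console.WriteLine(await AttemptAsync(() => Task.FromResult(false), 0, 3));            // Park
Console.WriteLine(await AttemptAsync(() => throw new TimeoutException("down"), 0, 3)); // Retry
Console.WriteLine(await AttemptAsync(() => throw new TimeoutException("down"), 2, 3)); // Park
```

Keeping "permanent" as a return value and "transient" as an exception means a handler only has to catch the failures it knows are final; anything unexpected falls through to a retry.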
/// <summary> /// <summary>
/// WP-6: Executes the HTTP request against the external system. /// WP-6: Executes the HTTP request against the external system.
/// </summary> /// </summary>
@@ -344,6 +344,42 @@ akka {{
// any actor or HTTP handler touches the service. // any actor or HTTP handler touches the service.
storeAndForwardService.StartAsync().GetAwaiter().GetResult(); storeAndForwardService.StartAsync().GetAwaiter().GetResult();
// Register the store-and-forward delivery handlers so buffered
// ExternalSystem calls, cached DB writes and notifications are actually
// delivered by the retry sweep. Without this, every buffered message is
// persisted but never delivered. Each handler resolves its scoped consumer
// service in a fresh DI scope — the sweep runs on a timer thread, outside
// any request scope.
storeAndForwardService.RegisterDeliveryHandler(
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem,
async msg =>
{
using var scope = _serviceProvider.CreateScope();
return await scope.ServiceProvider
.GetRequiredService<ScadaLink.ExternalSystemGateway.ExternalSystemClient>()
.DeliverBufferedAsync(msg);
});
storeAndForwardService.RegisterDeliveryHandler(
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
async msg =>
{
using var scope = _serviceProvider.CreateScope();
return await scope.ServiceProvider
.GetRequiredService<ScadaLink.ExternalSystemGateway.DatabaseGateway>()
.DeliverBufferedAsync(msg);
});
storeAndForwardService.RegisterDeliveryHandler(
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification,
async msg =>
{
using var scope = _serviceProvider.CreateScope();
return await scope.ServiceProvider
.GetRequiredService<ScadaLink.NotificationService.NotificationDeliveryService>()
.DeliverBufferedAsync(msg);
});
_logger.LogInformation(
"Store-and-forward delivery handlers registered (ExternalSystem, CachedDbWrite, Notification)");
var parkedMessageHandler = _actorSystem.ActorOf( var parkedMessageHandler = _actorSystem.ActorOf(
Props.Create(() => new ParkedMessageHandlerActor( Props.Create(() => new ParkedMessageHandlerActor(
storeAndForwardService, _nodeOptions.SiteId!)), storeAndForwardService, _nodeOptions.SiteId!)),
@@ -93,18 +93,75 @@ public class NotificationDeliveryService : INotificationDeliveryService
Message = message Message = message
}); });
// attemptImmediateDelivery: false — DeliverAsync was already attempted
// above; letting EnqueueAsync re-invoke the handler would send twice.
await _storeAndForward.EnqueueAsync( await _storeAndForward.EnqueueAsync(
StoreAndForwardCategory.Notification, StoreAndForwardCategory.Notification,
listName, listName,
payload, payload,
originInstanceName, originInstanceName,
smtpConfig.MaxRetries > 0 ? smtpConfig.MaxRetries : null, smtpConfig.MaxRetries > 0 ? smtpConfig.MaxRetries : null,
smtpConfig.RetryDelay > TimeSpan.Zero ? smtpConfig.RetryDelay : null); smtpConfig.RetryDelay > TimeSpan.Zero ? smtpConfig.RetryDelay : null,
attemptImmediateDelivery: false);
return new NotificationResult(true, null, WasBuffered: true); return new NotificationResult(true, null, WasBuffered: true);
} }
} }
/// <summary>
/// WP-11/12: Delivers a buffered notification during a store-and-forward retry
/// sweep — re-resolves the list, recipients and SMTP config and re-attempts
/// delivery. Returns true on success, false on permanent failure (the message
/// is parked); throws on a transient failure so the engine retries.
/// </summary>
public async Task<bool> DeliverBufferedAsync(
StoreAndForwardMessage message, CancellationToken cancellationToken = default)
{
var payload = JsonSerializer.Deserialize<BufferedNotification>(message.PayloadJson);
if (payload == null || string.IsNullOrEmpty(payload.ListName))
{
_logger.LogError("Buffered notification message {Id} has an unreadable payload; parking.", message.Id);
return false;
}
var list = await _repository.GetListByNameAsync(payload.ListName, cancellationToken);
if (list == null)
{
_logger.LogError(
"Buffered notification to list '{List}' cannot be delivered — the list no longer exists; parking.",
payload.ListName);
return false;
}
var recipients = await _repository.GetRecipientsByListIdAsync(list.Id, cancellationToken);
if (recipients.Count == 0)
{
_logger.LogError("Buffered notification to list '{List}' has no recipients; parking.", payload.ListName);
return false;
}
var smtpConfig = (await _repository.GetAllSmtpConfigurationsAsync(cancellationToken)).FirstOrDefault();
if (smtpConfig == null)
{
_logger.LogError("Buffered notification cannot be delivered — no SMTP configuration available; parking.");
return false;
}
try
{
await DeliverAsync(smtpConfig, recipients, payload.Subject, payload.Message, cancellationToken);
return true;
}
catch (SmtpPermanentException ex)
{
_logger.LogError(ex, "Buffered notification to list '{List}' failed permanently; parking.", payload.ListName);
return false;
}
// Transient SMTP errors propagate out of DeliverAsync — the S&F engine retries.
}
private sealed record BufferedNotification(string ListName, string Subject, string Message);
/// <summary> /// <summary>
/// Delivers an email via SMTP. Throws on failure. /// Delivers an email via SMTP. Throws on failure.
/// </summary> /// </summary>
@@ -22,7 +22,8 @@ public static class ServiceCollectionExtensions
var storage = sp.GetRequiredService<StoreAndForwardStorage>(); var storage = sp.GetRequiredService<StoreAndForwardStorage>();
var options = sp.GetRequiredService<IOptions<StoreAndForwardOptions>>().Value; var options = sp.GetRequiredService<IOptions<StoreAndForwardOptions>>().Value;
var logger = sp.GetRequiredService<ILogger<StoreAndForwardService>>(); var logger = sp.GetRequiredService<ILogger<StoreAndForwardService>>();
return new StoreAndForwardService(storage, options, logger); var replication = sp.GetRequiredService<ReplicationService>();
return new StoreAndForwardService(storage, options, logger, replication);
}); });
services.AddSingleton<ReplicationService>(sp => services.AddSingleton<ReplicationService>(sp =>
@@ -30,6 +30,7 @@ public class StoreAndForwardService
{ {
private readonly StoreAndForwardStorage _storage; private readonly StoreAndForwardStorage _storage;
private readonly StoreAndForwardOptions _options; private readonly StoreAndForwardOptions _options;
private readonly ReplicationService? _replication;
private readonly ILogger<StoreAndForwardService> _logger; private readonly ILogger<StoreAndForwardService> _logger;
private Timer? _retryTimer; private Timer? _retryTimer;
private int _retryInProgress; private int _retryInProgress;
@@ -48,11 +49,13 @@ public class StoreAndForwardService
public StoreAndForwardService( public StoreAndForwardService(
StoreAndForwardStorage storage, StoreAndForwardStorage storage,
StoreAndForwardOptions options, StoreAndForwardOptions options,
ILogger<StoreAndForwardService> logger) ILogger<StoreAndForwardService> logger,
ReplicationService? replication = null)
{ {
_storage = storage; _storage = storage;
_options = options; _options = options;
_logger = logger; _logger = logger;
_replication = replication;
} }
/// <summary> /// <summary>
@@ -109,7 +112,8 @@ public class StoreAndForwardService
string payloadJson, string payloadJson,
string? originInstanceName = null, string? originInstanceName = null,
int? maxRetries = null, int? maxRetries = null,
TimeSpan? retryInterval = null) TimeSpan? retryInterval = null,
bool attemptImmediateDelivery = true)
{ {
var message = new StoreAndForwardMessage var message = new StoreAndForwardMessage
{ {
@@ -125,8 +129,10 @@ public class StoreAndForwardService
OriginInstanceName = originInstanceName OriginInstanceName = originInstanceName
}; };
// Attempt immediate delivery // Attempt immediate delivery — unless the caller has already made a
if (_deliveryHandlers.TryGetValue(category, out var handler)) // delivery attempt of its own (attemptImmediateDelivery: false). In that
// case re-invoking the handler here would dispatch the request twice.
if (attemptImmediateDelivery && _deliveryHandlers.TryGetValue(category, out var handler))
{ {
try try
{ {
@@ -136,12 +142,10 @@ public class StoreAndForwardService
RaiseActivity("Delivered", category, $"Immediate delivery to {target}"); RaiseActivity("Delivered", category, $"Immediate delivery to {target}");
return new StoreAndForwardResult(true, message.Id, false); return new StoreAndForwardResult(true, message.Id, false);
} }
else
{
// Permanent failure — do not buffer // Permanent failure — do not buffer
return new StoreAndForwardResult(false, message.Id, false); return new StoreAndForwardResult(false, message.Id, false);
} }
}
catch (Exception ex) catch (Exception ex)
{ {
// Transient failure — buffer for retry // Transient failure — buffer for retry
@@ -152,19 +156,39 @@ public class StoreAndForwardService
message.LastAttemptAt = DateTimeOffset.UtcNow; message.LastAttemptAt = DateTimeOffset.UtcNow;
message.RetryCount = 1; message.RetryCount = 1;
message.LastError = ex.Message; message.LastError = ex.Message;
await _storage.EnqueueAsync(message); await BufferAsync(message);
RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})"); RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})");
return new StoreAndForwardResult(true, message.Id, true); return new StoreAndForwardResult(true, message.Id, true);
} }
} }
// No handler registered — buffer for later // Either no handler is registered yet, or the caller already attempted
await _storage.EnqueueAsync(message); // delivery itself — buffer for the background retry sweep to deliver.
RaiseActivity("Queued", category, $"No handler registered, buffered: {target}"); if (!attemptImmediateDelivery)
{
// The caller made (and failed) one attempt before handing the
// message over, so it counts as the first retry.
message.RetryCount = 1;
message.LastAttemptAt = DateTimeOffset.UtcNow;
}
await BufferAsync(message);
RaiseActivity("Queued", category, attemptImmediateDelivery
? $"No handler registered, buffered: {target}"
: $"Buffered for retry: {target}");
return new StoreAndForwardResult(true, message.Id, true); return new StoreAndForwardResult(true, message.Id, true);
} }
/// <summary>
/// Persists a message to the local SQLite buffer and (WP-11) replicates the
/// add to the standby node so a failover does not lose the buffered message.
/// </summary>
private async Task BufferAsync(StoreAndForwardMessage message)
{
await _storage.EnqueueAsync(message);
_replication?.ReplicateEnqueue(message);
}
/// <summary> /// <summary>
/// WP-10: Background retry sweep. Processes all pending messages that are due for retry. /// WP-10: Background retry sweep. Processes all pending messages that are due for retry.
/// </summary> /// </summary>
@@ -210,6 +234,7 @@ public class StoreAndForwardService
if (success) if (success)
{ {
await _storage.RemoveMessageAsync(message.Id); await _storage.RemoveMessageAsync(message.Id);
_replication?.ReplicateRemove(message.Id);
RaiseActivity("Delivered", message.Category, RaiseActivity("Delivered", message.Category,
$"Delivered to {message.Target} after {message.RetryCount} retries"); $"Delivered to {message.Target} after {message.RetryCount} retries");
return; return;
@@ -220,6 +245,7 @@ public class StoreAndForwardService
message.LastAttemptAt = DateTimeOffset.UtcNow; message.LastAttemptAt = DateTimeOffset.UtcNow;
message.LastError = "Permanent failure (handler returned false)"; message.LastError = "Permanent failure (handler returned false)";
await _storage.UpdateMessageAsync(message); await _storage.UpdateMessageAsync(message);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category, RaiseActivity("Parked", message.Category,
$"Permanent failure for {message.Target}: handler returned false"); $"Permanent failure for {message.Target}: handler returned false");
} }
@@ -234,6 +260,7 @@ public class StoreAndForwardService
{ {
message.Status = StoreAndForwardMessageStatus.Parked; message.Status = StoreAndForwardMessageStatus.Parked;
await _storage.UpdateMessageAsync(message); await _storage.UpdateMessageAsync(message);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category, RaiseActivity("Parked", message.Category,
$"Max retries ({message.MaxRetries}) reached for {message.Target}"); $"Max retries ({message.MaxRetries}) reached for {message.Target}");
_logger.LogWarning( _logger.LogWarning(
@@ -405,4 +405,64 @@ public class ScriptAnalysisServiceTests
Assert.Contains("name", resp.Markdown); Assert.Contains("name", resp.Markdown);
Assert.Contains("String", resp.Markdown); Assert.Contains("String", resp.Markdown);
} }
// ── CentralUI-001: trust-model gate before sandbox execution ──────────
[Fact]
public void Diagnose_FullyQualifiedForbiddenCall_RaisesSCADA002()
{
// A forbidden API reached by fully-qualified name (no `using`, no bare
// type identifier) must still be flagged — the pre-fix semantic check
// only inspected the leftmost identifier and missed this shape.
var resp = _svc.Diagnose(new DiagnoseRequest(
"var d = System.IO.Directory.GetCurrentDirectory(); return d;"));
Assert.Contains(resp.Markers, m => m.Code == "SCADA002");
}
[Fact]
public async Task RunInSandbox_FullyQualifiedForbiddenApi_IsBlockedBeforeExecution()
{
// Regression test for CentralUI-001. RunInSandboxAsync used to execute any
// script that compiled, with no trust-model enforcement — so fully-qualified
// forbidden API code ran in the central host process. The fix gates execution
// on the forbidden-API analysis.
var result = await _svc.RunInSandboxAsync(
new SandboxRunRequest(
"var d = System.IO.Directory.GetCurrentDirectory(); return d;",
Parameters: null,
TimeoutSeconds: null),
CancellationToken.None);
Assert.False(result.Success);
Assert.Equal(SandboxErrorKind.CompileError, result.ErrorKind);
Assert.Contains("trust model", result.Error);
Assert.NotNull(result.Markers);
Assert.Contains(result.Markers!, m => m.Code is "SCADA001" or "SCADA002");
}
[Fact]
public async Task RunInSandbox_ForbiddenUsingDirective_IsBlockedBeforeExecution()
{
var result = await _svc.RunInSandboxAsync(
new SandboxRunRequest(
"using System.Diagnostics; var p = Process.GetCurrentProcess().Id; return p;",
Parameters: null,
TimeoutSeconds: null),
CancellationToken.None);
Assert.False(result.Success);
Assert.Equal(SandboxErrorKind.CompileError, result.ErrorKind);
}
[Fact]
public async Task RunInSandbox_CleanScript_StillRuns()
{
// The gate must not block a script that stays within the allowed surface.
var result = await _svc.RunInSandboxAsync(
new SandboxRunRequest("return 21 * 2;", Parameters: null, TimeoutSeconds: null),
CancellationToken.None);
Assert.True(result.Success);
Assert.Equal("42", result.ReturnValueJson);
}
}
@@ -0,0 +1,77 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ScadaLink.Commons.Entities.Instances;
using ScadaLink.Commons.Entities.Sites;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Communication;
using ScadaLink.Communication.Actors;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests;
/// <summary>
/// Tests for DebugStreamService session lifecycle.
/// </summary>
public class DebugStreamServiceTests : TestKit
{
[Fact]
public async Task StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException()
{
// Regression test for Communication-001. When the debug stream terminates before
// the initial snapshot arrives, StartStreamAsync used to let the raw
// InvalidOperationException from onTerminatedWrapper escape its
// OperationCanceledException-only catch — the caller saw an untranslated exception
// and the failure path did not deterministically tear the bridge actor down.
// The fix catches any failure, tells the bridge actor StopDebugStream, and throws
// a descriptive exception that names the instance and wraps the underlying cause.
var instance = new Instance("Site1.Pump01") { Id = 7, SiteId = 3 };
var site = new Site("Site One", "site-1")
{
Id = 3,
GrpcNodeAAddress = "http://localhost:5100",
GrpcNodeBAddress = "http://localhost:5200"
};
var instanceRepo = Substitute.For<ITemplateEngineRepository>();
instanceRepo.GetInstanceByIdAsync(7, Arg.Any<CancellationToken>()).Returns(instance);
var siteRepo = Substitute.For<ISiteRepository>();
siteRepo.GetSiteByIdAsync(3, Arg.Any<CancellationToken>()).Returns(site);
var services = new ServiceCollection();
services.AddScoped(_ => instanceRepo);
services.AddScoped(_ => siteRepo);
using var provider = services.BuildServiceProvider();
var commProbe = CreateTestProbe();
var commService = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
commService.SetCommunicationActor(commProbe.Ref);
using var grpcFactory = new SiteStreamGrpcClientFactory(NullLoggerFactory.Instance);
var service = new DebugStreamService(
commService, provider, grpcFactory, NullLogger<DebugStreamService>.Instance);
service.SetActorSystem(Sys);
// Act — start the stream; it blocks awaiting the initial snapshot.
var startTask = service.StartStreamAsync(instanceId: 7, onEvent: _ => { }, onTerminated: () => { });
// The bridge actor's PreStart sends SubscribeDebugViewRequest to the comm actor;
// the envelope's sender is the bridge actor itself.
commProbe.ExpectMsg<SiteEnvelope>(TimeSpan.FromSeconds(5));
var bridgeActor = commProbe.LastSender;
// Simulate the site terminating the stream before any snapshot is delivered.
bridgeActor.Tell(new DebugStreamTerminated("site-1", "corr"));
// Assert — a descriptive exception that names the instance and wraps the cause,
// not the raw "terminated before snapshot received" InvalidOperationException.
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => startTask);
Assert.Contains("Site1.Pump01", ex.Message);
Assert.NotNull(ex.InnerException);
}
}
@@ -458,4 +458,87 @@ public class DataConnectionActorTests : TestKit
await backupAdapter.Received().SubscribeAsync(
"sensor/temp", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>());
}
// ── DataConnectionLayer-001: subscribe must not mutate actor state off-thread ──
private static async Task<string> DelayedSubscribeAsync()
{
// A short delay so concurrent subscribe background tasks pile up and their
// post-await state mutations would race under the pre-fix implementation.
await Task.Delay(1);
return "sub-" + Guid.NewGuid().ToString("N");
}
[Fact]
public async Task DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters()
{
// Regression test for DataConnectionLayer-001. HandleSubscribe used to mutate
// actor state (_subscriptionIds, _totalSubscribed, _resolvedTags, the per-instance
// HashSet) from a Task.Run background thread. Many concurrent subscribes then race
// on non-thread-safe Dictionary/HashSet and on non-atomic int++ — losing increments
// or throwing. After the fix every mutation is applied on the actor thread via a
// SubscribeCompleted message, so the final counts are exact.
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
.Returns(_ => DelayedSubscribeAsync());
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.Returns(new ReadResult(false, null, null));
var actor = CreateConnectionActor("dcl001-concurrent");
await Task.Delay(300); // reach Connected state
const int instances = 30;
const int tagsPerInstance = 30;
for (var i = 0; i < instances; i++)
{
var tags = Enumerable.Range(0, tagsPerInstance)
.Select(j => $"inst{i}/tag{j}")
.ToArray();
actor.Tell(new SubscribeTagsRequest(
$"corr{i}", $"inst{i}", "dcl001-concurrent", tags, DateTimeOffset.UtcNow));
}
// Every subscribe must be acknowledged.
for (var i = 0; i < instances; i++)
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(15));
actor.Tell(new DataConnectionActor.GetHealthReport());
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(5));
// Every tag is distinct, so each is a fresh, resolved subscription.
Assert.Equal(instances * tagsPerInstance, report.TotalSubscribedTags);
Assert.Equal(instances * tagsPerInstance, report.ResolvedTags);
}
[Fact]
public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately()
{
// Behavioural guard: the restructured subscribe must preserve the original
// accounting — failed tags count toward TotalSubscribed but not ResolvedTags.
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
.Returns(ci => ((string)ci[0]).StartsWith("bad")
? Task.FromException<string>(new Exception("tag not found"))
: Task.FromResult("sub-ok"));
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.Returns(new ReadResult(false, null, null));
var actor = CreateConnectionActor("dcl001-failed-tags");
await Task.Delay(300);
actor.Tell(new SubscribeTagsRequest(
"c1", "inst1", "dcl001-failed-tags",
["good/a", "good/b", "good/c", "bad/x", "bad/y"], DateTimeOffset.UtcNow));
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
actor.Tell(new DataConnectionActor.GetHealthReport());
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
Assert.Equal(5, report.TotalSubscribedTags); // all 5 tags tracked
Assert.Equal(3, report.ResolvedTags); // only the 3 good ones resolved
}
}
@@ -53,4 +53,26 @@ public class DatabaseGatewayTests
await Assert.ThrowsAsync<InvalidOperationException>(
() => gateway.CachedWriteAsync("nonexistent", "INSERT INTO t VALUES (1)"));
}
// ── ExternalSystemGateway-001: buffered CachedDbWrite delivery handler ──
[Fact]
public async Task DeliverBuffered_ConnectionNoLongerExists_ReturnsFalseSoMessageParks()
{
_repository.GetAllDatabaseConnectionsAsync().Returns(new List<DatabaseConnectionDefinition>());
var gateway = new DatabaseGateway(_repository, NullLogger<DatabaseGateway>.Instance);
var message = new ScadaLink.StoreAndForward.StoreAndForwardMessage
{
Id = Guid.NewGuid().ToString("N"),
Category = ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
Target = "gone-db",
PayloadJson =
"""{"ConnectionName":"gone-db","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""",
};
var delivered = await gateway.DeliverBufferedAsync(message);
Assert.False(delivered); // permanent — the S&F engine parks the message
}
}
@@ -153,6 +153,69 @@ public class ExternalSystemClientTests
Assert.False(result.WasBuffered);
}
// ── ExternalSystemGateway-001: buffered-call delivery handler ──
private static ScadaLink.StoreAndForward.StoreAndForwardMessage BufferedCall(
string systemName, string methodName) =>
new()
{
Id = Guid.NewGuid().ToString("N"),
Category = ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem,
Target = systemName,
PayloadJson =
$$"""{"SystemName":"{{systemName}}","MethodName":"{{methodName}}","Parameters":null}""",
};
[Fact]
public async Task DeliverBuffered_SuccessfulHttp_ReturnsTrue()
{
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
var method = new ExternalSystemMethod("getData", "GET", "/data") { Id = 1, ExternalSystemDefinitionId = 1 };
_repository.GetAllExternalSystemsAsync().Returns(new List<ExternalSystemDefinition> { system });
_repository.GetMethodsByExternalSystemIdAsync(1).Returns(new List<ExternalSystemMethod> { method });
var httpClient = new HttpClient(new MockHttpMessageHandler(HttpStatusCode.OK, "{\"ok\":true}"));
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
var client = new ExternalSystemClient(
_httpClientFactory, _repository, NullLogger<ExternalSystemClient>.Instance);
var delivered = await client.DeliverBufferedAsync(BufferedCall("TestAPI", "getData"));
Assert.True(delivered);
}
[Fact]
public async Task DeliverBuffered_SystemNoLongerExists_ReturnsFalseSoMessageParks()
{
_repository.GetAllExternalSystemsAsync().Returns(new List<ExternalSystemDefinition>());
var client = new ExternalSystemClient(
_httpClientFactory, _repository, NullLogger<ExternalSystemClient>.Instance);
var delivered = await client.DeliverBufferedAsync(BufferedCall("GoneAPI", "method"));
Assert.False(delivered); // permanent — the S&F engine parks the message
}
[Fact]
public async Task DeliverBuffered_Transient500_ThrowsSoEngineRetries()
{
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
var method = new ExternalSystemMethod("failMethod", "POST", "/fail") { Id = 1, ExternalSystemDefinitionId = 1 };
_repository.GetAllExternalSystemsAsync().Returns(new List<ExternalSystemDefinition> { system });
_repository.GetMethodsByExternalSystemIdAsync(1).Returns(new List<ExternalSystemMethod> { method });
var httpClient = new HttpClient(new MockHttpMessageHandler(HttpStatusCode.InternalServerError, "boom"));
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
var client = new ExternalSystemClient(
_httpClientFactory, _repository, NullLogger<ExternalSystemClient>.Instance);
await Assert.ThrowsAsync<TransientExternalSystemException>(
() => client.DeliverBufferedAsync(BufferedCall("TestAPI", "failMethod")));
}
/// <summary>
/// Test helper: mock HTTP message handler.
/// </summary>
@@ -192,4 +192,37 @@ public class NotificationDeliveryServiceTests
Assert.True(result.Success);
Assert.True(result.WasBuffered);
}
// ── NotificationService-001: buffered-notification delivery handler ──
private static StoreAndForward.StoreAndForwardMessage BufferedNotification(string listName) =>
new()
{
Id = Guid.NewGuid().ToString("N"),
Category = ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification,
Target = listName,
PayloadJson = $$"""{"ListName":"{{listName}}","Subject":"Alert","Message":"Body"}""",
};
[Fact]
public async Task DeliverBuffered_HappyPath_ReturnsTrue()
{
SetupHappyPath();
var service = CreateService();
var delivered = await service.DeliverBufferedAsync(BufferedNotification("ops-team"));
Assert.True(delivered);
}
[Fact]
public async Task DeliverBuffered_ListNoLongerExists_ReturnsFalseSoMessageParks()
{
_repository.GetListByNameAsync("gone-list").Returns((NotificationList?)null);
var service = CreateService();
var delivered = await service.DeliverBufferedAsync(BufferedNotification("gone-list"));
Assert.False(delivered); // permanent — the S&F engine parks the message
}
}
@@ -0,0 +1,108 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.StoreAndForward.Tests;
/// <summary>
/// StoreAndForward-001: the active node must forward every buffer operation
/// (add / remove / park) to the standby via the ReplicationService, so a
/// failover does not lose the buffer.
/// </summary>
public class StoreAndForwardReplicationTests : IAsyncLifetime, IDisposable
{
private readonly SqliteConnection _keepAlive;
private readonly StoreAndForwardStorage _storage;
private readonly StoreAndForwardService _service;
private readonly List<ReplicationOperation> _replicated = new();
public StoreAndForwardReplicationTests()
{
var connStr = $"Data Source=ReplTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";
_keepAlive = new SqliteConnection(connStr);
_keepAlive.Open();
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
var options = new StoreAndForwardOptions
{
DefaultRetryInterval = TimeSpan.Zero,
DefaultMaxRetries = 1,
RetryTimerInterval = TimeSpan.FromMinutes(10),
ReplicationEnabled = true,
};
var replication = new ReplicationService(options, NullLogger<ReplicationService>.Instance);
replication.SetReplicationHandler(op =>
{
lock (_replicated) _replicated.Add(op);
return Task.CompletedTask;
});
_service = new StoreAndForwardService(
_storage, options, NullLogger<StoreAndForwardService>.Instance, replication);
}
public async Task InitializeAsync() => await _storage.InitializeAsync();
public Task DisposeAsync() => Task.CompletedTask;
public void Dispose() => _keepAlive.Dispose();
/// <summary>Replication is fire-and-forget (Task.Run); poll until the expected ops arrive.</summary>
private async Task<List<ReplicationOperation>> WaitForReplicationAsync(int count)
{
for (var i = 0; i < 100; i++)
{
lock (_replicated)
if (_replicated.Count >= count) return _replicated.ToList();
await Task.Delay(20);
}
lock (_replicated) return _replicated.ToList();
}
[Fact]
public async Task BufferingAMessage_ReplicatesAnAddOperation()
{
// No handler registered → message is buffered → an Add is replicated.
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
Assert.True(result.WasBuffered);
var ops = await WaitForReplicationAsync(1);
Assert.Contains(ops, o =>
o.OperationType == ReplicationOperationType.Add && o.MessageId == result.MessageId);
}
[Fact]
public async Task SuccessfulRetry_ReplicatesARemoveOperation()
{
var calls = 0;
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => ++calls == 1
? throw new HttpRequestException("transient")
: Task.FromResult(true));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
await _service.RetryPendingMessagesAsync();
var ops = await WaitForReplicationAsync(2);
Assert.Contains(ops, o => o.OperationType == ReplicationOperationType.Add);
Assert.Contains(ops, o =>
o.OperationType == ReplicationOperationType.Remove && o.MessageId == result.MessageId);
}
[Fact]
public async Task ParkedMessage_ReplicatesAParkOperation()
{
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => throw new HttpRequestException("always fails"));
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
await _service.RetryPendingMessagesAsync();
var ops = await WaitForReplicationAsync(2);
Assert.Contains(ops, o =>
o.OperationType == ReplicationOperationType.Park && o.MessageId == result.MessageId);
}
}
@@ -310,4 +310,28 @@ public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable
Assert.Equal(100, msg!.MaxRetries);
Assert.Equal(60000, msg.RetryIntervalMs);
}
// ── attemptImmediateDelivery: false — caller already attempted delivery ──
[Fact]
public async Task EnqueueAsync_AttemptImmediateDeliveryFalse_BuffersWithoutInvokingHandler()
{
// A caller that has already made its own delivery attempt passes
// attemptImmediateDelivery: false so the request is not dispatched twice.
var handlerCalls = 0;
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ => { Interlocked.Increment(ref handlerCalls); return Task.FromResult(true); });
var result = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
attemptImmediateDelivery: false);
Assert.Equal(0, handlerCalls); // handler NOT invoked at enqueue time
Assert.True(result.WasBuffered);
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
Assert.NotNull(msg);
Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status);
Assert.Equal(1, msg.RetryCount); // counts as the caller's first attempt
}
}