Compare commits
4 Commits
977d7369a7
...
61253e3269
| Author | SHA1 | Date | |
|---|---|---|---|
| 61253e3269 | |||
| a9bd7ee37c | |||
| a9ceba00d0 | |||
| 239bee3bc4 |
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 19 |
|
| Open findings | 18 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -55,7 +55,7 @@ pages and the auth bridge are untested.
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | Critical |
|
| Severity | Critical |
|
||||||
| Category | Security |
|
| Category | Security |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.CentralUI/ScriptAnalysis/ScriptAnalysisService.cs:171-424` |
|
| Location | `src/ScadaLink.CentralUI/ScriptAnalysis/ScriptAnalysisService.cs:171-424` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -85,7 +85,22 @@ an editor hint.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. A Roslyn semantic trust-model gate was added. `RunInSandboxAsync`
|
||||||
|
now calls `EnforceTrustModel` after compilation and before `script.RunAsync`; if the
|
||||||
|
script references any forbidden API the run is rejected (`SandboxErrorKind.CompileError`)
|
||||||
|
with the offending markers, and the same gate is applied to nested shared scripts in
|
||||||
|
`callSharedFunc`. `FindForbiddenApiUsages` was reworked so it resolves every identifier
|
||||||
|
(not just the leftmost) against the semantic model and checks types **and** members —
|
||||||
|
so a fully-qualified call such as `System.IO.File.WriteAllText(...)` is now caught, not
|
||||||
|
only `using`-directive or bare-type forms. This is a static semantic gate consistent
|
||||||
|
with the documented trust model; it is not a process sandbox — reflection-based
|
||||||
|
indirection remains out of its reach, and full isolation would require running scripts
|
||||||
|
in a separate constrained process (a larger change deliberately not taken here).
|
||||||
|
Regression tests `RunInSandbox_FullyQualifiedForbiddenApi_IsBlockedBeforeExecution`,
|
||||||
|
`RunInSandbox_ForbiddenUsingDirective_IsBlockedBeforeExecution` and
|
||||||
|
`Diagnose_FullyQualifiedForbiddenCall_RaisesSCADA002` fail against the pre-fix code and
|
||||||
|
pass after; `RunInSandbox_CleanScript_StillRuns` guards against over-blocking. Fixed by
|
||||||
|
the commit whose message references `CentralUI-001`.
|
||||||
|
|
||||||
### CentralUI-002 — Site-scoped Deployment permissions are issued but never enforced
|
### CentralUI-002 — Site-scoped Deployment permissions are issued but never enforced
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 11 |
|
| Open findings | 10 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -16,9 +16,7 @@ The Communication module is generally well-structured and matches the design doc
|
|||||||
two-transport model (ClusterClient for command/control, gRPC server-streaming for
|
two-transport model (ClusterClient for command/control, gRPC server-streaming for
|
||||||
real-time data). The actors keep mutable state on the actor thread, use `PipeTo` for
|
real-time data). The actors keep mutable state on the actor thread, use `PipeTo` for
|
||||||
async work, and the gRPC server/client lifecycle is mostly disciplined. However the
|
async work, and the gRPC server/client lifecycle is mostly disciplined. However the
|
||||||
review found one Critical issue (a `TimeoutException` from `DebugStreamService` leaves
|
review found several High and Medium issues clustered around two themes:
|
||||||
an orphaned bridge actor and an active site-side subscription, leaking resources on
|
|
||||||
every snapshot timeout) and several High/Medium issues clustered around two themes:
|
|
||||||
**(a) gRPC subscription bookkeeping races** — `SiteStreamGrpcClient` overwrites and
|
**(a) gRPC subscription bookkeeping races** — `SiteStreamGrpcClient` overwrites and
|
||||||
removes subscription entries by correlation ID without disposal or ownership checks,
|
removes subscription entries by correlation ID without disposal or ownership checks,
|
||||||
so reconnect cycles leak `CancellationTokenSource`es and can cancel the wrong stream;
|
so reconnect cycles leak `CancellationTokenSource`es and can cancel the wrong stream;
|
||||||
@@ -44,43 +42,55 @@ mutation races, and the snapshot-timeout cleanup path.
|
|||||||
|
|
||||||
## Findings
|
## Findings
|
||||||
|
|
||||||
### Communication-001 — Snapshot timeout leaves orphaned bridge actor and site subscription
|
### Communication-001 — Early stream termination escapes StartStreamAsync's narrow exception handling
|
||||||
|
|
||||||
| | |
|
| | |
|
||||||
|--|--|
|
|--|--|
|
||||||
| Severity | Critical |
|
| Severity | Medium |
|
||||||
| Category | Performance & resource management |
|
| Category | Error handling & resilience |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.Communication/DebugStreamService.cs:139`, `src/ScadaLink.Communication/DebugStreamService.cs:149` |
|
| Location | `src/ScadaLink.Communication/DebugStreamService.cs:130-143` |
|
||||||
|
|
||||||
|
**Re-triaged 2026-05-16:** originally filed Critical, claiming an orphaned bridge actor
|
||||||
|
and a multi-minute site-side resource leak on every snapshot timeout. On verification
|
||||||
|
that impact does **not** occur: `DebugStreamBridgeActor` calls `CleanupGrpc()` and
|
||||||
|
`Context.Stop(Self)` on every path that invokes `onTerminated` (site disconnect, gRPC
|
||||||
|
max-retries, `ReceiveTimeout`), so it always self-terminates and releases its gRPC
|
||||||
|
subscription; and the pure-timeout path does reach `StopStream`, which also stops it.
|
||||||
|
The genuine defect described below is an error-handling gap, not a leak — severity
|
||||||
|
corrected to Medium.
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
|
|
||||||
When `StartStreamAsync` times out waiting for the initial snapshot it calls
|
`StartStreamAsync` awaits the initial snapshot inside a `try` whose only handler is
|
||||||
`StopStream(sessionId)` and throws. `StopStream` only sends `StopDebugStream` to the
|
`catch (OperationCanceledException)`. When the stream terminates before the snapshot
|
||||||
bridge actor **if the session is still in `_sessions`**. But the bridge actor was added
|
arrives, `onTerminatedWrapper` completes the await via
|
||||||
to `_sessions` at line 124 and is only removed by `onTerminatedWrapper`. The serious
|
`snapshotTcs.TrySetException(new InvalidOperationException(...))`. That
|
||||||
case is the race where `onTerminatedWrapper` fires first (e.g. site disconnect arrives
|
`InvalidOperationException` is not an `OperationCanceledException`, so it escapes the
|
||||||
during the wait): `snapshotTcs.TrySetException` completes the await with an
|
catch entirely: the caller (Blazor debug view / SignalR hub) receives a raw,
|
||||||
`InvalidOperationException` rather than `OperationCanceledException`, which is **not**
|
untranslated exception, and `StartStreamAsync` performs no teardown of its own on that
|
||||||
caught by the `catch (OperationCanceledException)` block. The exception propagates
|
path — it relies implicitly on the bridge actor self-terminating. Cleanup from the
|
||||||
uncaught, `StopStream` is never reached, and if the bridge actor is instead orphaned
|
service side is therefore not deterministic, and the failure surfaced to the caller is
|
||||||
(snapshot never arrives, site silent, no terminate) the only cleanup is the 5-minute
|
not a meaningful, documented result.
|
||||||
`ReceiveTimeout` in the actor — meaning a site-side `StreamRelayActor` and gRPC stream
|
|
||||||
can stay alive for up to 5 minutes after the central caller has given up. Combined with
|
|
||||||
the 30s timeout, every transient snapshot delay leaks site resources for minutes.
|
|
||||||
|
|
||||||
**Recommendation**
|
**Recommendation**
|
||||||
|
|
||||||
In `StartStreamAsync`, wrap the `await` so that *any* failure or cancellation
|
In `StartStreamAsync`, catch any exception from the snapshot await, deterministically
|
||||||
deterministically calls `StopStream(sessionId)` (e.g. `try/catch (Exception)` or a
|
tear down the bridge actor (`Tell(StopDebugStream)` via the local actor reference, since
|
||||||
`finally` that stops the session when the result was not returned). Ensure
|
a racing `onTerminatedWrapper` may already have removed the session entry), and translate
|
||||||
`StopStream` is idempotent and always sends `StopDebugStream` even if the session was
|
the failure into a meaningful exception for the caller.
|
||||||
already removed, so the bridge actor (and its site-side subscription) is torn down
|
|
||||||
promptly rather than waiting for the orphan `ReceiveTimeout`.
|
|
||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. The `catch (OperationCanceledException)`-only block in
|
||||||
|
`StartStreamAsync` was replaced with `catch (Exception)`: it removes the session entry,
|
||||||
|
sends `StopDebugStream` to the bridge actor via the local reference (idempotent — the
|
||||||
|
actor may already be stopping itself), and throws a descriptive exception —
|
||||||
|
`TimeoutException` for the 30s timeout, otherwise an `InvalidOperationException` that
|
||||||
|
names the instance/site and wraps the underlying cause. Regression test
|
||||||
|
`DebugStreamServiceTests.StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException`
|
||||||
|
fails against the pre-fix code and passes after. Fixed by the commit whose message
|
||||||
|
references `Communication-001`.
|
||||||
|
|
||||||
### Communication-002 — gRPC reconnect does not unsubscribe the previous stream, leaking site-side relay actors
|
### Communication-002 — gRPC reconnect does not unsubscribe the previous stream, leaking site-side relay actors
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 13 |
|
| Open findings | 12 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -53,7 +53,7 @@ tag-resolution retry, disconnect/re-subscribe, and concurrency around `HandleSub
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | Critical |
|
| Severity | Critical |
|
||||||
| Category | Concurrency & thread safety |
|
| Category | Concurrency & thread safety |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:473-538` |
|
| Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:473-538` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -82,7 +82,18 @@ handler too.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. `HandleSubscribe` was restructured to follow the actor's own
|
||||||
|
`PipeTo(Self)` pattern (the one already used by `HandleRetryTagResolution`): the
|
||||||
|
background `Task.Run` now performs only adapter I/O (`SubscribeAsync`/`ReadAsync`),
|
||||||
|
collects per-tag outcomes into an immutable `SubscribeCompleted` message, and pipes
|
||||||
|
that to `Self`. All mutation of `_subscriptionIds`, `_subscriptionsByInstance`,
|
||||||
|
`_totalSubscribed`, `_resolvedTags` and `_unresolvedTags` now happens in the new
|
||||||
|
`HandleSubscribeCompleted` handler on the actor thread; it is wired into the
|
||||||
|
Connected, Connecting and Reconnecting states so an in-flight subscribe is applied
|
||||||
|
regardless of state transitions. Regression test
|
||||||
|
`DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters` (30×30 concurrent
|
||||||
|
subscribes) fails against the pre-fix code and passes after. Fixed by the commit
|
||||||
|
whose message references `DataConnectionLayer-001`.
|
||||||
|
|
||||||
### DataConnectionLayer-002 — `Restart` supervision discards all subscription state on connection-actor crash
|
### DataConnectionLayer-002 — `Restart` supervision discards all subscription state on connection-actor crash
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 14 |
|
| Open findings | 13 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -53,7 +53,7 @@ requirements (timeout, retry settings) that are declared but not implemented.
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | Critical |
|
| Severity | Critical |
|
||||||
| Category | Error handling & resilience |
|
| Category | Error handling & resilience |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:109`, `src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs:81` |
|
| Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:109`, `src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs:81` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -89,7 +89,19 @@ verifies it is delivered by a retry sweep.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. Delivery handlers for `StoreAndForwardCategory.ExternalSystem` and
|
||||||
|
`CachedDbWrite` are now registered at site startup in `AkkaHostedService`, after
|
||||||
|
`StoreAndForwardService.StartAsync()`. Each handler resolves its consumer in a fresh DI
|
||||||
|
scope and calls a new `DeliverBufferedAsync`: `ExternalSystemClient.DeliverBufferedAsync`
|
||||||
|
re-resolves the system/method and re-invokes `InvokeHttpAsync`, and
|
||||||
|
`DatabaseGateway.DeliverBufferedAsync` executes the buffered SQL — each returning `true`
|
||||||
|
on success, `false` (park) when the target no longer exists or fails permanently, and
|
||||||
|
throwing on transient failure so the engine retries. `EnqueueAsync` gained an
|
||||||
|
`attemptImmediateDelivery` parameter; `CachedCallAsync` passes `false` so registering the
|
||||||
|
handler does not dispatch the request twice (the double-dispatch noted in
|
||||||
|
`ExternalSystemGateway-003`). Regression tests cover the success, target-removed and
|
||||||
|
transient-retry paths. Fixed by the commit whose message references
|
||||||
|
`ExternalSystemGateway-001`.
|
||||||
|
|
||||||
### ExternalSystemGateway-002 — Per-system call timeout is never applied to HTTP requests
|
### ExternalSystemGateway-002 — Per-system call timeout is never applied to HTTP requests
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 12 |
|
| Open findings | 11 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -53,7 +53,7 @@ fallback in `DeliverAsync`, and concurrency on the token cache.
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | Critical |
|
| Severity | Critical |
|
||||||
| Category | Error handling & resilience |
|
| Category | Error handling & resilience |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.NotificationService/NotificationDeliveryService.cs:96`, `src/ScadaLink.NotificationService/ServiceCollectionExtensions.cs:8` |
|
| Location | `src/ScadaLink.NotificationService/NotificationDeliveryService.cs:96`, `src/ScadaLink.NotificationService/ServiceCollectionExtensions.cs:8` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -66,7 +66,15 @@ Register a delivery handler for `StoreAndForwardCategory.Notification` during st
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. A delivery handler for `StoreAndForwardCategory.Notification` is now
|
||||||
|
registered at site startup in `AkkaHostedService`. The handler resolves
|
||||||
|
`NotificationDeliveryService` in a fresh DI scope and calls the new `DeliverBufferedAsync`,
|
||||||
|
which re-resolves the list, recipients and SMTP config and re-attempts delivery —
|
||||||
|
returning `true` on success, `false` (park) on permanent failure or missing
|
||||||
|
configuration, and throwing on transient failure so the engine retries. `SendAsync` now
|
||||||
|
buffers with `attemptImmediateDelivery: false` so registering the handler does not send
|
||||||
|
the notification twice. Regression tests cover the happy path and the list-removed park
|
||||||
|
path. Fixed by the commit whose message references `NotificationService-001`.
|
||||||
|
|
||||||
### NotificationService-002 — `TimeoutException`/`OperationCanceledException` misclassified as transient
|
### NotificationService-002 — `TimeoutException`/`OperationCanceledException` misclassified as transient
|
||||||
|
|
||||||
|
|||||||
+32
-36
@@ -28,56 +28,52 @@ code-reviews/
|
|||||||
|
|
||||||
## Baseline review — 2026-05-16
|
## Baseline review — 2026-05-16
|
||||||
|
|
||||||
All 19 modules were reviewed at commit `9c60592`. This established the baseline below.
|
All 19 modules were reviewed at commit `9c60592` (241 findings: 6 Critical, 46 High,
|
||||||
|
100 Medium, 89 Low). The tables below track what remains **open** as findings are
|
||||||
|
resolved and re-triaged.
|
||||||
|
|
||||||
| Severity | Open findings |
|
| Severity | Open findings |
|
||||||
|----------|---------------|
|
|----------|---------------|
|
||||||
| Critical | 6 |
|
| Critical | 0 |
|
||||||
| High | 46 |
|
| High | 46 |
|
||||||
| Medium | 100 |
|
| Medium | 100 |
|
||||||
| Low | 89 |
|
| Low | 89 |
|
||||||
| **Total** | **241** |
|
| **Total** | **235** |
|
||||||
|
|
||||||
## Module Status
|
## Module Status
|
||||||
|
|
||||||
| Module | Review status | Last reviewed | Commit | Open (C/H/M/L) | Total |
|
| Module | Last reviewed | Commit | Open (C/H/M/L) | Open | Total |
|
||||||
|--------|---------------|---------------|--------|----------------|-------|
|
|--------|---------------|--------|----------------|------|-------|
|
||||||
| [CentralUI](CentralUI/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/3/10/5 | 19 |
|
| [CentralUI](CentralUI/findings.md) | 2026-05-16 | `9c60592` | 0/3/10/5 | 18 | 19 |
|
||||||
| [CLI](CLI/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/6/6 | 13 |
|
| [CLI](CLI/findings.md) | 2026-05-16 | `9c60592` | 0/1/6/6 | 13 | 13 |
|
||||||
| [ClusterInfrastructure](ClusterInfrastructure/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/4/3 | 8 |
|
| [ClusterInfrastructure](ClusterInfrastructure/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/3 | 8 | 8 |
|
||||||
| [Commons](Commons/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 |
|
| [Commons](Commons/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | 12 |
|
||||||
| [Communication](Communication/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/2/5/3 | 11 |
|
| [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/3 | 10 | 11 |
|
||||||
| [ConfigurationDatabase](ConfigurationDatabase/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 |
|
| [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 | 11 |
|
||||||
| [DataConnectionLayer](DataConnectionLayer/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/4/6/2 | 13 |
|
| [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/4/6/2 | 12 | 13 |
|
||||||
| [DeploymentManager](DeploymentManager/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 |
|
| [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | 14 |
|
||||||
| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/2/7/4 | 14 |
|
| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | 2026-05-16 | `9c60592` | 0/2/7/4 | 13 | 14 |
|
||||||
| [HealthMonitoring](HealthMonitoring/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/2/5/5 | 12 |
|
| [HealthMonitoring](HealthMonitoring/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/5 | 12 | 12 |
|
||||||
| [Host](Host/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/3/7 | 11 |
|
| [Host](Host/findings.md) | 2026-05-16 | `9c60592` | 0/1/3/7 | 11 | 11 |
|
||||||
| [InboundAPI](InboundAPI/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 |
|
| [InboundAPI](InboundAPI/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
|
||||||
| [ManagementService](ManagementService/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 |
|
| [ManagementService](ManagementService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
|
||||||
| [NotificationService](NotificationService/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/3/5/3 | 12 |
|
| [NotificationService](NotificationService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/3 | 11 | 12 |
|
||||||
| [Security](Security/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/4/4 | 11 |
|
| [Security](Security/findings.md) | 2026-05-16 | `9c60592` | 0/3/4/4 | 11 | 11 |
|
||||||
| [SiteEventLogging](SiteEventLogging/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/4/4/3 | 11 |
|
| [SiteEventLogging](SiteEventLogging/findings.md) | 2026-05-16 | `9c60592` | 0/4/4/3 | 11 | 11 |
|
||||||
| [SiteRuntime](SiteRuntime/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/8/5 | 16 |
|
| [SiteRuntime](SiteRuntime/findings.md) | 2026-05-16 | `9c60592` | 0/3/8/5 | 16 | 16 |
|
||||||
| [StoreAndForward](StoreAndForward/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/2/4/6 | 13 |
|
| [StoreAndForward](StoreAndForward/findings.md) | 2026-05-16 | `9c60592` | 0/2/4/6 | 12 | 13 |
|
||||||
| [TemplateEngine](TemplateEngine/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/5/5/4 | 14 |
|
| [TemplateEngine](TemplateEngine/findings.md) | 2026-05-16 | `9c60592` | 0/5/5/4 | 14 | 14 |
|
||||||
|
|
||||||
## Pending Findings
|
## Pending Findings
|
||||||
|
|
||||||
All findings are currently `Open`. As findings are resolved, remove them from the
|
Every `Open` / `In Progress` finding across all modules, highest severity first.
|
||||||
tables below (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §5). Full detail for each
|
Resolved findings drop off this list but remain recorded in their module's
|
||||||
finding — description, location, recommendation — lives in the module's `findings.md`.
|
`findings.md` (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §4–§5). Full detail —
|
||||||
|
description, location, recommendation — lives in the module's `findings.md`.
|
||||||
|
|
||||||
### Critical (6)
|
### Critical (0)
|
||||||
|
|
||||||
| ID | Module | Title |
|
_None open._
|
||||||
|----|--------|-------|
|
|
||||||
| CentralUI-001 | [CentralUI](CentralUI/findings.md) | Test Run sandbox executes arbitrary C# with no trust-model enforcement |
|
|
||||||
| Communication-001 | [Communication](Communication/findings.md) | Snapshot timeout leaves orphaned bridge actor and site subscription |
|
|
||||||
| DataConnectionLayer-001 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `Task.Run` in `HandleSubscribe` mutates actor state off the actor thread |
|
|
||||||
| ExternalSystemGateway-001 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | No S&F delivery handler registered; cached calls and writes can never be delivered |
|
|
||||||
| NotificationService-001 | [NotificationService](NotificationService/findings.md) | Buffered notifications are never retried (no S&F delivery handler) |
|
|
||||||
| StoreAndForward-001 | [StoreAndForward](StoreAndForward/findings.md) | Replication to standby is never triggered by the active node |
|
|
||||||
|
|
||||||
### High (46)
|
### High (46)
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 13 |
|
| Open findings | 12 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -53,7 +53,7 @@ replication and retry-count issues are functional defects against the design.
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | Critical |
|
| Severity | Critical |
|
||||||
| Category | Error handling & resilience |
|
| Category | Error handling & resilience |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.StoreAndForward/ReplicationService.cs:40`, `:53`, `:66`; `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:155`, `:212`, `:222`, `:236` |
|
| Location | `src/ScadaLink.StoreAndForward/ReplicationService.cs:40`, `:53`, `:66`; `src/ScadaLink.StoreAndForward/StoreAndForwardService.cs:155`, `:212`, `:222`, `:236` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -81,7 +81,14 @@ asserts the replication handler observes each operation type.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. `ReplicationService` is now injected into `StoreAndForwardService`
|
||||||
|
(wired in `AddStoreAndForward`), and every buffer operation is forwarded to the standby:
|
||||||
|
a new `BufferAsync` helper calls `ReplicateEnqueue` after each persist, `ReplicateRemove`
|
||||||
|
runs after a successful retry removes a message, and `ReplicatePark` runs on both park
|
||||||
|
paths. Replication stays fire-and-forget and is a no-op when `ReplicationEnabled` is
|
||||||
|
false or no handler is wired. Regression tests `StoreAndForwardReplicationTests` assert
|
||||||
|
the replication handler observes the Add, Remove and Park operations. Fixed by the
|
||||||
|
commit whose message references `StoreAndForward-001`.
|
||||||
|
|
||||||
### StoreAndForward-002 — Messages enqueued with no registered handler are buffered but never deliverable
|
### StoreAndForward-002 — Messages enqueued with no registered handler are buffered but never deliverable
|
||||||
|
|
||||||
|
|||||||
@@ -220,6 +220,20 @@ public class ScriptAnalysisService
|
|||||||
SandboxErrorKind.CompileError, 0, markers);
|
SandboxErrorKind.CompileError, 0, markers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Trust-model gate (CentralUI-001): the documented forbidden-API set is
|
||||||
|
// enforced HERE, before execution — not merely surfaced as an editor hint.
|
||||||
|
// Without this, a Design-role user could run arbitrary file/process/
|
||||||
|
// reflection/network code in the central host process.
|
||||||
|
var trustViolations = EnforceTrustModel(script.GetCompilation());
|
||||||
|
if (trustViolations.Count > 0)
|
||||||
|
{
|
||||||
|
return new SandboxRunResult(false, null, null, "",
|
||||||
|
"Script blocked by the trust model — it references forbidden APIs "
|
||||||
|
+ "(System.IO, System.Diagnostics, System.Reflection, System.Net, threading). "
|
||||||
|
+ "See the highlighted diagnostics.",
|
||||||
|
SandboxErrorKind.CompileError, 0, trustViolations);
|
||||||
|
}
|
||||||
|
|
||||||
var parameters = ConvertJsonParameters(request.Parameters);
|
var parameters = ConvertJsonParameters(request.Parameters);
|
||||||
|
|
||||||
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds));
|
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutSeconds));
|
||||||
@@ -311,6 +325,13 @@ public class ScriptAnalysisService
|
|||||||
throw new ScriptSandboxException(
|
throw new ScriptSandboxException(
|
||||||
$"Scripts.CallShared(\"{name}\") compile failed: {string.Join("; ", nestedErrors.Select(d => d.GetMessage()))}");
|
$"Scripts.CallShared(\"{name}\") compile failed: {string.Join("; ", nestedErrors.Select(d => d.GetMessage()))}");
|
||||||
|
|
||||||
|
// Trust-model gate (CentralUI-001) — a nested shared script runs
|
||||||
|
// arbitrary code too, so it must clear the same forbidden-API gate.
|
||||||
|
if (EnforceTrustModel(built.GetCompilation()).Count > 0)
|
||||||
|
throw new ScriptSandboxException(
|
||||||
|
$"Scripts.CallShared(\"{name}\") is blocked by the script trust model — "
|
||||||
|
+ "the shared script references forbidden APIs.");
|
||||||
|
|
||||||
lock (compileCacheLock)
|
lock (compileCacheLock)
|
||||||
{
|
{
|
||||||
if (!compileCache.TryGetValue(name, out compiled))
|
if (!compileCache.TryGetValue(name, out compiled))
|
||||||
@@ -1086,15 +1107,25 @@ public class ScriptAnalysisService
|
|||||||
return new(AttributeContextKind.None, null);
|
return new(AttributeContextKind.None, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Finds every reference to a forbidden API — the documented script trust model,
|
||||||
|
/// see <see cref="ForbiddenNamespacePrefixes"/>. Identifiers are resolved against
|
||||||
|
/// the semantic model, so a forbidden type or member is caught however it is
|
||||||
|
/// written: bare (<c>File</c>), fully qualified
|
||||||
|
/// (<c>System.IO.File.WriteAllText</c>), or via an alias — while a user identifier
|
||||||
|
/// that merely shares a name with a forbidden type (<c>var File = …</c>) does not
|
||||||
|
/// false-positive. Used both for editor diagnostics and as the pre-execution
|
||||||
|
/// trust-model gate (see <see cref="EnforceTrustModel"/>).
|
||||||
|
/// </summary>
|
||||||
private static IEnumerable<DiagnosticMarker> FindForbiddenApiUsages(SyntaxTree tree, SemanticModel model)
|
private static IEnumerable<DiagnosticMarker> FindForbiddenApiUsages(SyntaxTree tree, SemanticModel model)
|
||||||
{
|
{
|
||||||
var root = tree.GetRoot();
|
var root = tree.GetRoot();
|
||||||
|
|
||||||
// Banned using directives — pure namespace string match is fine here.
|
// Banned using directives.
|
||||||
foreach (var u in root.DescendantNodes().OfType<UsingDirectiveSyntax>())
|
foreach (var u in root.DescendantNodes().OfType<UsingDirectiveSyntax>())
|
||||||
{
|
{
|
||||||
var name = u.Name?.ToString() ?? "";
|
var name = u.Name?.ToString() ?? "";
|
||||||
if (ForbiddenNamespacePrefixes.Any(p => name == p || name.StartsWith(p + ".")))
|
if (IsForbiddenName(name))
|
||||||
{
|
{
|
||||||
var span = u.GetLocation().GetLineSpan().Span;
|
var span = u.GetLocation().GetLineSpan().Span;
|
||||||
yield return new DiagnosticMarker(
|
yield return new DiagnosticMarker(
|
||||||
@@ -1108,20 +1139,14 @@ public class ScriptAnalysisService
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Banned type usages — resolved via the semantic model so a user
|
// Banned type / member references, resolved via the semantic model. Every
|
||||||
// identifier named "File" or "Thread" does NOT trigger the diagnostic
|
// identifier is checked — including the right-hand side of a member access —
|
||||||
// unless it actually resolves to a forbidden type.
|
// so a fully-qualified forbidden call (System.IO.File.WriteAllText) cannot
|
||||||
|
// slip past by avoiding a `using` directive or a bare type name.
|
||||||
foreach (var ident in root.DescendantNodes().OfType<IdentifierNameSyntax>())
|
foreach (var ident in root.DescendantNodes().OfType<IdentifierNameSyntax>())
|
||||||
{
|
{
|
||||||
// Skip the identifier on the right side of a member access — only
|
var forbidden = ForbiddenNameFor(model.GetSymbolInfo(ident).Symbol);
|
||||||
// the leftmost (the type or qualifier) is what we want to check.
|
if (forbidden == null) continue;
|
||||||
if (ident.Parent is MemberAccessExpressionSyntax m && m.Name == ident) continue;
|
|
||||||
|
|
||||||
var symbol = model.GetSymbolInfo(ident).Symbol;
|
|
||||||
if (symbol is not INamedTypeSymbol type) continue;
|
|
||||||
|
|
||||||
var ns = type.ContainingNamespace?.ToDisplayString() ?? "";
|
|
||||||
if (!ForbiddenNamespacePrefixes.Any(p => ns == p || ns.StartsWith(p + "."))) continue;
|
|
||||||
|
|
||||||
var span = ident.GetLocation().GetLineSpan().Span;
|
var span = ident.GetLocation().GetLineSpan().Span;
|
||||||
yield return new DiagnosticMarker(
|
yield return new DiagnosticMarker(
|
||||||
@@ -1130,11 +1155,75 @@ public class ScriptAnalysisService
|
|||||||
StartColumn: span.Start.Character + 1,
|
StartColumn: span.Start.Character + 1,
|
||||||
EndLineNumber: span.End.Line + 1,
|
EndLineNumber: span.End.Line + 1,
|
||||||
EndColumn: span.End.Character + 1,
|
EndColumn: span.End.Character + 1,
|
||||||
Message: $"Type '{type.Name}' from forbidden namespace '{ns}' is not allowed in scripts.",
|
Message: $"'{ident.Identifier.ValueText}' resolves to forbidden API '{forbidden}', " +
|
||||||
|
"which is not allowed in scripts (script trust model).",
|
||||||
Code: "SCADA002");
|
Code: "SCADA002");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The forbidden namespace/type a symbol implicates, or null if it is allowed.
|
||||||
|
/// Checks the symbol's namespace and — for a type or member — the type's full
|
||||||
|
/// name, so an entry like <c>System.Threading.Thread</c> bans that exact type
|
||||||
|
/// while <c>System.Threading</c> (e.g. <c>CancellationToken</c>) stays allowed.
|
||||||
|
/// </summary>
|
||||||
|
private static string? ForbiddenNameFor(ISymbol? symbol)
|
||||||
|
{
|
||||||
|
if (symbol == null) return null;
|
||||||
|
foreach (var name in QualifiedNamesOf(symbol))
|
||||||
|
if (IsForbiddenName(name))
|
||||||
|
return name;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Fully-qualified names a symbol reference implicates for trust-model checking.</summary>
|
||||||
|
private static IEnumerable<string> QualifiedNamesOf(ISymbol symbol)
|
||||||
|
{
|
||||||
|
switch (symbol)
|
||||||
|
{
|
||||||
|
case INamespaceSymbol { IsGlobalNamespace: false } ns:
|
||||||
|
yield return ns.ToDisplayString();
|
||||||
|
break;
|
||||||
|
case ITypeSymbol type:
|
||||||
|
if (type.ContainingNamespace is { IsGlobalNamespace: false } tn)
|
||||||
|
yield return tn.ToDisplayString();
|
||||||
|
yield return FullTypeName(type);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
if (symbol.ContainingType is { } ct)
|
||||||
|
{
|
||||||
|
if (ct.ContainingNamespace is { IsGlobalNamespace: false } cn)
|
||||||
|
yield return cn.ToDisplayString();
|
||||||
|
yield return FullTypeName(ct);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string FullTypeName(ITypeSymbol type) =>
|
||||||
|
type.ContainingNamespace is { IsGlobalNamespace: false } ns
|
||||||
|
? ns.ToDisplayString() + "." + type.Name
|
||||||
|
: type.Name;
|
||||||
|
|
||||||
|
private static bool IsForbiddenName(string qualifiedName) =>
|
||||||
|
ForbiddenNamespacePrefixes.Any(p =>
|
||||||
|
qualifiedName == p || qualifiedName.StartsWith(p + ".", StringComparison.Ordinal));
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Pre-execution trust-model gate (CentralUI-001). Returns the forbidden-API
|
||||||
|
/// markers (SCADA001/SCADA002) for a compiled script; an empty list means the
|
||||||
|
/// script is clear to run. This is a static semantic gate, not a process
|
||||||
|
/// sandbox — reflection-based indirection is still out of its reach; full
|
||||||
|
/// isolation would require running scripts in a separate constrained process.
|
||||||
|
/// </summary>
|
||||||
|
private static IReadOnlyList<DiagnosticMarker> EnforceTrustModel(Compilation compilation)
|
||||||
|
{
|
||||||
|
var tree = compilation.SyntaxTrees.FirstOrDefault();
|
||||||
|
if (tree == null) return Array.Empty<DiagnosticMarker>();
|
||||||
|
var model = compilation.GetSemanticModel(tree);
|
||||||
|
return FindForbiddenApiUsages(tree, model).ToList();
|
||||||
|
}
|
||||||
|
|
||||||
private static CompletionItem ToCompletionItem(ISymbol symbol)
|
private static CompletionItem ToCompletionItem(ISymbol symbol)
|
||||||
{
|
{
|
||||||
var kind = symbol.Kind switch
|
var kind = symbol.Kind switch
|
||||||
|
|||||||
@@ -127,21 +127,37 @@ public class DebugStreamService
|
|||||||
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||||
timeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
|
timeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
|
||||||
|
|
||||||
|
DebugViewSnapshot snapshot;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
|
snapshot = await snapshotTcs.Task.WaitAsync(timeoutCts.Token);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
// Any failure before the snapshot arrives — the 30s timeout, or the stream
|
||||||
|
// terminating early (site disconnect / gRPC failure, surfaced by
|
||||||
|
// onTerminatedWrapper as an InvalidOperationException) — must deterministically
|
||||||
|
// tear down the bridge actor and its site-side subscription. Use the local
|
||||||
|
// actor reference: a racing onTerminatedWrapper may already have removed the
|
||||||
|
// session, which would make StopStream a no-op. StopDebugStream is idempotent
|
||||||
|
// (the actor may already be stopping itself).
|
||||||
|
_sessions.TryRemove(sessionId, out _);
|
||||||
|
bridgeActor.Tell(new StopDebugStream());
|
||||||
|
|
||||||
|
if (ex is OperationCanceledException)
|
||||||
|
throw new TimeoutException(
|
||||||
|
$"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
|
||||||
|
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Debug stream for {instanceUniqueName} on site {siteIdentifier} terminated before a snapshot was received.",
|
||||||
|
ex);
|
||||||
|
}
|
||||||
|
|
||||||
_logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}",
|
_logger.LogInformation("Debug stream {SessionId} started for {Instance} on site {Site}",
|
||||||
sessionId, instanceUniqueName, siteIdentifier);
|
sessionId, instanceUniqueName, siteIdentifier);
|
||||||
|
|
||||||
return new DebugStreamSession(sessionId, snapshot);
|
return new DebugStreamSession(sessionId, snapshot);
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
|
||||||
{
|
|
||||||
StopStream(sessionId);
|
|
||||||
throw new TimeoutException($"Timed out waiting for debug snapshot from {instanceUniqueName} on site {siteIdentifier}.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Stops an active debug stream session.
|
/// Stops an active debug stream session.
|
||||||
|
|||||||
@@ -171,6 +171,11 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
case UnsubscribeTagsRequest:
|
case UnsubscribeTagsRequest:
|
||||||
Stash.Stash();
|
Stash.Stash();
|
||||||
break;
|
break;
|
||||||
|
case SubscribeCompleted sc:
|
||||||
|
// A subscribe started while Connected can complete after a transition;
|
||||||
|
// apply it so its state survives into the next ReSubscribeAll.
|
||||||
|
HandleSubscribeCompleted(sc);
|
||||||
|
break;
|
||||||
case GetHealthReport:
|
case GetHealthReport:
|
||||||
ReplyWithHealthReport();
|
ReplyWithHealthReport();
|
||||||
break;
|
break;
|
||||||
@@ -207,6 +212,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
case SubscribeTagsRequest req:
|
case SubscribeTagsRequest req:
|
||||||
HandleSubscribe(req);
|
HandleSubscribe(req);
|
||||||
break;
|
break;
|
||||||
|
case SubscribeCompleted sc:
|
||||||
|
HandleSubscribeCompleted(sc);
|
||||||
|
break;
|
||||||
case UnsubscribeTagsRequest req:
|
case UnsubscribeTagsRequest req:
|
||||||
HandleUnsubscribe(req);
|
HandleUnsubscribe(req);
|
||||||
break;
|
break;
|
||||||
@@ -338,6 +346,11 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
case TagResolutionFailed:
|
case TagResolutionFailed:
|
||||||
// Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect
|
// Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect
|
||||||
break;
|
break;
|
||||||
|
case SubscribeCompleted sc:
|
||||||
|
// A subscribe started while Connected can complete after a transition;
|
||||||
|
// apply it so its state survives into the next ReSubscribeAll.
|
||||||
|
HandleSubscribeCompleted(sc);
|
||||||
|
break;
|
||||||
case GetHealthReport:
|
case GetHealthReport:
|
||||||
ReplyWithHealthReport();
|
ReplyWithHealthReport();
|
||||||
break;
|
break;
|
||||||
@@ -466,18 +479,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
if (!_subscriptionsByInstance.ContainsKey(request.InstanceUniqueName))
|
if (!_subscriptionsByInstance.ContainsKey(request.InstanceUniqueName))
|
||||||
_subscriptionsByInstance[request.InstanceUniqueName] = new HashSet<string>();
|
_subscriptionsByInstance[request.InstanceUniqueName] = new HashSet<string>();
|
||||||
|
|
||||||
var instanceTags = _subscriptionsByInstance[request.InstanceUniqueName];
|
|
||||||
var self = Self;
|
var self = Self;
|
||||||
var sender = Sender;
|
var sender = Sender;
|
||||||
|
|
||||||
|
// Snapshot the already-subscribed tag set on the actor thread. The background
|
||||||
|
// task below must NOT read or mutate actor state — it performs only adapter
|
||||||
|
// I/O and reports results back via a SubscribeCompleted message, which is
|
||||||
|
// applied to actor state on the actor thread (see HandleSubscribeCompleted).
|
||||||
|
var alreadySubscribed = new HashSet<string>(_subscriptionIds.Keys);
|
||||||
|
|
||||||
Task.Run(async () =>
|
Task.Run(async () =>
|
||||||
{
|
{
|
||||||
|
var results = new List<SubscribeTagResult>(request.TagPaths.Count);
|
||||||
|
var tagsToSeed = new List<string>();
|
||||||
|
|
||||||
foreach (var tagPath in request.TagPaths)
|
foreach (var tagPath in request.TagPaths)
|
||||||
{
|
{
|
||||||
if (_subscriptionIds.ContainsKey(tagPath))
|
if (alreadySubscribed.Contains(tagPath))
|
||||||
{
|
{
|
||||||
// Already subscribed — just track for this instance
|
// Already subscribed by another instance — just track for this one.
|
||||||
instanceTags.Add(tagPath);
|
results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: true, Success: true, null, null));
|
||||||
|
tagsToSeed.Add(tagPath);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -487,27 +509,21 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
{
|
{
|
||||||
self.Tell(new TagValueReceived(path, value));
|
self.Tell(new TagValueReceived(path, value));
|
||||||
});
|
});
|
||||||
_subscriptionIds[tagPath] = subId;
|
results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: true, subId, null));
|
||||||
instanceTags.Add(tagPath);
|
tagsToSeed.Add(tagPath);
|
||||||
_totalSubscribed++;
|
|
||||||
_resolvedTags++;
|
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// WP-12: Tag path resolution failure — mark as unresolved, retry later
|
// WP-12: Tag path resolution failure — reported back as unresolved.
|
||||||
_unresolvedTags.Add(tagPath);
|
results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: false, null, ex.Message));
|
||||||
instanceTags.Add(tagPath);
|
|
||||||
_totalSubscribed++;
|
|
||||||
|
|
||||||
self.Tell(new TagResolutionFailed(tagPath, ex.Message));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initial read — seed current values for all resolved tags so the Instance Actor
|
// Initial read — seed current values for resolved tags so the Instance Actor
|
||||||
// doesn't stay Uncertain until the next OPC UA data change notification
|
// doesn't stay Uncertain until the next OPC UA data change notification.
|
||||||
foreach (var tagPath in instanceTags)
|
// Tell is thread-safe, so seeded values are delivered directly as messages.
|
||||||
|
foreach (var tagPath in tagsToSeed)
|
||||||
{
|
{
|
||||||
if (_unresolvedTags.Contains(tagPath)) continue;
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var readResult = await _adapter.ReadAsync(tagPath);
|
var readResult = await _adapter.ReadAsync(tagPath);
|
||||||
@@ -522,11 +538,51 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new SubscribeTagsResponse(
|
return new SubscribeCompleted(request, sender, results);
|
||||||
request.CorrelationId, request.InstanceUniqueName, true, null, DateTimeOffset.UtcNow);
|
}).PipeTo(self);
|
||||||
}).PipeTo(sender);
|
}
|
||||||
|
|
||||||
// Start tag resolution retry timer if we have unresolved tags
|
/// <summary>
|
||||||
|
/// Applies the result of an asynchronous subscribe on the actor thread. ALL mutation
|
||||||
|
/// of subscription state and counters happens here — never on the background task —
|
||||||
|
/// so the actor model's single-threaded state guarantee holds.
|
||||||
|
/// </summary>
|
||||||
|
private void HandleSubscribeCompleted(SubscribeCompleted msg)
|
||||||
|
{
|
||||||
|
var instanceName = msg.Request.InstanceUniqueName;
|
||||||
|
if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags))
|
||||||
|
{
|
||||||
|
// The instance was unsubscribed while the subscribe I/O was in flight.
|
||||||
|
instanceTags = new HashSet<string>();
|
||||||
|
_subscriptionsByInstance[instanceName] = instanceTags;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var result in msg.Results)
|
||||||
|
{
|
||||||
|
instanceTags.Add(result.TagPath);
|
||||||
|
|
||||||
|
// Re-check against current state: another subscribe may have resolved the
|
||||||
|
// same tag while this request's I/O was in flight.
|
||||||
|
if (result.AlreadySubscribed || _subscriptionIds.ContainsKey(result.TagPath))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (result.Success)
|
||||||
|
{
|
||||||
|
_subscriptionIds[result.TagPath] = result.SubscriptionId!;
|
||||||
|
_totalSubscribed++;
|
||||||
|
_resolvedTags++;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// WP-12: mark unresolved so the periodic retry timer picks it up.
|
||||||
|
_unresolvedTags.Add(result.TagPath);
|
||||||
|
_totalSubscribed++;
|
||||||
|
_log.Debug("[{0}] Tag resolution failed for {1}: {2}",
|
||||||
|
_connectionName, result.TagPath, result.Error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the tag-resolution retry timer if any tags are unresolved.
|
||||||
if (_unresolvedTags.Count > 0)
|
if (_unresolvedTags.Count > 0)
|
||||||
{
|
{
|
||||||
Timers.StartPeriodicTimer(
|
Timers.StartPeriodicTimer(
|
||||||
@@ -535,6 +591,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
_options.TagResolutionRetryInterval,
|
_options.TagResolutionRetryInterval,
|
||||||
_options.TagResolutionRetryInterval);
|
_options.TagResolutionRetryInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
msg.ReplyTo.Tell(new SubscribeTagsResponse(
|
||||||
|
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleUnsubscribe(UnsubscribeTagsRequest request)
|
private void HandleUnsubscribe(UnsubscribeTagsRequest request)
|
||||||
@@ -764,5 +823,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
internal record TagResolutionFailed(string TagPath, string Error);
|
internal record TagResolutionFailed(string TagPath, string Error);
|
||||||
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
|
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
|
||||||
internal record RetryTagResolution;
|
internal record RetryTagResolution;
|
||||||
|
internal record SubscribeTagResult(
|
||||||
|
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error);
|
||||||
|
internal record SubscribeCompleted(
|
||||||
|
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
|
||||||
public record GetHealthReport;
|
public record GetHealthReport;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,6 +87,64 @@ public class DatabaseGateway : IDatabaseGateway
|
|||||||
definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null);
|
definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// WP-9/10: Delivers a buffered CachedDbWrite during a store-and-forward retry
|
||||||
|
/// sweep — executes the SQL against the named connection. Returns true on
|
||||||
|
/// success, false if the connection no longer exists (the message is parked);
|
||||||
|
/// throws on any execution error so the engine retries.
|
||||||
|
/// </summary>
|
||||||
|
public async Task<bool> DeliverBufferedAsync(
|
||||||
|
StoreAndForwardMessage message, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var payload = JsonSerializer.Deserialize<CachedWritePayload>(message.PayloadJson);
|
||||||
|
if (payload == null || string.IsNullOrEmpty(payload.ConnectionName) || string.IsNullOrEmpty(payload.Sql))
|
||||||
|
{
|
||||||
|
_logger.LogError("Buffered CachedDbWrite message {Id} has an unreadable payload; parking.", message.Id);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var definition = await ResolveConnectionAsync(payload.ConnectionName, cancellationToken);
|
||||||
|
if (definition == null)
|
||||||
|
{
|
||||||
|
_logger.LogError(
|
||||||
|
"Buffered DB write to '{Connection}' cannot be delivered — the connection no longer exists; parking.",
|
||||||
|
payload.ConnectionName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
await using var connection = new SqlConnection(definition.ConnectionString);
|
||||||
|
await connection.OpenAsync(cancellationToken);
|
||||||
|
using var command = connection.CreateCommand();
|
||||||
|
command.CommandText = payload.Sql;
|
||||||
|
if (payload.Parameters != null)
|
||||||
|
{
|
||||||
|
foreach (var (key, value) in payload.Parameters)
|
||||||
|
{
|
||||||
|
var parameter = command.CreateParameter();
|
||||||
|
parameter.ParameterName = key.StartsWith('@') ? key : "@" + key;
|
||||||
|
parameter.Value = JsonElementToParameterValue(value);
|
||||||
|
command.Parameters.Add(parameter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await command.ExecuteNonQueryAsync(cancellationToken);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static object JsonElementToParameterValue(JsonElement element) => element.ValueKind switch
|
||||||
|
{
|
||||||
|
JsonValueKind.String => (object?)element.GetString() ?? DBNull.Value,
|
||||||
|
JsonValueKind.Number => element.TryGetInt64(out var l) ? l : element.GetDouble(),
|
||||||
|
JsonValueKind.True => true,
|
||||||
|
JsonValueKind.False => false,
|
||||||
|
JsonValueKind.Null or JsonValueKind.Undefined => DBNull.Value,
|
||||||
|
_ => element.GetRawText()
|
||||||
|
};
|
||||||
|
|
||||||
|
private sealed record CachedWritePayload(
|
||||||
|
string ConnectionName,
|
||||||
|
string Sql,
|
||||||
|
Dictionary<string, JsonElement>? Parameters);
|
||||||
|
|
||||||
private async Task<DatabaseConnectionDefinition?> ResolveConnectionAsync(
|
private async Task<DatabaseConnectionDefinition?> ResolveConnectionAsync(
|
||||||
string connectionName,
|
string connectionName,
|
||||||
CancellationToken cancellationToken)
|
CancellationToken cancellationToken)
|
||||||
|
|||||||
@@ -106,18 +106,67 @@ public class ExternalSystemClient : IExternalSystemClient
|
|||||||
Parameters = parameters
|
Parameters = parameters
|
||||||
});
|
});
|
||||||
|
|
||||||
var sfResult = await _storeAndForward.EnqueueAsync(
|
// attemptImmediateDelivery: false — this method already made the HTTP
|
||||||
|
// attempt above; letting EnqueueAsync re-invoke the handler would
|
||||||
|
// dispatch the same request a second time.
|
||||||
|
await _storeAndForward.EnqueueAsync(
|
||||||
StoreAndForwardCategory.ExternalSystem,
|
StoreAndForwardCategory.ExternalSystem,
|
||||||
systemName,
|
systemName,
|
||||||
payload,
|
payload,
|
||||||
originInstanceName,
|
originInstanceName,
|
||||||
system.MaxRetries > 0 ? system.MaxRetries : null,
|
system.MaxRetries > 0 ? system.MaxRetries : null,
|
||||||
system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null);
|
system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null,
|
||||||
|
attemptImmediateDelivery: false);
|
||||||
|
|
||||||
return new ExternalCallResult(true, null, null, WasBuffered: true);
|
return new ExternalCallResult(true, null, null, WasBuffered: true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// WP-7/10: Delivers a buffered ExternalSystem call during a store-and-forward
|
||||||
|
/// retry sweep. Returns true on success, false on permanent failure (the message
|
||||||
|
/// is parked); throws <see cref="TransientExternalSystemException"/> on a
|
||||||
|
/// transient failure so the engine retries.
|
||||||
|
/// </summary>
|
||||||
|
public async Task<bool> DeliverBufferedAsync(
|
||||||
|
StoreAndForwardMessage message, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var payload = JsonSerializer.Deserialize<CachedCallPayload>(message.PayloadJson);
|
||||||
|
if (payload == null || string.IsNullOrEmpty(payload.SystemName) || string.IsNullOrEmpty(payload.MethodName))
|
||||||
|
{
|
||||||
|
_logger.LogError("Buffered ExternalSystem message {Id} has an unreadable payload; parking.", message.Id);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var (system, method) = await ResolveSystemAndMethodAsync(
|
||||||
|
payload.SystemName, payload.MethodName, cancellationToken);
|
||||||
|
if (system == null || method == null)
|
||||||
|
{
|
||||||
|
_logger.LogError(
|
||||||
|
"Buffered call to '{System}'/'{Method}' cannot be delivered — the system or method no longer exists; parking.",
|
||||||
|
payload.SystemName, payload.MethodName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var parameters = payload.Parameters?.ToDictionary(kv => kv.Key, kv => (object?)kv.Value);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await InvokeHttpAsync(system, method, parameters, cancellationToken);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
catch (PermanentExternalSystemException ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Buffered call to '{System}' failed permanently; parking.", payload.SystemName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// TransientExternalSystemException propagates — the S&F engine retries.
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record CachedCallPayload(
|
||||||
|
string SystemName,
|
||||||
|
string MethodName,
|
||||||
|
Dictionary<string, JsonElement>? Parameters);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// WP-6: Executes the HTTP request against the external system.
|
/// WP-6: Executes the HTTP request against the external system.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -344,6 +344,42 @@ akka {{
|
|||||||
// any actor or HTTP handler touches the service.
|
// any actor or HTTP handler touches the service.
|
||||||
storeAndForwardService.StartAsync().GetAwaiter().GetResult();
|
storeAndForwardService.StartAsync().GetAwaiter().GetResult();
|
||||||
|
|
||||||
|
// Register the store-and-forward delivery handlers so buffered
|
||||||
|
// ExternalSystem calls, cached DB writes and notifications are actually
|
||||||
|
// delivered by the retry sweep. Without this, every buffered message is
|
||||||
|
// persisted but never delivered. Each handler resolves its scoped consumer
|
||||||
|
// service in a fresh DI scope — the sweep runs on a timer thread, outside
|
||||||
|
// any request scope.
|
||||||
|
storeAndForwardService.RegisterDeliveryHandler(
|
||||||
|
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem,
|
||||||
|
async msg =>
|
||||||
|
{
|
||||||
|
using var scope = _serviceProvider.CreateScope();
|
||||||
|
return await scope.ServiceProvider
|
||||||
|
.GetRequiredService<ScadaLink.ExternalSystemGateway.ExternalSystemClient>()
|
||||||
|
.DeliverBufferedAsync(msg);
|
||||||
|
});
|
||||||
|
storeAndForwardService.RegisterDeliveryHandler(
|
||||||
|
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
|
||||||
|
async msg =>
|
||||||
|
{
|
||||||
|
using var scope = _serviceProvider.CreateScope();
|
||||||
|
return await scope.ServiceProvider
|
||||||
|
.GetRequiredService<ScadaLink.ExternalSystemGateway.DatabaseGateway>()
|
||||||
|
.DeliverBufferedAsync(msg);
|
||||||
|
});
|
||||||
|
storeAndForwardService.RegisterDeliveryHandler(
|
||||||
|
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification,
|
||||||
|
async msg =>
|
||||||
|
{
|
||||||
|
using var scope = _serviceProvider.CreateScope();
|
||||||
|
return await scope.ServiceProvider
|
||||||
|
.GetRequiredService<ScadaLink.NotificationService.NotificationDeliveryService>()
|
||||||
|
.DeliverBufferedAsync(msg);
|
||||||
|
});
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Store-and-forward delivery handlers registered (ExternalSystem, CachedDbWrite, Notification)");
|
||||||
|
|
||||||
var parkedMessageHandler = _actorSystem.ActorOf(
|
var parkedMessageHandler = _actorSystem.ActorOf(
|
||||||
Props.Create(() => new ParkedMessageHandlerActor(
|
Props.Create(() => new ParkedMessageHandlerActor(
|
||||||
storeAndForwardService, _nodeOptions.SiteId!)),
|
storeAndForwardService, _nodeOptions.SiteId!)),
|
||||||
|
|||||||
@@ -93,18 +93,75 @@ public class NotificationDeliveryService : INotificationDeliveryService
|
|||||||
Message = message
|
Message = message
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// attemptImmediateDelivery: false — DeliverAsync was already attempted
|
||||||
|
// above; letting EnqueueAsync re-invoke the handler would send twice.
|
||||||
await _storeAndForward.EnqueueAsync(
|
await _storeAndForward.EnqueueAsync(
|
||||||
StoreAndForwardCategory.Notification,
|
StoreAndForwardCategory.Notification,
|
||||||
listName,
|
listName,
|
||||||
payload,
|
payload,
|
||||||
originInstanceName,
|
originInstanceName,
|
||||||
smtpConfig.MaxRetries > 0 ? smtpConfig.MaxRetries : null,
|
smtpConfig.MaxRetries > 0 ? smtpConfig.MaxRetries : null,
|
||||||
smtpConfig.RetryDelay > TimeSpan.Zero ? smtpConfig.RetryDelay : null);
|
smtpConfig.RetryDelay > TimeSpan.Zero ? smtpConfig.RetryDelay : null,
|
||||||
|
attemptImmediateDelivery: false);
|
||||||
|
|
||||||
return new NotificationResult(true, null, WasBuffered: true);
|
return new NotificationResult(true, null, WasBuffered: true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// WP-11/12: Delivers a buffered notification during a store-and-forward retry
|
||||||
|
/// sweep — re-resolves the list, recipients and SMTP config and re-attempts
|
||||||
|
/// delivery. Returns true on success, false on permanent failure (the message
|
||||||
|
/// is parked); throws on a transient failure so the engine retries.
|
||||||
|
/// </summary>
|
||||||
|
public async Task<bool> DeliverBufferedAsync(
|
||||||
|
StoreAndForwardMessage message, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var payload = JsonSerializer.Deserialize<BufferedNotification>(message.PayloadJson);
|
||||||
|
if (payload == null || string.IsNullOrEmpty(payload.ListName))
|
||||||
|
{
|
||||||
|
_logger.LogError("Buffered notification message {Id} has an unreadable payload; parking.", message.Id);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var list = await _repository.GetListByNameAsync(payload.ListName, cancellationToken);
|
||||||
|
if (list == null)
|
||||||
|
{
|
||||||
|
_logger.LogError(
|
||||||
|
"Buffered notification to list '{List}' cannot be delivered — the list no longer exists; parking.",
|
||||||
|
payload.ListName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var recipients = await _repository.GetRecipientsByListIdAsync(list.Id, cancellationToken);
|
||||||
|
if (recipients.Count == 0)
|
||||||
|
{
|
||||||
|
_logger.LogError("Buffered notification to list '{List}' has no recipients; parking.", payload.ListName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
var smtpConfig = (await _repository.GetAllSmtpConfigurationsAsync(cancellationToken)).FirstOrDefault();
|
||||||
|
if (smtpConfig == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("Buffered notification cannot be delivered — no SMTP configuration available; parking.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await DeliverAsync(smtpConfig, recipients, payload.Subject, payload.Message, cancellationToken);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
catch (SmtpPermanentException ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Buffered notification to list '{List}' failed permanently; parking.", payload.ListName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Transient SMTP errors propagate out of DeliverAsync — the S&F engine retries.
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record BufferedNotification(string ListName, string Subject, string Message);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Delivers an email via SMTP. Throws on failure.
|
/// Delivers an email via SMTP. Throws on failure.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -22,7 +22,8 @@ public static class ServiceCollectionExtensions
|
|||||||
var storage = sp.GetRequiredService<StoreAndForwardStorage>();
|
var storage = sp.GetRequiredService<StoreAndForwardStorage>();
|
||||||
var options = sp.GetRequiredService<IOptions<StoreAndForwardOptions>>().Value;
|
var options = sp.GetRequiredService<IOptions<StoreAndForwardOptions>>().Value;
|
||||||
var logger = sp.GetRequiredService<ILogger<StoreAndForwardService>>();
|
var logger = sp.GetRequiredService<ILogger<StoreAndForwardService>>();
|
||||||
return new StoreAndForwardService(storage, options, logger);
|
var replication = sp.GetRequiredService<ReplicationService>();
|
||||||
|
return new StoreAndForwardService(storage, options, logger, replication);
|
||||||
});
|
});
|
||||||
|
|
||||||
services.AddSingleton<ReplicationService>(sp =>
|
services.AddSingleton<ReplicationService>(sp =>
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ public class StoreAndForwardService
|
|||||||
{
|
{
|
||||||
private readonly StoreAndForwardStorage _storage;
|
private readonly StoreAndForwardStorage _storage;
|
||||||
private readonly StoreAndForwardOptions _options;
|
private readonly StoreAndForwardOptions _options;
|
||||||
|
private readonly ReplicationService? _replication;
|
||||||
private readonly ILogger<StoreAndForwardService> _logger;
|
private readonly ILogger<StoreAndForwardService> _logger;
|
||||||
private Timer? _retryTimer;
|
private Timer? _retryTimer;
|
||||||
private int _retryInProgress;
|
private int _retryInProgress;
|
||||||
@@ -48,11 +49,13 @@ public class StoreAndForwardService
|
|||||||
public StoreAndForwardService(
|
public StoreAndForwardService(
|
||||||
StoreAndForwardStorage storage,
|
StoreAndForwardStorage storage,
|
||||||
StoreAndForwardOptions options,
|
StoreAndForwardOptions options,
|
||||||
ILogger<StoreAndForwardService> logger)
|
ILogger<StoreAndForwardService> logger,
|
||||||
|
ReplicationService? replication = null)
|
||||||
{
|
{
|
||||||
_storage = storage;
|
_storage = storage;
|
||||||
_options = options;
|
_options = options;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
_replication = replication;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -109,7 +112,8 @@ public class StoreAndForwardService
|
|||||||
string payloadJson,
|
string payloadJson,
|
||||||
string? originInstanceName = null,
|
string? originInstanceName = null,
|
||||||
int? maxRetries = null,
|
int? maxRetries = null,
|
||||||
TimeSpan? retryInterval = null)
|
TimeSpan? retryInterval = null,
|
||||||
|
bool attemptImmediateDelivery = true)
|
||||||
{
|
{
|
||||||
var message = new StoreAndForwardMessage
|
var message = new StoreAndForwardMessage
|
||||||
{
|
{
|
||||||
@@ -125,8 +129,10 @@ public class StoreAndForwardService
|
|||||||
OriginInstanceName = originInstanceName
|
OriginInstanceName = originInstanceName
|
||||||
};
|
};
|
||||||
|
|
||||||
// Attempt immediate delivery
|
// Attempt immediate delivery — unless the caller has already made a
|
||||||
if (_deliveryHandlers.TryGetValue(category, out var handler))
|
// delivery attempt of its own (attemptImmediateDelivery: false). In that
|
||||||
|
// case re-invoking the handler here would dispatch the request twice.
|
||||||
|
if (attemptImmediateDelivery && _deliveryHandlers.TryGetValue(category, out var handler))
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -136,12 +142,10 @@ public class StoreAndForwardService
|
|||||||
RaiseActivity("Delivered", category, $"Immediate delivery to {target}");
|
RaiseActivity("Delivered", category, $"Immediate delivery to {target}");
|
||||||
return new StoreAndForwardResult(true, message.Id, false);
|
return new StoreAndForwardResult(true, message.Id, false);
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
// Permanent failure — do not buffer
|
// Permanent failure — do not buffer
|
||||||
return new StoreAndForwardResult(false, message.Id, false);
|
return new StoreAndForwardResult(false, message.Id, false);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// Transient failure — buffer for retry
|
// Transient failure — buffer for retry
|
||||||
@@ -152,19 +156,39 @@ public class StoreAndForwardService
|
|||||||
message.LastAttemptAt = DateTimeOffset.UtcNow;
|
message.LastAttemptAt = DateTimeOffset.UtcNow;
|
||||||
message.RetryCount = 1;
|
message.RetryCount = 1;
|
||||||
message.LastError = ex.Message;
|
message.LastError = ex.Message;
|
||||||
await _storage.EnqueueAsync(message);
|
await BufferAsync(message);
|
||||||
|
|
||||||
RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})");
|
RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})");
|
||||||
return new StoreAndForwardResult(true, message.Id, true);
|
return new StoreAndForwardResult(true, message.Id, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// No handler registered — buffer for later
|
// Either no handler is registered yet, or the caller already attempted
|
||||||
await _storage.EnqueueAsync(message);
|
// delivery itself — buffer for the background retry sweep to deliver.
|
||||||
RaiseActivity("Queued", category, $"No handler registered, buffered: {target}");
|
if (!attemptImmediateDelivery)
|
||||||
|
{
|
||||||
|
// The caller made (and failed) one attempt before handing the
|
||||||
|
// message over, so it counts as the first retry.
|
||||||
|
message.RetryCount = 1;
|
||||||
|
message.LastAttemptAt = DateTimeOffset.UtcNow;
|
||||||
|
}
|
||||||
|
await BufferAsync(message);
|
||||||
|
RaiseActivity("Queued", category, attemptImmediateDelivery
|
||||||
|
? $"No handler registered, buffered: {target}"
|
||||||
|
: $"Buffered for retry: {target}");
|
||||||
return new StoreAndForwardResult(true, message.Id, true);
|
return new StoreAndForwardResult(true, message.Id, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Persists a message to the local SQLite buffer and (WP-11) replicates the
|
||||||
|
/// add to the standby node so a failover does not lose the buffered message.
|
||||||
|
/// </summary>
|
||||||
|
private async Task BufferAsync(StoreAndForwardMessage message)
|
||||||
|
{
|
||||||
|
await _storage.EnqueueAsync(message);
|
||||||
|
_replication?.ReplicateEnqueue(message);
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// WP-10: Background retry sweep. Processes all pending messages that are due for retry.
|
/// WP-10: Background retry sweep. Processes all pending messages that are due for retry.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -210,6 +234,7 @@ public class StoreAndForwardService
|
|||||||
if (success)
|
if (success)
|
||||||
{
|
{
|
||||||
await _storage.RemoveMessageAsync(message.Id);
|
await _storage.RemoveMessageAsync(message.Id);
|
||||||
|
_replication?.ReplicateRemove(message.Id);
|
||||||
RaiseActivity("Delivered", message.Category,
|
RaiseActivity("Delivered", message.Category,
|
||||||
$"Delivered to {message.Target} after {message.RetryCount} retries");
|
$"Delivered to {message.Target} after {message.RetryCount} retries");
|
||||||
return;
|
return;
|
||||||
@@ -220,6 +245,7 @@ public class StoreAndForwardService
|
|||||||
message.LastAttemptAt = DateTimeOffset.UtcNow;
|
message.LastAttemptAt = DateTimeOffset.UtcNow;
|
||||||
message.LastError = "Permanent failure (handler returned false)";
|
message.LastError = "Permanent failure (handler returned false)";
|
||||||
await _storage.UpdateMessageAsync(message);
|
await _storage.UpdateMessageAsync(message);
|
||||||
|
_replication?.ReplicatePark(message);
|
||||||
RaiseActivity("Parked", message.Category,
|
RaiseActivity("Parked", message.Category,
|
||||||
$"Permanent failure for {message.Target}: handler returned false");
|
$"Permanent failure for {message.Target}: handler returned false");
|
||||||
}
|
}
|
||||||
@@ -234,6 +260,7 @@ public class StoreAndForwardService
|
|||||||
{
|
{
|
||||||
message.Status = StoreAndForwardMessageStatus.Parked;
|
message.Status = StoreAndForwardMessageStatus.Parked;
|
||||||
await _storage.UpdateMessageAsync(message);
|
await _storage.UpdateMessageAsync(message);
|
||||||
|
_replication?.ReplicatePark(message);
|
||||||
RaiseActivity("Parked", message.Category,
|
RaiseActivity("Parked", message.Category,
|
||||||
$"Max retries ({message.MaxRetries}) reached for {message.Target}");
|
$"Max retries ({message.MaxRetries}) reached for {message.Target}");
|
||||||
_logger.LogWarning(
|
_logger.LogWarning(
|
||||||
|
|||||||
@@ -405,4 +405,64 @@ public class ScriptAnalysisServiceTests
|
|||||||
Assert.Contains("name", resp.Markdown);
|
Assert.Contains("name", resp.Markdown);
|
||||||
Assert.Contains("String", resp.Markdown);
|
Assert.Contains("String", resp.Markdown);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── CentralUI-001: trust-model gate before sandbox execution ──────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Diagnose_FullyQualifiedForbiddenCall_RaisesSCADA002()
|
||||||
|
{
|
||||||
|
// A forbidden API reached by fully-qualified name (no `using`, no bare
|
||||||
|
// type identifier) must still be flagged — the pre-fix semantic check
|
||||||
|
// only inspected the leftmost identifier and missed this shape.
|
||||||
|
var resp = _svc.Diagnose(new DiagnoseRequest(
|
||||||
|
"var d = System.IO.Directory.GetCurrentDirectory(); return d;"));
|
||||||
|
Assert.Contains(resp.Markers, m => m.Code == "SCADA002");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task RunInSandbox_FullyQualifiedForbiddenApi_IsBlockedBeforeExecution()
|
||||||
|
{
|
||||||
|
// Regression test for CentralUI-001. RunInSandboxAsync used to execute any
|
||||||
|
// script that compiled, with no trust-model enforcement — so fully-qualified
|
||||||
|
// forbidden API code ran in the central host process. The fix gates execution
|
||||||
|
// on the forbidden-API analysis.
|
||||||
|
var result = await _svc.RunInSandboxAsync(
|
||||||
|
new SandboxRunRequest(
|
||||||
|
"var d = System.IO.Directory.GetCurrentDirectory(); return d;",
|
||||||
|
Parameters: null,
|
||||||
|
TimeoutSeconds: null),
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.False(result.Success);
|
||||||
|
Assert.Equal(SandboxErrorKind.CompileError, result.ErrorKind);
|
||||||
|
Assert.Contains("trust model", result.Error);
|
||||||
|
Assert.NotNull(result.Markers);
|
||||||
|
Assert.Contains(result.Markers!, m => m.Code is "SCADA001" or "SCADA002");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task RunInSandbox_ForbiddenUsingDirective_IsBlockedBeforeExecution()
|
||||||
|
{
|
||||||
|
var result = await _svc.RunInSandboxAsync(
|
||||||
|
new SandboxRunRequest(
|
||||||
|
"using System.Diagnostics; var p = Process.GetCurrentProcess().Id; return p;",
|
||||||
|
Parameters: null,
|
||||||
|
TimeoutSeconds: null),
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.False(result.Success);
|
||||||
|
Assert.Equal(SandboxErrorKind.CompileError, result.ErrorKind);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task RunInSandbox_CleanScript_StillRuns()
|
||||||
|
{
|
||||||
|
// The gate must not block a script that stays within the allowed surface.
|
||||||
|
var result = await _svc.RunInSandboxAsync(
|
||||||
|
new SandboxRunRequest("return 21 * 2;", Parameters: null, TimeoutSeconds: null),
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.True(result.Success);
|
||||||
|
Assert.Equal("42", result.ReturnValueJson);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,77 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using NSubstitute;
|
||||||
|
using ScadaLink.Commons.Entities.Instances;
|
||||||
|
using ScadaLink.Commons.Entities.Sites;
|
||||||
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
|
using ScadaLink.Communication;
|
||||||
|
using ScadaLink.Communication.Actors;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
namespace ScadaLink.Communication.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Tests for DebugStreamService session lifecycle.
|
||||||
|
/// </summary>
|
||||||
|
public class DebugStreamServiceTests : TestKit
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task StartStreamAsync_StreamTerminatesBeforeSnapshot_ThrowsMeaningfulException()
|
||||||
|
{
|
||||||
|
// Regression test for Communication-001. When the debug stream terminates before
|
||||||
|
// the initial snapshot arrives, StartStreamAsync used to let the raw
|
||||||
|
// InvalidOperationException from onTerminatedWrapper escape its
|
||||||
|
// OperationCanceledException-only catch — the caller saw an untranslated exception
|
||||||
|
// and the failure path did not deterministically tear the bridge actor down.
|
||||||
|
// The fix catches any failure, tells the bridge actor StopDebugStream, and throws
|
||||||
|
// a descriptive exception that names the instance and wraps the underlying cause.
|
||||||
|
var instance = new Instance("Site1.Pump01") { Id = 7, SiteId = 3 };
|
||||||
|
var site = new Site("Site One", "site-1")
|
||||||
|
{
|
||||||
|
Id = 3,
|
||||||
|
GrpcNodeAAddress = "http://localhost:5100",
|
||||||
|
GrpcNodeBAddress = "http://localhost:5200"
|
||||||
|
};
|
||||||
|
|
||||||
|
var instanceRepo = Substitute.For<ITemplateEngineRepository>();
|
||||||
|
instanceRepo.GetInstanceByIdAsync(7, Arg.Any<CancellationToken>()).Returns(instance);
|
||||||
|
var siteRepo = Substitute.For<ISiteRepository>();
|
||||||
|
siteRepo.GetSiteByIdAsync(3, Arg.Any<CancellationToken>()).Returns(site);
|
||||||
|
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
services.AddScoped(_ => instanceRepo);
|
||||||
|
services.AddScoped(_ => siteRepo);
|
||||||
|
using var provider = services.BuildServiceProvider();
|
||||||
|
|
||||||
|
var commProbe = CreateTestProbe();
|
||||||
|
var commService = new CommunicationService(
|
||||||
|
Options.Create(new CommunicationOptions()),
|
||||||
|
NullLogger<CommunicationService>.Instance);
|
||||||
|
commService.SetCommunicationActor(commProbe.Ref);
|
||||||
|
|
||||||
|
using var grpcFactory = new SiteStreamGrpcClientFactory(NullLoggerFactory.Instance);
|
||||||
|
var service = new DebugStreamService(
|
||||||
|
commService, provider, grpcFactory, NullLogger<DebugStreamService>.Instance);
|
||||||
|
service.SetActorSystem(Sys);
|
||||||
|
|
||||||
|
// Act — start the stream; it blocks awaiting the initial snapshot.
|
||||||
|
var startTask = service.StartStreamAsync(instanceId: 7, onEvent: _ => { }, onTerminated: () => { });
|
||||||
|
|
||||||
|
// The bridge actor's PreStart sends SubscribeDebugViewRequest to the comm actor;
|
||||||
|
// the envelope's sender is the bridge actor itself.
|
||||||
|
commProbe.ExpectMsg<SiteEnvelope>(TimeSpan.FromSeconds(5));
|
||||||
|
var bridgeActor = commProbe.LastSender;
|
||||||
|
|
||||||
|
// Simulate the site terminating the stream before any snapshot is delivered.
|
||||||
|
bridgeActor.Tell(new DebugStreamTerminated("site-1", "corr"));
|
||||||
|
|
||||||
|
// Assert — a descriptive exception that names the instance and wraps the cause,
|
||||||
|
// not the raw "terminated before snapshot received" InvalidOperationException.
|
||||||
|
var ex = await Assert.ThrowsAsync<InvalidOperationException>(() => startTask);
|
||||||
|
Assert.Contains("Site1.Pump01", ex.Message);
|
||||||
|
Assert.NotNull(ex.InnerException);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -458,4 +458,87 @@ public class DataConnectionActorTests : TestKit
|
|||||||
await backupAdapter.Received().SubscribeAsync(
|
await backupAdapter.Received().SubscribeAsync(
|
||||||
"sensor/temp", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>());
|
"sensor/temp", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── DataConnectionLayer-001: subscribe must not mutate actor state off-thread ──
|
||||||
|
|
||||||
|
private static async Task<string> DelayedSubscribeAsync()
|
||||||
|
{
|
||||||
|
// A short delay so concurrent subscribe background tasks pile up and their
|
||||||
|
// post-await state mutations would race under the pre-fix implementation.
|
||||||
|
await Task.Delay(1);
|
||||||
|
return "sub-" + Guid.NewGuid().ToString("N");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters()
|
||||||
|
{
|
||||||
|
// Regression test for DataConnectionLayer-001. HandleSubscribe used to mutate
|
||||||
|
// actor state (_subscriptionIds, _totalSubscribed, _resolvedTags, the per-instance
|
||||||
|
// HashSet) from a Task.Run background thread. Many concurrent subscribes then race
|
||||||
|
// on non-thread-safe Dictionary/HashSet and on non-atomic int++ — losing increments
|
||||||
|
// or throwing. After the fix every mutation is applied on the actor thread via a
|
||||||
|
// SubscribeCompleted message, so the final counts are exact.
|
||||||
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Task.CompletedTask);
|
||||||
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
||||||
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(_ => DelayedSubscribeAsync());
|
||||||
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new ReadResult(false, null, null));
|
||||||
|
|
||||||
|
var actor = CreateConnectionActor("dcl001-concurrent");
|
||||||
|
await Task.Delay(300); // reach Connected state
|
||||||
|
|
||||||
|
const int instances = 30;
|
||||||
|
const int tagsPerInstance = 30;
|
||||||
|
for (var i = 0; i < instances; i++)
|
||||||
|
{
|
||||||
|
var tags = Enumerable.Range(0, tagsPerInstance)
|
||||||
|
.Select(j => $"inst{i}/tag{j}")
|
||||||
|
.ToArray();
|
||||||
|
actor.Tell(new SubscribeTagsRequest(
|
||||||
|
$"corr{i}", $"inst{i}", "dcl001-concurrent", tags, DateTimeOffset.UtcNow));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Every subscribe must be acknowledged.
|
||||||
|
for (var i = 0; i < instances; i++)
|
||||||
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(15));
|
||||||
|
|
||||||
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
||||||
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
// Every tag is distinct, so each is a fresh, resolved subscription.
|
||||||
|
Assert.Equal(instances * tagsPerInstance, report.TotalSubscribedTags);
|
||||||
|
Assert.Equal(instances * tagsPerInstance, report.ResolvedTags);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately()
|
||||||
|
{
|
||||||
|
// Behavioural guard: the restructured subscribe must preserve the original
|
||||||
|
// accounting — failed tags count toward TotalSubscribed but not ResolvedTags.
|
||||||
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Task.CompletedTask);
|
||||||
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
||||||
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(ci => ((string)ci[0]).StartsWith("bad")
|
||||||
|
? Task.FromException<string>(new Exception("tag not found"))
|
||||||
|
: Task.FromResult("sub-ok"));
|
||||||
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new ReadResult(false, null, null));
|
||||||
|
|
||||||
|
var actor = CreateConnectionActor("dcl001-failed-tags");
|
||||||
|
await Task.Delay(300);
|
||||||
|
|
||||||
|
actor.Tell(new SubscribeTagsRequest(
|
||||||
|
"c1", "inst1", "dcl001-failed-tags",
|
||||||
|
["good/a", "good/b", "good/c", "bad/x", "bad/y"], DateTimeOffset.UtcNow));
|
||||||
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
||||||
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
Assert.Equal(5, report.TotalSubscribedTags); // all 5 tags tracked
|
||||||
|
Assert.Equal(3, report.ResolvedTags); // only the 3 good ones resolved
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -53,4 +53,26 @@ public class DatabaseGatewayTests
|
|||||||
await Assert.ThrowsAsync<InvalidOperationException>(
|
await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
() => gateway.CachedWriteAsync("nonexistent", "INSERT INTO t VALUES (1)"));
|
() => gateway.CachedWriteAsync("nonexistent", "INSERT INTO t VALUES (1)"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── ExternalSystemGateway-001: buffered CachedDbWrite delivery handler ──
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeliverBuffered_ConnectionNoLongerExists_ReturnsFalseSoMessageParks()
|
||||||
|
{
|
||||||
|
_repository.GetAllDatabaseConnectionsAsync().Returns(new List<DatabaseConnectionDefinition>());
|
||||||
|
var gateway = new DatabaseGateway(_repository, NullLogger<DatabaseGateway>.Instance);
|
||||||
|
|
||||||
|
var message = new ScadaLink.StoreAndForward.StoreAndForwardMessage
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString("N"),
|
||||||
|
Category = ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite,
|
||||||
|
Target = "gone-db",
|
||||||
|
PayloadJson =
|
||||||
|
"""{"ConnectionName":"gone-db","Sql":"INSERT INTO t VALUES (1)","Parameters":null}""",
|
||||||
|
};
|
||||||
|
|
||||||
|
var delivered = await gateway.DeliverBufferedAsync(message);
|
||||||
|
|
||||||
|
Assert.False(delivered); // permanent — the S&F engine parks the message
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -153,6 +153,69 @@ public class ExternalSystemClientTests
|
|||||||
Assert.False(result.WasBuffered);
|
Assert.False(result.WasBuffered);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── ExternalSystemGateway-001: buffered-call delivery handler ──
|
||||||
|
|
||||||
|
private static ScadaLink.StoreAndForward.StoreAndForwardMessage BufferedCall(
|
||||||
|
string systemName, string methodName) =>
|
||||||
|
new()
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString("N"),
|
||||||
|
Category = ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem,
|
||||||
|
Target = systemName,
|
||||||
|
PayloadJson =
|
||||||
|
$$"""{"SystemName":"{{systemName}}","MethodName":"{{methodName}}","Parameters":null}""",
|
||||||
|
};
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeliverBuffered_SuccessfulHttp_ReturnsTrue()
|
||||||
|
{
|
||||||
|
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
|
||||||
|
var method = new ExternalSystemMethod("getData", "GET", "/data") { Id = 1, ExternalSystemDefinitionId = 1 };
|
||||||
|
_repository.GetAllExternalSystemsAsync().Returns(new List<ExternalSystemDefinition> { system });
|
||||||
|
_repository.GetMethodsByExternalSystemIdAsync(1).Returns(new List<ExternalSystemMethod> { method });
|
||||||
|
|
||||||
|
var httpClient = new HttpClient(new MockHttpMessageHandler(HttpStatusCode.OK, "{\"ok\":true}"));
|
||||||
|
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
|
||||||
|
|
||||||
|
var client = new ExternalSystemClient(
|
||||||
|
_httpClientFactory, _repository, NullLogger<ExternalSystemClient>.Instance);
|
||||||
|
|
||||||
|
var delivered = await client.DeliverBufferedAsync(BufferedCall("TestAPI", "getData"));
|
||||||
|
|
||||||
|
Assert.True(delivered);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeliverBuffered_SystemNoLongerExists_ReturnsFalseSoMessageParks()
|
||||||
|
{
|
||||||
|
_repository.GetAllExternalSystemsAsync().Returns(new List<ExternalSystemDefinition>());
|
||||||
|
|
||||||
|
var client = new ExternalSystemClient(
|
||||||
|
_httpClientFactory, _repository, NullLogger<ExternalSystemClient>.Instance);
|
||||||
|
|
||||||
|
var delivered = await client.DeliverBufferedAsync(BufferedCall("GoneAPI", "method"));
|
||||||
|
|
||||||
|
Assert.False(delivered); // permanent — the S&F engine parks the message
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeliverBuffered_Transient500_ThrowsSoEngineRetries()
|
||||||
|
{
|
||||||
|
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
|
||||||
|
var method = new ExternalSystemMethod("failMethod", "POST", "/fail") { Id = 1, ExternalSystemDefinitionId = 1 };
|
||||||
|
_repository.GetAllExternalSystemsAsync().Returns(new List<ExternalSystemDefinition> { system });
|
||||||
|
_repository.GetMethodsByExternalSystemIdAsync(1).Returns(new List<ExternalSystemMethod> { method });
|
||||||
|
|
||||||
|
var httpClient = new HttpClient(new MockHttpMessageHandler(HttpStatusCode.InternalServerError, "boom"));
|
||||||
|
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
|
||||||
|
|
||||||
|
var client = new ExternalSystemClient(
|
||||||
|
_httpClientFactory, _repository, NullLogger<ExternalSystemClient>.Instance);
|
||||||
|
|
||||||
|
await Assert.ThrowsAsync<TransientExternalSystemException>(
|
||||||
|
() => client.DeliverBufferedAsync(BufferedCall("TestAPI", "failMethod")));
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Test helper: mock HTTP message handler.
|
/// Test helper: mock HTTP message handler.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -192,4 +192,37 @@ public class NotificationDeliveryServiceTests
|
|||||||
Assert.True(result.Success);
|
Assert.True(result.Success);
|
||||||
Assert.True(result.WasBuffered);
|
Assert.True(result.WasBuffered);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── NotificationService-001: buffered-notification delivery handler ──
|
||||||
|
|
||||||
|
private static StoreAndForward.StoreAndForwardMessage BufferedNotification(string listName) =>
|
||||||
|
new()
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid().ToString("N"),
|
||||||
|
Category = ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification,
|
||||||
|
Target = listName,
|
||||||
|
PayloadJson = $$"""{"ListName":"{{listName}}","Subject":"Alert","Message":"Body"}""",
|
||||||
|
};
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeliverBuffered_HappyPath_ReturnsTrue()
|
||||||
|
{
|
||||||
|
SetupHappyPath();
|
||||||
|
var service = CreateService();
|
||||||
|
|
||||||
|
var delivered = await service.DeliverBufferedAsync(BufferedNotification("ops-team"));
|
||||||
|
|
||||||
|
Assert.True(delivered);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeliverBuffered_ListNoLongerExists_ReturnsFalseSoMessageParks()
|
||||||
|
{
|
||||||
|
_repository.GetListByNameAsync("gone-list").Returns((NotificationList?)null);
|
||||||
|
var service = CreateService();
|
||||||
|
|
||||||
|
var delivered = await service.DeliverBufferedAsync(BufferedNotification("gone-list"));
|
||||||
|
|
||||||
|
Assert.False(delivered); // permanent — the S&F engine parks the message
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,108 @@
|
|||||||
|
using Microsoft.Data.Sqlite;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
|
||||||
|
namespace ScadaLink.StoreAndForward.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// StoreAndForward-001: the active node must forward every buffer operation
|
||||||
|
/// (add / remove / park) to the standby via the ReplicationService, so a
|
||||||
|
/// failover does not lose the buffer.
|
||||||
|
/// </summary>
|
||||||
|
public class StoreAndForwardReplicationTests : IAsyncLifetime, IDisposable
|
||||||
|
{
|
||||||
|
private readonly SqliteConnection _keepAlive;
|
||||||
|
private readonly StoreAndForwardStorage _storage;
|
||||||
|
private readonly StoreAndForwardService _service;
|
||||||
|
private readonly List<ReplicationOperation> _replicated = new();
|
||||||
|
|
||||||
|
public StoreAndForwardReplicationTests()
|
||||||
|
{
|
||||||
|
var connStr = $"Data Source=ReplTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";
|
||||||
|
_keepAlive = new SqliteConnection(connStr);
|
||||||
|
_keepAlive.Open();
|
||||||
|
|
||||||
|
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
|
||||||
|
|
||||||
|
var options = new StoreAndForwardOptions
|
||||||
|
{
|
||||||
|
DefaultRetryInterval = TimeSpan.Zero,
|
||||||
|
DefaultMaxRetries = 1,
|
||||||
|
RetryTimerInterval = TimeSpan.FromMinutes(10),
|
||||||
|
ReplicationEnabled = true,
|
||||||
|
};
|
||||||
|
|
||||||
|
var replication = new ReplicationService(options, NullLogger<ReplicationService>.Instance);
|
||||||
|
replication.SetReplicationHandler(op =>
|
||||||
|
{
|
||||||
|
lock (_replicated) _replicated.Add(op);
|
||||||
|
return Task.CompletedTask;
|
||||||
|
});
|
||||||
|
|
||||||
|
_service = new StoreAndForwardService(
|
||||||
|
_storage, options, NullLogger<StoreAndForwardService>.Instance, replication);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task InitializeAsync() => await _storage.InitializeAsync();
|
||||||
|
public Task DisposeAsync() => Task.CompletedTask;
|
||||||
|
public void Dispose() => _keepAlive.Dispose();
|
||||||
|
|
||||||
|
/// <summary>Replication is fire-and-forget (Task.Run); poll until the expected ops arrive.</summary>
|
||||||
|
private async Task<List<ReplicationOperation>> WaitForReplicationAsync(int count)
|
||||||
|
{
|
||||||
|
for (var i = 0; i < 100; i++)
|
||||||
|
{
|
||||||
|
lock (_replicated)
|
||||||
|
if (_replicated.Count >= count) return _replicated.ToList();
|
||||||
|
await Task.Delay(20);
|
||||||
|
}
|
||||||
|
lock (_replicated) return _replicated.ToList();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task BufferingAMessage_ReplicatesAnAddOperation()
|
||||||
|
{
|
||||||
|
// No handler registered → message is buffered → an Add is replicated.
|
||||||
|
var result = await _service.EnqueueAsync(
|
||||||
|
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
|
||||||
|
Assert.True(result.WasBuffered);
|
||||||
|
|
||||||
|
var ops = await WaitForReplicationAsync(1);
|
||||||
|
Assert.Contains(ops, o =>
|
||||||
|
o.OperationType == ReplicationOperationType.Add && o.MessageId == result.MessageId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SuccessfulRetry_ReplicatesARemoveOperation()
|
||||||
|
{
|
||||||
|
var calls = 0;
|
||||||
|
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||||
|
_ => ++calls == 1
|
||||||
|
? throw new HttpRequestException("transient")
|
||||||
|
: Task.FromResult(true));
|
||||||
|
|
||||||
|
var result = await _service.EnqueueAsync(
|
||||||
|
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
|
||||||
|
await _service.RetryPendingMessagesAsync();
|
||||||
|
|
||||||
|
var ops = await WaitForReplicationAsync(2);
|
||||||
|
Assert.Contains(ops, o => o.OperationType == ReplicationOperationType.Add);
|
||||||
|
Assert.Contains(ops, o =>
|
||||||
|
o.OperationType == ReplicationOperationType.Remove && o.MessageId == result.MessageId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ParkedMessage_ReplicatesAParkOperation()
|
||||||
|
{
|
||||||
|
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||||
|
_ => throw new HttpRequestException("always fails"));
|
||||||
|
|
||||||
|
var result = await _service.EnqueueAsync(
|
||||||
|
StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
|
||||||
|
await _service.RetryPendingMessagesAsync();
|
||||||
|
|
||||||
|
var ops = await WaitForReplicationAsync(2);
|
||||||
|
Assert.Contains(ops, o =>
|
||||||
|
o.OperationType == ReplicationOperationType.Park && o.MessageId == result.MessageId);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -310,4 +310,28 @@ public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable
|
|||||||
Assert.Equal(100, msg!.MaxRetries);
|
Assert.Equal(100, msg!.MaxRetries);
|
||||||
Assert.Equal(60000, msg.RetryIntervalMs);
|
Assert.Equal(60000, msg.RetryIntervalMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── attemptImmediateDelivery: false — caller already attempted delivery ──
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task EnqueueAsync_AttemptImmediateDeliveryFalse_BuffersWithoutInvokingHandler()
|
||||||
|
{
|
||||||
|
// A caller that has already made its own delivery attempt passes
|
||||||
|
// attemptImmediateDelivery: false so the request is not dispatched twice.
|
||||||
|
var handlerCalls = 0;
|
||||||
|
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||||
|
_ => { Interlocked.Increment(ref handlerCalls); return Task.FromResult(true); });
|
||||||
|
|
||||||
|
var result = await _service.EnqueueAsync(
|
||||||
|
StoreAndForwardCategory.ExternalSystem, "api", """{}""",
|
||||||
|
attemptImmediateDelivery: false);
|
||||||
|
|
||||||
|
Assert.Equal(0, handlerCalls); // handler NOT invoked at enqueue time
|
||||||
|
Assert.True(result.WasBuffered);
|
||||||
|
|
||||||
|
var msg = await _storage.GetMessageByIdAsync(result.MessageId);
|
||||||
|
Assert.NotNull(msg);
|
||||||
|
Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status);
|
||||||
|
Assert.Equal(1, msg.RetryCount); // counts as the caller's first attempt
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user