6 Commits

25 changed files with 1070 additions and 132 deletions
+59 -9
@@ -8,7 +8,7 @@
 | Last reviewed | 2026-05-16 |
 | Reviewer | claude-agent |
 | Commit reviewed | `9c60592` |
-| Open findings | 12 |
+| Open findings | 8 |
 ## Summary
@@ -101,7 +101,7 @@ whose message references `DataConnectionLayer-001`.
 |--|--|
 | Severity | High |
 | Category | Akka.NET conventions |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs:131-141` |
 **Description**
@@ -127,7 +127,20 @@ after a crash and surface the lost-state condition rather than failing silently.
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16. The `DataConnectionManagerActor.SupervisorStrategy` was changed
+from `Directive.Restart` to `Directive.Resume` for `DataConnectionActor` failures.
+`Resume` keeps the existing actor instance and all its in-memory subscription state
+(`_subscriptionsByInstance`, `_subscriptionIds`, `_subscribers`, quality counters)
+intact across a transient handler exception, so the design doc's "transparent
+re-subscribe" guarantee (WP-10) is preserved. The actor is a long-lived stateful
+coordinator and its own Become/Stash reconnect state machine already recovers
+connection-level faults — it does not need a restart. This also aligns with the
+ScadaLink convention of `Resume` for coordinator actors. Regression test
+`DCL002_ConnectionActorCrash_PreservesSubscriptionState` crashes the connection actor
+via a synchronously-throwing write and asserts the subscription survives (health
+report still shows 1 subscribed/resolved tag); it fails against the pre-fix `Restart`
+code and passes after. Fixed by the commit whose message references
+`DataConnectionLayer-002` (commit `<pending>`).
 ### DataConnectionLayer-003 — `RealOpcUaClient` callback/monitored-item dictionaries mutated without synchronization
@@ -135,7 +148,7 @@ _Unresolved._
 |--|--|
 | Severity | High |
 | Category | Concurrency & thread safety |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs:16-17,130-131,153,163,173,183-184` |
 **Description**
@@ -162,7 +175,18 @@ threads still read `_callbacks` concurrently with `RemoveSubscriptionAsync` /
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16. `_monitoredItems` and `_callbacks` in `RealOpcUaClient` were
+changed from plain `Dictionary<,>` to `ConcurrentDictionary<,>`, and the two
+`Remove(key)` call sites switched to `TryRemove`. This makes the maps safe to read
+from the OPC Foundation SDK's publish threads (`MonitoredItem.Notification` reading
+`_callbacks`) concurrently with subscribe/disconnect mutations on other threads.
+`RealOpcUaClient` wraps concrete OPC Foundation SDK types (`ISession`,
+`Subscription`, `MonitoredItem`) and cannot be exercised without a live OPC UA
+server, so the regression is guarded structurally by
+`DCL003_SharedDictionaryFields_AreConcurrentCollections` (a reflection test asserting
+both fields are `ConcurrentDictionary<,>`); it fails against the pre-fix `Dictionary`
+code and passes after. Fixed by the commit whose message references
+`DataConnectionLayer-003` (commit `<pending>`).
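The DataConnectionLayer-003 change can be illustrated with a minimal sketch. `CallbackRegistry` and `StructuralGuard` are hypothetical stand-ins, not the real `RealOpcUaClient` or its test; the sketch only assumes that the shared map is a `ConcurrentDictionary<,>` and that the regression test inspects the field type by reflection.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reflection;

// Hypothetical stand-in for the client: only the shared callback map is modelled.
public sealed class CallbackRegistry
{
    private readonly ConcurrentDictionary<string, Action<object>> _callbacks =
        new ConcurrentDictionary<string, Action<object>>();

    public void Subscribe(string handle, Action<object> callback) => _callbacks[handle] = callback;

    // TryRemove replaces Dictionary.Remove and is safe while other threads read.
    public bool Unsubscribe(string handle) => _callbacks.TryRemove(handle, out _);

    // May run on a publish thread while Subscribe/Unsubscribe run elsewhere.
    public bool Dispatch(string handle, object value)
    {
        if (!_callbacks.TryGetValue(handle, out var callback)) return false;
        callback(value);
        return true;
    }
}

public static class StructuralGuard
{
    // Reflection check: is the named private instance field a ConcurrentDictionary<,>?
    public static bool IsConcurrentDictionaryField(Type owner, string fieldName)
    {
        var field = owner.GetField(fieldName, BindingFlags.Instance | BindingFlags.NonPublic);
        return field != null
            && field.FieldType.IsGenericType
            && field.FieldType.GetGenericTypeDefinition() == typeof(ConcurrentDictionary<,>);
    }
}
```

A structural guard like this cannot prove the absence of races; it only catches a revert of the field type, which appears to be the trade-off the resolution accepts because no live OPC UA server is available in tests.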
 ### DataConnectionLayer-004 — Subscribe-time tag-resolution failure leaves the connection healthy but never recovers correctly
@@ -170,7 +194,7 @@ _Unresolved._
 |--|--|
 | Severity | High |
 | Category | Error handling & resilience |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:495-503,529-537` |
 **Description**
@@ -197,7 +221,21 @@ Instance Actor so it reflects the documented behaviour.
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16. The subscribe background task now classifies each subscribe
+exception via the new `IsConnectionLevelFailure` helper (`InvalidOperationException`
+— thrown by `EnsureConnected()` — plus `SocketException`/`TimeoutException`/
+`IOException` count as connection-level; anything else is a genuine resolution
+failure). The classification is carried on `SubscribeTagResult.ConnectionLevelFailure`
+and applied on the actor thread in `HandleSubscribeCompleted`: connection-level
+failures no longer become unresolved tags and instead drive the reconnection state
+machine (`HandleSubscribeCompleted` returns a flag and the Connected-state handler
+calls `BecomeReconnecting`); genuine resolution failures still go to `_unresolvedTags`
+and the retry timer, and now also push a `TagValueUpdate` with `QualityCode.Bad` to
+the subscribing Instance Actor, matching the design doc's Tag Path Resolution step 2.
+Regression tests `DCL004_GenuineTagResolutionFailure_PushesBadQualityToSubscriber`
+and `DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry` both fail
+against the pre-fix code and pass after. Fixed by the commit whose message references
+`DataConnectionLayer-004` (commit `<pending>`).
 ### DataConnectionLayer-005 — `WriteTimeout` option is documented and configured but never applied
@@ -205,7 +243,7 @@ _Unresolved._
 |--|--|
 | Severity | High |
 | Category | Design-document adherence |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs:15`, `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:573-590` |
 **Description**
@@ -229,7 +267,19 @@ the initial-value seed and to `WriteBatchAndWaitAsync` paths if they are reachab
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16. `HandleWrite` now creates a `CancellationTokenSource(_options.WriteTimeout)`,
+passes its token to `_adapter.WriteAsync(...)`, and disposes the source in the
+continuation. A cancelled/timed-out write (`Task.IsCanceled` or a base
+`OperationCanceledException`) is translated into a failed `WriteTagResponse` with a
+`"Write timeout after Ns"` message, so a hung device write is bounded and the failure
+is returned synchronously to the calling script (WP-11) instead of blocking until the
+script's own Ask-timeout. (The `WriteBatchAndWaitAsync` adapter path already accepts
+an explicit `timeout`/`CancellationToken` and is not invoked by `HandleWrite`, so no
+change was needed there.) Regression test
+`DCL005_Write_ThatHangs_TimesOutAndReturnsFailureSynchronously` uses an adapter whose
+`WriteAsync` only completes when its token fires; it fails against the pre-fix
+unbounded code and passes after. Fixed by the commit whose message references
+`DataConnectionLayer-005` (commit `<pending>`).
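The bounded-write behaviour described in the DataConnectionLayer-005 resolution can be sketched roughly as follows. This is an illustration under assumptions: `BoundedWrite` and its tuple result are hypothetical, and it uses `async`/`await` where the actor reportedly uses a task continuation. The `writeAsync` delegate stands in for `_adapter.WriteAsync(...)` and is assumed to honour the token it receives.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedWrite
{
    // Runs a write under a timeout and converts cancellation into a failed result.
    public static async Task<(bool Success, string Error)> WriteWithTimeoutAsync(
        Func<CancellationToken, Task> writeAsync, TimeSpan writeTimeout)
    {
        // The source cancels its token automatically once writeTimeout elapses.
        using var cts = new CancellationTokenSource(writeTimeout);
        try
        {
            await writeAsync(cts.Token);
            return (true, string.Empty);
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException derives from OperationCanceledException,
            // so both cancellation shapes land here.
            return (false, $"Write timeout after {writeTimeout.TotalSeconds}s");
        }
    }
}
```

The timeout only bounds the write if the adapter actually observes the token; an adapter that ignores it would still hang, which is presumably why the regression test uses an adapter that completes only when its token fires.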
 ### DataConnectionLayer-006 — Health quality counters not reset/recomputed after failover or re-subscribe
+31 -6
@@ -8,7 +8,7 @@
 | Last reviewed | 2026-05-16 |
 | Reviewer | claude-agent |
 | Commit reviewed | `9c60592` |
-| Open findings | 14 |
+| Open findings | 12 |
 ## Summary
@@ -53,7 +53,7 @@ exercises a successful deployment or the lifecycle success paths.
 |--|--|
 | Severity | High |
 | Category | Error handling & resilience |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.DeploymentManager/DeploymentService.cs:141-199` |
 **Description**
@@ -81,7 +81,13 @@ every exit path out of the `try`.
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16 (commit `<pending>`): broadened the `catch` in
+`DeployInstanceAsync` to `catch (Exception ex)` so any exception (transport,
+serialization, DB, `InvalidOperationException` from an uninitialized
+`CommunicationService`) marks the deployment record `Failed` with the error
+message and audit-logs the failure, instead of escaping and leaving the record
+stuck in `InProgress`. Regression test:
+`DeployInstanceAsync_CommunicationThrowsUnexpectedException_RecordMarkedFailed`.
 ### DeploymentManager-002 — Failure-status write uses a possibly-cancelled cancellation token
@@ -89,7 +95,7 @@ _Unresolved._
 |--|--|
 | Severity | High |
 | Category | Error handling & resilience |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.DeploymentManager/DeploymentService.cs:186-196` |
 **Description**
@@ -113,7 +119,14 @@ cancelled or timed out.
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16 (commit `<pending>`): the broadened `catch` block now
+performs the failure-status write (`UpdateDeploymentRecordAsync`,
+`SaveChangesAsync`) and the audit `LogAsync` with `CancellationToken.None`
+instead of the operation's (possibly-cancelled) token, so the `Failed` status
+is durably recorded even after a timeout/cancellation. The cleanup writes are
+themselves wrapped in a `try`/`catch` that logs (without masking the original
+error) if persistence still fails. Regression test:
+`DeployInstanceAsync_FailureWrite_UsesNonCancellableToken`.
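The combined effect of the DeploymentManager-001 and DeploymentManager-002 fixes can be sketched as below. Every type here (`DeploymentRecord`, `DeploySketch`, the `deployAsync` and `saveAsync` delegates) is a hypothetical stand-in rather than the real `DeploymentService` API, and the sketch assumes the failure is recorded instead of rethrown.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public enum DeploymentStatus { InProgress, Success, Failed }

public sealed class DeploymentRecord
{
    public DeploymentStatus Status { get; set; } = DeploymentStatus.InProgress;
    public string Error { get; set; } = string.Empty;
}

public static class DeploySketch
{
    // deployAsync stands in for the site communication call,
    // saveAsync for the repository write of the deployment record.
    public static async Task DeployAsync(
        DeploymentRecord record,
        Func<CancellationToken, Task> deployAsync,
        Func<DeploymentRecord, CancellationToken, Task> saveAsync,
        CancellationToken ct)
    {
        try
        {
            await deployAsync(ct);
            record.Status = DeploymentStatus.Success;
            await saveAsync(record, ct);
        }
        catch (Exception ex) // broad on purpose: no exit path may leave InProgress behind
        {
            record.Status = DeploymentStatus.Failed;
            record.Error = ex.Message;
            try
            {
                // ct may already be cancelled, so the failure write must not use it.
                await saveAsync(record, CancellationToken.None);
            }
            catch (Exception cleanupEx)
            {
                // Log only; the original error stays on the record.
                Console.Error.WriteLine($"Could not persist failure status: {cleanupEx.Message}");
            }
        }
    }
}
```

Passing `CancellationToken.None` trades cancellability of the cleanup write for durability of the `Failed` status, which matches the intent stated in the resolution.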
 ### DeploymentManager-003 — Successful-deployment cleanup is not atomic with the status write
@@ -248,7 +261,19 @@ stale-rejection.
 **Resolution**
-_Unresolved._
+_Unresolved._ Finding confirmed valid against the source — `GetDeploymentStatusAsync`
+only reads the local `DeploymentRecord` via `GetDeploymentByDeploymentIdAsync`,
+and `DeployInstanceAsync` unconditionally generates a new deployment ID with no
+site reconciliation. Left Open: a proper fix is a cross-module new feature, not
+a bug fix scoped to `ScadaLink.DeploymentManager`. It requires (1) a new
+request/response message contract in `ScadaLink.Commons`, (2) a new
+`CommunicationService` query method in `ScadaLink.Communication`, and (3)
+site-side handling of the query — all outside the DeploymentManager module — plus
+a design decision on the query protocol. The reconciliation logic in
+`DeploymentService` cannot be implemented without those. Recommend tracking as a
+dedicated cross-module feature work item (or, alternatively, amending the design
+doc to delegate reconciliation entirely to site-side stale-rejection — also
+outside this module's editable scope).
 ### DeploymentManager-007 — "Diff View" reduced to a hash comparison with no diff detail
+33 -5
@@ -8,7 +8,7 @@
 | Last reviewed | 2026-05-16 |
 | Reviewer | claude-agent |
 | Commit reviewed | `9c60592` |
-| Open findings | 13 |
+| Open findings | 11 |
 ## Summary
@@ -109,7 +109,7 @@ transient-retry paths. Fixed by the commit whose message references
 |--|--|
 | Severity | High |
 | Category | Error handling & resilience |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:130`, `src/ScadaLink.ExternalSystemGateway/ServiceCollectionExtensions.cs:13` |
 **Description**
@@ -142,7 +142,24 @@ is classified as transient.
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16 (commit `<pending>`). `InvokeHttpAsync` now enforces a call
+timeout: `ExternalSystemClient` takes an `IOptions<ExternalSystemGatewayOptions>` and
+links a `CancellationTokenSource(DefaultHttpTimeout)` with the caller's token before
+`SendAsync` and the response-body read, so the design's "timeout applies to the HTTP
+request round-trip" guarantee now holds within the configured window (default 30s)
+instead of `HttpClient`'s default 100s. A timeout is reclassified as a
+`TransientExternalSystemException`; a caller-initiated cancellation is distinguished
+from a timeout and propagated as `OperationCanceledException` rather than being
+swallowed as transient. Regression tests:
+`Call_SlowSystem_TimesOutAsTransientErrorWithinConfiguredWindow` and
+`Call_CallerCancellation_IsNotMisreportedAsTimeout`.
+Note (partial scope): the per-*system* `Timeout` field on `ExternalSystemDefinition`
+remains unimplemented — adding it requires a change to `ScadaLink.Commons`, which is
+outside this module's edit scope. Until that entity field exists, the configured
+`DefaultHttpTimeout` is the effective per-call limit for every system. A follow-up
+against the Commons module should add the `Timeout` field and have `InvokeHttpAsync`
+prefer it over the default. This is a tracked follow-up, not a regression.
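The timeout-versus-cancellation distinction in the ExternalSystemGateway-002 resolution can be sketched with a linked token source. `TimedCall` and `TransientCallException` are hypothetical names for illustration; the real code reportedly throws `TransientExternalSystemException` and wraps `HttpClient.SendAsync`, neither of which is modelled here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the gateway's transient-error exception type.
public sealed class TransientCallException : Exception
{
    public TransientCallException(string message, Exception inner) : base(message, inner) { }
}

public static class TimedCall
{
    // sendAsync stands in for the HTTP send plus response-body read.
    public static async Task<T> InvokeAsync<T>(
        Func<CancellationToken, Task<T>> sendAsync, TimeSpan timeout, CancellationToken callerToken)
    {
        using var timeoutCts = new CancellationTokenSource(timeout);
        // The linked token fires when either the caller cancels or the timeout elapses.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(callerToken, timeoutCts.Token);
        try
        {
            return await sendAsync(linked.Token);
        }
        catch (OperationCanceledException ex) when (!callerToken.IsCancellationRequested)
        {
            // The caller did not cancel, so the cancellation is attributed to the timeout.
            throw new TransientCallException($"Call timed out after {timeout}", ex);
        }
        // A caller-initiated cancellation skips the filter and propagates unchanged.
    }
}
```

The exception filter attributes any cancellation not requested by the caller to the timeout, which is a simplification; code that needs to be stricter could also check `timeoutCts.IsCancellationRequested`.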
 ### ExternalSystemGateway-003 — `CachedCall` double-dispatches the HTTP request
@@ -150,7 +167,7 @@ _Unresolved._
 |--|--|
 | Severity | High |
 | Category | Correctness & logic bugs |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:84-117` |
 **Description**
@@ -179,7 +196,18 @@ the duplicated logic.
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16 (commit `<pending>`). Re-triage: this finding was already fixed in
+the codebase as a side effect of the `ExternalSystemGateway-001` fix and is no longer
+reproducible against the current source. `StoreAndForwardService.EnqueueAsync` gained an
+`attemptImmediateDelivery` parameter (recommendation approach (b)), and
+`CachedCallAsync` passes `attemptImmediateDelivery: false` after its own first HTTP
+attempt — so `EnqueueAsync` buffers the message for the background retry sweep without
+re-invoking the registered delivery handler, eliminating the duplicate dispatch. A
+dedicated regression test, `CachedCall_TransientFailure_DoesNotImmediatelyRedispatchViaRegisteredHandler`,
+was added in this module's test suite: it registers a counting delivery handler, drives
+a `CachedCall` whose HTTP attempt fails transiently, and asserts the handler is invoked
+zero times during enqueue. The test was verified to fail if `attemptImmediateDelivery`
+is flipped back to `true`.
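The role of the `attemptImmediateDelivery` flag can be shown with a small in-memory model. `BufferSketch` is a hypothetical simplification, not `StoreAndForwardService`: it assumes a single delivery handler that returns `true` on success, an unbounded queue, and a sweep that is invoked manually rather than by a timer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class BufferSketch
{
    private readonly Queue<string> _pending = new Queue<string>();
    private readonly Func<string, Task<bool>> _deliver;

    public BufferSketch(Func<string, Task<bool>> deliver) => _deliver = deliver;

    public int PendingCount => _pending.Count;

    // With attemptImmediateDelivery: false the handler is not invoked here,
    // so a caller that already tried once does not dispatch a second time.
    public async Task EnqueueAsync(string message, bool attemptImmediateDelivery = true)
    {
        if (attemptImmediateDelivery && await _deliver(message)) return;
        _pending.Enqueue(message);
    }

    // One pass of the background retry sweep over the messages pending at entry.
    public async Task SweepAsync()
    {
        for (var remaining = _pending.Count; remaining > 0; remaining--)
        {
            var message = _pending.Dequeue();
            if (!await _deliver(message)) _pending.Enqueue(message);
        }
    }
}
```

A counting handler makes the property easy to assert: zero invocations during enqueue, one during the next sweep.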
 ### ExternalSystemGateway-004 — System retry settings are not honoured for cached calls/writes
+30 -5
@@ -8,7 +8,7 @@
 | Last reviewed | 2026-05-16 |
 | Reviewer | claude-agent |
 | Commit reviewed | `9c60592` |
-| Open findings | 12 |
+| Open findings | 10 |
 ## Summary
@@ -55,7 +55,7 @@ design-adherence gap.
 |--|--|
 | Severity | High |
 | Category | Design-document adherence |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.HealthMonitoring/SiteHealthCollector.cs:104`, `src/ScadaLink.HealthMonitoring/HealthReportSender.cs:79` |
 **Description**
@@ -79,7 +79,17 @@ the dead setter. Update the placeholder test accordingly once implemented.
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16 (commit `<pending>`). `HealthReportSender.ExecuteAsync` now
+queries the existing public `StoreAndForwardStorage.GetBufferDepthByCategoryAsync()`
+API alongside the parked-count call and feeds the per-category depths into
+`SiteHealthCollector.SetStoreAndForwardDepths` (category enum names as keys), so the
+documented store-and-forward buffer depth metric is populated in every emitted
+report. Regression test `HealthReportSenderTests.ReportsIncludeStoreAndForwardBufferDepthsFromStorage`
+verifies populated per-category depths. The obsolete placeholder test
+`SiteHealthCollectorTests.StoreAndForwardBufferDepths_IsEmptyPlaceholder` continues
+to pass — it only exercises the collector with no setter call and still correctly
+asserts the empty default; it was left in place as the collector-level default-state
+test. No StoreAndForward source was modified (existing public API only).
 ### HealthMonitoring-002 — `SiteHealthState` mutable fields written from multiple threads without synchronization
@@ -87,7 +97,7 @@ _Unresolved._
 |--|--|
 | Severity | High |
 | Category | Concurrency & thread safety |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.HealthMonitoring/SiteHealthState.cs:11`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:86`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:137` |
 **Description**
@@ -112,7 +122,22 @@ a single atomic reference swap.
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16 (commit `<pending>`). `SiteHealthState` is now a `sealed record`
+with `init`-only properties. `CentralHealthAggregator.ProcessReport`,
+`MarkHeartbeat`, and `CheckForOfflineSites` were rewritten to perform every state
+transition as an atomic compare-and-swap (`TryAdd`/`TryUpdate`) producing a new
+record instance — no field of a stored state is ever mutated in place. `ProcessReport`
+uses an explicit CAS retry loop instead of the `AddOrUpdate` update delegate so the
+sequence-number guard and the field writes are evaluated against the value actually
+installed (this also closes the root cause behind HealthMonitoring-003). Reads via
+`GetAllSiteStates`/`GetSiteState` now hand out immutable snapshots, so a concurrent
+reader can never observe a torn or half-applied state. `LatestReport` was changed
+from `SiteHealthReport` (`null!`) to `SiteHealthReport?`, making the contract honest;
+all existing consumers (CentralUI, integration/perf tests) already null-checked it
+and continue to build clean. Regression test
+`CentralHealthAggregatorTests.ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState`
+exercises concurrent report/heartbeat/read threads and asserts snapshot consistency
+and no lost updates.
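The compare-and-swap retry loop described in the HealthMonitoring-002 resolution can be sketched as follows. `SiteState` and `Aggregator` are hypothetical reductions of `SiteHealthState` and `CentralHealthAggregator`; the sketch assumes a monotonically increasing sequence number is the only staleness guard.

```csharp
using System;
using System.Collections.Concurrent;

// Immutable snapshot: a transition always produces a new instance.
public sealed record SiteState(long LastSequence, string Payload);

public sealed class Aggregator
{
    private readonly ConcurrentDictionary<string, SiteState> _states =
        new ConcurrentDictionary<string, SiteState>();

    // Returns true if the report was installed, false if it was stale.
    public bool ProcessReport(string siteId, long sequence, string payload)
    {
        while (true)
        {
            if (!_states.TryGetValue(siteId, out var current))
            {
                if (_states.TryAdd(siteId, new SiteState(sequence, payload))) return true;
                continue; // another writer added the site first; re-read and retry
            }

            // The guard is evaluated against the value that TryUpdate will compare with.
            if (sequence <= current.LastSequence) return false;

            var next = current with { LastSequence = sequence, Payload = payload };
            // TryUpdate succeeds only if the stored value still equals `current`
            // (records compare by value), otherwise the loop re-reads and retries.
            if (_states.TryUpdate(siteId, next, current)) return true;
        }
    }

    public bool TryGetSiteState(string siteId, out SiteState state) =>
        _states.TryGetValue(siteId, out state);
}
```

Unlike an `AddOrUpdate` delegate, which may run more than once and whose result may be discarded, the loop only commits a state that was derived from the value currently stored.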
 ### HealthMonitoring-003 — Shared state mutated inside `ConcurrentDictionary.AddOrUpdate` update delegate
+14 -3
@@ -8,7 +8,7 @@
 | Last reviewed | 2026-05-16 |
 | Reviewer | claude-agent |
 | Commit reviewed | `9c60592` |
-| Open findings | 11 |
+| Open findings | 10 |
 ## Summary
@@ -54,7 +54,7 @@ no safe workaround.
 |--|--|
 | Severity | High |
 | Category | Correctness & logic bugs |
-| Status | Open |
+| Status | Resolved |
 | Location | `src/ScadaLink.Host/Program.cs:135-145` |
 **Description**
@@ -81,7 +81,18 @@ checks and filter by tag). Add a regression test asserting a non-leader node ret
 **Resolution**
-_Unresolved._
+Resolved 2026-05-16 (commit `<pending>`). Root cause confirmed against
+`Program.cs`: the `/health/ready` mapping had no `Predicate`, so it executed all
+three registered checks including the leader-only `active-node` check, while
+`ActiveNodeHealthCheck` returns `Unhealthy` on any non-leader node — making a fully
+operational standby central node permanently report `503`. Fix: added
+`Predicate = check => check.Name != "active-node"` to the `/health/ready`
+`HealthCheckOptions`, so readiness now reflects cluster membership + DB connectivity
+only (REQ-HOST-4a); leadership remains reported solely by `/health/active`.
+Regression test `HealthCheckTests.HealthReady_Endpoint_ExcludesActiveNodeCheck`
+asserts the `active-node` check name does not appear in the `/health/ready`
+response body; it failed before the fix and passes after. Full Host suite green
+(156 passed).
 ### Host-002 — Akka.Persistence required by REQ-HOST-6 is not configured and not used
+8 -19
@@ -40,10 +40,10 @@ module file and counted in **Total**.
 | Severity | Open findings |
 |----------|---------------|
 | Critical | 0 |
-| High | 39 |
+| High | 28 |
 | Medium | 100 |
 | Low | 89 |
-| **Total** | **228** |
+| **Total** | **217** |
 ## Module Status
@@ -55,11 +55,11 @@ module file and counted in **Total**.
 | [Commons](Commons/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | 12 |
 | [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 0/0/5/3 | 8 | 11 |
 | [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/6 | 10 | 11 |
-| [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/4/6/2 | 12 | 13 |
-| [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | 14 |
-| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | 2026-05-16 | `9c60592` | 0/2/7/4 | 13 | 14 |
-| [HealthMonitoring](HealthMonitoring/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/5 | 12 | 12 |
-| [Host](Host/findings.md) | 2026-05-16 | `9c60592` | 0/1/3/7 | 11 | 11 |
+| [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/0/6/2 | 8 | 13 |
+| [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/1/6/5 | 12 | 14 |
+| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | 2026-05-16 | `9c60592` | 0/0/7/4 | 11 | 14 |
+| [HealthMonitoring](HealthMonitoring/findings.md) | 2026-05-16 | `9c60592` | 0/0/5/5 | 10 | 12 |
+| [Host](Host/findings.md) | 2026-05-16 | `9c60592` | 0/0/3/7 | 10 | 11 |
 | [InboundAPI](InboundAPI/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
 | [ManagementService](ManagementService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
 | [NotificationService](NotificationService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/3 | 11 | 12 |
@@ -80,23 +80,12 @@ description, location, recommendation — lives in the module's `findings.md`.
 _None open._
-### High (39)
+### High (28)
 | ID | Module | Title |
 |----|--------|-------|
 | ClusterInfrastructure-001 | [ClusterInfrastructure](ClusterInfrastructure/findings.md) | Module implements none of its documented responsibilities |
-| DataConnectionLayer-002 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `Restart` supervision discards all subscription state on connection-actor crash |
-| DataConnectionLayer-003 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `RealOpcUaClient` callback/monitored-item dictionaries mutated without synchronization |
-| DataConnectionLayer-004 | [DataConnectionLayer](DataConnectionLayer/findings.md) | Subscribe-time tag-resolution failure leaves the connection healthy but never recovers correctly |
-| DataConnectionLayer-005 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `WriteTimeout` option is documented and configured but never applied |
-| DeploymentManager-001 | [DeploymentManager](DeploymentManager/findings.md) | Unexpected exceptions leave the deployment record stuck in `InProgress` |
-| DeploymentManager-002 | [DeploymentManager](DeploymentManager/findings.md) | Failure-status write uses a possibly-cancelled cancellation token |
 | DeploymentManager-006 | [DeploymentManager](DeploymentManager/findings.md) | Query-the-site-before-redeploy idempotency requirement not implemented |
-| ExternalSystemGateway-002 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | Per-system call timeout is never applied to HTTP requests |
-| ExternalSystemGateway-003 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | `CachedCall` double-dispatches the HTTP request |
-| HealthMonitoring-001 | [HealthMonitoring](HealthMonitoring/findings.md) | Store-and-forward buffer depth metric is never populated |
-| HealthMonitoring-002 | [HealthMonitoring](HealthMonitoring/findings.md) | `SiteHealthState` mutable fields written from multiple threads without synchronization |
-| Host-001 | [Host](Host/findings.md) | `/health/ready` includes the leader-only `active-node` check |
 | InboundAPI-001 | [InboundAPI](InboundAPI/findings.md) | Singleton script handler cache mutated without synchronization |
 | InboundAPI-003 | [InboundAPI](InboundAPI/findings.md) | API key compared with non-constant-time string equality |
 | InboundAPI-005 | [InboundAPI](InboundAPI/findings.md) | Compiled API scripts run with no script-trust-model enforcement |
@@ -213,7 +213,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
                 HandleSubscribe(req);
                 break;
             case SubscribeCompleted sc:
-                HandleSubscribeCompleted(sc);
+                // In Connected state, a connection-level subscribe failure must drive
+                // the reconnection state machine (DataConnectionLayer-004).
+                if (HandleSubscribeCompleted(sc))
+                {
+                    _log.Warning("[{0}] Connection-level subscribe failure — entering Reconnecting", _connectionName);
+                    BecomeReconnecting();
+                }
                 break;
             case UnsubscribeTagsRequest req:
                 HandleUnsubscribe(req);
@@ -514,8 +520,14 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
                 }
                 catch (Exception ex)
                 {
-                    // WP-12: Tag path resolution failure — reported back as unresolved.
-                    results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: false, null, ex.Message));
+                    // DataConnectionLayer-004: distinguish a connection-level fault
+                    // (adapter not connected / transport down) from a genuine
+                    // node-not-found. Connection-level faults must drive the
+                    // reconnection state machine, not be retried as unresolved tags.
+                    var connectionLevel = IsConnectionLevelFailure(ex);
+                    results.Add(new SubscribeTagResult(
+                        tagPath, AlreadySubscribed: false, Success: false, null, ex.Message,
+                        ConnectionLevelFailure: connectionLevel));
                 }
             }
@@ -546,8 +558,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
     /// Applies the result of an asynchronous subscribe on the actor thread. ALL mutation
     /// of subscription state and counters happens here — never on the background task —
     /// so the actor model's single-threaded state guarantee holds.
+    /// Returns <c>true</c> if any tag failed at connection level (DataConnectionLayer-004),
+    /// signalling the caller (only the Connected state) to enter Reconnecting.
     /// </summary>
-    private void HandleSubscribeCompleted(SubscribeCompleted msg)
+    private bool HandleSubscribeCompleted(SubscribeCompleted msg)
     {
         var instanceName = msg.Request.InstanceUniqueName;
         if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags))
@@ -557,6 +571,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
             _subscriptionsByInstance[instanceName] = instanceTags;
         }
+        // DataConnectionLayer-004: if any tag failed because the adapter is not
+        // connected (a connection-level fault), the subscribe needs the reconnection
+        // state machine, not the tag-resolution retry. Drive a disconnect and let the
+        // request be re-stashed/retried after reconnect via ReSubscribeAll.
+        var connectionLevelFailure = msg.Results.Any(r => !r.Success && r.ConnectionLevelFailure);
         foreach (var result in msg.Results)
         {
             instanceTags.Add(result.TagPath);
@@ -572,13 +592,31 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
                 _totalSubscribed++;
                 _resolvedTags++;
             }
+            else if (result.ConnectionLevelFailure)
+            {
+                // Connection-level fault — do not count as an unresolved tag.
+                // ReSubscribeAll after reconnect derives the tag from
+                // _subscriptionsByInstance (already updated above).
+                _log.Warning("[{0}] Subscribe for {1} failed at connection level: {2}",
+                    _connectionName, result.TagPath, result.Error);
+            }
             else
             {
-                // WP-12: mark unresolved so the periodic retry timer picks it up.
+                // WP-12: genuine tag resolution failure — mark unresolved so the
+                // periodic retry timer picks it up.
                 _unresolvedTags.Add(result.TagPath);
                 _totalSubscribed++;
                 _log.Debug("[{0}] Tag resolution failed for {1}: {2}",
                     _connectionName, result.TagPath, result.Error);
+                // DataConnectionLayer-004 / design doc Tag Path Resolution step 2:
+                // mark the attribute quality `bad` so the Instance Actor sees a
+                // signal rather than staying Uncertain indefinitely.
+                if (_subscribers.TryGetValue(instanceName, out var subscriber))
+                {
+                    subscriber.Tell(new TagValueUpdate(
+                        _connectionName, result.TagPath, null, QualityCode.Bad, DateTimeOffset.UtcNow));
+                }
             }
         }
@@ -594,6 +632,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
msg.ReplyTo.Tell(new SubscribeTagsResponse( msg.ReplyTo.Tell(new SubscribeTagsResponse(
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow)); msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
// The caller (Connected state only) decides whether to enter Reconnecting.
// In Connecting/Reconnecting the connection is not established anyway, so the
// existing reconnect cycle handles recovery without a re-trigger here.
return connectionLevelFailure;
}
/// <summary>
/// DataConnectionLayer-004: classifies a subscribe exception as a connection-level
/// fault (adapter not connected / transport down) versus a genuine tag-resolution
/// failure (the node does not exist on the device). Connection-level faults must
/// drive the reconnection state machine; resolution failures are retried on the
/// tag-resolution timer.
/// </summary>
private static bool IsConnectionLevelFailure(Exception ex)
{
var baseEx = ex is AggregateException agg ? agg.GetBaseException() : ex;
return baseEx is InvalidOperationException
or System.Net.Sockets.SocketException
or TimeoutException
or System.IO.IOException;
} }
private void HandleUnsubscribe(UnsubscribeTagsRequest request) private void HandleUnsubscribe(UnsubscribeTagsRequest request)
@@ -634,15 +693,29 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath); _log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath);
var sender = Sender; var sender = Sender;
// DataConnectionLayer-005: bound the write with WriteTimeout. A hung device
// write (TCP black-hole) would otherwise never complete, so PipeTo never
// fires and the calling script gets no DCL-level error. The CancellationToken
// is passed to the adapter; on timeout we translate cancellation into a
// failed WriteTagResponse so the failure is returned synchronously (WP-11).
var cts = new CancellationTokenSource(_options.WriteTimeout);
// WP-11: Write through DCL to device, failure returned synchronously // WP-11: Write through DCL to device, failure returned synchronously
_adapter.WriteAsync(request.TagPath, request.Value).ContinueWith(t => _adapter.WriteAsync(request.TagPath, request.Value, cts.Token).ContinueWith(t =>
{ {
cts.Dispose();
if (t.IsCompletedSuccessfully) if (t.IsCompletedSuccessfully)
{ {
var result = t.Result; var result = t.Result;
return new WriteTagResponse( return new WriteTagResponse(
request.CorrelationId, result.Success, result.ErrorMessage, DateTimeOffset.UtcNow); request.CorrelationId, result.Success, result.ErrorMessage, DateTimeOffset.UtcNow);
} }
if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException)
{
return new WriteTagResponse(
request.CorrelationId, false,
$"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow);
}
return new WriteTagResponse( return new WriteTagResponse(
request.CorrelationId, false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow); request.CorrelationId, false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
}).PipeTo(sender); }).PipeTo(sender);
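A minimal, standalone sketch of the pattern in the hunk above: a time-limited `CancellationTokenSource` bounds the call, and a continuation maps every task outcome to a result object. `WriteOutcome` and `BoundedWrite` are hypothetical names introduced for illustration; they are not part of the ScadaLink code base, and the real handler pipes a `WriteTagResponse` to the sender instead.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the DCL response message (not the real WriteTagResponse).
public sealed record WriteOutcome(bool Success, string? Error);

public static class BoundedWrite
{
    // Runs `write` with a token that is cancelled after `timeout`, then maps each
    // possible task outcome (success, cancellation, fault) to a WriteOutcome.
    public static Task<WriteOutcome> RunAsync(
        Func<CancellationToken, Task<bool>> write, TimeSpan timeout)
    {
        var cts = new CancellationTokenSource(timeout);
        return write(cts.Token).ContinueWith(t =>
        {
            cts.Dispose();
            if (t.IsCompletedSuccessfully)
                return new WriteOutcome(t.Result, t.Result ? null : "Write rejected");

            // A timeout surfaces either as a cancelled task or as a fault wrapping
            // an OperationCanceledException, depending on how the callee reacts.
            if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException)
                return new WriteOutcome(false, $"Write timeout after {timeout.TotalSeconds:F1}s");

            return new WriteOutcome(false, t.Exception?.GetBaseException().Message);
        }, TaskScheduler.Default);
    }
}
```

One caveat under these assumptions: if `write` throws synchronously, before it returns a task, the continuation is never attached and the exception escapes to the caller. That is the same path the DCL002 regression test uses to crash the connection actor.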
@@ -824,7 +897,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId); internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
internal record RetryTagResolution; internal record RetryTagResolution;
internal record SubscribeTagResult( internal record SubscribeTagResult(
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error); string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error,
bool ConnectionLevelFailure = false);
internal record SubscribeCompleted( internal record SubscribeCompleted(
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results); SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
public record GetHealthReport; public record GetHealthReport;
@@ -125,8 +125,20 @@ public class DataConnectionManagerActor : ReceiveActor
} }
/// <summary> /// <summary>
/// OneForOneStrategy with Restart for connection actors — a failed connection /// OneForOneStrategy with Resume for connection actors.
/// should restart and attempt reconnection. ///
/// DataConnectionLayer-002: a DataConnectionActor is a long-lived, stateful
/// coordinator — its in-memory subscription registry (_subscriptionsByInstance,
/// _subscriptionIds, _subscribers) is the only record of which Instance Actors
/// subscribed to which tags, and there is no durable store to rebuild it from.
/// Restart would create a fresh instance and silently discard that registry,
/// breaking the design doc's "transparent re-subscribe" guarantee (WP-10):
/// subscribers would never be re-subscribed and would sit at stale quality with
/// no error. Resume keeps the actor instance and its state intact, so a transient
/// exception in a message handler does not lose subscription state. The actor's
/// own Become/Stash reconnect state machine already recovers connection-level
/// faults, so it does not need a restart to re-establish the connection.
/// This matches the ScadaLink convention of Resume for coordinator actors.
/// </summary> /// </summary>
protected override SupervisorStrategy SupervisorStrategy() protected override SupervisorStrategy SupervisorStrategy()
{ {
@@ -135,8 +147,8 @@ public class DataConnectionManagerActor : ReceiveActor
withinTimeRange: TimeSpan.FromMinutes(1), withinTimeRange: TimeSpan.FromMinutes(1),
decider: Decider.From(ex => decider: Decider.From(ex =>
{ {
_log.Warning(ex, "DataConnectionActor threw exception, restarting"); _log.Warning(ex, "DataConnectionActor threw exception, resuming (subscription state preserved)");
return Directive.Restart; return Directive.Resume;
})); }));
} }
} }
@@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using System.Security.Cryptography.X509Certificates; using System.Security.Cryptography.X509Certificates;
using Opc.Ua; using Opc.Ua;
using Opc.Ua.Client; using Opc.Ua.Client;
@@ -13,8 +14,14 @@ public class RealOpcUaClient : IOpcUaClient
{ {
private ISession? _session; private ISession? _session;
private Subscription? _subscription; private Subscription? _subscription;
private readonly Dictionary<string, MonitoredItem> _monitoredItems = new();
private readonly Dictionary<string, Action<string, object?, DateTime, uint>> _callbacks = new(); // DataConnectionLayer-003: these maps are read from the OPC Foundation SDK's
// internal publish threads (the MonitoredItem.Notification handler reads
// _callbacks) concurrently with subscribe/disconnect mutations that run on
// thread-pool threads. Plain Dictionary access during a concurrent resize or
// Clear() is undefined behaviour, so they must be ConcurrentDictionary.
private readonly ConcurrentDictionary<string, MonitoredItem> _monitoredItems = new();
private readonly ConcurrentDictionary<string, Action<string, object?, DateTime, uint>> _callbacks = new();
private volatile bool _connectionLostFired; private volatile bool _connectionLostFired;
private OpcUaConnectionOptions _options = new(); private OpcUaConnectionOptions _options = new();
private readonly OpcUaGlobalOptions _globalOptions; private readonly OpcUaGlobalOptions _globalOptions;
@@ -180,8 +187,8 @@ public class RealOpcUaClient : IOpcUaClient
{ {
_subscription.RemoveItem(item); _subscription.RemoveItem(item);
await _subscription.ApplyChangesAsync(cancellationToken); await _subscription.ApplyChangesAsync(cancellationToken);
_monitoredItems.Remove(subscriptionHandle); _monitoredItems.TryRemove(subscriptionHandle, out _);
_callbacks.Remove(subscriptionHandle); _callbacks.TryRemove(subscriptionHandle, out _);
} }
} }
@@ -183,19 +183,53 @@ public class DeploymentService
: Result<DeploymentRecord>.Failure( : Result<DeploymentRecord>.Failure(
$"Deployment failed: {response.ErrorMessage ?? "Unknown error"}"); $"Deployment failed: {response.ErrorMessage ?? "Unknown error"}");
} }
catch (Exception ex) when (ex is TimeoutException or OperationCanceledException) catch (Exception ex)
{ {
// DeploymentManager-001: any exception out of the try (timeout,
// cancellation, transport, serialization, DB) must leave the
// deployment record as Failed -- the design requires an interrupted
// deployment to be treated as failed, never stuck in InProgress.
//
// DeploymentManager-002: the failure-status write must NOT use the
// operation's cancellation token. If the operation was cancelled or
// timed out, that token is already cancelled and the cleanup writes
// would themselves throw before the Failed status is persisted.
// Use CancellationToken.None so the failure is durably recorded.
var isTimeout = ex is TimeoutException or OperationCanceledException;
record.Status = DeploymentStatus.Failed; record.Status = DeploymentStatus.Failed;
record.ErrorMessage = $"Communication failure: {ex.Message}"; record.ErrorMessage = isTimeout
? $"Communication failure: {ex.Message}"
: $"Deployment error: {ex.Message}";
record.CompletedAt = DateTimeOffset.UtcNow; record.CompletedAt = DateTimeOffset.UtcNow;
await _repository.UpdateDeploymentRecordAsync(record, cancellationToken);
await _repository.SaveChangesAsync(cancellationToken);
await _auditService.LogAsync(user, "DeployFailed", "Instance", instanceId.ToString(), try
instance.UniqueName, new { DeploymentId = deploymentId, Error = ex.Message }, {
cancellationToken); await _repository.UpdateDeploymentRecordAsync(record, CancellationToken.None);
await _repository.SaveChangesAsync(CancellationToken.None);
return Result<DeploymentRecord>.Failure($"Deployment timed out: {ex.Message}"); await _auditService.LogAsync(user, "DeployFailed", "Instance", instanceId.ToString(),
instance.UniqueName, new { DeploymentId = deploymentId, Error = ex.Message },
CancellationToken.None);
}
catch (Exception cleanupEx)
{
// The deployment already failed; a failed cleanup write must not
// mask the original error. Log loudly so an operator can reconcile.
_logger.LogError(cleanupEx,
"Failed to persist Failed status for deployment {DeploymentId} of instance {Instance} " +
"after deployment error: {Error}",
deploymentId, instance.UniqueName, ex.Message);
}
_logger.LogError(ex,
"Deployment {DeploymentId} for instance {Instance} failed",
deploymentId, instance.UniqueName);
return Result<DeploymentRecord>.Failure(
isTimeout
? $"Deployment timed out: {ex.Message}"
: $"Deployment failed: {ex.Message}");
} }
} }
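The reasoning in the DeploymentManager-002 comment can be shown in isolation. The sketch below assumes a hypothetical `SaveStatusAsync` that honours its token, as most async I/O does; `FailureRecording` and the single-element `store` array are illustrative stand-ins, not the real repository API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FailureRecording
{
    // Hypothetical persistence call: throws if its token is already cancelled.
    public static async Task SaveStatusAsync(string[] store, string status, CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();
        await Task.Yield();
        store[0] = status;
    }

    // Simulates an operation that is interrupted by `ct` and must record "Failed".
    // Note: the simulated work only ends when `ct` is cancelled.
    public static async Task<string> RunAsync(CancellationToken ct)
    {
        var store = new[] { "InProgress" };
        try
        {
            await Task.Delay(Timeout.Infinite, ct); // stands in for the deployment call
            store[0] = "Success";
        }
        catch (Exception)
        {
            // Passing `ct` here would throw again, because it is already cancelled,
            // and the record would stay "InProgress". CancellationToken.None lets
            // the failure write complete.
            await SaveStatusAsync(store, "Failed", CancellationToken.None);
        }
        return store[0];
    }
}
```

Calling `RunAsync` with a token that is cancelled, either up front or later, should return `"Failed"` rather than leaving the record in `"InProgress"`.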
@@ -3,6 +3,7 @@ using System.Net.Http.Headers;
using System.Text; using System.Text;
using System.Text.Json; using System.Text.Json;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Entities.ExternalSystems; using ScadaLink.Commons.Entities.ExternalSystems;
using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services; using ScadaLink.Commons.Interfaces.Services;
@@ -22,17 +23,20 @@ public class ExternalSystemClient : IExternalSystemClient
private readonly IExternalSystemRepository _repository; private readonly IExternalSystemRepository _repository;
private readonly StoreAndForwardService? _storeAndForward; private readonly StoreAndForwardService? _storeAndForward;
private readonly ILogger<ExternalSystemClient> _logger; private readonly ILogger<ExternalSystemClient> _logger;
private readonly ExternalSystemGatewayOptions _options;
public ExternalSystemClient( public ExternalSystemClient(
IHttpClientFactory httpClientFactory, IHttpClientFactory httpClientFactory,
IExternalSystemRepository repository, IExternalSystemRepository repository,
ILogger<ExternalSystemClient> logger, ILogger<ExternalSystemClient> logger,
StoreAndForwardService? storeAndForward = null) StoreAndForwardService? storeAndForward = null,
IOptions<ExternalSystemGatewayOptions>? options = null)
{ {
_httpClientFactory = httpClientFactory; _httpClientFactory = httpClientFactory;
_repository = repository; _repository = repository;
_logger = logger; _logger = logger;
_storeAndForward = storeAndForward; _storeAndForward = storeAndForward;
_options = options?.Value ?? new ExternalSystemGatewayOptions();
} }
/// <summary> /// <summary>
@@ -198,22 +202,59 @@ public class ExternalSystemClient : IExternalSystemClient
} }
} }
// Enforce the per-call timeout. ExternalSystemDefinition has no per-system
// Timeout field yet, so the configured DefaultHttpTimeout is the effective
// round-trip limit (the design's "timeout applies to the HTTP request
// round-trip" guarantee). A linked CTS lets us distinguish a timeout from a
// caller-initiated cancellation: only the timeout is reclassified as transient.
using var timeoutCts = new CancellationTokenSource(_options.DefaultHttpTimeout);
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken, timeoutCts.Token);
HttpResponseMessage response; HttpResponseMessage response;
try try
{ {
response = await client.SendAsync(request, cancellationToken); response = await client.SendAsync(request, linkedCts.Token);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// The caller asked to abandon the work — do not reclassify as transient.
throw;
}
catch (OperationCanceledException ex) when (timeoutCts.IsCancellationRequested)
{
// Our own timeout elapsed — a transient failure per the design.
throw ErrorClassifier.AsTransient(
$"Timeout calling {system.Name} after {_options.DefaultHttpTimeout.TotalSeconds:0.##}s", ex);
} }
catch (Exception ex) when (ErrorClassifier.IsTransient(ex)) catch (Exception ex) when (ErrorClassifier.IsTransient(ex))
{ {
throw ErrorClassifier.AsTransient($"Connection error to {system.Name}: {ex.Message}", ex); throw ErrorClassifier.AsTransient($"Connection error to {system.Name}: {ex.Message}", ex);
} }
if (response.IsSuccessStatusCode) // The timeout also covers reading the response body (the design's
// "round-trip" guarantee), so the linked token is used for the read too.
string body;
try
{ {
return await response.Content.ReadAsStringAsync(cancellationToken); body = await response.Content.ReadAsStringAsync(linkedCts.Token);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (OperationCanceledException ex) when (timeoutCts.IsCancellationRequested)
{
throw ErrorClassifier.AsTransient(
$"Timeout reading response from {system.Name} after {_options.DefaultHttpTimeout.TotalSeconds:0.##}s", ex);
} }
var errorBody = await response.Content.ReadAsStringAsync(cancellationToken); if (response.IsSuccessStatusCode)
{
return body;
}
var errorBody = body;
if (ErrorClassifier.IsTransient(response.StatusCode)) if (ErrorClassifier.IsTransient(response.StatusCode))
{ {
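The linked-token technique used in this hunk can be sketched on its own. `CallOutcome` and `TimeoutClassifier` are hypothetical names for illustration only; the real client rethrows or wraps the exception through `ErrorClassifier` rather than returning an enum.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public enum CallOutcome { Completed, TimedOut, CallerCancelled }

public static class TimeoutClassifier
{
    // Runs `call` under a token that fires on either the caller's cancellation or
    // our own timeout, then reports which of the two ended the call.
    public static async Task<CallOutcome> RunAsync(
        Func<CancellationToken, Task> call, TimeSpan timeout, CancellationToken callerToken)
    {
        using var timeoutCts = new CancellationTokenSource(timeout);
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
            callerToken, timeoutCts.Token);
        try
        {
            await call(linkedCts.Token);
            return CallOutcome.Completed;
        }
        catch (OperationCanceledException) when (callerToken.IsCancellationRequested)
        {
            // Checked first: if both tokens fired, the caller's intent wins.
            return CallOutcome.CallerCancelled;
        }
        catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
        {
            return CallOutcome.TimedOut;
        }
    }
}
```

The order of the two exception filters matters. With the caller's token checked first, a call where both tokens have fired is treated as a caller cancellation, which matches the order used in the diff.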
@@ -33,16 +33,24 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
/// Only replaces stored state if incoming sequence number is greater than last received. /// Only replaces stored state if incoming sequence number is greater than last received.
/// Auto-marks previously offline sites as online. /// Auto-marks previously offline sites as online.
/// </summary> /// </summary>
/// <remarks>
/// <see cref="SiteHealthState"/> is immutable: each transition produces a brand-new
/// instance, and the dictionary entry is replaced atomically. The mutation is
/// performed in a compare-and-swap retry loop rather than via the
/// <c>AddOrUpdate</c> update delegate so the sequence-number guard and the field
/// writes are evaluated as a single atomic step against the value actually
/// installed — the <c>AddOrUpdate</c> delegate may be invoked more than once
/// under contention and could otherwise act on a value that is then discarded.
/// </remarks>
public void ProcessReport(SiteHealthReport report) public void ProcessReport(SiteHealthReport report)
{ {
var now = _timeProvider.GetUtcNow(); var now = _timeProvider.GetUtcNow();
_siteStates.AddOrUpdate( while (true)
report.SiteId, {
_ => if (!_siteStates.TryGetValue(report.SiteId, out var existing))
{ {
_logger.LogInformation("Site {SiteId} registered with sequence #{Seq}", report.SiteId, report.SequenceNumber); var registered = new SiteHealthState
return new SiteHealthState
{ {
SiteId = report.SiteId, SiteId = report.SiteId,
LatestReport = report, LatestReport = report,
@@ -51,50 +59,84 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
LastSequenceNumber = report.SequenceNumber, LastSequenceNumber = report.SequenceNumber,
IsOnline = true IsOnline = true
}; };
},
(_, existing) => if (_siteStates.TryAdd(report.SiteId, registered))
{
_logger.LogInformation(
"Site {SiteId} registered with sequence #{Seq}", report.SiteId, report.SequenceNumber);
return;
}
// Lost the race — another thread registered first; retry as an update.
continue;
}
if (report.SequenceNumber <= existing.LastSequenceNumber)
{ {
if (report.SequenceNumber <= existing.LastSequenceNumber) _logger.LogDebug(
"Rejecting stale report from site {SiteId}: seq {Incoming} <= {Last}",
report.SiteId, report.SequenceNumber, existing.LastSequenceNumber);
return;
}
var updated = existing with
{
LatestReport = report,
LastReportReceivedAt = now,
LastHeartbeatAt = now,
LastSequenceNumber = report.SequenceNumber,
IsOnline = true
};
if (_siteStates.TryUpdate(report.SiteId, updated, existing))
{
if (!existing.IsOnline)
{ {
_logger.LogDebug( _logger.LogInformation(
"Rejecting stale report from site {SiteId}: seq {Incoming} <= {Last}", "Site {SiteId} is back online (seq #{Seq})", report.SiteId, report.SequenceNumber);
report.SiteId, report.SequenceNumber, existing.LastSequenceNumber);
return existing;
} }
return;
}
var wasOffline = !existing.IsOnline; // CAS lost — the entry changed under us; retry with the fresh value.
existing.LatestReport = report; }
existing.LastReportReceivedAt = now;
existing.LastHeartbeatAt = now;
existing.LastSequenceNumber = report.SequenceNumber;
existing.IsOnline = true;
if (wasOffline)
{
_logger.LogInformation("Site {SiteId} is back online (seq #{Seq})", report.SiteId, report.SequenceNumber);
}
return existing;
});
} }
/// <summary> /// <summary>
/// Bumps the last-seen timestamp for a site already known via a prior /// Bumps the last-seen timestamp for a site already known via a prior
/// SiteHealthReport. Heartbeats from sites we have not yet received a /// SiteHealthReport. Heartbeats from sites we have not yet received a
/// full report from are ignored — registration only happens on report. /// full report from are ignored — registration only happens on report.
/// The update is an atomic compare-and-swap of the immutable state.
/// </summary> /// </summary>
public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt) public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt)
{ {
if (!_siteStates.TryGetValue(siteId, out var state)) while (true)
return;
if (receivedAt > state.LastHeartbeatAt)
state.LastHeartbeatAt = receivedAt;
if (!state.IsOnline)
{ {
state.IsOnline = true; if (!_siteStates.TryGetValue(siteId, out var existing))
_logger.LogInformation("Site {SiteId} is back online (heartbeat)", siteId); return;
var newHeartbeat = receivedAt > existing.LastHeartbeatAt
? receivedAt
: existing.LastHeartbeatAt;
// Nothing to change — avoid a needless swap.
if (newHeartbeat == existing.LastHeartbeatAt && existing.IsOnline)
return;
var updated = existing with
{
LastHeartbeatAt = newHeartbeat,
IsOnline = true
};
if (_siteStates.TryUpdate(siteId, updated, existing))
{
if (!existing.IsOnline)
_logger.LogInformation("Site {SiteId} is back online (heartbeat)", siteId);
return;
}
// CAS lost — retry with the fresh value.
} }
} }
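The compare-and-swap retry loop introduced in `ProcessReport` and `MarkHeartbeat` reduces to the following shape. This is a sketch under stated assumptions: `SiteState` is a hypothetical, trimmed-down stand-in for `SiteHealthState`, and `SiteRegistry.Apply` is an illustrative name, not an aggregator method.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical, trimmed-down stand-in for SiteHealthState.
public sealed record SiteState(string SiteId, long LastSequence, bool IsOnline);

public static class SiteRegistry
{
    // Returns true if the report was applied, false if it was stale.
    public static bool Apply(
        ConcurrentDictionary<string, SiteState> states, string siteId, long sequence)
    {
        while (true)
        {
            if (!states.TryGetValue(siteId, out var existing))
            {
                if (states.TryAdd(siteId, new SiteState(siteId, sequence, true)))
                    return true;
                continue; // another thread registered first; retry as an update
            }

            if (sequence <= existing.LastSequence)
                return false; // stale: the guard ran against the value we read

            var updated = existing with { LastSequence = sequence, IsOnline = true };
            if (states.TryUpdate(siteId, updated, existing))
                return true;

            // The entry changed between the read and the swap; loop with the fresh value.
        }
    }
}
```

`ConcurrentDictionary.TryUpdate` compares the stored value with the comparison value using the default equality comparer, and a record uses value equality. The swap therefore succeeds when the stored state is equal field by field to the one that was read, which is still a safe outcome for this guard.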
@@ -143,13 +185,20 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
var state = kvp.Value; var state = kvp.Value;
if (!state.IsOnline) continue; if (!state.IsOnline) continue;
// Use LastHeartbeatAt — heartbeats arrive every ~5s from any // Use LastHeartbeatAt — heartbeats arrive frequently from any
// healthy site node, so OfflineTimeout only fires when no node // healthy site node (cadence owned by Cluster Infrastructure /
// can reach central, not during single-node failovers. // SiteCommunicationActor), so OfflineTimeout only fires when no
// node can reach central, not during single-node failovers.
var elapsed = now - state.LastHeartbeatAt; var elapsed = now - state.LastHeartbeatAt;
if (elapsed > _options.OfflineTimeout) if (elapsed <= _options.OfflineTimeout)
continue;
// Atomically swap to an offline copy. If the CAS loses to a
// concurrent report/heartbeat the site was just heard from, so
// leaving it online is the correct outcome — no retry needed.
var offline = state with { IsOnline = false };
if (_siteStates.TryUpdate(kvp.Key, offline, state))
{ {
state.IsOnline = false;
_logger.LogWarning( _logger.LogWarning(
"Site {SiteId} marked offline — no signal for {Elapsed}s (timeout: {Timeout}s)", "Site {SiteId} marked offline — no signal for {Elapsed}s (timeout: {Timeout}s)",
state.SiteId, elapsed.TotalSeconds, _options.OfflineTimeout.TotalSeconds); state.SiteId, elapsed.TotalSeconds, _options.OfflineTimeout.TotalSeconds);
@@ -84,6 +84,20 @@ public class HealthReportSender : BackgroundService
_collector.SetParkedMessageCount(parkedCount); _collector.SetParkedMessageCount(parkedCount);
} }
catch { /* Non-fatal — parked count will be 0 */ } catch { /* Non-fatal — parked count will be 0 */ }
try
{
// Per-category pending-message buffer depths (the documented
// "store-and-forward buffer depth" triage metric). Keyed by
// StoreAndForwardCategory name so the central dashboard can
// render external/notification/DB-write depths separately.
var depthsByCategory = await _sfStorage.GetBufferDepthByCategoryAsync();
var depths = depthsByCategory.ToDictionary(
kvp => kvp.Key.ToString(),
kvp => kvp.Value);
_collector.SetStoreAndForwardDepths(depths);
}
catch { /* Non-fatal — buffer depths will be empty */ }
} }
var seq = Interlocked.Increment(ref _sequenceNumber); var seq = Interlocked.Increment(ref _sequenceNumber);
@@ -4,26 +4,37 @@ namespace ScadaLink.HealthMonitoring;
/// <summary> /// <summary>
/// In-memory state for a single site's health, stored by the central aggregator. /// In-memory state for a single site's health, stored by the central aggregator.
/// Immutable: every state transition produces a new instance which the aggregator
/// installs into its <c>ConcurrentDictionary</c> via an atomic compare-and-swap.
/// This makes handing the reference straight to UI callers safe — a consumer can
/// never observe a torn or half-applied update.
/// </summary> /// </summary>
public class SiteHealthState public sealed record SiteHealthState
{ {
public required string SiteId { get; init; } public required string SiteId { get; init; }
public SiteHealthReport LatestReport { get; set; } = null!;
/// <summary>
/// The latest full <see cref="SiteHealthReport"/> received for the site, or
/// <c>null</c> if the site is known only via heartbeats and has not yet sent
/// a report.
/// </summary>
public SiteHealthReport? LatestReport { get; init; }
/// <summary> /// <summary>
/// Time the latest full <see cref="SiteHealthReport"/> was processed. /// Time the latest full <see cref="SiteHealthReport"/> was processed.
/// Used by the UI to surface report staleness during failover. /// Used by the UI to surface report staleness during failover.
/// </summary> /// </summary>
public DateTimeOffset LastReportReceivedAt { get; set; } public DateTimeOffset LastReportReceivedAt { get; init; }
/// <summary> /// <summary>
/// Time the most recent signal of any kind (full report OR ~5s heartbeat) /// Time the most recent signal of any kind (full report OR heartbeat) was
/// was received. Drives offline detection — heartbeats from the standby /// received. Drives offline detection — heartbeats from the standby keep the
/// keep the site marked online even when the active node is unable to /// site marked online even when the active node is unable to produce a report
/// produce a report (mid-failover, brief stalls). /// (mid-failover, brief stalls). See the heartbeat scheduler owned by the
/// Cluster Infrastructure / SiteCommunicationActor for the actual cadence.
/// </summary> /// </summary>
public DateTimeOffset LastHeartbeatAt { get; set; } public DateTimeOffset LastHeartbeatAt { get; init; }
public long LastSequenceNumber { get; set; } public long LastSequenceNumber { get; init; }
public bool IsOnline { get; set; } public bool IsOnline { get; init; }
} }
+6 -1
@@ -131,9 +131,14 @@ try
app.UseAuthorization(); app.UseAuthorization();
app.UseAntiforgery(); app.UseAntiforgery();
// WP-12: Map readiness endpoint — returns 503 until all checks pass, 200 when ready // WP-12: Map readiness endpoint — returns 503 until ready, 200 when ready.
// REQ-HOST-4a defines readiness as cluster membership + DB connectivity,
// explicitly NOT cluster leadership. The leader-only "active-node" check is
// excluded here so a fully operational standby central node reports ready;
// leadership is reported separately on /health/active.
app.MapHealthChecks("/health/ready", new HealthCheckOptions app.MapHealthChecks("/health/ready", new HealthCheckOptions
{ {
Predicate = check => check.Name != "active-node",
ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse
}); });
@@ -512,6 +512,106 @@ public class DataConnectionActorTests : TestKit
Assert.Equal(instances * tagsPerInstance, report.ResolvedTags); Assert.Equal(instances * tagsPerInstance, report.ResolvedTags);
} }
// ── DataConnectionLayer-004: subscribe-time failure classification ──
[Fact]
public async Task DCL004_GenuineTagResolutionFailure_PushesBadQualityToSubscriber()
{
// Regression test for DataConnectionLayer-004. When a tag genuinely fails to
// resolve at subscribe time, the design doc (Tag Path Resolution, step 2)
// requires the attribute to be marked quality `bad`. The pre-fix code only
// logged and added the tag to _unresolvedTags — the Instance Actor never got
// a signal. After the fix, a bad-quality TagValueUpdate is pushed.
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
// Genuine node-not-found: a non-connection exception.
_mockAdapter.SubscribeAsync("missing/tag", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
.Returns(Task.FromException<string>(new KeyNotFoundException("node not found")));
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.Returns(new ReadResult(false, null, null));
var actor = CreateConnectionActor("dcl004-bad-quality");
await Task.Delay(300);
actor.Tell(new SubscribeTagsRequest(
"c1", "inst1", "dcl004-bad-quality", ["missing/tag"], DateTimeOffset.UtcNow));
// Two messages arrive: the subscribe ack and a bad-quality update for the tag.
var bad = ExpectMsg<TagValueUpdate>(TimeSpan.FromSeconds(5));
Assert.Equal("missing/tag", bad.TagPath);
Assert.Equal(QualityCode.Bad, bad.Quality);
var ack = ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
Assert.True(ack.Success);
}
[Fact]
public async Task DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry()
{
// Regression test for DataConnectionLayer-004. A subscribe failing because the
// adapter is not connected (InvalidOperationException from EnsureConnected) is
// a connection problem, not a bad tag path. The pre-fix code misclassified it
// as an unresolved tag and retried it on the 10s tag-resolution timer. After
// the fix it drives the reconnection state machine instead.
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
.Returns(Task.FromException<string>(
new InvalidOperationException("OPC UA client is not connected.")));
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.Returns(new ReadResult(false, null, null));
var actor = CreateConnectionActor("dcl004-conn-level");
await Task.Delay(300);
actor.Tell(new SubscribeTagsRequest(
"c1", "inst1", "dcl004-conn-level", ["some/tag"], DateTimeOffset.UtcNow));
// The connection-level failure must drive the actor into Reconnecting, which
// re-attempts ConnectAsync. Pre-fix the actor stayed Connected and only armed
// the tag-resolution timer, so ConnectAsync is called exactly once.
AwaitCondition(() =>
_mockAdapter.ReceivedCalls().Count(c => c.GetMethodInfo().Name == "ConnectAsync") >= 2,
TimeSpan.FromSeconds(5));
}
// ── DataConnectionLayer-005: WriteTimeout must bound a hung write ──
[Fact]
public async Task DCL005_Write_ThatHangs_TimesOutAndReturnsFailureSynchronously()
{
// Regression test for DataConnectionLayer-005. HandleWrite called WriteAsync
// with no CancellationToken and no timeout, so a hung device write never
// produced a WriteTagResponse. The calling script would block until its own
// Ask-timeout with no DCL-level error. After the fix, _options.WriteTimeout
// bounds the write and a timeout is surfaced as a failed WriteTagResponse.
_options.WriteTimeout = TimeSpan.FromMilliseconds(300);
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
// WriteAsync never completes unless its cancellation token fires.
_mockAdapter.WriteAsync("tag1", 42, Arg.Any<CancellationToken>())
.Returns(ci =>
{
var ct = ci.Arg<CancellationToken>();
var tcs = new TaskCompletionSource<WriteResult>();
ct.Register(() => tcs.TrySetCanceled(ct));
return tcs.Task;
});
var actor = CreateConnectionActor("dcl005-write-timeout");
await Task.Delay(300); // reach Connected state
actor.Tell(new WriteTagRequest("corr1", "dcl005-write-timeout", "tag1", 42, DateTimeOffset.UtcNow));
var response = ExpectMsg<WriteTagResponse>(TimeSpan.FromSeconds(3));
Assert.False(response.Success);
Assert.Contains("timeout", response.ErrorMessage, StringComparison.OrdinalIgnoreCase);
}
[Fact] [Fact]
public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately() public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately()
{ {
@@ -533,7 +633,11 @@ public class DataConnectionActorTests : TestKit
  actor.Tell(new SubscribeTagsRequest(
  "c1", "inst1", "dcl001-failed-tags",
  ["good/a", "good/b", "good/c", "bad/x", "bad/y"], DateTimeOffset.UtcNow));
- ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
+ // Two genuine resolution failures now also push a bad-quality TagValueUpdate
+ // to the subscriber (DataConnectionLayer-004); skip past those to the ack.
+ var ack = FishForMessage<SubscribeTagsResponse>(_ => true, TimeSpan.FromSeconds(5));
+ Assert.True(ack.Success);
  actor.Tell(new DataConnectionActor.GetHealthReport());
  var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
@@ -3,6 +3,7 @@ using Akka.TestKit.Xunit2;
using NSubstitute;
using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.DataConnectionLayer.Actors;
using ScadaLink.HealthMonitoring;
@@ -57,6 +58,52 @@ public class DataConnectionManagerActorTests : TestKit
Assert.Contains("Unknown connection", response.ErrorMessage);
}
[Fact]
public async Task DCL002_ConnectionActorCrash_PreservesSubscriptionState()
{
// Regression test for DataConnectionLayer-002. The supervisor used
// Directive.Restart, which discards the connection actor's in-memory
// subscription registry — breaking the design doc's "transparent
// re-subscribe" guarantee (subscribers are never re-subscribed and sit at
// stale quality forever). After the fix the supervisor uses Resume, which
// keeps the actor instance and its state across a transient exception.
var mockAdapter = Substitute.For<IDataConnection>();
mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
mockAdapter.Status.Returns(ConnectionHealth.Connected);
mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
.Returns("sub-001");
mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
.Returns(new ReadResult(false, null, null));
// A write throws synchronously, escaping the message handler and crashing
// the connection actor — exercising the supervisor strategy.
mockAdapter.WriteAsync(Arg.Any<string>(), Arg.Any<object?>(), Arg.Any<CancellationToken>())
.Returns<Task<WriteResult>>(_ => throw new InvalidOperationException("boom"));
_mockFactory.Create("OpcUa", Arg.Any<IDictionary<string, string>>()).Returns(mockAdapter);
var manager = Sys.ActorOf(Props.Create(() =>
new DataConnectionManagerActor(_mockFactory, _options, _mockHealthCollector)));
manager.Tell(new CreateConnectionCommand("conn1", "OpcUa", new Dictionary<string, string>(), null, 3));
await Task.Delay(300); // connection actor reaches Connected
// Register a subscription.
manager.Tell(new SubscribeTagsRequest("c1", "inst1", "conn1", ["tag1"], DateTimeOffset.UtcNow));
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(3));
// Crash the connection actor via a synchronously-throwing write.
manager.Tell(new WriteTagRequest("c2", "conn1", "tag1", 42, DateTimeOffset.UtcNow));
await Task.Delay(300); // supervisor handles the failure
// After the crash the subscription state must survive: the health report
// still shows the subscribed/resolved tag. With Restart it would be 0.
manager.Tell(new GetAllHealthReports());
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
Assert.Equal(1, report.TotalSubscribedTags);
Assert.Equal(1, report.ResolvedTags);
}
[Fact]
public void CreateConnection_UsesFactory()
{
@@ -6,6 +6,37 @@ using ScadaLink.DataConnectionLayer.Adapters;
namespace ScadaLink.DataConnectionLayer.Tests;
/// <summary>
/// DataConnectionLayer-003: structural regression guard. RealOpcUaClient's
/// monitored-item / callback maps are read from the OPC UA SDK's publish threads
/// concurrently with subscribe/disconnect mutations on other threads. They must be
/// concurrent collections, not plain Dictionary. This is verified structurally
/// because RealOpcUaClient wraps concrete OPC Foundation SDK types and cannot be
/// exercised without a live OPC UA server.
/// </summary>
public class RealOpcUaClientThreadSafetyTests
{
[Theory]
[InlineData("_callbacks")]
[InlineData("_monitoredItems")]
public void DCL003_SharedDictionaryFields_AreConcurrentCollections(string fieldName)
{
var field = typeof(RealOpcUaClient)
.GetField(fieldName,
System.Reflection.BindingFlags.Instance |
System.Reflection.BindingFlags.NonPublic);
Assert.NotNull(field);
var fieldType = field!.FieldType;
Assert.True(
fieldType.IsGenericType &&
fieldType.GetGenericTypeDefinition() == typeof(System.Collections.Concurrent.ConcurrentDictionary<,>),
$"RealOpcUaClient.{fieldName} must be a ConcurrentDictionary<,> for thread safety, " +
$"but was {fieldType.Name}.");
}
}
/// <summary>
/// WP-7: Tests for OPC UA adapter.
/// </summary>
@@ -150,6 +150,82 @@ public class DeploymentServiceTests
await _repo.Received().AddDeploymentRecordAsync(Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());
}
// ── DeploymentManager-001: unexpected exception must not leave record InProgress ──
[Fact]
public async Task DeployInstanceAsync_CommunicationThrowsUnexpectedException_RecordMarkedFailed()
{
var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
_repo.GetInstanceByIdAsync(1).Returns(instance);
var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" };
_pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>())
.Returns(Result<FlatteningPipelineResult>.Success(
new FlatteningPipelineResult(config, "sha256:abc", ValidationResult.Success())));
// Capture the deployment record so we can inspect its final state.
DeploymentRecord? captured = null;
await _repo.AddDeploymentRecordAsync(
Arg.Do<DeploymentRecord>(r => captured = r), Arg.Any<CancellationToken>());
// _comms has no actor set, so DeployInstanceAsync throws
// InvalidOperationException -- a non-timeout, non-cancellation exception.
var result = await _service.DeployInstanceAsync(1, "admin");
// The exception must be handled, not escape.
Assert.True(result.IsFailure);
Assert.Contains("Deployment failed", result.Error);
// The record must not be left stuck in InProgress.
Assert.NotNull(captured);
Assert.Equal(DeploymentStatus.Failed, captured!.Status);
Assert.NotNull(captured.ErrorMessage);
Assert.NotNull(captured.CompletedAt);
}
// ── DeploymentManager-002: failure write must not use a cancelled token ──
[Fact]
public async Task DeployInstanceAsync_FailureWrite_UsesNonCancellableToken()
{
var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
_repo.GetInstanceByIdAsync(Arg.Any<int>(), Arg.Any<CancellationToken>()).Returns(instance);
var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" };
_pipeline.FlattenAndValidateAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(Result<FlatteningPipelineResult>.Success(
new FlatteningPipelineResult(config, "sha256:abc", ValidationResult.Success())));
DeploymentRecord? captured = null;
await _repo.AddDeploymentRecordAsync(
Arg.Do<DeploymentRecord>(r => captured = r), Arg.Any<CancellationToken>());
// Simulate a repository that rejects already-cancelled tokens (the
// real EF Core behaviour when the operation token is cancelled). If the
// catch block passes the operation's cancelled token, the Failed-status
// write throws and the record stays InProgress -- the exact bug.
_repo.UpdateDeploymentRecordAsync(
Arg.Is<DeploymentRecord>(r => r.Status == DeploymentStatus.Failed),
Arg.Is<CancellationToken>(ct => ct.IsCancellationRequested))
.Returns<Task>(_ => throw new OperationCanceledException());
_repo.SaveChangesAsync(Arg.Is<CancellationToken>(ct => ct.IsCancellationRequested))
.Returns<Task<int>>(_ => throw new OperationCanceledException());
// The communication call fails (no actor set). The catch block must
// persist the Failed status with a non-cancellable token, so cleanup
// succeeds even when the caller's token is cancelled.
var result = await _service.DeployInstanceAsync(1, "admin");
Assert.True(result.IsFailure);
Assert.NotNull(captured);
Assert.Equal(DeploymentStatus.Failed, captured!.Status);
// The Failed-status write happened with a non-cancelled token.
await _repo.Received().UpdateDeploymentRecordAsync(
Arg.Is<DeploymentRecord>(r => r.Status == DeploymentStatus.Failed),
Arg.Is<CancellationToken>(ct => !ct.IsCancellationRequested));
}
// ── WP-6: Lifecycle commands ──
[Fact]
@@ -1,8 +1,10 @@
using System.Net;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ScadaLink.Commons.Entities.ExternalSystems;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.StoreAndForward;
namespace ScadaLink.ExternalSystemGateway.Tests;
@@ -216,6 +218,116 @@ public class ExternalSystemClientTests
() => client.DeliverBufferedAsync(BufferedCall("TestAPI", "failMethod")));
}
// ── ExternalSystemGateway-003: CachedCall must not double-dispatch ──
[Fact]
public async Task CachedCall_TransientFailure_DoesNotImmediatelyRedispatchViaRegisteredHandler()
{
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
var method = new ExternalSystemMethod("postData", "POST", "/post") { Id = 1, ExternalSystemDefinitionId = 1 };
_repository.GetAllExternalSystemsAsync(Arg.Any<CancellationToken>())
.Returns(new List<ExternalSystemDefinition> { system });
_repository.GetMethodsByExternalSystemIdAsync(1, Arg.Any<CancellationToken>())
.Returns(new List<ExternalSystemMethod> { method });
// The HTTP layer always fails transiently (500).
var httpClient = new HttpClient(new MockHttpMessageHandler(HttpStatusCode.InternalServerError, "boom"));
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
// A real S&F service with a registered delivery handler that counts invocations.
var dbName = $"EsgDoubleDispatch_{Guid.NewGuid():N}";
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
using var keepAlive = new SqliteConnection(connStr);
keepAlive.Open();
var storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
await storage.InitializeAsync();
var sfOptions = new StoreAndForwardOptions
{
DefaultRetryInterval = TimeSpan.FromMinutes(10),
RetryTimerInterval = TimeSpan.FromMinutes(10),
};
var sf = new StoreAndForwardService(storage, sfOptions, NullLogger<StoreAndForwardService>.Instance);
var handlerInvocations = 0;
sf.RegisterDeliveryHandler(
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem,
_ => { Interlocked.Increment(ref handlerInvocations); return Task.FromResult(false); });
var client = new ExternalSystemClient(
_httpClientFactory, _repository,
NullLogger<ExternalSystemClient>.Instance,
storeAndForward: sf);
var result = await client.CachedCallAsync("TestAPI", "postData");
// The call already made one HTTP attempt; EnqueueAsync must NOT invoke the
// registered handler again synchronously (which would dispatch a 2nd request).
Assert.True(result.WasBuffered);
Assert.Equal(0, handlerInvocations);
}
// ── ExternalSystemGateway-002: per-system call timeout ──
[Fact]
public async Task Call_SlowSystem_TimesOutAsTransientErrorWithinConfiguredWindow()
{
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
var method = new ExternalSystemMethod("getData", "GET", "/data") { Id = 1, ExternalSystemDefinitionId = 1 };
_repository.GetAllExternalSystemsAsync(Arg.Any<CancellationToken>())
.Returns(new List<ExternalSystemDefinition> { system });
_repository.GetMethodsByExternalSystemIdAsync(1, Arg.Any<CancellationToken>())
.Returns(new List<ExternalSystemMethod> { method });
// Handler that hangs far longer than the configured timeout and the test budget.
var httpClient = new HttpClient(new HangingHttpMessageHandler(TimeSpan.FromMinutes(10)));
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
// Configure a short timeout so the call must fail quickly.
var options = new ExternalSystemGatewayOptions { DefaultHttpTimeout = TimeSpan.FromMilliseconds(200) };
var client = new ExternalSystemClient(
_httpClientFactory, _repository,
NullLogger<ExternalSystemClient>.Instance,
options: Microsoft.Extensions.Options.Options.Create(options));
var sw = System.Diagnostics.Stopwatch.StartNew();
var result = await client.CallAsync("TestAPI", "getData");
sw.Stop();
Assert.False(result.Success);
Assert.Contains("Transient error", result.ErrorMessage);
Assert.Contains("Timeout", result.ErrorMessage);
// Must fail near the configured 200ms, well before HttpClient's default 100s.
Assert.True(sw.Elapsed < TimeSpan.FromSeconds(10),
$"Call took {sw.Elapsed}, expected to time out near the configured 200ms window");
}
[Fact]
public async Task Call_CallerCancellation_IsNotMisreportedAsTimeout()
{
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
var method = new ExternalSystemMethod("getData", "GET", "/data") { Id = 1, ExternalSystemDefinitionId = 1 };
_repository.GetAllExternalSystemsAsync(Arg.Any<CancellationToken>())
.Returns(new List<ExternalSystemDefinition> { system });
_repository.GetMethodsByExternalSystemIdAsync(1, Arg.Any<CancellationToken>())
.Returns(new List<ExternalSystemMethod> { method });
var httpClient = new HttpClient(new HangingHttpMessageHandler(TimeSpan.FromMinutes(10)));
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
var options = new ExternalSystemGatewayOptions { DefaultHttpTimeout = TimeSpan.FromMinutes(5) };
var client = new ExternalSystemClient(
_httpClientFactory, _repository,
NullLogger<ExternalSystemClient>.Instance,
options: Microsoft.Extensions.Options.Options.Create(options));
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
// Caller-initiated cancellation must surface as OperationCanceledException,
// not be swallowed as a transient timeout error.
await Assert.ThrowsAnyAsync<OperationCanceledException>(
() => client.CallAsync("TestAPI", "getData", cancellationToken: cts.Token));
}
/// <summary>
/// Test helper: mock HTTP message handler.
/// </summary>
@@ -238,4 +350,20 @@ public class ExternalSystemClientTests
});
}
}
/// <summary>
/// Test helper: an HTTP handler that hangs until cancelled (simulates a slow/hung system).
/// </summary>
private class HangingHttpMessageHandler : HttpMessageHandler
{
private readonly TimeSpan _delay;
public HangingHttpMessageHandler(TimeSpan delay) => _delay = delay;
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
await Task.Delay(_delay, cancellationToken);
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent("{}") };
}
}
}
@@ -23,6 +23,7 @@
<ItemGroup>
<ProjectReference Include="../../src/ScadaLink.ExternalSystemGateway/ScadaLink.ExternalSystemGateway.csproj" />
<ProjectReference Include="../../src/ScadaLink.Commons/ScadaLink.Commons.csproj" />
<ProjectReference Include="../../src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj" />
</ItemGroup>
</Project>
@@ -161,7 +161,62 @@ public class CentralHealthAggregatorTests
  _aggregator.ProcessReport(report);
  var state = _aggregator.GetSiteState("site-1");
- Assert.Equal(42, state!.LatestReport.ScriptErrorCount);
+ Assert.Equal(42, state!.LatestReport!.ScriptErrorCount);
}
/// <summary>
/// HealthMonitoring-002 regression: SiteHealthState is mutated from multiple
/// threads (ProcessReport, MarkHeartbeat, CheckForOfflineSites). With a mutable
/// class and unsynchronized field writes, a snapshot read could observe a torn
/// or half-applied state. The state must be immutable and every transition an
/// atomic reference swap, so a snapshot is always internally consistent and the
/// monotonic sequence-number guard is never subverted by a lost update.
/// </summary>
[Fact]
public async Task ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState()
{
const int iterations = 5_000;
// SiteHealthState must be an immutable record so handing the reference to
// UI callers (and reading it concurrently) is safe.
Assert.True(typeof(SiteHealthState).GetMethod("<Clone>$") != null,
"SiteHealthState must be an immutable record for safe concurrent reads.");
_aggregator.ProcessReport(MakeReport("site-1", 0));
var writer = Task.Run(() =>
{
for (long seq = 1; seq <= iterations; seq++)
_aggregator.ProcessReport(MakeReport("site-1", seq));
});
var heartbeater = Task.Run(() =>
{
for (int i = 0; i < iterations; i++)
_aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());
});
long maxObserved = 0;
var reader = Task.Run(() =>
{
for (int i = 0; i < iterations; i++)
{
var state = _aggregator.GetSiteState("site-1");
if (state == null) continue;
// A consistent snapshot: the stored report's sequence number must
// always match the state's LastSequenceNumber (no half-applied update).
Assert.Equal(state.LastSequenceNumber, state.LatestReport!.SequenceNumber);
if (state.LastSequenceNumber > maxObserved)
maxObserved = state.LastSequenceNumber;
}
});
await Task.WhenAll(writer, heartbeater, reader);
// The final state must reflect the highest sequence — no lost update.
var final = _aggregator.GetSiteState("site-1");
Assert.Equal(iterations, final!.LastSequenceNumber);
Assert.Equal(iterations, final.LatestReport!.SequenceNumber);
Assert.True(final.IsOnline);
}
[Fact]
@@ -173,11 +228,11 @@ public class CentralHealthAggregatorTests
  _aggregator.ProcessReport(MakeReport("site-1", 10));
  _aggregator.ProcessReport(MakeReport("site-1", 1));
- var state = _aggregator.GetSiteState("site-1");
- Assert.Equal(10, state!.LastSequenceNumber);
+ Assert.Equal(10, _aggregator.GetSiteState("site-1")!.LastSequenceNumber);
- // Once it exceeds the old max, it works again
+ // Once it exceeds the old max, it works again. SiteHealthState is an
+ // immutable snapshot, so re-fetch to observe the new state.
  _aggregator.ProcessReport(MakeReport("site-1", 11));
- Assert.Equal(11, state.LastSequenceNumber);
+ Assert.Equal(11, _aggregator.GetSiteState("site-1")!.LastSequenceNumber);
  }
  }
@@ -1,7 +1,9 @@
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.StoreAndForward;
namespace ScadaLink.HealthMonitoring.Tests;
@@ -136,6 +138,76 @@ public class HealthReportSenderTests
}
}
/// <summary>
/// HealthMonitoring-001 regression: the documented "store-and-forward buffer
/// depth" metric (pending messages by category) must actually be populated in
/// the emitted report. Previously SetStoreAndForwardDepths had no callers, so
/// StoreAndForwardBufferDepths was always empty. The sender must query the S&amp;F
/// engine's per-category depth API and include it alongside the parked count.
/// </summary>
[Fact]
public async Task ReportsIncludeStoreAndForwardBufferDepthsFromStorage()
{
var dbName = $"HealthSfDepth_{Guid.NewGuid():N}";
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
// Keep one connection alive so the in-memory DB persists for the test.
using var keepAlive = new SqliteConnection(connStr);
keepAlive.Open();
var storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
await storage.InitializeAsync();
// Two pending ExternalSystem messages and one pending Notification message.
await storage.EnqueueAsync(MakePendingMessage("m1", StoreAndForwardCategory.ExternalSystem));
await storage.EnqueueAsync(MakePendingMessage("m2", StoreAndForwardCategory.ExternalSystem));
await storage.EnqueueAsync(MakePendingMessage("m3", StoreAndForwardCategory.Notification));
var transport = new FakeTransport();
var collector = new SiteHealthCollector();
collector.SetActiveNode(true);
var options = Options.Create(new HealthMonitoringOptions
{
ReportInterval = TimeSpan.FromMilliseconds(50)
});
var sender = new HealthReportSender(
collector,
transport,
options,
NullLogger<HealthReportSender>.Instance,
new FakeSiteIdentityProvider(),
sfStorage: storage);
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
try
{
await sender.StartAsync(cts.Token);
await Task.Delay(250, CancellationToken.None);
await sender.StopAsync(CancellationToken.None);
}
catch (OperationCanceledException) { }
Assert.True(transport.SentReports.Count >= 1);
var depths = transport.SentReports[^1].StoreAndForwardBufferDepths;
Assert.Equal(2, depths[nameof(StoreAndForwardCategory.ExternalSystem)]);
Assert.Equal(1, depths[nameof(StoreAndForwardCategory.Notification)]);
Assert.False(depths.ContainsKey(nameof(StoreAndForwardCategory.CachedDbWrite)));
}
private static StoreAndForwardMessage MakePendingMessage(string id, StoreAndForwardCategory category) =>
new()
{
Id = id,
Category = category,
Target = "target",
PayloadJson = "{}",
RetryCount = 0,
MaxRetries = 50,
RetryIntervalMs = 30_000,
CreatedAt = DateTimeOffset.UtcNow,
Status = StoreAndForwardMessageStatus.Pending
};
[Fact]
public void InitialSequenceNumberSeededWithUnixMs()
{
@@ -10,6 +10,7 @@
<ItemGroup>
<PackageReference Include="coverlet.collector" />
<PackageReference Include="Microsoft.Data.Sqlite" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
@@ -110,6 +110,54 @@ public class HealthCheckTests : IDisposable
}
}
[Fact]
public async Task HealthReady_Endpoint_ExcludesActiveNodeCheck()
{
// Host-001 regression: /health/ready must reflect cluster membership + DB
// connectivity only (REQ-HOST-4a), NOT cluster leadership. The leader-only
// "active-node" check belongs solely to /health/active. If /health/ready
// included "active-node", a fully operational standby central node would
// permanently report 503, breaking load-balancer failover readiness.
var previousEnv = Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT");
try
{
Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", "Central");
var factory = new WebApplicationFactory<Program>()
.WithWebHostBuilder(builder =>
{
builder.ConfigureAppConfiguration((context, config) =>
{
config.AddInMemoryCollection(new Dictionary<string, string?>
{
["ScadaLink:Node:NodeHostname"] = "localhost",
["ScadaLink:Node:RemotingPort"] = "0",
["ScadaLink:Cluster:SeedNodes:0"] = "akka.tcp://scadalink@localhost:2551",
["ScadaLink:Cluster:SeedNodes:1"] = "akka.tcp://scadalink@localhost:2552",
["ScadaLink:Database:SkipMigrations"] = "true",
});
});
builder.UseSetting("ScadaLink:Node:Role", "Central");
builder.UseSetting("ScadaLink:Database:SkipMigrations", "true");
});
_disposables.Add(factory);
var client = factory.CreateClient();
_disposables.Add(client);
var response = await client.GetAsync("/health/ready");
var body = await response.Content.ReadAsStringAsync();
// The readiness body lists each executed check by name in its entries map.
// The leader-only "active-node" check must not be among them.
Assert.DoesNotContain("active-node", body);
}
finally
{
Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", previousEnv);
}
}
[Fact]
public async Task ActiveNodeHealthCheck_SystemNotStarted_ReturnsUnhealthy()
{