Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d30ded7e72 | |||
| a0e6a36e79 | |||
| 7d7214a4ca | |||
| 340a70f0e6 | |||
| ab098bf6c8 | |||
| fccd3274d3 |
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 12 |
|
| Open findings | 8 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -101,7 +101,7 @@ whose message references `DataConnectionLayer-001`.
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Akka.NET conventions |
|
| Category | Akka.NET conventions |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs:131-141` |
|
| Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs:131-141` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -127,7 +127,20 @@ after a crash and surface the lost-state condition rather than failing silently.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. The `DataConnectionManagerActor.SupervisorStrategy` was changed
|
||||||
|
from `Directive.Restart` to `Directive.Resume` for `DataConnectionActor` failures.
|
||||||
|
`Resume` keeps the existing actor instance and all its in-memory subscription state
|
||||||
|
(`_subscriptionsByInstance`, `_subscriptionIds`, `_subscribers`, quality counters)
|
||||||
|
intact across a transient handler exception, so the design doc's "transparent
|
||||||
|
re-subscribe" guarantee (WP-10) is preserved. The actor is a long-lived stateful
|
||||||
|
coordinator and its own Become/Stash reconnect state machine already recovers
|
||||||
|
connection-level faults — it does not need a restart. This also aligns with the
|
||||||
|
ScadaLink convention of `Resume` for coordinator actors. Regression test
|
||||||
|
`DCL002_ConnectionActorCrash_PreservesSubscriptionState` crashes the connection actor
|
||||||
|
via a synchronously-throwing write and asserts the subscription survives (health
|
||||||
|
report still shows 1 subscribed/resolved tag); it fails against the pre-fix `Restart`
|
||||||
|
code and passes after. Fixed by the commit whose message references
|
||||||
|
`DataConnectionLayer-002` (commit `<pending>`).
|
||||||
|
|
||||||
### DataConnectionLayer-003 — `RealOpcUaClient` callback/monitored-item dictionaries mutated without synchronization
|
### DataConnectionLayer-003 — `RealOpcUaClient` callback/monitored-item dictionaries mutated without synchronization
|
||||||
|
|
||||||
@@ -135,7 +148,7 @@ _Unresolved._
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Concurrency & thread safety |
|
| Category | Concurrency & thread safety |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs:16-17,130-131,153,163,173,183-184` |
|
| Location | `src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs:16-17,130-131,153,163,173,183-184` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -162,7 +175,18 @@ threads still read `_callbacks` concurrently with `RemoveSubscriptionAsync` /
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. `_monitoredItems` and `_callbacks` in `RealOpcUaClient` were
|
||||||
|
changed from plain `Dictionary<,>` to `ConcurrentDictionary<,>`, and the two
|
||||||
|
`Remove(key)` call sites switched to `TryRemove`. This makes the maps safe to read
|
||||||
|
from the OPC Foundation SDK's publish threads (`MonitoredItem.Notification` reading
|
||||||
|
`_callbacks`) concurrently with subscribe/disconnect mutations on other threads.
|
||||||
|
`RealOpcUaClient` wraps concrete OPC Foundation SDK types (`ISession`,
|
||||||
|
`Subscription`, `MonitoredItem`) and cannot be exercised without a live OPC UA
|
||||||
|
server, so the regression is guarded structurally by
|
||||||
|
`DCL003_SharedDictionaryFields_AreConcurrentCollections` (a reflection test asserting
|
||||||
|
both fields are `ConcurrentDictionary<,>`); it fails against the pre-fix `Dictionary`
|
||||||
|
code and passes after. Fixed by the commit whose message references
|
||||||
|
`DataConnectionLayer-003` (commit `<pending>`).
|
||||||
|
|
||||||
### DataConnectionLayer-004 — Subscribe-time tag-resolution failure leaves the connection healthy but never recovers correctly
|
### DataConnectionLayer-004 — Subscribe-time tag-resolution failure leaves the connection healthy but never recovers correctly
|
||||||
|
|
||||||
@@ -170,7 +194,7 @@ _Unresolved._
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Error handling & resilience |
|
| Category | Error handling & resilience |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:495-503,529-537` |
|
| Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:495-503,529-537` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -197,7 +221,21 @@ Instance Actor so it reflects the documented behaviour.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. The subscribe background task now classifies each subscribe
|
||||||
|
exception via the new `IsConnectionLevelFailure` helper (`InvalidOperationException`
|
||||||
|
— thrown by `EnsureConnected()` — plus `SocketException`/`TimeoutException`/
|
||||||
|
`IOException` count as connection-level; anything else is a genuine resolution
|
||||||
|
failure). The classification is carried on `SubscribeTagResult.ConnectionLevelFailure`
|
||||||
|
and applied on the actor thread in `HandleSubscribeCompleted`: connection-level
|
||||||
|
failures no longer become unresolved tags and instead drive the reconnection state
|
||||||
|
machine (`HandleSubscribeCompleted` returns a flag and the Connected-state handler
|
||||||
|
calls `BecomeReconnecting`); genuine resolution failures still go to `_unresolvedTags`
|
||||||
|
and the retry timer, and now also push a `TagValueUpdate` with `QualityCode.Bad` to
|
||||||
|
the subscribing Instance Actor, matching the design doc's Tag Path Resolution step 2.
|
||||||
|
Regression tests `DCL004_GenuineTagResolutionFailure_PushesBadQualityToSubscriber`
|
||||||
|
and `DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry` both fail
|
||||||
|
against the pre-fix code and pass after. Fixed by the commit whose message references
|
||||||
|
`DataConnectionLayer-004` (commit `<pending>`).
|
||||||
|
|
||||||
### DataConnectionLayer-005 — `WriteTimeout` option is documented and configured but never applied
|
### DataConnectionLayer-005 — `WriteTimeout` option is documented and configured but never applied
|
||||||
|
|
||||||
@@ -205,7 +243,7 @@ _Unresolved._
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Design-document adherence |
|
| Category | Design-document adherence |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs:15`, `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:573-590` |
|
| Location | `src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs:15`, `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:573-590` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -229,7 +267,19 @@ the initial-value seed and to `WriteBatchAndWaitAsync` paths if they are reachab
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16. `HandleWrite` now creates a `CancellationTokenSource(_options.WriteTimeout)`,
|
||||||
|
passes its token to `_adapter.WriteAsync(...)`, and disposes the source in the
|
||||||
|
continuation. A cancelled/timed-out write (`Task.IsCanceled` or a base
|
||||||
|
`OperationCanceledException`) is translated into a failed `WriteTagResponse` with a
|
||||||
|
`"Write timeout after Ns"` message, so a hung device write is bounded and the failure
|
||||||
|
is returned synchronously to the calling script (WP-11) instead of blocking until the
|
||||||
|
script's own Ask-timeout. (The `WriteBatchAndWaitAsync` adapter path already accepts
|
||||||
|
an explicit `timeout`/`CancellationToken` and is not invoked by `HandleWrite`, so no
|
||||||
|
change was needed there.) Regression test
|
||||||
|
`DCL005_Write_ThatHangs_TimesOutAndReturnsFailureSynchronously` uses an adapter whose
|
||||||
|
`WriteAsync` only completes when its token fires; it fails against the pre-fix
|
||||||
|
unbounded code and passes after. Fixed by the commit whose message references
|
||||||
|
`DataConnectionLayer-005` (commit `<pending>`).
|
||||||
|
|
||||||
### DataConnectionLayer-006 — Health quality counters not reset/recomputed after failover or re-subscribe
|
### DataConnectionLayer-006 — Health quality counters not reset/recomputed after failover or re-subscribe
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 14 |
|
| Open findings | 12 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -53,7 +53,7 @@ exercises a successful deployment or the lifecycle success paths.
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Error handling & resilience |
|
| Category | Error handling & resilience |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.DeploymentManager/DeploymentService.cs:141-199` |
|
| Location | `src/ScadaLink.DeploymentManager/DeploymentService.cs:141-199` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -81,7 +81,13 @@ every exit path out of the `try`.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16 (commit `<pending>`): broadened the `catch` in
|
||||||
|
`DeployInstanceAsync` to `catch (Exception ex)` so any exception (transport,
|
||||||
|
serialization, DB, `InvalidOperationException` from an uninitialized
|
||||||
|
`CommunicationService`) marks the deployment record `Failed` with the error
|
||||||
|
message and audit-logs the failure, instead of escaping and leaving the record
|
||||||
|
stuck in `InProgress`. Regression test:
|
||||||
|
`DeployInstanceAsync_CommunicationThrowsUnexpectedException_RecordMarkedFailed`.
|
||||||
|
|
||||||
### DeploymentManager-002 — Failure-status write uses a possibly-cancelled cancellation token
|
### DeploymentManager-002 — Failure-status write uses a possibly-cancelled cancellation token
|
||||||
|
|
||||||
@@ -89,7 +95,7 @@ _Unresolved._
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Error handling & resilience |
|
| Category | Error handling & resilience |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.DeploymentManager/DeploymentService.cs:186-196` |
|
| Location | `src/ScadaLink.DeploymentManager/DeploymentService.cs:186-196` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -113,7 +119,14 @@ cancelled or timed out.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16 (commit `<pending>`): the broadened `catch` block now
|
||||||
|
performs the failure-status write (`UpdateDeploymentRecordAsync`,
|
||||||
|
`SaveChangesAsync`) and the audit `LogAsync` with `CancellationToken.None`
|
||||||
|
instead of the operation's (possibly-cancelled) token, so the `Failed` status
|
||||||
|
is durably recorded even after a timeout/cancellation. The cleanup writes are
|
||||||
|
themselves wrapped in a `try`/`catch` that logs (without masking the original
|
||||||
|
error) if persistence still fails. Regression test:
|
||||||
|
`DeployInstanceAsync_FailureWrite_UsesNonCancellableToken`.
|
||||||
|
|
||||||
### DeploymentManager-003 — Successful-deployment cleanup is not atomic with the status write
|
### DeploymentManager-003 — Successful-deployment cleanup is not atomic with the status write
|
||||||
|
|
||||||
@@ -248,7 +261,19 @@ stale-rejection.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
_Unresolved._ Finding confirmed valid against the source — `GetDeploymentStatusAsync`
|
||||||
|
only reads the local `DeploymentRecord` via `GetDeploymentByDeploymentIdAsync`,
|
||||||
|
and `DeployInstanceAsync` unconditionally generates a new deployment ID with no
|
||||||
|
site reconciliation. Left Open: a proper fix is a cross-module new feature, not
|
||||||
|
a bug fix scoped to `ScadaLink.DeploymentManager`. It requires (1) a new
|
||||||
|
request/response message contract in `ScadaLink.Commons`, (2) a new
|
||||||
|
`CommunicationService` query method in `ScadaLink.Communication`, and (3)
|
||||||
|
site-side handling of the query — all outside the DeploymentManager module — plus
|
||||||
|
a design decision on the query protocol. The reconciliation logic in
|
||||||
|
`DeploymentService` cannot be implemented without those. Recommend tracking as a
|
||||||
|
dedicated cross-module feature work item (or, alternatively, amending the design
|
||||||
|
doc to delegate reconciliation entirely to site-side stale-rejection — also
|
||||||
|
outside this module's editable scope).
|
||||||
|
|
||||||
### DeploymentManager-007 — "Diff View" reduced to a hash comparison with no diff detail
|
### DeploymentManager-007 — "Diff View" reduced to a hash comparison with no diff detail
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 13 |
|
| Open findings | 11 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -109,7 +109,7 @@ transient-retry paths. Fixed by the commit whose message references
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Error handling & resilience |
|
| Category | Error handling & resilience |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:130`, `src/ScadaLink.ExternalSystemGateway/ServiceCollectionExtensions.cs:13` |
|
| Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:130`, `src/ScadaLink.ExternalSystemGateway/ServiceCollectionExtensions.cs:13` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -142,7 +142,24 @@ is classified as transient.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16 (commit `<pending>`). `InvokeHttpAsync` now enforces a call
|
||||||
|
timeout: `ExternalSystemClient` takes an `IOptions<ExternalSystemGatewayOptions>` and
|
||||||
|
links a `CancellationTokenSource(DefaultHttpTimeout)` with the caller's token before
|
||||||
|
`SendAsync` and the response-body read, so the design's "timeout applies to the HTTP
|
||||||
|
request round-trip" guarantee now holds within the configured window (default 30s)
|
||||||
|
instead of `HttpClient`'s default 100s. A timeout is reclassified as a
|
||||||
|
`TransientExternalSystemException`; a caller-initiated cancellation is distinguished
|
||||||
|
from a timeout and propagated as `OperationCanceledException` rather than being
|
||||||
|
swallowed as transient. Regression tests:
|
||||||
|
`Call_SlowSystem_TimesOutAsTransientErrorWithinConfiguredWindow` and
|
||||||
|
`Call_CallerCancellation_IsNotMisreportedAsTimeout`.
|
||||||
|
|
||||||
|
Note (partial scope): the per-*system* `Timeout` field on `ExternalSystemDefinition`
|
||||||
|
remains unimplemented — adding it requires a change to `ScadaLink.Commons`, which is
|
||||||
|
outside this module's edit scope. Until that entity field exists, the configured
|
||||||
|
`DefaultHttpTimeout` is the effective per-call limit for every system. A follow-up
|
||||||
|
against the Commons module should add the `Timeout` field and have `InvokeHttpAsync`
|
||||||
|
prefer it over the default. This is a tracked follow-up, not a regression.
|
||||||
|
|
||||||
### ExternalSystemGateway-003 — `CachedCall` double-dispatches the HTTP request
|
### ExternalSystemGateway-003 — `CachedCall` double-dispatches the HTTP request
|
||||||
|
|
||||||
@@ -150,7 +167,7 @@ _Unresolved._
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Correctness & logic bugs |
|
| Category | Correctness & logic bugs |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:84-117` |
|
| Location | `src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs:84-117` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -179,7 +196,18 @@ the duplicated logic.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16 (commit `<pending>`). Re-triage: this finding was already fixed in
|
||||||
|
the codebase as a side effect of the `ExternalSystemGateway-001` fix and is no longer
|
||||||
|
reproducible against the current source. `StoreAndForwardService.EnqueueAsync` gained an
|
||||||
|
`attemptImmediateDelivery` parameter (recommendation approach (b)), and
|
||||||
|
`CachedCallAsync` passes `attemptImmediateDelivery: false` after its own first HTTP
|
||||||
|
attempt — so `EnqueueAsync` buffers the message for the background retry sweep without
|
||||||
|
re-invoking the registered delivery handler, eliminating the duplicate dispatch. A
|
||||||
|
dedicated regression test, `CachedCall_TransientFailure_DoesNotImmediatelyRedispatchViaRegisteredHandler`,
|
||||||
|
was added in this module's test suite: it registers a counting delivery handler, drives
|
||||||
|
a `CachedCall` whose HTTP attempt fails transiently, and asserts the handler is invoked
|
||||||
|
zero times during enqueue. The test was verified to fail if `attemptImmediateDelivery`
|
||||||
|
is flipped back to `true`.
|
||||||
|
|
||||||
### ExternalSystemGateway-004 — System retry settings are not honoured for cached calls/writes
|
### ExternalSystemGateway-004 — System retry settings are not honoured for cached calls/writes
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 12 |
|
| Open findings | 10 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -55,7 +55,7 @@ design-adherence gap.
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Design-document adherence |
|
| Category | Design-document adherence |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.HealthMonitoring/SiteHealthCollector.cs:104`, `src/ScadaLink.HealthMonitoring/HealthReportSender.cs:79` |
|
| Location | `src/ScadaLink.HealthMonitoring/SiteHealthCollector.cs:104`, `src/ScadaLink.HealthMonitoring/HealthReportSender.cs:79` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -79,7 +79,17 @@ the dead setter. Update the placeholder test accordingly once implemented.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16 (commit `<pending>`). `HealthReportSender.ExecuteAsync` now
|
||||||
|
queries the existing public `StoreAndForwardStorage.GetBufferDepthByCategoryAsync()`
|
||||||
|
API alongside the parked-count call and feeds the per-category depths into
|
||||||
|
`SiteHealthCollector.SetStoreAndForwardDepths` (category enum names as keys), so the
|
||||||
|
documented store-and-forward buffer depth metric is populated in every emitted
|
||||||
|
report. Regression test `HealthReportSenderTests.ReportsIncludeStoreAndForwardBufferDepthsFromStorage`
|
||||||
|
verifies populated per-category depths. The obsolete placeholder test
|
||||||
|
`SiteHealthCollectorTests.StoreAndForwardBufferDepths_IsEmptyPlaceholder` continues
|
||||||
|
to pass — it only exercises the collector with no setter call and still correctly
|
||||||
|
asserts the empty default; it was left in place as the collector-level default-state
|
||||||
|
test. No StoreAndForward source was modified (existing public API only).
|
||||||
|
|
||||||
### HealthMonitoring-002 — `SiteHealthState` mutable fields written from multiple threads without synchronization
|
### HealthMonitoring-002 — `SiteHealthState` mutable fields written from multiple threads without synchronization
|
||||||
|
|
||||||
@@ -87,7 +97,7 @@ _Unresolved._
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Concurrency & thread safety |
|
| Category | Concurrency & thread safety |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.HealthMonitoring/SiteHealthState.cs:11`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:86`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:137` |
|
| Location | `src/ScadaLink.HealthMonitoring/SiteHealthState.cs:11`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:86`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:137` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -112,7 +122,22 @@ a single atomic reference swap.
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16 (commit `<pending>`). `SiteHealthState` is now a `sealed record`
|
||||||
|
with `init`-only properties. `CentralHealthAggregator.ProcessReport`,
|
||||||
|
`MarkHeartbeat`, and `CheckForOfflineSites` were rewritten to perform every state
|
||||||
|
transition as an atomic compare-and-swap (`TryAdd`/`TryUpdate`) producing a new
|
||||||
|
record instance — no field of a stored state is ever mutated in place. `ProcessReport`
|
||||||
|
uses an explicit CAS retry loop instead of the `AddOrUpdate` update delegate so the
|
||||||
|
sequence-number guard and the field writes are evaluated against the value actually
|
||||||
|
installed (this also closes the root cause behind HealthMonitoring-003). Reads via
|
||||||
|
`GetAllSiteStates`/`GetSiteState` now hand out immutable snapshots, so a concurrent
|
||||||
|
reader can never observe a torn or half-applied state. `LatestReport` was changed
|
||||||
|
from `SiteHealthReport` (`null!`) to `SiteHealthReport?`, making the contract honest;
|
||||||
|
all existing consumers (CentralUI, integration/perf tests) already null-checked it
|
||||||
|
and continue to build clean. Regression test
|
||||||
|
`CentralHealthAggregatorTests.ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState`
|
||||||
|
exercises concurrent report/heartbeat/read threads and asserts snapshot consistency
|
||||||
|
and no lost updates.
|
||||||
|
|
||||||
### HealthMonitoring-003 — Shared state mutated inside `ConcurrentDictionary.AddOrUpdate` update delegate
|
### HealthMonitoring-003 — Shared state mutated inside `ConcurrentDictionary.AddOrUpdate` update delegate
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
| Last reviewed | 2026-05-16 |
|
| Last reviewed | 2026-05-16 |
|
||||||
| Reviewer | claude-agent |
|
| Reviewer | claude-agent |
|
||||||
| Commit reviewed | `9c60592` |
|
| Commit reviewed | `9c60592` |
|
||||||
| Open findings | 11 |
|
| Open findings | 10 |
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
@@ -54,7 +54,7 @@ no safe workaround.
|
|||||||
|--|--|
|
|--|--|
|
||||||
| Severity | High |
|
| Severity | High |
|
||||||
| Category | Correctness & logic bugs |
|
| Category | Correctness & logic bugs |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
| Location | `src/ScadaLink.Host/Program.cs:135-145` |
|
| Location | `src/ScadaLink.Host/Program.cs:135-145` |
|
||||||
|
|
||||||
**Description**
|
**Description**
|
||||||
@@ -81,7 +81,18 @@ checks and filter by tag). Add a regression test asserting a non-leader node ret
|
|||||||
|
|
||||||
**Resolution**
|
**Resolution**
|
||||||
|
|
||||||
_Unresolved._
|
Resolved 2026-05-16 (commit `<pending>`). Root cause confirmed against
|
||||||
|
`Program.cs`: the `/health/ready` mapping had no `Predicate`, so it executed all
|
||||||
|
three registered checks including the leader-only `active-node` check, while
|
||||||
|
`ActiveNodeHealthCheck` returns `Unhealthy` on any non-leader node — making a fully
|
||||||
|
operational standby central node permanently report `503`. Fix: added
|
||||||
|
`Predicate = check => check.Name != "active-node"` to the `/health/ready`
|
||||||
|
`HealthCheckOptions`, so readiness now reflects cluster membership + DB connectivity
|
||||||
|
only (REQ-HOST-4a); leadership remains reported solely by `/health/active`.
|
||||||
|
Regression test `HealthCheckTests.HealthReady_Endpoint_ExcludesActiveNodeCheck`
|
||||||
|
asserts the `active-node` check name does not appear in the `/health/ready`
|
||||||
|
response body; it failed before the fix and passes after. Full Host suite green
|
||||||
|
(156 passed).
|
||||||
|
|
||||||
### Host-002 — Akka.Persistence required by REQ-HOST-6 is not configured and not used
|
### Host-002 — Akka.Persistence required by REQ-HOST-6 is not configured and not used
|
||||||
|
|
||||||
|
|||||||
+8
-19
@@ -40,10 +40,10 @@ module file and counted in **Total**.
|
|||||||
| Severity | Open findings |
|
| Severity | Open findings |
|
||||||
|----------|---------------|
|
|----------|---------------|
|
||||||
| Critical | 0 |
|
| Critical | 0 |
|
||||||
| High | 39 |
|
| High | 28 |
|
||||||
| Medium | 100 |
|
| Medium | 100 |
|
||||||
| Low | 89 |
|
| Low | 89 |
|
||||||
| **Total** | **228** |
|
| **Total** | **217** |
|
||||||
|
|
||||||
## Module Status
|
## Module Status
|
||||||
|
|
||||||
@@ -55,11 +55,11 @@ module file and counted in **Total**.
|
|||||||
| [Commons](Commons/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | 12 |
|
| [Commons](Commons/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | 12 |
|
||||||
| [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 0/0/5/3 | 8 | 11 |
|
| [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 0/0/5/3 | 8 | 11 |
|
||||||
| [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/6 | 10 | 11 |
|
| [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/6 | 10 | 11 |
|
||||||
| [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/4/6/2 | 12 | 13 |
|
| [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/0/6/2 | 8 | 13 |
|
||||||
| [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | 14 |
|
| [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/1/6/5 | 12 | 14 |
|
||||||
| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | 2026-05-16 | `9c60592` | 0/2/7/4 | 13 | 14 |
|
| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | 2026-05-16 | `9c60592` | 0/0/7/4 | 11 | 14 |
|
||||||
| [HealthMonitoring](HealthMonitoring/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/5 | 12 | 12 |
|
| [HealthMonitoring](HealthMonitoring/findings.md) | 2026-05-16 | `9c60592` | 0/0/5/5 | 10 | 12 |
|
||||||
| [Host](Host/findings.md) | 2026-05-16 | `9c60592` | 0/1/3/7 | 11 | 11 |
|
| [Host](Host/findings.md) | 2026-05-16 | `9c60592` | 0/0/3/7 | 10 | 11 |
|
||||||
| [InboundAPI](InboundAPI/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
|
| [InboundAPI](InboundAPI/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
|
||||||
| [ManagementService](ManagementService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
|
| [ManagementService](ManagementService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 |
|
||||||
| [NotificationService](NotificationService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/3 | 11 | 12 |
|
| [NotificationService](NotificationService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/3 | 11 | 12 |
|
||||||
@@ -80,23 +80,12 @@ description, location, recommendation — lives in the module's `findings.md`.
|
|||||||
|
|
||||||
_None open._
|
_None open._
|
||||||
|
|
||||||
### High (39)
|
### High (28)
|
||||||
|
|
||||||
| ID | Module | Title |
|
| ID | Module | Title |
|
||||||
|----|--------|-------|
|
|----|--------|-------|
|
||||||
| ClusterInfrastructure-001 | [ClusterInfrastructure](ClusterInfrastructure/findings.md) | Module implements none of its documented responsibilities |
|
| ClusterInfrastructure-001 | [ClusterInfrastructure](ClusterInfrastructure/findings.md) | Module implements none of its documented responsibilities |
|
||||||
| DataConnectionLayer-002 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `Restart` supervision discards all subscription state on connection-actor crash |
|
|
||||||
| DataConnectionLayer-003 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `RealOpcUaClient` callback/monitored-item dictionaries mutated without synchronization |
|
|
||||||
| DataConnectionLayer-004 | [DataConnectionLayer](DataConnectionLayer/findings.md) | Subscribe-time tag-resolution failure leaves the connection healthy but never recovers correctly |
|
|
||||||
| DataConnectionLayer-005 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `WriteTimeout` option is documented and configured but never applied |
|
|
||||||
| DeploymentManager-001 | [DeploymentManager](DeploymentManager/findings.md) | Unexpected exceptions leave the deployment record stuck in `InProgress` |
|
|
||||||
| DeploymentManager-002 | [DeploymentManager](DeploymentManager/findings.md) | Failure-status write uses a possibly-cancelled cancellation token |
|
|
||||||
| DeploymentManager-006 | [DeploymentManager](DeploymentManager/findings.md) | Query-the-site-before-redeploy idempotency requirement not implemented |
|
| DeploymentManager-006 | [DeploymentManager](DeploymentManager/findings.md) | Query-the-site-before-redeploy idempotency requirement not implemented |
|
||||||
| ExternalSystemGateway-002 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | Per-system call timeout is never applied to HTTP requests |
|
|
||||||
| ExternalSystemGateway-003 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | `CachedCall` double-dispatches the HTTP request |
|
|
||||||
| HealthMonitoring-001 | [HealthMonitoring](HealthMonitoring/findings.md) | Store-and-forward buffer depth metric is never populated |
|
|
||||||
| HealthMonitoring-002 | [HealthMonitoring](HealthMonitoring/findings.md) | `SiteHealthState` mutable fields written from multiple threads without synchronization |
|
|
||||||
| Host-001 | [Host](Host/findings.md) | `/health/ready` includes the leader-only `active-node` check |
|
|
||||||
| InboundAPI-001 | [InboundAPI](InboundAPI/findings.md) | Singleton script handler cache mutated without synchronization |
|
| InboundAPI-001 | [InboundAPI](InboundAPI/findings.md) | Singleton script handler cache mutated without synchronization |
|
||||||
| InboundAPI-003 | [InboundAPI](InboundAPI/findings.md) | API key compared with non-constant-time string equality |
|
| InboundAPI-003 | [InboundAPI](InboundAPI/findings.md) | API key compared with non-constant-time string equality |
|
||||||
| InboundAPI-005 | [InboundAPI](InboundAPI/findings.md) | Compiled API scripts run with no script-trust-model enforcement |
|
| InboundAPI-005 | [InboundAPI](InboundAPI/findings.md) | Compiled API scripts run with no script-trust-model enforcement |
|
||||||
|
|||||||
@@ -213,7 +213,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
HandleSubscribe(req);
|
HandleSubscribe(req);
|
||||||
break;
|
break;
|
||||||
case SubscribeCompleted sc:
|
case SubscribeCompleted sc:
|
||||||
HandleSubscribeCompleted(sc);
|
// In Connected state, a connection-level subscribe failure must drive
|
||||||
|
// the reconnection state machine (DataConnectionLayer-004).
|
||||||
|
if (HandleSubscribeCompleted(sc))
|
||||||
|
{
|
||||||
|
_log.Warning("[{0}] Connection-level subscribe failure — entering Reconnecting", _connectionName);
|
||||||
|
BecomeReconnecting();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case UnsubscribeTagsRequest req:
|
case UnsubscribeTagsRequest req:
|
||||||
HandleUnsubscribe(req);
|
HandleUnsubscribe(req);
|
||||||
@@ -514,8 +520,14 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// WP-12: Tag path resolution failure — reported back as unresolved.
|
// DataConnectionLayer-004: distinguish a connection-level fault
|
||||||
results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: false, null, ex.Message));
|
// (adapter not connected / transport down) from a genuine
|
||||||
|
// node-not-found. Connection-level faults must drive the
|
||||||
|
// reconnection state machine, not be retried as unresolved tags.
|
||||||
|
var connectionLevel = IsConnectionLevelFailure(ex);
|
||||||
|
results.Add(new SubscribeTagResult(
|
||||||
|
tagPath, AlreadySubscribed: false, Success: false, null, ex.Message,
|
||||||
|
ConnectionLevelFailure: connectionLevel));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -546,8 +558,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
/// Applies the result of an asynchronous subscribe on the actor thread. ALL mutation
|
/// Applies the result of an asynchronous subscribe on the actor thread. ALL mutation
|
||||||
/// of subscription state and counters happens here — never on the background task —
|
/// of subscription state and counters happens here — never on the background task —
|
||||||
/// so the actor model's single-threaded state guarantee holds.
|
/// so the actor model's single-threaded state guarantee holds.
|
||||||
|
/// Returns <c>true</c> if any tag failed at connection level (DataConnectionLayer-004),
|
||||||
|
/// signalling the caller (only the Connected state) to enter Reconnecting.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private void HandleSubscribeCompleted(SubscribeCompleted msg)
|
private bool HandleSubscribeCompleted(SubscribeCompleted msg)
|
||||||
{
|
{
|
||||||
var instanceName = msg.Request.InstanceUniqueName;
|
var instanceName = msg.Request.InstanceUniqueName;
|
||||||
if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags))
|
if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags))
|
||||||
@@ -557,6 +571,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
_subscriptionsByInstance[instanceName] = instanceTags;
|
_subscriptionsByInstance[instanceName] = instanceTags;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DataConnectionLayer-004: if any tag failed because the adapter is not
|
||||||
|
// connected (a connection-level fault), the subscribe needs the reconnection
|
||||||
|
// state machine, not the tag-resolution retry. Drive a disconnect and let the
|
||||||
|
// request be re-stashed/retried after reconnect via ReSubscribeAll.
|
||||||
|
var connectionLevelFailure = msg.Results.Any(r => !r.Success && r.ConnectionLevelFailure);
|
||||||
|
|
||||||
foreach (var result in msg.Results)
|
foreach (var result in msg.Results)
|
||||||
{
|
{
|
||||||
instanceTags.Add(result.TagPath);
|
instanceTags.Add(result.TagPath);
|
||||||
@@ -572,13 +592,31 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
_totalSubscribed++;
|
_totalSubscribed++;
|
||||||
_resolvedTags++;
|
_resolvedTags++;
|
||||||
}
|
}
|
||||||
|
else if (result.ConnectionLevelFailure)
|
||||||
|
{
|
||||||
|
// Connection-level fault — do not count as an unresolved tag.
|
||||||
|
// ReSubscribeAll after reconnect derives the tag from
|
||||||
|
// _subscriptionsByInstance (already updated above).
|
||||||
|
_log.Warning("[{0}] Subscribe for {1} failed at connection level: {2}",
|
||||||
|
_connectionName, result.TagPath, result.Error);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// WP-12: mark unresolved so the periodic retry timer picks it up.
|
// WP-12: genuine tag resolution failure — mark unresolved so the
|
||||||
|
// periodic retry timer picks it up.
|
||||||
_unresolvedTags.Add(result.TagPath);
|
_unresolvedTags.Add(result.TagPath);
|
||||||
_totalSubscribed++;
|
_totalSubscribed++;
|
||||||
_log.Debug("[{0}] Tag resolution failed for {1}: {2}",
|
_log.Debug("[{0}] Tag resolution failed for {1}: {2}",
|
||||||
_connectionName, result.TagPath, result.Error);
|
_connectionName, result.TagPath, result.Error);
|
||||||
|
|
||||||
|
// DataConnectionLayer-004 / design doc Tag Path Resolution step 2:
|
||||||
|
// mark the attribute quality `bad` so the Instance Actor sees a
|
||||||
|
// signal rather than staying Uncertain indefinitely.
|
||||||
|
if (_subscribers.TryGetValue(instanceName, out var subscriber))
|
||||||
|
{
|
||||||
|
subscriber.Tell(new TagValueUpdate(
|
||||||
|
_connectionName, result.TagPath, null, QualityCode.Bad, DateTimeOffset.UtcNow));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -594,6 +632,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
|
|
||||||
msg.ReplyTo.Tell(new SubscribeTagsResponse(
|
msg.ReplyTo.Tell(new SubscribeTagsResponse(
|
||||||
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
|
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
|
||||||
|
|
||||||
|
// The caller (Connected state only) decides whether to enter Reconnecting.
|
||||||
|
// In Connecting/Reconnecting the connection is not established anyway, so the
|
||||||
|
// existing reconnect cycle handles recovery without a re-trigger here.
|
||||||
|
return connectionLevelFailure;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// DataConnectionLayer-004: classifies a subscribe exception as a connection-level
|
||||||
|
/// fault (adapter not connected / transport down) versus a genuine tag-resolution
|
||||||
|
/// failure (the node does not exist on the device). Connection-level faults must
|
||||||
|
/// drive the reconnection state machine; resolution failures are retried on the
|
||||||
|
/// tag-resolution timer.
|
||||||
|
/// </summary>
|
||||||
|
private static bool IsConnectionLevelFailure(Exception ex)
|
||||||
|
{
|
||||||
|
var baseEx = ex is AggregateException agg ? agg.GetBaseException() : ex;
|
||||||
|
return baseEx is InvalidOperationException
|
||||||
|
or System.Net.Sockets.SocketException
|
||||||
|
or TimeoutException
|
||||||
|
or System.IO.IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void HandleUnsubscribe(UnsubscribeTagsRequest request)
|
private void HandleUnsubscribe(UnsubscribeTagsRequest request)
|
||||||
@@ -634,15 +693,29 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
_log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath);
|
_log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath);
|
||||||
var sender = Sender;
|
var sender = Sender;
|
||||||
|
|
||||||
|
// DataConnectionLayer-005: bound the write with WriteTimeout. A hung device
|
||||||
|
// write (TCP black-hole) would otherwise never complete, so PipeTo never
|
||||||
|
// fires and the calling script gets no DCL-level error. The CancellationToken
|
||||||
|
// is passed to the adapter; on timeout we translate cancellation into a
|
||||||
|
// failed WriteTagResponse so the failure is returned synchronously (WP-11).
|
||||||
|
var cts = new CancellationTokenSource(_options.WriteTimeout);
|
||||||
|
|
||||||
// WP-11: Write through DCL to device, failure returned synchronously
|
// WP-11: Write through DCL to device, failure returned synchronously
|
||||||
_adapter.WriteAsync(request.TagPath, request.Value).ContinueWith(t =>
|
_adapter.WriteAsync(request.TagPath, request.Value, cts.Token).ContinueWith(t =>
|
||||||
{
|
{
|
||||||
|
cts.Dispose();
|
||||||
if (t.IsCompletedSuccessfully)
|
if (t.IsCompletedSuccessfully)
|
||||||
{
|
{
|
||||||
var result = t.Result;
|
var result = t.Result;
|
||||||
return new WriteTagResponse(
|
return new WriteTagResponse(
|
||||||
request.CorrelationId, result.Success, result.ErrorMessage, DateTimeOffset.UtcNow);
|
request.CorrelationId, result.Success, result.ErrorMessage, DateTimeOffset.UtcNow);
|
||||||
}
|
}
|
||||||
|
if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException)
|
||||||
|
{
|
||||||
|
return new WriteTagResponse(
|
||||||
|
request.CorrelationId, false,
|
||||||
|
$"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow);
|
||||||
|
}
|
||||||
return new WriteTagResponse(
|
return new WriteTagResponse(
|
||||||
request.CorrelationId, false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
|
request.CorrelationId, false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
|
||||||
}).PipeTo(sender);
|
}).PipeTo(sender);
|
||||||
@@ -824,7 +897,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|||||||
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
|
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
|
||||||
internal record RetryTagResolution;
|
internal record RetryTagResolution;
|
||||||
internal record SubscribeTagResult(
|
internal record SubscribeTagResult(
|
||||||
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error);
|
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error,
|
||||||
|
bool ConnectionLevelFailure = false);
|
||||||
internal record SubscribeCompleted(
|
internal record SubscribeCompleted(
|
||||||
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
|
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
|
||||||
public record GetHealthReport;
|
public record GetHealthReport;
|
||||||
|
|||||||
@@ -125,8 +125,20 @@ public class DataConnectionManagerActor : ReceiveActor
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// OneForOneStrategy with Restart for connection actors — a failed connection
|
/// OneForOneStrategy with Resume for connection actors.
|
||||||
/// should restart and attempt reconnection.
|
///
|
||||||
|
/// DataConnectionLayer-002: a DataConnectionActor is a long-lived, stateful
|
||||||
|
/// coordinator — its in-memory subscription registry (_subscriptionsByInstance,
|
||||||
|
/// _subscriptionIds, _subscribers) is the only record of which Instance Actors
|
||||||
|
/// subscribed to which tags, and there is no durable store to rebuild it from.
|
||||||
|
/// Restart would create a fresh instance and silently discard that registry,
|
||||||
|
/// breaking the design doc's "transparent re-subscribe" guarantee (WP-10):
|
||||||
|
/// subscribers would never be re-subscribed and would sit at stale quality with
|
||||||
|
/// no error. Resume keeps the actor instance and its state intact, so a transient
|
||||||
|
/// exception in a message handler does not lose subscription state. The actor's
|
||||||
|
/// own Become/Stash reconnect state machine already recovers connection-level
|
||||||
|
/// faults, so it does not need a restart to re-establish the connection.
|
||||||
|
/// This matches the ScadaLink convention of Resume for coordinator actors.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
protected override SupervisorStrategy SupervisorStrategy()
|
protected override SupervisorStrategy SupervisorStrategy()
|
||||||
{
|
{
|
||||||
@@ -135,8 +147,8 @@ public class DataConnectionManagerActor : ReceiveActor
|
|||||||
withinTimeRange: TimeSpan.FromMinutes(1),
|
withinTimeRange: TimeSpan.FromMinutes(1),
|
||||||
decider: Decider.From(ex =>
|
decider: Decider.From(ex =>
|
||||||
{
|
{
|
||||||
_log.Warning(ex, "DataConnectionActor threw exception, restarting");
|
_log.Warning(ex, "DataConnectionActor threw exception, resuming (subscription state preserved)");
|
||||||
return Directive.Restart;
|
return Directive.Resume;
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
using System.Security.Cryptography.X509Certificates;
|
using System.Security.Cryptography.X509Certificates;
|
||||||
using Opc.Ua;
|
using Opc.Ua;
|
||||||
using Opc.Ua.Client;
|
using Opc.Ua.Client;
|
||||||
@@ -13,8 +14,14 @@ public class RealOpcUaClient : IOpcUaClient
|
|||||||
{
|
{
|
||||||
private ISession? _session;
|
private ISession? _session;
|
||||||
private Subscription? _subscription;
|
private Subscription? _subscription;
|
||||||
private readonly Dictionary<string, MonitoredItem> _monitoredItems = new();
|
|
||||||
private readonly Dictionary<string, Action<string, object?, DateTime, uint>> _callbacks = new();
|
// DataConnectionLayer-003: these maps are read from the OPC Foundation SDK's
|
||||||
|
// internal publish threads (the MonitoredItem.Notification handler reads
|
||||||
|
// _callbacks) concurrently with subscribe/disconnect mutations that run on
|
||||||
|
// thread-pool threads. Plain Dictionary access during a concurrent resize or
|
||||||
|
// Clear() is undefined behaviour, so they must be ConcurrentDictionary.
|
||||||
|
private readonly ConcurrentDictionary<string, MonitoredItem> _monitoredItems = new();
|
||||||
|
private readonly ConcurrentDictionary<string, Action<string, object?, DateTime, uint>> _callbacks = new();
|
||||||
private volatile bool _connectionLostFired;
|
private volatile bool _connectionLostFired;
|
||||||
private OpcUaConnectionOptions _options = new();
|
private OpcUaConnectionOptions _options = new();
|
||||||
private readonly OpcUaGlobalOptions _globalOptions;
|
private readonly OpcUaGlobalOptions _globalOptions;
|
||||||
@@ -180,8 +187,8 @@ public class RealOpcUaClient : IOpcUaClient
|
|||||||
{
|
{
|
||||||
_subscription.RemoveItem(item);
|
_subscription.RemoveItem(item);
|
||||||
await _subscription.ApplyChangesAsync(cancellationToken);
|
await _subscription.ApplyChangesAsync(cancellationToken);
|
||||||
_monitoredItems.Remove(subscriptionHandle);
|
_monitoredItems.TryRemove(subscriptionHandle, out _);
|
||||||
_callbacks.Remove(subscriptionHandle);
|
_callbacks.TryRemove(subscriptionHandle, out _);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -183,19 +183,53 @@ public class DeploymentService
|
|||||||
: Result<DeploymentRecord>.Failure(
|
: Result<DeploymentRecord>.Failure(
|
||||||
$"Deployment failed: {response.ErrorMessage ?? "Unknown error"}");
|
$"Deployment failed: {response.ErrorMessage ?? "Unknown error"}");
|
||||||
}
|
}
|
||||||
catch (Exception ex) when (ex is TimeoutException or OperationCanceledException)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
|
// DeploymentManager-001: any exception out of the try (timeout,
|
||||||
|
// cancellation, transport, serialization, DB) must leave the
|
||||||
|
// deployment record as Failed -- the design requires an interrupted
|
||||||
|
// deployment to be treated as failed, never stuck in InProgress.
|
||||||
|
//
|
||||||
|
// DeploymentManager-002: the failure-status write must NOT use the
|
||||||
|
// operation's cancellation token. If the operation was cancelled or
|
||||||
|
// timed out, that token is already cancelled and the cleanup writes
|
||||||
|
// would themselves throw before the Failed status is persisted.
|
||||||
|
// Use CancellationToken.None so the failure is durably recorded.
|
||||||
|
var isTimeout = ex is TimeoutException or OperationCanceledException;
|
||||||
|
|
||||||
record.Status = DeploymentStatus.Failed;
|
record.Status = DeploymentStatus.Failed;
|
||||||
record.ErrorMessage = $"Communication failure: {ex.Message}";
|
record.ErrorMessage = isTimeout
|
||||||
|
? $"Communication failure: {ex.Message}"
|
||||||
|
: $"Deployment error: {ex.Message}";
|
||||||
record.CompletedAt = DateTimeOffset.UtcNow;
|
record.CompletedAt = DateTimeOffset.UtcNow;
|
||||||
await _repository.UpdateDeploymentRecordAsync(record, cancellationToken);
|
|
||||||
await _repository.SaveChangesAsync(cancellationToken);
|
|
||||||
|
|
||||||
await _auditService.LogAsync(user, "DeployFailed", "Instance", instanceId.ToString(),
|
try
|
||||||
instance.UniqueName, new { DeploymentId = deploymentId, Error = ex.Message },
|
{
|
||||||
cancellationToken);
|
await _repository.UpdateDeploymentRecordAsync(record, CancellationToken.None);
|
||||||
|
await _repository.SaveChangesAsync(CancellationToken.None);
|
||||||
|
|
||||||
return Result<DeploymentRecord>.Failure($"Deployment timed out: {ex.Message}");
|
await _auditService.LogAsync(user, "DeployFailed", "Instance", instanceId.ToString(),
|
||||||
|
instance.UniqueName, new { DeploymentId = deploymentId, Error = ex.Message },
|
||||||
|
CancellationToken.None);
|
||||||
|
}
|
||||||
|
catch (Exception cleanupEx)
|
||||||
|
{
|
||||||
|
// The deployment already failed; a failed cleanup write must not
|
||||||
|
// mask the original error. Log loudly so an operator can reconcile.
|
||||||
|
_logger.LogError(cleanupEx,
|
||||||
|
"Failed to persist Failed status for deployment {DeploymentId} of instance {Instance} " +
|
||||||
|
"after deployment error: {Error}",
|
||||||
|
deploymentId, instance.UniqueName, ex.Message);
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogError(ex,
|
||||||
|
"Deployment {DeploymentId} for instance {Instance} failed",
|
||||||
|
deploymentId, instance.UniqueName);
|
||||||
|
|
||||||
|
return Result<DeploymentRecord>.Failure(
|
||||||
|
isTimeout
|
||||||
|
? $"Deployment timed out: {ex.Message}"
|
||||||
|
: $"Deployment failed: {ex.Message}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using System.Net.Http.Headers;
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
using ScadaLink.Commons.Entities.ExternalSystems;
|
using ScadaLink.Commons.Entities.ExternalSystems;
|
||||||
using ScadaLink.Commons.Interfaces.Repositories;
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
using ScadaLink.Commons.Interfaces.Services;
|
using ScadaLink.Commons.Interfaces.Services;
|
||||||
@@ -22,17 +23,20 @@ public class ExternalSystemClient : IExternalSystemClient
|
|||||||
private readonly IExternalSystemRepository _repository;
|
private readonly IExternalSystemRepository _repository;
|
||||||
private readonly StoreAndForwardService? _storeAndForward;
|
private readonly StoreAndForwardService? _storeAndForward;
|
||||||
private readonly ILogger<ExternalSystemClient> _logger;
|
private readonly ILogger<ExternalSystemClient> _logger;
|
||||||
|
private readonly ExternalSystemGatewayOptions _options;
|
||||||
|
|
||||||
public ExternalSystemClient(
|
public ExternalSystemClient(
|
||||||
IHttpClientFactory httpClientFactory,
|
IHttpClientFactory httpClientFactory,
|
||||||
IExternalSystemRepository repository,
|
IExternalSystemRepository repository,
|
||||||
ILogger<ExternalSystemClient> logger,
|
ILogger<ExternalSystemClient> logger,
|
||||||
StoreAndForwardService? storeAndForward = null)
|
StoreAndForwardService? storeAndForward = null,
|
||||||
|
IOptions<ExternalSystemGatewayOptions>? options = null)
|
||||||
{
|
{
|
||||||
_httpClientFactory = httpClientFactory;
|
_httpClientFactory = httpClientFactory;
|
||||||
_repository = repository;
|
_repository = repository;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
_storeAndForward = storeAndForward;
|
_storeAndForward = storeAndForward;
|
||||||
|
_options = options?.Value ?? new ExternalSystemGatewayOptions();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -198,22 +202,59 @@ public class ExternalSystemClient : IExternalSystemClient
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Enforce the per-call timeout. ExternalSystemDefinition has no per-system
|
||||||
|
// Timeout field yet, so the configured DefaultHttpTimeout is the effective
|
||||||
|
// round-trip limit (the design's "timeout applies to the HTTP request
|
||||||
|
// round-trip" guarantee). A linked CTS lets us distinguish a timeout from a
|
||||||
|
// caller-initiated cancellation: only the timeout is reclassified as transient.
|
||||||
|
using var timeoutCts = new CancellationTokenSource(_options.DefaultHttpTimeout);
|
||||||
|
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
|
||||||
|
cancellationToken, timeoutCts.Token);
|
||||||
|
|
||||||
HttpResponseMessage response;
|
HttpResponseMessage response;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
response = await client.SendAsync(request, cancellationToken);
|
response = await client.SendAsync(request, linkedCts.Token);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
// The caller asked to abandon the work — do not reclassify as transient.
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException ex) when (timeoutCts.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
// Our own timeout elapsed — a transient failure per the design.
|
||||||
|
throw ErrorClassifier.AsTransient(
|
||||||
|
$"Timeout calling {system.Name} after {_options.DefaultHttpTimeout.TotalSeconds:0.##}s", ex);
|
||||||
}
|
}
|
||||||
catch (Exception ex) when (ErrorClassifier.IsTransient(ex))
|
catch (Exception ex) when (ErrorClassifier.IsTransient(ex))
|
||||||
{
|
{
|
||||||
throw ErrorClassifier.AsTransient($"Connection error to {system.Name}: {ex.Message}", ex);
|
throw ErrorClassifier.AsTransient($"Connection error to {system.Name}: {ex.Message}", ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (response.IsSuccessStatusCode)
|
// The timeout also covers reading the response body (the design's
|
||||||
|
// "round-trip" guarantee), so the linked token is used for the read too.
|
||||||
|
string body;
|
||||||
|
try
|
||||||
{
|
{
|
||||||
return await response.Content.ReadAsStringAsync(cancellationToken);
|
body = await response.Content.ReadAsStringAsync(linkedCts.Token);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException ex) when (timeoutCts.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
throw ErrorClassifier.AsTransient(
|
||||||
|
$"Timeout reading response from {system.Name} after {_options.DefaultHttpTimeout.TotalSeconds:0.##}s", ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
var errorBody = await response.Content.ReadAsStringAsync(cancellationToken);
|
if (response.IsSuccessStatusCode)
|
||||||
|
{
|
||||||
|
return body;
|
||||||
|
}
|
||||||
|
|
||||||
|
var errorBody = body;
|
||||||
|
|
||||||
if (ErrorClassifier.IsTransient(response.StatusCode))
|
if (ErrorClassifier.IsTransient(response.StatusCode))
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -33,16 +33,24 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
|||||||
/// Only replaces stored state if incoming sequence number is greater than last received.
|
/// Only replaces stored state if incoming sequence number is greater than last received.
|
||||||
/// Auto-marks previously offline sites as online.
|
/// Auto-marks previously offline sites as online.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <see cref="SiteHealthState"/> is immutable: each transition produces a brand-new
|
||||||
|
/// instance, and the dictionary entry is replaced atomically. The mutation is
|
||||||
|
/// performed in a compare-and-swap retry loop rather than via the
|
||||||
|
/// <c>AddOrUpdate</c> update delegate so the sequence-number guard and the field
|
||||||
|
/// writes are evaluated as a single atomic step against the value actually
|
||||||
|
/// installed — the <c>AddOrUpdate</c> delegate may be invoked more than once
|
||||||
|
/// under contention and could otherwise act on a value that is then discarded.
|
||||||
|
/// </remarks>
|
||||||
public void ProcessReport(SiteHealthReport report)
|
public void ProcessReport(SiteHealthReport report)
|
||||||
{
|
{
|
||||||
var now = _timeProvider.GetUtcNow();
|
var now = _timeProvider.GetUtcNow();
|
||||||
|
|
||||||
_siteStates.AddOrUpdate(
|
while (true)
|
||||||
report.SiteId,
|
{
|
||||||
_ =>
|
if (!_siteStates.TryGetValue(report.SiteId, out var existing))
|
||||||
{
|
{
|
||||||
_logger.LogInformation("Site {SiteId} registered with sequence #{Seq}", report.SiteId, report.SequenceNumber);
|
var registered = new SiteHealthState
|
||||||
return new SiteHealthState
|
|
||||||
{
|
{
|
||||||
SiteId = report.SiteId,
|
SiteId = report.SiteId,
|
||||||
LatestReport = report,
|
LatestReport = report,
|
||||||
@@ -51,50 +59,84 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
|||||||
LastSequenceNumber = report.SequenceNumber,
|
LastSequenceNumber = report.SequenceNumber,
|
||||||
IsOnline = true
|
IsOnline = true
|
||||||
};
|
};
|
||||||
},
|
|
||||||
(_, existing) =>
|
if (_siteStates.TryAdd(report.SiteId, registered))
|
||||||
|
{
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Site {SiteId} registered with sequence #{Seq}", report.SiteId, report.SequenceNumber);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lost the race — another thread registered first; retry as an update.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (report.SequenceNumber <= existing.LastSequenceNumber)
|
||||||
{
|
{
|
||||||
if (report.SequenceNumber <= existing.LastSequenceNumber)
|
_logger.LogDebug(
|
||||||
|
"Rejecting stale report from site {SiteId}: seq {Incoming} <= {Last}",
|
||||||
|
report.SiteId, report.SequenceNumber, existing.LastSequenceNumber);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var updated = existing with
|
||||||
|
{
|
||||||
|
LatestReport = report,
|
||||||
|
LastReportReceivedAt = now,
|
||||||
|
LastHeartbeatAt = now,
|
||||||
|
LastSequenceNumber = report.SequenceNumber,
|
||||||
|
IsOnline = true
|
||||||
|
};
|
||||||
|
|
||||||
|
if (_siteStates.TryUpdate(report.SiteId, updated, existing))
|
||||||
|
{
|
||||||
|
if (!existing.IsOnline)
|
||||||
{
|
{
|
||||||
_logger.LogDebug(
|
_logger.LogInformation(
|
||||||
"Rejecting stale report from site {SiteId}: seq {Incoming} <= {Last}",
|
"Site {SiteId} is back online (seq #{Seq})", report.SiteId, report.SequenceNumber);
|
||||||
report.SiteId, report.SequenceNumber, existing.LastSequenceNumber);
|
|
||||||
return existing;
|
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
var wasOffline = !existing.IsOnline;
|
// CAS lost — the entry changed under us; retry with the fresh value.
|
||||||
existing.LatestReport = report;
|
}
|
||||||
existing.LastReportReceivedAt = now;
|
|
||||||
existing.LastHeartbeatAt = now;
|
|
||||||
existing.LastSequenceNumber = report.SequenceNumber;
|
|
||||||
existing.IsOnline = true;
|
|
||||||
|
|
||||||
if (wasOffline)
|
|
||||||
{
|
|
||||||
_logger.LogInformation("Site {SiteId} is back online (seq #{Seq})", report.SiteId, report.SequenceNumber);
|
|
||||||
}
|
|
||||||
|
|
||||||
return existing;
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Bumps the last-seen timestamp for a site already known via a prior
|
/// Bumps the last-seen timestamp for a site already known via a prior
|
||||||
/// SiteHealthReport. Heartbeats from sites we have not yet received a
|
/// SiteHealthReport. Heartbeats from sites we have not yet received a
|
||||||
/// full report from are ignored — registration only happens on report.
|
/// full report from are ignored — registration only happens on report.
|
||||||
|
/// The update is an atomic compare-and-swap of the immutable state.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt)
|
public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt)
|
||||||
{
|
{
|
||||||
if (!_siteStates.TryGetValue(siteId, out var state))
|
while (true)
|
||||||
return;
|
|
||||||
|
|
||||||
if (receivedAt > state.LastHeartbeatAt)
|
|
||||||
state.LastHeartbeatAt = receivedAt;
|
|
||||||
|
|
||||||
if (!state.IsOnline)
|
|
||||||
{
|
{
|
||||||
state.IsOnline = true;
|
if (!_siteStates.TryGetValue(siteId, out var existing))
|
||||||
_logger.LogInformation("Site {SiteId} is back online (heartbeat)", siteId);
|
return;
|
||||||
|
|
||||||
|
var newHeartbeat = receivedAt > existing.LastHeartbeatAt
|
||||||
|
? receivedAt
|
||||||
|
: existing.LastHeartbeatAt;
|
||||||
|
|
||||||
|
// Nothing to change — avoid a needless swap.
|
||||||
|
if (newHeartbeat == existing.LastHeartbeatAt && existing.IsOnline)
|
||||||
|
return;
|
||||||
|
|
||||||
|
var updated = existing with
|
||||||
|
{
|
||||||
|
LastHeartbeatAt = newHeartbeat,
|
||||||
|
IsOnline = true
|
||||||
|
};
|
||||||
|
|
||||||
|
if (_siteStates.TryUpdate(siteId, updated, existing))
|
||||||
|
{
|
||||||
|
if (!existing.IsOnline)
|
||||||
|
_logger.LogInformation("Site {SiteId} is back online (heartbeat)", siteId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// CAS lost — retry with the fresh value.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,13 +185,20 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
|||||||
var state = kvp.Value;
|
var state = kvp.Value;
|
||||||
if (!state.IsOnline) continue;
|
if (!state.IsOnline) continue;
|
||||||
|
|
||||||
// Use LastHeartbeatAt — heartbeats arrive every ~5s from any
|
// Use LastHeartbeatAt — heartbeats arrive frequently from any
|
||||||
// healthy site node, so OfflineTimeout only fires when no node
|
// healthy site node (cadence owned by Cluster Infrastructure /
|
||||||
// can reach central, not during single-node failovers.
|
// SiteCommunicationActor), so OfflineTimeout only fires when no
|
||||||
|
// node can reach central, not during single-node failovers.
|
||||||
var elapsed = now - state.LastHeartbeatAt;
|
var elapsed = now - state.LastHeartbeatAt;
|
||||||
if (elapsed > _options.OfflineTimeout)
|
if (elapsed <= _options.OfflineTimeout)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// Atomically swap to an offline copy. If the CAS loses to a
|
||||||
|
// concurrent report/heartbeat the site was just heard from, so
|
||||||
|
// leaving it online is the correct outcome — no retry needed.
|
||||||
|
var offline = state with { IsOnline = false };
|
||||||
|
if (_siteStates.TryUpdate(kvp.Key, offline, state))
|
||||||
{
|
{
|
||||||
state.IsOnline = false;
|
|
||||||
_logger.LogWarning(
|
_logger.LogWarning(
|
||||||
"Site {SiteId} marked offline — no signal for {Elapsed}s (timeout: {Timeout}s)",
|
"Site {SiteId} marked offline — no signal for {Elapsed}s (timeout: {Timeout}s)",
|
||||||
state.SiteId, elapsed.TotalSeconds, _options.OfflineTimeout.TotalSeconds);
|
state.SiteId, elapsed.TotalSeconds, _options.OfflineTimeout.TotalSeconds);
|
||||||
|
|||||||
@@ -84,6 +84,20 @@ public class HealthReportSender : BackgroundService
|
|||||||
_collector.SetParkedMessageCount(parkedCount);
|
_collector.SetParkedMessageCount(parkedCount);
|
||||||
}
|
}
|
||||||
catch { /* Non-fatal — parked count will be 0 */ }
|
catch { /* Non-fatal — parked count will be 0 */ }
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Per-category pending-message buffer depths (the documented
|
||||||
|
// "store-and-forward buffer depth" triage metric). Keyed by
|
||||||
|
// StoreAndForwardCategory name so the central dashboard can
|
||||||
|
// render external/notification/DB-write depths separately.
|
||||||
|
var depthsByCategory = await _sfStorage.GetBufferDepthByCategoryAsync();
|
||||||
|
var depths = depthsByCategory.ToDictionary(
|
||||||
|
kvp => kvp.Key.ToString(),
|
||||||
|
kvp => kvp.Value);
|
||||||
|
_collector.SetStoreAndForwardDepths(depths);
|
||||||
|
}
|
||||||
|
catch { /* Non-fatal — buffer depths will be empty */ }
|
||||||
}
|
}
|
||||||
|
|
||||||
var seq = Interlocked.Increment(ref _sequenceNumber);
|
var seq = Interlocked.Increment(ref _sequenceNumber);
|
||||||
|
|||||||
@@ -4,26 +4,37 @@ namespace ScadaLink.HealthMonitoring;
|
|||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// In-memory state for a single site's health, stored by the central aggregator.
|
/// In-memory state for a single site's health, stored by the central aggregator.
|
||||||
|
/// Immutable: every state transition produces a new instance which the aggregator
|
||||||
|
/// installs into its <c>ConcurrentDictionary</c> via an atomic compare-and-swap.
|
||||||
|
/// This makes handing the reference straight to UI callers safe — a consumer can
|
||||||
|
/// never observe a torn or half-applied update.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class SiteHealthState
|
public sealed record SiteHealthState
|
||||||
{
|
{
|
||||||
public required string SiteId { get; init; }
|
public required string SiteId { get; init; }
|
||||||
public SiteHealthReport LatestReport { get; set; } = null!;
|
|
||||||
|
/// <summary>
|
||||||
|
/// The latest full <see cref="SiteHealthReport"/> received for the site, or
|
||||||
|
/// <c>null</c> if the site is known only via heartbeats and has not yet sent
|
||||||
|
/// a report.
|
||||||
|
/// </summary>
|
||||||
|
public SiteHealthReport? LatestReport { get; init; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Time the latest full <see cref="SiteHealthReport"/> was processed.
|
/// Time the latest full <see cref="SiteHealthReport"/> was processed.
|
||||||
/// Used by the UI to surface report staleness during failover.
|
/// Used by the UI to surface report staleness during failover.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public DateTimeOffset LastReportReceivedAt { get; set; }
|
public DateTimeOffset LastReportReceivedAt { get; init; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Time the most recent signal of any kind (full report OR ~5s heartbeat)
|
/// Time the most recent signal of any kind (full report OR heartbeat) was
|
||||||
/// was received. Drives offline detection — heartbeats from the standby
|
/// received. Drives offline detection — heartbeats from the standby keep the
|
||||||
/// keep the site marked online even when the active node is unable to
|
/// site marked online even when the active node is unable to produce a report
|
||||||
/// produce a report (mid-failover, brief stalls).
|
/// (mid-failover, brief stalls). See the heartbeat scheduler owned by the
|
||||||
|
/// Cluster Infrastructure / SiteCommunicationActor for the actual cadence.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public DateTimeOffset LastHeartbeatAt { get; set; }
|
public DateTimeOffset LastHeartbeatAt { get; init; }
|
||||||
|
|
||||||
public long LastSequenceNumber { get; set; }
|
public long LastSequenceNumber { get; init; }
|
||||||
public bool IsOnline { get; set; }
|
public bool IsOnline { get; init; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -131,9 +131,14 @@ try
|
|||||||
app.UseAuthorization();
|
app.UseAuthorization();
|
||||||
app.UseAntiforgery();
|
app.UseAntiforgery();
|
||||||
|
|
||||||
// WP-12: Map readiness endpoint — returns 503 until all checks pass, 200 when ready
|
// WP-12: Map readiness endpoint — returns 503 until ready, 200 when ready.
|
||||||
|
// REQ-HOST-4a defines readiness as cluster membership + DB connectivity,
|
||||||
|
// explicitly NOT cluster leadership. The leader-only "active-node" check is
|
||||||
|
// excluded here so a fully operational standby central node reports ready;
|
||||||
|
// leadership is reported separately on /health/active.
|
||||||
app.MapHealthChecks("/health/ready", new HealthCheckOptions
|
app.MapHealthChecks("/health/ready", new HealthCheckOptions
|
||||||
{
|
{
|
||||||
|
Predicate = check => check.Name != "active-node",
|
||||||
ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse
|
ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -512,6 +512,106 @@ public class DataConnectionActorTests : TestKit
|
|||||||
Assert.Equal(instances * tagsPerInstance, report.ResolvedTags);
|
Assert.Equal(instances * tagsPerInstance, report.ResolvedTags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── DataConnectionLayer-004: subscribe-time failure classification ──
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DCL004_GenuineTagResolutionFailure_PushesBadQualityToSubscriber()
|
||||||
|
{
|
||||||
|
// Regression test for DataConnectionLayer-004. When a tag genuinely fails to
|
||||||
|
// resolve at subscribe time, the design doc (Tag Path Resolution, step 2)
|
||||||
|
// requires the attribute to be marked quality `bad`. The pre-fix code only
|
||||||
|
// logged and added the tag to _unresolvedTags — the Instance Actor never got
|
||||||
|
// a signal. After the fix, a bad-quality TagValueUpdate is pushed.
|
||||||
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Task.CompletedTask);
|
||||||
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
||||||
|
// Genuine node-not-found: a non-connection exception.
|
||||||
|
_mockAdapter.SubscribeAsync("missing/tag", Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Task.FromException<string>(new KeyNotFoundException("node not found")));
|
||||||
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new ReadResult(false, null, null));
|
||||||
|
|
||||||
|
var actor = CreateConnectionActor("dcl004-bad-quality");
|
||||||
|
await Task.Delay(300);
|
||||||
|
|
||||||
|
actor.Tell(new SubscribeTagsRequest(
|
||||||
|
"c1", "inst1", "dcl004-bad-quality", ["missing/tag"], DateTimeOffset.UtcNow));
|
||||||
|
|
||||||
|
// Two messages arrive: the subscribe ack and a bad-quality update for the tag.
|
||||||
|
var bad = ExpectMsg<TagValueUpdate>(TimeSpan.FromSeconds(5));
|
||||||
|
Assert.Equal("missing/tag", bad.TagPath);
|
||||||
|
Assert.Equal(QualityCode.Bad, bad.Quality);
|
||||||
|
|
||||||
|
var ack = ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
||||||
|
Assert.True(ack.Success);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry()
|
||||||
|
{
|
||||||
|
// Regression test for DataConnectionLayer-004. A subscribe failing because the
|
||||||
|
// adapter is not connected (InvalidOperationException from EnsureConnected) is
|
||||||
|
// a connection problem, not a bad tag path. The pre-fix code misclassified it
|
||||||
|
// as an unresolved tag and retried it on the 10s tag-resolution timer. After
|
||||||
|
// the fix it drives the reconnection state machine instead.
|
||||||
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Task.CompletedTask);
|
||||||
|
_mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
||||||
|
_mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Task.FromException<string>(
|
||||||
|
new InvalidOperationException("OPC UA client is not connected.")));
|
||||||
|
_mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new ReadResult(false, null, null));
|
||||||
|
|
||||||
|
var actor = CreateConnectionActor("dcl004-conn-level");
|
||||||
|
await Task.Delay(300);
|
||||||
|
|
||||||
|
actor.Tell(new SubscribeTagsRequest(
|
||||||
|
"c1", "inst1", "dcl004-conn-level", ["some/tag"], DateTimeOffset.UtcNow));
|
||||||
|
|
||||||
|
// The connection-level failure must drive the actor into Reconnecting, which
|
||||||
|
// re-attempts ConnectAsync. Pre-fix the actor stayed Connected and only armed
|
||||||
|
// the tag-resolution timer, so ConnectAsync is called exactly once.
|
||||||
|
AwaitCondition(() =>
|
||||||
|
_mockAdapter.ReceivedCalls().Count(c => c.GetMethodInfo().Name == "ConnectAsync") >= 2,
|
||||||
|
TimeSpan.FromSeconds(5));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── DataConnectionLayer-005: WriteTimeout must bound a hung write ──
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DCL005_Write_ThatHangs_TimesOutAndReturnsFailureSynchronously()
|
||||||
|
{
|
||||||
|
// Regression test for DataConnectionLayer-005. HandleWrite called WriteAsync
|
||||||
|
// with no CancellationToken and no timeout, so a hung device write never
|
||||||
|
// produced a WriteTagResponse. The calling script would block until its own
|
||||||
|
// Ask-timeout with no DCL-level error. After the fix, _options.WriteTimeout
|
||||||
|
// bounds the write and a timeout is surfaced as a failed WriteTagResponse.
|
||||||
|
_options.WriteTimeout = TimeSpan.FromMilliseconds(300);
|
||||||
|
|
||||||
|
_mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Task.CompletedTask);
|
||||||
|
|
||||||
|
// WriteAsync never completes unless its cancellation token fires.
|
||||||
|
_mockAdapter.WriteAsync("tag1", 42, Arg.Any<CancellationToken>())
|
||||||
|
.Returns(ci =>
|
||||||
|
{
|
||||||
|
var ct = ci.Arg<CancellationToken>();
|
||||||
|
var tcs = new TaskCompletionSource<WriteResult>();
|
||||||
|
ct.Register(() => tcs.TrySetCanceled(ct));
|
||||||
|
return tcs.Task;
|
||||||
|
});
|
||||||
|
|
||||||
|
var actor = CreateConnectionActor("dcl005-write-timeout");
|
||||||
|
await Task.Delay(300); // reach Connected state
|
||||||
|
|
||||||
|
actor.Tell(new WriteTagRequest("corr1", "dcl005-write-timeout", "tag1", 42, DateTimeOffset.UtcNow));
|
||||||
|
|
||||||
|
var response = ExpectMsg<WriteTagResponse>(TimeSpan.FromSeconds(3));
|
||||||
|
Assert.False(response.Success);
|
||||||
|
Assert.Contains("timeout", response.ErrorMessage, StringComparison.OrdinalIgnoreCase);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately()
|
public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately()
|
||||||
{
|
{
|
||||||
@@ -533,7 +633,11 @@ public class DataConnectionActorTests : TestKit
|
|||||||
actor.Tell(new SubscribeTagsRequest(
|
actor.Tell(new SubscribeTagsRequest(
|
||||||
"c1", "inst1", "dcl001-failed-tags",
|
"c1", "inst1", "dcl001-failed-tags",
|
||||||
["good/a", "good/b", "good/c", "bad/x", "bad/y"], DateTimeOffset.UtcNow));
|
["good/a", "good/b", "good/c", "bad/x", "bad/y"], DateTimeOffset.UtcNow));
|
||||||
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(5));
|
|
||||||
|
// Two genuine resolution failures now also push a bad-quality TagValueUpdate
|
||||||
|
// to the subscriber (DataConnectionLayer-004); skip past those to the ack.
|
||||||
|
var ack = FishForMessage<SubscribeTagsResponse>(_ => true, TimeSpan.FromSeconds(5));
|
||||||
|
Assert.True(ack.Success);
|
||||||
|
|
||||||
actor.Tell(new DataConnectionActor.GetHealthReport());
|
actor.Tell(new DataConnectionActor.GetHealthReport());
|
||||||
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using Akka.TestKit.Xunit2;
|
|||||||
using NSubstitute;
|
using NSubstitute;
|
||||||
using ScadaLink.Commons.Interfaces.Protocol;
|
using ScadaLink.Commons.Interfaces.Protocol;
|
||||||
using ScadaLink.Commons.Messages.DataConnection;
|
using ScadaLink.Commons.Messages.DataConnection;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
using ScadaLink.DataConnectionLayer.Actors;
|
using ScadaLink.DataConnectionLayer.Actors;
|
||||||
using ScadaLink.HealthMonitoring;
|
using ScadaLink.HealthMonitoring;
|
||||||
|
|
||||||
@@ -57,6 +58,52 @@ public class DataConnectionManagerActorTests : TestKit
|
|||||||
Assert.Contains("Unknown connection", response.ErrorMessage);
|
Assert.Contains("Unknown connection", response.ErrorMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DCL002_ConnectionActorCrash_PreservesSubscriptionState()
|
||||||
|
{
|
||||||
|
// Regression test for DataConnectionLayer-002. The supervisor used
|
||||||
|
// Directive.Restart, which discards the connection actor's in-memory
|
||||||
|
// subscription registry — breaking the design doc's "transparent
|
||||||
|
// re-subscribe" guarantee (subscribers are never re-subscribed and sit at
|
||||||
|
// stale quality forever). After the fix the supervisor uses Resume, which
|
||||||
|
// keeps the actor instance and its state across a transient exception.
|
||||||
|
var mockAdapter = Substitute.For<IDataConnection>();
|
||||||
|
mockAdapter.ConnectAsync(Arg.Any<IDictionary<string, string>>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Task.CompletedTask);
|
||||||
|
mockAdapter.Status.Returns(ConnectionHealth.Connected);
|
||||||
|
mockAdapter.SubscribeAsync(Arg.Any<string>(), Arg.Any<SubscriptionCallback>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns("sub-001");
|
||||||
|
mockAdapter.ReadAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new ReadResult(false, null, null));
|
||||||
|
// A write throws synchronously, escaping the message handler and crashing
|
||||||
|
// the connection actor — exercising the supervisor strategy.
|
||||||
|
mockAdapter.WriteAsync(Arg.Any<string>(), Arg.Any<object?>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns<Task<WriteResult>>(_ => throw new InvalidOperationException("boom"));
|
||||||
|
|
||||||
|
_mockFactory.Create("OpcUa", Arg.Any<IDictionary<string, string>>()).Returns(mockAdapter);
|
||||||
|
|
||||||
|
var manager = Sys.ActorOf(Props.Create(() =>
|
||||||
|
new DataConnectionManagerActor(_mockFactory, _options, _mockHealthCollector)));
|
||||||
|
|
||||||
|
manager.Tell(new CreateConnectionCommand("conn1", "OpcUa", new Dictionary<string, string>(), null, 3));
|
||||||
|
await Task.Delay(300); // connection actor reaches Connected
|
||||||
|
|
||||||
|
// Register a subscription.
|
||||||
|
manager.Tell(new SubscribeTagsRequest("c1", "inst1", "conn1", ["tag1"], DateTimeOffset.UtcNow));
|
||||||
|
ExpectMsg<SubscribeTagsResponse>(TimeSpan.FromSeconds(3));
|
||||||
|
|
||||||
|
// Crash the connection actor via a synchronously-throwing write.
|
||||||
|
manager.Tell(new WriteTagRequest("c2", "conn1", "tag1", 42, DateTimeOffset.UtcNow));
|
||||||
|
await Task.Delay(300); // supervisor handles the failure
|
||||||
|
|
||||||
|
// After the crash the subscription state must survive: the health report
|
||||||
|
// still shows the subscribed/resolved tag. With Restart it would be 0.
|
||||||
|
manager.Tell(new GetAllHealthReports());
|
||||||
|
var report = ExpectMsg<DataConnectionHealthReport>(TimeSpan.FromSeconds(3));
|
||||||
|
Assert.Equal(1, report.TotalSubscribedTags);
|
||||||
|
Assert.Equal(1, report.ResolvedTags);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public void CreateConnection_UsesFactory()
|
public void CreateConnection_UsesFactory()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -6,6 +6,37 @@ using ScadaLink.DataConnectionLayer.Adapters;
|
|||||||
|
|
||||||
namespace ScadaLink.DataConnectionLayer.Tests;
|
namespace ScadaLink.DataConnectionLayer.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// DataConnectionLayer-003: structural regression guard. RealOpcUaClient's
|
||||||
|
/// monitored-item / callback maps are read from the OPC UA SDK's publish threads
|
||||||
|
/// concurrently with subscribe/disconnect mutations on other threads. They must be
|
||||||
|
/// concurrent collections, not plain Dictionary. This is verified structurally
|
||||||
|
/// because RealOpcUaClient wraps concrete OPC Foundation SDK types and cannot be
|
||||||
|
/// exercised without a live OPC UA server.
|
||||||
|
/// </summary>
|
||||||
|
public class RealOpcUaClientThreadSafetyTests
|
||||||
|
{
|
||||||
|
[Theory]
|
||||||
|
[InlineData("_callbacks")]
|
||||||
|
[InlineData("_monitoredItems")]
|
||||||
|
public void DCL003_SharedDictionaryFields_AreConcurrentCollections(string fieldName)
|
||||||
|
{
|
||||||
|
var field = typeof(RealOpcUaClient)
|
||||||
|
.GetField(fieldName,
|
||||||
|
System.Reflection.BindingFlags.Instance |
|
||||||
|
System.Reflection.BindingFlags.NonPublic);
|
||||||
|
|
||||||
|
Assert.NotNull(field);
|
||||||
|
|
||||||
|
var fieldType = field!.FieldType;
|
||||||
|
Assert.True(
|
||||||
|
fieldType.IsGenericType &&
|
||||||
|
fieldType.GetGenericTypeDefinition() == typeof(System.Collections.Concurrent.ConcurrentDictionary<,>),
|
||||||
|
$"RealOpcUaClient.{fieldName} must be a ConcurrentDictionary<,> for thread safety, " +
|
||||||
|
$"but was {fieldType.Name}.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// WP-7: Tests for OPC UA adapter.
|
/// WP-7: Tests for OPC UA adapter.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -150,6 +150,82 @@ public class DeploymentServiceTests
|
|||||||
await _repo.Received().AddDeploymentRecordAsync(Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());
|
await _repo.Received().AddDeploymentRecordAsync(Arg.Any<DeploymentRecord>(), Arg.Any<CancellationToken>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── DeploymentManager-001: unexpected exception must not leave record InProgress ──
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeployInstanceAsync_CommunicationThrowsUnexpectedException_RecordMarkedFailed()
|
||||||
|
{
|
||||||
|
var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
|
||||||
|
_repo.GetInstanceByIdAsync(1).Returns(instance);
|
||||||
|
|
||||||
|
var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" };
|
||||||
|
_pipeline.FlattenAndValidateAsync(1, Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Result<FlatteningPipelineResult>.Success(
|
||||||
|
new FlatteningPipelineResult(config, "sha256:abc", ValidationResult.Success())));
|
||||||
|
|
||||||
|
// Capture the deployment record so we can inspect its final state.
|
||||||
|
DeploymentRecord? captured = null;
|
||||||
|
await _repo.AddDeploymentRecordAsync(
|
||||||
|
Arg.Do<DeploymentRecord>(r => captured = r), Arg.Any<CancellationToken>());
|
||||||
|
|
||||||
|
// _comms has no actor set, so DeployInstanceAsync throws
|
||||||
|
// InvalidOperationException -- a non-timeout, non-cancellation exception.
|
||||||
|
var result = await _service.DeployInstanceAsync(1, "admin");
|
||||||
|
|
||||||
|
// The exception must be handled, not escape.
|
||||||
|
Assert.True(result.IsFailure);
|
||||||
|
Assert.Contains("Deployment failed", result.Error);
|
||||||
|
|
||||||
|
// The record must not be left stuck in InProgress.
|
||||||
|
Assert.NotNull(captured);
|
||||||
|
Assert.Equal(DeploymentStatus.Failed, captured!.Status);
|
||||||
|
Assert.NotNull(captured.ErrorMessage);
|
||||||
|
Assert.NotNull(captured.CompletedAt);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── DeploymentManager-002: failure write must not use a cancelled token ──
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DeployInstanceAsync_FailureWrite_UsesNonCancellableToken()
|
||||||
|
{
|
||||||
|
var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed };
|
||||||
|
_repo.GetInstanceByIdAsync(Arg.Any<int>(), Arg.Any<CancellationToken>()).Returns(instance);
|
||||||
|
|
||||||
|
var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" };
|
||||||
|
_pipeline.FlattenAndValidateAsync(Arg.Any<int>(), Arg.Any<CancellationToken>())
|
||||||
|
.Returns(Result<FlatteningPipelineResult>.Success(
|
||||||
|
new FlatteningPipelineResult(config, "sha256:abc", ValidationResult.Success())));
|
||||||
|
|
||||||
|
DeploymentRecord? captured = null;
|
||||||
|
await _repo.AddDeploymentRecordAsync(
|
||||||
|
Arg.Do<DeploymentRecord>(r => captured = r), Arg.Any<CancellationToken>());
|
||||||
|
|
||||||
|
// Simulate a repository that rejects already-cancelled tokens (the
|
||||||
|
// real EF Core behaviour when the operation token is cancelled). If the
|
||||||
|
// catch block passes the operation's cancelled token, the Failed-status
|
||||||
|
// write throws and the record stays InProgress -- the exact bug.
|
||||||
|
_repo.UpdateDeploymentRecordAsync(
|
||||||
|
Arg.Is<DeploymentRecord>(r => r.Status == DeploymentStatus.Failed),
|
||||||
|
Arg.Is<CancellationToken>(ct => ct.IsCancellationRequested))
|
||||||
|
.Returns<Task>(_ => throw new OperationCanceledException());
|
||||||
|
_repo.SaveChangesAsync(Arg.Is<CancellationToken>(ct => ct.IsCancellationRequested))
|
||||||
|
.Returns<Task<int>>(_ => throw new OperationCanceledException());
|
||||||
|
|
||||||
|
// The communication call fails (no actor set). The catch block must
|
||||||
|
// persist the Failed status with a non-cancellable token, so cleanup
|
||||||
|
// succeeds even when the caller's token is cancelled.
|
||||||
|
var result = await _service.DeployInstanceAsync(1, "admin");
|
||||||
|
|
||||||
|
Assert.True(result.IsFailure);
|
||||||
|
Assert.NotNull(captured);
|
||||||
|
Assert.Equal(DeploymentStatus.Failed, captured!.Status);
|
||||||
|
|
||||||
|
// The Failed-status write happened with a non-cancelled token.
|
||||||
|
await _repo.Received().UpdateDeploymentRecordAsync(
|
||||||
|
Arg.Is<DeploymentRecord>(r => r.Status == DeploymentStatus.Failed),
|
||||||
|
Arg.Is<CancellationToken>(ct => !ct.IsCancellationRequested));
|
||||||
|
}
|
||||||
|
|
||||||
// ── WP-6: Lifecycle commands ──
|
// ── WP-6: Lifecycle commands ──
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
using System.Net;
|
using System.Net;
|
||||||
|
using Microsoft.Data.Sqlite;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
using NSubstitute;
|
using NSubstitute;
|
||||||
using ScadaLink.Commons.Entities.ExternalSystems;
|
using ScadaLink.Commons.Entities.ExternalSystems;
|
||||||
using ScadaLink.Commons.Interfaces.Repositories;
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
|
using ScadaLink.StoreAndForward;
|
||||||
|
|
||||||
namespace ScadaLink.ExternalSystemGateway.Tests;
|
namespace ScadaLink.ExternalSystemGateway.Tests;
|
||||||
|
|
||||||
@@ -216,6 +218,116 @@ public class ExternalSystemClientTests
|
|||||||
() => client.DeliverBufferedAsync(BufferedCall("TestAPI", "failMethod")));
|
() => client.DeliverBufferedAsync(BufferedCall("TestAPI", "failMethod")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── ExternalSystemGateway-003: CachedCall must not double-dispatch ──
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CachedCall_TransientFailure_DoesNotImmediatelyRedispatchViaRegisteredHandler()
|
||||||
|
{
|
||||||
|
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
|
||||||
|
var method = new ExternalSystemMethod("postData", "POST", "/post") { Id = 1, ExternalSystemDefinitionId = 1 };
|
||||||
|
_repository.GetAllExternalSystemsAsync(Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new List<ExternalSystemDefinition> { system });
|
||||||
|
_repository.GetMethodsByExternalSystemIdAsync(1, Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new List<ExternalSystemMethod> { method });
|
||||||
|
|
||||||
|
// The HTTP layer always fails transiently (500).
|
||||||
|
var httpClient = new HttpClient(new MockHttpMessageHandler(HttpStatusCode.InternalServerError, "boom"));
|
||||||
|
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
|
||||||
|
|
||||||
|
// A real S&F service with a registered delivery handler that counts invocations.
|
||||||
|
var dbName = $"EsgDoubleDispatch_{Guid.NewGuid():N}";
|
||||||
|
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
|
||||||
|
using var keepAlive = new SqliteConnection(connStr);
|
||||||
|
keepAlive.Open();
|
||||||
|
var storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
|
||||||
|
await storage.InitializeAsync();
|
||||||
|
var sfOptions = new StoreAndForwardOptions
|
||||||
|
{
|
||||||
|
DefaultRetryInterval = TimeSpan.FromMinutes(10),
|
||||||
|
RetryTimerInterval = TimeSpan.FromMinutes(10),
|
||||||
|
};
|
||||||
|
var sf = new StoreAndForwardService(storage, sfOptions, NullLogger<StoreAndForwardService>.Instance);
|
||||||
|
|
||||||
|
var handlerInvocations = 0;
|
||||||
|
sf.RegisterDeliveryHandler(
|
||||||
|
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem,
|
||||||
|
_ => { Interlocked.Increment(ref handlerInvocations); return Task.FromResult(false); });
|
||||||
|
|
||||||
|
var client = new ExternalSystemClient(
|
||||||
|
_httpClientFactory, _repository,
|
||||||
|
NullLogger<ExternalSystemClient>.Instance,
|
||||||
|
storeAndForward: sf);
|
||||||
|
|
||||||
|
var result = await client.CachedCallAsync("TestAPI", "postData");
|
||||||
|
|
||||||
|
// The call already made one HTTP attempt; EnqueueAsync must NOT invoke the
|
||||||
|
// registered handler again synchronously (which would dispatch a 2nd request).
|
||||||
|
Assert.True(result.WasBuffered);
|
||||||
|
Assert.Equal(0, handlerInvocations);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── ExternalSystemGateway-002: per-system call timeout ──
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Call_SlowSystem_TimesOutAsTransientErrorWithinConfiguredWindow()
|
||||||
|
{
|
||||||
|
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
|
||||||
|
var method = new ExternalSystemMethod("getData", "GET", "/data") { Id = 1, ExternalSystemDefinitionId = 1 };
|
||||||
|
_repository.GetAllExternalSystemsAsync(Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new List<ExternalSystemDefinition> { system });
|
||||||
|
_repository.GetMethodsByExternalSystemIdAsync(1, Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new List<ExternalSystemMethod> { method });
|
||||||
|
|
||||||
|
// Handler that hangs far longer than the configured timeout and the test budget.
|
||||||
|
var httpClient = new HttpClient(new HangingHttpMessageHandler(TimeSpan.FromMinutes(10)));
|
||||||
|
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
|
||||||
|
|
||||||
|
// Configure a short timeout so the call must fail quickly.
|
||||||
|
var options = new ExternalSystemGatewayOptions { DefaultHttpTimeout = TimeSpan.FromMilliseconds(200) };
|
||||||
|
var client = new ExternalSystemClient(
|
||||||
|
_httpClientFactory, _repository,
|
||||||
|
NullLogger<ExternalSystemClient>.Instance,
|
||||||
|
options: Microsoft.Extensions.Options.Options.Create(options));
|
||||||
|
|
||||||
|
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||||
|
var result = await client.CallAsync("TestAPI", "getData");
|
||||||
|
sw.Stop();
|
||||||
|
|
||||||
|
Assert.False(result.Success);
|
||||||
|
Assert.Contains("Transient error", result.ErrorMessage);
|
||||||
|
Assert.Contains("Timeout", result.ErrorMessage);
|
||||||
|
// Must fail near the configured 200ms, well before HttpClient's default 100s.
|
||||||
|
Assert.True(sw.Elapsed < TimeSpan.FromSeconds(10),
|
||||||
|
$"Call took {sw.Elapsed}, expected to time out near the configured 200ms window");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Call_CallerCancellation_IsNotMisreportedAsTimeout()
|
||||||
|
{
|
||||||
|
var system = new ExternalSystemDefinition("TestAPI", "https://api.example.com", "none") { Id = 1 };
|
||||||
|
var method = new ExternalSystemMethod("getData", "GET", "/data") { Id = 1, ExternalSystemDefinitionId = 1 };
|
||||||
|
_repository.GetAllExternalSystemsAsync(Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new List<ExternalSystemDefinition> { system });
|
||||||
|
_repository.GetMethodsByExternalSystemIdAsync(1, Arg.Any<CancellationToken>())
|
||||||
|
.Returns(new List<ExternalSystemMethod> { method });
|
||||||
|
|
||||||
|
var httpClient = new HttpClient(new HangingHttpMessageHandler(TimeSpan.FromMinutes(10)));
|
||||||
|
_httpClientFactory.CreateClient(Arg.Any<string>()).Returns(httpClient);
|
||||||
|
|
||||||
|
var options = new ExternalSystemGatewayOptions { DefaultHttpTimeout = TimeSpan.FromMinutes(5) };
|
||||||
|
var client = new ExternalSystemClient(
|
||||||
|
_httpClientFactory, _repository,
|
||||||
|
NullLogger<ExternalSystemClient>.Instance,
|
||||||
|
options: Microsoft.Extensions.Options.Options.Create(options));
|
||||||
|
|
||||||
|
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
|
||||||
|
|
||||||
|
// Caller-initiated cancellation must surface as OperationCanceledException,
|
||||||
|
// not be swallowed as a transient timeout error.
|
||||||
|
await Assert.ThrowsAnyAsync<OperationCanceledException>(
|
||||||
|
() => client.CallAsync("TestAPI", "getData", cancellationToken: cts.Token));
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Test helper: mock HTTP message handler.
|
/// Test helper: mock HTTP message handler.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -238,4 +350,20 @@ public class ExternalSystemClientTests
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Test helper: an HTTP handler that hangs until cancelled (simulates a slow/hung system).
|
||||||
|
/// </summary>
|
||||||
|
private class HangingHttpMessageHandler : HttpMessageHandler
|
||||||
|
{
|
||||||
|
private readonly TimeSpan _delay;
|
||||||
|
|
||||||
|
public HangingHttpMessageHandler(TimeSpan delay) => _delay = delay;
|
||||||
|
|
||||||
|
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await Task.Delay(_delay, cancellationToken);
|
||||||
|
return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent("{}") };
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+1
@@ -23,6 +23,7 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="../../src/ScadaLink.ExternalSystemGateway/ScadaLink.ExternalSystemGateway.csproj" />
|
<ProjectReference Include="../../src/ScadaLink.ExternalSystemGateway/ScadaLink.ExternalSystemGateway.csproj" />
|
||||||
<ProjectReference Include="../../src/ScadaLink.Commons/ScadaLink.Commons.csproj" />
|
<ProjectReference Include="../../src/ScadaLink.Commons/ScadaLink.Commons.csproj" />
|
||||||
|
<ProjectReference Include="../../src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@@ -161,7 +161,62 @@ public class CentralHealthAggregatorTests
|
|||||||
_aggregator.ProcessReport(report);
|
_aggregator.ProcessReport(report);
|
||||||
|
|
||||||
var state = _aggregator.GetSiteState("site-1");
|
var state = _aggregator.GetSiteState("site-1");
|
||||||
Assert.Equal(42, state!.LatestReport.ScriptErrorCount);
|
Assert.Equal(42, state!.LatestReport!.ScriptErrorCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// HealthMonitoring-002 regression: SiteHealthState is mutated from multiple
|
||||||
|
/// threads (ProcessReport, MarkHeartbeat, CheckForOfflineSites). With a mutable
|
||||||
|
/// class and unsynchronized field writes, a snapshot read could observe a torn
|
||||||
|
/// or half-applied state. The state must be immutable and every transition an
|
||||||
|
/// atomic reference swap, so a snapshot is always internally consistent and the
|
||||||
|
/// monotonic sequence-number guard is never subverted by a lost update.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState()
|
||||||
|
{
|
||||||
|
const int iterations = 5_000;
|
||||||
|
// SiteHealthState must be an immutable record so handing the reference to
|
||||||
|
// UI callers (and reading it concurrently) is safe.
|
||||||
|
Assert.True(typeof(SiteHealthState).GetMethod("<Clone>$") != null,
|
||||||
|
"SiteHealthState must be an immutable record for safe concurrent reads.");
|
||||||
|
|
||||||
|
_aggregator.ProcessReport(MakeReport("site-1", 0));
|
||||||
|
|
||||||
|
var writer = Task.Run(() =>
|
||||||
|
{
|
||||||
|
for (long seq = 1; seq <= iterations; seq++)
|
||||||
|
_aggregator.ProcessReport(MakeReport("site-1", seq));
|
||||||
|
});
|
||||||
|
|
||||||
|
var heartbeater = Task.Run(() =>
|
||||||
|
{
|
||||||
|
for (int i = 0; i < iterations; i++)
|
||||||
|
_aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());
|
||||||
|
});
|
||||||
|
|
||||||
|
long maxObserved = 0;
|
||||||
|
var reader = Task.Run(() =>
|
||||||
|
{
|
||||||
|
for (int i = 0; i < iterations; i++)
|
||||||
|
{
|
||||||
|
var state = _aggregator.GetSiteState("site-1");
|
||||||
|
if (state == null) continue;
|
||||||
|
// A consistent snapshot: the stored report's sequence number must
|
||||||
|
// always match the state's LastSequenceNumber (no half-applied update).
|
||||||
|
Assert.Equal(state.LastSequenceNumber, state.LatestReport!.SequenceNumber);
|
||||||
|
if (state.LastSequenceNumber > maxObserved)
|
||||||
|
maxObserved = state.LastSequenceNumber;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
await Task.WhenAll(writer, heartbeater, reader);
|
||||||
|
|
||||||
|
// The final state must reflect the highest sequence — no lost update.
|
||||||
|
var final = _aggregator.GetSiteState("site-1");
|
||||||
|
Assert.Equal(iterations, final!.LastSequenceNumber);
|
||||||
|
Assert.Equal(iterations, final.LatestReport!.SequenceNumber);
|
||||||
|
Assert.True(final.IsOnline);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -173,11 +228,11 @@ public class CentralHealthAggregatorTests
|
|||||||
_aggregator.ProcessReport(MakeReport("site-1", 10));
|
_aggregator.ProcessReport(MakeReport("site-1", 10));
|
||||||
_aggregator.ProcessReport(MakeReport("site-1", 1));
|
_aggregator.ProcessReport(MakeReport("site-1", 1));
|
||||||
|
|
||||||
var state = _aggregator.GetSiteState("site-1");
|
Assert.Equal(10, _aggregator.GetSiteState("site-1")!.LastSequenceNumber);
|
||||||
Assert.Equal(10, state!.LastSequenceNumber);
|
|
||||||
|
|
||||||
// Once it exceeds the old max, it works again
|
// Once it exceeds the old max, it works again. SiteHealthState is an
|
||||||
|
// immutable snapshot, so re-fetch to observe the new state.
|
||||||
_aggregator.ProcessReport(MakeReport("site-1", 11));
|
_aggregator.ProcessReport(MakeReport("site-1", 11));
|
||||||
Assert.Equal(11, state.LastSequenceNumber);
|
Assert.Equal(11, _aggregator.GetSiteState("site-1")!.LastSequenceNumber);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,9 @@
|
|||||||
|
using Microsoft.Data.Sqlite;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using ScadaLink.Commons.Messages.Health;
|
using ScadaLink.Commons.Messages.Health;
|
||||||
using ScadaLink.Commons.Types.Enums;
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using ScadaLink.StoreAndForward;
|
||||||
|
|
||||||
namespace ScadaLink.HealthMonitoring.Tests;
|
namespace ScadaLink.HealthMonitoring.Tests;
|
||||||
|
|
||||||
@@ -136,6 +138,76 @@ public class HealthReportSenderTests
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// HealthMonitoring-001 regression: the documented "store-and-forward buffer
|
||||||
|
/// depth" metric (pending messages by category) must actually be populated in
|
||||||
|
/// the emitted report. Previously SetStoreAndForwardDepths had no callers, so
|
||||||
|
/// StoreAndForwardBufferDepths was always empty. The sender must query the S&F
|
||||||
|
/// engine's per-category depth API and include it alongside the parked count.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task ReportsIncludeStoreAndForwardBufferDepthsFromStorage()
|
||||||
|
{
|
||||||
|
var dbName = $"HealthSfDepth_{Guid.NewGuid():N}";
|
||||||
|
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
|
||||||
|
// Keep one connection alive so the in-memory DB persists for the test.
|
||||||
|
using var keepAlive = new SqliteConnection(connStr);
|
||||||
|
keepAlive.Open();
|
||||||
|
|
||||||
|
var storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
|
||||||
|
await storage.InitializeAsync();
|
||||||
|
|
||||||
|
// Two pending ExternalSystem messages and one pending Notification message.
|
||||||
|
await storage.EnqueueAsync(MakePendingMessage("m1", StoreAndForwardCategory.ExternalSystem));
|
||||||
|
await storage.EnqueueAsync(MakePendingMessage("m2", StoreAndForwardCategory.ExternalSystem));
|
||||||
|
await storage.EnqueueAsync(MakePendingMessage("m3", StoreAndForwardCategory.Notification));
|
||||||
|
|
||||||
|
var transport = new FakeTransport();
|
||||||
|
var collector = new SiteHealthCollector();
|
||||||
|
collector.SetActiveNode(true);
|
||||||
|
var options = Options.Create(new HealthMonitoringOptions
|
||||||
|
{
|
||||||
|
ReportInterval = TimeSpan.FromMilliseconds(50)
|
||||||
|
});
|
||||||
|
|
||||||
|
var sender = new HealthReportSender(
|
||||||
|
collector,
|
||||||
|
transport,
|
||||||
|
options,
|
||||||
|
NullLogger<HealthReportSender>.Instance,
|
||||||
|
new FakeSiteIdentityProvider(),
|
||||||
|
sfStorage: storage);
|
||||||
|
|
||||||
|
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await sender.StartAsync(cts.Token);
|
||||||
|
await Task.Delay(250, CancellationToken.None);
|
||||||
|
await sender.StopAsync(CancellationToken.None);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { }
|
||||||
|
|
||||||
|
Assert.True(transport.SentReports.Count >= 1);
|
||||||
|
var depths = transport.SentReports[^1].StoreAndForwardBufferDepths;
|
||||||
|
Assert.Equal(2, depths[nameof(StoreAndForwardCategory.ExternalSystem)]);
|
||||||
|
Assert.Equal(1, depths[nameof(StoreAndForwardCategory.Notification)]);
|
||||||
|
Assert.False(depths.ContainsKey(nameof(StoreAndForwardCategory.CachedDbWrite)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StoreAndForwardMessage MakePendingMessage(string id, StoreAndForwardCategory category) =>
|
||||||
|
new()
|
||||||
|
{
|
||||||
|
Id = id,
|
||||||
|
Category = category,
|
||||||
|
Target = "target",
|
||||||
|
PayloadJson = "{}",
|
||||||
|
RetryCount = 0,
|
||||||
|
MaxRetries = 50,
|
||||||
|
RetryIntervalMs = 30_000,
|
||||||
|
CreatedAt = DateTimeOffset.UtcNow,
|
||||||
|
Status = StoreAndForwardMessageStatus.Pending
|
||||||
|
};
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public void InitialSequenceNumberSeededWithUnixMs()
|
public void InitialSequenceNumberSeededWithUnixMs()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -10,6 +10,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="coverlet.collector" />
|
<PackageReference Include="coverlet.collector" />
|
||||||
|
<PackageReference Include="Microsoft.Data.Sqlite" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||||
<PackageReference Include="Microsoft.NET.Test.Sdk" />
|
<PackageReference Include="Microsoft.NET.Test.Sdk" />
|
||||||
|
|||||||
@@ -110,6 +110,54 @@ public class HealthCheckTests : IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task HealthReady_Endpoint_ExcludesActiveNodeCheck()
|
||||||
|
{
|
||||||
|
// Host-001 regression: /health/ready must reflect cluster membership + DB
|
||||||
|
// connectivity only (REQ-HOST-4a), NOT cluster leadership. The leader-only
|
||||||
|
// "active-node" check belongs solely to /health/active. If /health/ready
|
||||||
|
// included "active-node", a fully operational standby central node would
|
||||||
|
// permanently report 503, breaking load-balancer failover readiness.
|
||||||
|
var previousEnv = Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT");
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", "Central");
|
||||||
|
|
||||||
|
var factory = new WebApplicationFactory<Program>()
|
||||||
|
.WithWebHostBuilder(builder =>
|
||||||
|
{
|
||||||
|
builder.ConfigureAppConfiguration((context, config) =>
|
||||||
|
{
|
||||||
|
config.AddInMemoryCollection(new Dictionary<string, string?>
|
||||||
|
{
|
||||||
|
["ScadaLink:Node:NodeHostname"] = "localhost",
|
||||||
|
["ScadaLink:Node:RemotingPort"] = "0",
|
||||||
|
["ScadaLink:Cluster:SeedNodes:0"] = "akka.tcp://scadalink@localhost:2551",
|
||||||
|
["ScadaLink:Cluster:SeedNodes:1"] = "akka.tcp://scadalink@localhost:2552",
|
||||||
|
["ScadaLink:Database:SkipMigrations"] = "true",
|
||||||
|
});
|
||||||
|
});
|
||||||
|
builder.UseSetting("ScadaLink:Node:Role", "Central");
|
||||||
|
builder.UseSetting("ScadaLink:Database:SkipMigrations", "true");
|
||||||
|
});
|
||||||
|
_disposables.Add(factory);
|
||||||
|
|
||||||
|
var client = factory.CreateClient();
|
||||||
|
_disposables.Add(client);
|
||||||
|
|
||||||
|
var response = await client.GetAsync("/health/ready");
|
||||||
|
var body = await response.Content.ReadAsStringAsync();
|
||||||
|
|
||||||
|
// The readiness body lists each executed check by name in its entries map.
|
||||||
|
// The leader-only "active-node" check must not be among them.
|
||||||
|
Assert.DoesNotContain("active-node", body);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", previousEnv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task ActiveNodeHealthCheck_SystemNotStarted_ReturnsUnhealthy()
|
public async Task ActiveNodeHealthCheck_SystemNotStarted_ReturnsUnhealthy()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user