From f936f55f51af43c1c6781a3491b1d8c984b4b86e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 28 May 2026 05:20:13 -0400 Subject: [PATCH] fix(concurrency): close 8 race / thread-safety findings across CD, DCL, SR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CD-015: rewrite NotificationOutboxRepository.InsertIfNotExistsAsync as raw-SQL IF NOT EXISTS … INSERT with SqlException 2601/2627 catch, ending the at-least-once livelock on the site→central notification handoff. DCL-018/019/020/021/022: add _subscribesInFlight guard so concurrent same-tag subscribes don't orphan an adapter handle; delete the latent dead _subscriptionHandles dictionary; stop double-counting _totalSubscribed when an unresolved tag is promoted via another instance; release adapter handles on mid-flight unsubscribe; gate the tag-resolution retry timer with IsTimerActive so subscribe bursts don't reset it into starvation. SR-020: add _terminatingActorsByName shadow so a third deploy arriving during a pending redeploy doesn't crash on InvalidActorNameException — displaced senders get a Failed/superseded response and the latest command wins on Terminated. SR-024: split OperationTrackingStore reads from writes (fresh SqliteConnection per GetStatusAsync) so long writes don't block status queries; rewrite Dispose to drop the sync-over-async bridge that could deadlock on a non-reentrant SyncContext; Interlocked.Exchange makes the dispose-once flag race-safe across both paths. --- .../ConfigurationDatabase/findings.md | 15 +- code-reviews/DataConnectionLayer/findings.md | 67 +++- code-reviews/README.md | 24 +- code-reviews/SiteRuntime/findings.md | 32 +- .../NotificationOutboxRepository.cs | 75 ++++- .../Actors/DataConnectionActor.cs | 170 ++++++++-- .../Adapters/OpcUaDataConnection.cs | 23 +- .../Actors/DeploymentManagerActor.cs | 47 +++ .../Tracking/OperationTrackingStore.cs | 201 +++++++----- ...icationOutboxRepositoryIntegrationTests.cs | 132 ++++++++ .../RepositoryCoverageTests.cs | 30 +- .../DataConnectionActorTests.cs | 300 ++++++++++++++++++ .../OpcUaDataConnectionTests.cs | 36 +++ .../Actors/DeploymentManagerRedeployTests.cs | 59 ++++ .../Tracking/OperationTrackingStoreTests.cs | 111 +++++++ 15 files changed, 1152 insertions(+), 170 deletions(-) create mode 100644 tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/NotificationOutboxRepositoryIntegrationTests.cs diff --git a/code-reviews/ConfigurationDatabase/findings.md b/code-reviews/ConfigurationDatabase/findings.md index a13cb08e..a6b83af9 100644 --- a/code-reviews/ConfigurationDatabase/findings.md +++ b/code-reviews/ConfigurationDatabase/findings.md @@ -891,9 +891,22 @@ columns) — asserting each column keeps an `EncryptedStringConverter`. |--|--| | Severity | High | | Category | Concurrency & thread safety | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs:33-45` | +**Resolution** — rewrote `InsertIfNotExistsAsync` as a single raw-SQL +`IF NOT EXISTS (...) INSERT` matching the +`AuditLogRepository.InsertIfNotExistsAsync` and +`SiteCallAuditRepository.UpsertAsync` patterns, with a `SqlException` +catch on numbers 2601 (unique-index violation) and 2627 +(primary-key/unique-constraint violation) returning `false`. Concurrent +losers are logged at Debug and treated as no-ops, eliminating the +site-retry livelock. Two SQLite-targeted assertions in +`RepositoryCoverageTests` were migrated to a new MS SQL-fixture file +`tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/NotificationOutboxRepositoryIntegrationTests.cs`, +which also adds a 50-way parallel race test verifying exactly one row +lands and no exception bubbles. + **Description** `InsertIfNotExistsAsync` does `AnyAsync(x => x.NotificationId == n.NotificationId)`, diff --git a/code-reviews/DataConnectionLayer/findings.md b/code-reviews/DataConnectionLayer/findings.md index c9b24cf3..0d462de7 100644 --- a/code-reviews/DataConnectionLayer/findings.md +++ b/code-reviews/DataConnectionLayer/findings.md @@ -952,9 +952,22 @@ pre-fix code (the batch throws, no map returned) and passes after; |--|--| | Severity | High | | Category | Concurrency & thread safety | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:557,564-594,653` | +**Resolution** — added a `_subscribesInFlight` HashSet mirroring the +existing `_resolutionInFlight` pattern. `HandleSubscribe` now partitions +each request's tags on the actor thread into "this request will +SubscribeAsync" vs. "already subscribed by us OR by another in-flight +request"; the second arrival sees the tag in `_subscribesInFlight` and +treats it as `AlreadySubscribed: true` without issuing a duplicate +adapter call. `HandleSubscribeCompleted` removes each +non-AlreadySubscribed result from the set; `ReSubscribeAll` clears it on +reconnect. Regression test +`DCL018_ConcurrentSubscribes_SameTag_DifferentInstances_IssueOneAdapterSubscribe` +parks the first subscribe in flight, fires a second for the same tag, +and asserts exactly one adapter SubscribeAsync call. + **Description** `HandleSubscribe` snapshots `_subscriptionIds.Keys` into a local `alreadySubscribed` @@ -1004,9 +1017,19 @@ exactly one `_adapter.SubscribeAsync(tag, ...)` call (and no orphan subscription |--|--| | Severity | Medium | | Category | Concurrency & thread safety | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs:31,167,177`, `src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs:163-164` | +**Resolution** — deleted the dead `_subscriptionHandles` field outright. +Subscription bookkeeping lives in `RealOpcUaClient._monitoredItems` / +`_callbacks` (already `ConcurrentDictionary` per DCL-003) at the device +layer and `DataConnectionActor._subscriptionIds` at the actor layer; +the adapter had no live reader and the field was a latent +race-condition trap. Added structural regression test +`DCL019_OpcUaDataConnection_HasNoNonConcurrentSharedDictionary` that +reflects over the adapter's fields and fails if any plain +`Dictionary<,>` is reintroduced. + **Description** `OpcUaDataConnection._subscriptionHandles` is declared as `Dictionary` mutated from concurrent thread-pool continuations | -| DataConnectionLayer-020 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `HandleSubscribeCompleted` double-counts `_totalSubscribed` when a previously-unresolved tag is resolved by a different instance's subscribe | -| DataConnectionLayer-021 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `HandleSubscribeCompleted` re-creates and leaks `_subscriptionsByInstance` entry when the instance unsubscribed mid-flight | -| DataConnectionLayer-022 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `HandleSubscribeCompleted` and `HandleTagResolutionFailed` reset the tag-resolution retry timer on every call via `StartPeriodicTimer`, starving the retry under subscribe bursts | | DeploymentManager-019 | [DeploymentManager](DeploymentManager/findings.md) | Lifecycle command timeout writes no audit entry | | ExternalSystemGateway-019 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | `HttpClient.Timeout` is not set; `DefaultHttpTimeout` > 100s is silently clipped by the framework default | | ExternalSystemGateway-020 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | `JsonElementToParameterValue` silently downcasts non-Int64 JSON numbers to `double`, losing precision for `decimal` SQL parameters on retry | @@ -144,10 +138,8 @@ _None open._ | SiteCallAudit-003 | [SiteCallAudit](SiteCallAudit/findings.md) | `OnUpsertAsync` does not refresh `IngestedAtUtc`; direct-write callers must remember to stamp it | | SiteEventLogging-015 | [SiteEventLogging](SiteEventLogging/findings.md) | Background write queue is unbounded; can grow without limit under sustained writer slowness | | SiteEventLogging-017 | [SiteEventLogging](SiteEventLogging/findings.md) | Central client's `PageSize` is unbounded; defeats the "configurable page size" design rationale | -| SiteRuntime-020 | [SiteRuntime](SiteRuntime/findings.md) | Second `DeployInstanceCommand` arriving during a pending redeploy races the still-terminating actor on its name | | SiteRuntime-021 | [SiteRuntime](SiteRuntime/findings.md) | `HandleDeployArtifacts` updates `DataConnections` in SQLite but never sends `CreateConnectionCommand` to the DCL | | SiteRuntime-022 | [SiteRuntime](SiteRuntime/findings.md) | `AuditingDbCommand.DbConnection.set` uses reflection to read `AuditingDbConnection._inner` | -| SiteRuntime-024 | [SiteRuntime](SiteRuntime/findings.md) | `OperationTrackingStore` serialises all writes through one connection + `SemaphoreSlim`, and `Dispose()` does sync-over-async | | StoreAndForward-019 | [StoreAndForward](StoreAndForward/findings.md) | Notifications park after `DefaultMaxRetries` exhaustion, contradicting "retried until central acks" | | StoreAndForward-020 | [StoreAndForward](StoreAndForward/findings.md) | `RetryParkedMessageAsync` skips standby replication when the message is deleted between local update and re-load | | StoreAndForward-021 | [StoreAndForward](StoreAndForward/findings.md) | Design doc claims the Operation Tracking Table lives in StoreAndForward but the implementation is in SiteRuntime | diff --git a/code-reviews/SiteRuntime/findings.md b/code-reviews/SiteRuntime/findings.md index 90e26b13..f1dbb6f9 100644 --- a/code-reviews/SiteRuntime/findings.md +++ b/code-reviews/SiteRuntime/findings.md @@ -954,9 +954,22 @@ Instance Actor produces no `InstanceLifecycleResponse` for either command |--|--| | Severity | Medium | | Category | Concurrency & thread safety | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs:285`, `src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs:971` | +**Resolution** — added a name → terminating-actor-ref shadow +(`_terminatingActorsByName`) populated when `HandleDeploy` stops the +predecessor and cleared in `HandleTerminated`. `HandleDeploy` now +detects the mid-termination state before falling through to +`ApplyDeployment(fresh)`: on hit it tells the displaced +`PendingRedeploy.OriginalSender` a `DeploymentStatus.Failed` / +"superseded by newer deployment …" response and overwrites the buffered +pending command (last-write-wins). Regression test +`SR020_ThreeRapidDeploys_DoNotThrowInvalidActorNameException_LatestWins` +fires three rapid deploys, asserts the middle deploy is told it was +superseded, the latest succeeds, and the resulting instance is operable +(DisableInstanceCommand works). + **Description** The SiteRuntime-003 fix makes `HandleDeploy` watch + stop a running Instance @@ -1181,9 +1194,24 @@ on the host's regional settings. |--|--| | Severity | Medium | | Category | Performance & resource management | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs:39`, `src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs:360` | +**Resolution** — split reads from writes: the single owned +`_writeConnection` + `_writeGate` still serialises writers, but +`GetStatusAsync` now opens a fresh `SqliteConnection` per call against +the shared connection string (mirroring `SiteStorageService`) so reads +never block on an in-flight write. Sync `Dispose` was rewritten to NOT +bridge to async — the dispose-once flag is an `int` flipped with +`Interlocked.Exchange`, the synchronous path disposes +`_writeConnection` + `_writeGate` directly without acquiring the gate, +and `DisposeAsync` retains the gate-drain semantics for graceful +shutdown. Both paths are idempotent; the second call short-circuits via +the interlocked flag. Tests: +`SR024_ConcurrentReads_DoNotBlockOnInFlightWrite`, +`SR024_SyncDispose_DoesNotDeadlock_WhenInvokedFromFreshThread`, and +`SR024_AsyncDispose_DoesNotDeadlock_AndIsIdempotent`. + **Description** `OperationTrackingStore` owns exactly one `SqliteConnection` and gates every diff --git a/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs b/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs index f97fe889..2a7c4976 100644 --- a/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs +++ b/src/ScadaLink.ConfigurationDatabase/Repositories/NotificationOutboxRepository.cs @@ -1,4 +1,7 @@ +using Microsoft.Data.SqlClient; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using ScadaLink.Commons.Entities.Notifications; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Types.Enums; @@ -12,7 +15,20 @@ namespace ScadaLink.ConfigurationDatabase.Repositories; /// public class NotificationOutboxRepository : INotificationOutboxRepository { + // SQL Server duplicate-key error numbers, matching the AuditLogRepository + // and SiteCallAuditRepository race-fixes. 2601 is a unique-index violation; + // 2627 is a primary-key/unique-constraint violation. The IF NOT EXISTS … + // INSERT pattern has a check-then-act race window — two sessions can both + // pass the EXISTS check and then both attempt the INSERT — and the loser + // surfaces as one of these. The site→central handoff is documented + // at-least-once with insert-if-not-exists, so the collision IS the expected + // contention mode; idempotency demands we swallow them rather than let the + // site retry the same NotificationId forever. + private const int SqlErrorUniqueIndexViolation = 2601; + private const int SqlErrorPrimaryKeyViolation = 2627; + private readonly ScadaLinkDbContext _context; + private readonly ILogger _logger; // Statuses that represent a finished notification lifecycle. Non-terminal is the complement. private static readonly NotificationStatus[] TerminalStatuses = @@ -24,24 +40,67 @@ public class NotificationOutboxRepository : INotificationOutboxRepository /// Initializes a new instance of with the given EF Core context. /// The EF Core database context. - public NotificationOutboxRepository(ScadaLinkDbContext context) + /// Optional logger instance. + public NotificationOutboxRepository(ScadaLinkDbContext context, ILogger? logger = null) { _context = context ?? throw new ArgumentNullException(nameof(context)); + _logger = logger ?? NullLogger.Instance; } /// public async Task InsertIfNotExistsAsync(Notification n, CancellationToken cancellationToken = default) { - var exists = await _context.Notifications - .AnyAsync(x => x.NotificationId == n.NotificationId, cancellationToken); - if (exists) + if (n is null) { - return false; + throw new ArgumentNullException(nameof(n)); } - await _context.Notifications.AddAsync(n, cancellationToken); - await _context.SaveChangesAsync(cancellationToken); - return true; + // Enum columns are stored as varchar(32) (HasConversion()); convert + // in C# rather than relying on parameter type inference (SqlClient would + // otherwise bind enums as int by default and break the column conversion). + var type = n.Type.ToString(); + var status = n.Status.ToString(); + + // FormattableString interpolation parameterises every value (no concatenation), + // so this is safe against injection even for the string columns. + try + { + var rowsAffected = await _context.Database.ExecuteSqlInterpolatedAsync( + $@"IF NOT EXISTS (SELECT 1 FROM dbo.Notifications WHERE NotificationId = {n.NotificationId}) +INSERT INTO dbo.Notifications + (NotificationId, Type, ListName, Subject, Body, TypeData, Status, RetryCount, LastError, + ResolvedTargets, SourceSiteId, SourceNode, SourceInstanceId, SourceScript, + OriginExecutionId, OriginParentExecutionId, + SiteEnqueuedAt, CreatedAt, LastAttemptAt, NextAttemptAt, DeliveredAt) +VALUES + ({n.NotificationId}, {type}, {n.ListName}, {n.Subject}, {n.Body}, {n.TypeData}, {status}, {n.RetryCount}, {n.LastError}, + {n.ResolvedTargets}, {n.SourceSiteId}, {n.SourceNode}, {n.SourceInstanceId}, {n.SourceScript}, + {n.OriginExecutionId}, {n.OriginParentExecutionId}, + {n.SiteEnqueuedAt}, {n.CreatedAt}, {n.LastAttemptAt}, {n.NextAttemptAt}, {n.DeliveredAt});", + cancellationToken); + + // rowsAffected == 1 -> we inserted; 0 -> a prior row was already there + // (IF NOT EXISTS short-circuited the INSERT). + return rowsAffected == 1; + } + catch (SqlException ex) when ( + ex.Number == SqlErrorUniqueIndexViolation + || ex.Number == SqlErrorPrimaryKeyViolation) + { + // Two concurrent sessions both passed IF NOT EXISTS and both + // attempted the INSERT — the loser raises 2601/2627 against the + // NotificationId primary key. First-write-wins idempotency is the + // documented contract (the site→central handoff is at-least-once, + // and the actor discards the return value), so the race outcome is + // semantically a no-op. Returning false here matches the + // "row already existed" branch of the success path. + _logger.LogDebug( + ex, + "InsertIfNotExistsAsync swallowed duplicate-key violation (error {SqlErrorNumber}) for NotificationId {NotificationId}; treating as no-op.", + ex.Number, + n.NotificationId); + return false; + } } /// diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs index d32343d2..94b535ae 100644 --- a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs +++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs @@ -71,6 +71,22 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers /// private readonly HashSet _resolutionInFlight = new(); + /// + /// DataConnectionLayer-018: tags whose initial SubscribeAsync (issued from + /// ) is currently in flight. Two parallel + /// SubscribeTagsRequest messages for different instances sharing a tag + /// path would otherwise both observe "not subscribed" against + /// (the in-flight task has not yet posted its + /// ), both call _adapter.SubscribeAsync, + /// and the second subscription id gets silently dropped at the existing + /// _subscriptionIds.ContainsKey guard in + /// — orphaning the adapter's monitored item (duplicate notifications + leaked + /// memory until the connection drops). This set is read+written only on the + /// actor thread and cleared in for symmetry with + /// . + /// + private readonly HashSet _subscribesInFlight = new(); + /// /// Subscribers: instanceUniqueName → IActorRef (the Instance Actor). /// @@ -550,27 +566,43 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // from this adapter can be distinguished from a later (post-failover) adapter. var generation = _adapterGeneration; - // Snapshot the already-subscribed tag set on the actor thread. The background - // task below must NOT read or mutate actor state — it performs only adapter - // I/O and reports results back via a SubscribeCompleted message, which is - // applied to actor state on the actor thread (see HandleSubscribeCompleted). - var alreadySubscribed = new HashSet(_subscriptionIds.Keys); + // DataConnectionLayer-018: partition tags on the actor thread into "this + // request will issue _adapter.SubscribeAsync" vs. "already subscribed (by us + // or by another in-flight SubscribeTagsRequest)". A tag that is already in + // _subscriptionIds OR currently in _subscribesInFlight is treated as + // AlreadySubscribed — the eventual SubscribeCompleted of the in-flight + // request will populate _subscriptionIds, at which point a subsequent + // unsubscribe by either instance correctly references the adapter handle. + // The background task below must NOT read or mutate actor state — these + // partitioned lists are the only state it sees. + var tagsToSubscribe = new List(request.TagPaths.Count); + var preResolvedResults = new List(); + foreach (var tagPath in request.TagPaths) + { + if (_subscriptionIds.ContainsKey(tagPath) || _subscribesInFlight.Contains(tagPath)) + { + preResolvedResults.Add(new SubscribeTagResult( + tagPath, AlreadySubscribed: true, Success: true, null, null)); + } + else + { + tagsToSubscribe.Add(tagPath); + _subscribesInFlight.Add(tagPath); + } + } Task.Run(async () => { var results = new List(request.TagPaths.Count); - var tagsToSeed = new List(); - - foreach (var tagPath in request.TagPaths) + results.AddRange(preResolvedResults); + var tagsToSeed = new List(preResolvedResults.Count + tagsToSubscribe.Count); + foreach (var r in preResolvedResults) { - if (alreadySubscribed.Contains(tagPath)) - { - // Already subscribed by another instance — just track for this one. - results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: true, Success: true, null, null)); - tagsToSeed.Add(tagPath); - continue; - } + tagsToSeed.Add(r.TagPath); + } + foreach (var tagPath in tagsToSubscribe) + { try { var subId = await _adapter.SubscribeAsync(tagPath, (path, value) => @@ -628,9 +660,45 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers var instanceName = msg.Request.InstanceUniqueName; if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags)) { - // The instance was unsubscribed while the subscribe I/O was in flight. - instanceTags = new HashSet(); - _subscriptionsByInstance[instanceName] = instanceTags; + // DataConnectionLayer-021: the instance was unsubscribed while the + // subscribe I/O was in flight. Re-creating the per-instance entry and + // applying counter/handle mutations here would permanently leak state + // — _subscriptionsByInstance[instanceName] resurrected with no + // subscriber to receive callbacks, _tagSubscriberCount inflated forever + // (no future HandleUnsubscribe will drop it), and _totalSubscribed / + // _resolvedTags drifting above the real instance count across the + // adapter lifetime (also re-issued by ReSubscribeAll on every + // reconnect). Instead: drop all state mutations for this stale + // message and release the adapter-level monitored items we just + // created so the device doesn't keep streaming change notifications + // for a tag nobody is subscribed to. + _log.Warning( + "[{0}] SubscribeCompleted arrived for instance {1} but the instance " + + "was unsubscribed while the subscribe was in flight; releasing " + + "{2} adapter handle(s) and discarding state mutations.", + _connectionName, instanceName, msg.Results.Count(r => r.Success && !r.AlreadySubscribed)); + + foreach (var result in msg.Results) + { + // DCL-018: clear in-flight markers we placed in HandleSubscribe. + if (!result.AlreadySubscribed) + _subscribesInFlight.Remove(result.TagPath); + + // Fire-and-forget release of any subscription id this request + // genuinely created. AlreadySubscribed=true means another caller + // owns the adapter handle and unsubscribing it would break them. + if (result is { Success: true, AlreadySubscribed: false, SubscriptionId: not null }) + { + _ = _adapter.UnsubscribeAsync(result.SubscriptionId); + } + } + + // The original sender is already gone (unsubscribed). Telling a dead + // ref produces a dead letter, which is the harmless and observable + // outcome — but skipping the reply altogether keeps dead-letter noise + // out of the log when this race fires in the normal disable/redeploy + // path. The unsubscribe message did NOT request a response of its own. + return false; } // DataConnectionLayer-004: if any tag failed because the adapter is not @@ -641,6 +709,15 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers foreach (var result in msg.Results) { + // DataConnectionLayer-018: a result with AlreadySubscribed: false means + // this request was responsible for the SubscribeAsync call — the tag + // was added to _subscribesInFlight in HandleSubscribe. Clear it now so + // a later SubscribeTagsRequest for the same tag isn't forever treated + // as in-flight. AlreadySubscribed: true tags were not added to the + // set (another request owned the in-flight slot). + if (!result.AlreadySubscribed) + _subscribesInFlight.Remove(result.TagPath); + // DataConnectionLayer-008: only a tag newly added to THIS instance's set // increments the reference count, so the count stays an accurate "number // of distinct instances subscribed to this tag". @@ -656,8 +733,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers if (result.Success) { _subscriptionIds[result.TagPath] = result.SubscriptionId!; - _totalSubscribed++; - _resolvedTags++; + + // DataConnectionLayer-020: distinguish fresh subscribe from + // unresolved → resolved promotion. If an earlier instance's + // subscribe for this tag had failed at the resolution layer + // (the tag was already added to _unresolvedTags AND already + // counted in _totalSubscribed), this success transitions it + // from unresolved to resolved — increment _resolvedTags ONLY. + // Incrementing _totalSubscribed again here would over-count by + // one until HandleTagResolutionSucceeded reconciled. Mirrors + // HandleTagResolutionSucceeded's promotion shape so both paths + // resolve a previously-failed tag identically. + if (_unresolvedTags.Remove(result.TagPath)) + { + _resolutionInFlight.Remove(result.TagPath); + _resolvedTags++; + } + else + { + _totalSubscribed++; + _resolvedTags++; + } } else if (result.ConnectionLevelFailure) { @@ -670,9 +766,17 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers else { // WP-12: genuine tag resolution failure — mark unresolved so the - // periodic retry timer picks it up. - _unresolvedTags.Add(result.TagPath); - _totalSubscribed++; + // periodic retry timer picks it up. DataConnectionLayer-020: + // only increment _totalSubscribed when the tag is genuinely + // newly-tracked. A second instance failing to resolve a tag the + // first instance already added to _unresolvedTags is the same + // logical tag, counted once — bumping _totalSubscribed again + // would over-report TotalSubscribedTags forever. + var newlyUnresolved = _unresolvedTags.Add(result.TagPath); + if (newlyUnresolved) + { + _totalSubscribed++; + } _log.Debug("[{0}] Tag resolution failed for {1}: {2}", _connectionName, result.TagPath, result.Error); @@ -688,7 +792,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers } // Start the tag-resolution retry timer if any tags are unresolved. - if (_unresolvedTags.Count > 0) + // DataConnectionLayer-022: StartPeriodicTimer with an existing key CANCELS + // and replaces the prior timer, so a fan-out of SubscribeTagsRequests + // arriving faster than TagResolutionRetryInterval would keep resetting + // the timer and starve the retry indefinitely. Gating on IsTimerActive + // means the first failure starts the timer and subsequent failures + // simply pile onto _unresolvedTags without restarting the clock. + if (_unresolvedTags.Count > 0 && !Timers.IsTimerActive("tag-resolution-retry")) { Timers.StartPeriodicTimer( "tag-resolution-retry", @@ -918,6 +1028,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _subscriptionIds.Clear(); _unresolvedTags.Clear(); _resolutionInFlight.Clear(); + // DataConnectionLayer-018: symmetric with _resolutionInFlight — any pending + // initial-subscribe completions from the previous adapter generation will + // post SubscribeCompleted to the actor, but ReSubscribeAll has just emptied + // the in-flight tracking; the stale completion simply has nothing to + // remove (idempotent HashSet.Remove on a missing key). + _subscribesInFlight.Clear(); _resolvedTags = 0; // DataConnectionLayer-006: reset the quality tracking too. Otherwise tags @@ -987,8 +1103,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // it is eligible for the next retry tick again. _resolutionInFlight.Remove(msg.TagPath); - // Track as unresolved so periodic retry picks it up - if (_unresolvedTags.Add(msg.TagPath)) + // Track as unresolved so periodic retry picks it up. DCL-022: gate on + // IsTimerActive so a stream of TagResolutionFailed events doesn't keep + // cancelling and re-starting the timer faster than its own interval. + if (_unresolvedTags.Add(msg.TagPath) && !Timers.IsTimerActive("tag-resolution-retry")) { Timers.StartPeriodicTimer( "tag-resolution-retry", diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs index 33b5f794..bd8c835e 100644 --- a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs +++ b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs @@ -25,10 +25,16 @@ public class OpcUaDataConnection : IDataConnection private string _endpointUrl = string.Empty; private ConnectionHealth _status = ConnectionHealth.Disconnected; - /// - /// Maps subscription IDs to their tag paths for cleanup. - /// - private readonly Dictionary _subscriptionHandles = new(); + // DataConnectionLayer-019: the previous _subscriptionHandles Dictionary was + // dead state — written by SubscribeAsync, removed by UnsubscribeAsync, but + // never read anywhere. Plain Dictionary writes from thread-pool continuations + // after await are racy (concurrent resize is undefined: InvalidOperationException, + // bucket corruption, or silently lost entries). Bookkeeping for subscriptions + // lives at two genuine layers: RealOpcUaClient._monitoredItems/_callbacks + // (already ConcurrentDictionary per DCL-003) at the device adapter, and + // DataConnectionActor._subscriptionIds at the actor — both authoritative. + // The adapter has no need for a third copy; the field is removed rather than + // converted to ConcurrentDictionary because there is no reader. private StaleTagMonitor? _staleMonitor; private string? _heartbeatSubscriptionId; @@ -155,7 +161,10 @@ public class OpcUaDataConnection : IDataConnection { EnsureConnected(); - var subscriptionId = await _client!.CreateSubscriptionAsync( + // DataConnectionLayer-019: subscriptionId is returned directly to the + // caller (DataConnectionActor stores it in _subscriptionIds). No local + // bookkeeping is kept here — see the field-deletion note above. + return await _client!.CreateSubscriptionAsync( tagPath, (nodeId, value, timestamp, statusCode) => { @@ -163,9 +172,6 @@ public class OpcUaDataConnection : IDataConnection callback(tagPath, new TagValue(value, quality, new DateTimeOffset(timestamp, TimeSpan.Zero))); }, cancellationToken); - - _subscriptionHandles[subscriptionId] = tagPath; - return subscriptionId; } /// @@ -174,7 +180,6 @@ public class OpcUaDataConnection : IDataConnection if (_client != null) { await _client.RemoveSubscriptionAsync(subscriptionId, cancellationToken); - _subscriptionHandles.Remove(subscriptionId); } } diff --git a/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs b/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs index b4a52d2b..03b39944 100644 --- a/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs +++ b/src/ScadaLink.SiteRuntime/Actors/DeploymentManagerActor.cs @@ -53,6 +53,22 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers /// confirms the child has fully stopped (SiteRuntime-003). /// private readonly Dictionary _pendingRedeploys = new(); + + /// + /// SiteRuntime-020: name → terminating actor ref shadow of . + /// Required because a third for the same instance + /// arriving WHILE a redeploy is still mid-termination would otherwise see + /// _instanceActors.TryGetValue == false and fall through to + /// + , where + /// Context.ActorOf(props, instanceName) throws InvalidActorNameException + /// — the child name is still registered until the signal fires. + /// The supervisor's Stop directive then drops the deploy command silently, leaving the + /// deployer waiting forever and persistence dangling. The shadow index lets + /// detect the mid-termination state and overwrite the + /// buffered pending command (last-write-wins) instead of trying to create a fresh actor. + /// Cleared in alongside . + /// + private readonly Dictionary _terminatingActorsByName = new(); private int _totalDeployedCount; /// Akka timer scheduler injected by the framework via . @@ -296,12 +312,36 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers { _instanceActors.Remove(instanceName); _pendingRedeploys[existing] = new PendingRedeploy(command, Sender); + _terminatingActorsByName[instanceName] = existing; Context.Watch(existing); Context.Stop(existing); UpdateInstanceCounts(); return; } + // SiteRuntime-020: a deploy arriving while the previous redeploy is still + // terminating (the Terminated signal hasn't fired yet) used to fall through + // to ApplyDeployment(fresh), where Context.ActorOf would throw + // InvalidActorNameException because the child name is still registered. + // Detect the mid-termination state and overwrite the buffered pending + // command (last-write-wins) so the latest deploy is applied when Terminated + // arrives. The displaced sender is told Failed-superseded so it doesn't + // wait forever. + if (_terminatingActorsByName.TryGetValue(instanceName, out var terminatingRef)) + { + if (_pendingRedeploys.TryGetValue(terminatingRef, out var displaced)) + { + displaced.OriginalSender.Tell(new DeploymentStatusResponse( + displaced.Command.DeploymentId, + instanceName, + DeploymentStatus.Failed, + $"superseded by newer deployment {command.DeploymentId} before predecessor finished terminating", + DateTimeOffset.UtcNow)); + } + _pendingRedeploys[terminatingRef] = new PendingRedeploy(command, Sender); + return; + } + // Fresh deployment — no existing actor to replace. ApplyDeployment(command, Sender, isRedeploy: false); } @@ -315,6 +355,13 @@ public class DeploymentManagerActor : ReceiveActor, IWithTimers if (!_pendingRedeploys.Remove(terminated.ActorRef, out var pending)) return; + // SiteRuntime-020: drop the name-keyed shadow now that the predecessor has + // fully terminated and its actor name is free. Any deploy arriving after + // this point sees neither _instanceActors[name] (cleared when we stopped + // the predecessor) nor _terminatingActorsByName[name] (cleared here), so + // ApplyDeployment + Context.ActorOf below safely reuses the name. + _terminatingActorsByName.Remove(pending.Command.InstanceUniqueName); + ApplyDeployment(pending.Command, pending.OriginalSender, isRedeploy: true); } diff --git a/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs b/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs index 5bd3ef30..7698da79 100644 --- a/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs +++ b/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs @@ -36,10 +36,20 @@ namespace ScadaLink.SiteRuntime.Tracking; /// public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, IDisposable { - private readonly SqliteConnection _connection; - private readonly SemaphoreSlim _gate = new(1, 1); + // SiteRuntime-024: writer state — one owned SqliteConnection serialised behind + // _writeGate. Readers do NOT share this connection or gate; see GetStatusAsync. + private readonly SqliteConnection _writeConnection; + private readonly SemaphoreSlim _writeGate = new(1, 1); + private readonly string _connectionString; private readonly ILogger _logger; - private bool _disposed; + + // SiteRuntime-024: dispose-once state shared by the sync Dispose and async + // DisposeAsync paths. Interlocked.Exchange is the race-safe primitive here — + // a plain bool can be flipped twice if Dispose() and DisposeAsync() are + // invoked concurrently (e.g. host shutdown bridging both). 0 = live, + // 1 = disposed. Read by other methods via Volatile.Read after the gate is + // taken; they raise ObjectDisposedException when set. + private int _disposeState; /// /// Initializes the tracking store, opens the SQLite connection, and applies the schema. @@ -54,14 +64,15 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, ArgumentNullException.ThrowIfNull(logger); _logger = logger; - _connection = new SqliteConnection(options.Value.ConnectionString); - _connection.Open(); + _connectionString = options.Value.ConnectionString; + _writeConnection = new SqliteConnection(_connectionString); + _writeConnection.Open(); InitializeSchema(); } private void InitializeSchema() { - using var cmd = _connection.CreateCommand(); + using var cmd = _writeConnection.CreateCommand(); cmd.CommandText = """ CREATE TABLE IF NOT EXISTS OperationTracking ( TrackedOperationId TEXT NOT NULL PRIMARY KEY, @@ -112,7 +123,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, /// private void AddColumnIfMissing(string columnName, string columnDefinition) { - using var probe = _connection.CreateCommand(); + using var probe = _writeConnection.CreateCommand(); probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('OperationTracking') WHERE name = $name"; probe.Parameters.AddWithValue("$name", columnName); var exists = Convert.ToInt32(probe.ExecuteScalar()) > 0; @@ -121,7 +132,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, return; } - using var alter = _connection.CreateCommand(); + using var alter = _writeConnection.CreateCommand(); // Column name + definition are caller-controlled constants, never user // input — safe to interpolate (parameters are not permitted in DDL). alter.CommandText = $"ALTER TABLE OperationTracking ADD COLUMN {columnName} {columnDefinition}"; @@ -140,14 +151,14 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, { ArgumentNullException.ThrowIfNull(kind); - await _gate.WaitAsync(ct).ConfigureAwait(false); + await _writeGate.WaitAsync(ct).ConfigureAwait(false); try { - ObjectDisposedException.ThrowIf(_disposed, this); + ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this); var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture); - using var cmd = _connection.CreateCommand(); + using var cmd = _writeConnection.CreateCommand(); // INSERT OR IGNORE: duplicate ids are no-ops (first-write-wins) — // matches the at-least-once semantics the site emits under. cmd.CommandText = """ @@ -176,7 +187,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, } finally { - _gate.Release(); + _writeGate.Release(); } } @@ -191,14 +202,14 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, { ArgumentNullException.ThrowIfNull(status); - await _gate.WaitAsync(ct).ConfigureAwait(false); + await _writeGate.WaitAsync(ct).ConfigureAwait(false); try { - ObjectDisposedException.ThrowIf(_disposed, this); + ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this); var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture); - using var cmd = _connection.CreateCommand(); + using var cmd = _writeConnection.CreateCommand(); // Terminal rows are immutable — the WHERE clause filters them out so // late-arriving attempt telemetry never overwrites a resolved row. cmd.CommandText = """ @@ -222,7 +233,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, } finally { - _gate.Release(); + _writeGate.Release(); } } @@ -236,14 +247,14 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, { ArgumentNullException.ThrowIfNull(status); - await _gate.WaitAsync(ct).ConfigureAwait(false); + await _writeGate.WaitAsync(ct).ConfigureAwait(false); try { - ObjectDisposedException.ThrowIf(_disposed, this); + ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this); var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture); - using var cmd = _connection.CreateCommand(); + using var cmd = _writeConnection.CreateCommand(); // First-write-wins on the terminal flip: only update rows that // haven't already terminated. cmd.CommandText = """ @@ -266,7 +277,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, } finally { - _gate.Release(); + _writeGate.Release(); } } @@ -275,47 +286,48 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, TrackedOperationId id, CancellationToken ct = default) { - await _gate.WaitAsync(ct).ConfigureAwait(false); - try + ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this); + + // SiteRuntime-024: reads open a fresh, ungated SqliteConnection so a + // long-running write doesn't block status queries. The connection + // string is shared with the writer; SQLite handles cross-connection + // isolation natively (a reader sees a consistent snapshot via the + // shared cache lock for in-memory DBs, or a WAL snapshot for file DBs). + // Mirrors the SiteStorageService precedent. + await using var readConnection = new SqliteConnection(_connectionString); + await readConnection.OpenAsync(ct).ConfigureAwait(false); + + await using var cmd = readConnection.CreateCommand(); + cmd.CommandText = """ + SELECT TrackedOperationId, Kind, TargetSummary, Status, + RetryCount, LastError, HttpStatus, + CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, + SourceInstanceId, SourceScript, SourceNode + FROM OperationTracking + WHERE TrackedOperationId = $id; + """; + cmd.Parameters.AddWithValue("$id", id.ToString()); + + await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false); + if (!await reader.ReadAsync(ct).ConfigureAwait(false)) { - ObjectDisposedException.ThrowIf(_disposed, this); - - using var cmd = _connection.CreateCommand(); - cmd.CommandText = """ - SELECT TrackedOperationId, Kind, TargetSummary, Status, - RetryCount, LastError, HttpStatus, - CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, - SourceInstanceId, SourceScript, SourceNode - FROM OperationTracking - WHERE TrackedOperationId = $id; - """; - cmd.Parameters.AddWithValue("$id", id.ToString()); - - using var reader = cmd.ExecuteReader(); - if (!reader.Read()) - { - return null; - } - - return new TrackingStatusSnapshot( - Id: TrackedOperationId.Parse(reader.GetString(0)), - Kind: reader.GetString(1), - TargetSummary: reader.IsDBNull(2) ? null : reader.GetString(2), - Status: reader.GetString(3), - RetryCount: reader.GetInt32(4), - LastError: reader.IsDBNull(5) ? null : reader.GetString(5), - HttpStatus: reader.IsDBNull(6) ? null : reader.GetInt32(6), - CreatedAtUtc: ParseUtc(reader.GetString(7)), - UpdatedAtUtc: ParseUtc(reader.GetString(8)), - TerminalAtUtc: reader.IsDBNull(9) ? null : ParseUtc(reader.GetString(9)), - SourceInstanceId: reader.IsDBNull(10) ? null : reader.GetString(10), - SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11), - SourceNode: reader.IsDBNull(12) ? null : reader.GetString(12)); - } - finally - { - _gate.Release(); + return null; } + + return new TrackingStatusSnapshot( + Id: TrackedOperationId.Parse(reader.GetString(0)), + Kind: reader.GetString(1), + TargetSummary: reader.IsDBNull(2) ? null : reader.GetString(2), + Status: reader.GetString(3), + RetryCount: reader.GetInt32(4), + LastError: reader.IsDBNull(5) ? null : reader.GetString(5), + HttpStatus: reader.IsDBNull(6) ? null : reader.GetInt32(6), + CreatedAtUtc: ParseUtc(reader.GetString(7)), + UpdatedAtUtc: ParseUtc(reader.GetString(8)), + TerminalAtUtc: reader.IsDBNull(9) ? null : ParseUtc(reader.GetString(9)), + SourceInstanceId: reader.IsDBNull(10) ? null : reader.GetString(10), + SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11), + SourceNode: reader.IsDBNull(12) ? null : reader.GetString(12)); } /// @@ -323,12 +335,12 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, DateTime olderThanUtc, CancellationToken ct = default) { - await _gate.WaitAsync(ct).ConfigureAwait(false); + await _writeGate.WaitAsync(ct).ConfigureAwait(false); try { - ObjectDisposedException.ThrowIf(_disposed, this); + ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this); - using var cmd = _connection.CreateCommand(); + using var cmd = _writeConnection.CreateCommand(); // Non-terminal rows (TerminalAtUtc IS NULL) are kept regardless of // age — the operation is still in flight. cmd.CommandText = """ @@ -344,7 +356,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, } finally { - _gate.Release(); + _writeGate.Release(); } } @@ -356,33 +368,68 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, DateTimeStyles.RoundtripKind); } - /// Synchronously disposes the tracking store and its SQLite connection. + /// + /// Synchronously disposes the tracking store and its SQLite connection. + /// + /// + /// SiteRuntime-024: this path does NOT bridge to async via + /// .AsTask().GetAwaiter().GetResult(). Sync-over-async on a SemaphoreSlim + /// can deadlock when invoked from a non-reentrant SyncContext (e.g. host + /// shutdown continuations observed on the host sync context). In-flight writes + /// at the moment of will fail their next operation + /// against the disposed connection with — + /// the caller's responsibility is to ensure no concurrent operations during + /// the synchronous dispose. Use if you need to + /// drain in-flight writes before close. + /// public void Dispose() { - DisposeAsyncCore().AsTask().GetAwaiter().GetResult(); + if (Interlocked.Exchange(ref _disposeState, 1) != 0) + { + return; + } + + _writeConnection.Dispose(); + _writeGate.Dispose(); GC.SuppressFinalize(this); } - /// Asynchronously disposes the tracking store and its SQLite connection. + /// + /// Asynchronously disposes the tracking store and its SQLite connection. + /// Drains in-flight writes by acquiring the write gate before closing the + /// connection, so a write currently executing a SqliteCommand completes + /// before the connection is freed. + /// public async ValueTask DisposeAsync() { - await DisposeAsyncCore().ConfigureAwait(false); - GC.SuppressFinalize(this); - } + if (Interlocked.Exchange(ref _disposeState, 1) != 0) + { + return; + } - private async ValueTask DisposeAsyncCore() - { - await _gate.WaitAsync().ConfigureAwait(false); + // Drain any in-flight write by taking the write gate. Past this point + // no new write can acquire the gate because _disposeState is set, so + // the next ThrowIf check in each writer raises ObjectDisposedException. try { - if (_disposed) return; - _disposed = true; - _connection.Dispose(); + await _writeGate.WaitAsync().ConfigureAwait(false); + } + catch (ObjectDisposedException) + { + // Race with another disposer that already disposed the gate — the + // _disposeState exchange above should prevent this, but be defensive. + } + + try + { + _writeConnection.Dispose(); } finally { - _gate.Release(); - _gate.Dispose(); + try { _writeGate.Release(); } catch (ObjectDisposedException) { } + _writeGate.Dispose(); } + + GC.SuppressFinalize(this); } } diff --git a/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/NotificationOutboxRepositoryIntegrationTests.cs b/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/NotificationOutboxRepositoryIntegrationTests.cs new file mode 100644 index 00000000..44c09c02 --- /dev/null +++ b/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/NotificationOutboxRepositoryIntegrationTests.cs @@ -0,0 +1,132 @@ +using Microsoft.EntityFrameworkCore; +using ScadaLink.Commons.Entities.Notifications; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; +using Xunit; + +namespace ScadaLink.ConfigurationDatabase.Tests.Repositories; + +/// +/// CD-015 race-fix integration tests for +/// . The method +/// is raw-SQL (IF NOT EXISTS … INSERT) matching the AuditLog and SiteCalls +/// idempotent-insert pattern; it must execute against a real SQL Server schema, +/// so this class uses rather than the SQLite +/// in-memory provider used by . +/// +public class NotificationOutboxRepositoryIntegrationTests : IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public NotificationOutboxRepositoryIntegrationTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + [SkippableFact] + public async Task InsertIfNotExistsAsync_FreshId_InsertsAndReturnsTrue() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var id = Guid.NewGuid().ToString(); + await using var context = CreateContext(); + var repo = new NotificationOutboxRepository(context); + + var inserted = await repo.InsertIfNotExistsAsync(MakeNotification(id)); + + Assert.True(inserted); + + await using var readContext = CreateContext(); + var loaded = await readContext.Notifications.FindAsync(id); + Assert.NotNull(loaded); + Assert.Equal("Subject", loaded!.Subject); + } + + [SkippableFact] + public async Task InsertIfNotExistsAsync_DuplicateId_ReturnsFalseAndLeavesExistingRow() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var id = Guid.NewGuid().ToString(); + await using (var context = CreateContext()) + { + var repo = new NotificationOutboxRepository(context); + await repo.InsertIfNotExistsAsync(MakeNotification(id, subject: "Original")); + } + + await using (var context = CreateContext()) + { + var repo = new NotificationOutboxRepository(context); + var inserted = await repo.InsertIfNotExistsAsync(MakeNotification(id, subject: "Changed")); + Assert.False(inserted); + } + + await using var readContext = CreateContext(); + var loaded = await readContext.Notifications.FindAsync(id); + Assert.NotNull(loaded); + Assert.Equal("Original", loaded!.Subject); + } + + [SkippableFact] + public async Task InsertIfNotExistsAsync_ConcurrentInserts_SameId_OnlyOneRow() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // CD-015 race coverage. The IF NOT EXISTS … INSERT pattern has a + // check-then-act window: two concurrent sessions can both pass the + // EXISTS check and both attempt the INSERT — the loser surfaces as a + // SqlException with Number 2601/2627. The site→central handoff is + // documented at-least-once with insert-if-not-exists, so this collision + // IS the expected contention mode. The race losers MUST be swallowed + // (not bubbled) so the site doesn't retry the same NotificationId + // forever. Final row count must be exactly 1; no exceptions thrown. + var id = Guid.NewGuid().ToString(); + + await Parallel.ForEachAsync( + Enumerable.Range(0, 50), + new ParallelOptions { MaxDegreeOfParallelism = 50 }, + async (_, ct) => + { + await using var context = CreateContext(); + var repo = new NotificationOutboxRepository(context); + await repo.InsertIfNotExistsAsync(MakeNotification(id), ct); + }); + + await using var readContext = CreateContext(); + var count = await readContext.Notifications + .Where(n => n.NotificationId == id) + .CountAsync(); + Assert.Equal(1, count); + } + + // --- helpers ------------------------------------------------------------ + + private ScadaLinkDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .Options; + return new ScadaLinkDbContext(options); + } + + private static Notification MakeNotification( + string id, + NotificationStatus status = NotificationStatus.Pending, + string subject = "Subject") + { + return new Notification( + id, + NotificationType.Email, + "Ops List", + subject, + "Body", + "site-cd015") + { + Status = status, + CreatedAt = new DateTimeOffset(2026, 5, 20, 10, 0, 0, TimeSpan.Zero), + SiteEnqueuedAt = new DateTimeOffset(2026, 5, 20, 9, 59, 0, TimeSpan.Zero), + }; + } +} diff --git a/tests/ScadaLink.ConfigurationDatabase.Tests/RepositoryCoverageTests.cs b/tests/ScadaLink.ConfigurationDatabase.Tests/RepositoryCoverageTests.cs index 573b9591..cafb0f84 100644 --- a/tests/ScadaLink.ConfigurationDatabase.Tests/RepositoryCoverageTests.cs +++ b/tests/ScadaLink.ConfigurationDatabase.Tests/RepositoryCoverageTests.cs @@ -419,32 +419,10 @@ public class NotificationOutboxRepositoryTests : IDisposable }; } - [Fact] - public async Task InsertIfNotExistsAsync_NewRow_InsertsAndReturnsTrue() - { - var id = Guid.NewGuid().ToString(); - - var inserted = await _repository.InsertIfNotExistsAsync(MakeNotification(id)); - - Assert.True(inserted); - _context.ChangeTracker.Clear(); - Assert.NotNull(await _context.Notifications.FindAsync(id)); - } - - [Fact] - public async Task InsertIfNotExistsAsync_DuplicateId_ReturnsFalseAndLeavesExistingRow() - { - var id = Guid.NewGuid().ToString(); - await _repository.InsertIfNotExistsAsync(MakeNotification(id, subject: "Original")); - _context.ChangeTracker.Clear(); - - var inserted = await _repository.InsertIfNotExistsAsync(MakeNotification(id, subject: "Changed")); - - Assert.False(inserted); - _context.ChangeTracker.Clear(); - var loaded = await _context.Notifications.FindAsync(id); - Assert.Equal("Original", loaded!.Subject); - } + // InsertIfNotExistsAsync coverage lives in + // tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/NotificationOutboxRepositoryIntegrationTests.cs + // — the method is raw-SQL (IF NOT EXISTS … INSERT) so it must execute against + // SQL Server, not the SQLite in-memory provider this class uses. [Fact] public async Task GetDueAsync_ReturnsPendingAndDueRetrying_OrderedByCreatedAt_CappedAtBatchSize() diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs index ba8312a7..c0fee246 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs @@ -730,6 +730,306 @@ public class DataConnectionActorTests : TestKit subscribeGate.SetCanceled(); } + // ── DataConnectionLayer-018: concurrent same-tag subscribes must not orphan adapter handles ── + + [Fact] + public async Task DCL018_ConcurrentSubscribes_SameTag_DifferentInstances_IssueOneAdapterSubscribe() + { + // Regression test for DataConnectionLayer-018. Before the fix, HandleSubscribe + // snapshotted _subscriptionIds.Keys on the actor thread BEFORE the Task.Run + // I/O. Two SubscribeTagsRequest messages for different instances sharing a tag + // would both observe "not subscribed" (the first request's SubscribeCompleted + // hadn't yet posted), both call _adapter.SubscribeAsync, and the second + // subscription id would be silently dropped at the + // _subscriptionIds.ContainsKey guard in HandleSubscribeCompleted — orphaning + // the adapter's monitored item permanently. With the _subscribesInFlight + // guard, the second request observes the tag in flight and treats it as + // AlreadySubscribed without issuing a second adapter call. + var subscribeStartedFirst = new TaskCompletionSource(); + var releaseFirst = new TaskCompletionSource(); + var subscribeCallCount = 0; + + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync("shared/tag", Arg.Any(), Arg.Any()) + .Returns(_ => + { + var n = Interlocked.Increment(ref subscribeCallCount); + if (n == 1) + { + // Park the first subscribe so the second SubscribeTagsRequest + // arrives on the actor thread while the first I/O is still in flight. + subscribeStartedFirst.TrySetResult(); + return releaseFirst.Task; + } + // The fix prevents this branch — fail loudly if it ever runs. + return Task.FromResult("sub-unexpected-" + n); + }); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl018-shared-tag"); + await Task.Delay(300); + + // Request 1 — instance A. Park its adapter call in flight. + actor.Tell(new SubscribeTagsRequest( + "c1", "instA", "dcl018-shared-tag", ["shared/tag"], DateTimeOffset.UtcNow)); + await subscribeStartedFirst.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + // Request 2 — instance B, same tag, sent while instance A's I/O is still in flight. + actor.Tell(new SubscribeTagsRequest( + "c2", "instB", "dcl018-shared-tag", ["shared/tag"], DateTimeOffset.UtcNow)); + + // Instance B's ack must come back before we release instance A — that proves + // instance B's request did NOT issue its own adapter SubscribeAsync (which is + // blocked) and instead saw the tag as in-flight. + SubscribeTagsResponse? bResponse = null; + for (var i = 0; i < 50 && bResponse is null; i++) + { + try { bResponse = ExpectMsg(TimeSpan.FromMilliseconds(100)); } + catch { /* keep polling */ } + if (bResponse?.InstanceUniqueName != "instB") bResponse = null; + } + + Assert.NotNull(bResponse); + Assert.True(bResponse!.Success); + Assert.Equal(1, Volatile.Read(ref subscribeCallCount)); + + // Release instance A's subscribe so the test cleans up. + releaseFirst.SetResult("sub-shared"); + ExpectMsg(TimeSpan.FromSeconds(5)); + + actor.Tell(new DataConnectionActor.GetHealthReport()); + var report = ExpectMsg(TimeSpan.FromSeconds(5)); + // One tag shared across two instances must count as exactly one subscription. + // DCL-020 (#28) tracks the related "previously-unresolved tag promoted via + // another instance" double-count case; here the tag was never unresolved. + Assert.Equal(1, report.ResolvedTags); + } + + // ── DataConnectionLayer-020: previously-unresolved tag, resolved via different instance, must not double-count ── + + [Fact] + public async Task DCL020_UnresolvedTagPromoted_ByDifferentInstance_DoesNotDoubleCountTotalSubscribed() + { + // Regression test for DataConnectionLayer-020. The first SubscribeTagsRequest + // (instance A, tag "promote/tag") fails at the resolution layer — the tag is + // added to _unresolvedTags AND _totalSubscribed is bumped to 1. The second + // SubscribeTagsRequest (instance B, same tag) succeeds the adapter call. Before + // the fix, HandleSubscribeCompleted's success branch unconditionally + // ++_totalSubscribed, taking the total to 2 — even though the logical + // subscription count is still 1. After the fix the success branch detects + // the unresolved-tag promotion and increments only _resolvedTags. + var subscribeCalls = 0; + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync("promote/tag", Arg.Any(), Arg.Any()) + .Returns(_ => + { + var n = Interlocked.Increment(ref subscribeCalls); + if (n == 1) return Task.FromException(new KeyNotFoundException("not yet")); + return Task.FromResult("sub-promote-" + n); + }); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl020-promote"); + await Task.Delay(300); + + // Instance A — fails at resolution → _unresolvedTags has the tag, _totalSubscribed=1. + actor.Tell(new SubscribeTagsRequest( + "c1", "instA", "dcl020-promote", ["promote/tag"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(5)); // bad-quality push + ExpectMsg(TimeSpan.FromSeconds(5)); + + // Instance B — same tag, but this time the adapter succeeds (n==2 branch). + actor.Tell(new SubscribeTagsRequest( + "c2", "instB", "dcl020-promote", ["promote/tag"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(5)); + + actor.Tell(new DataConnectionActor.GetHealthReport()); + var report = ExpectMsg(TimeSpan.FromSeconds(5)); + + // Pre-fix: TotalSubscribedTags=2, ResolvedTags=1 (double-count). + // Post-fix: TotalSubscribedTags=1, ResolvedTags=1 — one logical tag, one resolved. + Assert.Equal(1, report.TotalSubscribedTags); + Assert.Equal(1, report.ResolvedTags); + } + + [Fact] + public async Task DCL020_TwoInstancesFailingSameTag_OnlyCountsTagOnceInTotal() + { + // Regression test for DataConnectionLayer-020's symmetric failure branch. + // Two instances both fail to resolve the same tag — _unresolvedTags must hold + // a single entry and _totalSubscribed must be 1, not 2. Pre-fix the failure + // branch always ran _totalSubscribed++, double-counting on the second + // instance's failure. + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync("missing/tag", Arg.Any(), Arg.Any()) + .Returns(Task.FromException(new KeyNotFoundException("not found"))); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl020-twofail"); + await Task.Delay(300); + + actor.Tell(new SubscribeTagsRequest( + "c1", "instA", "dcl020-twofail", ["missing/tag"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(5)); + ExpectMsg(TimeSpan.FromSeconds(5)); + + actor.Tell(new SubscribeTagsRequest( + "c2", "instB", "dcl020-twofail", ["missing/tag"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(5)); + ExpectMsg(TimeSpan.FromSeconds(5)); + + actor.Tell(new DataConnectionActor.GetHealthReport()); + var report = ExpectMsg(TimeSpan.FromSeconds(5)); + + Assert.Equal(1, report.TotalSubscribedTags); + Assert.Equal(0, report.ResolvedTags); + } + + // ── DataConnectionLayer-021: mid-flight unsubscribe must release adapter handle and drop state ── + + [Fact] + public async Task DCL021_UnsubscribeDuringInFlightSubscribe_ReleasesAdapterHandle_AndKeepsStateClean() + { + // Regression test for DataConnectionLayer-021. Previously HandleSubscribeCompleted + // re-created _subscriptionsByInstance[instanceName] when the instance had been + // unsubscribed while the subscribe I/O was in flight, and then ran the same + // counter/handle mutations as the happy path. The leak permanently inflated + // _subscriptionsByInstance, _tagSubscriberCount, and _totalSubscribed (also re- + // issued by ReSubscribeAll after every reconnect), and orphaned the adapter + // monitored item. After the fix, the stale completion is logged + dropped, and + // _adapter.UnsubscribeAsync is fired for each successful subscription id. + var subscribeStarted = new TaskCompletionSource(); + var releaseSubscribe = new TaskCompletionSource(); + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync("stale/tag", Arg.Any(), Arg.Any()) + .Returns(_ => + { + subscribeStarted.TrySetResult(); + return releaseSubscribe.Task; + }); + _mockAdapter.UnsubscribeAsync(Arg.Any(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl021-mid-flight"); + await Task.Delay(300); + + // Subscribe instance A — block the adapter call so unsubscribe arrives first. + actor.Tell(new SubscribeTagsRequest( + "c1", "instA", "dcl021-mid-flight", ["stale/tag"], DateTimeOffset.UtcNow)); + await subscribeStarted.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + // Unsubscribe instance A while the subscribe I/O is still parked. + actor.Tell(new UnsubscribeTagsRequest("unsub-c1", "instA", "dcl021-mid-flight", DateTimeOffset.UtcNow)); + await Task.Delay(100); + + // Release the subscribe — SubscribeCompleted is now stale. + releaseSubscribe.SetResult("sub-orphan"); + + // Wait for SubscribeTagsResponse OR a quiescence interval. The fix may skip + // the response (instance is gone); allow either outcome but require the + // adapter UnsubscribeAsync call to have fired. + await Task.Delay(500); + + await _mockAdapter.Received(1).UnsubscribeAsync( + Arg.Is(s => s == "sub-orphan"), Arg.Any()); + + actor.Tell(new DataConnectionActor.GetHealthReport()); + var report = ExpectMsg(TimeSpan.FromSeconds(5)); + // Total / resolved must reflect the post-unsubscribe state: 0 tags subscribed. + Assert.Equal(0, report.TotalSubscribedTags); + Assert.Equal(0, report.ResolvedTags); + } + + // ── DataConnectionLayer-022: tag-resolution retry timer must not reset on every failure ── + + [Fact] + public async Task DCL022_BurstedFailedSubscribes_DoNotResetRetryTimer() + { + // Regression test for DataConnectionLayer-022. Both HandleSubscribeCompleted + // and HandleTagResolutionFailed previously called Timers.StartPeriodicTimer + // unconditionally — StartPeriodicTimer with an existing key cancels and + // replaces the prior timer, so a burst of SubscribeTagsRequests arriving + // faster than TagResolutionRetryInterval would re-arm the timer every time + // and starve the retry indefinitely. After the fix, IsTimerActive gates + // the StartPeriodicTimer call so the first failure starts the timer and + // subsequent failures just append to _unresolvedTags. + _options.TagResolutionRetryInterval = TimeSpan.FromMilliseconds(300); + + var subscribeCalls = 0; + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + // All subscribes fail at the resolution layer (KeyNotFoundException is a + // non-connection failure → marks the tag unresolved and starts the timer). + _mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(_ => + { + Interlocked.Increment(ref subscribeCalls); + return Task.FromException(new KeyNotFoundException("not found")); + }); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl022-retry-gate"); + await Task.Delay(300); // reach Connected + + // Fire 5 SubscribeTagsRequests with distinct tags, each ~50ms apart so the + // total burst (~250ms) is well under the 300ms retry interval. Pre-fix, + // every failure called StartPeriodicTimer — the 5th call would cancel the + // running timer and re-arm a fresh 300ms wait, pushing the first retry to + // ~550ms after the first failure. Post-fix: timer starts once, fires at + // ~300ms after the first failure regardless of subsequent failures. + var t0 = DateTimeOffset.UtcNow; + for (var i = 0; i < 5; i++) + { + actor.Tell(new SubscribeTagsRequest( + $"c{i}", $"inst{i}", "dcl022-retry-gate", [$"burst/tag{i}"], DateTimeOffset.UtcNow)); + await Task.Delay(50); + } + + // Drain the 5 ack messages and the 5 bad-quality TagValueUpdates so we + // don't accidentally compare them against the retry-induced subscribe count. + for (var i = 0; i < 5; i++) + { + // Order: bad-quality TagValueUpdate fires first, then SubscribeTagsResponse. + ExpectMsg(TimeSpan.FromSeconds(5)); + ExpectMsg(TimeSpan.FromSeconds(5)); + } + + var initialFailureCalls = Volatile.Read(ref subscribeCalls); + Assert.Equal(5, initialFailureCalls); + + // Wait one retry interval past the first failure. If the timer was reset by + // each subsequent failure, no retry has fired yet and subscribeCalls is + // still 5. With the gate, the timer has fired and re-attempted every + // unresolved tag (5 more calls). + var firstFailureToNow = DateTimeOffset.UtcNow - t0; + var waitForRetryFire = TimeSpan.FromMilliseconds(450) - firstFailureToNow; + if (waitForRetryFire > TimeSpan.Zero) + { + await Task.Delay(waitForRetryFire); + } + + var afterFirstInterval = Volatile.Read(ref subscribeCalls); + Assert.True(afterFirstInterval > initialFailureCalls, + $"Retry timer should have fired within ~300ms of the first failure, " + + $"but subscribeCalls stayed at {afterFirstInterval} (initial: {initialFailureCalls}). " + + $"This is the DCL-022 reset-on-every-call starvation regression."); + } + // ── DataConnectionLayer-011: stale callbacks from a disposed adapter must be dropped ── [Fact] diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs index d93e447f..077c491a 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs @@ -39,6 +39,42 @@ public class RealOpcUaClientThreadSafetyTests } } +/// +/// DataConnectionLayer-019: previously kept a +/// dead Dictionary<string,string> _subscriptionHandles field that was +/// written and removed across thread-pool continuations but never read. Plain +/// Dictionary writes from concurrent post-await continuations are racy; the +/// field was a latent bug waiting for any future reader. The fix deletes the +/// field rather than converting it to ConcurrentDictionary (bookkeeping already +/// lives in RealOpcUaClient._monitoredItems/_callbacks and +/// DataConnectionActor._subscriptionIds). This test guards against +/// regression — anyone re-introducing a non-concurrent shared dictionary on +/// the adapter must justify it explicitly. +/// +public class OpcUaDataConnectionThreadSafetyTests +{ + [Fact] + public void DCL019_OpcUaDataConnection_HasNoNonConcurrentSharedDictionary() + { + // Reflection-walk every instance field on the adapter. Any + // System.Collections.Generic.Dictionary<,> field would be a regression: + // either dead state (return it) or live state mutated from continuations + // (convert to ConcurrentDictionary). Either way, fail the test. + var dictionaryFields = typeof(OpcUaDataConnection) + .GetFields(System.Reflection.BindingFlags.Instance | + System.Reflection.BindingFlags.NonPublic | + System.Reflection.BindingFlags.Public) + .Where(f => f.FieldType.IsGenericType && + f.FieldType.GetGenericTypeDefinition() == typeof(Dictionary<,>)) + .Select(f => f.Name) + .ToList(); + + Assert.True(dictionaryFields.Count == 0, + $"OpcUaDataConnection must not hold a non-concurrent Dictionary<,> field; " + + $"found: {string.Join(", ", dictionaryFields)}. See DCL-019."); + } +} + /// /// DataConnectionLayer-012: secure-by-default certificate handling. /// diff --git a/tests/ScadaLink.SiteRuntime.Tests/Actors/DeploymentManagerRedeployTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Actors/DeploymentManagerRedeployTests.cs index e514acdd..42976027 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Actors/DeploymentManagerRedeployTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Actors/DeploymentManagerRedeployTests.cs @@ -125,6 +125,65 @@ public class DeploymentManagerRedeployTests : TestKit, IDisposable Assert.True(disable.Success); } + [Fact] + public async Task SR020_ThreeRapidDeploys_DoNotThrowInvalidActorNameException_LatestWins() + { + // Regression test for SiteRuntime-020. The previous implementation tracked + // pending redeploys by IActorRef (_pendingRedeploys) but had no + // name-keyed shadow, so a third DeployInstanceCommand arriving WHILE the + // first redeploy's predecessor was still terminating saw + // _instanceActors.TryGetValue==false and fell through to + // ApplyDeployment → CreateInstanceActor → Context.ActorOf, which threw + // InvalidActorNameException because the child name was still registered + // until Terminated fires. The supervisor's Stop directive then silently + // dropped the deploy, leaving the deployer waiting forever and the + // persistence Task.Run dangling. After the fix, _terminatingActorsByName + // tracks the in-flight terminator by name; the third deploy overwrites + // the buffered pending command (last-write-wins) and tells the displaced + // sender it was superseded. + var actor = CreateDeploymentManager(); + await Task.Delay(500); + + // Initial deploy — establishes the running instance. + actor.Tell(new DeployInstanceCommand( + "dep-1", "RapidPump", "h1", MakeConfigJson("RapidPump"), "admin", DateTimeOffset.UtcNow)); + var first = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(DeploymentStatus.Success, first.Status); + await Task.Delay(200); + + // Two rapid redeploys before the predecessor has time to fully terminate. + // The second deploy stops the actor (watching it) and buffers itself. + // The third deploy arrives almost immediately and must NOT crash — it + // overwrites the buffered pending command and tells dep-2 it was superseded. + var probe2 = CreateTestProbe(); + var probe3 = CreateTestProbe(); + + actor.Tell(new DeployInstanceCommand( + "dep-2", "RapidPump", "h2", MakeConfigJson("RapidPump"), "admin", DateTimeOffset.UtcNow), + probe2.Ref); + actor.Tell(new DeployInstanceCommand( + "dep-3", "RapidPump", "h3", MakeConfigJson("RapidPump"), "admin", DateTimeOffset.UtcNow), + probe3.Ref); + + // dep-2 must be told it was superseded; dep-3 must succeed once the + // predecessor finishes terminating. + var superseded = probe2.ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.Equal("dep-2", superseded.DeploymentId); + Assert.Equal(DeploymentStatus.Failed, superseded.Status); + Assert.NotNull(superseded.ErrorMessage); + Assert.Contains("superseded", superseded.ErrorMessage!, StringComparison.OrdinalIgnoreCase); + + var winner = probe3.ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.Equal("dep-3", winner.DeploymentId); + Assert.Equal(DeploymentStatus.Success, winner.Status); + + // The instance must still be operable — proves no orphaned actor / no + // half-created child holding the name. + actor.Tell(new DisableInstanceCommand("cmd-1", "RapidPump", DateTimeOffset.UtcNow)); + var disable = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(disable.Success); + } + [Fact] public async Task Redeploy_ExistingInstance_DoesNotOverCountDeployedInstances() { diff --git a/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs index b302683a..a3ac99e2 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs @@ -438,4 +438,115 @@ public class OperationTrackingStoreTests Assert.NotNull(await store.GetStatusAsync(bId)); // kept (recent terminal) Assert.NotNull(await store.GetStatusAsync(cId)); // kept (non-terminal) } + + // ── SiteRuntime-024: read/write split + sync-safe Dispose ────────────── + + [Fact] + public async Task SR024_ConcurrentReads_DoNotBlockOnInFlightWrite() + { + // Regression test for SiteRuntime-024 (perf half). Pre-fix, every + // GetStatusAsync took the same _gate as RecordTerminalAsync, so a single + // long-running write would queue up every concurrent status query. After + // the fix, reads open a fresh SqliteConnection per call and don't take + // the write gate at all — so they should run concurrently with a write. + // + // The test seeds a row, then issues many parallel reads while a write is + // also in flight. We assert the reads return successfully (a regression + // would either deadlock the test runner or take far longer than the gate + // would have allowed any single read). The actual timing-comparison + // assertion would be flaky in CI; this test asserts only correctness + + // forward progress. + var (store, _) = CreateStore(nameof(SR024_ConcurrentReads_DoNotBlockOnInFlightWrite)); + await using (store) + { + var id = TrackedOperationId.New(); + await store.RecordEnqueueAsync( + id, + kind: "ApiCallCached", + targetSummary: "ERP.GetOrder", + sourceInstanceId: null, + sourceScript: null, + sourceNode: "node-a"); + + // Fire 10 concurrent reads + a write in parallel; all must complete. + var readTasks = Enumerable.Range(0, 10) + .Select(_ => store.GetStatusAsync(id)) + .ToArray(); + var writeTask = store.RecordAttemptAsync( + id, status: "Retrying", retryCount: 1, lastError: "transient", httpStatus: 503); + + await Task.WhenAll(readTasks); + await writeTask; + + foreach (var t in readTasks) + { + Assert.NotNull(await t); + } + } + } + + [Fact] + public async Task SR024_SyncDispose_DoesNotDeadlock_WhenInvokedFromFreshThread() + { + // Regression test for SiteRuntime-024 (deadlock half). Pre-fix, Dispose + // bridged to async via DisposeAsyncCore().AsTask().GetAwaiter().GetResult() + // — sync-over-async on a SemaphoreSlim can deadlock under a non-reentrant + // SyncContext (host shutdown continuations). Post-fix, Dispose runs + // synchronously without acquiring the gate. + var (store, _) = CreateStore(nameof(SR024_SyncDispose_DoesNotDeadlock_WhenInvokedFromFreshThread)); + + // Seed a row so the store has live state when disposed. + await store.RecordEnqueueAsync( + TrackedOperationId.New(), + kind: "ApiCallCached", + targetSummary: "ERP.GetOrder", + sourceInstanceId: null, + sourceScript: null, + sourceNode: "node-a"); + + var disposeReturned = new TaskCompletionSource(); + var disposeThread = new Thread(() => + { + try + { + store.Dispose(); + disposeReturned.SetResult(true); + } + catch (Exception ex) + { + disposeReturned.SetException(ex); + } + }) { IsBackground = true }; + + disposeThread.Start(); + + // 5s ceiling — if Dispose deadlocks, the test fails with TimeoutException. + var completed = await Task.WhenAny( + disposeReturned.Task, Task.Delay(TimeSpan.FromSeconds(5))); + Assert.Same(disposeReturned.Task, completed); + Assert.True(await disposeReturned.Task); + } + + [Fact] + public async Task SR024_AsyncDispose_DoesNotDeadlock_AndIsIdempotent() + { + // The async path must also tolerate Dispose() being called afterwards + // (host shutdown's standard pattern). The _disposeState exchange should + // short-circuit the second call. + var (store, _) = CreateStore(nameof(SR024_AsyncDispose_DoesNotDeadlock_AndIsIdempotent)); + + await store.RecordEnqueueAsync( + TrackedOperationId.New(), + kind: "ApiCallCached", + targetSummary: "ERP.GetOrder", + sourceInstanceId: null, + sourceScript: null, + sourceNode: "node-a"); + + await store.DisposeAsync(); + // Second call must be a no-op, not throw. + store.Dispose(); + // And a third async — also a no-op. + await store.DisposeAsync(); + } }