fix(concurrency/lifetime): close Theme 5 — 10 concurrency / DI / scope findings

Concurrency hazards, DI lifetime hygiene, and one verify-only confirmation
across 8 modules. Highlights:

Concurrency:
- CentralUI-030: SandboxConsoleCapture writes routed through WriteSynchronized
  locking on the captured StringWriter — intra-script Task fan-out can no
  longer corrupt the per-call buffer.
- Commons-021: ExternalCallResult.Response now backed by Lazy<dynamic?>
  (ExecutionAndPublication) — no more benign double-parse race.
- CD-017: DeploymentManagerRepository.DeleteDeploymentRecordAsync now takes
  an expected RowVersion and seeds entry.OriginalValues so EF emits
  DELETE ... WHERE Id=@id AND RowVersion=@prior; a stale RowVersion now
  throws DbUpdateConcurrencyException instead of silently deleting a
  concurrently edited record.
- Transport-009: AuditCorrelationContext.BundleImportId backed by
  AsyncLocal<Guid?> so concurrent imports get per-logical-call isolation
  (was a scoped instance shared via AuditService across runs).
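
Sketch of the Transport-009 isolation, for illustration only. CorrelationHolder
and ImportAsync are hypothetical stand-ins, not the repository's types; the
sketch assumes two imports share one holder instance and run under Task.WhenAll:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an AsyncLocal-backed correlation holder.
public sealed class CorrelationHolder
{
    // Static AsyncLocal: the value lives in the ExecutionContext, not in the instance.
    private static readonly AsyncLocal<Guid?> Current = new();

    public Guid? BundleImportId
    {
        get => Current.Value;
        set => Current.Value = value;
    }
}

public static class Demo
{
    // Hypothetical import: sets the id, yields at an await, then reads it back.
    public static async Task<bool> ImportAsync(CorrelationHolder holder, Guid id)
    {
        holder.BundleImportId = id;
        await Task.Delay(10);
        return holder.BundleImportId == id;
    }

    // Two concurrent imports deliberately share the same holder instance.
    public static Task<bool[]> RunAsync()
    {
        var shared = new CorrelationHolder();
        return Task.WhenAll(
            ImportAsync(shared, Guid.NewGuid()),
            ImportAsync(shared, Guid.NewGuid()));
    }

    public static async Task Main()
    {
        var results = await RunAsync();
        Console.WriteLine(results[0] && results[1]);
    }
}
```

Each async method's AsyncLocal writes stay within its own logical call chain, so
neither import observes the other's id. The same mechanism means a value set
inside an async method is not visible to the caller after it returns.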

DI / lifetime:
- AuditLog-003: All 3 AuditLog actor handlers switched to CreateAsyncScope
  + await using, so scoped EF Core services dispose asynchronously instead
  of through a blocking sync Dispose().
- AuditLog-007: INodeIdentityProvider resolution standardised on
  GetRequiredService<>() (was mixed with GetService<>()).
- AuditLog-011: AddAuditLogHealthMetricsBridge guarded by sentinel
  descriptor check — calling twice no longer double-registers the hosted
  service.
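
Sketch of the language mechanism behind AuditLog-003, for illustration only.
TrackedResource is a hypothetical stand-in for a scoped service that implements
both disposal interfaces; no DI container is involved here:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical resource that records which disposal path was taken.
public sealed class TrackedResource : IAsyncDisposable, IDisposable
{
    public string DisposedVia { get; private set; } = "none";

    public void Dispose() => DisposedVia = "sync";

    public async ValueTask DisposeAsync()
    {
        await Task.Yield(); // stands in for asynchronous cleanup work
        DisposedVia = "async";
    }
}

public static class Demo
{
    // `await using` selects DisposeAsync when the type implements IAsyncDisposable.
    public static async Task<string> UseWithAwaitUsingAsync()
    {
        var resource = new TrackedResource();
        await using (resource)
        {
            await Task.Delay(1);
        }
        return resource.DisposedVia;
    }

    // A plain `using` on the same type takes the synchronous Dispose path.
    public static string UseWithUsing()
    {
        var resource = new TrackedResource();
        using (resource)
        {
        }
        return resource.DisposedVia;
    }

    public static async Task Main()
    {
        Console.WriteLine(await UseWithAwaitUsingAsync());
        Console.WriteLine(UseWithUsing());
    }
}
```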

Shutdown / supervision:
- SiteCallAudit-002: AkkaHostedService adds a CoordinatedShutdown
  cluster-leave task (drain-site-call-audit-singleton) that issues a
  bounded GracefulStop(10s) so failover waits for in-flight upserts.

Registration safety:
- NS-020: AkkaHostedService now guards NotificationForwarder S&F
  registration with _notificationDeliveryHandlerRegistered + throws
  InvalidOperationException on double-register to make the regression loud.
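
Sketch of the NS-020 guard shape, for illustration only. HandlerRegistry is a
hypothetical class that assumes a last-write-wins handler map; it is not the
real StoreAndForwardService or AkkaHostedService:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical registry: the dictionary alone would silently overwrite.
public sealed class HandlerRegistry
{
    private readonly Dictionary<string, Action<string>> _handlers = new();
    private bool _notificationRegistered;

    public void RegisterNotificationHandler(Action<string> handler)
    {
        ArgumentNullException.ThrowIfNull(handler);

        // Fail loudly instead of letting the second caller replace the first.
        if (_notificationRegistered)
        {
            throw new InvalidOperationException(
                "A Notification handler was already registered.");
        }

        _handlers["Notification"] = handler;
        _notificationRegistered = true;
    }
}

public static class Demo
{
    public static bool SecondRegistrationThrows()
    {
        var registry = new HandlerRegistry();
        registry.RegisterNotificationHandler(_ => { });
        try
        {
            registry.RegisterNotificationHandler(_ => { });
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main() => Console.WriteLine(SecondRegistrationThrows());
}
```

The flag is a plain bool, which is enough only if registration happens on a
single startup path; that is an assumption of this sketch.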

VERIFY-only closures:
- NotifOutbox-005: Confirmed already closed by CD-015 fix (ac96b83) —
  NotificationOutboxRepository.InsertIfNotExistsAsync uses the same
  raw-SQL IF NOT EXISTS + 2601/2627 swallow pattern; race eliminated.

5+ new regression tests (CentralUI sandbox WhenAll, ExternalCallResult
64-reader Barrier, AuditLog DI idempotency, RowVersion stale-throw,
SiteCallAudit-002 shutdown drain). Build clean; affected suites all green.
README regenerated: 65 open (was 75).
Joseph Doherty
2026-05-28 07:29:41 -04:00
parent 6ae0fea558
commit 2ed5c6c379
25 changed files with 699 additions and 239 deletions
@@ -122,65 +122,69 @@ public class AuditLogIngestActor : ReceiveActor
// ctor has no service provider — it falls through with no filter,
// which preserves the small-payload assumptions baked into the
// existing D2 fixtures.
IServiceScope? scope = null;
IAuditLogRepository repository;
IAuditPayloadFilter? filter = null;
ICentralAuditWriteFailureCounter? failureCounter = null;
// AuditLog-003: use CreateAsyncScope + await using so scoped EF Core
// services (IAsyncDisposable DbContexts) dispose asynchronously
// without blocking on sync Dispose() of pending connection cleanup.
if (_injectedRepository is not null)
{
repository = _injectedRepository;
await IngestWithRepositoryAsync(_injectedRepository, filter: null, failureCounter: null, cmd, nowUtc, accepted)
.ConfigureAwait(false);
}
else
{
scope = _serviceProvider!.CreateScope();
repository = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
filter = scope.ServiceProvider.GetService<IAuditPayloadFilter>();
await using var scope = _serviceProvider!.CreateAsyncScope();
var repository = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
var filter = scope.ServiceProvider.GetService<IAuditPayloadFilter>();
// M6 Bundle E (T8): central health counter is best-effort —
// unregistered (test composition roots) means the per-row catch
// simply logs without surfacing on the health dashboard.
failureCounter = scope.ServiceProvider.GetService<ICentralAuditWriteFailureCounter>();
}
try
{
foreach (var evt in cmd.Events)
{
try
{
// Stamp IngestedAtUtc here, not at the site. Bundle A's
// repository hardening already swallows duplicate-key races,
// so the same id arriving twice (site retry, reconciliation)
// is a silent no-op.
// Filter BEFORE the IngestedAtUtc stamp so the redacted
// copy carries the central-side ingest timestamp. Filter
// is contract-bound to never throw; null = pass-through.
var filtered = filter?.Apply(evt) ?? evt;
var ingested = filtered with { IngestedAtUtc = nowUtc };
await repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
accepted.Add(evt.EventId);
}
catch (Exception ex)
{
// Per-row catch — one bad row never sinks the whole batch.
// The row stays Pending at the site; the next drain retries.
// M6 Bundle E (T8): bump the central health counter so a
// sustained insert-throw failure surfaces on the dashboard.
try { failureCounter?.Increment(); }
catch { /* counter must never throw — defence in depth */ }
_logger.LogError(ex,
"Failed to persist audit event {EventId} during batch ingest; row will be retried by the site.",
evt.EventId);
}
}
}
finally
{
scope?.Dispose();
var failureCounter = scope.ServiceProvider.GetService<ICentralAuditWriteFailureCounter>();
await IngestWithRepositoryAsync(repository, filter, failureCounter, cmd, nowUtc, accepted)
.ConfigureAwait(false);
}
replyTo.Tell(new IngestAuditEventsReply(accepted));
}
private async Task IngestWithRepositoryAsync(
IAuditLogRepository repository,
IAuditPayloadFilter? filter,
ICentralAuditWriteFailureCounter? failureCounter,
IngestAuditEventsCommand cmd,
DateTime nowUtc,
List<Guid> accepted)
{
foreach (var evt in cmd.Events)
{
try
{
// Stamp IngestedAtUtc here, not at the site. Bundle A's
// repository hardening already swallows duplicate-key races,
// so the same id arriving twice (site retry, reconciliation)
// is a silent no-op.
// Filter BEFORE the IngestedAtUtc stamp so the redacted
// copy carries the central-side ingest timestamp. Filter
// is contract-bound to never throw; null = pass-through.
var filtered = filter?.Apply(evt) ?? evt;
var ingested = filtered with { IngestedAtUtc = nowUtc };
await repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
accepted.Add(evt.EventId);
}
catch (Exception ex)
{
// Per-row catch — one bad row never sinks the whole batch.
// The row stays Pending at the site; the next drain retries.
// M6 Bundle E (T8): bump the central health counter so a
// sustained insert-throw failure surfaces on the dashboard.
try { failureCounter?.Increment(); }
catch { /* counter must never throw — defence in depth */ }
_logger.LogError(ex,
"Failed to persist audit event {EventId} during batch ingest; row will be retried by the site.",
evt.EventId);
}
}
}
/// <summary>
/// M3 dual-write handler. For every <see cref="CachedTelemetryEntry"/> the
/// actor opens a fresh MS SQL transaction, inserts the AuditLog row
@@ -134,79 +134,73 @@ public class AuditLogPurgeActor : ReceiveActor
// restart.
var threshold = DateTime.UtcNow - TimeSpan.FromDays(_auditOptions.RetentionDays);
IServiceScope? scope = null;
// AuditLog-003: use CreateAsyncScope + await using so scoped EF Core
// services (IAsyncDisposable DbContexts) dispose asynchronously
// without blocking on sync Dispose() of pending connection cleanup.
await using var scope = _services.CreateAsyncScope();
IAuditLogRepository repository;
try
{
scope = _services.CreateScope();
repository = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to resolve IAuditLogRepository for AuditLog purge tick.");
scope?.Dispose();
return;
}
IReadOnlyList<DateTime> boundaries;
try
{
IReadOnlyList<DateTime> boundaries;
boundaries = await repository
.GetPartitionBoundariesOlderThanAsync(threshold)
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogError(
ex,
"Failed to enumerate eligible AuditLog partition boundaries (threshold {ThresholdUtc:o}); skipping purge tick.",
threshold);
return;
}
if (boundaries.Count == 0)
{
return;
}
foreach (var boundary in boundaries)
{
// Per-boundary try/catch: one bad partition (transient SQL
// failure, missing object, contention with backup) does NOT
// abandon the rest of the tick.
var sw = Stopwatch.StartNew();
try
{
boundaries = await repository
.GetPartitionBoundariesOlderThanAsync(threshold)
var rowsDeleted = await repository
.SwitchOutPartitionAsync(boundary)
.ConfigureAwait(false);
sw.Stop();
eventStream.Publish(
new AuditLogPurgedEvent(boundary, rowsDeleted, sw.ElapsedMilliseconds));
_logger.LogInformation(
"Purged AuditLog partition {MonthBoundary:yyyy-MM-dd}; {RowsDeleted} rows in {DurationMs} ms.",
boundary,
rowsDeleted,
sw.ElapsedMilliseconds);
}
catch (Exception ex)
{
sw.Stop();
_logger.LogError(
ex,
"Failed to enumerate eligible AuditLog partition boundaries (threshold {ThresholdUtc:o}); skipping purge tick.",
threshold);
return;
"Failed to purge AuditLog partition {MonthBoundary:yyyy-MM-dd}; other partitions continue. Elapsed {DurationMs} ms.",
boundary,
sw.ElapsedMilliseconds);
}
if (boundaries.Count == 0)
{
return;
}
foreach (var boundary in boundaries)
{
// Per-boundary try/catch: one bad partition (transient SQL
// failure, missing object, contention with backup) does NOT
// abandon the rest of the tick.
var sw = Stopwatch.StartNew();
try
{
var rowsDeleted = await repository
.SwitchOutPartitionAsync(boundary)
.ConfigureAwait(false);
sw.Stop();
eventStream.Publish(
new AuditLogPurgedEvent(boundary, rowsDeleted, sw.ElapsedMilliseconds));
_logger.LogInformation(
"Purged AuditLog partition {MonthBoundary:yyyy-MM-dd}; {RowsDeleted} rows in {DurationMs} ms.",
boundary,
rowsDeleted,
sw.ElapsedMilliseconds);
}
catch (Exception ex)
{
sw.Stop();
_logger.LogError(
ex,
"Failed to purge AuditLog partition {MonthBoundary:yyyy-MM-dd}; other partitions continue. Elapsed {DurationMs} ms.",
boundary,
sw.ElapsedMilliseconds);
}
}
}
finally
{
scope.Dispose();
}
}
@@ -195,44 +195,38 @@ public class SiteAuditReconciliationActor : ReceiveActor
return;
}
IServiceScope? scope = null;
// AuditLog-003: use CreateAsyncScope + await using so scoped EF Core
// services (IAsyncDisposable DbContexts) dispose asynchronously
// without blocking on sync Dispose() of pending connection cleanup.
await using var scope = _services.CreateAsyncScope();
IAuditLogRepository repository;
try
{
scope = _services.CreateScope();
repository = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to resolve IAuditLogRepository for reconciliation tick.");
scope?.Dispose();
return;
}
try
foreach (var site in sites)
{
foreach (var site in sites)
try
{
try
{
await PullSiteAsync(site, repository, eventStream).ConfigureAwait(false);
}
catch (Exception ex)
{
// Catch-all per the failure-isolation invariant: one site's
// fault must not sink the rest of the tick. The cursor for
// the failing site is left at its previous value so the
// next tick retries the same window.
_logger.LogWarning(
ex,
"Reconciliation pull failed for site {SiteId}; other sites continue.",
site.SiteId);
}
await PullSiteAsync(site, repository, eventStream).ConfigureAwait(false);
}
catch (Exception ex)
{
// Catch-all per the failure-isolation invariant: one site's
// fault must not sink the rest of the tick. The cursor for
// the failing site is left at its previous value so the
// next tick retries the same window.
_logger.LogWarning(
ex,
"Reconciliation pull failed for site {SiteId}; other sites continue.",
site.SiteId);
}
}
finally
{
scope.Dispose();
}
}
@@ -150,16 +150,15 @@ public static class ServiceCollectionExtensions
sp.GetRequiredService<IAuditWriter>(),
sp.GetService<ScadaLink.Commons.Interfaces.IOperationTrackingStore>(),
sp.GetRequiredService<ILogger<CachedCallTelemetryForwarder>>(),
// SourceNode-stamping (Task 14): the local node identity is
// threaded through so RecordEnqueueAsync can stamp the
// tracking row's SourceNode column. GetService (not
// GetRequiredService) — test composition roots that build a
// stripped DI container may not register the provider, in
// which case the forwarder degrades to a null SourceNode
// rather than failing the DI resolution. Production hosts
// (site + central) always register it via
// SiteServiceRegistration.BindSharedOptions.
sp.GetService<INodeIdentityProvider>()));
// AuditLog-007: INodeIdentityProvider is now required across
// every consumer in AddAuditLog. The Host's
// SiteServiceRegistration registers it as a singleton on both
// site and central paths (InboundAPI-022 / Host registration
// sweep), and the AddAuditLogTests fixture provides a
// FakeNodeIdentityProvider; a silent GetService() that
// returned null would mask a future composition root that
// forgot to register the provider.
sp.GetRequiredService<INodeIdentityProvider>()));
// M3 Bundle F: bridge the store-and-forward retry-loop observer hook
// to the cached-call forwarder so per-attempt + terminal telemetry
@@ -171,15 +170,17 @@ public static class ServiceCollectionExtensions
// INodeIdentityProvider singleton can be threaded through — the
// bridge stamps SiteCallOperational.SourceNode from
// INodeIdentityProvider.NodeName on every cached-call lifecycle row.
// GetService (not GetRequiredService): test composition roots that
// build a stripped DI container may not register the provider, in
// which case the bridge degrades to a null SourceNode rather than
// failing the DI resolution. Production hosts (site + central)
// always register it via SiteServiceRegistration.BindSharedOptions.
// AuditLog-007: the provider is resolved with GetRequiredService —
// SiteServiceRegistration.BindSharedOptions registers it on both
// site and central paths, so a missing registration is a
// composition-root bug, not a silent null-SourceNode degradation.
services.AddSingleton<CachedCallLifecycleBridge>(sp => new CachedCallLifecycleBridge(
sp.GetRequiredService<ICachedCallTelemetryForwarder>(),
sp.GetRequiredService<ILogger<CachedCallLifecycleBridge>>(),
sp.GetService<INodeIdentityProvider>()));
// AuditLog-007: required, matches the other consumers in this
// composition root — the provider is always registered by
// SiteServiceRegistration.
sp.GetRequiredService<INodeIdentityProvider>()));
services.AddSingleton<ICachedCallLifecycleObserver>(
sp => sp.GetRequiredService<CachedCallLifecycleBridge>());
@@ -245,8 +246,10 @@ public static class ServiceCollectionExtensions
/// time — by design, since a silent NoOp would mask a misconfiguration.
/// </para>
/// <para>
/// Idempotent — calling twice replaces each descriptor without piling up
/// registrations.
/// Idempotent — a sentinel check on the
/// <see cref="SiteAuditBacklogReporter"/> hosted-service descriptor
/// short-circuits subsequent calls so the hosted service is not
/// double-registered (AddHostedService has no TryAdd variant).
/// </para>
/// <para>
/// Site-side only for M5: the central composition root keeps the NoOp
@@ -261,6 +264,18 @@ public static class ServiceCollectionExtensions
{
ArgumentNullException.ThrowIfNull(services);
// AuditLog-011: guard against double-registration. AddHostedService is
// additive (no TryAdd variant) so a second call without this sentinel
// would spin up a second SiteAuditBacklogReporter, doubling the 30 s
// SQL probe rate and racing on the ISiteHealthCollector snapshot. The
// SiteAuditBacklogReporter descriptor is the discriminator: it's only
// registered by this helper, so its presence proves the bridge has
// already been wired.
if (services.Any(d => d.ImplementationType == typeof(SiteAuditBacklogReporter)))
{
return services;
}
services.Replace(
ServiceDescriptor.Singleton<IAuditWriteFailureCounter, HealthMetricsAuditWriteFailureCounter>());
services.Replace(
@@ -79,23 +79,53 @@ internal sealed class SandboxConsoleCapture : TextWriter
return new CaptureScope(this, previous);
}
// CentralUI-030: intra-script concurrency hardening. A sandboxed script
// can fan out work with `Task.WhenAll` / `Task.Run`; `AsyncLocal` flows
// the capture `StringWriter` into every child task, so two tasks can
// race the *same* buffer. `StringWriter` is not thread-safe — concurrent
// `Write`/`WriteLine` calls can corrupt the underlying `StringBuilder`
// and either throw or interleave at the character level. We lock on the
// captured writer itself so writes from one capture scope serialise;
// fall-through to the original `_fallback` (host-process console) is
// unlocked because the BCL's process-wide `Console.Out` is already
// synchronised by its TextWriter wrapper.
/// <inheritdoc />
public override void Write(char value) => Target.Write(value);
public override void Write(char value) => WriteSynchronized(t => t.Write(value));
/// <inheritdoc />
public override void Write(string? value) => Target.Write(value);
public override void Write(string? value) => WriteSynchronized(t => t.Write(value));
/// <inheritdoc />
public override void Write(char[] buffer, int index, int count) =>
Target.Write(buffer, index, count);
WriteSynchronized(t => t.Write(buffer, index, count));
/// <inheritdoc />
public override void WriteLine() => Target.WriteLine();
public override void WriteLine() => WriteSynchronized(t => t.WriteLine());
/// <inheritdoc />
public override void WriteLine(string? value) => Target.WriteLine(value);
public override void WriteLine(string? value) => WriteSynchronized(t => t.WriteLine(value));
private TextWriter Target => _current.Value ?? _fallback;
/// <summary>
/// Routes a single write through the currently-active capture buffer
/// under a lock on that buffer, or to the unwrapped fallback writer when
/// no capture scope is active. The lock target is the `StringWriter`
/// instance itself — different capture scopes have different writers,
/// so two unrelated scopes never block each other.
/// </summary>
private void WriteSynchronized(Action<TextWriter> write)
{
var captured = _current.Value;
if (captured is null)
{
write(_fallback);
return;
}
lock (captured)
{
write(captured);
}
}
internal readonly struct CaptureScope : IDisposable
{
@@ -56,12 +56,19 @@ public interface IDeploymentManagerRepository
/// <returns>A task representing the asynchronous operation.</returns>
Task UpdateDeploymentRecordAsync(DeploymentRecord record, CancellationToken cancellationToken = default);
/// <summary>
/// Deletes a deployment record by ID.
/// Deletes a deployment record by ID, enforcing optimistic concurrency against the
/// supplied <paramref name="expectedRowVersion"/>. The caller MUST pass the
/// <c>RowVersion</c> it last observed on the record so EF emits
/// <c>DELETE ... WHERE Id = @id AND RowVersion = @prior</c>. A concurrent edit
/// surfaces as <see cref="Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException"/>
/// on <see cref="SaveChangesAsync(CancellationToken)"/>, matching the documented
/// "Optimistic concurrency is used on deployment status records" design rule.
/// </summary>
/// <param name="id">The deployment record ID to delete.</param>
/// <param name="expectedRowVersion">The RowVersion the caller observed; used as the optimistic-concurrency token.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task DeleteDeploymentRecordAsync(int id, CancellationToken cancellationToken = default);
Task DeleteDeploymentRecordAsync(int id, byte[] expectedRowVersion, CancellationToken cancellationToken = default);
// SystemArtifactDeploymentRecord
/// <summary>
@@ -81,25 +81,23 @@ public record ExternalCallResult(
string? ErrorMessage,
bool WasBuffered = false)
{
private dynamic? _response;
private bool _responseParsed;
// Commons-021: thread-safe lazy parse. `Lazy<T>` with the default
// `LazyThreadSafetyMode.ExecutionAndPublication` guarantees that
// concurrent readers see the same `DynamicJsonElement` instance, that
// `JsonDocument.Parse` runs at most once, and that the published value
// is safe under .NET's memory model. Inside this field initializer
// `ResponseJson` binds to the positional constructor parameter, so the
// closure parses the value passed at construction; the matching
// property is init-only, so it cannot change on this instance afterwards.
private readonly Lazy<dynamic?> _response = new(() =>
string.IsNullOrEmpty(ResponseJson)
? null
: new DynamicJsonElement(System.Text.Json.JsonDocument.Parse(ResponseJson).RootElement));
/// <summary>
/// Parsed response as a dynamic object. Returns null if ResponseJson is null or empty.
/// Access properties directly: result.Response.result, result.Response.items[0].name, etc.
/// Thread-safe: concurrent readers share a single parsed instance (Commons-021).
/// </summary>
public dynamic? Response
{
get
{
if (!_responseParsed)
{
_response = string.IsNullOrEmpty(ResponseJson)
? null
: new DynamicJsonElement(System.Text.Json.JsonDocument.Parse(ResponseJson).RootElement);
_responseParsed = true;
}
return _response;
}
}
public dynamic? Response => _response.Value;
}
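
A minimal sketch of the parse-once behaviour that Commons-021 relies on, for illustration only. ParsedOnce and its ParseCount counter are hypothetical and exist only to make the single factory invocation observable; the sketch uses JsonElement directly rather than the repository's DynamicJsonElement.

```csharp
using System;
using System.Linq;
using System.Text.Json;
using System.Threading;

// Hypothetical holder that counts how often the JSON is actually parsed.
public sealed class ParsedOnce
{
    private int _parseCount;
    private readonly Lazy<JsonElement?> _parsed;

    public ParsedOnce(string? json)
    {
        _parsed = new Lazy<JsonElement?>(() =>
        {
            Interlocked.Increment(ref _parseCount);
            return string.IsNullOrEmpty(json)
                ? (JsonElement?)null
                : JsonDocument.Parse(json).RootElement;
        }, LazyThreadSafetyMode.ExecutionAndPublication);
    }

    public int ParseCount => Volatile.Read(ref _parseCount);

    public JsonElement? Value => _parsed.Value;
}

public static class Demo
{
    // Releases all readers at once via a Barrier, then reports the parse count.
    public static int RunReaders(int readers)
    {
        var target = new ParsedOnce("{\"result\": 42}");
        using var barrier = new Barrier(readers);

        var threads = Enumerable.Range(0, readers)
            .Select(i => new Thread(() =>
            {
                barrier.SignalAndWait();
                GC.KeepAlive(target.Value);
            }))
            .ToList();

        threads.ForEach(t => t.Start());
        threads.ForEach(t => t.Join());
        return target.ParseCount;
    }

    public static void Main() => Console.WriteLine(RunReaders(64));
}
```

With ExecutionAndPublication the factory runs under a lock, so the counter ends at 1 regardless of how many readers race on the first access.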
@@ -1,24 +1,27 @@
namespace ScadaLink.Commons.Interfaces.Transport;
/// <summary>
/// Scoped service the bundle importer sets to thread a BundleImportId through to
/// the audit log entries emitted by the audited repository methods invoked during
/// Service the bundle importer sets to thread a BundleImportId through to the
/// audit log entries emitted by the audited repository methods invoked during
/// ApplyAsync. AuditService reads this and stamps every AuditLogEntry it writes.
/// <para>
/// Re-entrancy / thread-safety: mutating <see cref="BundleImportId"/> is NOT
/// thread-safe. The service is registered scoped, and the assumed usage is a
/// single Blazor Server circuit (or single API request) at a time — within that
/// scope <c>BundleImporter.ApplyAsync</c> (in the Transport component) is the
/// sole writer, and the audit service is the sole reader, in a strictly
/// sequential await chain. Callers that perform concurrent imports within a
/// shared scope (e.g. two <c>ApplyAsync</c> calls awaited via
/// <c>Task.WhenAll</c> on the same circuit) MUST serialize access externally —
/// there is no internal lock and the last writer wins, which would
/// cross-contaminate audit rows between imports.
/// Thread-safety / concurrency contract (Transport-009): the in-tree
/// implementation backs <see cref="BundleImportId"/> with an
/// <see cref="System.Threading.AsyncLocal{T}"/> so each logical asynchronous
/// call chain — every distinct <c>BundleImporter.ApplyAsync</c> invocation —
/// observes its own value, even when two imports share the same DI scope (e.g.
/// awaited via <c>Task.WhenAll</c> on a single Blazor circuit, or driven by a
/// misconfigured singleton registration). The value flows through every
/// <c>await</c> naturally; no cross-contamination of BundleImportIds between
/// concurrent imports.
/// </para>
/// <para>
/// Alternative implementations (e.g. ambient-context-free explicit-parameter
/// threading) MUST preserve the same per-call-context isolation guarantee.
/// </para>
/// </summary>
public interface IAuditCorrelationContext
{
/// <summary>Gets or sets the bundle import id used to correlate audit rows written during a bundle apply operation.</summary>
/// <summary>Gets or sets the bundle import id used to correlate audit rows written during a bundle apply operation. Implementations MUST isolate the value per-logical-call-context to prevent concurrent imports from cross-contaminating audit rows.</summary>
Guid? BundleImportId { get; set; }
}
@@ -81,17 +81,30 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository
}
/// <inheritdoc />
public Task DeleteDeploymentRecordAsync(int id, CancellationToken cancellationToken = default)
public Task DeleteDeploymentRecordAsync(int id, byte[] expectedRowVersion, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(expectedRowVersion);
// CD-017: DeploymentRecord carries a SQL Server rowversion concurrency token.
// The stub-attach delete path must seed EF's OriginalValues["RowVersion"] with
// the caller's last-observed value so the generated SQL becomes
// `DELETE ... WHERE Id = @id AND RowVersion = @prior`. Without this seeding a
// concurrent edit is silently overwritten; with it, EF raises
// DbUpdateConcurrencyException on SaveChangesAsync — the documented
// optimistic-concurrency contract on deployment status records.
var record = _dbContext.DeploymentRecords.Local.FirstOrDefault(d => d.Id == id);
if (record != null)
{
var entry = _dbContext.Entry(record);
entry.OriginalValues["RowVersion"] = expectedRowVersion;
_dbContext.DeploymentRecords.Remove(record);
}
else
{
var stub = new DeploymentRecord("stub", "stub") { Id = id };
_dbContext.DeploymentRecords.Attach(stub);
var entry = _dbContext.Entry(stub);
entry.OriginalValues["RowVersion"] = expectedRowVersion;
_dbContext.DeploymentRecords.Remove(stub);
}
return Task.CompletedTask;
@@ -3,13 +3,34 @@ using ScadaLink.Commons.Interfaces.Transport;
namespace ScadaLink.ConfigurationDatabase.Services;
/// <summary>
/// Per-scope mutable holder for the active bundle import id. AuditService reads it
/// while writing AuditLogEntry rows. Registered as Scoped so each Blazor circuit /
/// request gets its own value; ApplyAsync explicitly creates a service scope and
/// sets the id at the top of the transaction.
/// Holder for the active bundle import id, backed by an <see cref="AsyncLocal{T}"/>
/// so each logical asynchronous call chain observes its own value. AuditService
/// reads it while writing AuditLogEntry rows.
/// <para>
/// Thread-safety / concurrency contract (Transport-009): the previous Scoped
/// instance with a plain auto-property mutated by <c>BundleImporter.ApplyAsync</c>
/// was vulnerable to cross-contamination if two imports ran concurrently inside
/// a shared DI scope — either via <c>Task.WhenAll</c> on a single Blazor circuit
/// or via a misconfigured singleton registration. Backing the property with
/// <see cref="AsyncLocal{T}"/> means every fresh logical-call-context — every
/// distinct <c>ApplyAsync</c> invocation, even ones sharing the same DI scope —
/// gets its own independent value, and the value flows naturally through every
/// <c>await</c> in the chain. Concurrent imports no longer leak BundleImportIds
/// across audit rows.
/// </para>
/// <para>
/// The class is still registered as Scoped so injection works with the existing
/// DI graph, but its in-memory state is per-call-context regardless of lifetime.
/// </para>
/// </summary>
public sealed class AuditCorrelationContext : IAuditCorrelationContext
{
private static readonly AsyncLocal<Guid?> _bundleImportId = new();
/// <inheritdoc />
public Guid? BundleImportId { get; set; }
public Guid? BundleImportId
{
get => _bundleImportId.Value;
set => _bundleImportId.Value = value;
}
}
@@ -42,6 +42,21 @@ public class AkkaHostedService : IHostedService
/// </summary>
private readonly List<IDisposable> _trackedDisposables = new();
/// <summary>
/// NotificationService-020 guard: sentinel that flips to <c>true</c> the
/// first time a Notification-category S&amp;F delivery handler is registered
/// on this hosted service instance. <see cref="StoreAndForwardService.RegisterDeliveryHandler"/>
/// is last-write-wins on category, so a future code change that introduces
/// a second registration path (e.g. a role-branch + helper that both call
/// the registration) would silently overwrite the canonical
/// <c>NotificationForwarder</c> handler with whatever the loser registers —
/// the prior NS-001 fix did exactly this, and was silently superseded
/// when the central-only redesign moved delivery to <c>NotificationOutbox</c>.
/// This sentinel makes the duplicate noisy at startup so a maintainer
/// re-introducing the second path sees it immediately.
/// </summary>
private bool _notificationDeliveryHandlerRegistered;
/// <summary>
/// Initializes a new instance of the <see cref="AkkaHostedService"/> class.
/// </summary>
@@ -460,7 +475,42 @@ akka {{
terminationMessage: PoisonPill.Instance,
settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
.WithSingletonName("site-call-audit"));
_actorSystem!.ActorOf(siteCallAuditSingletonProps, "site-call-audit-singleton");
var siteCallAuditSingletonManager =
_actorSystem!.ActorOf(siteCallAuditSingletonProps, "site-call-audit-singleton");
// SiteCallAudit-002 graceful-handover hook. The default singleton handover
// path waits for the actor's `ReceiveAsync` task to complete before
// signalling `HandOverDone` to the new oldest node — so an in-flight
// EF `UpsertAsync` IS waited for during a *clean* coordinated shutdown
// (the cluster-leave phase below fires before the singleton terminates).
// The risk the finding tracks is the seam between in-flight async work
// and the cluster-leave + singleton-stop sequence: we bound it by
// issuing an explicit `GracefulStop` to the singleton manager early
// in `cluster-leave`, with a timeout that lets the running upsert + SQL
// round-trip drain before the handover-to-other-node race window
// opens. The timeout is bounded so a misbehaving upsert cannot stall
// coordinated shutdown indefinitely — exceeding it falls through to
// the existing PoisonPill termination path. Same pattern is suitable
// for the NotificationOutbox singleton; not added here to keep this
// change minimal (out of NS-020's scope).
var siteCallAuditShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem);
siteCallAuditShutdown.AddTask(
Akka.Actor.CoordinatedShutdown.PhaseClusterLeave,
"drain-site-call-audit-singleton",
async () =>
{
try
{
await siteCallAuditSingletonManager.GracefulStop(TimeSpan.FromSeconds(10));
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"SiteCallAudit singleton did not drain within the graceful-stop "
+ "timeout; falling through to PoisonPill handover");
}
return Akka.Done.Instance;
});
var siteCallAuditProxyProps = ClusterSingletonProxy.Props(
singletonManagerPath: "/user/site-call-audit-singleton",
@@ -651,6 +701,23 @@ akka {{
// cluster via the SiteCommunicationActor and treating central's
// NotificationSubmitAck as the outcome (accepted → delivered; not accepted
// or timeout → throw → transient → keep buffering). Central owns SMTP.
//
// NotificationService-020: register exactly once. The sentinel guard
// catches a second registration path that re-introduces the dead
// NS-001 site-SMTP handler — see the sentinel's XML doc above for the
// historical context. Throwing here is intentional: a silent overwrite
// by a future maintainer would invert the design back to site-side
// delivery (NotificationForwarder vs. NotificationDeliveryService).
if (_notificationDeliveryHandlerRegistered)
{
throw new InvalidOperationException(
"NotificationService-020: A Notification-category store-and-forward "
+ "delivery handler was already registered. The canonical handler is "
+ "NotificationForwarder (central-only delivery, post-redesign). "
+ "If you are re-introducing a second registration path, remove the "
+ "first one — RegisterDeliveryHandler is last-write-wins per category "
+ "and a duplicate inverts the design.");
}
var notificationForwarder = new ScadaLink.StoreAndForward.NotificationForwarder(
siteCommActor,
_nodeOptions.SiteId!,
@@ -658,6 +725,7 @@ akka {{
storeAndForwardService.RegisterDeliveryHandler(
ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.Notification,
notificationForwarder.DeliverAsync);
_notificationDeliveryHandlerRegistered = true;
_logger.LogInformation(
"Store-and-forward delivery handlers registered (ExternalSystem, CachedDbWrite, Notification)");