fix(concurrency): close 8 race / thread-safety findings across CD, DCL, SR

CD-015: rewrite NotificationOutboxRepository.InsertIfNotExistsAsync as raw-SQL
IF NOT EXISTS … INSERT with SqlException 2601/2627 catch, ending the
at-least-once livelock on the site→central notification handoff.

DCL-018/019/020/021/022: add _subscribesInFlight guard so concurrent
same-tag subscribes don't orphan an adapter handle; delete the latent
dead _subscriptionHandles dictionary; stop double-counting
_totalSubscribed when an unresolved tag is promoted via another instance;
release adapter handles on mid-flight unsubscribe; gate the
tag-resolution retry timer with IsTimerActive so subscribe bursts don't
reset it into starvation.

SR-020: add _terminatingActorsByName shadow so a third deploy arriving
during a pending redeploy doesn't crash on InvalidActorNameException —
displaced senders get a Failed/superseded response and the latest
command wins on Terminated.

SR-024: split OperationTrackingStore reads from writes (fresh
SqliteConnection per GetStatusAsync) so long writes don't block status
queries; rewrite Dispose to drop the sync-over-async bridge that could
deadlock on a non-reentrant SyncContext; Interlocked.Exchange makes the
dispose-once flag race-safe across both paths.
This commit is contained in:
Joseph Doherty
2026-05-28 05:20:13 -04:00
parent 5d2386cc9d
commit f936f55f51
15 changed files with 1152 additions and 170 deletions
@@ -36,10 +36,20 @@ namespace ScadaLink.SiteRuntime.Tracking;
/// </remarks>
public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, IDisposable
{
private readonly SqliteConnection _connection;
private readonly SemaphoreSlim _gate = new(1, 1);
// SiteRuntime-024: writer state — one owned SqliteConnection serialised behind
// _writeGate. Readers do NOT share this connection or gate; see GetStatusAsync.
private readonly SqliteConnection _writeConnection;
private readonly SemaphoreSlim _writeGate = new(1, 1);
private readonly string _connectionString;
private readonly ILogger<OperationTrackingStore> _logger;
private bool _disposed;
// SiteRuntime-024: dispose-once state shared by the sync Dispose and async
// DisposeAsync paths. Interlocked.Exchange is the race-safe primitive here —
// with a plain bool, concurrent Dispose() and DisposeAsync() calls (e.g. host
// shutdown bridging both) could each observe "not disposed" and both run
// teardown. 0 = live, 1 = disposed. Writers check it via Volatile.Read after
// taking _writeGate; GetStatusAsync checks it up front without a gate. Both
// raise ObjectDisposedException when it is set.
private int _disposeState;
/// <summary>
/// Initializes the tracking store, opens the SQLite connection, and applies the schema.
@@ -54,14 +64,15 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
ArgumentNullException.ThrowIfNull(logger);
_logger = logger;
_connection = new SqliteConnection(options.Value.ConnectionString);
_connection.Open();
_connectionString = options.Value.ConnectionString;
_writeConnection = new SqliteConnection(_connectionString);
_writeConnection.Open();
InitializeSchema();
}
private void InitializeSchema()
{
using var cmd = _connection.CreateCommand();
using var cmd = _writeConnection.CreateCommand();
cmd.CommandText = """
CREATE TABLE IF NOT EXISTS OperationTracking (
TrackedOperationId TEXT NOT NULL PRIMARY KEY,
@@ -112,7 +123,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
/// </summary>
private void AddColumnIfMissing(string columnName, string columnDefinition)
{
using var probe = _connection.CreateCommand();
using var probe = _writeConnection.CreateCommand();
probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('OperationTracking') WHERE name = $name";
probe.Parameters.AddWithValue("$name", columnName);
var exists = Convert.ToInt32(probe.ExecuteScalar()) > 0;
@@ -121,7 +132,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
return;
}
using var alter = _connection.CreateCommand();
using var alter = _writeConnection.CreateCommand();
// Column name + definition are caller-controlled constants, never user
// input — safe to interpolate (parameters are not permitted in DDL).
alter.CommandText = $"ALTER TABLE OperationTracking ADD COLUMN {columnName} {columnDefinition}";
@@ -140,14 +151,14 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
{
ArgumentNullException.ThrowIfNull(kind);
await _gate.WaitAsync(ct).ConfigureAwait(false);
await _writeGate.WaitAsync(ct).ConfigureAwait(false);
try
{
ObjectDisposedException.ThrowIf(_disposed, this);
ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);
var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
using var cmd = _connection.CreateCommand();
using var cmd = _writeConnection.CreateCommand();
// INSERT OR IGNORE: duplicate ids are no-ops (first-write-wins) —
// matches the at-least-once semantics the site emits under.
cmd.CommandText = """
@@ -176,7 +187,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
}
finally
{
_gate.Release();
_writeGate.Release();
}
}
@@ -191,14 +202,14 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
{
ArgumentNullException.ThrowIfNull(status);
await _gate.WaitAsync(ct).ConfigureAwait(false);
await _writeGate.WaitAsync(ct).ConfigureAwait(false);
try
{
ObjectDisposedException.ThrowIf(_disposed, this);
ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);
var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
using var cmd = _connection.CreateCommand();
using var cmd = _writeConnection.CreateCommand();
// Terminal rows are immutable — the WHERE clause filters them out so
// late-arriving attempt telemetry never overwrites a resolved row.
cmd.CommandText = """
@@ -222,7 +233,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
}
finally
{
_gate.Release();
_writeGate.Release();
}
}
@@ -236,14 +247,14 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
{
ArgumentNullException.ThrowIfNull(status);
await _gate.WaitAsync(ct).ConfigureAwait(false);
await _writeGate.WaitAsync(ct).ConfigureAwait(false);
try
{
ObjectDisposedException.ThrowIf(_disposed, this);
ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);
var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
using var cmd = _connection.CreateCommand();
using var cmd = _writeConnection.CreateCommand();
// First-write-wins on the terminal flip: only update rows that
// haven't already terminated.
cmd.CommandText = """
@@ -266,7 +277,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
}
finally
{
_gate.Release();
_writeGate.Release();
}
}
@@ -275,47 +286,48 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
TrackedOperationId id,
CancellationToken ct = default)
{
await _gate.WaitAsync(ct).ConfigureAwait(false);
try
ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);
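// SiteRuntime-024: reads open a fresh, ungated SqliteConnection so a
// long-running write doesn't block status queries. The connection
// string is shared with the writer; SQLite handles cross-connection
// isolation natively (a reader sees a consistent snapshot via the
// shared cache lock for in-memory DBs, or a WAL snapshot for file DBs).
// NOTE(review): this assumes the connection string enables a shared cache
// for in-memory DBs and that file DBs run in WAL journal mode — neither is
// configured in this class; confirm against the options/deployment.
// Mirrors the SiteStorageService precedent.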
await using var readConnection = new SqliteConnection(_connectionString);
await readConnection.OpenAsync(ct).ConfigureAwait(false);
await using var cmd = readConnection.CreateCommand();
cmd.CommandText = """
SELECT TrackedOperationId, Kind, TargetSummary, Status,
RetryCount, LastError, HttpStatus,
CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
SourceInstanceId, SourceScript, SourceNode
FROM OperationTracking
WHERE TrackedOperationId = $id;
""";
cmd.Parameters.AddWithValue("$id", id.ToString());
await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
if (!await reader.ReadAsync(ct).ConfigureAwait(false))
{
ObjectDisposedException.ThrowIf(_disposed, this);
using var cmd = _connection.CreateCommand();
cmd.CommandText = """
SELECT TrackedOperationId, Kind, TargetSummary, Status,
RetryCount, LastError, HttpStatus,
CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
SourceInstanceId, SourceScript, SourceNode
FROM OperationTracking
WHERE TrackedOperationId = $id;
""";
cmd.Parameters.AddWithValue("$id", id.ToString());
using var reader = cmd.ExecuteReader();
if (!reader.Read())
{
return null;
}
return new TrackingStatusSnapshot(
Id: TrackedOperationId.Parse(reader.GetString(0)),
Kind: reader.GetString(1),
TargetSummary: reader.IsDBNull(2) ? null : reader.GetString(2),
Status: reader.GetString(3),
RetryCount: reader.GetInt32(4),
LastError: reader.IsDBNull(5) ? null : reader.GetString(5),
HttpStatus: reader.IsDBNull(6) ? null : reader.GetInt32(6),
CreatedAtUtc: ParseUtc(reader.GetString(7)),
UpdatedAtUtc: ParseUtc(reader.GetString(8)),
TerminalAtUtc: reader.IsDBNull(9) ? null : ParseUtc(reader.GetString(9)),
SourceInstanceId: reader.IsDBNull(10) ? null : reader.GetString(10),
SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11),
SourceNode: reader.IsDBNull(12) ? null : reader.GetString(12));
}
finally
{
_gate.Release();
return null;
}
return new TrackingStatusSnapshot(
Id: TrackedOperationId.Parse(reader.GetString(0)),
Kind: reader.GetString(1),
TargetSummary: reader.IsDBNull(2) ? null : reader.GetString(2),
Status: reader.GetString(3),
RetryCount: reader.GetInt32(4),
LastError: reader.IsDBNull(5) ? null : reader.GetString(5),
HttpStatus: reader.IsDBNull(6) ? null : reader.GetInt32(6),
CreatedAtUtc: ParseUtc(reader.GetString(7)),
UpdatedAtUtc: ParseUtc(reader.GetString(8)),
TerminalAtUtc: reader.IsDBNull(9) ? null : ParseUtc(reader.GetString(9)),
SourceInstanceId: reader.IsDBNull(10) ? null : reader.GetString(10),
SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11),
SourceNode: reader.IsDBNull(12) ? null : reader.GetString(12));
}
/// <inheritdoc/>
@@ -323,12 +335,12 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
DateTime olderThanUtc,
CancellationToken ct = default)
{
await _gate.WaitAsync(ct).ConfigureAwait(false);
await _writeGate.WaitAsync(ct).ConfigureAwait(false);
try
{
ObjectDisposedException.ThrowIf(_disposed, this);
ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);
using var cmd = _connection.CreateCommand();
using var cmd = _writeConnection.CreateCommand();
// Non-terminal rows (TerminalAtUtc IS NULL) are kept regardless of
// age — the operation is still in flight.
cmd.CommandText = """
@@ -344,7 +356,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
}
finally
{
_gate.Release();
_writeGate.Release();
}
}
@@ -356,33 +368,68 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
DateTimeStyles.RoundtripKind);
}
/// <summary>Synchronously disposes the tracking store and its SQLite connection.</summary>
/// <summary>
/// Synchronously disposes the tracking store and its SQLite connection.
/// </summary>
/// <remarks>
/// SiteRuntime-024: this path does NOT bridge to async via
/// <c>.AsTask().GetAwaiter().GetResult()</c>. Sync-over-async on a SemaphoreSlim
/// can deadlock when invoked from a non-reentrant SyncContext (e.g. host
/// shutdown continuations observed on the host sync context). In-flight writes
/// at the moment of <see cref="Dispose"/> will fail their next operation
/// against the disposed connection with <see cref="ObjectDisposedException"/> —
/// the caller's responsibility is to ensure no concurrent operations during
/// the synchronous dispose. Use <see cref="DisposeAsync"/> if you need to
/// drain in-flight writes before close.
/// </remarks>
public void Dispose()
{
DisposeAsyncCore().AsTask().GetAwaiter().GetResult();
if (Interlocked.Exchange(ref _disposeState, 1) != 0)
{
return;
}
_writeConnection.Dispose();
_writeGate.Dispose();
GC.SuppressFinalize(this);
}
/// <summary>Asynchronously disposes the tracking store and its SQLite connection.</summary>
/// <summary>
/// Asynchronously disposes the tracking store and its SQLite connection.
/// Drains in-flight writes by acquiring the write gate before closing the
/// connection, so a write currently executing a SqliteCommand completes
/// before the connection is freed.
/// </summary>
public async ValueTask DisposeAsync()
{
await DisposeAsyncCore().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
if (Interlocked.Exchange(ref _disposeState, 1) != 0)
{
return;
}
private async ValueTask DisposeAsyncCore()
{
await _gate.WaitAsync().ConfigureAwait(false);
// Drain any in-flight write by taking the write gate. _disposeState is
// already set, so any writer that acquires the gate after the exchange
// above fails its ThrowIf check with ObjectDisposedException instead of
// touching the connection.
try
{
if (_disposed) return;
_disposed = true;
_connection.Dispose();
await _writeGate.WaitAsync().ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
// Race with another disposer that already disposed the gate — the
// _disposeState exchange above should prevent this, but be defensive.
}
try
{
_writeConnection.Dispose();
}
finally
{
_gate.Release();
_gate.Dispose();
try { _writeGate.Release(); } catch (ObjectDisposedException) { }
_writeGate.Dispose();
}
GC.SuppressFinalize(this);
}
}