fix(scadabridge): queue-depth seed uses Add (no lost concurrent enqueue) + clarify registration/discard comments

This commit is contained in:
Joseph Doherty
2026-06-01 17:07:03 -04:00
parent 782fb73015
commit 15a626390b
2 changed files with 61 additions and 6 deletions
@@ -123,10 +123,21 @@ public class StoreAndForwardService
private long _bufferedCount;
/// <summary>
/// Test seam (WP-14 telemetry): simulates a concurrent pre-seed
/// <see cref="BufferAsync"/> increment landing on <see cref="_bufferedCount"/>
/// before <see cref="StartAsync"/> seeds it, so a test can prove the seed uses
/// <see cref="Interlocked.Add"/> (additive) rather than Exchange (clobbering).
/// Atomically adds one to <see cref="_bufferedCount"/>; the incremented value
/// is not returned to the caller.
/// </summary>
internal void TestOnly_IncrementBufferedCount() =>
Interlocked.Increment(ref _bufferedCount);
/// <summary>
/// WP-14 (telemetry): an instance field that guards against a single instance
/// registering the queue-depth provider (and re-seeding the counter) more than
/// once — e.g. a second <see cref="StartAsync"/> on the same instance. It does NOT
/// coordinate across instances: the gauge slot in <see cref="ScadaBridgeTelemetry"/>
/// is process-global, so in a multi-instance process the last <see cref="StartAsync"/>
/// wins the global slot. 0 = not yet registered, 1 = done.
/// </summary>
private int _queueDepthProviderRegistered;
@@ -206,12 +217,20 @@ public class StoreAndForwardService
// WP-14 (telemetry): seed the cached buffered-message count from the
// store exactly once (the gauge callback cannot run an async COUNT), then
// register the sync, non-blocking provider with the process-global
// ScadaBridgeTelemetry gauge. Both steps are inside the one-time guard so a
// second StartAsync on the same instance cannot double-seed.
//
// The seed is an Interlocked.Add — NOT an Exchange — to avoid a startup race:
// between the await above returning and this point, a concurrent BufferAsync
// could already have Interlocked.Increment'd _bufferedCount. Exchange would
// clobber that increment (losing a +1); Add preserves it. _bufferedCount
// starts at 0 and only BufferAsync increments it before the seed, so
// 0 + pending + (any concurrent increments) is the correct live count.
var pending = await _storage.GetMessageCountByStatusAsync(
StoreAndForwardMessageStatus.Pending);
Interlocked.Exchange(ref _bufferedCount, pending);
if (Interlocked.CompareExchange(ref _queueDepthProviderRegistered, 1, 0) == 0)
{
Interlocked.Add(ref _bufferedCount, pending);
ScadaBridgeTelemetry.SetQueueDepthProvider(() => Interlocked.Read(ref _bufferedCount));
}
@@ -831,6 +850,7 @@ public class StoreAndForwardService
{
// Capture the category before the row is deleted so the activity log is
// labelled correctly.
// WP-14 (telemetry): Parked rows are not in _bufferedCount; discarding a Parked row needs no counter adjustment.
var message = await _storage.GetMessageByIdAsync(messageId);
var success = await _storage.DiscardParkedMessageAsync(messageId);
if (success)
@@ -173,4 +173,39 @@ public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable
await fresh.StopAsync();
}
}
/// <summary>
/// Regression guard for the startup-seed race (review FINDING 1). The seed performed
/// by <c>StartAsync</c> has to be additive: a <c>BufferAsync</c> call may already have
/// bumped <c>_bufferedCount</c> via <c>Interlocked.Increment</c> after the async
/// <c>COUNT(*)</c> returned but before the seed was applied. Seeding with
/// <c>Interlocked.Exchange</c> would overwrite that bump and under-report by one.
/// The test fakes the early enqueue through the test seam, starts the service, and
/// checks that the gauge still reports the faked increment.
/// </summary>
[Fact]
public async Task Gauge_SeedAddsToConcurrentPreSeedIncrement_NotClobber()
{
    // Arrange: the test relies on the store holding no pending rows, so the seed
    // contributes 0 and the gauge value comes solely from the simulated enqueue.
    var options = new StoreAndForwardOptions
    {
        RetryTimerInterval = TimeSpan.FromMinutes(10),
    };
    var service = new StoreAndForwardService(
        _storage,
        options,
        NullLogger<StoreAndForwardService>.Instance);

    // Plays the role of a BufferAsync increment that beat the seed.
    service.TestOnly_IncrementBufferedCount();

    try
    {
        // Act
        await service.StartAsync();
        var observedDepth = ReadQueueDepthGauge();

        // Assert: 1 (pre-seed increment) + 0 (seed) = 1. A clobbering seed would
        // leave 0 here, dropping the concurrent enqueue.
        Assert.Equal(1, observedDepth);
    }
    finally
    {
        await service.StopAsync();
    }
}
}