diff --git a/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs b/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs index 8286ffd4..36cf4702 100644 --- a/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs @@ -123,10 +123,21 @@ public class StoreAndForwardService private long _bufferedCount; /// - /// WP-14 (telemetry): guards one-time registration of the queue-depth provider - /// with . The gauge is process-global, so only - /// the first wins; registering per message (or per start - /// of multiple service instances) is avoided. 0 = not yet registered, 1 = done. + /// Test seam (WP-14 telemetry): simulates a concurrent pre-seed + /// increment landing on + /// before seeds it, so a test can prove the seed uses + /// (additive) rather than Exchange (clobbering). + /// + internal void TestOnly_IncrementBufferedCount() => + Interlocked.Increment(ref _bufferedCount); + + /// + /// WP-14 (telemetry): an instance field that guards against a single instance + /// registering the queue-depth provider (and re-seeding the counter) more than + /// once — e.g. a second on the same instance. It does NOT + /// coordinate across instances: the gauge slot in + /// is process-global, so in a multi-instance process the last + /// wins the global slot. 0 = not yet registered, 1 = done. /// private int _queueDepthProviderRegistered; @@ -206,12 +217,20 @@ public class StoreAndForwardService // WP-14 (telemetry): seed the cached buffered-message count from the // store exactly once (the gauge callback cannot run an async COUNT), then // register the sync, non-blocking provider with the process-global - // ScadaBridgeTelemetry gauge — guarded so only the first start registers. + // ScadaBridgeTelemetry gauge. Both steps are inside the one-time guard so a + // second StartAsync on the same instance cannot double-seed. 
+ // + // The seed is an Interlocked.Add — NOT an Exchange — to avoid a startup race: + // between the COUNT await below returning and the seed, a concurrent BufferAsync + // could already have Interlocked.Increment'd _bufferedCount. Exchange would + // clobber that increment (losing a +1); Add preserves it. _bufferedCount + // starts at 0 and only BufferAsync increments it before the seed, so + // 0 + pending + (concurrent increments not already in pending) is the correct live count. var pending = await _storage.GetMessageCountByStatusAsync( StoreAndForwardMessageStatus.Pending); - Interlocked.Exchange(ref _bufferedCount, pending); if (Interlocked.CompareExchange(ref _queueDepthProviderRegistered, 1, 0) == 0) { + Interlocked.Add(ref _bufferedCount, pending); ScadaBridgeTelemetry.SetQueueDepthProvider(() => Interlocked.Read(ref _bufferedCount)); } @@ -831,6 +850,7 @@ public class StoreAndForwardService { // Capture the category before the row is deleted so the activity log is // labelled correctly. + // WP-14 (telemetry): Parked rows are not in _bufferedCount; discarding a Parked row needs no counter adjustment. var message = await _storage.GetMessageByIdAsync(messageId); var success = await _storage.DiscardParkedMessageAsync(messageId); if (success) diff --git a/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs b/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs index 7a4a6e3e..a09a4b39 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs @@ -173,4 +173,39 @@ public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable await fresh.StopAsync(); } } + + /// + /// Review finding (FINDING 1): the startup seed must ADD to whatever the counter + /// already holds, not overwrite it. 
A concurrent BufferAsync can + /// Interlocked.Increment _bufferedCount in the window between + /// StartAsync's async COUNT(*) returning and the seed running; with an + /// Interlocked.Exchange seed that increment would be clobbered (lost +1). This + /// pre-increments the in-memory counter (standing in for that concurrent enqueue), + /// then starts the service over an empty store and asserts the pre-increment survives. + /// + [Fact] + public async Task Gauge_SeedAddsToConcurrentPreSeedIncrement_NotClobber() + { + // Store is empty (StartAsync's pending COUNT(*) = 0), so the only contribution + // is the simulated concurrent pre-seed enqueue increment. + var fresh = new StoreAndForwardService( + _storage, + new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) }, + NullLogger.Instance); + + // Stand in for a BufferAsync increment that landed before StartAsync seeded. + fresh.TestOnly_IncrementBufferedCount(); + + try + { + await fresh.StartAsync(); + // Add(0 seed) over the pre-existing +1 → 1. An Exchange(0) seed would clobber + // it to 0, losing the concurrent enqueue — the bug this fix prevents. + Assert.Equal(1, ReadQueueDepthGauge()); + } + finally + { + await fresh.StopAsync(); + } + } }