diff --git a/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs b/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs index b00ddba2..8286ffd4 100644 --- a/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.Logging; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; @@ -98,6 +99,37 @@ public class StoreAndForwardService /// private static readonly TimeSpan SweepShutdownWaitTimeout = TimeSpan.FromSeconds(10); + /// + /// WP-14 (telemetry): cached count of messages currently buffered for + /// forwarding — i.e. rows in StoreAndForwardMessageStatus.Pending, + /// the live store-and-forward queue waiting to be delivered. This backs the + /// scadabridge.store_and_forward.queue.depth observable gauge. + /// + /// The gauge's collection callback is synchronous and is invoked frequently by + /// the OpenTelemetry/Prometheus collector, so it must never run an async SQLite + /// COUNT(*). Instead this is seeded once from storage + /// in StartAsync and then adjusted in-process on the existing + /// paths that change the Pending population: BufferAsync (+1), + /// successful-retry removal and Pending→Parked transitions in + /// the retry path (-1), and operator requeue in + /// RetryParkedMessageAsync (+1). The provider registered with + /// ScadaBridgeTelemetry.SetQueueDepthProvider reads it via + /// Interlocked.Read — non-blocking and sync-safe. It is an + /// approximate, eventually-consistent gauge (concurrent failover replication + /// applies to the standby's own store, not this counter), which is exactly + /// what a queue-depth metric needs. + /// + /// + private long _bufferedCount; + + /// + /// WP-14 (telemetry): guards one-time registration of the queue-depth provider + /// with ScadaBridgeTelemetry. 
The gauge is process-global, but this flag is + /// per instance: it only stops this instance's StartAsync from registering twice. A service + /// instance started later still registers and becomes the provider. 0 = not yet registered, 1 = done. + /// + private int _queueDepthProviderRegistered; + + /// /// WP-10: Delivery handler delegate. The return value / exception is interpreted /// the same way on both the immediate-delivery path () @@ -170,6 +202,19 @@ public class StoreAndForwardService public async Task StartAsync() { await _storage.InitializeAsync(); + + // WP-14 (telemetry): seed the cached buffered-message count from the + // store exactly once (the gauge callback cannot run an async COUNT), then + // register the sync, non-blocking provider with the process-global + // ScadaBridgeTelemetry gauge — guarded so only this instance's first start registers. + var pending = await _storage.GetMessageCountByStatusAsync( + StoreAndForwardMessageStatus.Pending); + Interlocked.Exchange(ref _bufferedCount, pending); + if (Interlocked.CompareExchange(ref _queueDepthProviderRegistered, 1, 0) == 0) + { + ScadaBridgeTelemetry.SetQueueDepthProvider(() => Interlocked.Read(ref _bufferedCount)); + } + _retryTimer = new Timer( // StoreAndForward-024: capture the sweep Task on each tick so // StopAsync can await any in-flight invocation before the host @@ -396,6 +441,10 @@ public class StoreAndForwardService { await _storage.EnqueueAsync(message); _replication?.ReplicateEnqueue(message); + // WP-14 (telemetry): a freshly buffered row is Pending → grows the live + // queue depth. Bumped after the durable write so the gauge never leads the + // store. + Interlocked.Increment(ref _bufferedCount); } /// @@ -452,6 +501,8 @@ public class StoreAndForwardService { await _storage.RemoveMessageAsync(message.Id); _replication?.ReplicateRemove(message.Id); + // WP-14 (telemetry): a delivered row leaves the Pending queue. 
+ Interlocked.Decrement(ref _bufferedCount); RaiseActivity("Delivered", message.Category, $"Delivered to {message.Target} after {message.RetryCount} retries"); @@ -483,6 +534,9 @@ public class StoreAndForwardService message.Id); return; } + // WP-14 (telemetry): the row committed Pending→Parked, leaving the live + // forward queue. Only counted when the conditional update actually won. + Interlocked.Decrement(ref _bufferedCount); _replication?.ReplicatePark(message); RaiseActivity("Parked", message.Category, $"Permanent failure for {message.Target}: handler returned false"); @@ -519,6 +573,9 @@ public class StoreAndForwardService message.Id); return; } + // WP-14 (telemetry): the row committed Pending→Parked, leaving the + // live forward queue. Only counted when the conditional update won. + Interlocked.Decrement(ref _bufferedCount); _replication?.ReplicatePark(message); RaiseActivity("Parked", message.Category, $"Max retries ({message.MaxRetries}) reached for {message.Target}"); @@ -737,6 +794,11 @@ public class StoreAndForwardService return false; } + // WP-14 (telemetry): an operator requeue moves Parked→Pending, re-adding the + // row to the live forward queue. Counted only when the conditional storage + // update actually flipped the row. + Interlocked.Increment(ref _bufferedCount); + // The active node just rewrote this row to Pending with retry_count = 0 // and cleared last_error / last_attempt_at (see // StoreAndForwardStorage.RetryParkedMessageAsync). 
Reconstruct the diff --git a/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs b/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs new file mode 100644 index 00000000..7a4a6e3e --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs @@ -0,0 +1,176 @@ +using System.Diagnostics.Metrics; +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; + +namespace ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests; + +/// +/// WP-14 (telemetry follow-on): verifies the cached buffered-message counter that +/// backs the scadabridge.store_and_forward.queue.depth observable gauge tracks +/// the live (Pending) queue across the existing enqueue / drain / park / requeue paths, +/// and that the sync gauge callback reports it. +/// +/// The gauge is read the way the OpenTelemetry collector reads it — via a +/// MeterListener that forces an observation (the callback is synchronous +/// and does no I/O, which is the whole point of caching the count). +/// StartAsync seeds the counter from storage and registers the provider against this service +/// instance, so the gauge resolves to this test's counter. 
+/// +public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly StoreAndForwardService _service; + + public QueueDepthGaugeTests() + { + var dbName = $"QueueDepthTests_{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + + var options = new StoreAndForwardOptions + { + DefaultRetryInterval = TimeSpan.Zero, + DefaultMaxRetries = 3, + // Long interval so no background sweep fires on its own during the test; + // sweeps are driven explicitly via RetryPendingMessagesAsync. + RetryTimerInterval = TimeSpan.FromMinutes(10) + }; + + _service = new StoreAndForwardService( + _storage, options, NullLogger.Instance); + } + + public async Task InitializeAsync() + { + await _storage.InitializeAsync(); + // StartAsync seeds _bufferedCount from the (empty) store and registers the + // queue-depth provider against this service instance. + await _service.StartAsync(); + } + + public async Task DisposeAsync() => await _service.StopAsync(); + + public void Dispose() => _keepAlive.Dispose(); + + /// + /// Reads the current value of the scadabridge.store_and_forward.queue.depth + /// gauge by forcing a synchronous observation through a transient MeterListener — + /// exactly the path the Prometheus/OTLP collector exercises on each scrape. 
+ /// + private static long ReadQueueDepthGauge() + { + long observed = -1; + using var listener = new MeterListener + { + InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName && + instrument.Name == "scadabridge.store_and_forward.queue.depth") + { + l.EnableMeasurementEvents(instrument); + } + } + }; + listener.SetMeasurementEventCallback((_, measurement, _, _) => observed = measurement); + listener.Start(); + listener.RecordObservableInstruments(); + return observed; + } + + [Fact] + public async Task Gauge_TracksBufferedDepth_AcrossEnqueueDrainAndPark() + { + // Empty store seeded at StartAsync → gauge reports 0. + Assert.Equal(0, ReadQueueDepthGauge()); + + // A handler that fails transiently so each enqueue buffers a Pending row + // (immediate attempt 0 throws → BufferAsync → +1). + var deliver = false; + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => + { + if (!deliver) throw new HttpRequestException("transient"); + return Task.FromResult(true); + }); + + // Enqueue 3 → cached depth = 3 → gauge reports 3. + for (var i = 0; i < 3; i++) + { + var r = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}"""); + Assert.True(r.WasBuffered); + } + Assert.Equal(3, ReadQueueDepthGauge()); + + // Drain: handler now succeeds → the retry sweep removes all 3 Pending rows → depth 0. + deliver = true; + await _service.RetryPendingMessagesAsync(); + Assert.Equal(0, ReadQueueDepthGauge()); + + // Park path: buffer one more, then make it park (maxRetries:1 parks after one + // sweep). Pending→Parked leaves the live queue → depth back to 0. 
+ deliver = false; + var parkResult = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1); + Assert.True(parkResult.WasBuffered); + Assert.Equal(1, ReadQueueDepthGauge()); + + await _service.RetryPendingMessagesAsync(); + var parked = await _storage.GetMessageByIdAsync(parkResult.MessageId); + Assert.Equal(StoreAndForwardMessageStatus.Parked, parked!.Status); + Assert.Equal(0, ReadQueueDepthGauge()); + + // Operator requeue: Parked→Pending re-adds to the live queue → depth 1. + Assert.True(await _service.RetryParkedMessageAsync(parkResult.MessageId)); + Assert.Equal(1, ReadQueueDepthGauge()); + } + + [Fact] + public async Task Gauge_SeedsFromExistingPendingRows_OnStart() + { + // Pre-seed two Pending rows directly in storage *before* a fresh service starts, + // simulating a process restart over a non-empty buffer. StartAsync must seed the + // cached counter from the store so the gauge does not under-report on restart. + await _storage.EnqueueAsync(new StoreAndForwardMessage + { + Id = Guid.NewGuid().ToString("N"), + Category = StoreAndForwardCategory.ExternalSystem, + Target = "api", + PayloadJson = "{}", + Status = StoreAndForwardMessageStatus.Pending, + CreatedAt = DateTimeOffset.UtcNow, + MaxRetries = 3 + }); + await _storage.EnqueueAsync(new StoreAndForwardMessage + { + Id = Guid.NewGuid().ToString("N"), + Category = StoreAndForwardCategory.Notification, + Target = "list", + PayloadJson = "{}", + Status = StoreAndForwardMessageStatus.Pending, + CreatedAt = DateTimeOffset.UtcNow, + MaxRetries = 3 + }); + + var fresh = new StoreAndForwardService( + _storage, + new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) }, + NullLogger.Instance); + try + { + await fresh.StartAsync(); + // The fresh service registered itself as the global provider and seeded 2. + Assert.Equal(2, ReadQueueDepthGauge()); + } + finally + { + await fresh.StopAsync(); + } + } +}