using System.Diagnostics.Metrics;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;

namespace ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests;

/// <summary>
/// WP-14 (telemetry follow-on): verifies the cached buffered-message counter that
/// backs the <c>scadabridge.store_and_forward.queue.depth</c> observable gauge tracks
/// the live (Pending) queue across the existing enqueue / drain / park / requeue paths,
/// and that the sync gauge callback reports it.
/// </summary>
/// <remarks>
/// The gauge is read the way the OpenTelemetry collector reads it — via a
/// <see cref="MeterListener"/> that forces an observation (the callback is synchronous
/// and does no I/O, which is the whole point of caching the count).
/// <c>StoreAndForwardService.StartAsync</c> seeds the counter from storage and registers
/// the provider against this service instance, so the gauge resolves to this test's counter.
/// NOTE(review): the provider slot is process-global (see the StoreAndForward-025 tests);
/// if other test classes start a <c>StoreAndForwardService</c> in parallel they could race
/// with these gauge reads — consider a non-parallel xUnit collection if that happens.
/// </remarks>
public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable
{
    /// <summary>Name of the observable gauge instrument under test.</summary>
    private const string QueueDepthInstrumentName = "scadabridge.store_and_forward.queue.depth";

    // Keeps the shared-cache in-memory database alive for the whole test: SQLite discards an
    // in-memory database once its last connection closes.
    private readonly SqliteConnection _keepAlive;
    private readonly StoreAndForwardStorage _storage;
    private readonly StoreAndForwardService _service;

    /// <summary>
    /// Creates a uniquely named in-memory store and an unstarted service over it.
    /// </summary>
    public QueueDepthGaugeTests()
    {
        var dbName = $"QueueDepthTests_{Guid.NewGuid():N}";
        var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
        _keepAlive = new SqliteConnection(connStr);
        _keepAlive.Open();

        _storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);

        var options = new StoreAndForwardOptions
        {
            DefaultRetryInterval = TimeSpan.Zero,
            DefaultMaxRetries = 3,
            // Long interval so no background sweep fires on its own during the test;
            // sweeps are driven explicitly via RetryPendingMessagesAsync.
            RetryTimerInterval = TimeSpan.FromMinutes(10)
        };
        _service = new StoreAndForwardService(_storage, options, NullLogger<StoreAndForwardService>.Instance);
    }

    /// <summary>Initializes the schema, then starts the fixture's primary service.</summary>
    public async Task InitializeAsync()
    {
        await _storage.InitializeAsync();

        // StartAsync seeds _bufferedCount from the (empty) store and registers the
        // queue-depth provider against this service instance.
        await _service.StartAsync();
    }

    /// <summary>Stops the fixture's primary service.</summary>
    public async Task DisposeAsync() => await _service.StopAsync();

    /// <summary>Closes the keep-alive connection, releasing the in-memory database.</summary>
    public void Dispose() => _keepAlive.Dispose();

    /// <summary>
    /// Reads the current value of the <c>scadabridge.store_and_forward.queue.depth</c>
    /// gauge by forcing a synchronous observation through a transient <see cref="MeterListener"/> —
    /// exactly the path the Prometheus/OTLP collector exercises on each scrape.
    /// </summary>
    /// <returns>The last value the gauge reported during the forced observation.</returns>
    /// <exception cref="InvalidOperationException">
    /// No measurement was observed (the instrument is not published, or reports an
    /// unexpected value type), so there is no value to compare against.
    /// </exception>
    private static long ReadQueueDepthGauge()
    {
        long? observed = null;

        using var listener = new MeterListener
        {
            InstrumentPublished = (instrument, l) =>
            {
                if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
                    instrument.Name == QueueDepthInstrumentName)
                {
                    l.EnableMeasurementEvents(instrument);
                }
            }
        };

        // MeterListener only routes a measurement to the callback whose type argument matches
        // the instrument's value type, so accept both integral widths the gauge could use.
        listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
        listener.SetMeasurementEventCallback<int>((_, measurement, _, _) => observed = measurement);

        listener.Start();
        listener.RecordObservableInstruments();

        // Fail loudly rather than returning a sentinel that surfaces as a confusing
        // "expected N, actual -1" assertion.
        return observed ?? throw new InvalidOperationException(
            $"No measurement was observed for '{QueueDepthInstrumentName}' on meter '{ScadaBridgeTelemetry.MeterName}'.");
    }

    /// <summary>
    /// Builds an additional, unstarted service over this test's storage whose retry timer
    /// (10 minutes) is long enough that no background sweep fires during a test.
    /// </summary>
    private StoreAndForwardService CreateIdleService() =>
        new(
            _storage,
            new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
            NullLogger<StoreAndForwardService>.Instance);

    /// <summary>
    /// Builds a Pending message with an empty JSON payload, suitable for seeding storage
    /// directly (bypassing the service's enqueue path and therefore its cached counter).
    /// </summary>
    private static StoreAndForwardMessage NewPendingMessage(StoreAndForwardCategory category, string target) =>
        new()
        {
            Id = Guid.NewGuid().ToString("N"),
            Category = category,
            Target = target,
            PayloadJson = "{}",
            Status = StoreAndForwardMessageStatus.Pending,
            CreatedAt = DateTimeOffset.UtcNow,
            MaxRetries = 3
        };

    [Fact]
    public async Task Gauge_TracksBufferedDepth_AcrossEnqueueDrainAndPark()
    {
        // Empty store seeded at StartAsync → gauge reports 0.
        Assert.Equal(0, ReadQueueDepthGauge());

        // A handler that fails transiently so each enqueue buffers a Pending row
        // (immediate attempt 0 throws → BufferAsync → +1).
        var deliver = false;
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ =>
        {
            if (!deliver)
            {
                throw new HttpRequestException("transient");
            }

            return Task.FromResult(true);
        });

        // Enqueue 3 → cached depth = 3 → gauge reports 3.
        for (var i = 0; i < 3; i++)
        {
            var r = await _service.EnqueueAsync(
                StoreAndForwardCategory.ExternalSystem, "api", """{}""");
            Assert.True(r.WasBuffered);
        }
        Assert.Equal(3, ReadQueueDepthGauge());

        // Drain: handler now succeeds → the retry sweep removes all 3 Pending rows → depth 0.
        deliver = true;
        await _service.RetryPendingMessagesAsync();
        Assert.Equal(0, ReadQueueDepthGauge());

        // Park path: buffer one more, then make it park (maxRetries:1 parks after one
        // sweep). Pending→Parked leaves the live queue → depth back to 0.
        deliver = false;
        var parkResult = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
        Assert.True(parkResult.WasBuffered);
        Assert.Equal(1, ReadQueueDepthGauge());

        await _service.RetryPendingMessagesAsync();
        var parked = await _storage.GetMessageByIdAsync(parkResult.MessageId);
        Assert.Equal(StoreAndForwardMessageStatus.Parked, parked!.Status);
        Assert.Equal(0, ReadQueueDepthGauge());

        // Operator requeue: Parked→Pending re-adds to the live queue → depth 1.
        Assert.True(await _service.RetryParkedMessageAsync(parkResult.MessageId));
        Assert.Equal(1, ReadQueueDepthGauge());
    }

    /// <summary>
    /// StoreAndForward-025: after a graceful <c>StoreAndForwardService.StopAsync</c>
    /// the service must deregister its queue-depth provider from the process-global gauge
    /// slot, so the gauge stops reporting the stopped instance's (now-frozen) depth and the
    /// provider closure no longer pins the dead service. With the provider cleared the gauge
    /// falls back to 0.
    /// </summary>
    [Fact]
    public async Task StopAsync_ClearsQueueDepthProvider_GaugeFallsBackToZero()
    {
        var fresh = CreateIdleService();

        // Seed a Pending row in storage, then start so the instance registers its provider
        // and seeds the cached count to 1 → gauge reports 1.
        await _storage.EnqueueAsync(NewPendingMessage(StoreAndForwardCategory.ExternalSystem, "api"));

        // Set just before each start/stop attempt so the finally block only cleans up an
        // instance the test body has not already stopped (no double stop on success, no
        // leaked running service if an assertion fails first).
        var freshNeedsCleanup = false;
        try
        {
            freshNeedsCleanup = true;
            await fresh.StartAsync();
            Assert.Equal(1, ReadQueueDepthGauge());

            // Graceful stop must deregister the provider; the gauge falls back to 0 rather
            // than reporting this dead instance's frozen depth of 1.
            freshNeedsCleanup = false;
            await fresh.StopAsync();
            Assert.Equal(0, ReadQueueDepthGauge());
        }
        finally
        {
            if (freshNeedsCleanup)
            {
                await fresh.StopAsync();
            }
        }
    }

    /// <summary>
    /// StoreAndForward-025 (compare-and-clear): when a newer instance has already
    /// registered its provider into the process-global slot, a late
    /// <c>StoreAndForwardService.StopAsync</c> of an older instance must NOT clear
    /// the slot — the identity-checked clear only removes the slot when it still holds the
    /// stopping instance's own delegate. After the late stop the gauge must still report
    /// the newer instance's depth, not 0.
    /// </summary>
    [Fact]
    public async Task StopAsync_DoesNotClobberNewerInstanceProvider()
    {
        var older = CreateIdleService();
        StoreAndForwardService? newer = null;

        // Cleanup flags: true only while an instance has been started (or a start attempted)
        // and the test body has not yet stopped it itself.
        var olderNeedsCleanup = false;
        var newerNeedsCleanup = false;
        try
        {
            // Old instance: starts over an empty store, registers its provider (gauge → 0),
            // then takes a single buffered message so it would report 1 if it stayed live.
            olderNeedsCleanup = true;
            await older.StartAsync();
            older.TestOnly_IncrementBufferedCount(); // older's depth would be 1
            Assert.Equal(1, ReadQueueDepthGauge());

            // New instance: starts and re-registers into the same global slot, winning it.
            // It seeds from the (empty) store and stands in two buffered messages → depth 2.
            newer = CreateIdleService();
            newerNeedsCleanup = true;
            await newer.StartAsync();
            newer.TestOnly_IncrementBufferedCount();
            newer.TestOnly_IncrementBufferedCount();
            Assert.Equal(2, ReadQueueDepthGauge());

            // Late stop of the OLDER instance: compare-and-clear must fail the identity check
            // (the slot now holds the newer instance's delegate), so the newer provider stays.
            olderNeedsCleanup = false;
            await older.StopAsync();
            Assert.Equal(2, ReadQueueDepthGauge());

            // Stop the newer instance, which legitimately clears its own provider.
            newerNeedsCleanup = false;
            await newer.StopAsync();
            Assert.Equal(0, ReadQueueDepthGauge());
        }
        finally
        {
            if (olderNeedsCleanup)
            {
                await older.StopAsync();
            }

            if (newerNeedsCleanup && newer is not null)
            {
                await newer.StopAsync();
            }
        }
    }

    [Fact]
    public async Task Gauge_SeedsFromExistingPendingRows_OnStart()
    {
        // Pre-seed two Pending rows directly in storage *before* a fresh service starts,
        // simulating a process restart over a non-empty buffer. StartAsync must seed the
        // cached counter from the store so the gauge does not under-report on restart.
        await _storage.EnqueueAsync(NewPendingMessage(StoreAndForwardCategory.ExternalSystem, "api"));
        await _storage.EnqueueAsync(NewPendingMessage(StoreAndForwardCategory.Notification, "list"));

        var fresh = CreateIdleService();
        try
        {
            await fresh.StartAsync();

            // The fresh service registered itself as the global provider and seeded 2.
            Assert.Equal(2, ReadQueueDepthGauge());
        }
        finally
        {
            await fresh.StopAsync();
        }
    }

    /// <summary>
    /// Review finding (FINDING 1): the startup seed must ADD to whatever the counter
    /// already holds, not overwrite it. A concurrent <c>BufferAsync</c> can
    /// <c>Interlocked.Increment</c> <c>_bufferedCount</c> in the window between
    /// <c>StartAsync</c>'s async <c>COUNT(*)</c> returning and the seed running; with an
    /// <c>Interlocked.Exchange</c> seed that increment would be clobbered (lost +1). This
    /// pre-increments the in-memory counter (standing in for that concurrent enqueue),
    /// then starts the service over an empty store and asserts the pre-increment survives.
    /// </summary>
    [Fact]
    public async Task Gauge_SeedAddsToConcurrentPreSeedIncrement_NotClobber()
    {
        // Store is empty (StartAsync's pending COUNT(*) = 0), so the only contribution
        // is the simulated concurrent pre-seed enqueue increment.
        var fresh = CreateIdleService();

        // Stand in for a BufferAsync increment that landed before StartAsync seeded.
        fresh.TestOnly_IncrementBufferedCount();

        try
        {
            await fresh.StartAsync();

            // Add(0 seed) over the pre-existing +1 → 1. An Exchange(0) seed would clobber
            // it to 0, losing the concurrent enqueue — the bug this fix prevents.
            Assert.Equal(1, ReadQueueDepthGauge());
        }
        finally
        {
            await fresh.StopAsync();
        }
    }
}