feat(scadabridge): wire scadabridge.store_and_forward.queue.depth gauge to buffered count

This commit is contained in:
Joseph Doherty
2026-06-01 16:58:09 -04:00
parent 877f2e200b
commit 547b685a42
2 changed files with 238 additions and 0 deletions
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
@@ -98,6 +99,37 @@ public class StoreAndForwardService
/// </summary>
private static readonly TimeSpan SweepShutdownWaitTimeout = TimeSpan.FromSeconds(10);
/// <summary>
/// WP-14 (telemetry): cached count of messages currently buffered for
/// forwarding — i.e. rows in <see cref="StoreAndForwardMessageStatus.Pending"/>,
/// the live store-and-forward queue waiting to be delivered. This backs the
/// <c>scadabridge.store_and_forward.queue.depth</c> observable gauge.
/// <para>
/// The gauge's collection callback is synchronous and is invoked frequently by
/// the OpenTelemetry/Prometheus collector, so it must never run an async SQLite
/// <c>COUNT(*)</c>. Instead this <see cref="long"/> is seeded once from storage
/// in <see cref="StartAsync"/> and then adjusted in-process on the existing
/// paths that change the Pending population: <see cref="BufferAsync"/> (+1),
/// successful-retry removal and Pending→Parked transitions in
/// <see cref="RetryMessageAsync"/> (-1), and operator requeue in
/// <see cref="RetryParkedMessageAsync"/> (+1). The provider registered with
/// <see cref="ScadaBridgeTelemetry.SetQueueDepthProvider"/> reads it via
/// <see cref="Interlocked.Read"/> — non-blocking and sync-safe. It is an
/// approximate, eventually-consistent gauge (concurrent failover replication
/// applies to the standby's own store, not this counter), which is exactly
/// what a queue-depth metric needs.
/// </para>
/// </summary>
private long _bufferedCount;
/// <summary>
/// WP-14 (telemetry): guards one-time registration of the queue-depth provider
/// with <see cref="ScadaBridgeTelemetry"/>. The gauge is process-global, so only
/// the first <see cref="StartAsync"/> wins; registering per message (or per start
/// of multiple service instances) is avoided. 0 = not yet registered, 1 = done.
/// </summary>
private int _queueDepthProviderRegistered;
/// <summary>
/// WP-10: Delivery handler delegate. The return value / exception is interpreted
/// the same way on both the immediate-delivery path (<see cref="EnqueueAsync"/>)
@@ -170,6 +202,19 @@ public class StoreAndForwardService
public async Task StartAsync()
{
await _storage.InitializeAsync();
// WP-14 (telemetry): seed the cached buffered-message count from the
// store exactly once (the gauge callback cannot run an async COUNT), then
// register the sync, non-blocking provider with the process-global
// ScadaBridgeTelemetry gauge — guarded so only the first start registers.
var pending = await _storage.GetMessageCountByStatusAsync(
StoreAndForwardMessageStatus.Pending);
Interlocked.Exchange(ref _bufferedCount, pending);
if (Interlocked.CompareExchange(ref _queueDepthProviderRegistered, 1, 0) == 0)
{
ScadaBridgeTelemetry.SetQueueDepthProvider(() => Interlocked.Read(ref _bufferedCount));
}
_retryTimer = new Timer(
// StoreAndForward-024: capture the sweep Task on each tick so
// StopAsync can await any in-flight invocation before the host
@@ -396,6 +441,10 @@ public class StoreAndForwardService
{
await _storage.EnqueueAsync(message);
_replication?.ReplicateEnqueue(message);
// WP-14 (telemetry): a freshly buffered row is Pending → grows the live
// queue depth. Bumped after the durable write so the gauge never leads the
// store.
Interlocked.Increment(ref _bufferedCount);
}
/// <summary>
@@ -452,6 +501,8 @@ public class StoreAndForwardService
{
await _storage.RemoveMessageAsync(message.Id);
_replication?.ReplicateRemove(message.Id);
// WP-14 (telemetry): a delivered row leaves the Pending queue.
Interlocked.Decrement(ref _bufferedCount);
RaiseActivity("Delivered", message.Category,
$"Delivered to {message.Target} after {message.RetryCount} retries");
@@ -483,6 +534,9 @@ public class StoreAndForwardService
message.Id);
return;
}
// WP-14 (telemetry): the row committed Pending→Parked, leaving the live
// forward queue. Only counted when the conditional update actually won.
Interlocked.Decrement(ref _bufferedCount);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category,
$"Permanent failure for {message.Target}: handler returned false");
@@ -519,6 +573,9 @@ public class StoreAndForwardService
message.Id);
return;
}
// WP-14 (telemetry): the row committed Pending→Parked, leaving the
// live forward queue. Only counted when the conditional update won.
Interlocked.Decrement(ref _bufferedCount);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category,
$"Max retries ({message.MaxRetries}) reached for {message.Target}");
@@ -737,6 +794,11 @@ public class StoreAndForwardService
return false;
}
// WP-14 (telemetry): an operator requeue moves Parked→Pending, re-adding the
// row to the live forward queue. Counted only when the conditional storage
// update actually flipped the row.
Interlocked.Increment(ref _bufferedCount);
// The active node just rewrote this row to Pending with retry_count = 0
// and cleared last_error / last_attempt_at (see
// StoreAndForwardStorage.RetryParkedMessageAsync). Reconstruct the
@@ -0,0 +1,176 @@
using System.Diagnostics.Metrics;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
namespace ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests;
/// <summary>
/// WP-14 (telemetry follow-on): verifies the cached buffered-message counter that
/// backs the <c>scadabridge.store_and_forward.queue.depth</c> observable gauge tracks
/// the live (Pending) queue across the existing enqueue / drain / park / requeue paths,
/// and that the sync gauge callback reports it.
///
/// The gauge is read the way the OpenTelemetry collector reads it — via a
/// <see cref="MeterListener"/> that forces an observation (the callback is synchronous
/// and does no I/O, which is the whole point of caching the count). <see cref="StartAsync"/>
/// seeds the counter from storage and registers the provider against this service
/// instance, so the gauge resolves to this test's counter.
/// </summary>
public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable
{
private readonly SqliteConnection _keepAlive;
private readonly StoreAndForwardStorage _storage;
private readonly StoreAndForwardService _service;
public QueueDepthGaugeTests()
{
var dbName = $"QueueDepthTests_{Guid.NewGuid():N}";
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
_keepAlive = new SqliteConnection(connStr);
_keepAlive.Open();
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
var options = new StoreAndForwardOptions
{
DefaultRetryInterval = TimeSpan.Zero,
DefaultMaxRetries = 3,
// Long interval so no background sweep fires on its own during the test;
// sweeps are driven explicitly via RetryPendingMessagesAsync.
RetryTimerInterval = TimeSpan.FromMinutes(10)
};
_service = new StoreAndForwardService(
_storage, options, NullLogger<StoreAndForwardService>.Instance);
}
public async Task InitializeAsync()
{
await _storage.InitializeAsync();
// StartAsync seeds _bufferedCount from the (empty) store and registers the
// queue-depth provider against this service instance.
await _service.StartAsync();
}
public async Task DisposeAsync() => await _service.StopAsync();
public void Dispose() => _keepAlive.Dispose();
/// <summary>
/// Reads the current value of the <c>scadabridge.store_and_forward.queue.depth</c>
/// gauge by forcing a synchronous observation through a transient MeterListener —
/// exactly the path the Prometheus/OTLP collector exercises on each scrape.
/// </summary>
private static long ReadQueueDepthGauge()
{
long observed = -1;
using var listener = new MeterListener
{
InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
instrument.Name == "scadabridge.store_and_forward.queue.depth")
{
l.EnableMeasurementEvents(instrument);
}
}
};
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
listener.Start();
listener.RecordObservableInstruments();
return observed;
}
[Fact]
public async Task Gauge_TracksBufferedDepth_AcrossEnqueueDrainAndPark()
{
// Empty store seeded at StartAsync → gauge reports 0.
Assert.Equal(0, ReadQueueDepthGauge());
// A handler that fails transiently so each enqueue buffers a Pending row
// (immediate attempt 0 throws → BufferAsync → +1).
var deliver = false;
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ =>
{
if (!deliver) throw new HttpRequestException("transient");
return Task.FromResult(true);
});
// Enqueue 3 → cached depth = 3 → gauge reports 3.
for (var i = 0; i < 3; i++)
{
var r = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
Assert.True(r.WasBuffered);
}
Assert.Equal(3, ReadQueueDepthGauge());
// Drain: handler now succeeds → the retry sweep removes all 3 Pending rows → depth 0.
deliver = true;
await _service.RetryPendingMessagesAsync();
Assert.Equal(0, ReadQueueDepthGauge());
// Park path: buffer one more, then make it park (maxRetries:1 parks after one
// sweep). Pending→Parked leaves the live queue → depth back to 0.
deliver = false;
var parkResult = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
Assert.True(parkResult.WasBuffered);
Assert.Equal(1, ReadQueueDepthGauge());
await _service.RetryPendingMessagesAsync();
var parked = await _storage.GetMessageByIdAsync(parkResult.MessageId);
Assert.Equal(StoreAndForwardMessageStatus.Parked, parked!.Status);
Assert.Equal(0, ReadQueueDepthGauge());
// Operator requeue: Parked→Pending re-adds to the live queue → depth 1.
Assert.True(await _service.RetryParkedMessageAsync(parkResult.MessageId));
Assert.Equal(1, ReadQueueDepthGauge());
}
[Fact]
public async Task Gauge_SeedsFromExistingPendingRows_OnStart()
{
// Pre-seed two Pending rows directly in storage *before* a fresh service starts,
// simulating a process restart over a non-empty buffer. StartAsync must seed the
// cached counter from the store so the gauge does not under-report on restart.
await _storage.EnqueueAsync(new StoreAndForwardMessage
{
Id = Guid.NewGuid().ToString("N"),
Category = StoreAndForwardCategory.ExternalSystem,
Target = "api",
PayloadJson = "{}",
Status = StoreAndForwardMessageStatus.Pending,
CreatedAt = DateTimeOffset.UtcNow,
MaxRetries = 3
});
await _storage.EnqueueAsync(new StoreAndForwardMessage
{
Id = Guid.NewGuid().ToString("N"),
Category = StoreAndForwardCategory.Notification,
Target = "list",
PayloadJson = "{}",
Status = StoreAndForwardMessageStatus.Pending,
CreatedAt = DateTimeOffset.UtcNow,
MaxRetries = 3
});
var fresh = new StoreAndForwardService(
_storage,
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
NullLogger<StoreAndForwardService>.Instance);
try
{
await fresh.StartAsync();
// The fresh service registered itself as the global provider and seeded 2.
Assert.Equal(2, ReadQueueDepthGauge());
}
finally
{
await fresh.StopAsync();
}
}
}