fd618cf1dc
Remediation from the full per-module code review at 4307c381 (findings recorded
separately in code-reviews/).
Highs fixed:
- DeploymentManager-025/SiteRuntime-031: stop broadcasting notification lists + SMTP
configs (incl. credentials) to sites; site purges already-persisted rows on apply
(enforces the central-only delivery design; clears plaintext SMTP creds at rest).
- DataConnectionLayer-023: guard the native-alarm subscribe path against the
mid-flight-unsubscribe adapter-feed leak (mirrors the DCL-021 tag-path fix).
- SiteEventLogging-024: normalize From/To query bounds to UTC (the -016 fix the
audit trail claimed but never committed).
- KpiHistory-001: add an in-flight guard to the recorder sample tick.
- ScriptAnalysis-001: harden the trust analyzer's TPA-absent fallback (resolve
forbidden anchors in the minimal reference set; warn on degraded mode) — anchors
added to validation references only, never the compile gate.
(InboundAPI-026 left to the feat/ipsen-movein effort per owner decision.)
Medium/Low: DM-026 deterministic deploy-status tiebreaker; SR-027/028/029/030
native-alarm leak/phantom-active/delete-during-redeploy fixes; AL-013/014/016;
TE-024 (folder-mutation audit rows now persisted)/025; SF-025 gauge-provider
clear-on-stop; ESG-025/026; SEC-023/024/025; SCA-007/008/009; plus doc/test
accuracy COM-023/024, HOST-025/026, HM-024/025, NS-027/028.
Full-solution build 0 warnings; ~3560 tests across 18 touched suites green.
290 lines
12 KiB
C#
290 lines
12 KiB
C#
using System.Diagnostics.Metrics;
|
|
using Microsoft.Data.Sqlite;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests;
|
|
|
|
/// <summary>
|
|
/// WP-14 (telemetry follow-on): verifies the cached buffered-message counter that
|
|
/// backs the <c>scadabridge.store_and_forward.queue.depth</c> observable gauge tracks
|
|
/// the live (Pending) queue across the existing enqueue / drain / park / requeue paths,
|
|
/// and that the sync gauge callback reports it.
|
|
///
|
|
/// The gauge is read the way the OpenTelemetry collector reads it — via a
|
|
/// <see cref="MeterListener"/> that forces an observation (the callback is synchronous
|
|
/// and does no I/O, which is the whole point of caching the count). <see cref="StartAsync"/>
|
|
/// seeds the counter from storage and registers the provider against this service
|
|
/// instance, so the gauge resolves to this test's counter.
|
|
/// </summary>
|
|
public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable
|
|
{
|
|
private readonly SqliteConnection _keepAlive;
|
|
private readonly StoreAndForwardStorage _storage;
|
|
private readonly StoreAndForwardService _service;
|
|
|
|
public QueueDepthGaugeTests()
|
|
{
|
|
var dbName = $"QueueDepthTests_{Guid.NewGuid():N}";
|
|
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
|
|
_keepAlive = new SqliteConnection(connStr);
|
|
_keepAlive.Open();
|
|
|
|
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
|
|
|
|
var options = new StoreAndForwardOptions
|
|
{
|
|
DefaultRetryInterval = TimeSpan.Zero,
|
|
DefaultMaxRetries = 3,
|
|
// Long interval so no background sweep fires on its own during the test;
|
|
// sweeps are driven explicitly via RetryPendingMessagesAsync.
|
|
RetryTimerInterval = TimeSpan.FromMinutes(10)
|
|
};
|
|
|
|
_service = new StoreAndForwardService(
|
|
_storage, options, NullLogger<StoreAndForwardService>.Instance);
|
|
}
|
|
|
|
public async Task InitializeAsync()
|
|
{
|
|
await _storage.InitializeAsync();
|
|
// StartAsync seeds _bufferedCount from the (empty) store and registers the
|
|
// queue-depth provider against this service instance.
|
|
await _service.StartAsync();
|
|
}
|
|
|
|
public async Task DisposeAsync() => await _service.StopAsync();
|
|
|
|
public void Dispose() => _keepAlive.Dispose();
|
|
|
|
/// <summary>
|
|
/// Reads the current value of the <c>scadabridge.store_and_forward.queue.depth</c>
|
|
/// gauge by forcing a synchronous observation through a transient MeterListener —
|
|
/// exactly the path the Prometheus/OTLP collector exercises on each scrape.
|
|
/// </summary>
|
|
private static long ReadQueueDepthGauge()
|
|
{
|
|
long observed = -1;
|
|
using var listener = new MeterListener
|
|
{
|
|
InstrumentPublished = (instrument, l) =>
|
|
{
|
|
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
|
|
instrument.Name == "scadabridge.store_and_forward.queue.depth")
|
|
{
|
|
l.EnableMeasurementEvents(instrument);
|
|
}
|
|
}
|
|
};
|
|
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
|
|
listener.Start();
|
|
listener.RecordObservableInstruments();
|
|
return observed;
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Gauge_TracksBufferedDepth_AcrossEnqueueDrainAndPark()
|
|
{
|
|
// Empty store seeded at StartAsync → gauge reports 0.
|
|
Assert.Equal(0, ReadQueueDepthGauge());
|
|
|
|
// A handler that fails transiently so each enqueue buffers a Pending row
|
|
// (immediate attempt 0 throws → BufferAsync → +1).
|
|
var deliver = false;
|
|
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
|
_ =>
|
|
{
|
|
if (!deliver) throw new HttpRequestException("transient");
|
|
return Task.FromResult(true);
|
|
});
|
|
|
|
// Enqueue 3 → cached depth = 3 → gauge reports 3.
|
|
for (var i = 0; i < 3; i++)
|
|
{
|
|
var r = await _service.EnqueueAsync(
|
|
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
|
|
Assert.True(r.WasBuffered);
|
|
}
|
|
Assert.Equal(3, ReadQueueDepthGauge());
|
|
|
|
// Drain: handler now succeeds → the retry sweep removes all 3 Pending rows → depth 0.
|
|
deliver = true;
|
|
await _service.RetryPendingMessagesAsync();
|
|
Assert.Equal(0, ReadQueueDepthGauge());
|
|
|
|
// Park path: buffer one more, then make it park (maxRetries:1 parks after one
|
|
// sweep). Pending→Parked leaves the live queue → depth back to 0.
|
|
deliver = false;
|
|
var parkResult = await _service.EnqueueAsync(
|
|
StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
|
|
Assert.True(parkResult.WasBuffered);
|
|
Assert.Equal(1, ReadQueueDepthGauge());
|
|
|
|
await _service.RetryPendingMessagesAsync();
|
|
var parked = await _storage.GetMessageByIdAsync(parkResult.MessageId);
|
|
Assert.Equal(StoreAndForwardMessageStatus.Parked, parked!.Status);
|
|
Assert.Equal(0, ReadQueueDepthGauge());
|
|
|
|
// Operator requeue: Parked→Pending re-adds to the live queue → depth 1.
|
|
Assert.True(await _service.RetryParkedMessageAsync(parkResult.MessageId));
|
|
Assert.Equal(1, ReadQueueDepthGauge());
|
|
}
|
|
|
|
/// <summary>
|
|
/// StoreAndForward-025: after a graceful <see cref="StoreAndForwardService.StopAsync"/>
|
|
/// the service must deregister its queue-depth provider from the process-global gauge
|
|
/// slot, so the gauge stops reporting the stopped instance's (now-frozen) depth and the
|
|
/// provider closure no longer pins the dead service. With the provider cleared the gauge
|
|
/// falls back to 0.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task StopAsync_ClearsQueueDepthProvider_GaugeFallsBackToZero()
|
|
{
|
|
var fresh = new StoreAndForwardService(
|
|
_storage,
|
|
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
|
|
NullLogger<StoreAndForwardService>.Instance);
|
|
|
|
// Register a Pending row this instance owns, then start so the instance registers
|
|
// its provider and seeds the cached count to 1 → gauge reports 1.
|
|
await _storage.EnqueueAsync(new StoreAndForwardMessage
|
|
{
|
|
Id = Guid.NewGuid().ToString("N"),
|
|
Category = StoreAndForwardCategory.ExternalSystem,
|
|
Target = "api",
|
|
PayloadJson = "{}",
|
|
Status = StoreAndForwardMessageStatus.Pending,
|
|
CreatedAt = DateTimeOffset.UtcNow,
|
|
MaxRetries = 3
|
|
});
|
|
await fresh.StartAsync();
|
|
Assert.Equal(1, ReadQueueDepthGauge());
|
|
|
|
// Graceful stop must deregister the provider; the gauge falls back to 0 rather
|
|
// than reporting this dead instance's frozen depth of 1.
|
|
await fresh.StopAsync();
|
|
Assert.Equal(0, ReadQueueDepthGauge());
|
|
}
|
|
|
|
/// <summary>
|
|
/// StoreAndForward-025 (compare-and-clear): when a newer instance has already
|
|
/// registered its provider into the process-global slot, a late
|
|
/// <see cref="StoreAndForwardService.StopAsync"/> of an older instance must NOT clear
|
|
/// the slot — the identity-checked clear only removes the slot when it still holds the
|
|
/// stopping instance's own delegate. After the late stop the gauge must still report
|
|
/// the newer instance's depth, not 0.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task StopAsync_DoesNotClobberNewerInstanceProvider()
|
|
{
|
|
// Old instance: starts over an empty store, registers its provider (gauge → 0),
|
|
// then takes a single buffered message so it would report 1 if it stayed live.
|
|
var older = new StoreAndForwardService(
|
|
_storage,
|
|
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
|
|
NullLogger<StoreAndForwardService>.Instance);
|
|
await older.StartAsync();
|
|
older.TestOnly_IncrementBufferedCount(); // older's depth would be 1
|
|
Assert.Equal(1, ReadQueueDepthGauge());
|
|
|
|
// New instance: starts and re-registers into the same global slot, winning it.
|
|
// It seeds from the (empty) store and stands in two buffered messages → depth 2.
|
|
var newer = new StoreAndForwardService(
|
|
_storage,
|
|
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
|
|
NullLogger<StoreAndForwardService>.Instance);
|
|
await newer.StartAsync();
|
|
newer.TestOnly_IncrementBufferedCount();
|
|
newer.TestOnly_IncrementBufferedCount();
|
|
Assert.Equal(2, ReadQueueDepthGauge());
|
|
|
|
// Late stop of the OLDER instance: compare-and-clear must fail the identity check
|
|
// (the slot now holds the newer instance's delegate), so the newer provider stays.
|
|
await older.StopAsync();
|
|
Assert.Equal(2, ReadQueueDepthGauge());
|
|
|
|
// Cleanup: stop the newer instance, which legitimately clears its own provider.
|
|
await newer.StopAsync();
|
|
Assert.Equal(0, ReadQueueDepthGauge());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Gauge_SeedsFromExistingPendingRows_OnStart()
|
|
{
|
|
// Pre-seed two Pending rows directly in storage *before* a fresh service starts,
|
|
// simulating a process restart over a non-empty buffer. StartAsync must seed the
|
|
// cached counter from the store so the gauge does not under-report on restart.
|
|
await _storage.EnqueueAsync(new StoreAndForwardMessage
|
|
{
|
|
Id = Guid.NewGuid().ToString("N"),
|
|
Category = StoreAndForwardCategory.ExternalSystem,
|
|
Target = "api",
|
|
PayloadJson = "{}",
|
|
Status = StoreAndForwardMessageStatus.Pending,
|
|
CreatedAt = DateTimeOffset.UtcNow,
|
|
MaxRetries = 3
|
|
});
|
|
await _storage.EnqueueAsync(new StoreAndForwardMessage
|
|
{
|
|
Id = Guid.NewGuid().ToString("N"),
|
|
Category = StoreAndForwardCategory.Notification,
|
|
Target = "list",
|
|
PayloadJson = "{}",
|
|
Status = StoreAndForwardMessageStatus.Pending,
|
|
CreatedAt = DateTimeOffset.UtcNow,
|
|
MaxRetries = 3
|
|
});
|
|
|
|
var fresh = new StoreAndForwardService(
|
|
_storage,
|
|
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
|
|
NullLogger<StoreAndForwardService>.Instance);
|
|
try
|
|
{
|
|
await fresh.StartAsync();
|
|
// The fresh service registered itself as the global provider and seeded 2.
|
|
Assert.Equal(2, ReadQueueDepthGauge());
|
|
}
|
|
finally
|
|
{
|
|
await fresh.StopAsync();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Review finding (FINDING 1): the startup seed must ADD to whatever the counter
|
|
/// already holds, not overwrite it. A concurrent <c>BufferAsync</c> can
|
|
/// <c>Interlocked.Increment</c> <c>_bufferedCount</c> in the window between
|
|
/// <c>StartAsync</c>'s async <c>COUNT(*)</c> returning and the seed running; with an
|
|
/// <c>Interlocked.Exchange</c> seed that increment would be clobbered (lost +1). This
|
|
/// pre-increments the in-memory counter (standing in for that concurrent enqueue),
|
|
/// then starts the service over an empty store and asserts the pre-increment survives.
|
|
/// </summary>
|
|
[Fact]
|
|
public async Task Gauge_SeedAddsToConcurrentPreSeedIncrement_NotClobber()
|
|
{
|
|
// Store is empty (StartAsync's pending COUNT(*) = 0), so the only contribution
|
|
// is the simulated concurrent pre-seed enqueue increment.
|
|
var fresh = new StoreAndForwardService(
|
|
_storage,
|
|
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
|
|
NullLogger<StoreAndForwardService>.Instance);
|
|
|
|
// Stand in for a BufferAsync increment that landed before StartAsync seeded.
|
|
fresh.TestOnly_IncrementBufferedCount();
|
|
|
|
try
|
|
{
|
|
await fresh.StartAsync();
|
|
// Add(0 seed) over the pre-existing +1 → 1. An Exchange(0) seed would clobber
|
|
// it to 0, losing the concurrent enqueue — the bug this fix prevents.
|
|
Assert.Equal(1, ReadQueueDepthGauge());
|
|
}
|
|
finally
|
|
{
|
|
await fresh.StopAsync();
|
|
}
|
|
}
|
|
}
|