Merge feat/telemetry-followons: telemetry follow-ons for ScadaBridge

Site-node HTTP/1.1 /metrics listener (NodeOptions.MetricsPort=8084, avoids the
site RemotingPort collision; StartupValidator enforces distinctness). First
application instruments: ScadaBridgeTelemetry meter + deployments.applied,
store_and_forward.queue.depth, inbound_api.requests, site.connection.up.
Config-driven OTLP exporter opt-in (default Prometheus).
This commit is contained in:
Joseph Doherty
2026-06-01 17:17:39 -04:00
22 changed files with 881 additions and 7 deletions
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-a-a",
"SiteId": "site-a",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-a-b",
"SiteId": "site-a",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-b-a",
"SiteId": "site-b",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-b-b",
"SiteId": "site-b",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-c-a",
"SiteId": "site-c",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-c-b",
"SiteId": "site-c",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
@@ -0,0 +1,94 @@
using System.Diagnostics.Metrics;
namespace ZB.MOM.WW.ScadaBridge.Commons.Observability;
/// <summary>
/// Central <see cref="Meter"/> + instrument definitions for ScadaBridge's application
/// telemetry, modelled on OtOpcUa's <c>OtOpcUaTelemetry</c>. Modules emit through these
/// pre-created instruments so a single OpenTelemetry / Prometheus binding in
/// <c>Host</c> (registered via <c>AddZbTelemetry</c> with this meter named in
/// <c>ZbTelemetryOptions.Meters</c>) catches everything. No exporter is required —
/// instruments are no-op until a listener attaches, so tests and dev hosts pay nothing
/// for instrumentation that nobody scrapes.
///
/// Instrument names follow the OpenTelemetry semantic convention pattern
/// <c>scadabridge.&lt;subsystem&gt;.&lt;event&gt;</c>. This task defines the instruments and
/// their emit helpers; four later tasks wire the actual emit points. Until those land the
/// helpers are dormant but inert — calling them is safe and simply records against a meter
/// that nothing observes.
/// </summary>
public static class ScadaBridgeTelemetry
{
/// <summary>The meter name registered with OTel via <c>ZbTelemetryOptions.Meters</c>.</summary>
public const string MeterName = "ZB.MOM.WW.ScadaBridge";
/// <summary>Singleton <see cref="Meter"/> all instruments hang off.</summary>
private static readonly Meter Meter = new(MeterName);
// ---------------- Counters ----------------
/// <summary>Incremented each time a deployment is successfully applied.</summary>
private static readonly Counter<long> _deploymentsApplied =
Meter.CreateCounter<long>("scadabridge.deployments.applied", unit: "1",
description: "Deployments applied.");
/// <summary>Incremented for each inbound API request, tagged with the API method.</summary>
private static readonly Counter<long> _inboundApiRequests =
Meter.CreateCounter<long>("scadabridge.inbound_api.requests", unit: "1",
description: "Inbound API requests, tagged by method.");
// ---------------- Observable gauges ----------------
/// <summary>Current count of open site connections, mutated via <see cref="Interlocked"/>.</summary>
private static long _siteConnectionsUp;
/// <summary>Provider that yields the live StoreAndForward queue depth; set by a later task.</summary>
private static Func<long>? _queueDepthProvider;
#pragma warning disable IDE0052 // Held to keep the observable gauges alive for the meter's lifetime.
/// <summary>Gauge reporting the number of currently open site connections.</summary>
private static readonly ObservableGauge<long> _siteConnectionUp =
Meter.CreateObservableGauge<long>("scadabridge.site.connection.up",
() => Interlocked.Read(ref _siteConnectionsUp),
description: "Number of currently open site connections.");
/// <summary>Gauge reporting the current StoreAndForward queue depth via the registered provider.</summary>
private static readonly ObservableGauge<long> _storeAndForwardQueueDepth =
Meter.CreateObservableGauge<long>("scadabridge.store_and_forward.queue.depth",
() => Volatile.Read(ref _queueDepthProvider) is { } p ? p() : 0L,
unit: "items",
description: "Current StoreAndForward queue depth.");
#pragma warning restore IDE0052
// ---------------- Emit helpers ----------------
/// <summary>Records that a deployment was applied.</summary>
public static void RecordDeploymentApplied() => _deploymentsApplied.Add(1);
/// <summary>Records an inbound API request for the given <paramref name="method"/>.</summary>
/// <param name="method">The API method the request targeted.</param>
public static void RecordInboundApiRequest(string method) =>
_inboundApiRequests.Add(1, new KeyValuePair<string, object?>("method", method));
/// <summary>Records that a site connection opened (increments the up-count gauge).</summary>
public static void SiteConnectionOpened() => Interlocked.Increment(ref _siteConnectionsUp);
/// <summary>Records that a site connection closed (decrements the up-count gauge).</summary>
public static void SiteConnectionClosed() => Interlocked.Decrement(ref _siteConnectionsUp);
/// <summary>
/// Registers the provider the StoreAndForward queue-depth gauge reads on each observation.
/// A later task supplies a provider that reads the real StoreAndForward depth. A null
/// provider is ignored so the gauge falls back to reporting 0.
/// </summary>
/// <param name="provider">A callback returning the current queue depth.</param>
public static void SetQueueDepthProvider(Func<long> provider)
{
if (provider is null)
{
return;
}
Volatile.Write(ref _queueDepthProvider, provider);
}
}
@@ -7,6 +7,7 @@ using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using GrpcStatus = Grpc.Core.Status;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
@@ -264,6 +265,14 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
"Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
request.CorrelationId, request.InstanceUniqueName, subscriptionId);
// Telemetry follow-on: the connection is now fully established (Subscribe
// succeeded, so no leak via the catch above). Count it up here and balance
// it in the finally below so the scadabridge.site.connection.up gauge is
// decremented on EVERY exit path — normal completion, client-cancel /
// duplicate-replacement (OperationCanceledException), server shutdown
// (CancelAllStreams -> Cts.Cancel), and any other exception — guaranteeing
// exactly one Closed per Opened and a gauge that never drifts up.
ScadaBridgeTelemetry.SiteConnectionOpened();
try
{
await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token))
@@ -277,6 +286,7 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
}
finally
{
ScadaBridgeTelemetry.SiteConnectionClosed();
_streamSubscriber.RemoveSubscriber(relayActor);
_actorSystem!.Stop(relayActor);
channel.Writer.TryComplete();
@@ -7,6 +7,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
@@ -244,6 +245,16 @@ public class DeploymentService
if (response.Status == DeploymentStatus.Success)
{
// Telemetry: one instance deployment successfully applied to a
// site. Counted once per successful deploy operation (the unit
// of scadabridge.deployments.applied — one DeployInstanceAsync
// deploys exactly one instance to one site). Emitted only on this
// confirmed-Success path, so failures, timeouts/retries (the
// catch block), and the reconciliation path (which recovers a
// PRIOR timed-out apply rather than performing a fresh one) do
// not increment it.
ScadaBridgeTelemetry.RecordDeploymentApplied();
// The site has applied the deployment. The post-success
// persistence below is best-effort: a failure here must be
// logged loudly for operator reconciliation but must not flip
@@ -20,4 +20,11 @@ public class NodeOptions
public int RemotingPort { get; set; } = 8081;
/// <summary>Gets or sets the gRPC port for the site stream server.</summary>
public int GrpcPort { get; set; } = 8083;
/// <summary>
/// HTTP/1.1 port serving the Prometheus /metrics scrape endpoint on site nodes.
/// Defaults to 8084 — deliberately distinct from <see cref="RemotingPort"/> (8082)
/// and <see cref="GrpcPort"/> (8083) so the Kestrel metrics listener never contends
/// with the Akka remoting port a site node binds.
/// </summary>
public int MetricsPort { get; set; } = 8084;
}
+14
View File
@@ -293,6 +293,13 @@ try
// Read GrpcPort from config (NodeOptions already has default 8083)
var grpcPort = configuration.GetValue<int>("ScadaBridge:Node:GrpcPort", 8083);
// Read MetricsPort from config (NodeOptions already has default 8084).
// Separate HTTP/1.1 listener so a standard HTTP/1.1 Prometheus scraper can
// reach /metrics; the gRPC port stays HTTP/2-only below. The default is
// 8084 — distinct from RemotingPort (8082, Akka) and GrpcPort (8083) so the
// metrics listener never collides with the Akka remoting port on site nodes.
var metricsPort = configuration.GetValue<int>("ScadaBridge:Node:MetricsPort", 8084);
// Configure Kestrel for HTTP/2 only on the gRPC port
builder.WebHost.ConfigureKestrel(options =>
{
@@ -300,6 +307,13 @@ try
{
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2;
});
// Dedicated HTTP/1.1 (and HTTP/2) listener for the Prometheus /metrics
// scrape endpoint, reachable by an HTTP/1.1 scraper.
options.ListenAnyIP(metricsPort, listenOptions =>
{
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1AndHttp2;
});
});
// gRPC server registration
@@ -2,6 +2,7 @@ using ZB.MOM.WW.ScadaBridge.AuditLog;
using ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
using ZB.MOM.WW.ScadaBridge.Communication;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer;
using ZB.MOM.WW.ScadaBridge.ExternalSystemGateway;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
@@ -122,12 +123,19 @@ public static class SiteServiceRegistration
// the always-on Prometheus exporter. Mount the /metrics scrape endpoint per role
// with app.MapZbMetrics(). The same `?? "central"` SiteId default Program.cs uses
// is applied here so the Resource attribute matches the log-enricher value.
// Meters left empty — application instruments are a deferred follow-on.
// The application meter is named so OTel observes its instruments; emit points are
// wired by follow-on tasks (the instruments are no-op until a listener attaches).
services.AddZbTelemetry(o =>
{
o.ServiceName = "scadabridge";
o.SiteId = config["ScadaBridge:Node:SiteId"] ?? "central";
o.NodeRole = config["ScadaBridge:Node:Role"];
o.Meters = [ScadaBridgeTelemetry.MeterName];
if (Enum.TryParse<ZbExporter>(config["ScadaBridge:Telemetry:Exporter"], ignoreCase: true, out var exporter))
o.Exporter = exporter;
var otlp = config["ScadaBridge:Telemetry:OtlpEndpoint"];
if (!string.IsNullOrWhiteSpace(otlp))
o.OtlpEndpoint = otlp;
});
}
}
@@ -58,6 +58,21 @@ public static class StartupValidator
if (port == grpcPort)
errors.Add("ScadaBridge:Node:GrpcPort must differ from RemotingPort");
var metricsPortStr = nodeSection["MetricsPort"];
int metricsPort = 8084; // NodeOptions default when the key is absent
if (metricsPortStr != null && (!int.TryParse(metricsPortStr, out metricsPort) || metricsPort < 1 || metricsPort > 65535))
errors.Add("ScadaBridge:Node:MetricsPort must be 1-65535");
// Host-007 / REQ-HOST-4: the Kestrel metrics (HTTP/1.1) listener port
// must differ from BOTH the Akka remoting port and the gRPC port.
// A collision makes the metrics listener contend with Akka.Remote or
// the gRPC listener for the same TCP port and fail opaquely at runtime.
// Uses the resolved MetricsPort, including the 8084 default.
if (metricsPort == port)
errors.Add("ScadaBridge:Node:MetricsPort must differ from RemotingPort");
if (metricsPort == grpcPort)
errors.Add("ScadaBridge:Node:MetricsPort must differ from GrpcPort");
var dbSection = configuration.GetSection("ScadaBridge:Database");
if (string.IsNullOrEmpty(dbSection["SiteDbPath"]))
errors.Add("ScadaBridge:Database:SiteDbPath required for Site nodes");
@@ -7,6 +7,7 @@
"SiteId": "site-a",
"RemotingPort": 8082,
"GrpcPort": 8083,
"MetricsPort": 8084,
"NodeName": "node-a"
},
"Cluster": {
@@ -5,6 +5,7 @@ using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
namespace ZB.MOM.WW.ScadaBridge.InboundAPI;
@@ -44,6 +45,16 @@ public static class EndpointExtensions
if (!validationResult.IsValid)
{
// Telemetry follow-on: count every inbound request, including auth
// failures. The raw {methodName} route value is arbitrary caller input
// and would be high-cardinality, so failures are tagged with a small
// bounded set of sentinels keyed off the validator's status code rather
// than the unvalidated name (401 → "<unauthorized>", 403 → "<forbidden>").
ScadaBridgeTelemetry.RecordInboundApiRequest(
validationResult.StatusCode == StatusCodes.Status401Unauthorized
? "<unauthorized>"
: "<forbidden>");
// WP-5: Failures-only logging
logger.LogWarning(
"Inbound API auth failure for method {Method}: {Error} (status {StatusCode})",
@@ -56,6 +67,12 @@ public static class EndpointExtensions
var method = validationResult.Method!;
// Telemetry follow-on: count this inbound request against the resolved,
// registered method name. method.Name comes from the repository's method
// catalogue (an exact-name lookup), so the `method` tag is bounded to the
// set of configured API methods — never the raw caller-supplied route value.
ScadaBridgeTelemetry.RecordInboundApiRequest(method.Name);
// Audit Log (#23 M4 Bundle D): publish the resolved API key name so
// AuditWriteMiddleware can populate AuditEvent.Actor in its finally
// block. Done AFTER validation succeeded — auth failures leave the
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
@@ -98,6 +99,48 @@ public class StoreAndForwardService
/// </summary>
private static readonly TimeSpan SweepShutdownWaitTimeout = TimeSpan.FromSeconds(10);
/// <summary>
/// WP-14 (telemetry): cached count of messages currently buffered for
/// forwarding — i.e. rows in <see cref="StoreAndForwardMessageStatus.Pending"/>,
/// the live store-and-forward queue waiting to be delivered. This backs the
/// <c>scadabridge.store_and_forward.queue.depth</c> observable gauge.
/// <para>
/// The gauge's collection callback is synchronous and is invoked frequently by
/// the OpenTelemetry/Prometheus collector, so it must never run an async SQLite
/// <c>COUNT(*)</c>. Instead this <see cref="long"/> is seeded once from storage
/// in <see cref="StartAsync"/> and then adjusted in-process on the existing
/// paths that change the Pending population: <see cref="BufferAsync"/> (+1),
/// successful-retry removal and Pending→Parked transitions in
/// <see cref="RetryMessageAsync"/> (-1), and operator requeue in
/// <see cref="RetryParkedMessageAsync"/> (+1). The provider registered with
/// <see cref="ScadaBridgeTelemetry.SetQueueDepthProvider"/> reads it via
/// <see cref="Interlocked.Read"/> — non-blocking and sync-safe. It is an
/// approximate, eventually-consistent gauge (concurrent failover replication
/// applies to the standby's own store, not this counter), which is exactly
/// what a queue-depth metric needs.
/// </para>
/// </summary>
private long _bufferedCount;
/// <summary>
/// Test seam (WP-14 telemetry): simulates a concurrent pre-seed
/// <see cref="BufferAsync"/> increment landing on <see cref="_bufferedCount"/>
/// before <see cref="StartAsync"/> seeds it, so a test can prove the seed uses
/// <see cref="Interlocked.Add"/> (additive) rather than Exchange (clobbering).
/// </summary>
internal void TestOnly_IncrementBufferedCount() =>
Interlocked.Increment(ref _bufferedCount);
/// <summary>
/// WP-14 (telemetry): an instance field that guards against a single instance
/// registering the queue-depth provider (and re-seeding the counter) more than
/// once — e.g. a second <see cref="StartAsync"/> on the same instance. It does NOT
/// coordinate across instances: the gauge slot in <see cref="ScadaBridgeTelemetry"/>
/// is process-global, so in a multi-instance process the last <see cref="StartAsync"/>
/// wins the global slot. 0 = not yet registered, 1 = done.
/// </summary>
private int _queueDepthProviderRegistered;
/// <summary>
/// WP-10: Delivery handler delegate. The return value / exception is interpreted
/// the same way on both the immediate-delivery path (<see cref="EnqueueAsync"/>)
@@ -170,6 +213,27 @@ public class StoreAndForwardService
public async Task StartAsync()
{
await _storage.InitializeAsync();
// WP-14 (telemetry): seed the cached buffered-message count from the
// store exactly once (the gauge callback cannot run an async COUNT), then
// register the sync, non-blocking provider with the process-global
// ScadaBridgeTelemetry gauge. Both steps are inside the one-time guard so a
// second StartAsync on the same instance cannot double-seed.
//
// The seed is an Interlocked.Add — NOT an Exchange — to avoid a startup race:
// between the await above returning and this point, a concurrent BufferAsync
// could already have Interlocked.Increment'd _bufferedCount. Exchange would
// clobber that increment (losing a +1); Add preserves it. _bufferedCount
// starts at 0 and only BufferAsync increments it before the seed, so
// 0 + pending + (any concurrent increments) is the correct live count.
var pending = await _storage.GetMessageCountByStatusAsync(
StoreAndForwardMessageStatus.Pending);
if (Interlocked.CompareExchange(ref _queueDepthProviderRegistered, 1, 0) == 0)
{
Interlocked.Add(ref _bufferedCount, pending);
ScadaBridgeTelemetry.SetQueueDepthProvider(() => Interlocked.Read(ref _bufferedCount));
}
_retryTimer = new Timer(
// StoreAndForward-024: capture the sweep Task on each tick so
// StopAsync can await any in-flight invocation before the host
@@ -396,6 +460,10 @@ public class StoreAndForwardService
{
await _storage.EnqueueAsync(message);
_replication?.ReplicateEnqueue(message);
// WP-14 (telemetry): a freshly buffered row is Pending → grows the live
// queue depth. Bumped after the durable write so the gauge never leads the
// store.
Interlocked.Increment(ref _bufferedCount);
}
/// <summary>
@@ -452,6 +520,8 @@ public class StoreAndForwardService
{
await _storage.RemoveMessageAsync(message.Id);
_replication?.ReplicateRemove(message.Id);
// WP-14 (telemetry): a delivered row leaves the Pending queue.
Interlocked.Decrement(ref _bufferedCount);
RaiseActivity("Delivered", message.Category,
$"Delivered to {message.Target} after {message.RetryCount} retries");
@@ -483,6 +553,9 @@ public class StoreAndForwardService
message.Id);
return;
}
// WP-14 (telemetry): the row committed Pending→Parked, leaving the live
// forward queue. Only counted when the conditional update actually won.
Interlocked.Decrement(ref _bufferedCount);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category,
$"Permanent failure for {message.Target}: handler returned false");
@@ -519,6 +592,9 @@ public class StoreAndForwardService
message.Id);
return;
}
// WP-14 (telemetry): the row committed Pending→Parked, leaving the
// live forward queue. Only counted when the conditional update won.
Interlocked.Decrement(ref _bufferedCount);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category,
$"Max retries ({message.MaxRetries}) reached for {message.Target}");
@@ -737,6 +813,11 @@ public class StoreAndForwardService
return false;
}
// WP-14 (telemetry): an operator requeue moves Parked→Pending, re-adding the
// row to the live forward queue. Counted only when the conditional storage
// update actually flipped the row.
Interlocked.Increment(ref _bufferedCount);
// The active node just rewrote this row to Pending with retry_count = 0
// and cleared last_error / last_attempt_at (see
// StoreAndForwardStorage.RetryParkedMessageAsync). Reconstruct the
@@ -769,6 +850,7 @@ public class StoreAndForwardService
{
// Capture the category before the row is deleted so the activity log is
// labelled correctly.
// WP-14 (telemetry): Parked rows are not in _bufferedCount; discarding a Parked row needs no counter adjustment.
var message = await _storage.GetMessageByIdAsync(messageId);
var success = await _storage.DiscardParkedMessageAsync(messageId);
if (success)
@@ -1,3 +1,4 @@
using System.Diagnostics.Metrics;
using System.Threading.Channels;
using Akka.Actor;
using Akka.TestKit.Xunit2;
@@ -5,6 +6,7 @@ using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;
@@ -342,6 +344,59 @@ public class SiteStreamGrpcServerTests : TestKit
subscriber.DidNotReceive().RemoveSubscriber(Arg.Any<IActorRef>());
}
[Fact]
public async Task SiteConnectionUpGauge_GoesToOneOnConnect_AndBackToZeroOnCancel()
{
// Telemetry follow-on: the scadabridge.site.connection.up gauge must read
// exactly 1 while a site stream is established and return to 0 once the
// stream terminates on the cancel path — proving SiteConnectionOpened() is
// matched by exactly one SiteConnectionClosed() in the handler's finally.
var server = CreateServer();
server.SetReady(Sys);
long ReadGauge()
{
long observed = 0;
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
instrument.Name == "scadabridge.site.connection.up")
{
l.EnableMeasurementEvents(instrument);
}
};
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
listener.Start();
listener.RecordObservableInstruments();
return observed;
}
var baseline = ReadGauge();
var cts = new CancellationTokenSource();
var context = CreateMockContext(cts.Token);
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
var streamTask = Task.Run(() => server.SubscribeInstance(
MakeRequest("corr-gauge", "Site1.Pump01"), writer, context));
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
// While the stream is up the gauge is one above whatever baseline other
// (possibly parallel) tests left behind — read relative so the assertion
// is robust to test interleaving on the process-wide static counter.
Assert.Equal(baseline + 1, ReadGauge());
cts.Cancel();
await streamTask;
await WaitForConditionAsync(() => server.ActiveStreamCount == 0);
// After the cancel path runs the finally, the gauge is balanced back to
// the baseline — no leaked "up" count.
Assert.Equal(baseline, ReadGauge());
}
[Fact]
public void SetReady_AllowsStreamCreation()
{
@@ -1,3 +1,4 @@
using System.Diagnostics.Metrics;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging.Abstractions;
@@ -10,6 +11,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
@@ -558,6 +560,106 @@ public class DeploymentServiceTests : TestKit
Arg.Any<object>(), Arg.Any<CancellationToken>());
}
// ── Telemetry follow-on: scadabridge.deployments.applied on deploy success ──
[Fact]
public async Task DeployInstanceAsync_SiteSucceeds_EmitsDeploymentsAppliedCounterOnce()
{
// A successful deployment must increment the
// scadabridge.deployments.applied counter exactly once — one
// DeployInstanceAsync deploys one instance to one site, so the unit is
// one increment per successful deploy operation.
var instance = new Instance("MetricInst") { Id = 55, SiteId = 1, State = InstanceState.NotDeployed };
_repo.GetInstanceByIdAsync(55, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(55, "MetricInst", "sha256:target");
_repo.GetCurrentDeploymentStatusAsync(55, Arg.Any<CancellationToken>())
.Returns((DeploymentRecord?)null);
var counters = new ReconcileProbeCounters();
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false)));
var service = CreateServiceWithCommActor(commActor);
long applied = 0;
using var listener = new MeterListener
{
InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
&& instrument.Name == "scadabridge.deployments.applied")
{
l.EnableMeasurementEvents(instrument);
}
}
};
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) =>
Interlocked.Add(ref applied, measurement));
listener.Start();
var result = await service.DeployInstanceAsync(55, "admin");
listener.Dispose();
Assert.True(result.IsSuccess);
// Fresh first-time deploy applied -> exactly one increment.
Assert.Equal(1, Interlocked.Read(ref applied));
}
[Fact]
public async Task DeployInstanceAsync_Reconciled_DoesNotEmitDeploymentsAppliedCounter()
{
// The reconciliation path recovers a PRIOR timed-out apply rather than
// performing a fresh one; counting it would risk double-counting the
// original apply, so scadabridge.deployments.applied must NOT increment
// on a reconciled (no re-deploy) success.
var instance = new Instance("MetricReconcileInst")
{
Id = 56, SiteId = 1, State = InstanceState.NotDeployed
};
_repo.GetInstanceByIdAsync(56, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(56, "MetricReconcileInst", "sha256:target");
var prior = new DeploymentRecord("dep-prior-56", "admin")
{
InstanceId = 56,
Status = DeploymentStatus.InProgress,
RevisionHash = "sha256:target"
};
_repo.GetCurrentDeploymentStatusAsync(56, Arg.Any<CancellationToken>()).Returns(prior);
_repo.GetDeployedSnapshotByInstanceIdAsync(56, Arg.Any<CancellationToken>())
.Returns((DeployedConfigSnapshot?)null);
var counters = new ReconcileProbeCounters();
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false)));
var service = CreateServiceWithCommActor(commActor);
long applied = 0;
using var listener = new MeterListener
{
InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
&& instrument.Name == "scadabridge.deployments.applied")
{
l.EnableMeasurementEvents(instrument);
}
}
};
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) =>
Interlocked.Add(ref applied, measurement));
listener.Start();
var result = await service.DeployInstanceAsync(56, "admin");
listener.Dispose();
Assert.True(result.IsSuccess);
// Reconciled — no fresh deploy was sent, so no increment.
Assert.Equal(0, counters.DeployCount);
Assert.Equal(0, Interlocked.Read(ref applied));
}
// ── DeploymentManager-011: lifecycle success paths ──
[Fact]
@@ -0,0 +1,32 @@
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
namespace ZB.MOM.WW.ScadaBridge.Host.Tests;
/// <summary>
/// Infrastructure-free guards for <see cref="ScadaBridgeTelemetry"/>: the meter name is the
/// stable value registered with OTel, and the emit helpers are safe to call (they are no-op
/// until follow-on tasks wire real emit points and a listener attaches).
/// </summary>
public class ScadaBridgeTelemetryTests
{
[Fact]
public void MeterName_IsStableValue()
{
Assert.Equal("ZB.MOM.WW.ScadaBridge", ScadaBridgeTelemetry.MeterName);
}
[Fact]
public void EmitHelpers_DoNotThrow()
{
var ex = Record.Exception(() =>
{
ScadaBridgeTelemetry.RecordDeploymentApplied();
ScadaBridgeTelemetry.RecordInboundApiRequest("X");
ScadaBridgeTelemetry.SiteConnectionOpened();
ScadaBridgeTelemetry.SiteConnectionClosed();
ScadaBridgeTelemetry.SetQueueDepthProvider(() => 5);
});
Assert.Null(ex);
}
}
@@ -352,6 +352,105 @@ public class StartupValidatorTests
Assert.Null(ex);
}
[Theory]
[InlineData("0")]
[InlineData("-1")]
[InlineData("65536")]
[InlineData("abc")]
public void Site_InvalidMetricsPort_FailsValidation(string metricsPort)
{
var values = ValidSiteConfig();
values["ScadaBridge:Node:MetricsPort"] = metricsPort;
var config = BuildConfig(values);
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
Assert.Contains("MetricsPort must be 1-65535", ex.Message);
}
[Fact]
public void Site_ValidMetricsPort_PassesValidation()
{
var values = ValidSiteConfig();
values["ScadaBridge:Node:MetricsPort"] = "8084";
var config = BuildConfig(values);
var ex = Record.Exception(() => StartupValidator.Validate(config));
Assert.Null(ex);
}
[Fact]
public void Site_MetricsPortEqualsRemotingPort_FailsValidation()
{
// Host-007 regression: the Kestrel metrics (HTTP/1.1) listener port must
// differ from RemotingPort. Identical values cause the metrics listener
// and Akka.Remote to contend for the same port at runtime.
var values = ValidSiteConfig();
values["ScadaBridge:Node:RemotingPort"] = "8082";
values["ScadaBridge:Node:MetricsPort"] = "8082";
var config = BuildConfig(values);
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
Assert.Contains("MetricsPort must differ from RemotingPort", ex.Message);
}
[Fact]
public void Site_MetricsPortEqualsGrpcPort_FailsValidation()
{
// Host-007 regression: the metrics listener port must differ from GrpcPort.
var values = ValidSiteConfig();
values["ScadaBridge:Node:GrpcPort"] = "8083";
values["ScadaBridge:Node:MetricsPort"] = "8083";
var config = BuildConfig(values);
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
Assert.Contains("MetricsPort must differ from GrpcPort", ex.Message);
}
[Fact]
public void Site_DefaultMetricsPortEqualsRemotingPort_FailsValidation()
{
// MetricsPort absent => NodeOptions default 8084. A site whose RemotingPort
// is also 8084 must still be rejected.
var values = ValidSiteConfig();
values["ScadaBridge:Node:RemotingPort"] = "8084";
// Keep GrpcPort distinct so only the metrics-vs-remoting rule fires.
values["ScadaBridge:Node:GrpcPort"] = "8083";
// Seed nodes default to the remoting port (8082) in ValidSiteConfig; realign
// them to 8084 so the seed-vs-grpc rule is not what trips here.
values["ScadaBridge:Cluster:SeedNodes:0"] = "akka.tcp://scadabridge@site-a-node1:8084";
values["ScadaBridge:Cluster:SeedNodes:1"] = "akka.tcp://scadabridge@site-a-node2:8084";
var config = BuildConfig(values);
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
Assert.Contains("MetricsPort must differ from RemotingPort", ex.Message);
}
[Fact]
public void Site_MetricsPortDiffersFromRemotingAndGrpc_PassesValidation()
{
var values = ValidSiteConfig();
values["ScadaBridge:Node:RemotingPort"] = "8082";
values["ScadaBridge:Node:GrpcPort"] = "8083";
values["ScadaBridge:Node:MetricsPort"] = "8084";
var config = BuildConfig(values);
var ex = Record.Exception(() => StartupValidator.Validate(config));
Assert.Null(ex);
}
[Fact]
public void Central_InvalidMetricsPort_NotValidated()
{
// The metrics-port rules apply to Site nodes only; a Central node runs no
// metrics listener, so an out-of-range MetricsPort must not fail startup.
var values = ValidCentralConfig();
values["ScadaBridge:Node:MetricsPort"] = "0";
var config = BuildConfig(values);
var ex = Record.Exception(() => StartupValidator.Validate(config));
Assert.Null(ex);
}
[Fact]
public void MultipleErrors_AllReported()
{
@@ -9,8 +9,10 @@ using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.InboundApi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types.InboundApi;
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
using System.Diagnostics.Metrics;
using System.Net;
using System.Text;
@@ -232,6 +234,114 @@ public class EndpointExtensionsTests
Assert.Equal("audit-actor-name", capture.CapturedActor);
}
[Fact]
public async Task ValidRequest_EmitsInboundApiRequestCounter_TaggedWithResolvedMethodName()
{
// Telemetry follow-on: a successful inbound request increments
// scadabridge.inbound_api.requests once, tagged with the resolved,
// registered method name (method.Name) — the bounded identifier, not the
// raw route value.
var key = SeedKey();
var method = SeedMethod(1, "echo", "return Parameters[\"value\"];",
"""[{"name":"value","type":"Integer","required":true}]""");
using var collector = new InboundApiRequestCounterCollector();
using var host = await BuildHostAsync(key, method);
var client = host.GetTestClient();
var response = await client.SendAsync(BuildPost("echo", """{"value":7}"""));
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
// Filter by the method tag this test produced: the counter is a process-wide
// static, so a parallel test class could otherwise leak measurements in.
var echoTotal = collector.Measurements
.Where(m => m.Method == "echo")
.Sum(m => m.Value);
Assert.Equal(1, echoTotal);
}
[Fact]
public async Task UnknownMethod_EmitsInboundApiRequestCounter_WithBoundedForbiddenSentinel()
{
// Telemetry follow-on: an auth/authz failure is still counted, but the
// tag is a bounded sentinel ("<forbidden>") rather than the arbitrary
// caller-supplied route value — so an attacker posting random method
// names cannot blow up the `method` tag cardinality.
var key = SeedKey();
var method = SeedMethod(1, "knownMethod", "return 1;");
using var collector = new InboundApiRequestCounterCollector();
using var host = await BuildHostAsync(key, method);
var client = host.GetTestClient();
var response = await client.SendAsync(BuildPost("totally-made-up-name", "{}"));
Assert.Equal(HttpStatusCode.Forbidden, response.StatusCode);
var measurements = collector.Measurements;
// Cardinality safety: the arbitrary route value is never used as a tag.
Assert.DoesNotContain(measurements, m => m.Method == "totally-made-up-name");
// The failure path counts the request against the bounded sentinel.
Assert.Contains(measurements, m => m.Method == "<forbidden>" && m.Value == 1);
}
/// <summary>
/// Captures <c>scadabridge.inbound_api.requests</c> measurements (value + the
/// <c>method</c> tag) via a <see cref="MeterListener"/> for the duration of a test.
/// </summary>
private sealed class InboundApiRequestCounterCollector : IDisposable
{
private readonly MeterListener _listener;
private readonly List<(long Value, string? Method)> _measurements = new();
private readonly object _gate = new();
public InboundApiRequestCounterCollector()
{
_listener = new MeterListener
{
InstrumentPublished = (instrument, listener) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
&& instrument.Name == "scadabridge.inbound_api.requests")
{
listener.EnableMeasurementEvents(instrument);
}
},
};
_listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
{
string? method = null;
foreach (var tag in tags)
{
if (tag.Key == "method")
{
method = tag.Value as string;
}
}
lock (_gate)
{
_measurements.Add((value, method));
}
});
_listener.Start();
}
public IReadOnlyList<(long Value, string? Method)> Measurements
{
get
{
lock (_gate)
{
return _measurements.ToList();
}
}
}
public void Dispose() => _listener.Dispose();
}
private static HttpRequestMessage BuildPost(string methodName, string body)
{
var request = new HttpRequestMessage(HttpMethod.Post, "/api/" + methodName)
@@ -0,0 +1,211 @@
using System.Diagnostics.Metrics;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
namespace ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests;
/// <summary>
/// WP-14 (telemetry follow-on): verifies the cached buffered-message counter that
/// backs the <c>scadabridge.store_and_forward.queue.depth</c> observable gauge tracks
/// the live (Pending) queue across the existing enqueue / drain / park / requeue paths,
/// and that the sync gauge callback reports it.
///
/// The gauge is read the way the OpenTelemetry collector reads it — via a
/// <see cref="MeterListener"/> that forces an observation (the callback is synchronous
/// and does no I/O, which is the whole point of caching the count). <see cref="StartAsync"/>
/// seeds the counter from storage and registers the provider against this service
/// instance, so the gauge resolves to this test's counter.
/// </summary>
public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable
{
private readonly SqliteConnection _keepAlive;
private readonly StoreAndForwardStorage _storage;
private readonly StoreAndForwardService _service;
public QueueDepthGaugeTests()
{
var dbName = $"QueueDepthTests_{Guid.NewGuid():N}";
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
_keepAlive = new SqliteConnection(connStr);
_keepAlive.Open();
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
var options = new StoreAndForwardOptions
{
DefaultRetryInterval = TimeSpan.Zero,
DefaultMaxRetries = 3,
// Long interval so no background sweep fires on its own during the test;
// sweeps are driven explicitly via RetryPendingMessagesAsync.
RetryTimerInterval = TimeSpan.FromMinutes(10)
};
_service = new StoreAndForwardService(
_storage, options, NullLogger<StoreAndForwardService>.Instance);
}
public async Task InitializeAsync()
{
await _storage.InitializeAsync();
// StartAsync seeds _bufferedCount from the (empty) store and registers the
// queue-depth provider against this service instance.
await _service.StartAsync();
}
public async Task DisposeAsync() => await _service.StopAsync();
public void Dispose() => _keepAlive.Dispose();
/// <summary>
/// Reads the current value of the <c>scadabridge.store_and_forward.queue.depth</c>
/// gauge by forcing a synchronous observation through a transient MeterListener —
/// exactly the path the Prometheus/OTLP collector exercises on each scrape.
/// </summary>
private static long ReadQueueDepthGauge()
{
long observed = -1;
using var listener = new MeterListener
{
InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
instrument.Name == "scadabridge.store_and_forward.queue.depth")
{
l.EnableMeasurementEvents(instrument);
}
}
};
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
listener.Start();
listener.RecordObservableInstruments();
return observed;
}
[Fact]
public async Task Gauge_TracksBufferedDepth_AcrossEnqueueDrainAndPark()
{
// Empty store seeded at StartAsync → gauge reports 0.
Assert.Equal(0, ReadQueueDepthGauge());
// A handler that fails transiently so each enqueue buffers a Pending row
// (immediate attempt 0 throws → BufferAsync → +1).
var deliver = false;
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ =>
{
if (!deliver) throw new HttpRequestException("transient");
return Task.FromResult(true);
});
// Enqueue 3 → cached depth = 3 → gauge reports 3.
for (var i = 0; i < 3; i++)
{
var r = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
Assert.True(r.WasBuffered);
}
Assert.Equal(3, ReadQueueDepthGauge());
// Drain: handler now succeeds → the retry sweep removes all 3 Pending rows → depth 0.
deliver = true;
await _service.RetryPendingMessagesAsync();
Assert.Equal(0, ReadQueueDepthGauge());
// Park path: buffer one more, then make it park (maxRetries:1 parks after one
// sweep). Pending→Parked leaves the live queue → depth back to 0.
deliver = false;
var parkResult = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
Assert.True(parkResult.WasBuffered);
Assert.Equal(1, ReadQueueDepthGauge());
await _service.RetryPendingMessagesAsync();
var parked = await _storage.GetMessageByIdAsync(parkResult.MessageId);
Assert.Equal(StoreAndForwardMessageStatus.Parked, parked!.Status);
Assert.Equal(0, ReadQueueDepthGauge());
// Operator requeue: Parked→Pending re-adds to the live queue → depth 1.
Assert.True(await _service.RetryParkedMessageAsync(parkResult.MessageId));
Assert.Equal(1, ReadQueueDepthGauge());
}
[Fact]
public async Task Gauge_SeedsFromExistingPendingRows_OnStart()
{
// Pre-seed two Pending rows directly in storage *before* a fresh service starts,
// simulating a process restart over a non-empty buffer. StartAsync must seed the
// cached counter from the store so the gauge does not under-report on restart.
await _storage.EnqueueAsync(new StoreAndForwardMessage
{
Id = Guid.NewGuid().ToString("N"),
Category = StoreAndForwardCategory.ExternalSystem,
Target = "api",
PayloadJson = "{}",
Status = StoreAndForwardMessageStatus.Pending,
CreatedAt = DateTimeOffset.UtcNow,
MaxRetries = 3
});
await _storage.EnqueueAsync(new StoreAndForwardMessage
{
Id = Guid.NewGuid().ToString("N"),
Category = StoreAndForwardCategory.Notification,
Target = "list",
PayloadJson = "{}",
Status = StoreAndForwardMessageStatus.Pending,
CreatedAt = DateTimeOffset.UtcNow,
MaxRetries = 3
});
var fresh = new StoreAndForwardService(
_storage,
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
NullLogger<StoreAndForwardService>.Instance);
try
{
await fresh.StartAsync();
// The fresh service registered itself as the global provider and seeded 2.
Assert.Equal(2, ReadQueueDepthGauge());
}
finally
{
await fresh.StopAsync();
}
}
/// <summary>
/// Review finding (FINDING 1): the startup seed must ADD to whatever the counter
/// already holds, not overwrite it. A concurrent <c>BufferAsync</c> can
/// <c>Interlocked.Increment</c> <c>_bufferedCount</c> in the window between
/// <c>StartAsync</c>'s async <c>COUNT(*)</c> returning and the seed running; with an
/// <c>Interlocked.Exchange</c> seed that increment would be clobbered (lost +1). This
/// pre-increments the in-memory counter (standing in for that concurrent enqueue),
/// then starts the service over an empty store and asserts the pre-increment survives.
/// </summary>
[Fact]
public async Task Gauge_SeedAddsToConcurrentPreSeedIncrement_NotClobber()
{
// Store is empty (StartAsync's pending COUNT(*) = 0), so the only contribution
// is the simulated concurrent pre-seed enqueue increment.
var fresh = new StoreAndForwardService(
_storage,
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
NullLogger<StoreAndForwardService>.Instance);
// Stand in for a BufferAsync increment that landed before StartAsync seeded.
fresh.TestOnly_IncrementBufferedCount();
try
{
await fresh.StartAsync();
// Add(0 seed) over the pre-existing +1 → 1. An Exchange(0) seed would clobber
// it to 0, losing the concurrent enqueue — the bug this fix prevents.
Assert.Equal(1, ReadQueueDepthGauge());
}
finally
{
await fresh.StopAsync();
}
}
}