diff --git a/docker/site-a-node-a/appsettings.Site.json b/docker/site-a-node-a/appsettings.Site.json index 841b93e1..44f588fe 100644 --- a/docker/site-a-node-a/appsettings.Site.json +++ b/docker/site-a-node-a/appsettings.Site.json @@ -6,7 +6,8 @@ "NodeHostname": "scadabridge-site-a-a", "SiteId": "site-a", "RemotingPort": 8082, - "GrpcPort": 8083 + "GrpcPort": 8083, + "MetricsPort": 8084 }, "Cluster": { "SeedNodes": [ diff --git a/docker/site-a-node-b/appsettings.Site.json b/docker/site-a-node-b/appsettings.Site.json index 1ab9fbb8..7a3a9dec 100644 --- a/docker/site-a-node-b/appsettings.Site.json +++ b/docker/site-a-node-b/appsettings.Site.json @@ -6,7 +6,8 @@ "NodeHostname": "scadabridge-site-a-b", "SiteId": "site-a", "RemotingPort": 8082, - "GrpcPort": 8083 + "GrpcPort": 8083, + "MetricsPort": 8084 }, "Cluster": { "SeedNodes": [ diff --git a/docker/site-b-node-a/appsettings.Site.json b/docker/site-b-node-a/appsettings.Site.json index fda43640..88a7c8af 100644 --- a/docker/site-b-node-a/appsettings.Site.json +++ b/docker/site-b-node-a/appsettings.Site.json @@ -6,7 +6,8 @@ "NodeHostname": "scadabridge-site-b-a", "SiteId": "site-b", "RemotingPort": 8082, - "GrpcPort": 8083 + "GrpcPort": 8083, + "MetricsPort": 8084 }, "Cluster": { "SeedNodes": [ diff --git a/docker/site-b-node-b/appsettings.Site.json b/docker/site-b-node-b/appsettings.Site.json index 451f9650..9334191d 100644 --- a/docker/site-b-node-b/appsettings.Site.json +++ b/docker/site-b-node-b/appsettings.Site.json @@ -6,7 +6,8 @@ "NodeHostname": "scadabridge-site-b-b", "SiteId": "site-b", "RemotingPort": 8082, - "GrpcPort": 8083 + "GrpcPort": 8083, + "MetricsPort": 8084 }, "Cluster": { "SeedNodes": [ diff --git a/docker/site-c-node-a/appsettings.Site.json b/docker/site-c-node-a/appsettings.Site.json index 3af46da7..33b17691 100644 --- a/docker/site-c-node-a/appsettings.Site.json +++ b/docker/site-c-node-a/appsettings.Site.json @@ -6,7 +6,8 @@ "NodeHostname": "scadabridge-site-c-a", "SiteId": "site-c", "RemotingPort": 8082, - "GrpcPort": 8083 + "GrpcPort": 8083, + "MetricsPort": 8084 }, "Cluster": { "SeedNodes": [ diff --git a/docker/site-c-node-b/appsettings.Site.json b/docker/site-c-node-b/appsettings.Site.json index eec984d1..5a69e034 100644 --- a/docker/site-c-node-b/appsettings.Site.json +++ b/docker/site-c-node-b/appsettings.Site.json @@ -6,7 +6,8 @@ "NodeHostname": "scadabridge-site-c-b", "SiteId": "site-c", "RemotingPort": 8082, - "GrpcPort": 8083 + "GrpcPort": 8083, + "MetricsPort": 8084 }, "Cluster": { "SeedNodes": [ diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Observability/ScadaBridgeTelemetry.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Observability/ScadaBridgeTelemetry.cs new file mode 100644 index 00000000..1b382bab --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Observability/ScadaBridgeTelemetry.cs @@ -0,0 +1,94 @@ +using System.Diagnostics.Metrics; + +namespace ZB.MOM.WW.ScadaBridge.Commons.Observability; + +/// +/// Central + instrument definitions for ScadaBridge's application +/// telemetry, modelled on OtOpcUa's OtOpcUaTelemetry. Modules emit through these +/// pre-created instruments so a single OpenTelemetry / Prometheus binding in +/// Host (registered via AddZbTelemetry with this meter named in +/// ZbTelemetryOptions.Meters) catches everything. No exporter is required — +/// instruments are no-op until a listener attaches, so tests and dev hosts pay nothing +/// for instrumentation that nobody scrapes. +/// +/// Instrument names follow the OpenTelemetry semantic convention pattern +/// scadabridge.<subsystem>.<event>. This task defines the instruments and +/// their emit helpers; four later tasks wire the actual emit points. Until those land the +/// helpers are dormant but inert — calling them is safe and simply records against a meter +/// that nothing observes. +/// +public static class ScadaBridgeTelemetry +{ + /// The meter name registered with OTel via ZbTelemetryOptions.Meters. + public const string MeterName = "ZB.MOM.WW.ScadaBridge"; + + /// Singleton all instruments hang off. + private static readonly Meter Meter = new(MeterName); + + // ---------------- Counters ---------------- + + /// Incremented each time a deployment is successfully applied. + private static readonly Counter _deploymentsApplied = + Meter.CreateCounter("scadabridge.deployments.applied", unit: "1", + description: "Deployments applied."); + + /// Incremented for each inbound API request, tagged with the API method. + private static readonly Counter _inboundApiRequests = + Meter.CreateCounter("scadabridge.inbound_api.requests", unit: "1", + description: "Inbound API requests, tagged by method."); + + // ---------------- Observable gauges ---------------- + + /// Current count of open site connections, mutated via . + private static long _siteConnectionsUp; + + /// Provider that yields the live StoreAndForward queue depth; set by a later task. + private static Func? _queueDepthProvider; + +#pragma warning disable IDE0052 // Held to keep the observable gauges alive for the meter's lifetime. + /// Gauge reporting the number of currently open site connections. + private static readonly ObservableGauge _siteConnectionUp = + Meter.CreateObservableGauge("scadabridge.site.connection.up", + () => Interlocked.Read(ref _siteConnectionsUp), + description: "Number of currently open site connections."); + + /// Gauge reporting the current StoreAndForward queue depth via the registered provider. + private static readonly ObservableGauge _storeAndForwardQueueDepth = + Meter.CreateObservableGauge("scadabridge.store_and_forward.queue.depth", + () => Volatile.Read(ref _queueDepthProvider) is { } p ? p() : 0L, + unit: "items", + description: "Current StoreAndForward queue depth."); +#pragma warning restore IDE0052 + + // ---------------- Emit helpers ---------------- + + /// Records that a deployment was applied. + public static void RecordDeploymentApplied() => _deploymentsApplied.Add(1); + + /// Records an inbound API request for the given . + /// The API method the request targeted. + public static void RecordInboundApiRequest(string method) => + _inboundApiRequests.Add(1, new KeyValuePair("method", method)); + + /// Records that a site connection opened (increments the up-count gauge). + public static void SiteConnectionOpened() => Interlocked.Increment(ref _siteConnectionsUp); + + /// Records that a site connection closed (decrements the up-count gauge). + public static void SiteConnectionClosed() => Interlocked.Decrement(ref _siteConnectionsUp); + + /// + /// Registers the provider the StoreAndForward queue-depth gauge reads on each observation. + /// A later task supplies a provider that reads the real StoreAndForward depth. A null + /// provider is ignored so the gauge falls back to reporting 0. + /// + /// A callback returning the current queue depth. + public static void SetQueueDepthProvider(Func provider) + { + if (provider is null) + { + return; + } + + Volatile.Write(ref _queueDepthProvider, provider); + } +} diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs index 00a1393d..5900f6e5 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Options; using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using GrpcStatus = Grpc.Core.Status; namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc; @@ -264,6 +265,14 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase "Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})", request.CorrelationId, request.InstanceUniqueName, subscriptionId); + // Telemetry follow-on: the connection is now fully established (Subscribe + // succeeded, so no leak via the catch above). Count it up here and balance + // it in the finally below so the scadabridge.site.connection.up gauge is + // decremented on EVERY exit path — normal completion, client-cancel / + // duplicate-replacement (OperationCanceledException), server shutdown + // (CancelAllStreams -> Cts.Cancel), and any other exception — guaranteeing + // exactly one Closed per Opened and a gauge that never drifts up. + ScadaBridgeTelemetry.SiteConnectionOpened(); try { await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token)) @@ -277,6 +286,7 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase } finally { + ScadaBridgeTelemetry.SiteConnectionClosed(); _streamSubscriber.RemoveSubscriber(relayActor); _actorSystem!.Stop(relayActor); channel.Writer.TryComplete(); diff --git a/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs b/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs index 6d583a63..1ef2c715 100644 --- a/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.DeploymentManager/DeploymentService.cs @@ -7,6 +7,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening; @@ -244,6 +245,16 @@ public class DeploymentService if (response.Status == DeploymentStatus.Success) { + // Telemetry: one instance deployment successfully applied to a + // site. Counted once per successful deploy operation (the unit + // of scadabridge.deployments.applied — one DeployInstanceAsync + // deploys exactly one instance to one site). Emitted only on this + // confirmed-Success path, so failures, timeouts/retries (the + // catch block), and the reconciliation path (which recovers a + // PRIOR timed-out apply rather than performing a fresh one) do + // not increment it. + ScadaBridgeTelemetry.RecordDeploymentApplied(); + // The site has applied the deployment. The post-success // persistence below is best-effort: a failure here must be // logged loudly for operator reconciliation but must not flip diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/NodeOptions.cs b/src/ZB.MOM.WW.ScadaBridge.Host/NodeOptions.cs index 8cad6907..a4dd0c39 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/NodeOptions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/NodeOptions.cs @@ -20,4 +20,11 @@ public class NodeOptions public int RemotingPort { get; set; } = 8081; /// Gets or sets the gRPC port for the site stream server. public int GrpcPort { get; set; } = 8083; + /// + /// HTTP/1.1 port serving the Prometheus /metrics scrape endpoint on site nodes. + /// Defaults to 8084 — deliberately distinct from (8082) + /// and (8083) so the Kestrel metrics listener never contends + /// with the Akka remoting port a site node binds. + /// + public int MetricsPort { get; set; } = 8084; } diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs index 73f84f7c..43d32967 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs @@ -293,6 +293,13 @@ try // Read GrpcPort from config (NodeOptions already has default 8083) var grpcPort = configuration.GetValue("ScadaBridge:Node:GrpcPort", 8083); + // Read MetricsPort from config (NodeOptions already has default 8084). + // Separate HTTP/1.1 listener so a standard HTTP/1.1 Prometheus scraper can + // reach /metrics; the gRPC port stays HTTP/2-only below. The default is + // 8084 — distinct from RemotingPort (8082, Akka) and GrpcPort (8083) so the + // metrics listener never collides with the Akka remoting port on site nodes. + var metricsPort = configuration.GetValue("ScadaBridge:Node:MetricsPort", 8084); + // Configure Kestrel for HTTP/2 only on the gRPC port builder.WebHost.ConfigureKestrel(options => { @@ -300,6 +307,13 @@ try { listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2; }); + + // Dedicated HTTP/1.1 (and HTTP/2) listener for the Prometheus /metrics + // scrape endpoint, reachable by an HTTP/1.1 scraper. + options.ListenAnyIP(metricsPort, listenOptions => + { + listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1AndHttp2; + }); }); // gRPC server registration diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/SiteServiceRegistration.cs b/src/ZB.MOM.WW.ScadaBridge.Host/SiteServiceRegistration.cs index 92cf8b89..564d2a6d 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/SiteServiceRegistration.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/SiteServiceRegistration.cs @@ -2,6 +2,7 @@ using ZB.MOM.WW.ScadaBridge.AuditLog; using ZB.MOM.WW.ScadaBridge.ClusterInfrastructure; using ZB.MOM.WW.ScadaBridge.Communication; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.DataConnectionLayer; using ZB.MOM.WW.ScadaBridge.ExternalSystemGateway; using ZB.MOM.WW.ScadaBridge.HealthMonitoring; @@ -122,12 +123,19 @@ public static class SiteServiceRegistration // the always-on Prometheus exporter. Mount the /metrics scrape endpoint per role // with app.MapZbMetrics(). The same `?? "central"` SiteId default Program.cs uses // is applied here so the Resource attribute matches the log-enricher value. - // Meters left empty — application instruments are a deferred follow-on. + // The application meter is named so OTel observes its instruments; emit points are + // wired by follow-on tasks (the instruments are no-op until a listener attaches). services.AddZbTelemetry(o => { o.ServiceName = "scadabridge"; o.SiteId = config["ScadaBridge:Node:SiteId"] ?? "central"; o.NodeRole = config["ScadaBridge:Node:Role"]; + o.Meters = [ScadaBridgeTelemetry.MeterName]; + if (Enum.TryParse(config["ScadaBridge:Telemetry:Exporter"], ignoreCase: true, out var exporter)) + o.Exporter = exporter; + var otlp = config["ScadaBridge:Telemetry:OtlpEndpoint"]; + if (!string.IsNullOrWhiteSpace(otlp)) + o.OtlpEndpoint = otlp; }); } } diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/StartupValidator.cs b/src/ZB.MOM.WW.ScadaBridge.Host/StartupValidator.cs index 90cda76a..7c0a9121 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/StartupValidator.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/StartupValidator.cs @@ -58,6 +58,21 @@ public static class StartupValidator if (port == grpcPort) errors.Add("ScadaBridge:Node:GrpcPort must differ from RemotingPort"); + var metricsPortStr = nodeSection["MetricsPort"]; + int metricsPort = 8084; // NodeOptions default when the key is absent + if (metricsPortStr != null && (!int.TryParse(metricsPortStr, out metricsPort) || metricsPort < 1 || metricsPort > 65535)) + errors.Add("ScadaBridge:Node:MetricsPort must be 1-65535"); + + // Host-007 / REQ-HOST-4: the Kestrel metrics (HTTP/1.1) listener port + // must differ from BOTH the Akka remoting port and the gRPC port. + // A collision makes the metrics listener contend with Akka.Remote or + // the gRPC listener for the same TCP port and fail opaquely at runtime. + // Uses the resolved MetricsPort, including the 8084 default. + if (metricsPort == port) + errors.Add("ScadaBridge:Node:MetricsPort must differ from RemotingPort"); + if (metricsPort == grpcPort) + errors.Add("ScadaBridge:Node:MetricsPort must differ from GrpcPort"); + var dbSection = configuration.GetSection("ScadaBridge:Database"); if (string.IsNullOrEmpty(dbSection["SiteDbPath"])) errors.Add("ScadaBridge:Database:SiteDbPath required for Site nodes"); diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/appsettings.Site.json b/src/ZB.MOM.WW.ScadaBridge.Host/appsettings.Site.json index b1faee82..ee0fa91b 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/appsettings.Site.json +++ b/src/ZB.MOM.WW.ScadaBridge.Host/appsettings.Site.json @@ -7,6 +7,7 @@ "SiteId": "site-a", "RemotingPort": 8082, "GrpcPort": 8083, + "MetricsPort": 8084, "NodeName": "node-a" }, "Cluster": { diff --git a/src/ZB.MOM.WW.ScadaBridge.InboundAPI/EndpointExtensions.cs b/src/ZB.MOM.WW.ScadaBridge.InboundAPI/EndpointExtensions.cs index ba451c54..a04c3f2e 100644 --- a/src/ZB.MOM.WW.ScadaBridge.InboundAPI/EndpointExtensions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.InboundAPI/EndpointExtensions.cs @@ -5,6 +5,7 @@ using Microsoft.AspNetCore.Routing; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware; namespace ZB.MOM.WW.ScadaBridge.InboundAPI; @@ -44,6 +45,16 @@ public static class EndpointExtensions if (!validationResult.IsValid) { + // Telemetry follow-on: count every inbound request, including auth + // failures. The raw {methodName} route value is arbitrary caller input + // and would be high-cardinality, so failures are tagged with a small + // bounded set of sentinels keyed off the validator's status code rather + // than the unvalidated name (401 → "", 403 → ""). + ScadaBridgeTelemetry.RecordInboundApiRequest( + validationResult.StatusCode == StatusCodes.Status401Unauthorized + ? "" + : ""); + // WP-5: Failures-only logging logger.LogWarning( "Inbound API auth failure for method {Method}: {Error} (status {StatusCode})", @@ -56,6 +67,12 @@ public static class EndpointExtensions var method = validationResult.Method!; + // Telemetry follow-on: count this inbound request against the resolved, + // registered method name. method.Name comes from the repository's method + // catalogue (an exact-name lookup), so the `method` tag is bounded to the + // set of configured API methods — never the raw caller-supplied route value. + ScadaBridgeTelemetry.RecordInboundApiRequest(method.Name); + // Audit Log (#23 M4 Bundle D): publish the resolved API key name so // AuditWriteMiddleware can populate AuditEvent.Actor in its finally // block. Done AFTER validation succeeded — auth failures leave the diff --git a/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs b/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs index b00ddba2..36cf4702 100644 --- a/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.StoreAndForward/StoreAndForwardService.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.Logging; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; @@ -98,6 +99,48 @@ public class StoreAndForwardService /// private static readonly TimeSpan SweepShutdownWaitTimeout = TimeSpan.FromSeconds(10); + /// + /// WP-14 (telemetry): cached count of messages currently buffered for + /// forwarding — i.e. rows in , + /// the live store-and-forward queue waiting to be delivered. This backs the + /// scadabridge.store_and_forward.queue.depth observable gauge. + /// + /// The gauge's collection callback is synchronous and is invoked frequently by + /// the OpenTelemetry/Prometheus collector, so it must never run an async SQLite + /// COUNT(*). Instead this is seeded once from storage + /// in and then adjusted in-process on the existing + /// paths that change the Pending population: (+1), + /// successful-retry removal and Pending→Parked transitions in + /// (-1), and operator requeue in + /// (+1). The provider registered with + /// reads it via + /// — non-blocking and sync-safe. It is an + /// approximate, eventually-consistent gauge (concurrent failover replication + /// applies to the standby's own store, not this counter), which is exactly + /// what a queue-depth metric needs. + /// + /// + private long _bufferedCount; + + /// + /// Test seam (WP-14 telemetry): simulates a concurrent pre-seed + /// increment landing on + /// before seeds it, so a test can prove the seed uses + /// (additive) rather than Exchange (clobbering). + /// + internal void TestOnly_IncrementBufferedCount() => + Interlocked.Increment(ref _bufferedCount); + + /// + /// WP-14 (telemetry): an instance field that guards against a single instance + /// registering the queue-depth provider (and re-seeding the counter) more than + /// once — e.g. a second on the same instance. It does NOT + /// coordinate across instances: the gauge slot in + /// is process-global, so in a multi-instance process the last + /// wins the global slot. 0 = not yet registered, 1 = done. + /// + private int _queueDepthProviderRegistered; + /// /// WP-10: Delivery handler delegate. The return value / exception is interpreted /// the same way on both the immediate-delivery path () @@ -170,6 +213,27 @@ public class StoreAndForwardService public async Task StartAsync() { await _storage.InitializeAsync(); + + // WP-14 (telemetry): seed the cached buffered-message count from the + // store exactly once (the gauge callback cannot run an async COUNT), then + // register the sync, non-blocking provider with the process-global + // ScadaBridgeTelemetry gauge. Both steps are inside the one-time guard so a + // second StartAsync on the same instance cannot double-seed. + // + // The seed is an Interlocked.Add — NOT an Exchange — to avoid a startup race: + // between the await above returning and this point, a concurrent BufferAsync + // could already have Interlocked.Increment'd _bufferedCount. Exchange would + // clobber that increment (losing a +1); Add preserves it. _bufferedCount + // starts at 0 and only BufferAsync increments it before the seed, so + // 0 + pending + (any concurrent increments) is the correct live count. + var pending = await _storage.GetMessageCountByStatusAsync( + StoreAndForwardMessageStatus.Pending); + if (Interlocked.CompareExchange(ref _queueDepthProviderRegistered, 1, 0) == 0) + { + Interlocked.Add(ref _bufferedCount, pending); + ScadaBridgeTelemetry.SetQueueDepthProvider(() => Interlocked.Read(ref _bufferedCount)); + } + _retryTimer = new Timer( // StoreAndForward-024: capture the sweep Task on each tick so // StopAsync can await any in-flight invocation before the host @@ -396,6 +460,10 @@ public class StoreAndForwardService { await _storage.EnqueueAsync(message); _replication?.ReplicateEnqueue(message); + // WP-14 (telemetry): a freshly buffered row is Pending → grows the live + // queue depth. Bumped after the durable write so the gauge never leads the + // store. + Interlocked.Increment(ref _bufferedCount); } /// @@ -452,6 +520,8 @@ public class StoreAndForwardService { await _storage.RemoveMessageAsync(message.Id); _replication?.ReplicateRemove(message.Id); + // WP-14 (telemetry): a delivered row leaves the Pending queue. + Interlocked.Decrement(ref _bufferedCount); RaiseActivity("Delivered", message.Category, $"Delivered to {message.Target} after {message.RetryCount} retries"); @@ -483,6 +553,9 @@ public class StoreAndForwardService message.Id); return; } + // WP-14 (telemetry): the row committed Pending→Parked, leaving the live + // forward queue. Only counted when the conditional update actually won. + Interlocked.Decrement(ref _bufferedCount); _replication?.ReplicatePark(message); RaiseActivity("Parked", message.Category, $"Permanent failure for {message.Target}: handler returned false"); @@ -519,6 +592,9 @@ public class StoreAndForwardService message.Id); return; } + // WP-14 (telemetry): the row committed Pending→Parked, leaving the + // live forward queue. Only counted when the conditional update won. + Interlocked.Decrement(ref _bufferedCount); _replication?.ReplicatePark(message); RaiseActivity("Parked", message.Category, $"Max retries ({message.MaxRetries}) reached for {message.Target}"); @@ -737,6 +813,11 @@ public class StoreAndForwardService return false; } + // WP-14 (telemetry): an operator requeue moves Parked→Pending, re-adding the + // row to the live forward queue. Counted only when the conditional storage + // update actually flipped the row. + Interlocked.Increment(ref _bufferedCount); + // The active node just rewrote this row to Pending with retry_count = 0 // and cleared last_error / last_attempt_at (see // StoreAndForwardStorage.RetryParkedMessageAsync). Reconstruct the @@ -769,6 +850,7 @@ public class StoreAndForwardService { // Capture the category before the row is deleted so the activity log is // labelled correctly. + // WP-14 (telemetry): Parked rows are not in _bufferedCount; discarding a Parked row needs no counter adjustment. var message = await _storage.GetMessageByIdAsync(messageId); var success = await _storage.DiscardParkedMessageAsync(messageId); if (success) diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs index ffb4fe2b..81e00fc0 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/Grpc/SiteStreamGrpcServerTests.cs @@ -1,3 +1,4 @@ +using System.Diagnostics.Metrics; using System.Threading.Channels; using Akka.Actor; using Akka.TestKit.Xunit2; @@ -5,6 +6,7 @@ using Grpc.Core; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using NSubstitute; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.Communication.Grpc; namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc; @@ -342,6 +344,59 @@ public class SiteStreamGrpcServerTests : TestKit subscriber.DidNotReceive().RemoveSubscriber(Arg.Any()); } + [Fact] + public async Task SiteConnectionUpGauge_GoesToOneOnConnect_AndBackToZeroOnCancel() + { + // Telemetry follow-on: the scadabridge.site.connection.up gauge must read + // exactly 1 while a site stream is established and return to 0 once the + // stream terminates on the cancel path — proving SiteConnectionOpened() is + // matched by exactly one SiteConnectionClosed() in the handler's finally. + var server = CreateServer(); + server.SetReady(Sys); + + long ReadGauge() + { + long observed = 0; + using var listener = new MeterListener(); + listener.InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName && + instrument.Name == "scadabridge.site.connection.up") + { + l.EnableMeasurementEvents(instrument); + } + }; + listener.SetMeasurementEventCallback((_, measurement, _, _) => observed = measurement); + listener.Start(); + listener.RecordObservableInstruments(); + return observed; + } + + var baseline = ReadGauge(); + + var cts = new CancellationTokenSource(); + var context = CreateMockContext(cts.Token); + var writer = Substitute.For>(); + + var streamTask = Task.Run(() => server.SubscribeInstance( + MakeRequest("corr-gauge", "Site1.Pump01"), writer, context)); + + await WaitForConditionAsync(() => server.ActiveStreamCount == 1); + + // While the stream is up the gauge is one above whatever baseline other + // (possibly parallel) tests left behind — read relative so the assertion + // is robust to test interleaving on the process-wide static counter. + Assert.Equal(baseline + 1, ReadGauge()); + + cts.Cancel(); + await streamTask; + await WaitForConditionAsync(() => server.ActiveStreamCount == 0); + + // After the cancel path runs the finally, the gauge is balanced back to + // the baseline — no leaked "up" count. + Assert.Equal(baseline, ReadGauge()); + } + [Fact] public void SetReady_AllowsStreamCreation() { diff --git a/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs b/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs index edd31c8e..f3902651 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.DeploymentManager.Tests/DeploymentServiceTests.cs @@ -1,3 +1,4 @@ +using System.Diagnostics.Metrics; using Akka.Actor; using Akka.TestKit.Xunit2; using Microsoft.Extensions.Logging.Abstractions; @@ -10,6 +11,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening; @@ -558,6 +560,106 @@ public class DeploymentServiceTests : TestKit Arg.Any(), Arg.Any()); } + // ── Telemetry follow-on: scadabridge.deployments.applied on deploy success ── + + [Fact] + public async Task DeployInstanceAsync_SiteSucceeds_EmitsDeploymentsAppliedCounterOnce() + { + // A successful deployment must increment the + // scadabridge.deployments.applied counter exactly once — one + // DeployInstanceAsync deploys one instance to one site, so the unit is + // one increment per successful deploy operation. + var instance = new Instance("MetricInst") { Id = 55, SiteId = 1, State = InstanceState.NotDeployed }; + _repo.GetInstanceByIdAsync(55, Arg.Any()).Returns(instance); + SetupValidPipeline(55, "MetricInst", "sha256:target"); + _repo.GetCurrentDeploymentStatusAsync(55, Arg.Any()) + .Returns((DeploymentRecord?)null); + + var counters = new ReconcileProbeCounters(); + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false))); + var service = CreateServiceWithCommActor(commActor); + + long applied = 0; + using var listener = new MeterListener + { + InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName + && instrument.Name == "scadabridge.deployments.applied") + { + l.EnableMeasurementEvents(instrument); + } + } + }; + listener.SetMeasurementEventCallback((_, measurement, _, _) => + Interlocked.Add(ref applied, measurement)); + listener.Start(); + + var result = await service.DeployInstanceAsync(55, "admin"); + + listener.Dispose(); + + Assert.True(result.IsSuccess); + // Fresh first-time deploy applied -> exactly one increment. + Assert.Equal(1, Interlocked.Read(ref applied)); + } + + [Fact] + public async Task DeployInstanceAsync_Reconciled_DoesNotEmitDeploymentsAppliedCounter() + { + // The reconciliation path recovers a PRIOR timed-out apply rather than + // performing a fresh one; counting it would risk double-counting the + // original apply, so scadabridge.deployments.applied must NOT increment + // on a reconciled (no re-deploy) success. + var instance = new Instance("MetricReconcileInst") + { + Id = 56, SiteId = 1, State = InstanceState.NotDeployed + }; + _repo.GetInstanceByIdAsync(56, Arg.Any()).Returns(instance); + SetupValidPipeline(56, "MetricReconcileInst", "sha256:target"); + + var prior = new DeploymentRecord("dep-prior-56", "admin") + { + InstanceId = 56, + Status = DeploymentStatus.InProgress, + RevisionHash = "sha256:target" + }; + _repo.GetCurrentDeploymentStatusAsync(56, Arg.Any()).Returns(prior); + _repo.GetDeployedSnapshotByInstanceIdAsync(56, Arg.Any()) + .Returns((DeployedConfigSnapshot?)null); + + var counters = new ReconcileProbeCounters(); + var commActor = Sys.ActorOf(Props.Create(() => + new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false))); + var service = CreateServiceWithCommActor(commActor); + + long applied = 0; + using var listener = new MeterListener + { + InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName + && instrument.Name == "scadabridge.deployments.applied") + { + l.EnableMeasurementEvents(instrument); + } + } + }; + listener.SetMeasurementEventCallback((_, measurement, _, _) => + Interlocked.Add(ref applied, measurement)); + listener.Start(); + + var result = await service.DeployInstanceAsync(56, "admin"); + + listener.Dispose(); + + Assert.True(result.IsSuccess); + // Reconciled — no fresh deploy was sent, so no increment. + Assert.Equal(0, counters.DeployCount); + Assert.Equal(0, Interlocked.Read(ref applied)); + } + // ── DeploymentManager-011: lifecycle success paths ── [Fact] diff --git a/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/ScadaBridgeTelemetryTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/ScadaBridgeTelemetryTests.cs new file mode 100644 index 00000000..c081386e --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/ScadaBridgeTelemetryTests.cs @@ -0,0 +1,32 @@ +using ZB.MOM.WW.ScadaBridge.Commons.Observability; + +namespace ZB.MOM.WW.ScadaBridge.Host.Tests; + +/// +/// Infrastructure-free guards for : the meter name is the +/// stable value registered with OTel, and the emit helpers are safe to call (they are no-op +/// until follow-on tasks wire real emit points and a listener attaches). +/// +public class ScadaBridgeTelemetryTests +{ + [Fact] + public void MeterName_IsStableValue() + { + Assert.Equal("ZB.MOM.WW.ScadaBridge", ScadaBridgeTelemetry.MeterName); + } + + [Fact] + public void EmitHelpers_DoNotThrow() + { + var ex = Record.Exception(() => + { + ScadaBridgeTelemetry.RecordDeploymentApplied(); + ScadaBridgeTelemetry.RecordInboundApiRequest("X"); + ScadaBridgeTelemetry.SiteConnectionOpened(); + ScadaBridgeTelemetry.SiteConnectionClosed(); + ScadaBridgeTelemetry.SetQueueDepthProvider(() => 5); + }); + + Assert.Null(ex); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/StartupValidatorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/StartupValidatorTests.cs index f6e77d16..95b0af2f 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/StartupValidatorTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.Host.Tests/StartupValidatorTests.cs @@ -352,6 +352,105 @@ public class StartupValidatorTests Assert.Null(ex); } + [Theory] + [InlineData("0")] + [InlineData("-1")] + [InlineData("65536")] + [InlineData("abc")] + public void Site_InvalidMetricsPort_FailsValidation(string metricsPort) + { + var values = ValidSiteConfig(); + values["ScadaBridge:Node:MetricsPort"] = metricsPort; + var config = BuildConfig(values); + + var ex = Assert.Throws(() => StartupValidator.Validate(config)); + Assert.Contains("MetricsPort must be 1-65535", ex.Message); + } + + [Fact] + public void Site_ValidMetricsPort_PassesValidation() + { + var values = ValidSiteConfig(); + values["ScadaBridge:Node:MetricsPort"] = "8084"; + var config = BuildConfig(values); + + var ex = Record.Exception(() => StartupValidator.Validate(config)); + Assert.Null(ex); + } + + [Fact] + public void Site_MetricsPortEqualsRemotingPort_FailsValidation() + { + // Host-007 regression: the Kestrel metrics (HTTP/1.1) listener port must + // differ from RemotingPort. Identical values cause the metrics listener + // and Akka.Remote to contend for the same port at runtime. + var values = ValidSiteConfig(); + values["ScadaBridge:Node:RemotingPort"] = "8082"; + values["ScadaBridge:Node:MetricsPort"] = "8082"; + var config = BuildConfig(values); + + var ex = Assert.Throws(() => StartupValidator.Validate(config)); + Assert.Contains("MetricsPort must differ from RemotingPort", ex.Message); + } + + [Fact] + public void Site_MetricsPortEqualsGrpcPort_FailsValidation() + { + // Host-007 regression: the metrics listener port must differ from GrpcPort. + var values = ValidSiteConfig(); + values["ScadaBridge:Node:GrpcPort"] = "8083"; + values["ScadaBridge:Node:MetricsPort"] = "8083"; + var config = BuildConfig(values); + + var ex = Assert.Throws(() => StartupValidator.Validate(config)); + Assert.Contains("MetricsPort must differ from GrpcPort", ex.Message); + } + + [Fact] + public void Site_DefaultMetricsPortEqualsRemotingPort_FailsValidation() + { + // MetricsPort absent => NodeOptions default 8084. A site whose RemotingPort + // is also 8084 must still be rejected. + var values = ValidSiteConfig(); + values["ScadaBridge:Node:RemotingPort"] = "8084"; + // Keep GrpcPort distinct so only the metrics-vs-remoting rule fires. + values["ScadaBridge:Node:GrpcPort"] = "8083"; + // Seed nodes default to the remoting port (8082) in ValidSiteConfig; realign + // them to 8084 so the seed-vs-grpc rule is not what trips here. + values["ScadaBridge:Cluster:SeedNodes:0"] = "akka.tcp://scadabridge@site-a-node1:8084"; + values["ScadaBridge:Cluster:SeedNodes:1"] = "akka.tcp://scadabridge@site-a-node2:8084"; + var config = BuildConfig(values); + + var ex = Assert.Throws(() => StartupValidator.Validate(config)); + Assert.Contains("MetricsPort must differ from RemotingPort", ex.Message); + } + + [Fact] + public void Site_MetricsPortDiffersFromRemotingAndGrpc_PassesValidation() + { + var values = ValidSiteConfig(); + values["ScadaBridge:Node:RemotingPort"] = "8082"; + values["ScadaBridge:Node:GrpcPort"] = "8083"; + values["ScadaBridge:Node:MetricsPort"] = "8084"; + var config = BuildConfig(values); + + var ex = Record.Exception(() => StartupValidator.Validate(config)); + Assert.Null(ex); + } + + [Fact] + public void Central_InvalidMetricsPort_NotValidated() + { + // The metrics-port rules apply to Site nodes only; a Central node runs no + // metrics listener, so an out-of-range MetricsPort must not fail startup. + var values = ValidCentralConfig(); + values["ScadaBridge:Node:MetricsPort"] = "0"; + var config = BuildConfig(values); + + var ex = Record.Exception(() => StartupValidator.Validate(config)); + Assert.Null(ex); + } + [Fact] public void MultipleErrors_AllReported() { diff --git a/tests/ZB.MOM.WW.ScadaBridge.InboundAPI.Tests/EndpointExtensionsTests.cs b/tests/ZB.MOM.WW.ScadaBridge.InboundAPI.Tests/EndpointExtensionsTests.cs index 9e5dd20b..a5a14fcb 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.InboundAPI.Tests/EndpointExtensionsTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.InboundAPI.Tests/EndpointExtensionsTests.cs @@ -9,8 +9,10 @@ using NSubstitute; using ZB.MOM.WW.ScadaBridge.Commons.Entities.InboundApi; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; using ZB.MOM.WW.ScadaBridge.Commons.Types.InboundApi; using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware; +using System.Diagnostics.Metrics; using System.Net; using System.Text; @@ -232,6 +234,114 @@ public class EndpointExtensionsTests Assert.Equal("audit-actor-name", capture.CapturedActor); } + [Fact] + public async Task ValidRequest_EmitsInboundApiRequestCounter_TaggedWithResolvedMethodName() + { + // Telemetry follow-on: a successful inbound request increments + // scadabridge.inbound_api.requests once, tagged with the resolved, + // registered method name (method.Name) — the bounded identifier, not the + // raw route value. + var key = SeedKey(); + var method = SeedMethod(1, "echo", "return Parameters[\"value\"];", + """[{"name":"value","type":"Integer","required":true}]"""); + + using var collector = new InboundApiRequestCounterCollector(); + + using var host = await BuildHostAsync(key, method); + var client = host.GetTestClient(); + + var response = await client.SendAsync(BuildPost("echo", """{"value":7}""")); + + Assert.Equal(HttpStatusCode.OK, response.StatusCode); + // Filter by the method tag this test produced: the counter is a process-wide + // static, so a parallel test class could otherwise leak measurements in. + var echoTotal = collector.Measurements + .Where(m => m.Method == "echo") + .Sum(m => m.Value); + Assert.Equal(1, echoTotal); + } + + [Fact] + public async Task UnknownMethod_EmitsInboundApiRequestCounter_WithBoundedForbiddenSentinel() + { + // Telemetry follow-on: an auth/authz failure is still counted, but the + // tag is a bounded sentinel ("") rather than the arbitrary + // caller-supplied route value — so an attacker posting random method + // names cannot blow up the `method` tag cardinality. + var key = SeedKey(); + var method = SeedMethod(1, "knownMethod", "return 1;"); + + using var collector = new InboundApiRequestCounterCollector(); + + using var host = await BuildHostAsync(key, method); + var client = host.GetTestClient(); + + var response = await client.SendAsync(BuildPost("totally-made-up-name", "{}")); + + Assert.Equal(HttpStatusCode.Forbidden, response.StatusCode); + var measurements = collector.Measurements; + // Cardinality safety: the arbitrary route value is never used as a tag. + Assert.DoesNotContain(measurements, m => m.Method == "totally-made-up-name"); + // The failure path counts the request against the bounded sentinel. + Assert.Contains(measurements, m => m.Method == "" && m.Value == 1); + } + + /// + /// Captures scadabridge.inbound_api.requests measurements (value + the + /// method tag) via a for the duration of a test. + /// + private sealed class InboundApiRequestCounterCollector : IDisposable + { + private readonly MeterListener _listener; + private readonly List<(long Value, string? Method)> _measurements = new(); + private readonly object _gate = new(); + + public InboundApiRequestCounterCollector() + { + _listener = new MeterListener + { + InstrumentPublished = (instrument, listener) => + { + if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName + && instrument.Name == "scadabridge.inbound_api.requests") + { + listener.EnableMeasurementEvents(instrument); + } + }, + }; + _listener.SetMeasurementEventCallback((instrument, value, tags, state) => + { + string? method = null; + foreach (var tag in tags) + { + if (tag.Key == "method") + { + method = tag.Value as string; + } + } + + lock (_gate) + { + _measurements.Add((value, method)); + } + }); + _listener.Start(); + } + + public IReadOnlyList<(long Value, string? Method)> Measurements + { + get + { + lock (_gate) + { + return _measurements.ToList(); + } + } + } + + public void Dispose() => _listener.Dispose(); + } + private static HttpRequestMessage BuildPost(string methodName, string body) { var request = new HttpRequestMessage(HttpMethod.Post, "/api/" + methodName) diff --git a/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs b/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs new file mode 100644 index 00000000..a09a4b39 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests/QueueDepthGaugeTests.cs @@ -0,0 +1,211 @@ +using System.Diagnostics.Metrics; +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.Commons.Observability; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; + +namespace ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests; + +/// +/// WP-14 (telemetry follow-on): verifies the cached buffered-message counter that +/// backs the scadabridge.store_and_forward.queue.depth observable gauge tracks +/// the live (Pending) queue across the existing enqueue / drain / park / requeue paths, +/// and that the sync gauge callback reports it. +/// +/// The gauge is read the way the OpenTelemetry collector reads it — via a +/// that forces an observation (the callback is synchronous +/// and does no I/O, which is the whole point of caching the count). +/// seeds the counter from storage and registers the provider against this service +/// instance, so the gauge resolves to this test's counter. +/// +public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly StoreAndForwardService _service; + + public QueueDepthGaugeTests() + { + var dbName = $"QueueDepthTests_{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + + var options = new StoreAndForwardOptions + { + DefaultRetryInterval = TimeSpan.Zero, + DefaultMaxRetries = 3, + // Long interval so no background sweep fires on its own during the test; + // sweeps are driven explicitly via RetryPendingMessagesAsync. + RetryTimerInterval = TimeSpan.FromMinutes(10) + }; + + _service = new StoreAndForwardService( + _storage, options, NullLogger.Instance); + } + + public async Task InitializeAsync() + { + await _storage.InitializeAsync(); + // StartAsync seeds _bufferedCount from the (empty) store and registers the + // queue-depth provider against this service instance. + await _service.StartAsync(); + } + + public async Task DisposeAsync() => await _service.StopAsync(); + + public void Dispose() => _keepAlive.Dispose(); + + /// + /// Reads the current value of the scadabridge.store_and_forward.queue.depth + /// gauge by forcing a synchronous observation through a transient MeterListener — + /// exactly the path the Prometheus/OTLP collector exercises on each scrape. + /// + private static long ReadQueueDepthGauge() + { + long observed = -1; + using var listener = new MeterListener + { + InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName && + instrument.Name == "scadabridge.store_and_forward.queue.depth") + { + l.EnableMeasurementEvents(instrument); + } + } + }; + listener.SetMeasurementEventCallback((_, measurement, _, _) => observed = measurement); + listener.Start(); + listener.RecordObservableInstruments(); + return observed; + } + + [Fact] + public async Task Gauge_TracksBufferedDepth_AcrossEnqueueDrainAndPark() + { + // Empty store seeded at StartAsync → gauge reports 0. + Assert.Equal(0, ReadQueueDepthGauge()); + + // A handler that fails transiently so each enqueue buffers a Pending row + // (immediate attempt 0 throws → BufferAsync → +1). + var deliver = false; + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => + { + if (!deliver) throw new HttpRequestException("transient"); + return Task.FromResult(true); + }); + + // Enqueue 3 → cached depth = 3 → gauge reports 3. + for (var i = 0; i < 3; i++) + { + var r = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}"""); + Assert.True(r.WasBuffered); + } + Assert.Equal(3, ReadQueueDepthGauge()); + + // Drain: handler now succeeds → the retry sweep removes all 3 Pending rows → depth 0. + deliver = true; + await _service.RetryPendingMessagesAsync(); + Assert.Equal(0, ReadQueueDepthGauge()); + + // Park path: buffer one more, then make it park (maxRetries:1 parks after one + // sweep). Pending→Parked leaves the live queue → depth back to 0. + deliver = false; + var parkResult = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1); + Assert.True(parkResult.WasBuffered); + Assert.Equal(1, ReadQueueDepthGauge()); + + await _service.RetryPendingMessagesAsync(); + var parked = await _storage.GetMessageByIdAsync(parkResult.MessageId); + Assert.Equal(StoreAndForwardMessageStatus.Parked, parked!.Status); + Assert.Equal(0, ReadQueueDepthGauge()); + + // Operator requeue: Parked→Pending re-adds to the live queue → depth 1. + Assert.True(await _service.RetryParkedMessageAsync(parkResult.MessageId)); + Assert.Equal(1, ReadQueueDepthGauge()); + } + + [Fact] + public async Task Gauge_SeedsFromExistingPendingRows_OnStart() + { + // Pre-seed two Pending rows directly in storage *before* a fresh service starts, + // simulating a process restart over a non-empty buffer. StartAsync must seed the + // cached counter from the store so the gauge does not under-report on restart. + await _storage.EnqueueAsync(new StoreAndForwardMessage + { + Id = Guid.NewGuid().ToString("N"), + Category = StoreAndForwardCategory.ExternalSystem, + Target = "api", + PayloadJson = "{}", + Status = StoreAndForwardMessageStatus.Pending, + CreatedAt = DateTimeOffset.UtcNow, + MaxRetries = 3 + }); + await _storage.EnqueueAsync(new StoreAndForwardMessage + { + Id = Guid.NewGuid().ToString("N"), + Category = StoreAndForwardCategory.Notification, + Target = "list", + PayloadJson = "{}", + Status = StoreAndForwardMessageStatus.Pending, + CreatedAt = DateTimeOffset.UtcNow, + MaxRetries = 3 + }); + + var fresh = new StoreAndForwardService( + _storage, + new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) }, + NullLogger.Instance); + try + { + await fresh.StartAsync(); + // The fresh service registered itself as the global provider and seeded 2. + Assert.Equal(2, ReadQueueDepthGauge()); + } + finally + { + await fresh.StopAsync(); + } + } + + /// + /// Review finding (FINDING 1): the startup seed must ADD to whatever the counter + /// already holds, not overwrite it. A concurrent BufferAsync can + /// Interlocked.Increment _bufferedCount in the window between + /// StartAsync's async COUNT(*) returning and the seed running; with an + /// Interlocked.Exchange seed that increment would be clobbered (lost +1). This + /// pre-increments the in-memory counter (standing in for that concurrent enqueue), + /// then starts the service over an empty store and asserts the pre-increment survives. + /// + [Fact] + public async Task Gauge_SeedAddsToConcurrentPreSeedIncrement_NotClobber() + { + // Store is empty (StartAsync's pending COUNT(*) = 0), so the only contribution + // is the simulated concurrent pre-seed enqueue increment. + var fresh = new StoreAndForwardService( + _storage, + new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) }, + NullLogger.Instance); + + // Stand in for a BufferAsync increment that landed before StartAsync seeded. + fresh.TestOnly_IncrementBufferedCount(); + + try + { + await fresh.StartAsync(); + // Add(0 seed) over the pre-existing +1 → 1. An Exchange(0) seed would clobber + // it to 0, losing the concurrent enqueue — the bug this fix prevents. + Assert.Equal(1, ReadQueueDepthGauge()); + } + finally + { + await fresh.StopAsync(); + } + } +}