20 Commits

Author SHA1 Message Date
Joseph Doherty 145d2668e2 fix: wire ValidateOnStart for ScadaBridge HealthMonitoring + Cluster options (fail-fast at startup) 2026-06-01 23:07:46 -04:00
Joseph Doherty 9668a4e84a refactor: ScadaBridge module options registration -> AddValidatedOptions; clarify De Morgan predicates 2026-06-01 22:49:41 -04:00
Joseph Doherty 6dbbc7ad04 refactor: ScadaBridge StartupValidator -> ConfigPreflight (byte-compatible) 2026-06-01 19:04:13 -04:00
Joseph Doherty aac59c9fae refactor: ScadaBridge validators onto OptionsValidatorBase (messages unchanged) 2026-06-01 18:56:04 -04:00
Joseph Doherty 9bca6aae61 build: add ZB.MOM.WW.Configuration feed mapping + version pin 2026-06-01 18:10:29 -04:00
Joseph Doherty 7d16f8f275 Merge feat/telemetry-followons: telemetry follow-ons for ScadaBridge
Site-node HTTP/1.1 /metrics listener (NodeOptions.MetricsPort=8084, avoids the
site RemotingPort collision; StartupValidator enforces distinctness). First
application instruments: ScadaBridgeTelemetry meter + deployments.applied,
store_and_forward.queue.depth, inbound_api.requests, site.connection.up.
Config-driven OTLP exporter opt-in (default Prometheus).
2026-06-01 17:17:39 -04:00
Joseph Doherty ccf43312e8 feat(scadabridge): config-driven OTLP exporter opt-in (default Prometheus) 2026-06-01 17:14:35 -04:00
Joseph Doherty a5f8651b0f feat(scadabridge): track scadabridge.site.connection.up over site-stream lifetime (balanced open/close) 2026-06-01 17:11:39 -04:00
Joseph Doherty 15a626390b fix(scadabridge): queue-depth seed uses Add (no lost concurrent enqueue) + clarify registration/discard comments 2026-06-01 17:07:03 -04:00
Joseph Doherty 782fb73015 feat(scadabridge): emit scadabridge.inbound_api.requests (by method) at inbound API entry 2026-06-01 17:03:10 -04:00
Joseph Doherty 547b685a42 feat(scadabridge): wire scadabridge.store_and_forward.queue.depth gauge to buffered count 2026-06-01 16:58:09 -04:00
Joseph Doherty 877f2e200b feat(scadabridge): emit scadabridge.deployments.applied on deployment success 2026-06-01 16:52:09 -04:00
Joseph Doherty c41cb41c7b fix(scadabridge): default MetricsPort to 8084 (avoid site RemotingPort collision) + validate port distinctness 2026-06-01 16:46:59 -04:00
Joseph Doherty fe25ac3e51 feat(scadabridge): add ScadaBridgeTelemetry meter + 4 instruments; register with OTel 2026-06-01 16:41:52 -04:00
Joseph Doherty bbc9f09268 feat(scadabridge): add HTTP/1.1 metrics listener on site nodes (NodeOptions.MetricsPort=8082) 2026-06-01 16:36:59 -04:00
Joseph Doherty 43f5886024 Merge feat/adopt-zb-telemetry: adopt ZB.MOM.WW.Telemetry across ScadaBridge
AddZbTelemetry (shared OTel Resource + standard instrumentation + /metrics) wired
into both Central and Site composition roots; kept LoggerConfigurationFactory
(min-level governance) and added the shared TraceContextEnricher for trace<->log
correlation. Behaviour-preserving (no AddZbSerilog; factory retained).
2026-06-01 16:05:49 -04:00
Joseph Doherty f743ffaad2 feat(scadabridge): add shared TraceContextEnricher to log pipeline (trace correlation) 2026-06-01 15:40:42 -04:00
Joseph Doherty b3070c0bda feat(scadabridge): wire AddZbTelemetry + /metrics in both composition roots 2026-06-01 15:36:55 -04:00
Joseph Doherty 20a31835cf build(scadabridge): reference ZB.MOM.WW.Telemetry packages from Gitea feed 2026-06-01 15:30:00 -04:00
Joseph Doherty 59dca0d5fd Merge feat/adopt-zb-health: adopt ZB.MOM.WW.Health shared probes (/healthz, canonical writer, ActorSystem DI bridge) 2026-06-01 14:07:00 -04:00
36 changed files with 1190 additions and 238 deletions
+3
View File
@@ -75,8 +75,11 @@
<PackageVersion Include="ZB.MOM.WW.Health" Version="0.1.0" />
<PackageVersion Include="ZB.MOM.WW.Health.Akka" Version="0.1.0" />
<PackageVersion Include="ZB.MOM.WW.Health.EntityFrameworkCore" Version="0.1.0" />
<PackageVersion Include="ZB.MOM.WW.Telemetry" Version="0.1.0" />
<PackageVersion Include="ZB.MOM.WW.Telemetry.Serilog" Version="0.1.0" />
<PackageVersion Include="ZB.MOM.WW.MxGateway.Client" Version="0.1.0" />
<PackageVersion Include="ZB.MOM.WW.MxGateway.Contracts" Version="0.1.0" />
<PackageVersion Include="ZB.MOM.WW.Configuration" Version="0.1.0" />
</ItemGroup>
</Project>
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-a-a",
"SiteId": "site-a",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-a-b",
"SiteId": "site-a",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-b-a",
"SiteId": "site-b",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-b-b",
"SiteId": "site-b",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-c-a",
"SiteId": "site-c",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+2 -1
View File
@@ -6,7 +6,8 @@
"NodeHostname": "scadabridge-site-c-b",
"SiteId": "site-c",
"RemotingPort": 8082,
"GrpcPort": 8083
"GrpcPort": 8083,
"MetricsPort": 8084
},
"Cluster": {
"SeedNodes": [
+3
View File
@@ -18,6 +18,9 @@
<package pattern="ZB.MOM.WW.MxGateway.*" />
<package pattern="ZB.MOM.WW.Health" />
<package pattern="ZB.MOM.WW.Health.*" />
<package pattern="ZB.MOM.WW.Telemetry" />
<package pattern="ZB.MOM.WW.Telemetry.*" />
<package pattern="ZB.MOM.WW.Configuration" />
</packageSource>
</packageSourceMapping>
<!--
@@ -1,4 +1,4 @@
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Configuration;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
@@ -13,7 +13,7 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
/// drop in-flight investigations, too long would defeat the partition-switch
/// purge's purpose.
/// </summary>
public sealed class AuditLogOptionsValidator : IValidateOptions<AuditLogOptions>
public sealed class AuditLogOptionsValidator : OptionsValidatorBase<AuditLogOptions>
{
/// <summary>Inclusive lower bound for <see cref="AuditLogOptions.RetentionDays"/>.</summary>
public const int MinRetentionDays = 30;
@@ -28,43 +28,29 @@ public sealed class AuditLogOptionsValidator : IValidateOptions<AuditLogOptions>
public const int MaxInboundMaxBytes = 16_777_216;
/// <inheritdoc />
public ValidateOptionsResult Validate(string? name, AuditLogOptions options)
protected override void Validate(ValidationBuilder builder, AuditLogOptions options)
{
ArgumentNullException.ThrowIfNull(options);
builder.RequireThat(options.DefaultCapBytes > 0,
$"AuditLog:{nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}) " +
"must be > 0; it drives payload-summary truncation in audit writers.");
var failures = new List<string>();
builder.RequireThat(options.ErrorCapBytes >= options.DefaultCapBytes,
$"AuditLog:{nameof(AuditLogOptions.ErrorCapBytes)} ({options.ErrorCapBytes}) " +
$"must be >= {nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}); " +
"the error-row cap is intended to capture more detail than the happy-path summary.");
if (options.DefaultCapBytes <= 0)
{
failures.Add(
$"AuditLog:{nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}) " +
"must be > 0; it drives payload-summary truncation in audit writers.");
}
// Valid when RetentionDays is within [Min, Max] inclusive. The De Morgan'd
// guard !(below Min OR above Max) is equivalent to (>= Min AND <= Max).
builder.RequireThat(
!(options.RetentionDays < MinRetentionDays || options.RetentionDays > MaxRetentionDays),
$"AuditLog:{nameof(AuditLogOptions.RetentionDays)} ({options.RetentionDays}) " +
$"must be in [{MinRetentionDays}, {MaxRetentionDays}] days.");
if (options.ErrorCapBytes < options.DefaultCapBytes)
{
failures.Add(
$"AuditLog:{nameof(AuditLogOptions.ErrorCapBytes)} ({options.ErrorCapBytes}) " +
$"must be >= {nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}); " +
"the error-row cap is intended to capture more detail than the happy-path summary.");
}
if (options.RetentionDays < MinRetentionDays || options.RetentionDays > MaxRetentionDays)
{
failures.Add(
$"AuditLog:{nameof(AuditLogOptions.RetentionDays)} ({options.RetentionDays}) " +
$"must be in [{MinRetentionDays}, {MaxRetentionDays}] days.");
}
if (options.InboundMaxBytes < MinInboundMaxBytes || options.InboundMaxBytes > MaxInboundMaxBytes)
{
failures.Add(
$"AuditLog:{nameof(AuditLogOptions.InboundMaxBytes)} ({options.InboundMaxBytes}) " +
$"must be in [{MinInboundMaxBytes}, {MaxInboundMaxBytes}] bytes.");
}
return failures.Count == 0
? ValidateOptionsResult.Success
: ValidateOptionsResult.Fail(failures);
// Valid when InboundMaxBytes is within [Min, Max] inclusive. The De Morgan'd
// guard !(below Min OR above Max) is equivalent to (>= Min AND <= Max).
builder.RequireThat(
!(options.InboundMaxBytes < MinInboundMaxBytes || options.InboundMaxBytes > MaxInboundMaxBytes),
$"AuditLog:{nameof(AuditLogOptions.InboundMaxBytes)} ({options.InboundMaxBytes}) " +
$"must be in [{MinInboundMaxBytes}, {MaxInboundMaxBytes}] bytes.");
}
}
@@ -3,7 +3,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Configuration;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
using ZB.MOM.WW.ScadaBridge.AuditLog.Payload;
@@ -62,10 +62,12 @@ public static class ServiceCollectionExtensions
ArgumentNullException.ThrowIfNull(config);
// M1: top-level AuditLogOptions + validator (redaction policy, payload caps, etc.).
services.AddOptions<AuditLogOptions>()
.Bind(config.GetSection(ConfigSectionName))
.ValidateOnStart();
services.AddSingleton<IValidateOptions<AuditLogOptions>, AuditLogOptionsValidator>();
// Collapsed onto the shared ZB.MOM.WW.Configuration helper: it binds the
// "AuditLog" section, registers the validator, and enables ValidateOnStart in
// one call. Same section path as before; AddAuditLog is call-once per
// collection, and the helper's TryAddEnumerable is idempotent for the
// validator (a strict improvement over the previous AddSingleton).
services.AddValidatedOptions<AuditLogOptions, AuditLogOptionsValidator>(config, ConfigSectionName);
// M5 Bundle A: payload filter — truncates oversized RequestSummary /
// ResponseSummary / ErrorDetail / Extra fields between event
@@ -16,6 +16,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
<PackageReference Include="ZB.MOM.WW.Configuration" />
</ItemGroup>
<ItemGroup>
@@ -1,4 +1,4 @@
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Configuration;
namespace ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
@@ -10,7 +10,7 @@ namespace ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
/// Registered with <c>ValidateOnStart()</c> so a bad <c>appsettings.json</c>
/// fails fast at boot rather than failing far from the cause.
/// </summary>
public sealed class ClusterOptionsValidator : IValidateOptions<ClusterOptions>
public sealed class ClusterOptionsValidator : OptionsValidatorBase<ClusterOptions>
{
/// <summary>Split-brain resolver strategies safe for ScadaBridge's two-node clusters.</summary>
private static readonly HashSet<string> AllowedStrategies = new(StringComparer.OrdinalIgnoreCase)
@@ -19,77 +19,51 @@ public sealed class ClusterOptionsValidator : IValidateOptions<ClusterOptions>
};
/// <summary>
/// Validates the cluster options, returning a failure result if any critical settings are misconfigured.
/// Validates the cluster options, recording a failure if any critical settings are misconfigured.
/// </summary>
/// <param name="name">Named options instance name (unused; all instances are validated identically).</param>
/// <param name="builder">The accumulator to record failures on.</param>
/// <param name="options">The cluster options to validate.</param>
public ValidateOptionsResult Validate(string? name, ClusterOptions options)
protected override void Validate(ValidationBuilder builder, ClusterOptions options)
{
var failures = new List<string>();
// CI-012: design doc states "both nodes are seed nodes — each node lists
// both itself and its partner" so a properly-configured deployment lists
// two. Accepting a single-seed configuration silently defeats the
// "no startup ordering dependency" guarantee called out by
// Component-ClusterInfrastructure.md (Node Configuration).
builder.RequireThat(options.SeedNodes is not null && options.SeedNodes.Count >= 2,
"ClusterOptions.SeedNodes must contain at least 2 seed nodes "
+ "(Component-ClusterInfrastructure.md → Node Configuration: "
+ "both nodes are seed nodes); a single-seed configuration defeats "
+ "the no-startup-ordering-dependency guarantee.");
if (options.SeedNodes is null || options.SeedNodes.Count < 2)
{
// CI-012: design doc states "both nodes are seed nodes — each node lists
// both itself and its partner" so a properly-configured deployment lists
// two. Accepting a single-seed configuration silently defeats the
// "no startup ordering dependency" guarantee called out by
// Component-ClusterInfrastructure.md (Node Configuration).
failures.Add(
"ClusterOptions.SeedNodes must contain at least 2 seed nodes "
+ "(Component-ClusterInfrastructure.md → Node Configuration: "
+ "both nodes are seed nodes); a single-seed configuration defeats "
+ "the no-startup-ordering-dependency guarantee.");
}
builder.RequireThat(
!string.IsNullOrWhiteSpace(options.SplitBrainResolverStrategy)
&& AllowedStrategies.Contains(options.SplitBrainResolverStrategy),
$"ClusterOptions.SplitBrainResolverStrategy must be 'keep-oldest' for a two-node cluster; " +
$"'{options.SplitBrainResolverStrategy}' would risk a total cluster shutdown on a partition.");
if (string.IsNullOrWhiteSpace(options.SplitBrainResolverStrategy)
|| !AllowedStrategies.Contains(options.SplitBrainResolverStrategy))
{
failures.Add(
$"ClusterOptions.SplitBrainResolverStrategy must be 'keep-oldest' for a two-node cluster; " +
$"'{options.SplitBrainResolverStrategy}' would risk a total cluster shutdown on a partition.");
}
builder.RequireThat(options.MinNrOfMembers == 1,
$"ClusterOptions.MinNrOfMembers must be 1 (was {options.MinNrOfMembers}); " +
"any other value blocks the cluster singleton after failover and halts all data collection.");
if (options.MinNrOfMembers != 1)
{
failures.Add(
$"ClusterOptions.MinNrOfMembers must be 1 (was {options.MinNrOfMembers}); " +
"any other value blocks the cluster singleton after failover and halts all data collection.");
}
builder.RequireThat(options.StableAfter > TimeSpan.Zero,
"ClusterOptions.StableAfter must be a positive duration.");
if (options.StableAfter <= TimeSpan.Zero)
{
failures.Add("ClusterOptions.StableAfter must be a positive duration.");
}
builder.RequireThat(options.HeartbeatInterval > TimeSpan.Zero,
"ClusterOptions.HeartbeatInterval must be a positive duration.");
if (options.HeartbeatInterval <= TimeSpan.Zero)
{
failures.Add("ClusterOptions.HeartbeatInterval must be a positive duration.");
}
builder.RequireThat(options.FailureDetectionThreshold > TimeSpan.Zero,
"ClusterOptions.FailureDetectionThreshold must be a positive duration.");
if (options.FailureDetectionThreshold <= TimeSpan.Zero)
{
failures.Add("ClusterOptions.FailureDetectionThreshold must be a positive duration.");
}
builder.RequireThat(options.HeartbeatInterval < options.FailureDetectionThreshold,
$"ClusterOptions.HeartbeatInterval ({options.HeartbeatInterval}) must be well below " +
$"FailureDetectionThreshold ({options.FailureDetectionThreshold}); otherwise nodes are " +
"declared unreachable before a heartbeat can arrive.");
if (options.HeartbeatInterval >= options.FailureDetectionThreshold)
{
failures.Add(
$"ClusterOptions.HeartbeatInterval ({options.HeartbeatInterval}) must be well below " +
$"FailureDetectionThreshold ({options.FailureDetectionThreshold}); otherwise nodes are " +
"declared unreachable before a heartbeat can arrive.");
}
if (!options.DownIfAlone)
{
failures.Add(
"ClusterOptions.DownIfAlone must be true for the keep-oldest resolver "
+ "(Component-ClusterInfrastructure.md → Split-Brain Resolution); with it false the "
+ "oldest node can run as an isolated single-node cluster during a partition while the "
+ "younger node forms its own, producing two live clusters.");
}
return failures.Count > 0
? ValidateOptionsResult.Fail(failures)
: ValidateOptionsResult.Success;
builder.RequireThat(options.DownIfAlone,
"ClusterOptions.DownIfAlone must be true for the keep-oldest resolver "
+ "(Component-ClusterInfrastructure.md → Split-Brain Resolution); with it false the "
+ "oldest node can run as an isolated single-node cluster during a partition while the "
+ "younger node forms its own, producing two live clusters.");
}
}
@@ -10,6 +10,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="ZB.MOM.WW.Configuration" />
</ItemGroup>
<ItemGroup>
@@ -0,0 +1,94 @@
using System.Diagnostics.Metrics;
namespace ZB.MOM.WW.ScadaBridge.Commons.Observability;
/// <summary>
/// Central <see cref="Meter"/> + instrument definitions for ScadaBridge's application
/// telemetry, modelled on OtOpcUa's <c>OtOpcUaTelemetry</c>. Modules emit through these
/// pre-created instruments so a single OpenTelemetry / Prometheus binding in
/// <c>Host</c> (registered via <c>AddZbTelemetry</c> with this meter named in
/// <c>ZbTelemetryOptions.Meters</c>) catches everything. No exporter is required —
/// instruments are no-op until a listener attaches, so tests and dev hosts pay nothing
/// for instrumentation that nobody scrapes.
///
/// Instrument names follow the OpenTelemetry semantic convention pattern
/// <c>scadabridge.&lt;subsystem&gt;.&lt;event&gt;</c>. This task defines the instruments and
/// their emit helpers; four later tasks wire the actual emit points. Until those land the
/// helpers are dormant but inert — calling them is safe and simply records against a meter
/// that nothing observes.
/// </summary>
public static class ScadaBridgeTelemetry
{
/// <summary>The meter name registered with OTel via <c>ZbTelemetryOptions.Meters</c>.</summary>
public const string MeterName = "ZB.MOM.WW.ScadaBridge";
/// <summary>Singleton <see cref="Meter"/> all instruments hang off.</summary>
private static readonly Meter Meter = new(MeterName);
// ---------------- Counters ----------------
/// <summary>Incremented each time a deployment is successfully applied.</summary>
private static readonly Counter<long> _deploymentsApplied =
Meter.CreateCounter<long>("scadabridge.deployments.applied", unit: "1",
description: "Deployments applied.");
/// <summary>Incremented for each inbound API request, tagged with the API method.</summary>
private static readonly Counter<long> _inboundApiRequests =
Meter.CreateCounter<long>("scadabridge.inbound_api.requests", unit: "1",
description: "Inbound API requests, tagged by method.");
// ---------------- Observable gauges ----------------
/// <summary>Current count of open site connections, mutated via <see cref="Interlocked"/>.</summary>
private static long _siteConnectionsUp;
/// <summary>Provider that yields the live StoreAndForward queue depth; set by a later task.</summary>
private static Func<long>? _queueDepthProvider;
#pragma warning disable IDE0052 // Held to keep the observable gauges alive for the meter's lifetime.
/// <summary>Gauge reporting the number of currently open site connections.</summary>
private static readonly ObservableGauge<long> _siteConnectionUp =
Meter.CreateObservableGauge<long>("scadabridge.site.connection.up",
() => Interlocked.Read(ref _siteConnectionsUp),
description: "Number of currently open site connections.");
/// <summary>Gauge reporting the current StoreAndForward queue depth via the registered provider.</summary>
private static readonly ObservableGauge<long> _storeAndForwardQueueDepth =
Meter.CreateObservableGauge<long>("scadabridge.store_and_forward.queue.depth",
() => Volatile.Read(ref _queueDepthProvider) is { } p ? p() : 0L,
unit: "items",
description: "Current StoreAndForward queue depth.");
#pragma warning restore IDE0052
// ---------------- Emit helpers ----------------
/// <summary>Records that a deployment was applied.</summary>
public static void RecordDeploymentApplied() => _deploymentsApplied.Add(1);
/// <summary>Records an inbound API request for the given <paramref name="method"/>.</summary>
/// <param name="method">The API method the request targeted.</param>
public static void RecordInboundApiRequest(string method) =>
_inboundApiRequests.Add(1, new KeyValuePair<string, object?>("method", method));
/// <summary>Records that a site connection opened (increments the up-count gauge).</summary>
public static void SiteConnectionOpened() => Interlocked.Increment(ref _siteConnectionsUp);
/// <summary>Records that a site connection closed (decrements the up-count gauge).</summary>
public static void SiteConnectionClosed() => Interlocked.Decrement(ref _siteConnectionsUp);
/// <summary>
/// Registers the provider the StoreAndForward queue-depth gauge reads on each observation.
/// A later task supplies a provider that reads the real StoreAndForward depth. A null
/// provider is ignored so the gauge falls back to reporting 0.
/// </summary>
/// <param name="provider">A callback returning the current queue depth.</param>
public static void SetQueueDepthProvider(Func<long> provider)
{
if (provider is null)
{
return;
}
Volatile.Write(ref _queueDepthProvider, provider);
}
}
@@ -7,6 +7,7 @@ using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using GrpcStatus = Grpc.Core.Status;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
@@ -264,6 +265,14 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
"Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
request.CorrelationId, request.InstanceUniqueName, subscriptionId);
// Telemetry follow-on: the connection is now fully established (Subscribe
// succeeded, so no leak via the catch above). Count it up here and balance
// it in the finally below so the scadabridge.site.connection.up gauge is
// decremented on EVERY exit path — normal completion, client-cancel /
// duplicate-replacement (OperationCanceledException), server shutdown
// (CancelAllStreams -> Cts.Cancel), and any other exception — guaranteeing
// exactly one Closed per Opened and a gauge that never drifts up.
ScadaBridgeTelemetry.SiteConnectionOpened();
try
{
await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token))
@@ -277,6 +286,7 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
}
finally
{
ScadaBridgeTelemetry.SiteConnectionClosed();
_streamSubscriber.RemoveSubscriber(relayActor);
_actorSystem!.Stop(relayActor);
channel.Writer.TryComplete();
@@ -7,6 +7,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
@@ -244,6 +245,16 @@ public class DeploymentService
if (response.Status == DeploymentStatus.Success)
{
// Telemetry: one instance deployment successfully applied to a
// site. Counted once per successful deploy operation (the unit
// of scadabridge.deployments.applied — one DeployInstanceAsync
// deploys exactly one instance to one site). Emitted only on this
// confirmed-Success path, so failures, timeouts/retries (the
// catch block), and the reconciliation path (which recovers a
// PRIOR timed-out apply rather than performing a fresh one) do
// not increment it.
ScadaBridgeTelemetry.RecordDeploymentApplied();
// The site has applied the deployment. The post-success
// persistence below is best-effort: a failure here must be
// logged loudly for operator reconciliation but must not flip
@@ -1,4 +1,4 @@
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Configuration;
namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;
@@ -14,51 +14,40 @@ namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;
/// <c>ValidateOnStart()</c> so a bad <c>ScadaBridge:HealthMonitoring</c> section
/// fails fast at boot with a clear, key-naming message.
/// </summary>
public sealed class HealthMonitoringOptionsValidator : IValidateOptions<HealthMonitoringOptions>
public sealed class HealthMonitoringOptionsValidator : OptionsValidatorBase<HealthMonitoringOptions>
{
/// <summary>
/// Validates the health monitoring options, returning a failure result if any interval values are non-positive.
/// Validates the health monitoring options, recording a failure if any interval values are non-positive.
/// </summary>
/// <param name="name">Named options instance name (unused).</param>
/// <param name="builder">The accumulator to record failures on.</param>
/// <param name="options">The health monitoring options to validate.</param>
public ValidateOptionsResult Validate(string? name, HealthMonitoringOptions options)
protected override void Validate(ValidationBuilder builder, HealthMonitoringOptions options)
{
var failures = new List<string>();
builder.RequireThat(options.ReportInterval > TimeSpan.Zero,
$"ScadaBridge:HealthMonitoring:ReportInterval must be a positive duration " +
$"(was {options.ReportInterval}); it is used directly as a PeriodicTimer period.");
if (options.ReportInterval <= TimeSpan.Zero)
{
failures.Add(
$"ScadaBridge:HealthMonitoring:ReportInterval must be a positive duration " +
$"(was {options.ReportInterval}); it is used directly as a PeriodicTimer period.");
}
builder.RequireThat(options.OfflineTimeout > TimeSpan.Zero,
$"ScadaBridge:HealthMonitoring:OfflineTimeout must be a positive duration " +
$"(was {options.OfflineTimeout}); it drives the offline-check PeriodicTimer cadence.");
if (options.OfflineTimeout <= TimeSpan.Zero)
{
failures.Add(
$"ScadaBridge:HealthMonitoring:OfflineTimeout must be a positive duration " +
$"(was {options.OfflineTimeout}); it drives the offline-check PeriodicTimer cadence.");
}
builder.RequireThat(options.CentralOfflineTimeout > TimeSpan.Zero,
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout must be a positive duration " +
$"(was {options.CentralOfflineTimeout}).");
if (options.CentralOfflineTimeout <= TimeSpan.Zero)
{
failures.Add(
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout must be a positive duration " +
$"(was {options.CentralOfflineTimeout}).");
}
if (options.OfflineTimeout > TimeSpan.Zero
&& options.CentralOfflineTimeout > TimeSpan.Zero
&& options.CentralOfflineTimeout < options.OfflineTimeout)
{
failures.Add(
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout ({options.CentralOfflineTimeout}) " +
$"must be >= OfflineTimeout ({options.OfflineTimeout}): the synthetic 'central' site has " +
"no heartbeat source and is fed only by the slower self-report loop, so it needs at " +
"least as much offline grace as a real site.");
}
return failures.Count > 0
? ValidateOptionsResult.Fail(failures)
: ValidateOptionsResult.Success;
// Valid when CentralOfflineTimeout >= OfflineTimeout (both already
// required to be positive above). The De Morgan'd guard !(both positive
// AND Central < Offline) is true unless BOTH timeouts are positive and
// Central is strictly smaller — so it stays silent when either field is
// non-positive, leaving that failure to the dedicated positive-duration
// checks above rather than double-firing here.
builder.RequireThat(
!(options.OfflineTimeout > TimeSpan.Zero
&& options.CentralOfflineTimeout > TimeSpan.Zero
&& options.CentralOfflineTimeout < options.OfflineTimeout),
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout ({options.CentralOfflineTimeout}) " +
$"must be >= OfflineTimeout ({options.OfflineTimeout}): the synthetic 'central' site has " +
"no heartbeat source and is fed only by the slower self-report loop, so it needs at " +
"least as much offline grace as a real site.");
}
}
@@ -12,6 +12,7 @@
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<PackageReference Include="ZB.MOM.WW.Configuration" />
</ItemGroup>
<ItemGroup>
@@ -1,5 +1,6 @@
using Serilog;
using Serilog.Events;
using ZB.MOM.WW.Telemetry.Serilog;
namespace ZB.MOM.WW.ScadaBridge.Host;
@@ -85,7 +86,8 @@ public static class LoggerConfigurationFactory
.MinimumLevel.Is(minimumLevel)
.Enrich.WithProperty("SiteId", siteId)
.Enrich.WithProperty("NodeHostname", nodeHostname)
.Enrich.WithProperty("NodeRole", nodeRole);
.Enrich.WithProperty("NodeRole", nodeRole)
.Enrich.With(new TraceContextEnricher());
}
/// <summary>
@@ -20,4 +20,11 @@ public class NodeOptions
public int RemotingPort { get; set; } = 8081;
/// <summary>Gets or sets the gRPC port for the site stream server.</summary>
public int GrpcPort { get; set; } = 8083;
/// <summary>
/// HTTP/1.1 port serving the Prometheus /metrics scrape endpoint on site nodes.
/// Defaults to 8084 — deliberately distinct from <see cref="RemotingPort"/> (8082)
/// and <see cref="GrpcPort"/> (8083) so the Kestrel metrics listener never contends
/// with the Akka remoting port a site node binds.
/// </summary>
public int MetricsPort { get; set; } = 8084;
}
+32
View File
@@ -21,6 +21,7 @@ using ZB.MOM.WW.ScadaBridge.Security;
using ZB.MOM.WW.ScadaBridge.SiteCallAudit;
using ZB.MOM.WW.ScadaBridge.TemplateEngine;
using ZB.MOM.WW.ScadaBridge.Transport;
using ZB.MOM.WW.Telemetry;
using Serilog;
// SCADABRIDGE_CONFIG determines which role-specific config to load (Central or Site)
@@ -248,6 +249,12 @@ try
// All three are anonymous and use the canonical ZbHealthWriter JSON output.
app.MapZbHealth();
// Observability — mount the always-on Prometheus /metrics scrape endpoint.
// AddZbTelemetry (in SiteServiceRegistration.BindSharedOptions) wires the OTel
// Resource + standard instrumentation + Prometheus exporter; this exposes them.
// Requires endpoint routing (app.UseRouting() above).
app.MapZbMetrics();
app.MapStaticAssets();
app.MapCentralUI<ZB.MOM.WW.ScadaBridge.Host.Components.App>();
app.MapInboundAPI();
@@ -286,6 +293,13 @@ try
// Read GrpcPort from config (NodeOptions already has default 8083)
var grpcPort = configuration.GetValue<int>("ScadaBridge:Node:GrpcPort", 8083);
// Read MetricsPort from config (NodeOptions already has default 8084).
// Separate HTTP/1.1 listener so a standard HTTP/1.1 Prometheus scraper can
// reach /metrics; the gRPC port stays HTTP/2-only below. The default is
// 8084 — distinct from RemotingPort (8082, Akka) and GrpcPort (8083) so the
// metrics listener never collides with the Akka remoting port on site nodes.
var metricsPort = configuration.GetValue<int>("ScadaBridge:Node:MetricsPort", 8084);
// Configure Kestrel for HTTP/2 only on the gRPC port
builder.WebHost.ConfigureKestrel(options =>
{
@@ -293,6 +307,13 @@ try
{
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2;
});
// Dedicated HTTP/1.1 (and HTTP/2) listener for the Prometheus /metrics
// scrape endpoint, reachable by an HTTP/1.1 scraper.
options.ListenAnyIP(metricsPort, listenOptions =>
{
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1AndHttp2;
});
});
// gRPC server registration
@@ -304,6 +325,17 @@ try
var app = builder.Build();
// Endpoint routing middleware. The gRPC service mapping below and the
// /metrics scrape endpoint both run on endpoint routing, so UseRouting()
// must be present before the Map* calls on the site role.
app.UseRouting();
// Observability — mount the always-on Prometheus /metrics scrape endpoint.
// AddZbTelemetry (in SiteServiceRegistration.Configure → BindSharedOptions)
// wires the OTel Resource + standard instrumentation + Prometheus exporter;
// this exposes them on the site node too.
app.MapZbMetrics();
// Map gRPC service — resolves the singleton SiteStreamGrpcServer from DI
app.MapGrpcService<ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamGrpcServer>();
@@ -2,6 +2,7 @@ using ZB.MOM.WW.ScadaBridge.AuditLog;
using ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
using ZB.MOM.WW.ScadaBridge.Communication;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer;
using ZB.MOM.WW.ScadaBridge.ExternalSystemGateway;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
@@ -11,6 +12,7 @@ using ZB.MOM.WW.ScadaBridge.NotificationService;
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
using ZB.MOM.WW.ScadaBridge.SiteRuntime;
using ZB.MOM.WW.ScadaBridge.StoreAndForward;
using ZB.MOM.WW.Telemetry;
namespace ZB.MOM.WW.ScadaBridge.Host;
@@ -103,10 +105,16 @@ public static class SiteServiceRegistration
public static void BindSharedOptions(IServiceCollection services, IConfiguration config)
{
services.Configure<NodeOptions>(config.GetSection("ScadaBridge:Node"));
services.Configure<ClusterOptions>(config.GetSection("ScadaBridge:Cluster"));
// Bind + eagerly validate: ClusterOptionsValidator is registered (TryAddEnumerable)
// by the ClusterInfrastructure module, so chaining ValidateOnStart() here makes a bad
// ScadaBridge:Cluster section fail fast at host build instead of lazily on first resolve.
services.AddOptions<ClusterOptions>().Bind(config.GetSection("ScadaBridge:Cluster")).ValidateOnStart();
services.Configure<DatabaseOptions>(config.GetSection("ScadaBridge:Database"));
services.Configure<CommunicationOptions>(config.GetSection("ScadaBridge:Communication"));
services.Configure<HealthMonitoringOptions>(config.GetSection("ScadaBridge:HealthMonitoring"));
// Bind + eagerly validate: HealthMonitoringOptionsValidator is registered (TryAddEnumerable)
// by the HealthMonitoring module, so chaining ValidateOnStart() here makes a bad
// ScadaBridge:HealthMonitoring section fail fast at host build instead of lazily on first resolve.
services.AddOptions<HealthMonitoringOptions>().Bind(config.GetSection("ScadaBridge:HealthMonitoring")).ValidateOnStart();
services.Configure<NotificationOptions>(config.GetSection("ScadaBridge:Notification"));
services.Configure<LoggingOptions>(config.GetSection("ScadaBridge:Logging"));
@@ -114,5 +122,26 @@ public static class SiteServiceRegistration
// writers so they can stamp the SourceNode column. Registered here in
// shared bootstrap because every node (central + site) needs it.
services.AddSingleton<INodeIdentityProvider, NodeIdentityProvider>();
// Observability — shared ZB.MOM.WW.Telemetry. Registered in shared bootstrap so
// BOTH the central and site composition roots wire the OTel Resource (the
// service.name/site.id/node.role identity triple) + standard instrumentation +
// the always-on Prometheus exporter. Mount the /metrics scrape endpoint per role
// with app.MapZbMetrics(). The same `?? "central"` SiteId default Program.cs uses
// is applied here so the Resource attribute matches the log-enricher value.
// The application meter is named so OTel observes its instruments; emit points are
// wired by follow-on tasks (the instruments are no-op until a listener attaches).
services.AddZbTelemetry(o =>
{
o.ServiceName = "scadabridge";
o.SiteId = config["ScadaBridge:Node:SiteId"] ?? "central";
o.NodeRole = config["ScadaBridge:Node:Role"];
o.Meters = [ScadaBridgeTelemetry.MeterName];
if (Enum.TryParse<ZbExporter>(config["ScadaBridge:Telemetry:Exporter"], ignoreCase: true, out var exporter))
o.Exporter = exporter;
var otlp = config["ScadaBridge:Telemetry:OtlpEndpoint"];
if (!string.IsNullOrWhiteSpace(otlp))
o.OtlpEndpoint = otlp;
});
}
}
@@ -1,3 +1,5 @@
using ZB.MOM.WW.Configuration;
namespace ZB.MOM.WW.ScadaBridge.Host;
/// <summary>
@@ -10,77 +12,98 @@ public static class StartupValidator
/// <param name="configuration">The application configuration to validate.</param>
public static void Validate(IConfiguration configuration)
{
var errors = new List<string>();
// Resolve the same locals the original imperative validator used, so the
// cross-field predicates below can close over them. ConfigPreflight.Require
// passes config[key] to each predicate, but the cross-field rules ignore that
// argument and read these resolved values instead — preserving the exact
// conditions (and therefore the byte-identical failure messages and ordering)
// of the original StartupValidator.
var nodeSection = configuration.GetSection("ScadaBridge:Node");
var role = nodeSection["Role"];
if (string.IsNullOrEmpty(role) || (role != "Central" && role != "Site"))
errors.Add("ScadaBridge:Node:Role must be 'Central' or 'Site'");
if (string.IsNullOrEmpty(nodeSection["NodeHostname"]))
errors.Add("ScadaBridge:Node:NodeHostname is required");
var portStr = nodeSection["RemotingPort"];
if (!int.TryParse(portStr, out var port) || port < 1 || port > 65535)
errors.Add("ScadaBridge:Node:RemotingPort must be 1-65535");
if (role == "Site" && string.IsNullOrEmpty(nodeSection["SiteId"]))
errors.Add("ScadaBridge:Node:SiteId is required for Site nodes");
if (role == "Central")
{
var dbSection = configuration.GetSection("ScadaBridge:Database");
if (string.IsNullOrEmpty(dbSection["ConfigurationDb"]))
errors.Add("ScadaBridge:Database:ConfigurationDb connection string required for Central");
var secSection = configuration.GetSection("ScadaBridge:Security");
if (string.IsNullOrEmpty(secSection["LdapServer"]))
errors.Add("ScadaBridge:Security:LdapServer required for Central");
if (string.IsNullOrEmpty(secSection["JwtSigningKey"]))
errors.Add("ScadaBridge:Security:JwtSigningKey required for Central");
}
bool portValid = int.TryParse(portStr, out var port) && port >= 1 && port <= 65535;
var seedNodes = configuration.GetSection("ScadaBridge:Cluster:SeedNodes").Get<List<string>>();
if (seedNodes == null || seedNodes.Count < 2)
errors.Add("ScadaBridge:Cluster:SeedNodes must have at least 2 entries");
if (role == "Site")
{
var grpcPortStr = nodeSection["GrpcPort"];
int grpcPort = 8083; // NodeOptions default when the key is absent
if (grpcPortStr != null && (!int.TryParse(grpcPortStr, out grpcPort) || grpcPort < 1 || grpcPort > 65535))
errors.Add("ScadaBridge:Node:GrpcPort must be 1-65535");
// GrpcPort: default 8083 when absent; only fails the range rule when the key is
// present AND invalid. The out-param assignment mirrors the original so the
// resolved grpcPort feeds the cross-field rules even on a parse failure.
var grpcPortStr = nodeSection["GrpcPort"];
int grpcPort = 8083; // NodeOptions default when the key is absent
bool grpcValid = !(grpcPortStr != null && (!int.TryParse(grpcPortStr, out grpcPort) || grpcPort < 1 || grpcPort > 65535));
// Host-007 / REQ-HOST-4: the gRPC (Kestrel HTTP/2) port and the Akka
// remoting port must differ. Identical values make Kestrel and
// Akka.Remote contend for the same TCP port and fail opaquely at
// runtime. Uses the resolved GrpcPort, including the 8083 default.
if (port == grpcPort)
errors.Add("ScadaBridge:Node:GrpcPort must differ from RemotingPort");
// MetricsPort: default 8084 when absent; same parse-or-default contract as GrpcPort.
var metricsPortStr = nodeSection["MetricsPort"];
int metricsPort = 8084; // NodeOptions default when the key is absent
bool metricsValid = !(metricsPortStr != null && (!int.TryParse(metricsPortStr, out metricsPort) || metricsPort < 1 || metricsPort > 65535));
var dbSection = configuration.GetSection("ScadaBridge:Database");
if (string.IsNullOrEmpty(dbSection["SiteDbPath"]))
errors.Add("ScadaBridge:Database:SiteDbPath required for Site nodes");
// Host-004: a seed node must reference an Akka.Remote endpoint, never the
// Kestrel HTTP/2 gRPC port. A seed entry whose port equals this node's
// GrpcPort would make a joining node attempt an Akka.Remote TCP
// association against the gRPC listener and fail.
if (seedNodes != null)
ConfigPreflight.For(configuration)
// Role / NodeHostname / RemotingPort (unconditional)
.Require("ScadaBridge:Node:Role",
_ => !(string.IsNullOrEmpty(role) || (role != "Central" && role != "Site")),
"must be 'Central' or 'Site'")
.Require("ScadaBridge:Node:NodeHostname",
_ => !string.IsNullOrEmpty(nodeSection["NodeHostname"]),
"is required")
.Require("ScadaBridge:Node:RemotingPort",
_ => portValid,
"must be 1-65535")
// SiteId (Site only) — note: OUTSIDE the big Site block in the original,
// so it must run before the unconditional SeedNodes-count rule.
.When(role == "Site", p => p
.Require("ScadaBridge:Node:SiteId",
_ => !string.IsNullOrEmpty(nodeSection["SiteId"]),
"is required for Site nodes"))
// Central-only database/security rules.
.When(role == "Central", p => p
.Require("ScadaBridge:Database:ConfigurationDb",
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Database")["ConfigurationDb"]),
"connection string required for Central")
.Require("ScadaBridge:Security:LdapServer",
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Security")["LdapServer"]),
"required for Central")
.Require("ScadaBridge:Security:JwtSigningKey",
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Security")["JwtSigningKey"]),
"required for Central"))
// SeedNodes count (unconditional, after SiteId).
.Require("ScadaBridge:Cluster:SeedNodes",
_ => seedNodes != null && seedNodes.Count >= 2,
"must have at least 2 entries")
// The big Site-only block: GrpcPort/MetricsPort validity + cross-field
// collisions + SiteDbPath + seed-node-port loop, in the original order.
.When(role == "Site", p =>
{
foreach (var seed in seedNodes)
{
if (SeedNodePort(seed) == grpcPort)
errors.Add(
$"ScadaBridge:Cluster:SeedNodes entry '{seed}' must not target the gRPC port " +
$"({grpcPort}); seed nodes must reference Akka remoting ports");
}
}
}
// Host-007 / REQ-HOST-4: GrpcPort range, then GrpcPort vs RemotingPort.
p.Require("ScadaBridge:Node:GrpcPort", _ => grpcValid, "must be 1-65535");
// Identical GrpcPort/RemotingPort make Kestrel and Akka.Remote contend
// for the same TCP port. Uses the resolved GrpcPort, including 8083.
p.Require("ScadaBridge:Node:GrpcPort", _ => port != grpcPort, "must differ from RemotingPort");
if (errors.Count > 0)
throw new InvalidOperationException(
$"Configuration validation failed:\n{string.Join("\n", errors.Select(e => $" - {e}"))}");
// Host-007 / REQ-HOST-4: MetricsPort range, then MetricsPort vs both ports.
p.Require("ScadaBridge:Node:MetricsPort", _ => metricsValid, "must be 1-65535");
// The Kestrel metrics (HTTP/1.1) listener port must differ from BOTH the
// Akka remoting port and the gRPC port. Uses the resolved MetricsPort (8084 default).
p.Require("ScadaBridge:Node:MetricsPort", _ => metricsPort != port, "must differ from RemotingPort");
p.Require("ScadaBridge:Node:MetricsPort", _ => metricsPort != grpcPort, "must differ from GrpcPort");
p.Require("ScadaBridge:Database:SiteDbPath",
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Database")["SiteDbPath"]),
"required for Site nodes");
// Host-004: a seed node must reference an Akka.Remote endpoint, never the
// Kestrel HTTP/2 gRPC port. A seed entry whose port equals this node's
// GrpcPort would make a joining node attempt an Akka.Remote TCP
// association against the gRPC listener and fail.
foreach (var seed in seedNodes ?? Enumerable.Empty<string>())
{
p.Require("ScadaBridge:Cluster:SeedNodes",
_ => SeedNodePort(seed) != grpcPort,
$"entry '{seed}' must not target the gRPC port " +
$"({grpcPort}); seed nodes must reference Akka remoting ports");
}
})
.ThrowIfInvalid();
}
/// <summary>
@@ -28,9 +28,12 @@
<!-- Transitive override: Akka.Hosting 1.5.62 pins OpenTelemetry.Api 1.9.0 which is flagged
(GHSA-g94r-2vxg-569j, GHSA-8785-wc3w-h8q6). Bumping directly clears both advisories. -->
<PackageReference Include="OpenTelemetry.Api" />
<PackageReference Include="ZB.MOM.WW.Configuration" />
<PackageReference Include="ZB.MOM.WW.Health" />
<PackageReference Include="ZB.MOM.WW.Health.Akka" />
<PackageReference Include="ZB.MOM.WW.Health.EntityFrameworkCore" />
<PackageReference Include="ZB.MOM.WW.Telemetry" />
<PackageReference Include="ZB.MOM.WW.Telemetry.Serilog" />
</ItemGroup>
<ItemGroup>
@@ -7,6 +7,7 @@
"SiteId": "site-a",
"RemotingPort": 8082,
"GrpcPort": 8083,
"MetricsPort": 8084,
"NodeName": "node-a"
},
"Cluster": {
@@ -5,6 +5,7 @@ using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
namespace ZB.MOM.WW.ScadaBridge.InboundAPI;
@@ -44,6 +45,16 @@ public static class EndpointExtensions
if (!validationResult.IsValid)
{
// Telemetry follow-on: count every inbound request, including auth
// failures. The raw {methodName} route value is arbitrary caller input
// and would be high-cardinality, so failures are tagged with a small
// bounded set of sentinels keyed off the validator's status code rather
// than the unvalidated name (401 → "<unauthorized>", 403 → "<forbidden>").
ScadaBridgeTelemetry.RecordInboundApiRequest(
validationResult.StatusCode == StatusCodes.Status401Unauthorized
? "<unauthorized>"
: "<forbidden>");
// WP-5: Failures-only logging
logger.LogWarning(
"Inbound API auth failure for method {Method}: {Error} (status {StatusCode})",
@@ -56,6 +67,12 @@ public static class EndpointExtensions
var method = validationResult.Method!;
// Telemetry follow-on: count this inbound request against the resolved,
// registered method name. method.Name comes from the repository's method
// catalogue (an exact-name lookup), so the `method` tag is bounded to the
// set of configured API methods — never the raw caller-supplied route value.
ScadaBridgeTelemetry.RecordInboundApiRequest(method.Name);
// Audit Log (#23 M4 Bundle D): publish the resolved API key name so
// AuditWriteMiddleware can populate AuditEvent.Actor in its finally
// block. Done AFTER validation succeeded — auth failures leave the
@@ -1,4 +1,4 @@
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Configuration;
namespace ZB.MOM.WW.ScadaBridge.Security;
@@ -29,7 +29,7 @@ namespace ZB.MOM.WW.ScadaBridge.Security;
/// minimum-byte length contract co-located with the type that enforces it.
/// </para>
/// </summary>
public sealed class SecurityOptionsValidator : IValidateOptions<SecurityOptions>
public sealed class SecurityOptionsValidator : OptionsValidatorBase<SecurityOptions>
{
/// <summary>
/// The configuration section name <see cref="SecurityOptions"/> is bound
@@ -41,30 +41,16 @@ public sealed class SecurityOptionsValidator : IValidateOptions<SecurityOptions>
public const string ConfigSectionName = "Security";
/// <inheritdoc />
public ValidateOptionsResult Validate(string? name, SecurityOptions options)
protected override void Validate(ValidationBuilder builder, SecurityOptions options)
{
ArgumentNullException.ThrowIfNull(options);
builder.RequireThat(!string.IsNullOrWhiteSpace(options.LdapServer),
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapServer)} is required " +
"but was empty or whitespace — set it to the LDAP server hostname or IP " +
"(e.g. \"ldap.example.com\").");
var failures = new List<string>();
if (string.IsNullOrWhiteSpace(options.LdapServer))
{
failures.Add(
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapServer)} is required " +
"but was empty or whitespace — set it to the LDAP server hostname or IP " +
"(e.g. \"ldap.example.com\").");
}
if (string.IsNullOrWhiteSpace(options.LdapSearchBase))
{
failures.Add(
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapSearchBase)} is required " +
"but was empty or whitespace — set it to the search-base DN " +
"(e.g. \"dc=example,dc=com\").");
}
return failures.Count == 0
? ValidateOptionsResult.Success
: ValidateOptionsResult.Fail(failures);
builder.RequireThat(!string.IsNullOrWhiteSpace(options.LdapSearchBase),
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapSearchBase)} is required " +
"but was empty or whitespace — set it to the search-base DN " +
"(e.g. \"dc=example,dc=com\").");
}
}
@@ -14,6 +14,7 @@
<PackageReference Include="Microsoft.AspNetCore.Authorization" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" />
<PackageReference Include="Novell.Directory.Ldap.NETStandard" />
<PackageReference Include="ZB.MOM.WW.Configuration" />
</ItemGroup>
<ItemGroup>
@@ -1,5 +1,6 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
@@ -98,6 +99,48 @@ public class StoreAndForwardService
/// </summary>
private static readonly TimeSpan SweepShutdownWaitTimeout = TimeSpan.FromSeconds(10);
/// <summary>
/// WP-14 (telemetry): cached count of messages currently buffered for
/// forwarding — i.e. rows in <see cref="StoreAndForwardMessageStatus.Pending"/>,
/// the live store-and-forward queue waiting to be delivered. This backs the
/// <c>scadabridge.store_and_forward.queue.depth</c> observable gauge.
/// <para>
/// The gauge's collection callback is synchronous and is invoked frequently by
/// the OpenTelemetry/Prometheus collector, so it must never run an async SQLite
/// <c>COUNT(*)</c>. Instead this <see cref="long"/> is seeded once from storage
/// in <see cref="StartAsync"/> and then adjusted in-process on the existing
/// paths that change the Pending population: <see cref="BufferAsync"/> (+1),
/// successful-retry removal and Pending→Parked transitions in
/// <see cref="RetryMessageAsync"/> (-1), and operator requeue in
/// <see cref="RetryParkedMessageAsync"/> (+1). The provider registered with
/// <see cref="ScadaBridgeTelemetry.SetQueueDepthProvider"/> reads it via
/// <see cref="Interlocked.Read"/> — non-blocking and sync-safe. It is an
/// approximate, eventually-consistent gauge (concurrent failover replication
/// applies to the standby's own store, not this counter), which is exactly
/// what a queue-depth metric needs.
/// </para>
/// </summary>
private long _bufferedCount;
/// <summary>
/// Test seam (WP-14 telemetry): simulates a concurrent pre-seed
/// <see cref="BufferAsync"/> increment landing on <see cref="_bufferedCount"/>
/// before <see cref="StartAsync"/> seeds it, so a test can prove the seed uses
/// <see cref="Interlocked.Add"/> (additive) rather than Exchange (clobbering).
/// </summary>
internal void TestOnly_IncrementBufferedCount() =>
Interlocked.Increment(ref _bufferedCount);
/// <summary>
/// WP-14 (telemetry): an instance field that guards against a single instance
/// registering the queue-depth provider (and re-seeding the counter) more than
/// once — e.g. a second <see cref="StartAsync"/> on the same instance. It does NOT
/// coordinate across instances: the gauge slot in <see cref="ScadaBridgeTelemetry"/>
/// is process-global, so in a multi-instance process the last <see cref="StartAsync"/>
/// wins the global slot. 0 = not yet registered, 1 = done.
/// </summary>
private int _queueDepthProviderRegistered;
/// <summary>
/// WP-10: Delivery handler delegate. The return value / exception is interpreted
/// the same way on both the immediate-delivery path (<see cref="EnqueueAsync"/>)
@@ -170,6 +213,27 @@ public class StoreAndForwardService
public async Task StartAsync()
{
await _storage.InitializeAsync();
// WP-14 (telemetry): seed the cached buffered-message count from the
// store exactly once (the gauge callback cannot run an async COUNT), then
// register the sync, non-blocking provider with the process-global
// ScadaBridgeTelemetry gauge. Both steps are inside the one-time guard so a
// second StartAsync on the same instance cannot double-seed.
//
// The seed is an Interlocked.Add — NOT an Exchange — to avoid a startup race:
// between the await above returning and this point, a concurrent BufferAsync
// could already have Interlocked.Increment'd _bufferedCount. Exchange would
// clobber that increment (losing a +1); Add preserves it. _bufferedCount
// starts at 0 and only BufferAsync increments it before the seed, so
// 0 + pending + (any concurrent increments) is the correct live count.
var pending = await _storage.GetMessageCountByStatusAsync(
StoreAndForwardMessageStatus.Pending);
if (Interlocked.CompareExchange(ref _queueDepthProviderRegistered, 1, 0) == 0)
{
Interlocked.Add(ref _bufferedCount, pending);
ScadaBridgeTelemetry.SetQueueDepthProvider(() => Interlocked.Read(ref _bufferedCount));
}
_retryTimer = new Timer(
// StoreAndForward-024: capture the sweep Task on each tick so
// StopAsync can await any in-flight invocation before the host
@@ -396,6 +460,10 @@ public class StoreAndForwardService
{
await _storage.EnqueueAsync(message);
_replication?.ReplicateEnqueue(message);
// WP-14 (telemetry): a freshly buffered row is Pending → grows the live
// queue depth. Bumped after the durable write so the gauge never leads the
// store.
Interlocked.Increment(ref _bufferedCount);
}
/// <summary>
@@ -452,6 +520,8 @@ public class StoreAndForwardService
{
await _storage.RemoveMessageAsync(message.Id);
_replication?.ReplicateRemove(message.Id);
// WP-14 (telemetry): a delivered row leaves the Pending queue.
Interlocked.Decrement(ref _bufferedCount);
RaiseActivity("Delivered", message.Category,
$"Delivered to {message.Target} after {message.RetryCount} retries");
@@ -483,6 +553,9 @@ public class StoreAndForwardService
message.Id);
return;
}
// WP-14 (telemetry): the row committed Pending→Parked, leaving the live
// forward queue. Only counted when the conditional update actually won.
Interlocked.Decrement(ref _bufferedCount);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category,
$"Permanent failure for {message.Target}: handler returned false");
@@ -519,6 +592,9 @@ public class StoreAndForwardService
message.Id);
return;
}
// WP-14 (telemetry): the row committed Pending→Parked, leaving the
// live forward queue. Only counted when the conditional update won.
Interlocked.Decrement(ref _bufferedCount);
_replication?.ReplicatePark(message);
RaiseActivity("Parked", message.Category,
$"Max retries ({message.MaxRetries}) reached for {message.Target}");
@@ -737,6 +813,11 @@ public class StoreAndForwardService
return false;
}
// WP-14 (telemetry): an operator requeue moves Parked→Pending, re-adding the
// row to the live forward queue. Counted only when the conditional storage
// update actually flipped the row.
Interlocked.Increment(ref _bufferedCount);
// The active node just rewrote this row to Pending with retry_count = 0
// and cleared last_error / last_attempt_at (see
// StoreAndForwardStorage.RetryParkedMessageAsync). Reconstruct the
@@ -769,6 +850,7 @@ public class StoreAndForwardService
{
// Capture the category before the row is deleted so the activity log is
// labelled correctly.
// WP-14 (telemetry): Parked rows are not in _bufferedCount; discarding a Parked row needs no counter adjustment.
var message = await _storage.GetMessageByIdAsync(messageId);
var success = await _storage.DiscardParkedMessageAsync(messageId);
if (success)
@@ -1,3 +1,4 @@
using System.Diagnostics.Metrics;
using System.Threading.Channels;
using Akka.Actor;
using Akka.TestKit.Xunit2;
@@ -5,6 +6,7 @@ using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;
@@ -342,6 +344,59 @@ public class SiteStreamGrpcServerTests : TestKit
subscriber.DidNotReceive().RemoveSubscriber(Arg.Any<IActorRef>());
}
[Fact]
public async Task SiteConnectionUpGauge_GoesToOneOnConnect_AndBackToZeroOnCancel()
{
// Telemetry follow-on: the scadabridge.site.connection.up gauge must read
// exactly 1 while a site stream is established and return to 0 once the
// stream terminates on the cancel path — proving SiteConnectionOpened() is
// matched by exactly one SiteConnectionClosed() in the handler's finally.
var server = CreateServer();
server.SetReady(Sys);
long ReadGauge()
{
long observed = 0;
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
instrument.Name == "scadabridge.site.connection.up")
{
l.EnableMeasurementEvents(instrument);
}
};
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
listener.Start();
listener.RecordObservableInstruments();
return observed;
}
var baseline = ReadGauge();
var cts = new CancellationTokenSource();
var context = CreateMockContext(cts.Token);
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
var streamTask = Task.Run(() => server.SubscribeInstance(
MakeRequest("corr-gauge", "Site1.Pump01"), writer, context));
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
// While the stream is up the gauge is one above whatever baseline other
// (possibly parallel) tests left behind — read relative so the assertion
// is robust to test interleaving on the process-wide static counter.
Assert.Equal(baseline + 1, ReadGauge());
cts.Cancel();
await streamTask;
await WaitForConditionAsync(() => server.ActiveStreamCount == 0);
// After the cancel path runs the finally, the gauge is balanced back to
// the baseline — no leaked "up" count.
Assert.Equal(baseline, ReadGauge());
}
[Fact]
public void SetReady_AllowsStreamCreation()
{
@@ -1,3 +1,4 @@
using System.Diagnostics.Metrics;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging.Abstractions;
@@ -10,6 +11,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
@@ -558,6 +560,106 @@ public class DeploymentServiceTests : TestKit
Arg.Any<object>(), Arg.Any<CancellationToken>());
}
// ── Telemetry follow-on: scadabridge.deployments.applied on deploy success ──
[Fact]
public async Task DeployInstanceAsync_SiteSucceeds_EmitsDeploymentsAppliedCounterOnce()
{
// A successful deployment must increment the
// scadabridge.deployments.applied counter exactly once — one
// DeployInstanceAsync deploys one instance to one site, so the unit is
// one increment per successful deploy operation.
var instance = new Instance("MetricInst") { Id = 55, SiteId = 1, State = InstanceState.NotDeployed };
_repo.GetInstanceByIdAsync(55, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(55, "MetricInst", "sha256:target");
_repo.GetCurrentDeploymentStatusAsync(55, Arg.Any<CancellationToken>())
.Returns((DeploymentRecord?)null);
var counters = new ReconcileProbeCounters();
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false)));
var service = CreateServiceWithCommActor(commActor);
long applied = 0;
using var listener = new MeterListener
{
InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
&& instrument.Name == "scadabridge.deployments.applied")
{
l.EnableMeasurementEvents(instrument);
}
}
};
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) =>
Interlocked.Add(ref applied, measurement));
listener.Start();
var result = await service.DeployInstanceAsync(55, "admin");
listener.Dispose();
Assert.True(result.IsSuccess);
// Fresh first-time deploy applied -> exactly one increment.
Assert.Equal(1, Interlocked.Read(ref applied));
}
[Fact]
public async Task DeployInstanceAsync_Reconciled_DoesNotEmitDeploymentsAppliedCounter()
{
// The reconciliation path recovers a PRIOR timed-out apply rather than
// performing a fresh one; counting it would risk double-counting the
// original apply, so scadabridge.deployments.applied must NOT increment
// on a reconciled (no re-deploy) success.
var instance = new Instance("MetricReconcileInst")
{
Id = 56, SiteId = 1, State = InstanceState.NotDeployed
};
_repo.GetInstanceByIdAsync(56, Arg.Any<CancellationToken>()).Returns(instance);
SetupValidPipeline(56, "MetricReconcileInst", "sha256:target");
var prior = new DeploymentRecord("dep-prior-56", "admin")
{
InstanceId = 56,
Status = DeploymentStatus.InProgress,
RevisionHash = "sha256:target"
};
_repo.GetCurrentDeploymentStatusAsync(56, Arg.Any<CancellationToken>()).Returns(prior);
_repo.GetDeployedSnapshotByInstanceIdAsync(56, Arg.Any<CancellationToken>())
.Returns((DeployedConfigSnapshot?)null);
var counters = new ReconcileProbeCounters();
var commActor = Sys.ActorOf(Props.Create(() =>
new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false)));
var service = CreateServiceWithCommActor(commActor);
long applied = 0;
using var listener = new MeterListener
{
InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
&& instrument.Name == "scadabridge.deployments.applied")
{
l.EnableMeasurementEvents(instrument);
}
}
};
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) =>
Interlocked.Add(ref applied, measurement));
listener.Start();
var result = await service.DeployInstanceAsync(56, "admin");
listener.Dispose();
Assert.True(result.IsSuccess);
// Reconciled — no fresh deploy was sent, so no increment.
Assert.Equal(0, counters.DeployCount);
Assert.Equal(0, Interlocked.Read(ref applied));
}
// ── DeploymentManager-011: lifecycle success paths ──
[Fact]
@@ -0,0 +1,79 @@
using System.Net;
using Microsoft.AspNetCore.Mvc.Testing;
using Microsoft.Extensions.Configuration;
namespace ZB.MOM.WW.ScadaBridge.Host.Tests;
/// <summary>
/// Observability adoption: verifies the shared ZB.MOM.WW.Telemetry Prometheus
/// scrape endpoint (<c>/metrics</c>, mounted by <c>app.MapZbMetrics()</c>) is wired into the
/// Central composition root. <c>AddZbTelemetry</c> (registered in
/// <see cref="SiteServiceRegistration.BindSharedOptions"/>) always wires the Prometheus
/// exporter, so the endpoint returns the Prometheus exposition format regardless of DB /
/// cluster state. This is a pure route assertion — it requires no database, LDAP, or formed
/// Akka cluster. The Central-role factory bootstrap mirrors <see cref="HealthCheckTests"/>.
/// </summary>
public class MetricsEndpointTests : IDisposable
{
private readonly List<IDisposable> _disposables = new();
public MetricsEndpointTests()
{
// Host-003: connection strings are externalised; supply them via env vars.
_disposables.Add(new CentralDbTestEnvironment());
}
public void Dispose()
{
foreach (var d in _disposables)
{
try { d.Dispose(); } catch { /* best effort */ }
}
}
private WebApplicationFactory<Program> CreateCentralFactory()
{
var factory = new WebApplicationFactory<Program>()
.WithWebHostBuilder(builder =>
{
builder.ConfigureAppConfiguration((context, config) =>
{
config.AddInMemoryCollection(new Dictionary<string, string?>
{
["ScadaBridge:Node:NodeHostname"] = "localhost",
["ScadaBridge:Node:RemotingPort"] = "0",
["ScadaBridge:Cluster:SeedNodes:0"] = "akka.tcp://scadabridge@localhost:2551",
["ScadaBridge:Cluster:SeedNodes:1"] = "akka.tcp://scadabridge@localhost:2552",
["ScadaBridge:Database:SkipMigrations"] = "true",
});
});
builder.UseSetting("ScadaBridge:Node:Role", "Central");
builder.UseSetting("ScadaBridge:Database:SkipMigrations", "true");
});
_disposables.Add(factory);
return factory;
}
[Fact]
public async Task Metrics_Endpoint_IsMapped()
{
var previousEnv = Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT");
try
{
Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", "Central");
var factory = CreateCentralFactory();
var client = factory.CreateClient();
_disposables.Add(client);
var response = await client.GetAsync("/metrics");
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
var body = await response.Content.ReadAsStringAsync();
Assert.Contains("# ", body); // Prometheus exposition (HELP/TYPE comments)
}
finally
{
Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", previousEnv);
}
}
}
@@ -0,0 +1,32 @@
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
namespace ZB.MOM.WW.ScadaBridge.Host.Tests;
/// <summary>
/// Infrastructure-free guards for <see cref="ScadaBridgeTelemetry"/>: the meter name is the
/// stable value registered with OTel, and the emit helpers are safe to call (they are no-op
/// until follow-on tasks wire real emit points and a listener attaches).
/// </summary>
public class ScadaBridgeTelemetryTests
{
[Fact]
public void MeterName_IsStableValue()
{
Assert.Equal("ZB.MOM.WW.ScadaBridge", ScadaBridgeTelemetry.MeterName);
}
[Fact]
public void EmitHelpers_DoNotThrow()
{
var ex = Record.Exception(() =>
{
ScadaBridgeTelemetry.RecordDeploymentApplied();
ScadaBridgeTelemetry.RecordInboundApiRequest("X");
ScadaBridgeTelemetry.SiteConnectionOpened();
ScadaBridgeTelemetry.SiteConnectionClosed();
ScadaBridgeTelemetry.SetQueueDepthProvider(() => 5);
});
Assert.Null(ex);
}
}
@@ -352,6 +352,105 @@ public class StartupValidatorTests
Assert.Null(ex);
}
[Theory]
[InlineData("0")]
[InlineData("-1")]
[InlineData("65536")]
[InlineData("abc")]
public void Site_InvalidMetricsPort_FailsValidation(string metricsPort)
{
var values = ValidSiteConfig();
values["ScadaBridge:Node:MetricsPort"] = metricsPort;
var config = BuildConfig(values);
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
Assert.Contains("MetricsPort must be 1-65535", ex.Message);
}
[Fact]
public void Site_ValidMetricsPort_PassesValidation()
{
var values = ValidSiteConfig();
values["ScadaBridge:Node:MetricsPort"] = "8084";
var config = BuildConfig(values);
var ex = Record.Exception(() => StartupValidator.Validate(config));
Assert.Null(ex);
}
[Fact]
public void Site_MetricsPortEqualsRemotingPort_FailsValidation()
{
// Host-007 regression: the Kestrel metrics (HTTP/1.1) listener port must
// differ from RemotingPort. Identical values cause the metrics listener
// and Akka.Remote to contend for the same port at runtime.
var values = ValidSiteConfig();
values["ScadaBridge:Node:RemotingPort"] = "8082";
values["ScadaBridge:Node:MetricsPort"] = "8082";
var config = BuildConfig(values);
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
Assert.Contains("MetricsPort must differ from RemotingPort", ex.Message);
}
[Fact]
public void Site_MetricsPortEqualsGrpcPort_FailsValidation()
{
// Host-007 regression: the metrics listener port must differ from GrpcPort.
var values = ValidSiteConfig();
values["ScadaBridge:Node:GrpcPort"] = "8083";
values["ScadaBridge:Node:MetricsPort"] = "8083";
var config = BuildConfig(values);
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
Assert.Contains("MetricsPort must differ from GrpcPort", ex.Message);
}
[Fact]
public void Site_DefaultMetricsPortEqualsRemotingPort_FailsValidation()
{
// MetricsPort absent => NodeOptions default 8084. A site whose RemotingPort
// is also 8084 must still be rejected.
var values = ValidSiteConfig();
values["ScadaBridge:Node:RemotingPort"] = "8084";
// Keep GrpcPort distinct so only the metrics-vs-remoting rule fires.
values["ScadaBridge:Node:GrpcPort"] = "8083";
// Seed nodes default to the remoting port (8082) in ValidSiteConfig; realign
// them to 8084 so the seed-vs-grpc rule is not what trips here.
values["ScadaBridge:Cluster:SeedNodes:0"] = "akka.tcp://scadabridge@site-a-node1:8084";
values["ScadaBridge:Cluster:SeedNodes:1"] = "akka.tcp://scadabridge@site-a-node2:8084";
var config = BuildConfig(values);
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
Assert.Contains("MetricsPort must differ from RemotingPort", ex.Message);
}
[Fact]
public void Site_MetricsPortDiffersFromRemotingAndGrpc_PassesValidation()
{
var values = ValidSiteConfig();
values["ScadaBridge:Node:RemotingPort"] = "8082";
values["ScadaBridge:Node:GrpcPort"] = "8083";
values["ScadaBridge:Node:MetricsPort"] = "8084";
var config = BuildConfig(values);
var ex = Record.Exception(() => StartupValidator.Validate(config));
Assert.Null(ex);
}
[Fact]
public void Central_InvalidMetricsPort_NotValidated()
{
// The metrics-port rules apply to Site nodes only; a Central node runs no
// metrics listener, so an out-of-range MetricsPort must not fail startup.
var values = ValidCentralConfig();
values["ScadaBridge:Node:MetricsPort"] = "0";
var config = BuildConfig(values);
var ex = Record.Exception(() => StartupValidator.Validate(config));
Assert.Null(ex);
}
[Fact]
public void MultipleErrors_AllReported()
{
@@ -9,8 +9,10 @@ using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.InboundApi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types.InboundApi;
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
using System.Diagnostics.Metrics;
using System.Net;
using System.Text;
@@ -232,6 +234,114 @@ public class EndpointExtensionsTests
Assert.Equal("audit-actor-name", capture.CapturedActor);
}
[Fact]
public async Task ValidRequest_EmitsInboundApiRequestCounter_TaggedWithResolvedMethodName()
{
// Telemetry follow-on: a successful inbound request increments
// scadabridge.inbound_api.requests once, tagged with the resolved,
// registered method name (method.Name) — the bounded identifier, not the
// raw route value.
var key = SeedKey();
var method = SeedMethod(1, "echo", "return Parameters[\"value\"];",
"""[{"name":"value","type":"Integer","required":true}]""");
using var collector = new InboundApiRequestCounterCollector();
using var host = await BuildHostAsync(key, method);
var client = host.GetTestClient();
var response = await client.SendAsync(BuildPost("echo", """{"value":7}"""));
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
// Filter by the method tag this test produced: the counter is a process-wide
// static, so a parallel test class could otherwise leak measurements in.
var echoTotal = collector.Measurements
.Where(m => m.Method == "echo")
.Sum(m => m.Value);
Assert.Equal(1, echoTotal);
}
[Fact]
public async Task UnknownMethod_EmitsInboundApiRequestCounter_WithBoundedForbiddenSentinel()
{
// Telemetry follow-on: an auth/authz failure is still counted, but the
// tag is a bounded sentinel ("<forbidden>") rather than the arbitrary
// caller-supplied route value — so an attacker posting random method
// names cannot blow up the `method` tag cardinality.
var key = SeedKey();
var method = SeedMethod(1, "knownMethod", "return 1;");
using var collector = new InboundApiRequestCounterCollector();
using var host = await BuildHostAsync(key, method);
var client = host.GetTestClient();
var response = await client.SendAsync(BuildPost("totally-made-up-name", "{}"));
Assert.Equal(HttpStatusCode.Forbidden, response.StatusCode);
var measurements = collector.Measurements;
// Cardinality safety: the arbitrary route value is never used as a tag.
Assert.DoesNotContain(measurements, m => m.Method == "totally-made-up-name");
// The failure path counts the request against the bounded sentinel.
Assert.Contains(measurements, m => m.Method == "<forbidden>" && m.Value == 1);
}
/// <summary>
/// Captures <c>scadabridge.inbound_api.requests</c> measurements (value + the
/// <c>method</c> tag) via a <see cref="MeterListener"/> for the duration of a test.
/// </summary>
private sealed class InboundApiRequestCounterCollector : IDisposable
{
private readonly MeterListener _listener;
private readonly List<(long Value, string? Method)> _measurements = new();
private readonly object _gate = new();
public InboundApiRequestCounterCollector()
{
_listener = new MeterListener
{
InstrumentPublished = (instrument, listener) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
&& instrument.Name == "scadabridge.inbound_api.requests")
{
listener.EnableMeasurementEvents(instrument);
}
},
};
_listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
{
string? method = null;
foreach (var tag in tags)
{
if (tag.Key == "method")
{
method = tag.Value as string;
}
}
lock (_gate)
{
_measurements.Add((value, method));
}
});
_listener.Start();
}
public IReadOnlyList<(long Value, string? Method)> Measurements
{
get
{
lock (_gate)
{
return _measurements.ToList();
}
}
}
public void Dispose() => _listener.Dispose();
}
private static HttpRequestMessage BuildPost(string methodName, string body)
{
var request = new HttpRequestMessage(HttpMethod.Post, "/api/" + methodName)
@@ -0,0 +1,211 @@
using System.Diagnostics.Metrics;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
namespace ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests;
/// <summary>
/// WP-14 (telemetry follow-on): verifies the cached buffered-message counter that
/// backs the <c>scadabridge.store_and_forward.queue.depth</c> observable gauge tracks
/// the live (Pending) queue across the existing enqueue / drain / park / requeue paths,
/// and that the sync gauge callback reports it.
///
/// The gauge is read the way the OpenTelemetry collector reads it — via a
/// <see cref="MeterListener"/> that forces an observation (the callback is synchronous
/// and does no I/O, which is the whole point of caching the count). <see cref="StartAsync"/>
/// seeds the counter from storage and registers the provider against this service
/// instance, so the gauge resolves to this test's counter.
/// </summary>
public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable
{
private readonly SqliteConnection _keepAlive;
private readonly StoreAndForwardStorage _storage;
private readonly StoreAndForwardService _service;
public QueueDepthGaugeTests()
{
var dbName = $"QueueDepthTests_{Guid.NewGuid():N}";
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
_keepAlive = new SqliteConnection(connStr);
_keepAlive.Open();
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
var options = new StoreAndForwardOptions
{
DefaultRetryInterval = TimeSpan.Zero,
DefaultMaxRetries = 3,
// Long interval so no background sweep fires on its own during the test;
// sweeps are driven explicitly via RetryPendingMessagesAsync.
RetryTimerInterval = TimeSpan.FromMinutes(10)
};
_service = new StoreAndForwardService(
_storage, options, NullLogger<StoreAndForwardService>.Instance);
}
public async Task InitializeAsync()
{
await _storage.InitializeAsync();
// StartAsync seeds _bufferedCount from the (empty) store and registers the
// queue-depth provider against this service instance.
await _service.StartAsync();
}
public async Task DisposeAsync() => await _service.StopAsync();
public void Dispose() => _keepAlive.Dispose();
/// <summary>
/// Reads the current value of the <c>scadabridge.store_and_forward.queue.depth</c>
/// gauge by forcing a synchronous observation through a transient MeterListener —
/// exactly the path the Prometheus/OTLP collector exercises on each scrape.
/// </summary>
private static long ReadQueueDepthGauge()
{
long observed = -1;
using var listener = new MeterListener
{
InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
instrument.Name == "scadabridge.store_and_forward.queue.depth")
{
l.EnableMeasurementEvents(instrument);
}
}
};
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
listener.Start();
listener.RecordObservableInstruments();
return observed;
}
[Fact]
public async Task Gauge_TracksBufferedDepth_AcrossEnqueueDrainAndPark()
{
// Empty store seeded at StartAsync → gauge reports 0.
Assert.Equal(0, ReadQueueDepthGauge());
// A handler that fails transiently so each enqueue buffers a Pending row
// (immediate attempt 0 throws → BufferAsync → +1).
var deliver = false;
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
_ =>
{
if (!deliver) throw new HttpRequestException("transient");
return Task.FromResult(true);
});
// Enqueue 3 → cached depth = 3 → gauge reports 3.
for (var i = 0; i < 3; i++)
{
var r = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
Assert.True(r.WasBuffered);
}
Assert.Equal(3, ReadQueueDepthGauge());
// Drain: handler now succeeds → the retry sweep removes all 3 Pending rows → depth 0.
deliver = true;
await _service.RetryPendingMessagesAsync();
Assert.Equal(0, ReadQueueDepthGauge());
// Park path: buffer one more, then make it park (maxRetries:1 parks after one
// sweep). Pending→Parked leaves the live queue → depth back to 0.
deliver = false;
var parkResult = await _service.EnqueueAsync(
StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
Assert.True(parkResult.WasBuffered);
Assert.Equal(1, ReadQueueDepthGauge());
await _service.RetryPendingMessagesAsync();
var parked = await _storage.GetMessageByIdAsync(parkResult.MessageId);
Assert.Equal(StoreAndForwardMessageStatus.Parked, parked!.Status);
Assert.Equal(0, ReadQueueDepthGauge());
// Operator requeue: Parked→Pending re-adds to the live queue → depth 1.
Assert.True(await _service.RetryParkedMessageAsync(parkResult.MessageId));
Assert.Equal(1, ReadQueueDepthGauge());
}
[Fact]
public async Task Gauge_SeedsFromExistingPendingRows_OnStart()
{
// Pre-seed two Pending rows directly in storage *before* a fresh service starts,
// simulating a process restart over a non-empty buffer. StartAsync must seed the
// cached counter from the store so the gauge does not under-report on restart.
await _storage.EnqueueAsync(new StoreAndForwardMessage
{
Id = Guid.NewGuid().ToString("N"),
Category = StoreAndForwardCategory.ExternalSystem,
Target = "api",
PayloadJson = "{}",
Status = StoreAndForwardMessageStatus.Pending,
CreatedAt = DateTimeOffset.UtcNow,
MaxRetries = 3
});
await _storage.EnqueueAsync(new StoreAndForwardMessage
{
Id = Guid.NewGuid().ToString("N"),
Category = StoreAndForwardCategory.Notification,
Target = "list",
PayloadJson = "{}",
Status = StoreAndForwardMessageStatus.Pending,
CreatedAt = DateTimeOffset.UtcNow,
MaxRetries = 3
});
var fresh = new StoreAndForwardService(
_storage,
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
NullLogger<StoreAndForwardService>.Instance);
try
{
await fresh.StartAsync();
// The fresh service registered itself as the global provider and seeded 2.
Assert.Equal(2, ReadQueueDepthGauge());
}
finally
{
await fresh.StopAsync();
}
}
/// <summary>
/// Review finding (FINDING 1): the startup seed must ADD to whatever the counter
/// already holds, not overwrite it. A concurrent <c>BufferAsync</c> can
/// <c>Interlocked.Increment</c> <c>_bufferedCount</c> in the window between
/// <c>StartAsync</c>'s async <c>COUNT(*)</c> returning and the seed running; with an
/// <c>Interlocked.Exchange</c> seed that increment would be clobbered (lost +1). This
/// pre-increments the in-memory counter (standing in for that concurrent enqueue),
/// then starts the service over an empty store and asserts the pre-increment survives.
/// </summary>
[Fact]
public async Task Gauge_SeedAddsToConcurrentPreSeedIncrement_NotClobber()
{
// Store is empty (StartAsync's pending COUNT(*) = 0), so the only contribution
// is the simulated concurrent pre-seed enqueue increment.
var fresh = new StoreAndForwardService(
_storage,
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
NullLogger<StoreAndForwardService>.Instance);
// Stand in for a BufferAsync increment that landed before StartAsync seeded.
fresh.TestOnly_IncrementBufferedCount();
try
{
await fresh.StartAsync();
// Add(0 seed) over the pre-existing +1 → 1. An Exchange(0) seed would clobber
// it to 0, losing the concurrent enqueue — the bug this fix prevents.
Assert.Equal(1, ReadQueueDepthGauge());
}
finally
{
await fresh.StopAsync();
}
}
}