Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 145d2668e2 | |||
| 9668a4e84a | |||
| 6dbbc7ad04 | |||
| aac59c9fae | |||
| 9bca6aae61 | |||
| 7d16f8f275 | |||
| ccf43312e8 | |||
| a5f8651b0f | |||
| 15a626390b | |||
| 782fb73015 | |||
| 547b685a42 | |||
| 877f2e200b | |||
| c41cb41c7b | |||
| fe25ac3e51 | |||
| bbc9f09268 | |||
| 43f5886024 | |||
| f743ffaad2 | |||
| b3070c0bda | |||
| 20a31835cf | |||
| 59dca0d5fd |
@@ -75,8 +75,11 @@
|
||||
<PackageVersion Include="ZB.MOM.WW.Health" Version="0.1.0" />
|
||||
<PackageVersion Include="ZB.MOM.WW.Health.Akka" Version="0.1.0" />
|
||||
<PackageVersion Include="ZB.MOM.WW.Health.EntityFrameworkCore" Version="0.1.0" />
|
||||
<PackageVersion Include="ZB.MOM.WW.Telemetry" Version="0.1.0" />
|
||||
<PackageVersion Include="ZB.MOM.WW.Telemetry.Serilog" Version="0.1.0" />
|
||||
<PackageVersion Include="ZB.MOM.WW.MxGateway.Client" Version="0.1.0" />
|
||||
<PackageVersion Include="ZB.MOM.WW.MxGateway.Contracts" Version="0.1.0" />
|
||||
<PackageVersion Include="ZB.MOM.WW.Configuration" Version="0.1.0" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
"NodeHostname": "scadabridge-site-a-a",
|
||||
"SiteId": "site-a",
|
||||
"RemotingPort": 8082,
|
||||
"GrpcPort": 8083
|
||||
"GrpcPort": 8083,
|
||||
"MetricsPort": 8084
|
||||
},
|
||||
"Cluster": {
|
||||
"SeedNodes": [
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
"NodeHostname": "scadabridge-site-a-b",
|
||||
"SiteId": "site-a",
|
||||
"RemotingPort": 8082,
|
||||
"GrpcPort": 8083
|
||||
"GrpcPort": 8083,
|
||||
"MetricsPort": 8084
|
||||
},
|
||||
"Cluster": {
|
||||
"SeedNodes": [
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
"NodeHostname": "scadabridge-site-b-a",
|
||||
"SiteId": "site-b",
|
||||
"RemotingPort": 8082,
|
||||
"GrpcPort": 8083
|
||||
"GrpcPort": 8083,
|
||||
"MetricsPort": 8084
|
||||
},
|
||||
"Cluster": {
|
||||
"SeedNodes": [
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
"NodeHostname": "scadabridge-site-b-b",
|
||||
"SiteId": "site-b",
|
||||
"RemotingPort": 8082,
|
||||
"GrpcPort": 8083
|
||||
"GrpcPort": 8083,
|
||||
"MetricsPort": 8084
|
||||
},
|
||||
"Cluster": {
|
||||
"SeedNodes": [
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
"NodeHostname": "scadabridge-site-c-a",
|
||||
"SiteId": "site-c",
|
||||
"RemotingPort": 8082,
|
||||
"GrpcPort": 8083
|
||||
"GrpcPort": 8083,
|
||||
"MetricsPort": 8084
|
||||
},
|
||||
"Cluster": {
|
||||
"SeedNodes": [
|
||||
|
||||
@@ -6,7 +6,8 @@
|
||||
"NodeHostname": "scadabridge-site-c-b",
|
||||
"SiteId": "site-c",
|
||||
"RemotingPort": 8082,
|
||||
"GrpcPort": 8083
|
||||
"GrpcPort": 8083,
|
||||
"MetricsPort": 8084
|
||||
},
|
||||
"Cluster": {
|
||||
"SeedNodes": [
|
||||
|
||||
@@ -18,6 +18,9 @@
|
||||
<package pattern="ZB.MOM.WW.MxGateway.*" />
|
||||
<package pattern="ZB.MOM.WW.Health" />
|
||||
<package pattern="ZB.MOM.WW.Health.*" />
|
||||
<package pattern="ZB.MOM.WW.Telemetry" />
|
||||
<package pattern="ZB.MOM.WW.Telemetry.*" />
|
||||
<package pattern="ZB.MOM.WW.Configuration" />
|
||||
</packageSource>
|
||||
</packageSourceMapping>
|
||||
<!--
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.Configuration;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
|
||||
|
||||
@@ -13,7 +13,7 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
|
||||
/// drop in-flight investigations, too long would defeat the partition-switch
|
||||
/// purge's purpose.
|
||||
/// </summary>
|
||||
public sealed class AuditLogOptionsValidator : IValidateOptions<AuditLogOptions>
|
||||
public sealed class AuditLogOptionsValidator : OptionsValidatorBase<AuditLogOptions>
|
||||
{
|
||||
/// <summary>Inclusive lower bound for <see cref="AuditLogOptions.RetentionDays"/>.</summary>
|
||||
public const int MinRetentionDays = 30;
|
||||
@@ -28,43 +28,29 @@ public sealed class AuditLogOptionsValidator : IValidateOptions<AuditLogOptions>
|
||||
public const int MaxInboundMaxBytes = 16_777_216;
|
||||
|
||||
/// <inheritdoc />
|
||||
public ValidateOptionsResult Validate(string? name, AuditLogOptions options)
|
||||
protected override void Validate(ValidationBuilder builder, AuditLogOptions options)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
builder.RequireThat(options.DefaultCapBytes > 0,
|
||||
$"AuditLog:{nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}) " +
|
||||
"must be > 0; it drives payload-summary truncation in audit writers.");
|
||||
|
||||
var failures = new List<string>();
|
||||
builder.RequireThat(options.ErrorCapBytes >= options.DefaultCapBytes,
|
||||
$"AuditLog:{nameof(AuditLogOptions.ErrorCapBytes)} ({options.ErrorCapBytes}) " +
|
||||
$"must be >= {nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}); " +
|
||||
"the error-row cap is intended to capture more detail than the happy-path summary.");
|
||||
|
||||
if (options.DefaultCapBytes <= 0)
|
||||
{
|
||||
failures.Add(
|
||||
$"AuditLog:{nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}) " +
|
||||
"must be > 0; it drives payload-summary truncation in audit writers.");
|
||||
}
|
||||
// Valid when RetentionDays is within [Min, Max] inclusive. The De Morgan'd
|
||||
// guard !(below Min OR above Max) is equivalent to (>= Min AND <= Max).
|
||||
builder.RequireThat(
|
||||
!(options.RetentionDays < MinRetentionDays || options.RetentionDays > MaxRetentionDays),
|
||||
$"AuditLog:{nameof(AuditLogOptions.RetentionDays)} ({options.RetentionDays}) " +
|
||||
$"must be in [{MinRetentionDays}, {MaxRetentionDays}] days.");
|
||||
|
||||
if (options.ErrorCapBytes < options.DefaultCapBytes)
|
||||
{
|
||||
failures.Add(
|
||||
$"AuditLog:{nameof(AuditLogOptions.ErrorCapBytes)} ({options.ErrorCapBytes}) " +
|
||||
$"must be >= {nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}); " +
|
||||
"the error-row cap is intended to capture more detail than the happy-path summary.");
|
||||
}
|
||||
|
||||
if (options.RetentionDays < MinRetentionDays || options.RetentionDays > MaxRetentionDays)
|
||||
{
|
||||
failures.Add(
|
||||
$"AuditLog:{nameof(AuditLogOptions.RetentionDays)} ({options.RetentionDays}) " +
|
||||
$"must be in [{MinRetentionDays}, {MaxRetentionDays}] days.");
|
||||
}
|
||||
|
||||
if (options.InboundMaxBytes < MinInboundMaxBytes || options.InboundMaxBytes > MaxInboundMaxBytes)
|
||||
{
|
||||
failures.Add(
|
||||
$"AuditLog:{nameof(AuditLogOptions.InboundMaxBytes)} ({options.InboundMaxBytes}) " +
|
||||
$"must be in [{MinInboundMaxBytes}, {MaxInboundMaxBytes}] bytes.");
|
||||
}
|
||||
|
||||
return failures.Count == 0
|
||||
? ValidateOptionsResult.Success
|
||||
: ValidateOptionsResult.Fail(failures);
|
||||
// Valid when InboundMaxBytes is within [Min, Max] inclusive. The De Morgan'd
|
||||
// guard !(below Min OR above Max) is equivalent to (>= Min AND <= Max).
|
||||
builder.RequireThat(
|
||||
!(options.InboundMaxBytes < MinInboundMaxBytes || options.InboundMaxBytes > MaxInboundMaxBytes),
|
||||
$"AuditLog:{nameof(AuditLogOptions.InboundMaxBytes)} ({options.InboundMaxBytes}) " +
|
||||
$"must be in [{MinInboundMaxBytes}, {MaxInboundMaxBytes}] bytes.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.Configuration;
|
||||
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
|
||||
using ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
|
||||
using ZB.MOM.WW.ScadaBridge.AuditLog.Payload;
|
||||
@@ -62,10 +62,12 @@ public static class ServiceCollectionExtensions
|
||||
ArgumentNullException.ThrowIfNull(config);
|
||||
|
||||
// M1: top-level AuditLogOptions + validator (redaction policy, payload caps, etc.).
|
||||
services.AddOptions<AuditLogOptions>()
|
||||
.Bind(config.GetSection(ConfigSectionName))
|
||||
.ValidateOnStart();
|
||||
services.AddSingleton<IValidateOptions<AuditLogOptions>, AuditLogOptionsValidator>();
|
||||
// Collapsed onto the shared ZB.MOM.WW.Configuration helper: it binds the
|
||||
// "AuditLog" section, registers the validator, and enables ValidateOnStart in
|
||||
// one call. Same section path as before; AddAuditLog is call-once per
|
||||
// collection, and the helper's TryAddEnumerable is idempotent for the
|
||||
// validator (a strict improvement over the previous AddSingleton).
|
||||
services.AddValidatedOptions<AuditLogOptions, AuditLogOptionsValidator>(config, ConfigSectionName);
|
||||
|
||||
// M5 Bundle A: payload filter — truncates oversized RequestSummary /
|
||||
// ResponseSummary / ErrorDetail / Extra fields between event
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
|
||||
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.Configuration;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
|
||||
|
||||
@@ -10,7 +10,7 @@ namespace ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
|
||||
/// Registered with <c>ValidateOnStart()</c> so a bad <c>appsettings.json</c>
|
||||
/// fails fast at boot rather than failing far from the cause.
|
||||
/// </summary>
|
||||
public sealed class ClusterOptionsValidator : IValidateOptions<ClusterOptions>
|
||||
public sealed class ClusterOptionsValidator : OptionsValidatorBase<ClusterOptions>
|
||||
{
|
||||
/// <summary>Split-brain resolver strategies safe for ScadaBridge's two-node clusters.</summary>
|
||||
private static readonly HashSet<string> AllowedStrategies = new(StringComparer.OrdinalIgnoreCase)
|
||||
@@ -19,77 +19,51 @@ public sealed class ClusterOptionsValidator : IValidateOptions<ClusterOptions>
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Validates the cluster options, returning a failure result if any critical settings are misconfigured.
|
||||
/// Validates the cluster options, recording a failure if any critical settings are misconfigured.
|
||||
/// </summary>
|
||||
/// <param name="name">Named options instance name (unused; all instances are validated identically).</param>
|
||||
/// <param name="builder">The accumulator to record failures on.</param>
|
||||
/// <param name="options">The cluster options to validate.</param>
|
||||
public ValidateOptionsResult Validate(string? name, ClusterOptions options)
|
||||
protected override void Validate(ValidationBuilder builder, ClusterOptions options)
|
||||
{
|
||||
var failures = new List<string>();
|
||||
// CI-012: design doc states "both nodes are seed nodes — each node lists
|
||||
// both itself and its partner" so a properly-configured deployment lists
|
||||
// two. Accepting a single-seed configuration silently defeats the
|
||||
// "no startup ordering dependency" guarantee called out by
|
||||
// Component-ClusterInfrastructure.md (Node Configuration).
|
||||
builder.RequireThat(options.SeedNodes is not null && options.SeedNodes.Count >= 2,
|
||||
"ClusterOptions.SeedNodes must contain at least 2 seed nodes "
|
||||
+ "(Component-ClusterInfrastructure.md → Node Configuration: "
|
||||
+ "both nodes are seed nodes); a single-seed configuration defeats "
|
||||
+ "the no-startup-ordering-dependency guarantee.");
|
||||
|
||||
if (options.SeedNodes is null || options.SeedNodes.Count < 2)
|
||||
{
|
||||
// CI-012: design doc states "both nodes are seed nodes — each node lists
|
||||
// both itself and its partner" so a properly-configured deployment lists
|
||||
// two. Accepting a single-seed configuration silently defeats the
|
||||
// "no startup ordering dependency" guarantee called out by
|
||||
// Component-ClusterInfrastructure.md (Node Configuration).
|
||||
failures.Add(
|
||||
"ClusterOptions.SeedNodes must contain at least 2 seed nodes "
|
||||
+ "(Component-ClusterInfrastructure.md → Node Configuration: "
|
||||
+ "both nodes are seed nodes); a single-seed configuration defeats "
|
||||
+ "the no-startup-ordering-dependency guarantee.");
|
||||
}
|
||||
builder.RequireThat(
|
||||
!string.IsNullOrWhiteSpace(options.SplitBrainResolverStrategy)
|
||||
&& AllowedStrategies.Contains(options.SplitBrainResolverStrategy),
|
||||
$"ClusterOptions.SplitBrainResolverStrategy must be 'keep-oldest' for a two-node cluster; " +
|
||||
$"'{options.SplitBrainResolverStrategy}' would risk a total cluster shutdown on a partition.");
|
||||
|
||||
if (string.IsNullOrWhiteSpace(options.SplitBrainResolverStrategy)
|
||||
|| !AllowedStrategies.Contains(options.SplitBrainResolverStrategy))
|
||||
{
|
||||
failures.Add(
|
||||
$"ClusterOptions.SplitBrainResolverStrategy must be 'keep-oldest' for a two-node cluster; " +
|
||||
$"'{options.SplitBrainResolverStrategy}' would risk a total cluster shutdown on a partition.");
|
||||
}
|
||||
builder.RequireThat(options.MinNrOfMembers == 1,
|
||||
$"ClusterOptions.MinNrOfMembers must be 1 (was {options.MinNrOfMembers}); " +
|
||||
"any other value blocks the cluster singleton after failover and halts all data collection.");
|
||||
|
||||
if (options.MinNrOfMembers != 1)
|
||||
{
|
||||
failures.Add(
|
||||
$"ClusterOptions.MinNrOfMembers must be 1 (was {options.MinNrOfMembers}); " +
|
||||
"any other value blocks the cluster singleton after failover and halts all data collection.");
|
||||
}
|
||||
builder.RequireThat(options.StableAfter > TimeSpan.Zero,
|
||||
"ClusterOptions.StableAfter must be a positive duration.");
|
||||
|
||||
if (options.StableAfter <= TimeSpan.Zero)
|
||||
{
|
||||
failures.Add("ClusterOptions.StableAfter must be a positive duration.");
|
||||
}
|
||||
builder.RequireThat(options.HeartbeatInterval > TimeSpan.Zero,
|
||||
"ClusterOptions.HeartbeatInterval must be a positive duration.");
|
||||
|
||||
if (options.HeartbeatInterval <= TimeSpan.Zero)
|
||||
{
|
||||
failures.Add("ClusterOptions.HeartbeatInterval must be a positive duration.");
|
||||
}
|
||||
builder.RequireThat(options.FailureDetectionThreshold > TimeSpan.Zero,
|
||||
"ClusterOptions.FailureDetectionThreshold must be a positive duration.");
|
||||
|
||||
if (options.FailureDetectionThreshold <= TimeSpan.Zero)
|
||||
{
|
||||
failures.Add("ClusterOptions.FailureDetectionThreshold must be a positive duration.");
|
||||
}
|
||||
builder.RequireThat(options.HeartbeatInterval < options.FailureDetectionThreshold,
|
||||
$"ClusterOptions.HeartbeatInterval ({options.HeartbeatInterval}) must be well below " +
|
||||
$"FailureDetectionThreshold ({options.FailureDetectionThreshold}); otherwise nodes are " +
|
||||
"declared unreachable before a heartbeat can arrive.");
|
||||
|
||||
if (options.HeartbeatInterval >= options.FailureDetectionThreshold)
|
||||
{
|
||||
failures.Add(
|
||||
$"ClusterOptions.HeartbeatInterval ({options.HeartbeatInterval}) must be well below " +
|
||||
$"FailureDetectionThreshold ({options.FailureDetectionThreshold}); otherwise nodes are " +
|
||||
"declared unreachable before a heartbeat can arrive.");
|
||||
}
|
||||
|
||||
if (!options.DownIfAlone)
|
||||
{
|
||||
failures.Add(
|
||||
"ClusterOptions.DownIfAlone must be true for the keep-oldest resolver "
|
||||
+ "(Component-ClusterInfrastructure.md → Split-Brain Resolution); with it false the "
|
||||
+ "oldest node can run as an isolated single-node cluster during a partition while the "
|
||||
+ "younger node forms its own, producing two live clusters.");
|
||||
}
|
||||
|
||||
return failures.Count > 0
|
||||
? ValidateOptionsResult.Fail(failures)
|
||||
: ValidateOptionsResult.Success;
|
||||
builder.RequireThat(options.DownIfAlone,
|
||||
"ClusterOptions.DownIfAlone must be true for the keep-oldest resolver "
|
||||
+ "(Component-ClusterInfrastructure.md → Split-Brain Resolution); with it false the "
|
||||
+ "oldest node can run as an isolated single-node cluster during a partition while the "
|
||||
+ "younger node forms its own, producing two live clusters.");
|
||||
}
|
||||
}
|
||||
|
||||
+1
@@ -10,6 +10,7 @@
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
|
||||
/// <summary>
|
||||
/// Central <see cref="Meter"/> + instrument definitions for ScadaBridge's application
|
||||
/// telemetry, modelled on OtOpcUa's <c>OtOpcUaTelemetry</c>. Modules emit through these
|
||||
/// pre-created instruments so a single OpenTelemetry / Prometheus binding in
|
||||
/// <c>Host</c> (registered via <c>AddZbTelemetry</c> with this meter named in
|
||||
/// <c>ZbTelemetryOptions.Meters</c>) catches everything. No exporter is required —
|
||||
/// instruments are no-op until a listener attaches, so tests and dev hosts pay nothing
|
||||
/// for instrumentation that nobody scrapes.
|
||||
///
|
||||
/// Instrument names follow the OpenTelemetry semantic convention pattern
|
||||
/// <c>scadabridge.<subsystem>.<event></c>. This task defines the instruments and
|
||||
/// their emit helpers; four later tasks wire the actual emit points. Until those land the
|
||||
/// helpers are dormant but inert — calling them is safe and simply records against a meter
|
||||
/// that nothing observes.
|
||||
/// </summary>
|
||||
public static class ScadaBridgeTelemetry
|
||||
{
|
||||
/// <summary>The meter name registered with OTel via <c>ZbTelemetryOptions.Meters</c>.</summary>
|
||||
public const string MeterName = "ZB.MOM.WW.ScadaBridge";
|
||||
|
||||
/// <summary>Singleton <see cref="Meter"/> all instruments hang off.</summary>
|
||||
private static readonly Meter Meter = new(MeterName);
|
||||
|
||||
// ---------------- Counters ----------------
|
||||
|
||||
/// <summary>Incremented each time a deployment is successfully applied.</summary>
|
||||
private static readonly Counter<long> _deploymentsApplied =
|
||||
Meter.CreateCounter<long>("scadabridge.deployments.applied", unit: "1",
|
||||
description: "Deployments applied.");
|
||||
|
||||
/// <summary>Incremented for each inbound API request, tagged with the API method.</summary>
|
||||
private static readonly Counter<long> _inboundApiRequests =
|
||||
Meter.CreateCounter<long>("scadabridge.inbound_api.requests", unit: "1",
|
||||
description: "Inbound API requests, tagged by method.");
|
||||
|
||||
// ---------------- Observable gauges ----------------
|
||||
|
||||
/// <summary>Current count of open site connections, mutated via <see cref="Interlocked"/>.</summary>
|
||||
private static long _siteConnectionsUp;
|
||||
|
||||
/// <summary>Provider that yields the live StoreAndForward queue depth; set by a later task.</summary>
|
||||
private static Func<long>? _queueDepthProvider;
|
||||
|
||||
#pragma warning disable IDE0052 // Held to keep the observable gauges alive for the meter's lifetime.
|
||||
/// <summary>Gauge reporting the number of currently open site connections.</summary>
|
||||
private static readonly ObservableGauge<long> _siteConnectionUp =
|
||||
Meter.CreateObservableGauge<long>("scadabridge.site.connection.up",
|
||||
() => Interlocked.Read(ref _siteConnectionsUp),
|
||||
description: "Number of currently open site connections.");
|
||||
|
||||
/// <summary>Gauge reporting the current StoreAndForward queue depth via the registered provider.</summary>
|
||||
private static readonly ObservableGauge<long> _storeAndForwardQueueDepth =
|
||||
Meter.CreateObservableGauge<long>("scadabridge.store_and_forward.queue.depth",
|
||||
() => Volatile.Read(ref _queueDepthProvider) is { } p ? p() : 0L,
|
||||
unit: "items",
|
||||
description: "Current StoreAndForward queue depth.");
|
||||
#pragma warning restore IDE0052
|
||||
|
||||
// ---------------- Emit helpers ----------------
|
||||
|
||||
/// <summary>Records that a deployment was applied.</summary>
|
||||
public static void RecordDeploymentApplied() => _deploymentsApplied.Add(1);
|
||||
|
||||
/// <summary>Records an inbound API request for the given <paramref name="method"/>.</summary>
|
||||
/// <param name="method">The API method the request targeted.</param>
|
||||
public static void RecordInboundApiRequest(string method) =>
|
||||
_inboundApiRequests.Add(1, new KeyValuePair<string, object?>("method", method));
|
||||
|
||||
/// <summary>Records that a site connection opened (increments the up-count gauge).</summary>
|
||||
public static void SiteConnectionOpened() => Interlocked.Increment(ref _siteConnectionsUp);
|
||||
|
||||
/// <summary>Records that a site connection closed (decrements the up-count gauge).</summary>
|
||||
public static void SiteConnectionClosed() => Interlocked.Decrement(ref _siteConnectionsUp);
|
||||
|
||||
/// <summary>
|
||||
/// Registers the provider the StoreAndForward queue-depth gauge reads on each observation.
|
||||
/// A later task supplies a provider that reads the real StoreAndForward depth. A null
|
||||
/// provider is ignored so the gauge falls back to reporting 0.
|
||||
/// </summary>
|
||||
/// <param name="provider">A callback returning the current queue depth.</param>
|
||||
public static void SetQueueDepthProvider(Func<long> provider)
|
||||
{
|
||||
if (provider is null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
Volatile.Write(ref _queueDepthProvider, provider);
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using GrpcStatus = Grpc.Core.Status;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
||||
@@ -264,6 +265,14 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
||||
"Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
|
||||
request.CorrelationId, request.InstanceUniqueName, subscriptionId);
|
||||
|
||||
// Telemetry follow-on: the connection is now fully established (Subscribe
|
||||
// succeeded, so no leak via the catch above). Count it up here and balance
|
||||
// it in the finally below so the scadabridge.site.connection.up gauge is
|
||||
// decremented on EVERY exit path — normal completion, client-cancel /
|
||||
// duplicate-replacement (OperationCanceledException), server shutdown
|
||||
// (CancelAllStreams -> Cts.Cancel), and any other exception — guaranteeing
|
||||
// exactly one Closed per Opened and a gauge that never drifts up.
|
||||
ScadaBridgeTelemetry.SiteConnectionOpened();
|
||||
try
|
||||
{
|
||||
await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token))
|
||||
@@ -277,6 +286,7 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
||||
}
|
||||
finally
|
||||
{
|
||||
ScadaBridgeTelemetry.SiteConnectionClosed();
|
||||
_streamSubscriber.RemoveSubscriber(relayActor);
|
||||
_actorSystem!.Stop(relayActor);
|
||||
channel.Writer.TryComplete();
|
||||
|
||||
@@ -7,6 +7,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
|
||||
@@ -244,6 +245,16 @@ public class DeploymentService
|
||||
|
||||
if (response.Status == DeploymentStatus.Success)
|
||||
{
|
||||
// Telemetry: one instance deployment successfully applied to a
|
||||
// site. Counted once per successful deploy operation (the unit
|
||||
// of scadabridge.deployments.applied — one DeployInstanceAsync
|
||||
// deploys exactly one instance to one site). Emitted only on this
|
||||
// confirmed-Success path, so failures, timeouts/retries (the
|
||||
// catch block), and the reconciliation path (which recovers a
|
||||
// PRIOR timed-out apply rather than performing a fresh one) do
|
||||
// not increment it.
|
||||
ScadaBridgeTelemetry.RecordDeploymentApplied();
|
||||
|
||||
// The site has applied the deployment. The post-success
|
||||
// persistence below is best-effort: a failure here must be
|
||||
// logged loudly for operator reconciliation but must not flip
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.Configuration;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
||||
|
||||
@@ -14,51 +14,40 @@ namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
||||
/// <c>ValidateOnStart()</c> so a bad <c>ScadaBridge:HealthMonitoring</c> section
|
||||
/// fails fast at boot with a clear, key-naming message.
|
||||
/// </summary>
|
||||
public sealed class HealthMonitoringOptionsValidator : IValidateOptions<HealthMonitoringOptions>
|
||||
public sealed class HealthMonitoringOptionsValidator : OptionsValidatorBase<HealthMonitoringOptions>
|
||||
{
|
||||
/// <summary>
|
||||
/// Validates the health monitoring options, returning a failure result if any interval values are non-positive.
|
||||
/// Validates the health monitoring options, recording a failure if any interval values are non-positive.
|
||||
/// </summary>
|
||||
/// <param name="name">Named options instance name (unused).</param>
|
||||
/// <param name="builder">The accumulator to record failures on.</param>
|
||||
/// <param name="options">The health monitoring options to validate.</param>
|
||||
public ValidateOptionsResult Validate(string? name, HealthMonitoringOptions options)
|
||||
protected override void Validate(ValidationBuilder builder, HealthMonitoringOptions options)
|
||||
{
|
||||
var failures = new List<string>();
|
||||
builder.RequireThat(options.ReportInterval > TimeSpan.Zero,
|
||||
$"ScadaBridge:HealthMonitoring:ReportInterval must be a positive duration " +
|
||||
$"(was {options.ReportInterval}); it is used directly as a PeriodicTimer period.");
|
||||
|
||||
if (options.ReportInterval <= TimeSpan.Zero)
|
||||
{
|
||||
failures.Add(
|
||||
$"ScadaBridge:HealthMonitoring:ReportInterval must be a positive duration " +
|
||||
$"(was {options.ReportInterval}); it is used directly as a PeriodicTimer period.");
|
||||
}
|
||||
builder.RequireThat(options.OfflineTimeout > TimeSpan.Zero,
|
||||
$"ScadaBridge:HealthMonitoring:OfflineTimeout must be a positive duration " +
|
||||
$"(was {options.OfflineTimeout}); it drives the offline-check PeriodicTimer cadence.");
|
||||
|
||||
if (options.OfflineTimeout <= TimeSpan.Zero)
|
||||
{
|
||||
failures.Add(
|
||||
$"ScadaBridge:HealthMonitoring:OfflineTimeout must be a positive duration " +
|
||||
$"(was {options.OfflineTimeout}); it drives the offline-check PeriodicTimer cadence.");
|
||||
}
|
||||
builder.RequireThat(options.CentralOfflineTimeout > TimeSpan.Zero,
|
||||
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout must be a positive duration " +
|
||||
$"(was {options.CentralOfflineTimeout}).");
|
||||
|
||||
if (options.CentralOfflineTimeout <= TimeSpan.Zero)
|
||||
{
|
||||
failures.Add(
|
||||
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout must be a positive duration " +
|
||||
$"(was {options.CentralOfflineTimeout}).");
|
||||
}
|
||||
|
||||
if (options.OfflineTimeout > TimeSpan.Zero
|
||||
&& options.CentralOfflineTimeout > TimeSpan.Zero
|
||||
&& options.CentralOfflineTimeout < options.OfflineTimeout)
|
||||
{
|
||||
failures.Add(
|
||||
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout ({options.CentralOfflineTimeout}) " +
|
||||
$"must be >= OfflineTimeout ({options.OfflineTimeout}): the synthetic 'central' site has " +
|
||||
"no heartbeat source and is fed only by the slower self-report loop, so it needs at " +
|
||||
"least as much offline grace as a real site.");
|
||||
}
|
||||
|
||||
return failures.Count > 0
|
||||
? ValidateOptionsResult.Fail(failures)
|
||||
: ValidateOptionsResult.Success;
|
||||
// Valid when CentralOfflineTimeout >= OfflineTimeout (both already
|
||||
// required to be positive above). The De Morgan'd guard !(both positive
|
||||
// AND Central < Offline) is true unless BOTH timeouts are positive and
|
||||
// Central is strictly smaller — so it stays silent when either field is
|
||||
// non-positive, leaving that failure to the dedicated positive-duration
|
||||
// checks above rather than double-firing here.
|
||||
builder.RequireThat(
|
||||
!(options.OfflineTimeout > TimeSpan.Zero
|
||||
&& options.CentralOfflineTimeout > TimeSpan.Zero
|
||||
&& options.CentralOfflineTimeout < options.OfflineTimeout),
|
||||
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout ({options.CentralOfflineTimeout}) " +
|
||||
$"must be >= OfflineTimeout ({options.OfflineTimeout}): the synthetic 'central' site has " +
|
||||
"no heartbeat source and is fed only by the slower self-report loop, so it needs at " +
|
||||
"least as much offline grace as a real site.");
|
||||
}
|
||||
}
|
||||
|
||||
+1
@@ -12,6 +12,7 @@
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using Serilog;
|
||||
using Serilog.Events;
|
||||
using ZB.MOM.WW.Telemetry.Serilog;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Host;
|
||||
|
||||
@@ -85,7 +86,8 @@ public static class LoggerConfigurationFactory
|
||||
.MinimumLevel.Is(minimumLevel)
|
||||
.Enrich.WithProperty("SiteId", siteId)
|
||||
.Enrich.WithProperty("NodeHostname", nodeHostname)
|
||||
.Enrich.WithProperty("NodeRole", nodeRole);
|
||||
.Enrich.WithProperty("NodeRole", nodeRole)
|
||||
.Enrich.With(new TraceContextEnricher());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -20,4 +20,11 @@ public class NodeOptions
|
||||
public int RemotingPort { get; set; } = 8081;
|
||||
/// <summary>Gets or sets the gRPC port for the site stream server.</summary>
|
||||
public int GrpcPort { get; set; } = 8083;
|
||||
/// <summary>
|
||||
/// HTTP/1.1 port serving the Prometheus /metrics scrape endpoint on site nodes.
|
||||
/// Defaults to 8084 — deliberately distinct from <see cref="RemotingPort"/> (8082)
|
||||
/// and <see cref="GrpcPort"/> (8083) so the Kestrel metrics listener never contends
|
||||
/// with the Akka remoting port a site node binds.
|
||||
/// </summary>
|
||||
public int MetricsPort { get; set; } = 8084;
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ using ZB.MOM.WW.ScadaBridge.Security;
|
||||
using ZB.MOM.WW.ScadaBridge.SiteCallAudit;
|
||||
using ZB.MOM.WW.ScadaBridge.TemplateEngine;
|
||||
using ZB.MOM.WW.ScadaBridge.Transport;
|
||||
using ZB.MOM.WW.Telemetry;
|
||||
using Serilog;
|
||||
|
||||
// SCADABRIDGE_CONFIG determines which role-specific config to load (Central or Site)
|
||||
@@ -248,6 +249,12 @@ try
|
||||
// All three are anonymous and use the canonical ZbHealthWriter JSON output.
|
||||
app.MapZbHealth();
|
||||
|
||||
// Observability — mount the always-on Prometheus /metrics scrape endpoint.
|
||||
// AddZbTelemetry (in SiteServiceRegistration.BindSharedOptions) wires the OTel
|
||||
// Resource + standard instrumentation + Prometheus exporter; this exposes them.
|
||||
// Requires endpoint routing (app.UseRouting() above).
|
||||
app.MapZbMetrics();
|
||||
|
||||
app.MapStaticAssets();
|
||||
app.MapCentralUI<ZB.MOM.WW.ScadaBridge.Host.Components.App>();
|
||||
app.MapInboundAPI();
|
||||
@@ -286,6 +293,13 @@ try
|
||||
// Read GrpcPort from config (NodeOptions already has default 8083)
|
||||
var grpcPort = configuration.GetValue<int>("ScadaBridge:Node:GrpcPort", 8083);
|
||||
|
||||
// Read MetricsPort from config (NodeOptions already has default 8084).
|
||||
// Separate HTTP/1.1 listener so a standard HTTP/1.1 Prometheus scraper can
|
||||
// reach /metrics; the gRPC port stays HTTP/2-only below. The default is
|
||||
// 8084 — distinct from RemotingPort (8082, Akka) and GrpcPort (8083) so the
|
||||
// metrics listener never collides with the Akka remoting port on site nodes.
|
||||
var metricsPort = configuration.GetValue<int>("ScadaBridge:Node:MetricsPort", 8084);
|
||||
|
||||
// Configure Kestrel for HTTP/2 only on the gRPC port
|
||||
builder.WebHost.ConfigureKestrel(options =>
|
||||
{
|
||||
@@ -293,6 +307,13 @@ try
|
||||
{
|
||||
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2;
|
||||
});
|
||||
|
||||
// Dedicated HTTP/1.1 (and HTTP/2) listener for the Prometheus /metrics
|
||||
// scrape endpoint, reachable by an HTTP/1.1 scraper.
|
||||
options.ListenAnyIP(metricsPort, listenOptions =>
|
||||
{
|
||||
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1AndHttp2;
|
||||
});
|
||||
});
|
||||
|
||||
// gRPC server registration
|
||||
@@ -304,6 +325,17 @@ try
|
||||
|
||||
var app = builder.Build();
|
||||
|
||||
// Endpoint routing middleware. The gRPC service mapping below and the
|
||||
// /metrics scrape endpoint both run on endpoint routing, so UseRouting()
|
||||
// must be present before the Map* calls on the site role.
|
||||
app.UseRouting();
|
||||
|
||||
// Observability — mount the always-on Prometheus /metrics scrape endpoint.
|
||||
// AddZbTelemetry (in SiteServiceRegistration.Configure → BindSharedOptions)
|
||||
// wires the OTel Resource + standard instrumentation + Prometheus exporter;
|
||||
// this exposes them on the site node too.
|
||||
app.MapZbMetrics();
|
||||
|
||||
// Map gRPC service — resolves the singleton SiteStreamGrpcServer from DI
|
||||
app.MapGrpcService<ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamGrpcServer>();
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ using ZB.MOM.WW.ScadaBridge.AuditLog;
|
||||
using ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
|
||||
using ZB.MOM.WW.ScadaBridge.Communication;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer;
|
||||
using ZB.MOM.WW.ScadaBridge.ExternalSystemGateway;
|
||||
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
||||
@@ -11,6 +12,7 @@ using ZB.MOM.WW.ScadaBridge.NotificationService;
|
||||
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
|
||||
using ZB.MOM.WW.ScadaBridge.SiteRuntime;
|
||||
using ZB.MOM.WW.ScadaBridge.StoreAndForward;
|
||||
using ZB.MOM.WW.Telemetry;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Host;
|
||||
|
||||
@@ -103,10 +105,16 @@ public static class SiteServiceRegistration
|
||||
public static void BindSharedOptions(IServiceCollection services, IConfiguration config)
|
||||
{
|
||||
services.Configure<NodeOptions>(config.GetSection("ScadaBridge:Node"));
|
||||
services.Configure<ClusterOptions>(config.GetSection("ScadaBridge:Cluster"));
|
||||
// Bind + eagerly validate: ClusterOptionsValidator is registered (TryAddEnumerable)
|
||||
// by the ClusterInfrastructure module, so chaining ValidateOnStart() here makes a bad
|
||||
// ScadaBridge:Cluster section fail fast at host build instead of lazily on first resolve.
|
||||
services.AddOptions<ClusterOptions>().Bind(config.GetSection("ScadaBridge:Cluster")).ValidateOnStart();
|
||||
services.Configure<DatabaseOptions>(config.GetSection("ScadaBridge:Database"));
|
||||
services.Configure<CommunicationOptions>(config.GetSection("ScadaBridge:Communication"));
|
||||
services.Configure<HealthMonitoringOptions>(config.GetSection("ScadaBridge:HealthMonitoring"));
|
||||
// Bind + eagerly validate: HealthMonitoringOptionsValidator is registered (TryAddEnumerable)
|
||||
// by the HealthMonitoring module, so chaining ValidateOnStart() here makes a bad
|
||||
// ScadaBridge:HealthMonitoring section fail fast at host build instead of lazily on first resolve.
|
||||
services.AddOptions<HealthMonitoringOptions>().Bind(config.GetSection("ScadaBridge:HealthMonitoring")).ValidateOnStart();
|
||||
services.Configure<NotificationOptions>(config.GetSection("ScadaBridge:Notification"));
|
||||
services.Configure<LoggingOptions>(config.GetSection("ScadaBridge:Logging"));
|
||||
|
||||
@@ -114,5 +122,26 @@ public static class SiteServiceRegistration
|
||||
// writers so they can stamp the SourceNode column. Registered here in
|
||||
// shared bootstrap because every node (central + site) needs it.
|
||||
services.AddSingleton<INodeIdentityProvider, NodeIdentityProvider>();
|
||||
|
||||
// Observability — shared ZB.MOM.WW.Telemetry. Registered in shared bootstrap so
|
||||
// BOTH the central and site composition roots wire the OTel Resource (the
|
||||
// service.name/site.id/node.role identity triple) + standard instrumentation +
|
||||
// the always-on Prometheus exporter. Mount the /metrics scrape endpoint per role
|
||||
// with app.MapZbMetrics(). The same `?? "central"` SiteId default Program.cs uses
|
||||
// is applied here so the Resource attribute matches the log-enricher value.
|
||||
// The application meter is named so OTel observes its instruments; emit points are
|
||||
// wired by follow-on tasks (the instruments are no-op until a listener attaches).
|
||||
services.AddZbTelemetry(o =>
|
||||
{
|
||||
o.ServiceName = "scadabridge";
|
||||
o.SiteId = config["ScadaBridge:Node:SiteId"] ?? "central";
|
||||
o.NodeRole = config["ScadaBridge:Node:Role"];
|
||||
o.Meters = [ScadaBridgeTelemetry.MeterName];
|
||||
if (Enum.TryParse<ZbExporter>(config["ScadaBridge:Telemetry:Exporter"], ignoreCase: true, out var exporter))
|
||||
o.Exporter = exporter;
|
||||
var otlp = config["ScadaBridge:Telemetry:OtlpEndpoint"];
|
||||
if (!string.IsNullOrWhiteSpace(otlp))
|
||||
o.OtlpEndpoint = otlp;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
using ZB.MOM.WW.Configuration;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Host;
|
||||
|
||||
/// <summary>
|
||||
@@ -10,77 +12,98 @@ public static class StartupValidator
|
||||
/// <param name="configuration">The application configuration to validate.</param>
|
||||
public static void Validate(IConfiguration configuration)
|
||||
{
|
||||
var errors = new List<string>();
|
||||
|
||||
// Resolve the same locals the original imperative validator used, so the
|
||||
// cross-field predicates below can close over them. ConfigPreflight.Require
|
||||
// passes config[key] to each predicate, but the cross-field rules ignore that
|
||||
// argument and read these resolved values instead — preserving the exact
|
||||
// conditions (and therefore the byte-identical failure messages and ordering)
|
||||
// of the original StartupValidator.
|
||||
var nodeSection = configuration.GetSection("ScadaBridge:Node");
|
||||
var role = nodeSection["Role"];
|
||||
if (string.IsNullOrEmpty(role) || (role != "Central" && role != "Site"))
|
||||
errors.Add("ScadaBridge:Node:Role must be 'Central' or 'Site'");
|
||||
|
||||
if (string.IsNullOrEmpty(nodeSection["NodeHostname"]))
|
||||
errors.Add("ScadaBridge:Node:NodeHostname is required");
|
||||
|
||||
var portStr = nodeSection["RemotingPort"];
|
||||
if (!int.TryParse(portStr, out var port) || port < 1 || port > 65535)
|
||||
errors.Add("ScadaBridge:Node:RemotingPort must be 1-65535");
|
||||
|
||||
if (role == "Site" && string.IsNullOrEmpty(nodeSection["SiteId"]))
|
||||
errors.Add("ScadaBridge:Node:SiteId is required for Site nodes");
|
||||
|
||||
if (role == "Central")
|
||||
{
|
||||
var dbSection = configuration.GetSection("ScadaBridge:Database");
|
||||
if (string.IsNullOrEmpty(dbSection["ConfigurationDb"]))
|
||||
errors.Add("ScadaBridge:Database:ConfigurationDb connection string required for Central");
|
||||
|
||||
var secSection = configuration.GetSection("ScadaBridge:Security");
|
||||
if (string.IsNullOrEmpty(secSection["LdapServer"]))
|
||||
errors.Add("ScadaBridge:Security:LdapServer required for Central");
|
||||
if (string.IsNullOrEmpty(secSection["JwtSigningKey"]))
|
||||
errors.Add("ScadaBridge:Security:JwtSigningKey required for Central");
|
||||
}
|
||||
bool portValid = int.TryParse(portStr, out var port) && port >= 1 && port <= 65535;
|
||||
|
||||
var seedNodes = configuration.GetSection("ScadaBridge:Cluster:SeedNodes").Get<List<string>>();
|
||||
if (seedNodes == null || seedNodes.Count < 2)
|
||||
errors.Add("ScadaBridge:Cluster:SeedNodes must have at least 2 entries");
|
||||
|
||||
if (role == "Site")
|
||||
{
|
||||
var grpcPortStr = nodeSection["GrpcPort"];
|
||||
int grpcPort = 8083; // NodeOptions default when the key is absent
|
||||
if (grpcPortStr != null && (!int.TryParse(grpcPortStr, out grpcPort) || grpcPort < 1 || grpcPort > 65535))
|
||||
errors.Add("ScadaBridge:Node:GrpcPort must be 1-65535");
|
||||
// GrpcPort: default 8083 when absent; only fails the range rule when the key is
|
||||
// present AND invalid. The out-param assignment mirrors the original so the
|
||||
// resolved grpcPort feeds the cross-field rules even on a parse failure.
|
||||
var grpcPortStr = nodeSection["GrpcPort"];
|
||||
int grpcPort = 8083; // NodeOptions default when the key is absent
|
||||
bool grpcValid = !(grpcPortStr != null && (!int.TryParse(grpcPortStr, out grpcPort) || grpcPort < 1 || grpcPort > 65535));
|
||||
|
||||
// Host-007 / REQ-HOST-4: the gRPC (Kestrel HTTP/2) port and the Akka
|
||||
// remoting port must differ. Identical values make Kestrel and
|
||||
// Akka.Remote contend for the same TCP port and fail opaquely at
|
||||
// runtime. Uses the resolved GrpcPort, including the 8083 default.
|
||||
if (port == grpcPort)
|
||||
errors.Add("ScadaBridge:Node:GrpcPort must differ from RemotingPort");
|
||||
// MetricsPort: default 8084 when absent; same parse-or-default contract as GrpcPort.
|
||||
var metricsPortStr = nodeSection["MetricsPort"];
|
||||
int metricsPort = 8084; // NodeOptions default when the key is absent
|
||||
bool metricsValid = !(metricsPortStr != null && (!int.TryParse(metricsPortStr, out metricsPort) || metricsPort < 1 || metricsPort > 65535));
|
||||
|
||||
var dbSection = configuration.GetSection("ScadaBridge:Database");
|
||||
if (string.IsNullOrEmpty(dbSection["SiteDbPath"]))
|
||||
errors.Add("ScadaBridge:Database:SiteDbPath required for Site nodes");
|
||||
|
||||
// Host-004: a seed node must reference an Akka.Remote endpoint, never the
|
||||
// Kestrel HTTP/2 gRPC port. A seed entry whose port equals this node's
|
||||
// GrpcPort would make a joining node attempt an Akka.Remote TCP
|
||||
// association against the gRPC listener and fail.
|
||||
if (seedNodes != null)
|
||||
ConfigPreflight.For(configuration)
|
||||
// Role / NodeHostname / RemotingPort (unconditional)
|
||||
.Require("ScadaBridge:Node:Role",
|
||||
_ => !(string.IsNullOrEmpty(role) || (role != "Central" && role != "Site")),
|
||||
"must be 'Central' or 'Site'")
|
||||
.Require("ScadaBridge:Node:NodeHostname",
|
||||
_ => !string.IsNullOrEmpty(nodeSection["NodeHostname"]),
|
||||
"is required")
|
||||
.Require("ScadaBridge:Node:RemotingPort",
|
||||
_ => portValid,
|
||||
"must be 1-65535")
|
||||
// SiteId (Site only) — note: OUTSIDE the big Site block in the original,
|
||||
// so it must run before the unconditional SeedNodes-count rule.
|
||||
.When(role == "Site", p => p
|
||||
.Require("ScadaBridge:Node:SiteId",
|
||||
_ => !string.IsNullOrEmpty(nodeSection["SiteId"]),
|
||||
"is required for Site nodes"))
|
||||
// Central-only database/security rules.
|
||||
.When(role == "Central", p => p
|
||||
.Require("ScadaBridge:Database:ConfigurationDb",
|
||||
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Database")["ConfigurationDb"]),
|
||||
"connection string required for Central")
|
||||
.Require("ScadaBridge:Security:LdapServer",
|
||||
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Security")["LdapServer"]),
|
||||
"required for Central")
|
||||
.Require("ScadaBridge:Security:JwtSigningKey",
|
||||
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Security")["JwtSigningKey"]),
|
||||
"required for Central"))
|
||||
// SeedNodes count (unconditional, after SiteId).
|
||||
.Require("ScadaBridge:Cluster:SeedNodes",
|
||||
_ => seedNodes != null && seedNodes.Count >= 2,
|
||||
"must have at least 2 entries")
|
||||
// The big Site-only block: GrpcPort/MetricsPort validity + cross-field
|
||||
// collisions + SiteDbPath + seed-node-port loop, in the original order.
|
||||
.When(role == "Site", p =>
|
||||
{
|
||||
foreach (var seed in seedNodes)
|
||||
{
|
||||
if (SeedNodePort(seed) == grpcPort)
|
||||
errors.Add(
|
||||
$"ScadaBridge:Cluster:SeedNodes entry '{seed}' must not target the gRPC port " +
|
||||
$"({grpcPort}); seed nodes must reference Akka remoting ports");
|
||||
}
|
||||
}
|
||||
}
|
||||
// Host-007 / REQ-HOST-4: GrpcPort range, then GrpcPort vs RemotingPort.
|
||||
p.Require("ScadaBridge:Node:GrpcPort", _ => grpcValid, "must be 1-65535");
|
||||
// Identical GrpcPort/RemotingPort make Kestrel and Akka.Remote contend
|
||||
// for the same TCP port. Uses the resolved GrpcPort, including 8083.
|
||||
p.Require("ScadaBridge:Node:GrpcPort", _ => port != grpcPort, "must differ from RemotingPort");
|
||||
|
||||
if (errors.Count > 0)
|
||||
throw new InvalidOperationException(
|
||||
$"Configuration validation failed:\n{string.Join("\n", errors.Select(e => $" - {e}"))}");
|
||||
// Host-007 / REQ-HOST-4: MetricsPort range, then MetricsPort vs both ports.
|
||||
p.Require("ScadaBridge:Node:MetricsPort", _ => metricsValid, "must be 1-65535");
|
||||
// The Kestrel metrics (HTTP/1.1) listener port must differ from BOTH the
|
||||
// Akka remoting port and the gRPC port. Uses the resolved MetricsPort (8084 default).
|
||||
p.Require("ScadaBridge:Node:MetricsPort", _ => metricsPort != port, "must differ from RemotingPort");
|
||||
p.Require("ScadaBridge:Node:MetricsPort", _ => metricsPort != grpcPort, "must differ from GrpcPort");
|
||||
|
||||
p.Require("ScadaBridge:Database:SiteDbPath",
|
||||
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Database")["SiteDbPath"]),
|
||||
"required for Site nodes");
|
||||
|
||||
// Host-004: a seed node must reference an Akka.Remote endpoint, never the
|
||||
// Kestrel HTTP/2 gRPC port. A seed entry whose port equals this node's
|
||||
// GrpcPort would make a joining node attempt an Akka.Remote TCP
|
||||
// association against the gRPC listener and fail.
|
||||
foreach (var seed in seedNodes ?? Enumerable.Empty<string>())
|
||||
{
|
||||
p.Require("ScadaBridge:Cluster:SeedNodes",
|
||||
_ => SeedNodePort(seed) != grpcPort,
|
||||
$"entry '{seed}' must not target the gRPC port " +
|
||||
$"({grpcPort}); seed nodes must reference Akka remoting ports");
|
||||
}
|
||||
})
|
||||
.ThrowIfInvalid();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -28,9 +28,12 @@
|
||||
<!-- Transitive override: Akka.Hosting 1.5.62 pins OpenTelemetry.Api 1.9.0 which is flagged
|
||||
(GHSA-g94r-2vxg-569j, GHSA-8785-wc3w-h8q6). Bumping directly clears both advisories. -->
|
||||
<PackageReference Include="OpenTelemetry.Api" />
|
||||
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||
<PackageReference Include="ZB.MOM.WW.Health" />
|
||||
<PackageReference Include="ZB.MOM.WW.Health.Akka" />
|
||||
<PackageReference Include="ZB.MOM.WW.Health.EntityFrameworkCore" />
|
||||
<PackageReference Include="ZB.MOM.WW.Telemetry" />
|
||||
<PackageReference Include="ZB.MOM.WW.Telemetry.Serilog" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
"SiteId": "site-a",
|
||||
"RemotingPort": 8082,
|
||||
"GrpcPort": 8083,
|
||||
"MetricsPort": 8084,
|
||||
"NodeName": "node-a"
|
||||
},
|
||||
"Cluster": {
|
||||
|
||||
@@ -5,6 +5,7 @@ using Microsoft.AspNetCore.Routing;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.InboundAPI;
|
||||
@@ -44,6 +45,16 @@ public static class EndpointExtensions
|
||||
|
||||
if (!validationResult.IsValid)
|
||||
{
|
||||
// Telemetry follow-on: count every inbound request, including auth
|
||||
// failures. The raw {methodName} route value is arbitrary caller input
|
||||
// and would be high-cardinality, so failures are tagged with a small
|
||||
// bounded set of sentinels keyed off the validator's status code rather
|
||||
// than the unvalidated name (401 → "<unauthorized>", 403 → "<forbidden>").
|
||||
ScadaBridgeTelemetry.RecordInboundApiRequest(
|
||||
validationResult.StatusCode == StatusCodes.Status401Unauthorized
|
||||
? "<unauthorized>"
|
||||
: "<forbidden>");
|
||||
|
||||
// WP-5: Failures-only logging
|
||||
logger.LogWarning(
|
||||
"Inbound API auth failure for method {Method}: {Error} (status {StatusCode})",
|
||||
@@ -56,6 +67,12 @@ public static class EndpointExtensions
|
||||
|
||||
var method = validationResult.Method!;
|
||||
|
||||
// Telemetry follow-on: count this inbound request against the resolved,
|
||||
// registered method name. method.Name comes from the repository's method
|
||||
// catalogue (an exact-name lookup), so the `method` tag is bounded to the
|
||||
// set of configured API methods — never the raw caller-supplied route value.
|
||||
ScadaBridgeTelemetry.RecordInboundApiRequest(method.Name);
|
||||
|
||||
// Audit Log (#23 M4 Bundle D): publish the resolved API key name so
|
||||
// AuditWriteMiddleware can populate AuditEvent.Actor in its finally
|
||||
// block. Done AFTER validation succeeded — auth failures leave the
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.Configuration;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Security;
|
||||
|
||||
@@ -29,7 +29,7 @@ namespace ZB.MOM.WW.ScadaBridge.Security;
|
||||
/// minimum-byte length contract co-located with the type that enforces it.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public sealed class SecurityOptionsValidator : IValidateOptions<SecurityOptions>
|
||||
public sealed class SecurityOptionsValidator : OptionsValidatorBase<SecurityOptions>
|
||||
{
|
||||
/// <summary>
|
||||
/// The configuration section name <see cref="SecurityOptions"/> is bound
|
||||
@@ -41,30 +41,16 @@ public sealed class SecurityOptionsValidator : IValidateOptions<SecurityOptions>
|
||||
public const string ConfigSectionName = "Security";
|
||||
|
||||
/// <inheritdoc />
|
||||
public ValidateOptionsResult Validate(string? name, SecurityOptions options)
|
||||
protected override void Validate(ValidationBuilder builder, SecurityOptions options)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
builder.RequireThat(!string.IsNullOrWhiteSpace(options.LdapServer),
|
||||
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapServer)} is required " +
|
||||
"but was empty or whitespace — set it to the LDAP server hostname or IP " +
|
||||
"(e.g. \"ldap.example.com\").");
|
||||
|
||||
var failures = new List<string>();
|
||||
|
||||
if (string.IsNullOrWhiteSpace(options.LdapServer))
|
||||
{
|
||||
failures.Add(
|
||||
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapServer)} is required " +
|
||||
"but was empty or whitespace — set it to the LDAP server hostname or IP " +
|
||||
"(e.g. \"ldap.example.com\").");
|
||||
}
|
||||
|
||||
if (string.IsNullOrWhiteSpace(options.LdapSearchBase))
|
||||
{
|
||||
failures.Add(
|
||||
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapSearchBase)} is required " +
|
||||
"but was empty or whitespace — set it to the search-base DN " +
|
||||
"(e.g. \"dc=example,dc=com\").");
|
||||
}
|
||||
|
||||
return failures.Count == 0
|
||||
? ValidateOptionsResult.Success
|
||||
: ValidateOptionsResult.Fail(failures);
|
||||
builder.RequireThat(!string.IsNullOrWhiteSpace(options.LdapSearchBase),
|
||||
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapSearchBase)} is required " +
|
||||
"but was empty or whitespace — set it to the search-base DN " +
|
||||
"(e.g. \"dc=example,dc=com\").");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
<PackageReference Include="Microsoft.AspNetCore.Authorization" />
|
||||
<PackageReference Include="System.IdentityModel.Tokens.Jwt" />
|
||||
<PackageReference Include="Novell.Directory.Ldap.NETStandard" />
|
||||
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
|
||||
@@ -98,6 +99,48 @@ public class StoreAndForwardService
|
||||
/// </summary>
|
||||
private static readonly TimeSpan SweepShutdownWaitTimeout = TimeSpan.FromSeconds(10);
|
||||
|
||||
/// <summary>
|
||||
/// WP-14 (telemetry): cached count of messages currently buffered for
|
||||
/// forwarding — i.e. rows in <see cref="StoreAndForwardMessageStatus.Pending"/>,
|
||||
/// the live store-and-forward queue waiting to be delivered. This backs the
|
||||
/// <c>scadabridge.store_and_forward.queue.depth</c> observable gauge.
|
||||
/// <para>
|
||||
/// The gauge's collection callback is synchronous and is invoked frequently by
|
||||
/// the OpenTelemetry/Prometheus collector, so it must never run an async SQLite
|
||||
/// <c>COUNT(*)</c>. Instead this <see cref="long"/> is seeded once from storage
|
||||
/// in <see cref="StartAsync"/> and then adjusted in-process on the existing
|
||||
/// paths that change the Pending population: <see cref="BufferAsync"/> (+1),
|
||||
/// successful-retry removal and Pending→Parked transitions in
|
||||
/// <see cref="RetryMessageAsync"/> (-1), and operator requeue in
|
||||
/// <see cref="RetryParkedMessageAsync"/> (+1). The provider registered with
|
||||
/// <see cref="ScadaBridgeTelemetry.SetQueueDepthProvider"/> reads it via
|
||||
/// <see cref="Interlocked.Read"/> — non-blocking and sync-safe. It is an
|
||||
/// approximate, eventually-consistent gauge (concurrent failover replication
|
||||
/// applies to the standby's own store, not this counter), which is exactly
|
||||
/// what a queue-depth metric needs.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
private long _bufferedCount;
|
||||
|
||||
/// <summary>
|
||||
/// Test seam (WP-14 telemetry): simulates a concurrent pre-seed
|
||||
/// <see cref="BufferAsync"/> increment landing on <see cref="_bufferedCount"/>
|
||||
/// before <see cref="StartAsync"/> seeds it, so a test can prove the seed uses
|
||||
/// <see cref="Interlocked.Add"/> (additive) rather than Exchange (clobbering).
|
||||
/// </summary>
|
||||
internal void TestOnly_IncrementBufferedCount() =>
|
||||
Interlocked.Increment(ref _bufferedCount);
|
||||
|
||||
/// <summary>
|
||||
/// WP-14 (telemetry): an instance field that guards against a single instance
|
||||
/// registering the queue-depth provider (and re-seeding the counter) more than
|
||||
/// once — e.g. a second <see cref="StartAsync"/> on the same instance. It does NOT
|
||||
/// coordinate across instances: the gauge slot in <see cref="ScadaBridgeTelemetry"/>
|
||||
/// is process-global, so in a multi-instance process the last <see cref="StartAsync"/>
|
||||
/// wins the global slot. 0 = not yet registered, 1 = done.
|
||||
/// </summary>
|
||||
private int _queueDepthProviderRegistered;
|
||||
|
||||
/// <summary>
|
||||
/// WP-10: Delivery handler delegate. The return value / exception is interpreted
|
||||
/// the same way on both the immediate-delivery path (<see cref="EnqueueAsync"/>)
|
||||
@@ -170,6 +213,27 @@ public class StoreAndForwardService
|
||||
public async Task StartAsync()
|
||||
{
|
||||
await _storage.InitializeAsync();
|
||||
|
||||
// WP-14 (telemetry): seed the cached buffered-message count from the
|
||||
// store exactly once (the gauge callback cannot run an async COUNT), then
|
||||
// register the sync, non-blocking provider with the process-global
|
||||
// ScadaBridgeTelemetry gauge. Both steps are inside the one-time guard so a
|
||||
// second StartAsync on the same instance cannot double-seed.
|
||||
//
|
||||
// The seed is an Interlocked.Add — NOT an Exchange — to avoid a startup race:
|
||||
// between the await above returning and this point, a concurrent BufferAsync
|
||||
// could already have Interlocked.Increment'd _bufferedCount. Exchange would
|
||||
// clobber that increment (losing a +1); Add preserves it. _bufferedCount
|
||||
// starts at 0 and only BufferAsync increments it before the seed, so
|
||||
// 0 + pending + (any concurrent increments) is the correct live count.
|
||||
var pending = await _storage.GetMessageCountByStatusAsync(
|
||||
StoreAndForwardMessageStatus.Pending);
|
||||
if (Interlocked.CompareExchange(ref _queueDepthProviderRegistered, 1, 0) == 0)
|
||||
{
|
||||
Interlocked.Add(ref _bufferedCount, pending);
|
||||
ScadaBridgeTelemetry.SetQueueDepthProvider(() => Interlocked.Read(ref _bufferedCount));
|
||||
}
|
||||
|
||||
_retryTimer = new Timer(
|
||||
// StoreAndForward-024: capture the sweep Task on each tick so
|
||||
// StopAsync can await any in-flight invocation before the host
|
||||
@@ -396,6 +460,10 @@ public class StoreAndForwardService
|
||||
{
|
||||
await _storage.EnqueueAsync(message);
|
||||
_replication?.ReplicateEnqueue(message);
|
||||
// WP-14 (telemetry): a freshly buffered row is Pending → grows the live
|
||||
// queue depth. Bumped after the durable write so the gauge never leads the
|
||||
// store.
|
||||
Interlocked.Increment(ref _bufferedCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -452,6 +520,8 @@ public class StoreAndForwardService
|
||||
{
|
||||
await _storage.RemoveMessageAsync(message.Id);
|
||||
_replication?.ReplicateRemove(message.Id);
|
||||
// WP-14 (telemetry): a delivered row leaves the Pending queue.
|
||||
Interlocked.Decrement(ref _bufferedCount);
|
||||
RaiseActivity("Delivered", message.Category,
|
||||
$"Delivered to {message.Target} after {message.RetryCount} retries");
|
||||
|
||||
@@ -483,6 +553,9 @@ public class StoreAndForwardService
|
||||
message.Id);
|
||||
return;
|
||||
}
|
||||
// WP-14 (telemetry): the row committed Pending→Parked, leaving the live
|
||||
// forward queue. Only counted when the conditional update actually won.
|
||||
Interlocked.Decrement(ref _bufferedCount);
|
||||
_replication?.ReplicatePark(message);
|
||||
RaiseActivity("Parked", message.Category,
|
||||
$"Permanent failure for {message.Target}: handler returned false");
|
||||
@@ -519,6 +592,9 @@ public class StoreAndForwardService
|
||||
message.Id);
|
||||
return;
|
||||
}
|
||||
// WP-14 (telemetry): the row committed Pending→Parked, leaving the
|
||||
// live forward queue. Only counted when the conditional update won.
|
||||
Interlocked.Decrement(ref _bufferedCount);
|
||||
_replication?.ReplicatePark(message);
|
||||
RaiseActivity("Parked", message.Category,
|
||||
$"Max retries ({message.MaxRetries}) reached for {message.Target}");
|
||||
@@ -737,6 +813,11 @@ public class StoreAndForwardService
|
||||
return false;
|
||||
}
|
||||
|
||||
// WP-14 (telemetry): an operator requeue moves Parked→Pending, re-adding the
|
||||
// row to the live forward queue. Counted only when the conditional storage
|
||||
// update actually flipped the row.
|
||||
Interlocked.Increment(ref _bufferedCount);
|
||||
|
||||
// The active node just rewrote this row to Pending with retry_count = 0
|
||||
// and cleared last_error / last_attempt_at (see
|
||||
// StoreAndForwardStorage.RetryParkedMessageAsync). Reconstruct the
|
||||
@@ -769,6 +850,7 @@ public class StoreAndForwardService
|
||||
{
|
||||
// Capture the category before the row is deleted so the activity log is
|
||||
// labelled correctly.
|
||||
// WP-14 (telemetry): Parked rows are not in _bufferedCount; discarding a Parked row needs no counter adjustment.
|
||||
var message = await _storage.GetMessageByIdAsync(messageId);
|
||||
var success = await _storage.DiscardParkedMessageAsync(messageId);
|
||||
if (success)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Threading.Channels;
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
@@ -5,6 +6,7 @@ using Grpc.Core;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NSubstitute;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;
|
||||
@@ -342,6 +344,59 @@ public class SiteStreamGrpcServerTests : TestKit
|
||||
subscriber.DidNotReceive().RemoveSubscriber(Arg.Any<IActorRef>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SiteConnectionUpGauge_GoesToOneOnConnect_AndBackToZeroOnCancel()
|
||||
{
|
||||
// Telemetry follow-on: the scadabridge.site.connection.up gauge must read
|
||||
// exactly 1 while a site stream is established and return to 0 once the
|
||||
// stream terminates on the cancel path — proving SiteConnectionOpened() is
|
||||
// matched by exactly one SiteConnectionClosed() in the handler's finally.
|
||||
var server = CreateServer();
|
||||
server.SetReady(Sys);
|
||||
|
||||
long ReadGauge()
|
||||
{
|
||||
long observed = 0;
|
||||
using var listener = new MeterListener();
|
||||
listener.InstrumentPublished = (instrument, l) =>
|
||||
{
|
||||
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
|
||||
instrument.Name == "scadabridge.site.connection.up")
|
||||
{
|
||||
l.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
};
|
||||
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
|
||||
listener.Start();
|
||||
listener.RecordObservableInstruments();
|
||||
return observed;
|
||||
}
|
||||
|
||||
var baseline = ReadGauge();
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
var context = CreateMockContext(cts.Token);
|
||||
var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();
|
||||
|
||||
var streamTask = Task.Run(() => server.SubscribeInstance(
|
||||
MakeRequest("corr-gauge", "Site1.Pump01"), writer, context));
|
||||
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 1);
|
||||
|
||||
// While the stream is up the gauge is one above whatever baseline other
|
||||
// (possibly parallel) tests left behind — read relative so the assertion
|
||||
// is robust to test interleaving on the process-wide static counter.
|
||||
Assert.Equal(baseline + 1, ReadGauge());
|
||||
|
||||
cts.Cancel();
|
||||
await streamTask;
|
||||
await WaitForConditionAsync(() => server.ActiveStreamCount == 0);
|
||||
|
||||
// After the cancel path runs the finally, the gauge is balanced back to
|
||||
// the baseline — no leaked "up" count.
|
||||
Assert.Equal(baseline, ReadGauge());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetReady_AllowsStreamCreation()
|
||||
{
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
@@ -10,6 +11,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
|
||||
@@ -558,6 +560,106 @@ public class DeploymentServiceTests : TestKit
|
||||
Arg.Any<object>(), Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
// ── Telemetry follow-on: scadabridge.deployments.applied on deploy success ──
|
||||
|
||||
[Fact]
|
||||
public async Task DeployInstanceAsync_SiteSucceeds_EmitsDeploymentsAppliedCounterOnce()
|
||||
{
|
||||
// A successful deployment must increment the
|
||||
// scadabridge.deployments.applied counter exactly once — one
|
||||
// DeployInstanceAsync deploys one instance to one site, so the unit is
|
||||
// one increment per successful deploy operation.
|
||||
var instance = new Instance("MetricInst") { Id = 55, SiteId = 1, State = InstanceState.NotDeployed };
|
||||
_repo.GetInstanceByIdAsync(55, Arg.Any<CancellationToken>()).Returns(instance);
|
||||
SetupValidPipeline(55, "MetricInst", "sha256:target");
|
||||
_repo.GetCurrentDeploymentStatusAsync(55, Arg.Any<CancellationToken>())
|
||||
.Returns((DeploymentRecord?)null);
|
||||
|
||||
var counters = new ReconcileProbeCounters();
|
||||
var commActor = Sys.ActorOf(Props.Create(() =>
|
||||
new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false)));
|
||||
var service = CreateServiceWithCommActor(commActor);
|
||||
|
||||
long applied = 0;
|
||||
using var listener = new MeterListener
|
||||
{
|
||||
InstrumentPublished = (instrument, l) =>
|
||||
{
|
||||
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
|
||||
&& instrument.Name == "scadabridge.deployments.applied")
|
||||
{
|
||||
l.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
}
|
||||
};
|
||||
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) =>
|
||||
Interlocked.Add(ref applied, measurement));
|
||||
listener.Start();
|
||||
|
||||
var result = await service.DeployInstanceAsync(55, "admin");
|
||||
|
||||
listener.Dispose();
|
||||
|
||||
Assert.True(result.IsSuccess);
|
||||
// Fresh first-time deploy applied -> exactly one increment.
|
||||
Assert.Equal(1, Interlocked.Read(ref applied));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DeployInstanceAsync_Reconciled_DoesNotEmitDeploymentsAppliedCounter()
|
||||
{
|
||||
// The reconciliation path recovers a PRIOR timed-out apply rather than
|
||||
// performing a fresh one; counting it would risk double-counting the
|
||||
// original apply, so scadabridge.deployments.applied must NOT increment
|
||||
// on a reconciled (no re-deploy) success.
|
||||
var instance = new Instance("MetricReconcileInst")
|
||||
{
|
||||
Id = 56, SiteId = 1, State = InstanceState.NotDeployed
|
||||
};
|
||||
_repo.GetInstanceByIdAsync(56, Arg.Any<CancellationToken>()).Returns(instance);
|
||||
SetupValidPipeline(56, "MetricReconcileInst", "sha256:target");
|
||||
|
||||
var prior = new DeploymentRecord("dep-prior-56", "admin")
|
||||
{
|
||||
InstanceId = 56,
|
||||
Status = DeploymentStatus.InProgress,
|
||||
RevisionHash = "sha256:target"
|
||||
};
|
||||
_repo.GetCurrentDeploymentStatusAsync(56, Arg.Any<CancellationToken>()).Returns(prior);
|
||||
_repo.GetDeployedSnapshotByInstanceIdAsync(56, Arg.Any<CancellationToken>())
|
||||
.Returns((DeployedConfigSnapshot?)null);
|
||||
|
||||
var counters = new ReconcileProbeCounters();
|
||||
var commActor = Sys.ActorOf(Props.Create(() =>
|
||||
new ReconcileProbeActor(counters, siteHash: "sha256:target", failQuery: false)));
|
||||
var service = CreateServiceWithCommActor(commActor);
|
||||
|
||||
long applied = 0;
|
||||
using var listener = new MeterListener
|
||||
{
|
||||
InstrumentPublished = (instrument, l) =>
|
||||
{
|
||||
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
|
||||
&& instrument.Name == "scadabridge.deployments.applied")
|
||||
{
|
||||
l.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
}
|
||||
};
|
||||
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) =>
|
||||
Interlocked.Add(ref applied, measurement));
|
||||
listener.Start();
|
||||
|
||||
var result = await service.DeployInstanceAsync(56, "admin");
|
||||
|
||||
listener.Dispose();
|
||||
|
||||
Assert.True(result.IsSuccess);
|
||||
// Reconciled — no fresh deploy was sent, so no increment.
|
||||
Assert.Equal(0, counters.DeployCount);
|
||||
Assert.Equal(0, Interlocked.Read(ref applied));
|
||||
}
|
||||
|
||||
// ── DeploymentManager-011: lifecycle success paths ──
|
||||
|
||||
[Fact]
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
using System.Net;
|
||||
using Microsoft.AspNetCore.Mvc.Testing;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Host.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Observability adoption: verifies the shared ZB.MOM.WW.Telemetry Prometheus
|
||||
/// scrape endpoint (<c>/metrics</c>, mounted by <c>app.MapZbMetrics()</c>) is wired into the
|
||||
/// Central composition root. <c>AddZbTelemetry</c> (registered in
|
||||
/// <see cref="SiteServiceRegistration.BindSharedOptions"/>) always wires the Prometheus
|
||||
/// exporter, so the endpoint returns the Prometheus exposition format regardless of DB /
|
||||
/// cluster state. This is a pure route assertion — it requires no database, LDAP, or formed
|
||||
/// Akka cluster. The Central-role factory bootstrap mirrors <see cref="HealthCheckTests"/>.
|
||||
/// </summary>
|
||||
public class MetricsEndpointTests : IDisposable
|
||||
{
|
||||
private readonly List<IDisposable> _disposables = new();
|
||||
|
||||
public MetricsEndpointTests()
|
||||
{
|
||||
// Host-003: connection strings are externalised; supply them via env vars.
|
||||
_disposables.Add(new CentralDbTestEnvironment());
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
foreach (var d in _disposables)
|
||||
{
|
||||
try { d.Dispose(); } catch { /* best effort */ }
|
||||
}
|
||||
}
|
||||
|
||||
private WebApplicationFactory<Program> CreateCentralFactory()
|
||||
{
|
||||
var factory = new WebApplicationFactory<Program>()
|
||||
.WithWebHostBuilder(builder =>
|
||||
{
|
||||
builder.ConfigureAppConfiguration((context, config) =>
|
||||
{
|
||||
config.AddInMemoryCollection(new Dictionary<string, string?>
|
||||
{
|
||||
["ScadaBridge:Node:NodeHostname"] = "localhost",
|
||||
["ScadaBridge:Node:RemotingPort"] = "0",
|
||||
["ScadaBridge:Cluster:SeedNodes:0"] = "akka.tcp://scadabridge@localhost:2551",
|
||||
["ScadaBridge:Cluster:SeedNodes:1"] = "akka.tcp://scadabridge@localhost:2552",
|
||||
["ScadaBridge:Database:SkipMigrations"] = "true",
|
||||
});
|
||||
});
|
||||
builder.UseSetting("ScadaBridge:Node:Role", "Central");
|
||||
builder.UseSetting("ScadaBridge:Database:SkipMigrations", "true");
|
||||
});
|
||||
_disposables.Add(factory);
|
||||
return factory;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Metrics_Endpoint_IsMapped()
|
||||
{
|
||||
var previousEnv = Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT");
|
||||
try
|
||||
{
|
||||
Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", "Central");
|
||||
var factory = CreateCentralFactory();
|
||||
var client = factory.CreateClient();
|
||||
_disposables.Add(client);
|
||||
|
||||
var response = await client.GetAsync("/metrics");
|
||||
|
||||
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
|
||||
var body = await response.Content.ReadAsStringAsync();
|
||||
Assert.Contains("# ", body); // Prometheus exposition (HELP/TYPE comments)
|
||||
}
|
||||
finally
|
||||
{
|
||||
Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", previousEnv);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.Host.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Infrastructure-free guards for <see cref="ScadaBridgeTelemetry"/>: the meter name is the
|
||||
/// stable value registered with OTel, and the emit helpers are safe to call (they are no-op
|
||||
/// until follow-on tasks wire real emit points and a listener attaches).
|
||||
/// </summary>
|
||||
public class ScadaBridgeTelemetryTests
|
||||
{
|
||||
[Fact]
|
||||
public void MeterName_IsStableValue()
|
||||
{
|
||||
Assert.Equal("ZB.MOM.WW.ScadaBridge", ScadaBridgeTelemetry.MeterName);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void EmitHelpers_DoNotThrow()
|
||||
{
|
||||
var ex = Record.Exception(() =>
|
||||
{
|
||||
ScadaBridgeTelemetry.RecordDeploymentApplied();
|
||||
ScadaBridgeTelemetry.RecordInboundApiRequest("X");
|
||||
ScadaBridgeTelemetry.SiteConnectionOpened();
|
||||
ScadaBridgeTelemetry.SiteConnectionClosed();
|
||||
ScadaBridgeTelemetry.SetQueueDepthProvider(() => 5);
|
||||
});
|
||||
|
||||
Assert.Null(ex);
|
||||
}
|
||||
}
|
||||
@@ -352,6 +352,105 @@ public class StartupValidatorTests
|
||||
Assert.Null(ex);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("0")]
|
||||
[InlineData("-1")]
|
||||
[InlineData("65536")]
|
||||
[InlineData("abc")]
|
||||
public void Site_InvalidMetricsPort_FailsValidation(string metricsPort)
|
||||
{
|
||||
var values = ValidSiteConfig();
|
||||
values["ScadaBridge:Node:MetricsPort"] = metricsPort;
|
||||
var config = BuildConfig(values);
|
||||
|
||||
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
|
||||
Assert.Contains("MetricsPort must be 1-65535", ex.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Site_ValidMetricsPort_PassesValidation()
|
||||
{
|
||||
var values = ValidSiteConfig();
|
||||
values["ScadaBridge:Node:MetricsPort"] = "8084";
|
||||
var config = BuildConfig(values);
|
||||
|
||||
var ex = Record.Exception(() => StartupValidator.Validate(config));
|
||||
Assert.Null(ex);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Site_MetricsPortEqualsRemotingPort_FailsValidation()
|
||||
{
|
||||
// Host-007 regression: the Kestrel metrics (HTTP/1.1) listener port must
|
||||
// differ from RemotingPort. Identical values cause the metrics listener
|
||||
// and Akka.Remote to contend for the same port at runtime.
|
||||
var values = ValidSiteConfig();
|
||||
values["ScadaBridge:Node:RemotingPort"] = "8082";
|
||||
values["ScadaBridge:Node:MetricsPort"] = "8082";
|
||||
var config = BuildConfig(values);
|
||||
|
||||
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
|
||||
Assert.Contains("MetricsPort must differ from RemotingPort", ex.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Site_MetricsPortEqualsGrpcPort_FailsValidation()
|
||||
{
|
||||
// Host-007 regression: the metrics listener port must differ from GrpcPort.
|
||||
var values = ValidSiteConfig();
|
||||
values["ScadaBridge:Node:GrpcPort"] = "8083";
|
||||
values["ScadaBridge:Node:MetricsPort"] = "8083";
|
||||
var config = BuildConfig(values);
|
||||
|
||||
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
|
||||
Assert.Contains("MetricsPort must differ from GrpcPort", ex.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Site_DefaultMetricsPortEqualsRemotingPort_FailsValidation()
|
||||
{
|
||||
// MetricsPort absent => NodeOptions default 8084. A site whose RemotingPort
|
||||
// is also 8084 must still be rejected.
|
||||
var values = ValidSiteConfig();
|
||||
values["ScadaBridge:Node:RemotingPort"] = "8084";
|
||||
// Keep GrpcPort distinct so only the metrics-vs-remoting rule fires.
|
||||
values["ScadaBridge:Node:GrpcPort"] = "8083";
|
||||
// Seed nodes default to the remoting port (8082) in ValidSiteConfig; realign
|
||||
// them to 8084 so the seed-vs-grpc rule is not what trips here.
|
||||
values["ScadaBridge:Cluster:SeedNodes:0"] = "akka.tcp://scadabridge@site-a-node1:8084";
|
||||
values["ScadaBridge:Cluster:SeedNodes:1"] = "akka.tcp://scadabridge@site-a-node2:8084";
|
||||
var config = BuildConfig(values);
|
||||
|
||||
var ex = Assert.Throws<InvalidOperationException>(() => StartupValidator.Validate(config));
|
||||
Assert.Contains("MetricsPort must differ from RemotingPort", ex.Message);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Site_MetricsPortDiffersFromRemotingAndGrpc_PassesValidation()
|
||||
{
|
||||
var values = ValidSiteConfig();
|
||||
values["ScadaBridge:Node:RemotingPort"] = "8082";
|
||||
values["ScadaBridge:Node:GrpcPort"] = "8083";
|
||||
values["ScadaBridge:Node:MetricsPort"] = "8084";
|
||||
var config = BuildConfig(values);
|
||||
|
||||
var ex = Record.Exception(() => StartupValidator.Validate(config));
|
||||
Assert.Null(ex);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Central_InvalidMetricsPort_NotValidated()
|
||||
{
|
||||
// The metrics-port rules apply to Site nodes only; a Central node runs no
|
||||
// metrics listener, so an out-of-range MetricsPort must not fail startup.
|
||||
var values = ValidCentralConfig();
|
||||
values["ScadaBridge:Node:MetricsPort"] = "0";
|
||||
var config = BuildConfig(values);
|
||||
|
||||
var ex = Record.Exception(() => StartupValidator.Validate(config));
|
||||
Assert.Null(ex);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MultipleErrors_AllReported()
|
||||
{
|
||||
|
||||
@@ -9,8 +9,10 @@ using NSubstitute;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Entities.InboundApi;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.InboundApi;
|
||||
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Net;
|
||||
using System.Text;
|
||||
|
||||
@@ -232,6 +234,114 @@ public class EndpointExtensionsTests
|
||||
Assert.Equal("audit-actor-name", capture.CapturedActor);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ValidRequest_EmitsInboundApiRequestCounter_TaggedWithResolvedMethodName()
|
||||
{
|
||||
// Telemetry follow-on: a successful inbound request increments
|
||||
// scadabridge.inbound_api.requests once, tagged with the resolved,
|
||||
// registered method name (method.Name) — the bounded identifier, not the
|
||||
// raw route value.
|
||||
var key = SeedKey();
|
||||
var method = SeedMethod(1, "echo", "return Parameters[\"value\"];",
|
||||
"""[{"name":"value","type":"Integer","required":true}]""");
|
||||
|
||||
using var collector = new InboundApiRequestCounterCollector();
|
||||
|
||||
using var host = await BuildHostAsync(key, method);
|
||||
var client = host.GetTestClient();
|
||||
|
||||
var response = await client.SendAsync(BuildPost("echo", """{"value":7}"""));
|
||||
|
||||
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
|
||||
// Filter by the method tag this test produced: the counter is a process-wide
|
||||
// static, so a parallel test class could otherwise leak measurements in.
|
||||
var echoTotal = collector.Measurements
|
||||
.Where(m => m.Method == "echo")
|
||||
.Sum(m => m.Value);
|
||||
Assert.Equal(1, echoTotal);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UnknownMethod_EmitsInboundApiRequestCounter_WithBoundedForbiddenSentinel()
|
||||
{
|
||||
// Telemetry follow-on: an auth/authz failure is still counted, but the
|
||||
// tag is a bounded sentinel ("<forbidden>") rather than the arbitrary
|
||||
// caller-supplied route value — so an attacker posting random method
|
||||
// names cannot blow up the `method` tag cardinality.
|
||||
var key = SeedKey();
|
||||
var method = SeedMethod(1, "knownMethod", "return 1;");
|
||||
|
||||
using var collector = new InboundApiRequestCounterCollector();
|
||||
|
||||
using var host = await BuildHostAsync(key, method);
|
||||
var client = host.GetTestClient();
|
||||
|
||||
var response = await client.SendAsync(BuildPost("totally-made-up-name", "{}"));
|
||||
|
||||
Assert.Equal(HttpStatusCode.Forbidden, response.StatusCode);
|
||||
var measurements = collector.Measurements;
|
||||
// Cardinality safety: the arbitrary route value is never used as a tag.
|
||||
Assert.DoesNotContain(measurements, m => m.Method == "totally-made-up-name");
|
||||
// The failure path counts the request against the bounded sentinel.
|
||||
Assert.Contains(measurements, m => m.Method == "<forbidden>" && m.Value == 1);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Captures <c>scadabridge.inbound_api.requests</c> measurements (value + the
|
||||
/// <c>method</c> tag) via a <see cref="MeterListener"/> for the duration of a test.
|
||||
/// </summary>
|
||||
private sealed class InboundApiRequestCounterCollector : IDisposable
|
||||
{
|
||||
private readonly MeterListener _listener;
|
||||
private readonly List<(long Value, string? Method)> _measurements = new();
|
||||
private readonly object _gate = new();
|
||||
|
||||
public InboundApiRequestCounterCollector()
|
||||
{
|
||||
_listener = new MeterListener
|
||||
{
|
||||
InstrumentPublished = (instrument, listener) =>
|
||||
{
|
||||
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
|
||||
&& instrument.Name == "scadabridge.inbound_api.requests")
|
||||
{
|
||||
listener.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
},
|
||||
};
|
||||
_listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
|
||||
{
|
||||
string? method = null;
|
||||
foreach (var tag in tags)
|
||||
{
|
||||
if (tag.Key == "method")
|
||||
{
|
||||
method = tag.Value as string;
|
||||
}
|
||||
}
|
||||
|
||||
lock (_gate)
|
||||
{
|
||||
_measurements.Add((value, method));
|
||||
}
|
||||
});
|
||||
_listener.Start();
|
||||
}
|
||||
|
||||
public IReadOnlyList<(long Value, string? Method)> Measurements
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_gate)
|
||||
{
|
||||
return _measurements.ToList();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose() => _listener.Dispose();
|
||||
}
|
||||
|
||||
private static HttpRequestMessage BuildPost(string methodName, string body)
|
||||
{
|
||||
var request = new HttpRequestMessage(HttpMethod.Post, "/api/" + methodName)
|
||||
|
||||
@@ -0,0 +1,211 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// WP-14 (telemetry follow-on): verifies the cached buffered-message counter that
|
||||
/// backs the <c>scadabridge.store_and_forward.queue.depth</c> observable gauge tracks
|
||||
/// the live (Pending) queue across the existing enqueue / drain / park / requeue paths,
|
||||
/// and that the sync gauge callback reports it.
|
||||
///
|
||||
/// The gauge is read the way the OpenTelemetry collector reads it — via a
|
||||
/// <see cref="MeterListener"/> that forces an observation (the callback is synchronous
|
||||
/// and does no I/O, which is the whole point of caching the count). <see cref="StartAsync"/>
|
||||
/// seeds the counter from storage and registers the provider against this service
|
||||
/// instance, so the gauge resolves to this test's counter.
|
||||
/// </summary>
|
||||
public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable
|
||||
{
|
||||
private readonly SqliteConnection _keepAlive;
|
||||
private readonly StoreAndForwardStorage _storage;
|
||||
private readonly StoreAndForwardService _service;
|
||||
|
||||
public QueueDepthGaugeTests()
|
||||
{
|
||||
var dbName = $"QueueDepthTests_{Guid.NewGuid():N}";
|
||||
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
|
||||
_keepAlive = new SqliteConnection(connStr);
|
||||
_keepAlive.Open();
|
||||
|
||||
_storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
|
||||
|
||||
var options = new StoreAndForwardOptions
|
||||
{
|
||||
DefaultRetryInterval = TimeSpan.Zero,
|
||||
DefaultMaxRetries = 3,
|
||||
// Long interval so no background sweep fires on its own during the test;
|
||||
// sweeps are driven explicitly via RetryPendingMessagesAsync.
|
||||
RetryTimerInterval = TimeSpan.FromMinutes(10)
|
||||
};
|
||||
|
||||
_service = new StoreAndForwardService(
|
||||
_storage, options, NullLogger<StoreAndForwardService>.Instance);
|
||||
}
|
||||
|
||||
public async Task InitializeAsync()
|
||||
{
|
||||
await _storage.InitializeAsync();
|
||||
// StartAsync seeds _bufferedCount from the (empty) store and registers the
|
||||
// queue-depth provider against this service instance.
|
||||
await _service.StartAsync();
|
||||
}
|
||||
|
||||
public async Task DisposeAsync() => await _service.StopAsync();
|
||||
|
||||
public void Dispose() => _keepAlive.Dispose();
|
||||
|
||||
/// <summary>
|
||||
/// Reads the current value of the <c>scadabridge.store_and_forward.queue.depth</c>
|
||||
/// gauge by forcing a synchronous observation through a transient MeterListener —
|
||||
/// exactly the path the Prometheus/OTLP collector exercises on each scrape.
|
||||
/// </summary>
|
||||
private static long ReadQueueDepthGauge()
|
||||
{
|
||||
long observed = -1;
|
||||
using var listener = new MeterListener
|
||||
{
|
||||
InstrumentPublished = (instrument, l) =>
|
||||
{
|
||||
if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
|
||||
instrument.Name == "scadabridge.store_and_forward.queue.depth")
|
||||
{
|
||||
l.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
}
|
||||
};
|
||||
listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
|
||||
listener.Start();
|
||||
listener.RecordObservableInstruments();
|
||||
return observed;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Gauge_TracksBufferedDepth_AcrossEnqueueDrainAndPark()
|
||||
{
|
||||
// Empty store seeded at StartAsync → gauge reports 0.
|
||||
Assert.Equal(0, ReadQueueDepthGauge());
|
||||
|
||||
// A handler that fails transiently so each enqueue buffers a Pending row
|
||||
// (immediate attempt 0 throws → BufferAsync → +1).
|
||||
var deliver = false;
|
||||
_service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
|
||||
_ =>
|
||||
{
|
||||
if (!deliver) throw new HttpRequestException("transient");
|
||||
return Task.FromResult(true);
|
||||
});
|
||||
|
||||
// Enqueue 3 → cached depth = 3 → gauge reports 3.
|
||||
for (var i = 0; i < 3; i++)
|
||||
{
|
||||
var r = await _service.EnqueueAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "api", """{}""");
|
||||
Assert.True(r.WasBuffered);
|
||||
}
|
||||
Assert.Equal(3, ReadQueueDepthGauge());
|
||||
|
||||
// Drain: handler now succeeds → the retry sweep removes all 3 Pending rows → depth 0.
|
||||
deliver = true;
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
Assert.Equal(0, ReadQueueDepthGauge());
|
||||
|
||||
// Park path: buffer one more, then make it park (maxRetries:1 parks after one
|
||||
// sweep). Pending→Parked leaves the live queue → depth back to 0.
|
||||
deliver = false;
|
||||
var parkResult = await _service.EnqueueAsync(
|
||||
StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
|
||||
Assert.True(parkResult.WasBuffered);
|
||||
Assert.Equal(1, ReadQueueDepthGauge());
|
||||
|
||||
await _service.RetryPendingMessagesAsync();
|
||||
var parked = await _storage.GetMessageByIdAsync(parkResult.MessageId);
|
||||
Assert.Equal(StoreAndForwardMessageStatus.Parked, parked!.Status);
|
||||
Assert.Equal(0, ReadQueueDepthGauge());
|
||||
|
||||
// Operator requeue: Parked→Pending re-adds to the live queue → depth 1.
|
||||
Assert.True(await _service.RetryParkedMessageAsync(parkResult.MessageId));
|
||||
Assert.Equal(1, ReadQueueDepthGauge());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Gauge_SeedsFromExistingPendingRows_OnStart()
|
||||
{
|
||||
// Pre-seed two Pending rows directly in storage *before* a fresh service starts,
|
||||
// simulating a process restart over a non-empty buffer. StartAsync must seed the
|
||||
// cached counter from the store so the gauge does not under-report on restart.
|
||||
await _storage.EnqueueAsync(new StoreAndForwardMessage
|
||||
{
|
||||
Id = Guid.NewGuid().ToString("N"),
|
||||
Category = StoreAndForwardCategory.ExternalSystem,
|
||||
Target = "api",
|
||||
PayloadJson = "{}",
|
||||
Status = StoreAndForwardMessageStatus.Pending,
|
||||
CreatedAt = DateTimeOffset.UtcNow,
|
||||
MaxRetries = 3
|
||||
});
|
||||
await _storage.EnqueueAsync(new StoreAndForwardMessage
|
||||
{
|
||||
Id = Guid.NewGuid().ToString("N"),
|
||||
Category = StoreAndForwardCategory.Notification,
|
||||
Target = "list",
|
||||
PayloadJson = "{}",
|
||||
Status = StoreAndForwardMessageStatus.Pending,
|
||||
CreatedAt = DateTimeOffset.UtcNow,
|
||||
MaxRetries = 3
|
||||
});
|
||||
|
||||
var fresh = new StoreAndForwardService(
|
||||
_storage,
|
||||
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
|
||||
NullLogger<StoreAndForwardService>.Instance);
|
||||
try
|
||||
{
|
||||
await fresh.StartAsync();
|
||||
// The fresh service registered itself as the global provider and seeded 2.
|
||||
Assert.Equal(2, ReadQueueDepthGauge());
|
||||
}
|
||||
finally
|
||||
{
|
||||
await fresh.StopAsync();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Review finding (FINDING 1): the startup seed must ADD to whatever the counter
|
||||
/// already holds, not overwrite it. A concurrent <c>BufferAsync</c> can
|
||||
/// <c>Interlocked.Increment</c> <c>_bufferedCount</c> in the window between
|
||||
/// <c>StartAsync</c>'s async <c>COUNT(*)</c> returning and the seed running; with an
|
||||
/// <c>Interlocked.Exchange</c> seed that increment would be clobbered (lost +1). This
|
||||
/// pre-increments the in-memory counter (standing in for that concurrent enqueue),
|
||||
/// then starts the service over an empty store and asserts the pre-increment survives.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Gauge_SeedAddsToConcurrentPreSeedIncrement_NotClobber()
|
||||
{
|
||||
// Store is empty (StartAsync's pending COUNT(*) = 0), so the only contribution
|
||||
// is the simulated concurrent pre-seed enqueue increment.
|
||||
var fresh = new StoreAndForwardService(
|
||||
_storage,
|
||||
new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
|
||||
NullLogger<StoreAndForwardService>.Instance);
|
||||
|
||||
// Stand in for a BufferAsync increment that landed before StartAsync seeded.
|
||||
fresh.TestOnly_IncrementBufferedCount();
|
||||
|
||||
try
|
||||
{
|
||||
await fresh.StartAsync();
|
||||
// Add(0 seed) over the pre-existing +1 → 1. An Exchange(0) seed would clobber
|
||||
// it to 0, losing the concurrent enqueue — the bug this fix prevents.
|
||||
Assert.Equal(1, ReadQueueDepthGauge());
|
||||
}
|
||||
finally
|
||||
{
|
||||
await fresh.StopAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user