Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 145d2668e2 | |||
| 9668a4e84a | |||
| 6dbbc7ad04 | |||
| aac59c9fae | |||
| 9bca6aae61 | |||
| 7d16f8f275 | |||
| ccf43312e8 | |||
| a5f8651b0f | |||
| 15a626390b | |||
| 782fb73015 | |||
| 547b685a42 | |||
| 877f2e200b | |||
| c41cb41c7b | |||
| fe25ac3e51 | |||
| bbc9f09268 | |||
| 43f5886024 | |||
| f743ffaad2 | |||
| b3070c0bda | |||
| 20a31835cf | |||
| 59dca0d5fd |
@@ -75,8 +75,11 @@
|
|||||||
<PackageVersion Include="ZB.MOM.WW.Health" Version="0.1.0" />
|
<PackageVersion Include="ZB.MOM.WW.Health" Version="0.1.0" />
|
||||||
<PackageVersion Include="ZB.MOM.WW.Health.Akka" Version="0.1.0" />
|
<PackageVersion Include="ZB.MOM.WW.Health.Akka" Version="0.1.0" />
|
||||||
<PackageVersion Include="ZB.MOM.WW.Health.EntityFrameworkCore" Version="0.1.0" />
|
<PackageVersion Include="ZB.MOM.WW.Health.EntityFrameworkCore" Version="0.1.0" />
|
||||||
|
<PackageVersion Include="ZB.MOM.WW.Telemetry" Version="0.1.0" />
|
||||||
|
<PackageVersion Include="ZB.MOM.WW.Telemetry.Serilog" Version="0.1.0" />
|
||||||
<PackageVersion Include="ZB.MOM.WW.MxGateway.Client" Version="0.1.0" />
|
<PackageVersion Include="ZB.MOM.WW.MxGateway.Client" Version="0.1.0" />
|
||||||
<PackageVersion Include="ZB.MOM.WW.MxGateway.Contracts" Version="0.1.0" />
|
<PackageVersion Include="ZB.MOM.WW.MxGateway.Contracts" Version="0.1.0" />
|
||||||
|
<PackageVersion Include="ZB.MOM.WW.Configuration" Version="0.1.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
@@ -6,7 +6,8 @@
|
|||||||
"NodeHostname": "scadabridge-site-a-a",
|
"NodeHostname": "scadabridge-site-a-a",
|
||||||
"SiteId": "site-a",
|
"SiteId": "site-a",
|
||||||
"RemotingPort": 8082,
|
"RemotingPort": 8082,
|
||||||
"GrpcPort": 8083
|
"GrpcPort": 8083,
|
||||||
|
"MetricsPort": 8084
|
||||||
},
|
},
|
||||||
"Cluster": {
|
"Cluster": {
|
||||||
"SeedNodes": [
|
"SeedNodes": [
|
||||||
|
|||||||
@@ -6,7 +6,8 @@
|
|||||||
"NodeHostname": "scadabridge-site-a-b",
|
"NodeHostname": "scadabridge-site-a-b",
|
||||||
"SiteId": "site-a",
|
"SiteId": "site-a",
|
||||||
"RemotingPort": 8082,
|
"RemotingPort": 8082,
|
||||||
"GrpcPort": 8083
|
"GrpcPort": 8083,
|
||||||
|
"MetricsPort": 8084
|
||||||
},
|
},
|
||||||
"Cluster": {
|
"Cluster": {
|
||||||
"SeedNodes": [
|
"SeedNodes": [
|
||||||
|
|||||||
@@ -6,7 +6,8 @@
|
|||||||
"NodeHostname": "scadabridge-site-b-a",
|
"NodeHostname": "scadabridge-site-b-a",
|
||||||
"SiteId": "site-b",
|
"SiteId": "site-b",
|
||||||
"RemotingPort": 8082,
|
"RemotingPort": 8082,
|
||||||
"GrpcPort": 8083
|
"GrpcPort": 8083,
|
||||||
|
"MetricsPort": 8084
|
||||||
},
|
},
|
||||||
"Cluster": {
|
"Cluster": {
|
||||||
"SeedNodes": [
|
"SeedNodes": [
|
||||||
|
|||||||
@@ -6,7 +6,8 @@
|
|||||||
"NodeHostname": "scadabridge-site-b-b",
|
"NodeHostname": "scadabridge-site-b-b",
|
||||||
"SiteId": "site-b",
|
"SiteId": "site-b",
|
||||||
"RemotingPort": 8082,
|
"RemotingPort": 8082,
|
||||||
"GrpcPort": 8083
|
"GrpcPort": 8083,
|
||||||
|
"MetricsPort": 8084
|
||||||
},
|
},
|
||||||
"Cluster": {
|
"Cluster": {
|
||||||
"SeedNodes": [
|
"SeedNodes": [
|
||||||
|
|||||||
@@ -6,7 +6,8 @@
|
|||||||
"NodeHostname": "scadabridge-site-c-a",
|
"NodeHostname": "scadabridge-site-c-a",
|
||||||
"SiteId": "site-c",
|
"SiteId": "site-c",
|
||||||
"RemotingPort": 8082,
|
"RemotingPort": 8082,
|
||||||
"GrpcPort": 8083
|
"GrpcPort": 8083,
|
||||||
|
"MetricsPort": 8084
|
||||||
},
|
},
|
||||||
"Cluster": {
|
"Cluster": {
|
||||||
"SeedNodes": [
|
"SeedNodes": [
|
||||||
|
|||||||
@@ -6,7 +6,8 @@
|
|||||||
"NodeHostname": "scadabridge-site-c-b",
|
"NodeHostname": "scadabridge-site-c-b",
|
||||||
"SiteId": "site-c",
|
"SiteId": "site-c",
|
||||||
"RemotingPort": 8082,
|
"RemotingPort": 8082,
|
||||||
"GrpcPort": 8083
|
"GrpcPort": 8083,
|
||||||
|
"MetricsPort": 8084
|
||||||
},
|
},
|
||||||
"Cluster": {
|
"Cluster": {
|
||||||
"SeedNodes": [
|
"SeedNodes": [
|
||||||
|
|||||||
@@ -18,6 +18,9 @@
|
|||||||
<package pattern="ZB.MOM.WW.MxGateway.*" />
|
<package pattern="ZB.MOM.WW.MxGateway.*" />
|
||||||
<package pattern="ZB.MOM.WW.Health" />
|
<package pattern="ZB.MOM.WW.Health" />
|
||||||
<package pattern="ZB.MOM.WW.Health.*" />
|
<package pattern="ZB.MOM.WW.Health.*" />
|
||||||
|
<package pattern="ZB.MOM.WW.Telemetry" />
|
||||||
|
<package pattern="ZB.MOM.WW.Telemetry.*" />
|
||||||
|
<package pattern="ZB.MOM.WW.Configuration" />
|
||||||
</packageSource>
|
</packageSource>
|
||||||
</packageSourceMapping>
|
</packageSourceMapping>
|
||||||
<!--
|
<!--
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
using Microsoft.Extensions.Options;
|
using ZB.MOM.WW.Configuration;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
|
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
|
||||||
|
|
||||||
@@ -13,7 +13,7 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
|
|||||||
/// drop in-flight investigations, too long would defeat the partition-switch
|
/// drop in-flight investigations, too long would defeat the partition-switch
|
||||||
/// purge's purpose.
|
/// purge's purpose.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class AuditLogOptionsValidator : IValidateOptions<AuditLogOptions>
|
public sealed class AuditLogOptionsValidator : OptionsValidatorBase<AuditLogOptions>
|
||||||
{
|
{
|
||||||
/// <summary>Inclusive lower bound for <see cref="AuditLogOptions.RetentionDays"/>.</summary>
|
/// <summary>Inclusive lower bound for <see cref="AuditLogOptions.RetentionDays"/>.</summary>
|
||||||
public const int MinRetentionDays = 30;
|
public const int MinRetentionDays = 30;
|
||||||
@@ -28,43 +28,29 @@ public sealed class AuditLogOptionsValidator : IValidateOptions<AuditLogOptions>
|
|||||||
public const int MaxInboundMaxBytes = 16_777_216;
|
public const int MaxInboundMaxBytes = 16_777_216;
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public ValidateOptionsResult Validate(string? name, AuditLogOptions options)
|
protected override void Validate(ValidationBuilder builder, AuditLogOptions options)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(options);
|
builder.RequireThat(options.DefaultCapBytes > 0,
|
||||||
|
|
||||||
var failures = new List<string>();
|
|
||||||
|
|
||||||
if (options.DefaultCapBytes <= 0)
|
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"AuditLog:{nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}) " +
|
$"AuditLog:{nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}) " +
|
||||||
"must be > 0; it drives payload-summary truncation in audit writers.");
|
"must be > 0; it drives payload-summary truncation in audit writers.");
|
||||||
}
|
|
||||||
|
|
||||||
if (options.ErrorCapBytes < options.DefaultCapBytes)
|
builder.RequireThat(options.ErrorCapBytes >= options.DefaultCapBytes,
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"AuditLog:{nameof(AuditLogOptions.ErrorCapBytes)} ({options.ErrorCapBytes}) " +
|
$"AuditLog:{nameof(AuditLogOptions.ErrorCapBytes)} ({options.ErrorCapBytes}) " +
|
||||||
$"must be >= {nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}); " +
|
$"must be >= {nameof(AuditLogOptions.DefaultCapBytes)} ({options.DefaultCapBytes}); " +
|
||||||
"the error-row cap is intended to capture more detail than the happy-path summary.");
|
"the error-row cap is intended to capture more detail than the happy-path summary.");
|
||||||
}
|
|
||||||
|
|
||||||
if (options.RetentionDays < MinRetentionDays || options.RetentionDays > MaxRetentionDays)
|
// Valid when RetentionDays is within [Min, Max] inclusive. The De Morgan'd
|
||||||
{
|
// guard !(below Min OR above Max) is equivalent to (>= Min AND <= Max).
|
||||||
failures.Add(
|
builder.RequireThat(
|
||||||
|
!(options.RetentionDays < MinRetentionDays || options.RetentionDays > MaxRetentionDays),
|
||||||
$"AuditLog:{nameof(AuditLogOptions.RetentionDays)} ({options.RetentionDays}) " +
|
$"AuditLog:{nameof(AuditLogOptions.RetentionDays)} ({options.RetentionDays}) " +
|
||||||
$"must be in [{MinRetentionDays}, {MaxRetentionDays}] days.");
|
$"must be in [{MinRetentionDays}, {MaxRetentionDays}] days.");
|
||||||
}
|
|
||||||
|
|
||||||
if (options.InboundMaxBytes < MinInboundMaxBytes || options.InboundMaxBytes > MaxInboundMaxBytes)
|
// Valid when InboundMaxBytes is within [Min, Max] inclusive. The De Morgan'd
|
||||||
{
|
// guard !(below Min OR above Max) is equivalent to (>= Min AND <= Max).
|
||||||
failures.Add(
|
builder.RequireThat(
|
||||||
|
!(options.InboundMaxBytes < MinInboundMaxBytes || options.InboundMaxBytes > MaxInboundMaxBytes),
|
||||||
$"AuditLog:{nameof(AuditLogOptions.InboundMaxBytes)} ({options.InboundMaxBytes}) " +
|
$"AuditLog:{nameof(AuditLogOptions.InboundMaxBytes)} ({options.InboundMaxBytes}) " +
|
||||||
$"must be in [{MinInboundMaxBytes}, {MaxInboundMaxBytes}] bytes.");
|
$"must be in [{MinInboundMaxBytes}, {MaxInboundMaxBytes}] bytes.");
|
||||||
}
|
}
|
||||||
|
|
||||||
return failures.Count == 0
|
|
||||||
? ValidateOptionsResult.Success
|
|
||||||
: ValidateOptionsResult.Fail(failures);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ using Microsoft.Extensions.DependencyInjection;
|
|||||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using ZB.MOM.WW.Configuration;
|
||||||
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
|
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
|
||||||
using ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
|
using ZB.MOM.WW.ScadaBridge.AuditLog.Configuration;
|
||||||
using ZB.MOM.WW.ScadaBridge.AuditLog.Payload;
|
using ZB.MOM.WW.ScadaBridge.AuditLog.Payload;
|
||||||
@@ -62,10 +62,12 @@ public static class ServiceCollectionExtensions
|
|||||||
ArgumentNullException.ThrowIfNull(config);
|
ArgumentNullException.ThrowIfNull(config);
|
||||||
|
|
||||||
// M1: top-level AuditLogOptions + validator (redaction policy, payload caps, etc.).
|
// M1: top-level AuditLogOptions + validator (redaction policy, payload caps, etc.).
|
||||||
services.AddOptions<AuditLogOptions>()
|
// Collapsed onto the shared ZB.MOM.WW.Configuration helper: it binds the
|
||||||
.Bind(config.GetSection(ConfigSectionName))
|
// "AuditLog" section, registers the validator, and enables ValidateOnStart in
|
||||||
.ValidateOnStart();
|
// one call. Same section path as before; AddAuditLog is call-once per
|
||||||
services.AddSingleton<IValidateOptions<AuditLogOptions>, AuditLogOptionsValidator>();
|
// collection, and the helper's TryAddEnumerable is idempotent for the
|
||||||
|
// validator (a strict improvement over the previous AddSingleton).
|
||||||
|
services.AddValidatedOptions<AuditLogOptions, AuditLogOptionsValidator>(config, ConfigSectionName);
|
||||||
|
|
||||||
// M5 Bundle A: payload filter — truncates oversized RequestSummary /
|
// M5 Bundle A: payload filter — truncates oversized RequestSummary /
|
||||||
// ResponseSummary / ErrorDetail / Extra fields between event
|
// ResponseSummary / ErrorDetail / Extra fields between event
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
|
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
|
||||||
|
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
using Microsoft.Extensions.Options;
|
using ZB.MOM.WW.Configuration;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
|
namespace ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
|
||||||
|
|
||||||
@@ -10,7 +10,7 @@ namespace ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
|
|||||||
/// Registered with <c>ValidateOnStart()</c> so a bad <c>appsettings.json</c>
|
/// Registered with <c>ValidateOnStart()</c> so a bad <c>appsettings.json</c>
|
||||||
/// fails fast at boot rather than failing far from the cause.
|
/// fails fast at boot rather than failing far from the cause.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class ClusterOptionsValidator : IValidateOptions<ClusterOptions>
|
public sealed class ClusterOptionsValidator : OptionsValidatorBase<ClusterOptions>
|
||||||
{
|
{
|
||||||
/// <summary>Split-brain resolver strategies safe for ScadaBridge's two-node clusters.</summary>
|
/// <summary>Split-brain resolver strategies safe for ScadaBridge's two-node clusters.</summary>
|
||||||
private static readonly HashSet<string> AllowedStrategies = new(StringComparer.OrdinalIgnoreCase)
|
private static readonly HashSet<string> AllowedStrategies = new(StringComparer.OrdinalIgnoreCase)
|
||||||
@@ -19,77 +19,51 @@ public sealed class ClusterOptionsValidator : IValidateOptions<ClusterOptions>
|
|||||||
};
|
};
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Validates the cluster options, returning a failure result if any critical settings are misconfigured.
|
/// Validates the cluster options, recording a failure if any critical settings are misconfigured.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="name">Named options instance name (unused; all instances are validated identically).</param>
|
/// <param name="builder">The accumulator to record failures on.</param>
|
||||||
/// <param name="options">The cluster options to validate.</param>
|
/// <param name="options">The cluster options to validate.</param>
|
||||||
public ValidateOptionsResult Validate(string? name, ClusterOptions options)
|
protected override void Validate(ValidationBuilder builder, ClusterOptions options)
|
||||||
{
|
|
||||||
var failures = new List<string>();
|
|
||||||
|
|
||||||
if (options.SeedNodes is null || options.SeedNodes.Count < 2)
|
|
||||||
{
|
{
|
||||||
// CI-012: design doc states "both nodes are seed nodes — each node lists
|
// CI-012: design doc states "both nodes are seed nodes — each node lists
|
||||||
// both itself and its partner" so a properly-configured deployment lists
|
// both itself and its partner" so a properly-configured deployment lists
|
||||||
// two. Accepting a single-seed configuration silently defeats the
|
// two. Accepting a single-seed configuration silently defeats the
|
||||||
// "no startup ordering dependency" guarantee called out by
|
// "no startup ordering dependency" guarantee called out by
|
||||||
// Component-ClusterInfrastructure.md (Node Configuration).
|
// Component-ClusterInfrastructure.md (Node Configuration).
|
||||||
failures.Add(
|
builder.RequireThat(options.SeedNodes is not null && options.SeedNodes.Count >= 2,
|
||||||
"ClusterOptions.SeedNodes must contain at least 2 seed nodes "
|
"ClusterOptions.SeedNodes must contain at least 2 seed nodes "
|
||||||
+ "(Component-ClusterInfrastructure.md → Node Configuration: "
|
+ "(Component-ClusterInfrastructure.md → Node Configuration: "
|
||||||
+ "both nodes are seed nodes); a single-seed configuration defeats "
|
+ "both nodes are seed nodes); a single-seed configuration defeats "
|
||||||
+ "the no-startup-ordering-dependency guarantee.");
|
+ "the no-startup-ordering-dependency guarantee.");
|
||||||
}
|
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(options.SplitBrainResolverStrategy)
|
builder.RequireThat(
|
||||||
|| !AllowedStrategies.Contains(options.SplitBrainResolverStrategy))
|
!string.IsNullOrWhiteSpace(options.SplitBrainResolverStrategy)
|
||||||
{
|
&& AllowedStrategies.Contains(options.SplitBrainResolverStrategy),
|
||||||
failures.Add(
|
|
||||||
$"ClusterOptions.SplitBrainResolverStrategy must be 'keep-oldest' for a two-node cluster; " +
|
$"ClusterOptions.SplitBrainResolverStrategy must be 'keep-oldest' for a two-node cluster; " +
|
||||||
$"'{options.SplitBrainResolverStrategy}' would risk a total cluster shutdown on a partition.");
|
$"'{options.SplitBrainResolverStrategy}' would risk a total cluster shutdown on a partition.");
|
||||||
}
|
|
||||||
|
|
||||||
if (options.MinNrOfMembers != 1)
|
builder.RequireThat(options.MinNrOfMembers == 1,
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"ClusterOptions.MinNrOfMembers must be 1 (was {options.MinNrOfMembers}); " +
|
$"ClusterOptions.MinNrOfMembers must be 1 (was {options.MinNrOfMembers}); " +
|
||||||
"any other value blocks the cluster singleton after failover and halts all data collection.");
|
"any other value blocks the cluster singleton after failover and halts all data collection.");
|
||||||
}
|
|
||||||
|
|
||||||
if (options.StableAfter <= TimeSpan.Zero)
|
builder.RequireThat(options.StableAfter > TimeSpan.Zero,
|
||||||
{
|
"ClusterOptions.StableAfter must be a positive duration.");
|
||||||
failures.Add("ClusterOptions.StableAfter must be a positive duration.");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (options.HeartbeatInterval <= TimeSpan.Zero)
|
builder.RequireThat(options.HeartbeatInterval > TimeSpan.Zero,
|
||||||
{
|
"ClusterOptions.HeartbeatInterval must be a positive duration.");
|
||||||
failures.Add("ClusterOptions.HeartbeatInterval must be a positive duration.");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (options.FailureDetectionThreshold <= TimeSpan.Zero)
|
builder.RequireThat(options.FailureDetectionThreshold > TimeSpan.Zero,
|
||||||
{
|
"ClusterOptions.FailureDetectionThreshold must be a positive duration.");
|
||||||
failures.Add("ClusterOptions.FailureDetectionThreshold must be a positive duration.");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (options.HeartbeatInterval >= options.FailureDetectionThreshold)
|
builder.RequireThat(options.HeartbeatInterval < options.FailureDetectionThreshold,
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"ClusterOptions.HeartbeatInterval ({options.HeartbeatInterval}) must be well below " +
|
$"ClusterOptions.HeartbeatInterval ({options.HeartbeatInterval}) must be well below " +
|
||||||
$"FailureDetectionThreshold ({options.FailureDetectionThreshold}); otherwise nodes are " +
|
$"FailureDetectionThreshold ({options.FailureDetectionThreshold}); otherwise nodes are " +
|
||||||
"declared unreachable before a heartbeat can arrive.");
|
"declared unreachable before a heartbeat can arrive.");
|
||||||
}
|
|
||||||
|
|
||||||
if (!options.DownIfAlone)
|
builder.RequireThat(options.DownIfAlone,
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
"ClusterOptions.DownIfAlone must be true for the keep-oldest resolver "
|
"ClusterOptions.DownIfAlone must be true for the keep-oldest resolver "
|
||||||
+ "(Component-ClusterInfrastructure.md → Split-Brain Resolution); with it false the "
|
+ "(Component-ClusterInfrastructure.md → Split-Brain Resolution); with it false the "
|
||||||
+ "oldest node can run as an isolated single-node cluster during a partition while the "
|
+ "oldest node can run as an isolated single-node cluster during a partition while the "
|
||||||
+ "younger node forms its own, producing two live clusters.");
|
+ "younger node forms its own, producing two live clusters.");
|
||||||
}
|
}
|
||||||
|
|
||||||
return failures.Count > 0
|
|
||||||
? ValidateOptionsResult.Fail(failures)
|
|
||||||
: ValidateOptionsResult.Success;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
+1
@@ -10,6 +10,7 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
|
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||||
|
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@@ -0,0 +1,94 @@
|
|||||||
|
using System.Diagnostics.Metrics;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
|
|
||||||
|
/// <summary>
/// Single home for ScadaBridge's application-telemetry <see cref="Meter"/> and the
/// instruments created on it, following the shape of OtOpcUa's <c>OtOpcUaTelemetry</c>.
/// Modules record through the pre-built instruments below so that one OpenTelemetry /
/// Prometheus binding in <c>Host</c> (registered via <c>AddZbTelemetry</c> with
/// <see cref="MeterName"/> listed in <c>ZbTelemetryOptions.Meters</c>) observes all of
/// them. No exporter is required: an instrument does nothing until a listener attaches,
/// so tests and dev hosts pay nothing for instrumentation that nobody scrapes.
///
/// Instrument names follow the OpenTelemetry semantic-convention pattern
/// <c>scadabridge.&lt;subsystem&gt;.&lt;event&gt;</c>. This type only defines the
/// instruments and their emit helpers; the emit points are wired by the callers. Calling
/// a helper while nothing is listening is safe and simply records against a meter that
/// nothing observes.
/// </summary>
public static class ScadaBridgeTelemetry
{
    /// <summary>The meter name registered with OTel via <c>ZbTelemetryOptions.Meters</c>.</summary>
    public const string MeterName = "ZB.MOM.WW.ScadaBridge";

    // Declared first: static initializers run in textual order and every instrument
    // below is created from this meter.
    /// <summary>The one <see cref="Meter"/> that owns every instrument in this class.</summary>
    private static readonly Meter s_meter = new(MeterName);

    // ---------------- Gauge backing state ----------------

    /// <summary>Number of site connections currently open; only touched through <see cref="Interlocked"/>.</summary>
    private static long s_openSiteConnections;

    /// <summary>
    /// Callback that returns the live StoreAndForward queue depth, or <c>null</c> until
    /// <see cref="SetQueueDepthProvider"/> has been called with a non-null provider.
    /// </summary>
    private static Func<long>? s_queueDepthProvider;

    // ---------------- Counters ----------------

    /// <summary>Counts deployments that were applied.</summary>
    private static readonly Counter<long> s_deploymentsApplied = s_meter.CreateCounter<long>(
        "scadabridge.deployments.applied",
        unit: "1",
        description: "Deployments applied.");

    /// <summary>Counts inbound API requests; each measurement carries a <c>method</c> tag.</summary>
    private static readonly Counter<long> s_inboundApiRequests = s_meter.CreateCounter<long>(
        "scadabridge.inbound_api.requests",
        unit: "1",
        description: "Inbound API requests, tagged by method.");

    // ---------------- Observable gauges ----------------

#pragma warning disable IDE0052 // Held to keep the observable gauges alive for the meter's lifetime.
    /// <summary>Observable gauge that reports the current open-site-connection count.</summary>
    private static readonly ObservableGauge<long> s_siteConnectionUp = s_meter.CreateObservableGauge<long>(
        "scadabridge.site.connection.up",
        () => ReadOpenSiteConnections(),
        description: "Number of currently open site connections.");

    /// <summary>Observable gauge that reports the StoreAndForward queue depth from the registered provider.</summary>
    private static readonly ObservableGauge<long> s_storeAndForwardQueueDepth = s_meter.CreateObservableGauge<long>(
        "scadabridge.store_and_forward.queue.depth",
        () => ReadQueueDepth(),
        unit: "items",
        description: "Current StoreAndForward queue depth.");
#pragma warning restore IDE0052

    // ---------------- Emit helpers ----------------

    /// <summary>Adds one to the applied-deployments counter.</summary>
    public static void RecordDeploymentApplied()
    {
        s_deploymentsApplied.Add(1);
    }

    /// <summary>Adds one to the inbound-request counter, tagged with <paramref name="method"/>.</summary>
    /// <param name="method">The API method the request targeted; recorded as the <c>method</c> tag value.</param>
    public static void RecordInboundApiRequest(string method)
    {
        var methodTag = new KeyValuePair<string, object?>("method", method);
        s_inboundApiRequests.Add(1, methodTag);
    }

    /// <summary>Notes that a site connection opened by incrementing the open-connection count.</summary>
    public static void SiteConnectionOpened()
    {
        Interlocked.Increment(ref s_openSiteConnections);
    }

    /// <summary>Notes that a site connection closed by decrementing the open-connection count.</summary>
    public static void SiteConnectionClosed()
    {
        Interlocked.Decrement(ref s_openSiteConnections);
    }

    /// <summary>
    /// Installs the callback that the StoreAndForward queue-depth gauge invokes on every
    /// observation. Passing <c>null</c> changes nothing: any previously installed provider
    /// stays in place, and with none installed the gauge keeps reporting 0.
    /// </summary>
    /// <param name="provider">A callback returning the current queue depth.</param>
    public static void SetQueueDepthProvider(Func<long> provider)
    {
        if (provider is not null)
        {
            Volatile.Write(ref s_queueDepthProvider, provider);
        }
    }

    // ---------------- Gauge observers ----------------

    /// <summary>Reads the open-connection count for the site-connection gauge.</summary>
    private static long ReadOpenSiteConnections()
    {
        return Interlocked.Read(ref s_openSiteConnections);
    }

    /// <summary>Asks the registered provider for the queue depth, or yields 0 when none is registered.</summary>
    private static long ReadQueueDepth()
    {
        Func<long>? provider = Volatile.Read(ref s_queueDepthProvider);
        return provider is null ? 0L : provider();
    }
}
|
||||||
@@ -7,6 +7,7 @@ using Microsoft.Extensions.Options;
|
|||||||
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
using GrpcStatus = Grpc.Core.Status;
|
using GrpcStatus = Grpc.Core.Status;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
||||||
@@ -264,6 +265,14 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
|||||||
"Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
|
"Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
|
||||||
request.CorrelationId, request.InstanceUniqueName, subscriptionId);
|
request.CorrelationId, request.InstanceUniqueName, subscriptionId);
|
||||||
|
|
||||||
|
// Telemetry follow-on: the connection is now fully established (Subscribe
|
||||||
|
// succeeded, so no leak via the catch above). Count it up here and balance
|
||||||
|
// it in the finally below so the scadabridge.site.connection.up gauge is
|
||||||
|
// decremented on EVERY exit path — normal completion, client-cancel /
|
||||||
|
// duplicate-replacement (OperationCanceledException), server shutdown
|
||||||
|
// (CancelAllStreams -> Cts.Cancel), and any other exception — guaranteeing
|
||||||
|
// exactly one Closed per Opened and a gauge that never drifts up.
|
||||||
|
ScadaBridgeTelemetry.SiteConnectionOpened();
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token))
|
await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token))
|
||||||
@@ -277,6 +286,7 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
|||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
ScadaBridgeTelemetry.SiteConnectionClosed();
|
||||||
_streamSubscriber.RemoveSubscriber(relayActor);
|
_streamSubscriber.RemoveSubscriber(relayActor);
|
||||||
_actorSystem!.Stop(relayActor);
|
_actorSystem!.Stop(relayActor);
|
||||||
channel.Writer.TryComplete();
|
channel.Writer.TryComplete();
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
|||||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
|
||||||
@@ -244,6 +245,16 @@ public class DeploymentService
|
|||||||
|
|
||||||
if (response.Status == DeploymentStatus.Success)
|
if (response.Status == DeploymentStatus.Success)
|
||||||
{
|
{
|
||||||
|
// Telemetry: one instance deployment successfully applied to a
|
||||||
|
// site. Counted once per successful deploy operation (the unit
|
||||||
|
// of scadabridge.deployments.applied — one DeployInstanceAsync
|
||||||
|
// deploys exactly one instance to one site). Emitted only on this
|
||||||
|
// confirmed-Success path, so failures, timeouts/retries (the
|
||||||
|
// catch block), and the reconciliation path (which recovers a
|
||||||
|
// PRIOR timed-out apply rather than performing a fresh one) do
|
||||||
|
// not increment it.
|
||||||
|
ScadaBridgeTelemetry.RecordDeploymentApplied();
|
||||||
|
|
||||||
// The site has applied the deployment. The post-success
|
// The site has applied the deployment. The post-success
|
||||||
// persistence below is best-effort: a failure here must be
|
// persistence below is best-effort: a failure here must be
|
||||||
// logged loudly for operator reconciliation but must not flip
|
// logged loudly for operator reconciliation but must not flip
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
using Microsoft.Extensions.Options;
|
using ZB.MOM.WW.Configuration;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
||||||
|
|
||||||
@@ -14,51 +14,40 @@ namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
|||||||
/// <c>ValidateOnStart()</c> so a bad <c>ScadaBridge:HealthMonitoring</c> section
|
/// <c>ValidateOnStart()</c> so a bad <c>ScadaBridge:HealthMonitoring</c> section
|
||||||
/// fails fast at boot with a clear, key-naming message.
|
/// fails fast at boot with a clear, key-naming message.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class HealthMonitoringOptionsValidator : IValidateOptions<HealthMonitoringOptions>
|
public sealed class HealthMonitoringOptionsValidator : OptionsValidatorBase<HealthMonitoringOptions>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Validates the health monitoring options, returning a failure result if any interval values are non-positive.
|
/// Validates the health monitoring options, recording a failure if any interval values are non-positive.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="name">Named options instance name (unused).</param>
|
/// <param name="builder">The accumulator to record failures on.</param>
|
||||||
/// <param name="options">The health monitoring options to validate.</param>
|
/// <param name="options">The health monitoring options to validate.</param>
|
||||||
public ValidateOptionsResult Validate(string? name, HealthMonitoringOptions options)
|
protected override void Validate(ValidationBuilder builder, HealthMonitoringOptions options)
|
||||||
{
|
{
|
||||||
var failures = new List<string>();
|
builder.RequireThat(options.ReportInterval > TimeSpan.Zero,
|
||||||
|
|
||||||
if (options.ReportInterval <= TimeSpan.Zero)
|
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"ScadaBridge:HealthMonitoring:ReportInterval must be a positive duration " +
|
$"ScadaBridge:HealthMonitoring:ReportInterval must be a positive duration " +
|
||||||
$"(was {options.ReportInterval}); it is used directly as a PeriodicTimer period.");
|
$"(was {options.ReportInterval}); it is used directly as a PeriodicTimer period.");
|
||||||
}
|
|
||||||
|
|
||||||
if (options.OfflineTimeout <= TimeSpan.Zero)
|
builder.RequireThat(options.OfflineTimeout > TimeSpan.Zero,
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"ScadaBridge:HealthMonitoring:OfflineTimeout must be a positive duration " +
|
$"ScadaBridge:HealthMonitoring:OfflineTimeout must be a positive duration " +
|
||||||
$"(was {options.OfflineTimeout}); it drives the offline-check PeriodicTimer cadence.");
|
$"(was {options.OfflineTimeout}); it drives the offline-check PeriodicTimer cadence.");
|
||||||
}
|
|
||||||
|
|
||||||
if (options.CentralOfflineTimeout <= TimeSpan.Zero)
|
builder.RequireThat(options.CentralOfflineTimeout > TimeSpan.Zero,
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout must be a positive duration " +
|
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout must be a positive duration " +
|
||||||
$"(was {options.CentralOfflineTimeout}).");
|
$"(was {options.CentralOfflineTimeout}).");
|
||||||
}
|
|
||||||
|
|
||||||
if (options.OfflineTimeout > TimeSpan.Zero
|
// Valid when CentralOfflineTimeout >= OfflineTimeout (both already
|
||||||
|
// required to be positive above). The negated guard !(both positive
|
||||||
|
// AND Central < Offline) is true unless BOTH timeouts are positive and
|
||||||
|
// Central is strictly smaller — so it stays silent when either field is
|
||||||
|
// non-positive, leaving that failure to the dedicated positive-duration
|
||||||
|
// checks above rather than double-firing here.
|
||||||
|
builder.RequireThat(
|
||||||
|
!(options.OfflineTimeout > TimeSpan.Zero
|
||||||
&& options.CentralOfflineTimeout > TimeSpan.Zero
|
&& options.CentralOfflineTimeout > TimeSpan.Zero
|
||||||
&& options.CentralOfflineTimeout < options.OfflineTimeout)
|
&& options.CentralOfflineTimeout < options.OfflineTimeout),
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout ({options.CentralOfflineTimeout}) " +
|
$"ScadaBridge:HealthMonitoring:CentralOfflineTimeout ({options.CentralOfflineTimeout}) " +
|
||||||
$"must be >= OfflineTimeout ({options.OfflineTimeout}): the synthetic 'central' site has " +
|
$"must be >= OfflineTimeout ({options.OfflineTimeout}): the synthetic 'central' site has " +
|
||||||
"no heartbeat source and is fed only by the slower self-report loop, so it needs at " +
|
"no heartbeat source and is fed only by the slower self-report loop, so it needs at " +
|
||||||
"least as much offline grace as a real site.");
|
"least as much offline grace as a real site.");
|
||||||
}
|
}
|
||||||
|
|
||||||
return failures.Count > 0
|
|
||||||
? ValidateOptionsResult.Fail(failures)
|
|
||||||
: ValidateOptionsResult.Success;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
+1
@@ -12,6 +12,7 @@
|
|||||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
|
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||||
|
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
using Serilog;
|
using Serilog;
|
||||||
using Serilog.Events;
|
using Serilog.Events;
|
||||||
|
using ZB.MOM.WW.Telemetry.Serilog;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.Host;
|
namespace ZB.MOM.WW.ScadaBridge.Host;
|
||||||
|
|
||||||
@@ -85,7 +86,8 @@ public static class LoggerConfigurationFactory
|
|||||||
.MinimumLevel.Is(minimumLevel)
|
.MinimumLevel.Is(minimumLevel)
|
||||||
.Enrich.WithProperty("SiteId", siteId)
|
.Enrich.WithProperty("SiteId", siteId)
|
||||||
.Enrich.WithProperty("NodeHostname", nodeHostname)
|
.Enrich.WithProperty("NodeHostname", nodeHostname)
|
||||||
.Enrich.WithProperty("NodeRole", nodeRole);
|
.Enrich.WithProperty("NodeRole", nodeRole)
|
||||||
|
.Enrich.With(new TraceContextEnricher());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -20,4 +20,11 @@ public class NodeOptions
|
|||||||
public int RemotingPort { get; set; } = 8081;
|
public int RemotingPort { get; set; } = 8081;
|
||||||
/// <summary>Gets or sets the gRPC port for the site stream server.</summary>
|
/// <summary>Gets or sets the gRPC port for the site stream server.</summary>
|
||||||
public int GrpcPort { get; set; } = 8083;
|
public int GrpcPort { get; set; } = 8083;
|
||||||
|
/// <summary>
|
||||||
|
/// HTTP/1.1 port serving the Prometheus /metrics scrape endpoint on site nodes.
|
||||||
|
/// Defaults to 8084 — deliberately distinct from <see cref="RemotingPort"/> (8081 by default, 8082 in the site configs)
|
||||||
|
/// and <see cref="GrpcPort"/> (8083) so the Kestrel metrics listener never contends
|
||||||
|
/// with the Akka remoting port a site node binds.
|
||||||
|
/// </summary>
|
||||||
|
public int MetricsPort { get; set; } = 8084;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ using ZB.MOM.WW.ScadaBridge.Security;
|
|||||||
using ZB.MOM.WW.ScadaBridge.SiteCallAudit;
|
using ZB.MOM.WW.ScadaBridge.SiteCallAudit;
|
||||||
using ZB.MOM.WW.ScadaBridge.TemplateEngine;
|
using ZB.MOM.WW.ScadaBridge.TemplateEngine;
|
||||||
using ZB.MOM.WW.ScadaBridge.Transport;
|
using ZB.MOM.WW.ScadaBridge.Transport;
|
||||||
|
using ZB.MOM.WW.Telemetry;
|
||||||
using Serilog;
|
using Serilog;
|
||||||
|
|
||||||
// SCADABRIDGE_CONFIG determines which role-specific config to load (Central or Site)
|
// SCADABRIDGE_CONFIG determines which role-specific config to load (Central or Site)
|
||||||
@@ -248,6 +249,12 @@ try
|
|||||||
// All three are anonymous and use the canonical ZbHealthWriter JSON output.
|
// All three are anonymous and use the canonical ZbHealthWriter JSON output.
|
||||||
app.MapZbHealth();
|
app.MapZbHealth();
|
||||||
|
|
||||||
|
// Observability — mount the always-on Prometheus /metrics scrape endpoint.
|
||||||
|
// AddZbTelemetry (in SiteServiceRegistration.BindSharedOptions) wires the OTel
|
||||||
|
// Resource + standard instrumentation + Prometheus exporter; this exposes them.
|
||||||
|
// Requires endpoint routing (app.UseRouting() above).
|
||||||
|
app.MapZbMetrics();
|
||||||
|
|
||||||
app.MapStaticAssets();
|
app.MapStaticAssets();
|
||||||
app.MapCentralUI<ZB.MOM.WW.ScadaBridge.Host.Components.App>();
|
app.MapCentralUI<ZB.MOM.WW.ScadaBridge.Host.Components.App>();
|
||||||
app.MapInboundAPI();
|
app.MapInboundAPI();
|
||||||
@@ -286,6 +293,13 @@ try
|
|||||||
// Read GrpcPort from config (NodeOptions already has default 8083)
|
// Read GrpcPort from config (NodeOptions already has default 8083)
|
||||||
var grpcPort = configuration.GetValue<int>("ScadaBridge:Node:GrpcPort", 8083);
|
var grpcPort = configuration.GetValue<int>("ScadaBridge:Node:GrpcPort", 8083);
|
||||||
|
|
||||||
|
// Read MetricsPort from config (NodeOptions already has default 8084).
|
||||||
|
// Separate HTTP/1.1 listener so a standard HTTP/1.1 Prometheus scraper can
|
||||||
|
// reach /metrics; the gRPC port stays HTTP/2-only below. The default is
|
||||||
|
// 8084 — distinct from RemotingPort (8082, Akka) and GrpcPort (8083) so the
|
||||||
|
// metrics listener never collides with the Akka remoting port on site nodes.
|
||||||
|
var metricsPort = configuration.GetValue<int>("ScadaBridge:Node:MetricsPort", 8084);
|
||||||
|
|
||||||
// Configure Kestrel for HTTP/2 only on the gRPC port
|
// Configure Kestrel for HTTP/2 only on the gRPC port
|
||||||
builder.WebHost.ConfigureKestrel(options =>
|
builder.WebHost.ConfigureKestrel(options =>
|
||||||
{
|
{
|
||||||
@@ -293,6 +307,13 @@ try
|
|||||||
{
|
{
|
||||||
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2;
|
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Dedicated HTTP/1.1 (and HTTP/2) listener for the Prometheus /metrics
|
||||||
|
// scrape endpoint, reachable by an HTTP/1.1 scraper.
|
||||||
|
options.ListenAnyIP(metricsPort, listenOptions =>
|
||||||
|
{
|
||||||
|
listenOptions.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1AndHttp2;
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
// gRPC server registration
|
// gRPC server registration
|
||||||
@@ -304,6 +325,17 @@ try
|
|||||||
|
|
||||||
var app = builder.Build();
|
var app = builder.Build();
|
||||||
|
|
||||||
|
// Endpoint routing middleware. The gRPC service mapping below and the
|
||||||
|
// /metrics scrape endpoint both run on endpoint routing, so UseRouting()
|
||||||
|
// must be present before the Map* calls on the site role.
|
||||||
|
app.UseRouting();
|
||||||
|
|
||||||
|
// Observability — mount the always-on Prometheus /metrics scrape endpoint.
|
||||||
|
// AddZbTelemetry (in SiteServiceRegistration.Configure → BindSharedOptions)
|
||||||
|
// wires the OTel Resource + standard instrumentation + Prometheus exporter;
|
||||||
|
// this exposes them on the site node too.
|
||||||
|
app.MapZbMetrics();
|
||||||
|
|
||||||
// Map gRPC service — resolves the singleton SiteStreamGrpcServer from DI
|
// Map gRPC service — resolves the singleton SiteStreamGrpcServer from DI
|
||||||
app.MapGrpcService<ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamGrpcServer>();
|
app.MapGrpcService<ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteStreamGrpcServer>();
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ using ZB.MOM.WW.ScadaBridge.AuditLog;
|
|||||||
using ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
|
using ZB.MOM.WW.ScadaBridge.ClusterInfrastructure;
|
||||||
using ZB.MOM.WW.ScadaBridge.Communication;
|
using ZB.MOM.WW.ScadaBridge.Communication;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer;
|
using ZB.MOM.WW.ScadaBridge.DataConnectionLayer;
|
||||||
using ZB.MOM.WW.ScadaBridge.ExternalSystemGateway;
|
using ZB.MOM.WW.ScadaBridge.ExternalSystemGateway;
|
||||||
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
|
||||||
@@ -11,6 +12,7 @@ using ZB.MOM.WW.ScadaBridge.NotificationService;
|
|||||||
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
|
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
|
||||||
using ZB.MOM.WW.ScadaBridge.SiteRuntime;
|
using ZB.MOM.WW.ScadaBridge.SiteRuntime;
|
||||||
using ZB.MOM.WW.ScadaBridge.StoreAndForward;
|
using ZB.MOM.WW.ScadaBridge.StoreAndForward;
|
||||||
|
using ZB.MOM.WW.Telemetry;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.Host;
|
namespace ZB.MOM.WW.ScadaBridge.Host;
|
||||||
|
|
||||||
@@ -103,10 +105,16 @@ public static class SiteServiceRegistration
|
|||||||
public static void BindSharedOptions(IServiceCollection services, IConfiguration config)
|
public static void BindSharedOptions(IServiceCollection services, IConfiguration config)
|
||||||
{
|
{
|
||||||
services.Configure<NodeOptions>(config.GetSection("ScadaBridge:Node"));
|
services.Configure<NodeOptions>(config.GetSection("ScadaBridge:Node"));
|
||||||
services.Configure<ClusterOptions>(config.GetSection("ScadaBridge:Cluster"));
|
// Bind + eagerly validate: ClusterOptionsValidator is registered (TryAddEnumerable)
|
||||||
|
// by the ClusterInfrastructure module, so chaining ValidateOnStart() here makes a bad
|
||||||
|
// ScadaBridge:Cluster section fail fast at host build instead of lazily on first resolve.
|
||||||
|
services.AddOptions<ClusterOptions>().Bind(config.GetSection("ScadaBridge:Cluster")).ValidateOnStart();
|
||||||
services.Configure<DatabaseOptions>(config.GetSection("ScadaBridge:Database"));
|
services.Configure<DatabaseOptions>(config.GetSection("ScadaBridge:Database"));
|
||||||
services.Configure<CommunicationOptions>(config.GetSection("ScadaBridge:Communication"));
|
services.Configure<CommunicationOptions>(config.GetSection("ScadaBridge:Communication"));
|
||||||
services.Configure<HealthMonitoringOptions>(config.GetSection("ScadaBridge:HealthMonitoring"));
|
// Bind + eagerly validate: HealthMonitoringOptionsValidator is registered (TryAddEnumerable)
|
||||||
|
// by the HealthMonitoring module, so chaining ValidateOnStart() here makes a bad
|
||||||
|
// ScadaBridge:HealthMonitoring section fail fast at host build instead of lazily on first resolve.
|
||||||
|
services.AddOptions<HealthMonitoringOptions>().Bind(config.GetSection("ScadaBridge:HealthMonitoring")).ValidateOnStart();
|
||||||
services.Configure<NotificationOptions>(config.GetSection("ScadaBridge:Notification"));
|
services.Configure<NotificationOptions>(config.GetSection("ScadaBridge:Notification"));
|
||||||
services.Configure<LoggingOptions>(config.GetSection("ScadaBridge:Logging"));
|
services.Configure<LoggingOptions>(config.GetSection("ScadaBridge:Logging"));
|
||||||
|
|
||||||
@@ -114,5 +122,26 @@ public static class SiteServiceRegistration
|
|||||||
// writers so they can stamp the SourceNode column. Registered here in
|
// writers so they can stamp the SourceNode column. Registered here in
|
||||||
// shared bootstrap because every node (central + site) needs it.
|
// shared bootstrap because every node (central + site) needs it.
|
||||||
services.AddSingleton<INodeIdentityProvider, NodeIdentityProvider>();
|
services.AddSingleton<INodeIdentityProvider, NodeIdentityProvider>();
|
||||||
|
|
||||||
|
// Observability — shared ZB.MOM.WW.Telemetry. Registered in shared bootstrap so
|
||||||
|
// BOTH the central and site composition roots wire the OTel Resource (the
|
||||||
|
// service.name/site.id/node.role identity triple) + standard instrumentation +
|
||||||
|
// the always-on Prometheus exporter. Mount the /metrics scrape endpoint per role
|
||||||
|
// with app.MapZbMetrics(). The same `?? "central"` SiteId default Program.cs uses
|
||||||
|
// is applied here so the Resource attribute matches the log-enricher value.
|
||||||
|
// The application meter is named so OTel observes its instruments; emit points are
|
||||||
|
// wired by follow-on tasks (the instruments are no-op until a listener attaches).
|
||||||
|
services.AddZbTelemetry(o =>
|
||||||
|
{
|
||||||
|
o.ServiceName = "scadabridge";
|
||||||
|
o.SiteId = config["ScadaBridge:Node:SiteId"] ?? "central";
|
||||||
|
o.NodeRole = config["ScadaBridge:Node:Role"];
|
||||||
|
o.Meters = [ScadaBridgeTelemetry.MeterName];
|
||||||
|
if (Enum.TryParse<ZbExporter>(config["ScadaBridge:Telemetry:Exporter"], ignoreCase: true, out var exporter))
|
||||||
|
o.Exporter = exporter;
|
||||||
|
var otlp = config["ScadaBridge:Telemetry:OtlpEndpoint"];
|
||||||
|
if (!string.IsNullOrWhiteSpace(otlp))
|
||||||
|
o.OtlpEndpoint = otlp;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
using ZB.MOM.WW.Configuration;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.Host;
|
namespace ZB.MOM.WW.ScadaBridge.Host;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -10,77 +12,98 @@ public static class StartupValidator
|
|||||||
/// <param name="configuration">The application configuration to validate.</param>
|
/// <param name="configuration">The application configuration to validate.</param>
|
||||||
public static void Validate(IConfiguration configuration)
|
public static void Validate(IConfiguration configuration)
|
||||||
{
|
{
|
||||||
var errors = new List<string>();
|
// Resolve the same locals the original imperative validator used, so the
|
||||||
|
// cross-field predicates below can close over them. ConfigPreflight.Require
|
||||||
|
// passes config[key] to each predicate, but the cross-field rules ignore that
|
||||||
|
// argument and read these resolved values instead — preserving the exact
|
||||||
|
// conditions (and therefore the byte-identical failure messages and ordering)
|
||||||
|
// of the original StartupValidator.
|
||||||
var nodeSection = configuration.GetSection("ScadaBridge:Node");
|
var nodeSection = configuration.GetSection("ScadaBridge:Node");
|
||||||
var role = nodeSection["Role"];
|
var role = nodeSection["Role"];
|
||||||
if (string.IsNullOrEmpty(role) || (role != "Central" && role != "Site"))
|
|
||||||
errors.Add("ScadaBridge:Node:Role must be 'Central' or 'Site'");
|
|
||||||
|
|
||||||
if (string.IsNullOrEmpty(nodeSection["NodeHostname"]))
|
|
||||||
errors.Add("ScadaBridge:Node:NodeHostname is required");
|
|
||||||
|
|
||||||
var portStr = nodeSection["RemotingPort"];
|
var portStr = nodeSection["RemotingPort"];
|
||||||
if (!int.TryParse(portStr, out var port) || port < 1 || port > 65535)
|
bool portValid = int.TryParse(portStr, out var port) && port >= 1 && port <= 65535;
|
||||||
errors.Add("ScadaBridge:Node:RemotingPort must be 1-65535");
|
|
||||||
|
|
||||||
if (role == "Site" && string.IsNullOrEmpty(nodeSection["SiteId"]))
|
|
||||||
errors.Add("ScadaBridge:Node:SiteId is required for Site nodes");
|
|
||||||
|
|
||||||
if (role == "Central")
|
|
||||||
{
|
|
||||||
var dbSection = configuration.GetSection("ScadaBridge:Database");
|
|
||||||
if (string.IsNullOrEmpty(dbSection["ConfigurationDb"]))
|
|
||||||
errors.Add("ScadaBridge:Database:ConfigurationDb connection string required for Central");
|
|
||||||
|
|
||||||
var secSection = configuration.GetSection("ScadaBridge:Security");
|
|
||||||
if (string.IsNullOrEmpty(secSection["LdapServer"]))
|
|
||||||
errors.Add("ScadaBridge:Security:LdapServer required for Central");
|
|
||||||
if (string.IsNullOrEmpty(secSection["JwtSigningKey"]))
|
|
||||||
errors.Add("ScadaBridge:Security:JwtSigningKey required for Central");
|
|
||||||
}
|
|
||||||
|
|
||||||
var seedNodes = configuration.GetSection("ScadaBridge:Cluster:SeedNodes").Get<List<string>>();
|
var seedNodes = configuration.GetSection("ScadaBridge:Cluster:SeedNodes").Get<List<string>>();
|
||||||
if (seedNodes == null || seedNodes.Count < 2)
|
|
||||||
errors.Add("ScadaBridge:Cluster:SeedNodes must have at least 2 entries");
|
|
||||||
|
|
||||||
if (role == "Site")
|
// GrpcPort: default 8083 when absent; only fails the range rule when the key is
|
||||||
{
|
// present AND invalid. The out-param assignment mirrors the original so the
|
||||||
|
// resolved grpcPort feeds the cross-field rules even on a parse failure.
|
||||||
var grpcPortStr = nodeSection["GrpcPort"];
|
var grpcPortStr = nodeSection["GrpcPort"];
|
||||||
int grpcPort = 8083; // NodeOptions default when the key is absent
|
int grpcPort = 8083; // NodeOptions default when the key is absent
|
||||||
if (grpcPortStr != null && (!int.TryParse(grpcPortStr, out grpcPort) || grpcPort < 1 || grpcPort > 65535))
|
bool grpcValid = !(grpcPortStr != null && (!int.TryParse(grpcPortStr, out grpcPort) || grpcPort < 1 || grpcPort > 65535));
|
||||||
errors.Add("ScadaBridge:Node:GrpcPort must be 1-65535");
|
|
||||||
|
|
||||||
// Host-007 / REQ-HOST-4: the gRPC (Kestrel HTTP/2) port and the Akka
|
// MetricsPort: default 8084 when absent; same parse-or-default contract as GrpcPort.
|
||||||
// remoting port must differ. Identical values make Kestrel and
|
var metricsPortStr = nodeSection["MetricsPort"];
|
||||||
// Akka.Remote contend for the same TCP port and fail opaquely at
|
int metricsPort = 8084; // NodeOptions default when the key is absent
|
||||||
// runtime. Uses the resolved GrpcPort, including the 8083 default.
|
bool metricsValid = !(metricsPortStr != null && (!int.TryParse(metricsPortStr, out metricsPort) || metricsPort < 1 || metricsPort > 65535));
|
||||||
if (port == grpcPort)
|
|
||||||
errors.Add("ScadaBridge:Node:GrpcPort must differ from RemotingPort");
|
|
||||||
|
|
||||||
var dbSection = configuration.GetSection("ScadaBridge:Database");
|
ConfigPreflight.For(configuration)
|
||||||
if (string.IsNullOrEmpty(dbSection["SiteDbPath"]))
|
// Role / NodeHostname / RemotingPort (unconditional)
|
||||||
errors.Add("ScadaBridge:Database:SiteDbPath required for Site nodes");
|
.Require("ScadaBridge:Node:Role",
|
||||||
|
_ => !(string.IsNullOrEmpty(role) || (role != "Central" && role != "Site")),
|
||||||
|
"must be 'Central' or 'Site'")
|
||||||
|
.Require("ScadaBridge:Node:NodeHostname",
|
||||||
|
_ => !string.IsNullOrEmpty(nodeSection["NodeHostname"]),
|
||||||
|
"is required")
|
||||||
|
.Require("ScadaBridge:Node:RemotingPort",
|
||||||
|
_ => portValid,
|
||||||
|
"must be 1-65535")
|
||||||
|
// SiteId (Site only) — note: OUTSIDE the big Site block in the original,
|
||||||
|
// so it must run before the unconditional SeedNodes-count rule.
|
||||||
|
.When(role == "Site", p => p
|
||||||
|
.Require("ScadaBridge:Node:SiteId",
|
||||||
|
_ => !string.IsNullOrEmpty(nodeSection["SiteId"]),
|
||||||
|
"is required for Site nodes"))
|
||||||
|
// Central-only database/security rules.
|
||||||
|
.When(role == "Central", p => p
|
||||||
|
.Require("ScadaBridge:Database:ConfigurationDb",
|
||||||
|
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Database")["ConfigurationDb"]),
|
||||||
|
"connection string required for Central")
|
||||||
|
.Require("ScadaBridge:Security:LdapServer",
|
||||||
|
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Security")["LdapServer"]),
|
||||||
|
"required for Central")
|
||||||
|
.Require("ScadaBridge:Security:JwtSigningKey",
|
||||||
|
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Security")["JwtSigningKey"]),
|
||||||
|
"required for Central"))
|
||||||
|
// SeedNodes count (unconditional, after SiteId).
|
||||||
|
.Require("ScadaBridge:Cluster:SeedNodes",
|
||||||
|
_ => seedNodes != null && seedNodes.Count >= 2,
|
||||||
|
"must have at least 2 entries")
|
||||||
|
// The big Site-only block: GrpcPort/MetricsPort validity + cross-field
|
||||||
|
// collisions + SiteDbPath + seed-node-port loop, in the original order.
|
||||||
|
.When(role == "Site", p =>
|
||||||
|
{
|
||||||
|
// Host-007 / REQ-HOST-4: GrpcPort range, then GrpcPort vs RemotingPort.
|
||||||
|
p.Require("ScadaBridge:Node:GrpcPort", _ => grpcValid, "must be 1-65535");
|
||||||
|
// Identical GrpcPort/RemotingPort make Kestrel and Akka.Remote contend
|
||||||
|
// for the same TCP port. Uses the resolved GrpcPort, including 8083.
|
||||||
|
p.Require("ScadaBridge:Node:GrpcPort", _ => port != grpcPort, "must differ from RemotingPort");
|
||||||
|
|
||||||
|
// Host-007 / REQ-HOST-4: MetricsPort range, then MetricsPort vs both ports.
|
||||||
|
p.Require("ScadaBridge:Node:MetricsPort", _ => metricsValid, "must be 1-65535");
|
||||||
|
// The Kestrel metrics (HTTP/1.1) listener port must differ from BOTH the
|
||||||
|
// Akka remoting port and the gRPC port. Uses the resolved MetricsPort (8084 default).
|
||||||
|
p.Require("ScadaBridge:Node:MetricsPort", _ => metricsPort != port, "must differ from RemotingPort");
|
||||||
|
p.Require("ScadaBridge:Node:MetricsPort", _ => metricsPort != grpcPort, "must differ from GrpcPort");
|
||||||
|
|
||||||
|
p.Require("ScadaBridge:Database:SiteDbPath",
|
||||||
|
_ => !string.IsNullOrEmpty(configuration.GetSection("ScadaBridge:Database")["SiteDbPath"]),
|
||||||
|
"required for Site nodes");
|
||||||
|
|
||||||
// Host-004: a seed node must reference an Akka.Remote endpoint, never the
|
// Host-004: a seed node must reference an Akka.Remote endpoint, never the
|
||||||
// Kestrel HTTP/2 gRPC port. A seed entry whose port equals this node's
|
// Kestrel HTTP/2 gRPC port. A seed entry whose port equals this node's
|
||||||
// GrpcPort would make a joining node attempt an Akka.Remote TCP
|
// GrpcPort would make a joining node attempt an Akka.Remote TCP
|
||||||
// association against the gRPC listener and fail.
|
// association against the gRPC listener and fail.
|
||||||
if (seedNodes != null)
|
foreach (var seed in seedNodes ?? Enumerable.Empty<string>())
|
||||||
{
|
{
|
||||||
foreach (var seed in seedNodes)
|
p.Require("ScadaBridge:Cluster:SeedNodes",
|
||||||
{
|
_ => SeedNodePort(seed) != grpcPort,
|
||||||
if (SeedNodePort(seed) == grpcPort)
|
$"entry '{seed}' must not target the gRPC port " +
|
||||||
errors.Add(
|
|
||||||
$"ScadaBridge:Cluster:SeedNodes entry '{seed}' must not target the gRPC port " +
|
|
||||||
$"({grpcPort}); seed nodes must reference Akka remoting ports");
|
$"({grpcPort}); seed nodes must reference Akka remoting ports");
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
}
|
.ThrowIfInvalid();
|
||||||
|
|
||||||
if (errors.Count > 0)
|
|
||||||
throw new InvalidOperationException(
|
|
||||||
$"Configuration validation failed:\n{string.Join("\n", errors.Select(e => $" - {e}"))}");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -28,9 +28,12 @@
|
|||||||
<!-- Transitive override: Akka.Hosting 1.5.62 pins OpenTelemetry.Api 1.9.0 which is flagged
|
<!-- Transitive override: Akka.Hosting 1.5.62 pins OpenTelemetry.Api 1.9.0 which is flagged
|
||||||
(GHSA-g94r-2vxg-569j, GHSA-8785-wc3w-h8q6). Bumping directly clears both advisories. -->
|
(GHSA-g94r-2vxg-569j, GHSA-8785-wc3w-h8q6). Bumping directly clears both advisories. -->
|
||||||
<PackageReference Include="OpenTelemetry.Api" />
|
<PackageReference Include="OpenTelemetry.Api" />
|
||||||
|
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||||
<PackageReference Include="ZB.MOM.WW.Health" />
|
<PackageReference Include="ZB.MOM.WW.Health" />
|
||||||
<PackageReference Include="ZB.MOM.WW.Health.Akka" />
|
<PackageReference Include="ZB.MOM.WW.Health.Akka" />
|
||||||
<PackageReference Include="ZB.MOM.WW.Health.EntityFrameworkCore" />
|
<PackageReference Include="ZB.MOM.WW.Health.EntityFrameworkCore" />
|
||||||
|
<PackageReference Include="ZB.MOM.WW.Telemetry" />
|
||||||
|
<PackageReference Include="ZB.MOM.WW.Telemetry.Serilog" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
"SiteId": "site-a",
|
"SiteId": "site-a",
|
||||||
"RemotingPort": 8082,
|
"RemotingPort": 8082,
|
||||||
"GrpcPort": 8083,
|
"GrpcPort": 8083,
|
||||||
|
"MetricsPort": 8084,
|
||||||
"NodeName": "node-a"
|
"NodeName": "node-a"
|
||||||
},
|
},
|
||||||
"Cluster": {
|
"Cluster": {
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ using Microsoft.AspNetCore.Routing;
|
|||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
|
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.InboundAPI;
|
namespace ZB.MOM.WW.ScadaBridge.InboundAPI;
|
||||||
@@ -44,6 +45,16 @@ public static class EndpointExtensions
|
|||||||
|
|
||||||
if (!validationResult.IsValid)
|
if (!validationResult.IsValid)
|
||||||
{
|
{
|
||||||
|
// Telemetry follow-on: count every inbound request, including auth
|
||||||
|
// failures. The raw {methodName} route value is arbitrary caller input
|
||||||
|
// and would be high-cardinality, so failures are tagged with a small
|
||||||
|
// bounded set of sentinels keyed off the validator's status code rather
|
||||||
|
// than the unvalidated name (401 → "<unauthorized>", any other failure status → "<forbidden>").
|
||||||
|
ScadaBridgeTelemetry.RecordInboundApiRequest(
|
||||||
|
validationResult.StatusCode == StatusCodes.Status401Unauthorized
|
||||||
|
? "<unauthorized>"
|
||||||
|
: "<forbidden>");
|
||||||
|
|
||||||
// WP-5: Failures-only logging
|
// WP-5: Failures-only logging
|
||||||
logger.LogWarning(
|
logger.LogWarning(
|
||||||
"Inbound API auth failure for method {Method}: {Error} (status {StatusCode})",
|
"Inbound API auth failure for method {Method}: {Error} (status {StatusCode})",
|
||||||
@@ -56,6 +67,12 @@ public static class EndpointExtensions
|
|||||||
|
|
||||||
var method = validationResult.Method!;
|
var method = validationResult.Method!;
|
||||||
|
|
||||||
|
// Telemetry follow-on: count this inbound request against the resolved,
|
||||||
|
// registered method name. method.Name comes from the repository's method
|
||||||
|
// catalogue (an exact-name lookup), so the `method` tag is bounded to the
|
||||||
|
// set of configured API methods — never the raw caller-supplied route value.
|
||||||
|
ScadaBridgeTelemetry.RecordInboundApiRequest(method.Name);
|
||||||
|
|
||||||
// Audit Log (#23 M4 Bundle D): publish the resolved API key name so
|
// Audit Log (#23 M4 Bundle D): publish the resolved API key name so
|
||||||
// AuditWriteMiddleware can populate AuditEvent.Actor in its finally
|
// AuditWriteMiddleware can populate AuditEvent.Actor in its finally
|
||||||
// block. Done AFTER validation succeeded — auth failures leave the
|
// block. Done AFTER validation succeeded — auth failures leave the
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
using Microsoft.Extensions.Options;
|
using ZB.MOM.WW.Configuration;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.Security;
|
namespace ZB.MOM.WW.ScadaBridge.Security;
|
||||||
|
|
||||||
@@ -29,7 +29,7 @@ namespace ZB.MOM.WW.ScadaBridge.Security;
|
|||||||
/// minimum-byte length contract co-located with the type that enforces it.
|
/// minimum-byte length contract co-located with the type that enforces it.
|
||||||
/// </para>
|
/// </para>
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class SecurityOptionsValidator : IValidateOptions<SecurityOptions>
|
public sealed class SecurityOptionsValidator : OptionsValidatorBase<SecurityOptions>
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The configuration section name <see cref="SecurityOptions"/> is bound
|
/// The configuration section name <see cref="SecurityOptions"/> is bound
|
||||||
@@ -41,30 +41,16 @@ public sealed class SecurityOptionsValidator : IValidateOptions<SecurityOptions>
|
|||||||
public const string ConfigSectionName = "Security";
|
public const string ConfigSectionName = "Security";
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public ValidateOptionsResult Validate(string? name, SecurityOptions options)
|
protected override void Validate(ValidationBuilder builder, SecurityOptions options)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(options);
|
builder.RequireThat(!string.IsNullOrWhiteSpace(options.LdapServer),
|
||||||
|
|
||||||
var failures = new List<string>();
|
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(options.LdapServer))
|
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapServer)} is required " +
|
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapServer)} is required " +
|
||||||
"but was empty or whitespace — set it to the LDAP server hostname or IP " +
|
"but was empty or whitespace — set it to the LDAP server hostname or IP " +
|
||||||
"(e.g. \"ldap.example.com\").");
|
"(e.g. \"ldap.example.com\").");
|
||||||
}
|
|
||||||
|
|
||||||
if (string.IsNullOrWhiteSpace(options.LdapSearchBase))
|
builder.RequireThat(!string.IsNullOrWhiteSpace(options.LdapSearchBase),
|
||||||
{
|
|
||||||
failures.Add(
|
|
||||||
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapSearchBase)} is required " +
|
$"{ConfigSectionName}:{nameof(SecurityOptions.LdapSearchBase)} is required " +
|
||||||
"but was empty or whitespace — set it to the search-base DN " +
|
"but was empty or whitespace — set it to the search-base DN " +
|
||||||
"(e.g. \"dc=example,dc=com\").");
|
"(e.g. \"dc=example,dc=com\").");
|
||||||
}
|
}
|
||||||
|
|
||||||
return failures.Count == 0
|
|
||||||
? ValidateOptionsResult.Success
|
|
||||||
: ValidateOptionsResult.Fail(failures);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,7 @@
|
|||||||
<PackageReference Include="Microsoft.AspNetCore.Authorization" />
|
<PackageReference Include="Microsoft.AspNetCore.Authorization" />
|
||||||
<PackageReference Include="System.IdentityModel.Tokens.Jwt" />
|
<PackageReference Include="System.IdentityModel.Tokens.Jwt" />
|
||||||
<PackageReference Include="Novell.Directory.Ldap.NETStandard" />
|
<PackageReference Include="Novell.Directory.Ldap.NETStandard" />
|
||||||
|
<PackageReference Include="ZB.MOM.WW.Configuration" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||||
|
|
||||||
@@ -98,6 +99,48 @@ public class StoreAndForwardService
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
private static readonly TimeSpan SweepShutdownWaitTimeout = TimeSpan.FromSeconds(10);
|
private static readonly TimeSpan SweepShutdownWaitTimeout = TimeSpan.FromSeconds(10);
|
||||||
|
|
||||||
|
/// <summary>
/// WP-14 (telemetry): in-memory count of the messages currently buffered for
/// forwarding, i.e. rows in <see cref="StoreAndForwardMessageStatus.Pending"/>
/// (the live store-and-forward queue awaiting delivery). It is the value behind the
/// <c>scadabridge.store_and_forward.queue.depth</c> observable gauge.
/// <para>
/// The gauge's collection callback is synchronous and is polled frequently by the
/// OpenTelemetry/Prometheus collector, so it must never issue an async SQLite
/// <c>COUNT(*)</c>. This <see cref="long"/> is therefore seeded once from storage in
/// <see cref="StartAsync"/> and afterwards maintained in-process on the paths that
/// change the Pending population:
/// <see cref="BufferAsync"/> adds one; a successful-retry removal or a Pending→Parked
/// transition in <see cref="RetryMessageAsync"/> subtracts one; an operator requeue in
/// <see cref="RetryParkedMessageAsync"/> adds one.
/// </para>
/// <para>
/// The provider registered through <see cref="ScadaBridgeTelemetry.SetQueueDepthProvider"/>
/// reads the value with <see cref="Interlocked.Read"/>, which is non-blocking and safe to
/// call from a synchronous callback. The figure is approximate and eventually consistent
/// (concurrent failover replication applies to the standby's own store, not to this
/// counter), which is all a queue-depth metric requires.
/// </para>
/// </summary>
private long _bufferedCount;
|
||||||
|
|
||||||
|
/// <summary>
/// Test seam (WP-14 telemetry): atomically bumps <see cref="_bufferedCount"/> by one,
/// standing in for a concurrent <see cref="BufferAsync"/> increment that lands before
/// <see cref="StartAsync"/> has seeded the counter. A test uses it to prove that the
/// seed is additive (<see cref="Interlocked.Add"/>) rather than an Exchange that would
/// overwrite the earlier increment.
/// </summary>
internal void TestOnly_IncrementBufferedCount()
{
    Interlocked.Increment(ref _bufferedCount);
}
|
||||||
|
|
||||||
|
/// <summary>
/// WP-14 (telemetry): per-instance one-shot flag (0 = not yet registered, 1 = done).
/// It stops a single instance from registering the queue-depth provider, and from
/// re-seeding the counter, more than once, for example when <see cref="StartAsync"/>
/// is called a second time on the same instance.
/// <para>
/// It does NOT coordinate across instances: the gauge slot in
/// <see cref="ScadaBridgeTelemetry"/> is process-global, so in a multi-instance process
/// the last <see cref="StartAsync"/> to run wins that slot.
/// </para>
/// </summary>
private int _queueDepthProviderRegistered;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// WP-10: Delivery handler delegate. The return value / exception is interpreted
|
/// WP-10: Delivery handler delegate. The return value / exception is interpreted
|
||||||
/// the same way on both the immediate-delivery path (<see cref="EnqueueAsync"/>)
|
/// the same way on both the immediate-delivery path (<see cref="EnqueueAsync"/>)
|
||||||
@@ -170,6 +213,27 @@ public class StoreAndForwardService
|
|||||||
public async Task StartAsync()
|
public async Task StartAsync()
|
||||||
{
|
{
|
||||||
await _storage.InitializeAsync();
|
await _storage.InitializeAsync();
|
||||||
|
|
||||||
|
// WP-14 (telemetry): seed the cached buffered-message count from the
|
||||||
|
// store exactly once (the gauge callback cannot run an async COUNT), then
|
||||||
|
// register the sync, non-blocking provider with the process-global
|
||||||
|
// ScadaBridgeTelemetry gauge. Both steps are inside the one-time guard so a
|
||||||
|
// second StartAsync on the same instance cannot double-seed.
|
||||||
|
//
|
||||||
|
// The seed is an Interlocked.Add — NOT an Exchange — to avoid a startup race:
|
||||||
|
// between the await above returning and this point, a concurrent BufferAsync
|
||||||
|
// could already have Interlocked.Increment'd _bufferedCount. Exchange would
|
||||||
|
// clobber that increment (losing a +1); Add preserves it. _bufferedCount
|
||||||
|
// starts at 0 and only BufferAsync increments it before the seed, so
|
||||||
|
// 0 + pending + (any concurrent increments) is the correct live count.
|
||||||
|
var pending = await _storage.GetMessageCountByStatusAsync(
|
||||||
|
StoreAndForwardMessageStatus.Pending);
|
||||||
|
if (Interlocked.CompareExchange(ref _queueDepthProviderRegistered, 1, 0) == 0)
|
||||||
|
{
|
||||||
|
Interlocked.Add(ref _bufferedCount, pending);
|
||||||
|
ScadaBridgeTelemetry.SetQueueDepthProvider(() => Interlocked.Read(ref _bufferedCount));
|
||||||
|
}
|
||||||
|
|
||||||
_retryTimer = new Timer(
|
_retryTimer = new Timer(
|
||||||
// StoreAndForward-024: capture the sweep Task on each tick so
|
// StoreAndForward-024: capture the sweep Task on each tick so
|
||||||
// StopAsync can await any in-flight invocation before the host
|
// StopAsync can await any in-flight invocation before the host
|
||||||
@@ -396,6 +460,10 @@ public class StoreAndForwardService
|
|||||||
{
|
{
|
||||||
await _storage.EnqueueAsync(message);
|
await _storage.EnqueueAsync(message);
|
||||||
_replication?.ReplicateEnqueue(message);
|
_replication?.ReplicateEnqueue(message);
|
||||||
|
// WP-14 (telemetry): a freshly buffered row is Pending → grows the live
|
||||||
|
// queue depth. Bumped after the durable write so the gauge never leads the
|
||||||
|
// store.
|
||||||
|
Interlocked.Increment(ref _bufferedCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -452,6 +520,8 @@ public class StoreAndForwardService
|
|||||||
{
|
{
|
||||||
await _storage.RemoveMessageAsync(message.Id);
|
await _storage.RemoveMessageAsync(message.Id);
|
||||||
_replication?.ReplicateRemove(message.Id);
|
_replication?.ReplicateRemove(message.Id);
|
||||||
|
// WP-14 (telemetry): a delivered row leaves the Pending queue.
|
||||||
|
Interlocked.Decrement(ref _bufferedCount);
|
||||||
RaiseActivity("Delivered", message.Category,
|
RaiseActivity("Delivered", message.Category,
|
||||||
$"Delivered to {message.Target} after {message.RetryCount} retries");
|
$"Delivered to {message.Target} after {message.RetryCount} retries");
|
||||||
|
|
||||||
@@ -483,6 +553,9 @@ public class StoreAndForwardService
|
|||||||
message.Id);
|
message.Id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// WP-14 (telemetry): the row committed Pending→Parked, leaving the live
|
||||||
|
// forward queue. Only counted when the conditional update actually won.
|
||||||
|
Interlocked.Decrement(ref _bufferedCount);
|
||||||
_replication?.ReplicatePark(message);
|
_replication?.ReplicatePark(message);
|
||||||
RaiseActivity("Parked", message.Category,
|
RaiseActivity("Parked", message.Category,
|
||||||
$"Permanent failure for {message.Target}: handler returned false");
|
$"Permanent failure for {message.Target}: handler returned false");
|
||||||
@@ -519,6 +592,9 @@ public class StoreAndForwardService
|
|||||||
message.Id);
|
message.Id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// WP-14 (telemetry): the row committed Pending→Parked, leaving the
|
||||||
|
// live forward queue. Only counted when the conditional update won.
|
||||||
|
Interlocked.Decrement(ref _bufferedCount);
|
||||||
_replication?.ReplicatePark(message);
|
_replication?.ReplicatePark(message);
|
||||||
RaiseActivity("Parked", message.Category,
|
RaiseActivity("Parked", message.Category,
|
||||||
$"Max retries ({message.MaxRetries}) reached for {message.Target}");
|
$"Max retries ({message.MaxRetries}) reached for {message.Target}");
|
||||||
@@ -737,6 +813,11 @@ public class StoreAndForwardService
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WP-14 (telemetry): an operator requeue moves Parked→Pending, re-adding the
|
||||||
|
// row to the live forward queue. Counted only when the conditional storage
|
||||||
|
// update actually flipped the row.
|
||||||
|
Interlocked.Increment(ref _bufferedCount);
|
||||||
|
|
||||||
// The active node just rewrote this row to Pending with retry_count = 0
|
// The active node just rewrote this row to Pending with retry_count = 0
|
||||||
// and cleared last_error / last_attempt_at (see
|
// and cleared last_error / last_attempt_at (see
|
||||||
// StoreAndForwardStorage.RetryParkedMessageAsync). Reconstruct the
|
// StoreAndForwardStorage.RetryParkedMessageAsync). Reconstruct the
|
||||||
@@ -769,6 +850,7 @@ public class StoreAndForwardService
|
|||||||
{
|
{
|
||||||
// Capture the category before the row is deleted so the activity log is
|
// Capture the category before the row is deleted so the activity log is
|
||||||
// labelled correctly.
|
// labelled correctly.
|
||||||
|
// WP-14 (telemetry): Parked rows are not in _bufferedCount; discarding a Parked row needs no counter adjustment.
|
||||||
var message = await _storage.GetMessageByIdAsync(messageId);
|
var message = await _storage.GetMessageByIdAsync(messageId);
|
||||||
var success = await _storage.DiscardParkedMessageAsync(messageId);
|
var success = await _storage.DiscardParkedMessageAsync(messageId);
|
||||||
if (success)
|
if (success)
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using System.Diagnostics.Metrics;
|
||||||
using System.Threading.Channels;
|
using System.Threading.Channels;
|
||||||
using Akka.Actor;
|
using Akka.Actor;
|
||||||
using Akka.TestKit.Xunit2;
|
using Akka.TestKit.Xunit2;
|
||||||
@@ -5,6 +6,7 @@ using Grpc.Core;
|
|||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
using NSubstitute;
|
using NSubstitute;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
||||||
|
|
||||||
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;
|
namespace ZB.MOM.WW.ScadaBridge.Communication.Tests.Grpc;
|
||||||
@@ -342,6 +344,59 @@ public class SiteStreamGrpcServerTests : TestKit
|
|||||||
subscriber.DidNotReceive().RemoveSubscriber(Arg.Any<IActorRef>());
|
subscriber.DidNotReceive().RemoveSubscriber(Arg.Any<IActorRef>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
public async Task SiteConnectionUpGauge_GoesToOneOnConnect_AndBackToZeroOnCancel()
{
    // Telemetry follow-on: the scadabridge.site.connection.up gauge must read
    // exactly 1 while a site stream is established and return to 0 once the
    // stream terminates on the cancel path — proving SiteConnectionOpened() is
    // matched by exactly one SiteConnectionClosed() in the handler's finally.
    var server = CreateServer();
    server.SetReady(Sys);

    // Takes a single reading of the observable gauge: a short-lived MeterListener
    // subscribes to just this instrument, forces one collection pass, and returns
    // the last value reported (0 when the instrument reported nothing).
    long ReadGauge()
    {
        long observed = 0;
        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == ScadaBridgeTelemetry.MeterName &&
                instrument.Name == "scadabridge.site.connection.up")
            {
                l.EnableMeasurementEvents(instrument);
            }
        };
        listener.SetMeasurementEventCallback<long>((_, measurement, _, _) => observed = measurement);
        listener.Start();
        listener.RecordObservableInstruments();
        return observed;
    }

    var baseline = ReadGauge();

    // Disposed at the end of the test; the stream task has always completed by then.
    using var cts = new CancellationTokenSource();
    var context = CreateMockContext(cts.Token);
    var writer = Substitute.For<IServerStreamWriter<SiteStreamEvent>>();

    var streamTask = Task.Run(() => server.SubscribeInstance(
        MakeRequest("corr-gauge", "Site1.Pump01"), writer, context));

    try
    {
        await WaitForConditionAsync(() => server.ActiveStreamCount == 1);

        // While the stream is up the gauge is one above whatever baseline other
        // (possibly parallel) tests left behind — read relative so the assertion
        // is robust to test interleaving on the process-wide static counter.
        Assert.Equal(baseline + 1, ReadGauge());
    }
    finally
    {
        // Always tear the stream down, even when the wait or the assertion above
        // fails; otherwise the background stream keeps running and the process-wide
        // gauge stays incremented for every test that follows.
        cts.Cancel();
        await streamTask;
    }

    await WaitForConditionAsync(() => server.ActiveStreamCount == 0);

    // After the cancel path runs the finally, the gauge is balanced back to
    // the baseline — no leaked "up" count.
    Assert.Equal(baseline, ReadGauge());
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public void SetReady_AllowsStreamCreation()
|
public void SetReady_AllowsStreamCreation()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
using System.Diagnostics.Metrics;
|
||||||
using Akka.Actor;
|
using Akka.Actor;
|
||||||
using Akka.TestKit.Xunit2;
|
using Akka.TestKit.Xunit2;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
@@ -10,6 +11,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
|||||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
|
||||||
@@ -558,6 +560,106 @@ public class DeploymentServiceTests : TestKit
|
|||||||
Arg.Any<object>(), Arg.Any<CancellationToken>());
|
Arg.Any<object>(), Arg.Any<CancellationToken>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Telemetry follow-on: scadabridge.deployments.applied on deploy success ──
|
||||||
|
|
||||||
|
[Fact]
public async Task DeployInstanceAsync_SiteSucceeds_EmitsDeploymentsAppliedCounterOnce()
{
    // One DeployInstanceAsync call deploys one instance to one site, so a
    // successful deployment must bump scadabridge.deployments.applied by
    // exactly one.
    var target = new Instance("MetricInst") { Id = 55, SiteId = 1, State = InstanceState.NotDeployed };
    _repo.GetInstanceByIdAsync(55, Arg.Any<CancellationToken>()).Returns(target);
    SetupValidPipeline(55, "MetricInst", "sha256:target");
    _repo.GetCurrentDeploymentStatusAsync(55, Arg.Any<CancellationToken>())
        .Returns((DeploymentRecord?)null);

    var probeCounters = new ReconcileProbeCounters();
    var probe = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(probeCounters, siteHash: "sha256:target", failQuery: false)));
    var sut = CreateServiceWithCommActor(probe);

    // Sum every measurement the deployments-applied counter reports while the
    // listener is attached.
    long appliedTotal = 0;
    using var meterListener = new MeterListener();
    meterListener.InstrumentPublished = (instrument, self) =>
    {
        var isAppliedCounter =
            instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
            && instrument.Name == "scadabridge.deployments.applied";
        if (isAppliedCounter)
        {
            self.EnableMeasurementEvents(instrument);
        }
    };
    meterListener.SetMeasurementEventCallback<long>((_, value, _, _) =>
        Interlocked.Add(ref appliedTotal, value));
    meterListener.Start();

    var outcome = await sut.DeployInstanceAsync(55, "admin");

    // Detach before asserting so nothing recorded afterwards is counted.
    meterListener.Dispose();

    Assert.True(outcome.IsSuccess);
    // Fresh first-time deploy applied -> exactly one increment.
    Assert.Equal(1, Interlocked.Read(ref appliedTotal));
}
|
||||||
|
|
||||||
|
[Fact]
public async Task DeployInstanceAsync_Reconciled_DoesNotEmitDeploymentsAppliedCounter()
{
    // Reconciliation recovers a PRIOR timed-out apply instead of performing a
    // fresh one. Counting it would risk double-counting the original apply, so
    // scadabridge.deployments.applied must stay untouched on a reconciled
    // (no re-deploy) success.
    var target = new Instance("MetricReconcileInst")
    {
        Id = 56,
        SiteId = 1,
        State = InstanceState.NotDeployed
    };
    _repo.GetInstanceByIdAsync(56, Arg.Any<CancellationToken>()).Returns(target);
    SetupValidPipeline(56, "MetricReconcileInst", "sha256:target");

    // An in-progress record whose revision hash matches the one the site reports.
    var priorRecord = new DeploymentRecord("dep-prior-56", "admin")
    {
        InstanceId = 56,
        Status = DeploymentStatus.InProgress,
        RevisionHash = "sha256:target"
    };
    _repo.GetCurrentDeploymentStatusAsync(56, Arg.Any<CancellationToken>()).Returns(priorRecord);
    _repo.GetDeployedSnapshotByInstanceIdAsync(56, Arg.Any<CancellationToken>())
        .Returns((DeployedConfigSnapshot?)null);

    var probeCounters = new ReconcileProbeCounters();
    var probe = Sys.ActorOf(Props.Create(() =>
        new ReconcileProbeActor(probeCounters, siteHash: "sha256:target", failQuery: false)));
    var sut = CreateServiceWithCommActor(probe);

    // Sum every measurement the deployments-applied counter reports while the
    // listener is attached.
    long appliedTotal = 0;
    using var meterListener = new MeterListener();
    meterListener.InstrumentPublished = (instrument, self) =>
    {
        var isAppliedCounter =
            instrument.Meter.Name == ScadaBridgeTelemetry.MeterName
            && instrument.Name == "scadabridge.deployments.applied";
        if (isAppliedCounter)
        {
            self.EnableMeasurementEvents(instrument);
        }
    };
    meterListener.SetMeasurementEventCallback<long>((_, value, _, _) =>
        Interlocked.Add(ref appliedTotal, value));
    meterListener.Start();

    var outcome = await sut.DeployInstanceAsync(56, "admin");

    // Detach before asserting so nothing recorded afterwards is counted.
    meterListener.Dispose();

    Assert.True(outcome.IsSuccess);
    // Reconciled — no fresh deploy was sent, so no increment.
    Assert.Equal(0, probeCounters.DeployCount);
    Assert.Equal(0, Interlocked.Read(ref appliedTotal));
}
|
||||||
|
|
||||||
// ── DeploymentManager-011: lifecycle success paths ──
|
// ── DeploymentManager-011: lifecycle success paths ──
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
@@ -0,0 +1,79 @@
|
|||||||
|
using System.Net;
|
||||||
|
using Microsoft.AspNetCore.Mvc.Testing;
|
||||||
|
using Microsoft.Extensions.Configuration;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.ScadaBridge.Host.Tests;
|
||||||
|
|
||||||
|
/// <summary>
/// Observability adoption: verifies the shared ZB.MOM.WW.Telemetry Prometheus
/// scrape endpoint (<c>/metrics</c>, mounted by <c>app.MapZbMetrics()</c>) is wired into the
/// Central composition root. <c>AddZbTelemetry</c> (registered in
/// <see cref="SiteServiceRegistration.BindSharedOptions"/>) always wires the Prometheus
/// exporter, so the endpoint returns the Prometheus exposition format regardless of DB /
/// cluster state. This is a pure route assertion — it requires no database, LDAP, or formed
/// Akka cluster. The Central-role factory bootstrap mirrors <see cref="HealthCheckTests"/>.
/// </summary>
public class MetricsEndpointTests : IDisposable
{
    // Everything the test creates, in creation order; torn down in reverse by Dispose.
    private readonly List<IDisposable> _disposables = new();

    /// <summary>
    /// Registers the environment fixture first, so it is the last thing torn down.
    /// </summary>
    public MetricsEndpointTests()
    {
        // Host-003: connection strings are externalised; supply them via env vars.
        _disposables.Add(new CentralDbTestEnvironment());
    }

    /// <summary>
    /// Disposes tracked resources in reverse registration order (client, then factory,
    /// then the environment fixture) so nothing is torn down while something that was
    /// created after it, and may depend on it, is still alive.
    /// </summary>
    public void Dispose()
    {
        for (var i = _disposables.Count - 1; i >= 0; i--)
        {
            // Teardown is best effort: one failing Dispose must not skip the rest.
            try { _disposables[i].Dispose(); } catch { /* best effort */ }
        }

        // A repeated Dispose call has nothing left to release.
        _disposables.Clear();
    }

    /// <summary>
    /// Builds a <see cref="WebApplicationFactory{TEntryPoint}"/> configured as a Central
    /// node with migrations skipped, and tracks it for disposal.
    /// </summary>
    /// <returns>The configured factory; ownership stays with this test class.</returns>
    private WebApplicationFactory<Program> CreateCentralFactory()
    {
        var factory = new WebApplicationFactory<Program>()
            .WithWebHostBuilder(builder =>
            {
                builder.ConfigureAppConfiguration((context, config) =>
                {
                    config.AddInMemoryCollection(new Dictionary<string, string?>
                    {
                        ["ScadaBridge:Node:NodeHostname"] = "localhost",
                        ["ScadaBridge:Node:RemotingPort"] = "0",
                        ["ScadaBridge:Cluster:SeedNodes:0"] = "akka.tcp://scadabridge@localhost:2551",
                        ["ScadaBridge:Cluster:SeedNodes:1"] = "akka.tcp://scadabridge@localhost:2552",
                        ["ScadaBridge:Database:SkipMigrations"] = "true",
                    });
                });
                builder.UseSetting("ScadaBridge:Node:Role", "Central");
                builder.UseSetting("ScadaBridge:Database:SkipMigrations", "true");
            });
        _disposables.Add(factory);
        return factory;
    }

    /// <summary>
    /// GET <c>/metrics</c> on a Central host must answer 200 with a body that contains
    /// at least one <c>"# "</c> comment line.
    /// </summary>
    [Fact]
    public async Task Metrics_Endpoint_IsMapped()
    {
        // DOTNET_ENVIRONMENT is process-wide; remember the old value and restore it
        // in the finally block so other tests are unaffected.
        var previousEnv = Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT");
        try
        {
            Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", "Central");
            var factory = CreateCentralFactory();
            var client = factory.CreateClient();
            _disposables.Add(client);

            // The response owns its content stream; release it when the test ends.
            using var response = await client.GetAsync("/metrics");

            Assert.Equal(HttpStatusCode.OK, response.StatusCode);
            var body = await response.Content.ReadAsStringAsync();
            Assert.Contains("# ", body); // Prometheus exposition (HELP/TYPE comments)
        }
        finally
        {
            Environment.SetEnvironmentVariable("DOTNET_ENVIRONMENT", previousEnv);
        }
    }
}
|
||||||
@@ -0,0 +1,32 @@
|
|||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.ScadaBridge.Host.Tests;
|
||||||
|
|
||||||
|
/// <summary>
/// Infrastructure-free guards for <see cref="ScadaBridgeTelemetry"/>: the meter name is the
/// stable value registered with OTel, and the emit helpers are safe to call (they are no-op
/// until follow-on tasks wire real emit points and a listener attaches).
/// </summary>
public class ScadaBridgeTelemetryTests
{
    /// <summary>Pins the meter name to its published literal value.</summary>
    [Fact]
    public void MeterName_IsStableValue()
    {
        const string expectedMeterName = "ZB.MOM.WW.ScadaBridge";

        Assert.Equal(expectedMeterName, ScadaBridgeTelemetry.MeterName);
    }

    /// <summary>Calls every emit helper once and expects none of them to throw.</summary>
    [Fact]
    public void EmitHelpers_DoNotThrow()
    {
        Action invokeAllHelpers = () =>
        {
            ScadaBridgeTelemetry.RecordDeploymentApplied();
            ScadaBridgeTelemetry.RecordInboundApiRequest("X");
            ScadaBridgeTelemetry.SiteConnectionOpened();
            ScadaBridgeTelemetry.SiteConnectionClosed();
            ScadaBridgeTelemetry.SetQueueDepthProvider(() => 5);
        };

        var thrown = Record.Exception(invokeAllHelpers);

        Assert.Null(thrown);
    }
}
|
||||||
@@ -352,6 +352,105 @@ public class StartupValidatorTests
|
|||||||
Assert.Null(ex);
|
Assert.Null(ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Theory]
[InlineData("0")]
[InlineData("-1")]
[InlineData("65536")]
[InlineData("abc")]
public void Site_InvalidMetricsPort_FailsValidation(string metricsPort)
{
    // Start from a valid Site configuration and override only the metrics port
    // with the value under test.
    var settings = ValidSiteConfig();
    settings["ScadaBridge:Node:MetricsPort"] = metricsPort;
    var configuration = BuildConfig(settings);

    var failure = Assert.Throws<InvalidOperationException>(
        () => StartupValidator.Validate(configuration));

    Assert.Contains("MetricsPort must be 1-65535", failure.Message);
}
|
||||||
|
|
||||||
|
[Fact]
public void Site_ValidMetricsPort_PassesValidation()
{
    // A valid Site configuration with an explicit metrics port of 8084.
    var settings = ValidSiteConfig();
    settings["ScadaBridge:Node:MetricsPort"] = "8084";
    var configuration = BuildConfig(settings);

    var thrown = Record.Exception(() => StartupValidator.Validate(configuration));

    Assert.Null(thrown);
}
|
||||||
|
|
||||||
|
[Fact]
public void Site_MetricsPortEqualsRemotingPort_FailsValidation()
{
    // Host-007 regression: the Kestrel metrics (HTTP/1.1) listener must not share
    // a port with Akka.Remote. Identical values would make the two listeners
    // contend for the same port at runtime, so validation has to reject them.
    const string sharedPort = "8082";
    var settings = ValidSiteConfig();
    settings["ScadaBridge:Node:RemotingPort"] = sharedPort;
    settings["ScadaBridge:Node:MetricsPort"] = sharedPort;
    var configuration = BuildConfig(settings);

    var failure = Assert.Throws<InvalidOperationException>(
        () => StartupValidator.Validate(configuration));

    Assert.Contains("MetricsPort must differ from RemotingPort", failure.Message);
}
|
||||||
|
|
||||||
|
[Fact]
public void Site_MetricsPortEqualsGrpcPort_FailsValidation()
{
    // Host-007 regression: the metrics listener port must differ from GrpcPort.
    const string sharedPort = "8083";
    var settings = ValidSiteConfig();
    settings["ScadaBridge:Node:GrpcPort"] = sharedPort;
    settings["ScadaBridge:Node:MetricsPort"] = sharedPort;
    var configuration = BuildConfig(settings);

    var failure = Assert.Throws<InvalidOperationException>(
        () => StartupValidator.Validate(configuration));

    Assert.Contains("MetricsPort must differ from GrpcPort", failure.Message);
}
|
||||||
|
|
||||||
|
[Fact]
public void Site_DefaultMetricsPortEqualsRemotingPort_FailsValidation()
{
    // MetricsPort is left unset, so the NodeOptions default (8084) applies. A site
    // whose RemotingPort is also 8084 must still be rejected.
    var settings = ValidSiteConfig();
    settings["ScadaBridge:Node:RemotingPort"] = "8084";

    // Keep GrpcPort distinct so only the metrics-vs-remoting rule fires.
    settings["ScadaBridge:Node:GrpcPort"] = "8083";

    // ValidSiteConfig points the seed nodes at the remoting port (8082); realign
    // them to 8084 so the seed-vs-grpc rule is not what trips here.
    settings["ScadaBridge:Cluster:SeedNodes:0"] = "akka.tcp://scadabridge@site-a-node1:8084";
    settings["ScadaBridge:Cluster:SeedNodes:1"] = "akka.tcp://scadabridge@site-a-node2:8084";

    var configuration = BuildConfig(settings);

    var failure = Assert.Throws<InvalidOperationException>(
        () => StartupValidator.Validate(configuration));

    Assert.Contains("MetricsPort must differ from RemotingPort", failure.Message);
}
|
||||||
|
|
||||||
|
[Fact]
public void Site_MetricsPortDiffersFromRemotingAndGrpc_PassesValidation()
{
    // Three distinct ports: remoting 8082, gRPC 8083, metrics 8084.
    var settings = ValidSiteConfig();
    settings["ScadaBridge:Node:RemotingPort"] = "8082";
    settings["ScadaBridge:Node:GrpcPort"] = "8083";
    settings["ScadaBridge:Node:MetricsPort"] = "8084";
    var configuration = BuildConfig(settings);

    var thrown = Record.Exception(() => StartupValidator.Validate(configuration));

    Assert.Null(thrown);
}
|
||||||
|
|
||||||
|
[Fact]
public void Central_InvalidMetricsPort_NotValidated()
{
    // The metrics-port rules apply to Site nodes only. A Central node runs no
    // metrics listener, so an out-of-range MetricsPort must not fail startup.
    var settings = ValidCentralConfig();
    settings["ScadaBridge:Node:MetricsPort"] = "0";
    var configuration = BuildConfig(settings);

    var thrown = Record.Exception(() => StartupValidator.Validate(configuration));

    Assert.Null(thrown);
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public void MultipleErrors_AllReported()
|
public void MultipleErrors_AllReported()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -9,8 +9,10 @@ using NSubstitute;
|
|||||||
using ZB.MOM.WW.ScadaBridge.Commons.Entities.InboundApi;
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.InboundApi;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.InboundApi;
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.InboundApi;
|
||||||
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
|
using ZB.MOM.WW.ScadaBridge.InboundAPI.Middleware;
|
||||||
|
using System.Diagnostics.Metrics;
|
||||||
using System.Net;
|
using System.Net;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
|
|
||||||
@@ -232,6 +234,114 @@ public class EndpointExtensionsTests
|
|||||||
Assert.Equal("audit-actor-name", capture.CapturedActor);
|
Assert.Equal("audit-actor-name", capture.CapturedActor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
public async Task ValidRequest_EmitsInboundApiRequestCounter_TaggedWithResolvedMethodName()
{
    // Telemetry follow-on: one successful inbound request must bump
    // scadabridge.inbound_api.requests exactly once, and the tag must be the
    // resolved, registered method name (method.Name) - the bounded identifier -
    // rather than the raw route value.
    var apiKey = SeedKey();
    var echoMethod = SeedMethod(1, "echo", "return Parameters[\"value\"];",
        """[{"name":"value","type":"Integer","required":true}]""");

    using var counter = new InboundApiRequestCounterCollector();

    using var host = await BuildHostAsync(apiKey, echoMethod);
    var httpClient = host.GetTestClient();

    var reply = await httpClient.SendAsync(BuildPost("echo", """{"value":7}"""));

    Assert.Equal(HttpStatusCode.OK, reply.StatusCode);

    // The counter is a process-wide static, so only measurements carrying this
    // test's own method tag are summed; a parallel test class could otherwise
    // leak measurements into the total.
    var echoTotal =
        (from sample in counter.Measurements
         where sample.Method == "echo"
         select sample.Value).Sum();
    Assert.Equal(1, echoTotal);
}
|
||||||
|
|
||||||
|
[Fact]
public async Task UnknownMethod_EmitsInboundApiRequestCounter_WithBoundedForbiddenSentinel()
{
    // Telemetry follow-on: an auth/authz failure still counts as a request, but
    // it is tagged with the bounded sentinel "<forbidden>" instead of the
    // arbitrary caller-supplied route value. That way a caller posting random
    // method names cannot inflate the cardinality of the `method` tag.
    var apiKey = SeedKey();
    var registeredMethod = SeedMethod(1, "knownMethod", "return 1;");

    using var counter = new InboundApiRequestCounterCollector();

    using var host = await BuildHostAsync(apiKey, registeredMethod);
    var httpClient = host.GetTestClient();

    var reply = await httpClient.SendAsync(BuildPost("totally-made-up-name", "{}"));

    Assert.Equal(HttpStatusCode.Forbidden, reply.StatusCode);

    var captured = counter.Measurements;

    // Cardinality safety: the arbitrary route value never shows up as a tag.
    Assert.DoesNotContain(captured, sample => sample.Method == "totally-made-up-name");

    // The failure path records the request against the bounded sentinel.
    Assert.Contains(captured, sample => sample.Method == "<forbidden>" && sample.Value == 1);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Test helper that attaches a <see cref="MeterListener"/> to the
/// <c>scadabridge.inbound_api.requests</c> instrument and records every measurement
/// (its value plus the <c>method</c> tag) until the collector is disposed.
/// </summary>
private sealed class InboundApiRequestCounterCollector : IDisposable
{
    private readonly object _gate = new();
    private readonly List<(long Value, string? Method)> _measurements = new();
    private readonly MeterListener _listener;

    /// <summary>
    /// Wires up the listener callbacks and starts listening immediately.
    /// </summary>
    public InboundApiRequestCounterCollector()
    {
        _listener = new MeterListener();
        _listener.InstrumentPublished = OnInstrumentPublished;
        _listener.SetMeasurementEventCallback<long>(OnMeasurement);
        _listener.Start();
    }

    /// <summary>
    /// A copy of the measurements recorded so far, taken under the lock so callers
    /// can enumerate it while further measurements arrive.
    /// </summary>
    public IReadOnlyList<(long Value, string? Method)> Measurements
    {
        get
        {
            lock (_gate)
            {
                return new List<(long Value, string? Method)>(_measurements);
            }
        }
    }

    /// <summary>Disposes the underlying listener.</summary>
    public void Dispose()
    {
        _listener.Dispose();
    }

    // Subscribes only to the inbound-API request instrument on the ScadaBridge meter.
    private static void OnInstrumentPublished(Instrument instrument, MeterListener listener)
    {
        if (instrument.Meter.Name != ScadaBridgeTelemetry.MeterName)
        {
            return;
        }

        if (instrument.Name != "scadabridge.inbound_api.requests")
        {
            return;
        }

        listener.EnableMeasurementEvents(instrument);
    }

    // Records one measurement together with its "method" tag (null when the tag is
    // absent or is not a string).
    private void OnMeasurement(
        Instrument instrument,
        long measurement,
        ReadOnlySpan<KeyValuePair<string, object?>> tags,
        object? state)
    {
        string? methodTag = null;

        // Scan every tag without stopping early: if "method" appears more than
        // once, the last occurrence is the one that is kept.
        for (var index = 0; index < tags.Length; index++)
        {
            if (tags[index].Key == "method")
            {
                methodTag = tags[index].Value as string;
            }
        }

        lock (_gate)
        {
            _measurements.Add((measurement, methodTag));
        }
    }
}
|
||||||
|
|
||||||
private static HttpRequestMessage BuildPost(string methodName, string body)
|
private static HttpRequestMessage BuildPost(string methodName, string body)
|
||||||
{
|
{
|
||||||
var request = new HttpRequestMessage(HttpMethod.Post, "/api/" + methodName)
|
var request = new HttpRequestMessage(HttpMethod.Post, "/api/" + methodName)
|
||||||
|
|||||||
@@ -0,0 +1,211 @@
|
|||||||
|
using System.Diagnostics.Metrics;
|
||||||
|
using Microsoft.Data.Sqlite;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
||||||
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.ScadaBridge.StoreAndForward.Tests;
|
||||||
|
|
||||||
|
/// <summary>
/// WP-14 (telemetry follow-on): exercises the cached buffered-message counter behind
/// the <c>scadabridge.store_and_forward.queue.depth</c> observable gauge. The tests
/// check that the counter follows the live (Pending) queue through the existing
/// enqueue / drain / park / requeue paths and that the synchronous gauge callback
/// reports it.
///
/// The gauge is sampled the way the OpenTelemetry collector samples it: a
/// <see cref="MeterListener"/> forces an observation (the callback is synchronous and
/// does no I/O, which is the whole point of caching the count).
/// <c>StoreAndForwardService.StartAsync</c> seeds the counter from storage and
/// registers the provider against the service instance, so the gauge resolves to this
/// test's counter.
/// </summary>
public class QueueDepthGaugeTests : IAsyncLifetime, IDisposable
{
    private readonly SqliteConnection _keepAlive;
    private readonly StoreAndForwardStorage _storage;
    private readonly StoreAndForwardService _service;

    /// <summary>
    /// Builds a per-test storage and service pair over a uniquely named shared-cache
    /// in-memory SQLite database.
    /// </summary>
    public QueueDepthGaugeTests()
    {
        var connectionString =
            $"Data Source=QueueDepthTests_{Guid.NewGuid():N};Mode=Memory;Cache=Shared";

        // This connection stays open for the lifetime of the test so the shared
        // in-memory database is not dropped between storage calls.
        _keepAlive = new SqliteConnection(connectionString);
        _keepAlive.Open();

        _storage = new StoreAndForwardStorage(
            connectionString, NullLogger<StoreAndForwardStorage>.Instance);

        _service = new StoreAndForwardService(
            _storage,
            new StoreAndForwardOptions
            {
                DefaultRetryInterval = TimeSpan.Zero,
                DefaultMaxRetries = 3,
                // Long interval so no background sweep fires on its own during the
                // test; sweeps are driven explicitly via RetryPendingMessagesAsync.
                RetryTimerInterval = TimeSpan.FromMinutes(10)
            },
            NullLogger<StoreAndForwardService>.Instance);
    }

    /// <summary>Initializes storage, then starts the service under test.</summary>
    public async Task InitializeAsync()
    {
        await _storage.InitializeAsync();

        // StartAsync seeds _bufferedCount from the (empty) store and registers the
        // queue-depth provider against this service instance.
        await _service.StartAsync();
    }

    /// <summary>Stops the service under test.</summary>
    public async Task DisposeAsync()
    {
        await _service.StopAsync();
    }

    /// <summary>Closes the keep-alive connection.</summary>
    public void Dispose()
    {
        _keepAlive.Dispose();
    }

    /// <summary>
    /// Samples the <c>scadabridge.store_and_forward.queue.depth</c> gauge by forcing a
    /// synchronous observation through a short-lived <see cref="MeterListener"/> -
    /// exactly the path the Prometheus/OTLP collector exercises on each scrape.
    /// </summary>
    /// <returns>The last observed gauge value, or <c>-1</c> if nothing was observed.</returns>
    private static long ReadQueueDepthGauge()
    {
        long latest = -1;

        using var probe = new MeterListener();
        probe.InstrumentPublished = (published, self) =>
        {
            var isQueueDepthGauge =
                published.Meter.Name == ScadaBridgeTelemetry.MeterName &&
                published.Name == "scadabridge.store_and_forward.queue.depth";

            if (isQueueDepthGauge)
            {
                self.EnableMeasurementEvents(published);
            }
        };
        probe.SetMeasurementEventCallback<long>((_, value, _, _) => latest = value);
        probe.Start();
        probe.RecordObservableInstruments();

        return latest;
    }

    /// <summary>
    /// Creates a second, not-yet-started service over the same storage, with a retry
    /// timer interval long enough that no background sweep runs during a test.
    /// </summary>
    private StoreAndForwardService CreateUnstartedService()
    {
        return new StoreAndForwardService(
            _storage,
            new StoreAndForwardOptions { RetryTimerInterval = TimeSpan.FromMinutes(10) },
            NullLogger<StoreAndForwardService>.Instance);
    }

    /// <summary>
    /// Writes one Pending row straight into storage, bypassing the service.
    /// </summary>
    private async Task SeedPendingRowAsync(StoreAndForwardCategory category, string target)
    {
        await _storage.EnqueueAsync(new StoreAndForwardMessage
        {
            Id = Guid.NewGuid().ToString("N"),
            Category = category,
            Target = target,
            PayloadJson = "{}",
            Status = StoreAndForwardMessageStatus.Pending,
            CreatedAt = DateTimeOffset.UtcNow,
            MaxRetries = 3
        });
    }

    [Fact]
    public async Task Gauge_TracksBufferedDepth_AcrossEnqueueDrainAndPark()
    {
        // The store was empty when StartAsync seeded the counter, so the gauge reads 0.
        Assert.Equal(0, ReadQueueDepthGauge());

        // The handler fails transiently until the flag flips, so each enqueue buffers
        // a Pending row (immediate attempt 0 throws -> BufferAsync -> +1).
        var handlerSucceeds = false;
        _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem,
            _ =>
            {
                if (handlerSucceeds)
                {
                    return Task.FromResult(true);
                }

                throw new HttpRequestException("transient");
            });

        // Three enqueues -> cached depth 3 -> gauge reads 3.
        for (var attempt = 1; attempt <= 3; attempt++)
        {
            var buffered = await _service.EnqueueAsync(
                StoreAndForwardCategory.ExternalSystem, "api", """{}""");
            Assert.True(buffered.WasBuffered);
        }

        Assert.Equal(3, ReadQueueDepthGauge());

        // Drain: with the handler succeeding, the retry sweep removes all three
        // Pending rows and the depth returns to 0.
        handlerSucceeds = true;
        await _service.RetryPendingMessagesAsync();
        Assert.Equal(0, ReadQueueDepthGauge());

        // Park path: buffer one more message and let it park (maxRetries: 1 parks
        // after a single sweep). Pending -> Parked leaves the live queue, so the
        // depth goes back to 0.
        handlerSucceeds = false;
        var toPark = await _service.EnqueueAsync(
            StoreAndForwardCategory.ExternalSystem, "api", """{}""", maxRetries: 1);
        Assert.True(toPark.WasBuffered);
        Assert.Equal(1, ReadQueueDepthGauge());

        await _service.RetryPendingMessagesAsync();
        var stored = await _storage.GetMessageByIdAsync(toPark.MessageId);
        Assert.Equal(StoreAndForwardMessageStatus.Parked, stored!.Status);
        Assert.Equal(0, ReadQueueDepthGauge());

        // Operator requeue: Parked -> Pending puts the message back on the live
        // queue, so the depth reads 1 again.
        Assert.True(await _service.RetryParkedMessageAsync(toPark.MessageId));
        Assert.Equal(1, ReadQueueDepthGauge());
    }

    [Fact]
    public async Task Gauge_SeedsFromExistingPendingRows_OnStart()
    {
        // Two Pending rows are written directly to storage *before* a fresh service
        // starts, simulating a process restart over a non-empty buffer. StartAsync
        // must seed the cached counter from the store so the gauge does not
        // under-report after a restart.
        await SeedPendingRowAsync(StoreAndForwardCategory.ExternalSystem, "api");
        await SeedPendingRowAsync(StoreAndForwardCategory.Notification, "list");

        var restarted = CreateUnstartedService();
        try
        {
            await restarted.StartAsync();

            // The fresh service registered itself as the global provider and seeded 2.
            Assert.Equal(2, ReadQueueDepthGauge());
        }
        finally
        {
            await restarted.StopAsync();
        }
    }

    /// <summary>
    /// Review finding (FINDING 1): the startup seed has to ADD to whatever the counter
    /// already holds rather than overwrite it. A concurrent <c>BufferAsync</c> can
    /// <c>Interlocked.Increment</c> <c>_bufferedCount</c> in the window between
    /// <c>StartAsync</c>'s async <c>COUNT(*)</c> returning and the seed running; an
    /// <c>Interlocked.Exchange</c> seed would clobber that increment (lost +1). The
    /// test pre-increments the in-memory counter (standing in for that concurrent
    /// enqueue), starts the service over an empty store, and asserts the pre-increment
    /// survives.
    /// </summary>
    [Fact]
    public async Task Gauge_SeedAddsToConcurrentPreSeedIncrement_NotClobber()
    {
        // The store is empty (StartAsync's pending COUNT(*) = 0), so the simulated
        // concurrent pre-seed increment is the only contribution to the counter.
        var racing = CreateUnstartedService();

        // Stands in for a BufferAsync increment that landed before StartAsync seeded.
        racing.TestOnly_IncrementBufferedCount();

        try
        {
            await racing.StartAsync();

            // Add(0 seed) on top of the existing +1 gives 1. An Exchange(0) seed
            // would reset it to 0 and lose the concurrent enqueue - the bug this
            // fix prevents.
            Assert.Equal(1, ReadQueueDepthGauge());
        }
        finally
        {
            await racing.StopAsync();
        }
    }
}
|
||||||
Reference in New Issue
Block a user