OpenTelemetry redundancy metrics + RoleChanged SignalR push. Closes instrumentation + live-push slices of task #198; the exporter wiring (OTLP vs Prometheus package decision) is split to new task #201 because the collector/scrape-endpoint choice is a fleet-ops decision that deserves its own PR rather than hardcoded here. New RedundancyMetrics class (Singleton-registered in DI) owning a System.Diagnostics.Metrics.Meter("ZB.MOM.WW.OtOpcUa.Redundancy", "1.0.0"). Three ObservableGauge instruments — otopcua.redundancy.primary_count / secondary_count / stale_count — all tagged by cluster.id, populated by SetClusterCounts(clusterId, primary, secondary, stale) which the poller calls at the tail of every tick; ObservableGauge callbacks snapshot the last value set under a lock so the reader (OTel collector, dotnet-counters) sees consistent tuples. One Counter — otopcua.redundancy.role_transition — tagged cluster.id, node.id, from_role, to_role; ideal for tracking "how often does Cluster-X failover" + "which node transitions most" aggregate queries. In-box Metrics API means zero NuGet dep here — the exporter PR adds OpenTelemetry.Extensions.Hosting + OpenTelemetry.Exporter.OpenTelemetryProtocol or OpenTelemetry.Exporter.Prometheus.AspNetCore to actually ship the data somewhere. FleetStatusPoller extended with role-change detection. Its PollOnceAsync now pulls ClusterNode rows alongside the existing ClusterNodeGenerationState scan, and a new PollRolesAsync walks every node comparing RedundancyRole to the _lastRole cache. On change: records the transition to RedundancyMetrics + emits a RoleChanged SignalR message to both FleetStatusHub.GroupName(cluster) + FleetStatusHub.FleetGroup so cluster-scoped + fleet-wide subscribers both see it. First observation per node is a bootstrap (cache fill) + NOT a transition — avoids spurious churn on service startup or pod restart. UpdateClusterGauges groups nodes by cluster + sets the three gauge values, using ClusterNodeService.StaleThreshold (shared 30s convention) for staleness so the /hosts page + the gauge agree. RoleChangedMessage record lives alongside NodeStateChangedMessage in FleetStatusPoller.cs. RedundancyTab.razor subscribes to the fleet-status hub on first parameters-set, filters RoleChanged events to the current cluster, reloads the node list + paints a blue info banner ("Role changed on node-a: Primary → Secondary at HH:mm:ss UTC") so operators see the transition without needing to poll-refresh the page. IAsyncDisposable closes the connection on tab swap-away. Two new RedundancyMetricsTests covering RecordRoleTransition tag emission (cluster.id + node.id + from_role + to_role all flow through the MeterListener callback) + ObservableGauge snapshot for two clusters (assert primary_count=1 for c1, stale_count=1 for c2). Existing FleetStatusPollerTests ctor-line updated to pass a RedundancyMetrics instance; all tests still pass. Full Admin.Tests suite 87/87 passing (was 85, +2). Admin project builds 0 errors. Task #201 captures the exporter-wiring follow-up — OpenTelemetry.Extensions.Hosting + OTLP vs Prometheus + /metrics endpoint decision, driven by fleet-ops infra direction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-19 23:16:09 -04:00
parent 251f567b98
commit 1f3343e61f
6 changed files with 279 additions and 4 deletions

View File

@@ -1,9 +1,17 @@
@using Microsoft.AspNetCore.SignalR.Client
@using ZB.MOM.WW.OtOpcUa.Admin.Hubs
@using ZB.MOM.WW.OtOpcUa.Admin.Services
@using ZB.MOM.WW.OtOpcUa.Configuration.Entities
@using ZB.MOM.WW.OtOpcUa.Configuration.Enums
@inject ClusterNodeService NodeSvc
@inject NavigationManager Nav
@implements IAsyncDisposable
<h4>Redundancy topology</h4>
@if (_roleChangedBanner is not null)
{
<div class="alert alert-info small mb-2">@_roleChangedBanner</div>
}
<p class="text-muted small">
One row per <code>ClusterNode</code> in this cluster. Role, <code>ApplicationUri</code>,
and <code>ServiceLevelBase</code> are authored separately; the Admin UI shows them read-only
@@ -107,10 +115,41 @@ else
[Parameter] public string ClusterId { get; set; } = string.Empty;
private List<ClusterNode>? _nodes;
private HubConnection? _hub;
private string? _roleChangedBanner;
protected override async Task OnParametersSetAsync()
{
_nodes = await NodeSvc.ListByClusterAsync(ClusterId, CancellationToken.None);
if (_hub is null) await ConnectHubAsync();
}
private async Task ConnectHubAsync()
{
_hub = new HubConnectionBuilder()
.WithUrl(Nav.ToAbsoluteUri("/hubs/fleet-status"))
.WithAutomaticReconnect()
.Build();
_hub.On<RoleChangedMessage>("RoleChanged", async msg =>
{
if (msg.ClusterId != ClusterId) return;
_roleChangedBanner = $"Role changed on {msg.NodeId}: {msg.FromRole} → {msg.ToRole} at {msg.ObservedAtUtc:HH:mm:ss 'UTC'}";
_nodes = await NodeSvc.ListByClusterAsync(ClusterId, CancellationToken.None);
await InvokeAsync(StateHasChanged);
});
await _hub.StartAsync();
await _hub.SendAsync("SubscribeCluster", ClusterId);
}
public async ValueTask DisposeAsync()
{
if (_hub is not null)
{
await _hub.DisposeAsync();
_hub = null;
}
}
private static string RowClass(ClusterNode n) =>

View File

@@ -1,7 +1,9 @@
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Admin.Services;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Admin.Hubs;
@@ -14,11 +16,13 @@ public sealed class FleetStatusPoller(
IServiceScopeFactory scopeFactory,
IHubContext<FleetStatusHub> fleetHub,
IHubContext<AlertHub> alertHub,
ILogger<FleetStatusPoller> logger) : BackgroundService
ILogger<FleetStatusPoller> logger,
RedundancyMetrics redundancyMetrics) : BackgroundService
{
public TimeSpan PollInterval { get; init; } = TimeSpan.FromSeconds(5);
private readonly Dictionary<string, NodeStateSnapshot> _last = new();
private readonly Dictionary<string, RedundancyRole> _lastRole = new(StringComparer.Ordinal);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
@@ -42,6 +46,10 @@ public sealed class FleetStatusPoller(
using var scope = scopeFactory.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();
var nodes = await db.ClusterNodes.AsNoTracking().ToListAsync(ct);
await PollRolesAsync(nodes, ct);
UpdateClusterGauges(nodes);
var rows = await db.ClusterNodeGenerationStates.AsNoTracking()
.Join(db.ClusterNodes.AsNoTracking(), s => s.NodeId, n => n.NodeId, (s, n) => new { s, n.ClusterId })
.ToListAsync(ct);
@@ -85,9 +93,63 @@ public sealed class FleetStatusPoller(
}
/// <summary>Exposed for tests — forces a snapshot reset so stub data re-seeds.</summary>
internal void ResetCache() => _last.Clear();
internal void ResetCache()
{
_last.Clear();
_lastRole.Clear();
}
private async Task PollRolesAsync(IReadOnlyList<ClusterNode> nodes, CancellationToken ct)
{
foreach (var n in nodes)
{
var hadPrior = _lastRole.TryGetValue(n.NodeId, out var priorRole);
if (hadPrior && priorRole == n.RedundancyRole) continue;
_lastRole[n.NodeId] = n.RedundancyRole;
if (!hadPrior) continue; // first-observation bootstrap — not a transition
redundancyMetrics.RecordRoleTransition(
clusterId: n.ClusterId, nodeId: n.NodeId,
fromRole: priorRole.ToString(), toRole: n.RedundancyRole.ToString());
var msg = new RoleChangedMessage(
ClusterId: n.ClusterId, NodeId: n.NodeId,
FromRole: priorRole.ToString(), ToRole: n.RedundancyRole.ToString(),
ObservedAtUtc: DateTime.UtcNow);
await fleetHub.Clients.Group(FleetStatusHub.GroupName(n.ClusterId))
.SendAsync("RoleChanged", msg, ct);
await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
.SendAsync("RoleChanged", msg, ct);
}
}
private void UpdateClusterGauges(IReadOnlyList<ClusterNode> nodes)
{
var staleCutoff = DateTime.UtcNow - Services.ClusterNodeService.StaleThreshold;
foreach (var group in nodes.GroupBy(n => n.ClusterId))
{
var primary = group.Count(n => n.RedundancyRole == RedundancyRole.Primary);
var secondary = group.Count(n => n.RedundancyRole == RedundancyRole.Secondary);
var stale = group.Count(n => n.LastSeenAt is null || n.LastSeenAt.Value < staleCutoff);
redundancyMetrics.SetClusterCounts(group.Key, primary, secondary, stale);
}
}
private readonly record struct NodeStateSnapshot(
string NodeId, string ClusterId, long? GenerationId,
string? Status, string? Error, DateTime? AppliedAt, DateTime? SeenAt);
}
/// <summary>
/// Pushed by <see cref="FleetStatusPoller"/> when it observes a change in
/// <see cref="ClusterNode.RedundancyRole"/>. Consumed by the Admin RedundancyTab to trigger
/// an instant reload instead of waiting for the next on-parameter-set poll.
/// </summary>
public sealed record RoleChangedMessage(
string ClusterId,
string NodeId,
string FromRole,
string ToRole,
DateTime ObservedAtUtc);

View File

@@ -49,6 +49,7 @@ builder.Services.AddScoped<DraftValidationService>();
builder.Services.AddScoped<AuditLogService>();
builder.Services.AddScoped<HostStatusService>();
builder.Services.AddScoped<ClusterNodeService>();
builder.Services.AddSingleton<RedundancyMetrics>();
builder.Services.AddScoped<EquipmentImportBatchService>();
builder.Services.AddScoped<ZB.MOM.WW.OtOpcUa.Configuration.Services.ILdapGroupRoleMappingService,
ZB.MOM.WW.OtOpcUa.Configuration.Services.LdapGroupRoleMappingService>();

View File

@@ -0,0 +1,102 @@
using System.Diagnostics.Metrics;
namespace ZB.MOM.WW.OtOpcUa.Admin.Services;
/// <summary>
/// OpenTelemetry-compatible instrumentation for the redundancy surface. Uses in-box
/// <see cref="System.Diagnostics.Metrics"/> so no NuGet dependency is required to emit —
/// any MeterListener (dotnet-counters, OpenTelemetry.Extensions.Hosting OTLP exporter,
/// Prometheus exporter, etc.) picks up the instruments by the <see cref="MeterName"/>.
/// </summary>
/// <remarks>
/// Exporter configuration (OTLP, Prometheus, etc.) is intentionally NOT wired here —
/// that's a deployment-ops decision that belongs in <c>Program.cs</c> behind an
/// <c>appsettings</c> toggle. This class owns only the Meter + instruments so the
/// production data stream exists regardless of exporter availability.
///
/// Counter + gauge names follow the otel-semantic-conventions pattern:
/// <c>otopcua.redundancy.*</c> with tags for ClusterId + (for transitions) FromRole/ToRole/NodeId.
/// </remarks>
public sealed class RedundancyMetrics : IDisposable
{
public const string MeterName = "ZB.MOM.WW.OtOpcUa.Redundancy";
private readonly Meter _meter;
private readonly Counter<long> _roleTransitions;
private readonly object _gaugeLock = new();
private readonly Dictionary<string, ClusterGaugeState> _gaugeState = new();
public RedundancyMetrics()
{
_meter = new Meter(MeterName, version: "1.0.0");
_roleTransitions = _meter.CreateCounter<long>(
"otopcua.redundancy.role_transition",
unit: "{transition}",
description: "Observed RedundancyRole changes per node — tagged FromRole, ToRole, NodeId, ClusterId.");
// Observable gauges — the callback reports whatever the last Observe*Count call stashed.
_meter.CreateObservableGauge(
"otopcua.redundancy.primary_count",
ObservePrimaryCounts,
unit: "{node}",
description: "Count of Primary-role nodes per cluster (should be 1 for N+1 redundant clusters, 0 during failover).");
_meter.CreateObservableGauge(
"otopcua.redundancy.secondary_count",
ObserveSecondaryCounts,
unit: "{node}",
description: "Count of Secondary-role nodes per cluster.");
_meter.CreateObservableGauge(
"otopcua.redundancy.stale_count",
ObserveStaleCounts,
unit: "{node}",
description: "Count of cluster nodes whose LastSeenAt is older than StaleThreshold.");
}
/// <summary>
/// Update the per-cluster snapshot consumed by the ObservableGauges. Poller calls this
/// at the end of every tick so the collectors see fresh numbers on the next observation
/// window (by default 1s for dotnet-counters, configurable per exporter).
/// </summary>
public void SetClusterCounts(string clusterId, int primary, int secondary, int stale)
{
lock (_gaugeLock)
{
_gaugeState[clusterId] = new ClusterGaugeState(primary, secondary, stale);
}
}
/// <summary>
/// Increment the role_transition counter when a node's RedundancyRole changes. Tags
/// allow breakdowns by from/to roles (e.g. Primary → Secondary for planned failover vs
/// Primary → Standalone for emergency recovery) + by cluster for multi-site fleets.
/// </summary>
public void RecordRoleTransition(string clusterId, string nodeId, string fromRole, string toRole)
{
_roleTransitions.Add(1,
new KeyValuePair<string, object?>("cluster.id", clusterId),
new KeyValuePair<string, object?>("node.id", nodeId),
new KeyValuePair<string, object?>("from_role", fromRole),
new KeyValuePair<string, object?>("to_role", toRole));
}
public void Dispose() => _meter.Dispose();
private IEnumerable<Measurement<long>> ObservePrimaryCounts() => SnapshotGauge(s => s.Primary);
private IEnumerable<Measurement<long>> ObserveSecondaryCounts() => SnapshotGauge(s => s.Secondary);
private IEnumerable<Measurement<long>> ObserveStaleCounts() => SnapshotGauge(s => s.Stale);
private IEnumerable<Measurement<long>> SnapshotGauge(Func<ClusterGaugeState, int> selector)
{
List<Measurement<long>> results;
lock (_gaugeLock)
{
results = new List<Measurement<long>>(_gaugeState.Count);
foreach (var (cluster, state) in _gaugeState)
results.Add(new Measurement<long>(selector(state),
new KeyValuePair<string, object?>("cluster.id", cluster)));
}
return results;
}
private readonly record struct ClusterGaugeState(int Primary, int Secondary, int Stale);
}

View File

@@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Admin.Hubs;
using ZB.MOM.WW.OtOpcUa.Admin.Services;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
@@ -97,7 +98,7 @@ END";
var poller = new FleetStatusPoller(
_sp.GetRequiredService<IServiceScopeFactory>(),
fleetHub, alertHub, NullLogger<FleetStatusPoller>.Instance);
fleetHub, alertHub, NullLogger<FleetStatusPoller>.Instance, new RedundancyMetrics());
await poller.PollOnceAsync(CancellationToken.None);
@@ -142,7 +143,7 @@ END";
var poller = new FleetStatusPoller(
_sp.GetRequiredService<IServiceScopeFactory>(),
fleetHub, alertHub, NullLogger<FleetStatusPoller>.Instance);
fleetHub, alertHub, NullLogger<FleetStatusPoller>.Instance, new RedundancyMetrics());
await poller.PollOnceAsync(CancellationToken.None);

View File

@@ -0,0 +1,70 @@
using System.Diagnostics.Metrics;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Admin.Services;
namespace ZB.MOM.WW.OtOpcUa.Admin.Tests;
[Trait("Category", "Unit")]
public sealed class RedundancyMetricsTests
{
[Fact]
public void RecordRoleTransition_Increments_Counter_WithExpectedTags()
{
using var metrics = new RedundancyMetrics();
using var listener = new MeterListener();
var observed = new List<(long Value, Dictionary<string, object?> Tags)>();
listener.InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == RedundancyMetrics.MeterName &&
instrument.Name == "otopcua.redundancy.role_transition")
{
l.EnableMeasurementEvents(instrument);
}
};
listener.SetMeasurementEventCallback<long>((_, value, tags, _) =>
{
var dict = new Dictionary<string, object?>();
foreach (var tag in tags) dict[tag.Key] = tag.Value;
observed.Add((value, dict));
});
listener.Start();
metrics.RecordRoleTransition("c1", "node-a", "Primary", "Secondary");
observed.Count.ShouldBe(1);
observed[0].Value.ShouldBe(1);
observed[0].Tags["cluster.id"].ShouldBe("c1");
observed[0].Tags["node.id"].ShouldBe("node-a");
observed[0].Tags["from_role"].ShouldBe("Primary");
observed[0].Tags["to_role"].ShouldBe("Secondary");
}
[Fact]
public void SetClusterCounts_Observed_Via_ObservableGauges()
{
using var metrics = new RedundancyMetrics();
metrics.SetClusterCounts("c1", primary: 1, secondary: 2, stale: 0);
metrics.SetClusterCounts("c2", primary: 0, secondary: 1, stale: 1);
var observations = new List<(string Name, long Value, string Cluster)>();
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
if (instrument.Meter.Name == RedundancyMetrics.MeterName)
l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>((instrument, value, tags, _) =>
{
string? cluster = null;
foreach (var t in tags) if (t.Key == "cluster.id") cluster = t.Value as string;
observations.Add((instrument.Name, value, cluster ?? "?"));
});
listener.Start();
listener.RecordObservableInstruments();
observations.ShouldContain(o => o.Name == "otopcua.redundancy.primary_count" && o.Cluster == "c1" && o.Value == 1);
observations.ShouldContain(o => o.Name == "otopcua.redundancy.secondary_count" && o.Cluster == "c1" && o.Value == 2);
observations.ShouldContain(o => o.Name == "otopcua.redundancy.stale_count" && o.Cluster == "c2" && o.Value == 1);
}
}