Merge pull request (#141) - Redundancy OTel + SignalR
This commit was merged in pull request #141.
This commit is contained in:
@@ -1,9 +1,17 @@
|
||||
@using Microsoft.AspNetCore.SignalR.Client
|
||||
@using ZB.MOM.WW.OtOpcUa.Admin.Hubs
|
||||
@using ZB.MOM.WW.OtOpcUa.Admin.Services
|
||||
@using ZB.MOM.WW.OtOpcUa.Configuration.Entities
|
||||
@using ZB.MOM.WW.OtOpcUa.Configuration.Enums
|
||||
@inject ClusterNodeService NodeSvc
|
||||
@inject NavigationManager Nav
|
||||
@implements IAsyncDisposable
|
||||
|
||||
<h4>Redundancy topology</h4>
|
||||
@if (_roleChangedBanner is not null)
|
||||
{
|
||||
<div class="alert alert-info small mb-2">@_roleChangedBanner</div>
|
||||
}
|
||||
<p class="text-muted small">
|
||||
One row per <code>ClusterNode</code> in this cluster. Role, <code>ApplicationUri</code>,
|
||||
and <code>ServiceLevelBase</code> are authored separately; the Admin UI shows them read-only
|
||||
@@ -107,10 +115,41 @@ else
|
||||
[Parameter] public string ClusterId { get; set; } = string.Empty;
|
||||
|
||||
private List<ClusterNode>? _nodes;
|
||||
private HubConnection? _hub;
|
||||
private string? _roleChangedBanner;
|
||||
|
||||
protected override async Task OnParametersSetAsync()
|
||||
{
|
||||
_nodes = await NodeSvc.ListByClusterAsync(ClusterId, CancellationToken.None);
|
||||
if (_hub is null) await ConnectHubAsync();
|
||||
}
|
||||
|
||||
private async Task ConnectHubAsync()
|
||||
{
|
||||
_hub = new HubConnectionBuilder()
|
||||
.WithUrl(Nav.ToAbsoluteUri("/hubs/fleet-status"))
|
||||
.WithAutomaticReconnect()
|
||||
.Build();
|
||||
|
||||
_hub.On<RoleChangedMessage>("RoleChanged", async msg =>
|
||||
{
|
||||
if (msg.ClusterId != ClusterId) return;
|
||||
_roleChangedBanner = $"Role changed on {msg.NodeId}: {msg.FromRole} → {msg.ToRole} at {msg.ObservedAtUtc:HH:mm:ss 'UTC'}";
|
||||
_nodes = await NodeSvc.ListByClusterAsync(ClusterId, CancellationToken.None);
|
||||
await InvokeAsync(StateHasChanged);
|
||||
});
|
||||
|
||||
await _hub.StartAsync();
|
||||
await _hub.SendAsync("SubscribeCluster", ClusterId);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_hub is not null)
|
||||
{
|
||||
await _hub.DisposeAsync();
|
||||
_hub = null;
|
||||
}
|
||||
}
|
||||
|
||||
private static string RowClass(ClusterNode n) =>
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Admin.Hubs;
|
||||
|
||||
@@ -14,11 +16,13 @@ public sealed class FleetStatusPoller(
|
||||
IServiceScopeFactory scopeFactory,
|
||||
IHubContext<FleetStatusHub> fleetHub,
|
||||
IHubContext<AlertHub> alertHub,
|
||||
ILogger<FleetStatusPoller> logger) : BackgroundService
|
||||
ILogger<FleetStatusPoller> logger,
|
||||
RedundancyMetrics redundancyMetrics) : BackgroundService
|
||||
{
|
||||
public TimeSpan PollInterval { get; init; } = TimeSpan.FromSeconds(5);
|
||||
|
||||
private readonly Dictionary<string, NodeStateSnapshot> _last = new();
|
||||
private readonly Dictionary<string, RedundancyRole> _lastRole = new(StringComparer.Ordinal);
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
@@ -42,6 +46,10 @@ public sealed class FleetStatusPoller(
|
||||
using var scope = scopeFactory.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();
|
||||
|
||||
var nodes = await db.ClusterNodes.AsNoTracking().ToListAsync(ct);
|
||||
await PollRolesAsync(nodes, ct);
|
||||
UpdateClusterGauges(nodes);
|
||||
|
||||
var rows = await db.ClusterNodeGenerationStates.AsNoTracking()
|
||||
.Join(db.ClusterNodes.AsNoTracking(), s => s.NodeId, n => n.NodeId, (s, n) => new { s, n.ClusterId })
|
||||
.ToListAsync(ct);
|
||||
@@ -85,9 +93,63 @@ public sealed class FleetStatusPoller(
|
||||
}
|
||||
|
||||
/// <summary>Exposed for tests — forces a snapshot reset so stub data re-seeds.</summary>
|
||||
internal void ResetCache() => _last.Clear();
|
||||
internal void ResetCache()
|
||||
{
|
||||
_last.Clear();
|
||||
_lastRole.Clear();
|
||||
}
|
||||
|
||||
private async Task PollRolesAsync(IReadOnlyList<ClusterNode> nodes, CancellationToken ct)
|
||||
{
|
||||
foreach (var n in nodes)
|
||||
{
|
||||
var hadPrior = _lastRole.TryGetValue(n.NodeId, out var priorRole);
|
||||
if (hadPrior && priorRole == n.RedundancyRole) continue;
|
||||
|
||||
_lastRole[n.NodeId] = n.RedundancyRole;
|
||||
if (!hadPrior) continue; // first-observation bootstrap — not a transition
|
||||
|
||||
redundancyMetrics.RecordRoleTransition(
|
||||
clusterId: n.ClusterId, nodeId: n.NodeId,
|
||||
fromRole: priorRole.ToString(), toRole: n.RedundancyRole.ToString());
|
||||
|
||||
var msg = new RoleChangedMessage(
|
||||
ClusterId: n.ClusterId, NodeId: n.NodeId,
|
||||
FromRole: priorRole.ToString(), ToRole: n.RedundancyRole.ToString(),
|
||||
ObservedAtUtc: DateTime.UtcNow);
|
||||
|
||||
await fleetHub.Clients.Group(FleetStatusHub.GroupName(n.ClusterId))
|
||||
.SendAsync("RoleChanged", msg, ct);
|
||||
await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
|
||||
.SendAsync("RoleChanged", msg, ct);
|
||||
}
|
||||
}
|
||||
|
||||
private void UpdateClusterGauges(IReadOnlyList<ClusterNode> nodes)
|
||||
{
|
||||
var staleCutoff = DateTime.UtcNow - Services.ClusterNodeService.StaleThreshold;
|
||||
foreach (var group in nodes.GroupBy(n => n.ClusterId))
|
||||
{
|
||||
var primary = group.Count(n => n.RedundancyRole == RedundancyRole.Primary);
|
||||
var secondary = group.Count(n => n.RedundancyRole == RedundancyRole.Secondary);
|
||||
var stale = group.Count(n => n.LastSeenAt is null || n.LastSeenAt.Value < staleCutoff);
|
||||
redundancyMetrics.SetClusterCounts(group.Key, primary, secondary, stale);
|
||||
}
|
||||
}
|
||||
|
||||
private readonly record struct NodeStateSnapshot(
|
||||
string NodeId, string ClusterId, long? GenerationId,
|
||||
string? Status, string? Error, DateTime? AppliedAt, DateTime? SeenAt);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Pushed by <see cref="FleetStatusPoller"/> when it observes a change in
|
||||
/// <see cref="ClusterNode.RedundancyRole"/>. Consumed by the Admin RedundancyTab to trigger
|
||||
/// an instant reload instead of waiting for the next on-parameter-set poll.
|
||||
/// </summary>
|
||||
public sealed record RoleChangedMessage(
|
||||
string ClusterId,
|
||||
string NodeId,
|
||||
string FromRole,
|
||||
string ToRole,
|
||||
DateTime ObservedAtUtc);
|
||||
|
||||
@@ -49,6 +49,7 @@ builder.Services.AddScoped<DraftValidationService>();
|
||||
builder.Services.AddScoped<AuditLogService>();
|
||||
builder.Services.AddScoped<HostStatusService>();
|
||||
builder.Services.AddScoped<ClusterNodeService>();
|
||||
builder.Services.AddSingleton<RedundancyMetrics>();
|
||||
builder.Services.AddScoped<EquipmentImportBatchService>();
|
||||
builder.Services.AddScoped<ZB.MOM.WW.OtOpcUa.Configuration.Services.ILdapGroupRoleMappingService,
|
||||
ZB.MOM.WW.OtOpcUa.Configuration.Services.LdapGroupRoleMappingService>();
|
||||
|
||||
102
src/ZB.MOM.WW.OtOpcUa.Admin/Services/RedundancyMetrics.cs
Normal file
102
src/ZB.MOM.WW.OtOpcUa.Admin/Services/RedundancyMetrics.cs
Normal file
@@ -0,0 +1,102 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||
|
||||
/// <summary>
|
||||
/// OpenTelemetry-compatible instrumentation for the redundancy surface. Uses in-box
|
||||
/// <see cref="System.Diagnostics.Metrics"/> so no NuGet dependency is required to emit —
|
||||
/// any MeterListener (dotnet-counters, OpenTelemetry.Extensions.Hosting OTLP exporter,
|
||||
/// Prometheus exporter, etc.) picks up the instruments by the <see cref="MeterName"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Exporter configuration (OTLP, Prometheus, etc.) is intentionally NOT wired here —
|
||||
/// that's a deployment-ops decision that belongs in <c>Program.cs</c> behind an
|
||||
/// <c>appsettings</c> toggle. This class owns only the Meter + instruments so the
|
||||
/// production data stream exists regardless of exporter availability.
|
||||
///
|
||||
/// Counter + gauge names follow the otel-semantic-conventions pattern:
|
||||
/// <c>otopcua.redundancy.*</c> with tags for ClusterId + (for transitions) FromRole/ToRole/NodeId.
|
||||
/// </remarks>
|
||||
public sealed class RedundancyMetrics : IDisposable
|
||||
{
|
||||
public const string MeterName = "ZB.MOM.WW.OtOpcUa.Redundancy";
|
||||
|
||||
private readonly Meter _meter;
|
||||
private readonly Counter<long> _roleTransitions;
|
||||
private readonly object _gaugeLock = new();
|
||||
private readonly Dictionary<string, ClusterGaugeState> _gaugeState = new();
|
||||
|
||||
public RedundancyMetrics()
|
||||
{
|
||||
_meter = new Meter(MeterName, version: "1.0.0");
|
||||
_roleTransitions = _meter.CreateCounter<long>(
|
||||
"otopcua.redundancy.role_transition",
|
||||
unit: "{transition}",
|
||||
description: "Observed RedundancyRole changes per node — tagged FromRole, ToRole, NodeId, ClusterId.");
|
||||
|
||||
// Observable gauges — the callback reports whatever the last Observe*Count call stashed.
|
||||
_meter.CreateObservableGauge(
|
||||
"otopcua.redundancy.primary_count",
|
||||
ObservePrimaryCounts,
|
||||
unit: "{node}",
|
||||
description: "Count of Primary-role nodes per cluster (should be 1 for N+1 redundant clusters, 0 during failover).");
|
||||
_meter.CreateObservableGauge(
|
||||
"otopcua.redundancy.secondary_count",
|
||||
ObserveSecondaryCounts,
|
||||
unit: "{node}",
|
||||
description: "Count of Secondary-role nodes per cluster.");
|
||||
_meter.CreateObservableGauge(
|
||||
"otopcua.redundancy.stale_count",
|
||||
ObserveStaleCounts,
|
||||
unit: "{node}",
|
||||
description: "Count of cluster nodes whose LastSeenAt is older than StaleThreshold.");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Update the per-cluster snapshot consumed by the ObservableGauges. Poller calls this
|
||||
/// at the end of every tick so the collectors see fresh numbers on the next observation
|
||||
/// window (by default 1s for dotnet-counters, configurable per exporter).
|
||||
/// </summary>
|
||||
public void SetClusterCounts(string clusterId, int primary, int secondary, int stale)
|
||||
{
|
||||
lock (_gaugeLock)
|
||||
{
|
||||
_gaugeState[clusterId] = new ClusterGaugeState(primary, secondary, stale);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Increment the role_transition counter when a node's RedundancyRole changes. Tags
|
||||
/// allow breakdowns by from/to roles (e.g. Primary → Secondary for planned failover vs
|
||||
/// Primary → Standalone for emergency recovery) + by cluster for multi-site fleets.
|
||||
/// </summary>
|
||||
public void RecordRoleTransition(string clusterId, string nodeId, string fromRole, string toRole)
|
||||
{
|
||||
_roleTransitions.Add(1,
|
||||
new KeyValuePair<string, object?>("cluster.id", clusterId),
|
||||
new KeyValuePair<string, object?>("node.id", nodeId),
|
||||
new KeyValuePair<string, object?>("from_role", fromRole),
|
||||
new KeyValuePair<string, object?>("to_role", toRole));
|
||||
}
|
||||
|
||||
public void Dispose() => _meter.Dispose();
|
||||
|
||||
private IEnumerable<Measurement<long>> ObservePrimaryCounts() => SnapshotGauge(s => s.Primary);
|
||||
private IEnumerable<Measurement<long>> ObserveSecondaryCounts() => SnapshotGauge(s => s.Secondary);
|
||||
private IEnumerable<Measurement<long>> ObserveStaleCounts() => SnapshotGauge(s => s.Stale);
|
||||
|
||||
private IEnumerable<Measurement<long>> SnapshotGauge(Func<ClusterGaugeState, int> selector)
|
||||
{
|
||||
List<Measurement<long>> results;
|
||||
lock (_gaugeLock)
|
||||
{
|
||||
results = new List<Measurement<long>>(_gaugeState.Count);
|
||||
foreach (var (cluster, state) in _gaugeState)
|
||||
results.Add(new Measurement<long>(selector(state),
|
||||
new KeyValuePair<string, object?>("cluster.id", cluster)));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
private readonly record struct ClusterGaugeState(int Primary, int Secondary, int Stale);
|
||||
}
|
||||
@@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Admin.Hubs;
|
||||
using ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
@@ -97,7 +98,7 @@ END";
|
||||
|
||||
var poller = new FleetStatusPoller(
|
||||
_sp.GetRequiredService<IServiceScopeFactory>(),
|
||||
fleetHub, alertHub, NullLogger<FleetStatusPoller>.Instance);
|
||||
fleetHub, alertHub, NullLogger<FleetStatusPoller>.Instance, new RedundancyMetrics());
|
||||
|
||||
await poller.PollOnceAsync(CancellationToken.None);
|
||||
|
||||
@@ -142,7 +143,7 @@ END";
|
||||
|
||||
var poller = new FleetStatusPoller(
|
||||
_sp.GetRequiredService<IServiceScopeFactory>(),
|
||||
fleetHub, alertHub, NullLogger<FleetStatusPoller>.Instance);
|
||||
fleetHub, alertHub, NullLogger<FleetStatusPoller>.Instance, new RedundancyMetrics());
|
||||
|
||||
await poller.PollOnceAsync(CancellationToken.None);
|
||||
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Admin.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class RedundancyMetricsTests
|
||||
{
|
||||
[Fact]
|
||||
public void RecordRoleTransition_Increments_Counter_WithExpectedTags()
|
||||
{
|
||||
using var metrics = new RedundancyMetrics();
|
||||
using var listener = new MeterListener();
|
||||
var observed = new List<(long Value, Dictionary<string, object?> Tags)>();
|
||||
listener.InstrumentPublished = (instrument, l) =>
|
||||
{
|
||||
if (instrument.Meter.Name == RedundancyMetrics.MeterName &&
|
||||
instrument.Name == "otopcua.redundancy.role_transition")
|
||||
{
|
||||
l.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
};
|
||||
listener.SetMeasurementEventCallback<long>((_, value, tags, _) =>
|
||||
{
|
||||
var dict = new Dictionary<string, object?>();
|
||||
foreach (var tag in tags) dict[tag.Key] = tag.Value;
|
||||
observed.Add((value, dict));
|
||||
});
|
||||
listener.Start();
|
||||
|
||||
metrics.RecordRoleTransition("c1", "node-a", "Primary", "Secondary");
|
||||
|
||||
observed.Count.ShouldBe(1);
|
||||
observed[0].Value.ShouldBe(1);
|
||||
observed[0].Tags["cluster.id"].ShouldBe("c1");
|
||||
observed[0].Tags["node.id"].ShouldBe("node-a");
|
||||
observed[0].Tags["from_role"].ShouldBe("Primary");
|
||||
observed[0].Tags["to_role"].ShouldBe("Secondary");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetClusterCounts_Observed_Via_ObservableGauges()
|
||||
{
|
||||
using var metrics = new RedundancyMetrics();
|
||||
metrics.SetClusterCounts("c1", primary: 1, secondary: 2, stale: 0);
|
||||
metrics.SetClusterCounts("c2", primary: 0, secondary: 1, stale: 1);
|
||||
|
||||
var observations = new List<(string Name, long Value, string Cluster)>();
|
||||
using var listener = new MeterListener();
|
||||
listener.InstrumentPublished = (instrument, l) =>
|
||||
{
|
||||
if (instrument.Meter.Name == RedundancyMetrics.MeterName)
|
||||
l.EnableMeasurementEvents(instrument);
|
||||
};
|
||||
listener.SetMeasurementEventCallback<long>((instrument, value, tags, _) =>
|
||||
{
|
||||
string? cluster = null;
|
||||
foreach (var t in tags) if (t.Key == "cluster.id") cluster = t.Value as string;
|
||||
observations.Add((instrument.Name, value, cluster ?? "?"));
|
||||
});
|
||||
listener.Start();
|
||||
listener.RecordObservableInstruments();
|
||||
|
||||
observations.ShouldContain(o => o.Name == "otopcua.redundancy.primary_count" && o.Cluster == "c1" && o.Value == 1);
|
||||
observations.ShouldContain(o => o.Name == "otopcua.redundancy.secondary_count" && o.Cluster == "c1" && o.Value == 2);
|
||||
observations.ShouldContain(o => o.Name == "otopcua.redundancy.stale_count" && o.Cluster == "c2" && o.Value == 1);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user