// 156 lines · 6.4 KiB · C#
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;

using ZB.MOM.WW.OtOpcUa.Admin.Services;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Admin.Hubs;
/// <summary>
/// Polls <c>ClusterNodeGenerationState</c> every <see cref="PollInterval"/> and publishes
/// per-node deltas to <see cref="FleetStatusHub"/>. Also raises sticky
/// <see cref="AlertMessage"/>s on transitions into <c>Failed</c>, and publishes
/// <see cref="RoleChangedMessage"/>s when a node's redundancy role changes.
/// </summary>
public sealed class FleetStatusPoller(
    IServiceScopeFactory scopeFactory,
    IHubContext<FleetStatusHub> fleetHub,
    IHubContext<AlertHub> alertHub,
    ILogger<FleetStatusPoller> logger,
    RedundancyMetrics redundancyMetrics) : BackgroundService
{
    // Status string a node reports after a failed generation apply
    // (compared against NodeStateSnapshot.Status, which is LastAppliedStatus?.ToString()).
    private const string FailedStatus = "Failed";

    /// <summary>Delay between polls. <c>init</c>-settable so tests can shorten it.</summary>
    public TimeSpan PollInterval { get; init; } = TimeSpan.FromSeconds(5);

    // Last published snapshot per NodeId — suppresses publishing when nothing changed.
    // Ordinal comparer for consistency with _lastRole: NodeIds are machine identifiers.
    private readonly Dictionary<string, NodeStateSnapshot> _last = new(StringComparer.Ordinal);

    // Last observed redundancy role per NodeId — used to detect role transitions.
    private readonly Dictionary<string, RedundancyRole> _lastRole = new(StringComparer.Ordinal);

    /// <summary>
    /// Main service loop: poll once, log-and-continue on any non-cancellation failure,
    /// then wait <see cref="PollInterval"/> before the next tick.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("FleetStatusPoller starting — interval {Interval}s", PollInterval.TotalSeconds);

        while (!stoppingToken.IsCancellationRequested)
        {
            try { await PollOnceAsync(stoppingToken); }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // A failed tick must not kill the background service; the next tick retries.
                logger.LogWarning(ex, "FleetStatusPoller tick failed");
            }

            try { await Task.Delay(PollInterval, stoppingToken); }
            catch (OperationCanceledException) { break; }
        }
    }

    /// <summary>
    /// One poll tick: reads cluster nodes and generation states from a fresh scoped
    /// DbContext, publishes role deltas and gauge updates, then publishes per-node
    /// state deltas (and failure alerts) for every row that changed since the last tick.
    /// Exposed as <c>internal</c> so tests can drive ticks directly.
    /// </summary>
    internal async Task PollOnceAsync(CancellationToken ct)
    {
        // BackgroundService is a singleton; the DbContext is scoped, so resolve per tick.
        using var scope = scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();

        var nodes = await db.ClusterNodes.AsNoTracking().ToListAsync(ct);
        await PollRolesAsync(nodes, ct);
        UpdateClusterGauges(nodes);

        // Join states to nodes so each delta message carries its ClusterId (for hub grouping).
        var rows = await db.ClusterNodeGenerationStates.AsNoTracking()
            .Join(db.ClusterNodes.AsNoTracking(), s => s.NodeId, n => n.NodeId, (s, n) => new { s, n.ClusterId })
            .ToListAsync(ct);

        foreach (var r in rows)
        {
            var snapshot = new NodeStateSnapshot(
                r.s.NodeId, r.ClusterId, r.s.CurrentGenerationId,
                r.s.LastAppliedStatus?.ToString(), r.s.LastAppliedError,
                r.s.LastAppliedAt, r.s.LastSeenAt);

            // Record-struct value equality: publish only when anything in the row changed.
            var hadPrior = _last.TryGetValue(r.s.NodeId, out var prior);
            if (!hadPrior || prior != snapshot)
            {
                _last[r.s.NodeId] = snapshot;

                var msg = new NodeStateChangedMessage(
                    snapshot.NodeId, snapshot.ClusterId, snapshot.GenerationId,
                    snapshot.Status, snapshot.Error, snapshot.AppliedAt, snapshot.SeenAt);

                // Publish to both the per-cluster group and the whole-fleet group.
                await fleetHub.Clients.Group(FleetStatusHub.GroupName(snapshot.ClusterId))
                    .SendAsync("NodeStateChanged", msg, ct);
                await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
                    .SendAsync("NodeStateChanged", msg, ct);

                // Alert only on the transition INTO Failed, not on every Failed tick.
                if (snapshot.Status == FailedStatus && (!hadPrior || prior.Status != FailedStatus))
                {
                    await RaiseApplyFailedAlertAsync(snapshot, ct);
                }
            }
        }
    }

    // Raises the sticky apply-failed alert for a node that just transitioned into Failed.
    private async Task RaiseApplyFailedAlertAsync(NodeStateSnapshot snapshot, CancellationToken ct)
    {
        var alert = new AlertMessage(
            // Stable per-node id so clients can de-duplicate/replace the alert.
            AlertId: $"{snapshot.NodeId}:apply-failed",
            Severity: "error",
            Title: $"Apply failed on {snapshot.NodeId}",
            Detail: snapshot.Error ?? "(no detail)",
            RaisedAtUtc: DateTime.UtcNow,
            ClusterId: snapshot.ClusterId,
            NodeId: snapshot.NodeId);

        await alertHub.Clients.Group(AlertHub.AllAlertsGroup)
            .SendAsync("AlertRaised", alert, ct);
    }

    /// <summary>Exposed for tests — forces a snapshot reset so stub data re-seeds.</summary>
    internal void ResetCache()
    {
        _last.Clear();
        _lastRole.Clear();
    }

    /// <summary>
    /// Detects redundancy-role transitions per node, records them in
    /// <see cref="RedundancyMetrics"/>, and publishes <see cref="RoleChangedMessage"/>s
    /// to the per-cluster and whole-fleet hub groups.
    /// </summary>
    private async Task PollRolesAsync(IReadOnlyList<ClusterNode> nodes, CancellationToken ct)
    {
        foreach (var n in nodes)
        {
            var hadPrior = _lastRole.TryGetValue(n.NodeId, out var priorRole);
            if (hadPrior && priorRole == n.RedundancyRole) continue;

            _lastRole[n.NodeId] = n.RedundancyRole;
            if (!hadPrior) continue; // first-observation bootstrap — not a transition

            redundancyMetrics.RecordRoleTransition(
                clusterId: n.ClusterId, nodeId: n.NodeId,
                fromRole: priorRole.ToString(), toRole: n.RedundancyRole.ToString());

            var msg = new RoleChangedMessage(
                ClusterId: n.ClusterId, NodeId: n.NodeId,
                FromRole: priorRole.ToString(), ToRole: n.RedundancyRole.ToString(),
                ObservedAtUtc: DateTime.UtcNow);

            await fleetHub.Clients.Group(FleetStatusHub.GroupName(n.ClusterId))
                .SendAsync("RoleChanged", msg, ct);
            await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
                .SendAsync("RoleChanged", msg, ct);
        }
    }

    /// <summary>
    /// Publishes per-cluster primary/secondary/stale counts to
    /// <see cref="RedundancyMetrics"/>. A node is stale when it has never been seen
    /// or was last seen before <c>ClusterNodeService.StaleThreshold</c> ago.
    /// </summary>
    private void UpdateClusterGauges(IReadOnlyList<ClusterNode> nodes)
    {
        var staleCutoff = DateTime.UtcNow - Services.ClusterNodeService.StaleThreshold;
        foreach (var group in nodes.GroupBy(n => n.ClusterId))
        {
            var primary = group.Count(n => n.RedundancyRole == RedundancyRole.Primary);
            var secondary = group.Count(n => n.RedundancyRole == RedundancyRole.Secondary);
            var stale = group.Count(n => n.LastSeenAt is null || n.LastSeenAt.Value < staleCutoff);
            redundancyMetrics.SetClusterCounts(group.Key, primary, secondary, stale);
        }
    }

    // Value snapshot of one node's generation state; record-struct equality drives
    // the "only publish deltas" check in PollOnceAsync.
    private readonly record struct NodeStateSnapshot(
        string NodeId, string ClusterId, long? GenerationId,
        string? Status, string? Error, DateTime? AppliedAt, DateTime? SeenAt);
}
/// <summary>
/// Pushed by <see cref="FleetStatusPoller"/> when it observes a change in
/// <see cref="ClusterNode.RedundancyRole"/>. Consumed by the Admin RedundancyTab to trigger
/// an instant reload instead of waiting for the next on-parameter-set poll.
/// </summary>
/// <param name="ClusterId">Cluster the node belongs to; also selects the hub group.</param>
/// <param name="NodeId">Node whose redundancy role changed.</param>
/// <param name="FromRole">Previous role, as the <c>RedundancyRole</c> enum member name.</param>
/// <param name="ToRole">New role, as the <c>RedundancyRole</c> enum member name.</param>
/// <param name="ObservedAtUtc">UTC time the poller observed the transition — not the
/// time the role actually changed on the node.</param>
public sealed record RoleChangedMessage(
    string ClusterId,
    string NodeId,
    string FromRole,
    string ToRole,
    DateTime ObservedAtUtc);