Files
lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.Admin/Hubs/FleetStatusPoller.cs
T
Joseph Doherty 2b33b64a58 fix(admin): resolve Low code-review findings (Admin-010,011,012)
- Admin-010: vendor Bootstrap 5.3.3 (CSS + JS bundle + maps + provenance
  README) under wwwroot/lib/bootstrap and reference local paths from
  App.razor — Admin no longer pulls Bootstrap from jsDelivr.
- Admin-011: swap FleetStatusPoller's three plain dictionaries for
  ConcurrentDictionary so ResetCache can't race a poll tick.
- Admin-012: drop the EquipmentId column from EquipmentCsvImporter (per
  admin-ui.md — equipment id is system-derived from EquipmentUuid);
  EquipmentImportBatchService and the textarea placeholder updated to
  match.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 07:24:07 -04:00

197 lines
8.6 KiB
C#

using System.Collections.Concurrent;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Admin.Services;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Admin.Hubs;
/// <summary>
/// Polls <c>ClusterNodeGenerationState</c> every <see cref="PollInterval"/> and publishes
/// per-node deltas to <see cref="FleetStatusHub"/>. Also raises sticky
/// <see cref="AlertMessage"/>s on transitions into <c>Failed</c>.
/// </summary>
/// <remarks>
/// Each tick additionally diffs every node's <see cref="ClusterNode.RedundancyRole"/>
/// (pushing <see cref="RoleChangedMessage"/> and recording a role-transition metric),
/// refreshes the per-cluster counts on <see cref="RedundancyMetrics"/>, and diffs the
/// <c>DriverInstanceResilienceStatus</c> rows (pushing
/// <see cref="ResilienceStatusChangedMessage"/>). The "last observed" caches live only in
/// this process; after start-up or <see cref="ResetCache"/> the first observation of a
/// node state / resilience row is always published, while the first observation of a
/// role is recorded silently (it is not a transition).
/// </remarks>
public sealed class FleetStatusPoller(
    IServiceScopeFactory scopeFactory,
    IHubContext<FleetStatusHub> fleetHub,
    IHubContext<AlertHub> alertHub,
    ILogger<FleetStatusPoller> logger,
    RedundancyMetrics redundancyMetrics) : BackgroundService
{
    // Separates DriverInstanceId from HostName inside the resilience cache key. Plain
    // concatenation is ambiguous ("ab" + "c" == "a" + "bc"), which let two distinct
    // (driver, host) rows share one cache slot and suppress or duplicate change pushes.
    // U+001F (ASCII "unit separator") is assumed never to appear in either identifier.
    private const char ResilienceKeySeparator = '\u001F';

    /// <summary>
    /// Delay between the end of one poll tick and the start of the next. Defaults to 5 seconds.
    /// </summary>
    public TimeSpan PollInterval { get; init; } = TimeSpan.FromSeconds(5);

    // Admin-011: ConcurrentDictionary so the steady-state poll path and ResetCache() (which
    // is exposed `internal` for tests) can race without throwing InvalidOperationException
    // or corrupting state. Plain Dictionary<,> is not safe under concurrent clear + mutate.
    // All three caches use the same explicit ordinal comparer.
    private readonly ConcurrentDictionary<string, NodeStateSnapshot> _last = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, RedundancyRole> _lastRole = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, ResilienceSnapshot> _lastResilience = new(StringComparer.Ordinal);

    /// <summary>
    /// Runs <see cref="PollOnceAsync"/> repeatedly until <paramref name="stoppingToken"/> is
    /// cancelled, waiting <see cref="PollInterval"/> between ticks.
    /// </summary>
    /// <remarks>
    /// A failed tick is logged and the loop continues. Only cancellation caused by
    /// <paramref name="stoppingToken"/> may end the loop; an
    /// <see cref="OperationCanceledException"/> raised for any other reason (e.g. a timeout
    /// inside a downstream call) is handled like any other tick failure instead of
    /// escaping and permanently stopping the poller.
    /// </remarks>
    /// <param name="stoppingToken">Signalled by the host on shutdown.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("FleetStatusPoller starting — interval {Interval}s", PollInterval.TotalSeconds);
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await PollOnceAsync(stoppingToken);
            }
            catch (Exception ex) when (!(ex is OperationCanceledException && stoppingToken.IsCancellationRequested))
            {
                logger.LogWarning(ex, "FleetStatusPoller tick failed");
            }

            try
            {
                await Task.Delay(PollInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Executes a single poll tick: role diff, cluster gauges, resilience diff, and finally the
    /// per-node generation-state diff (with an apply-failed alert on a transition into
    /// <c>Failed</c>).
    /// </summary>
    /// <param name="ct">Flowed into every database query and SignalR send.</param>
    internal async Task PollOnceAsync(CancellationToken ct)
    {
        // The DbContext is scoped, while this poller is a long-lived hosted service,
        // so every tick resolves its own scope.
        using var scope = scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();

        var nodes = await db.ClusterNodes.AsNoTracking().ToListAsync(ct);
        await PollRolesAsync(nodes, ct);
        UpdateClusterGauges(nodes);
        await PollResilienceAsync(db, ct);

        // Join to ClusterNodes to attach the owning ClusterId used for group routing.
        var rows = await db.ClusterNodeGenerationStates.AsNoTracking()
            .Join(db.ClusterNodes.AsNoTracking(), s => s.NodeId, n => n.NodeId, (s, n) => new { s, n.ClusterId })
            .ToListAsync(ct);

        foreach (var r in rows)
        {
            var snapshot = new NodeStateSnapshot(
                r.s.NodeId, r.ClusterId, r.s.CurrentGenerationId,
                r.s.LastAppliedStatus?.ToString(), r.s.LastAppliedError,
                r.s.LastAppliedAt, r.s.LastSeenAt);

            var hadPrior = _last.TryGetValue(r.s.NodeId, out var prior);
            if (hadPrior && prior == snapshot)
            {
                continue; // unchanged since the last tick — nothing to publish
            }

            _last[r.s.NodeId] = snapshot;

            var msg = new NodeStateChangedMessage(
                snapshot.NodeId, snapshot.ClusterId, snapshot.GenerationId,
                snapshot.Status, snapshot.Error, snapshot.AppliedAt, snapshot.SeenAt);
            await fleetHub.Clients.Group(FleetStatusHub.GroupName(snapshot.ClusterId))
                .SendAsync("NodeStateChanged", msg, ct);
            await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
                .SendAsync("NodeStateChanged", msg, ct);

            // Alert only on the edge into Failed, not on every tick a node stays Failed.
            // `prior` is default(NodeStateSnapshot) when !hadPrior, hence the guard order.
            if (snapshot.Status == "Failed" && (!hadPrior || prior.Status != "Failed"))
            {
                var alert = new AlertMessage(
                    AlertId: $"{snapshot.NodeId}:apply-failed",
                    Severity: "error",
                    Title: $"Apply failed on {snapshot.NodeId}",
                    Detail: snapshot.Error ?? "(no detail)",
                    RaisedAtUtc: DateTime.UtcNow,
                    ClusterId: snapshot.ClusterId,
                    NodeId: snapshot.NodeId);
                await alertHub.Clients.Group(AlertHub.AllAlertsGroup)
                    .SendAsync("AlertRaised", alert, ct);
            }
        }
    }

    /// <summary>Exposed for tests — forces a snapshot reset so stub data re-seeds.</summary>
    internal void ResetCache()
    {
        _last.Clear();
        _lastRole.Clear();
        _lastResilience.Clear();
    }

    /// <summary>
    /// Phase 6.1 Stream E.2 — detect deltas on the <c>DriverInstanceResilienceStatus</c>
    /// table and push <see cref="ResilienceStatusChangedMessage"/> on the fleet group so
    /// the Admin <c>/hosts</c> page can upsert the matching row without waiting for its
    /// next poll cycle. Keyed on <c>(DriverInstanceId, HostName)</c>; same key the
    /// UI uses to join status rows.
    /// </summary>
    /// <param name="db">Scoped context owned by the calling tick.</param>
    /// <param name="ct">Flowed into the query and every SignalR send.</param>
    private async Task PollResilienceAsync(OtOpcUaConfigDbContext db, CancellationToken ct)
    {
        var rows = await db.DriverInstanceResilienceStatuses.AsNoTracking().ToListAsync(ct);
        foreach (var r in rows)
        {
            // Separator keeps distinct (driver, host) pairs from colliding on one key.
            var key = $"{r.DriverInstanceId}{ResilienceKeySeparator}{r.HostName}";
            var snapshot = new ResilienceSnapshot(
                r.ConsecutiveFailures, r.LastCircuitBreakerOpenUtc,
                r.CurrentBulkheadDepth, r.LastRecycleUtc);

            if (_lastResilience.TryGetValue(key, out var prior) && prior == snapshot)
            {
                continue;
            }

            _lastResilience[key] = snapshot;

            var msg = new ResilienceStatusChangedMessage(
                r.DriverInstanceId, r.HostName,
                snapshot.ConsecutiveFailures, snapshot.LastCircuitBreakerOpenUtc,
                snapshot.CurrentBulkheadDepth, snapshot.LastRecycleUtc);
            await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
                .SendAsync("ResilienceStatusChanged", msg, ct);
        }
    }

    /// <summary>
    /// Compares each node's <see cref="ClusterNode.RedundancyRole"/> with the last value seen
    /// and, on a change, records a role-transition metric and pushes
    /// <see cref="RoleChangedMessage"/> to the node's cluster group and the fleet group.
    /// The first sighting of a node only seeds the cache.
    /// </summary>
    /// <param name="nodes">All cluster nodes loaded for this tick.</param>
    /// <param name="ct">Flowed into every SignalR send.</param>
    private async Task PollRolesAsync(IReadOnlyList<ClusterNode> nodes, CancellationToken ct)
    {
        foreach (var n in nodes)
        {
            var hadPrior = _lastRole.TryGetValue(n.NodeId, out var priorRole);
            if (hadPrior && priorRole == n.RedundancyRole)
            {
                continue;
            }

            _lastRole[n.NodeId] = n.RedundancyRole;
            if (!hadPrior)
            {
                continue; // first-observation bootstrap — not a transition
            }

            redundancyMetrics.RecordRoleTransition(
                clusterId: n.ClusterId, nodeId: n.NodeId,
                fromRole: priorRole.ToString(), toRole: n.RedundancyRole.ToString());

            var msg = new RoleChangedMessage(
                ClusterId: n.ClusterId, NodeId: n.NodeId,
                FromRole: priorRole.ToString(), ToRole: n.RedundancyRole.ToString(),
                ObservedAtUtc: DateTime.UtcNow);
            await fleetHub.Clients.Group(FleetStatusHub.GroupName(n.ClusterId))
                .SendAsync("RoleChanged", msg, ct);
            await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
                .SendAsync("RoleChanged", msg, ct);
        }
    }

    /// <summary>
    /// Publishes, per cluster, the number of Primary nodes, Secondary nodes, and stale nodes
    /// (never seen, or last seen before <c>ClusterNodeService.StaleThreshold</c> ago).
    /// </summary>
    /// <param name="nodes">All cluster nodes loaded for this tick.</param>
    private void UpdateClusterGauges(IReadOnlyList<ClusterNode> nodes)
    {
        var staleCutoff = DateTime.UtcNow - Services.ClusterNodeService.StaleThreshold;
        foreach (var group in nodes.GroupBy(n => n.ClusterId))
        {
            var primary = group.Count(n => n.RedundancyRole == RedundancyRole.Primary);
            var secondary = group.Count(n => n.RedundancyRole == RedundancyRole.Secondary);
            var stale = group.Count(n => n.LastSeenAt is null || n.LastSeenAt.Value < staleCutoff);
            redundancyMetrics.SetClusterCounts(group.Key, primary, secondary, stale);
        }
    }

    // Value-equality snapshot of one ClusterNodeGenerationState row; record-struct equality
    // drives the change detection in PollOnceAsync.
    private readonly record struct NodeStateSnapshot(
        string NodeId, string ClusterId, long? GenerationId,
        string? Status, string? Error, DateTime? AppliedAt, DateTime? SeenAt);

    // Value-equality snapshot of one DriverInstanceResilienceStatus row's mutable columns.
    private readonly record struct ResilienceSnapshot(
        int ConsecutiveFailures, DateTime? LastCircuitBreakerOpenUtc,
        int CurrentBulkheadDepth, DateTime? LastRecycleUtc);
}
/// <summary>
/// SignalR payload that <see cref="FleetStatusPoller"/> sends under the method name
/// <c>"RoleChanged"</c> whenever a node's <see cref="ClusterNode.RedundancyRole"/> differs
/// from the value it last observed. The Admin RedundancyTab listens for it so it can reload
/// right away rather than waiting for its next on-parameter-set poll.
/// </summary>
/// <param name="ClusterId">Cluster that owns the node.</param>
/// <param name="NodeId">Node whose redundancy role changed.</param>
/// <param name="FromRole">String form of the previously observed role.</param>
/// <param name="ToRole">String form of the newly observed role.</param>
/// <param name="ObservedAtUtc">UTC timestamp at which the poller noticed the change.</param>
public sealed record RoleChangedMessage(string ClusterId, string NodeId, string FromRole, string ToRole, DateTime ObservedAtUtc);