94 lines
3.9 KiB
C#
94 lines
3.9 KiB
C#
using Microsoft.AspNetCore.SignalR;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Admin.Hubs;
|
|
|
|
/// <summary>
/// Polls <c>ClusterNodeGenerationState</c> every <see cref="PollInterval"/> and publishes
/// per-node deltas to <see cref="FleetStatusHub"/>. Also raises
/// <see cref="AlertMessage"/>s on transitions into <c>Failed</c>.
/// </summary>
/// <remarks>
/// The last published snapshot of every node is kept in memory; a node is only re-published
/// when its current snapshot differs from the cached one. The cache entry is written after
/// the publish succeeded, so a failed send is retried on the next tick (delivery is
/// at-least-once: a retry may repeat a <c>NodeStateChanged</c> message that already went out).
/// </remarks>
public sealed class FleetStatusPoller(
    IServiceScopeFactory scopeFactory,
    IHubContext<FleetStatusHub> fleetHub,
    IHubContext<AlertHub> alertHub,
    ILogger<FleetStatusPoller> logger) : BackgroundService
{
    /// <summary>Status text that marks a failed apply and triggers an alert.</summary>
    private const string FailedStatus = "Failed";

    /// <summary>Delay between two polls. Defaults to five seconds.</summary>
    public TimeSpan PollInterval { get; init; } = TimeSpan.FromSeconds(5);

    /// <summary>Last successfully published snapshot per node, keyed by node id.</summary>
    private readonly Dictionary<string, NodeStateSnapshot> _last = new();

    /// <summary>
    /// Runs <see cref="PollOnceAsync"/> in a loop, waiting <see cref="PollInterval"/> between
    /// ticks, until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signals that the service is being stopped.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("FleetStatusPoller starting — interval {Interval}s", PollInterval.TotalSeconds);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await PollOnceAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown was requested while a tick was in flight — leave quietly.
                break;
            }
            catch (Exception ex)
            {
                // Includes cancellations that were NOT caused by the stopping token (for example
                // a timeout surfacing as TaskCanceledException): those are failed ticks and must
                // not terminate the poller.
                logger.LogWarning(ex, "FleetStatusPoller tick failed");
            }

            try
            {
                await Task.Delay(PollInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Performs a single poll: reads all node generation states joined with their cluster id,
    /// and for every node whose snapshot changed since the last successful publish sends a
    /// <c>NodeStateChanged</c> message to the node's cluster group and to the fleet group.
    /// When the node's status became <c>Failed</c> (and the previously published status was
    /// not <c>Failed</c>) an <c>AlertRaised</c> message is sent as well.
    /// </summary>
    /// <param name="ct">Cancels the database query and the hub sends.</param>
    internal async Task PollOnceAsync(CancellationToken ct)
    {
        using var scope = scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<OtOpcUaConfigDbContext>();

        var rows = await db.ClusterNodeGenerationStates.AsNoTracking()
            .Join(db.ClusterNodes.AsNoTracking(), s => s.NodeId, n => n.NodeId, (s, n) => new { s, n.ClusterId })
            .ToListAsync(ct);

        foreach (var r in rows)
        {
            var snapshot = new NodeStateSnapshot(
                r.s.NodeId, r.ClusterId, r.s.CurrentGenerationId,
                r.s.LastAppliedStatus?.ToString(), r.s.LastAppliedError,
                r.s.LastAppliedAt, r.s.LastSeenAt);

            var hadPrior = _last.TryGetValue(snapshot.NodeId, out var prior);
            if (hadPrior && prior == snapshot)
            {
                // Nothing changed for this node since the last publish.
                continue;
            }

            var msg = new NodeStateChangedMessage(
                snapshot.NodeId, snapshot.ClusterId, snapshot.GenerationId,
                snapshot.Status, snapshot.Error, snapshot.AppliedAt, snapshot.SeenAt);

            await fleetHub.Clients.Group(FleetStatusHub.GroupName(snapshot.ClusterId))
                .SendAsync("NodeStateChanged", msg, ct);
            await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
                .SendAsync("NodeStateChanged", msg, ct);

            // Alert only on the transition into Failed, not on every change while Failed.
            var enteredFailed = snapshot.Status == FailedStatus
                && (!hadPrior || prior.Status != FailedStatus);
            if (enteredFailed)
            {
                var alert = new AlertMessage(
                    AlertId: $"{snapshot.NodeId}:apply-failed",
                    Severity: "error",
                    Title: $"Apply failed on {snapshot.NodeId}",
                    Detail: snapshot.Error ?? "(no detail)",
                    RaisedAtUtc: DateTime.UtcNow,
                    ClusterId: snapshot.ClusterId,
                    NodeId: snapshot.NodeId);
                await alertHub.Clients.Group(AlertHub.AllAlertsGroup)
                    .SendAsync("AlertRaised", alert, ct);
            }

            // Record the snapshot only after everything was sent. If a send above throws, the
            // cache still holds the previous state, so the next tick detects the same delta
            // (and the same Failed transition) again instead of silently dropping it.
            _last[snapshot.NodeId] = snapshot;
        }
    }

    /// <summary>
    /// Exposed for tests — clears the snapshot cache, so the next poll treats every node as
    /// new and publishes it again.
    /// </summary>
    internal void ResetCache() => _last.Clear();

    /// <summary>Value-comparable projection of one node's generation state row.</summary>
    private readonly record struct NodeStateSnapshot(
        string NodeId, string ClusterId, long? GenerationId,
        string? Status, string? Error, DateTime? AppliedAt, DateTime? SeenAt);
}
|