using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Admin.Services;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Admin.Hubs;
/// <summary>
/// Background service that polls <c>ClusterNodeGenerationState</c> every <see cref="PollInterval"/>
/// and publishes per-node deltas to <see cref="FleetStatusHub"/>. Each tick also publishes
/// redundancy-role transitions, refreshes the per-cluster <see cref="RedundancyMetrics"/> gauges,
/// and raises a sticky <see cref="AlertMessage"/> on <see cref="AlertHub"/> when a node
/// transitions into <c>Failed</c>.
/// </summary>
/// <remarks>
/// Change detection relies on two plain dictionaries with no locking.
/// NOTE(review): assumes PollOnceAsync is driven by one caller at a time (the
/// <see cref="ExecuteAsync"/> loop, or a test) — confirm no concurrent callers exist.
/// </remarks>
public sealed class FleetStatusPoller(
    IServiceScopeFactory scopeFactory,
    IHubContext fleetHub,
    IHubContext alertHub,
    ILogger logger,
    RedundancyMetrics redundancyMetrics) : BackgroundService
{
    /// <summary>
    /// Delay inserted after each poll tick before the next one starts. Defaults to 5 seconds.
    /// </summary>
    public TimeSpan PollInterval { get; init; } = TimeSpan.FromSeconds(5);

    // Last successfully published generation-state snapshot, keyed by NodeId.
    private readonly Dictionary _last = new();

    // Last observed redundancy role, keyed by NodeId (ordinal comparison).
    private readonly Dictionary _lastRole = new(StringComparer.Ordinal);

    /// <summary>
    /// Runs <see cref="PollOnceAsync"/> and then waits <see cref="PollInterval"/>, repeating until
    /// <paramref name="stoppingToken"/> is cancelled. A failing tick is logged and the loop continues.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host on shutdown.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("FleetStatusPoller starting — interval {Interval}s", PollInterval.TotalSeconds);
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await PollOnceAsync(stoppingToken);
            }
            // Only a cancellation caused by host shutdown may escape. Any other
            // OperationCanceledException (e.g. an internal timeout surfacing as
            // TaskCanceledException) is just a failed tick; letting it propagate would fault this
            // BackgroundService (and, with the default BackgroundServiceExceptionBehavior on
            // .NET 6+, stop the whole host).
            catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
            {
                logger.LogWarning(ex, "FleetStatusPoller tick failed");
            }

            try
            {
                await Task.Delay(PollInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Performs one poll. It publishes role transitions, refreshes the cluster gauges, and then
    /// broadcasts a <c>NodeStateChanged</c> message for every node whose generation state differs
    /// from the last published snapshot. Each message goes to both the node's cluster group and
    /// the fleet-wide group. An <c>AlertRaised</c> alert is also sent when a node enters
    /// <c>Failed</c>.
    /// </summary>
    /// <param name="ct">Cancels the database queries and hub sends.</param>
    internal async Task PollOnceAsync(CancellationToken ct)
    {
        using var scope = scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService();

        var nodes = await db.ClusterNodes.AsNoTracking().ToListAsync(ct);
        await PollRolesAsync(nodes, ct);
        UpdateClusterGauges(nodes);

        var rows = await db.ClusterNodeGenerationStates.AsNoTracking()
            .Join(db.ClusterNodes.AsNoTracking(), s => s.NodeId, n => n.NodeId, (s, n) => new { s, n.ClusterId })
            .ToListAsync(ct);

        // Forget nodes whose state row is gone. This keeps the cache bounded in a long-running
        // process. It also means a re-created node is treated as a first observation (as at
        // startup) instead of being diffed against its previous incarnation.
        var liveNodeIds = rows.Select(r => r.s.NodeId).ToHashSet(StringComparer.Ordinal);
        foreach (var goneId in _last.Keys.Where(id => !liveNodeIds.Contains(id)).ToList())
        {
            _last.Remove(goneId);
        }

        foreach (var r in rows)
        {
            var snapshot = new NodeStateSnapshot(
                r.s.NodeId, r.ClusterId, r.s.CurrentGenerationId,
                r.s.LastAppliedStatus?.ToString(), r.s.LastAppliedError,
                r.s.LastAppliedAt, r.s.LastSeenAt);
            var hadPrior = _last.TryGetValue(r.s.NodeId, out var prior);
            if (hadPrior && prior == snapshot)
            {
                continue; // unchanged since the last successful publish
            }

            var msg = new NodeStateChangedMessage(
                snapshot.NodeId, snapshot.ClusterId, snapshot.GenerationId,
                snapshot.Status, snapshot.Error, snapshot.AppliedAt, snapshot.SeenAt);
            await fleetHub.Clients.Group(FleetStatusHub.GroupName(snapshot.ClusterId))
                .SendAsync("NodeStateChanged", msg, ct);
            await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
                .SendAsync("NodeStateChanged", msg, ct);

            // Alert only on entry into Failed (including a node first seen already Failed),
            // not on every tick while it stays Failed.
            if (snapshot.Status == "Failed" && (!hadPrior || prior.Status != "Failed"))
            {
                var alert = new AlertMessage(
                    AlertId: $"{snapshot.NodeId}:apply-failed",
                    Severity: "error",
                    Title: $"Apply failed on {snapshot.NodeId}",
                    Detail: snapshot.Error ?? "(no detail)",
                    RaisedAtUtc: DateTime.UtcNow,
                    ClusterId: snapshot.ClusterId,
                    NodeId: snapshot.NodeId);
                await alertHub.Clients.Group(AlertHub.AllAlertsGroup)
                    .SendAsync("AlertRaised", alert, ct);
            }

            // Commit only after every send has succeeded. If a hub call throws, the tick aborts
            // and this delta (plus any Failed alert) is re-sent next tick instead of being lost.
            _last[r.s.NodeId] = snapshot;
        }
    }

    /// <summary>Exposed for tests — forces a snapshot reset so stub data re-seeds.</summary>
    internal void ResetCache()
    {
        _last.Clear();
        _lastRole.Clear();
    }

    /// <summary>
    /// Detects <c>RedundancyRole</c> changes since the previous poll. For each change it records
    /// the transition in <see cref="RedundancyMetrics"/> and broadcasts <c>RoleChanged</c> to the
    /// node's cluster group and the fleet-wide group. The first sighting of a node only seeds
    /// the cache.
    /// </summary>
    /// <param name="nodes">All cluster nodes loaded for this tick.</param>
    /// <param name="ct">Cancels the hub sends.</param>
    private async Task PollRolesAsync(IReadOnlyList nodes, CancellationToken ct)
    {
        // Drop roles of nodes that no longer exist. This bounds the cache, and a re-created node
        // bootstraps fresh instead of reporting a transition from its previous incarnation.
        var liveNodeIds = nodes.Select(n => n.NodeId).ToHashSet(StringComparer.Ordinal);
        foreach (var goneId in _lastRole.Keys.Where(id => !liveNodeIds.Contains(id)).ToList())
        {
            _lastRole.Remove(goneId);
        }

        foreach (var n in nodes)
        {
            var hadPrior = _lastRole.TryGetValue(n.NodeId, out var priorRole);
            if (hadPrior && priorRole == n.RedundancyRole)
            {
                continue;
            }

            // Committed before publishing so a failed send cannot cause the metric transition
            // to be recorded twice on the next tick.
            _lastRole[n.NodeId] = n.RedundancyRole;
            if (!hadPrior)
            {
                continue; // first-observation bootstrap — not a transition
            }

            redundancyMetrics.RecordRoleTransition(
                clusterId: n.ClusterId, nodeId: n.NodeId,
                fromRole: priorRole.ToString(), toRole: n.RedundancyRole.ToString());
            var msg = new RoleChangedMessage(
                ClusterId: n.ClusterId, NodeId: n.NodeId,
                FromRole: priorRole.ToString(), ToRole: n.RedundancyRole.ToString(),
                ObservedAtUtc: DateTime.UtcNow);
            await fleetHub.Clients.Group(FleetStatusHub.GroupName(n.ClusterId))
                .SendAsync("RoleChanged", msg, ct);
            await fleetHub.Clients.Group(FleetStatusHub.FleetGroup)
                .SendAsync("RoleChanged", msg, ct);
        }
    }

    /// <summary>
    /// Reports primary, secondary and stale node counts to <see cref="RedundancyMetrics"/> for
    /// every cluster present in <paramref name="nodes"/>. A node is stale when it has never been
    /// seen, or was last seen earlier than now minus <c>ClusterNodeService.StaleThreshold</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): a cluster that has lost all its nodes is not reported here, so its gauges
    /// presumably keep their last values — confirm how RedundancyMetrics handles removal.
    /// </remarks>
    private void UpdateClusterGauges(IReadOnlyList nodes)
    {
        var staleCutoff = DateTime.UtcNow - Services.ClusterNodeService.StaleThreshold;
        foreach (var group in nodes.GroupBy(n => n.ClusterId))
        {
            var primary = group.Count(n => n.RedundancyRole == RedundancyRole.Primary);
            var secondary = group.Count(n => n.RedundancyRole == RedundancyRole.Secondary);
            var stale = group.Count(n => n.LastSeenAt is null || n.LastSeenAt.Value < staleCutoff);
            redundancyMetrics.SetClusterCounts(group.Key, primary, secondary, stale);
        }
    }

    /// <summary>
    /// Value-equality snapshot of one node's generation state, compared tick-to-tick to decide
    /// whether a <c>NodeStateChanged</c> message is needed.
    /// </summary>
    private readonly record struct NodeStateSnapshot(
        string NodeId, string ClusterId, long? GenerationId,
        string? Status, string? Error, DateTime? AppliedAt, DateTime? SeenAt);
}
/// <summary>
/// SignalR payload broadcast as <c>RoleChanged</c> by <see cref="FleetStatusPoller"/> when it
/// observes a node's <c>RedundancyRole</c> change between polls. The Admin RedundancyTab
/// consumes it to reload immediately rather than waiting for its next on-parameter-set poll.
/// </summary>
/// <param name="ClusterId">Cluster that owns the node.</param>
/// <param name="NodeId">Node whose role changed.</param>
/// <param name="FromRole">Previously observed role, as the role enum's string name.</param>
/// <param name="ToRole">Newly observed role, as the role enum's string name.</param>
/// <param name="ObservedAtUtc">UTC time at which the poller noticed the change.</param>
public sealed record RoleChangedMessage(
    string ClusterId, string NodeId,
    string FromRole, string ToRole,
    DateTime ObservedAtUtc);