using Microsoft.AspNetCore.SignalR; using Microsoft.EntityFrameworkCore; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Configuration.Entities; namespace ZB.MOM.WW.OtOpcUa.Admin.Hubs; /// /// Polls ClusterNodeGenerationState every and publishes /// per-node deltas to . Also raises sticky /// s on transitions into Failed. /// public sealed class FleetStatusPoller( IServiceScopeFactory scopeFactory, IHubContext fleetHub, IHubContext alertHub, ILogger logger) : BackgroundService { public TimeSpan PollInterval { get; init; } = TimeSpan.FromSeconds(5); private readonly Dictionary _last = new(); protected override async Task ExecuteAsync(CancellationToken stoppingToken) { logger.LogInformation("FleetStatusPoller starting — interval {Interval}s", PollInterval.TotalSeconds); while (!stoppingToken.IsCancellationRequested) { try { await PollOnceAsync(stoppingToken); } catch (Exception ex) when (ex is not OperationCanceledException) { logger.LogWarning(ex, "FleetStatusPoller tick failed"); } try { await Task.Delay(PollInterval, stoppingToken); } catch (OperationCanceledException) { break; } } } internal async Task PollOnceAsync(CancellationToken ct) { using var scope = scopeFactory.CreateScope(); var db = scope.ServiceProvider.GetRequiredService(); var rows = await db.ClusterNodeGenerationStates.AsNoTracking() .Join(db.ClusterNodes.AsNoTracking(), s => s.NodeId, n => n.NodeId, (s, n) => new { s, n.ClusterId }) .ToListAsync(ct); foreach (var r in rows) { var snapshot = new NodeStateSnapshot( r.s.NodeId, r.ClusterId, r.s.CurrentGenerationId, r.s.LastAppliedStatus?.ToString(), r.s.LastAppliedError, r.s.LastAppliedAt, r.s.LastSeenAt); var hadPrior = _last.TryGetValue(r.s.NodeId, out var prior); if (!hadPrior || prior != snapshot) { _last[r.s.NodeId] = snapshot; var msg = new NodeStateChangedMessage( snapshot.NodeId, snapshot.ClusterId, snapshot.GenerationId, snapshot.Status, snapshot.Error, snapshot.AppliedAt, snapshot.SeenAt); await fleetHub.Clients.Group(FleetStatusHub.GroupName(snapshot.ClusterId)) .SendAsync("NodeStateChanged", msg, ct); await fleetHub.Clients.Group(FleetStatusHub.FleetGroup) .SendAsync("NodeStateChanged", msg, ct); if (snapshot.Status == "Failed" && (!hadPrior || prior.Status != "Failed")) { var alert = new AlertMessage( AlertId: $"{snapshot.NodeId}:apply-failed", Severity: "error", Title: $"Apply failed on {snapshot.NodeId}", Detail: snapshot.Error ?? "(no detail)", RaisedAtUtc: DateTime.UtcNow, ClusterId: snapshot.ClusterId, NodeId: snapshot.NodeId); await alertHub.Clients.Group(AlertHub.AllAlertsGroup) .SendAsync("AlertRaised", alert, ct); } } } } /// Exposed for tests — forces a snapshot reset so stub data re-seeds. internal void ResetCache() => _last.Clear(); private readonly record struct NodeStateSnapshot( string NodeId, string ClusterId, long? GenerationId, string? Status, string? Error, DateTime? AppliedAt, DateTime? SeenAt); }