using Mbproxy.Options;
using Mbproxy.Proxy;
using Microsoft.Extensions.Options;

namespace Mbproxy.Admin;

/// <summary>
/// Background loop that drives the admin dashboard's live feed. Every
/// <c>AdminPushIntervalMs</c> milliseconds it builds a status snapshot and
/// pushes it through an <see cref="IStatusPushSink"/>:
/// <list type="bullet">
///   <item>the fleet snapshot to every fleet-dashboard subscriber;</item>
///   <item>a per-PLC detail payload (status row + tag-value capture) to each PLC that
///   currently has a detail-page subscriber — PLCs with no viewer are skipped.</item>
/// </list>
/// </summary>
/// <remarks>
/// The owner calls <see cref="Start"/> once the Kestrel app is up and
/// <see cref="StopAsync"/> before it stops. <see cref="StopAsync"/> disarms every
/// tag-value capture, so an AdminPort hot-reload — which tears down the SignalR host and
/// all connections without firing per-connection disconnect cleanup deterministically —
/// never leaves a capture armed with no viewer.
/// </remarks>
internal sealed partial class StatusBroadcaster : IAsyncDisposable
{
    private readonly IStatusPushSink _sink;
    private readonly StatusSnapshotBuilder _builder;
    private readonly PlcSubscriptionTracker _tracker;
    private readonly TagCaptureRegistry _captureRegistry;

    // NOTE(review): the generic type argument of IOptionsMonitor is not visible in the
    // reviewed text — confirm the options type (it must expose AdminPushIntervalMs).
    private readonly IOptionsMonitor _options;
    private readonly ILogger _logger;

    private readonly CancellationTokenSource _cts = new();
    private Task _loop = Task.CompletedTask;

    // Guards StopAsync against a double-stop (DisposeAsync also calls StopAsync, and the
    // owner may call StopAsync explicitly first) — symmetry with AdminEndpointHost's
    // _disposed flag, and defends a future caller from touching the disposed CTS.
    private bool _stopped;

    /// <summary>Creates a broadcaster over the supplied collaborators; no work starts until <see cref="Start"/>.</summary>
    /// <param name="sink">Destination for the fleet and per-PLC pushes.</param>
    /// <param name="builder">Builds the fleet snapshot and the per-PLC debug payload.</param>
    /// <param name="tracker">Reports which PLCs currently have a detail-page viewer.</param>
    /// <param name="captureRegistry">Tag-value captures that are armed/disarmed to match the viewer set.</param>
    /// <param name="options">Live options; <c>AdminPushIntervalMs</c> is re-read every cycle.</param>
    /// <param name="logger">Logger for the <c>mbproxy.admin.broadcast.*</c> events.</param>
    public StatusBroadcaster(
        IStatusPushSink sink,
        StatusSnapshotBuilder builder,
        PlcSubscriptionTracker tracker,
        TagCaptureRegistry captureRegistry,
        IOptionsMonitor options,
        ILogger logger)
    {
        _sink = sink;
        _builder = builder;
        _tracker = tracker;
        _captureRegistry = captureRegistry;
        _options = options;
        _logger = logger;
    }

    /// <summary>
    /// Starts the push loop via <see cref="Task.Run(Func{Task})"/>. Not idempotent:
    /// it is meant to be called exactly once — a second call replaces the tracked loop task.
    /// </summary>
    public void Start() => _loop = Task.Run(() => LoopAsync(_cts.Token));

    /// <summary>
    /// Stops the push loop and disarms every tag-value capture. Safe to call more than
    /// once; calls after the first return immediately.
    /// </summary>
    public async Task StopAsync()
    {
        if (_stopped)
        {
            return;
        }

        _stopped = true;

        if (!_cts.IsCancellationRequested)
        {
            await _cts.CancelAsync().ConfigureAwait(false);
        }

        try
        {
            await _loop.ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Expected on cancellation.
        }
        finally
        {
            // Disarm in finally so that no failure while awaiting the loop can leave a
            // capture armed after the broadcaster has stopped.
            _captureRegistry.DisarmAll();
        }
    }

    /// <summary>One push cycle. Exposed internally so unit tests can drive it deterministically.</summary>
    /// <param name="ct">Cancels in-flight pushes; cancellation propagates as <see cref="OperationCanceledException"/>.</param>
    internal async Task PushOnceAsync(CancellationToken ct)
    {
        StatusResponse snapshot;
        try
        {
            snapshot = _builder.Build();
        }
        catch (Exception ex)
        {
            // Without a snapshot there is nothing to push; skip the whole cycle.
            LogSnapshotFailed(_logger, ex);
            return;
        }

        try
        {
            await _sink.PushFleetAsync(snapshot, ct).ConfigureAwait(false);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // A failed fleet push must not prevent the per-PLC detail pushes below.
            LogFleetPushFailed(_logger, ex);
        }

        // Reconcile capture arm state from the live viewer set. This is the single
        // arm/disarm authority — doing it here (one thread, every cycle) means a SignalR
        // reconnect or a hot-reload capture rebuild can never strand a capture armed.
        var activePlcs = _tracker.ActivePlcs();
        _captureRegistry.ReconcileArmed(activePlcs);

        if (activePlcs.Count == 0)
        {
            return;
        }

        // Index the snapshot's PLC rows once per cycle — a per-active-PLC FirstOrDefault
        // would be O(active × fleet). DistinctBy keeps the first row for a name, so a
        // duplicated name cannot make ToDictionary throw and take the whole loop down.
        var plcsByName = snapshot.Plcs
            .DistinctBy(p => p.Name, StringComparer.Ordinal)
            .ToDictionary(p => p.Name, StringComparer.Ordinal);

        foreach (var plcName in activePlcs)
        {
            try
            {
                // An active PLC with no row in this snapshot yields the default value here.
                var plc = plcsByName.GetValueOrDefault(plcName);
                var debug = _builder.BuildDebug(plcName);
                var detail = new PlcDetailResponse(plc, debug);
                await _sink.PushPlcAsync(plcName, detail, ct).ConfigureAwait(false);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Isolate failures per PLC so one bad detail push does not starve the rest.
                LogDetailPushFailed(_logger, plcName, ex);
            }
        }
    }

    /// <summary>Runs push cycles until <paramref name="ct"/> is cancelled; never throws.</summary>
    private async Task LoopAsync(CancellationToken ct)
    {
        try
        {
            while (!ct.IsCancellationRequested)
            {
                // Push first, delay second — so a dashboard that connects right after the
                // loop starts gets a snapshot immediately instead of waiting one interval.
                await PushOnceAsync(ct).ConfigureAwait(false);

                // Re-read the interval each cycle so an AdminPushIntervalMs hot-reload
                // takes effect without restarting the loop. Floored at 100 ms to avoid a
                // pathologically tight loop if a bad value slips past validation.
                int interval = Math.Max(100, _options.CurrentValue.AdminPushIntervalMs);
                await Task.Delay(interval, ct).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            LogLoopTerminated(_logger, ex);
        }
    }

    /// <summary>Stops the loop (if still running) and releases the cancellation source.</summary>
    public async ValueTask DisposeAsync()
    {
        await StopAsync().ConfigureAwait(false);
        _cts.Dispose();
    }

    // ── Logging ──────────────────────────────────────────────────────────────
    // Stable event names in the mbproxy.admin.broadcast.* family — see
    // docs/Reference/LogEvents.md. EventIds continue the admin block (70/71 in
    // AdminEndpointHost).

    [LoggerMessage(
        EventId = 72,
        EventName = "mbproxy.admin.broadcast.snapshot.failed",
        Level = LogLevel.Error,
        Message = "Status broadcaster failed to build a status snapshot — this push cycle is skipped")]
    private static partial void LogSnapshotFailed(ILogger logger, Exception ex);

    [LoggerMessage(
        EventId = 73,
        EventName = "mbproxy.admin.broadcast.fleet.failed",
        Level = LogLevel.Error,
        Message = "Status broadcaster failed to push the fleet snapshot to dashboard subscribers")]
    private static partial void LogFleetPushFailed(ILogger logger, Exception ex);

    [LoggerMessage(
        EventId = 74,
        EventName = "mbproxy.admin.broadcast.detail.failed",
        Level = LogLevel.Error,
        Message = "Status broadcaster failed to push the detail snapshot for PLC {Plc}")]
    private static partial void LogDetailPushFailed(ILogger logger, string plc, Exception ex);

    [LoggerMessage(
        EventId = 75,
        EventName = "mbproxy.admin.broadcast.loop.terminated",
        Level = LogLevel.Error,
        Message = "Status broadcaster push loop terminated unexpectedly — the live dashboard feed has stopped")]
    private static partial void LogLoopTerminated(ILogger logger, Exception ex);
}