using System.Collections.Concurrent;
using System.Diagnostics;
using Mbproxy.Admin;
using Mbproxy.Bcd;
using Mbproxy.Configuration;
using Mbproxy.Options;
using Mbproxy.Proxy.Cache;
using Mbproxy.Proxy.Multiplexing;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Options;
namespace Mbproxy.Proxy;
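/// <summary>
/// <see cref="BackgroundService"/> that owns all <see cref="PlcListenerSupervisor"/> instances.
/// </summary>
/// <remarks>
/// <para>
/// Startup posture (matches docs/Architecture/Overview.md "eager, continue on per-port failure"):
/// </para>
/// <list type="number">
/// <item><description>Enumerate the configured PLCs and build one supervisor per PLC.</description></item>
/// <item><description>Start all supervisors in parallel. Each supervisor attempts to bind immediately
/// and enters the Polly recovery loop if the bind fails.</description></item>
/// <item><description>After all supervisors have completed their first bind attempt (reached
/// <see cref="SupervisorState.Bound"/> or <c>Recovering</c>),
/// log <c>mbproxy.startup.ready</c> with bound/configured counts.</description></item>
/// </list>
/// <para>
/// Passes the supervisor dictionary to <see cref="ConfigReconciler"/> (<c>Attach</c>) once the
/// initial supervisors have been built so hot-reload changes are applied by the reconciler.
/// </para>
/// <para>
/// Stop: stops all supervisors in parallel, bounded by the configured
/// <c>Connection.GracefulShutdownTimeoutMs</c> deadline.
/// </para>
/// </remarks>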
internal sealed partial class ProxyWorker : BackgroundService
{
// Live application options. CurrentValue is re-read at each use (startup, the accessor
// lambdas, shutdown) so hot-reloaded values are observed.
private readonly IOptionsMonitor _options;
// PDU pipeline instance handed unchanged to every supervisor built by this worker.
private readonly IPduPipeline _pipeline;
// Logger for this worker's own startup/shutdown events.
private readonly ILogger _logger;
// Factory for the per-PLC and per-component loggers passed to supervisors and Polly pipelines.
private readonly ILoggerFactory _loggerFactory;
// Hot-reload reconciler; receives the supervisor dictionary via Attach() in ExecuteAsync.
private readonly ConfigReconciler _reconciler;
// Admin endpoint is not registered as IHostedService; ProxyWorker drives its
// lifecycle directly so the design's "drain THEN stop admin" ordering is honoured.
//
// Resolved LAZILY (in ExecuteAsync) rather than in the constructor because the DI
// graph is circular: AdminEndpointHost → StatusSnapshotBuilder → ProxyWorker. A
// constructor GetService() during ProxyWorker's own construction
// returns null silently. Lazy resolution sidesteps the cycle — by the time
// ExecuteAsync runs the DI container is fully built.
private readonly IServiceProvider _services;
// Null until ExecuteAsync resolves it; stays null when no admin endpoint is registered.
private AdminEndpointHost? _admin;
// Per-PLC tag-value captures for the connection-detail debug view. Populated as
// each PerPlcContext is built; the admin SignalR layer arms/disarms entries.
private readonly TagCaptureRegistry _captureRegistry;
// Supervisors are managed jointly by ProxyWorker (initial bootstrap) and
// ConfigReconciler (subsequent hot-reload changes). The dictionary is shared via
// ConfigReconciler.Attach() once the initial supervisors have been built.
//
// ConcurrentDictionary because ConfigReconciler mutates this from parallel
// Task.WhenAll continuations (Add/Remove/Restart paths). The outer Apply is
// serialised by a semaphore but the inner per-PLC tasks run concurrently.
// Status-page reads via IReadOnlyDictionary still work without locking.
// Keys are PLC names, compared ordinally.
private readonly ConcurrentDictionary _supervisors =
new(StringComparer.Ordinal);
/// <summary>
/// Read-only view of the live supervisor dictionary, used to enumerate per-PLC state.
/// The caller should read this on the status-page path only (not the hot path).
/// </summary>
internal IReadOnlyDictionary Supervisors => _supervisors;
/// <summary>
/// Captures the injected collaborators. No other work happens here; in particular the
/// admin endpoint is deliberately NOT resolved from <paramref name="services"/> at
/// construction time — that lookup is deferred to ExecuteAsync.
/// </summary>
public ProxyWorker(
    IOptionsMonitor options,
    IPduPipeline pipeline,
    ILogger logger,
    ILoggerFactory loggerFactory,
    ConfigReconciler reconciler,
    TagCaptureRegistry captureRegistry,
    IServiceProvider services)
{
    // Held only so the admin endpoint can be looked up later, once the container is built.
    _services = services;

    // Collaborators used while building and supervising per-PLC listeners.
    _captureRegistry = captureRegistry;
    _reconciler = reconciler;
    _pipeline = pipeline;
    _options = options;

    // Logging.
    _loggerFactory = loggerFactory;
    _logger = logger;
}
/// <summary>
/// Service main loop. Builds one supervisor per PLC whose BCD tag map resolves without
/// errors, attaches the supervisor dictionary to the reconciler, starts every supervisor in
/// parallel, waits (up to 30 s) for each to finish its first bind attempt, logs
/// <c>mbproxy.startup.ready</c>, starts the admin endpoint when one is registered, and then
/// idles until the host signals stop.
/// </summary>
/// <param name="stoppingToken">Cancelled by the host when the service should stop.</param>
/// <returns>
/// A task that ends by cancellation once <paramref name="stoppingToken"/> fires. If the
/// token fires while startup is still in progress the method exits immediately instead of
/// reporting readiness or starting the admin endpoint.
/// </returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var opts = _options.CurrentValue;
    int plcsConfigured = opts.Plcs.Count;

    // ── 1. Build per-PLC BCD tag maps ────────────────────────────────────────────
    var plcContexts = new Dictionary(opts.Plcs.Count, StringComparer.Ordinal);
    foreach (var plc in opts.Plcs)
    {
        var result = BcdTagMapBuilder.Build(opts.BcdTags, plc.BcdTags, plc.DefaultCacheTtlMs);

        foreach (var warn in result.Warnings)
        {
            _logger.LogWarning("[{Plc}] BCD tag map warning: {Message}", plc.Name, warn.Message);
        }

        if (result.Errors.Count > 0)
        {
            foreach (var err in result.Errors)
            {
                _logger.LogError("[{Plc}] BCD tag map error ({Kind}): {Message}",
                    plc.Name, err.Kind, err.Message);
            }

            // A PLC without a context is skipped again in step 3 (no supervisor is built).
            _logger.LogError("Skipping listener for PLC '{Plc}' due to BCD tag map errors.", plc.Name);
            continue;
        }

        // Construct a per-PLC response cache only when at least one resolved tag
        // opts in (CacheTtlMs > 0). Skipping cache construction for a PLC with no
        // cacheable tags keeps the no-cache path free of the eviction timer and the
        // per-call resolution cost, preserving the "no caching" default behaviour
        // when no operator has opted any tag in.
        var cache = HasAnyCacheableTag(result.Map)
            ? new ResponseCache(opts.Cache.MaxEntriesPerPlc, opts.Cache.EvictionIntervalMs)
            : null;

        plcContexts[plc.Name] = new PerPlcContext
        {
            PlcName = plc.Name,
            TagMap = result.Map,
            Counters = new ProxyCounters(),
            Logger = _loggerFactory.CreateLogger($"Mbproxy.Proxy.BcdRewriter.{plc.Name}"),
            Cache = cache,
            Capture = _captureRegistry.GetOrCreate(plc.Name, result.Map),
        };
    }

    // ── 2. Build shared Polly pipeline and live option accessors once ────────────
    // The backend-connect pipeline is built from ResilienceOptions and reused across
    // all PLCs.
    var resilienceOpts = opts.Resilience;
    var backendPipeline = PolicyFactory.BuildBackendConnect(
        resilienceOpts.BackendConnect,
        _loggerFactory.CreateLogger("Mbproxy.Proxy.BackendConnect"));

    // Live accessor for ReadCoalescingOptions so a hot-reload of
    // `Mbproxy.Resilience.ReadCoalescing.Enabled` propagates to the multiplexer's
    // per-PDU coalescing decision. The delegate only reads _options.CurrentValue, so
    // one instance is shared by every supervisor and by the reconciler.
    Func coalescingAccessor =
        () => _options.CurrentValue.Resilience.ReadCoalescing;

    // Live accessor for KeepaliveOptions so a hot-reload of `Connection.Keepalive`
    // propagates to the backend heartbeat loop and to upstream-socket keepalive.
    Func keepaliveAccessor =
        () => _options.CurrentValue.Connection.Keepalive;

    // ── 3. Build supervisors ──────────────────────────────────────────────────────
    foreach (var plc in opts.Plcs)
    {
        if (!plcContexts.TryGetValue(plc.Name, out var perPlcContext))
        {
            continue; // BCD map failed — skip this PLC.
        }

        // Each supervisor gets its own recovery pipeline (with its own logger scope).
        var recoveryPipeline = PolicyFactory.BuildListenerRecovery(
            resilienceOpts.ListenerRecovery,
            _loggerFactory.CreateLogger($"Mbproxy.Proxy.ListenerRecovery.{plc.Name}"));

        var supervisor = new PlcListenerSupervisor(
            plc,
            opts.Connection,
            _pipeline,
            _loggerFactory.CreateLogger(),
            _loggerFactory.CreateLogger(),
            _loggerFactory.CreateLogger($"Mbproxy.Proxy.UpstreamPipe.{plc.Name}"),
            perPlcContext,
            recoveryPipeline,
            _loggerFactory.CreateLogger(),
            backendPipeline,
            coalescingAccessor,
            keepaliveAccessor);

        _supervisors[plc.Name] = supervisor;
    }

    // ── Wire reconciler BEFORE starting supervisors ──────────────────────────
    // Attach hands the reconciler the authoritative supervisor dictionary and the
    // initial options snapshot. The reconciler won't process OnChange events until
    // after this call — the brief window between Attach and first supervisor start
    // is safe because the channel signal only enqueues; apply runs asynchronously.
    // The live accessors are passed along so reconciler-built supervisors
    // (add/restart paths) honour hot-reloaded ReadCoalescing / Keepalive values.
    _reconciler.Attach(_supervisors, opts, coalescingAccessor, keepaliveAccessor);

    if (_supervisors.Count == 0)
    {
        // NOTE(review): the admin endpoint is never started on this path, even if the
        // reconciler later adds supervisors via hot-reload — confirm this is intended.
        LogStartupReady(_logger, 0, plcsConfigured);
        await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false);
        return;
    }

    // ── 4. Start all supervisors in parallel ──────────────────────────────────────
    var startTasks = _supervisors.Values
        .Select(s => s.StartAsync(stoppingToken))
        .ToArray();
    await Task.WhenAll(startTasks).ConfigureAwait(false);

    // ── 5. Wait for every supervisor to complete its first bind attempt ───────────
    // "Ready" = every supervisor has transitioned out of Stopped (i.e. reached
    // Bound or Recovering from its first attempt).
    using var readyCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
    using var readyLinked = CancellationTokenSource.CreateLinkedTokenSource(
        readyCts.Token, stoppingToken);
    var waitTasks = _supervisors.Values
        .Select(s => s.WaitForInitialBindAttemptAsync(readyLinked.Token))
        .ToArray();
    try
    {
        await Task.WhenAll(waitTasks).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // Either the 30 s deadline fired or the service is stopping; the two cases
        // are told apart immediately below.
    }

    // If the host asked us to stop while we were still starting, leave now. Carrying
    // on would log a misleading "ready" event and try to start the admin endpoint
    // with an already-cancelled token. Exiting by cancellation here is the same way
    // the final Task.Delay below ends this method on a normal stop.
    stoppingToken.ThrowIfCancellationRequested();

    int boundCount = _supervisors.Values.Count(s => s.Snapshot().State == SupervisorState.Bound);
    LogStartupReady(_logger, boundCount, plcsConfigured);

    // Start the admin endpoint AFTER listeners are bound so the status page can
    // never observe the service in a "no PLCs configured yet" state. The admin
    // endpoint is not registered as IHostedService (the host's reverse stop order
    // would tear it down BEFORE drain) — ProxyWorker drives both ends.
    //
    // Resolution happens here, not in the constructor — the DI graph is circular
    // (admin → StatusSnapshotBuilder → ProxyWorker) and a constructor-time lookup
    // returns null silently.
    _admin = _services.GetService();
    if (_admin is null)
    {
        // Surface the absence. The lazy lookup returns null silently if
        // AddMbproxyAdmin() is missing from Program.cs; a single warning makes a
        // botched composition observable without blocking startup.
        _logger.LogWarning(
            "Admin endpoint not registered (AddMbproxyAdmin() missing from composition). " +
            "Status page will be unavailable; service continues without it.");
    }
    else
    {
        try
        {
            await _admin.StartAsync(stoppingToken).ConfigureAwait(false);
        }
        catch (Exception ex)
            when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
        {
            // A genuine start failure is logged and tolerated: the proxy keeps running
            // without a status page. Cancellation caused by host shutdown is NOT a
            // failure and is allowed to propagate instead of being logged as an error.
            _logger.LogError(ex, "Admin endpoint failed to start: {Message}", ex.Message);
        }
    }

    // ── 6. Keep the worker alive until the host signals stop ─────────────────────
    // Supervisors run their own background loops; ExecuteAsync just waits.
    await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false);
}
/// <summary>
/// Graceful shutdown sequence:
/// <list type="number">
/// <item><description>Snapshot per-PLC in-flight counts BEFORE anything is cancelled —
/// this is the only honest reading of "how many requests were in flight when
/// we decided to stop." Once supervisors stop, their multiplexers are torn
/// down and the per-mux counter providers are nulled, so any later read
/// returns 0 regardless of what was actually dropped.</description></item>
/// <item><description>Cancel ExecuteAsync via <c>base.StopAsync</c>.</description></item>
/// <item><description>Stop all supervisors with the configured graceful timeout. Supervisor
/// stop is the actual drain — it cancels the listener, which exits its
/// accept loop, which disposes the multiplexer, which cascades all attached
/// pipes. There is no separate "drain in-flight" phase because there is
/// nothing to drain that wouldn't be killed by the supervisor stop itself.</description></item>
/// <item><description>Stop the admin endpoint LAST so the status page survives the supervisor
/// stop phase and operators can observe the live state right up to shutdown.</description></item>
/// <item><description>Dispose every supervisor to release sockets, channels, and watchdog timers.
/// A failing dispose is logged and does not prevent the remaining supervisors from
/// being disposed.</description></item>
/// </list>
/// Logs <c>mbproxy.shutdown.complete</c> with <c>InFlightAtCancel</c> equal to the
/// snapshot count from step 1 (= the number of in-flight requests dropped by the
/// stop) and <c>ElapsedMs</c> for the whole sequence.
/// </summary>
/// <param name="cancellationToken">Host-supplied token that bounds the shutdown.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
    // Timed from the very start so ElapsedMs covers the whole sequence, including the
    // time base.StopAsync spends waiting for ExecuteAsync to unwind.
    var sw = Stopwatch.StartNew();

    // Snapshot in-flight BEFORE base.StopAsync so the field matches its name: "the
    // count at the moment the host signalled stop", not "the count at the moment we
    // got around to computing it." `base.StopAsync` cancels the ExecuteAsync
    // stoppingToken; in the milliseconds before it returns, in-flight requests
    // whose responses arrive will be removed from _correlation and the watchdog can
    // clear stale entries — the count would otherwise drift downward.
    //
    // Must run BEFORE supervisor stop too: after supervisor.StopAsync, multiplexers
    // are disposed and CountInFlight returns 0 unconditionally.
    int inFlightAtCancel = CountInFlight();

    // Cancel ExecuteAsync first.
    await base.StopAsync(cancellationToken).ConfigureAwait(false);

    // Supervisor stop deadline read from the live config so a hot-reloaded
    // GracefulShutdownTimeoutMs is honoured. Supervisor stop is the drain:
    // cancelling the supervisor cancels the listener, which exits accept, which
    // disposes the multiplexer, which cascades all attached pipes.
    int gracefulMs = _options.CurrentValue.Connection.GracefulShutdownTimeoutMs;

    // ── 1. Stop accepting new connections + drain (one combined phase) ────────────
    using var stopCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(gracefulMs));
    using var linked = CancellationTokenSource.CreateLinkedTokenSource(
        stopCts.Token, cancellationToken);
    var stopTasks = _supervisors.Values
        .Select(s => s.StopAsync(linked.Token))
        .ToArray();
    try
    {
        await Task.WhenAll(stopTasks).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Best effort — don't let individual supervisor failures (or the deadline
        // firing) block shutdown, but leave a trace of what went wrong.
        _logger.LogWarning(
            ex,
            "One or more supervisors did not stop cleanly within {GracefulMs} ms; continuing shutdown.",
            gracefulMs);
    }

    // ── 2. Stop admin endpoint LAST ───────────────────────────────────────────────
    if (_admin is not null)
    {
        try
        {
            using var adminCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
            await _admin.StopAsync(adminCts.Token).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Best-effort: the admin endpoint must never block process shutdown.
            _logger.LogWarning(ex, "Admin endpoint did not stop cleanly; continuing shutdown.");
        }
    }

    // ── 3. Dispose supervisors (releases sockets, channels, watchdog timers) ─────
    // ToArray() takes a snapshot of the dictionary. Each dispose is guarded on its own
    // so one failing supervisor cannot leak the resources of the others or suppress
    // the shutdown-complete event.
    foreach (var (plcName, supervisor) in _supervisors.ToArray())
    {
        try
        {
            await supervisor.DisposeAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Disposing supervisor for PLC '{Plc}' failed; continuing shutdown.", plcName);
        }
    }

    _supervisors.Clear();
    LogShutdownComplete(_logger, inFlightAtCancel, sw.ElapsedMilliseconds);
}
/// <summary>
/// Adds up the in-flight count from each supervisor's current counter snapshot.
/// </summary>
/// <returns>The summed in-flight count across all entries in the supervisor dictionary.</returns>
private int CountInFlight() =>
    _supervisors.Values.Aggregate(
        0,
        (runningTotal, sup) => runningTotal + (int)sup.CurrentCounters.Snapshot().InFlightCount);
// ── Helpers and logging ───────────────────────────────────────────────────────────────
/// <summary>
/// Reports whether the resolved tag map contains at least one tag whose
/// <c>CacheTtlMs</c> is greater than zero. ExecuteAsync uses the answer to decide
/// whether a PLC gets a <c>ResponseCache</c> at all; a PLC with no cacheable tags
/// gets none, so the default-OFF deployment runs the no-cache code path.
/// </summary>
/// <param name="map">Resolved BCD tag map for one PLC.</param>
/// <returns><c>true</c> if any tag has a positive <c>CacheTtlMs</c>; otherwise <c>false</c>.</returns>
private static bool HasAnyCacheableTag(BcdTagMap map)
{
    bool anyCacheable = false;

    foreach (var tag in map.All)
    {
        if (tag.CacheTtlMs > 0)
        {
            // One opt-in is enough; no need to look at the remaining tags.
            anyCacheable = true;
            break;
        }
    }

    return anyCacheable;
}
// Source-generated log methods: the bodies are supplied by the LoggerMessage generator,
// which is why these are declared as `static partial` without an implementation.

/// <summary>
/// Emitted by ExecuteAsync once startup has finished, with the number of supervisors in
/// the Bound state and the number of PLCs present in configuration.
/// </summary>
[LoggerMessage(EventId = 1, EventName = "mbproxy.startup.ready",
Level = LogLevel.Information,
Message = "mbproxy service ready — ListenersBound={ListenersBound} PlcsConfigured={PlcsConfigured}")]
private static partial void LogStartupReady(ILogger logger, int listenersBound, int plcsConfigured);
/// <summary>
/// Error event for a listener that failed to bind. Not called from the members visible
/// in this part of the class.
/// </summary>
[LoggerMessage(EventId = 21, EventName = "mbproxy.startup.bind.failed",
Level = LogLevel.Error,
Message = "Failed to bind listener: Plc={Plc} Port={Port} Reason={Reason}")]
private static partial void LogBindFailed(ILogger logger, string plc, int port, string reason);
/// <summary>
/// Emitted at the end of StopAsync with the in-flight count captured at the start of
/// shutdown and the elapsed time in milliseconds.
/// </summary>
[LoggerMessage(EventId = 80, EventName = "mbproxy.shutdown.complete",
Level = LogLevel.Information,
Message = "Graceful shutdown complete: InFlightAtCancel={InFlightAtCancel} ElapsedMs={ElapsedMs}")]
private static partial void LogShutdownComplete(ILogger logger, int inFlightAtCancel, long elapsedMs);
}