wwtools/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs
Joseph Doherty e66b17fe5f mbproxy: Wave 2 fixes from 2026-05-14 code review
Resolves the 21 Major findings catalogued in
codereviews/2026-05-14/RemediationPlan.md (Wave 2). Tests: 370 pass / 0 fail
(baseline 363 + 7 new W2 regression tests).

Multiplexer / concurrency:
  W2.1  ConfigReconciler.Attach now threads the live coalescingAccessor through
        to add/restart-built supervisors so a hot-reload of
        ReadCoalescing.{Enabled,MaxParties} propagates to PLCs added or
        restarted via reload.
  W2.2  PlcMultiplexer._disposed and UpstreamPipe._disposed are now volatile
        for ARM/portability defense.
  W2.3  ProxyWorker._supervisors / ConfigReconciler._supervisors switched from
        Dictionary to ConcurrentDictionary; reconciler uses TryRemove. The
        outer Apply is serialised by a semaphore but the inner Add/Remove/
        Restart Task.WhenAll continuations run in parallel.
  W2.4  Counter parity for cache miss + coalescing-saturation miss documented
        inline (per-design contract; behavior unchanged).
  W2.5  _disposeCts.Dispose() and _connectGate.Dispose() guarded against late
        watchdog ticks.
  W2.6  _connectGate disposed in DisposeAsync.
  W2.7  Inline doc clarifying the post-rewriter FC byte read.

Cache / hot-reload:
  W2.8  PlcListenerSupervisor.ReplaceContextAsync now calls Clear() to capture
        the entry count, emits mbproxy.cache.flushed, then disposes the old
        cache. Previously the event was defined but never emitted.
  W2.9  Inline doc explaining the implicit "skip cache invalidation while
        recovering" gating (no backend reader during recovery → no FC06/FC16
        response → no invalidation).
  W2.10 ReloadValidator now re-checks resolved per-tag CacheTtlMs against
        Cache.AllowLongTtl after BcdTagMapBuilder folds the per-PLC default.

BCD rewriter:
  W2.11 Duplicate addresses detected within Global itself and within the per-PLC
        Add list itself, BEFORE the working dictionary collapses keys. Cross-list
        collisions (Global vs Add) remain the documented width-override pattern.
        Previously the DuplicateAddress error was unreachable dead code.
  W2.12 OverlappingHighRegister reports each colliding pair exactly once
        (canonicalised low/high pair tracked in a HashSet).
  W2.13 FC16 32-bit write rejects clientLow > 9999 or clientHigh > 9999 BEFORE
        the high*10000+low reconstruction. Without this guard, (high=9999,
        low=9999) silently re-encoded as (high=9998, low=9999), losing 1 from
        the high word.
  W2.14 FC16 validates pdu.Length >= 6 + qty*2 upfront — no half-rewritten
        requests when a malformed client claims more registers than it ships.

Supervisor:
  W2.15 WaitForInitialBindAttemptAsync now backed by TaskCompletionSource
        instead of 10ms busy-poll. Resolves race against fast Stopped→Bound→
        Stopped transitions and hangs when the supervisor task throws.
  W2.16 StartAsync refuses re-entry on a non-Stopped supervisor (was leaking
        the previous _supervisorCts).
  W2.17 New TransitionTo helper writes _state, _lastBindError, and (optionally)
        _recoveryAttempts under one lock. Snapshot() reads under the same lock
        so the status page never reports an inconsistent triple. Truncate
        helper extracted (was copy-pasted across three sites).
  W2.18 MbproxyOptionsValidator + ReloadValidator reject Connection.{Backend
        ConnectTimeoutMs, BackendRequestTimeoutMs, GracefulShutdownTimeoutMs}
        <= 0. Misconfigured 0 produces immediate CancelAfter(0) failures.

Hosting / diagnostics:
  W2.20 ProxyWorker.StopAsync supervisor-stop deadline now reads from
        IOptionsMonitor.CurrentValue.Connection.GracefulShutdownTimeoutMs
        (was hard-coded 5s).
  W2.21 src/Mbproxy/appsettings.json deleted; the published file is now a Link
        to install/mbproxy.config.template.json so the binary ships with a
        usable, fully-commented example config instead of an empty stub. Tests
        strip the inherited file from their bin via an AfterTargets="Build"
        Target so they don't pick up the template's example PLCs.
  W2.22 invalidBcdWarnings (PlcPdusStatus) and codeOther (ExceptionCounts)
        added to StatusDto, plumbed through StatusSnapshotBuilder, surfaced
        in StatusHtmlRenderer table cells.
  W2.23 EventLogBridge caches EventLog.SourceExists at construction so Emit
        doesn't hit the registry on every Error+ log line.

New regression tests:
  ReloadValidatorTests:
    Validate_PerTagCacheTtl_Above60s_Without_AllowLongTtl_Fails
    Validate_PerTagCacheTtl_Above60s_With_AllowLongTtl_Passes
    Validate_ResolvedTtl_FromPerPlcDefault_AboveCap_Fails
    Validate_ZeroBackendConnectTimeoutMs_Fails
    Validate_NegativeGracefulShutdownTimeoutMs_Fails
  BcdPduPipelineTests:
    FC16_32Bit_ClientHighOrLowAbove9999_PassesThroughRaw_WithInvalidBcdWarning
    FC16_TruncatedRegisterData_PassesThroughRaw_NoPartialRewrite

Reworked tests in BcdTagMapBuilderTests for the W2.11 contract (Global dup,
Add dup, Add-overrides-Global accepted as width override).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 05:48:44 -04:00

375 lines
18 KiB
C#

using System.Collections.Concurrent;
using System.Diagnostics;
using Mbproxy.Admin;
using Mbproxy.Bcd;
using Mbproxy.Configuration;
using Mbproxy.Options;
using Mbproxy.Proxy.Cache;
using Mbproxy.Proxy.Multiplexing;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Options;
namespace Mbproxy.Proxy;
/// <summary>
/// <see cref="BackgroundService"/> that owns all <see cref="PlcListenerSupervisor"/> instances.
///
/// Startup posture (matches design doc "eager, continue on per-port failure"):
/// <list type="number">
/// <item>Enumerate <see cref="MbproxyOptions.Plcs"/> and build one supervisor per PLC.</item>
/// <item>Start all supervisors in parallel. Each supervisor attempts to bind immediately
/// and enters the Polly recovery loop if the bind fails.</item>
/// <item>After all supervisors have completed their first bind attempt (reached
/// <see cref="SupervisorState.Bound"/> or <see cref="SupervisorState.Recovering"/>),
/// log <c>mbproxy.startup.ready</c> with bound/configured counts.</item>
/// </list>
///
/// Phase 06: passes the supervisor dictionary to <see cref="ConfigReconciler.Attach"/>
/// after initial startup so hot-reload changes are applied by the reconciler.
///
/// Stop: stops all supervisors in parallel, bounded by the live
/// <see cref="ConnectionOptions.GracefulShutdownTimeoutMs"/> (see <see cref="StopAsync"/>).
/// </summary>
internal sealed partial class ProxyWorker : BackgroundService
{
private readonly IOptionsMonitor<MbproxyOptions> _options;
private readonly IPduPipeline _pipeline;
private readonly ILogger<ProxyWorker> _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly ConfigReconciler _reconciler;
// Phase 12 (W1.5) — admin endpoint is no longer IHostedService; ProxyWorker drives its
// lifecycle directly so the design's "drain THEN stop admin" ordering is honoured.
//
// Resolved LAZILY (in ExecuteAsync) rather than in the constructor because the DI graph
// is circular: AdminEndpointHost → StatusSnapshotBuilder → ProxyWorker. A constructor
// GetService<AdminEndpointHost>() during ProxyWorker's own construction returns null
// silently. Lazy resolution sidesteps the cycle — by the time ExecuteAsync runs the DI
// container is fully built.
private readonly IServiceProvider _services;
private AdminEndpointHost? _admin;
// Phase 06: supervisors are now managed jointly by ProxyWorker (initial bootstrap)
// and ConfigReconciler (subsequent hot-reload changes). The dictionary is shared
// via ConfigReconciler.Attach() after initial startup.
//
// Phase 12 (W2.3) — ConcurrentDictionary because ConfigReconciler mutates this from
// parallel Task.WhenAll continuations (Add/Remove/Restart paths). The outer Apply is
// serialised by a semaphore but the inner per-PLC tasks run concurrently. Status-page
// reads via IReadOnlyDictionary still work without locking.
private readonly ConcurrentDictionary<string, PlcListenerSupervisor> _supervisors =
new(StringComparer.Ordinal);
/// <summary>
/// Read-only view of the live supervisor dictionary. Consumed by Phase 07's
/// <see cref="Admin.StatusSnapshotBuilder"/> to enumerate per-PLC state.
/// The caller should read this on the status-page path only (not the hot path).
/// </summary>
internal IReadOnlyDictionary<string, PlcListenerSupervisor> Supervisors => _supervisors;
public ProxyWorker(
IOptionsMonitor<MbproxyOptions> options,
IPduPipeline pipeline,
ILogger<ProxyWorker> logger,
ILoggerFactory loggerFactory,
ConfigReconciler reconciler,
IServiceProvider services)
{
_options = options;
_pipeline = pipeline;
_logger = logger;
_loggerFactory = loggerFactory;
_reconciler = reconciler;
_services = services;
// Phase 12 (W1.5) — admin endpoint resolved lazily in ExecuteAsync (see field comment).
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var opts = _options.CurrentValue;
int plcsConfigured = opts.Plcs.Count;
// ── 1. Build per-PLC BCD tag maps ────────────────────────────────────────────
var plcContexts = new Dictionary<string, PerPlcContext>(opts.Plcs.Count, StringComparer.Ordinal);
foreach (var plc in opts.Plcs)
{
var result = BcdTagMapBuilder.Build(opts.BcdTags, plc.BcdTags, plc.DefaultCacheTtlMs);
foreach (var warn in result.Warnings)
_logger.LogWarning("[{Plc}] BCD tag map warning: {Message}", plc.Name, warn.Message);
if (result.Errors.Count > 0)
{
foreach (var err in result.Errors)
_logger.LogError("[{Plc}] BCD tag map error ({Kind}): {Message}",
plc.Name, err.Kind, err.Message);
_logger.LogError("Skipping listener for PLC '{Plc}' due to BCD tag map errors.", plc.Name);
continue;
}
// Phase 11 — construct a per-PLC response cache only when at least one
// resolved tag opts in (CacheTtlMs > 0). Skipping cache construction for a
// PLC with no cacheable tags keeps the no-cache path free of the eviction
// timer and the per-call resolution cost, preserving "default behaviour =
// Phase 10 unchanged" when no operator has opted any tag in.
var cache = HasAnyCacheableTag(result.Map)
? new ResponseCache(opts.Cache.MaxEntriesPerPlc, opts.Cache.EvictionIntervalMs)
: null;
plcContexts[plc.Name] = new PerPlcContext
{
PlcName = plc.Name,
TagMap = result.Map,
Counters = new ProxyCounters(),
Logger = _loggerFactory.CreateLogger($"Mbproxy.Proxy.BcdRewriter.{plc.Name}"),
Cache = cache,
};
}
// ── 2. Build the shared Polly pipeline ────────────────────────────────────────
// The backend-connect pipeline is built once from ResilienceOptions and shared by all
// PLCs. Listener-recovery pipelines are built per supervisor in step 3.
var resilienceOpts = opts.Resilience;
var backendPipeline = PolicyFactory.BuildBackendConnect(
resilienceOpts.BackendConnect,
_loggerFactory.CreateLogger("Mbproxy.Proxy.BackendConnect"));
// ── 3. Build supervisors ──────────────────────────────────────────────────────
foreach (var plc in opts.Plcs)
{
if (!plcContexts.TryGetValue(plc.Name, out var perPlcContext))
continue; // BCD map failed — skip this PLC.
// Each supervisor gets its own recovery pipeline (with its own logger scope).
var recoveryPipeline = PolicyFactory.BuildListenerRecovery(
resilienceOpts.ListenerRecovery,
_loggerFactory.CreateLogger($"Mbproxy.Proxy.ListenerRecovery.{plc.Name}"));
// Phase 10 — give the supervisor a live accessor for ReadCoalescingOptions
// so a hot-reload of `Mbproxy.Resilience.ReadCoalescing.Enabled` propagates
// to the multiplexer's per-PDU coalescing decision.
Func<ReadCoalescingOptions> coalescingAccessor =
() => _options.CurrentValue.Resilience.ReadCoalescing;
var supervisor = new PlcListenerSupervisor(
plc,
opts.Connection,
_pipeline,
_loggerFactory.CreateLogger<PlcListener>(),
_loggerFactory.CreateLogger<PlcMultiplexer>(),
_loggerFactory.CreateLogger($"Mbproxy.Proxy.UpstreamPipe.{plc.Name}"),
perPlcContext,
recoveryPipeline,
_loggerFactory.CreateLogger<PlcListenerSupervisor>(),
backendPipeline,
coalescingAccessor);
_supervisors[plc.Name] = supervisor;
}
// ── Phase 06: wire reconciler BEFORE starting supervisors ─────────────────
// Attach hands the reconciler the authoritative supervisor dictionary and the
// initial options snapshot. The reconciler won't process OnChange events until
// after this call — the brief window between Attach and first supervisor start
// is safe because the channel signal only enqueues; apply runs asynchronously.
// Phase 12 (W2.1) — also pass the live coalescing accessor so reconciler-built
// supervisors (add/restart paths) honour hot-reloaded ReadCoalescing values.
Func<ReadCoalescingOptions> reconcilerCoalescingAccessor =
() => _options.CurrentValue.Resilience.ReadCoalescing;
_reconciler.Attach(_supervisors, opts, reconcilerCoalescingAccessor);
if (_supervisors.Count == 0)
{
LogStartupReady(_logger, 0, plcsConfigured);
await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false);
return;
}
// ── 4. Start all supervisors in parallel ──────────────────────────────────────
var startTasks = _supervisors.Values
.Select(s => s.StartAsync(stoppingToken))
.ToArray();
await Task.WhenAll(startTasks).ConfigureAwait(false);
// ── 5. Wait for every supervisor to complete its first bind attempt ───────────
// "Ready" = every supervisor has transitioned out of Stopped (i.e. reached
// Bound or Recovering from its first attempt).
using var readyCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
using var readyLinked = CancellationTokenSource.CreateLinkedTokenSource(
readyCts.Token, stoppingToken);
var waitTasks = _supervisors.Values
.Select(s => s.WaitForInitialBindAttemptAsync(readyLinked.Token))
.ToArray();
try
{
await Task.WhenAll(waitTasks).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Either the 30 s deadline fired or the service is stopping.
}
int boundCount = _supervisors.Values.Count(s => s.Snapshot().State == SupervisorState.Bound);
LogStartupReady(_logger, boundCount, plcsConfigured);
// Phase 12 (W1.5): start the admin endpoint AFTER the first bind attempts (or the 30 s
// readiness deadline) so the status page never observes a "no PLCs configured yet" state.
// The admin endpoint is no longer registered as IHostedService (the host's reverse
// stop order would tear it down BEFORE drain). ProxyWorker drives both ends.
//
// Resolution happens here, not in the constructor — the DI graph is circular
// (admin → StatusSnapshotBuilder → ProxyWorker) and a constructor-time lookup
// returns null silently.
_admin = _services.GetService<AdminEndpointHost>();
if (_admin is not null)
{
try
{
await _admin.StartAsync(stoppingToken).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogError(ex, "Admin endpoint failed to start: {Message}", ex.Message);
}
}
// ── 6. Keep the worker alive until the host signals stop ─────────────────────
// Supervisors run their own background loops; ExecuteAsync just waits.
await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false);
}
/// <summary>
/// Phase 12 (W1.5) — graceful shutdown sequence (replaces the deleted
/// <c>ShutdownCoordinator</c>):
/// <list type="number">
/// <item>Cancel <see cref="ExecuteAsync"/> via <c>base.StopAsync</c>.</item>
/// <item>Stop all supervisors within a deadline of
/// <see cref="ConnectionOptions.GracefulShutdownTimeoutMs"/> (no new connections; existing
/// pipes are cascaded by <see cref="PlcListenerSupervisor"/> teardown).</item>
/// <item>Wait for in-flight PDUs to drain via the live
/// <see cref="ConnectionOptions.GracefulShutdownTimeoutMs"/> (read fresh from
/// <see cref="IOptionsMonitor{T}.CurrentValue"/> so a hot-reloaded value is
/// honoured at stop time).</item>
/// <item>Stop the admin endpoint LAST so the status page survives the drain phase
/// and an operator polling it sees the in-flight count fall to zero.</item>
/// <item>Dispose every supervisor to release sockets, channels, and watchdog timers.</item>
/// </list>
/// Logs <c>mbproxy.shutdown.complete</c> on the way out with the in-flight count at
/// drain-deadline (zero on a clean shutdown, positive when forced cancel).
/// </summary>
public override async Task StopAsync(CancellationToken cancellationToken)
{
// Cancel ExecuteAsync first.
await base.StopAsync(cancellationToken).ConfigureAwait(false);
var sw = Stopwatch.StartNew();
// Phase 12 (W2.20): supervisor stop deadline read from the live config so a
// hot-reloaded GracefulShutdownTimeoutMs is honoured. Previously hard-coded 5 s.
// The drain step below gets its own window of the same length, so the two steps
// together can take up to 2 × GracefulShutdownTimeoutMs before the admin endpoint stops.
int gracefulMs = _options.CurrentValue.Connection.GracefulShutdownTimeoutMs;
// ── 1. Stop accepting new connections ─────────────────────────────────────────
using var stopCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(gracefulMs));
using var linked = CancellationTokenSource.CreateLinkedTokenSource(
stopCts.Token, cancellationToken);
var stopTasks = _supervisors.Values
.Select(s => s.StopAsync(linked.Token))
.ToArray();
try
{
await Task.WhenAll(stopTasks).ConfigureAwait(false);
}
catch
{
// Best effort — don't let individual supervisor failures block shutdown.
}
// ── 2. Drain in-flight PDUs ───────────────────────────────────────────────────
// A fresh window of the same `gracefulMs` length; its timer starts here, not at step 1.
int drainDeadlineMs = gracefulMs;
int inFlightAtCancel = 0;
if (drainDeadlineMs > 0)
{
using var drainCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(drainDeadlineMs));
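try
{
while (!drainCts.Token.IsCancellationRequested)
{
if (CountInFlight() == 0) break;
await Task.Delay(10, drainCts.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
// Drain deadline reached while waiting.
}
// Sample after the loop on every exit path: the loop can also end through its
// IsCancellationRequested check, which would otherwise leave the count at 0.
inFlightAtCancel = CountInFlight();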
}
// ── 3. Stop admin endpoint LAST ───────────────────────────────────────────────
if (_admin is not null)
{
try
{
using var adminCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
await _admin.StopAsync(adminCts.Token).ConfigureAwait(false);
}
catch
{
// Best-effort.
}
}
// ── 4. Dispose supervisors (releases sockets, channels, watchdog timers) ─────
foreach (var supervisor in _supervisors.Values)
await supervisor.DisposeAsync().ConfigureAwait(false);
_supervisors.Clear();
LogShutdownComplete(_logger, inFlightAtCancel, sw.ElapsedMilliseconds);
}
private int CountInFlight()
{
int total = 0;
foreach (var supervisor in _supervisors.Values)
total += (int)supervisor.CurrentCounters.Snapshot().InFlightCount;
return total;
}
// ── Helpers and logging ───────────────────────────────────────────────────────────────
/// <summary>
/// Phase 11 — returns <c>true</c> when at least one BcdTag in the resolved map has a
/// positive <see cref="BcdTag.CacheTtlMs"/>. A PLC with no cacheable tags skips the
/// <see cref="Mbproxy.Proxy.Cache.ResponseCache"/> entirely (no eviction timer, no
/// per-call cache resolution cost), so the default-OFF deployment is byte-identical
/// to a Phase-10 deployment.
/// </summary>
private static bool HasAnyCacheableTag(BcdTagMap map)
{
foreach (var t in map.All)
if (t.CacheTtlMs > 0) return true;
return false;
}
[LoggerMessage(EventId = 1, EventName = "mbproxy.startup.ready",
Level = LogLevel.Information,
Message = "mbproxy service ready — ListenersBound={ListenersBound} PlcsConfigured={PlcsConfigured}")]
private static partial void LogStartupReady(ILogger logger, int listenersBound, int plcsConfigured);
[LoggerMessage(EventId = 21, EventName = "mbproxy.startup.bind.failed",
Level = LogLevel.Error,
Message = "Failed to bind listener: Plc={Plc} Port={Port} Reason={Reason}")]
private static partial void LogBindFailed(ILogger logger, string plc, int port, string reason);
// Phase 12 (W1.5) — moved here from the deleted ShutdownCoordinator.
[LoggerMessage(EventId = 80, EventName = "mbproxy.shutdown.complete",
Level = LogLevel.Information,
Message = "Graceful shutdown complete: InFlightAtCancel={InFlightAtCancel} ElapsedMs={ElapsedMs}")]
private static partial void LogShutdownComplete(ILogger logger, int inFlightAtCancel, long elapsedMs);
}
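
The drain step in StopAsync can be read in isolation as a deadline-bounded poll. The sketch below is illustrative only and is not part of Mbproxy: `DrainSketch`, `DrainAsync`, and its parameters are hypothetical names, and it assumes the caller supplies a delegate that returns the current in-flight count.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not in the Mbproxy source): polls an in-flight counter
// until it reaches zero or the deadline expires, then reports what is left.
public static class DrainSketch
{
    public static async Task<int> DrainAsync(
        Func<int> countInFlight,
        TimeSpan deadline,
        TimeSpan pollInterval)
    {
        // The CTS cancels itself once the deadline elapses.
        using var cts = new CancellationTokenSource(deadline);
        try
        {
            while (countInFlight() > 0)
            {
                // Throws OperationCanceledException when the deadline fires.
                await Task.Delay(pollInterval, cts.Token).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Deadline reached; fall through and report the remaining count.
        }

        // Sampled on every exit path: zero after a clean drain, positive
        // when the deadline cut the wait short.
        return countInFlight();
    }
}
```

A caller would pass something like `() => CountInFlight()` with the configured shutdown timeout as the deadline, and log the returned value as the in-flight count at cancel.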