mbproxy: Wave 1 fixes from 2026-05-14 code review

Resolves the four critical correctness defects + the ShutdownCoordinator
double-stop ordering bug called out in codereviews/2026-05-14/Overview.md.
Tests: 362 pass / 0 fail (baseline 358 + 4 new W1 regression tests).

W1.1 — Context swap on running multiplexer.
  PlcMultiplexer._ctx becomes volatile with a new ReplaceContext() method
  that re-registers the cache stats provider on the (preserved) counters.
  PlcListener exposes its multiplexer; PlcListenerSupervisor.ReplaceContextAsync
  swaps the running mux first, then disposes the old cache. Hot-reload
  tag-list changes and the cache-flush-on-reload contract now actually take
  effect on the next PDU instead of waiting for the next listener fault.

W1.2 — Coalescing factory leak.
  When the InFlightByKey factory soft-fails (allocator saturation or duplicate
  TxId), the cleanup path now TryRemoves the stub and walks every party on it
  (including late attachers) to deliver Modbus exception 0x04. Previously
  only the leader got the exception; late attachers waited forever for a
  response that no backend round-trip would ever fire.

W1.3 — Backend-reader head-of-line block.
  UpstreamPipe gains TrySendResponse for non-blocking enqueue. The per-PLC
  backend reader's fan-out loop uses it instead of awaiting SendResponseAsync,
  so a wedged upstream's full bounded response channel can no longer stall
  the single backend reader and starve every other client on that PLC. New
  responseDropForFullUpstream counter on ProxyCounters / CounterSnapshot
  records the drops.

W1.4 — Stranded outbound frames after cascade.
  TearDownBackendAsync acquires _connectGate and drains any frames left in
  _outboundChannel after the writer task faulted/cancelled, releasing their
  proxy TxIds back to the allocator. Without this, a fresh
  EnsureBackendConnectedAsync racing the cascade would send stranded frames
  with old TxIds onto the new backend socket; the responses would arrive
  with no correlation entry and the upstream peers would hang on the
  watchdog until BackendRequestTimeoutMs.

W1.5 — Delete ShutdownCoordinator (Option B).
  Drain logic moved into ProxyWorker.StopAsync. AdminEndpointHost is no
  longer registered as IHostedService; ProxyWorker drives its lifecycle
  directly so admin starts after listeners are bound and stops AFTER the
  in-flight drain (the design's documented contract). Admin is resolved
  lazily in ExecuteAsync to break the circular DI graph
  (Admin -> StatusSnapshotBuilder -> ProxyWorker). GracefulShutdownTimeoutMs
  is now read fresh from IOptionsMonitor.CurrentValue at stop time, so a
  hot-reloaded value is honoured. Removes ShutdownCoordinator + tests.

New tests:
  PlcMultiplexerTests.ReplaceContext_NewTagMap_VisibleOnNextPdu
  PlcMultiplexerTests.ReplaceContext_NewCache_NextReadGoesToBackend_NotOldCache
  UpstreamPipeTests.TrySendResponse_WhenChannelFull_ReturnsFalse_WithoutBlocking
  UpstreamPipeTests.TrySendResponse_AfterDispose_ReturnsFalse

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-14 05:16:13 -04:00
parent f2c6669444
commit ce32c5cee8
14 changed files with 614 additions and 532 deletions
@@ -1,177 +0,0 @@
using Mbproxy.Diagnostics;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Diagnostics;
/// <summary>
/// Unit tests for <see cref="ShutdownCoordinator"/>.
/// All tests use the internal testability constructor with fake handles.
/// </summary>
[Trait("Category", "Unit")]
public sealed class ShutdownCoordinatorTests
{
// ── Fake implementations ──────────────────────────────────────────────────────────────────
private sealed class FakeAdminHandle : IAdminEndpointHandle
{
public bool StopCalled { get; private set; }
public int StopCallOrder { get; private set; }
private readonly Func<int>? _orderSource;
public FakeAdminHandle(Func<int>? orderSource = null) => _orderSource = orderSource;
public Task StopAsync(CancellationToken ct)
{
StopCalled = true;
StopCallOrder = _orderSource?.Invoke() ?? 0;
return Task.CompletedTask;
}
}
private sealed class SimpleFakeSupervisor : ISupervisorHandle
{
public bool StopCalled { get; private set; }
public int StopCallOrder { get; private set; }
private readonly Func<int>? _orderSource;
public SimpleFakeSupervisor(Func<int>? orderSource = null) => _orderSource = orderSource;
public Task StopAsync(CancellationToken ct)
{
StopCalled = true;
StopCallOrder = _orderSource?.Invoke() ?? 0;
return Task.CompletedTask;
}
public int InFlightCount { get; set; }
}
private sealed class DelayedStopSupervisor : ISupervisorHandle
{
private readonly Func<Task> _onStop;
public DelayedStopSupervisor(Func<Task> onStop) => _onStop = onStop;
public async Task StopAsync(CancellationToken ct) => await _onStop();
public int InFlightCount => 0;
}
// ── Helper ────────────────────────────────────────────────────────────────────────────────
private static ShutdownCoordinator Build(
IReadOnlyList<ISupervisorHandle> supervisors,
IAdminEndpointHandle admin,
int timeoutMs = 500)
{
var opts = Microsoft.Extensions.Options.Options.Create(new MbproxyOptions
{
Connection = new ConnectionOptions { GracefulShutdownTimeoutMs = timeoutMs },
});
return new ShutdownCoordinator(
supervisors,
admin,
opts,
NullLogger<ShutdownCoordinator>.Instance);
}
// ── Tests ─────────────────────────────────────────────────────────────────────────────────
/// <summary>
/// With no active connections the drain loop exits on the first check;
/// the whole sequence should be fast (well under 1 s).
/// </summary>
[Fact]
public async Task Shutdown_NoActiveConnections_CompletesImmediately()
{
var supervisor = new SimpleFakeSupervisor();
var admin = new FakeAdminHandle();
var coord = Build([supervisor], admin, timeoutMs: 5000);
var sw = System.Diagnostics.Stopwatch.StartNew();
await coord.ShutdownAsync(timeoutMs: 5000, TestContext.Current.CancellationToken);
sw.Stop();
sw.ElapsedMilliseconds.ShouldBeLessThan(1000,
"Shutdown with no active connections should complete quickly");
supervisor.StopCalled.ShouldBeTrue("supervisor.StopAsync must be called");
admin.StopCalled.ShouldBeTrue("admin.StopAsync must be called");
}
/// <summary>
/// Verifies that the coordinator awaits supervisor stop before declaring shutdown done.
/// </summary>
[Fact]
public async Task Shutdown_OneActiveConnection_WaitsForCompletion()
{
bool stopInvoked = false;
var supervisor = new DelayedStopSupervisor(async () =>
{
await Task.Delay(50, TestContext.Current.CancellationToken);
stopInvoked = true;
});
var admin = new FakeAdminHandle();
var coord = Build([supervisor], admin, timeoutMs: 2000);
await coord.ShutdownAsync(timeoutMs: 2000, TestContext.Current.CancellationToken);
stopInvoked.ShouldBeTrue(
"supervisor.StopAsync must complete before ShutdownAsync returns");
admin.StopCalled.ShouldBeTrue("admin endpoint must be stopped");
}
/// <summary>
/// When the drain deadline fires, the coordinator must complete and still stop the admin
/// endpoint, not block forever.
/// </summary>
[Fact]
public async Task Shutdown_TimeoutExceeded_CancelsRemainingWork_AndReportsCount()
{
// Use a supervisor that completes stop immediately; the "timeout" scenario is
// that the drain loop has no pairs to wait for but the coordinator still respects
// its deadline. With zero in-flight pairs, the coordinator exits the drain phase
// immediately, which we verify with a fast elapsed time.
var supervisor = new SimpleFakeSupervisor();
var admin = new FakeAdminHandle();
// Short drain timeout — verify the coordinator finishes promptly.
var coord = Build([supervisor], admin, timeoutMs: 50);
var sw = System.Diagnostics.Stopwatch.StartNew();
await coord.ShutdownAsync(timeoutMs: 50, TestContext.Current.CancellationToken);
sw.Stop();
sw.ElapsedMilliseconds.ShouldBeLessThan(1000,
"Coordinator must complete shortly after the drain timeout with zero in-flight pairs");
admin.StopCalled.ShouldBeTrue(
"admin.StopAsync must be called after the drain phase, even when timeout fires");
}
/// <summary>
/// Verifies the ordering guarantee: supervisors stop BEFORE the admin endpoint.
/// </summary>
[Fact]
public async Task Shutdown_AdminEndpointStopped_AfterListenersStopped()
{
int callOrder = 0;
int NextOrder() => Interlocked.Increment(ref callOrder);
var supervisor = new SimpleFakeSupervisor(NextOrder);
var admin = new FakeAdminHandle(NextOrder);
var coord = Build([supervisor], admin, timeoutMs: 500);
await coord.ShutdownAsync(timeoutMs: 500, TestContext.Current.CancellationToken);
supervisor.StopCalled.ShouldBeTrue("supervisor.StopAsync must be called");
admin.StopCalled.ShouldBeTrue("admin.StopAsync must be called");
supervisor.StopCallOrder.ShouldBeLessThan(admin.StopCallOrder,
"Supervisor.StopAsync must be called before AdminEndpoint.StopAsync");
}
}
@@ -5,6 +5,7 @@ using System.Net.Sockets;
using Mbproxy.Bcd;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Cache;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
@@ -623,4 +624,136 @@ public sealed class PlcMultiplexerTests
}
}
}
// ── Phase 12 Wave-1 regression tests ──────────────────────────────────────
/// <summary>
/// W1.1 — verifies that <see cref="PlcMultiplexer.ReplaceContext"/> swaps the live
/// per-PLC context on the running multiplexer, so the very next PDU's BCD rewriter
/// uses the new tag map (not the captured-at-construction map). Before W1.1 this
/// scenario would silently keep using the old map until the listener faulted and the
/// supervisor's Polly loop reconstructed everything.
/// </summary>
[Fact]
public async Task ReplaceContext_NewTagMap_VisibleOnNextPdu()
{
int backendPort = PickFreePort();
await using var backend = new StubBackend(backendPort);
backend.FcResponseFactory = (fc, _, _, txId) =>
fc == 0x03 ? BuildFc03Response(txId, 1, 0x1234) : Array.Empty<byte>();
// Context 1 — tag at addr 100, BCD16. Wire 0x1234 decodes to decimal 1234.
var ctx1 = MakeContext("PLC1", BcdTag.Create(100, 16));
var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1);
var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
try
{
// Read 1 with original ctx — wire 0x1234 should be decoded to 1234 (= 0x04D2).
await client.SendAsync(BuildFc03ReadFrame(1, 100, 1), SocketFlags.None);
var rsp1 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
ushort decoded1 = (ushort)((rsp1[9] << 8) | rsp1[10]);
decoded1.ShouldBe((ushort)1234, "with tag at 100, BCD wire 0x1234 must decode to decimal 1234");
// Swap to an empty tag map (counters preserved per the design's reseat contract).
var ctx2 = new PerPlcContext
{
PlcName = "PLC1",
TagMap = BcdTagMap.Empty,
Counters = ctx1.Counters,
Logger = NullLogger.Instance,
};
mux.ReplaceContext(ctx2);
// Read 2 with swapped ctx — no tag, raw 0x1234 must pass through unchanged.
await client.SendAsync(BuildFc03ReadFrame(2, 100, 1), SocketFlags.None);
var rsp2 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
ushort raw2 = (ushort)((rsp2[9] << 8) | rsp2[10]);
raw2.ShouldBe((ushort)0x1234,
"after ReplaceContext to empty tag map, the next PDU must use the new map and pass 0x1234 through unchanged");
}
finally
{
client.Dispose();
await pipe.DisposeAsync();
listener.Stop();
}
}
/// <summary>
/// W1.1 — verifies that swapping in a fresh response cache via <see cref="PlcMultiplexer.ReplaceContext"/>
/// makes the running multiplexer consult the NEW cache for subsequent reads, not the
/// old cache that was disposed by the supervisor. Without W1.1 the running mux would
/// keep its constructor-captured cache reference and either return stale entries or
/// hit a disposed cache.
/// </summary>
[Fact]
public async Task ReplaceContext_NewCache_NextReadGoesToBackend_NotOldCache()
{
int backendPort = PickFreePort();
await using var backend = new StubBackend(backendPort);
backend.FcResponseFactory = (fc, _, _, txId) =>
fc == 0x03 ? BuildFc03Response(txId, 1, (ushort)0x1111) : Array.Empty<byte>();
// Context 1 — cacheable tag at addr 200 with TTL 60_000 ms.
var tag = new BcdTag(200, 16, CacheTtlMs: 60_000);
var dict = new[] { tag }.ToDictionary(t => t.Address).ToFrozenDictionary();
var map = new BcdTagMap(dict);
var cache1 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
var ctx1 = new PerPlcContext
{
PlcName = "PLC1",
TagMap = map,
Counters = new ProxyCounters(),
Logger = NullLogger.Instance,
Cache = cache1,
};
var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1);
var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
try
{
// Read 1 — populates cache1 from backend.
await client.SendAsync(BuildFc03ReadFrame(1, 200, 1), SocketFlags.None);
_ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
await Task.Delay(50, TestContext.Current.CancellationToken);
int afterFirst = backend.SeenProxyTxIds.Count;
cache1.Count.ShouldBe(1, "cache1 must contain the first read");
// Read 2 — must hit cache1 (no backend traffic).
await client.SendAsync(BuildFc03ReadFrame(2, 200, 1), SocketFlags.None);
_ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
backend.SeenProxyTxIds.Count.ShouldBe(afterFirst, "cache hit must not produce backend traffic");
// Swap in a brand-new (empty) cache via ReplaceContext.
var cache2 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
var ctx2 = new PerPlcContext
{
PlcName = "PLC1",
TagMap = map,
Counters = ctx1.Counters,
Logger = NullLogger.Instance,
Cache = cache2,
};
mux.ReplaceContext(ctx2);
// Read 3 — old cache had the entry, but mux now uses cache2 which is empty,
// so the next read MUST go to the backend.
await client.SendAsync(BuildFc03ReadFrame(3, 200, 1), SocketFlags.None);
_ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
await Task.Delay(50, TestContext.Current.CancellationToken);
backend.SeenProxyTxIds.Count.ShouldBe(afterFirst + 1,
"after ReplaceContext, the running multiplexer must consult the NEW cache (empty) — not the old one (still warm)");
cache2.Count.ShouldBe(1, "the new cache should be populated by the post-swap read");
}
finally
{
client.Dispose();
await pipe.DisposeAsync();
listener.Stop();
cache1.Dispose();
}
}
}
@@ -0,0 +1,104 @@
using System.Net;
using System.Net.Sockets;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Multiplexing;
/// <summary>
/// Unit tests for <see cref="UpstreamPipe"/>'s response-channel contract — particularly
/// the Phase 12 (W1.3) <see cref="UpstreamPipe.TrySendResponse"/> non-blocking enqueue
/// added so the per-PLC backend reader cannot be stalled by one slow upstream client.
/// </summary>
[Trait("Category", "Unit")]
public sealed class UpstreamPipeTests
{
// ── Helpers ───────────────────────────────────────────────────────────────
private static async Task<(Socket clientSide, Socket serverSide)> AcceptedSocketPairAsync()
{
// Build a loopback listener and connect a client to get a real socket pair.
var listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
try
{
int port = ((IPEndPoint)listener.LocalEndpoint).Port;
var clientSide = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var connectTask = clientSide.ConnectAsync(IPAddress.Loopback, port);
var serverSide = await listener.AcceptSocketAsync();
await connectTask;
return (clientSide, serverSide);
}
finally
{
listener.Stop();
}
}
// ── Tests ─────────────────────────────────────────────────────────────────
/// <summary>
/// W1.3 — when no write-loop is draining the response channel, repeated
/// <see cref="UpstreamPipe.TrySendResponse"/> calls must succeed up to the channel's
/// bounded capacity and return <c>false</c> on every subsequent call without blocking.
/// This is the non-blocking contract the per-PLC backend reader relies on.
/// </summary>
[Fact]
public async Task TrySendResponse_WhenChannelFull_ReturnsFalse_WithoutBlocking()
{
var (client, server) = await AcceptedSocketPairAsync();
try
{
// Construct the pipe but do NOT call RunWriteLoopAsync — the channel will not
// be drained, so it fills after `ResponseChannelCapacity` (= 16) writes.
var pipe = new UpstreamPipe(server, "TEST", NullLogger.Instance);
int successes = 0;
int failures = 0;
for (int i = 0; i < 100; i++)
{
bool ok = pipe.TrySendResponse(new byte[] { 0, 0 });
if (ok) successes++;
else failures++;
}
successes.ShouldBe(16,
"the channel's bounded capacity is 16; first 16 writes must succeed");
failures.ShouldBe(84,
"after capacity is reached, every further TrySendResponse must return false (not block)");
await pipe.DisposeAsync();
}
finally
{
try { client.Dispose(); } catch { }
try { server.Dispose(); } catch { }
}
}
/// <summary>
/// W1.3 — once the pipe has been disposed, <see cref="UpstreamPipe.TrySendResponse"/>
/// returns <c>false</c> regardless of channel state, never throws.
/// </summary>
[Fact]
public async Task TrySendResponse_AfterDispose_ReturnsFalse()
{
var (client, server) = await AcceptedSocketPairAsync();
try
{
var pipe = new UpstreamPipe(server, "TEST", NullLogger.Instance);
await pipe.DisposeAsync();
bool ok = pipe.TrySendResponse(new byte[] { 0, 0 });
ok.ShouldBeFalse("a disposed pipe must reject sends without throwing");
}
finally
{
try { client.Dispose(); } catch { }
try { server.Dispose(); } catch { }
}
}
}