using System.Collections.Concurrent;
using System.Collections.Frozen;
using System.Net;
using System.Net.Sockets;
using Mbproxy.Bcd;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Cache;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Multiplexing;
/// <summary>
/// Integration tests for <see cref="PlcMultiplexer"/> against a stub backend
/// (a <see cref="TcpListener"/> that canned-responds). Uses real sockets but no simulator.
/// </summary>
[Trait("Category", "Unit")]
public sealed class PlcMultiplexerTests
{
// ── Helpers ────────────────────────────────────────────────────────────────
/// <summary>
/// Obtains an ephemeral loopback port by binding a throwaway listener to port 0, reading
/// back the port it was given, and stopping the listener again.
/// </summary>
/// <returns>The port number the listener was bound to; it is no longer held on return.</returns>
private static int PickFreePort()
{
    var probe = new TcpListener(IPAddress.Loopback, 0);
    probe.Start();
    var boundEndpoint = (IPEndPoint)probe.LocalEndpoint;
    int freePort = boundEndpoint.Port;
    probe.Stop();
    return freePort;
}
/// <summary>
/// Reads exactly <paramref name="count"/> bytes from <paramref name="socket"/>.
/// </summary>
/// <param name="socket">Socket to receive from.</param>
/// <param name="count">Number of bytes to read; zero yields an empty array without touching the socket.</param>
/// <param name="ct">Token passed to every receive call.</param>
/// <returns>A new array holding exactly <paramref name="count"/> bytes.</returns>
/// <exception cref="IOException">A receive returned zero bytes before <paramref name="count"/> bytes arrived.</exception>
private static async Task<byte[]> ReadExactAsync(Socket socket, int count, CancellationToken ct)
{
    var buf = new byte[count];
    int read = 0;
    while (read < count)
    {
        // A receive may deliver fewer bytes than requested, so keep filling the remainder.
        int n = await socket.ReceiveAsync(buf.AsMemory(read, count - read), SocketFlags.None, ct);
        if (n == 0) throw new IOException("EOF");
        read += n;
    }
    return buf;
}
/// <summary>
/// Reads one frame from <paramref name="socket"/>: a 7-byte header followed by the body
/// whose size is taken from the big-endian length field in header bytes 4-5. That field
/// counts the seventh header byte as well, so the body is one byte shorter.
/// </summary>
/// <param name="socket">Socket to receive from.</param>
/// <param name="ct">Token passed to the underlying reads.</param>
/// <returns>Header and body concatenated into a single array.</returns>
private static async Task<byte[]> ReadOneFrameAsync(Socket socket, CancellationToken ct)
{
    const int HeaderLen = 7;
    var header = await ReadExactAsync(socket, HeaderLen, ct);
    ushort length = (ushort)((header[4] << 8) | header[5]);

    // A length field of 0 would make the body size negative and overflow the copy below,
    // so clamp it and treat the frame as header-only.
    int bodyLen = Math.Max(0, length - 1);
    if (bodyLen == 0) return header;

    var body = await ReadExactAsync(socket, bodyLen, ct);
    var frame = new byte[HeaderLen + bodyLen];
    Buffer.BlockCopy(header, 0, frame, 0, HeaderLen);
    Buffer.BlockCopy(body, 0, frame, HeaderLen, bodyLen);
    return frame;
}
/// <summary>
/// Builds a 12-byte FC03 request frame: TxId, two zero bytes, length 6, unit id,
/// function code 0x03, then start address and quantity. All 16-bit fields are big-endian.
/// </summary>
private static byte[] BuildFc03ReadFrame(ushort txId, ushort start, ushort qty, byte unitId = 1)
{
    var frame = new byte[12];
    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    // frame[2], frame[3] and frame[4] stay zero.
    frame[5] = 0x06;
    frame[6] = unitId;
    frame[7] = 0x03;
    frame[8] = (byte)(start >> 8);
    frame[9] = (byte)(start & 0xFF);
    frame[10] = (byte)(qty >> 8);
    frame[11] = (byte)(qty & 0xFF);
    return frame;
}
/// <summary>
/// Builds a 12-byte FC06 request frame: TxId, two zero bytes, length 6, unit id,
/// function code 0x06, then register address and value. All 16-bit fields are big-endian.
/// </summary>
private static byte[] BuildFc06WriteFrame(ushort txId, ushort addr, ushort value, byte unitId = 1)
{
    var frame = new byte[12];
    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    // frame[2], frame[3] and frame[4] stay zero.
    frame[5] = 0x06;
    frame[6] = unitId;
    frame[7] = 0x06;
    frame[8] = (byte)(addr >> 8);
    frame[9] = (byte)(addr & 0xFF);
    frame[10] = (byte)(value >> 8);
    frame[11] = (byte)(value & 0xFF);
    return frame;
}
/// <summary>
/// Builds an FC03 response frame carrying <paramref name="registers"/>: the 7-byte header
/// (TxId, two zero bytes, length, unit id), function code 0x03, a byte count, then each
/// register as two big-endian bytes.
/// </summary>
private static byte[] BuildFc03Response(ushort txId, byte unitId, params ushort[] registers)
{
    int dataBytes = registers.Length * 2;
    int pduLen = 2 + dataBytes; // FC + byteCount + register data
    var frame = new byte[7 + pduLen];
    ushort lengthField = (ushort)(1 + pduLen); // UnitId + PDU

    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    frame[2] = 0;
    frame[3] = 0;
    frame[4] = (byte)(lengthField >> 8);
    frame[5] = (byte)(lengthField & 0xFF);
    frame[6] = unitId;
    frame[7] = 0x03;
    frame[8] = (byte)dataBytes;

    int offset = 9;
    foreach (ushort register in registers)
    {
        frame[offset++] = (byte)(register >> 8);
        frame[offset++] = (byte)(register & 0xFF);
    }
    return frame;
}
/// <summary>
/// Builds the 12-byte FC06 response, which echoes the TxId, unit id, register address and
/// value. The length field is 6: UnitId(1) + FC(1) + Addr(2) + Value(2).
/// </summary>
private static byte[] BuildFc06Response(ushort txId, byte unitId, ushort addr, ushort value)
    =>
    [
        (byte)(txId >> 8), (byte)(txId & 0xFF),
        0, 0,
        0, 6,
        unitId,
        0x06,
        (byte)(addr >> 8), (byte)(addr & 0xFF),
        (byte)(value >> 8), (byte)(value & 0xFF),
    ];
/// <summary>
/// Creates a <see cref="PerPlcContext"/> for <paramref name="name"/> with fresh counters and a
/// null logger. The tag map is keyed by tag address; with no tags, <c>BcdTagMap.Empty</c> is used.
/// </summary>
private static PerPlcContext MakeContext(string name, params BcdTag[] tags)
{
    var tagsByAddress = tags.ToDictionary(tag => tag.Address).ToFrozenDictionary();

    BcdTagMap tagMap;
    if (tagsByAddress.Count > 0)
    {
        tagMap = new BcdTagMap(tagsByAddress);
    }
    else
    {
        tagMap = BcdTagMap.Empty;
    }

    return new PerPlcContext
    {
        PlcName = name,
        TagMap = tagMap,
        Counters = new ProxyCounters(),
        Logger = NullLogger.Instance,
    };
}
/// <summary>
/// A stub backend that answers every request with a canned response, recording the proxy
/// TxIds it sees on the wire so tests can verify the multiplexer rewrites them.
/// </summary>
private sealed class StubBackend : IAsyncDisposable
{
    public int Port { get; }

    private readonly TcpListener _listener;
    private readonly CancellationTokenSource _cts = new();
    private readonly List<Task> _clientTasks = new();
    private int _disposed;

    /// <summary>TxId of every request received, in arrival order.</summary>
    public ConcurrentQueue<ushort> SeenProxyTxIds { get; } = new();

    /// <summary>
    /// Optional override for the response. Arguments are (function code, bytes 8-9 of the
    /// request, bytes 10-11 of the request, TxId); the returned bytes are sent back as-is.
    /// When null, the built-in FC03 / FC06 replies are used.
    /// </summary>
    public Func<byte, ushort, ushort, ushort, byte[]>? FcResponseFactory { get; set; }

    /// <summary>Starts listening on loopback <paramref name="port"/> and begins accepting connections.</summary>
    public StubBackend(int port)
    {
        Port = port;
        _listener = new TcpListener(IPAddress.Loopback, port);
        _listener.Start();
        _ = AcceptLoop();
    }

    // Accepts connections until cancelled; each one is served on its own task.
    private async Task AcceptLoop()
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                Socket s = await _listener.AcceptSocketAsync(_cts.Token);
                var task = Task.Run(() => HandleAsync(s));
                lock (_clientTasks) _clientTasks.Add(task);
            }
        }
        catch { /* shutdown */ }
    }

    // Serves one connection: read a frame, record its TxId, send the reply, repeat.
    private async Task HandleAsync(Socket s)
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                var req = await ReadOneFrameAsync(s, _cts.Token);
                if (req.Length < 8) break;
                ushort txId = (ushort)((req[0] << 8) | req[1]);
                SeenProxyTxIds.Enqueue(txId);
                byte unitId = req[6];
                byte fc = req[7];
                byte[] response;
                if (FcResponseFactory is not null)
                {
                    ushort start = req.Length >= 10 ? (ushort)((req[8] << 8) | req[9]) : (ushort)0;
                    ushort qty = req.Length >= 12 ? (ushort)((req[10] << 8) | req[11]) : (ushort)0;
                    response = FcResponseFactory(fc, start, qty, txId);
                }
                else if (fc == 0x03)
                {
                    // Default: FC03 echo a single register containing 0x1234.
                    response = BuildFc03Response(txId, unitId, 0x1234);
                }
                else if (fc == 0x06)
                {
                    // A truncated FC06 request has no address/value to echo; drop the connection.
                    if (req.Length < 12) break;
                    ushort addr = (ushort)((req[8] << 8) | req[9]);
                    ushort value = (ushort)((req[10] << 8) | req[11]);
                    response = BuildFc06Response(txId, unitId, addr, value);
                }
                else
                {
                    break;
                }
                await s.SendAsync(response, SocketFlags.None, _cts.Token);
            }
        }
        catch { /* normal */ }
        finally { try { s.Dispose(); } catch { } }
    }

    /// <summary>
    /// Cancels the loops, stops the listener, waits up to two seconds for the connection
    /// handlers to finish, and releases the token source. Calls after the first are no-ops.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Cancelling an already-disposed token source would throw, so only the first call proceeds.
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        await _cts.CancelAsync();
        try { _listener.Stop(); } catch { }
        Task[] snap;
        lock (_clientTasks) snap = _clientTasks.ToArray();
        try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { }
        _cts.Dispose();
    }
}
/// <summary>
/// Constructs a <see cref="PlcMultiplexer"/> for <paramref name="plc"/> with a new
/// <see cref="BcdPduPipeline"/>, a null logger and no backend connect pipeline.
/// </summary>
/// <returns>The constructed multiplexer; callers dispose it with <c>await using</c>.</returns>
private static async Task<PlcMultiplexer> BuildMuxAsync(
    PlcOptions plc, ConnectionOptions connOpts, PerPlcContext ctx)
{
    var mux = new PlcMultiplexer(
        plc, connOpts,
        new BcdPduPipeline(),
        ctx,
        NullLogger.Instance,
        backendConnectPipeline: null);
    // Construction is synchronous; the yield keeps this an awaitable factory for callers.
    await Task.Yield();
    return mux;
}
/// <summary>
/// Creates a connected loopback socket pair, wraps the accepted side in an
/// <see cref="UpstreamPipe"/>, and starts the pipe on <paramref name="mux"/> in the background.
/// </summary>
/// <returns>
/// The client socket, the pipe, the listener that accepted the connection, and its port.
/// The caller disposes the socket and pipe and stops the listener.
/// </returns>
private static async Task<(Socket client, UpstreamPipe pipe, TcpListener proxyListener, int proxyPort)>
    ConnectClientAsync(PlcMultiplexer mux, string plcName)
{
    // Bind to port 0 and read the assigned port back. Picking a port first and re-binding it
    // later leaves a window in which something else can take the port.
    var proxyListener = new TcpListener(IPAddress.Loopback, 0);
    Socket? client = null;
    Socket? upstream = null;
    try
    {
        proxyListener.Start();
        int proxyPort = ((IPEndPoint)proxyListener.LocalEndpoint).Port;

        client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
        { NoDelay = true };
        await client.ConnectAsync(IPAddress.Loopback, proxyPort);
        upstream = await proxyListener.AcceptSocketAsync();

        var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance);
        _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));
        return (client, pipe, proxyListener, proxyPort);
    }
    catch
    {
        // Nothing was handed to the caller, so release whatever was created before rethrowing.
        upstream?.Dispose();
        client?.Dispose();
        proxyListener.Stop();
        throw;
    }
}
// ── Tests ─────────────────────────────────────────────────────────────────
/// <summary>
/// One client sends a single FC03 read for a tagged register through the multiplexer and
/// must receive its own TxId plus the decoded register value.
/// </summary>
[Fact]
public async Task SingleUpstream_RoundTripsFC03_Through_Multiplexer()
{
    var ct = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var stub = new StubBackend(stubPort);
    var plcContext = MakeContext("PLC1", BcdTag.Create(100, 16));
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var multiplexer = await BuildMuxAsync(plcOptions, new ConnectionOptions(), plcContext);
    var (socket, upstreamPipe, proxyListener, _) = await ConnectClientAsync(multiplexer, plcOptions.Name);
    try
    {
        var request = BuildFc03ReadFrame(0x1234, 100, 1);
        await socket.SendAsync(request, SocketFlags.None);
        var reply = await ReadOneFrameAsync(socket, ct);

        int replyTxId = (reply[0] << 8) | reply[1];
        ((ushort)replyTxId).ShouldBe((ushort)0x1234, "the original TxId must be restored on the way back to the client");

        // The stub answers with raw 0x1234; BCD-decoded that is decimal 1234.
        int registerValue = (reply[9] << 8) | reply[10];
        ((ushort)registerValue).ShouldBe((ushort)1234);
    }
    finally
    {
        socket.Dispose();
        await upstreamPipe.DisposeAsync();
        proxyListener.Stop();
    }
}
/// <summary>
/// One client writes a tagged register with FC06 through the multiplexer and must receive
/// its own TxId plus the written value echoed back in client (binary) form.
/// </summary>
[Fact]
public async Task SingleUpstream_RoundTripsFC06_Through_Multiplexer()
{
    var ct = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var stub = new StubBackend(stubPort);
    var plcContext = MakeContext("PLC1", BcdTag.Create(200, 16));
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var multiplexer = await BuildMuxAsync(plcOptions, new ConnectionOptions(), plcContext);
    var (socket, upstreamPipe, proxyListener, _) = await ConnectClientAsync(multiplexer, plcOptions.Name);
    try
    {
        // The client sends binary 1234; the proxy is expected to put BCD 0x1234 on the backend wire.
        var writeRequest = BuildFc06WriteFrame(0xABCD, 200, 1234);
        await socket.SendAsync(writeRequest, SocketFlags.None);
        var reply = await ReadOneFrameAsync(socket, ct);

        int replyTxId = (reply[0] << 8) | reply[1];
        ((ushort)replyTxId).ShouldBe((ushort)0xABCD);

        // The echoed value must come back decoded to the client's binary form.
        int echoedValue = (reply[10] << 8) | reply[11];
        ((ushort)echoedValue).ShouldBe((ushort)1234);
    }
    finally
    {
        socket.Dispose();
        await upstreamPipe.DisposeAsync();
        proxyListener.Stop();
    }
}
/// <summary>
/// Two clients issue the same FC03 read with the same TxId; each must get a response
/// carrying that TxId.
/// </summary>
[Fact]
public async Task TwoUpstreams_ConcurrentFC03_BothGetCorrectResponses()
{
    var ct = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var stub = new StubBackend(stubPort)
    {
        // Both clients read address 100; the stub echoes whatever TxId arrives on the wire.
        FcResponseFactory = (fc, _, _, txId) =>
        {
            if (fc != 0x03)
            {
                throw new InvalidOperationException("unexpected fc");
            }
            return BuildFc03Response(txId, 1, 0x1234);
        },
    };
    var plcContext = MakeContext("PLC1", BcdTag.Create(100, 16));
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var multiplexer = await BuildMuxAsync(plcOptions, new ConnectionOptions(), plcContext);
    var (firstClient, firstPipe, firstListener, _) = await ConnectClientAsync(multiplexer, plcOptions.Name);
    var (secondClient, secondPipe, secondListener, _) = await ConnectClientAsync(multiplexer, plcOptions.Name);
    try
    {
        // Identical upstream TxIds (0x0001) would collide on the shared backend connection
        // unless the multiplexer rewrites them.
        await firstClient.SendAsync(BuildFc03ReadFrame(0x0001, 100, 1), SocketFlags.None);
        await secondClient.SendAsync(BuildFc03ReadFrame(0x0001, 100, 1), SocketFlags.None);
        var firstReply = await ReadOneFrameAsync(firstClient, ct);
        var secondReply = await ReadOneFrameAsync(secondClient, ct);

        // Each reply must carry the original (colliding) TxId.
        int firstTxId = (firstReply[0] << 8) | firstReply[1];
        int secondTxId = (secondReply[0] << 8) | secondReply[1];
        ((ushort)firstTxId).ShouldBe((ushort)0x0001);
        ((ushort)secondTxId).ShouldBe((ushort)0x0001);
    }
    finally
    {
        firstClient.Dispose();
        secondClient.Dispose();
        await firstPipe.DisposeAsync();
        await secondPipe.DisposeAsync();
        firstListener.Stop();
        secondListener.Stop();
    }
}
/// <summary>
/// Two clients use the same upstream TxId for reads of different addresses; the backend
/// must see at least two distinct TxIds on the wire.
/// </summary>
[Fact]
public async Task TwoUpstreams_ProxyTxIds_AreDistinct_OnTheWire()
{
    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);
    var (c1, p1, l1, _) = await ConnectClientAsync(mux, plc.Name);
    var (c2, p2, l2, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Both clients use the same upstream TxId 0x0007 — the proxy must hand out
        // distinct proxy TxIds on the backend wire. Reads target DIFFERENT addresses
        // so coalescing does not fuse them into a single backend request.
        await c1.SendAsync(BuildFc03ReadFrame(0x0007, 0, 1), SocketFlags.None);
        await c2.SendAsync(BuildFc03ReadFrame(0x0007, 10, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
        _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

        // Collect what the backend saw; the set collapses duplicates so Count is the
        // number of distinct TxIds.
        var seen = new HashSet<ushort>(backend.SeenProxyTxIds);
        seen.Count.ShouldBeGreaterThanOrEqualTo(2, "the multiplexer must allocate distinct proxy TxIds even when upstreams collide");
    }
    finally
    {
        c1.Dispose(); c2.Dispose();
        await p1.DisposeAsync(); await p2.DisposeAsync();
        l1.Stop(); l2.Stop();
    }
}
/// <summary>
/// Closing one client's socket must leave a second client on the same multiplexer able to
/// complete a round-trip.
/// </summary>
[Fact]
public async Task UpstreamDisconnect_DoesNotAffectOtherUpstreams()
{
    var ct = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var stub = new StubBackend(stubPort);
    var plcContext = MakeContext("PLC1");
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var multiplexer = await BuildMuxAsync(plcOptions, new ConnectionOptions(), plcContext);
    var (droppedClient, droppedPipe, droppedListener, _) = await ConnectClientAsync(multiplexer, plcOptions.Name);
    var (survivor, survivorPipe, survivorListener, _) = await ConnectClientAsync(multiplexer, plcOptions.Name);
    try
    {
        // Close the first client's socket outright, then pause briefly before using the second.
        droppedClient.Dispose();
        await Task.Delay(50, ct);

        // The remaining client must still get its own TxId back.
        var request = BuildFc03ReadFrame(0x0042, 0, 1);
        await survivor.SendAsync(request, SocketFlags.None);
        var reply = await ReadOneFrameAsync(survivor, ct);
        int replyTxId = (reply[0] << 8) | reply[1];
        ((ushort)replyTxId).ShouldBe((ushort)0x0042);
    }
    finally
    {
        survivor.Dispose();
        await droppedPipe.DisposeAsync();
        await survivorPipe.DisposeAsync();
        droppedListener.Stop();
        survivorListener.Stop();
    }
}
/// <summary>
/// After the backend is shut down, all three attached clients must see their connections
/// close and the <c>BackendDisconnectCascades</c> counter must reach at least 3.
/// </summary>
[Fact]
public async Task BackendDisconnect_CascadesToAllUpstreams()
{
    int backendPort = PickFreePort();
    var backend = new StubBackend(backendPort);
    // The backend is disposed mid-test on purpose; this flag lets the outer finally release it
    // when the test fails before reaching that point, without disposing it twice.
    bool backendStopped = false;
    try
    {
        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);
        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        var (cC, pC, lC, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Force a round-trip on each so backend connect occurs first.
            await cA.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            await cB.SendAsync(BuildFc03ReadFrame(2, 0, 1), SocketFlags.None);
            await cC.SendAsync(BuildFc03ReadFrame(3, 0, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(cC, TestContext.Current.CancellationToken);

            // Kill the backend.
            await backend.DisposeAsync();
            backendStopped = true;

            // All three upstream sockets should observe the close; the three waits together
            // must finish in under 2000 ms.
            var sw = System.Diagnostics.Stopwatch.StartNew();
            await WaitForCloseAsync(cA, TestContext.Current.CancellationToken);
            await WaitForCloseAsync(cB, TestContext.Current.CancellationToken);
            await WaitForCloseAsync(cC, TestContext.Current.CancellationToken);
            sw.Stop();
            sw.ElapsedMilliseconds.ShouldBeLessThan(2000, "cascade should propagate quickly");

            // Poll briefly for the cascade counter — there is an inherent scheduling gap
            // between "upstream socket EOF observed" (WaitForCloseAsync returns) and "the
            // multiplexer's TearDownBackendAsync increments the counter after awaiting
            // every pipe.DisposeAsync". This poll absorbs that scheduling jitter without
            // weakening the assertion's semantics — the counter MUST reach 3 (or more)
            // because all three upstream pipes were attached when the cascade fired.
            long cascades = 0;
            for (int i = 0; i < 50; i++)
            {
                cascades = ctx.Counters.Snapshot().BackendDisconnectCascades;
                if (cascades >= 3) break;
                await Task.Delay(20, TestContext.Current.CancellationToken);
            }
            cascades.ShouldBeGreaterThanOrEqualTo(3);
        }
        finally
        {
            cA.Dispose(); cB.Dispose(); cC.Dispose();
            await pA.DisposeAsync(); await pB.DisposeAsync(); await pC.DisposeAsync();
            lA.Stop(); lB.Stop(); lC.Stop();
        }
    }
    finally
    {
        if (!backendStopped)
        {
            await backend.DisposeAsync();
        }
    }
}
/// <summary>
/// With a backend that accepts connections and reads requests but never replies, the client
/// must receive an exception response: original TxId, FC03 with the exception bit set, and
/// exception code 0x0B.
/// </summary>
[Fact]
public async Task RequestTimeoutWatchdog_DeliversException0B_ToUpstream_WhenBackendNeverResponds()
{
// A drain-only stub that consumes requests but never responds. The multiplexer's
// per-request watchdog must surface a Modbus exception 0x0B to the upstream client
// once BackendRequestTimeoutMs elapses, freeing the proxy TxId + correlation entry.
int backendPort = PickFreePort();
var drainListener = new TcpListener(IPAddress.Loopback, backendPort);
drainListener.Start();
var drainCts = new CancellationTokenSource();
var drainToken = drainCts.Token;
// Background accept loop: every accepted socket gets its own task that reads and discards
// bytes until the peer closes (n == 0), the token is cancelled, or a receive throws.
_ = Task.Run(async () =>
{
try
{
while (!drainToken.IsCancellationRequested)
{
var s = await drainListener.AcceptSocketAsync(drainToken);
_ = Task.Run(async () =>
{
var buf = new byte[256];
try
{
while (!drainToken.IsCancellationRequested)
{
int n = await s.ReceiveAsync(buf, SocketFlags.None, drainToken);
if (n == 0) break;
}
}
catch { }
finally { try { s.Dispose(); } catch { } }
}, drainToken);
}
}
catch { }
}, drainToken);
try
{
var ctx = MakeContext("PLC1");
var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
// Short request timeout so the test does not have to wait long.
var connOpts = new ConnectionOptions { BackendRequestTimeoutMs = 400 };
await using var mux = await BuildMuxAsync(plc, connOpts, ctx);
var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
try
{
await client.SendAsync(BuildFc03ReadFrame(0xABCD, 0, 1), SocketFlags.None);
// The watchdog should deliver an exception within ~watchdog-tick * 2.
// Note: this read is bounded only by the test's cancellation token.
var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
ushort rspTxId = (ushort)((rsp[0] << 8) | rsp[1]);
rspTxId.ShouldBe((ushort)0xABCD, "watchdog must echo the original client TxId");
// Byte 7 is the function code: the high bit flags an exception, the low seven bits
// carry the original function code. Byte 8 is the exception code.
byte fcByte = rsp[7];
(fcByte & 0x80).ShouldBe(0x80, "FC must have the exception bit set");
(fcByte & 0x7F).ShouldBe(0x03, "original FC must be FC03 (read holding registers)");
rsp[8].ShouldBe((byte)0x0B, "exception code must be 0x0B (Gateway Target Device Failed To Respond)");
}
finally
{
client.Dispose();
await pipe.DisposeAsync();
listener.Stop();
}
}
finally
{
// Stop the drain stub: cancel its loops, close the listener, release the token source.
await drainCts.CancelAsync();
try { drainListener.Stop(); } catch { }
drainCts.Dispose();
}
}
/// <summary>
/// After the first backend is shut down and a new one is started on the same port, a fresh
/// client must be able to round-trip through the same multiplexer instance.
/// </summary>
[Fact]
public async Task BackendReconnect_AfterCascade_NextUpstreamRequest_Succeeds()
{
    int backendPort = PickFreePort();
    var backend = new StubBackend(backendPort);
    bool backendStopped = false;
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);
    var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        await cA.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken);
        await backend.DisposeAsync();
        backendStopped = true;
        await WaitForCloseAsync(cA, TestContext.Current.CancellationToken);
    }
    catch { /* tolerate any teardown noise */ }
    finally
    {
        // Cleanup runs even when phase one failed part-way. Otherwise the first client, its
        // pipe and listener would leak, and the first backend would still hold the port that
        // the second backend needs.
        cA.Dispose();
        try { await pA.DisposeAsync(); } catch { }
        try { lA.Stop(); } catch { }
        if (!backendStopped)
        {
            try { await backend.DisposeAsync(); } catch { }
        }
    }

    // Start a new backend on the same port.
    await using var backend2 = new StubBackend(backendPort);

    // A fresh client should round-trip cleanly through the same multiplexer.
    var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        await cB.SendAsync(BuildFc03ReadFrame(0x7777, 0, 1), SocketFlags.None);
        var rsp = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
        ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)0x7777);
    }
    finally
    {
        cB.Dispose();
        await pB.DisposeAsync();
        lB.Stop();
    }
}
/// <summary>
/// Receives from <paramref name="s"/> until a receive returns zero bytes, a receive throws,
/// or a two-second deadline (linked to <paramref name="ct"/>) expires. All three cases return
/// normally, so the caller cannot tell them apart from the return alone.
/// </summary>
private static async Task WaitForCloseAsync(Socket s, CancellationToken ct)
{
    using var deadline = CancellationTokenSource.CreateLinkedTokenSource(ct);
    deadline.CancelAfter(TimeSpan.FromSeconds(2));
    var scratch = new byte[1];

    for (;;)
    {
        if (deadline.IsCancellationRequested)
        {
            return;
        }

        int received;
        try
        {
            received = await s.ReceiveAsync(scratch, SocketFlags.None, deadline.Token);
        }
        catch
        {
            // Cancellation and socket errors are both treated as "done waiting".
            return;
        }

        if (received == 0)
        {
            return;
        }
    }
}
// ── ReplaceContext live-swap regression tests ────────────────────────────────
/// <summary>
/// Verifies that <see cref="PlcMultiplexer.ReplaceContext"/> swaps the live per-PLC
/// context on the running multiplexer, so the very next PDU's BCD rewriter uses the
/// new tag map (not the captured-at-construction map). Without the live swap this
/// scenario would silently keep using the old map until the listener faulted and the
/// supervisor's Polly loop reconstructed everything.
/// </summary>
[Fact]
public async Task ReplaceContext_NewTagMap_VisibleOnNextPdu()
{
    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);
    backend.FcResponseFactory = (fc, _, _, txId) =>
        fc == 0x03 ? BuildFc03Response(txId, 1, 0x1234) : Array.Empty<byte>();

    // Context 1 — tag at addr 100, BCD16. Wire 0x1234 decodes to decimal 1234.
    var ctx1 = MakeContext("PLC1", BcdTag.Create(100, 16));
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1);
    var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Read 1 with original ctx — wire 0x1234 should be decoded to 1234 (= 0x04D2).
        await client.SendAsync(BuildFc03ReadFrame(1, 100, 1), SocketFlags.None);
        var rsp1 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
        ushort decoded1 = (ushort)((rsp1[9] << 8) | rsp1[10]);
        decoded1.ShouldBe((ushort)1234, "with tag at 100, BCD wire 0x1234 must decode to decimal 1234");

        // Swap to an empty tag map (counters preserved per the design's reseat contract).
        var ctx2 = new PerPlcContext
        {
            PlcName = "PLC1",
            TagMap = BcdTagMap.Empty,
            Counters = ctx1.Counters,
            Logger = NullLogger.Instance,
        };
        mux.ReplaceContext(ctx2);

        // Read 2 with swapped ctx — no tag, raw 0x1234 must pass through unchanged.
        await client.SendAsync(BuildFc03ReadFrame(2, 100, 1), SocketFlags.None);
        var rsp2 = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
        ushort raw2 = (ushort)((rsp2[9] << 8) | rsp2[10]);
        raw2.ShouldBe((ushort)0x1234,
            "after ReplaceContext to empty tag map, the next PDU must use the new map and pass 0x1234 through unchanged");
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
/// <summary>
/// Verifies that swapping in a fresh response cache via
/// <see cref="PlcMultiplexer.ReplaceContext"/> makes the running multiplexer consult
/// the NEW cache for subsequent reads, not the old cache that was disposed by the
/// supervisor. Without the live swap the running mux would keep its constructor-
/// captured cache reference and either return stale entries or hit a disposed cache.
/// </summary>
[Fact]
public async Task ReplaceContext_NewCache_NextReadGoesToBackend_NotOldCache()
{
    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);
    backend.FcResponseFactory = (fc, _, _, txId) =>
        fc == 0x03 ? BuildFc03Response(txId, 1, (ushort)0x1111) : Array.Empty<byte>();

    // Context 1 — cacheable tag at addr 200 with TTL 60_000 ms.
    var tag = new BcdTag(200, 16, CacheTtlMs: 60_000);
    var dict = new[] { tag }.ToDictionary(t => t.Address).ToFrozenDictionary();
    var map = new BcdTagMap(dict);
    var cache1 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
    // Declared outside the try so the finally block can dispose it if it was created.
    ResponseCache? cache2 = null;
    var ctx1 = new PerPlcContext
    {
        PlcName = "PLC1",
        TagMap = map,
        Counters = new ProxyCounters(),
        Logger = NullLogger.Instance,
        Cache = cache1,
    };
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1);
    var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Read 1 — populates cache1 from backend.
        await client.SendAsync(BuildFc03ReadFrame(1, 200, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
        await Task.Delay(50, TestContext.Current.CancellationToken);
        int afterFirst = backend.SeenProxyTxIds.Count;
        cache1.Count.ShouldBe(1, "cache1 must contain the first read");

        // Read 2 — must hit cache1 (no backend traffic).
        await client.SendAsync(BuildFc03ReadFrame(2, 200, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
        backend.SeenProxyTxIds.Count.ShouldBe(afterFirst, "cache hit must not produce backend traffic");

        // Swap in a brand-new (empty) cache via ReplaceContext.
        cache2 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
        var ctx2 = new PerPlcContext
        {
            PlcName = "PLC1",
            TagMap = map,
            Counters = ctx1.Counters,
            Logger = NullLogger.Instance,
            Cache = cache2,
        };
        mux.ReplaceContext(ctx2);

        // Read 3 — old cache had the entry, but mux now uses cache2 which is empty,
        // so the next read MUST go to the backend.
        await client.SendAsync(BuildFc03ReadFrame(3, 200, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
        await Task.Delay(50, TestContext.Current.CancellationToken);
        backend.SeenProxyTxIds.Count.ShouldBe(afterFirst + 1,
            "after ReplaceContext, the running multiplexer must consult the NEW cache (empty) — not the old one (still warm)");
        cache2.Count.ShouldBe(1, "the new cache should be populated by the post-swap read");
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
        cache1.Dispose();
        // The replacement cache is owned by this test as well; it used to be left undisposed.
        cache2?.Dispose();
    }
}
// ── Final-tier race tests ─────────────────────────────────────────────────
/// <summary>
/// Reflection helper — reads the multiplexer's private <c>_allocator</c> field and calls its
/// public <c>TryAllocate</c> method repeatedly until it returns false. Used by the saturation
/// tests; the taken slots are not released here.
/// </summary>
/// <returns>The number of successful <c>TryAllocate</c> calls.</returns>
private static int DrainAllocator(PlcMultiplexer mux)
{
    const System.Reflection.BindingFlags PrivateInstance =
        System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance;
    const System.Reflection.BindingFlags PublicInstance =
        System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance;

    var allocatorField = typeof(PlcMultiplexer).GetField("_allocator", PrivateInstance)!;
    object allocator = allocatorField.GetValue(mux)!;
    var tryAllocateMethod = allocator.GetType().GetMethod("TryAllocate", PublicInstance)!;

    // The single argument slot is reused across calls, as in a plain invoke-in-a-loop.
    var invokeArgs = new object?[] { (ushort)0 };
    int slotsTaken = 0;
    for (;;)
    {
        bool allocated = (bool)tryAllocateMethod.Invoke(allocator, invokeArgs)!;
        if (!allocated)
        {
            break;
        }
        slotsTaken++;
    }
    return slotsTaken;
}
/// <summary>
/// TxId allocator saturation propagates as a Modbus exception 04 to the upstream
/// client (no hang, no crash). The 16-bit TxId space (65,536 slots) is pre-saturated
/// via reflection so the next request hits the <c>!_allocator.TryAllocate</c> branch
/// in <c>OnUpstreamFrameAsync</c> immediately.
/// </summary>
[Fact]
public async Task TxIdAllocator_Saturated_NextRequest_GetsException04_WithOriginalTxId()
{
    var ct = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var stub = new StubBackend(stubPort);
    var plcContext = MakeContext("PLC1");
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var multiplexer = await BuildMuxAsync(plcOptions, new ConnectionOptions(), plcContext);
    var (socket, upstreamPipe, proxyListener, _) = await ConnectClientAsync(multiplexer, plcOptions.Name);
    try
    {
        // Complete one ordinary round-trip first so the backend connection is up and the
        // allocator check is the only thing left that can reject the next request.
        await socket.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(socket, ct);

        // The warm-up request has released its slot by now, so out of the 65536 TxIds at
        // least 65535 should still be free to take.
        int slotsTaken = DrainAllocator(multiplexer);
        slotsTaken.ShouldBeGreaterThanOrEqualTo(65_535);

        // FC06 is used because it does not coalesce, which exercises the simpler
        // saturation branch directly.
        const ushort clientTxId = 0xBEEF;
        var writeRequest = BuildFc06WriteFrame(clientTxId, addr: 100, value: 0x1234);
        await socket.SendAsync(writeRequest, SocketFlags.None);
        var reply = await ReadOneFrameAsync(socket, ct);

        // Expected: original TxId echoed, exception bit set on FC06, exception code 04
        // (SLAVE_DEVICE_FAILURE).
        int echoedTxId = (reply[0] << 8) | reply[1];
        ((ushort)echoedTxId).ShouldBe(clientTxId, "saturation exception must echo the original client TxId");
        byte functionByte = reply[7];
        (functionByte & 0x80).ShouldBe(0x80, "FC must have the exception bit set");
        (functionByte & 0x7F).ShouldBe(0x06, "original FC must be FC06");
        reply[8].ShouldBe((byte)0x04, "exception code must be 0x04 (Slave Device Failure)");
    }
    finally
    {
        socket.Dispose();
        await upstreamPipe.DisposeAsync();
        proxyListener.Stop();
    }
}
/// <summary>
/// Under TxId saturation, two concurrent identical FC03 reads must BOTH receive
/// exception 04 (one as the leader directly, the other either via a coalesced
/// fan-out from the saturation cleanup OR via its own independent saturation path —
/// either timing produces the same observable contract). Validates that no pipe
/// hangs forever waiting for a backend response that would never arrive.
/// </summary>
[Fact]
public async Task TxIdAllocator_Saturated_TwoConcurrentIdenticalReads_BothPipesGetException04()
{
    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    // Bring backend up via a primer request from a throwaway pipe. The primer's resources
    // are released in a finally so a failed primer round-trip does not leak them.
    var (primer, primerPipe, primerListener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        await primer.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(primer, TestContext.Current.CancellationToken);
    }
    finally
    {
        primer.Dispose();
        await primerPipe.DisposeAsync();
        primerListener.Stop();
    }

    // Saturate after primer has released its slot.
    DrainAllocator(mux);

    // Two pipes, each sends the same FC03 (unitId=1, fc=03, start=200, qty=1).
    var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
    var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        const ushort txA = 0xAAA1;
        const ushort txB = 0xBBB1;

        // Fire concurrently; both target the same coalescing key.
        await Task.WhenAll(
            cA.SendAsync(BuildFc03ReadFrame(txA, 200, 1), SocketFlags.None),
            cB.SendAsync(BuildFc03ReadFrame(txB, 200, 1), SocketFlags.None));
        var rspA = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken);
        var rspB = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);

        // Both must be exception 04 with the original TxId echoed — the contract
        // is "no late attacher hangs."
        foreach (var (rsp, expectedTxId, label) in new[] {
            (rspA, txA, "A"), (rspB, txB, "B") })
        {
            ushort echo = (ushort)((rsp[0] << 8) | rsp[1]);
            echo.ShouldBe(expectedTxId, $"{label}: original TxId must be echoed");
            byte fc = rsp[7];
            (fc & 0x80).ShouldBe(0x80, $"{label}: exception bit must be set");
            rsp[8].ShouldBe((byte)0x04, $"{label}: exception code must be 0x04");
        }
    }
    finally
    {
        cA.Dispose(); cB.Dispose();
        await pA.DisposeAsync(); await pB.DisposeAsync();
        lA.Stop(); lB.Stop();
    }
}
/// <summary>
/// Backend-reader head-of-line block guard. One upstream pipe is wedged by the test
/// holding its socket-receive side without reading. The fan-out is routed through
/// <c>TrySendResponse</c> so the per-PLC backend reader cannot be stalled by a
/// wedged pipe; responses to a healthy peer must keep flowing and the wedged pipe's
/// <c>ResponseDropForFullUpstream</c> counter must increment.
/// </summary>
/// <remarks>
/// A synchronous <c>await SendResponseAsync</c> inside the reader would block
/// on the wedged pipe's full bounded channel and starve every peer.
/// </remarks>
[Fact]
public async Task SlowUpstream_DoesNotStallPeerResponses_DropCounterIncrements()
{
int backendPort = PickFreePort();
await using var backend = new StubBackend(backendPort);
var ctx = MakeContext("PLC1");
var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);
// Wedged client A — connects but NEVER reads from its socket. The mux's per-pipe
// response channel (cap 16) plus the kernel send buffer eventually fill, then
// TrySendResponse returns false and the drop counter increments.
var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
// Healthy client B — will read responses normally.
var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
try
{
// Pump A with many requests but DON'T read responses from A's socket. The
// exact count to fill the channel + kernel buffer is environment-dependent;
// a few hundred 12-byte responses is enough on Windows loopback.
const int wedgePumpCount = 500;
for (ushort i = 1; i <= wedgePumpCount; i++)
{
await cA.SendAsync(BuildFc03ReadFrame(i, 0, 1), SocketFlags.None);
}
// No explicit wait is inserted here: B's request goes out immediately while A's
// responses are still fanning back and its channel is filling.
// We don't synchronously assert the drop counter yet — we want to first prove
// B is still being served before doing the drop assertion, otherwise a slow
// CI machine might fail the timing on B before the drops register.
const ushort txB = 0xB001;
await cB.SendAsync(BuildFc03ReadFrame(txB, 0, 1), SocketFlags.None);
// B's response must arrive within the 3-second bound below even with A wedged. If
// the non-blocking enqueue path were missing, the reader would be blocked on
// A's channel and B would time out.
var rspB = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken)
.WaitAsync(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken);
ushort echoB = (ushort)((rspB[0] << 8) | rspB[1]);
echoB.ShouldBe(txB, "B's TxId must be echoed; reader must not be stalled by A");
// Now poll for A's drop counter to register (up to 50 polls, 50 ms apart). The drops
// happen as the reader tries to fan responses to A and the channel/socket is full;
// a single drop is enough to satisfy the assertion.
long drops = 0;
for (int i = 0; i < 50; i++)
{
drops = ctx.Counters.Snapshot().ResponseDropForFullUpstream;
if (drops > 0) break;
await Task.Delay(50, TestContext.Current.CancellationToken);
}
drops.ShouldBeGreaterThan(0,
"ResponseDropForFullUpstream must increment when A's bounded response channel fills");
}
finally
{
cA.Dispose(); cB.Dispose();
await pA.DisposeAsync(); await pB.DisposeAsync();
lA.Stop(); lB.Stop();
}
}
/// <summary>
/// Watchdog↔response race. The design uses claim-then-dispatch:
/// <c>CorrelationMap.TryRemove</c> is the single source of truth, so exactly ONE
/// of (response delivered, watchdog timeout) wins for any given proxy TxId. This
/// test exercises the race window directly: a stub backend that responds at almost
/// exactly the request-timeout deadline. Across many iterations the test drives
/// both branches and verifies the contract:
/// <list type="bullet">
/// <item>Exactly one response per request (no double-delivery).</item>
/// <item>Every response carries the original client TxId.</item>
/// <item>No request hangs.</item>
/// </list>
/// </summary>
[Fact]
public async Task WatchdogVsResponse_Race_AlwaysExactlyOneOutcome_PerRequest()
{
    // One token threaded through every awaited socket call so a cancelled test run
    // stops promptly.
    var ct = TestContext.Current.CancellationToken;

    // Backend that replies after a delay we can wedge to fall close to the watchdog
    // deadline. Watchdog tick is max(100, RequestTimeoutMs/4); with RequestTimeoutMs
    // = 400 the tick is 100 ms. Configure the backend to delay 350-450 ms for each
    // request so some land before, some after the timeout.
    int backendPort = PickFreePort();

    // Deterministic alternation rather than seeded Random. Random with a fixed seed is
    // not stable across .NET major versions (Microsoft has changed the implementation,
    // e.g. legacy → Xoshiro128 in .NET 6), so a runtime upgrade could land all samples
    // on one side of the watchdog deadline and break the "both branches must fire"
    // assertion below. Counter-based alternation yields 15 fast (350 ms) and 15 slow
    // (450 ms) responses across 30 iterations, regardless of runtime.
    // NOTE(review): whether every 450 ms reply actually loses depends on where the
    // watchdog tick falls relative to the deadline — confirm against the watchdog
    // implementation; the assertions below only require at least one of each outcome.
    int reqCount = 0;

    // Declared directly as the `await using` local. Routing it through a second
    // `await using var _ = ...` would introduce a real local named `_` (not a discard).
    await using var slowBackend = new SlowResponseBackend(backendPort, () =>
    {
        int n = Interlocked.Increment(ref reqCount);
        return (n & 1) == 1 ? 350 : 450;
    });

    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    var connOpts = new ConnectionOptions { BackendRequestTimeoutMs = 400 };
    await using var mux = await BuildMuxAsync(plc, connOpts, ctx);
    var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        const int iterations = 30;
        int normalResponses = 0;
        int timeoutResponses = 0;
        for (ushort i = 1; i <= iterations; i++)
        {
            await client.SendAsync(BuildFc03ReadFrame(i, 0, 1), SocketFlags.None, ct);
            var rsp = await ReadOneFrameAsync(client, ct).WaitAsync(TimeSpan.FromSeconds(3), ct);

            // Contract 1: original TxId echoed regardless of which branch won. A stale
            // frame left over from a previous iteration would surface here as a
            // mismatched TxId.
            ushort echo = (ushort)((rsp[0] << 8) | rsp[1]);
            echo.ShouldBe(i, $"iteration {i}: TxId must be echoed");

            // Branch detection: high bit of the function code marks an exception frame.
            byte fc = rsp[7];
            if ((fc & 0x80) != 0)
            {
                // Watchdog branch: exception 0x0B (Gateway Target Failed To Respond).
                rsp[8].ShouldBe((byte)0x0B, $"iteration {i}: watchdog must use exception code 0x0B");
                timeoutResponses++;
            }
            else
            {
                // Response branch: normal FC03 response.
                normalResponses++;
            }
        }

        // Contract 2: no request hung — every iteration produced exactly one response.
        (normalResponses + timeoutResponses).ShouldBe(iterations);

        // The race should produce a mix; 30 iterations spanning 350-450 ms with a
        // 400 ms deadline should yield both branches in any healthy run. We assert
        // both outcomes were hit at least once to prove the race window is real and
        // both branches are exercised.
        normalResponses.ShouldBeGreaterThan(0, "at least one request must beat the watchdog");
        timeoutResponses.ShouldBeGreaterThan(0, "at least one request must lose to the watchdog");
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
/// <summary>
/// Cascade racing with new accepts. Stress-test: while the backend is repeatedly
/// killed and resurrected (forcing repeated cascade cycles), new upstream clients
/// connect and disconnect concurrently. The contract verified is the
/// no-crash-under-churn property: the multiplexer must survive arbitrary interleavings
/// of teardown and new-pipe-attach without throwing into the host or leaking sockets.
/// </summary>
/// <remarks>
/// The race window — a new pipe added between
/// <c>_pipes.Values.ToArray()</c> and <c>_pipes.Clear()</c> in <c>TearDownBackendAsync</c>
/// — leaves the new pipe alive but orphaned from <c>_pipes</c>. Its read loop will
/// receive normal traffic until the next cascade or its socket closes. This test
/// doesn't try to reproduce that exact ordering (which is microseconds wide); it
/// instead proves that arbitrary interleavings of cascade + accept under load do not
/// cause an observable failure (exception, hang, or counter inconsistency).
/// </remarks>
[Fact(Timeout = 15_000)]
public async Task CascadeVsNewAccept_StressChurn_NoCrash_NoHang()
{
    var ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    // Backend that's brought up and torn down repeatedly to force cascade cycles.
    StubBackend? backend = new(backendPort);
    const int cascadeCycles = 3;
    const int connectsPerCycle = 8;
    try
    {
        for (int cycle = 0; cycle < cascadeCycles; cycle++)
        {
            // Spawn many concurrent connect+request tasks.
            // The lambda parameter is deliberately NOT named `_`: a single lambda
            // parameter called `_` is a real int parameter, which would make the `_`
            // in the deconstruction below an assignment target instead of a discard.
            var tasks = Enumerable.Range(0, connectsPerCycle).Select(async clientIndex =>
            {
                Socket? client = null;
                UpstreamPipe? pipe = null;
                TcpListener? listener = null;
                try
                {
                    (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
                    await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None, ct);

                    // Read with a short deadline; cascades will close us mid-flight.
                    try
                    {
                        await ReadOneFrameAsync(client, ct)
                            .WaitAsync(TimeSpan.FromMilliseconds(500), ct);
                    }
                    catch
                    {
                        // Expected when our pipe is cascaded mid-request — no test failure.
                    }
                }
                finally
                {
                    // Best-effort cleanup: each resource is released independently so one
                    // failing dispose cannot leak the others.
                    try { client?.Dispose(); } catch { }
                    try { if (pipe is not null) await pipe.DisposeAsync(); } catch { }
                    try { listener?.Stop(); } catch { }
                }
            }).ToArray();

            // Wait for the connect storm to be in flight, then kill the backend mid-storm.
            await Task.Delay(40, ct);
            await backend.DisposeAsync();

            // Drain the connect tasks.
            await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(5), ct);

            // Bring backend back up for the next cycle.
            backend = new StubBackend(backendPort);
            await Task.Delay(50, ct);
        }

        // Survival check — the multiplexer must still service a normal request after
        // all the cascade churn.
        var (cFinal, pFinal, lFinal, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await cFinal.SendAsync(BuildFc03ReadFrame(0xF1F1, 0, 1), SocketFlags.None, ct);
            var rsp = await ReadOneFrameAsync(cFinal, ct).WaitAsync(TimeSpan.FromSeconds(3), ct);
            ushort echo = (ushort)((rsp[0] << 8) | rsp[1]);
            echo.ShouldBe((ushort)0xF1F1, "multiplexer must still serve traffic after cascade churn");
        }
        finally
        {
            cFinal.Dispose();
            await pFinal.DisposeAsync();
            lFinal.Stop();
        }
    }
    finally
    {
        // May be a second dispose of an already-disposed backend if a cycle failed
        // between kill and resurrect; swallowed on purpose.
        try { await backend.DisposeAsync(); } catch { }
    }
}
/// <summary>
/// Backend stub that delays each response by a caller-supplied amount. Used by the
/// watchdog-vs-response race test.
/// </summary>
/// <remarks>
/// Listens on loopback at the given port; the accept loop is started from the
/// constructor and runs until <see cref="DisposeAsync"/> cancels it. Each accepted
/// connection gets its own handler task. Replies are sent from detached tasks, so
/// several delayed replies can be pending on one connection at the same time.
/// </remarks>
private sealed class SlowResponseBackend : IAsyncDisposable
{
// Loopback listener the stub accepts connections on.
private readonly TcpListener _listener;
// Cancelled in DisposeAsync; observed by the accept loop, the per-connection
// handlers and the delayed response tasks.
private readonly CancellationTokenSource _cts = new();
// Invoked once per received request; the returned value is used as the reply delay
// in milliseconds.
private readonly Func _delayMsFactory;
// Per-connection handler tasks. Guarded by lock(_clientTasks) because the accept
// loop appends while DisposeAsync snapshots.
private readonly List _clientTasks = new();
/// <summary>
/// Binds to loopback on <paramref name="port"/>, starts listening and kicks off the
/// accept loop (fire-and-forget).
/// </summary>
/// <param name="port">TCP port to listen on.</param>
/// <param name="delayMsFactory">Called once per request to obtain that reply's delay in milliseconds.</param>
public SlowResponseBackend(int port, Func delayMsFactory)
{
_delayMsFactory = delayMsFactory;
_listener = new TcpListener(IPAddress.Loopback, port);
_listener.Start();
// Not awaited: the loop runs in the background and swallows its own exceptions.
_ = AcceptLoop();
}
/// <summary>
/// Accepts connections until cancellation and starts one <see cref="HandleAsync"/>
/// task per accepted socket, recording it so <see cref="DisposeAsync"/> can wait for it.
/// </summary>
private async Task AcceptLoop()
{
try
{
while (!_cts.IsCancellationRequested)
{
var s = await _listener.AcceptSocketAsync(_cts.Token);
var task = Task.Run(() => HandleAsync(s));
lock (_clientTasks) _clientTasks.Add(task);
}
}
// Any failure (cancellation, listener stopped) simply ends the loop.
catch { /* shutdown */ }
}
/// <summary>
/// Per-connection loop: reads one request frame at a time and schedules a delayed
/// reply for each. The socket is disposed when the loop ends for any reason.
/// </summary>
/// <param name="s">The accepted connection; owned (and disposed) by this method.</param>
private async Task HandleAsync(Socket s)
{
try
{
while (!_cts.IsCancellationRequested)
{
var req = await ReadOneFrameAsync(s, _cts.Token);
// Frames shorter than 8 bytes end this connection's loop.
if (req.Length < 8) break;
// Bytes 0-1 are the big-endian transaction id, byte 6 the unit id; both are
// passed through to the reply builder.
ushort txId = (ushort)((req[0] << 8) | req[1]);
byte unitId = req[6];
// Delay is sampled here, in request-arrival order, before the reply is scheduled.
int delayMs = _delayMsFactory();
// Schedule the response after delay; don't block this iteration so
// pipelined requests can race within one client connection.
// The reply task is discarded (not added to _clientTasks), so DisposeAsync
// does not wait for it; it ends on cancellation or on a send failure.
_ = Task.Run(async () =>
{
try
{
await Task.Delay(delayMs, _cts.Token);
// NOTE(review): BuildFc03Response is defined outside this chunk; 0x0001 is
// presumably the register value carried in the reply — confirm.
byte[] rsp = BuildFc03Response(txId, unitId, 0x0001);
await s.SendAsync(rsp, SocketFlags.None, _cts.Token);
}
// Covers cancellation and sends on a socket the handler has already disposed.
catch { /* shutdown / pipe down */ }
});
}
}
// Read failures (peer closed, cancellation) end the handler quietly.
catch { /* normal */ }
finally { try { s.Dispose(); } catch { } }
}
/// <summary>
/// Shuts the stub down: cancels all loops, stops the listener, waits up to two
/// seconds for the per-connection handlers, then disposes the cancellation source.
/// </summary>
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
try { _listener.Stop(); } catch { }
// Snapshot under the lock so a late Add from the accept loop cannot race the copy.
Task[] snap;
lock (_clientTasks) snap = _clientTasks.ToArray();
// Bounded wait; a timeout or handler fault is ignored so disposal never throws.
try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { }
_cts.Dispose();
}
}
}