7a435957ee
Closes the new findings from the post-remediation re-review (codereviews/2026-05-14/ReReviewAfterRemediation.md): NC1 — ProxyWorker.StopAsync drain loop is structurally always-zero Wave 1's W1.5 inherited the original ShutdownCoordinator bug it was meant to replace. Supervisor.StopAsync nulls the per-mux counter provider before the drain loop runs, so CountInFlight always returns 0 and the drain budget is never spent on actual draining. Fix: snapshot the in-flight count BEFORE supervisor stop, drop the theatrical post-stop loop, and report InFlightAtCancel as the snapshot count (= the number of in-flight requests dropped by the stop). The supervisor stop IS the drain — there is nothing to drain that wouldn't be killed by the stop itself. NM1 — TearDownBackendAsync._connectGate.WaitAsync uncancellable Without a token, a long Polly-wrapped EnsureBackendConnectedAsync against an unreachable host could hold the gate for the full BackendConnectTimeoutMs * MaxAttempts window, blocking DisposeAsync (and therefore ProxyWorker.StopAsync) for that duration. Fix: bound the wait with a 2 s teardown deadline; on timeout proceed best-effort without the gate. Worst-case consequence is one orphaned in-flight cycle on the dying backend, surfaced to upstream as exception 0x0B by the watchdog. NM2 — ReplaceContext non-atomic ctx + provider swap Snapshot path reads `_cacheStatsProvider` independently of `_ctx`. If `_ctx` was swapped first, a snapshot taken in the gap would still hold the OLD adapter wrapping the OLD cache — which the supervisor disposes immediately after we return. Fix: set the provider FIRST, then swap `_ctx`. Snapshots in the swap window now read either (old, old) or (new, new), never (old-after-disposed). NM5 — Self-cascade ObjectDisposedException after dispose Writer/reader fault catches fired `_ = TearDownBackendAsync(...)` unconditionally. 
After DisposeAsync runs `_connectGate.Dispose()`, the fire-and-forget TearDown threw ObjectDisposedException on WaitAsync as an unobserved Task exception. Fix: skip self-cascade when `_disposeCts.IsCancellationRequested` — DisposeAsync runs an explicit TearDown anyway. Nm1 — Saturation cleanup uses await SendResponseAsync W1.2's per-attacher delivery loop awaited the blocking SendResponseAsync, which would serialise on a wedged late-attacher's full bounded channel and stall delivery to its peers — contradicting the W1.3 doctrine that the fan-out path must never await per-pipe writes. Fix: use TrySendResponse and increment ResponseDropForFullUpstream on drop. T2 — WatchdogVsResponse_Race seeded Random fragility Used `new Random(12345)` over [350, 450) ms with watchdog at 400 ms; Random's algorithm is implementation-defined across .NET major versions (legacy → Xoshiro128 in .NET 6) so a runtime upgrade could land all samples on one side of the deadline and break the "both branches must fire" assertion. Fix: deterministic counter-based alternation (15 fast + 15 slow across 30 iterations) — guaranteed by construction. Latent items NM3 (_supervisorCts leak on re-Start) and NM4 (TCS single-shot semantics) are unfixed: no caller actually re-Starts a supervisor today; both become real only if the reconciler ever changes to re-Start instead of dispose-and-rebuild. Documented in the re-review. Tests: 387 pass / 0 fail. Three back-to-back race-test runs in isolation all green (T2 alternation is deterministic). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1240 lines
54 KiB
C#
1240 lines
54 KiB
C#
using System.Collections.Concurrent;
|
|
using System.Collections.Frozen;
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
using Mbproxy.Bcd;
|
|
using Mbproxy.Options;
|
|
using Mbproxy.Proxy;
|
|
using Mbproxy.Proxy.Cache;
|
|
using Mbproxy.Proxy.Multiplexing;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Shouldly;
|
|
using Xunit;
|
|
|
|
namespace Mbproxy.Tests.Proxy.Multiplexing;
|
|
|
|
/// <summary>
|
|
/// Integration tests for <see cref="PlcMultiplexer"/> against a stub backend
|
|
/// (a <see cref="TcpListener"/> that canned-responds). Uses real sockets but no simulator.
|
|
/// </summary>
|
|
[Trait("Category", "Unit")]
|
|
public sealed class PlcMultiplexerTests
|
|
{
|
|
// ── Helpers ────────────────────────────────────────────────────────────────
|
|
|
|
/// <summary>
/// Obtains a loopback TCP port number from the OS by binding a throwaway listener to
/// port 0, reading back the port that was assigned, and stopping the listener again.
/// </summary>
/// <returns>The port number that was assigned to the temporary listener.</returns>
private static int PickFreePort()
{
    TcpListener probe = new(IPAddress.Loopback, 0);
    probe.Start();
    var boundEndpoint = (IPEndPoint)probe.LocalEndpoint;
    int assignedPort = boundEndpoint.Port;
    probe.Stop();
    return assignedPort;
}
|
|
|
|
/// <summary>
/// Receives from <paramref name="socket"/> until exactly <paramref name="count"/> bytes
/// have been collected, looping over partial reads.
/// </summary>
/// <param name="socket">Socket to receive from.</param>
/// <param name="count">Number of bytes to collect.</param>
/// <param name="ct">Token handed to every receive call.</param>
/// <returns>A new array holding the <paramref name="count"/> bytes that were read.</returns>
/// <exception cref="IOException">
/// A receive returned 0 bytes before <paramref name="count"/> bytes had arrived.
/// </exception>
private static async Task<byte[]> ReadExactAsync(Socket socket, int count, CancellationToken ct)
{
    byte[] result = new byte[count];

    for (int filled = 0; filled < count;)
    {
        Memory<byte> remaining = result.AsMemory(filled, count - filled);
        int received = await socket.ReceiveAsync(remaining, SocketFlags.None, ct);
        if (received == 0)
        {
            throw new IOException("EOF");
        }

        filled += received;
    }

    return result;
}
|
|
|
|
/// <summary>
/// Reads one Modbus/TCP frame from <paramref name="socket"/>: the 7-byte header followed
/// by as many body bytes as the header's length field announces.
/// </summary>
/// <param name="socket">Socket to read the frame from.</param>
/// <param name="ct">Token handed to the underlying reads.</param>
/// <returns>The complete frame (7 header bytes plus body) as a new array.</returns>
/// <exception cref="IOException">
/// The stream ended mid-frame, or the header's length field is 0 (a frame must at least
/// account for the unit-id byte that is already part of the 7-byte header).
/// </exception>
private static async Task<byte[]> ReadOneFrameAsync(Socket socket, CancellationToken ct)
{
    const int HeaderLength = 7;

    byte[] header = await ReadExactAsync(socket, HeaderLength, ct);

    // Bytes 4-5 hold the big-endian length field. It counts the unit id (already read as
    // header byte 6) plus the body, hence the "- 1" below.
    int declaredLength = (header[4] << 8) | header[5];
    if (declaredLength < 1)
    {
        // Previously this produced a negative body length, an undersized frame buffer and
        // an unrelated ArgumentException from the copy. Fail with a descriptive error instead.
        throw new IOException($"Malformed MBAP header: length field is {declaredLength}, expected at least 1.");
    }

    int bodyLength = declaredLength - 1;
    if (bodyLength == 0)
    {
        // Header-only frame: the freshly read header array already is the whole frame.
        return header;
    }

    byte[] body = await ReadExactAsync(socket, bodyLength, ct);

    byte[] frame = new byte[HeaderLength + bodyLength];
    header.CopyTo(frame, 0);
    body.CopyTo(frame, HeaderLength);
    return frame;
}
|
|
|
|
/// <summary>
/// Builds a 12-byte Modbus/TCP request frame with function code 0x03.
/// </summary>
/// <param name="txId">Transaction id, written big-endian into bytes 0-1.</param>
/// <param name="start">Start address, written big-endian into bytes 8-9.</param>
/// <param name="qty">Register quantity, written big-endian into bytes 10-11.</param>
/// <param name="unitId">Unit id written into byte 6 (defaults to 1).</param>
/// <returns>The assembled frame.</returns>
private static byte[] BuildFc03ReadFrame(ushort txId, ushort start, ushort qty, byte unitId = 1)
{
    var frame = new byte[12];

    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    // frame[2..3] stay 0x0000.
    frame[4] = 0x00;
    frame[5] = 0x06; // length field: 6 bytes follow (unit id + 5 PDU bytes)
    frame[6] = unitId;
    frame[7] = 0x03;
    frame[8] = (byte)(start >> 8);
    frame[9] = (byte)(start & 0xFF);
    frame[10] = (byte)(qty >> 8);
    frame[11] = (byte)(qty & 0xFF);

    return frame;
}
|
|
|
|
/// <summary>
/// Builds a 12-byte Modbus/TCP request frame with function code 0x06.
/// </summary>
/// <param name="txId">Transaction id, written big-endian into bytes 0-1.</param>
/// <param name="addr">Register address, written big-endian into bytes 8-9.</param>
/// <param name="value">Register value, written big-endian into bytes 10-11.</param>
/// <param name="unitId">Unit id written into byte 6 (defaults to 1).</param>
/// <returns>The assembled frame.</returns>
private static byte[] BuildFc06WriteFrame(ushort txId, ushort addr, ushort value, byte unitId = 1)
{
    byte txHigh = (byte)(txId >> 8);
    byte txLow = (byte)(txId & 0xFF);
    byte addrHigh = (byte)(addr >> 8);
    byte addrLow = (byte)(addr & 0xFF);
    byte valueHigh = (byte)(value >> 8);
    byte valueLow = (byte)(value & 0xFF);

    return new byte[]
    {
        txHigh, txLow,
        0x00, 0x00,
        0x00, 0x06, // length field: 6 bytes follow (unit id + 5 PDU bytes)
        unitId,
        0x06,
        addrHigh, addrLow,
        valueHigh, valueLow,
    };
}
|
|
|
|
/// <summary>
/// Builds a Modbus/TCP response frame with function code 0x03 carrying the given
/// register values, each written big-endian.
/// </summary>
/// <param name="txId">Transaction id, written big-endian into bytes 0-1.</param>
/// <param name="unitId">Unit id written into byte 6.</param>
/// <param name="registers">Register values placed after the byte-count byte.</param>
/// <returns>A frame of 9 + 2 * <c>registers.Length</c> bytes.</returns>
private static byte[] BuildFc03Response(ushort txId, byte unitId, params ushort[] registers)
{
    int dataBytes = registers.Length * 2;
    int pduLength = 2 + dataBytes;                 // FC + byteCount + register data
    ushort lengthField = (ushort)(1 + pduLength);  // UnitId + PDU

    var frame = new byte[7 + pduLength];
    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    frame[2] = 0;
    frame[3] = 0;
    frame[4] = (byte)(lengthField >> 8);
    frame[5] = (byte)(lengthField & 0xFF);
    frame[6] = unitId;
    frame[7] = 0x03;
    frame[8] = (byte)dataBytes;

    int cursor = 9;
    foreach (ushort register in registers)
    {
        frame[cursor++] = (byte)(register >> 8);
        frame[cursor++] = (byte)(register & 0xFF);
    }

    return frame;
}
|
|
|
|
/// <summary>
/// Builds the 12-byte FC06 response frame, which carries
/// <paramref name="addr"/> and <paramref name="value"/> big-endian after the function code.
/// </summary>
/// <param name="txId">Transaction id, written big-endian into bytes 0-1.</param>
/// <param name="unitId">Unit id written into byte 6.</param>
/// <param name="addr">Register address, bytes 8-9.</param>
/// <param name="value">Register value, bytes 10-11.</param>
/// <returns>The assembled frame.</returns>
private static byte[] BuildFc06Response(ushort txId, byte unitId, ushort addr, ushort value) =>
[
    (byte)(txId >> 8), (byte)(txId & 0xFF),
    0, 0,
    0, 6, // length: UnitId(1) + FC(1) + Addr(2) + Value(2)
    unitId,
    0x06,
    (byte)(addr >> 8), (byte)(addr & 0xFF),
    (byte)(value >> 8), (byte)(value & 0xFF),
];
|
|
|
|
/// <summary>
/// Creates a <see cref="PerPlcContext"/> for tests with fresh counters and a null logger.
/// The tag map is built from <paramref name="tags"/> keyed by address, or is
/// <see cref="BcdTagMap.Empty"/> when no tags are supplied.
/// </summary>
/// <param name="name">Value for <c>PlcName</c>.</param>
/// <param name="tags">Tags to register in the tag map; may be empty.</param>
/// <returns>The populated context.</returns>
private static PerPlcContext MakeContext(string name, params BcdTag[] tags)
{
    var tagsByAddress = tags.ToDictionary(tag => tag.Address).ToFrozenDictionary();
    var tagMap = tagsByAddress.Count == 0 ? BcdTagMap.Empty : new BcdTagMap(tagsByAddress);

    var context = new PerPlcContext
    {
        PlcName = name,
        TagMap = tagMap,
        Counters = new ProxyCounters(),
        Logger = NullLogger.Instance,
    };
    return context;
}
|
|
|
|
/// <summary>
/// In-process stub backend for tests. It listens on a loopback port, reads request frames,
/// records the TxId of every frame it sees (so tests can verify the multiplexer rewrites
/// them) and answers either through <see cref="FcResponseFactory"/> or with built-in
/// FC03 / FC06 replies.
/// </summary>
private sealed class StubBackend : IAsyncDisposable
{
    private readonly TcpListener _listener;
    private readonly CancellationTokenSource _cts = new();
    private readonly List<Task> _clientTasks = new();

    /// <summary>Binds to <paramref name="port"/> on loopback and starts accepting connections.</summary>
    public StubBackend(int port)
    {
        Port = port;
        _listener = new TcpListener(IPAddress.Loopback, port);
        _listener.Start();
        _ = AcceptLoop();
    }

    /// <summary>The port passed to the constructor.</summary>
    public int Port { get; }

    /// <summary>TxIds of all request frames received so far, in arrival order per connection.</summary>
    public ConcurrentQueue<ushort> SeenProxyTxIds { get; } = new();

    /// <summary>
    /// Optional override producing the reply bytes. Arguments are the function code, the
    /// two 16-bit fields following it (0 when the frame is too short to contain them) and
    /// the TxId seen on the wire. When null the built-in FC03 / FC06 replies are used.
    /// </summary>
    public Func<byte, ushort, ushort, ushort, byte[]>? FcResponseFactory { get; set; }

    /// <summary>Accepts connections until cancelled; each one is served on its own task.</summary>
    private async Task AcceptLoop()
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                Socket accepted = await _listener.AcceptSocketAsync(_cts.Token);
                Task handler = Task.Run(() => HandleAsync(accepted));
                lock (_clientTasks)
                {
                    _clientTasks.Add(handler);
                }
            }
        }
        catch
        {
            /* shutdown */
        }
    }

    /// <summary>
    /// Serves one connection: read a frame, record its TxId, send the reply. Stops on a
    /// frame shorter than 8 bytes, on an unsupported function code, or on any exception.
    /// </summary>
    private async Task HandleAsync(Socket s)
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                byte[] request = await ReadOneFrameAsync(s, _cts.Token);
                if (request.Length < 8)
                {
                    break;
                }

                ushort wireTxId = ReadUInt16(request, 0);
                SeenProxyTxIds.Enqueue(wireTxId);

                if (!TryBuildReply(request, wireTxId, out byte[] reply))
                {
                    break;
                }

                await s.SendAsync(reply, SocketFlags.None, _cts.Token);
            }
        }
        catch
        {
            /* normal */
        }
        finally
        {
            try { s.Dispose(); } catch { }
        }
    }

    /// <summary>
    /// Chooses the reply for <paramref name="request"/>. Returns false when there is no
    /// factory and the function code is neither 0x03 nor 0x06.
    /// </summary>
    private bool TryBuildReply(byte[] request, ushort wireTxId, out byte[] reply)
    {
        byte unitId = request[6];
        byte functionCode = request[7];

        if (FcResponseFactory is { } factory)
        {
            ushort start = request.Length >= 10 ? ReadUInt16(request, 8) : (ushort)0;
            ushort qty = request.Length >= 12 ? ReadUInt16(request, 10) : (ushort)0;
            reply = factory(functionCode, start, qty, wireTxId);
            return true;
        }

        switch (functionCode)
        {
            case 0x03:
                // Default: FC03 echo a single register containing 0x1234.
                reply = BuildFc03Response(wireTxId, unitId, 0x1234);
                return true;

            case 0x06:
                ushort addr = ReadUInt16(request, 8);
                ushort value = ReadUInt16(request, 10);
                reply = BuildFc06Response(wireTxId, unitId, addr, value);
                return true;

            default:
                reply = Array.Empty<byte>();
                return false;
        }
    }

    /// <summary>Reads a big-endian 16-bit value at <paramref name="offset"/>.</summary>
    private static ushort ReadUInt16(byte[] buffer, int offset) =>
        (ushort)((buffer[offset] << 8) | buffer[offset + 1]);

    /// <summary>
    /// Cancels the loops, stops the listener, waits up to two seconds for the connection
    /// handlers that were registered so far, then disposes the cancellation source.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await _cts.CancelAsync();
        try { _listener.Stop(); } catch { }

        Task[] pending;
        lock (_clientTasks)
        {
            pending = _clientTasks.ToArray();
        }

        try
        {
            await Task.WhenAll(pending).WaitAsync(TimeSpan.FromSeconds(2));
        }
        catch
        {
        }

        _cts.Dispose();
    }
}
|
|
|
|
/// <summary>
/// Constructs a <see cref="PlcMultiplexer"/> for tests with a new <see cref="BcdPduPipeline"/>,
/// a null logger and no backend-connect pipeline, yields once, and returns it.
/// </summary>
/// <param name="plc">PLC options passed to the multiplexer.</param>
/// <param name="connOpts">Connection options passed to the multiplexer.</param>
/// <param name="ctx">Per-PLC context passed to the multiplexer.</param>
/// <returns>The constructed multiplexer.</returns>
private static async Task<PlcMultiplexer> BuildMuxAsync(
    PlcOptions plc, ConnectionOptions connOpts, PerPlcContext ctx)
{
    var pduPipeline = new BcdPduPipeline();
    var logger = NullLogger<PlcMultiplexer>.Instance;

    PlcMultiplexer multiplexer = new(
        plc,
        connOpts,
        pduPipeline,
        ctx,
        logger,
        backendConnectPipeline: null);

    await Task.Yield();
    return multiplexer;
}
|
|
|
|
/// <summary>
/// Creates a connected loopback socket pair and attaches the accepted side to
/// <paramref name="mux"/> as an <see cref="UpstreamPipe"/>. The test keeps the connecting
/// side (<c>client</c>) to play the role of the upstream Modbus client.
/// </summary>
/// <param name="mux">Multiplexer whose <c>StartPipeAsync</c> is started for the new pipe.</param>
/// <param name="plcName">Name passed to the <see cref="UpstreamPipe"/> constructor.</param>
/// <returns>
/// The client socket, the pipe wrapping the accepted socket, the listener used to create
/// the pair, and the port it listened on. The caller is responsible for releasing all of them.
/// </returns>
private static async Task<(Socket client, UpstreamPipe pipe, TcpListener proxyListener, int proxyPort)>
    ConnectClientAsync(PlcMultiplexer mux, string plcName)
{
    // Bind to port 0 and read the assigned port back from the live listener. Picking a
    // port with one listener, releasing it and re-binding leaves a window in which another
    // process (or a parallel test) can grab the port.
    var proxyListener = new TcpListener(IPAddress.Loopback, 0);
    Socket? client = null;
    Socket? upstream = null;

    try
    {
        proxyListener.Start();
        int proxyPort = ((IPEndPoint)proxyListener.LocalEndpoint).Port;

        client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
        {
            NoDelay = true,
        };
        await client.ConnectAsync(IPAddress.Loopback, proxyPort);

        upstream = await proxyListener.AcceptSocketAsync();
        var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance);

        // Fire-and-forget: the pipe runs until the test disposes it.
        _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));

        return (client, pipe, proxyListener, proxyPort);
    }
    catch
    {
        // Nothing is handed to the caller on failure, so release what was created here.
        upstream?.Dispose();
        client?.Dispose();
        proxyListener.Stop();
        throw;
    }
}
|
|
|
|
// ── Tests ─────────────────────────────────────────────────────────────────
|
|
|
|
/// <summary>
/// Sends one FC03 read for address 100 through the multiplexer to the stub backend and
/// checks that the reply carries the client's own TxId and the decoded register value.
/// </summary>
[Fact]
public async Task SingleUpstream_RoundTripsFC03_Through_Multiplexer()
{
    CancellationToken ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);

    PlcOptions plc = new() { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    PerPlcContext ctx = MakeContext("PLC1", BcdTag.Create(100, 16));
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        byte[] request = BuildFc03ReadFrame(0x1234, 100, 1);
        await client.SendAsync(request, SocketFlags.None);
        byte[] reply = await ReadOneFrameAsync(client, ct);

        ushort replyTxId = (ushort)((reply[0] << 8) | reply[1]);
        replyTxId.ShouldBe((ushort)0x1234, "the original TxId must be restored on the way back to the client");

        // BCD decode of the stub's 0x1234 response = 1234.
        ushort registerValue = (ushort)((reply[9] << 8) | reply[10]);
        registerValue.ShouldBe((ushort)1234);
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
|
|
|
|
/// <summary>
/// Sends one FC06 write of binary 1234 to tagged address 200 through the multiplexer and
/// checks that the echoed reply carries the client's TxId and the binary value again.
/// </summary>
[Fact]
public async Task SingleUpstream_RoundTripsFC06_Through_Multiplexer()
{
    CancellationToken ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);

    PlcOptions plc = new() { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    PerPlcContext ctx = MakeContext("PLC1", BcdTag.Create(200, 16));
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Client writes binary 1234; proxy encodes to BCD 0x1234 on the way out.
        byte[] request = BuildFc06WriteFrame(0xABCD, 200, 1234);
        await client.SendAsync(request, SocketFlags.None);
        byte[] reply = await ReadOneFrameAsync(client, ct);

        ushort replyTxId = (ushort)((reply[0] << 8) | reply[1]);
        replyTxId.ShouldBe((ushort)0xABCD);

        // Echo bytes decoded back to client binary.
        ushort echoedValue = (ushort)((reply[10] << 8) | reply[11]);
        echoedValue.ShouldBe((ushort)1234);
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
|
|
|
|
/// <summary>
/// Two clients send an FC03 read with the same TxId for the same address; each must get a
/// reply that carries that same TxId back.
/// </summary>
[Fact]
public async Task TwoUpstreams_ConcurrentFC03_BothGetCorrectResponses()
{
    CancellationToken ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);
    // Both clients read address 100; both should see their own TxId echoed.
    backend.FcResponseFactory = (fc, _, _, txId) =>
        fc == 0x03
            ? BuildFc03Response(txId, 1, 0x1234)
            : throw new InvalidOperationException("unexpected fc");

    PlcOptions plc = new() { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    PerPlcContext ctx = MakeContext("PLC1", BcdTag.Create(100, 16));
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    var (firstClient, firstPipe, firstListener, _) = await ConnectClientAsync(mux, plc.Name);
    var (secondClient, secondPipe, secondListener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Both clients use the same upstream TxId (0x0001). That would clash on a
        // shared backend wire if the mux didn't rewrite the TxId.
        await firstClient.SendAsync(BuildFc03ReadFrame(0x0001, 100, 1), SocketFlags.None);
        await secondClient.SendAsync(BuildFc03ReadFrame(0x0001, 100, 1), SocketFlags.None);

        byte[] firstReply = await ReadOneFrameAsync(firstClient, ct);
        byte[] secondReply = await ReadOneFrameAsync(secondClient, ct);

        // Both responses must carry the original (colliding) TxId.
        ushort firstTxId = (ushort)((firstReply[0] << 8) | firstReply[1]);
        ushort secondTxId = (ushort)((secondReply[0] << 8) | secondReply[1]);
        firstTxId.ShouldBe((ushort)0x0001);
        secondTxId.ShouldBe((ushort)0x0001);
    }
    finally
    {
        firstClient.Dispose();
        secondClient.Dispose();
        await firstPipe.DisposeAsync();
        await secondPipe.DisposeAsync();
        firstListener.Stop();
        secondListener.Stop();
    }
}
|
|
|
|
/// <summary>
/// Two clients send reads with the same TxId to different addresses; the stub backend
/// must have seen at least two distinct TxIds on its side of the multiplexer.
/// </summary>
[Fact]
public async Task TwoUpstreams_ProxyTxIds_AreDistinct_OnTheWire()
{
    CancellationToken ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);

    PlcOptions plc = new() { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    PerPlcContext ctx = MakeContext("PLC1");
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    var (firstClient, firstPipe, firstListener, _) = await ConnectClientAsync(mux, plc.Name);
    var (secondClient, secondPipe, secondListener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Both clients use the same upstream TxId 0x0007 — the proxy must hand out
        // distinct proxy TxIds on the backend wire. Phase 10: reads target DIFFERENT
        // addresses so coalescing does not fuse them into a single backend request.
        await firstClient.SendAsync(BuildFc03ReadFrame(0x0007, 0, 1), SocketFlags.None);
        await secondClient.SendAsync(BuildFc03ReadFrame(0x0007, 10, 1), SocketFlags.None);

        _ = await ReadOneFrameAsync(firstClient, ct);
        _ = await ReadOneFrameAsync(secondClient, ct);

        // Collect what the backend saw.
        int distinctWireTxIds = backend.SeenProxyTxIds.Distinct().Count();
        distinctWireTxIds.ShouldBeGreaterThanOrEqualTo(2, "the multiplexer must allocate distinct proxy TxIds even when upstreams collide");
    }
    finally
    {
        firstClient.Dispose();
        secondClient.Dispose();
        await firstPipe.DisposeAsync();
        await secondPipe.DisposeAsync();
        firstListener.Stop();
        secondListener.Stop();
    }
}
|
|
|
|
/// <summary>
/// Closes one of two attached clients and checks that the remaining client can still
/// complete a read round-trip with its own TxId.
/// </summary>
[Fact]
public async Task UpstreamDisconnect_DoesNotAffectOtherUpstreams()
{
    CancellationToken ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);

    PlcOptions plc = new() { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    PerPlcContext ctx = MakeContext("PLC1");
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    var (droppedClient, droppedPipe, droppedListener, _) = await ConnectClientAsync(mux, plc.Name);
    var (survivor, survivorPipe, survivorListener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Drop client A entirely.
        droppedClient.Dispose();
        await Task.Delay(50, ct);

        // Client B should still be able to round-trip.
        await survivor.SendAsync(BuildFc03ReadFrame(0x0042, 0, 1), SocketFlags.None);
        byte[] reply = await ReadOneFrameAsync(survivor, ct);

        ushort replyTxId = (ushort)((reply[0] << 8) | reply[1]);
        replyTxId.ShouldBe((ushort)0x0042);
    }
    finally
    {
        survivor.Dispose();
        await droppedPipe.DisposeAsync();
        await survivorPipe.DisposeAsync();
        droppedListener.Stop();
        survivorListener.Stop();
    }
}
|
|
|
|
/// <summary>
/// Attaches three clients, completes one round-trip on each, then kills the stub backend
/// and checks that all three client sockets close promptly and that the
/// <c>BackendDisconnectCascades</c> counter reaches at least 3.
/// </summary>
[Fact]
public async Task BackendDisconnect_CascadesToAllUpstreams()
{
    int backendPort = PickFreePort();
    var backend = new StubBackend(backendPort);

    // The backend is killed explicitly mid-test. StubBackend.DisposeAsync disposes its
    // CancellationTokenSource, so it must not be invoked a second time; this flag lets the
    // outer finally release the backend only when the test failed before the kill.
    bool backendKillStarted = false;
    try
    {
        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        var (cC, pC, lC, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Force a round-trip on each so backend connect occurs first.
            await cA.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            await cB.SendAsync(BuildFc03ReadFrame(2, 0, 1), SocketFlags.None);
            await cC.SendAsync(BuildFc03ReadFrame(3, 0, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(cC, TestContext.Current.CancellationToken);

            // Kill the backend.
            backendKillStarted = true;
            await backend.DisposeAsync();

            // All three upstream sockets should observe the close; the three waits
            // together must finish in under 2 s (asserted below).
            var sw = System.Diagnostics.Stopwatch.StartNew();
            await WaitForCloseAsync(cA, TestContext.Current.CancellationToken);
            await WaitForCloseAsync(cB, TestContext.Current.CancellationToken);
            await WaitForCloseAsync(cC, TestContext.Current.CancellationToken);
            sw.Stop();
            sw.ElapsedMilliseconds.ShouldBeLessThan(2000, "cascade should propagate quickly");

            // Poll briefly for the cascade counter — there is an inherent scheduling gap
            // between "upstream socket EOF observed" (WaitForCloseAsync returns) and "the
            // multiplexer's TearDownBackendAsync increments the counter after awaiting
            // every pipe.DisposeAsync". This poll absorbs that scheduling jitter without
            // weakening the assertion's semantics — the counter MUST reach 3 (or more)
            // because all three upstream pipes were attached when the cascade fired.
            long cascades = 0;
            for (int i = 0; i < 50; i++)
            {
                cascades = ctx.Counters.Snapshot().BackendDisconnectCascades;
                if (cascades >= 3) break;
                await Task.Delay(20, TestContext.Current.CancellationToken);
            }
            cascades.ShouldBeGreaterThanOrEqualTo(3);
        }
        finally
        {
            cA.Dispose(); cB.Dispose(); cC.Dispose();
            await pA.DisposeAsync(); await pB.DisposeAsync(); await pC.DisposeAsync();
            lA.Stop(); lB.Stop(); lC.Stop();
        }
    }
    finally
    {
        // Failure before the explicit kill: release the listener and accept loop so the
        // backend does not outlive the test. Runs after the multiplexer has been disposed.
        if (!backendKillStarted)
        {
            await backend.DisposeAsync();
        }
    }
}
|
|
|
|
/// <summary>
/// Points the multiplexer at a backend that accepts connections and reads requests but
/// never answers, with <c>BackendRequestTimeoutMs</c> set to 400, and checks that the
/// client receives an FC03 exception reply with code 0x0B carrying its own TxId.
/// </summary>
[Fact]
public async Task RequestTimeoutWatchdog_DeliversException0B_ToUpstream_WhenBackendNeverResponds()
{
    // A drain-only stub that consumes requests but never responds. The multiplexer's
    // per-request watchdog must surface a Modbus exception 0x0B to the upstream client
    // once BackendRequestTimeoutMs elapses, freeing the proxy TxId + correlation entry.
    int backendPort = PickFreePort();
    var drainListener = new TcpListener(IPAddress.Loopback, backendPort);
    drainListener.Start();
    var drainCts = new CancellationTokenSource();
    var drainToken = drainCts.Token;

    // Reads and discards everything one accepted connection sends.
    async Task SwallowAsync(Socket accepted)
    {
        var scratch = new byte[256];
        try
        {
            while (!drainToken.IsCancellationRequested)
            {
                int received = await accepted.ReceiveAsync(scratch, SocketFlags.None, drainToken);
                if (received == 0)
                {
                    break;
                }
            }
        }
        catch
        {
        }
        finally
        {
            try { accepted.Dispose(); } catch { }
        }
    }

    // Accepts connections until cancelled and hands each one to SwallowAsync.
    async Task AcceptAndSwallowAsync()
    {
        try
        {
            while (!drainToken.IsCancellationRequested)
            {
                Socket accepted = await drainListener.AcceptSocketAsync(drainToken);
                _ = Task.Run(() => SwallowAsync(accepted), drainToken);
            }
        }
        catch
        {
        }
    }

    _ = Task.Run(AcceptAndSwallowAsync, drainToken);

    try
    {
        CancellationToken ct = TestContext.Current.CancellationToken;
        PerPlcContext ctx = MakeContext("PLC1");
        PlcOptions plc = new() { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        // Short request timeout so the test does not have to wait long.
        ConnectionOptions connOpts = new() { BackendRequestTimeoutMs = 400 };
        await using var mux = await BuildMuxAsync(plc, connOpts, ctx);

        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await client.SendAsync(BuildFc03ReadFrame(0xABCD, 0, 1), SocketFlags.None);

            // The watchdog should deliver an exception within ~watchdog-tick * 2.
            byte[] reply = await ReadOneFrameAsync(client, ct);

            ushort replyTxId = (ushort)((reply[0] << 8) | reply[1]);
            replyTxId.ShouldBe((ushort)0xABCD, "watchdog must echo the original client TxId");

            byte functionByte = reply[7];
            (functionByte & 0x80).ShouldBe(0x80, "FC must have the exception bit set");
            (functionByte & 0x7F).ShouldBe(0x03, "original FC must be FC03 (read holding registers)");
            reply[8].ShouldBe((byte)0x0B, "exception code must be 0x0B (Gateway Target Device Failed To Respond)");
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
            listener.Stop();
        }
    }
    finally
    {
        await drainCts.CancelAsync();
        try { drainListener.Stop(); } catch { }
        drainCts.Dispose();
    }
}
|
|
|
|
/// <summary>
/// Completes a round-trip, kills the stub backend, starts a second stub backend on the
/// same port and checks that a fresh client can round-trip through the same multiplexer.
/// </summary>
[Fact]
public async Task BackendReconnect_AfterCascade_NextUpstreamRequest_Succeeds()
{
    int backendPort = PickFreePort();
    var backend = new StubBackend(backendPort);

    // StubBackend.DisposeAsync disposes its CancellationTokenSource, so it is funnelled
    // through this helper to guarantee it is attempted exactly once.
    bool firstBackendStopStarted = false;
    async Task StopFirstBackendOnceAsync()
    {
        if (firstBackendStopStarted)
        {
            return;
        }

        firstBackendStopStarted = true;
        await backend.DisposeAsync();
    }

    try
    {
        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await cA.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken);

            await StopFirstBackendOnceAsync();
            await WaitForCloseAsync(cA, TestContext.Current.CancellationToken);
        }
        catch { /* tolerate any teardown noise */ }
        finally
        {
            // Release the first-phase resources even when the phase above failed part-way.
            // In particular the first backend must be gone before a second one can bind
            // the same port. Every step is best-effort, matching the tolerant catch above.
            try { cA.Dispose(); } catch { }
            try { await pA.DisposeAsync(); } catch { }
            try { lA.Stop(); } catch { }
            try { await StopFirstBackendOnceAsync(); } catch { }
        }

        // Start a new backend on the same port.
        await using var backend2 = new StubBackend(backendPort);

        // A fresh client should round-trip cleanly through the same multiplexer.
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await cB.SendAsync(BuildFc03ReadFrame(0x7777, 0, 1), SocketFlags.None);
            var rsp = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
            ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)0x7777);
        }
        finally
        {
            cB.Dispose();
            await pB.DisposeAsync();
            lB.Stop();
        }
    }
    finally
    {
        // Covers failures during setup, before the first phase's own cleanup could run.
        try { await StopFirstBackendOnceAsync(); } catch { }
    }
}
|
|
|
|
/// <summary>
/// Keeps receiving from <paramref name="s"/> until a receive returns 0 bytes, a receive
/// throws, or a two-second deadline (linked to <paramref name="ct"/>) is cancelled.
/// Returns normally in every one of those cases; received bytes are discarded.
/// </summary>
/// <param name="s">Socket to watch.</param>
/// <param name="ct">Outer token the two-second deadline is linked to.</param>
private static async Task WaitForCloseAsync(Socket s, CancellationToken ct)
{
    byte[] sink = new byte[1];

    using CancellationTokenSource deadline = CancellationTokenSource.CreateLinkedTokenSource(ct);
    deadline.CancelAfter(TimeSpan.FromSeconds(2));

    try
    {
        while (!deadline.IsCancellationRequested)
        {
            int received = await s.ReceiveAsync(sink, SocketFlags.None, deadline.Token);
            if (received == 0)
            {
                return;
            }
        }
    }
    catch
    {
        // Any receive failure (including cancellation by the deadline) ends the wait.
    }
}
|
|
|
|
// ── Phase 12 Wave-1 regression tests ──────────────────────────────────────
|
|
|
|
/// <summary>
/// W1.1 — checks that <see cref="PlcMultiplexer.ReplaceContext"/> takes effect on the
/// running multiplexer: a read served with a tag at address 100 returns the decoded value,
/// and the very next read after swapping in an empty tag map returns the raw wire value.
/// Before W1.1 the multiplexer would have kept using the map captured at construction
/// until the listener faulted and the supervisor's Polly loop rebuilt everything.
/// </summary>
[Fact]
public async Task ReplaceContext_NewTagMap_VisibleOnNextPdu()
{
    CancellationToken ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);
    backend.FcResponseFactory = (fc, _, _, txId) =>
        fc == 0x03 ? BuildFc03Response(txId, 1, 0x1234) : Array.Empty<byte>();

    // Context 1 — tag at addr 100, BCD16. Wire 0x1234 decodes to decimal 1234.
    PerPlcContext taggedContext = MakeContext("PLC1", BcdTag.Create(100, 16));
    PlcOptions plc = new() { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), taggedContext);

    var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Read 1 with original ctx — wire 0x1234 should be decoded to 1234 (= 0x04D2).
        await client.SendAsync(BuildFc03ReadFrame(1, 100, 1), SocketFlags.None);
        byte[] firstReply = await ReadOneFrameAsync(client, ct);
        ushort decodedValue = (ushort)((firstReply[9] << 8) | firstReply[10]);
        decodedValue.ShouldBe((ushort)1234, "with tag at 100, BCD wire 0x1234 must decode to decimal 1234");

        // Swap to an empty tag map (counters preserved per the design's reseat contract).
        var untaggedContext = new PerPlcContext
        {
            PlcName = "PLC1",
            TagMap = BcdTagMap.Empty,
            Counters = taggedContext.Counters,
            Logger = NullLogger.Instance,
        };
        mux.ReplaceContext(untaggedContext);

        // Read 2 with swapped ctx — no tag, raw 0x1234 must pass through unchanged.
        await client.SendAsync(BuildFc03ReadFrame(2, 100, 1), SocketFlags.None);
        byte[] secondReply = await ReadOneFrameAsync(client, ct);
        ushort rawValue = (ushort)((secondReply[9] << 8) | secondReply[10]);
        rawValue.ShouldBe((ushort)0x1234,
            "after ReplaceContext to empty tag map, the next PDU must use the new map and pass 0x1234 through unchanged");
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
|
|
|
|
/// <summary>
/// W1.1 — verifies that swapping in a fresh response cache via <see cref="PlcMultiplexer.ReplaceContext"/>
/// makes the running multiplexer consult the NEW cache for subsequent reads, not the
/// old cache that was disposed by the supervisor. Without W1.1 the running mux would
/// keep its constructor-captured cache reference and either return stale entries or
/// hit a disposed cache.
/// </summary>
[Fact]
public async Task ReplaceContext_NewCache_NextReadGoesToBackend_NotOldCache()
{
    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);
    backend.FcResponseFactory = (fc, _, _, txId) =>
        fc == 0x03 ? BuildFc03Response(txId, 1, (ushort)0x1111) : Array.Empty<byte>();

    // Context 1 — cacheable tag at addr 200 with TTL 60_000 ms.
    var tag = new BcdTag(200, 16, CacheTtlMs: 60_000);
    var dict = new[] { tag }.ToDictionary(t => t.Address).ToFrozenDictionary();
    var map = new BcdTagMap(dict);

    // Both caches are owned by this test (there is no supervisor here to dispose them).
    // They are released in the outer finally, i.e. after the multiplexer declared inside
    // the try block has been disposed, so the multiplexer never holds a disposed cache.
    var cache1 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
    ResponseCache? cache2 = null;
    try
    {
        var ctx1 = new PerPlcContext
        {
            PlcName = "PLC1",
            TagMap = map,
            Counters = new ProxyCounters(),
            Logger = NullLogger.Instance,
            Cache = cache1,
        };

        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx1);
        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Read 1 — populates cache1 from backend.
            await client.SendAsync(BuildFc03ReadFrame(1, 200, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
            await Task.Delay(50, TestContext.Current.CancellationToken);
            int afterFirst = backend.SeenProxyTxIds.Count;
            cache1.Count.ShouldBe(1, "cache1 must contain the first read");

            // Read 2 — must hit cache1 (no backend traffic).
            await client.SendAsync(BuildFc03ReadFrame(2, 200, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
            backend.SeenProxyTxIds.Count.ShouldBe(afterFirst, "cache hit must not produce backend traffic");

            // Swap in a brand-new (empty) cache via ReplaceContext.
            cache2 = new ResponseCache(maxEntriesPerPlc: 100, evictionIntervalMs: 5000);
            var ctx2 = new PerPlcContext
            {
                PlcName = "PLC1",
                TagMap = map,
                Counters = ctx1.Counters,
                Logger = NullLogger.Instance,
                Cache = cache2,
            };
            mux.ReplaceContext(ctx2);

            // Read 3 — old cache had the entry, but mux now uses cache2 which is empty,
            // so the next read MUST go to the backend.
            await client.SendAsync(BuildFc03ReadFrame(3, 200, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);
            await Task.Delay(50, TestContext.Current.CancellationToken);
            backend.SeenProxyTxIds.Count.ShouldBe(afterFirst + 1,
                "after ReplaceContext, the running multiplexer must consult the NEW cache (empty) — not the old one (still warm)");
            cache2.Count.ShouldBe(1, "the new cache should be populated by the post-swap read");
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
            listener.Stop();
        }
    }
    finally
    {
        // Previously only cache1 was disposed; cache2 was leaked.
        cache2?.Dispose();
        cache1.Dispose();
    }
}
|
|
|
|
// ── Phase 12 (W3 final-tier race tests) ──────────────────────────────────
|
|
|
|
/// <summary>
/// Reflection helper — fetches the multiplexer's private <c>_allocator</c> field and
/// invokes its public <c>TryAllocate</c> method repeatedly until it returns false.
/// Used only by the saturation tests; production code never touches the allocator
/// from outside the multiplexer.
/// </summary>
/// <param name="mux">Multiplexer whose allocator is to be exhausted.</param>
/// <returns>The number of <c>TryAllocate</c> calls that returned true.</returns>
private static int DrainAllocator(PlcMultiplexer mux)
{
    const System.Reflection.BindingFlags PrivateInstance =
        System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance;
    const System.Reflection.BindingFlags PublicInstance =
        System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance;

    object allocator = typeof(PlcMultiplexer).GetField("_allocator", PrivateInstance)!.GetValue(mux)!;
    var tryAllocate = allocator.GetType().GetMethod("TryAllocate", PublicInstance)!;

    // Single reusable argument slot; the value placed there by each call is ignored.
    object?[] invokeArgs = [(ushort)0];

    int allocated = 0;
    while (true)
    {
        bool granted = (bool)tryAllocate.Invoke(allocator, invokeArgs)!;
        if (!granted)
        {
            return allocated;
        }

        allocated++;
    }
}
|
|
|
|
/// <summary>
/// W3 #5 — TxId allocator saturation propagates as a Modbus exception 04 to the
/// upstream client (no hang, no crash). The 16-bit TxId space (65,536 slots) is
/// pre-saturated via reflection so the next request hits the
/// <c>!_allocator.TryAllocate</c> branch in <c>OnUpstreamFrameAsync</c> immediately.
/// The post-saturation read is bounded by a 3 s deadline so a regression that hangs
/// the pipe fails this test instead of stalling the run.
/// </summary>
[Fact]
public async Task TxIdAllocator_Saturated_NextRequest_GetsException04_WithOriginalTxId()
{
    var ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);

    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Force the multiplexer to bring up its backend so the saturation check at
        // OnUpstreamFrameAsync's allocator path is the only thing left to fail.
        // (We send a normal request first, drain the response, then saturate.)
        await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(client, ct);

        int taken = DrainAllocator(mux);
        // The TxId space is 65536; the previous request released its slot when the
        // response landed, so we should be able to take ≥ 65535 fresh slots.
        taken.ShouldBeGreaterThanOrEqualTo(65_535);

        // Send a request that MUST hit saturation. Use a non-coalescing FC (06) to
        // exercise the simpler non-coalescing saturation branch directly.
        const ushort txId = 0xBEEF;
        await client.SendAsync(BuildFc06WriteFrame(txId, addr: 100, value: 0x1234), SocketFlags.None);

        // Bounded wait: the contract under test is "no hang", so a missing response
        // must surface as a timeout failure rather than an indefinitely blocked read.
        var rsp = await ReadOneFrameAsync(client, ct)
            .WaitAsync(TimeSpan.FromSeconds(3), ct);

        // An exception frame is 7 bytes of MBAP header + FC + exception code; check the
        // length first so a truncated frame fails as an assertion, not an index error.
        rsp.Length.ShouldBeGreaterThanOrEqualTo(9, "saturation response must be a full exception frame");

        // Validate the saturation exception: original TxId echoed, exception bit set,
        // exception code 04 (SLAVE_DEVICE_FAILURE).
        ushort echoTxId = (ushort)((rsp[0] << 8) | rsp[1]);
        echoTxId.ShouldBe(txId, "saturation exception must echo the original client TxId");
        byte fc = rsp[7];
        (fc & 0x80).ShouldBe(0x80, "FC must have the exception bit set");
        (fc & 0x7F).ShouldBe(0x06, "original FC must be FC06");
        rsp[8].ShouldBe((byte)0x04, "exception code must be 0x04 (Slave Device Failure)");
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
|
|
|
|
/// <summary>
/// W3 #6 — under TxId saturation, two concurrent identical FC03 reads must BOTH
/// receive exception 04 (one as the leader directly, the other either via a
/// coalesced fan-out from the W1.2 cleanup OR via its own independent saturation
/// path — either timing produces the same observable contract). Validates that no
/// pipe hangs forever waiting for a backend response that would never arrive: each
/// read is bounded by a 3 s deadline so a hung pipe fails the test.
/// </summary>
[Fact]
public async Task TxIdAllocator_Saturated_TwoConcurrentIdenticalReads_BothPipesGetException04()
{
    var ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);

    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    // Bring backend up via a primer request from a throwaway pipe. The primer's
    // resources are released in a finally so a failed primer exchange cannot leak them.
    var (primer, primerPipe, primerListener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        await primer.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
        _ = await ReadOneFrameAsync(primer, ct);
    }
    finally
    {
        primer.Dispose();
        await primerPipe.DisposeAsync();
        primerListener.Stop();
    }

    // Saturate after primer has released its slot.
    DrainAllocator(mux);

    // Two pipes, each sends the same FC03 (unitId=1, fc=03, start=200, qty=1).
    var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
    var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        const ushort txA = 0xAAA1;
        const ushort txB = 0xBBB1;
        // Fire concurrently; both target the same coalescing key.
        await Task.WhenAll(
            cA.SendAsync(BuildFc03ReadFrame(txA, 200, 1), SocketFlags.None),
            cB.SendAsync(BuildFc03ReadFrame(txB, 200, 1), SocketFlags.None));

        // Bounded waits: "no late attacher hangs" is the contract, so a pipe that never
        // answers must fail here with a timeout instead of blocking the test run.
        var rspA = await ReadOneFrameAsync(cA, ct).WaitAsync(TimeSpan.FromSeconds(3), ct);
        var rspB = await ReadOneFrameAsync(cB, ct).WaitAsync(TimeSpan.FromSeconds(3), ct);

        // Both must be exception 04 with the original TxId echoed — the W1.2 contract
        // is "no late attacher hangs."
        foreach (var (rsp, expectedTxId, label) in new[] {
            (rspA, txA, "A"), (rspB, txB, "B") })
        {
            // MBAP header (7) + FC + exception code = 9 bytes minimum.
            rsp.Length.ShouldBeGreaterThanOrEqualTo(9, $"{label}: response must be a full exception frame");
            ushort echo = (ushort)((rsp[0] << 8) | rsp[1]);
            echo.ShouldBe(expectedTxId, $"{label}: original TxId must be echoed");
            byte fc = rsp[7];
            (fc & 0x80).ShouldBe(0x80, $"{label}: exception bit must be set");
            rsp[8].ShouldBe((byte)0x04, $"{label}: exception code must be 0x04");
        }
    }
    finally
    {
        cA.Dispose(); cB.Dispose();
        await pA.DisposeAsync(); await pB.DisposeAsync();
        lA.Stop(); lB.Stop();
    }
}
|
|
|
|
/// <summary>
/// W3 #7 — backend-reader head-of-line block. Upstream client A sends a burst of
/// requests and never reads its socket; upstream client B behaves normally. With the
/// W1.3 fix the fan-out goes through <c>TrySendResponse</c>, so the per-PLC backend
/// reader is not held up by A: B's response must still arrive promptly, and the
/// <c>ResponseDropForFullUpstream</c> counter must become non-zero.
///
/// <para>Before W1.3 the reader awaited <c>SendResponseAsync</c> and would block on
/// the wedged pipe's full bounded channel, starving every peer.</para>
/// </summary>
[Fact]
public async Task SlowUpstream_DoesNotStallPeerResponses_DropCounterIncrements()
{
    var ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();
    await using var backend = new StubBackend(backendPort);

    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    // Client A is the wedged one: it connects and sends but never receives. Once the
    // mux's per-pipe response channel (cap 16) and the kernel send buffer are full,
    // TrySendResponse is expected to return false and bump the drop counter.
    var (wedgedClient, wedgedPipe, wedgedListener, _) = await ConnectClientAsync(mux, plc.Name);
    // Client B is the healthy peer whose traffic must keep flowing.
    var (healthyClient, healthyPipe, healthyListener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        // Flood A with requests whose responses are never consumed. How many it takes
        // to fill channel + kernel buffer is environment-dependent; a few hundred
        // 12-byte responses is enough on Windows loopback.
        const int wedgePumpCount = 500;
        ushort wedgeTxId = 1;
        while (wedgeTxId <= wedgePumpCount)
        {
            await wedgedClient.SendAsync(BuildFc03ReadFrame(wedgeTxId, 0, 1), SocketFlags.None);
            wedgeTxId++;
        }

        // Prove B is served BEFORE asserting on the drop counter: on a slow CI machine
        // the drops may take a while to register, and B's liveness is the primary claim.
        const ushort txB = 0xB001;
        await healthyClient.SendAsync(BuildFc03ReadFrame(txB, 0, 1), SocketFlags.None);

        // If the reader were stuck on A's channel, this read would hit the 3 s deadline.
        var healthyResponse = await ReadOneFrameAsync(healthyClient, ct)
            .WaitAsync(TimeSpan.FromSeconds(3), ct);
        ushort echoedTxB = (ushort)((healthyResponse[0] << 8) | healthyResponse[1]);
        echoedTxB.ShouldBe(txB, "B's TxId must be echoed; reader must not be stalled by A");

        // Poll the shared counters (up to 50 samples, 50 ms apart) until at least one
        // drop for the wedged pipe has been recorded.
        long observedDrops = 0;
        int samplesLeft = 50;
        while (samplesLeft-- > 0)
        {
            observedDrops = ctx.Counters.Snapshot().ResponseDropForFullUpstream;
            if (observedDrops > 0)
            {
                break;
            }

            await Task.Delay(50, ct);
        }

        observedDrops.ShouldBeGreaterThan(0,
            "ResponseDropForFullUpstream must increment when A's bounded response channel fills");
    }
    finally
    {
        wedgedClient.Dispose();
        healthyClient.Dispose();
        await wedgedPipe.DisposeAsync();
        await healthyPipe.DisposeAsync();
        wedgedListener.Stop();
        healthyListener.Stop();
    }
}
|
|
|
|
/// <summary>
/// W3 #8 — watchdog↔response race. The W1 design uses claim-then-dispatch:
/// <c>CorrelationMap.TryRemove</c> is the single source of truth, so exactly ONE
/// of (response delivered, watchdog timeout) wins for any given proxy TxId. The
/// backend stub answers alternately just before (350 ms) and just after (450 ms) the
/// 400 ms request timeout, and the test checks for every request that:
/// <list type="bullet">
///   <item>A response arrives within the per-iteration deadline (no hang).</item>
///   <item>The response carries the original client TxId.</item>
///   <item>An exception response, if any, uses code 0x0B.</item>
/// </list>
/// Finally it requires that both the normal-response and the watchdog branch were
/// observed at least once.
/// </summary>
[Fact]
public async Task WatchdogVsResponse_Race_AlwaysExactlyOneOutcome_PerRequest()
{
    var ct = TestContext.Current.CancellationToken;

    // Watchdog tick is max(100, RequestTimeoutMs/4); with RequestTimeoutMs = 400 the
    // tick is 100 ms.
    int backendPort = PickFreePort();

    // Phase 12 (W4 / T2) — the delay alternates on a request counter instead of coming
    // from a seeded Random, whose sequence is not stable across .NET major versions.
    // Odd-numbered backend requests are answered after 350 ms, even-numbered ones after
    // 450 ms, so the 30 iterations split 15/15 regardless of runtime.
    // NOTE(review): with a 100 ms tick the watchdog may only notice the 400 ms deadline
    // up to one tick late — confirm that a 450 ms response reliably loses the race.
    int backendRequestsSeen = 0;
    await using var slowBackend = new SlowResponseBackend(backendPort, () =>
    {
        bool oddRequest = (Interlocked.Increment(ref backendRequestsSeen) & 1) == 1;
        return oddRequest ? 350 : 450;
    });

    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    var connOpts = new ConnectionOptions { BackendRequestTimeoutMs = 400 };
    await using var mux = await BuildMuxAsync(plc, connOpts, ctx);

    var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
    try
    {
        const int iterations = 30;
        int deliveredCount = 0;
        int watchdogCount = 0;

        for (ushort txId = 1; txId <= iterations; txId++)
        {
            await client.SendAsync(BuildFc03ReadFrame(txId, 0, 1), SocketFlags.None);
            var frame = await ReadOneFrameAsync(client, ct)
                .WaitAsync(TimeSpan.FromSeconds(3), ct);

            // Whichever side won, the client's own TxId must come back.
            ushort echoedTxId = (ushort)((frame[0] << 8) | frame[1]);
            echoedTxId.ShouldBe(txId, $"iteration {txId}: TxId must be echoed");

            // The exception bit on the function code tells the two branches apart.
            bool watchdogWon = (frame[7] & 0x80) != 0;
            if (!watchdogWon)
            {
                // Backend response was delivered as a normal FC03 reply.
                deliveredCount++;
                continue;
            }

            // Watchdog branch: exception 0x0B (Gateway Target Failed To Respond).
            frame[8].ShouldBe((byte)0x0B, $"iteration {txId}: watchdog must use exception code 0x0B");
            watchdogCount++;
        }

        // Every iteration was classified into exactly one of the two outcomes.
        (deliveredCount + watchdogCount).ShouldBe(iterations);

        // Both branches must have been exercised, otherwise the run did not actually
        // straddle the watchdog deadline.
        deliveredCount.ShouldBeGreaterThan(0, "at least one request must beat the watchdog");
        watchdogCount.ShouldBeGreaterThan(0, "at least one request must lose to the watchdog");
    }
    finally
    {
        client.Dispose();
        await pipe.DisposeAsync();
        listener.Stop();
    }
}
|
|
|
|
/// <summary>
/// W3 #9 — cascade racing with new accepts. Stress-test: while the backend is repeatedly
/// killed and resurrected (forcing repeated cascade cycles), new upstream clients
/// connect and disconnect concurrently. The contract verified is the
/// no-crash-under-churn property: every churn task must finish within its deadline and
/// the multiplexer must still serve a normal request once the churn is over.
///
/// <para>The originally-flagged race window — a new pipe added between
/// <c>_pipes.Values.ToArray()</c> and <c>_pipes.Clear()</c> in <c>TearDownBackendAsync</c>
/// — leaves the new pipe alive but orphaned from <c>_pipes</c>. Its read loop will
/// receive normal traffic until the next cascade or its socket closes. This test
/// doesn't try to reproduce that exact ordering (which is microseconds wide); it
/// instead exercises arbitrary interleavings of cascade + accept under load and fails
/// on an exception or a hang.</para>
/// </summary>
[Fact(Timeout = 15_000)]
public async Task CascadeVsNewAccept_StressChurn_NoCrash_NoHang()
{
    var ct = TestContext.Current.CancellationToken;

    int backendPort = PickFreePort();

    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

    // Backend that's brought up and torn down repeatedly to force cascade cycles.
    StubBackend backend = new(backendPort);

    const int cascadeCycles = 3;
    const int connectsPerCycle = 8;

    // One churn participant: connect, send a read, wait briefly for a reply, clean up.
    // Written as a local function (not a `_ =>` lambda body) so that `_` in the
    // deconstruction below is a genuine discard rather than an assignment to a lambda
    // parameter that happens to be named `_`.
    async Task ChurnOneClientAsync()
    {
        Socket? client = null;
        UpstreamPipe? pipe = null;
        TcpListener? listener = null;
        try
        {
            (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
            await client.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None, ct);

            // Read with a short deadline; cascades will close us mid-flight.
            try
            {
                await ReadOneFrameAsync(client, ct)
                    .WaitAsync(TimeSpan.FromMilliseconds(500), ct);
            }
            catch
            {
                // Expected when our pipe is cascaded mid-request — no test failure.
            }
        }
        finally
        {
            // Best-effort cleanup: each step is isolated so one failing disposal does
            // not prevent the others.
            try { client?.Dispose(); } catch { }
            try { if (pipe is not null) await pipe.DisposeAsync(); } catch { }
            try { listener?.Stop(); } catch { }
        }
    }

    try
    {
        for (int cycle = 0; cycle < cascadeCycles; cycle++)
        {
            // Spawn many concurrent connect+request tasks.
            Task[] tasks = Enumerable.Range(0, connectsPerCycle)
                .Select(_ => ChurnOneClientAsync())
                .ToArray();

            // Wait for the connect storm to be in flight, then kill the backend mid-storm.
            await Task.Delay(40, ct);
            await backend.DisposeAsync();

            // Drain the connect tasks; a task that never finishes fails the test here.
            await Task.WhenAll(tasks).WaitAsync(TimeSpan.FromSeconds(5), ct);

            // Bring backend back up for the next cycle (and for the survival check).
            backend = new StubBackend(backendPort);
            await Task.Delay(50, ct);
        }

        // Survival check — the multiplexer must still service a normal request after
        // all the cascade churn.
        var (cFinal, pFinal, lFinal, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await cFinal.SendAsync(BuildFc03ReadFrame(0xF1F1, 0, 1), SocketFlags.None, ct);
            var rsp = await ReadOneFrameAsync(cFinal, ct)
                .WaitAsync(TimeSpan.FromSeconds(3), ct);
            ushort echo = (ushort)((rsp[0] << 8) | rsp[1]);
            echo.ShouldBe((ushort)0xF1F1, "multiplexer must still serve traffic after cascade churn");
        }
        finally
        {
            cFinal.Dispose();
            await pFinal.DisposeAsync();
            lFinal.Stop();
        }
    }
    finally
    {
        // The backend may already have been disposed mid-cycle if the loop threw;
        // swallow so cleanup never masks the original failure.
        try { await backend.DisposeAsync(); } catch { }
    }
}
|
|
|
|
/// <summary>
/// Loopback TCP backend stub for the W3 #8 watchdog race test. For every request frame
/// it reads, it asks the caller-supplied factory for a delay and sends an FC03 response
/// after that many milliseconds, without blocking the read loop in the meantime.
/// </summary>
private sealed class SlowResponseBackend : IAsyncDisposable
{
    private readonly Func<int> _nextDelayMs;
    private readonly TcpListener _server;
    private readonly CancellationTokenSource _shutdown = new();
    private readonly List<Task> _connectionTasks = new();

    /// <summary>Starts listening on the loopback <paramref name="port"/> and begins accepting.</summary>
    /// <param name="port">TCP port to listen on.</param>
    /// <param name="delayMsFactory">Called once per received request; returns the response delay in ms.</param>
    public SlowResponseBackend(int port, Func<int> delayMsFactory)
    {
        _nextDelayMs = delayMsFactory;
        _server = new TcpListener(IPAddress.Loopback, port);
        _server.Start();
        // Fire-and-forget: the loop ends on its own when shutdown is signalled.
        _ = AcceptConnectionsAsync();
    }

    /// <summary>Accepts sockets until shutdown, handing each to its own tracked handler task.</summary>
    private async Task AcceptConnectionsAsync()
    {
        try
        {
            while (true)
            {
                if (_shutdown.IsCancellationRequested)
                {
                    return;
                }

                Socket accepted = await _server.AcceptSocketAsync(_shutdown.Token);
                Task handler = Task.Run(() => ServeConnectionAsync(accepted));
                lock (_connectionTasks)
                {
                    _connectionTasks.Add(handler);
                }
            }
        }
        catch
        {
            /* shutdown */
        }
    }

    /// <summary>
    /// Reads request frames from one connection and schedules a delayed response for each.
    /// Stops on shutdown, on a frame shorter than 8 bytes, or on any read error, and
    /// always disposes the socket.
    /// </summary>
    private async Task ServeConnectionAsync(Socket connection)
    {
        try
        {
            while (!_shutdown.IsCancellationRequested)
            {
                var request = await ReadOneFrameAsync(connection, _shutdown.Token);
                if (request.Length < 8)
                {
                    break;
                }

                ushort txId = (ushort)((request[0] << 8) | request[1]);
                byte unitId = request[6];
                // The delay is chosen here, in arrival order, before the response is
                // handed off — so the factory sees requests in the order they were read.
                int delayMs = _nextDelayMs();

                // Respond on a separate task so pipelined requests on this connection
                // can race each other instead of being served strictly in sequence.
                _ = Task.Run(() => SendDelayedResponseAsync(connection, txId, unitId, delayMs));
            }
        }
        catch
        {
            /* normal */
        }
        finally
        {
            try { connection.Dispose(); } catch { }
        }
    }

    /// <summary>Waits <paramref name="delayMs"/> ms, then sends an FC03 response; failures are ignored.</summary>
    private async Task SendDelayedResponseAsync(Socket connection, ushort txId, byte unitId, int delayMs)
    {
        try
        {
            await Task.Delay(delayMs, _shutdown.Token);
            byte[] response = BuildFc03Response(txId, unitId, 0x0001);
            await connection.SendAsync(response, SocketFlags.None, _shutdown.Token);
        }
        catch
        {
            /* shutdown / pipe down */
        }
    }

    /// <summary>
    /// Signals shutdown, stops the listener, and waits up to 2 s for the connection
    /// handlers recorded so far before releasing the cancellation source.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await _shutdown.CancelAsync();
        try { _server.Stop(); } catch { }

        Task[] pending;
        lock (_connectionTasks)
        {
            pending = _connectionTasks.ToArray();
        }

        try
        {
            await Task.WhenAll(pending).WaitAsync(TimeSpan.FromSeconds(2));
        }
        catch
        {
            // Handler faults and the 2 s timeout are both ignored during teardown.
        }

        _shutdown.Dispose();
    }
}
|
|
}
|