1db900edef
Layers a per-PLC, per-tag response cache on top of Phase 10's coalescing.
Cache is OFF by default per tag (CacheTtlMs = 0); a fresh deployment with no
TTL config behaves identically to Phase 10. Operators opt tags in by setting
CacheTtlMs > 0 on a BcdTagOptions entry (or DefaultCacheTtlMs > 0 on a
PlcOptions entry), explicitly acknowledging the staleness window.
Cache lookup order: cache -> coalesce -> backend. A cache hit short-circuits
both Phase 10's coalescing path and Phase 9's backend send. Cache stores
POST-rewriter PDU bytes so hits never re-invoke the BCD rewriter. FC06/FC16
write responses invalidate every cached entry whose address range overlaps
the write (half-open interval math).
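The overlap rule for write invalidation can be sketched as below. This is a minimal illustration, not the CacheInvalidator shipped in this commit: the OverlapSketch type, its method name, and the zero-quantity behaviour are assumptions made for the example.

```csharp
using System;

// Hypothetical helper for illustration; not the commit's CacheInvalidator.
public static class OverlapSketch
{
    // Treats each range as the half-open interval [start, start + qty).
    // Arithmetic is done in int so start + qty cannot wrap at 65535.
    public static bool Overlaps(ushort startA, ushort qtyA, ushort startB, ushort qtyB)
    {
        // Assumption: an empty range never overlaps anything.
        if (qtyA == 0 || qtyB == 0) return false;

        int endA = startA + qtyA; // exclusive end of range A
        int endB = startB + qtyB; // exclusive end of range B
        return startA < endB && startB < endA;
    }
}
```

Under these assumptions a cached read of registers 100..103 (start 100, qty 4) is invalidated by a single-register write to 103 but not by a write to 104, because 104 is the exclusive end of the cached range.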
New types (Mbproxy.Proxy.Cache, all internal):
- CacheKey (record-struct, same shape as CoalescingKey but kept SEPARATE so
the two phases evolve independently).
- CacheEntry, ResponseCache (IDisposable; LRU + PeriodicTimer eviction
loop), CacheInvalidator (pure overlap matcher), CacheLogEvents (stable
mbproxy.cache.* names).
Multi-tag range TTL = min(TTLs); any tag with TTL = 0 in the range disables
caching for the whole read (conservative-by-design).
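A small sketch of the multi-tag rule, assuming the per-tag TTLs for a read have already been resolved. RangeTtlSketch and EffectiveTtlMs are hypothetical names, and the handling of a read that covers no configured tags is an assumption rather than something this commit states.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; names are not from the commit.
public static class RangeTtlSketch
{
    // Returns the TTL (ms) to apply to a read covering several tags,
    // or 0 when the read must not be cached.
    public static int EffectiveTtlMs(IReadOnlyCollection<int> tagTtlsMs)
    {
        // Assumption: a read that touches no configured tags is not cached.
        if (tagTtlsMs.Count == 0) return 0;

        int min = int.MaxValue;
        foreach (int ttl in tagTtlsMs)
        {
            if (ttl <= 0) return 0; // one uncached tag disables the whole read
            min = Math.Min(min, ttl);
        }
        return min;
    }
}
```

For example, tags with TTLs of 1000, 250 and 500 ms give an effective TTL of 250 ms, while 1000 and 0 give 0 (no caching).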
Options surface:
- BcdTagOptions.CacheTtlMs (nullable int; null = fall through to PLC default)
- PlcOptions.DefaultCacheTtlMs
- MbproxyOptions.Cache.{AllowLongTtl, MaxEntriesPerPlc, EvictionIntervalMs}
- TTL > 60_000 ms requires Cache.AllowLongTtl = true (reload validation).
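The fall-through and long-TTL rules above might look roughly like this. TtlOptionsSketch and its members are hypothetical; the real option types and reload validator live in Mbproxy.Options, and the rejection of negative values is an assumption.

```csharp
using System;

// Hypothetical helper for illustration; not the commit's option validator.
public static class TtlOptionsSketch
{
    public const int LongTtlThresholdMs = 60_000;

    // A null per-tag value falls through to the PLC-level default.
    // An explicit 0 on the tag wins over the default and keeps the tag uncached.
    public static int ResolveTtlMs(int? tagCacheTtlMs, int plcDefaultCacheTtlMs)
        => tagCacheTtlMs ?? plcDefaultCacheTtlMs;

    // Returns null when the TTL is acceptable, otherwise an error message.
    public static string? Validate(int ttlMs, bool allowLongTtl)
    {
        // Assumption: negative TTLs are rejected.
        if (ttlMs < 0) return "TTL must not be negative.";
        if (ttlMs > LongTtlThresholdMs && !allowLongTtl)
            return $"TTL {ttlMs} ms exceeds {LongTtlThresholdMs} ms; set Cache.AllowLongTtl = true.";
        return null;
    }
}
```

In this sketch a TTL of exactly 60_000 ms passes without AllowLongTtl, matching the strict "greater than" wording of the rule.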
Admin counters (Tier 1.8 + Tier 2 cache-memory KPIs from docs/kpi.md):
- CacheHitCount, CacheMissCount, CacheInvalidations on ProxyCounters.
- CacheEntryCount, CacheBytes via a new ICacheStatsProvider snapshot path.
- /status.json and the HTML page surface a new Cache cell per PLC row.
Hot-reload: any tag-list change to a PLC reseats the per-PLC context with a
fresh cache; the old cache is disposed inside ReplaceContextAsync. Per-tag
flush granularity is intentionally not implemented in v1.
PLCs with no cache-eligible tags (every resolved tag has CacheTtlMs = 0)
get Cache = null on the context and skip the eviction timer entirely, so
the no-cache path is byte-identical to Phase 10.
Tests (32 new unit + 5 new E2E = 37 new; suite now 314 unit + 48 E2E):
- CacheKeyTests, CacheEntryTests (records + boundary semantics).
- CacheInvalidatorTests: full overlap, both partials, adjacent-not-
overlapping, disjoint, different unit ID + auxiliary FC-filter / zero-qty.
- ResponseCacheTests: round-trip, lazy expiry, range invalidation,
unit-id filter, LRU bound, LRU access tracking, concurrent get/set,
dispose, clear, approximate-bytes accounting.
- ResponseCacheMultiplexerTests (stub-backend): hit short-circuits
coalescing, BCD-decoded bytes are cached not raw, FC06 invalidates
overlapping, non-overlapping write does not invalidate, multi-tag
TTL=min rule, regression-cache-disabled-by-default-is-Phase-10, hit
works even when backend unreachable.
- ResponseCacheE2ETests (pymodbus DL205 sim, sequential reads):
* Headline: 10 reads with TTL=1000 ms -> 9 hits, 1 miss, 1 backend trip.
* TTL expiry path with sleep > TTL.
* Write invalidation through the proxy on a scratch register.
* BCD-decoded bytes are cached, not raw BCD nibbles.
* Regression: Cache disabled by default -> behaviour byte-identical to
Phase 10.
Pre-existing flake hardened: BackendDisconnect_CascadesToAllUpstreams now
polls briefly for the cascade counter to absorb the inherent scheduling
gap between "upstream EOF observed" and "counter incremented inside
TearDownBackendAsync." Counter semantics unchanged.
Phase doc updated with implementation clarifications discovered during
this work (CacheKey kept separate from CoalescingKey, LastUsedTick is
long, FC06/FC16 startAddr/qty parsing extension, cache-pre-connect
short-circuit, write-invalidation only on successful responses).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
627 lines
25 KiB
C#
using System.Collections.Concurrent;
using System.Collections.Frozen;
using System.Net;
using System.Net.Sockets;
using Mbproxy.Bcd;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

namespace Mbproxy.Tests.Proxy.Multiplexing;

/// <summary>
/// Integration tests for <see cref="PlcMultiplexer"/> against a stub backend
/// (a <see cref="TcpListener"/> that canned-responds). Uses real sockets but no simulator.
/// </summary>
[Trait("Category", "Unit")]
public sealed class PlcMultiplexerTests
{
    // ── Helpers ────────────────────────────────────────────────────────────────

    private static int PickFreePort()
    {
        var l = new TcpListener(IPAddress.Loopback, 0);
        l.Start();
        int port = ((IPEndPoint)l.LocalEndpoint).Port;
        l.Stop();
        return port;
    }

    /// <summary>
    /// Reads exactly <paramref name="count"/> bytes from <paramref name="socket"/>.
    /// </summary>
    private static async Task<byte[]> ReadExactAsync(Socket socket, int count, CancellationToken ct)
    {
        var buf = new byte[count];
        int read = 0;
        while (read < count)
        {
            int n = await socket.ReceiveAsync(buf.AsMemory(read, count - read), SocketFlags.None, ct);
            if (n == 0) throw new IOException("EOF");
            read += n;
        }
        return buf;
    }

    private static async Task<byte[]> ReadOneFrameAsync(Socket socket, CancellationToken ct)
    {
        // MBAP header: TxId(2) + ProtocolId(2) + Length(2) + UnitId(1) = 7 bytes.
        var header = await ReadExactAsync(socket, 7, ct);
        ushort length = (ushort)((header[4] << 8) | header[5]);
        // The Length field counts UnitId + PDU, so the PDU is length - 1 bytes.
        int bodyLen = length - 1;
        var body = bodyLen > 0 ? await ReadExactAsync(socket, bodyLen, ct) : Array.Empty<byte>();
        var frame = new byte[7 + bodyLen];
        Buffer.BlockCopy(header, 0, frame, 0, 7);
        if (bodyLen > 0) Buffer.BlockCopy(body, 0, frame, 7, bodyLen);
        return frame;
    }

    private static byte[] BuildFc03ReadFrame(ushort txId, ushort start, ushort qty, byte unitId = 1)
        =>
        [
            (byte)(txId >> 8), (byte)(txId & 0xFF),
            0x00, 0x00,
            0x00, 0x06,
            unitId,
            0x03,
            (byte)(start >> 8), (byte)(start & 0xFF),
            (byte)(qty >> 8), (byte)(qty & 0xFF),
        ];

    private static byte[] BuildFc06WriteFrame(ushort txId, ushort addr, ushort value, byte unitId = 1)
        =>
        [
            (byte)(txId >> 8), (byte)(txId & 0xFF),
            0x00, 0x00,
            0x00, 0x06,
            unitId,
            0x06,
            (byte)(addr >> 8), (byte)(addr & 0xFF),
            (byte)(value >> 8), (byte)(value & 0xFF),
        ];

    private static byte[] BuildFc03Response(ushort txId, byte unitId, params ushort[] registers)
    {
        int bodyLen = 2 + registers.Length * 2; // FC + byteCount + register data
        var frame = new byte[7 + bodyLen];
        frame[0] = (byte)(txId >> 8);
        frame[1] = (byte)(txId & 0xFF);
        frame[2] = 0;
        frame[3] = 0;
        ushort length = (ushort)(1 + bodyLen); // UnitId + PDU
        frame[4] = (byte)(length >> 8);
        frame[5] = (byte)(length & 0xFF);
        frame[6] = unitId;
        frame[7] = 0x03;
        frame[8] = (byte)(registers.Length * 2);
        for (int i = 0; i < registers.Length; i++)
        {
            frame[9 + i * 2] = (byte)(registers[i] >> 8);
            frame[9 + i * 2 + 1] = (byte)(registers[i] & 0xFF);
        }
        return frame;
    }

    /// <summary>
    /// FC06 response echo with txId / addr / value.
    /// </summary>
    private static byte[] BuildFc06Response(ushort txId, byte unitId, ushort addr, ushort value)
    {
        var frame = new byte[7 + 5];
        frame[0] = (byte)(txId >> 8);
        frame[1] = (byte)(txId & 0xFF);
        frame[2] = 0; frame[3] = 0;
        frame[4] = 0; frame[5] = 6; // length: UnitId(1) + FC(1) + Addr(2) + Value(2)
        frame[6] = unitId;
        frame[7] = 0x06;
        frame[8] = (byte)(addr >> 8);
        frame[9] = (byte)(addr & 0xFF);
        frame[10] = (byte)(value >> 8);
        frame[11] = (byte)(value & 0xFF);
        return frame;
    }

    private static PerPlcContext MakeContext(string name, params BcdTag[] tags)
    {
        var frozen = tags.ToDictionary(t => t.Address).ToFrozenDictionary();
        var map = frozen.Count > 0 ? new BcdTagMap(frozen) : BcdTagMap.Empty;
        return new PerPlcContext
        {
            PlcName = name,
            TagMap = map,
            Counters = new ProxyCounters(),
            Logger = NullLogger.Instance,
        };
    }

    /// <summary>
    /// A stub backend that echoes FC03 responses for every request, recording the proxy
    /// TxIds it sees on the wire so tests can verify the multiplexer rewrites them.
    /// </summary>
    private sealed class StubBackend : IAsyncDisposable
    {
        public int Port { get; }
        private readonly TcpListener _listener;
        private readonly CancellationTokenSource _cts = new();
        private readonly List<Task> _clientTasks = new();
        public ConcurrentQueue<ushort> SeenProxyTxIds { get; } = new();
        public Func<byte, ushort, ushort, ushort, byte[]>? FcResponseFactory { get; set; }

        public StubBackend(int port)
        {
            Port = port;
            _listener = new TcpListener(IPAddress.Loopback, port);
            _listener.Start();
            _ = AcceptLoop();
        }

        private async Task AcceptLoop()
        {
            try
            {
                while (!_cts.IsCancellationRequested)
                {
                    Socket s = await _listener.AcceptSocketAsync(_cts.Token);
                    var task = Task.Run(() => HandleAsync(s));
                    lock (_clientTasks) _clientTasks.Add(task);
                }
            }
            catch { /* shutdown */ }
        }

        private async Task HandleAsync(Socket s)
        {
            try
            {
                while (!_cts.IsCancellationRequested)
                {
                    var req = await ReadOneFrameAsync(s, _cts.Token);
                    if (req.Length < 8) break;

                    ushort txId = (ushort)((req[0] << 8) | req[1]);
                    SeenProxyTxIds.Enqueue(txId);
                    byte unitId = req[6];
                    byte fc = req[7];

                    byte[] response;
                    if (FcResponseFactory is not null)
                    {
                        ushort start = req.Length >= 10 ? (ushort)((req[8] << 8) | req[9]) : (ushort)0;
                        ushort qty = req.Length >= 12 ? (ushort)((req[10] << 8) | req[11]) : (ushort)0;
                        response = FcResponseFactory(fc, start, qty, txId);
                    }
                    else if (fc == 0x03)
                    {
                        // Default: FC03 echo a single register containing 0x1234.
                        response = BuildFc03Response(txId, unitId, 0x1234);
                    }
                    else if (fc == 0x06)
                    {
                        ushort addr = (ushort)((req[8] << 8) | req[9]);
                        ushort value = (ushort)((req[10] << 8) | req[11]);
                        response = BuildFc06Response(txId, unitId, addr, value);
                    }
                    else
                    {
                        break;
                    }
                    await s.SendAsync(response, SocketFlags.None, _cts.Token);
                }
            }
            catch { /* normal */ }
            finally { try { s.Dispose(); } catch { } }
        }

        public async ValueTask DisposeAsync()
        {
            await _cts.CancelAsync();
            try { _listener.Stop(); } catch { }
            Task[] snap;
            lock (_clientTasks) snap = _clientTasks.ToArray();
            try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { }
            _cts.Dispose();
        }
    }

    private static async Task<PlcMultiplexer> BuildMuxAsync(
        PlcOptions plc, ConnectionOptions connOpts, PerPlcContext ctx)
    {
        var mux = new PlcMultiplexer(
            plc, connOpts,
            new BcdPduPipeline(),
            ctx,
            NullLogger<PlcMultiplexer>.Instance,
            backendConnectPipeline: null);
        await Task.Yield();
        return mux;
    }

    private static async Task<(Socket client, UpstreamPipe pipe, TcpListener proxyListener, int proxyPort)>
        ConnectClientAsync(PlcMultiplexer mux, string plcName)
    {
        int proxyPort = PickFreePort();
        var proxyListener = new TcpListener(IPAddress.Loopback, proxyPort);
        proxyListener.Start();

        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
            { NoDelay = true };
        await client.ConnectAsync(IPAddress.Loopback, proxyPort);
        var upstream = await proxyListener.AcceptSocketAsync();
        var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance);
        _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));

        return (client, pipe, proxyListener, proxyPort);
    }

    // ── Tests ─────────────────────────────────────────────────────────────────

    [Fact]
    public async Task SingleUpstream_RoundTripsFC03_Through_Multiplexer()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1", BcdTag.Create(100, 16));
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await client.SendAsync(BuildFc03ReadFrame(0x1234, 100, 1), SocketFlags.None);
            var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);

            ushort rspTxId = (ushort)((rsp[0] << 8) | rsp[1]);
            rspTxId.ShouldBe((ushort)0x1234, "the original TxId must be restored on the way back to the client");

            // BCD decode of the stub's 0x1234 response = 1234.
            ushort decoded = (ushort)((rsp[9] << 8) | rsp[10]);
            decoded.ShouldBe((ushort)1234);
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
            listener.Stop();
        }
    }

    [Fact]
    public async Task SingleUpstream_RoundTripsFC06_Through_Multiplexer()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1", BcdTag.Create(200, 16));
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Client writes binary 1234; proxy encodes to BCD 0x1234 on the way out.
            await client.SendAsync(BuildFc06WriteFrame(0xABCD, 200, 1234), SocketFlags.None);
            var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);

            ushort rspTxId = (ushort)((rsp[0] << 8) | rsp[1]);
            rspTxId.ShouldBe((ushort)0xABCD);

            // Echo bytes decoded back to client binary.
            ushort echoed = (ushort)((rsp[10] << 8) | rsp[11]);
            echoed.ShouldBe((ushort)1234);
        }
        finally
        {
            client.Dispose();
            await pipe.DisposeAsync();
            listener.Stop();
        }
    }

    [Fact]
    public async Task TwoUpstreams_ConcurrentFC03_BothGetCorrectResponses()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort)
        {
            // Both clients read address 100; both should see their own TxId echoed.
            FcResponseFactory = (fc, start, qty, txId) =>
            {
                byte unitId = 1;
                return fc == 0x03
                    ? BuildFc03Response(txId, unitId, 0x1234)
                    : throw new InvalidOperationException("unexpected fc");
            },
        };

        var ctx = MakeContext("PLC1", BcdTag.Create(100, 16));
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (c1, p1, l1, _) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Both clients use the same upstream TxId (0x0001). That would clash on a
            // shared backend wire if the mux didn't rewrite the TxId.
            await c1.SendAsync(BuildFc03ReadFrame(0x0001, 100, 1), SocketFlags.None);
            await c2.SendAsync(BuildFc03ReadFrame(0x0001, 100, 1), SocketFlags.None);

            var r1 = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            var r2 = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

            // Both responses must carry the original (colliding) TxId.
            ((ushort)((r1[0] << 8) | r1[1])).ShouldBe((ushort)0x0001);
            ((ushort)((r2[0] << 8) | r2[1])).ShouldBe((ushort)0x0001);
        }
        finally
        {
            c1.Dispose(); c2.Dispose();
            await p1.DisposeAsync(); await p2.DisposeAsync();
            l1.Stop(); l2.Stop();
        }
    }

    [Fact]
    public async Task TwoUpstreams_ProxyTxIds_AreDistinct_OnTheWire()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (c1, p1, l1, _) = await ConnectClientAsync(mux, plc.Name);
        var (c2, p2, l2, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Both clients use the same upstream TxId 0x0007; the proxy must hand out
            // distinct proxy TxIds on the backend wire. Phase 10: reads target DIFFERENT
            // addresses so coalescing does not fuse them into a single backend request.
            await c1.SendAsync(BuildFc03ReadFrame(0x0007, 0, 1), SocketFlags.None);
            await c2.SendAsync(BuildFc03ReadFrame(0x0007, 10, 1), SocketFlags.None);

            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);

            // Collect what the backend saw.
            var seen = new HashSet<ushort>(backend.SeenProxyTxIds);
            seen.Count.ShouldBeGreaterThanOrEqualTo(2, "the multiplexer must allocate distinct proxy TxIds even when upstreams collide");
        }
        finally
        {
            c1.Dispose(); c2.Dispose();
            await p1.DisposeAsync(); await p2.DisposeAsync();
            l1.Stop(); l2.Stop();
        }
    }

    [Fact]
    public async Task UpstreamDisconnect_DoesNotAffectOtherUpstreams()
    {
        int backendPort = PickFreePort();
        await using var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Drop client A entirely.
            cA.Dispose();
            await Task.Delay(50, TestContext.Current.CancellationToken);

            // Client B should still be able to round-trip.
            await cB.SendAsync(BuildFc03ReadFrame(0x0042, 0, 1), SocketFlags.None);
            var rsp = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
            ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)0x0042);
        }
        finally
        {
            cB.Dispose();
            await pA.DisposeAsync(); await pB.DisposeAsync();
            lA.Stop(); lB.Stop();
        }
    }

    [Fact]
    public async Task BackendDisconnect_CascadesToAllUpstreams()
    {
        int backendPort = PickFreePort();
        var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        var (cC, pC, lC, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            // Force a round-trip on each so backend connect occurs first.
            await cA.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            await cB.SendAsync(BuildFc03ReadFrame(2, 0, 1), SocketFlags.None);
            await cC.SendAsync(BuildFc03ReadFrame(3, 0, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
            _ = await ReadOneFrameAsync(cC, TestContext.Current.CancellationToken);

            // Kill the backend.
            await backend.DisposeAsync();

            // All three upstream sockets should observe a clean EOF quickly;
            // the assertion below allows up to 2000 ms in total.
            var sw = System.Diagnostics.Stopwatch.StartNew();
            await WaitForCloseAsync(cA, TestContext.Current.CancellationToken);
            await WaitForCloseAsync(cB, TestContext.Current.CancellationToken);
            await WaitForCloseAsync(cC, TestContext.Current.CancellationToken);
            sw.Stop();
            sw.ElapsedMilliseconds.ShouldBeLessThan(2000, "cascade should propagate quickly");

            // Poll briefly for the cascade counter: there is an inherent scheduling gap
            // between "upstream socket EOF observed" (WaitForCloseAsync returns) and "the
            // multiplexer's TearDownBackendAsync increments the counter after awaiting
            // every pipe.DisposeAsync". This poll absorbs that scheduling jitter without
            // weakening the assertion's semantics: the counter MUST reach 3 (or more)
            // because all three upstream pipes were attached when the cascade fired.
            long cascades = 0;
            for (int i = 0; i < 50; i++)
            {
                cascades = ctx.Counters.Snapshot().BackendDisconnectCascades;
                if (cascades >= 3) break;
                await Task.Delay(20, TestContext.Current.CancellationToken);
            }
            cascades.ShouldBeGreaterThanOrEqualTo(3);
        }
        finally
        {
            cA.Dispose(); cB.Dispose(); cC.Dispose();
            await pA.DisposeAsync(); await pB.DisposeAsync(); await pC.DisposeAsync();
            lA.Stop(); lB.Stop(); lC.Stop();
        }
    }

    [Fact]
    public async Task RequestTimeoutWatchdog_DeliversException0B_ToUpstream_WhenBackendNeverResponds()
    {
        // A drain-only stub that consumes requests but never responds. The multiplexer's
        // per-request watchdog must surface a Modbus exception 0x0B to the upstream client
        // once BackendRequestTimeoutMs elapses, freeing the proxy TxId + correlation entry.
        int backendPort = PickFreePort();
        var drainListener = new TcpListener(IPAddress.Loopback, backendPort);
        drainListener.Start();
        var drainCts = new CancellationTokenSource();
        var drainToken = drainCts.Token;
        _ = Task.Run(async () =>
        {
            try
            {
                while (!drainToken.IsCancellationRequested)
                {
                    var s = await drainListener.AcceptSocketAsync(drainToken);
                    _ = Task.Run(async () =>
                    {
                        var buf = new byte[256];
                        try
                        {
                            while (!drainToken.IsCancellationRequested)
                            {
                                int n = await s.ReceiveAsync(buf, SocketFlags.None, drainToken);
                                if (n == 0) break;
                            }
                        }
                        catch { }
                        finally { try { s.Dispose(); } catch { } }
                    }, drainToken);
                }
            }
            catch { }
        }, drainToken);

        try
        {
            var ctx = MakeContext("PLC1");
            var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
            // Short request timeout so the test does not have to wait long.
            var connOpts = new ConnectionOptions { BackendRequestTimeoutMs = 400 };
            await using var mux = await BuildMuxAsync(plc, connOpts, ctx);

            var (client, pipe, listener, _) = await ConnectClientAsync(mux, plc.Name);
            try
            {
                await client.SendAsync(BuildFc03ReadFrame(0xABCD, 0, 1), SocketFlags.None);

                // The watchdog should deliver an exception within ~watchdog-tick * 2.
                var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken);

                ushort rspTxId = (ushort)((rsp[0] << 8) | rsp[1]);
                rspTxId.ShouldBe((ushort)0xABCD, "watchdog must echo the original client TxId");
                byte fcByte = rsp[7];
                (fcByte & 0x80).ShouldBe(0x80, "FC must have the exception bit set");
                (fcByte & 0x7F).ShouldBe(0x03, "original FC must be FC03 (read holding registers)");
                rsp[8].ShouldBe((byte)0x0B, "exception code must be 0x0B (Gateway Target Device Failed To Respond)");
            }
            finally
            {
                client.Dispose();
                await pipe.DisposeAsync();
                listener.Stop();
            }
        }
        finally
        {
            await drainCts.CancelAsync();
            try { drainListener.Stop(); } catch { }
            drainCts.Dispose();
        }
    }

    [Fact]
    public async Task BackendReconnect_AfterCascade_NextUpstreamRequest_Succeeds()
    {
        int backendPort = PickFreePort();
        var backend = new StubBackend(backendPort);

        var ctx = MakeContext("PLC1");
        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
        await using var mux = await BuildMuxAsync(plc, new ConnectionOptions(), ctx);

        var (cA, pA, lA, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await cA.SendAsync(BuildFc03ReadFrame(1, 0, 1), SocketFlags.None);
            _ = await ReadOneFrameAsync(cA, TestContext.Current.CancellationToken);

            await backend.DisposeAsync();
            await WaitForCloseAsync(cA, TestContext.Current.CancellationToken);
            cA.Dispose();
            await pA.DisposeAsync();
            lA.Stop();
        }
        catch { /* tolerate any teardown noise */ }

        // Start a new backend on the same port.
        await using var backend2 = new StubBackend(backendPort);

        // A fresh client should round-trip cleanly through the same multiplexer.
        var (cB, pB, lB, _) = await ConnectClientAsync(mux, plc.Name);
        try
        {
            await cB.SendAsync(BuildFc03ReadFrame(0x7777, 0, 1), SocketFlags.None);
            var rsp = await ReadOneFrameAsync(cB, TestContext.Current.CancellationToken);
            ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)0x7777);
        }
        finally
        {
            cB.Dispose();
            await pB.DisposeAsync();
            lB.Stop();
        }
    }

    private static async Task WaitForCloseAsync(Socket s, CancellationToken ct)
    {
        var buf = new byte[1];
        using var deadline = CancellationTokenSource.CreateLinkedTokenSource(ct);
        deadline.CancelAfter(TimeSpan.FromSeconds(2));
        while (!deadline.IsCancellationRequested)
        {
            try
            {
                int n = await s.ReceiveAsync(buf, SocketFlags.None, deadline.Token);
                if (n == 0) return;
            }
            catch
            {
                return;
            }
        }
    }
}