Files
Joseph Doherty a2dba4bd07 mbproxy: add in-flight read coalescing (Phase 10)
When two or more upstream clients send the same FC03/FC04 read while a
matching request is already in flight on the same PLC's multiplexed
backend socket, attach the late arrivals to the existing InFlightRequest
.InterestedParties list instead of opening a second backend round-trip.
The single backend response fans out to every attached party with each
party's original MBAP TxId restored individually. Zero post-response
staleness — coalescing operates entirely within the in-flight window
(microseconds to ~10 ms typical); the proxy is NOT a cache layer.

Headline mechanism:

- New record struct CoalescingKey(UnitId, Fc, StartAddress, Qty) keys
  the per-PLC InFlightByKeyMap. FC03 and FC04 are separate Modbus
  tables and never share a key; different unit IDs never coalesce;
  writes (FC06/FC16) bypass the coalescing path entirely.
- InFlightByKeyMap uses a simple lock around a Dictionary; atomic
  TryAttachOrCreate either appends a new party to the in-flight
  request's mutable List<InterestedParty> or invokes a factory to
  build a fresh entry. Per-entry MaxParties cap (default 32) bounds
  fan-out cost; past the cap, the next arrival opens a new entry.
- PlcMultiplexer.OnUpstreamFrameAsync takes the coalescing path for
  FC03/FC04 when Mbproxy.Resilience.ReadCoalescing.Enabled. The
  factory closure does the Phase-9 work (allocate TxId, add to
  CorrelationMap); the channel send happens AFTER returning from
  TryAttachOrCreate so the map lock is not held across the async send.
- Response fan-out in RunBackendReaderAsync removes the entry from
  InFlightByKeyMap before iterating InterestedParties, ensuring no
  concurrent attach can mutate the list during iteration.
- Cascade + watchdog paths also drain the key map so a stale entry
  cannot outlive its backend round-trip.

Counter accounting balance (per snapshot): CoalescedHitCount +
CoalescedMissCount equals total FC03 + FC04 requests since startup.
Even with coalescing disabled, every read still bumps Miss so dashboard
math stays balanced.

New surface (additive only):
- src/Mbproxy/Proxy/Multiplexing/CoalescingKey.cs
- src/Mbproxy/Proxy/Multiplexing/InFlightByKeyMap.cs
- src/Mbproxy/Proxy/Multiplexing/CoalescingLogEvents.cs
- ReadCoalescingOptions on ResilienceOptions
- CoalescedHitCount / CoalescedMissCount /
  CoalescedResponseToDeadUpstream counters surfaced on /status.json
  per PLC and as a compact "Coal" cell on the HTML status page.

Phase 9 test patch: TwoUpstreams_ProxyTxIds_AreDistinct_OnTheWire
previously read the same register from both clients (which now
coalesces). Patched to read two different addresses so the test still
proves distinct backend TxIds without violating the coalescing
contract.

Tests added: 24 new (19 unit + 5 E2E):
- CoalescingKeyTests (5)
- InFlightByKeyMapTests (6, includes concurrent stress)
- ReadCoalescingTests (8, stub-backend with deterministic delay)
- ReadCoalescingE2ETests (5, pymodbus simulator; coalescing-active
  during overlap is proven against the stub, not the sim, due to
  pymodbus 3.13's known concurrent-frame bug)

Total: 325 tests passing (282 unit + 43 E2E).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 02:26:06 -04:00

585 lines
24 KiB
C#

using System.Collections.Concurrent;
using System.Collections.Frozen;
using System.Net;
using System.Net.Sockets;
using Mbproxy.Bcd;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Multiplexing;
/// <summary>
/// Phase-10 unit tests for read coalescing against a stub backend (real sockets, no
/// simulator). The stub gives us deterministic control over backend response timing so
/// the "overlapping in-flight" window is large enough for late requests to actually
/// coalesce. The pymodbus simulator cannot be used here — its known concurrent-MBAP-frame
/// bug (see <see cref="MultiplexerE2ETests"/>) would invalidate the proxy-TxId echo path
/// that coalescing relies on.
/// </summary>
[Trait("Category", "Unit")]
public sealed class ReadCoalescingTests
{
// ── Frame builders / readers ─────────────────────────────────────────────────
/// <summary>
/// Obtains a loopback port number by binding a throw-away listener to port 0, reading
/// back the port the OS assigned, and stopping the listener again.
/// </summary>
/// <returns>The port the temporary listener was bound to.</returns>
private static int PickFreePort()
{
    var probe = new TcpListener(IPAddress.Loopback, 0);
    probe.Start();

    // The endpoint must be read while the listener is still started.
    var boundEndpoint = (IPEndPoint)probe.LocalEndpoint;
    int assignedPort = boundEndpoint.Port;

    probe.Stop();
    return assignedPort;
}
/// <summary>
/// Receives exactly <paramref name="count"/> bytes from <paramref name="s"/>, looping over
/// partial receives until the buffer is full.
/// </summary>
/// <param name="s">Socket to receive from.</param>
/// <param name="count">Number of bytes to read.</param>
/// <param name="ct">Token passed to every receive call.</param>
/// <returns>A new array holding the <paramref name="count"/> bytes read.</returns>
/// <exception cref="IOException">A receive returned 0 bytes before the buffer was full.</exception>
private static async Task<byte[]> ReadExactAsync(Socket s, int count, CancellationToken ct)
{
    var buffer = new byte[count];
    for (int filled = 0; filled < count;)
    {
        Memory<byte> remaining = buffer.AsMemory(filled, count - filled);
        int received = await s.ReceiveAsync(remaining, SocketFlags.None, ct);
        if (received == 0)
        {
            throw new IOException("EOF");
        }

        filled += received;
    }

    return buffer;
}
/// <summary>
/// Reads one Modbus/TCP frame from <paramref name="s"/>: the 7-byte MBAP header followed by
/// the remaining bytes announced by the header's big-endian length field (bytes 4-5).
/// </summary>
/// <param name="s">Socket to read from.</param>
/// <param name="ct">Token passed to the underlying reads.</param>
/// <returns>
/// The header plus body as one array. When the length field is 0 or 1 there is no body and
/// only the 7 header bytes are returned.
/// </returns>
private static async Task<byte[]> ReadOneFrameAsync(Socket s, CancellationToken ct)
{
    const int MbapHeaderLength = 7;

    var header = await ReadExactAsync(s, MbapHeaderLength, ct);
    ushort length = (ushort)((header[4] << 8) | header[5]);

    // The length field counts the unit-id byte, which is already inside the 7-byte header.
    // Clamp at zero: a malformed length of 0 would otherwise give bodyLen = -1, a 6-byte
    // frame array, and an ArgumentException from copying 7 header bytes into it.
    int bodyLen = Math.Max(0, length - 1);
    if (bodyLen == 0)
    {
        return header;
    }

    var body = await ReadExactAsync(s, bodyLen, ct);
    var frame = new byte[MbapHeaderLength + bodyLen];
    Buffer.BlockCopy(header, 0, frame, 0, MbapHeaderLength);
    Buffer.BlockCopy(body, 0, frame, MbapHeaderLength, bodyLen);
    return frame;
}
/// <summary>
/// Builds a 12-byte Modbus/TCP FC03 request frame.
/// </summary>
/// <param name="txId">Transaction id written big-endian into bytes 0-1.</param>
/// <param name="start">Start address written big-endian into bytes 8-9.</param>
/// <param name="qty">Quantity written big-endian into bytes 10-11.</param>
/// <param name="unit">Unit id written into byte 6.</param>
/// <returns>The assembled frame.</returns>
private static byte[] BuildFc03(ushort txId, ushort start, ushort qty, byte unit = 1)
{
    var frame = new byte[12];

    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    // frame[2..3] stay 0x00 0x00.
    frame[4] = 0x00;
    frame[5] = 0x06;
    frame[6] = unit;
    frame[7] = 0x03;
    frame[8] = (byte)(start >> 8);
    frame[9] = (byte)(start & 0xFF);
    frame[10] = (byte)(qty >> 8);
    frame[11] = (byte)(qty & 0xFF);

    return frame;
}
/// <summary>
/// Builds a 12-byte Modbus/TCP FC04 request frame.
/// </summary>
/// <param name="txId">Transaction id written big-endian into bytes 0-1.</param>
/// <param name="start">Start address written big-endian into bytes 8-9.</param>
/// <param name="qty">Quantity written big-endian into bytes 10-11.</param>
/// <param name="unit">Unit id written into byte 6.</param>
/// <returns>The assembled frame.</returns>
private static byte[] BuildFc04(ushort txId, ushort start, ushort qty, byte unit = 1)
{
    byte txHi = (byte)(txId >> 8);
    byte txLo = (byte)(txId & 0xFF);
    byte startHi = (byte)(start >> 8);
    byte startLo = (byte)(start & 0xFF);
    byte qtyHi = (byte)(qty >> 8);
    byte qtyLo = (byte)(qty & 0xFF);

    return new byte[]
    {
        txHi, txLo,
        0x00, 0x00,
        0x00, 0x06,
        unit, 0x04,
        startHi, startLo,
        qtyHi, qtyLo,
    };
}
/// <summary>
/// Builds a 12-byte Modbus/TCP FC06 request frame.
/// </summary>
/// <param name="txId">Transaction id written big-endian into bytes 0-1.</param>
/// <param name="addr">Address written big-endian into bytes 8-9.</param>
/// <param name="value">Value written big-endian into bytes 10-11.</param>
/// <param name="unit">Unit id written into byte 6.</param>
/// <returns>The assembled frame.</returns>
private static byte[] BuildFc06(ushort txId, ushort addr, ushort value, byte unit = 1)
{
    static byte Hi(ushort word) => (byte)(word >> 8);
    static byte Lo(ushort word) => (byte)(word & 0xFF);

    return
    [
        Hi(txId), Lo(txId),
        0x00, 0x00,
        0x00, 0x06,
        unit, 0x06,
        Hi(addr), Lo(addr),
        Hi(value), Lo(value),
    ];
}
/// <summary>
/// Builds a Modbus/TCP FC03 response frame carrying <paramref name="regs"/> as big-endian
/// 16-bit values after a one-byte byte count.
/// </summary>
/// <param name="txId">Transaction id written big-endian into bytes 0-1.</param>
/// <param name="unit">Unit id written into byte 6.</param>
/// <param name="regs">Register values to place in the payload, in order.</param>
/// <returns>A frame of <c>9 + 2 * regs.Length</c> bytes.</returns>
private static byte[] BuildFc03Response(ushort txId, byte unit, params ushort[] regs)
{
    int dataBytes = regs.Length * 2;
    int pduLen = 2 + dataBytes;              // function code + byte count + register data
    ushort mbapLen = (ushort)(1 + pduLen);   // unit id + PDU

    var frame = new byte[7 + pduLen];
    frame[0] = (byte)(txId >> 8);
    frame[1] = (byte)(txId & 0xFF);
    frame[2] = 0;
    frame[3] = 0;
    frame[4] = (byte)(mbapLen >> 8);
    frame[5] = (byte)(mbapLen & 0xFF);
    frame[6] = unit;
    frame[7] = 0x03;
    frame[8] = (byte)dataBytes;

    int cursor = 9;
    foreach (ushort reg in regs)
    {
        frame[cursor++] = (byte)(reg >> 8);
        frame[cursor++] = (byte)(reg & 0xFF);
    }

    return frame;
}
/// <summary>
/// Builds a 12-byte Modbus/TCP FC06 response frame containing the given address and value.
/// </summary>
/// <param name="txId">Transaction id written big-endian into bytes 0-1.</param>
/// <param name="unit">Unit id written into byte 6.</param>
/// <param name="addr">Address written big-endian into bytes 8-9.</param>
/// <param name="value">Value written big-endian into bytes 10-11.</param>
/// <returns>The assembled frame.</returns>
private static byte[] BuildFc06Response(ushort txId, byte unit, ushort addr, ushort value)
    =>
    [
        (byte)(txId >> 8), (byte)(txId & 0xFF),
        0, 0,
        0, 6,
        unit, 0x06,
        (byte)(addr >> 8), (byte)(addr & 0xFF),
        (byte)(value >> 8), (byte)(value & 0xFF),
    ];
// ── Holding-the-response stub backend ─────────────────────────────────────
/// <summary>
/// Stub backend that delays its response by <see cref="ResponseDelayMs"/>. The delay
/// gives the test a deterministic in-flight window so a second client's identical
/// request actually overlaps the first request's wire-time. Records every proxy TxId
/// it sees so the test can count distinct backend round-trips.
/// </summary>
/// <remarks>
/// Listens on loopback at the port given to the constructor and starts accepting
/// immediately. FC03/FC04 requests are answered with a single register of value 0x1234;
/// FC06 requests are answered by echoing address and value; any other function code is
/// recorded but never answered.
/// </remarks>
private sealed class DelayedStubBackend : IAsyncDisposable
{
    /// <summary>Loopback port the stub listens on.</summary>
    public int Port { get; }

    /// <summary>Delay, in milliseconds, applied before each response is sent.</summary>
    public int ResponseDelayMs { get; set; } = 200;

    /// <summary>Transaction ids of every request frame received, in arrival order.</summary>
    public ConcurrentQueue<ushort> SeenProxyTxIds { get; } = new();

    /// <summary>Number of request frames received so far.</summary>
    public int RequestCount => SeenProxyTxIds.Count;

    private readonly TcpListener _listener;
    private readonly CancellationTokenSource _cts = new();
    private readonly List<Task> _clientTasks = new();

    /// <summary>Binds to <paramref name="port"/> on loopback and starts the accept loop.</summary>
    public DelayedStubBackend(int port)
    {
        Port = port;
        _listener = new TcpListener(IPAddress.Loopback, port);
        _listener.Start();
        _ = AcceptLoop();
    }

    /// <summary>Accepts connections until cancelled, spawning one handler task per socket.</summary>
    private async Task AcceptLoop()
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                Socket s = await _listener.AcceptSocketAsync(_cts.Token);
                var t = Task.Run(() => HandleAsync(s));
                lock (_clientTasks) _clientTasks.Add(t);
            }
        }
        catch
        {
            // Best-effort test stub: cancellation or a stopped listener simply ends the loop.
        }
    }

    /// <summary>
    /// Reads request frames from one connection and schedules a delayed response for each.
    /// </summary>
    private async Task HandleAsync(Socket s)
    {
        // Several delayed responses can become due at the same moment. Socket documentation
        // advises against issuing multiple sends in parallel on one socket, so every send on
        // this connection goes through this gate. It is intentionally not disposed: response
        // tasks may still reference it after this method returns, and no wait handle is
        // ever allocated for it.
        var sendGate = new SemaphoreSlim(1, 1);
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                var req = await ReadOneFrameAsync(s, _cts.Token);
                if (req.Length < 8) break;
                ushort txId = (ushort)((req[0] << 8) | req[1]);
                byte unit = req[6];
                byte fc = req[7];
                SeenProxyTxIds.Enqueue(txId);

                // Schedule the response asynchronously so the next request (from a
                // second client) can race onto the multiplexer while this one is
                // still in flight.
                _ = Task.Run(async () =>
                {
                    try
                    {
                        await Task.Delay(ResponseDelayMs, _cts.Token);
                        byte[] response;
                        if (fc == 0x03 || fc == 0x04)
                        {
                            // Default register value 0x1234 (BCD 1234).
                            response = BuildFc03Response(txId, unit, 0x1234);
                            response[7] = fc; // restore actual FC byte
                        }
                        else if (fc == 0x06 && req.Length >= 12)
                        {
                            // Length guard: address/value occupy bytes 8-11 of the request.
                            ushort addr = (ushort)((req[8] << 8) | req[9]);
                            ushort val = (ushort)((req[10] << 8) | req[11]);
                            response = BuildFc06Response(txId, unit, addr, val);
                        }
                        else
                        {
                            return;
                        }

                        await sendGate.WaitAsync(_cts.Token);
                        try
                        {
                            await s.SendAsync(response, SocketFlags.None, _cts.Token);
                        }
                        finally
                        {
                            sendGate.Release();
                        }
                    }
                    catch
                    {
                        // Best-effort: the peer may be gone or the stub may be shutting down.
                    }
                });
            }
        }
        catch
        {
            // EOF, cancellation, or a socket error ends this connection's handler.
        }
        finally
        {
            try { s.Dispose(); } catch { }
        }
    }

    /// <summary>
    /// Cancels all loops, stops the listener, and waits up to two seconds for the
    /// per-connection handlers to finish.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await _cts.CancelAsync();
        try { _listener.Stop(); } catch { }
        Task[] snap;
        lock (_clientTasks) snap = _clientTasks.ToArray();
        try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { }
        _cts.Dispose();
    }
}
// ── Mux construction / client helpers ────────────────────────────────────
/// <summary>
/// Creates a <see cref="PerPlcContext"/> for tests with fresh counters and a null logger.
/// </summary>
/// <param name="name">Value assigned to <c>PlcName</c>.</param>
/// <param name="tags">
/// Tags indexed by their <c>Address</c>; when none are given <c>BcdTagMap.Empty</c> is used.
/// </param>
/// <returns>The populated context.</returns>
private static PerPlcContext MakeContext(string name, params BcdTag[] tags)
{
    var byAddress = tags.ToDictionary(tag => tag.Address).ToFrozenDictionary();

    BcdTagMap tagMap;
    if (byAddress.Count == 0)
    {
        tagMap = BcdTagMap.Empty;
    }
    else
    {
        tagMap = new BcdTagMap(byAddress);
    }

    var context = new PerPlcContext
    {
        PlcName = name,
        TagMap = tagMap,
        Counters = new ProxyCounters(),
        Logger = NullLogger.Instance,
    };
    return context;
}
/// <summary>
/// Constructs a <see cref="PlcMultiplexer"/> for tests with a new <see cref="BcdPduPipeline"/>,
/// a null logger, no backend-connect pipeline, and a coalescing-options callback that
/// always returns <paramref name="coalescing"/>.
/// </summary>
private static PlcMultiplexer BuildMux(
    PlcOptions plc,
    ConnectionOptions connOpts,
    PerPlcContext ctx,
    ReadCoalescingOptions coalescing)
    => new PlcMultiplexer(
        plc,
        connOpts,
        new BcdPduPipeline(),
        ctx,
        NullLogger<PlcMultiplexer>.Instance,
        backendConnectPipeline: null,
        coalescingOptions: () => coalescing);
/// <summary>
/// Creates a connected loopback socket pair, wraps the accepted side in an
/// <see cref="UpstreamPipe"/>, and starts that pipe on <paramref name="mux"/> in the
/// background.
/// </summary>
/// <param name="mux">Multiplexer the new pipe is started on.</param>
/// <param name="plcName">PLC name passed to the <see cref="UpstreamPipe"/> constructor.</param>
/// <returns>
/// The client-side socket, the pipe wrapping the accepted socket, and the listener used to
/// pair them. The caller is responsible for disposing/stopping all three.
/// </returns>
private static async Task<(Socket client, UpstreamPipe pipe, TcpListener proxyListener)>
ConnectClientAsync(PlcMultiplexer mux, string plcName)
{
    // Bind straight to port 0 and read the assigned port back. Picking a port with a
    // separate probe listener, releasing it, and re-binding leaves a window in which a
    // parallel test can grab the same port.
    var proxyListener = new TcpListener(IPAddress.Loopback, 0);
    var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
    { NoDelay = true };
    try
    {
        proxyListener.Start();
        int proxyPort = ((IPEndPoint)proxyListener.LocalEndpoint).Port;

        await client.ConnectAsync(IPAddress.Loopback, proxyPort);
        var upstream = await proxyListener.AcceptSocketAsync();

        UpstreamPipe pipe;
        try
        {
            pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance);
        }
        catch
        {
            // The accepted socket has no owner yet if pipe construction fails.
            upstream.Dispose();
            throw;
        }

        _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None));
        return (client, pipe, proxyListener);
    }
    catch
    {
        // Nothing is handed back to the caller on failure, so release what was created here.
        client.Dispose();
        proxyListener.Stop();
        throw;
    }
}
// ── Tests ────────────────────────────────────────────────────────────────
/// <summary>
/// Two clients send the same FC03 read (address 100, qty 1) 80 ms apart while the stub
/// holds its reply for 300 ms. Each client must get its own TxId back, the stub must have
/// seen exactly one request, and the counters must show one hit and one miss.
/// </summary>
[Fact]
public async Task TwoClients_SameRequest_OnlyOneBackendRoundTrip()
{
    static ushort TxIdOf(byte[] frame) => (ushort)((frame[0] << 8) | frame[1]);

    CancellationToken testCt = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var backend = new DelayedStubBackend(stubPort) { ResponseDelayMs = 300 };
    PerPlcContext plcContext = MakeContext("PLC1");
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var mux = BuildMux(
        plcOptions,
        new ConnectionOptions(),
        plcContext,
        new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });
    var (leader, leaderPipe, leaderListener) = await ConnectClientAsync(mux, plcOptions.Name);
    var (follower, followerPipe, followerListener) = await ConnectClientAsync(mux, plcOptions.Name);
    try
    {
        // The leader's request goes out first; the pause lets the multiplexer register it
        // before the follower's identical request arrives inside the 300 ms reply delay.
        await leader.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await Task.Delay(80, testCt);
        await follower.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);

        byte[] leaderReply = await ReadOneFrameAsync(leader, testCt);
        byte[] followerReply = await ReadOneFrameAsync(follower, testCt);

        TxIdOf(leaderReply).ShouldBe((ushort)0x0001);
        TxIdOf(followerReply).ShouldBe((ushort)0x0002);
        backend.RequestCount.ShouldBe(1, "exactly one backend round-trip must service both clients");
        plcContext.Counters.Snapshot().CoalescedHitCount.ShouldBe(1);
        plcContext.Counters.Snapshot().CoalescedMissCount.ShouldBe(1);
    }
    finally
    {
        leader.Dispose();
        follower.Dispose();
        await leaderPipe.DisposeAsync();
        await followerPipe.DisposeAsync();
        leaderListener.Stop();
        followerListener.Stop();
    }
}
/// <summary>
/// Two clients send FC03 reads for different start addresses (100 and 200). The stub must
/// see two requests and the counters must show zero hits and two misses.
/// </summary>
[Fact]
public async Task TwoClients_DifferentRequests_BothHitBackend()
{
    CancellationToken testCt = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var backend = new DelayedStubBackend(stubPort) { ResponseDelayMs = 50 };
    PerPlcContext plcContext = MakeContext("PLC1");
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var mux = BuildMux(
        plcOptions,
        new ConnectionOptions(),
        plcContext,
        new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });
    var (clientA, pipeA, listenerA) = await ConnectClientAsync(mux, plcOptions.Name);
    var (clientB, pipeB, listenerB) = await ConnectClientAsync(mux, plcOptions.Name);
    try
    {
        // The start addresses differ, so the two reads have different coalescing keys.
        byte[] readAt100 = BuildFc03(0x0001, 100, 1);
        byte[] readAt200 = BuildFc03(0x0002, 200, 1);
        await clientA.SendAsync(readAt100, SocketFlags.None);
        await clientB.SendAsync(readAt200, SocketFlags.None);

        _ = await ReadOneFrameAsync(clientA, testCt);
        _ = await ReadOneFrameAsync(clientB, testCt);

        backend.RequestCount.ShouldBe(2, "two distinct keys must produce two backend round-trips");
        plcContext.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
        plcContext.Counters.Snapshot().CoalescedMissCount.ShouldBe(2);
    }
    finally
    {
        clientA.Dispose();
        clientB.Dispose();
        await pipeA.DisposeAsync();
        await pipeB.DisposeAsync();
        listenerA.Stop();
        listenerB.Stop();
    }
}
/// <summary>
/// Five clients send the same FC03 read; the first goes out 60 ms ahead of the other four
/// while the stub holds replies for 400 ms. Every client must receive its own TxId, the
/// stub must see at most two requests, and at least three hits must be counted.
/// </summary>
[Fact]
public async Task FiveClients_SameRequest_OneBackendRoundTrip_FiveResponses()
{
    const int ClientCount = 5;

    CancellationToken testCt = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var backend = new DelayedStubBackend(stubPort) { ResponseDelayMs = 400 };
    PerPlcContext plcContext = MakeContext("PLC1");
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var mux = BuildMux(
        plcOptions,
        new ConnectionOptions(),
        plcContext,
        new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });

    // One entry per connected client: (client socket, upstream pipe, pairing listener).
    var connections = new List<(Socket, UpstreamPipe, TcpListener)>();
    try
    {
        while (connections.Count < ClientCount)
        {
            connections.Add(await ConnectClientAsync(mux, plcOptions.Name));
        }

        // The leader opens the in-flight entry; the remaining clients follow inside the
        // 400 ms reply delay.
        await connections[0].Item1.SendAsync(BuildFc03((ushort)1, 100, 1), SocketFlags.None);
        await Task.Delay(60, testCt);
        for (int i = 1; i < connections.Count; i++)
        {
            await connections[i].Item1.SendAsync(BuildFc03((ushort)(i + 1), 100, 1), SocketFlags.None);
        }

        // Collect one response per client, in connection order.
        for (int i = 0; i < connections.Count; i++)
        {
            byte[] reply = await ReadOneFrameAsync(connections[i].Item1, testCt);
            ushort echoedTxId = (ushort)((reply[0] << 8) | reply[1]);
            echoedTxId.ShouldBe((ushort)(i + 1),
                $"client #{i} must see its own original TxId restored");
        }

        backend.RequestCount.ShouldBeLessThanOrEqualTo(2,
            "at most 2 backend round-trips (one for the leader, one for any racy first-arrival)");
        plcContext.Counters.Snapshot().CoalescedHitCount.ShouldBeGreaterThanOrEqualTo(3,
            "at least 3 of the 5 clients should have coalesced");
    }
    finally
    {
        foreach (var (socket, _, _) in connections)
        {
            socket.Dispose();
        }

        foreach (var (_, pipe, _) in connections)
        {
            await pipe.DisposeAsync();
        }

        foreach (var (_, _, listener) in connections)
        {
            listener.Stop();
        }
    }
}
/// <summary>
/// One client sends FC03 and another sends FC04 for the same address and quantity. The stub
/// must see two requests and no coalescing hit may be counted.
/// </summary>
[Fact]
public async Task FC03_And_FC04_SameAddress_NOT_Coalesced()
{
    CancellationToken testCt = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var backend = new DelayedStubBackend(stubPort) { ResponseDelayMs = 200 };
    PerPlcContext plcContext = MakeContext("PLC1");
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var mux = BuildMux(
        plcOptions,
        new ConnectionOptions(),
        plcContext,
        new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });
    var (holdingClient, holdingPipe, holdingListener) = await ConnectClientAsync(mux, plcOptions.Name);
    var (inputClient, inputPipe, inputListener) = await ConnectClientAsync(mux, plcOptions.Name);
    try
    {
        // Same address and quantity, but the function codes differ (0x03 vs 0x04).
        byte[] fc03Read = BuildFc03(0x0001, 100, 1);
        byte[] fc04Read = BuildFc04(0x0002, 100, 1);
        await holdingClient.SendAsync(fc03Read, SocketFlags.None);
        await inputClient.SendAsync(fc04Read, SocketFlags.None);

        _ = await ReadOneFrameAsync(holdingClient, testCt);
        _ = await ReadOneFrameAsync(inputClient, testCt);

        backend.RequestCount.ShouldBe(2, "FC03 and FC04 must never share a backend round-trip");
        plcContext.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
    }
    finally
    {
        holdingClient.Dispose();
        inputClient.Dispose();
        await holdingPipe.DisposeAsync();
        await inputPipe.DisposeAsync();
        holdingListener.Stop();
        inputListener.Stop();
    }
}
/// <summary>
/// Two clients send identical FC06 writes (address 200, value 1234). The stub must see both
/// requests, and neither the hit nor the miss counter may move.
/// </summary>
[Fact]
public async Task FC06_Write_NeverCoalesced()
{
    CancellationToken testCt = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var backend = new DelayedStubBackend(stubPort) { ResponseDelayMs = 100 };
    PerPlcContext plcContext = MakeContext("PLC1");
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    await using var mux = BuildMux(
        plcOptions,
        new ConnectionOptions(),
        plcContext,
        new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });
    var (writerA, pipeA, listenerA) = await ConnectClientAsync(mux, plcOptions.Name);
    var (writerB, pipeB, listenerB) = await ConnectClientAsync(mux, plcOptions.Name);
    try
    {
        // Byte-identical writes apart from the TxId; each must reach the backend on its own.
        await writerA.SendAsync(BuildFc06(0x0001, 200, 1234), SocketFlags.None);
        await writerB.SendAsync(BuildFc06(0x0002, 200, 1234), SocketFlags.None);

        _ = await ReadOneFrameAsync(writerA, testCt);
        _ = await ReadOneFrameAsync(writerB, testCt);

        backend.RequestCount.ShouldBe(2, "FC06 writes must always hit the backend separately");
        plcContext.Counters.Snapshot().CoalescedHitCount.ShouldBe(0,
            "writes are never counted as coalescing hits");
        plcContext.Counters.Snapshot().CoalescedMissCount.ShouldBe(0,
            "writes are not part of the coalescing accounting");
    }
    finally
    {
        writerA.Dispose();
        writerB.Dispose();
        await pipeA.DisposeAsync();
        await pipeB.DisposeAsync();
        listenerA.Stop();
        listenerB.Stop();
    }
}
/// <summary>
/// Two clients send the same FC03 read while the stub holds its reply for 400 ms; the first
/// client is then disposed before the reply is due. The second client must still receive a
/// response with its own TxId, and the dead-upstream counter must reach at least 1.
/// </summary>
[Fact]
public async Task OneClient_DisconnectsMidFlight_OthersStillGetResponse_AndDeadUpstreamCounterIncrements()
{
    int backendPort = PickFreePort();
    await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 400 };
    var ctx = MakeContext("PLC1");
    var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
    await using var mux = BuildMux(plc, new ConnectionOptions(),
        ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });
    var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
    var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);

    // Tracks whether the test body has taken over tearing down client 1, so the finally
    // block can release it when the test fails before reaching the mid-flight drop.
    bool firstClientDropped = false;
    try
    {
        await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await Task.Delay(60, TestContext.Current.CancellationToken);
        await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);
        await Task.Delay(60, TestContext.Current.CancellationToken);

        // Drop client 1 mid-flight (before the backend response arrives).
        firstClientDropped = true;
        c1.Dispose();
        await p1.DisposeAsync();

        // Client 2 must still get its response.
        var r2 = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);
        ((ushort)((r2[0] << 8) | r2[1])).ShouldBe((ushort)0x0002);

        // Give fan-out a beat to record the dead-upstream skip on the c1 side.
        await Task.Delay(100, TestContext.Current.CancellationToken);
        ctx.Counters.Snapshot().CoalescedResponseToDeadUpstream.ShouldBeGreaterThanOrEqualTo(1,
            "the disconnected client's fan-out slot must increment the dead-upstream counter");
    }
    finally
    {
        if (!firstClientDropped)
        {
            // The test failed before the deliberate drop; release client 1 here instead.
            c1.Dispose();
            await p1.DisposeAsync();
        }

        c2.Dispose();
        await p2.DisposeAsync();
        l1.Stop(); l2.Stop();
    }
}
/// <summary>
/// With <c>MaxParties = 2</c>, three clients send the same FC03 read 50 ms apart while the
/// stub holds replies for 400 ms. The stub must see two requests, with one hit and two
/// misses counted.
/// </summary>
[Fact]
public async Task AtMaxParties_NextRequest_StartsFreshBackendRoundTrip()
{
    CancellationToken testCt = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var backend = new DelayedStubBackend(stubPort) { ResponseDelayMs = 400 };
    PerPlcContext plcContext = MakeContext("PLC1");
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };

    // A cap of two parties means the third identical request cannot join the first entry.
    var cappedCoalescing = new ReadCoalescingOptions { Enabled = true, MaxParties = 2 };
    await using var mux = BuildMux(plcOptions, new ConnectionOptions(), plcContext, cappedCoalescing);

    var (first, firstPipe, firstListener) = await ConnectClientAsync(mux, plcOptions.Name);
    var (second, secondPipe, secondListener) = await ConnectClientAsync(mux, plcOptions.Name);
    var (third, thirdPipe, thirdListener) = await ConnectClientAsync(mux, plcOptions.Name);
    try
    {
        await first.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await Task.Delay(50, testCt);
        await second.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);
        await Task.Delay(50, testCt);
        await third.SendAsync(BuildFc03(0x0003, 100, 1), SocketFlags.None);

        _ = await ReadOneFrameAsync(first, testCt);
        _ = await ReadOneFrameAsync(second, testCt);
        _ = await ReadOneFrameAsync(third, testCt);

        backend.RequestCount.ShouldBe(2,
            "MaxParties=2 caps the first entry at 2; the third request opens its own round-trip");
        plcContext.Counters.Snapshot().CoalescedHitCount.ShouldBe(1, "exactly one party joined the leader");
        plcContext.Counters.Snapshot().CoalescedMissCount.ShouldBe(2,
            "the leader and the overflow are both misses");
    }
    finally
    {
        first.Dispose();
        second.Dispose();
        third.Dispose();
        await firstPipe.DisposeAsync();
        await secondPipe.DisposeAsync();
        await thirdPipe.DisposeAsync();
        firstListener.Stop();
        secondListener.Stop();
        thirdListener.Stop();
    }
}
/// <summary>
/// Sanity check with <c>Enabled = false</c>: two identical FC03 reads must each produce a
/// backend request, no hit may be counted, and both requests must be counted as misses.
/// </summary>
[Fact]
public async Task CoalescingDisabled_TwoIdenticalReads_BothHitBackend()
{
    CancellationToken testCt = TestContext.Current.CancellationToken;
    int stubPort = PickFreePort();
    await using var backend = new DelayedStubBackend(stubPort) { ResponseDelayMs = 50 };
    PerPlcContext plcContext = MakeContext("PLC1");
    var plcOptions = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = stubPort };
    var coalescingOff = new ReadCoalescingOptions { Enabled = false, MaxParties = 32 };
    await using var mux = BuildMux(plcOptions, new ConnectionOptions(), plcContext, coalescingOff);

    var (clientA, pipeA, listenerA) = await ConnectClientAsync(mux, plcOptions.Name);
    var (clientB, pipeB, listenerB) = await ConnectClientAsync(mux, plcOptions.Name);
    try
    {
        // Identical reads apart from the TxId.
        await clientA.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
        await clientB.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);

        _ = await ReadOneFrameAsync(clientA, testCt);
        _ = await ReadOneFrameAsync(clientB, testCt);

        backend.RequestCount.ShouldBe(2, "coalescing disabled: each identical read must hit the backend");
        plcContext.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
        plcContext.Counters.Snapshot().CoalescedMissCount.ShouldBe(2, "every FC03 request still counts as a Miss");
    }
    finally
    {
        clientA.Dispose();
        clientB.Dispose();
        await pipeA.DisposeAsync();
        await pipeB.DisposeAsync();
        listenerA.Stop();
        listenerB.Stop();
    }
}
}