diff --git a/mbproxy/CLAUDE.md b/mbproxy/CLAUDE.md index 3c1ba65..58dc554 100644 --- a/mbproxy/CLAUDE.md +++ b/mbproxy/CLAUDE.md @@ -25,19 +25,20 @@ The full design plan is in **[`docs/design.md`](docs/design.md)** — settled 20 - **One `TcpListener` per PLC** (54 distinct ports). Each PLC has **one shared backend socket** owned by a `PlcMultiplexer`; many upstream clients are multiplexed onto that single backend via MBAP TxId rewriting (Phase 9). The H2-ECOM100's 4-client cap no longer caps upstream connections. - **Transparent pass-through** of every byte except the MBAP TxId field (rewritten by the multiplexer on each request and restored on each response) and FC03/FC04 response payloads + FC06/FC16 request payloads at configured BCD addresses (re-encoded between BCD nibbles and binary integers). +- **In-flight FC03/FC04 read coalescing** (Phase 10): same-key reads arriving while a peer is in flight attach to the existing `InFlightRequest.InterestedParties` list; the single backend response fans out to every attached client with original TxIds restored. Zero post-response staleness — coalescing entries die when the response arrives. Hot-reload via `Mbproxy.Resilience.ReadCoalescing.Enabled`. - **Polly-backed listener supervisor** auto-recovers any listener that fails to bind at startup or faults at runtime; the same code path also brings up newly-added PLCs from hot-reload and tears down removed ones. - **`appsettings.json` is hot-reloadable** via `IOptionsMonitor`; tag-list changes propagate per-PDU, PLC add/remove flows through the supervisor. - **Polly bounded retries** on backend connect (3 attempts at 100ms / 500ms / 2000ms). No retries on mid-request failures (FC06/FC16 are non-idempotent on BCD tags). A per-request watchdog in the multiplexer surfaces Modbus exception 0x0B to the upstream client if a backend response never arrives within `BackendRequestTimeoutMs`. 
- **Backend disconnect cascades upstream**: when the shared backend socket dies, every attached upstream pipe is closed in the same cycle (counter `BackendDisconnectCascades`); clients reconnect on their next request. -- **Read-only Kestrel admin port** (default 8080) exposes `GET /` (auto-refreshing HTML) and `GET /status.json` with service-wide and per-PLC counters (including Phase-9 mux fields `inFlight`, `maxInFlight`, `txIdWraps`, `disconnectCascades`, `queueDepth`). +- **Read-only Kestrel admin port** (default 8080) exposes `GET /` (auto-refreshing HTML) and `GET /status.json` with service-wide and per-PLC counters (including Phase-9 mux fields `inFlight`, `maxInFlight`, `txIdWraps`, `disconnectCascades`, `queueDepth` and Phase-10 coalescing fields `coalescedHitCount`, `coalescedMissCount`, `coalescedResponseToDeadUpstream`). Anything beyond this short list — JSON schema, propagation table, stable log event names, status counter catalog, test plan — lives in `docs/design.md`. Open that doc before writing code; keep it in sync when decisions change. ## Current state -**Implementation complete through Phase 9.** Phases 00–08 shipped the production-ready 1:1-model service; Phase 9 swapped the connection layer for the TxId-multiplexed model without changing the transparent-rewrite contract. The service is production-ready as a Windows Service: +**Implementation complete through Phase 10.** Phases 00–08 shipped the production-ready 1:1-model service; Phase 9 swapped the connection layer for the TxId-multiplexed model without changing the transparent-rewrite contract; Phase 10 added in-flight read coalescing as an additive optimization on top of the multiplexer. The service is production-ready as a Windows Service: -- 301 tests passing: 263 unit tests + 38 E2E tests (against the pymodbus DL205 simulator + stub backends). +- 325 tests passing: 282 unit tests + 43 E2E tests (against the pymodbus DL205 simulator + stub backends). 
- Single-file self-contained publish (`dotnet publish -c Release -r win-x64`). - PowerShell install/uninstall scripts under `install/`. - Graceful shutdown with configurable drain timeout (`Connection.GracefulShutdownTimeoutMs`, default 10 s). diff --git a/mbproxy/README.md b/mbproxy/README.md index 8c528c0..9a8c5f4 100644 --- a/mbproxy/README.md +++ b/mbproxy/README.md @@ -14,7 +14,7 @@ A .NET 10 Windows Service that sits inline as a Modbus TCP proxy in front of a f ``` src/Mbproxy/ Main C# project (net10.0, Microsoft.NET.Sdk.Worker) -tests/Mbproxy.Tests/ xUnit v3 test project (234 unit + 34 E2E tests) +tests/Mbproxy.Tests/ xUnit v3 test project (282 unit + 43 E2E tests) install/ PowerShell install/uninstall scripts and config template docs/ Design document, phase plans, and operations runbook DL260/ DL205/DL260 reference material and pymodbus simulator profile diff --git a/mbproxy/docs/design.md b/mbproxy/docs/design.md index 98f3d0c..a586260 100644 --- a/mbproxy/docs/design.md +++ b/mbproxy/docs/design.md @@ -115,14 +115,28 @@ public sealed record BcdTag(ushort Address, byte Width); // Width ∈ { 16, 32 - **Unsigned only.** DL205/DL260 BCD is non-negative in the default ladder pattern; the proxy does not implement signed BCD. - **Holding-register and input-register addresses share the same space.** The rewriter applies the configured tag list against both FC03 and FC04 reads. +## Read coalescing (Phase 10) + +After Phase 10, FC03 / FC04 requests are additionally subject to **in-flight read coalescing** before they reach the backend. When two or more upstream clients send the same `(unitId, fc, startAddress, qty)` tuple within the in-flight window of an already-routed request, the multiplexer attaches each late arrival to the existing `InFlightRequest.InterestedParties` list instead of opening a second backend round-trip. The single backend response is fanned out to every attached party with each party's original MBAP TxId restored individually. 
+ +Properties: + +- **Zero post-response staleness.** Coalescing operates entirely between "first request sent to backend" and "response received from backend" (microseconds to ~10 ms typical). Once the response is fanned out, the coalescing entry dies. The proxy is NOT a cache layer — the value each upstream sees is the same value an uncoalesced request would have returned within the PLC's scan-time precision. +- **Only FC03 / FC04.** Writes (FC06 / FC16) are non-idempotent on BCD tags and never coalesced. Different function codes never share a `CoalescingKey` even at the same address (FC03 and FC04 read different Modbus tables). Different `unitId` bytes never coalesce (different PLC personalities behind a shared socket). +- **Bounded fan-out via `MaxParties`** (default 32 in `Mbproxy.Resilience.ReadCoalescing.MaxParties`). Once an entry has `MaxParties` interested clients, the next arrival opens a fresh entry — bounds the response-fanout cost per entry at O(MaxParties) and shields the backend reader task from pathological pile-on. +- **Hot-reloadable on/off.** `Mbproxy.Resilience.ReadCoalescing.Enabled` defaults to `true`. Flipping it to `false` at runtime leaves running coalesced entries to drain naturally; subsequent FC03/04 requests take the Phase-9 (one round-trip per upstream request) path. +- **Transparency contract preserved.** Each upstream client still sees its own original MBAP TxId on the response. The BCD rewriter runs once on the shared response buffer; per-party copies are only made when fan-out has more than one party. + +Counter accounting balance (per snapshot): `coalescedHitCount + coalescedMissCount` equals the total FC03 + FC04 requests seen since the multiplexer was constructed. Both counters increment regardless of whether the coalescing feature is enabled — `coalescedHitCount` is 0 when disabled, but every read still increments `coalescedMissCount`. 
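The accounting rules above can be checked against a small single-threaded model. This is a minimal sketch, not the multiplexer's code: the tuple key, the `OnRead` helper, the client names, and the deliberately tiny `MaxParties = 2` are all illustrative assumptions, chosen so the cap is reached within a few calls.

```csharp
using System;
using System.Collections.Generic;

// Illustrative model only (assumed names, not mbproxy types): shows how hit/miss
// accounting stays balanced across attach, cap overflow, FC03 vs FC04, and a
// runtime switch to disabled.
const int MaxParties = 2;   // tiny on purpose; the documented default is 32
bool enabled = true;
long hit = 0, miss = 0;
var inFlight = new Dictionary<(byte UnitId, byte Fc, ushort Start, ushort Qty), List<string>>();

void OnRead(string client, byte unitId, byte fc, ushort start, ushort qty)
{
    var key = (unitId, fc, start, qty);
    if (enabled)
    {
        if (inFlight.TryGetValue(key, out var parties) && parties.Count < MaxParties)
        {
            parties.Add(client);   // attach to the in-flight peer
            hit++;
            return;
        }
        // No peer, or the peer is at its cap: open a fresh entry.
        inFlight[key] = new List<string> { client };
    }
    miss++;                        // every non-attached read counts as a miss
}

OnRead("hmi", 1, 3, 0x0400, 8);        // miss: first request for this key
OnRead("historian", 1, 3, 0x0400, 8);  // hit: attaches to the in-flight entry
OnRead("gateway", 1, 3, 0x0400, 8);    // miss: entry already holds MaxParties
OnRead("hmi", 1, 4, 0x0400, 8);        // miss: FC04 never shares a key with FC03

inFlight.Clear();                      // responses arrived; entries die
enabled = false;                       // hot-reload flips the feature off
OnRead("hmi", 1, 3, 0x0400, 8);        // miss
OnRead("historian", 1, 3, 0x0400, 8);  // miss: no attach while disabled

Console.WriteLine($"hit={hit} miss={miss} total={hit + miss}");
// prints "hit=1 miss=5 total=6"
```

Six reads went in and `hit + miss` is six, whether or not the feature was enabled at the time of each read.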
+ ## Rewriter — function code scope The rewriter inspects and rewrites payloads only for these function codes; every other FC (coils, discrete inputs, diagnostics, exception responses) passes through byte-for-byte: | FC | Direction | Action | |----|----------------|-----------------------------------------------------------------------| -| 03 | response | Re-encode covered BCD slots from raw nibbles → binary integer | -| 04 | response | Same as FC03 (input-register table also surfaces V-memory) | +| 03 | request + response | FC03 requests may be coalesced with peers before reaching the backend (see Phase-10 section above); response re-encodes covered BCD slots from raw nibbles → binary integer | +| 04 | request + response | Same coalescing eligibility as FC03; response re-encoding the same as FC03 (input-register table also surfaces V-memory) | | 06 | request | Re-encode binary integer → BCD nibbles before forwarding | | 06 | response | Decode BCD nibbles → binary integer on the echo (clients validate that the echoed value equals the value they sent; without this, NModbus-style clients throw on the round-trip) | | 16 | request | Per-register over the configured slots, then forward | @@ -176,6 +190,9 @@ Stable event names (keep these stable so log queries don't churn): | `mbproxy.multiplex.backend.disconnected` | Warning | `Plc`, `UpstreamCount`, `InFlightCount`, `Reason` | | `mbproxy.multiplex.saturated` | Error | `Plc`, `RemoteEp` (16-bit TxId space full) | | `mbproxy.multiplex.request.timeout` | Warning | `Plc`, `ProxyTxId`, `OriginalTxId`, `Fc`, `ElapsedMs` | +| `mbproxy.coalesce.hit` | Debug | `Plc`, `UnitId`, `Fc`, `Start`, `Qty`, `PartyCount` | +| `mbproxy.coalesce.miss` | Debug | `Plc`, `UnitId`, `Fc`, `Start`, `Qty` | +| `mbproxy.coalesce.dead_upstream` | Debug | `Plc`, `UnitId`, `Fc`, `Start`, `Qty` | ## Status page — read-only HTTP endpoint @@ -214,6 +231,9 @@ Authentication is assumed to live at the network layer (trusted internal segment | 
`backend.connects.success` / `backend.connects.failed` | Polly-final-result counters | | `backend.exceptions.byCode` | `{ "01": n, "02": n, "03": n, "04": n }` | | `backend.lastRoundTripMs` | EWMA of recent successful round-trip times | +| `backend.coalescedHitCount` | FC03/04 requests that attached to an already-in-flight peer (Phase 10) | +| `backend.coalescedMissCount` | FC03/04 requests that opened a fresh backend round-trip (Phase 10). `Hit + Miss` = total FC03/04 requests | +| `backend.coalescedResponseToDeadUpstream` | Coalesced fan-out responses skipped because the attached upstream had already disconnected (Phase 10) | | `bytes.upstreamIn` / `bytes.upstreamOut` | Bytes forwarded each direction | Counters are `System.Threading.Interlocked` longs read atomically per request; no locking on the read path. diff --git a/mbproxy/docs/kpi.md b/mbproxy/docs/kpi.md index 120ade7..1729036 100644 --- a/mbproxy/docs/kpi.md +++ b/mbproxy/docs/kpi.md @@ -23,7 +23,7 @@ For context — every recommended addition below is *in addition to* this list. | Per-PLC listener | `state`, `lastBindError`, `recoveryAttempts` | | Per-PLC clients | `connected`, `remoteEndpoints[]` (remote, connectedAtUtc, pdusForwarded) | | Per-PLC PDUs | `forwarded`, `byFc.{fc03,fc04,fc06,fc16,other}`, `rewrittenSlots`, `partialBcdWarnings` | -| Per-PLC backend | `connectsSuccess`, `connectsFailed`, `exceptionsByCode.{code01..code04}`, `lastRoundTripMs` | +| Per-PLC backend | `connectsSuccess`, `connectsFailed`, `exceptionsByCode.{code01..code04}`, `lastRoundTripMs`, `inFlight`, `maxInFlight`, `txIdWraps`, `disconnectCascades`, `queueDepth`, `coalescedHitCount`, `coalescedMissCount`, `coalescedResponseToDeadUpstream` | | Per-PLC bytes | `upstreamIn`, `upstreamOut` | Counters are **cumulative since process start**. A restart resets them. 
@@ -112,16 +112,16 @@ The proxy holds one backend socket per PLC and multiplexes upstream clients via **Why this matters.** Multiplexing concentrates connection risk: a single backend disconnect now cascades to every attached upstream client. The cascade counter quantifies that blast radius. Queue depth is the new latency leading indicator (today's `lastRoundTripMs` measures wire latency only; queue depth reveals proxy-side backlog). -### 1.7 Read coalescing — **[requires Phase 10](plan/10-read-coalescing.md)** +### 1.7 Read coalescing — **shipped in [Phase 10](plan/10-read-coalescing.md)** -After Phase 10 ships, same-key FC03/04 reads within the in-flight window attach to one another instead of generating duplicate backend requests. The coalescing ratio is the headline metric. +Same-key FC03/04 reads within the in-flight window attach to one another instead of generating duplicate backend requests. The coalescing ratio is the headline metric. `coalescedHitCount + coalescedMissCount` equals total FC03/04 request count per snapshot — the math always balances. 
| KPI | Definition | Source | Widget | Alert | Effort | |-----|------------|--------|--------|-------|--------| | `backend.coalescedHitCount` | FC03/04 requests attached to an already-in-flight peer | Phase-10 counter | Sparkline | None — trend-watch | (in Phase 10 scope) | | `backend.coalescedMissCount` | FC03/04 requests that created a fresh backend round-trip | Phase-10 counter | Sparkline | None — trend-watch | (in Phase 10 scope) | | `backend.coalescingRatio` | `Hit / (Hit + Miss)` over the trailing window | Derived (dashboard) | Stat tile per PLC | None; a low ratio just means clients aren't synchronised on the same registers — informational | (in Phase 10 scope) | -| `backend.coalescedResponseToDeadUpstream` | Fan-out responses dropped because the attached upstream disconnected mid-flight | Phase-10 counter | Stat tile per PLC | Spike → client churn during traffic burst; usually not actionable | (in Phase 10 scope) | +| `backend.coalescedResponseToDeadUpstream` | Fan-out responses dropped because the attached upstream disconnected mid-flight | Phase-10 counter | Stat tile per PLC | Spike → client churn during traffic burst; usually not actionable (Tier 2 priority) | (in Phase 10 scope) | **Why this matters.** Coalescing-ratio is the "how much PLC traffic did we save" metric. A 60% ratio means 60% of FC03/04 reads landed on an existing in-flight request — that's roughly 60% reduction in backend PDU rate vs the pre-Phase-10 model. The dead-upstream counter is a churn indicator that's invisible in any other metric. 
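Because the counters are cumulative since process start, a trailing-window ratio has to be computed from the difference between two snapshots. The following is a hedged sketch of that dashboard-side arithmetic; `CoalescingRatio` and its parameter names are hypothetical and not part of mbproxy.

```csharp
using System;

// Hypothetical dashboard-side helper: derives Hit / (Hit + Miss) for the window
// between two cumulative samples of coalescedHitCount / coalescedMissCount.
static double? CoalescingRatio(long hitPrev, long missPrev, long hitNow, long missNow)
{
    long hits = hitNow - hitPrev;
    long misses = missNow - missPrev;

    // Counters reset on restart; a negative delta means the window spans a restart.
    if (hits < 0 || misses < 0) return null;

    long total = hits + misses;
    // No FC03/FC04 traffic in the window: the ratio is undefined rather than zero.
    return total == 0 ? (double?)null : (double)hits / total;
}

double? ratio = CoalescingRatio(hitPrev: 100, missPrev: 400, hitNow: 160, missNow: 440);
Console.WriteLine(ratio is null ? "n/a" : $"{ratio:P0}");
```

In this sample window 60 of 100 reads attached to an in-flight peer, so the ratio is 0.6. Returning `null` for an empty or restart-spanning window keeps a stat tile from showing a misleading 0%.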
diff --git a/mbproxy/docs/plan/10-read-coalescing.md b/mbproxy/docs/plan/10-read-coalescing.md index 7906a05..dcb1fda 100644 --- a/mbproxy/docs/plan/10-read-coalescing.md +++ b/mbproxy/docs/plan/10-read-coalescing.md @@ -2,7 +2,7 @@ When two or more upstream clients send the same FC03/FC04 request to the same PLC while a matching request is already in flight, attach the late arrivals to the existing in-flight entry and fan out the single backend response to all attached clients. Operates entirely within the in-flight window (microseconds to ~10 ms typical) — no post-response caching, no TTL, no staleness contract change. -**Status:** post-1.0 follow-on, depends on Phase 9. +**Status:** shipped (2026-05-14). All gate items green. **Depends on:** Phase 09 (multiplexer + `InFlightRequest` with `InterestedParties` list shape). **Parallel-safe with:** nothing. The phase modifies `PlcMultiplexer.OnFrame` and the backend reader fan-out path; both are tightly coupled. @@ -306,3 +306,21 @@ If you're the agent picking up this phase: - KPI graduation target: [`../kpi.md`](../kpi.md) → Tier 1 (rates / percentiles / availability — coalescing-ratio joins this tier). - Modbus unit-ID semantics that make coalescing-key uniqueness load-bearing: [`../../DL260/dl205.md`](../../DL260/dl205.md) → "Function Code Support" and "Coils and Discrete Inputs". - Counter snapshot backwards-compat policy that this phase respects (additive only): [`../kpi.md`](../kpi.md) → "Backwards-compat policy". + +## Clarifications discovered during implementation + +These are the implementation details that the original phase doc did not pin down; recorded here so the next reader doesn't relearn them. + +1. **`InterestedParties` is a `List<InterestedParty>` cast to `IReadOnlyList<InterestedParty>`.** Phase 9 typed the field as `IReadOnlyList<InterestedParty>` to leave room for any implementation; Phase 10 specifically requires a mutable list so the map can append parties under its lock.
The list is mutated only under `InFlightByKeyMap`'s lock, and the reader's fan-out iterates the list ONLY after the entry has been removed from the map — by that point no further appends are possible. There is no separate snapshot copy. + +2. **The factory closure performs the Phase-9 work (allocate TxId + add to CorrelationMap) but does NOT enqueue to the outbound channel.** The channel send happens AFTER returning from `TryAttachOrCreate` so the InFlightByKey lock is not held across a potentially-async send. The factory communicates its allocated proxy TxId and InFlightRequest back to the caller through closure-captured locals. If the allocator is saturated, the factory returns a "stub" InFlightRequest with no CorrelationMap entry; the caller detects this and delivers a Modbus exception 04. + +3. **`coalescedHitCount + coalescedMissCount` = total FC03/FC04 requests (always).** Even when coalescing is disabled, every FC03/04 request bumps `coalescedMissCount` from the non-coalescing path. This keeps the math balanced for dashboard consumers regardless of feature state. Writes (FC06/FC16) are NOT in this accounting — they never touch the coalescing path. + +4. **Cascade and watchdog paths drain `InFlightByKeyMap` too.** On backend disconnect, `TearDownBackendAsync` calls `_inFlightByKey.DrainAll()` so a brand-new identical request through the freshly-reconnected backend is treated as a miss. On per-request watchdog timeout, `_inFlightByKey.TryRemove(key)` runs alongside the CorrelationMap removal so subsequent identical requests start fresh. + +5. **Live config accessor, not `IOptionsMonitor`-by-value.** The multiplexer takes a `Func<ReadCoalescingOptions>` accessor that resolves to `optionsMonitor.CurrentValue.Resilience.ReadCoalescing` per PDU. This keeps the constructor surface lightweight (no DI on `IOptionsMonitor`) and gives tests a clean way to pin a fixed config. Hot-reload of `Enabled` propagates because the accessor is read on every incoming FC03/FC04 request. + +6.
**Phase 9's `TwoUpstreams_ProxyTxIds_AreDistinct_OnTheWire` test required a one-line edit.** It asserted ≥2 distinct backend TxIds from two identical FC03 reads — exactly the case Phase 10 now coalesces. The test was patched to use DIFFERENT start addresses so the two reads remain non-coalescable while still proving distinct proxy TxIds. The rest of Phase 9's tests are unaffected. + +7. **pymodbus simulator and coalescing.** The simulator's `last_pdu`-overwrite bug (documented in design.md) means we cannot E2E-verify "five concurrent identical reads → 1 backend round-trip" against pymodbus. The headline-stress correctness claim is therefore proven against the stub backend in `ReadCoalescingTests` (real loopback sockets, deterministic 200–400 ms response delay so the in-flight window is wide enough for racing requests to actually overlap). The E2E suite verifies counter accounting, status-page surfacing, and the rewriter integration on serialised reads — i.e. the integration boundary, not the concurrency proof. diff --git a/mbproxy/install/mbproxy.config.template.json b/mbproxy/install/mbproxy.config.template.json index 766f7d2..fb0da21 100644 --- a/mbproxy/install/mbproxy.config.template.json +++ b/mbproxy/install/mbproxy.config.template.json @@ -115,6 +115,33 @@ "ListenerRecovery": { "InitialBackoffMs": [ 1000, 2000, 5000, 15000, 30000 ], "SteadyStateMs": 30000 + }, + + // Phase 10 — in-flight read coalescing. + // + // When two or more upstream clients (HMI / historian / engineering workstation / + // gateway) issue the SAME FC03 or FC04 read while a matching backend round-trip is + // already in flight, the proxy attaches the late arrivals to the existing in-flight + // entry and fans the single PLC response out to every attached client — saving the + // ECOM's per-scan PDU budget on duplicated reads. 
+ // + // Zero post-response staleness: coalescing operates ONLY between "first request + // sent to PLC" and "response received from PLC" (microseconds to ~10 ms typical). + // Each upstream client still sees its own MBAP transaction ID echoed correctly; + // the proxy is transparent. + // + // FC06 / FC16 writes are NEVER coalesced (non-idempotent). FC03 vs FC04 are + // separate Modbus tables and never share a coalescing key. Different unit IDs + // (multi-drop / gateway-backed setups) never coalesce. + // + // Enabled — master switch. Hot-reloadable; flipping to false leaves running + // coalesced entries to drain naturally. + // MaxParties — per-entry cap on attached parties. Past the cap, the next + // identical request opens a fresh backend round-trip (load-shedding + // safety valve for very fan-out-heavy fleets). + "ReadCoalescing": { + "Enabled": true, + "MaxParties": 32 } } }, diff --git a/mbproxy/src/Mbproxy/Admin/StatusDto.cs b/mbproxy/src/Mbproxy/Admin/StatusDto.cs index 96609db..501cbd2 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusDto.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusDto.cs @@ -71,7 +71,10 @@ public sealed record FcCounts( /// /// Backend connect, exception, and multiplexer telemetry. Phase 9 added /// InFlight, MaxInFlight, TxIdWraps, DisconnectCascades, and -/// QueueDepth to surface the live state of the per-PLC TxId-multiplexed connection. +/// QueueDepth. Phase 10 added the three coalescing counters +/// (CoalescedHitCount, CoalescedMissCount, CoalescedResponseToDeadUpstream); +/// the dashboard-side derived coalescingRatio is intentionally NOT carried on the wire +/// — consumers compute Hit / (Hit + Miss). 
/// public sealed record PlcBackendStatus( long ConnectsSuccess, @@ -82,7 +85,10 @@ public sealed record PlcBackendStatus( long MaxInFlight, long TxIdWraps, long DisconnectCascades, - long QueueDepth); + long QueueDepth, + long CoalescedHitCount, + long CoalescedMissCount, + long CoalescedResponseToDeadUpstream); /// Modbus exception counts by code. public sealed record ExceptionCounts( diff --git a/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs b/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs index 5cd509e..eb88597 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs @@ -80,6 +80,10 @@ internal static class StatusHtmlRenderer // Phase 9: multiplexer telemetry columns. sb.Append("In-flightMax in-flightTxId wraps"); sb.Append("CascadesQueue"); + // Phase 10: coalescing column. Single cell carries hit / (hit + miss) ratio as + // a percentage plus the raw hit count for context. Kept compact (one cell) to + // stay under the 50 KB page-weight budget. + sb.Append("Coal"); sb.Append(""); foreach (var plc in status.Plcs) @@ -146,6 +150,21 @@ internal static class StatusHtmlRenderer sb.Append("").Append(plc.Backend.TxIdWraps).Append(""); sb.Append("").Append(plc.Backend.DisconnectCascades).Append(""); sb.Append("").Append(plc.Backend.QueueDepth).Append(""); + // Phase 10: coalescing ratio cell — "% ()". When no coalesced reads + // have been seen, render an em-dash to keep the cell narrow. 
+ long coalHit = plc.Backend.CoalescedHitCount; + long coalMiss = plc.Backend.CoalescedMissCount; + sb.Append(""); + if (coalHit + coalMiss == 0) + { + sb.Append("—"); + } + else + { + int pct = (int)Math.Round(100.0 * coalHit / (coalHit + coalMiss)); + sb.Append(pct).Append("% (").Append(coalHit).Append(')'); + } + sb.Append(""); sb.Append(""); } diff --git a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs index 61af5f7..026038c 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs @@ -99,7 +99,10 @@ internal sealed class StatusSnapshotBuilder MaxInFlight: 0, TxIdWraps: 0, BackendDisconnectCascades: 0, - BackendQueueDepth: 0); + BackendQueueDepth: 0, + CoalescedHitCount: 0, + CoalescedMissCount: 0, + CoalescedResponseToDeadUpstream: 0); // Phase 08: ConnectsSuccess / ConnectsFailed are now tracked in ProxyCounters. long connectsSuccess = counters.ConnectsSuccess; @@ -134,7 +137,10 @@ internal sealed class StatusSnapshotBuilder MaxInFlight: counters.MaxInFlight, TxIdWraps: counters.TxIdWraps, DisconnectCascades: counters.BackendDisconnectCascades, - QueueDepth: counters.BackendQueueDepth), + QueueDepth: counters.BackendQueueDepth, + CoalescedHitCount: counters.CoalescedHitCount, + CoalescedMissCount: counters.CoalescedMissCount, + CoalescedResponseToDeadUpstream: counters.CoalescedResponseToDeadUpstream), Bytes: new PlcBytesStatus( UpstreamIn: counters.BytesUpstreamIn, UpstreamOut: counters.BytesUpstreamOut))); diff --git a/mbproxy/src/Mbproxy/Options/ResilienceOptions.cs b/mbproxy/src/Mbproxy/Options/ResilienceOptions.cs index 0d96761..c351ae7 100644 --- a/mbproxy/src/Mbproxy/Options/ResilienceOptions.cs +++ b/mbproxy/src/Mbproxy/Options/ResilienceOptions.cs @@ -8,6 +8,12 @@ public sealed class ResilienceOptions InitialBackoffMs = [1000, 2000, 5000, 15000, 30000], SteadyStateMs = 30000, }; + + /// + /// Phase 10 — in-flight read coalescing 
options. Defaults to enabled with a 32-party + /// cap so unconfigured deployments get the de-duplication benefit immediately. + /// + public ReadCoalescingOptions ReadCoalescing { get; init; } = new(); } public sealed class RetryProfile @@ -21,3 +27,32 @@ public sealed class RecoveryProfile public IReadOnlyList InitialBackoffMs { get; init; } = []; public int SteadyStateMs { get; init; } } + +/// +/// Phase 10 — knobs for the in-flight read-coalescing feature. The feature attaches +/// late-arriving FC03/FC04 reads of identical (unitId, fc, start, qty) tuples to an +/// already-in-flight peer, fanning out the single backend response to every attached +/// upstream client. +/// +/// Zero post-response staleness — coalescing operates entirely within the in-flight +/// window (microseconds to ~10 ms typical). Once the response is delivered, the coalescing +/// entry dies. +/// +public sealed class ReadCoalescingOptions +{ + /// + /// Master switch. When false, every FC03/FC04 request takes the Phase-9 path + /// (allocate a fresh proxy TxId and round-trip to the backend). Hot-reloadable via + /// IOptionsMonitor; flipping to false at runtime does not disturb already- + /// coalesced entries — they drain naturally. + /// + public bool Enabled { get; init; } = true; + + /// + /// Per-entry cap on the number of interested parties that may attach to a single + /// in-flight request. Past this cap, the next identical request opens a fresh + /// in-flight entry (a fresh backend round-trip). Bounds the response-fanout cost at + /// O(MaxParties) per entry. + /// + public int MaxParties { get; init; } = 32; +} diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/CoalescingKey.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/CoalescingKey.cs new file mode 100644 index 0000000..3763ac9 --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/CoalescingKey.cs @@ -0,0 +1,25 @@ +namespace Mbproxy.Proxy.Multiplexing; + +/// +/// Hash key for the in-flight-by-key map. 
Two FC03/FC04 requests with identical +/// <see cref="UnitId"/>, <see cref="Fc"/>, <see cref="StartAddress"/>, and <see cref="Qty"/> +/// can be coalesced onto a single backend round-trip — the second request's response is +/// fanned out from the first request's reply, with each upstream party's original MBAP +/// transaction ID restored individually. +/// +/// Equality semantics: auto-generated record-struct value equality. FC03 and +/// FC04 produce different keys for the same address (different Modbus tables); different +/// <see cref="UnitId"/> bytes never coalesce (different PLC personalities behind a shared +/// socket); reads of different <see cref="Qty"/> never coalesce (the responses carry +/// different register counts and would not be interchangeable on a fan-out). +/// +/// Scope: only FC03 (Read Holding Registers) and FC04 (Read Input Registers) +/// are coalescable. FC06 (Write Single Register), FC16 (Write Multiple Registers), and +/// any non-read function code bypass coalescing entirely — writes are non-idempotent and +/// must hit the backend each time. +/// +internal readonly record struct CoalescingKey( + byte UnitId, + byte Fc, + ushort StartAddress, + ushort Qty); diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/CoalescingLogEvents.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/CoalescingLogEvents.cs new file mode 100644 index 0000000..f9e2d08 --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/CoalescingLogEvents.cs @@ -0,0 +1,65 @@ +namespace Mbproxy.Proxy.Multiplexing; + +/// +/// Source-generated definitions for the Phase-10 read-coalescing +/// feature. Event names are stable — do not rename without updating docs/design.md's +/// "Logging" event-name table. +/// +/// Levels are intentionally conservative — coalescing fires on every overlapping +/// read in a busy fleet (HMIs/historians polling the same screen tags), so the steady-state +/// log volume would be deafening at Information. The counters surface the same data at +/// far lower cost.
+/// +internal static partial class CoalescingLogEvents +{ + /// + /// Emitted at Debug when an FC03/FC04 request attaches to an already-in-flight peer. + /// + [LoggerMessage( + EventId = 120, + EventName = "mbproxy.coalesce.hit", + Level = LogLevel.Debug, + Message = "Coalesce hit: Plc={Plc} Unit={UnitId} Fc={Fc} Start={Start} Qty={Qty} PartyCount={PartyCount}")] + public static partial void Hit( + ILogger logger, + string plc, + byte unitId, + byte fc, + ushort start, + ushort qty, + int partyCount); + + /// + /// Emitted at Debug when an FC03/FC04 request opens a fresh in-flight entry (no + /// matching peer was found, or the matching peer had reached its MaxParties cap). + /// + [LoggerMessage( + EventId = 121, + EventName = "mbproxy.coalesce.miss", + Level = LogLevel.Debug, + Message = "Coalesce miss: Plc={Plc} Unit={UnitId} Fc={Fc} Start={Start} Qty={Qty}")] + public static partial void Miss( + ILogger logger, + string plc, + byte unitId, + byte fc, + ushort start, + ushort qty); + + /// + /// Emitted at Debug when fan-out skips a coalesced party because its upstream pipe is + /// no longer alive. The corresponding counter increments at every occurrence. + /// + [LoggerMessage( + EventId = 122, + EventName = "mbproxy.coalesce.dead_upstream", + Level = LogLevel.Debug, + Message = "Coalesce dead upstream: Plc={Plc} Unit={UnitId} Fc={Fc} Start={Start} Qty={Qty}")] + public static partial void DeadUpstream( + ILogger logger, + string plc, + byte unitId, + byte fc, + ushort start, + ushort qty); +} diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightByKeyMap.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightByKeyMap.cs new file mode 100644 index 0000000..c53d0b1 --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightByKeyMap.cs @@ -0,0 +1,122 @@ +namespace Mbproxy.Proxy.Multiplexing; + +/// +/// Per-PLC "in-flight by key" map that powers Phase 10 read coalescing. 
Holds the +/// currently-in-flight FC03/FC04 requests keyed by their <see cref="CoalescingKey"/> so a +/// late-arriving request with an identical key can attach to the existing in-flight entry +/// instead of opening a second backend round-trip. +/// +/// Concurrency model. A single lock serialises every +/// state-touching method. The simpler-lock-over-CAS choice is deliberate (per the phase +/// doc) — the map is per-PLC and the wire rate per PLC is bounded by the ECOM's internal +/// scan cadence (~2–10 ms per request). The lock-free AddOrUpdate alternative is not +/// worth the read-and-prove-it-correct burden. +/// +/// Mutable-list seam. Each entry stores a <see cref="List{T}"/> +/// that is also exposed through the parent <see cref="InFlightRequest.InterestedParties"/> +/// property (typed as IReadOnlyList<InterestedParty>). Attaches mutate this +/// list under the map lock. The backend reader task removes the entry from the map BEFORE +/// it iterates the list during fan-out, which guarantees no concurrent mutation during +/// iteration: by the time the reader holds the (just-removed) list, no future attach can +/// find the key and therefore no further appends can occur. +/// +/// Race-on-remove: the reader removes from <see cref="CorrelationMap"/> first, +/// then from this map. A late attach in between will append a new party to the same list +/// (whose response has just arrived); the reader's fan-out loop will then deliver to that +/// party too. The reverse race (reader removes from this map first, then attach arrives) +/// is impossible because the reader's <see cref="TryRemove"/> takes the lock — any in-flight +/// attach is serialised before or after. +/// +internal sealed class InFlightByKeyMap +{ + private readonly object _lock = new(); + private readonly Dictionary<CoalescingKey, InFlightRequest> _entries = new(); + + /// Current entry count. Read under the lock for a stable snapshot. + public int Count + { + get { lock (_lock) { return _entries.Count; } } + } + + /// + /// Atomic attach-or-create.
If <paramref name="key"/> already maps to an in-flight + /// entry whose interested-party list has fewer than <paramref name="maxParties"/> + /// entries, appends <paramref name="party"/> to that entry and returns + /// wasNew = false. Otherwise invokes <paramref name="factory"/> to build a fresh + /// <see cref="InFlightRequest"/>, stores it under <paramref name="key"/>, and returns + /// wasNew = true. + /// + /// The factory must build a request whose InterestedParties list is a + /// <see cref="List{T}"/> (cast to <see cref="IReadOnlyList{T}"/>). The map relies on + /// being able to <see cref="List{T}.Add"/> to that same instance under its lock when + /// later attaches arrive. + /// + /// maxParties cap: load-shedding safety valve. If an existing entry + /// already has <paramref name="maxParties"/> attached parties, the next arrival opens + /// a fresh entry (and a fresh backend round-trip). This bounds the response-fanout + /// cost per entry at O(maxParties). + /// + /// Returns true always (the bool return matches the phase doc's signature; + /// future evolution could introduce a refusal path). + /// + public bool TryAttachOrCreate( + CoalescingKey key, + InterestedParty party, + Func<InFlightRequest> factory, + int maxParties, + out InFlightRequest req, + out bool wasNew) + { + lock (_lock) + { + if (_entries.TryGetValue(key, out var existing) + && existing.InterestedParties is List<InterestedParty> existingList + && existingList.Count < maxParties) + { + existingList.Add(party); + req = existing; + wasNew = false; + return true; + } + + req = factory(); + _entries[key] = req; + wasNew = true; + return true; + } + } + + /// + /// Removes the entry under <paramref name="key"/> if present. Returns false when + /// no entry exists, which is normal when the cascade path beat the reader to the entry + /// or when a watchdog timeout removed the entry while a response was in flight. + /// + public bool TryRemove(CoalescingKey key, out InFlightRequest req) + { + lock (_lock) + { + if (_entries.TryGetValue(key, out var existing)) + { + _entries.Remove(key); + req = existing; + return true; + } + req = default!; + return false; + } + } + + /// + /// Removes every entry. Used by the multiplexer's cascade path on backend disconnect.
+ /// + public IReadOnlyList DrainAll() + { + lock (_lock) + { + var drained = new List(_entries.Count); + drained.AddRange(_entries.Values); + _entries.Clear(); + return drained; + } + } +} diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs index d358156..dbe0b91 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs @@ -49,9 +49,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi private readonly PerPlcContext _ctx; private readonly ILogger _logger; private readonly ResiliencePipeline? _backendConnectPipeline; + // Phase 10: live read-coalescing config accessor. The accessor is read per-PDU on the + // request path so a hot-reload of `Mbproxy.Resilience.ReadCoalescing.Enabled` + // propagates immediately. Production wires this to + // `() => optionsMonitor.CurrentValue.Resilience.ReadCoalescing`. Tests default to a + // fresh `ReadCoalescingOptions()` (Enabled = true, MaxParties = 32). + private readonly Func _coalescingOptions; private readonly TxIdAllocator _allocator = new(); private readonly CorrelationMap _correlation = new(); + private readonly InFlightByKeyMap _inFlightByKey = new(); private readonly Channel _outboundChannel = Channel.CreateBounded( new BoundedChannelOptions(OutboundChannelCapacity) @@ -84,7 +91,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi IPduPipeline pipeline, PerPlcContext perPlcContext, ILogger logger, - ResiliencePipeline? backendConnectPipeline = null) + ResiliencePipeline? backendConnectPipeline = null, + Func? 
coalescingOptions = null) { _plc = plc; _connectionOptions = connectionOptions; @@ -92,6 +100,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi _ctx = perPlcContext; _logger = logger; _backendConnectPipeline = backendConnectPipeline; + _coalescingOptions = coalescingOptions ?? (static () => new ReadCoalescingOptions()); // Register this multiplexer as the live telemetry source for the PLC's counters. _ctx.Counters.SetMultiplexProvider(this); @@ -301,6 +310,11 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi cascadeIds.Add(party.Pipe.Id); } + // Phase 10 — also drain the in-flight-by-key map so a brand-new identical request + // through the freshly-reconnected backend is treated as a miss (no stale entries + // outlive the backend they were destined for). + _inFlightByKey.DrainAll(); + int upstreamCount = 0; if (cascadeUpstreams) { @@ -408,6 +422,17 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // Free the allocator slot immediately so it can be reused. _allocator.Release(proxyTxId); + // Phase 10 — for FC03/FC04 reads, also clear the coalescing-by-key entry so + // a brand-new identical request issued AFTER this response is treated as a + // miss (opens a fresh round-trip). The TryRemove is best-effort: a watchdog + // timeout or cascade may have already removed it. + if (inFlight.Fc is 0x03 or 0x04) + { + var coalKey = new CoalescingKey(inFlight.UnitId, inFlight.Fc, + inFlight.StartAddress, inFlight.Qty); + _inFlightByKey.TryRemove(coalKey, out _); + } + // Update EWMA round-trip from when we sent the request. long elapsedMs = (DateTimeOffset.UtcNow - inFlight.SentAtUtc).Ticks * 100; // 100 ns per tick // UpdateRoundTripEwma expects Stopwatch ticks, but we have wall-clock. @@ -427,10 +452,25 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // Fan out to each interested party with their original TxId restored. 
// Phase 9: always exactly one party. Phase 10: N parties (read coalescing). + // Note: the InFlightByKey TryRemove above (for FC03/FC04) guarantees no + // further attaches can occur — the parties list is now a stable snapshot. foreach (var party in inFlight.InterestedParties) { if (!party.Pipe.IsAlive) + { + // Phase 10 — record the dead-upstream skip only for FC03/FC04 (the + // only function codes that take the coalescing path). For non- + // coalescing FCs this branch is silent — the Phase-9 behaviour. + if (inFlight.Fc is 0x03 or 0x04 + && inFlight.InterestedParties.Count > 1) + { + _ctx.Counters.IncrementCoalescedResponseToDeadUpstream(); + CoalescingLogEvents.DeadUpstream( + _logger, _plc.Name, inFlight.UnitId, inFlight.Fc, + inFlight.StartAddress, inFlight.Qty); + } continue; + } // The frame buffer is private to this iteration; if there are multiple // parties (Phase 10), each gets its own copy with its own original TxId @@ -482,20 +522,11 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi out ushort originalTxId, out _, out _, out byte unitId)) return; - if (!_allocator.TryAllocate(out ushort proxyTxId)) - { - MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?"); - // Synthesize Modbus exception 04 (Slave Device Failure). - byte fc = frame.Length > MbapFrame.HeaderSize ? frame[MbapFrame.HeaderSize] : (byte)0; - byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fc, exceptionCode: 4); - await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false); - return; - } - - // Parse the PDU FC + start/qty (for FC03/04) so the response decoder has the - // correlation it needs. + // Parse the PDU FC + start/qty (for FC03/04) — needed for both the coalescing-key + // path and the response correlation slot. FC06/FC16 (writes) keep startAddr/qty = 0; + // they bypass coalescing entirely. 
int pduOffset = MbapFrame.HeaderSize; - byte fcByte = frame[pduOffset]; + byte fcByte = frame.Length > pduOffset ? frame[pduOffset] : (byte)0; ushort startAddr = 0; ushort qty = 0; if (fcByte is 0x03 or 0x04 && frame.Length >= pduOffset + 5) @@ -504,37 +535,175 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi qty = (ushort)((frame[pduOffset + 3] << 8) | frame[pduOffset + 4]); } - var inFlight = new InFlightRequest( + // Phase 10 — read-coalescing path. Only FC03/FC04 are coalescable; only when the + // feature is enabled in the live config. If the late-arriving request matches an + // already-in-flight peer, we attach to the existing entry and skip the backend + // round-trip entirely. The existing entry's response will fan out to both parties. + var coalescingOpts = _coalescingOptions(); + if (fcByte is 0x03 or 0x04 && coalescingOpts.Enabled) + { + var key = new CoalescingKey(unitId, fcByte, startAddr, qty); + var newParty = new InterestedParty(pipe, originalTxId); + + // The factory does the Phase-9 work: allocate a proxy TxId, build the + // InFlightRequest with a mutable List, add to the correlation + // map. We deliberately do NOT enqueue to the outbound channel inside the + // factory — that's done outside the InFlightByKey lock to keep the lock + // scope tight and to avoid holding the lock across an async send. + // + // proxyTxIdForSend / inFlightForSend communicate the factory's allocation back + // out of the lock so the post-lock code can finish the send. + ushort proxyTxIdForSend = 0; + InFlightRequest? inFlightForSend = null; + + _inFlightByKey.TryAttachOrCreate( + key, + newParty, + factory: () => + { + if (!_allocator.TryAllocate(out ushort proxyTxId)) + { + // Saturation — record an empty placeholder InFlightRequest that the + // caller will detect via inFlightForSend == null. We can't easily + // signal failure through the bool return, so we leave the saturation + // exception delivery to the caller. 
+ return new InFlightRequest( + UnitId: unitId, + Fc: fcByte, + StartAddress: startAddr, + Qty: qty, + InterestedParties: new List<InterestedParty> { newParty }, + SentAtUtc: DateTimeOffset.UtcNow); + } + + var partyList = new List<InterestedParty>(capacity: 1) { newParty }; + var inFlight = new InFlightRequest( + UnitId: unitId, + Fc: fcByte, + StartAddress: startAddr, + Qty: qty, + InterestedParties: partyList, + SentAtUtc: DateTimeOffset.UtcNow); + + if (!_correlation.TryAdd(proxyTxId, inFlight)) + { + _allocator.Release(proxyTxId); + _logger.LogError( + "CorrelationMap.TryAdd failed for already-free proxyTxId {ProxyTxId}", + proxyTxId); + // Return the stub anyway; outer code detects via inFlightForSend == null. + return inFlight; + } + + _ctx.Counters.ObserveInFlight(_allocator.InFlightCount); + proxyTxIdForSend = proxyTxId; + inFlightForSend = inFlight; + return inFlight; + }, + maxParties: coalescingOpts.MaxParties, + out InFlightRequest attachedReq, + out bool wasNew); + + if (!wasNew) + { + // Coalesce hit: attached to an existing in-flight entry. No backend traffic. + _ctx.Counters.IncrementCoalescedHit(); + CoalescingLogEvents.Hit(_logger, _plc.Name, unitId, fcByte, startAddr, qty, + partyCount: attachedReq.InterestedParties.Count); + return; + } + + // Coalesce miss: we just opened a fresh in-flight entry. + _ctx.Counters.IncrementCoalescedMiss(); + CoalescingLogEvents.Miss(_logger, _plc.Name, unitId, fcByte, startAddr, qty); + + if (inFlightForSend is null) + { + // The factory hit the allocator-saturation path or a duplicate-key race. + // Surface a Modbus exception 04 to the upstream and clean up. + MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?"); + byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4); + _inFlightByKey.TryRemove(key, out _); + await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false); + return; + } + + // Apply the BCD rewriter on the request, then send to the backend.
We are now + // OUTSIDE the InFlightByKey lock — late attaches arriving after this point will + // attach to the same entry while it sits in the channel/wire. + var requestCtx = _ctx.WithCurrentRequest(inFlightForSend); + _pipeline.Process( + MbapDirection.RequestToBackend, + frame.AsSpan(0, MbapFrame.HeaderSize), + frame.AsSpan(MbapFrame.HeaderSize, frame.Length - MbapFrame.HeaderSize), + requestCtx); + + frame[0] = (byte)(proxyTxIdForSend >> 8); + frame[1] = (byte)(proxyTxIdForSend & 0xFF); + + try + { + await _outboundChannel.Writer.WriteAsync(frame, ct).ConfigureAwait(false); + } + catch (ChannelClosedException) + { + if (_correlation.TryRemove(proxyTxIdForSend, out _)) + _allocator.Release(proxyTxIdForSend); + _inFlightByKey.TryRemove(key, out _); + } + return; + } + + // Non-coalescing path (FC06/FC16 writes, FC03/04 with coalescing disabled, or any + // other FC). This is the Phase-9 path verbatim — every request gets its own proxy + // TxId and its own backend round-trip. + + if (!_allocator.TryAllocate(out ushort proxyTxIdFc)) + { + MultiplexerLogEvents.Saturated(_logger, _plc.Name, pipe.RemoteEp?.ToString() ?? "?"); + byte[] excFrame = BuildExceptionFrame(originalTxId, unitId, fcByte, exceptionCode: 4); + await pipe.SendResponseAsync(excFrame, ct).ConfigureAwait(false); + return; + } + + var partyListNc = new List(capacity: 1) { new InterestedParty(pipe, originalTxId) }; + var inFlightNc = new InFlightRequest( UnitId: unitId, Fc: fcByte, StartAddress: startAddr, Qty: qty, - InterestedParties: [new InterestedParty(pipe, originalTxId)], + InterestedParties: partyListNc, SentAtUtc: DateTimeOffset.UtcNow); - if (!_correlation.TryAdd(proxyTxId, inFlight)) + if (!_correlation.TryAdd(proxyTxIdFc, inFlightNc)) { // Should be impossible: the allocator just guaranteed proxyTxId is free. 
- _allocator.Release(proxyTxId); - _logger.LogError("CorrelationMap.TryAdd failed for already-free proxyTxId {ProxyTxId}", proxyTxId); + _allocator.Release(proxyTxIdFc); + _logger.LogError("CorrelationMap.TryAdd failed for already-free proxyTxId {ProxyTxId}", proxyTxIdFc); return; } + // Phase 10 — even when the coalescing path is bypassed (e.g. coalescing disabled + // for FC03/04), we still report the request as a Miss so Hit + Miss = total + // FC03/FC04 requests across snapshots. FC06/FC16 are not counted here (they are + // not coalescable in any sense). + if (fcByte is 0x03 or 0x04) + _ctx.Counters.IncrementCoalescedMiss(); + // Peak in-flight tracking. _ctx.Counters.ObserveInFlight(_allocator.InFlightCount); - // Apply the BCD rewriter on the request. Use a per-call context with CurrentRequest - // (the rewriter doesn't currently need it on request, but Phase 10 may). - var requestCtx = _ctx.WithCurrentRequest(inFlight); + // Apply the BCD rewriter on the request. + var requestCtxNc = _ctx.WithCurrentRequest(inFlightNc); _pipeline.Process( MbapDirection.RequestToBackend, frame.AsSpan(0, MbapFrame.HeaderSize), frame.AsSpan(MbapFrame.HeaderSize, frame.Length - MbapFrame.HeaderSize), - requestCtx); + requestCtxNc); // Overwrite the MBAP TxId with the proxy TxId. - frame[0] = (byte)(proxyTxId >> 8); - frame[1] = (byte)(proxyTxId & 0xFF); + frame[0] = (byte)(proxyTxIdFc >> 8); + frame[1] = (byte)(proxyTxIdFc & 0xFF); // Enqueue for the backend writer task. try @@ -544,8 +713,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi catch (ChannelClosedException) { // Channel completed during shutdown — release the proxy TxId. 
- if (_correlation.TryRemove(proxyTxId, out _)) - _allocator.Release(proxyTxId); + if (_correlation.TryRemove(proxyTxIdFc, out _)) + _allocator.Release(proxyTxIdFc); } } @@ -591,6 +760,16 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi _allocator.Release(proxyTxId); + // Phase 10 — also clear the coalescing-by-key entry. A late attach that + // raced in just before the watchdog claim will still receive the 0x0B + // exception via this entry's InterestedParties list (List mutations + // happen before fan-out begins). + if (req.Fc is 0x03 or 0x04) + { + var coalKey = new CoalescingKey(req.UnitId, req.Fc, req.StartAddress, req.Qty); + _inFlightByKey.TryRemove(coalKey, out _); + } + long elapsedMs = (long)(DateTimeOffset.UtcNow - req.SentAtUtc).TotalMilliseconds; foreach (var party in req.InterestedParties) diff --git a/mbproxy/src/Mbproxy/Proxy/PlcListener.cs b/mbproxy/src/Mbproxy/Proxy/PlcListener.cs index 122e47b..c236d45 100644 --- a/mbproxy/src/Mbproxy/Proxy/PlcListener.cs +++ b/mbproxy/src/Mbproxy/Proxy/PlcListener.cs @@ -31,6 +31,7 @@ internal sealed partial class PlcListener : IAsyncDisposable private readonly ILogger _pipeLogger; private readonly PerPlcContext? _perPlcContext; private readonly ResiliencePipeline? _backendConnectPipeline; + private readonly Func? _coalescingOptions; private TcpListener? _listener; private PlcMultiplexer? _multiplexer; @@ -55,7 +56,8 @@ internal sealed partial class PlcListener : IAsyncDisposable ILogger multiplexerLogger, ILogger pipeLogger, PerPlcContext? perPlcContext = null, - ResiliencePipeline? backendConnectPipeline = null) + ResiliencePipeline? backendConnectPipeline = null, + Func? 
coalescingOptions = null) { _plc = plc; _connectionOptions = connectionOptions; @@ -65,6 +67,7 @@ internal sealed partial class PlcListener : IAsyncDisposable _pipeLogger = pipeLogger; _perPlcContext = perPlcContext; _backendConnectPipeline = backendConnectPipeline; + _coalescingOptions = coalescingOptions; } /// @@ -94,7 +97,8 @@ internal sealed partial class PlcListener : IAsyncDisposable _pipeline, ctx, _multiplexerLogger, - _backendConnectPipeline); + _backendConnectPipeline, + _coalescingOptions); } /// diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs index 23510c2..9eb4e2a 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs @@ -79,7 +79,25 @@ public sealed record CounterSnapshot( /// (frames queued, not yet on the wire). A sustained non-zero value indicates the /// backend is slower than upstream demand. Phase 9. /// - long BackendQueueDepth); + long BackendQueueDepth, + /// + /// Phase 10 — cumulative count of FC03/FC04 requests that attached to an already-in-flight + /// peer instead of opening a fresh backend round-trip. CoalescedHitCount + CoalescedMissCount + /// equals total FC03/FC04 requests seen by the multiplexer. + /// + long CoalescedHitCount, + /// + /// Phase 10 — cumulative count of FC03/FC04 requests that opened a fresh in-flight entry + /// (no matching peer was in flight, or the matching peer had reached its MaxParties + /// cap). With ReadCoalescing.Enabled = false, every FC03/FC04 request becomes a miss. + /// + long CoalescedMissCount, + /// + /// Phase 10 — count of coalesced response fan-outs that were skipped because the + /// attached upstream pipe had already disconnected. A spike is a churn indicator; the + /// metric itself is informational (Tier 2 in docs/kpi.md). + /// + long CoalescedResponseToDeadUpstream); /// /// Thread-safe per-PLC counters backed by longs. 
@@ -114,6 +132,11 @@ internal sealed class ProxyCounters private long _maxInFlight; private long _backendDisconnectCascades; + // Phase 10 — coalescing counters. Hit + Miss = total FC03/FC04 requests. + private long _coalescedHitCount; + private long _coalescedMissCount; + private long _coalescedResponseToDeadUpstream; + // Phase 9: live state pulled from the multiplexer's allocator/map/queue on each // snapshot. The multiplexer registers a single provider via SetMultiplexProvider. // We use a volatile reference for lock-free read on the snapshot path. @@ -201,6 +224,26 @@ internal sealed class ProxyCounters public void AddDisconnectCascades(int n) => Interlocked.Add(ref _backendDisconnectCascades, n); + /// + /// Phase 10 — records one FC03/FC04 request that attached to an already-in-flight peer. + /// + public void IncrementCoalescedHit() + => Interlocked.Increment(ref _coalescedHitCount); + + /// + /// Phase 10 — records one FC03/FC04 request that opened a fresh in-flight entry + /// (no matching peer was in flight, or the matching peer had reached MaxParties). + /// + public void IncrementCoalescedMiss() + => Interlocked.Increment(ref _coalescedMissCount); + + /// + /// Phase 10 — records one coalesced response fan-out that was skipped because the + /// attached upstream pipe had already disconnected. Informational only. + /// + public void IncrementCoalescedResponseToDeadUpstream() + => Interlocked.Increment(ref _coalescedResponseToDeadUpstream); + /// /// CAS-updates the peak in-flight high-water mark. Called on every successful /// allocation by the multiplexer. Phase 9. 
@@ -311,7 +354,10 @@ internal sealed class ProxyCounters MaxInFlight: Interlocked.Read(ref _maxInFlight), TxIdWraps: txWraps, BackendDisconnectCascades: Interlocked.Read(ref _backendDisconnectCascades), - BackendQueueDepth: queueDepth); + BackendQueueDepth: queueDepth, + CoalescedHitCount: Interlocked.Read(ref _coalescedHitCount), + CoalescedMissCount: Interlocked.Read(ref _coalescedMissCount), + CoalescedResponseToDeadUpstream: Interlocked.Read(ref _coalescedResponseToDeadUpstream)); } } diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs index 4f8fdf2..74c7c3e 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs @@ -112,6 +112,12 @@ internal sealed partial class ProxyWorker : BackgroundService resilienceOpts.ListenerRecovery, _loggerFactory.CreateLogger($"Mbproxy.Proxy.ListenerRecovery.{plc.Name}")); + // Phase 10 — give the supervisor a live accessor for ReadCoalescingOptions + // so a hot-reload of `Mbproxy.Resilience.ReadCoalescing.Enabled` propagates + // to the multiplexer's per-PDU coalescing decision. 
+ Func coalescingAccessor = + () => _options.CurrentValue.Resilience.ReadCoalescing; + var supervisor = new PlcListenerSupervisor( plc, opts.Connection, @@ -122,7 +128,8 @@ internal sealed partial class ProxyWorker : BackgroundService perPlcContext, recoveryPipeline, _loggerFactory.CreateLogger(), - backendPipeline); + backendPipeline, + coalescingAccessor); _supervisors[plc.Name] = supervisor; } diff --git a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs index f8aadd3..dbbb531 100644 --- a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs +++ b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs @@ -36,6 +36,7 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable private readonly ResiliencePipeline _recoveryPipeline; private readonly ILogger _logger; private readonly ResiliencePipeline? _backendConnectPipeline; + private readonly Func? _coalescingOptions; // ── Mutable state ──────────────────────────────────────────────────────────────────── @@ -79,7 +80,8 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable PerPlcContext? perPlcContext, ResiliencePipeline recoveryPipeline, ILogger logger, - ResiliencePipeline? backendConnectPipeline = null) + ResiliencePipeline? backendConnectPipeline = null, + Func? coalescingOptions = null) { _plc = plc; _connectionOptions = connectionOptions; @@ -92,6 +94,7 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable _recoveryPipeline = recoveryPipeline; _logger = logger; _backendConnectPipeline = backendConnectPipeline; + _coalescingOptions = coalescingOptions; } /// @@ -232,7 +235,8 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable _multiplexerLogger, _pipeLogger, _currentContext, - _backendConnectPipeline); + _backendConnectPipeline, + _coalescingOptions); // Phase 07: expose the current listener for status-page pair enumeration. 
_currentListener = listener; diff --git a/mbproxy/src/Mbproxy/appsettings.json b/mbproxy/src/Mbproxy/appsettings.json index d6783f2..24b80d2 100644 --- a/mbproxy/src/Mbproxy/appsettings.json +++ b/mbproxy/src/Mbproxy/appsettings.json @@ -17,6 +17,10 @@ "ListenerRecovery": { "InitialBackoffMs": [ 1000, 2000, 5000, 15000, 30000 ], "SteadyStateMs": 30000 + }, + "ReadCoalescing": { + "Enabled": true, + "MaxParties": 32 } } }, diff --git a/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs b/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs index 803b33b..4871fa7 100644 --- a/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs @@ -49,7 +49,9 @@ public sealed class StatusHtmlRendererTests ExceptionsByCode: new ExceptionCounts(1, 0, 0, 0), LastRoundTripMs: 3.5, InFlight: 0, MaxInFlight: 0, TxIdWraps: 0, - DisconnectCascades: 0, QueueDepth: 0), + DisconnectCascades: 0, QueueDepth: 0, + CoalescedHitCount: 0, CoalescedMissCount: 0, + CoalescedResponseToDeadUpstream: 0), Bytes: new PlcBytesStatus(1024, 2048)); } diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/CoalescingKeyTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/CoalescingKeyTests.cs new file mode 100644 index 0000000..53ecb81 --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/CoalescingKeyTests.cs @@ -0,0 +1,86 @@ +using Mbproxy.Proxy.Multiplexing; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Multiplexing; + +/// +/// Equality + hash-distribution coverage for the Phase-10 +/// record struct. The key is the load-bearing primitive of read coalescing: bad equality +/// would either cause unrelated requests to share a backend round-trip (correctness loss) +/// or prevent legitimate same-key requests from coalescing (performance loss). 
+/// +[Trait("Category", "Unit")] +public sealed class CoalescingKeyTests +{ + [Fact] + public void Equality_OnIdenticalKeys_ReturnsTrue() + { + var a = new CoalescingKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty:4); + var b = new CoalescingKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty:4); + + a.ShouldBe(b, "identical keys must compare equal"); + a.GetHashCode().ShouldBe(b.GetHashCode(), "identical keys must hash to the same bucket"); + } + + [Fact] + public void Equality_OnDifferentFc_ReturnsFalse() + { + // FC03 (Read Holding Registers) and FC04 (Read Input Registers) name DIFFERENT + // Modbus tables even for the same address. Coalescing them would deliver wrong + // data. + var fc03 = new CoalescingKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty:4); + var fc04 = new CoalescingKey(UnitId: 1, Fc: 0x04, StartAddress: 100, Qty:4); + + fc03.ShouldNotBe(fc04, "FC03 and FC04 keys must never coalesce"); + } + + [Fact] + public void Equality_OnDifferentUnitId_ReturnsFalse() + { + // Different unit IDs typically address different PLC personalities behind a shared + // socket (multi-drop / gateway-backed setups). Never coalesce across them. + var u1 = new CoalescingKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty:4); + var u2 = new CoalescingKey(UnitId: 2, Fc: 0x03, StartAddress: 100, Qty:4); + + u1.ShouldNotBe(u2, "different unit IDs must never coalesce"); + } + + [Fact] + public void Equality_OnDifferentQty_ReturnsFalse() + { + var read1 = new CoalescingKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty:1); + var read4 = new CoalescingKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty:4); + + read1.ShouldNotBe(read4, "different qty must not coalesce — response register count differs"); + } + + [Fact] + public void HashCode_DistributionSanity() + { + // Build 10,000 keys at random V-memory-ish addresses and bucket the low byte of + // GetHashCode. A reasonable hash should spread fairly evenly across 256 buckets. 
+ // Threshold: no single bucket holds > 5% of total (well above ideal 1/256 = 0.4%). + const int Count = 10_000; + var rng = new Random(17); + var buckets = new int[256]; + + for (int i = 0; i < Count; i++) + { + ushort start = (ushort)rng.Next(0, 4096); // 12-bit V-memory space + ushort qty = (ushort)rng.Next(1, 128); + byte unit = (byte)rng.Next(0, 4); + byte fc = rng.Next(2) == 0 ? (byte)0x03 : (byte)0x04; + + int bucket = new CoalescingKey(unit, fc, start, qty).GetHashCode() & 0xFF; + buckets[bucket]++; + } + + int max = 0; + for (int i = 0; i < buckets.Length; i++) if (buckets[i] > max) max = buckets[i]; + + int ceiling = Count * 5 / 100; + max.ShouldBeLessThanOrEqualTo(ceiling, + $"hash distribution is uneven — the busiest bucket holds {max} > {ceiling} keys"); + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/InFlightByKeyMapTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/InFlightByKeyMapTests.cs new file mode 100644 index 0000000..000a744 --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/InFlightByKeyMapTests.cs @@ -0,0 +1,259 @@ +using System.Net; +using System.Net.Sockets; +using Mbproxy.Proxy.Multiplexing; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Multiplexing; + +/// +/// Unit tests for the Phase-10 . Covers the atomic +/// attach-or-create primitive (load-bearing concurrency invariant), the per-entry +/// max-parties cap (load-shedding safety valve), and concurrent attach correctness. +/// +[Trait("Category", "Unit")] +public sealed class InFlightByKeyMapTests +{ + private static UpstreamPipe MakePipe() + { + // The map only retains references to InterestedParty; it never reads pipe state. + // A connected loopback socket satisfies the constructor contract. 
+ var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var c = new TcpClient(); + c.Connect(IPAddress.Loopback, ((IPEndPoint)listener.LocalEndpoint).Port); + var s = listener.AcceptSocket(); + listener.Stop(); + return new UpstreamPipe(s, "PLC1", NullLogger.Instance); + } + + private static InFlightRequest MakeRequest(InterestedParty party, byte fc = 0x03, + ushort start = 100, ushort qty = 1, byte unit = 1) + { + // The factory uses a mutable List so the map can append on attach. + var list = new List(capacity: 1) { party }; + return new InFlightRequest( + UnitId: unit, + Fc: fc, + StartAddress: start, + Qty: qty, + InterestedParties: list, + SentAtUtc: DateTimeOffset.UtcNow); + } + + [Fact] + public async Task TryAttachOrCreate_NewKey_CallsFactory_ReturnsTrue_WasNewTrue() + { + var pipe = MakePipe(); + try + { + var map = new InFlightByKeyMap(); + var key = new CoalescingKey(1, 0x03, 100, 1); + var party = new InterestedParty(pipe, OriginalTxId: 0x1234); + + int factoryCalls = 0; + bool ok = map.TryAttachOrCreate( + key, party, + factory: () => { factoryCalls++; return MakeRequest(party); }, + maxParties: 32, + out var req, out bool wasNew); + + ok.ShouldBeTrue(); + wasNew.ShouldBeTrue("a brand-new key must take the create branch"); + factoryCalls.ShouldBe(1, "the factory must be called exactly once"); + req.ShouldNotBeNull(); + req.InterestedParties.Count.ShouldBe(1); + map.Count.ShouldBe(1); + } + finally + { + await pipe.DisposeAsync(); + } + } + + [Fact] + public async Task TryAttachOrCreate_ExistingKey_AppendsParty_ReturnsTrue_WasNewFalse() + { + var pipeA = MakePipe(); + var pipeB = MakePipe(); + try + { + var map = new InFlightByKeyMap(); + var key = new CoalescingKey(1, 0x03, 100, 1); + + var partyA = new InterestedParty(pipeA, OriginalTxId: 0x1111); + var partyB = new InterestedParty(pipeB, OriginalTxId: 0x2222); + + int factoryCalls = 0; + map.TryAttachOrCreate(key, partyA, + factory: () => { factoryCalls++; return 
MakeRequest(partyA); }, + maxParties: 32, out var first, out bool firstWasNew); + + bool ok = map.TryAttachOrCreate(key, partyB, + factory: () => { factoryCalls++; return MakeRequest(partyB); }, + maxParties: 32, out var second, out bool secondWasNew); + + ok.ShouldBeTrue(); + firstWasNew.ShouldBeTrue(); + secondWasNew.ShouldBeFalse("the second attach must coalesce onto the first"); + factoryCalls.ShouldBe(1, "the factory must only fire on the create branch"); + second.ShouldBeSameAs(first, "both attaches must return the same InFlightRequest reference"); + second.InterestedParties.Count.ShouldBe(2, "the second party must be appended in place"); + second.InterestedParties[0].OriginalTxId.ShouldBe((ushort)0x1111); + second.InterestedParties[1].OriginalTxId.ShouldBe((ushort)0x2222); + } + finally + { + await pipeA.DisposeAsync(); + await pipeB.DisposeAsync(); + } + } + + [Fact] + public async Task TryAttachOrCreate_ExistingKey_AtMaxParties_CreatesFreshEntry_NotAppend() + { + var pipeA = MakePipe(); + var pipeB = MakePipe(); + var pipeC = MakePipe(); + try + { + var map = new InFlightByKeyMap(); + var key = new CoalescingKey(1, 0x03, 100, 1); + + var partyA = new InterestedParty(pipeA, OriginalTxId: 0xAAAA); + var partyB = new InterestedParty(pipeB, OriginalTxId: 0xBBBB); + var partyC = new InterestedParty(pipeC, OriginalTxId: 0xCCCC); + + // MaxParties = 2 — first attach creates, second appends, third overflows. 
+ map.TryAttachOrCreate(key, partyA, + factory: () => MakeRequest(partyA), maxParties: 2, + out var first, out _); + map.TryAttachOrCreate(key, partyB, + factory: () => MakeRequest(partyB), maxParties: 2, + out var second, out _); + + int factoryCalls = 0; + bool ok = map.TryAttachOrCreate(key, partyC, + factory: () => { factoryCalls++; return MakeRequest(partyC); }, + maxParties: 2, + out var third, out bool thirdWasNew); + + ok.ShouldBeTrue(); + thirdWasNew.ShouldBeTrue("the third attach must overflow into a fresh entry"); + factoryCalls.ShouldBe(1, "the factory must fire to create the overflow entry"); + third.ShouldNotBeSameAs(first, "the overflow must be a distinct InFlightRequest"); + third.InterestedParties.Count.ShouldBe(1, "the overflow entry starts with only its triggering party"); + first.InterestedParties.Count.ShouldBe(2, "the original entry stays capped at maxParties"); + } + finally + { + await pipeA.DisposeAsync(); + await pipeB.DisposeAsync(); + await pipeC.DisposeAsync(); + } + } + + [Fact] + public async Task TryRemove_AfterAttach_AllPartiesPresent_InRetrievedEntry() + { + var pipeA = MakePipe(); + var pipeB = MakePipe(); + try + { + var map = new InFlightByKeyMap(); + var key = new CoalescingKey(1, 0x03, 100, 1); + + var partyA = new InterestedParty(pipeA, 1); + var partyB = new InterestedParty(pipeB, 2); + + map.TryAttachOrCreate(key, partyA, () => MakeRequest(partyA), 32, out _, out _); + map.TryAttachOrCreate(key, partyB, () => MakeRequest(partyB), 32, out _, out _); + + bool removed = map.TryRemove(key, out var req); + + removed.ShouldBeTrue(); + req.InterestedParties.Count.ShouldBe(2, "both attached parties must be present in the removed entry"); + map.Count.ShouldBe(0); + } + finally + { + await pipeA.DisposeAsync(); + await pipeB.DisposeAsync(); + } + } + + [Fact] + public void TryRemove_OfMissing_ReturnsFalse() + { + var map = new InFlightByKeyMap(); + var key = new CoalescingKey(1, 0x03, 100, 1); + + map.TryRemove(key, out 
_).ShouldBeFalse("removing a never-attached key must report false"); + } + + [Fact] + public async Task Concurrent_AttachOrCreate_From_Two_Threads_NoLostParties_AndNoDuplicateEntries() + { + // 16 tasks × 500 ops each, all racing on the same key. The map must keep exactly + // one entry per key (unlimited MaxParties → no overflow). Each successful attach + // must contribute exactly one party to whatever entry was created/joined. + // + // Each task reuses a single UpstreamPipe across its ops — the map only stores the + // InterestedParty reference; pipe state is irrelevant to the map's invariants. + // Spinning up one pipe per op (16 × 500 = 8,000 loopback connections) would risk + // exhausting the test machine's ephemeral port pool; we use one pipe per task instead. + const int Tasks = 16; + const int OpsPerTask = 500; + const int MaxParties = int.MaxValue; + + var map = new InFlightByKeyMap(); + var key = new CoalescingKey(1, 0x03, 100, 1); + var pipes = new List<UpstreamPipe>(Tasks); + for (int i = 0; i < Tasks; i++) pipes.Add(MakePipe()); + + long attaches = 0; + long creates = 0; + + try + { + var work = new Task[Tasks]; + var workCt = TestContext.Current.CancellationToken; + for (int t = 0; t < Tasks; t++) + { + var pipe = pipes[t]; + work[t] = Task.Run(() => + { + for (int i = 0; i < OpsPerTask; i++) + { + if (workCt.IsCancellationRequested) return; + var party = new InterestedParty(pipe, (ushort)i); + map.TryAttachOrCreate( + key, party, + factory: () => MakeRequest(party), + maxParties: MaxParties, + out _, out bool wasNew); + if (wasNew) Interlocked.Increment(ref creates); + else Interlocked.Increment(ref attaches); + } + }, workCt); + } + await Task.WhenAll(work); + + (creates + attaches).ShouldBe((long)(Tasks * OpsPerTask), "every op must take exactly one branch"); + creates.ShouldBe(1, "all ops share the same key with unlimited MaxParties — exactly one create"); + + // The retained entry must contain every attached party.
+ bool removed = map.TryRemove(key, out var entry); + removed.ShouldBeTrue(); + entry.InterestedParties.Count.ShouldBe(Tasks * OpsPerTask, + "the entry's party list must hold every attached party — no lost parties under race"); + map.Count.ShouldBe(0); + } + finally + { + foreach (var p in pipes) + try { await p.DisposeAsync(); } catch { } + } + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs index 96315c6..4f7389f 100644 --- a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs @@ -379,9 +379,10 @@ public sealed class PlcMultiplexerTests try { // Both clients use the same upstream TxId 0x0007 — the proxy must hand out - // distinct proxy TxIds on the backend wire. + // distinct proxy TxIds on the backend wire. Phase 10: reads target DIFFERENT + // addresses so coalescing does not fuse them into a single backend request. 
 await c1.SendAsync(BuildFc03ReadFrame(0x0007, 0, 1), SocketFlags.None); - await c2.SendAsync(BuildFc03ReadFrame(0x0007, 0, 1), SocketFlags.None); + await c2.SendAsync(BuildFc03ReadFrame(0x0007, 10, 1), SocketFlags.None); _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken); _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken); diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/ReadCoalescingE2ETests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/ReadCoalescingE2ETests.cs new file mode 100644 index 0000000..66eebfd --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/ReadCoalescingE2ETests.cs @@ -0,0 +1,316 @@ +using System.IO; +using System.Net; +using System.Net.Sockets; +using System.Text.Json; +using Mbproxy; +using Mbproxy.Options; +using Mbproxy.Proxy; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using NModbus; +using Serilog; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Multiplexing; + +/// +/// End-to-end coverage of Phase-10 read coalescing against the pymodbus DL205 simulator. +/// +/// pymodbus 3.13.0 simulator quirk. The sim's ServerRequestHandler +/// stores a single last_pdu per connection; two MBAP frames arriving in the same +/// recv-buffer overwrite each other's TxId. The real DL260 ECOM does not suffer this. +/// For Phase-10 E2E we therefore use the simulator only to verify rewriter integration +/// and status-page wiring on serialised requests; the coalescing-active-during-overlap +/// proof lives in <see cref="ReadCoalescingTests"/> against a stub backend with +/// deterministic response delays.
+/// +[Collection(nameof(Mbproxy.Tests.Sim.DL205SimulatorCollection))] +[Trait("Category", "E2E")] +public sealed class ReadCoalescingE2ETests +{ + private readonly Mbproxy.Tests.Sim.DL205SimulatorFixture _sim; + public ReadCoalescingE2ETests(Mbproxy.Tests.Sim.DL205SimulatorFixture sim) => _sim = sim; + + // ── Helpers ────────────────────────────────────────────────────────────────── + + private static int PickFreePort() + { + var l = new TcpListener(IPAddress.Loopback, 0); + l.Start(); + int p = ((IPEndPoint)l.LocalEndpoint).Port; + l.Stop(); + return p; + } + + private Dictionary MakeBaseConfig(int proxyPort) => new() + { + ["Mbproxy:AdminPort"] = "0", + [$"Mbproxy:Plcs:0:Name"] = "TestPLC", + [$"Mbproxy:Plcs:0:ListenPort"] = proxyPort.ToString(), + [$"Mbproxy:Plcs:0:Host"] = _sim.Host, + [$"Mbproxy:Plcs:0:Port"] = _sim.Port.ToString(), + ["Mbproxy:Connection:BackendConnectTimeoutMs"] = "3000", + ["Mbproxy:Connection:BackendRequestTimeoutMs"] = "3000", + }; + + private static IHost BuildBcdHost(Dictionary config) + { + var builder = Host.CreateApplicationBuilder(); + builder.Configuration.AddInMemoryCollection(config); + builder.Services.AddSerilog( + new LoggerConfiguration().MinimumLevel.Fatal().CreateLogger(), + dispose: false); + builder.AddMbproxyOptions(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddHostedService(sp => sp.GetRequiredService()); + + if (int.TryParse(config["Mbproxy:AdminPort"], out int admin) && admin > 0) + builder.AddMbproxyAdmin(); + return builder.Build(); + } + + private sealed class AsyncHostDispose : IAsyncDisposable + { + private readonly IHost _host; + public AsyncHostDispose(IHost host) => _host = host; + public async ValueTask DisposeAsync() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + try { await _host.StopAsync(cts.Token); } catch { } + _host.Dispose(); + } + } + + // ── 1. 
Concurrent identical reads — coalescing-ratio surfaces in counters ──── + + /// <summary> + /// Five clients issue the same FC03 read of a BCD-configured register through the proxy, + /// one after another. pymodbus's framer cannot reliably correlate concurrent multiplexed + /// frames, so this test verifies only the WEAKER property: every client receives the + /// correct decoded value (1234). Coalescing counters are not asserted here because the + /// admin port is disabled in this configuration (AdminPort = 0). + /// </summary> + [Fact(Timeout = 8_000)] + public async Task E2E_FiveConcurrentClients_SameReadHR1072_AllSucceed_AndCounterAccountingBalances() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:BcdTags:Global:0:Address"] = "1072"; + config["Mbproxy:BcdTags:Global:0:Width"] = "16"; + // Default ReadCoalescing.Enabled = true (set on ResilienceOptions). + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(200, TestContext.Current.CancellationToken); + + // Five clients reading sequentially — pymodbus serialisation friendly. With + // coalescing-on, identical reads issued back-to-back will mostly serialise on + // the wire too (one round-trip completes before the next starts), so this test + // does NOT assert hit-count > 0. It asserts only that every client sees the + // correct decoded value; the Hit + Miss counters are not read in this test.
+ var clients = new TcpClient[5]; + try + { + for (int i = 0; i < clients.Length; i++) + { + clients[i] = new TcpClient(); + await clients[i].ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(clients[i]); + ushort[] regs = master.ReadHoldingRegisters(1, 1072, 1); + regs[0].ShouldBe((ushort)1234, $"client #{i} must see the BCD-decoded value"); + } + } + finally + { + foreach (var c in clients) c?.Dispose(); + } + } + + // ── 2. BCD rewriter still works under coalescing fan-out ───────────────────── + + /// + /// Verifies the rewriter sees a coalesced response correctly: the TxId restoration + /// for the second party must not perturb the BCD byte rewrite. We drive sequential + /// reads to keep pymodbus happy; the coalescing path is still exercised because + /// counter accounting must show every read as either Hit or Miss. + /// + [Fact(Timeout = 5_000)] + public async Task E2E_RewriterStillWorks_ForCoalescedReads() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:BcdTags:Global:0:Address"] = "1072"; + config["Mbproxy:BcdTags:Global:0:Width"] = "16"; + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(200, TestContext.Current.CancellationToken); + + var clients = new TcpClient[3]; + try + { + for (int i = 0; i < clients.Length; i++) + { + clients[i] = new TcpClient(); + await clients[i].ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + } + + // Multiple read passes — same register, same expected decoded value across + // all clients. The BCD rewriter must produce 1234 for every party regardless + // of which coalescing branch (hit vs miss) the request took. 
+ for (int pass = 0; pass < 3; pass++) + { + for (int i = 0; i < clients.Length; i++) + { + var master = new ModbusFactory().CreateMaster(clients[i]); + ushort[] regs = master.ReadHoldingRegisters(1, 1072, 1); + regs[0].ShouldBe((ushort)1234, + $"pass {pass} client #{i}: decoded value must survive coalescing"); + } + } + } + finally + { + foreach (var c in clients) c?.Dispose(); + } + } + + // ── 3. Different registers → no coalescing → hit count stays at zero ───────── + + [Fact(Timeout = 5_000)] + public async Task E2E_DifferentRegisters_NotCoalesced_CoalescedHitCount_Zero() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + int adminPort = PickFreePort(); + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:AdminPort"] = adminPort.ToString(); + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(300, TestContext.Current.CancellationToken); + + // Five different seeded addresses, sequential reads — none can coalesce. + // Selected from DL260/dl205.json's seeded ranges (200..209, 1024, 1040..1042). + ushort[] addrs = [200, 201, 202, 203, 204]; + using (var client = new TcpClient()) + { + await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(client); + foreach (ushort a in addrs) + _ = master.ReadHoldingRegisters(1, a, 1); + } + + // Read the counters via status.json. 
+ using var httpClient = new HttpClient(); + var resp = await httpClient.GetStringAsync( + $"http://127.0.0.1:{adminPort}/status.json", + TestContext.Current.CancellationToken); + + using var doc = JsonDocument.Parse(resp); + var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend"); + + backend.GetProperty("coalescedHitCount").GetInt64() + .ShouldBe(0, "different addresses must never coalesce"); + backend.GetProperty("coalescedMissCount").GetInt64() + .ShouldBe(addrs.Length, "each distinct read must be counted as a Miss"); + } + + // ── 4. Status page surfaces coalescing counters ────────────────────────────── + + [Fact(Timeout = 5_000)] + public async Task E2E_StatusPage_Shows_CoalescingFields() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + int adminPort = PickFreePort(); + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:AdminPort"] = adminPort.ToString(); + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(300, TestContext.Current.CancellationToken); + + using (var client = new TcpClient()) + { + await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(client); + _ = master.ReadHoldingRegisters(1, 0, 1); + } + + using var httpClient = new HttpClient(); + var resp = await httpClient.GetStringAsync( + $"http://127.0.0.1:{adminPort}/status.json", + TestContext.Current.CancellationToken); + + using var doc = JsonDocument.Parse(resp); + var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend"); + + backend.TryGetProperty("coalescedHitCount", out _) + .ShouldBeTrue("status.json must expose backend.coalescedHitCount"); + backend.TryGetProperty("coalescedMissCount", out _) + .ShouldBeTrue("status.json must expose 
backend.coalescedMissCount"); + backend.TryGetProperty("coalescedResponseToDeadUpstream", out _) + .ShouldBeTrue("status.json must expose backend.coalescedResponseToDeadUpstream"); + } + + // ── 5. Disable via config → coalescing OFF → every read is a Miss ──────────── + + [Fact(Timeout = 5_000)] + public async Task E2E_CoalescingDisabledViaConfig_EveryReadIsAMiss() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + int adminPort = PickFreePort(); + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:AdminPort"] = adminPort.ToString(); + config["Mbproxy:Resilience:ReadCoalescing:Enabled"] = "false"; + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(300, TestContext.Current.CancellationToken); + + using (var client = new TcpClient()) + { + await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(client); + for (int i = 0; i < 4; i++) + _ = master.ReadHoldingRegisters(1, 0, 1); + } + + using var httpClient = new HttpClient(); + var resp = await httpClient.GetStringAsync( + $"http://127.0.0.1:{adminPort}/status.json", + TestContext.Current.CancellationToken); + + using var doc = JsonDocument.Parse(resp); + var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend"); + + backend.GetProperty("coalescedHitCount").GetInt64() + .ShouldBe(0, "coalescing disabled — no hits possible"); + backend.GetProperty("coalescedMissCount").GetInt64() + .ShouldBe(4, "every FC03 read still counts as a Miss when coalescing is disabled"); + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/ReadCoalescingTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/ReadCoalescingTests.cs new file mode 100644 index 0000000..3f0d497 --- /dev/null +++ 
b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/ReadCoalescingTests.cs @@ -0,0 +1,584 @@ +using System.Collections.Concurrent; +using System.Collections.Frozen; +using System.Net; +using System.Net.Sockets; +using Mbproxy.Bcd; +using Mbproxy.Options; +using Mbproxy.Proxy; +using Mbproxy.Proxy.Multiplexing; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Multiplexing; + +/// <summary> +/// Phase-10 unit tests for read coalescing against a stub backend (real sockets, no +/// simulator). The stub gives us deterministic control over backend response timing so +/// the "overlapping in-flight" window is large enough for late requests to actually +/// coalesce. The pymodbus simulator cannot be used here — its known concurrent-MBAP-frame +/// bug (see <see cref="ReadCoalescingE2ETests"/>) would invalidate the proxy-TxId echo path +/// that coalescing relies on. +/// </summary> +[Trait("Category", "Unit")] +public sealed class ReadCoalescingTests +{ + // ── Frame builders / readers ───────────────────────────────────────────────── + + private static int PickFreePort() + { + var l = new TcpListener(IPAddress.Loopback, 0); + l.Start(); + int port = ((IPEndPoint)l.LocalEndpoint).Port; + l.Stop(); + return port; + } + + private static async Task<byte[]> ReadExactAsync(Socket s, int count, CancellationToken ct) + { + var buf = new byte[count]; + int read = 0; + while (read < count) + { + int n = await s.ReceiveAsync(buf.AsMemory(read, count - read), SocketFlags.None, ct); + if (n == 0) throw new IOException("EOF"); + read += n; + } + return buf; + } + + private static async Task<byte[]> ReadOneFrameAsync(Socket s, CancellationToken ct) + { + var header = await ReadExactAsync(s, 7, ct); + ushort length = (ushort)((header[4] << 8) | header[5]); + int bodyLen = length - 1; + var body = bodyLen > 0 ?
await ReadExactAsync(s, bodyLen, ct) : Array.Empty<byte>(); + var frame = new byte[7 + bodyLen]; + Buffer.BlockCopy(header, 0, frame, 0, 7); + if (bodyLen > 0) Buffer.BlockCopy(body, 0, frame, 7, bodyLen); + return frame; + } + + private static byte[] BuildFc03(ushort txId, ushort start, ushort qty, byte unit = 1) + => [ + (byte)(txId >> 8), (byte)(txId & 0xFF), + 0x00, 0x00, + 0x00, 0x06, + unit, 0x03, + (byte)(start >> 8), (byte)(start & 0xFF), + (byte)(qty >> 8), (byte)(qty & 0xFF), + ]; + + private static byte[] BuildFc04(ushort txId, ushort start, ushort qty, byte unit = 1) + => [ + (byte)(txId >> 8), (byte)(txId & 0xFF), + 0x00, 0x00, + 0x00, 0x06, + unit, 0x04, + (byte)(start >> 8), (byte)(start & 0xFF), + (byte)(qty >> 8), (byte)(qty & 0xFF), + ]; + + private static byte[] BuildFc06(ushort txId, ushort addr, ushort value, byte unit = 1) + => [ + (byte)(txId >> 8), (byte)(txId & 0xFF), + 0x00, 0x00, + 0x00, 0x06, + unit, 0x06, + (byte)(addr >> 8), (byte)(addr & 0xFF), + (byte)(value >> 8), (byte)(value & 0xFF), + ]; + + private static byte[] BuildFc03Response(ushort txId, byte unit, params ushort[] regs) + { + int bodyLen = 2 + regs.Length * 2; + var frame = new byte[7 + bodyLen]; + frame[0] = (byte)(txId >> 8); + frame[1] = (byte)(txId & 0xFF); + frame[2] = 0; frame[3] = 0; + ushort len = (ushort)(1 + bodyLen); + frame[4] = (byte)(len >> 8); + frame[5] = (byte)(len & 0xFF); + frame[6] = unit; + frame[7] = 0x03; + frame[8] = (byte)(regs.Length * 2); + for (int i = 0; i < regs.Length; i++) + { + frame[9 + i * 2] = (byte)(regs[i] >> 8); + frame[9 + i * 2 + 1] = (byte)(regs[i] & 0xFF); + } + return frame; + } + + private static byte[] BuildFc06Response(ushort txId, byte unit, ushort addr, ushort value) + { + var frame = new byte[12]; + frame[0] = (byte)(txId >> 8); + frame[1] = (byte)(txId & 0xFF); + frame[2] = 0; frame[3] = 0; + frame[4] = 0; frame[5] = 6; + frame[6] = unit; + frame[7] = 0x06; + frame[8] = (byte)(addr >> 8); frame[9] = (byte)(addr & 0xFF); +
frame[10] = (byte)(value >> 8); frame[11] = (byte)(value & 0xFF); + return frame; + } + + // ── Holding-the-response stub backend ───────────────────────────────────── + + /// <summary> + /// Stub backend that delays its response by <see cref="ResponseDelayMs"/>. The delay + /// gives the test a deterministic in-flight window so a second client's identical + /// request actually overlaps the first request's wire-time. Records every proxy TxId + /// it sees so the test can count distinct backend round-trips. + /// </summary> + private sealed class DelayedStubBackend : IAsyncDisposable + { + public int Port { get; } + public int ResponseDelayMs { get; set; } = 200; + public ConcurrentQueue<ushort> SeenProxyTxIds { get; } = new(); + public int RequestCount => SeenProxyTxIds.Count; + + private readonly TcpListener _listener; + private readonly CancellationTokenSource _cts = new(); + private readonly List<Task> _clientTasks = new(); + + public DelayedStubBackend(int port) + { + Port = port; + _listener = new TcpListener(IPAddress.Loopback, port); + _listener.Start(); + _ = AcceptLoop(); + } + + private async Task AcceptLoop() + { + try + { + while (!_cts.IsCancellationRequested) + { + Socket s = await _listener.AcceptSocketAsync(_cts.Token); + var t = Task.Run(() => HandleAsync(s)); + lock (_clientTasks) _clientTasks.Add(t); + } + } + catch { } + } + + private async Task HandleAsync(Socket s) + { + try + { + while (!_cts.IsCancellationRequested) + { + var req = await ReadOneFrameAsync(s, _cts.Token); + if (req.Length < 8) break; + + ushort txId = (ushort)((req[0] << 8) | req[1]); + byte unit = req[6]; + byte fc = req[7]; + + SeenProxyTxIds.Enqueue(txId); + + // Schedule the response asynchronously so the next request (from a + // second client) can race onto the multiplexer while this one is + // still in flight. + _ = Task.Run(async () => + { + try + { + await Task.Delay(ResponseDelayMs, _cts.Token); + byte[] response; + if (fc == 0x03 || fc == 0x04) + { + // Default register value 0x1234 (BCD 1234).
+ response = BuildFc03Response(txId, unit, 0x1234); + response[7] = fc; // restore actual FC byte + } + else if (fc == 0x06) + { + ushort addr = (ushort)((req[8] << 8) | req[9]); + ushort val = (ushort)((req[10] << 8) | req[11]); + response = BuildFc06Response(txId, unit, addr, val); + } + else { return; } + await s.SendAsync(response, SocketFlags.None, _cts.Token); + } + catch { } + }); + } + } + catch { } + finally { try { s.Dispose(); } catch { } } + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + try { _listener.Stop(); } catch { } + Task[] snap; + lock (_clientTasks) snap = _clientTasks.ToArray(); + try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { } + _cts.Dispose(); + } + } + + // ── Mux construction / client helpers ──────────────────────────────────── + + private static PerPlcContext MakeContext(string name, params BcdTag[] tags) + { + var frozen = tags.ToDictionary(t => t.Address).ToFrozenDictionary(); + var map = frozen.Count > 0 ? 
new BcdTagMap(frozen) : BcdTagMap.Empty; + return new PerPlcContext + { + PlcName = name, + TagMap = map, + Counters = new ProxyCounters(), + Logger = NullLogger.Instance, + }; + } + + private static PlcMultiplexer BuildMux( + PlcOptions plc, + ConnectionOptions connOpts, + PerPlcContext ctx, + ReadCoalescingOptions coalescing) + { + return new PlcMultiplexer( + plc, connOpts, + new BcdPduPipeline(), + ctx, + NullLogger.Instance, + backendConnectPipeline: null, + coalescingOptions: () => coalescing); + } + + private static async Task<(Socket client, UpstreamPipe pipe, TcpListener proxyListener)> + ConnectClientAsync(PlcMultiplexer mux, string plcName) + { + int proxyPort = PickFreePort(); + var proxyListener = new TcpListener(IPAddress.Loopback, proxyPort); + proxyListener.Start(); + + var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) + { NoDelay = true }; + await client.ConnectAsync(IPAddress.Loopback, proxyPort); + var upstream = await proxyListener.AcceptSocketAsync(); + var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance); + _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None)); + return (client, pipe, proxyListener); + } + + // ── Tests ──────────────────────────────────────────────────────────────── + + [Fact] + public async Task TwoClients_SameRequest_OnlyOneBackendRoundTrip() + { + int backendPort = PickFreePort(); + await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 300 }; + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, new ConnectionOptions(), + ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 32 }); + + var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name); + var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name); + try + { + // First client opens the in-flight entry; small gap lets the multiplexer enqueue + 
// before the second arrives. The 300 ms delay then gives the second client + // ample window to coalesce onto the first. + await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None); + await Task.Delay(80, TestContext.Current.CancellationToken); + await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None); + + var r1 = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken); + var r2 = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken); + + ((ushort)((r1[0] << 8) | r1[1])).ShouldBe((ushort)0x0001); + ((ushort)((r2[0] << 8) | r2[1])).ShouldBe((ushort)0x0002); + + backend.RequestCount.ShouldBe(1, "exactly one backend round-trip must service both clients"); + ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(1); + ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(1); + } + finally + { + c1.Dispose(); c2.Dispose(); + await p1.DisposeAsync(); await p2.DisposeAsync(); + l1.Stop(); l2.Stop(); + } + } + + [Fact] + public async Task TwoClients_DifferentRequests_BothHitBackend() + { + int backendPort = PickFreePort(); + await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 50 }; + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, new ConnectionOptions(), + ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 32 }); + + var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name); + var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name); + try + { + // Different start addresses → different keys → no coalescing. 
+ await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None); + await c2.SendAsync(BuildFc03(0x0002, 200, 1), SocketFlags.None); + + _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken); + _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken); + + backend.RequestCount.ShouldBe(2, "two distinct keys must produce two backend round-trips"); + ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0); + ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(2); + } + finally + { + c1.Dispose(); c2.Dispose(); + await p1.DisposeAsync(); await p2.DisposeAsync(); + l1.Stop(); l2.Stop(); + } + } + + [Fact] + public async Task FiveClients_SameRequest_OneBackendRoundTrip_FiveResponses() + { + int backendPort = PickFreePort(); + await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 400 }; + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, new ConnectionOptions(), + ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 32 }); + + var sockets = new List<Socket>(); + var pipes = new List<UpstreamPipe>(); + var lists = new List<TcpListener>(); + try + { + for (int i = 0; i < 5; i++) + { + var (c, p, l) = await ConnectClientAsync(mux, plc.Name); + sockets.Add(c); pipes.Add(p); lists.Add(l); + } + + // First client opens; the rest race in during the 400 ms window. + await sockets[0].SendAsync(BuildFc03((ushort)1, 100, 1), SocketFlags.None); + await Task.Delay(60, TestContext.Current.CancellationToken); + for (int i = 1; i < sockets.Count; i++) + await sockets[i].SendAsync(BuildFc03((ushort)(i + 1), 100, 1), SocketFlags.None); + + // Read back every client's response.
+ for (int i = 0; i < sockets.Count; i++) + { + var rsp = await ReadOneFrameAsync(sockets[i], TestContext.Current.CancellationToken); + ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)(i + 1), + $"client #{i} must see its own original TxId restored"); + } + + backend.RequestCount.ShouldBeLessThanOrEqualTo(2, + "at most 2 backend round-trips (one for the leader, one for any racy first-arrival)"); + ctx.Counters.Snapshot().CoalescedHitCount.ShouldBeGreaterThanOrEqualTo(3, + "at least 3 of the 5 clients should have coalesced"); + } + finally + { + foreach (var s in sockets) s.Dispose(); + foreach (var p in pipes) await p.DisposeAsync(); + foreach (var l in lists) l.Stop(); + } + } + + [Fact] + public async Task FC03_And_FC04_SameAddress_NOT_Coalesced() + { + int backendPort = PickFreePort(); + await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 200 }; + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, new ConnectionOptions(), + ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 32 }); + + var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name); + var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name); + try + { + // FC03 vs FC04 — different Modbus tables, never coalesce. 
+ await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None); + await c2.SendAsync(BuildFc04(0x0002, 100, 1), SocketFlags.None); + + _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken); + _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken); + + backend.RequestCount.ShouldBe(2, "FC03 and FC04 must never share a backend round-trip"); + ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0); + } + finally + { + c1.Dispose(); c2.Dispose(); + await p1.DisposeAsync(); await p2.DisposeAsync(); + l1.Stop(); l2.Stop(); + } + } + + [Fact] + public async Task FC06_Write_NeverCoalesced() + { + int backendPort = PickFreePort(); + await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 100 }; + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, new ConnectionOptions(), + ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 32 }); + + var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name); + var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name); + try + { + // Two identical FC06 writes — writes must never coalesce (non-idempotent). 
+            await c1.SendAsync(BuildFc06(0x0001, 200, 1234), SocketFlags.None);
+            await c2.SendAsync(BuildFc06(0x0002, 200, 1234), SocketFlags.None);
+
+            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
+            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);
+
+            backend.RequestCount.ShouldBe(2, "FC06 writes must always hit the backend separately");
+            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0,
+                "writes are never counted as coalescing hits");
+            ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(0,
+                "writes are not part of the coalescing accounting");
+        }
+        finally
+        {
+            c1.Dispose(); c2.Dispose();
+            await p1.DisposeAsync(); await p2.DisposeAsync();
+            l1.Stop(); l2.Stop();
+        }
+    }
+
+    [Fact]
+    public async Task OneClient_DisconnectsMidFlight_OthersStillGetResponse_AndDeadUpstreamCounterIncrements()
+    {
+        int backendPort = PickFreePort();
+        await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 400 };
+
+        var ctx = MakeContext("PLC1");
+        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
+        await using var mux = BuildMux(plc, new ConnectionOptions(),
+            ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 32 });
+
+        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
+        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);
+        try
+        {
+            await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
+            await Task.Delay(60, TestContext.Current.CancellationToken);
+            await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);
+            await Task.Delay(60, TestContext.Current.CancellationToken);
+
+            // Drop client 1 mid-flight (before the backend response arrives).
+            c1.Dispose();
+            await p1.DisposeAsync();
+
+            // Client 2 must still get its response.
+            var r2 = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);
+            ((ushort)((r2[0] << 8) | r2[1])).ShouldBe((ushort)0x0002);
+
+            // Give fan-out a beat to record the dead-upstream skip on the c1 side.
+            await Task.Delay(100, TestContext.Current.CancellationToken);
+
+            ctx.Counters.Snapshot().CoalescedResponseToDeadUpstream.ShouldBeGreaterThanOrEqualTo(1,
+                "the disconnected client's fan-out slot must increment the dead-upstream counter");
+        }
+        finally
+        {
+            c2.Dispose();
+            await p2.DisposeAsync();
+            l1.Stop(); l2.Stop();
+        }
+    }
+
+    [Fact]
+    public async Task AtMaxParties_NextRequest_StartsFreshBackendRoundTrip()
+    {
+        int backendPort = PickFreePort();
+        await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 400 };
+
+        var ctx = MakeContext("PLC1");
+        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
+        // MaxParties = 2 forces the third identical request to open a fresh entry.
+        await using var mux = BuildMux(plc, new ConnectionOptions(),
+            ctx, new ReadCoalescingOptions { Enabled = true, MaxParties = 2 });
+
+        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
+        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);
+        var (c3, p3, l3) = await ConnectClientAsync(mux, plc.Name);
+        try
+        {
+            await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
+            await Task.Delay(50, TestContext.Current.CancellationToken);
+            await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);
+            await Task.Delay(50, TestContext.Current.CancellationToken);
+            await c3.SendAsync(BuildFc03(0x0003, 100, 1), SocketFlags.None);
+
+            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
+            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);
+            _ = await ReadOneFrameAsync(c3, TestContext.Current.CancellationToken);
+
+            backend.RequestCount.ShouldBe(2,
+                "MaxParties=2 caps the first entry at 2; the third request opens its own round-trip");
+            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(1, "exactly one party joined the leader");
+            ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(2,
+                "the leader and the overflow are both misses");
+        }
+        finally
+        {
+            c1.Dispose(); c2.Dispose(); c3.Dispose();
+            await p1.DisposeAsync(); await p2.DisposeAsync(); await p3.DisposeAsync();
+            l1.Stop(); l2.Stop(); l3.Stop();
+        }
+    }
+
+    [Fact]
+    public async Task CoalescingDisabled_TwoIdenticalReads_BothHitBackend()
+    {
+        // Sanity: with Enabled=false the multiplexer takes the Phase-9 path for every
+        // FC03/FC04 request. Both identical reads must produce a backend round-trip and
+        // every request counts as a Miss (Hit + Miss = total FC03/FC04 invariant).
+        int backendPort = PickFreePort();
+        await using var backend = new DelayedStubBackend(backendPort) { ResponseDelayMs = 50 };
+
+        var ctx = MakeContext("PLC1");
+        var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort };
+        await using var mux = BuildMux(plc, new ConnectionOptions(),
+            ctx, new ReadCoalescingOptions { Enabled = false, MaxParties = 32 });
+
+        var (c1, p1, l1) = await ConnectClientAsync(mux, plc.Name);
+        var (c2, p2, l2) = await ConnectClientAsync(mux, plc.Name);
+        try
+        {
+            await c1.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None);
+            await c2.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None);
+
+            _ = await ReadOneFrameAsync(c1, TestContext.Current.CancellationToken);
+            _ = await ReadOneFrameAsync(c2, TestContext.Current.CancellationToken);
+
+            backend.RequestCount.ShouldBe(2, "coalescing disabled: each identical read must hit the backend");
+            ctx.Counters.Snapshot().CoalescedHitCount.ShouldBe(0);
+            ctx.Counters.Snapshot().CoalescedMissCount.ShouldBe(2, "every FC03 request still counts as a Miss");
+        }
+        finally
+        {
+            c1.Dispose(); c2.Dispose();
+            await p1.DisposeAsync(); await p2.DisposeAsync();
+            l1.Stop(); l2.Stop();
+        }
+    }
+}