From 0868613890a4d01f55368859d440b38d505b13c1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 15 May 2026 09:40:54 -0400 Subject: [PATCH] mbproxy: add keepalive / connection monitoring The DL205/DL260 ECOM emits no TCP keepalives, so an idle backend socket can be silently dropped by a middlebox (switch, firewall, NAT) after 2-5 minutes. Enable OS SO_KEEPALIVE on backend and accepted upstream sockets, and drive a periodic synthetic FC03 heartbeat on each idle backend socket so a dead path is detected before a real client request hits it. Controlled by Connection.Keepalive (ON by default). Co-Authored-By: Claude Opus 4.7 (1M context) --- mbproxy/docs/Architecture/ConnectionModel.md | 1 + mbproxy/docs/Architecture/Keepalive.md | 76 ++++ mbproxy/docs/Operations/StatusPage.md | 24 +- mbproxy/install/mbproxy.config.template.json | 29 +- mbproxy/src/Mbproxy/Admin/StatusDto.cs | 8 +- .../src/Mbproxy/Admin/StatusHtmlRenderer.cs | 21 + .../Mbproxy/Admin/StatusSnapshotBuilder.cs | 10 +- .../Mbproxy/Configuration/ConfigReconciler.cs | 14 +- .../Mbproxy/Configuration/ReloadValidator.cs | 21 + .../src/Mbproxy/Options/ConnectionOptions.cs | 5 + .../src/Mbproxy/Options/KeepaliveOptions.cs | 52 +++ mbproxy/src/Mbproxy/Options/MbproxyOptions.cs | 16 + .../Proxy/Multiplexing/InFlightRequest.cs | 9 +- .../Proxy/Multiplexing/KeepaliveLogEvents.cs | 54 +++ .../Proxy/Multiplexing/PlcMultiplexer.cs | 186 ++++++++- .../Proxy/Multiplexing/UpstreamPipe.cs | 8 +- mbproxy/src/Mbproxy/Proxy/PlcListener.cs | 13 +- mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs | 41 +- mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs | 12 +- mbproxy/src/Mbproxy/Proxy/SocketKeepalive.cs | 49 +++ .../Supervision/PlcListenerSupervisor.cs | 8 +- .../Admin/StatusHtmlRendererTests.cs | 4 +- .../Configuration/ReloadValidatorTests.cs | 70 ++++ .../Proxy/Multiplexing/KeepaliveTests.cs | 366 ++++++++++++++++++ .../Proxy/Multiplexing/MultiplexerE2ETests.cs | 63 +++ 25 files changed, 1135 insertions(+), 25 deletions(-) create mode 100644 mbproxy/docs/Architecture/Keepalive.md create mode 100644 mbproxy/src/Mbproxy/Options/KeepaliveOptions.cs create mode 100644 mbproxy/src/Mbproxy/Proxy/Multiplexing/KeepaliveLogEvents.cs create mode 100644 mbproxy/src/Mbproxy/Proxy/SocketKeepalive.cs create mode 100644 mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/KeepaliveTests.cs diff --git a/mbproxy/docs/Architecture/ConnectionModel.md b/mbproxy/docs/Architecture/ConnectionModel.md index b355eae..a76f130 100644 --- a/mbproxy/docs/Architecture/ConnectionModel.md +++ b/mbproxy/docs/Architecture/ConnectionModel.md @@ -240,6 +240,7 @@ The per-request timeout watchdog described above is the production defence again - [`./Overview.md`](./Overview.md) — proxy architecture entry point - [`./ReadCoalescing.md`](./ReadCoalescing.md) — FC03/FC04 fan-out built on `InterestedParties` - [`./ResponseCache.md`](./ResponseCache.md) — per-PLC FC03/FC04 cache layered in front of this multiplexer +- [`./Keepalive.md`](./Keepalive.md) — TCP keepalive and the backend heartbeat that keeps this socket warm - [`../Operations/Configuration.md`](../Operations/Configuration.md) — `Connection.BackendConnectTimeoutMs`, `Connection.BackendRequestTimeoutMs`, retry tuning - [`../Operations/StatusPage.md`](../Operations/StatusPage.md) — `inFlight`, `maxInFlight`, `txIdWraps`, `queueDepth`, `disconnectCascades` counters - [`../Reference/LogEvents.md`](../Reference/LogEvents.md) — `mbproxy.multiplex.*` structured log events diff --git a/mbproxy/docs/Architecture/Keepalive.md b/mbproxy/docs/Architecture/Keepalive.md new file mode 100644 index 0000000..38b5b0e --- /dev/null +++ b/mbproxy/docs/Architecture/Keepalive.md @@ -0,0 +1,76 @@ +# Keepalive & Connection Monitoring + +The DL205/DL260 ECOM does not emit TCP keepalives (see [`../Reference/dl205.md`](../Reference/dl205.md) → "Behavioural Oddities"). An idle socket is silently dropped by middleboxes — switches, firewalls, NAT — typically after 2–5 minutes. The proxy holds one **persistent backend socket per PLC** ([`./ConnectionModel.md`](./ConnectionModel.md)) plus many accepted upstream client sockets, so it needs its own keepalive on both sides. + +Keepalive is **enabled by default** and is governed by the `Connection.Keepalive` option block (see [`../Operations/Configuration.md`](../Operations/Configuration.md)). Set `Connection.Keepalive.Enabled = false` to restore pre-keepalive behaviour exactly. + +## Two mechanisms + +| Mechanism | Scope | Detects | +|-----------|-------|---------| +| OS TCP keepalive (`SO_KEEPALIVE`) | Backend socket **and** accepted upstream sockets | A peer whose TCP stack is gone (host down, cable pulled, half-open socket). | +| Application heartbeat (FC03 probe) | Backend socket only | The above **plus** a middlebox idle-drop and an ECOM that is connected-but-not-answering Modbus. | + +The application heartbeat is the load-bearing mechanism; OS keepalive is a cheap belt-and-suspenders that also covers the window between heartbeat ticks. + +## Backend: OS TCP keepalive + +`SocketKeepalive.Apply` sets `SO_KEEPALIVE` plus the idle-time / probe-interval / probe-count tunables on the backend `Socket` right after it is created in `PlcMultiplexer.EnsureBackendConnectedAsync`. The tunables come from `Connection.Keepalive.Tcp*`. Socket options are applied **at connect time** — a hot-reload of the `Tcp*` values only affects backend sockets opened *after* the change. + +## Backend: application heartbeat + +A per-`PlcMultiplexer` background loop (`RunBackendHeartbeatAsync`) is started alongside the backend writer and reader on each successful connect, under the same `_backendCts`, and dies with them on teardown. + +- The multiplexer tracks `_lastBackendActivityUtc`, updated by **both** the writer (on every send) and the reader (on every received frame). Real traffic in either direction therefore suppresses the heartbeat. +- Each tick (a quarter of `BackendHeartbeatIdleMs`, floored at 500 ms), if the socket has been idle longer than `BackendHeartbeatIdleMs`, the loop issues a **synthetic FC03 qty=1 read** at `BackendHeartbeatProbeAddress` (default 0 = `V0`, valid on DL205/DL260). FC08 (Diagnostics) is **not** supported by the DL260 ECOM, so the probe must be a real register read. +- The probe targets the unit ID of the most recent upstream request, so it reaches the same Modbus unit the real clients successfully use. +- The probe takes a real proxy TxId and a `CorrelationMap` entry flagged `InFlightRequest.IsHeartbeat`. It is enqueued straight onto the backend outbound channel, **bypassing** the read-coalescing and response-cache paths. + +### Heartbeat response + +The backend reader recognises an `IsHeartbeat` correlation entry, refreshes the idle timer (already done on frame receipt), frees the TxId, and **drops the payload** — no rewriter, no cache write-through, no fan-out, and no round-trip-EWMA sample (the synthetic probe never pollutes the client-facing RTT metric). + +### Heartbeat timeout + +If a probe is not answered within `BackendRequestTimeoutMs`, the per-request timeout watchdog ([`./ConnectionModel.md`](./ConnectionModel.md) → "Per-Request Timeout Watchdog") finds the stale `IsHeartbeat` entry and — instead of dispatching a 0x0B exception to a (non-existent) upstream party — calls `TearDownBackendAsync`, cascading every attached upstream pipe. + +This is a **proactive** version of the existing backend-disconnect cascade: the dead path is found during idle instead of corrupting the next real client request. Reconnect stays lazy — the heartbeat keeps an *existing* backend warm, it never resurrects a dead one and adds no eager-reconnect spinner. Clients reconnect on their next request, exactly as for an organic cascade. + +`BackendHeartbeatIdleMs` must be greater than `BackendRequestTimeoutMs` (enforced by the reload validator) — a heartbeat interval at or below the request timeout would fire continuously. + +## Upstream: OS TCP keepalive + +`SocketKeepalive.Apply` is also called on each accepted client `Socket` in the `UpstreamPipe` constructor. This is the **only** standard keepalive available on the upstream side: Modbus TCP is strictly client-initiated, so the proxy — a server to its clients — cannot send an unsolicited application heartbeat to a client. OS keepalive lets the proxy's TCP stack probe each client; a dead or half-open client then faults the pipe's read loop, the pipe is disposed, and its correlation / coalescing slots are freed instead of leaking until the proxy next tries to write. + +## Counters + +Per-PLC, exposed on the status page (see [`../Operations/StatusPage.md`](../Operations/StatusPage.md)): + +| Counter | Meaning | +|---------|---------| +| `backendHeartbeatsSent` | Heartbeat probes issued on idle backend sockets. | +| `backendHeartbeatsFailed` | Probes not answered within `BackendRequestTimeoutMs`. | +| `backendIdleDisconnects` | Backend teardowns triggered by a failed heartbeat (event count — distinct from `disconnectCascades`, which counts cascaded pipes). | + +## Log events + +`mbproxy.keepalive.*` — see [`../Reference/LogEvents.md`](../Reference/LogEvents.md): + +- `mbproxy.keepalive.heartbeat.sent` (Debug) +- `mbproxy.keepalive.heartbeat.timeout` (Warning) +- `mbproxy.keepalive.backend.idle_disconnect` (Information) + +## Hot reload + +`Connection.Keepalive` is read through a live accessor (`Func`), so a reload of `appsettings.json` propagates without a listener restart: + +- The **heartbeat** interval and probe address are re-read on every loop tick. +- The **TCP socket options** are applied at connect/accept time, so a reload affects only sockets opened after the change. + +## Related documentation + +- [`./ConnectionModel.md`](./ConnectionModel.md) — backend socket lifecycle, the timeout watchdog, and the disconnect cascade this feature hooks into +- [`../Operations/Configuration.md`](../Operations/Configuration.md) — the `Connection.Keepalive` option block +- [`../Operations/StatusPage.md`](../Operations/StatusPage.md) — keepalive counters +- [`../Reference/LogEvents.md`](../Reference/LogEvents.md) — `mbproxy.keepalive.*` events +- [`../Reference/dl205.md`](../Reference/dl205.md) — the device "no keepalive" oddity and FC03/FC08 support diff --git a/mbproxy/docs/Operations/StatusPage.md b/mbproxy/docs/Operations/StatusPage.md index 262d375..7bc24d6 100644 --- a/mbproxy/docs/Operations/StatusPage.md +++ b/mbproxy/docs/Operations/StatusPage.md @@ -135,6 +135,16 @@ These two fields are Tier-2 KPIs intended for memory-budget alerts. The cache is | `backend.cacheEntryCount` | `long` | `CounterSnapshot.CacheEntryCount` | Current number of cached response entries for this PLC. | | `backend.cacheBytes` | `long` | `CounterSnapshot.CacheBytes` | Approximate byte cost of the cache entries (response payloads plus key overhead). Used to detect runaway growth from a chatty client. | +### Keepalive counters + +These fields describe the backend keepalive heartbeat. See [`../Architecture/Keepalive.md`](../Architecture/Keepalive.md). + +| JSON path | Type | Source | Meaning | +|---|---|---|---| +| `backend.backendHeartbeatsSent` | `long` | `CounterSnapshot.BackendHeartbeatsSent` | Synthetic FC03 heartbeat probes issued on this PLC's idle backend socket. | +| `backend.backendHeartbeatsFailed` | `long` | `CounterSnapshot.BackendHeartbeatsFailed` | Heartbeat probes not answered within `BackendRequestTimeoutMs`. Each failure tears the backend down. | +| `backend.backendIdleDisconnects` | `long` | `CounterSnapshot.BackendIdleDisconnects` | Backend teardowns triggered by a failed heartbeat — an event count, distinct from `disconnectCascades` (which counts cascaded pipes). Sustained growth means a PLC is repeatedly going dark while idle. | + ### Bytes | JSON path | Type | Source | Meaning | @@ -224,7 +234,10 @@ A representative two-PLC deployment, ~2 hours into a run: "cacheMissCount": 88691, "cacheInvalidations": 6203, "cacheEntryCount": 47, - "cacheBytes": 18512 + "cacheBytes": 18512, + "backendHeartbeatsSent": 412, + "backendHeartbeatsFailed": 0, + "backendIdleDisconnects": 0 }, "bytes": { "upstreamIn": 4108290, @@ -267,7 +280,10 @@ A representative two-PLC deployment, ~2 hours into a run: "cacheMissCount": 0, "cacheInvalidations": 0, "cacheEntryCount": 0, - "cacheBytes": 0 + "cacheBytes": 0, + "backendHeartbeatsSent": 0, + "backendHeartbeatsFailed": 0, + "backendIdleDisconnects": 0 }, "bytes": { "upstreamIn": 0, "upstreamOut": 0 } } @@ -282,10 +298,10 @@ The HTML renderer is `StatusHtmlRenderer.Render(StatusResponse)` in `src/Mbproxy Structure: 1. **Header summary** — version, formatted uptime (`Nh MMm SSs`), `bound/configured` listener tally, last reload timestamp, reload count with a `(N rejected)` suffix when applicable. -2. **PLC table** — one row per configured PLC. Columns: Name, Host, Port, State (colour-coded — `bound` = green, `recovering` = orange, `stopped` = grey), Clients (count plus a comma-separated list of `remote (N PDUs)`), PDUs forwarded, FC03/FC04/FC06/FC16/FC? counts, BCD slots, Partial BCD, exception codes 01/02/03/04, RTT (ms), bytes in/out, multiplexer columns (in-flight, max in-flight, TxId wraps, cascades, queue), coalescing ratio cell, cache ratio cell. +2. **PLC table** — one row per configured PLC. Columns: Name, Host, Port, State (colour-coded — `bound` = green, `recovering` = orange, `stopped` = grey), Clients (count plus a comma-separated list of `remote (N PDUs)`), PDUs forwarded, FC03/FC04/FC06/FC16/FC? counts, BCD slots, Partial BCD, exception codes 01/02/03/04, RTT (ms), bytes in/out, multiplexer columns (in-flight, max in-flight, TxId wraps, cascades, queue), coalescing ratio cell, cache ratio cell, keepalive cell. 3. **State cell error detail** — when `state == "recovering"`, the cell also shows `lastBindError` and `(attempt N)` in a small red span. -The coalescing and cache cells each render as `% ()`. When neither has been exercised (`hit + miss == 0`), the cell renders an em-dash to keep the column narrow. Page weight is bounded by the design budget (≤ 50 KB for a 54-PLC fleet). +The coalescing and cache cells each render as `% ()`. When neither has been exercised (`hit + miss == 0`), the cell renders an em-dash to keep the column narrow. The keepalive cell shows the heartbeat-sent count, with `(fail N, idle-disc N)` appended only when either is non-zero. Page weight is bounded by the design budget (≤ 50 KB for a 54-PLC fleet). The page does not depend on JavaScript. Refresh is driven entirely by the `` tag, so any browser — including text-mode browsers — sees the same view. diff --git a/mbproxy/install/mbproxy.config.template.json b/mbproxy/install/mbproxy.config.template.json index 82af6dc..5a76557 100644 --- a/mbproxy/install/mbproxy.config.template.json +++ b/mbproxy/install/mbproxy.config.template.json @@ -99,7 +99,34 @@ // Max time (ms) to wait for in-flight PDUs to complete during graceful shutdown // (sc.exe stop / Windows Service stop signal). After this deadline the coordinator // cancels remaining work and proceeds. Keep at or below the SCM wait-hint (30 s). - "GracefulShutdownTimeoutMs": 10000 + "GracefulShutdownTimeoutMs": 10000, + + // ── Keepalive / connection monitoring ─────────────────────────────────── + // The DL205/DL260 ECOM does not emit TCP keepalives, so an idle backend + // socket can be silently dropped by a middlebox (switch, firewall, NAT) + // after 2-5 minutes. This section enables OS-level SO_KEEPALIVE on both + // backend and upstream sockets, and drives a periodic Modbus FC03 heartbeat + // on each idle backend socket so a dead path is detected before a real + // client request hits it. See docs/Architecture/Keepalive.md. + "Keepalive": { + // Master switch. false → no SO_KEEPALIVE and no heartbeat; the proxy + // behaves exactly as a pre-keepalive build. + "Enabled": true, + + // SO_KEEPALIVE: idle time (ms) before the OS sends its first probe. + "TcpIdleTimeMs": 30000, + // SO_KEEPALIVE: interval (ms) between probes once the idle time elapses. + "TcpProbeIntervalMs": 5000, + // SO_KEEPALIVE: unanswered probes before the OS declares the socket dead. + "TcpProbeCount": 4, + + // Backend heartbeat: after this much backend idle (ms) the proxy issues a + // synthetic FC03 qty=1 read to keep the path warm and prove the ECOM is + // still answering Modbus. Must be greater than BackendRequestTimeoutMs. + "BackendHeartbeatIdleMs": 30000, + // FC03 PDU address the heartbeat reads. 0 = V0, valid on DL205/DL260. + "BackendHeartbeatProbeAddress": 0 + } }, // ── Resilience policies ───────────────────────────────────────────────────────────── diff --git a/mbproxy/src/Mbproxy/Admin/StatusDto.cs b/mbproxy/src/Mbproxy/Admin/StatusDto.cs index aeef78b..c8551bf 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusDto.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusDto.cs @@ -103,7 +103,13 @@ public sealed record PlcBackendStatus( long CacheMissCount, long CacheInvalidations, long CacheEntryCount, - long CacheBytes); + long CacheBytes, + /// Backend keepalive heartbeat probes issued on idle backend sockets. + long BackendHeartbeatsSent, + /// Keepalive heartbeat probes that timed out (backend not answering). + long BackendHeartbeatsFailed, + /// Backend teardowns triggered by a failed keepalive heartbeat. + long BackendIdleDisconnects); /// Modbus exception counts by code. public sealed record ExceptionCounts( diff --git a/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs b/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs index 2ece74c..1598be9 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs @@ -88,6 +88,9 @@ internal static class StatusHtmlRenderer // an em-dash when no cache-eligible reads have occurred. Page-weight budget // assertion stays under 50 KB for the 54-PLC fleet. sb.Append("Cache"); + // Keepalive column — heartbeats sent, with failure / idle-disconnect counts + // shown only when non-zero. + sb.Append("Keepalive"); sb.Append(""); foreach (var plc in status.Plcs) @@ -185,6 +188,24 @@ internal static class StatusHtmlRenderer sb.Append(pct).Append("% (").Append(cacheHit).Append(')'); } sb.Append(""); + // Keepalive cell — heartbeats sent; failures + idle-disconnects appended + // only when non-zero to keep the cell narrow. + long hbSent = plc.Backend.BackendHeartbeatsSent; + long hbFailed = plc.Backend.BackendHeartbeatsFailed; + long hbIdle = plc.Backend.BackendIdleDisconnects; + sb.Append(""); + if (hbSent == 0 && hbFailed == 0 && hbIdle == 0) + { + sb.Append("—"); + } + else + { + sb.Append(hbSent); + if (hbFailed > 0 || hbIdle > 0) + sb.Append(" (fail ").Append(hbFailed) + .Append(", idle-disc ").Append(hbIdle).Append(')'); + } + sb.Append(""); sb.Append(""); } diff --git a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs index 3e9857f..0027920 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs @@ -108,7 +108,10 @@ internal sealed class StatusSnapshotBuilder CacheInvalidations: 0, CacheEntryCount: 0, CacheBytes: 0, - ResponseDropForFullUpstream: 0); + ResponseDropForFullUpstream: 0, + BackendHeartbeatsSent: 0, + BackendHeartbeatsFailed: 0, + BackendIdleDisconnects: 0); long connectsSuccess = counters.ConnectsSuccess; long connectsFailed = counters.ConnectsFailed; @@ -152,7 +155,10 @@ internal sealed class StatusSnapshotBuilder CacheMissCount: counters.CacheMissCount, CacheInvalidations: counters.CacheInvalidations, CacheEntryCount: counters.CacheEntryCount, - CacheBytes: counters.CacheBytes), + CacheBytes: counters.CacheBytes, + BackendHeartbeatsSent: counters.BackendHeartbeatsSent, + BackendHeartbeatsFailed: counters.BackendHeartbeatsFailed, + BackendIdleDisconnects: counters.BackendIdleDisconnects), Bytes: new PlcBytesStatus( UpstreamIn: counters.BytesUpstreamIn, UpstreamOut: counters.BytesUpstreamOut))); diff --git a/mbproxy/src/Mbproxy/Configuration/ConfigReconciler.cs b/mbproxy/src/Mbproxy/Configuration/ConfigReconciler.cs index 119b460..f124c0e 100644 --- a/mbproxy/src/Mbproxy/Configuration/ConfigReconciler.cs +++ b/mbproxy/src/Mbproxy/Configuration/ConfigReconciler.cs @@ -61,6 +61,10 @@ internal sealed partial class ConfigReconciler : IDisposable // and a hot-reload of `Enabled = false` would not propagate to them. private Func? _coalescingAccessor; + // Live accessor for KeepaliveOptions, threaded through Attach so PLCs added or + // restarted via hot-reload honour the current `Connection.Keepalive` values. + private Func? _keepaliveAccessor; + // ── Debounce + serialisation machinery ─────────────────────────────────────────────── // Channel carries Unit to signal "something changed — please check". @@ -121,11 +125,13 @@ internal sealed partial class ConfigReconciler : IDisposable public void Attach( ConcurrentDictionary supervisors, MbproxyOptions initialOptions, - Func? coalescingAccessor = null) + Func? coalescingAccessor = null, + Func? keepaliveAccessor = null) { _supervisors = supervisors; _currentOptions = initialOptions; _coalescingAccessor = coalescingAccessor; + _keepaliveAccessor = keepaliveAccessor; } // ── ApplyAsync (exposed for tests) ─────────────────────────────────────────────────── @@ -315,7 +321,8 @@ internal sealed partial class ConfigReconciler : IDisposable recoveryPipeline, _loggerFactory.CreateLogger(), backendPipeline, - _coalescingAccessor); + _coalescingAccessor, + _keepaliveAccessor); _supervisors[name] = newSupervisor; await newSupervisor.StartAsync(ct).ConfigureAwait(false); @@ -401,7 +408,8 @@ internal sealed partial class ConfigReconciler : IDisposable recoveryPipeline, _loggerFactory.CreateLogger(), backendPipeline, - _coalescingAccessor); + _coalescingAccessor, + _keepaliveAccessor); _supervisors[plcNew.Name] = newSupervisor; await newSupervisor.StartAsync(ct).ConfigureAwait(false); diff --git a/mbproxy/src/Mbproxy/Configuration/ReloadValidator.cs b/mbproxy/src/Mbproxy/Configuration/ReloadValidator.cs index 95b1eca..50a83ec 100644 --- a/mbproxy/src/Mbproxy/Configuration/ReloadValidator.cs +++ b/mbproxy/src/Mbproxy/Configuration/ReloadValidator.cs @@ -141,6 +141,27 @@ internal static class ReloadValidator errs.Add( $"Connection.GracefulShutdownTimeoutMs must be > 0; got {next.Connection.GracefulShutdownTimeoutMs}."); + // ── 6. Keepalive section ────────────────────────────────────────────── + // Schema bounds are also checked in MbproxyOptionsValidator; re-checking here keeps + // the hot-reload gate self-contained. The cross-field rule (heartbeat interval must + // sit above the request timeout, or it would fire continuously) lives only here. + var ka = next.Connection.Keepalive; + if (ka.TcpIdleTimeMs <= 0) + errs.Add($"Connection.Keepalive.TcpIdleTimeMs must be > 0; got {ka.TcpIdleTimeMs}."); + if (ka.TcpProbeIntervalMs <= 0) + errs.Add($"Connection.Keepalive.TcpProbeIntervalMs must be > 0; got {ka.TcpProbeIntervalMs}."); + if (ka.TcpProbeCount <= 0) + errs.Add($"Connection.Keepalive.TcpProbeCount must be > 0; got {ka.TcpProbeCount}."); + if (ka.BackendHeartbeatProbeAddress is < 0 or > 65535) + errs.Add( + $"Connection.Keepalive.BackendHeartbeatProbeAddress must be in [0, 65535]; " + + $"got {ka.BackendHeartbeatProbeAddress}."); + if (ka.BackendHeartbeatIdleMs <= next.Connection.BackendRequestTimeoutMs) + errs.Add( + $"Connection.Keepalive.BackendHeartbeatIdleMs ({ka.BackendHeartbeatIdleMs}) must be greater " + + $"than Connection.BackendRequestTimeoutMs ({next.Connection.BackendRequestTimeoutMs}); " + + "a heartbeat interval at or below the request timeout would fire continuously."); + errors = errs; return errs.Count == 0; } diff --git a/mbproxy/src/Mbproxy/Options/ConnectionOptions.cs b/mbproxy/src/Mbproxy/Options/ConnectionOptions.cs index ea83d8b..64f2faf 100644 --- a/mbproxy/src/Mbproxy/Options/ConnectionOptions.cs +++ b/mbproxy/src/Mbproxy/Options/ConnectionOptions.cs @@ -9,4 +9,9 @@ public sealed class ConnectionOptions /// graceful shutdown before cancelling them. Default: 10000 (10 s). /// public int GracefulShutdownTimeoutMs { get; init; } = 10000; + + /// + /// TCP keepalive and backend-heartbeat connection-monitoring settings. Enabled by default. + /// + public KeepaliveOptions Keepalive { get; init; } = new(); } diff --git a/mbproxy/src/Mbproxy/Options/KeepaliveOptions.cs b/mbproxy/src/Mbproxy/Options/KeepaliveOptions.cs new file mode 100644 index 0000000..fbcf46e --- /dev/null +++ b/mbproxy/src/Mbproxy/Options/KeepaliveOptions.cs @@ -0,0 +1,52 @@ +namespace Mbproxy.Options; + +/// +/// TCP keepalive and application-level connection-monitoring settings. +/// +/// The DL205/DL260 ECOM does not emit TCP keepalives, so an idle backend socket can be +/// silently dropped by a middlebox (switch, firewall, NAT) after 2-5 minutes. These knobs +/// (a) enable OS-level SO_KEEPALIVE on both backend and accepted upstream sockets and +/// (b) drive a periodic Modbus FC03 heartbeat on each idle backend socket so the path stays +/// warm and a dead ECOM is detected before a real client request hits it. +/// +public sealed class KeepaliveOptions +{ + /// + /// Master switch. When false, neither SO_KEEPALIVE nor the backend heartbeat + /// is applied and the proxy behaves exactly as a pre-keepalive build. Default: true. + /// + public bool Enabled { get; init; } = true; + + /// + /// SO_KEEPALIVE idle time in milliseconds — how long a socket may be idle before the + /// OS sends its first keepalive probe. Applied to backend and accepted upstream sockets. + /// Default: 30000 (30 s). + /// + public int TcpIdleTimeMs { get; init; } = 30000; + + /// + /// SO_KEEPALIVE interval in milliseconds between keepalive probes once the idle time + /// has elapsed. Default: 5000 (5 s). + /// + public int TcpProbeIntervalMs { get; init; } = 5000; + + /// + /// SO_KEEPALIVE probe count — unanswered probes before the OS declares the socket + /// dead. Default: 4. + /// + public int TcpProbeCount { get; init; } = 4; + + /// + /// Backend application heartbeat: after this many milliseconds with no backend traffic, the + /// multiplexer issues a synthetic FC03 qty=1 read to keep the socket warm and prove the ECOM + /// is still answering Modbus. Must be greater than . + /// Default: 30000 (30 s). + /// + public int BackendHeartbeatIdleMs { get; init; } = 30000; + + /// + /// Modbus PDU address read by the backend heartbeat FC03 probe. Address 0 (V0) is valid on + /// DL205/DL260 in factory absolute mode. Default: 0. + /// + public int BackendHeartbeatProbeAddress { get; init; } = 0; +} diff --git a/mbproxy/src/Mbproxy/Options/MbproxyOptions.cs b/mbproxy/src/Mbproxy/Options/MbproxyOptions.cs index 58c22a5..92aab38 100644 --- a/mbproxy/src/Mbproxy/Options/MbproxyOptions.cs +++ b/mbproxy/src/Mbproxy/Options/MbproxyOptions.cs @@ -106,6 +106,22 @@ public sealed class MbproxyOptionsValidator : IValidateOptions errors.Add( $"Connection.GracefulShutdownTimeoutMs must be > 0; got {options.Connection.GracefulShutdownTimeoutMs}."); + // Keepalive section ranges. Cross-field rules (heartbeat interval vs request + // timeout) are enforced in ReloadValidator. + var ka = options.Connection.Keepalive; + if (ka.TcpIdleTimeMs <= 0) + errors.Add($"Connection.Keepalive.TcpIdleTimeMs must be > 0; got {ka.TcpIdleTimeMs}."); + if (ka.TcpProbeIntervalMs <= 0) + errors.Add($"Connection.Keepalive.TcpProbeIntervalMs must be > 0; got {ka.TcpProbeIntervalMs}."); + if (ka.TcpProbeCount <= 0) + errors.Add($"Connection.Keepalive.TcpProbeCount must be > 0; got {ka.TcpProbeCount}."); + if (ka.BackendHeartbeatIdleMs <= 0) + errors.Add($"Connection.Keepalive.BackendHeartbeatIdleMs must be > 0; got {ka.BackendHeartbeatIdleMs}."); + if (ka.BackendHeartbeatProbeAddress is < 0 or > 65535) + errors.Add( + $"Connection.Keepalive.BackendHeartbeatProbeAddress must be in [0, 65535]; " + + $"got {ka.BackendHeartbeatProbeAddress}."); + return errors.Count > 0 ? ValidateOptionsResult.Fail(errors) : ValidateOptionsResult.Success; diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightRequest.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightRequest.cs index 671b346..3108c6e 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightRequest.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightRequest.cs @@ -28,6 +28,12 @@ internal sealed record InterestedParty(UpstreamPipe Pipe, ushort OriginalTxId); /// read coalescing uses to fan out a single PLC response to multiple upstream clients. /// Reviewer note: do not simplify back to a single UpstreamPipe field. /// +/// +/// true for the synthetic FC03 keepalive probe issued by the backend heartbeat +/// loop. Heartbeat entries carry no : the backend reader +/// drops the response (no fan-out, no rewriter, no cache) and the timeout watchdog tears +/// the backend down instead of dispatching a 0x0B exception. Defaults to false. +/// internal sealed record InFlightRequest( byte UnitId, byte Fc, @@ -35,4 +41,5 @@ internal sealed record InFlightRequest( ushort Qty, IReadOnlyList InterestedParties, DateTimeOffset SentAtUtc, - int ResolvedCacheTtlMs = 0); + int ResolvedCacheTtlMs = 0, + bool IsHeartbeat = false); diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/KeepaliveLogEvents.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/KeepaliveLogEvents.cs new file mode 100644 index 0000000..f7628ed --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/KeepaliveLogEvents.cs @@ -0,0 +1,54 @@ +namespace Mbproxy.Proxy.Multiplexing; + +/// +/// Source-generated definitions for the backend keepalive +/// heartbeat. Event names are stable — do not rename without updating +/// docs/Reference/LogEvents.md's event-name table. +/// +internal static partial class KeepaliveLogEvents +{ + /// + /// Emitted each time the heartbeat loop issues a synthetic FC03 probe on an idle + /// backend socket. Debug level — one per BackendHeartbeatIdleMs per idle PLC. + /// + [LoggerMessage( + EventId = 150, + EventName = "mbproxy.keepalive.heartbeat.sent", + Level = LogLevel.Debug, + Message = "Keepalive heartbeat sent: Plc={Plc} ProxyTxId={ProxyTxId} Address={Address}")] + public static partial void HeartbeatSent( + ILogger logger, + string plc, + ushort proxyTxId, + ushort address); + + /// + /// Emitted when a keepalive heartbeat probe is not answered within + /// BackendRequestTimeoutMs. The backend is connected-but-not-answering; the + /// multiplexer tears it down (see ). + /// + [LoggerMessage( + EventId = 151, + EventName = "mbproxy.keepalive.heartbeat.timeout", + Level = LogLevel.Warning, + Message = "Keepalive heartbeat timed out: Plc={Plc} ProxyTxId={ProxyTxId} ElapsedMs={ElapsedMs}")] + public static partial void HeartbeatTimeout( + ILogger logger, + string plc, + ushort proxyTxId, + long elapsedMs); + + /// + /// Emitted when a failed keepalive heartbeat triggers a proactive backend teardown. + /// Every attached upstream pipe is cascaded; clients reconnect on their next request. + /// + [LoggerMessage( + EventId = 152, + EventName = "mbproxy.keepalive.backend.idle_disconnect", + Level = LogLevel.Information, + Message = "Backend torn down by keepalive: Plc={Plc} HeartbeatElapsedMs={ElapsedMs}")] + public static partial void BackendIdleDisconnect( + ILogger logger, + string plc, + long elapsedMs); +} diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs index a387e12..f605e43 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs @@ -61,6 +61,12 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // `() => optionsMonitor.CurrentValue.Resilience.ReadCoalescing`. Tests default to a // fresh `ReadCoalescingOptions()` (Enabled = true, MaxParties = 32). private readonly Func _coalescingOptions; + // Live keepalive config accessor. Read at backend-connect time (TCP SO_KEEPALIVE) and + // on each heartbeat-loop tick (idle threshold + probe address) so a hot-reload of + // `Connection.Keepalive` propagates without a listener restart. Production wires this + // to `() => optionsMonitor.CurrentValue.Connection.Keepalive`; the fallback reads the + // construction-time `ConnectionOptions` snapshot. + private readonly Func _keepaliveOptions; private readonly TxIdAllocator _allocator = new(); private readonly CorrelationMap _correlation = new(); @@ -86,6 +92,19 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi private CancellationTokenSource? _backendCts; private Task? _backendWriterTask; private Task? _backendReaderTask; + private Task? _backendHeartbeatTask; + + // UTC ticks of the last backend socket activity (a send OR a received frame). Updated + // by the writer and reader tasks; read by the heartbeat loop to decide whether the + // socket has been idle long enough to warrant a probe. Interlocked for cross-task + // coherence. + private long _lastBackendActivityTicks; + + // Unit ID of the most recent upstream request. The synthetic heartbeat reuses it so + // the probe targets the same Modbus unit the real clients successfully talk to. + // Defaults to 0 until the first upstream frame is seen; by the time a heartbeat can + // fire the backend socket exists, which means at least one upstream frame arrived. + private int _lastSeenUnitId; private readonly CancellationTokenSource _disposeCts = new(); // Volatile so the disposing thread's write is observed by every hot-path reader @@ -102,7 +121,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi PerPlcContext perPlcContext, ILogger logger, ResiliencePipeline? backendConnectPipeline = null, - Func? coalescingOptions = null) + Func? coalescingOptions = null, + Func? keepaliveOptions = null) { _plc = plc; _connectionOptions = connectionOptions; @@ -111,6 +131,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi _logger = logger; _backendConnectPipeline = backendConnectPipeline; _coalescingOptions = coalescingOptions ?? (static () => new ReadCoalescingOptions()); + _keepaliveOptions = keepaliveOptions ?? (() => _connectionOptions.Keepalive); // Register the per-PLC cache as the live stats source for the snapshot path. // Cache may be null when the per-PLC context has not been wired with one @@ -282,6 +303,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // Build a fresh backend socket and Polly-connect. var backend = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) { NoDelay = true }; + SocketKeepalive.Apply(backend, _keepaliveOptions()); try { @@ -318,8 +340,11 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi { _backendSocket = backend; _backendCts = cts2; + // Seed the idle timer so the heartbeat loop measures idleness from connect. + Interlocked.Exchange(ref _lastBackendActivityTicks, DateTime.UtcNow.Ticks); _backendWriterTask = Task.Run(() => RunBackendWriterAsync(backend, cts2.Token), CancellationToken.None); _backendReaderTask = Task.Run(() => RunBackendReaderAsync(backend, cts2.Token), CancellationToken.None); + _backendHeartbeatTask = Task.Run(() => RunBackendHeartbeatAsync(cts2.Token), CancellationToken.None); } _ctx.Counters.IncrementConnectSuccess(); @@ -381,18 +406,20 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi { Socket? oldSocket; CancellationTokenSource? oldCts; - Task? writer, reader; + Task? writer, reader, heartbeat; lock (_backendLock) { oldSocket = _backendSocket; oldCts = _backendCts; writer = _backendWriterTask; reader = _backendReaderTask; + heartbeat = _backendHeartbeatTask; _backendSocket = null; _backendCts = null; _backendWriterTask = null; _backendReaderTask = null; + _backendHeartbeatTask = null; } if (oldSocket is null && oldCts is null) return; @@ -454,6 +481,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // Best-effort join. try { if (writer is not null) await writer.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } try { if (reader is not null) await reader.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } + try { if (heartbeat is not null) await heartbeat.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ } oldCts?.Dispose(); @@ -489,6 +517,9 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi if (n == 0) throw new SocketException((int)SocketError.ConnectionReset); sent += n; } + + // A send counts as backend activity — it suppresses the idle heartbeat. + Interlocked.Exchange(ref _lastBackendActivityTicks, DateTime.UtcNow.Ticks); } } catch (OperationCanceledException) @@ -542,6 +573,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi if (!await FillAsync(backend, frame, MbapFrame.HeaderSize, pduBodyLen, ct).ConfigureAwait(false)) break; + // A received frame counts as backend activity — it suppresses (and, for a + // heartbeat response, satisfies) the idle heartbeat. + Interlocked.Exchange(ref _lastBackendActivityTicks, DateTime.UtcNow.Ticks); + if (!_correlation.TryRemove(proxyTxId, out var inFlight)) { // No correlation entry — either a stale response after cascade, or @@ -552,6 +587,14 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // Free the allocator slot immediately so it can be reused. _allocator.Release(proxyTxId); + // Keepalive heartbeat response — the probe came back, the backend is alive. + // The activity timestamp was already refreshed above. There is no upstream + // party, no cache eligibility, and no rewriting to do: drop the payload and + // skip the EWMA update so the synthetic probe never pollutes the + // client-facing round-trip metric. + if (inFlight.IsHeartbeat) + continue; + // For FC03/FC04 reads, also clear the coalescing-by-key entry so a // brand-new identical request issued AFTER this response is treated as a // miss (opens a fresh round-trip). The TryRemove is best-effort: a @@ -727,6 +770,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi out ushort originalTxId, out _, out _, out byte unitId)) return; + // Remember the unit ID so the backend keepalive heartbeat probes the same Modbus + // unit the real clients are known to reach successfully. + Volatile.Write(ref _lastSeenUnitId, unitId); + // Count inbound bytes from the upstream client. Surfaces in bytes.upstreamIn on // the status page. Counted ONCE per parsed frame regardless of subsequent // routing (cache hit, coalesce, backend round-trip, exception). @@ -1062,6 +1109,23 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi _allocator.Release(proxyTxId); + // Keepalive heartbeat that never came back. The backend is no longer + // answering Modbus even though the socket may still look connected — + // tear it down proactively (cascading every attached pipe) so the + // failure is found here, during idle, instead of corrupting the next + // real client request. There is no upstream party to send a 0x0B to. + if (req.IsHeartbeat) + { + long hbElapsedMs = (long)(DateTimeOffset.UtcNow - req.SentAtUtc).TotalMilliseconds; + KeepaliveLogEvents.HeartbeatTimeout(_logger, _plc.Name, proxyTxId, hbElapsedMs); + _ctx.Counters.IncrementBackendHeartbeatFailed(); + _ctx.Counters.IncrementBackendIdleDisconnect(); + KeepaliveLogEvents.BackendIdleDisconnect(_logger, _plc.Name, hbElapsedMs); + if (!_disposeCts.IsCancellationRequested) + _ = TearDownBackendAsync("keepalive heartbeat timeout", cascadeUpstreams: true); + continue; + } + // Also clear the coalescing-by-key entry. A late attach that raced // in just before the watchdog claim will still receive the 0x0B // exception via this entry's InterestedParties list (List @@ -1110,6 +1174,124 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi } } + // ── Backend keepalive heartbeat ─────────────────────────────────────────── + + /// + /// Backend keepalive heartbeat loop. Started alongside the writer/reader on each + /// successful connect and cancelled with them on teardown. While the backend socket + /// has been idle (no send or receive) for longer than + /// , it issues a synthetic FC03 + /// qty=1 read so the path stays warm against middlebox idle-drop and a backend that is + /// connected-but-not-answering is detected here rather than on the next client request. + /// + /// The probe response is consumed by (which + /// recognises and drops it); a probe that + /// never returns is timed out by , which + /// tears the backend down. The heartbeat keeps an existing backend warm — it + /// never resurrects a dead one (reconnect stays gated on the next upstream frame). + /// + private async Task RunBackendHeartbeatAsync(CancellationToken ct) + { + try + { + while (!ct.IsCancellationRequested) + { + var ka = _keepaliveOptions(); + int idleMs = Math.Max(1000, ka.BackendHeartbeatIdleMs); + // Tick at a quarter of the idle window so a freshly-elapsed idle period is + // noticed promptly, floored at 500 ms so the loop never busy-wakes. + int tickMs = Math.Max(500, idleMs / 4); + await Task.Delay(tickMs, ct).ConfigureAwait(false); + + if (!ka.Enabled) + continue; + + long lastTicks = Interlocked.Read(ref _lastBackendActivityTicks); + double idleElapsedMs = + (DateTime.UtcNow - new DateTime(lastTicks, DateTimeKind.Utc)).TotalMilliseconds; + if (idleElapsedMs < idleMs) + continue; + + SendHeartbeat(ka); + } + } + catch (OperationCanceledException) + { + // Normal teardown. + } + catch (Exception ex) + { + _logger.LogError(ex, "Backend heartbeat loop faulted: Plc={Plc}", _plc.Name); + } + } + + /// + /// Builds and enqueues one synthetic FC03 qty=1 heartbeat request onto the backend + /// outbound channel. The correlation entry is flagged + /// so the reader and watchdog treat it specially; it carries no interested parties and + /// bypasses the coalescing and cache paths entirely. + /// + private void SendHeartbeat(KeepaliveOptions ka) + { + // A saturated TxId space means the backend is busy (65,536 requests in flight), + // which is the opposite of idle — skip this tick rather than force a probe. + if (!_allocator.TryAllocate(out ushort proxyTxId)) + return; + + byte unitId = (byte)Volatile.Read(ref _lastSeenUnitId); + ushort address = (ushort)ka.BackendHeartbeatProbeAddress; + + var inFlight = new InFlightRequest( + UnitId: unitId, + Fc: 0x03, + StartAddress: address, + Qty: 1, + InterestedParties: Array.Empty(), + SentAtUtc: DateTimeOffset.UtcNow, + ResolvedCacheTtlMs: 0, + IsHeartbeat: true); + + if (!_correlation.TryAdd(proxyTxId, inFlight)) + { + _allocator.Release(proxyTxId); + return; + } + + byte[] frame = BuildHeartbeatFrame(proxyTxId, unitId, address); + + // Non-blocking enqueue: if the channel is full the backend is not idle (a race), and + // if it is completed the backend is tearing down — either way, undo and skip. + if (!_outboundChannel.Writer.TryWrite(frame)) + { + if (_correlation.TryRemove(proxyTxId, out _)) + _allocator.Release(proxyTxId); + return; + } + + _ctx.Counters.IncrementBackendHeartbeatSent(); + KeepaliveLogEvents.HeartbeatSent(_logger, _plc.Name, proxyTxId, address); + } + + /// + /// Builds a 12-byte MBAP-framed FC03 (Read Holding Registers) request reading one + /// register at — the keepalive heartbeat probe PDU. + /// + private static byte[] BuildHeartbeatFrame(ushort proxyTxId, byte unitId, ushort address) + { + // PDU = [fc=03][addrHi][addrLo][qtyHi][qtyLo]. MBAP length = UnitId(1) + PDU(5) = 6. + var frame = new byte[MbapFrame.HeaderSize + 5]; + frame[0] = (byte)(proxyTxId >> 8); + frame[1] = (byte)(proxyTxId & 0xFF); + frame[2] = 0; frame[3] = 0; // ProtocolId + frame[4] = 0; frame[5] = 6; // Length + frame[6] = unitId; + frame[7] = 0x03; // FC03 Read Holding Registers + frame[8] = (byte)(address >> 8); + frame[9] = (byte)(address & 0xFF); + frame[10] = 0; frame[11] = 1; // Qty = 1 + return frame; + } + // ── Helpers ─────────────────────────────────────────────────────────────── /// diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs index 878027a..6d8a014 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/UpstreamPipe.cs @@ -1,6 +1,7 @@ using System.Net; using System.Net.Sockets; using System.Threading.Channels; +using Mbproxy.Options; namespace Mbproxy.Proxy.Multiplexing; @@ -77,10 +78,15 @@ internal sealed partial class UpstreamPipe : IAsyncDisposable /// public bool IsAlive => !_disposed && !_cts.IsCancellationRequested; - public UpstreamPipe(Socket upstream, string plcName, ILogger logger) + public UpstreamPipe(Socket upstream, string plcName, ILogger logger, KeepaliveOptions? keepalive = null) { _upstream = upstream; _upstream.NoDelay = true; + // Enable OS TCP keepalive on the accepted client socket so a half-open/dead + // client (gone without a TCP FIN) faults the read loop and is reaped, instead of + // leaking a pipe + correlation slots until the proxy next tries to write to it. + if (keepalive is not null) + SocketKeepalive.Apply(_upstream, keepalive); RemoteEp = upstream.RemoteEndPoint as IPEndPoint; _plcName = plcName; _logger = logger; diff --git a/mbproxy/src/Mbproxy/Proxy/PlcListener.cs b/mbproxy/src/Mbproxy/Proxy/PlcListener.cs index d67dae2..5b97a72 100644 --- a/mbproxy/src/Mbproxy/Proxy/PlcListener.cs +++ b/mbproxy/src/Mbproxy/Proxy/PlcListener.cs @@ -30,6 +30,10 @@ internal sealed partial class PlcListener : IAsyncDisposable private readonly PerPlcContext? _perPlcContext; private readonly ResiliencePipeline? _backendConnectPipeline; private readonly Func? _coalescingOptions; + // Live keepalive accessor (TCP SO_KEEPALIVE on accepted upstream sockets + the backend + // heartbeat). Non-null after construction — falls back to the construction-time + // ConnectionOptions snapshot when no live accessor is supplied. + private readonly Func _keepaliveOptions; private TcpListener? _listener; private PlcMultiplexer? _multiplexer; @@ -62,7 +66,8 @@ internal sealed partial class PlcListener : IAsyncDisposable ILogger pipeLogger, PerPlcContext? perPlcContext = null, ResiliencePipeline? backendConnectPipeline = null, - Func? coalescingOptions = null) + Func? coalescingOptions = null, + Func? keepaliveOptions = null) { _plc = plc; _connectionOptions = connectionOptions; @@ -73,6 +78,7 @@ internal sealed partial class PlcListener : IAsyncDisposable _perPlcContext = perPlcContext; _backendConnectPipeline = backendConnectPipeline; _coalescingOptions = coalescingOptions; + _keepaliveOptions = keepaliveOptions ?? (() => _connectionOptions.Keepalive); } /// @@ -103,7 +109,8 @@ internal sealed partial class PlcListener : IAsyncDisposable ctx, _multiplexerLogger, _backendConnectPipeline, - _coalescingOptions); + _coalescingOptions, + _keepaliveOptions); } /// @@ -125,7 +132,7 @@ internal sealed partial class PlcListener : IAsyncDisposable { Socket upstream = await _listener.AcceptSocketAsync(ct).ConfigureAwait(false); - var pipe = new UpstreamPipe(upstream, _plc.Name, _pipeLogger); + var pipe = new UpstreamPipe(upstream, _plc.Name, _pipeLogger, _keepaliveOptions()); var pipeTask = Task.Run(async () => { try diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs index f32953d..543eebe 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs @@ -133,7 +133,24 @@ public sealed record CounterSnapshot( /// socket fast enough to keep up with the backend; the wedged client loses its own /// responses but its peers on the same PLC continue to receive theirs. /// - long ResponseDropForFullUpstream); + long ResponseDropForFullUpstream, + /// + /// Cumulative count of backend keepalive heartbeat probes issued (synthetic FC03 + /// qty=1 reads sent on an idle backend socket). + /// + long BackendHeartbeatsSent, + /// + /// Cumulative count of backend keepalive heartbeat probes that were not answered + /// within BackendRequestTimeoutMs. Each failure triggers a proactive backend + /// teardown (see ). + /// + long BackendHeartbeatsFailed, + /// + /// Cumulative count of backend teardowns triggered by a failed keepalive heartbeat. + /// Distinct from (which counts cascaded + /// pipes); this counts the disconnect events attributed to keepalive. + /// + long BackendIdleDisconnects); /// /// Thread-safe per-PLC counters backed by longs. @@ -184,6 +201,11 @@ internal sealed class ProxyCounters // and account. private long _responseDropForFullUpstream; + // Backend keepalive heartbeat counters. + private long _backendHeartbeatsSent; + private long _backendHeartbeatsFailed; + private long _backendIdleDisconnects; + // Live cache state pulled from a per-PLC ResponseCache on each snapshot. The // multiplexer registers a single provider via SetCacheStatsProvider so the status // page sees current entry-count / bytes without a separate poll. @@ -315,6 +337,18 @@ internal sealed class ProxyCounters public void IncrementResponseDropForFullUpstream() => Interlocked.Increment(ref _responseDropForFullUpstream); + /// Records one backend keepalive heartbeat probe sent. + public void IncrementBackendHeartbeatSent() + => Interlocked.Increment(ref _backendHeartbeatsSent); + + /// Records one backend keepalive heartbeat probe that timed out. + public void IncrementBackendHeartbeatFailed() + => Interlocked.Increment(ref _backendHeartbeatsFailed); + + /// Records one backend teardown triggered by a failed keepalive heartbeat. + public void IncrementBackendIdleDisconnect() + => Interlocked.Increment(ref _backendIdleDisconnects); + /// /// Wires the per-PLC as the live stats source for /// the snapshot path. Pass null to detach during disposal. @@ -445,7 +479,10 @@ internal sealed class ProxyCounters CacheInvalidations: Interlocked.Read(ref _cacheInvalidations), CacheEntryCount: cacheEntries, CacheBytes: cacheBytes, - ResponseDropForFullUpstream: Interlocked.Read(ref _responseDropForFullUpstream)); + ResponseDropForFullUpstream: Interlocked.Read(ref _responseDropForFullUpstream), + BackendHeartbeatsSent: Interlocked.Read(ref _backendHeartbeatsSent), + BackendHeartbeatsFailed: Interlocked.Read(ref _backendHeartbeatsFailed), + BackendIdleDisconnects: Interlocked.Read(ref _backendIdleDisconnects)); } } diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs index ec7d735..138e737 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs @@ -150,6 +150,11 @@ internal sealed partial class ProxyWorker : BackgroundService Func coalescingAccessor = () => _options.CurrentValue.Resilience.ReadCoalescing; + // Live accessor for KeepaliveOptions so a hot-reload of `Connection.Keepalive` + // propagates to the backend heartbeat loop and to upstream-socket keepalive. + Func keepaliveAccessor = + () => _options.CurrentValue.Connection.Keepalive; + var supervisor = new PlcListenerSupervisor( plc, opts.Connection, @@ -161,7 +166,8 @@ internal sealed partial class ProxyWorker : BackgroundService recoveryPipeline, _loggerFactory.CreateLogger(), backendPipeline, - coalescingAccessor); + coalescingAccessor, + keepaliveAccessor); _supervisors[plc.Name] = supervisor; } @@ -175,7 +181,9 @@ internal sealed partial class ProxyWorker : BackgroundService // (add/restart paths) honour hot-reloaded ReadCoalescing values. Func reconcilerCoalescingAccessor = () => _options.CurrentValue.Resilience.ReadCoalescing; - _reconciler.Attach(_supervisors, opts, reconcilerCoalescingAccessor); + Func reconcilerKeepaliveAccessor = + () => _options.CurrentValue.Connection.Keepalive; + _reconciler.Attach(_supervisors, opts, reconcilerCoalescingAccessor, reconcilerKeepaliveAccessor); if (_supervisors.Count == 0) { diff --git a/mbproxy/src/Mbproxy/Proxy/SocketKeepalive.cs b/mbproxy/src/Mbproxy/Proxy/SocketKeepalive.cs new file mode 100644 index 0000000..59cd931 --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/SocketKeepalive.cs @@ -0,0 +1,49 @@ +using System.Net.Sockets; +using Mbproxy.Options; + +namespace Mbproxy.Proxy; + +/// +/// Applies OS-level TCP keepalive (SO_KEEPALIVE plus the idle-time / probe-interval / +/// probe-count tunables) to a socket. Used on both the backend socket (proxy → PLC) and +/// accepted upstream sockets (client → proxy) so the OS detects a dead peer on an +/// otherwise-idle connection — the DL205/DL260 ECOM never emits keepalives of its own. +/// +internal static class SocketKeepalive +{ + /// + /// Enables TCP keepalive on from . + /// A no-op when is false. + /// + /// Failures are swallowed: keepalive is a best-effort belt-and-suspenders measure + /// (the backend application heartbeat is the load-bearing mechanism) and must never + /// abort a connection. The three TCP tunables are also not honoured on every platform; + /// a refusal there is benign. + /// + public static void Apply(Socket socket, KeepaliveOptions options) + { + if (!options.Enabled) return; + + try + { + socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + + // SocketOptionName.TcpKeepAliveTime / TcpKeepAliveInterval are specified in + // SECONDS; round the configured milliseconds up to at least one second. + int idleSec = Math.Max(1, (options.TcpIdleTimeMs + 999) / 1000); + int intervalSec = Math.Max(1, (options.TcpProbeIntervalMs + 999) / 1000); + + socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, idleSec); + socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, intervalSec); + socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, options.TcpProbeCount); + } + catch (SocketException) + { + // Platform refused a tunable — keepalive stays best-effort. + } + catch (ObjectDisposedException) + { + // Socket closed concurrently — nothing to do. + } + } +} diff --git a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs index 725bba0..4584d69 100644 --- a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs +++ b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs @@ -38,6 +38,7 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable private readonly ILogger _logger; private readonly ResiliencePipeline? _backendConnectPipeline; private readonly Func? _coalescingOptions; + private readonly Func? _keepaliveOptions; // ── Mutable state ──────────────────────────────────────────────────────────────────── @@ -94,7 +95,8 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable ResiliencePipeline recoveryPipeline, ILogger logger, ResiliencePipeline? backendConnectPipeline = null, - Func? coalescingOptions = null) + Func? coalescingOptions = null, + Func? keepaliveOptions = null) { _plc = plc; _connectionOptions = connectionOptions; @@ -108,6 +110,7 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable _logger = logger; _backendConnectPipeline = backendConnectPipeline; _coalescingOptions = coalescingOptions; + _keepaliveOptions = keepaliveOptions; } /// @@ -325,7 +328,8 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable _pipeLogger, _currentContext, _backendConnectPipeline, - _coalescingOptions); + _coalescingOptions, + _keepaliveOptions); // Expose the current listener for status-page pair enumeration. _currentListener = listener; diff --git a/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs b/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs index 0afcdc1..2ec719e 100644 --- a/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs @@ -53,7 +53,9 @@ public sealed class StatusHtmlRendererTests CoalescedHitCount: 0, CoalescedMissCount: 0, CoalescedResponseToDeadUpstream: 0, CacheHitCount: 0, CacheMissCount: 0, - CacheInvalidations: 0, CacheEntryCount: 0, CacheBytes: 0), + CacheInvalidations: 0, CacheEntryCount: 0, CacheBytes: 0, + BackendHeartbeatsSent: 0, BackendHeartbeatsFailed: 0, + BackendIdleDisconnects: 0), Bytes: new PlcBytesStatus(1024, 2048)); } diff --git a/mbproxy/tests/Mbproxy.Tests/Configuration/ReloadValidatorTests.cs b/mbproxy/tests/Mbproxy.Tests/Configuration/ReloadValidatorTests.cs index 2eabd6a..0789bc4 100644 --- a/mbproxy/tests/Mbproxy.Tests/Configuration/ReloadValidatorTests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Configuration/ReloadValidatorTests.cs @@ -264,4 +264,74 @@ public sealed class ReloadValidatorTests Assert.False(valid); Assert.Contains(errors, e => e.Contains("GracefulShutdownTimeoutMs")); } + + // ── Keepalive section ───────────────────────────────────────────────────── + + [Fact] + public void Validate_DefaultKeepalive_Passes() + { + // Default ConnectionOptions → default KeepaliveOptions (idle 30 s, request 3 s). + var opts = MakeOptions([MakePlc("PLC-A", 5020)]); + + bool valid = ReloadValidator.Validate(opts, out _); + + Assert.True(valid); + } + + [Fact] + public void Validate_NonPositiveTcpProbeCount_Fails() + { + var opts = new MbproxyOptions + { + Plcs = [MakePlc("PLC-A", 5020)], + Connection = new ConnectionOptions + { + Keepalive = new KeepaliveOptions { TcpProbeCount = 0 }, + }, + }; + + bool valid = ReloadValidator.Validate(opts, out var errors); + + Assert.False(valid); + Assert.Contains(errors, e => e.Contains("TcpProbeCount")); + } + + [Fact] + public void Validate_OutOfRangeHeartbeatProbeAddress_Fails() + { + var opts = new MbproxyOptions + { + Plcs = [MakePlc("PLC-A", 5020)], + Connection = new ConnectionOptions + { + Keepalive = new KeepaliveOptions { BackendHeartbeatProbeAddress = 70000 }, + }, + }; + + bool valid = ReloadValidator.Validate(opts, out var errors); + + Assert.False(valid); + Assert.Contains(errors, e => e.Contains("BackendHeartbeatProbeAddress")); + } + + [Fact] + public void Validate_HeartbeatIdleNotAboveRequestTimeout_Fails() + { + // BackendHeartbeatIdleMs must sit ABOVE BackendRequestTimeoutMs, else a heartbeat + // would be timed out as fast as it could be issued. + var opts = new MbproxyOptions + { + Plcs = [MakePlc("PLC-A", 5020)], + Connection = new ConnectionOptions + { + BackendRequestTimeoutMs = 3000, + Keepalive = new KeepaliveOptions { BackendHeartbeatIdleMs = 3000 }, + }, + }; + + bool valid = ReloadValidator.Validate(opts, out var errors); + + Assert.False(valid); + Assert.Contains(errors, e => e.Contains("BackendHeartbeatIdleMs")); + } } diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/KeepaliveTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/KeepaliveTests.cs new file mode 100644 index 0000000..468e065 --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/KeepaliveTests.cs @@ -0,0 +1,366 @@ +using System.Net; +using System.Net.Sockets; +using Mbproxy.Options; +using Mbproxy.Proxy; +using Mbproxy.Proxy.Multiplexing; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Multiplexing; + +/// +/// Tests for the backend keepalive heartbeat and the helper. +/// The heartbeat tests run the real against a stub backend +/// (real sockets, no simulator) with a deliberately short BackendHeartbeatIdleMs. +/// +[Trait("Category", "Unit")] +public sealed class KeepaliveTests +{ + // ── Helpers ──────────────────────────────────────────────────────────────── + + private static int PickFreePort() + { + var l = new TcpListener(IPAddress.Loopback, 0); + l.Start(); + int port = ((IPEndPoint)l.LocalEndpoint).Port; + l.Stop(); + return port; + } + + private static async Task ReadExactAsync(Socket socket, int count, CancellationToken ct) + { + var buf = new byte[count]; + int read = 0; + while (read < count) + { + int n = await socket.ReceiveAsync(buf.AsMemory(read, count - read), SocketFlags.None, ct); + if (n == 0) throw new IOException("EOF"); + read += n; + } + return buf; + } + + private static async Task ReadOneFrameAsync(Socket socket, CancellationToken ct) + { + var header = await ReadExactAsync(socket, 7, ct); + ushort length = (ushort)((header[4] << 8) | header[5]); + int bodyLen = length - 1; + var body = bodyLen > 0 ? await ReadExactAsync(socket, bodyLen, ct) : Array.Empty(); + var frame = new byte[7 + bodyLen]; + Buffer.BlockCopy(header, 0, frame, 0, 7); + if (bodyLen > 0) Buffer.BlockCopy(body, 0, frame, 7, bodyLen); + return frame; + } + + private static byte[] BuildFc03ReadFrame(ushort txId, ushort start, ushort qty, byte unitId = 1) + => + [ + (byte)(txId >> 8), (byte)(txId & 0xFF), + 0x00, 0x00, + 0x00, 0x06, + unitId, + 0x03, + (byte)(start >> 8), (byte)(start & 0xFF), + (byte)(qty >> 8), (byte)(qty & 0xFF), + ]; + + private static byte[] BuildFc03Response(ushort txId, byte unitId, ushort register) + { + // Body = FC(1) + byteCount(1) + data(2) = 4. MBAP length = UnitId(1) + body(4) = 5. + var frame = new byte[7 + 4]; + frame[0] = (byte)(txId >> 8); + frame[1] = (byte)(txId & 0xFF); + frame[2] = 0; frame[3] = 0; + frame[4] = 0; frame[5] = 5; // length + frame[6] = unitId; + frame[7] = 0x03; + frame[8] = 2; // byte count + frame[9] = (byte)(register >> 8); + frame[10] = (byte)(register & 0xFF); + return frame; + } + + private static PerPlcContext MakeContext(string name) => new() + { + PlcName = name, + TagMap = Mbproxy.Bcd.BcdTagMap.Empty, + Counters = new ProxyCounters(), + Logger = NullLogger.Instance, + }; + + /// + /// Stub backend that echoes FC03 responses (including the synthetic heartbeat probe, + /// which is itself an FC03). When is set it reads and drains + /// requests but never responds — used to drive heartbeat timeouts. + /// + private sealed class StubBackend : IAsyncDisposable + { + public int Port { get; } + public volatile bool Silent; + private int _requestCount; + public int RequestCount => Volatile.Read(ref _requestCount); + + private readonly TcpListener _listener; + private readonly CancellationTokenSource _cts = new(); + private readonly List _clientTasks = new(); + + public StubBackend(int port, bool silent = false) + { + Port = port; + Silent = silent; + _listener = new TcpListener(IPAddress.Loopback, port); + _listener.Start(); + _ = AcceptLoop(); + } + + private async Task AcceptLoop() + { + try + { + while (!_cts.IsCancellationRequested) + { + Socket s = await _listener.AcceptSocketAsync(_cts.Token); + var task = Task.Run(() => HandleAsync(s)); + lock (_clientTasks) _clientTasks.Add(task); + } + } + catch { /* shutdown */ } + } + + private async Task HandleAsync(Socket s) + { + try + { + while (!_cts.IsCancellationRequested) + { + var req = await ReadOneFrameAsync(s, _cts.Token); + if (req.Length < 8) break; + Interlocked.Increment(ref _requestCount); + + if (Silent) continue; + + ushort txId = (ushort)((req[0] << 8) | req[1]); + byte unitId = req[6]; + byte fc = req[7]; + if (fc != 0x03) break; + + await s.SendAsync(BuildFc03Response(txId, unitId, 0x1234), SocketFlags.None, _cts.Token); + } + } + catch { /* normal */ } + finally { try { s.Dispose(); } catch { } } + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + try { _listener.Stop(); } catch { } + Task[] snap; + lock (_clientTasks) snap = _clientTasks.ToArray(); + try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { } + _cts.Dispose(); + } + } + + private static PlcMultiplexer BuildMux(PlcOptions plc, ConnectionOptions connOpts, PerPlcContext ctx) + => new( + plc, connOpts, + new BcdPduPipeline(), + ctx, + NullLogger.Instance, + backendConnectPipeline: null); + + private static async Task<(Socket client, UpstreamPipe pipe, TcpListener listener)> + ConnectClientAsync(PlcMultiplexer mux, string plcName) + { + int proxyPort = PickFreePort(); + var listener = new TcpListener(IPAddress.Loopback, proxyPort); + listener.Start(); + + var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) + { NoDelay = true }; + await client.ConnectAsync(IPAddress.Loopback, proxyPort); + var upstream = await listener.AcceptSocketAsync(); + var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance); + _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None)); + return (client, pipe, listener); + } + + // ── SocketKeepalive helper ───────────────────────────────────────────────── + + [Fact] + public void SocketKeepalive_Apply_Enabled_TurnsOnKeepAlive() + { + using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + + SocketKeepalive.Apply(socket, new KeepaliveOptions + { + Enabled = true, + TcpIdleTimeMs = 30000, + TcpProbeIntervalMs = 5000, + TcpProbeCount = 4, + }); + + int keepAlive = (int)socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive)!; + keepAlive.ShouldNotBe(0, "SO_KEEPALIVE must be enabled after Apply"); + } + + [Fact] + public void SocketKeepalive_Apply_Disabled_IsNoOp() + { + using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + + SocketKeepalive.Apply(socket, new KeepaliveOptions { Enabled = false }); + + int keepAlive = (int)socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive)!; + keepAlive.ShouldBe(0, "Apply with Enabled=false must not touch the socket"); + } + + // ── Backend heartbeat ────────────────────────────────────────────────────── + + [Fact] + public async Task Heartbeat_FiresOnIdleBackend_AndIsAnswered_NoCascade() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + var connOpts = new ConnectionOptions + { + Keepalive = new KeepaliveOptions { Enabled = true, BackendHeartbeatIdleMs = 600 }, + }; + await using var mux = BuildMux(plc, connOpts, ctx); + + var (client, pipe, listener) = await ConnectClientAsync(mux, plc.Name); + try + { + // One real round-trip brings the backend up and starts the heartbeat loop. + await client.SendAsync(BuildFc03ReadFrame(0x0001, 0, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken); + + // Idle the connection past the heartbeat threshold a few times over. + long sent = 0; + for (int i = 0; i < 60; i++) + { + sent = ctx.Counters.Snapshot().BackendHeartbeatsSent; + if (sent >= 1) break; + await Task.Delay(100, TestContext.Current.CancellationToken); + } + + sent.ShouldBeGreaterThanOrEqualTo(1, "an idle backend must receive at least one heartbeat probe"); + + var snap = ctx.Counters.Snapshot(); + snap.BackendHeartbeatsFailed.ShouldBe(0, "an answered heartbeat must not count as failed"); + snap.BackendIdleDisconnects.ShouldBe(0, "an answered heartbeat must not tear the backend down"); + + // The client connection survived — a fresh request still round-trips. + await client.SendAsync(BuildFc03ReadFrame(0x0002, 0, 1), SocketFlags.None); + var rsp = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken) + .WaitAsync(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken); + ((ushort)((rsp[0] << 8) | rsp[1])).ShouldBe((ushort)0x0002); + } + finally + { + client.Dispose(); + await pipe.DisposeAsync(); + listener.Stop(); + } + } + + [Fact] + public async Task Heartbeat_SuppressedByRealTraffic() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + // Idle threshold well above the request cadence below. + var connOpts = new ConnectionOptions + { + Keepalive = new KeepaliveOptions { Enabled = true, BackendHeartbeatIdleMs = 1500 }, + }; + await using var mux = BuildMux(plc, connOpts, ctx); + + var (client, pipe, listener) = await ConnectClientAsync(mux, plc.Name); + try + { + // Steady real traffic every ~200 ms for ~2.4 s. Each round-trip refreshes the + // activity timestamp, so the 1500 ms idle threshold is never reached. + for (ushort i = 1; i <= 12; i++) + { + await client.SendAsync(BuildFc03ReadFrame(i, 0, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken) + .WaitAsync(TimeSpan.FromSeconds(3), TestContext.Current.CancellationToken); + await Task.Delay(200, TestContext.Current.CancellationToken); + } + + ctx.Counters.Snapshot().BackendHeartbeatsSent + .ShouldBe(0, "real traffic must keep resetting the idle timer so no heartbeat fires"); + } + finally + { + client.Dispose(); + await pipe.DisposeAsync(); + listener.Stop(); + } + } + + [Fact] + public async Task Heartbeat_Timeout_TearsDownBackend_AndCascades() + { + int backendPort = PickFreePort(); + // Silent from the start: the backend accepts the TCP connection and drains every + // frame (including the heartbeat) but never replies. + await using var backend = new StubBackend(backendPort, silent: true); + + var ctx = MakeContext("PLC1"); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + var connOpts = new ConnectionOptions + { + BackendRequestTimeoutMs = 500, + Keepalive = new KeepaliveOptions { Enabled = true, BackendHeartbeatIdleMs = 700 }, + }; + await using var mux = BuildMux(plc, connOpts, ctx); + + var (client, pipe, listener) = await ConnectClientAsync(mux, plc.Name); + try + { + // First request brings the backend TCP connection up and starts the heartbeat + // loop. It will itself time out with 0x0B (the backend never answers) — drain + // and ignore that frame. + await client.SendAsync(BuildFc03ReadFrame(0x0001, 0, 1), SocketFlags.None); + try + { + _ = await ReadOneFrameAsync(client, TestContext.Current.CancellationToken) + .WaitAsync(TimeSpan.FromSeconds(2), TestContext.Current.CancellationToken); + } + catch { /* 0x0B or socket close — not what this test asserts */ } + + // The heartbeat fires on the idle socket, never gets answered, and the watchdog + // times it out — which tears the backend down. + long failed = 0, idleDisc = 0; + for (int i = 0; i < 80; i++) + { + var snap = ctx.Counters.Snapshot(); + failed = snap.BackendHeartbeatsFailed; + idleDisc = snap.BackendIdleDisconnects; + if (failed >= 1 && idleDisc >= 1) break; + await Task.Delay(100, TestContext.Current.CancellationToken); + } + + failed.ShouldBeGreaterThanOrEqualTo(1, "an unanswered heartbeat must count as failed"); + idleDisc.ShouldBeGreaterThanOrEqualTo(1, "a failed heartbeat must trigger a backend idle-disconnect"); + ctx.Counters.Snapshot().BackendHeartbeatsSent + .ShouldBeGreaterThanOrEqualTo(1, "a heartbeat must have been sent before it could fail"); + } + finally + { + client.Dispose(); + await pipe.DisposeAsync(); + listener.Stop(); + } + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/MultiplexerE2ETests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/MultiplexerE2ETests.cs index d9f00c3..1f52293 100644 --- a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/MultiplexerE2ETests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/MultiplexerE2ETests.cs @@ -438,6 +438,69 @@ public sealed class MultiplexerE2ETests } } + // ── E2E 6: Backend keepalive heartbeat keeps an idle connection warm ───────────── + + /// + /// With keepalive enabled, an idle backend connection receives periodic FC03 heartbeat + /// probes. This test idles a simulator-backed connection past + /// BackendHeartbeatIdleMs, verifies backendHeartbeatsSent climbs on the + /// status page, and confirms a later real read still round-trips on the same + /// (un-cascaded) connection. + /// + [Fact(Timeout = 8_000)] + public async Task E2E_Keepalive_IdleBackend_ReceivesHeartbeats_AndStaysUsable() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + int adminPort = PickFreePort(); + + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:AdminPort"] = adminPort.ToString(); + // Short idle window so the heartbeat fires several times within the test budget. + config["Mbproxy:Connection:Keepalive:Enabled"] = "true"; + config["Mbproxy:Connection:Keepalive:BackendHeartbeatIdleMs"] = "700"; + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(200, TestContext.Current.CancellationToken); + + using (var client = new TcpClient()) + { + await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(client); + + // One read brings the backend up and starts the heartbeat loop. + _ = master.ReadHoldingRegisters(1, 0, 1); + + // Idle the connection so the heartbeat loop fires repeatedly. + await Task.Delay(2500, TestContext.Current.CancellationToken); + + // A later read still succeeds — the connection was never cascaded. + ushort[] regs = master.ReadHoldingRegisters(1, 0, 1); + regs.Length.ShouldBe(1, "the idle-then-active connection must still serve reads"); + } + + using var httpClient = new HttpClient(); + var resp = await httpClient.GetStringAsync( + $"http://127.0.0.1:{adminPort}/status.json", + TestContext.Current.CancellationToken); + + using var doc = JsonDocument.Parse(resp); + var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend"); + + backend.TryGetProperty("backendHeartbeatsSent", out _) + .ShouldBeTrue("status.json must expose backend.backendHeartbeatsSent"); + backend.GetProperty("backendHeartbeatsSent").GetInt64() + .ShouldBeGreaterThanOrEqualTo(1, "an idle backend must have received at least one heartbeat"); + backend.GetProperty("backendHeartbeatsFailed").GetInt64() + .ShouldBe(0, "every heartbeat against the live simulator must be answered"); + backend.GetProperty("backendIdleDisconnects").GetInt64() + .ShouldBe(0, "an answered heartbeat must never tear the backend down"); + } + // ── Helpers ────────────────────────────────────────────────────────────────────── private Dictionary MakeBaseConfig(int proxyPort) => new()