mbproxy: add keepalive / connection monitoring

The DL205/DL260 ECOM emits no TCP keepalives, so an idle backend socket
can be silently dropped by a middlebox (switch, firewall, NAT) after
2-5 minutes. Enable OS SO_KEEPALIVE on backend and accepted upstream
sockets, and drive a periodic synthetic FC03 heartbeat on each idle
backend socket so a dead path is detected before a real client request
hits it. Controlled by Connection.Keepalive (ON by default).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-15 09:40:54 -04:00
parent 7466a46aa7
commit 0868613890
25 changed files with 1135 additions and 25 deletions
@@ -28,6 +28,12 @@ internal sealed record InterestedParty(UpstreamPipe Pipe, ushort OriginalTxId);
/// read coalescing uses to fan out a single PLC response to multiple upstream clients.
/// Reviewer note: do <i>not</i> simplify back to a single <c>UpstreamPipe</c> field.</para>
/// </summary>
/// <param name="IsHeartbeat">
/// <c>true</c> for the synthetic FC03 keepalive probe issued by the backend heartbeat
/// loop. Heartbeat entries carry no <see cref="InterestedParties"/>: the backend reader
/// drops the response (no fan-out, no rewriter, no cache) and the timeout watchdog tears
/// the backend down instead of dispatching a 0x0B exception. Defaults to <c>false</c>.
/// </param>
internal sealed record InFlightRequest(
byte UnitId,
byte Fc,
@@ -35,4 +41,5 @@ internal sealed record InFlightRequest(
ushort Qty,
IReadOnlyList<InterestedParty> InterestedParties,
DateTimeOffset SentAtUtc,
int ResolvedCacheTtlMs = 0);
int ResolvedCacheTtlMs = 0,
bool IsHeartbeat = false);
@@ -0,0 +1,54 @@
namespace Mbproxy.Proxy.Multiplexing;
/// <summary>
/// Source-generated <see cref="LoggerMessage"/> definitions for the backend keepalive
/// heartbeat. Event names are stable — do not rename without updating
/// docs/Reference/LogEvents.md's event-name table.
/// </summary>
internal static partial class KeepaliveLogEvents
{
/// <summary>
/// Emitted each time the heartbeat loop issues a synthetic FC03 probe on an idle
/// backend socket. Debug level — one per <c>BackendHeartbeatIdleMs</c> per idle PLC.
/// </summary>
[LoggerMessage(
EventId = 150,
EventName = "mbproxy.keepalive.heartbeat.sent",
Level = LogLevel.Debug,
Message = "Keepalive heartbeat sent: Plc={Plc} ProxyTxId={ProxyTxId} Address={Address}")]
public static partial void HeartbeatSent(
ILogger logger,
string plc,
ushort proxyTxId,
ushort address);
/// <summary>
/// Emitted when a keepalive heartbeat probe is not answered within
/// <c>BackendRequestTimeoutMs</c>. The backend is connected-but-not-answering; the
/// multiplexer tears it down (see <see cref="BackendIdleDisconnect"/>).
/// </summary>
[LoggerMessage(
EventId = 151,
EventName = "mbproxy.keepalive.heartbeat.timeout",
Level = LogLevel.Warning,
Message = "Keepalive heartbeat timed out: Plc={Plc} ProxyTxId={ProxyTxId} ElapsedMs={ElapsedMs}")]
public static partial void HeartbeatTimeout(
ILogger logger,
string plc,
ushort proxyTxId,
long elapsedMs);
/// <summary>
/// Emitted when a failed keepalive heartbeat triggers a proactive backend teardown.
/// Every attached upstream pipe is cascaded; clients reconnect on their next request.
/// </summary>
[LoggerMessage(
EventId = 152,
EventName = "mbproxy.keepalive.backend.idle_disconnect",
Level = LogLevel.Information,
Message = "Backend torn down by keepalive: Plc={Plc} HeartbeatElapsedMs={ElapsedMs}")]
public static partial void BackendIdleDisconnect(
ILogger logger,
string plc,
long elapsedMs);
}
@@ -61,6 +61,12 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
// `() => optionsMonitor.CurrentValue.Resilience.ReadCoalescing`. Tests default to a
// fresh `ReadCoalescingOptions()` (Enabled = true, MaxParties = 32).
private readonly Func<ReadCoalescingOptions> _coalescingOptions;
// Live keepalive config accessor. Read at backend-connect time (TCP SO_KEEPALIVE) and
// on each heartbeat-loop tick (idle threshold + probe address) so a hot-reload of
// `Connection.Keepalive` propagates without a listener restart. Production wires this
// to `() => optionsMonitor.CurrentValue.Connection.Keepalive`; the fallback reads the
// construction-time `ConnectionOptions` snapshot.
private readonly Func<KeepaliveOptions> _keepaliveOptions;
private readonly TxIdAllocator _allocator = new();
private readonly CorrelationMap _correlation = new();
@@ -86,6 +92,19 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
private CancellationTokenSource? _backendCts;
private Task? _backendWriterTask;
private Task? _backendReaderTask;
private Task? _backendHeartbeatTask;
// UTC ticks of the last backend socket activity (a send OR a received frame). Updated
// by the writer and reader tasks; read by the heartbeat loop to decide whether the
// socket has been idle long enough to warrant a probe. Interlocked for cross-task
// coherence.
private long _lastBackendActivityTicks;
// Unit ID of the most recent upstream request. The synthetic heartbeat reuses it so
// the probe targets the same Modbus unit the real clients successfully talk to.
// Defaults to 0 until the first upstream frame is seen; by the time a heartbeat can
// fire the backend socket exists, which means at least one upstream frame arrived.
private int _lastSeenUnitId;
private readonly CancellationTokenSource _disposeCts = new();
// Volatile so the disposing thread's write is observed by every hot-path reader
@@ -102,7 +121,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
PerPlcContext perPlcContext,
ILogger<PlcMultiplexer> logger,
ResiliencePipeline? backendConnectPipeline = null,
Func<ReadCoalescingOptions>? coalescingOptions = null)
Func<ReadCoalescingOptions>? coalescingOptions = null,
Func<KeepaliveOptions>? keepaliveOptions = null)
{
_plc = plc;
_connectionOptions = connectionOptions;
@@ -111,6 +131,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
_logger = logger;
_backendConnectPipeline = backendConnectPipeline;
_coalescingOptions = coalescingOptions ?? (static () => new ReadCoalescingOptions());
_keepaliveOptions = keepaliveOptions ?? (() => _connectionOptions.Keepalive);
// Register the per-PLC cache as the live stats source for the snapshot path.
// Cache may be null when the per-PLC context has not been wired with one
@@ -282,6 +303,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
// Build a fresh backend socket and Polly-connect.
var backend = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
{ NoDelay = true };
SocketKeepalive.Apply(backend, _keepaliveOptions());
try
{
@@ -318,8 +340,11 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
{
_backendSocket = backend;
_backendCts = cts2;
// Seed the idle timer so the heartbeat loop measures idleness from connect.
Interlocked.Exchange(ref _lastBackendActivityTicks, DateTime.UtcNow.Ticks);
_backendWriterTask = Task.Run(() => RunBackendWriterAsync(backend, cts2.Token), CancellationToken.None);
_backendReaderTask = Task.Run(() => RunBackendReaderAsync(backend, cts2.Token), CancellationToken.None);
_backendHeartbeatTask = Task.Run(() => RunBackendHeartbeatAsync(cts2.Token), CancellationToken.None);
}
_ctx.Counters.IncrementConnectSuccess();
@@ -381,18 +406,20 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
{
Socket? oldSocket;
CancellationTokenSource? oldCts;
Task? writer, reader;
Task? writer, reader, heartbeat;
lock (_backendLock)
{
oldSocket = _backendSocket;
oldCts = _backendCts;
writer = _backendWriterTask;
reader = _backendReaderTask;
heartbeat = _backendHeartbeatTask;
_backendSocket = null;
_backendCts = null;
_backendWriterTask = null;
_backendReaderTask = null;
_backendHeartbeatTask = null;
}
if (oldSocket is null && oldCts is null) return;
@@ -454,6 +481,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
// Best-effort join.
try { if (writer is not null) await writer.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ }
try { if (reader is not null) await reader.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ }
try { if (heartbeat is not null) await heartbeat.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(false); } catch { /* swallow */ }
oldCts?.Dispose();
@@ -489,6 +517,9 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
if (n == 0) throw new SocketException((int)SocketError.ConnectionReset);
sent += n;
}
// A send counts as backend activity — it suppresses the idle heartbeat.
Interlocked.Exchange(ref _lastBackendActivityTicks, DateTime.UtcNow.Ticks);
}
}
catch (OperationCanceledException)
@@ -542,6 +573,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
if (!await FillAsync(backend, frame, MbapFrame.HeaderSize, pduBodyLen, ct).ConfigureAwait(false))
break;
// A received frame counts as backend activity — it suppresses (and, for a
// heartbeat response, satisfies) the idle heartbeat.
Interlocked.Exchange(ref _lastBackendActivityTicks, DateTime.UtcNow.Ticks);
if (!_correlation.TryRemove(proxyTxId, out var inFlight))
{
// No correlation entry — either a stale response after cascade, or
@@ -552,6 +587,14 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
// Free the allocator slot immediately so it can be reused.
_allocator.Release(proxyTxId);
// Keepalive heartbeat response — the probe came back, the backend is alive.
// The activity timestamp was already refreshed above. There is no upstream
// party, no cache eligibility, and no rewriting to do: drop the payload and
// skip the EWMA update so the synthetic probe never pollutes the
// client-facing round-trip metric.
if (inFlight.IsHeartbeat)
continue;
// For FC03/FC04 reads, also clear the coalescing-by-key entry so a
// brand-new identical request issued AFTER this response is treated as a
// miss (opens a fresh round-trip). The TryRemove is best-effort: a
@@ -727,6 +770,10 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
out ushort originalTxId, out _, out _, out byte unitId))
return;
// Remember the unit ID so the backend keepalive heartbeat probes the same Modbus
// unit the real clients are known to reach successfully.
Volatile.Write(ref _lastSeenUnitId, unitId);
// Count inbound bytes from the upstream client. Surfaces in bytes.upstreamIn on
// the status page. Counted ONCE per parsed frame regardless of subsequent
// routing (cache hit, coalesce, backend round-trip, exception).
@@ -1062,6 +1109,23 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
_allocator.Release(proxyTxId);
// Keepalive heartbeat that never came back. The backend is no longer
// answering Modbus even though the socket may still look connected —
// tear it down proactively (cascading every attached pipe) so the
// failure is found here, during idle, instead of corrupting the next
// real client request. There is no upstream party to send a 0x0B to.
if (req.IsHeartbeat)
{
long hbElapsedMs = (long)(DateTimeOffset.UtcNow - req.SentAtUtc).TotalMilliseconds;
KeepaliveLogEvents.HeartbeatTimeout(_logger, _plc.Name, proxyTxId, hbElapsedMs);
_ctx.Counters.IncrementBackendHeartbeatFailed();
_ctx.Counters.IncrementBackendIdleDisconnect();
KeepaliveLogEvents.BackendIdleDisconnect(_logger, _plc.Name, hbElapsedMs);
if (!_disposeCts.IsCancellationRequested)
_ = TearDownBackendAsync("keepalive heartbeat timeout", cascadeUpstreams: true);
continue;
}
// Also clear the coalescing-by-key entry. A late attach that raced
// in just before the watchdog claim will still receive the 0x0B
// exception via this entry's InterestedParties list (List<T>
@@ -1110,6 +1174,124 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
}
}
// ── Backend keepalive heartbeat ───────────────────────────────────────────
/// <summary>
/// Backend keepalive heartbeat loop. Started alongside the writer/reader on each
/// successful connect and cancelled with them on teardown. While the backend socket
/// has been idle (no send or receive) for longer than
/// <see cref="KeepaliveOptions.BackendHeartbeatIdleMs"/>, it issues a synthetic FC03
/// qty=1 read so the path stays warm against middlebox idle-drop and a backend that is
/// connected-but-not-answering is detected here rather than on the next client request.
///
/// <para>The probe response is consumed by <see cref="RunBackendReaderAsync"/> (which
/// recognises <see cref="InFlightRequest.IsHeartbeat"/> and drops it); a probe that
/// never returns is timed out by <see cref="RunRequestTimeoutWatchdogAsync"/>, which
/// tears the backend down. The heartbeat keeps an <i>existing</i> backend warm — it
/// never resurrects a dead one (reconnect stays gated on the next upstream frame).</para>
/// </summary>
private async Task RunBackendHeartbeatAsync(CancellationToken ct)
{
try
{
while (!ct.IsCancellationRequested)
{
var ka = _keepaliveOptions();
int idleMs = Math.Max(1000, ka.BackendHeartbeatIdleMs);
// Tick at a quarter of the idle window so a freshly-elapsed idle period is
// noticed promptly, floored at 500 ms so the loop never busy-wakes.
int tickMs = Math.Max(500, idleMs / 4);
await Task.Delay(tickMs, ct).ConfigureAwait(false);
if (!ka.Enabled)
continue;
long lastTicks = Interlocked.Read(ref _lastBackendActivityTicks);
double idleElapsedMs =
(DateTime.UtcNow - new DateTime(lastTicks, DateTimeKind.Utc)).TotalMilliseconds;
if (idleElapsedMs < idleMs)
continue;
SendHeartbeat(ka);
}
}
catch (OperationCanceledException)
{
// Normal teardown.
}
catch (Exception ex)
{
_logger.LogError(ex, "Backend heartbeat loop faulted: Plc={Plc}", _plc.Name);
}
}
/// <summary>
/// Builds and enqueues one synthetic FC03 qty=1 heartbeat request onto the backend
/// outbound channel. The correlation entry is flagged <see cref="InFlightRequest.IsHeartbeat"/>
/// so the reader and watchdog treat it specially; it carries no interested parties and
/// bypasses the coalescing and cache paths entirely.
/// </summary>
private void SendHeartbeat(KeepaliveOptions ka)
{
// A saturated TxId space means the backend is busy (65,536 requests in flight),
// which is the opposite of idle — skip this tick rather than force a probe.
if (!_allocator.TryAllocate(out ushort proxyTxId))
return;
byte unitId = (byte)Volatile.Read(ref _lastSeenUnitId);
ushort address = (ushort)ka.BackendHeartbeatProbeAddress;
var inFlight = new InFlightRequest(
UnitId: unitId,
Fc: 0x03,
StartAddress: address,
Qty: 1,
InterestedParties: Array.Empty<InterestedParty>(),
SentAtUtc: DateTimeOffset.UtcNow,
ResolvedCacheTtlMs: 0,
IsHeartbeat: true);
if (!_correlation.TryAdd(proxyTxId, inFlight))
{
_allocator.Release(proxyTxId);
return;
}
byte[] frame = BuildHeartbeatFrame(proxyTxId, unitId, address);
// Non-blocking enqueue: if the channel is full the backend is not idle (a race), and
// if it is completed the backend is tearing down — either way, undo and skip.
if (!_outboundChannel.Writer.TryWrite(frame))
{
if (_correlation.TryRemove(proxyTxId, out _))
_allocator.Release(proxyTxId);
return;
}
_ctx.Counters.IncrementBackendHeartbeatSent();
KeepaliveLogEvents.HeartbeatSent(_logger, _plc.Name, proxyTxId, address);
}
/// <summary>
/// Builds a 12-byte MBAP-framed FC03 (Read Holding Registers) request reading one
/// register at <paramref name="address"/> — the keepalive heartbeat probe PDU.
/// </summary>
private static byte[] BuildHeartbeatFrame(ushort proxyTxId, byte unitId, ushort address)
{
// PDU = [fc=03][addrHi][addrLo][qtyHi][qtyLo]. MBAP length = UnitId(1) + PDU(5) = 6.
var frame = new byte[MbapFrame.HeaderSize + 5];
frame[0] = (byte)(proxyTxId >> 8);
frame[1] = (byte)(proxyTxId & 0xFF);
frame[2] = 0; frame[3] = 0; // ProtocolId
frame[4] = 0; frame[5] = 6; // Length
frame[6] = unitId;
frame[7] = 0x03; // FC03 Read Holding Registers
frame[8] = (byte)(address >> 8);
frame[9] = (byte)(address & 0xFF);
frame[10] = 0; frame[11] = 1; // Qty = 1
return frame;
}
// ── Helpers ───────────────────────────────────────────────────────────────
/// <summary>
@@ -1,6 +1,7 @@
using System.Net;
using System.Net.Sockets;
using System.Threading.Channels;
using Mbproxy.Options;
namespace Mbproxy.Proxy.Multiplexing;
@@ -77,10 +78,15 @@ internal sealed partial class UpstreamPipe : IAsyncDisposable
/// </summary>
public bool IsAlive => !_disposed && !_cts.IsCancellationRequested;
public UpstreamPipe(Socket upstream, string plcName, ILogger logger)
public UpstreamPipe(Socket upstream, string plcName, ILogger logger, KeepaliveOptions? keepalive = null)
{
_upstream = upstream;
_upstream.NoDelay = true;
// Enable OS TCP keepalive on the accepted client socket so a half-open/dead
// client (gone without a TCP FIN) faults the read loop and is reaped, instead of
// leaking a pipe + correlation slots until the proxy next tries to write to it.
if (keepalive is not null)
SocketKeepalive.Apply(_upstream, keepalive);
RemoteEp = upstream.RemoteEndPoint as IPEndPoint;
_plcName = plcName;
_logger = logger;