mbproxy: add opt-in response cache (Phase 11)
Layers a per-PLC, per-tag response cache on top of Phase 10's coalescing.
Cache is OFF by default per tag (CacheTtlMs = 0); a fresh deployment with no
TTL config behaves identically to Phase 10. Operators opt tags in by setting
CacheTtlMs > 0 on a BcdTagOptions entry (or DefaultCacheTtlMs > 0 on a
PlcOptions entry), explicitly acknowledging the staleness window.
Cache lookup order: cache -> coalesce -> backend. A cache hit short-circuits
both Phase 10's coalescing path and Phase 9's backend send. Cache stores
POST-rewriter PDU bytes so hits never re-invoke the BCD rewriter. FC06/FC16
write responses invalidate every cached entry whose address range overlaps
the write (half-open interval math).
New types (Mbproxy.Proxy.Cache, all internal):
- CacheKey (record-struct, same shape as CoalescingKey but kept SEPARATE so
the two phases evolve independently).
- CacheEntry, ResponseCache (IDisposable; LRU + PeriodicTimer eviction
loop), CacheInvalidator (pure overlap matcher), CacheLogEvents (stable
mbproxy.cache.* names).
Multi-tag range TTL = min(TTLs); any tag with TTL = 0 in the range disables
caching for the whole read (conservative-by-design).
Options surface:
- BcdTagOptions.CacheTtlMs (nullable int; null = fall through to PLC default)
- PlcOptions.DefaultCacheTtlMs
- MbproxyOptions.Cache.{AllowLongTtl, MaxEntriesPerPlc, EvictionIntervalMs}
- TTL > 60_000 ms requires Cache.AllowLongTtl = true (reload validation).
Admin counters (Tier 1.8 + Tier 2 cache-memory KPIs from docs/kpi.md):
- CacheHitCount, CacheMissCount, CacheInvalidations on ProxyCounters.
- CacheEntryCount, CacheBytes via a new ICacheStatsProvider snapshot path.
- /status.json and the HTML page surface a new Cache cell per PLC row.
Hot-reload: any tag-list change to a PLC reseats the per-PLC context with a
fresh cache; the old cache is disposed inside ReplaceContextAsync. Per-tag
flush granularity is intentionally not implemented in v1.
PLCs with no cache-eligible tags (every resolved tag has CacheTtlMs = 0)
get Cache = null on the context and skip the eviction timer entirely, so
the no-cache path is byte-identical to Phase 10.
Tests (32 new unit + 5 new E2E = 37 new; suite now 314 unit + 48 E2E):
- CacheKeyTests, CacheEntryTests (records + boundary semantics).
- CacheInvalidatorTests: full overlap, both partials, adjacent-not-
overlapping, disjoint, different unit ID + auxiliary FC-filter / zero-qty.
- ResponseCacheTests: round-trip, lazy expiry, range invalidation,
unit-id filter, LRU bound, LRU access tracking, concurrent get/set,
dispose, clear, approximate-bytes accounting.
- ResponseCacheMultiplexerTests (stub-backend): hit short-circuits
coalescing, BCD-decoded bytes are cached not raw, FC06 invalidates
overlapping, non-overlapping write does not invalidate, multi-tag
TTL=min rule, regression-cache-disabled-by-default-is-Phase-10, hit
works even when backend unreachable.
- ResponseCacheE2ETests (pymodbus DL205 sim, sequential reads):
* Headline: 10 reads with TTL=1000 ms -> 9 hits, 1 miss, 1 backend trip.
* TTL expiry path with sleep > TTL.
* Write invalidation through the proxy on a scratch register.
* BCD-decoded bytes are cached, not raw BCD nibbles.
* Regression: Cache disabled by default -> behaviour byte-identical to
Phase 10.
Pre-existing flake hardened: BackendDisconnect_CascadesToAllUpstreams now
polls briefly for the cascade counter to absorb the inherent scheduling
gap between "upstream EOF observed" and "counter incremented inside
TearDownBackendAsync." Counter semantics unchanged.
Phase doc updated with implementation clarifications discovered during
this work (CacheKey kept separate from CoalescingKey, LastUsedTick is
long, FC06/FC16 startAddr/qty parsing extension, cache-pre-connect
short-circuit, write-invalidation only on successful responses).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,7 @@ using System.Diagnostics;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Channels;
|
||||
using Mbproxy.Options;
|
||||
using Mbproxy.Proxy.Cache;
|
||||
using Polly;
|
||||
|
||||
namespace Mbproxy.Proxy.Multiplexing;
|
||||
@@ -102,6 +103,12 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
_backendConnectPipeline = backendConnectPipeline;
|
||||
_coalescingOptions = coalescingOptions ?? (static () => new ReadCoalescingOptions());
|
||||
|
||||
// Phase 11 — register the per-PLC cache as the live stats source for the snapshot
|
||||
// path. Cache may be null when the per-PLC context has not been wired with one
|
||||
// (every tag uncached, or unit tests).
|
||||
if (_ctx.Cache is not null)
|
||||
_ctx.Counters.SetCacheStatsProvider(new CacheStatsAdapter(_ctx.Cache));
|
||||
|
||||
// Register this multiplexer as the live telemetry source for the PLC's counters.
|
||||
_ctx.Counters.SetMultiplexProvider(this);
|
||||
|
||||
@@ -177,6 +184,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
// Stop the counters provider link so a status snapshot during teardown doesn't
|
||||
// see live-but-soon-to-be-empty internal state.
|
||||
_ctx.Counters.SetMultiplexProvider(null);
|
||||
_ctx.Counters.SetCacheStatsProvider(null);
|
||||
|
||||
await _disposeCts.CancelAsync().ConfigureAwait(false);
|
||||
|
||||
@@ -450,6 +458,57 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
frame.AsSpan(MbapFrame.HeaderSize, pduBodyLen),
|
||||
responseCtx);
|
||||
|
||||
// Phase 11 — post-rewriter cache update:
|
||||
// * FC03/FC04 successful responses are stored when the request was
|
||||
// cache-eligible (resolvedTtlMs > 0).
|
||||
// * FC06/FC16 successful responses invalidate every cached entry whose
|
||||
// address range overlaps the write.
|
||||
if (_ctx.Cache is { } postCache)
|
||||
{
|
||||
byte fcInResponse = frame[MbapFrame.HeaderSize]; // post-rewriter, but the FC byte is never rewritten
|
||||
bool isException = (fcInResponse & 0x80) != 0;
|
||||
|
||||
if (!isException)
|
||||
{
|
||||
if (inFlight.Fc is 0x03 or 0x04 && inFlight.ResolvedCacheTtlMs > 0)
|
||||
{
|
||||
// Snapshot the post-rewriter PDU body so the cached entry is
|
||||
// independent of this frame's lifetime.
|
||||
byte[] pduSnapshot = new byte[pduBodyLen];
|
||||
Buffer.BlockCopy(frame, MbapFrame.HeaderSize, pduSnapshot, 0, pduBodyLen);
|
||||
|
||||
var cacheKey = new CacheKey(
|
||||
inFlight.UnitId, inFlight.Fc,
|
||||
inFlight.StartAddress, inFlight.Qty);
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var entry = new CacheEntry(
|
||||
PduBytes: pduSnapshot,
|
||||
CachedAtUtc: now,
|
||||
ExpiresAtUtc: now.AddMilliseconds(inFlight.ResolvedCacheTtlMs),
|
||||
Length: pduSnapshot.Length,
|
||||
LastUsedTick: 0); // ResponseCache.Set stamps the real tick
|
||||
postCache.Set(cacheKey, entry);
|
||||
|
||||
CacheLogEvents.Store(_logger, _plc.Name,
|
||||
inFlight.UnitId, inFlight.Fc,
|
||||
inFlight.StartAddress, inFlight.Qty,
|
||||
inFlight.ResolvedCacheTtlMs);
|
||||
}
|
||||
else if (inFlight.Fc is 0x06 or 0x10)
|
||||
{
|
||||
int invalidated = postCache.Invalidate(
|
||||
inFlight.UnitId, inFlight.StartAddress, inFlight.Qty);
|
||||
if (invalidated > 0)
|
||||
{
|
||||
_ctx.Counters.AddCacheInvalidations(invalidated);
|
||||
CacheLogEvents.Invalidated(_logger, _plc.Name,
|
||||
inFlight.UnitId, inFlight.StartAddress, inFlight.Qty,
|
||||
invalidated);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fan out to each interested party with their original TxId restored.
|
||||
// Phase 9: always exactly one party. Phase 10: N parties (read coalescing).
|
||||
// Note: the InFlightByKey TryRemove above (for FC03/FC04) guarantees no
|
||||
@@ -506,15 +565,6 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
{
|
||||
if (_disposed) return;
|
||||
|
||||
// Ensure backend is connected. Failure here means we cannot service the request;
|
||||
// close the upstream pipe (consistent with the 1:1 model's behaviour on connect
|
||||
// failure).
|
||||
if (!await EnsureBackendConnectedAsync(ct).ConfigureAwait(false))
|
||||
{
|
||||
try { await pipe.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ }
|
||||
return;
|
||||
}
|
||||
|
||||
if (frame.Length < MbapFrame.HeaderSize)
|
||||
return;
|
||||
|
||||
@@ -522,9 +572,11 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
out ushort originalTxId, out _, out _, out byte unitId))
|
||||
return;
|
||||
|
||||
// Parse the PDU FC + start/qty (for FC03/04) — needed for both the coalescing-key
|
||||
// path and the response correlation slot. FC06/FC16 (writes) keep startAddr/qty = 0;
|
||||
// they bypass coalescing entirely.
|
||||
// Parse the PDU FC + start/qty. FC03/FC04 reads use start/qty for the coalescing key
|
||||
// and (Phase 11) for the cache lookup. FC06 writes carry [addr][value]; we treat qty
|
||||
// as 1 for invalidation. FC16 carries [start][qty][byteCount]...; qty is the write
|
||||
// span used for cache invalidation. Phase 11: FC06/FC16 start/qty drive cache
|
||||
// invalidation by overlap rather than exact key.
|
||||
int pduOffset = MbapFrame.HeaderSize;
|
||||
byte fcByte = frame.Length > pduOffset ? frame[pduOffset] : (byte)0;
|
||||
ushort startAddr = 0;
|
||||
@@ -534,6 +586,56 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]);
|
||||
qty = (ushort)((frame[pduOffset + 3] << 8) | frame[pduOffset + 4]);
|
||||
}
|
||||
else if (fcByte == 0x06 && frame.Length >= pduOffset + 5)
|
||||
{
|
||||
// FC06 = Write Single Register. PDU: [fc=06][addrHi][addrLo][valHi][valLo].
|
||||
// For cache invalidation we represent this as qty=1 at addr.
|
||||
startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]);
|
||||
qty = 1;
|
||||
}
|
||||
else if (fcByte == 0x10 && frame.Length >= pduOffset + 5)
|
||||
{
|
||||
// FC16 = Write Multiple Registers. PDU: [fc=10][startHi][startLo][qtyHi][qtyLo][byteCount]...
|
||||
startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]);
|
||||
qty = (ushort)((frame[pduOffset + 3] << 8) | frame[pduOffset + 4]);
|
||||
}
|
||||
|
||||
// Phase 11 — response-cache path. Cache check happens BEFORE coalescing AND before
|
||||
// we attempt to bring up the backend connection. A hit short-circuits everything,
|
||||
// including the EnsureBackendConnectedAsync call — operators with all reads cached
|
||||
// and the backend down still get served (the cache survives backend disconnects per
|
||||
// the design contract). The cache only fires for FC03/FC04 and only when the read
|
||||
// range's resolved TTL > 0.
|
||||
int resolvedCacheTtlMs = 0;
|
||||
if (fcByte is 0x03 or 0x04 && _ctx.Cache is { } responseCache)
|
||||
{
|
||||
resolvedCacheTtlMs = _ctx.TagMap.ResolveCacheTtlMs(startAddr, qty);
|
||||
if (resolvedCacheTtlMs > 0)
|
||||
{
|
||||
var cacheKey = new CacheKey(unitId, fcByte, startAddr, qty);
|
||||
if (responseCache.TryGet(cacheKey, out var cached))
|
||||
{
|
||||
_ctx.Counters.IncrementCacheHit();
|
||||
CacheLogEvents.Hit(_logger, _plc.Name, unitId, fcByte, startAddr, qty);
|
||||
|
||||
byte[] hitFrame = BuildCacheHitFrame(originalTxId, unitId, cached.PduBytes);
|
||||
await pipe.SendResponseAsync(hitFrame, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
_ctx.Counters.IncrementCacheMiss();
|
||||
CacheLogEvents.Miss(_logger, _plc.Name, unitId, fcByte, startAddr, qty);
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure backend is connected. Failure here means we cannot service the request;
|
||||
// close the upstream pipe (consistent with the 1:1 model's behaviour on connect
|
||||
// failure).
|
||||
if (!await EnsureBackendConnectedAsync(ct).ConfigureAwait(false))
|
||||
{
|
||||
try { await pipe.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ }
|
||||
return;
|
||||
}
|
||||
|
||||
// Phase 10 — read-coalescing path. Only FC03/FC04 are coalescable; only when the
|
||||
// feature is enabled in the live config. If the late-arriving request matches an
|
||||
@@ -573,7 +675,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
StartAddress: startAddr,
|
||||
Qty: qty,
|
||||
InterestedParties: new List<InterestedParty> { newParty },
|
||||
SentAtUtc: DateTimeOffset.UtcNow);
|
||||
SentAtUtc: DateTimeOffset.UtcNow,
|
||||
ResolvedCacheTtlMs: resolvedCacheTtlMs);
|
||||
}
|
||||
|
||||
var partyList = new List<InterestedParty>(capacity: 1) { newParty };
|
||||
@@ -583,7 +686,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
StartAddress: startAddr,
|
||||
Qty: qty,
|
||||
InterestedParties: partyList,
|
||||
SentAtUtc: DateTimeOffset.UtcNow);
|
||||
SentAtUtc: DateTimeOffset.UtcNow,
|
||||
ResolvedCacheTtlMs: resolvedCacheTtlMs);
|
||||
|
||||
if (!_correlation.TryAdd(proxyTxId, inFlight))
|
||||
{
|
||||
@@ -673,7 +777,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
StartAddress: startAddr,
|
||||
Qty: qty,
|
||||
InterestedParties: partyListNc,
|
||||
SentAtUtc: DateTimeOffset.UtcNow);
|
||||
SentAtUtc: DateTimeOffset.UtcNow,
|
||||
ResolvedCacheTtlMs: resolvedCacheTtlMs);
|
||||
|
||||
if (!_correlation.TryAdd(proxyTxIdFc, inFlightNc))
|
||||
{
|
||||
@@ -809,6 +914,20 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
/// <summary>
|
||||
/// Adapter exposing a <see cref="Cache.ResponseCache"/>'s Count / ApproximateBytes as
|
||||
/// <see cref="ICacheStatsProvider"/> for the snapshot path. Kept as a sealed class so
|
||||
/// the cache type itself doesn't need to take an interface dependency on
|
||||
/// <see cref="ICacheStatsProvider"/>.
|
||||
/// </summary>
|
||||
private sealed class CacheStatsAdapter : ICacheStatsProvider
|
||||
{
|
||||
private readonly Cache.ResponseCache _cache;
|
||||
public CacheStatsAdapter(Cache.ResponseCache cache) => _cache = cache;
|
||||
public long EntryCount => _cache.Count;
|
||||
public long ApproximateBytes => _cache.ApproximateBytes;
|
||||
}
|
||||
|
||||
private static async Task<bool> FillAsync(
|
||||
Socket socket, byte[] buf, int offset, int count, CancellationToken ct)
|
||||
{
|
||||
@@ -824,6 +943,28 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Phase 11 — builds an MBAP-framed response from cached PDU bytes for the given
|
||||
/// upstream party. The cache stores POST-rewriter PDU bodies (no MBAP); each hit
|
||||
/// stamps a fresh MBAP header carrying the requesting party's original TxId so the
|
||||
/// response looks indistinguishable from a fresh backend reply.
|
||||
/// </summary>
|
||||
private static byte[] BuildCacheHitFrame(ushort originalTxId, byte unitId, byte[] cachedPdu)
|
||||
{
|
||||
// Length field covers UnitId(1) + PDU body. Capped by Modbus spec at 253-byte PDU.
|
||||
int pduLen = cachedPdu.Length;
|
||||
ushort length = (ushort)(1 + pduLen);
|
||||
var frame = new byte[MbapFrame.HeaderSize + pduLen];
|
||||
frame[0] = (byte)(originalTxId >> 8);
|
||||
frame[1] = (byte)(originalTxId & 0xFF);
|
||||
frame[2] = 0; frame[3] = 0;
|
||||
frame[4] = (byte)(length >> 8);
|
||||
frame[5] = (byte)(length & 0xFF);
|
||||
frame[6] = unitId;
|
||||
Buffer.BlockCopy(cachedPdu, 0, frame, MbapFrame.HeaderSize, pduLen);
|
||||
return frame;
|
||||
}
|
||||
|
||||
private static byte[] BuildExceptionFrame(ushort originalTxId, byte unitId, byte fc, byte exceptionCode)
|
||||
{
|
||||
// Modbus exception PDU = [fc | 0x80][exceptionCode].
|
||||
|
||||
Reference in New Issue
Block a user