diff --git a/mbproxy/README.md b/mbproxy/README.md index 9a8c5f4..ce8d0bb 100644 --- a/mbproxy/README.md +++ b/mbproxy/README.md @@ -1,6 +1,6 @@ # mbproxy -A .NET 10 Windows Service that sits inline as a Modbus TCP proxy in front of a fleet of AutomationDirect DirectLOGIC DL205/DL260 controllers, rewriting BCD-encoded registers bidirectionally so upstream clients can read and write them as plain integers. +A .NET 10 Windows Service that sits inline as a Modbus TCP proxy in front of a fleet of AutomationDirect DirectLOGIC DL205/DL260 controllers, rewriting BCD-encoded registers bidirectionally so upstream clients can read and write them as plain integers. Since Phase 11, the proxy also offers an opt-in per-tag response cache (default OFF) for FC03/FC04 reads with bounded operator-configured staleness — see [`docs/design.md`](docs/design.md) → "Response cache (Phase 11)" before enabling it. ## Hard constraints / prerequisites @@ -14,7 +14,7 @@ A .NET 10 Windows Service that sits inline as a Modbus TCP proxy in front of a f ``` src/Mbproxy/ Main C# project (net10.0, Microsoft.NET.Sdk.Worker) -tests/Mbproxy.Tests/ xUnit v3 test project (282 unit + 43 E2E tests) +tests/Mbproxy.Tests/ xUnit v3 test project (314 unit + 48 E2E tests) install/ PowerShell install/uninstall scripts and config template docs/ Design document, phase plans, and operations runbook DL260/ DL205/DL260 reference material and pymodbus simulator profile diff --git a/mbproxy/docs/plan/11-response-cache.md b/mbproxy/docs/plan/11-response-cache.md index 90ab583..807114b 100644 --- a/mbproxy/docs/plan/11-response-cache.md +++ b/mbproxy/docs/plan/11-response-cache.md @@ -365,6 +365,46 @@ If you're the agent picking up this phase: 11. **Update `docs/design.md` AND `docs/kpi.md` AND `mbproxy/CLAUDE.md` AND `install/mbproxy.config.template.json` IN THE SAME PR AS THE CODE.** Doc drift is a gate fail. The architectural pivot must be visible across all reader-facing surfaces. +## Implementation clarifications discovered during this phase + +The following clarifications were resolved while implementing Phase 11 — recorded here so +the next agent doesn't re-derive them: + +- **`CacheKey` vs `CoalescingKey` — kept SEPARATE (no aliasing).** The two records carry + the same dimensions but live in different namespaces (`Mbproxy.Proxy.Cache` vs + `Mbproxy.Proxy.Multiplexing`). Aliasing them would couple the two phases' evolution; a + duplicate 4-field record-struct is cheap enough to justify keeping them independent. + Per-key equality is record-struct value equality; the two types are never compared. +- **`CacheEntry.LastUsedTick` is a `long`, not `ushort`.** The phase doc proposed `ushort` + but the LRU comparison needs to survive >65K touches in a long-running process. The + signed-long ticker stamp suffices for the lifetime of any reasonable deployment. +- **No-cacheable-tag PLCs skip the cache entirely.** When a PLC's resolved tag map has no + entry with `CacheTtlMs > 0`, `ProxyWorker` (and `ConfigReconciler` on reseat/add) + builds the `PerPlcContext` with `Cache = null`. The multiplexer's cache check is a + no-op on a null cache, and no eviction timer is started. The "default OFF = byte- + identical to Phase 10" regression test (`Cache_DisabledByDefault_*`) lands on this code + path. +- **Cache check runs BEFORE `EnsureBackendConnectedAsync`.** A cache hit serves the + upstream client even when the backend is currently unreachable. This is intentional and + matches the design contract bullet "cache survives backend disconnects." Verified by the + unit-level `FailedBackendConnect_OnFirstRead_DoesNotPreventLaterCacheHits_*` test. +- **FC06 / FC16 invalidation requires startAddr/qty parsing.** The multiplexer's request + parser previously only extracted start/qty for FC03/FC04. Phase 11 extends it to + FC06 (qty = 1) and FC16 (qty from request) so the InFlightRequest carries the write + span; the response path then invalidates by overlap using those values. +- **Cache eviction loop uses `PeriodicTimer`.** Per the phase doc; clamps the interval + to a 100 ms floor (operator-configurable down to that) so a misconfigured + `EvictionIntervalMs = 0` doesn't become a tight loop. +- **Write invalidation only fires on SUCCESSFUL responses.** The post-rewriter check at + the backend reader inspects the response FC byte for the exception-bit (`& 0x80`). An + exception response on FC06 / FC16 (e.g. PLC in PROGRAM mode → code 04) does NOT + invalidate — consistent with "the write didn't take effect." +- **Pre-existing flake in `BackendDisconnect_CascadesToAllUpstreams`** hardened with a + poll loop. The race window between "upstream EOF observed" and "BackendDisconnectCascades + counter incremented in `TearDownBackendAsync`" is inherent to the multiplexer's + serial-pipe-dispose loop; the test now polls for up to 1 s for the counter to reach 3. + Behaviour is unchanged. + ## Cross-references - Phase 9's multiplexer is the chokepoint that hosts the cache check: [`09-txid-multiplexing.md`](09-txid-multiplexing.md). diff --git a/mbproxy/src/Mbproxy/Admin/StatusDto.cs b/mbproxy/src/Mbproxy/Admin/StatusDto.cs index 501cbd2..7e878a8 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusDto.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusDto.cs @@ -74,7 +74,10 @@ public sealed record FcCounts( /// QueueDepth. Phase 10 added the three coalescing counters /// (CoalescedHitCount, CoalescedMissCount, CoalescedResponseToDeadUpstream); /// the dashboard-side derived coalescingRatio is intentionally NOT carried on the wire -/// — consumers compute Hit / (Hit + Miss). +/// — consumers compute Hit / (Hit + Miss). Phase 11 added the five cache counters +/// (CacheHitCount, CacheMissCount, CacheInvalidations, +/// CacheEntryCount, CacheBytes); the dashboard-side derived +/// cacheHitRatio is intentionally NOT carried on the wire. /// public sealed record PlcBackendStatus( long ConnectsSuccess, @@ -88,7 +91,12 @@ public sealed record PlcBackendStatus( long QueueDepth, long CoalescedHitCount, long CoalescedMissCount, - long CoalescedResponseToDeadUpstream); + long CoalescedResponseToDeadUpstream, + long CacheHitCount, + long CacheMissCount, + long CacheInvalidations, + long CacheEntryCount, + long CacheBytes); /// Modbus exception counts by code. public sealed record ExceptionCounts( diff --git a/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs b/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs index eb88597..5c39331 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusHtmlRenderer.cs @@ -84,6 +84,10 @@ internal static class StatusHtmlRenderer // a percentage plus the raw hit count for context. Kept compact (one cell) to // stay under the 50 KB page-weight budget. sb.Append("Coal"); + // Phase 11: cache column. Single cell carries hit-ratio percent plus raw hit + // count; an em-dash when no cache-eligible reads have occurred. Page-weight + // budget assertion stays under 50 KB for the 54-PLC fleet. + sb.Append("Cache"); sb.Append(""); foreach (var plc in status.Plcs) @@ -165,6 +169,20 @@ internal static class StatusHtmlRenderer sb.Append(pct).Append("% (").Append(coalHit).Append(')'); } sb.Append(""); + // Phase 11: cache ratio cell — same pattern as coalescing. + long cacheHit = plc.Backend.CacheHitCount; + long cacheMiss = plc.Backend.CacheMissCount; + sb.Append(""); + if (cacheHit + cacheMiss == 0) + { + sb.Append("—"); + } + else + { + int pct = (int)Math.Round(100.0 * cacheHit / (cacheHit + cacheMiss)); + sb.Append(pct).Append("% (").Append(cacheHit).Append(')'); + } + sb.Append(""); sb.Append(""); } diff --git a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs index 026038c..e7dad1d 100644 --- a/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs +++ b/mbproxy/src/Mbproxy/Admin/StatusSnapshotBuilder.cs @@ -102,7 +102,12 @@ internal sealed class StatusSnapshotBuilder BackendQueueDepth: 0, CoalescedHitCount: 0, CoalescedMissCount: 0, - CoalescedResponseToDeadUpstream: 0); + CoalescedResponseToDeadUpstream: 0, + CacheHitCount: 0, + CacheMissCount: 0, + CacheInvalidations: 0, + CacheEntryCount: 0, + CacheBytes: 0); // Phase 08: ConnectsSuccess / ConnectsFailed are now tracked in ProxyCounters. long connectsSuccess = counters.ConnectsSuccess; @@ -140,7 +145,12 @@ internal sealed class StatusSnapshotBuilder QueueDepth: counters.BackendQueueDepth, CoalescedHitCount: counters.CoalescedHitCount, CoalescedMissCount: counters.CoalescedMissCount, - CoalescedResponseToDeadUpstream: counters.CoalescedResponseToDeadUpstream), + CoalescedResponseToDeadUpstream: counters.CoalescedResponseToDeadUpstream, + CacheHitCount: counters.CacheHitCount, + CacheMissCount: counters.CacheMissCount, + CacheInvalidations: counters.CacheInvalidations, + CacheEntryCount: counters.CacheEntryCount, + CacheBytes: counters.CacheBytes), Bytes: new PlcBytesStatus( UpstreamIn: counters.BytesUpstreamIn, UpstreamOut: counters.BytesUpstreamOut))); diff --git a/mbproxy/src/Mbproxy/Bcd/BcdTag.cs b/mbproxy/src/Mbproxy/Bcd/BcdTag.cs index 23c7385..94aa9c1 100644 --- a/mbproxy/src/Mbproxy/Bcd/BcdTag.cs +++ b/mbproxy/src/Mbproxy/Bcd/BcdTag.cs @@ -3,26 +3,39 @@ namespace Mbproxy.Bcd; /// /// Immutable description of a single BCD-encoded V-memory tag as seen on the Modbus wire. /// Width is 16 (one register) or 32 (two registers, CDAB low-word-first). +/// +/// Phase 11 — is the resolved per-tag response-cache +/// TTL in milliseconds. 0 (the default) means caching is disabled for this tag. Positive +/// values cap upstream staleness; the multi-tag-range read uses min(TTLs) across all +/// matched tags and treats any 0 in the range as "uncached for the whole read." /// -public sealed record BcdTag(ushort Address, byte Width) +public sealed record BcdTag(ushort Address, byte Width, int CacheTtlMs = 0) { /// /// Creates a and validates that Width is 16 or 32. /// /// Width is not 16 or 32. - public static BcdTag Create(ushort address, byte width) + public static BcdTag Create(ushort address, byte width, int cacheTtlMs = 0) { if (width != 16 && width != 32) throw new ArgumentException( $"BCD tag Width must be 16 or 32; got {width} at address {address}.", nameof(width)); - return new BcdTag(address, width); + if (cacheTtlMs < 0) + throw new ArgumentException( + $"BCD tag CacheTtlMs must be >= 0; got {cacheTtlMs} at address {address}.", + nameof(cacheTtlMs)); + + return new BcdTag(address, width, cacheTtlMs); } /// True when this tag occupies two registers (32-bit BCD). public bool IsThirtyTwoBit => Width == 32; + /// True when this tag opts into the Phase-11 response cache. + public bool IsCacheable => CacheTtlMs > 0; + /// /// The address of the high-word register for a 32-bit tag (Address + 1). /// Only valid when is true. diff --git a/mbproxy/src/Mbproxy/Bcd/BcdTagMap.cs b/mbproxy/src/Mbproxy/Bcd/BcdTagMap.cs index 379ee84..1213d16 100644 --- a/mbproxy/src/Mbproxy/Bcd/BcdTagMap.cs +++ b/mbproxy/src/Mbproxy/Bcd/BcdTagMap.cs @@ -46,6 +46,37 @@ public sealed class BcdTagMap public bool TryGet(ushort address, out BcdTag tag) => _map.TryGetValue(address, out tag!); + /// + /// Phase 11 — resolves the effective cache TTL for an FC03/FC04 read over the range + /// [, + ). + /// + /// Returns 0 (uncached) when: + /// + /// The range covers no configured BCD tags (nothing to cache for, conservatively). + /// Any covered tag has = 0 (the + /// conservative-by-design "if any tag is uncached, the whole read is uncached" rule). + /// + /// + /// Otherwise returns the minimum non-zero TTL across all covered tags. + /// + /// Allocation-free in every path (delegates to which + /// is allocation-free on no-hit and allocates only the hit list on hit). + /// + public int ResolveCacheTtlMs(ushort startAddress, ushort qty) + { + if (!TryGetForRange(startAddress, qty, out var hits) || hits.Count == 0) + return 0; + + int min = int.MaxValue; + foreach (var hit in hits) + { + int ttl = hit.Tag.CacheTtlMs; + if (ttl <= 0) return 0; + if (ttl < min) min = ttl; + } + return min == int.MaxValue ? 0 : min; + } + /// /// Returns every BCD tag whose register footprint intersects /// [, + ). diff --git a/mbproxy/src/Mbproxy/Bcd/BcdTagMapBuilder.cs b/mbproxy/src/Mbproxy/Bcd/BcdTagMapBuilder.cs index 3b6749c..f673ac8 100644 --- a/mbproxy/src/Mbproxy/Bcd/BcdTagMapBuilder.cs +++ b/mbproxy/src/Mbproxy/Bcd/BcdTagMapBuilder.cs @@ -31,6 +31,24 @@ public static class BcdTagMapBuilder /// as a fatal configuration problem. /// public static ValidationResult Build(BcdTagListOptions global, PlcBcdOverrides? perPlc) + => Build(global, perPlc, perPlcDefaultCacheTtlMs: 0); + + /// + /// Phase 11 overload — resolves the effective BCD tag list for one PLC and validates + /// it, additionally folding the per-PLC into + /// any tag whose explicit is null. + /// + /// Resolution order per tag: + /// + /// Explicit per-tag CacheTtlMs if set (including explicit 0). + /// Otherwise the per-PLC default. + /// Otherwise 0 (uncached). + /// + /// + public static ValidationResult Build( + BcdTagListOptions global, + PlcBcdOverrides? perPlc, + int perPlcDefaultCacheTtlMs) { var errors = new List(); var warnings = new List(); @@ -84,7 +102,12 @@ public static class BcdTagMapBuilder continue; } - validated[addr] = BcdTag.Create(addr, opt.Width); + // Phase 11 — resolve the effective per-tag cache TTL: + // explicit per-tag (including 0) wins; otherwise fall back to per-PLC default. + int resolvedTtl = opt.CacheTtlMs ?? perPlcDefaultCacheTtlMs; + if (resolvedTtl < 0) resolvedTtl = 0; + + validated[addr] = BcdTag.Create(addr, opt.Width, resolvedTtl); } // High-register collision check (only meaningful for 32-bit entries). diff --git a/mbproxy/src/Mbproxy/Configuration/ConfigReconciler.cs b/mbproxy/src/Mbproxy/Configuration/ConfigReconciler.cs index 9a031a4..116a703 100644 --- a/mbproxy/src/Mbproxy/Configuration/ConfigReconciler.cs +++ b/mbproxy/src/Mbproxy/Configuration/ConfigReconciler.cs @@ -2,6 +2,7 @@ using System.Threading.Channels; using Mbproxy.Bcd; using Mbproxy.Options; using Mbproxy.Proxy; +using Mbproxy.Proxy.Cache; using Mbproxy.Proxy.Multiplexing; using Mbproxy.Proxy.Supervision; using Microsoft.Extensions.Options; @@ -274,14 +275,15 @@ internal sealed partial class ConfigReconciler : IDisposable await old.DisposeAsync().ConfigureAwait(false); } - // Build fresh context. - var result = BcdTagMapBuilder.Build(next.BcdTags, plcNew.BcdTags); + // Build fresh context. Phase 11: pass DefaultCacheTtlMs. + var result = BcdTagMapBuilder.Build(next.BcdTags, plcNew.BcdTags, plcNew.DefaultCacheTtlMs); var newCtx = new PerPlcContext { PlcName = plcNew.Name, TagMap = result.Map, Counters = new Proxy.ProxyCounters(), Logger = _loggerFactory.CreateLogger($"Mbproxy.Proxy.BcdRewriter.{plcNew.Name}"), + Cache = BuildCacheIfNeeded(result.Map, next.Cache), }; // Build and start new supervisor. @@ -330,6 +332,9 @@ internal sealed partial class ConfigReconciler : IDisposable // Preserve existing counters so operators see real history. Counters = supervisor.CurrentCounters, Logger = _loggerFactory.CreateLogger($"Mbproxy.Proxy.BcdRewriter.{name}"), + // Phase 11: any reseat (tag-map change) constructs a fresh cache. + // The supervisor disposes the old one inside ReplaceContextAsync. + Cache = BuildCacheIfNeeded(newMap, next.Cache), }; using var reseatCts = CancellationTokenSource.CreateLinkedTokenSource(ct); @@ -355,13 +360,15 @@ internal sealed partial class ConfigReconciler : IDisposable { try { - var result = BcdTagMapBuilder.Build(next.BcdTags, plcNew.BcdTags); + // Phase 11: pass DefaultCacheTtlMs. + var result = BcdTagMapBuilder.Build(next.BcdTags, plcNew.BcdTags, plcNew.DefaultCacheTtlMs); var newCtx = new PerPlcContext { PlcName = plcNew.Name, TagMap = result.Map, Counters = new Proxy.ProxyCounters(), Logger = _loggerFactory.CreateLogger($"Mbproxy.Proxy.BcdRewriter.{plcNew.Name}"), + Cache = BuildCacheIfNeeded(result.Map, next.Cache), }; var recoveryPipeline = PolicyFactory.BuildListenerRecovery( @@ -405,6 +412,19 @@ internal sealed partial class ConfigReconciler : IDisposable // ── Helpers ─────────────────────────────────────────────────────────────────────────── + /// + /// Phase 11 — constructs a only when at least one resolved + /// tag in opts in ( > 0). + /// Returns null otherwise so the no-cache path is byte-identical to Phase 10. + /// + private static ResponseCache? BuildCacheIfNeeded(BcdTagMap map, CacheOptions opts) + { + foreach (var t in map.All) + if (t.CacheTtlMs > 0) + return new ResponseCache(opts.MaxEntriesPerPlc, opts.EvictionIntervalMs); + return null; + } + private static int ComputeGlobalTagDelta(BcdTagListOptions before, BcdTagListOptions after) { // Count entries in before but not in after (removed), plus entries in after diff --git a/mbproxy/src/Mbproxy/Configuration/ReloadPlan.cs b/mbproxy/src/Mbproxy/Configuration/ReloadPlan.cs index 5ebea6f..6282a2b 100644 --- a/mbproxy/src/Mbproxy/Configuration/ReloadPlan.cs +++ b/mbproxy/src/Mbproxy/Configuration/ReloadPlan.cs @@ -78,8 +78,10 @@ public sealed record ReloadPlan( // Tag-map change → reseat (swap context, keep socket). // We must build both maps to compare them structurally. // Compute happens after validation so Build should never return errors here. - var oldMap = BcdTagMapBuilder.Build(current.BcdTags, plcOld.BcdTags).Map; - var newMap = BcdTagMapBuilder.Build(next.BcdTags, plcNew.BcdTags).Map; + // Phase 11: include DefaultCacheTtlMs in the build so a per-PLC default change + // is detected by TagMapsEqual via the per-tag CacheTtlMs delta. + var oldMap = BcdTagMapBuilder.Build(current.BcdTags, plcOld.BcdTags, plcOld.DefaultCacheTtlMs).Map; + var newMap = BcdTagMapBuilder.Build(next.BcdTags, plcNew.BcdTags, plcNew.DefaultCacheTtlMs).Map; if (!TagMapsEqual(oldMap, newMap)) toReseat.Add((name, newMap)); @@ -94,7 +96,9 @@ public sealed record ReloadPlan( /// /// Structural equality between two instances: same set of - /// (Address, Width) pairs. Order doesn't matter — we compare as sets. + /// (Address, Width, CacheTtlMs) triples. Order doesn't matter — we compare as sets. + /// Phase 11 includes in the comparison so a per-tag + /// or per-PLC default TTL change reseats the context (which flushes the cache). /// private static bool TagMapsEqual(BcdTagMap a, BcdTagMap b) { @@ -106,6 +110,8 @@ public sealed record ReloadPlan( return false; if (tag.Width != bTag.Width) return false; + if (tag.CacheTtlMs != bTag.CacheTtlMs) + return false; } return true; diff --git a/mbproxy/src/Mbproxy/Configuration/ReloadValidator.cs b/mbproxy/src/Mbproxy/Configuration/ReloadValidator.cs index 85bc482..f2a4781 100644 --- a/mbproxy/src/Mbproxy/Configuration/ReloadValidator.cs +++ b/mbproxy/src/Mbproxy/Configuration/ReloadValidator.cs @@ -77,12 +77,54 @@ internal static class ReloadValidator // well-formedness; we must not duplicate its validation logic here. foreach (var plc in next.Plcs) { - var result = BcdTagMapBuilder.Build(next.BcdTags, plc.BcdTags); + var result = BcdTagMapBuilder.Build(next.BcdTags, plc.BcdTags, plc.DefaultCacheTtlMs); foreach (var err in result.Errors) errs.Add($"Plc '{plc.Name}': BCD tag map error ({err.Kind}): {err.Message}"); } + // ── 5. Cache TTL bounds (Phase 11) ──────────────────────────────────── + // The MbproxyOptionsValidator catches these at schema time too, but ReloadValidator + // is the gate that the hot-reload path consults directly so re-checking here keeps + // both paths internally consistent (and the validator runs against tag-map-resolved + // BcdTag.CacheTtlMs values too). + bool allowLongTtl = next.Cache.AllowLongTtl; + foreach (var tag in next.BcdTags.Global) + { + CheckTtl(errs, $"BcdTags.Global Address {tag.Address}", tag.CacheTtlMs, allowLongTtl); + } + foreach (var plc in next.Plcs) + { + if (plc.DefaultCacheTtlMs > 60_000 && !allowLongTtl) + errs.Add( + $"Plc '{plc.Name}': DefaultCacheTtlMs={plc.DefaultCacheTtlMs} exceeds 60_000 ms " + + "without Cache.AllowLongTtl=true."); + else if (plc.DefaultCacheTtlMs < 0) + errs.Add($"Plc '{plc.Name}': DefaultCacheTtlMs must be >= 0; got {plc.DefaultCacheTtlMs}."); + + if (plc.BcdTags?.Add is { } addList) + { + foreach (var tag in addList) + CheckTtl(errs, $"Plc '{plc.Name}' BcdTags.Add Address {tag.Address}", + tag.CacheTtlMs, allowLongTtl); + } + } + if (next.Cache.MaxEntriesPerPlc < 0) + errs.Add($"Cache.MaxEntriesPerPlc must be >= 0; got {next.Cache.MaxEntriesPerPlc}."); + if (next.Cache.EvictionIntervalMs < 0) + errs.Add($"Cache.EvictionIntervalMs must be >= 0; got {next.Cache.EvictionIntervalMs}."); + errors = errs; return errs.Count == 0; } + + private static void CheckTtl(List errs, string context, int? ttl, bool allowLongTtl) + { + if (ttl is null) return; + int v = ttl.Value; + if (v < 0) + errs.Add($"{context}: CacheTtlMs must be >= 0; got {v}."); + else if (v > 60_000 && !allowLongTtl) + errs.Add( + $"{context}: CacheTtlMs={v} exceeds 60_000 ms without Cache.AllowLongTtl=true."); + } } diff --git a/mbproxy/src/Mbproxy/Options/BcdTagOptions.cs b/mbproxy/src/Mbproxy/Options/BcdTagOptions.cs index 8f12eaa..f7cc8cb 100644 --- a/mbproxy/src/Mbproxy/Options/BcdTagOptions.cs +++ b/mbproxy/src/Mbproxy/Options/BcdTagOptions.cs @@ -4,4 +4,12 @@ public sealed class BcdTagOptions { public ushort Address { get; init; } public byte Width { get; init; } // 16 or 32 + + /// + /// Phase 11 — optional opt-in to the response cache. Null (default) means + /// "unset" and falls back to the per-PLC ; + /// 0 explicitly disables caching for this tag even when the PLC default is non-zero. + /// Positive values cap the staleness window in milliseconds. + /// + public int? CacheTtlMs { get; init; } } diff --git a/mbproxy/src/Mbproxy/Options/MbproxyOptions.cs b/mbproxy/src/Mbproxy/Options/MbproxyOptions.cs index 0b50e7c..0fb98aa 100644 --- a/mbproxy/src/Mbproxy/Options/MbproxyOptions.cs +++ b/mbproxy/src/Mbproxy/Options/MbproxyOptions.cs @@ -9,6 +9,41 @@ public sealed class MbproxyOptions public int AdminPort { get; init; } = 8080; public ConnectionOptions Connection { get; init; } = new(); public ResilienceOptions Resilience { get; init; } = new(); + + /// + /// Phase 11 — service-wide response-cache settings. The cache is opt-in + /// per-tag (); this section configures the + /// safety knobs that gate / bound the cache. + /// + public CacheOptions Cache { get; init; } = new(); +} + +/// +/// Phase 11 — service-wide response-cache knobs. The cache is OFF by default for every +/// tag; this section governs the limits when an operator opts a tag in. +/// +public sealed class CacheOptions +{ + /// + /// Gate for any greater than 60_000 ms. + /// Defaults to false so accidentally-stale-for-an-hour deployments are caught + /// at reload validation. Set to true to explicitly allow long TTLs. + /// + public bool AllowLongTtl { get; init; } = false; + + /// + /// LRU cap on the number of entries per-PLC. Past this cap, the next insert evicts + /// the least-recently-used entry. Defaults to 1000 — comfortable for a 54-PLC fleet + /// with short TTLs. + /// + public int MaxEntriesPerPlc { get; init; } = 1000; + + /// + /// Background eviction loop tick in milliseconds. Each tick scans the cache and + /// removes entries past their ExpiresAtUtc. Defaults to 5000 ms; values below + /// 100 ms are clamped at 100 to avoid pathologically tight loops. + /// + public int EvictionIntervalMs { get; init; } = 5000; } /// @@ -20,28 +55,59 @@ public sealed class MbproxyOptionsValidator : IValidateOptions public ValidateOptionsResult Validate(string? name, MbproxyOptions options) { var errors = new List(); + bool allowLongTtl = options.Cache.AllowLongTtl; foreach (var tag in options.BcdTags.Global) { if (tag.Width != 16 && tag.Width != 32) errors.Add($"BcdTags.Global: Address {tag.Address} has invalid Width {tag.Width}; must be 16 or 32."); + ValidateCacheTtl(errors, $"BcdTags.Global Address {tag.Address}", tag.CacheTtlMs, allowLongTtl); } for (int i = 0; i < options.Plcs.Count; i++) { var plc = options.Plcs[i]; + + // Phase 11 — per-PLC default TTL bounds. + if (plc.DefaultCacheTtlMs < 0) + errors.Add($"Plcs[{i}] ({plc.Name}): DefaultCacheTtlMs must be >= 0."); + else if (plc.DefaultCacheTtlMs > 60_000 && !allowLongTtl) + errors.Add( + $"Plcs[{i}] ({plc.Name}): DefaultCacheTtlMs={plc.DefaultCacheTtlMs} exceeds the 60_000 ms safety cap; " + + $"set Cache.AllowLongTtl=true to opt in."); + if (plc.BcdTags is { } overrides) { foreach (var tag in overrides.Add) { if (tag.Width != 16 && tag.Width != 32) errors.Add($"Plcs[{i}] ({plc.Name}): BcdTags.Add Address {tag.Address} has invalid Width {tag.Width}; must be 16 or 32."); + ValidateCacheTtl(errors, $"Plcs[{i}] ({plc.Name}) BcdTags.Add Address {tag.Address}", + tag.CacheTtlMs, allowLongTtl); } } } + // Cache section ranges. + if (options.Cache.MaxEntriesPerPlc < 0) + errors.Add($"Cache.MaxEntriesPerPlc must be >= 0; got {options.Cache.MaxEntriesPerPlc}."); + if (options.Cache.EvictionIntervalMs < 0) + errors.Add($"Cache.EvictionIntervalMs must be >= 0; got {options.Cache.EvictionIntervalMs}."); + return errors.Count > 0 ? ValidateOptionsResult.Fail(errors) : ValidateOptionsResult.Success; } + + private static void ValidateCacheTtl(List errors, string context, int? ttlMs, bool allowLongTtl) + { + if (ttlMs is null) return; + int value = ttlMs.Value; + if (value < 0) + errors.Add($"{context}: CacheTtlMs must be >= 0; got {value}."); + else if (value > 60_000 && !allowLongTtl) + errors.Add( + $"{context}: CacheTtlMs={value} exceeds the 60_000 ms safety cap; " + + $"set Cache.AllowLongTtl=true to opt in."); + } } diff --git a/mbproxy/src/Mbproxy/Options/PlcOptions.cs b/mbproxy/src/Mbproxy/Options/PlcOptions.cs index 3e09e6a..37f1917 100644 --- a/mbproxy/src/Mbproxy/Options/PlcOptions.cs +++ b/mbproxy/src/Mbproxy/Options/PlcOptions.cs @@ -12,4 +12,12 @@ public sealed class PlcOptions public int Port { get; init; } = 502; public PlcBcdOverrides? BcdTags { get; init; } + + /// + /// Phase 11 — per-PLC default cache TTL applied to any tag whose explicit + /// is unset (null). 0 (the default) means + /// "no caching by default at this PLC". Per-tag values always win over the per-PLC + /// default when set; an explicit zero on a tag still disables caching for that tag. + /// + public int DefaultCacheTtlMs { get; init; } = 0; } diff --git a/mbproxy/src/Mbproxy/Proxy/Cache/CacheEntry.cs b/mbproxy/src/Mbproxy/Proxy/Cache/CacheEntry.cs new file mode 100644 index 0000000..b64130a --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/Cache/CacheEntry.cs @@ -0,0 +1,23 @@ +namespace Mbproxy.Proxy.Cache; + +/// +/// One entry in the per-PLC response cache. +/// +/// is the POST-rewriter response PDU body (FC byte + +/// byteCount + register data; no MBAP header). The cache stores rewriter-decoded bytes so +/// hits never re-invoke the BCD rewriter — both a CPU optimisation and a correctness +/// guarantee against future rewriter changes accidentally transforming an already-decoded +/// payload. +/// +/// is the LRU ordering counter. The cache assigns +/// each touch (hit or fresh insert) the next value from 's +/// monotonic ticker; LRU eviction picks the entry with the smallest tick. Using a long +/// instead of on every access keeps the hot path free +/// of clock calls and works correctly even if the wall clock moves backward. +/// +internal sealed record CacheEntry( + byte[] PduBytes, + DateTimeOffset CachedAtUtc, + DateTimeOffset ExpiresAtUtc, + int Length, + long LastUsedTick); diff --git a/mbproxy/src/Mbproxy/Proxy/Cache/CacheInvalidator.cs b/mbproxy/src/Mbproxy/Proxy/Cache/CacheInvalidator.cs new file mode 100644 index 0000000..eafe6f4 --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/Cache/CacheInvalidator.cs @@ -0,0 +1,53 @@ +namespace Mbproxy.Proxy.Cache; + +/// +/// Pure address-range-overlap matcher for FC06 / FC16 write invalidation. +/// +/// Half-open interval math: a write covering [w, w + writeQty) overlaps an +/// entry covering [s, s + qty) iff w < s + qty AND s < w + writeQty. +/// Adjacent-but-not-overlapping (write to [10..15) vs cached [15..20)) does +/// NOT match — register 15 is not in the cached range. +/// +/// Scope is restricted to FC03 / FC04 keys; we never cache writes so invalidation +/// only applies to read entries. Different unitId bytes never invalidate each other +/// (multi-drop / gateway personalities behind a shared socket). +/// +internal static class CacheInvalidator +{ + /// + /// Returns every in that satisfies: + /// + /// equals . + /// is 0x03 or 0x04 (only read entries are evicted). + /// The key's range [StartAddress, StartAddress + Qty) overlaps + /// the write range [writeStart, writeStart + writeQty). + /// + /// + /// Pure function; the returned enumeration is materialised so callers can mutate + /// the haystack while iterating the result without raising "collection modified." + /// + public static IEnumerable FindOverlapping( + IReadOnlyCollection haystack, + byte unitId, + ushort writeStart, + ushort writeQty) + { + // writeQty = 0 — a degenerate write that covers no registers. Nothing to invalidate. + if (writeQty == 0) return Array.Empty(); + + int writeEnd = writeStart + writeQty; // half-open upper bound + + var hits = new List(); + foreach (var key in haystack) + { + if (key.UnitId != unitId) continue; + if (key.Fc != 0x03 && key.Fc != 0x04) continue; + + int keyEnd = key.StartAddress + key.Qty; + // Overlap iff writeStart < keyEnd AND key.StartAddress < writeEnd. + if (writeStart < keyEnd && key.StartAddress < writeEnd) + hits.Add(key); + } + return hits; + } +} diff --git a/mbproxy/src/Mbproxy/Proxy/Cache/CacheKey.cs b/mbproxy/src/Mbproxy/Proxy/Cache/CacheKey.cs new file mode 100644 index 0000000..35ea8a7 --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/Cache/CacheKey.cs @@ -0,0 +1,26 @@ +using Mbproxy.Proxy.Multiplexing; + +namespace Mbproxy.Proxy.Cache; + +/// +/// Hash key for the per-PLC . Structurally identical to +/// Phase 10's — both keys discriminate the same dimensions +/// (UnitId, FunctionCode, StartAddress, Quantity), but the two type aliases live in +/// different namespaces so the two phases can evolve independently without one shaping +/// the other's API surface. +/// +/// Equality semantics: record-struct value equality. FC03 and FC04 produce +/// different keys for the same address (different Modbus tables); different +/// bytes never share a key (different PLC personalities behind a +/// shared socket); reads of different never share a key (the responses +/// carry different register counts and would not be interchangeable on a fan-out). +/// +/// Scope: only FC03 (Read Holding Registers) and FC04 (Read Input Registers) +/// are cacheable. FC06 / FC16 writes invalidate cache entries by ADDRESS RANGE OVERLAP +/// rather than exact-key match — see . +/// +internal readonly record struct CacheKey( + byte UnitId, + byte Fc, + ushort StartAddress, + ushort Qty); diff --git a/mbproxy/src/Mbproxy/Proxy/Cache/CacheLogEvents.cs b/mbproxy/src/Mbproxy/Proxy/Cache/CacheLogEvents.cs new file mode 100644 index 0000000..7c79ffc --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/Cache/CacheLogEvents.cs @@ -0,0 +1,77 @@ +namespace Mbproxy.Proxy.Cache; + +/// +/// Source-generated definitions for the Phase-11 response +/// cache. Event names are stable — do not rename without updating docs/design.md's +/// Logging event-name table. +/// +/// Levels are conservative — a busy PLC under steady cache pressure would emit one +/// Hit / Miss per FC03/FC04 request. The counters surface the same data at far lower cost +/// for monitoring; Debug-level events are present for incident-time diagnosis only. +/// +internal static partial class CacheLogEvents +{ + [LoggerMessage( + EventId = 140, + EventName = "mbproxy.cache.hit", + Level = LogLevel.Debug, + Message = "Cache hit: Plc={Plc} Unit={UnitId} Fc={Fc} Start={Start} Qty={Qty}")] + public static partial void Hit( + ILogger logger, + string plc, + byte unitId, + byte fc, + ushort start, + ushort qty); + + [LoggerMessage( + EventId = 141, + EventName = "mbproxy.cache.miss", + Level = LogLevel.Debug, + Message = "Cache miss: Plc={Plc} Unit={UnitId} Fc={Fc} Start={Start} Qty={Qty}")] + public static partial void Miss( + ILogger logger, + string plc, + byte unitId, + byte fc, + ushort start, + ushort qty); + + [LoggerMessage( + EventId = 142, + EventName = "mbproxy.cache.store", + Level = LogLevel.Debug, + Message = "Cache store: Plc={Plc} Unit={UnitId} Fc={Fc} Start={Start} Qty={Qty} TtlMs={TtlMs}")] + public static partial void Store( + ILogger logger, + string plc, + byte unitId, + byte fc, + ushort start, + ushort qty, + int ttlMs); + + [LoggerMessage( + EventId = 143, + EventName = "mbproxy.cache.invalidated", + Level = LogLevel.Debug, + Message = "Cache invalidated: Plc={Plc} Unit={UnitId} WriteStart={WriteStart} WriteQty={WriteQty} Count={Count}")] + public static partial void Invalidated( + ILogger logger, + string plc, + byte unitId, + ushort writeStart, + ushort writeQty, + int count); + + [LoggerMessage( + EventId = 144, + EventName = "mbproxy.cache.flushed", + Level = LogLevel.Information, + Message = "Cache flushed: Plc={Plc} Reason={Reason} Count={Count}")] + public static partial void Flushed( + ILogger logger, + string plc, + string reason, + int count); +} diff --git a/mbproxy/src/Mbproxy/Proxy/Cache/ResponseCache.cs b/mbproxy/src/Mbproxy/Proxy/Cache/ResponseCache.cs new file mode 100644 index 0000000..c46963d --- /dev/null +++ b/mbproxy/src/Mbproxy/Proxy/Cache/ResponseCache.cs @@ -0,0 +1,277 @@ +namespace Mbproxy.Proxy.Cache; + +/// +/// Per-PLC opt-in response cache for FC03 / FC04 read responses. Phase 11. +/// +/// Lifecycle. One instance per PLC, owned by the per-PLC context. The cache +/// is consulted on every FC03/FC04 request before coalescing; populated by the backend +/// reader task AFTER the BCD rewriter has decoded the response; invalidated on every +/// successful FC06/FC16 write response that overlaps a cached read range. +/// +/// Concurrency. A single lock serialises every method. +/// A per-PLC cache sees at most one outstanding FC03/FC04 read on the backend at any +/// instant (the multiplexer serialises onto the shared socket), but the read-on-hit path +/// is called from many upstream task contexts concurrently; the lock is small and fast. +/// +/// LRU eviction. Each touch (hit or insert) assigns the entry the next value +/// from . When the cache reaches and a +/// new entry is inserted, the entry with the smallest +/// is removed. +/// +/// TTL expiry. Entries past their are +/// dropped lazily on every read attempt, and also swept proactively by a background +/// loop every . The background +/// loop is the safety net that prevents abandoned entries (PLC whose clients all dropped) +/// from holding memory until process exit. +/// +internal sealed class ResponseCache : IDisposable +{ + // ── State ──────────────────────────────────────────────────────────────────── + private readonly object _lock = new(); + private readonly Dictionary _entries; + private readonly int _maxEntries; + private readonly int _evictionIntervalMs; + + private long _lruTicker; + private long _approxBytes; + + private readonly CancellationTokenSource _cts = new(); + private readonly Task _evictionTask; + private bool _disposed; + + /// + /// Constructs a cache with the supplied capacity and eviction tick interval. The + /// eviction loop starts immediately; the cache becomes usable as soon as the + /// constructor returns. + /// + /// LRU cap. Past this count, the next insert evicts + /// the least-recently-used entry. Must be >= 0; 0 disables caching entirely (every + /// call no-ops). + /// Background sweep interval in milliseconds. Clamped + /// to a 100 ms floor and an effective ceiling of int.MaxValue. + public ResponseCache(int maxEntriesPerPlc, int evictionIntervalMs) + { + if (maxEntriesPerPlc < 0) + throw new ArgumentOutOfRangeException(nameof(maxEntriesPerPlc), + "maxEntriesPerPlc must be >= 0."); + if (evictionIntervalMs < 0) + throw new ArgumentOutOfRangeException(nameof(evictionIntervalMs), + "evictionIntervalMs must be >= 0."); + + _maxEntries = maxEntriesPerPlc; + // 100 ms floor — protects against pathologically tight loops; 0 (operator-pinned) + // becomes 100 ms here so the eviction task isn't a tight loop spinning on + // _entries. + _evictionIntervalMs = Math.Max(100, evictionIntervalMs); + _entries = new Dictionary(capacity: Math.Min(_maxEntries, 64)); + + _evictionTask = Task.Run(() => RunEvictionLoopAsync(_cts.Token)); + } + + /// Current entry count. Stable read under lock. + public int Count + { + get { lock (_lock) return _entries.Count; } + } + + /// Approximation of cached PDU bytes (Sum of ). Stable read under lock. + public long ApproximateBytes + { + get { lock (_lock) return _approxBytes; } + } + + /// + /// Returns true with the cached when a non-expired + /// entry is present for . Expired entries are removed lazily. + /// Updates LRU ordering on hit. + /// + public bool TryGet(CacheKey key, out CacheEntry entry) + { + DateTimeOffset now = DateTimeOffset.UtcNow; + lock (_lock) + { + if (!_entries.TryGetValue(key, out var existing)) + { + entry = null!; + return false; + } + + if (existing.ExpiresAtUtc <= now) + { + // Expired — remove and miss. + _entries.Remove(key); + _approxBytes -= existing.Length; + entry = null!; + return false; + } + + long tick = ++_lruTicker; + var refreshed = existing with { LastUsedTick = tick }; + _entries[key] = refreshed; + entry = refreshed; + return true; + } + } + + /// + /// Inserts or replaces the entry under . If the cache is at + /// capacity, evicts the LRU entry first. No-op when is 0. + /// + public void Set(CacheKey key, CacheEntry entry) + { + if (_maxEntries == 0) return; + + lock (_lock) + { + long tick = ++_lruTicker; + var stamped = entry with { LastUsedTick = tick }; + + if (_entries.TryGetValue(key, out var existing)) + { + // Replace; adjust byte accounting. + _approxBytes -= existing.Length; + _approxBytes += stamped.Length; + _entries[key] = stamped; + return; + } + + // Insert. Evict LRU if at cap. + if (_entries.Count >= _maxEntries) + EvictLeastRecentlyUsed(); + + _entries[key] = stamped; + _approxBytes += stamped.Length; + } + } + + /// + /// Invalidates every entry whose range overlaps the write + /// [startAddress, startAddress + qty) on . Returns the + /// count of invalidated entries. + /// + public int Invalidate(byte unitId, ushort startAddress, ushort qty) + { + lock (_lock) + { + // Snapshot keys for the pure overlap matcher. + var keys = _entries.Keys.ToArray(); + int count = 0; + foreach (var k in CacheInvalidator.FindOverlapping(keys, unitId, startAddress, qty)) + { + if (_entries.TryGetValue(k, out var existing)) + { + _entries.Remove(k); + _approxBytes -= existing.Length; + count++; + } + } + return count; + } + } + + /// + /// Drops every entry. Used by hot-reload (per-PLC flush on tag-map change). + /// Returns the count of entries that were present before the flush. + /// + public int Clear() + { + lock (_lock) + { + int n = _entries.Count; + _entries.Clear(); + _approxBytes = 0; + return n; + } + } + + /// + /// Stops the eviction loop and disposes the internal CTS. Idempotent. + /// + public void Dispose() + { + if (_disposed) return; + _disposed = true; + + try { _cts.Cancel(); } catch { /* best effort */ } + + // Best-effort join the eviction loop; the loop will observe the cancellation and + // exit. We bound the wait so a faulted loop doesn't hold up disposal. + try { _evictionTask.Wait(TimeSpan.FromSeconds(1)); } catch { /* best effort */ } + + _cts.Dispose(); + } + + // ── Eviction internals ─────────────────────────────────────────────────────── + + private void EvictLeastRecentlyUsed() + { + // Linear scan — acceptable at MaxEntriesPerPlc = 1000 (insert path is far cheaper + // than the network round-trip the cache is saving). A sorted secondary structure + // would be a premature optimisation. + CacheKey lruKey = default; + long lruTick = long.MaxValue; + bool found = false; + + foreach (var kvp in _entries) + { + if (kvp.Value.LastUsedTick < lruTick) + { + lruTick = kvp.Value.LastUsedTick; + lruKey = kvp.Key; + found = true; + } + } + + if (found && _entries.TryGetValue(lruKey, out var existing)) + { + _entries.Remove(lruKey); + _approxBytes -= existing.Length; + } + } + + private async Task RunEvictionLoopAsync(CancellationToken ct) + { + var period = TimeSpan.FromMilliseconds(_evictionIntervalMs); + using var timer = new PeriodicTimer(period); + try + { + while (await timer.WaitForNextTickAsync(ct).ConfigureAwait(false)) + { + SweepExpired(); + } + } + catch (OperationCanceledException) + { + // Normal disposal. + } + catch + { + // Defensive — eviction loop must never fault the host. A swallow here means + // entries are only evicted on access until disposal, which is correctness-preserving. + } + } + + private void SweepExpired() + { + DateTimeOffset now = DateTimeOffset.UtcNow; + lock (_lock) + { + if (_entries.Count == 0) return; + // Two-pass to avoid mutating during enumeration. + var expired = new List(); + foreach (var kvp in _entries) + { + if (kvp.Value.ExpiresAtUtc <= now) + expired.Add(kvp.Key); + } + foreach (var k in expired) + { + if (_entries.TryGetValue(k, out var existing)) + { + _entries.Remove(k); + _approxBytes -= existing.Length; + } + } + } + } +} diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightRequest.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightRequest.cs index 9d375a0..629a50b 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightRequest.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/InFlightRequest.cs @@ -38,4 +38,5 @@ internal sealed record InFlightRequest( ushort StartAddress, ushort Qty, IReadOnlyList InterestedParties, - DateTimeOffset SentAtUtc); + DateTimeOffset SentAtUtc, + int ResolvedCacheTtlMs = 0); diff --git a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs index dbe0b91..92bf52e 100644 --- a/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs +++ b/mbproxy/src/Mbproxy/Proxy/Multiplexing/PlcMultiplexer.cs @@ -3,6 +3,7 @@ using System.Diagnostics; using System.Net.Sockets; using System.Threading.Channels; using Mbproxy.Options; +using Mbproxy.Proxy.Cache; using Polly; namespace Mbproxy.Proxy.Multiplexing; @@ -102,6 +103,12 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi _backendConnectPipeline = backendConnectPipeline; _coalescingOptions = coalescingOptions ?? (static () => new ReadCoalescingOptions()); + // Phase 11 — register the per-PLC cache as the live stats source for the snapshot + // path. Cache may be null when the per-PLC context has not been wired with one + // (every tag uncached, or unit tests). + if (_ctx.Cache is not null) + _ctx.Counters.SetCacheStatsProvider(new CacheStatsAdapter(_ctx.Cache)); + // Register this multiplexer as the live telemetry source for the PLC's counters. _ctx.Counters.SetMultiplexProvider(this); @@ -177,6 +184,7 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // Stop the counters provider link so a status snapshot during teardown doesn't // see live-but-soon-to-be-empty internal state. _ctx.Counters.SetMultiplexProvider(null); + _ctx.Counters.SetCacheStatsProvider(null); await _disposeCts.CancelAsync().ConfigureAwait(false); @@ -450,6 +458,57 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi frame.AsSpan(MbapFrame.HeaderSize, pduBodyLen), responseCtx); + // Phase 11 — post-rewriter cache update: + // * FC03/FC04 successful responses are stored when the request was + // cache-eligible (resolvedTtlMs > 0). + // * FC06/FC16 successful responses invalidate every cached entry whose + // address range overlaps the write. + if (_ctx.Cache is { } postCache) + { + byte fcInResponse = frame[MbapFrame.HeaderSize]; // post-rewriter, but the FC byte is never rewritten + bool isException = (fcInResponse & 0x80) != 0; + + if (!isException) + { + if (inFlight.Fc is 0x03 or 0x04 && inFlight.ResolvedCacheTtlMs > 0) + { + // Snapshot the post-rewriter PDU body so the cached entry is + // independent of this frame's lifetime. + byte[] pduSnapshot = new byte[pduBodyLen]; + Buffer.BlockCopy(frame, MbapFrame.HeaderSize, pduSnapshot, 0, pduBodyLen); + + var cacheKey = new CacheKey( + inFlight.UnitId, inFlight.Fc, + inFlight.StartAddress, inFlight.Qty); + var now = DateTimeOffset.UtcNow; + var entry = new CacheEntry( + PduBytes: pduSnapshot, + CachedAtUtc: now, + ExpiresAtUtc: now.AddMilliseconds(inFlight.ResolvedCacheTtlMs), + Length: pduSnapshot.Length, + LastUsedTick: 0); // ResponseCache.Set stamps the real tick + postCache.Set(cacheKey, entry); + + CacheLogEvents.Store(_logger, _plc.Name, + inFlight.UnitId, inFlight.Fc, + inFlight.StartAddress, inFlight.Qty, + inFlight.ResolvedCacheTtlMs); + } + else if (inFlight.Fc is 0x06 or 0x10) + { + int invalidated = postCache.Invalidate( + inFlight.UnitId, inFlight.StartAddress, inFlight.Qty); + if (invalidated > 0) + { + _ctx.Counters.AddCacheInvalidations(invalidated); + CacheLogEvents.Invalidated(_logger, _plc.Name, + inFlight.UnitId, inFlight.StartAddress, inFlight.Qty, + invalidated); + } + } + } + } + // Fan out to each interested party with their original TxId restored. // Phase 9: always exactly one party. Phase 10: N parties (read coalescing). // Note: the InFlightByKey TryRemove above (for FC03/FC04) guarantees no @@ -506,15 +565,6 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi { if (_disposed) return; - // Ensure backend is connected. Failure here means we cannot service the request; - // close the upstream pipe (consistent with the 1:1 model's behaviour on connect - // failure). - if (!await EnsureBackendConnectedAsync(ct).ConfigureAwait(false)) - { - try { await pipe.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ } - return; - } - if (frame.Length < MbapFrame.HeaderSize) return; @@ -522,9 +572,11 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi out ushort originalTxId, out _, out _, out byte unitId)) return; - // Parse the PDU FC + start/qty (for FC03/04) — needed for both the coalescing-key - // path and the response correlation slot. FC06/FC16 (writes) keep startAddr/qty = 0; - // they bypass coalescing entirely. + // Parse the PDU FC + start/qty. FC03/FC04 reads use start/qty for the coalescing key + // and (Phase 11) for the cache lookup. FC06 writes carry [addr][value]; we treat qty + // as 1 for invalidation. FC16 carries [start][qty][byteCount]...; qty is the write + // span used for cache invalidation. Phase 11: FC06/FC16 start/qty drive cache + // invalidation by overlap rather than exact key. int pduOffset = MbapFrame.HeaderSize; byte fcByte = frame.Length > pduOffset ? frame[pduOffset] : (byte)0; ushort startAddr = 0; @@ -534,6 +586,56 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]); qty = (ushort)((frame[pduOffset + 3] << 8) | frame[pduOffset + 4]); } + else if (fcByte == 0x06 && frame.Length >= pduOffset + 5) + { + // FC06 = Write Single Register. PDU: [fc=06][addrHi][addrLo][valHi][valLo]. + // For cache invalidation we represent this as qty=1 at addr. + startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]); + qty = 1; + } + else if (fcByte == 0x10 && frame.Length >= pduOffset + 5) + { + // FC16 = Write Multiple Registers. PDU: [fc=10][startHi][startLo][qtyHi][qtyLo][byteCount]... + startAddr = (ushort)((frame[pduOffset + 1] << 8) | frame[pduOffset + 2]); + qty = (ushort)((frame[pduOffset + 3] << 8) | frame[pduOffset + 4]); + } + + // Phase 11 — response-cache path. Cache check happens BEFORE coalescing AND before + // we attempt to bring up the backend connection. A hit short-circuits everything, + // including the EnsureBackendConnectedAsync call — operators with all reads cached + // and the backend down still get served (the cache survives backend disconnects per + // the design contract). The cache only fires for FC03/FC04 and only when the read + // range's resolved TTL > 0. + int resolvedCacheTtlMs = 0; + if (fcByte is 0x03 or 0x04 && _ctx.Cache is { } responseCache) + { + resolvedCacheTtlMs = _ctx.TagMap.ResolveCacheTtlMs(startAddr, qty); + if (resolvedCacheTtlMs > 0) + { + var cacheKey = new CacheKey(unitId, fcByte, startAddr, qty); + if (responseCache.TryGet(cacheKey, out var cached)) + { + _ctx.Counters.IncrementCacheHit(); + CacheLogEvents.Hit(_logger, _plc.Name, unitId, fcByte, startAddr, qty); + + byte[] hitFrame = BuildCacheHitFrame(originalTxId, unitId, cached.PduBytes); + await pipe.SendResponseAsync(hitFrame, ct).ConfigureAwait(false); + return; + } + + _ctx.Counters.IncrementCacheMiss(); + CacheLogEvents.Miss(_logger, _plc.Name, unitId, fcByte, startAddr, qty); + } + } + + // Ensure backend is connected. Failure here means we cannot service the request; + // close the upstream pipe (consistent with the 1:1 model's behaviour on connect + // failure). + if (!await EnsureBackendConnectedAsync(ct).ConfigureAwait(false)) + { + try { await pipe.DisposeAsync().ConfigureAwait(false); } catch { /* best effort */ } + return; + } // Phase 10 — read-coalescing path. Only FC03/FC04 are coalescable; only when the // feature is enabled in the live config. If the late-arriving request matches an @@ -573,7 +675,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi StartAddress: startAddr, Qty: qty, InterestedParties: new List { newParty }, - SentAtUtc: DateTimeOffset.UtcNow); + SentAtUtc: DateTimeOffset.UtcNow, + ResolvedCacheTtlMs: resolvedCacheTtlMs); } var partyList = new List(capacity: 1) { newParty }; @@ -583,7 +686,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi StartAddress: startAddr, Qty: qty, InterestedParties: partyList, - SentAtUtc: DateTimeOffset.UtcNow); + SentAtUtc: DateTimeOffset.UtcNow, + ResolvedCacheTtlMs: resolvedCacheTtlMs); if (!_correlation.TryAdd(proxyTxId, inFlight)) { @@ -673,7 +777,8 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi StartAddress: startAddr, Qty: qty, InterestedParties: partyListNc, - SentAtUtc: DateTimeOffset.UtcNow); + SentAtUtc: DateTimeOffset.UtcNow, + ResolvedCacheTtlMs: resolvedCacheTtlMs); if (!_correlation.TryAdd(proxyTxIdFc, inFlightNc)) { @@ -809,6 +914,20 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi // ── Helpers ─────────────────────────────────────────────────────────────── + /// + /// Adapter exposing a 's Count / ApproximateBytes as + /// for the snapshot path. Kept as a sealed class so + /// the cache type itself doesn't need to take an interface dependency on + /// . + /// + private sealed class CacheStatsAdapter : ICacheStatsProvider + { + private readonly Cache.ResponseCache _cache; + public CacheStatsAdapter(Cache.ResponseCache cache) => _cache = cache; + public long EntryCount => _cache.Count; + public long ApproximateBytes => _cache.ApproximateBytes; + } + private static async Task FillAsync( Socket socket, byte[] buf, int offset, int count, CancellationToken ct) { @@ -824,6 +943,28 @@ internal sealed class PlcMultiplexer : IAsyncDisposable, IMultiplexCountersProvi return true; } + /// + /// Phase 11 — builds an MBAP-framed response from cached PDU bytes for the given + /// upstream party. The cache stores POST-rewriter PDU bodies (no MBAP); each hit + /// stamps a fresh MBAP header carrying the requesting party's original TxId so the + /// response looks indistinguishable from a fresh backend reply. + /// + private static byte[] BuildCacheHitFrame(ushort originalTxId, byte unitId, byte[] cachedPdu) + { + // Length field covers UnitId(1) + PDU body. Capped by Modbus spec at 253-byte PDU. + int pduLen = cachedPdu.Length; + ushort length = (ushort)(1 + pduLen); + var frame = new byte[MbapFrame.HeaderSize + pduLen]; + frame[0] = (byte)(originalTxId >> 8); + frame[1] = (byte)(originalTxId & 0xFF); + frame[2] = 0; frame[3] = 0; + frame[4] = (byte)(length >> 8); + frame[5] = (byte)(length & 0xFF); + frame[6] = unitId; + Buffer.BlockCopy(cachedPdu, 0, frame, MbapFrame.HeaderSize, pduLen); + return frame; + } + private static byte[] BuildExceptionFrame(ushort originalTxId, byte unitId, byte fc, byte exceptionCode) { // Modbus exception PDU = [fc | 0x80][exceptionCode]. diff --git a/mbproxy/src/Mbproxy/Proxy/PerPlcContext.cs b/mbproxy/src/Mbproxy/Proxy/PerPlcContext.cs index 6aa94bc..79e3dec 100644 --- a/mbproxy/src/Mbproxy/Proxy/PerPlcContext.cs +++ b/mbproxy/src/Mbproxy/Proxy/PerPlcContext.cs @@ -1,4 +1,5 @@ using Mbproxy.Bcd; +using Mbproxy.Proxy.Cache; using Mbproxy.Proxy.Multiplexing; namespace Mbproxy.Proxy; @@ -44,6 +45,14 @@ internal class PerPlcContext : PduContext /// internal InFlightRequest? CurrentRequest { get; init; } + /// + /// Phase 11 — optional per-PLC response cache. null on contexts that opt out + /// (every BCD tag has = 0) or in unit tests that don't + /// exercise the cache. The multiplexer constructs and disposes the cache alongside + /// itself. + /// + internal ResponseCache? Cache { get; init; } + /// /// Returns a shallow clone of this context with set to /// . The clone is cheap (one allocation per response) and avoids @@ -56,5 +65,6 @@ internal class PerPlcContext : PduContext Counters = Counters, Logger = Logger, CurrentRequest = req, + Cache = Cache, }; } diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs index 9eb4e2a..3ed9a69 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyCounters.cs @@ -97,7 +97,34 @@ public sealed record CounterSnapshot( /// attached upstream pipe had already disconnected. A spike is a churn indicator; the /// metric itself is informational (Tier 2 in docs/kpi.md). /// - long CoalescedResponseToDeadUpstream); + long CoalescedResponseToDeadUpstream, + /// + /// Phase 11 — cumulative count of FC03/FC04 requests served from the response cache. + /// CacheHitCount + CacheMissCount equals total FC03/FC04 requests whose resolved + /// TTL was > 0 (cache-eligible). Reads against tags with TTL = 0 increment neither. + /// + long CacheHitCount, + /// + /// Phase 11 — cumulative count of cache-eligible FC03/FC04 requests that fell through + /// to coalescing / backend (no fresh entry was present or the entry had expired). + /// + long CacheMissCount, + /// + /// Phase 11 — cumulative count of cache entries invalidated by overlapping FC06/FC16 + /// write responses. A high rate suggests caching is fighting writes; consider lower + /// TTLs on cache-overlapping tags. + /// + long CacheInvalidations, + /// + /// Phase 11 — point-in-time snapshot of the per-PLC + /// entry count. Read on the snapshot path; 0 when no cache is wired. + /// + long CacheEntryCount, + /// + /// Phase 11 — point-in-time approximation of cached PDU bytes for this PLC. Sum of + /// across entries. Read on the snapshot path. + /// + long CacheBytes); /// /// Thread-safe per-PLC counters backed by longs. @@ -137,6 +164,16 @@ internal sealed class ProxyCounters private long _coalescedMissCount; private long _coalescedResponseToDeadUpstream; + // Phase 11 — response-cache counters. Hit + Miss = total cache-eligible FC03/FC04. + private long _cacheHitCount; + private long _cacheMissCount; + private long _cacheInvalidations; + + // Phase 11 — live cache state pulled from a per-PLC ResponseCache on each snapshot. + // The multiplexer registers a single provider via SetCacheStatsProvider so the status + // page sees current entry-count / bytes without a separate poll. + private volatile ICacheStatsProvider? _cacheStatsProvider; + // Phase 9: live state pulled from the multiplexer's allocator/map/queue on each // snapshot. The multiplexer registers a single provider via SetMultiplexProvider. // We use a volatile reference for lock-free read on the snapshot path. @@ -244,6 +281,25 @@ internal sealed class ProxyCounters public void IncrementCoalescedResponseToDeadUpstream() => Interlocked.Increment(ref _coalescedResponseToDeadUpstream); + /// Phase 11 — records one FC03/FC04 cache hit. + public void IncrementCacheHit() + => Interlocked.Increment(ref _cacheHitCount); + + /// Phase 11 — records one cache-eligible FC03/FC04 read that missed. + public void IncrementCacheMiss() + => Interlocked.Increment(ref _cacheMissCount); + + /// Phase 11 — records cache entries invalidated by a write. + public void AddCacheInvalidations(int n) + => Interlocked.Add(ref _cacheInvalidations, n); + + /// + /// Phase 11 — wires the per-PLC as the live stats + /// source for the snapshot path. Pass null to detach during disposal. + /// + internal void SetCacheStatsProvider(ICacheStatsProvider? provider) + => _cacheStatsProvider = provider; + /// /// CAS-updates the peak in-flight high-water mark. Called on every successful /// allocation by the multiplexer. Phase 9. @@ -328,6 +384,10 @@ internal sealed class ProxyCounters long txWraps = provider?.TxIdWraps ?? 0; long queueDepth = provider?.BackendQueueDepth ?? 0; + var cacheProvider = _cacheStatsProvider; + long cacheEntries = cacheProvider?.EntryCount ?? 0; + long cacheBytes = cacheProvider?.ApproximateBytes ?? 0; + return new( PdusForwarded: Interlocked.Read(ref _pdusForwarded), Fc03: Interlocked.Read(ref _fc03), @@ -357,7 +417,12 @@ internal sealed class ProxyCounters BackendQueueDepth: queueDepth, CoalescedHitCount: Interlocked.Read(ref _coalescedHitCount), CoalescedMissCount: Interlocked.Read(ref _coalescedMissCount), - CoalescedResponseToDeadUpstream: Interlocked.Read(ref _coalescedResponseToDeadUpstream)); + CoalescedResponseToDeadUpstream: Interlocked.Read(ref _coalescedResponseToDeadUpstream), + CacheHitCount: Interlocked.Read(ref _cacheHitCount), + CacheMissCount: Interlocked.Read(ref _cacheMissCount), + CacheInvalidations: Interlocked.Read(ref _cacheInvalidations), + CacheEntryCount: cacheEntries, + CacheBytes: cacheBytes); } } @@ -380,3 +445,17 @@ internal interface IMultiplexCountersProvider /// Current depth of the outbound channel (frames queued for the backend writer). long BackendQueueDepth { get; } } + +/// +/// Phase 11 — read-only window into the per-PLC 's live +/// state for the snapshot path. The multiplexer wires this on cache construction so the +/// status page sees live counts without holding a direct reference to the cache. +/// +internal interface ICacheStatsProvider +{ + /// Current cache entry count. + long EntryCount { get; } + + /// Approximation of cached PDU bytes (sum of ). + long ApproximateBytes { get; } +} diff --git a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs index 74c7c3e..8d177d9 100644 --- a/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs +++ b/mbproxy/src/Mbproxy/Proxy/ProxyWorker.cs @@ -1,6 +1,7 @@ using Mbproxy.Bcd; using Mbproxy.Configuration; using Mbproxy.Options; +using Mbproxy.Proxy.Cache; using Mbproxy.Proxy.Multiplexing; using Mbproxy.Proxy.Supervision; using Microsoft.Extensions.Options; @@ -70,7 +71,7 @@ internal sealed partial class ProxyWorker : BackgroundService foreach (var plc in opts.Plcs) { - var result = BcdTagMapBuilder.Build(opts.BcdTags, plc.BcdTags); + var result = BcdTagMapBuilder.Build(opts.BcdTags, plc.BcdTags, plc.DefaultCacheTtlMs); foreach (var warn in result.Warnings) _logger.LogWarning("[{Plc}] BCD tag map warning: {Message}", plc.Name, warn.Message); @@ -85,12 +86,22 @@ internal sealed partial class ProxyWorker : BackgroundService continue; } + // Phase 11 — construct a per-PLC response cache only when at least one + // resolved tag opts in (CacheTtlMs > 0). Skipping cache construction for a + // PLC with no cacheable tags keeps the no-cache path free of the eviction + // timer and the per-call resolution cost, preserving "default behaviour = + // Phase 10 unchanged" when no operator has opted any tag in. + var cache = HasAnyCacheableTag(result.Map) + ? new ResponseCache(opts.Cache.MaxEntriesPerPlc, opts.Cache.EvictionIntervalMs) + : null; + plcContexts[plc.Name] = new PerPlcContext { PlcName = plc.Name, TagMap = result.Map, Counters = new ProxyCounters(), Logger = _loggerFactory.CreateLogger($"Mbproxy.Proxy.BcdRewriter.{plc.Name}"), + Cache = cache, }; } @@ -213,6 +224,20 @@ internal sealed partial class ProxyWorker : BackgroundService // ── Logging ─────────────────────────────────────────────────────────────────────────── + /// + /// Phase 11 — returns true when at least one BcdTag in the resolved map has a + /// positive . A PLC with no cacheable tags skips the + /// entirely (no eviction timer, no + /// per-call cache resolution cost), so the default-OFF deployment is byte-identical + /// to a Phase-10 deployment. + /// + private static bool HasAnyCacheableTag(BcdTagMap map) + { + foreach (var t in map.All) + if (t.CacheTtlMs > 0) return true; + return false; + } + [LoggerMessage(EventId = 1, EventName = "mbproxy.startup.ready", Level = LogLevel.Information, Message = "mbproxy service ready — ListenersBound={ListenersBound} PlcsConfigured={PlcsConfigured}")] diff --git a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs index dbbb531..ceae9c5 100644 --- a/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs +++ b/mbproxy/src/Mbproxy/Proxy/Supervision/PlcListenerSupervisor.cs @@ -198,10 +198,20 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable /// public Task ReplaceContextAsync(PerPlcContext newCtx, CancellationToken ct) { + // Phase 11: dispose the outgoing context's response cache (if any) so its + // eviction loop terminates. The "any tag-list reload flushes the affected PLC's + // whole cache" doctrine is satisfied here — the new context constructs its own + // fresh cache, the old cache is dropped wholesale. + var oldCache = _currentContext?.Cache; + // Volatile write: the next PlcListener created in RunSupervisorAsync will see // the new context. The accept loop itself does not hold a direct reference to // _currentContext — it was captured at PlcListener construction time. _currentContext = newCtx; + + if (oldCache is not null && !ReferenceEquals(oldCache, newCtx.Cache)) + oldCache.Dispose(); + return Task.CompletedTask; } @@ -376,6 +386,10 @@ internal sealed partial class PlcListenerSupervisor : IAsyncDisposable // Best-effort cleanup. } + // Phase 11: dispose the response cache (if any) — its eviction timer would + // otherwise outlive the supervisor. + _currentContext?.Cache?.Dispose(); + _supervisorCts.Dispose(); } diff --git a/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs b/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs index 4871fa7..4a251e7 100644 --- a/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Admin/StatusHtmlRendererTests.cs @@ -51,7 +51,9 @@ public sealed class StatusHtmlRendererTests InFlight: 0, MaxInFlight: 0, TxIdWraps: 0, DisconnectCascades: 0, QueueDepth: 0, CoalescedHitCount: 0, CoalescedMissCount: 0, - CoalescedResponseToDeadUpstream: 0), + CoalescedResponseToDeadUpstream: 0, + CacheHitCount: 0, CacheMissCount: 0, + CacheInvalidations: 0, CacheEntryCount: 0, CacheBytes: 0), Bytes: new PlcBytesStatus(1024, 2048)); } diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/CacheEntryTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/CacheEntryTests.cs new file mode 100644 index 0000000..c03ac93 --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/CacheEntryTests.cs @@ -0,0 +1,87 @@ +using Mbproxy.Proxy.Cache; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Cache; + +/// +/// Cover invariants of the record: TTL boundary expiry, byte +/// independence (caller-supplied PduBytes are not mutated by the cache itself — +/// the cache copies on store), and monotonic +/// semantics when the cache stamps it. +/// +[Trait("Category", "Unit")] +public sealed class CacheEntryTests +{ + [Fact] + public void Expired_When_NowEqualsOrExceedsExpiresAtUtc() + { + var now = DateTimeOffset.UtcNow; + var entry = new CacheEntry( + PduBytes: [0x03, 0x02, 0x04, 0xD2], + CachedAtUtc: now, + ExpiresAtUtc: now.AddMilliseconds(50), + Length: 4, + LastUsedTick: 1); + + // The entry exposes ExpiresAtUtc for the cache to compare against UtcNow. Sanity: + // an entry whose ExpiresAtUtc is in the past is expired. + var past = entry with { ExpiresAtUtc = now.AddMilliseconds(-1) }; + (past.ExpiresAtUtc <= now).ShouldBeTrue("an entry whose expiry is in the past must be expired"); + + var future = entry with { ExpiresAtUtc = now.AddMilliseconds(100) }; + (future.ExpiresAtUtc > now).ShouldBeTrue("an entry whose expiry is in the future must be live"); + } + + [Fact] + public void Record_With_Expression_DoesNotMutate_OriginalArrayContents() + { + var bytes = new byte[] { 0x03, 0x02, 0x04, 0xD2 }; + var entry = new CacheEntry( + PduBytes: bytes, + CachedAtUtc: DateTimeOffset.UtcNow, + ExpiresAtUtc: DateTimeOffset.UtcNow.AddSeconds(1), + Length: 4, + LastUsedTick: 1); + + // Sanity: the entry holds a reference to the supplied array. The cache's hot path + // never mutates PduBytes; this test pins that contract by mutating the original + // array and confirming the entry sees the change (i.e. it doesn't copy on store, + // but the cache wraps Set with a snapshot — verified separately in the multiplexer + // path). + bytes[0] = 0xFF; + entry.PduBytes[0].ShouldBe((byte)0xFF, "CacheEntry holds the supplied reference; defensive copies live in the multiplexer"); + } + + [Fact] + public void LastUsedTick_Stamped_By_ResponseCache_OnSet_AndOnHit() + { + // The CacheEntry itself doesn't compute LastUsedTick — the cache assigns the next + // tick on every Set/Get. Verified here in conjunction with the cache: two inserts + // produce strictly-increasing ticks; a hit refreshes the tick. + using var cache = new ResponseCache(maxEntriesPerPlc: 10, evictionIntervalMs: 5000); + var k1 = new CacheKey(1, 0x03, 100, 1); + var k2 = new CacheKey(1, 0x03, 200, 1); + + cache.Set(k1, MakeEntry(ttlMs: 1000)); + cache.Set(k2, MakeEntry(ttlMs: 1000)); + + cache.TryGet(k1, out var e1).ShouldBeTrue(); + cache.TryGet(k2, out var e2).ShouldBeTrue(); + + // Whichever was touched last has the larger LastUsedTick (k2 was the most recent + // touch via TryGet). + e2.LastUsedTick.ShouldBeGreaterThan(e1.LastUsedTick, "the more-recently-touched entry must carry the larger tick"); + } + + private static CacheEntry MakeEntry(int ttlMs) + { + var now = DateTimeOffset.UtcNow; + return new CacheEntry( + PduBytes: [0x03, 0x02, 0x04, 0xD2], + CachedAtUtc: now, + ExpiresAtUtc: now.AddMilliseconds(ttlMs), + Length: 4, + LastUsedTick: 0); + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/CacheInvalidatorTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/CacheInvalidatorTests.cs new file mode 100644 index 0000000..9fb793c --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/CacheInvalidatorTests.cs @@ -0,0 +1,99 @@ +using Mbproxy.Proxy.Cache; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Cache; + +/// +/// Six range-overlap unit tests required by the Phase-11 doc. Half-open interval math: +/// write [w, w+writeQty) overlaps entry [s, s+qty) iff w < s+qty AND s < w+writeQty. +/// +[Trait("Category", "Unit")] +public sealed class CacheInvalidatorTests +{ + private static CacheKey K(byte unit, ushort start, ushort qty, byte fc = 0x03) + => new(unit, fc, start, qty); + + [Fact] + public void FullOverlap_WriteCoversEntryRange_Invalidates() + { + // Entry [100..110), write [95..115) — write covers entry fully. + var entry = K(unit: 1, start: 100, qty: 10); + var hits = CacheInvalidator.FindOverlapping([entry], unitId: 1, writeStart: 95, writeQty: 20).ToList(); + + hits.ShouldContain(entry, "a write that fully contains the entry's range must invalidate it"); + } + + [Fact] + public void PartialOverlap_WriteStartsBeforeEntry_Invalidates() + { + // Entry [100..110), write [95..105) — overlaps low side. + var entry = K(unit: 1, start: 100, qty: 10); + var hits = CacheInvalidator.FindOverlapping([entry], unitId: 1, writeStart: 95, writeQty: 10).ToList(); + + hits.ShouldContain(entry, "low-side partial overlap must invalidate"); + } + + [Fact] + public void PartialOverlap_WriteEndsAfterEntry_Invalidates() + { + // Entry [100..110), write [105..115) — overlaps high side. + var entry = K(unit: 1, start: 100, qty: 10); + var hits = CacheInvalidator.FindOverlapping([entry], unitId: 1, writeStart: 105, writeQty: 10).ToList(); + + hits.ShouldContain(entry, "high-side partial overlap must invalidate"); + } + + [Fact] + public void Adjacent_NotOverlapping_DoesNotInvalidate() + { + // Half-open intervals: write [10..15) is adjacent to but NOT overlapping entry + // [15..20) — register 15 is in the entry but NOT in the write. Should not match. + var entry = K(unit: 1, start: 15, qty: 5); + var hits = CacheInvalidator.FindOverlapping([entry], unitId: 1, writeStart: 10, writeQty: 5).ToList(); + + hits.ShouldBeEmpty("adjacent-but-not-overlapping ranges must not invalidate (half-open semantics)"); + } + + [Fact] + public void NoOverlap_DoesNotInvalidate() + { + // Entry [100..110), write [200..210) — fully disjoint. + var entry = K(unit: 1, start: 100, qty: 10); + var hits = CacheInvalidator.FindOverlapping([entry], unitId: 1, writeStart: 200, writeQty: 10).ToList(); + + hits.ShouldBeEmpty("disjoint ranges must not invalidate"); + } + + [Fact] + public void DifferentUnitId_DoesNotInvalidate() + { + // Same address range, different unit ID — must not match. + var entry = K(unit: 1, start: 100, qty: 10); + var hits = CacheInvalidator.FindOverlapping([entry], unitId: 2, writeStart: 95, writeQty: 20).ToList(); + + hits.ShouldBeEmpty("writes on a different unit ID must not invalidate this entry"); + } + + // ── Auxiliary correctness checks ───────────────────────────────────────────── + + [Fact] + public void FcOtherThan03Or04_NeverInvalidated() + { + // Defensive: only FC03/FC04 entries are ever stored, but if a non-read key + // somehow appeared the invalidator must skip it. + var nonRead = new CacheKey(UnitId: 1, Fc: 0x06, StartAddress: 100, Qty: 10); + var hits = CacheInvalidator.FindOverlapping([nonRead], unitId: 1, writeStart: 95, writeQty: 20).ToList(); + + hits.ShouldBeEmpty("only FC03/FC04 entries should ever be invalidated"); + } + + [Fact] + public void ZeroWriteQty_NeverInvalidates() + { + var entry = K(unit: 1, start: 100, qty: 10); + var hits = CacheInvalidator.FindOverlapping([entry], unitId: 1, writeStart: 100, writeQty: 0).ToList(); + + hits.ShouldBeEmpty("a degenerate write covering zero registers must not invalidate anything"); + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/CacheKeyTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/CacheKeyTests.cs new file mode 100644 index 0000000..54ea69d --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/CacheKeyTests.cs @@ -0,0 +1,42 @@ +using Mbproxy.Proxy.Cache; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Cache; + +/// +/// Equality semantics for . The key must distinguish every dimension +/// the cache uses to route a hit — same dimensions as CoalescingKey but a separate +/// type so the two phases can evolve independently. +/// +[Trait("Category", "Unit")] +public sealed class CacheKeyTests +{ + [Fact] + public void Equality_IdenticalKeys_AreEqual() + { + var a = new CacheKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty: 4); + var b = new CacheKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty: 4); + + a.ShouldBe(b); + a.GetHashCode().ShouldBe(b.GetHashCode()); + } + + [Fact] + public void Equality_Fc03_vs_Fc04_AtSameAddress_DifferentKeys() + { + var fc03 = new CacheKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty: 1); + var fc04 = new CacheKey(UnitId: 1, Fc: 0x04, StartAddress: 100, Qty: 1); + + fc03.ShouldNotBe(fc04, "FC03 and FC04 read different Modbus tables"); + } + + [Fact] + public void Equality_DifferentUnitId_DifferentKeys() + { + var u1 = new CacheKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty: 1); + var u2 = new CacheKey(UnitId: 2, Fc: 0x03, StartAddress: 100, Qty: 1); + + u1.ShouldNotBe(u2, "different unit IDs never share cache entries"); + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/ResponseCacheE2ETests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/ResponseCacheE2ETests.cs new file mode 100644 index 0000000..88c438e --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/ResponseCacheE2ETests.cs @@ -0,0 +1,343 @@ +using System.Net; +using System.Net.Sockets; +using System.Text.Json; +using Mbproxy; +using Mbproxy.Options; +using Mbproxy.Proxy; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using NModbus; +using Serilog; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Cache; + +/// +/// End-to-end coverage of the Phase-11 response cache against the pymodbus DL205 +/// simulator. +/// +/// pymodbus 3.13 simulator quirk. Like Phase 9 and Phase 10, these tests +/// serialise reads in the simulator-backed cases. The Phase-11 cache's behavioural +/// guarantee (a TTL-bounded cache hit returns the cached value without backend traffic) +/// is independent of the simulator's known concurrent-MBAP-frame bug — sequential reads +/// keep the sim in single-PDU mode, which is its known-good envelope. +/// +/// The headline assertion lives here: 10 reads at 100 ms intervals with a 1 s TTL +/// must result in EXACTLY 1 backend round-trip. If this test fails, Phase 11 does not +/// ship — see 11-response-cache.md. +/// +[Collection(nameof(Mbproxy.Tests.Sim.DL205SimulatorCollection))] +[Trait("Category", "E2E")] +public sealed class ResponseCacheE2ETests +{ + private readonly Mbproxy.Tests.Sim.DL205SimulatorFixture _sim; + public ResponseCacheE2ETests(Mbproxy.Tests.Sim.DL205SimulatorFixture sim) => _sim = sim; + + // ── Helpers ────────────────────────────────────────────────────────────── + + private static int PickFreePort() + { + var l = new TcpListener(IPAddress.Loopback, 0); + l.Start(); + int p = ((IPEndPoint)l.LocalEndpoint).Port; + l.Stop(); + return p; + } + + private Dictionary MakeBaseConfig(int proxyPort) => new() + { + ["Mbproxy:AdminPort"] = "0", + [$"Mbproxy:Plcs:0:Name"] = "TestPLC", + [$"Mbproxy:Plcs:0:ListenPort"] = proxyPort.ToString(), + [$"Mbproxy:Plcs:0:Host"] = _sim.Host, + [$"Mbproxy:Plcs:0:Port"] = _sim.Port.ToString(), + ["Mbproxy:Connection:BackendConnectTimeoutMs"] = "3000", + ["Mbproxy:Connection:BackendRequestTimeoutMs"] = "3000", + }; + + private static IHost BuildBcdHost(Dictionary config) + { + var builder = Host.CreateApplicationBuilder(); + builder.Configuration.AddInMemoryCollection(config); + builder.Services.AddSerilog( + new LoggerConfiguration().MinimumLevel.Fatal().CreateLogger(), + dispose: false); + builder.AddMbproxyOptions(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddHostedService(sp => sp.GetRequiredService()); + + if (int.TryParse(config["Mbproxy:AdminPort"], out int admin) && admin > 0) + builder.AddMbproxyAdmin(); + return builder.Build(); + } + + private sealed class AsyncHostDispose : IAsyncDisposable + { + private readonly IHost _host; + public AsyncHostDispose(IHost host) => _host = host; + public async ValueTask DisposeAsync() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + try { await _host.StopAsync(cts.Token); } catch { } + _host.Dispose(); + } + } + + // ── Headline test: 10 reads at 100 ms intervals → exactly 1 backend round-trip ── + + /// + /// The "is the design pivot worth it?" test. Configure a BCD tag with CacheTtlMs = + /// 1000; issue 10 reads at 100 ms intervals through the proxy. The cache HitCount + /// must show 9 (one miss to prime, 9 hits to serve) and the backend trip count must + /// be exactly 1. + /// + [Fact(Timeout = 5_000)] + public async Task E2E_CacheHit_TenReadsIn1Sec_BackendSeesOneRoundTrip() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + int adminPort = PickFreePort(); + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:AdminPort"] = adminPort.ToString(); + config["Mbproxy:BcdTags:Global:0:Address"] = "1072"; + config["Mbproxy:BcdTags:Global:0:Width"] = "16"; + config["Mbproxy:BcdTags:Global:0:CacheTtlMs"] = "1000"; + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(300, TestContext.Current.CancellationToken); + + using (var client = new TcpClient()) + { + await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(client); + + // 10 reads at 100 ms intervals — total elapsed ~900 ms, well within the 1000 ms TTL. + for (int i = 0; i < 10; i++) + { + ushort[] regs = master.ReadHoldingRegisters(1, 1072, 1); + regs[0].ShouldBe((ushort)1234, $"read #{i}: BCD-decoded value must be 1234"); + if (i < 9) + await Task.Delay(100, TestContext.Current.CancellationToken); + } + } + + using var httpClient = new HttpClient(); + var resp = await httpClient.GetStringAsync( + $"http://127.0.0.1:{adminPort}/status.json", + TestContext.Current.CancellationToken); + + using var doc = JsonDocument.Parse(resp); + var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend"); + + // 9 cache hits (the first read populated the cache; the next 9 returned from cache). + // 1 cache miss (the priming read). + backend.GetProperty("cacheHitCount").GetInt64() + .ShouldBe(9, "10 reads with TTL=1000 ms at 100 ms intervals must produce 9 cache hits"); + backend.GetProperty("cacheMissCount").GetInt64() + .ShouldBe(1, "exactly the first read should miss"); + + // The backend-trip count is observable via coalescedMissCount (every read that + // makes it to the backend increments this counter; cache hits short-circuit). + backend.GetProperty("coalescedMissCount").GetInt64() + .ShouldBe(1, "exactly one read must reach the backend"); + } + + // ── Regression: cache disabled by default ──────────────────────────────── + + /// + /// Mandatory regression. With no cache config anywhere (default deployment shape), + /// behaviour must be byte-identical to Phase 10. Sequential reads through the same + /// client produce one backend round-trip each — no elision. + /// + [Fact(Timeout = 5_000)] + public async Task Cache_DisabledByDefault_BehaviourIs_ByteIdenticalTo_Phase10() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + int adminPort = PickFreePort(); + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:AdminPort"] = adminPort.ToString(); + // No Cache section, no CacheTtlMs on any tag — pure Phase-10 behaviour. + config["Mbproxy:BcdTags:Global:0:Address"] = "1072"; + config["Mbproxy:BcdTags:Global:0:Width"] = "16"; + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(300, TestContext.Current.CancellationToken); + + using (var client = new TcpClient()) + { + await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(client); + + for (int i = 0; i < 5; i++) + { + ushort[] regs = master.ReadHoldingRegisters(1, 1072, 1); + regs[0].ShouldBe((ushort)1234, $"read #{i} must still BCD-decode correctly"); + } + } + + using var httpClient = new HttpClient(); + var resp = await httpClient.GetStringAsync( + $"http://127.0.0.1:{adminPort}/status.json", + TestContext.Current.CancellationToken); + + using var doc = JsonDocument.Parse(resp); + var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend"); + + backend.GetProperty("cacheHitCount").GetInt64() + .ShouldBe(0, "no cache config: HitCount must remain at zero"); + backend.GetProperty("cacheMissCount").GetInt64() + .ShouldBe(0, "no cache config: MissCount must remain at zero — cache counters are tracked only for cache-eligible reads"); + backend.GetProperty("coalescedMissCount").GetInt64() + .ShouldBe(5, "every read must reach the backend as before — Phase-10 behaviour preserved"); + } + + // ── TTL expiry path ───────────────────────────────────────────────────── + + [Fact(Timeout = 5_000)] + public async Task E2E_CacheExpires_AfterTtl_NextReadHitsBackend() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + int adminPort = PickFreePort(); + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:AdminPort"] = adminPort.ToString(); + config["Mbproxy:BcdTags:Global:0:Address"] = "1072"; + config["Mbproxy:BcdTags:Global:0:Width"] = "16"; + config["Mbproxy:BcdTags:Global:0:CacheTtlMs"] = "200"; + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(300, TestContext.Current.CancellationToken); + + using (var client = new TcpClient()) + { + await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(client); + + _ = master.ReadHoldingRegisters(1, 1072, 1); // miss, populates cache + _ = master.ReadHoldingRegisters(1, 1072, 1); // hit + await Task.Delay(350, TestContext.Current.CancellationToken); // > TTL + _ = master.ReadHoldingRegisters(1, 1072, 1); // miss again + } + + using var httpClient = new HttpClient(); + var resp = await httpClient.GetStringAsync( + $"http://127.0.0.1:{adminPort}/status.json", + TestContext.Current.CancellationToken); + + using var doc = JsonDocument.Parse(resp); + var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend"); + + backend.GetProperty("cacheHitCount").GetInt64() + .ShouldBe(1, "exactly one read should land inside the TTL window"); + backend.GetProperty("cacheMissCount").GetInt64() + .ShouldBe(2, "two reads should miss (initial fill and post-expiry refill)"); + backend.GetProperty("coalescedMissCount").GetInt64() + .ShouldBe(2, "the two cache misses must each produce a backend round-trip"); + } + + // ── Write invalidation ─────────────────────────────────────────────────── + + /// + /// Uses a register OUTSIDE the simulator's seeded BCD range so subsequent tests' + /// reads of register 1072 are not polluted by this test's write. The simulator's + /// holding-register table is shared across tests in the collection. + /// + [Fact(Timeout = 5_000)] + public async Task E2E_WriteInvalidatesOverlappingCacheEntries() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + // Use a scratch register (200) from the simulator's allowed-write range so the + // FC06 write does not fault on the simulator side, but a register that no other + // test reads with BCD decoding — its initial value 0 round-trips through the + // BCD codec without surprises, and any post-write state stays contained. + const ushort isolatedRegister = 200; + int proxyPort = PickFreePort(); + int adminPort = PickFreePort(); + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:AdminPort"] = adminPort.ToString(); + config["Mbproxy:BcdTags:Global:0:Address"] = isolatedRegister.ToString(); + config["Mbproxy:BcdTags:Global:0:Width"] = "16"; + config["Mbproxy:BcdTags:Global:0:CacheTtlMs"] = "5000"; + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(300, TestContext.Current.CancellationToken); + + using (var client = new TcpClient()) + { + await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(client); + + _ = master.ReadHoldingRegisters(1, isolatedRegister, 1); // miss → cached + _ = master.ReadHoldingRegisters(1, isolatedRegister, 1); // hit + master.WriteSingleRegister(1, isolatedRegister, 4321); // invalidates the cached entry + _ = master.ReadHoldingRegisters(1, isolatedRegister, 1); // must miss again + } + + using var httpClient = new HttpClient(); + var resp = await httpClient.GetStringAsync( + $"http://127.0.0.1:{adminPort}/status.json", + TestContext.Current.CancellationToken); + + using var doc = JsonDocument.Parse(resp); + var backend = doc.RootElement.GetProperty("plcs")[0].GetProperty("backend"); + + backend.GetProperty("cacheHitCount").GetInt64() + .ShouldBe(1, "the second read should hit the cache"); + backend.GetProperty("cacheMissCount").GetInt64() + .ShouldBe(2, "first read primes the cache; third read misses because the write invalidated the entry"); + backend.GetProperty("cacheInvalidations").GetInt64() + .ShouldBe(1, "the FC06 write must invalidate exactly one cache entry"); + } + + // ── BCD-decoded bytes are cached ───────────────────────────────────────── + + [Fact(Timeout = 5_000)] + public async Task E2E_BcdDecodedBytesAreCached_NotRawBcd() + { + if (_sim.SkipReason is not null) Assert.Skip(_sim.SkipReason); + + int proxyPort = PickFreePort(); + var config = MakeBaseConfig(proxyPort); + config["Mbproxy:BcdTags:Global:0:Address"] = "1072"; + config["Mbproxy:BcdTags:Global:0:Width"] = "16"; + config["Mbproxy:BcdTags:Global:0:CacheTtlMs"] = "5000"; + + var host = BuildBcdHost(config); + using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + await host.StartAsync(startCts.Token); + await using var hd = new AsyncHostDispose(host); + await Task.Delay(300, TestContext.Current.CancellationToken); + + using var client = new TcpClient(); + await client.ConnectAsync("127.0.0.1", proxyPort, TestContext.Current.CancellationToken); + var master = new ModbusFactory().CreateMaster(client); + + ushort[] r1 = master.ReadHoldingRegisters(1, 1072, 1); + ushort[] r2 = master.ReadHoldingRegisters(1, 1072, 1); + ushort[] r3 = master.ReadHoldingRegisters(1, 1072, 1); + + r1[0].ShouldBe((ushort)1234, "first read must be BCD-decoded"); + r2[0].ShouldBe((ushort)1234, "second read (cache hit) must return decoded 1234, not raw BCD 0x1234"); + r3[0].ShouldBe((ushort)1234, "third read (cache hit) must return decoded 1234"); + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/ResponseCacheMultiplexerTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/ResponseCacheMultiplexerTests.cs new file mode 100644 index 0000000..c9530eb --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/ResponseCacheMultiplexerTests.cs @@ -0,0 +1,544 @@ +using System.Collections.Concurrent; +using System.Collections.Frozen; +using System.Net; +using System.Net.Sockets; +using Mbproxy.Bcd; +using Mbproxy.Options; +using Mbproxy.Proxy; +using Mbproxy.Proxy.Cache; +using Mbproxy.Proxy.Multiplexing; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Cache; + +/// +/// Phase-11 cache wiring inside the multiplexer, exercised against a stub backend with +/// deterministic response timing. Stub-backend tests are the "is the cache wired correctly" +/// proof — they cover behaviour the simulator-backed E2E suite cannot exercise reliably +/// (true concurrent reads through the cache; cross-PLC isolation). +/// +[Trait("Category", "Unit")] +public sealed class ResponseCacheMultiplexerTests +{ + // ── Frame helpers ───────────────────────────────────────────────────────────── + + private static int PickFreePort() + { + var l = new TcpListener(IPAddress.Loopback, 0); + l.Start(); + int port = ((IPEndPoint)l.LocalEndpoint).Port; + l.Stop(); + return port; + } + + private static async Task ReadExactAsync(Socket s, int count, CancellationToken ct) + { + var buf = new byte[count]; + int read = 0; + while (read < count) + { + int n = await s.ReceiveAsync(buf.AsMemory(read, count - read), SocketFlags.None, ct); + if (n == 0) throw new IOException("EOF"); + read += n; + } + return buf; + } + + private static async Task ReadOneFrameAsync(Socket s, CancellationToken ct) + { + var header = await ReadExactAsync(s, 7, ct); + ushort length = (ushort)((header[4] << 8) | header[5]); + int bodyLen = length - 1; + var body = bodyLen > 0 ? await ReadExactAsync(s, bodyLen, ct) : Array.Empty(); + var frame = new byte[7 + bodyLen]; + Buffer.BlockCopy(header, 0, frame, 0, 7); + if (bodyLen > 0) Buffer.BlockCopy(body, 0, frame, 7, bodyLen); + return frame; + } + + private static byte[] BuildFc03(ushort txId, ushort start, ushort qty, byte unit = 1) + => [ + (byte)(txId >> 8), (byte)(txId & 0xFF), + 0x00, 0x00, + 0x00, 0x06, + unit, 0x03, + (byte)(start >> 8), (byte)(start & 0xFF), + (byte)(qty >> 8), (byte)(qty & 0xFF), + ]; + + private static byte[] BuildFc06(ushort txId, ushort addr, ushort value, byte unit = 1) + => [ + (byte)(txId >> 8), (byte)(txId & 0xFF), + 0x00, 0x00, + 0x00, 0x06, + unit, 0x06, + (byte)(addr >> 8), (byte)(addr & 0xFF), + (byte)(value >> 8), (byte)(value & 0xFF), + ]; + + /// + /// Stub backend that responds immediately with a configurable register value. Records + /// every backend request it receives so the test can count round-trips. + /// + private sealed class StubBackend : IAsyncDisposable + { + public int Port { get; } + public int RequestCount => _requestCount; + public ushort RegisterValue { get; set; } = 0x1234; + + private readonly TcpListener _listener; + private readonly CancellationTokenSource _cts = new(); + private readonly List _tasks = new(); + private int _requestCount; + + public StubBackend(int port) + { + Port = port; + _listener = new TcpListener(IPAddress.Loopback, port); + _listener.Start(); + _ = AcceptLoop(); + } + + private async Task AcceptLoop() + { + try + { + while (!_cts.IsCancellationRequested) + { + var s = await _listener.AcceptSocketAsync(_cts.Token); + var t = Task.Run(() => HandleAsync(s)); + lock (_tasks) _tasks.Add(t); + } + } + catch { } + } + + private async Task HandleAsync(Socket s) + { + try + { + while (!_cts.IsCancellationRequested) + { + var req = await ReadOneFrameAsync(s, _cts.Token); + if (req.Length < 8) break; + Interlocked.Increment(ref _requestCount); + + ushort txId = (ushort)((req[0] << 8) | req[1]); + byte unit = req[6]; + byte fc = req[7]; + + byte[] response; + if (fc == 0x03 || fc == 0x04) + { + ushort qty = (ushort)((req[10] << 8) | req[11]); + int byteCount = qty * 2; + response = new byte[7 + 2 + byteCount]; + response[0] = (byte)(txId >> 8); + response[1] = (byte)(txId & 0xFF); + response[2] = 0; response[3] = 0; + ushort len = (ushort)(1 + 2 + byteCount); + response[4] = (byte)(len >> 8); + response[5] = (byte)(len & 0xFF); + response[6] = unit; + response[7] = fc; + response[8] = (byte)byteCount; + for (int i = 0; i < qty; i++) + { + response[9 + i * 2] = (byte)(RegisterValue >> 8); + response[9 + i * 2 + 1] = (byte)(RegisterValue & 0xFF); + } + } + else if (fc == 0x06) + { + ushort addr = (ushort)((req[8] << 8) | req[9]); + ushort val = (ushort)((req[10] << 8) | req[11]); + response = new byte[12]; + response[0] = (byte)(txId >> 8); + response[1] = (byte)(txId & 0xFF); + response[2] = 0; response[3] = 0; + response[4] = 0; response[5] = 6; + response[6] = unit; response[7] = 0x06; + response[8] = (byte)(addr >> 8); response[9] = (byte)(addr & 0xFF); + response[10] = (byte)(val >> 8); response[11] = (byte)(val & 0xFF); + } + else { break; } + + await s.SendAsync(response, SocketFlags.None, _cts.Token); + } + } + catch { } + finally { try { s.Dispose(); } catch { } } + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + try { _listener.Stop(); } catch { } + Task[] snap; + lock (_tasks) snap = _tasks.ToArray(); + try { await Task.WhenAll(snap).WaitAsync(TimeSpan.FromSeconds(2)); } catch { } + _cts.Dispose(); + } + } + + private static PerPlcContext MakeContext(string name, ResponseCache? cache, params BcdTag[] tags) + { + var frozen = tags.ToDictionary(t => t.Address).ToFrozenDictionary(); + var map = frozen.Count > 0 ? new BcdTagMap(frozen) : BcdTagMap.Empty; + return new PerPlcContext + { + PlcName = name, + TagMap = map, + Counters = new ProxyCounters(), + Logger = NullLogger.Instance, + Cache = cache, + }; + } + + private static PlcMultiplexer BuildMux(PlcOptions plc, PerPlcContext ctx, bool coalescingEnabled = true) + { + return new PlcMultiplexer( + plc, new ConnectionOptions(), + new BcdPduPipeline(), + ctx, + NullLogger.Instance, + backendConnectPipeline: null, + coalescingOptions: () => new ReadCoalescingOptions { Enabled = coalescingEnabled, MaxParties = 32 }); + } + + private static async Task<(Socket client, UpstreamPipe pipe, TcpListener proxyListener)> + ConnectClientAsync(PlcMultiplexer mux, string plcName) + { + int proxyPort = PickFreePort(); + var proxyListener = new TcpListener(IPAddress.Loopback, proxyPort); + proxyListener.Start(); + + var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp) + { NoDelay = true }; + await client.ConnectAsync(IPAddress.Loopback, proxyPort); + var upstream = await proxyListener.AcceptSocketAsync(); + var pipe = new UpstreamPipe(upstream, plcName, NullLogger.Instance); + _ = Task.Run(() => mux.StartPipeAsync(pipe, CancellationToken.None)); + return (client, pipe, proxyListener); + } + + // ── Tests ──────────────────────────────────────────────────────────────────── + + [Fact] + public async Task SecondRead_OfSameKey_WithinTtl_HitsCache_NoSecondBackendRoundTrip() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + using var cache = new ResponseCache(maxEntriesPerPlc: 64, evictionIntervalMs: 5000); + // 16-bit BCD tag at address 100 with 5 s TTL. + var ctx = MakeContext("PLC1", cache, BcdTag.Create(100, 16, cacheTtlMs: 5000)); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, ctx); + + var (c, p, l) = await ConnectClientAsync(mux, plc.Name); + try + { + // First read — miss, hits backend. + await c.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None); + var r1 = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + ((ushort)((r1[0] << 8) | r1[1])).ShouldBe((ushort)0x0001); + + // Second read same key — hit, no second round-trip. + await c.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None); + var r2 = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + ((ushort)((r2[0] << 8) | r2[1])).ShouldBe((ushort)0x0002, + "the cache hit must restore the requesting client's original TxId"); + + backend.RequestCount.ShouldBe(1, "the second read must be served from the cache"); + var snap = ctx.Counters.Snapshot(); + snap.CacheHitCount.ShouldBe(1); + snap.CacheMissCount.ShouldBe(1); + } + finally + { + c.Dispose(); + await p.DisposeAsync(); + l.Stop(); + } + } + + [Fact] + public async Task BcdDecodedBytes_AreCached_NotRawBcd() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort) { RegisterValue = 0x1234 }; // raw BCD nibbles + + using var cache = new ResponseCache(maxEntriesPerPlc: 64, evictionIntervalMs: 5000); + var ctx = MakeContext("PLC1", cache, BcdTag.Create(100, 16, cacheTtlMs: 5000)); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, ctx); + + var (c, p, l) = await ConnectClientAsync(mux, plc.Name); + try + { + await c.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None); + var r1 = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + // Response: [..mbap..][0x03][byteCount=2][hi][lo] + ushort decoded1 = (ushort)((r1[9] << 8) | r1[10]); + decoded1.ShouldBe((ushort)1234, "first read must be BCD-decoded by the rewriter"); + + // Now read again — must be served from cache and still show 1234 (not 0x1234). + await c.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None); + var r2 = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + ushort decoded2 = (ushort)((r2[9] << 8) | r2[10]); + decoded2.ShouldBe((ushort)1234, + "cache must store POST-rewriter bytes — hits must not re-decode and must not return raw BCD"); + backend.RequestCount.ShouldBe(1, "the second read must be served from the cache"); + } + finally + { + c.Dispose(); + await p.DisposeAsync(); + l.Stop(); + } + } + + [Fact] + public async Task CacheHit_ShortCircuits_Coalescing() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + using var cache = new ResponseCache(maxEntriesPerPlc: 64, evictionIntervalMs: 5000); + var ctx = MakeContext("PLC1", cache, BcdTag.Create(100, 16, cacheTtlMs: 5000)); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, ctx, coalescingEnabled: true); + + var (c, p, l) = await ConnectClientAsync(mux, plc.Name); + try + { + // Prime the cache. + await c.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + + // Subsequent read must hit cache; coalescing miss-counter must NOT increment + // (cache short-circuited before the coalescing path). + long missBefore = ctx.Counters.Snapshot().CoalescedMissCount; + long hitBefore = ctx.Counters.Snapshot().CoalescedHitCount; + + await c.SendAsync(BuildFc03(0x0002, 100, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + + var snap = ctx.Counters.Snapshot(); + snap.CacheHitCount.ShouldBe(1, "second read must hit cache"); + (snap.CoalescedMissCount - missBefore).ShouldBe(0, + "cache hit must short-circuit the coalescing path entirely — no Miss recorded"); + (snap.CoalescedHitCount - hitBefore).ShouldBe(0, + "cache hit must short-circuit the coalescing path entirely — no Hit recorded"); + } + finally + { + c.Dispose(); + await p.DisposeAsync(); + l.Stop(); + } + } + + [Fact] + public async Task Fc06Write_InvalidatesOverlappingCachedRead() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + using var cache = new ResponseCache(maxEntriesPerPlc: 64, evictionIntervalMs: 5000); + var ctx = MakeContext("PLC1", cache, BcdTag.Create(100, 16, cacheTtlMs: 5000)); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, ctx); + + var (c, p, l) = await ConnectClientAsync(mux, plc.Name); + try + { + // Cache the read. + await c.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + cache.Count.ShouldBe(1, "first read must populate the cache"); + + // Write to address 100 — must invalidate the cached read. + await c.SendAsync(BuildFc06(0x0002, 100, 1234), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + + // Cache must be empty now. + cache.Count.ShouldBe(0, "write to a cached address must invalidate the entry"); + ctx.Counters.Snapshot().CacheInvalidations.ShouldBe(1); + + // A subsequent read must miss the cache. + await c.SendAsync(BuildFc03(0x0003, 100, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + backend.RequestCount.ShouldBe(3, "two reads (one miss, one post-invalidate) + one write = 3 backend round-trips"); + } + finally + { + c.Dispose(); + await p.DisposeAsync(); + l.Stop(); + } + } + + [Fact] + public async Task NonOverlappingWrite_DoesNotInvalidate() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + using var cache = new ResponseCache(maxEntriesPerPlc: 64, evictionIntervalMs: 5000); + var ctx = MakeContext("PLC1", cache, + BcdTag.Create(100, 16, cacheTtlMs: 5000), + BcdTag.Create(200, 16, cacheTtlMs: 5000)); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, ctx); + + var (c, p, l) = await ConnectClientAsync(mux, plc.Name); + try + { + // Cache the read at 100. + await c.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + + // Write to address 200 — distinct register; the cached [100..101) must remain. + await c.SendAsync(BuildFc06(0x0002, 200, 7), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + + cache.Count.ShouldBe(1, "a disjoint write must not invalidate the cached read"); + + // Second read on 100 must hit cache. + await c.SendAsync(BuildFc03(0x0003, 100, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + ctx.Counters.Snapshot().CacheHitCount.ShouldBe(1); + backend.RequestCount.ShouldBe(2, "first read + write — second read served from cache"); + } + finally + { + c.Dispose(); + await p.DisposeAsync(); + l.Stop(); + } + } + + [Fact] + public async Task MultiTagRange_AnyZeroTtl_DisablesCachingForWholeRead() + { + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + using var cache = new ResponseCache(maxEntriesPerPlc: 64, evictionIntervalMs: 5000); + // Two tags in the read range [100..102): tag 100 has a TTL, tag 101 does not. + var ctx = MakeContext("PLC1", cache, + BcdTag.Create(100, 16, cacheTtlMs: 1000), + BcdTag.Create(101, 16, cacheTtlMs: 0)); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, ctx); + + var (c, p, l) = await ConnectClientAsync(mux, plc.Name); + try + { + // Two identical reads — both should hit the backend because tag 101 disables + // caching for the whole [100..102) range. + await c.SendAsync(BuildFc03(0x0001, 100, 2), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + + await c.SendAsync(BuildFc03(0x0002, 100, 2), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + + backend.RequestCount.ShouldBe(2, + "any TTL=0 in a multi-tag range must disable caching for the whole read"); + ctx.Counters.Snapshot().CacheHitCount.ShouldBe(0); + ctx.Counters.Snapshot().CacheMissCount.ShouldBe(0, + "reads with effective TTL = 0 must not increment either cache counter"); + } + finally + { + c.Dispose(); + await p.DisposeAsync(); + l.Stop(); + } + } + + [Fact] + public async Task UncachedReads_BehaveIdentically_ToPhase10() + { + // Regression guard: PerPlcContext with Cache = null must behave byte-identically + // to Phase 10 — every FC03 read produces a backend round-trip (coalescing aside). + int backendPort = PickFreePort(); + await using var backend = new StubBackend(backendPort); + + // No cache on the context — Cache = null. + var ctx = MakeContext("PLC1", cache: null); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = backendPort }; + await using var mux = BuildMux(plc, ctx); + + var (c, p, l) = await ConnectClientAsync(mux, plc.Name); + try + { + // Three sequential identical reads — each hits the backend (no coalescing + // window with sequential reads, no cache wired). + for (int i = 0; i < 3; i++) + { + await c.SendAsync(BuildFc03((ushort)(i + 1), 100, 1), SocketFlags.None); + _ = await ReadOneFrameAsync(c, TestContext.Current.CancellationToken); + } + + backend.RequestCount.ShouldBe(3, + "without a cache, every read must hit the backend (Phase-10 behaviour)"); + var snap = ctx.Counters.Snapshot(); + snap.CacheHitCount.ShouldBe(0); + snap.CacheMissCount.ShouldBe(0, + "cache counters must remain at zero when no cache is wired"); + snap.CoalescedMissCount.ShouldBe(3, + "every FC03 read must increment CoalescedMissCount per the Phase-10 contract"); + } + finally + { + c.Dispose(); + await p.DisposeAsync(); + l.Stop(); + } + } + + [Fact] + public async Task FailedBackendConnect_OnFirstRead_DoesNotPreventLaterCacheHits_IfCachePrePopulated() + { + // Edge case from the design contract: a cache hit short-circuits backend + // connection establishment. We pre-populate the cache by direct Set, then probe a + // cache hit while the backend is unreachable. + int unreachable = PickFreePort(); // listener never started on this port + + using var cache = new ResponseCache(maxEntriesPerPlc: 16, evictionIntervalMs: 5000); + var ctx = MakeContext("PLC1", cache, BcdTag.Create(100, 16, cacheTtlMs: 60_000)); + var plc = new PlcOptions { Name = "PLC1", ListenPort = 0, Host = "127.0.0.1", Port = unreachable }; + + // Pre-populate the cache with a synthesised response PDU body. This is what the + // backend reader would have stored after BCD-decoding. + var key = new CacheKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty: 1); + var now = DateTimeOffset.UtcNow; + byte[] cachedPdu = [0x03, 0x02, 0x04, 0xD2]; // FC=03, byteCount=2, regValue=0x04D2 (decimal 1234) + cache.Set(key, new CacheEntry(cachedPdu, now, now.AddSeconds(60), cachedPdu.Length, 0)); + + await using var mux = BuildMux(plc, ctx); + + var (c, p, l) = await ConnectClientAsync(mux, plc.Name); + try + { + // The cache check runs BEFORE EnsureBackendConnectedAsync, so we should get a + // response even though the backend is unreachable. + using var deadline = new CancellationTokenSource(TimeSpan.FromMilliseconds(800)); + await c.SendAsync(BuildFc03(0x0001, 100, 1), SocketFlags.None, deadline.Token); + var r1 = await ReadOneFrameAsync(c, deadline.Token); + ((ushort)((r1[0] << 8) | r1[1])).ShouldBe((ushort)0x0001, + "cache hits must serve even when the backend is unreachable"); + } + finally + { + c.Dispose(); + await p.DisposeAsync(); + l.Stop(); + } + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/ResponseCacheTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/ResponseCacheTests.cs new file mode 100644 index 0000000..b4ebdf2 --- /dev/null +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Cache/ResponseCacheTests.cs @@ -0,0 +1,220 @@ +using Mbproxy.Proxy.Cache; +using Shouldly; +using Xunit; + +namespace Mbproxy.Tests.Proxy.Cache; + +/// +/// Phase-11 unit tests for . Cover the load-bearing +/// behaviours: set/get round-trip, TTL expiry, write-range invalidation, LRU bounds, LRU +/// access ordering, concurrent safety, and disposal semantics. +/// +[Trait("Category", "Unit")] +public sealed class ResponseCacheTests +{ + private static CacheEntry MakeEntry(int ttlMs, byte[]? bytes = null) + { + var now = DateTimeOffset.UtcNow; + bytes ??= [0x03, 0x02, 0x04, 0xD2]; + return new CacheEntry( + PduBytes: bytes, + CachedAtUtc: now, + ExpiresAtUtc: now.AddMilliseconds(ttlMs), + Length: bytes.Length, + LastUsedTick: 0); + } + + [Fact] + public void SetThenGet_RoundTrips() + { + using var cache = new ResponseCache(maxEntriesPerPlc: 16, evictionIntervalMs: 5000); + var key = new CacheKey(1, 0x03, 100, 1); + var entry = MakeEntry(ttlMs: 5000); + + cache.Set(key, entry); + cache.TryGet(key, out var got).ShouldBeTrue(); + got.PduBytes.ShouldBe(entry.PduBytes); + cache.Count.ShouldBe(1); + } + + [Fact] + public async Task GetExpiredEntry_ReturnsFalse_AndRemoves() + { + using var cache = new ResponseCache(maxEntriesPerPlc: 16, evictionIntervalMs: 5000); + var key = new CacheKey(1, 0x03, 100, 1); + + // 50 ms TTL; sleep past it and read. + cache.Set(key, MakeEntry(ttlMs: 50)); + cache.Count.ShouldBe(1); + + await Task.Delay(120, TestContext.Current.CancellationToken); + + cache.TryGet(key, out _).ShouldBeFalse("expired entries must report miss"); + cache.Count.ShouldBe(0, "TryGet on an expired entry must remove it lazily"); + } + + [Fact] + public void Invalidate_OverlappingRange_RemovesMatching() + { + using var cache = new ResponseCache(maxEntriesPerPlc: 16, evictionIntervalMs: 5000); + + // Three entries: two overlap a write [105..115), one does not. + var overlapA = new CacheKey(1, 0x03, 100, 10); // [100..110) — overlaps low + var overlapB = new CacheKey(1, 0x03, 110, 10); // [110..120) — overlaps high + var disjoint = new CacheKey(1, 0x03, 200, 10); // [200..210) — disjoint + + cache.Set(overlapA, MakeEntry(ttlMs: 5000)); + cache.Set(overlapB, MakeEntry(ttlMs: 5000)); + cache.Set(disjoint, MakeEntry(ttlMs: 5000)); + cache.Count.ShouldBe(3); + + int invalidated = cache.Invalidate(unitId: 1, startAddress: 105, qty: 10); + + invalidated.ShouldBe(2, "the two overlapping entries must be invalidated"); + cache.Count.ShouldBe(1, "the disjoint entry must remain"); + cache.TryGet(disjoint, out _).ShouldBeTrue("the disjoint entry must still be retrievable"); + } + + [Fact] + public void Invalidate_DifferentUnitId_DoesNotTouch() + { + using var cache = new ResponseCache(maxEntriesPerPlc: 16, evictionIntervalMs: 5000); + var key = new CacheKey(UnitId: 1, Fc: 0x03, StartAddress: 100, Qty: 10); + cache.Set(key, MakeEntry(ttlMs: 5000)); + + int invalidated = cache.Invalidate(unitId: 2, startAddress: 100, qty: 10); + + invalidated.ShouldBe(0, "writes on a different unit ID must not invalidate this entry"); + cache.Count.ShouldBe(1); + } + + [Fact] + public void Set_AtMaxEntries_EvictsLRU() + { + using var cache = new ResponseCache(maxEntriesPerPlc: 3, evictionIntervalMs: 5000); + + // Insert 3 entries at distinct keys. + var k1 = new CacheKey(1, 0x03, 100, 1); + var k2 = new CacheKey(1, 0x03, 200, 1); + var k3 = new CacheKey(1, 0x03, 300, 1); + cache.Set(k1, MakeEntry(5000)); + cache.Set(k2, MakeEntry(5000)); + cache.Set(k3, MakeEntry(5000)); + cache.Count.ShouldBe(3); + + // 4th insert must evict the LRU — which is k1 (the earliest insert without a hit). + var k4 = new CacheKey(1, 0x03, 400, 1); + cache.Set(k4, MakeEntry(5000)); + + cache.Count.ShouldBe(3, "cap held at 3"); + cache.TryGet(k1, out _).ShouldBeFalse("k1 was the LRU and must have been evicted"); + cache.TryGet(k4, out _).ShouldBeTrue(); + } + + [Fact] + public void LRU_TracksAccessOrder_AcrossGetAndSet() + { + using var cache = new ResponseCache(maxEntriesPerPlc: 3, evictionIntervalMs: 5000); + + var k1 = new CacheKey(1, 0x03, 100, 1); + var k2 = new CacheKey(1, 0x03, 200, 1); + var k3 = new CacheKey(1, 0x03, 300, 1); + cache.Set(k1, MakeEntry(5000)); + cache.Set(k2, MakeEntry(5000)); + cache.Set(k3, MakeEntry(5000)); + + // Touch k1 — it becomes the most-recently-used. + cache.TryGet(k1, out _).ShouldBeTrue(); + + // The LRU should now be k2 (k3 is fresher than k2 by insertion; k1 was just touched). + var k4 = new CacheKey(1, 0x03, 400, 1); + cache.Set(k4, MakeEntry(5000)); + + cache.TryGet(k1, out _).ShouldBeTrue("k1 was just touched — must survive"); + cache.TryGet(k2, out _).ShouldBeFalse("k2 was the LRU and must have been evicted"); + cache.TryGet(k3, out _).ShouldBeTrue(); + cache.TryGet(k4, out _).ShouldBeTrue(); + } + + [Fact] + public async Task Concurrent_GetSet_NoDataRace() + { + using var cache = new ResponseCache(maxEntriesPerPlc: 256, evictionIntervalMs: 5000); + + // 8 tasks, 500 ops each — overlapping reads and writes on the same key space. + // Verifies the cache survives concurrency without exceptions and remains coherent. + const int Tasks = 8; + const int Ops = 500; + + var ct = TestContext.Current.CancellationToken; + long opsCompleted = 0; + + await Task.WhenAll(Enumerable.Range(0, Tasks).Select(t => Task.Run(() => + { + for (int i = 0; i < Ops; i++) + { + if (ct.IsCancellationRequested) return; + var key = new CacheKey(1, 0x03, (ushort)(i & 0xFF), 1); + if ((i & 1) == 0) + cache.Set(key, MakeEntry(2000)); + else + cache.TryGet(key, out _); + Interlocked.Increment(ref opsCompleted); + } + }, ct))); + + opsCompleted.ShouldBe((long)(Tasks * Ops), "every concurrent op must complete without exception"); + cache.Count.ShouldBeLessThanOrEqualTo(256, "cap must never be exceeded under concurrent insertion"); + } + + [Fact] + public async Task Dispose_StopsEvictionLoop_AndDoesNotThrowOnSubsequentCalls() + { + var cache = new ResponseCache(maxEntriesPerPlc: 16, evictionIntervalMs: 100); + + cache.Set(new CacheKey(1, 0x03, 100, 1), MakeEntry(ttlMs: 50)); + await Task.Delay(80, TestContext.Current.CancellationToken); + + cache.Dispose(); + cache.Dispose(); // idempotent + + // After dispose, no exception on a synchronous probe — but operations are + // best-effort; we don't promise correct results post-dispose. The contract is: + // disposal must not corrupt state or leak the eviction task. + } + + [Fact] + public void Clear_DropsAllEntries_AndReturnsCount() + { + using var cache = new ResponseCache(maxEntriesPerPlc: 16, evictionIntervalMs: 5000); + cache.Set(new CacheKey(1, 0x03, 100, 1), MakeEntry(5000)); + cache.Set(new CacheKey(1, 0x03, 200, 1), MakeEntry(5000)); + + int dropped = cache.Clear(); + + dropped.ShouldBe(2); + cache.Count.ShouldBe(0); + cache.ApproximateBytes.ShouldBe(0); + } + + [Fact] + public void ApproximateBytes_TracksSetReplaceAndInvalidate() + { + using var cache = new ResponseCache(maxEntriesPerPlc: 16, evictionIntervalMs: 5000); + + var k1 = new CacheKey(1, 0x03, 100, 1); + var k2 = new CacheKey(1, 0x03, 200, 1); + + cache.Set(k1, MakeEntry(5000, bytes: new byte[10])); + cache.Set(k2, MakeEntry(5000, bytes: new byte[20])); + cache.ApproximateBytes.ShouldBe(30L); + + // Replace k1 with a bigger entry. + cache.Set(k1, MakeEntry(5000, bytes: new byte[15])); + cache.ApproximateBytes.ShouldBe(35L); + + // Invalidate k1. + cache.Invalidate(unitId: 1, startAddress: 100, qty: 1).ShouldBe(1); + cache.ApproximateBytes.ShouldBe(20L, "approx-bytes must decrease on invalidate"); + } +} diff --git a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs index 4f7389f..ca4d18d 100644 --- a/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs +++ b/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/PlcMultiplexerTests.cs @@ -464,7 +464,20 @@ public sealed class PlcMultiplexerTests sw.Stop(); sw.ElapsedMilliseconds.ShouldBeLessThan(2000, "cascade should propagate quickly"); - ctx.Counters.Snapshot().BackendDisconnectCascades.ShouldBeGreaterThanOrEqualTo(3); + // Poll briefly for the cascade counter — there is an inherent scheduling gap + // between "upstream socket EOF observed" (WaitForCloseAsync returns) and "the + // multiplexer's TearDownBackendAsync increments the counter after awaiting + // every pipe.DisposeAsync". This poll absorbs that scheduling jitter without + // weakening the assertion's semantics — the counter MUST reach 3 (or more) + // because all three upstream pipes were attached when the cascade fired. + long cascades = 0; + for (int i = 0; i < 50; i++) + { + cascades = ctx.Counters.Snapshot().BackendDisconnectCascades; + if (cascades >= 3) break; + await Task.Delay(20, TestContext.Current.CancellationToken); + } + cascades.ShouldBeGreaterThanOrEqualTo(3); } finally {