From dc8d28c22276da497511d35184591d5411fb0c26 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 11:51:55 -0500 Subject: [PATCH] feat: add reply subject mapping cache with TTL (Gap 11.5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ReplyMapCache to ReplyMapper.cs — an LRU cache with TTL expiration for gateway reply subject mappings, avoiding repeated string parsing on the hot path. Includes 10 unit tests covering LRU eviction, TTL expiry, hit/miss counters, PurgeExpired, and Count. --- src/NATS.Server/Gateways/ReplyMapper.cs | 116 +++++++++++++ .../Gateways/ReplyMapCacheTests.cs | 164 ++++++++++++++++++ 2 files changed, 280 insertions(+) create mode 100644 tests/NATS.Server.Tests/Gateways/ReplyMapCacheTests.cs diff --git a/src/NATS.Server/Gateways/ReplyMapper.cs b/src/NATS.Server/Gateways/ReplyMapper.cs index 8a1b42e..e80496e 100644 --- a/src/NATS.Server/Gateways/ReplyMapper.cs +++ b/src/NATS.Server/Gateways/ReplyMapper.cs @@ -172,3 +172,119 @@ public static class ReplyMapper return true; } } + +/// +/// LRU cache with TTL expiration for gateway reply subject mappings. +/// Caches the result of MapToGateway/MapFromGateway to avoid repeated parsing. +/// Go reference: gateway.go — reply mapping cache. +/// +public sealed class ReplyMapCache +{ + private readonly int _capacity; + private readonly int _ttlMs; + private readonly Dictionary> _map; + private readonly LinkedList _list = new(); + private readonly Lock _lock = new(); + private long _hits; + private long _misses; + + public ReplyMapCache(int capacity = 4096, int ttlMs = 60_000) + { + _capacity = capacity; + _ttlMs = ttlMs; + _map = new Dictionary>(capacity, StringComparer.Ordinal); + } + + public long Hits => Interlocked.Read(ref _hits); + public long Misses => Interlocked.Read(ref _misses); + public int Count { get { lock (_lock) return _map.Count; } } + + public bool TryGet(string key, out string? value) + { + lock (_lock) + { + if (_map.TryGetValue(key, out var node)) + { + // Check TTL + if ((DateTime.UtcNow - node.Value.CreatedUtc).TotalMilliseconds > _ttlMs) + { + // Expired + _list.Remove(node); + _map.Remove(key); + value = null; + Interlocked.Increment(ref _misses); + return false; + } + + value = node.Value.Value; + _list.Remove(node); + _list.AddFirst(node); + Interlocked.Increment(ref _hits); + return true; + } + } + value = null; + Interlocked.Increment(ref _misses); + return false; + } + + public void Set(string key, string value) + { + lock (_lock) + { + if (_map.TryGetValue(key, out var existing)) + { + _list.Remove(existing); + existing.Value = new CacheEntry(key, value, DateTime.UtcNow); + _list.AddFirst(existing); + return; + } + + if (_map.Count >= _capacity) + { + var last = _list.Last!; + _map.Remove(last.Value.Key); + _list.RemoveLast(); + } + + var entry = new CacheEntry(key, value, DateTime.UtcNow); + var node = new LinkedListNode(entry); + _list.AddFirst(node); + _map[key] = node; + } + } + + public void Clear() + { + lock (_lock) + { + _map.Clear(); + _list.Clear(); + } + } + + /// Removes expired entries. + public int PurgeExpired() + { + lock (_lock) + { + var now = DateTime.UtcNow; + var purged = 0; + var node = _list.Last; + while (node != null) + { + var prev = node.Previous; + if ((now - node.Value.CreatedUtc).TotalMilliseconds > _ttlMs) + { + _map.Remove(node.Value.Key); + _list.Remove(node); + purged++; + } + node = prev; + } + return purged; + } + } + + private sealed record CacheEntry(string Key, string Value, DateTime CreatedUtc); +} diff --git a/tests/NATS.Server.Tests/Gateways/ReplyMapCacheTests.cs b/tests/NATS.Server.Tests/Gateways/ReplyMapCacheTests.cs new file mode 100644 index 0000000..3c7ef33 --- /dev/null +++ b/tests/NATS.Server.Tests/Gateways/ReplyMapCacheTests.cs @@ -0,0 +1,164 @@ +using NATS.Server.Gateways; +using Shouldly; + +namespace NATS.Server.Tests.Gateways; + +/// +/// Tests for the ReplyMapCache LRU cache with TTL expiration. +/// Go reference: gateway.go — reply mapping cache. +/// +public class ReplyMapCacheTests +{ + // Go: gateway.go — cache is initially empty, lookups return false + [Fact] + public void TryGet_returns_false_on_empty() + { + var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000); + + var found = cache.TryGet("_INBOX.abc", out var value); + + found.ShouldBeFalse(); + value.ShouldBeNull(); + } + + // Go: gateway.go — cached mapping is retrievable after Set + [Fact] + public void Set_and_TryGet_round_trips() + { + var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000); + cache.Set("_INBOX.abc", "_GR_.cluster1.42._INBOX.abc"); + + var found = cache.TryGet("_INBOX.abc", out var value); + + found.ShouldBeTrue(); + value.ShouldBe("_GR_.cluster1.42._INBOX.abc"); + } + + // Go: gateway.go — LRU eviction removes the least-recently-used entry when at capacity + [Fact] + public void LRU_eviction_at_capacity() + { + var cache = new ReplyMapCache(capacity: 2, ttlMs: 60_000); + cache.Set("key1", "val1"); + cache.Set("key2", "val2"); + // Adding a third entry should evict the LRU entry (key1, since key2 was added last) + cache.Set("key3", "val3"); + + cache.TryGet("key1", out _).ShouldBeFalse(); + cache.TryGet("key2", out var v2).ShouldBeTrue(); + v2.ShouldBe("val2"); + cache.TryGet("key3", out var v3).ShouldBeTrue(); + v3.ShouldBe("val3"); + } + + // Go: gateway.go — entries expire after the configured TTL window + [Fact] + [SlopwatchSuppress("SW004", "TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing a time-based cache eviction")] + public void TTL_expiration() + { + var cache = new ReplyMapCache(capacity: 16, ttlMs: 1); + cache.Set("_INBOX.ttl", "_GR_.c1.1._INBOX.ttl"); + + Thread.Sleep(5); // Wait longer than the 1ms TTL + + var found = cache.TryGet("_INBOX.ttl", out var value); + + found.ShouldBeFalse(); + value.ShouldBeNull(); + } + + // Go: gateway.go — hit counter tracks successful cache lookups + [Fact] + public void Hits_incremented_on_hit() + { + var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000); + cache.Set("key", "value"); + + cache.TryGet("key", out _); + cache.TryGet("key", out _); + + cache.Hits.ShouldBe(2); + cache.Misses.ShouldBe(0); + } + + // Go: gateway.go — miss counter tracks failed lookups + [Fact] + public void Misses_incremented_on_miss() + { + var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000); + + cache.TryGet("nope", out _); + cache.TryGet("also-nope", out _); + + cache.Misses.ShouldBe(2); + cache.Hits.ShouldBe(0); + } + + // Go: gateway.go — Clear removes all entries from the cache + [Fact] + public void Clear_removes_all() + { + var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000); + cache.Set("a", "1"); + cache.Set("b", "2"); + cache.Set("c", "3"); + + cache.Clear(); + + cache.Count.ShouldBe(0); + cache.TryGet("a", out _).ShouldBeFalse(); + cache.TryGet("b", out _).ShouldBeFalse(); + cache.TryGet("c", out _).ShouldBeFalse(); + } + + // Go: gateway.go — Set on existing key updates the value and promotes to MRU + [Fact] + public void Set_updates_existing() + { + var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000); + cache.Set("key", "original"); + cache.Set("key", "updated"); + + cache.TryGet("key", out var value).ShouldBeTrue(); + value.ShouldBe("updated"); + cache.Count.ShouldBe(1); + } + + // Go: gateway.go — PurgeExpired removes only expired entries + [Fact] + [SlopwatchSuppress("SW004", "TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing a time-based cache eviction")] + public void PurgeExpired_removes_old_entries() + { + var cache = new ReplyMapCache(capacity: 16, ttlMs: 1); + cache.Set("old1", "v1"); + cache.Set("old2", "v2"); + + Thread.Sleep(5); // Ensure both entries are past the 1ms TTL + + var purged = cache.PurgeExpired(); + + purged.ShouldBe(2); + cache.Count.ShouldBe(0); + } + + // Go: gateway.go — Count reflects the current number of cached entries + [Fact] + public void Count_reflects_entries() + { + var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000); + + cache.Count.ShouldBe(0); + + cache.Set("a", "1"); + cache.Count.ShouldBe(1); + + cache.Set("b", "2"); + cache.Count.ShouldBe(2); + + cache.Set("c", "3"); + cache.Count.ShouldBe(3); + + cache.Clear(); + cache.Count.ShouldBe(0); + } +}