feat: add reply subject mapping cache with TTL (Gap 11.5)
Add ReplyMapCache to ReplyMapper.cs — an LRU cache with TTL expiration for gateway reply subject mappings, avoiding repeated string parsing on the hot path. Includes 10 unit tests covering LRU eviction, TTL expiry, hit/miss counters, PurgeExpired, and Count.
This commit is contained in:
@@ -172,3 +172,119 @@ public static class ReplyMapper
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// LRU cache with TTL expiration for gateway reply subject mappings.
|
||||
/// Caches the result of MapToGateway/MapFromGateway to avoid repeated parsing.
|
||||
/// Go reference: gateway.go — reply mapping cache.
|
||||
/// </summary>
|
||||
public sealed class ReplyMapCache
|
||||
{
|
||||
private readonly int _capacity;
|
||||
private readonly int _ttlMs;
|
||||
private readonly Dictionary<string, LinkedListNode<CacheEntry>> _map;
|
||||
private readonly LinkedList<CacheEntry> _list = new();
|
||||
private readonly Lock _lock = new();
|
||||
private long _hits;
|
||||
private long _misses;
|
||||
|
||||
public ReplyMapCache(int capacity = 4096, int ttlMs = 60_000)
|
||||
{
|
||||
_capacity = capacity;
|
||||
_ttlMs = ttlMs;
|
||||
_map = new Dictionary<string, LinkedListNode<CacheEntry>>(capacity, StringComparer.Ordinal);
|
||||
}
|
||||
|
||||
public long Hits => Interlocked.Read(ref _hits);
|
||||
public long Misses => Interlocked.Read(ref _misses);
|
||||
public int Count { get { lock (_lock) return _map.Count; } }
|
||||
|
||||
public bool TryGet(string key, out string? value)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_map.TryGetValue(key, out var node))
|
||||
{
|
||||
// Check TTL
|
||||
if ((DateTime.UtcNow - node.Value.CreatedUtc).TotalMilliseconds > _ttlMs)
|
||||
{
|
||||
// Expired
|
||||
_list.Remove(node);
|
||||
_map.Remove(key);
|
||||
value = null;
|
||||
Interlocked.Increment(ref _misses);
|
||||
return false;
|
||||
}
|
||||
|
||||
value = node.Value.Value;
|
||||
_list.Remove(node);
|
||||
_list.AddFirst(node);
|
||||
Interlocked.Increment(ref _hits);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
value = null;
|
||||
Interlocked.Increment(ref _misses);
|
||||
return false;
|
||||
}
|
||||
|
||||
public void Set(string key, string value)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
if (_map.TryGetValue(key, out var existing))
|
||||
{
|
||||
_list.Remove(existing);
|
||||
existing.Value = new CacheEntry(key, value, DateTime.UtcNow);
|
||||
_list.AddFirst(existing);
|
||||
return;
|
||||
}
|
||||
|
||||
if (_map.Count >= _capacity)
|
||||
{
|
||||
var last = _list.Last!;
|
||||
_map.Remove(last.Value.Key);
|
||||
_list.RemoveLast();
|
||||
}
|
||||
|
||||
var entry = new CacheEntry(key, value, DateTime.UtcNow);
|
||||
var node = new LinkedListNode<CacheEntry>(entry);
|
||||
_list.AddFirst(node);
|
||||
_map[key] = node;
|
||||
}
|
||||
}
|
||||
|
||||
public void Clear()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
_map.Clear();
|
||||
_list.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Removes expired entries.</summary>
|
||||
public int PurgeExpired()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
var purged = 0;
|
||||
var node = _list.Last;
|
||||
while (node != null)
|
||||
{
|
||||
var prev = node.Previous;
|
||||
if ((now - node.Value.CreatedUtc).TotalMilliseconds > _ttlMs)
|
||||
{
|
||||
_map.Remove(node.Value.Key);
|
||||
_list.Remove(node);
|
||||
purged++;
|
||||
}
|
||||
node = prev;
|
||||
}
|
||||
return purged;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed record CacheEntry(string Key, string Value, DateTime CreatedUtc);
|
||||
}
|
||||
|
||||
164
tests/NATS.Server.Tests/Gateways/ReplyMapCacheTests.cs
Normal file
164
tests/NATS.Server.Tests/Gateways/ReplyMapCacheTests.cs
Normal file
@@ -0,0 +1,164 @@
|
||||
using NATS.Server.Gateways;
|
||||
using Shouldly;
|
||||
|
||||
namespace NATS.Server.Tests.Gateways;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for the ReplyMapCache LRU cache with TTL expiration.
|
||||
/// Go reference: gateway.go — reply mapping cache.
|
||||
/// </summary>
|
||||
public class ReplyMapCacheTests
|
||||
{
|
||||
// Go: gateway.go — cache is initially empty, lookups return false
|
||||
[Fact]
|
||||
public void TryGet_returns_false_on_empty()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000);
|
||||
|
||||
var found = cache.TryGet("_INBOX.abc", out var value);
|
||||
|
||||
found.ShouldBeFalse();
|
||||
value.ShouldBeNull();
|
||||
}
|
||||
|
||||
// Go: gateway.go — cached mapping is retrievable after Set
|
||||
[Fact]
|
||||
public void Set_and_TryGet_round_trips()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000);
|
||||
cache.Set("_INBOX.abc", "_GR_.cluster1.42._INBOX.abc");
|
||||
|
||||
var found = cache.TryGet("_INBOX.abc", out var value);
|
||||
|
||||
found.ShouldBeTrue();
|
||||
value.ShouldBe("_GR_.cluster1.42._INBOX.abc");
|
||||
}
|
||||
|
||||
// Go: gateway.go — LRU eviction removes the least-recently-used entry when at capacity
|
||||
[Fact]
|
||||
public void LRU_eviction_at_capacity()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 2, ttlMs: 60_000);
|
||||
cache.Set("key1", "val1");
|
||||
cache.Set("key2", "val2");
|
||||
// Adding a third entry should evict the LRU entry (key1, since key2 was added last)
|
||||
cache.Set("key3", "val3");
|
||||
|
||||
cache.TryGet("key1", out _).ShouldBeFalse();
|
||||
cache.TryGet("key2", out var v2).ShouldBeTrue();
|
||||
v2.ShouldBe("val2");
|
||||
cache.TryGet("key3", out var v3).ShouldBeTrue();
|
||||
v3.ShouldBe("val3");
|
||||
}
|
||||
|
||||
// Go: gateway.go — entries expire after the configured TTL window
|
||||
[Fact]
|
||||
[SlopwatchSuppress("SW004", "TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing a time-based cache eviction")]
|
||||
public void TTL_expiration()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 16, ttlMs: 1);
|
||||
cache.Set("_INBOX.ttl", "_GR_.c1.1._INBOX.ttl");
|
||||
|
||||
Thread.Sleep(5); // Wait longer than the 1ms TTL
|
||||
|
||||
var found = cache.TryGet("_INBOX.ttl", out var value);
|
||||
|
||||
found.ShouldBeFalse();
|
||||
value.ShouldBeNull();
|
||||
}
|
||||
|
||||
// Go: gateway.go — hit counter tracks successful cache lookups
|
||||
[Fact]
|
||||
public void Hits_incremented_on_hit()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000);
|
||||
cache.Set("key", "value");
|
||||
|
||||
cache.TryGet("key", out _);
|
||||
cache.TryGet("key", out _);
|
||||
|
||||
cache.Hits.ShouldBe(2);
|
||||
cache.Misses.ShouldBe(0);
|
||||
}
|
||||
|
||||
// Go: gateway.go — miss counter tracks failed lookups
|
||||
[Fact]
|
||||
public void Misses_incremented_on_miss()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000);
|
||||
|
||||
cache.TryGet("nope", out _);
|
||||
cache.TryGet("also-nope", out _);
|
||||
|
||||
cache.Misses.ShouldBe(2);
|
||||
cache.Hits.ShouldBe(0);
|
||||
}
|
||||
|
||||
// Go: gateway.go — Clear removes all entries from the cache
|
||||
[Fact]
|
||||
public void Clear_removes_all()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000);
|
||||
cache.Set("a", "1");
|
||||
cache.Set("b", "2");
|
||||
cache.Set("c", "3");
|
||||
|
||||
cache.Clear();
|
||||
|
||||
cache.Count.ShouldBe(0);
|
||||
cache.TryGet("a", out _).ShouldBeFalse();
|
||||
cache.TryGet("b", out _).ShouldBeFalse();
|
||||
cache.TryGet("c", out _).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// Go: gateway.go — Set on existing key updates the value and promotes to MRU
|
||||
[Fact]
|
||||
public void Set_updates_existing()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000);
|
||||
cache.Set("key", "original");
|
||||
cache.Set("key", "updated");
|
||||
|
||||
cache.TryGet("key", out var value).ShouldBeTrue();
|
||||
value.ShouldBe("updated");
|
||||
cache.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
// Go: gateway.go — PurgeExpired removes only expired entries
|
||||
[Fact]
|
||||
[SlopwatchSuppress("SW004", "TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing a time-based cache eviction")]
|
||||
public void PurgeExpired_removes_old_entries()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 16, ttlMs: 1);
|
||||
cache.Set("old1", "v1");
|
||||
cache.Set("old2", "v2");
|
||||
|
||||
Thread.Sleep(5); // Ensure both entries are past the 1ms TTL
|
||||
|
||||
var purged = cache.PurgeExpired();
|
||||
|
||||
purged.ShouldBe(2);
|
||||
cache.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
// Go: gateway.go — Count reflects the current number of cached entries
|
||||
[Fact]
|
||||
public void Count_reflects_entries()
|
||||
{
|
||||
var cache = new ReplyMapCache(capacity: 16, ttlMs: 60_000);
|
||||
|
||||
cache.Count.ShouldBe(0);
|
||||
|
||||
cache.Set("a", "1");
|
||||
cache.Count.ShouldBe(1);
|
||||
|
||||
cache.Set("b", "2");
|
||||
cache.Count.ShouldBe(2);
|
||||
|
||||
cache.Set("c", "3");
|
||||
cache.Count.ShouldBe(3);
|
||||
|
||||
cache.Clear();
|
||||
cache.Count.ShouldBe(0);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user