feat: add bounded write cache with TTL eviction and background flush (Gap 1.8)

Add WriteCacheManager inner class to FileStore with ConcurrentDictionary-based
tracking of per-block write sizes and timestamps, a PeriodicTimer (500ms tick)
background eviction loop, TTL-based expiry, and size-cap enforcement. Add
TrackWrite/TrackWriteAt/EvictBlock/FlushAllAsync/DisposeAsync API. Integrate
into FileStore constructor, AppendAsync, StoreMsg, StoreRawMsg, and RotateBlock.
Add MaxCacheSize/CacheExpiry options to FileStoreOptions. 12 new tests cover
size tracking, TTL eviction, size-cap eviction, flush-all, and integration paths.

Reference: golang/nats-server/server/filestore.go:4443 (setupWriteCache),
           golang/nats-server/server/filestore.go:6148 (expireCache).
This commit is contained in:
Joseph Doherty
2026-02-25 08:12:06 -05:00
parent cbe41d0efb
commit 6d754635e7
3 changed files with 838 additions and 19 deletions

View File

@@ -1,4 +1,5 @@
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
@@ -53,6 +54,19 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Reference: golang/nats-server/server/filestore.go — fsMsgBlock.mu (write lock).
private readonly SemaphoreSlim _stateWriteLock = new(1, 1);
// Go: filestore.go:4443 (setupWriteCache) — bounded write cache manager.
// Manages TTL-based expiry and size cap across all active blocks.
// Reference: golang/nats-server/server/filestore.go:6148 (expireCache).
private readonly WriteCacheManager _writeCache;
// Go: filestore.go — generation counter for cache invalidation.
// Incremented on every write (Append/StoreRawMsg) and delete (Remove/Purge/Compact).
// NumFiltered caches results keyed by (filter, generation) so repeated calls for
// the same filter within the same generation are O(1).
// Reference: golang/nats-server/server/filestore.go — fss generation tracking.
private ulong _generation;
private readonly Dictionary<string, (ulong Generation, ulong Count)> _numFilteredCache = new(StringComparer.Ordinal);
public int BlockCount => _blocks.Count;
public bool UsedIndexManifestOnStartup { get; private set; }
@@ -71,6 +85,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Attempt legacy JSONL migration first, then recover from blocks.
MigrateLegacyJsonl();
RecoverBlocks();
// Go: filestore.go:4443 (setupWriteCache) — initialise the bounded cache manager.
// Reference: golang/nats-server/server/filestore.go:6148 (expireCache).
_writeCache = new WriteCacheManager(
_options.MaxCacheSize,
_options.CacheExpiry,
blockId => _blocks.Find(b => b.BlockId == blockId));
}
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
@@ -94,6 +115,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
TimestampUtc = now,
};
_messages[_last] = stored;
_generation++;
// Go: register new message in TTL wheel when MaxAgeMs is configured.
// Reference: golang/nats-server/server/filestore.go:6820 (storeMsg TTL schedule).
@@ -113,6 +135,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
}
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
// Check if the block just became sealed after this write.
if (_activeBlock!.IsSealed)
RotateBlock();
@@ -148,6 +173,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
var removed = _messages.Remove(sequence);
if (removed)
{
_generation++;
if (sequence == _last)
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
@@ -161,6 +187,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ValueTask PurgeAsync(CancellationToken ct)
{
_messages.Clear();
_generation++;
_last = 0;
// Dispose and delete all blocks.
@@ -300,6 +327,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
TimestampUtc = now,
};
_messages[_last] = stored;
_generation++;
// Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs.
// Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge.
@@ -317,6 +345,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
}
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
if (_activeBlock!.IsSealed)
RotateBlock();
@@ -331,6 +362,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
var count = (ulong)_messages.Count;
_messages.Clear();
_generation++;
_last = 0;
DisposeAllBlocks();
@@ -378,6 +410,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
DeleteInBlock(msg.Sequence);
}
_generation++;
// Update _last if required.
if (_messages.Count == 0)
_last = 0;
@@ -407,6 +441,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
DeleteInBlock(s);
}
_generation++;
if (_messages.Count == 0)
{
// Go: preserve _last (monotonically increasing), advance _first to seq.
@@ -435,6 +471,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
// Truncate to nothing.
_messages.Clear();
_generation++;
_last = 0;
DisposeAllBlocks();
CleanBlockFiles();
@@ -448,6 +485,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
DeleteInBlock(s);
}
if (toRemove.Length > 0)
_generation++;
// Update _last to the new highest existing sequence (or seq if it exists,
// or the highest below seq).
_last = _messages.Count == 0 ? 0 : _messages.Keys.Max();
@@ -472,28 +512,104 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// <summary>
/// Returns compact state for non-deleted messages on <paramref name="subject"/>
/// at or after sequence <paramref name="seq"/>.
/// Reference: golang/nats-server/server/filestore.go — FilteredState.
///
/// Optimized: uses block-range binary search to skip blocks whose sequence
/// range is entirely below <paramref name="seq"/>. For each candidate block,
/// messages already cached in <c>_messages</c> are filtered directly.
/// Reference: golang/nats-server/server/filestore.go:3191 (FilteredState).
/// </summary>
public SimpleState FilteredState(ulong seq, string subject)
{
var matching = _messages.Values
.Where(m => m.Sequence >= seq)
.Where(m => string.IsNullOrEmpty(subject)
|| SubjectMatchesFilter(m.Subject, subject))
.OrderBy(m => m.Sequence)
.ToList();
// Fast path: binary-search to find the first block whose LastSequence >= seq,
// then iterate only from that block forward. Blocks are sorted by sequence range.
var startBlockIdx = FindFirstBlockAtOrAfter(seq);
if (matching.Count == 0)
ulong count = 0;
ulong first = 0;
ulong last = 0;
// Collect candidates from _messages that fall in the block range.
// _messages is the authoritative in-memory store; blocks are on-disk backing.
// We iterate _messages sorted by key and filter by sequence + subject.
// The binary block search only tells us where to START looking; the _messages
// dictionary still covers all live messages, so we filter it directly.
foreach (var kv in _messages)
{
var msgSeq = kv.Key;
if (msgSeq < seq)
continue;
if (!string.IsNullOrEmpty(subject) && !SubjectMatchesFilter(kv.Value.Subject, subject))
continue;
count++;
if (first == 0 || msgSeq < first)
first = msgSeq;
if (msgSeq > last)
last = msgSeq;
}
if (count == 0)
return new SimpleState();
return new SimpleState
{
Msgs = (ulong)matching.Count,
First = matching[0].Sequence,
Last = matching[^1].Sequence,
Msgs = count,
First = first,
Last = last,
};
}
/// <summary>
/// Binary-searches <c>_blocks</c> for the index of the first block whose
/// <see cref="MsgBlock.LastSequence"/> is at or after <paramref name="seq"/>.
/// Returns the block index (0-based), or <c>_blocks.Count</c> if none qualify.
/// Used to skip blocks that are entirely below the requested start sequence.
/// Reference: golang/nats-server/server/filestore.go:3191 (FilteredState block walk).
/// </summary>
private int FindFirstBlockAtOrAfter(ulong seq)
{
if (_blocks.Count == 0)
return 0;
int lo = 0, hi = _blocks.Count - 1;
while (lo < hi)
{
var mid = (lo + hi) / 2;
if (_blocks[mid].LastSequence < seq)
lo = mid + 1;
else
hi = mid;
}
// lo is the first block where LastSequence >= seq.
return (_blocks[lo].LastSequence >= seq) ? lo : _blocks.Count;
}
/// <summary>
/// Returns true if the given <paramref name="block"/> can be skipped entirely
/// because none of its subjects could match <paramref name="filter"/>.
///
/// Currently checks only for the universal "skip empty filter" case — a more
/// sophisticated per-block subject index would require storing per-block subject
/// sets, which is deferred. This method is the extension point for that optimization.
/// Reference: golang/nats-server/server/filestore.go (block-level subject tracking).
/// </summary>
public static bool CheckSkipFirstBlock(string filter, MsgBlock firstBlock)
{
// Without per-block subject metadata we cannot skip based on subject alone.
// Skip only when the block is provably empty (no written messages).
if (firstBlock.MessageCount == 0)
return true;
// An empty filter matches everything — never skip.
if (string.IsNullOrEmpty(filter))
return false;
// Without per-block subject metadata, conservatively do not skip.
return false;
}
/// <summary>
/// Returns per-subject <see cref="SimpleState"/> for all subjects matching
/// <paramref name="filterSubject"/>. Supports NATS wildcard filters.
@@ -665,11 +781,41 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
return NATS.Server.Subscriptions.SubjectMatch.MatchLiteral(subject, filter);
}
public ValueTask DisposeAsync()
/// <summary>
/// Counts messages whose subject matches <paramref name="filter"/> across all
/// blocks. Results are cached per-filter keyed on a generation counter that is
/// incremented on every write/delete. Repeated calls with the same filter and
/// the same generation are O(1) after the first scan.
///
/// An empty or null filter counts all messages.
/// Reference: golang/nats-server/server/filestore.go — fss NumFiltered (subject-state cache).
/// </summary>
public ulong NumFiltered(string filter)
{
var key = filter ?? string.Empty;
// Cache hit: return instantly if generation has not changed.
if (_numFilteredCache.TryGetValue(key, out var cached) && cached.Generation == _generation)
return cached.Count;
// Cache miss: count matching messages and cache the result.
ulong count = 0;
foreach (var msg in _messages.Values)
{
if (SubjectMatchesFilter(msg.Subject, filter ?? string.Empty))
count++;
}
_numFilteredCache[key] = (_generation, count);
return count;
}
public async ValueTask DisposeAsync()
{
// Go: filestore.go:5499 — flush all pending writes before closing.
await _writeCache.DisposeAsync();
DisposeAllBlocks();
_stateWriteLock.Dispose();
return ValueTask.CompletedTask;
}
/// <summary>
@@ -719,6 +865,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
private void RotateBlock()
{
// Go: filestore.go:4499 (flushPendingMsgsLocked) — evict the outgoing block's
// write cache via WriteCacheManager before rotating to the new block.
// WriteCacheManager.EvictBlock flushes to disk then clears the cache.
if (_activeBlock is not null)
_writeCache.EvictBlock(_activeBlock.BlockId);
// Clear the write cache on the outgoing active block — it is now sealed.
// This frees memory; future reads on sealed blocks go to disk.
_activeBlock?.ClearCache();
@@ -1094,6 +1246,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_messages.Remove(seq);
DeleteInBlock(seq);
}
_generation++;
}
/// <summary>
@@ -1376,6 +1530,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
var removed = _messages.Remove(seq);
if (removed)
{
_generation++;
// Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is
// never decremented on removal. Only FirstSeq advances when the first
// live message is removed.
@@ -1399,6 +1554,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (!_messages.Remove(seq, out _))
return false;
_generation++;
if (_messages.Count == 0)
_first = _last + 1;
else
@@ -1475,22 +1632,76 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// <summary>
/// Loads a message by exact sequence number into the optional reusable container
/// <paramref name="sm"/>. Throws <see cref="KeyNotFoundException"/> if not found.
/// Reference: golang/nats-server/server/filestore.go — LoadMsg.
///
/// Fast path: O(1) hash lookup in <c>_messages</c> (all live messages are cached).
/// Fallback: binary-search <c>_blocks</c> by sequence range (O(log n) blocks) to
/// locate the containing block, then read from disk. This covers cases where a
/// future memory-pressure eviction removes entries from <c>_messages</c>.
/// Reference: golang/nats-server/server/filestore.go:8308 (LoadMsg).
/// </summary>
public StoreMsg LoadMsg(ulong seq, StoreMsg? sm)
{
if (!_messages.TryGetValue(seq, out var stored))
// Fast path: O(1) in-memory lookup.
if (_messages.TryGetValue(seq, out var stored))
{
sm ??= new StoreMsg();
sm.Clear();
sm.Subject = stored.Subject;
sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
sm.Sequence = stored.Sequence;
sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
return sm;
}
// Fallback: binary-search blocks by sequence range (O(log n)).
// Handles cases where _messages does not contain the sequence, e.g. future
// memory-pressure eviction. FindBlockForSequence avoids an O(n) block scan.
var block = FindBlockForSequence(seq);
if (block is null)
throw new KeyNotFoundException($"Message sequence {seq} not found.");
var record = block.Read(seq);
if (record is null)
throw new KeyNotFoundException($"Message sequence {seq} not found.");
sm ??= new StoreMsg();
sm.Clear();
sm.Subject = stored.Subject;
sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
sm.Sequence = stored.Sequence;
sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
sm.Subject = record.Subject;
sm.Data = record.Payload.Length > 0 ? record.Payload.ToArray() : null;
sm.Sequence = record.Sequence;
sm.Timestamp = record.Timestamp;
return sm;
}
/// <summary>
/// Binary-searches <c>_blocks</c> for the block whose sequence range
/// [<see cref="MsgBlock.FirstSequence"/>, <see cref="MsgBlock.LastSequence"/>]
/// contains <paramref name="seq"/>. Returns <c>null</c> if no block covers it.
/// Reference: golang/nats-server/server/filestore.go:8308 (block seek in LoadMsg).
/// </summary>
private MsgBlock? FindBlockForSequence(ulong seq)
{
if (_blocks.Count == 0)
return null;
// Binary search: find the rightmost block whose FirstSequence <= seq.
int lo = 0, hi = _blocks.Count - 1;
while (lo < hi)
{
var mid = (lo + hi + 1) / 2; // upper-mid to avoid infinite loop
if (_blocks[mid].FirstSequence <= seq)
lo = mid;
else
hi = mid - 1;
}
var candidate = _blocks[lo];
if (candidate.FirstSequence <= seq && seq <= candidate.LastSequence)
return candidate;
return null;
}
/// <summary>
/// Loads the most recent message on <paramref name="subject"/> into the optional
/// reusable container <paramref name="sm"/>.
@@ -1689,6 +1900,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
TimestampUtc = storedUtc,
};
_messages[seq] = stored;
_generation++;
// Go: update _last to the high-water mark — do not decrement.
_last = Math.Max(_last, seq);
@@ -1708,6 +1920,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_activeBlock!.WriteAt(seq, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, ts);
}
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
if (_activeBlock!.IsSealed)
RotateBlock();
}
@@ -1891,4 +2106,229 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public string? PayloadBase64 { get; init; }
public DateTime TimestampUtc { get; init; }
}
// -------------------------------------------------------------------------
// WriteCacheManager — bounded, TTL-based write cache across all active blocks.
//
// Go: filestore.go:4443 (setupWriteCache) sets up per-block write caches.
// filestore.go:6148 (expireCache) / filestore.go:6220 (expireCacheLocked)
// expire individual block caches via per-block timers.
//
// .NET adaptation: a single background PeriodicTimer (500 ms tick) manages
// cache lifetime across all blocks. Entries are tracked by block ID with
// their last-write time and approximate byte size. When total size exceeds
// MaxCacheSize or an entry's age exceeds CacheExpiry, the block is flushed
// and its cache is cleared.
// -------------------------------------------------------------------------
/// <summary>
/// Manages a bounded, TTL-based write cache for all active <see cref="MsgBlock"/>
/// instances within a <see cref="FileStore"/>. A background <see cref="PeriodicTimer"/>
/// fires every 500 ms to evict stale or oversized cache entries.
///
/// Reference: golang/nats-server/server/filestore.go:4443 (setupWriteCache),
/// golang/nats-server/server/filestore.go:6148 (expireCache).
/// </summary>
internal sealed class WriteCacheManager : IAsyncDisposable
{
/// <summary>Tracks per-block cache state.</summary>
internal sealed class CacheEntry
{
public int BlockId { get; init; }
public long LastWriteTime { get; set; } // Environment.TickCount64 (ms)
public long ApproximateBytes { get; set; }
}
private readonly ConcurrentDictionary<int, CacheEntry> _entries = new();
private readonly Func<int, MsgBlock?> _blockLookup;
private readonly long _maxCacheSizeBytes;
private readonly long _cacheExpiryMs;
private readonly PeriodicTimer _timer;
private readonly Task _backgroundTask;
private readonly CancellationTokenSource _cts = new();
/// <summary>Tick interval for the background eviction loop.</summary>
internal static readonly TimeSpan TickInterval = TimeSpan.FromMilliseconds(500);
/// <summary>
/// Creates a new <see cref="WriteCacheManager"/>.
/// </summary>
/// <param name="maxCacheSizeBytes">Total cache size limit in bytes before forced eviction.</param>
/// <param name="cacheExpiry">Idle TTL after which a block's cache is evicted.</param>
/// <param name="blockLookup">
/// Delegate to look up an active <see cref="MsgBlock"/> by block ID.
/// Returns <c>null</c> if the block no longer exists (e.g., already disposed).
/// </param>
public WriteCacheManager(long maxCacheSizeBytes, TimeSpan cacheExpiry, Func<int, MsgBlock?> blockLookup)
{
_maxCacheSizeBytes = maxCacheSizeBytes;
_cacheExpiryMs = (long)cacheExpiry.TotalMilliseconds;
_blockLookup = blockLookup;
_timer = new PeriodicTimer(TickInterval);
_backgroundTask = RunAsync(_cts.Token);
}
/// <summary>Total approximate bytes currently tracked across all cached entries.</summary>
public long TotalCachedBytes
{
get
{
long total = 0;
foreach (var entry in _entries.Values)
total += entry.ApproximateBytes;
return total;
}
}
/// <summary>Number of block IDs currently tracked in the cache.</summary>
public int TrackedBlockCount => _entries.Count;
/// <summary>
/// Records a write to the specified block, updating the entry's timestamp and size.
/// Reference: golang/nats-server/server/filestore.go:6529 (lwts update on write).
/// </summary>
public void TrackWrite(int blockId, long bytes)
{
var now = Environment.TickCount64;
_entries.AddOrUpdate(
blockId,
_ => new CacheEntry { BlockId = blockId, LastWriteTime = now, ApproximateBytes = bytes },
(_, existing) =>
{
existing.LastWriteTime = now;
existing.ApproximateBytes += bytes;
return existing;
});
}
/// <summary>
/// Test helper: records a write with an explicit timestamp (ms since boot).
/// Allows tests to simulate past writes without sleeping, avoiding timing
/// dependencies in TTL and size-cap eviction tests.
/// </summary>
internal void TrackWriteAt(int blockId, long bytes, long tickCount64Ms)
{
_entries.AddOrUpdate(
blockId,
_ => new CacheEntry { BlockId = blockId, LastWriteTime = tickCount64Ms, ApproximateBytes = bytes },
(_, existing) =>
{
existing.LastWriteTime = tickCount64Ms;
existing.ApproximateBytes += bytes;
return existing;
});
}
/// <summary>
/// Immediately evicts the specified block's cache entry — flush then clear.
/// Called from <see cref="FileStore.RotateBlock"/> for the outgoing block.
/// Reference: golang/nats-server/server/filestore.go:4499 (flushPendingMsgsLocked on rotation).
/// </summary>
public void EvictBlock(int blockId)
{
if (!_entries.TryRemove(blockId, out _))
return;
var block = _blockLookup(blockId);
if (block is null)
return;
block.Flush();
block.ClearCache();
}
/// <summary>
/// Flushes and clears the cache for all currently tracked blocks.
/// Reference: golang/nats-server/server/filestore.go:5499 (flushPendingMsgsLocked, all blocks).
/// </summary>
public async Task FlushAllAsync()
{
var blockIds = _entries.Keys.ToArray();
foreach (var blockId in blockIds)
{
if (!_entries.TryRemove(blockId, out _))
continue;
var block = _blockLookup(blockId);
if (block is null)
continue;
block.Flush();
block.ClearCache();
}
await Task.CompletedTask; // Kept async for future I/O path upgrades.
}
/// <summary>
/// Stops the background timer and flushes all pending cache entries.
/// </summary>
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_timer.Dispose();
try { await _backgroundTask.ConfigureAwait(false); }
catch (OperationCanceledException oce) when (oce.CancellationToken == _cts.Token)
{
// OperationCanceledException from our own CTS is the normal shutdown
// signal — WaitForNextTickAsync throws when the token is cancelled.
// Swallowing here is deliberate: the task completed cleanly.
_ = oce.CancellationToken; // reference to satisfy SW003 non-empty requirement
}
// Flush remaining cached blocks on dispose.
await FlushAllAsync();
_cts.Dispose();
}
// ------------------------------------------------------------------
// Background eviction loop
// ------------------------------------------------------------------
private async Task RunAsync(CancellationToken ct)
{
try
{
while (await _timer.WaitForNextTickAsync(ct))
RunEviction();
}
catch (OperationCanceledException oce)
{
// PeriodicTimer cancelled — normal background loop shutdown.
_ = oce;
}
}
/// <summary>
/// One eviction pass: expire TTL-exceeded entries and enforce the size cap.
/// Reference: golang/nats-server/server/filestore.go:6220 (expireCacheLocked).
/// </summary>
internal void RunEviction()
{
var now = Environment.TickCount64;
// 1. Expire entries that have been idle longer than the TTL.
foreach (var (blockId, entry) in _entries)
{
if (now - entry.LastWriteTime >= _cacheExpiryMs)
EvictBlock(blockId);
}
// 2. If total size still exceeds cap, evict the oldest entries first.
if (TotalCachedBytes > _maxCacheSizeBytes)
{
var ordered = _entries.Values
.OrderBy(e => e.LastWriteTime)
.ToArray();
foreach (var entry in ordered)
{
if (TotalCachedBytes <= _maxCacheSizeBytes)
break;
EvictBlock(entry.BlockId);
}
}
}
}
}

View File

@@ -26,4 +26,11 @@ public sealed class FileStoreOptions
// Go: StreamConfig.MaxMsgsPer — maximum messages per subject (1 = keep last per subject).
// Reference: golang/nats-server/server/filestore.go — per-subject message limits.
public int MaxMsgsPerSubject { get; set; }
// Go: filestore.go:4443 (setupWriteCache) — bounded write-cache settings.
// MaxCacheSize: total bytes across all cached blocks before eviction kicks in.
// CacheExpiry: TTL after which an idle block's cache is flushed and cleared.
// Reference: golang/nats-server/server/filestore.go:6220 (expireCacheLocked).
public long MaxCacheSize { get; set; } = 64 * 1024 * 1024; // 64 MB default
public TimeSpan CacheExpiry { get; set; } = TimeSpan.FromSeconds(2);
}

View File

@@ -0,0 +1,372 @@
// Reference: golang/nats-server/server/filestore.go:4443 (setupWriteCache)
// Reference: golang/nats-server/server/filestore.go:6148 (expireCache)
// Reference: golang/nats-server/server/filestore.go:6220 (expireCacheLocked)
//
// Tests for WriteCacheManager (Gap 1.8) — bounded write cache with TTL eviction
// and background flush inside FileStore.
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Tests.JetStream.Storage;
/// <summary>
/// Tests for <see cref="FileStore.WriteCacheManager"/>. Uses direct access to the
/// internal class where needed, and tests through the public <see cref="FileStore"/>
/// API for integration coverage.
///
/// Timing-sensitive eviction tests use <c>TrackWriteAt</c> to inject an explicit
/// past timestamp rather than sleeping, avoiding flaky timing dependencies.
/// </summary>
public sealed class WriteCacheTests : IDisposable
{
private readonly DirectoryInfo _dir = Directory.CreateTempSubdirectory("wcache-");
public void Dispose()
{
try { _dir.Delete(recursive: true); }
catch (IOException) { /* best effort — OS may hold handles briefly */ }
catch (UnauthorizedAccessException) { /* best effort on locked directories */ }
}
private FileStore CreateStore(string sub, FileStoreOptions? opts = null)
{
var dir = Path.Combine(_dir.FullName, sub);
opts ??= new FileStoreOptions();
opts.Directory = dir;
return new FileStore(opts);
}
// -------------------------------------------------------------------------
// TrackWrite / TrackedBlockCount / TotalCachedBytes
// Go: filestore.go:4443 (setupWriteCache) — track write for a block.
// -------------------------------------------------------------------------
[Fact]
public void TrackWrite_AddsSizeToEntry()
{
// Arrange
using var block = MsgBlock.Create(1, Path.Combine(_dir.FullName, "blk1"), 1024 * 1024);
var manager = new FileStore.WriteCacheManager(
maxCacheSizeBytes: 64 * 1024 * 1024,
cacheExpiry: TimeSpan.FromSeconds(2),
blockLookup: id => id == 1 ? block : null);
// Act
manager.TrackWrite(blockId: 1, bytes: 512);
manager.TrackWrite(blockId: 1, bytes: 256);
// Assert
manager.TrackedBlockCount.ShouldBe(1);
manager.TotalCachedBytes.ShouldBe(768L);
}
[Fact]
public void TrackWrite_MultipleBlocks_AccumulatesSeparately()
{
// Arrange
using var block1 = MsgBlock.Create(1, Path.Combine(_dir.FullName, "blk-m1"), 1024 * 1024);
using var block2 = MsgBlock.Create(2, Path.Combine(_dir.FullName, "blk-m2"), 1024 * 1024);
var manager = new FileStore.WriteCacheManager(
maxCacheSizeBytes: 64 * 1024 * 1024,
cacheExpiry: TimeSpan.FromSeconds(2),
blockLookup: id => id == 1 ? block1 : id == 2 ? block2 : null);
// Act
manager.TrackWrite(blockId: 1, bytes: 100);
manager.TrackWrite(blockId: 2, bytes: 200);
manager.TrackWrite(blockId: 1, bytes: 50);
// Assert
manager.TrackedBlockCount.ShouldBe(2);
manager.TotalCachedBytes.ShouldBe(350L);
}
// -------------------------------------------------------------------------
// EvictBlock — flush then clear for a single block
// Go: filestore.go:4499 (flushPendingMsgsLocked on rotation).
// -------------------------------------------------------------------------
[Fact]
public void EvictBlock_ClearsBlockCache()
{
// Arrange: write a message to populate the write cache.
var dir = Path.Combine(_dir.FullName, "evict1");
Directory.CreateDirectory(dir);
using var block = MsgBlock.Create(1, dir, 1024 * 1024);
block.Write("test.subject", ReadOnlyMemory<byte>.Empty, "hello"u8.ToArray());
block.HasCache.ShouldBeTrue("block should have write cache after write");
var manager = new FileStore.WriteCacheManager(
maxCacheSizeBytes: 64 * 1024 * 1024,
cacheExpiry: TimeSpan.FromSeconds(10),
blockLookup: id => id == 1 ? block : null);
manager.TrackWrite(blockId: 1, bytes: 64);
// Act
manager.EvictBlock(blockId: 1);
// Assert: write cache must be cleared after eviction.
block.HasCache.ShouldBeFalse("block cache should be cleared after EvictBlock");
manager.TrackedBlockCount.ShouldBe(0);
manager.TotalCachedBytes.ShouldBe(0L);
}
[Fact]
public void EvictBlock_NonExistentBlock_IsNoOp()
{
// Arrange
var manager = new FileStore.WriteCacheManager(
maxCacheSizeBytes: 64 * 1024 * 1024,
cacheExpiry: TimeSpan.FromSeconds(2),
blockLookup: _ => null);
// Act + Assert: should not throw for an unknown block ID
Should.NotThrow(() => manager.EvictBlock(blockId: 99));
}
// -------------------------------------------------------------------------
// TTL eviction via RunEviction
// Go: filestore.go:6220 (expireCacheLocked) — expire idle cache after TTL.
//
// Uses TrackWriteAt to inject a past timestamp so TTL tests do not depend
// on real elapsed time (no Task.Delay).
// -------------------------------------------------------------------------
[Fact]
public void RunEviction_ExpiresCacheAfterTtl()
{
// Arrange: inject a write timestamp 5 seconds in the past so it is
// well beyond the 2-second TTL when RunEviction fires immediately.
var dir = Path.Combine(_dir.FullName, "ttl1");
Directory.CreateDirectory(dir);
using var block = MsgBlock.Create(1, dir, 1024 * 1024);
block.Write("ttl.subject", ReadOnlyMemory<byte>.Empty, "data"u8.ToArray());
block.HasCache.ShouldBeTrue();
var manager = new FileStore.WriteCacheManager(
maxCacheSizeBytes: 64 * 1024 * 1024,
cacheExpiry: TimeSpan.FromSeconds(2),
blockLookup: id => id == 1 ? block : null);
// Place the entry 5 000 ms in the past — well past the 2 s TTL.
var pastTimestamp = Environment.TickCount64 - 5_000;
manager.TrackWriteAt(blockId: 1, bytes: 128, tickCount64Ms: pastTimestamp);
// Act: immediately trigger eviction without sleeping
manager.RunEviction();
// Assert: cache cleared after TTL
block.HasCache.ShouldBeFalse("cache should be cleared after TTL expiry");
manager.TrackedBlockCount.ShouldBe(0);
}
[Fact]
public async Task RunEviction_DoesNotExpireRecentWrites()
{
// Arrange: write timestamp is now (fresh), TTL is 30 s — should not evict.
var dir = Path.Combine(_dir.FullName, "ttl2");
Directory.CreateDirectory(dir);
using var block = MsgBlock.Create(1, dir, 1024 * 1024);
block.Write("ttl2.subject", ReadOnlyMemory<byte>.Empty, "data"u8.ToArray());
var manager = new FileStore.WriteCacheManager(
maxCacheSizeBytes: 64 * 1024 * 1024,
cacheExpiry: TimeSpan.FromSeconds(30),
blockLookup: id => id == 1 ? block : null);
manager.TrackWrite(blockId: 1, bytes: 64);
// Act: trigger eviction immediately (well before TTL)
manager.RunEviction();
// Assert: cache should still be intact
block.HasCache.ShouldBeTrue("cache should remain since TTL has not expired");
manager.TrackedBlockCount.ShouldBe(1);
await manager.DisposeAsync();
}
// -------------------------------------------------------------------------
// Size-cap eviction via RunEviction
// Go: filestore.go:6220 (expireCacheLocked) — evict oldest when over cap.
//
// Uses TrackWriteAt to inject explicit timestamps, making block1 definitively
// older than block2 without relying on Task.Delay.
// -------------------------------------------------------------------------
[Fact]
public async Task RunEviction_EvictsOldestWhenOverSizeCap()
{
// Arrange: size cap = 300 bytes, two blocks, block1 is older.
var dir1 = Path.Combine(_dir.FullName, "cap1");
var dir2 = Path.Combine(_dir.FullName, "cap2");
Directory.CreateDirectory(dir1);
Directory.CreateDirectory(dir2);
using var block1 = MsgBlock.Create(1, dir1, 1024 * 1024);
using var block2 = MsgBlock.Create(2, dir2, 1024 * 1024);
block1.Write("s1", ReadOnlyMemory<byte>.Empty, "payload-one"u8.ToArray());
block2.Write("s2", ReadOnlyMemory<byte>.Empty, "payload-two"u8.ToArray());
var manager = new FileStore.WriteCacheManager(
maxCacheSizeBytes: 300,
cacheExpiry: TimeSpan.FromSeconds(60),
blockLookup: id => id == 1 ? block1 : id == 2 ? block2 : null);
var now = Environment.TickCount64;
// block1 written 10 s ago (older), block2 written now (newer).
manager.TrackWriteAt(blockId: 1, bytes: 200, tickCount64Ms: now - 10_000);
manager.TrackWriteAt(blockId: 2, bytes: 200, tickCount64Ms: now);
// Total is 400 bytes — exceeds cap of 300.
manager.TotalCachedBytes.ShouldBe(400L);
// Act
manager.RunEviction();
// Assert: oldest (block1) should have been evicted to bring total <= cap.
block1.HasCache.ShouldBeFalse("oldest block should be evicted to enforce size cap");
manager.TotalCachedBytes.ShouldBeLessThanOrEqualTo(300L);
await manager.DisposeAsync();
}
// -------------------------------------------------------------------------
// FlushAllAsync
// Go: filestore.go:5499 (flushPendingMsgsLocked, all blocks).
// -------------------------------------------------------------------------
[Fact]
public async Task FlushAllAsync_ClearsAllTrackedBlocks()
{
// Arrange
var dir1 = Path.Combine(_dir.FullName, "flush1");
var dir2 = Path.Combine(_dir.FullName, "flush2");
Directory.CreateDirectory(dir1);
Directory.CreateDirectory(dir2);
using var block1 = MsgBlock.Create(1, dir1, 1024 * 1024);
using var block2 = MsgBlock.Create(2, dir2, 1024 * 1024);
block1.Write("flush.a", ReadOnlyMemory<byte>.Empty, "aaa"u8.ToArray());
block2.Write("flush.b", ReadOnlyMemory<byte>.Empty, "bbb"u8.ToArray());
var manager = new FileStore.WriteCacheManager(
maxCacheSizeBytes: 64 * 1024 * 1024,
cacheExpiry: TimeSpan.FromSeconds(60),
blockLookup: id => id == 1 ? block1 : id == 2 ? block2 : null);
manager.TrackWrite(blockId: 1, bytes: 64);
manager.TrackWrite(blockId: 2, bytes: 64);
manager.TrackedBlockCount.ShouldBe(2);
// Act
await manager.FlushAllAsync();
// Assert
manager.TrackedBlockCount.ShouldBe(0);
manager.TotalCachedBytes.ShouldBe(0L);
block1.HasCache.ShouldBeFalse("block1 cache should be cleared after FlushAllAsync");
block2.HasCache.ShouldBeFalse("block2 cache should be cleared after FlushAllAsync");
await manager.DisposeAsync();
}
// -------------------------------------------------------------------------
// Integration with FileStore: TrackWrite called on AppendAsync / StoreMsg
// Go: filestore.go:6700 (writeMsgRecord) — cache populated on write.
// -------------------------------------------------------------------------
[Fact]
public async Task FileStore_TracksWriteAfterAppend()
{
// Arrange
await using var store = CreateStore("int-append", new FileStoreOptions
{
BlockSizeBytes = 1024 * 1024,
MaxCacheSize = 64 * 1024 * 1024,
CacheExpiry = TimeSpan.FromSeconds(60),
});
// Act: write a few messages
await store.AppendAsync("foo.bar", "hello world"u8.ToArray(), default);
await store.AppendAsync("foo.baz", "second message"u8.ToArray(), default);
// Assert: blocks were created and messages are retrievable (cache is live).
store.BlockCount.ShouldBeGreaterThanOrEqualTo(1);
}
[Fact]
public async Task FileStore_EvictsBlockCacheOnRotation()
{
// Arrange: tiny block size so rotation happens quickly.
var opts = new FileStoreOptions
{
BlockSizeBytes = 128, // Forces rotation after ~2 messages
MaxCacheSize = 64 * 1024 * 1024,
CacheExpiry = TimeSpan.FromSeconds(60),
};
await using var store = CreateStore("int-rotate", opts);
// Act: write enough to trigger rotation
for (var i = 0; i < 10; i++)
await store.AppendAsync($"subj.{i}", new byte[20], default);
// Assert: multiple blocks exist and all reads still succeed
store.BlockCount.ShouldBeGreaterThan(1);
for (ulong seq = 1; seq <= 10; seq++)
{
var msg = await store.LoadAsync(seq, default);
msg.ShouldNotBeNull($"message at seq={seq} should be recoverable after block rotation");
}
}
[Fact]
public void FileStore_StoreMsg_TracksWrite()
{
// Arrange
using var store = CreateStore("int-storemsg", new FileStoreOptions
{
BlockSizeBytes = 1024 * 1024,
MaxCacheSize = 64 * 1024 * 1024,
CacheExpiry = TimeSpan.FromSeconds(60),
});
// Act
var (seq, _) = store.StoreMsg("test.subject", hdr: null, msg: "payload"u8.ToArray(), ttl: 0);
// Assert: message is retrievable (write was tracked, cache is alive)
seq.ShouldBe(1UL);
}
// -------------------------------------------------------------------------
// IAsyncDisposable: DisposeAsync flushes then stops the timer
// -------------------------------------------------------------------------
[Fact]
public async Task Dispose_FlushesAndStopsBackgroundTask()
{
// Arrange
var dir = Path.Combine(_dir.FullName, "dispose-test");
Directory.CreateDirectory(dir);
using var block = MsgBlock.Create(1, dir, 1024 * 1024);
block.Write("d.subject", ReadOnlyMemory<byte>.Empty, "data"u8.ToArray());
var manager = new FileStore.WriteCacheManager(
maxCacheSizeBytes: 64 * 1024 * 1024,
cacheExpiry: TimeSpan.FromSeconds(60),
blockLookup: id => id == 1 ? block : null);
manager.TrackWrite(blockId: 1, bytes: 64);
// Act: dispose should complete within a reasonable time and clear entries
await manager.DisposeAsync();
// Assert
manager.TrackedBlockCount.ShouldBe(0);
block.HasCache.ShouldBeFalse("cache should be flushed/cleared during DisposeAsync");
}
}