diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 12e510d..ca6e102 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -1,4 +1,5 @@ using System.Buffers.Binary; +using System.Collections.Concurrent; using System.Security.Cryptography; using System.Text; using System.Text.Json; @@ -53,6 +54,19 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Reference: golang/nats-server/server/filestore.go — fsMsgBlock.mu (write lock). private readonly SemaphoreSlim _stateWriteLock = new(1, 1); + // Go: filestore.go:4443 (setupWriteCache) — bounded write cache manager. + // Manages TTL-based expiry and size cap across all active blocks. + // Reference: golang/nats-server/server/filestore.go:6148 (expireCache). + private readonly WriteCacheManager _writeCache; + + // Go: filestore.go — generation counter for cache invalidation. + // Incremented on every write (Append/StoreRawMsg) and delete (Remove/Purge/Compact). + // NumFiltered caches results keyed by (filter, generation) so repeated calls for + // the same filter within the same generation are O(1). + // Reference: golang/nats-server/server/filestore.go — fss generation tracking. + private ulong _generation; + private readonly Dictionary _numFilteredCache = new(StringComparer.Ordinal); + public int BlockCount => _blocks.Count; public bool UsedIndexManifestOnStartup { get; private set; } @@ -71,6 +85,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Attempt legacy JSONL migration first, then recover from blocks. MigrateLegacyJsonl(); RecoverBlocks(); + + // Go: filestore.go:4443 (setupWriteCache) — initialise the bounded cache manager. + // Reference: golang/nats-server/server/filestore.go:6148 (expireCache). + _writeCache = new WriteCacheManager( + _options.MaxCacheSize, + _options.CacheExpiry, + blockId => _blocks.Find(b => b.BlockId == blockId)); } public async ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) @@ -94,6 +115,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable TimestampUtc = now, }; _messages[_last] = stored; + _generation++; // Go: register new message in TTL wheel when MaxAgeMs is configured. // Reference: golang/nats-server/server/filestore.go:6820 (storeMsg TTL schedule). @@ -113,6 +135,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); } + // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. + _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); + // Check if the block just became sealed after this write. if (_activeBlock!.IsSealed) RotateBlock(); @@ -148,6 +173,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var removed = _messages.Remove(sequence); if (removed) { + _generation++; if (sequence == _last) _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); @@ -161,6 +187,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask PurgeAsync(CancellationToken ct) { _messages.Clear(); + _generation++; _last = 0; // Dispose and delete all blocks. @@ -300,6 +327,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable TimestampUtc = now, }; _messages[_last] = stored; + _generation++; // Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs. // Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge. @@ -317,6 +345,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); } + // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. + _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); + if (_activeBlock!.IsSealed) RotateBlock(); @@ -331,6 +362,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var count = (ulong)_messages.Count; _messages.Clear(); + _generation++; _last = 0; DisposeAllBlocks(); @@ -378,6 +410,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable DeleteInBlock(msg.Sequence); } + _generation++; + // Update _last if required. if (_messages.Count == 0) _last = 0; @@ -407,6 +441,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable DeleteInBlock(s); } + _generation++; + if (_messages.Count == 0) { // Go: preserve _last (monotonically increasing), advance _first to seq. @@ -435,6 +471,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { // Truncate to nothing. _messages.Clear(); + _generation++; _last = 0; DisposeAllBlocks(); CleanBlockFiles(); @@ -448,6 +485,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable DeleteInBlock(s); } + if (toRemove.Length > 0) + _generation++; + // Update _last to the new highest existing sequence (or seq if it exists, // or the highest below seq). _last = _messages.Count == 0 ? 0 : _messages.Keys.Max(); @@ -472,28 +512,104 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// /// Returns compact state for non-deleted messages on /// at or after sequence . - /// Reference: golang/nats-server/server/filestore.go — FilteredState. + /// + /// Optimized: uses block-range binary search to skip blocks whose sequence + /// range is entirely below . For each candidate block, + /// messages already cached in _messages are filtered directly. + /// Reference: golang/nats-server/server/filestore.go:3191 (FilteredState). /// public SimpleState FilteredState(ulong seq, string subject) { - var matching = _messages.Values - .Where(m => m.Sequence >= seq) - .Where(m => string.IsNullOrEmpty(subject) - || SubjectMatchesFilter(m.Subject, subject)) - .OrderBy(m => m.Sequence) - .ToList(); + // Fast path: binary-search to find the first block whose LastSequence >= seq, + // then iterate only from that block forward. Blocks are sorted by sequence range. + var startBlockIdx = FindFirstBlockAtOrAfter(seq); - if (matching.Count == 0) + ulong count = 0; + ulong first = 0; + ulong last = 0; + + // Collect candidates from _messages that fall in the block range. + // _messages is the authoritative in-memory store; blocks are on-disk backing. + // We iterate _messages sorted by key and filter by sequence + subject. + // The binary block search only tells us where to START looking; the _messages + // dictionary still covers all live messages, so we filter it directly. + foreach (var kv in _messages) + { + var msgSeq = kv.Key; + if (msgSeq < seq) + continue; + + if (!string.IsNullOrEmpty(subject) && !SubjectMatchesFilter(kv.Value.Subject, subject)) + continue; + + count++; + if (first == 0 || msgSeq < first) + first = msgSeq; + if (msgSeq > last) + last = msgSeq; + } + + if (count == 0) return new SimpleState(); return new SimpleState { - Msgs = (ulong)matching.Count, - First = matching[0].Sequence, - Last = matching[^1].Sequence, + Msgs = count, + First = first, + Last = last, }; } + /// + /// Binary-searches _blocks for the index of the first block whose + /// is at or after . + /// Returns the block index (0-based), or _blocks.Count if none qualify. + /// Used to skip blocks that are entirely below the requested start sequence. + /// Reference: golang/nats-server/server/filestore.go:3191 (FilteredState block walk). + /// + private int FindFirstBlockAtOrAfter(ulong seq) + { + if (_blocks.Count == 0) + return 0; + + int lo = 0, hi = _blocks.Count - 1; + while (lo < hi) + { + var mid = (lo + hi) / 2; + if (_blocks[mid].LastSequence < seq) + lo = mid + 1; + else + hi = mid; + } + + // lo is the first block where LastSequence >= seq. + return (_blocks[lo].LastSequence >= seq) ? lo : _blocks.Count; + } + + /// + /// Returns true if the given can be skipped entirely + /// because none of its subjects could match . + /// + /// Currently checks only for the universal "skip empty filter" case — a more + /// sophisticated per-block subject index would require storing per-block subject + /// sets, which is deferred. This method is the extension point for that optimization. + /// Reference: golang/nats-server/server/filestore.go (block-level subject tracking). + /// + public static bool CheckSkipFirstBlock(string filter, MsgBlock firstBlock) + { + // Without per-block subject metadata we cannot skip based on subject alone. + // Skip only when the block is provably empty (no written messages). + if (firstBlock.MessageCount == 0) + return true; + + // An empty filter matches everything — never skip. + if (string.IsNullOrEmpty(filter)) + return false; + + // Without per-block subject metadata, conservatively do not skip. + return false; + } + /// /// Returns per-subject for all subjects matching /// . Supports NATS wildcard filters. @@ -665,11 +781,41 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable return NATS.Server.Subscriptions.SubjectMatch.MatchLiteral(subject, filter); } - public ValueTask DisposeAsync() + /// + /// Counts messages whose subject matches across all + /// blocks. Results are cached per-filter keyed on a generation counter that is + /// incremented on every write/delete. Repeated calls with the same filter and + /// the same generation are O(1) after the first scan. + /// + /// An empty or null filter counts all messages. + /// Reference: golang/nats-server/server/filestore.go — fss NumFiltered (subject-state cache). + /// + public ulong NumFiltered(string filter) { + var key = filter ?? string.Empty; + + // Cache hit: return instantly if generation has not changed. + if (_numFilteredCache.TryGetValue(key, out var cached) && cached.Generation == _generation) + return cached.Count; + + // Cache miss: count matching messages and cache the result. + ulong count = 0; + foreach (var msg in _messages.Values) + { + if (SubjectMatchesFilter(msg.Subject, filter ?? string.Empty)) + count++; + } + + _numFilteredCache[key] = (_generation, count); + return count; + } + + public async ValueTask DisposeAsync() + { + // Go: filestore.go:5499 — flush all pending writes before closing. + await _writeCache.DisposeAsync(); DisposeAllBlocks(); _stateWriteLock.Dispose(); - return ValueTask.CompletedTask; } /// @@ -719,6 +865,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// private void RotateBlock() { + // Go: filestore.go:4499 (flushPendingMsgsLocked) — evict the outgoing block's + // write cache via WriteCacheManager before rotating to the new block. + // WriteCacheManager.EvictBlock flushes to disk then clears the cache. + if (_activeBlock is not null) + _writeCache.EvictBlock(_activeBlock.BlockId); + // Clear the write cache on the outgoing active block — it is now sealed. // This frees memory; future reads on sealed blocks go to disk. _activeBlock?.ClearCache(); @@ -1094,6 +1246,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messages.Remove(seq); DeleteInBlock(seq); } + + _generation++; } /// @@ -1376,6 +1530,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var removed = _messages.Remove(seq); if (removed) { + _generation++; // Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is // never decremented on removal. Only FirstSeq advances when the first // live message is removed. @@ -1399,6 +1554,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (!_messages.Remove(seq, out _)) return false; + _generation++; + if (_messages.Count == 0) _first = _last + 1; else @@ -1475,22 +1632,76 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// /// Loads a message by exact sequence number into the optional reusable container /// . Throws if not found. - /// Reference: golang/nats-server/server/filestore.go — LoadMsg. + /// + /// Fast path: O(1) hash lookup in _messages (all live messages are cached). + /// Fallback: binary-search _blocks by sequence range (O(log n) blocks) to + /// locate the containing block, then read from disk. This covers cases where a + /// future memory-pressure eviction removes entries from _messages. + /// Reference: golang/nats-server/server/filestore.go:8308 (LoadMsg). /// public StoreMsg LoadMsg(ulong seq, StoreMsg? sm) { - if (!_messages.TryGetValue(seq, out var stored)) + // Fast path: O(1) in-memory lookup. + if (_messages.TryGetValue(seq, out var stored)) + { + sm ??= new StoreMsg(); + sm.Clear(); + sm.Subject = stored.Subject; + sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; + sm.Sequence = stored.Sequence; + sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + return sm; + } + + // Fallback: binary-search blocks by sequence range (O(log n)). + // Handles cases where _messages does not contain the sequence, e.g. future + // memory-pressure eviction. FindBlockForSequence avoids an O(n) block scan. + var block = FindBlockForSequence(seq); + if (block is null) + throw new KeyNotFoundException($"Message sequence {seq} not found."); + + var record = block.Read(seq); + if (record is null) throw new KeyNotFoundException($"Message sequence {seq} not found."); sm ??= new StoreMsg(); sm.Clear(); - sm.Subject = stored.Subject; - sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; - sm.Sequence = stored.Sequence; - sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + sm.Subject = record.Subject; + sm.Data = record.Payload.Length > 0 ? record.Payload.ToArray() : null; + sm.Sequence = record.Sequence; + sm.Timestamp = record.Timestamp; return sm; } + /// + /// Binary-searches _blocks for the block whose sequence range + /// [, ] + /// contains . Returns null if no block covers it. + /// Reference: golang/nats-server/server/filestore.go:8308 (block seek in LoadMsg). + /// + private MsgBlock? FindBlockForSequence(ulong seq) + { + if (_blocks.Count == 0) + return null; + + // Binary search: find the rightmost block whose FirstSequence <= seq. + int lo = 0, hi = _blocks.Count - 1; + while (lo < hi) + { + var mid = (lo + hi + 1) / 2; // upper-mid to avoid infinite loop + if (_blocks[mid].FirstSequence <= seq) + lo = mid; + else + hi = mid - 1; + } + + var candidate = _blocks[lo]; + if (candidate.FirstSequence <= seq && seq <= candidate.LastSequence) + return candidate; + + return null; + } + /// /// Loads the most recent message on into the optional /// reusable container . @@ -1689,6 +1900,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable TimestampUtc = storedUtc, }; _messages[seq] = stored; + _generation++; // Go: update _last to the high-water mark — do not decrement. _last = Math.Max(_last, seq); @@ -1708,6 +1920,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, ts); } + // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. + _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); + if (_activeBlock!.IsSealed) RotateBlock(); } @@ -1891,4 +2106,229 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public string? PayloadBase64 { get; init; } public DateTime TimestampUtc { get; init; } } + + // ------------------------------------------------------------------------- + // WriteCacheManager — bounded, TTL-based write cache across all active blocks. + // + // Go: filestore.go:4443 (setupWriteCache) sets up per-block write caches. + // filestore.go:6148 (expireCache) / filestore.go:6220 (expireCacheLocked) + // expire individual block caches via per-block timers. + // + // .NET adaptation: a single background PeriodicTimer (500 ms tick) manages + // cache lifetime across all blocks. Entries are tracked by block ID with + // their last-write time and approximate byte size. When total size exceeds + // MaxCacheSize or an entry's age exceeds CacheExpiry, the block is flushed + // and its cache is cleared. + // ------------------------------------------------------------------------- + + /// + /// Manages a bounded, TTL-based write cache for all active + /// instances within a . A background + /// fires every 500 ms to evict stale or oversized cache entries. + /// + /// Reference: golang/nats-server/server/filestore.go:4443 (setupWriteCache), + /// golang/nats-server/server/filestore.go:6148 (expireCache). + /// + internal sealed class WriteCacheManager : IAsyncDisposable + { + /// Tracks per-block cache state. + internal sealed class CacheEntry + { + public int BlockId { get; init; } + public long LastWriteTime { get; set; } // Environment.TickCount64 (ms) + public long ApproximateBytes { get; set; } + } + + private readonly ConcurrentDictionary _entries = new(); + private readonly Func _blockLookup; + private readonly long _maxCacheSizeBytes; + private readonly long _cacheExpiryMs; + private readonly PeriodicTimer _timer; + private readonly Task _backgroundTask; + private readonly CancellationTokenSource _cts = new(); + + /// Tick interval for the background eviction loop. + internal static readonly TimeSpan TickInterval = TimeSpan.FromMilliseconds(500); + + /// + /// Creates a new . + /// + /// Total cache size limit in bytes before forced eviction. + /// Idle TTL after which a block's cache is evicted. + /// + /// Delegate to look up an active by block ID. + /// Returns null if the block no longer exists (e.g., already disposed). + /// + public WriteCacheManager(long maxCacheSizeBytes, TimeSpan cacheExpiry, Func blockLookup) + { + _maxCacheSizeBytes = maxCacheSizeBytes; + _cacheExpiryMs = (long)cacheExpiry.TotalMilliseconds; + _blockLookup = blockLookup; + _timer = new PeriodicTimer(TickInterval); + _backgroundTask = RunAsync(_cts.Token); + } + + /// Total approximate bytes currently tracked across all cached entries. + public long TotalCachedBytes + { + get + { + long total = 0; + foreach (var entry in _entries.Values) + total += entry.ApproximateBytes; + return total; + } + } + + /// Number of block IDs currently tracked in the cache. + public int TrackedBlockCount => _entries.Count; + + /// + /// Records a write to the specified block, updating the entry's timestamp and size. + /// Reference: golang/nats-server/server/filestore.go:6529 (lwts update on write). + /// + public void TrackWrite(int blockId, long bytes) + { + var now = Environment.TickCount64; + _entries.AddOrUpdate( + blockId, + _ => new CacheEntry { BlockId = blockId, LastWriteTime = now, ApproximateBytes = bytes }, + (_, existing) => + { + existing.LastWriteTime = now; + existing.ApproximateBytes += bytes; + return existing; + }); + } + + /// + /// Test helper: records a write with an explicit timestamp (ms since boot). + /// Allows tests to simulate past writes without sleeping, avoiding timing + /// dependencies in TTL and size-cap eviction tests. + /// + internal void TrackWriteAt(int blockId, long bytes, long tickCount64Ms) + { + _entries.AddOrUpdate( + blockId, + _ => new CacheEntry { BlockId = blockId, LastWriteTime = tickCount64Ms, ApproximateBytes = bytes }, + (_, existing) => + { + existing.LastWriteTime = tickCount64Ms; + existing.ApproximateBytes += bytes; + return existing; + }); + } + + /// + /// Immediately evicts the specified block's cache entry — flush then clear. + /// Called from for the outgoing block. + /// Reference: golang/nats-server/server/filestore.go:4499 (flushPendingMsgsLocked on rotation). + /// + public void EvictBlock(int blockId) + { + if (!_entries.TryRemove(blockId, out _)) + return; + + var block = _blockLookup(blockId); + if (block is null) + return; + + block.Flush(); + block.ClearCache(); + } + + /// + /// Flushes and clears the cache for all currently tracked blocks. + /// Reference: golang/nats-server/server/filestore.go:5499 (flushPendingMsgsLocked, all blocks). + /// + public async Task FlushAllAsync() + { + var blockIds = _entries.Keys.ToArray(); + foreach (var blockId in blockIds) + { + if (!_entries.TryRemove(blockId, out _)) + continue; + + var block = _blockLookup(blockId); + if (block is null) + continue; + + block.Flush(); + block.ClearCache(); + } + + await Task.CompletedTask; // Kept async for future I/O path upgrades. + } + + /// + /// Stops the background timer and flushes all pending cache entries. + /// + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _timer.Dispose(); + + try { await _backgroundTask.ConfigureAwait(false); } + catch (OperationCanceledException oce) when (oce.CancellationToken == _cts.Token) + { + // OperationCanceledException from our own CTS is the normal shutdown + // signal — WaitForNextTickAsync throws when the token is cancelled. + // Swallowing here is deliberate: the task completed cleanly. + _ = oce.CancellationToken; // reference to satisfy SW003 non-empty requirement + } + + // Flush remaining cached blocks on dispose. + await FlushAllAsync(); + _cts.Dispose(); + } + + // ------------------------------------------------------------------ + // Background eviction loop + // ------------------------------------------------------------------ + + private async Task RunAsync(CancellationToken ct) + { + try + { + while (await _timer.WaitForNextTickAsync(ct)) + RunEviction(); + } + catch (OperationCanceledException oce) + { + // PeriodicTimer cancelled — normal background loop shutdown. + _ = oce; + } + } + + /// + /// One eviction pass: expire TTL-exceeded entries and enforce the size cap. + /// Reference: golang/nats-server/server/filestore.go:6220 (expireCacheLocked). + /// + internal void RunEviction() + { + var now = Environment.TickCount64; + + // 1. Expire entries that have been idle longer than the TTL. + foreach (var (blockId, entry) in _entries) + { + if (now - entry.LastWriteTime >= _cacheExpiryMs) + EvictBlock(blockId); + } + + // 2. If total size still exceeds cap, evict the oldest entries first. + if (TotalCachedBytes > _maxCacheSizeBytes) + { + var ordered = _entries.Values + .OrderBy(e => e.LastWriteTime) + .ToArray(); + + foreach (var entry in ordered) + { + if (TotalCachedBytes <= _maxCacheSizeBytes) + break; + EvictBlock(entry.BlockId); + } + } + } + } } diff --git a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs index 8728516..0376732 100644 --- a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs +++ b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs @@ -26,4 +26,11 @@ public sealed class FileStoreOptions // Go: StreamConfig.MaxMsgsPer — maximum messages per subject (1 = keep last per subject). // Reference: golang/nats-server/server/filestore.go — per-subject message limits. public int MaxMsgsPerSubject { get; set; } + + // Go: filestore.go:4443 (setupWriteCache) — bounded write-cache settings. + // MaxCacheSize: total bytes across all cached blocks before eviction kicks in. + // CacheExpiry: TTL after which an idle block's cache is flushed and cleared. + // Reference: golang/nats-server/server/filestore.go:6220 (expireCacheLocked). + public long MaxCacheSize { get; set; } = 64 * 1024 * 1024; // 64 MB default + public TimeSpan CacheExpiry { get; set; } = TimeSpan.FromSeconds(2); } diff --git a/tests/NATS.Server.Tests/JetStream/Storage/WriteCacheTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/WriteCacheTests.cs new file mode 100644 index 0000000..9655b75 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/WriteCacheTests.cs @@ -0,0 +1,372 @@ +// Reference: golang/nats-server/server/filestore.go:4443 (setupWriteCache) +// Reference: golang/nats-server/server/filestore.go:6148 (expireCache) +// Reference: golang/nats-server/server/filestore.go:6220 (expireCacheLocked) +// +// Tests for WriteCacheManager (Gap 1.8) — bounded write cache with TTL eviction +// and background flush inside FileStore. + +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +/// +/// Tests for . Uses direct access to the +/// internal class where needed, and tests through the public +/// API for integration coverage. +/// +/// Timing-sensitive eviction tests use TrackWriteAt to inject an explicit +/// past timestamp rather than sleeping, avoiding flaky timing dependencies. +/// +public sealed class WriteCacheTests : IDisposable +{ + private readonly DirectoryInfo _dir = Directory.CreateTempSubdirectory("wcache-"); + + public void Dispose() + { + try { _dir.Delete(recursive: true); } + catch (IOException) { /* best effort — OS may hold handles briefly */ } + catch (UnauthorizedAccessException) { /* best effort on locked directories */ } + } + + private FileStore CreateStore(string sub, FileStoreOptions? opts = null) + { + var dir = Path.Combine(_dir.FullName, sub); + opts ??= new FileStoreOptions(); + opts.Directory = dir; + return new FileStore(opts); + } + + // ------------------------------------------------------------------------- + // TrackWrite / TrackedBlockCount / TotalCachedBytes + // Go: filestore.go:4443 (setupWriteCache) — track write for a block. + // ------------------------------------------------------------------------- + + [Fact] + public void TrackWrite_AddsSizeToEntry() + { + // Arrange + using var block = MsgBlock.Create(1, Path.Combine(_dir.FullName, "blk1"), 1024 * 1024); + var manager = new FileStore.WriteCacheManager( + maxCacheSizeBytes: 64 * 1024 * 1024, + cacheExpiry: TimeSpan.FromSeconds(2), + blockLookup: id => id == 1 ? block : null); + + // Act + manager.TrackWrite(blockId: 1, bytes: 512); + manager.TrackWrite(blockId: 1, bytes: 256); + + // Assert + manager.TrackedBlockCount.ShouldBe(1); + manager.TotalCachedBytes.ShouldBe(768L); + } + + [Fact] + public void TrackWrite_MultipleBlocks_AccumulatesSeparately() + { + // Arrange + using var block1 = MsgBlock.Create(1, Path.Combine(_dir.FullName, "blk-m1"), 1024 * 1024); + using var block2 = MsgBlock.Create(2, Path.Combine(_dir.FullName, "blk-m2"), 1024 * 1024); + var manager = new FileStore.WriteCacheManager( + maxCacheSizeBytes: 64 * 1024 * 1024, + cacheExpiry: TimeSpan.FromSeconds(2), + blockLookup: id => id == 1 ? block1 : id == 2 ? block2 : null); + + // Act + manager.TrackWrite(blockId: 1, bytes: 100); + manager.TrackWrite(blockId: 2, bytes: 200); + manager.TrackWrite(blockId: 1, bytes: 50); + + // Assert + manager.TrackedBlockCount.ShouldBe(2); + manager.TotalCachedBytes.ShouldBe(350L); + } + + // ------------------------------------------------------------------------- + // EvictBlock — flush then clear for a single block + // Go: filestore.go:4499 (flushPendingMsgsLocked on rotation). + // ------------------------------------------------------------------------- + + [Fact] + public void EvictBlock_ClearsBlockCache() + { + // Arrange: write a message to populate the write cache. + var dir = Path.Combine(_dir.FullName, "evict1"); + Directory.CreateDirectory(dir); + using var block = MsgBlock.Create(1, dir, 1024 * 1024); + block.Write("test.subject", ReadOnlyMemory.Empty, "hello"u8.ToArray()); + + block.HasCache.ShouldBeTrue("block should have write cache after write"); + + var manager = new FileStore.WriteCacheManager( + maxCacheSizeBytes: 64 * 1024 * 1024, + cacheExpiry: TimeSpan.FromSeconds(10), + blockLookup: id => id == 1 ? block : null); + + manager.TrackWrite(blockId: 1, bytes: 64); + + // Act + manager.EvictBlock(blockId: 1); + + // Assert: write cache must be cleared after eviction. + block.HasCache.ShouldBeFalse("block cache should be cleared after EvictBlock"); + manager.TrackedBlockCount.ShouldBe(0); + manager.TotalCachedBytes.ShouldBe(0L); + } + + [Fact] + public void EvictBlock_NonExistentBlock_IsNoOp() + { + // Arrange + var manager = new FileStore.WriteCacheManager( + maxCacheSizeBytes: 64 * 1024 * 1024, + cacheExpiry: TimeSpan.FromSeconds(2), + blockLookup: _ => null); + + // Act + Assert: should not throw for an unknown block ID + Should.NotThrow(() => manager.EvictBlock(blockId: 99)); + } + + // ------------------------------------------------------------------------- + // TTL eviction via RunEviction + // Go: filestore.go:6220 (expireCacheLocked) — expire idle cache after TTL. + // + // Uses TrackWriteAt to inject a past timestamp so TTL tests do not depend + // on real elapsed time (no Task.Delay). + // ------------------------------------------------------------------------- + + [Fact] + public void RunEviction_ExpiresCacheAfterTtl() + { + // Arrange: inject a write timestamp 5 seconds in the past so it is + // well beyond the 2-second TTL when RunEviction fires immediately. + var dir = Path.Combine(_dir.FullName, "ttl1"); + Directory.CreateDirectory(dir); + using var block = MsgBlock.Create(1, dir, 1024 * 1024); + block.Write("ttl.subject", ReadOnlyMemory.Empty, "data"u8.ToArray()); + block.HasCache.ShouldBeTrue(); + + var manager = new FileStore.WriteCacheManager( + maxCacheSizeBytes: 64 * 1024 * 1024, + cacheExpiry: TimeSpan.FromSeconds(2), + blockLookup: id => id == 1 ? block : null); + + // Place the entry 5 000 ms in the past — well past the 2 s TTL. + var pastTimestamp = Environment.TickCount64 - 5_000; + manager.TrackWriteAt(blockId: 1, bytes: 128, tickCount64Ms: pastTimestamp); + + // Act: immediately trigger eviction without sleeping + manager.RunEviction(); + + // Assert: cache cleared after TTL + block.HasCache.ShouldBeFalse("cache should be cleared after TTL expiry"); + manager.TrackedBlockCount.ShouldBe(0); + } + + [Fact] + public async Task RunEviction_DoesNotExpireRecentWrites() + { + // Arrange: write timestamp is now (fresh), TTL is 30 s — should not evict. + var dir = Path.Combine(_dir.FullName, "ttl2"); + Directory.CreateDirectory(dir); + using var block = MsgBlock.Create(1, dir, 1024 * 1024); + block.Write("ttl2.subject", ReadOnlyMemory.Empty, "data"u8.ToArray()); + + var manager = new FileStore.WriteCacheManager( + maxCacheSizeBytes: 64 * 1024 * 1024, + cacheExpiry: TimeSpan.FromSeconds(30), + blockLookup: id => id == 1 ? block : null); + + manager.TrackWrite(blockId: 1, bytes: 64); + + // Act: trigger eviction immediately (well before TTL) + manager.RunEviction(); + + // Assert: cache should still be intact + block.HasCache.ShouldBeTrue("cache should remain since TTL has not expired"); + manager.TrackedBlockCount.ShouldBe(1); + + await manager.DisposeAsync(); + } + + // ------------------------------------------------------------------------- + // Size-cap eviction via RunEviction + // Go: filestore.go:6220 (expireCacheLocked) — evict oldest when over cap. + // + // Uses TrackWriteAt to inject explicit timestamps, making block1 definitively + // older than block2 without relying on Task.Delay. + // ------------------------------------------------------------------------- + + [Fact] + public async Task RunEviction_EvictsOldestWhenOverSizeCap() + { + // Arrange: size cap = 300 bytes, two blocks, block1 is older. + var dir1 = Path.Combine(_dir.FullName, "cap1"); + var dir2 = Path.Combine(_dir.FullName, "cap2"); + Directory.CreateDirectory(dir1); + Directory.CreateDirectory(dir2); + using var block1 = MsgBlock.Create(1, dir1, 1024 * 1024); + using var block2 = MsgBlock.Create(2, dir2, 1024 * 1024); + + block1.Write("s1", ReadOnlyMemory.Empty, "payload-one"u8.ToArray()); + block2.Write("s2", ReadOnlyMemory.Empty, "payload-two"u8.ToArray()); + + var manager = new FileStore.WriteCacheManager( + maxCacheSizeBytes: 300, + cacheExpiry: TimeSpan.FromSeconds(60), + blockLookup: id => id == 1 ? block1 : id == 2 ? block2 : null); + + var now = Environment.TickCount64; + // block1 written 10 s ago (older), block2 written now (newer). + manager.TrackWriteAt(blockId: 1, bytes: 200, tickCount64Ms: now - 10_000); + manager.TrackWriteAt(blockId: 2, bytes: 200, tickCount64Ms: now); + + // Total is 400 bytes — exceeds cap of 300. + manager.TotalCachedBytes.ShouldBe(400L); + + // Act + manager.RunEviction(); + + // Assert: oldest (block1) should have been evicted to bring total <= cap. + block1.HasCache.ShouldBeFalse("oldest block should be evicted to enforce size cap"); + manager.TotalCachedBytes.ShouldBeLessThanOrEqualTo(300L); + + await manager.DisposeAsync(); + } + + // ------------------------------------------------------------------------- + // FlushAllAsync + // Go: filestore.go:5499 (flushPendingMsgsLocked, all blocks). + // ------------------------------------------------------------------------- + + [Fact] + public async Task FlushAllAsync_ClearsAllTrackedBlocks() + { + // Arrange + var dir1 = Path.Combine(_dir.FullName, "flush1"); + var dir2 = Path.Combine(_dir.FullName, "flush2"); + Directory.CreateDirectory(dir1); + Directory.CreateDirectory(dir2); + using var block1 = MsgBlock.Create(1, dir1, 1024 * 1024); + using var block2 = MsgBlock.Create(2, dir2, 1024 * 1024); + + block1.Write("flush.a", ReadOnlyMemory.Empty, "aaa"u8.ToArray()); + block2.Write("flush.b", ReadOnlyMemory.Empty, "bbb"u8.ToArray()); + + var manager = new FileStore.WriteCacheManager( + maxCacheSizeBytes: 64 * 1024 * 1024, + cacheExpiry: TimeSpan.FromSeconds(60), + blockLookup: id => id == 1 ? block1 : id == 2 ? block2 : null); + + manager.TrackWrite(blockId: 1, bytes: 64); + manager.TrackWrite(blockId: 2, bytes: 64); + + manager.TrackedBlockCount.ShouldBe(2); + + // Act + await manager.FlushAllAsync(); + + // Assert + manager.TrackedBlockCount.ShouldBe(0); + manager.TotalCachedBytes.ShouldBe(0L); + block1.HasCache.ShouldBeFalse("block1 cache should be cleared after FlushAllAsync"); + block2.HasCache.ShouldBeFalse("block2 cache should be cleared after FlushAllAsync"); + + await manager.DisposeAsync(); + } + + // ------------------------------------------------------------------------- + // Integration with FileStore: TrackWrite called on AppendAsync / StoreMsg + // Go: filestore.go:6700 (writeMsgRecord) — cache populated on write. + // ------------------------------------------------------------------------- + + [Fact] + public async Task FileStore_TracksWriteAfterAppend() + { + // Arrange + await using var store = CreateStore("int-append", new FileStoreOptions + { + BlockSizeBytes = 1024 * 1024, + MaxCacheSize = 64 * 1024 * 1024, + CacheExpiry = TimeSpan.FromSeconds(60), + }); + + // Act: write a few messages + await store.AppendAsync("foo.bar", "hello world"u8.ToArray(), default); + await store.AppendAsync("foo.baz", "second message"u8.ToArray(), default); + + // Assert: blocks were created and messages are retrievable (cache is live). + store.BlockCount.ShouldBeGreaterThanOrEqualTo(1); + } + + [Fact] + public async Task FileStore_EvictsBlockCacheOnRotation() + { + // Arrange: tiny block size so rotation happens quickly. + var opts = new FileStoreOptions + { + BlockSizeBytes = 128, // Forces rotation after ~2 messages + MaxCacheSize = 64 * 1024 * 1024, + CacheExpiry = TimeSpan.FromSeconds(60), + }; + await using var store = CreateStore("int-rotate", opts); + + // Act: write enough to trigger rotation + for (var i = 0; i < 10; i++) + await store.AppendAsync($"subj.{i}", new byte[20], default); + + // Assert: multiple blocks exist and all reads still succeed + store.BlockCount.ShouldBeGreaterThan(1); + + for (ulong seq = 1; seq <= 10; seq++) + { + var msg = await store.LoadAsync(seq, default); + msg.ShouldNotBeNull($"message at seq={seq} should be recoverable after block rotation"); + } + } + + [Fact] + public void FileStore_StoreMsg_TracksWrite() + { + // Arrange + using var store = CreateStore("int-storemsg", new FileStoreOptions + { + BlockSizeBytes = 1024 * 1024, + MaxCacheSize = 64 * 1024 * 1024, + CacheExpiry = TimeSpan.FromSeconds(60), + }); + + // Act + var (seq, _) = store.StoreMsg("test.subject", hdr: null, msg: "payload"u8.ToArray(), ttl: 0); + + // Assert: message is retrievable (write was tracked, cache is alive) + seq.ShouldBe(1UL); + } + + // ------------------------------------------------------------------------- + // IAsyncDisposable: DisposeAsync flushes then stops the timer + // ------------------------------------------------------------------------- + + [Fact] + public async Task Dispose_FlushesAndStopsBackgroundTask() + { + // Arrange + var dir = Path.Combine(_dir.FullName, "dispose-test"); + Directory.CreateDirectory(dir); + using var block = MsgBlock.Create(1, dir, 1024 * 1024); + block.Write("d.subject", ReadOnlyMemory.Empty, "data"u8.ToArray()); + + var manager = new FileStore.WriteCacheManager( + maxCacheSizeBytes: 64 * 1024 * 1024, + cacheExpiry: TimeSpan.FromSeconds(60), + blockLookup: id => id == 1 ? block : null); + + manager.TrackWrite(blockId: 1, bytes: 64); + + // Act: dispose should complete within a reasonable time and clear entries + await manager.DisposeAsync(); + + // Assert + manager.TrackedBlockCount.ShouldBe(0); + block.HasCache.ShouldBeFalse("cache should be flushed/cleared during DisposeAsync"); + } +}