// Reference: golang/nats-server/server/filestore.go:4443 (setupWriteCache) // Reference: golang/nats-server/server/filestore.go:6148 (expireCache) // Reference: golang/nats-server/server/filestore.go:6220 (expireCacheLocked) // // Tests for WriteCacheManager (Gap 1.8) — bounded write cache with TTL eviction // and background flush inside FileStore. using NATS.Server.JetStream.Storage; namespace NATS.Server.Tests.JetStream.Storage; /// /// Tests for . Uses direct access to the /// internal class where needed, and tests through the public /// API for integration coverage. /// /// Timing-sensitive eviction tests use TrackWriteAt to inject an explicit /// past timestamp rather than sleeping, avoiding flaky timing dependencies. /// public sealed class WriteCacheTests : IDisposable { private readonly DirectoryInfo _dir = Directory.CreateTempSubdirectory("wcache-"); public void Dispose() { try { _dir.Delete(recursive: true); } catch (IOException) { /* best effort — OS may hold handles briefly */ } catch (UnauthorizedAccessException) { /* best effort on locked directories */ } } private FileStore CreateStore(string sub, FileStoreOptions? opts = null) { var dir = Path.Combine(_dir.FullName, sub); opts ??= new FileStoreOptions(); opts.Directory = dir; return new FileStore(opts); } // ------------------------------------------------------------------------- // TrackWrite / TrackedBlockCount / TotalCachedBytes // Go: filestore.go:4443 (setupWriteCache) — track write for a block. // ------------------------------------------------------------------------- [Fact] public void TrackWrite_AddsSizeToEntry() { // Arrange using var block = MsgBlock.Create(1, Path.Combine(_dir.FullName, "blk1"), 1024 * 1024); var manager = new FileStore.WriteCacheManager( maxCacheSizeBytes: 64 * 1024 * 1024, cacheExpiry: TimeSpan.FromSeconds(2), blockLookup: id => id == 1 ? block : null); // Act manager.TrackWrite(blockId: 1, bytes: 512); manager.TrackWrite(blockId: 1, bytes: 256); // Assert manager.TrackedBlockCount.ShouldBe(1); manager.TotalCachedBytes.ShouldBe(768L); } [Fact] public void TrackWrite_MultipleBlocks_AccumulatesSeparately() { // Arrange using var block1 = MsgBlock.Create(1, Path.Combine(_dir.FullName, "blk-m1"), 1024 * 1024); using var block2 = MsgBlock.Create(2, Path.Combine(_dir.FullName, "blk-m2"), 1024 * 1024); var manager = new FileStore.WriteCacheManager( maxCacheSizeBytes: 64 * 1024 * 1024, cacheExpiry: TimeSpan.FromSeconds(2), blockLookup: id => id == 1 ? block1 : id == 2 ? block2 : null); // Act manager.TrackWrite(blockId: 1, bytes: 100); manager.TrackWrite(blockId: 2, bytes: 200); manager.TrackWrite(blockId: 1, bytes: 50); // Assert manager.TrackedBlockCount.ShouldBe(2); manager.TotalCachedBytes.ShouldBe(350L); } // ------------------------------------------------------------------------- // EvictBlock — flush then clear for a single block // Go: filestore.go:4499 (flushPendingMsgsLocked on rotation). // ------------------------------------------------------------------------- [Fact] public void EvictBlock_ClearsBlockCache() { // Arrange: write a message to populate the write cache. var dir = Path.Combine(_dir.FullName, "evict1"); Directory.CreateDirectory(dir); using var block = MsgBlock.Create(1, dir, 1024 * 1024); block.Write("test.subject", ReadOnlyMemory.Empty, "hello"u8.ToArray()); block.HasCache.ShouldBeTrue("block should have write cache after write"); var manager = new FileStore.WriteCacheManager( maxCacheSizeBytes: 64 * 1024 * 1024, cacheExpiry: TimeSpan.FromSeconds(10), blockLookup: id => id == 1 ? block : null); manager.TrackWrite(blockId: 1, bytes: 64); // Act manager.EvictBlock(blockId: 1); // Assert: write cache must be cleared after eviction. block.HasCache.ShouldBeFalse("block cache should be cleared after EvictBlock"); manager.TrackedBlockCount.ShouldBe(0); manager.TotalCachedBytes.ShouldBe(0L); } [Fact] public void EvictBlock_NonExistentBlock_IsNoOp() { // Arrange var manager = new FileStore.WriteCacheManager( maxCacheSizeBytes: 64 * 1024 * 1024, cacheExpiry: TimeSpan.FromSeconds(2), blockLookup: _ => null); // Act + Assert: should not throw for an unknown block ID Should.NotThrow(() => manager.EvictBlock(blockId: 99)); } // ------------------------------------------------------------------------- // TTL eviction via RunEviction // Go: filestore.go:6220 (expireCacheLocked) — expire idle cache after TTL. // // Uses TrackWriteAt to inject a past timestamp so TTL tests do not depend // on real elapsed time (no Task.Delay). // ------------------------------------------------------------------------- [Fact] public void RunEviction_ExpiresCacheAfterTtl() { // Arrange: inject a write timestamp 5 seconds in the past so it is // well beyond the 2-second TTL when RunEviction fires immediately. var dir = Path.Combine(_dir.FullName, "ttl1"); Directory.CreateDirectory(dir); using var block = MsgBlock.Create(1, dir, 1024 * 1024); block.Write("ttl.subject", ReadOnlyMemory.Empty, "data"u8.ToArray()); block.HasCache.ShouldBeTrue(); var manager = new FileStore.WriteCacheManager( maxCacheSizeBytes: 64 * 1024 * 1024, cacheExpiry: TimeSpan.FromSeconds(2), blockLookup: id => id == 1 ? block : null); // Place the entry 5 000 ms in the past — well past the 2 s TTL. var pastTimestamp = Environment.TickCount64 - 5_000; manager.TrackWriteAt(blockId: 1, bytes: 128, tickCount64Ms: pastTimestamp); // Act: immediately trigger eviction without sleeping manager.RunEviction(); // Assert: cache cleared after TTL block.HasCache.ShouldBeFalse("cache should be cleared after TTL expiry"); manager.TrackedBlockCount.ShouldBe(0); } [Fact] public async Task RunEviction_DoesNotExpireRecentWrites() { // Arrange: write timestamp is now (fresh), TTL is 30 s — should not evict. var dir = Path.Combine(_dir.FullName, "ttl2"); Directory.CreateDirectory(dir); using var block = MsgBlock.Create(1, dir, 1024 * 1024); block.Write("ttl2.subject", ReadOnlyMemory.Empty, "data"u8.ToArray()); var manager = new FileStore.WriteCacheManager( maxCacheSizeBytes: 64 * 1024 * 1024, cacheExpiry: TimeSpan.FromSeconds(30), blockLookup: id => id == 1 ? block : null); manager.TrackWrite(blockId: 1, bytes: 64); // Act: trigger eviction immediately (well before TTL) manager.RunEviction(); // Assert: cache should still be intact block.HasCache.ShouldBeTrue("cache should remain since TTL has not expired"); manager.TrackedBlockCount.ShouldBe(1); await manager.DisposeAsync(); } // ------------------------------------------------------------------------- // Size-cap eviction via RunEviction // Go: filestore.go:6220 (expireCacheLocked) — evict oldest when over cap. // // Uses TrackWriteAt to inject explicit timestamps, making block1 definitively // older than block2 without relying on Task.Delay. // ------------------------------------------------------------------------- [Fact] public async Task RunEviction_EvictsOldestWhenOverSizeCap() { // Arrange: size cap = 300 bytes, two blocks, block1 is older. var dir1 = Path.Combine(_dir.FullName, "cap1"); var dir2 = Path.Combine(_dir.FullName, "cap2"); Directory.CreateDirectory(dir1); Directory.CreateDirectory(dir2); using var block1 = MsgBlock.Create(1, dir1, 1024 * 1024); using var block2 = MsgBlock.Create(2, dir2, 1024 * 1024); block1.Write("s1", ReadOnlyMemory.Empty, "payload-one"u8.ToArray()); block2.Write("s2", ReadOnlyMemory.Empty, "payload-two"u8.ToArray()); var manager = new FileStore.WriteCacheManager( maxCacheSizeBytes: 300, cacheExpiry: TimeSpan.FromSeconds(60), blockLookup: id => id == 1 ? block1 : id == 2 ? block2 : null); var now = Environment.TickCount64; // block1 written 10 s ago (older), block2 written now (newer). manager.TrackWriteAt(blockId: 1, bytes: 200, tickCount64Ms: now - 10_000); manager.TrackWriteAt(blockId: 2, bytes: 200, tickCount64Ms: now); // Total is 400 bytes — exceeds cap of 300. manager.TotalCachedBytes.ShouldBe(400L); // Act manager.RunEviction(); // Assert: oldest (block1) should have been evicted to bring total <= cap. block1.HasCache.ShouldBeFalse("oldest block should be evicted to enforce size cap"); manager.TotalCachedBytes.ShouldBeLessThanOrEqualTo(300L); await manager.DisposeAsync(); } // ------------------------------------------------------------------------- // FlushAllAsync // Go: filestore.go:5499 (flushPendingMsgsLocked, all blocks). // ------------------------------------------------------------------------- [Fact] public async Task FlushAllAsync_ClearsAllTrackedBlocks() { // Arrange var dir1 = Path.Combine(_dir.FullName, "flush1"); var dir2 = Path.Combine(_dir.FullName, "flush2"); Directory.CreateDirectory(dir1); Directory.CreateDirectory(dir2); using var block1 = MsgBlock.Create(1, dir1, 1024 * 1024); using var block2 = MsgBlock.Create(2, dir2, 1024 * 1024); block1.Write("flush.a", ReadOnlyMemory.Empty, "aaa"u8.ToArray()); block2.Write("flush.b", ReadOnlyMemory.Empty, "bbb"u8.ToArray()); var manager = new FileStore.WriteCacheManager( maxCacheSizeBytes: 64 * 1024 * 1024, cacheExpiry: TimeSpan.FromSeconds(60), blockLookup: id => id == 1 ? block1 : id == 2 ? block2 : null); manager.TrackWrite(blockId: 1, bytes: 64); manager.TrackWrite(blockId: 2, bytes: 64); manager.TrackedBlockCount.ShouldBe(2); // Act await manager.FlushAllAsync(); // Assert manager.TrackedBlockCount.ShouldBe(0); manager.TotalCachedBytes.ShouldBe(0L); block1.HasCache.ShouldBeFalse("block1 cache should be cleared after FlushAllAsync"); block2.HasCache.ShouldBeFalse("block2 cache should be cleared after FlushAllAsync"); await manager.DisposeAsync(); } // ------------------------------------------------------------------------- // Integration with FileStore: TrackWrite called on AppendAsync / StoreMsg // Go: filestore.go:6700 (writeMsgRecord) — cache populated on write. // ------------------------------------------------------------------------- [Fact] public async Task FileStore_TracksWriteAfterAppend() { // Arrange await using var store = CreateStore("int-append", new FileStoreOptions { BlockSizeBytes = 1024 * 1024, MaxCacheSize = 64 * 1024 * 1024, CacheExpiry = TimeSpan.FromSeconds(60), }); // Act: write a few messages await store.AppendAsync("foo.bar", "hello world"u8.ToArray(), default); await store.AppendAsync("foo.baz", "second message"u8.ToArray(), default); // Assert: blocks were created and messages are retrievable (cache is live). store.BlockCount.ShouldBeGreaterThanOrEqualTo(1); } [Fact] public async Task FileStore_EvictsBlockCacheOnRotation() { // Arrange: tiny block size so rotation happens quickly. var opts = new FileStoreOptions { BlockSizeBytes = 128, // Forces rotation after ~2 messages MaxCacheSize = 64 * 1024 * 1024, CacheExpiry = TimeSpan.FromSeconds(60), }; await using var store = CreateStore("int-rotate", opts); // Act: write enough to trigger rotation for (var i = 0; i < 10; i++) await store.AppendAsync($"subj.{i}", new byte[20], default); // Assert: multiple blocks exist and all reads still succeed store.BlockCount.ShouldBeGreaterThan(1); for (ulong seq = 1; seq <= 10; seq++) { var msg = await store.LoadAsync(seq, default); msg.ShouldNotBeNull($"message at seq={seq} should be recoverable after block rotation"); } } [Fact] public void FileStore_StoreMsg_TracksWrite() { // Arrange using var store = CreateStore("int-storemsg", new FileStoreOptions { BlockSizeBytes = 1024 * 1024, MaxCacheSize = 64 * 1024 * 1024, CacheExpiry = TimeSpan.FromSeconds(60), }); // Act var (seq, _) = store.StoreMsg("test.subject", hdr: null, msg: "payload"u8.ToArray(), ttl: 0); // Assert: message is retrievable (write was tracked, cache is alive) seq.ShouldBe(1UL); } // ------------------------------------------------------------------------- // IAsyncDisposable: DisposeAsync flushes then stops the timer // ------------------------------------------------------------------------- [Fact] public async Task Dispose_FlushesAndStopsBackgroundTask() { // Arrange var dir = Path.Combine(_dir.FullName, "dispose-test"); Directory.CreateDirectory(dir); using var block = MsgBlock.Create(1, dir, 1024 * 1024); block.Write("d.subject", ReadOnlyMemory.Empty, "data"u8.ToArray()); var manager = new FileStore.WriteCacheManager( maxCacheSizeBytes: 64 * 1024 * 1024, cacheExpiry: TimeSpan.FromSeconds(60), blockLookup: id => id == 1 ? block : null); manager.TrackWrite(blockId: 1, bytes: 64); // Act: dispose should complete within a reasonable time and clear entries await manager.DisposeAsync(); // Assert manager.TrackedBlockCount.ShouldBe(0); block.HasCache.ShouldBeFalse("cache should be flushed/cleared during DisposeAsync"); } }