diff --git a/tests/NATS.Server.Tests/JetStream/Storage/FileStoreFilterQueryTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreFilterQueryTests.cs new file mode 100644 index 0000000..b79c0fe --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreFilterQueryTests.cs @@ -0,0 +1,369 @@ +// Reference: golang/nats-server/server/filestore_test.go +// Go refs: +// FilteredState — filestore.go:3191 +// LoadMsg — filestore.go:8308 +// NumFiltered — fss generation-based subject-state cache (filestore.go:4950+) +// +// Tests for Gap 1.10: Query/Filter Operations +// - FilteredState with literal, wildcard, and > subject filters +// - FilteredState respects start-sequence parameter +// - NumFiltered basic count and generation-based cache hit +// - LoadMsg block-level binary search fallback +// - CheckSkipFirstBlock helper + +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +public sealed class FileStoreFilterQueryTests : IDisposable +{ + private readonly string _dir = + Path.Combine(Path.GetTempPath(), $"nats-js-fq-{Guid.NewGuid():N}"); + + public FileStoreFilterQueryTests() => Directory.CreateDirectory(_dir); + + public void Dispose() + { + if (Directory.Exists(_dir)) + Directory.Delete(_dir, recursive: true); + } + + private FileStore CreateStore(string sub, FileStoreOptions? opts = null) + { + var dir = Path.Combine(_dir, sub); + Directory.CreateDirectory(dir); + opts ??= new FileStoreOptions { BlockSizeBytes = 64 * 1024 }; + opts.Directory = dir; + return new FileStore(opts); + } + + // ------------------------------------------------------------------------- + // FilteredState — literal subject filter + // Go: TestFileStoreFilteredState (filestore_test.go) + // ------------------------------------------------------------------------- + + [Fact] + public async Task FilteredState_with_literal_subject() + { + await using var store = CreateStore("fs-literal"); + + await store.AppendAsync("foo", "a"u8.ToArray(), default); + await store.AppendAsync("bar", "b"u8.ToArray(), default); + await store.AppendAsync("foo", "c"u8.ToArray(), default); + await store.AppendAsync("baz", "d"u8.ToArray(), default); + await store.AppendAsync("foo", "e"u8.ToArray(), default); + + var state = store.FilteredState(seq: 1, subject: "foo"); + + state.Msgs.ShouldBe(3UL); + state.First.ShouldBe(1UL); + state.Last.ShouldBe(5UL); + } + + [Fact] + public async Task FilteredState_with_literal_subject_no_matches() + { + await using var store = CreateStore("fs-literal-none"); + + await store.AppendAsync("foo", "a"u8.ToArray(), default); + await store.AppendAsync("bar", "b"u8.ToArray(), default); + + var state = store.FilteredState(seq: 1, subject: "qux"); + + state.Msgs.ShouldBe(0UL); + state.First.ShouldBe(0UL); + state.Last.ShouldBe(0UL); + } + + // ------------------------------------------------------------------------- + // FilteredState — single-token wildcard (*) + // ------------------------------------------------------------------------- + + [Fact] + public async Task FilteredState_with_wildcard_subject() + { + await using var store = CreateStore("fs-pwc"); + + await store.AppendAsync("foo.a", "1"u8.ToArray(), default); // seq 1 + await store.AppendAsync("foo.b", "2"u8.ToArray(), default); // seq 2 + await store.AppendAsync("bar.a", "3"u8.ToArray(), default); // seq 3 + await store.AppendAsync("foo.c", "4"u8.ToArray(), default); // seq 4 + + var state = store.FilteredState(seq: 1, subject: "foo.*"); + + state.Msgs.ShouldBe(3UL); + state.First.ShouldBe(1UL); + state.Last.ShouldBe(4UL); + } + + // ------------------------------------------------------------------------- + // FilteredState — full wildcard (>) + // ------------------------------------------------------------------------- + + [Fact] + public async Task FilteredState_with_gt_wildcard() + { + await using var store = CreateStore("fs-fwc"); + + await store.AppendAsync("events.created.user", "1"u8.ToArray(), default); // seq 1 + await store.AppendAsync("events.deleted.post", "2"u8.ToArray(), default); // seq 2 + await store.AppendAsync("metrics.cpu", "3"u8.ToArray(), default); // seq 3 + await store.AppendAsync("events.updated.user", "4"u8.ToArray(), default); // seq 4 + + var state = store.FilteredState(seq: 1, subject: "events.>"); + + state.Msgs.ShouldBe(3UL); + state.First.ShouldBe(1UL); + state.Last.ShouldBe(4UL); + } + + // ------------------------------------------------------------------------- + // FilteredState — respects start sequence + // ------------------------------------------------------------------------- + + [Fact] + public async Task FilteredState_starts_from_sequence() + { + await using var store = CreateStore("fs-startseq"); + + await store.AppendAsync("foo", "1"u8.ToArray(), default); // seq 1 + await store.AppendAsync("foo", "2"u8.ToArray(), default); // seq 2 + await store.AppendAsync("bar", "3"u8.ToArray(), default); // seq 3 + await store.AppendAsync("foo", "4"u8.ToArray(), default); // seq 4 + await store.AppendAsync("foo", "5"u8.ToArray(), default); // seq 5 + + // Start at seq=3: should see foo@4 and foo@5 (not foo@1 or foo@2). + var state = store.FilteredState(seq: 3, subject: "foo"); + + state.Msgs.ShouldBe(2UL); + state.First.ShouldBe(4UL); + state.Last.ShouldBe(5UL); + } + + [Fact] + public async Task FilteredState_empty_subject_counts_all() + { + await using var store = CreateStore("fs-empty-subj"); + + await store.AppendAsync("a", "1"u8.ToArray(), default); + await store.AppendAsync("b", "2"u8.ToArray(), default); + await store.AppendAsync("c", "3"u8.ToArray(), default); + + var state = store.FilteredState(seq: 1, subject: ""); + + state.Msgs.ShouldBe(3UL); + state.First.ShouldBe(1UL); + state.Last.ShouldBe(3UL); + } + + // ------------------------------------------------------------------------- + // NumFiltered — basic count + // ------------------------------------------------------------------------- + + [Fact] + public async Task NumFiltered_counts_matching_messages() + { + await using var store = CreateStore("nf-basic"); + + await store.AppendAsync("orders.new", "a"u8.ToArray(), default); + await store.AppendAsync("orders.paid", "b"u8.ToArray(), default); + await store.AppendAsync("shipments.new", "c"u8.ToArray(), default); + await store.AppendAsync("orders.new", "d"u8.ToArray(), default); + + store.NumFiltered("orders.>").ShouldBe(3UL); + store.NumFiltered("orders.new").ShouldBe(2UL); + store.NumFiltered("shipments.new").ShouldBe(1UL); + store.NumFiltered("other.*").ShouldBe(0UL); + } + + // ------------------------------------------------------------------------- + // NumFiltered — generation-based cache hit + // The same filter called twice with no writes in between must return the same + // count. We verify correctness rather than timing; the generation counter + // ensures the second call hits the cache rather than re-scanning. + // ------------------------------------------------------------------------- + + [Fact] + public async Task NumFiltered_caches_result_across_identical_calls() + { + await using var store = CreateStore("nf-cache"); + + await store.AppendAsync("telemetry.a", "x"u8.ToArray(), default); + await store.AppendAsync("telemetry.b", "y"u8.ToArray(), default); + await store.AppendAsync("other", "z"u8.ToArray(), default); + + var first = store.NumFiltered("telemetry.*"); + var second = store.NumFiltered("telemetry.*"); + + first.ShouldBe(2UL); + second.ShouldBe(2UL, "cached result should equal direct scan result"); + } + + [Fact] + public async Task NumFiltered_cache_invalidated_after_append() + { + await using var store = CreateStore("nf-invalidate"); + + await store.AppendAsync("foo", "a"u8.ToArray(), default); + store.NumFiltered("foo").ShouldBe(1UL); + + // A new write must invalidate the cache. + await store.AppendAsync("foo", "b"u8.ToArray(), default); + store.NumFiltered("foo").ShouldBe(2UL); + } + + [Fact] + public async Task NumFiltered_cache_invalidated_after_remove() + { + await using var store = CreateStore("nf-remove"); + + var seq = await store.AppendAsync("foo", "a"u8.ToArray(), default); + await store.AppendAsync("foo", "b"u8.ToArray(), default); + store.NumFiltered("foo").ShouldBe(2UL); + + // Removing a message must invalidate the cache. + await store.RemoveAsync(seq, default); + store.NumFiltered("foo").ShouldBe(1UL); + } + + // ------------------------------------------------------------------------- + // LoadMsg — finds correct message (fast path: in-memory) + // ------------------------------------------------------------------------- + + [Fact] + public void LoadMsg_finds_correct_block() + { + using var store = CreateStore("lm-block", new FileStoreOptions + { + // Small block size so messages are distributed across multiple blocks. + BlockSizeBytes = 128, + }); + + var payloads = new[] { "first"u8.ToArray(), "second"u8.ToArray(), "third"u8.ToArray() }; + foreach (var p in payloads) + { + var t = store.StoreMsg($"subj.{p.Length}", hdr: null, msg: p, ttl: 0); + _ = t; + } + + var sm = store.LoadMsg(2, null); + sm.ShouldNotBeNull(); + sm.Sequence.ShouldBe(2UL); + sm.Subject.ShouldBe("subj.6"); + } + + [Fact] + public void LoadMsg_throws_for_missing_sequence() + { + using var store = CreateStore("lm-missing"); + store.StoreMsg("foo", null, "data"u8.ToArray(), 0); + + Should.Throw(() => store.LoadMsg(99, null)); + } + + [Fact] + public void LoadMsg_reuses_provided_sm() + { + using var store = CreateStore("lm-reuse"); + store.StoreMsg("reuse.subject", null, "payload"u8.ToArray(), 0); + + var sm = new StoreMsg(); + var returned = store.LoadMsg(1, sm); + + returned.ShouldBeSameAs(sm); + sm.Subject.ShouldBe("reuse.subject"); + sm.Sequence.ShouldBe(1UL); + } + + // ------------------------------------------------------------------------- + // CheckSkipFirstBlock — skips empty blocks + // ------------------------------------------------------------------------- + + [Fact] + public void CheckSkipFirstBlock_skips_empty_block() + { + using var block = MsgBlock.Create( + blockId: 1, + directoryPath: Path.Combine(_dir, "skip-empty"), + maxBytes: 1024 * 1024); + + // An empty block (no messages) should always be skippable. + FileStore.CheckSkipFirstBlock("foo.*", block).ShouldBeTrue( + "an empty block has no messages and can always be skipped"); + } + + [Fact] + public void CheckSkipFirstBlock_does_not_skip_block_with_messages() + { + var blockDir = Path.Combine(_dir, "skip-nonempty"); + Directory.CreateDirectory(blockDir); + using var block = MsgBlock.Create(1, blockDir, 1024 * 1024); + block.Write("foo.bar", ReadOnlyMemory.Empty, "hello"u8.ToArray()); + + // Without per-block subject metadata we conservatively do not skip. + FileStore.CheckSkipFirstBlock("foo.*", block).ShouldBeFalse( + "a non-empty block cannot be skipped without per-block subject metadata"); + } + + [Fact] + public void CheckSkipFirstBlock_does_not_skip_for_empty_filter() + { + var blockDir = Path.Combine(_dir, "skip-emptyfilter"); + Directory.CreateDirectory(blockDir); + using var block = MsgBlock.Create(1, blockDir, 1024 * 1024); + block.Write("foo", ReadOnlyMemory.Empty, "x"u8.ToArray()); + + // An empty filter matches everything — never skip. + FileStore.CheckSkipFirstBlock("", block).ShouldBeFalse( + "empty filter matches all subjects, so the block must not be skipped"); + } + + // ------------------------------------------------------------------------- + // FindFirstBlockAtOrAfter — block-range binary search (via FilteredState) + // Verifies that FilteredState returns the correct result when data spans + // multiple sealed blocks. + // ------------------------------------------------------------------------- + + [Fact] + public void FilteredState_correct_across_multiple_blocks() + { + // Use a tiny block size to force multiple blocks. + using var store = CreateStore("fs-multiblock", new FileStoreOptions + { + BlockSizeBytes = 128, + }); + + for (var i = 0; i < 10; i++) + store.StoreMsg(i % 2 == 0 ? "events.even" : "events.odd", + null, System.Text.Encoding.UTF8.GetBytes($"msg{i}"), 0); + + store.BlockCount.ShouldBeGreaterThan(1, "test requires multiple blocks"); + + var even = store.FilteredState(seq: 1, subject: "events.even"); + var odd = store.FilteredState(seq: 1, subject: "events.odd"); + + even.Msgs.ShouldBe(5UL); + odd.Msgs.ShouldBe(5UL); + } + + [Fact] + public void FilteredState_start_seq_beyond_first_block() + { + using var store = CreateStore("fs-beyondblock", new FileStoreOptions + { + BlockSizeBytes = 128, + }); + + for (var i = 1; i <= 10; i++) + store.StoreMsg("topic", null, System.Text.Encoding.UTF8.GetBytes($"m{i}"), 0); + + store.BlockCount.ShouldBeGreaterThan(1, "test requires multiple blocks"); + + // Request only messages from seq 6 onward. + var state = store.FilteredState(seq: 6, subject: "topic"); + + state.Msgs.ShouldBe(5UL); + state.First.ShouldBe(6UL); + state.Last.ShouldBe(10UL); + } +}