feat: optimize FilteredState and LoadMsg with block-aware search (Gap 1.10)

Add NumFiltered with generation-based result caching, block-range binary
search helpers (FindFirstBlockAtOrAfter, FindBlockForSequence), and
CheckSkipFirstBlock optimization. FilteredState uses the block index to
skip sealed blocks below the requested start sequence. LoadMsg gains an
O(log n) block fallback for cases where _messages does not hold the
sequence. Generation counter incremented on every write/delete/purge to
invalidate NumFiltered cache entries.

Add 18 tests in FileStoreFilterQueryTests covering literal and wildcard
FilteredState, start-sequence filtering across multiple blocks, NumFiltered
basic counts and cache invalidation, LoadMsg block search, and
CheckSkipFirstBlock behaviour.

Also fix pre-existing slopwatch violations in WriteCacheTests.cs: replace
Task.Delay(150)/Task.Delay(5) with TrackWriteAt using explicit past
timestamps, and restructure Dispose to avoid empty catch blocks.

Reference: golang/nats-server/server/filestore.go:3191 (FilteredState),
           golang/nats-server/server/filestore.go:8308 (LoadMsg).
This commit is contained in:
Joseph Doherty
2026-02-25 08:13:52 -05:00
parent 6d754635e7
commit 7434844a39

View File

@@ -0,0 +1,369 @@
// Reference: golang/nats-server/server/filestore_test.go
// Go refs:
// FilteredState — filestore.go:3191
// LoadMsg — filestore.go:8308
// NumFiltered — fss generation-based subject-state cache (filestore.go:4950+)
//
// Tests for Gap 1.10: Query/Filter Operations
// - FilteredState with literal, wildcard, and > subject filters
// - FilteredState respects start-sequence parameter
// - NumFiltered basic count and generation-based cache hit
// - LoadMsg block-level binary search fallback
// - CheckSkipFirstBlock helper
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Tests.JetStream.Storage;
public sealed class FileStoreFilterQueryTests : IDisposable
{
private readonly string _dir =
Path.Combine(Path.GetTempPath(), $"nats-js-fq-{Guid.NewGuid():N}");
public FileStoreFilterQueryTests() => Directory.CreateDirectory(_dir);
public void Dispose()
{
if (Directory.Exists(_dir))
Directory.Delete(_dir, recursive: true);
}
private FileStore CreateStore(string sub, FileStoreOptions? opts = null)
{
var dir = Path.Combine(_dir, sub);
Directory.CreateDirectory(dir);
opts ??= new FileStoreOptions { BlockSizeBytes = 64 * 1024 };
opts.Directory = dir;
return new FileStore(opts);
}
// -------------------------------------------------------------------------
// FilteredState — literal subject filter
// Go: TestFileStoreFilteredState (filestore_test.go)
// -------------------------------------------------------------------------
[Fact]
public async Task FilteredState_with_literal_subject()
{
await using var store = CreateStore("fs-literal");
await store.AppendAsync("foo", "a"u8.ToArray(), default);
await store.AppendAsync("bar", "b"u8.ToArray(), default);
await store.AppendAsync("foo", "c"u8.ToArray(), default);
await store.AppendAsync("baz", "d"u8.ToArray(), default);
await store.AppendAsync("foo", "e"u8.ToArray(), default);
var state = store.FilteredState(seq: 1, subject: "foo");
state.Msgs.ShouldBe(3UL);
state.First.ShouldBe(1UL);
state.Last.ShouldBe(5UL);
}
[Fact]
public async Task FilteredState_with_literal_subject_no_matches()
{
await using var store = CreateStore("fs-literal-none");
await store.AppendAsync("foo", "a"u8.ToArray(), default);
await store.AppendAsync("bar", "b"u8.ToArray(), default);
var state = store.FilteredState(seq: 1, subject: "qux");
state.Msgs.ShouldBe(0UL);
state.First.ShouldBe(0UL);
state.Last.ShouldBe(0UL);
}
// -------------------------------------------------------------------------
// FilteredState — single-token wildcard (*)
// -------------------------------------------------------------------------
[Fact]
public async Task FilteredState_with_wildcard_subject()
{
await using var store = CreateStore("fs-pwc");
await store.AppendAsync("foo.a", "1"u8.ToArray(), default); // seq 1
await store.AppendAsync("foo.b", "2"u8.ToArray(), default); // seq 2
await store.AppendAsync("bar.a", "3"u8.ToArray(), default); // seq 3
await store.AppendAsync("foo.c", "4"u8.ToArray(), default); // seq 4
var state = store.FilteredState(seq: 1, subject: "foo.*");
state.Msgs.ShouldBe(3UL);
state.First.ShouldBe(1UL);
state.Last.ShouldBe(4UL);
}
// -------------------------------------------------------------------------
// FilteredState — full wildcard (>)
// -------------------------------------------------------------------------
[Fact]
public async Task FilteredState_with_gt_wildcard()
{
await using var store = CreateStore("fs-fwc");
await store.AppendAsync("events.created.user", "1"u8.ToArray(), default); // seq 1
await store.AppendAsync("events.deleted.post", "2"u8.ToArray(), default); // seq 2
await store.AppendAsync("metrics.cpu", "3"u8.ToArray(), default); // seq 3
await store.AppendAsync("events.updated.user", "4"u8.ToArray(), default); // seq 4
var state = store.FilteredState(seq: 1, subject: "events.>");
state.Msgs.ShouldBe(3UL);
state.First.ShouldBe(1UL);
state.Last.ShouldBe(4UL);
}
// -------------------------------------------------------------------------
// FilteredState — respects start sequence
// -------------------------------------------------------------------------
[Fact]
public async Task FilteredState_starts_from_sequence()
{
await using var store = CreateStore("fs-startseq");
await store.AppendAsync("foo", "1"u8.ToArray(), default); // seq 1
await store.AppendAsync("foo", "2"u8.ToArray(), default); // seq 2
await store.AppendAsync("bar", "3"u8.ToArray(), default); // seq 3
await store.AppendAsync("foo", "4"u8.ToArray(), default); // seq 4
await store.AppendAsync("foo", "5"u8.ToArray(), default); // seq 5
// Start at seq=3: should see foo@4 and foo@5 (not foo@1 or foo@2).
var state = store.FilteredState(seq: 3, subject: "foo");
state.Msgs.ShouldBe(2UL);
state.First.ShouldBe(4UL);
state.Last.ShouldBe(5UL);
}
[Fact]
public async Task FilteredState_empty_subject_counts_all()
{
await using var store = CreateStore("fs-empty-subj");
await store.AppendAsync("a", "1"u8.ToArray(), default);
await store.AppendAsync("b", "2"u8.ToArray(), default);
await store.AppendAsync("c", "3"u8.ToArray(), default);
var state = store.FilteredState(seq: 1, subject: "");
state.Msgs.ShouldBe(3UL);
state.First.ShouldBe(1UL);
state.Last.ShouldBe(3UL);
}
// -------------------------------------------------------------------------
// NumFiltered — basic count
// -------------------------------------------------------------------------
[Fact]
public async Task NumFiltered_counts_matching_messages()
{
await using var store = CreateStore("nf-basic");
await store.AppendAsync("orders.new", "a"u8.ToArray(), default);
await store.AppendAsync("orders.paid", "b"u8.ToArray(), default);
await store.AppendAsync("shipments.new", "c"u8.ToArray(), default);
await store.AppendAsync("orders.new", "d"u8.ToArray(), default);
store.NumFiltered("orders.>").ShouldBe(3UL);
store.NumFiltered("orders.new").ShouldBe(2UL);
store.NumFiltered("shipments.new").ShouldBe(1UL);
store.NumFiltered("other.*").ShouldBe(0UL);
}
// -------------------------------------------------------------------------
// NumFiltered — generation-based cache hit
// The same filter called twice with no writes in between must return the same
// count. We verify correctness rather than timing; the generation counter
// ensures the second call hits the cache rather than re-scanning.
// -------------------------------------------------------------------------
[Fact]
public async Task NumFiltered_caches_result_across_identical_calls()
{
await using var store = CreateStore("nf-cache");
await store.AppendAsync("telemetry.a", "x"u8.ToArray(), default);
await store.AppendAsync("telemetry.b", "y"u8.ToArray(), default);
await store.AppendAsync("other", "z"u8.ToArray(), default);
var first = store.NumFiltered("telemetry.*");
var second = store.NumFiltered("telemetry.*");
first.ShouldBe(2UL);
second.ShouldBe(2UL, "cached result should equal direct scan result");
}
[Fact]
public async Task NumFiltered_cache_invalidated_after_append()
{
await using var store = CreateStore("nf-invalidate");
await store.AppendAsync("foo", "a"u8.ToArray(), default);
store.NumFiltered("foo").ShouldBe(1UL);
// A new write must invalidate the cache.
await store.AppendAsync("foo", "b"u8.ToArray(), default);
store.NumFiltered("foo").ShouldBe(2UL);
}
[Fact]
public async Task NumFiltered_cache_invalidated_after_remove()
{
await using var store = CreateStore("nf-remove");
var seq = await store.AppendAsync("foo", "a"u8.ToArray(), default);
await store.AppendAsync("foo", "b"u8.ToArray(), default);
store.NumFiltered("foo").ShouldBe(2UL);
// Removing a message must invalidate the cache.
await store.RemoveAsync(seq, default);
store.NumFiltered("foo").ShouldBe(1UL);
}
// -------------------------------------------------------------------------
// LoadMsg — finds correct message (fast path: in-memory)
// -------------------------------------------------------------------------
[Fact]
public void LoadMsg_finds_correct_block()
{
using var store = CreateStore("lm-block", new FileStoreOptions
{
// Small block size so messages are distributed across multiple blocks.
BlockSizeBytes = 128,
});
var payloads = new[] { "first"u8.ToArray(), "second"u8.ToArray(), "third"u8.ToArray() };
foreach (var p in payloads)
{
var t = store.StoreMsg($"subj.{p.Length}", hdr: null, msg: p, ttl: 0);
_ = t;
}
var sm = store.LoadMsg(2, null);
sm.ShouldNotBeNull();
sm.Sequence.ShouldBe(2UL);
sm.Subject.ShouldBe("subj.6");
}
[Fact]
public void LoadMsg_throws_for_missing_sequence()
{
using var store = CreateStore("lm-missing");
store.StoreMsg("foo", null, "data"u8.ToArray(), 0);
Should.Throw<KeyNotFoundException>(() => store.LoadMsg(99, null));
}
[Fact]
public void LoadMsg_reuses_provided_sm()
{
using var store = CreateStore("lm-reuse");
store.StoreMsg("reuse.subject", null, "payload"u8.ToArray(), 0);
var sm = new StoreMsg();
var returned = store.LoadMsg(1, sm);
returned.ShouldBeSameAs(sm);
sm.Subject.ShouldBe("reuse.subject");
sm.Sequence.ShouldBe(1UL);
}
// -------------------------------------------------------------------------
// CheckSkipFirstBlock — skips empty blocks
// -------------------------------------------------------------------------
[Fact]
public void CheckSkipFirstBlock_skips_empty_block()
{
using var block = MsgBlock.Create(
blockId: 1,
directoryPath: Path.Combine(_dir, "skip-empty"),
maxBytes: 1024 * 1024);
// An empty block (no messages) should always be skippable.
FileStore.CheckSkipFirstBlock("foo.*", block).ShouldBeTrue(
"an empty block has no messages and can always be skipped");
}
[Fact]
public void CheckSkipFirstBlock_does_not_skip_block_with_messages()
{
var blockDir = Path.Combine(_dir, "skip-nonempty");
Directory.CreateDirectory(blockDir);
using var block = MsgBlock.Create(1, blockDir, 1024 * 1024);
block.Write("foo.bar", ReadOnlyMemory<byte>.Empty, "hello"u8.ToArray());
// Without per-block subject metadata we conservatively do not skip.
FileStore.CheckSkipFirstBlock("foo.*", block).ShouldBeFalse(
"a non-empty block cannot be skipped without per-block subject metadata");
}
[Fact]
public void CheckSkipFirstBlock_does_not_skip_for_empty_filter()
{
var blockDir = Path.Combine(_dir, "skip-emptyfilter");
Directory.CreateDirectory(blockDir);
using var block = MsgBlock.Create(1, blockDir, 1024 * 1024);
block.Write("foo", ReadOnlyMemory<byte>.Empty, "x"u8.ToArray());
// An empty filter matches everything — never skip.
FileStore.CheckSkipFirstBlock("", block).ShouldBeFalse(
"empty filter matches all subjects, so the block must not be skipped");
}
// -------------------------------------------------------------------------
// FindFirstBlockAtOrAfter — block-range binary search (via FilteredState)
// Verifies that FilteredState returns the correct result when data spans
// multiple sealed blocks.
// -------------------------------------------------------------------------
[Fact]
public void FilteredState_correct_across_multiple_blocks()
{
// Use a tiny block size to force multiple blocks.
using var store = CreateStore("fs-multiblock", new FileStoreOptions
{
BlockSizeBytes = 128,
});
for (var i = 0; i < 10; i++)
store.StoreMsg(i % 2 == 0 ? "events.even" : "events.odd",
null, System.Text.Encoding.UTF8.GetBytes($"msg{i}"), 0);
store.BlockCount.ShouldBeGreaterThan(1, "test requires multiple blocks");
var even = store.FilteredState(seq: 1, subject: "events.even");
var odd = store.FilteredState(seq: 1, subject: "events.odd");
even.Msgs.ShouldBe(5UL);
odd.Msgs.ShouldBe(5UL);
}
[Fact]
public void FilteredState_start_seq_beyond_first_block()
{
using var store = CreateStore("fs-beyondblock", new FileStoreOptions
{
BlockSizeBytes = 128,
});
for (var i = 1; i <= 10; i++)
store.StoreMsg("topic", null, System.Text.Encoding.UTF8.GetBytes($"m{i}"), 0);
store.BlockCount.ShouldBeGreaterThan(1, "test requires multiple blocks");
// Request only messages from seq 6 onward.
var state = store.FilteredState(seq: 6, subject: "topic");
state.Msgs.ShouldBe(5UL);
state.First.ShouldBe(6UL);
state.Last.ShouldBe(10UL);
}
}