Files
Joseph Doherty 4de691c9c5 perf: add FileStore buffered writes, O(1) state tracking, and eliminate redundant per-publish work
Implement Go-parity background flush loop (coalesce 16KB/8ms) in MsgBlock/FileStore,
replace O(n) GetStateAsync with incremental counters, skip PruneExpired/LoadAsync/
PrunePerSubject when not needed, and bypass RAFT for single-replica streams. Fix counter
tracking bugs in RemoveMsg/EraseMsg/TTL expiry and ObjectDisposedException races in
flush loop disposal. FileStore optimizations verified with 3112/3112 JetStream tests
passing; async publish benchmark remains at ~174 msg/s due to E2E protocol path bottleneck.
2026-03-13 03:11:11 -04:00

456 lines
17 KiB
C#

// Reference: golang/nats-server/server/filestore_test.go
// Tests ported from: TestFileStoreNoFSSWhenNoSubjects,
// TestFileStoreNoFSSBugAfterRemoveFirst,
// TestFileStoreNoFSSAfterRecover,
// TestFileStoreSubjectStateCacheExpiration,
// TestFileStoreSubjectsTotals,
// TestFileStoreSubjectCorruption,
// TestFileStoreFilteredPendingBug,
// TestFileStoreFilteredFirstMatchingBug,
// TestFileStoreExpireSubjectMeta,
// TestFileStoreAllFilteredStateWithDeleted,
// TestFileStoreNumPendingLastBySubject
using System.Text;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Tests.JetStream.Storage;
/// <summary>
/// Subject-oriented tests for <see cref="FileStore"/>: per-subject state, filtered
/// lookups, last-by-subject loads, and subject survival when a store is re-created
/// over the same directory.
/// </summary>
/// <remarks>
/// Each instance works inside its own GUID-named directory under the system temp
/// path; the directory is created in the constructor and deleted in
/// <see cref="Dispose"/>.
/// </remarks>
public sealed class FileStoreSubjectTests : IDisposable
{
    private readonly string _dir;

    /// <summary>Creates the unique root directory used by the stores of this instance.</summary>
    public FileStoreSubjectTests()
    {
        _dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-subject-{Guid.NewGuid():N}");
        Directory.CreateDirectory(_dir);
    }

    /// <summary>Recursively deletes the root directory if it still exists.</summary>
    public void Dispose()
    {
        if (Directory.Exists(_dir))
        {
            Directory.Delete(_dir, recursive: true);
        }
    }

    /// <summary>
    /// Builds a <see cref="FileStore"/> rooted at <paramref name="subdirectory"/> below
    /// the per-instance root directory.
    /// </summary>
    /// <param name="subdirectory">Directory name, relative to the instance root.</param>
    /// <param name="options">
    /// Optional options to use; when <c>null</c> a fresh <see cref="FileStoreOptions"/>
    /// is created. Note that the <c>Directory</c> property of a supplied instance is
    /// overwritten.
    /// </param>
    /// <returns>A new store constructed from the resulting options.</returns>
    private FileStore CreateStore(string subdirectory, FileStoreOptions? options = null)
    {
        var dir = Path.Combine(_dir, subdirectory);
        var opts = options ?? new FileStoreOptions();
        opts.Directory = dir;
        return new FileStore(opts);
    }

    // Go: TestFileStoreNoFSSWhenNoSubjects server/filestore_test.go:4251
    [Fact]
    public async Task Store_with_empty_subject()
    {
        await using var store = CreateStore("empty-subj");

        // Store messages with empty subject (like raft state).
        for (var i = 0; i < 10; i++)
        {
            await store.AppendAsync(string.Empty, "raft state"u8.ToArray(), default);
        }

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe((ulong)10);

        // Should be loadable.
        var msg = await store.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        msg!.Subject.ShouldBe(string.Empty);
    }

    // Go: TestFileStoreNoFSSBugAfterRemoveFirst server/filestore_test.go:4289
    [Fact]
    public async Task Remove_first_with_different_subjects()
    {
        await using var store = CreateStore("rm-first-subj");

        await store.AppendAsync("foo", "first"u8.ToArray(), default);
        await store.AppendAsync("bar", "second"u8.ToArray(), default);
        await store.AppendAsync("foo", "third"u8.ToArray(), default);

        // Remove first message.
        (await store.RemoveAsync(1, default)).ShouldBeTrue();

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe((ulong)2);
        state.FirstSeq.ShouldBe((ulong)2);

        // LoadLastBySubject should still work for "foo".
        var lastFoo = await store.LoadLastBySubjectAsync("foo", default);
        lastFoo.ShouldNotBeNull();
        lastFoo!.Sequence.ShouldBe((ulong)3);
    }

    // Go: TestFileStoreNoFSSAfterRecover server/filestore_test.go:4333
    [Fact]
    public async Task Subject_filtering_after_recovery()
    {
        var subDir = "subj-after-recover";

        await using (var store = CreateStore(subDir))
        {
            await store.AppendAsync("foo.1", "a"u8.ToArray(), default);
            await store.AppendAsync("foo.2", "b"u8.ToArray(), default);
            await store.AppendAsync("bar.1", "c"u8.ToArray(), default);
            await store.AppendAsync("foo.1", "d"u8.ToArray(), default);
        }

        // Recover.
        await using (var store = CreateStore(subDir))
        {
            var state = await store.GetStateAsync(default);
            state.Messages.ShouldBe((ulong)4);

            // LoadLastBySubject should work after recovery.
            var lastFoo1 = await store.LoadLastBySubjectAsync("foo.1", default);
            lastFoo1.ShouldNotBeNull();
            lastFoo1!.Sequence.ShouldBe((ulong)4);
            lastFoo1.Payload.ToArray().ShouldBe("d"u8.ToArray());

            var lastBar1 = await store.LoadLastBySubjectAsync("bar.1", default);
            lastBar1.ShouldNotBeNull();
            lastBar1!.Sequence.ShouldBe((ulong)3);
        }
    }

    // Go: TestFileStoreSubjectStateCacheExpiration server/filestore_test.go:4143
    [Fact]
    public async Task Subject_state_cache_expiration()
    {
        await using var store = CreateStore("subj-state-cache");

        await store.AppendAsync("foo.1", "a"u8.ToArray(), default);
        await store.AppendAsync("foo.2", "b"u8.ToArray(), default);
        await store.AppendAsync("bar.1", "c"u8.ToArray(), default);

        // Initial state: 3 subjects, each with 1 message.
        var initial = store.SubjectsState(">");
        initial.Count.ShouldBe(3);
        initial["foo.1"].Msgs.ShouldBe((ulong)1);
        initial["foo.2"].Msgs.ShouldBe((ulong)1);
        initial["bar.1"].Msgs.ShouldBe((ulong)1);

        // Add a second message to "foo.1" — cache must be invalidated.
        await store.AppendAsync("foo.1", "d"u8.ToArray(), default);

        var updated = store.SubjectsState(">");
        updated.Count.ShouldBe(3);
        updated["foo.1"].Msgs.ShouldBe((ulong)2);
        updated["foo.1"].First.ShouldBe((ulong)1);
        updated["foo.1"].Last.ShouldBe((ulong)4);

        // Remove one "foo.1" message — cache must be invalidated again.
        (await store.RemoveAsync(1, default)).ShouldBeTrue();

        var afterRemove = store.SubjectsState(">");
        afterRemove.Count.ShouldBe(3);
        afterRemove["foo.1"].Msgs.ShouldBe((ulong)1);
        afterRemove["foo.1"].First.ShouldBe((ulong)4);
        afterRemove["foo.1"].Last.ShouldBe((ulong)4);
    }

    // Go: TestFileStoreSubjectsTotals server/filestore_test.go:4948
    [Fact]
    public async Task Subjects_totals_with_wildcards()
    {
        await using var store = CreateStore("subj-totals");

        await store.AppendAsync("foo.a", "1"u8.ToArray(), default);
        await store.AppendAsync("foo.b", "2"u8.ToArray(), default);
        await store.AppendAsync("foo.a", "3"u8.ToArray(), default);
        await store.AppendAsync("bar.c", "4"u8.ToArray(), default);

        // Filter to foo.> — should only see foo subjects.
        var fooTotals = store.SubjectsTotals("foo.>");
        fooTotals.Count.ShouldBe(2);
        fooTotals["foo.a"].ShouldBe((ulong)2);
        fooTotals["foo.b"].ShouldBe((ulong)1);
        fooTotals.ContainsKey("bar.c").ShouldBeFalse();

        // Filter to > — should see all subjects.
        var allTotals = store.SubjectsTotals(">");
        allTotals.Count.ShouldBe(3);
        allTotals["foo.a"].ShouldBe((ulong)2);
        allTotals["foo.b"].ShouldBe((ulong)1);
        allTotals["bar.c"].ShouldBe((ulong)1);
    }

    // Go: TestFileStoreSubjectCorruption server/filestore_test.go:6466
    [Fact]
    public async Task Subject_corruption_detection()
    {
        await using var store = CreateStore("subj-corruption");

        await store.AppendAsync("foo", "a"u8.ToArray(), default);
        await store.AppendAsync("bar", "b"u8.ToArray(), default);
        await store.AppendAsync("baz", "c"u8.ToArray(), default);

        // Each sequence should map to the correct subject.
        store.SubjectForSeq(1).ShouldBe("foo");
        store.SubjectForSeq(2).ShouldBe("bar");
        store.SubjectForSeq(3).ShouldBe("baz");

        // Remove seq 2 — SubjectForSeq should throw for the removed sequence.
        (await store.RemoveAsync(2, default)).ShouldBeTrue();
        Should.Throw<KeyNotFoundException>(() => store.SubjectForSeq(2));

        // Non-existent sequence should also throw.
        Should.Throw<KeyNotFoundException>(() => store.SubjectForSeq(999));

        // Remaining sequences still resolve correctly.
        store.SubjectForSeq(1).ShouldBe("foo");
        store.SubjectForSeq(3).ShouldBe("baz");
    }

    // Go: TestFileStoreFilteredPendingBug server/filestore_test.go:3414
    [Fact]
    public async Task Filtered_pending_no_match_returns_zero()
    {
        await using var store = CreateStore("filtered-pending-nomatch");

        await store.AppendAsync("foo", "a"u8.ToArray(), default);
        await store.AppendAsync("foo", "b"u8.ToArray(), default);
        await store.AppendAsync("foo", "c"u8.ToArray(), default);

        // Filter "bar" matches no messages — Msgs should be 0.
        var state = store.FilteredState(1, "bar");
        state.Msgs.ShouldBe((ulong)0);
    }

    // Go: TestFileStoreFilteredFirstMatchingBug server/filestore_test.go:4448
    // The bug was that LoadNextMsg with a filter could return a message whose subject
    // did not match the filter when fss (per-subject state) was regenerated from only
    // part of the block. The fix: when no matching message exists at or after start,
    // throw KeyNotFoundException rather than returning a wrong-subject message.
    [Fact]
    public async Task Filtered_first_matching_finds_correct_sequence()
    {
        await using var store = CreateStore("filtered-first-match");

        // seqs 1-3: "foo.foo", seq 4: "foo.bar" (no more "foo.foo" after seq 3)
        await store.AppendAsync("foo.foo", "A"u8.ToArray(), default);
        await store.AppendAsync("foo.foo", "B"u8.ToArray(), default);
        await store.AppendAsync("foo.foo", "C"u8.ToArray(), default);
        await store.AppendAsync("foo.bar", "X"u8.ToArray(), default);

        // Starting at seq 4, filter "foo.foo" — seq 4 is "foo.bar", and there are no
        // further "foo.foo" messages, so LoadNextMsg must throw rather than return a
        // message with the wrong subject.
        Should.Throw<KeyNotFoundException>(() =>
            store.LoadNextMsg("foo.foo", false, 4, null));

        // Sanity: starting at seq 1 should find "foo.foo" at seq 1 with no skip.
        var (msg, skip) = store.LoadNextMsg("foo.foo", false, 1, null);
        msg.Subject.ShouldBe("foo.foo");
        msg.Sequence.ShouldBe((ulong)1);
        skip.ShouldBe((ulong)0);
    }

    // Go: TestFileStoreExpireSubjectMeta server/filestore_test.go:4014
    [Fact]
    public async Task Expired_subject_metadata_cleans_up()
    {
        await using var store = CreateStore("expire-subj-meta");

        await store.AppendAsync("foo.1", "a"u8.ToArray(), default);
        await store.AppendAsync("foo.1", "b"u8.ToArray(), default);
        await store.AppendAsync("foo.2", "c"u8.ToArray(), default);

        // Remove ALL messages on "foo.1".
        (await store.RemoveAsync(1, default)).ShouldBeTrue();
        (await store.RemoveAsync(2, default)).ShouldBeTrue();

        // "foo.1" should have been cleaned up — not present in SubjectsState.
        var state = store.SubjectsState(">");
        state.ContainsKey("foo.1").ShouldBeFalse();

        // "foo.2" is still alive.
        state.ContainsKey("foo.2").ShouldBeTrue();
        state["foo.2"].Msgs.ShouldBe((ulong)1);
    }

    // Go: TestFileStoreAllFilteredStateWithDeleted server/filestore_test.go:4827
    [Fact]
    public async Task Filtered_state_with_deleted_messages()
    {
        await using var store = CreateStore("filtered-state-deleted");

        // Store 5 messages on "foo" — seqs 1..5.
        for (var i = 0; i < 5; i++)
        {
            await store.AppendAsync("foo", "x"u8.ToArray(), default);
        }

        // Remove seqs 2 and 4 — seqs 1, 3, 5 remain.
        (await store.RemoveAsync(2, default)).ShouldBeTrue();
        (await store.RemoveAsync(4, default)).ShouldBeTrue();

        // FilteredState from seq 1 on "foo" should report 3 remaining messages.
        var state = store.FilteredState(1, "foo");
        state.Msgs.ShouldBe((ulong)3);
        state.First.ShouldBe((ulong)1);
        state.Last.ShouldBe((ulong)5);
    }

    // Test LoadLastBySubject with multiple subjects and removes.
    [Fact]
    public async Task LoadLastBySubject_after_removes()
    {
        await using var store = CreateStore("last-after-rm");

        await store.AppendAsync("foo", "a"u8.ToArray(), default);
        await store.AppendAsync("foo", "b"u8.ToArray(), default);
        await store.AppendAsync("foo", "c"u8.ToArray(), default);

        // Remove the last message on "foo" (seq 3). Assert the removal itself so a
        // failed remove is reported here rather than as a misleading sequence mismatch.
        (await store.RemoveAsync(3, default)).ShouldBeTrue();

        var last = await store.LoadLastBySubjectAsync("foo", default);
        last.ShouldNotBeNull();
        last!.Sequence.ShouldBe((ulong)2);
        last.Payload.ToArray().ShouldBe("b"u8.ToArray());
    }

    // Test LoadLastBySubject when all messages on that subject are removed.
    [Fact]
    public async Task LoadLastBySubject_all_removed_returns_null()
    {
        await using var store = CreateStore("last-all-rm");

        await store.AppendAsync("foo", "a"u8.ToArray(), default);
        await store.AppendAsync("foo", "b"u8.ToArray(), default);
        await store.AppendAsync("bar", "c"u8.ToArray(), default);

        // Both "foo" messages must actually be removed for the null check to be meaningful.
        (await store.RemoveAsync(1, default)).ShouldBeTrue();
        (await store.RemoveAsync(2, default)).ShouldBeTrue();

        var last = await store.LoadLastBySubjectAsync("foo", default);
        last.ShouldBeNull();

        // "bar" should still be present.
        var lastBar = await store.LoadLastBySubjectAsync("bar", default);
        lastBar.ShouldNotBeNull();
        lastBar!.Sequence.ShouldBe((ulong)3);
    }

    // Test multiple subjects interleaved.
    [Fact]
    public async Task Multiple_subjects_interleaved()
    {
        await using var store = CreateStore("interleaved");

        // Subjects are assigned round-robin by zero-based message index; the same
        // table drives both the writes and the verification below.
        string[] subjects = { "alpha", "beta", "gamma" };

        for (var i = 0; i < 20; i++)
        {
            await store.AppendAsync(subjects[i % 3], Encoding.UTF8.GetBytes($"msg-{i}"), default);
        }

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe((ulong)20);

        // Verify all subjects are loadable and correct.
        for (ulong seq = 1; seq <= 20; seq++)
        {
            var msg = await store.LoadAsync(seq, default);
            msg.ShouldNotBeNull();

            // Sequences are 1-based, the subject table is indexed by 0-based position.
            var idx = (int)(seq - 1);
            msg!.Subject.ShouldBe(subjects[idx % 3]);
        }
    }

    // Test LoadLastBySubject with case-sensitive subjects.
    [Fact]
    public async Task LoadLastBySubject_is_case_sensitive()
    {
        await using var store = CreateStore("case-sensitive");

        await store.AppendAsync("Foo", "upper"u8.ToArray(), default);
        await store.AppendAsync("foo", "lower"u8.ToArray(), default);

        var lastUpper = await store.LoadLastBySubjectAsync("Foo", default);
        lastUpper.ShouldNotBeNull();
        lastUpper!.Payload.ToArray().ShouldBe("upper"u8.ToArray());

        var lastLower = await store.LoadLastBySubjectAsync("foo", default);
        lastLower.ShouldNotBeNull();
        lastLower!.Payload.ToArray().ShouldBe("lower"u8.ToArray());
    }

    // Test subject preservation across restarts.
    [Fact]
    public async Task Subject_preserved_across_restart()
    {
        var subDir = "subj-restart";

        await using (var store = CreateStore(subDir))
        {
            await store.AppendAsync("topic.a", "one"u8.ToArray(), default);
            await store.AppendAsync("topic.b", "two"u8.ToArray(), default);
            await store.AppendAsync("topic.c", "three"u8.ToArray(), default);
        }

        await using (var store = CreateStore(subDir))
        {
            var msg1 = await store.LoadAsync(1, default);
            msg1.ShouldNotBeNull();
            msg1!.Subject.ShouldBe("topic.a");

            var msg2 = await store.LoadAsync(2, default);
            msg2.ShouldNotBeNull();
            msg2!.Subject.ShouldBe("topic.b");

            var msg3 = await store.LoadAsync(3, default);
            msg3.ShouldNotBeNull();
            msg3!.Subject.ShouldBe("topic.c");
        }
    }

    // Go: TestFileStoreNumPendingLastBySubject server/filestore_test.go:6501
    [Fact]
    public async Task NumPending_last_per_subject()
    {
        await using var store = CreateStore("num-pending-lps");

        // "foo" x3, "bar" x2 — 2 distinct subjects.
        await store.AppendAsync("foo", "1"u8.ToArray(), default);
        await store.AppendAsync("foo", "2"u8.ToArray(), default);
        await store.AppendAsync("foo", "3"u8.ToArray(), default);
        await store.AppendAsync("bar", "4"u8.ToArray(), default);
        await store.AppendAsync("bar", "5"u8.ToArray(), default);

        // lastPerSubject=true: count only the last message per distinct subject.
        // 2 distinct subjects → Total == 2.
        var (total, _) = store.NumPending(1, ">", true);
        total.ShouldBe((ulong)2);

        // lastPerSubject=false: count all messages at or after sseq 1.
        var (totalAll, _) = store.NumPending(1, ">", false);
        totalAll.ShouldBe((ulong)5);

        // Filter to just "foo" with lastPerSubject=true → 1.
        var (fooLps, _) = store.NumPending(1, "foo", true);
        fooLps.ShouldBe((ulong)1);
    }

    // Test many distinct subjects.
    [Fact]
    public async Task Many_distinct_subjects()
    {
        await using var store = CreateStore("many-subjects");

        for (var i = 0; i < 100; i++)
        {
            await store.AppendAsync($"kv.{i}", Encoding.UTF8.GetBytes($"value-{i}"), default);
        }

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe((ulong)100);

        // Each subject should have exactly one message.
        for (var i = 0; i < 100; i++)
        {
            var last = await store.LoadLastBySubjectAsync($"kv.{i}", default);
            last.ShouldNotBeNull();
            last!.Payload.ToArray().ShouldBe(Encoding.UTF8.GetBytes($"value-{i}"));
        }
    }
}