// Reference: golang/nats-server/server/filestore_test.go
// Tests ported from: TestFileStoreNoFSSWhenNoSubjects,
//   TestFileStoreNoFSSBugAfterRemoveFirst,
//   TestFileStoreNoFSSAfterRecover,
//   TestFileStoreSubjectStateCacheExpiration,
//   TestFileStoreSubjectsTotals,
//   TestFileStoreSubjectCorruption,
//   TestFileStoreFilteredPendingBug,
//   TestFileStoreFilteredFirstMatchingBug,
//   TestFileStoreExpireSubjectMeta,
//   TestFileStoreAllFilteredStateWithDeleted,
//   TestFileStoreNumPendingLastBySubject

using System.Text;
using NATS.Server.JetStream.Storage;

namespace NATS.Server.JetStream.Tests.JetStream.Storage;

/// <summary>
/// Subject-oriented <see cref="FileStore"/> tests. Each test instance works under its own
/// uniquely named temporary root directory, which is deleted again in <see cref="Dispose"/>.
/// </summary>
public sealed class FileStoreSubjectTests : IDisposable
{
    // Per-instance temp root; individual stores live in subdirectories of it.
    private readonly string _dir;

    public FileStoreSubjectTests()
    {
        _dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-subject-{Guid.NewGuid():N}");
        Directory.CreateDirectory(_dir);
    }

    /// <summary>Removes the temporary root directory and everything beneath it.</summary>
    public void Dispose()
    {
        if (Directory.Exists(_dir))
        {
            Directory.Delete(_dir, recursive: true);
        }
    }

    /// <summary>
    /// Creates a <see cref="FileStore"/> rooted at <paramref name="subdirectory"/> beneath the
    /// test's temp root. Passing the same subdirectory twice reopens the same location, which
    /// the recovery/restart tests rely on.
    /// </summary>
    /// <param name="subdirectory">Directory name, relative to the temp root.</param>
    /// <param name="options">
    /// Optional options instance; a fresh one is created when omitted. Note that the
    /// <c>Directory</c> property of a supplied instance is overwritten.
    /// </param>
    /// <returns>A new store using the resolved directory.</returns>
    private FileStore CreateStore(string subdirectory, FileStoreOptions? options = null)
    {
        var dir = Path.Combine(_dir, subdirectory);
        var opts = options ?? new FileStoreOptions();
        opts.Directory = dir;
        return new FileStore(opts);
    }

    // Go: TestFileStoreNoFSSWhenNoSubjects server/filestore_test.go:4251
    [Fact]
    public async Task Store_with_empty_subject()
    {
        await using var store = CreateStore("empty-subj");

        // Store messages with empty subject (like raft state).
        for (var i = 0; i < 10; i++)
        {
            await store.AppendAsync(string.Empty, "raft state"u8.ToArray(), default);
        }

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe(10UL);

        // Should be loadable.
        var msg = await store.LoadAsync(1, default);
        msg.ShouldNotBeNull();
        msg!.Subject.ShouldBe(string.Empty);
    }

    // Go: TestFileStoreNoFSSBugAfterRemoveFirst server/filestore_test.go:4289
    [Fact]
    public async Task Remove_first_with_different_subjects()
    {
        await using var store = CreateStore("rm-first-subj");

        await store.AppendAsync("foo", "first"u8.ToArray(), default);
        await store.AppendAsync("bar", "second"u8.ToArray(), default);
        await store.AppendAsync("foo", "third"u8.ToArray(), default);

        // Remove first message.
        (await store.RemoveAsync(1, default)).ShouldBeTrue();

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe(2UL);
        state.FirstSeq.ShouldBe(2UL);

        // LoadLastBySubject should still work for "foo".
        var lastFoo = await store.LoadLastBySubjectAsync("foo", default);
        lastFoo.ShouldNotBeNull();
        lastFoo!.Sequence.ShouldBe(3UL);
    }

    // Go: TestFileStoreNoFSSAfterRecover server/filestore_test.go:4333
    [Fact]
    public async Task Subject_filtering_after_recovery()
    {
        var subDir = "subj-after-recover";

        await using (var store = CreateStore(subDir))
        {
            await store.AppendAsync("foo.1", "a"u8.ToArray(), default);
            await store.AppendAsync("foo.2", "b"u8.ToArray(), default);
            await store.AppendAsync("bar.1", "c"u8.ToArray(), default);
            await store.AppendAsync("foo.1", "d"u8.ToArray(), default);
        }

        // Recover.
        await using (var store = CreateStore(subDir))
        {
            var state = await store.GetStateAsync(default);
            state.Messages.ShouldBe(4UL);

            // LoadLastBySubject should work after recovery.
            var lastFoo1 = await store.LoadLastBySubjectAsync("foo.1", default);
            lastFoo1.ShouldNotBeNull();
            lastFoo1!.Sequence.ShouldBe(4UL);
            lastFoo1.Payload.ToArray().ShouldBe("d"u8.ToArray());

            var lastBar1 = await store.LoadLastBySubjectAsync("bar.1", default);
            lastBar1.ShouldNotBeNull();
            lastBar1!.Sequence.ShouldBe(3UL);
        }
    }

    // Go: TestFileStoreSubjectStateCacheExpiration server/filestore_test.go:4143
    [Fact]
    public async Task Subject_state_cache_expiration()
    {
        await using var store = CreateStore("subj-state-cache");

        await store.AppendAsync("foo.1", "a"u8.ToArray(), default);
        await store.AppendAsync("foo.2", "b"u8.ToArray(), default);
        await store.AppendAsync("bar.1", "c"u8.ToArray(), default);

        // Initial state: 3 subjects, each with 1 message.
        var initial = store.SubjectsState(">");
        initial.Count.ShouldBe(3);
        initial["foo.1"].Msgs.ShouldBe(1UL);
        initial["foo.2"].Msgs.ShouldBe(1UL);
        initial["bar.1"].Msgs.ShouldBe(1UL);

        // Add a second message to "foo.1" — cache must be invalidated.
        await store.AppendAsync("foo.1", "d"u8.ToArray(), default);

        var updated = store.SubjectsState(">");
        updated.Count.ShouldBe(3);
        updated["foo.1"].Msgs.ShouldBe(2UL);
        updated["foo.1"].First.ShouldBe(1UL);
        updated["foo.1"].Last.ShouldBe(4UL);

        // Remove one "foo.1" message — cache must be invalidated again.
        (await store.RemoveAsync(1, default)).ShouldBeTrue();

        var afterRemove = store.SubjectsState(">");
        afterRemove.Count.ShouldBe(3);
        afterRemove["foo.1"].Msgs.ShouldBe(1UL);
        afterRemove["foo.1"].First.ShouldBe(4UL);
        afterRemove["foo.1"].Last.ShouldBe(4UL);
    }

    // Go: TestFileStoreSubjectsTotals server/filestore_test.go:4948
    [Fact]
    public async Task Subjects_totals_with_wildcards()
    {
        await using var store = CreateStore("subj-totals");

        await store.AppendAsync("foo.a", "1"u8.ToArray(), default);
        await store.AppendAsync("foo.b", "2"u8.ToArray(), default);
        await store.AppendAsync("foo.a", "3"u8.ToArray(), default);
        await store.AppendAsync("bar.c", "4"u8.ToArray(), default);

        // Filter to foo.> — should only see foo subjects.
        var fooTotals = store.SubjectsTotals("foo.>");
        fooTotals.Count.ShouldBe(2);
        fooTotals["foo.a"].ShouldBe(2UL);
        fooTotals["foo.b"].ShouldBe(1UL);
        fooTotals.ContainsKey("bar.c").ShouldBeFalse();

        // Filter to > — should see all subjects.
        var allTotals = store.SubjectsTotals(">");
        allTotals.Count.ShouldBe(3);
        allTotals["foo.a"].ShouldBe(2UL);
        allTotals["foo.b"].ShouldBe(1UL);
        allTotals["bar.c"].ShouldBe(1UL);
    }

    // Go: TestFileStoreSubjectCorruption server/filestore_test.go:6466
    [Fact]
    public async Task Subject_corruption_detection()
    {
        await using var store = CreateStore("subj-corruption");

        await store.AppendAsync("foo", "a"u8.ToArray(), default);
        await store.AppendAsync("bar", "b"u8.ToArray(), default);
        await store.AppendAsync("baz", "c"u8.ToArray(), default);

        // Each sequence should map to the correct subject.
        store.SubjectForSeq(1).ShouldBe("foo");
        store.SubjectForSeq(2).ShouldBe("bar");
        store.SubjectForSeq(3).ShouldBe("baz");

        // Remove seq 2 — SubjectForSeq should throw for the removed sequence.
        // NOTE(review): the concrete exception type thrown by SubjectForSeq is not pinned
        // down in this file, so any exception satisfies these assertions — tighten the
        // type argument once the contract is confirmed.
        (await store.RemoveAsync(2, default)).ShouldBeTrue();
        Should.Throw<Exception>(() => store.SubjectForSeq(2));

        // Non-existent sequence should also throw.
        Should.Throw<Exception>(() => store.SubjectForSeq(999));

        // Remaining sequences still resolve correctly.
        store.SubjectForSeq(1).ShouldBe("foo");
        store.SubjectForSeq(3).ShouldBe("baz");
    }

    // Go: TestFileStoreFilteredPendingBug server/filestore_test.go:3414
    [Fact]
    public async Task Filtered_pending_no_match_returns_zero()
    {
        await using var store = CreateStore("filtered-pending-nomatch");

        await store.AppendAsync("foo", "a"u8.ToArray(), default);
        await store.AppendAsync("foo", "b"u8.ToArray(), default);
        await store.AppendAsync("foo", "c"u8.ToArray(), default);

        // Filter "bar" matches no messages — Msgs should be 0.
        var state = store.FilteredState(1, "bar");
        state.Msgs.ShouldBe(0UL);
    }

    // Go: TestFileStoreFilteredFirstMatchingBug server/filestore_test.go:4448
    // The bug was that LoadNextMsg with a filter could return a message whose subject
    // did not match the filter when fss (per-subject state) was regenerated from only
    // part of the block. The fix: when no matching message exists at or after start,
    // throw KeyNotFoundException rather than returning a wrong-subject message.
    [Fact]
    public async Task Filtered_first_matching_finds_correct_sequence()
    {
        await using var store = CreateStore("filtered-first-match");

        // seqs 1-3: "foo.foo", seq 4: "foo.bar" (no more "foo.foo" after seq 3)
        await store.AppendAsync("foo.foo", "A"u8.ToArray(), default);
        await store.AppendAsync("foo.foo", "B"u8.ToArray(), default);
        await store.AppendAsync("foo.foo", "C"u8.ToArray(), default);
        await store.AppendAsync("foo.bar", "X"u8.ToArray(), default);

        // Starting at seq 4, filter "foo.foo" — seq 4 is "foo.bar", and there are no
        // further "foo.foo" messages, so LoadNextMsg must throw rather than return a
        // message with the wrong subject.
        Should.Throw<KeyNotFoundException>(() => store.LoadNextMsg("foo.foo", false, 4, null));

        // Sanity: starting at seq 1 should find "foo.foo" at seq 1 with no skip.
        var (msg, skip) = store.LoadNextMsg("foo.foo", false, 1, null);
        msg.Subject.ShouldBe("foo.foo");
        msg.Sequence.ShouldBe(1UL);
        skip.ShouldBe(0UL);
    }

    // Go: TestFileStoreExpireSubjectMeta server/filestore_test.go:4014
    [Fact]
    public async Task Expired_subject_metadata_cleans_up()
    {
        await using var store = CreateStore("expire-subj-meta");

        await store.AppendAsync("foo.1", "a"u8.ToArray(), default);
        await store.AppendAsync("foo.1", "b"u8.ToArray(), default);
        await store.AppendAsync("foo.2", "c"u8.ToArray(), default);

        // Remove ALL messages on "foo.1".
        (await store.RemoveAsync(1, default)).ShouldBeTrue();
        (await store.RemoveAsync(2, default)).ShouldBeTrue();

        // "foo.1" should have been cleaned up — not present in SubjectsState.
        var state = store.SubjectsState(">");
        state.ContainsKey("foo.1").ShouldBeFalse();

        // "foo.2" is still alive.
        state.ContainsKey("foo.2").ShouldBeTrue();
        state["foo.2"].Msgs.ShouldBe(1UL);
    }

    // Go: TestFileStoreAllFilteredStateWithDeleted server/filestore_test.go:4827
    [Fact]
    public async Task Filtered_state_with_deleted_messages()
    {
        await using var store = CreateStore("filtered-state-deleted");

        // Store 5 messages on "foo" — seqs 1..5.
        for (var i = 0; i < 5; i++)
        {
            await store.AppendAsync("foo", "x"u8.ToArray(), default);
        }

        // Remove seqs 2 and 4 — seqs 1, 3, 5 remain.
        (await store.RemoveAsync(2, default)).ShouldBeTrue();
        (await store.RemoveAsync(4, default)).ShouldBeTrue();

        // FilteredState from seq 1 on "foo" should report 3 remaining messages.
        var state = store.FilteredState(1, "foo");
        state.Msgs.ShouldBe(3UL);
        state.First.ShouldBe(1UL);
        state.Last.ShouldBe(5UL);
    }

    // Test LoadLastBySubject with multiple subjects and removes.
    [Fact]
    public async Task LoadLastBySubject_after_removes()
    {
        await using var store = CreateStore("last-after-rm");

        await store.AppendAsync("foo", "a"u8.ToArray(), default);
        await store.AppendAsync("foo", "b"u8.ToArray(), default);
        await store.AppendAsync("foo", "c"u8.ToArray(), default);

        // Remove the last message on "foo" (seq 3). Assert the removal itself so a failed
        // remove is reported here rather than as a confusing sequence mismatch below.
        (await store.RemoveAsync(3, default)).ShouldBeTrue();

        var last = await store.LoadLastBySubjectAsync("foo", default);
        last.ShouldNotBeNull();
        last!.Sequence.ShouldBe(2UL);
        last.Payload.ToArray().ShouldBe("b"u8.ToArray());
    }

    // Test LoadLastBySubject when all messages on that subject are removed.
    [Fact]
    public async Task LoadLastBySubject_all_removed_returns_null()
    {
        await using var store = CreateStore("last-all-rm");

        await store.AppendAsync("foo", "a"u8.ToArray(), default);
        await store.AppendAsync("foo", "b"u8.ToArray(), default);
        await store.AppendAsync("bar", "c"u8.ToArray(), default);

        // Both "foo" messages must actually be removed for the null expectation to be meaningful.
        (await store.RemoveAsync(1, default)).ShouldBeTrue();
        (await store.RemoveAsync(2, default)).ShouldBeTrue();

        var last = await store.LoadLastBySubjectAsync("foo", default);
        last.ShouldBeNull();

        // "bar" should still be present.
        var lastBar = await store.LoadLastBySubjectAsync("bar", default);
        lastBar.ShouldNotBeNull();
        lastBar!.Sequence.ShouldBe(3UL);
    }

    // Test multiple subjects interleaved.
    [Fact]
    public async Task Multiple_subjects_interleaved()
    {
        // Single source of truth for the round-robin subject assignment used by both the
        // write loop and the verification loop.
        static string SubjectFor(int index) => (index % 3) switch
        {
            0 => "alpha",
            1 => "beta",
            _ => "gamma",
        };

        await using var store = CreateStore("interleaved");

        for (var i = 0; i < 20; i++)
        {
            await store.AppendAsync(SubjectFor(i), Encoding.UTF8.GetBytes($"msg-{i}"), default);
        }

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe(20UL);

        // Verify all subjects are loadable and correct.
        for (ulong seq = 1; seq <= 20; seq++)
        {
            var msg = await store.LoadAsync(seq, default);
            msg.ShouldNotBeNull();

            // Sequences are 1-based while the write loop index was 0-based.
            msg!.Subject.ShouldBe(SubjectFor((int)(seq - 1)));
        }
    }

    // Test LoadLastBySubject with case-sensitive subjects.
    [Fact]
    public async Task LoadLastBySubject_is_case_sensitive()
    {
        await using var store = CreateStore("case-sensitive");

        await store.AppendAsync("Foo", "upper"u8.ToArray(), default);
        await store.AppendAsync("foo", "lower"u8.ToArray(), default);

        var lastUpper = await store.LoadLastBySubjectAsync("Foo", default);
        lastUpper.ShouldNotBeNull();
        lastUpper!.Payload.ToArray().ShouldBe("upper"u8.ToArray());

        var lastLower = await store.LoadLastBySubjectAsync("foo", default);
        lastLower.ShouldNotBeNull();
        lastLower!.Payload.ToArray().ShouldBe("lower"u8.ToArray());
    }

    // Test subject preservation across restarts.
    [Fact]
    public async Task Subject_preserved_across_restart()
    {
        var subDir = "subj-restart";

        await using (var store = CreateStore(subDir))
        {
            await store.AppendAsync("topic.a", "one"u8.ToArray(), default);
            await store.AppendAsync("topic.b", "two"u8.ToArray(), default);
            await store.AppendAsync("topic.c", "three"u8.ToArray(), default);
        }

        await using (var store = CreateStore(subDir))
        {
            var msg1 = await store.LoadAsync(1, default);
            msg1.ShouldNotBeNull();
            msg1!.Subject.ShouldBe("topic.a");

            var msg2 = await store.LoadAsync(2, default);
            msg2.ShouldNotBeNull();
            msg2!.Subject.ShouldBe("topic.b");

            var msg3 = await store.LoadAsync(3, default);
            msg3.ShouldNotBeNull();
            msg3!.Subject.ShouldBe("topic.c");
        }
    }

    // Go: TestFileStoreNumPendingLastBySubject server/filestore_test.go:6501
    [Fact]
    public async Task NumPending_last_per_subject()
    {
        await using var store = CreateStore("num-pending-lps");

        // "foo" x3, "bar" x2 — 2 distinct subjects.
        await store.AppendAsync("foo", "1"u8.ToArray(), default);
        await store.AppendAsync("foo", "2"u8.ToArray(), default);
        await store.AppendAsync("foo", "3"u8.ToArray(), default);
        await store.AppendAsync("bar", "4"u8.ToArray(), default);
        await store.AppendAsync("bar", "5"u8.ToArray(), default);

        // lastPerSubject=true: count only the last message per distinct subject.
        // 2 distinct subjects → Total == 2.
        var (total, _) = store.NumPending(1, ">", true);
        total.ShouldBe(2UL);

        // lastPerSubject=false: count all messages at or after sseq 1.
        var (totalAll, _) = store.NumPending(1, ">", false);
        totalAll.ShouldBe(5UL);

        // Filter to just "foo" with lastPerSubject=true → 1.
        var (fooLps, _) = store.NumPending(1, "foo", true);
        fooLps.ShouldBe(1UL);
    }

    // Test many distinct subjects.
    [Fact]
    public async Task Many_distinct_subjects()
    {
        await using var store = CreateStore("many-subjects");

        for (var i = 0; i < 100; i++)
        {
            await store.AppendAsync($"kv.{i}", Encoding.UTF8.GetBytes($"value-{i}"), default);
        }

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe(100UL);

        // Each subject should have exactly one message.
        for (var i = 0; i < 100; i++)
        {
            var last = await store.LoadLastBySubjectAsync($"kv.{i}", default);
            last.ShouldNotBeNull();
            last!.Payload.ToArray().ShouldBe(Encoding.UTF8.GetBytes($"value-{i}"));
        }
    }
}