// Reference: golang/nats-server/server/filestore_test.go // Go's testFileStoreAllPermutations (line 55) runs every test across 6 combinations: // {NoCipher, ChaCha, AES} x {NoCompression, S2Compression} // This file ports 16 representative tests from that matrix to .NET using // [Theory] + [MemberData] so each test case executes all 6 permutations // automatically, giving ~96 total executions. // // Covered Go tests (each appears 6 times): // TestFileStoreBasics (line 86) // TestFileStoreMsgHeaders (line 152) // TestFileStoreBasicWriteMsgsAndRestore (line 181) // TestFileStoreSelectNextFirst (line 304) // TestFileStoreMsgLimit (line 484) // TestFileStoreMsgLimitBug (line 518) // TestFileStoreBytesLimit (line 537) // TestFileStoreAgeLimit (line 616) // TestFileStoreTimeStamps (line 683) // TestFileStorePurge (line 710) // TestFileStoreCollapseDmap (line 1561) // TestFileStoreWriteAndReadSameBlock (line 1510) // TestFileStoreAndRetrieveMultiBlock (line 1527) // TestFileStoreSnapshot (line 1799) // TestFileStoreBasics (large payload variant) // TestFileStoreBasics (sequential ordering variant) using System.Text; using NATS.Server.JetStream.Storage; namespace NATS.Server.Tests.JetStream.Storage; public sealed class FileStorePermutationTests : IDisposable { private readonly string _dir; public FileStorePermutationTests() { _dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-perm-{Guid.NewGuid():N}"); Directory.CreateDirectory(_dir); } public void Dispose() { if (Directory.Exists(_dir)) Directory.Delete(_dir, recursive: true); } // ------------------------------------------------------------------------- // Permutation matrix: {NoCipher, ChaCha, Aes} x {NoCompression, S2Compression} // Mirrors Go's testFileStoreAllPermutations (filestore_test.go:55). // ------------------------------------------------------------------------- public static IEnumerable AllPermutations() { foreach (var cipher in new[] { StoreCipher.NoCipher, StoreCipher.ChaCha, StoreCipher.Aes }) foreach (var compression in new[] { StoreCompression.NoCompression, StoreCompression.S2Compression }) yield return [cipher, compression]; } /// /// Creates a FileStore wired for a specific cipher/compression permutation. /// Mirrors Go's prf() + newFileStoreWithCreated() pattern (filestore_test.go:73-84). /// private FileStore CreatePermutedStore(string subdir, StoreCipher cipher, StoreCompression compression, FileStoreOptions? extraOptions = null) { var dir = Path.Combine(_dir, subdir); byte[]? key = null; if (cipher != StoreCipher.NoCipher) { key = new byte[32]; Random.Shared.NextBytes(key); } var opts = extraOptions ?? new FileStoreOptions(); opts.Directory = dir; opts.Cipher = cipher; opts.Compression = compression; opts.EncryptionKey = key; // Keep the legacy boolean flags in sync so existing code paths are not confused. opts.EnableCompression = compression != StoreCompression.NoCompression; opts.EnableEncryption = cipher != StoreCipher.NoCipher; return new FileStore(opts); } /// /// Creates a permuted store re-using the same key as a previously-created store /// so that encrypted recovery tests can re-open with the correct key. /// private FileStore ReopenPermutedStore(string subdir, StoreCipher cipher, StoreCompression compression, byte[]? key, FileStoreOptions? extraOptions = null) { var dir = Path.Combine(_dir, subdir); var opts = extraOptions ?? new FileStoreOptions(); opts.Directory = dir; opts.Cipher = cipher; opts.Compression = compression; opts.EncryptionKey = key; opts.EnableCompression = compression != StoreCompression.NoCompression; opts.EnableEncryption = cipher != StoreCipher.NoCipher; return new FileStore(opts); } // Helper: build a stable subdir name from the permutation so test isolation is clear. private static string PermSubdir(string prefix, StoreCipher cipher, StoreCompression compression) => $"{prefix}-{cipher}-{compression}"; // Helper: extract the key from an already-created store's options by re-reading the // options that were supplied. Because we cannot reach into the store's private field, // we use a separate dictionary keyed by subdir name. private readonly Dictionary _keyStore = new(); private FileStore CreatePermutedStoreTracked(string subdir, StoreCipher cipher, StoreCompression compression, FileStoreOptions? extraOptions = null) { var dir = Path.Combine(_dir, subdir); byte[]? key = null; if (cipher != StoreCipher.NoCipher) { key = new byte[32]; Random.Shared.NextBytes(key); } _keyStore[subdir] = key; var opts = extraOptions ?? new FileStoreOptions(); opts.Directory = dir; opts.Cipher = cipher; opts.Compression = compression; opts.EncryptionKey = key; opts.EnableCompression = compression != StoreCompression.NoCompression; opts.EnableEncryption = cipher != StoreCipher.NoCipher; return new FileStore(opts); } private FileStore ReopenTrackedStore(string subdir, StoreCipher cipher, StoreCompression compression, FileStoreOptions? extraOptions = null) { var dir = Path.Combine(_dir, subdir); var key = _keyStore.GetValueOrDefault(subdir); var opts = extraOptions ?? new FileStoreOptions(); opts.Directory = dir; opts.Cipher = cipher; opts.Compression = compression; opts.EncryptionKey = key; opts.EnableCompression = compression != StoreCompression.NoCompression; opts.EnableEncryption = cipher != StoreCipher.NoCipher; return new FileStore(opts); } // ========================================================================= // Test 1: Basic store and load round-trip // Go: TestFileStoreBasics server/filestore_test.go:86 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Store_and_load_basic(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreBasics line 86 — store 5 messages and load by sequence. var subdir = PermSubdir("basic", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); const string subject = "foo"; var payload = "Hello World"u8.ToArray(); for (var i = 1; i <= 5; i++) { var seq = await store.AppendAsync(subject, payload, default); seq.ShouldBe((ulong)i); } var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)5); var msg2 = await store.LoadAsync(2, default); msg2.ShouldNotBeNull(); msg2!.Subject.ShouldBe(subject); msg2.Payload.ToArray().ShouldBe(payload); var msg3 = await store.LoadAsync(3, default); msg3.ShouldNotBeNull(); msg3!.Subject.ShouldBe(subject); } // ========================================================================= // Test 2: Store multiple messages, load by sequence // Go: TestFileStoreBasics server/filestore_test.go:86 (extended variant) // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Store_multiple_messages_load_by_sequence(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreBasics — verify every message is independently loadable. var subdir = PermSubdir("multi-seq", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); const int count = 20; for (var i = 0; i < count; i++) { var payload = Encoding.UTF8.GetBytes($"msg-{i:D4}"); await store.AppendAsync("foo", payload, default); } var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)count); state.FirstSeq.ShouldBe((ulong)1); state.LastSeq.ShouldBe((ulong)count); for (ulong i = 1; i <= count; i++) { var msg = await store.LoadAsync(i, default); msg.ShouldNotBeNull(); msg!.Subject.ShouldBe("foo"); var expected = Encoding.UTF8.GetBytes($"msg-{(int)(i - 1):D4}"); msg.Payload.ToArray().ShouldBe(expected); } } // ========================================================================= // Test 3: LoadLastBySubjectAsync // Go: TestFileStoreBasics server/filestore_test.go:86 (per-subject lookup) // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task LoadLastBySubject_returns_most_recent_for_subject(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreBasics — per-subject last-message lookup. var subdir = PermSubdir("last-subj", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); await store.AppendAsync("foo", "first"u8.ToArray(), default); await store.AppendAsync("bar", "other"u8.ToArray(), default); await store.AppendAsync("foo", "second"u8.ToArray(), default); await store.AppendAsync("foo", "third"u8.ToArray(), default); var last = await store.LoadLastBySubjectAsync("foo", default); last.ShouldNotBeNull(); last!.Payload.ToArray().ShouldBe("third"u8.ToArray()); last.Sequence.ShouldBe((ulong)4); last.Subject.ShouldBe("foo"); // Non-existent subject returns null. (await store.LoadLastBySubjectAsync("does.not.exist", default)).ShouldBeNull(); } // ========================================================================= // Test 4: RemoveAsync single message // Go: TestFileStoreBasics server/filestore_test.go:129 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Remove_single_message_updates_state(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreBasics remove section (line 129). var subdir = PermSubdir("remove-single", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); var payload = "Hello World"u8.ToArray(); for (var i = 0; i < 5; i++) await store.AppendAsync("foo", payload, default); // Remove first (seq 1). (await store.RemoveAsync(1, default)).ShouldBeTrue(); (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)4); // Remove last (seq 5). (await store.RemoveAsync(5, default)).ShouldBeTrue(); (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)3); // Remove middle (seq 3). (await store.RemoveAsync(3, default)).ShouldBeTrue(); (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)2); // Surviving sequences loadable. (await store.LoadAsync(2, default)).ShouldNotBeNull(); (await store.LoadAsync(4, default)).ShouldNotBeNull(); // Removed sequences return null. (await store.LoadAsync(1, default)).ShouldBeNull(); (await store.LoadAsync(3, default)).ShouldBeNull(); (await store.LoadAsync(5, default)).ShouldBeNull(); } // ========================================================================= // Test 5: PurgeAsync clears all messages // Go: TestFileStorePurge server/filestore_test.go:710 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Purge_clears_all_messages(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStorePurge line 710 — purge empties the store. var subdir = PermSubdir("purge", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); for (var i = 0; i < 20; i++) await store.AppendAsync("foo", "Hello"u8.ToArray(), default); (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)20); await store.PurgeAsync(default); var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)0); state.Bytes.ShouldBe((ulong)0); // Can still append after purge. var seq = await store.AppendAsync("foo", "after purge"u8.ToArray(), default); seq.ShouldBeGreaterThan((ulong)0); var msg = await store.LoadAsync(seq, default); msg.ShouldNotBeNull(); msg!.Payload.ToArray().ShouldBe("after purge"u8.ToArray()); } // ========================================================================= // Test 6: TrimToMaxMessages enforcement // Go: TestFileStoreMsgLimitBug server/filestore_test.go:518 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task TrimToMaxMessages_enforces_limit(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreMsgLimitBug line 518. var subdir = PermSubdir("trim-limit", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); for (var i = 0; i < 10; i++) await store.AppendAsync("foo", "Hello World"u8.ToArray(), default); store.TrimToMaxMessages(5); var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)5); state.FirstSeq.ShouldBe((ulong)6); state.LastSeq.ShouldBe((ulong)10); // Evicted messages not loadable. for (ulong i = 1; i <= 5; i++) (await store.LoadAsync(i, default)).ShouldBeNull(); // Remaining messages loadable. for (ulong i = 6; i <= 10; i++) (await store.LoadAsync(i, default)).ShouldNotBeNull(); } // ========================================================================= // Test 7: Block rotation when exceeding block size // Go: TestFileStoreAndRetrieveMultiBlock server/filestore_test.go:1527 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Block_rotation_when_exceeding_block_size(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreAndRetrieveMultiBlock line 1527 — small block forces rotation. // Both the initial and the reopened store must share the same key so // the encrypted data file can be decrypted on reopen. var subdir = PermSubdir("multi-block", cipher, compression); // Generate a single key for the lifetime of this test (reopen must reuse it). byte[]? key = null; if (cipher != StoreCipher.NoCipher) { key = new byte[32]; Random.Shared.NextBytes(key); } var opts1 = new FileStoreOptions { BlockSizeBytes = 256, Cipher = cipher, Compression = compression, EncryptionKey = key, EnableCompression = compression != StoreCompression.NoCompression, EnableEncryption = cipher != StoreCipher.NoCipher, }; opts1.Directory = Path.Combine(_dir, subdir); await using (var store = new FileStore(opts1)) { for (var i = 0; i < 20; i++) await store.AppendAsync("foo", "Hello World!"u8.ToArray(), default); var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)20); // With a 256-byte block and ~100 bytes per record, multiple blocks form. store.BlockCount.ShouldBeGreaterThan(1); } // Reopen with the same key — all messages must survive block rotation. var opts2 = new FileStoreOptions { BlockSizeBytes = 256, Cipher = cipher, Compression = compression, EncryptionKey = key, EnableCompression = compression != StoreCompression.NoCompression, EnableEncryption = cipher != StoreCipher.NoCipher, }; opts2.Directory = Path.Combine(_dir, subdir); await using (var store = new FileStore(opts2)) { for (ulong i = 1; i <= 20; i++) { var msg = await store.LoadAsync(i, default); msg.ShouldNotBeNull(); msg!.Subject.ShouldBe("foo"); } } } // ========================================================================= // Test 8: GetStateAsync returns correct counts // Go: TestFileStoreBasics server/filestore_test.go:104 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task GetState_returns_correct_counts(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreBasics — state tracks Msgs, Bytes, FirstSeq, LastSeq. var subdir = PermSubdir("state-counts", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)0); state.Bytes.ShouldBe((ulong)0); state.FirstSeq.ShouldBe((ulong)0); state.LastSeq.ShouldBe((ulong)0); var payload = new byte[100]; for (var i = 0; i < 5; i++) await store.AppendAsync("foo", payload, default); state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)5); state.Bytes.ShouldBe((ulong)(5 * 100)); state.FirstSeq.ShouldBe((ulong)1); state.LastSeq.ShouldBe((ulong)5); // Remove first and last — state updates accordingly. await store.RemoveAsync(1, default); await store.RemoveAsync(5, default); state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)3); state.FirstSeq.ShouldBe((ulong)2); state.LastSeq.ShouldBe((ulong)4); state.Bytes.ShouldBe((ulong)(3 * 100)); } // ========================================================================= // Test 9: CreateSnapshotAsync and RestoreSnapshotAsync round-trip // Go: TestFileStoreSnapshot server/filestore_test.go:1799 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Snapshot_and_restore_round_trip(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreSnapshot line 1799. // // The snapshot blob is produced by CreateSnapshotAsync which calls // TransformForPersist on each message (i.e. the data is encrypted with the // src store's key before being embedded in the snapshot). RestoreSnapshotAsync // then calls RestorePayload on those bytes using its own store's key. // Therefore src and dst MUST share the same key for encrypted permutations. var srcSubdir = PermSubdir("snap-src", cipher, compression); var dstSubdir = PermSubdir("snap-dst", cipher, compression); // One key shared by both stores. byte[]? sharedKey = null; if (cipher != StoreCipher.NoCipher) { sharedKey = new byte[32]; Random.Shared.NextBytes(sharedKey); } var srcOpts = new FileStoreOptions { Cipher = cipher, Compression = compression, EncryptionKey = sharedKey, EnableCompression = compression != StoreCompression.NoCompression, EnableEncryption = cipher != StoreCipher.NoCipher, }; srcOpts.Directory = Path.Combine(_dir, srcSubdir); await using var src = new FileStore(srcOpts); for (var i = 0; i < 30; i++) await src.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i}"), default); var snap = await src.CreateSnapshotAsync(default); snap.Length.ShouldBeGreaterThan(0); var dstOpts = new FileStoreOptions { Cipher = cipher, Compression = compression, EncryptionKey = sharedKey, EnableCompression = compression != StoreCompression.NoCompression, EnableEncryption = cipher != StoreCipher.NoCipher, }; dstOpts.Directory = Path.Combine(_dir, dstSubdir); await using var dst = new FileStore(dstOpts); await dst.RestoreSnapshotAsync(snap, default); var srcState = await src.GetStateAsync(default); var dstState = await dst.GetStateAsync(default); dstState.Messages.ShouldBe(srcState.Messages); dstState.FirstSeq.ShouldBe(srcState.FirstSeq); dstState.LastSeq.ShouldBe(srcState.LastSeq); // Verify every message round-trips correctly. for (ulong i = 1; i <= srcState.Messages; i++) { var original = await src.LoadAsync(i, default); var copy = await dst.LoadAsync(i, default); copy.ShouldNotBeNull(); copy!.Subject.ShouldBe(original!.Subject); copy.Payload.ToArray().ShouldBe(original.Payload.ToArray()); } } // ========================================================================= // Test 10: ListAsync returns ordered messages // Go: TestFileStoreTimeStamps server/filestore_test.go:683 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task ListAsync_returns_ordered_messages(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreTimeStamps line 683 — messages returned in sequence order. var subdir = PermSubdir("list-ordered", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); await store.AppendAsync("foo", "one"u8.ToArray(), default); await store.AppendAsync("bar", "two"u8.ToArray(), default); await store.AppendAsync("baz", "three"u8.ToArray(), default); var messages = await store.ListAsync(default); messages.Count.ShouldBe(3); messages[0].Sequence.ShouldBe((ulong)1); messages[1].Sequence.ShouldBe((ulong)2); messages[2].Sequence.ShouldBe((ulong)3); messages[0].Subject.ShouldBe("foo"); messages[1].Subject.ShouldBe("bar"); messages[2].Subject.ShouldBe("baz"); } // ========================================================================= // Test 11: Max age TTL prunes expired messages // Go: TestFileStoreAgeLimit server/filestore_test.go:616 (partial — skip // compression/cipher guard that Go applies to some variants) // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task MaxAge_prunes_expired_messages(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreAgeLimit line 616. var subdir = PermSubdir("max-age", cipher, compression); var opts = new FileStoreOptions { MaxAgeMs = 200 }; await using var store = CreatePermutedStore(subdir, cipher, compression, opts); for (var i = 0; i < 5; i++) await store.AppendAsync("foo", "Hello World"u8.ToArray(), default); (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)5); // Wait for messages to age out. await Task.Delay(350); // Trigger pruning by appending a new message. await store.AppendAsync("foo", "trigger"u8.ToArray(), default); var state = await store.GetStateAsync(default); // Only the freshly-appended trigger message should remain. state.Messages.ShouldBe((ulong)1); } // ========================================================================= // Test 12: Recovery after reopen // Go: TestFileStoreBasicWriteMsgsAndRestore server/filestore_test.go:181 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Recovery_after_reopen_preserves_messages(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreBasicWriteMsgsAndRestore line 181 — stop and restart. var subdir = PermSubdir("recovery", cipher, compression); byte[]? key = null; if (cipher != StoreCipher.NoCipher) { key = new byte[32]; Random.Shared.NextBytes(key); } _keyStore[subdir] = key; await using (var store = ReopenPermutedStore(subdir, cipher, compression, key)) { for (var i = 1; i <= 100; i++) { var payload = Encoding.UTF8.GetBytes($"[{i:D8}] Hello World!"); var seq = await store.AppendAsync("foo", payload, default); seq.ShouldBe((ulong)i); } (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)100); } // Reopen with same key and verify all 100 messages survived. await using (var store = ReopenPermutedStore(subdir, cipher, compression, key)) { var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)100); state.FirstSeq.ShouldBe((ulong)1); state.LastSeq.ShouldBe((ulong)100); // Spot-check a few messages. var msg1 = await store.LoadAsync(1, default); msg1.ShouldNotBeNull(); msg1!.Payload.ToArray().ShouldBe(Encoding.UTF8.GetBytes("[00000001] Hello World!")); var msg50 = await store.LoadAsync(50, default); msg50.ShouldNotBeNull(); msg50!.Payload.ToArray().ShouldBe(Encoding.UTF8.GetBytes("[00000050] Hello World!")); } } // ========================================================================= // Test 13: Large payload (64 KB) store and load // Go: TestFileStoreBasics server/filestore_test.go:86 (large payload variant) // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Large_payload_store_and_load(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreBasics — large random payloads must round-trip exactly. var subdir = PermSubdir("large-payload", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); var payload = new byte[64 * 1024]; // 64 KiB Random.Shared.NextBytes(payload); var seq = await store.AppendAsync("foo", payload, default); seq.ShouldBe((ulong)1); var msg = await store.LoadAsync(1, default); msg.ShouldNotBeNull(); msg!.Subject.ShouldBe("foo"); msg.Payload.ToArray().ShouldBe(payload); } // ========================================================================= // Test 14: Multiple subjects, filter by subject // Go: TestFileStoreBasics server/filestore_test.go:86 (multi-subject variant) // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Multiple_subjects_filter_by_subject(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreBasics — multiple subjects stored; each LoadLastBySubject // returns the correct one. var subdir = PermSubdir("multi-subj", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); await store.AppendAsync("foo.bar", "one"u8.ToArray(), default); await store.AppendAsync("baz.qux", "two"u8.ToArray(), default); await store.AppendAsync("foo.bar", "three"u8.ToArray(), default); await store.AppendAsync("baz.qux", "four"u8.ToArray(), default); var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)4); // Check each message's subject. (await store.LoadAsync(1, default))!.Subject.ShouldBe("foo.bar"); (await store.LoadAsync(2, default))!.Subject.ShouldBe("baz.qux"); (await store.LoadAsync(3, default))!.Subject.ShouldBe("foo.bar"); (await store.LoadAsync(4, default))!.Subject.ShouldBe("baz.qux"); // LoadLastBySubject picks the correct last message per subject. var lastFoo = await store.LoadLastBySubjectAsync("foo.bar", default); lastFoo.ShouldNotBeNull(); lastFoo!.Sequence.ShouldBe((ulong)3); lastFoo.Payload.ToArray().ShouldBe("three"u8.ToArray()); var lastBaz = await store.LoadLastBySubjectAsync("baz.qux", default); lastBaz.ShouldNotBeNull(); lastBaz!.Sequence.ShouldBe((ulong)4); lastBaz.Payload.ToArray().ShouldBe("four"u8.ToArray()); } // ========================================================================= // Test 15: Sequential writes maintain sequence ordering // Go: TestFileStoreSelectNextFirst server/filestore_test.go:304 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Sequential_writes_maintain_ordering(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreSelectNextFirst line 304 — remove a run, verify FirstSeq jumps. var subdir = PermSubdir("seq-order", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); for (var i = 0; i < 10; i++) await store.AppendAsync("zzz", "Hello World"u8.ToArray(), default); (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)10); // Delete 2-7 forming a contiguous gap. for (var i = 2; i <= 7; i++) (await store.RemoveAsync((ulong)i, default)).ShouldBeTrue(); var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)4); state.FirstSeq.ShouldBe((ulong)1); // Remove seq 1 — first should jump to 8. (await store.RemoveAsync(1, default)).ShouldBeTrue(); state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)3); state.FirstSeq.ShouldBe((ulong)8); // Sequences 8, 9, 10 must be loadable. for (ulong i = 8; i <= 10; i++) (await store.LoadAsync(i, default)).ShouldNotBeNull(); } // ========================================================================= // Test 16: Store to new directory, verify files created on disk // Go: TestFileStoreBasics server/filestore_test.go:86 (disk-presence variant) // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Store_creates_files_on_disk(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreBasics — the store must actually persist data on disk. var subdir = PermSubdir("disk-presence", cipher, compression); var dir = Path.Combine(_dir, subdir); await using var store = CreatePermutedStore(subdir, cipher, compression); await store.AppendAsync("foo", "Hello World"u8.ToArray(), default); // The store directory must exist. Directory.Exists(dir).ShouldBeTrue(); // At least one file must be present (data file or manifest). var files = Directory.GetFiles(dir, "*", SearchOption.AllDirectories); files.Length.ShouldBeGreaterThan(0); } // ========================================================================= // Test 17: Write-and-read in the same block // Go: TestFileStoreWriteAndReadSameBlock server/filestore_test.go:1510 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Write_and_read_same_block(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreWriteAndReadSameBlock line 1510 — interleaved store+load. var subdir = PermSubdir("same-block", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); const string subject = "foo"; var payload = "Hello World!"u8.ToArray(); for (ulong i = 1; i <= 10; i++) { var seq = await store.AppendAsync(subject, payload, default); seq.ShouldBe(i); var msg = await store.LoadAsync(i, default); msg.ShouldNotBeNull(); msg!.Subject.ShouldBe(subject); msg.Payload.ToArray().ShouldBe(payload); } } // ========================================================================= // Test 18: Timestamps are non-decreasing // Go: TestFileStoreTimeStamps server/filestore_test.go:683 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Stored_messages_have_non_decreasing_timestamps(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreTimeStamps line 683. var subdir = PermSubdir("timestamps", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); for (var i = 0; i < 10; i++) await store.AppendAsync("foo", "Hello World"u8.ToArray(), default); var messages = await store.ListAsync(default); messages.Count.ShouldBe(10); DateTime? previous = null; foreach (var msg in messages) { if (previous.HasValue) msg.TimestampUtc.ShouldBeGreaterThanOrEqualTo(previous.Value); previous = msg.TimestampUtc; } } // ========================================================================= // Test 19: CollapseDmap — out-of-order removes, FirstSeq collapses properly // Go: TestFileStoreCollapseDmap server/filestore_test.go:1561 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Remove_out_of_order_collapses_first_seq(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreCollapseDmap line 1561. var subdir = PermSubdir("dmap", cipher, compression); await using var store = CreatePermutedStore(subdir, cipher, compression); for (var i = 0; i < 10; i++) await store.AppendAsync("foo", "Hello World!"u8.ToArray(), default); (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)10); // Remove out of order, forming gaps. (await store.RemoveAsync(2, default)).ShouldBeTrue(); (await store.RemoveAsync(4, default)).ShouldBeTrue(); (await store.RemoveAsync(8, default)).ShouldBeTrue(); var state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)7); // Remove first — seq 1 gone, FirstSeq advances to 3. (await store.RemoveAsync(1, default)).ShouldBeTrue(); state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)6); state.FirstSeq.ShouldBe((ulong)3); // Remove seq 3 — FirstSeq advances to 5. (await store.RemoveAsync(3, default)).ShouldBeTrue(); state = await store.GetStateAsync(default); state.Messages.ShouldBe((ulong)5); state.FirstSeq.ShouldBe((ulong)5); } // ========================================================================= // Test 20: Snapshot after removes — removed sequences absent from restore // Go: TestFileStoreSnapshot server/filestore_test.go:1904 // ========================================================================= [Theory] [MemberData(nameof(AllPermutations))] public async Task Snapshot_after_removes_preserves_remaining(StoreCipher cipher, StoreCompression compression) { // Go: TestFileStoreSnapshot line 1904 — snapshot taken after removes; removed // sequences must not appear in the restored store. // // src and dst share the same key: see comment in Snapshot_and_restore_round_trip. var srcSubdir = PermSubdir("snap-rm-src", cipher, compression); var dstSubdir = PermSubdir("snap-rm-dst", cipher, compression); byte[]? sharedKey = null; if (cipher != StoreCipher.NoCipher) { sharedKey = new byte[32]; Random.Shared.NextBytes(sharedKey); } var srcOpts = new FileStoreOptions { Cipher = cipher, Compression = compression, EncryptionKey = sharedKey, EnableCompression = compression != StoreCompression.NoCompression, EnableEncryption = cipher != StoreCipher.NoCipher, }; srcOpts.Directory = Path.Combine(_dir, srcSubdir); await using var src = new FileStore(srcOpts); for (var i = 0; i < 20; i++) await src.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i}"), default); // Remove first 5 messages. for (ulong i = 1; i <= 5; i++) await src.RemoveAsync(i, default); var snap = await src.CreateSnapshotAsync(default); var dstOpts = new FileStoreOptions { Cipher = cipher, Compression = compression, EncryptionKey = sharedKey, EnableCompression = compression != StoreCompression.NoCompression, EnableEncryption = cipher != StoreCipher.NoCipher, }; dstOpts.Directory = Path.Combine(_dir, dstSubdir); await using var dst = new FileStore(dstOpts); await dst.RestoreSnapshotAsync(snap, default); var dstState = await dst.GetStateAsync(default); dstState.Messages.ShouldBe((ulong)15); dstState.FirstSeq.ShouldBe((ulong)6); // Removed sequences must not be present. for (ulong i = 1; i <= 5; i++) (await dst.LoadAsync(i, default)).ShouldBeNull(); // Remaining sequences must be present. for (ulong i = 6; i <= 20; i++) (await dst.LoadAsync(i, default)).ShouldNotBeNull(); } }