Files
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

290 lines
10 KiB
C#

// Reference: golang/nats-server/server/filestore_test.go
// Tests for Task A3: FileStore Block Manager Rewrite.
// Verifies that FileStore correctly uses MsgBlock-based storage:
// block files on disk, block rotation, recovery, purge, snapshot,
// soft-delete, and payload transformation (S2/AEAD) integration.
using System.Text;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Tests.JetStream.Storage;
/// <summary>
/// Tests for the block-based <see cref="FileStore"/>: creation of <c>.blk</c> files,
/// block rotation, loading across blocks, recovery after re-opening a directory,
/// purge, removal of a single sequence, snapshot/restore, and payload round-trips
/// with compression or encryption enabled.
/// Every test instance works inside its own temporary root directory, which is
/// created in the constructor and deleted in <see cref="Dispose"/>.
/// </summary>
public sealed class FileStoreBlockTests : IDisposable
{
    // Subject used for every message appended by these tests.
    private const string TestSubject = "foo";

    // Deliberately tiny block size so that a few messages force block rotation.
    private const int SmallBlockSizeBytes = 256;

    private readonly string _dir;

    /// <summary>Creates a unique temporary root directory for this test instance.</summary>
    public FileStoreBlockTests()
    {
        _dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-block-{Guid.NewGuid():N}");
        Directory.CreateDirectory(_dir);
    }

    /// <summary>Deletes the temporary root directory, including everything below it.</summary>
    public void Dispose()
    {
        if (Directory.Exists(_dir))
        {
            Directory.Delete(_dir, recursive: true);
        }
    }

    /// <summary>
    /// Builds a <see cref="FileStore"/> whose directory is <paramref name="subdirectory"/>
    /// under this test's temporary root.
    /// </summary>
    /// <param name="subdirectory">Directory name, relative to the temporary root.</param>
    /// <param name="options">
    /// Optional options; when omitted a default instance is used. The instance's
    /// <c>Directory</c> is overwritten, so callers should pass a fresh object per store.
    /// </param>
    /// <returns>The newly constructed store.</returns>
    private FileStore CreateStore(string subdirectory, FileStoreOptions? options = null)
    {
        var dir = Path.Combine(_dir, subdirectory);
        var opts = options ?? new FileStoreOptions();
        opts.Directory = dir;
        return new FileStore(opts);
    }

    /// <summary>
    /// Returns a fresh options instance with the small block size. A new object is
    /// returned on every call because <see cref="CreateStore"/> mutates the options it is given.
    /// </summary>
    private static FileStoreOptions SmallBlockOptions() =>
        new FileStoreOptions { BlockSizeBytes = SmallBlockSizeBytes };

    /// <summary>
    /// Appends <paramref name="count"/> messages whose payloads are
    /// <c>msg-0000</c>, <c>msg-0001</c>, ... (zero-based, four digits).
    /// </summary>
    private static async Task AppendNumberedAsync(FileStore store, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await store.AppendAsync(TestSubject, Encoding.UTF8.GetBytes($"msg-{i:D4}"), default);
        }
    }

    /// <summary>
    /// Loads sequences 1..<paramref name="count"/> and asserts each one exists, carries the
    /// test subject, and holds the payload written by <see cref="AppendNumberedAsync"/>.
    /// </summary>
    private static async Task AssertNumberedMessagesAsync(FileStore store, ulong count)
    {
        for (ulong seq = 1; seq <= count; seq++)
        {
            var msg = await store.LoadAsync(seq, default);
            msg.ShouldNotBeNull();
            msg!.Subject.ShouldBe(TestSubject);

            // Sequence numbers are one-based while the payload counter is zero-based.
            var expected = Encoding.UTF8.GetBytes($"msg-{seq - 1:D4}");
            msg.Payload.ToArray().ShouldBe(expected);
        }
    }

    /// <summary>
    /// Appends <paramref name="payload"/> <paramref name="count"/> times, then asserts the
    /// reported message count and that every sequence loads back with the identical payload.
    /// </summary>
    private static async Task AssertPayloadRoundTripAsync(FileStore store, byte[] payload, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await store.AppendAsync(TestSubject, payload, default);
        }

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe((ulong)count);

        for (ulong seq = 1; seq <= (ulong)count; seq++)
        {
            var msg = await store.LoadAsync(seq, default);
            msg.ShouldNotBeNull();
            msg!.Payload.ToArray().ShouldBe(payload);
        }
    }

    // Go: filestore.go block-based storage — verify .blk files are created on disk.
    [Fact]
    public async Task Append_UsesBlockStorage()
    {
        const string subDir = "blk-storage";
        var storeDir = Path.Combine(_dir, subDir);
        await using var store = CreateStore(subDir);

        await store.AppendAsync(TestSubject, "Hello World"u8.ToArray(), default);

        // At least one .blk file should exist in the store directory.
        Directory.GetFiles(storeDir, "*.blk").Length.ShouldBeGreaterThanOrEqualTo(1);

        // The old JSONL file should NOT exist.
        File.Exists(Path.Combine(storeDir, "messages.jsonl")).ShouldBeFalse();
    }

    // Go: filestore.go block rotation — rbytes check causes new block creation.
    [Fact]
    public async Task MultiBlock_RotatesWhenFull()
    {
        const string subDir = "blk-rotation";
        var storeDir = Path.Combine(_dir, subDir);
        await using var store = CreateStore(subDir, SmallBlockOptions());

        // Write enough messages to exceed the small block size several times over.
        for (var i = 0; i < 20; i++)
        {
            await store.AppendAsync(TestSubject, "Hello World - block rotation test!"u8.ToArray(), default);
        }

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe(20UL);

        // Rotation must be visible both on disk and through the store's block count.
        Directory.GetFiles(storeDir, "*.blk").Length.ShouldBeGreaterThan(1);
        store.BlockCount.ShouldBeGreaterThan(1);
    }

    // Go: filestore.go multi-block load — messages span multiple blocks.
    [Fact]
    public async Task Load_AcrossBlocks()
    {
        await using var store = CreateStore("blk-across", SmallBlockOptions());

        await AppendNumberedAsync(store, 20);

        // Verify we have multiple blocks.
        store.BlockCount.ShouldBeGreaterThan(1);

        // All messages should be loadable, regardless of which block they are in.
        await AssertNumberedMessagesAsync(store, 20);
    }

    // Go: filestore.go recovery — block files are rescanned on startup.
    [Fact]
    public async Task Recovery_AfterRestart()
    {
        const string subDir = "blk-recovery";
        var storeDir = Path.Combine(_dir, subDir);

        // Write data, then dispose the store at the end of the using block.
        await using (var writer = CreateStore(subDir, SmallBlockOptions()))
        {
            await AppendNumberedAsync(writer, 20);

            var written = await writer.GetStateAsync(default);
            written.Messages.ShouldBe(20UL);
        }

        // .blk files should still exist after dispose.
        Directory.GetFiles(storeDir, "*.blk").Length.ShouldBeGreaterThan(0);

        // Recreate FileStore from the same directory.
        await using (var reopened = CreateStore(subDir, SmallBlockOptions()))
        {
            var recovered = await reopened.GetStateAsync(default);
            recovered.Messages.ShouldBe(20UL);
            recovered.FirstSeq.ShouldBe(1UL);
            recovered.LastSeq.ShouldBe(20UL);

            // Verify all messages are intact.
            await AssertNumberedMessagesAsync(reopened, 20);
        }
    }

    // Go: filestore.go purge — all blocks removed, fresh block created.
    [Fact]
    public async Task Purge_CleansAllBlocks()
    {
        const string subDir = "blk-purge";
        var storeDir = Path.Combine(_dir, subDir);
        await using var store = CreateStore(subDir, SmallBlockOptions());

        for (var i = 0; i < 20; i++)
        {
            await store.AppendAsync(TestSubject, "Hello"u8.ToArray(), default);
        }

        // Before purge, at least one .blk file should exist.
        Directory.GetFiles(storeDir, "*.blk").Length.ShouldBeGreaterThan(0);

        await store.PurgeAsync(default);

        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe(0UL);
        state.Bytes.ShouldBe(0UL);

        // The .blk files left on disk after a purge are not asserted here; only the
        // reported state is. The old JSONL file should not exist either.
        File.Exists(Path.Combine(storeDir, "messages.jsonl")).ShouldBeFalse();
    }

    // Go: filestore.go dmap — soft-delete within a block.
    [Fact]
    public async Task Remove_SoftDeletesInBlock()
    {
        await using var store = CreateStore("blk-remove");

        for (var i = 0; i < 5; i++)
        {
            await store.AppendAsync(TestSubject, "data"u8.ToArray(), default);
        }

        // Remove sequence 3 and verify it can no longer be loaded.
        (await store.RemoveAsync(3, default)).ShouldBeTrue();
        (await store.LoadAsync(3, default)).ShouldBeNull();

        // Other sequences are still loadable.
        foreach (var seq in new ulong[] { 1, 2, 4, 5 })
        {
            (await store.LoadAsync(seq, default)).ShouldNotBeNull();
        }

        // State reflects the removal.
        var state = await store.GetStateAsync(default);
        state.Messages.ShouldBe(4UL);
    }

    // Go: filestore.go snapshot — iterates all blocks for snapshot creation.
    [Fact]
    public async Task Snapshot_IncludesAllBlocks()
    {
        await using var srcStore = CreateStore("blk-snap-src", SmallBlockOptions());

        for (var i = 0; i < 30; i++)
        {
            await srcStore.AppendAsync(TestSubject, Encoding.UTF8.GetBytes($"msg-{i}"), default);
        }

        // Verify multiple blocks.
        srcStore.BlockCount.ShouldBeGreaterThan(1);

        var snap = await srcStore.CreateSnapshotAsync(default);
        snap.Length.ShouldBeGreaterThan(0);

        // Restore into a new store.
        await using var dstStore = CreateStore("blk-snap-dst");
        await dstStore.RestoreSnapshotAsync(snap, default);

        var srcState = await srcStore.GetStateAsync(default);
        var dstState = await dstStore.GetStateAsync(default);

        // Pin the source count so the comparison below cannot pass vacuously.
        srcState.Messages.ShouldBe(30UL);
        dstState.Messages.ShouldBe(srcState.Messages);
        dstState.FirstSeq.ShouldBe(srcState.FirstSeq);
        dstState.LastSeq.ShouldBe(srcState.LastSeq);

        // Verify each message round-trips, walking the sequence range the source reports.
        for (var seq = srcState.FirstSeq; seq <= srcState.LastSeq; seq++)
        {
            var original = await srcStore.LoadAsync(seq, default);
            original.ShouldNotBeNull();

            var copy = await dstStore.LoadAsync(seq, default);
            copy.ShouldNotBeNull();

            copy!.Subject.ShouldBe(original!.Subject);
            copy.Payload.ToArray().ShouldBe(original.Payload.ToArray());
        }
    }

    // Go: filestore.go S2 compression — payload is compressed before block write.
    [Fact]
    public async Task Compression_RoundTrip()
    {
        await using var store = CreateStore("blk-compress", new FileStoreOptions
        {
            Compression = StoreCompression.S2Compression,
        });

        var payload = "Hello, S2 compressed block storage!"u8.ToArray();

        // Every message must be readable with the exact payload that was written.
        await AssertPayloadRoundTripAsync(store, payload, 10);
    }

    // Go: filestore.go AEAD encryption — payload is encrypted before block write.
    [Fact]
    public async Task Encryption_RoundTrip()
    {
        // The literal is 33 bytes long; the range slice keeps exactly the first 32.
        var key = "nats-v2-test-key-exactly-32-bytes"u8[..32].ToArray();
        await using var store = CreateStore("blk-encrypt", new FileStoreOptions
        {
            Cipher = StoreCipher.ChaCha,
            EncryptionKey = key,
        });

        var payload = "Hello, AEAD encrypted block storage!"u8.ToArray();

        // Every message must be readable with the exact payload that was written.
        await AssertPayloadRoundTripAsync(store, payload, 10);
    }
}