Files
Joseph Doherty 4de691c9c5 perf: add FileStore buffered writes, O(1) state tracking, and eliminate redundant per-publish work
Implement Go-parity background flush loop (coalesce 16KB/8ms) in MsgBlock/FileStore,
replace O(n) GetStateAsync with incremental counters, skip PruneExpired/LoadAsync/
PrunePerSubject when not needed, and bypass RAFT for single-replica streams. Fix counter
tracking bugs in RemoveMsg/EraseMsg/TTL expiry and ObjectDisposedException races in
flush loop disposal. FileStore optimizations verified with 3112/3112 JetStream tests
passing; async publish benchmark remains at ~174 msg/s due to E2E protocol path bottleneck.
2026-03-13 03:11:11 -04:00

362 lines
13 KiB
C#

// Reference: golang/nats-server/server/filestore_test.go
// Tests ported from: TestFileStoreEncrypted,
// TestFileStoreRestoreEncryptedWithNoKeyFuncFails,
// TestFileStoreDoubleCompactWithWriteInBetweenEncryptedBug,
// TestFileStoreEncryptedKeepIndexNeedBekResetBug,
// TestFileStoreShortIndexWriteBug (encryption variant)
using System.Text;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Tests.JetStream.Storage;
public sealed class FileStoreEncryptionTests : IDisposable
{
private readonly string _dir;
public FileStoreEncryptionTests()
{
_dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-enc-{Guid.NewGuid():N}");
Directory.CreateDirectory(_dir);
}
public void Dispose()
{
if (Directory.Exists(_dir))
Directory.Delete(_dir, recursive: true);
}
private static byte[] TestKey => "nats-encryption-key-for-test!!"u8.ToArray();
private FileStore CreateStore(string subdirectory, bool encrypt = true, byte[]? key = null)
{
var dir = Path.Combine(_dir, subdirectory);
return new FileStore(new FileStoreOptions
{
Directory = dir,
EnableEncryption = encrypt,
EncryptionKey = key ?? TestKey,
});
}
// Go: TestFileStoreEncrypted server/filestore_test.go:4204
[Fact]
public async Task Encrypted_store_and_load()
{
await using var store = CreateStore("enc-basic");
const string subject = "foo";
var payload = "aes ftw"u8.ToArray();
for (var i = 0; i < 50; i++)
await store.AppendAsync(subject, payload, default);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)50);
var msg = await store.LoadAsync(10, default);
msg.ShouldNotBeNull();
msg!.Subject.ShouldBe(subject);
msg.Payload.ToArray().ShouldBe(payload);
}
// Go: TestFileStoreEncrypted server/filestore_test.go:4228
[Fact]
public async Task Encrypted_store_and_recover()
{
var subDir = "enc-recover";
await using (var store = CreateStore(subDir))
{
for (var i = 0; i < 50; i++)
await store.AppendAsync("foo", "aes ftw"u8.ToArray(), default);
}
// Reopen with the same key.
await using (var store = CreateStore(subDir))
{
var msg = await store.LoadAsync(10, default);
msg.ShouldNotBeNull();
msg!.Payload.ToArray().ShouldBe("aes ftw"u8.ToArray());
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)50);
}
}
// Go: TestFileStoreRestoreEncryptedWithNoKeyFuncFails server/filestore_test.go:5134
[Fact]
public async Task Encrypted_data_without_key_throws_on_load()
{
var subDir = "enc-no-key";
var dir = Path.Combine(_dir, subDir);
// Store with encryption.
await using (var store = CreateStore(subDir))
{
await store.AppendAsync("foo", "secret data"u8.ToArray(), default);
}
// Reopen with a wrong key. The FileStore constructor calls LoadExisting()
// which calls RestorePayload(), and that throws InvalidDataException when
// the envelope key-hash does not match the configured key.
var createWithWrongKey = () => new FileStore(new FileStoreOptions
{
Directory = dir,
EnableEncryption = true,
EncryptionKey = "wrong-key-wrong-key-wrong-key!!"u8.ToArray(),
EnablePayloadIntegrityChecks = true,
});
Should.Throw<InvalidDataException>(createWithWrongKey);
await Task.CompletedTask;
}
// Go: TestFileStoreEncrypted server/filestore_test.go:4204
[Fact]
public async Task Encrypted_store_remove_and_reload()
{
await using var store = CreateStore("enc-remove");
for (var i = 0; i < 10; i++)
await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i}"), default);
await store.RemoveAsync(5, default);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)9);
(await store.LoadAsync(5, default)).ShouldBeNull();
(await store.LoadAsync(6, default)).ShouldNotBeNull();
}
// Go: TestFileStoreEncrypted server/filestore_test.go:4204
[Fact]
public async Task Encrypted_purge_and_continue()
{
await using var store = CreateStore("enc-purge");
for (var i = 0; i < 10; i++)
await store.AppendAsync("foo", "data"u8.ToArray(), default);
await store.PurgeAsync(default);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)0);
var seq = await store.AppendAsync("foo", "after purge"u8.ToArray(), default);
seq.ShouldBeGreaterThan((ulong)0);
var msg = await store.LoadAsync(seq, default);
msg.ShouldNotBeNull();
msg!.Payload.ToArray().ShouldBe("after purge"u8.ToArray());
}
// Go: TestFileStoreEncrypted server/filestore_test.go:4204
[Fact]
public async Task Encrypted_snapshot_and_restore()
{
await using var store = CreateStore("enc-snap-src");
for (var i = 0; i < 20; i++)
await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i}"), default);
var snap = await store.CreateSnapshotAsync(default);
snap.Length.ShouldBeGreaterThan(0);
await using var restored = CreateStore("enc-snap-dst");
await restored.RestoreSnapshotAsync(snap, default);
var srcState = await store.GetStateAsync(default);
var dstState = await restored.GetStateAsync(default);
dstState.Messages.ShouldBe(srcState.Messages);
for (ulong i = 1; i <= srcState.Messages; i++)
{
var original = await store.LoadAsync(i, default);
var copy = await restored.LoadAsync(i, default);
copy.ShouldNotBeNull();
copy!.Payload.ToArray().ShouldBe(original!.Payload.ToArray());
}
}
// Go: TestFileStoreEncrypted server/filestore_test.go:4204
[Fact]
public async Task Encrypted_large_payload()
{
await using var store = CreateStore("enc-large");
var payload = new byte[8192];
Random.Shared.NextBytes(payload);
await store.AppendAsync("foo", payload, default);
var msg = await store.LoadAsync(1, default);
msg.ShouldNotBeNull();
msg!.Payload.ToArray().ShouldBe(payload);
}
// Go: TestFileStoreEncrypted server/filestore_test.go:4204
[Fact]
public async Task Encrypted_binary_payload_round_trips()
{
await using var store = CreateStore("enc-binary");
// All byte values.
var payload = new byte[256];
for (var i = 0; i < 256; i++)
payload[i] = (byte)i;
await store.AppendAsync("foo", payload, default);
var msg = await store.LoadAsync(1, default);
msg.ShouldNotBeNull();
msg!.Payload.ToArray().ShouldBe(payload);
}
// Go: TestFileStoreEncrypted server/filestore_test.go:4204
[Fact]
public async Task Encrypted_empty_payload()
{
await using var store = CreateStore("enc-empty");
await store.AppendAsync("foo", ReadOnlyMemory<byte>.Empty, default);
var msg = await store.LoadAsync(1, default);
msg.ShouldNotBeNull();
msg!.Payload.Length.ShouldBe(0);
}
// Go: TestFileStoreDoubleCompactWithWriteInBetweenEncryptedBug server/filestore_test.go:3924
[Fact]
public async Task Encrypted_double_compact_with_write_in_between()
{
await using var store = CreateStore("enc-double-compact");
const string subject = "foo";
var payload = "ouch"u8.ToArray();
// Write 10 messages (seqs 1-10).
for (var i = 0; i < 10; i++)
await store.AppendAsync(subject, payload, default);
// First compact: remove seqs 1-4 (seq < 5).
store.Compact(5);
// 6 messages remain (seqs 5-10).
var stateAfterFirstCompact = await store.GetStateAsync(default);
stateAfterFirstCompact.Messages.ShouldBe(6UL);
stateAfterFirstCompact.LastSeq.ShouldBe(10UL);
// Write 5 more messages (seqs 11-15).
for (var i = 0; i < 5; i++)
await store.AppendAsync(subject, payload, default);
// Second compact: remove seqs 5-9 (seq < 10).
store.Compact(10);
// 6 messages remain (seqs 10-15).
var stateAfterSecondCompact = await store.GetStateAsync(default);
stateAfterSecondCompact.Messages.ShouldBe(6UL);
stateAfterSecondCompact.LastSeq.ShouldBe(15UL);
stateAfterSecondCompact.FirstSeq.ShouldBe(10UL);
// All remaining messages (seqs 10-15) must be loadable and readable.
for (var seq = 10UL; seq <= 15UL; seq++)
{
var msg = await store.LoadAsync(seq, default);
msg.ShouldNotBeNull($"seq {seq} should still be loadable after double compact");
msg!.Subject.ShouldBe(subject);
msg.Payload.ToArray().ShouldBe(payload);
}
// Compacted-away sequences must not be loadable.
(await store.LoadAsync(1, default)).ShouldBeNull();
(await store.LoadAsync(9, default)).ShouldBeNull();
}
// Go: TestFileStoreEncryptedKeepIndexNeedBekResetBug server/filestore_test.go:3956
// Verifies that after all messages in a block are removed (leaving the block empty),
// subsequent writes to that block are readable — i.e., the block encryption key
// (BEK) is correctly reset when new data follows a fully-emptied block.
// Go: TestFileStoreEncryptedKeepIndexNeedBekResetBug server/filestore_test.go:3956
[Fact]
public async Task Encrypted_keep_index_bek_reset()
{
await using var store = CreateStore("enc-bek-reset");
var payload = "ouch"u8.ToArray();
// Write 5 messages (seqs 1-5) into the active block.
for (var i = 0; i < 5; i++)
await store.AppendAsync("foo", payload, default);
// Remove all 5 — the block is now empty, mirroring the Go test's TTL-expiry path.
for (var seq = 1UL; seq <= 5UL; seq++)
(await store.RemoveAsync(seq, default)).ShouldBeTrue();
var emptyState = await store.GetStateAsync(default);
emptyState.Messages.ShouldBe((ulong)0);
// Write 5 more messages into the same (now-empty) block.
// The BEK must be reset so that encryption/decryption is valid for the new data.
var firstNewSeq = await store.AppendAsync("foo", payload, default);
for (var i = 1; i < 5; i++)
await store.AppendAsync("foo", payload, default);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)5);
// Every message written after the block was emptied must decrypt correctly.
var msg = await store.LoadAsync(firstNewSeq, default);
msg.ShouldNotBeNull();
msg!.Subject.ShouldBe("foo");
msg.Payload.ToArray().ShouldBe(payload);
// Spot-check the last seq as well.
var lastMsg = await store.LoadAsync(firstNewSeq + 4, default);
lastMsg.ShouldNotBeNull();
lastMsg!.Payload.ToArray().ShouldBe(payload);
}
// Verify encryption with no-op key (empty key) does not crash.
[Fact]
public async Task Encrypted_with_empty_key_is_noop()
{
var dir = Path.Combine(_dir, "enc-noop");
await using var store = new FileStore(new FileStoreOptions
{
Directory = dir,
EnableEncryption = true,
EncryptionKey = [],
});
await store.AppendAsync("foo", "data"u8.ToArray(), default);
var msg = await store.LoadAsync(1, default);
msg.ShouldNotBeNull();
msg!.Payload.ToArray().ShouldBe("data"u8.ToArray());
}
// Verify data at rest is not plaintext when encrypted.
[Fact]
public async Task Encrypted_data_not_plaintext_on_disk()
{
var subDir = "enc-disk-check";
var dir = Path.Combine(_dir, subDir);
await using (var store = CreateStore(subDir))
{
await store.AppendAsync("foo", "THIS IS SENSITIVE DATA"u8.ToArray(), default);
}
// Read the raw data file and verify the plaintext payload does not appear.
var dataFile = Path.Combine(dir, "messages.jsonl");
if (File.Exists(dataFile))
{
var raw = File.ReadAllText(dataFile);
// The payload is base64-encoded after encryption, so the original
// plaintext string should not appear verbatim.
raw.ShouldNotContain("THIS IS SENSITIVE DATA");
}
}
}