Files
natsdotnet/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreLimitsTests.cs
Joseph Doherty 4de691c9c5 perf: add FileStore buffered writes, O(1) state tracking, and eliminate redundant per-publish work
Implement Go-parity background flush loop (coalesce 16KB/8ms) in MsgBlock/FileStore,
replace O(n) GetStateAsync with incremental counters, skip PruneExpired/LoadAsync/
PrunePerSubject when not needed, and bypass RAFT for single-replica streams. Fix counter
tracking bugs in RemoveMsg/EraseMsg/TTL expiry and ObjectDisposedException races in
flush loop disposal. FileStore optimizations verified with 3112/3112 JetStream tests
passing; async publish benchmark remains at ~174 msg/s due to E2E protocol path bottleneck.
2026-03-13 03:11:11 -04:00

442 lines
16 KiB
C#

// Reference: golang/nats-server/server/filestore_test.go
// Tests ported from: TestFileStoreMsgLimit, TestFileStoreMsgLimitBug,
// TestFileStoreBytesLimit, TestFileStoreBytesLimitWithDiscardNew,
// TestFileStoreAgeLimit, TestFileStoreMaxMsgsPerSubject,
// TestFileStoreMaxMsgsAndMaxMsgsPerSubject,
// TestFileStoreUpdateMaxMsgsPerSubject
using System.Text;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Tests.JetStream.Storage;
public sealed class FileStoreLimitsTests : IDisposable
{
private readonly string _dir;
public FileStoreLimitsTests()
{
_dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-limits-{Guid.NewGuid():N}");
Directory.CreateDirectory(_dir);
}
public void Dispose()
{
if (Directory.Exists(_dir))
Directory.Delete(_dir, recursive: true);
}
private FileStore CreateStore(string subdirectory, FileStoreOptions? options = null)
{
var dir = Path.Combine(_dir, subdirectory);
var opts = options ?? new FileStoreOptions();
opts.Directory = dir;
return new FileStore(opts);
}
// Go: TestFileStoreMsgLimit server/filestore_test.go:484
[Fact]
public async Task TrimToMaxMessages_maintains_limit()
{
await using var store = CreateStore("msg-limit");
for (var i = 0; i < 10; i++)
await store.AppendAsync("foo", "Hello World"u8.ToArray(), default);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)10);
// Store one more, then trim.
await store.AppendAsync("foo", "Hello World"u8.ToArray(), default);
store.TrimToMaxMessages(10);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)10);
state.LastSeq.ShouldBe((ulong)11);
state.FirstSeq.ShouldBe((ulong)2);
// Seq 1 should be evicted.
(await store.LoadAsync(1, default)).ShouldBeNull();
}
// Go: TestFileStoreMsgLimitBug server/filestore_test.go:518
[Fact]
public async Task TrimToMaxMessages_one_across_restart()
{
var subDir = "msg-limit-bug";
await using (var store = CreateStore(subDir))
{
await store.AppendAsync("foo", "Hello World"u8.ToArray(), default);
await store.AppendAsync("foo", "Hello World"u8.ToArray(), default);
store.TrimToMaxMessages(1);
}
// Reopen and store one more.
await using (var store = CreateStore(subDir))
{
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)1);
await store.AppendAsync("foo", "Hello World"u8.ToArray(), default);
store.TrimToMaxMessages(1);
state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)1);
}
}
// Go: TestFileStoreMsgLimit server/filestore_test.go:484
[Fact]
public async Task TrimToMaxMessages_repeated_trims()
{
await using var store = CreateStore("repeated-trim");
for (var i = 0; i < 20; i++)
await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i}"), default);
store.TrimToMaxMessages(10);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)10);
(await store.GetStateAsync(default)).FirstSeq.ShouldBe((ulong)11);
store.TrimToMaxMessages(5);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)5);
(await store.GetStateAsync(default)).FirstSeq.ShouldBe((ulong)16);
store.TrimToMaxMessages(1);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)1);
(await store.GetStateAsync(default)).FirstSeq.ShouldBe((ulong)20);
}
// Go: TestFileStoreBytesLimit server/filestore_test.go:537
[Fact]
public async Task Bytes_accumulate_correctly()
{
await using var store = CreateStore("bytes-accum");
var payload = new byte[512];
const int count = 10;
for (var i = 0; i < count; i++)
await store.AppendAsync("foo", payload, default);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)count);
state.Bytes.ShouldBe((ulong)(count * 512));
}
// Go: TestFileStoreBytesLimit server/filestore_test.go:537
[Fact]
public async Task TrimToMaxMessages_reduces_bytes()
{
await using var store = CreateStore("bytes-trim");
var payload = new byte[100];
for (var i = 0; i < 10; i++)
await store.AppendAsync("foo", payload, default);
var beforeState = await store.GetStateAsync(default);
beforeState.Bytes.ShouldBe((ulong)1000);
store.TrimToMaxMessages(5);
var afterState = await store.GetStateAsync(default);
afterState.Messages.ShouldBe((ulong)5);
afterState.Bytes.ShouldBe((ulong)500);
}
// Go: TestFileStoreAgeLimit server/filestore_test.go:616
[SlopwatchSuppress("SW004", "MaxAge TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing time-based expiration")]
[Fact]
public async Task MaxAge_expires_old_messages()
{
// MaxAgeMs = 200ms
await using var store = CreateStore("age-limit", new FileStoreOptions { MaxAgeMs = 200 });
for (var i = 0; i < 5; i++)
await store.AppendAsync("foo", "Hello World"u8.ToArray(), default);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)5);
// Wait for messages to expire.
await Task.Delay(300);
// Trigger pruning by appending a new message.
await store.AppendAsync("foo", "trigger"u8.ToArray(), default);
var state = await store.GetStateAsync(default);
// Only the freshly-appended trigger message should remain.
state.Messages.ShouldBe((ulong)1);
}
// Go: TestFileStoreAgeLimit server/filestore_test.go:660
[SlopwatchSuppress("SW004", "MaxAge TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing time-based expiration")]
[Fact]
public async Task MaxAge_timer_fires_again_for_second_batch()
{
await using var store = CreateStore("age-second-batch", new FileStoreOptions { MaxAgeMs = 200 });
for (var i = 0; i < 3; i++)
await store.AppendAsync("foo", "batch1"u8.ToArray(), default);
await Task.Delay(300);
// Trigger pruning.
await store.AppendAsync("foo", "trigger1"u8.ToArray(), default);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)1);
// Second batch.
for (var i = 0; i < 3; i++)
await store.AppendAsync("foo", "batch2"u8.ToArray(), default);
await Task.Delay(300);
await store.AppendAsync("foo", "trigger2"u8.ToArray(), default);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)1);
}
// Go: TestFileStoreAgeLimit server/filestore_test.go:616
[SlopwatchSuppress("SW004", "MaxAge TTL expiry test requires real wall-clock time to elapse; verifying zero-age means no expiration needs a delay window")]
[Fact]
public async Task MaxAge_zero_means_no_expiration()
{
await using var store = CreateStore("age-zero", new FileStoreOptions { MaxAgeMs = 0 });
for (var i = 0; i < 5; i++)
await store.AppendAsync("foo", "Hello"u8.ToArray(), default);
await Task.Delay(100);
// Trigger append to check pruning.
await store.AppendAsync("foo", "trigger"u8.ToArray(), default);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)6);
}
// Go: TestFileStoreMsgLimit server/filestore_test.go:484
[Fact]
public async Task TrimToMaxMessages_zero_removes_all()
{
await using var store = CreateStore("trim-zero");
for (var i = 0; i < 5; i++)
await store.AppendAsync("foo", "data"u8.ToArray(), default);
store.TrimToMaxMessages(0);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)0);
}
// Go: TestFileStoreMsgLimit server/filestore_test.go:484
[Fact]
public async Task TrimToMaxMessages_larger_than_count_is_noop()
{
await using var store = CreateStore("trim-noop");
for (var i = 0; i < 5; i++)
await store.AppendAsync("foo", "data"u8.ToArray(), default);
store.TrimToMaxMessages(100);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)5);
state.FirstSeq.ShouldBe((ulong)1);
}
// Go: TestFileStoreBytesLimit server/filestore_test.go:537
[Fact]
public async Task Bytes_decrease_after_remove()
{
await using var store = CreateStore("bytes-rm");
var payload = new byte[100];
for (var i = 0; i < 5; i++)
await store.AppendAsync("foo", payload, default);
var before = await store.GetStateAsync(default);
before.Bytes.ShouldBe((ulong)500);
await store.RemoveAsync(1, default);
await store.RemoveAsync(3, default);
var after = await store.GetStateAsync(default);
after.Bytes.ShouldBe((ulong)300);
}
// Go: TestFileStoreBytesLimitWithDiscardNew server/filestore_test.go:583
[Fact]
public async Task Bytes_limit_with_discard_new_rejects_over_limit()
{
var payload = new byte[7];
await using var store = CreateStore("bytes-discard-new", new FileStoreOptions
{
MaxBytes = 20,
Discard = DiscardPolicy.New,
});
// 2 messages fit (14 bytes <= 20)
await store.AppendAsync("tiny", payload, default);
await store.AppendAsync("tiny", payload, default);
// 3rd rejected (14 + 7 = 21 > 20)
await Should.ThrowAsync<StoreCapacityException>(
async () => await store.AppendAsync("tiny", payload, default));
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe(2UL);
state.Bytes.ShouldBe(14UL);
}
// Go: TestFileStoreMaxMsgsPerSubject server/filestore_test.go:4065
[Fact]
public async Task MaxMsgsPerSubject_enforces_per_subject_limit()
{
await using var store = CreateStore("max-per-subj", new FileStoreOptions { MaxMsgsPerSubject = 2 });
// Store 5 messages on "foo" — only last 2 should survive.
for (var i = 0; i < 5; i++)
await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"foo-{i}"), default);
// Store 3 messages on "bar" — only last 2 should survive.
for (var i = 0; i < 3; i++)
await store.AppendAsync("bar", Encoding.UTF8.GetBytes($"bar-{i}"), default);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)4); // 2 foo + 2 bar
// Verify oldest foo messages were evicted.
(await store.LoadAsync(1, default)).ShouldBeNull();
(await store.LoadAsync(2, default)).ShouldBeNull();
(await store.LoadAsync(3, default)).ShouldBeNull();
// Last 2 foo messages should survive (seqs 4 and 5).
(await store.LoadAsync(4, default)).ShouldNotBeNull();
(await store.LoadAsync(5, default)).ShouldNotBeNull();
}
// Go: TestFileStoreMaxMsgsAndMaxMsgsPerSubject server/filestore_test.go:4098
[Fact]
public async Task MaxMsgs_and_MaxMsgsPerSubject_combined()
{
await using var store = CreateStore("max-combined", new FileStoreOptions { MaxMsgsPerSubject = 3 });
// Store messages across multiple subjects.
for (var i = 0; i < 5; i++)
await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"foo-{i}"), default);
for (var i = 0; i < 5; i++)
await store.AppendAsync("bar", Encoding.UTF8.GetBytes($"bar-{i}"), default);
var state = await store.GetStateAsync(default);
// Each subject limited to 3 → 6 total.
state.Messages.ShouldBe((ulong)6);
// Verify per-subject: last 3 of each subject survive.
var fooLast = await store.LoadLastBySubjectAsync("foo", default);
fooLast.ShouldNotBeNull();
fooLast!.Sequence.ShouldBe((ulong)5);
var barLast = await store.LoadLastBySubjectAsync("bar", default);
barLast.ShouldNotBeNull();
barLast!.Sequence.ShouldBe((ulong)10);
}
// Go: TestFileStoreUpdateMaxMsgsPerSubject server/filestore_test.go:4563
[Fact]
public async Task UpdateConfig_changes_MaxMsgsPerSubject()
{
await using var store = CreateStore("update-max-per-subj");
// Store 10 messages on "foo".
for (var i = 0; i < 10; i++)
await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"foo-{i}"), default);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)10);
// Update config to limit to 3 per subject.
store.UpdateConfig(new StreamConfig { MaxMsgsPer = 3 });
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)3);
// Only the last 3 messages should remain.
(await store.LoadAsync(8, default)).ShouldNotBeNull();
(await store.LoadAsync(9, default)).ShouldNotBeNull();
(await store.LoadAsync(10, default)).ShouldNotBeNull();
(await store.LoadAsync(7, default)).ShouldBeNull();
}
// Go: TestFileStoreMsgLimit server/filestore_test.go:484
[Fact]
public async Task TrimToMaxMessages_persists_across_restart()
{
var subDir = "trim-persist";
await using (var store = CreateStore(subDir))
{
for (var i = 0; i < 20; i++)
await store.AppendAsync("foo", "data"u8.ToArray(), default);
store.TrimToMaxMessages(5);
}
await using (var store = CreateStore(subDir))
{
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)5);
state.FirstSeq.ShouldBe((ulong)16);
state.LastSeq.ShouldBe((ulong)20);
}
}
// Go: TestFileStoreAgeLimit server/filestore_test.go:616
[SlopwatchSuppress("SW004", "MaxAge TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing time-based expiration")]
[Fact]
public async Task MaxAge_with_interior_deletes()
{
await using var store = CreateStore("age-interior", new FileStoreOptions { MaxAgeMs = 200 });
for (var i = 0; i < 10; i++)
await store.AppendAsync("foo", "Hello"u8.ToArray(), default);
// Remove some interior messages.
await store.RemoveAsync(3, default);
await store.RemoveAsync(5, default);
await store.RemoveAsync(7, default);
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)7);
await Task.Delay(300);
// Trigger pruning.
await store.AppendAsync("foo", "trigger"u8.ToArray(), default);
var state = await store.GetStateAsync(default);
state.Messages.ShouldBe((ulong)1);
}
// Go: TestFileStoreMsgLimit server/filestore_test.go:484
[Fact]
public async Task Sequence_numbers_monotonically_increase_through_trimming()
{
await using var store = CreateStore("seq-mono");
for (var i = 1; i <= 15; i++)
await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i}"), default);
store.TrimToMaxMessages(5);
var state = await store.GetStateAsync(default);
state.LastSeq.ShouldBe((ulong)15);
state.FirstSeq.ShouldBe((ulong)11);
// Append more.
var nextSeq = await store.AppendAsync("foo", "after-trim"u8.ToArray(), default);
nextSeq.ShouldBe((ulong)16);
state = await store.GetStateAsync(default);
state.LastSeq.ShouldBe((ulong)16);
state.Messages.ShouldBe((ulong)6);
}
}