From ca2d8019a1ea8b5ca6228167aa868fef6b85cffd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 11:34:19 -0400 Subject: [PATCH] docs: add FileStore benchmarks and storage notes --- Documentation/JetStream/Overview.md | 46 ++---- .../JetStream/FileStoreAppendBenchmarks.cs | 151 ++++++++++++++++++ tests/NATS.Server.Benchmark.Tests/README.md | 3 + .../JetStream/Storage/FileStoreTtlTests.cs | 6 +- 4 files changed, 174 insertions(+), 32 deletions(-) create mode 100644 tests/NATS.Server.Benchmark.Tests/JetStream/FileStoreAppendBenchmarks.cs diff --git a/Documentation/JetStream/Overview.md b/Documentation/JetStream/Overview.md index d977c17..520490d 100644 --- a/Documentation/JetStream/Overview.md +++ b/Documentation/JetStream/Overview.md @@ -286,43 +286,31 @@ public ValueTask AppendAsync(string subject, ReadOnlyMemory payload ### FileStore -`FileStore` appends messages to a JSONL file (`messages.jsonl`) and keeps a full in-memory index (`Dictionary`) identical in structure to `MemStore`. It is not production-safe for several reasons: - -- **No locking**: `AppendAsync`, `LoadAsync`, `GetStateAsync`, and `TrimToMaxMessages` are not synchronized. Concurrent access from `StreamManager.Capture` and `PullConsumerEngine.FetchAsync` is unsafe. -- **Per-write file I/O**: Each `AppendAsync` calls `File.AppendAllTextAsync`, issuing a separate file open/write/close per message. -- **Full rewrite on trim**: `TrimToMaxMessages` calls `RewriteDataFile()`, which rewrites the entire file from the in-memory index. This is O(n) in message count and blocking. -- **Full in-memory index**: The in-memory dictionary holds every undeleted message payload; there is no paging or streaming read path. +`FileStore` now persists messages into block files via `MsgBlock`, keeps a live in-memory message cache for load paths, and maintains a compact metadata index (`Dictionary`) plus a per-subject last-sequence map for hot-path lookups such as `LoadLastBySubjectAsync`. Headers and payloads are stored separately and remain separate across snapshot, restore, block rewrite, and crash recovery. On startup, any legacy `messages.jsonl` file is migrated into block storage before recovery continues. ```csharp // FileStore.cs -public void TrimToMaxMessages(ulong maxMessages) +private readonly Dictionary _messages = new(); +private readonly Dictionary _messageIndexes = new(); +private readonly Dictionary _lastSequenceBySubject = new(StringComparer.Ordinal); + +public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) { - while ((ulong)_messages.Count > maxMessages) + if (_lastSequenceBySubject.TryGetValue(subject, out var sequence) + && _messages.TryGetValue(sequence, out var match)) { - var first = _messages.Keys.Min(); - _messages.Remove(first); + return ValueTask.FromResult(match); } - RewriteDataFile(); -} - -private void RewriteDataFile() -{ - var lines = new List(_messages.Count); - foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) - { - lines.Add(JsonSerializer.Serialize(new FileRecord - { - Sequence = message.Sequence, - Subject = message.Subject, - PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()), - })); - } - File.WriteAllLines(_dataFilePath, lines); + return ValueTask.FromResult(null); } ``` -The Go reference (`filestore.go`) uses block-based binary storage with S2 compression, per-block indexes, and memory-mapped I/O. This implementation shares none of those properties. +The current implementation is still materially simpler than Go `filestore.go`: + +- **No synchronization**: `FileStore` still exposes unsynchronized mutation and read paths. It is safe only under the current test and single-process usage assumptions. +- **Payloads still stay resident**: the compact index removes duplicate payload ownership for metadata-heavy operations, but `_messages` still retains live payload bytes in memory for direct load paths. +- **No Go-equivalent block index stack**: there is no per-block subject tree, mmap-backed read path, or Go-style cache/compaction parity. Deletes and trims rely on tombstones plus later block maintenance rather than Go's full production filestore behavior. --- @@ -445,7 +433,7 @@ The following features are present in the Go reference (`golang/nats-server/serv - **Ephemeral consumers**: `ConsumerManager.CreateOrUpdate` requires a non-empty `DurableName`. There is no support for unnamed ephemeral consumers. - **Push delivery over the NATS wire**: Push consumers enqueue `PushFrame` objects into an in-memory queue. No MSG is written to any connected NATS client's TCP socket. - **Consumer filter subject enforcement**: `FilterSubject` is stored on `ConsumerConfig` but is never applied in `PullConsumerEngine.FetchAsync`. All messages in the stream are returned regardless of filter. -- **FileStore production safety**: No locking, per-write file I/O, full-rewrite-on-trim, and full in-memory index make `FileStore` unsuitable for production use. +- **FileStore production safety**: `FileStore` now uses block files and compact metadata indexes, but it still lacks synchronization and Go-level block indexing, so it remains unsuitable for production use. - **RAFT persistence and networking**: `RaftNode` log entries are not persisted across restarts. Replication uses direct in-process method calls; there is no network transport for multi-server consensus. - **Cross-server replication**: Mirror and source coordinators work only within one `StreamManager` in one process. Messages published on a remote server are not replicated. - **Duplicate message window**: `PublishPreconditions` tracks message IDs for deduplication but there is no configurable `DuplicateWindow` TTL to expire old IDs. @@ -460,4 +448,4 @@ The following features are present in the Go reference (`golang/nats-server/serv - [Configuration Overview](../Configuration/Overview.md) - [Protocol Overview](../Protocol/Overview.md) - + diff --git a/tests/NATS.Server.Benchmark.Tests/JetStream/FileStoreAppendBenchmarks.cs b/tests/NATS.Server.Benchmark.Tests/JetStream/FileStoreAppendBenchmarks.cs new file mode 100644 index 0000000..18b349f --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/JetStream/FileStoreAppendBenchmarks.cs @@ -0,0 +1,151 @@ +using System.Diagnostics; +using NATS.Server.JetStream.Storage; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.JetStream; + +[Collection("Benchmark-JetStream")] +public class FileStoreAppendBenchmarks(ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task FileStore_AppendAsync_128B_Throughput() + { + var payload = new byte[128]; + var dir = CreateDirectory("append"); + var opts = CreateOptions(dir); + + try + { + await using var store = new FileStore(opts); + await MeasureAsync("FileStore AppendAsync (128B)", operations: 20_000, payload.Length, + i => store.AppendAsync($"bench.append.{i % 8}", payload, default).AsTask()); + } + finally + { + DeleteDirectory(dir); + } + } + + [Fact] + [Trait("Category", "Benchmark")] + public void FileStore_LoadLastBySubject_Throughput() + { + var payload = new byte[64]; + var dir = CreateDirectory("load-last"); + var opts = CreateOptions(dir); + + try + { + using var store = new FileStore(opts); + for (var i = 0; i < 25_000; i++) + store.StoreMsg($"bench.subject.{i % 16}", null, payload, 0L); + + Measure("FileStore LoadLastBySubject (hot)", operations: 50_000, payload.Length, + () => + { + var loaded = store.LoadLastBySubjectAsync("bench.subject.7", default).GetAwaiter().GetResult(); + if (loaded is null || loaded.Payload.Length != payload.Length) + throw new InvalidOperationException("LoadLastBySubjectAsync returned an unexpected result."); + }); + } + finally + { + DeleteDirectory(dir); + } + } + + [Fact] + [Trait("Category", "Benchmark")] + public void FileStore_PurgeEx_Trim_Overhead() + { + var payload = new byte[96]; + var dir = CreateDirectory("purge-trim"); + var opts = CreateOptions(dir); + + try + { + using var store = new FileStore(opts); + for (var i = 0; i < 12_000; i++) + store.StoreMsg($"bench.purge.{i % 6}", null, payload, 0L); + + Measure("FileStore PurgeEx+Trim", operations: 2_000, payload.Length, + () => + { + store.PurgeEx("bench.purge.1", 0, 8); + store.TrimToMaxMessages(10_000); + store.StoreMsg("bench.purge.1", null, payload, 0L); + }); + } + finally + { + DeleteDirectory(dir); + } + } + + private async Task MeasureAsync(string name, int operations, int payloadSize, Func action) + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + + var beforeAlloc = GC.GetAllocatedBytesForCurrentThread(); + var sw = Stopwatch.StartNew(); + + for (var i = 0; i < operations; i++) + await action(i); + + sw.Stop(); + WriteResult(name, operations, (long)operations * payloadSize, sw.Elapsed, GC.GetAllocatedBytesForCurrentThread() - beforeAlloc); + } + + private void Measure(string name, int operations, int payloadSize, Action action) + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + + var beforeAlloc = GC.GetAllocatedBytesForCurrentThread(); + var sw = Stopwatch.StartNew(); + + for (var i = 0; i < operations; i++) + action(); + + sw.Stop(); + WriteResult(name, operations, (long)operations * payloadSize, sw.Elapsed, GC.GetAllocatedBytesForCurrentThread() - beforeAlloc); + } + + private void WriteResult(string name, int operations, long totalBytes, TimeSpan elapsed, long allocatedBytes) + { + var opsPerSecond = operations / elapsed.TotalSeconds; + var megabytesPerSecond = totalBytes / elapsed.TotalSeconds / (1024.0 * 1024.0); + var bytesPerOperation = allocatedBytes / (double)operations; + + output.WriteLine($"=== {name} ==="); + output.WriteLine($"Ops: {opsPerSecond:N0} ops/s"); + output.WriteLine($"Data: {megabytesPerSecond:F1} MB/s"); + output.WriteLine($"Alloc: {bytesPerOperation:F1} B/op"); + output.WriteLine($"Elapsed: {elapsed.TotalMilliseconds:F0} ms"); + output.WriteLine(""); + } + + private static string CreateDirectory(string suffix) + => Path.Combine(Path.GetTempPath(), $"nats-js-filestore-bench-{suffix}-{Guid.NewGuid():N}"); + + private static FileStoreOptions CreateOptions(string dir) + { + Directory.CreateDirectory(dir); + + return new FileStoreOptions + { + Directory = dir, + BlockSizeBytes = 256 * 1024, + }; + } + + private static void DeleteDirectory(string dir) + { + if (Directory.Exists(dir)) + Directory.Delete(dir, recursive: true); + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/README.md b/tests/NATS.Server.Benchmark.Tests/README.md index 40be85a..012465c 100644 --- a/tests/NATS.Server.Benchmark.Tests/README.md +++ b/tests/NATS.Server.Benchmark.Tests/README.md @@ -45,6 +45,9 @@ Use `-v normal` or `--logger "console;verbosity=detailed"` to see the comparison | `MultiClientLatencyTests` | `RequestReply_10Clients2Services_16B` | Request/reply latency, 10 concurrent clients, 2 queue-group services | | `SyncPublishTests` | `JSSyncPublish_16B_MemoryStore` | JetStream synchronous publish, memory-backed stream | | `AsyncPublishTests` | `JSAsyncPublish_128B_FileStore` | JetStream async batch publish, file-backed stream | +| `FileStoreAppendBenchmarks` | `FileStore_AppendAsync_128B_Throughput` | FileStore direct append throughput, 128-byte payload | +| `FileStoreAppendBenchmarks` | `FileStore_LoadLastBySubject_Throughput` | FileStore hot-path subject index lookup throughput | +| `FileStoreAppendBenchmarks` | `FileStore_PurgeEx_Trim_Overhead` | FileStore purge/trim maintenance overhead under repeated updates | | `OrderedConsumerTests` | `JSOrderedConsumer_Throughput` | JetStream ordered ephemeral consumer read throughput | | `DurableConsumerFetchTests` | `JSDurableFetch_Throughput` | JetStream durable consumer fetch-in-batches throughput | diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreTtlTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreTtlTests.cs index d3ad5b9..9d55aed 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreTtlTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreTtlTests.cs @@ -181,7 +181,7 @@ public sealed class FileStoreTtlTests : IDisposable // Go: TestFileStoreStoreMsg — filestore.go storeMsg with headers [Fact] - public async Task StoreMsg_WithHeaders_CombinesHeadersAndPayload() + public async Task StoreMsg_WithHeaders_KeepsPayloadSeparateFromHeaders() { await using var store = CreateStore(sub: "storemsg-headers"); @@ -192,10 +192,10 @@ public sealed class FileStoreTtlTests : IDisposable seq.ShouldBe(1UL); ts.ShouldBeGreaterThan(0L); - // The stored payload should be the combination of headers + body. + // The stored payload should remain the message body only. var loaded = await store.LoadAsync(seq, default); loaded.ShouldNotBeNull(); - loaded!.Payload.Length.ShouldBe(hdr.Length + body.Length); + loaded!.Payload.ToArray().ShouldBe(body); } // Go: TestFileStoreStoreMsgPerMsgTtl — filestore.go per-message TTL override