From 655ca30e0bbd0ca790872c606b9a58ae77235c74 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 10:29:02 -0400 Subject: [PATCH 1/5] fix: stabilize pull consumer expires timeout fetch --- .../JetStream/Consumers/PullConsumerEngine.cs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index 5d1bbbc..eb84e6d 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -95,9 +95,6 @@ public sealed class CompiledFilter public sealed class PullConsumerEngine { - // Reusable fetch buffer to avoid per-fetch List allocation. - // Go reference: consumer.go — reuses slice backing array across fetch calls. - [ThreadStatic] private static List? t_fetchBuf; // Go: consumer.go — cluster-wide pending pull request tracking keyed by reply subject. // Reference: golang/nats-server/server/consumer.go waitingRequestsPending / proposeWaitingRequest private readonly ConcurrentDictionary _clusterPending = @@ -158,11 +155,7 @@ public sealed class PullConsumerEngine public async ValueTask FetchAsync(StreamHandle stream, ConsumerHandle consumer, PullFetchRequest request, CancellationToken ct) { var batch = Math.Max(request.Batch, 1); - // Use thread-static buffer to avoid per-fetch List allocation. - // Results are snapshot'd into PullFetchBatch before the buffer is reused. - var messages = t_fetchBuf ??= new List(); - messages.Clear(); - if (messages.Capacity < batch) messages.Capacity = batch; + var messages = new List(batch); // Go: consumer.go — enforce ExpiresMs timeout on pull fetch requests. // When ExpiresMs > 0, create a linked CancellationTokenSource that fires @@ -313,6 +306,7 @@ public sealed class PullConsumerEngine await Task.Delay(5, ct).ConfigureAwait(false); } + ct.ThrowIfCancellationRequested(); return null; } From 5674853628b77a65a6d41d66809de865e79c98dc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 10:29:10 -0400 Subject: [PATCH 2/5] test: lock FileStore optimization boundaries --- .../FileStoreOptimizationGuardTests.cs | 51 +++++++++++++++++++ .../JetStream/Storage/StoreInterfaceTests.cs | 18 +++++++ .../JetStreamStoreIndexTests.cs | 18 +++++++ 3 files changed, 87 insertions(+) create mode 100644 tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs new file mode 100644 index 0000000..f04613e --- /dev/null +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs @@ -0,0 +1,51 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.JetStream.Tests.JetStream.Storage; + +public sealed class FileStoreOptimizationGuardTests +{ + [Fact] + public async Task Snapshot_round_trip_preserves_headers_and_payload_separately() + { + var srcDir = Directory.CreateTempSubdirectory(); + var dstDir = Directory.CreateTempSubdirectory(); + + await using var src = new FileStore(new FileStoreOptions { Directory = srcDir.FullName }); + var hdr = "NATS/1.0\r\nX-Test: two\r\n\r\n"u8.ToArray(); + var msg = "payload-two"u8.ToArray(); + + var (seq, _) = src.StoreMsg("events.a", hdr, msg, 0L); + var snapshot = await src.CreateSnapshotAsync(default); + + await using var dst = new FileStore(new FileStoreOptions { Directory = dstDir.FullName }); + await dst.RestoreSnapshotAsync(snapshot, default); + + var loaded = dst.LoadMsg(seq, null); + loaded.Header.ShouldNotBeNull(); + loaded.Header.ShouldBe(hdr); + loaded.Data.ShouldNotBeNull(); + loaded.Data.ShouldBe(msg); + } + + [Fact] + public async Task PurgeEx_updates_last_by_subject_after_recovery() + { + var dir = Directory.CreateTempSubdirectory(); + + await using (var store = new FileStore(new FileStoreOptions { Directory = dir.FullName })) + { + store.StoreMsg("events.a", null, "one"u8.ToArray(), 0L); + store.StoreMsg("events.a", null, "two"u8.ToArray(), 0L); + store.StoreMsg("events.b", null, "other"u8.ToArray(), 0L); + store.PurgeEx("events.a", 0, 1); + await store.FlushAllPending(); + } + + await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName }); + var last = await recovered.LoadLastBySubjectAsync("events.a", default); + + last.ShouldNotBeNull(); + last.Sequence.ShouldBe(2UL); + last.Payload.ToArray().ShouldBe("two"u8.ToArray()); + } +} diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/StoreInterfaceTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/StoreInterfaceTests.cs index 8a0aa34..701bff9 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/StoreInterfaceTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/StoreInterfaceTests.cs @@ -532,4 +532,22 @@ public sealed class StoreInterfaceTests lastMsg = s.LoadLastMsg("foo", null); lastMsg.Sequence.ShouldBe(2UL); } + + [Fact] + public void FileStore_LoadMsg_preserves_headers_separately_from_payload() + { + var dir = Directory.CreateTempSubdirectory(); + using var store = new FileStore(new FileStoreOptions { Directory = dir.FullName }); + + var hdr = "NATS/1.0\r\nX-Test: one\r\n\r\n"u8.ToArray(); + var msg = "payload"u8.ToArray(); + + var (seq, _) = store.StoreMsg("foo", hdr, msg, 0L); + var loaded = store.LoadMsg(seq, null); + + loaded.Header.ShouldNotBeNull(); + loaded.Header.ShouldBe(hdr); + loaded.Data.ShouldNotBeNull(); + loaded.Data.ShouldBe(msg); + } } diff --git a/tests/NATS.Server.JetStream.Tests/JetStreamStoreIndexTests.cs b/tests/NATS.Server.JetStream.Tests/JetStreamStoreIndexTests.cs index 5c91d98..6cc12d0 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStreamStoreIndexTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStreamStoreIndexTests.cs @@ -15,4 +15,22 @@ public class JetStreamStoreIndexTests var last = await store.LoadLastBySubjectAsync("orders.created", default); last!.Payload.Span.SequenceEqual("3"u8).ShouldBeTrue(); } + + [Fact] + public async Task FileStore_trim_to_zero_preserves_high_water_mark_for_empty_state() + { + var dir = Directory.CreateTempSubdirectory(); + await using var store = new FileStore(new FileStoreOptions { Directory = dir.FullName }); + + await store.AppendAsync("orders.created", "1"u8.ToArray(), default); + await store.AppendAsync("orders.updated", "2"u8.ToArray(), default); + await store.AppendAsync("orders.created", "3"u8.ToArray(), default); + + store.TrimToMaxMessages(0); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe(0UL); + state.LastSeq.ShouldBe(3UL); + state.FirstSeq.ShouldBe(4UL); + } } From 9ff52164959294bf8f71d21f128b05dfb4381867 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 10:34:31 -0400 Subject: [PATCH 3/5] perf: add compact FileStore index metadata --- .../JetStream/Storage/FileStore.cs | 293 ++++++++++-------- .../JetStream/Storage/StoredMessage.cs | 3 + .../JetStream/Storage/StoredMessageIndex.cs | 7 + .../FileStoreTests.cs | 23 ++ .../FileStoreOptimizationGuardTests.cs | 23 -- 5 files changed, 190 insertions(+), 159 deletions(-) create mode 100644 src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index a406dd8..598190b 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -28,6 +28,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // In-memory cache: keyed by sequence number. This is the primary data structure // for reads and queries. The blocks are the on-disk persistence layer. private readonly Dictionary _messages = new(); + private readonly Dictionary _messageIndexes = new(); + private readonly Dictionary _lastSequenceBySubject = new(StringComparer.Ordinal); // Block-based storage: the active (writable) block and sealed blocks. private readonly List _blocks = []; @@ -141,20 +143,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // We keep _messages for LoadAsync/RemoveAsync but avoid double payload.ToArray(). var persistedPayload = TransformForPersist(payload.Span); var storedPayload = _noTransform ? persistedPayload : payload.ToArray(); - _messages[_last] = new StoredMessage + TrackMessage(new StoredMessage { Sequence = _last, Subject = subject, Payload = storedPayload, TimestampUtc = now, - }; + }); _generation++; - _messageCount++; - _totalBytes += (ulong)payload.Length; - if (_messageCount == 1) - _firstSeq = _last; - // Go: register TTL only when TTL > 0. if (_options.MaxAgeMs > 0) RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L); @@ -195,11 +192,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) { - var match = _messages.Values - .Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal)) - .OrderByDescending(m => m.Sequence) - .FirstOrDefault(); - return ValueTask.FromResult(match); + if (_lastSequenceBySubject.TryGetValue(subject, out var sequence) + && _messages.TryGetValue(sequence, out var match)) + { + return ValueTask.FromResult(match); + } + + return ValueTask.FromResult(null); } public ValueTask> ListAsync(CancellationToken ct) @@ -212,21 +211,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask RemoveAsync(ulong sequence, CancellationToken ct) { - if (!_messages.TryGetValue(sequence, out var msg)) + if (!RemoveTrackedMessage(sequence, preserveHighWaterMark: false)) return ValueTask.FromResult(false); - _messages.Remove(sequence); _generation++; - // Incremental state tracking. - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; - if (sequence == _firstSeq) - _firstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(); - - if (sequence == _last) - _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); - // Soft-delete in the block that contains this sequence. DeleteInBlock(sequence); @@ -276,10 +265,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) { _messages.Clear(); + _messageIndexes.Clear(); + _lastSequenceBySubject.Clear(); _last = 0; _messageCount = 0; _totalBytes = 0; _firstSeq = 0; + _first = 0; // Dispose existing blocks and clean files. DisposeAllBlocks(); @@ -303,14 +295,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messages[record.Sequence] = message; _last = Math.Max(_last, record.Sequence); } - - // Recompute incremental state from restored messages. - _messageCount = (ulong)_messages.Count; - _totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; } } + RebuildIndexesFromMessages(); + // Write all messages to fresh blocks. RewriteBlocks(); return ValueTask.CompletedTask; @@ -332,14 +321,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var trimmed = false; while ((ulong)_messages.Count > maxMessages) { - var first = _messages.Keys.Min(); - if (_messages.TryGetValue(first, out var msg)) - { - _totalBytes -= (ulong)msg.Payload.Length; - _messageCount--; - } + var first = _firstSeq; + if (first == 0 || !RemoveTrackedMessage(first, preserveHighWaterMark: true)) + break; - _messages.Remove(first); DeleteInBlock(first); trimmed = true; } @@ -347,7 +332,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (!trimmed) return; - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; _generation++; } @@ -391,22 +375,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } var persistedPayload = TransformForPersist(combined.AsSpan()); - var stored = new StoredMessage + TrackMessage(new StoredMessage { Sequence = _last, Subject = subject, Payload = combined, TimestampUtc = now, - }; - _messages[_last] = stored; + }); _generation++; - // Incremental state tracking. - _messageCount++; - _totalBytes += (ulong)combined.Length; - if (_messageCount == 1) - _firstSeq = _last; - // Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs. // Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge. var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L); @@ -443,11 +420,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var count = (ulong)_messages.Count; _messages.Clear(); + _messageIndexes.Clear(); + _lastSequenceBySubject.Clear(); _generation++; _last = 0; _messageCount = 0; _totalBytes = 0; _firstSeq = 0; + _first = 0; DisposeAllBlocks(); CleanBlockFiles(); @@ -490,20 +470,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var msg in toRemove) { - _messages.Remove(msg.Sequence); - _totalBytes -= (ulong)msg.Payload.Length; - _messageCount--; + RemoveTrackedMessage(msg.Sequence, preserveHighWaterMark: true); DeleteInBlock(msg.Sequence); } _generation++; - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; - - // Update _last if required. - if (_messages.Count == 0) - _last = 0; - else if (!_messages.ContainsKey(_last)) - _last = _messages.Keys.Max(); return (ulong)toRemove.Count; } @@ -524,13 +495,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var s in toRemove) { - if (_messages.TryGetValue(s, out var msg)) - { - _totalBytes -= (ulong)msg.Payload.Length; - _messageCount--; - } - - _messages.Remove(s); + RemoveTrackedMessage(s, preserveHighWaterMark: true); DeleteInBlock(s); } @@ -545,11 +510,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } else { - if (!_messages.ContainsKey(_last)) - _last = _messages.Keys.Max(); - // Update _first to reflect the real first message. - _first = _messages.Keys.Min(); - _firstSeq = _first; + _first = _firstSeq; } return (ulong)toRemove.Length; @@ -566,11 +527,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { // Truncate to nothing. _messages.Clear(); + _messageIndexes.Clear(); + _lastSequenceBySubject.Clear(); _generation++; _last = 0; _messageCount = 0; _totalBytes = 0; _firstSeq = 0; + _first = 0; DisposeAllBlocks(); CleanBlockFiles(); return; @@ -579,13 +543,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var toRemove = _messages.Keys.Where(k => k > seq).ToArray(); foreach (var s in toRemove) { - if (_messages.TryGetValue(s, out var msg)) - { - _totalBytes -= (ulong)msg.Payload.Length; - _messageCount--; - } - - _messages.Remove(s); + RemoveTrackedMessage(s, preserveHighWaterMark: false); DeleteInBlock(s); } @@ -594,8 +552,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Update _last to the new highest existing sequence (or seq if it exists, // or the highest below seq). - _last = _messages.Count == 0 ? 0 : _messages.Keys.Max(); - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; + if (_messageCount == 0) + { + _last = 0; + _first = 0; + _firstSeq = 0; + } } /// @@ -865,6 +827,119 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } } + private void TrackMessage(StoredMessage message) + { + _messages[message.Sequence] = message; + _messageIndexes[message.Sequence] = message.ToIndex(); + _lastSequenceBySubject[message.Subject] = message.Sequence; + _messageCount++; + _totalBytes += (ulong)message.Payload.Length; + + if (_messageCount == 1) + { + _first = message.Sequence; + _firstSeq = message.Sequence; + } + } + + private bool RemoveTrackedMessage(ulong sequence, bool preserveHighWaterMark) + { + if (!_messages.Remove(sequence, out var message)) + return false; + + _messageIndexes.Remove(sequence); + _messageCount--; + _totalBytes -= (ulong)message.Payload.Length; + UpdateLastSequenceForSubject(message.Subject, sequence); + + if (_messageCount == 0) + { + if (preserveHighWaterMark) + _first = _last + 1; + else + _last = 0; + + _firstSeq = 0; + return true; + } + + if (sequence == _firstSeq) + AdvanceFirstSequence(sequence + 1); + + if (!preserveHighWaterMark && sequence == _last) + _last = FindPreviousLiveSequence(sequence); + + return true; + } + + private void AdvanceFirstSequence(ulong start) + { + var candidate = start; + while (!_messageIndexes.ContainsKey(candidate) && candidate <= _last) + candidate++; + + if (candidate <= _last) + { + _first = candidate; + _firstSeq = candidate; + return; + } + + _first = _last + 1; + _firstSeq = 0; + } + + private ulong FindPreviousLiveSequence(ulong startExclusive) + { + if (_messageCount == 0 || startExclusive == 0) + return 0; + + for (var seq = startExclusive - 1; ; seq--) + { + if (_messageIndexes.ContainsKey(seq)) + return seq; + + if (seq == 0) + return 0; + } + } + + private void UpdateLastSequenceForSubject(string subject, ulong removedSequence) + { + if (!_lastSequenceBySubject.TryGetValue(subject, out var currentLast) || currentLast != removedSequence) + return; + + for (var seq = removedSequence - 1; ; seq--) + { + if (_messageIndexes.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) + { + _lastSequenceBySubject[subject] = seq; + return; + } + + if (seq == 0) + break; + } + + _lastSequenceBySubject.Remove(subject); + } + + private void RebuildIndexesFromMessages() + { + _messageIndexes.Clear(); + _lastSequenceBySubject.Clear(); + _messageCount = 0; + _totalBytes = 0; + _firstSeq = 0; + _first = 0; + + foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) + TrackMessage(message); + + if (_messageCount == 0 && _last > 0) + _first = _last + 1; + } + // ------------------------------------------------------------------------- // Subject matching helper // ------------------------------------------------------------------------- @@ -1055,10 +1130,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable DisposeAllBlocks(); CleanBlockFiles(); - _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; - _messageCount = (ulong)_messages.Count; - _totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); + RebuildIndexesFromMessages(); foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) { @@ -1168,15 +1240,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } // Sync _first from _messages; if empty, set to _last+1 (watermark). - if (_messages.Count > 0) - _first = _messages.Keys.Min(); - else if (_last > 0) - _first = _last + 1; - - // Recompute incremental state from recovered messages. - _messageCount = (ulong)_messages.Count; - _totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; + RebuildIndexesFromMessages(); } /// @@ -1370,20 +1434,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Reference: golang/nats-server/server/filestore.go:expireMsgs — dmap-based removal. foreach (var seq in expired) { - if (_messages.Remove(seq, out var msg)) - { - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; - } - + RemoveTrackedMessage(seq, preserveHighWaterMark: true); DeleteInBlock(seq); } - if (_messages.Count > 0) - _firstSeq = _messages.Keys.Min(); - else - _firstSeq = 0; - _generation++; } @@ -1407,14 +1461,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var sequence in expired) { - if (_messages.Remove(sequence, out var msg)) - { - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; - } + RemoveTrackedMessage(sequence, preserveHighWaterMark: true); } - - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; RewriteBlocks(); } @@ -1676,27 +1724,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public bool RemoveMsg(ulong seq) { - if (!_messages.Remove(seq, out var msg)) + if (!RemoveTrackedMessage(seq, preserveHighWaterMark: true)) return false; - _generation++; - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; // Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is // never decremented on removal. Only FirstSeq advances when the first // live message is removed. - if (_messages.Count == 0) - { - _first = _last + 1; // All gone — next first would be after last - _firstSeq = 0; - } - else - { - _first = _messages.Keys.Min(); - _firstSeq = _first; - } - DeleteInBlock(seq); return true; } @@ -1709,23 +1743,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public bool EraseMsg(ulong seq) { - if (!_messages.Remove(seq, out var msg)) + if (!RemoveTrackedMessage(seq, preserveHighWaterMark: true)) return false; _generation++; - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; - - if (_messages.Count == 0) - { - _first = _last + 1; - _firstSeq = 0; - } - else - { - _first = _messages.Keys.Min(); - _firstSeq = _first; - } // Secure erase: overwrite payload bytes with random data before marking deleted. // Reference: golang/nats-server/server/filestore.go:5890 (eraseMsg). diff --git a/src/NATS.Server/JetStream/Storage/StoredMessage.cs b/src/NATS.Server/JetStream/Storage/StoredMessage.cs index ab848d8..f40805c 100644 --- a/src/NATS.Server/JetStream/Storage/StoredMessage.cs +++ b/src/NATS.Server/JetStream/Storage/StoredMessage.cs @@ -18,4 +18,7 @@ public sealed class StoredMessage /// Convenience accessor for the Nats-Msg-Id header value, used by source deduplication. /// public string? MsgId => Headers is not null && Headers.TryGetValue("Nats-Msg-Id", out var id) ? id : null; + + internal StoredMessageIndex ToIndex() + => new(Sequence, Subject, Payload.Length, TimestampUtc); } diff --git a/src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs b/src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs new file mode 100644 index 0000000..6551d15 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs @@ -0,0 +1,7 @@ +namespace NATS.Server.JetStream.Storage; + +public readonly record struct StoredMessageIndex( + ulong Sequence, + string Subject, + int PayloadLength, + DateTime TimestampUtc); diff --git a/tests/NATS.Server.JetStream.Tests/FileStoreTests.cs b/tests/NATS.Server.JetStream.Tests/FileStoreTests.cs index 7ca58f3..af22f6a 100644 --- a/tests/NATS.Server.JetStream.Tests/FileStoreTests.cs +++ b/tests/NATS.Server.JetStream.Tests/FileStoreTests.cs @@ -15,4 +15,27 @@ public class FileStoreTests await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName }); (await recovered.GetStateAsync(default)).Messages.ShouldBe((ulong)1); } + + [Fact] + public async Task Snapshot_round_trip_preserves_headers_and_payload_separately() + { + var srcDir = Directory.CreateTempSubdirectory(); + var dstDir = Directory.CreateTempSubdirectory(); + + await using var src = new FileStore(new FileStoreOptions { Directory = srcDir.FullName }); + var hdr = "NATS/1.0\r\nX-Test: two\r\n\r\n"u8.ToArray(); + var msg = "payload-two"u8.ToArray(); + + var (seq, _) = src.StoreMsg("events.a", hdr, msg, 0L); + var snapshot = await src.CreateSnapshotAsync(default); + + await using var dst = new FileStore(new FileStoreOptions { Directory = dstDir.FullName }); + await dst.RestoreSnapshotAsync(snapshot, default); + + var loaded = dst.LoadMsg(seq, null); + loaded.Header.ShouldNotBeNull(); + loaded.Header.ShouldBe(hdr); + loaded.Data.ShouldNotBeNull(); + loaded.Data.ShouldBe(msg); + } } diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs index f04613e..4b7e967 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs @@ -4,29 +4,6 @@ namespace NATS.Server.JetStream.Tests.JetStream.Storage; public sealed class FileStoreOptimizationGuardTests { - [Fact] - public async Task Snapshot_round_trip_preserves_headers_and_payload_separately() - { - var srcDir = Directory.CreateTempSubdirectory(); - var dstDir = Directory.CreateTempSubdirectory(); - - await using var src = new FileStore(new FileStoreOptions { Directory = srcDir.FullName }); - var hdr = "NATS/1.0\r\nX-Test: two\r\n\r\n"u8.ToArray(); - var msg = "payload-two"u8.ToArray(); - - var (seq, _) = src.StoreMsg("events.a", hdr, msg, 0L); - var snapshot = await src.CreateSnapshotAsync(default); - - await using var dst = new FileStore(new FileStoreOptions { Directory = dstDir.FullName }); - await dst.RestoreSnapshotAsync(snapshot, default); - - var loaded = dst.LoadMsg(seq, null); - loaded.Header.ShouldNotBeNull(); - loaded.Header.ShouldBe(hdr); - loaded.Data.ShouldNotBeNull(); - loaded.Data.ShouldBe(msg); - } - [Fact] public async Task PurgeEx_updates_last_by_subject_after_recovery() { From f57edca5a88f9943691f95a962d11acda3d511ab Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 11:21:48 -0400 Subject: [PATCH 4/5] perf: optimize FileStore payload and maintenance paths --- .../JetStream/Storage/FileStore.cs | 140 ++++++++++-------- .../JetStream/Storage/StoredMessage.cs | 1 + 2 files changed, 78 insertions(+), 63 deletions(-) diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 598190b..07c2603 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -255,6 +255,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { Sequence = x.Sequence, Subject = x.Subject, + HeadersBase64 = x.RawHeaders.IsEmpty ? null : Convert.ToBase64String(x.RawHeaders.Span), PayloadBase64 = Convert.ToBase64String(TransformForPersist(x.Payload.Span)), TimestampUtc = x.TimestampUtc, }) @@ -284,11 +285,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { foreach (var record in records) { + var restoredHeaders = string.IsNullOrEmpty(record.HeadersBase64) + ? ReadOnlyMemory.Empty + : Convert.FromBase64String(record.HeadersBase64); var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)); var message = new StoredMessage { Sequence = record.Sequence, Subject = record.Subject ?? string.Empty, + RawHeaders = restoredHeaders, Payload = restoredPayload, TimestampUtc = record.TimestampUtc, }; @@ -361,25 +366,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var now = DateTime.UtcNow; var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L; - // Combine headers and payload (headers precede the body in NATS wire format). - byte[] combined; - if (hdr is { Length: > 0 }) - { - combined = new byte[hdr.Length + (msg?.Length ?? 0)]; - hdr.CopyTo(combined, 0); - msg?.CopyTo(combined, hdr.Length); - } - else - { - combined = msg ?? []; - } - - var persistedPayload = TransformForPersist(combined.AsSpan()); + var headers = hdr is { Length: > 0 } ? hdr : []; + var payload = msg ?? []; + var persistedPayload = TransformForPersist(payload); TrackMessage(new StoredMessage { Sequence = _last, Subject = subject, - Payload = combined, + RawHeaders = headers, + Payload = payload, TimestampUtc = now, }); _generation++; @@ -392,16 +387,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable EnsureActiveBlock(); try { - _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + _activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp); } catch (InvalidOperationException) { RotateBlock(); - _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + _activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp); } // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. - _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); + _writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length); // Signal the background flush loop to coalesce and flush pending writes. _flushSignal.Writer.TryWrite(0); @@ -450,33 +445,53 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (string.IsNullOrEmpty(subject) && keep == 0 && seq == 0) return Purge(); - // Collect all messages matching the subject (with wildcard support) at or below seq, ordered by sequence. - var candidates = _messages.Values - .Where(m => SubjectMatchesFilter(m.Subject, subject)) - .Where(m => seq == 0 || m.Sequence <= seq) - .OrderBy(m => m.Sequence) - .ToList(); - - if (candidates.Count == 0) + var upperBound = seq == 0 ? _last : Math.Min(seq, _last); + if (upperBound == 0) return 0; - // Keep the newest `keep` messages; purge the rest. - var toRemove = keep > 0 && (ulong)candidates.Count > keep - ? candidates.Take(candidates.Count - (int)keep).ToList() - : (keep == 0 ? candidates : []); - - if (toRemove.Count == 0) - return 0; - - foreach (var msg in toRemove) + ulong candidateCount = 0; + for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound; current++) { - RemoveTrackedMessage(msg.Sequence, preserveHighWaterMark: true); - DeleteInBlock(msg.Sequence); + if (!_messages.TryGetValue(current, out var message)) + continue; + + if (!SubjectMatchesFilter(message.Subject, subject)) + continue; + + candidateCount++; } - _generation++; + if (candidateCount == 0) + return 0; - return (ulong)toRemove.Count; + var targetRemoveCount = keep > 0 + ? (candidateCount > keep ? candidateCount - keep : 0UL) + : candidateCount; + + if (targetRemoveCount == 0) + return 0; + + ulong removed = 0; + for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound && removed < targetRemoveCount; current++) + { + if (!_messages.TryGetValue(current, out var message)) + continue; + + if (!SubjectMatchesFilter(message.Subject, subject)) + continue; + + if (RemoveTrackedMessage(current, preserveHighWaterMark: true)) + { + DeleteInBlock(current); + removed++; + } + } + + if (removed == 0) + return 0; + + _generation++; + return removed; } /// @@ -833,7 +848,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messageIndexes[message.Sequence] = message.ToIndex(); _lastSequenceBySubject[message.Subject] = message.Sequence; _messageCount++; - _totalBytes += (ulong)message.Payload.Length; + _totalBytes += (ulong)(message.RawHeaders.Length + message.Payload.Length); if (_messageCount == 1) { @@ -849,7 +864,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messageIndexes.Remove(sequence); _messageCount--; - _totalBytes -= (ulong)message.Payload.Length; + _totalBytes -= (ulong)(message.RawHeaders.Length + message.Payload.Length); UpdateLastSequenceForSubject(message.Subject, sequence); if (_messageCount == 0) @@ -1130,6 +1145,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable DisposeAllBlocks(); CleanBlockFiles(); + _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); RebuildIndexesFromMessages(); foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) @@ -1140,12 +1156,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable EnsureActiveBlock(); try { - _activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + _activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp); } catch (InvalidOperationException) { RotateBlock(); - _activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + _activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp); } if (_activeBlock!.IsSealed) @@ -1271,6 +1287,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { Sequence = record.Sequence, Subject = record.Subject, + RawHeaders = record.Headers, Payload = originalPayload, TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime, }; @@ -1834,6 +1851,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = stored.Subject; + sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; sm.Sequence = stored.Sequence; sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -1854,7 +1872,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = record.Subject; - sm.Data = record.Payload.Length > 0 ? record.Payload.ToArray() : null; + sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null; + var originalPayload = RestorePayload(record.Payload.Span); + sm.Data = originalPayload.Length > 0 ? originalPayload : null; sm.Sequence = record.Sequence; sm.Timestamp = record.Timestamp; return sm; @@ -1908,6 +1928,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = match.Subject; + sm.Header = match.RawHeaders.Length > 0 ? match.RawHeaders.ToArray() : null; sm.Data = match.Payload.Length > 0 ? match.Payload.ToArray() : null; sm.Sequence = match.Sequence; sm.Timestamp = new DateTimeOffset(match.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -1938,6 +1959,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = found.Value.Subject; + sm.Header = found.Value.RawHeaders.Length > 0 ? found.Value.RawHeaders.ToArray() : null; sm.Data = found.Value.Payload.Length > 0 ? found.Value.Payload.ToArray() : null; sm.Sequence = found.Key; sm.Timestamp = new DateTimeOffset(found.Value.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -2062,20 +2084,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (_stopped) throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped."); - // Combine headers and payload, same as StoreMsg. - byte[] combined; - if (hdr is { Length: > 0 }) - { - combined = new byte[hdr.Length + msg.Length]; - hdr.CopyTo(combined, 0); - msg.CopyTo(combined, hdr.Length); - } - else - { - combined = msg; - } - - var persistedPayload = TransformForPersist(combined.AsSpan()); + var headers = hdr is { Length: > 0 } ? hdr : []; + var payload = msg ?? []; + var persistedPayload = TransformForPersist(payload); // Recover UTC DateTime from caller-supplied Unix nanosecond timestamp. var storedUtc = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime; @@ -2083,10 +2094,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { Sequence = seq, Subject = subject, - Payload = combined, + RawHeaders = headers, + Payload = payload, TimestampUtc = storedUtc, }; - _messages[seq] = stored; + TrackMessage(stored); _generation++; // Go: update _last to the high-water mark — do not decrement. @@ -2099,16 +2111,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable EnsureActiveBlock(); try { - _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, ts); + _activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts); } catch (InvalidOperationException) { RotateBlock(); - _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, ts); + _activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts); } // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. - _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); + _writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length); if (_activeBlock!.IsSealed) RotateBlock(); @@ -2134,6 +2146,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = stored.Subject; + sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; sm.Sequence = stored.Sequence; sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -2386,6 +2399,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { public ulong Sequence { get; init; } public string? Subject { get; init; } + public string? HeadersBase64 { get; init; } public string? PayloadBase64 { get; init; } public DateTime TimestampUtc { get; init; } } diff --git a/src/NATS.Server/JetStream/Storage/StoredMessage.cs b/src/NATS.Server/JetStream/Storage/StoredMessage.cs index f40805c..1a3af42 100644 --- a/src/NATS.Server/JetStream/Storage/StoredMessage.cs +++ b/src/NATS.Server/JetStream/Storage/StoredMessage.cs @@ -5,6 +5,7 @@ public sealed class StoredMessage public ulong Sequence { get; init; } public string Subject { get; init; } = string.Empty; public ReadOnlyMemory Payload { get; init; } + internal ReadOnlyMemory RawHeaders { get; init; } public DateTime TimestampUtc { get; init; } = DateTime.UtcNow; public string? Account { get; init; } public bool Redelivered { get; init; } From ca2d8019a1ea8b5ca6228167aa868fef6b85cffd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 11:34:19 -0400 Subject: [PATCH 5/5] docs: add FileStore benchmarks and storage notes --- Documentation/JetStream/Overview.md | 46 ++---- .../JetStream/FileStoreAppendBenchmarks.cs | 151 ++++++++++++++++++ tests/NATS.Server.Benchmark.Tests/README.md | 3 + .../JetStream/Storage/FileStoreTtlTests.cs | 6 +- 4 files changed, 174 insertions(+), 32 deletions(-) create mode 100644 tests/NATS.Server.Benchmark.Tests/JetStream/FileStoreAppendBenchmarks.cs diff --git a/Documentation/JetStream/Overview.md b/Documentation/JetStream/Overview.md index d977c17..520490d 100644 --- a/Documentation/JetStream/Overview.md +++ b/Documentation/JetStream/Overview.md @@ -286,43 +286,31 @@ public ValueTask AppendAsync(string subject, ReadOnlyMemory payload ### FileStore -`FileStore` appends messages to a JSONL file (`messages.jsonl`) and keeps a full in-memory index (`Dictionary`) identical in structure to `MemStore`. It is not production-safe for several reasons: - -- **No locking**: `AppendAsync`, `LoadAsync`, `GetStateAsync`, and `TrimToMaxMessages` are not synchronized. Concurrent access from `StreamManager.Capture` and `PullConsumerEngine.FetchAsync` is unsafe. -- **Per-write file I/O**: Each `AppendAsync` calls `File.AppendAllTextAsync`, issuing a separate file open/write/close per message. -- **Full rewrite on trim**: `TrimToMaxMessages` calls `RewriteDataFile()`, which rewrites the entire file from the in-memory index. This is O(n) in message count and blocking. -- **Full in-memory index**: The in-memory dictionary holds every undeleted message payload; there is no paging or streaming read path. +`FileStore` now persists messages into block files via `MsgBlock`, keeps a live in-memory message cache for load paths, and maintains a compact metadata index (`Dictionary`) plus a per-subject last-sequence map for hot-path lookups such as `LoadLastBySubjectAsync`. Headers and payloads are stored separately and remain separate across snapshot, restore, block rewrite, and crash recovery. On startup, any legacy `messages.jsonl` file is migrated into block storage before recovery continues. ```csharp // FileStore.cs -public void TrimToMaxMessages(ulong maxMessages) +private readonly Dictionary _messages = new(); +private readonly Dictionary _messageIndexes = new(); +private readonly Dictionary _lastSequenceBySubject = new(StringComparer.Ordinal); + +public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) { - while ((ulong)_messages.Count > maxMessages) + if (_lastSequenceBySubject.TryGetValue(subject, out var sequence) + && _messages.TryGetValue(sequence, out var match)) { - var first = _messages.Keys.Min(); - _messages.Remove(first); + return ValueTask.FromResult(match); } - RewriteDataFile(); -} - -private void RewriteDataFile() -{ - var lines = new List(_messages.Count); - foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) - { - lines.Add(JsonSerializer.Serialize(new FileRecord - { - Sequence = message.Sequence, - Subject = message.Subject, - PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()), - })); - } - File.WriteAllLines(_dataFilePath, lines); + return ValueTask.FromResult(null); } ``` -The Go reference (`filestore.go`) uses block-based binary storage with S2 compression, per-block indexes, and memory-mapped I/O. This implementation shares none of those properties. +The current implementation is still materially simpler than Go `filestore.go`: + +- **No synchronization**: `FileStore` still exposes unsynchronized mutation and read paths. It is safe only under the current test and single-process usage assumptions. +- **Payloads still stay resident**: the compact index removes duplicate payload ownership for metadata-heavy operations, but `_messages` still retains live payload bytes in memory for direct load paths. +- **No Go-equivalent block index stack**: there is no per-block subject tree, mmap-backed read path, or Go-style cache/compaction parity. Deletes and trims rely on tombstones plus later block maintenance rather than Go's full production filestore behavior. --- @@ -445,7 +433,7 @@ The following features are present in the Go reference (`golang/nats-server/serv - **Ephemeral consumers**: `ConsumerManager.CreateOrUpdate` requires a non-empty `DurableName`. There is no support for unnamed ephemeral consumers. - **Push delivery over the NATS wire**: Push consumers enqueue `PushFrame` objects into an in-memory queue. No MSG is written to any connected NATS client's TCP socket. - **Consumer filter subject enforcement**: `FilterSubject` is stored on `ConsumerConfig` but is never applied in `PullConsumerEngine.FetchAsync`. All messages in the stream are returned regardless of filter. -- **FileStore production safety**: No locking, per-write file I/O, full-rewrite-on-trim, and full in-memory index make `FileStore` unsuitable for production use. +- **FileStore production safety**: `FileStore` now uses block files and compact metadata indexes, but it still lacks synchronization and Go-level block indexing, so it remains unsuitable for production use. - **RAFT persistence and networking**: `RaftNode` log entries are not persisted across restarts. Replication uses direct in-process method calls; there is no network transport for multi-server consensus. - **Cross-server replication**: Mirror and source coordinators work only within one `StreamManager` in one process. Messages published on a remote server are not replicated. - **Duplicate message window**: `PublishPreconditions` tracks message IDs for deduplication but there is no configurable `DuplicateWindow` TTL to expire old IDs. @@ -460,4 +448,4 @@ The following features are present in the Go reference (`golang/nats-server/serv - [Configuration Overview](../Configuration/Overview.md) - [Protocol Overview](../Protocol/Overview.md) - + diff --git a/tests/NATS.Server.Benchmark.Tests/JetStream/FileStoreAppendBenchmarks.cs b/tests/NATS.Server.Benchmark.Tests/JetStream/FileStoreAppendBenchmarks.cs new file mode 100644 index 0000000..18b349f --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/JetStream/FileStoreAppendBenchmarks.cs @@ -0,0 +1,151 @@ +using System.Diagnostics; +using NATS.Server.JetStream.Storage; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.JetStream; + +[Collection("Benchmark-JetStream")] +public class FileStoreAppendBenchmarks(ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public async Task FileStore_AppendAsync_128B_Throughput() + { + var payload = new byte[128]; + var dir = CreateDirectory("append"); + var opts = CreateOptions(dir); + + try + { + await using var store = new FileStore(opts); + await MeasureAsync("FileStore AppendAsync (128B)", operations: 20_000, payload.Length, + i => store.AppendAsync($"bench.append.{i % 8}", payload, default).AsTask()); + } + finally + { + DeleteDirectory(dir); + } + } + + [Fact] + [Trait("Category", "Benchmark")] + public void FileStore_LoadLastBySubject_Throughput() + { + var payload = new byte[64]; + var dir = CreateDirectory("load-last"); + var opts = CreateOptions(dir); + + try + { + using var store = new FileStore(opts); + for (var i = 0; i < 25_000; i++) + store.StoreMsg($"bench.subject.{i % 16}", null, payload, 0L); + + Measure("FileStore LoadLastBySubject (hot)", operations: 50_000, payload.Length, + () => + { + var loaded = store.LoadLastBySubjectAsync("bench.subject.7", default).GetAwaiter().GetResult(); + if (loaded is null || loaded.Payload.Length != payload.Length) + throw new InvalidOperationException("LoadLastBySubjectAsync returned an unexpected result."); + }); + } + finally + { + DeleteDirectory(dir); + } + } + + [Fact] + [Trait("Category", "Benchmark")] + public void FileStore_PurgeEx_Trim_Overhead() + { + var payload = new byte[96]; + var dir = CreateDirectory("purge-trim"); + var opts = CreateOptions(dir); + + try + { + using var store = new FileStore(opts); + for (var i = 0; i < 12_000; i++) + store.StoreMsg($"bench.purge.{i % 6}", null, payload, 0L); + + Measure("FileStore PurgeEx+Trim", operations: 2_000, payload.Length, + () => + { + store.PurgeEx("bench.purge.1", 0, 8); + store.TrimToMaxMessages(10_000); + store.StoreMsg("bench.purge.1", null, payload, 0L); + }); + } + finally + { + DeleteDirectory(dir); + } + } + + private async Task MeasureAsync(string name, int operations, int payloadSize, Func action) + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + + var beforeAlloc = GC.GetAllocatedBytesForCurrentThread(); + var sw = Stopwatch.StartNew(); + + for (var i = 0; i < operations; i++) + await action(i); + + sw.Stop(); + WriteResult(name, operations, (long)operations * payloadSize, sw.Elapsed, GC.GetAllocatedBytesForCurrentThread() - beforeAlloc); + } + + private void Measure(string name, int operations, int payloadSize, Action action) + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + + var beforeAlloc = GC.GetAllocatedBytesForCurrentThread(); + var sw = Stopwatch.StartNew(); + + for (var i = 0; i < operations; i++) + action(); + + sw.Stop(); + WriteResult(name, operations, (long)operations * payloadSize, sw.Elapsed, GC.GetAllocatedBytesForCurrentThread() - beforeAlloc); + } + + private void WriteResult(string name, int operations, long totalBytes, TimeSpan elapsed, long allocatedBytes) + { + var opsPerSecond = operations / elapsed.TotalSeconds; + var megabytesPerSecond = totalBytes / elapsed.TotalSeconds / (1024.0 * 1024.0); + var bytesPerOperation = allocatedBytes / (double)operations; + + output.WriteLine($"=== {name} ==="); + output.WriteLine($"Ops: {opsPerSecond:N0} ops/s"); + output.WriteLine($"Data: {megabytesPerSecond:F1} MB/s"); + output.WriteLine($"Alloc: {bytesPerOperation:F1} B/op"); + output.WriteLine($"Elapsed: {elapsed.TotalMilliseconds:F0} ms"); + output.WriteLine(""); + } + + private static string CreateDirectory(string suffix) + => Path.Combine(Path.GetTempPath(), $"nats-js-filestore-bench-{suffix}-{Guid.NewGuid():N}"); + + private static FileStoreOptions CreateOptions(string dir) + { + Directory.CreateDirectory(dir); + + return new FileStoreOptions + { + Directory = dir, + BlockSizeBytes = 256 * 1024, + }; + } + + private static void DeleteDirectory(string dir) + { + if (Directory.Exists(dir)) + Directory.Delete(dir, recursive: true); + } +} diff --git a/tests/NATS.Server.Benchmark.Tests/README.md b/tests/NATS.Server.Benchmark.Tests/README.md index 40be85a..012465c 100644 --- a/tests/NATS.Server.Benchmark.Tests/README.md +++ b/tests/NATS.Server.Benchmark.Tests/README.md @@ -45,6 +45,9 @@ Use `-v normal` or `--logger "console;verbosity=detailed"` to see the comparison | `MultiClientLatencyTests` | `RequestReply_10Clients2Services_16B` | Request/reply latency, 10 concurrent clients, 2 queue-group services | | `SyncPublishTests` | `JSSyncPublish_16B_MemoryStore` | JetStream synchronous publish, memory-backed stream | | `AsyncPublishTests` | `JSAsyncPublish_128B_FileStore` | JetStream async batch publish, file-backed stream | +| `FileStoreAppendBenchmarks` | `FileStore_AppendAsync_128B_Throughput` | FileStore direct append throughput, 128-byte payload | +| `FileStoreAppendBenchmarks` | `FileStore_LoadLastBySubject_Throughput` | FileStore hot-path subject index lookup throughput | +| `FileStoreAppendBenchmarks` | `FileStore_PurgeEx_Trim_Overhead` | FileStore purge/trim maintenance overhead under repeated updates | | `OrderedConsumerTests` | `JSOrderedConsumer_Throughput` | JetStream ordered ephemeral consumer read throughput | | `DurableConsumerFetchTests` | `JSDurableFetch_Throughput` | JetStream durable consumer fetch-in-batches throughput | diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreTtlTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreTtlTests.cs index d3ad5b9..9d55aed 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreTtlTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreTtlTests.cs @@ -181,7 +181,7 @@ public sealed class FileStoreTtlTests : IDisposable // Go: TestFileStoreStoreMsg — filestore.go storeMsg with headers [Fact] - public async Task StoreMsg_WithHeaders_CombinesHeadersAndPayload() + public async Task StoreMsg_WithHeaders_KeepsPayloadSeparateFromHeaders() { await using var store = CreateStore(sub: "storemsg-headers"); @@ -192,10 +192,10 @@ public sealed class FileStoreTtlTests : IDisposable seq.ShouldBe(1UL); ts.ShouldBeGreaterThan(0L); - // The stored payload should be the combination of headers + body. + // The stored payload should remain the message body only. var loaded = await store.LoadAsync(seq, default); loaded.ShouldNotBeNull(); - loaded!.Payload.Length.ShouldBe(hdr.Length + body.Length); + loaded!.Payload.ToArray().ShouldBe(body); } // Go: TestFileStoreStoreMsgPerMsgTtl — filestore.go per-message TTL override