From 9ff52164959294bf8f71d21f128b05dfb4381867 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 10:34:31 -0400 Subject: [PATCH] perf: add compact FileStore index metadata --- .../JetStream/Storage/FileStore.cs | 293 ++++++++++-------- .../JetStream/Storage/StoredMessage.cs | 3 + .../JetStream/Storage/StoredMessageIndex.cs | 7 + .../FileStoreTests.cs | 23 ++ .../FileStoreOptimizationGuardTests.cs | 23 -- 5 files changed, 190 insertions(+), 159 deletions(-) create mode 100644 src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index a406dd8..598190b 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -28,6 +28,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // In-memory cache: keyed by sequence number. This is the primary data structure // for reads and queries. The blocks are the on-disk persistence layer. private readonly Dictionary _messages = new(); + private readonly Dictionary _messageIndexes = new(); + private readonly Dictionary _lastSequenceBySubject = new(StringComparer.Ordinal); // Block-based storage: the active (writable) block and sealed blocks. private readonly List _blocks = []; @@ -141,20 +143,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // We keep _messages for LoadAsync/RemoveAsync but avoid double payload.ToArray(). var persistedPayload = TransformForPersist(payload.Span); var storedPayload = _noTransform ? persistedPayload : payload.ToArray(); - _messages[_last] = new StoredMessage + TrackMessage(new StoredMessage { Sequence = _last, Subject = subject, Payload = storedPayload, TimestampUtc = now, - }; + }); _generation++; - _messageCount++; - _totalBytes += (ulong)payload.Length; - if (_messageCount == 1) - _firstSeq = _last; - // Go: register TTL only when TTL > 0. if (_options.MaxAgeMs > 0) RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L); @@ -195,11 +192,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) { - var match = _messages.Values - .Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal)) - .OrderByDescending(m => m.Sequence) - .FirstOrDefault(); - return ValueTask.FromResult(match); + if (_lastSequenceBySubject.TryGetValue(subject, out var sequence) + && _messages.TryGetValue(sequence, out var match)) + { + return ValueTask.FromResult(match); + } + + return ValueTask.FromResult(null); } public ValueTask> ListAsync(CancellationToken ct) @@ -212,21 +211,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask RemoveAsync(ulong sequence, CancellationToken ct) { - if (!_messages.TryGetValue(sequence, out var msg)) + if (!RemoveTrackedMessage(sequence, preserveHighWaterMark: false)) return ValueTask.FromResult(false); - _messages.Remove(sequence); _generation++; - // Incremental state tracking. - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; - if (sequence == _firstSeq) - _firstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(); - - if (sequence == _last) - _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); - // Soft-delete in the block that contains this sequence. DeleteInBlock(sequence); @@ -276,10 +265,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) { _messages.Clear(); + _messageIndexes.Clear(); + _lastSequenceBySubject.Clear(); _last = 0; _messageCount = 0; _totalBytes = 0; _firstSeq = 0; + _first = 0; // Dispose existing blocks and clean files. DisposeAllBlocks(); @@ -303,14 +295,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messages[record.Sequence] = message; _last = Math.Max(_last, record.Sequence); } - - // Recompute incremental state from restored messages. - _messageCount = (ulong)_messages.Count; - _totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; } } + RebuildIndexesFromMessages(); + // Write all messages to fresh blocks. RewriteBlocks(); return ValueTask.CompletedTask; @@ -332,14 +321,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var trimmed = false; while ((ulong)_messages.Count > maxMessages) { - var first = _messages.Keys.Min(); - if (_messages.TryGetValue(first, out var msg)) - { - _totalBytes -= (ulong)msg.Payload.Length; - _messageCount--; - } + var first = _firstSeq; + if (first == 0 || !RemoveTrackedMessage(first, preserveHighWaterMark: true)) + break; - _messages.Remove(first); DeleteInBlock(first); trimmed = true; } @@ -347,7 +332,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (!trimmed) return; - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; _generation++; } @@ -391,22 +375,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } var persistedPayload = TransformForPersist(combined.AsSpan()); - var stored = new StoredMessage + TrackMessage(new StoredMessage { Sequence = _last, Subject = subject, Payload = combined, TimestampUtc = now, - }; - _messages[_last] = stored; + }); _generation++; - // Incremental state tracking. - _messageCount++; - _totalBytes += (ulong)combined.Length; - if (_messageCount == 1) - _firstSeq = _last; - // Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs. // Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge. var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L); @@ -443,11 +420,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var count = (ulong)_messages.Count; _messages.Clear(); + _messageIndexes.Clear(); + _lastSequenceBySubject.Clear(); _generation++; _last = 0; _messageCount = 0; _totalBytes = 0; _firstSeq = 0; + _first = 0; DisposeAllBlocks(); CleanBlockFiles(); @@ -490,20 +470,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var msg in toRemove) { - _messages.Remove(msg.Sequence); - _totalBytes -= (ulong)msg.Payload.Length; - _messageCount--; + RemoveTrackedMessage(msg.Sequence, preserveHighWaterMark: true); DeleteInBlock(msg.Sequence); } _generation++; - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; - - // Update _last if required. - if (_messages.Count == 0) - _last = 0; - else if (!_messages.ContainsKey(_last)) - _last = _messages.Keys.Max(); return (ulong)toRemove.Count; } @@ -524,13 +495,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var s in toRemove) { - if (_messages.TryGetValue(s, out var msg)) - { - _totalBytes -= (ulong)msg.Payload.Length; - _messageCount--; - } - - _messages.Remove(s); + RemoveTrackedMessage(s, preserveHighWaterMark: true); DeleteInBlock(s); } @@ -545,11 +510,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } else { - if (!_messages.ContainsKey(_last)) - _last = _messages.Keys.Max(); - // Update _first to reflect the real first message. - _first = _messages.Keys.Min(); - _firstSeq = _first; + _first = _firstSeq; } return (ulong)toRemove.Length; @@ -566,11 +527,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { // Truncate to nothing. _messages.Clear(); + _messageIndexes.Clear(); + _lastSequenceBySubject.Clear(); _generation++; _last = 0; _messageCount = 0; _totalBytes = 0; _firstSeq = 0; + _first = 0; DisposeAllBlocks(); CleanBlockFiles(); return; @@ -579,13 +543,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var toRemove = _messages.Keys.Where(k => k > seq).ToArray(); foreach (var s in toRemove) { - if (_messages.TryGetValue(s, out var msg)) - { - _totalBytes -= (ulong)msg.Payload.Length; - _messageCount--; - } - - _messages.Remove(s); + RemoveTrackedMessage(s, preserveHighWaterMark: false); DeleteInBlock(s); } @@ -594,8 +552,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Update _last to the new highest existing sequence (or seq if it exists, // or the highest below seq). - _last = _messages.Count == 0 ? 0 : _messages.Keys.Max(); - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; + if (_messageCount == 0) + { + _last = 0; + _first = 0; + _firstSeq = 0; + } } /// @@ -865,6 +827,119 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } } + private void TrackMessage(StoredMessage message) + { + _messages[message.Sequence] = message; + _messageIndexes[message.Sequence] = message.ToIndex(); + _lastSequenceBySubject[message.Subject] = message.Sequence; + _messageCount++; + _totalBytes += (ulong)message.Payload.Length; + + if (_messageCount == 1) + { + _first = message.Sequence; + _firstSeq = message.Sequence; + } + } + + private bool RemoveTrackedMessage(ulong sequence, bool preserveHighWaterMark) + { + if (!_messages.Remove(sequence, out var message)) + return false; + + _messageIndexes.Remove(sequence); + _messageCount--; + _totalBytes -= (ulong)message.Payload.Length; + UpdateLastSequenceForSubject(message.Subject, sequence); + + if (_messageCount == 0) + { + if (preserveHighWaterMark) + _first = _last + 1; + else + _last = 0; + + _firstSeq = 0; + return true; + } + + if (sequence == _firstSeq) + AdvanceFirstSequence(sequence + 1); + + if (!preserveHighWaterMark && sequence == _last) + _last = FindPreviousLiveSequence(sequence); + + return true; + } + + private void AdvanceFirstSequence(ulong start) + { + var candidate = start; + while (!_messageIndexes.ContainsKey(candidate) && candidate <= _last) + candidate++; + + if (candidate <= _last) + { + _first = candidate; + _firstSeq = candidate; + return; + } + + _first = _last + 1; + _firstSeq = 0; + } + + private ulong FindPreviousLiveSequence(ulong startExclusive) + { + if (_messageCount == 0 || startExclusive == 0) + return 0; + + for (var seq = startExclusive - 1; ; seq--) + { + if (_messageIndexes.ContainsKey(seq)) + return seq; + + if (seq == 0) + return 0; + } + } + + private void UpdateLastSequenceForSubject(string subject, ulong removedSequence) + { + if (!_lastSequenceBySubject.TryGetValue(subject, out var currentLast) || currentLast != removedSequence) + return; + + for (var seq = removedSequence - 1; ; seq--) + { + if (_messageIndexes.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) + { + _lastSequenceBySubject[subject] = seq; + return; + } + + if (seq == 0) + break; + } + + _lastSequenceBySubject.Remove(subject); + } + + private void RebuildIndexesFromMessages() + { + _messageIndexes.Clear(); + _lastSequenceBySubject.Clear(); + _messageCount = 0; + _totalBytes = 0; + _firstSeq = 0; + _first = 0; + + foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) + TrackMessage(message); + + if (_messageCount == 0 && _last > 0) + _first = _last + 1; + } + // ------------------------------------------------------------------------- // Subject matching helper // ------------------------------------------------------------------------- @@ -1055,10 +1130,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable DisposeAllBlocks(); CleanBlockFiles(); - _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; - _messageCount = (ulong)_messages.Count; - _totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); + RebuildIndexesFromMessages(); foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) { @@ -1168,15 +1240,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } // Sync _first from _messages; if empty, set to _last+1 (watermark). - if (_messages.Count > 0) - _first = _messages.Keys.Min(); - else if (_last > 0) - _first = _last + 1; - - // Recompute incremental state from recovered messages. - _messageCount = (ulong)_messages.Count; - _totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; + RebuildIndexesFromMessages(); } /// @@ -1370,20 +1434,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Reference: golang/nats-server/server/filestore.go:expireMsgs — dmap-based removal. foreach (var seq in expired) { - if (_messages.Remove(seq, out var msg)) - { - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; - } - + RemoveTrackedMessage(seq, preserveHighWaterMark: true); DeleteInBlock(seq); } - if (_messages.Count > 0) - _firstSeq = _messages.Keys.Min(); - else - _firstSeq = 0; - _generation++; } @@ -1407,14 +1461,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var sequence in expired) { - if (_messages.Remove(sequence, out var msg)) - { - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; - } + RemoveTrackedMessage(sequence, preserveHighWaterMark: true); } - - _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; RewriteBlocks(); } @@ -1676,27 +1724,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public bool RemoveMsg(ulong seq) { - if (!_messages.Remove(seq, out var msg)) + if (!RemoveTrackedMessage(seq, preserveHighWaterMark: true)) return false; - _generation++; - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; // Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is // never decremented on removal. Only FirstSeq advances when the first // live message is removed. - if (_messages.Count == 0) - { - _first = _last + 1; // All gone — next first would be after last - _firstSeq = 0; - } - else - { - _first = _messages.Keys.Min(); - _firstSeq = _first; - } - DeleteInBlock(seq); return true; } @@ -1709,23 +1743,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public bool EraseMsg(ulong seq) { - if (!_messages.Remove(seq, out var msg)) + if (!RemoveTrackedMessage(seq, preserveHighWaterMark: true)) return false; _generation++; - _messageCount--; - _totalBytes -= (ulong)msg.Payload.Length; - - if (_messages.Count == 0) - { - _first = _last + 1; - _firstSeq = 0; - } - else - { - _first = _messages.Keys.Min(); - _firstSeq = _first; - } // Secure erase: overwrite payload bytes with random data before marking deleted. // Reference: golang/nats-server/server/filestore.go:5890 (eraseMsg). diff --git a/src/NATS.Server/JetStream/Storage/StoredMessage.cs b/src/NATS.Server/JetStream/Storage/StoredMessage.cs index ab848d8..f40805c 100644 --- a/src/NATS.Server/JetStream/Storage/StoredMessage.cs +++ b/src/NATS.Server/JetStream/Storage/StoredMessage.cs @@ -18,4 +18,7 @@ public sealed class StoredMessage /// Convenience accessor for the Nats-Msg-Id header value, used by source deduplication. /// public string? MsgId => Headers is not null && Headers.TryGetValue("Nats-Msg-Id", out var id) ? id : null; + + internal StoredMessageIndex ToIndex() + => new(Sequence, Subject, Payload.Length, TimestampUtc); } diff --git a/src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs b/src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs new file mode 100644 index 0000000..6551d15 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/StoredMessageIndex.cs @@ -0,0 +1,7 @@ +namespace NATS.Server.JetStream.Storage; + +public readonly record struct StoredMessageIndex( + ulong Sequence, + string Subject, + int PayloadLength, + DateTime TimestampUtc); diff --git a/tests/NATS.Server.JetStream.Tests/FileStoreTests.cs b/tests/NATS.Server.JetStream.Tests/FileStoreTests.cs index 7ca58f3..af22f6a 100644 --- a/tests/NATS.Server.JetStream.Tests/FileStoreTests.cs +++ b/tests/NATS.Server.JetStream.Tests/FileStoreTests.cs @@ -15,4 +15,27 @@ public class FileStoreTests await using var recovered = new FileStore(new FileStoreOptions { Directory = dir.FullName }); (await recovered.GetStateAsync(default)).Messages.ShouldBe((ulong)1); } + + [Fact] + public async Task Snapshot_round_trip_preserves_headers_and_payload_separately() + { + var srcDir = Directory.CreateTempSubdirectory(); + var dstDir = Directory.CreateTempSubdirectory(); + + await using var src = new FileStore(new FileStoreOptions { Directory = srcDir.FullName }); + var hdr = "NATS/1.0\r\nX-Test: two\r\n\r\n"u8.ToArray(); + var msg = "payload-two"u8.ToArray(); + + var (seq, _) = src.StoreMsg("events.a", hdr, msg, 0L); + var snapshot = await src.CreateSnapshotAsync(default); + + await using var dst = new FileStore(new FileStoreOptions { Directory = dstDir.FullName }); + await dst.RestoreSnapshotAsync(snapshot, default); + + var loaded = dst.LoadMsg(seq, null); + loaded.Header.ShouldNotBeNull(); + loaded.Header.ShouldBe(hdr); + loaded.Data.ShouldNotBeNull(); + loaded.Data.ShouldBe(msg); + } } diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs index f04613e..4b7e967 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreOptimizationGuardTests.cs @@ -4,29 +4,6 @@ namespace NATS.Server.JetStream.Tests.JetStream.Storage; public sealed class FileStoreOptimizationGuardTests { - [Fact] - public async Task Snapshot_round_trip_preserves_headers_and_payload_separately() - { - var srcDir = Directory.CreateTempSubdirectory(); - var dstDir = Directory.CreateTempSubdirectory(); - - await using var src = new FileStore(new FileStoreOptions { Directory = srcDir.FullName }); - var hdr = "NATS/1.0\r\nX-Test: two\r\n\r\n"u8.ToArray(); - var msg = "payload-two"u8.ToArray(); - - var (seq, _) = src.StoreMsg("events.a", hdr, msg, 0L); - var snapshot = await src.CreateSnapshotAsync(default); - - await using var dst = new FileStore(new FileStoreOptions { Directory = dstDir.FullName }); - await dst.RestoreSnapshotAsync(snapshot, default); - - var loaded = dst.LoadMsg(seq, null); - loaded.Header.ShouldNotBeNull(); - loaded.Header.ShouldBe(hdr); - loaded.Data.ShouldNotBeNull(); - loaded.Data.ShouldBe(msg); - } - [Fact] public async Task PurgeEx_updates_last_by_subject_after_recovery() {