From f57edca5a88f9943691f95a962d11acda3d511ab Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 11:21:48 -0400 Subject: [PATCH] perf: optimize FileStore payload and maintenance paths --- .../JetStream/Storage/FileStore.cs | 140 ++++++++++-------- .../JetStream/Storage/StoredMessage.cs | 1 + 2 files changed, 78 insertions(+), 63 deletions(-) diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 598190b..07c2603 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -255,6 +255,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { Sequence = x.Sequence, Subject = x.Subject, + HeadersBase64 = x.RawHeaders.IsEmpty ? null : Convert.ToBase64String(x.RawHeaders.Span), PayloadBase64 = Convert.ToBase64String(TransformForPersist(x.Payload.Span)), TimestampUtc = x.TimestampUtc, }) @@ -284,11 +285,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { foreach (var record in records) { + var restoredHeaders = string.IsNullOrEmpty(record.HeadersBase64) + ? ReadOnlyMemory.Empty + : Convert.FromBase64String(record.HeadersBase64); var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)); var message = new StoredMessage { Sequence = record.Sequence, Subject = record.Subject ?? string.Empty, + RawHeaders = restoredHeaders, Payload = restoredPayload, TimestampUtc = record.TimestampUtc, }; @@ -361,25 +366,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var now = DateTime.UtcNow; var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L; - // Combine headers and payload (headers precede the body in NATS wire format). - byte[] combined; - if (hdr is { Length: > 0 }) - { - combined = new byte[hdr.Length + (msg?.Length ?? 0)]; - hdr.CopyTo(combined, 0); - msg?.CopyTo(combined, hdr.Length); - } - else - { - combined = msg ?? []; - } - - var persistedPayload = TransformForPersist(combined.AsSpan()); + var headers = hdr is { Length: > 0 } ? hdr : []; + var payload = msg ?? []; + var persistedPayload = TransformForPersist(payload); TrackMessage(new StoredMessage { Sequence = _last, Subject = subject, - Payload = combined, + RawHeaders = headers, + Payload = payload, TimestampUtc = now, }); _generation++; @@ -392,16 +387,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable EnsureActiveBlock(); try { - _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + _activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp); } catch (InvalidOperationException) { RotateBlock(); - _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + _activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp); } // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. - _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); + _writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length); // Signal the background flush loop to coalesce and flush pending writes. _flushSignal.Writer.TryWrite(0); @@ -450,33 +445,53 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (string.IsNullOrEmpty(subject) && keep == 0 && seq == 0) return Purge(); - // Collect all messages matching the subject (with wildcard support) at or below seq, ordered by sequence. - var candidates = _messages.Values - .Where(m => SubjectMatchesFilter(m.Subject, subject)) - .Where(m => seq == 0 || m.Sequence <= seq) - .OrderBy(m => m.Sequence) - .ToList(); - - if (candidates.Count == 0) + var upperBound = seq == 0 ? _last : Math.Min(seq, _last); + if (upperBound == 0) return 0; - // Keep the newest `keep` messages; purge the rest. - var toRemove = keep > 0 && (ulong)candidates.Count > keep - ? candidates.Take(candidates.Count - (int)keep).ToList() - : (keep == 0 ? candidates : []); - - if (toRemove.Count == 0) - return 0; - - foreach (var msg in toRemove) + ulong candidateCount = 0; + for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound; current++) { - RemoveTrackedMessage(msg.Sequence, preserveHighWaterMark: true); - DeleteInBlock(msg.Sequence); + if (!_messages.TryGetValue(current, out var message)) + continue; + + if (!SubjectMatchesFilter(message.Subject, subject)) + continue; + + candidateCount++; } - _generation++; + if (candidateCount == 0) + return 0; - return (ulong)toRemove.Count; + var targetRemoveCount = keep > 0 + ? (candidateCount > keep ? candidateCount - keep : 0UL) + : candidateCount; + + if (targetRemoveCount == 0) + return 0; + + ulong removed = 0; + for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound && removed < targetRemoveCount; current++) + { + if (!_messages.TryGetValue(current, out var message)) + continue; + + if (!SubjectMatchesFilter(message.Subject, subject)) + continue; + + if (RemoveTrackedMessage(current, preserveHighWaterMark: true)) + { + DeleteInBlock(current); + removed++; + } + } + + if (removed == 0) + return 0; + + _generation++; + return removed; } /// @@ -833,7 +848,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messageIndexes[message.Sequence] = message.ToIndex(); _lastSequenceBySubject[message.Subject] = message.Sequence; _messageCount++; - _totalBytes += (ulong)message.Payload.Length; + _totalBytes += (ulong)(message.RawHeaders.Length + message.Payload.Length); if (_messageCount == 1) { @@ -849,7 +864,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messageIndexes.Remove(sequence); _messageCount--; - _totalBytes -= (ulong)message.Payload.Length; + _totalBytes -= (ulong)(message.RawHeaders.Length + message.Payload.Length); UpdateLastSequenceForSubject(message.Subject, sequence); if (_messageCount == 0) @@ -1130,6 +1145,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable DisposeAllBlocks(); CleanBlockFiles(); + _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); RebuildIndexesFromMessages(); foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) @@ -1140,12 +1156,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable EnsureActiveBlock(); try { - _activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + _activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp); } catch (InvalidOperationException) { RotateBlock(); - _activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + _activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp); } if (_activeBlock!.IsSealed) @@ -1271,6 +1287,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { Sequence = record.Sequence, Subject = record.Subject, + RawHeaders = record.Headers, Payload = originalPayload, TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime, }; @@ -1834,6 +1851,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = stored.Subject; + sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; sm.Sequence = stored.Sequence; sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -1854,7 +1872,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = record.Subject; - sm.Data = record.Payload.Length > 0 ? record.Payload.ToArray() : null; + sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null; + var originalPayload = RestorePayload(record.Payload.Span); + sm.Data = originalPayload.Length > 0 ? originalPayload : null; sm.Sequence = record.Sequence; sm.Timestamp = record.Timestamp; return sm; @@ -1908,6 +1928,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = match.Subject; + sm.Header = match.RawHeaders.Length > 0 ? match.RawHeaders.ToArray() : null; sm.Data = match.Payload.Length > 0 ? match.Payload.ToArray() : null; sm.Sequence = match.Sequence; sm.Timestamp = new DateTimeOffset(match.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -1938,6 +1959,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = found.Value.Subject; + sm.Header = found.Value.RawHeaders.Length > 0 ? found.Value.RawHeaders.ToArray() : null; sm.Data = found.Value.Payload.Length > 0 ? found.Value.Payload.ToArray() : null; sm.Sequence = found.Key; sm.Timestamp = new DateTimeOffset(found.Value.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -2062,20 +2084,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (_stopped) throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped."); - // Combine headers and payload, same as StoreMsg. - byte[] combined; - if (hdr is { Length: > 0 }) - { - combined = new byte[hdr.Length + msg.Length]; - hdr.CopyTo(combined, 0); - msg.CopyTo(combined, hdr.Length); - } - else - { - combined = msg; - } - - var persistedPayload = TransformForPersist(combined.AsSpan()); + var headers = hdr is { Length: > 0 } ? hdr : []; + var payload = msg ?? []; + var persistedPayload = TransformForPersist(payload); // Recover UTC DateTime from caller-supplied Unix nanosecond timestamp. var storedUtc = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime; @@ -2083,10 +2094,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { Sequence = seq, Subject = subject, - Payload = combined, + RawHeaders = headers, + Payload = payload, TimestampUtc = storedUtc, }; - _messages[seq] = stored; + TrackMessage(stored); _generation++; // Go: update _last to the high-water mark — do not decrement. @@ -2099,16 +2111,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable EnsureActiveBlock(); try { - _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, ts); + _activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts); } catch (InvalidOperationException) { RotateBlock(); - _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, ts); + _activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts); } // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. - _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); + _writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length); if (_activeBlock!.IsSealed) RotateBlock(); @@ -2134,6 +2146,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable sm ??= new StoreMsg(); sm.Clear(); sm.Subject = stored.Subject; + sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; sm.Sequence = stored.Sequence; sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -2386,6 +2399,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { public ulong Sequence { get; init; } public string? Subject { get; init; } + public string? HeadersBase64 { get; init; } public string? PayloadBase64 { get; init; } public DateTime TimestampUtc { get; init; } } diff --git a/src/NATS.Server/JetStream/Storage/StoredMessage.cs b/src/NATS.Server/JetStream/Storage/StoredMessage.cs index f40805c..1a3af42 100644 --- a/src/NATS.Server/JetStream/Storage/StoredMessage.cs +++ b/src/NATS.Server/JetStream/Storage/StoredMessage.cs @@ -5,6 +5,7 @@ public sealed class StoredMessage public ulong Sequence { get; init; } public string Subject { get; init; } = string.Empty; public ReadOnlyMemory Payload { get; init; } + internal ReadOnlyMemory RawHeaders { get; init; } public DateTime TimestampUtc { get; init; } = DateTime.UtcNow; public string? Account { get; init; } public bool Redelivered { get; init; }