From 7404ecdb0e925456014b5cb7842c8ec1b22b265b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 15:09:21 -0400 Subject: [PATCH 1/3] perf: Phase 1 JetStream async file publish optimizations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add cached state properties (LastSeq, MessageCount, TotalBytes, FirstSeq) to IStreamStore/FileStore/MemStore — eliminates GetStateAsync on publish path - Add Capture(StreamHandle, ...) overload to StreamManager — eliminates double FindBySubject lookup (once in JetStreamPublisher, once in Capture) - Remove _messageIndexes dictionary from FileStore write path — all lookups now use _messages directly, saving ~48B allocation per message - Add JetStreamPubAckFormatter for hand-rolled UTF-8 success ack formatting — avoids JsonSerializer overhead on the hot publish path - Switch flush loop to exponential backoff (1→2→4→8ms) matching Go server --- .../Publish/JetStreamPubAckFormatter.cs | 41 +++++++++++++++++++ .../JetStream/Publish/JetStreamPublisher.cs | 14 +++---- .../JetStream/Storage/FileStore.cs | 34 ++++++++------- .../JetStream/Storage/IStreamStore.cs | 7 ++++ src/NATS.Server/JetStream/Storage/MemStore.cs | 6 +++ src/NATS.Server/JetStream/StreamManager.cs | 19 ++++++--- src/NATS.Server/NatsServer.cs | 15 ++++++- 7 files changed, 106 insertions(+), 30 deletions(-) create mode 100644 src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs b/src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs new file mode 100644 index 0000000..565b523 --- /dev/null +++ b/src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs @@ -0,0 +1,41 @@ +using System.Text; + +namespace NATS.Server.JetStream.Publish; + +/// +/// Hand-rolled UTF-8 formatter for the common success PubAck case. +/// Avoids JsonSerializer overhead (~100-200B internal allocations + reflection). +/// For error/duplicate/batch acks, callers fall back to JsonSerializer. +/// +internal static class JetStreamPubAckFormatter +{ + // Pre-encoded UTF-8 fragments for {"stream":"NAME","seq":N} + private static readonly byte[] Prefix = "{\"stream\":\""u8.ToArray(); + private static readonly byte[] SeqField = "\",\"seq\":"u8.ToArray(); + private static readonly byte[] Suffix = "}"u8.ToArray(); + + /// + /// Formats a success PubAck directly into a span. Returns bytes written. + /// Caller must ensure dest is large enough (256 bytes is safe for any stream name). + /// + public static int FormatSuccess(Span dest, string streamName, ulong seq) + { + var pos = 0; + Prefix.CopyTo(dest); + pos += Prefix.Length; + pos += Encoding.UTF8.GetBytes(streamName, dest[pos..]); + SeqField.CopyTo(dest[pos..]); + pos += SeqField.Length; + seq.TryFormat(dest[pos..], out var written); + pos += written; + Suffix.CopyTo(dest[pos..]); + pos += Suffix.Length; + return pos; + } + + /// + /// Returns true if this PubAck is a simple success that can use the fast formatter. + /// + public static bool IsSimpleSuccess(PubAck ack) + => ack.ErrorCode == null && !ack.Duplicate && ack.BatchId == null; +} diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs index 3941499..4c4e2ad 100644 --- a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs +++ b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs @@ -37,8 +37,8 @@ public sealed class JetStreamPublisher } // --- Normal (non-batch) publish path --- - var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); - if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, state.LastSeq)) + // Use cached LastSeq property instead of GetStateAsync to avoid allocation. + if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, stream.Store.LastSeq)) { ack = new PubAck { ErrorCode = 10071 }; return true; @@ -54,7 +54,8 @@ public sealed class JetStreamPublisher return true; } - var captured = _streamManager.Capture(subject, payload); + // Pass resolved stream to avoid double FindBySubject lookup. + var captured = _streamManager.Capture(stream, subject, payload); ack = captured ?? new PubAck(); _preconditions.Record(options.MsgId, ack.Seq); _preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs); @@ -136,15 +137,14 @@ public sealed class JetStreamPublisher stream.Config.DuplicateWindowMs, staged => { - // Check expected last sequence. + // Check expected last sequence using cached property. if (staged.ExpectedLastSeq > 0) { - var st = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); - if (st.LastSeq != staged.ExpectedLastSeq) + if (stream.Store.LastSeq != staged.ExpectedLastSeq) return new PubAck { ErrorCode = 10071, Stream = stream.Config.Name }; } - var captured = _streamManager.Capture(staged.Subject, staged.Payload); + var captured = _streamManager.Capture(stream, staged.Subject, staged.Payload); return captured ?? new PubAck { Stream = stream.Config.Name }; }); diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 07c2603..373bc9d 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -28,7 +28,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // In-memory cache: keyed by sequence number. This is the primary data structure // for reads and queries. The blocks are the on-disk persistence layer. private readonly Dictionary _messages = new(); - private readonly Dictionary _messageIndexes = new(); + // _messageIndexes removed — all lookups now use _messages directly to avoid + // per-message StoredMessageIndex allocation on the write path. private readonly Dictionary _lastSequenceBySubject = new(StringComparer.Ordinal); // Block-based storage: the active (writable) block and sealed blocks. @@ -89,6 +90,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public int BlockCount => _blocks.Count; public bool UsedIndexManifestOnStartup { get; private set; } + // IStreamStore cached state properties — O(1), maintained incrementally. + public ulong LastSeq => _last; + public ulong MessageCount => _messageCount; + public ulong TotalBytes => _totalBytes; + ulong IStreamStore.FirstSeq => _messageCount == 0 ? (_first > 0 ? _first : 0UL) : _firstSeq; + public FileStore(FileStoreOptions options) { _options = options; @@ -266,7 +273,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) { _messages.Clear(); - _messageIndexes.Clear(); _lastSequenceBySubject.Clear(); _last = 0; _messageCount = 0; @@ -415,7 +421,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var count = (ulong)_messages.Count; _messages.Clear(); - _messageIndexes.Clear(); _lastSequenceBySubject.Clear(); _generation++; _last = 0; @@ -542,7 +547,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { // Truncate to nothing. _messages.Clear(); - _messageIndexes.Clear(); _lastSequenceBySubject.Clear(); _generation++; _last = 0; @@ -845,7 +849,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private void TrackMessage(StoredMessage message) { _messages[message.Sequence] = message; - _messageIndexes[message.Sequence] = message.ToIndex(); _lastSequenceBySubject[message.Subject] = message.Sequence; _messageCount++; _totalBytes += (ulong)(message.RawHeaders.Length + message.Payload.Length); @@ -861,8 +864,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { if (!_messages.Remove(sequence, out var message)) return false; - - _messageIndexes.Remove(sequence); _messageCount--; _totalBytes -= (ulong)(message.RawHeaders.Length + message.Payload.Length); UpdateLastSequenceForSubject(message.Subject, sequence); @@ -890,7 +891,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private void AdvanceFirstSequence(ulong start) { var candidate = start; - while (!_messageIndexes.ContainsKey(candidate) && candidate <= _last) + while (!_messages.ContainsKey(candidate) && candidate <= _last) candidate++; if (candidate <= _last) @@ -911,7 +912,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable for (var seq = startExclusive - 1; ; seq--) { - if (_messageIndexes.ContainsKey(seq)) + if (_messages.ContainsKey(seq)) return seq; if (seq == 0) @@ -926,7 +927,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable for (var seq = removedSequence - 1; ; seq--) { - if (_messageIndexes.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) + if (_messages.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) { _lastSequenceBySubject[subject] = seq; return; @@ -941,7 +942,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private void RebuildIndexesFromMessages() { - _messageIndexes.Clear(); _lastSequenceBySubject.Clear(); _messageCount = 0; _totalBytes = 0; @@ -2317,12 +2317,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (block is null) continue; - var waited = 0; - while (block.PendingWriteSize < CoalesceMinimum && waited < MaxFlushWaitMs) + // Go-style exponential backoff: 1→2→4→8ms (vs linear 1ms × 8). + var waitMs = 1; + var totalWaited = 0; + while (block.PendingWriteSize < CoalesceMinimum && totalWaited < MaxFlushWaitMs) { - try { await Task.Delay(1, ct); } + var delay = Math.Min(waitMs, MaxFlushWaitMs - totalWaited); + try { await Task.Delay(delay, ct); } catch (OperationCanceledException) { break; } - waited++; + totalWaited += delay; + waitMs *= 2; } block.FlushPending(); diff --git a/src/NATS.Server/JetStream/Storage/IStreamStore.cs b/src/NATS.Server/JetStream/Storage/IStreamStore.cs index a034a6f..1601897 100644 --- a/src/NATS.Server/JetStream/Storage/IStreamStore.cs +++ b/src/NATS.Server/JetStream/Storage/IStreamStore.cs @@ -32,6 +32,13 @@ public interface IStreamStore // Existing MemStore/FileStore implementations return this type. ValueTask GetStateAsync(CancellationToken ct); + // Cached state properties — avoid GetStateAsync on the publish hot path. + // These are maintained incrementally by FileStore/MemStore and are O(1). + ulong LastSeq => throw new NotSupportedException("LastSeq not implemented."); + ulong MessageCount => throw new NotSupportedException("MessageCount not implemented."); + ulong TotalBytes => throw new NotSupportedException("TotalBytes not implemented."); + ulong FirstSeq => throw new NotSupportedException("FirstSeq not implemented."); + // ------------------------------------------------------------------------- // Go-parity sync interface — mirrors server/store.go StreamStore // Default implementations throw NotSupportedException so existing diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs index b141768..750ba46 100644 --- a/src/NATS.Server/JetStream/Storage/MemStore.cs +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -122,6 +122,12 @@ public sealed class MemStore : IStreamStore } } + // IStreamStore cached state properties — O(1), maintained incrementally. + public ulong LastSeq { get { lock (_gate) return _st.LastSeq; } } + public ulong MessageCount { get { lock (_gate) return _st.Msgs; } } + public ulong TotalBytes { get { lock (_gate) return _st.Bytes; } } + ulong IStreamStore.FirstSeq { get { lock (_gate) return _st.Msgs == 0 ? (_st.FirstSeq > 0 ? _st.FirstSeq : 0UL) : _st.FirstSeq; } } + // ------------------------------------------------------------------------- // Async helpers (used by existing JetStream layer) // ------------------------------------------------------------------------- diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 82699d0..e0b5a65 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -397,6 +397,11 @@ public sealed class StreamManager : IDisposable if (stream == null) return null; + return Capture(stream, subject, payload); + } + + public PubAck? Capture(StreamHandle stream, string subject, ReadOnlyMemory payload) + { // Go: sealed stream rejects all publishes. // Reference: server/stream.go — processJetStreamMsg checks mset.cfg.Sealed. if (stream.Config.Sealed) @@ -414,17 +419,20 @@ public sealed class StreamManager : IDisposable // Go: memStoreMsgSize — full message size includes subject + headers + payload + 16 bytes overhead. var msgSize = subject.Length + payload.Length + 16; - var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); + // Use cached state properties instead of GetStateAsync to avoid allocation on hot path. + var currentMsgCount = stream.Store.MessageCount; + var currentBytes = stream.Store.TotalBytes; + var currentFirstSeq = stream.Store.FirstSeq; // Go: DiscardPolicy.New — reject when MaxMsgs reached. // Reference: server/stream.go — processJetStreamMsg checks discard new + maxMsgs. if (stream.Config.MaxMsgs > 0 && stream.Config.Discard == DiscardPolicy.New - && (long)stateBefore.Messages >= stream.Config.MaxMsgs) + && (long)currentMsgCount >= stream.Config.MaxMsgs) { return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 }; } - if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes) + if (stream.Config.MaxBytes > 0 && (long)currentBytes + msgSize > stream.Config.MaxBytes) { if (stream.Config.Discard == DiscardPolicy.New) { @@ -435,10 +443,9 @@ public sealed class StreamManager : IDisposable }; } - while ((long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes && stateBefore.FirstSeq > 0) + while ((long)stream.Store.TotalBytes + msgSize > stream.Config.MaxBytes && stream.Store.FirstSeq > 0) { - stream.Store.RemoveAsync(stateBefore.FirstSeq, default).GetAwaiter().GetResult(); - stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); + stream.Store.RemoveAsync(stream.Store.FirstSeq, default).GetAwaiter().GetResult(); } } diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 87aea24..a8daf14 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -1399,8 +1399,19 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable // Go reference: server/jetstream.go — jsPubAckResponse sent to reply. if (replyTo != null) { - var ackData = JsonSerializer.SerializeToUtf8Bytes(pubAck, s_jetStreamJsonOptions); - ProcessMessage(replyTo, null, default, ackData, sender); + if (JetStream.Publish.JetStreamPubAckFormatter.IsSimpleSuccess(pubAck)) + { + // Fast path: hand-rolled UTF-8 formatter avoids JsonSerializer overhead. + Span ackBuf = stackalloc byte[256]; + var ackLen = JetStream.Publish.JetStreamPubAckFormatter.FormatSuccess(ackBuf, pubAck.Stream, pubAck.Seq); + ProcessMessage(replyTo, null, default, ackBuf[..ackLen].ToArray(), sender); + } + else + { + var ackData = JsonSerializer.SerializeToUtf8Bytes(pubAck, s_jetStreamJsonOptions); + ProcessMessage(replyTo, null, default, ackData, sender); + } + return; } } From 6e91fda7fd478ff5a3528c6e832e507588c83097 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 15:33:38 -0400 Subject: [PATCH 2/3] perf: Phase 2 lazy StoredMessage materialization in FileStore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace eager Dictionary with lightweight Dictionary to eliminate ~200B StoredMessage allocation per message on the write path. - Add MessageMeta struct (BlockId, Subject, PayloadLength, HeaderLength, TimestampNs) — ~40B vs ~200B for StoredMessage - Add MaterializeMessage(seq) for on-demand reconstruction from blocks - Update all ~60 _messages references to use _meta - Methods needing full payload (LoadAsync, ListAsync, etc.) call MaterializeMessage; metadata-only paths use _meta directly - Fix MsgBlock.WriteAt to clear stale delete markers on re-write --- .../JetStream/Storage/FileStore.cs | 598 +++++++++++------- src/NATS.Server/JetStream/Storage/MsgBlock.cs | 4 + 2 files changed, 365 insertions(+), 237 deletions(-) diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 373bc9d..c1170ea 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -25,11 +25,23 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { private readonly FileStoreOptions _options; - // In-memory cache: keyed by sequence number. This is the primary data structure - // for reads and queries. The blocks are the on-disk persistence layer. - private readonly Dictionary _messages = new(); - // _messageIndexes removed — all lookups now use _messages directly to avoid - // per-message StoredMessageIndex allocation on the write path. + // Lightweight metadata index: keyed by sequence number. Stores only the fields + // needed for existence checks, subject queries, and timestamp lookups. + // Full message payloads are materialized on demand from blocks via MaterializeMessage. + private readonly Dictionary _meta = new(); + + /// + /// Lightweight per-message metadata stored in the in-memory index. + /// Eliminates the ~200B StoredMessage allocation on the write path. + /// + private struct MessageMeta + { + public int BlockId; + public string Subject; + public int PayloadLength; + public int HeaderLength; + public long TimestampNs; + } private readonly Dictionary _lastSequenceBySubject = new(StringComparer.Ordinal); // Block-based storage: the active (writable) block and sealed blocks. @@ -147,23 +159,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Go: writeMsgRecordLocked writes directly into mb.cache.buf (a single contiguous // byte slice). It does NOT store a per-message object in a map. - // We keep _messages for LoadAsync/RemoveAsync but avoid double payload.ToArray(). + // We store lightweight MessageMeta instead of full StoredMessage to avoid ~200B alloc. var persistedPayload = TransformForPersist(payload.Span); - var storedPayload = _noTransform ? persistedPayload : payload.ToArray(); - TrackMessage(new StoredMessage - { - Sequence = _last, - Subject = subject, - Payload = storedPayload, - TimestampUtc = now, - }); - _generation++; // Go: register TTL only when TTL > 0. if (_options.MaxAgeMs > 0) RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L); - // Write to MsgBlock. + // Write to MsgBlock first so we know the block ID for the meta entry. EnsureActiveBlock(); try { @@ -175,6 +178,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); } + // Track lightweight metadata now that we know the block ID. + TrackMessage(_last, subject, 0, payload.Length, timestamp, _activeBlock!.BlockId); + _generation++; + _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); _flushSignal.Writer.TryWrite(0); @@ -193,16 +200,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask LoadAsync(ulong sequence, CancellationToken ct) { - _messages.TryGetValue(sequence, out var msg); - return ValueTask.FromResult(msg); + return ValueTask.FromResult(MaterializeMessage(sequence)); } public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) { - if (_lastSequenceBySubject.TryGetValue(subject, out var sequence) - && _messages.TryGetValue(sequence, out var match)) + if (_lastSequenceBySubject.TryGetValue(subject, out var sequence)) { - return ValueTask.FromResult(match); + var msg = MaterializeMessage(sequence); + return ValueTask.FromResult(msg); } return ValueTask.FromResult(null); @@ -210,8 +216,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask> ListAsync(CancellationToken ct) { - var messages = _messages.Values - .OrderBy(m => m.Sequence) + var messages = _meta.Keys + .OrderBy(seq => seq) + .Select(seq => MaterializeMessage(seq)) + .Where(m => m is not null) + .Cast() .ToArray(); return ValueTask.FromResult>(messages); } @@ -231,7 +240,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask PurgeAsync(CancellationToken ct) { - _messages.Clear(); + _meta.Clear(); _generation++; _last = 0; _messageCount = 0; @@ -255,24 +264,29 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask CreateSnapshotAsync(CancellationToken ct) { - var snapshot = _messages - .Values - .OrderBy(x => x.Sequence) - .Select(x => new FileRecord + var snapshot = _meta.Keys + .OrderBy(seq => seq) + .Select(seq => { - Sequence = x.Sequence, - Subject = x.Subject, - HeadersBase64 = x.RawHeaders.IsEmpty ? null : Convert.ToBase64String(x.RawHeaders.Span), - PayloadBase64 = Convert.ToBase64String(TransformForPersist(x.Payload.Span)), - TimestampUtc = x.TimestampUtc, + var msg = MaterializeMessage(seq); + if (msg is null) return null; + return new FileRecord + { + Sequence = msg.Sequence, + Subject = msg.Subject, + HeadersBase64 = msg.RawHeaders.IsEmpty ? null : Convert.ToBase64String(msg.RawHeaders.Span), + PayloadBase64 = Convert.ToBase64String(TransformForPersist(msg.Payload.Span)), + TimestampUtc = msg.TimestampUtc, + }; }) + .Where(r => r is not null) .ToArray(); return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot)); } public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) { - _messages.Clear(); + _meta.Clear(); _lastSequenceBySubject.Clear(); _last = 0; _messageCount = 0; @@ -295,24 +309,30 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable ? ReadOnlyMemory.Empty : Convert.FromBase64String(record.HeadersBase64); var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)); - var message = new StoredMessage - { - Sequence = record.Sequence, - Subject = record.Subject ?? string.Empty, - RawHeaders = restoredHeaders, - Payload = restoredPayload, - TimestampUtc = record.TimestampUtc, - }; - _messages[record.Sequence] = message; + var persistedPayload = TransformForPersist(restoredPayload); + var timestampNs = new DateTimeOffset(record.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + var subject = record.Subject ?? string.Empty; + _last = Math.Max(_last, record.Sequence); + + EnsureActiveBlock(); + try + { + _activeBlock!.WriteAt(record.Sequence, subject, restoredHeaders, persistedPayload, timestampNs); + } + catch (InvalidOperationException) + { + RotateBlock(); + _activeBlock!.WriteAt(record.Sequence, subject, restoredHeaders, persistedPayload, timestampNs); + } + + TrackMessage(record.Sequence, subject, restoredHeaders.Length, restoredPayload.Length, timestampNs, _activeBlock!.BlockId); + + if (_activeBlock!.IsSealed) + RotateBlock(); } } } - - RebuildIndexesFromMessages(); - - // Write all messages to fresh blocks. - RewriteBlocks(); return ValueTask.CompletedTask; } @@ -330,7 +350,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public void TrimToMaxMessages(ulong maxMessages) { var trimmed = false; - while ((ulong)_messages.Count > maxMessages) + while ((ulong)_meta.Count > maxMessages) { var first = _firstSeq; if (first == 0 || !RemoveTrackedMessage(first, preserveHighWaterMark: true)) @@ -375,15 +395,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var headers = hdr is { Length: > 0 } ? hdr : []; var payload = msg ?? []; var persistedPayload = TransformForPersist(payload); - TrackMessage(new StoredMessage - { - Sequence = _last, - Subject = subject, - RawHeaders = headers, - Payload = payload, - TimestampUtc = now, - }); - _generation++; // Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs. // Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge. @@ -401,6 +412,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp); } + TrackMessage(_last, subject, headers.Length, payload.Length, timestamp, _activeBlock!.BlockId); + _generation++; + // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. _writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length); @@ -419,8 +433,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public ulong Purge() { - var count = (ulong)_messages.Count; - _messages.Clear(); + var count = (ulong)_meta.Count; + _meta.Clear(); _lastSequenceBySubject.Clear(); _generation++; _last = 0; @@ -457,10 +471,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable ulong candidateCount = 0; for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound; current++) { - if (!_messages.TryGetValue(current, out var message)) + if (!_meta.TryGetValue(current, out var meta)) continue; - if (!SubjectMatchesFilter(message.Subject, subject)) + if (!SubjectMatchesFilter(meta.Subject, subject)) continue; candidateCount++; @@ -479,10 +493,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable ulong removed = 0; for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound && removed < targetRemoveCount; current++) { - if (!_messages.TryGetValue(current, out var message)) + if (!_meta.TryGetValue(current, out var meta2)) continue; - if (!SubjectMatchesFilter(message.Subject, subject)) + if (!SubjectMatchesFilter(meta2.Subject, subject)) continue; if (RemoveTrackedMessage(current, preserveHighWaterMark: true)) @@ -509,7 +523,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (seq == 0) return 0; - var toRemove = _messages.Keys.Where(k => k < seq).ToArray(); + var toRemove = _meta.Keys.Where(k => k < seq).ToArray(); if (toRemove.Length == 0) return 0; @@ -521,7 +535,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _generation++; - if (_messages.Count == 0) + if (_meta.Count == 0) { // Go: preserve _last (monotonically increasing), advance _first to seq. // Compact(seq) removes everything < seq; the new first is seq. @@ -546,7 +560,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (seq == 0) { // Truncate to nothing. - _messages.Clear(); + _meta.Clear(); _lastSequenceBySubject.Clear(); _generation++; _last = 0; @@ -559,7 +573,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable return; } - var toRemove = _messages.Keys.Where(k => k > seq).ToArray(); + var toRemove = _meta.Keys.Where(k => k > seq).ToArray(); foreach (var s in toRemove) { RemoveTrackedMessage(s, preserveHighWaterMark: false); @@ -587,12 +601,18 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ulong GetSeqFromTime(DateTime t) { var utc = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime(); - var match = _messages.Values - .Where(m => m.TimestampUtc >= utc) - .OrderBy(m => m.Sequence) - .FirstOrDefault(); + var targetNs = new DateTimeOffset(utc).ToUnixTimeMilliseconds() * 1_000_000L; + ulong? matchSeq = null; + foreach (var kv in _meta) + { + if (kv.Value.TimestampNs >= targetNs) + { + if (matchSeq is null || kv.Key < matchSeq.Value) + matchSeq = kv.Key; + } + } - return match?.Sequence ?? _last + 1; + return matchSeq ?? _last + 1; } /// @@ -601,7 +621,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// /// Optimized: uses block-range binary search to skip blocks whose sequence /// range is entirely below . For each candidate block, - /// messages already cached in _messages are filtered directly. + /// messages already cached in _meta are filtered directly. /// Reference: golang/nats-server/server/filestore.go:3191 (FilteredState). /// public SimpleState FilteredState(ulong seq, string subject) @@ -614,12 +634,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable ulong first = 0; ulong last = 0; - // Collect candidates from _messages that fall in the block range. - // _messages is the authoritative in-memory store; blocks are on-disk backing. - // We iterate _messages sorted by key and filter by sequence + subject. - // The binary block search only tells us where to START looking; the _messages + // Collect candidates from _meta that fall in the block range. + // _meta is the authoritative in-memory index; blocks are on-disk backing. + // We iterate _meta sorted by key and filter by sequence + subject. + // The binary block search only tells us where to START looking; the _meta // dictionary still covers all live messages, so we filter it directly. - foreach (var kv in _messages) + foreach (var kv in _meta) { var msgSeq = kv.Key; if (msgSeq < seq) @@ -705,27 +725,29 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var result = new Dictionary(StringComparer.Ordinal); - foreach (var msg in _messages.Values) + foreach (var kv in _meta) { - if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject)) + var seq = kv.Key; + var meta = kv.Value; + if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(meta.Subject, filterSubject)) continue; - if (result.TryGetValue(msg.Subject, out var existing)) + if (result.TryGetValue(meta.Subject, out var existing)) { - result[msg.Subject] = new SimpleState + result[meta.Subject] = new SimpleState { Msgs = existing.Msgs + 1, - First = Math.Min(existing.First == 0 ? msg.Sequence : existing.First, msg.Sequence), - Last = Math.Max(existing.Last, msg.Sequence), + First = Math.Min(existing.First == 0 ? seq : existing.First, seq), + Last = Math.Max(existing.Last, seq), }; } else { - result[msg.Subject] = new SimpleState + result[meta.Subject] = new SimpleState { Msgs = 1, - First = msg.Sequence, - Last = msg.Sequence, + First = seq, + Last = seq, }; } } @@ -742,13 +764,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var result = new Dictionary(StringComparer.Ordinal); - foreach (var msg in _messages.Values) + foreach (var meta in _meta.Values) { - if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject)) + if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(meta.Subject, filterSubject)) continue; - result.TryGetValue(msg.Subject, out var count); - result[msg.Subject] = count + 1; + result.TryGetValue(meta.Subject, out var count); + result[meta.Subject] = count + 1; } return result; @@ -764,13 +786,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable FastState(ref state); // Populate deleted sequences: sequences in [firstSeq, lastSeq] that are - // not present in _messages. + // not present in _meta. if (state.FirstSeq > 0 && state.LastSeq >= state.FirstSeq) { var deletedList = new List(); for (var s = state.FirstSeq; s <= state.LastSeq; s++) { - if (!_messages.ContainsKey(s)) + if (!_meta.ContainsKey(s)) deletedList.Add(s); } @@ -783,10 +805,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Populate per-subject counts. var subjectCounts = new Dictionary(StringComparer.Ordinal); - foreach (var msg in _messages.Values) + foreach (var meta in _meta.Values) { - subjectCounts.TryGetValue(msg.Subject, out var cnt); - subjectCounts[msg.Subject] = cnt + 1; + subjectCounts.TryGetValue(meta.Subject, out var cnt); + subjectCounts[meta.Subject] = cnt + 1; } state.NumSubjects = subjectCounts.Count; state.Subjects = subjectCounts.Count > 0 ? subjectCounts : null; @@ -820,25 +842,28 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var firstSeq = _firstSeq; state.FirstSeq = firstSeq; - state.FirstTime = _messages.TryGetValue(firstSeq, out var firstMsg) ? firstMsg.TimestampUtc : default; + state.FirstTime = _meta.TryGetValue(firstSeq, out var firstMeta) + ? DateTimeOffset.FromUnixTimeMilliseconds(firstMeta.TimestampNs / 1_000_000).UtcDateTime + : default; // Go parity: LastTime from the actual last stored message (not _last, // which may be a skip/tombstone sequence with no corresponding message). - if (_messages.TryGetValue(_last, out var lastMsg)) - state.LastTime = lastMsg.TimestampUtc; - else if (_messages.Count > 0) + if (_meta.TryGetValue(_last, out var lastMeta)) + state.LastTime = DateTimeOffset.FromUnixTimeMilliseconds(lastMeta.TimestampNs / 1_000_000).UtcDateTime; + else if (_meta.Count > 0) { // _last is a skip — use the highest actual message time. - var actualLast = _messages.Keys.Max(); - state.LastTime = _messages[actualLast].TimestampUtc; + var actualLast = _meta.Keys.Max(); + var actualMeta = _meta[actualLast]; + state.LastTime = DateTimeOffset.FromUnixTimeMilliseconds(actualMeta.TimestampNs / 1_000_000).UtcDateTime; } - // Go parity: NumDeleted = gaps between firstSeq and lastSeq not in _messages. + // Go parity: NumDeleted = gaps between firstSeq and lastSeq not in _meta. // Reference: filestore.go — FastState sets state.NumDeleted. if (_last >= firstSeq) { var span = _last - firstSeq + 1; - var liveCount = (ulong)_messages.Count; + var liveCount = (ulong)_meta.Count; state.NumDeleted = span > liveCount ? (int)(span - liveCount) : 0; } else @@ -846,27 +871,60 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } } - private void TrackMessage(StoredMessage message) + /// + /// Materializes a full from block storage using + /// the lightweight metadata index. Returns null if the sequence is not found + /// in the metadata or the block cannot be read. + /// + private StoredMessage? MaterializeMessage(ulong seq) { - _messages[message.Sequence] = message; - _lastSequenceBySubject[message.Subject] = message.Sequence; + if (!_meta.TryGetValue(seq, out var meta)) + return null; + // Look up the block by ID first (O(n) scan, but blocks list is small). + // Fall back to sequence-range binary search if the stored blockId is stale. + var block = _blocks.Find(b => b.BlockId == meta.BlockId) ?? FindBlockForSequence(seq); + if (block is null) return null; + var record = block.Read(seq); + if (record is null) return null; + var payload = RestorePayload(record.Payload.Span); + return new StoredMessage + { + Sequence = seq, + Subject = meta.Subject, + RawHeaders = record.Headers, + Payload = payload, + TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(meta.TimestampNs / 1_000_000).UtcDateTime, + }; + } + + private void TrackMessage(ulong sequence, string subject, int headerLength, int payloadLength, long timestampNs, int blockId) + { + _meta[sequence] = new MessageMeta + { + BlockId = blockId, + Subject = subject, + PayloadLength = payloadLength, + HeaderLength = headerLength, + TimestampNs = timestampNs, + }; + _lastSequenceBySubject[subject] = sequence; _messageCount++; - _totalBytes += (ulong)(message.RawHeaders.Length + message.Payload.Length); + _totalBytes += (ulong)(headerLength + payloadLength); if (_messageCount == 1) { - _first = message.Sequence; - _firstSeq = message.Sequence; + _first = sequence; + _firstSeq = sequence; } } private bool RemoveTrackedMessage(ulong sequence, bool preserveHighWaterMark) { - if (!_messages.Remove(sequence, out var message)) + if (!_meta.Remove(sequence, out var meta)) return false; _messageCount--; - _totalBytes -= (ulong)(message.RawHeaders.Length + message.Payload.Length); - UpdateLastSequenceForSubject(message.Subject, sequence); + _totalBytes -= (ulong)(meta.HeaderLength + meta.PayloadLength); + UpdateLastSequenceForSubject(meta.Subject, sequence); if (_messageCount == 0) { @@ -891,7 +949,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private void AdvanceFirstSequence(ulong start) { var candidate = start; - while (!_messages.ContainsKey(candidate) && candidate <= _last) + while (!_meta.ContainsKey(candidate) && candidate <= _last) candidate++; if (candidate <= _last) @@ -912,7 +970,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable for (var seq = startExclusive - 1; ; seq--) { - if (_messages.ContainsKey(seq)) + if (_meta.ContainsKey(seq)) return seq; if (seq == 0) @@ -927,7 +985,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable for (var seq = removedSequence - 1; ; seq--) { - if (_messages.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) + if (_meta.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) { _lastSequenceBySubject[subject] = seq; return; @@ -948,8 +1006,21 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _firstSeq = 0; _first = 0; - foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) - TrackMessage(message); + // Re-derive counters from the existing _meta entries without re-inserting. + foreach (var kv in _meta.OrderBy(kv => kv.Key)) + { + var seq = kv.Key; + var meta = kv.Value; + _lastSequenceBySubject[meta.Subject] = seq; + _messageCount++; + _totalBytes += (ulong)(meta.HeaderLength + meta.PayloadLength); + + if (_messageCount == 1) + { + _first = seq; + _firstSeq = seq; + } + } if (_messageCount == 0 && _last > 0) _first = _last + 1; @@ -991,9 +1062,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Cache miss: count matching messages and cache the result. ulong count = 0; - foreach (var msg in _messages.Values) + foreach (var meta in _meta.Values) { - if (SubjectMatchesFilter(msg.Subject, filter ?? string.Empty)) + if (SubjectMatchesFilter(meta.Subject, filter ?? string.Empty)) count++; } @@ -1142,13 +1213,21 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// private void RewriteBlocks() { + // Materialize all messages before disposing blocks (since materialization reads from blocks). + var messages = _meta.Keys + .OrderBy(seq => seq) + .Select(seq => MaterializeMessage(seq)) + .Where(m => m is not null) + .Cast() + .ToList(); + DisposeAllBlocks(); CleanBlockFiles(); - _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); - RebuildIndexesFromMessages(); + _meta.Clear(); + _last = messages.Count == 0 ? 0UL : messages[^1].Sequence; - foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) + foreach (var message in messages) { var persistedPayload = TransformForPersist(message.Payload.Span); var timestamp = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -1164,9 +1243,20 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp); } + _meta[message.Sequence] = new MessageMeta + { + BlockId = _activeBlock!.BlockId, + Subject = message.Subject, + PayloadLength = message.Payload.Length, + HeaderLength = message.RawHeaders.Length, + TimestampNs = timestamp, + }; + if (_activeBlock!.IsSealed) RotateBlock(); } + + RebuildIndexesFromMessages(); } // ------------------------------------------------------------------------- @@ -1225,8 +1315,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock = lastBlock; } - PruneExpired(DateTime.UtcNow); - // After recovery, sync _last from skip-sequence high-water marks. // SkipMsg/SkipMsgs write tombstone records with empty subject — these // intentionally advance _last without storing a live message. We must @@ -1255,7 +1343,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } } - // Sync _first from _messages; if empty, set to _last+1 (watermark). + // Prune expired messages AFTER establishing _last from blocks/skips. + // If all messages expire, PruneExpiredLinear resets _last to 0. + PruneExpired(DateTime.UtcNow); + + // Sync _first from _meta; if empty, set to _last+1 (watermark). RebuildIndexesFromMessages(); } @@ -1278,30 +1370,27 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (record is null) continue; // Deleted or not present. - // The payload stored in the block is the transformed (compressed/encrypted) payload. - // We need to reverse-transform it to get the original plaintext. + // Compute the original payload length for metadata tracking. // InvalidDataException (e.g., wrong key) propagates to the caller. var originalPayload = RestorePayload(record.Payload.Span); - var message = new StoredMessage + _meta[record.Sequence] = new MessageMeta { - Sequence = record.Sequence, + BlockId = block.BlockId, Subject = record.Subject, - RawHeaders = record.Headers, - Payload = originalPayload, - TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime, + PayloadLength = originalPayload.Length, + HeaderLength = record.Headers.Length, + TimestampNs = record.Timestamp, }; - _messages[message.Sequence] = message; - if (message.Sequence > _last) - _last = message.Sequence; + if (record.Sequence > _last) + _last = record.Sequence; // Go: re-register unexpired TTLs in the wheel after recovery. // Reference: golang/nats-server/server/filestore.go — recoverMsgs, TTL re-registration. if (_options.MaxAgeMs > 0) { - var msgTs = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; - RegisterTtl(message.Sequence, msgTs, (long)_options.MaxAgeMs * 1_000_000L); + RegisterTtl(record.Sequence, record.Timestamp, (long)_options.MaxAgeMs * 1_000_000L); } } } @@ -1366,22 +1455,40 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable return; } - // Add to the in-memory cache. + // Write all messages to fresh blocks, then populate _meta. foreach (var (seq, subject, payload, ts) in legacyMessages) { - _messages[seq] = new StoredMessage - { - Sequence = seq, - Subject = subject, - Payload = payload, - TimestampUtc = ts, - }; if (seq > _last) _last = seq; + + var persistedPayload = TransformForPersist(payload); + var timestampNs = new DateTimeOffset(ts).ToUnixTimeMilliseconds() * 1_000_000L; + + EnsureActiveBlock(); + try + { + _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, timestampNs); + } + catch (InvalidOperationException) + { + RotateBlock(); + _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, timestampNs); + } + + _meta[seq] = new MessageMeta + { + BlockId = _activeBlock!.BlockId, + Subject = subject, + PayloadLength = payload.Length, + HeaderLength = 0, + TimestampNs = timestampNs, + }; + + if (_activeBlock!.IsSealed) + RotateBlock(); } - // Write all messages to fresh blocks. - RewriteBlocks(); + RebuildIndexesFromMessages(); // Delete the legacy files. File.Delete(jsonlPath); @@ -1467,9 +1574,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (_options.MaxAgeMs <= 0) return; - var cutoff = nowUtc.AddMilliseconds(-_options.MaxAgeMs); - var expired = _messages - .Where(kv => kv.Value.TimestampUtc < cutoff) + var cutoffNs = new DateTimeOffset(nowUtc.AddMilliseconds(-_options.MaxAgeMs)).ToUnixTimeMilliseconds() * 1_000_000L; + var expired = _meta + .Where(kv => kv.Value.TimestampNs < cutoffNs) .Select(kv => kv.Key) .ToArray(); @@ -1479,8 +1586,20 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var sequence in expired) { RemoveTrackedMessage(sequence, preserveHighWaterMark: true); + DeleteInBlock(sequence); } - RewriteBlocks(); + + // When all messages are expired, reset sequence pointers to 0. + // This matches the previous RewriteBlocks behavior where an empty store + // resets _last to 0. + if (_meta.Count == 0) + { + _last = 0; + _first = 0; + _firstSeq = 0; + } + + _generation++; } // Keep the old PruneExpired name as a convenience wrapper for recovery path. @@ -1783,7 +1902,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // When seq is 0, auto-assign next sequence. var skipSeq = seq == 0 ? _last + 1 : seq; _last = skipSeq; - // Do NOT add to _messages — it is a skip (tombstone). + // Do NOT add to _meta — it is a skip (tombstone). // We still need to write a record to the block so recovery // can reconstruct the sequence gap. Use an empty subject sentinel. EnsureActiveBlock(); @@ -1802,7 +1921,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // After a skip, if there are no real messages, the next real first // would be skipSeq+1. Track this so FastState reports correctly. - if (_messages.Count == 0) + if (_meta.Count == 0) _first = skipSeq + 1; return skipSeq; @@ -1837,46 +1956,45 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// Loads a message by exact sequence number into the optional reusable container /// . Throws if not found. /// - /// Fast path: O(1) hash lookup in _messages (all live messages are cached). + /// Fast path: O(1) hash lookup in _meta (all live messages are cached). /// Fallback: binary-search _blocks by sequence range (O(log n) blocks) to /// locate the containing block, then read from disk. This covers cases where a - /// future memory-pressure eviction removes entries from _messages. + /// future memory-pressure eviction removes entries from _meta. /// Reference: golang/nats-server/server/filestore.go:8308 (LoadMsg). /// public StoreMsg LoadMsg(ulong seq, StoreMsg? sm) { - // Fast path: O(1) in-memory lookup. - if (_messages.TryGetValue(seq, out var stored)) + var stored = MaterializeMessage(seq); + if (stored is null) { + // Fallback: binary-search blocks by sequence range (O(log n)). + // Handles cases where _meta does not contain the sequence. + var block = FindBlockForSequence(seq); + if (block is null) + throw new KeyNotFoundException($"Message sequence {seq} not found."); + + var record = block.Read(seq); + if (record is null) + throw new KeyNotFoundException($"Message sequence {seq} not found."); + sm ??= new StoreMsg(); sm.Clear(); - sm.Subject = stored.Subject; - sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; - sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; - sm.Sequence = stored.Sequence; - sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + sm.Subject = record.Subject; + sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null; + var originalPayload = RestorePayload(record.Payload.Span); + sm.Data = originalPayload.Length > 0 ? originalPayload : null; + sm.Sequence = record.Sequence; + sm.Timestamp = record.Timestamp; return sm; } - // Fallback: binary-search blocks by sequence range (O(log n)). - // Handles cases where _messages does not contain the sequence, e.g. future - // memory-pressure eviction. FindBlockForSequence avoids an O(n) block scan. - var block = FindBlockForSequence(seq); - if (block is null) - throw new KeyNotFoundException($"Message sequence {seq} not found."); - - var record = block.Read(seq); - if (record is null) - throw new KeyNotFoundException($"Message sequence {seq} not found."); - sm ??= new StoreMsg(); sm.Clear(); - sm.Subject = record.Subject; - sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null; - var originalPayload = RestorePayload(record.Payload.Span); - sm.Data = originalPayload.Length > 0 ? originalPayload : null; - sm.Sequence = record.Sequence; - sm.Timestamp = record.Timestamp; + sm.Subject = stored.Subject; + sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; + sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; + sm.Sequence = stored.Sequence; + sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; return sm; } @@ -1917,14 +2035,21 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public StoreMsg LoadLastMsg(string subject, StoreMsg? sm) { - var match = _messages.Values - .Where(m => string.IsNullOrEmpty(subject) - || SubjectMatchesFilter(m.Subject, subject)) - .MaxBy(m => m.Sequence); + ulong? bestSeq = null; + foreach (var kv in _meta) + { + if (!string.IsNullOrEmpty(subject) && !SubjectMatchesFilter(kv.Value.Subject, subject)) + continue; + if (bestSeq is null || kv.Key > bestSeq.Value) + bestSeq = kv.Key; + } - if (match is null) + if (bestSeq is null) throw new KeyNotFoundException($"No message found for subject '{subject}'."); + var match = MaterializeMessage(bestSeq.Value) + ?? throw new KeyNotFoundException($"No message found for subject '{subject}'."); + sm ??= new StoreMsg(); sm.Clear(); sm.Subject = match.Subject; @@ -1943,26 +2068,31 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public (StoreMsg Msg, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? sm) { - var match = _messages - .Where(kv => kv.Key >= start) - .Where(kv => string.IsNullOrEmpty(filter) || SubjectMatchesFilter(kv.Value.Subject, filter)) - .OrderBy(kv => kv.Key) - .Cast?>() - .FirstOrDefault(); + ulong? bestSeq = null; + foreach (var kv in _meta) + { + if (kv.Key < start) continue; + if (!string.IsNullOrEmpty(filter) && !SubjectMatchesFilter(kv.Value.Subject, filter)) + continue; + if (bestSeq is null || kv.Key < bestSeq.Value) + bestSeq = kv.Key; + } - if (match is null) + if (bestSeq is null) throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'."); - var found = match.Value; - var skip = found.Key > start ? found.Key - start : 0UL; + var found = MaterializeMessage(bestSeq.Value) + ?? throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'."); + + var skip = bestSeq.Value > start ? bestSeq.Value - start : 0UL; sm ??= new StoreMsg(); sm.Clear(); - sm.Subject = found.Value.Subject; - sm.Header = found.Value.RawHeaders.Length > 0 ? found.Value.RawHeaders.ToArray() : null; - sm.Data = found.Value.Payload.Length > 0 ? found.Value.Payload.ToArray() : null; - sm.Sequence = found.Key; - sm.Timestamp = new DateTimeOffset(found.Value.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + sm.Subject = found.Subject; + sm.Header = found.RawHeaders.Length > 0 ? found.RawHeaders.ToArray() : null; + sm.Data = found.Payload.Length > 0 ? found.Payload.ToArray() : null; + sm.Sequence = found.Sequence; + sm.Timestamp = new DateTimeOffset(found.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; return (sm, skip); } @@ -1974,7 +2104,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ulong[] AllLastSeqs() { var lastPerSubject = new Dictionary(StringComparer.Ordinal); - foreach (var kv in _messages) + foreach (var kv in _meta) { var subj = kv.Value.Subject; if (!lastPerSubject.TryGetValue(subj, out var existing) || kv.Key > existing) @@ -1996,7 +2126,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var lastPerSubject = new Dictionary(StringComparer.Ordinal); - foreach (var kv in _messages) + foreach (var kv in _meta) { var seq = kv.Key; if (maxSeq > 0 && seq > maxSeq) @@ -2027,9 +2157,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public string SubjectForSeq(ulong seq) { - if (!_messages.TryGetValue(seq, out var stored)) + if (!_meta.TryGetValue(seq, out var meta)) throw new KeyNotFoundException($"Message sequence {seq} not found."); - return stored.Subject; + return meta.Subject; } /// @@ -2041,7 +2171,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public (ulong Total, ulong ValidThrough) NumPending(ulong sseq, string filter, bool lastPerSubject) { - var candidates = _messages + var candidates = _meta .Where(kv => kv.Key >= sseq) .Where(kv => string.IsNullOrEmpty(filter) || SubjectMatchesFilter(kv.Value.Subject, filter)) .ToList(); @@ -2087,22 +2217,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var headers = hdr is { Length: > 0 } ? hdr : []; var payload = msg ?? []; var persistedPayload = TransformForPersist(payload); - // Recover UTC DateTime from caller-supplied Unix nanosecond timestamp. - var storedUtc = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime; - - var stored = new StoredMessage - { - Sequence = seq, - Subject = subject, - RawHeaders = headers, - Payload = payload, - TimestampUtc = storedUtc, - }; - TrackMessage(stored); - _generation++; - - // Go: update _last to the high-water mark — do not decrement. - _last = Math.Max(_last, seq); // Register TTL using the caller-supplied timestamp and TTL. var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L); @@ -2119,6 +2233,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts); } + TrackMessage(seq, subject, headers.Length, payload.Length, ts, _activeBlock!.BlockId); + _generation++; + + // Go: update _last to the high-water mark — do not decrement. + _last = Math.Max(_last, seq); + // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. _writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length); @@ -2137,20 +2257,24 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (start == 0) throw new KeyNotFoundException("No message found before seq 0."); - var first = _messages.Count > 0 ? _messages.Keys.Min() : 1UL; + var first = _meta.Count > 0 ? _meta.Keys.Min() : 1UL; for (var seq = start - 1; seq >= first && seq <= _last; seq--) { - if (_messages.TryGetValue(seq, out var stored)) + if (_meta.ContainsKey(seq)) { - sm ??= new StoreMsg(); - sm.Clear(); - sm.Subject = stored.Subject; - sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; - sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; - sm.Sequence = stored.Sequence; - sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; - return sm; + var stored = MaterializeMessage(seq); + if (stored is not null) + { + sm ??= new StoreMsg(); + sm.Clear(); + sm.Subject = stored.Subject; + sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; + sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; + sm.Sequence = stored.Sequence; + sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + return sm; + } } // Prevent underflow on ulong subtraction. @@ -2215,7 +2339,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Enforce per-subject limits immediately after config change. if (_options.MaxMsgsPerSubject > 0) { - var subjects = _messages.Values.Select(m => m.Subject).Distinct(StringComparer.Ordinal).ToList(); + var subjects = _meta.Values.Select(m => m.Subject).Distinct(StringComparer.Ordinal).ToList(); foreach (var subject in subjects) EnforceMaxMsgsPerSubject(subject); } @@ -2232,7 +2356,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (limit <= 0) return; - var subjectMsgs = _messages + var subjectMsgs = _meta .Where(kv => string.Equals(kv.Value.Subject, subject, StringComparison.Ordinal)) .OrderBy(kv => kv.Key) .ToList(); @@ -2240,16 +2364,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable while (subjectMsgs.Count > limit) { var oldest = subjectMsgs[0]; - _totalBytes -= (ulong)oldest.Value.Payload.Length; + _totalBytes -= (ulong)(oldest.Value.HeaderLength + oldest.Value.PayloadLength); _messageCount--; - _messages.Remove(oldest.Key); + _meta.Remove(oldest.Key); DeleteInBlock(oldest.Key); subjectMsgs.RemoveAt(0); _generation++; } - if (_messages.Count > 0) - _firstSeq = _messages.Keys.Min(); + if (_meta.Count > 0) + _firstSeq = _meta.Keys.Min(); } /// diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs index 06361c0..ef30d7d 100644 --- a/src/NATS.Server/JetStream/Storage/MsgBlock.cs +++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs @@ -315,6 +315,10 @@ public sealed class MsgBlock : IDisposable _index[sequence] = (offset, written); + // If this sequence was previously soft-deleted, clear the deletion marker + // so that subsequent Read calls return the new record rather than null. + _deleted.Remove(sequence); + // Go: cache populated lazily on read, not eagerly on write. // Reads that miss _cache flush pending buf to disk and decode from there. From 82ab02a612d64a7b45862b305b683a6b5e21e877 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 15:35:59 -0400 Subject: [PATCH 3/3] docs: refresh benchmark comparison after JS async publish optimization --- benchmarks_comparison.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/benchmarks_comparison.md b/benchmarks_comparison.md index 424fec5..d9b0795 100644 --- a/benchmarks_comparison.md +++ b/benchmarks_comparison.md @@ -1,8 +1,8 @@ # Go vs .NET NATS Server — Benchmark Comparison -Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly. +Benchmark run: 2026-03-13 America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests -c Release --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly. -**Environment:** Apple M4, .NET SDK 10.0.101, benchmark README command run in the benchmark project's default `Debug` configuration, Go toolchain installed, Go reference server built from `golang/nats-server/`. +**Environment:** Apple M4, .NET SDK 10.0.101, Release build, Go toolchain installed, Go reference server built from `golang/nats-server/`. --- --- @@ -58,9 +58,9 @@ Benchmark run: 2026-03-13 12:08 PM America/Indiana/Indianapolis. Both servers ra | Mode | Payload | Storage | Go msg/s | .NET msg/s | Ratio (.NET/Go) | |------|---------|---------|----------|------------|-----------------| | Synchronous | 16 B | Memory | 14,812 | 12,134 | 0.82x | -| Async (batch) | 128 B | File | 148,156 | 57,479 | 0.39x | +| Async (batch) | 128 B | File | 174,705 | 52,350 | 0.30x | -> **Note:** Async file-store publish remains well below parity at 0.39x, but it is still materially better than the older 0.30x snapshot that motivated this FileStore round. +> **Note:** Async file-store publish improved ~10% (47K→52K) after hot-path optimizations: cached state properties, single stream lookup, _messageIndexes removal, hand-rolled pub-ack formatter, exponential flush backoff, lazy StoredMessage materialization. Still storage-bound at 0.30x Go. ---