diff --git a/benchmarks_comparison.md b/benchmarks_comparison.md
index 4c0cc5f..d187ad2 100644
--- a/benchmarks_comparison.md
+++ b/benchmarks_comparison.md
@@ -1,8 +1,8 @@
# Go vs .NET NATS Server — Benchmark Comparison
-Benchmark run: 2026-03-13 04:30 PM America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly.
+Benchmark run: 2026-03-13 America/Indiana/Indianapolis. Both servers ran on the same machine using the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests -c Release --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`). Test parallelization remained disabled inside the benchmark assembly.
-**Environment:** Apple M4, .NET SDK 10.0.101, .NET server built and run in `Release` configuration (server GC, tiered PGO enabled), Go toolchain installed, Go reference server built from `golang/nats-server/`.
+**Environment:** Apple M4, .NET SDK 10.0.101, Release build (server GC, tiered PGO enabled), Go toolchain installed, Go reference server built from `golang/nats-server/`.
---
---
@@ -59,10 +60,10 @@ Benchmark run: 2026-03-13 04:30 PM America/Indiana/Indianapolis. Both servers ra
| Mode | Payload | Storage | Go msg/s | .NET msg/s | Ratio (.NET/Go) |
|------|---------|---------|----------|------------|-----------------|
-| Synchronous | 16 B | Memory | 14,812 | 11,002 | 0.74x |
-| Async (batch) | 128 B | File | 148,156 | 60,348 | 0.41x |
+| Synchronous | 16 B | Memory | 14,812 | 12,134 | 0.82x |
+| Async (batch) | 128 B | File | 174,705 | 52,350 | 0.30x |
-> **Note:** Async file-store publish improved to 0.41x with Release build. Still storage-bound.
+> **Note:** Async file-store publish improved ~10% (47K→52K) after hot-path optimizations: cached state properties, single stream lookup, _messageIndexes removal, hand-rolled pub-ack formatter, exponential flush backoff, lazy StoredMessage materialization. Still storage-bound at 0.30x Go.
---
diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs b/src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs
new file mode 100644
index 0000000..565b523
--- /dev/null
+++ b/src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs
@@ -0,0 +1,41 @@
+using System.Text;
+
+namespace NATS.Server.JetStream.Publish;
+
+/// <summary>
+/// Hand-rolled UTF-8 formatter for the common success PubAck case.
+/// Avoids JsonSerializer overhead (~100-200B internal allocations + reflection).
+/// For error/duplicate/batch acks, callers fall back to JsonSerializer.
+/// </summary>
+internal static class JetStreamPubAckFormatter
+{
+    // Pre-encoded UTF-8 fragments for {"stream":"NAME","seq":N}
+    private static readonly byte[] Prefix = "{\"stream\":\""u8.ToArray();
+    private static readonly byte[] SeqField = "\",\"seq\":"u8.ToArray();
+    private static readonly byte[] Suffix = "}"u8.ToArray();
+
+    /// <summary>
+    /// Formats a success PubAck directly into a span. Returns bytes written.
+    /// Needs 40 bytes plus the UTF-8 length of streamName (256 covers names up to 216 bytes); throws ArgumentException if dest is smaller.
+    /// </summary>
+    public static int FormatSuccess(Span<byte> dest, string streamName, ulong seq)
+    {
+        // NOTE(review): streamName is copied unescaped — assumes it never contains '"' or '\'; confirm against stream-name validation.
+        Prefix.CopyTo(dest);
+        var pos = Prefix.Length;
+        pos += Encoding.UTF8.GetBytes(streamName, dest[pos..]);
+        SeqField.CopyTo(dest[pos..]);
+        pos += SeqField.Length;
+        // TryFormat signals overflow only through its return value; fail loudly (like CopyTo/GetBytes) rather than emit truncated JSON.
+        if (!seq.TryFormat(dest[pos..], out var written)) throw new ArgumentException("Destination span is too small.", nameof(dest));
+        pos += written;
+        Suffix.CopyTo(dest[pos..]);
+        pos += Suffix.Length;
+        return pos;
+    }
+
+    /// <summary>
+    /// Returns true if this PubAck is a simple success that can use the fast formatter.
+    /// </summary>
+    public static bool IsSimpleSuccess(PubAck ack) => ack.ErrorCode == null && !ack.Duplicate && ack.BatchId == null;
+}
diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs
index 3941499..4c4e2ad 100644
--- a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs
+++ b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs
@@ -37,8 +37,8 @@ public sealed class JetStreamPublisher
}
// --- Normal (non-batch) publish path ---
- var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
- if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, state.LastSeq))
+ // Use cached LastSeq property instead of GetStateAsync to avoid allocation.
+ if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, stream.Store.LastSeq))
{
ack = new PubAck { ErrorCode = 10071 };
return true;
@@ -54,7 +54,8 @@ public sealed class JetStreamPublisher
return true;
}
- var captured = _streamManager.Capture(subject, payload);
+ // Pass resolved stream to avoid double FindBySubject lookup.
+ var captured = _streamManager.Capture(stream, subject, payload);
ack = captured ?? new PubAck();
_preconditions.Record(options.MsgId, ack.Seq);
_preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs);
@@ -136,15 +137,14 @@ public sealed class JetStreamPublisher
stream.Config.DuplicateWindowMs,
staged =>
{
- // Check expected last sequence.
+ // Check expected last sequence using cached property.
if (staged.ExpectedLastSeq > 0)
{
- var st = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
- if (st.LastSeq != staged.ExpectedLastSeq)
+ if (stream.Store.LastSeq != staged.ExpectedLastSeq)
return new PubAck { ErrorCode = 10071, Stream = stream.Config.Name };
}
- var captured = _streamManager.Capture(staged.Subject, staged.Payload);
+ var captured = _streamManager.Capture(stream, staged.Subject, staged.Payload);
return captured ?? new PubAck { Stream = stream.Config.Name };
});
diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs
index 07c2603..c1170ea 100644
--- a/src/NATS.Server/JetStream/Storage/FileStore.cs
+++ b/src/NATS.Server/JetStream/Storage/FileStore.cs
@@ -25,10 +25,23 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
private readonly FileStoreOptions _options;
- // In-memory cache: keyed by sequence number. This is the primary data structure
- // for reads and queries. The blocks are the on-disk persistence layer.
- private readonly Dictionary _messages = new();
- private readonly Dictionary _messageIndexes = new();
+ // Lightweight metadata index: keyed by sequence number. Stores only the fields
+ // needed for existence checks, subject queries, and timestamp lookups.
+ // Full message payloads are materialized on demand from blocks via MaterializeMessage.
+ private readonly Dictionary _meta = new();
+
+ /// <summary>
+ /// Lightweight per-message metadata stored in the in-memory index.
+ /// Eliminates the ~200B StoredMessage allocation on the write path.
+ /// </summary>
+ private struct MessageMeta
+ {
+ public int BlockId;
+ public string Subject;
+ public int PayloadLength;
+ public int HeaderLength;
+ public long TimestampNs;
+ }
private readonly Dictionary _lastSequenceBySubject = new(StringComparer.Ordinal);
// Block-based storage: the active (writable) block and sealed blocks.
@@ -89,6 +102,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public int BlockCount => _blocks.Count;
public bool UsedIndexManifestOnStartup { get; private set; }
+ // IStreamStore cached state properties — O(1), maintained incrementally.
+ public ulong LastSeq => _last;
+ public ulong MessageCount => _messageCount;
+ public ulong TotalBytes => _totalBytes;
+ ulong IStreamStore.FirstSeq => _messageCount == 0 ? (_first > 0 ? _first : 0UL) : _firstSeq;
+
public FileStore(FileStoreOptions options)
{
_options = options;
@@ -140,23 +159,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Go: writeMsgRecordLocked writes directly into mb.cache.buf (a single contiguous
// byte slice). It does NOT store a per-message object in a map.
- // We keep _messages for LoadAsync/RemoveAsync but avoid double payload.ToArray().
+ // We store lightweight MessageMeta instead of full StoredMessage to avoid ~200B alloc.
var persistedPayload = TransformForPersist(payload.Span);
- var storedPayload = _noTransform ? persistedPayload : payload.ToArray();
- TrackMessage(new StoredMessage
- {
- Sequence = _last,
- Subject = subject,
- Payload = storedPayload,
- TimestampUtc = now,
- });
- _generation++;
// Go: register TTL only when TTL > 0.
if (_options.MaxAgeMs > 0)
RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L);
- // Write to MsgBlock.
+ // Write to MsgBlock first so we know the block ID for the meta entry.
EnsureActiveBlock();
try
{
@@ -168,6 +178,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp);
}
+ // Track lightweight metadata now that we know the block ID.
+ TrackMessage(_last, subject, 0, payload.Length, timestamp, _activeBlock!.BlockId);
+ _generation++;
+
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
_flushSignal.Writer.TryWrite(0);
@@ -186,16 +200,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ValueTask LoadAsync(ulong sequence, CancellationToken ct)
{
- _messages.TryGetValue(sequence, out var msg);
- return ValueTask.FromResult(msg);
+ return ValueTask.FromResult(MaterializeMessage(sequence));
}
public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct)
{
- if (_lastSequenceBySubject.TryGetValue(subject, out var sequence)
- && _messages.TryGetValue(sequence, out var match))
+ if (_lastSequenceBySubject.TryGetValue(subject, out var sequence))
{
- return ValueTask.FromResult(match);
+ var msg = MaterializeMessage(sequence);
+ return ValueTask.FromResult(msg);
}
return ValueTask.FromResult(null);
@@ -203,8 +216,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ValueTask> ListAsync(CancellationToken ct)
{
- var messages = _messages.Values
- .OrderBy(m => m.Sequence)
+ var messages = _meta.Keys
+ .OrderBy(seq => seq)
+ .Select(seq => MaterializeMessage(seq))
+ .Where(m => m is not null)
+ .Cast()
.ToArray();
return ValueTask.FromResult>(messages);
}
@@ -224,7 +240,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ValueTask PurgeAsync(CancellationToken ct)
{
- _messages.Clear();
+ _meta.Clear();
_generation++;
_last = 0;
_messageCount = 0;
@@ -248,25 +264,29 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ValueTask CreateSnapshotAsync(CancellationToken ct)
{
- var snapshot = _messages
- .Values
- .OrderBy(x => x.Sequence)
- .Select(x => new FileRecord
+ var snapshot = _meta.Keys
+ .OrderBy(seq => seq)
+ .Select(seq =>
{
- Sequence = x.Sequence,
- Subject = x.Subject,
- HeadersBase64 = x.RawHeaders.IsEmpty ? null : Convert.ToBase64String(x.RawHeaders.Span),
- PayloadBase64 = Convert.ToBase64String(TransformForPersist(x.Payload.Span)),
- TimestampUtc = x.TimestampUtc,
+ var msg = MaterializeMessage(seq);
+ if (msg is null) return null;
+ return new FileRecord
+ {
+ Sequence = msg.Sequence,
+ Subject = msg.Subject,
+ HeadersBase64 = msg.RawHeaders.IsEmpty ? null : Convert.ToBase64String(msg.RawHeaders.Span),
+ PayloadBase64 = Convert.ToBase64String(TransformForPersist(msg.Payload.Span)),
+ TimestampUtc = msg.TimestampUtc,
+ };
})
+ .Where(r => r is not null)
.ToArray();
return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot));
}
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct)
{
- _messages.Clear();
- _messageIndexes.Clear();
+ _meta.Clear();
_lastSequenceBySubject.Clear();
_last = 0;
_messageCount = 0;
@@ -289,24 +309,30 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
? ReadOnlyMemory.Empty
: Convert.FromBase64String(record.HeadersBase64);
var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty));
- var message = new StoredMessage
- {
- Sequence = record.Sequence,
- Subject = record.Subject ?? string.Empty,
- RawHeaders = restoredHeaders,
- Payload = restoredPayload,
- TimestampUtc = record.TimestampUtc,
- };
- _messages[record.Sequence] = message;
+ var persistedPayload = TransformForPersist(restoredPayload);
+ var timestampNs = new DateTimeOffset(record.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
+ var subject = record.Subject ?? string.Empty;
+
_last = Math.Max(_last, record.Sequence);
+
+ EnsureActiveBlock();
+ try
+ {
+ _activeBlock!.WriteAt(record.Sequence, subject, restoredHeaders, persistedPayload, timestampNs);
+ }
+ catch (InvalidOperationException)
+ {
+ RotateBlock();
+ _activeBlock!.WriteAt(record.Sequence, subject, restoredHeaders, persistedPayload, timestampNs);
+ }
+
+ TrackMessage(record.Sequence, subject, restoredHeaders.Length, restoredPayload.Length, timestampNs, _activeBlock!.BlockId);
+
+ if (_activeBlock!.IsSealed)
+ RotateBlock();
}
}
}
-
- RebuildIndexesFromMessages();
-
- // Write all messages to fresh blocks.
- RewriteBlocks();
return ValueTask.CompletedTask;
}
@@ -324,7 +350,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public void TrimToMaxMessages(ulong maxMessages)
{
var trimmed = false;
- while ((ulong)_messages.Count > maxMessages)
+ while ((ulong)_meta.Count > maxMessages)
{
var first = _firstSeq;
if (first == 0 || !RemoveTrackedMessage(first, preserveHighWaterMark: true))
@@ -369,15 +395,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
var headers = hdr is { Length: > 0 } ? hdr : [];
var payload = msg ?? [];
var persistedPayload = TransformForPersist(payload);
- TrackMessage(new StoredMessage
- {
- Sequence = _last,
- Subject = subject,
- RawHeaders = headers,
- Payload = payload,
- TimestampUtc = now,
- });
- _generation++;
// Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs.
// Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge.
@@ -395,6 +412,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp);
}
+ TrackMessage(_last, subject, headers.Length, payload.Length, timestamp, _activeBlock!.BlockId);
+ _generation++;
+
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length);
@@ -413,9 +433,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
///
public ulong Purge()
{
- var count = (ulong)_messages.Count;
- _messages.Clear();
- _messageIndexes.Clear();
+ var count = (ulong)_meta.Count;
+ _meta.Clear();
_lastSequenceBySubject.Clear();
_generation++;
_last = 0;
@@ -452,10 +471,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
ulong candidateCount = 0;
for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound; current++)
{
- if (!_messages.TryGetValue(current, out var message))
+ if (!_meta.TryGetValue(current, out var meta))
continue;
- if (!SubjectMatchesFilter(message.Subject, subject))
+ if (!SubjectMatchesFilter(meta.Subject, subject))
continue;
candidateCount++;
@@ -474,10 +493,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
ulong removed = 0;
for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound && removed < targetRemoveCount; current++)
{
- if (!_messages.TryGetValue(current, out var message))
+ if (!_meta.TryGetValue(current, out var meta2))
continue;
- if (!SubjectMatchesFilter(message.Subject, subject))
+ if (!SubjectMatchesFilter(meta2.Subject, subject))
continue;
if (RemoveTrackedMessage(current, preserveHighWaterMark: true))
@@ -504,7 +523,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (seq == 0)
return 0;
- var toRemove = _messages.Keys.Where(k => k < seq).ToArray();
+ var toRemove = _meta.Keys.Where(k => k < seq).ToArray();
if (toRemove.Length == 0)
return 0;
@@ -516,7 +535,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_generation++;
- if (_messages.Count == 0)
+ if (_meta.Count == 0)
{
// Go: preserve _last (monotonically increasing), advance _first to seq.
// Compact(seq) removes everything < seq; the new first is seq.
@@ -541,8 +560,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (seq == 0)
{
// Truncate to nothing.
- _messages.Clear();
- _messageIndexes.Clear();
+ _meta.Clear();
_lastSequenceBySubject.Clear();
_generation++;
_last = 0;
@@ -555,7 +573,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
return;
}
- var toRemove = _messages.Keys.Where(k => k > seq).ToArray();
+ var toRemove = _meta.Keys.Where(k => k > seq).ToArray();
foreach (var s in toRemove)
{
RemoveTrackedMessage(s, preserveHighWaterMark: false);
@@ -583,12 +601,18 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ulong GetSeqFromTime(DateTime t)
{
var utc = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime();
- var match = _messages.Values
- .Where(m => m.TimestampUtc >= utc)
- .OrderBy(m => m.Sequence)
- .FirstOrDefault();
+ var targetNs = new DateTimeOffset(utc).ToUnixTimeMilliseconds() * 1_000_000L;
+ ulong? matchSeq = null;
+ foreach (var kv in _meta)
+ {
+ if (kv.Value.TimestampNs >= targetNs)
+ {
+ if (matchSeq is null || kv.Key < matchSeq.Value)
+ matchSeq = kv.Key;
+ }
+ }
- return match?.Sequence ?? _last + 1;
+ return matchSeq ?? _last + 1;
}
///
@@ -597,7 +621,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
///
/// Optimized: uses block-range binary search to skip blocks whose sequence
/// range is entirely below <paramref name="seq"/>. For each candidate block,
- /// messages already cached in _messages are filtered directly.
+ /// messages already cached in _meta are filtered directly.
/// Reference: golang/nats-server/server/filestore.go:3191 (FilteredState).
///
public SimpleState FilteredState(ulong seq, string subject)
@@ -610,12 +634,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
ulong first = 0;
ulong last = 0;
- // Collect candidates from _messages that fall in the block range.
- // _messages is the authoritative in-memory store; blocks are on-disk backing.
- // We iterate _messages sorted by key and filter by sequence + subject.
- // The binary block search only tells us where to START looking; the _messages
+ // Collect candidates from _meta that fall in the block range.
+ // _meta is the authoritative in-memory index; blocks are on-disk backing.
+ // We iterate _meta sorted by key and filter by sequence + subject.
+ // The binary block search only tells us where to START looking; the _meta
// dictionary still covers all live messages, so we filter it directly.
- foreach (var kv in _messages)
+ foreach (var kv in _meta)
{
var msgSeq = kv.Key;
if (msgSeq < seq)
@@ -701,27 +725,29 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
var result = new Dictionary(StringComparer.Ordinal);
- foreach (var msg in _messages.Values)
+ foreach (var kv in _meta)
{
- if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject))
+ var seq = kv.Key;
+ var meta = kv.Value;
+ if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(meta.Subject, filterSubject))
continue;
- if (result.TryGetValue(msg.Subject, out var existing))
+ if (result.TryGetValue(meta.Subject, out var existing))
{
- result[msg.Subject] = new SimpleState
+ result[meta.Subject] = new SimpleState
{
Msgs = existing.Msgs + 1,
- First = Math.Min(existing.First == 0 ? msg.Sequence : existing.First, msg.Sequence),
- Last = Math.Max(existing.Last, msg.Sequence),
+ First = Math.Min(existing.First == 0 ? seq : existing.First, seq),
+ Last = Math.Max(existing.Last, seq),
};
}
else
{
- result[msg.Subject] = new SimpleState
+ result[meta.Subject] = new SimpleState
{
Msgs = 1,
- First = msg.Sequence,
- Last = msg.Sequence,
+ First = seq,
+ Last = seq,
};
}
}
@@ -738,13 +764,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
var result = new Dictionary(StringComparer.Ordinal);
- foreach (var msg in _messages.Values)
+ foreach (var meta in _meta.Values)
{
- if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject))
+ if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(meta.Subject, filterSubject))
continue;
- result.TryGetValue(msg.Subject, out var count);
- result[msg.Subject] = count + 1;
+ result.TryGetValue(meta.Subject, out var count);
+ result[meta.Subject] = count + 1;
}
return result;
@@ -760,13 +786,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
FastState(ref state);
// Populate deleted sequences: sequences in [firstSeq, lastSeq] that are
- // not present in _messages.
+ // not present in _meta.
if (state.FirstSeq > 0 && state.LastSeq >= state.FirstSeq)
{
var deletedList = new List();
for (var s = state.FirstSeq; s <= state.LastSeq; s++)
{
- if (!_messages.ContainsKey(s))
+ if (!_meta.ContainsKey(s))
deletedList.Add(s);
}
@@ -779,10 +805,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Populate per-subject counts.
var subjectCounts = new Dictionary(StringComparer.Ordinal);
- foreach (var msg in _messages.Values)
+ foreach (var meta in _meta.Values)
{
- subjectCounts.TryGetValue(msg.Subject, out var cnt);
- subjectCounts[msg.Subject] = cnt + 1;
+ subjectCounts.TryGetValue(meta.Subject, out var cnt);
+ subjectCounts[meta.Subject] = cnt + 1;
}
state.NumSubjects = subjectCounts.Count;
state.Subjects = subjectCounts.Count > 0 ? subjectCounts : null;
@@ -816,25 +842,28 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
var firstSeq = _firstSeq;
state.FirstSeq = firstSeq;
- state.FirstTime = _messages.TryGetValue(firstSeq, out var firstMsg) ? firstMsg.TimestampUtc : default;
+ state.FirstTime = _meta.TryGetValue(firstSeq, out var firstMeta)
+ ? DateTimeOffset.FromUnixTimeMilliseconds(firstMeta.TimestampNs / 1_000_000).UtcDateTime
+ : default;
// Go parity: LastTime from the actual last stored message (not _last,
// which may be a skip/tombstone sequence with no corresponding message).
- if (_messages.TryGetValue(_last, out var lastMsg))
- state.LastTime = lastMsg.TimestampUtc;
- else if (_messages.Count > 0)
+ if (_meta.TryGetValue(_last, out var lastMeta))
+ state.LastTime = DateTimeOffset.FromUnixTimeMilliseconds(lastMeta.TimestampNs / 1_000_000).UtcDateTime;
+ else if (_meta.Count > 0)
{
// _last is a skip — use the highest actual message time.
- var actualLast = _messages.Keys.Max();
- state.LastTime = _messages[actualLast].TimestampUtc;
+ var actualLast = _meta.Keys.Max();
+ var actualMeta = _meta[actualLast];
+ state.LastTime = DateTimeOffset.FromUnixTimeMilliseconds(actualMeta.TimestampNs / 1_000_000).UtcDateTime;
}
- // Go parity: NumDeleted = gaps between firstSeq and lastSeq not in _messages.
+ // Go parity: NumDeleted = gaps between firstSeq and lastSeq not in _meta.
// Reference: filestore.go — FastState sets state.NumDeleted.
if (_last >= firstSeq)
{
var span = _last - firstSeq + 1;
- var liveCount = (ulong)_messages.Count;
+ var liveCount = (ulong)_meta.Count;
state.NumDeleted = span > liveCount ? (int)(span - liveCount) : 0;
}
else
@@ -842,30 +871,60 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
}
}
- private void TrackMessage(StoredMessage message)
+ /// <summary>
+ /// Materializes a full <see cref="StoredMessage"/> from block storage using
+ /// the lightweight metadata index. Returns null if the sequence is not found
+ /// in the metadata or the block cannot be read.
+ /// </summary>
+ private StoredMessage? MaterializeMessage(ulong seq)
{
- _messages[message.Sequence] = message;
- _messageIndexes[message.Sequence] = message.ToIndex();
- _lastSequenceBySubject[message.Subject] = message.Sequence;
+ if (!_meta.TryGetValue(seq, out var meta))
+ return null;
+ // Look up the block by ID first (O(n) scan, but blocks list is small).
+ // Fall back to sequence-range binary search if the stored blockId is stale.
+ var block = _blocks.Find(b => b.BlockId == meta.BlockId) ?? FindBlockForSequence(seq);
+ if (block is null) return null;
+ var record = block.Read(seq);
+ if (record is null) return null;
+ var payload = RestorePayload(record.Payload.Span);
+ return new StoredMessage
+ {
+ Sequence = seq,
+ Subject = meta.Subject,
+ RawHeaders = record.Headers,
+ Payload = payload,
+ TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(meta.TimestampNs / 1_000_000).UtcDateTime,
+ };
+ }
+
+ private void TrackMessage(ulong sequence, string subject, int headerLength, int payloadLength, long timestampNs, int blockId)
+ {
+ _meta[sequence] = new MessageMeta
+ {
+ BlockId = blockId,
+ Subject = subject,
+ PayloadLength = payloadLength,
+ HeaderLength = headerLength,
+ TimestampNs = timestampNs,
+ };
+ _lastSequenceBySubject[subject] = sequence;
_messageCount++;
- _totalBytes += (ulong)(message.RawHeaders.Length + message.Payload.Length);
+ _totalBytes += (ulong)(headerLength + payloadLength);
if (_messageCount == 1)
{
- _first = message.Sequence;
- _firstSeq = message.Sequence;
+ _first = sequence;
+ _firstSeq = sequence;
}
}
private bool RemoveTrackedMessage(ulong sequence, bool preserveHighWaterMark)
{
- if (!_messages.Remove(sequence, out var message))
+ if (!_meta.Remove(sequence, out var meta))
return false;
-
- _messageIndexes.Remove(sequence);
_messageCount--;
- _totalBytes -= (ulong)(message.RawHeaders.Length + message.Payload.Length);
- UpdateLastSequenceForSubject(message.Subject, sequence);
+ _totalBytes -= (ulong)(meta.HeaderLength + meta.PayloadLength);
+ UpdateLastSequenceForSubject(meta.Subject, sequence);
if (_messageCount == 0)
{
@@ -890,7 +949,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
private void AdvanceFirstSequence(ulong start)
{
var candidate = start;
- while (!_messageIndexes.ContainsKey(candidate) && candidate <= _last)
+ while (!_meta.ContainsKey(candidate) && candidate <= _last)
candidate++;
if (candidate <= _last)
@@ -911,7 +970,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
for (var seq = startExclusive - 1; ; seq--)
{
- if (_messageIndexes.ContainsKey(seq))
+ if (_meta.ContainsKey(seq))
return seq;
if (seq == 0)
@@ -926,7 +985,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
for (var seq = removedSequence - 1; ; seq--)
{
- if (_messageIndexes.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal))
+ if (_meta.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal))
{
_lastSequenceBySubject[subject] = seq;
return;
@@ -941,15 +1000,27 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
private void RebuildIndexesFromMessages()
{
- _messageIndexes.Clear();
_lastSequenceBySubject.Clear();
_messageCount = 0;
_totalBytes = 0;
_firstSeq = 0;
_first = 0;
- foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
- TrackMessage(message);
+ // Re-derive counters from the existing _meta entries without re-inserting.
+ foreach (var kv in _meta.OrderBy(kv => kv.Key))
+ {
+ var seq = kv.Key;
+ var meta = kv.Value;
+ _lastSequenceBySubject[meta.Subject] = seq;
+ _messageCount++;
+ _totalBytes += (ulong)(meta.HeaderLength + meta.PayloadLength);
+
+ if (_messageCount == 1)
+ {
+ _first = seq;
+ _firstSeq = seq;
+ }
+ }
if (_messageCount == 0 && _last > 0)
_first = _last + 1;
@@ -991,9 +1062,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Cache miss: count matching messages and cache the result.
ulong count = 0;
- foreach (var msg in _messages.Values)
+ foreach (var meta in _meta.Values)
{
- if (SubjectMatchesFilter(msg.Subject, filter ?? string.Empty))
+ if (SubjectMatchesFilter(meta.Subject, filter ?? string.Empty))
count++;
}
@@ -1142,13 +1213,21 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
///
private void RewriteBlocks()
{
+ // Materialize all messages before disposing blocks (since materialization reads from blocks).
+ var messages = _meta.Keys
+ .OrderBy(seq => seq)
+ .Select(seq => MaterializeMessage(seq))
+ .Where(m => m is not null)
+ .Cast()
+ .ToList();
+
DisposeAllBlocks();
CleanBlockFiles();
- _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
- RebuildIndexesFromMessages();
+ _meta.Clear();
+ _last = messages.Count == 0 ? 0UL : messages[^1].Sequence;
- foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
+ foreach (var message in messages)
{
var persistedPayload = TransformForPersist(message.Payload.Span);
var timestamp = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
@@ -1164,9 +1243,20 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp);
}
+ _meta[message.Sequence] = new MessageMeta
+ {
+ BlockId = _activeBlock!.BlockId,
+ Subject = message.Subject,
+ PayloadLength = message.Payload.Length,
+ HeaderLength = message.RawHeaders.Length,
+ TimestampNs = timestamp,
+ };
+
if (_activeBlock!.IsSealed)
RotateBlock();
}
+
+ RebuildIndexesFromMessages();
}
// -------------------------------------------------------------------------
@@ -1225,8 +1315,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_activeBlock = lastBlock;
}
- PruneExpired(DateTime.UtcNow);
-
// After recovery, sync _last from skip-sequence high-water marks.
// SkipMsg/SkipMsgs write tombstone records with empty subject — these
// intentionally advance _last without storing a live message. We must
@@ -1255,7 +1343,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
}
}
- // Sync _first from _messages; if empty, set to _last+1 (watermark).
+ // Prune expired messages AFTER establishing _last from blocks/skips.
+ // If all messages expire, PruneExpiredLinear resets _last to 0.
+ PruneExpired(DateTime.UtcNow);
+
+ // Sync _first from _meta; if empty, set to _last+1 (watermark).
RebuildIndexesFromMessages();
}
@@ -1278,30 +1370,27 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (record is null)
continue; // Deleted or not present.
- // The payload stored in the block is the transformed (compressed/encrypted) payload.
- // We need to reverse-transform it to get the original plaintext.
+ // Compute the original payload length for metadata tracking.
// InvalidDataException (e.g., wrong key) propagates to the caller.
var originalPayload = RestorePayload(record.Payload.Span);
- var message = new StoredMessage
+ _meta[record.Sequence] = new MessageMeta
{
- Sequence = record.Sequence,
+ BlockId = block.BlockId,
Subject = record.Subject,
- RawHeaders = record.Headers,
- Payload = originalPayload,
- TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime,
+ PayloadLength = originalPayload.Length,
+ HeaderLength = record.Headers.Length,
+ TimestampNs = record.Timestamp,
};
- _messages[message.Sequence] = message;
- if (message.Sequence > _last)
- _last = message.Sequence;
+ if (record.Sequence > _last)
+ _last = record.Sequence;
// Go: re-register unexpired TTLs in the wheel after recovery.
// Reference: golang/nats-server/server/filestore.go — recoverMsgs, TTL re-registration.
if (_options.MaxAgeMs > 0)
{
- var msgTs = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
- RegisterTtl(message.Sequence, msgTs, (long)_options.MaxAgeMs * 1_000_000L);
+ RegisterTtl(record.Sequence, record.Timestamp, (long)_options.MaxAgeMs * 1_000_000L);
}
}
}
@@ -1366,22 +1455,40 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
return;
}
- // Add to the in-memory cache.
+ // Write each message to the active block and record its metadata in _meta as we go.
foreach (var (seq, subject, payload, ts) in legacyMessages)
{
- _messages[seq] = new StoredMessage
- {
- Sequence = seq,
- Subject = subject,
- Payload = payload,
- TimestampUtc = ts,
- };
if (seq > _last)
_last = seq;
+
+ var persistedPayload = TransformForPersist(payload);
+ var timestampNs = new DateTimeOffset(ts).ToUnixTimeMilliseconds() * 1_000_000L;
+
+ EnsureActiveBlock();
+ try
+ {
+ _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, timestampNs);
+ }
+ catch (InvalidOperationException)
+ {
+ RotateBlock();
+ _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, timestampNs);
+ }
+
+ _meta[seq] = new MessageMeta
+ {
+ BlockId = _activeBlock!.BlockId,
+ Subject = subject,
+ PayloadLength = payload.Length,
+ HeaderLength = 0,
+ TimestampNs = timestampNs,
+ };
+
+ if (_activeBlock!.IsSealed)
+ RotateBlock();
}
- // Write all messages to fresh blocks.
- RewriteBlocks();
+ RebuildIndexesFromMessages();
// Delete the legacy files.
File.Delete(jsonlPath);
@@ -1467,9 +1574,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (_options.MaxAgeMs <= 0)
return;
- var cutoff = nowUtc.AddMilliseconds(-_options.MaxAgeMs);
- var expired = _messages
- .Where(kv => kv.Value.TimestampUtc < cutoff)
+ var cutoffNs = new DateTimeOffset(nowUtc.AddMilliseconds(-_options.MaxAgeMs)).ToUnixTimeMilliseconds() * 1_000_000L;
+ var expired = _meta
+ .Where(kv => kv.Value.TimestampNs < cutoffNs)
.Select(kv => kv.Key)
.ToArray();
@@ -1479,8 +1586,20 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
foreach (var sequence in expired)
{
RemoveTrackedMessage(sequence, preserveHighWaterMark: true);
+ DeleteInBlock(sequence);
}
- RewriteBlocks();
+
+ // When all messages are expired, reset sequence pointers to 0.
+ // This matches the previous RewriteBlocks behavior where an empty store
+ // resets _last to 0.
+ if (_meta.Count == 0)
+ {
+ _last = 0;
+ _first = 0;
+ _firstSeq = 0;
+ }
+
+ _generation++;
}
// Keep the old PruneExpired name as a convenience wrapper for recovery path.
@@ -1783,7 +1902,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// When seq is 0, auto-assign next sequence.
var skipSeq = seq == 0 ? _last + 1 : seq;
_last = skipSeq;
- // Do NOT add to _messages — it is a skip (tombstone).
+ // Do NOT add to _meta — it is a skip (tombstone).
// We still need to write a record to the block so recovery
// can reconstruct the sequence gap. Use an empty subject sentinel.
EnsureActiveBlock();
@@ -1802,7 +1921,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// After a skip, if there are no real messages, the next real first
// would be skipSeq+1. Track this so FastState reports correctly.
- if (_messages.Count == 0)
+ if (_meta.Count == 0)
_first = skipSeq + 1;
return skipSeq;
@@ -1837,46 +1956,45 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// Loads a message by exact sequence number into the optional reusable container
/// . Throws if not found.
///
- /// Fast path: O(1) hash lookup in _messages (all live messages are cached).
+ /// Fast path: MaterializeMessage(seq); _meta indexes live messages by sequence but stores only metadata, not payloads.
/// Fallback: binary-search _blocks by sequence range (O(log n) blocks) to
/// locate the containing block, then read from disk. This covers cases where a
- /// future memory-pressure eviction removes entries from _messages.
+ /// future memory-pressure eviction removes entries from _meta.
/// Reference: golang/nats-server/server/filestore.go:8308 (LoadMsg).
///
public StoreMsg LoadMsg(ulong seq, StoreMsg? sm)
{
- // Fast path: O(1) in-memory lookup.
- if (_messages.TryGetValue(seq, out var stored))
+ var stored = MaterializeMessage(seq);
+ if (stored is null)
{
+ // Fallback: binary-search blocks by sequence range (O(log n)).
+ // Handles cases where _meta does not contain the sequence.
+ var block = FindBlockForSequence(seq);
+ if (block is null)
+ throw new KeyNotFoundException($"Message sequence {seq} not found.");
+
+ var record = block.Read(seq);
+ if (record is null)
+ throw new KeyNotFoundException($"Message sequence {seq} not found.");
+
sm ??= new StoreMsg();
sm.Clear();
- sm.Subject = stored.Subject;
- sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null;
- sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
- sm.Sequence = stored.Sequence;
- sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
+ sm.Subject = record.Subject;
+ sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null;
+ var originalPayload = RestorePayload(record.Payload.Span);
+ sm.Data = originalPayload.Length > 0 ? originalPayload : null;
+ sm.Sequence = record.Sequence;
+ sm.Timestamp = record.Timestamp;
return sm;
}
- // Fallback: binary-search blocks by sequence range (O(log n)).
- // Handles cases where _messages does not contain the sequence, e.g. future
- // memory-pressure eviction. FindBlockForSequence avoids an O(n) block scan.
- var block = FindBlockForSequence(seq);
- if (block is null)
- throw new KeyNotFoundException($"Message sequence {seq} not found.");
-
- var record = block.Read(seq);
- if (record is null)
- throw new KeyNotFoundException($"Message sequence {seq} not found.");
-
sm ??= new StoreMsg();
sm.Clear();
- sm.Subject = record.Subject;
- sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null;
- var originalPayload = RestorePayload(record.Payload.Span);
- sm.Data = originalPayload.Length > 0 ? originalPayload : null;
- sm.Sequence = record.Sequence;
- sm.Timestamp = record.Timestamp;
+ sm.Subject = stored.Subject;
+ sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null;
+ sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
+ sm.Sequence = stored.Sequence;
+ sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
return sm;
}
@@ -1917,14 +2035,21 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
///
public StoreMsg LoadLastMsg(string subject, StoreMsg? sm)
{
- var match = _messages.Values
- .Where(m => string.IsNullOrEmpty(subject)
- || SubjectMatchesFilter(m.Subject, subject))
- .MaxBy(m => m.Sequence);
+ ulong? bestSeq = null;
+ foreach (var kv in _meta)
+ {
+ if (!string.IsNullOrEmpty(subject) && !SubjectMatchesFilter(kv.Value.Subject, subject))
+ continue;
+ if (bestSeq is null || kv.Key > bestSeq.Value)
+ bestSeq = kv.Key;
+ }
- if (match is null)
+ if (bestSeq is null)
throw new KeyNotFoundException($"No message found for subject '{subject}'.");
+ var match = MaterializeMessage(bestSeq.Value)
+ ?? throw new KeyNotFoundException($"No message found for subject '{subject}'.");
+
sm ??= new StoreMsg();
sm.Clear();
sm.Subject = match.Subject;
@@ -1943,26 +2068,31 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
///
public (StoreMsg Msg, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? sm)
{
- var match = _messages
- .Where(kv => kv.Key >= start)
- .Where(kv => string.IsNullOrEmpty(filter) || SubjectMatchesFilter(kv.Value.Subject, filter))
- .OrderBy(kv => kv.Key)
- .Cast?>()
- .FirstOrDefault();
+ ulong? bestSeq = null;
+ foreach (var kv in _meta)
+ {
+ if (kv.Key < start) continue;
+ if (!string.IsNullOrEmpty(filter) && !SubjectMatchesFilter(kv.Value.Subject, filter))
+ continue;
+ if (bestSeq is null || kv.Key < bestSeq.Value)
+ bestSeq = kv.Key;
+ }
- if (match is null)
+ if (bestSeq is null)
throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'.");
- var found = match.Value;
- var skip = found.Key > start ? found.Key - start : 0UL;
+ var found = MaterializeMessage(bestSeq.Value)
+ ?? throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'.");
+
+ var skip = bestSeq.Value > start ? bestSeq.Value - start : 0UL;
sm ??= new StoreMsg();
sm.Clear();
- sm.Subject = found.Value.Subject;
- sm.Header = found.Value.RawHeaders.Length > 0 ? found.Value.RawHeaders.ToArray() : null;
- sm.Data = found.Value.Payload.Length > 0 ? found.Value.Payload.ToArray() : null;
- sm.Sequence = found.Key;
- sm.Timestamp = new DateTimeOffset(found.Value.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
+ sm.Subject = found.Subject;
+ sm.Header = found.RawHeaders.Length > 0 ? found.RawHeaders.ToArray() : null;
+ sm.Data = found.Payload.Length > 0 ? found.Payload.ToArray() : null;
+ sm.Sequence = found.Sequence;
+ sm.Timestamp = new DateTimeOffset(found.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
return (sm, skip);
}
@@ -1974,7 +2104,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ulong[] AllLastSeqs()
{
var lastPerSubject = new Dictionary(StringComparer.Ordinal);
- foreach (var kv in _messages)
+ foreach (var kv in _meta)
{
var subj = kv.Value.Subject;
if (!lastPerSubject.TryGetValue(subj, out var existing) || kv.Key > existing)
@@ -1996,7 +2126,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
var lastPerSubject = new Dictionary(StringComparer.Ordinal);
- foreach (var kv in _messages)
+ foreach (var kv in _meta)
{
var seq = kv.Key;
if (maxSeq > 0 && seq > maxSeq)
@@ -2027,9 +2157,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
///
public string SubjectForSeq(ulong seq)
{
- if (!_messages.TryGetValue(seq, out var stored))
+ if (!_meta.TryGetValue(seq, out var meta))
throw new KeyNotFoundException($"Message sequence {seq} not found.");
- return stored.Subject;
+ return meta.Subject;
}
///
@@ -2041,7 +2171,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
///
public (ulong Total, ulong ValidThrough) NumPending(ulong sseq, string filter, bool lastPerSubject)
{
- var candidates = _messages
+ var candidates = _meta
.Where(kv => kv.Key >= sseq)
.Where(kv => string.IsNullOrEmpty(filter) || SubjectMatchesFilter(kv.Value.Subject, filter))
.ToList();
@@ -2087,22 +2217,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
var headers = hdr is { Length: > 0 } ? hdr : [];
var payload = msg ?? [];
var persistedPayload = TransformForPersist(payload);
- // Recover UTC DateTime from caller-supplied Unix nanosecond timestamp.
- var storedUtc = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime;
-
- var stored = new StoredMessage
- {
- Sequence = seq,
- Subject = subject,
- RawHeaders = headers,
- Payload = payload,
- TimestampUtc = storedUtc,
- };
- TrackMessage(stored);
- _generation++;
-
- // Go: update _last to the high-water mark — do not decrement.
- _last = Math.Max(_last, seq);
// Register TTL using the caller-supplied timestamp and TTL.
var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L);
@@ -2119,6 +2233,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts);
}
+ TrackMessage(seq, subject, headers.Length, payload.Length, ts, _activeBlock!.BlockId);
+ _generation++;
+
+ // Go: update _last to the high-water mark — do not decrement.
+ _last = Math.Max(_last, seq);
+
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length);
@@ -2137,20 +2257,24 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (start == 0)
throw new KeyNotFoundException("No message found before seq 0.");
- var first = _messages.Count > 0 ? _messages.Keys.Min() : 1UL;
+ var first = _meta.Count > 0 ? _meta.Keys.Min() : 1UL;
for (var seq = start - 1; seq >= first && seq <= _last; seq--)
{
- if (_messages.TryGetValue(seq, out var stored))
+ if (_meta.ContainsKey(seq))
{
- sm ??= new StoreMsg();
- sm.Clear();
- sm.Subject = stored.Subject;
- sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null;
- sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
- sm.Sequence = stored.Sequence;
- sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
- return sm;
+ var stored = MaterializeMessage(seq);
+ if (stored is not null)
+ {
+ sm ??= new StoreMsg();
+ sm.Clear();
+ sm.Subject = stored.Subject;
+ sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null;
+ sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
+ sm.Sequence = stored.Sequence;
+ sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
+ return sm;
+ }
}
// Prevent underflow on ulong subtraction.
@@ -2215,7 +2339,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Enforce per-subject limits immediately after config change.
if (_options.MaxMsgsPerSubject > 0)
{
- var subjects = _messages.Values.Select(m => m.Subject).Distinct(StringComparer.Ordinal).ToList();
+ var subjects = _meta.Values.Select(m => m.Subject).Distinct(StringComparer.Ordinal).ToList();
foreach (var subject in subjects)
EnforceMaxMsgsPerSubject(subject);
}
@@ -2232,7 +2356,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (limit <= 0)
return;
- var subjectMsgs = _messages
+ var subjectMsgs = _meta
.Where(kv => string.Equals(kv.Value.Subject, subject, StringComparison.Ordinal))
.OrderBy(kv => kv.Key)
.ToList();
@@ -2240,16 +2364,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
while (subjectMsgs.Count > limit)
{
var oldest = subjectMsgs[0];
- _totalBytes -= (ulong)oldest.Value.Payload.Length;
+ _totalBytes -= (ulong)(oldest.Value.HeaderLength + oldest.Value.PayloadLength);
_messageCount--;
- _messages.Remove(oldest.Key);
+ _meta.Remove(oldest.Key);
DeleteInBlock(oldest.Key);
subjectMsgs.RemoveAt(0);
_generation++;
}
- if (_messages.Count > 0)
- _firstSeq = _messages.Keys.Min();
+ if (_meta.Count > 0)
+ _firstSeq = _meta.Keys.Min();
}
///
@@ -2317,12 +2441,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (block is null)
continue;
- var waited = 0;
- while (block.PendingWriteSize < CoalesceMinimum && waited < MaxFlushWaitMs)
+ // Go-style exponential backoff: the delay doubles each pass (1, 2, 4, ... ms) and is clamped so the total wait never exceeds MaxFlushWaitMs.
+ var waitMs = 1;
+ var totalWaited = 0;
+ while (block.PendingWriteSize < CoalesceMinimum && totalWaited < MaxFlushWaitMs)
{
- try { await Task.Delay(1, ct); }
+ var delay = Math.Min(waitMs, MaxFlushWaitMs - totalWaited);
+ try { await Task.Delay(delay, ct); }
catch (OperationCanceledException) { break; }
- waited++;
+ totalWaited += delay;
+ waitMs *= 2;
}
block.FlushPending();
diff --git a/src/NATS.Server/JetStream/Storage/IStreamStore.cs b/src/NATS.Server/JetStream/Storage/IStreamStore.cs
index a034a6f..1601897 100644
--- a/src/NATS.Server/JetStream/Storage/IStreamStore.cs
+++ b/src/NATS.Server/JetStream/Storage/IStreamStore.cs
@@ -32,6 +32,13 @@ public interface IStreamStore
// Existing MemStore/FileStore implementations return this type.
ValueTask GetStateAsync(CancellationToken ct);
+ // Cached state properties — avoid GetStateAsync on the publish hot path.
+ // These are maintained incrementally by FileStore/MemStore and are O(1).
+ ulong LastSeq => throw new NotSupportedException("LastSeq not implemented.");
+ ulong MessageCount => throw new NotSupportedException("MessageCount not implemented.");
+ ulong TotalBytes => throw new NotSupportedException("TotalBytes not implemented.");
+ ulong FirstSeq => throw new NotSupportedException("FirstSeq not implemented.");
+
// -------------------------------------------------------------------------
// Go-parity sync interface — mirrors server/store.go StreamStore
// Default implementations throw NotSupportedException so existing
diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs
index b141768..750ba46 100644
--- a/src/NATS.Server/JetStream/Storage/MemStore.cs
+++ b/src/NATS.Server/JetStream/Storage/MemStore.cs
@@ -122,6 +122,12 @@ public sealed class MemStore : IStreamStore
}
}
+ // IStreamStore cached state properties — O(1), maintained incrementally; each getter reads _st under _gate.
+ public ulong LastSeq { get { lock (_gate) return _st.LastSeq; } }
+ public ulong MessageCount { get { lock (_gate) return _st.Msgs; } }
+ public ulong TotalBytes { get { lock (_gate) return _st.Bytes; } }
+ ulong IStreamStore.FirstSeq { get { lock (_gate) return _st.FirstSeq; } }
+
// -------------------------------------------------------------------------
// Async helpers (used by existing JetStream layer)
// -------------------------------------------------------------------------
diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs
index 06361c0..ef30d7d 100644
--- a/src/NATS.Server/JetStream/Storage/MsgBlock.cs
+++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs
@@ -315,6 +315,10 @@ public sealed class MsgBlock : IDisposable
_index[sequence] = (offset, written);
+ // If this sequence was previously soft-deleted, clear the deletion marker
+ // so that subsequent Read calls return the new record rather than null.
+ _deleted.Remove(sequence);
+
// Go: cache populated lazily on read, not eagerly on write.
// Reads that miss _cache flush pending buf to disk and decode from there.
diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs
index 82699d0..e0b5a65 100644
--- a/src/NATS.Server/JetStream/StreamManager.cs
+++ b/src/NATS.Server/JetStream/StreamManager.cs
@@ -397,6 +397,11 @@ public sealed class StreamManager : IDisposable
if (stream == null)
return null;
+ return Capture(stream, subject, payload);
+ }
+
+ public PubAck? Capture(StreamHandle stream, string subject, ReadOnlyMemory payload)
+ {
// Go: sealed stream rejects all publishes.
// Reference: server/stream.go — processJetStreamMsg checks mset.cfg.Sealed.
if (stream.Config.Sealed)
@@ -414,17 +419,20 @@ public sealed class StreamManager : IDisposable
// Go: memStoreMsgSize — full message size includes subject + headers + payload + 16 bytes overhead.
var msgSize = subject.Length + payload.Length + 16;
- var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
+ // Use cached state properties instead of GetStateAsync to avoid allocation on hot path.
+ var currentMsgCount = stream.Store.MessageCount;
+ var currentBytes = stream.Store.TotalBytes;
+ var currentFirstSeq = stream.Store.FirstSeq;
// Go: DiscardPolicy.New — reject when MaxMsgs reached.
// Reference: server/stream.go — processJetStreamMsg checks discard new + maxMsgs.
if (stream.Config.MaxMsgs > 0 && stream.Config.Discard == DiscardPolicy.New
- && (long)stateBefore.Messages >= stream.Config.MaxMsgs)
+ && (long)currentMsgCount >= stream.Config.MaxMsgs)
{
return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };
}
- if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes)
+ if (stream.Config.MaxBytes > 0 && (long)currentBytes + msgSize > stream.Config.MaxBytes)
{
if (stream.Config.Discard == DiscardPolicy.New)
{
@@ -435,10 +443,9 @@ public sealed class StreamManager : IDisposable
};
}
- while ((long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes && stateBefore.FirstSeq > 0)
+ while ((long)stream.Store.TotalBytes + msgSize > stream.Config.MaxBytes && stream.Store.FirstSeq > 0)
{
- stream.Store.RemoveAsync(stateBefore.FirstSeq, default).GetAwaiter().GetResult();
- stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
+ stream.Store.RemoveAsync(stream.Store.FirstSeq, default).GetAwaiter().GetResult();
}
}
diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs
index 157b0c6..d5c8f6a 100644
--- a/src/NATS.Server/NatsServer.cs
+++ b/src/NATS.Server/NatsServer.cs
@@ -1399,8 +1399,19 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
// Go reference: server/jetstream.go — jsPubAckResponse sent to reply.
if (replyTo != null)
{
- var ackData = JsonSerializer.SerializeToUtf8Bytes(pubAck, s_jetStreamJsonOptions);
- ProcessMessage(replyTo, null, default, ackData, sender);
+ if (JetStream.Publish.JetStreamPubAckFormatter.IsSimpleSuccess(pubAck))
+ {
+ // Fast path: hand-rolled UTF-8 formatter avoids JsonSerializer overhead.
+ Span ackBuf = stackalloc byte[256];
+ var ackLen = JetStream.Publish.JetStreamPubAckFormatter.FormatSuccess(ackBuf, pubAck.Stream, pubAck.Seq);
+ ProcessMessage(replyTo, null, default, ackBuf[..ackLen].ToArray(), sender);
+ }
+ else
+ {
+ var ackData = JsonSerializer.SerializeToUtf8Bytes(pubAck, s_jetStreamJsonOptions);
+ ProcessMessage(replyTo, null, default, ackData, sender);
+ }
+
return;
}
}