diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 373bc9d..c1170ea 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -25,11 +25,23 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { private readonly FileStoreOptions _options; - // In-memory cache: keyed by sequence number. This is the primary data structure - // for reads and queries. The blocks are the on-disk persistence layer. - private readonly Dictionary _messages = new(); - // _messageIndexes removed — all lookups now use _messages directly to avoid - // per-message StoredMessageIndex allocation on the write path. + // Lightweight metadata index: keyed by sequence number. Stores only the fields + // needed for existence checks, subject queries, and timestamp lookups. + // Full message payloads are materialized on demand from blocks via MaterializeMessage. + private readonly Dictionary _meta = new(); + + /// + /// Lightweight per-message metadata stored in the in-memory index. + /// Eliminates the ~200B StoredMessage allocation on the write path. + /// + private struct MessageMeta + { + public int BlockId; + public string Subject; + public int PayloadLength; + public int HeaderLength; + public long TimestampNs; + } private readonly Dictionary _lastSequenceBySubject = new(StringComparer.Ordinal); // Block-based storage: the active (writable) block and sealed blocks. @@ -147,23 +159,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Go: writeMsgRecordLocked writes directly into mb.cache.buf (a single contiguous // byte slice). It does NOT store a per-message object in a map. - // We keep _messages for LoadAsync/RemoveAsync but avoid double payload.ToArray(). + // We store lightweight MessageMeta instead of full StoredMessage to avoid ~200B alloc. var persistedPayload = TransformForPersist(payload.Span); - var storedPayload = _noTransform ? persistedPayload : payload.ToArray(); - TrackMessage(new StoredMessage - { - Sequence = _last, - Subject = subject, - Payload = storedPayload, - TimestampUtc = now, - }); - _generation++; // Go: register TTL only when TTL > 0. if (_options.MaxAgeMs > 0) RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L); - // Write to MsgBlock. + // Write to MsgBlock first so we know the block ID for the meta entry. EnsureActiveBlock(); try { @@ -175,6 +178,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); } + // Track lightweight metadata now that we know the block ID. + TrackMessage(_last, subject, 0, payload.Length, timestamp, _activeBlock!.BlockId); + _generation++; + _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); _flushSignal.Writer.TryWrite(0); @@ -193,16 +200,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask LoadAsync(ulong sequence, CancellationToken ct) { - _messages.TryGetValue(sequence, out var msg); - return ValueTask.FromResult(msg); + return ValueTask.FromResult(MaterializeMessage(sequence)); } public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) { - if (_lastSequenceBySubject.TryGetValue(subject, out var sequence) - && _messages.TryGetValue(sequence, out var match)) + if (_lastSequenceBySubject.TryGetValue(subject, out var sequence)) { - return ValueTask.FromResult(match); + var msg = MaterializeMessage(sequence); + return ValueTask.FromResult(msg); } return ValueTask.FromResult(null); @@ -210,8 +216,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask> ListAsync(CancellationToken ct) { - var messages = _messages.Values - .OrderBy(m => m.Sequence) + var messages = _meta.Keys + .OrderBy(seq => seq) + .Select(seq => MaterializeMessage(seq)) + .Where(m => m is not null) + .Cast() .ToArray(); return ValueTask.FromResult>(messages); } @@ -231,7 +240,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask PurgeAsync(CancellationToken ct) { - _messages.Clear(); + _meta.Clear(); _generation++; _last = 0; _messageCount = 0; @@ -255,24 +264,29 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask CreateSnapshotAsync(CancellationToken ct) { - var snapshot = _messages - .Values - .OrderBy(x => x.Sequence) - .Select(x => new FileRecord + var snapshot = _meta.Keys + .OrderBy(seq => seq) + .Select(seq => { - Sequence = x.Sequence, - Subject = x.Subject, - HeadersBase64 = x.RawHeaders.IsEmpty ? null : Convert.ToBase64String(x.RawHeaders.Span), - PayloadBase64 = Convert.ToBase64String(TransformForPersist(x.Payload.Span)), - TimestampUtc = x.TimestampUtc, + var msg = MaterializeMessage(seq); + if (msg is null) return null; + return new FileRecord + { + Sequence = msg.Sequence, + Subject = msg.Subject, + HeadersBase64 = msg.RawHeaders.IsEmpty ? null : Convert.ToBase64String(msg.RawHeaders.Span), + PayloadBase64 = Convert.ToBase64String(TransformForPersist(msg.Payload.Span)), + TimestampUtc = msg.TimestampUtc, + }; }) + .Where(r => r is not null) .ToArray(); return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot)); } public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) { - _messages.Clear(); + _meta.Clear(); _lastSequenceBySubject.Clear(); _last = 0; _messageCount = 0; @@ -295,24 +309,30 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable ? ReadOnlyMemory.Empty : Convert.FromBase64String(record.HeadersBase64); var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)); - var message = new StoredMessage - { - Sequence = record.Sequence, - Subject = record.Subject ?? string.Empty, - RawHeaders = restoredHeaders, - Payload = restoredPayload, - TimestampUtc = record.TimestampUtc, - }; - _messages[record.Sequence] = message; + var persistedPayload = TransformForPersist(restoredPayload); + var timestampNs = new DateTimeOffset(record.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + var subject = record.Subject ?? string.Empty; + _last = Math.Max(_last, record.Sequence); + + EnsureActiveBlock(); + try + { + _activeBlock!.WriteAt(record.Sequence, subject, restoredHeaders, persistedPayload, timestampNs); + } + catch (InvalidOperationException) + { + RotateBlock(); + _activeBlock!.WriteAt(record.Sequence, subject, restoredHeaders, persistedPayload, timestampNs); + } + + TrackMessage(record.Sequence, subject, restoredHeaders.Length, restoredPayload.Length, timestampNs, _activeBlock!.BlockId); + + if (_activeBlock!.IsSealed) + RotateBlock(); } } } - - RebuildIndexesFromMessages(); - - // Write all messages to fresh blocks. - RewriteBlocks(); return ValueTask.CompletedTask; } @@ -330,7 +350,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public void TrimToMaxMessages(ulong maxMessages) { var trimmed = false; - while ((ulong)_messages.Count > maxMessages) + while ((ulong)_meta.Count > maxMessages) { var first = _firstSeq; if (first == 0 || !RemoveTrackedMessage(first, preserveHighWaterMark: true)) @@ -375,15 +395,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var headers = hdr is { Length: > 0 } ? hdr : []; var payload = msg ?? []; var persistedPayload = TransformForPersist(payload); - TrackMessage(new StoredMessage - { - Sequence = _last, - Subject = subject, - RawHeaders = headers, - Payload = payload, - TimestampUtc = now, - }); - _generation++; // Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs. // Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge. @@ -401,6 +412,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp); } + TrackMessage(_last, subject, headers.Length, payload.Length, timestamp, _activeBlock!.BlockId); + _generation++; + // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. _writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length); @@ -419,8 +433,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public ulong Purge() { - var count = (ulong)_messages.Count; - _messages.Clear(); + var count = (ulong)_meta.Count; + _meta.Clear(); _lastSequenceBySubject.Clear(); _generation++; _last = 0; @@ -457,10 +471,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable ulong candidateCount = 0; for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound; current++) { - if (!_messages.TryGetValue(current, out var message)) + if (!_meta.TryGetValue(current, out var meta)) continue; - if (!SubjectMatchesFilter(message.Subject, subject)) + if (!SubjectMatchesFilter(meta.Subject, subject)) continue; candidateCount++; @@ -479,10 +493,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable ulong removed = 0; for (var current = _messageCount == 0 ? 0UL : _first; current != 0 && current <= upperBound && removed < targetRemoveCount; current++) { - if (!_messages.TryGetValue(current, out var message)) + if (!_meta.TryGetValue(current, out var meta2)) continue; - if (!SubjectMatchesFilter(message.Subject, subject)) + if (!SubjectMatchesFilter(meta2.Subject, subject)) continue; if (RemoveTrackedMessage(current, preserveHighWaterMark: true)) @@ -509,7 +523,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (seq == 0) return 0; - var toRemove = _messages.Keys.Where(k => k < seq).ToArray(); + var toRemove = _meta.Keys.Where(k => k < seq).ToArray(); if (toRemove.Length == 0) return 0; @@ -521,7 +535,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _generation++; - if (_messages.Count == 0) + if (_meta.Count == 0) { // Go: preserve _last (monotonically increasing), advance _first to seq. // Compact(seq) removes everything < seq; the new first is seq. @@ -546,7 +560,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (seq == 0) { // Truncate to nothing. - _messages.Clear(); + _meta.Clear(); _lastSequenceBySubject.Clear(); _generation++; _last = 0; @@ -559,7 +573,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable return; } - var toRemove = _messages.Keys.Where(k => k > seq).ToArray(); + var toRemove = _meta.Keys.Where(k => k > seq).ToArray(); foreach (var s in toRemove) { RemoveTrackedMessage(s, preserveHighWaterMark: false); @@ -587,12 +601,18 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ulong GetSeqFromTime(DateTime t) { var utc = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime(); - var match = _messages.Values - .Where(m => m.TimestampUtc >= utc) - .OrderBy(m => m.Sequence) - .FirstOrDefault(); + var targetNs = new DateTimeOffset(utc).ToUnixTimeMilliseconds() * 1_000_000L; + ulong? matchSeq = null; + foreach (var kv in _meta) + { + if (kv.Value.TimestampNs >= targetNs) + { + if (matchSeq is null || kv.Key < matchSeq.Value) + matchSeq = kv.Key; + } + } - return match?.Sequence ?? _last + 1; + return matchSeq ?? _last + 1; } /// @@ -601,7 +621,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// /// Optimized: uses block-range binary search to skip blocks whose sequence /// range is entirely below . For each candidate block, - /// messages already cached in _messages are filtered directly. + /// messages already cached in _meta are filtered directly. /// Reference: golang/nats-server/server/filestore.go:3191 (FilteredState). /// public SimpleState FilteredState(ulong seq, string subject) @@ -614,12 +634,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable ulong first = 0; ulong last = 0; - // Collect candidates from _messages that fall in the block range. - // _messages is the authoritative in-memory store; blocks are on-disk backing. - // We iterate _messages sorted by key and filter by sequence + subject. - // The binary block search only tells us where to START looking; the _messages + // Collect candidates from _meta that fall in the block range. + // _meta is the authoritative in-memory index; blocks are on-disk backing. + // We iterate _meta sorted by key and filter by sequence + subject. + // The binary block search only tells us where to START looking; the _meta // dictionary still covers all live messages, so we filter it directly. - foreach (var kv in _messages) + foreach (var kv in _meta) { var msgSeq = kv.Key; if (msgSeq < seq) @@ -705,27 +725,29 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var result = new Dictionary(StringComparer.Ordinal); - foreach (var msg in _messages.Values) + foreach (var kv in _meta) { - if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject)) + var seq = kv.Key; + var meta = kv.Value; + if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(meta.Subject, filterSubject)) continue; - if (result.TryGetValue(msg.Subject, out var existing)) + if (result.TryGetValue(meta.Subject, out var existing)) { - result[msg.Subject] = new SimpleState + result[meta.Subject] = new SimpleState { Msgs = existing.Msgs + 1, - First = Math.Min(existing.First == 0 ? msg.Sequence : existing.First, msg.Sequence), - Last = Math.Max(existing.Last, msg.Sequence), + First = Math.Min(existing.First == 0 ? seq : existing.First, seq), + Last = Math.Max(existing.Last, seq), }; } else { - result[msg.Subject] = new SimpleState + result[meta.Subject] = new SimpleState { Msgs = 1, - First = msg.Sequence, - Last = msg.Sequence, + First = seq, + Last = seq, }; } } @@ -742,13 +764,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var result = new Dictionary(StringComparer.Ordinal); - foreach (var msg in _messages.Values) + foreach (var meta in _meta.Values) { - if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(msg.Subject, filterSubject)) + if (!string.IsNullOrEmpty(filterSubject) && !SubjectMatchesFilter(meta.Subject, filterSubject)) continue; - result.TryGetValue(msg.Subject, out var count); - result[msg.Subject] = count + 1; + result.TryGetValue(meta.Subject, out var count); + result[meta.Subject] = count + 1; } return result; @@ -764,13 +786,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable FastState(ref state); // Populate deleted sequences: sequences in [firstSeq, lastSeq] that are - // not present in _messages. + // not present in _meta. if (state.FirstSeq > 0 && state.LastSeq >= state.FirstSeq) { var deletedList = new List(); for (var s = state.FirstSeq; s <= state.LastSeq; s++) { - if (!_messages.ContainsKey(s)) + if (!_meta.ContainsKey(s)) deletedList.Add(s); } @@ -783,10 +805,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Populate per-subject counts. var subjectCounts = new Dictionary(StringComparer.Ordinal); - foreach (var msg in _messages.Values) + foreach (var meta in _meta.Values) { - subjectCounts.TryGetValue(msg.Subject, out var cnt); - subjectCounts[msg.Subject] = cnt + 1; + subjectCounts.TryGetValue(meta.Subject, out var cnt); + subjectCounts[meta.Subject] = cnt + 1; } state.NumSubjects = subjectCounts.Count; state.Subjects = subjectCounts.Count > 0 ? subjectCounts : null; @@ -820,25 +842,28 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var firstSeq = _firstSeq; state.FirstSeq = firstSeq; - state.FirstTime = _messages.TryGetValue(firstSeq, out var firstMsg) ? firstMsg.TimestampUtc : default; + state.FirstTime = _meta.TryGetValue(firstSeq, out var firstMeta) + ? DateTimeOffset.FromUnixTimeMilliseconds(firstMeta.TimestampNs / 1_000_000).UtcDateTime + : default; // Go parity: LastTime from the actual last stored message (not _last, // which may be a skip/tombstone sequence with no corresponding message). - if (_messages.TryGetValue(_last, out var lastMsg)) - state.LastTime = lastMsg.TimestampUtc; - else if (_messages.Count > 0) + if (_meta.TryGetValue(_last, out var lastMeta)) + state.LastTime = DateTimeOffset.FromUnixTimeMilliseconds(lastMeta.TimestampNs / 1_000_000).UtcDateTime; + else if (_meta.Count > 0) { // _last is a skip — use the highest actual message time. - var actualLast = _messages.Keys.Max(); - state.LastTime = _messages[actualLast].TimestampUtc; + var actualLast = _meta.Keys.Max(); + var actualMeta = _meta[actualLast]; + state.LastTime = DateTimeOffset.FromUnixTimeMilliseconds(actualMeta.TimestampNs / 1_000_000).UtcDateTime; } - // Go parity: NumDeleted = gaps between firstSeq and lastSeq not in _messages. + // Go parity: NumDeleted = gaps between firstSeq and lastSeq not in _meta. // Reference: filestore.go — FastState sets state.NumDeleted. if (_last >= firstSeq) { var span = _last - firstSeq + 1; - var liveCount = (ulong)_messages.Count; + var liveCount = (ulong)_meta.Count; state.NumDeleted = span > liveCount ? (int)(span - liveCount) : 0; } else @@ -846,27 +871,60 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } } - private void TrackMessage(StoredMessage message) + /// + /// Materializes a full from block storage using + /// the lightweight metadata index. Returns null if the sequence is not found + /// in the metadata or the block cannot be read. + /// + private StoredMessage? MaterializeMessage(ulong seq) { - _messages[message.Sequence] = message; - _lastSequenceBySubject[message.Subject] = message.Sequence; + if (!_meta.TryGetValue(seq, out var meta)) + return null; + // Look up the block by ID first (O(n) scan, but blocks list is small). + // Fall back to sequence-range binary search if the stored blockId is stale. + var block = _blocks.Find(b => b.BlockId == meta.BlockId) ?? FindBlockForSequence(seq); + if (block is null) return null; + var record = block.Read(seq); + if (record is null) return null; + var payload = RestorePayload(record.Payload.Span); + return new StoredMessage + { + Sequence = seq, + Subject = meta.Subject, + RawHeaders = record.Headers, + Payload = payload, + TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(meta.TimestampNs / 1_000_000).UtcDateTime, + }; + } + + private void TrackMessage(ulong sequence, string subject, int headerLength, int payloadLength, long timestampNs, int blockId) + { + _meta[sequence] = new MessageMeta + { + BlockId = blockId, + Subject = subject, + PayloadLength = payloadLength, + HeaderLength = headerLength, + TimestampNs = timestampNs, + }; + _lastSequenceBySubject[subject] = sequence; _messageCount++; - _totalBytes += (ulong)(message.RawHeaders.Length + message.Payload.Length); + _totalBytes += (ulong)(headerLength + payloadLength); if (_messageCount == 1) { - _first = message.Sequence; - _firstSeq = message.Sequence; + _first = sequence; + _firstSeq = sequence; } } private bool RemoveTrackedMessage(ulong sequence, bool preserveHighWaterMark) { - if (!_messages.Remove(sequence, out var message)) + if (!_meta.Remove(sequence, out var meta)) return false; _messageCount--; - _totalBytes -= (ulong)(message.RawHeaders.Length + message.Payload.Length); - UpdateLastSequenceForSubject(message.Subject, sequence); + _totalBytes -= (ulong)(meta.HeaderLength + meta.PayloadLength); + UpdateLastSequenceForSubject(meta.Subject, sequence); if (_messageCount == 0) { @@ -891,7 +949,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private void AdvanceFirstSequence(ulong start) { var candidate = start; - while (!_messages.ContainsKey(candidate) && candidate <= _last) + while (!_meta.ContainsKey(candidate) && candidate <= _last) candidate++; if (candidate <= _last) @@ -912,7 +970,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable for (var seq = startExclusive - 1; ; seq--) { - if (_messages.ContainsKey(seq)) + if (_meta.ContainsKey(seq)) return seq; if (seq == 0) @@ -927,7 +985,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable for (var seq = removedSequence - 1; ; seq--) { - if (_messages.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) + if (_meta.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) { _lastSequenceBySubject[subject] = seq; return; @@ -948,8 +1006,21 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _firstSeq = 0; _first = 0; - foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) - TrackMessage(message); + // Re-derive counters from the existing _meta entries without re-inserting. + foreach (var kv in _meta.OrderBy(kv => kv.Key)) + { + var seq = kv.Key; + var meta = kv.Value; + _lastSequenceBySubject[meta.Subject] = seq; + _messageCount++; + _totalBytes += (ulong)(meta.HeaderLength + meta.PayloadLength); + + if (_messageCount == 1) + { + _first = seq; + _firstSeq = seq; + } + } if (_messageCount == 0 && _last > 0) _first = _last + 1; @@ -991,9 +1062,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Cache miss: count matching messages and cache the result. ulong count = 0; - foreach (var msg in _messages.Values) + foreach (var meta in _meta.Values) { - if (SubjectMatchesFilter(msg.Subject, filter ?? string.Empty)) + if (SubjectMatchesFilter(meta.Subject, filter ?? string.Empty)) count++; } @@ -1142,13 +1213,21 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// private void RewriteBlocks() { + // Materialize all messages before disposing blocks (since materialization reads from blocks). + var messages = _meta.Keys + .OrderBy(seq => seq) + .Select(seq => MaterializeMessage(seq)) + .Where(m => m is not null) + .Cast() + .ToList(); + DisposeAllBlocks(); CleanBlockFiles(); - _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); - RebuildIndexesFromMessages(); + _meta.Clear(); + _last = messages.Count == 0 ? 0UL : messages[^1].Sequence; - foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) + foreach (var message in messages) { var persistedPayload = TransformForPersist(message.Payload.Span); var timestamp = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; @@ -1164,9 +1243,20 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp); } + _meta[message.Sequence] = new MessageMeta + { + BlockId = _activeBlock!.BlockId, + Subject = message.Subject, + PayloadLength = message.Payload.Length, + HeaderLength = message.RawHeaders.Length, + TimestampNs = timestamp, + }; + if (_activeBlock!.IsSealed) RotateBlock(); } + + RebuildIndexesFromMessages(); } // ------------------------------------------------------------------------- @@ -1225,8 +1315,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock = lastBlock; } - PruneExpired(DateTime.UtcNow); - // After recovery, sync _last from skip-sequence high-water marks. // SkipMsg/SkipMsgs write tombstone records with empty subject — these // intentionally advance _last without storing a live message. We must @@ -1255,7 +1343,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } } - // Sync _first from _messages; if empty, set to _last+1 (watermark). + // Prune expired messages AFTER establishing _last from blocks/skips. + // If all messages expire, PruneExpiredLinear resets _last to 0. + PruneExpired(DateTime.UtcNow); + + // Sync _first from _meta; if empty, set to _last+1 (watermark). RebuildIndexesFromMessages(); } @@ -1278,30 +1370,27 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (record is null) continue; // Deleted or not present. - // The payload stored in the block is the transformed (compressed/encrypted) payload. - // We need to reverse-transform it to get the original plaintext. + // Compute the original payload length for metadata tracking. // InvalidDataException (e.g., wrong key) propagates to the caller. var originalPayload = RestorePayload(record.Payload.Span); - var message = new StoredMessage + _meta[record.Sequence] = new MessageMeta { - Sequence = record.Sequence, + BlockId = block.BlockId, Subject = record.Subject, - RawHeaders = record.Headers, - Payload = originalPayload, - TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime, + PayloadLength = originalPayload.Length, + HeaderLength = record.Headers.Length, + TimestampNs = record.Timestamp, }; - _messages[message.Sequence] = message; - if (message.Sequence > _last) - _last = message.Sequence; + if (record.Sequence > _last) + _last = record.Sequence; // Go: re-register unexpired TTLs in the wheel after recovery. // Reference: golang/nats-server/server/filestore.go — recoverMsgs, TTL re-registration. if (_options.MaxAgeMs > 0) { - var msgTs = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; - RegisterTtl(message.Sequence, msgTs, (long)_options.MaxAgeMs * 1_000_000L); + RegisterTtl(record.Sequence, record.Timestamp, (long)_options.MaxAgeMs * 1_000_000L); } } } @@ -1366,22 +1455,40 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable return; } - // Add to the in-memory cache. + // Write all messages to fresh blocks, then populate _meta. foreach (var (seq, subject, payload, ts) in legacyMessages) { - _messages[seq] = new StoredMessage - { - Sequence = seq, - Subject = subject, - Payload = payload, - TimestampUtc = ts, - }; if (seq > _last) _last = seq; + + var persistedPayload = TransformForPersist(payload); + var timestampNs = new DateTimeOffset(ts).ToUnixTimeMilliseconds() * 1_000_000L; + + EnsureActiveBlock(); + try + { + _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, timestampNs); + } + catch (InvalidOperationException) + { + RotateBlock(); + _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory.Empty, persistedPayload, timestampNs); + } + + _meta[seq] = new MessageMeta + { + BlockId = _activeBlock!.BlockId, + Subject = subject, + PayloadLength = payload.Length, + HeaderLength = 0, + TimestampNs = timestampNs, + }; + + if (_activeBlock!.IsSealed) + RotateBlock(); } - // Write all messages to fresh blocks. - RewriteBlocks(); + RebuildIndexesFromMessages(); // Delete the legacy files. File.Delete(jsonlPath); @@ -1467,9 +1574,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (_options.MaxAgeMs <= 0) return; - var cutoff = nowUtc.AddMilliseconds(-_options.MaxAgeMs); - var expired = _messages - .Where(kv => kv.Value.TimestampUtc < cutoff) + var cutoffNs = new DateTimeOffset(nowUtc.AddMilliseconds(-_options.MaxAgeMs)).ToUnixTimeMilliseconds() * 1_000_000L; + var expired = _meta + .Where(kv => kv.Value.TimestampNs < cutoffNs) .Select(kv => kv.Key) .ToArray(); @@ -1479,8 +1586,20 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var sequence in expired) { RemoveTrackedMessage(sequence, preserveHighWaterMark: true); + DeleteInBlock(sequence); } - RewriteBlocks(); + + // When all messages are expired, reset sequence pointers to 0. + // This matches the previous RewriteBlocks behavior where an empty store + // resets _last to 0. + if (_meta.Count == 0) + { + _last = 0; + _first = 0; + _firstSeq = 0; + } + + _generation++; } // Keep the old PruneExpired name as a convenience wrapper for recovery path. @@ -1783,7 +1902,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // When seq is 0, auto-assign next sequence. var skipSeq = seq == 0 ? _last + 1 : seq; _last = skipSeq; - // Do NOT add to _messages — it is a skip (tombstone). + // Do NOT add to _meta — it is a skip (tombstone). // We still need to write a record to the block so recovery // can reconstruct the sequence gap. Use an empty subject sentinel. EnsureActiveBlock(); @@ -1802,7 +1921,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // After a skip, if there are no real messages, the next real first // would be skipSeq+1. Track this so FastState reports correctly. - if (_messages.Count == 0) + if (_meta.Count == 0) _first = skipSeq + 1; return skipSeq; @@ -1837,46 +1956,45 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// Loads a message by exact sequence number into the optional reusable container /// . Throws if not found. /// - /// Fast path: O(1) hash lookup in _messages (all live messages are cached). + /// Fast path: O(1) hash lookup in _meta (all live messages are cached). /// Fallback: binary-search _blocks by sequence range (O(log n) blocks) to /// locate the containing block, then read from disk. This covers cases where a - /// future memory-pressure eviction removes entries from _messages. + /// future memory-pressure eviction removes entries from _meta. /// Reference: golang/nats-server/server/filestore.go:8308 (LoadMsg). /// public StoreMsg LoadMsg(ulong seq, StoreMsg? sm) { - // Fast path: O(1) in-memory lookup. - if (_messages.TryGetValue(seq, out var stored)) + var stored = MaterializeMessage(seq); + if (stored is null) { + // Fallback: binary-search blocks by sequence range (O(log n)). + // Handles cases where _meta does not contain the sequence. + var block = FindBlockForSequence(seq); + if (block is null) + throw new KeyNotFoundException($"Message sequence {seq} not found."); + + var record = block.Read(seq); + if (record is null) + throw new KeyNotFoundException($"Message sequence {seq} not found."); + sm ??= new StoreMsg(); sm.Clear(); - sm.Subject = stored.Subject; - sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; - sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; - sm.Sequence = stored.Sequence; - sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + sm.Subject = record.Subject; + sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null; + var originalPayload = RestorePayload(record.Payload.Span); + sm.Data = originalPayload.Length > 0 ? originalPayload : null; + sm.Sequence = record.Sequence; + sm.Timestamp = record.Timestamp; return sm; } - // Fallback: binary-search blocks by sequence range (O(log n)). - // Handles cases where _messages does not contain the sequence, e.g. future - // memory-pressure eviction. FindBlockForSequence avoids an O(n) block scan. - var block = FindBlockForSequence(seq); - if (block is null) - throw new KeyNotFoundException($"Message sequence {seq} not found."); - - var record = block.Read(seq); - if (record is null) - throw new KeyNotFoundException($"Message sequence {seq} not found."); - sm ??= new StoreMsg(); sm.Clear(); - sm.Subject = record.Subject; - sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null; - var originalPayload = RestorePayload(record.Payload.Span); - sm.Data = originalPayload.Length > 0 ? originalPayload : null; - sm.Sequence = record.Sequence; - sm.Timestamp = record.Timestamp; + sm.Subject = stored.Subject; + sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; + sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; + sm.Sequence = stored.Sequence; + sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; return sm; } @@ -1917,14 +2035,21 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public StoreMsg LoadLastMsg(string subject, StoreMsg? sm) { - var match = _messages.Values - .Where(m => string.IsNullOrEmpty(subject) - || SubjectMatchesFilter(m.Subject, subject)) - .MaxBy(m => m.Sequence); + ulong? bestSeq = null; + foreach (var kv in _meta) + { + if (!string.IsNullOrEmpty(subject) && !SubjectMatchesFilter(kv.Value.Subject, subject)) + continue; + if (bestSeq is null || kv.Key > bestSeq.Value) + bestSeq = kv.Key; + } - if (match is null) + if (bestSeq is null) throw new KeyNotFoundException($"No message found for subject '{subject}'."); + var match = MaterializeMessage(bestSeq.Value) + ?? throw new KeyNotFoundException($"No message found for subject '{subject}'."); + sm ??= new StoreMsg(); sm.Clear(); sm.Subject = match.Subject; @@ -1943,26 +2068,31 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public (StoreMsg Msg, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? sm) { - var match = _messages - .Where(kv => kv.Key >= start) - .Where(kv => string.IsNullOrEmpty(filter) || SubjectMatchesFilter(kv.Value.Subject, filter)) - .OrderBy(kv => kv.Key) - .Cast?>() - .FirstOrDefault(); + ulong? bestSeq = null; + foreach (var kv in _meta) + { + if (kv.Key < start) continue; + if (!string.IsNullOrEmpty(filter) && !SubjectMatchesFilter(kv.Value.Subject, filter)) + continue; + if (bestSeq is null || kv.Key < bestSeq.Value) + bestSeq = kv.Key; + } - if (match is null) + if (bestSeq is null) throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'."); - var found = match.Value; - var skip = found.Key > start ? found.Key - start : 0UL; + var found = MaterializeMessage(bestSeq.Value) + ?? throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'."); + + var skip = bestSeq.Value > start ? bestSeq.Value - start : 0UL; sm ??= new StoreMsg(); sm.Clear(); - sm.Subject = found.Value.Subject; - sm.Header = found.Value.RawHeaders.Length > 0 ? found.Value.RawHeaders.ToArray() : null; - sm.Data = found.Value.Payload.Length > 0 ? found.Value.Payload.ToArray() : null; - sm.Sequence = found.Key; - sm.Timestamp = new DateTimeOffset(found.Value.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + sm.Subject = found.Subject; + sm.Header = found.RawHeaders.Length > 0 ? found.RawHeaders.ToArray() : null; + sm.Data = found.Payload.Length > 0 ? found.Payload.ToArray() : null; + sm.Sequence = found.Sequence; + sm.Timestamp = new DateTimeOffset(found.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; return (sm, skip); } @@ -1974,7 +2104,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ulong[] AllLastSeqs() { var lastPerSubject = new Dictionary(StringComparer.Ordinal); - foreach (var kv in _messages) + foreach (var kv in _meta) { var subj = kv.Value.Subject; if (!lastPerSubject.TryGetValue(subj, out var existing) || kv.Key > existing) @@ -1996,7 +2126,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var lastPerSubject = new Dictionary(StringComparer.Ordinal); - foreach (var kv in _messages) + foreach (var kv in _meta) { var seq = kv.Key; if (maxSeq > 0 && seq > maxSeq) @@ -2027,9 +2157,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public string SubjectForSeq(ulong seq) { - if (!_messages.TryGetValue(seq, out var stored)) + if (!_meta.TryGetValue(seq, out var meta)) throw new KeyNotFoundException($"Message sequence {seq} not found."); - return stored.Subject; + return meta.Subject; } /// @@ -2041,7 +2171,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public (ulong Total, ulong ValidThrough) NumPending(ulong sseq, string filter, bool lastPerSubject) { - var candidates = _messages + var candidates = _meta .Where(kv => kv.Key >= sseq) .Where(kv => string.IsNullOrEmpty(filter) || SubjectMatchesFilter(kv.Value.Subject, filter)) .ToList(); @@ -2087,22 +2217,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var headers = hdr is { Length: > 0 } ? hdr : []; var payload = msg ?? []; var persistedPayload = TransformForPersist(payload); - // Recover UTC DateTime from caller-supplied Unix nanosecond timestamp. - var storedUtc = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000L).UtcDateTime; - - var stored = new StoredMessage - { - Sequence = seq, - Subject = subject, - RawHeaders = headers, - Payload = payload, - TimestampUtc = storedUtc, - }; - TrackMessage(stored); - _generation++; - - // Go: update _last to the high-water mark — do not decrement. - _last = Math.Max(_last, seq); // Register TTL using the caller-supplied timestamp and TTL. var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L); @@ -2119,6 +2233,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts); } + TrackMessage(seq, subject, headers.Length, payload.Length, ts, _activeBlock!.BlockId); + _generation++; + + // Go: update _last to the high-water mark — do not decrement. + _last = Math.Max(_last, seq); + // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. _writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length); @@ -2137,20 +2257,24 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (start == 0) throw new KeyNotFoundException("No message found before seq 0."); - var first = _messages.Count > 0 ? _messages.Keys.Min() : 1UL; + var first = _meta.Count > 0 ? _meta.Keys.Min() : 1UL; for (var seq = start - 1; seq >= first && seq <= _last; seq--) { - if (_messages.TryGetValue(seq, out var stored)) + if (_meta.ContainsKey(seq)) { - sm ??= new StoreMsg(); - sm.Clear(); - sm.Subject = stored.Subject; - sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; - sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; - sm.Sequence = stored.Sequence; - sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; - return sm; + var stored = MaterializeMessage(seq); + if (stored is not null) + { + sm ??= new StoreMsg(); + sm.Clear(); + sm.Subject = stored.Subject; + sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null; + sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null; + sm.Sequence = stored.Sequence; + sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + return sm; + } } // Prevent underflow on ulong subtraction. @@ -2215,7 +2339,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Enforce per-subject limits immediately after config change. if (_options.MaxMsgsPerSubject > 0) { - var subjects = _messages.Values.Select(m => m.Subject).Distinct(StringComparer.Ordinal).ToList(); + var subjects = _meta.Values.Select(m => m.Subject).Distinct(StringComparer.Ordinal).ToList(); foreach (var subject in subjects) EnforceMaxMsgsPerSubject(subject); } @@ -2232,7 +2356,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (limit <= 0) return; - var subjectMsgs = _messages + var subjectMsgs = _meta .Where(kv => string.Equals(kv.Value.Subject, subject, StringComparison.Ordinal)) .OrderBy(kv => kv.Key) .ToList(); @@ -2240,16 +2364,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable while (subjectMsgs.Count > limit) { var oldest = subjectMsgs[0]; - _totalBytes -= (ulong)oldest.Value.Payload.Length; + _totalBytes -= (ulong)(oldest.Value.HeaderLength + oldest.Value.PayloadLength); _messageCount--; - _messages.Remove(oldest.Key); + _meta.Remove(oldest.Key); DeleteInBlock(oldest.Key); subjectMsgs.RemoveAt(0); _generation++; } - if (_messages.Count > 0) - _firstSeq = _messages.Keys.Min(); + if (_meta.Count > 0) + _firstSeq = _meta.Keys.Min(); } /// diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs index 06361c0..ef30d7d 100644 --- a/src/NATS.Server/JetStream/Storage/MsgBlock.cs +++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs @@ -315,6 +315,10 @@ public sealed class MsgBlock : IDisposable _index[sequence] = (offset, written); + // If this sequence was previously soft-deleted, clear the deletion marker + // so that subsequent Read calls return the new record rather than null. + _deleted.Remove(sequence); + // Go: cache populated lazily on read, not eagerly on write. // Reads that miss _cache flush pending buf to disk and decode from there.