perf: add compact FileStore index metadata

This commit is contained in:
Joseph Doherty
2026-03-13 10:34:31 -04:00
parent 5674853628
commit 9ff5216495
5 changed files with 190 additions and 159 deletions

View File

@@ -28,6 +28,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// In-memory cache: keyed by sequence number. This is the primary data structure
// for reads and queries. The blocks are the on-disk persistence layer.
private readonly Dictionary<ulong, StoredMessage> _messages = new();
private readonly Dictionary<ulong, StoredMessageIndex> _messageIndexes = new();
private readonly Dictionary<string, ulong> _lastSequenceBySubject = new(StringComparer.Ordinal);
// Block-based storage: the active (writable) block and sealed blocks.
private readonly List<MsgBlock> _blocks = [];
@@ -141,20 +143,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// We keep _messages for LoadAsync/RemoveAsync but avoid double payload.ToArray().
var persistedPayload = TransformForPersist(payload.Span);
var storedPayload = _noTransform ? persistedPayload : payload.ToArray();
_messages[_last] = new StoredMessage
TrackMessage(new StoredMessage
{
Sequence = _last,
Subject = subject,
Payload = storedPayload,
TimestampUtc = now,
};
});
_generation++;
_messageCount++;
_totalBytes += (ulong)payload.Length;
if (_messageCount == 1)
_firstSeq = _last;
// Go: register TTL only when TTL > 0.
if (_options.MaxAgeMs > 0)
RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L);
@@ -195,11 +192,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
{
var match = _messages.Values
.Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal))
.OrderByDescending(m => m.Sequence)
.FirstOrDefault();
return ValueTask.FromResult(match);
if (_lastSequenceBySubject.TryGetValue(subject, out var sequence)
&& _messages.TryGetValue(sequence, out var match))
{
return ValueTask.FromResult<StoredMessage?>(match);
}
return ValueTask.FromResult<StoredMessage?>(null);
}
public ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct)
@@ -212,21 +211,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
{
if (!_messages.TryGetValue(sequence, out var msg))
if (!RemoveTrackedMessage(sequence, preserveHighWaterMark: false))
return ValueTask.FromResult(false);
_messages.Remove(sequence);
_generation++;
// Incremental state tracking.
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
if (sequence == _firstSeq)
_firstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min();
if (sequence == _last)
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
// Soft-delete in the block that contains this sequence.
DeleteInBlock(sequence);
@@ -276,10 +265,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct)
{
_messages.Clear();
_messageIndexes.Clear();
_lastSequenceBySubject.Clear();
_last = 0;
_messageCount = 0;
_totalBytes = 0;
_firstSeq = 0;
_first = 0;
// Dispose existing blocks and clean files.
DisposeAllBlocks();
@@ -303,14 +295,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_messages[record.Sequence] = message;
_last = Math.Max(_last, record.Sequence);
}
// Recompute incremental state from restored messages.
_messageCount = (ulong)_messages.Count;
_totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
}
}
RebuildIndexesFromMessages();
// Write all messages to fresh blocks.
RewriteBlocks();
return ValueTask.CompletedTask;
@@ -332,14 +321,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
var trimmed = false;
while ((ulong)_messages.Count > maxMessages)
{
var first = _messages.Keys.Min();
if (_messages.TryGetValue(first, out var msg))
{
_totalBytes -= (ulong)msg.Payload.Length;
_messageCount--;
}
var first = _firstSeq;
if (first == 0 || !RemoveTrackedMessage(first, preserveHighWaterMark: true))
break;
_messages.Remove(first);
DeleteInBlock(first);
trimmed = true;
}
@@ -347,7 +332,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (!trimmed)
return;
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
_generation++;
}
@@ -391,22 +375,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
}
var persistedPayload = TransformForPersist(combined.AsSpan());
var stored = new StoredMessage
TrackMessage(new StoredMessage
{
Sequence = _last,
Subject = subject,
Payload = combined,
TimestampUtc = now,
};
_messages[_last] = stored;
});
_generation++;
// Incremental state tracking.
_messageCount++;
_totalBytes += (ulong)combined.Length;
if (_messageCount == 1)
_firstSeq = _last;
// Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs.
// Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge.
var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L);
@@ -443,11 +420,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
var count = (ulong)_messages.Count;
_messages.Clear();
_messageIndexes.Clear();
_lastSequenceBySubject.Clear();
_generation++;
_last = 0;
_messageCount = 0;
_totalBytes = 0;
_firstSeq = 0;
_first = 0;
DisposeAllBlocks();
CleanBlockFiles();
@@ -490,20 +470,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
foreach (var msg in toRemove)
{
_messages.Remove(msg.Sequence);
_totalBytes -= (ulong)msg.Payload.Length;
_messageCount--;
RemoveTrackedMessage(msg.Sequence, preserveHighWaterMark: true);
DeleteInBlock(msg.Sequence);
}
_generation++;
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
// Update _last if required.
if (_messages.Count == 0)
_last = 0;
else if (!_messages.ContainsKey(_last))
_last = _messages.Keys.Max();
return (ulong)toRemove.Count;
}
@@ -524,13 +495,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
foreach (var s in toRemove)
{
if (_messages.TryGetValue(s, out var msg))
{
_totalBytes -= (ulong)msg.Payload.Length;
_messageCount--;
}
_messages.Remove(s);
RemoveTrackedMessage(s, preserveHighWaterMark: true);
DeleteInBlock(s);
}
@@ -545,11 +510,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
}
else
{
if (!_messages.ContainsKey(_last))
_last = _messages.Keys.Max();
// Update _first to reflect the real first message.
_first = _messages.Keys.Min();
_firstSeq = _first;
_first = _firstSeq;
}
return (ulong)toRemove.Length;
@@ -566,11 +527,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
// Truncate to nothing.
_messages.Clear();
_messageIndexes.Clear();
_lastSequenceBySubject.Clear();
_generation++;
_last = 0;
_messageCount = 0;
_totalBytes = 0;
_firstSeq = 0;
_first = 0;
DisposeAllBlocks();
CleanBlockFiles();
return;
@@ -579,13 +543,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
var toRemove = _messages.Keys.Where(k => k > seq).ToArray();
foreach (var s in toRemove)
{
if (_messages.TryGetValue(s, out var msg))
{
_totalBytes -= (ulong)msg.Payload.Length;
_messageCount--;
}
_messages.Remove(s);
RemoveTrackedMessage(s, preserveHighWaterMark: false);
DeleteInBlock(s);
}
@@ -594,8 +552,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Update _last to the new highest existing sequence (or seq if it exists,
// or the highest below seq).
_last = _messages.Count == 0 ? 0 : _messages.Keys.Max();
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
if (_messageCount == 0)
{
_last = 0;
_first = 0;
_firstSeq = 0;
}
}
/// <summary>
@@ -865,6 +827,119 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
}
}
private void TrackMessage(StoredMessage message)
{
_messages[message.Sequence] = message;
_messageIndexes[message.Sequence] = message.ToIndex();
_lastSequenceBySubject[message.Subject] = message.Sequence;
_messageCount++;
_totalBytes += (ulong)message.Payload.Length;
if (_messageCount == 1)
{
_first = message.Sequence;
_firstSeq = message.Sequence;
}
}
private bool RemoveTrackedMessage(ulong sequence, bool preserveHighWaterMark)
{
if (!_messages.Remove(sequence, out var message))
return false;
_messageIndexes.Remove(sequence);
_messageCount--;
_totalBytes -= (ulong)message.Payload.Length;
UpdateLastSequenceForSubject(message.Subject, sequence);
if (_messageCount == 0)
{
if (preserveHighWaterMark)
_first = _last + 1;
else
_last = 0;
_firstSeq = 0;
return true;
}
if (sequence == _firstSeq)
AdvanceFirstSequence(sequence + 1);
if (!preserveHighWaterMark && sequence == _last)
_last = FindPreviousLiveSequence(sequence);
return true;
}
private void AdvanceFirstSequence(ulong start)
{
var candidate = start;
while (!_messageIndexes.ContainsKey(candidate) && candidate <= _last)
candidate++;
if (candidate <= _last)
{
_first = candidate;
_firstSeq = candidate;
return;
}
_first = _last + 1;
_firstSeq = 0;
}
private ulong FindPreviousLiveSequence(ulong startExclusive)
{
if (_messageCount == 0 || startExclusive == 0)
return 0;
for (var seq = startExclusive - 1; ; seq--)
{
if (_messageIndexes.ContainsKey(seq))
return seq;
if (seq == 0)
return 0;
}
}
private void UpdateLastSequenceForSubject(string subject, ulong removedSequence)
{
if (!_lastSequenceBySubject.TryGetValue(subject, out var currentLast) || currentLast != removedSequence)
return;
for (var seq = removedSequence - 1; ; seq--)
{
if (_messageIndexes.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal))
{
_lastSequenceBySubject[subject] = seq;
return;
}
if (seq == 0)
break;
}
_lastSequenceBySubject.Remove(subject);
}
private void RebuildIndexesFromMessages()
{
_messageIndexes.Clear();
_lastSequenceBySubject.Clear();
_messageCount = 0;
_totalBytes = 0;
_firstSeq = 0;
_first = 0;
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
TrackMessage(message);
if (_messageCount == 0 && _last > 0)
_first = _last + 1;
}
// -------------------------------------------------------------------------
// Subject matching helper
// -------------------------------------------------------------------------
@@ -1055,10 +1130,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
DisposeAllBlocks();
CleanBlockFiles();
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
_messageCount = (ulong)_messages.Count;
_totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
RebuildIndexesFromMessages();
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
{
@@ -1168,15 +1240,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
}
// Sync _first from _messages; if empty, set to _last+1 (watermark).
if (_messages.Count > 0)
_first = _messages.Keys.Min();
else if (_last > 0)
_first = _last + 1;
// Recompute incremental state from recovered messages.
_messageCount = (ulong)_messages.Count;
_totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
RebuildIndexesFromMessages();
}
/// <summary>
@@ -1370,20 +1434,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Reference: golang/nats-server/server/filestore.go:expireMsgs — dmap-based removal.
foreach (var seq in expired)
{
if (_messages.Remove(seq, out var msg))
{
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
}
RemoveTrackedMessage(seq, preserveHighWaterMark: true);
DeleteInBlock(seq);
}
if (_messages.Count > 0)
_firstSeq = _messages.Keys.Min();
else
_firstSeq = 0;
_generation++;
}
@@ -1407,14 +1461,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
foreach (var sequence in expired)
{
if (_messages.Remove(sequence, out var msg))
{
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
}
RemoveTrackedMessage(sequence, preserveHighWaterMark: true);
}
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
RewriteBlocks();
}
@@ -1676,27 +1724,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
public bool RemoveMsg(ulong seq)
{
if (!_messages.Remove(seq, out var msg))
if (!RemoveTrackedMessage(seq, preserveHighWaterMark: true))
return false;
_generation++;
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
// Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is
// never decremented on removal. Only FirstSeq advances when the first
// live message is removed.
if (_messages.Count == 0)
{
_first = _last + 1; // All gone — next first would be after last
_firstSeq = 0;
}
else
{
_first = _messages.Keys.Min();
_firstSeq = _first;
}
DeleteInBlock(seq);
return true;
}
@@ -1709,23 +1743,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
public bool EraseMsg(ulong seq)
{
if (!_messages.Remove(seq, out var msg))
if (!RemoveTrackedMessage(seq, preserveHighWaterMark: true))
return false;
_generation++;
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
if (_messages.Count == 0)
{
_first = _last + 1;
_firstSeq = 0;
}
else
{
_first = _messages.Keys.Min();
_firstSeq = _first;
}
// Secure erase: overwrite payload bytes with random data before marking deleted.
// Reference: golang/nats-server/server/filestore.go:5890 (eraseMsg).