Files
natsdotnet/src/NATS.Server/JetStream/Storage/FileStore.cs
Joseph Doherty 6e91fda7fd perf: Phase 2 lazy StoredMessage materialization in FileStore
Replace eager Dictionary<ulong, StoredMessage> with lightweight
Dictionary<ulong, MessageMeta> to eliminate ~200B StoredMessage
allocation per message on the write path.

- Add MessageMeta struct (BlockId, Subject, PayloadLength, HeaderLength,
  TimestampNs) — ~40B vs ~200B for StoredMessage
- Add MaterializeMessage(seq) for on-demand reconstruction from blocks
- Update all ~60 _messages references to use _meta
- Methods needing full payload (LoadAsync, ListAsync, etc.) call
  MaterializeMessage; metadata-only paths use _meta directly
- Fix MsgBlock.WriteAt to clear stale delete markers on re-write
2026-03-13 15:33:38 -04:00

2760 lines
103 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using NATS.Server.JetStream.Models;
using NATS.Server.Internal.TimeHashWheel;
// Storage.StreamState is in this namespace. Use an alias for the API-layer type
// (now named ApiStreamState in the Models namespace) to keep method signatures clear.
using ApiStreamState = NATS.Server.JetStream.Models.ApiStreamState;
namespace NATS.Server.JetStream.Storage;
/// <summary>
/// Block-based file store for JetStream messages. Uses <see cref="MsgBlock"/> for
/// on-disk persistence and maintains a lightweight in-memory metadata index
/// (sequence to block id, subject, sizes and timestamp) for fast existence checks and
/// subject queries. Full <see cref="StoredMessage"/> instances are not cached; they are
/// materialized on demand by reading the owning block.
///
/// Reference: golang/nats-server/server/filestore.go — block manager, block rotation,
/// recovery via scanning .blk files, soft-delete via dmap.
/// </summary>
public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
// Store configuration. This is the caller's instance: the constructor normalizes
// BlockSizeBytes in place when it is not positive.
private readonly FileStoreOptions _options;
// Lightweight metadata index: keyed by sequence number. Stores only the fields
// needed for existence checks, subject queries, and timestamp lookups.
// Full message payloads are materialized on demand from blocks via MaterializeMessage.
// Dictionary enumeration order is not sequence order; code that needs ordered
// traversal sorts the keys first (e.g. ListAsync, CreateSnapshotAsync).
private readonly Dictionary<ulong, MessageMeta> _meta = new();
/// <summary>
/// Lightweight per-message metadata stored in the in-memory index.
/// Eliminates the ~200B StoredMessage allocation on the write path.
/// Stored by value in <c>_meta</c>: a copy obtained via TryGetValue is independent of the entry.
/// </summary>
private struct MessageMeta
{
// Id of the MsgBlock the record was written to; MaterializeMessage matches it against MsgBlock.BlockId.
public int BlockId;
// Message subject; also the key into _lastSequenceBySubject.
public string Subject;
// Untransformed payload length in bytes (before compression/encryption); counted in _totalBytes.
public int PayloadLength;
// Raw header length in bytes; counted in _totalBytes.
public int HeaderLength;
// Unix timestamp in nanoseconds; the writers in this file derive it at millisecond precision.
public long TimestampNs;
}
// Highest live sequence per subject (ordinal comparison). Maintained by TrackMessage and
// UpdateLastSequenceForSubject; read by LoadLastBySubjectAsync.
private readonly Dictionary<string, ulong> _lastSequenceBySubject = new(StringComparer.Ordinal);
// Block-based storage: the active (writable) block and sealed blocks.
private readonly List<MsgBlock> _blocks = [];
private MsgBlock? _activeBlock;
// NOTE(review): presumably the id handed to the next created block — assigned outside this view, confirm.
private int _nextBlockId;
// High-water mark: the highest sequence assigned so far. AppendAsync/StoreMsg increment it
// before writing; removals may lower it only when preserveHighWaterMark is false.
private ulong _last;
private ulong _first; // Go: first.seq — watermark for the first live or expected-first sequence
// Incremental state tracking — avoid O(n) scans in GetStateAsync/FastState.
// Updated in AppendAsync, StoreMsg, RemoveAsync, PurgeAsync, PurgeEx, Compact,
// Truncate, TrimToMaxMessages, EnforceMaxMsgsPerSubject, and recovery.
private ulong _messageCount;
private ulong _totalBytes;
// Lowest live sequence, or 0 when the store is empty. Unlike _first, it never carries a
// watermark for an empty store (e.g. _first = _last + 1 after the last message is removed).
private ulong _firstSeq;
// Set to true after Stop() is called. Prevents further writes.
private bool _stopped;
// Resolved at construction time: which format family to use.
private readonly bool _useS2; // true -> S2Codec (FSV2 compression path)
private readonly bool _useAead; // true -> AeadEncryptor (FSV2 encryption path)
private readonly bool _noTransform; // true -> no compression/encryption, TransformForPersist just copies
// Go: filestore.go — per-stream time hash wheel for efficient TTL expiration.
// Created lazily only when MaxAgeMs > 0. Entries are (seq, expires_ns) pairs.
// Reference: golang/nats-server/server/filestore.go:290 (fss/ttl fields).
private HashWheel? _ttlWheel;
// Mutual-exclusion lock for state file writes. Ensures that concurrent
// FlushAllPending calls (e.g. from a flush timer and a shutdown path) do not
// race on the stream.state / stream.state.tmp files.
// Reference: golang/nats-server/server/filestore.go — fsMsgBlock.mu (write lock).
private readonly SemaphoreSlim _stateWriteLock = new(1, 1);
// Go: filestore.go:4443 (setupWriteCache) — bounded write cache manager.
// Manages TTL-based expiry and size cap across all active blocks.
// Reference: golang/nats-server/server/filestore.go:6148 (expireCache).
private readonly WriteCacheManager _writeCache;
// Go: filestore.go:5841 — background flush loop coalesces buffered writes.
// Reference: golang/nats-server/server/filestore.go:328-331 (coalesce constants).
// Capacity 1: writers signal with TryWrite, so repeated signals while one is pending are dropped.
private readonly Channel<byte> _flushSignal = Channel.CreateBounded<byte>(1);
private readonly CancellationTokenSource _flushCts = new();
private Task? _flushTask;
private const int CoalesceMinimum = 16 * 1024; // 16KB — Go: filestore.go:328
private const int MaxFlushWaitMs = 8; // 8ms — Go: filestore.go:331
// Go: filestore.go — generation counter for cache invalidation.
// Incremented on every write (Append/StoreRawMsg) and delete (Remove/Purge/Compact).
// NumFiltered caches results keyed by (filter, generation) so repeated calls for
// the same filter within the same generation are O(1).
// Reference: golang/nats-server/server/filestore.go — fss generation tracking.
private ulong _generation;
// Per-filter NumFiltered results; an entry is valid only while its Generation equals _generation.
private readonly Dictionary<string, (ulong Generation, ulong Count)> _numFilteredCache = new(StringComparer.Ordinal);
/// <summary>Number of message blocks currently tracked in the block list.</summary>
public int BlockCount
{
    get { return _blocks.Count; }
}

/// <summary>
/// Privately set by the store; by its name, reports whether the index manifest was used at startup.
/// </summary>
public bool UsedIndexManifestOnStartup { get; private set; }

// IStreamStore cached state properties — O(1), maintained incrementally.

/// <summary>Highest sequence assigned so far (high-water mark).</summary>
public ulong LastSeq
{
    get { return _last; }
}

/// <summary>Number of live messages.</summary>
public ulong MessageCount
{
    get { return _messageCount; }
}

/// <summary>Total header plus untransformed payload bytes of live messages.</summary>
public ulong TotalBytes
{
    get { return _totalBytes; }
}

ulong IStreamStore.FirstSeq
{
    // Empty store: report the watermark held in _first (0 if never set); otherwise the lowest live sequence.
    get { return _messageCount > 0 ? _firstSeq : _first; }
}
/// <summary>
/// Opens (or creates) a file store rooted at <c>options.Directory</c>: attempts legacy JSONL
/// migration, recovers state from existing block files, sets up the bounded write cache,
/// and starts the background flush loop.
/// </summary>
/// <param name="options">
/// Store configuration. A non-positive <c>BlockSizeBytes</c> is replaced in place with
/// 64 KiB, so the caller's options object is mutated.
/// </param>
public FileStore(FileStoreOptions options)
{
_options = options;
// Normalize an unset/invalid block size to the 64 KiB default (mutates the shared options object).
if (_options.BlockSizeBytes <= 0)
_options.BlockSizeBytes = 64 * 1024;
// Determine which format path is active.
_useS2 = _options.Compression == StoreCompression.S2Compression;
_useAead = _options.Cipher != StoreCipher.NoCipher;
_noTransform = !_useS2 && !_useAead && !_options.EnableCompression && !_options.EnableEncryption;
Directory.CreateDirectory(options.Directory);
// Attempt legacy JSONL migration first, then recover from blocks.
// Both run before _writeCache and the flush loop are created below, so any code they
// reach must not depend on either.
MigrateLegacyJsonl();
RecoverBlocks();
// Go: filestore.go:4443 (setupWriteCache) — initialise the bounded cache manager.
// Reference: golang/nats-server/server/filestore.go:6148 (expireCache).
// The resolver lambda searches the live _blocks list at call time, so blocks created
// after construction are visible to the cache manager.
_writeCache = new WriteCacheManager(
_options.MaxCacheSize,
_options.CacheExpiry,
blockId => _blocks.Find(b => b.BlockId == blockId));
// Go: filestore.go:5841 — start background flush loop for write coalescing.
_flushTask = Task.Run(() => FlushLoopAsync(_flushCts.Token));
}
/// <summary>
/// Appends a header-less message on <paramref name="subject"/> and returns its assigned
/// sequence. Completes synchronously.
/// </summary>
/// <param name="subject">Subject to store the message under.</param>
/// <param name="payload">
/// Message body. It is persisted via TransformForPersist, but byte accounting
/// (<c>_totalBytes</c>, MaxBytes check) uses the untransformed length.
/// </param>
/// <param name="ct">Not observed by this implementation.</param>
/// <returns>The new sequence number (the incremented <c>_last</c>).</returns>
/// <exception cref="ObjectDisposedException">The store has been stopped.</exception>
/// <exception cref="StoreCapacityException">DiscardNew is configured and MaxBytes would be exceeded.</exception>
public ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
if (_stopped)
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
// Go: DiscardNew — reject when MaxBytes would be exceeded.
if (_options.MaxBytes > 0 && _options.Discard == DiscardPolicy.New
&& (long)_totalBytes + payload.Length > _options.MaxBytes)
{
throw new StoreCapacityException("maximum bytes exceeded");
}
// Go: check and remove expired messages before each append.
// Only when MaxAge is configured (Go: filestore.go:4701 conditional).
if (_options.MaxAgeMs > 0)
ExpireFromWheel();
// NOTE(review): the sequence is consumed (and its TTL registered below) before the block
// write; if the retry after RotateBlock also throws, no message is stored for this sequence.
_last++;
var now = DateTime.UtcNow;
// Nanosecond timestamp at millisecond precision.
var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L;
// Go: writeMsgRecordLocked writes directly into mb.cache.buf (a single contiguous
// byte slice). It does NOT store a per-message object in a map.
// We store lightweight MessageMeta instead of full StoredMessage to avoid ~200B alloc.
var persistedPayload = TransformForPersist(payload.Span);
// Go: register TTL only when TTL > 0.
if (_options.MaxAgeMs > 0)
RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L);
// Write to MsgBlock first so we know the block ID for the meta entry.
EnsureActiveBlock();
try
{
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
}
catch (InvalidOperationException)
{
// Presumably the active block could not take the record (e.g. full) — confirm in MsgBlock.
// Rotate to a fresh block and retry exactly once; a second failure propagates.
RotateBlock();
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
}
// Track lightweight metadata now that we know the block ID.
TrackMessage(_last, subject, 0, payload.Length, timestamp, _activeBlock!.BlockId);
_generation++;
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
// Non-blocking wake-up of the flush loop; dropped if a signal is already pending.
_flushSignal.Writer.TryWrite(0);
// Check if the block just became sealed after this write.
if (_activeBlock!.IsSealed)
RotateBlock();
// Go: enforce MaxMsgsPerSubject — remove oldest messages for this subject
// when the per-subject count exceeds the limit.
// Reference: golang/nats-server/server/filestore.go — enforcePerSubjectLimit.
if (_options.MaxMsgsPerSubject > 0 && !string.IsNullOrEmpty(subject))
EnforceMaxMsgsPerSubject(subject);
return ValueTask.FromResult(_last);
}
/// <summary>
/// Loads the live message at <paramref name="sequence"/>, reading its payload back from block storage.
/// Completes synchronously; <paramref name="ct"/> is not observed.
/// </summary>
/// <returns>The materialized message, or <c>null</c> if the sequence is not live or its block cannot be read.</returns>
public ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct)
    => ValueTask.FromResult(MaterializeMessage(sequence));
/// <summary>
/// Loads the newest live message stored on exactly <paramref name="subject"/> (ordinal match, no wildcards).
/// Completes synchronously; <paramref name="ct"/> is not observed.
/// </summary>
/// <returns>The message, or <c>null</c> when the subject has no live messages.</returns>
public ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct)
{
    StoredMessage? newest = _lastSequenceBySubject.TryGetValue(subject, out var lastSeq)
        ? MaterializeMessage(lastSeq)
        : null;
    return ValueTask.FromResult(newest);
}
/// <summary>
/// Returns every live message in ascending sequence order, materialized from block storage.
/// Sequences whose block record cannot be read are omitted. Completes synchronously.
/// </summary>
public ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct)
{
    var sequences = _meta.Keys.ToArray();
    Array.Sort(sequences);

    var loaded = new List<StoredMessage>(sequences.Length);
    foreach (var seq in sequences)
    {
        if (MaterializeMessage(seq) is { } message)
            loaded.Add(message);
    }

    return ValueTask.FromResult<IReadOnlyList<StoredMessage>>(loaded.ToArray());
}
/// <summary>
/// Removes the message at <paramref name="sequence"/> from the index and soft-deletes it in its block.
/// If the removed message was the newest, <c>_last</c> falls back to the previous live sequence.
/// </summary>
/// <returns><c>true</c> if a live message was removed; <c>false</c> if the sequence was not live.</returns>
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
{
    var removed = RemoveTrackedMessage(sequence, preserveHighWaterMark: false);
    if (removed)
    {
        _generation++;
        DeleteInBlock(sequence);
    }

    return ValueTask.FromResult(removed);
}
/// <summary>
/// Removes every message, resets all counters and sequence watermarks, and deletes the
/// block files plus any legacy JSONL data and the index manifest. Completes synchronously.
/// </summary>
/// <param name="ct">Not observed by this implementation.</param>
public ValueTask PurgeAsync(CancellationToken ct)
{
    _meta.Clear();
    // Keep the per-subject index consistent with _meta (as Purge() does); otherwise it
    // would keep pointing at sequences that no longer exist.
    _lastSequenceBySubject.Clear();
    _generation++;
    _last = 0;
    _messageCount = 0;
    _totalBytes = 0;
    _firstSeq = 0;
    // Reset the watermark too: with _messageCount == 0, FirstSeq reports _first, which would
    // otherwise be a stale value greater than LastSeq (0).
    _first = 0;
    // Dispose and delete all blocks.
    DisposeAllBlocks();
    CleanBlockFiles();
    // Clean up any legacy files that might still exist.
    var jsonlPath = Path.Combine(_options.Directory, "messages.jsonl");
    if (File.Exists(jsonlPath))
        File.Delete(jsonlPath);
    var manifestPath = Path.Combine(_options.Directory, _options.IndexManifestFileName);
    if (File.Exists(manifestPath))
        File.Delete(manifestPath);
    return ValueTask.CompletedTask;
}
/// <summary>
/// Serializes all live messages, in ascending sequence order, to a JSON array of FileRecord.
/// Headers and the persisted (transformed) payload are Base64-encoded. Messages whose
/// block record cannot be read are omitted. Completes synchronously.
/// </summary>
public ValueTask<byte[]> CreateSnapshotAsync(CancellationToken ct)
{
    var sequences = _meta.Keys.ToArray();
    Array.Sort(sequences);

    var records = new List<FileRecord>(sequences.Length);
    foreach (var seq in sequences)
    {
        var stored = MaterializeMessage(seq);
        if (stored is null)
            continue;

        records.Add(new FileRecord
        {
            Sequence = stored.Sequence,
            Subject = stored.Subject,
            HeadersBase64 = stored.RawHeaders.IsEmpty ? null : Convert.ToBase64String(stored.RawHeaders.Span),
            PayloadBase64 = Convert.ToBase64String(TransformForPersist(stored.Payload.Span)),
            TimestampUtc = stored.TimestampUtc,
        });
    }

    var json = JsonSerializer.SerializeToUtf8Bytes(records.ToArray());
    return ValueTask.FromResult(json);
}
/// <summary>
/// Replaces the store contents with the records in a snapshot produced by CreateSnapshotAsync.
/// All existing blocks are disposed and their files removed first; each record is then
/// rewritten into fresh blocks and re-indexed. An empty snapshot leaves the store empty.
/// Completes synchronously.
/// </summary>
/// <param name="snapshot">UTF-8 JSON array of FileRecord, or empty.</param>
/// <param name="ct">Not observed by this implementation.</param>
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct)
{
    _meta.Clear();
    _lastSequenceBySubject.Clear();
    _last = 0;
    _messageCount = 0;
    _totalBytes = 0;
    _firstSeq = 0;
    _first = 0;
    // The contents are being replaced wholesale: invalidate generation-keyed caches
    // (NumFiltered) so they cannot return counts computed against the old contents.
    _generation++;
    // Dispose existing blocks and clean files.
    DisposeAllBlocks();
    CleanBlockFiles();
    if (!snapshot.IsEmpty)
    {
        var records = JsonSerializer.Deserialize<FileRecord[]>(snapshot.Span);
        if (records != null)
        {
            foreach (var record in records)
            {
                var restoredHeaders = string.IsNullOrEmpty(record.HeadersBase64)
                    ? ReadOnlyMemory<byte>.Empty
                    : Convert.FromBase64String(record.HeadersBase64);
                // The snapshot holds the persisted (transformed) payload: undo the transform,
                // then re-apply it with this store's current settings.
                var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty));
                var persistedPayload = TransformForPersist(restoredPayload);
                var timestampNs = new DateTimeOffset(record.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
                var subject = record.Subject ?? string.Empty;
                _last = Math.Max(_last, record.Sequence);
                EnsureActiveBlock();
                try
                {
                    _activeBlock!.WriteAt(record.Sequence, subject, restoredHeaders, persistedPayload, timestampNs);
                }
                catch (InvalidOperationException)
                {
                    // Active block could not take the record: rotate and retry once.
                    RotateBlock();
                    _activeBlock!.WriteAt(record.Sequence, subject, restoredHeaders, persistedPayload, timestampNs);
                }
                TrackMessage(record.Sequence, subject, restoredHeaders.Length, restoredPayload.Length, timestampNs, _activeBlock!.BlockId);
                if (_activeBlock!.IsSealed)
                    RotateBlock();
            }
        }
    }
    return ValueTask.CompletedTask;
}
/// <summary>
/// Returns the API-level stream state from the incrementally maintained counters (O(1)).
/// Completes synchronously.
/// </summary>
public ValueTask<ApiStreamState> GetStateAsync(CancellationToken ct)
{
    var snapshot = new ApiStreamState
    {
        Messages = _messageCount,
        // Empty store: report the watermark (_first, possibly 0); otherwise the lowest live sequence.
        FirstSeq = _messageCount > 0 ? _firstSeq : _first,
        LastSeq = _last,
        Bytes = _totalBytes,
    };
    return ValueTask.FromResult(snapshot);
}
/// <summary>
/// Removes the oldest live messages until at most <paramref name="maxMessages"/> remain.
/// The high-water mark (<c>_last</c>) is preserved.
/// </summary>
public void TrimToMaxMessages(ulong maxMessages)
{
    var anyRemoved = false;
    for (;;)
    {
        if ((ulong)_meta.Count <= maxMessages)
            break;

        var oldest = _firstSeq;
        if (oldest == 0)
            break;
        if (!RemoveTrackedMessage(oldest, preserveHighWaterMark: true))
            break;

        DeleteInBlock(oldest);
        anyRemoved = true;
    }

    if (anyRemoved)
        _generation++;
}
// -------------------------------------------------------------------------
// Go-parity sync interface implementations
// Reference: golang/nats-server/server/filestore.go
// -------------------------------------------------------------------------
/// <summary>
/// Synchronously stores a message, optionally with a per-message TTL override.
/// Returns the assigned sequence number and timestamp in nanoseconds.
/// When <paramref name="ttl"/> is greater than zero it overrides MaxAgeMs for
/// this specific message; otherwise the stream's MaxAgeMs applies.
/// Reference: golang/nats-server/server/filestore.go:6790 (storeMsg).
/// </summary>
/// <param name="subject">Subject to store the message under.</param>
/// <param name="hdr">Optional raw headers; null or empty means no headers.</param>
/// <param name="msg">Message payload; treated as empty if null.</param>
/// <param name="ttl">Per-message TTL in nanoseconds; values &lt;= 0 fall back to MaxAgeMs.</param>
/// <returns>The assigned sequence and its Unix-nanosecond timestamp (millisecond precision).</returns>
/// <exception cref="ObjectDisposedException">The store has been stopped.</exception>
/// <remarks>
/// NOTE(review): unlike AppendAsync, this path does not check DiscardNew/MaxBytes and does not
/// enforce MaxMsgsPerSubject — confirm this is intentional.
/// </remarks>
public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[] msg, long ttl)
{
if (_stopped)
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
// Go: expire check before each store (same as AppendAsync).
// Reference: golang/nats-server/server/filestore.go:6793 (expireMsgs call).
// Called unconditionally here, since a per-message TTL can exist without MaxAgeMs.
ExpireFromWheel();
_last++;
var now = DateTime.UtcNow;
var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L;
var headers = hdr is { Length: > 0 } ? hdr : [];
var payload = msg ?? [];
var persistedPayload = TransformForPersist(payload);
// Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs.
// Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge.
var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L);
// NOTE(review): effectiveTtlNs may be 0 here; presumably RegisterTtl ignores non-positive TTLs — confirm.
RegisterTtl(_last, timestamp, effectiveTtlNs);
EnsureActiveBlock();
try
{
_activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp);
}
catch (InvalidOperationException)
{
// Active block could not take the record: rotate and retry exactly once.
RotateBlock();
_activeBlock!.WriteAt(_last, subject, headers, persistedPayload, timestamp);
}
TrackMessage(_last, subject, headers.Length, payload.Length, timestamp, _activeBlock!.BlockId);
_generation++;
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length);
// Signal the background flush loop to coalesce and flush pending writes.
_flushSignal.Writer.TryWrite(0);
if (_activeBlock!.IsSealed)
RotateBlock();
return (_last, timestamp);
}
/// <summary>
/// Drops every message, resets all counters and sequence watermarks to zero, and deletes
/// the block files.
/// Reference: golang/nats-server/server/filestore.go — purge / purgeMsgs.
/// </summary>
/// <returns>The number of live messages that were purged.</returns>
public ulong Purge()
{
    var purged = (ulong)_meta.Count;

    // In-memory indexes.
    _meta.Clear();
    _lastSequenceBySubject.Clear();
    _generation++;

    // Counters and sequence watermarks back to the empty-store state.
    _last = 0;
    _first = 0;
    _firstSeq = 0;
    _messageCount = 0;
    _totalBytes = 0;

    // On-disk blocks.
    DisposeAllBlocks();
    CleanBlockFiles();

    return purged;
}
/// <summary>
/// Removes live messages whose subject matches <paramref name="subject"/> (wildcards allowed;
/// empty matches everything) with sequence at or below <paramref name="seq"/> (0 = no bound).
/// When <paramref name="keep"/> is positive, the newest <paramref name="keep"/> matches are retained.
/// An empty subject with <paramref name="seq"/> == 0 and <paramref name="keep"/> == 0 performs
/// a full <see cref="Purge"/>. The high-water mark is preserved otherwise.
/// Reference: golang/nats-server/server/filestore.go — PurgeEx.
/// </summary>
/// <returns>The number of messages removed.</returns>
public ulong PurgeEx(string subject, ulong seq, ulong keep)
{
    // Go parity: unfiltered, unbounded, keep-nothing is a full purge.
    if (string.IsNullOrEmpty(subject) && keep == 0 && seq == 0)
        return Purge();

    var upperBound = seq == 0 ? _last : Math.Min(seq, _last);
    if (upperBound == 0)
        return 0;

    // Matching live sequences in ascending order (oldest first).
    var matches = new List<ulong>();
    if (_messageCount != 0)
    {
        for (var current = _first; current != 0 && current <= upperBound; current++)
        {
            if (_meta.TryGetValue(current, out var meta) && SubjectMatchesFilter(meta.Subject, subject))
                matches.Add(current);
        }
    }

    var total = (ulong)matches.Count;
    var toRemove = keep > 0
        ? (total > keep ? total - keep : 0UL)
        : total;
    if (toRemove == 0)
        return 0;

    ulong removed = 0;
    foreach (var candidate in matches)
    {
        if (removed >= toRemove)
            break;
        if (!RemoveTrackedMessage(candidate, preserveHighWaterMark: true))
            continue;

        DeleteInBlock(candidate);
        removed++;
    }

    if (removed == 0)
        return 0;

    _generation++;
    return removed;
}
/// <summary>
/// Drops every live message whose sequence is strictly below <paramref name="seq"/>.
/// The high-water mark (<c>_last</c>) is left unchanged.
/// Reference: golang/nats-server/server/filestore.go — Compact.
/// </summary>
/// <param name="seq">Exclusive upper bound; 0 is a no-op.</param>
/// <returns>How many messages were removed.</returns>
public ulong Compact(ulong seq)
{
    if (seq == 0)
        return 0;

    var below = _meta.Keys.Where(k => k < seq).ToArray();
    if (below.Length == 0)
        return 0;

    foreach (var victim in below)
    {
        RemoveTrackedMessage(victim, preserveHighWaterMark: true);
        DeleteInBlock(victim);
    }
    _generation++;

    if (_meta.Count > 0)
    {
        _first = _firstSeq;
    }
    else
    {
        // Go: nothing left below or at seq — the next expected first sequence is seq itself.
        _first = seq;
        _firstSeq = 0;
    }

    return (ulong)below.Length;
}
/// <summary>
/// Removes all messages with sequence number strictly greater than <paramref name="seq"/>
/// and pulls the last-sequence pointer back to the highest remaining live sequence
/// (<paramref name="seq"/> itself when it is live). <paramref name="seq"/> == 0 empties the store.
/// Reference: golang/nats-server/server/filestore.go — Truncate.
/// </summary>
public void Truncate(ulong seq)
{
    if (seq == 0)
    {
        // Truncate to nothing.
        _meta.Clear();
        _lastSequenceBySubject.Clear();
        _generation++;
        _last = 0;
        _messageCount = 0;
        _totalBytes = 0;
        _firstSeq = 0;
        _first = 0;
        DisposeAllBlocks();
        CleanBlockFiles();
        return;
    }

    var toRemove = _meta.Keys.Where(k => k > seq).ToArray();
    foreach (var s in toRemove)
    {
        // preserveHighWaterMark: false lets removal of the newest message lower _last.
        RemoveTrackedMessage(s, preserveHighWaterMark: false);
        DeleteInBlock(s);
    }
    if (toRemove.Length > 0)
        _generation++;

    if (_messageCount == 0)
    {
        _last = 0;
        _first = 0;
        _firstSeq = 0;
    }
    else if (_last > seq)
    {
        // _last can still exceed seq when the tail above seq had no live message (e.g. a
        // skipped sequence), so no removal above adjusted it. Every live sequence is now
        // <= seq, so fall back to the highest of them.
        _last = FindPreviousLiveSequence(seq + 1);
    }
}
/// <summary>
/// Finds the lowest live sequence whose timestamp is at or after <paramref name="t"/>.
/// Non-UTC inputs are converted to UTC; comparison is at millisecond precision.
/// Reference: golang/nats-server/server/filestore.go — GetSeqFromTime.
/// </summary>
/// <returns>The matching sequence, or <c>_last + 1</c> when no live message qualifies.</returns>
public ulong GetSeqFromTime(DateTime t)
{
    var asUtc = t.Kind == DateTimeKind.Utc ? t : t.ToUniversalTime();
    var thresholdNs = new DateTimeOffset(asUtc).ToUnixTimeMilliseconds() * 1_000_000L;

    ulong? earliest = null;
    foreach (var (sequence, meta) in _meta)
    {
        if (meta.TimestampNs < thresholdNs)
            continue;
        if (earliest is null || sequence < earliest.Value)
            earliest = sequence;
    }

    return earliest ?? _last + 1;
}
/// <summary>
/// Returns compact state (count, first and last sequence) for live messages whose subject
/// matches <paramref name="subject"/> at or after sequence <paramref name="seq"/>.
/// An empty or null subject matches every message.
///
/// Implementation: a single O(n) pass over the in-memory <c>_meta</c> index, which holds
/// every live message, so no block reads are needed.
/// Reference: golang/nats-server/server/filestore.go:3191 (FilteredState).
/// </summary>
public SimpleState FilteredState(ulong seq, string subject)
{
    ulong count = 0;
    ulong first = 0;
    ulong last = 0;

    // _meta enumeration is unordered, so track min/max explicitly.
    foreach (var kv in _meta)
    {
        var msgSeq = kv.Key;
        if (msgSeq < seq)
            continue;
        if (!string.IsNullOrEmpty(subject) && !SubjectMatchesFilter(kv.Value.Subject, subject))
            continue;
        count++;
        if (first == 0 || msgSeq < first)
            first = msgSeq;
        if (msgSeq > last)
            last = msgSeq;
    }

    if (count == 0)
        return new SimpleState();

    return new SimpleState
    {
        Msgs = count,
        First = first,
        Last = last,
    };
}
/// <summary>
/// Binary-searches the block list for the first block whose <see cref="MsgBlock.LastSequence"/>
/// is at least <paramref name="seq"/>, assuming blocks are ordered by sequence range.
/// </summary>
/// <returns>The 0-based block index, or the block count when no block qualifies (0 when there are no blocks).</returns>
private int FindFirstBlockAtOrAfter(ulong seq)
{
    var count = _blocks.Count;
    if (count == 0)
        return 0;

    var low = 0;
    var high = count - 1;
    while (low < high)
    {
        var middle = low + (high - low) / 2;
        if (_blocks[middle].LastSequence >= seq)
            high = middle;
        else
            low = middle + 1;
    }

    // low == high is the only remaining candidate; it may still end below seq.
    return _blocks[low].LastSequence < seq ? count : low;
}
/// <summary>
/// Reports whether <paramref name="firstBlock"/> can be skipped when scanning for
/// <paramref name="filter"/>. Without per-block subject metadata the filter cannot rule a
/// block out, so only a block with no written messages is skippable. The
/// <paramref name="filter"/> parameter is currently unused and is kept as the extension
/// point for a future per-block subject index.
/// Reference: golang/nats-server/server/filestore.go (block-level subject tracking).
/// </summary>
public static bool CheckSkipFirstBlock(string filter, MsgBlock firstBlock)
{
    return firstBlock.MessageCount == 0;
}
/// <summary>
/// Builds per-subject state (message count plus first and last sequence) for every subject
/// matching <paramref name="filterSubject"/>; an empty filter includes all subjects.
/// Reference: golang/nats-server/server/filestore.go — SubjectsState.
/// </summary>
public Dictionary<string, SimpleState> SubjectsState(string filterSubject)
{
    var bySubject = new Dictionary<string, SimpleState>(StringComparer.Ordinal);
    var matchAll = string.IsNullOrEmpty(filterSubject);

    foreach (var (seq, meta) in _meta)
    {
        if (!matchAll && !SubjectMatchesFilter(meta.Subject, filterSubject))
            continue;

        bySubject[meta.Subject] = bySubject.TryGetValue(meta.Subject, out var prior)
            ? new SimpleState
            {
                Msgs = prior.Msgs + 1,
                First = Math.Min(prior.First == 0 ? seq : prior.First, seq),
                Last = Math.Max(prior.Last, seq),
            }
            : new SimpleState
            {
                Msgs = 1,
                First = seq,
                Last = seq,
            };
    }

    return bySubject;
}
/// <summary>
/// Counts live messages per subject for every subject matching <paramref name="filterSubject"/>;
/// an empty filter includes all subjects.
/// Reference: golang/nats-server/server/filestore.go — SubjectsTotals.
/// </summary>
public Dictionary<string, ulong> SubjectsTotals(string filterSubject)
{
    var totals = new Dictionary<string, ulong>(StringComparer.Ordinal);
    var matchAll = string.IsNullOrEmpty(filterSubject);

    foreach (var meta in _meta.Values)
    {
        if (matchAll || SubjectMatchesFilter(meta.Subject, filterSubject))
            totals[meta.Subject] = totals.GetValueOrDefault(meta.Subject) + 1;
    }

    return totals;
}
/// <summary>
/// Returns the full stream state: the <see cref="FastState"/> fields plus the explicit list of
/// interior gaps (sequences in [FirstSeq, LastSeq] with no live message) and per-subject counts.
/// Reference: golang/nats-server/server/filestore.go — State.
/// </summary>
public StreamState State()
{
    var state = new StreamState();
    FastState(ref state);

    // Interior gaps. Note: cost is proportional to the sequence span, not the live count.
    if (state.FirstSeq > 0 && state.FirstSeq <= state.LastSeq)
    {
        var gaps = new List<ulong>();
        for (var probe = state.FirstSeq; probe <= state.LastSeq; probe++)
        {
            if (!_meta.ContainsKey(probe))
                gaps.Add(probe);
        }

        if (gaps.Count > 0)
        {
            state.Deleted = [.. gaps];
            state.NumDeleted = gaps.Count;
        }
    }

    var perSubject = new Dictionary<string, ulong>(StringComparer.Ordinal);
    foreach (var meta in _meta.Values)
        perSubject[meta.Subject] = perSubject.GetValueOrDefault(meta.Subject) + 1;

    state.NumSubjects = perSubject.Count;
    state.Subjects = perSubject.Count > 0 ? perSubject : null;
    return state;
}
/// <summary>
/// Populates a pre-allocated <see cref="StreamState"/> with the minimum fields
/// needed for replication without allocating a new struct.
/// Does not populate the <see cref="StreamState.Deleted"/> array or
/// <see cref="StreamState.Subjects"/> dictionary.
/// Reference: golang/nats-server/server/filestore.go — FastState.
/// </summary>
/// <param name="state">
/// Overwritten in place: Msgs, Bytes, FirstSeq, LastSeq, FirstTime, LastTime and NumDeleted
/// are assigned; all other members are left untouched.
/// </param>
public void FastState(ref StreamState state)
{
state.Msgs = _messageCount;
state.Bytes = _totalBytes;
state.LastSeq = _last;
state.LastTime = default;
if (_messageCount == 0)
{
// Go: when all messages are removed/expired, first.seq tracks the watermark.
// If _first > 0 use it (set by Compact / SkipMsg); otherwise 0.
state.FirstSeq = _first > 0 ? _first : 0;
state.FirstTime = default;
state.NumDeleted = 0;
}
else
{
var firstSeq = _firstSeq;
state.FirstSeq = firstSeq;
// Times are reconstructed from TimestampNs at millisecond precision.
state.FirstTime = _meta.TryGetValue(firstSeq, out var firstMeta)
? DateTimeOffset.FromUnixTimeMilliseconds(firstMeta.TimestampNs / 1_000_000).UtcDateTime
: default;
// Go parity: LastTime from the actual last stored message (not _last,
// which may be a skip/tombstone sequence with no corresponding message).
if (_meta.TryGetValue(_last, out var lastMeta))
state.LastTime = DateTimeOffset.FromUnixTimeMilliseconds(lastMeta.TimestampNs / 1_000_000).UtcDateTime;
else if (_meta.Count > 0)
{
// _last is a skip — use the highest actual message time.
// O(n) over the index; only taken when _last has no live entry.
var actualLast = _meta.Keys.Max();
var actualMeta = _meta[actualLast];
state.LastTime = DateTimeOffset.FromUnixTimeMilliseconds(actualMeta.TimestampNs / 1_000_000).UtcDateTime;
}
// Go parity: NumDeleted = gaps between firstSeq and lastSeq not in _meta.
// Reference: filestore.go — FastState sets state.NumDeleted.
// Computed arithmetically (span minus live count) rather than by enumerating gaps;
// this assumes every live sequence lies within [firstSeq, _last].
if (_last >= firstSeq)
{
var span = _last - firstSeq + 1;
var liveCount = (ulong)_meta.Count;
state.NumDeleted = span > liveCount ? (int)(span - liveCount) : 0;
}
else
state.NumDeleted = 0;
}
}
/// <summary>
/// Rebuilds a full <see cref="StoredMessage"/> for <paramref name="seq"/> by reading its record
/// from block storage and reversing the persistence transform on the payload.
/// The owning block is located by the block id recorded in the index, falling back to a
/// sequence-based lookup when no block with that id exists.
/// </summary>
/// <returns><c>null</c> if the sequence is not indexed, no block is found, or the block has no record for it.</returns>
private StoredMessage? MaterializeMessage(ulong seq)
{
    if (!_meta.TryGetValue(seq, out var meta))
        return null;

    // Linear scan by id (the block list is expected to be small), then range lookup.
    var owner = _blocks.Find(b => b.BlockId == meta.BlockId) ?? FindBlockForSequence(seq);
    var record = owner?.Read(seq);
    if (record is null)
        return null;

    var restored = RestorePayload(record.Payload.Span);
    return new StoredMessage
    {
        Sequence = seq,
        Subject = meta.Subject,
        RawHeaders = record.Headers,
        Payload = restored,
        TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(meta.TimestampNs / 1_000_000).UtcDateTime,
    };
}
/// <summary>
/// Records <paramref name="sequence"/> in the in-memory metadata index and updates the
/// incremental counters (<c>_messageCount</c>, <c>_totalBytes</c>), the per-subject
/// last-sequence map, and the first-sequence watermarks.
/// Re-tracking an already indexed sequence replaces its entry without double counting.
/// Sequences tracked out of order cannot move a subject's last pointer backwards or leave
/// the first sequence too high.
/// </summary>
/// <param name="sequence">Sequence being stored.</param>
/// <param name="subject">Message subject.</param>
/// <param name="headerLength">Header length in bytes (counted toward total bytes).</param>
/// <param name="payloadLength">Untransformed payload length in bytes (counted toward total bytes).</param>
/// <param name="timestampNs">Message timestamp in Unix nanoseconds.</param>
/// <param name="blockId">Id of the block the record was written to.</param>
private void TrackMessage(ulong sequence, string subject, int headerLength, int payloadLength, long timestampNs, int blockId)
{
    var replacing = _meta.TryGetValue(sequence, out var previous);

    _meta[sequence] = new MessageMeta
    {
        BlockId = blockId,
        Subject = subject,
        PayloadLength = payloadLength,
        HeaderLength = headerLength,
        TimestampNs = timestampNs,
    };

    if (replacing)
    {
        // Same sequence written again: back out the old size rather than counting a second message.
        _totalBytes -= (ulong)(previous.HeaderLength + previous.PayloadLength);
        // If the subject changed, the old subject's last pointer may still reference this sequence.
        if (!string.Equals(previous.Subject, subject, StringComparison.Ordinal))
            UpdateLastSequenceForSubject(previous.Subject, sequence);
    }
    else
    {
        _messageCount++;
    }

    _totalBytes += (ulong)(headerLength + payloadLength);

    // Only move a subject's last pointer forward; an older sequence must not hide a newer one.
    if (!_lastSequenceBySubject.TryGetValue(subject, out var lastForSubject) || sequence >= lastForSubject)
        _lastSequenceBySubject[subject] = sequence;

    // First live message, or one older than the current first (e.g. out-of-order restore).
    if (_messageCount == 1 || sequence < _firstSeq)
    {
        _first = sequence;
        _firstSeq = sequence;
    }
}
/// <summary>
/// Removes <paramref name="sequence"/> from the in-memory index and updates the counters,
/// the per-subject last-sequence map, and the first/last watermarks. Block storage is not
/// touched; callers in this file follow a successful removal with DeleteInBlock.
/// </summary>
/// <param name="sequence">Sequence to remove.</param>
/// <param name="preserveHighWaterMark">
/// When true, <c>_last</c> is never lowered, and emptying the store sets <c>_first</c> to
/// <c>_last + 1</c>. When false, removing the newest message moves <c>_last</c> back to the
/// previous live sequence, and emptying the store sets <c>_last</c> to 0.
/// </param>
/// <returns><c>false</c> if the sequence was not in the index; otherwise <c>true</c>.</returns>
private bool RemoveTrackedMessage(ulong sequence, bool preserveHighWaterMark)
{
if (!_meta.Remove(sequence, out var meta))
return false;
_messageCount--;
_totalBytes -= (ulong)(meta.HeaderLength + meta.PayloadLength);
UpdateLastSequenceForSubject(meta.Subject, sequence);
if (_messageCount == 0)
{
// Store is now empty: only the watermark survives (or nothing, when not preserving).
if (preserveHighWaterMark)
_first = _last + 1;
else
_last = 0;
_firstSeq = 0;
return true;
}
// Removed the oldest live message: move the first pointer to the next live one.
if (sequence == _firstSeq)
AdvanceFirstSequence(sequence + 1);
if (!preserveHighWaterMark && sequence == _last)
_last = FindPreviousLiveSequence(sequence);
return true;
}
/// <summary>
/// Moves <c>_first</c>/<c>_firstSeq</c> to the lowest live sequence in [<paramref name="start"/>, <c>_last</c>].
/// If none exists, <c>_first</c> becomes <c>_last + 1</c> and <c>_firstSeq</c> becomes 0.
/// </summary>
private void AdvanceFirstSequence(ulong start)
{
    for (var probe = start; probe <= _last; probe++)
    {
        if (!_meta.ContainsKey(probe))
            continue;

        _first = probe;
        _firstSeq = probe;
        return;
    }

    // No live sequence at or after start.
    _first = _last + 1;
    _firstSeq = 0;
}
/// <summary>
/// Returns the highest live sequence strictly below <paramref name="startExclusive"/>,
/// or 0 when the store is empty or no such sequence exists.
/// Walks downward one sequence at a time, so cost grows with the gap size.
/// </summary>
private ulong FindPreviousLiveSequence(ulong startExclusive)
{
    if (_messageCount == 0 || startExclusive == 0)
        return 0;

    var probe = startExclusive;
    while (probe > 0)
    {
        probe--;
        if (_meta.ContainsKey(probe))
            return probe;
    }

    return 0;
}
/// <summary>
/// After <paramref name="removedSequence"/> leaves the index, repoints the last-sequence entry
/// for <paramref name="subject"/> to the highest remaining live sequence on that subject below
/// <paramref name="removedSequence"/>, or removes the entry if none remains. This is a no-op
/// unless <paramref name="removedSequence"/> is currently recorded as the subject's last sequence.
/// </summary>
/// <remarks>
/// The search cost is bounded by min(sequence span, live count). When the span below
/// <paramref name="removedSequence"/> is small it walks backwards; otherwise it scans the live
/// entries instead of stepping through a possibly huge range of gaps.
/// </remarks>
private void UpdateLastSequenceForSubject(string subject, ulong removedSequence)
{
    if (!_lastSequenceBySubject.TryGetValue(subject, out var currentLast) || currentLast != removedSequence)
        return;

    ulong? previous = null;

    // Nothing precedes sequence 0 (and `removedSequence - 1` would wrap to ulong.MaxValue).
    if (removedSequence > 0)
    {
        if (removedSequence <= (ulong)_meta.Count)
        {
            // Dense case: the range [0, removedSequence) is no larger than the live set.
            for (var seq = removedSequence - 1; ; seq--)
            {
                if (_meta.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal))
                {
                    previous = seq;
                    break;
                }
                if (seq == 0)
                    break;
            }
        }
        else
        {
            // Sparse case: scanning live entries is cheaper than walking the gap-filled range.
            foreach (var kv in _meta)
            {
                if (kv.Key < removedSequence
                    && (previous is null || kv.Key > previous.Value)
                    && string.Equals(kv.Value.Subject, subject, StringComparison.Ordinal))
                {
                    previous = kv.Key;
                }
            }
        }
    }

    if (previous is { } found)
        _lastSequenceBySubject[subject] = found;
    else
        _lastSequenceBySubject.Remove(subject);
}
/// <summary>
/// Recomputes the derived state (per-subject last sequence, message count, total bytes, and the
/// first watermarks) from the current <c>_meta</c> entries. <c>_meta</c> and <c>_last</c> are
/// not modified; an empty index with <c>_last</c> &gt; 0 leaves <c>_first</c> at <c>_last + 1</c>.
/// </summary>
private void RebuildIndexesFromMessages()
{
    _lastSequenceBySubject.Clear();
    _messageCount = 0;
    _totalBytes = 0;
    _firstSeq = 0;
    _first = 0;

    // Walk in ascending sequence order so the final per-subject value is the newest.
    var ordered = _meta.Keys.ToArray();
    Array.Sort(ordered);
    foreach (var seq in ordered)
    {
        var entry = _meta[seq];
        _lastSequenceBySubject[entry.Subject] = seq;
        _totalBytes += (ulong)(entry.HeaderLength + entry.PayloadLength);
        if (++_messageCount == 1)
        {
            _first = seq;
            _firstSeq = seq;
        }
    }

    if (_messageCount == 0 && _last > 0)
        _first = _last + 1;
}
// -------------------------------------------------------------------------
// Subject matching helper
// -------------------------------------------------------------------------
/// <summary>
/// Tests <paramref name="subject"/> against <paramref name="filter"/>. An empty or null filter
/// matches everything. Otherwise matching is delegated to SubjectMatch.SubjectMatchesFilter,
/// which is expected to handle the NATS <c>*</c> and <c>&gt;</c> wildcards.
/// Reference: golang/nats-server/server/filestore.go — subjectMatch helper.
/// </summary>
private static bool SubjectMatchesFilter(string subject, string filter) =>
    string.IsNullOrEmpty(filter)
    || NATS.Server.Subscriptions.SubjectMatch.SubjectMatchesFilter(subject, filter);
/// <summary>
/// Counts live messages whose subject matches <paramref name="filter"/>; a null or empty filter
/// counts everything. Results are memoized per filter and reused while <c>_generation</c> is
/// unchanged, so repeated queries between writes/deletes are O(1).
/// Reference: golang/nats-server/server/filestore.go — fss NumFiltered (subject-state cache).
/// </summary>
public ulong NumFiltered(string filter)
{
    var cacheKey = filter ?? string.Empty;

    if (_numFilteredCache.TryGetValue(cacheKey, out var hit) && hit.Generation == _generation)
        return hit.Count;

    ulong matches = 0;
    foreach (var entry in _meta.Values)
    {
        if (SubjectMatchesFilter(entry.Subject, cacheKey))
            matches++;
    }

    _numFilteredCache[cacheKey] = (_generation, matches);
    return matches;
}
/// <summary>
/// Asynchronously shuts the store down. The steps, in order:
/// 1. Stop the background flush loop.
/// 2. Flush pending buffered writes on every block.
/// 3. Dispose all blocks.
/// 4. Dispose the write cache and the state-write semaphore.
/// The order is deliberate; see the inline comments.
/// NOTE(review): there is no already-disposed guard in this method. Confirm that calling it
/// twice, or calling both this and <see cref="Dispose"/>, is safe.
/// </summary>
public async ValueTask DisposeAsync()
{
// Stop the background flush loop first to prevent it from accessing
// blocks that are about to be disposed.
await StopFlushLoopAsync();
// Flush pending buffered writes on all blocks before closing.
foreach (var block in _blocks)
block.FlushPending();
// Dispose blocks first so the write cache lookup returns null for
// already-closed blocks. WriteCacheManager.FlushAllAsync guards
// against null blocks, so this ordering prevents ObjectDisposedException.
DisposeAllBlocks();
// The write cache is disposed only after the blocks are gone (see above).
await _writeCache.DisposeAsync();
_stateWriteLock.Dispose();
}
/// <summary>
/// Synchronous dispose. Stops the flush loop, flushes pending writes on every block,
/// disposes all blocks (releasing their file handles), and disposes the state-write
/// semaphore. This allows the store to be used in synchronous test contexts with
/// <c>using</c> blocks.
/// NOTE(review): unlike <see cref="DisposeAsync"/>, this path does not dispose
/// <c>_writeCache</c>. Confirm whether that is intentional.
/// </summary>
public void Dispose()
{
// Stop the flush loop before touching blocks so it cannot race with disposal.
StopFlushLoop();
foreach (var block in _blocks)
block.FlushPending();
DisposeAllBlocks();
_stateWriteLock.Dispose();
}
/// <summary>
/// Stops the store, then recursively deletes its data directory (blocks, index files, and
/// anything else stored there). Deletion is best effort: any failure while removing the
/// directory is ignored. The <paramref name="inline"/> flag is accepted for API parity and
/// is not used here.
/// Reference: golang/nats-server/server/filestore.go — fileStore.Delete.
/// </summary>
public void Delete(bool inline = false)
{
    Stop();

    var directory = _options.Directory;
    if (!Directory.Exists(directory))
        return;

    try
    {
        Directory.Delete(directory, recursive: true);
    }
    catch
    {
        // Best effort — leftover files are not fatal.
    }
}
// -------------------------------------------------------------------------
// Block management
// -------------------------------------------------------------------------
/// <summary>
/// Guarantees that <c>_activeBlock</c> refers to a writable (unsealed) block. If there is no
/// active block, or the current one is sealed, a new block is created via
/// <see cref="RotateBlock"/>.
/// </summary>
private void EnsureActiveBlock()
{
    if (_activeBlock is { IsSealed: false })
        return;

    RotateBlock();
}
/// <summary>
/// Starts a new active block and appends it to <c>_blocks</c>. The outgoing active block,
/// if any, stays in the list, and before switching it is:
/// <list type="bullet">
/// <item>flushed of pending buffered writes,</item>
/// <item>evicted from the <c>WriteCacheManager</c>,</item>
/// <item>stripped of its in-memory write cache, so later reads go to disk.</item>
/// </list>
/// The new block's first-sequence hint is <c>_last + 1</c>; the sequences actually stored
/// come from <c>WriteAt</c> calls.
/// Reference: golang/nats-server/server/filestore.go — clearCache called on block seal.
/// </summary>
private void RotateBlock()
{
    var outgoing = _activeBlock;
    if (outgoing is not null)
    {
        // Persist buffered writes before the block is retired.
        outgoing.FlushPending();

        // Go: filestore.go:4499 (flushPendingMsgsLocked) — the write cache manager
        // flushes this block to disk and drops its cache entry.
        _writeCache.EvictBlock(outgoing.BlockId);

        // Free the block-local cache; sealed blocks are read from disk from now on.
        outgoing.ClearCache();
    }

    var nextFirstSeq = _last + 1;
    var fresh = MsgBlock.Create(_nextBlockId, _options.Directory, _options.BlockSizeBytes, nextFirstSeq);
    _blocks.Add(fresh);
    _activeBlock = fresh;
    _nextBlockId++;
}
/// <summary>
/// Marks <paramref name="sequence"/> as deleted in the first block whose
/// [FirstSequence, LastSequence] range contains it. If no block covers it, nothing happens.
/// When <paramref name="secureErase"/> is <c>true</c>, the flag is forwarded to the block's
/// <c>Delete</c> call, which overwrites the payload bytes with random data before the delete
/// record is written.
/// Reference: golang/nats-server/server/filestore.go:5890 (eraseMsg).
/// </summary>
private void DeleteInBlock(ulong sequence, bool secureErase = false)
{
    var owner = _blocks.FirstOrDefault(b => sequence >= b.FirstSequence && sequence <= b.LastSequence);
    owner?.Delete(sequence, secureErase);
}
/// <summary>
/// Disposes every block, empties <c>_blocks</c>, and resets <c>_nextBlockId</c> to 0.
/// </summary>
private void DisposeAllBlocks()
{
    // Detach the active block before anything is disposed. The background flush loop
    // checks for null and skips FlushPending, so it never touches a disposed block's lock.
    _activeBlock = null;

    for (var i = 0; i < _blocks.Count; i++)
        _blocks[i].Dispose();

    _blocks.Clear();
    _nextBlockId = 0;
}
/// <summary>
/// Removes every <c>*.blk</c> file from the store directory. A missing directory is a no-op,
/// and individual delete failures are ignored (best effort).
/// </summary>
private void CleanBlockFiles()
{
    var directory = _options.Directory;
    if (Directory.Exists(directory))
    {
        foreach (var path in Directory.GetFiles(directory, "*.blk"))
            TryDeleteQuietly(path);
    }

    static void TryDeleteQuietly(string path)
    {
        try { File.Delete(path); }
        catch { /* best effort */ }
    }
}
/// <summary>
/// Rewrites all blocks from the in-memory metadata index. Used after trim, snapshot
/// restore, or legacy migration. The steps:
/// <list type="number">
/// <item>Materialize every sequence in <c>_meta</c> from the current blocks. Sequences that
/// cannot be materialized are dropped.</item>
/// <item>Dispose the old blocks and delete their .blk files.</item>
/// <item>Write the survivors into fresh blocks in ascending sequence order.</item>
/// <item>Rebuild the derived indexes.</item>
/// </list>
/// <c>_last</c> is reset to the highest surviving sequence, or 0 when nothing survives.
/// </summary>
private void RewriteBlocks()
{
    // Materialize messages (paired with their metadata) before disposing blocks, since
    // materialization reads payloads back from the blocks.
    var survivors = _meta
        .OrderBy(kv => kv.Key)
        .Select(kv => (Message: MaterializeMessage(kv.Key), Meta: kv.Value))
        .Where(entry => entry.Message is not null)
        .Select(entry => (Message: entry.Message!, entry.Meta))
        .ToList();

    DisposeAllBlocks();
    CleanBlockFiles();
    _meta.Clear();

    _last = survivors.Count == 0 ? 0UL : survivors[^1].Message.Sequence;

    foreach (var (message, previousMeta) in survivors)
    {
        var persistedPayload = TransformForPersist(message.Payload.Span);

        // Reuse the exact nanosecond timestamp already recorded in _meta. Re-deriving it
        // from message.TimestampUtc had two problems: it truncated the value to whole
        // milliseconds on every rewrite, and DateTimeOffset(DateTime) applies the local UTC
        // offset when the DateTime's Kind is Unspecified.
        var timestamp = previousMeta.TimestampNs;

        EnsureActiveBlock();
        try
        {
            _activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp);
        }
        catch (InvalidOperationException)
        {
            // The active block rejected the write; start a new block and retry once.
            RotateBlock();
            _activeBlock!.WriteAt(message.Sequence, message.Subject, message.RawHeaders, persistedPayload, timestamp);
        }

        _meta[message.Sequence] = new MessageMeta
        {
            BlockId = _activeBlock!.BlockId,
            Subject = message.Subject,
            PayloadLength = message.Payload.Length,
            HeaderLength = message.RawHeaders.Length,
            TimestampNs = timestamp,
        };

        if (_activeBlock!.IsSealed)
            RotateBlock();
    }

    RebuildIndexesFromMessages();
}
// -------------------------------------------------------------------------
// Recovery: scan .blk files on startup and rebuild in-memory state.
// -------------------------------------------------------------------------
/// <summary>
/// Recovers all blocks from the .blk files in the store directory and rebuilds in-memory state.
/// <list type="bullet">
/// <item>Files are processed in ascending numeric block-ID order. Names that do not parse as
/// an integer are ignored.</item>
/// <item>An <see cref="InvalidDataException"/> (e.g. key mismatch) propagates to the caller.
/// Any other failure on a block skips that block.</item>
/// <item>After loading, <c>_last</c> is derived from the skip high-water marks, falling back to
/// block last-sequences. Expired messages are then pruned and the indexes rebuilt.</item>
/// </list>
/// </summary>
private void RecoverBlocks()
{
    var blkFiles = Directory.GetFiles(_options.Directory, "*.blk");
    if (blkFiles.Length == 0)
        return;

    // Order by numeric block ID rather than by file name. A lexical sort misorders
    // IDs whose printed width differs (e.g. "999999" vs "1000000"), and _blocks must
    // be in sequence order: the last block becomes the active block and
    // FindBlockForSequence binary-searches the list.
    var blockIds = new List<int>(blkFiles.Length);
    foreach (var blkFile in blkFiles)
    {
        if (int.TryParse(Path.GetFileNameWithoutExtension(blkFile), out var parsedId))
            blockIds.Add(parsedId);
    }
    blockIds.Sort();

    var maxBlockId = -1;
    foreach (var blockId in blockIds)
    {
        // Reserve the ID even if recovery of this block fails below. Otherwise the next
        // RotateBlock would reuse the ID (and file name) of a corrupted block that is still
        // on disk.
        if (blockId > maxBlockId)
            maxBlockId = blockId;

        try
        {
            var block = MsgBlock.Recover(blockId, _options.Directory);
            _blocks.Add(block);

            // Read all non-deleted records from this block and populate the in-memory index.
            RecoverMessagesFromBlock(block);
        }
        catch (Exception ex) when (ex is not InvalidDataException)
        {
            // Skip corrupted blocks — non-critical recovery errors. InvalidDataException
            // (key mismatch / integrity failure) is not caught, so the caller learns the
            // store cannot be opened.
        }
    }

    _nextBlockId = maxBlockId + 1;

    // The last recovered block becomes the active block. If it is sealed,
    // EnsureActiveBlock rotates to a fresh block on the next write.
    if (_blocks.Count > 0)
        _activeBlock = _blocks[^1];

    // After recovery, sync _last from skip-sequence high-water marks.
    // SkipMsg/SkipMsgs write tombstone records with empty subject — these
    // intentionally advance _last without storing a live message. We must
    // include them in the high-water mark so the next StoreMsg gets the
    // correct sequence number.
    // We do NOT use block.LastSequence blindly because that includes
    // soft-deleted real messages at the tail (e.g., after Truncate or
    // RemoveMsg of the last message), which must not inflate _last.
    // Go: filestore.go — recovery sets state.LastSeq from lmb.last.seq.
    foreach (var blk in _blocks)
    {
        var maxSkip = blk.MaxSkipSequence;
        if (maxSkip > _last)
            _last = maxSkip;
    }

    // If no messages and no skips were found, fall back to block.LastSequence
    // to preserve watermarks from purge or full-delete scenarios.
    if (_last == 0)
    {
        foreach (var blk in _blocks)
        {
            var blkLast = blk.LastSequence;
            if (blkLast > _last)
                _last = blkLast;
        }
    }

    // Prune expired messages AFTER establishing _last from blocks/skips.
    // If all messages expire, PruneExpiredLinear resets _last to 0.
    PruneExpired(DateTime.UtcNow);

    // Sync _first from _meta; if empty, set to _last+1 (watermark).
    RebuildIndexesFromMessages();
}
/// <summary>
/// Scans every sequence in <paramref name="block"/>'s [FirstSequence, LastSequence] range.
/// A block reporting 0/0 is treated as empty. For each record that <c>block.Read</c> returns,
/// this method:
/// <list type="bullet">
/// <item>stores its metadata in <c>_meta</c>,</item>
/// <item>raises <c>_last</c> if needed,</item>
/// <item>registers a TTL when <c>MaxAgeMs</c> is positive.</item>
/// </list>
/// </summary>
private void RecoverMessagesFromBlock(MsgBlock block)
{
    var lowSeq = block.FirstSequence;
    var highSeq = block.LastSequence;
    if (lowSeq == 0 && highSeq == 0)
        return; // Empty block.

    var trackTtl = _options.MaxAgeMs > 0;

    for (var seq = lowSeq; seq <= highSeq; seq++)
    {
        if (block.Read(seq) is not { } record)
            continue; // Deleted or not present.

        // The payload is decoded to learn its original (pre-transform) length. Any
        // InvalidDataException from decoding (e.g. wrong key) propagates to the caller.
        var restoredLength = RestorePayload(record.Payload.Span).Length;

        _meta[record.Sequence] = new MessageMeta
        {
            BlockId = block.BlockId,
            Subject = record.Subject,
            PayloadLength = restoredLength,
            HeaderLength = record.Headers.Length,
            TimestampNs = record.Timestamp,
        };

        _last = Math.Max(_last, record.Sequence);

        // Go: re-register unexpired TTLs in the wheel after recovery.
        // Reference: golang/nats-server/server/filestore.go — recoverMsgs, TTL re-registration.
        if (trackTtl)
            RegisterTtl(record.Sequence, record.Timestamp, (long)_options.MaxAgeMs * 1_000_000L);
    }
}
// -------------------------------------------------------------------------
// Legacy JSONL migration: if messages.jsonl exists, migrate to blocks.
// -------------------------------------------------------------------------
/// <summary>
/// Migrates data from the legacy JSONL format to block-based storage. If
/// <c>messages.jsonl</c> exists, this method:
/// <list type="number">
/// <item>reads every parseable record,</item>
/// <item>writes the records into blocks and rebuilds the indexes,</item>
/// <item>deletes the JSONL file and the index manifest.</item>
/// </list>
/// The legacy files are removed even when no records were found.
/// Lines that are blank, not valid JSON, or carry invalid base64 are skipped as corrupted.
/// Payload decoding failures (e.g. <see cref="InvalidDataException"/> for a wrong encryption
/// key) propagate to the caller.
/// </summary>
private void MigrateLegacyJsonl()
{
    var jsonlPath = Path.Combine(_options.Directory, "messages.jsonl");
    if (!File.Exists(jsonlPath))
        return;

    var legacyMessages = ReadLegacyMessages();

    if (legacyMessages.Count > 0)
    {
        // Write all messages to fresh blocks, then populate _meta.
        foreach (var (seq, subject, payload, ts) in legacyMessages)
        {
            if (seq > _last)
                _last = seq;

            var persistedPayload = TransformForPersist(payload);

            // The field is named TimestampUtc. A value deserialized without an offset has
            // Kind == Unspecified, which DateTimeOffset(DateTime) would treat as local time.
            var utc = ts.Kind == DateTimeKind.Unspecified ? DateTime.SpecifyKind(ts, DateTimeKind.Utc) : ts;
            var timestampNs = new DateTimeOffset(utc).ToUnixTimeMilliseconds() * 1_000_000L;

            EnsureActiveBlock();
            try
            {
                _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestampNs);
            }
            catch (InvalidOperationException)
            {
                // The active block rejected the write; start a new block and retry once.
                RotateBlock();
                _activeBlock!.WriteAt(seq, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestampNs);
            }

            _meta[seq] = new MessageMeta
            {
                BlockId = _activeBlock!.BlockId,
                Subject = subject,
                PayloadLength = payload.Length,
                HeaderLength = 0,
                TimestampNs = timestampNs,
            };

            if (_activeBlock!.IsSealed)
                RotateBlock();
        }

        RebuildIndexesFromMessages();
    }

    DeleteLegacyFiles();

    // Parses messages.jsonl into (sequence, subject, decoded payload, timestamp) tuples.
    List<(ulong Sequence, string Subject, byte[] Payload, DateTime TimestampUtc)> ReadLegacyMessages()
    {
        var records = new List<(ulong Sequence, string Subject, byte[] Payload, DateTime TimestampUtc)>();
        foreach (var line in File.ReadLines(jsonlPath))
        {
            if (string.IsNullOrWhiteSpace(line))
                continue;

            FileRecord? record;
            try
            {
                record = JsonSerializer.Deserialize<FileRecord>(line);
            }
            catch
            {
                continue; // Skip corrupted lines.
            }

            if (record == null)
                continue;

            byte[] persisted;
            try
            {
                persisted = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty);
            }
            catch (FormatException)
            {
                continue; // Invalid base64 is a corrupted line too.
            }

            // Not caught: integrity failures (e.g. wrong encryption key) must abort the migration.
            var originalPayload = RestorePayload(persisted);
            records.Add((record.Sequence, record.Subject ?? string.Empty, originalPayload, record.TimestampUtc));
        }

        return records;
    }

    // Removes the legacy JSONL file and, if present, the index manifest.
    void DeleteLegacyFiles()
    {
        File.Delete(jsonlPath);
        var manifestPath = Path.Combine(_options.Directory, _options.IndexManifestFileName);
        if (File.Exists(manifestPath))
            File.Delete(manifestPath);
    }
}
// -------------------------------------------------------------------------
// Expiry
// -------------------------------------------------------------------------
/// <summary>
/// Schedules <paramref name="seq"/> for expiry in the TTL wheel. The wheel is created on first use.
/// <list type="bullet">
/// <item>If <paramref name="ttlNs"/> is zero or negative, this is a no-op.</item>
/// <item>The expiry is placed at "now + remaining TTL", where the remaining TTL is
/// <paramref name="ttlNs"/> minus the message's age.</item>
/// <item>The age is computed from <paramref name="timestampNs"/> (Unix epoch nanoseconds)
/// and the current wall clock.</item>
/// <item>Already-expired messages get a remaining TTL of 0.</item>
/// <item>A non-positive or future timestamp falls back to the full TTL.</item>
/// </list>
/// The wheel's <see cref="HashWheel.ExpireTasks"/> uses Stopwatch-relative nanoseconds, so the
/// expiry is expressed in that domain.
/// Reference: golang/nats-server/server/filestore.go:6820 — storeMsg TTL scheduling.
/// </summary>
private void RegisterTtl(ulong seq, long timestampNs, long ttlNs)
{
    if (ttlNs <= 0)
        return;

    _ttlWheel ??= new HashWheel();

    // Previously the full ttlNs was always scheduled from "now". That is right for a message
    // written this instant, but wrong for messages re-registered during recovery: a message
    // already 90% through its MaxAge would live for another full MaxAge. Subtract the age
    // the message has already accumulated.
    var remainingNs = ttlNs;
    if (timestampNs > 0)
    {
        var nowUnixNs = (DateTime.UtcNow.Ticks - DateTime.UnixEpoch.Ticks) * 100L;
        var ageNs = nowUnixNs - timestampNs;
        if (ageNs >= ttlNs)
            remainingNs = 0;
        else if (ageNs > 0)
            remainingNs = ttlNs - ageNs;
    }

    // Convert to Stopwatch-domain nanoseconds to match ExpireTasks' time source.
    var nowStopwatchNs = (long)((double)System.Diagnostics.Stopwatch.GetTimestamp()
        / System.Diagnostics.Stopwatch.Frequency * 1_000_000_000);
    var expiresNs = nowStopwatchNs + remainingNs;
    _ttlWheel.Add(seq, expiresNs);
}
/// <summary>
/// Removes messages whose TTL-wheel entries are due.
/// <list type="bullet">
/// <item>If the wheel has not been created yet, this falls back to
/// <see cref="PruneExpiredLinear"/> with the current UTC time.</item>
/// <item>Otherwise every due entry is popped from the wheel. Each popped sequence is removed
/// from the in-memory index (preserving the high-water mark) and soft-deleted in its block.</item>
/// <item><c>_generation</c> is bumped when at least one entry was due.</item>
/// </list>
/// <c>RewriteBlocks</c> is intentionally not used, because it would reset <c>_last</c>.
/// Reference: golang/nats-server/server/filestore.go — expireMsgs using thw.ExpireTasks.
/// </summary>
private void ExpireFromWheel()
{
    var wheel = _ttlWheel;
    if (wheel is null)
    {
        // No wheel yet (nothing registered since startup): use the O(n) scan instead.
        PruneExpiredLinear(DateTime.UtcNow);
        return;
    }

    var dueSequences = new List<ulong>();
    wheel.ExpireTasks((dueSeq, _) =>
    {
        dueSequences.Add(dueSeq);
        return true; // Remove from wheel.
    });

    if (dueSequences.Count == 0)
        return;

    // Soft-delete only; see filestore.go:expireMsgs — dmap-based removal.
    foreach (var seq in dueSequences)
    {
        RemoveTrackedMessage(seq, preserveHighWaterMark: true);
        DeleteInBlock(seq);
    }

    _generation++;
}
/// <summary>
/// O(n) expiry scan over <c>_meta</c>.
/// <list type="bullet">
/// <item>Does nothing unless <c>MaxAgeMs</c> is positive.</item>
/// <item>Removes (and soft-deletes in blocks) every message whose <c>TimestampNs</c> is older
/// than <paramref name="nowUtc"/> minus MaxAge.</item>
/// <item>If that leaves the store empty, <c>_last</c>, <c>_first</c>, and <c>_firstSeq</c> are
/// reset to 0.</item>
/// <item><c>_generation</c> is bumped whenever anything was removed.</item>
/// </list>
/// </summary>
private void PruneExpiredLinear(DateTime nowUtc)
{
    if (_options.MaxAgeMs <= 0)
        return;

    var oldestAllowed = nowUtc.AddMilliseconds(-_options.MaxAgeMs);
    var cutoffNs = new DateTimeOffset(oldestAllowed).ToUnixTimeMilliseconds() * 1_000_000L;

    // Snapshot the stale keys first; _meta is mutated while removing them.
    var stale = new List<ulong>();
    foreach (var (seq, meta) in _meta)
    {
        if (meta.TimestampNs < cutoffNs)
            stale.Add(seq);
    }

    if (stale.Count == 0)
        return;

    foreach (var seq in stale)
    {
        RemoveTrackedMessage(seq, preserveHighWaterMark: true);
        DeleteInBlock(seq);
    }

    // An entirely expired store resets its sequence pointers to 0, matching the
    // earlier RewriteBlocks-based behavior.
    if (_meta.Count == 0)
    {
        _last = 0;
        _first = 0;
        _firstSeq = 0;
    }

    _generation++;
}
/// <summary>
/// Legacy name kept for the recovery path; forwards directly to <see cref="PruneExpiredLinear"/>.
/// </summary>
private void PruneExpired(DateTime nowUtc)
{
    PruneExpiredLinear(nowUtc);
}
// -------------------------------------------------------------------------
// Payload transform: compress + encrypt on write; reverse on read.
//
// FSV1 format (legacy, EnableCompression / EnableEncryption booleans):
// Header: [4:magic="FSV1"][1:flags][4:keyHash][8:payloadHash] = 17 bytes
// Body: Deflate (compression) then XOR (encryption)
//
// FSV2 format (Go parity, Compression / Cipher enums):
// Header: [4:magic="FSV2"][1:flags][4:keyHash][8:payloadHash] = 17 bytes
// Body: S2/Snappy (compression) then AEAD (encryption)
// AEAD wire format (appended after compression): [12:nonce][16:tag][N:ciphertext]
//
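// FSV2 supersedes FSV1 when Compression==S2Compression or Cipher!=NoCipher.
// On read, the magic bytes select the decode path, so FSV1 data remains readable.
// When no compression or encryption is configured, payloads are normally stored
// raw, with no envelope header at all.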
// -------------------------------------------------------------------------
/// <summary>
/// Converts a plaintext payload into the bytes written to a block.
/// <list type="bullet">
/// <item>With S2 compression or an AEAD cipher active, an FSV2 envelope is produced
/// (S2 then AEAD).</item>
/// <item>Otherwise, with the FSV1 compression/encryption options set, an FSV1 envelope is
/// produced (Deflate then XOR).</item>
/// <item>With no transform configured, the payload is stored raw, with no envelope.</item>
/// </list>
/// Every envelope carries the key hash and the payload hash of the plaintext.
/// </summary>
private byte[] TransformForPersist(ReadOnlySpan<byte> payload)
{
    var anyTransform = _useS2 || _useAead || _options.EnableCompression || _options.EnableEncryption;

    // Fast path: no compression or encryption, so store the raw payload without an envelope.
    // This avoids SHA256 hashing and envelope allocation on the hot publish path.
    // A raw payload that itself begins with an "FSV1"/"FSV2" header would be misread by
    // RestorePayload as an envelope: its first 17 bytes would be stripped, or it would hit a
    // spurious decrypt/decompress/integrity failure. Such payloads therefore fall through and
    // get a flag-less FSV1 envelope, which RestorePayload unwraps back to the exact original
    // bytes.
    // NOTE(review): data already written raw by earlier versions with such a prefix remains
    // ambiguous on read.
    if (!anyTransform && !TryReadEnvelope(payload, out _, out _, out _, out _, out _))
        return payload.ToArray();

    var plaintext = payload.ToArray();
    var transformed = plaintext;
    byte flags = 0;
    byte[] magic;

    if (_useS2 || _useAead)
    {
        // FSV2 path: S2 compression and/or AEAD encryption.
        magic = EnvelopeMagicV2;
        if (_useS2)
        {
            transformed = S2Codec.Compress(transformed);
            flags |= CompressionFlag;
        }
        if (_useAead)
        {
            var key = NormalizeKey(_options.EncryptionKey);
            transformed = AeadEncryptor.Encrypt(transformed, key, _options.Cipher);
            flags |= EncryptionFlag;
        }
    }
    else
    {
        // FSV1 legacy path: Deflate + XOR. Both flags stay clear for escaped raw payloads.
        magic = EnvelopeMagicV1;
        if (_options.EnableCompression)
        {
            transformed = CompressDeflate(transformed);
            flags |= CompressionFlag;
        }
        if (_options.EnableEncryption)
        {
            transformed = Xor(transformed, _options.EncryptionKey);
            flags |= EncryptionFlag;
        }
    }

    // Header layout: [0..4) magic, [4] flags, [5..9) key hash, [9..17) plaintext hash.
    var output = new byte[EnvelopeHeaderSize + transformed.Length];
    magic.AsSpan().CopyTo(output.AsSpan(0, magic.Length));
    output[magic.Length] = flags;
    BinaryPrimitives.WriteUInt32LittleEndian(output.AsSpan(5, 4), ComputeKeyHash(_options.EncryptionKey));
    BinaryPrimitives.WriteUInt64LittleEndian(output.AsSpan(9, 8), ComputePayloadHash(plaintext));
    transformed.CopyTo(output.AsSpan(EnvelopeHeaderSize));
    return output;
}
/// <summary>
/// Reverses <c>TransformForPersist</c>.
/// <list type="bullet">
/// <item>FSV2 envelopes: AEAD decrypt, then S2 decompress.</item>
/// <item>FSV1 envelopes: key-hash check, XOR, then Deflate decompress.</item>
/// <item>After either path, the plaintext hash is verified when payload integrity checks are
/// enabled.</item>
/// <item>Bytes without a recognised envelope are treated as pre-envelope legacy data, and the
/// FSV1 transforms implied by the current options are undone.</item>
/// </list>
/// </summary>
/// <exception cref="InvalidDataException">
/// Thrown on AEAD failure, FSV1 key mismatch, or failed integrity check.
/// </exception>
private byte[] RestorePayload(ReadOnlySpan<byte> persisted)
{
    if (!TryReadEnvelope(persisted, out var version, out var flags, out var keyHash, out var payloadHash, out var body))
    {
        // Legacy format fallback for pre-envelope data (no header at all).
        var legacyBytes = persisted.ToArray();
        if (_options.EnableEncryption)
            legacyBytes = Xor(legacyBytes, _options.EncryptionKey);
        if (_options.EnableCompression)
            legacyBytes = DecompressDeflate(legacyBytes);
        return legacyBytes;
    }

    var isEncrypted = (flags & EncryptionFlag) != 0;
    var isCompressed = (flags & CompressionFlag) != 0;
    var plain = body.ToArray();

    if (version == 2)
    {
        if (isEncrypted)
        {
            var aeadKey = NormalizeKey(_options.EncryptionKey);
            try
            {
                plain = AeadEncryptor.Decrypt(plain, aeadKey, _options.Cipher);
            }
            catch (CryptographicException ex)
            {
                // Surface tag-verification failures the same way as an FSV1 key mismatch, so
                // RecoverBlocks treats them as fatal.
                throw new InvalidDataException("AEAD decryption failed: wrong key or corrupted block.", ex);
            }
        }

        if (isCompressed)
            plain = S2Codec.Decompress(plain);
    }
    else
    {
        if (isEncrypted)
        {
            if (ComputeKeyHash(_options.EncryptionKey) != keyHash)
                throw new InvalidDataException("Encryption key mismatch for persisted payload.");
            plain = Xor(plain, _options.EncryptionKey);
        }

        if (isCompressed)
            plain = DecompressDeflate(plain);
    }

    if (_options.EnablePayloadIntegrityChecks && ComputePayloadHash(plain) != payloadHash)
        throw new InvalidDataException("Persisted payload integrity check failed.");

    return plain;
}
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
/// <summary>
/// Produces an AEAD key of exactly <c>AeadEncryptor.KeySize</c> bytes.
/// The supplied key is copied in and truncated if too long. Any unfilled tail stays zero,
/// and a null or empty key yields an all-zero key. Only the FSV2 AEAD path uses this; FSV1
/// XOR accepts arbitrary key lengths.
/// </summary>
private static byte[] NormalizeKey(byte[]? key)
{
    var result = new byte[AeadEncryptor.KeySize];
    if (key is null || key.Length == 0)
        return result;

    var take = key.Length < result.Length ? key.Length : result.Length;
    Array.Copy(key, result, take);
    return result;
}
/// <summary>
/// Returns a copy of <paramref name="data"/> with each byte XOR-ed against the repeating
/// <paramref name="key"/>. A null or empty key returns an unmodified copy. The operation is
/// its own inverse.
/// </summary>
private static byte[] Xor(ReadOnlySpan<byte> data, byte[]? key)
{
    var result = data.ToArray();
    if (key is not { Length: > 0 })
        return result;

    var keyLength = key.Length;
    for (var index = 0; index < result.Length; index++)
        result[index] ^= key[index % keyLength];

    return result;
}
/// <summary>
/// Deflate-compresses <paramref name="data"/> using <c>CompressionLevel.Fastest</c> and returns
/// the raw deflate stream bytes.
/// </summary>
private static byte[] CompressDeflate(ReadOnlySpan<byte> data)
{
    using var sink = new MemoryStream();
    var deflater = new System.IO.Compression.DeflateStream(sink, System.IO.Compression.CompressionLevel.Fastest, leaveOpen: true);
    try
    {
        deflater.Write(data);
    }
    finally
    {
        // Disposing writes the final deflate block into sink, so it must precede ToArray.
        deflater.Dispose();
    }

    return sink.ToArray();
}
/// <summary>
/// Inflates a raw deflate stream (as produced by <c>CompressDeflate</c>) back into its
/// original bytes.
/// </summary>
private static byte[] DecompressDeflate(ReadOnlySpan<byte> data)
{
    using var compressed = new MemoryStream(data.ToArray(), writable: false);
    using var inflater = new System.IO.Compression.DeflateStream(compressed, System.IO.Compression.CompressionMode.Decompress);
    using var inflated = new MemoryStream();
    inflater.CopyTo(inflated);
    return inflated.ToArray();
}
/// <summary>
/// Parses the 17-byte envelope header, if present:
/// [4 magic][1 flags][4 key hash LE][8 payload hash LE].
/// Returns <c>true</c> with <paramref name="version"/> = 1 (FSV1) or 2 (FSV2) and the
/// remaining bytes in <paramref name="payload"/>. Returns <c>false</c>, with all outputs
/// zero/empty, when the input is shorter than the header or the magic is unknown.
/// </summary>
private static bool TryReadEnvelope(
    ReadOnlySpan<byte> persisted,
    out int version,
    out byte flags,
    out uint keyHash,
    out ulong payloadHash,
    out ReadOnlySpan<byte> payload)
{
    flags = 0;
    keyHash = 0;
    payloadHash = 0;
    payload = ReadOnlySpan<byte>.Empty;

    version = persisted.Length < EnvelopeHeaderSize
        ? 0
        : DetectEnvelopeVersion(persisted[..EnvelopeMagicV1.Length]);
    if (version == 0)
        return false;

    flags = persisted[EnvelopeMagicV1.Length];
    keyHash = BinaryPrimitives.ReadUInt32LittleEndian(persisted.Slice(5, 4));
    payloadHash = BinaryPrimitives.ReadUInt64LittleEndian(persisted.Slice(9, 8));
    payload = persisted[EnvelopeHeaderSize..];
    return true;

    // Maps a 4-byte magic prefix to its envelope version (0 = not an envelope).
    static int DetectEnvelopeVersion(ReadOnlySpan<byte> magic)
    {
        if (magic.SequenceEqual(EnvelopeMagicV1))
            return 1;
        if (magic.SequenceEqual(EnvelopeMagicV2))
            return 2;
        return 0;
    }
}
/// <summary>
/// Fingerprints an encryption key as the first 4 bytes (little-endian) of its SHA-256 digest.
/// A null or empty key yields 0.
/// </summary>
private static uint ComputeKeyHash(byte[]? key) =>
    key is { Length: > 0 }
        ? BinaryPrimitives.ReadUInt32LittleEndian(SHA256.HashData(key))
        : 0u;
/// <summary>
/// Fingerprints a payload as the first 8 bytes (little-endian) of its SHA-256 digest.
/// The digest is computed into a stack buffer to avoid a heap allocation per call.
/// </summary>
private static ulong ComputePayloadHash(ReadOnlySpan<byte> payload)
{
    Span<byte> digest = stackalloc byte[32]; // SHA-256 digest length
    var written = SHA256.HashData(payload, digest);
    return BinaryPrimitives.ReadUInt64LittleEndian(digest[..written]);
}
// Envelope flag bits, stored in the byte immediately after the 4-byte magic.
private const byte CompressionFlag = 1 << 0;
private const byte EncryptionFlag = 1 << 1;
// FSV1 magic: legacy envelope whose body is Deflate-compressed and/or XOR-encrypted.
private static readonly byte[] EnvelopeMagicV1 = "FSV1"u8.ToArray();
// FSV2 magic: envelope whose body is S2-compressed and/or AEAD-encrypted.
// NOTE(review): the original comment cited filestore.go ~line 830 with magic "4FSV2"; the
// bytes written here are the 4-byte "FSV2" — confirm the parity claim.
private static readonly byte[] EnvelopeMagicV2 = "FSV2"u8.ToArray();
// Header size = magic (4) + flags (1) + key hash (4) + payload hash (8).
private const int EnvelopeHeaderSize = 4 + 1 + 4 + 8;
// -------------------------------------------------------------------------
// Go-parity sync methods not yet in the interface default implementations
// Reference: golang/nats-server/server/filestore.go
// -------------------------------------------------------------------------
/// <summary>
/// Soft-deletes the message at <paramref name="seq"/>. Returns <c>true</c> if it was tracked
/// and has now been removed; returns <c>false</c> (and does nothing) otherwise.
/// The last-sequence high-water mark is preserved.
/// Reference: golang/nats-server/server/filestore.go — RemoveMsg.
/// </summary>
public bool RemoveMsg(ulong seq)
{
    var removed = RemoveTrackedMessage(seq, preserveHighWaterMark: true);
    if (removed)
    {
        _generation++;

        // Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is never
        // decremented on removal; only FirstSeq advances when the first live message goes.
        DeleteInBlock(seq);
    }

    return removed;
}
/// <summary>
/// Like <see cref="RemoveMsg"/>, but the on-disk payload bytes are overwritten with random
/// data before the message is marked deleted. Returns <c>true</c> if the sequence was
/// tracked and has been erased.
/// Reference: golang/nats-server/server/filestore.go:5890 (eraseMsg).
/// </summary>
public bool EraseMsg(ulong seq)
{
    var wasTracked = RemoveTrackedMessage(seq, preserveHighWaterMark: true);
    if (wasTracked)
    {
        _generation++;
        DeleteInBlock(seq, secureErase: true);
    }

    return wasTracked;
}
/// <summary>
/// Reserves a sequence without storing a message. The sequence is <paramref name="seq"/>, or
/// <c>_last + 1</c> when <paramref name="seq"/> is 0. A skip tombstone is written to the
/// active block so recovery can reconstruct the gap. Returns the skipped sequence.
/// <c>_last</c> is a high-water mark: it advances to the skipped sequence when that is higher,
/// and never moves backwards.
/// Reference: golang/nats-server/server/filestore.go — SkipMsg.
/// </summary>
public ulong SkipMsg(ulong seq)
{
    // When seq is 0, auto-assign next sequence.
    var skipSeq = seq == 0 ? _last + 1 : seq;

    // Previously _last was assigned unconditionally, so SkipMsg(n) with n <= _last rewound the
    // high-water mark and the next StoreMsg could reuse an existing sequence number.
    var advancesTail = skipSeq > _last;
    if (advancesTail)
        _last = skipSeq;

    // Do NOT add to _meta — it is a skip (tombstone). The record still goes to the block so
    // recovery can reconstruct the sequence gap.
    EnsureActiveBlock();
    try
    {
        _activeBlock!.WriteSkip(skipSeq);
    }
    catch (InvalidOperationException)
    {
        // The active block rejected the write; start a new block and retry once.
        RotateBlock();
        _activeBlock!.WriteSkip(skipSeq);
    }

    if (_activeBlock!.IsSealed)
        RotateBlock();

    // With no live messages, the next real message would be skipSeq + 1. This only holds
    // when the skip actually extended the tail; otherwise _first would move behind _last.
    if (advancesTail && _meta.Count == 0)
        _first = skipSeq + 1;

    return skipSeq;
}
/// <summary>
/// Reserves <paramref name="num"/> consecutive sequences, starting at <paramref name="seq"/>
/// (or at <c>_last + 1</c> when <paramref name="seq"/> is 0), by calling
/// <see cref="SkipMsg"/> for each one.
/// Go parity: a non-zero <paramref name="seq"/> must equal <c>_last + 1</c>. Otherwise an
/// <see cref="InvalidOperationException"/> is thrown (Go: ErrSequenceMismatch).
/// Reference: golang/nats-server/server/filestore.go — SkipMsgs.
/// </summary>
public void SkipMsgs(ulong seq, ulong num)
{
    var expectedNext = _last + 1;

    if (seq == 0)
        seq = expectedNext;
    else if (seq != expectedNext)
        throw new InvalidOperationException($"Sequence mismatch: expected {expectedNext}, got {seq}.");

    for (ulong offset = 0; offset < num; offset++)
        SkipMsg(seq + offset);
}
/// <summary>
/// Loads the message at exactly <paramref name="seq"/> into <paramref name="sm"/>, reusing it
/// when provided and allocating a new one otherwise.
/// <list type="bullet">
/// <item>Fast path: <c>MaterializeMessage</c> rebuilds the message from its block via <c>_meta</c>.</item>
/// <item>Fallback: binary-search <c>_blocks</c> by sequence range and read the record directly.
/// This covers sequences that <c>_meta</c> does not contain.</item>
/// </list>
/// Both paths report <see cref="StoreMsg.Timestamp"/> as the nanosecond timestamp stored
/// with the record.
/// Reference: golang/nats-server/server/filestore.go:8308 (LoadMsg).
/// </summary>
/// <exception cref="KeyNotFoundException">No message exists at <paramref name="seq"/>.</exception>
public StoreMsg LoadMsg(ulong seq, StoreMsg? sm)
{
    var stored = MaterializeMessage(seq);
    if (stored is null)
    {
        // Fallback: binary-search blocks by sequence range (O(log n)).
        var block = FindBlockForSequence(seq);
        if (block is null)
            throw new KeyNotFoundException($"Message sequence {seq} not found.");
        var record = block.Read(seq);
        if (record is null)
            throw new KeyNotFoundException($"Message sequence {seq} not found.");

        sm ??= new StoreMsg();
        sm.Clear();
        sm.Subject = record.Subject;
        sm.Header = record.Headers.Length > 0 ? record.Headers.ToArray() : null;
        var originalPayload = RestorePayload(record.Payload.Span);
        sm.Data = originalPayload.Length > 0 ? originalPayload : null;
        sm.Sequence = record.Sequence;
        sm.Timestamp = record.Timestamp;
        return sm;
    }

    sm ??= new StoreMsg();
    sm.Clear();
    sm.Subject = stored.Subject;
    sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null;
    sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
    sm.Sequence = stored.Sequence;

    // Use the exact nanosecond timestamp kept in _meta, which is the same value the fallback
    // path reads from the block. Converting stored.TimestampUtc truncated it to milliseconds,
    // so the two paths disagreed for the same message; that conversion is also Kind-sensitive.
    sm.Timestamp = _meta.TryGetValue(seq, out var meta)
        ? meta.TimestampNs
        : new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
    return sm;
}
/// <summary>
/// Binary-searches <c>_blocks</c> for the block whose
/// [<see cref="MsgBlock.FirstSequence"/>, <see cref="MsgBlock.LastSequence"/>] range contains
/// <paramref name="seq"/>. The search finds the rightmost block with FirstSequence ≤ seq,
/// assuming blocks are ordered by FirstSequence, and then verifies containment.
/// Returns <c>null</c> when no block covers the sequence.
/// Reference: golang/nats-server/server/filestore.go:8308 (block seek in LoadMsg).
/// </summary>
private MsgBlock? FindBlockForSequence(ulong seq)
{
    var count = _blocks.Count;
    if (count == 0)
        return null;

    var left = 0;
    var right = count - 1;
    while (left < right)
    {
        // Round the midpoint up so that "left = probe" always makes progress.
        var probe = left + (right - left + 1) / 2;
        if (seq >= _blocks[probe].FirstSequence)
            left = probe;
        else
            right = probe - 1;
    }

    var block = _blocks[left];
    return seq >= block.FirstSequence && seq <= block.LastSequence ? block : null;
}
/// <summary>
/// Loads the highest-sequence message whose subject matches <paramref name="subject"/> into
/// <paramref name="sm"/>, reusing it when provided and allocating a new one otherwise.
/// <paramref name="subject"/> may contain wildcards; a null or empty value matches any
/// subject. <see cref="StoreMsg.Timestamp"/> is the nanosecond timestamp stored with the record.
/// Reference: golang/nats-server/server/filestore.go — LoadLastMsg.
/// </summary>
/// <exception cref="KeyNotFoundException">No matching message exists.</exception>
public StoreMsg LoadLastMsg(string subject, StoreMsg? sm)
{
    ulong? bestSeq = null;
    foreach (var kv in _meta)
    {
        if (!string.IsNullOrEmpty(subject) && !SubjectMatchesFilter(kv.Value.Subject, subject))
            continue;
        if (bestSeq is null || kv.Key > bestSeq.Value)
            bestSeq = kv.Key;
    }

    if (bestSeq is null)
        throw new KeyNotFoundException($"No message found for subject '{subject}'.");

    var match = MaterializeMessage(bestSeq.Value)
        ?? throw new KeyNotFoundException($"No message found for subject '{subject}'.");

    sm ??= new StoreMsg();
    sm.Clear();
    sm.Subject = match.Subject;
    sm.Header = match.RawHeaders.Length > 0 ? match.RawHeaders.ToArray() : null;
    sm.Data = match.Payload.Length > 0 ? match.Payload.ToArray() : null;
    sm.Sequence = match.Sequence;

    // Prefer the exact nanosecond timestamp from _meta over a millisecond-truncated,
    // Kind-sensitive conversion of match.TimestampUtc (keeps parity with LoadMsg's block path).
    sm.Timestamp = _meta.TryGetValue(bestSeq.Value, out var meta)
        ? meta.TimestampNs
        : new DateTimeOffset(match.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
    return sm;
}
/// <summary>
/// Loads the lowest-sequence message at or after <paramref name="start"/> whose subject
/// matches <paramref name="filter"/>. A null or empty filter matches any subject.
/// Returns the message together with the sequence distance from <paramref name="start"/>
/// (0 when the message is at <paramref name="start"/>). <paramref name="wc"/> is accepted for
/// API parity and is not used here. <see cref="StoreMsg.Timestamp"/> is the nanosecond
/// timestamp stored with the record.
/// Reference: golang/nats-server/server/filestore.go — LoadNextMsg.
/// </summary>
/// <exception cref="KeyNotFoundException">No matching message exists at or after <paramref name="start"/>.</exception>
public (StoreMsg Msg, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? sm)
{
    ulong? bestSeq = null;
    foreach (var kv in _meta)
    {
        if (kv.Key < start) continue;
        if (!string.IsNullOrEmpty(filter) && !SubjectMatchesFilter(kv.Value.Subject, filter))
            continue;
        if (bestSeq is null || kv.Key < bestSeq.Value)
            bestSeq = kv.Key;
    }

    if (bestSeq is null)
        throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'.");

    var found = MaterializeMessage(bestSeq.Value)
        ?? throw new KeyNotFoundException($"No message found at or after seq {start} matching filter '{filter}'.");

    var skip = bestSeq.Value > start ? bestSeq.Value - start : 0UL;

    sm ??= new StoreMsg();
    sm.Clear();
    sm.Subject = found.Subject;
    sm.Header = found.RawHeaders.Length > 0 ? found.RawHeaders.ToArray() : null;
    sm.Data = found.Payload.Length > 0 ? found.Payload.ToArray() : null;
    sm.Sequence = found.Sequence;

    // Prefer the exact nanosecond timestamp from _meta over a millisecond-truncated,
    // Kind-sensitive conversion of found.TimestampUtc (keeps parity with LoadMsg's block path).
    sm.Timestamp = _meta.TryGetValue(bestSeq.Value, out var meta)
        ? meta.TimestampNs
        : new DateTimeOffset(found.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
    return (sm, skip);
}
/// <summary>
/// Returns the highest live sequence for each distinct subject (ordinal comparison),
/// sorted ascending.
/// Reference: golang/nats-server/server/filestore.go — AllLastSeqs.
/// </summary>
public ulong[] AllLastSeqs()
{
    return _meta
        .GroupBy(entry => entry.Value.Subject, StringComparer.Ordinal)
        .Select(group => group.Max(entry => entry.Key))
        .OrderBy(seq => seq)
        .ToArray();
}
/// <summary>
/// Returns the newest sequence of each subject that matches any of
/// <paramref name="filters"/> (an empty filter array matches every subject), considering
/// only sequences at or below <paramref name="maxSeq"/> (0 means no upper bound).
/// The result is sorted ascending.
/// Reference: golang/nats-server/server/filestore.go — MultiLastSeqs.
/// </summary>
/// <exception cref="InvalidOperationException">
/// <paramref name="maxAllowed"/> is positive and more subjects matched than it permits
/// (Go parity: ErrTooManyResults).
/// </exception>
public ulong[] MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed)
{
    var newestBySubject = new Dictionary<string, ulong>(StringComparer.Ordinal);

    foreach (var (seq, meta) in _meta)
    {
        var withinBound = maxSeq == 0 || seq <= maxSeq;
        if (!withinBound)
            continue;

        var subject = meta.Subject;
        if (filters.Length != 0 && !filters.Any(f => SubjectMatchesFilter(subject, f)))
            continue;

        if (newestBySubject.TryGetValue(subject, out var prior) && seq <= prior)
            continue;

        newestBySubject[subject] = seq;
    }

    var ordered = newestBySubject.Values.ToArray();
    Array.Sort(ordered);

    if (maxAllowed > 0 && ordered.Length > maxAllowed)
        throw new InvalidOperationException($"Too many results: got {ordered.Length}, max allowed is {maxAllowed}.");

    return ordered;
}
/// <summary>
/// Looks up the subject recorded for sequence <paramref name="seq"/> in the in-memory
/// metadata index; no block I/O is performed.
/// Reference: golang/nats-server/server/filestore.go — SubjectForSeq.
/// </summary>
/// <exception cref="KeyNotFoundException">The sequence is not present in the index.</exception>
public string SubjectForSeq(ulong seq) =>
    _meta.TryGetValue(seq, out var meta)
        ? meta.Subject
        : throw new KeyNotFoundException($"Message sequence {seq} not found.");
/// <summary>
/// Counts messages at or after <paramref name="sseq"/> whose subject matches
/// <paramref name="filter"/> (null or empty matches all). When
/// <paramref name="lastPerSubject"/> is true, each matching subject contributes at most
/// one message, so the total equals the number of distinct matching subjects.
/// Reference: golang/nats-server/server/filestore.go — NumPending.
/// </summary>
/// <returns>
/// <c>Total</c>: the pending count. <c>ValidThrough</c>: currently always the store's
/// last sequence (<c>_last</c>).
/// </returns>
public (ulong Total, ulong ValidThrough) NumPending(ulong sseq, string filter, bool lastPerSubject)
{
    ulong total = 0;
    HashSet<string>? distinctSubjects = lastPerSubject
        ? new HashSet<string>(StringComparer.Ordinal)
        : null;

    foreach (var (seq, meta) in _meta)
    {
        if (seq < sseq)
            continue;
        if (!string.IsNullOrEmpty(filter) && !SubjectMatchesFilter(meta.Subject, filter))
            continue;

        if (distinctSubjects is null)
            total++;
        else
            distinctSubjects.Add(meta.Subject);
    }

    if (distinctSubjects is not null)
        total = (ulong)distinctSubjects.Count;

    return (total, _last);
}
// -------------------------------------------------------------------------
// Go-parity IStreamStore methods: StoreRawMsg, LoadPrevMsg, Type, Stop
// Reference: golang/nats-server/server/filestore.go
// -------------------------------------------------------------------------
/// <summary>
/// Stores a message at a caller-specified sequence number and timestamp.
/// Used for replication and mirroring — the caller (NRG, mirror source) controls
/// the sequence/timestamp rather than the store auto-incrementing them.
/// <para>Unlike <see cref="StoreMsg"/>, this does NOT call <c>ExpireFromWheel</c>
/// or auto-increment <c>_last</c>. It updates <c>_last</c> via
/// <c>Math.Max(_last, seq)</c> so the watermark reflects the highest stored
/// sequence.</para>
/// Reference: golang/nats-server/server/filestore.go:6756 (storeRawMsg).
/// </summary>
/// <param name="subject">Subject recorded for the message.</param>
/// <param name="hdr">Raw header bytes; <c>null</c> or empty stores no headers.</param>
/// <param name="msg">Payload bytes; <c>null</c> is treated as empty. The bytes written to
/// the block are the output of <c>TransformForPersist</c>.</param>
/// <param name="seq">Sequence number to store the message at.</param>
/// <param name="ts">Message timestamp, passed unchanged to TTL registration, the block
/// writer and the index (presumably Unix nanoseconds — confirm against callers).</param>
/// <param name="ttl">Per-message TTL. When not positive, the stream MaxAge (ms, converted
/// to ns) is used instead, or 0 when no MaxAge is configured. NOTE(review): the value is
/// combined as nanoseconds here; confirm the unit callers pass, since the Go reference may
/// use a different unit for this argument.</param>
/// <param name="discardNewCheck">Accepted for signature parity; not read by this method.</param>
/// <exception cref="ObjectDisposedException">The store has been stopped.</exception>
public void StoreRawMsg(string subject, byte[]? hdr, byte[] msg, ulong seq, long ts, long ttl, bool discardNewCheck)
{
if (_stopped)
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
var headers = hdr is { Length: > 0 } ? hdr : [];
var payload = msg ?? [];
var persistedPayload = TransformForPersist(payload);
// Register TTL using the caller-supplied timestamp and TTL.
// NOTE(review): registration happens before the write; if both WriteAt attempts below
// throw, the TTL stays registered for a sequence that was never stored.
var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L);
RegisterTtl(seq, ts, effectiveTtlNs);
EnsureActiveBlock();
try
{
_activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts);
}
catch (InvalidOperationException)
{
// The active block refused the write (presumably full/sealed): rotate to a fresh
// block and retry exactly once. A second failure propagates to the caller.
RotateBlock();
_activeBlock!.WriteAt(seq, subject, headers, persistedPayload, ts);
}
// The index records the untransformed payload length, while the write cache below is
// charged with the persisted (transformed) length.
TrackMessage(seq, subject, headers.Length, payload.Length, ts, _activeBlock!.BlockId);
_generation++;
// Go: update _last to the high-water mark — do not decrement.
_last = Math.Max(_last, seq);
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, headers.Length + persistedPayload.Length);
// Rotate eagerly once the block reports sealed so the next write starts on a new block.
if (_activeBlock!.IsSealed)
RotateBlock();
}
/// <summary>
/// Loads the newest message whose sequence is strictly less than <paramref name="start"/>.
/// The walk runs backward from <c>start - 1</c>, clamped to the last assigned sequence
/// (<c>_last</c>), down to the lowest indexed sequence. The first hit is copied into
/// <paramref name="sm"/>, which is allocated when null. Indexed sequences that cannot be
/// materialized from their block are skipped.
/// Reference: golang/nats-server/server/filestore.go — LoadPrevMsg (which also clamps its
/// starting point to the stream's last sequence).
/// </summary>
/// <exception cref="KeyNotFoundException">
/// <paramref name="start"/> is 0, or no message exists before <paramref name="start"/>.
/// </exception>
public StoreMsg LoadPrevMsg(ulong start, StoreMsg? sm)
{
    if (start == 0)
        throw new KeyNotFoundException("No message found before seq 0.");

    var first = _meta.Count > 0 ? _meta.Keys.Min() : 1UL;

    // Clamp to the high-water mark. Without this, a start beyond _last made the old loop
    // condition (seq <= _last) false on entry, so the method reported "not found" even
    // though earlier messages existed.
    var seq = Math.Min(start - 1, _last);
    while (seq >= first)
    {
        if (_meta.ContainsKey(seq))
        {
            var stored = MaterializeMessage(seq);
            if (stored is not null)
            {
                sm ??= new StoreMsg();
                sm.Clear();
                sm.Subject = stored.Subject;
                sm.Header = stored.RawHeaders.Length > 0 ? stored.RawHeaders.ToArray() : null;
                sm.Data = stored.Payload.Length > 0 ? stored.Payload.ToArray() : null;
                sm.Sequence = stored.Sequence;
                sm.Timestamp = new DateTimeOffset(stored.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
                return sm;
            }
        }

        // ulong cannot go below zero: stop before decrementing past it.
        if (seq == 0)
            break;
        seq--;
    }

    throw new KeyNotFoundException($"No message found before seq {start}.");
}
/// <summary>
/// Reports which storage backend this store is.
/// Reference: golang/nats-server/server/filestore.go — fileStore.Type.
/// </summary>
/// <returns>Always <see cref="StorageType.File"/>.</returns>
public StorageType Type()
{
    return StorageType.File;
}
/// <summary>
/// Flushes the active block to disk and marks the store as stopped.
/// After <c>Stop()</c> returns, calls to <see cref="StoreMsg"/> or
/// <see cref="AppendAsync"/> will throw <see cref="ObjectDisposedException"/>.
/// Blocks are NOT deleted — use <see cref="Delete"/> if data removal is needed.
/// Repeated calls return immediately once <c>_stopped</c> is set.
/// Reference: golang/nats-server/server/filestore.go — fileStore.Stop.
/// </summary>
/// <remarks>
/// NOTE(review): no lock is taken here, so the <c>_stopped</c> check-and-set is not atomic;
/// presumably callers serialize Stop with writers — confirm. The write-cache manager is not
/// touched by this method.
/// </remarks>
public void Stop()
{
if (_stopped)
return;
// Set first so that writers checking _stopped are rejected while shutdown proceeds.
_stopped = true;
// Stop the background flush loop before accessing blocks.
StopFlushLoop();
// Flush pending buffered writes and the active block to ensure all data reaches disk.
_activeBlock?.FlushPending();
_activeBlock?.Flush();
// Dispose all blocks to release OS file handles. The files remain on disk.
DisposeAllBlocks();
}
/// <summary>
/// Produces the binary stream-state snapshot used for cluster (RAFT) consensus.
/// The encoder is not implemented yet (Task 9), so this always yields an empty array.
/// Reference: golang/nats-server/server/filestore.go — EncodedStreamState.
/// </summary>
/// <param name="failed">Count of failed apply operations, to be embedded once the codec
/// exists; currently unused.</param>
/// <returns>An empty byte array.</returns>
public byte[] EncodedStreamState(ulong failed)
{
    return Array.Empty<byte>();
}
/// <summary>
/// Applies an updated stream configuration to the store options and immediately trims
/// existing messages that exceed the per-subject limit.
/// <para><c>MaxMsgsPer</c> is always copied, so it can be raised, lowered or cleared.
/// <c>MaxAgeMs</c> is only copied when positive, so passing 0 leaves any previously
/// configured age limit in place.</para>
/// Reference: golang/nats-server/server/filestore.go — UpdateConfig.
/// </summary>
/// <param name="cfg">The new stream configuration.</param>
/// <exception cref="ArgumentNullException"><paramref name="cfg"/> is <c>null</c>.</exception>
public void UpdateConfig(StreamConfig cfg)
{
    ArgumentNullException.ThrowIfNull(cfg);

    _options.MaxMsgsPerSubject = cfg.MaxMsgsPer;
    // NOTE(review): a MaxAgeMs of 0 does not clear an existing age limit through this path;
    // confirm that is intended.
    if (cfg.MaxAgeMs > 0)
        _options.MaxAgeMs = cfg.MaxAgeMs;

    var limit = _options.MaxMsgsPerSubject;
    if (limit <= 0)
        return;

    // Count messages per subject in one pass, then trim only subjects over the limit.
    // Calling EnforceMaxMsgsPerSubject for every subject rescans the whole index once per
    // distinct subject (O(subjects x messages)).
    var perSubject = new Dictionary<string, int>(StringComparer.Ordinal);
    foreach (var meta in _meta.Values)
        perSubject[meta.Subject] = perSubject.TryGetValue(meta.Subject, out var n) ? n + 1 : 1;

    foreach (var (subject, count) in perSubject)
    {
        if (count > limit)
            EnforceMaxMsgsPerSubject(subject);
    }

    // EnforceMaxMsgsPerSubject refreshes _firstSeq after each call. Do the same once here
    // so the watermark is also refreshed when no subject needed trimming.
    if (_meta.Count > 0)
        _firstSeq = _meta.Keys.Min();
}
/// <summary>
/// Removes the oldest (lowest-sequence) messages for <paramref name="subject"/> until its
/// count is within <see cref="FileStoreOptions.MaxMsgsPerSubject"/>. Each removal adjusts
/// the byte/message counters, drops the index entry, marks the record deleted in its
/// block and bumps <c>_generation</c>. Afterwards <c>_firstSeq</c> is recomputed whenever
/// the index is non-empty. Does nothing when the limit is not positive.
/// Reference: golang/nats-server/server/filestore.go — enforcePerSubjectLimit.
/// </summary>
/// <param name="subject">Exact subject (ordinal comparison) to trim.</param>
private void EnforceMaxMsgsPerSubject(string subject)
{
    var limit = _options.MaxMsgsPerSubject;
    if (limit <= 0)
        return;

    var subjectMsgs = _meta
        .Where(kv => string.Equals(kv.Value.Subject, subject, StringComparison.Ordinal))
        .OrderBy(kv => kv.Key)
        .ToList();

    // The list is sorted ascending, so the first `excess` entries are the oldest.
    // Walking that prefix once avoids the O(k^2) cost of repeated List.RemoveAt(0).
    var excess = subjectMsgs.Count - limit;
    for (var i = 0; i < excess; i++)
    {
        var (seq, meta) = subjectMsgs[i];
        _totalBytes -= (ulong)(meta.HeaderLength + meta.PayloadLength);
        _messageCount--;
        _meta.Remove(seq);
        DeleteInBlock(seq);
        _generation++;
    }

    if (_meta.Count > 0)
        _firstSeq = _meta.Keys.Min();
}
/// <summary>
/// Hook for discarding cached state after a leadership change or snapshot restore.
/// It performs no work in this implementation; state is rebuilt from the on-disk blocks
/// when the store is constructed.
/// Reference: golang/nats-server/server/filestore.go — ResetState.
/// </summary>
public void ResetState()
{
    // Intentionally empty.
}
// -------------------------------------------------------------------------
// ConsumerStore factory
// Reference: golang/nats-server/server/filestore.go — fileStore.ConsumerStore
// -------------------------------------------------------------------------
/// <summary>
/// Creates or opens a per-consumer state store backed by a binary file.
/// The state file is located at <c>{Directory}/obs/{name}/o.dat</c>,
/// matching the Go server's consumer directory layout. The consumer directory is created
/// if it does not exist.
/// Reference: golang/nats-server/server/filestore.go — newConsumerFileStore.
/// </summary>
/// <param name="name">Consumer name; used verbatim as a single directory component.</param>
/// <param name="created">Creation time; accepted for signature parity and not used here.</param>
/// <param name="cfg">Consumer configuration passed to the state store.</param>
/// <exception cref="ArgumentException">
/// <paramref name="name"/> is null/empty, is "." or "..", or contains a path separator or
/// other character invalid in a file name.
/// </exception>
public IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg)
{
    ArgumentException.ThrowIfNullOrEmpty(name);

    // The name becomes a directory component under obs/. A rooted name would make
    // Path.Combine discard the store directory, and separators or "."/".." would point
    // outside obs/. (Go likewise rejects consumer names containing path separators.)
    if (name is "." or ".." || name.IndexOfAny(Path.GetInvalidFileNameChars()) >= 0)
        throw new ArgumentException($"Invalid consumer name '{name}'.", nameof(name));

    var consumerDir = Path.Combine(_options.Directory, "obs", name);
    Directory.CreateDirectory(consumerDir);
    var stateFile = Path.Combine(consumerDir, "o.dat");
    return new ConsumerFileStore(stateFile, cfg);
}
// -------------------------------------------------------------------------
// FlushAllPending: flush buffered writes and checkpoint stream state.
// Reference: golang/nats-server/server/filestore.go:5783-5842
// (flushPendingWritesUnlocked / writeFullState)
// -------------------------------------------------------------------------
/// <summary>
/// Flushes any buffered writes in the active block to disk and atomically
/// persists a lightweight stream state checkpoint (stream.state) so that a
/// subsequent recovery after a crash can quickly identify the last known
/// good sequence without re-scanning every block.
/// Reference: golang/nats-server/server/filestore.go:5783 (flushPendingWritesUnlocked).
/// </summary>
/// <remarks>
/// Only the active block is flushed here. This method does not check <c>_stopped</c>.
/// NOTE(review): <see cref="Stop"/> disposes all blocks; confirm calling this after Stop
/// is either prevented by callers or safe.
/// </remarks>
public async Task FlushAllPending()
{
// Move buffered records into the block, then flush the block itself.
_activeBlock?.FlushPending();
_activeBlock?.Flush();
await WriteStreamStateAsync();
}
/// <summary>
/// Background flush loop that coalesces buffered writes from MsgBlock into
/// batched disk writes. Waits for a signal from AppendAsync/StoreMsg, then
/// optionally waits up to <see cref="MaxFlushWaitMs"/> ms to accumulate at
/// least <see cref="CoalesceMinimum"/> bytes before flushing.
/// The loop ends when <paramref name="ct"/> is cancelled or the signal channel is completed.
/// If cancellation interrupts the coalescing wait, one final flush of the current block
/// still runs before the loop exits.
/// Reference: golang/nats-server/server/filestore.go:5841 (flushLoop).
/// </summary>
/// <param name="ct">Cancellation token that stops the loop.</param>
private async Task FlushLoopAsync(CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        try
        {
            // WaitToReadAsync returns false once the channel has been completed. Treat
            // that as shutdown; otherwise every later call returns false immediately and
            // the loop would spin forever.
            if (!await _flushSignal.Reader.WaitToReadAsync(ct))
                return;
        }
        catch (OperationCanceledException) { return; }

        _flushSignal.Reader.TryRead(out _);

        var block = _activeBlock;
        if (block is null)
            continue;

        // Go-style exponential backoff: 1→2→4→8ms (vs linear 1ms × 8).
        var waitMs = 1;
        var totalWaited = 0;
        while (block.PendingWriteSize < CoalesceMinimum && totalWaited < MaxFlushWaitMs)
        {
            var delay = Math.Min(waitMs, MaxFlushWaitMs - totalWaited);
            try { await Task.Delay(delay, ct); }
            catch (OperationCanceledException) { break; }
            totalWaited += delay;
            waitMs *= 2;
        }

        block.FlushPending();
    }
}
/// <summary>
/// Signals the background flush loop to stop and blocks until it has finished.
/// It must run before blocks are disposed, so the loop never touches a disposed block.
/// This is the synchronous counterpart used by <see cref="Stop"/>.
/// </summary>
private void StopFlushLoop()
{
    _flushCts.Cancel();

    var loop = _flushTask;
    if (loop is not null)
    {
        loop.GetAwaiter().GetResult();
    }
}
/// <summary>
/// Asynchronously signals the background flush loop to stop and awaits its completion.
/// Non-blocking counterpart of <see cref="StopFlushLoop"/>.
/// </summary>
private async Task StopFlushLoopAsync()
{
    await _flushCts.CancelAsync();

    if (_flushTask is { } loop)
        await loop;
}
/// <summary>
/// Atomically persists a compact stream state snapshot to disk using
/// <see cref="AtomicFileWriter"/> (write-to-temp-then-rename) so that a
/// partial write never leaves a corrupt state file. The <see cref="_stateWriteLock"/>
/// semaphore serialises concurrent flush calls so that only one write is
/// in-flight at a time. The snapshot is taken while holding that lock, so the most
/// recently written file always reflects the most recently captured state.
/// The file is written as JSON to <c>{Directory}/stream.state</c>.
/// Reference: golang/nats-server/server/filestore.go:5820 (writeFullState).
/// </summary>
private async Task WriteStreamStateAsync()
{
    var statePath = Path.Combine(_options.Directory, "stream.state");

    await _stateWriteLock.WaitAsync();
    try
    {
        // Capture inside the lock. When the snapshot was taken before waiting, two
        // concurrent flushes could acquire the lock in the opposite order to their
        // captures, letting an older state overwrite a newer one on disk.
        var snapshot = new StreamStateSnapshot
        {
            FirstSeq = _messageCount > 0 ? _firstSeq : 0UL,
            LastSeq = _last,
            Messages = _messageCount,
            Bytes = (ulong)_blocks.Sum(b => b.BytesUsed),
        };
        var json = JsonSerializer.SerializeToUtf8Bytes(snapshot);
        await AtomicFileWriter.WriteAtomicallyAsync(statePath, json);
    }
    finally
    {
        _stateWriteLock.Release();
    }
}
// -------------------------------------------------------------------------
// StreamStateSnapshot — private checkpoint record written by WriteStreamState.
// -------------------------------------------------------------------------
/// <summary>
/// Checkpoint record serialized to JSON by <c>WriteStreamStateAsync</c> into
/// <c>{Directory}/stream.state</c>.
/// </summary>
private sealed record StreamStateSnapshot
{
/// <summary>First sequence (<c>_firstSeq</c>), or 0 when the store holds no messages.</summary>
public ulong FirstSeq { get; init; }
/// <summary>Last assigned sequence (<c>_last</c>).</summary>
public ulong LastSeq { get; init; }
/// <summary>Live message count (<c>_messageCount</c>).</summary>
public ulong Messages { get; init; }
/// <summary>Sum of <c>BytesUsed</c> across all blocks.</summary>
public ulong Bytes { get; init; }
}
/// <summary>
/// Message record with headers and payload carried as base64 strings.
/// NOTE(review): not referenced in this part of the file — presumably a legacy
/// (JSON-style) persistence format kept for compatibility; confirm before removing.
/// </summary>
private sealed class FileRecord
{
public ulong Sequence { get; init; }
public string? Subject { get; init; }
// Base64-encoded header bytes.
public string? HeadersBase64 { get; init; }
// Base64-encoded payload bytes.
public string? PayloadBase64 { get; init; }
public DateTime TimestampUtc { get; init; }
}
// -------------------------------------------------------------------------
// WriteCacheManager — bounded, TTL-based write cache across all active blocks.
//
// Go: filestore.go:4443 (setupWriteCache) sets up per-block write caches.
// filestore.go:6148 (expireCache) / filestore.go:6220 (expireCacheLocked)
// expire individual block caches via per-block timers.
//
// .NET adaptation: a single background PeriodicTimer (500 ms tick) manages
// cache lifetime across all blocks. Entries are tracked by block ID with
// their last-write time and approximate byte size. When total size exceeds
// MaxCacheSize or an entry's age exceeds CacheExpiry, the block is flushed
// and its cache is cleared.
// -------------------------------------------------------------------------
/// <summary>
/// Manages a bounded, TTL-based write cache for all active <see cref="MsgBlock"/>
/// instances within a <see cref="FileStore"/>. A background <see cref="PeriodicTimer"/>
/// fires every 500 ms to evict stale or oversized cache entries.
/// Eviction of a block means: flush it, then clear its cache.
///
/// Reference: golang/nats-server/server/filestore.go:4443 (setupWriteCache),
/// golang/nats-server/server/filestore.go:6148 (expireCache).
/// </summary>
internal sealed class WriteCacheManager : IAsyncDisposable
{
    /// <summary>Tracks per-block cache state.</summary>
    internal sealed class CacheEntry
    {
        public int BlockId { get; init; }
        public long LastWriteTime { get; set; } // Environment.TickCount64 (ms)
        public long ApproximateBytes { get; set; }
    }

    private readonly ConcurrentDictionary<int, CacheEntry> _entries = new();
    private readonly Func<int, MsgBlock?> _blockLookup;
    private readonly long _maxCacheSizeBytes;
    private readonly long _cacheExpiryMs;
    private readonly PeriodicTimer _timer;
    private readonly Task _backgroundTask;
    private readonly CancellationTokenSource _cts = new();

    // 0 = live, 1 = disposed. Swapped atomically so DisposeAsync runs its teardown once.
    private int _disposed;

    /// <summary>Tick interval for the background eviction loop.</summary>
    internal static readonly TimeSpan TickInterval = TimeSpan.FromMilliseconds(500);

    /// <summary>
    /// Creates a new <see cref="WriteCacheManager"/> and starts its background eviction loop.
    /// </summary>
    /// <param name="maxCacheSizeBytes">Total cache size limit in bytes before forced eviction.</param>
    /// <param name="cacheExpiry">Idle TTL after which a block's cache is evicted.</param>
    /// <param name="blockLookup">
    /// Delegate to look up an active <see cref="MsgBlock"/> by block ID.
    /// Returns <c>null</c> if the block no longer exists (e.g., already disposed).
    /// </param>
    public WriteCacheManager(long maxCacheSizeBytes, TimeSpan cacheExpiry, Func<int, MsgBlock?> blockLookup)
    {
        _maxCacheSizeBytes = maxCacheSizeBytes;
        _cacheExpiryMs = (long)cacheExpiry.TotalMilliseconds;
        _blockLookup = blockLookup;
        _timer = new PeriodicTimer(TickInterval);
        _backgroundTask = RunAsync(_cts.Token);
    }

    /// <summary>Total approximate bytes currently tracked across all cached entries.</summary>
    public long TotalCachedBytes
    {
        get
        {
            long total = 0;
            foreach (var entry in _entries.Values)
                total += entry.ApproximateBytes;
            return total;
        }
    }

    /// <summary>Number of block IDs currently tracked in the cache.</summary>
    public int TrackedBlockCount => _entries.Count;

    /// <summary>
    /// Records a write to the specified block, updating the entry's timestamp and size.
    /// Reference: golang/nats-server/server/filestore.go:6529 (lwts update on write).
    /// </summary>
    public void TrackWrite(int blockId, long bytes)
    {
        var now = Environment.TickCount64;
        _entries.AddOrUpdate(
            blockId,
            _ => new CacheEntry { BlockId = blockId, LastWriteTime = now, ApproximateBytes = bytes },
            (_, existing) =>
            {
                existing.LastWriteTime = now;
                existing.ApproximateBytes += bytes;
                return existing;
            });
    }

    /// <summary>
    /// Test helper: records a write with an explicit timestamp (ms since boot).
    /// Allows tests to simulate past writes without sleeping, avoiding timing
    /// dependencies in TTL and size-cap eviction tests.
    /// </summary>
    internal void TrackWriteAt(int blockId, long bytes, long tickCount64Ms)
    {
        _entries.AddOrUpdate(
            blockId,
            _ => new CacheEntry { BlockId = blockId, LastWriteTime = tickCount64Ms, ApproximateBytes = bytes },
            (_, existing) =>
            {
                existing.LastWriteTime = tickCount64Ms;
                existing.ApproximateBytes += bytes;
                return existing;
            });
    }

    /// <summary>
    /// Immediately evicts the specified block's cache entry — flush then clear.
    /// Called from <see cref="FileStore.RotateBlock"/> for the outgoing block.
    /// No-op when the block is not tracked or the lookup no longer finds it.
    /// Reference: golang/nats-server/server/filestore.go:4499 (flushPendingMsgsLocked on rotation).
    /// </summary>
    public void EvictBlock(int blockId)
    {
        if (!_entries.TryRemove(blockId, out _))
            return;

        var block = _blockLookup(blockId);
        if (block is null)
            return;

        block.Flush();
        block.ClearCache();
    }

    /// <summary>
    /// Flushes and clears the cache for all currently tracked blocks.
    /// Reference: golang/nats-server/server/filestore.go:5499 (flushPendingMsgsLocked, all blocks).
    /// </summary>
    public async Task FlushAllAsync()
    {
        var blockIds = _entries.Keys.ToArray();
        foreach (var blockId in blockIds)
        {
            if (!_entries.TryRemove(blockId, out _))
                continue;

            var block = _blockLookup(blockId);
            if (block is null)
                continue;

            block.Flush();
            block.ClearCache();
        }

        await Task.CompletedTask; // Kept async for future I/O path upgrades.
    }

    /// <summary>
    /// Stops the background timer and flushes all pending cache entries.
    /// Safe to call more than once; only the first call performs the teardown.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // IAsyncDisposable callers may dispose repeatedly. Without this guard a second call
        // would invoke CancelAsync on an already-disposed CancellationTokenSource and throw.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
            return;

        await _cts.CancelAsync();
        _timer.Dispose();
        try
        {
            try { await _backgroundTask.ConfigureAwait(false); }
            catch (OperationCanceledException oce) when (oce.CancellationToken == _cts.Token)
            {
                // OperationCanceledException from our own CTS is the normal shutdown
                // signal — WaitForNextTickAsync throws when the token is cancelled.
                // Swallowing here is deliberate: the task completed cleanly.
                _ = oce.CancellationToken; // reference to satisfy SW003 non-empty requirement
            }
        }
        finally
        {
            // Flush remaining cached blocks and release the CTS even when the background
            // loop faulted (e.g. an eviction flush threw). Otherwise the final flush would
            // be skipped and the CTS leaked; the fault still propagates to the caller.
            try { await FlushAllAsync(); }
            finally { _cts.Dispose(); }
        }
    }

    // ------------------------------------------------------------------
    // Background eviction loop
    // ------------------------------------------------------------------

    private async Task RunAsync(CancellationToken ct)
    {
        try
        {
            while (await _timer.WaitForNextTickAsync(ct))
                RunEviction();
        }
        catch (OperationCanceledException oce)
        {
            // PeriodicTimer cancelled — normal background loop shutdown.
            _ = oce;
        }
    }

    /// <summary>
    /// One eviction pass: expire TTL-exceeded entries and enforce the size cap.
    /// Reference: golang/nats-server/server/filestore.go:6220 (expireCacheLocked).
    /// </summary>
    internal void RunEviction()
    {
        var now = Environment.TickCount64;

        // 1. Expire entries that have been idle longer than the TTL.
        //    ConcurrentDictionary tolerates removal during enumeration.
        foreach (var (blockId, entry) in _entries)
        {
            if (now - entry.LastWriteTime >= _cacheExpiryMs)
                EvictBlock(blockId);
        }

        // 2. If total size still exceeds cap, evict the oldest entries first.
        if (TotalCachedBytes > _maxCacheSizeBytes)
        {
            var ordered = _entries.Values
                .OrderBy(e => e.LastWriteTime)
                .ToArray();

            foreach (var entry in ordered)
            {
                if (TotalCachedBytes <= _maxCacheSizeBytes)
                    break;
                EvictBlock(entry.BlockId);
            }
        }
    }
}
}