feat(storage): rewrite FileStore to use block-based MsgBlock storage

Replace JSONL persistence with real MsgBlock-based block files (.blk).
FileStore now acts as a block manager that creates, seals, and rotates
MsgBlocks while maintaining an in-memory cache for fast reads/queries.

Key changes:
- AppendAsync writes transformed payloads to MsgBlock via WriteAt
- Block rotation occurs when active block reaches size limit
- Recovery scans .blk files and rebuilds in-memory state from records
- Legacy JSONL migration: existing messages.jsonl data is automatically
  converted to block files on first open, then JSONL is deleted
- PurgeAsync disposes and deletes all block files
- RewriteBlocks rebuilds blocks from cache (used by trim/restore)
- InvalidDataException propagates during recovery (wrong encryption key)

MsgBlock.WriteAt added to support explicit sequence numbers and timestamps,
needed when rewriting blocks with non-contiguous sequences (after removes).

Tests updated:
- New FileStoreBlockTests.cs with 9 tests for block-specific behavior
- JetStreamFileStoreCompressionEncryptionParityTests updated to read
  FSV1 magic from .blk files instead of messages.jsonl
- JetStreamFileStoreDurabilityParityTests updated to verify .blk files
  instead of index.manifest.json

All 3,562 tests pass (3,535 passed + 27 skipped, 0 failures).
This commit is contained in:
Joseph Doherty
2026-02-24 12:39:32 -05:00
parent 09252b8c79
commit 2816e8f048
5 changed files with 681 additions and 170 deletions

View File

@@ -10,23 +10,34 @@ using ApiStreamState = NATS.Server.JetStream.Models.ApiStreamState;
namespace NATS.Server.JetStream.Storage;
/// <summary>
/// Block-based file store for JetStream messages. Uses <see cref="MsgBlock"/> for
/// on-disk persistence and maintains an in-memory cache (<see cref="StoredMessage"/>)
/// for fast reads and subject queries.
///
/// Reference: golang/nats-server/server/filestore.go — block manager, block rotation,
/// recovery via scanning .blk files, soft-delete via dmap.
/// </summary>
public sealed class FileStore : IStreamStore, IAsyncDisposable
{
private readonly FileStoreOptions _options;
private readonly string _dataFilePath;
private readonly string _manifestPath;
// In-memory cache: keyed by sequence number. This is the primary data structure
// for reads and queries. The blocks are the on-disk persistence layer.
private readonly Dictionary<ulong, StoredMessage> _messages = new();
private readonly Dictionary<ulong, BlockPointer> _index = new();
// Block-based storage: the active (writable) block and sealed blocks.
private readonly List<MsgBlock> _blocks = [];
private MsgBlock? _activeBlock;
private int _nextBlockId;
private ulong _last;
private int _blockCount;
private long _activeBlockBytes;
private long _writeOffset;
// Resolved at construction time: which format family to use.
private readonly bool _useS2; // true S2Codec (FSV2 compression path)
private readonly bool _useAead; // true AeadEncryptor (FSV2 encryption path)
private readonly bool _useS2; // true -> S2Codec (FSV2 compression path)
private readonly bool _useAead; // true -> AeadEncryptor (FSV2 encryption path)
public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1);
public int BlockCount => _blocks.Count;
public bool UsedIndexManifestOnStartup { get; private set; }
public FileStore(FileStoreOptions options)
@@ -40,10 +51,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
_useAead = _options.Cipher != StoreCipher.NoCipher;
Directory.CreateDirectory(options.Directory);
_dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
_manifestPath = Path.Combine(options.Directory, _options.IndexManifestFileName);
LoadBlockIndexManifestOnStartup();
LoadExisting();
// Attempt legacy JSONL migration first, then recover from blocks.
MigrateLegacyJsonl();
RecoverBlocks();
}
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
@@ -51,28 +62,36 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
PruneExpired(DateTime.UtcNow);
_last++;
var now = DateTime.UtcNow;
var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L;
var persistedPayload = TransformForPersist(payload.Span);
var stored = new StoredMessage
{
Sequence = _last,
Subject = subject,
Payload = payload.ToArray(),
TimestampUtc = DateTime.UtcNow,
TimestampUtc = now,
};
_messages[_last] = stored;
var line = JsonSerializer.Serialize(new FileRecord
// Write to MsgBlock. The payload stored in the block is the transformed
// (compressed/encrypted) payload, not the plaintext.
EnsureActiveBlock();
try
{
Sequence = stored.Sequence,
Subject = stored.Subject,
PayloadBase64 = Convert.ToBase64String(persistedPayload),
TimestampUtc = stored.TimestampUtc,
});
await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct);
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
}
catch (InvalidOperationException)
{
// Block is sealed. Rotate to a new block and retry.
RotateBlock();
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
}
// Check if the block just became sealed after this write.
if (_activeBlock!.IsSealed)
RotateBlock();
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, stored.Sequence);
PersistBlockIndexManifest(_manifestPath, _index);
return _last;
}
@@ -106,23 +125,31 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
if (sequence == _last)
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
RewriteDataFile();
// Soft-delete in the block that contains this sequence.
DeleteInBlock(sequence);
}
return ValueTask.FromResult(removed);
}
public ValueTask PurgeAsync(CancellationToken ct)
{
_messages.Clear();
_index.Clear();
_last = 0;
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
if (File.Exists(_dataFilePath))
File.Delete(_dataFilePath);
if (File.Exists(_manifestPath))
File.Delete(_manifestPath);
// Dispose and delete all blocks.
DisposeAllBlocks();
CleanBlockFiles();
// Clean up any legacy files that might still exist.
var jsonlPath = Path.Combine(_options.Directory, "messages.jsonl");
if (File.Exists(jsonlPath))
File.Delete(jsonlPath);
var manifestPath = Path.Combine(_options.Directory, _options.IndexManifestFileName);
if (File.Exists(manifestPath))
File.Delete(manifestPath);
return ValueTask.CompletedTask;
}
@@ -145,11 +172,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct)
{
_messages.Clear();
_index.Clear();
_last = 0;
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
// Dispose existing blocks and clean files.
DisposeAllBlocks();
CleanBlockFiles();
if (!snapshot.IsEmpty)
{
@@ -158,11 +185,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
foreach (var record in records)
{
var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty));
var message = new StoredMessage
{
Sequence = record.Sequence,
Subject = record.Subject ?? string.Empty,
Payload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)),
Payload = restoredPayload,
TimestampUtc = record.TimestampUtc,
};
_messages[record.Sequence] = message;
@@ -171,7 +199,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
}
}
RewriteDataFile();
// Write all messages to fresh blocks.
RewriteBlocks();
return ValueTask.CompletedTask;
}
@@ -194,144 +223,302 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
_messages.Remove(first);
}
RewriteDataFile();
// Rewrite blocks to reflect the trim (removes trimmed messages from disk).
RewriteBlocks();
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
private void LoadExisting()
public ValueTask DisposeAsync()
{
if (!File.Exists(_dataFilePath))
DisposeAllBlocks();
return ValueTask.CompletedTask;
}
// -------------------------------------------------------------------------
// Block management
// -------------------------------------------------------------------------
/// <summary>
/// Ensures an active (writable) block exists. Creates one if needed.
/// </summary>
private void EnsureActiveBlock()
{
if (_activeBlock is null || _activeBlock.IsSealed)
RotateBlock();
}
/// <summary>
/// Creates a new active block. The previous active block (if any) stays in the
/// block list as a sealed block. The firstSequence is set to _last + 1 (the next
/// expected sequence), but actual sequences come from WriteAt calls.
/// </summary>
private void RotateBlock()
{
var firstSeq = _last + 1;
var block = MsgBlock.Create(_nextBlockId, _options.Directory, _options.BlockSizeBytes, firstSeq);
_blocks.Add(block);
_activeBlock = block;
_nextBlockId++;
}
/// <summary>
/// Soft-deletes a message in the block that contains it.
/// </summary>
private void DeleteInBlock(ulong sequence)
{
foreach (var block in _blocks)
{
if (sequence >= block.FirstSequence && sequence <= block.LastSequence)
{
block.Delete(sequence);
return;
}
}
}
/// <summary>
/// Disposes all blocks and clears the block list.
/// </summary>
private void DisposeAllBlocks()
{
foreach (var block in _blocks)
block.Dispose();
_blocks.Clear();
_activeBlock = null;
_nextBlockId = 0;
}
/// <summary>
/// Deletes all .blk files in the store directory.
/// </summary>
private void CleanBlockFiles()
{
if (!Directory.Exists(_options.Directory))
return;
foreach (var line in File.ReadLines(_dataFilePath))
foreach (var blkFile in Directory.GetFiles(_options.Directory, "*.blk"))
{
if (string.IsNullOrWhiteSpace(line))
try { File.Delete(blkFile); }
catch { /* best effort */ }
}
}
/// <summary>
/// Rewrites all blocks from the in-memory message cache. Used after trim,
/// snapshot restore, or legacy migration.
/// </summary>
private void RewriteBlocks()
{
DisposeAllBlocks();
CleanBlockFiles();
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
{
var persistedPayload = TransformForPersist(message.Payload.Span);
var timestamp = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
EnsureActiveBlock();
try
{
_activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
}
catch (InvalidOperationException)
{
RotateBlock();
_activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
}
if (_activeBlock!.IsSealed)
RotateBlock();
}
}
// -------------------------------------------------------------------------
// Recovery: scan .blk files on startup and rebuild in-memory state.
// -------------------------------------------------------------------------
/// <summary>
/// Recovers all blocks from .blk files in the store directory.
/// </summary>
private void RecoverBlocks()
{
var blkFiles = Directory.GetFiles(_options.Directory, "*.blk");
if (blkFiles.Length == 0)
return;
// Sort by block ID (filename is like "000000.blk", "000001.blk", ...).
Array.Sort(blkFiles, StringComparer.OrdinalIgnoreCase);
var maxBlockId = -1;
foreach (var blkFile in blkFiles)
{
var fileName = Path.GetFileNameWithoutExtension(blkFile);
if (!int.TryParse(fileName, out var blockId))
continue;
var record = JsonSerializer.Deserialize<FileRecord>(line);
if (record == null)
continue;
try
{
var block = MsgBlock.Recover(blockId, _options.Directory);
_blocks.Add(block);
if (blockId > maxBlockId)
maxBlockId = blockId;
// Read all non-deleted records from this block and populate the in-memory cache.
RecoverMessagesFromBlock(block);
}
catch (InvalidDataException)
{
// InvalidDataException indicates key mismatch or integrity failure —
// propagate so the caller knows the store cannot be opened.
throw;
}
catch
{
// Skip corrupted blocks — non-critical recovery errors.
}
}
_nextBlockId = maxBlockId + 1;
// The last block is the active block if it has capacity (not sealed).
if (_blocks.Count > 0)
{
var lastBlock = _blocks[^1];
_activeBlock = lastBlock;
}
PruneExpired(DateTime.UtcNow);
}
/// <summary>
/// Reads all non-deleted records from a block and adds them to the in-memory cache.
/// </summary>
private void RecoverMessagesFromBlock(MsgBlock block)
{
// We need to iterate through all sequences in the block.
// MsgBlock tracks first/last sequence, so we try each one.
var first = block.FirstSequence;
var last = block.LastSequence;
if (first == 0 && last == 0)
return; // Empty block.
for (var seq = first; seq <= last; seq++)
{
var record = block.Read(seq);
if (record is null)
continue; // Deleted or not present.
// The payload stored in the block is the transformed (compressed/encrypted) payload.
// We need to reverse-transform it to get the original plaintext.
// InvalidDataException (e.g., wrong key) propagates to the caller.
var originalPayload = RestorePayload(record.Payload.Span);
var message = new StoredMessage
{
Sequence = record.Sequence,
Subject = record.Subject ?? string.Empty,
Payload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)),
TimestampUtc = record.TimestampUtc,
Subject = record.Subject,
Payload = originalPayload,
TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime,
};
_messages[message.Sequence] = message;
if (message.Sequence > _last)
_last = message.Sequence;
if (!UsedIndexManifestOnStartup || !_index.ContainsKey(message.Sequence))
{
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, message.Sequence);
}
}
PruneExpired(DateTime.UtcNow);
PersistBlockIndexManifest(_manifestPath, _index);
}
private void RewriteDataFile()
// -------------------------------------------------------------------------
// Legacy JSONL migration: if messages.jsonl exists, migrate to blocks.
// -------------------------------------------------------------------------
/// <summary>
/// Migrates data from the legacy JSONL format to block-based storage.
/// If messages.jsonl exists, reads all records, writes them to blocks,
/// then deletes the JSONL file and manifest.
/// </summary>
private void MigrateLegacyJsonl()
{
Directory.CreateDirectory(Path.GetDirectoryName(_dataFilePath)!);
_index.Clear();
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read);
using var writer = new StreamWriter(stream, Encoding.UTF8);
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
{
var line = JsonSerializer.Serialize(new FileRecord
{
Sequence = message.Sequence,
Subject = message.Subject,
PayloadBase64 = Convert.ToBase64String(TransformForPersist(message.Payload.Span)),
TimestampUtc = message.TimestampUtc,
});
writer.WriteLine(line);
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, message.Sequence);
}
writer.Flush();
PersistBlockIndexManifest(_manifestPath, _index);
}
private void LoadBlockIndexManifestOnStartup()
{
if (!File.Exists(_manifestPath))
var jsonlPath = Path.Combine(_options.Directory, "messages.jsonl");
if (!File.Exists(jsonlPath))
return;
try
{
var manifest = JsonSerializer.Deserialize<IndexManifest>(File.ReadAllText(_manifestPath));
if (manifest is null || manifest.Version != 1)
return;
// Read all records from the JSONL file.
var legacyMessages = new List<(ulong Sequence, string Subject, byte[] Payload, DateTime TimestampUtc)>();
_index.Clear();
foreach (var entry in manifest.Entries)
_index[entry.Sequence] = new BlockPointer(entry.BlockId, entry.Offset);
_blockCount = Math.Max(manifest.BlockCount, 0);
_activeBlockBytes = Math.Max(manifest.ActiveBlockBytes, 0);
_writeOffset = Math.Max(manifest.WriteOffset, 0);
UsedIndexManifestOnStartup = true;
}
catch
foreach (var line in File.ReadLines(jsonlPath))
{
UsedIndexManifestOnStartup = false;
_index.Clear();
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
}
}
if (string.IsNullOrWhiteSpace(line))
continue;
private void PersistBlockIndexManifest(string manifestPath, Dictionary<ulong, BlockPointer> blockIndex)
{
var manifest = new IndexManifest
{
Version = 1,
BlockCount = _blockCount,
ActiveBlockBytes = _activeBlockBytes,
WriteOffset = _writeOffset,
Entries = [.. blockIndex.Select(kv => new IndexEntry
FileRecord? record;
try
{
Sequence = kv.Key,
BlockId = kv.Value.BlockId,
Offset = kv.Value.Offset,
}).OrderBy(e => e.Sequence)],
};
record = JsonSerializer.Deserialize<FileRecord>(line);
}
catch
{
continue; // Skip corrupted lines.
}
File.WriteAllText(manifestPath, JsonSerializer.Serialize(manifest));
}
if (record == null)
continue;
private void TrackBlockForRecord(int recordBytes, ulong sequence)
{
if (_blockCount == 0)
_blockCount = 1;
byte[] originalPayload;
try
{
originalPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty));
}
catch
{
// Re-throw for integrity failures (e.g., wrong encryption key).
throw;
}
if (_activeBlockBytes > 0 && _activeBlockBytes + recordBytes > _options.BlockSizeBytes)
{
_blockCount++;
_activeBlockBytes = 0;
legacyMessages.Add((record.Sequence, record.Subject ?? string.Empty, originalPayload, record.TimestampUtc));
}
_index[sequence] = new BlockPointer(_blockCount, _writeOffset);
_activeBlockBytes += recordBytes;
_writeOffset += recordBytes;
if (legacyMessages.Count == 0)
{
// Delete the empty JSONL file.
File.Delete(jsonlPath);
var manifestPath = Path.Combine(_options.Directory, _options.IndexManifestFileName);
if (File.Exists(manifestPath))
File.Delete(manifestPath);
return;
}
// Add to the in-memory cache.
foreach (var (seq, subject, payload, ts) in legacyMessages)
{
_messages[seq] = new StoredMessage
{
Sequence = seq,
Subject = subject,
Payload = payload,
TimestampUtc = ts,
};
if (seq > _last)
_last = seq;
}
// Write all messages to fresh blocks.
RewriteBlocks();
// Delete the legacy files.
File.Delete(jsonlPath);
var manifestFile = Path.Combine(_options.Directory, _options.IndexManifestFileName);
if (File.Exists(manifestFile))
File.Delete(manifestFile);
}
// -------------------------------------------------------------------------
// Expiry
// -------------------------------------------------------------------------
private void PruneExpired(DateTime nowUtc)
{
if (_options.MaxAgeMs <= 0)
@@ -349,7 +536,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
foreach (var sequence in expired)
_messages.Remove(sequence);
RewriteDataFile();
RewriteBlocks();
}
// -------------------------------------------------------------------------
@@ -586,22 +773,4 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public string? PayloadBase64 { get; init; }
public DateTime TimestampUtc { get; init; }
}
private readonly record struct BlockPointer(int BlockId, long Offset);
private sealed class IndexManifest
{
public int Version { get; init; }
public int BlockCount { get; init; }
public long ActiveBlockBytes { get; init; }
public long WriteOffset { get; init; }
public List<IndexEntry> Entries { get; init; } = [];
}
private sealed class IndexEntry
{
public ulong Sequence { get; init; }
public int BlockId { get; init; }
public long Offset { get; init; }
}
}