From 2816e8f04809e8d9ae0e2664519284fb36e5bac7 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Tue, 24 Feb 2026 12:39:32 -0500
Subject: [PATCH] feat(storage): rewrite FileStore to use block-based MsgBlock storage

Replace JSONL persistence with real MsgBlock-based block files (.blk).
FileStore now acts as a block manager that creates, seals, and rotates
MsgBlocks while maintaining an in-memory cache for fast reads/queries.

Key changes:
- AppendAsync writes transformed payloads to MsgBlock via WriteAt
- Block rotation occurs when active block reaches size limit
- Recovery scans .blk files and rebuilds in-memory state from records
- Legacy JSONL migration: existing messages.jsonl data is automatically
  converted to block files on first open, then JSONL is deleted
- PurgeAsync disposes and deletes all block files
- RewriteBlocks rebuilds blocks from cache (used by trim/restore)
- InvalidDataException propagates during recovery (wrong encryption key)

MsgBlock.WriteAt added to support explicit sequence numbers and timestamps,
needed when rewriting blocks with non-contiguous sequences (after removes).

Tests updated:
- New FileStoreBlockTests.cs with 9 tests for block-specific behavior
- JetStreamFileStoreCompressionEncryptionParityTests updated to read FSV1
  magic from .blk files instead of messages.jsonl
- JetStreamFileStoreDurabilityParityTests updated to verify .blk files
  instead of index.manifest.json

All 3,562 tests pass (3,535 passed + 27 skipped, 0 failures).
--- .../JetStream/Storage/FileStore.cs | 493 ++++++++++++------ src/NATS.Server/JetStream/Storage/MsgBlock.cs | 52 +- ...leStoreCompressionEncryptionParityTests.cs | 13 +- ...JetStreamFileStoreDurabilityParityTests.cs | 4 +- .../JetStream/Storage/FileStoreBlockTests.cs | 289 ++++++++++ 5 files changed, 681 insertions(+), 170 deletions(-) create mode 100644 tests/NATS.Server.Tests/JetStream/Storage/FileStoreBlockTests.cs diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 2518520..78ae6f3 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -10,23 +10,34 @@ using ApiStreamState = NATS.Server.JetStream.Models.ApiStreamState; namespace NATS.Server.JetStream.Storage; +/// +/// Block-based file store for JetStream messages. Uses for +/// on-disk persistence and maintains an in-memory cache () +/// for fast reads and subject queries. +/// +/// Reference: golang/nats-server/server/filestore.go — block manager, block rotation, +/// recovery via scanning .blk files, soft-delete via dmap. +/// public sealed class FileStore : IStreamStore, IAsyncDisposable { private readonly FileStoreOptions _options; - private readonly string _dataFilePath; - private readonly string _manifestPath; + + // In-memory cache: keyed by sequence number. This is the primary data structure + // for reads and queries. The blocks are the on-disk persistence layer. private readonly Dictionary _messages = new(); - private readonly Dictionary _index = new(); + + // Block-based storage: the active (writable) block and sealed blocks. + private readonly List _blocks = []; + private MsgBlock? _activeBlock; + private int _nextBlockId; + private ulong _last; - private int _blockCount; - private long _activeBlockBytes; - private long _writeOffset; // Resolved at construction time: which format family to use. 
- private readonly bool _useS2; // true → S2Codec (FSV2 compression path) - private readonly bool _useAead; // true → AeadEncryptor (FSV2 encryption path) + private readonly bool _useS2; // true -> S2Codec (FSV2 compression path) + private readonly bool _useAead; // true -> AeadEncryptor (FSV2 encryption path) - public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1); + public int BlockCount => _blocks.Count; public bool UsedIndexManifestOnStartup { get; private set; } public FileStore(FileStoreOptions options) @@ -40,10 +51,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable _useAead = _options.Cipher != StoreCipher.NoCipher; Directory.CreateDirectory(options.Directory); - _dataFilePath = Path.Combine(options.Directory, "messages.jsonl"); - _manifestPath = Path.Combine(options.Directory, _options.IndexManifestFileName); - LoadBlockIndexManifestOnStartup(); - LoadExisting(); + + // Attempt legacy JSONL migration first, then recover from blocks. + MigrateLegacyJsonl(); + RecoverBlocks(); } public async ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) @@ -51,28 +62,36 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable PruneExpired(DateTime.UtcNow); _last++; + var now = DateTime.UtcNow; + var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L; var persistedPayload = TransformForPersist(payload.Span); var stored = new StoredMessage { Sequence = _last, Subject = subject, Payload = payload.ToArray(), - TimestampUtc = DateTime.UtcNow, + TimestampUtc = now, }; _messages[_last] = stored; - var line = JsonSerializer.Serialize(new FileRecord + // Write to MsgBlock. The payload stored in the block is the transformed + // (compressed/encrypted) payload, not the plaintext. 
+ EnsureActiveBlock(); + try { - Sequence = stored.Sequence, - Subject = stored.Subject, - PayloadBase64 = Convert.ToBase64String(persistedPayload), - TimestampUtc = stored.TimestampUtc, - }); - await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct); + _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + } + catch (InvalidOperationException) + { + // Block is sealed. Rotate to a new block and retry. + RotateBlock(); + _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + } + + // Check if the block just became sealed after this write. + if (_activeBlock!.IsSealed) + RotateBlock(); - var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine); - TrackBlockForRecord(recordBytes, stored.Sequence); - PersistBlockIndexManifest(_manifestPath, _index); return _last; } @@ -106,23 +125,31 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable { if (sequence == _last) _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); - RewriteDataFile(); + + // Soft-delete in the block that contains this sequence. + DeleteInBlock(sequence); } + return ValueTask.FromResult(removed); } public ValueTask PurgeAsync(CancellationToken ct) { _messages.Clear(); - _index.Clear(); _last = 0; - _blockCount = 0; - _activeBlockBytes = 0; - _writeOffset = 0; - if (File.Exists(_dataFilePath)) - File.Delete(_dataFilePath); - if (File.Exists(_manifestPath)) - File.Delete(_manifestPath); + + // Dispose and delete all blocks. + DisposeAllBlocks(); + CleanBlockFiles(); + + // Clean up any legacy files that might still exist. 
+ var jsonlPath = Path.Combine(_options.Directory, "messages.jsonl"); + if (File.Exists(jsonlPath)) + File.Delete(jsonlPath); + var manifestPath = Path.Combine(_options.Directory, _options.IndexManifestFileName); + if (File.Exists(manifestPath)) + File.Delete(manifestPath); + return ValueTask.CompletedTask; } @@ -145,11 +172,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) { _messages.Clear(); - _index.Clear(); _last = 0; - _blockCount = 0; - _activeBlockBytes = 0; - _writeOffset = 0; + + // Dispose existing blocks and clean files. + DisposeAllBlocks(); + CleanBlockFiles(); if (!snapshot.IsEmpty) { @@ -158,11 +185,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable { foreach (var record in records) { + var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)); var message = new StoredMessage { Sequence = record.Sequence, Subject = record.Subject ?? string.Empty, - Payload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)), + Payload = restoredPayload, TimestampUtc = record.TimestampUtc, }; _messages[record.Sequence] = message; @@ -171,7 +199,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable } } - RewriteDataFile(); + // Write all messages to fresh blocks. + RewriteBlocks(); return ValueTask.CompletedTask; } @@ -194,144 +223,302 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable _messages.Remove(first); } - RewriteDataFile(); + // Rewrite blocks to reflect the trim (removes trimmed messages from disk). 
+ RewriteBlocks(); } - public ValueTask DisposeAsync() => ValueTask.CompletedTask; - - private void LoadExisting() + public ValueTask DisposeAsync() { - if (!File.Exists(_dataFilePath)) + DisposeAllBlocks(); + return ValueTask.CompletedTask; + } + + // ------------------------------------------------------------------------- + // Block management + // ------------------------------------------------------------------------- + + /// + /// Ensures an active (writable) block exists. Creates one if needed. + /// + private void EnsureActiveBlock() + { + if (_activeBlock is null || _activeBlock.IsSealed) + RotateBlock(); + } + + /// + /// Creates a new active block. The previous active block (if any) stays in the + /// block list as a sealed block. The firstSequence is set to _last + 1 (the next + /// expected sequence), but actual sequences come from WriteAt calls. + /// + private void RotateBlock() + { + var firstSeq = _last + 1; + var block = MsgBlock.Create(_nextBlockId, _options.Directory, _options.BlockSizeBytes, firstSeq); + _blocks.Add(block); + _activeBlock = block; + _nextBlockId++; + } + + /// + /// Soft-deletes a message in the block that contains it. + /// + private void DeleteInBlock(ulong sequence) + { + foreach (var block in _blocks) + { + if (sequence >= block.FirstSequence && sequence <= block.LastSequence) + { + block.Delete(sequence); + return; + } + } + } + + /// + /// Disposes all blocks and clears the block list. + /// + private void DisposeAllBlocks() + { + foreach (var block in _blocks) + block.Dispose(); + _blocks.Clear(); + _activeBlock = null; + _nextBlockId = 0; + } + + /// + /// Deletes all .blk files in the store directory. 
+ /// + private void CleanBlockFiles() + { + if (!Directory.Exists(_options.Directory)) return; - foreach (var line in File.ReadLines(_dataFilePath)) + foreach (var blkFile in Directory.GetFiles(_options.Directory, "*.blk")) { - if (string.IsNullOrWhiteSpace(line)) + try { File.Delete(blkFile); } + catch { /* best effort */ } + } + } + + /// + /// Rewrites all blocks from the in-memory message cache. Used after trim, + /// snapshot restore, or legacy migration. + /// + private void RewriteBlocks() + { + DisposeAllBlocks(); + CleanBlockFiles(); + + _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); + + foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) + { + var persistedPayload = TransformForPersist(message.Payload.Span); + var timestamp = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L; + + EnsureActiveBlock(); + try + { + _activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + } + catch (InvalidOperationException) + { + RotateBlock(); + _activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory.Empty, persistedPayload, timestamp); + } + + if (_activeBlock!.IsSealed) + RotateBlock(); + } + } + + // ------------------------------------------------------------------------- + // Recovery: scan .blk files on startup and rebuild in-memory state. + // ------------------------------------------------------------------------- + + /// + /// Recovers all blocks from .blk files in the store directory. + /// + private void RecoverBlocks() + { + var blkFiles = Directory.GetFiles(_options.Directory, "*.blk"); + if (blkFiles.Length == 0) + return; + + // Sort by block ID (filename is like "000000.blk", "000001.blk", ...). 
+ Array.Sort(blkFiles, StringComparer.OrdinalIgnoreCase); + + var maxBlockId = -1; + + foreach (var blkFile in blkFiles) + { + var fileName = Path.GetFileNameWithoutExtension(blkFile); + if (!int.TryParse(fileName, out var blockId)) continue; - var record = JsonSerializer.Deserialize(line); - if (record == null) - continue; + try + { + var block = MsgBlock.Recover(blockId, _options.Directory); + _blocks.Add(block); + + if (blockId > maxBlockId) + maxBlockId = blockId; + + // Read all non-deleted records from this block and populate the in-memory cache. + RecoverMessagesFromBlock(block); + } + catch (InvalidDataException) + { + // InvalidDataException indicates key mismatch or integrity failure — + // propagate so the caller knows the store cannot be opened. + throw; + } + catch + { + // Skip corrupted blocks — non-critical recovery errors. + } + } + + _nextBlockId = maxBlockId + 1; + + // The last block is the active block if it has capacity (not sealed). + if (_blocks.Count > 0) + { + var lastBlock = _blocks[^1]; + _activeBlock = lastBlock; + } + + PruneExpired(DateTime.UtcNow); + } + + /// + /// Reads all non-deleted records from a block and adds them to the in-memory cache. + /// + private void RecoverMessagesFromBlock(MsgBlock block) + { + // We need to iterate through all sequences in the block. + // MsgBlock tracks first/last sequence, so we try each one. + var first = block.FirstSequence; + var last = block.LastSequence; + + if (first == 0 && last == 0) + return; // Empty block. + + for (var seq = first; seq <= last; seq++) + { + var record = block.Read(seq); + if (record is null) + continue; // Deleted or not present. + + // The payload stored in the block is the transformed (compressed/encrypted) payload. + // We need to reverse-transform it to get the original plaintext. + // InvalidDataException (e.g., wrong key) propagates to the caller. 
+ var originalPayload = RestorePayload(record.Payload.Span); var message = new StoredMessage { Sequence = record.Sequence, - Subject = record.Subject ?? string.Empty, - Payload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)), - TimestampUtc = record.TimestampUtc, + Subject = record.Subject, + Payload = originalPayload, + TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime, }; _messages[message.Sequence] = message; if (message.Sequence > _last) _last = message.Sequence; - - if (!UsedIndexManifestOnStartup || !_index.ContainsKey(message.Sequence)) - { - var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine); - TrackBlockForRecord(recordBytes, message.Sequence); - } } - - PruneExpired(DateTime.UtcNow); - PersistBlockIndexManifest(_manifestPath, _index); } - private void RewriteDataFile() + // ------------------------------------------------------------------------- + // Legacy JSONL migration: if messages.jsonl exists, migrate to blocks. + // ------------------------------------------------------------------------- + + /// + /// Migrates data from the legacy JSONL format to block-based storage. + /// If messages.jsonl exists, reads all records, writes them to blocks, + /// then deletes the JSONL file and manifest. + /// + private void MigrateLegacyJsonl() { - Directory.CreateDirectory(Path.GetDirectoryName(_dataFilePath)!); - _index.Clear(); - _blockCount = 0; - _activeBlockBytes = 0; - _writeOffset = 0; - _last = _messages.Count == 0 ? 
0UL : _messages.Keys.Max(); - - using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read); - using var writer = new StreamWriter(stream, Encoding.UTF8); - - foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) - { - var line = JsonSerializer.Serialize(new FileRecord - { - Sequence = message.Sequence, - Subject = message.Subject, - PayloadBase64 = Convert.ToBase64String(TransformForPersist(message.Payload.Span)), - TimestampUtc = message.TimestampUtc, - }); - - writer.WriteLine(line); - var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine); - TrackBlockForRecord(recordBytes, message.Sequence); - } - - writer.Flush(); - PersistBlockIndexManifest(_manifestPath, _index); - } - - private void LoadBlockIndexManifestOnStartup() - { - if (!File.Exists(_manifestPath)) + var jsonlPath = Path.Combine(_options.Directory, "messages.jsonl"); + if (!File.Exists(jsonlPath)) return; - try - { - var manifest = JsonSerializer.Deserialize(File.ReadAllText(_manifestPath)); - if (manifest is null || manifest.Version != 1) - return; + // Read all records from the JSONL file. 
+ var legacyMessages = new List<(ulong Sequence, string Subject, byte[] Payload, DateTime TimestampUtc)>(); - _index.Clear(); - foreach (var entry in manifest.Entries) - _index[entry.Sequence] = new BlockPointer(entry.BlockId, entry.Offset); - - _blockCount = Math.Max(manifest.BlockCount, 0); - _activeBlockBytes = Math.Max(manifest.ActiveBlockBytes, 0); - _writeOffset = Math.Max(manifest.WriteOffset, 0); - UsedIndexManifestOnStartup = true; - } - catch + foreach (var line in File.ReadLines(jsonlPath)) { - UsedIndexManifestOnStartup = false; - _index.Clear(); - _blockCount = 0; - _activeBlockBytes = 0; - _writeOffset = 0; - } - } + if (string.IsNullOrWhiteSpace(line)) + continue; - private void PersistBlockIndexManifest(string manifestPath, Dictionary blockIndex) - { - var manifest = new IndexManifest - { - Version = 1, - BlockCount = _blockCount, - ActiveBlockBytes = _activeBlockBytes, - WriteOffset = _writeOffset, - Entries = [.. blockIndex.Select(kv => new IndexEntry + FileRecord? record; + try { - Sequence = kv.Key, - BlockId = kv.Value.BlockId, - Offset = kv.Value.Offset, - }).OrderBy(e => e.Sequence)], - }; + record = JsonSerializer.Deserialize(line); + } + catch + { + continue; // Skip corrupted lines. + } - File.WriteAllText(manifestPath, JsonSerializer.Serialize(manifest)); - } + if (record == null) + continue; - private void TrackBlockForRecord(int recordBytes, ulong sequence) - { - if (_blockCount == 0) - _blockCount = 1; + byte[] originalPayload; + try + { + originalPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)); + } + catch + { + // Re-throw for integrity failures (e.g., wrong encryption key). + throw; + } - if (_activeBlockBytes > 0 && _activeBlockBytes + recordBytes > _options.BlockSizeBytes) - { - _blockCount++; - _activeBlockBytes = 0; + legacyMessages.Add((record.Sequence, record.Subject ?? 
string.Empty, originalPayload, record.TimestampUtc)); } - _index[sequence] = new BlockPointer(_blockCount, _writeOffset); - _activeBlockBytes += recordBytes; - _writeOffset += recordBytes; + if (legacyMessages.Count == 0) + { + // Delete the empty JSONL file. + File.Delete(jsonlPath); + var manifestPath = Path.Combine(_options.Directory, _options.IndexManifestFileName); + if (File.Exists(manifestPath)) + File.Delete(manifestPath); + return; + } + + // Add to the in-memory cache. + foreach (var (seq, subject, payload, ts) in legacyMessages) + { + _messages[seq] = new StoredMessage + { + Sequence = seq, + Subject = subject, + Payload = payload, + TimestampUtc = ts, + }; + if (seq > _last) + _last = seq; + } + + // Write all messages to fresh blocks. + RewriteBlocks(); + + // Delete the legacy files. + File.Delete(jsonlPath); + var manifestFile = Path.Combine(_options.Directory, _options.IndexManifestFileName); + if (File.Exists(manifestFile)) + File.Delete(manifestFile); } + // ------------------------------------------------------------------------- + // Expiry + // ------------------------------------------------------------------------- + private void PruneExpired(DateTime nowUtc) { if (_options.MaxAgeMs <= 0) @@ -349,7 +536,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable foreach (var sequence in expired) _messages.Remove(sequence); - RewriteDataFile(); + RewriteBlocks(); } // ------------------------------------------------------------------------- @@ -586,22 +773,4 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable public string? 
PayloadBase64 { get; init; } public DateTime TimestampUtc { get; init; } } - - private readonly record struct BlockPointer(int BlockId, long Offset); - - private sealed class IndexManifest - { - public int Version { get; init; } - public int BlockCount { get; init; } - public long ActiveBlockBytes { get; init; } - public long WriteOffset { get; init; } - public List Entries { get; init; } = []; - } - - private sealed class IndexEntry - { - public ulong Sequence { get; init; } - public int BlockId { get; init; } - public long Offset { get; init; } - } } diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs index 553210c..75d5c3d 100644 --- a/src/NATS.Server/JetStream/Storage/MsgBlock.cs +++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs @@ -149,7 +149,7 @@ public sealed class MsgBlock : IDisposable } /// - /// Appends a message to the block. + /// Appends a message to the block with an auto-assigned sequence number. /// /// NATS subject. /// Optional message headers. @@ -199,6 +199,56 @@ public sealed class MsgBlock : IDisposable } } + /// + /// Appends a message to the block with an explicit sequence number and timestamp. + /// Used by FileStore when rewriting blocks from the in-memory cache where + /// sequences may have gaps (from prior removals). + /// + /// Explicit sequence number to assign. + /// NATS subject. + /// Optional message headers. + /// Message body payload. + /// Timestamp in Unix nanoseconds. + /// Block is sealed. 
+ public void WriteAt(ulong sequence, string subject, ReadOnlyMemory headers, ReadOnlyMemory payload, long timestamp) + { + _lock.EnterWriteLock(); + try + { + if (_writeOffset >= _maxBytes) + throw new InvalidOperationException("Block is sealed; cannot write new messages."); + + var record = new MessageRecord + { + Sequence = sequence, + Subject = subject, + Headers = headers, + Payload = payload, + Timestamp = timestamp, + Deleted = false, + }; + + var encoded = MessageRecord.Encode(record); + var offset = _writeOffset; + + RandomAccess.Write(_handle, encoded, offset); + _writeOffset = offset + encoded.Length; + + _index[sequence] = (offset, encoded.Length); + + if (_totalWritten == 0) + _firstSequence = sequence; + + _lastSequence = sequence; + _nextSequence = Math.Max(_nextSequence, sequence + 1); + _totalWritten++; + } + finally + { + _lock.ExitWriteLock(); + } + } + /// /// Reads a message by sequence number. Uses positional I/O /// () so concurrent readers don't diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreCompressionEncryptionParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreCompressionEncryptionParityTests.cs index c051a3e..812f9cf 100644 --- a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreCompressionEncryptionParityTests.cs +++ b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreCompressionEncryptionParityTests.cs @@ -1,5 +1,4 @@ using System.Text; -using System.Text.Json; using NATS.Server.JetStream.Storage; namespace NATS.Server.Tests; @@ -29,10 +28,14 @@ public class JetStreamFileStoreCompressionEncryptionParityTests Encoding.UTF8.GetString(loaded.Payload.ToArray()).ShouldBe("payload"); } - var firstLine = File.ReadLines(Path.Combine(dir, "messages.jsonl")).First(); - var payloadBase64 = JsonDocument.Parse(firstLine).RootElement.GetProperty("PayloadBase64").GetString(); - payloadBase64.ShouldNotBeNull(); - var persisted = Convert.FromBase64String(payloadBase64!); + // Block-based storage: read the .blk 
file to verify FSV1 envelope. + var blkFiles = Directory.GetFiles(dir, "*.blk"); + blkFiles.Length.ShouldBeGreaterThan(0); + + // Read the first record from the block file and verify FSV1 magic in payload. + var blkBytes = File.ReadAllBytes(blkFiles[0]); + var record = MessageRecord.Decode(blkBytes.AsSpan(0, MessageRecord.MeasureRecord(blkBytes))); + var persisted = record.Payload.ToArray(); persisted.Take(4).SequenceEqual("FSV1"u8.ToArray()).ShouldBeTrue(); Should.Throw(() => diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreDurabilityParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreDurabilityParityTests.cs index 0b51138..cd95bf2 100644 --- a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreDurabilityParityTests.cs +++ b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreDurabilityParityTests.cs @@ -23,10 +23,10 @@ public class JetStreamFileStoreDurabilityParityTests await store.AppendAsync("orders.created", Encoding.UTF8.GetBytes($"payload-{i}"), default); } - File.Exists(Path.Combine(dir, options.IndexManifestFileName)).ShouldBeTrue(); + // Block-based storage: .blk files should be present on disk. + Directory.GetFiles(dir, "*.blk").Length.ShouldBeGreaterThan(0); await using var reopened = new FileStore(options); - reopened.UsedIndexManifestOnStartup.ShouldBeTrue(); var state = await reopened.GetStateAsync(default); state.Messages.ShouldBe((ulong)1000); reopened.BlockCount.ShouldBeGreaterThan(1); diff --git a/tests/NATS.Server.Tests/JetStream/Storage/FileStoreBlockTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreBlockTests.cs new file mode 100644 index 0000000..a59931e --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreBlockTests.cs @@ -0,0 +1,289 @@ +// Reference: golang/nats-server/server/filestore_test.go +// Tests for Task A3: FileStore Block Manager Rewrite. 
+// Verifies that FileStore correctly uses MsgBlock-based storage: +// block files on disk, block rotation, recovery, purge, snapshot, +// soft-delete, and payload transformation (S2/AEAD) integration. + +using System.Text; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +public sealed class FileStoreBlockTests : IDisposable +{ + private readonly string _dir; + + public FileStoreBlockTests() + { + _dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-block-{Guid.NewGuid():N}"); + Directory.CreateDirectory(_dir); + } + + public void Dispose() + { + if (Directory.Exists(_dir)) + Directory.Delete(_dir, recursive: true); + } + + private FileStore CreateStore(string subdirectory, FileStoreOptions? options = null) + { + var dir = Path.Combine(_dir, subdirectory); + var opts = options ?? new FileStoreOptions(); + opts.Directory = dir; + return new FileStore(opts); + } + + // Go: filestore.go block-based storage — verify .blk files are created on disk. + [Fact] + public async Task Append_UsesBlockStorage() + { + var subDir = "blk-storage"; + var dir = Path.Combine(_dir, subDir); + + await using var store = CreateStore(subDir); + + await store.AppendAsync("foo", "Hello World"u8.ToArray(), default); + + // At least one .blk file should exist in the store directory. + var blkFiles = Directory.GetFiles(dir, "*.blk"); + blkFiles.Length.ShouldBeGreaterThanOrEqualTo(1); + + // The old JSONL file should NOT exist. + File.Exists(Path.Combine(dir, "messages.jsonl")).ShouldBeFalse(); + } + + // Go: filestore.go block rotation — rbytes check causes new block creation. + [Fact] + public async Task MultiBlock_RotatesWhenFull() + { + var subDir = "blk-rotation"; + var dir = Path.Combine(_dir, subDir); + + // Small block size to force rotation quickly. + await using var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 }); + + // Write enough messages to exceed 256 bytes per block. 
+ for (var i = 0; i < 20; i++) + await store.AppendAsync("foo", "Hello World - block rotation test!"u8.ToArray(), default); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)20); + + // Multiple .blk files should be created. + var blkFiles = Directory.GetFiles(dir, "*.blk"); + blkFiles.Length.ShouldBeGreaterThan(1); + + // BlockCount should reflect multiple blocks. + store.BlockCount.ShouldBeGreaterThan(1); + } + + // Go: filestore.go multi-block load — messages span multiple blocks. + [Fact] + public async Task Load_AcrossBlocks() + { + var subDir = "blk-across"; + + // Small block size to force multiple blocks. + await using var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 }); + + for (var i = 0; i < 20; i++) + await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i:D4}"), default); + + // Verify we have multiple blocks. + store.BlockCount.ShouldBeGreaterThan(1); + + // All messages should be loadable, regardless of which block they are in. + for (ulong i = 1; i <= 20; i++) + { + var msg = await store.LoadAsync(i, default); + msg.ShouldNotBeNull(); + msg!.Subject.ShouldBe("foo"); + var expected = Encoding.UTF8.GetBytes($"msg-{(int)(i - 1):D4}"); + msg.Payload.ToArray().ShouldBe(expected); + } + } + + // Go: filestore.go recovery — block files are rescanned on startup. + [Fact] + public async Task Recovery_AfterRestart() + { + var subDir = "blk-recovery"; + var dir = Path.Combine(_dir, subDir); + + // Write data and dispose. + await using (var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 })) + { + for (var i = 0; i < 20; i++) + await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i:D4}"), default); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)20); + } + + // .blk files should still exist after dispose. 
+ var blkFiles = Directory.GetFiles(dir, "*.blk"); + blkFiles.Length.ShouldBeGreaterThan(0); + + // Recreate FileStore from the same directory. + await using (var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 })) + { + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)20); + state.FirstSeq.ShouldBe((ulong)1); + state.LastSeq.ShouldBe((ulong)20); + + // Verify all messages are intact. + for (ulong i = 1; i <= 20; i++) + { + var msg = await store.LoadAsync(i, default); + msg.ShouldNotBeNull(); + var expected = Encoding.UTF8.GetBytes($"msg-{(int)(i - 1):D4}"); + msg!.Payload.ToArray().ShouldBe(expected); + } + } + } + + // Go: filestore.go purge — all blocks removed, fresh block created. + [Fact] + public async Task Purge_CleansAllBlocks() + { + var subDir = "blk-purge"; + var dir = Path.Combine(_dir, subDir); + + await using var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 }); + + for (var i = 0; i < 20; i++) + await store.AppendAsync("foo", "Hello"u8.ToArray(), default); + + // Before purge, multiple .blk files should exist. + Directory.GetFiles(dir, "*.blk").Length.ShouldBeGreaterThan(0); + + await store.PurgeAsync(default); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)0); + state.Bytes.ShouldBe((ulong)0); + + // After purge, no old .blk files should remain (or they should be empty/recreated). + // The old JSONL file should also not exist. + File.Exists(Path.Combine(dir, "messages.jsonl")).ShouldBeFalse(); + } + + // Go: filestore.go dmap — soft-delete within a block. + [Fact] + public async Task Remove_SoftDeletesInBlock() + { + await using var store = CreateStore("blk-remove"); + + for (var i = 0; i < 5; i++) + await store.AppendAsync("foo", "data"u8.ToArray(), default); + + // Remove sequence 3. + (await store.RemoveAsync(3, default)).ShouldBeTrue(); + + // Verify seq 3 returns null. 
+ (await store.LoadAsync(3, default)).ShouldBeNull(); + + // Other sequences still loadable. + (await store.LoadAsync(1, default)).ShouldNotBeNull(); + (await store.LoadAsync(2, default)).ShouldNotBeNull(); + (await store.LoadAsync(4, default)).ShouldNotBeNull(); + (await store.LoadAsync(5, default)).ShouldNotBeNull(); + + // State reflects the removal. + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)4); + } + + // Go: filestore.go snapshot — iterates all blocks for snapshot creation. + [Fact] + public async Task Snapshot_IncludesAllBlocks() + { + await using var srcStore = CreateStore("blk-snap-src", new FileStoreOptions { BlockSizeBytes = 256 }); + + for (var i = 0; i < 30; i++) + await srcStore.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i}"), default); + + // Verify multiple blocks. + srcStore.BlockCount.ShouldBeGreaterThan(1); + + var snap = await srcStore.CreateSnapshotAsync(default); + snap.Length.ShouldBeGreaterThan(0); + + // Restore into a new store. + await using var dstStore = CreateStore("blk-snap-dst"); + await dstStore.RestoreSnapshotAsync(snap, default); + + var srcState = await srcStore.GetStateAsync(default); + var dstState = await dstStore.GetStateAsync(default); + dstState.Messages.ShouldBe(srcState.Messages); + dstState.FirstSeq.ShouldBe(srcState.FirstSeq); + dstState.LastSeq.ShouldBe(srcState.LastSeq); + + // Verify each message round-trips. + for (ulong i = 1; i <= srcState.Messages; i++) + { + var original = await srcStore.LoadAsync(i, default); + var copy = await dstStore.LoadAsync(i, default); + copy.ShouldNotBeNull(); + copy!.Subject.ShouldBe(original!.Subject); + copy.Payload.ToArray().ShouldBe(original.Payload.ToArray()); + } + } + + // Go: filestore.go S2 compression — payload is compressed before block write. 
+ [Fact] + public async Task Compression_RoundTrip() + { + var subDir = "blk-compress"; + + await using var store = CreateStore(subDir, new FileStoreOptions + { + Compression = StoreCompression.S2Compression, + }); + + var payload = "Hello, S2 compressed block storage!"u8.ToArray(); + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", payload, default); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)10); + + // Verify all messages are readable with correct payload. + for (ulong i = 1; i <= 10; i++) + { + var msg = await store.LoadAsync(i, default); + msg.ShouldNotBeNull(); + msg!.Payload.ToArray().ShouldBe(payload); + } + } + + // Go: filestore.go AEAD encryption — payload is encrypted before block write. + [Fact] + public async Task Encryption_RoundTrip() + { + var subDir = "blk-encrypt"; + var key = "nats-v2-test-key-exactly-32-bytes"u8[..32].ToArray(); + + await using var store = CreateStore(subDir, new FileStoreOptions + { + Cipher = StoreCipher.ChaCha, + EncryptionKey = key, + }); + + var payload = "Hello, AEAD encrypted block storage!"u8.ToArray(); + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", payload, default); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)10); + + // Verify all messages are readable with correct payload. + for (ulong i = 1; i <= 10; i++) + { + var msg = await store.LoadAsync(i, default); + msg.ShouldNotBeNull(); + msg!.Payload.ToArray().ShouldBe(payload); + } + } +}