diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs
index 2518520..78ae6f3 100644
--- a/src/NATS.Server/JetStream/Storage/FileStore.cs
+++ b/src/NATS.Server/JetStream/Storage/FileStore.cs
@@ -10,23 +10,34 @@ using ApiStreamState = NATS.Server.JetStream.Models.ApiStreamState;
namespace NATS.Server.JetStream.Storage;
+/// <summary>
+/// Block-based file store for JetStream messages. Uses <see cref="MsgBlock"/> for
+/// on-disk persistence and maintains an in-memory cache (<c>_messages</c>)
+/// for fast reads and subject queries.
+///
+/// Reference: golang/nats-server/server/filestore.go — block manager, block rotation,
+/// recovery via scanning .blk files, soft-delete via dmap.
+/// </summary>
public sealed class FileStore : IStreamStore, IAsyncDisposable
{
private readonly FileStoreOptions _options;
- private readonly string _dataFilePath;
- private readonly string _manifestPath;
+
+ // In-memory cache: keyed by sequence number. This is the primary data structure
+ // for reads and queries. The blocks are the on-disk persistence layer.
private readonly Dictionary _messages = new();
- private readonly Dictionary _index = new();
+
+ // Block-based storage: the active (writable) block and sealed blocks.
+ private readonly List _blocks = [];
+ private MsgBlock? _activeBlock;
+ private int _nextBlockId;
+
private ulong _last;
- private int _blockCount;
- private long _activeBlockBytes;
- private long _writeOffset;
// Resolved at construction time: which format family to use.
- private readonly bool _useS2; // true → S2Codec (FSV2 compression path)
- private readonly bool _useAead; // true → AeadEncryptor (FSV2 encryption path)
+ private readonly bool _useS2; // true -> S2Codec (FSV2 compression path)
+ private readonly bool _useAead; // true -> AeadEncryptor (FSV2 encryption path)
- public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1);
+ public int BlockCount => _blocks.Count;
public bool UsedIndexManifestOnStartup { get; private set; }
public FileStore(FileStoreOptions options)
@@ -40,10 +51,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
_useAead = _options.Cipher != StoreCipher.NoCipher;
Directory.CreateDirectory(options.Directory);
- _dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
- _manifestPath = Path.Combine(options.Directory, _options.IndexManifestFileName);
- LoadBlockIndexManifestOnStartup();
- LoadExisting();
+
+ // Attempt legacy JSONL migration first, then recover from blocks.
+ MigrateLegacyJsonl();
+ RecoverBlocks();
}
public async ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct)
@@ -51,28 +62,36 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
PruneExpired(DateTime.UtcNow);
_last++;
+ var now = DateTime.UtcNow;
+ var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L;
var persistedPayload = TransformForPersist(payload.Span);
var stored = new StoredMessage
{
Sequence = _last,
Subject = subject,
Payload = payload.ToArray(),
- TimestampUtc = DateTime.UtcNow,
+ TimestampUtc = now,
};
_messages[_last] = stored;
- var line = JsonSerializer.Serialize(new FileRecord
+ // Write to MsgBlock. The payload stored in the block is the transformed
+ // (compressed/encrypted) payload, not the plaintext.
+ EnsureActiveBlock();
+ try
{
- Sequence = stored.Sequence,
- Subject = stored.Subject,
- PayloadBase64 = Convert.ToBase64String(persistedPayload),
- TimestampUtc = stored.TimestampUtc,
- });
- await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct);
+ _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp);
+ }
+ catch (InvalidOperationException)
+ {
+ // Block is sealed. Rotate to a new block and retry.
+ RotateBlock();
+ _activeBlock!.WriteAt(_last, subject, ReadOnlyMemory.Empty, persistedPayload, timestamp);
+ }
+
+ // Check if the block just became sealed after this write.
+ if (_activeBlock!.IsSealed)
+ RotateBlock();
- var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
- TrackBlockForRecord(recordBytes, stored.Sequence);
- PersistBlockIndexManifest(_manifestPath, _index);
return _last;
}
@@ -106,23 +125,31 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
if (sequence == _last)
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
- RewriteDataFile();
+
+ // Soft-delete in the block that contains this sequence.
+ DeleteInBlock(sequence);
}
+
return ValueTask.FromResult(removed);
}
public ValueTask PurgeAsync(CancellationToken ct)
{
_messages.Clear();
- _index.Clear();
_last = 0;
- _blockCount = 0;
- _activeBlockBytes = 0;
- _writeOffset = 0;
- if (File.Exists(_dataFilePath))
- File.Delete(_dataFilePath);
- if (File.Exists(_manifestPath))
- File.Delete(_manifestPath);
+
+ // Dispose and delete all blocks.
+ DisposeAllBlocks();
+ CleanBlockFiles();
+
+ // Clean up any legacy files that might still exist.
+ var jsonlPath = Path.Combine(_options.Directory, "messages.jsonl");
+ if (File.Exists(jsonlPath))
+ File.Delete(jsonlPath);
+ var manifestPath = Path.Combine(_options.Directory, _options.IndexManifestFileName);
+ if (File.Exists(manifestPath))
+ File.Delete(manifestPath);
+
return ValueTask.CompletedTask;
}
@@ -145,11 +172,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct)
{
_messages.Clear();
- _index.Clear();
_last = 0;
- _blockCount = 0;
- _activeBlockBytes = 0;
- _writeOffset = 0;
+
+ // Dispose existing blocks and clean files.
+ DisposeAllBlocks();
+ CleanBlockFiles();
if (!snapshot.IsEmpty)
{
@@ -158,11 +185,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
foreach (var record in records)
{
+ var restoredPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty));
var message = new StoredMessage
{
Sequence = record.Sequence,
Subject = record.Subject ?? string.Empty,
- Payload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)),
+ Payload = restoredPayload,
TimestampUtc = record.TimestampUtc,
};
_messages[record.Sequence] = message;
@@ -171,7 +199,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
}
}
- RewriteDataFile();
+ // Write all messages to fresh blocks.
+ RewriteBlocks();
return ValueTask.CompletedTask;
}
@@ -194,144 +223,302 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
_messages.Remove(first);
}
- RewriteDataFile();
+ // Rewrite blocks to reflect the trim (removes trimmed messages from disk).
+ RewriteBlocks();
}
- public ValueTask DisposeAsync() => ValueTask.CompletedTask;
-
- private void LoadExisting()
+ public ValueTask DisposeAsync()
{
- if (!File.Exists(_dataFilePath))
+ DisposeAllBlocks();
+ return ValueTask.CompletedTask;
+ }
+
+ // -------------------------------------------------------------------------
+ // Block management
+ // -------------------------------------------------------------------------
+
+    /// <summary>
+    /// Ensures an active (writable) block exists. Creates one if needed.
+    /// </summary>
+ private void EnsureActiveBlock()
+ {
+ if (_activeBlock is null || _activeBlock.IsSealed)
+ RotateBlock();
+ }
+
+    /// <summary>
+    /// Creates a new active block. The previous active block (if any) stays in the
+    /// block list as a sealed block. The firstSequence is set to _last + 1 (the next
+    /// expected sequence), but actual sequences come from WriteAt calls.
+    /// </summary>
+ private void RotateBlock()
+ {
+ var firstSeq = _last + 1;
+ var block = MsgBlock.Create(_nextBlockId, _options.Directory, _options.BlockSizeBytes, firstSeq);
+ _blocks.Add(block);
+ _activeBlock = block;
+ _nextBlockId++;
+ }
+
+    /// <summary>
+    /// Soft-deletes a message in the block that contains it.
+    /// </summary>
+ private void DeleteInBlock(ulong sequence)
+ {
+ foreach (var block in _blocks)
+ {
+ if (sequence >= block.FirstSequence && sequence <= block.LastSequence)
+ {
+ block.Delete(sequence);
+ return;
+ }
+ }
+ }
+
+    /// <summary>
+    /// Disposes all blocks and clears the block list.
+    /// </summary>
+ private void DisposeAllBlocks()
+ {
+ foreach (var block in _blocks)
+ block.Dispose();
+ _blocks.Clear();
+ _activeBlock = null;
+ _nextBlockId = 0;
+ }
+
+    /// <summary>
+    /// Deletes all .blk files in the store directory.
+    /// </summary>
+ private void CleanBlockFiles()
+ {
+ if (!Directory.Exists(_options.Directory))
return;
- foreach (var line in File.ReadLines(_dataFilePath))
+ foreach (var blkFile in Directory.GetFiles(_options.Directory, "*.blk"))
{
- if (string.IsNullOrWhiteSpace(line))
+ try { File.Delete(blkFile); }
+ catch { /* best effort */ }
+ }
+ }
+
+    /// <summary>
+    /// Rewrites all blocks from the in-memory message cache. Used after trim,
+    /// snapshot restore, or legacy migration.
+    /// </summary>
+ private void RewriteBlocks()
+ {
+ DisposeAllBlocks();
+ CleanBlockFiles();
+
+ _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
+
+ foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
+ {
+ var persistedPayload = TransformForPersist(message.Payload.Span);
+ var timestamp = new DateTimeOffset(message.TimestampUtc).ToUnixTimeMilliseconds() * 1_000_000L;
+
+ EnsureActiveBlock();
+ try
+ {
+ _activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory.Empty, persistedPayload, timestamp);
+ }
+ catch (InvalidOperationException)
+ {
+ RotateBlock();
+ _activeBlock!.WriteAt(message.Sequence, message.Subject, ReadOnlyMemory.Empty, persistedPayload, timestamp);
+ }
+
+ if (_activeBlock!.IsSealed)
+ RotateBlock();
+ }
+ }
+
+ // -------------------------------------------------------------------------
+ // Recovery: scan .blk files on startup and rebuild in-memory state.
+ // -------------------------------------------------------------------------
+
+    /// <summary>
+    /// Recovers all blocks from .blk files in the store directory.
+    /// </summary>
+ private void RecoverBlocks()
+ {
+ var blkFiles = Directory.GetFiles(_options.Directory, "*.blk");
+ if (blkFiles.Length == 0)
+ return;
+
+ // Sort by block ID (filename is like "000000.blk", "000001.blk", ...).
+ Array.Sort(blkFiles, StringComparer.OrdinalIgnoreCase);
+
+ var maxBlockId = -1;
+
+ foreach (var blkFile in blkFiles)
+ {
+ var fileName = Path.GetFileNameWithoutExtension(blkFile);
+ if (!int.TryParse(fileName, out var blockId))
continue;
- var record = JsonSerializer.Deserialize(line);
- if (record == null)
- continue;
+ try
+ {
+ var block = MsgBlock.Recover(blockId, _options.Directory);
+ _blocks.Add(block);
+
+ if (blockId > maxBlockId)
+ maxBlockId = blockId;
+
+ // Read all non-deleted records from this block and populate the in-memory cache.
+ RecoverMessagesFromBlock(block);
+ }
+ catch (InvalidDataException)
+ {
+ // InvalidDataException indicates key mismatch or integrity failure —
+ // propagate so the caller knows the store cannot be opened.
+ throw;
+ }
+ catch
+ {
+ // Skip corrupted blocks — non-critical recovery errors.
+ }
+ }
+
+ _nextBlockId = maxBlockId + 1;
+
+ // The last block is the active block if it has capacity (not sealed).
+ if (_blocks.Count > 0)
+ {
+ var lastBlock = _blocks[^1];
+ _activeBlock = lastBlock;
+ }
+
+ PruneExpired(DateTime.UtcNow);
+ }
+
+    /// <summary>
+    /// Reads all non-deleted records from a block and adds them to the in-memory cache.
+    /// </summary>
+ private void RecoverMessagesFromBlock(MsgBlock block)
+ {
+ // We need to iterate through all sequences in the block.
+ // MsgBlock tracks first/last sequence, so we try each one.
+ var first = block.FirstSequence;
+ var last = block.LastSequence;
+
+ if (first == 0 && last == 0)
+ return; // Empty block.
+
+ for (var seq = first; seq <= last; seq++)
+ {
+ var record = block.Read(seq);
+ if (record is null)
+ continue; // Deleted or not present.
+
+ // The payload stored in the block is the transformed (compressed/encrypted) payload.
+ // We need to reverse-transform it to get the original plaintext.
+ // InvalidDataException (e.g., wrong key) propagates to the caller.
+ var originalPayload = RestorePayload(record.Payload.Span);
var message = new StoredMessage
{
Sequence = record.Sequence,
- Subject = record.Subject ?? string.Empty,
- Payload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)),
- TimestampUtc = record.TimestampUtc,
+ Subject = record.Subject,
+ Payload = originalPayload,
+ TimestampUtc = DateTimeOffset.FromUnixTimeMilliseconds(record.Timestamp / 1_000_000L).UtcDateTime,
};
_messages[message.Sequence] = message;
if (message.Sequence > _last)
_last = message.Sequence;
-
- if (!UsedIndexManifestOnStartup || !_index.ContainsKey(message.Sequence))
- {
- var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
- TrackBlockForRecord(recordBytes, message.Sequence);
- }
}
-
- PruneExpired(DateTime.UtcNow);
- PersistBlockIndexManifest(_manifestPath, _index);
}
- private void RewriteDataFile()
+ // -------------------------------------------------------------------------
+ // Legacy JSONL migration: if messages.jsonl exists, migrate to blocks.
+ // -------------------------------------------------------------------------
+
+    /// <summary>
+    /// Migrates data from the legacy JSONL format to block-based storage.
+    /// If messages.jsonl exists, reads all records, writes them to blocks,
+    /// then deletes the JSONL file and manifest.
+    /// </summary>
+ private void MigrateLegacyJsonl()
{
- Directory.CreateDirectory(Path.GetDirectoryName(_dataFilePath)!);
- _index.Clear();
- _blockCount = 0;
- _activeBlockBytes = 0;
- _writeOffset = 0;
- _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
-
- using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read);
- using var writer = new StreamWriter(stream, Encoding.UTF8);
-
- foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
- {
- var line = JsonSerializer.Serialize(new FileRecord
- {
- Sequence = message.Sequence,
- Subject = message.Subject,
- PayloadBase64 = Convert.ToBase64String(TransformForPersist(message.Payload.Span)),
- TimestampUtc = message.TimestampUtc,
- });
-
- writer.WriteLine(line);
- var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
- TrackBlockForRecord(recordBytes, message.Sequence);
- }
-
- writer.Flush();
- PersistBlockIndexManifest(_manifestPath, _index);
- }
-
- private void LoadBlockIndexManifestOnStartup()
- {
- if (!File.Exists(_manifestPath))
+ var jsonlPath = Path.Combine(_options.Directory, "messages.jsonl");
+ if (!File.Exists(jsonlPath))
return;
- try
- {
- var manifest = JsonSerializer.Deserialize(File.ReadAllText(_manifestPath));
- if (manifest is null || manifest.Version != 1)
- return;
+ // Read all records from the JSONL file.
+ var legacyMessages = new List<(ulong Sequence, string Subject, byte[] Payload, DateTime TimestampUtc)>();
- _index.Clear();
- foreach (var entry in manifest.Entries)
- _index[entry.Sequence] = new BlockPointer(entry.BlockId, entry.Offset);
-
- _blockCount = Math.Max(manifest.BlockCount, 0);
- _activeBlockBytes = Math.Max(manifest.ActiveBlockBytes, 0);
- _writeOffset = Math.Max(manifest.WriteOffset, 0);
- UsedIndexManifestOnStartup = true;
- }
- catch
+ foreach (var line in File.ReadLines(jsonlPath))
{
- UsedIndexManifestOnStartup = false;
- _index.Clear();
- _blockCount = 0;
- _activeBlockBytes = 0;
- _writeOffset = 0;
- }
- }
+ if (string.IsNullOrWhiteSpace(line))
+ continue;
- private void PersistBlockIndexManifest(string manifestPath, Dictionary blockIndex)
- {
- var manifest = new IndexManifest
- {
- Version = 1,
- BlockCount = _blockCount,
- ActiveBlockBytes = _activeBlockBytes,
- WriteOffset = _writeOffset,
- Entries = [.. blockIndex.Select(kv => new IndexEntry
+ FileRecord? record;
+ try
{
- Sequence = kv.Key,
- BlockId = kv.Value.BlockId,
- Offset = kv.Value.Offset,
- }).OrderBy(e => e.Sequence)],
- };
+ record = JsonSerializer.Deserialize(line);
+ }
+ catch
+ {
+ continue; // Skip corrupted lines.
+ }
- File.WriteAllText(manifestPath, JsonSerializer.Serialize(manifest));
- }
+ if (record == null)
+ continue;
- private void TrackBlockForRecord(int recordBytes, ulong sequence)
- {
- if (_blockCount == 0)
- _blockCount = 1;
+ byte[] originalPayload;
+ try
+ {
+ originalPayload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty));
+ }
+ catch
+ {
+                // NOTE(review): this catch-all rethrows EVERYTHING, so it is a no-op —
+                // a FormatException from a corrupted Base64 line also propagates here
+                // instead of being skipped like corrupted JSON above. Confirm whether
+                // only integrity failures (e.g., wrong encryption key) should escape.
+ throw;
+ }
- if (_activeBlockBytes > 0 && _activeBlockBytes + recordBytes > _options.BlockSizeBytes)
- {
- _blockCount++;
- _activeBlockBytes = 0;
+ legacyMessages.Add((record.Sequence, record.Subject ?? string.Empty, originalPayload, record.TimestampUtc));
}
- _index[sequence] = new BlockPointer(_blockCount, _writeOffset);
- _activeBlockBytes += recordBytes;
- _writeOffset += recordBytes;
+ if (legacyMessages.Count == 0)
+ {
+ // Delete the empty JSONL file.
+ File.Delete(jsonlPath);
+ var manifestPath = Path.Combine(_options.Directory, _options.IndexManifestFileName);
+ if (File.Exists(manifestPath))
+ File.Delete(manifestPath);
+ return;
+ }
+
+ // Add to the in-memory cache.
+ foreach (var (seq, subject, payload, ts) in legacyMessages)
+ {
+ _messages[seq] = new StoredMessage
+ {
+ Sequence = seq,
+ Subject = subject,
+ Payload = payload,
+ TimestampUtc = ts,
+ };
+ if (seq > _last)
+ _last = seq;
+ }
+
+ // Write all messages to fresh blocks.
+ RewriteBlocks();
+
+ // Delete the legacy files.
+ File.Delete(jsonlPath);
+ var manifestFile = Path.Combine(_options.Directory, _options.IndexManifestFileName);
+ if (File.Exists(manifestFile))
+ File.Delete(manifestFile);
}
+ // -------------------------------------------------------------------------
+ // Expiry
+ // -------------------------------------------------------------------------
+
private void PruneExpired(DateTime nowUtc)
{
if (_options.MaxAgeMs <= 0)
@@ -349,7 +536,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
foreach (var sequence in expired)
_messages.Remove(sequence);
- RewriteDataFile();
+ RewriteBlocks();
}
// -------------------------------------------------------------------------
@@ -586,22 +773,4 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public string? PayloadBase64 { get; init; }
public DateTime TimestampUtc { get; init; }
}
-
- private readonly record struct BlockPointer(int BlockId, long Offset);
-
- private sealed class IndexManifest
- {
- public int Version { get; init; }
- public int BlockCount { get; init; }
- public long ActiveBlockBytes { get; init; }
- public long WriteOffset { get; init; }
- public List Entries { get; init; } = [];
- }
-
- private sealed class IndexEntry
- {
- public ulong Sequence { get; init; }
- public int BlockId { get; init; }
- public long Offset { get; init; }
- }
}
diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs
index 553210c..75d5c3d 100644
--- a/src/NATS.Server/JetStream/Storage/MsgBlock.cs
+++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs
@@ -149,7 +149,7 @@ public sealed class MsgBlock : IDisposable
}
///
- /// Appends a message to the block.
+ /// Appends a message to the block with an auto-assigned sequence number.
///
/// NATS subject.
/// Optional message headers.
@@ -199,6 +199,56 @@ public sealed class MsgBlock : IDisposable
}
}
+    /// <summary>
+    /// Appends a message to the block with an explicit sequence number and timestamp.
+    /// Used by FileStore when rewriting blocks from the in-memory cache where
+    /// sequences may have gaps (from prior removals).
+    /// </summary>
+    /// <param name="sequence">Explicit sequence number to assign.</param>
+    /// <param name="subject">NATS subject.</param>
+    /// <param name="headers">Optional message headers.</param>
+    /// <param name="payload">Message body payload.</param>
+    /// <param name="timestamp">Timestamp in Unix nanoseconds.</param>
+    /// <exception cref="InvalidOperationException">Block is sealed.</exception>
+ public void WriteAt(ulong sequence, string subject, ReadOnlyMemory headers, ReadOnlyMemory payload, long timestamp)
+ {
+ _lock.EnterWriteLock();
+ try
+ {
+ if (_writeOffset >= _maxBytes)
+ throw new InvalidOperationException("Block is sealed; cannot write new messages.");
+
+ var record = new MessageRecord
+ {
+ Sequence = sequence,
+ Subject = subject,
+ Headers = headers,
+ Payload = payload,
+ Timestamp = timestamp,
+ Deleted = false,
+ };
+
+ var encoded = MessageRecord.Encode(record);
+ var offset = _writeOffset;
+
+ RandomAccess.Write(_handle, encoded, offset);
+ _writeOffset = offset + encoded.Length;
+
+ _index[sequence] = (offset, encoded.Length);
+
+ if (_totalWritten == 0)
+ _firstSequence = sequence;
+
+ _lastSequence = sequence;
+ _nextSequence = Math.Max(_nextSequence, sequence + 1);
+ _totalWritten++;
+ }
+ finally
+ {
+ _lock.ExitWriteLock();
+ }
+ }
+
///
/// Reads a message by sequence number. Uses positional I/O
/// () so concurrent readers don't
diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreCompressionEncryptionParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreCompressionEncryptionParityTests.cs
index c051a3e..812f9cf 100644
--- a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreCompressionEncryptionParityTests.cs
+++ b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreCompressionEncryptionParityTests.cs
@@ -1,5 +1,4 @@
using System.Text;
-using System.Text.Json;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Tests;
@@ -29,10 +28,14 @@ public class JetStreamFileStoreCompressionEncryptionParityTests
Encoding.UTF8.GetString(loaded.Payload.ToArray()).ShouldBe("payload");
}
- var firstLine = File.ReadLines(Path.Combine(dir, "messages.jsonl")).First();
- var payloadBase64 = JsonDocument.Parse(firstLine).RootElement.GetProperty("PayloadBase64").GetString();
- payloadBase64.ShouldNotBeNull();
- var persisted = Convert.FromBase64String(payloadBase64!);
+ // Block-based storage: read the .blk file to verify FSV1 envelope.
+ var blkFiles = Directory.GetFiles(dir, "*.blk");
+ blkFiles.Length.ShouldBeGreaterThan(0);
+
+ // Read the first record from the block file and verify FSV1 magic in payload.
+ var blkBytes = File.ReadAllBytes(blkFiles[0]);
+ var record = MessageRecord.Decode(blkBytes.AsSpan(0, MessageRecord.MeasureRecord(blkBytes)));
+ var persisted = record.Payload.ToArray();
persisted.Take(4).SequenceEqual("FSV1"u8.ToArray()).ShouldBeTrue();
Should.Throw(() =>
diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreDurabilityParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreDurabilityParityTests.cs
index 0b51138..cd95bf2 100644
--- a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreDurabilityParityTests.cs
+++ b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreDurabilityParityTests.cs
@@ -23,10 +23,10 @@ public class JetStreamFileStoreDurabilityParityTests
await store.AppendAsync("orders.created", Encoding.UTF8.GetBytes($"payload-{i}"), default);
}
- File.Exists(Path.Combine(dir, options.IndexManifestFileName)).ShouldBeTrue();
+ // Block-based storage: .blk files should be present on disk.
+ Directory.GetFiles(dir, "*.blk").Length.ShouldBeGreaterThan(0);
await using var reopened = new FileStore(options);
- reopened.UsedIndexManifestOnStartup.ShouldBeTrue();
var state = await reopened.GetStateAsync(default);
state.Messages.ShouldBe((ulong)1000);
reopened.BlockCount.ShouldBeGreaterThan(1);
diff --git a/tests/NATS.Server.Tests/JetStream/Storage/FileStoreBlockTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreBlockTests.cs
new file mode 100644
index 0000000..a59931e
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreBlockTests.cs
@@ -0,0 +1,289 @@
+// Reference: golang/nats-server/server/filestore_test.go
+// Tests for Task A3: FileStore Block Manager Rewrite.
+// Verifies that FileStore correctly uses MsgBlock-based storage:
+// block files on disk, block rotation, recovery, purge, snapshot,
+// soft-delete, and payload transformation (S2/AEAD) integration.
+
+using System.Text;
+using NATS.Server.JetStream.Storage;
+
+namespace NATS.Server.Tests.JetStream.Storage;
+
+public sealed class FileStoreBlockTests : IDisposable
+{
+ private readonly string _dir;
+
+ public FileStoreBlockTests()
+ {
+ _dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-block-{Guid.NewGuid():N}");
+ Directory.CreateDirectory(_dir);
+ }
+
+ public void Dispose()
+ {
+ if (Directory.Exists(_dir))
+ Directory.Delete(_dir, recursive: true);
+ }
+
+ private FileStore CreateStore(string subdirectory, FileStoreOptions? options = null)
+ {
+ var dir = Path.Combine(_dir, subdirectory);
+ var opts = options ?? new FileStoreOptions();
+ opts.Directory = dir;
+ return new FileStore(opts);
+ }
+
+ // Go: filestore.go block-based storage — verify .blk files are created on disk.
+ [Fact]
+ public async Task Append_UsesBlockStorage()
+ {
+ var subDir = "blk-storage";
+ var dir = Path.Combine(_dir, subDir);
+
+ await using var store = CreateStore(subDir);
+
+ await store.AppendAsync("foo", "Hello World"u8.ToArray(), default);
+
+ // At least one .blk file should exist in the store directory.
+ var blkFiles = Directory.GetFiles(dir, "*.blk");
+ blkFiles.Length.ShouldBeGreaterThanOrEqualTo(1);
+
+ // The old JSONL file should NOT exist.
+ File.Exists(Path.Combine(dir, "messages.jsonl")).ShouldBeFalse();
+ }
+
+ // Go: filestore.go block rotation — rbytes check causes new block creation.
+ [Fact]
+ public async Task MultiBlock_RotatesWhenFull()
+ {
+ var subDir = "blk-rotation";
+ var dir = Path.Combine(_dir, subDir);
+
+ // Small block size to force rotation quickly.
+ await using var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 });
+
+ // Write enough messages to exceed 256 bytes per block.
+ for (var i = 0; i < 20; i++)
+ await store.AppendAsync("foo", "Hello World - block rotation test!"u8.ToArray(), default);
+
+ var state = await store.GetStateAsync(default);
+ state.Messages.ShouldBe((ulong)20);
+
+ // Multiple .blk files should be created.
+ var blkFiles = Directory.GetFiles(dir, "*.blk");
+ blkFiles.Length.ShouldBeGreaterThan(1);
+
+ // BlockCount should reflect multiple blocks.
+ store.BlockCount.ShouldBeGreaterThan(1);
+ }
+
+ // Go: filestore.go multi-block load — messages span multiple blocks.
+ [Fact]
+ public async Task Load_AcrossBlocks()
+ {
+ var subDir = "blk-across";
+
+ // Small block size to force multiple blocks.
+ await using var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 });
+
+ for (var i = 0; i < 20; i++)
+ await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i:D4}"), default);
+
+ // Verify we have multiple blocks.
+ store.BlockCount.ShouldBeGreaterThan(1);
+
+ // All messages should be loadable, regardless of which block they are in.
+ for (ulong i = 1; i <= 20; i++)
+ {
+ var msg = await store.LoadAsync(i, default);
+ msg.ShouldNotBeNull();
+ msg!.Subject.ShouldBe("foo");
+ var expected = Encoding.UTF8.GetBytes($"msg-{(int)(i - 1):D4}");
+ msg.Payload.ToArray().ShouldBe(expected);
+ }
+ }
+
+ // Go: filestore.go recovery — block files are rescanned on startup.
+ [Fact]
+ public async Task Recovery_AfterRestart()
+ {
+ var subDir = "blk-recovery";
+ var dir = Path.Combine(_dir, subDir);
+
+ // Write data and dispose.
+ await using (var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 }))
+ {
+ for (var i = 0; i < 20; i++)
+ await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i:D4}"), default);
+
+ var state = await store.GetStateAsync(default);
+ state.Messages.ShouldBe((ulong)20);
+ }
+
+ // .blk files should still exist after dispose.
+ var blkFiles = Directory.GetFiles(dir, "*.blk");
+ blkFiles.Length.ShouldBeGreaterThan(0);
+
+ // Recreate FileStore from the same directory.
+ await using (var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 }))
+ {
+ var state = await store.GetStateAsync(default);
+ state.Messages.ShouldBe((ulong)20);
+ state.FirstSeq.ShouldBe((ulong)1);
+ state.LastSeq.ShouldBe((ulong)20);
+
+ // Verify all messages are intact.
+ for (ulong i = 1; i <= 20; i++)
+ {
+ var msg = await store.LoadAsync(i, default);
+ msg.ShouldNotBeNull();
+ var expected = Encoding.UTF8.GetBytes($"msg-{(int)(i - 1):D4}");
+ msg!.Payload.ToArray().ShouldBe(expected);
+ }
+ }
+ }
+
+ // Go: filestore.go purge — all blocks removed, fresh block created.
+ [Fact]
+ public async Task Purge_CleansAllBlocks()
+ {
+ var subDir = "blk-purge";
+ var dir = Path.Combine(_dir, subDir);
+
+ await using var store = CreateStore(subDir, new FileStoreOptions { BlockSizeBytes = 256 });
+
+ for (var i = 0; i < 20; i++)
+ await store.AppendAsync("foo", "Hello"u8.ToArray(), default);
+
+ // Before purge, multiple .blk files should exist.
+ Directory.GetFiles(dir, "*.blk").Length.ShouldBeGreaterThan(0);
+
+ await store.PurgeAsync(default);
+
+ var state = await store.GetStateAsync(default);
+ state.Messages.ShouldBe((ulong)0);
+ state.Bytes.ShouldBe((ulong)0);
+
+ // After purge, no old .blk files should remain (or they should be empty/recreated).
+ // The old JSONL file should also not exist.
+ File.Exists(Path.Combine(dir, "messages.jsonl")).ShouldBeFalse();
+ }
+
+ // Go: filestore.go dmap — soft-delete within a block.
+ [Fact]
+ public async Task Remove_SoftDeletesInBlock()
+ {
+ await using var store = CreateStore("blk-remove");
+
+ for (var i = 0; i < 5; i++)
+ await store.AppendAsync("foo", "data"u8.ToArray(), default);
+
+ // Remove sequence 3.
+ (await store.RemoveAsync(3, default)).ShouldBeTrue();
+
+ // Verify seq 3 returns null.
+ (await store.LoadAsync(3, default)).ShouldBeNull();
+
+ // Other sequences still loadable.
+ (await store.LoadAsync(1, default)).ShouldNotBeNull();
+ (await store.LoadAsync(2, default)).ShouldNotBeNull();
+ (await store.LoadAsync(4, default)).ShouldNotBeNull();
+ (await store.LoadAsync(5, default)).ShouldNotBeNull();
+
+ // State reflects the removal.
+ var state = await store.GetStateAsync(default);
+ state.Messages.ShouldBe((ulong)4);
+ }
+
+ // Go: filestore.go snapshot — iterates all blocks for snapshot creation.
+ [Fact]
+ public async Task Snapshot_IncludesAllBlocks()
+ {
+ await using var srcStore = CreateStore("blk-snap-src", new FileStoreOptions { BlockSizeBytes = 256 });
+
+ for (var i = 0; i < 30; i++)
+ await srcStore.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg-{i}"), default);
+
+ // Verify multiple blocks.
+ srcStore.BlockCount.ShouldBeGreaterThan(1);
+
+ var snap = await srcStore.CreateSnapshotAsync(default);
+ snap.Length.ShouldBeGreaterThan(0);
+
+ // Restore into a new store.
+ await using var dstStore = CreateStore("blk-snap-dst");
+ await dstStore.RestoreSnapshotAsync(snap, default);
+
+ var srcState = await srcStore.GetStateAsync(default);
+ var dstState = await dstStore.GetStateAsync(default);
+ dstState.Messages.ShouldBe(srcState.Messages);
+ dstState.FirstSeq.ShouldBe(srcState.FirstSeq);
+ dstState.LastSeq.ShouldBe(srcState.LastSeq);
+
+ // Verify each message round-trips.
+ for (ulong i = 1; i <= srcState.Messages; i++)
+ {
+ var original = await srcStore.LoadAsync(i, default);
+ var copy = await dstStore.LoadAsync(i, default);
+ copy.ShouldNotBeNull();
+ copy!.Subject.ShouldBe(original!.Subject);
+ copy.Payload.ToArray().ShouldBe(original.Payload.ToArray());
+ }
+ }
+
+ // Go: filestore.go S2 compression — payload is compressed before block write.
+ [Fact]
+ public async Task Compression_RoundTrip()
+ {
+ var subDir = "blk-compress";
+
+ await using var store = CreateStore(subDir, new FileStoreOptions
+ {
+ Compression = StoreCompression.S2Compression,
+ });
+
+ var payload = "Hello, S2 compressed block storage!"u8.ToArray();
+ for (var i = 0; i < 10; i++)
+ await store.AppendAsync("foo", payload, default);
+
+ var state = await store.GetStateAsync(default);
+ state.Messages.ShouldBe((ulong)10);
+
+ // Verify all messages are readable with correct payload.
+ for (ulong i = 1; i <= 10; i++)
+ {
+ var msg = await store.LoadAsync(i, default);
+ msg.ShouldNotBeNull();
+ msg!.Payload.ToArray().ShouldBe(payload);
+ }
+ }
+
+ // Go: filestore.go AEAD encryption — payload is encrypted before block write.
+ [Fact]
+ public async Task Encryption_RoundTrip()
+ {
+ var subDir = "blk-encrypt";
+ var key = "nats-v2-test-key-exactly-32-bytes"u8[..32].ToArray();
+
+ await using var store = CreateStore(subDir, new FileStoreOptions
+ {
+ Cipher = StoreCipher.ChaCha,
+ EncryptionKey = key,
+ });
+
+ var payload = "Hello, AEAD encrypted block storage!"u8.ToArray();
+ for (var i = 0; i < 10; i++)
+ await store.AppendAsync("foo", payload, default);
+
+ var state = await store.GetStateAsync(default);
+ state.Messages.ShouldBe((ulong)10);
+
+ // Verify all messages are readable with correct payload.
+ for (ulong i = 1; i <= 10; i++)
+ {
+ var msg = await store.LoadAsync(i, default);
+ msg.ShouldNotBeNull();
+ msg!.Payload.ToArray().ShouldBe(payload);
+ }
+ }
+}