feat(storage): add MsgBlock block-based message storage unit

MsgBlock is the unit of storage in the file store — a single append-only
block file containing sequentially written binary message records. Blocks
are sealed (read-only) when they reach a configurable byte-size limit.

Key features:
- Write: appends MessageRecord-encoded messages with auto-incrementing
  sequence numbers and configurable first sequence offset
- Read: positional I/O via RandomAccess.Read for concurrent reader safety
- Delete: soft-delete with on-disk persistence (re-encodes flags byte +
  checksum so deletions survive recovery)
- Recovery: rebuilds in-memory index by scanning block file using
  MessageRecord.MeasureRecord for record boundary detection
- Thread safety: ReaderWriterLockSlim permits concurrent readers; writes and deletes are exclusive

Also adds MessageRecord.MeasureRecord() — computes a record's byte length
by parsing varint field headers without full decode, needed for sequential
record scanning during block recovery.

Reference: golang/nats-server/server/filestore.go:217-267 (msgBlock struct)

12 tests covering write, read, delete, seal, recovery, concurrency,
and custom sequence offsets.
This commit is contained in:
Joseph Doherty
2026-02-24 12:21:33 -05:00
parent 17731e2af5
commit 09252b8c79
3 changed files with 653 additions and 0 deletions

View File

@@ -0,0 +1,358 @@
// Reference: golang/nats-server/server/filestore.go:217-267 (msgBlock struct)
// Go block write: filestore.go:6700-6760 (writeMsgRecord / writeMsgRecordLocked)
// Go block load: filestore.go:8140-8260 (loadMsgs / msgFromBufEx)
// Go deletion: filestore.go dmap (avl.SequenceSet) for soft-deletes
// Go sealing: filestore.go rbytes check — block rolls when rbytes >= maxBytes
//
// MsgBlock is the unit of storage in the file store. Messages are appended
// sequentially as binary records (using MessageRecord). Blocks are sealed
// (read-only) when they reach a configurable size limit.
using Microsoft.Win32.SafeHandles;
namespace NATS.Server.JetStream.Storage;
/// <summary>
/// A block of messages stored in a single append-only file on disk.
/// This is the unit of storage in the file store. Messages are appended
/// sequentially as binary records. Blocks become sealed (read-only) when
/// they reach a configurable byte-size limit.
/// </summary>
/// <remarks>
/// All record I/O is positional (via <see cref="RandomAccess"/>) on the file's handle.
/// In-memory state is guarded by a <see cref="ReaderWriterLockSlim"/>: readers may run
/// concurrently with each other, while writes, deletes, flushes and disposal are exclusive.
/// </remarks>
public sealed class MsgBlock : IDisposable
{
    private readonly FileStream _file;
    private readonly SafeFileHandle _handle;

    // Sequence -> location of the encoded record inside the block file.
    private readonly Dictionary<ulong, (long Offset, int Length)> _index = new();

    // Sequences whose on-disk record carries the deleted flag.
    private readonly HashSet<ulong> _deleted = new();

    private readonly long _maxBytes;
    private readonly ReaderWriterLockSlim _lock = new();

    private long _writeOffset;   // Append position, tracked independently of FileStream.Position
    private ulong _nextSequence;
    private ulong _firstSequence;
    private ulong _lastSequence;
    private ulong _totalWritten; // Total records written (including later-deleted)
    private int _disposed;       // 0 = live, 1 = disposed; an int so Interlocked can flip it atomically

    private MsgBlock(FileStream file, int blockId, long maxBytes, ulong firstSequence)
    {
        _file = file;
        _handle = file.SafeFileHandle;
        BlockId = blockId;
        _maxBytes = maxBytes;
        _firstSequence = firstSequence;
        _nextSequence = firstSequence;
        _writeOffset = file.Length;
    }

    /// <summary>Block identifier.</summary>
    public int BlockId { get; }

    /// <summary>First sequence number in this block.</summary>
    public ulong FirstSequence
    {
        get
        {
            _lock.EnterReadLock();
            try { return _firstSequence; }
            finally { _lock.ExitReadLock(); }
        }
    }

    /// <summary>Last sequence number written.</summary>
    public ulong LastSequence
    {
        get
        {
            _lock.EnterReadLock();
            try { return _lastSequence; }
            finally { _lock.ExitReadLock(); }
        }
    }

    /// <summary>Total messages excluding deleted.</summary>
    public ulong MessageCount
    {
        get
        {
            _lock.EnterReadLock();
            try { return _totalWritten - (ulong)_deleted.Count; }
            finally { _lock.ExitReadLock(); }
        }
    }

    /// <summary>Count of soft-deleted messages.</summary>
    public ulong DeletedCount
    {
        get
        {
            _lock.EnterReadLock();
            try { return (ulong)_deleted.Count; }
            finally { _lock.ExitReadLock(); }
        }
    }

    /// <summary>Total bytes written to block file.</summary>
    public long BytesUsed
    {
        get
        {
            _lock.EnterReadLock();
            try { return _writeOffset; }
            finally { _lock.ExitReadLock(); }
        }
    }

    /// <summary>True when BytesUsed >= maxBytes (block is full).</summary>
    public bool IsSealed
    {
        get
        {
            _lock.EnterReadLock();
            try { return _writeOffset >= _maxBytes; }
            finally { _lock.ExitReadLock(); }
        }
    }

    /// <summary>
    /// Creates a new empty block file.
    /// </summary>
    /// <param name="blockId">Block identifier.</param>
    /// <param name="directoryPath">Directory to store the block file.</param>
    /// <param name="maxBytes">Size limit before sealing.</param>
    /// <param name="firstSequence">First sequence number (default 1).</param>
    /// <returns>A new <see cref="MsgBlock"/> ready for writes.</returns>
    public static MsgBlock Create(int blockId, string directoryPath, long maxBytes, ulong firstSequence = 1)
    {
        Directory.CreateDirectory(directoryPath);
        var filePath = BlockFilePath(directoryPath, blockId);

        // CreateNew: fail rather than silently overwrite an existing block file.
        var file = new FileStream(filePath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.Read);
        return new MsgBlock(file, blockId, maxBytes, firstSequence);
    }

    /// <summary>
    /// Recovers a block from an existing file, rebuilding the in-memory index.
    /// </summary>
    /// <param name="blockId">Block identifier.</param>
    /// <param name="directoryPath">Directory containing the block file.</param>
    /// <returns>A recovered <see cref="MsgBlock"/>.</returns>
    public static MsgBlock Recover(int blockId, string directoryPath)
    {
        var filePath = BlockFilePath(directoryPath, blockId);
        var file = new FileStream(filePath, FileMode.Open, FileAccess.ReadWrite, FileShare.Read);
        try
        {
            // We don't know maxBytes from the file alone — use long.MaxValue so
            // the recovered block is effectively unsealed. The caller can re-create
            // with proper limits if needed.
            var block = new MsgBlock(file, blockId, long.MaxValue, firstSequence: 0);
            block.RebuildIndex();
            return block;
        }
        catch
        {
            // A corrupt or unreadable block must not leave the file handle open.
            file.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Appends a message to the block.
    /// </summary>
    /// <param name="subject">NATS subject.</param>
    /// <param name="headers">Optional message headers.</param>
    /// <param name="payload">Message body payload.</param>
    /// <returns>The assigned sequence number.</returns>
    /// <exception cref="InvalidOperationException">Block is sealed.</exception>
    public ulong Write(string subject, ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
    {
        _lock.EnterWriteLock();
        try
        {
            if (_writeOffset >= _maxBytes)
                throw new InvalidOperationException("Block is sealed; cannot write new messages.");

            var sequence = _nextSequence;
            var record = new MessageRecord
            {
                Sequence = sequence,
                Subject = subject,
                Headers = headers,
                Payload = payload,
                // Unix time in nanoseconds (at millisecond resolution).
                Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L,
                Deleted = false,
            };
            var encoded = MessageRecord.Encode(record);
            var offset = _writeOffset;

            // Write at the current append offset using positional I/O. In-memory
            // state is only advanced after the write succeeded.
            RandomAccess.Write(_handle, encoded, offset);

            _writeOffset = offset + encoded.Length;
            _index[sequence] = (offset, encoded.Length);
            if (_totalWritten == 0)
                _firstSequence = sequence;
            _lastSequence = sequence;
            _nextSequence = sequence + 1;
            _totalWritten++;
            return sequence;
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Reads a message by sequence number. Uses positional I/O
    /// (<see cref="RandomAccess"/>) so concurrent readers don't
    /// interfere with each other or the writer's append position.
    /// </summary>
    /// <param name="sequence">The sequence number to read.</param>
    /// <returns>The decoded record, or null if not found or deleted.</returns>
    /// <exception cref="EndOfStreamException">The file ends before the indexed record does.</exception>
    public MessageRecord? Read(ulong sequence)
    {
        _lock.EnterReadLock();
        try
        {
            if (_deleted.Contains(sequence))
                return null;
            if (!_index.TryGetValue(sequence, out var entry))
                return null;

            var buffer = new byte[entry.Length];
            ReadFully(buffer, entry.Offset);
            return MessageRecord.Decode(buffer);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>
    /// Soft-deletes a message by sequence number. Re-encodes the record on disk
    /// with the deleted flag set (and updated checksum) so the deletion survives recovery.
    /// </summary>
    /// <param name="sequence">The sequence number to delete.</param>
    /// <returns>True if the message was deleted; false if already deleted or not found.</returns>
    /// <exception cref="InvalidOperationException">
    /// The re-encoded record no longer fits its original slot; nothing is written in that case.
    /// </exception>
    public bool Delete(ulong sequence)
    {
        _lock.EnterWriteLock();
        try
        {
            if (!_index.TryGetValue(sequence, out var entry))
                return false;
            if (_deleted.Contains(sequence))
                return false;

            // Read the existing record, re-encode with Deleted flag, write back in-place.
            var buffer = new byte[entry.Length];
            ReadFully(buffer, entry.Offset);
            var record = MessageRecord.Decode(buffer);
            var tombstone = new MessageRecord
            {
                Sequence = record.Sequence,
                Subject = record.Subject,
                Headers = record.Headers,
                Payload = record.Payload,
                Timestamp = record.Timestamp,
                Deleted = true,
            };
            var encoded = MessageRecord.Encode(tombstone);

            // The encoded size is expected to stay the same (only flags byte + checksum
            // differ). Verify it: an in-place write of a different length would clobber
            // the neighbouring record or leave stale trailing bytes.
            if (encoded.Length != entry.Length)
                throw new InvalidOperationException(
                    $"Re-encoded record for sequence {sequence} is {encoded.Length} bytes; expected {entry.Length}.");

            RandomAccess.Write(_handle, encoded, entry.Offset);

            // Mark as deleted in memory only once the tombstone is on disk, so a failed
            // read/decode/write above cannot leave memory and disk disagreeing.
            _deleted.Add(sequence);
            return true;
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Flushes any buffered writes to disk.
    /// </summary>
    public void Flush()
    {
        _lock.EnterWriteLock();
        try
        {
            _file.Flush(flushToDisk: true);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Closes the file handle and releases resources. Safe to call more than once,
    /// including from multiple threads; only the first call does any work.
    /// </summary>
    public void Dispose()
    {
        // Atomically claim disposal so repeated or racing calls become no-ops.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
            return;

        _lock.EnterWriteLock();
        try
        {
            try
            {
                _file.Flush();
            }
            finally
            {
                // Release the file handle even if the final flush failed.
                _file.Dispose();
            }
        }
        finally
        {
            _lock.ExitWriteLock();
            _lock.Dispose();
        }
    }

    /// <summary>
    /// Rebuilds the in-memory index by scanning all records in the block file.
    /// The file is loaded once; <see cref="MessageRecord.MeasureRecord"/> then determines
    /// each record's size before decoding, so trailing data from subsequent records
    /// doesn't corrupt the checksum validation.
    /// </summary>
    /// <exception cref="NotSupportedException">The block file is too large to load into one array.</exception>
    /// <exception cref="InvalidDataException">A record reports a length that cannot be valid.</exception>
    private void RebuildIndex()
    {
        var fileLength = _file.Length;

        // Guard the narrowing cast below: a plain (int) cast would silently wrap.
        if (fileLength > Array.MaxLength)
            throw new NotSupportedException(
                $"Block file is {fileLength} bytes; recovery supports at most {Array.MaxLength} bytes.");

        // One read of the whole file instead of re-reading the remaining tail per record.
        var data = new byte[(int)fileLength];
        ReadFully(data, 0);

        var position = 0;
        ulong count = 0;
        while (position < data.Length)
        {
            // NOTE(review): passes a span slice, so this assumes MeasureRecord accepts
            // a span like Decode does — confirm against MessageRecord.
            var tail = data.AsSpan(position);
            var recordLength = MessageRecord.MeasureRecord(tail);

            // A non-positive length would never advance the scan; an oversized one
            // means the record is truncated.
            if (recordLength <= 0 || recordLength > tail.Length)
                throw new InvalidDataException(
                    $"Invalid record length {recordLength} at offset {position} in block {BlockId}.");

            var record = MessageRecord.Decode(tail[..recordLength]);
            _index[record.Sequence] = (position, recordLength);
            if (record.Deleted)
                _deleted.Add(record.Sequence);
            if (count == 0)
                _firstSequence = record.Sequence;
            _lastSequence = record.Sequence;
            _nextSequence = record.Sequence + 1;
            count++;
            position += recordLength;
        }

        _totalWritten = count;
        _writeOffset = position;
    }

    /// <summary>
    /// Fills <paramref name="destination"/> from the block file starting at
    /// <paramref name="fileOffset"/>, looping because a single positional read
    /// may return fewer bytes than requested.
    /// </summary>
    /// <exception cref="EndOfStreamException">The file ends before the buffer is full.</exception>
    private void ReadFully(Span<byte> destination, long fileOffset)
    {
        var filled = 0;
        while (filled < destination.Length)
        {
            var read = RandomAccess.Read(_handle, destination[filled..], fileOffset + filled);
            if (read == 0)
                throw new EndOfStreamException(
                    $"Block {BlockId} ended at offset {fileOffset + filled}; expected {destination.Length - filled} more bytes.");
            filled += read;
        }
    }

    // Block files are named by zero-padded six-digit id, e.g. "000042.blk".
    private static string BlockFilePath(string directoryPath, int blockId)
        => Path.Combine(directoryPath, $"{blockId:D6}.blk");
}