diff --git a/src/NATS.Server/JetStream/Storage/MessageRecord.cs b/src/NATS.Server/JetStream/Storage/MessageRecord.cs index 0f9b2f5..4d1c912 100644 --- a/src/NATS.Server/JetStream/Storage/MessageRecord.cs +++ b/src/NATS.Server/JetStream/Storage/MessageRecord.cs @@ -215,6 +215,38 @@ public sealed class MessageRecord throw new InvalidDataException("Varint is truncated."); } + /// + /// Measures the total byte length of the first record in a buffer without fully decoding it. + /// This parses the varint-encoded field lengths to compute the record size. + /// + /// Buffer that starts with a record (may contain trailing data). + /// The total byte length of the first record. + /// If the buffer is too short to contain a valid record header. + public static int MeasureRecord(ReadOnlySpan data) + { + if (data.Length < 1 + 3 + TrailerSize) + throw new InvalidDataException("Buffer too short to contain a record."); + + var offset = 1; // flags byte + + // Subject length + var (subjectLen, subjectLenBytes) = ReadVarint(data[offset..]); + offset += subjectLenBytes + (int)subjectLen; + + // Headers length + var (headersLen, headersLenBytes) = ReadVarint(data[offset..]); + offset += headersLenBytes + (int)headersLen; + + // Payload length + var (payloadLen, payloadLenBytes) = ReadVarint(data[offset..]); + offset += payloadLenBytes + (int)payloadLen; + + // Trailer: sequence(8) + timestamp(8) + checksum(8) + offset += TrailerSize; + + return offset; + } + /// /// Returns the number of bytes needed to encode a varint. /// diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs new file mode 100644 index 0000000..553210c --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs @@ -0,0 +1,358 @@ +// Reference: golang/nats-server/server/filestore.go:217-267 (msgBlock struct) +// Go block write: filestore.go:6700-6760 (writeMsgRecord / writeMsgRecordLocked) +// Go block load: filestore.go:8140-8260 (loadMsgs / msgFromBufEx) +// Go deletion: filestore.go dmap (avl.SequenceSet) for soft-deletes +// Go sealing: filestore.go rbytes check — block rolls when rbytes >= maxBytes +// +// MsgBlock is the unit of storage in the file store. Messages are appended +// sequentially as binary records (using MessageRecord). Blocks are sealed +// (read-only) when they reach a configurable size limit. + +using Microsoft.Win32.SafeHandles; + +namespace NATS.Server.JetStream.Storage; + +/// +/// A block of messages stored in a single append-only file on disk. +/// This is the unit of storage in the file store. Messages are appended +/// sequentially as binary records. Blocks become sealed (read-only) when +/// they reach a configurable byte-size limit. +/// +public sealed class MsgBlock : IDisposable +{ + private readonly FileStream _file; + private readonly SafeFileHandle _handle; + private readonly Dictionary _index = new(); + private readonly HashSet _deleted = new(); + private readonly long _maxBytes; + private readonly ReaderWriterLockSlim _lock = new(); + private long _writeOffset; // Tracks the append position independently of FileStream.Position + private ulong _nextSequence; + private ulong _firstSequence; + private ulong _lastSequence; + private ulong _totalWritten; // Total records written (including later-deleted) + private bool _disposed; + + private MsgBlock(FileStream file, int blockId, long maxBytes, ulong firstSequence) + { + _file = file; + _handle = file.SafeFileHandle; + BlockId = blockId; + _maxBytes = maxBytes; + _firstSequence = firstSequence; + _nextSequence = firstSequence; + _writeOffset = file.Length; + } + + /// Block identifier. + public int BlockId { get; } + + /// First sequence number in this block. + public ulong FirstSequence + { + get + { + _lock.EnterReadLock(); + try { return _firstSequence; } + finally { _lock.ExitReadLock(); } + } + } + + /// Last sequence number written. + public ulong LastSequence + { + get + { + _lock.EnterReadLock(); + try { return _lastSequence; } + finally { _lock.ExitReadLock(); } + } + } + + /// Total messages excluding deleted. + public ulong MessageCount + { + get + { + _lock.EnterReadLock(); + try { return _totalWritten - (ulong)_deleted.Count; } + finally { _lock.ExitReadLock(); } + } + } + + /// Count of soft-deleted messages. + public ulong DeletedCount + { + get + { + _lock.EnterReadLock(); + try { return (ulong)_deleted.Count; } + finally { _lock.ExitReadLock(); } + } + } + + /// Total bytes written to block file. + public long BytesUsed + { + get + { + _lock.EnterReadLock(); + try { return _writeOffset; } + finally { _lock.ExitReadLock(); } + } + } + + /// True when BytesUsed >= maxBytes (block is full). + public bool IsSealed + { + get + { + _lock.EnterReadLock(); + try { return _writeOffset >= _maxBytes; } + finally { _lock.ExitReadLock(); } + } + } + + /// + /// Creates a new empty block file. + /// + /// Block identifier. + /// Directory to store the block file. + /// Size limit before sealing. + /// First sequence number (default 1). + /// A new ready for writes. + public static MsgBlock Create(int blockId, string directoryPath, long maxBytes, ulong firstSequence = 1) + { + Directory.CreateDirectory(directoryPath); + var filePath = BlockFilePath(directoryPath, blockId); + var file = new FileStream(filePath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.Read); + return new MsgBlock(file, blockId, maxBytes, firstSequence); + } + + /// + /// Recovers a block from an existing file, rebuilding the in-memory index. + /// + /// Block identifier. + /// Directory containing the block file. + /// A recovered . + public static MsgBlock Recover(int blockId, string directoryPath) + { + var filePath = BlockFilePath(directoryPath, blockId); + var file = new FileStream(filePath, FileMode.Open, FileAccess.ReadWrite, FileShare.Read); + + // We don't know maxBytes from the file alone — use long.MaxValue so + // the recovered block is effectively unsealed. The caller can re-create + // with proper limits if needed. + var block = new MsgBlock(file, blockId, long.MaxValue, firstSequence: 0); + block.RebuildIndex(); + return block; + } + + /// + /// Appends a message to the block. + /// + /// NATS subject. + /// Optional message headers. + /// Message body payload. + /// The assigned sequence number. + /// Block is sealed. + public ulong Write(string subject, ReadOnlyMemory headers, ReadOnlyMemory payload) + { + _lock.EnterWriteLock(); + try + { + if (_writeOffset >= _maxBytes) + throw new InvalidOperationException("Block is sealed; cannot write new messages."); + + var sequence = _nextSequence; + var record = new MessageRecord + { + Sequence = sequence, + Subject = subject, + Headers = headers, + Payload = payload, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L, + Deleted = false, + }; + + var encoded = MessageRecord.Encode(record); + var offset = _writeOffset; + + // Write at the current append offset using positional I/O + RandomAccess.Write(_handle, encoded, offset); + _writeOffset = offset + encoded.Length; + + _index[sequence] = (offset, encoded.Length); + + if (_totalWritten == 0) + _firstSequence = sequence; + + _lastSequence = sequence; + _nextSequence = sequence + 1; + _totalWritten++; + + return sequence; + } + finally + { + _lock.ExitWriteLock(); + } + } + + /// + /// Reads a message by sequence number. Uses positional I/O + /// () so concurrent readers don't + /// interfere with each other or the writer's append position. + /// + /// The sequence number to read. + /// The decoded record, or null if not found or deleted. + public MessageRecord? Read(ulong sequence) + { + _lock.EnterReadLock(); + try + { + if (_deleted.Contains(sequence)) + return null; + + if (!_index.TryGetValue(sequence, out var entry)) + return null; + + var buffer = new byte[entry.Length]; + RandomAccess.Read(_handle, buffer, entry.Offset); + + return MessageRecord.Decode(buffer); + } + finally + { + _lock.ExitReadLock(); + } + } + + /// + /// Soft-deletes a message by sequence number. Re-encodes the record on disk + /// with the deleted flag set (and updated checksum) so the deletion survives recovery. + /// + /// The sequence number to delete. + /// True if the message was deleted; false if already deleted or not found. + public bool Delete(ulong sequence) + { + _lock.EnterWriteLock(); + try + { + if (!_index.TryGetValue(sequence, out var entry)) + return false; + + if (!_deleted.Add(sequence)) + return false; + + // Read the existing record, re-encode with Deleted flag, write back in-place. + // The encoded size doesn't change (only flags byte + checksum differ). + var buffer = new byte[entry.Length]; + RandomAccess.Read(_handle, buffer, entry.Offset); + var record = MessageRecord.Decode(buffer); + + var deletedRecord = new MessageRecord + { + Sequence = record.Sequence, + Subject = record.Subject, + Headers = record.Headers, + Payload = record.Payload, + Timestamp = record.Timestamp, + Deleted = true, + }; + + var encoded = MessageRecord.Encode(deletedRecord); + RandomAccess.Write(_handle, encoded, entry.Offset); + + return true; + } + finally + { + _lock.ExitWriteLock(); + } + } + + /// + /// Flushes any buffered writes to disk. + /// + public void Flush() + { + _lock.EnterWriteLock(); + try + { + _file.Flush(flushToDisk: true); + } + finally + { + _lock.ExitWriteLock(); + } + } + + /// + /// Closes the file handle and releases resources. + /// + public void Dispose() + { + if (_disposed) + return; + _disposed = true; + + _lock.EnterWriteLock(); + try + { + _file.Flush(); + _file.Dispose(); + } + finally + { + _lock.ExitWriteLock(); + } + + _lock.Dispose(); + } + + /// + /// Rebuilds the in-memory index by scanning all records in the block file. + /// Uses to determine each record's + /// size before decoding, so trailing data from subsequent records doesn't + /// corrupt the checksum validation. + /// + private void RebuildIndex() + { + var fileLength = _file.Length; + long offset = 0; + ulong count = 0; + + while (offset < fileLength) + { + // Read remaining bytes from current offset using positional I/O + var remaining = (int)(fileLength - offset); + var buffer = new byte[remaining]; + RandomAccess.Read(_handle, buffer, offset); + + // Measure the first record's length, then decode only that slice + var recordLength = MessageRecord.MeasureRecord(buffer); + var record = MessageRecord.Decode(buffer.AsSpan(0, recordLength)); + + _index[record.Sequence] = (offset, recordLength); + + if (record.Deleted) + _deleted.Add(record.Sequence); + + if (count == 0) + _firstSequence = record.Sequence; + + _lastSequence = record.Sequence; + _nextSequence = record.Sequence + 1; + count++; + + offset += recordLength; + } + + _totalWritten = count; + _writeOffset = offset; + } + + private static string BlockFilePath(string directoryPath, int blockId) + => Path.Combine(directoryPath, $"{blockId:D6}.blk"); +} diff --git a/tests/NATS.Server.Tests/JetStream/Storage/MsgBlockTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/MsgBlockTests.cs new file mode 100644 index 0000000..6780c14 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/MsgBlockTests.cs @@ -0,0 +1,263 @@ +// Reference: golang/nats-server/server/filestore.go:217-267 (msgBlock struct) +// Go block write: filestore.go:6700-6760 (writeMsgRecord) +// Go block load: filestore.go:8140-8260 (loadMsgs / msgFromBufEx) +// Go deletion: filestore.go dmap (avl.SequenceSet) for soft-deletes +// +// These tests verify the .NET MsgBlock abstraction — a block of messages stored +// in a single append-only file on disk, with in-memory index and soft-delete support. + +using System.Text; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +public sealed class MsgBlockTests : IDisposable +{ + private readonly string _testDir; + + public MsgBlockTests() + { + _testDir = Path.Combine(Path.GetTempPath(), $"msgblock_test_{Guid.NewGuid():N}"); + Directory.CreateDirectory(_testDir); + } + + public void Dispose() + { + try { Directory.Delete(_testDir, recursive: true); } + catch { /* best effort cleanup */ } + } + + // Go: writeMsgRecord — single message write returns first sequence + [Fact] + public void Write_SingleMessage_ReturnsSequence() + { + using var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024); + var seq = block.Write("foo.bar", ReadOnlyMemory.Empty, "hello"u8.ToArray()); + seq.ShouldBe(1UL); + } + + // Go: writeMsgRecord — sequential writes increment sequence + [Fact] + public void Write_MultipleMessages_IncrementsSequence() + { + using var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024); + var s1 = block.Write("a", ReadOnlyMemory.Empty, "one"u8.ToArray()); + var s2 = block.Write("b", ReadOnlyMemory.Empty, "two"u8.ToArray()); + var s3 = block.Write("c", ReadOnlyMemory.Empty, "three"u8.ToArray()); + + s1.ShouldBe(1UL); + s2.ShouldBe(2UL); + s3.ShouldBe(3UL); + block.MessageCount.ShouldBe(3UL); + } + + // Go: loadMsgs / msgFromBufEx — read back by sequence number + [Fact] + public void Read_BySequence_ReturnsMessage() + { + using var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024); + block.Write("test.subject", ReadOnlyMemory.Empty, "payload data"u8.ToArray()); + + var record = block.Read(1); + record.ShouldNotBeNull(); + record.Sequence.ShouldBe(1UL); + record.Subject.ShouldBe("test.subject"); + Encoding.UTF8.GetString(record.Payload.Span).ShouldBe("payload data"); + } + + // Go: loadMsgs — reading a non-existent sequence returns nil + [Fact] + public void Read_NonexistentSequence_ReturnsNull() + { + using var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024); + block.Write("a", ReadOnlyMemory.Empty, "data"u8.ToArray()); + + var record = block.Read(999); + record.ShouldBeNull(); + } + + // Go: filestore.go rbytes check — block seals when size exceeds maxBytes + [Fact] + public void IsSealed_ReturnsTrueWhenFull() + { + // Use a very small maxBytes so the block seals quickly. + // A single record with subject "a", empty headers, and 32-byte payload is ~61 bytes. + // Set maxBytes to 50 so one write seals the block. + using var block = MsgBlock.Create(0, _testDir, maxBytes: 50); + + var payload = new byte[32]; + block.Write("a", ReadOnlyMemory.Empty, payload); + block.IsSealed.ShouldBeTrue(); + } + + // Go: filestore.go errNoRoom — cannot write to sealed block + [Fact] + public void Write_ThrowsWhenSealed() + { + using var block = MsgBlock.Create(0, _testDir, maxBytes: 50); + block.Write("a", ReadOnlyMemory.Empty, new byte[32]); + block.IsSealed.ShouldBeTrue(); + + Should.Throw(() => + block.Write("b", ReadOnlyMemory.Empty, "more"u8.ToArray())); + } + + // Go: dmap soft-delete — deleted message reads back as null + [Fact] + public void Delete_MarksSequenceAsDeleted() + { + using var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024); + block.Write("a", ReadOnlyMemory.Empty, "data"u8.ToArray()); + + block.Delete(1).ShouldBeTrue(); + block.Read(1).ShouldBeNull(); + } + + // Go: dmap — MessageCount reflects only non-deleted messages + [Fact] + public void Delete_DecreasesMessageCount() + { + using var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024); + block.Write("a", ReadOnlyMemory.Empty, "one"u8.ToArray()); + block.Write("b", ReadOnlyMemory.Empty, "two"u8.ToArray()); + block.Write("c", ReadOnlyMemory.Empty, "three"u8.ToArray()); + + block.MessageCount.ShouldBe(3UL); + block.DeletedCount.ShouldBe(0UL); + + block.Delete(2).ShouldBeTrue(); + + block.MessageCount.ShouldBe(2UL); + block.DeletedCount.ShouldBe(1UL); + + // Double delete returns false + block.Delete(2).ShouldBeFalse(); + } + + // Go: recovery path — rebuild index from existing block file + [Fact] + public void Recover_RebuildsIndexFromFile() + { + // Write messages and dispose + using (var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024)) + { + block.Write("a.b", ReadOnlyMemory.Empty, "first"u8.ToArray()); + block.Write("c.d", ReadOnlyMemory.Empty, "second"u8.ToArray()); + block.Write("e.f", ReadOnlyMemory.Empty, "third"u8.ToArray()); + block.Flush(); + } + + // Recover and verify all messages readable + using var recovered = MsgBlock.Recover(0, _testDir); + recovered.MessageCount.ShouldBe(3UL); + recovered.FirstSequence.ShouldBe(1UL); + recovered.LastSequence.ShouldBe(3UL); + + var r1 = recovered.Read(1); + r1.ShouldNotBeNull(); + r1.Subject.ShouldBe("a.b"); + Encoding.UTF8.GetString(r1.Payload.Span).ShouldBe("first"); + + var r2 = recovered.Read(2); + r2.ShouldNotBeNull(); + r2.Subject.ShouldBe("c.d"); + + var r3 = recovered.Read(3); + r3.ShouldNotBeNull(); + r3.Subject.ShouldBe("e.f"); + } + + // Go: recovery with dmap — deleted records still show as null after recovery + [Fact] + public void Recover_PreservesDeletedState() + { + // Write messages, delete one, flush and dispose + using (var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024)) + { + block.Write("a", ReadOnlyMemory.Empty, "one"u8.ToArray()); + block.Write("b", ReadOnlyMemory.Empty, "two"u8.ToArray()); + block.Write("c", ReadOnlyMemory.Empty, "three"u8.ToArray()); + block.Delete(2); + block.Flush(); + } + + // Recover — seq 2 should still be deleted + using var recovered = MsgBlock.Recover(0, _testDir); + recovered.MessageCount.ShouldBe(2UL); + recovered.DeletedCount.ShouldBe(1UL); + + recovered.Read(1).ShouldNotBeNull(); + recovered.Read(2).ShouldBeNull(); + recovered.Read(3).ShouldNotBeNull(); + } + + // Go: sync.RWMutex on msgBlock — concurrent reads during writes should not crash + [Fact] + public async Task ConcurrentReads_DuringWrite() + { + using var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024); + + // Pre-populate some messages + for (var i = 0; i < 10; i++) + block.Write($"subj.{i}", ReadOnlyMemory.Empty, Encoding.UTF8.GetBytes($"msg-{i}")); + + // Run concurrent reads and writes + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var exceptions = new List(); + + var writerTask = Task.Run(() => + { + try + { + while (!cts.Token.IsCancellationRequested) + { + try + { + block.Write("concurrent", ReadOnlyMemory.Empty, "data"u8.ToArray()); + } + catch (InvalidOperationException) + { + // Block sealed — expected eventually + break; + } + } + } + catch (Exception ex) { lock (exceptions) { exceptions.Add(ex); } } + }); + + var readerTasks = Enumerable.Range(0, 4).Select(t => Task.Run(() => + { + try + { + while (!cts.Token.IsCancellationRequested) + { + for (ulong seq = 1; seq <= 10; seq++) + _ = block.Read(seq); + } + } + catch (Exception ex) { lock (exceptions) { exceptions.Add(ex); } } + })).ToArray(); + + await Task.WhenAll([writerTask, .. readerTasks]).WaitAsync(TimeSpan.FromSeconds(5)); + + exceptions.ShouldBeEmpty(); + } + + // Go: msgBlock first/last — custom firstSequence offsets sequence numbering + [Fact] + public void Write_WithCustomFirstSequence() + { + using var block = MsgBlock.Create(0, _testDir, maxBytes: 1024 * 1024, firstSequence: 100); + var s1 = block.Write("x", ReadOnlyMemory.Empty, "a"u8.ToArray()); + var s2 = block.Write("y", ReadOnlyMemory.Empty, "b"u8.ToArray()); + + s1.ShouldBe(100UL); + s2.ShouldBe(101UL); + block.FirstSequence.ShouldBe(100UL); + block.LastSequence.ShouldBe(101UL); + + var r = block.Read(100); + r.ShouldNotBeNull(); + r.Subject.ShouldBe("x"); + } +}