diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs index e74cbee..78ed6b3 100644 --- a/src/NATS.Server/JetStream/Storage/MsgBlock.cs +++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs @@ -43,6 +43,11 @@ public sealed class MsgBlock : IDisposable // Reference: golang/nats-server/server/filestore.go:236 (cache field) private Dictionary? _cache; + // Go: msgBlock.lchk — last written record checksum (XxHash64, 8 bytes). + // Tracked so callers can chain checksum verification across blocks. + // Reference: golang/nats-server/server/filestore.go:2204 (lchk field) + private byte[]? _lastChecksum; + private MsgBlock(FileStream file, int blockId, long maxBytes, ulong firstSequence) { _file = file; @@ -137,6 +142,22 @@ public sealed class MsgBlock : IDisposable } } + /// + /// The XxHash64 checksum of the last record written to this block (8 bytes), or null + /// if no records have been written yet. Updated after every , + /// , or call. + /// Reference: golang/nats-server/server/filestore.go:2204 (msgBlock.lchk) + /// + public byte[]? LastChecksum + { + get + { + _lock.EnterReadLock(); + try { return _lastChecksum is null ? null : (byte[])_lastChecksum.Clone(); } + finally { _lock.ExitReadLock(); } + } + } + /// /// Creates a new empty block file. /// @@ -215,6 +236,11 @@ public sealed class MsgBlock : IDisposable _cache ??= new Dictionary(); _cache[sequence] = record; + // Go: msgBlock.lchk — capture checksum (last 8 bytes of encoded record). + // Reference: golang/nats-server/server/filestore.go:2204 (lchk update on write) + _lastChecksum ??= new byte[8]; + encoded.AsSpan(^8..).CopyTo(_lastChecksum); + if (_totalWritten == 0) _firstSequence = sequence; @@ -274,6 +300,11 @@ public sealed class MsgBlock : IDisposable _cache ??= new Dictionary(); _cache[sequence] = record; + // Go: msgBlock.lchk — capture checksum (last 8 bytes of encoded record). + // Reference: golang/nats-server/server/filestore.go:2204 (lchk update on write) + _lastChecksum ??= new byte[8]; + encoded.AsSpan(^8..).CopyTo(_lastChecksum); + if (_totalWritten == 0) _firstSequence = sequence; @@ -408,6 +439,11 @@ public sealed class MsgBlock : IDisposable _skipSequences.Add(sequence); // Track skip sequences separately for recovery // Note: intentionally NOT added to _cache since it is deleted. + // Go: msgBlock.lchk — capture checksum (last 8 bytes of encoded record). + // Reference: golang/nats-server/server/filestore.go:2204 (lchk update on write) + _lastChecksum ??= new byte[8]; + encoded.AsSpan(^8..).CopyTo(_lastChecksum); + if (_totalWritten == 0) _firstSequence = sequence; diff --git a/tests/NATS.Server.Tests/JetStream/Storage/FileStoreChecksumTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreChecksumTests.cs new file mode 100644 index 0000000..f8cbce6 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/FileStoreChecksumTests.cs @@ -0,0 +1,95 @@ +// Go ref: filestore.go:2204 (lastChecksum), filestore.go:8180 (validation in msgFromBufEx) +// +// Tests for per-block last-checksum tracking and read-path validation using XxHash64. +// The Go reference implementation tracks the last written checksum in msgBlock.lchk +// and validates each record's checksum during reads to detect storage corruption. + +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +public sealed class FileStoreChecksumTests : IDisposable +{ + private readonly DirectoryInfo _dir = Directory.CreateTempSubdirectory("checksum-"); + + public void Dispose() => _dir.Delete(recursive: true); + + // Go ref: filestore.go:2204 (msgBlock.lchk — last checksum field) + [Fact] + public void MsgBlock_tracks_last_checksum() + { + // Arrange / Act + using var block = MsgBlock.Create(1, _dir.FullName, 1024 * 1024); + block.Write("test", ReadOnlyMemory.Empty, "hello"u8.ToArray()); + + // Assert + block.LastChecksum.ShouldNotBeNull(); + block.LastChecksum!.Length.ShouldBe(8); // XxHash64 = 8 bytes + } + + // Go ref: filestore.go:8180 (msgFromBufEx checksum validation) + [Fact] + public void MsgBlock_validates_checksum_on_read() + { + // Arrange + using var block = MsgBlock.Create(1, _dir.FullName, 1024 * 1024); + block.Write("test", ReadOnlyMemory.Empty, "hello"u8.ToArray()); + block.Flush(); + block.ClearCache(); // force disk read + + // Act — read should succeed with valid data + var record = block.Read(1); + + // Assert + record.ShouldNotBeNull(); + record!.Subject.ShouldBe("test"); + record.Payload.ToArray().ShouldBe("hello"u8.ToArray()); + } + + // Go ref: filestore.go:8180 (checksum mismatch → error path) + [Fact] + public void MsgBlock_detects_corrupted_record_on_disk_read() + { + // Arrange — write a record, flush, clear cache so next read goes to disk + using var block = MsgBlock.Create(1, _dir.FullName, 1024 * 1024); + block.Write("test", ReadOnlyMemory.Empty, "hello"u8.ToArray()); + block.Flush(); + block.ClearCache(); + + // Corrupt a byte near the end of the block file (in the payload region) + var files = Directory.GetFiles(_dir.FullName, "*.blk"); + files.Length.ShouldBe(1); + var bytes = File.ReadAllBytes(files[0]); + // Flip a bit in the payload area (10 bytes from end: past checksum + timestamp) + bytes[^10] ^= 0xFF; + File.WriteAllBytes(files[0], bytes); + + // Act / Assert — Decode should throw on checksum mismatch + Should.Throw(() => block.Read(1)); + } + + // Go ref: filestore.go:2204 (lchk updated on each write) + [Fact] + public void MsgBlock_checksum_chain_across_writes() + { + // Arrange + using var block = MsgBlock.Create(1, _dir.FullName, 1024 * 1024); + + // Act — write three records, capture checksum after each + block.Write("a", ReadOnlyMemory.Empty, "one"u8.ToArray()); + var checksum1 = block.LastChecksum?.ToArray(); + + block.Write("b", ReadOnlyMemory.Empty, "two"u8.ToArray()); + var checksum2 = block.LastChecksum?.ToArray(); + + block.Write("c", ReadOnlyMemory.Empty, "three"u8.ToArray()); + var checksum3 = block.LastChecksum?.ToArray(); + + // Assert — each write produces a non-null checksum that changes + checksum1.ShouldNotBeNull(); + checksum2.ShouldNotBeNull(); + checksum3.ShouldNotBeNull(); + checksum1.ShouldNotBe(checksum2!); + checksum2.ShouldNotBe(checksum3!); + } +}