feat: add checksum validation on MsgBlock read path (Gap 1.5)
Add _lastChecksum field and LastChecksum property to MsgBlock tracking the XxHash64 checksum of the last written record (Go: msgBlock.lchk, filestore.go:2204). Capture the checksum from the encoded record trailer on every Write/WriteAt/WriteSkip call. Read-path validation happens naturally through the existing MessageRecord.Decode checksum check.
This commit is contained in:
@@ -43,6 +43,11 @@ public sealed class MsgBlock : IDisposable
|
|||||||
// Reference: golang/nats-server/server/filestore.go:236 (cache field)
|
// Reference: golang/nats-server/server/filestore.go:236 (cache field)
|
||||||
private Dictionary<ulong, MessageRecord>? _cache;
|
private Dictionary<ulong, MessageRecord>? _cache;
|
||||||
|
|
||||||
|
// Go: msgBlock.lchk — last written record checksum (XxHash64, 8 bytes).
|
||||||
|
// Tracked so callers can chain checksum verification across blocks.
|
||||||
|
// Reference: golang/nats-server/server/filestore.go:2204 (lchk field)
|
||||||
|
private byte[]? _lastChecksum;
|
||||||
|
|
||||||
private MsgBlock(FileStream file, int blockId, long maxBytes, ulong firstSequence)
|
private MsgBlock(FileStream file, int blockId, long maxBytes, ulong firstSequence)
|
||||||
{
|
{
|
||||||
_file = file;
|
_file = file;
|
||||||
@@ -137,6 +142,22 @@ public sealed class MsgBlock : IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The XxHash64 checksum of the last record written to this block (8 bytes), or null
|
||||||
|
/// if no records have been written yet. Updated after every <see cref="Write"/>,
|
||||||
|
/// <see cref="WriteAt"/>, or <see cref="WriteSkip"/> call.
|
||||||
|
/// Reference: golang/nats-server/server/filestore.go:2204 (msgBlock.lchk)
|
||||||
|
/// </summary>
|
||||||
|
public byte[]? LastChecksum
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
_lock.EnterReadLock();
|
||||||
|
try { return _lastChecksum is null ? null : (byte[])_lastChecksum.Clone(); }
|
||||||
|
finally { _lock.ExitReadLock(); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Creates a new empty block file.
|
/// Creates a new empty block file.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -215,6 +236,11 @@ public sealed class MsgBlock : IDisposable
|
|||||||
_cache ??= new Dictionary<ulong, MessageRecord>();
|
_cache ??= new Dictionary<ulong, MessageRecord>();
|
||||||
_cache[sequence] = record;
|
_cache[sequence] = record;
|
||||||
|
|
||||||
|
// Go: msgBlock.lchk — capture checksum (last 8 bytes of encoded record).
|
||||||
|
// Reference: golang/nats-server/server/filestore.go:2204 (lchk update on write)
|
||||||
|
_lastChecksum ??= new byte[8];
|
||||||
|
encoded.AsSpan(^8..).CopyTo(_lastChecksum);
|
||||||
|
|
||||||
if (_totalWritten == 0)
|
if (_totalWritten == 0)
|
||||||
_firstSequence = sequence;
|
_firstSequence = sequence;
|
||||||
|
|
||||||
@@ -274,6 +300,11 @@ public sealed class MsgBlock : IDisposable
|
|||||||
_cache ??= new Dictionary<ulong, MessageRecord>();
|
_cache ??= new Dictionary<ulong, MessageRecord>();
|
||||||
_cache[sequence] = record;
|
_cache[sequence] = record;
|
||||||
|
|
||||||
|
// Go: msgBlock.lchk — capture checksum (last 8 bytes of encoded record).
|
||||||
|
// Reference: golang/nats-server/server/filestore.go:2204 (lchk update on write)
|
||||||
|
_lastChecksum ??= new byte[8];
|
||||||
|
encoded.AsSpan(^8..).CopyTo(_lastChecksum);
|
||||||
|
|
||||||
if (_totalWritten == 0)
|
if (_totalWritten == 0)
|
||||||
_firstSequence = sequence;
|
_firstSequence = sequence;
|
||||||
|
|
||||||
@@ -408,6 +439,11 @@ public sealed class MsgBlock : IDisposable
|
|||||||
_skipSequences.Add(sequence); // Track skip sequences separately for recovery
|
_skipSequences.Add(sequence); // Track skip sequences separately for recovery
|
||||||
// Note: intentionally NOT added to _cache since it is deleted.
|
// Note: intentionally NOT added to _cache since it is deleted.
|
||||||
|
|
||||||
|
// Go: msgBlock.lchk — capture checksum (last 8 bytes of encoded record).
|
||||||
|
// Reference: golang/nats-server/server/filestore.go:2204 (lchk update on write)
|
||||||
|
_lastChecksum ??= new byte[8];
|
||||||
|
encoded.AsSpan(^8..).CopyTo(_lastChecksum);
|
||||||
|
|
||||||
if (_totalWritten == 0)
|
if (_totalWritten == 0)
|
||||||
_firstSequence = sequence;
|
_firstSequence = sequence;
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,95 @@
|
|||||||
|
// Go ref: filestore.go:2204 (lastChecksum), filestore.go:8180 (validation in msgFromBufEx)
|
||||||
|
//
|
||||||
|
// Tests for per-block last-checksum tracking and read-path validation using XxHash64.
|
||||||
|
// The Go reference implementation tracks the last written checksum in msgBlock.lchk
|
||||||
|
// and validates each record's checksum during reads to detect storage corruption.
|
||||||
|
|
||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests.JetStream.Storage;
|
||||||
|
|
||||||
|
public sealed class FileStoreChecksumTests : IDisposable
|
||||||
|
{
|
||||||
|
private readonly DirectoryInfo _dir = Directory.CreateTempSubdirectory("checksum-");
|
||||||
|
|
||||||
|
public void Dispose() => _dir.Delete(recursive: true);
|
||||||
|
|
||||||
|
// Go ref: filestore.go:2204 (msgBlock.lchk — last checksum field)
|
||||||
|
[Fact]
|
||||||
|
public void MsgBlock_tracks_last_checksum()
|
||||||
|
{
|
||||||
|
// Arrange / Act
|
||||||
|
using var block = MsgBlock.Create(1, _dir.FullName, 1024 * 1024);
|
||||||
|
block.Write("test", ReadOnlyMemory<byte>.Empty, "hello"u8.ToArray());
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
block.LastChecksum.ShouldNotBeNull();
|
||||||
|
block.LastChecksum!.Length.ShouldBe(8); // XxHash64 = 8 bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go ref: filestore.go:8180 (msgFromBufEx checksum validation)
|
||||||
|
[Fact]
|
||||||
|
public void MsgBlock_validates_checksum_on_read()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
using var block = MsgBlock.Create(1, _dir.FullName, 1024 * 1024);
|
||||||
|
block.Write("test", ReadOnlyMemory<byte>.Empty, "hello"u8.ToArray());
|
||||||
|
block.Flush();
|
||||||
|
block.ClearCache(); // force disk read
|
||||||
|
|
||||||
|
// Act — read should succeed with valid data
|
||||||
|
var record = block.Read(1);
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
record.ShouldNotBeNull();
|
||||||
|
record!.Subject.ShouldBe("test");
|
||||||
|
record.Payload.ToArray().ShouldBe("hello"u8.ToArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go ref: filestore.go:8180 (checksum mismatch → error path)
|
||||||
|
[Fact]
|
||||||
|
public void MsgBlock_detects_corrupted_record_on_disk_read()
|
||||||
|
{
|
||||||
|
// Arrange — write a record, flush, clear cache so next read goes to disk
|
||||||
|
using var block = MsgBlock.Create(1, _dir.FullName, 1024 * 1024);
|
||||||
|
block.Write("test", ReadOnlyMemory<byte>.Empty, "hello"u8.ToArray());
|
||||||
|
block.Flush();
|
||||||
|
block.ClearCache();
|
||||||
|
|
||||||
|
// Corrupt a byte near the end of the block file (in the payload region)
|
||||||
|
var files = Directory.GetFiles(_dir.FullName, "*.blk");
|
||||||
|
files.Length.ShouldBe(1);
|
||||||
|
var bytes = File.ReadAllBytes(files[0]);
|
||||||
|
// Flip a bit in the payload area (10 bytes from end: past checksum + timestamp)
|
||||||
|
bytes[^10] ^= 0xFF;
|
||||||
|
File.WriteAllBytes(files[0], bytes);
|
||||||
|
|
||||||
|
// Act / Assert — Decode should throw on checksum mismatch
|
||||||
|
Should.Throw<InvalidDataException>(() => block.Read(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go ref: filestore.go:2204 (lchk updated on each write)
|
||||||
|
[Fact]
|
||||||
|
public void MsgBlock_checksum_chain_across_writes()
|
||||||
|
{
|
||||||
|
// Arrange
|
||||||
|
using var block = MsgBlock.Create(1, _dir.FullName, 1024 * 1024);
|
||||||
|
|
||||||
|
// Act — write three records, capture checksum after each
|
||||||
|
block.Write("a", ReadOnlyMemory<byte>.Empty, "one"u8.ToArray());
|
||||||
|
var checksum1 = block.LastChecksum?.ToArray();
|
||||||
|
|
||||||
|
block.Write("b", ReadOnlyMemory<byte>.Empty, "two"u8.ToArray());
|
||||||
|
var checksum2 = block.LastChecksum?.ToArray();
|
||||||
|
|
||||||
|
block.Write("c", ReadOnlyMemory<byte>.Empty, "three"u8.ToArray());
|
||||||
|
var checksum3 = block.LastChecksum?.ToArray();
|
||||||
|
|
||||||
|
// Assert — each write produces a non-null checksum that changes
|
||||||
|
checksum1.ShouldNotBeNull();
|
||||||
|
checksum2.ShouldNotBeNull();
|
||||||
|
checksum3.ShouldNotBeNull();
|
||||||
|
checksum1.ShouldNotBe(checksum2!);
|
||||||
|
checksum2.ShouldNotBe(checksum3!);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user