diff --git a/Directory.Packages.props b/Directory.Packages.props
index 1949ded..1dcd8b5 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -26,6 +26,9 @@
+
+
+
diff --git a/src/NATS.Server/JetStream/Storage/MessageRecord.cs b/src/NATS.Server/JetStream/Storage/MessageRecord.cs
new file mode 100644
index 0000000..0f9b2f5
--- /dev/null
+++ b/src/NATS.Server/JetStream/Storage/MessageRecord.cs
@@ -0,0 +1,262 @@
+// Reference: golang/nats-server/server/filestore.go
+// Go wire format: filestore.go:6720-6724 (writeMsgRecordLocked)
+// Go decode: filestore.go:8180-8250 (msgFromBufEx)
+// Go size calc: filestore.go:8770-8777 (fileStoreMsgSizeRaw)
+// Go constants: filestore.go:1034-1038 (msgHdrSize, checksumSize, emptyRecordLen)
+// Go bit flags: filestore.go:7972-7982 (ebit = 1 << 63)
+//
+// Binary message record format:
+// [1:flags][varint:subj_len][N:subject][varint:hdr_len][M:headers][varint:payload_len][P:payload][8:sequence_LE][8:timestamp_LE][8:checksum_LE]
+//
+// Flags byte: 0x80 = deleted (ebit in Go).
+// Varint encoding: high-bit continuation (same as protobuf).
+// Checksum: XxHash64 over all bytes before the checksum field.
+
+using System.Buffers.Binary;
+using System.IO.Hashing;
+using System.Text;
+
+namespace NATS.Server.JetStream.Storage;
+
+/// <summary>
+/// Binary message record encoder/decoder matching Go's filestore.go wire format.
+/// Each record represents a single stored message in a JetStream file store block.
+/// </summary>
+public sealed class MessageRecord
+{
+    /// <summary>Stream sequence number. Go: StoreMsg.seq</summary>
+    public ulong Sequence { get; init; }
+
+    /// <summary>NATS subject. Go: StoreMsg.subj</summary>
+    public string Subject { get; init; } = string.Empty;
+
+    /// <summary>Optional NATS message headers. Go: StoreMsg.hdr</summary>
+    public ReadOnlyMemory<byte> Headers { get; init; }
+
+    /// <summary>Message body payload. Go: StoreMsg.msg</summary>
+    public ReadOnlyMemory<byte> Payload { get; init; }
+
+    /// <summary>Wall-clock timestamp in Unix nanoseconds. Go: StoreMsg.ts</summary>
+    public long Timestamp { get; init; }
+
+    /// <summary>Whether this record is a deletion marker. Go: ebit (1 &lt;&lt; 63) on sequence.</summary>
+    public bool Deleted { get; init; }
+
+    // Wire format constants
+    private const byte DeletedFlag = 0x80;
+    private const int FlagsSize = 1;
+    private const int ChecksumSize = 8;
+    private const int SequenceSize = 8;
+    private const int TimestampSize = 8;
+
+    // Trailer: sequence(8) + timestamp(8) + checksum(8)
+    private const int TrailerSize = SequenceSize + TimestampSize + ChecksumSize;
+
+    // Smallest valid record: flags(1) + three one-byte zero-length varints + trailer
+    private const int MinRecordSize = FlagsSize + 3 + TrailerSize;
+
+    /// <summary>
+    /// Encodes a <see cref="MessageRecord"/> to its binary wire format.
+    /// </summary>
+    /// <param name="record">The record to encode.</param>
+    /// <returns>The encoded byte array.</returns>
+    /// <exception cref="ArgumentNullException">Thrown when <paramref name="record"/> is null.</exception>
+    public static byte[] Encode(MessageRecord record)
+    {
+        ArgumentNullException.ThrowIfNull(record);
+
+        var subjectBytes = Encoding.UTF8.GetBytes(record.Subject);
+        var headersSpan = record.Headers.Span;
+        var payloadSpan = record.Payload.Span;
+
+        // flags(1) + three length-prefixed fields + sequence(8) + timestamp(8) + checksum(8)
+        var size = FlagsSize
+            + LengthPrefixedSize(subjectBytes.Length)
+            + LengthPrefixedSize(headersSpan.Length)
+            + LengthPrefixedSize(payloadSpan.Length)
+            + TrailerSize;
+
+        var buffer = new byte[size];
+        var offset = 0;
+
+        // 1. Flags byte
+        buffer[offset++] = record.Deleted ? DeletedFlag : (byte)0;
+
+        // 2-4. Subject, headers and payload, each written as varint length + raw bytes
+        offset += WriteLengthPrefixed(buffer.AsSpan(offset), subjectBytes);
+        offset += WriteLengthPrefixed(buffer.AsSpan(offset), headersSpan);
+        offset += WriteLengthPrefixed(buffer.AsSpan(offset), payloadSpan);
+
+        // 5. Sequence (8 bytes, little-endian)
+        BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(offset), record.Sequence);
+        offset += SequenceSize;
+
+        // 6. Timestamp (8 bytes, little-endian)
+        BinaryPrimitives.WriteInt64LittleEndian(buffer.AsSpan(offset), record.Timestamp);
+        offset += TimestampSize;
+
+        // 7. Checksum: XxHash64 over everything before the checksum field
+        var checksum = XxHash64.HashToUInt64(buffer.AsSpan(0, offset));
+        BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(offset), checksum);
+
+        return buffer;
+    }
+
+    /// <summary>
+    /// Decodes a binary record and validates its checksum.
+    /// </summary>
+    /// <param name="data">The raw record bytes.</param>
+    /// <returns>The decoded <see cref="MessageRecord"/>.</returns>
+    /// <exception cref="InvalidDataException">
+    /// Thrown when the record is too short, the checksum does not match, or a field
+    /// length does not fit inside the record.
+    /// </exception>
+    public static MessageRecord Decode(ReadOnlySpan<byte> data)
+    {
+        if (data.Length < MinRecordSize)
+            throw new InvalidDataException("Record too short.");
+
+        // Validate checksum first: XxHash64 over everything except the last 8 bytes.
+        var body = data[..^ChecksumSize];
+        var expectedChecksum = BinaryPrimitives.ReadUInt64LittleEndian(data[^ChecksumSize..]);
+        var actualChecksum = XxHash64.HashToUInt64(body);
+
+        if (expectedChecksum != actualChecksum)
+            throw new InvalidDataException("Checksum mismatch: record is corrupt.");
+
+        // Fields are parsed from the checksummed region only, so a bad length can never
+        // make a field or the trailer overlap the checksum bytes.
+        var offset = 0;
+
+        // 1. Flags
+        var deleted = (body[offset++] & DeletedFlag) != 0;
+
+        // 2. Subject
+        var subject = Encoding.UTF8.GetString(ReadLengthPrefixed(body, ref offset, "subject"));
+
+        // 3. Headers
+        var headers = ReadLengthPrefixed(body, ref offset, "headers").ToArray();
+
+        // 4. Payload
+        var payload = ReadLengthPrefixed(body, ref offset, "payload").ToArray();
+
+        // 5-6. Sequence and timestamp (8 bytes each, little-endian)
+        if (body.Length - offset < SequenceSize + TimestampSize)
+            throw new InvalidDataException("Record too short.");
+
+        var sequence = BinaryPrimitives.ReadUInt64LittleEndian(body[offset..]);
+        offset += SequenceSize;
+        var timestamp = BinaryPrimitives.ReadInt64LittleEndian(body[offset..]);
+
+        return new MessageRecord
+        {
+            Sequence = sequence,
+            Subject = subject,
+            Headers = headers,
+            Payload = payload,
+            Timestamp = timestamp,
+            Deleted = deleted,
+        };
+    }
+
+    /// <summary>
+    /// Writes a varint (protobuf-style high-bit continuation encoding) to the target span.
+    /// </summary>
+    /// <param name="buffer">The target buffer; a 64-bit value needs at most 10 bytes.</param>
+    /// <param name="value">The value to encode.</param>
+    /// <returns>The number of bytes written.</returns>
+    public static int WriteVarint(Span<byte> buffer, ulong value)
+    {
+        var i = 0;
+        while (value >= 0x80)
+        {
+            // Low 7 bits plus the continuation bit; the byte cast drops the higher bits.
+            buffer[i++] = (byte)(value | 0x80);
+            value >>= 7;
+        }
+
+        buffer[i++] = (byte)value;
+        return i;
+    }
+
+    /// <summary>
+    /// Reads a varint (protobuf-style high-bit continuation encoding) from the source span.
+    /// </summary>
+    /// <param name="data">The source buffer.</param>
+    /// <returns>A tuple of (decoded value, number of bytes consumed).</returns>
+    /// <exception cref="InvalidDataException">
+    /// Thrown when the varint is truncated or does not fit in 64 bits.
+    /// </exception>
+    public static (ulong Value, int BytesRead) ReadVarint(ReadOnlySpan<byte> data)
+    {
+        ulong result = 0;
+        var shift = 0;
+
+        for (var i = 0; i < data.Length; i++)
+        {
+            var b = data[i];
+
+            // The tenth byte sits at shift 63: only its lowest bit fits in 64 bits and it
+            // must end the varint, otherwise bits would be silently discarded.
+            if (shift == 63 && b > 1)
+                throw new InvalidDataException("Varint is too long.");
+
+            result |= (ulong)(b & 0x7F) << shift;
+
+            if ((b & 0x80) == 0)
+                return (result, i + 1);
+
+            shift += 7;
+        }
+
+        throw new InvalidDataException("Varint is truncated.");
+    }
+
+    /// <summary>
+    /// Returns the number of bytes needed to encode a varint.
+    /// </summary>
+    private static int VarintSize(ulong value)
+    {
+        var size = 1;
+        while (value >= 0x80)
+        {
+            size++;
+            value >>= 7;
+        }
+
+        return size;
+    }
+
+    /// <summary>
+    /// Returns the encoded size of a field: its varint length prefix plus its bytes.
+    /// </summary>
+    private static int LengthPrefixedSize(int length) => VarintSize((ulong)length) + length;
+
+    /// <summary>
+    /// Writes a field as varint length + raw bytes and returns the number of bytes written.
+    /// </summary>
+    private static int WriteLengthPrefixed(Span<byte> destination, ReadOnlySpan<byte> field)
+    {
+        var prefixBytes = WriteVarint(destination, (ulong)field.Length);
+        field.CopyTo(destination[prefixBytes..]);
+        return prefixBytes + field.Length;
+    }
+
+    /// <summary>
+    /// Reads a varint length + raw bytes field at <paramref name="offset"/> and advances the
+    /// offset past it, rejecting lengths that run beyond <paramref name="body"/>.
+    /// </summary>
+    private static ReadOnlySpan<byte> ReadLengthPrefixed(ReadOnlySpan<byte> body, ref int offset, string fieldName)
+    {
+        var (length, prefixBytes) = ReadVarint(body[offset..]);
+        offset += prefixBytes;
+
+        // Compare as ulong so an oversized length cannot wrap when narrowed to int.
+        if (length > (ulong)(body.Length - offset))
+            throw new InvalidDataException($"Record {fieldName} length exceeds the record size.");
+
+        var field = body.Slice(offset, (int)length);
+        offset += (int)length;
+        return field;
+    }
+}
diff --git a/src/NATS.Server/NATS.Server.csproj b/src/NATS.Server/NATS.Server.csproj
index 11bedd2..8b5edc5 100644
--- a/src/NATS.Server/NATS.Server.csproj
+++ b/src/NATS.Server/NATS.Server.csproj
@@ -5,6 +5,7 @@
+
diff --git a/tests/NATS.Server.Tests/JetStream/Storage/MessageRecordTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/MessageRecordTests.cs
new file mode 100644
index 0000000..3c0798f
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Storage/MessageRecordTests.cs
@@ -0,0 +1,235 @@
+// Reference: golang/nats-server/server/filestore.go
+// Go wire format: filestore.go:6720-6724 (writeMsgRecordLocked)
+// Go decode: filestore.go:8180-8250 (msgFromBufEx)
+// Go size calc: filestore.go:8770-8777 (fileStoreMsgSizeRaw)
+//
+// These tests verify the .NET MessageRecord binary encoder/decoder that matches
+// the Go message block record format:
+// [1:flags][varint:subj_len][N:subject][varint:hdr_len][M:headers][varint:payload_len][P:payload][8:sequence_LE][8:timestamp_LE][8:checksum_LE]
+
+using System.Buffers.Binary;
+using System.IO.Hashing;
+using System.Text;
+using NATS.Server.JetStream.Storage;
+
+namespace NATS.Server.Tests.JetStream.Storage;
+
+public sealed class MessageRecordTests
+{
+    // Go: writeMsgRecordLocked / msgFromBufEx — basic round-trip
+    [Fact]
+    public void RoundTrip_SimpleMessage()
+    {
+        var record = new MessageRecord
+        {
+            Sequence = 42,
+            Subject = "foo.bar",
+            Headers = ReadOnlyMemory<byte>.Empty,
+            Payload = Encoding.UTF8.GetBytes("hello world"),
+            Timestamp = 1_700_000_000_000_000_000L,
+            Deleted = false,
+        };
+
+        var encoded = MessageRecord.Encode(record);
+        var decoded = MessageRecord.Decode(encoded);
+
+        decoded.Sequence.ShouldBe(record.Sequence);
+        decoded.Subject.ShouldBe(record.Subject);
+        decoded.Headers.Length.ShouldBe(0);
+        decoded.Payload.ToArray().ShouldBe(Encoding.UTF8.GetBytes("hello world"));
+        decoded.Timestamp.ShouldBe(record.Timestamp);
+        decoded.Deleted.ShouldBe(false);
+    }
+
+    // Headers round-trip: the varint hdr_len and the header bytes are present in the record.
+    [Fact]
+    public void RoundTrip_WithHeaders()
+    {
+        var headers = "NATS/1.0\r\nX-Test: value\r\n\r\n"u8.ToArray();
+        var record = new MessageRecord
+        {
+            Sequence = 99,
+            Subject = "test.headers",
+            Headers = headers,
+            Payload = Encoding.UTF8.GetBytes("payload with headers"),
+            Timestamp = 1_700_000_000_000_000_000L,
+            Deleted = false,
+        };
+
+        var encoded = MessageRecord.Encode(record);
+        var decoded = MessageRecord.Decode(encoded);
+
+        decoded.Sequence.ShouldBe(99UL);
+        decoded.Subject.ShouldBe("test.headers");
+        decoded.Headers.ToArray().ShouldBe(headers);
+        decoded.Payload.ToArray().ShouldBe(Encoding.UTF8.GetBytes("payload with headers"));
+        decoded.Timestamp.ShouldBe(record.Timestamp);
+    }
+
+    // Verify that the last 8 bytes of the encoded record contain a nonzero XxHash64 checksum.
+    [Fact]
+    public void Encode_SetsChecksumInTrailer()
+    {
+        var record = new MessageRecord
+        {
+            Sequence = 1,
+            Subject = "checksum.test",
+            Headers = ReadOnlyMemory<byte>.Empty,
+            Payload = Encoding.UTF8.GetBytes("data"),
+            Timestamp = 0,
+            Deleted = false,
+        };
+
+        var encoded = MessageRecord.Encode(record);
+
+        // Last 8 bytes are the little-endian checksum over everything that precedes them.
+        var checksum = BinaryPrimitives.ReadUInt64LittleEndian(encoded.AsSpan(encoded.Length - 8));
+        checksum.ShouldNotBe(0UL);
+        checksum.ShouldBe(XxHash64.HashToUInt64(encoded.AsSpan(0, encoded.Length - 8)));
+    }
+
+    // Flip a byte in the encoded data and verify decode throws InvalidDataException.
+    [Fact]
+    public void Decode_DetectsCorruptChecksum()
+    {
+        var record = new MessageRecord
+        {
+            Sequence = 7,
+            Subject = "corrupt",
+            Headers = ReadOnlyMemory<byte>.Empty,
+            Payload = Encoding.UTF8.GetBytes("will be corrupted"),
+            Timestamp = 0,
+            Deleted = false,
+        };
+
+        var encoded = MessageRecord.Encode(record);
+
+        // Flip a byte in the payload area (not the checksum itself).
+        var corrupted = encoded.ToArray();
+        corrupted[corrupted.Length / 2] ^= 0xFF;
+
+        Should.Throw<InvalidDataException>(() => MessageRecord.Decode(corrupted));
+    }
+
+    // A record shorter than the smallest possible encoding must be rejected up front.
+    [Fact]
+    public void Decode_RejectsTooShortRecord()
+    {
+        var tooShort = new byte[27];
+
+        Should.Throw<InvalidDataException>(() => MessageRecord.Decode(tooShort));
+    }
+
+    // A field length that runs past the record must surface as InvalidDataException,
+    // even when the checksum itself is valid.
+    [Fact]
+    public void Decode_RejectsFieldLengthBeyondRecord()
+    {
+        // flags(1) + subj_len(1) + 18 zero bytes, followed by a valid checksum(8).
+        var forged = new byte[28];
+        forged[1] = 0x7F; // claims a 127-byte subject in a 28-byte record
+        var checksum = XxHash64.HashToUInt64(forged.AsSpan(0, 20));
+        BinaryPrimitives.WriteUInt64LittleEndian(forged.AsSpan(20), checksum);
+
+        Should.Throw<InvalidDataException>(() => MessageRecord.Decode(forged));
+    }
+
+    // Go: varint encoding matches protobuf convention — high-bit continuation.
+    [Theory]
+    [InlineData(0UL)]
+    [InlineData(1UL)]
+    [InlineData(127UL)]
+    [InlineData(128UL)]
+    [InlineData(16383UL)]
+    [InlineData(16384UL)]
+    [InlineData(ulong.MaxValue)]
+    public void Varint_RoundTrip(ulong value)
+    {
+        Span<byte> buf = stackalloc byte[10];
+        var written = MessageRecord.WriteVarint(buf, value);
+        written.ShouldBeGreaterThan(0);
+
+        var (decoded, bytesRead) = MessageRecord.ReadVarint(buf);
+        decoded.ShouldBe(value);
+        bytesRead.ShouldBe(written);
+    }
+
+    // A tenth byte carrying more than one bit would overflow 64 bits and must be rejected.
+    [Fact]
+    public void ReadVarint_RejectsOverflow()
+    {
+        var overflowing = new byte[] { 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x02 };
+
+        Should.Throw<InvalidDataException>(() => MessageRecord.ReadVarint(overflowing));
+    }
+
+    // Go marks deleted messages with ebit (1 << 63) on the sequence; this format uses flag 0x80.
+    [Fact]
+    public void RoundTrip_DeletedFlag()
+    {
+        var record = new MessageRecord
+        {
+            Sequence = 100,
+            Subject = "deleted.msg",
+            Headers = ReadOnlyMemory<byte>.Empty,
+            Payload = ReadOnlyMemory<byte>.Empty,
+            Timestamp = 0,
+            Deleted = true,
+        };
+
+        var encoded = MessageRecord.Encode(record);
+        var decoded = MessageRecord.Decode(encoded);
+
+        decoded.Deleted.ShouldBe(true);
+        decoded.Sequence.ShouldBe(100UL);
+        decoded.Subject.ShouldBe("deleted.msg");
+    }
+
+    // Edge case: empty payload should encode and decode cleanly.
+    [Fact]
+    public void RoundTrip_EmptyPayload()
+    {
+        var record = new MessageRecord
+        {
+            Sequence = 1,
+            Subject = "empty",
+            Headers = ReadOnlyMemory<byte>.Empty,
+            Payload = ReadOnlyMemory<byte>.Empty,
+            Timestamp = 0,
+            Deleted = false,
+        };
+
+        var encoded = MessageRecord.Encode(record);
+        var decoded = MessageRecord.Decode(encoded);
+
+        decoded.Subject.ShouldBe("empty");
+        decoded.Payload.Length.ShouldBe(0);
+        decoded.Headers.Length.ShouldBe(0);
+    }
+
+    // Verify 64KB payload works (large payload stress test).
+    [Fact]
+    public void RoundTrip_LargePayload()
+    {
+        var payload = new byte[64 * 1024];
+        Random.Shared.NextBytes(payload);
+
+        var record = new MessageRecord
+        {
+            Sequence = 999_999,
+            Subject = "large.payload.test",
+            Headers = ReadOnlyMemory<byte>.Empty,
+            Payload = payload,
+            Timestamp = long.MaxValue,
+            Deleted = false,
+        };
+
+        var encoded = MessageRecord.Encode(record);
+        var decoded = MessageRecord.Decode(encoded);
+
+        decoded.Sequence.ShouldBe(999_999UL);
+        decoded.Subject.ShouldBe("large.payload.test");
+        decoded.Payload.ToArray().ShouldBe(payload);
+        decoded.Timestamp.ShouldBe(long.MaxValue);
+    }
+}