feat(storage): add binary message record encoding (Go parity)

Add MessageRecord class with Encode/Decode for the binary wire format
used in JetStream file store blocks. Uses varint-encoded lengths,
XxHash64 checksums, and a flags byte for deletion markers.

Go reference: filestore.go:6720-6724, 8180-8250, 8770-8777

13 tests covering round-trip, headers, checksum validation, corruption
detection, varint encoding, deleted flag, empty/large payloads.
This commit is contained in:
Joseph Doherty
2026-02-24 12:12:15 -05:00
parent 2a240c6355
commit 17731e2af5
4 changed files with 436 additions and 0 deletions

View File

@@ -0,0 +1,232 @@
// Reference: golang/nats-server/server/filestore.go
// Go wire format: filestore.go:6720-6724 (writeMsgRecordLocked)
// Go decode: filestore.go:8180-8250 (msgFromBufEx)
// Go size calc: filestore.go:8770-8777 (fileStoreMsgSizeRaw)
// Go constants: filestore.go:1034-1038 (msgHdrSize, checksumSize, emptyRecordLen)
// Go bit flags: filestore.go:7972-7982 (ebit = 1 << 63)
//
// Binary message record format:
// [1:flags][varint:subj_len][N:subject][varint:hdr_len][M:headers][varint:payload_len][P:payload][8:sequence_LE][8:checksum]
//
// Flags byte: 0x80 = deleted (ebit in Go).
// Varint encoding: high-bit continuation (same as protobuf).
// Checksum: XxHash64 over all bytes before the checksum field.
using System.Buffers.Binary;
using System.IO.Hashing;
using System.Text;
namespace NATS.Server.JetStream.Storage;
/// <summary>
/// Binary message record encoder/decoder matching Go's filestore.go wire format.
/// Each record represents a single stored message in a JetStream file store block.
/// </summary>
public sealed class MessageRecord
{
/// <summary>Stream sequence number. Go: StoreMsg.seq</summary>
public ulong Sequence { get; init; }
/// <summary>NATS subject. Go: StoreMsg.subj</summary>
public string Subject { get; init; } = string.Empty;
/// <summary>Optional NATS message headers. Go: StoreMsg.hdr</summary>
public ReadOnlyMemory<byte> Headers { get; init; }
/// <summary>Message body payload. Go: StoreMsg.msg</summary>
public ReadOnlyMemory<byte> Payload { get; init; }
/// <summary>Wall-clock timestamp in Unix nanoseconds. Go: StoreMsg.ts</summary>
public long Timestamp { get; init; }
/// <summary>Whether this record is a deletion marker. Go: ebit (1 &lt;&lt; 63) on sequence.</summary>
public bool Deleted { get; init; }
// Wire format constants
private const byte DeletedFlag = 0x80;
private const int ChecksumSize = 8;
private const int SequenceSize = 8;
private const int TimestampSize = 8;
// Trailer: sequence(8) + timestamp(8) + checksum(8)
private const int TrailerSize = SequenceSize + TimestampSize + ChecksumSize;
/// <summary>
/// Encodes a <see cref="MessageRecord"/> to its binary wire format.
/// </summary>
/// <returns>The encoded byte array.</returns>
public static byte[] Encode(MessageRecord record)
{
var subjectBytes = Encoding.UTF8.GetBytes(record.Subject);
var headersSpan = record.Headers.Span;
var payloadSpan = record.Payload.Span;
// Calculate total size:
// flags(1) + varint(subj_len) + subject + varint(hdr_len) + headers
// + varint(payload_len) + payload + sequence(8) + timestamp(8) + checksum(8)
var size = 1
+ VarintSize((ulong)subjectBytes.Length) + subjectBytes.Length
+ VarintSize((ulong)headersSpan.Length) + headersSpan.Length
+ VarintSize((ulong)payloadSpan.Length) + payloadSpan.Length
+ TrailerSize;
var buffer = new byte[size];
var offset = 0;
// 1. Flags byte
buffer[offset++] = record.Deleted ? DeletedFlag : (byte)0;
// 2. Subject length (varint) + subject bytes
offset += WriteVarint(buffer.AsSpan(offset), (ulong)subjectBytes.Length);
subjectBytes.CopyTo(buffer.AsSpan(offset));
offset += subjectBytes.Length;
// 3. Headers length (varint) + headers bytes
offset += WriteVarint(buffer.AsSpan(offset), (ulong)headersSpan.Length);
headersSpan.CopyTo(buffer.AsSpan(offset));
offset += headersSpan.Length;
// 4. Payload length (varint) + payload bytes
offset += WriteVarint(buffer.AsSpan(offset), (ulong)payloadSpan.Length);
payloadSpan.CopyTo(buffer.AsSpan(offset));
offset += payloadSpan.Length;
// 5. Sequence (8 bytes, little-endian)
BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(offset), record.Sequence);
offset += SequenceSize;
// 6. Timestamp (8 bytes, little-endian)
BinaryPrimitives.WriteInt64LittleEndian(buffer.AsSpan(offset), record.Timestamp);
offset += TimestampSize;
// 7. Checksum: XxHash64 over everything before the checksum field
var checksumInput = buffer.AsSpan(0, offset);
var checksum = XxHash64.HashToUInt64(checksumInput);
BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(offset), checksum);
return buffer;
}
/// <summary>
/// Decodes a binary record and validates its checksum.
/// </summary>
/// <param name="data">The raw record bytes.</param>
/// <returns>The decoded <see cref="MessageRecord"/>.</returns>
/// <exception cref="InvalidDataException">Thrown when the record is too short or the checksum does not match.</exception>
public static MessageRecord Decode(ReadOnlySpan<byte> data)
{
// Minimum: flags(1) + varint(0)(1) + varint(0)(1) + varint(0)(1) + seq(8) + ts(8) + checksum(8)
if (data.Length < 1 + 3 + TrailerSize)
throw new InvalidDataException("Record too short.");
// Validate checksum first: XxHash64 over everything except the last 8 bytes.
var payloadRegion = data[..^ChecksumSize];
var expectedChecksum = BinaryPrimitives.ReadUInt64LittleEndian(data[^ChecksumSize..]);
var actualChecksum = XxHash64.HashToUInt64(payloadRegion);
if (expectedChecksum != actualChecksum)
throw new InvalidDataException("Checksum mismatch: record is corrupt.");
var offset = 0;
// 1. Flags
var flags = data[offset++];
var deleted = (flags & DeletedFlag) != 0;
// 2. Subject
var (subjectLen, subjectLenBytes) = ReadVarint(data[offset..]);
offset += subjectLenBytes;
var subject = Encoding.UTF8.GetString(data.Slice(offset, (int)subjectLen));
offset += (int)subjectLen;
// 3. Headers
var (headersLen, headersLenBytes) = ReadVarint(data[offset..]);
offset += headersLenBytes;
var headers = data.Slice(offset, (int)headersLen).ToArray();
offset += (int)headersLen;
// 4. Payload
var (payloadLen, payloadLenBytes) = ReadVarint(data[offset..]);
offset += payloadLenBytes;
var payload = data.Slice(offset, (int)payloadLen).ToArray();
offset += (int)payloadLen;
// 5. Sequence
var sequence = BinaryPrimitives.ReadUInt64LittleEndian(data[offset..]);
offset += SequenceSize;
// 6. Timestamp
var timestamp = BinaryPrimitives.ReadInt64LittleEndian(data[offset..]);
return new MessageRecord
{
Sequence = sequence,
Subject = subject,
Headers = headers,
Payload = payload,
Timestamp = timestamp,
Deleted = deleted,
};
}
/// <summary>
/// Writes a varint (protobuf-style high-bit continuation encoding) to the target span.
/// </summary>
/// <param name="buffer">The target buffer.</param>
/// <param name="value">The value to encode.</param>
/// <returns>The number of bytes written.</returns>
public static int WriteVarint(Span<byte> buffer, ulong value)
{
var i = 0;
while (value >= 0x80)
{
buffer[i++] = (byte)(value | 0x80);
value >>= 7;
}
buffer[i++] = (byte)value;
return i;
}
/// <summary>
/// Reads a varint (protobuf-style high-bit continuation encoding) from the source span.
/// </summary>
/// <param name="data">The source buffer.</param>
/// <returns>A tuple of (decoded value, number of bytes consumed).</returns>
public static (ulong Value, int BytesRead) ReadVarint(ReadOnlySpan<byte> data)
{
ulong result = 0;
var shift = 0;
var i = 0;
while (i < data.Length)
{
var b = data[i++];
result |= (ulong)(b & 0x7F) << shift;
if ((b & 0x80) == 0)
return (result, i);
shift += 7;
if (shift >= 64)
throw new InvalidDataException("Varint is too long.");
}
throw new InvalidDataException("Varint is truncated.");
}
/// <summary>
/// Returns the number of bytes needed to encode a varint.
/// </summary>
private static int VarintSize(ulong value)
{
var size = 1;
while (value >= 0x80)
{
size++;
value >>= 7;
}
return size;
}
}

View File

@@ -5,6 +5,7 @@
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="IronSnappy" />
<PackageReference Include="System.IO.Hashing" />
<PackageReference Include="NATS.NKeys" />
<PackageReference Include="BCrypt.Net-Next" />
</ItemGroup>