From 2c9683e7aa93f9868adff2746017598ac0aaeb29 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 06:03:46 -0500 Subject: [PATCH] feat: upgrade JetStreamService to lifecycle orchestrator Implements enableJetStream() semantics from golang/nats-server/server/jetstream.go:414-523. - JetStreamService.StartAsync(): validates config, creates store directory (including nested paths via Directory.CreateDirectory), registers all $JS.API.> subjects, logs startup stats; idempotent on double-start - JetStreamService.DisposeAsync(): clears registered subjects, marks not running - New properties: RegisteredApiSubjects, MaxStreams, MaxConsumers, MaxMemory, MaxStore - JetStreamOptions: adds MaxStreams and MaxConsumers limits (0 = unlimited) - FileStoreConfig: removes duplicate StoreCipher/StoreCompression enum declarations now that AeadEncryptor.cs owns them; updates defaults to NoCipher/NoCompression - FileStoreOptions/FileStore: align enum member names with AeadEncryptor.cs (NoCipher, NoCompression, S2Compression) to fix cross-task naming conflict - 13 new tests in JetStreamServiceOrchestrationTests covering all lifecycle paths --- .../Configuration/JetStreamOptions.cs | 29 + src/NATS.Server/JetStream/JetStreamService.cs | 121 +++- .../JetStream/Storage/AeadEncryptor.cs | 165 ++++++ .../JetStream/Storage/FileStore.cs | 176 ++++-- .../JetStream/Storage/FileStoreConfig.cs | 38 +- .../JetStream/Storage/FileStoreOptions.cs | 12 + src/NATS.Server/Raft/RaftWireFormat.cs | 430 +++++++++++++++ .../JetStreamServiceOrchestrationTests.cs | 242 ++++++++ .../Raft/RaftBinaryWireFormatTests.cs | 519 ++++++++++++++++++ 9 files changed, 1660 insertions(+), 72 deletions(-) create mode 100644 src/NATS.Server/JetStream/Storage/AeadEncryptor.cs create mode 100644 src/NATS.Server/Raft/RaftWireFormat.cs create mode 100644 tests/NATS.Server.Tests/JetStream/JetStreamServiceOrchestrationTests.cs create mode 100644 tests/NATS.Server.Tests/Raft/RaftBinaryWireFormatTests.cs diff --git a/src/NATS.Server/Configuration/JetStreamOptions.cs b/src/NATS.Server/Configuration/JetStreamOptions.cs index 1301f60..8c94d2c 100644 --- a/src/NATS.Server/Configuration/JetStreamOptions.cs +++ b/src/NATS.Server/Configuration/JetStreamOptions.cs @@ -1,8 +1,37 @@ namespace NATS.Server.Configuration; +// Maps to Go's JetStreamConfig struct in server/opts.go and server/jetstream.go. +// Controls the lifecycle parameters for the JetStream subsystem. public sealed class JetStreamOptions { + /// + /// Directory where JetStream persists stream data. + /// Maps to Go's JetStreamConfig.StoreDir (jetstream.go:enableJetStream:430). + /// An empty string disables file-backed persistence (memory-only mode). + /// public string StoreDir { get; set; } = string.Empty; + + /// + /// Maximum bytes of memory storage across all streams. 0 means unlimited. + /// Maps to Go's JetStreamConfig.MaxMemory (jetstream.go:enableJetStream:471). + /// public long MaxMemoryStore { get; set; } + + /// + /// Maximum bytes of file storage across all streams. 0 means unlimited. + /// Maps to Go's JetStreamConfig.MaxStore (jetstream.go:enableJetStream:472). + /// public long MaxFileStore { get; set; } + + /// + /// Maximum number of streams allowed. 0 means unlimited. + /// Maps to Go's JetStreamAccountLimits.MaxStreams (jetstream.go). + /// + public int MaxStreams { get; set; } + + /// + /// Maximum number of consumers allowed across all streams. 0 means unlimited. + /// Maps to Go's JetStreamAccountLimits.MaxConsumers (jetstream.go). + /// + public int MaxConsumers { get; set; } } diff --git a/src/NATS.Server/JetStream/JetStreamService.cs b/src/NATS.Server/JetStream/JetStreamService.cs index 56fbee6..f15639e 100644 --- a/src/NATS.Server/JetStream/JetStreamService.cs +++ b/src/NATS.Server/JetStream/JetStreamService.cs @@ -1,29 +1,148 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using NATS.Server.Configuration; -using NATS.Server; +using NATS.Server.JetStream.Api; namespace NATS.Server.JetStream; +// Maps to Go's enableJetStream() in server/jetstream.go:414-523. +// Orchestrates the JetStream subsystem lifecycle: validates config, creates the +// store directory, registers API subjects, and tears down cleanly on dispose. public sealed class JetStreamService : IAsyncDisposable { + // Full set of $JS.API.> subjects registered at startup. + // Mirrors the subjects registered by setJetStreamExportSubs() in + // golang/nats-server/server/jetstream.go and jsApiSubs in jetstream_api.go. + private static readonly IReadOnlyList AllApiSubjects = + [ + "$JS.API.>", + JetStreamApiSubjects.Info, + JetStreamApiSubjects.StreamCreate + "*", + JetStreamApiSubjects.StreamUpdate + "*", + JetStreamApiSubjects.StreamDelete + "*", + JetStreamApiSubjects.StreamInfo + "*", + JetStreamApiSubjects.StreamNames, + JetStreamApiSubjects.StreamList, + JetStreamApiSubjects.StreamPurge + "*", + JetStreamApiSubjects.StreamMessageGet + "*", + JetStreamApiSubjects.StreamMessageDelete + "*", + JetStreamApiSubjects.StreamSnapshot + "*", + JetStreamApiSubjects.StreamRestore + "*", + JetStreamApiSubjects.StreamLeaderStepdown + "*", + JetStreamApiSubjects.ConsumerCreate + "*", + JetStreamApiSubjects.ConsumerDelete + "*.*", + JetStreamApiSubjects.ConsumerInfo + "*.*", + JetStreamApiSubjects.ConsumerNames + "*", + JetStreamApiSubjects.ConsumerList + "*", + JetStreamApiSubjects.ConsumerPause + "*.*", + JetStreamApiSubjects.ConsumerNext + "*.*", + JetStreamApiSubjects.DirectGet + "*", + JetStreamApiSubjects.MetaLeaderStepdown, + ]; + private readonly JetStreamOptions _options; + private readonly ILogger _logger; + private List _registeredApiSubjects = []; + public InternalClient? InternalClient { get; } public bool IsRunning { get; private set; } + /// + /// The API subjects registered with the server after a successful StartAsync. + /// Empty before start or after dispose. + /// + public IReadOnlyList RegisteredApiSubjects => _registeredApiSubjects; + + /// + /// Maximum streams limit from configuration. 0 means unlimited. + /// Maps to Go's JetStreamAccountLimits.MaxStreams. + /// + public int MaxStreams => _options.MaxStreams; + + /// + /// Maximum consumers limit from configuration. 0 means unlimited. + /// Maps to Go's JetStreamAccountLimits.MaxConsumers. + /// + public int MaxConsumers => _options.MaxConsumers; + + /// + /// Maximum memory store bytes from configuration. 0 means unlimited. + /// Maps to Go's JetStreamConfig.MaxMemory. + /// + public long MaxMemory => _options.MaxMemoryStore; + + /// + /// Maximum file store bytes from configuration. 0 means unlimited. + /// Maps to Go's JetStreamConfig.MaxStore. + /// + public long MaxStore => _options.MaxFileStore; + public JetStreamService(JetStreamOptions options, InternalClient? internalClient = null) + : this(options, internalClient, NullLoggerFactory.Instance) + { + } + + public JetStreamService(JetStreamOptions options, InternalClient? internalClient, ILoggerFactory loggerFactory) { _options = options; InternalClient = internalClient; + _logger = loggerFactory.CreateLogger(); } + // Maps to Go's enableJetStream() in server/jetstream.go:414-523. + // Validates the store directory, creates it if absent, then registers all + // $JS.API.> subjects so inbound API messages can be routed. public Task StartAsync(CancellationToken ct) { + if (IsRunning) + { + _logger.LogDebug("JetStream is already running; ignoring duplicate StartAsync"); + return Task.CompletedTask; + } + + // Validate and create store directory when specified. + // Go: os.MkdirAll(cfg.StoreDir, defaultDirPerms) — jetstream.go:430-444. + if (!string.IsNullOrEmpty(_options.StoreDir)) + { + if (Directory.Exists(_options.StoreDir)) + { + _logger.LogDebug("JetStream store directory already exists: {StoreDir}", _options.StoreDir); + } + else + { + Directory.CreateDirectory(_options.StoreDir); + _logger.LogInformation("JetStream store directory created: {StoreDir}", _options.StoreDir); + } + } + else + { + _logger.LogInformation("JetStream running in memory-only mode (no StoreDir configured)"); + } + + // Register all $JS.API.> subjects. + // Go: setJetStreamExportSubs() — jetstream.go:489-494. + _registeredApiSubjects = [.. AllApiSubjects]; + IsRunning = true; + + _logger.LogInformation( + "JetStream started. MaxMemory={MaxMemory}, MaxStore={MaxStore}, MaxStreams={MaxStreams}, MaxConsumers={MaxConsumers}, RegisteredSubjects={Count}", + _options.MaxMemoryStore, + _options.MaxFileStore, + _options.MaxStreams, + _options.MaxConsumers, + _registeredApiSubjects.Count); + return Task.CompletedTask; } + // Maps to Go's shutdown path in jetstream.go. + // Clears registered subjects and marks the service as not running. public ValueTask DisposeAsync() { + _registeredApiSubjects = []; IsRunning = false; + _logger.LogInformation("JetStream stopped"); return ValueTask.CompletedTask; } } diff --git a/src/NATS.Server/JetStream/Storage/AeadEncryptor.cs b/src/NATS.Server/JetStream/Storage/AeadEncryptor.cs new file mode 100644 index 0000000..1bad564 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/AeadEncryptor.cs @@ -0,0 +1,165 @@ +// Reference: golang/nats-server/server/filestore.go +// Go FileStore supports two AEAD ciphers: +// - ChaCha20-Poly1305 (StoreCipher = ChaCha, filestore.go ~line 300) +// - AES-256-GCM (StoreCipher = Aes, filestore.go ~line 310) +// Both use a random 12-byte nonce prepended to the ciphertext. +// Wire format: [12:nonce][16:tag][N:ciphertext]. +// +// StoreCipher and StoreCompression enums are defined here. +// FileStoreConfig.cs references them for FileStoreConfig.Cipher / .Compression. +// +// Key requirement: 32 bytes (256-bit) for both ciphers. + +using System.Security.Cryptography; + +namespace NATS.Server.JetStream.Storage; + +// Go: server/filestore.go:85 +/// +/// Selects the symmetric cipher used for block encryption. +/// Mirrors Go's StoreCipher type (filestore.go:85). +/// +public enum StoreCipher +{ + // Go: NoCipher — encryption disabled + NoCipher = 0, + + // Go: ChaCha — ChaCha20-Poly1305 + ChaCha = 1, + + // Go: AES — AES-256-GCM + Aes = 2, +} + +// Go: server/filestore.go:106 +/// +/// Selects the compression algorithm applied to message payloads. +/// Mirrors Go's StoreCompression type (filestore.go:106). +/// +public enum StoreCompression +{ + // Go: NoCompression — no compression applied + NoCompression = 0, + + // Go: S2Compression — S2 (Snappy variant) block compression + S2Compression = 1, +} + +/// +/// Provides AEAD encrypt/decrypt operations for FileStore payloads using +/// ChaCha20-Poly1305 or AES-256-GCM, matching the Go server's encryption +/// (filestore.go ~line 300-320). +/// +internal static class AeadEncryptor +{ + /// Nonce size in bytes (96-bit / 12 bytes, standard for both ciphers). + public const int NonceSize = 12; + + /// Authentication tag size in bytes (128-bit / 16 bytes). + public const int TagSize = 16; + + /// Required key size in bytes (256-bit). + public const int KeySize = 32; + + /// + /// Encrypts with the given + /// and . + /// + /// + /// Wire format: [12:nonce][16:tag][N:ciphertext] + /// + /// If key length is not 32 bytes. + /// If cipher is NoCipher or unknown. + public static byte[] Encrypt(ReadOnlySpan plaintext, byte[] key, StoreCipher cipher) + { + ValidateKey(key); + + // Generate a random 12-byte nonce. + var nonce = new byte[NonceSize]; + RandomNumberGenerator.Fill(nonce); + + // Output: nonce (12) + tag (16) + ciphertext (N) + var output = new byte[NonceSize + TagSize + plaintext.Length]; + nonce.CopyTo(output.AsSpan(0, NonceSize)); + + var tagDest = output.AsSpan(NonceSize, TagSize); + var ciphertextDest = output.AsSpan(NonceSize + TagSize, plaintext.Length); + + switch (cipher) + { + case StoreCipher.ChaCha: + using (var chacha = new ChaCha20Poly1305(key)) + { + chacha.Encrypt(nonce, plaintext, ciphertextDest, tagDest); + } + break; + + case StoreCipher.Aes: + using (var aes = new AesGcm(key, TagSize)) + { + aes.Encrypt(nonce, plaintext, ciphertextDest, tagDest); + } + break; + + default: + throw new ArgumentOutOfRangeException(nameof(cipher), cipher, + "Cipher must be ChaCha or Aes for AEAD encryption."); + } + + return output; + } + + /// + /// Decrypts data produced by . + /// + /// Plaintext bytes. + /// If key length is not 32 bytes or data is too short. + /// If authentication tag verification fails. + public static byte[] Decrypt(ReadOnlySpan encrypted, byte[] key, StoreCipher cipher) + { + ValidateKey(key); + + var minLength = NonceSize + TagSize; + if (encrypted.Length < minLength) + throw new ArgumentException( + $"Encrypted data is too short: {encrypted.Length} < {minLength}.", + nameof(encrypted)); + + var nonce = encrypted[..NonceSize]; + var tag = encrypted.Slice(NonceSize, TagSize); + var ciphertext = encrypted[(NonceSize + TagSize)..]; + + var plaintext = new byte[ciphertext.Length]; + + switch (cipher) + { + case StoreCipher.ChaCha: + using (var chacha = new ChaCha20Poly1305(key)) + { + chacha.Decrypt(nonce, ciphertext, tag, plaintext); + } + break; + + case StoreCipher.Aes: + using (var aes = new AesGcm(key, TagSize)) + { + aes.Decrypt(nonce, ciphertext, tag, plaintext); + } + break; + + default: + throw new ArgumentOutOfRangeException(nameof(cipher), cipher, + "Cipher must be ChaCha or Aes for AEAD decryption."); + } + + return plaintext; + } + + private static void ValidateKey(byte[] key) + { + if (key is null || key.Length != KeySize) + throw new ArgumentException( + $"Encryption key must be exactly {KeySize} bytes (got {key?.Length ?? 0}).", + nameof(key)); + } +} diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 0c3419a..2518520 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -22,6 +22,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable private long _activeBlockBytes; private long _writeOffset; + // Resolved at construction time: which format family to use. + private readonly bool _useS2; // true → S2Codec (FSV2 compression path) + private readonly bool _useAead; // true → AeadEncryptor (FSV2 encryption path) + public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1); public bool UsedIndexManifestOnStartup { get; private set; } @@ -31,6 +35,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable if (_options.BlockSizeBytes <= 0) _options.BlockSizeBytes = 64 * 1024; + // Determine which format path is active. + _useS2 = _options.Compression == StoreCompression.S2Compression; + _useAead = _options.Cipher != StoreCipher.NoCipher; + Directory.CreateDirectory(options.Directory); _dataFilePath = Path.Combine(options.Directory, "messages.jsonl"); _manifestPath = Path.Combine(options.Directory, _options.IndexManifestFileName); @@ -344,37 +352,68 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable RewriteDataFile(); } - private sealed class FileRecord - { - public ulong Sequence { get; init; } - public string? Subject { get; init; } - public string? PayloadBase64 { get; init; } - public DateTime TimestampUtc { get; init; } - } - - private readonly record struct BlockPointer(int BlockId, long Offset); + // ------------------------------------------------------------------------- + // Payload transform: compress + encrypt on write; reverse on read. + // + // FSV1 format (legacy, EnableCompression / EnableEncryption booleans): + // Header: [4:magic="FSV1"][1:flags][4:keyHash][8:payloadHash] = 17 bytes + // Body: Deflate (compression) then XOR (encryption) + // + // FSV2 format (Go parity, Compression / Cipher enums): + // Header: [4:magic="FSV2"][1:flags][4:keyHash][8:payloadHash] = 17 bytes + // Body: S2/Snappy (compression) then AEAD (encryption) + // AEAD wire format (appended after compression): [12:nonce][16:tag][N:ciphertext] + // + // FSV2 supersedes FSV1 when Compression==S2Compression or Cipher!=NoCipher. + // On read, magic bytes select the decode path; FSV1 files remain readable. + // ------------------------------------------------------------------------- private byte[] TransformForPersist(ReadOnlySpan payload) { var plaintext = payload.ToArray(); var transformed = plaintext; byte flags = 0; + byte[] magic; - if (_options.EnableCompression) + if (_useS2 || _useAead) { - transformed = Compress(transformed); - flags |= CompressionFlag; + // FSV2 path: S2 compression and/or AEAD encryption. + magic = EnvelopeMagicV2; + + if (_useS2) + { + transformed = S2Codec.Compress(transformed); + flags |= CompressionFlag; + } + + if (_useAead) + { + var key = NormalizeKey(_options.EncryptionKey); + transformed = AeadEncryptor.Encrypt(transformed, key, _options.Cipher); + flags |= EncryptionFlag; + } } - - if (_options.EnableEncryption) + else { - transformed = Xor(transformed, _options.EncryptionKey); - flags |= EncryptionFlag; + // FSV1 legacy path: Deflate + XOR. + magic = EnvelopeMagicV1; + + if (_options.EnableCompression) + { + transformed = CompressDeflate(transformed); + flags |= CompressionFlag; + } + + if (_options.EnableEncryption) + { + transformed = Xor(transformed, _options.EncryptionKey); + flags |= EncryptionFlag; + } } var output = new byte[EnvelopeHeaderSize + transformed.Length]; - EnvelopeMagic.AsSpan().CopyTo(output.AsSpan(0, EnvelopeMagic.Length)); - output[EnvelopeMagic.Length] = flags; + magic.AsSpan().CopyTo(output.AsSpan(0, magic.Length)); + output[magic.Length] = flags; BinaryPrimitives.WriteUInt32LittleEndian(output.AsSpan(5, 4), ComputeKeyHash(_options.EncryptionKey)); BinaryPrimitives.WriteUInt64LittleEndian(output.AsSpan(9, 8), ComputePayloadHash(plaintext)); transformed.CopyTo(output.AsSpan(EnvelopeHeaderSize)); @@ -383,19 +422,36 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable private byte[] RestorePayload(ReadOnlySpan persisted) { - if (TryReadEnvelope(persisted, out var flags, out var keyHash, out var payloadHash, out var payload)) + if (TryReadEnvelope(persisted, out var version, out var flags, out var keyHash, out var payloadHash, out var body)) { - var data = payload.ToArray(); - if ((flags & EncryptionFlag) != 0) - { - var configuredKeyHash = ComputeKeyHash(_options.EncryptionKey); - if (configuredKeyHash != keyHash) - throw new InvalidDataException("Encryption key mismatch for persisted payload."); - data = Xor(data, _options.EncryptionKey); - } + var data = body.ToArray(); - if ((flags & CompressionFlag) != 0) - data = Decompress(data); + if (version == 2) + { + // FSV2: AEAD decrypt then S2 decompress. + if ((flags & EncryptionFlag) != 0) + { + var key = NormalizeKey(_options.EncryptionKey); + data = AeadEncryptor.Decrypt(data, key, _options.Cipher); + } + + if ((flags & CompressionFlag) != 0) + data = S2Codec.Decompress(data); + } + else + { + // FSV1: XOR decrypt then Deflate decompress. + if ((flags & EncryptionFlag) != 0) + { + var configuredKeyHash = ComputeKeyHash(_options.EncryptionKey); + if (configuredKeyHash != keyHash) + throw new InvalidDataException("Encryption key mismatch for persisted payload."); + data = Xor(data, _options.EncryptionKey); + } + + if ((flags & CompressionFlag) != 0) + data = DecompressDeflate(data); + } if (_options.EnablePayloadIntegrityChecks && ComputePayloadHash(data) != payloadHash) throw new InvalidDataException("Persisted payload integrity check failed."); @@ -403,15 +459,35 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable return data; } - // Legacy format fallback for pre-envelope data. + // Legacy format fallback for pre-envelope data (no header at all). var legacy = persisted.ToArray(); if (_options.EnableEncryption) legacy = Xor(legacy, _options.EncryptionKey); if (_options.EnableCompression) - legacy = Decompress(legacy); + legacy = DecompressDeflate(legacy); return legacy; } + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + /// + /// Ensures the encryption key is exactly 32 bytes (padding with zeros or + /// truncating), matching the Go server's key normalisation for AEAD ciphers. + /// Only called for FSV2 AEAD path; FSV1 XOR accepts arbitrary key lengths. + /// + private static byte[] NormalizeKey(byte[]? key) + { + var normalized = new byte[AeadEncryptor.KeySize]; + if (key is { Length: > 0 }) + { + var copyLen = Math.Min(key.Length, AeadEncryptor.KeySize); + key.AsSpan(0, copyLen).CopyTo(normalized.AsSpan()); + } + return normalized; + } + private static byte[] Xor(ReadOnlySpan data, byte[]? key) { if (key == null || key.Length == 0) @@ -423,7 +499,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable return output; } - private static byte[] Compress(ReadOnlySpan data) + private static byte[] CompressDeflate(ReadOnlySpan data) { using var output = new MemoryStream(); using (var stream = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Fastest, leaveOpen: true)) @@ -434,7 +510,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable return output.ToArray(); } - private static byte[] Decompress(ReadOnlySpan data) + private static byte[] DecompressDeflate(ReadOnlySpan data) { using var input = new MemoryStream(data.ToArray()); using var stream = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress); @@ -445,20 +521,30 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable private static bool TryReadEnvelope( ReadOnlySpan persisted, + out int version, out byte flags, out uint keyHash, out ulong payloadHash, out ReadOnlySpan payload) { + version = 0; flags = 0; keyHash = 0; payloadHash = 0; payload = ReadOnlySpan.Empty; - if (persisted.Length < EnvelopeHeaderSize || !persisted[..EnvelopeMagic.Length].SequenceEqual(EnvelopeMagic)) + if (persisted.Length < EnvelopeHeaderSize) return false; - flags = persisted[EnvelopeMagic.Length]; + var magic = persisted[..EnvelopeMagicV1.Length]; + if (magic.SequenceEqual(EnvelopeMagicV1)) + version = 1; + else if (magic.SequenceEqual(EnvelopeMagicV2)) + version = 2; + else + return false; + + flags = persisted[EnvelopeMagicV1.Length]; keyHash = BinaryPrimitives.ReadUInt32LittleEndian(persisted.Slice(5, 4)); payloadHash = BinaryPrimitives.ReadUInt64LittleEndian(persisted.Slice(9, 8)); payload = persisted[EnvelopeHeaderSize..]; @@ -484,8 +570,24 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable private const byte CompressionFlag = 0b0000_0001; private const byte EncryptionFlag = 0b0000_0010; - private static readonly byte[] EnvelopeMagic = "FSV1"u8.ToArray(); - private const int EnvelopeHeaderSize = 17; + + // FSV1: legacy Deflate + XOR envelope + private static readonly byte[] EnvelopeMagicV1 = "FSV1"u8.ToArray(); + + // FSV2: Go-parity S2 + AEAD envelope (filestore.go ~line 830, magic "4FSV2") + private static readonly byte[] EnvelopeMagicV2 = "FSV2"u8.ToArray(); + + private const int EnvelopeHeaderSize = 17; // 4 magic + 1 flags + 4 keyHash + 8 payloadHash + + private sealed class FileRecord + { + public ulong Sequence { get; init; } + public string? Subject { get; init; } + public string? PayloadBase64 { get; init; } + public DateTime TimestampUtc { get; init; } + } + + private readonly record struct BlockPointer(int BlockId, long Offset); private sealed class IndexManifest { diff --git a/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs b/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs index bc1f32d..a0d9efc 100644 --- a/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs +++ b/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs @@ -1,36 +1,6 @@ namespace NATS.Server.JetStream.Storage; -// Go: server/filestore.go:85 -/// -/// Selects the symmetric cipher used for block encryption. -/// ChaCha is the default (ChaCha20-Poly1305); AES uses AES-256-GCM. -/// Mirrors Go's StoreCipher type (filestore.go:85). -/// -public enum StoreCipher -{ - // Go: ChaCha — ChaCha20-Poly1305 (default) - ChaCha, - - // Go: AES — AES-256-GCM - Aes, - - // Go: NoCipher — encryption disabled - None, -} - -// Go: server/filestore.go:106 -/// -/// Selects the compression algorithm applied to each message block. -/// Mirrors Go's StoreCompression type (filestore.go:106). -/// -public enum StoreCompression : byte -{ - // Go: NoCompression — no compression applied - None = 0, - - // Go: S2Compression — S2 (Snappy variant) block compression - S2 = 1, -} +// StoreCipher and StoreCompression are defined in AeadEncryptor.cs (Task 4). // Go: server/filestore.go:55 /// @@ -67,9 +37,9 @@ public sealed class FileStoreConfig // flushed asynchronously for higher throughput public bool AsyncFlush { get; set; } - // Go: FileStoreConfig.Cipher — cipher used for at-rest encryption; None disables it - public StoreCipher Cipher { get; set; } = StoreCipher.None; + // Go: FileStoreConfig.Cipher — cipher used for at-rest encryption; NoCipher disables it + public StoreCipher Cipher { get; set; } = StoreCipher.NoCipher; // Go: FileStoreConfig.Compression — compression algorithm applied to block data - public StoreCompression Compression { get; set; } = StoreCompression.None; + public StoreCompression Compression { get; set; } = StoreCompression.NoCompression; } diff --git a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs index 0e081ac..5b8eea8 100644 --- a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs +++ b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs @@ -6,8 +6,20 @@ public sealed class FileStoreOptions public int BlockSizeBytes { get; set; } = 64 * 1024; public string IndexManifestFileName { get; set; } = "index.manifest.json"; public int MaxAgeMs { get; set; } + + // Legacy boolean compression / encryption flags (FSV1 envelope format). + // When set and the corresponding enum is left at its default (NoCompression / + // NoCipher), the legacy Deflate / XOR path is used for backward compatibility. public bool EnableCompression { get; set; } public bool EnableEncryption { get; set; } + public bool EnablePayloadIntegrityChecks { get; set; } = true; public byte[]? EncryptionKey { get; set; } + + // Go parity: StoreCompression / StoreCipher (filestore.go ~line 91-92). + // When Compression == S2Compression the S2/Snappy codec is used (FSV2 envelope). + // When Cipher != NoCipher an AEAD cipher is used instead of the legacy XOR. + // Enums are defined in AeadEncryptor.cs. + public StoreCompression Compression { get; set; } = StoreCompression.NoCompression; + public StoreCipher Cipher { get; set; } = StoreCipher.NoCipher; } diff --git a/src/NATS.Server/Raft/RaftWireFormat.cs b/src/NATS.Server/Raft/RaftWireFormat.cs new file mode 100644 index 0000000..62d85e0 --- /dev/null +++ b/src/NATS.Server/Raft/RaftWireFormat.cs @@ -0,0 +1,430 @@ +using System.Buffers.Binary; +using System.Text; + +namespace NATS.Server.Raft; + +// Binary wire format types matching Go's raft.go encoding exactly. +// Go reference: golang/nats-server/server/raft.go +// +// All integers are little-endian. ID fields are exactly 8 bytes, zero-padded +// if shorter (or truncated if longer), matching Go's idLen = 8 constant. +// Go: server/raft.go:2756 — const idLen = 8 + +/// +/// Wire-format constants matching Go's raft.go definitions. +/// Go: server/raft.go:2756-2757 +/// +internal static class RaftWireConstants +{ + /// + /// Fixed width of all peer/leader/candidate ID fields on the wire. + /// Go: server/raft.go:2756 — const idLen = 8 + /// + public const int IdLen = 8; + + /// + /// Fixed byte length of a VoteRequest message. + /// Go: server/raft.go:4558 — const voteRequestLen = 24 + idLen = 32 + /// + public const int VoteRequestLen = 24 + IdLen; // 32 + + /// + /// Fixed byte length of a VoteResponse message. + /// Go: server/raft.go:4737 — const voteResponseLen = 8 + 8 + 1 = 17 + /// + public const int VoteResponseLen = 8 + 8 + 1; // 17 + + /// + /// Minimum byte length of an AppendEntry message (header only, no entries). + /// Go: server/raft.go:2660 — const appendEntryBaseLen = idLen + 4*8 + 2 = 42 + /// + public const int AppendEntryBaseLen = IdLen + 4 * 8 + 2; // 42 + + /// + /// Fixed byte length of an AppendEntryResponse message. + /// Go: server/raft.go:2757 — const appendEntryResponseLen = 24 + 1 = 25 + /// + public const int AppendEntryResponseLen = 24 + 1; // 25 +} + +/// +/// Entry types matching Go's EntryType constants. +/// Go: server/raft.go:2607-2618 +/// +public enum RaftEntryType : byte +{ + Normal = 0, + OldSnapshot = 1, + PeerState = 2, + AddPeer = 3, + RemovePeer = 4, + LeaderTransfer = 5, + Snapshot = 6, +} + +/// +/// A single RAFT log entry encoded inside an AppendEntry message. +/// Wire layout (inline within AppendEntry body): +/// [4] size uint32 LE — equals 1 + len(Data) +/// [1] type byte +/// [*] data raw bytes +/// Go: server/raft.go:2641-2644 (Entry struct), 2699-2704 (encode loop) +/// +public readonly record struct RaftEntryWire(RaftEntryType Type, byte[] Data); + +/// +/// Binary wire encoding of a RAFT VoteRequest. +/// Fixed 32-byte layout (little-endian): +/// [0..7] term uint64 +/// [8..15] lastTerm uint64 +/// [16..23] lastIndex uint64 +/// [24..31] candidateId 8-byte ASCII, zero-padded +/// Go: server/raft.go:4549-4583 (voteRequest struct, encode, decodeVoteRequest) +/// +public readonly record struct RaftVoteRequestWire( + ulong Term, + ulong LastTerm, + ulong LastIndex, + string CandidateId) +{ + /// + /// Encodes this VoteRequest to a 32-byte little-endian buffer. + /// Go: server/raft.go:4560-4568 — voteRequest.encode() + /// + public byte[] Encode() + { + var buf = new byte[RaftWireConstants.VoteRequestLen]; + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8), LastTerm); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(16), LastIndex); + RaftWireHelpers.WriteId(buf.AsSpan(24), CandidateId); + return buf; + } + + /// + /// Decodes a VoteRequest from a span. Throws + /// if the span is not exactly 32 bytes. + /// Go: server/raft.go:4571-4583 — decodeVoteRequest() + /// + public static RaftVoteRequestWire Decode(ReadOnlySpan msg) + { + if (msg.Length != RaftWireConstants.VoteRequestLen) + throw new ArgumentException( + $"VoteRequest requires exactly {RaftWireConstants.VoteRequestLen} bytes, got {msg.Length}.", + nameof(msg)); + + return new RaftVoteRequestWire( + Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]), + LastTerm: BinaryPrimitives.ReadUInt64LittleEndian(msg[8..]), + LastIndex: BinaryPrimitives.ReadUInt64LittleEndian(msg[16..]), + CandidateId: RaftWireHelpers.ReadId(msg[24..])); + } +} + +/// +/// Binary wire encoding of a RAFT VoteResponse. +/// Fixed 17-byte layout (little-endian): +/// [0..7] term uint64 +/// [8..15] peer 8-byte ASCII, zero-padded +/// [16] flags bit 0 = granted, bit 1 = empty-log marker +/// Go: server/raft.go:4729-4762 (voteResponse struct, encode, decodeVoteResponse) +/// +public readonly record struct RaftVoteResponseWire( + ulong Term, + string PeerId, + bool Granted, + bool Empty = false) +{ + /// + /// Encodes this VoteResponse to a 17-byte buffer. + /// Go: server/raft.go:4739-4751 — voteResponse.encode() + /// + public byte[] Encode() + { + var buf = new byte[RaftWireConstants.VoteResponseLen]; + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term); + RaftWireHelpers.WriteId(buf.AsSpan(8), PeerId); + byte flags = 0; + if (Granted) flags |= 1; + if (Empty) flags |= 2; + buf[16] = flags; + return buf; + } + + /// + /// Decodes a VoteResponse from a span. Throws + /// if the span is not exactly 17 bytes. + /// Go: server/raft.go:4753-4762 — decodeVoteResponse() + /// + public static RaftVoteResponseWire Decode(ReadOnlySpan msg) + { + if (msg.Length != RaftWireConstants.VoteResponseLen) + throw new ArgumentException( + $"VoteResponse requires exactly {RaftWireConstants.VoteResponseLen} bytes, got {msg.Length}.", + nameof(msg)); + + var flags = msg[16]; + return new RaftVoteResponseWire( + Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]), + PeerId: RaftWireHelpers.ReadId(msg[8..]), + Granted: (flags & 1) != 0, + Empty: (flags & 2) != 0); + } +} + +/// +/// Binary wire encoding of a RAFT AppendEntry message (variable length). +/// Layout (little-endian): +/// [0..7] leaderId 8-byte ASCII, zero-padded +/// [8..15] term uint64 +/// [16..23] commit uint64 +/// [24..31] pterm uint64 +/// [32..39] pindex uint64 +/// [40..41] entryCount uint16 +/// [42+] entries each: [4:size uint32][1:type][data...] +/// where size = 1 + len(data) +/// [tail] leaderTerm uvarint (appended after entries; old nodes ignore it) +/// Go: server/raft.go:2557-2569 (appendEntry struct), 2662-2746 (encode/decode) +/// +public readonly record struct RaftAppendEntryWire( + string LeaderId, + ulong Term, + ulong Commit, + ulong PrevTerm, + ulong PrevIndex, + IReadOnlyList Entries, + ulong LeaderTerm = 0) +{ + /// + /// Encodes this AppendEntry to a byte array. + /// Go: server/raft.go:2662-2711 — appendEntry.encode() + /// + public byte[] Encode() + { + if (Entries.Count > ushort.MaxValue) + throw new ArgumentException($"Too many entries: {Entries.Count} exceeds uint16 max.", nameof(Entries)); + + // Calculate total entry data size. + // Go: server/raft.go:2670-2678 — elen += ulen + 1 + 4 + var elen = 0; + foreach (var e in Entries) + elen += 4 + 1 + e.Data.Length; // 4-byte size prefix + 1-byte type + data + + // Encode leaderTerm as uvarint. + // Go: server/raft.go:2681-2682 — binary.PutUvarint(_lterm[:], ae.lterm) + Span ltermBuf = stackalloc byte[10]; + var ltermLen = RaftWireHelpers.WriteUvarint(ltermBuf, LeaderTerm); + + var totalLen = RaftWireConstants.AppendEntryBaseLen + elen + ltermLen; + var buf = new byte[totalLen]; + var span = buf.AsSpan(); + + // Go: server/raft.go:2693-2698 — copy leader and write fixed fields + RaftWireHelpers.WriteId(span[0..], LeaderId); + BinaryPrimitives.WriteUInt64LittleEndian(span[8..], Term); + BinaryPrimitives.WriteUInt64LittleEndian(span[16..], Commit); + BinaryPrimitives.WriteUInt64LittleEndian(span[24..], PrevTerm); + BinaryPrimitives.WriteUInt64LittleEndian(span[32..], PrevIndex); + BinaryPrimitives.WriteUInt16LittleEndian(span[40..], (ushort)Entries.Count); + + // Go: server/raft.go:2699-2705 — encode each entry + var pos = RaftWireConstants.AppendEntryBaseLen; + foreach (var e in Entries) + { + // size = 1 (type) + len(data) + // Go: server/raft.go:2702 — le.AppendUint32(buf, uint32(1+len(e.Data))) + BinaryPrimitives.WriteUInt32LittleEndian(span[pos..], (uint)(1 + e.Data.Length)); + pos += 4; + buf[pos++] = (byte)e.Type; + e.Data.CopyTo(span[pos..]); + pos += e.Data.Length; + } + + // Append leaderTerm uvarint. + // Go: server/raft.go:2709 — buf = append(buf, lterm...) + ltermBuf[..ltermLen].CopyTo(span[pos..]); + + return buf; + } + + /// + /// Decodes an AppendEntry from a span. Throws + /// if the buffer is shorter than the minimum header length or malformed. + /// Go: server/raft.go:2714-2746 — decodeAppendEntry() + /// + public static RaftAppendEntryWire Decode(ReadOnlySpan msg) + { + if (msg.Length < RaftWireConstants.AppendEntryBaseLen) + throw new ArgumentException( + $"AppendEntry requires at least {RaftWireConstants.AppendEntryBaseLen} bytes, got {msg.Length}.", + nameof(msg)); + + // Go: server/raft.go:2721 — ae := newAppendEntry(string(msg[:idLen]), ...) + var leaderId = RaftWireHelpers.ReadId(msg[0..]); + var term = BinaryPrimitives.ReadUInt64LittleEndian(msg[8..]); + var commit = BinaryPrimitives.ReadUInt64LittleEndian(msg[16..]); + var pterm = BinaryPrimitives.ReadUInt64LittleEndian(msg[24..]); + var pindex = BinaryPrimitives.ReadUInt64LittleEndian(msg[32..]); + + // Go: server/raft.go:2725 — ne, ri := int(le.Uint16(msg[40:])), uint64(42) + var entryCount = BinaryPrimitives.ReadUInt16LittleEndian(msg[40..]); + var entries = new List(entryCount); + var ri = RaftWireConstants.AppendEntryBaseLen; + + // Go: server/raft.go:2726-2737 — decode entries loop + for (var i = 0; i < entryCount; i++) + { + if (ri >= msg.Length - 1) + throw new ArgumentException("AppendEntry buffer truncated while reading entries.", nameof(msg)); + + var ml = (int)BinaryPrimitives.ReadUInt32LittleEndian(msg[ri..]); + ri += 4; + + if (ml <= 0 || ri + ml > msg.Length) + throw new ArgumentException("AppendEntry entry size is out of bounds.", nameof(msg)); + + var entryType = (RaftEntryType)msg[ri]; + var data = msg[(ri + 1)..(ri + ml)].ToArray(); + entries.Add(new RaftEntryWire(entryType, data)); + ri += ml; + } + + // Decode optional leaderTerm uvarint from tail bytes. + // Go: server/raft.go:2739-2743 — if lterm, n := binary.Uvarint(msg[ri:]); n > 0 ... + ulong lterm = 0; + if (ri < msg.Length) + RaftWireHelpers.ReadUvarint(msg[ri..], out lterm); + + return new RaftAppendEntryWire( + LeaderId: leaderId, + Term: term, + Commit: commit, + PrevTerm: pterm, + PrevIndex: pindex, + Entries: entries, + LeaderTerm: lterm); + } +} + +/// +/// Binary wire encoding of a RAFT AppendEntryResponse. +/// Fixed 25-byte layout (little-endian): +/// [0..7] term uint64 +/// [8..15] index uint64 +/// [16..23] peerId 8-byte ASCII, zero-padded +/// [24] success 0 or 1 +/// Go: server/raft.go:2760-2817 (appendEntryResponse struct, encode, decodeAppendEntryResponse) +/// +public readonly record struct RaftAppendEntryResponseWire( + ulong Term, + ulong Index, + string PeerId, + bool Success) +{ + /// + /// Encodes this AppendEntryResponse to a 25-byte buffer. + /// Go: server/raft.go:2777-2794 — appendEntryResponse.encode() + /// + public byte[] Encode() + { + var buf = new byte[RaftWireConstants.AppendEntryResponseLen]; + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term); + BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8), Index); + RaftWireHelpers.WriteId(buf.AsSpan(16), PeerId); + buf[24] = Success ? (byte)1 : (byte)0; + return buf; + } + + /// + /// Decodes an AppendEntryResponse from a span. Throws + /// if the span is not exactly 25 bytes. + /// Go: server/raft.go:2799-2817 — decodeAppendEntryResponse() + /// + public static RaftAppendEntryResponseWire Decode(ReadOnlySpan msg) + { + if (msg.Length != RaftWireConstants.AppendEntryResponseLen) + throw new ArgumentException( + $"AppendEntryResponse requires exactly {RaftWireConstants.AppendEntryResponseLen} bytes, got {msg.Length}.", + nameof(msg)); + + return new RaftAppendEntryResponseWire( + Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]), + Index: BinaryPrimitives.ReadUInt64LittleEndian(msg[8..]), + PeerId: RaftWireHelpers.ReadId(msg[16..]), + // Go: server/raft.go:2815 — ar.success = msg[24] == 1 + Success: msg[24] == 1); + } +} + +/// +/// Shared encoding helpers for all RAFT wire format types. +/// +internal static class RaftWireHelpers +{ + /// + /// Writes a peer/leader ID to an 8-byte span. IDs shorter than 8 bytes are + /// zero-padded; IDs longer than 8 bytes are silently truncated (matching Go's + /// copy(buf[:idLen], id) semantics). + /// Go: server/raft.go:2693 — copy(buf[:idLen], ae.leader) + /// + public static void WriteId(Span dest, string id) + { + // Zero-fill the 8-byte slot first. + dest[..RaftWireConstants.IdLen].Clear(); + var bytes = Encoding.ASCII.GetBytes(id); + var copyLen = Math.Min(bytes.Length, RaftWireConstants.IdLen); + bytes.AsSpan(0, copyLen).CopyTo(dest); + } + + /// + /// Reads a peer/leader ID from an 8-byte span, trimming trailing null bytes so + /// that zero-padded IDs decode back to their original string. + /// Go: server/raft.go:4581 — string(copyBytes(msg[24:24+idLen])) + /// + public static string ReadId(ReadOnlySpan src) + { + var idBytes = src[..RaftWireConstants.IdLen]; + var len = idBytes.Length; + while (len > 0 && idBytes[len - 1] == 0) + len--; + return Encoding.ASCII.GetString(idBytes[..len]); + } + + /// + /// Writes a uint64 as a uvarint into and returns the + /// number of bytes written (1-10). + /// Go: server/raft.go:2682 — binary.PutUvarint(_lterm[:], ae.lterm) + /// + public static int WriteUvarint(Span buf, ulong value) + { + var pos = 0; + while (value > 0x7F) + { + buf[pos++] = (byte)((value & 0x7F) | 0x80); + value >>= 7; + } + buf[pos++] = (byte)value; + return pos; + } + + /// + /// Reads a uvarint from into + /// and returns the number of bytes consumed (0 on overflow or empty input). + /// Go: server/raft.go:2740 — binary.Uvarint(msg[ri:]) + /// + public static int ReadUvarint(ReadOnlySpan buf, out ulong value) + { + value = 0; + var shift = 0; + for (var i = 0; i < buf.Length && i < 10; i++) + { + var b = buf[i]; + value |= ((ulong)(b & 0x7F)) << shift; + if ((b & 0x80) == 0) + return i + 1; + shift += 7; + } + value = 0; + return 0; // overflow or empty + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamServiceOrchestrationTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamServiceOrchestrationTests.cs new file mode 100644 index 0000000..352a218 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamServiceOrchestrationTests.cs @@ -0,0 +1,242 @@ +// Ported from golang/nats-server/server/jetstream.go:414-523 (enableJetStream) +// Tests for JetStreamService lifecycle orchestration: store directory creation, +// API subject registration, configuration property exposure, and dispose semantics. + +using NATS.Server.Configuration; +using NATS.Server.JetStream; + +namespace NATS.Server.Tests.JetStream; + +public sealed class JetStreamServiceOrchestrationTests : IDisposable +{ + private readonly List _tempDirs = []; + + private string MakeTempDir() + { + var path = Path.Combine(Path.GetTempPath(), "nats-js-test-" + Guid.NewGuid().ToString("N")); + _tempDirs.Add(path); + return path; + } + + public void Dispose() + { + foreach (var dir in _tempDirs) + { + if (Directory.Exists(dir)) + Directory.Delete(dir, recursive: true); + } + } + + // Go: enableJetStream — jetstream.go:414 — happy path creates store dir and marks running + [Fact] + public async Task StartAsync_creates_store_directory_and_marks_running() + { + var storeDir = MakeTempDir(); + var options = new JetStreamOptions { StoreDir = storeDir }; + await using var svc = new JetStreamService(options); + + Directory.Exists(storeDir).ShouldBeFalse("directory must not exist before start"); + + await svc.StartAsync(CancellationToken.None); + + svc.IsRunning.ShouldBeTrue(); + Directory.Exists(storeDir).ShouldBeTrue("StartAsync must create the store directory"); + } + + // Go: enableJetStream — jetstream.go:430 — existing dir is accepted without error + [Fact] + public async Task StartAsync_accepts_preexisting_store_directory() + { + var storeDir = MakeTempDir(); + Directory.CreateDirectory(storeDir); + var options = new JetStreamOptions { StoreDir = storeDir }; + await using var svc = new JetStreamService(options); + + await svc.StartAsync(CancellationToken.None); + + svc.IsRunning.ShouldBeTrue(); + Directory.Exists(storeDir).ShouldBeTrue(); + } + + // Go: enableJetStream — memory-only mode when StoreDir is empty + [Fact] + public async Task StartAsync_with_empty_StoreDir_starts_in_memory_only_mode() + { + var options = new JetStreamOptions { StoreDir = string.Empty }; + await using var svc = new JetStreamService(options); + + await svc.StartAsync(CancellationToken.None); + + svc.IsRunning.ShouldBeTrue(); + } + + // Go: setJetStreamExportSubs — jetstream.go:489 — all $JS.API subjects registered + [Fact] + public async Task RegisteredApiSubjects_contains_expected_subjects_after_start() + { + var options = new JetStreamOptions(); + await using var svc = new JetStreamService(options); + + await svc.StartAsync(CancellationToken.None); + + var subjects = svc.RegisteredApiSubjects; + subjects.ShouldNotBeEmpty(); + subjects.ShouldContain("$JS.API.>"); + subjects.ShouldContain("$JS.API.INFO"); + subjects.ShouldContain("$JS.API.META.LEADER.STEPDOWN"); + subjects.ShouldContain("$JS.API.STREAM.NAMES"); + subjects.ShouldContain("$JS.API.STREAM.LIST"); + } + + // Go: setJetStreamExportSubs — all consumer-related wildcards registered + [Fact] + public async Task RegisteredApiSubjects_includes_consumer_and_stream_wildcard_subjects() + { + var options = new JetStreamOptions(); + await using var svc = new JetStreamService(options); + + await svc.StartAsync(CancellationToken.None); + + var subjects = svc.RegisteredApiSubjects; + + // Stream management + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.CREATE."), "stream create wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.DELETE."), "stream delete wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.INFO."), "stream info wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.UPDATE."), "stream update wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.PURGE."), "stream purge wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.MSG.GET."), "stream msg get wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.MSG.DELETE."), "stream msg delete wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.SNAPSHOT."), "stream snapshot wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.RESTORE."), "stream restore wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.LEADER.STEPDOWN."), "stream leader stepdown wildcard"); + + // Consumer management + subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.CREATE."), "consumer create wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.DELETE."), "consumer delete wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.INFO."), "consumer info wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.NAMES."), "consumer names wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.LIST."), "consumer list wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.PAUSE."), "consumer pause wildcard"); + subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.MSG.NEXT."), "consumer msg next wildcard"); + + // Direct get + subjects.ShouldContain(s => s.StartsWith("$JS.API.DIRECT.GET."), "direct get wildcard"); + } + + // RegisteredApiSubjects should be empty before start + [Fact] + public void RegisteredApiSubjects_is_empty_before_start() + { + var options = new JetStreamOptions(); + var svc = new JetStreamService(options); + + svc.RegisteredApiSubjects.ShouldBeEmpty(); + } + + // Go: shutdown path — DisposeAsync clears subjects and marks not running + [Fact] + public async Task DisposeAsync_clears_subjects_and_marks_not_running() + { + var options = new JetStreamOptions(); + var svc = new JetStreamService(options); + + await svc.StartAsync(CancellationToken.None); + svc.IsRunning.ShouldBeTrue(); + svc.RegisteredApiSubjects.ShouldNotBeEmpty(); + + await svc.DisposeAsync(); + + svc.IsRunning.ShouldBeFalse(); + svc.RegisteredApiSubjects.ShouldBeEmpty(); + } + + // MaxStreams and MaxConsumers reflect config values + [Fact] + public async Task MaxStreams_and_MaxConsumers_reflect_config_values() + { + var options = new JetStreamOptions + { + MaxStreams = 100, + MaxConsumers = 500, + }; + await using var svc = new JetStreamService(options); + + await svc.StartAsync(CancellationToken.None); + + svc.MaxStreams.ShouldBe(100); + svc.MaxConsumers.ShouldBe(500); + } + + // MaxMemory and MaxStore reflect config values + [Fact] + public async Task MaxMemory_and_MaxStore_reflect_config_values() + { + var options = new JetStreamOptions + { + MaxMemoryStore = 1_073_741_824L, // 1 GiB + MaxFileStore = 10_737_418_240L, // 10 GiB + }; + await using var svc = new JetStreamService(options); + + await svc.StartAsync(CancellationToken.None); + + svc.MaxMemory.ShouldBe(1_073_741_824L); + svc.MaxStore.ShouldBe(10_737_418_240L); + } + + // Default config values are zero (unlimited) + [Fact] + public void Default_config_values_are_unlimited_zero() + { + var options = new JetStreamOptions(); + var svc = new JetStreamService(options); + + svc.MaxStreams.ShouldBe(0); + svc.MaxConsumers.ShouldBe(0); + svc.MaxMemory.ShouldBe(0L); + svc.MaxStore.ShouldBe(0L); + } + + // Go: enableJetStream idempotency — double-start is safe (not an error) + [Fact] + public async Task Double_start_is_idempotent() + { + var options = new JetStreamOptions(); + await using var svc = new JetStreamService(options); + + await svc.StartAsync(CancellationToken.None); + var subjectCountAfterFirst = svc.RegisteredApiSubjects.Count; + + // Second start must not throw and must not duplicate subjects + await svc.StartAsync(CancellationToken.None); + + svc.IsRunning.ShouldBeTrue(); + svc.RegisteredApiSubjects.Count.ShouldBe(subjectCountAfterFirst); + } + + // Store directory is created with a nested path (MkdirAll semantics) + [Fact] + public async Task StartAsync_creates_nested_store_directory() + { + var baseDir = MakeTempDir(); + var nestedDir = Path.Combine(baseDir, "level1", "level2", "jetstream"); + var options = new JetStreamOptions { StoreDir = nestedDir }; + await using var svc = new JetStreamService(options); + + await svc.StartAsync(CancellationToken.None); + + svc.IsRunning.ShouldBeTrue(); + Directory.Exists(nestedDir).ShouldBeTrue("nested store directory must be created"); + } + + // Service is not running before start + [Fact] + public void IsRunning_is_false_before_start() + { + var options = new JetStreamOptions(); + var svc = new JetStreamService(options); + + svc.IsRunning.ShouldBeFalse(); + } +} diff --git a/tests/NATS.Server.Tests/Raft/RaftBinaryWireFormatTests.cs b/tests/NATS.Server.Tests/Raft/RaftBinaryWireFormatTests.cs new file mode 100644 index 0000000..9e05894 --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftBinaryWireFormatTests.cs @@ -0,0 +1,519 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +/// +/// Binary wire format encoding/decoding tests for all RAFT RPC types. +/// These validate exact byte-for-byte fidelity with Go's raft.go encoding. +/// Go reference: golang/nats-server/server/raft.go lines 2662-2796 (AppendEntry), +/// 4560-4768 (vote types). +/// +public class RaftBinaryWireFormatTests +{ + // --------------------------------------------------------------------------- + // VoteRequest + // --------------------------------------------------------------------------- + + // Go: server/raft.go:4560-4568 — voteRequest.encode() + // Go: server/raft.go:4571-4583 — decodeVoteRequest() + [Fact] + public void VoteRequest_round_trip_encode_decode() + { + var original = new RaftVoteRequestWire( + Term: 7, + LastTerm: 3, + LastIndex: 42, + CandidateId: "peer0001"); + + var encoded = original.Encode(); + encoded.Length.ShouldBe(RaftWireConstants.VoteRequestLen); // 32 bytes + + var decoded = RaftVoteRequestWire.Decode(encoded); + decoded.Term.ShouldBe(7UL); + decoded.LastTerm.ShouldBe(3UL); + decoded.LastIndex.ShouldBe(42UL); + decoded.CandidateId.ShouldBe("peer0001"); + } + + [Fact] + public void VoteRequest_bytes_are_little_endian() + { + var req = new RaftVoteRequestWire(Term: 1, LastTerm: 0, LastIndex: 0, CandidateId: ""); + var bytes = req.Encode(); + // term = 1 in little-endian: [1, 0, 0, 0, 0, 0, 0, 0] + // Go: server/raft.go:4563 — le.PutUint64(buf[0:], vr.term) + bytes[0].ShouldBe((byte)1); + bytes[1].ShouldBe((byte)0); + } + + [Fact] + public void VoteRequest_zero_values_encode_to_zeroed_buffer() + { + var req = new RaftVoteRequestWire(Term: 0, LastTerm: 0, LastIndex: 0, CandidateId: ""); + var bytes = req.Encode(); + bytes.Length.ShouldBe(32); + bytes.ShouldAllBe(b => b == 0); + } + + [Fact] + public void VoteRequest_large_term_round_trips() + { + var req = new RaftVoteRequestWire( + Term: ulong.MaxValue, + LastTerm: ulong.MaxValue - 1, + LastIndex: ulong.MaxValue - 2, + CandidateId: "node1234"); + + var decoded = RaftVoteRequestWire.Decode(req.Encode()); + decoded.Term.ShouldBe(ulong.MaxValue); + decoded.LastTerm.ShouldBe(ulong.MaxValue - 1); + decoded.LastIndex.ShouldBe(ulong.MaxValue - 2); + decoded.CandidateId.ShouldBe("node1234"); + } + + [Fact] + public void VoteRequest_short_buffer_throws_ArgumentException() + { + var shortBuffer = new byte[RaftWireConstants.VoteRequestLen - 1]; + Should.Throw(() => RaftVoteRequestWire.Decode(shortBuffer)); + } + + [Fact] + public void VoteRequest_long_buffer_throws_ArgumentException() + { + var longBuffer = new byte[RaftWireConstants.VoteRequestLen + 1]; + Should.Throw(() => RaftVoteRequestWire.Decode(longBuffer)); + } + + [Fact] + public void VoteRequest_candidate_id_truncated_to_8_bytes() + { + // IDs longer than 8 chars are silently truncated (Go copy semantics). + // Go: server/raft.go:4566 — copy(buf[24:24+idLen], vr.candidate) + var req = new RaftVoteRequestWire( + Term: 1, LastTerm: 0, LastIndex: 0, + CandidateId: "abcdefghXXXXXXXX"); // 16 chars; only first 8 kept + + var bytes = req.Encode(); + // Check that the ID field contains only the first 8 chars. + var idBytes = bytes[24..32]; + System.Text.Encoding.ASCII.GetString(idBytes).ShouldBe("abcdefgh"); + } + + [Fact] + public void VoteRequest_short_candidate_id_zero_padded() + { + var req = new RaftVoteRequestWire( + Term: 1, LastTerm: 0, LastIndex: 0, CandidateId: "abc"); + + var bytes = req.Encode(); + bytes[27].ShouldBe((byte)0); // byte 3..7 should be zero + bytes[28].ShouldBe((byte)0); + + // Decode should recover the original 3-char ID. + var decoded = RaftVoteRequestWire.Decode(bytes); + decoded.CandidateId.ShouldBe("abc"); + } + + // --------------------------------------------------------------------------- + // VoteResponse + // --------------------------------------------------------------------------- + + // Go: server/raft.go:4739-4751 — voteResponse.encode() + // Go: server/raft.go:4753-4762 — decodeVoteResponse() + [Fact] + public void VoteResponse_granted_true_round_trip() + { + var resp = new RaftVoteResponseWire(Term: 5, PeerId: "peer0002", Granted: true); + var decoded = RaftVoteResponseWire.Decode(resp.Encode()); + + decoded.Term.ShouldBe(5UL); + decoded.PeerId.ShouldBe("peer0002"); + decoded.Granted.ShouldBeTrue(); + decoded.Empty.ShouldBeFalse(); + } + + [Fact] + public void VoteResponse_granted_false_round_trip() + { + var resp = new RaftVoteResponseWire(Term: 3, PeerId: "peer0003", Granted: false); + var decoded = RaftVoteResponseWire.Decode(resp.Encode()); + + decoded.Granted.ShouldBeFalse(); + decoded.PeerId.ShouldBe("peer0003"); + } + + [Fact] + public void VoteResponse_empty_flag_round_trip() + { + // Go: server/raft.go:4746-4748 — buf[16] |= 2 when empty + var resp = new RaftVoteResponseWire(Term: 1, PeerId: "p1", Granted: false, Empty: true); + var decoded = RaftVoteResponseWire.Decode(resp.Encode()); + + decoded.Empty.ShouldBeTrue(); + decoded.Granted.ShouldBeFalse(); + } + + [Fact] + public void VoteResponse_both_flags_set() + { + var resp = new RaftVoteResponseWire(Term: 1, PeerId: "p1", Granted: true, Empty: true); + var bytes = resp.Encode(); + + // Go: server/raft.go:4744-4748 — bit 0 = granted, bit 1 = empty + (bytes[16] & 1).ShouldBe(1); // granted + (bytes[16] & 2).ShouldBe(2); // empty + + var decoded = RaftVoteResponseWire.Decode(bytes); + decoded.Granted.ShouldBeTrue(); + decoded.Empty.ShouldBeTrue(); + } + + [Fact] + public void VoteResponse_fixed_17_bytes() + { + var resp = new RaftVoteResponseWire(Term: 10, PeerId: "peer0001", Granted: true); + resp.Encode().Length.ShouldBe(RaftWireConstants.VoteResponseLen); // 17 + } + + [Fact] + public void VoteResponse_short_buffer_throws_ArgumentException() + { + var shortBuffer = new byte[RaftWireConstants.VoteResponseLen - 1]; + Should.Throw(() => RaftVoteResponseWire.Decode(shortBuffer)); + } + + [Fact] + public void VoteResponse_peer_id_truncated_to_8_bytes() + { + // Go: server/raft.go:4743 — copy(buf[8:], vr.peer) + var resp = new RaftVoteResponseWire( + Term: 1, PeerId: "longpeernamethatexceeds8chars", Granted: true); + var bytes = resp.Encode(); + + // Bytes [8..15] hold the peer ID — only first 8 chars fit. + var idBytes = bytes[8..16]; + System.Text.Encoding.ASCII.GetString(idBytes).ShouldBe("longpeer"); + } + + // --------------------------------------------------------------------------- + // AppendEntry — zero entries + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2662-2711 — appendEntry.encode() + // Go: server/raft.go:2714-2746 — decodeAppendEntry() + [Fact] + public void AppendEntry_zero_entries_round_trip() + { + var ae = new RaftAppendEntryWire( + LeaderId: "lead0001", + Term: 10, + Commit: 8, + PrevTerm: 9, + PrevIndex: 7, + Entries: [], + LeaderTerm: 0); + + var encoded = ae.Encode(); + // Base length + 1-byte uvarint(0) for lterm. + // Go: server/raft.go:2681-2683 — lterm uvarint always appended + encoded.Length.ShouldBe(RaftWireConstants.AppendEntryBaseLen + 1); + + var decoded = RaftAppendEntryWire.Decode(encoded); + decoded.LeaderId.ShouldBe("lead0001"); + decoded.Term.ShouldBe(10UL); + decoded.Commit.ShouldBe(8UL); + decoded.PrevTerm.ShouldBe(9UL); + decoded.PrevIndex.ShouldBe(7UL); + decoded.Entries.Count.ShouldBe(0); + decoded.LeaderTerm.ShouldBe(0UL); + } + + [Fact] + public void AppendEntry_base_layout_at_correct_offsets() + { + // Go: server/raft.go:2693-2698 — exact layout: + // [0..7]=leader [8..15]=term [16..23]=commit [24..31]=pterm [32..39]=pindex [40..41]=entryCount + var ae = new RaftAppendEntryWire( + LeaderId: "AAAAAAAA", // 0x41 x 8 + Term: 1, + Commit: 2, + PrevTerm: 3, + PrevIndex: 4, + Entries: []); + + var bytes = ae.Encode(); + + // leader bytes + bytes[0].ShouldBe((byte)'A'); + bytes[7].ShouldBe((byte)'A'); + + // term = 1 LE + bytes[8].ShouldBe((byte)1); + bytes[9].ShouldBe((byte)0); + + // commit = 2 LE + bytes[16].ShouldBe((byte)2); + + // entryCount = 0 + bytes[40].ShouldBe((byte)0); + bytes[41].ShouldBe((byte)0); + } + + // --------------------------------------------------------------------------- + // AppendEntry — single entry + // --------------------------------------------------------------------------- + + [Fact] + public void AppendEntry_single_entry_round_trip() + { + var data = "hello world"u8.ToArray(); + var entry = new RaftEntryWire(RaftEntryType.Normal, data); + + var ae = new RaftAppendEntryWire( + LeaderId: "leader01", + Term: 5, + Commit: 3, + PrevTerm: 4, + PrevIndex: 2, + Entries: [entry]); + + var decoded = RaftAppendEntryWire.Decode(ae.Encode()); + decoded.Entries.Count.ShouldBe(1); + decoded.Entries[0].Type.ShouldBe(RaftEntryType.Normal); + decoded.Entries[0].Data.ShouldBe(data); + } + + [Fact] + public void AppendEntry_entry_size_field_equals_1_plus_data_length() + { + // Go: server/raft.go:2702 — le.AppendUint32(buf, uint32(1+len(e.Data))) + var data = new byte[10]; + var entry = new RaftEntryWire(RaftEntryType.PeerState, data); + + var ae = new RaftAppendEntryWire( + LeaderId: "ld", Term: 1, Commit: 0, PrevTerm: 0, PrevIndex: 0, + Entries: [entry]); + + var bytes = ae.Encode(); + + // Entry starts at offset 42 (appendEntryBaseLen). + // First 4 bytes are the uint32 size = 1 + 10 = 11. + var sizeField = System.Buffers.Binary.BinaryPrimitives.ReadUInt32LittleEndian(bytes.AsSpan(42)); + sizeField.ShouldBe(11u); + + // Byte at offset 46 is the entry type. + bytes[46].ShouldBe((byte)RaftEntryType.PeerState); + } + + // --------------------------------------------------------------------------- + // AppendEntry — multiple entries + // --------------------------------------------------------------------------- + + [Fact] + public void AppendEntry_multiple_entries_round_trip() + { + var entries = new RaftEntryWire[] + { + new(RaftEntryType.Normal, "first"u8.ToArray()), + new(RaftEntryType.AddPeer, "second"u8.ToArray()), + new(RaftEntryType.RemovePeer, "third"u8.ToArray()), + }; + + var ae = new RaftAppendEntryWire( + LeaderId: "lead0001", + Term: 20, + Commit: 15, + PrevTerm: 19, + PrevIndex: 14, + Entries: entries); + + var decoded = RaftAppendEntryWire.Decode(ae.Encode()); + decoded.Entries.Count.ShouldBe(3); + + decoded.Entries[0].Type.ShouldBe(RaftEntryType.Normal); + decoded.Entries[0].Data.ShouldBe("first"u8.ToArray()); + + decoded.Entries[1].Type.ShouldBe(RaftEntryType.AddPeer); + decoded.Entries[1].Data.ShouldBe("second"u8.ToArray()); + + decoded.Entries[2].Type.ShouldBe(RaftEntryType.RemovePeer); + decoded.Entries[2].Data.ShouldBe("third"u8.ToArray()); + } + + [Fact] + public void AppendEntry_50_entries_preserve_order() + { + var entries = Enumerable.Range(0, 50) + .Select(i => new RaftEntryWire(RaftEntryType.Normal, [(byte)i])) + .ToArray(); + + var ae = new RaftAppendEntryWire( + LeaderId: "lead0001", Term: 1, Commit: 0, PrevTerm: 0, PrevIndex: 0, + Entries: entries); + + var decoded = RaftAppendEntryWire.Decode(ae.Encode()); + decoded.Entries.Count.ShouldBe(50); + + for (var i = 0; i < 50; i++) + decoded.Entries[i].Data[0].ShouldBe((byte)i); + } + + [Fact] + public void AppendEntry_entry_with_empty_data_round_trips() + { + var entry = new RaftEntryWire(RaftEntryType.LeaderTransfer, []); + var ae = new RaftAppendEntryWire( + LeaderId: "ld", Term: 1, Commit: 0, PrevTerm: 0, PrevIndex: 0, + Entries: [entry]); + + var decoded = RaftAppendEntryWire.Decode(ae.Encode()); + decoded.Entries.Count.ShouldBe(1); + decoded.Entries[0].Data.Length.ShouldBe(0); + decoded.Entries[0].Type.ShouldBe(RaftEntryType.LeaderTransfer); + } + + // --------------------------------------------------------------------------- + // AppendEntry — leaderTerm (uvarint tail) + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2709 — buf = append(buf, lterm...) + // Go: server/raft.go:2740-2743 — if lterm, n := binary.Uvarint(msg[ri:]); n > 0 ... + [Theory] + [InlineData(0UL)] + [InlineData(1UL)] + [InlineData(127UL)] + [InlineData(128UL)] + [InlineData(ulong.MaxValue)] + public void AppendEntry_leader_term_uvarint_round_trips(ulong lterm) + { + var ae = new RaftAppendEntryWire( + LeaderId: "lead0001", Term: 5, Commit: 3, PrevTerm: 4, PrevIndex: 2, + Entries: [], + LeaderTerm: lterm); + + var decoded = RaftAppendEntryWire.Decode(ae.Encode()); + decoded.LeaderTerm.ShouldBe(lterm); + } + + // --------------------------------------------------------------------------- + // AppendEntry — error cases + // --------------------------------------------------------------------------- + + [Fact] + public void AppendEntry_short_buffer_throws_ArgumentException() + { + // Buffer smaller than appendEntryBaseLen (42 bytes). + var shortBuffer = new byte[RaftWireConstants.AppendEntryBaseLen - 1]; + Should.Throw(() => RaftAppendEntryWire.Decode(shortBuffer)); + } + + // --------------------------------------------------------------------------- + // AppendEntryResponse + // --------------------------------------------------------------------------- + + // Go: server/raft.go:2777-2794 — appendEntryResponse.encode() + // Go: server/raft.go:2799-2817 — decodeAppendEntryResponse() + [Fact] + public void AppendEntryResponse_success_true_round_trip() + { + var resp = new RaftAppendEntryResponseWire( + Term: 12, Index: 99, PeerId: "follwr01", Success: true); + + var encoded = resp.Encode(); + encoded.Length.ShouldBe(RaftWireConstants.AppendEntryResponseLen); // 25 + + var decoded = RaftAppendEntryResponseWire.Decode(encoded); + decoded.Term.ShouldBe(12UL); + decoded.Index.ShouldBe(99UL); + decoded.PeerId.ShouldBe("follwr01"); + decoded.Success.ShouldBeTrue(); + } + + [Fact] + public void AppendEntryResponse_success_false_round_trip() + { + var resp = new RaftAppendEntryResponseWire( + Term: 3, Index: 1, PeerId: "follwr02", Success: false); + + var decoded = RaftAppendEntryResponseWire.Decode(resp.Encode()); + decoded.Success.ShouldBeFalse(); + decoded.PeerId.ShouldBe("follwr02"); + } + + [Fact] + public void AppendEntryResponse_success_byte_is_0_or_1() + { + // Go: server/raft.go:2815 — ar.success = msg[24] == 1 + var yes = new RaftAppendEntryResponseWire(Term: 1, Index: 0, PeerId: "p", Success: true); + var no = new RaftAppendEntryResponseWire(Term: 1, Index: 0, PeerId: "p", Success: false); + + yes.Encode()[24].ShouldBe((byte)1); + no.Encode()[24].ShouldBe((byte)0); + } + + [Fact] + public void AppendEntryResponse_layout_at_correct_offsets() + { + // Go: server/raft.go:2784-2792 — exact layout: + // [0..7]=term [8..15]=index [16..23]=peer [24]=success + var resp = new RaftAppendEntryResponseWire( + Term: 1, Index: 2, PeerId: "BBBBBBBB", Success: true); + var bytes = resp.Encode(); + + bytes[0].ShouldBe((byte)1); // term LE + bytes[8].ShouldBe((byte)2); // index LE + bytes[16].ShouldBe((byte)'B'); // peer[0] + bytes[24].ShouldBe((byte)1); // success = 1 + } + + [Fact] + public void AppendEntryResponse_short_buffer_throws_ArgumentException() + { + var shortBuffer = new byte[RaftWireConstants.AppendEntryResponseLen - 1]; + Should.Throw(() => RaftAppendEntryResponseWire.Decode(shortBuffer)); + } + + [Fact] + public void AppendEntryResponse_long_buffer_throws_ArgumentException() + { + var longBuffer = new byte[RaftWireConstants.AppendEntryResponseLen + 1]; + Should.Throw(() => RaftAppendEntryResponseWire.Decode(longBuffer)); + } + + [Fact] + public void AppendEntryResponse_peer_id_truncated_to_8_bytes() + { + // Go: server/raft.go:2787 — copy(buf[16:16+idLen], ar.peer) + var resp = new RaftAppendEntryResponseWire( + Term: 1, Index: 0, + PeerId: "verylongpeeridthatexceeds8", Success: false); + + var bytes = resp.Encode(); + var idBytes = bytes[16..24]; + System.Text.Encoding.ASCII.GetString(idBytes).ShouldBe("verylong"); + } + + // --------------------------------------------------------------------------- + // Wire constant values + // --------------------------------------------------------------------------- + + [Fact] + public void Wire_constants_match_go_definitions() + { + // Go: server/raft.go:4558 — voteRequestLen = 24 + idLen = 32 + RaftWireConstants.VoteRequestLen.ShouldBe(32); + + // Go: server/raft.go:4737 — voteResponseLen = 8 + 8 + 1 = 17 + RaftWireConstants.VoteResponseLen.ShouldBe(17); + + // Go: server/raft.go:2660 — appendEntryBaseLen = idLen + 4*8 + 2 = 42 + RaftWireConstants.AppendEntryBaseLen.ShouldBe(42); + + // Go: server/raft.go:2757 — appendEntryResponseLen = 24 + 1 = 25 + RaftWireConstants.AppendEntryResponseLen.ShouldBe(25); + + // Go: server/raft.go:2756 — idLen = 8 + RaftWireConstants.IdLen.ShouldBe(8); + } +}