diff --git a/src/NATS.Server/Configuration/JetStreamOptions.cs b/src/NATS.Server/Configuration/JetStreamOptions.cs
index 1301f60..8c94d2c 100644
--- a/src/NATS.Server/Configuration/JetStreamOptions.cs
+++ b/src/NATS.Server/Configuration/JetStreamOptions.cs
@@ -1,8 +1,37 @@
namespace NATS.Server.Configuration;
+// Maps to Go's JetStreamConfig struct in server/opts.go and server/jetstream.go.
+// Controls the lifecycle parameters for the JetStream subsystem.
public sealed class JetStreamOptions
{
+ ///
+ /// Directory where JetStream persists stream data.
+ /// Maps to Go's JetStreamConfig.StoreDir (jetstream.go:enableJetStream:430).
+ /// An empty string disables file-backed persistence (memory-only mode).
+ ///
public string StoreDir { get; set; } = string.Empty;
+
+ ///
+ /// Maximum bytes of memory storage across all streams. 0 means unlimited.
+ /// Maps to Go's JetStreamConfig.MaxMemory (jetstream.go:enableJetStream:471).
+ ///
public long MaxMemoryStore { get; set; }
+
+ ///
+ /// Maximum bytes of file storage across all streams. 0 means unlimited.
+ /// Maps to Go's JetStreamConfig.MaxStore (jetstream.go:enableJetStream:472).
+ ///
public long MaxFileStore { get; set; }
+
+ ///
+ /// Maximum number of streams allowed. 0 means unlimited.
+ /// Maps to Go's JetStreamAccountLimits.MaxStreams (jetstream.go).
+ ///
+ public int MaxStreams { get; set; }
+
+ ///
+ /// Maximum number of consumers allowed across all streams. 0 means unlimited.
+ /// Maps to Go's JetStreamAccountLimits.MaxConsumers (jetstream.go).
+ ///
+ public int MaxConsumers { get; set; }
}
diff --git a/src/NATS.Server/JetStream/JetStreamService.cs b/src/NATS.Server/JetStream/JetStreamService.cs
index 56fbee6..f15639e 100644
--- a/src/NATS.Server/JetStream/JetStreamService.cs
+++ b/src/NATS.Server/JetStream/JetStreamService.cs
@@ -1,29 +1,148 @@
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
-using NATS.Server;
+using NATS.Server.JetStream.Api;
namespace NATS.Server.JetStream;
+// Maps to Go's enableJetStream() in server/jetstream.go:414-523.
+// Orchestrates the JetStream subsystem lifecycle: validates config, creates the
+// store directory, registers API subjects, and tears down cleanly on dispose.
public sealed class JetStreamService : IAsyncDisposable
{
+ // Full set of $JS.API.> subjects registered at startup.
+ // Mirrors the subjects registered by setJetStreamExportSubs() in
+ // golang/nats-server/server/jetstream.go and jsApiSubs in jetstream_api.go.
+ private static readonly IReadOnlyList AllApiSubjects =
+ [
+ "$JS.API.>",
+ JetStreamApiSubjects.Info,
+ JetStreamApiSubjects.StreamCreate + "*",
+ JetStreamApiSubjects.StreamUpdate + "*",
+ JetStreamApiSubjects.StreamDelete + "*",
+ JetStreamApiSubjects.StreamInfo + "*",
+ JetStreamApiSubjects.StreamNames,
+ JetStreamApiSubjects.StreamList,
+ JetStreamApiSubjects.StreamPurge + "*",
+ JetStreamApiSubjects.StreamMessageGet + "*",
+ JetStreamApiSubjects.StreamMessageDelete + "*",
+ JetStreamApiSubjects.StreamSnapshot + "*",
+ JetStreamApiSubjects.StreamRestore + "*",
+ JetStreamApiSubjects.StreamLeaderStepdown + "*",
+ JetStreamApiSubjects.ConsumerCreate + "*",
+ JetStreamApiSubjects.ConsumerDelete + "*.*",
+ JetStreamApiSubjects.ConsumerInfo + "*.*",
+ JetStreamApiSubjects.ConsumerNames + "*",
+ JetStreamApiSubjects.ConsumerList + "*",
+ JetStreamApiSubjects.ConsumerPause + "*.*",
+ JetStreamApiSubjects.ConsumerNext + "*.*",
+ JetStreamApiSubjects.DirectGet + "*",
+ JetStreamApiSubjects.MetaLeaderStepdown,
+ ];
+
private readonly JetStreamOptions _options;
+ private readonly ILogger _logger;
+ private List _registeredApiSubjects = [];
+
public InternalClient? InternalClient { get; }
public bool IsRunning { get; private set; }
+ ///
+ /// The API subjects registered with the server after a successful StartAsync.
+ /// Empty before start or after dispose.
+ ///
+ public IReadOnlyList RegisteredApiSubjects => _registeredApiSubjects;
+
+ ///
+ /// Maximum streams limit from configuration. 0 means unlimited.
+ /// Maps to Go's JetStreamAccountLimits.MaxStreams.
+ ///
+ public int MaxStreams => _options.MaxStreams;
+
+ ///
+ /// Maximum consumers limit from configuration. 0 means unlimited.
+ /// Maps to Go's JetStreamAccountLimits.MaxConsumers.
+ ///
+ public int MaxConsumers => _options.MaxConsumers;
+
+ ///
+ /// Maximum memory store bytes from configuration. 0 means unlimited.
+ /// Maps to Go's JetStreamConfig.MaxMemory.
+ ///
+ public long MaxMemory => _options.MaxMemoryStore;
+
+ ///
+ /// Maximum file store bytes from configuration. 0 means unlimited.
+ /// Maps to Go's JetStreamConfig.MaxStore.
+ ///
+ public long MaxStore => _options.MaxFileStore;
+
public JetStreamService(JetStreamOptions options, InternalClient? internalClient = null)
+ : this(options, internalClient, NullLoggerFactory.Instance)
+ {
+ }
+
+ public JetStreamService(JetStreamOptions options, InternalClient? internalClient, ILoggerFactory loggerFactory)
{
_options = options;
InternalClient = internalClient;
+ _logger = loggerFactory.CreateLogger();
}
+ // Maps to Go's enableJetStream() in server/jetstream.go:414-523.
+ // Validates the store directory, creates it if absent, then registers all
+ // $JS.API.> subjects so inbound API messages can be routed.
public Task StartAsync(CancellationToken ct)
{
+ if (IsRunning)
+ {
+ _logger.LogDebug("JetStream is already running; ignoring duplicate StartAsync");
+ return Task.CompletedTask;
+ }
+
+ // Validate and create store directory when specified.
+ // Go: os.MkdirAll(cfg.StoreDir, defaultDirPerms) — jetstream.go:430-444.
+ if (!string.IsNullOrEmpty(_options.StoreDir))
+ {
+ if (Directory.Exists(_options.StoreDir))
+ {
+ _logger.LogDebug("JetStream store directory already exists: {StoreDir}", _options.StoreDir);
+ }
+ else
+ {
+ Directory.CreateDirectory(_options.StoreDir);
+ _logger.LogInformation("JetStream store directory created: {StoreDir}", _options.StoreDir);
+ }
+ }
+ else
+ {
+ _logger.LogInformation("JetStream running in memory-only mode (no StoreDir configured)");
+ }
+
+ // Register all $JS.API.> subjects.
+ // Go: setJetStreamExportSubs() — jetstream.go:489-494.
+ _registeredApiSubjects = [.. AllApiSubjects];
+
IsRunning = true;
+
+ _logger.LogInformation(
+ "JetStream started. MaxMemory={MaxMemory}, MaxStore={MaxStore}, MaxStreams={MaxStreams}, MaxConsumers={MaxConsumers}, RegisteredSubjects={Count}",
+ _options.MaxMemoryStore,
+ _options.MaxFileStore,
+ _options.MaxStreams,
+ _options.MaxConsumers,
+ _registeredApiSubjects.Count);
+
return Task.CompletedTask;
}
+ // Maps to Go's shutdown path in jetstream.go.
+ // Clears registered subjects and marks the service as not running.
public ValueTask DisposeAsync()
{
+ _registeredApiSubjects = [];
IsRunning = false;
+ _logger.LogInformation("JetStream stopped");
return ValueTask.CompletedTask;
}
}
diff --git a/src/NATS.Server/JetStream/Storage/AeadEncryptor.cs b/src/NATS.Server/JetStream/Storage/AeadEncryptor.cs
new file mode 100644
index 0000000..1bad564
--- /dev/null
+++ b/src/NATS.Server/JetStream/Storage/AeadEncryptor.cs
@@ -0,0 +1,165 @@
+// Reference: golang/nats-server/server/filestore.go
+// Go FileStore supports two AEAD ciphers:
+// - ChaCha20-Poly1305 (StoreCipher = ChaCha, filestore.go ~line 300)
+// - AES-256-GCM (StoreCipher = Aes, filestore.go ~line 310)
+// Both use a random 12-byte nonce prepended to the ciphertext.
+// Wire format: [12:nonce][16:tag][N:ciphertext].
+//
+// StoreCipher and StoreCompression enums are defined here.
+// FileStoreConfig.cs references them for FileStoreConfig.Cipher / .Compression.
+//
+// Key requirement: 32 bytes (256-bit) for both ciphers.
+
+using System.Security.Cryptography;
+
+namespace NATS.Server.JetStream.Storage;
+
+// Go: server/filestore.go:85
+///
+/// Selects the symmetric cipher used for block encryption.
+/// Mirrors Go's StoreCipher type (filestore.go:85).
+///
+public enum StoreCipher
+{
+ // Go: NoCipher — encryption disabled
+ NoCipher = 0,
+
+ // Go: ChaCha — ChaCha20-Poly1305
+ ChaCha = 1,
+
+ // Go: AES — AES-256-GCM
+ Aes = 2,
+}
+
+// Go: server/filestore.go:106
+///
+/// Selects the compression algorithm applied to message payloads.
+/// Mirrors Go's StoreCompression type (filestore.go:106).
+///
+public enum StoreCompression
+{
+ // Go: NoCompression — no compression applied
+ NoCompression = 0,
+
+ // Go: S2Compression — S2 (Snappy variant) block compression
+ S2Compression = 1,
+}
+
+///
+/// Provides AEAD encrypt/decrypt operations for FileStore payloads using
+/// ChaCha20-Poly1305 or AES-256-GCM, matching the Go server's encryption
+/// (filestore.go ~line 300-320).
+///
+internal static class AeadEncryptor
+{
+ /// Nonce size in bytes (96-bit / 12 bytes, standard for both ciphers).
+ public const int NonceSize = 12;
+
+ /// Authentication tag size in bytes (128-bit / 16 bytes).
+ public const int TagSize = 16;
+
+ /// Required key size in bytes (256-bit).
+ public const int KeySize = 32;
+
+ ///
+ /// Encrypts with the given
+ /// and .
+ ///
+ ///
+ /// Wire format: [12:nonce][16:tag][N:ciphertext]
+ ///
+ /// If key length is not 32 bytes.
+ /// If cipher is NoCipher or unknown.
+ public static byte[] Encrypt(ReadOnlySpan plaintext, byte[] key, StoreCipher cipher)
+ {
+ ValidateKey(key);
+
+ // Generate a random 12-byte nonce.
+ var nonce = new byte[NonceSize];
+ RandomNumberGenerator.Fill(nonce);
+
+ // Output: nonce (12) + tag (16) + ciphertext (N)
+ var output = new byte[NonceSize + TagSize + plaintext.Length];
+ nonce.CopyTo(output.AsSpan(0, NonceSize));
+
+ var tagDest = output.AsSpan(NonceSize, TagSize);
+ var ciphertextDest = output.AsSpan(NonceSize + TagSize, plaintext.Length);
+
+ switch (cipher)
+ {
+ case StoreCipher.ChaCha:
+ using (var chacha = new ChaCha20Poly1305(key))
+ {
+ chacha.Encrypt(nonce, plaintext, ciphertextDest, tagDest);
+ }
+ break;
+
+ case StoreCipher.Aes:
+ using (var aes = new AesGcm(key, TagSize))
+ {
+ aes.Encrypt(nonce, plaintext, ciphertextDest, tagDest);
+ }
+ break;
+
+ default:
+ throw new ArgumentOutOfRangeException(nameof(cipher), cipher,
+ "Cipher must be ChaCha or Aes for AEAD encryption.");
+ }
+
+ return output;
+ }
+
+ ///
+ /// Decrypts data produced by .
+ ///
+ /// Plaintext bytes.
+ /// If key length is not 32 bytes or data is too short.
+ /// If authentication tag verification fails.
+ public static byte[] Decrypt(ReadOnlySpan encrypted, byte[] key, StoreCipher cipher)
+ {
+ ValidateKey(key);
+
+ var minLength = NonceSize + TagSize;
+ if (encrypted.Length < minLength)
+ throw new ArgumentException(
+ $"Encrypted data is too short: {encrypted.Length} < {minLength}.",
+ nameof(encrypted));
+
+ var nonce = encrypted[..NonceSize];
+ var tag = encrypted.Slice(NonceSize, TagSize);
+ var ciphertext = encrypted[(NonceSize + TagSize)..];
+
+ var plaintext = new byte[ciphertext.Length];
+
+ switch (cipher)
+ {
+ case StoreCipher.ChaCha:
+ using (var chacha = new ChaCha20Poly1305(key))
+ {
+ chacha.Decrypt(nonce, ciphertext, tag, plaintext);
+ }
+ break;
+
+ case StoreCipher.Aes:
+ using (var aes = new AesGcm(key, TagSize))
+ {
+ aes.Decrypt(nonce, ciphertext, tag, plaintext);
+ }
+ break;
+
+ default:
+ throw new ArgumentOutOfRangeException(nameof(cipher), cipher,
+ "Cipher must be ChaCha or Aes for AEAD decryption.");
+ }
+
+ return plaintext;
+ }
+
+ private static void ValidateKey(byte[] key)
+ {
+ if (key is null || key.Length != KeySize)
+ throw new ArgumentException(
+ $"Encryption key must be exactly {KeySize} bytes (got {key?.Length ?? 0}).",
+ nameof(key));
+ }
+}
diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs
index 0c3419a..2518520 100644
--- a/src/NATS.Server/JetStream/Storage/FileStore.cs
+++ b/src/NATS.Server/JetStream/Storage/FileStore.cs
@@ -22,6 +22,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
private long _activeBlockBytes;
private long _writeOffset;
+ // Resolved at construction time: which format family to use.
+ private readonly bool _useS2; // true → S2Codec (FSV2 compression path)
+ private readonly bool _useAead; // true → AeadEncryptor (FSV2 encryption path)
+
public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1);
public bool UsedIndexManifestOnStartup { get; private set; }
@@ -31,6 +35,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
if (_options.BlockSizeBytes <= 0)
_options.BlockSizeBytes = 64 * 1024;
+ // Determine which format path is active.
+ _useS2 = _options.Compression == StoreCompression.S2Compression;
+ _useAead = _options.Cipher != StoreCipher.NoCipher;
+
Directory.CreateDirectory(options.Directory);
_dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
_manifestPath = Path.Combine(options.Directory, _options.IndexManifestFileName);
@@ -344,37 +352,68 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
RewriteDataFile();
}
- private sealed class FileRecord
- {
- public ulong Sequence { get; init; }
- public string? Subject { get; init; }
- public string? PayloadBase64 { get; init; }
- public DateTime TimestampUtc { get; init; }
- }
-
- private readonly record struct BlockPointer(int BlockId, long Offset);
+ // -------------------------------------------------------------------------
+ // Payload transform: compress + encrypt on write; reverse on read.
+ //
+ // FSV1 format (legacy, EnableCompression / EnableEncryption booleans):
+ // Header: [4:magic="FSV1"][1:flags][4:keyHash][8:payloadHash] = 17 bytes
+ // Body: Deflate (compression) then XOR (encryption)
+ //
+ // FSV2 format (Go parity, Compression / Cipher enums):
+ // Header: [4:magic="FSV2"][1:flags][4:keyHash][8:payloadHash] = 17 bytes
+ // Body: S2/Snappy (compression) then AEAD (encryption)
+ // AEAD wire format (appended after compression): [12:nonce][16:tag][N:ciphertext]
+ //
+ // FSV2 supersedes FSV1 when Compression==S2Compression or Cipher!=NoCipher.
+ // On read, magic bytes select the decode path; FSV1 files remain readable.
+ // -------------------------------------------------------------------------
private byte[] TransformForPersist(ReadOnlySpan payload)
{
var plaintext = payload.ToArray();
var transformed = plaintext;
byte flags = 0;
+ byte[] magic;
- if (_options.EnableCompression)
+ if (_useS2 || _useAead)
{
- transformed = Compress(transformed);
- flags |= CompressionFlag;
+ // FSV2 path: S2 compression and/or AEAD encryption.
+ magic = EnvelopeMagicV2;
+
+ if (_useS2)
+ {
+ transformed = S2Codec.Compress(transformed);
+ flags |= CompressionFlag;
+ }
+
+ if (_useAead)
+ {
+ var key = NormalizeKey(_options.EncryptionKey);
+ transformed = AeadEncryptor.Encrypt(transformed, key, _options.Cipher);
+ flags |= EncryptionFlag;
+ }
}
-
- if (_options.EnableEncryption)
+ else
{
- transformed = Xor(transformed, _options.EncryptionKey);
- flags |= EncryptionFlag;
+ // FSV1 legacy path: Deflate + XOR.
+ magic = EnvelopeMagicV1;
+
+ if (_options.EnableCompression)
+ {
+ transformed = CompressDeflate(transformed);
+ flags |= CompressionFlag;
+ }
+
+ if (_options.EnableEncryption)
+ {
+ transformed = Xor(transformed, _options.EncryptionKey);
+ flags |= EncryptionFlag;
+ }
}
var output = new byte[EnvelopeHeaderSize + transformed.Length];
- EnvelopeMagic.AsSpan().CopyTo(output.AsSpan(0, EnvelopeMagic.Length));
- output[EnvelopeMagic.Length] = flags;
+ magic.AsSpan().CopyTo(output.AsSpan(0, magic.Length));
+ output[magic.Length] = flags;
BinaryPrimitives.WriteUInt32LittleEndian(output.AsSpan(5, 4), ComputeKeyHash(_options.EncryptionKey));
BinaryPrimitives.WriteUInt64LittleEndian(output.AsSpan(9, 8), ComputePayloadHash(plaintext));
transformed.CopyTo(output.AsSpan(EnvelopeHeaderSize));
@@ -383,19 +422,36 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
private byte[] RestorePayload(ReadOnlySpan persisted)
{
- if (TryReadEnvelope(persisted, out var flags, out var keyHash, out var payloadHash, out var payload))
+ if (TryReadEnvelope(persisted, out var version, out var flags, out var keyHash, out var payloadHash, out var body))
{
- var data = payload.ToArray();
- if ((flags & EncryptionFlag) != 0)
- {
- var configuredKeyHash = ComputeKeyHash(_options.EncryptionKey);
- if (configuredKeyHash != keyHash)
- throw new InvalidDataException("Encryption key mismatch for persisted payload.");
- data = Xor(data, _options.EncryptionKey);
- }
+ var data = body.ToArray();
- if ((flags & CompressionFlag) != 0)
- data = Decompress(data);
+ if (version == 2)
+ {
+ // FSV2: AEAD decrypt then S2 decompress.
+ if ((flags & EncryptionFlag) != 0)
+ {
+ var key = NormalizeKey(_options.EncryptionKey);
+ data = AeadEncryptor.Decrypt(data, key, _options.Cipher);
+ }
+
+ if ((flags & CompressionFlag) != 0)
+ data = S2Codec.Decompress(data);
+ }
+ else
+ {
+ // FSV1: XOR decrypt then Deflate decompress.
+ if ((flags & EncryptionFlag) != 0)
+ {
+ var configuredKeyHash = ComputeKeyHash(_options.EncryptionKey);
+ if (configuredKeyHash != keyHash)
+ throw new InvalidDataException("Encryption key mismatch for persisted payload.");
+ data = Xor(data, _options.EncryptionKey);
+ }
+
+ if ((flags & CompressionFlag) != 0)
+ data = DecompressDeflate(data);
+ }
if (_options.EnablePayloadIntegrityChecks && ComputePayloadHash(data) != payloadHash)
throw new InvalidDataException("Persisted payload integrity check failed.");
@@ -403,15 +459,35 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return data;
}
- // Legacy format fallback for pre-envelope data.
+ // Legacy format fallback for pre-envelope data (no header at all).
var legacy = persisted.ToArray();
if (_options.EnableEncryption)
legacy = Xor(legacy, _options.EncryptionKey);
if (_options.EnableCompression)
- legacy = Decompress(legacy);
+ legacy = DecompressDeflate(legacy);
return legacy;
}
+ // -------------------------------------------------------------------------
+ // Helpers
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Ensures the encryption key is exactly 32 bytes (padding with zeros or
+ /// truncating), matching the Go server's key normalisation for AEAD ciphers.
+ /// Only called for FSV2 AEAD path; FSV1 XOR accepts arbitrary key lengths.
+ ///
+ private static byte[] NormalizeKey(byte[]? key)
+ {
+ var normalized = new byte[AeadEncryptor.KeySize];
+ if (key is { Length: > 0 })
+ {
+ var copyLen = Math.Min(key.Length, AeadEncryptor.KeySize);
+ key.AsSpan(0, copyLen).CopyTo(normalized.AsSpan());
+ }
+ return normalized;
+ }
+
private static byte[] Xor(ReadOnlySpan data, byte[]? key)
{
if (key == null || key.Length == 0)
@@ -423,7 +499,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return output;
}
- private static byte[] Compress(ReadOnlySpan data)
+ private static byte[] CompressDeflate(ReadOnlySpan data)
{
using var output = new MemoryStream();
using (var stream = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Fastest, leaveOpen: true))
@@ -434,7 +510,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return output.ToArray();
}
- private static byte[] Decompress(ReadOnlySpan data)
+ private static byte[] DecompressDeflate(ReadOnlySpan data)
{
using var input = new MemoryStream(data.ToArray());
using var stream = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress);
@@ -445,20 +521,30 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
private static bool TryReadEnvelope(
ReadOnlySpan persisted,
+ out int version,
out byte flags,
out uint keyHash,
out ulong payloadHash,
out ReadOnlySpan payload)
{
+ version = 0;
flags = 0;
keyHash = 0;
payloadHash = 0;
payload = ReadOnlySpan.Empty;
- if (persisted.Length < EnvelopeHeaderSize || !persisted[..EnvelopeMagic.Length].SequenceEqual(EnvelopeMagic))
+ if (persisted.Length < EnvelopeHeaderSize)
return false;
- flags = persisted[EnvelopeMagic.Length];
+ var magic = persisted[..EnvelopeMagicV1.Length];
+ if (magic.SequenceEqual(EnvelopeMagicV1))
+ version = 1;
+ else if (magic.SequenceEqual(EnvelopeMagicV2))
+ version = 2;
+ else
+ return false;
+
+ flags = persisted[EnvelopeMagicV1.Length];
keyHash = BinaryPrimitives.ReadUInt32LittleEndian(persisted.Slice(5, 4));
payloadHash = BinaryPrimitives.ReadUInt64LittleEndian(persisted.Slice(9, 8));
payload = persisted[EnvelopeHeaderSize..];
@@ -484,8 +570,24 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
private const byte CompressionFlag = 0b0000_0001;
private const byte EncryptionFlag = 0b0000_0010;
- private static readonly byte[] EnvelopeMagic = "FSV1"u8.ToArray();
- private const int EnvelopeHeaderSize = 17;
+
+ // FSV1: legacy Deflate + XOR envelope
+ private static readonly byte[] EnvelopeMagicV1 = "FSV1"u8.ToArray();
+
+ // FSV2: Go-parity S2 + AEAD envelope (filestore.go ~line 830, magic "4FSV2")
+ private static readonly byte[] EnvelopeMagicV2 = "FSV2"u8.ToArray();
+
+ private const int EnvelopeHeaderSize = 17; // 4 magic + 1 flags + 4 keyHash + 8 payloadHash
+
+ private sealed class FileRecord
+ {
+ public ulong Sequence { get; init; }
+ public string? Subject { get; init; }
+ public string? PayloadBase64 { get; init; }
+ public DateTime TimestampUtc { get; init; }
+ }
+
+ private readonly record struct BlockPointer(int BlockId, long Offset);
private sealed class IndexManifest
{
diff --git a/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs b/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs
index bc1f32d..a0d9efc 100644
--- a/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs
+++ b/src/NATS.Server/JetStream/Storage/FileStoreConfig.cs
@@ -1,36 +1,6 @@
namespace NATS.Server.JetStream.Storage;
-// Go: server/filestore.go:85
-///
-/// Selects the symmetric cipher used for block encryption.
-/// ChaCha is the default (ChaCha20-Poly1305); AES uses AES-256-GCM.
-/// Mirrors Go's StoreCipher type (filestore.go:85).
-///
-public enum StoreCipher
-{
- // Go: ChaCha — ChaCha20-Poly1305 (default)
- ChaCha,
-
- // Go: AES — AES-256-GCM
- Aes,
-
- // Go: NoCipher — encryption disabled
- None,
-}
-
-// Go: server/filestore.go:106
-///
-/// Selects the compression algorithm applied to each message block.
-/// Mirrors Go's StoreCompression type (filestore.go:106).
-///
-public enum StoreCompression : byte
-{
- // Go: NoCompression — no compression applied
- None = 0,
-
- // Go: S2Compression — S2 (Snappy variant) block compression
- S2 = 1,
-}
+// StoreCipher and StoreCompression are defined in AeadEncryptor.cs (Task 4).
// Go: server/filestore.go:55
///
@@ -67,9 +37,9 @@ public sealed class FileStoreConfig
// flushed asynchronously for higher throughput
public bool AsyncFlush { get; set; }
- // Go: FileStoreConfig.Cipher — cipher used for at-rest encryption; None disables it
- public StoreCipher Cipher { get; set; } = StoreCipher.None;
+ // Go: FileStoreConfig.Cipher — cipher used for at-rest encryption; NoCipher disables it
+ public StoreCipher Cipher { get; set; } = StoreCipher.NoCipher;
// Go: FileStoreConfig.Compression — compression algorithm applied to block data
- public StoreCompression Compression { get; set; } = StoreCompression.None;
+ public StoreCompression Compression { get; set; } = StoreCompression.NoCompression;
}
diff --git a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs
index 0e081ac..5b8eea8 100644
--- a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs
+++ b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs
@@ -6,8 +6,20 @@ public sealed class FileStoreOptions
public int BlockSizeBytes { get; set; } = 64 * 1024;
public string IndexManifestFileName { get; set; } = "index.manifest.json";
public int MaxAgeMs { get; set; }
+
+ // Legacy boolean compression / encryption flags (FSV1 envelope format).
+ // When set and the corresponding enum is left at its default (NoCompression /
+ // NoCipher), the legacy Deflate / XOR path is used for backward compatibility.
public bool EnableCompression { get; set; }
public bool EnableEncryption { get; set; }
+
public bool EnablePayloadIntegrityChecks { get; set; } = true;
public byte[]? EncryptionKey { get; set; }
+
+ // Go parity: StoreCompression / StoreCipher (filestore.go ~line 91-92).
+ // When Compression == S2Compression the S2/Snappy codec is used (FSV2 envelope).
+ // When Cipher != NoCipher an AEAD cipher is used instead of the legacy XOR.
+ // Enums are defined in AeadEncryptor.cs.
+ public StoreCompression Compression { get; set; } = StoreCompression.NoCompression;
+ public StoreCipher Cipher { get; set; } = StoreCipher.NoCipher;
}
diff --git a/src/NATS.Server/Raft/RaftWireFormat.cs b/src/NATS.Server/Raft/RaftWireFormat.cs
new file mode 100644
index 0000000..62d85e0
--- /dev/null
+++ b/src/NATS.Server/Raft/RaftWireFormat.cs
@@ -0,0 +1,430 @@
+using System.Buffers.Binary;
+using System.Text;
+
+namespace NATS.Server.Raft;
+
+// Binary wire format types matching Go's raft.go encoding exactly.
+// Go reference: golang/nats-server/server/raft.go
+//
+// All integers are little-endian. ID fields are exactly 8 bytes, zero-padded
+// if shorter (or truncated if longer), matching Go's idLen = 8 constant.
+// Go: server/raft.go:2756 — const idLen = 8
+
+///
+/// Wire-format constants matching Go's raft.go definitions.
+/// Go: server/raft.go:2756-2757
+///
+internal static class RaftWireConstants
+{
+ ///
+ /// Fixed width of all peer/leader/candidate ID fields on the wire.
+ /// Go: server/raft.go:2756 — const idLen = 8
+ ///
+ public const int IdLen = 8;
+
+ ///
+ /// Fixed byte length of a VoteRequest message.
+ /// Go: server/raft.go:4558 — const voteRequestLen = 24 + idLen = 32
+ ///
+ public const int VoteRequestLen = 24 + IdLen; // 32
+
+ ///
+ /// Fixed byte length of a VoteResponse message.
+ /// Go: server/raft.go:4737 — const voteResponseLen = 8 + 8 + 1 = 17
+ ///
+ public const int VoteResponseLen = 8 + 8 + 1; // 17
+
+ ///
+ /// Minimum byte length of an AppendEntry message (header only, no entries).
+ /// Go: server/raft.go:2660 — const appendEntryBaseLen = idLen + 4*8 + 2 = 42
+ ///
+ public const int AppendEntryBaseLen = IdLen + 4 * 8 + 2; // 42
+
+ ///
+ /// Fixed byte length of an AppendEntryResponse message.
+ /// Go: server/raft.go:2757 — const appendEntryResponseLen = 24 + 1 = 25
+ ///
+ public const int AppendEntryResponseLen = 24 + 1; // 25
+}
+
+///
+/// Entry types matching Go's EntryType constants.
+/// Go: server/raft.go:2607-2618
+///
+public enum RaftEntryType : byte
+{
+ Normal = 0,
+ OldSnapshot = 1,
+ PeerState = 2,
+ AddPeer = 3,
+ RemovePeer = 4,
+ LeaderTransfer = 5,
+ Snapshot = 6,
+}
+
+///
+/// A single RAFT log entry encoded inside an AppendEntry message.
+/// Wire layout (inline within AppendEntry body):
+/// [4] size uint32 LE — equals 1 + len(Data)
+/// [1] type byte
+/// [*] data raw bytes
+/// Go: server/raft.go:2641-2644 (Entry struct), 2699-2704 (encode loop)
+///
+public readonly record struct RaftEntryWire(RaftEntryType Type, byte[] Data);
+
+///
+/// Binary wire encoding of a RAFT VoteRequest.
+/// Fixed 32-byte layout (little-endian):
+/// [0..7] term uint64
+/// [8..15] lastTerm uint64
+/// [16..23] lastIndex uint64
+/// [24..31] candidateId 8-byte ASCII, zero-padded
+/// Go: server/raft.go:4549-4583 (voteRequest struct, encode, decodeVoteRequest)
+///
+public readonly record struct RaftVoteRequestWire(
+ ulong Term,
+ ulong LastTerm,
+ ulong LastIndex,
+ string CandidateId)
+{
+ ///
+ /// Encodes this VoteRequest to a 32-byte little-endian buffer.
+ /// Go: server/raft.go:4560-4568 — voteRequest.encode()
+ ///
+ public byte[] Encode()
+ {
+ var buf = new byte[RaftWireConstants.VoteRequestLen];
+ BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term);
+ BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8), LastTerm);
+ BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(16), LastIndex);
+ RaftWireHelpers.WriteId(buf.AsSpan(24), CandidateId);
+ return buf;
+ }
+
+ ///
+ /// Decodes a VoteRequest from a span. Throws
+ /// if the span is not exactly 32 bytes.
+ /// Go: server/raft.go:4571-4583 — decodeVoteRequest()
+ ///
+ public static RaftVoteRequestWire Decode(ReadOnlySpan msg)
+ {
+ if (msg.Length != RaftWireConstants.VoteRequestLen)
+ throw new ArgumentException(
+ $"VoteRequest requires exactly {RaftWireConstants.VoteRequestLen} bytes, got {msg.Length}.",
+ nameof(msg));
+
+ return new RaftVoteRequestWire(
+ Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]),
+ LastTerm: BinaryPrimitives.ReadUInt64LittleEndian(msg[8..]),
+ LastIndex: BinaryPrimitives.ReadUInt64LittleEndian(msg[16..]),
+ CandidateId: RaftWireHelpers.ReadId(msg[24..]));
+ }
+}
+
+///
+/// Binary wire encoding of a RAFT VoteResponse.
+/// Fixed 17-byte layout (little-endian):
+/// [0..7] term uint64
+/// [8..15] peer 8-byte ASCII, zero-padded
+/// [16] flags bit 0 = granted, bit 1 = empty-log marker
+/// Go: server/raft.go:4729-4762 (voteResponse struct, encode, decodeVoteResponse)
+///
+public readonly record struct RaftVoteResponseWire(
+ ulong Term,
+ string PeerId,
+ bool Granted,
+ bool Empty = false)
+{
+ ///
+ /// Encodes this VoteResponse to a 17-byte buffer.
+ /// Go: server/raft.go:4739-4751 — voteResponse.encode()
+ ///
+ public byte[] Encode()
+ {
+ var buf = new byte[RaftWireConstants.VoteResponseLen];
+ BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term);
+ RaftWireHelpers.WriteId(buf.AsSpan(8), PeerId);
+ byte flags = 0;
+ if (Granted) flags |= 1;
+ if (Empty) flags |= 2;
+ buf[16] = flags;
+ return buf;
+ }
+
+ ///
+ /// Decodes a VoteResponse from a span. Throws
+ /// if the span is not exactly 17 bytes.
+ /// Go: server/raft.go:4753-4762 — decodeVoteResponse()
+ ///
+ public static RaftVoteResponseWire Decode(ReadOnlySpan msg)
+ {
+ if (msg.Length != RaftWireConstants.VoteResponseLen)
+ throw new ArgumentException(
+ $"VoteResponse requires exactly {RaftWireConstants.VoteResponseLen} bytes, got {msg.Length}.",
+ nameof(msg));
+
+ var flags = msg[16];
+ return new RaftVoteResponseWire(
+ Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]),
+ PeerId: RaftWireHelpers.ReadId(msg[8..]),
+ Granted: (flags & 1) != 0,
+ Empty: (flags & 2) != 0);
+ }
+}
+
+///
+/// Binary wire encoding of a RAFT AppendEntry message (variable length).
+/// Layout (little-endian):
+/// [0..7] leaderId 8-byte ASCII, zero-padded
+/// [8..15] term uint64
+/// [16..23] commit uint64
+/// [24..31] pterm uint64
+/// [32..39] pindex uint64
+/// [40..41] entryCount uint16
+/// [42+] entries each: [4:size uint32][1:type][data...]
+/// where size = 1 + len(data)
+/// [tail] leaderTerm uvarint (appended after entries; old nodes ignore it)
+/// Go: server/raft.go:2557-2569 (appendEntry struct), 2662-2746 (encode/decode)
+///
+public readonly record struct RaftAppendEntryWire(
+ string LeaderId,
+ ulong Term,
+ ulong Commit,
+ ulong PrevTerm,
+ ulong PrevIndex,
+ IReadOnlyList Entries,
+ ulong LeaderTerm = 0)
+{
+ ///
+ /// Encodes this AppendEntry to a byte array.
+ /// Go: server/raft.go:2662-2711 — appendEntry.encode()
+ ///
+ public byte[] Encode()
+ {
+ if (Entries.Count > ushort.MaxValue)
+ throw new ArgumentException($"Too many entries: {Entries.Count} exceeds uint16 max.", nameof(Entries));
+
+ // Calculate total entry data size.
+ // Go: server/raft.go:2670-2678 — elen += ulen + 1 + 4
+ var elen = 0;
+ foreach (var e in Entries)
+ elen += 4 + 1 + e.Data.Length; // 4-byte size prefix + 1-byte type + data
+
+ // Encode leaderTerm as uvarint.
+ // Go: server/raft.go:2681-2682 — binary.PutUvarint(_lterm[:], ae.lterm)
+ Span ltermBuf = stackalloc byte[10];
+ var ltermLen = RaftWireHelpers.WriteUvarint(ltermBuf, LeaderTerm);
+
+ var totalLen = RaftWireConstants.AppendEntryBaseLen + elen + ltermLen;
+ var buf = new byte[totalLen];
+ var span = buf.AsSpan();
+
+ // Go: server/raft.go:2693-2698 — copy leader and write fixed fields
+ RaftWireHelpers.WriteId(span[0..], LeaderId);
+ BinaryPrimitives.WriteUInt64LittleEndian(span[8..], Term);
+ BinaryPrimitives.WriteUInt64LittleEndian(span[16..], Commit);
+ BinaryPrimitives.WriteUInt64LittleEndian(span[24..], PrevTerm);
+ BinaryPrimitives.WriteUInt64LittleEndian(span[32..], PrevIndex);
+ BinaryPrimitives.WriteUInt16LittleEndian(span[40..], (ushort)Entries.Count);
+
+ // Go: server/raft.go:2699-2705 — encode each entry
+ var pos = RaftWireConstants.AppendEntryBaseLen;
+ foreach (var e in Entries)
+ {
+ // size = 1 (type) + len(data)
+ // Go: server/raft.go:2702 — le.AppendUint32(buf, uint32(1+len(e.Data)))
+ BinaryPrimitives.WriteUInt32LittleEndian(span[pos..], (uint)(1 + e.Data.Length));
+ pos += 4;
+ buf[pos++] = (byte)e.Type;
+ e.Data.CopyTo(span[pos..]);
+ pos += e.Data.Length;
+ }
+
+ // Append leaderTerm uvarint.
+ // Go: server/raft.go:2709 — buf = append(buf, lterm...)
+ ltermBuf[..ltermLen].CopyTo(span[pos..]);
+
+ return buf;
+ }
+
+ ///
+ /// Decodes an AppendEntry from a span. Throws
+ /// if the buffer is shorter than the minimum header length or malformed.
+ /// Go: server/raft.go:2714-2746 — decodeAppendEntry()
+ ///
+ public static RaftAppendEntryWire Decode(ReadOnlySpan msg)
+ {
+ if (msg.Length < RaftWireConstants.AppendEntryBaseLen)
+ throw new ArgumentException(
+ $"AppendEntry requires at least {RaftWireConstants.AppendEntryBaseLen} bytes, got {msg.Length}.",
+ nameof(msg));
+
+ // Go: server/raft.go:2721 — ae := newAppendEntry(string(msg[:idLen]), ...)
+ var leaderId = RaftWireHelpers.ReadId(msg[0..]);
+ var term = BinaryPrimitives.ReadUInt64LittleEndian(msg[8..]);
+ var commit = BinaryPrimitives.ReadUInt64LittleEndian(msg[16..]);
+ var pterm = BinaryPrimitives.ReadUInt64LittleEndian(msg[24..]);
+ var pindex = BinaryPrimitives.ReadUInt64LittleEndian(msg[32..]);
+
+ // Go: server/raft.go:2725 — ne, ri := int(le.Uint16(msg[40:])), uint64(42)
+ var entryCount = BinaryPrimitives.ReadUInt16LittleEndian(msg[40..]);
+ var entries = new List(entryCount);
+ var ri = RaftWireConstants.AppendEntryBaseLen;
+
+ // Go: server/raft.go:2726-2737 — decode entries loop
+ for (var i = 0; i < entryCount; i++)
+ {
+ if (ri >= msg.Length - 1)
+ throw new ArgumentException("AppendEntry buffer truncated while reading entries.", nameof(msg));
+
+ var ml = (int)BinaryPrimitives.ReadUInt32LittleEndian(msg[ri..]);
+ ri += 4;
+
+ if (ml <= 0 || ri + ml > msg.Length)
+ throw new ArgumentException("AppendEntry entry size is out of bounds.", nameof(msg));
+
+ var entryType = (RaftEntryType)msg[ri];
+ var data = msg[(ri + 1)..(ri + ml)].ToArray();
+ entries.Add(new RaftEntryWire(entryType, data));
+ ri += ml;
+ }
+
+ // Decode optional leaderTerm uvarint from tail bytes.
+ // Go: server/raft.go:2739-2743 — if lterm, n := binary.Uvarint(msg[ri:]); n > 0 ...
+ ulong lterm = 0;
+ if (ri < msg.Length)
+ RaftWireHelpers.ReadUvarint(msg[ri..], out lterm);
+
+ return new RaftAppendEntryWire(
+ LeaderId: leaderId,
+ Term: term,
+ Commit: commit,
+ PrevTerm: pterm,
+ PrevIndex: pindex,
+ Entries: entries,
+ LeaderTerm: lterm);
+ }
+}
+
+///
+/// Binary wire encoding of a RAFT AppendEntryResponse.
+/// Fixed 25-byte layout (little-endian):
+/// [0..7] term uint64
+/// [8..15] index uint64
+/// [16..23] peerId 8-byte ASCII, zero-padded
+/// [24] success 0 or 1
+/// Go: server/raft.go:2760-2817 (appendEntryResponse struct, encode, decodeAppendEntryResponse)
+///
+public readonly record struct RaftAppendEntryResponseWire(
+ ulong Term,
+ ulong Index,
+ string PeerId,
+ bool Success)
+{
+ ///
+ /// Encodes this AppendEntryResponse to a 25-byte buffer.
+ /// Go: server/raft.go:2777-2794 — appendEntryResponse.encode()
+ ///
+ public byte[] Encode()
+ {
+ var buf = new byte[RaftWireConstants.AppendEntryResponseLen];
+ BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term);
+ BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8), Index);
+ RaftWireHelpers.WriteId(buf.AsSpan(16), PeerId);
+ buf[24] = Success ? (byte)1 : (byte)0;
+ return buf;
+ }
+
+ ///
+ /// Decodes an AppendEntryResponse from a span. Throws
+ /// if the span is not exactly 25 bytes.
+ /// Go: server/raft.go:2799-2817 — decodeAppendEntryResponse()
+ ///
+ public static RaftAppendEntryResponseWire Decode(ReadOnlySpan msg)
+ {
+ if (msg.Length != RaftWireConstants.AppendEntryResponseLen)
+ throw new ArgumentException(
+ $"AppendEntryResponse requires exactly {RaftWireConstants.AppendEntryResponseLen} bytes, got {msg.Length}.",
+ nameof(msg));
+
+ return new RaftAppendEntryResponseWire(
+ Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]),
+ Index: BinaryPrimitives.ReadUInt64LittleEndian(msg[8..]),
+ PeerId: RaftWireHelpers.ReadId(msg[16..]),
+ // Go: server/raft.go:2815 — ar.success = msg[24] == 1
+ Success: msg[24] == 1);
+ }
+}
+
+///
+/// Shared encoding helpers for all RAFT wire format types.
+///
+internal static class RaftWireHelpers
+{
+ ///
+ /// Writes a peer/leader ID to an 8-byte span. IDs shorter than 8 bytes are
+ /// zero-padded; IDs longer than 8 bytes are silently truncated (matching Go's
+ /// copy(buf[:idLen], id) semantics).
+ /// Go: server/raft.go:2693 — copy(buf[:idLen], ae.leader)
+ ///
+ public static void WriteId(Span dest, string id)
+ {
+ // Zero-fill the 8-byte slot first.
+ dest[..RaftWireConstants.IdLen].Clear();
+ var bytes = Encoding.ASCII.GetBytes(id);
+ var copyLen = Math.Min(bytes.Length, RaftWireConstants.IdLen);
+ bytes.AsSpan(0, copyLen).CopyTo(dest);
+ }
+
+ ///
+ /// Reads a peer/leader ID from an 8-byte span, trimming trailing null bytes so
+ /// that zero-padded IDs decode back to their original string.
+ /// Go: server/raft.go:4581 — string(copyBytes(msg[24:24+idLen]))
+ ///
+ public static string ReadId(ReadOnlySpan src)
+ {
+ var idBytes = src[..RaftWireConstants.IdLen];
+ var len = idBytes.Length;
+ while (len > 0 && idBytes[len - 1] == 0)
+ len--;
+ return Encoding.ASCII.GetString(idBytes[..len]);
+ }
+
+ ///
+ /// Writes a uint64 as a uvarint into and returns the
+ /// number of bytes written (1-10).
+ /// Go: server/raft.go:2682 — binary.PutUvarint(_lterm[:], ae.lterm)
+ ///
+ public static int WriteUvarint(Span buf, ulong value)
+ {
+ var pos = 0;
+ while (value > 0x7F)
+ {
+ buf[pos++] = (byte)((value & 0x7F) | 0x80);
+ value >>= 7;
+ }
+ buf[pos++] = (byte)value;
+ return pos;
+ }
+
+ ///
+ /// Reads a uvarint from into
+ /// and returns the number of bytes consumed (0 on overflow or empty input).
+ /// Go: server/raft.go:2740 — binary.Uvarint(msg[ri:])
+ ///
+ public static int ReadUvarint(ReadOnlySpan buf, out ulong value)
+ {
+ value = 0;
+ var shift = 0;
+ for (var i = 0; i < buf.Length && i < 10; i++)
+ {
+ var b = buf[i];
+ value |= ((ulong)(b & 0x7F)) << shift;
+ if ((b & 0x80) == 0)
+ return i + 1;
+ shift += 7;
+ }
+ value = 0;
+ return 0; // overflow or empty
+ }
+}
diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamServiceOrchestrationTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamServiceOrchestrationTests.cs
new file mode 100644
index 0000000..352a218
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/JetStreamServiceOrchestrationTests.cs
@@ -0,0 +1,242 @@
+// Ported from golang/nats-server/server/jetstream.go:414-523 (enableJetStream)
+// Tests for JetStreamService lifecycle orchestration: store directory creation,
+// API subject registration, configuration property exposure, and dispose semantics.
+
+using NATS.Server.Configuration;
+using NATS.Server.JetStream;
+
+namespace NATS.Server.Tests.JetStream;
+
+public sealed class JetStreamServiceOrchestrationTests : IDisposable
+{
+ private readonly List _tempDirs = [];
+
+ private string MakeTempDir()
+ {
+ var path = Path.Combine(Path.GetTempPath(), "nats-js-test-" + Guid.NewGuid().ToString("N"));
+ _tempDirs.Add(path);
+ return path;
+ }
+
+ public void Dispose()
+ {
+ foreach (var dir in _tempDirs)
+ {
+ if (Directory.Exists(dir))
+ Directory.Delete(dir, recursive: true);
+ }
+ }
+
+ // Go: enableJetStream — jetstream.go:414 — happy path creates store dir and marks running
+ [Fact]
+ public async Task StartAsync_creates_store_directory_and_marks_running()
+ {
+ var storeDir = MakeTempDir();
+ var options = new JetStreamOptions { StoreDir = storeDir };
+ await using var svc = new JetStreamService(options);
+
+ Directory.Exists(storeDir).ShouldBeFalse("directory must not exist before start");
+
+ await svc.StartAsync(CancellationToken.None);
+
+ svc.IsRunning.ShouldBeTrue();
+ Directory.Exists(storeDir).ShouldBeTrue("StartAsync must create the store directory");
+ }
+
+ // Go: enableJetStream — jetstream.go:430 — existing dir is accepted without error
+ [Fact]
+ public async Task StartAsync_accepts_preexisting_store_directory()
+ {
+ var storeDir = MakeTempDir();
+ Directory.CreateDirectory(storeDir);
+ var options = new JetStreamOptions { StoreDir = storeDir };
+ await using var svc = new JetStreamService(options);
+
+ await svc.StartAsync(CancellationToken.None);
+
+ svc.IsRunning.ShouldBeTrue();
+ Directory.Exists(storeDir).ShouldBeTrue();
+ }
+
+ // Go: enableJetStream — memory-only mode when StoreDir is empty
+ [Fact]
+ public async Task StartAsync_with_empty_StoreDir_starts_in_memory_only_mode()
+ {
+ var options = new JetStreamOptions { StoreDir = string.Empty };
+ await using var svc = new JetStreamService(options);
+
+ await svc.StartAsync(CancellationToken.None);
+
+ svc.IsRunning.ShouldBeTrue();
+ }
+
+ // Go: setJetStreamExportSubs — jetstream.go:489 — all $JS.API subjects registered
+ [Fact]
+ public async Task RegisteredApiSubjects_contains_expected_subjects_after_start()
+ {
+ var options = new JetStreamOptions();
+ await using var svc = new JetStreamService(options);
+
+ await svc.StartAsync(CancellationToken.None);
+
+ var subjects = svc.RegisteredApiSubjects;
+ subjects.ShouldNotBeEmpty();
+ subjects.ShouldContain("$JS.API.>");
+ subjects.ShouldContain("$JS.API.INFO");
+ subjects.ShouldContain("$JS.API.META.LEADER.STEPDOWN");
+ subjects.ShouldContain("$JS.API.STREAM.NAMES");
+ subjects.ShouldContain("$JS.API.STREAM.LIST");
+ }
+
+ // Go: setJetStreamExportSubs — all consumer-related wildcards registered
+ [Fact]
+ public async Task RegisteredApiSubjects_includes_consumer_and_stream_wildcard_subjects()
+ {
+ var options = new JetStreamOptions();
+ await using var svc = new JetStreamService(options);
+
+ await svc.StartAsync(CancellationToken.None);
+
+ var subjects = svc.RegisteredApiSubjects;
+
+ // Stream management
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.CREATE."), "stream create wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.DELETE."), "stream delete wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.INFO."), "stream info wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.UPDATE."), "stream update wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.PURGE."), "stream purge wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.MSG.GET."), "stream msg get wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.MSG.DELETE."), "stream msg delete wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.SNAPSHOT."), "stream snapshot wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.RESTORE."), "stream restore wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.STREAM.LEADER.STEPDOWN."), "stream leader stepdown wildcard");
+
+ // Consumer management
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.CREATE."), "consumer create wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.DELETE."), "consumer delete wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.INFO."), "consumer info wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.NAMES."), "consumer names wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.LIST."), "consumer list wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.PAUSE."), "consumer pause wildcard");
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.CONSUMER.MSG.NEXT."), "consumer msg next wildcard");
+
+ // Direct get
+ subjects.ShouldContain(s => s.StartsWith("$JS.API.DIRECT.GET."), "direct get wildcard");
+ }
+
+ // RegisteredApiSubjects should be empty before start
+ [Fact]
+ public void RegisteredApiSubjects_is_empty_before_start()
+ {
+ var options = new JetStreamOptions();
+ var svc = new JetStreamService(options);
+
+ svc.RegisteredApiSubjects.ShouldBeEmpty();
+ }
+
+ // Go: shutdown path — DisposeAsync clears subjects and marks not running
+ [Fact]
+ public async Task DisposeAsync_clears_subjects_and_marks_not_running()
+ {
+ var options = new JetStreamOptions();
+ var svc = new JetStreamService(options);
+
+ await svc.StartAsync(CancellationToken.None);
+ svc.IsRunning.ShouldBeTrue();
+ svc.RegisteredApiSubjects.ShouldNotBeEmpty();
+
+ await svc.DisposeAsync();
+
+ svc.IsRunning.ShouldBeFalse();
+ svc.RegisteredApiSubjects.ShouldBeEmpty();
+ }
+
+ // MaxStreams and MaxConsumers reflect config values
+ [Fact]
+ public async Task MaxStreams_and_MaxConsumers_reflect_config_values()
+ {
+ var options = new JetStreamOptions
+ {
+ MaxStreams = 100,
+ MaxConsumers = 500,
+ };
+ await using var svc = new JetStreamService(options);
+
+ await svc.StartAsync(CancellationToken.None);
+
+ svc.MaxStreams.ShouldBe(100);
+ svc.MaxConsumers.ShouldBe(500);
+ }
+
+ // MaxMemory and MaxStore reflect config values
+ [Fact]
+ public async Task MaxMemory_and_MaxStore_reflect_config_values()
+ {
+ var options = new JetStreamOptions
+ {
+ MaxMemoryStore = 1_073_741_824L, // 1 GiB
+ MaxFileStore = 10_737_418_240L, // 10 GiB
+ };
+ await using var svc = new JetStreamService(options);
+
+ await svc.StartAsync(CancellationToken.None);
+
+ svc.MaxMemory.ShouldBe(1_073_741_824L);
+ svc.MaxStore.ShouldBe(10_737_418_240L);
+ }
+
+ // Default config values are zero (unlimited)
+ [Fact]
+ public void Default_config_values_are_unlimited_zero()
+ {
+ var options = new JetStreamOptions();
+ var svc = new JetStreamService(options);
+
+ svc.MaxStreams.ShouldBe(0);
+ svc.MaxConsumers.ShouldBe(0);
+ svc.MaxMemory.ShouldBe(0L);
+ svc.MaxStore.ShouldBe(0L);
+ }
+
+ // Go: enableJetStream idempotency — double-start is safe (not an error)
+ [Fact]
+ public async Task Double_start_is_idempotent()
+ {
+ var options = new JetStreamOptions();
+ await using var svc = new JetStreamService(options);
+
+ await svc.StartAsync(CancellationToken.None);
+ var subjectCountAfterFirst = svc.RegisteredApiSubjects.Count;
+
+ // Second start must not throw and must not duplicate subjects
+ await svc.StartAsync(CancellationToken.None);
+
+ svc.IsRunning.ShouldBeTrue();
+ svc.RegisteredApiSubjects.Count.ShouldBe(subjectCountAfterFirst);
+ }
+
+ // Store directory is created with a nested path (MkdirAll semantics)
+ [Fact]
+ public async Task StartAsync_creates_nested_store_directory()
+ {
+ var baseDir = MakeTempDir();
+ var nestedDir = Path.Combine(baseDir, "level1", "level2", "jetstream");
+ var options = new JetStreamOptions { StoreDir = nestedDir };
+ await using var svc = new JetStreamService(options);
+
+ await svc.StartAsync(CancellationToken.None);
+
+ svc.IsRunning.ShouldBeTrue();
+ Directory.Exists(nestedDir).ShouldBeTrue("nested store directory must be created");
+ }
+
+ // Service is not running before start
+ [Fact]
+ public void IsRunning_is_false_before_start()
+ {
+ var options = new JetStreamOptions();
+ var svc = new JetStreamService(options);
+
+ svc.IsRunning.ShouldBeFalse();
+ }
+}
diff --git a/tests/NATS.Server.Tests/Raft/RaftBinaryWireFormatTests.cs b/tests/NATS.Server.Tests/Raft/RaftBinaryWireFormatTests.cs
new file mode 100644
index 0000000..9e05894
--- /dev/null
+++ b/tests/NATS.Server.Tests/Raft/RaftBinaryWireFormatTests.cs
@@ -0,0 +1,519 @@
+using NATS.Server.Raft;
+
+namespace NATS.Server.Tests.Raft;
+
+///
+/// Binary wire format encoding/decoding tests for all RAFT RPC types.
+/// These validate exact byte-for-byte fidelity with Go's raft.go encoding.
+/// Go reference: golang/nats-server/server/raft.go lines 2662-2796 (AppendEntry),
+/// 4560-4768 (vote types).
+///
+public class RaftBinaryWireFormatTests
+{
+ // ---------------------------------------------------------------------------
+ // VoteRequest
+ // ---------------------------------------------------------------------------
+
+ // Go: server/raft.go:4560-4568 — voteRequest.encode()
+ // Go: server/raft.go:4571-4583 — decodeVoteRequest()
+ [Fact]
+ public void VoteRequest_round_trip_encode_decode()
+ {
+ var original = new RaftVoteRequestWire(
+ Term: 7,
+ LastTerm: 3,
+ LastIndex: 42,
+ CandidateId: "peer0001");
+
+ var encoded = original.Encode();
+ encoded.Length.ShouldBe(RaftWireConstants.VoteRequestLen); // 32 bytes
+
+ var decoded = RaftVoteRequestWire.Decode(encoded);
+ decoded.Term.ShouldBe(7UL);
+ decoded.LastTerm.ShouldBe(3UL);
+ decoded.LastIndex.ShouldBe(42UL);
+ decoded.CandidateId.ShouldBe("peer0001");
+ }
+
+ [Fact]
+ public void VoteRequest_bytes_are_little_endian()
+ {
+ var req = new RaftVoteRequestWire(Term: 1, LastTerm: 0, LastIndex: 0, CandidateId: "");
+ var bytes = req.Encode();
+ // term = 1 in little-endian: [1, 0, 0, 0, 0, 0, 0, 0]
+ // Go: server/raft.go:4563 — le.PutUint64(buf[0:], vr.term)
+ bytes[0].ShouldBe((byte)1);
+ bytes[1].ShouldBe((byte)0);
+ }
+
+ [Fact]
+ public void VoteRequest_zero_values_encode_to_zeroed_buffer()
+ {
+ var req = new RaftVoteRequestWire(Term: 0, LastTerm: 0, LastIndex: 0, CandidateId: "");
+ var bytes = req.Encode();
+ bytes.Length.ShouldBe(32);
+ bytes.ShouldAllBe(b => b == 0);
+ }
+
+ [Fact]
+ public void VoteRequest_large_term_round_trips()
+ {
+ var req = new RaftVoteRequestWire(
+ Term: ulong.MaxValue,
+ LastTerm: ulong.MaxValue - 1,
+ LastIndex: ulong.MaxValue - 2,
+ CandidateId: "node1234");
+
+ var decoded = RaftVoteRequestWire.Decode(req.Encode());
+ decoded.Term.ShouldBe(ulong.MaxValue);
+ decoded.LastTerm.ShouldBe(ulong.MaxValue - 1);
+ decoded.LastIndex.ShouldBe(ulong.MaxValue - 2);
+ decoded.CandidateId.ShouldBe("node1234");
+ }
+
+ [Fact]
+ public void VoteRequest_short_buffer_throws_ArgumentException()
+ {
+ var shortBuffer = new byte[RaftWireConstants.VoteRequestLen - 1];
+ Should.Throw(() => RaftVoteRequestWire.Decode(shortBuffer));
+ }
+
+ [Fact]
+ public void VoteRequest_long_buffer_throws_ArgumentException()
+ {
+ var longBuffer = new byte[RaftWireConstants.VoteRequestLen + 1];
+ Should.Throw(() => RaftVoteRequestWire.Decode(longBuffer));
+ }
+
+ [Fact]
+ public void VoteRequest_candidate_id_truncated_to_8_bytes()
+ {
+ // IDs longer than 8 chars are silently truncated (Go copy semantics).
+ // Go: server/raft.go:4566 — copy(buf[24:24+idLen], vr.candidate)
+ var req = new RaftVoteRequestWire(
+ Term: 1, LastTerm: 0, LastIndex: 0,
+ CandidateId: "abcdefghXXXXXXXX"); // 16 chars; only first 8 kept
+
+ var bytes = req.Encode();
+ // Check that the ID field contains only the first 8 chars.
+ var idBytes = bytes[24..32];
+ System.Text.Encoding.ASCII.GetString(idBytes).ShouldBe("abcdefgh");
+ }
+
+ [Fact]
+ public void VoteRequest_short_candidate_id_zero_padded()
+ {
+ var req = new RaftVoteRequestWire(
+ Term: 1, LastTerm: 0, LastIndex: 0, CandidateId: "abc");
+
+ var bytes = req.Encode();
+ bytes[27].ShouldBe((byte)0); // byte 3..7 should be zero
+ bytes[28].ShouldBe((byte)0);
+
+ // Decode should recover the original 3-char ID.
+ var decoded = RaftVoteRequestWire.Decode(bytes);
+ decoded.CandidateId.ShouldBe("abc");
+ }
+
+ // ---------------------------------------------------------------------------
+ // VoteResponse
+ // ---------------------------------------------------------------------------
+
+ // Go: server/raft.go:4739-4751 — voteResponse.encode()
+ // Go: server/raft.go:4753-4762 — decodeVoteResponse()
+ [Fact]
+ public void VoteResponse_granted_true_round_trip()
+ {
+ var resp = new RaftVoteResponseWire(Term: 5, PeerId: "peer0002", Granted: true);
+ var decoded = RaftVoteResponseWire.Decode(resp.Encode());
+
+ decoded.Term.ShouldBe(5UL);
+ decoded.PeerId.ShouldBe("peer0002");
+ decoded.Granted.ShouldBeTrue();
+ decoded.Empty.ShouldBeFalse();
+ }
+
+ [Fact]
+ public void VoteResponse_granted_false_round_trip()
+ {
+ var resp = new RaftVoteResponseWire(Term: 3, PeerId: "peer0003", Granted: false);
+ var decoded = RaftVoteResponseWire.Decode(resp.Encode());
+
+ decoded.Granted.ShouldBeFalse();
+ decoded.PeerId.ShouldBe("peer0003");
+ }
+
+ [Fact]
+ public void VoteResponse_empty_flag_round_trip()
+ {
+ // Go: server/raft.go:4746-4748 — buf[16] |= 2 when empty
+ var resp = new RaftVoteResponseWire(Term: 1, PeerId: "p1", Granted: false, Empty: true);
+ var decoded = RaftVoteResponseWire.Decode(resp.Encode());
+
+ decoded.Empty.ShouldBeTrue();
+ decoded.Granted.ShouldBeFalse();
+ }
+
+ [Fact]
+ public void VoteResponse_both_flags_set()
+ {
+ var resp = new RaftVoteResponseWire(Term: 1, PeerId: "p1", Granted: true, Empty: true);
+ var bytes = resp.Encode();
+
+ // Go: server/raft.go:4744-4748 — bit 0 = granted, bit 1 = empty
+ (bytes[16] & 1).ShouldBe(1); // granted
+ (bytes[16] & 2).ShouldBe(2); // empty
+
+ var decoded = RaftVoteResponseWire.Decode(bytes);
+ decoded.Granted.ShouldBeTrue();
+ decoded.Empty.ShouldBeTrue();
+ }
+
+ [Fact]
+ public void VoteResponse_fixed_17_bytes()
+ {
+ var resp = new RaftVoteResponseWire(Term: 10, PeerId: "peer0001", Granted: true);
+ resp.Encode().Length.ShouldBe(RaftWireConstants.VoteResponseLen); // 17
+ }
+
+ [Fact]
+ public void VoteResponse_short_buffer_throws_ArgumentException()
+ {
+ var shortBuffer = new byte[RaftWireConstants.VoteResponseLen - 1];
+ Should.Throw(() => RaftVoteResponseWire.Decode(shortBuffer));
+ }
+
+ [Fact]
+ public void VoteResponse_peer_id_truncated_to_8_bytes()
+ {
+ // Go: server/raft.go:4743 — copy(buf[8:], vr.peer)
+ var resp = new RaftVoteResponseWire(
+ Term: 1, PeerId: "longpeernamethatexceeds8chars", Granted: true);
+ var bytes = resp.Encode();
+
+ // Bytes [8..15] hold the peer ID — only first 8 chars fit.
+ var idBytes = bytes[8..16];
+ System.Text.Encoding.ASCII.GetString(idBytes).ShouldBe("longpeer");
+ }
+
+ // ---------------------------------------------------------------------------
+ // AppendEntry — zero entries
+ // ---------------------------------------------------------------------------
+
+ // Go: server/raft.go:2662-2711 — appendEntry.encode()
+ // Go: server/raft.go:2714-2746 — decodeAppendEntry()
+ [Fact]
+ public void AppendEntry_zero_entries_round_trip()
+ {
+ var ae = new RaftAppendEntryWire(
+ LeaderId: "lead0001",
+ Term: 10,
+ Commit: 8,
+ PrevTerm: 9,
+ PrevIndex: 7,
+ Entries: [],
+ LeaderTerm: 0);
+
+ var encoded = ae.Encode();
+ // Base length + 1-byte uvarint(0) for lterm.
+ // Go: server/raft.go:2681-2683 — lterm uvarint always appended
+ encoded.Length.ShouldBe(RaftWireConstants.AppendEntryBaseLen + 1);
+
+ var decoded = RaftAppendEntryWire.Decode(encoded);
+ decoded.LeaderId.ShouldBe("lead0001");
+ decoded.Term.ShouldBe(10UL);
+ decoded.Commit.ShouldBe(8UL);
+ decoded.PrevTerm.ShouldBe(9UL);
+ decoded.PrevIndex.ShouldBe(7UL);
+ decoded.Entries.Count.ShouldBe(0);
+ decoded.LeaderTerm.ShouldBe(0UL);
+ }
+
+ [Fact]
+ public void AppendEntry_base_layout_at_correct_offsets()
+ {
+ // Go: server/raft.go:2693-2698 — exact layout:
+ // [0..7]=leader [8..15]=term [16..23]=commit [24..31]=pterm [32..39]=pindex [40..41]=entryCount
+ var ae = new RaftAppendEntryWire(
+ LeaderId: "AAAAAAAA", // 0x41 x 8
+ Term: 1,
+ Commit: 2,
+ PrevTerm: 3,
+ PrevIndex: 4,
+ Entries: []);
+
+ var bytes = ae.Encode();
+
+ // leader bytes
+ bytes[0].ShouldBe((byte)'A');
+ bytes[7].ShouldBe((byte)'A');
+
+ // term = 1 LE
+ bytes[8].ShouldBe((byte)1);
+ bytes[9].ShouldBe((byte)0);
+
+ // commit = 2 LE
+ bytes[16].ShouldBe((byte)2);
+
+ // entryCount = 0
+ bytes[40].ShouldBe((byte)0);
+ bytes[41].ShouldBe((byte)0);
+ }
+
+ // ---------------------------------------------------------------------------
+ // AppendEntry — single entry
+ // ---------------------------------------------------------------------------
+
+ [Fact]
+ public void AppendEntry_single_entry_round_trip()
+ {
+ var data = "hello world"u8.ToArray();
+ var entry = new RaftEntryWire(RaftEntryType.Normal, data);
+
+ var ae = new RaftAppendEntryWire(
+ LeaderId: "leader01",
+ Term: 5,
+ Commit: 3,
+ PrevTerm: 4,
+ PrevIndex: 2,
+ Entries: [entry]);
+
+ var decoded = RaftAppendEntryWire.Decode(ae.Encode());
+ decoded.Entries.Count.ShouldBe(1);
+ decoded.Entries[0].Type.ShouldBe(RaftEntryType.Normal);
+ decoded.Entries[0].Data.ShouldBe(data);
+ }
+
+ [Fact]
+ public void AppendEntry_entry_size_field_equals_1_plus_data_length()
+ {
+ // Go: server/raft.go:2702 — le.AppendUint32(buf, uint32(1+len(e.Data)))
+ var data = new byte[10];
+ var entry = new RaftEntryWire(RaftEntryType.PeerState, data);
+
+ var ae = new RaftAppendEntryWire(
+ LeaderId: "ld", Term: 1, Commit: 0, PrevTerm: 0, PrevIndex: 0,
+ Entries: [entry]);
+
+ var bytes = ae.Encode();
+
+ // Entry starts at offset 42 (appendEntryBaseLen).
+ // First 4 bytes are the uint32 size = 1 + 10 = 11.
+ var sizeField = System.Buffers.Binary.BinaryPrimitives.ReadUInt32LittleEndian(bytes.AsSpan(42));
+ sizeField.ShouldBe(11u);
+
+ // Byte at offset 46 is the entry type.
+ bytes[46].ShouldBe((byte)RaftEntryType.PeerState);
+ }
+
+ // ---------------------------------------------------------------------------
+ // AppendEntry — multiple entries
+ // ---------------------------------------------------------------------------
+
+ [Fact]
+ public void AppendEntry_multiple_entries_round_trip()
+ {
+ var entries = new RaftEntryWire[]
+ {
+ new(RaftEntryType.Normal, "first"u8.ToArray()),
+ new(RaftEntryType.AddPeer, "second"u8.ToArray()),
+ new(RaftEntryType.RemovePeer, "third"u8.ToArray()),
+ };
+
+ var ae = new RaftAppendEntryWire(
+ LeaderId: "lead0001",
+ Term: 20,
+ Commit: 15,
+ PrevTerm: 19,
+ PrevIndex: 14,
+ Entries: entries);
+
+ var decoded = RaftAppendEntryWire.Decode(ae.Encode());
+ decoded.Entries.Count.ShouldBe(3);
+
+ decoded.Entries[0].Type.ShouldBe(RaftEntryType.Normal);
+ decoded.Entries[0].Data.ShouldBe("first"u8.ToArray());
+
+ decoded.Entries[1].Type.ShouldBe(RaftEntryType.AddPeer);
+ decoded.Entries[1].Data.ShouldBe("second"u8.ToArray());
+
+ decoded.Entries[2].Type.ShouldBe(RaftEntryType.RemovePeer);
+ decoded.Entries[2].Data.ShouldBe("third"u8.ToArray());
+ }
+
+ [Fact]
+ public void AppendEntry_50_entries_preserve_order()
+ {
+ var entries = Enumerable.Range(0, 50)
+ .Select(i => new RaftEntryWire(RaftEntryType.Normal, [(byte)i]))
+ .ToArray();
+
+ var ae = new RaftAppendEntryWire(
+ LeaderId: "lead0001", Term: 1, Commit: 0, PrevTerm: 0, PrevIndex: 0,
+ Entries: entries);
+
+ var decoded = RaftAppendEntryWire.Decode(ae.Encode());
+ decoded.Entries.Count.ShouldBe(50);
+
+ for (var i = 0; i < 50; i++)
+ decoded.Entries[i].Data[0].ShouldBe((byte)i);
+ }
+
+ [Fact]
+ public void AppendEntry_entry_with_empty_data_round_trips()
+ {
+ var entry = new RaftEntryWire(RaftEntryType.LeaderTransfer, []);
+ var ae = new RaftAppendEntryWire(
+ LeaderId: "ld", Term: 1, Commit: 0, PrevTerm: 0, PrevIndex: 0,
+ Entries: [entry]);
+
+ var decoded = RaftAppendEntryWire.Decode(ae.Encode());
+ decoded.Entries.Count.ShouldBe(1);
+ decoded.Entries[0].Data.Length.ShouldBe(0);
+ decoded.Entries[0].Type.ShouldBe(RaftEntryType.LeaderTransfer);
+ }
+
+ // ---------------------------------------------------------------------------
+ // AppendEntry — leaderTerm (uvarint tail)
+ // ---------------------------------------------------------------------------
+
+ // Go: server/raft.go:2709 — buf = append(buf, lterm...)
+ // Go: server/raft.go:2740-2743 — if lterm, n := binary.Uvarint(msg[ri:]); n > 0 ...
+ [Theory]
+ [InlineData(0UL)]
+ [InlineData(1UL)]
+ [InlineData(127UL)]
+ [InlineData(128UL)]
+ [InlineData(ulong.MaxValue)]
+ public void AppendEntry_leader_term_uvarint_round_trips(ulong lterm)
+ {
+ var ae = new RaftAppendEntryWire(
+ LeaderId: "lead0001", Term: 5, Commit: 3, PrevTerm: 4, PrevIndex: 2,
+ Entries: [],
+ LeaderTerm: lterm);
+
+ var decoded = RaftAppendEntryWire.Decode(ae.Encode());
+ decoded.LeaderTerm.ShouldBe(lterm);
+ }
+
+ // ---------------------------------------------------------------------------
+ // AppendEntry — error cases
+ // ---------------------------------------------------------------------------
+
+ [Fact]
+ public void AppendEntry_short_buffer_throws_ArgumentException()
+ {
+ // Buffer smaller than appendEntryBaseLen (42 bytes).
+ var shortBuffer = new byte[RaftWireConstants.AppendEntryBaseLen - 1];
+ Should.Throw(() => RaftAppendEntryWire.Decode(shortBuffer));
+ }
+
+ // ---------------------------------------------------------------------------
+ // AppendEntryResponse
+ // ---------------------------------------------------------------------------
+
+ // Go: server/raft.go:2777-2794 — appendEntryResponse.encode()
+ // Go: server/raft.go:2799-2817 — decodeAppendEntryResponse()
+ [Fact]
+ public void AppendEntryResponse_success_true_round_trip()
+ {
+ var resp = new RaftAppendEntryResponseWire(
+ Term: 12, Index: 99, PeerId: "follwr01", Success: true);
+
+ var encoded = resp.Encode();
+ encoded.Length.ShouldBe(RaftWireConstants.AppendEntryResponseLen); // 25
+
+ var decoded = RaftAppendEntryResponseWire.Decode(encoded);
+ decoded.Term.ShouldBe(12UL);
+ decoded.Index.ShouldBe(99UL);
+ decoded.PeerId.ShouldBe("follwr01");
+ decoded.Success.ShouldBeTrue();
+ }
+
+ [Fact]
+ public void AppendEntryResponse_success_false_round_trip()
+ {
+ var resp = new RaftAppendEntryResponseWire(
+ Term: 3, Index: 1, PeerId: "follwr02", Success: false);
+
+ var decoded = RaftAppendEntryResponseWire.Decode(resp.Encode());
+ decoded.Success.ShouldBeFalse();
+ decoded.PeerId.ShouldBe("follwr02");
+ }
+
+ [Fact]
+ public void AppendEntryResponse_success_byte_is_0_or_1()
+ {
+ // Go: server/raft.go:2815 — ar.success = msg[24] == 1
+ var yes = new RaftAppendEntryResponseWire(Term: 1, Index: 0, PeerId: "p", Success: true);
+ var no = new RaftAppendEntryResponseWire(Term: 1, Index: 0, PeerId: "p", Success: false);
+
+ yes.Encode()[24].ShouldBe((byte)1);
+ no.Encode()[24].ShouldBe((byte)0);
+ }
+
+ [Fact]
+ public void AppendEntryResponse_layout_at_correct_offsets()
+ {
+ // Go: server/raft.go:2784-2792 — exact layout:
+ // [0..7]=term [8..15]=index [16..23]=peer [24]=success
+ var resp = new RaftAppendEntryResponseWire(
+ Term: 1, Index: 2, PeerId: "BBBBBBBB", Success: true);
+ var bytes = resp.Encode();
+
+ bytes[0].ShouldBe((byte)1); // term LE
+ bytes[8].ShouldBe((byte)2); // index LE
+ bytes[16].ShouldBe((byte)'B'); // peer[0]
+ bytes[24].ShouldBe((byte)1); // success = 1
+ }
+
+ [Fact]
+ public void AppendEntryResponse_short_buffer_throws_ArgumentException()
+ {
+ var shortBuffer = new byte[RaftWireConstants.AppendEntryResponseLen - 1];
+ Should.Throw(() => RaftAppendEntryResponseWire.Decode(shortBuffer));
+ }
+
+ [Fact]
+ public void AppendEntryResponse_long_buffer_throws_ArgumentException()
+ {
+ var longBuffer = new byte[RaftWireConstants.AppendEntryResponseLen + 1];
+ Should.Throw(() => RaftAppendEntryResponseWire.Decode(longBuffer));
+ }
+
+ [Fact]
+ public void AppendEntryResponse_peer_id_truncated_to_8_bytes()
+ {
+ // Go: server/raft.go:2787 — copy(buf[16:16+idLen], ar.peer)
+ var resp = new RaftAppendEntryResponseWire(
+ Term: 1, Index: 0,
+ PeerId: "verylongpeeridthatexceeds8", Success: false);
+
+ var bytes = resp.Encode();
+ var idBytes = bytes[16..24];
+ System.Text.Encoding.ASCII.GetString(idBytes).ShouldBe("verylong");
+ }
+
+ // ---------------------------------------------------------------------------
+ // Wire constant values
+ // ---------------------------------------------------------------------------
+
+ [Fact]
+ public void Wire_constants_match_go_definitions()
+ {
+ // Go: server/raft.go:4558 — voteRequestLen = 24 + idLen = 32
+ RaftWireConstants.VoteRequestLen.ShouldBe(32);
+
+ // Go: server/raft.go:4737 — voteResponseLen = 8 + 8 + 1 = 17
+ RaftWireConstants.VoteResponseLen.ShouldBe(17);
+
+ // Go: server/raft.go:2660 — appendEntryBaseLen = idLen + 4*8 + 2 = 42
+ RaftWireConstants.AppendEntryBaseLen.ShouldBe(42);
+
+ // Go: server/raft.go:2757 — appendEntryResponseLen = 24 + 1 = 25
+ RaftWireConstants.AppendEntryResponseLen.ShouldBe(25);
+
+ // Go: server/raft.go:2756 — idLen = 8
+ RaftWireConstants.IdLen.ShouldBe(8);
+ }
+}