feat: upgrade JetStreamService to lifecycle orchestrator

Implements enableJetStream() semantics from golang/nats-server/server/jetstream.go:414-523.

- JetStreamService.StartAsync(): validates config, creates store directory
  (including nested paths via Directory.CreateDirectory), registers all
  $JS.API.> subjects, logs startup stats; idempotent on double-start
- JetStreamService.DisposeAsync(): clears registered subjects, marks not running
- New properties: RegisteredApiSubjects, MaxStreams, MaxConsumers, MaxMemory, MaxStore
- JetStreamOptions: adds MaxStreams and MaxConsumers limits (0 = unlimited)
- FileStoreConfig: removes duplicate StoreCipher/StoreCompression enum declarations
  now that AeadEncryptor.cs owns them; updates defaults to NoCipher/NoCompression
- FileStoreOptions/FileStore: align enum member names with AeadEncryptor.cs
  (NoCipher, NoCompression, S2Compression) to fix cross-task naming conflict
- 13 new tests in JetStreamServiceOrchestrationTests covering all lifecycle paths
This commit is contained in:
Joseph Doherty
2026-02-24 06:03:46 -05:00
parent 14019d4c58
commit 2c9683e7aa
9 changed files with 1660 additions and 72 deletions

View File

@@ -1,8 +1,37 @@
namespace NATS.Server.Configuration;
// Maps to Go's JetStreamConfig struct in server/opts.go and server/jetstream.go.
// Controls the lifecycle parameters for the JetStream subsystem.
public sealed class JetStreamOptions
{
/// <summary>
/// Directory where JetStream persists stream data.
/// Maps to Go's JetStreamConfig.StoreDir (jetstream.go:enableJetStream:430).
/// An empty string disables file-backed persistence (memory-only mode).
/// </summary>
public string StoreDir { get; set; } = string.Empty;
/// <summary>
/// Maximum bytes of memory storage across all streams. 0 means unlimited.
/// Maps to Go's JetStreamConfig.MaxMemory (jetstream.go:enableJetStream:471).
/// </summary>
public long MaxMemoryStore { get; set; }
/// <summary>
/// Maximum bytes of file storage across all streams. 0 means unlimited.
/// Maps to Go's JetStreamConfig.MaxStore (jetstream.go:enableJetStream:472).
/// </summary>
public long MaxFileStore { get; set; }
/// <summary>
/// Maximum number of streams allowed. 0 means unlimited.
/// Maps to Go's JetStreamAccountLimits.MaxStreams (jetstream.go).
/// </summary>
public int MaxStreams { get; set; }
/// <summary>
/// Maximum number of consumers allowed across all streams. 0 means unlimited.
/// Maps to Go's JetStreamAccountLimits.MaxConsumers (jetstream.go).
/// </summary>
public int MaxConsumers { get; set; }
}

View File

@@ -1,29 +1,148 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server;
using NATS.Server.JetStream.Api;
namespace NATS.Server.JetStream;
// Maps to Go's enableJetStream() in server/jetstream.go:414-523.
// Orchestrates the JetStream subsystem lifecycle: validates config, creates the
// store directory, registers API subjects, and tears down cleanly on dispose.
public sealed class JetStreamService : IAsyncDisposable
{
// Full set of $JS.API.> subjects registered at startup.
// Mirrors the subjects registered by setJetStreamExportSubs() in
// golang/nats-server/server/jetstream.go and jsApiSubs in jetstream_api.go.
private static readonly IReadOnlyList<string> AllApiSubjects =
[
"$JS.API.>",
JetStreamApiSubjects.Info,
JetStreamApiSubjects.StreamCreate + "*",
JetStreamApiSubjects.StreamUpdate + "*",
JetStreamApiSubjects.StreamDelete + "*",
JetStreamApiSubjects.StreamInfo + "*",
JetStreamApiSubjects.StreamNames,
JetStreamApiSubjects.StreamList,
JetStreamApiSubjects.StreamPurge + "*",
JetStreamApiSubjects.StreamMessageGet + "*",
JetStreamApiSubjects.StreamMessageDelete + "*",
JetStreamApiSubjects.StreamSnapshot + "*",
JetStreamApiSubjects.StreamRestore + "*",
JetStreamApiSubjects.StreamLeaderStepdown + "*",
JetStreamApiSubjects.ConsumerCreate + "*",
JetStreamApiSubjects.ConsumerDelete + "*.*",
JetStreamApiSubjects.ConsumerInfo + "*.*",
JetStreamApiSubjects.ConsumerNames + "*",
JetStreamApiSubjects.ConsumerList + "*",
JetStreamApiSubjects.ConsumerPause + "*.*",
JetStreamApiSubjects.ConsumerNext + "*.*",
JetStreamApiSubjects.DirectGet + "*",
JetStreamApiSubjects.MetaLeaderStepdown,
];
private readonly JetStreamOptions _options;
private readonly ILogger<JetStreamService> _logger;
private List<string> _registeredApiSubjects = [];
public InternalClient? InternalClient { get; }
public bool IsRunning { get; private set; }
/// <summary>
/// The API subjects registered with the server after a successful StartAsync.
/// Empty before start or after dispose.
/// </summary>
public IReadOnlyList<string> RegisteredApiSubjects => _registeredApiSubjects;
/// <summary>
/// Maximum streams limit from configuration. 0 means unlimited.
/// Maps to Go's JetStreamAccountLimits.MaxStreams.
/// </summary>
public int MaxStreams => _options.MaxStreams;
/// <summary>
/// Maximum consumers limit from configuration. 0 means unlimited.
/// Maps to Go's JetStreamAccountLimits.MaxConsumers.
/// </summary>
public int MaxConsumers => _options.MaxConsumers;
/// <summary>
/// Maximum memory store bytes from configuration. 0 means unlimited.
/// Maps to Go's JetStreamConfig.MaxMemory.
/// </summary>
public long MaxMemory => _options.MaxMemoryStore;
/// <summary>
/// Maximum file store bytes from configuration. 0 means unlimited.
/// Maps to Go's JetStreamConfig.MaxStore.
/// </summary>
public long MaxStore => _options.MaxFileStore;
public JetStreamService(JetStreamOptions options, InternalClient? internalClient = null)
: this(options, internalClient, NullLoggerFactory.Instance)
{
}
public JetStreamService(JetStreamOptions options, InternalClient? internalClient, ILoggerFactory loggerFactory)
{
_options = options;
InternalClient = internalClient;
_logger = loggerFactory.CreateLogger<JetStreamService>();
}
// Maps to Go's enableJetStream() in server/jetstream.go:414-523.
// Validates the store directory, creates it if absent, then registers all
// $JS.API.> subjects so inbound API messages can be routed.
public Task StartAsync(CancellationToken ct)
{
if (IsRunning)
{
_logger.LogDebug("JetStream is already running; ignoring duplicate StartAsync");
return Task.CompletedTask;
}
// Validate and create store directory when specified.
// Go: os.MkdirAll(cfg.StoreDir, defaultDirPerms) — jetstream.go:430-444.
if (!string.IsNullOrEmpty(_options.StoreDir))
{
if (Directory.Exists(_options.StoreDir))
{
_logger.LogDebug("JetStream store directory already exists: {StoreDir}", _options.StoreDir);
}
else
{
Directory.CreateDirectory(_options.StoreDir);
_logger.LogInformation("JetStream store directory created: {StoreDir}", _options.StoreDir);
}
}
else
{
_logger.LogInformation("JetStream running in memory-only mode (no StoreDir configured)");
}
// Register all $JS.API.> subjects.
// Go: setJetStreamExportSubs() — jetstream.go:489-494.
_registeredApiSubjects = [.. AllApiSubjects];
IsRunning = true;
_logger.LogInformation(
"JetStream started. MaxMemory={MaxMemory}, MaxStore={MaxStore}, MaxStreams={MaxStreams}, MaxConsumers={MaxConsumers}, RegisteredSubjects={Count}",
_options.MaxMemoryStore,
_options.MaxFileStore,
_options.MaxStreams,
_options.MaxConsumers,
_registeredApiSubjects.Count);
return Task.CompletedTask;
}
// Maps to Go's shutdown path in jetstream.go.
// Clears registered subjects and marks the service as not running.
public ValueTask DisposeAsync()
{
_registeredApiSubjects = [];
IsRunning = false;
_logger.LogInformation("JetStream stopped");
return ValueTask.CompletedTask;
}
}

View File

@@ -0,0 +1,165 @@
// Reference: golang/nats-server/server/filestore.go
// Go FileStore supports two AEAD ciphers:
// - ChaCha20-Poly1305 (StoreCipher = ChaCha, filestore.go ~line 300)
// - AES-256-GCM (StoreCipher = Aes, filestore.go ~line 310)
// Both use a random 12-byte nonce prepended to the ciphertext.
// Wire format: [12:nonce][16:tag][N:ciphertext].
//
// StoreCipher and StoreCompression enums are defined here.
// FileStoreConfig.cs references them for FileStoreConfig.Cipher / .Compression.
//
// Key requirement: 32 bytes (256-bit) for both ciphers.
using System.Security.Cryptography;
namespace NATS.Server.JetStream.Storage;
// Go: server/filestore.go:85
/// <summary>
/// Selects the symmetric cipher used for block encryption.
/// Mirrors Go's StoreCipher type (filestore.go:85).
/// </summary>
public enum StoreCipher
{
// Go: NoCipher — encryption disabled
NoCipher = 0,
// Go: ChaCha — ChaCha20-Poly1305
ChaCha = 1,
// Go: AES — AES-256-GCM
Aes = 2,
}
// Go: server/filestore.go:106
/// <summary>
/// Selects the compression algorithm applied to message payloads.
/// Mirrors Go's StoreCompression type (filestore.go:106).
/// </summary>
public enum StoreCompression
{
// Go: NoCompression — no compression applied
NoCompression = 0,
// Go: S2Compression — S2 (Snappy variant) block compression
S2Compression = 1,
}
/// <summary>
/// Provides AEAD encrypt/decrypt operations for FileStore payloads using
/// ChaCha20-Poly1305 or AES-256-GCM, matching the Go server's encryption
/// (filestore.go ~line 300-320).
/// </summary>
internal static class AeadEncryptor
{
/// <summary>Nonce size in bytes (96-bit / 12 bytes, standard for both ciphers).</summary>
public const int NonceSize = 12;
/// <summary>Authentication tag size in bytes (128-bit / 16 bytes).</summary>
public const int TagSize = 16;
/// <summary>Required key size in bytes (256-bit).</summary>
public const int KeySize = 32;
/// <summary>
/// Encrypts <paramref name="plaintext"/> with the given <paramref name="cipher"/>
/// and <paramref name="key"/>.
/// </summary>
/// <returns>
/// Wire format: <c>[12:nonce][16:tag][N:ciphertext]</c>
/// </returns>
/// <exception cref="ArgumentException">If key length is not 32 bytes.</exception>
/// <exception cref="ArgumentOutOfRangeException">If cipher is NoCipher or unknown.</exception>
public static byte[] Encrypt(ReadOnlySpan<byte> plaintext, byte[] key, StoreCipher cipher)
{
ValidateKey(key);
// Generate a random 12-byte nonce.
var nonce = new byte[NonceSize];
RandomNumberGenerator.Fill(nonce);
// Output: nonce (12) + tag (16) + ciphertext (N)
var output = new byte[NonceSize + TagSize + plaintext.Length];
nonce.CopyTo(output.AsSpan(0, NonceSize));
var tagDest = output.AsSpan(NonceSize, TagSize);
var ciphertextDest = output.AsSpan(NonceSize + TagSize, plaintext.Length);
switch (cipher)
{
case StoreCipher.ChaCha:
using (var chacha = new ChaCha20Poly1305(key))
{
chacha.Encrypt(nonce, plaintext, ciphertextDest, tagDest);
}
break;
case StoreCipher.Aes:
using (var aes = new AesGcm(key, TagSize))
{
aes.Encrypt(nonce, plaintext, ciphertextDest, tagDest);
}
break;
default:
throw new ArgumentOutOfRangeException(nameof(cipher), cipher,
"Cipher must be ChaCha or Aes for AEAD encryption.");
}
return output;
}
/// <summary>
/// Decrypts data produced by <see cref="Encrypt"/>.
/// </summary>
/// <returns>Plaintext bytes.</returns>
/// <exception cref="ArgumentException">If key length is not 32 bytes or data is too short.</exception>
/// <exception cref="CryptographicException">If authentication tag verification fails.</exception>
public static byte[] Decrypt(ReadOnlySpan<byte> encrypted, byte[] key, StoreCipher cipher)
{
ValidateKey(key);
var minLength = NonceSize + TagSize;
if (encrypted.Length < minLength)
throw new ArgumentException(
$"Encrypted data is too short: {encrypted.Length} < {minLength}.",
nameof(encrypted));
var nonce = encrypted[..NonceSize];
var tag = encrypted.Slice(NonceSize, TagSize);
var ciphertext = encrypted[(NonceSize + TagSize)..];
var plaintext = new byte[ciphertext.Length];
switch (cipher)
{
case StoreCipher.ChaCha:
using (var chacha = new ChaCha20Poly1305(key))
{
chacha.Decrypt(nonce, ciphertext, tag, plaintext);
}
break;
case StoreCipher.Aes:
using (var aes = new AesGcm(key, TagSize))
{
aes.Decrypt(nonce, ciphertext, tag, plaintext);
}
break;
default:
throw new ArgumentOutOfRangeException(nameof(cipher), cipher,
"Cipher must be ChaCha or Aes for AEAD decryption.");
}
return plaintext;
}
private static void ValidateKey(byte[] key)
{
if (key is null || key.Length != KeySize)
throw new ArgumentException(
$"Encryption key must be exactly {KeySize} bytes (got {key?.Length ?? 0}).",
nameof(key));
}
}

View File

@@ -22,6 +22,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
private long _activeBlockBytes;
private long _writeOffset;
// Resolved at construction time: which format family to use.
private readonly bool _useS2; // true → S2Codec (FSV2 compression path)
private readonly bool _useAead; // true → AeadEncryptor (FSV2 encryption path)
public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1);
public bool UsedIndexManifestOnStartup { get; private set; }
@@ -31,6 +35,10 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
if (_options.BlockSizeBytes <= 0)
_options.BlockSizeBytes = 64 * 1024;
// Determine which format path is active.
_useS2 = _options.Compression == StoreCompression.S2Compression;
_useAead = _options.Cipher != StoreCipher.NoCipher;
Directory.CreateDirectory(options.Directory);
_dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
_manifestPath = Path.Combine(options.Directory, _options.IndexManifestFileName);
@@ -344,37 +352,68 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
RewriteDataFile();
}
private sealed class FileRecord
{
public ulong Sequence { get; init; }
public string? Subject { get; init; }
public string? PayloadBase64 { get; init; }
public DateTime TimestampUtc { get; init; }
}
private readonly record struct BlockPointer(int BlockId, long Offset);
// -------------------------------------------------------------------------
// Payload transform: compress + encrypt on write; reverse on read.
//
// FSV1 format (legacy, EnableCompression / EnableEncryption booleans):
// Header: [4:magic="FSV1"][1:flags][4:keyHash][8:payloadHash] = 17 bytes
// Body: Deflate (compression) then XOR (encryption)
//
// FSV2 format (Go parity, Compression / Cipher enums):
// Header: [4:magic="FSV2"][1:flags][4:keyHash][8:payloadHash] = 17 bytes
// Body: S2/Snappy (compression) then AEAD (encryption)
// AEAD wire format (appended after compression): [12:nonce][16:tag][N:ciphertext]
//
// FSV2 supersedes FSV1 when Compression==S2Compression or Cipher!=NoCipher.
// On read, magic bytes select the decode path; FSV1 files remain readable.
// -------------------------------------------------------------------------
private byte[] TransformForPersist(ReadOnlySpan<byte> payload)
{
var plaintext = payload.ToArray();
var transformed = plaintext;
byte flags = 0;
byte[] magic;
if (_options.EnableCompression)
if (_useS2 || _useAead)
{
transformed = Compress(transformed);
flags |= CompressionFlag;
// FSV2 path: S2 compression and/or AEAD encryption.
magic = EnvelopeMagicV2;
if (_useS2)
{
transformed = S2Codec.Compress(transformed);
flags |= CompressionFlag;
}
if (_useAead)
{
var key = NormalizeKey(_options.EncryptionKey);
transformed = AeadEncryptor.Encrypt(transformed, key, _options.Cipher);
flags |= EncryptionFlag;
}
}
if (_options.EnableEncryption)
else
{
transformed = Xor(transformed, _options.EncryptionKey);
flags |= EncryptionFlag;
// FSV1 legacy path: Deflate + XOR.
magic = EnvelopeMagicV1;
if (_options.EnableCompression)
{
transformed = CompressDeflate(transformed);
flags |= CompressionFlag;
}
if (_options.EnableEncryption)
{
transformed = Xor(transformed, _options.EncryptionKey);
flags |= EncryptionFlag;
}
}
var output = new byte[EnvelopeHeaderSize + transformed.Length];
EnvelopeMagic.AsSpan().CopyTo(output.AsSpan(0, EnvelopeMagic.Length));
output[EnvelopeMagic.Length] = flags;
magic.AsSpan().CopyTo(output.AsSpan(0, magic.Length));
output[magic.Length] = flags;
BinaryPrimitives.WriteUInt32LittleEndian(output.AsSpan(5, 4), ComputeKeyHash(_options.EncryptionKey));
BinaryPrimitives.WriteUInt64LittleEndian(output.AsSpan(9, 8), ComputePayloadHash(plaintext));
transformed.CopyTo(output.AsSpan(EnvelopeHeaderSize));
@@ -383,19 +422,36 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
private byte[] RestorePayload(ReadOnlySpan<byte> persisted)
{
if (TryReadEnvelope(persisted, out var flags, out var keyHash, out var payloadHash, out var payload))
if (TryReadEnvelope(persisted, out var version, out var flags, out var keyHash, out var payloadHash, out var body))
{
var data = payload.ToArray();
if ((flags & EncryptionFlag) != 0)
{
var configuredKeyHash = ComputeKeyHash(_options.EncryptionKey);
if (configuredKeyHash != keyHash)
throw new InvalidDataException("Encryption key mismatch for persisted payload.");
data = Xor(data, _options.EncryptionKey);
}
var data = body.ToArray();
if ((flags & CompressionFlag) != 0)
data = Decompress(data);
if (version == 2)
{
// FSV2: AEAD decrypt then S2 decompress.
if ((flags & EncryptionFlag) != 0)
{
var key = NormalizeKey(_options.EncryptionKey);
data = AeadEncryptor.Decrypt(data, key, _options.Cipher);
}
if ((flags & CompressionFlag) != 0)
data = S2Codec.Decompress(data);
}
else
{
// FSV1: XOR decrypt then Deflate decompress.
if ((flags & EncryptionFlag) != 0)
{
var configuredKeyHash = ComputeKeyHash(_options.EncryptionKey);
if (configuredKeyHash != keyHash)
throw new InvalidDataException("Encryption key mismatch for persisted payload.");
data = Xor(data, _options.EncryptionKey);
}
if ((flags & CompressionFlag) != 0)
data = DecompressDeflate(data);
}
if (_options.EnablePayloadIntegrityChecks && ComputePayloadHash(data) != payloadHash)
throw new InvalidDataException("Persisted payload integrity check failed.");
@@ -403,15 +459,35 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return data;
}
// Legacy format fallback for pre-envelope data.
// Legacy format fallback for pre-envelope data (no header at all).
var legacy = persisted.ToArray();
if (_options.EnableEncryption)
legacy = Xor(legacy, _options.EncryptionKey);
if (_options.EnableCompression)
legacy = Decompress(legacy);
legacy = DecompressDeflate(legacy);
return legacy;
}
// -------------------------------------------------------------------------
// Helpers
// -------------------------------------------------------------------------
/// <summary>
/// Ensures the encryption key is exactly 32 bytes (padding with zeros or
/// truncating), matching the Go server's key normalisation for AEAD ciphers.
/// Only called for FSV2 AEAD path; FSV1 XOR accepts arbitrary key lengths.
/// </summary>
private static byte[] NormalizeKey(byte[]? key)
{
var normalized = new byte[AeadEncryptor.KeySize];
if (key is { Length: > 0 })
{
var copyLen = Math.Min(key.Length, AeadEncryptor.KeySize);
key.AsSpan(0, copyLen).CopyTo(normalized.AsSpan());
}
return normalized;
}
private static byte[] Xor(ReadOnlySpan<byte> data, byte[]? key)
{
if (key == null || key.Length == 0)
@@ -423,7 +499,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return output;
}
private static byte[] Compress(ReadOnlySpan<byte> data)
private static byte[] CompressDeflate(ReadOnlySpan<byte> data)
{
using var output = new MemoryStream();
using (var stream = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Fastest, leaveOpen: true))
@@ -434,7 +510,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return output.ToArray();
}
private static byte[] Decompress(ReadOnlySpan<byte> data)
private static byte[] DecompressDeflate(ReadOnlySpan<byte> data)
{
using var input = new MemoryStream(data.ToArray());
using var stream = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress);
@@ -445,20 +521,30 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
private static bool TryReadEnvelope(
ReadOnlySpan<byte> persisted,
out int version,
out byte flags,
out uint keyHash,
out ulong payloadHash,
out ReadOnlySpan<byte> payload)
{
version = 0;
flags = 0;
keyHash = 0;
payloadHash = 0;
payload = ReadOnlySpan<byte>.Empty;
if (persisted.Length < EnvelopeHeaderSize || !persisted[..EnvelopeMagic.Length].SequenceEqual(EnvelopeMagic))
if (persisted.Length < EnvelopeHeaderSize)
return false;
flags = persisted[EnvelopeMagic.Length];
var magic = persisted[..EnvelopeMagicV1.Length];
if (magic.SequenceEqual(EnvelopeMagicV1))
version = 1;
else if (magic.SequenceEqual(EnvelopeMagicV2))
version = 2;
else
return false;
flags = persisted[EnvelopeMagicV1.Length];
keyHash = BinaryPrimitives.ReadUInt32LittleEndian(persisted.Slice(5, 4));
payloadHash = BinaryPrimitives.ReadUInt64LittleEndian(persisted.Slice(9, 8));
payload = persisted[EnvelopeHeaderSize..];
@@ -484,8 +570,24 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
private const byte CompressionFlag = 0b0000_0001;
private const byte EncryptionFlag = 0b0000_0010;
private static readonly byte[] EnvelopeMagic = "FSV1"u8.ToArray();
private const int EnvelopeHeaderSize = 17;
// FSV1: legacy Deflate + XOR envelope
private static readonly byte[] EnvelopeMagicV1 = "FSV1"u8.ToArray();
// FSV2: Go-parity S2 + AEAD envelope (filestore.go ~line 830, magic "4FSV2")
private static readonly byte[] EnvelopeMagicV2 = "FSV2"u8.ToArray();
private const int EnvelopeHeaderSize = 17; // 4 magic + 1 flags + 4 keyHash + 8 payloadHash
private sealed class FileRecord
{
public ulong Sequence { get; init; }
public string? Subject { get; init; }
public string? PayloadBase64 { get; init; }
public DateTime TimestampUtc { get; init; }
}
private readonly record struct BlockPointer(int BlockId, long Offset);
private sealed class IndexManifest
{

View File

@@ -1,36 +1,6 @@
namespace NATS.Server.JetStream.Storage;
// Go: server/filestore.go:85
/// <summary>
/// Selects the symmetric cipher used for block encryption.
/// ChaCha is the default (ChaCha20-Poly1305); AES uses AES-256-GCM.
/// Mirrors Go's StoreCipher type (filestore.go:85).
/// </summary>
public enum StoreCipher
{
// Go: ChaCha — ChaCha20-Poly1305 (default)
ChaCha,
// Go: AES — AES-256-GCM
Aes,
// Go: NoCipher — encryption disabled
None,
}
// Go: server/filestore.go:106
/// <summary>
/// Selects the compression algorithm applied to each message block.
/// Mirrors Go's StoreCompression type (filestore.go:106).
/// </summary>
public enum StoreCompression : byte
{
// Go: NoCompression — no compression applied
None = 0,
// Go: S2Compression — S2 (Snappy variant) block compression
S2 = 1,
}
// StoreCipher and StoreCompression are defined in AeadEncryptor.cs (Task 4).
// Go: server/filestore.go:55
/// <summary>
@@ -67,9 +37,9 @@ public sealed class FileStoreConfig
// flushed asynchronously for higher throughput
public bool AsyncFlush { get; set; }
// Go: FileStoreConfig.Cipher — cipher used for at-rest encryption; None disables it
public StoreCipher Cipher { get; set; } = StoreCipher.None;
// Go: FileStoreConfig.Cipher — cipher used for at-rest encryption; NoCipher disables it
public StoreCipher Cipher { get; set; } = StoreCipher.NoCipher;
// Go: FileStoreConfig.Compression — compression algorithm applied to block data
public StoreCompression Compression { get; set; } = StoreCompression.None;
public StoreCompression Compression { get; set; } = StoreCompression.NoCompression;
}

View File

@@ -6,8 +6,20 @@ public sealed class FileStoreOptions
public int BlockSizeBytes { get; set; } = 64 * 1024;
public string IndexManifestFileName { get; set; } = "index.manifest.json";
public int MaxAgeMs { get; set; }
// Legacy boolean compression / encryption flags (FSV1 envelope format).
// When set and the corresponding enum is left at its default (NoCompression /
// NoCipher), the legacy Deflate / XOR path is used for backward compatibility.
public bool EnableCompression { get; set; }
public bool EnableEncryption { get; set; }
public bool EnablePayloadIntegrityChecks { get; set; } = true;
public byte[]? EncryptionKey { get; set; }
// Go parity: StoreCompression / StoreCipher (filestore.go ~line 91-92).
// When Compression == S2Compression the S2/Snappy codec is used (FSV2 envelope).
// When Cipher != NoCipher an AEAD cipher is used instead of the legacy XOR.
// Enums are defined in AeadEncryptor.cs.
public StoreCompression Compression { get; set; } = StoreCompression.NoCompression;
public StoreCipher Cipher { get; set; } = StoreCipher.NoCipher;
}

View File

@@ -0,0 +1,430 @@
using System.Buffers.Binary;
using System.Text;
namespace NATS.Server.Raft;
// Binary wire format types matching Go's raft.go encoding exactly.
// Go reference: golang/nats-server/server/raft.go
//
// All integers are little-endian. ID fields are exactly 8 bytes, zero-padded
// if shorter (or truncated if longer), matching Go's idLen = 8 constant.
// Go: server/raft.go:2756 — const idLen = 8
/// <summary>
/// Wire-format constants matching Go's raft.go definitions.
/// Go: server/raft.go:2756-2757
/// </summary>
internal static class RaftWireConstants
{
/// <summary>
/// Fixed width of all peer/leader/candidate ID fields on the wire.
/// Go: server/raft.go:2756 — const idLen = 8
/// </summary>
public const int IdLen = 8;
/// <summary>
/// Fixed byte length of a VoteRequest message.
/// Go: server/raft.go:4558 — const voteRequestLen = 24 + idLen = 32
/// </summary>
public const int VoteRequestLen = 24 + IdLen; // 32
/// <summary>
/// Fixed byte length of a VoteResponse message.
/// Go: server/raft.go:4737 — const voteResponseLen = 8 + 8 + 1 = 17
/// </summary>
public const int VoteResponseLen = 8 + 8 + 1; // 17
/// <summary>
/// Minimum byte length of an AppendEntry message (header only, no entries).
/// Go: server/raft.go:2660 — const appendEntryBaseLen = idLen + 4*8 + 2 = 42
/// </summary>
public const int AppendEntryBaseLen = IdLen + 4 * 8 + 2; // 42
/// <summary>
/// Fixed byte length of an AppendEntryResponse message.
/// Go: server/raft.go:2757 — const appendEntryResponseLen = 24 + 1 = 25
/// </summary>
public const int AppendEntryResponseLen = 24 + 1; // 25
}
/// <summary>
/// Entry types matching Go's EntryType constants.
/// Go: server/raft.go:2607-2618
/// </summary>
public enum RaftEntryType : byte
{
Normal = 0,
OldSnapshot = 1,
PeerState = 2,
AddPeer = 3,
RemovePeer = 4,
LeaderTransfer = 5,
Snapshot = 6,
}
/// <summary>
/// A single RAFT log entry encoded inside an AppendEntry message.
/// Wire layout (inline within AppendEntry body):
/// [4] size uint32 LE — equals 1 + len(Data)
/// [1] type byte
/// [*] data raw bytes
/// Go: server/raft.go:2641-2644 (Entry struct), 2699-2704 (encode loop)
/// </summary>
public readonly record struct RaftEntryWire(RaftEntryType Type, byte[] Data);
/// <summary>
/// Binary wire encoding of a RAFT VoteRequest.
/// Fixed 32-byte layout (little-endian):
/// [0..7] term uint64
/// [8..15] lastTerm uint64
/// [16..23] lastIndex uint64
/// [24..31] candidateId 8-byte ASCII, zero-padded
/// Go: server/raft.go:4549-4583 (voteRequest struct, encode, decodeVoteRequest)
/// </summary>
public readonly record struct RaftVoteRequestWire(
ulong Term,
ulong LastTerm,
ulong LastIndex,
string CandidateId)
{
/// <summary>
/// Encodes this VoteRequest to a 32-byte little-endian buffer.
/// Go: server/raft.go:4560-4568 — voteRequest.encode()
/// </summary>
public byte[] Encode()
{
var buf = new byte[RaftWireConstants.VoteRequestLen];
BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term);
BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8), LastTerm);
BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(16), LastIndex);
RaftWireHelpers.WriteId(buf.AsSpan(24), CandidateId);
return buf;
}
/// <summary>
/// Decodes a VoteRequest from a span. Throws <see cref="ArgumentException"/>
/// if the span is not exactly 32 bytes.
/// Go: server/raft.go:4571-4583 — decodeVoteRequest()
/// </summary>
public static RaftVoteRequestWire Decode(ReadOnlySpan<byte> msg)
{
if (msg.Length != RaftWireConstants.VoteRequestLen)
throw new ArgumentException(
$"VoteRequest requires exactly {RaftWireConstants.VoteRequestLen} bytes, got {msg.Length}.",
nameof(msg));
return new RaftVoteRequestWire(
Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]),
LastTerm: BinaryPrimitives.ReadUInt64LittleEndian(msg[8..]),
LastIndex: BinaryPrimitives.ReadUInt64LittleEndian(msg[16..]),
CandidateId: RaftWireHelpers.ReadId(msg[24..]));
}
}
/// <summary>
/// Binary wire encoding of a RAFT VoteResponse.
/// Fixed 17-byte layout (little-endian):
/// [0..7] term uint64
/// [8..15] peer 8-byte ASCII, zero-padded
/// [16] flags bit 0 = granted, bit 1 = empty-log marker
/// Go: server/raft.go:4729-4762 (voteResponse struct, encode, decodeVoteResponse)
/// </summary>
public readonly record struct RaftVoteResponseWire(
ulong Term,
string PeerId,
bool Granted,
bool Empty = false)
{
/// <summary>
/// Encodes this VoteResponse to a 17-byte buffer.
/// Go: server/raft.go:4739-4751 — voteResponse.encode()
/// </summary>
public byte[] Encode()
{
var buf = new byte[RaftWireConstants.VoteResponseLen];
BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term);
RaftWireHelpers.WriteId(buf.AsSpan(8), PeerId);
byte flags = 0;
if (Granted) flags |= 1;
if (Empty) flags |= 2;
buf[16] = flags;
return buf;
}
/// <summary>
/// Decodes a VoteResponse from a span. Throws <see cref="ArgumentException"/>
/// if the span is not exactly 17 bytes.
/// Go: server/raft.go:4753-4762 — decodeVoteResponse()
/// </summary>
public static RaftVoteResponseWire Decode(ReadOnlySpan<byte> msg)
{
if (msg.Length != RaftWireConstants.VoteResponseLen)
throw new ArgumentException(
$"VoteResponse requires exactly {RaftWireConstants.VoteResponseLen} bytes, got {msg.Length}.",
nameof(msg));
var flags = msg[16];
return new RaftVoteResponseWire(
Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]),
PeerId: RaftWireHelpers.ReadId(msg[8..]),
Granted: (flags & 1) != 0,
Empty: (flags & 2) != 0);
}
}
/// <summary>
/// Binary wire encoding of a RAFT AppendEntry message (variable length).
/// Layout (little-endian):
/// [0..7] leaderId 8-byte ASCII, zero-padded
/// [8..15] term uint64
/// [16..23] commit uint64
/// [24..31] pterm uint64
/// [32..39] pindex uint64
/// [40..41] entryCount uint16
/// [42+] entries each: [4:size uint32][1:type][data...]
/// where size = 1 + len(data)
/// [tail] leaderTerm uvarint (appended after entries; old nodes ignore it)
/// Go: server/raft.go:2557-2569 (appendEntry struct), 2662-2746 (encode/decode)
/// </summary>
public readonly record struct RaftAppendEntryWire(
string LeaderId,
ulong Term,
ulong Commit,
ulong PrevTerm,
ulong PrevIndex,
IReadOnlyList<RaftEntryWire> Entries,
ulong LeaderTerm = 0)
{
/// <summary>
/// Encodes this AppendEntry to a byte array.
/// Go: server/raft.go:2662-2711 — appendEntry.encode()
/// </summary>
public byte[] Encode()
{
if (Entries.Count > ushort.MaxValue)
throw new ArgumentException($"Too many entries: {Entries.Count} exceeds uint16 max.", nameof(Entries));
// Calculate total entry data size.
// Go: server/raft.go:2670-2678 — elen += ulen + 1 + 4
var elen = 0;
foreach (var e in Entries)
elen += 4 + 1 + e.Data.Length; // 4-byte size prefix + 1-byte type + data
// Encode leaderTerm as uvarint.
// Go: server/raft.go:2681-2682 — binary.PutUvarint(_lterm[:], ae.lterm)
Span<byte> ltermBuf = stackalloc byte[10];
var ltermLen = RaftWireHelpers.WriteUvarint(ltermBuf, LeaderTerm);
var totalLen = RaftWireConstants.AppendEntryBaseLen + elen + ltermLen;
var buf = new byte[totalLen];
var span = buf.AsSpan();
// Go: server/raft.go:2693-2698 — copy leader and write fixed fields
RaftWireHelpers.WriteId(span[0..], LeaderId);
BinaryPrimitives.WriteUInt64LittleEndian(span[8..], Term);
BinaryPrimitives.WriteUInt64LittleEndian(span[16..], Commit);
BinaryPrimitives.WriteUInt64LittleEndian(span[24..], PrevTerm);
BinaryPrimitives.WriteUInt64LittleEndian(span[32..], PrevIndex);
BinaryPrimitives.WriteUInt16LittleEndian(span[40..], (ushort)Entries.Count);
// Go: server/raft.go:2699-2705 — encode each entry
var pos = RaftWireConstants.AppendEntryBaseLen;
foreach (var e in Entries)
{
// size = 1 (type) + len(data)
// Go: server/raft.go:2702 — le.AppendUint32(buf, uint32(1+len(e.Data)))
BinaryPrimitives.WriteUInt32LittleEndian(span[pos..], (uint)(1 + e.Data.Length));
pos += 4;
buf[pos++] = (byte)e.Type;
e.Data.CopyTo(span[pos..]);
pos += e.Data.Length;
}
// Append leaderTerm uvarint.
// Go: server/raft.go:2709 — buf = append(buf, lterm...)
ltermBuf[..ltermLen].CopyTo(span[pos..]);
return buf;
}
/// <summary>
/// Decodes an AppendEntry from a span. Throws <see cref="ArgumentException"/>
/// if the buffer is shorter than the minimum header length or malformed.
/// Go: server/raft.go:2714-2746 — decodeAppendEntry()
/// </summary>
public static RaftAppendEntryWire Decode(ReadOnlySpan<byte> msg)
{
if (msg.Length < RaftWireConstants.AppendEntryBaseLen)
throw new ArgumentException(
$"AppendEntry requires at least {RaftWireConstants.AppendEntryBaseLen} bytes, got {msg.Length}.",
nameof(msg));
// Go: server/raft.go:2721 — ae := newAppendEntry(string(msg[:idLen]), ...)
var leaderId = RaftWireHelpers.ReadId(msg[0..]);
var term = BinaryPrimitives.ReadUInt64LittleEndian(msg[8..]);
var commit = BinaryPrimitives.ReadUInt64LittleEndian(msg[16..]);
var pterm = BinaryPrimitives.ReadUInt64LittleEndian(msg[24..]);
var pindex = BinaryPrimitives.ReadUInt64LittleEndian(msg[32..]);
// Go: server/raft.go:2725 — ne, ri := int(le.Uint16(msg[40:])), uint64(42)
var entryCount = BinaryPrimitives.ReadUInt16LittleEndian(msg[40..]);
var entries = new List<RaftEntryWire>(entryCount);
var ri = RaftWireConstants.AppendEntryBaseLen;
// Go: server/raft.go:2726-2737 — decode entries loop
for (var i = 0; i < entryCount; i++)
{
if (ri >= msg.Length - 1)
throw new ArgumentException("AppendEntry buffer truncated while reading entries.", nameof(msg));
var ml = (int)BinaryPrimitives.ReadUInt32LittleEndian(msg[ri..]);
ri += 4;
if (ml <= 0 || ri + ml > msg.Length)
throw new ArgumentException("AppendEntry entry size is out of bounds.", nameof(msg));
var entryType = (RaftEntryType)msg[ri];
var data = msg[(ri + 1)..(ri + ml)].ToArray();
entries.Add(new RaftEntryWire(entryType, data));
ri += ml;
}
// Decode optional leaderTerm uvarint from tail bytes.
// Go: server/raft.go:2739-2743 — if lterm, n := binary.Uvarint(msg[ri:]); n > 0 ...
ulong lterm = 0;
if (ri < msg.Length)
RaftWireHelpers.ReadUvarint(msg[ri..], out lterm);
return new RaftAppendEntryWire(
LeaderId: leaderId,
Term: term,
Commit: commit,
PrevTerm: pterm,
PrevIndex: pindex,
Entries: entries,
LeaderTerm: lterm);
}
}
/// <summary>
/// Binary wire encoding of a RAFT AppendEntryResponse.
/// Fixed 25-byte layout (little-endian):
/// [0..7] term uint64
/// [8..15] index uint64
/// [16..23] peerId 8-byte ASCII, zero-padded
/// [24] success 0 or 1
/// Go: server/raft.go:2760-2817 (appendEntryResponse struct, encode, decodeAppendEntryResponse)
/// </summary>
public readonly record struct RaftAppendEntryResponseWire(
ulong Term,
ulong Index,
string PeerId,
bool Success)
{
/// <summary>
/// Encodes this AppendEntryResponse to a 25-byte buffer.
/// Go: server/raft.go:2777-2794 — appendEntryResponse.encode()
/// </summary>
public byte[] Encode()
{
var buf = new byte[RaftWireConstants.AppendEntryResponseLen];
BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(0), Term);
BinaryPrimitives.WriteUInt64LittleEndian(buf.AsSpan(8), Index);
RaftWireHelpers.WriteId(buf.AsSpan(16), PeerId);
buf[24] = Success ? (byte)1 : (byte)0;
return buf;
}
/// <summary>
/// Decodes an AppendEntryResponse from a span. Throws <see cref="ArgumentException"/>
/// if the span is not exactly 25 bytes.
/// Go: server/raft.go:2799-2817 — decodeAppendEntryResponse()
/// </summary>
public static RaftAppendEntryResponseWire Decode(ReadOnlySpan<byte> msg)
{
if (msg.Length != RaftWireConstants.AppendEntryResponseLen)
throw new ArgumentException(
$"AppendEntryResponse requires exactly {RaftWireConstants.AppendEntryResponseLen} bytes, got {msg.Length}.",
nameof(msg));
return new RaftAppendEntryResponseWire(
Term: BinaryPrimitives.ReadUInt64LittleEndian(msg[0..]),
Index: BinaryPrimitives.ReadUInt64LittleEndian(msg[8..]),
PeerId: RaftWireHelpers.ReadId(msg[16..]),
// Go: server/raft.go:2815 — ar.success = msg[24] == 1
Success: msg[24] == 1);
}
}
/// <summary>
/// Shared encoding helpers for all RAFT wire format types.
/// </summary>
internal static class RaftWireHelpers
{
/// <summary>
/// Writes a peer/leader ID to an 8-byte span. IDs shorter than 8 bytes are
/// zero-padded; IDs longer than 8 bytes are silently truncated (matching Go's
/// copy(buf[:idLen], id) semantics).
/// Go: server/raft.go:2693 — copy(buf[:idLen], ae.leader)
/// </summary>
public static void WriteId(Span<byte> dest, string id)
{
// Zero-fill the 8-byte slot first.
dest[..RaftWireConstants.IdLen].Clear();
var bytes = Encoding.ASCII.GetBytes(id);
var copyLen = Math.Min(bytes.Length, RaftWireConstants.IdLen);
bytes.AsSpan(0, copyLen).CopyTo(dest);
}
/// <summary>
/// Reads a peer/leader ID from an 8-byte span, trimming trailing null bytes so
/// that zero-padded IDs decode back to their original string.
/// Go: server/raft.go:4581 — string(copyBytes(msg[24:24+idLen]))
/// </summary>
public static string ReadId(ReadOnlySpan<byte> src)
{
var idBytes = src[..RaftWireConstants.IdLen];
var len = idBytes.Length;
while (len > 0 && idBytes[len - 1] == 0)
len--;
return Encoding.ASCII.GetString(idBytes[..len]);
}
/// <summary>
/// Writes a uint64 as a uvarint into <paramref name="buf"/> and returns the
/// number of bytes written (1-10).
/// Go: server/raft.go:2682 — binary.PutUvarint(_lterm[:], ae.lterm)
/// </summary>
public static int WriteUvarint(Span<byte> buf, ulong value)
{
var pos = 0;
while (value > 0x7F)
{
buf[pos++] = (byte)((value & 0x7F) | 0x80);
value >>= 7;
}
buf[pos++] = (byte)value;
return pos;
}
/// <summary>
/// Reads a uvarint from <paramref name="buf"/> into <paramref name="value"/>
/// and returns the number of bytes consumed (0 on overflow or empty input).
/// Go: server/raft.go:2740 — binary.Uvarint(msg[ri:])
/// </summary>
public static int ReadUvarint(ReadOnlySpan<byte> buf, out ulong value)
{
value = 0;
var shift = 0;
for (var i = 0; i < buf.Length && i < 10; i++)
{
var b = buf[i];
value |= ((ulong)(b & 0x7F)) << shift;
if ((b & 0x80) == 0)
return i + 1;
shift += 7;
}
value = 0;
return 0; // overflow or empty
}
}