Merge branch 'codex/jetstream-full-parity-executeplan' into main

# Conflicts:
#	differences.md
#	docs/plans/2026-02-23-jetstream-full-parity-plan.md
#	src/NATS.Server/Auth/Account.cs
#	src/NATS.Server/Configuration/ConfigProcessor.cs
#	src/NATS.Server/Monitoring/VarzHandler.cs
#	src/NATS.Server/NatsClient.cs
#	src/NATS.Server/NatsOptions.cs
#	src/NATS.Server/NatsServer.cs
This commit is contained in:
Joseph Doherty
2026-02-23 08:53:44 -05:00
102 changed files with 7821 additions and 23 deletions

View File

@@ -15,6 +15,8 @@ public sealed class Account : IDisposable
public int MaxSubscriptions { get; set; } // 0 = unlimited
public ExportMap Exports { get; } = new();
public ImportMap Imports { get; } = new();
public int MaxJetStreamStreams { get; set; } // 0 = unlimited
public string? JetStreamTier { get; set; }
// JWT fields
public string? Nkey { get; set; }
@@ -36,6 +38,7 @@ public sealed class Account : IDisposable
private readonly ConcurrentDictionary<ulong, byte> _clients = new();
private int _subscriptionCount;
private int _jetStreamStreamCount;
public Account(string name)
{
@@ -44,6 +47,7 @@ public sealed class Account : IDisposable
public int ClientCount => _clients.Count;
public int SubscriptionCount => Volatile.Read(ref _subscriptionCount);
public int JetStreamStreamCount => Volatile.Read(ref _jetStreamStreamCount);
/// <summary>Returns false if max connections exceeded.</summary>
public bool AddClient(ulong clientId)
@@ -69,6 +73,23 @@ public sealed class Account : IDisposable
Interlocked.Decrement(ref _subscriptionCount);
}
public bool TryReserveStream()
{
if (MaxJetStreamStreams > 0 && Volatile.Read(ref _jetStreamStreamCount) >= MaxJetStreamStreams)
return false;
Interlocked.Increment(ref _jetStreamStreamCount);
return true;
}
public void ReleaseStream()
{
if (Volatile.Read(ref _jetStreamStreamCount) == 0)
return;
Interlocked.Decrement(ref _jetStreamStreamCount);
}
// Per-account message/byte stats
private long _inMsgs;
private long _outMsgs;

View File

@@ -6,4 +6,6 @@ public sealed class AuthResult
public string? AccountName { get; init; }
public Permissions? Permissions { get; init; }
public DateTimeOffset? Expiry { get; init; }
public int MaxJetStreamStreams { get; init; }
public string? JetStreamTier { get; init; }
}

View File

@@ -47,6 +47,10 @@ public sealed class AccountNats
[JsonPropertyName("limits")]
public AccountLimits? Limits { get; set; }
/// <summary>JetStream entitlement limits/tier for this account.</summary>
[JsonPropertyName("jetstream")]
public AccountJetStreamLimits? JetStream { get; set; }
/// <summary>NKey public keys authorized to sign user JWTs for this account.</summary>
[JsonPropertyName("signing_keys")]
public string[]? SigningKeys { get; set; }
@@ -92,3 +96,12 @@ public sealed class AccountLimits
[JsonPropertyName("data")]
public long MaxData { get; set; }
}
public sealed class AccountJetStreamLimits
{
[JsonPropertyName("max_streams")]
public int MaxStreams { get; set; }
[JsonPropertyName("tier")]
public string? Tier { get; set; }
}

View File

@@ -161,6 +161,8 @@ public sealed class JwtAuthenticator : IAuthenticator
AccountName = issuerAccount,
Permissions = permissions,
Expiry = userClaims.GetExpiry(),
MaxJetStreamStreams = accountClaims.Nats?.JetStream?.MaxStreams ?? 0,
JetStreamTier = accountClaims.Nats?.JetStream?.Tier,
};
}

View File

@@ -0,0 +1,9 @@
namespace NATS.Server.Configuration;
public sealed class ClusterOptions
{
public string? Name { get; set; }
public string Host { get; set; } = "0.0.0.0";
public int Port { get; set; } = 6222;
public List<string> Routes { get; set; } = [];
}

View File

@@ -217,6 +217,26 @@ public static class ConfigProcessor
opts.AllowNonTls = ToBool(value);
break;
// Cluster / inter-server / JetStream
case "cluster":
if (value is Dictionary<string, object?> clusterDict)
opts.Cluster = ParseCluster(clusterDict, errors);
break;
case "gateway":
if (value is Dictionary<string, object?> gatewayDict)
opts.Gateway = ParseGateway(gatewayDict, errors);
break;
case "leaf":
case "leafnode":
case "leafnodes":
if (value is Dictionary<string, object?> leafDict)
opts.LeafNode = ParseLeafNode(leafDict, errors);
break;
case "jetstream":
if (value is Dictionary<string, object?> jsDict)
opts.JetStream = ParseJetStream(jsDict, errors);
break;
// Tags
case "server_tags":
if (value is Dictionary<string, object?> tagsDict)
@@ -348,6 +368,9 @@ public static class ConfigProcessor
private static readonly Regex DurationPattern = new(
@"^(-?\d+(?:\.\d+)?)\s*(ms|s|m|h)$",
RegexOptions.Compiled | RegexOptions.IgnoreCase);
private static readonly Regex ByteSizePattern = new(
@"^(\d+)\s*(b|kb|mb|gb|tb)?$",
RegexOptions.Compiled | RegexOptions.IgnoreCase);
private static TimeSpan ParseDurationString(string s)
{
@@ -368,6 +391,133 @@ public static class ConfigProcessor
};
}
// ─── Cluster / gateway / leafnode / JetStream parsing ────────
private static ClusterOptions ParseCluster(Dictionary<string, object?> dict, List<string> errors)
{
var options = new ClusterOptions();
foreach (var (key, value) in dict)
{
switch (key.ToLowerInvariant())
{
case "name":
options.Name = ToString(value);
break;
case "listen":
try
{
var (host, port) = ParseHostPort(value);
if (host is not null)
options.Host = host;
if (port is not null)
options.Port = port.Value;
}
catch (Exception ex)
{
errors.Add($"Invalid cluster.listen: {ex.Message}");
}
break;
}
}
return options;
}
private static GatewayOptions ParseGateway(Dictionary<string, object?> dict, List<string> errors)
{
var options = new GatewayOptions();
foreach (var (key, value) in dict)
{
switch (key.ToLowerInvariant())
{
case "name":
options.Name = ToString(value);
break;
case "listen":
try
{
var (host, port) = ParseHostPort(value);
if (host is not null)
options.Host = host;
if (port is not null)
options.Port = port.Value;
}
catch (Exception ex)
{
errors.Add($"Invalid gateway.listen: {ex.Message}");
}
break;
}
}
return options;
}
private static LeafNodeOptions ParseLeafNode(Dictionary<string, object?> dict, List<string> errors)
{
var options = new LeafNodeOptions();
foreach (var (key, value) in dict)
{
if (key.Equals("listen", StringComparison.OrdinalIgnoreCase))
{
try
{
var (host, port) = ParseHostPort(value);
if (host is not null)
options.Host = host;
if (port is not null)
options.Port = port.Value;
}
catch (Exception ex)
{
errors.Add($"Invalid leafnode.listen: {ex.Message}");
}
}
}
return options;
}
private static JetStreamOptions ParseJetStream(Dictionary<string, object?> dict, List<string> errors)
{
var options = new JetStreamOptions();
foreach (var (key, value) in dict)
{
switch (key.ToLowerInvariant())
{
case "store_dir":
options.StoreDir = ToString(value);
break;
case "max_mem_store":
try
{
options.MaxMemoryStore = ParseByteSize(value);
}
catch (Exception ex)
{
errors.Add($"Invalid jetstream.max_mem_store: {ex.Message}");
}
break;
case "max_file_store":
try
{
options.MaxFileStore = ParseByteSize(value);
}
catch (Exception ex)
{
errors.Add($"Invalid jetstream.max_file_store: {ex.Message}");
}
break;
}
}
return options;
}
// ─── Authorization parsing ─────────────────────────────────────
private static void ParseAuthorization(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
@@ -785,6 +935,40 @@ public static class ConfigProcessor
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to long"),
};
private static long ParseByteSize(object? value)
{
if (value is long l)
return l;
if (value is int i)
return i;
if (value is double d)
return (long)d;
if (value is not string s)
throw new FormatException($"Cannot parse byte size from {value?.GetType().Name ?? "null"}");
var trimmed = s.Trim();
var match = ByteSizePattern.Match(trimmed);
if (!match.Success)
throw new FormatException($"Cannot parse byte size: '{s}'");
var amount = long.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture);
var unit = match.Groups[2].Value.ToLowerInvariant();
var multiplier = unit switch
{
"" or "b" => 1L,
"kb" => 1024L,
"mb" => 1024L * 1024L,
"gb" => 1024L * 1024L * 1024L,
"tb" => 1024L * 1024L * 1024L * 1024L,
_ => throw new FormatException($"Unknown byte-size unit: '{unit}'"),
};
checked
{
return amount * multiplier;
}
}
private static bool ToBool(object? value) => value switch
{
bool b => b,

View File

@@ -11,7 +11,8 @@ namespace NATS.Server.Configuration;
public static class ConfigReloader
{
// Non-reloadable options (match Go server — Host, Port, ServerName require restart)
private static readonly HashSet<string> NonReloadable = ["Host", "Port", "ServerName"];
private static readonly HashSet<string> NonReloadable =
["Host", "Port", "ServerName", "Cluster", "JetStream.StoreDir"];
// Logging-related options
private static readonly HashSet<string> LoggingOptions =
@@ -102,6 +103,13 @@ public static class ConfigReloader
CompareAndAdd(changes, "NoSystemAccount", oldOpts.NoSystemAccount, newOpts.NoSystemAccount);
CompareAndAdd(changes, "SystemAccount", oldOpts.SystemAccount, newOpts.SystemAccount);
// Cluster and JetStream (restart-required boundaries)
if (!ClusterEquivalent(oldOpts.Cluster, newOpts.Cluster))
changes.Add(new ConfigChange("Cluster", isNonReloadable: true));
if (JetStreamStoreDirChanged(oldOpts.JetStream, newOpts.JetStream))
changes.Add(new ConfigChange("JetStream.StoreDir", isNonReloadable: true));
return changes;
}
@@ -338,4 +346,35 @@ public static class ConfigReloader
isNonReloadable: NonReloadable.Contains(name)));
}
}
private static bool ClusterEquivalent(ClusterOptions? oldCluster, ClusterOptions? newCluster)
{
if (oldCluster is null && newCluster is null)
return true;
if (oldCluster is null || newCluster is null)
return false;
if (!string.Equals(oldCluster.Name, newCluster.Name, StringComparison.Ordinal))
return false;
if (!string.Equals(oldCluster.Host, newCluster.Host, StringComparison.Ordinal))
return false;
if (oldCluster.Port != newCluster.Port)
return false;
return oldCluster.Routes.SequenceEqual(newCluster.Routes, StringComparer.Ordinal);
}
private static bool JetStreamStoreDirChanged(JetStreamOptions? oldJetStream, JetStreamOptions? newJetStream)
{
if (oldJetStream is null && newJetStream is null)
return false;
if (oldJetStream is null || newJetStream is null)
return true;
return !string.Equals(oldJetStream.StoreDir, newJetStream.StoreDir, StringComparison.Ordinal);
}
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Configuration;
public sealed class GatewayOptions
{
public string? Name { get; set; }
public string Host { get; set; } = "0.0.0.0";
public int Port { get; set; }
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Configuration;
public sealed class JetStreamOptions
{
public string StoreDir { get; set; } = string.Empty;
public long MaxMemoryStore { get; set; }
public long MaxFileStore { get; set; }
}

View File

@@ -0,0 +1,7 @@
namespace NATS.Server.Configuration;
public sealed class LeafNodeOptions
{
public string Host { get; set; } = "0.0.0.0";
public int Port { get; set; }
}

View File

@@ -0,0 +1,11 @@
namespace NATS.Server.Gateways;
public sealed class GatewayConnection
{
public string RemoteEndpoint { get; }
public GatewayConnection(string remoteEndpoint)
{
RemoteEndpoint = remoteEndpoint;
}
}

View File

@@ -0,0 +1,32 @@
using Microsoft.Extensions.Logging;
using NATS.Server.Configuration;
namespace NATS.Server.Gateways;
public sealed class GatewayManager : IAsyncDisposable
{
private readonly GatewayOptions _options;
private readonly ServerStats _stats;
private readonly ILogger<GatewayManager> _logger;
public GatewayManager(GatewayOptions options, ServerStats stats, ILogger<GatewayManager> logger)
{
_options = options;
_stats = stats;
_logger = logger;
}
public Task StartAsync(CancellationToken ct)
{
_logger.LogDebug("Gateway manager started (name={Name}, listen={Host}:{Port})",
_options.Name, _options.Host, _options.Port);
Interlocked.Exchange(ref _stats.Gateways, 0);
return Task.CompletedTask;
}
public ValueTask DisposeAsync()
{
_logger.LogDebug("Gateway manager stopped");
return ValueTask.CompletedTask;
}
}

View File

@@ -0,0 +1,88 @@
using System.Text.Json;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Api.Handlers;
public static class ConsumerApiHandlers
{
private const string CreatePrefix = "$JS.API.CONSUMER.CREATE.";
private const string InfoPrefix = "$JS.API.CONSUMER.INFO.";
public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan<byte> payload, ConsumerManager consumerManager)
{
var parsed = ParseSubject(subject, CreatePrefix);
if (parsed == null)
return JetStreamApiResponse.NotFound(subject);
var (stream, durableName) = parsed.Value;
var config = ParseConfig(payload);
if (string.IsNullOrWhiteSpace(config.DurableName))
config.DurableName = durableName;
return consumerManager.CreateOrUpdate(stream, config);
}
public static JetStreamApiResponse HandleInfo(string subject, ConsumerManager consumerManager)
{
var parsed = ParseSubject(subject, InfoPrefix);
if (parsed == null)
return JetStreamApiResponse.NotFound(subject);
var (stream, durableName) = parsed.Value;
return consumerManager.GetInfo(stream, durableName);
}
private static (string Stream, string Durable)? ParseSubject(string subject, string prefix)
{
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
return null;
var remainder = subject[prefix.Length..];
var split = remainder.Split('.', 2, StringSplitOptions.RemoveEmptyEntries);
if (split.Length != 2)
return null;
return (split[0], split[1]);
}
private static ConsumerConfig ParseConfig(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return new ConsumerConfig();
try
{
using var doc = JsonDocument.Parse(payload.ToArray());
var root = doc.RootElement;
var config = new ConsumerConfig();
if (root.TryGetProperty("durable_name", out var durableEl))
config.DurableName = durableEl.GetString() ?? string.Empty;
if (root.TryGetProperty("filter_subject", out var filterEl))
config.FilterSubject = filterEl.GetString();
if (root.TryGetProperty("push", out var pushEl) && pushEl.ValueKind == JsonValueKind.True)
config.Push = true;
if (root.TryGetProperty("heartbeat_ms", out var hbEl) && hbEl.TryGetInt32(out var hbMs))
config.HeartbeatMs = hbMs;
if (root.TryGetProperty("ack_wait_ms", out var ackWaitEl) && ackWaitEl.TryGetInt32(out var ackWait))
config.AckWaitMs = ackWait;
if (root.TryGetProperty("ack_policy", out var ackPolicyEl))
{
var ackPolicy = ackPolicyEl.GetString();
if (string.Equals(ackPolicy, "explicit", StringComparison.OrdinalIgnoreCase))
config.AckPolicy = AckPolicy.Explicit;
}
return config;
}
catch (JsonException)
{
return new ConsumerConfig();
}
}
}

View File

@@ -0,0 +1,91 @@
using System.Text.Json;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Api.Handlers;
public static class StreamApiHandlers
{
private const string CreatePrefix = "$JS.API.STREAM.CREATE.";
private const string InfoPrefix = "$JS.API.STREAM.INFO.";
public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, CreatePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
var config = ParseConfig(payload);
if (string.IsNullOrWhiteSpace(config.Name))
config.Name = streamName;
if (config.Subjects.Count == 0)
config.Subjects.Add(streamName.ToLowerInvariant() + ".>");
return streamManager.CreateOrUpdate(config);
}
public static JetStreamApiResponse HandleInfo(string subject, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, InfoPrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
return streamManager.GetInfo(streamName);
}
private static string? ExtractTrailingToken(string subject, string prefix)
{
if (!subject.StartsWith(prefix, StringComparison.Ordinal))
return null;
var token = subject[prefix.Length..].Trim();
return token.Length == 0 ? null : token;
}
private static StreamConfig ParseConfig(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return new StreamConfig();
try
{
using var doc = JsonDocument.Parse(payload.ToArray());
var root = doc.RootElement;
var config = new StreamConfig();
if (root.TryGetProperty("name", out var nameEl))
config.Name = nameEl.GetString() ?? string.Empty;
if (root.TryGetProperty("subjects", out var subjectsEl))
{
if (subjectsEl.ValueKind == JsonValueKind.Array)
{
foreach (var item in subjectsEl.EnumerateArray())
{
var value = item.GetString();
if (!string.IsNullOrWhiteSpace(value))
config.Subjects.Add(value);
}
}
else if (subjectsEl.ValueKind == JsonValueKind.String)
{
var value = subjectsEl.GetString();
if (!string.IsNullOrWhiteSpace(value))
config.Subjects.Add(value);
}
}
if (root.TryGetProperty("max_msgs", out var maxMsgsEl) && maxMsgsEl.TryGetInt32(out var maxMsgs))
config.MaxMsgs = maxMsgs;
if (root.TryGetProperty("replicas", out var replicasEl) && replicasEl.TryGetInt32(out var replicas))
config.Replicas = replicas;
return config;
}
catch (JsonException)
{
return new StreamConfig();
}
}
}

View File

@@ -0,0 +1,7 @@
namespace NATS.Server.JetStream.Api;
public sealed class JetStreamApiError
{
public int Code { get; init; }
public string Description { get; init; } = string.Empty;
}

View File

@@ -0,0 +1,41 @@
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Api;
public sealed class JetStreamApiResponse
{
public JetStreamApiError? Error { get; init; }
public JetStreamStreamInfo? StreamInfo { get; init; }
public JetStreamConsumerInfo? ConsumerInfo { get; init; }
public static JetStreamApiResponse NotFound(string subject) => new()
{
Error = new JetStreamApiError
{
Code = 404,
Description = $"unknown api subject '{subject}'",
},
};
public static JetStreamApiResponse Ok() => new();
public static JetStreamApiResponse ErrorResponse(int code, string description) => new()
{
Error = new JetStreamApiError
{
Code = code,
Description = description,
},
};
}
public sealed class JetStreamStreamInfo
{
public required StreamConfig Config { get; init; }
public required StreamState State { get; init; }
}
public sealed class JetStreamConsumerInfo
{
public required ConsumerConfig Config { get; init; }
}

View File

@@ -0,0 +1,37 @@
using NATS.Server.JetStream.Api.Handlers;
namespace NATS.Server.JetStream.Api;
public sealed class JetStreamApiRouter
{
private readonly StreamManager _streamManager;
private readonly ConsumerManager _consumerManager;
public JetStreamApiRouter()
: this(new StreamManager(), new ConsumerManager())
{
}
public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager)
{
_streamManager = streamManager;
_consumerManager = consumerManager;
}
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
{
if (subject.StartsWith("$JS.API.STREAM.CREATE.", StringComparison.Ordinal))
return StreamApiHandlers.HandleCreate(subject, payload, _streamManager);
if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal))
return StreamApiHandlers.HandleInfo(subject, _streamManager);
if (subject.StartsWith("$JS.API.CONSUMER.CREATE.", StringComparison.Ordinal))
return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager);
if (subject.StartsWith("$JS.API.CONSUMER.INFO.", StringComparison.Ordinal))
return ConsumerApiHandlers.HandleInfo(subject, _consumerManager);
return JetStreamApiResponse.NotFound(subject);
}
}

View File

@@ -0,0 +1,17 @@
namespace NATS.Server.JetStream.Cluster;
public sealed class AssetPlacementPlanner
{
private readonly int _nodes;
public AssetPlacementPlanner(int nodes)
{
_nodes = Math.Max(nodes, 1);
}
public IReadOnlyList<int> PlanReplicas(int replicas)
{
var count = Math.Min(Math.Max(replicas, 1), _nodes);
return Enumerable.Range(1, count).ToArray();
}
}

View File

@@ -0,0 +1,36 @@
using System.Collections.Concurrent;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Cluster;
public sealed class JetStreamMetaGroup
{
private readonly int _nodes;
private readonly ConcurrentDictionary<string, byte> _streams = new(StringComparer.Ordinal);
public JetStreamMetaGroup(int nodes)
{
_nodes = nodes;
}
public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct)
{
_streams[config.Name] = 0;
return Task.CompletedTask;
}
public MetaGroupState GetState()
{
return new MetaGroupState
{
Streams = _streams.Keys.OrderBy(x => x, StringComparer.Ordinal).ToArray(),
ClusterSize = _nodes,
};
}
}
public sealed class MetaGroupState
{
public IReadOnlyList<string> Streams { get; init; } = [];
public int ClusterSize { get; init; }
}

View File

@@ -0,0 +1,65 @@
using NATS.Server.Raft;
namespace NATS.Server.JetStream.Cluster;
public sealed class StreamReplicaGroup
{
private readonly List<RaftNode> _nodes;
public string StreamName { get; }
public IReadOnlyList<RaftNode> Nodes => _nodes;
public RaftNode Leader { get; private set; }
public StreamReplicaGroup(string streamName, int replicas)
{
StreamName = streamName;
var nodeCount = Math.Max(replicas, 1);
_nodes = Enumerable.Range(1, nodeCount)
.Select(i => new RaftNode($"{streamName.ToLowerInvariant()}-r{i}"))
.ToList();
foreach (var node in _nodes)
node.ConfigureCluster(_nodes);
Leader = ElectLeader(_nodes[0]);
}
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
{
if (!Leader.IsLeader)
Leader = ElectLeader(SelectNextCandidate(Leader));
return await Leader.ProposeAsync(command, ct);
}
public Task StepDownAsync(CancellationToken ct)
{
_ = ct;
var previous = Leader;
previous.RequestStepDown();
Leader = ElectLeader(SelectNextCandidate(previous));
return Task.CompletedTask;
}
private RaftNode SelectNextCandidate(RaftNode currentLeader)
{
if (_nodes.Count == 1)
return _nodes[0];
var index = _nodes.FindIndex(n => n.Id == currentLeader.Id);
if (index < 0)
return _nodes[0];
return _nodes[(index + 1) % _nodes.Count];
}
private RaftNode ElectLeader(RaftNode candidate)
{
candidate.StartElection(_nodes.Count);
foreach (var voter in _nodes.Where(n => n.Id != candidate.Id))
candidate.ReceiveVote(voter.GrantVote(candidate.Term), _nodes.Count);
return candidate;
}
}

View File

@@ -0,0 +1,97 @@
using System.Collections.Concurrent;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream;
public sealed class ConsumerManager
{
private readonly JetStreamMetaGroup? _metaGroup;
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
private readonly PullConsumerEngine _pullConsumerEngine = new();
private readonly PushConsumerEngine _pushConsumerEngine = new();
public ConsumerManager(JetStreamMetaGroup? metaGroup = null)
{
_metaGroup = metaGroup;
}
public int ConsumerCount => _consumers.Count;
public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config)
{
if (string.IsNullOrWhiteSpace(config.DurableName))
return JetStreamApiResponse.ErrorResponse(400, "durable name required");
var key = (stream, config.DurableName);
var handle = _consumers.AddOrUpdate(key,
_ => new ConsumerHandle(stream, config),
(_, existing) => existing with { Config = config });
return new JetStreamApiResponse
{
ConsumerInfo = new JetStreamConsumerInfo
{
Config = handle.Config,
},
};
}
public JetStreamApiResponse GetInfo(string stream, string durableName)
{
if (_consumers.TryGetValue((stream, durableName), out var handle))
{
return new JetStreamApiResponse
{
ConsumerInfo = new JetStreamConsumerInfo
{
Config = handle.Config,
},
};
}
return JetStreamApiResponse.NotFound($"$JS.API.CONSUMER.INFO.{stream}.{durableName}");
}
public bool TryGet(string stream, string durableName, out ConsumerHandle handle)
=> _consumers.TryGetValue((stream, durableName), out handle!);
public async ValueTask<PullFetchBatch> FetchAsync(string stream, string durableName, int batch, StreamManager streamManager, CancellationToken ct)
{
if (!_consumers.TryGetValue((stream, durableName), out var consumer))
return new PullFetchBatch([]);
if (!streamManager.TryGet(stream, out var streamHandle))
return new PullFetchBatch([]);
return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, batch, ct);
}
public void OnPublished(string stream, StoredMessage message)
{
foreach (var handle in _consumers.Values.Where(c => c.Stream == stream && c.Config.Push))
_pushConsumerEngine.Enqueue(handle, message);
}
public PushFrame? ReadPushFrame(string stream, string durableName)
{
if (!_consumers.TryGetValue((stream, durableName), out var consumer))
return null;
if (consumer.PushFrames.Count == 0)
return null;
return consumer.PushFrames.Dequeue();
}
}
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
{
public ulong NextSequence { get; set; } = 1;
public Queue<StoredMessage> Pending { get; } = new();
public Queue<PushFrame> PushFrames { get; } = new();
public AckProcessor AckProcessor { get; } = new();
}

View File

@@ -0,0 +1,24 @@
namespace NATS.Server.JetStream.Consumers;
public sealed class AckProcessor
{
private readonly Dictionary<ulong, DateTime> _pending = new();
public void Register(ulong sequence, int ackWaitMs)
{
_pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1));
}
public ulong? NextExpired()
{
foreach (var (seq, deadline) in _pending)
{
if (DateTime.UtcNow >= deadline)
return seq;
}
return null;
}
public bool HasPending => _pending.Count > 0;
}

View File

@@ -0,0 +1,63 @@
using NATS.Server.JetStream.Storage;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Consumers;
public sealed class PullConsumerEngine
{
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct)
{
var messages = new List<StoredMessage>(batch);
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
{
var expired = consumer.AckProcessor.NextExpired();
if (expired is { } expiredSequence)
{
var redelivery = await stream.Store.LoadAsync(expiredSequence, ct);
if (redelivery != null)
{
messages.Add(new StoredMessage
{
Sequence = redelivery.Sequence,
Subject = redelivery.Subject,
Payload = redelivery.Payload,
Redelivered = true,
});
}
return new PullFetchBatch(messages);
}
if (consumer.AckProcessor.HasPending)
return new PullFetchBatch(messages);
}
var sequence = consumer.NextSequence;
for (var i = 0; i < batch; i++)
{
var message = await stream.Store.LoadAsync(sequence, ct);
if (message == null)
break;
messages.Add(message);
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
sequence++;
}
consumer.NextSequence = sequence;
return new PullFetchBatch(messages);
}
}
public sealed class PullFetchBatch
{
public IReadOnlyList<StoredMessage> Messages { get; }
public PullFetchBatch(IReadOnlyList<StoredMessage> messages)
{
Messages = messages;
}
}

View File

@@ -0,0 +1,34 @@
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Consumers;
public sealed class PushConsumerEngine
{
public void Enqueue(ConsumerHandle consumer, StoredMessage message)
{
consumer.PushFrames.Enqueue(new PushFrame
{
IsData = true,
Message = message,
});
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
if (consumer.Config.HeartbeatMs > 0)
{
consumer.PushFrames.Enqueue(new PushFrame
{
IsHeartbeat = true,
});
}
}
}
public sealed class PushFrame
{
public bool IsData { get; init; }
public bool IsHeartbeat { get; init; }
public StoredMessage? Message { get; init; }
}

View File

@@ -0,0 +1,26 @@
using NATS.Server.Configuration;
namespace NATS.Server.JetStream;
public sealed class JetStreamService : IAsyncDisposable
{
private readonly JetStreamOptions _options;
public bool IsRunning { get; private set; }
public JetStreamService(JetStreamOptions options)
{
_options = options;
}
public Task StartAsync(CancellationToken ct)
{
IsRunning = true;
return Task.CompletedTask;
}
public ValueTask DisposeAsync()
{
IsRunning = false;
return ValueTask.CompletedTask;
}
}

View File

@@ -0,0 +1,16 @@
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.MirrorSource;
public sealed class MirrorCoordinator
{
private readonly IStreamStore _targetStore;
public MirrorCoordinator(IStreamStore targetStore)
{
_targetStore = targetStore;
}
public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
=> _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
}

View File

@@ -0,0 +1,16 @@
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.MirrorSource;
public sealed class SourceCoordinator
{
private readonly IStreamStore _targetStore;
public SourceCoordinator(IStreamStore targetStore)
{
_targetStore = targetStore;
}
public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
=> _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
}

View File

@@ -0,0 +1,18 @@
namespace NATS.Server.JetStream.Models;
public sealed class ConsumerConfig
{
public string DurableName { get; set; } = string.Empty;
public string? FilterSubject { get; set; }
public AckPolicy AckPolicy { get; set; } = AckPolicy.None;
public int AckWaitMs { get; set; } = 30_000;
public int MaxDeliver { get; set; } = 1;
public bool Push { get; set; }
public int HeartbeatMs { get; set; }
}
public enum AckPolicy
{
None,
Explicit,
}

View File

@@ -0,0 +1,11 @@
namespace NATS.Server.JetStream.Models;
public sealed class StreamConfig
{
public string Name { get; set; } = string.Empty;
public List<string> Subjects { get; set; } = [];
public int MaxMsgs { get; set; }
public int Replicas { get; set; } = 1;
public string? Mirror { get; set; }
public string? Source { get; set; }
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.JetStream.Models;
public sealed class StreamState
{
public ulong Messages { get; set; }
public ulong FirstSeq { get; set; }
public ulong LastSeq { get; set; }
}

View File

@@ -0,0 +1,39 @@
namespace NATS.Server.JetStream.Publish;
public sealed class JetStreamPublisher
{
private readonly StreamManager _streamManager;
private readonly PublishPreconditions _preconditions = new();
public JetStreamPublisher(StreamManager streamManager)
{
_streamManager = streamManager;
}
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
=> TryCapture(subject, payload, null, out ack);
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, string? msgId, out PubAck ack)
{
if (_preconditions.IsDuplicate(msgId, out var existingSequence))
{
ack = new PubAck
{
Seq = existingSequence,
ErrorCode = 10071,
};
return true;
}
var captured = _streamManager.Capture(subject, payload);
if (captured == null)
{
ack = new PubAck();
return false;
}
ack = captured;
_preconditions.Record(msgId, ack.Seq);
return true;
}
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.JetStream.Publish;
public sealed class PubAck
{
public string Stream { get; init; } = string.Empty;
public ulong Seq { get; init; }
public int? ErrorCode { get; init; }
}

View File

@@ -0,0 +1,25 @@
using System.Collections.Concurrent;
namespace NATS.Server.JetStream.Publish;
public sealed class PublishPreconditions
{
private readonly ConcurrentDictionary<string, ulong> _dedupe = new(StringComparer.Ordinal);
public bool IsDuplicate(string? msgId, out ulong existingSequence)
{
existingSequence = 0;
if (string.IsNullOrEmpty(msgId))
return false;
return _dedupe.TryGetValue(msgId, out existingSequence);
}
public void Record(string? msgId, ulong sequence)
{
if (string.IsNullOrEmpty(msgId))
return;
_dedupe[msgId] = sequence;
}
}

View File

@@ -0,0 +1,127 @@
using System.Text.Json;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Storage;
public sealed class FileStore : IStreamStore, IAsyncDisposable
{
private readonly string _dataFilePath;
private readonly Dictionary<ulong, StoredMessage> _messages = new();
private ulong _last;
public FileStore(FileStoreOptions options)
{
Directory.CreateDirectory(options.Directory);
_dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
LoadExisting();
}
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
_last++;
var stored = new StoredMessage
{
Sequence = _last,
Subject = subject,
Payload = payload.ToArray(),
};
_messages[_last] = stored;
var line = JsonSerializer.Serialize(new FileRecord
{
Sequence = stored.Sequence,
Subject = stored.Subject,
PayloadBase64 = Convert.ToBase64String(stored.Payload.ToArray()),
});
await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct);
return _last;
}
public ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct)
{
_messages.TryGetValue(sequence, out var msg);
return ValueTask.FromResult(msg);
}
public ValueTask PurgeAsync(CancellationToken ct)
{
_messages.Clear();
_last = 0;
if (File.Exists(_dataFilePath))
File.Delete(_dataFilePath);
return ValueTask.CompletedTask;
}
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
{
return ValueTask.FromResult(new StreamState
{
Messages = (ulong)_messages.Count,
FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(),
LastSeq = _last,
});
}
public void TrimToMaxMessages(ulong maxMessages)
{
while ((ulong)_messages.Count > maxMessages)
{
var first = _messages.Keys.Min();
_messages.Remove(first);
}
RewriteDataFile();
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
private void LoadExisting()
{
if (!File.Exists(_dataFilePath))
return;
foreach (var line in File.ReadLines(_dataFilePath))
{
if (string.IsNullOrWhiteSpace(line))
continue;
var record = JsonSerializer.Deserialize<FileRecord>(line);
if (record == null)
continue;
var message = new StoredMessage
{
Sequence = record.Sequence,
Subject = record.Subject ?? string.Empty,
Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty),
};
_messages[message.Sequence] = message;
if (message.Sequence > _last)
_last = message.Sequence;
}
}
private void RewriteDataFile()
{
var lines = new List<string>(_messages.Count);
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
{
lines.Add(JsonSerializer.Serialize(new FileRecord
{
Sequence = message.Sequence,
Subject = message.Subject,
PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()),
}));
}
File.WriteAllLines(_dataFilePath, lines);
}
private sealed class FileRecord
{
public ulong Sequence { get; init; }
public string? Subject { get; init; }
public string? PayloadBase64 { get; init; }
}
}

View File

@@ -0,0 +1,6 @@
namespace NATS.Server.JetStream.Storage;
public sealed class FileStoreBlock
{
public required string Path { get; init; }
}

View File

@@ -0,0 +1,6 @@
namespace NATS.Server.JetStream.Storage;
public sealed class FileStoreOptions
{
public string Directory { get; set; } = string.Empty;
}

View File

@@ -0,0 +1,11 @@
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Storage;
public interface IStreamStore
{
ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct);
ValueTask PurgeAsync(CancellationToken ct);
ValueTask<StreamState> GetStateAsync(CancellationToken ct);
}

View File

@@ -0,0 +1,69 @@
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Storage;
public sealed class MemStore : IStreamStore
{
private readonly object _gate = new();
private ulong _last;
private readonly Dictionary<ulong, StoredMessage> _messages = new();
public ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
lock (_gate)
{
_last++;
_messages[_last] = new StoredMessage
{
Sequence = _last,
Subject = subject,
Payload = payload,
};
return ValueTask.FromResult(_last);
}
}
public ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct)
{
lock (_gate)
{
_messages.TryGetValue(sequence, out var msg);
return ValueTask.FromResult(msg);
}
}
public ValueTask PurgeAsync(CancellationToken ct)
{
lock (_gate)
{
_messages.Clear();
_last = 0;
return ValueTask.CompletedTask;
}
}
public ValueTask<StreamState> GetStateAsync(CancellationToken ct)
{
lock (_gate)
{
return ValueTask.FromResult(new StreamState
{
Messages = (ulong)_messages.Count,
FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(),
LastSeq = _last,
});
}
}
public void TrimToMaxMessages(ulong maxMessages)
{
lock (_gate)
{
while ((ulong)_messages.Count > maxMessages)
{
var first = _messages.Keys.Min();
_messages.Remove(first);
}
}
}
}

View File

@@ -0,0 +1,9 @@
namespace NATS.Server.JetStream.Storage;
public sealed class StoredMessage
{
public ulong Sequence { get; init; }
public string Subject { get; init; } = string.Empty;
public ReadOnlyMemory<byte> Payload { get; init; }
public bool Redelivered { get; init; }
}

View File

@@ -0,0 +1,202 @@
using System.Collections.Concurrent;
using NATS.Server.Auth;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Storage;
using NATS.Server.Subscriptions;
namespace NATS.Server.JetStream;
public sealed class StreamManager
{
private readonly Account? _account;
private readonly JetStreamMetaGroup? _metaGroup;
private readonly ConcurrentDictionary<string, StreamHandle> _streams =
new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, StreamReplicaGroup> _replicaGroups =
new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin =
new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin =
new(StringComparer.Ordinal);
public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null)
{
_metaGroup = metaGroup;
_account = account;
}
public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();
public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
{
if (string.IsNullOrWhiteSpace(config.Name))
return JetStreamApiResponse.ErrorResponse(400, "stream name required");
var normalized = NormalizeConfig(config);
var isCreate = !_streams.ContainsKey(normalized.Name);
if (isCreate && _account is not null && !_account.TryReserveStream())
return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");
var handle = _streams.AddOrUpdate(
normalized.Name,
_ => new StreamHandle(normalized, new MemStore()),
(_, existing) => existing with { Config = normalized });
_replicaGroups.AddOrUpdate(
normalized.Name,
_ => new StreamReplicaGroup(normalized.Name, normalized.Replicas),
(_, existing) => existing.Nodes.Count == Math.Max(normalized.Replicas, 1)
? existing
: new StreamReplicaGroup(normalized.Name, normalized.Replicas));
RebuildReplicationCoordinators();
_metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult();
return BuildStreamInfoResponse(handle);
}
public JetStreamApiResponse GetInfo(string name)
{
if (_streams.TryGetValue(name, out var stream))
return BuildStreamInfoResponse(stream);
return JetStreamApiResponse.NotFound($"$JS.API.STREAM.INFO.{name}");
}
public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);
public ValueTask<StreamState> GetStateAsync(string name, CancellationToken ct)
{
if (_streams.TryGetValue(name, out var stream))
return stream.Store.GetStateAsync(ct);
return ValueTask.FromResult(new StreamState());
}
public StreamHandle? FindBySubject(string subject)
{
foreach (var stream in _streams.Values)
{
if (stream.Config.Subjects.Any(p => SubjectMatch.MatchLiteral(subject, p)))
return stream;
}
return null;
}
public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
{
var stream = FindBySubject(subject);
if (stream == null)
return null;
if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
_ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
EnforceLimits(stream);
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
if (stored != null)
ReplicateIfConfigured(stream.Config.Name, stored);
return new PubAck
{
Stream = stream.Config.Name,
Seq = seq,
};
}
public Task StepDownStreamLeaderAsync(string stream, CancellationToken ct)
{
if (_replicaGroups.TryGetValue(stream, out var replicaGroup))
return replicaGroup.StepDownAsync(ct);
return Task.CompletedTask;
}
private static StreamConfig NormalizeConfig(StreamConfig config)
{
var copy = new StreamConfig
{
Name = config.Name,
Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects],
MaxMsgs = config.MaxMsgs,
Replicas = config.Replicas,
Mirror = config.Mirror,
Source = config.Source,
};
return copy;
}
private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
{
var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
return new JetStreamApiResponse
{
StreamInfo = new JetStreamStreamInfo
{
Config = handle.Config,
State = state,
},
};
}
private static void EnforceLimits(StreamHandle stream)
{
if (stream.Config.MaxMsgs <= 0)
return;
var maxMessages = (ulong)stream.Config.MaxMsgs;
if (stream.Store is MemStore memStore)
{
memStore.TrimToMaxMessages(maxMessages);
return;
}
if (stream.Store is FileStore fileStore)
fileStore.TrimToMaxMessages(maxMessages);
}
private void RebuildReplicationCoordinators()
{
_mirrorsByOrigin.Clear();
_sourcesByOrigin.Clear();
foreach (var stream in _streams.Values)
{
if (!string.IsNullOrWhiteSpace(stream.Config.Mirror)
&& _streams.TryGetValue(stream.Config.Mirror, out _))
{
var list = _mirrorsByOrigin.GetOrAdd(stream.Config.Mirror, _ => []);
list.Add(new MirrorCoordinator(stream.Store));
}
if (!string.IsNullOrWhiteSpace(stream.Config.Source)
&& _streams.TryGetValue(stream.Config.Source, out _))
{
var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []);
list.Add(new SourceCoordinator(stream.Store));
}
}
}
private void ReplicateIfConfigured(string originStream, StoredMessage stored)
{
if (_mirrorsByOrigin.TryGetValue(originStream, out var mirrors))
{
foreach (var mirror in mirrors)
mirror.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
}
if (_sourcesByOrigin.TryGetValue(originStream, out var sources))
{
foreach (var source in sources)
source.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
}
}
}
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store);

View File

@@ -0,0 +1,26 @@
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Validation;
public static class JetStreamConfigValidator
{
public static ValidationResult Validate(StreamConfig config)
=> string.IsNullOrWhiteSpace(config.Name) || config.Subjects.Count == 0
? ValidationResult.Invalid("name/subjects required")
: ValidationResult.Valid();
}
public sealed class ValidationResult
{
public bool IsValid { get; }
public string Message { get; }
private ValidationResult(bool isValid, string message)
{
IsValid = isValid;
Message = message;
}
public static ValidationResult Valid() => new(true, string.Empty);
public static ValidationResult Invalid(string message) => new(false, message);
}

View File

@@ -0,0 +1,11 @@
namespace NATS.Server.LeafNodes;
public sealed class LeafConnection
{
public string RemoteEndpoint { get; }
public LeafConnection(string remoteEndpoint)
{
RemoteEndpoint = remoteEndpoint;
}
}

View File

@@ -0,0 +1,31 @@
using Microsoft.Extensions.Logging;
using NATS.Server.Configuration;
namespace NATS.Server.LeafNodes;
public sealed class LeafNodeManager : IAsyncDisposable
{
private readonly LeafNodeOptions _options;
private readonly ServerStats _stats;
private readonly ILogger<LeafNodeManager> _logger;
public LeafNodeManager(LeafNodeOptions options, ServerStats stats, ILogger<LeafNodeManager> logger)
{
_options = options;
_stats = stats;
_logger = logger;
}
public Task StartAsync(CancellationToken ct)
{
_logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port);
Interlocked.Exchange(ref _stats.Leafs, 0);
return Task.CompletedTask;
}
public ValueTask DisposeAsync()
{
_logger.LogDebug("Leaf manager stopped");
return ValueTask.CompletedTask;
}
}

View File

@@ -0,0 +1,62 @@
using System.Text.Json.Serialization;
namespace NATS.Server.Monitoring;
public sealed class JszHandler
{
private readonly NatsServer _server;
private readonly NatsOptions _options;
public JszHandler(NatsServer server, NatsOptions options)
{
_server = server;
_options = options;
}
public JszResponse Build()
{
return new JszResponse
{
ServerId = _server.ServerId,
Now = DateTime.UtcNow,
Enabled = _server.Stats.JetStreamEnabled,
Memory = 0,
Storage = 0,
Streams = _server.JetStreamStreams,
Consumers = _server.JetStreamConsumers,
Config = new JetStreamConfig
{
MaxMemory = _options.JetStream?.MaxMemoryStore ?? 0,
MaxStorage = _options.JetStream?.MaxFileStore ?? 0,
StoreDir = _options.JetStream?.StoreDir ?? string.Empty,
},
};
}
}
public sealed class JszResponse
{
[JsonPropertyName("server_id")]
public string ServerId { get; set; } = string.Empty;
[JsonPropertyName("now")]
public DateTime Now { get; set; }
[JsonPropertyName("enabled")]
public bool Enabled { get; set; }
[JsonPropertyName("memory")]
public ulong Memory { get; set; }
[JsonPropertyName("storage")]
public ulong Storage { get; set; }
[JsonPropertyName("streams")]
public int Streams { get; set; }
[JsonPropertyName("consumers")]
public int Consumers { get; set; }
[JsonPropertyName("config")]
public JetStreamConfig Config { get; set; } = new();
}

View File

@@ -16,6 +16,7 @@ public sealed class MonitorServer : IAsyncDisposable
private readonly VarzHandler _varzHandler;
private readonly ConnzHandler _connzHandler;
private readonly SubszHandler _subszHandler;
private readonly JszHandler _jszHandler;
public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory)
{
@@ -31,6 +32,7 @@ public sealed class MonitorServer : IAsyncDisposable
_varzHandler = new VarzHandler(server, options);
_connzHandler = new ConnzHandler(server);
_subszHandler = new SubszHandler(server);
_jszHandler = new JszHandler(server, options);
_app.MapGet(basePath + "/", () =>
{
@@ -100,7 +102,7 @@ public sealed class MonitorServer : IAsyncDisposable
_app.MapGet(basePath + "/jsz", () =>
{
stats.HttpReqStats.AddOrUpdate("/jsz", 1, (_, v) => v + 1);
return Results.Ok(new { });
return Results.Ok(_jszHandler.Build());
});
}

View File

@@ -443,6 +443,12 @@ public sealed class JetStreamStats
[JsonPropertyName("ha_assets")]
public int HaAssets { get; set; }
[JsonPropertyName("streams")]
public int Streams { get; set; }
[JsonPropertyName("consumers")]
public int Consumers { get; set; }
[JsonPropertyName("api")]
public JetStreamApiStats Api { get; set; } = new();
}

View File

@@ -122,6 +122,23 @@ public sealed class VarzHandler : IDisposable
ConfigLoadTime = _server.StartTime,
HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value),
Mqtt = BuildMqttVarz(),
JetStream = new JetStreamVarz
{
Config = new JetStreamConfig
{
MaxMemory = _options.JetStream?.MaxMemoryStore ?? 0,
MaxStorage = _options.JetStream?.MaxFileStore ?? 0,
StoreDir = _options.JetStream?.StoreDir ?? string.Empty,
},
Stats = new JetStreamStats
{
Accounts = _options.JetStream is null ? 0 : 1,
HaAssets = _server.JetStreamStreams,
Streams = _server.JetStreamStreams,
Consumers = _server.JetStreamConsumers,
},
},
Mqtt = BuildMqttVarz(),
};
}
finally

View File

@@ -9,6 +9,7 @@ using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Server.Auth;
using NATS.Server.Auth.Jwt;
using NATS.Server.JetStream.Publish;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
using NATS.Server.Tls;
@@ -32,6 +33,7 @@ public interface ISubListAccess
public sealed class NatsClient : INatsClient, IDisposable
{
private static readonly ClientCommandMatrix CommandMatrix = new();
private readonly Socket _socket;
private readonly Stream _stream;
private readonly NatsOptions _options;
@@ -49,7 +51,7 @@ public sealed class NatsClient : INatsClient, IDisposable
private readonly ServerStats _serverStats;
public ulong Id { get; }
public ClientKind Kind => ClientKind.Client;
public ClientKind Kind { get; }
public ClientOptions? ClientOpts { get; private set; }
public IMessageRouter? Router { get; set; }
public Account? Account { get; private set; }
@@ -105,11 +107,14 @@ public sealed class NatsClient : INatsClient, IDisposable
public bool InfoAlreadySent { get; set; }
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
public PubAck? LastJetStreamPubAck { get; private set; }
public NatsClient(ulong id, Stream stream, Socket socket, NatsOptions options, ServerInfo serverInfo,
AuthService authService, byte[]? nonce, ILogger logger, ServerStats serverStats)
AuthService authService, byte[]? nonce, ILogger logger, ServerStats serverStats,
ClientKind kind = ClientKind.Client)
{
Id = id;
Kind = kind;
_socket = socket;
_stream = stream;
_options = options;
@@ -315,6 +320,13 @@ public sealed class NatsClient : INatsClient, IDisposable
{
Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
if (!CommandMatrix.IsAllowed(Kind, cmd.Operation))
{
_logger.LogDebug("Command {Command} is not allowed for client kind {ClientKind}", cmd.Operation, Kind);
await SendErrAndCloseAsync("Parser Error");
return;
}
// If auth is required and CONNECT hasn't been received yet,
// only allow CONNECT and PING commands
if (_authService.IsAuthRequired && !ConnectReceived)
@@ -411,6 +423,10 @@ public sealed class NatsClient : INatsClient, IDisposable
{
var accountName = authResult.AccountName ?? Account.GlobalAccountName;
Account = server.GetOrCreateAccount(accountName);
if (authResult.MaxJetStreamStreams > 0)
Account.MaxJetStreamStreams = authResult.MaxJetStreamStreams;
if (!string.IsNullOrWhiteSpace(authResult.JetStreamTier))
Account.JetStreamTier = authResult.JetStreamTier;
if (!Account.AddClient(Id))
{
Account = null;
@@ -524,6 +540,8 @@ public sealed class NatsClient : INatsClient, IDisposable
_logger.LogDebug("SUB {Subject} {Sid} from client {ClientId}", cmd.Subject, cmd.Sid, Id);
Account?.SubList.Insert(sub);
if (Router is NatsServer server)
server.OnLocalSubscription(sub.Subject, sub.Queue);
}
private void ProcessUnsub(ParsedCommand cmd)
@@ -588,6 +606,11 @@ public sealed class NatsClient : INatsClient, IDisposable
Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
}
public void RecordJetStreamPubAck(PubAck ack)
{
LastJetStreamPubAck = ack;
}
private void SendInfo()
{
// Use the cached INFO bytes from the server when there is no per-connection

View File

@@ -1,5 +1,6 @@
using System.Security.Authentication;
using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.Tls;
namespace NATS.Server;
@@ -118,6 +119,12 @@ public sealed class NatsOptions
// MQTT configuration (parsed from config, no listener yet)
public MqttOptions? Mqtt { get; set; }
// Cluster and JetStream settings
public ClusterOptions? Cluster { get; set; }
public GatewayOptions? Gateway { get; set; }
public LeafNodeOptions? LeafNode { get; set; }
public JetStreamOptions? JetStream { get; set; }
public bool HasTls => TlsCert != null && TlsKey != null;
// WebSocket

View File

@@ -10,9 +10,15 @@ using NATS.NKeys;
using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.Events;
using NATS.Server.Gateways;
using NATS.Server.Imports;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Publish;
using NATS.Server.LeafNodes;
using NATS.Server.Monitoring;
using NATS.Server.Protocol;
using NATS.Server.Routes;
using NATS.Server.Subscriptions;
using NATS.Server.Tls;
using NATS.Server.WebSocket;
@@ -42,6 +48,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly SslServerAuthenticationOptions? _sslOptions;
private readonly TlsRateLimiter? _tlsRateLimiter;
private readonly SubjectTransform[] _subjectTransforms;
private readonly RouteManager? _routeManager;
private readonly GatewayManager? _gatewayManager;
private readonly LeafNodeManager? _leafNodeManager;
private readonly JetStreamService? _jetStreamService;
private readonly JetStreamApiRouter? _jetStreamApiRouter;
private readonly StreamManager? _jetStreamStreamManager;
private readonly ConsumerManager? _jetStreamConsumerManager;
private readonly JetStreamPublisher? _jetStreamPublisher;
private Socket? _listener;
private Socket? _wsListener;
private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -79,12 +93,37 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public InternalEventSystem? EventSystem => _eventSystem;
public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0;
public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0;
public string? ClusterListen => _routeManager?.ListenEndpoint;
public JetStreamApiRouter? JetStreamApiRouter => _jetStreamApiRouter;
public int JetStreamStreams => _jetStreamStreamManager?.StreamNames.Count ?? 0;
public int JetStreamConsumers => _jetStreamConsumerManager?.ConsumerCount ?? 0;
public Action? ReOpenLogFile { get; set; }
public IEnumerable<NatsClient> GetClients() => _clients.Values;
public IEnumerable<ClosedClient> GetClosedClients() => _closedClients;
public IEnumerable<Auth.Account> GetAccounts() => _accounts.Values;
public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject);
public bool TryCaptureJetStreamPublish(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
{
if (_jetStreamPublisher != null && _jetStreamPublisher.TryCapture(subject, payload, out ack))
{
if (ack.ErrorCode == null
&& _jetStreamConsumerManager != null
&& _jetStreamStreamManager != null
&& _jetStreamStreamManager.TryGet(ack.Stream, out var streamHandle))
{
var stored = streamHandle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
if (stored != null)
_jetStreamConsumerManager.OnPublished(ack.Stream, stored);
}
return true;
}
ack = new PubAck();
return false;
}
public Task WaitForReadyAsync() => _listeningStarted.Task;
@@ -118,6 +157,15 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
// Close listeners to stop accept loops
_listener?.Close();
_wsListener?.Close();
if (_routeManager != null)
await _routeManager.DisposeAsync();
if (_gatewayManager != null)
await _gatewayManager.DisposeAsync();
if (_leafNodeManager != null)
await _leafNodeManager.DisposeAsync();
if (_jetStreamService != null)
await _jetStreamService.DisposeAsync();
_stats.JetStreamEnabled = false;
// Wait for accept loops to exit
await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
@@ -314,6 +362,33 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
AuthRequired = _authService.IsAuthRequired,
};
if (options.Cluster != null)
{
_routeManager = new RouteManager(options.Cluster, _stats, _serverInfo.ServerId, ApplyRemoteSubscription,
_loggerFactory.CreateLogger<RouteManager>());
}
if (options.Gateway != null)
{
_gatewayManager = new GatewayManager(options.Gateway, _stats,
_loggerFactory.CreateLogger<GatewayManager>());
}
if (options.LeafNode != null)
{
_leafNodeManager = new LeafNodeManager(options.LeafNode, _stats,
_loggerFactory.CreateLogger<LeafNodeManager>());
}
if (options.JetStream != null)
{
_jetStreamStreamManager = new StreamManager();
_jetStreamConsumerManager = new ConsumerManager();
_jetStreamService = new JetStreamService(options.JetStream);
_jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager, _jetStreamConsumerManager);
_jetStreamPublisher = new JetStreamPublisher(_jetStreamStreamManager);
}
if (options.HasTls)
{
_sslOptions = TlsHelper.BuildServerAuthOptions(options);
@@ -441,6 +516,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_ = RunWebSocketAcceptLoopAsync(linked.Token);
}
if (_routeManager != null)
await _routeManager.StartAsync(linked.Token);
if (_gatewayManager != null)
await _gatewayManager.StartAsync(linked.Token);
if (_leafNodeManager != null)
await _leafNodeManager.StartAsync(linked.Token);
if (_jetStreamService != null)
{
await _jetStreamService.StartAsync(linked.Token);
_stats.JetStreamEnabled = true;
}
_listeningStarted.TrySetResult();
_eventSystem?.Start(this);
@@ -705,9 +792,22 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
public void OnLocalSubscription(string subject, string? queue)
{
_routeManager?.PropagateLocalSubscription(subject, queue);
}
private void ApplyRemoteSubscription(RemoteSubscription sub)
{
_globalAccount.SubList.ApplyRemoteSub(sub);
}
public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
ReadOnlyMemory<byte> payload, NatsClient sender)
{
if (TryCaptureJetStreamPublish(subject, payload, out var pubAck))
sender.RecordJetStreamPubAck(pubAck);
// Apply subject transforms
if (_subjectTransforms.Length > 0)
{
@@ -1310,10 +1410,22 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
/// the changes, and applies reloadable settings. CLI overrides are preserved.
/// </summary>
public void ReloadConfig()
{
ReloadConfigCore(throwOnError: false);
}
public void ReloadConfigOrThrow()
{
ReloadConfigCore(throwOnError: true);
}
private void ReloadConfigCore(bool throwOnError)
{
if (_options.ConfigFile == null)
{
_logger.LogWarning("No config file specified, cannot reload");
if (throwOnError)
throw new InvalidOperationException("No config file specified.");
return;
}
@@ -1339,6 +1451,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
foreach (var err in errors)
_logger.LogError("Config reload error: {Error}", err);
if (throwOnError)
throw new InvalidOperationException(string.Join("; ", errors));
return;
}
@@ -1350,6 +1464,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
catch (Exception ex)
{
_logger.LogError(ex, "Failed to reload config file: {ConfigFile}", _options.ConfigFile);
if (throwOnError)
throw;
}
}
@@ -1454,6 +1570,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_tlsRateLimiter?.Dispose();
_listener?.Dispose();
_wsListener?.Dispose();
_routeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_gatewayManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_leafNodeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_jetStreamService?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_stats.JetStreamEnabled = false;
foreach (var client in _clients.Values)
client.Dispose();
foreach (var account in _accounts.Values)

View File

@@ -0,0 +1,17 @@
namespace NATS.Server.Protocol;
public sealed class ClientCommandMatrix
{
public bool IsAllowed(ClientKind kind, string? op)
{
if (string.IsNullOrWhiteSpace(op))
return true;
return (kind, op.ToUpperInvariant()) switch
{
(ClientKind.Router, "RS+") => true,
(_, "RS+") => false,
_ => true,
};
}
}

View File

@@ -0,0 +1,12 @@
namespace NATS.Server.Protocol;
public enum ClientKind
{
Client,
Router,
Gateway,
Leaf,
System,
JetStream,
Account,
}

View File

@@ -21,6 +21,7 @@ public enum CommandType
public readonly struct ParsedCommand
{
public CommandType Type { get; init; }
public string? Operation { get; init; }
public string? Subject { get; init; }
public string? ReplyTo { get; init; }
public string? Queue { get; init; }
@@ -29,7 +30,8 @@ public readonly struct ParsedCommand
public int HeaderSize { get; init; }
public ReadOnlyMemory<byte> Payload { get; init; }
public static ParsedCommand Simple(CommandType type) => new() { Type = type, MaxMessages = -1 };
public static ParsedCommand Simple(CommandType type, string operation) =>
new() { Type = type, Operation = operation, MaxMessages = -1 };
}
public sealed class NatsParser
@@ -46,6 +48,7 @@ public sealed class NatsParser
private string? _pendingReplyTo;
private int _pendingHeaderSize;
private CommandType _pendingType;
private string _pendingOperation = string.Empty;
public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize, ILogger? logger = null)
{
@@ -103,7 +106,7 @@ public sealed class NatsParser
case (byte)'p':
if (b1 == (byte)'i') // PING
{
command = ParsedCommand.Simple(CommandType.Ping);
command = ParsedCommand.Simple(CommandType.Ping, "PING");
buffer = buffer.Slice(reader.Position);
TraceInOp("PING");
return true;
@@ -111,7 +114,7 @@ public sealed class NatsParser
if (b1 == (byte)'o') // PONG
{
command = ParsedCommand.Simple(CommandType.Pong);
command = ParsedCommand.Simple(CommandType.Pong, "PONG");
buffer = buffer.Slice(reader.Position);
TraceInOp("PONG");
return true;
@@ -177,13 +180,13 @@ public sealed class NatsParser
break;
case (byte)'+': // +OK
command = ParsedCommand.Simple(CommandType.Ok);
command = ParsedCommand.Simple(CommandType.Ok, "+OK");
buffer = buffer.Slice(reader.Position);
TraceInOp("+OK");
return true;
case (byte)'-': // -ERR
command = ParsedCommand.Simple(CommandType.Err);
command = ParsedCommand.Simple(CommandType.Err, "-ERR");
buffer = buffer.Slice(reader.Position);
TraceInOp("-ERR");
return true;
@@ -236,6 +239,7 @@ public sealed class NatsParser
_pendingReplyTo = reply;
_pendingHeaderSize = -1;
_pendingType = CommandType.Pub;
_pendingOperation = "PUB";
TraceInOp("PUB", argsSpan);
return TryReadPayload(ref buffer, out command);
@@ -286,6 +290,7 @@ public sealed class NatsParser
_pendingReplyTo = reply;
_pendingHeaderSize = hdrSize;
_pendingType = CommandType.HPub;
_pendingOperation = "HPUB";
TraceInOp("HPUB", argsSpan);
return TryReadPayload(ref buffer, out command);
@@ -315,6 +320,7 @@ public sealed class NatsParser
command = new ParsedCommand
{
Type = _pendingType,
Operation = _pendingOperation,
Subject = _pendingSubject,
ReplyTo = _pendingReplyTo,
Payload = payload,
@@ -339,6 +345,7 @@ public sealed class NatsParser
2 => new ParsedCommand
{
Type = CommandType.Sub,
Operation = "SUB",
Subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]),
Sid = Encoding.ASCII.GetString(argsSpan[ranges[1]]),
MaxMessages = -1,
@@ -346,6 +353,7 @@ public sealed class NatsParser
3 => new ParsedCommand
{
Type = CommandType.Sub,
Operation = "SUB",
Subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]),
Queue = Encoding.ASCII.GetString(argsSpan[ranges[1]]),
Sid = Encoding.ASCII.GetString(argsSpan[ranges[2]]),
@@ -367,12 +375,14 @@ public sealed class NatsParser
1 => new ParsedCommand
{
Type = CommandType.Unsub,
Operation = "UNSUB",
Sid = Encoding.ASCII.GetString(argsSpan[ranges[0]]),
MaxMessages = -1,
},
2 => new ParsedCommand
{
Type = CommandType.Unsub,
Operation = "UNSUB",
Sid = Encoding.ASCII.GetString(argsSpan[ranges[0]]),
MaxMessages = ParseSize(argsSpan[ranges[1]]),
},
@@ -391,6 +401,7 @@ public sealed class NatsParser
return new ParsedCommand
{
Type = CommandType.Connect,
Operation = "CONNECT",
Payload = json.ToArray(),
MaxMessages = -1,
};
@@ -407,6 +418,7 @@ public sealed class NatsParser
return new ParsedCommand
{
Type = CommandType.Info,
Operation = "INFO",
Payload = json.ToArray(),
MaxMessages = -1,
};

View File

@@ -0,0 +1,32 @@
namespace NATS.Server.Raft;
public sealed class RaftLog
{
private readonly List<RaftLogEntry> _entries = [];
private long _baseIndex;
public IReadOnlyList<RaftLogEntry> Entries => _entries;
public RaftLogEntry Append(int term, string command)
{
var entry = new RaftLogEntry(_baseIndex + _entries.Count + 1, term, command);
_entries.Add(entry);
return entry;
}
public void AppendReplicated(RaftLogEntry entry)
{
if (_entries.Any(e => e.Index == entry.Index))
return;
_entries.Add(entry);
}
public void ReplaceWithSnapshot(RaftSnapshot snapshot)
{
_entries.Clear();
_baseIndex = snapshot.LastIncludedIndex;
}
}
public sealed record RaftLogEntry(long Index, int Term, string Command);

View File

@@ -0,0 +1,113 @@
namespace NATS.Server.Raft;
public sealed class RaftNode
{
private int _votesReceived;
private readonly List<RaftNode> _cluster = [];
private readonly RaftReplicator _replicator = new();
private readonly RaftSnapshotStore _snapshotStore = new();
public string Id { get; }
public int Term => TermState.CurrentTerm;
public bool IsLeader => Role == RaftRole.Leader;
public RaftRole Role { get; private set; } = RaftRole.Follower;
public RaftTermState TermState { get; } = new();
public long AppliedIndex { get; set; }
public RaftLog Log { get; } = new();
public RaftNode(string id)
{
Id = id;
}
public void ConfigureCluster(IEnumerable<RaftNode> peers)
{
_cluster.Clear();
_cluster.AddRange(peers);
}
public void StartElection(int clusterSize)
{
Role = RaftRole.Candidate;
TermState.CurrentTerm++;
TermState.VotedFor = Id;
_votesReceived = 1;
TryBecomeLeader(clusterSize);
}
public VoteResponse GrantVote(int term)
{
if (term < TermState.CurrentTerm)
return new VoteResponse { Granted = false };
TermState.CurrentTerm = term;
return new VoteResponse { Granted = true };
}
public void ReceiveVote(VoteResponse response, int clusterSize = 3)
{
if (!response.Granted)
return;
_votesReceived++;
TryBecomeLeader(clusterSize);
}
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
{
if (Role != RaftRole.Leader)
throw new InvalidOperationException("Only leader can propose entries.");
var entry = Log.Append(TermState.CurrentTerm, command);
var followers = _cluster.Where(n => n.Id != Id).ToList();
var acknowledgements = _replicator.Replicate(entry, followers);
var quorum = (_cluster.Count / 2) + 1;
if (acknowledgements + 1 >= quorum)
{
AppliedIndex = entry.Index;
foreach (var node in _cluster)
node.AppliedIndex = Math.Max(node.AppliedIndex, entry.Index);
}
await Task.CompletedTask;
return entry.Index;
}
public void ReceiveReplicatedEntry(RaftLogEntry entry)
{
Log.AppendReplicated(entry);
}
public async Task<RaftSnapshot> CreateSnapshotAsync(CancellationToken ct)
{
var snapshot = new RaftSnapshot
{
LastIncludedIndex = AppliedIndex,
LastIncludedTerm = Term,
};
await _snapshotStore.SaveAsync(snapshot, ct);
return snapshot;
}
public Task InstallSnapshotAsync(RaftSnapshot snapshot, CancellationToken ct)
{
Log.ReplaceWithSnapshot(snapshot);
AppliedIndex = snapshot.LastIncludedIndex;
return _snapshotStore.SaveAsync(snapshot, ct);
}
public void RequestStepDown()
{
Role = RaftRole.Follower;
_votesReceived = 0;
TermState.VotedFor = null;
}
private void TryBecomeLeader(int clusterSize)
{
var quorum = (clusterSize / 2) + 1;
if (_votesReceived >= quorum)
Role = RaftRole.Leader;
}
}

View File

@@ -0,0 +1,16 @@
namespace NATS.Server.Raft;
public sealed class RaftReplicator
{
public int Replicate(RaftLogEntry entry, IReadOnlyList<RaftNode> followers)
{
var acknowledgements = 0;
foreach (var follower in followers)
{
follower.ReceiveReplicatedEntry(entry);
acknowledgements++;
}
return acknowledgements;
}
}

View File

@@ -0,0 +1,12 @@
namespace NATS.Server.Raft;
public sealed class VoteRequest
{
public int Term { get; init; }
public string CandidateId { get; init; } = string.Empty;
}
public sealed class VoteResponse
{
public bool Granted { get; init; }
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Raft;
public sealed class RaftSnapshot
{
public long LastIncludedIndex { get; init; }
public int LastIncludedTerm { get; init; }
public byte[] Data { get; init; } = [];
}

View File

@@ -0,0 +1,17 @@
namespace NATS.Server.Raft;
public sealed class RaftSnapshotStore
{
private RaftSnapshot? _snapshot;
public Task SaveAsync(RaftSnapshot snapshot, CancellationToken ct)
{
_snapshot = snapshot;
return Task.CompletedTask;
}
public Task<RaftSnapshot?> LoadAsync(CancellationToken ct)
{
return Task.FromResult(_snapshot);
}
}

View File

@@ -0,0 +1,14 @@
namespace NATS.Server.Raft;
public sealed class RaftTermState
{
public int CurrentTerm { get; set; }
public string? VotedFor { get; set; }
}
public enum RaftRole
{
Follower,
Candidate,
Leader,
}

View File

@@ -0,0 +1,81 @@
using System.Net.Sockets;
using System.Text;
namespace NATS.Server.Routes;
public sealed class RouteConnection(Socket socket) : IAsyncDisposable
{
private readonly Socket _socket = socket;
private readonly NetworkStream _stream = new(socket, ownsSocket: true);
public string? RemoteServerId { get; private set; }
public string RemoteEndpoint => _socket.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N");
public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
{
await WriteLineAsync($"ROUTE {serverId}", ct);
var line = await ReadLineAsync(ct);
RemoteServerId = ParseHandshake(line);
}
public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct)
{
var line = await ReadLineAsync(ct);
RemoteServerId = ParseHandshake(line);
await WriteLineAsync($"ROUTE {serverId}", ct);
}
public async Task WaitUntilClosedAsync(CancellationToken ct)
{
var buffer = new byte[1024];
while (!ct.IsCancellationRequested)
{
var bytesRead = await _stream.ReadAsync(buffer, ct);
if (bytesRead == 0)
return;
}
}
public async ValueTask DisposeAsync()
{
await _stream.DisposeAsync();
}
private async Task WriteLineAsync(string line, CancellationToken ct)
{
var bytes = Encoding.ASCII.GetBytes($"{line}\r\n");
await _stream.WriteAsync(bytes, ct);
await _stream.FlushAsync(ct);
}
private async Task<string> ReadLineAsync(CancellationToken ct)
{
var bytes = new List<byte>(64);
var single = new byte[1];
while (true)
{
var read = await _stream.ReadAsync(single, ct);
if (read == 0)
throw new IOException("Route connection closed during handshake");
if (single[0] == (byte)'\n')
break;
if (single[0] != (byte)'\r')
bytes.Add(single[0]);
}
return Encoding.ASCII.GetString([.. bytes]);
}
private static string ParseHandshake(string line)
{
if (!line.StartsWith("ROUTE ", StringComparison.OrdinalIgnoreCase))
throw new InvalidOperationException("Invalid route handshake");
var id = line[6..].Trim();
if (id.Length == 0)
throw new InvalidOperationException("Route handshake missing server id");
return id;
}
}

View File

@@ -0,0 +1,224 @@
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using NATS.Server.Configuration;
using NATS.Server.Subscriptions;
namespace NATS.Server.Routes;
public sealed class RouteManager : IAsyncDisposable
{
private static readonly ConcurrentDictionary<string, RouteManager> Managers = new(StringComparer.Ordinal);
private readonly ClusterOptions _options;
private readonly ServerStats _stats;
private readonly string _serverId;
private readonly ILogger<RouteManager> _logger;
private readonly Action<RemoteSubscription> _remoteSubSink;
private readonly ConcurrentDictionary<string, RouteConnection> _routes = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, byte> _connectedServerIds = new(StringComparer.Ordinal);
private CancellationTokenSource? _cts;
private Socket? _listener;
private Task? _acceptLoopTask;
public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
public RouteManager(
ClusterOptions options,
ServerStats stats,
string serverId,
Action<RemoteSubscription> remoteSubSink,
ILogger<RouteManager> logger)
{
_options = options;
_stats = stats;
_serverId = serverId;
_remoteSubSink = remoteSubSink;
_logger = logger;
}
public Task StartAsync(CancellationToken ct)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
Managers[_serverId] = this;
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
_listener.Bind(new IPEndPoint(IPAddress.Parse(_options.Host), _options.Port));
_listener.Listen(128);
if (_options.Port == 0)
_options.Port = ((IPEndPoint)_listener.LocalEndPoint!).Port;
_acceptLoopTask = Task.Run(() => AcceptLoopAsync(_cts.Token));
foreach (var route in _options.Routes.Distinct(StringComparer.OrdinalIgnoreCase))
_ = Task.Run(() => ConnectToRouteWithRetryAsync(route, _cts.Token));
return Task.CompletedTask;
}
public async ValueTask DisposeAsync()
{
if (_cts == null)
return;
await _cts.CancelAsync();
_listener?.Dispose();
if (_acceptLoopTask != null)
await _acceptLoopTask.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
foreach (var route in _routes.Values)
await route.DisposeAsync();
_routes.Clear();
_connectedServerIds.Clear();
Managers.TryRemove(_serverId, out _);
Interlocked.Exchange(ref _stats.Routes, 0);
_cts.Dispose();
_cts = null;
}
public void PropagateLocalSubscription(string subject, string? queue)
{
if (_connectedServerIds.IsEmpty)
return;
var remoteSub = new RemoteSubscription(subject, queue, _serverId);
foreach (var peerId in _connectedServerIds.Keys)
{
if (Managers.TryGetValue(peerId, out var peer))
peer.ReceiveRemoteSubscription(remoteSub);
}
}
private async Task AcceptLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
Socket socket;
try
{
socket = await _listener!.AcceptAsync(ct);
}
catch (OperationCanceledException)
{
break;
}
catch (ObjectDisposedException)
{
break;
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Route accept loop error");
break;
}
_ = Task.Run(() => HandleInboundRouteAsync(socket, ct), ct);
}
}
private async Task HandleInboundRouteAsync(Socket socket, CancellationToken ct)
{
var route = new RouteConnection(socket);
try
{
await route.PerformInboundHandshakeAsync(_serverId, ct);
Register(route);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Inbound route handshake failed");
await route.DisposeAsync();
}
}
private async Task ConnectToRouteWithRetryAsync(string route, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
var endPoint = ParseRouteEndpoint(route);
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
var connection = new RouteConnection(socket);
await connection.PerformOutboundHandshakeAsync(_serverId, ct);
Register(connection);
return;
}
catch (OperationCanceledException)
{
return;
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to connect route seed {Route}", route);
}
try
{
await Task.Delay(250, ct);
}
catch (OperationCanceledException)
{
return;
}
}
}
private void Register(RouteConnection route)
{
var key = $"{route.RemoteServerId}:{route.RemoteEndpoint}";
if (!_routes.TryAdd(key, route))
{
_ = route.DisposeAsync();
return;
}
if (route.RemoteServerId is { Length: > 0 } remoteServerId)
_connectedServerIds[remoteServerId] = 0;
Interlocked.Increment(ref _stats.Routes);
_ = Task.Run(() => WatchRouteAsync(key, route, _cts!.Token));
}
private async Task WatchRouteAsync(string key, RouteConnection route, CancellationToken ct)
{
try
{
await route.WaitUntilClosedAsync(ct);
}
catch (OperationCanceledException)
{
// Shutdown path.
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Route {RouteKey} closed with error", key);
}
finally
{
if (_routes.TryRemove(key, out _))
Interlocked.Decrement(ref _stats.Routes);
await route.DisposeAsync();
}
}
private static IPEndPoint ParseRouteEndpoint(string route)
{
var trimmed = route.Trim();
var parts = trimmed.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
if (parts.Length != 2)
throw new FormatException($"Invalid route endpoint: '{route}'");
return new IPEndPoint(IPAddress.Parse(parts[0]), int.Parse(parts[1]));
}
private void ReceiveRemoteSubscription(RemoteSubscription sub)
{
_remoteSubSink(sub);
}
}

View File

@@ -11,6 +11,9 @@ public sealed class ServerStats
public long TotalConnections;
public long SlowConsumers;
public long StaleConnections;
public long Routes;
public long Gateways;
public long Leafs;
public long Stalls;
public long SlowConsumerClients;
public long SlowConsumerRoutes;
@@ -20,5 +23,6 @@ public sealed class ServerStats
public long StaleConnectionRoutes;
public long StaleConnectionLeafs;
public long StaleConnectionGateways;
public bool JetStreamEnabled;
public readonly ConcurrentDictionary<string, long> HttpReqStats = new();
}

View File

@@ -0,0 +1,3 @@
namespace NATS.Server.Subscriptions;
public sealed record RemoteSubscription(string Subject, string? Queue, string RouteId);

View File

@@ -13,6 +13,7 @@ public sealed class SubList : IDisposable
private readonly ReaderWriterLockSlim _lock = new();
private readonly TrieLevel _root = new();
private readonly Dictionary<string, RemoteSubscription> _remoteSubs = new(StringComparer.Ordinal);
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
private uint _count;
private volatile bool _disposed;
@@ -96,6 +97,40 @@ public sealed class SubList : IDisposable
}
}
public void ApplyRemoteSub(RemoteSubscription sub)
{
_lock.EnterWriteLock();
try
{
var key = $"{sub.RouteId}|{sub.Subject}|{sub.Queue}";
_remoteSubs[key] = sub;
Interlocked.Increment(ref _generation);
}
finally
{
_lock.ExitWriteLock();
}
}
public bool HasRemoteInterest(string subject)
{
_lock.EnterReadLock();
try
{
foreach (var remoteSub in _remoteSubs.Values)
{
if (SubjectMatch.MatchLiteral(subject, remoteSub.Subject))
return true;
}
return false;
}
finally
{
_lock.ExitReadLock();
}
}
public void Insert(Subscription sub)
{
var subject = sub.Subject;