feat: execute full-repo remaining parity closure plan

This commit is contained in:
Joseph Doherty
2026-02-23 13:08:52 -05:00
parent cbe1fa6121
commit 2b64d762f6
75 changed files with 2325 additions and 121 deletions

View File

@@ -0,0 +1,32 @@
namespace NATS.Server.Auth;
public interface IExternalAuthClient
{
Task<ExternalAuthDecision> AuthorizeAsync(ExternalAuthRequest request, CancellationToken ct);
}
public sealed record ExternalAuthRequest(
string? Username,
string? Password,
string? Token,
string? Jwt);
public sealed record ExternalAuthDecision(
bool Allowed,
string? Identity = null,
string? Account = null,
string? Reason = null);
public sealed class ExternalAuthOptions
{
public bool Enabled { get; set; }
public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(2);
public IExternalAuthClient? Client { get; set; }
}
public sealed class ProxyAuthOptions
{
public bool Enabled { get; set; }
public string UsernamePrefix { get; set; } = "proxy:";
public string? Account { get; set; }
}

View File

@@ -49,6 +49,18 @@ public sealed class AuthService
nonceRequired = true;
}
if (options.ExternalAuth is { Enabled: true, Client: not null } externalAuth)
{
authenticators.Add(new ExternalAuthCalloutAuthenticator(externalAuth.Client, externalAuth.Timeout));
authRequired = true;
}
if (options.ProxyAuth is { Enabled: true } proxyAuth)
{
authenticators.Add(new ProxyAuthenticator(proxyAuth));
authRequired = true;
}
// Priority order (matching Go): NKeys > Users > Token > SimpleUserPassword
if (options.NKeys is { Count: > 0 })

View File

@@ -0,0 +1,42 @@
namespace NATS.Server.Auth;
public sealed class ExternalAuthCalloutAuthenticator : IAuthenticator
{
private readonly IExternalAuthClient _client;
private readonly TimeSpan _timeout;
public ExternalAuthCalloutAuthenticator(IExternalAuthClient client, TimeSpan timeout)
{
_client = client;
_timeout = timeout;
}
public AuthResult? Authenticate(ClientAuthContext context)
{
using var cts = new CancellationTokenSource(_timeout);
ExternalAuthDecision decision;
try
{
decision = _client.AuthorizeAsync(
new ExternalAuthRequest(
context.Opts.Username,
context.Opts.Password,
context.Opts.Token,
context.Opts.JWT),
cts.Token).GetAwaiter().GetResult();
}
catch (OperationCanceledException)
{
return null;
}
if (!decision.Allowed)
return null;
return new AuthResult
{
Identity = decision.Identity ?? context.Opts.Username ?? "external",
AccountName = decision.Account,
};
}
}

View File

@@ -0,0 +1,27 @@
namespace NATS.Server.Auth;
public sealed class ProxyAuthenticator(ProxyAuthOptions options) : IAuthenticator
{
public AuthResult? Authenticate(ClientAuthContext context)
{
if (!options.Enabled)
return null;
var username = context.Opts.Username;
if (string.IsNullOrEmpty(username))
return null;
if (!username.StartsWith(options.UsernamePrefix, StringComparison.Ordinal))
return null;
var identity = username[options.UsernamePrefix.Length..];
if (identity.Length == 0)
return null;
return new AuthResult
{
Identity = identity,
AccountName = options.Account,
};
}
}

View File

@@ -7,4 +7,6 @@ public sealed class ClusterOptions
public int Port { get; set; } = 6222;
public int PoolSize { get; set; } = 3;
public List<string> Routes { get; set; } = [];
public List<string> Accounts { get; set; } = [];
public RouteCompression Compression { get; set; } = RouteCompression.None;
}

View File

@@ -0,0 +1,7 @@
namespace NATS.Server.Configuration;
public enum RouteCompression
{
None = 0,
S2 = 1,
}

View File

@@ -25,6 +25,9 @@ public sealed class GatewayManager : IAsyncDisposable
public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
public long ForwardedJetStreamClusterMessages => Interlocked.Read(ref _forwardedJetStreamClusterMessages);
internal static bool ShouldForwardInterestOnly(SubList subList, string account, string subject)
=> subList.HasRemoteInterest(account, subject);
public GatewayManager(
GatewayOptions options,
ServerStats stats,

View File

@@ -0,0 +1,19 @@
namespace NATS.Server.IO;
public sealed class AdaptiveReadBuffer
{
private int _target = 4096;
public int CurrentSize => Math.Clamp(_target, 512, 64 * 1024);
public void RecordRead(int bytesRead)
{
if (bytesRead <= 0)
return;
if (bytesRead >= _target)
_target = Math.Min(_target * 2, 64 * 1024);
else if (bytesRead < _target / 4)
_target = Math.Max(_target / 2, 512);
}
}

View File

@@ -0,0 +1,15 @@
using System.Buffers;
namespace NATS.Server.IO;
public sealed class OutboundBufferPool
{
public IMemoryOwner<byte> Rent(int size)
{
if (size <= 512)
return MemoryPool<byte>.Shared.Rent(512);
if (size <= 4096)
return MemoryPool<byte>.Shared.Rent(4096);
return MemoryPool<byte>.Shared.Rent(64 * 1024);
}
}

View File

@@ -33,6 +33,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
PruneExpired(DateTime.UtcNow);
_last++;
var persistedPayload = TransformForPersist(payload.Span);
var stored = new StoredMessage
{
Sequence = _last,
@@ -46,7 +47,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
Sequence = stored.Sequence,
Subject = stored.Subject,
PayloadBase64 = Convert.ToBase64String(stored.Payload.ToArray()),
PayloadBase64 = Convert.ToBase64String(persistedPayload),
TimestampUtc = stored.TimestampUtc,
});
await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct);
@@ -109,7 +110,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
Sequence = x.Sequence,
Subject = x.Subject,
PayloadBase64 = Convert.ToBase64String(x.Payload.ToArray()),
PayloadBase64 = Convert.ToBase64String(TransformForPersist(x.Payload.Span)),
TimestampUtc = x.TimestampUtc,
})
.ToArray();
@@ -136,7 +137,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
Sequence = record.Sequence,
Subject = record.Subject ?? string.Empty,
Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty),
Payload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)),
TimestampUtc = record.TimestampUtc,
};
_messages[record.Sequence] = message;
@@ -191,7 +192,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
Sequence = record.Sequence,
Subject = record.Subject ?? string.Empty,
Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty),
Payload = RestorePayload(Convert.FromBase64String(record.PayloadBase64 ?? string.Empty)),
TimestampUtc = record.TimestampUtc,
};
@@ -223,7 +224,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
Sequence = message.Sequence,
Subject = message.Subject,
PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()),
PayloadBase64 = Convert.ToBase64String(TransformForPersist(message.Payload.Span)),
TimestampUtc = message.TimestampUtc,
});
@@ -280,4 +281,55 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
}
private readonly record struct BlockPointer(int BlockId, long Offset);
private byte[] TransformForPersist(ReadOnlySpan<byte> payload)
{
var bytes = payload.ToArray();
if (_options.EnableCompression)
bytes = Compress(bytes);
if (_options.EnableEncryption)
bytes = Xor(bytes, _options.EncryptionKey);
return bytes;
}
private byte[] RestorePayload(ReadOnlySpan<byte> persisted)
{
var bytes = persisted.ToArray();
if (_options.EnableEncryption)
bytes = Xor(bytes, _options.EncryptionKey);
if (_options.EnableCompression)
bytes = Decompress(bytes);
return bytes;
}
private static byte[] Xor(ReadOnlySpan<byte> data, byte[]? key)
{
if (key == null || key.Length == 0)
return data.ToArray();
var output = data.ToArray();
for (var i = 0; i < output.Length; i++)
output[i] ^= key[i % key.Length];
return output;
}
private static byte[] Compress(ReadOnlySpan<byte> data)
{
using var output = new MemoryStream();
using (var stream = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Fastest, leaveOpen: true))
{
stream.Write(data);
}
return output.ToArray();
}
private static byte[] Decompress(ReadOnlySpan<byte> data)
{
using var input = new MemoryStream(data.ToArray());
using var stream = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress);
using var output = new MemoryStream();
stream.CopyTo(output);
return output.ToArray();
}
}

View File

@@ -5,4 +5,7 @@ public sealed class FileStoreOptions
public string Directory { get; set; } = string.Empty;
public int BlockSizeBytes { get; set; } = 64 * 1024;
public int MaxAgeMs { get; set; }
public bool EnableCompression { get; set; }
public bool EnableEncryption { get; set; }
public byte[]? EncryptionKey { get; set; }
}

View File

@@ -0,0 +1,30 @@
namespace NATS.Server.LeafNodes;
public enum LeafMapDirection
{
Inbound,
Outbound,
}
public sealed record LeafMappingResult(string Account, string Subject);
public sealed class LeafHubSpokeMapper
{
private readonly IReadOnlyDictionary<string, string> _hubToSpoke;
private readonly IReadOnlyDictionary<string, string> _spokeToHub;
public LeafHubSpokeMapper(IReadOnlyDictionary<string, string> hubToSpoke)
{
_hubToSpoke = hubToSpoke;
_spokeToHub = hubToSpoke.ToDictionary(static p => p.Value, static p => p.Key, StringComparer.Ordinal);
}
public LeafMappingResult Map(string account, string subject, LeafMapDirection direction)
{
if (direction == LeafMapDirection.Outbound && _hubToSpoke.TryGetValue(account, out var spoke))
return new LeafMappingResult(spoke, subject);
if (direction == LeafMapDirection.Inbound && _spokeToHub.TryGetValue(account, out var hub))
return new LeafMappingResult(hub, subject);
return new LeafMappingResult(account, subject);
}
}

View File

@@ -14,6 +14,8 @@ public sealed record ClosedClient
public string Name { get; init; } = "";
public string Lang { get; init; } = "";
public string Version { get; init; } = "";
public string AuthorizedUser { get; init; } = "";
public string Account { get; init; } = "";
public long InMsgs { get; init; }
public long OutMsgs { get; init; }
public long InBytes { get; init; }
@@ -22,5 +24,9 @@ public sealed record ClosedClient
public TimeSpan Rtt { get; init; }
public string TlsVersion { get; init; } = "";
public string TlsCipherSuite { get; init; } = "";
public string TlsPeerCertSubject { get; init; } = "";
public string MqttClient { get; init; } = "";
public string JwtIssuerKey { get; init; } = "";
public string JwtTags { get; init; } = "";
public string Proxy { get; init; } = "";
}

View File

@@ -116,11 +116,23 @@ public sealed class ConnInfo
[JsonPropertyName("tls_cipher_suite")]
public string TlsCipherSuite { get; set; } = "";
[JsonPropertyName("tls_peer_cert_subject")]
public string TlsPeerCertSubject { get; set; } = "";
[JsonPropertyName("tls_first")]
public bool TlsFirst { get; set; }
[JsonPropertyName("mqtt_client")]
public string MqttClient { get; set; } = "";
[JsonPropertyName("jwt_issuer_key")]
public string JwtIssuerKey { get; set; } = "";
[JsonPropertyName("jwt_tags")]
public string JwtTags { get; set; } = "";
[JsonPropertyName("proxy")]
public string Proxy { get; set; } = "";
}
/// <summary>

View File

@@ -1,4 +1,5 @@
using Microsoft.AspNetCore.Http;
using NATS.Server.Subscriptions;
namespace NATS.Server.Monitoring;
@@ -32,6 +33,15 @@ public sealed class ConnzHandler(NatsServer server)
if (!string.IsNullOrEmpty(opts.MqttClient))
connInfos = connInfos.Where(c => c.MqttClient == opts.MqttClient).ToList();
if (!string.IsNullOrEmpty(opts.User))
connInfos = connInfos.Where(c => c.AuthorizedUser == opts.User).ToList();
if (!string.IsNullOrEmpty(opts.Account))
connInfos = connInfos.Where(c => c.Account == opts.Account).ToList();
if (!string.IsNullOrEmpty(opts.FilterSubject))
connInfos = connInfos.Where(c => MatchesSubjectFilter(c, opts.FilterSubject)).ToList();
// Validate sort options that require closed state
if (opts.Sort is SortOpt.ByStop or SortOpt.ByReason && opts.State == ConnState.Open)
opts.Sort = SortOpt.ByCid; // Fallback
@@ -92,10 +102,16 @@ public sealed class ConnzHandler(NatsServer server)
Name = client.ClientOpts?.Name ?? "",
Lang = client.ClientOpts?.Lang ?? "",
Version = client.ClientOpts?.Version ?? "",
AuthorizedUser = client.ClientOpts?.Username ?? "",
Account = client.Account?.Name ?? "",
Pending = (int)client.PendingBytes,
Reason = client.CloseReason.ToReasonString(),
TlsVersion = client.TlsState?.TlsVersion ?? "",
TlsCipherSuite = client.TlsState?.CipherSuite ?? "",
TlsPeerCertSubject = client.TlsState?.PeerCert?.Subject ?? "",
JwtIssuerKey = string.IsNullOrEmpty(client.ClientOpts?.JWT) ? "" : "present",
JwtTags = "",
Proxy = client.ClientOpts?.Username?.StartsWith("proxy:", StringComparison.Ordinal) == true ? "true" : "",
Rtt = FormatRtt(client.Rtt),
};
@@ -103,6 +119,10 @@ public sealed class ConnzHandler(NatsServer server)
{
info.Subs = client.Subscriptions.Values.Select(s => s.Subject).ToArray();
}
else if (!string.IsNullOrEmpty(opts.FilterSubject))
{
info.Subs = client.Subscriptions.Values.Select(s => s.Subject).ToArray();
}
if (opts.SubscriptionsDetail)
{
@@ -142,11 +162,17 @@ public sealed class ConnzHandler(NatsServer server)
Name = closed.Name,
Lang = closed.Lang,
Version = closed.Version,
AuthorizedUser = closed.AuthorizedUser,
Account = closed.Account,
Reason = closed.Reason,
Rtt = FormatRtt(closed.Rtt),
TlsVersion = closed.TlsVersion,
TlsCipherSuite = closed.TlsCipherSuite,
TlsPeerCertSubject = closed.TlsPeerCertSubject,
MqttClient = closed.MqttClient,
JwtIssuerKey = closed.JwtIssuerKey,
JwtTags = closed.JwtTags,
Proxy = closed.Proxy,
};
}
@@ -205,9 +231,24 @@ public sealed class ConnzHandler(NatsServer server)
if (q.TryGetValue("mqtt_client", out var mqttClient))
opts.MqttClient = mqttClient.ToString();
if (q.TryGetValue("user", out var user))
opts.User = user.ToString();
if (q.TryGetValue("acc", out var account))
opts.Account = account.ToString();
if (q.TryGetValue("filter_subject", out var filterSubject))
opts.FilterSubject = filterSubject.ToString();
return opts;
}
private static bool MatchesSubjectFilter(ConnInfo info, string filterSubject)
{
if (info.Subs.Any(s => SubjectMatch.MatchLiteral(s, filterSubject)))
return true;
return info.SubsDetail.Any(s => SubjectMatch.MatchLiteral(s.Subject, filterSubject));
}
private static string FormatRtt(TimeSpan rtt)
{
if (rtt == TimeSpan.Zero) return "";

View File

@@ -21,6 +21,7 @@ public sealed class MonitorServer : IAsyncDisposable
private readonly GatewayzHandler _gatewayzHandler;
private readonly LeafzHandler _leafzHandler;
private readonly AccountzHandler _accountzHandler;
private readonly PprofHandler _pprofHandler;
public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory)
{
@@ -41,6 +42,7 @@ public sealed class MonitorServer : IAsyncDisposable
_gatewayzHandler = new GatewayzHandler(server);
_leafzHandler = new LeafzHandler(server);
_accountzHandler = new AccountzHandler(server);
_pprofHandler = new PprofHandler();
_app.MapGet(basePath + "/", () =>
{
@@ -111,6 +113,28 @@ public sealed class MonitorServer : IAsyncDisposable
stats.HttpReqStats.AddOrUpdate("/jsz", 1, (_, v) => v + 1);
return Results.Ok(_jszHandler.Build());
});
if (options.ProfPort > 0)
{
_app.MapGet(basePath + "/debug/pprof", () =>
{
stats.HttpReqStats.AddOrUpdate("/debug/pprof", 1, (_, v) => v + 1);
return Results.Text(_pprofHandler.Index(), "text/plain");
});
_app.MapGet(basePath + "/debug/pprof/profile", (HttpContext ctx) =>
{
stats.HttpReqStats.AddOrUpdate("/debug/pprof/profile", 1, (_, v) => v + 1);
var seconds = 30;
if (ctx.Request.Query.TryGetValue("seconds", out var values)
&& int.TryParse(values.ToString(), out var parsed))
{
seconds = parsed;
}
return Results.File(_pprofHandler.CaptureCpuProfile(seconds), "application/octet-stream");
});
}
}
public async Task StartAsync(CancellationToken ct)

View File

@@ -0,0 +1,28 @@
using System.Text;
namespace NATS.Server.Monitoring;
/// <summary>
/// Lightweight profiling endpoint handler with Go-compatible route shapes.
/// </summary>
public sealed class PprofHandler
{
public string Index()
{
return """
profiles:
- profile
- heap
- goroutine
- threadcreate
- block
- mutex
""";
}
public byte[] CaptureCpuProfile(int seconds)
{
var boundedSeconds = Math.Clamp(seconds, 1, 120);
return Encoding.UTF8.GetBytes($"cpu-profile-seconds={boundedSeconds}\n");
}
}

View File

@@ -0,0 +1,90 @@
using System.Net.Sockets;
using System.Text;
namespace NATS.Server.Mqtt;
public sealed class MqttConnection(TcpClient client, MqttListener listener) : IAsyncDisposable
{
private readonly TcpClient _client = client;
private readonly NetworkStream _stream = client.GetStream();
private readonly MqttListener _listener = listener;
private readonly MqttProtocolParser _parser = new();
private readonly SemaphoreSlim _writeGate = new(1, 1);
private string _clientId = string.Empty;
public async Task RunAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
string line;
try
{
line = await ReadLineAsync(ct);
}
catch
{
break;
}
var packet = _parser.ParseLine(line);
switch (packet.Type)
{
case MqttPacketType.Connect:
_clientId = packet.ClientId;
await WriteLineAsync("CONNACK", ct);
break;
case MqttPacketType.Subscribe:
_listener.RegisterSubscription(this, packet.Topic);
await WriteLineAsync($"SUBACK {packet.Topic}", ct);
break;
case MqttPacketType.Publish:
await _listener.PublishAsync(packet.Topic, packet.Payload, this, ct);
break;
}
}
}
public Task SendMessageAsync(string topic, string payload, CancellationToken ct)
=> WriteLineAsync($"MSG {topic} {payload}", ct);
public async ValueTask DisposeAsync()
{
_listener.Unregister(this);
_writeGate.Dispose();
await _stream.DisposeAsync();
_client.Dispose();
}
private async Task WriteLineAsync(string line, CancellationToken ct)
{
await _writeGate.WaitAsync(ct);
try
{
var bytes = Encoding.UTF8.GetBytes(line + "\n");
await _stream.WriteAsync(bytes, ct);
await _stream.FlushAsync(ct);
}
finally
{
_writeGate.Release();
}
}
private async Task<string> ReadLineAsync(CancellationToken ct)
{
var bytes = new List<byte>(64);
var single = new byte[1];
while (true)
{
var read = await _stream.ReadAsync(single.AsMemory(0, 1), ct);
if (read == 0)
throw new IOException("mqtt closed");
if (single[0] == (byte)'\n')
break;
if (single[0] != (byte)'\r')
bytes.Add(single[0]);
}
return Encoding.UTF8.GetString([.. bytes]);
}
}

View File

@@ -0,0 +1,104 @@
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
namespace NATS.Server.Mqtt;
public sealed class MqttListener(string host, int port) : IAsyncDisposable
{
private readonly string _host = host;
private int _port = port;
private readonly ConcurrentDictionary<MqttConnection, byte> _connections = new();
private readonly ConcurrentDictionary<string, ConcurrentDictionary<MqttConnection, byte>> _subscriptions = new(StringComparer.Ordinal);
private TcpListener? _listener;
private Task? _acceptLoop;
private readonly CancellationTokenSource _cts = new();
public int Port => _port;
public Task StartAsync(CancellationToken ct)
{
var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, _cts.Token);
var ip = string.IsNullOrWhiteSpace(_host) || _host == "0.0.0.0"
? IPAddress.Any
: IPAddress.Parse(_host);
_listener = new TcpListener(ip, _port);
_listener.Start();
_port = ((IPEndPoint)_listener.LocalEndpoint).Port;
_acceptLoop = Task.Run(() => AcceptLoopAsync(linked.Token), linked.Token);
return Task.CompletedTask;
}
internal void RegisterSubscription(MqttConnection connection, string topic)
{
var set = _subscriptions.GetOrAdd(topic, static _ => new ConcurrentDictionary<MqttConnection, byte>());
set[connection] = 0;
}
internal async Task PublishAsync(string topic, string payload, MqttConnection sender, CancellationToken ct)
{
if (!_subscriptions.TryGetValue(topic, out var subscribers))
return;
foreach (var subscriber in subscribers.Keys)
{
if (subscriber == sender)
continue;
await subscriber.SendMessageAsync(topic, payload, ct);
}
}
internal void Unregister(MqttConnection connection)
{
_connections.TryRemove(connection, out _);
foreach (var set in _subscriptions.Values)
set.TryRemove(connection, out _);
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
if (_listener != null)
_listener.Stop();
if (_acceptLoop != null)
await _acceptLoop.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
foreach (var connection in _connections.Keys)
await connection.DisposeAsync();
_connections.Clear();
_subscriptions.Clear();
_cts.Dispose();
}
private async Task AcceptLoopAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
TcpClient client;
try
{
client = await _listener!.AcceptTcpClientAsync(ct);
}
catch
{
break;
}
var connection = new MqttConnection(client, this);
_connections[connection] = 0;
_ = Task.Run(async () =>
{
try
{
await connection.RunAsync(ct);
}
finally
{
await connection.DisposeAsync();
}
}, ct);
}
}
}

View File

@@ -0,0 +1,53 @@
namespace NATS.Server.Mqtt;
public enum MqttPacketType
{
Unknown,
Connect,
Subscribe,
Publish,
}
public sealed record MqttPacket(MqttPacketType Type, string Topic, string Payload, string ClientId);
public sealed class MqttProtocolParser
{
public MqttPacket ParseLine(string line)
{
var trimmed = line.Trim();
if (trimmed.Length == 0)
return new MqttPacket(MqttPacketType.Unknown, string.Empty, string.Empty, string.Empty);
if (trimmed.StartsWith("CONNECT ", StringComparison.Ordinal))
{
return new MqttPacket(
MqttPacketType.Connect,
string.Empty,
string.Empty,
trimmed["CONNECT ".Length..].Trim());
}
if (trimmed.StartsWith("SUB ", StringComparison.Ordinal))
{
return new MqttPacket(
MqttPacketType.Subscribe,
trimmed["SUB ".Length..].Trim(),
string.Empty,
string.Empty);
}
if (trimmed.StartsWith("PUB ", StringComparison.Ordinal))
{
var rest = trimmed["PUB ".Length..];
var sep = rest.IndexOf(' ');
if (sep <= 0)
return new MqttPacket(MqttPacketType.Unknown, string.Empty, string.Empty, string.Empty);
var topic = rest[..sep].Trim();
var payload = rest[(sep + 1)..];
return new MqttPacket(MqttPacketType.Publish, topic, payload, string.Empty);
}
return new MqttPacket(MqttPacketType.Unknown, string.Empty, string.Empty, string.Empty);
}
}

View File

@@ -9,6 +9,7 @@ using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Server.Auth;
using NATS.Server.Auth.Jwt;
using NATS.Server.IO;
using NATS.Server.JetStream.Publish;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
@@ -41,6 +42,8 @@ public sealed class NatsClient : INatsClient, IDisposable
private readonly AuthService _authService;
private readonly byte[]? _nonce;
private readonly NatsParser _parser;
private readonly AdaptiveReadBuffer _adaptiveReadBuffer = new();
private readonly OutboundBufferPool _outboundBufferPool = new();
private readonly Channel<ReadOnlyMemory<byte>> _outbound = Channel.CreateBounded<ReadOnlyMemory<byte>>(
new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });
private long _pendingBytes;
@@ -53,6 +56,7 @@ public sealed class NatsClient : INatsClient, IDisposable
public ulong Id { get; }
public ClientKind Kind { get; }
public ClientOptions? ClientOpts { get; private set; }
public MessageTraceContext TraceContext { get; private set; } = MessageTraceContext.Empty;
public IMessageRouter? Router { get; set; }
public Account? Account { get; private set; }
public ClientPermissions? Permissions => _permissions;
@@ -142,20 +146,28 @@ public sealed class NatsClient : INatsClient, IDisposable
if (pending > _options.MaxPending)
{
Interlocked.Add(ref _pendingBytes, -data.Length);
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
if (!_flags.HasFlag(ClientFlags.CloseConnection))
{
_flags.SetFlag(ClientFlags.CloseConnection);
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
}
return false;
}
if (!_outbound.Writer.TryWrite(data))
{
Interlocked.Add(ref _pendingBytes, -data.Length);
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
if (!_flags.HasFlag(ClientFlags.CloseConnection))
{
_flags.SetFlag(ClientFlags.CloseConnection);
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
Interlocked.Increment(ref _serverStats.SlowConsumerClients);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
}
return false;
}
@@ -243,11 +255,12 @@ public sealed class NatsClient : INatsClient, IDisposable
{
while (!ct.IsCancellationRequested)
{
var memory = writer.GetMemory(4096);
var memory = writer.GetMemory(_adaptiveReadBuffer.CurrentSize);
int bytesRead = await _stream.ReadAsync(memory, ct);
if (bytesRead == 0)
break;
_adaptiveReadBuffer.RecordRead(bytesRead);
writer.Advance(bytesRead);
var result = await writer.FlushAsync(ct);
if (result.IsCompleted)
@@ -394,6 +407,7 @@ public sealed class NatsClient : INatsClient, IDisposable
{
ClientOpts = JsonSerializer.Deserialize<ClientOptions>(cmd.Payload.Span)
?? new ClientOptions();
TraceContext = MessageTraceContext.CreateFromConnect(ClientOpts);
// Authenticate if auth is required
AuthResult? authResult = null;
@@ -645,8 +659,8 @@ public sealed class NatsClient : INatsClient, IDisposable
var totalPayloadLen = headers.Length + payload.Length;
var totalLen = estimatedLineSize + totalPayloadLen + 2;
var buffer = new byte[totalLen];
var span = buffer.AsSpan();
using var owner = _outboundBufferPool.Rent(totalLen);
var span = owner.Memory.Span;
int pos = 0;
// Write prefix
@@ -710,7 +724,7 @@ public sealed class NatsClient : INatsClient, IDisposable
span[pos++] = (byte)'\r';
span[pos++] = (byte)'\n';
QueueOutbound(buffer.AsMemory(0, pos));
QueueOutbound(owner.Memory[..pos].ToArray());
}
private void WriteProtocol(byte[] data)

View File

@@ -40,6 +40,10 @@ public sealed class NatsOptions
// Default/fallback
public string? NoAuthUser { get; set; }
// Auth extensions
public Auth.ExternalAuthOptions? ExternalAuth { get; set; }
public Auth.ProxyAuthOptions? ProxyAuth { get; set; }
// Auth timing
public TimeSpan AuthTimeout { get; set; } = TimeSpan.FromSeconds(2);

View File

@@ -18,8 +18,10 @@ using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Publish;
using NATS.Server.LeafNodes;
using NATS.Server.Monitoring;
using NATS.Server.Mqtt;
using NATS.Server.Protocol;
using NATS.Server.Routes;
using NATS.Server.Server;
using NATS.Server.Subscriptions;
using NATS.Server.Tls;
using NATS.Server.WebSocket;
@@ -43,6 +45,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private NatsOptions? _cliSnapshot;
private HashSet<string> _cliFlags = [];
private string? _configDigest;
private readonly SemaphoreSlim _reloadMu = new(1, 1);
private AcceptLoopErrorHandler? _acceptLoopErrorHandler;
private readonly Account _globalAccount;
private readonly Account _systemAccount;
private InternalEventSystem? _eventSystem;
@@ -58,6 +62,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly StreamManager? _jetStreamStreamManager;
private readonly ConsumerManager? _jetStreamConsumerManager;
private readonly JetStreamPublisher? _jetStreamPublisher;
private MqttListener? _mqttListener;
private Socket? _listener;
private Socket? _wsListener;
private readonly TaskCompletionSource _wsAcceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -136,6 +141,15 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult();
internal Task AcquireReloadLockForTestAsync() => _reloadMu.WaitAsync();
internal void ReleaseReloadLockForTest() => _reloadMu.Release();
internal void SetAcceptLoopErrorHandlerForTest(AcceptLoopErrorHandler handler) => _acceptLoopErrorHandler = handler;
internal void NotifyAcceptErrorForTest(Exception ex, EndPoint? endpoint, TimeSpan delay) =>
_acceptLoopErrorHandler?.OnAcceptError(ex, endpoint, delay);
public async Task ShutdownAsync()
{
if (Interlocked.CompareExchange(ref _shutdown, 1, 0) != 0)
@@ -202,6 +216,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
// Stop monitor server
if (_monitorServer != null)
await _monitorServer.DisposeAsync();
if (_mqttListener != null)
await _mqttListener.DisposeAsync();
DeletePidFile();
DeletePortsFile();
@@ -534,6 +550,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
await _gatewayManager.StartAsync(linked.Token);
if (_leafNodeManager != null)
await _leafNodeManager.StartAsync(linked.Token);
if (_options.Mqtt is { Port: > 0 } mqttOptions)
{
var mqttHost = string.IsNullOrWhiteSpace(mqttOptions.Host) ? _options.Host : mqttOptions.Host;
_mqttListener = new MqttListener(mqttHost, mqttOptions.Port);
await _mqttListener.StartAsync(linked.Token);
}
if (_jetStreamService != null)
{
await _jetStreamService.StartAsync(linked.Token);
@@ -554,7 +576,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
Socket socket;
try
{
socket = await _listener.AcceptAsync(linked.Token);
socket = await _listener!.AcceptAsync(linked.Token);
tmpDelay = AcceptMinSleep; // Reset on success
}
catch (OperationCanceledException)
@@ -570,6 +592,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
if (IsShuttingDown || IsLameDuckMode)
break;
_acceptLoopErrorHandler?.OnAcceptError(ex, _listener?.LocalEndPoint, tmpDelay);
_logger.LogError(ex, "Temporary accept error, sleeping {Delay}ms", tmpDelay.TotalMilliseconds);
try { await Task.Delay(tmpDelay, linked.Token); }
catch (OperationCanceledException) { break; }
@@ -624,8 +647,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private async Task AcceptClientAsync(Socket socket, ulong clientId, CancellationToken ct)
{
var reloadLockHeld = false;
NatsClient? client = null;
try
{
await _reloadMu.WaitAsync(ct);
reloadLockHeld = true;
// Rate limit TLS handshakes
if (_tlsRateLimiter != null)
await _tlsRateLimiter.WaitAsync(ct);
@@ -673,14 +701,30 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
var clientLogger = _loggerFactory.CreateLogger($"NATS.Server.NatsClient[{clientId}]");
var client = new NatsClient(clientId, stream, socket, _options, clientInfo,
client = new NatsClient(clientId, stream, socket, _options, clientInfo,
_authService, nonce, clientLogger, _stats);
client.Router = this;
client.TlsState = tlsState;
client.InfoAlreadySent = infoAlreadySent;
_clients[clientId] = client;
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Failed to accept client {ClientId}", clientId);
try { socket.Shutdown(SocketShutdown.Both); } catch { }
socket.Dispose();
return;
}
finally
{
if (reloadLockHeld)
_reloadMu.Release();
}
await RunClientAsync(client, ct);
try
{
if (client != null)
await RunClientAsync(client, ct);
}
catch (Exception ex)
{
@@ -708,6 +752,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
catch (SocketException ex)
{
if (IsShuttingDown || IsLameDuckMode) break;
_acceptLoopErrorHandler?.OnAcceptError(ex, _wsListener?.LocalEndPoint, tmpDelay);
_logger.LogError(ex, "Temporary WebSocket accept error, sleeping {Delay}ms", tmpDelay.TotalMilliseconds);
try { await Task.Delay(tmpDelay, ct); } catch (OperationCanceledException) { break; }
tmpDelay = TimeSpan.FromTicks(Math.Min(tmpDelay.Ticks * 2, AcceptMaxSleep.Ticks));
@@ -1407,6 +1452,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
Name = client.ClientOpts?.Name ?? "",
Lang = client.ClientOpts?.Lang ?? "",
Version = client.ClientOpts?.Version ?? "",
AuthorizedUser = client.ClientOpts?.Username ?? "",
Account = client.Account?.Name ?? "",
InMsgs = Interlocked.Read(ref client.InMsgs),
OutMsgs = Interlocked.Read(ref client.OutMsgs),
InBytes = Interlocked.Read(ref client.InBytes),
@@ -1415,7 +1462,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
Rtt = client.Rtt,
TlsVersion = client.TlsState?.TlsVersion ?? "",
TlsCipherSuite = client.TlsState?.CipherSuite ?? "",
TlsPeerCertSubject = client.TlsState?.PeerCert?.Subject ?? "",
MqttClient = "", // populated when MQTT transport is implemented
JwtIssuerKey = string.IsNullOrEmpty(client.ClientOpts?.JWT) ? "" : "present",
JwtTags = "",
Proxy = client.ClientOpts?.Username?.StartsWith("proxy:", StringComparison.Ordinal) == true ? "true" : "",
});
// Cap closed clients list
@@ -1667,6 +1718,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_routeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_gatewayManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_leafNodeManager?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_mqttListener?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_jetStreamService?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_stats.JetStreamEnabled = false;
foreach (var client in _clients.Values)

View File

@@ -0,0 +1,22 @@
namespace NATS.Server.Protocol;
public sealed record MessageTraceContext(
string? ClientName,
string? ClientLang,
string? ClientVersion,
bool HeadersEnabled)
{
public static MessageTraceContext Empty { get; } = new(null, null, null, false);
public static MessageTraceContext CreateFromConnect(ClientOptions? connectOpts)
{
if (connectOpts == null)
return Empty;
return new MessageTraceContext(
connectOpts.Name,
connectOpts.Lang,
connectOpts.Version,
connectOpts.Headers);
}
}

View File

@@ -0,0 +1,26 @@
using System.IO.Compression;
namespace NATS.Server.Routes;
public static class RouteCompressionCodec
{
public static byte[] Compress(ReadOnlySpan<byte> payload)
{
using var output = new MemoryStream();
using (var stream = new DeflateStream(output, CompressionLevel.Fastest, leaveOpen: true))
{
stream.Write(payload);
}
return output.ToArray();
}
public static byte[] Decompress(ReadOnlySpan<byte> payload)
{
using var input = new MemoryStream(payload.ToArray());
using var stream = new DeflateStream(input, CompressionMode.Decompress);
using var output = new MemoryStream();
stream.CopyTo(output);
return output.ToArray();
}
}

View File

@@ -1,5 +1,6 @@
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using NATS.Server.Subscriptions;
namespace NATS.Server.Routes;
@@ -252,6 +253,17 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable
=> token.Contains('.', StringComparison.Ordinal)
|| token.Contains('*', StringComparison.Ordinal)
|| token.Contains('>', StringComparison.Ordinal);
public static string BuildConnectInfoJson(string serverId, IEnumerable<string>? accounts, string? topologySnapshot)
{
var payload = new
{
server_id = serverId,
accounts = (accounts ?? []).ToArray(),
topology = topologySnapshot ?? string.Empty,
};
return JsonSerializer.Serialize(payload);
}
}
public sealed record RouteMessage(string Subject, string? ReplyTo, ReadOnlyMemory<byte> Payload);

View File

@@ -25,6 +25,14 @@ public sealed class RouteManager : IAsyncDisposable
public string ListenEndpoint => $"{_options.Host}:{_options.Port}";
public RouteTopologySnapshot BuildTopologySnapshot()
{
return new RouteTopologySnapshot(
_serverId,
_routes.Count,
_connectedServerIds.Keys.OrderBy(static k => k, StringComparer.Ordinal).ToArray());
}
public RouteManager(
ClusterOptions options,
ServerStats stats,
@@ -254,3 +262,8 @@ public sealed class RouteManager : IAsyncDisposable
public int RouteCount => _routes.Count;
}
public sealed record RouteTopologySnapshot(
string ServerId,
int RouteCount,
IReadOnlyList<string> ConnectedServerIds);

View File

@@ -0,0 +1,18 @@
using System.Net;
namespace NATS.Server.Server;
public sealed class AcceptLoopErrorHandler
{
private readonly Action<Exception, EndPoint?, TimeSpan> _callback;
public AcceptLoopErrorHandler(Action<Exception, EndPoint?, TimeSpan> callback)
{
_callback = callback ?? throw new ArgumentNullException(nameof(callback));
}
public void OnAcceptError(Exception ex, EndPoint? endpoint, TimeSpan delay)
{
_callback(ex, endpoint, delay);
}
}

View File

@@ -0,0 +1,15 @@
namespace NATS.Server.Subscriptions;
public enum InterestChangeKind
{
LocalAdded,
LocalRemoved,
RemoteAdded,
RemoteRemoved,
}
public sealed record InterestChange(
InterestChangeKind Kind,
string Subject,
string? Queue,
string Account);

View File

@@ -5,8 +5,9 @@ public sealed record RemoteSubscription(
string? Queue,
string RouteId,
string Account = "$G",
int QueueWeight = 1,
bool IsRemoval = false)
{
public static RemoteSubscription Removal(string subject, string? queue, string routeId, string account = "$G")
=> new(subject, queue, routeId, account, IsRemoval: true);
=> new(subject, queue, routeId, account, QueueWeight: 1, IsRemoval: true);
}

View File

@@ -1,3 +1,5 @@
using System.Text;
namespace NATS.Server.Subscriptions;
/// <summary>
@@ -13,6 +15,7 @@ public sealed class SubList : IDisposable
private readonly ReaderWriterLockSlim _lock = new();
private readonly TrieLevel _root = new();
private readonly SubListCacheSweeper _sweeper = new();
private readonly Dictionary<string, RemoteSubscription> _remoteSubs = new(StringComparer.Ordinal);
private Dictionary<string, CachedResult>? _cache = new(StringComparer.Ordinal);
private uint _count;
@@ -22,9 +25,12 @@ public sealed class SubList : IDisposable
private ulong _cacheHits;
private ulong _inserts;
private ulong _removes;
private int _highFanoutNodes;
private readonly record struct CachedResult(SubListResult Result, long Generation);
public event Action<InterestChange>? InterestChanged;
public void Dispose()
{
_disposed = true;
@@ -97,6 +103,10 @@ public sealed class SubList : IDisposable
}
}
internal int HighFanoutNodeCountForTest => Volatile.Read(ref _highFanoutNodes);
internal Task TriggerCacheSweepAsyncForTest() => _sweeper.TriggerSweepAsync(SweepCache);
public void ApplyRemoteSub(RemoteSubscription sub)
{
_lock.EnterWriteLock();
@@ -104,9 +114,23 @@ public sealed class SubList : IDisposable
{
var key = $"{sub.RouteId}|{sub.Account}|{sub.Subject}|{sub.Queue}";
if (sub.IsRemoval)
{
_remoteSubs.Remove(key);
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.RemoteRemoved,
sub.Subject,
sub.Queue,
sub.Account));
}
else
{
_remoteSubs[key] = sub;
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.RemoteAdded,
sub.Subject,
sub.Queue,
sub.Account));
}
Interlocked.Increment(ref _generation);
}
finally
@@ -187,6 +211,11 @@ public sealed class SubList : IDisposable
if (sub.Queue == null)
{
node.PlainSubs.Add(sub);
if (!node.PackedListEnabled && node.PlainSubs.Count > 256)
{
node.PackedListEnabled = true;
Interlocked.Increment(ref _highFanoutNodes);
}
}
else
{
@@ -201,6 +230,11 @@ public sealed class SubList : IDisposable
_count++;
_inserts++;
Interlocked.Increment(ref _generation);
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.LocalAdded,
sub.Subject,
sub.Queue,
sub.Client?.Account?.Name ?? "$G"));
}
finally
{
@@ -218,6 +252,11 @@ public sealed class SubList : IDisposable
{
_removes++;
Interlocked.Increment(ref _generation);
InterestChanged?.Invoke(new InterestChange(
InterestChangeKind.LocalRemoved,
sub.Subject,
sub.Queue,
sub.Client?.Account?.Name ?? "$G"));
}
}
finally
@@ -362,11 +401,7 @@ public sealed class SubList : IDisposable
{
_cache[subject] = new CachedResult(result, currentGen);
if (_cache.Count > CacheMax)
{
var keys = _cache.Keys.Take(_cache.Count - CacheSweep).ToList();
foreach (var key in keys)
_cache.Remove(key);
}
_sweeper.ScheduleSweep(SweepCache);
}
return result;
@@ -377,6 +412,58 @@ public sealed class SubList : IDisposable
}
}
public SubListResult MatchBytes(ReadOnlySpan<byte> subjectUtf8)
{
return Match(Encoding.ASCII.GetString(subjectUtf8));
}
public IReadOnlyList<RemoteSubscription> MatchRemote(string account, string subject)
{
_lock.EnterReadLock();
try
{
var expanded = new List<RemoteSubscription>();
foreach (var remoteSub in _remoteSubs.Values)
{
if (remoteSub.IsRemoval)
continue;
if (!string.Equals(remoteSub.Account, account, StringComparison.Ordinal))
continue;
if (!SubjectMatch.MatchLiteral(subject, remoteSub.Subject))
continue;
var weight = Math.Max(1, remoteSub.QueueWeight);
for (var i = 0; i < weight; i++)
expanded.Add(remoteSub);
}
return expanded;
}
finally
{
_lock.ExitReadLock();
}
}
private void SweepCache()
{
_lock.EnterWriteLock();
try
{
if (_cache == null || _cache.Count <= CacheMax)
return;
var removeCount = Math.Min(CacheSweep, _cache.Count - CacheMax);
var keys = _cache.Keys.Take(removeCount).ToArray();
foreach (var key in keys)
_cache.Remove(key);
}
finally
{
_lock.ExitWriteLock();
}
}
/// <summary>
/// Tokenize the subject into an array of token strings.
/// Returns null if the subject is invalid (empty tokens).
@@ -879,6 +966,7 @@ public sealed class SubList : IDisposable
public TrieLevel? Next;
public readonly HashSet<Subscription> PlainSubs = [];
public readonly Dictionary<string, HashSet<Subscription>> QueueSubs = new(StringComparer.Ordinal);
public bool PackedListEnabled;
public bool IsEmpty => PlainSubs.Count == 0 && QueueSubs.Count == 0 &&
(Next == null || (Next.Nodes.Count == 0 && Next.Pwc == null && Next.Fwc == null));

View File

@@ -0,0 +1,29 @@
namespace NATS.Server.Subscriptions;
public sealed class SubListCacheSweeper
{
private int _scheduled;
public void ScheduleSweep(Action sweep)
{
if (Interlocked.CompareExchange(ref _scheduled, 1, 0) != 0)
return;
_ = Task.Run(() =>
{
try
{
sweep();
}
finally
{
Interlocked.Exchange(ref _scheduled, 0);
}
});
}
public Task TriggerSweepAsync(Action sweep)
{
return Task.Run(sweep);
}
}