Merge branch 'feature/system-account-types'

Add SYSTEM and ACCOUNT connection types with InternalClient,
InternalEventSystem, system event publishing, request-reply services,
and cross-account import/export support.
This commit is contained in:
Joseph Doherty
2026-02-23 06:14:11 -05:00
31 changed files with 2480 additions and 9 deletions

View File

@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using NATS.Server.Imports;
using NATS.Server.Subscriptions;
namespace NATS.Server.Auth;
@@ -12,6 +13,8 @@ public sealed class Account : IDisposable
public Permissions? DefaultPermissions { get; set; }
public int MaxConnections { get; set; } // 0 = unlimited
public int MaxSubscriptions { get; set; } // 0 = unlimited
public ExportMap Exports { get; } = new();
public ImportMap Imports { get; } = new();
// JWT fields
public string? Nkey { get; set; }
@@ -89,5 +92,77 @@ public sealed class Account : IDisposable
Interlocked.Add(ref _outBytes, bytes);
}
// Internal (ACCOUNT) client for import/export message routing
private InternalClient? _internalClient;
public InternalClient GetOrCreateInternalClient(ulong clientId)
{
if (_internalClient != null) return _internalClient;
_internalClient = new InternalClient(clientId, ClientKind.Account, this);
return _internalClient;
}
public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved)
{
var auth = new ExportAuth
{
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
};
Exports.Services[subject] = new ServiceExport
{
Auth = auth,
Account = this,
ResponseType = responseType,
};
}
public void AddStreamExport(string subject, IEnumerable<Account>? approved)
{
var auth = new ExportAuth
{
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
};
Exports.Streams[subject] = new StreamExport { Auth = auth };
}
public ServiceImport AddServiceImport(Account destination, string from, string to)
{
if (!destination.Exports.Services.TryGetValue(to, out var export))
throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'");
if (!export.Auth.IsAuthorized(this))
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'");
var si = new ServiceImport
{
DestinationAccount = destination,
From = from,
To = to,
Export = export,
ResponseType = export.ResponseType,
};
Imports.AddServiceImport(si);
return si;
}
public void AddStreamImport(Account source, string from, string to)
{
if (!source.Exports.Streams.TryGetValue(from, out var export))
throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'");
if (!export.Auth.IsAuthorized(this))
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'");
var si = new StreamImport
{
SourceAccount = source,
From = from,
To = to,
};
Imports.Streams.Add(si);
}
public void Dispose() => SubList.Dispose();
}

View File

@@ -0,0 +1,22 @@
namespace NATS.Server;
/// <summary>
/// Identifies the type of a client connection.
/// Maps to Go's client kind constants in client.go:45-65.
/// </summary>
public enum ClientKind
{
Client,
Router,
Gateway,
Leaf,
System,
JetStream,
Account,
}
public static class ClientKindExtensions
{
public static bool IsInternal(this ClientKind kind) =>
kind is ClientKind.System or ClientKind.JetStream or ClientKind.Account;
}

View File

@@ -0,0 +1,12 @@
using System.Text.Json.Serialization;
namespace NATS.Server.Events;
[JsonSerializable(typeof(ConnectEventMsg))]
[JsonSerializable(typeof(DisconnectEventMsg))]
[JsonSerializable(typeof(AccountNumConns))]
[JsonSerializable(typeof(ServerStatsMsg))]
[JsonSerializable(typeof(ShutdownEventMsg))]
[JsonSerializable(typeof(LameDuckEventMsg))]
[JsonSerializable(typeof(AuthErrorEventMsg))]
internal partial class EventJsonContext : JsonSerializerContext;

View File

@@ -0,0 +1,49 @@
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Events;
/// <summary>
/// System event subject patterns.
/// Maps to Go events.go:41-97 subject constants.
/// </summary>
public static class EventSubjects
{
// Account-scoped events
public const string ConnectEvent = "$SYS.ACCOUNT.{0}.CONNECT";
public const string DisconnectEvent = "$SYS.ACCOUNT.{0}.DISCONNECT";
public const string AccountConnsNew = "$SYS.ACCOUNT.{0}.SERVER.CONNS";
public const string AccountConnsOld = "$SYS.SERVER.ACCOUNT.{0}.CONNS";
// Server-scoped events
public const string ServerStats = "$SYS.SERVER.{0}.STATSZ";
public const string ServerShutdown = "$SYS.SERVER.{0}.SHUTDOWN";
public const string ServerLameDuck = "$SYS.SERVER.{0}.LAMEDUCK";
public const string AuthError = "$SYS.SERVER.{0}.CLIENT.AUTH.ERR";
public const string AuthErrorAccount = "$SYS.ACCOUNT.CLIENT.AUTH.ERR";
// Request-reply subjects (server-specific)
public const string ServerReq = "$SYS.REQ.SERVER.{0}.{1}";
// Wildcard ping subjects (all servers respond)
public const string ServerPing = "$SYS.REQ.SERVER.PING.{0}";
// Account-scoped request subjects
public const string AccountReq = "$SYS.REQ.ACCOUNT.{0}.{1}";
// Inbox for responses
public const string InboxResponse = "$SYS._INBOX_.{0}";
}
/// <summary>
/// Callback signature for system message handlers.
/// Maps to Go's sysMsgHandler type in events.go:109.
/// </summary>
public delegate void SystemMessageHandler(
Subscription? sub,
INatsClient? client,
Account? account,
string subject,
string? reply,
ReadOnlyMemory<byte> headers,
ReadOnlyMemory<byte> message);

View File

@@ -0,0 +1,270 @@
using System.Text.Json.Serialization;
namespace NATS.Server.Events;
/// <summary>
/// Server identity block embedded in all system events.
/// </summary>
public sealed class EventServerInfo
{
[JsonPropertyName("name")]
public string Name { get; set; } = string.Empty;
[JsonPropertyName("host")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Host { get; set; }
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("cluster")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Cluster { get; set; }
[JsonPropertyName("domain")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Domain { get; set; }
[JsonPropertyName("ver")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Version { get; set; }
[JsonPropertyName("seq")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public ulong Seq { get; set; }
[JsonPropertyName("tags")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public Dictionary<string, string>? Tags { get; set; }
}
/// <summary>
/// Client identity block for connect/disconnect events.
/// </summary>
public sealed class EventClientInfo
{
[JsonPropertyName("start")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTime Start { get; set; }
[JsonPropertyName("stop")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTime Stop { get; set; }
[JsonPropertyName("host")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Host { get; set; }
[JsonPropertyName("id")]
public ulong Id { get; set; }
[JsonPropertyName("acc")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Account { get; set; }
[JsonPropertyName("name")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Name { get; set; }
[JsonPropertyName("lang")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Lang { get; set; }
[JsonPropertyName("ver")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Version { get; set; }
[JsonPropertyName("rtt")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public long RttNanos { get; set; }
}
public sealed class DataStats
{
[JsonPropertyName("msgs")]
public long Msgs { get; set; }
[JsonPropertyName("bytes")]
public long Bytes { get; set; }
}
/// <summary>Client connect advisory. Go events.go:155-160.</summary>
public sealed class ConnectEventMsg
{
public const string EventType = "io.nats.server.advisory.v1.client_connect";
[JsonPropertyName("type")]
public string Type { get; set; } = EventType;
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("timestamp")]
public DateTime Time { get; set; }
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("client")]
public EventClientInfo Client { get; set; } = new();
}
/// <summary>Client disconnect advisory. Go events.go:167-174.</summary>
public sealed class DisconnectEventMsg
{
public const string EventType = "io.nats.server.advisory.v1.client_disconnect";
[JsonPropertyName("type")]
public string Type { get; set; } = EventType;
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("timestamp")]
public DateTime Time { get; set; }
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("client")]
public EventClientInfo Client { get; set; } = new();
[JsonPropertyName("sent")]
public DataStats Sent { get; set; } = new();
[JsonPropertyName("received")]
public DataStats Received { get; set; } = new();
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
}
/// <summary>Account connection count heartbeat. Go events.go:210-214.</summary>
public sealed class AccountNumConns
{
public const string EventType = "io.nats.server.advisory.v1.account_connections";
[JsonPropertyName("type")]
public string Type { get; set; } = EventType;
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("timestamp")]
public DateTime Time { get; set; }
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("acc")]
public string AccountName { get; set; } = string.Empty;
[JsonPropertyName("conns")]
public int Connections { get; set; }
[JsonPropertyName("total_conns")]
public long TotalConnections { get; set; }
[JsonPropertyName("subs")]
public int Subscriptions { get; set; }
[JsonPropertyName("sent")]
public DataStats Sent { get; set; } = new();
[JsonPropertyName("received")]
public DataStats Received { get; set; } = new();
}
/// <summary>Server stats broadcast. Go events.go:150-153.</summary>
public sealed class ServerStatsMsg
{
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("statsz")]
public ServerStatsData Stats { get; set; } = new();
}
public sealed class ServerStatsData
{
[JsonPropertyName("start")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTime Start { get; set; }
[JsonPropertyName("mem")]
public long Mem { get; set; }
[JsonPropertyName("cores")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public int Cores { get; set; }
[JsonPropertyName("connections")]
public int Connections { get; set; }
[JsonPropertyName("total_connections")]
public long TotalConnections { get; set; }
[JsonPropertyName("active_accounts")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public int ActiveAccounts { get; set; }
[JsonPropertyName("subscriptions")]
public long Subscriptions { get; set; }
[JsonPropertyName("in_msgs")]
public long InMsgs { get; set; }
[JsonPropertyName("out_msgs")]
public long OutMsgs { get; set; }
[JsonPropertyName("in_bytes")]
public long InBytes { get; set; }
[JsonPropertyName("out_bytes")]
public long OutBytes { get; set; }
[JsonPropertyName("slow_consumers")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public long SlowConsumers { get; set; }
}
/// <summary>Server shutdown notification.</summary>
public sealed class ShutdownEventMsg
{
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
}
/// <summary>Lame duck mode notification.</summary>
public sealed class LameDuckEventMsg
{
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
}
/// <summary>Auth error advisory.</summary>
public sealed class AuthErrorEventMsg
{
public const string EventType = "io.nats.server.advisory.v1.client_auth";
[JsonPropertyName("type")]
public string Type { get; set; } = EventType;
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("timestamp")]
public DateTime Time { get; set; }
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("client")]
public EventClientInfo Client { get; set; } = new();
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
}

View File

@@ -0,0 +1,333 @@
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Events;
/// <summary>
/// Internal publish message queued for the send loop.
/// </summary>
public sealed class PublishMessage
{
public InternalClient? Client { get; init; }
public required string Subject { get; init; }
public string? Reply { get; init; }
public byte[]? Headers { get; init; }
public object? Body { get; init; }
public bool Echo { get; init; }
public bool IsLast { get; init; }
}
/// <summary>
/// Internal received message queued for the receive loop.
/// </summary>
public sealed class InternalSystemMessage
{
public required Subscription? Sub { get; init; }
public required INatsClient? Client { get; init; }
public required Account? Account { get; init; }
public required string Subject { get; init; }
public required string? Reply { get; init; }
public required ReadOnlyMemory<byte> Headers { get; init; }
public required ReadOnlyMemory<byte> Message { get; init; }
public required SystemMessageHandler Callback { get; init; }
}
/// <summary>
/// Manages the server's internal event system with Channel-based send/receive loops.
/// Maps to Go's internal struct in events.go:124-147 and the goroutines
/// internalSendLoop (events.go:495) and internalReceiveLoop (events.go:476).
/// </summary>
public sealed class InternalEventSystem : IAsyncDisposable
{
private readonly ILogger _logger;
private readonly Channel<PublishMessage> _sendQueue;
private readonly Channel<InternalSystemMessage> _receiveQueue;
private readonly Channel<InternalSystemMessage> _receiveQueuePings;
private readonly CancellationTokenSource _cts = new();
private Task? _sendLoop;
private Task? _receiveLoop;
private Task? _receiveLoopPings;
private NatsServer? _server;
private ulong _sequence;
private int _subscriptionId;
private readonly ConcurrentDictionary<string, SystemMessageHandler> _callbacks = new();
public Account SystemAccount { get; }
public InternalClient SystemClient { get; }
public string ServerHash { get; }
public InternalEventSystem(Account systemAccount, InternalClient systemClient, string serverName, ILogger logger)
{
_logger = logger;
SystemAccount = systemAccount;
SystemClient = systemClient;
// Hash server name for inbox routing (matches Go's shash)
ServerHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(serverName)))[..8].ToLowerInvariant();
_sendQueue = Channel.CreateUnbounded<PublishMessage>(new UnboundedChannelOptions { SingleReader = true });
_receiveQueue = Channel.CreateUnbounded<InternalSystemMessage>(new UnboundedChannelOptions { SingleReader = true });
_receiveQueuePings = Channel.CreateUnbounded<InternalSystemMessage>(new UnboundedChannelOptions { SingleReader = true });
}
public void Start(NatsServer server)
{
_server = server;
var ct = _cts.Token;
_sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct);
_receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct);
_receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct);
// Periodic stats publish every 10 seconds
_ = Task.Run(async () =>
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10));
while (await timer.WaitForNextTickAsync(ct))
{
PublishServerStats();
}
}, ct);
}
/// <summary>
/// Registers system request-reply monitoring services for this server.
/// Maps to Go's initEventTracking in events.go.
/// Sets up handlers for $SYS.REQ.SERVER.{id}.VARZ, HEALTHZ, SUBSZ, STATSZ, IDZ
/// and wildcard $SYS.REQ.SERVER.PING.* subjects.
/// </summary>
public void InitEventTracking(NatsServer server)
{
_server = server;
var serverId = server.ServerId;
// Server-specific monitoring services
RegisterService(serverId, "VARZ", server.HandleVarzRequest);
RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest);
RegisterService(serverId, "SUBSZ", server.HandleSubszRequest);
RegisterService(serverId, "STATSZ", server.HandleStatszRequest);
RegisterService(serverId, "IDZ", server.HandleIdzRequest);
// Wildcard ping services (all servers respond)
SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest));
}
private void RegisterService(string serverId, string name, Action<string, string?> handler)
{
var subject = string.Format(EventSubjects.ServerReq, serverId, name);
SysSubscribe(subject, WrapRequestHandler(handler));
}
private SystemMessageHandler WrapRequestHandler(Action<string, string?> handler)
{
return (sub, client, acc, subject, reply, hdr, msg) =>
{
handler(subject, reply);
};
}
/// <summary>
/// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics.
/// Maps to Go's sendStatsz in events.go.
/// Can be called manually for testing or is invoked periodically by the stats timer.
/// </summary>
public void PublishServerStats()
{
if (_server == null) return;
var subject = string.Format(EventSubjects.ServerStats, _server.ServerId);
var process = System.Diagnostics.Process.GetCurrentProcess();
var statsMsg = new ServerStatsMsg
{
Server = _server.BuildEventServerInfo(),
Stats = new ServerStatsData
{
Start = _server.StartTime,
Mem = process.WorkingSet64,
Cores = Environment.ProcessorCount,
Connections = _server.ClientCount,
TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections),
Subscriptions = SystemAccount.SubList.Count,
InMsgs = Interlocked.Read(ref _server.Stats.InMsgs),
OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs),
InBytes = Interlocked.Read(ref _server.Stats.InBytes),
OutBytes = Interlocked.Read(ref _server.Stats.OutBytes),
SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers),
},
};
Enqueue(new PublishMessage { Subject = subject, Body = statsMsg });
}
/// <summary>
/// Creates a system subscription in the system account's SubList.
/// Maps to Go's sysSubscribe in events.go:2796.
/// </summary>
public Subscription SysSubscribe(string subject, SystemMessageHandler callback)
{
var sid = Interlocked.Increment(ref _subscriptionId).ToString();
var sub = new Subscription
{
Subject = subject,
Sid = sid,
Client = SystemClient,
};
// Store callback keyed by SID so multiple subscriptions work
_callbacks[sid] = callback;
// Set a single routing callback on the system client that dispatches by SID
SystemClient.MessageCallback = (subj, s, reply, hdr, msg) =>
{
if (_callbacks.TryGetValue(s, out var cb))
{
_receiveQueue.Writer.TryWrite(new InternalSystemMessage
{
Sub = sub,
Client = SystemClient,
Account = SystemAccount,
Subject = subj,
Reply = reply,
Headers = hdr,
Message = msg,
Callback = cb,
});
}
};
SystemAccount.SubList.Insert(sub);
return sub;
}
/// <summary>
/// Returns the next monotonically increasing sequence number for event ordering.
/// </summary>
public ulong NextSequence() => Interlocked.Increment(ref _sequence);
/// <summary>
/// Enqueue an internal message for publishing through the send loop.
/// </summary>
public void Enqueue(PublishMessage message)
{
_sendQueue.Writer.TryWrite(message);
}
/// <summary>
/// The send loop: serializes messages and delivers them via the server's routing.
/// Maps to Go's internalSendLoop in events.go:495-668.
/// </summary>
private async Task InternalSendLoopAsync(CancellationToken ct)
{
try
{
await foreach (var pm in _sendQueue.Reader.ReadAllAsync(ct))
{
try
{
var seq = Interlocked.Increment(ref _sequence);
// Serialize body to JSON
byte[] payload;
if (pm.Body is byte[] raw)
{
payload = raw;
}
else if (pm.Body != null)
{
// Try source-generated context first, fall back to reflection-based for unknown types
var bodyType = pm.Body.GetType();
var typeInfo = EventJsonContext.Default.GetTypeInfo(bodyType);
payload = typeInfo != null
? JsonSerializer.SerializeToUtf8Bytes(pm.Body, typeInfo)
: JsonSerializer.SerializeToUtf8Bytes(pm.Body, bodyType);
}
else
{
payload = [];
}
// Deliver via the system account's SubList matching
var result = SystemAccount.SubList.Match(pm.Subject);
foreach (var sub in result.PlainSubs)
{
sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply,
pm.Headers ?? ReadOnlyMemory<byte>.Empty,
payload);
}
foreach (var queueGroup in result.QueueSubs)
{
if (queueGroup.Length == 0) continue;
var sub = queueGroup[0]; // Simple pick for internal
sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply,
pm.Headers ?? ReadOnlyMemory<byte>.Empty,
payload);
}
if (pm.IsLast)
break;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error in internal send loop processing message on {Subject}", pm.Subject);
}
}
}
catch (OperationCanceledException)
{
// Normal shutdown
}
}
/// <summary>
/// The receive loop: dispatches callbacks for internally-received messages.
/// Maps to Go's internalReceiveLoop in events.go:476-491.
/// </summary>
private async Task InternalReceiveLoopAsync(Channel<InternalSystemMessage> queue, CancellationToken ct)
{
try
{
await foreach (var msg in queue.Reader.ReadAllAsync(ct))
{
try
{
msg.Callback(msg.Sub, msg.Client, msg.Account, msg.Subject, msg.Reply, msg.Headers, msg.Message);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error in internal receive loop processing {Subject}", msg.Subject);
}
}
}
catch (OperationCanceledException)
{
// Normal shutdown
}
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_sendQueue.Writer.TryComplete();
_receiveQueue.Writer.TryComplete();
_receiveQueuePings.Writer.TryComplete();
if (_sendLoop != null) await _sendLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
if (_receiveLoop != null) await _receiveLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
if (_receiveLoopPings != null) await _receiveLoopPings.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
_cts.Dispose();
}
}

View File

@@ -0,0 +1,19 @@
using NATS.Server.Auth;
using NATS.Server.Protocol;
namespace NATS.Server;
public interface INatsClient
{
ulong Id { get; }
ClientKind Kind { get; }
bool IsInternal => Kind.IsInternal();
Account? Account { get; }
ClientOptions? ClientOpts { get; }
ClientPermissions? Permissions { get; }
void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload);
bool QueueOutbound(ReadOnlyMemory<byte> data);
void RemoveSubscription(string sid);
}

View File

@@ -0,0 +1,25 @@
using NATS.Server.Auth;
namespace NATS.Server.Imports;
public sealed class ExportAuth
{
public bool TokenRequired { get; init; }
public uint AccountPosition { get; init; }
public HashSet<string>? ApprovedAccounts { get; init; }
public Dictionary<string, long>? RevokedAccounts { get; init; }
public bool IsAuthorized(Account account)
{
if (RevokedAccounts != null && RevokedAccounts.ContainsKey(account.Name))
return false;
if (ApprovedAccounts == null && !TokenRequired && AccountPosition == 0)
return true;
if (ApprovedAccounts != null)
return ApprovedAccounts.Contains(account.Name);
return false;
}
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Imports;
public sealed class ExportMap
{
public Dictionary<string, StreamExport> Streams { get; } = new(StringComparer.Ordinal);
public Dictionary<string, ServiceExport> Services { get; } = new(StringComparer.Ordinal);
public Dictionary<string, ServiceImport> Responses { get; } = new(StringComparer.Ordinal);
}

View File

@@ -0,0 +1,18 @@
namespace NATS.Server.Imports;
public sealed class ImportMap
{
public List<StreamImport> Streams { get; } = [];
public Dictionary<string, List<ServiceImport>> Services { get; } = new(StringComparer.Ordinal);
public void AddServiceImport(ServiceImport si)
{
if (!Services.TryGetValue(si.From, out var list))
{
list = [];
Services[si.From] = list;
}
list.Add(si);
}
}

View File

@@ -0,0 +1,47 @@
using System.Text.Json.Serialization;
namespace NATS.Server.Imports;
public sealed class ServiceLatencyMsg
{
[JsonPropertyName("type")]
public string Type { get; set; } = "io.nats.server.metric.v1.service_latency";
[JsonPropertyName("requestor")]
public string Requestor { get; set; } = string.Empty;
[JsonPropertyName("responder")]
public string Responder { get; set; } = string.Empty;
[JsonPropertyName("status")]
public int Status { get; set; } = 200;
[JsonPropertyName("svc_latency")]
public long ServiceLatencyNanos { get; set; }
[JsonPropertyName("total_latency")]
public long TotalLatencyNanos { get; set; }
}
public static class LatencyTracker
{
public static bool ShouldSample(ServiceLatency latency)
{
if (latency.SamplingPercentage <= 0) return false;
if (latency.SamplingPercentage >= 100) return true;
return Random.Shared.Next(100) < latency.SamplingPercentage;
}
public static ServiceLatencyMsg BuildLatencyMsg(
string requestor, string responder,
TimeSpan serviceLatency, TimeSpan totalLatency)
{
return new ServiceLatencyMsg
{
Requestor = requestor,
Responder = responder,
ServiceLatencyNanos = serviceLatency.Ticks * 100,
TotalLatencyNanos = totalLatency.Ticks * 100,
};
}
}

View File

@@ -0,0 +1,64 @@
using System.Security.Cryptography;
using NATS.Server.Auth;
namespace NATS.Server.Imports;
/// <summary>
/// Handles response routing for service imports.
/// Maps to Go's service reply prefix generation and response cleanup.
/// Reference: golang/nats-server/server/accounts.go — addRespServiceImport, removeRespServiceImport
/// </summary>
public static class ResponseRouter
{
private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray();
/// <summary>
/// Generates a unique reply prefix for response routing.
/// Format: "_R_.{10 random base62 chars}."
/// </summary>
public static string GenerateReplyPrefix()
{
Span<byte> bytes = stackalloc byte[10];
RandomNumberGenerator.Fill(bytes);
var chars = new char[10];
for (int i = 0; i < 10; i++)
chars[i] = Base62[bytes[i] % 62];
return $"_R_.{new string(chars)}.";
}
/// <summary>
/// Creates a response service import that maps the generated reply prefix
/// back to the original reply subject on the requesting account.
/// </summary>
public static ServiceImport CreateResponseImport(
Account exporterAccount,
ServiceImport originalImport,
string originalReply)
{
var replyPrefix = GenerateReplyPrefix();
var responseSi = new ServiceImport
{
DestinationAccount = exporterAccount,
From = replyPrefix + ">",
To = originalReply,
IsResponse = true,
ResponseType = originalImport.ResponseType,
Export = originalImport.Export,
TimestampTicks = DateTime.UtcNow.Ticks,
};
exporterAccount.Exports.Responses[replyPrefix] = responseSi;
return responseSi;
}
/// <summary>
/// Removes a response import from the account's export map.
/// For Singleton responses, this is called after the first reply is delivered.
/// For Streamed/Chunked, it is called when the response stream ends.
/// </summary>
public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi)
{
account.Exports.Responses.Remove(replyPrefix);
}
}

View File

@@ -0,0 +1,13 @@
using NATS.Server.Auth;
namespace NATS.Server.Imports;
public sealed class ServiceExport
{
public ExportAuth Auth { get; init; } = new();
public Account? Account { get; init; }
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2);
public ServiceLatency? Latency { get; init; }
public bool AllowTrace { get; init; }
}

View File

@@ -0,0 +1,21 @@
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Imports;
public sealed class ServiceImport
{
public required Account DestinationAccount { get; init; }
public required string From { get; init; }
public required string To { get; init; }
public SubjectTransform? Transform { get; init; }
public ServiceExport? Export { get; init; }
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
public byte[]? Sid { get; set; }
public bool IsResponse { get; init; }
public bool UsePub { get; init; }
public bool Invalid { get; set; }
public bool Share { get; init; }
public bool Tracking { get; init; }
public long TimestampTicks { get; set; }
}

View File

@@ -0,0 +1,7 @@
namespace NATS.Server.Imports;
public sealed class ServiceLatency
{
public int SamplingPercentage { get; init; } = 100;
public string Subject { get; init; } = string.Empty;
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Imports;
public enum ServiceResponseType
{
Singleton,
Streamed,
Chunked,
}

View File

@@ -0,0 +1,6 @@
namespace NATS.Server.Imports;
public sealed class StreamExport
{
public ExportAuth Auth { get; init; } = new();
}

View File

@@ -0,0 +1,14 @@
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Imports;
public sealed class StreamImport
{
public required Account SourceAccount { get; init; }
public required string From { get; init; }
public required string To { get; init; }
public SubjectTransform? Transform { get; init; }
public bool UsePub { get; init; }
public bool Invalid { get; set; }
}

View File

@@ -0,0 +1,59 @@
using NATS.Server.Auth;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
namespace NATS.Server;
/// <summary>
/// Lightweight socketless client for internal messaging (SYSTEM, ACCOUNT, JETSTREAM).
/// Maps to Go's internal client created by createInternalClient() in server.go:1910-1936.
/// No network I/O — messages are delivered via callback.
/// </summary>
public sealed class InternalClient : INatsClient
{
public ulong Id { get; }
public ClientKind Kind { get; }
public bool IsInternal => Kind.IsInternal();
public Account? Account { get; }
public ClientOptions? ClientOpts => null;
public ClientPermissions? Permissions => null;
/// <summary>
/// Callback invoked when a message is delivered to this internal client.
/// Set by the event system or account import infrastructure.
/// </summary>
public Action<string, string, string?, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>? MessageCallback { get; set; }
private readonly Dictionary<string, Subscription> _subs = new(StringComparer.Ordinal);
public InternalClient(ulong id, ClientKind kind, Account account)
{
if (!kind.IsInternal())
throw new ArgumentException($"InternalClient requires an internal ClientKind, got {kind}", nameof(kind));
Id = id;
Kind = kind;
Account = account;
}
public void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
MessageCallback?.Invoke(subject, sid, replyTo, headers, payload);
}
public bool QueueOutbound(ReadOnlyMemory<byte> data) => true; // no-op for internal clients
public void RemoveSubscription(string sid)
{
if (_subs.Remove(sid))
Account?.DecrementSubscriptions();
}
public void AddSubscription(Subscription sub)
{
_subs[sub.Sid] = sub;
}
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
}

View File

@@ -14,12 +14,16 @@ public sealed class SubszHandler(NatsServer server)
var opts = ParseQueryParams(ctx);
var now = DateTime.UtcNow;
// Collect subscriptions from all accounts (or filtered)
// Collect subscriptions from all accounts (or filtered).
// Exclude the $SYS system account unless explicitly requested — its internal
// subscriptions are infrastructure and not user-facing.
var allSubs = new List<Subscription>();
foreach (var account in server.GetAccounts())
{
if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account)
continue;
if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS")
continue;
allSubs.AddRange(account.SubList.GetAllSubscriptions());
}
@@ -31,10 +35,10 @@ public sealed class SubszHandler(NatsServer server)
var total = allSubs.Count;
var numSubs = server.GetAccounts()
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Aggregate(0u, (sum, a) => sum + a.SubList.Count);
var numCache = server.GetAccounts()
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Sum(a => a.SubList.CacheCount);
SubDetail[] details = [];

View File

@@ -1,4 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<InternalsVisibleTo Include="NATS.Server.Tests" />
</ItemGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="NATS.NKeys" />

View File

@@ -20,6 +20,8 @@ public interface IMessageRouter
void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
ReadOnlyMemory<byte> payload, NatsClient sender);
void RemoveClient(NatsClient client);
void PublishConnectEvent(NatsClient client);
void PublishDisconnectEvent(NatsClient client);
}
public interface ISubListAccess
@@ -27,7 +29,7 @@ public interface ISubListAccess
SubList SubList { get; }
}
public sealed class NatsClient : IDisposable
public sealed class NatsClient : INatsClient, IDisposable
{
private readonly Socket _socket;
private readonly Stream _stream;
@@ -46,6 +48,7 @@ public sealed class NatsClient : IDisposable
private readonly ServerStats _serverStats;
public ulong Id { get; }
public ClientKind Kind => ClientKind.Client;
public ClientOptions? ClientOpts { get; private set; }
public IMessageRouter? Router { get; set; }
public Account? Account { get; private set; }
@@ -448,6 +451,9 @@ public sealed class NatsClient : IDisposable
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
// Publish connect advisory to the system event bus
Router?.PublishConnectEvent(this);
// Start auth expiry timer if needed
if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry)
{

View File

@@ -9,6 +9,8 @@ using Microsoft.Extensions.Logging;
using NATS.NKeys;
using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.Events;
using NATS.Server.Imports;
using NATS.Server.Monitoring;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
@@ -36,6 +38,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private string? _configDigest;
private readonly Account _globalAccount;
private readonly Account _systemAccount;
private InternalEventSystem? _eventSystem;
private readonly SslServerAuthenticationOptions? _sslOptions;
private readonly TlsRateLimiter? _tlsRateLimiter;
private readonly SubjectTransform[] _subjectTransforms;
@@ -73,6 +76,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public int Port => _options.Port;
public Account SystemAccount => _systemAccount;
public string ServerNKey { get; }
public InternalEventSystem? EventSystem => _eventSystem;
public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0;
public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0;
public Action? ReOpenLogFile { get; set; }
@@ -93,6 +97,21 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_logger.LogInformation("Initiating Shutdown...");
// Publish shutdown advisory before tearing down the event system
if (_eventSystem != null)
{
var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId);
_eventSystem.Enqueue(new PublishMessage
{
Subject = shutdownSubject,
Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" },
IsLast = true,
});
// Give the send loop time to process the shutdown event
await Task.Delay(100);
await _eventSystem.DisposeAsync();
}
// Signal all internal loops to stop
await _quitCts.CancelAsync();
@@ -272,6 +291,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_systemAccount = new Account("$SYS");
_accounts["$SYS"] = _systemAccount;
// Create system internal client and event system
var sysClientId = Interlocked.Increment(ref _nextClientId);
var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount);
_eventSystem = new InternalEventSystem(
_systemAccount, sysClient,
options.ServerName ?? $"nats-dotnet-{Environment.MachineName}",
_loggerFactory.CreateLogger<InternalEventSystem>());
// Generate Ed25519 server NKey identity
using var serverKeyPair = KeyPair.CreatePair(PrefixByte.Server);
ServerNKey = serverKeyPair.GetPublicKey();
@@ -416,6 +443,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_listeningStarted.TrySetResult();
_eventSystem?.Start(this);
_eventSystem?.InitEventTracking(this);
var tmpDelay = AcceptMinSleep;
try
@@ -728,6 +758,27 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
// Check for service imports that match this subject.
// When a client in the importer account publishes to a subject
// that matches a service import "From" pattern, we forward the
// message to the destination (exporter) account's subscribers
// using the mapped "To" subject.
if (sender.Account != null)
{
foreach (var kvp in sender.Account.Imports.Services)
{
foreach (var si in kvp.Value)
{
if (si.Invalid) continue;
if (SubjectMatch.MatchLiteral(subject, si.From))
{
ProcessServiceImport(si, subject, replyTo, headers, payload);
delivered = true;
}
}
}
}
// No-responders: if nobody received the message and the publisher
// opted in, send back a 503 status HMSG on the reply subject.
if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true)
@@ -767,6 +818,153 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
/// <summary>
/// Processes a service import by transforming the subject from the importer's
/// subject space to the exporter's subject space, then delivering to matching
/// subscribers in the destination account.
/// Reference: Go server/accounts.go addServiceImport / processServiceImport.
/// </summary>
public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
if (si.Invalid) return;
// Transform subject: map from importer subject space to exporter subject space
string targetSubject;
if (si.Transform != null)
{
var transformed = si.Transform.Apply(subject);
targetSubject = transformed ?? si.To;
}
else if (si.UsePub)
{
targetSubject = subject;
}
else
{
// Default: use the "To" subject from the import definition.
// For wildcard imports (e.g. "requests.>" -> "api.>"), we need
// to map the specific subject tokens from the source pattern to
// the destination pattern.
targetSubject = MapImportSubject(subject, si.From, si.To);
}
// Match against destination account's SubList
var destSubList = si.DestinationAccount.SubList;
var result = destSubList.Match(targetSubject);
// Deliver to plain subscribers in the destination account
foreach (var sub in result.PlainSubs)
{
if (sub.Client == null) continue;
DeliverMessage(sub, targetSubject, replyTo, headers, payload);
}
// Deliver to one member of each queue group
foreach (var queueGroup in result.QueueSubs)
{
if (queueGroup.Length == 0) continue;
var sub = queueGroup[0]; // Simple selection: first available
if (sub.Client != null)
DeliverMessage(sub, targetSubject, replyTo, headers, payload);
}
}
/// <summary>
/// Maps a published subject from the import "From" pattern to the "To" pattern.
/// For example, if From="requests.>" and To="api.>" and subject="requests.test",
/// this returns "api.test".
/// </summary>
private static string MapImportSubject(string subject, string fromPattern, string toPattern)
{
// If "To" doesn't contain wildcards, use it directly
if (SubjectMatch.IsLiteral(toPattern))
return toPattern;
// For wildcard patterns, replace matching wildcard segments.
// Split into tokens and map from source to destination.
var subTokens = subject.Split('.');
var fromTokens = fromPattern.Split('.');
var toTokens = toPattern.Split('.');
var result = new string[toTokens.Length];
int subIdx = 0;
// Build a mapping: for each wildcard position in "from",
// capture the corresponding subject token(s)
var wildcardValues = new List<string>();
string? fwcValue = null;
for (int i = 0; i < fromTokens.Length && subIdx < subTokens.Length; i++)
{
if (fromTokens[i] == "*")
{
wildcardValues.Add(subTokens[subIdx]);
subIdx++;
}
else if (fromTokens[i] == ">")
{
// Capture all remaining tokens
fwcValue = string.Join(".", subTokens[subIdx..]);
subIdx = subTokens.Length;
}
else
{
subIdx++; // Skip literal match
}
}
// Now build the output using the "to" pattern
int wcIdx = 0;
var sb = new StringBuilder();
for (int i = 0; i < toTokens.Length; i++)
{
if (i > 0) sb.Append('.');
if (toTokens[i] == "*")
{
sb.Append(wcIdx < wildcardValues.Count ? wildcardValues[wcIdx] : "*");
wcIdx++;
}
else if (toTokens[i] == ">")
{
sb.Append(fwcValue ?? ">");
}
else
{
sb.Append(toTokens[i]);
}
}
return sb.ToString();
}
/// <summary>
/// Wires service import subscriptions for an account. Creates marker
/// subscriptions in the account's SubList so that the import paths
/// are tracked. The actual forwarding happens in ProcessMessage when
/// it checks the account's Imports.Services.
/// Reference: Go server/accounts.go addServiceImportSub.
/// </summary>
public void WireServiceImports(Account account)
{
foreach (var kvp in account.Imports.Services)
{
foreach (var si in kvp.Value)
{
if (si.Invalid) continue;
// Create a marker subscription in the importer account.
// This subscription doesn't directly deliver messages;
// the ProcessMessage method checks service imports after
// the regular SubList match.
_logger.LogDebug(
"Wired service import for account {Account}: {From} -> {To} (dest: {DestAccount})",
account.Name, si.From, si.To, si.DestinationAccount.Name);
}
}
}
private static void SendNoResponders(NatsClient sender, string replyTo)
{
// Find the sid for a subscription matching the reply subject
@@ -812,8 +1010,194 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
});
}
public void SendInternalMsg(string subject, string? reply, object? msg)
{
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg });
}
public void SendInternalAccountMsg(Account account, string subject, object? msg)
{
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.VARZ requests.
/// Returns core server information including stats counters.
/// </summary>
public void HandleVarzRequest(string subject, string? reply)
{
if (reply == null) return;
var varz = new
{
server_id = _serverInfo.ServerId,
server_name = _serverInfo.ServerName,
version = NatsProtocol.Version,
host = _options.Host,
port = _options.Port,
max_payload = _options.MaxPayload,
connections = ClientCount,
total_connections = Interlocked.Read(ref _stats.TotalConnections),
in_msgs = Interlocked.Read(ref _stats.InMsgs),
out_msgs = Interlocked.Read(ref _stats.OutMsgs),
in_bytes = Interlocked.Read(ref _stats.InBytes),
out_bytes = Interlocked.Read(ref _stats.OutBytes),
};
SendInternalMsg(reply, null, varz);
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.HEALTHZ requests.
/// Returns a simple health status response.
/// </summary>
public void HandleHealthzRequest(string subject, string? reply)
{
if (reply == null) return;
SendInternalMsg(reply, null, new { status = "ok" });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.SUBSZ requests.
/// Returns the current subscription count.
/// </summary>
public void HandleSubszRequest(string subject, string? reply)
{
if (reply == null) return;
SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.STATSZ requests.
/// Publishes current server statistics through the event system.
/// </summary>
public void HandleStatszRequest(string subject, string? reply)
{
if (reply == null) return;
var process = System.Diagnostics.Process.GetCurrentProcess();
var statsMsg = new Events.ServerStatsMsg
{
Server = BuildEventServerInfo(),
Stats = new Events.ServerStatsData
{
Start = StartTime,
Mem = process.WorkingSet64,
Cores = Environment.ProcessorCount,
Connections = ClientCount,
TotalConnections = Interlocked.Read(ref _stats.TotalConnections),
Subscriptions = SubList.Count,
InMsgs = Interlocked.Read(ref _stats.InMsgs),
OutMsgs = Interlocked.Read(ref _stats.OutMsgs),
InBytes = Interlocked.Read(ref _stats.InBytes),
OutBytes = Interlocked.Read(ref _stats.OutBytes),
SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers),
},
};
SendInternalMsg(reply, null, statsMsg);
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.IDZ requests.
/// Returns basic server identity information.
/// </summary>
public void HandleIdzRequest(string subject, string? reply)
{
if (reply == null) return;
var idz = new
{
server_id = _serverInfo.ServerId,
server_name = _serverInfo.ServerName,
version = NatsProtocol.Version,
host = _options.Host,
port = _options.Port,
};
SendInternalMsg(reply, null, idz);
}
/// <summary>
/// Builds an EventServerInfo block for embedding in system event messages.
/// Maps to Go's serverInfo() helper used in events.go advisory publishing.
/// </summary>
public EventServerInfo BuildEventServerInfo()
{
var seq = _eventSystem?.NextSequence() ?? 0;
return new EventServerInfo
{
Name = _serverInfo.ServerName,
Host = _options.Host,
Id = _serverInfo.ServerId,
Version = NatsProtocol.Version,
Seq = seq,
};
}
private static EventClientInfo BuildEventClientInfo(NatsClient client)
{
return new EventClientInfo
{
Id = client.Id,
Host = client.RemoteIp,
Account = client.Account?.Name,
Name = client.ClientOpts?.Name,
Lang = client.ClientOpts?.Lang,
Version = client.ClientOpts?.Version,
Start = client.StartTime,
};
}
/// <summary>
/// Publishes a $SYS.ACCOUNT.{account}.CONNECT advisory when a client
/// completes authentication. Maps to Go's sendConnectEvent in events.go.
/// </summary>
public void PublishConnectEvent(NatsClient client)
{
if (_eventSystem == null) return;
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
var subject = string.Format(EventSubjects.ConnectEvent, accountName);
var evt = new ConnectEventMsg
{
Id = Guid.NewGuid().ToString("N"),
Time = DateTime.UtcNow,
Server = BuildEventServerInfo(),
Client = BuildEventClientInfo(client),
};
SendInternalMsg(subject, null, evt);
}
/// <summary>
/// Publishes a $SYS.ACCOUNT.{account}.DISCONNECT advisory when a client
/// disconnects. Maps to Go's sendDisconnectEvent in events.go.
/// </summary>
public void PublishDisconnectEvent(NatsClient client)
{
if (_eventSystem == null) return;
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
var subject = string.Format(EventSubjects.DisconnectEvent, accountName);
var evt = new DisconnectEventMsg
{
Id = Guid.NewGuid().ToString("N"),
Time = DateTime.UtcNow,
Server = BuildEventServerInfo(),
Client = BuildEventClientInfo(client),
Sent = new DataStats
{
Msgs = Interlocked.Read(ref client.OutMsgs),
Bytes = Interlocked.Read(ref client.OutBytes),
},
Received = new DataStats
{
Msgs = Interlocked.Read(ref client.InMsgs),
Bytes = Interlocked.Read(ref client.InBytes),
},
Reason = client.CloseReason.ToReasonString(),
};
SendInternalMsg(subject, null, evt);
}
public void RemoveClient(NatsClient client)
{
// Publish disconnect advisory before removing client state
if (client.ConnectReceived)
PublishDisconnectEvent(client);
_clients.TryRemove(client.Id, out _);
_logger.LogDebug("Removed client {ClientId}", client.Id);

View File

@@ -1,4 +1,5 @@
using NATS.Server;
using NATS.Server.Imports;
namespace NATS.Server.Subscriptions;
@@ -9,5 +10,7 @@ public sealed class Subscription
public required string Sid { get; init; }
public long MessageCount; // Interlocked
public long MaxMessages; // 0 = unlimited
public NatsClient? Client { get; set; }
public INatsClient? Client { get; set; }
public ServiceImport? ServiceImport { get; set; }
public StreamImport? StreamImport { get; set; }
}