natsdotnet/Documentation/Server/Overview.md
Joseph Doherty, 2026-02-23

Server Overview

NatsServer is the top-level orchestrator: it binds the TCP listener, accepts incoming connections, and routes published messages to matching subscribers. Each connected client is managed by a NatsClient instance; NatsServer coordinates them through two interfaces that decouple message routing from connection management.

Key Concepts

Interfaces

NatsServer exposes two interfaces that NatsClient depends on:

public interface IMessageRouter
{
    void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
        ReadOnlyMemory<byte> payload, NatsClient sender);
    void RemoveClient(NatsClient client);
}

public interface ISubListAccess
{
    SubList SubList { get; }
}

IMessageRouter is the surface NatsClient calls when a PUB command arrives. ISubListAccess gives NatsClient access to the shared SubList so it can insert and remove subscriptions directly — without needing a concrete reference to NatsServer. Both interfaces are implemented by NatsServer, and both are injected into NatsClient through the Router property after construction.

Defining them separately makes unit testing straightforward: a test can supply a stub IMessageRouter without standing up a real server.
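For instance, a minimal recording stub, a hypothetical test helper rather than anything in the codebase, needs only the two interface methods:

```csharp
// Hypothetical test double: records what would have been routed
// instead of delivering anything over a socket.
internal sealed class RecordingRouter : IMessageRouter
{
    public List<(string Subject, int PayloadBytes)> Published { get; } = new();
    public List<NatsClient> Removed { get; } = new();

    public void ProcessMessage(string subject, string? replyTo,
        ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, NatsClient sender)
        => Published.Add((subject, payload.Length));

    public void RemoveClient(NatsClient client) => Removed.Add(client);
}
```

A test can hand this to a NatsClient via its Router property and assert on Published without binding a socket.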

Fields and State

public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
    // Options and logging
    private readonly NatsOptions _options;
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger<NatsServer> _logger;
    private readonly ServerInfo _serverInfo;

    // Client registry
    private readonly ConcurrentDictionary<ulong, NatsClient> _clients = new();
    private readonly ConcurrentQueue<ClosedClient> _closedClients = new();
    private ulong _nextClientId;
    private int _activeClientCount;

    // Account system
    private readonly ConcurrentDictionary<string, Account> _accounts = new(StringComparer.Ordinal);
    private readonly Account _globalAccount;
    private readonly Account _systemAccount;
    private AuthService _authService;

    // Subsystem managers (null when not configured)
    private readonly RouteManager? _routeManager;
    private readonly GatewayManager? _gatewayManager;
    private readonly LeafNodeManager? _leafNodeManager;
    private readonly JetStreamService? _jetStreamService;
    private readonly JetStreamPublisher? _jetStreamPublisher;
    private MonitorServer? _monitorServer;

    // TLS / transport
    private readonly SslServerAuthenticationOptions? _sslOptions;
    private readonly TlsRateLimiter? _tlsRateLimiter;
    private Socket? _listener;
    private Socket? _wsListener;

    // Shutdown coordination
    private readonly CancellationTokenSource _quitCts = new();
    private readonly TaskCompletionSource _shutdownComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly TaskCompletionSource _acceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
    private int _shutdown;
    private int _lameDuck;

    public SubList SubList => _globalAccount.SubList;
}

_clients tracks every live connection. _closedClients holds a capped ring of recently disconnected client snapshots (used by /connz). _nextClientId is incremented with Interlocked.Increment for each accepted socket, producing monotonically increasing client IDs without a lock. _loggerFactory is retained so per-client loggers can be created at accept time, each tagged with the client ID.

Each subsystem manager field (_routeManager, _gatewayManager, _leafNodeManager, _jetStreamService, _monitorServer) is null when the corresponding options section is absent from the configuration. Code that interacts with these managers always guards with a null check.

Constructor

The constructor takes NatsOptions and ILoggerFactory. It builds a ServerInfo struct that is sent to every connecting client in the initial INFO message:

public NatsServer(NatsOptions options, ILoggerFactory loggerFactory)
{
    _options = options;
    _loggerFactory = loggerFactory;
    _logger = loggerFactory.CreateLogger<NatsServer>();
    _serverInfo = new ServerInfo
    {
        ServerId = Guid.NewGuid().ToString("N")[..20].ToUpperInvariant(),
        ServerName = options.ServerName ?? $"nats-dotnet-{Environment.MachineName}",
        Version = NatsProtocol.Version,
        Host = options.Host,
        Port = options.Port,
        MaxPayload = options.MaxPayload,
    };
}

The ServerId is derived from a GUID — taking the first 20 characters of its "N" format (32 hex digits, no hyphens) and uppercasing them. This matches the fixed-length alphanumeric server ID format used by the Go server.
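The same derivation in isolation (the example value is illustrative; the ID differs per run):

```csharp
var raw = Guid.NewGuid().ToString("N");      // 32 lowercase hex digits, no hyphens
var serverId = raw[..20].ToUpperInvariant(); // first 20 of them, uppercased
// serverId is always exactly 20 characters drawn from 0-9 and A-F,
// e.g. "9F1C04A7B2E85D3A01FE" (illustrative).
```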

Subsystem managers are instantiated in the constructor if the corresponding options sections are non-null: options.Cluster != null creates a RouteManager, options.Gateway != null creates a GatewayManager, options.LeafNode != null creates a LeafNodeManager, and options.JetStream != null creates JetStreamService, JetStreamApiRouter, StreamManager, ConsumerManager, and JetStreamPublisher. TLS options are compiled into SslServerAuthenticationOptions via TlsHelper.BuildServerAuthOptions when options.HasTls is true.

Before entering the accept loop, StartAsync starts the monitoring server, WebSocket listener, route connections, gateway connections, leaf node listener, and JetStream service.

Accept Loop

StartAsync binds the socket, enables SO_REUSEADDR so the port can be reused immediately after a restart, and enters an async accept loop:

public async Task StartAsync(CancellationToken ct)
{
    _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    _listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
    _listener.Bind(new IPEndPoint(
        _options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host),
        _options.Port));
    _listener.Listen(128);

    while (!ct.IsCancellationRequested)
    {
        var socket = await _listener.AcceptAsync(ct);
        var clientId = Interlocked.Increment(ref _nextClientId);

        // Transport negotiation (TLS, WebSocket) and NatsClient construction
        // happen inside AcceptClientAsync; see the next section.
        _ = AcceptClientAsync(socket, clientId, ct);
    }
}

AcceptClientAsync is fire-and-forget (_ = ...): the accept loop does not await it, so accepting new connections is never blocked by a slow TLS handshake or by any one client's I/O. Once the client is constructed, its read/write loop (RunClientAsync) runs concurrently on the thread pool.

The backlog of 128 passed to Listen controls the OS-level queue of unaccepted connections — matching the Go server default.

TLS wrapping and WebSocket upgrade

After AcceptAsync returns a socket, the connection is handed to AcceptClientAsync, which performs transport negotiation before constructing NatsClient:

private async Task AcceptClientAsync(Socket socket, ulong clientId, CancellationToken ct)
{
    if (_tlsRateLimiter != null)
        await _tlsRateLimiter.WaitAsync(ct);

    var networkStream = new NetworkStream(socket, ownsSocket: false);

    // TlsConnectionWrapper performs the TLS handshake if _sslOptions is set;
    // returns the raw NetworkStream unchanged when TLS is not configured.
    var (stream, infoAlreadySent) = await TlsConnectionWrapper.NegotiateAsync(
        socket, networkStream, _options, _sslOptions, _serverInfo,
        _loggerFactory.CreateLogger("NATS.Server.Tls"), ct);

    // ...auth nonce generation, TLS state extraction...

    var client = new NatsClient(clientId, stream, socket, _options, clientInfo,
        _authService, nonce, clientLogger, _stats);
    client.Router = this;
    client.TlsState = tlsState;
    client.InfoAlreadySent = infoAlreadySent;
    _clients[clientId] = client;
}

WebSocket connections follow a parallel path through AcceptWebSocketClientAsync. After optional TLS negotiation via TlsConnectionWrapper, the HTTP upgrade handshake is performed by WsUpgrade.TryUpgradeAsync. On success, the raw stream is wrapped in a WsConnection that handles WebSocket framing, masking, and per-message compression before NatsClient is constructed.
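WsUpgrade.TryUpgradeAsync is not shown here, but the heart of any RFC 6455 upgrade handshake is computing the Sec-WebSocket-Accept response header. A minimal sketch of that step (not the project's actual code):

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

static class WsHandshakeSketch
{
    // RFC 6455 §4.2.2: append the fixed GUID to the client's
    // Sec-WebSocket-Key, SHA-1 hash the result, base64-encode the digest.
    public static string ComputeAccept(string secWebSocketKey)
    {
        const string Rfc6455Guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
        var bytes = Encoding.ASCII.GetBytes(secWebSocketKey + Rfc6455Guid);
        return Convert.ToBase64String(SHA1.HashData(bytes));
    }
}

// RFC 6455's own worked example:
// ComputeAccept("dGhlIHNhbXBsZSBub25jZQ==") == "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
```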

Message Routing

ProcessMessage is called by NatsClient for every PUB or HPUB command. It is the hot path: called once per published message.

public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
    ReadOnlyMemory<byte> payload, NatsClient sender)
{
    var result = _subList.Match(subject);

    // Deliver to plain subscribers
    foreach (var sub in result.PlainSubs)
    {
        if (sub.Client == null || (sub.Client == sender && !(sender.ClientOpts?.Echo ?? true)))
            continue;

        DeliverMessage(sub, subject, replyTo, headers, payload);
    }

    // Deliver to one member of each queue group (round-robin)
    foreach (var queueGroup in result.QueueSubs)
    {
        if (queueGroup.Length == 0) continue;

        var idx = Math.Abs((int)Interlocked.Increment(ref sender.OutMsgs)) % queueGroup.Length;
        Interlocked.Decrement(ref sender.OutMsgs);

        for (int attempt = 0; attempt < queueGroup.Length; attempt++)
        {
            var sub = queueGroup[(idx + attempt) % queueGroup.Length];
            if (sub.Client != null && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true)))
            {
                DeliverMessage(sub, subject, replyTo, headers, payload);
                break;
            }
        }
    }
}

Plain subscriber delivery

Each subscription in result.PlainSubs receives the message unless:

  • sub.Client is null (the subscription was removed concurrently), or
  • the subscriber is the sender and the sender has echo: false in its CONNECT options.

The echo flag defaults to true in ClientOptions, so publishers receive their own messages unless they explicitly opt out.
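Both exclusions collapse into a single eligibility predicate, sketched here as a standalone helper (the method name is illustrative):

```csharp
// A subscriber receives the message unless its client slot was cleared,
// or it is the sender itself with echo disabled (echo defaults to true).
static bool IsEligible(Subscription sub, NatsClient sender)
    => sub.Client != null
       && (sub.Client != sender || (sender.ClientOpts?.Echo ?? true));
```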

Queue group delivery

Queue groups provide load-balanced fan-out: exactly one member of each group receives each message. The selection uses a round-robin counter derived from sender.OutMsgs. An Interlocked.Increment picks the starting index; the Interlocked.Decrement immediately after undoes the side effect on the stat, since OutMsgs will be incremented correctly inside SendMessageAsync when the message is actually dispatched.

The loop walks from the selected index, wrapping around, until it finds an eligible member (non-null client, echo check). This handles stale subscriptions where the client has disconnected but the subscription object has not yet been cleaned up.
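Extracted from the loop above, the wrap-around walk can be sketched as follows (PickQueueMember is a hypothetical name):

```csharp
// Walk the group from a starting index, wrapping around, and return the
// first eligible member; null means every member was stale or excluded.
static Subscription? PickQueueMember(Subscription[] group, int start,
    Func<Subscription, bool> eligible)
{
    for (int attempt = 0; attempt < group.Length; attempt++)
    {
        var sub = group[(start + attempt) % group.Length];
        if (eligible(sub))
            return sub;
    }
    return null;
}
```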

DeliverMessage and auto-unsub

private void DeliverMessage(Subscription sub, string subject, string? replyTo,
    ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
    var client = sub.Client;
    if (client == null) return;

    var count = Interlocked.Increment(ref sub.MessageCount);
    if (sub.MaxMessages > 0 && count > sub.MaxMessages)
    {
        // Auto-unsub limit reached: purge the subscription, drop the message.
        _subList.Remove(sub);
        client.RemoveSubscription(sub.Sid);
        return;
    }

    _ = client.SendMessageAsync(subject, sub.Sid, replyTo, headers, payload, CancellationToken.None);
}

MessageCount is incremented atomically before the send. If it exceeds MaxMessages (set by an UNSUB with a message count argument), the subscription is removed from the trie immediately (subList.Remove(sub)) and from the client's tracking table (client.RemoveSubscription(sub.Sid)), then the message is dropped without delivery.

SendMessageAsync enqueues the serialized wire bytes on the client's outbound channel, so deliveries to different clients proceed concurrently.

After local delivery, ProcessMessage also hands the message to the JetStream publisher: if the subject matches a configured stream, TryCaptureJetStreamPublish stores the message and the resulting PubAck is sent back to the publisher via sender.RecordJetStreamPubAck. Route forwarding is handled separately by OnLocalSubscription, which calls _routeManager?.PropagateLocalSubscription when a new subscription is added, keeping remote peers informed of local interest without re-routing individual messages inside ProcessMessage.

Client Removal

public void RemoveClient(NatsClient client)
{
    _clients.TryRemove(client.Id, out _);
    client.RemoveAllSubscriptions(_subList);
}

RemoveClient is called either from RunClientAsync's finally block (after a client disconnects or errors) or from NatsClient.RunAsync's own finally block. Both paths may call it; TryRemove is idempotent, so double-calls are safe. After removal from _clients, all subscriptions belonging to that client are purged from the SubList trie and its internal cache.
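The idempotence falls out of ConcurrentDictionary.TryRemove, which reports whether the key was present rather than throwing:

```csharp
var first = _clients.TryRemove(client.Id, out _);  // true: client was registered
var again = _clients.TryRemove(client.Id, out _);  // false: already gone, no error
```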

Shutdown and Dispose

Graceful shutdown is initiated by ShutdownAsync. It uses _quitCts — a CancellationTokenSource shared between StartAsync and all subsystem managers — to signal all internal loops to stop:

public async Task ShutdownAsync()
{
    if (Interlocked.CompareExchange(ref _shutdown, 1, 0) != 0)
        return; // Already shutting down

    // Signal all internal loops to stop
    await _quitCts.CancelAsync();

    // Close listeners to stop accept loops
    _listener?.Close();
    _wsListener?.Close();
    if (_routeManager != null) await _routeManager.DisposeAsync();
    if (_gatewayManager != null) await _gatewayManager.DisposeAsync();
    if (_leafNodeManager != null) await _leafNodeManager.DisposeAsync();
    if (_jetStreamService != null) await _jetStreamService.DisposeAsync();

    // Wait for accept loops to exit, flush and close clients, drain active tasks...
    if (_monitorServer != null) await _monitorServer.DisposeAsync();
    _shutdownComplete.TrySetResult();
}

Lame-duck mode is a two-phase variant initiated by LameDuckShutdownAsync. The _lameDuck field (checked via IsLameDuckMode) is set first, which stops the accept loops from receiving new connections while existing clients are given a grace period (options.LameDuckGracePeriod) to disconnect naturally. After the grace period, remaining clients are stagger-closed over options.LameDuckDuration to avoid a thundering herd of reconnects, then ShutdownAsync completes the teardown.
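One way to produce a stagger schedule of that shape, sketched under the assumption of evenly spaced closes (the real spacing logic may differ):

```csharp
// Spread client closes evenly across the lame-duck window.
static IEnumerable<TimeSpan> StaggerDelays(int clientCount, TimeSpan window)
{
    if (clientCount <= 0) yield break;
    if (clientCount == 1) { yield return TimeSpan.Zero; yield break; }

    var step = window / (clientCount - 1);
    for (int i = 0; i < clientCount; i++)
        yield return step * i;  // 0, step, 2*step, ..., window
}
```

Closing the nth client after StaggerDelays(...) elapses spreads reconnect load across the whole window instead of concentrating it at shutdown.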

Dispose is a synchronous fallback. If ShutdownAsync has not already run, it blocks on it. It then disposes _quitCts, _tlsRateLimiter, the listener sockets, all subsystem managers (route, gateway, leaf node, JetStream), all connected clients, and all accounts. PosixSignalRegistrations are also disposed, deregistering the signal handlers.

Go Reference

The Go counterpart is golang/nats-server/server/server.go. Key differences in the .NET port:

  • Go uses goroutines for the accept loop and per-client read/write loops; the .NET port uses async/await with Task.
  • Go uses sync/atomic for client ID generation; the .NET port uses Interlocked.Increment.
  • Go passes the server to clients via the srv field on the client struct; the .NET port uses the IMessageRouter interface through the Router property.
  • POSIX signal handlers — SIGTERM/SIGQUIT for shutdown, SIGHUP for config reload, SIGUSR1 for log file reopen, SIGUSR2 for lame-duck mode — are registered in HandleSignals via PosixSignalRegistration.Create. SIGUSR1 and SIGUSR2 are skipped on Windows. Registrations are stored in _signalRegistrations and disposed during Dispose.