New files:
- Documentation/Authentication/Overview.md: all 7 auth mechanisms with real source snippets (NKey/JWT/username-password/token/TLS mapping), nonce generation, account system, permissions, JWT permission templates
- Documentation/Clustering/Overview.md: route TCP handshake, in-process subscription propagation, gateway/leaf node stubs, honest gaps list
- Documentation/JetStream/Overview.md: API surface (4 handled subjects), streams, consumers, storage (MemStore/FileStore), in-process RAFT, mirror/source, gaps list
- Documentation/Monitoring/Overview.md: all 12 endpoints with real field tables, Go compatibility notes

Updated files:
- GettingStarted/Architecture.md: 14-subdirectory tree, real NatsClient/NatsServer field snippets, 9 new Go reference rows, Channel write queue design choice
- GettingStarted/Setup.md: xUnit 3, 100 test files grouped by area
- Operations/Overview.md: 99 test files, accurate Program.cs snippet, limitations section renamed to "Known Gaps vs Go Reference" with 7 real gaps
- Server/Overview.md: grouped fields, TLS/WS accept path, lame-duck mode, POSIX signals
- Configuration/Overview.md: 14 subsystem option tables, 24-row CLI table, LogOverrides
- Server/Client.md: Channel write queue, 4-task RunAsync, CommandMatrix, real fields

All docs verified against codebase 2026-02-23; 713 tests pass.
Client Connection Handler
NatsClient manages the full lifecycle of one TCP connection: sending the initial INFO handshake, reading and parsing the incoming byte stream, dispatching protocol commands, and writing outbound messages. One NatsClient instance exists per accepted socket.
Key Concepts
Fields and properties
public sealed class NatsClient : INatsClient, IDisposable
{
    private static readonly ClientCommandMatrix CommandMatrix = new();
    private readonly Socket _socket;
    private readonly Stream _stream;
    private readonly NatsOptions _options;
    private readonly AuthService _authService;
    private readonly NatsParser _parser;
    private readonly Channel<ReadOnlyMemory<byte>> _outbound = Channel.CreateBounded<ReadOnlyMemory<byte>>(
        new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });
    private long _pendingBytes;
    private CancellationTokenSource? _clientCts;
    private readonly Dictionary<string, Subscription> _subs = new();
    private readonly ILogger _logger;
    private ClientPermissions? _permissions;
    private readonly ServerStats _serverStats;

    public ulong Id { get; }
    public ClientKind Kind { get; }
    public ClientOptions? ClientOpts { get; private set; }
    public IMessageRouter? Router { get; set; }
    public Account? Account { get; private set; }
    public DateTime StartTime { get; }

    private readonly ClientFlagHolder _flags = new();
    public bool ConnectReceived => _flags.HasFlag(ClientFlags.ConnectReceived);
    public ClientClosedReason CloseReason { get; private set; }
}
_stream is typed as Stream rather than NetworkStream because the server passes in a pre-wrapped stream: plain NetworkStream for unencrypted connections, SslStream for TLS, or a WebSocket framing adapter. NatsClient does not know or care which transport is underneath.
_outbound is a bounded Channel<ReadOnlyMemory<byte>>(8192) with SingleReader = true and FullMode = BoundedChannelFullMode.Wait. The channel is the sole path for all outbound frames. Slow consumer detection uses _pendingBytes — an Interlocked-maintained counter of bytes queued but not yet flushed — checked against _options.MaxPending in QueueOutbound. See Write Serialization below.
_flags is a ClientFlagHolder (a thin wrapper around an int with atomic bit operations). Protocol-level boolean state — ConnectReceived, CloseConnection, IsSlowConsumer, TraceMode, and others — is stored as flag bits rather than separate fields, keeping the state machine manipulation thread-safe without separate locks.
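A holder of this kind can be sketched with Interlocked.Or and Interlocked.And (available from .NET 5). This is a hypothetical illustration, not the project's ClientFlagHolder: the local functions SetFlag, ClearFlag, and HasFlag and the bit values below are assumptions made for the example.

```csharp
using System;
using System.Threading;

// Hypothetical bit values; the real ClientFlags values are not shown in this document.
const int ConnectReceived = 1 << 0;
const int IsSlowConsumer = 1 << 1;

int flags = 0; // all boolean protocol state packed into one int

// Atomically set a bit. Returns true only if this call flipped it from 0 to 1.
bool SetFlag(ref int state, int bit) => (Interlocked.Or(ref state, bit) & bit) == 0;

// Atomically clear a bit.
void ClearFlag(ref int state, int bit) => Interlocked.And(ref state, ~bit);

// Volatile read so that writes from other threads are observed.
bool HasFlag(ref int state, int bit) => (Volatile.Read(ref state) & bit) != 0;

bool first = SetFlag(ref flags, ConnectReceived);   // flips the bit
bool second = SetFlag(ref flags, ConnectReceived);  // already set, no change
SetFlag(ref flags, IsSlowConsumer);
ClearFlag(ref flags, IsSlowConsumer);

Console.WriteLine($"first={first} second={second}");
Console.WriteLine($"connect={HasFlag(ref flags, ConnectReceived)} slow={HasFlag(ref flags, IsSlowConsumer)}");
```

Because every update is a single atomic read-modify-write on one int, two threads setting different flags at the same time cannot lose each other's update, which is the property the text attributes to the flag holder.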
_subs maps subscription IDs (SIDs) to Subscription objects. SIDs are client-assigned strings; Dictionary<string, Subscription> gives O(1) lookup for UNSUB processing.
The four stat fields (InMsgs, OutMsgs, InBytes, OutBytes) are long fields accessed via Interlocked operations throughout the hot path. They are exposed as public fields rather than properties to allow Interlocked.Increment and Interlocked.Add directly by reference.
Router is set by NatsServer after construction, before RunAsync is called. It is typed as IMessageRouter? rather than NatsServer so that tests can substitute a stub.
Constructor
public NatsClient(ulong id, Stream stream, Socket socket, NatsOptions options, ServerInfo serverInfo,
    AuthService authService, byte[]? nonce, ILogger logger, ServerStats serverStats,
    ClientKind kind = ClientKind.Client)
{
    Id = id;
    Kind = kind;
    _socket = socket;
    _stream = stream;
    _options = options;
    _authService = authService;
    _logger = logger;
    _serverStats = serverStats;
    _parser = new NatsParser(options.MaxPayload, options.Trace ? logger : null);
    StartTime = DateTime.UtcNow;
}
The stream parameter is passed in by NatsServer already wrapped for the appropriate transport. For a plain TCP connection it is a NetworkStream; after a TLS handshake it is an SslStream; for WebSocket connections it is a WebSocket framing adapter. NatsClient writes to Stream throughout and is unaware of which transport is underneath.
authService is the shared AuthService instance. NatsClient calls authService.IsAuthRequired and authService.Authenticate(context) during CONNECT processing rather than performing authentication checks inline. serverStats is a shared ServerStats instance whose counters (message counts, slow consumer counts, stale connections) are updated via Interlocked operations on the hot path. It has to be a reference type for those updates to be visible across clients, since a struct stored in a field would be a per-client copy.
byte[]? nonce carries a pre-generated challenge value for NKey authentication. When non-null, it is embedded in the INFO payload sent to the client. After ProcessConnectAsync completes, the nonce is zeroed via CryptographicOperations.ZeroMemory as a defense-in-depth measure.
NatsParser is constructed with MaxPayload from options. The parser enforces this limit: a payload larger than MaxPayload causes the connection to be closed with ClientClosedReason.MaxPayloadExceeded.
Connection Lifecycle
RunAsync
RunAsync is the single entry point for a connection. NatsServer calls it as a fire-and-forget task.
public async Task RunAsync(CancellationToken ct)
{
    _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var pipe = new Pipe();
    try
    {
        if (!InfoAlreadySent)
            SendInfo();
        var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
        var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
        var pingTask = RunPingTimerAsync(_clientCts.Token);
        var writeTask = RunWriteLoopAsync(_clientCts.Token);
        await Task.WhenAny(fillTask, processTask, pingTask, writeTask);
    }
    catch (OperationCanceledException)
    {
        MarkClosed(ClientClosedReason.ServerShutdown);
    }
    finally
    {
        MarkClosed(ClientClosedReason.ClientClosed);
        _outbound.Writer.TryComplete();
        Router?.RemoveClient(this);
    }
}
The method:
- Creates _clientCts via CancellationTokenSource.CreateLinkedTokenSource(ct). This gives the client its own cancellation scope linked to the server-wide token. CloseWithReasonAsync cancels _clientCts to tear down only this connection without affecting the rest of the server.
- Calls SendInfo() unless InfoAlreadySent is set. TLS negotiation sends INFO before handing the SslStream to RunAsync, so the flag prevents a duplicate INFO on TLS connections.
- Starts four concurrent tasks using _clientCts.Token:
  - FillPipeAsync: reads bytes from _stream into the pipe's write end.
  - ProcessCommandsAsync: reads from the pipe's read end and dispatches commands.
  - RunPingTimerAsync: sends periodic PING frames and enforces stale-connection detection via _options.MaxPingsOut.
  - RunWriteLoopAsync: drains _outbound channel frames and writes them to _stream.
- Awaits Task.WhenAny. Any task completing signals that the connection is ending: the socket closed, a protocol error was detected, or the server is shutting down.
- In finally, calls MarkClosed(ClientClosedReason.ClientClosed) (first-write-wins; earlier calls from error paths set the actual reason), completes the outbound channel writer so RunWriteLoopAsync can drain and exit, then calls Router?.RemoveClient(this) to remove subscriptions and deregister the client.
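The per-client cancellation scope from the first step can be tried in isolation. The following is a minimal sketch using only CancellationTokenSource; the names serverCts, clientA, and clientB are illustrative and do not come from the server code.

```csharp
using System;
using System.Threading;

using var serverCts = new CancellationTokenSource(); // stands in for the server-wide token
using var clientA = CancellationTokenSource.CreateLinkedTokenSource(serverCts.Token);
using var clientB = CancellationTokenSource.CreateLinkedTokenSource(serverCts.Token);

// Tearing down one connection does not touch the server or other clients.
clientA.Cancel();
bool serverAfterA = serverCts.IsCancellationRequested;
bool clientBAfterA = clientB.IsCancellationRequested;

// Server shutdown propagates to every linked source.
serverCts.Cancel();
bool clientBAfterShutdown = clientB.IsCancellationRequested;

Console.WriteLine($"server after A: {serverAfterA}, B after A: {clientBAfterA}, B after shutdown: {clientBAfterShutdown}");
```

Cancellation flows one way only, from the parent token to the linked sources, which is why cancelling _clientCts is safe to use as a single-connection teardown.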
CloseWithReasonAsync(reason, errMessage) is the coordinated close path used by protocol violations and slow consumer detection. It sets CloseReason, optionally queues a -ERR frame, marks the CloseConnection flag, completes the channel writer, waits 50 ms for the write loop to flush the error frame, then cancels _clientCts. MarkClosed(reason) is the lighter first-writer-wins setter used by the RunAsync catch blocks.
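A first-writer-wins setter like the one described for MarkClosed is commonly built on Interlocked.CompareExchange. The sketch below shows the idea under that assumption; the integer reason codes and the local MarkClosed function are hypothetical stand-ins, not the actual implementation.

```csharp
using System;
using System.Threading;

// Hypothetical reason codes; 0 means "not closed yet".
const int None = 0, ClientClosed = 1, SlowConsumer = 3;

int closeReason = None;

// The swap succeeds only while the slot still holds None, so the first caller wins.
bool MarkClosed(ref int slot, int reason) =>
    Interlocked.CompareExchange(ref slot, reason, None) == None;

bool errorPathWon = MarkClosed(ref closeReason, SlowConsumer); // error path records the real reason
bool finallyWon = MarkClosed(ref closeReason, ClientClosed);   // later generic call is ignored

Console.WriteLine($"errorPathWon={errorPathWon} finallyWon={finallyWon} reason={closeReason}");
```

With this shape, the unconditional MarkClosed call in a finally block cannot overwrite a more specific reason recorded earlier by an error path.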
Router?.RemoveClient(this) uses a null-conditional because Router could be null if the client is used in a test context without a server.
FillPipeAsync
private async Task FillPipeAsync(PipeWriter writer, CancellationToken ct)
{
    try
    {
        while (!ct.IsCancellationRequested)
        {
            var memory = writer.GetMemory(4096);
            int bytesRead = await _stream.ReadAsync(memory, ct);
            if (bytesRead == 0)
                break;
            writer.Advance(bytesRead);
            var result = await writer.FlushAsync(ct);
            if (result.IsCompleted)
                break;
        }
    }
    finally
    {
        await writer.CompleteAsync();
    }
}
writer.GetMemory(4096) requests at least 4096 bytes of buffer space from the pipe. The pipe may provide more. _stream.ReadAsync fills as many bytes as the OS delivers in one call. writer.Advance(bytesRead) commits those bytes. writer.FlushAsync makes them available to the reader.
When bytesRead is 0 the socket has closed. writer.CompleteAsync() in the finally block signals end-of-stream to the reader, which causes ProcessCommandsAsync to exit its loop on the next iteration.
ProcessCommandsAsync
private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct)
{
    try
    {
        while (!ct.IsCancellationRequested)
        {
            var result = await reader.ReadAsync(ct);
            var buffer = result.Buffer;
            while (_parser.TryParse(ref buffer, out var cmd))
            {
                await DispatchCommandAsync(cmd, ct);
            }
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted)
                break;
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}
reader.ReadAsync returns a ReadResult containing a ReadOnlySequence<byte>. The inner while loop calls _parser.TryParse repeatedly, which slices buffer forward past each complete command. When TryParse returns false, not enough data is available for a complete command.
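reader.AdvanceTo(buffer.Start, buffer.End) uses the two-argument form: buffer.Start is the consumed position (data before it is released back to the pipe) and buffer.End is the examined position (the pipe does not wake this task again until data arrives beyond it). Any incomplete command left between the two positions stays buffered and is presented again, with the new bytes appended, on the next ReadAsync. This is the standard System.IO.Pipelines pattern for handling partial frames.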
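The consumed/examined behaviour can be observed with a bare Pipe. In this sketch TryReadLine is a hypothetical stand-in for NatsParser.TryParse that slices one CRLF-terminated line off the front of the buffer; the rest uses only System.IO.Pipelines and System.Buffers calls.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;

var pipe = new Pipe();
var lines = new List<string>();

// Hypothetical stand-in for the parser: returns false when no complete line is buffered.
static bool TryReadLine(ref ReadOnlySequence<byte> seq, out string text)
{
    var sr = new SequenceReader<byte>(seq);
    if (!sr.TryReadTo(out ReadOnlySequence<byte> body, new byte[] { (byte)'\r', (byte)'\n' }))
    {
        text = "";
        return false;
    }
    text = Encoding.ASCII.GetString(body.ToArray());
    seq = seq.Slice(sr.Position); // slice past the line and its CRLF
    return true;
}

// One complete command plus the first half of the next one.
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("PING\r\nPO"));

var result = await pipe.Reader.ReadAsync();
var buffer = result.Buffer;
while (TryReadLine(ref buffer, out var line))
    lines.Add(line);
// Consumed: everything before the partial "PO". Examined: everything received so far.
pipe.Reader.AdvanceTo(buffer.Start, buffer.End);

// The remainder arrives; the pipe presents the retained "PO" together with the new bytes.
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("NG\r\n"));
result = await pipe.Reader.ReadAsync();
buffer = result.Buffer;
while (TryReadLine(ref buffer, out var line))
    lines.Add(line);
pipe.Reader.AdvanceTo(buffer.Start, buffer.End);

Console.WriteLine(string.Join(",", lines)); // PING,PONG
```

The parser never has to stash partial input itself: unconsumed bytes stay in the pipe and reappear at the front of the next buffer.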
Command Dispatch
DispatchCommandAsync first consults CommandMatrix to verify the command is permitted for this client's Kind, then dispatches by CommandType:
private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct)
{
    Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
    if (!CommandMatrix.IsAllowed(Kind, cmd.Operation))
    {
        await SendErrAndCloseAsync("Parser Error");
        return;
    }
    switch (cmd.Type)
    {
        case CommandType.Connect:
            await ProcessConnectAsync(cmd);
            break;
        case CommandType.Ping:
            WriteProtocol(NatsProtocol.PongBytes);
            break;
        case CommandType.Pong:
            Interlocked.Exchange(ref _pingsOut, 0);
            Interlocked.Exchange(ref _rtt, DateTime.UtcNow.Ticks - Interlocked.Read(ref _rttStartTicks));
            _flags.SetFlag(ClientFlags.FirstPongSent);
            break;
        case CommandType.Sub:
            ProcessSub(cmd);
            break;
        case CommandType.Unsub:
            ProcessUnsub(cmd);
            break;
        // PUB and HPUB handling (see ProcessPub) is not shown in this excerpt.
    }
}
ClientCommandMatrix is a static lookup table keyed by ClientKind. Each ClientKind has an allowed set of commands. ClientKind.Client accepts the standard client command set (CONNECT, PING, PONG, SUB, UNSUB, PUB, HPUB). Router-kind clients additionally accept the RS+ and RS- messages used to propagate subscriptions between cluster routes. If a command is not allowed for the current kind, the connection is closed with Parser Error.
Every command dispatch updates _lastActivityTicks via Interlocked.Exchange. The ping timer in RunPingTimerAsync reads _lastIn (updated on every received byte batch) to decide whether the client was recently active; _lastActivityTicks is the higher-level timestamp exposed as LastActivity on the public interface.
CONNECT
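ProcessConnectAsync deserializes the JSON payload into a ClientOptions record and sets the ClientFlags.ConnectReceived flag, which the ConnectReceived property reads. ClientOptions carries the echo flag (default true), the client name, language, and version strings. Authentication through AuthService also happens during this step, as described under Constructor.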
PING / PONG
PING is responded to immediately with the pre-allocated NatsProtocol.PongBytes ("PONG\r\n") via WriteProtocol, which calls QueueOutbound. PONG resets _pingsOut to 0 (preventing stale-connection closure), records RTT by comparing the current tick count against _rttStartTicks set when the PING was sent, and sets the ClientFlags.FirstPongSent flag to unblock the initial ping timer delay.
SUB
private void ProcessSub(ParsedCommand cmd)
{
    var sub = new Subscription
    {
        Subject = cmd.Subject!,
        Queue = cmd.Queue,
        Sid = cmd.Sid!,
    };
    _subs[cmd.Sid!] = sub;
    sub.Client = this;
    if (Router is ISubListAccess sl)
        sl.SubList.Insert(sub);
}
A Subscription is stored in _subs (keyed by SID) and inserted into the shared SubList trie. The Client back-reference on Subscription is set to this so that NatsServer.ProcessMessage can reach the client from the subscription without a separate lookup.
Router is ISubListAccess sl checks the interface at runtime. In production, Router is NatsServer, which implements both interfaces. In tests using a stub IMessageRouter that does not implement ISubListAccess, the insert is silently skipped.
UNSUB
private void ProcessUnsub(ParsedCommand cmd)
{
    if (!_subs.TryGetValue(cmd.Sid!, out var sub))
        return;
    if (cmd.MaxMessages > 0)
    {
        sub.MaxMessages = cmd.MaxMessages;
        return;
    }
    _subs.Remove(cmd.Sid!);
    if (Router is ISubListAccess sl)
        sl.SubList.Remove(sub);
}
UNSUB has two modes:
- With max_msgs > 0: sets sub.MaxMessages to limit future deliveries. The subscription stays in the trie and the client's _subs dict. DeliverMessage in NatsServer checks MessageCount against MaxMessages on each delivery and silently drops messages beyond the limit.
- Without max_msgs (or max_msgs == 0): removes the subscription immediately from both _subs and the SubList.
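The delivery-side check for the first mode can be sketched as a counter compared against the limit. DeliverMessage itself is not shown in this document, so the ShouldDeliver function, the variable names, and the use of long counters below are assumptions chosen for illustration.

```csharp
using System;
using System.Threading;

long messageCount = 0;
long maxMessages = 2; // as if the client had sent "UNSUB <sid> 2"

// Hypothetical check: count the delivery atomically, then compare with the limit.
// A limit of 0 is treated as "unlimited".
bool ShouldDeliver(ref long count, long max)
{
    long n = Interlocked.Increment(ref count);
    return max <= 0 || n <= max;
}

int delivered = 0;
for (int i = 0; i < 5; i++)
{
    if (ShouldDeliver(ref messageCount, maxMessages))
        delivered++; // messages beyond the limit are dropped silently
}

Console.WriteLine(delivered); // 2
```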
PUB and HPUB
private void ProcessPub(ParsedCommand cmd)
{
    Interlocked.Increment(ref InMsgs);
    Interlocked.Add(ref InBytes, cmd.Payload.Length);
    ReadOnlyMemory<byte> headers = default;
    ReadOnlyMemory<byte> payload = cmd.Payload;
    if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
    {
        headers = cmd.Payload[..cmd.HeaderSize];
        payload = cmd.Payload[cmd.HeaderSize..];
    }
    Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
}
Stats are updated before routing. For HPUB, the combined payload from the parser is split into a header slice and a body slice using cmd.HeaderSize. Both slices are ReadOnlyMemory<byte> views over the same backing array — no copy. Router.ProcessMessage then delivers to all matching subscribers.
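The no-copy property of the header/body split is easy to verify on its own. In this sketch the sample bytes and the way headerSize is computed are illustrative; in the server the parser supplies that value.

```csharp
using System;
using System.Text;

// Combined HPUB payload: a header block ending in a blank line, followed by the body.
byte[] backing = Encoding.ASCII.GetBytes("NATS/1.0\r\nX-Id: 7\r\n\r\nhello");
ReadOnlyMemory<byte> combined = backing;
int headerSize = backing.Length - "hello".Length; // illustrative; normally comes from the parser

ReadOnlyMemory<byte> headers = combined[..headerSize];
ReadOnlyMemory<byte> payload = combined[headerSize..];

// Mutating the backing array is visible through the slice, so the slice is a view, not a copy.
backing[^1] = (byte)'O';

Console.WriteLine(headers.Length + payload.Length == backing.Length); // True
Console.WriteLine(Encoding.ASCII.GetString(payload.Span));            // hellO
```

The flip side of sharing the backing array is that the slices are only valid for as long as the underlying buffer is not reused.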
Write Serialization
All outbound frames flow through a bounded Channel<ReadOnlyMemory<byte>> named _outbound. The channel has a capacity of 8192 entries, SingleReader = true, and FullMode = BoundedChannelFullMode.Wait. Every caller that wants to send bytes — protocol responses, MSG deliveries, PING frames, INFO, ERR — calls QueueOutbound(data), which performs two checks before writing to the channel:
public bool QueueOutbound(ReadOnlyMemory<byte> data)
{
    if (_flags.HasFlag(ClientFlags.CloseConnection))
        return false;
    var pending = Interlocked.Add(ref _pendingBytes, data.Length);
    if (pending > _options.MaxPending)
    {
        Interlocked.Add(ref _pendingBytes, -data.Length);
        _flags.SetFlag(ClientFlags.IsSlowConsumer);
        Interlocked.Increment(ref _serverStats.SlowConsumers);
        _ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
        return false;
    }
    if (!_outbound.Writer.TryWrite(data))
    {
        // Channel is full (all 8192 slots taken) -- slow consumer
        _flags.SetFlag(ClientFlags.IsSlowConsumer);
        _ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
        return false;
    }
    return true;
}
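_pendingBytes is an Interlocked-maintained counter. When it exceeds _options.MaxPending, the client is classified as a slow consumer and CloseWithReasonAsync is called. If TryWrite fails (all 8192 channel slots are occupied), the same slow consumer path fires. TryWrite never waits: on a full bounded channel it returns false immediately, even with FullMode = BoundedChannelFullMode.Wait, so QueueOutbound never blocks the caller. In either case the connection is closed with -ERR 'Slow Consumer'.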
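The TryWrite behaviour on a full bounded channel can be checked with a small standalone example. The capacity of 2 is chosen only to make the full condition easy to reach; the options otherwise mirror the ones shown for _outbound.

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateBounded<ReadOnlyMemory<byte>>(
    new BoundedChannelOptions(2) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });

var frame = new byte[] { 1, 2, 3 };

bool w1 = channel.Writer.TryWrite(frame); // slot 1
bool w2 = channel.Writer.TryWrite(frame); // slot 2
bool w3 = channel.Writer.TryWrite(frame); // channel full: returns false at once, no blocking

channel.Reader.TryRead(out _);            // the reader frees one slot
bool w4 = channel.Writer.TryWrite(frame); // succeeds again

Console.WriteLine($"{w1} {w2} {w3} {w4}"); // True True False True
```

FullMode.Wait only affects WriteAsync, which would suspend the producer until space is available. A producer that calls TryWrite gets an immediate answer and can treat false as a slow consumer signal.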
RunWriteLoopAsync is the sole reader of the channel, running as one of the four concurrent tasks in RunAsync:
private async Task RunWriteLoopAsync(CancellationToken ct)
{
    var reader = _outbound.Reader;
    while (await reader.WaitToReadAsync(ct))
    {
        long batchBytes = 0;
        while (reader.TryRead(out var data))
        {
            await _stream.WriteAsync(data, ct);
            batchBytes += data.Length;
        }
        using var flushCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        flushCts.CancelAfter(_options.WriteDeadline);
        try
        {
            await _stream.FlushAsync(flushCts.Token);
        }
        catch (OperationCanceledException) when (!ct.IsCancellationRequested)
        {
            // Flush timed out -- slow consumer on the write side
            await CloseWithReasonAsync(ClientClosedReason.SlowConsumerWriteDeadline, NatsProtocol.ErrSlowConsumer);
            return;
        }
        Interlocked.Add(ref _pendingBytes, -batchBytes);
    }
}
WaitToReadAsync yields until at least one frame is available. The inner TryRead loop drains as many frames as are available without yielding, batching them into a single FlushAsync. This amortizes the flush cost over multiple frames when the client is keeping up. After the flush, _pendingBytes is decremented by the batch size.
If FlushAsync does not complete within _options.WriteDeadline, the write-deadline slow consumer path fires. WriteDeadline is distinct from MaxPending: MaxPending catches a client whose channel is backing up due to slow reads; WriteDeadline catches a client whose OS socket send buffer is stalled (e.g. the TCP window is closed).
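The deadline mechanism, a linked token source with CancelAfter plus an exception filter that separates a timeout from a connection-wide cancellation, can be exercised without a socket. In this sketch StalledFlushAsync is a hypothetical stand-in for a FlushAsync call that never completes, and the 50 ms deadline is an arbitrary value picked for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var connectionCts = new CancellationTokenSource(); // plays the role of the client token
var writeDeadline = TimeSpan.FromMilliseconds(50);        // arbitrary value for the example

// Hypothetical stand-in for a flush that stalls because the peer stopped reading.
static Task StalledFlushAsync(CancellationToken token) => Task.Delay(Timeout.Infinite, token);

string outcome;
using (var flushCts = CancellationTokenSource.CreateLinkedTokenSource(connectionCts.Token))
{
    flushCts.CancelAfter(writeDeadline);
    try
    {
        await StalledFlushAsync(flushCts.Token);
        outcome = "flushed";
    }
    catch (OperationCanceledException) when (!connectionCts.IsCancellationRequested)
    {
        // Only the deadline fired; the connection itself was not cancelled.
        outcome = "write deadline exceeded";
    }
}

Console.WriteLine(outcome); // write deadline exceeded
```

If the connection token had been cancelled instead, the filter would not match and the exception would propagate as an ordinary shutdown rather than being reported as a slow consumer.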
Subscription Cleanup
RemoveAllSubscriptions is called by NatsServer.RemoveClient when a connection ends:
public void RemoveAllSubscriptions(SubList subList)
{
    foreach (var sub in _subs.Values)
        subList.Remove(sub);
    _subs.Clear();
}
This removes every subscription this client holds from the shared SubList trie, then clears the local dictionary. After this call, no future ProcessMessage call will deliver to this client's subscriptions.
Dispose
public void Dispose()
{
    _permissions?.Dispose();
    _outbound.Writer.TryComplete();
    _clientCts?.Dispose();
    _stream.Dispose();
    _socket.Dispose();
}
_outbound.Writer.TryComplete() is called before disposing the stream so that RunWriteLoopAsync can observe channel completion and exit cleanly rather than faulting on a disposed stream. _clientCts is disposed to release the linked token registration. Disposing _stream and _socket closes the underlying transport; any in-flight ReadAsync or WriteAsync will fault with an ObjectDisposedException or IOException, which causes the remaining tasks to terminate.
Go Reference
The Go counterpart is golang/nats-server/server/client.go. Key differences in the .NET port:
- Go uses separate goroutines for readLoop and writeLoop; the .NET port uses FillPipeAsync, ProcessCommandsAsync, RunPingTimerAsync, and RunWriteLoopAsync as four concurrent Tasks all linked to _clientCts.
- Go uses dynamic buffer sizing (512 to 65536 bytes) in readLoop; the .NET port requests 4096-byte chunks from the PipeWriter.
- Go uses a single per-client read buffer; the .NET port uses System.IO.Pipelines for zero-copy parsing. The pipe separates the I/O pump from command parsing, avoids partial-read handling in the parser, and allows the PipeReader backpressure mechanism to control how much data is buffered between fill and process.
- Go's flushOutbound() batches queued writes and flushes them under c.mu; the .NET port uses a bounded Channel<ReadOnlyMemory<byte>>(8192) write queue with a _pendingBytes counter for backpressure. RunWriteLoopAsync is the sole reader: it drains all available frames in one batch and calls FlushAsync once per batch, with a WriteDeadline timeout to detect stale write-side connections.
- Go uses c.mu (a sync.Mutex) for write serialization; the .NET port eliminates the write lock entirely. RunWriteLoopAsync is the only task that writes to _stream, so no locking is required on the write path.