
Client Connection Handler

NatsClient manages the full lifecycle of one TCP connection: sending the initial INFO handshake, reading and parsing the incoming byte stream, dispatching protocol commands, and writing outbound messages. One NatsClient instance exists per accepted socket.

Key Concepts

Fields and properties

public sealed class NatsClient : IDisposable
{
    private readonly Socket _socket;
    private readonly NetworkStream _stream;
    private readonly NatsOptions _options;
    private readonly ServerInfo _serverInfo;
    private readonly NatsParser _parser;
    private readonly SemaphoreSlim _writeLock = new(1, 1);
    private readonly Dictionary<string, Subscription> _subs = new();
    private readonly ILogger _logger;

    public ulong Id { get; }
    public ClientOptions? ClientOpts { get; private set; }
    public IMessageRouter? Router { get; set; }
    public bool ConnectReceived { get; private set; }

    public long InMsgs;
    public long OutMsgs;
    public long InBytes;
    public long OutBytes;

    public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
}

_writeLock is a SemaphoreSlim(1, 1) — a binary semaphore that serializes all writes to _stream. Without it, concurrent SendMessageAsync calls from different publisher threads would interleave bytes on the wire. See Write serialization below.

_subs maps subscription IDs (SIDs) to Subscription objects. SIDs are client-assigned strings; Dictionary<string, Subscription> gives O(1) lookup for UNSUB processing.

The four stat fields (InMsgs, OutMsgs, InBytes, OutBytes) are long fields accessed via Interlocked operations throughout the hot path. They are exposed as public fields rather than properties to allow Interlocked.Increment and Interlocked.Add directly by reference.
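The reason for fields over properties can be shown in isolation. The sketch below is not the NatsClient code; ConnectionStats and StatsDemo are hypothetical names used only to illustrate that Interlocked needs a ref to the storage location, which a property cannot supply.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the stat fields on NatsClient.
public sealed class ConnectionStats
{
    // Public fields, not properties: Interlocked takes a ref to the storage
    // location, and a property getter only returns a copy of the value.
    public long InMsgs;
    public long InBytes;

    public void RecordInbound(int payloadLength)
    {
        Interlocked.Increment(ref InMsgs);
        Interlocked.Add(ref InBytes, payloadLength);
    }
}

public static class StatsDemo
{
    // Runs `workers` parallel loops, each recording `perWorker` 10-byte messages.
    public static ConnectionStats Run(int workers, int perWorker)
    {
        var stats = new ConnectionStats();
        Parallel.For(0, workers, _ =>
        {
            for (int i = 0; i < perWorker; i++)
                stats.RecordInbound(10);
        });
        return stats;
    }
}
```

Calling StatsDemo.Run(4, 1000) should leave InMsgs at 4000 and InBytes at 40000 on every run; replacing the Interlocked calls with plain `InMsgs++` would let concurrent updates be lost.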

Router is set by NatsServer after construction, before RunAsync is called. It is typed as IMessageRouter? rather than NatsServer so that tests can substitute a stub.

Constructor

public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, ILogger logger)
{
    Id = id;
    _socket = socket;
    _stream = new NetworkStream(socket, ownsSocket: false);
    _options = options;
    _serverInfo = serverInfo;
    _logger = logger;
    _parser = new NatsParser(options.MaxPayload);
}

NetworkStream is created with ownsSocket: false. This keeps socket lifetime management in NatsServer, which disposes the socket explicitly in Dispose. If ownsSocket were true, disposing the NetworkStream would close the socket, potentially racing with the disposal path in NatsServer.

NatsParser is constructed with MaxPayload from options. The parser enforces this limit: a payload larger than MaxPayload causes a ProtocolViolationException and terminates the connection.

Connection Lifecycle

RunAsync

RunAsync is the single entry point for a connection. NatsServer calls it as a fire-and-forget task.

public async Task RunAsync(CancellationToken ct)
{
    var pipe = new Pipe();
    try
    {
        await SendInfoAsync(ct);

        var fillTask = FillPipeAsync(pipe.Writer, ct);
        var processTask = ProcessCommandsAsync(pipe.Reader, ct);

        await Task.WhenAny(fillTask, processTask);
    }
    catch (OperationCanceledException) { }
    catch (Exception ex)
    {
        _logger.LogDebug(ex, "Client {ClientId} connection error", Id);
    }
    finally
    {
        Router?.RemoveClient(this);
    }
}

The method:

  1. Sends INFO {json}\r\n immediately on connect — required by the NATS protocol before the client sends CONNECT.
  2. Creates a System.IO.Pipelines.Pipe and starts two concurrent tasks: FillPipeAsync reads bytes from the socket into the pipe's write end; ProcessCommandsAsync reads from the pipe's read end and dispatches commands.
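  3. Awaits Task.WhenAny. Either task completing signals that the connection is done: the socket closed (the fill task returns) or a protocol error faulted the process task. Note that await Task.WhenAny returns the first task to finish without rethrowing its exception, so a fault in either task reaches the catch block only if the returned task is itself awaited.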
  4. In finally, calls Router?.RemoveClient(this) to clean up subscriptions and remove the client from the server's client dictionary.

Router?.RemoveClient(this) uses a null-conditional because Router could be null if the client is used in a test context without a server.
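How Task.WhenAny treats a faulted task affects what the catch block in RunAsync can observe. The following is a minimal sketch, not the server's code: WhenAnySketch, FailingTaskAsync and ObserveAsync are hypothetical names, and the pending TaskCompletionSource stands in for a fill task blocked on an idle socket.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAnySketch
{
    // Hypothetical stand-in for a process task that hits a protocol error.
    private static async Task FailingTaskAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("protocol error");
    }

    // Returns a short trace of what the caller observed.
    public static async Task<string> ObserveAsync()
    {
        // Never completes, like a read on an idle socket.
        Task fill = new TaskCompletionSource<bool>().Task;
        Task process = FailingTaskAsync();

        string trace = "";
        try
        {
            Task first = await Task.WhenAny(fill, process);
            trace += "WhenAny returned without throwing; ";

            // Awaiting the completed task is what rethrows its exception.
            await first;
            trace += "no exception";
        }
        catch (InvalidOperationException ex)
        {
            trace += "caught: " + ex.Message;
        }
        return trace;
    }
}
```

ObserveAsync returns "WhenAny returned without throwing; caught: protocol error". If the goal is for faults to be logged by a surrounding catch, awaiting the task that WhenAny returns is one way to surface them; whether that is desirable for RunAsync is a design decision for the port.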

FillPipeAsync

private async Task FillPipeAsync(PipeWriter writer, CancellationToken ct)
{
    try
    {
        while (!ct.IsCancellationRequested)
        {
            var memory = writer.GetMemory(4096);
            int bytesRead = await _stream.ReadAsync(memory, ct);
            if (bytesRead == 0)
                break;

            writer.Advance(bytesRead);
            var result = await writer.FlushAsync(ct);
            if (result.IsCompleted)
                break;
        }
    }
    finally
    {
        await writer.CompleteAsync();
    }
}

writer.GetMemory(4096) requests at least 4096 bytes of buffer space from the pipe. The pipe may provide more. _stream.ReadAsync fills as many bytes as the OS delivers in one call. writer.Advance(bytesRead) commits those bytes. writer.FlushAsync makes them available to the reader.

When bytesRead is 0, the peer has closed the connection. writer.CompleteAsync() in the finally block signals end-of-stream to the reader, which causes ProcessCommandsAsync to exit its loop on the next iteration.

ProcessCommandsAsync

private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct)
{
    try
    {
        while (!ct.IsCancellationRequested)
        {
            var result = await reader.ReadAsync(ct);
            var buffer = result.Buffer;

            while (_parser.TryParse(ref buffer, out var cmd))
            {
                await DispatchCommandAsync(cmd, ct);
            }

            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
                break;
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

reader.ReadAsync returns a ReadResult containing a ReadOnlySequence<byte>. The inner while loop calls _parser.TryParse repeatedly, which slices buffer forward past each complete command. When TryParse returns false, not enough data is available for a complete command.

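reader.AdvanceTo(buffer.Start, buffer.End) uses the two-argument form. Because TryParse slices buffer forward by reference, buffer.Start is the consumed position (everything before it is released back to the pipe) and buffer.End is the examined position (the next ReadAsync completes only once data arrives beyond it). Marking the whole buffer as examined is the standard System.IO.Pipelines pattern for waiting on an incomplete command; without it, ReadAsync would return the same partial data immediately.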
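The fill/process split and the consumed/examined positions can be tried without a socket. This sketch assumes a much simpler parser than NatsParser: TryReadLine is a hypothetical helper that only extracts CRLF-terminated lines, and PipeSketch is an illustrative name. The second command is deliberately split across two writes, the way a TCP read can split a command.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class PipeSketch
{
    // Hypothetical stand-in for NatsParser.TryParse: slices one line off the
    // front of the buffer, or returns false if no complete line is present.
    private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out string line)
    {
        SequencePosition? lf = buffer.PositionOf((byte)'\n');
        if (lf == null)
        {
            line = "";
            return false;
        }
        line = Encoding.ASCII.GetString(buffer.Slice(0, lf.Value).ToArray()).TrimEnd('\r');
        buffer = buffer.Slice(buffer.GetPosition(1, lf.Value)); // skip past '\n'
        return true;
    }

    public static async Task<List<string>> RunAsync()
    {
        var pipe = new Pipe();
        var lines = new List<string>();

        async Task FillAsync()
        {
            await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("PING\r\nSUB fo"));
            await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("o 1\r\n"));
            await pipe.Writer.CompleteAsync();
        }

        async Task ProcessAsync()
        {
            while (true)
            {
                ReadResult result = await pipe.Reader.ReadAsync();
                ReadOnlySequence<byte> buffer = result.Buffer;

                while (TryReadLine(ref buffer, out string line))
                    lines.Add(line);

                // consumed = start of the unparsed remainder,
                // examined = end of everything seen so far.
                pipe.Reader.AdvanceTo(buffer.Start, buffer.End);

                if (result.IsCompleted)
                    break;
            }
            await pipe.Reader.CompleteAsync();
        }

        await Task.WhenAll(FillAsync(), ProcessAsync());
        return lines;
    }
}
```

RunAsync yields the two lines "PING" and "SUB foo 1" regardless of how the reads happen to line up with the writes, because the unparsed "SUB fo" fragment stays in the pipe until the rest arrives.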

Command Dispatch

DispatchCommandAsync switches on the CommandType returned by the parser:

private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct)
{
    switch (cmd.Type)
    {
        case CommandType.Connect:
            ProcessConnect(cmd);
            break;

        case CommandType.Ping:
            await WriteAsync(NatsProtocol.PongBytes, ct);
            break;

        case CommandType.Pong:
            // Update RTT tracking (placeholder)
            break;

        case CommandType.Sub:
            ProcessSub(cmd);
            break;

        case CommandType.Unsub:
            ProcessUnsub(cmd);
            break;

        case CommandType.Pub:
        case CommandType.HPub:
            ProcessPub(cmd);
            break;
    }
}

CONNECT

ProcessConnect deserializes the JSON payload into a ClientOptions record and sets ConnectReceived = true. ClientOptions carries the echo flag (default true), the client name, language, and version strings.

PING / PONG

The server answers PING immediately with the pre-allocated NatsProtocol.PongBytes ("PONG\r\n"). The response goes through WriteAsync, which acquires the write lock. PONG handling is currently a placeholder for future RTT tracking.

SUB

private void ProcessSub(ParsedCommand cmd)
{
    var sub = new Subscription
    {
        Subject = cmd.Subject!,
        Queue = cmd.Queue,
        Sid = cmd.Sid!,
    };

    _subs[cmd.Sid!] = sub;
    sub.Client = this;

    if (Router is ISubListAccess sl)
        sl.SubList.Insert(sub);
}

A Subscription is stored in _subs (keyed by SID) and inserted into the shared SubList trie. The Client back-reference on Subscription is set to this so that NatsServer.ProcessMessage can reach the client from the subscription without a separate lookup.

Router is ISubListAccess sl checks the interface at runtime. In production, Router is NatsServer, which implements both interfaces. In tests using a stub IMessageRouter that does not implement ISubListAccess, the insert is silently skipped.
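The runtime interface check behaves as follows in a stripped-down form. The interface shapes here are assumptions made for the example: the real IMessageRouter and ISubListAccess carry other members, and SubList is a trie rather than the List<string> used below. FullRouter, StubRouter and RouterCheck are hypothetical names.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical minimal shapes, not the project's real interfaces.
public interface IMessageRouter { }
public interface ISubListAccess { List<string> SubList { get; } }

// Plays the role of NatsServer: implements both interfaces.
public sealed class FullRouter : IMessageRouter, ISubListAccess
{
    public List<string> SubList { get; } = new();
}

// Plays the role of a test stub: routing only, no SubList.
public sealed class StubRouter : IMessageRouter { }

public static class RouterCheck
{
    // Returns true if the subject was inserted, false if it was skipped.
    public static bool TryInsert(IMessageRouter? router, string subject)
    {
        if (router is ISubListAccess sl)
        {
            sl.SubList.Add(subject);
            return true;
        }
        return false; // stub or null router: nothing to insert into
    }
}
```

TryInsert returns true for a FullRouter and false for a StubRouter or null. The trade-off is that a missing SubList is not reported anywhere, so a test that expects subscriptions to be routable has to supply a router implementing both interfaces.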

UNSUB

private void ProcessUnsub(ParsedCommand cmd)
{
    if (!_subs.TryGetValue(cmd.Sid!, out var sub))
        return;

    if (cmd.MaxMessages > 0)
    {
        sub.MaxMessages = cmd.MaxMessages;
        return;
    }

    _subs.Remove(cmd.Sid!);

    if (Router is ISubListAccess sl)
        sl.SubList.Remove(sub);
}

UNSUB has two modes:

  • With max_msgs > 0: sets sub.MaxMessages to limit future deliveries. The subscription stays in the trie and the client's _subs dict. DeliverMessage in NatsServer checks MessageCount against MaxMessages on each delivery and silently drops messages beyond the limit.
  • Without max_msgs (or max_msgs == 0): removes the subscription immediately from both _subs and the SubList.
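The two UNSUB modes can be modelled with a dictionary alone. This is a simplified sketch: SubEntry and SubTable are hypothetical types, and the delivery-side check in TryDeliver is an assumption based on the description of DeliverMessage above, not a copy of it.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified Subscription.
public sealed class SubEntry
{
    public string Sid = "";
    public long MaxMessages;   // 0 means no limit
    public long MessageCount;
}

public sealed class SubTable
{
    private readonly Dictionary<string, SubEntry> _subs = new();

    public void Sub(string sid) => _subs[sid] = new SubEntry { Sid = sid };

    public bool Contains(string sid) => _subs.ContainsKey(sid);

    public void Unsub(string sid, long maxMessages)
    {
        if (!_subs.TryGetValue(sid, out var sub))
            return;

        if (maxMessages > 0)
        {
            // Mode 1: keep the subscription, cap future deliveries.
            sub.MaxMessages = maxMessages;
            return;
        }

        // Mode 2: remove immediately.
        _subs.Remove(sid);
    }

    // Assumed delivery check: count the message, drop it once over the cap.
    public bool TryDeliver(string sid)
    {
        if (!_subs.TryGetValue(sid, out var sub))
            return false;

        long count = Interlocked.Increment(ref sub.MessageCount);
        return sub.MaxMessages == 0 || count <= sub.MaxMessages;
    }
}
```

After Sub("1") and Unsub("1", 2), the first two TryDeliver("1") calls succeed and the third is dropped while the entry stays in the table; a following Unsub("1", 0) removes it.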

PUB and HPUB

private void ProcessPub(ParsedCommand cmd)
{
    Interlocked.Increment(ref InMsgs);
    Interlocked.Add(ref InBytes, cmd.Payload.Length);

    ReadOnlyMemory<byte> headers = default;
    ReadOnlyMemory<byte> payload = cmd.Payload;

    if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
    {
        headers = cmd.Payload[..cmd.HeaderSize];
        payload = cmd.Payload[cmd.HeaderSize..];
    }

    Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
}

Stats are updated before routing. For HPUB, the combined payload from the parser is split into a header slice and a body slice using cmd.HeaderSize. Both slices are ReadOnlyMemory<byte> views over the same backing array — no copy. Router.ProcessMessage then delivers to all matching subscribers.
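The no-copy claim can be checked directly. In this sketch, HeaderSplit is a hypothetical helper that mirrors the split in ProcessPub using Slice (equivalent to the range syntax above), and MemoryMarshal.TryGetArray is used only to confirm that both slices point at the same array.

```csharp
using System;
using System.Runtime.InteropServices;

public static class HeaderSplit
{
    // Splits a combined HPUB payload into header and body views without copying.
    public static (ReadOnlyMemory<byte> Headers, ReadOnlyMemory<byte> Body) Split(
        ReadOnlyMemory<byte> combined, int headerSize)
    {
        if (headerSize <= 0)
            return (default, combined);

        return (combined.Slice(0, headerSize), combined.Slice(headerSize));
    }

    // True when both views are windows over the same underlying array.
    public static bool ShareBackingArray(ReadOnlyMemory<byte> a, ReadOnlyMemory<byte> b)
    {
        return MemoryMarshal.TryGetArray(a, out ArraySegment<byte> sa)
            && MemoryMarshal.TryGetArray(b, out ArraySegment<byte> sb)
            && ReferenceEquals(sa.Array, sb.Array);
    }
}
```

For a combined payload of "NATS/1.0\r\nA: b\r\n\r\nhello" with a header size of 18, Split returns an 18-byte header view and a 5-byte body view, and ShareBackingArray reports true for the pair. A consequence worth keeping in mind is that the views are only valid while the backing buffer is; anything that outlives the parse step would need its own copy.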

Write Serialization

Multiple concurrent SendMessageAsync calls can arrive from different publisher connections at the same time. Without coordination, their writes would interleave on the socket and corrupt the message stream for the receiving client. _writeLock prevents this:

public async Task SendMessageAsync(string subject, string sid, string? replyTo,
    ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
    Interlocked.Increment(ref OutMsgs);
    Interlocked.Add(ref OutBytes, payload.Length + headers.Length);

    byte[] line;
    if (headers.Length > 0)
    {
        int totalSize = headers.Length + payload.Length;
        line = Encoding.ASCII.GetBytes(
            $"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n");
    }
    else
    {
        line = Encoding.ASCII.GetBytes(
            $"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
    }

    await _writeLock.WaitAsync(ct);
    try
    {
        await _stream.WriteAsync(line, ct);
        if (headers.Length > 0)
            await _stream.WriteAsync(headers, ct);
        if (payload.Length > 0)
            await _stream.WriteAsync(payload, ct);
        await _stream.WriteAsync(NatsProtocol.CrLf, ct);
        await _stream.FlushAsync(ct);
    }
    finally
    {
        _writeLock.Release();
    }
}

The control line is constructed before acquiring the lock so the string formatting work happens outside the critical section. Once the lock is held, all writes for one message — control line, optional headers, payload, and trailing \r\n — happen atomically from the perspective of other writers.

Stats (OutMsgs, OutBytes) are updated before the lock because they are independent of the write ordering constraint.
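The effect of the lock can be reproduced against a MemoryStream. This is a reduced sketch under stated assumptions: SerializedWriter and WriteLockDemo are hypothetical names, only the plain MSG form is written, and the Task.Yield between writes exists purely to give an unlocked writer a chance to interleave.

```csharp
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public sealed class SerializedWriter
{
    private static readonly byte[] CrLf = { (byte)'\r', (byte)'\n' };
    private readonly Stream _stream;
    private readonly SemaphoreSlim _writeLock = new(1, 1);

    public SerializedWriter(Stream stream) => _stream = stream;

    public async Task SendAsync(string subject, string sid, byte[] payload)
    {
        // Format the control line before taking the lock.
        byte[] line = Encoding.ASCII.GetBytes($"MSG {subject} {sid} {payload.Length}\r\n");

        await _writeLock.WaitAsync();
        try
        {
            await _stream.WriteAsync(line, 0, line.Length);
            await Task.Yield(); // widen the window for interleaving
            await _stream.WriteAsync(payload, 0, payload.Length);
            await _stream.WriteAsync(CrLf, 0, CrLf.Length);
        }
        finally
        {
            _writeLock.Release();
        }
    }
}

public static class WriteLockDemo
{
    // Sends `count` messages concurrently and returns everything written as text.
    public static async Task<string> RunAsync(int count)
    {
        var sink = new MemoryStream();
        var writer = new SerializedWriter(sink);
        var tasks = new Task[count];

        for (int i = 0; i < count; i++)
        {
            string sid = i.ToString();
            byte[] body = Encoding.ASCII.GetBytes("body" + sid);
            tasks[i] = Task.Run(() => writer.SendAsync("foo", sid, body));
        }

        await Task.WhenAll(tasks);
        return Encoding.ASCII.GetString(sink.ToArray());
    }
}
```

The frames may appear in any order, but each control line is always followed by its own payload and a trailing CRLF. SemaphoreSlim suits this because WaitAsync can be awaited and the semaphore has no thread affinity, so the continuation after an await may release it from a different thread.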

Subscription Cleanup

RemoveAllSubscriptions is called by NatsServer.RemoveClient when a connection ends:

public void RemoveAllSubscriptions(SubList subList)
{
    foreach (var sub in _subs.Values)
        subList.Remove(sub);
    _subs.Clear();
}

This removes every subscription this client holds from the shared SubList trie, then clears the local dictionary. After this call, no future ProcessMessage call will deliver to this client's subscriptions.

Dispose

public void Dispose()
{
    _stream.Dispose();
    _socket.Dispose();
    _writeLock.Dispose();
}

Disposing _stream closes the network stream. Disposing _socket closes the OS socket. Any in-flight ReadAsync or WriteAsync will fault with an ObjectDisposedException or IOException, which causes the read/write tasks to terminate. _writeLock is disposed last; this releases the wait handle held by the SemaphoreSlim, if one was ever allocated.

Go Reference

The Go counterpart is golang/nats-server/server/client.go. Key differences in the .NET port:

  • Go uses separate goroutines for readLoop and writeLoop; the .NET port uses FillPipeAsync and ProcessCommandsAsync as concurrent Tasks sharing a Pipe.
  • Go uses dynamic buffer sizing (512 to 65536 bytes) in readLoop; the .NET port requests 4096-byte chunks from the PipeWriter.
  • Go uses a mutex for write serialization (c.mu); the .NET port uses SemaphoreSlim(1,1) to allow await-based waiting without blocking a thread.
  • The System.IO.Pipelines Pipe replaces Go's direct net.Conn reads. This separates the I/O pump from command parsing and avoids partial-read handling in the parser itself.