# Client Connection Handler

`NatsClient` manages the full lifecycle of one TCP connection: sending the initial INFO handshake, reading and parsing the incoming byte stream, dispatching protocol commands, and writing outbound messages. One `NatsClient` instance exists per accepted socket.

## Key Concepts

### Fields and properties

```csharp
public sealed class NatsClient : IDisposable
{
    private readonly Socket _socket;
    private readonly NetworkStream _stream;
    private readonly NatsOptions _options;
    private readonly ServerInfo _serverInfo;
    private readonly NatsParser _parser;
    private readonly SemaphoreSlim _writeLock = new(1, 1);
    private readonly Dictionary<string, Subscription> _subs = new();
    private readonly ILogger _logger;

    public ulong Id { get; }
    public ClientOptions? ClientOpts { get; private set; }
    public IMessageRouter? Router { get; set; }
    public bool ConnectReceived { get; private set; }

    public long InMsgs;
    public long OutMsgs;
    public long InBytes;
    public long OutBytes;

    public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
}
```

`_writeLock` is a `SemaphoreSlim(1, 1)`, a binary semaphore that serializes all writes to `_stream`. Without it, concurrent `SendMessageAsync` calls from different publisher threads would interleave bytes on the wire. See [Write serialization](#write-serialization) below.

`_subs` maps subscription IDs (SIDs) to `Subscription` objects. SIDs are client-assigned strings; `Dictionary<string, Subscription>` gives O(1) average-case lookup for UNSUB processing.

The four stat fields (`InMsgs`, `OutMsgs`, `InBytes`, `OutBytes`) are `long` fields updated via `Interlocked` operations throughout the hot path. They are exposed as public fields rather than properties because `Interlocked.Increment` and `Interlocked.Add` take their target by `ref`, which a property cannot provide.

`Router` is set by `NatsServer` after construction, before `RunAsync` is called. It is typed as `IMessageRouter?` rather than `NatsServer` so that tests can substitute a stub.
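
The field-versus-property point can be illustrated in isolation. The following is a minimal sketch, not code from `NatsClient`: the `ConnectionStats` class and its `RecordInbound` and `Snapshot` methods are hypothetical names introduced here only to show how `Interlocked` operates on public `long` fields by reference.

```csharp
using System.Threading;

// Hypothetical helper for illustration; NatsClient keeps these fields directly.
public sealed class ConnectionStats
{
    // Public fields, not properties: Interlocked needs a `ref` to the storage.
    public long InMsgs;
    public long InBytes;

    // Safe to call from many threads at once; no lock is taken.
    public void RecordInbound(int payloadLength)
    {
        Interlocked.Increment(ref InMsgs);
        Interlocked.Add(ref InBytes, payloadLength);
    }

    // Interlocked.Read performs an atomic 64-bit read, which matters on 32-bit platforms.
    public (long Msgs, long Bytes) Snapshot() =>
        (Interlocked.Read(ref InMsgs), Interlocked.Read(ref InBytes));
}
```

Calling `RecordInbound` concurrently, for example from `Parallel.For`, should yield exact totals, whereas plain `InMsgs++` from multiple threads can lose updates.
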
### Constructor

```csharp
public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, ILogger logger)
{
    Id = id;
    _socket = socket;
    _stream = new NetworkStream(socket, ownsSocket: false);
    _options = options;
    _serverInfo = serverInfo;
    _logger = logger;
    _parser = new NatsParser(options.MaxPayload);
}
```

`NetworkStream` is created with `ownsSocket: false`. This keeps socket lifetime management in `NatsServer`, which disposes the socket explicitly in `Dispose`. If `ownsSocket` were `true`, disposing the `NetworkStream` would close the socket, potentially racing with the disposal path in `NatsServer`.

`NatsParser` is constructed with `MaxPayload` from options. The parser enforces this limit: a payload larger than `MaxPayload` causes a `ProtocolViolationException` and terminates the connection.

## Connection Lifecycle

### RunAsync

`RunAsync` is the single entry point for a connection. `NatsServer` calls it as a fire-and-forget task.

```csharp
public async Task RunAsync(CancellationToken ct)
{
    var pipe = new Pipe();
    try
    {
        await SendInfoAsync(ct);

        var fillTask = FillPipeAsync(pipe.Writer, ct);
        var processTask = ProcessCommandsAsync(pipe.Reader, ct);

        await Task.WhenAny(fillTask, processTask);
    }
    catch (OperationCanceledException) { }
    catch (Exception ex)
    {
        _logger.LogDebug(ex, "Client {ClientId} connection error", Id);
    }
    finally
    {
        Router?.RemoveClient(this);
    }
}
```

The method:

1. Sends `INFO {json}\r\n` immediately on connect, as the NATS protocol requires before the client sends CONNECT.
2. Starts two concurrent tasks over the `System.IO.Pipelines.Pipe` created at the top of the method: `FillPipeAsync` reads bytes from the socket into the pipe's write end; `ProcessCommandsAsync` reads from the pipe's read end and dispatches commands.
3. Awaits `Task.WhenAny`. Whichever task finishes first signals that the connection is done: either the socket closed (the fill task returns) or a protocol error caused the process task to throw. `Task.WhenAny` itself does not rethrow a faulted task's exception, so the `catch` blocks only see exceptions from code awaited directly in `RunAsync`, such as `SendInfoAsync`.
4. In `finally`, calls `Router?.RemoveClient(this)` to clean up subscriptions and remove the client from the server's client dictionary.

`Router?.RemoveClient(this)` uses a null-conditional because `Router` could be null if the client is used in a test context without a server.

### FillPipeAsync

```csharp
private async Task FillPipeAsync(PipeWriter writer, CancellationToken ct)
{
    try
    {
        while (!ct.IsCancellationRequested)
        {
            var memory = writer.GetMemory(4096);
            int bytesRead = await _stream.ReadAsync(memory, ct);
            if (bytesRead == 0)
                break;

            writer.Advance(bytesRead);
            var result = await writer.FlushAsync(ct);
            if (result.IsCompleted)
                break;
        }
    }
    finally
    {
        await writer.CompleteAsync();
    }
}
```

`writer.GetMemory(4096)` requests at least 4096 bytes of buffer space from the pipe. The pipe may provide more. `_stream.ReadAsync` fills as many bytes as the OS delivers in one call. `writer.Advance(bytesRead)` commits those bytes. `writer.FlushAsync` makes them available to the reader.

When `bytesRead` is 0 the socket has closed. `writer.CompleteAsync()` in the `finally` block signals end-of-stream to the reader, which causes `ProcessCommandsAsync` to exit its loop on the next iteration.

### ProcessCommandsAsync

```csharp
private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct)
{
    try
    {
        while (!ct.IsCancellationRequested)
        {
            var result = await reader.ReadAsync(ct);
            var buffer = result.Buffer;

            while (_parser.TryParse(ref buffer, out var cmd))
            {
                await DispatchCommandAsync(cmd, ct);
            }

            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
                break;
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}
```

`reader.ReadAsync` returns a `ReadResult` containing a `ReadOnlySequence<byte>`. The inner `while` loop calls `_parser.TryParse` repeatedly, which slices `buffer` forward past each complete command. When `TryParse` returns `false`, not enough data is available for a complete command.
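
The slicing contract of `TryParse` can be sketched with a much simpler framer. This is an illustrative assumption, not the actual `NatsParser`: the `LineFramer` class and `TryReadLine` method are hypothetical, and they only extract one CRLF-terminated line per call. What they share with the description above is the shape of the API: on success the `ref` buffer is moved past the consumed bytes, and on failure it is left untouched.

```csharp
using System;
using System.Buffers;
using System.Text;

// Hypothetical stand-in for NatsParser.TryParse, for illustration only.
public static class LineFramer
{
    private static readonly byte[] CrLf = { (byte)'\r', (byte)'\n' };

    // Returns true and slices `buffer` past the line when a full CRLF-terminated
    // line is available; returns false and leaves `buffer` unchanged otherwise.
    public static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out string line)
    {
        var reader = new SequenceReader<byte>(buffer);
        if (!reader.TryReadTo(out ReadOnlySequence<byte> raw, CrLf, advancePastDelimiter: true))
        {
            line = string.Empty;
            return false; // incomplete: the caller should wait for more bytes
        }

        line = Encoding.ASCII.GetString(raw.ToArray());
        buffer = buffer.Slice(reader.Position); // drop the line and its CRLF
        return true;
    }
}
```

Given a buffer holding `PING\r\nSUB foo 1\r\nPUB fo`, two calls succeed and the third returns `false`, leaving the six bytes of the partial `PUB fo` in `buffer` for the next read.
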

`reader.AdvanceTo(buffer.Start, buffer.End)` uses the two-argument form. `buffer.Start` is the consumed position: data before it is released back to the pipe. `buffer.End` is the examined position: the next `ReadAsync` does not complete until data arrives beyond it. This is the standard `System.IO.Pipelines` read-loop pattern, and releasing consumed data is also what lets a writer paused by the pipe's backpressure threshold resume.

## Command Dispatch

`DispatchCommandAsync` switches on the `CommandType` returned by the parser:

```csharp
private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct)
{
    switch (cmd.Type)
    {
        case CommandType.Connect:
            ProcessConnect(cmd);
            break;

        case CommandType.Ping:
            await WriteAsync(NatsProtocol.PongBytes, ct);
            break;

        case CommandType.Pong:
            // Update RTT tracking (placeholder)
            break;

        case CommandType.Sub:
            ProcessSub(cmd);
            break;

        case CommandType.Unsub:
            ProcessUnsub(cmd);
            break;

        case CommandType.Pub:
        case CommandType.HPub:
            ProcessPub(cmd);
            break;
    }
}
```

### CONNECT

`ProcessConnect` deserializes the JSON payload into a `ClientOptions` record and sets `ConnectReceived = true`. `ClientOptions` carries the `echo` flag (default `true`), the client name, language, and version strings.

### PING / PONG

PING is answered immediately with the pre-allocated `NatsProtocol.PongBytes` (`"PONG\r\n"`). The response goes through `WriteAsync`, which acquires the write lock. PONG handling is currently a placeholder for future RTT tracking.

### SUB

```csharp
private void ProcessSub(ParsedCommand cmd)
{
    var sub = new Subscription
    {
        Subject = cmd.Subject!,
        Queue = cmd.Queue,
        Sid = cmd.Sid!,
    };

    _subs[cmd.Sid!] = sub;
    sub.Client = this;

    if (Router is ISubListAccess sl)
        sl.SubList.Insert(sub);
}
```

A `Subscription` is stored in `_subs` (keyed by SID) and inserted into the shared `SubList` trie. The `Client` back-reference on `Subscription` is set to `this` so that `NatsServer.ProcessMessage` can reach the client from the subscription without a separate lookup.

`Router is ISubListAccess sl` checks the interface at runtime.
In production, `Router` is `NatsServer`, which implements both `IMessageRouter` and `ISubListAccess`. In tests using a stub `IMessageRouter` that does not implement `ISubListAccess`, the insert is silently skipped.

### UNSUB

```csharp
private void ProcessUnsub(ParsedCommand cmd)
{
    if (!_subs.TryGetValue(cmd.Sid!, out var sub))
        return;

    if (cmd.MaxMessages > 0)
    {
        sub.MaxMessages = cmd.MaxMessages;
        return;
    }

    _subs.Remove(cmd.Sid!);

    if (Router is ISubListAccess sl)
        sl.SubList.Remove(sub);
}
```

UNSUB has two modes:

- With `max_msgs > 0`: sets `sub.MaxMessages` to limit future deliveries. The subscription stays in the trie and the client's `_subs` dict. `DeliverMessage` in `NatsServer` checks `MessageCount` against `MaxMessages` on each delivery and silently drops messages beyond the limit.
- Without `max_msgs` (or `max_msgs == 0`): removes the subscription immediately from both `_subs` and the `SubList`.

### PUB and HPUB

```csharp
private void ProcessPub(ParsedCommand cmd)
{
    Interlocked.Increment(ref InMsgs);
    Interlocked.Add(ref InBytes, cmd.Payload.Length);

    ReadOnlyMemory<byte> headers = default;
    ReadOnlyMemory<byte> payload = cmd.Payload;

    if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0)
    {
        headers = cmd.Payload[..cmd.HeaderSize];
        payload = cmd.Payload[cmd.HeaderSize..];
    }

    Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this);
}
```

Stats are updated before routing. For HPUB, the combined payload from the parser is split into a header slice and a body slice using `cmd.HeaderSize`. Both slices are `ReadOnlyMemory<byte>` views over the same backing array, so no bytes are copied. `Router.ProcessMessage` then delivers to all matching subscribers.

## Write Serialization

Multiple concurrent `SendMessageAsync` calls can arrive from different publisher connections at the same time. Without coordination, their writes would interleave on the socket and corrupt the message stream for the receiving client.
`_writeLock` prevents this:

```csharp
public async Task SendMessageAsync(string subject, string sid, string? replyTo,
    ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
    Interlocked.Increment(ref OutMsgs);
    Interlocked.Add(ref OutBytes, payload.Length + headers.Length);

    byte[] line;
    if (headers.Length > 0)
    {
        int totalSize = headers.Length + payload.Length;
        line = Encoding.ASCII.GetBytes(
            $"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n");
    }
    else
    {
        line = Encoding.ASCII.GetBytes(
            $"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
    }

    await _writeLock.WaitAsync(ct);
    try
    {
        await _stream.WriteAsync(line, ct);
        if (headers.Length > 0)
            await _stream.WriteAsync(headers, ct);
        if (payload.Length > 0)
            await _stream.WriteAsync(payload, ct);
        await _stream.WriteAsync(NatsProtocol.CrLf, ct);
        await _stream.FlushAsync(ct);
    }
    finally
    {
        _writeLock.Release();
    }
}
```

The control line is constructed before acquiring the lock so the string formatting work happens outside the critical section. Once the lock is held, all writes for one message (control line, optional headers, payload, and trailing `\r\n`) happen atomically from the perspective of other writers.

Stats (`OutMsgs`, `OutBytes`) are updated before the lock because they are independent of the write ordering constraint.

## Subscription Cleanup

`RemoveAllSubscriptions` is called by `NatsServer.RemoveClient` when a connection ends:

```csharp
public void RemoveAllSubscriptions(SubList subList)
{
    foreach (var sub in _subs.Values)
        subList.Remove(sub);
    _subs.Clear();
}
```

This removes every subscription this client holds from the shared `SubList` trie, then clears the local dictionary. After this call, no future `ProcessMessage` call will deliver to this client's subscriptions.

## Dispose

```csharp
public void Dispose()
{
    _stream.Dispose();
    _socket.Dispose();
    _writeLock.Dispose();
}
```

Disposing `_stream` closes the network stream.
Disposing `_socket` closes the OS socket. Any in-flight `ReadAsync` or `WriteAsync` will fault with an `ObjectDisposedException` or `IOException`, which causes the read/write tasks to terminate. `_writeLock` is disposed last to release the `SemaphoreSlim`'s internal handle.

## Go Reference

The Go counterpart is `golang/nats-server/server/client.go`. Key differences in the .NET port:

- Go uses separate goroutines for `readLoop` and `writeLoop`; the .NET port uses `FillPipeAsync` and `ProcessCommandsAsync` as concurrent `Task`s sharing a `Pipe`.
- Go uses dynamic buffer sizing (512 to 65536 bytes) in `readLoop`; the .NET port requests 4096-byte chunks from the `PipeWriter`.
- Go uses a mutex for write serialization (`c.mu`); the .NET port uses `SemaphoreSlim(1,1)` to allow `await`-based waiting without blocking a thread.
- The `System.IO.Pipelines` `Pipe` replaces Go's direct `net.Conn` reads. This separates the I/O pump from command parsing and avoids partial-read handling in the parser itself.

## Related Documentation

- [Server Overview](Overview.md)
- [Protocol Parser](../Protocol/Parser.md)
- [SubList Trie](../Subscriptions/SubList.md)
- [Subscriptions Overview](../Subscriptions/Overview.md)