natsdotnet/Documentation/GettingStarted/Architecture.md
Joseph Doherty 539b2b7588 feat: add structured logging, Shouldly assertions, CPM, and project documentation
- Add Microsoft.Extensions.Logging + Serilog to NatsServer and NatsClient
- Convert all test assertions from xUnit Assert to Shouldly
- Add NSubstitute package for future mocking needs
- Introduce Central Package Management via Directory.Packages.props
- Add documentation_rules.md with style guide, generation/update rules, component map
- Generate 10 documentation files across 5 component folders (GettingStarted, Protocol, Subscriptions, Server, Configuration/Operations)
- Update CLAUDE.md with logging, testing, porting, agent model, CPM, and documentation guidance
2026-02-22 21:05:53 -05:00


Architecture

This document describes the overall architecture of the NATS .NET server — its layers, component responsibilities, message flow, and the mapping from the Go reference implementation.

Project Overview

This project is a port of the NATS server (golang/nats-server/) to .NET 10 / C#. The Go source in golang/nats-server/server/ is the authoritative reference.

Current scope: base publish-subscribe server with wildcard subject matching and queue groups. Authentication, clustering (routes, gateways, leaf nodes), JetStream, and HTTP monitoring are not yet implemented.


Solution Layout

NatsDotNet.slnx
src/
  NATS.Server/          # Core server library — no executable entry point
  NATS.Server.Host/     # Console application — wires logging, parses CLI args, starts server
tests/
  NATS.Server.Tests/    # xUnit test project — unit and integration tests

NATS.Server depends only on Microsoft.Extensions.Logging.Abstractions. All Serilog wiring is in NATS.Server.Host. This keeps the core library testable without a console host.


Layers

The implementation is organized bottom-up: each layer depends only on the layers below it.

NatsServer          (orchestrator: accept loop, client registry, message routing)
    └── NatsClient  (per-connection: I/O pipeline, command dispatch, subscription tracking)
            └── NatsParser      (protocol state machine: bytes → ParsedCommand)
            └── SubList         (trie: subject → matching subscriptions)
                    └── SubjectMatch    (validation and wildcard matching primitives)

SubjectMatch and SubList

SubjectMatch (Subscriptions/SubjectMatch.cs) provides the primitive operations used throughout the subscription system:

  • IsValidSubject(string): rejects subjects with empty tokens, whitespace, or any token after >
  • IsLiteral(string): returns false if any token is a bare * or > wildcard
  • IsValidPublishSubject(string): combines both checks, since publish subjects must be literal
  • MatchLiteral(string literal, string pattern): token-by-token check of whether a literal subject matches a pattern, used for cache maintenance
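The sketch below implements these four rules. It assumes subjects are split on '.' and that "whitespace" means any character for which char.IsWhiteSpace is true. SubjectMatchSketch is a hypothetical name. It illustrates the rules as described here and is not a copy of SubjectMatch.cs.

```csharp
using System;

// Hypothetical sketch of the SubjectMatch rules listed above. The real
// Subscriptions/SubjectMatch.cs may differ in details such as which
// characters count as whitespace.
public static class SubjectMatchSketch
{
    public static bool IsValidSubject(string subject)
    {
        if (string.IsNullOrEmpty(subject)) return false;
        string[] tokens = subject.Split('.');
        for (int i = 0; i < tokens.Length; i++)
        {
            string t = tokens[i];
            if (t.Length == 0) return false;                      // "a..b", ".a", "a."
            foreach (char c in t)
                if (char.IsWhiteSpace(c)) return false;           // no whitespace in any token
            if (t == ">" && i != tokens.Length - 1) return false; // '>' must be the last token
        }
        return true;
    }

    // Literal means no token is exactly "*" or ">" ("a*" is an ordinary token).
    public static bool IsLiteral(string subject)
    {
        foreach (string t in subject.Split('.'))
            if (t == "*" || t == ">") return false;
        return true;
    }

    public static bool IsValidPublishSubject(string subject) =>
        IsValidSubject(subject) && IsLiteral(subject);

    // Does the literal subject match the (possibly wildcard) pattern?
    public static bool MatchLiteral(string literal, string pattern)
    {
        string[] lit = literal.Split('.');
        string[] pat = pattern.Split('.');
        for (int i = 0; i < pat.Length; i++)
        {
            if (pat[i] == ">") return lit.Length > i;   // '>' needs one or more remaining tokens
            if (i >= lit.Length) return false;          // pattern is longer than the subject
            if (pat[i] != "*" && pat[i] != lit[i]) return false;
        }
        return lit.Length == pat.Length;                // no leftover subject tokens
    }
}
```

For example, MatchLiteral("orders.new.eu", "orders.>") is true, but MatchLiteral("orders", "orders.>") is false because > must match at least one token.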

SubList (Subscriptions/SubList.cs) is a trie-based subscription store. Each trie level (TrieLevel) holds a Dictionary<string, TrieNode> for literal tokens plus dedicated Pwc and Fwc pointers for * and > wildcards. Each TrieNode holds PlainSubs (a HashSet<Subscription>) and QueueSubs (a Dictionary<string, HashSet<Subscription>> keyed by queue group name).

SubList.Match(string subject) checks an in-memory cache first, then falls back to a recursive trie walk (MatchLevel) if there is a cache miss. The result is stored as a SubListResult — an immutable snapshot containing PlainSubs (array) and QueueSubs (jagged array, one sub-array per group).
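Here is a sketch of the trie layout and the recursive walk. It is hypothetical and single-threaded, and it reuses the names from the prose (TrieLevel, TrieNode, Pwc, Fwc, PlainSubs, QueueSubs). It assumes subjects are valid. It also leaves out the lock, the result cache, and removal, and it returns a plain tuple instead of SubListResult.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of the SubList trie and MatchLevel walk (no locking, cache, or removal).
public sealed class Subscription
{
    public Subscription(string subject, string? queue = null) { Subject = subject; Queue = queue; }
    public string Subject { get; }
    public string? Queue { get; }
}

public sealed class TrieNode
{
    public TrieLevel? Next;                                    // level for the following token
    public readonly HashSet<Subscription> PlainSubs = new();
    public readonly Dictionary<string, HashSet<Subscription>> QueueSubs = new();
}

public sealed class TrieLevel
{
    public readonly Dictionary<string, TrieNode> Nodes = new(); // literal tokens
    public TrieNode? Pwc;                                       // '*' child
    public TrieNode? Fwc;                                       // '>' child
}

public sealed class SubListSketch
{
    private readonly TrieLevel _root = new();

    public void Insert(Subscription sub)
    {
        TrieLevel level = _root;
        string[] tokens = sub.Subject.Split('.');
        TrieNode node = null!;
        for (int i = 0; i < tokens.Length; i++)
        {
            if (tokens[i] == "*") node = level.Pwc ??= new TrieNode();
            else if (tokens[i] == ">") node = level.Fwc ??= new TrieNode();
            else
            {
                if (!level.Nodes.TryGetValue(tokens[i], out var lit))
                    level.Nodes[tokens[i]] = lit = new TrieNode();
                node = lit;
            }
            if (i < tokens.Length - 1) level = node.Next ??= new TrieLevel();
        }
        if (sub.Queue is null) { node.PlainSubs.Add(sub); return; }
        if (!node.QueueSubs.TryGetValue(sub.Queue, out var group))
            node.QueueSubs[sub.Queue] = group = new HashSet<Subscription>();
        group.Add(sub);
    }

    public (Subscription[] Plain, Subscription[][] Queues) Match(string subject)
    {
        var plain = new List<Subscription>();
        var queues = new Dictionary<string, List<Subscription>>();
        MatchLevel(_root, subject.Split('.'), 0, plain, queues);
        return (plain.ToArray(), queues.Values.Select(g => g.ToArray()).ToArray());
    }

    private static void MatchLevel(TrieLevel? level, string[] tokens, int i,
        List<Subscription> plain, Dictionary<string, List<Subscription>> queues)
    {
        if (level is null) return;
        bool last = i == tokens.Length - 1;
        if (level.Fwc is not null) Collect(level.Fwc, plain, queues); // '>' consumes tokens i..end

        void Step(TrieNode node)
        {
            if (last) Collect(node, plain, queues);
            else MatchLevel(node.Next, tokens, i + 1, plain, queues);
        }
        if (level.Pwc is not null) Step(level.Pwc);                   // '*' consumes token i
        if (level.Nodes.TryGetValue(tokens[i], out var lit)) Step(lit); // exact token match
    }

    private static void Collect(TrieNode node, List<Subscription> plain,
        Dictionary<string, List<Subscription>> queues)
    {
        plain.AddRange(node.PlainSubs);
        foreach (var (group, members) in node.QueueSubs)
        {
            if (!queues.TryGetValue(group, out var list))
                queues[group] = list = new List<Subscription>();
            list.AddRange(members);
        }
    }
}
```

In this sketch, queue members that live in different trie nodes are merged by group name. A publish therefore yields one candidate list per group, whichever pattern each member subscribed with.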

The cache holds up to 1,024 entries. When that limit is exceeded, 256 entries are swept. Wildcard subscriptions invalidate all matching cache keys on insert and removal; literal subscriptions invalidate only their own key.
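Here is a sketch of this cache policy. It uses a plain Dictionary keyed by subject and stores string[] values where the real cache stores SubListResult. MatchCacheSketch is hypothetical, and the choice of which entries a sweep removes is arbitrary here. The real SubList performs these steps under its lock.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch: bounded match cache with a fixed-size sweep and targeted invalidation.
public sealed class MatchCacheSketch
{
    public const int MaxEntries = 1024;
    public const int SweepCount = 256;

    private readonly Dictionary<string, string[]> _cache = new();

    public int Count => _cache.Count;

    public bool TryGet(string subject, out string[] result)
    {
        if (_cache.TryGetValue(subject, out var hit)) { result = hit; return true; }
        result = Array.Empty<string>();
        return false;
    }

    public void Add(string subject, string[] result)
    {
        _cache[subject] = result;
        if (_cache.Count > MaxEntries)
        {
            // Which entries get swept is arbitrary in this sketch.
            foreach (string key in _cache.Keys.Take(SweepCount).ToList())
                _cache.Remove(key);
        }
    }

    // Call with the subject of a subscription that was just inserted or removed.
    public void Invalidate(string subscriptionSubject)
    {
        if (IsLiteral(subscriptionSubject))
        {
            _cache.Remove(subscriptionSubject);   // a literal only affects its own key
            return;
        }
        foreach (string key in _cache.Keys.Where(k => MatchLiteral(k, subscriptionSubject)).ToList())
            _cache.Remove(key);                   // every cached subject the wildcard covers
    }

    private static bool IsLiteral(string s) => s.Split('.').All(t => t != "*" && t != ">");

    private static bool MatchLiteral(string literal, string pattern)
    {
        string[] lit = literal.Split('.'), pat = pattern.Split('.');
        for (int i = 0; i < pat.Length; i++)
        {
            if (pat[i] == ">") return lit.Length > i;
            if (i >= lit.Length || (pat[i] != "*" && pat[i] != lit[i])) return false;
        }
        return lit.Length == pat.Length;
    }
}
```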

NatsParser

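NatsParser (Protocol/NatsParser.cs) is an incremental parser that operates on ReadOnlySequence<byte> from System.IO.Pipelines. Each call parses at most one command. The parser also keeps state between calls, so it can finish a command whose payload arrives in a later read. The public entry point is: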

public bool TryParse(ref ReadOnlySequence<byte> buffer, out ParsedCommand command)

It reads one control line at a time (up to NatsProtocol.MaxControlLineSize = 4,096 bytes), identifies the command from its first two bytes (lowercased), and dispatches to a per-command parse method. For commands with payloads (PUB, HPUB), it sets _awaitingPayload = true after consuming the control line. A later call then returns the command via TryReadPayload once the payload and its trailing \r\n have arrived. This handles TCP fragmentation: the control line is consumed as soon as it is complete, even when the payload is split across several reads.
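The fragmentation handling can be illustrated with a reduced parser that understands only PUB. PubCommand and PubParserSketch are hypothetical stand-ins for ParsedCommand and NatsParser. The sketch simplifies one step: if the payload is already buffered, it returns the command in the same call rather than on the next one.

```csharp
#nullable enable
using System;
using System.Buffers;
using System.Text;

// Hypothetical sketch of incremental parsing for "PUB <subject> [reply-to] <#bytes>" only.
public readonly record struct PubCommand(string Subject, string? ReplyTo, byte[] Payload);

public sealed class PubParserSketch
{
    private bool _awaitingPayload;    // true once a PUB control line has been consumed
    private string _subject = "";
    private string? _replyTo;
    private int _size;

    public bool TryParse(ref ReadOnlySequence<byte> buffer, out PubCommand command)
    {
        command = default;
        if (!_awaitingPayload)
        {
            SequencePosition? lf = buffer.PositionOf((byte)'\n');
            if (lf is null) return false;                           // control line incomplete
            string line = Encoding.ASCII.GetString(buffer.Slice(0, lf.Value).ToArray()).TrimEnd('\r');
            buffer = buffer.Slice(buffer.GetPosition(1, lf.Value)); // consume line and '\n'

            string[] parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
            if (parts.Length is < 3 or > 4 || !parts[0].Equals("PUB", StringComparison.OrdinalIgnoreCase))
                throw new FormatException($"unsupported control line: {line}");
            _subject = parts[1];
            _replyTo = parts.Length == 4 ? parts[2] : null;
            _size = int.Parse(parts[^1]);
            _awaitingPayload = true;
        }

        if (buffer.Length < _size + 2) return false;                // payload + "\r\n" not here yet
        byte[] payload = buffer.Slice(0, _size).ToArray();
        buffer = buffer.Slice(_size + 2);                           // consume payload and trailer
        _awaitingPayload = false;
        command = new PubCommand(_subject, _replyTo, payload);
        return true;
    }
}
```

With a PipeReader, the caller would pass result.Buffer and then call reader.AdvanceTo(buffer.Start, buffer.End). That way, the unconsumed bytes are presented again together with newly received data.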

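ParsedCommand is a readonly struct, so returning a command never heap-allocates the command value itself. This keeps control-only commands (PING, PONG, SUB, UNSUB, CONNECT) cheap, although string fields such as a subject still allocate. Payload commands additionally allocate a byte[] for the payload body.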

Command dispatch in NatsClient.DispatchCommandAsync covers: Connect, Ping/Pong, Sub, Unsub, Pub, HPub.

NatsClient

NatsClient (NatsClient.cs) handles a single TCP connection. Its RunAsync method sends the initial INFO frame and then starts two concurrent tasks:

var fillTask    = FillPipeAsync(pipe.Writer, ct);   // socket → PipeWriter
var processTask = ProcessCommandsAsync(pipe.Reader, ct); // PipeReader → parser → dispatch

FillPipeAsync reads from the NetworkStream into a PipeWriter in 4,096-byte chunks. ProcessCommandsAsync reads from the PipeReader, calls NatsParser.TryParse in a loop, and dispatches each ParsedCommand. The tasks share a Pipe instance from System.IO.Pipelines. Either task completing (EOF, cancellation, or error) causes RunAsync to return, which triggers cleanup via Router.RemoveClient(this).
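The two-task pattern can be sketched with the standard Pipe API. In this sketch, a delegate that receives each \n-terminated line stands in for NatsParser and DispatchCommandAsync. PipeLoopSketch is a hypothetical name, and only the shape of the two loops is meant to match the description above.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: socket -> PipeWriter on one task, PipeReader -> handler on another.
public static class PipeLoopSketch
{
    public static async Task FillPipeAsync(Stream stream, PipeWriter writer, CancellationToken ct)
    {
        try
        {
            while (true)
            {
                Memory<byte> memory = writer.GetMemory(4096);
                int read = await stream.ReadAsync(memory, ct);
                if (read == 0) break;                             // EOF: peer closed the connection
                writer.Advance(read);
                FlushResult flush = await writer.FlushAsync(ct);  // waits while too much is unread
                if (flush.IsCompleted) break;                     // reader side has finished
            }
        }
        finally
        {
            await writer.CompleteAsync();
        }
    }

    public static async Task ProcessCommandsAsync(PipeReader reader, Action<string> onLine, CancellationToken ct)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync(ct);
                ReadOnlySequence<byte> buffer = result.Buffer;
                SequencePosition? lf;
                while ((lf = buffer.PositionOf((byte)'\n')) != null)
                {
                    onLine(Encoding.ASCII.GetString(buffer.Slice(0, lf.Value).ToArray()).TrimEnd('\r'));
                    buffer = buffer.Slice(buffer.GetPosition(1, lf.Value));
                }
                // Bytes before buffer.Start are consumed; the rest was examined, so the
                // next ReadAsync waits for new data instead of returning the same bytes.
                reader.AdvanceTo(buffer.Start, buffer.End);
                if (result.IsCompleted) break;
            }
        }
        finally
        {
            await reader.CompleteAsync();
        }
    }

    // Mirrors the RunAsync shape: return as soon as either side finishes.
    public static async Task RunAsync(Stream stream, Action<string> onLine, CancellationToken ct)
    {
        var pipe = new Pipe();
        Task fillTask = FillPipeAsync(stream, pipe.Writer, ct);
        Task processTask = ProcessCommandsAsync(pipe.Reader, onLine, ct);
        await Task.WhenAny(fillTask, processTask);
    }
}
```

AdvanceTo(consumed, examined) is what makes partial commands work. Unconsumed bytes stay in the pipe, and because they are marked as examined, the next ReadAsync waits for more data instead of spinning.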

Write serialization uses a SemaphoreSlim(1,1) (_writeLock). All outbound writes (SendMessageAsync, WriteAsync) acquire this lock before touching the NetworkStream, preventing interleaved writes from concurrent message deliveries.
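Here is a minimal sketch of the lock pattern. It assumes each outbound frame is already a complete byte[]. FrameWriterSketch is hypothetical. The key point is that WaitAsync and Release bracket the whole frame, so two deliveries can never interleave their bytes on the stream.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of per-client write serialization with SemaphoreSlim(1,1).
public sealed class FrameWriterSketch
{
    private readonly Stream _stream;
    private readonly SemaphoreSlim _writeLock = new(1, 1);  // at most one writer at a time

    public FrameWriterSketch(Stream stream) => _stream = stream;

    public async Task WriteFrameAsync(byte[] frame, CancellationToken ct = default)
    {
        await _writeLock.WaitAsync(ct);
        try
        {
            await _stream.WriteAsync(frame, ct);   // whole frame written while holding the lock
        }
        finally
        {
            _writeLock.Release();
        }
    }
}
```

A SemaphoreSlim fits here rather than a lock statement, because C# does not allow await inside a lock block.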

Subscription state is a Dictionary<string, Subscription> keyed by SID. Only the client's processing task accesses this dictionary, so it needs no lock. SUB inserts into both this dictionary and the shared SubList. UNSUB with a max-messages argument sets MaxMessages, so the subscription is removed automatically after that many messages; without that argument, UNSUB removes the subscription from both immediately.
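Here is a sketch of the SUB/UNSUB bookkeeping. It assumes UNSUB carries an optional max-messages count, as in the NATS UNSUB <sid> [max_msgs] syntax. TrackedSub, MaxMessages, Delivered, and the HashSet that stands in for SubList are all hypothetical. The sketch is single-threaded, so it does not model which task counts deliveries.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical sketch of per-client SID tracking with auto-unsubscribe.
public sealed class TrackedSub
{
    public TrackedSub(string sid, string subject) { Sid = sid; Subject = subject; }
    public string Sid { get; }
    public string Subject { get; }
    public long MaxMessages;   // 0 means no limit
    public long Delivered;
}

public sealed class ClientSubsSketch
{
    private readonly Dictionary<string, TrackedSub> _subs = new();   // keyed by SID
    private readonly HashSet<TrackedSub> _sharedIndex;               // stand-in for SubList

    public ClientSubsSketch(HashSet<TrackedSub> sharedIndex) => _sharedIndex = sharedIndex;

    public TrackedSub? Find(string sid) => _subs.TryGetValue(sid, out var sub) ? sub : null;

    public void Sub(string sid, string subject)
    {
        var sub = new TrackedSub(sid, subject);
        _subs[sid] = sub;
        _sharedIndex.Add(sub);
    }

    // UNSUB <sid> [max-msgs]
    public void Unsub(string sid, long? maxMessages = null)
    {
        if (!_subs.TryGetValue(sid, out var sub)) return;
        if (maxMessages is > 0 && sub.Delivered < maxMessages.Value)
            sub.MaxMessages = maxMessages.Value;   // remove later, after the last allowed message
        else
            Remove(sub);                           // no limit given, or limit already reached
    }

    public void OnDelivered(TrackedSub sub)
    {
        sub.Delivered++;
        if (sub.MaxMessages > 0 && sub.Delivered >= sub.MaxMessages) Remove(sub);
    }

    private void Remove(TrackedSub sub)
    {
        _subs.Remove(sub.Sid);
        _sharedIndex.Remove(sub);
    }
}
```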

NatsClient exposes two interfaces to NatsServer:

public interface IMessageRouter
{
    void ProcessMessage(string subject, string? replyTo,
        ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, NatsClient sender);
    void RemoveClient(NatsClient client);
}

public interface ISubListAccess
{
    SubList SubList { get; }
}

NatsServer implements both. NatsClient.Router is set to the server instance immediately after construction.

NatsServer

NatsServer (NatsServer.cs) owns the TCP listener, the shared SubList, and the client registry. Its StartAsync method runs the accept loop:

public async Task StartAsync(CancellationToken ct)
{
    _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    _listener.Bind(new IPEndPoint(
        _options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host),
        _options.Port));
    _listener.Listen(128);
    // ...
    while (!ct.IsCancellationRequested)
    {
        var socket = await _listener.AcceptAsync(ct);
        // create NatsClient, fire-and-forget RunClientAsync
    }
}

Each accepted connection gets a unique clientId (incremented via Interlocked.Increment), a scoped logger, and a NatsClient instance registered in _clients (ConcurrentDictionary<ulong, NatsClient>). RunClientAsync is fired as a detached task — the accept loop does not await it.
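The registration steps can be sketched as follows. FakeClient stands in for NatsClient, and Finish() simulates the connection closing; both are hypothetical. The id counter is a long cast to ulong. Removal happens in a finally block, so the registry is cleaned up however the client task ends.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of client registration and detached per-client tasks.
public sealed class FakeClient
{
    private readonly TaskCompletionSource _done = new(TaskCreationOptions.RunContinuationsAsynchronously);
    public FakeClient(ulong id) => Id = id;
    public ulong Id { get; }
    public Task RunAsync() => _done.Task;       // completes when the "connection" ends
    public void Finish() => _done.TrySetResult();
}

public sealed class RegistrySketch
{
    private long _nextClientId;
    private readonly ConcurrentDictionary<ulong, FakeClient> _clients = new();

    public int Count => _clients.Count;

    // Called once per accepted socket.
    public (FakeClient Client, Task Run) Accept()
    {
        var id = (ulong)Interlocked.Increment(ref _nextClientId);
        var client = new FakeClient(id);
        _clients[id] = client;
        // An accept loop would not await this task; it is returned here only so a
        // caller (or test) can observe cleanup.
        Task run = RunClientAsync(client);
        return (client, run);
    }

    private async Task RunClientAsync(FakeClient client)
    {
        try { await client.RunAsync(); }
        finally { _clients.TryRemove(client.Id, out _); }   // cleanup when the connection ends
    }
}
```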

Message delivery happens in ProcessMessage:

  1. Call _subList.Match(subject) to get a SubListResult.
  2. Iterate result.PlainSubs — deliver to each subscriber, skipping the sender if echo is false.
  3. Iterate result.QueueSubs — for each group, use round-robin (modulo Interlocked.Increment) to pick one member to receive the message.

Delivery via SendMessageAsync is fire-and-forget (not awaited) to avoid blocking the publishing client's processing task while waiting for slow subscribers.
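Here is a sketch of the three steps, starting from a match result that has already been computed. Sub, the senderEcho flag, and the deliver callback are hypothetical; the callback stands in for the un-awaited SendMessageAsync call. Masking with int.MaxValue keeps the round-robin index non-negative once the shared counter wraps.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch of plain fan-out plus one-member-per-queue-group delivery.
public sealed class Sub
{
    public Sub(string sid, object client) { Sid = sid; Client = client; }
    public string Sid { get; }
    public object Client { get; }
}

public sealed class DeliverySketch
{
    private int _roundRobin;   // shared counter, advanced atomically

    public void Deliver(Sub[] plainSubs, Sub[][] queueGroups, object sender, bool senderEcho, Action<Sub> deliver)
    {
        foreach (Sub sub in plainSubs)
        {
            if (!senderEcho && ReferenceEquals(sub.Client, sender)) continue;  // no echo to publisher
            deliver(sub);
        }
        foreach (Sub[] group in queueGroups)
        {
            if (group.Length == 0) continue;
            // Mask the sign bit so the index stays non-negative after int overflow.
            int n = Interlocked.Increment(ref _roundRobin) & int.MaxValue;
            deliver(group[n % group.Length]);   // exactly one member per group
        }
    }
}
```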


Message Flow

Client sends: PUB orders.new 11\r\nhello world\r\n

1. FillPipeAsync      reads bytes from socket → PipeWriter
2. ProcessCommandsAsync reads from PipeReader
3. NatsParser.TryParse  parses control line "PUB orders.new 11\r\n"
                         sets _awaitingPayload = true
                         on next call: reads payload + \r\n trailer
                         returns ParsedCommand { Type=Pub, Subject="orders.new", Payload=[...] }
4. DispatchCommandAsync  calls ProcessPub(cmd)
5. ProcessPub            calls Router.ProcessMessage("orders.new", null, default, payload, this)
6. NatsServer.ProcessMessage
       → _subList.Match("orders.new")
           returns SubListResult { PlainSubs=[sub1, sub2], QueueSubs=[[sub3, sub4]] }
       → DeliverMessage(sub1, ...)   → sub1.Client.SendMessageAsync(...)
       → DeliverMessage(sub2, ...)   → sub2.Client.SendMessageAsync(...)
       → round-robin pick from [sub3, sub4], e.g. sub3
       → DeliverMessage(sub3, ...)   → sub3.Client.SendMessageAsync(...)
7. SendMessageAsync  acquires _writeLock, writes MSG frame to socket
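Step 7 writes a frame in the NATS MSG format: MSG <subject> <sid> [reply-to] <#bytes>\r\n, followed by the payload and \r\n. The hypothetical helper below builds that frame for a message without headers. It assumes ASCII subjects and does not cover the separate HMSG form used for header deliveries.

```csharp
#nullable enable
using System;
using System.Text;

// Hypothetical helper: MSG <subject> <sid> [reply-to] <#bytes>\r\n<payload>\r\n
public static class MsgFrameSketch
{
    public static byte[] Build(string subject, string sid, string? replyTo, ReadOnlySpan<byte> payload)
    {
        string header = replyTo is null
            ? $"MSG {subject} {sid} {payload.Length}\r\n"
            : $"MSG {subject} {sid} {replyTo} {payload.Length}\r\n";
        var frame = new byte[Encoding.ASCII.GetByteCount(header) + payload.Length + 2];
        int n = Encoding.ASCII.GetBytes(header.AsSpan(), frame.AsSpan());
        payload.CopyTo(frame.AsSpan(n));   // payload follows the control line
        frame[^2] = (byte)'\r';            // trailing CRLF
        frame[^1] = (byte)'\n';
        return frame;
    }
}
```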

Go Reference Mapping

Go source         | .NET source
----------------- | ----------------------------------------
server/sublist.go | src/NATS.Server/Subscriptions/SubList.cs
server/parser.go  | src/NATS.Server/Protocol/NatsParser.cs
server/client.go  | src/NATS.Server/NatsClient.cs
server/server.go  | src/NATS.Server/NatsServer.cs
server/opts.go    | src/NATS.Server/NatsOptions.cs

The Go sublist.go uses atomic generation counters to invalidate a result cache. The .NET SubList uses a different strategy: it maintains the cache under ReaderWriterLockSlim and does targeted invalidation at insert/remove time, avoiding the need for generation counters.

The Go client.go uses goroutines for readLoop and writeLoop. The .NET equivalent uses async Task with System.IO.Pipelines: FillPipeAsync (writer side) and ProcessCommandsAsync (reader side).


Key .NET Design Choices

Concern               | .NET choice                                        | Reason
--------------------- | -------------------------------------------------- | ------
I/O buffering         | System.IO.Pipelines (Pipe, PipeReader, PipeWriter) | Zero-copy buffer management; built-in backpressure
SubList thread safety | ReaderWriterLockSlim                               | Multiple concurrent readers (match), exclusive writers (insert/remove)
Client registry       | ConcurrentDictionary<ulong, NatsClient>            | Thread-safe access from the accept loop and cleanup tasks without a global lock
Write serialization   | SemaphoreSlim(1,1) per client                      | Prevents interleaved MSG frames from concurrent deliveries
Concurrency           | async/await + Task                                 | Maps Go goroutines to .NET task-based async; no dedicated thread per connection
Protocol constants    | NatsProtocol static class                          | Pre-encoded byte arrays (PongBytes, CrLf, etc.) avoid per-call allocations
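The Protocol constants row can be illustrated with a small static class. This document names only PongBytes, CrLf, and MaxControlLineSize; PingBytes and the class name NatsProtocolSketch are hypothetical.

```csharp
using System;

// Hypothetical sketch of pre-encoded protocol constants. UTF-8 literals are encoded
// at compile time, and ToArray() copies each one once during type initialization,
// so every later write reuses the same arrays.
public static class NatsProtocolSketch
{
    public const int MaxControlLineSize = 4096;
    public static readonly byte[] PingBytes = "PING\r\n"u8.ToArray();
    public static readonly byte[] PongBytes = "PONG\r\n"u8.ToArray();
    public static readonly byte[] CrLf = "\r\n"u8.ToArray();
}
```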