- Add Microsoft.Extensions.Logging + Serilog to NatsServer and NatsClient - Convert all test assertions from xUnit Assert to Shouldly - Add NSubstitute package for future mocking needs - Introduce Central Package Management via Directory.Packages.props - Add documentation_rules.md with style guide, generation/update rules, component map - Generate 10 documentation files across 5 component folders (GettingStarted, Protocol, Subscriptions, Server, Configuration/Operations) - Update CLAUDE.md with logging, testing, porting, agent model, CPM, and documentation guidance
10 KiB
Architecture
This document describes the overall architecture of the NATS .NET server — its layers, component responsibilities, message flow, and the mapping from the Go reference implementation.
Project Overview
This project is a port of the NATS server (golang/nats-server/) to .NET 10 / C#. The Go source in golang/nats-server/server/ is the authoritative reference.
Current scope: base publish-subscribe server with wildcard subject matching and queue groups. Authentication, clustering (routes, gateways, leaf nodes), JetStream, and HTTP monitoring are not yet implemented.
Solution Layout
NatsDotNet.slnx
src/
NATS.Server/ # Core server library — no executable entry point
NATS.Server.Host/ # Console application — wires logging, parses CLI args, starts server
tests/
NATS.Server.Tests/ # xUnit test project — unit and integration tests
NATS.Server depends only on Microsoft.Extensions.Logging.Abstractions. All Serilog wiring is in NATS.Server.Host. This keeps the core library testable without a console host.
Layers
The implementation is organized bottom-up: each layer depends only on the layers below it.
NatsServer (orchestrator: accept loop, client registry, message routing)
└── NatsClient (per-connection: I/O pipeline, command dispatch, subscription tracking)
└── NatsParser (protocol state machine: bytes → ParsedCommand)
└── SubList (trie: subject → matching subscriptions)
└── SubjectMatch (validation and wildcard matching primitives)
SubjectMatch and SubList
SubjectMatch (Subscriptions/SubjectMatch.cs) provides the primitive operations used throughout the subscription system:
IsValidSubject(string)— rejects empty tokens, whitespace, tokens after>IsLiteral(string)— returnsfalseif the subject contains a bare*or>wildcard tokenIsValidPublishSubject(string)— combines both; publish subjects must be literalMatchLiteral(string literal, string pattern)— token-by-token matching for cache maintenance
SubList (Subscriptions/SubList.cs) is a trie-based subscription store. Each trie level (TrieLevel) holds a Dictionary<string, TrieNode> for literal tokens plus dedicated Pwc and Fwc pointers for * and > wildcards. Each TrieNode holds PlainSubs (a HashSet<Subscription>) and QueueSubs (a Dictionary<string, HashSet<Subscription>> keyed by queue group name).
SubList.Match(string subject) checks an in-memory cache first, then falls back to a recursive trie walk (MatchLevel) if there is a cache miss. The result is stored as a SubListResult — an immutable snapshot containing PlainSubs (array) and QueueSubs (jagged array, one sub-array per group).
The cache holds up to 1,024 entries. When that limit is exceeded, 256 entries are swept. Wildcard subscriptions invalidate all matching cache keys on insert and removal; literal subscriptions invalidate only their own key.
NatsParser
NatsParser (Protocol/NatsParser.cs) is a stateless-per-invocation parser that operates on ReadOnlySequence<byte> from System.IO.Pipelines. The public entry point is:
public bool TryParse(ref ReadOnlySequence<byte> buffer, out ParsedCommand command)
It reads one control line at a time (up to NatsProtocol.MaxControlLineSize = 4,096 bytes), identifies the command by the first two lowercased bytes, then dispatches to a per-command parse method. Commands with payloads (PUB, HPUB) set _awaitingPayload = true and return on the next call via TryReadPayload once enough bytes arrive. This handles TCP fragmentation without buffering the entire payload before parsing begins.
ParsedCommand is a readonly struct — zero heap allocation for control-only commands (PING, PONG, SUB, UNSUB, CONNECT). Payload commands allocate a byte[] for the payload body.
Command dispatch in NatsClient.DispatchCommandAsync covers: Connect, Ping/Pong, Sub, Unsub, Pub, HPub.
NatsClient
NatsClient (NatsClient.cs) handles a single TCP connection. On RunAsync, it sends the initial INFO frame and then starts two concurrent tasks:
var fillTask = FillPipeAsync(pipe.Writer, ct); // socket → PipeWriter
var processTask = ProcessCommandsAsync(pipe.Reader, ct); // PipeReader → parser → dispatch
FillPipeAsync reads from the NetworkStream into a PipeWriter in 4,096-byte chunks. ProcessCommandsAsync reads from the PipeReader, calls NatsParser.TryParse in a loop, and dispatches each ParsedCommand. The tasks share a Pipe instance from System.IO.Pipelines. Either task completing (EOF, cancellation, or error) causes RunAsync to return, which triggers cleanup via Router.RemoveClient(this).
Write serialization uses a SemaphoreSlim(1,1) (_writeLock). All outbound writes (SendMessageAsync, WriteAsync) acquire this lock before touching the NetworkStream, preventing interleaved writes from concurrent message deliveries.
Subscription state is a Dictionary<string, Subscription> keyed by SID. This dictionary is accessed only from the single processing task, so no locking is needed. SUB inserts into this dictionary and into SubList; UNSUB either sets MaxMessages for auto-unsubscribe or immediately removes from both.
NatsClient exposes two interfaces to NatsServer:
public interface IMessageRouter
{
void ProcessMessage(string subject, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, NatsClient sender);
void RemoveClient(NatsClient client);
}
public interface ISubListAccess
{
SubList SubList { get; }
}
NatsServer implements both. NatsClient.Router is set to the server instance immediately after construction.
NatsServer
NatsServer (NatsServer.cs) owns the TCP listener, the shared SubList, and the client registry. Its StartAsync method runs the accept loop:
public async Task StartAsync(CancellationToken ct)
{
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.Bind(new IPEndPoint(
_options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host),
_options.Port));
_listener.Listen(128);
// ...
while (!ct.IsCancellationRequested)
{
var socket = await _listener.AcceptAsync(ct);
// create NatsClient, fire-and-forget RunClientAsync
}
}
Each accepted connection gets a unique clientId (incremented via Interlocked.Increment), a scoped logger, and a NatsClient instance registered in _clients (ConcurrentDictionary<ulong, NatsClient>). RunClientAsync is fired as a detached task — the accept loop does not await it.
Message delivery happens in ProcessMessage:
- Call
_subList.Match(subject)to get aSubListResult. - Iterate
result.PlainSubs— deliver to each subscriber, skipping the sender ifechois false. - Iterate
result.QueueSubs— for each group, use round-robin (moduloInterlocked.Increment) to pick one member to receive the message.
Delivery via SendMessageAsync is fire-and-forget (not awaited) to avoid blocking the publishing client's processing task while waiting for slow subscribers.
Message Flow
Client sends: PUB orders.new 12\r\nhello world\r\n
1. FillPipeAsync reads bytes from socket → PipeWriter
2. ProcessCommandsAsync reads from PipeReader
3. NatsParser.TryParse parses control line "PUB orders.new 12\r\n"
sets _awaitingPayload = true
on next call: reads payload + \r\n trailer
returns ParsedCommand { Type=Pub, Subject="orders.new", Payload=[...] }
4. DispatchCommandAsync calls ProcessPub(cmd)
5. ProcessPub calls Router.ProcessMessage("orders.new", null, default, payload, this)
6. NatsServer.ProcessMessage
→ _subList.Match("orders.new")
returns SubListResult { PlainSubs=[sub1, sub2], QueueSubs=[[sub3, sub4]] }
→ DeliverMessage(sub1, ...) → sub1.Client.SendMessageAsync(...)
→ DeliverMessage(sub2, ...) → sub2.Client.SendMessageAsync(...)
→ round-robin pick from [sub3, sub4], e.g. sub3
→ DeliverMessage(sub3, ...) → sub3.Client.SendMessageAsync(...)
7. SendMessageAsync acquires _writeLock, writes MSG frame to socket
Go Reference Mapping
| Go source | .NET source |
|---|---|
server/sublist.go |
src/NATS.Server/Subscriptions/SubList.cs |
server/parser.go |
src/NATS.Server/Protocol/NatsParser.cs |
server/client.go |
src/NATS.Server/NatsClient.cs |
server/server.go |
src/NATS.Server/NatsServer.cs |
server/opts.go |
src/NATS.Server/NatsOptions.cs |
The Go sublist.go uses atomic generation counters to invalidate a result cache. The .NET SubList uses a different strategy: it maintains the cache under ReaderWriterLockSlim and does targeted invalidation at insert/remove time, avoiding the need for generation counters.
The Go client.go uses goroutines for readLoop and writeLoop. The .NET equivalent uses async Task with System.IO.Pipelines: FillPipeAsync (writer side) and ProcessCommandsAsync (reader side).
Key .NET Design Choices
| Concern | .NET choice | Reason |
|---|---|---|
| I/O buffering | System.IO.Pipelines (Pipe, PipeReader, PipeWriter) |
Zero-copy buffer management; backpressure built in |
| SubList thread safety | ReaderWriterLockSlim |
Multiple concurrent readers (match), exclusive writers (insert/remove) |
| Client registry | ConcurrentDictionary<ulong, NatsClient> |
Lock-free concurrent access from accept loop and cleanup tasks |
| Write serialization | SemaphoreSlim(1,1) per client |
Prevents interleaved MSG frames from concurrent deliveries |
| Concurrency | async/await + Task |
Maps Go goroutines to .NET task-based async; no dedicated threads per connection |
| Protocol constants | NatsProtocol static class |
Pre-encoded byte arrays (PongBytes, CrLf, etc.) avoid per-call allocations |