# Architecture

This document describes the overall architecture of the NATS .NET server: its layers, component responsibilities, message flow, and the mapping from the Go reference implementation.

## Project Overview

This project is a port of the [NATS server](https://github.com/nats-io/nats-server) (`golang/nats-server/`) to .NET 10 / C#. The Go source in `golang/nats-server/server/` is the authoritative reference.

Current scope: base publish-subscribe server with wildcard subject matching and queue groups. Authentication, clustering (routes, gateways, leaf nodes), JetStream, and HTTP monitoring are not yet implemented.

---

## Solution Layout

```
NatsDotNet.slnx
src/
  NATS.Server/         # Core server library, no executable entry point
  NATS.Server.Host/    # Console application: wires logging, parses CLI args, starts server
tests/
  NATS.Server.Tests/   # xUnit test project: unit and integration tests
```

`NATS.Server` depends only on `Microsoft.Extensions.Logging.Abstractions`. All Serilog wiring is in `NATS.Server.Host`. This keeps the core library testable without a console host.

---

## Layers

The implementation is organized bottom-up: each layer depends only on the layers below it.

```
NatsServer          (orchestrator: accept loop, client registry, message routing)
    └── NatsClient  (per-connection: I/O pipeline, command dispatch, subscription tracking)
            └── NatsParser  (protocol state machine: bytes → ParsedCommand)
            └── SubList     (trie: subject → matching subscriptions)
                    └── SubjectMatch  (validation and wildcard matching primitives)
```

### SubjectMatch and SubList

`SubjectMatch` (`Subscriptions/SubjectMatch.cs`) provides the primitive operations used throughout the subscription system:

- `IsValidSubject(string)`: rejects empty tokens, whitespace, tokens after `>`
- `IsLiteral(string)`: returns `false` if the subject contains a bare `*` or `>` wildcard token
- `IsValidPublishSubject(string)`: combines both; publish subjects must be literal
- `MatchLiteral(string literal, string pattern)`: token-by-token matching for cache maintenance

`SubList` (`Subscriptions/SubList.cs`) is a trie-based subscription store. Each trie level (`TrieLevel`) holds a `Dictionary` for literal tokens plus dedicated `Pwc` and `Fwc` pointers for `*` and `>` wildcards. Each `TrieNode` holds `PlainSubs` (a `HashSet`) and `QueueSubs` (a `Dictionary` keyed by queue group name).

`SubList.Match(string subject)` checks an in-memory cache first, then falls back to a recursive trie walk (`MatchLevel`) if there is a cache miss. The result is stored as a `SubListResult`, an immutable snapshot containing `PlainSubs` (array) and `QueueSubs` (jagged array, one sub-array per group).

The cache holds up to 1,024 entries. When that limit is exceeded, 256 entries are swept. Wildcard subscriptions invalidate all matching cache keys on insert and removal; literal subscriptions invalidate only their own key.

### NatsParser

`NatsParser` (`Protocol/NatsParser.cs`) is a stateless-per-invocation parser that operates on `ReadOnlySequence<byte>` buffers supplied by `System.IO.Pipelines`.
The public entry point is:

```csharp
public bool TryParse(ref ReadOnlySequence<byte> buffer, out ParsedCommand command)
```

It reads one control line at a time (up to `NatsProtocol.MaxControlLineSize` = 4,096 bytes), identifies the command by the first two lowercased bytes, then dispatches to a per-command parse method. Commands with payloads (`PUB`, `HPUB`) set `_awaitingPayload = true`; a later call completes them via `TryReadPayload` once enough bytes have arrived. This handles TCP fragmentation without buffering the entire payload before parsing begins.

`ParsedCommand` is a `readonly struct`, so control-only commands (`PING`, `PONG`, `SUB`, `UNSUB`, `CONNECT`) incur no heap allocation. Payload commands allocate a `byte[]` for the payload body.

Command dispatch in `NatsClient.DispatchCommandAsync` covers: `Connect`, `Ping`/`Pong`, `Sub`, `Unsub`, `Pub`, `HPub`.

### NatsClient

`NatsClient` (`NatsClient.cs`) handles a single TCP connection. On `RunAsync`, it sends the initial `INFO` frame and then starts two concurrent tasks:

```csharp
var fillTask = FillPipeAsync(pipe.Writer, ct);          // socket → PipeWriter
var processTask = ProcessCommandsAsync(pipe.Reader, ct); // PipeReader → parser → dispatch
```

`FillPipeAsync` reads from the `NetworkStream` into a `PipeWriter` in 4,096-byte chunks. `ProcessCommandsAsync` reads from the `PipeReader`, calls `NatsParser.TryParse` in a loop, and dispatches each `ParsedCommand`. The tasks share a `Pipe` instance from `System.IO.Pipelines`. Either task completing (EOF, cancellation, or error) causes `RunAsync` to return, which triggers cleanup via `Router.RemoveClient(this)`.

Write serialization uses a `SemaphoreSlim(1,1)` (`_writeLock`). All outbound writes (`SendMessageAsync`, `WriteAsync`) acquire this lock before touching the `NetworkStream`, preventing interleaved writes from concurrent message deliveries.

Subscription state is a `Dictionary` keyed by SID.
This dictionary is accessed only from the single processing task, so no locking is needed. `SUB` inserts into this dictionary and into `SubList`; `UNSUB` either sets `MaxMessages` for auto-unsubscribe or immediately removes the subscription from both.

`NatsClient` reaches `NatsServer` through two interfaces:

```csharp
public interface IMessageRouter
{
    void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
        ReadOnlyMemory<byte> payload, NatsClient sender);
    void RemoveClient(NatsClient client);
}

public interface ISubListAccess
{
    SubList SubList { get; }
}
```

`NatsServer` implements both. `NatsClient.Router` is set to the server instance immediately after construction.

### NatsServer

`NatsServer` (`NatsServer.cs`) owns the TCP listener, the shared `SubList`, and the client registry. Its `StartAsync` method runs the accept loop:

```csharp
public async Task StartAsync(CancellationToken ct)
{
    _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    _listener.Bind(new IPEndPoint(
        _options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host),
        _options.Port));
    _listener.Listen(128);
    // ...
    while (!ct.IsCancellationRequested)
    {
        var socket = await _listener.AcceptAsync(ct);
        // create NatsClient, fire-and-forget RunClientAsync
    }
}
```

Each accepted connection gets a unique `clientId` (incremented via `Interlocked.Increment`), a scoped logger, and a `NatsClient` instance registered in `_clients` (`ConcurrentDictionary`). `RunClientAsync` is fired as a detached task; the accept loop does not await it.

Message delivery happens in `ProcessMessage`:

1. Call `_subList.Match(subject)` to get a `SubListResult`.
2. Iterate `result.PlainSubs` and deliver to each subscriber, skipping the sender if `echo` is false.
3. Iterate `result.QueueSubs` and, for each group, use round-robin (modulo `Interlocked.Increment`) to pick one member to receive the message.

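
Steps 2 and 3 above can be sketched as a selection function that decides who receives a message. This is a minimal sketch under stated assumptions, not the server's actual code: subscriptions are reduced to `(Client, Sid)` tuples, `SelectRecipients` and its parameters are hypothetical names, and a single shared counter is assumed for all queue groups (the document does not say whether the counter is per server or per group).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Illustrative stand-ins only: a subscription is reduced to (client name, SID).
// The real Subscription, NatsClient, and SubListResult types are not shown here.
int roundRobin = 0; // assumption: one shared counter for every queue group

List<(string Client, string Sid)> SelectRecipients(
    (string Client, string Sid)[] plainSubs,
    (string Client, string Sid)[][] queueSubs,
    string sender,
    bool senderEcho)
{
    var recipients = new List<(string Client, string Sid)>();

    // Step 2: every plain subscriber receives the message, except the
    // sender itself when it connected with echo disabled.
    foreach (var sub in plainSubs)
    {
        if (!senderEcho && sub.Client == sender) continue;
        recipients.Add(sub);
    }

    // Step 3: exactly one member per queue group, chosen round-robin.
    foreach (var group in queueSubs)
    {
        if (group.Length == 0) continue;
        // Cast to uint so the index stays non-negative after the counter wraps.
        uint next = unchecked((uint)Interlocked.Increment(ref roundRobin));
        recipients.Add(group[(int)(next % (uint)group.Length)]);
    }

    return recipients;
}

var plain = new[] { ("alice", "1"), ("bob", "2") };
var queues = new[] { new[] { ("carol", "3"), ("dave", "4") } };

// "alice" publishes with echo disabled, so only bob and one queue member are selected.
foreach (var (client, sid) in SelectRecipients(plain, queues, sender: "alice", senderEcho: false))
    Console.WriteLine($"{client} sid={sid}");
// prints:
// bob sid=2
// dave sid=4
```

The unsigned cast matters only after roughly two billion increments, when a signed counter would wrap negative and produce a negative array index.
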

Delivery via `SendMessageAsync` is fire-and-forget (not awaited) to avoid blocking the publishing client's processing task while waiting for slow subscribers.

---

## Message Flow

```
Client sends: PUB orders.new 11\r\nhello world\r\n

1. FillPipeAsync         reads bytes from socket → PipeWriter
2. ProcessCommandsAsync  reads from PipeReader
3. NatsParser.TryParse   parses control line "PUB orders.new 11\r\n"
                         sets _awaitingPayload = true
                         on next call: reads payload + \r\n trailer
                         returns ParsedCommand { Type=Pub, Subject="orders.new", Payload=[...] }
4. DispatchCommandAsync  calls ProcessPub(cmd)
5. ProcessPub            calls Router.ProcessMessage("orders.new", null, default, payload, this)
6. NatsServer.ProcessMessage
     → _subList.Match("orders.new")
         returns SubListResult { PlainSubs=[sub1, sub2], QueueSubs=[[sub3, sub4]] }
     → DeliverMessage(sub1, ...) → sub1.Client.SendMessageAsync(...)
     → DeliverMessage(sub2, ...) → sub2.Client.SendMessageAsync(...)
     → round-robin pick from [sub3, sub4], e.g. sub3
     → DeliverMessage(sub3, ...) → sub3.Client.SendMessageAsync(...)
7. SendMessageAsync      acquires _writeLock, writes MSG frame to socket
```

---

## Go Reference Mapping

| Go source | .NET source |
|-----------|-------------|
| `server/sublist.go` | `src/NATS.Server/Subscriptions/SubList.cs` |
| `server/parser.go` | `src/NATS.Server/Protocol/NatsParser.cs` |
| `server/client.go` | `src/NATS.Server/NatsClient.cs` |
| `server/server.go` | `src/NATS.Server/NatsServer.cs` |
| `server/opts.go` | `src/NATS.Server/NatsOptions.cs` |

The Go `sublist.go` uses atomic generation counters to invalidate a result cache. The .NET `SubList` uses a different strategy: it maintains the cache under `ReaderWriterLockSlim` and does targeted invalidation at insert/remove time, avoiding the need for generation counters.

The Go `client.go` uses goroutines for `readLoop` and `writeLoop`. The .NET equivalent uses `async Task` with `System.IO.Pipelines`: `FillPipeAsync` (writer side) and `ProcessCommandsAsync` (reader side).

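
The writer-side/reader-side split can be tried in isolation. The following is a minimal sketch rather than the project's code: a `MemoryStream` stands in for the `NetworkStream`, lines are split on `\n` instead of being passed to `NatsParser`, and `ReadLinesAsync` and `TryReadLine` are hypothetical helper names. It assumes `System.IO.Pipelines` is available to the project.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Writer side: copy bytes from the source stream into the pipe.
async Task FillPipeAsync(Stream source, PipeWriter writer)
{
    while (true)
    {
        Memory<byte> memory = writer.GetMemory(4096);
        int read = await source.ReadAsync(memory);
        if (read == 0) break;                       // EOF
        writer.Advance(read);
        FlushResult flushed = await writer.FlushAsync();
        if (flushed.IsCompleted) break;             // reader has stopped
    }
    await writer.CompleteAsync();
}

// Reader side: pull buffered bytes and cut them into CRLF-terminated lines.
async Task<List<string>> ReadLinesAsync(PipeReader reader)
{
    var lines = new List<string>();
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            lines.Add(Encoding.ASCII.GetString(line.ToArray()).TrimEnd('\r'));

        // Consumed up to the last complete line; examined everything.
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted) break;
    }
    await reader.CompleteAsync();
    return lines;
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    SequencePosition? lf = buffer.PositionOf((byte)'\n');
    if (lf is null) { line = default; return false; }
    line = buffer.Slice(0, lf.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, lf.Value)); // skip past '\n'
    return true;
}

var input = new MemoryStream(Encoding.ASCII.GetBytes("PING\r\nSUB orders.* 1\r\n"));
var pipe = new Pipe();
Task fill = FillPipeAsync(input, pipe.Writer);
Task<List<string>> read = ReadLinesAsync(pipe.Reader);

// This sketch waits for both sides because the input is finite.
await Task.WhenAll(fill, read);
foreach (var l in await read) Console.WriteLine(l);
// prints:
// PING
// SUB orders.* 1
```

Passing `buffer.End` as the examined position tells the pipe that a partial line has been inspected, so the next `ReadAsync` completes only when more data arrives or the writer finishes.
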

---

## Key .NET Design Choices

| Concern | .NET choice | Reason |
|---------|-------------|--------|
| I/O buffering | `System.IO.Pipelines` (`Pipe`, `PipeReader`, `PipeWriter`) | Zero-copy buffer management; backpressure built in |
| SubList thread safety | `ReaderWriterLockSlim` | Multiple concurrent readers (match), exclusive writers (insert/remove) |
| Client registry | `ConcurrentDictionary` | Lock-free concurrent access from accept loop and cleanup tasks |
| Write serialization | `SemaphoreSlim(1,1)` per client | Prevents interleaved MSG frames from concurrent deliveries |
| Concurrency | `async/await` + `Task` | Maps Go goroutines to .NET task-based async; no dedicated threads per connection |
| Protocol constants | `NatsProtocol` static class | Pre-encoded byte arrays (`PongBytes`, `CrLf`, etc.) avoid per-call allocations |

---

## Related Documentation

- [Setup](./Setup.md)
- [Protocol Overview](../Protocol/Overview.md)
- [Subscriptions Overview](../Subscriptions/Overview.md)
- [Server Overview](../Server/Overview.md)
- [Configuration Overview](../Configuration/Overview.md)