docs: add Authentication, Clustering, JetStream, Monitoring overviews; update existing docs

New files:
- Documentation/Authentication/Overview.md — all 7 auth mechanisms with real source
  snippets (NKey/JWT/username-password/token/TLS mapping), nonce generation, account
  system, permissions, JWT permission templates
- Documentation/Clustering/Overview.md — route TCP handshake, in-process subscription
  propagation, gateway/leaf node stubs, honest gaps list
- Documentation/JetStream/Overview.md — API surface (4 handled subjects), streams,
  consumers, storage (MemStore/FileStore), in-process RAFT, mirror/source, gaps list
- Documentation/Monitoring/Overview.md — all 12 endpoints with real field tables,
  Go compatibility notes

Updated files:
- GettingStarted/Architecture.md — 14-subdirectory tree, real NatsClient/NatsServer
  field snippets, 9 new Go reference rows, Channel write queue design choice
- GettingStarted/Setup.md — xUnit 3, 100 test files grouped by area
- Operations/Overview.md — 99 test files, accurate Program.cs snippet, limitations
  section renamed to "Known Gaps vs Go Reference" with 7 real gaps
- Server/Overview.md — grouped fields, TLS/WS accept path, lame-duck mode, POSIX signals
- Configuration/Overview.md — 14 subsystem option tables, 24-row CLI table, LogOverrides
- Server/Client.md — Channel write queue, 4-task RunAsync, CommandMatrix, real fields

All docs verified against codebase 2026-02-23; 713 tests pass.
Joseph Doherty
2026-02-23 10:14:18 -05:00
parent 9efe787cab
commit e553db6d40
10 changed files with 2415 additions and 186 deletions


@@ -0,0 +1,641 @@
# Authentication Overview
`AuthService` is the single entry point for client authentication. It builds an ordered chain of authenticators from `NatsOptions` at startup and evaluates them in priority order when a client sends a `CONNECT` message. Each authenticator inspects the `ClientAuthContext` and returns an `AuthResult` on success or `null` to pass to the next authenticator in the chain.
---
## How Authentication Works
`AuthService.Build()` constructs the authenticator chain at server startup. The order matches the Go reference (see `golang/nats-server/server/auth.go`, `configureAuthentication`):
```csharp
// AuthService.cs — AuthService.Build()
public static AuthService Build(NatsOptions options)
{
    var authenticators = new List<IAuthenticator>();
    var nonceRequired = false;

    // TLS certificate mapping (highest priority when enabled)
    if (options.TlsMap && options.TlsVerify && options.Users is { Count: > 0 })
        authenticators.Add(new TlsMapAuthenticator(options.Users));

    // JWT / Operator mode
    if (options.TrustedKeys is { Length: > 0 } && options.AccountResolver is not null)
    {
        authenticators.Add(new JwtAuthenticator(options.TrustedKeys, options.AccountResolver));
        nonceRequired = true;
    }

    // Priority order: NKeys > Users > Token > SimpleUserPassword
    if (options.NKeys is { Count: > 0 })
    {
        authenticators.Add(new NKeyAuthenticator(options.NKeys));
        nonceRequired = true; // NKey clients also sign the nonce
    }
    if (options.Users is { Count: > 0 })
        authenticators.Add(new UserPasswordAuthenticator(options.Users));
    if (!string.IsNullOrEmpty(options.Authorization))
        authenticators.Add(new TokenAuthenticator(options.Authorization));
    if (!string.IsNullOrEmpty(options.Username) && !string.IsNullOrEmpty(options.Password))
        authenticators.Add(new SimpleUserPasswordAuthenticator(options.Username, options.Password));

    // Excerpt: construction and return of the AuthService are omitted here.
}
```
`NonceRequired` is set to `true` when JWT or NKey authenticators are active. The server includes a nonce in the `INFO` message before accepting `CONNECT`, so clients can sign it.
`Authenticate()` iterates the chain and returns the first non-null result. If all authenticators decline and a `NoAuthUser` is configured, it falls back to that user — but only when the client presented no credentials at all:
```csharp
// AuthService.cs — Authenticate() and IsNoCredentials()
public AuthResult? Authenticate(ClientAuthContext context)
{
    if (!IsAuthRequired)
        return new AuthResult { Identity = string.Empty };

    foreach (var authenticator in _authenticators)
    {
        var result = authenticator.Authenticate(context);
        if (result != null)
            return result;
    }

    if (_noAuthUser != null && IsNoCredentials(context))
        return ResolveNoAuthUser();
    return null;
}

private static bool IsNoCredentials(ClientAuthContext context)
{
    var opts = context.Opts;
    return string.IsNullOrEmpty(opts.Username)
        && string.IsNullOrEmpty(opts.Password)
        && string.IsNullOrEmpty(opts.Token)
        && string.IsNullOrEmpty(opts.Nkey)
        && string.IsNullOrEmpty(opts.Sig)
        && string.IsNullOrEmpty(opts.JWT);
}
```
A `null` return from `Authenticate()` causes the server to reject the connection with an `-ERR 'Authorization Violation'` message.
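The chain-plus-fallback behaviour can be illustrated with a minimal, self-contained sketch. Everything here is a hypothetical stand-in (plain delegates and credential strings instead of `IAuthenticator` and `ClientAuthContext`); it shows only the evaluation order, not the real types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins: each authenticator maps a credential string to an
// identity, or returns null to decline.
var chain = new List<Func<string?, string?>>
{
    cred => cred == "nkey:UABC" ? "UABC" : null,      // evaluated first
    cred => cred == "token:s3cret" ? "token" : null,  // evaluated second
};

// The first non-null result wins. The no-auth fallback applies only when the
// client presented no credentials at all.
string? Authenticate(string? credential, string? noAuthUser)
{
    foreach (var authenticator in chain)
    {
        var identity = authenticator(credential);
        if (identity != null)
            return identity;
    }
    return credential == null ? noAuthUser : null;
}

Console.WriteLine(Authenticate("token:s3cret", "guest"));
Console.WriteLine(Authenticate(null, "guest"));
Console.WriteLine(Authenticate("token:wrong", "guest") ?? "(rejected)");
```

A client that sends wrong credentials is rejected even though a fallback user exists, which mirrors the `IsNoCredentials` guard above.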
---
## Auth Mechanisms
### TLS certificate mapping
`TlsMapAuthenticator` maps a client's TLS certificate to a configured `User` by matching the certificate subject Distinguished Name (DN) or Common Name (CN). It is added to the chain only when `TlsMap` and `TlsVerify` are both enabled (config keys `tls.map` and `tls.verify`) and a `users` block is present.
```csharp
// TlsMapAuthenticator.cs — Authenticate()
public AuthResult? Authenticate(ClientAuthContext context)
{
    var cert = context.ClientCertificate;
    if (cert == null)
        return null;

    var dn = cert.SubjectName;
    var dnString = dn.Name; // RFC 2253 format

    // Try exact DN match first
    if (_usersByDn.TryGetValue(dnString, out var user))
        return BuildResult(user);

    // Try CN extraction
    var cn = ExtractCn(dn);
    if (cn != null && _usersByCn.TryGetValue(cn, out user))
        return BuildResult(user);
    return null;
}

private static string? ExtractCn(X500DistinguishedName dn)
{
    foreach (var rdn in dn.Name.Split(',', StringSplitOptions.TrimEntries))
    {
        if (rdn.StartsWith("CN=", StringComparison.OrdinalIgnoreCase))
            return rdn[3..];
    }
    return null;
}
```
CN extraction splits the RFC 2253 DN string on commas and looks for the `CN=` attribute. The username in the `users` block must match either the full DN or the CN value.
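A plain split on commas misreads a CN whose value itself contains a comma, which DN strings can carry either backslash-escaped or inside double quotes. The following is a minimal sketch of an escape-aware variant, not the code in `TlsMapAuthenticator`; `SplitRdns` and `ExtractCnEscaped` are hypothetical names, and hex escapes such as `\2C` are not handled.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Split a DN string on commas that are neither escaped nor quoted.
List<string> SplitRdns(string dn)
{
    var parts = new List<string>();
    var current = new StringBuilder();
    var inQuotes = false;
    for (var i = 0; i < dn.Length; i++)
    {
        var c = dn[i];
        if (c == '\\' && i + 1 < dn.Length)
            current.Append(dn[++i]);          // keep the escaped char, drop the backslash
        else if (c == '"')
            inQuotes = !inQuotes;             // quotes delimit a value; do not copy them
        else if (c == ',' && !inQuotes)
        {
            parts.Add(current.ToString().Trim());
            current.Clear();
        }
        else
            current.Append(c);
    }
    parts.Add(current.ToString().Trim());
    return parts;
}

string? ExtractCnEscaped(string dn)
{
    foreach (var rdn in SplitRdns(dn))
    {
        if (rdn.StartsWith("CN=", StringComparison.OrdinalIgnoreCase))
            return rdn[3..];
    }
    return null;
}

Console.WriteLine(ExtractCnEscaped(@"CN=Doe\, Jane, O=Example"));
Console.WriteLine(ExtractCnEscaped("O=Example, CN=\"Doe, Jane\""));
```

Both calls yield `Doe, Jane`, whereas a naive split would return `Doe\` or `"Doe`.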
### JWT / Operator mode
`JwtAuthenticator` validates client JWTs in operator mode. The server is configured with one or more trusted operator NKey public keys and an `IAccountResolver` that maps account NKey public keys to account JWTs. Validation runs nine steps before authentication succeeds; the excerpt below shows steps 1-7:
```csharp
// JwtAuthenticator.cs — Authenticate() (steps 1-7)
// Excerpt: jwt, issuerAccount, userIssuer, userNkey, and revocations are
// assigned in code that is not shown here.
var userClaims = NatsJwt.DecodeUserClaims(jwt);              // 1. Decode user JWT
if (userClaims.IsExpired()) return null;                     // 2. Check expiry

var accountJwt = _resolver.FetchAsync(issuerAccount)         // 3. Resolve account JWT
    .GetAwaiter().GetResult();
var accountClaims = NatsJwt.DecodeAccountClaims(accountJwt);

if (!IsTrusted(accountClaims.Issuer)) return null;           // 4. Account issuer must be trusted operator

// 5. User JWT must be issued by the account or one of its signing keys
if (userIssuer != accountClaims.Subject)
{
    var signingKeys = accountClaims.Nats?.SigningKeys;
    if (signingKeys is null || !signingKeys.Contains(userIssuer))
        return null;
}

if (!userClaims.BearerToken)                                 // 6. Verify nonce signature
{
    if (!NatsJwt.VerifyNonce(context.Nonce, context.Opts.Sig, userNkey))
        return null;
}

if (revocations.TryGetValue(userClaims.Subject, out var revokedAt)) // 7. Revocation check
    if (userClaims.IssuedAt <= revokedAt) return null;
```
The `IAccountResolver` interface decouples JWT storage from the authenticator. `MemAccountResolver` covers tests and simple single-operator deployments; production deployments can supply a resolver backed by a URL or directory:
```csharp
// AccountResolver.cs
public sealed class MemAccountResolver : IAccountResolver
{
    private readonly ConcurrentDictionary<string, string> _accounts = new(StringComparer.Ordinal);

    public Task<string?> FetchAsync(string accountNkey)
    {
        _accounts.TryGetValue(accountNkey, out var jwt);
        return Task.FromResult(jwt);
    }

    public Task StoreAsync(string accountNkey, string jwt)
    {
        _accounts[accountNkey] = jwt;
        return Task.CompletedTask;
    }
}
```
`NatsJwt.Decode()` splits the token into header, payload, and signature segments and uses `System.Text.Json` to deserialize them. All NATS JWTs use the `ed25519-nkey` algorithm and start with `eyJ` (base64url for `{"`).
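The split-and-deserialize step can be sketched in isolation. This is an illustration under assumptions, not the body of `NatsJwt.Decode()`: the helper names `FromBase64Url` and `ToBase64Url` are hypothetical, the demo token is unsigned, and no signature verification is attempted.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Decode one unpadded base64url segment into bytes.
byte[] FromBase64Url(string segment)
{
    var s = segment.Replace('-', '+').Replace('_', '/');
    s = s.PadRight(s.Length + (4 - s.Length % 4) % 4, '=');
    return Convert.FromBase64String(s);
}

// Encode JSON text as an unpadded base64url segment (used only to build the demo token).
string ToBase64Url(string json) =>
    Convert.ToBase64String(Encoding.UTF8.GetBytes(json))
        .TrimEnd('=').Replace('+', '-').Replace('/', '_');

// Demo token: real header and payload JSON, placeholder signature segment.
var token = string.Join('.',
    ToBase64Url("{\"typ\":\"JWT\",\"alg\":\"ed25519-nkey\"}"),
    ToBase64Url("{\"sub\":\"UDEMO\",\"name\":\"alice\"}"),
    "c2ln");

var parts = token.Split('.');
if (parts.Length != 3)
    throw new FormatException("expected header.payload.signature");

using var header = JsonDocument.Parse(FromBase64Url(parts[0]));
using var payload = JsonDocument.Parse(FromBase64Url(parts[1]));

var alg = header.RootElement.GetProperty("alg").GetString();
var name = payload.RootElement.GetProperty("name").GetString();
var startsWithEyJ = token.StartsWith("eyJ", StringComparison.Ordinal);

Console.WriteLine($"{alg} {name} {startsWithEyJ}");
```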
### NKey
`NKeyAuthenticator` performs Ed25519 public-key authentication without a JWT. The client sends its public NKey and a base64-encoded signature of the server nonce. The server verifies the signature using the `NATS.NKeys` library:
```csharp
// NKeyAuthenticator.cs — Authenticate()
public AuthResult? Authenticate(ClientAuthContext context)
{
    var clientNkey = context.Opts.Nkey;
    if (string.IsNullOrEmpty(clientNkey)) return null;
    if (!_nkeys.TryGetValue(clientNkey, out var nkeyUser)) return null;

    // Assumed from the rest of this document: the signature arrives in the CONNECT `sig` field
    var clientSig = context.Opts.Sig;
    if (string.IsNullOrEmpty(clientSig)) return null;

    // Decode base64 signature (handle both standard and URL-safe base64)
    byte[] sigBytes;
    try { sigBytes = Convert.FromBase64String(clientSig); }
    catch (FormatException)
    {
        var padded = clientSig.Replace('-', '+').Replace('_', '/');
        padded = padded.PadRight(padded.Length + (4 - padded.Length % 4) % 4, '=');
        sigBytes = Convert.FromBase64String(padded);
    }

    var kp = KeyPair.FromPublicKey(clientNkey);
    if (!kp.Verify(context.Nonce, sigBytes)) return null;

    return new AuthResult { Identity = clientNkey, AccountName = nkeyUser.Account,
        Permissions = nkeyUser.Permissions };
}
```
The signature fallback handles both URL-safe and standard base64 encoding because different NATS client libraries encode signatures differently.
### Username/password — multi-user
`UserPasswordAuthenticator` handles the `users` block where multiple username/password pairs are defined. It supports both plain-text and bcrypt-hashed passwords. The `$2` prefix detection matches the Go server's `isBcrypt()` function:
```csharp
// UserPasswordAuthenticator.cs — ComparePasswords()
private static bool ComparePasswords(string serverPassword, string clientPassword)
{
    if (IsBcrypt(serverPassword))
    {
        try { return BCrypt.Net.BCrypt.Verify(clientPassword, serverPassword); }
        catch { return false; }
    }

    var serverBytes = Encoding.UTF8.GetBytes(serverPassword);
    var clientBytes = Encoding.UTF8.GetBytes(clientPassword);
    return CryptographicOperations.FixedTimeEquals(serverBytes, clientBytes);
}

private static bool IsBcrypt(string password) => password.StartsWith("$2");
```
Plain-text passwords use `CryptographicOperations.FixedTimeEquals` to prevent timing attacks. Bcrypt hashes are prefixed with `$2a$`, `$2b$`, or `$2y$` depending on the variant.
### Username/password — single user
`SimpleUserPasswordAuthenticator` covers the common case of a single `user`/`password` pair in the server config. It applies constant-time comparison for both the username and password:
```csharp
// SimpleUserPasswordAuthenticator.cs — Authenticate()
public AuthResult? Authenticate(ClientAuthContext context)
{
    // Assumed: the username is read from the CONNECT options like the password below
    var clientUsername = context.Opts.Username ?? string.Empty;
    var clientUsernameBytes = Encoding.UTF8.GetBytes(clientUsername);
    if (!CryptographicOperations.FixedTimeEquals(clientUsernameBytes, _expectedUsername))
        return null;

    var clientPassword = context.Opts.Password ?? string.Empty;
    if (!ComparePasswords(_serverPassword, clientPassword))
        return null;

    return new AuthResult { Identity = clientUsername };
}
```
Comparing the username in constant time prevents an attacker from using response timing to enumerate valid usernames even before the password check.
### Token
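`TokenAuthenticator` matches a single opaque authorization token against the `authorization` config key. The comparison uses `CryptographicOperations.FixedTimeEquals`, which does not leak how many leading bytes matched. It does return `false` immediately when the two lengths differ, so the token's length is not hidden: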
```csharp
// TokenAuthenticator.cs
public sealed class TokenAuthenticator : IAuthenticator
{
    private readonly byte[] _expectedToken;

    public TokenAuthenticator(string token)
    {
        _expectedToken = Encoding.UTF8.GetBytes(token);
    }

    public AuthResult? Authenticate(ClientAuthContext context)
    {
        var clientToken = context.Opts.Token;
        if (string.IsNullOrEmpty(clientToken)) return null;

        var clientBytes = Encoding.UTF8.GetBytes(clientToken);
        if (!CryptographicOperations.FixedTimeEquals(clientBytes, _expectedToken))
            return null;

        return new AuthResult { Identity = "token" };
    }
}
```
The token authenticator does not associate an account or permissions — those must be managed at the server level.
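`CryptographicOperations.FixedTimeEquals` is constant-time only for inputs of equal length. If hiding the secret's length matters, one common approach, shown here as an alternative sketch and not as what `TokenAuthenticator` does, is to compare fixed-size digests instead of the raw bytes. `TokensMatch` is a hypothetical helper name.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hash both inputs first so the compared buffers are always 32 bytes long.
bool TokensMatch(string expected, string presented)
{
    var expectedHash = SHA256.HashData(Encoding.UTF8.GetBytes(expected));
    var presentedHash = SHA256.HashData(Encoding.UTF8.GetBytes(presented));
    return CryptographicOperations.FixedTimeEquals(expectedHash, presentedHash);
}

Console.WriteLine(TokensMatch("s3cret", "s3cret"));
Console.WriteLine(TokensMatch("s3cret", "s3"));
```

The trade-off is one extra hash per attempt; the hashing time still depends on the presented input's length, but not on the configured secret's.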
### No-auth user fallback
When `NoAuthUser` is set in `NatsOptions`, clients that present no credentials at all (no username, password, token, NKey, signature, or JWT) are mapped to that named user. The fallback only applies after all authenticators have declined. The resolution pulls the user's permissions and account assignment from the `users` map built during `Build()`:
```csharp
// AuthService.cs — ResolveNoAuthUser()
private AuthResult? ResolveNoAuthUser()
{
    if (_noAuthUser == null) return null;

    if (_usersMap != null && _usersMap.TryGetValue(_noAuthUser, out var user))
    {
        return new AuthResult
        {
            Identity = user.Username,
            AccountName = user.Account,
            Permissions = user.Permissions,
            Expiry = user.ConnectionDeadline,
        };
    }

    return new AuthResult { Identity = _noAuthUser };
}
```
This pattern lets an operator define one permissive "guest" user and one or more restricted named users without requiring every client to authenticate explicitly.
---
## Nonce Generation
When `NonceRequired` is `true`, the server generates a nonce before sending `INFO` and includes it in the `nonce` field. The client must sign this nonce with its private key and return the signature in the `sig` field of `CONNECT`.
The nonce is 11 random bytes encoded as URL-safe base64 with padding removed. Standard base64 turns 11 bytes into 16 characters, the last of which is `=`; trimming it leaves 15 characters on the wire:
```csharp
// AuthService.cs — GenerateNonce() and EncodeNonce()
public byte[] GenerateNonce()
{
    Span<byte> raw = stackalloc byte[11];
    RandomNumberGenerator.Fill(raw);
    return raw.ToArray();
}

public string EncodeNonce(byte[] nonce)
{
    return Convert.ToBase64String(nonce)
        .TrimEnd('=')
        .Replace('+', '-')
        .Replace('/', '_');
}
```
The raw nonce bytes, not the base64 string, are stored in `ClientAuthContext.Nonce`. The nonce travels on the wire as base64 text, but NKey and JWT signature verification run against the original bytes.
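The length arithmetic is easy to check directly. This standalone sketch repeats the same encoding steps on a fresh 11-byte buffer and then reverses them; the variable names are illustrative only.

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;

var raw = new byte[11];
RandomNumberGenerator.Fill(raw);

// 11 bytes -> 16 base64 characters, exactly one of which is '=' padding.
var padded = Convert.ToBase64String(raw);

// Trim the padding and switch to the URL-safe alphabet -> 15 characters.
var wire = padded.TrimEnd('=').Replace('+', '-').Replace('/', '_');

// Reverse the transformation to recover the original bytes.
var restored = wire.Replace('-', '+').Replace('_', '/');
restored = restored.PadRight(restored.Length + (4 - restored.Length % 4) % 4, '=');
var roundTrip = Convert.FromBase64String(restored);

Console.WriteLine($"{padded.Length} -> {wire.Length}, round trip ok: {roundTrip.SequenceEqual(raw)}");
```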
---
## The Account System
Every authenticated connection belongs to an `Account`. Accounts provide subject namespace isolation: each `Account` owns a dedicated `SubList`, so messages published within one account never reach subscribers in another unless an explicit export/import is configured.
```csharp
// Account.cs
public sealed class Account : IDisposable
{
    public const string GlobalAccountName = "$G";

    public string Name { get; }
    public SubList SubList { get; } = new();
    public int MaxConnections { get; set; }       // 0 = unlimited
    public int MaxSubscriptions { get; set; }     // 0 = unlimited
    public int MaxJetStreamStreams { get; set; }  // 0 = unlimited
    public ExportMap Exports { get; } = new();
    public ImportMap Imports { get; } = new();
}
```
The `$G` (Global) account is the default when no multi-account configuration is present. All clients that authenticate without an explicit account name join `$G`.
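Resource limits are checked against counters at connection and subscription time. The increments are atomic, but each check and its increment are separate steps, so concurrent callers can briefly overshoot a limit: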
```csharp
// Account.cs — AddClient() and IncrementSubscriptions()
public bool AddClient(ulong clientId)
{
    if (MaxConnections > 0 && _clients.Count >= MaxConnections)
        return false;
    _clients[clientId] = 0;
    return true;
}

public bool IncrementSubscriptions()
{
    if (MaxSubscriptions > 0 && Volatile.Read(ref _subscriptionCount) >= MaxSubscriptions)
        return false;
    Interlocked.Increment(ref _subscriptionCount);
    return true;
}

public bool TryReserveStream()
{
    if (MaxJetStreamStreams > 0 && Volatile.Read(ref _jetStreamStreamCount) >= MaxJetStreamStreams)
        return false;
    Interlocked.Increment(ref _jetStreamStreamCount);
    return true;
}
```
`AddClient` checks the limit before inserting. The `_clients` dictionary uses `ulong` client IDs as keys so `ClientCount` reflects only the current live connections for that account.
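In the methods above the limit check and the increment are separate operations, so two threads can both pass the check before either increments. If a strict cap is wanted, a compare-and-swap loop is one way to get it. This is a sketch of that alternative, not the code in `Account.cs`; `TryIncrement` and the local counter are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

const int maxSubscriptions = 100;
var subscriptionCount = 0;

// Reserve a slot only if the counter is still below the limit at the moment
// of the swap; retry when another thread changed it in between.
bool TryIncrement()
{
    while (true)
    {
        var current = Volatile.Read(ref subscriptionCount);
        if (current >= maxSubscriptions)
            return false;
        if (Interlocked.CompareExchange(ref subscriptionCount, current + 1, current) == current)
            return true;
    }
}

// 1000 concurrent attempts against a limit of 100.
var granted = 0;
Parallel.For(0, 1000, _ =>
{
    if (TryIncrement())
        Interlocked.Increment(ref granted);
});

Console.WriteLine($"{granted} granted, counter = {subscriptionCount}");
```

With the compare-and-swap loop the counter can never exceed the limit, at the cost of occasional retries under contention.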
User revocation is per-account and supports both individual user NKeys and a `"*"` wildcard that revokes all users issued before a given timestamp:
```csharp
// Account.cs — IsUserRevoked()
public bool IsUserRevoked(string userNkey, long issuedAt)
{
    if (_revokedUsers.TryGetValue(userNkey, out var revokedAt))
        return issuedAt <= revokedAt;
    if (_revokedUsers.TryGetValue("*", out revokedAt))
        return issuedAt <= revokedAt;
    return false;
}
```
Exports and imports allow subjects to be shared between accounts. A service export makes a set of subjects callable by other accounts; a stream export allows subscriptions. Imports wire an external subject into the current account's namespace under a local alias. Both directions enforce authorization at configuration time via `ExportAuth.IsAuthorized()`.
---
## Permissions
`Permissions` defines per-client publish and subscribe rules via `SubjectPermission` allow/deny lists and an optional `ResponsePermission` for request-reply:
```csharp
// Permissions.cs
public sealed class Permissions
{
    public SubjectPermission? Publish { get; init; }
    public SubjectPermission? Subscribe { get; init; }
    public ResponsePermission? Response { get; init; }
}

public sealed class SubjectPermission
{
    public IReadOnlyList<string>? Allow { get; init; }
    public IReadOnlyList<string>? Deny { get; init; }
}

public sealed class ResponsePermission
{
    public int MaxMsgs { get; init; }
    public TimeSpan Expires { get; init; }
}
```
`ClientPermissions.Build()` compiles these into `PermissionSet` instances backed by `SubList` tries, so wildcard patterns in allow/deny lists are matched using the same trie that handles subscriptions. `PermissionSet.IsAllowed()` evaluates allow first, then deny:
```csharp
// ClientPermissions.cs — PermissionSet.IsAllowed()
public bool IsAllowed(string subject)
{
    bool allowed = true;
    if (_allow != null)
    {
        var result = _allow.Match(subject);
        allowed = result.PlainSubs.Length > 0 || result.QueueSubs.Length > 0;
    }
    if (allowed && _deny != null)
    {
        var result = _deny.Match(subject);
        allowed = result.PlainSubs.Length == 0 && result.QueueSubs.Length == 0;
    }
    return allowed;
}
```
A subject is allowed when it matches the allow list (or no allow list exists) and does not match any deny entry. Deny rules take precedence over allow rules when both match.
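The allow-then-deny rule can be tried out with a small standalone sketch. It replaces the `SubList` trie with a linear scan and a hypothetical `Matches` helper that implements NATS wildcard semantics (`*` for one token, `>` for one or more trailing tokens), so it illustrates the decision logic only.

```csharp
using System;
using System.Linq;

// Token-wise subject match: '*' matches exactly one token, '>' matches the rest.
bool Matches(string pattern, string subject)
{
    var p = pattern.Split('.');
    var s = subject.Split('.');
    for (var i = 0; i < p.Length; i++)
    {
        if (p[i] == ">")
            return i < s.Length;   // '>' needs at least one remaining token
        if (i >= s.Length)
            return false;
        if (p[i] != "*" && p[i] != s[i])
            return false;
    }
    return p.Length == s.Length;
}

// Same order as PermissionSet.IsAllowed(): allow list first, then deny list.
bool IsAllowed(string subject, string[]? allowPatterns, string[]? denyPatterns)
{
    var allowed = allowPatterns == null || allowPatterns.Any(a => Matches(a, subject));
    if (allowed && denyPatterns != null)
        allowed = !denyPatterns.Any(d => Matches(d, subject));
    return allowed;
}

var allowList = new[] { "orders.>" };
var denyList = new[] { "orders.internal.*" };

Console.WriteLine(IsAllowed("orders.eu.created", allowList, denyList));     // allowed
Console.WriteLine(IsAllowed("orders.internal.audit", allowList, denyList)); // denied by deny list
Console.WriteLine(IsAllowed("billing.created", allowList, denyList));       // not in allow list
```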
### PermissionLruCache
Permission checks happen on every `PUB` and `SUB` command. To avoid a `SubList.Match()` call on every message, `ClientPermissions` maintains a `PermissionLruCache` per client for publish results:
```csharp
// PermissionLruCache.cs
public sealed class PermissionLruCache
{
    private readonly int _capacity;
    private readonly Dictionary<string, LinkedListNode<(string Key, bool Value)>> _map;
    private readonly LinkedList<(string Key, bool Value)> _list = new();

    public void Set(string key, bool value)
    {
        if (_map.Remove(key, out var existing))
        {
            // Replace an existing entry instead of leaving a stale node in the list
            _list.Remove(existing);
        }
        else if (_map.Count >= _capacity)
        {
            var last = _list.Last!;
            _map.Remove(last.Value.Key);
            _list.RemoveLast();
        }

        var node = new LinkedListNode<(string Key, bool Value)>((key, value));
        _list.AddFirst(node);
        _map[key] = node;
    }
}
```
The default capacity is 128, matching the Go server (`maxPermCacheSize = 128` in `client.go`). Cache hits move the node to the front of the linked list; eviction removes the tail node. The cache is per-client and lock-protected, so contention is low.
Dynamic reply subjects bypass the cache. When a request carrying a reply subject is delivered to a client, that subject is registered in `ResponseTracker`, and publishes to it bypass the deny check for the configured window.
### ResponseTracker
`ResponseTracker` maintains the set of reply subjects a client is temporarily permitted to publish to. This enables request-reply patterns for clients whose `Publish` permission list does not include the auto-generated `_INBOX.*` subject:
```csharp
// ResponseTracker.cs — IsReplyAllowed()
public bool IsReplyAllowed(string subject)
{
    lock (_lock)
    {
        if (!_replies.TryGetValue(subject, out var entry))
            return false;

        if (_expires > TimeSpan.Zero && DateTime.UtcNow - entry.RegisteredAt > _expires)
        {
            _replies.Remove(subject);
            return false;
        }

        var newCount = entry.Count + 1;
        if (_maxMsgs > 0 && newCount > _maxMsgs)
        {
            _replies.Remove(subject);
            return false;
        }

        _replies[subject] = (entry.RegisteredAt, newCount);
        return true;
    }
}
```
Each entry tracks registration time and message count. Entries expire by TTL (`_expires`), by message count (`_maxMsgs`), or both. `Prune()` can be called periodically to evict stale entries without waiting for an access attempt.
---
## JWT Permission Templates
When a user connects via JWT, permission subjects can contain mustache-style template expressions that are expanded using claim values from the user and account JWTs. This allows a single JWT template to scope permissions to specific tenants or user identities without issuing unique JWTs for every user.
`PermissionTemplates.Expand()` handles the expansion for a single pattern. When a template expression resolves to multiple values (e.g., a user with two `dept:` tags), the cartesian product of all expansions is computed:
```csharp
// PermissionTemplates.cs — Expand()
public static List<string> Expand(
    string pattern, string name, string subject,
    string accountName, string accountSubject,
    string[] userTags, string[] accountTags)
{
    var matches = TemplateRegex().Matches(pattern);
    if (matches.Count == 0)
        return [pattern];

    // Excerpt: building `replacements` (placeholder -> values) from `matches` is omitted here.

    // Compute cartesian product across all multi-value replacements
    var results = new List<string> { pattern };
    foreach (var (placeholder, values) in replacements)
    {
        var next = new List<string>();
        foreach (var current in results)
            foreach (var value in values)
                next.Add(current.Replace(placeholder, value));
        results = next;
    }
    return results;
}
```
Supported template functions:
| Expression | Resolves to |
|---|---|
| `{{name()}}` | User's `name` claim |
| `{{subject()}}` | User's NKey public key (`sub` claim) |
| `{{tag(tagname)}}` | All user tag values for `tagname:` prefix (multi-value) |
| `{{account-name()}}` | Account's `name` claim |
| `{{account-subject()}}` | Account's NKey public key |
| `{{account-tag(tagname)}}` | All account tag values for `tagname:` prefix (multi-value) |
If a tag expression matches no tags, the entire pattern is dropped from the result list (returns empty), not expanded to an empty string. This prevents accidental wildcard grants when a user lacks the expected tag.
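A worked example makes the expansion concrete. The sketch below hard-codes the placeholder-to-values mapping for one hypothetical user instead of parsing the pattern with a regex, so it illustrates the cartesian-product step and the drop-on-empty rule rather than reproducing `PermissionTemplates`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical inputs: a permission pattern and one user's claims.
var pattern = "dept.{{tag(dept)}}.user.{{name()}}";
var userName = "alice";
var userTags = new[] { "dept:sales", "dept:ops", "region:eu" };

// Values for a tag placeholder: every tag with the given prefix, prefix stripped.
string[] TagValues(string tagName) =>
    userTags.Where(t => t.StartsWith(tagName + ":", StringComparison.Ordinal))
            .Select(t => t[(tagName.Length + 1)..])
            .ToArray();

var replacements = new List<(string Placeholder, string[] Values)>
{
    ("{{name()}}", new[] { userName }),
    ("{{tag(dept)}}", TagValues("dept")),
};

// Cartesian product: each placeholder multiplies the current result set.
// A placeholder with no values empties the list, dropping the pattern.
var results = new List<string> { pattern };
foreach (var (placeholder, values) in replacements)
{
    if (!pattern.Contains(placeholder, StringComparison.Ordinal))
        continue;
    results = results
        .SelectMany(current => values.Select(v => current.Replace(placeholder, v)))
        .ToList();
}

foreach (var subject in results)
    Console.WriteLine(subject);
```

For this user the pattern expands to `dept.sales.user.alice` and `dept.ops.user.alice`; a user with no `dept:` tag would get no subjects at all.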
`JwtAuthenticator` calls `PermissionTemplates.ExpandAll()` after decoding the user JWT, before constructing the `Permissions` object that goes into `AuthResult`.
---
## AuthResult
`AuthResult` carries the outcome of a successful authentication. All fields are init-only; `AuthResult` is produced by authenticators and consumed by the server when the connection is accepted.
```csharp
// AuthResult.cs
public sealed class AuthResult
{
    public required string Identity { get; init; }
    public string? AccountName { get; init; }
    public Permissions? Permissions { get; init; }
    public DateTimeOffset? Expiry { get; init; }
    public int MaxJetStreamStreams { get; init; }
    public string? JetStreamTier { get; init; }
}
```
| Field | Purpose |
|---|---|
| `Identity` | Human-readable identifier for the client (username, NKey public key, or `"token"`) |
| `AccountName` | The account this client belongs to. `null` falls back to `$G`. |
| `Permissions` | Publish/subscribe/response restrictions. `null` means unrestricted. |
| `Expiry` | When the connection should be terminated. `null` means no expiry. Derived from JWT `exp` or `User.ConnectionDeadline`. |
| `MaxJetStreamStreams` | Maximum JetStream streams this client's account may create. `0` means unlimited. Set by JWT account claims. |
| `JetStreamTier` | JetStream resource tier from the account JWT. Informational; used for multi-tier deployments. |
---
## Configuration
These `NatsOptions` fields control authentication. Unless a default is noted, a field's zero value disables the corresponding mechanism.
| `NatsOptions` field | NATS config key | Description |
|---|---|---|
| `Username` | `authorization.user` | Single username |
| `Password` | `authorization.password` | Single password (plain or bcrypt) |
| `Authorization` | `authorization.token` | Opaque auth token |
| `Users` | `authorization.users` | Multi-user list with per-user permissions |
| `NKeys` | `authorization.nkeys` | NKey user list |
| `NoAuthUser` | `authorization.no_auth_user` | Fallback user for unauthenticated clients |
| `AuthTimeout` | `authorization.timeout` | Seconds allowed for the client to send `CONNECT` (default 2s) |
| `TrustedKeys` | `operator` | Operator NKey public keys for JWT mode |
| `AccountResolver` | _(programmatic)_ | `IAccountResolver` implementation for JWT account lookups |
| `TlsVerify` | `tls.verify` | Require client TLS certificates |
| `TlsMap` | `tls.map` | Map TLS certificate subject to user |
| `Accounts` | `accounts` | Per-account limits (`MaxConnections`, `MaxSubscriptions`) |
Bcrypt-hashed passwords are stored in config as the full bcrypt string (e.g., `$2b$11$...`). The server detects the `$2` prefix and delegates to `BCrypt.Net.BCrypt.Verify()`.
---
## Related Documentation
- [Server Overview](../Server/Overview.md)
- [Configuration Overview](../Configuration/Overview.md)
- [Subscriptions Overview](../Subscriptions/Overview.md)
- [SubList](../Subscriptions/SubList.md)
<!-- Last verified against codebase: 2026-02-23 -->


@@ -0,0 +1,292 @@
# Clustering Overview
This document describes how clustering is implemented in the .NET NATS server port. The Go reference server supports three distinct connection types for clustering: routes, gateways, and leaf nodes. This implementation has partial route support and stub managers for gateways and leaf nodes.
---
## Cluster Topology
The Go reference server uses three connection types, each serving a different topological purpose:
| Connection Type | Default Port | Go Reference | .NET Status |
|----------------|-------------|--------------|-------------|
| Routes | 6222 | Full-mesh TCP connections between servers in a cluster; propagate subscriptions via `RS+`/`RS-` wire protocol; route messages with `RMSG` | TCP handshake and in-process subscription propagation only; no `RMSG`, no `RS+`/`RS-` wire protocol |
| Gateways | 7222 | Inter-cluster bridges with interest-only optimization; reply subject remapping via `_GR_.` prefix | Stub only: `GatewayManager.StartAsync` logs and returns |
| Leaf Nodes | 7422 | Hub-and-spoke edge connections; only subscribed subjects shared with hub | Stub only: `LeafNodeManager.StartAsync` logs and returns |
---
## Routes
### What the Go reference does
In the Go server, routes form a full-mesh TCP connection pool between every pair of cluster peers. Three mechanisms are involved:
- `RS+`/`RS-` — subscribe/unsubscribe propagation so every server knows the full interest set of all peers
- `RMSG` — actual message forwarding when a publisher's server does not locally hold all matching subscribers
- Route pooling — the Go server maintains 3 TCP connections per peer by default to parallelize traffic
Subscription information flows over the wire using the `RS+`/`RS-` protocol, and messages flow over the wire using `RMSG`. This means a client connected to server A can receive a message published on server B without any shared memory.
### What this implementation does
This implementation establishes real TCP connections between route peers and completes a handshake, but subscription propagation happens entirely in-process via a static `ConcurrentDictionary<string, RouteManager>`. Messages are never forwarded over the wire. This means clustering only works when all servers share the same process — which is a test/development topology, not a production one.
### RouteManager
`RouteManager` (`src/NATS.Server/Routes/RouteManager.cs`) owns the listener socket and the set of active `RouteConnection` instances. It also holds the process-wide registry of all `RouteManager` instances, which is the mechanism used for in-process subscription propagation.
**`AcceptLoopAsync`** — accepts inbound TCP connections from peers:
```csharp
private async Task AcceptLoopAsync(CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        Socket socket;
        try
        {
            socket = await _listener!.AcceptAsync(ct);
        }
        catch (OperationCanceledException) { break; }
        catch (ObjectDisposedException) { break; }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Route accept loop error");
            break;
        }
        _ = Task.Run(() => HandleInboundRouteAsync(socket, ct), ct);
    }
}
```
**`ConnectToRouteWithRetryAsync`** — dials each configured seed route with a fixed 250 ms backoff between attempts:
```csharp
private async Task ConnectToRouteWithRetryAsync(string route, CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        try
        {
            var endPoint = ParseRouteEndpoint(route);
            var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
            var connection = new RouteConnection(socket);
            await connection.PerformOutboundHandshakeAsync(_serverId, ct);
            Register(connection);
            return;
        }
        catch (OperationCanceledException) { return; }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Failed to connect route seed {Route}", route);
        }

        try { await Task.Delay(250, ct); }
        catch (OperationCanceledException) { return; }
    }
}
```
The 250 ms delay is fixed — there is no exponential backoff.
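For comparison, a capped exponential schedule could be computed as in the sketch below. This is not part of `RouteManager`; `BackoffDelay`, the 250 ms base, and the 5 s cap are illustrative assumptions.

```csharp
using System;

// Delay for a zero-based attempt number: baseDelay * 2^attempt, capped at maxDelay.
TimeSpan BackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
{
    var exponent = Math.Min(attempt, 20); // bound the shift so the multiplication cannot overflow here
    var ticks = baseDelay.Ticks * (1L << exponent);
    return TimeSpan.FromTicks(Math.Min(ticks, maxDelay.Ticks));
}

for (var attempt = 0; attempt < 7; attempt++)
{
    var delay = BackoffDelay(attempt, TimeSpan.FromMilliseconds(250), TimeSpan.FromSeconds(5));
    Console.WriteLine($"attempt {attempt}: {delay.TotalMilliseconds} ms");
}
```

The delays grow 250, 500, 1000, 2000, 4000 ms and then stay at the 5000 ms cap.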
### PropagateLocalSubscription
When a client on the local server subscribes, `NatsServer` calls `RouteManager.PropagateLocalSubscription`. This does not send any bytes over TCP. Instead, it looks up peer `RouteManager` instances from the static `Managers` dictionary and calls `ReceiveRemoteSubscription` directly on each one:
```csharp
public void PropagateLocalSubscription(string subject, string? queue)
{
    if (_connectedServerIds.IsEmpty)
        return;

    var remoteSub = new RemoteSubscription(subject, queue, _serverId);
    foreach (var peerId in _connectedServerIds.Keys)
    {
        if (Managers.TryGetValue(peerId, out var peer))
            peer.ReceiveRemoteSubscription(remoteSub);
    }
}
```
`RemoteSubscription` is a record: `record RemoteSubscription(string Subject, string? Queue, string RouteId)`. The receiving manager calls `_remoteSubSink(sub)`, which is wired to `SubList.AddRemoteSubscription` in `NatsServer`.
This design means subscription propagation works only when peer servers run in the same .NET process. No subscription state is exchanged over the TCP connection.
### RouteConnection handshake
`RouteConnection` (`src/NATS.Server/Routes/RouteConnection.cs`) wraps a `Socket` and `NetworkStream`. The handshake is a single line exchange in both directions: `ROUTE <serverId>\r\n`. The initiating side sends first, then reads; the accepting side reads first, then sends.
```csharp
public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
{
    await WriteLineAsync($"ROUTE {serverId}", ct);
    var line = await ReadLineAsync(ct);
    RemoteServerId = ParseHandshake(line);
}

public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct)
{
    var line = await ReadLineAsync(ct);
    RemoteServerId = ParseHandshake(line);
    await WriteLineAsync($"ROUTE {serverId}", ct);
}
```
`ParseHandshake` validates that the line starts with `"ROUTE "` (case-insensitive) and extracts the server ID from `line[6..]`. An empty or missing ID throws `InvalidOperationException`.
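The validation described above can be sketched as follows. This is a reconstruction from the prose, not the actual source; in particular the `Trim()` call and the exception messages are assumptions.

```csharp
using System;

// Accept "ROUTE <serverId>" (prefix matched case-insensitively) and return the id.
string ParseHandshake(string line)
{
    if (!line.StartsWith("ROUTE ", StringComparison.OrdinalIgnoreCase))
        throw new InvalidOperationException("Invalid route handshake");

    var id = line[6..].Trim(); // assumption: surrounding whitespace is ignored
    if (id.Length == 0)
        throw new InvalidOperationException("Route handshake missing server id");
    return id;
}

Console.WriteLine(ParseHandshake("ROUTE server-a"));
Console.WriteLine(ParseHandshake("route server-b"));
```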
This handshake is not compatible with the Go server's route protocol, which sends a JSON `INFO` block and processes `CONNECT` options.
### WaitUntilClosedAsync
After the handshake completes and the connection is registered, `RouteManager` calls `WaitUntilClosedAsync` on a background task. This reads from the socket in a loop and discards all bytes, returning only when the remote end closes the connection (zero-byte read):
```csharp
public async Task WaitUntilClosedAsync(CancellationToken ct)
{
    var buffer = new byte[1024];
    while (!ct.IsCancellationRequested)
    {
        var bytesRead = await _stream.ReadAsync(buffer, ct);
        if (bytesRead == 0)
            return;
    }
}
```
Because no messages are ever sent over a route connection after the handshake, this is the entire post-handshake read loop.
### Deduplication
Duplicate route connections are prevented in `Register`. The deduplication key combines the remote server ID and the remote TCP endpoint:
```csharp
private void Register(RouteConnection route)
{
var key = $"{route.RemoteServerId}:{route.RemoteEndpoint}";
if (!_routes.TryAdd(key, route))
{
_ = route.DisposeAsync();
return;
}
if (route.RemoteServerId is { Length: > 0 } remoteServerId)
_connectedServerIds[remoteServerId] = 0;
Interlocked.Increment(ref _stats.Routes);
_ = Task.Run(() => WatchRouteAsync(key, route, _cts!.Token));
}
```
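`TryAdd` rejects a second registration under the same key, and the losing connection is disposed. As far as the snippet shows, the key includes the remote endpoint, so this only deduplicates connections that report the same server ID and the same endpoint. When both sides of a peer pair dial each other simultaneously, the outbound and inbound connections normally report different endpoints (listen port versus ephemeral port), so both may be registered. `RemoteEndpoint` falls back to a new GUID string if the socket's `RemoteEndPoint` is null, which prevents a null-keyed entry.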
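To make the key semantics concrete, the sketch below reproduces only the `TryAdd` behaviour with a plain `ConcurrentDictionary`. The server ID, endpoints, and string values are made-up stand-ins for real `RouteConnection` instances.

```csharp
using System;
using System.Collections.Concurrent;

// Stand-in for _routes: key -> connection label (the real map stores RouteConnection).
var routes = new ConcurrentDictionary<string, string>();

// Same key format as Register: "{RemoteServerId}:{RemoteEndpoint}".
string Key(string serverId, string endpoint) => $"{serverId}:{endpoint}";

bool first = routes.TryAdd(Key("srv-b", "10.0.0.2:6222"), "conn-1");
bool sameKey = routes.TryAdd(Key("srv-b", "10.0.0.2:6222"), "conn-2");        // rejected: identical key
bool otherEndpoint = routes.TryAdd(Key("srv-b", "10.0.0.2:51844"), "conn-3"); // accepted: endpoint differs

Console.WriteLine($"{first} {sameKey} {otherEndpoint}"); // prints "True False True"
```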
---
## Gateways
`GatewayManager` (`src/NATS.Server/Gateways/GatewayManager.cs`) is a stub. `StartAsync` logs the configured name and listen address at `Debug` level, resets the gateway count in `ServerStats` to zero, and returns a completed task. No socket is bound, no connections are made:
```csharp
public Task StartAsync(CancellationToken ct)
{
_logger.LogDebug("Gateway manager started (name={Name}, listen={Host}:{Port})",
_options.Name, _options.Host, _options.Port);
Interlocked.Exchange(ref _stats.Gateways, 0);
return Task.CompletedTask;
}
```
`GatewayConnection` exists as a skeleton class with only a `RemoteEndpoint` string property — no networking or protocol logic is present.
---
## Leaf Nodes
`LeafNodeManager` (`src/NATS.Server/LeafNodes/LeafNodeManager.cs`) is a stub. `StartAsync` logs the configured listen address at `Debug` level, resets the leaf count in `ServerStats` to zero, and returns a completed task. No socket is bound:
```csharp
public Task StartAsync(CancellationToken ct)
{
_logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port);
Interlocked.Exchange(ref _stats.Leafs, 0);
return Task.CompletedTask;
}
```
`LeafConnection` follows the same skeleton pattern as `GatewayConnection`.
---
## Configuration
### ClusterOptions
`ClusterOptions` (`src/NATS.Server/Configuration/ClusterOptions.cs`) controls route clustering:
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `Name` | `string?` | `null` | Cluster name; currently unused at runtime |
| `Host` | `string` | `"0.0.0.0"` | Listen address for inbound route connections |
| `Port` | `int` | `6222` | Listen port; set to 0 for OS-assigned port (updated after bind) |
| `Routes` | `List<string>` | `[]` | Seed route endpoints to dial on startup |
### GatewayOptions
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `Name` | `string?` | `null` | Gateway cluster name |
| `Host` | `string` | `"0.0.0.0"` | Listen address (not used; stub only) |
| `Port` | `int` | `0` | Listen port (not used; stub only) |
### LeafNodeOptions
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `Host` | `string` | `"0.0.0.0"` | Listen address (not used; stub only) |
| `Port` | `int` | `0` | Listen port (not used; stub only) |
### Route endpoint format
`ParseRouteEndpoint` in `RouteManager` parses entries in `ClusterOptions.Routes`. The format is a bare `host:port` string — **not** the `nats-route://host:port` URL scheme that the Go server config file uses:
```csharp
private static IPEndPoint ParseRouteEndpoint(string route)
{
var trimmed = route.Trim();
var parts = trimmed.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
if (parts.Length != 2)
throw new FormatException($"Invalid route endpoint: '{route}'");
return new IPEndPoint(IPAddress.Parse(parts[0]), int.Parse(parts[1]));
}
```
Only IPv4 literals work in practice: `IPAddress.Parse` is called directly on `parts[0]` with no hostname resolution, and the string is split at its first `:`, which rules out IPv6 literals. Hostname-based seeds throw `FormatException`.
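The following sketch exercises a copy of `ParseRouteEndpoint` against a few seed strings to show which forms are accepted. The `TryParseRoute` helper and the sample seeds are illustrative additions, not part of the codebase.

```csharp
using System;
using System.Net;

// Copy of the parser shown above, as a local function so the sketch is self-contained.
IPEndPoint ParseRouteEndpoint(string route)
{
    var trimmed = route.Trim();
    var parts = trimmed.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
    if (parts.Length != 2)
        throw new FormatException($"Invalid route endpoint: '{route}'");
    return new IPEndPoint(IPAddress.Parse(parts[0]), int.Parse(parts[1]));
}

// Hypothetical helper: reports whether a seed string parses without throwing FormatException.
bool TryParseRoute(string route)
{
    try { ParseRouteEndpoint(route); return true; }
    catch (FormatException) { return false; }
}

foreach (var seed in new[] { "127.0.0.1:6222", "localhost:6222", "nats-route://127.0.0.1:6222" })
    Console.WriteLine($"{seed} -> {(TryParseRoute(seed) ? "ok" : "FormatException")}");
```

Only the first seed parses; the hostname and the Go-style URL both fail at `IPAddress.Parse`.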
---
## What Is Not Implemented
The following features from the Go reference are not present in this codebase:
- **RMSG wire routing** — messages are never sent over a route TCP connection; cross-server delivery only works in-process
- **RS+/RS- wire protocol** — subscription interest is propagated by direct in-process method calls, not over the wire
- **Route pooling** — the Go server opens 3 TCP connections per peer by default; this implementation opens 1
- **Route compression** — the Go server optionally compresses route traffic with S2; no compression is implemented here
- **Solicited routes** — when a Go server connects to a seed, the seed can back-propagate other cluster member addresses for full-mesh formation; this does not occur here
- **Full-mesh auto-formation** — beyond the configured seed list, no additional peer discovery or mesh formation happens
- **Gateways** — no inter-cluster bridge networking; `GatewayManager` is a logging stub
- **Leaf nodes** — no edge node networking; `LeafNodeManager` is a logging stub
- **Route-compatible INFO/CONNECT handshake** — the custom `ROUTE <id>` handshake is not compatible with the Go server's route protocol
---
## Related Documentation
- [Server Overview](../Server/Overview.md)
- [Subscriptions Overview](../Subscriptions/Overview.md)
- [Configuration Overview](../Configuration/Overview.md)
<!-- Last verified against codebase: 2026-02-23 -->


@@ -26,8 +26,12 @@ public sealed class NatsOptions
}
```
`NatsOptions` contains 150+ fields organized into subsystem groups; the snippet above shows only the core network options.
### Option reference
The table below covers the core network options documented in the snippet above. For the full set of option groups, see the subsystem tables that follow.
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `Host` | `string` | `"0.0.0.0"` | Bind address for the TCP listener. Use `"127.0.0.1"` to restrict to loopback. |
@@ -39,6 +43,143 @@ public sealed class NatsOptions
| `PingInterval` | `TimeSpan` | `2 minutes` | Interval between server-initiated `PING` messages to connected clients. |
| `MaxPingsOut` | `int` | `2` | Number of outstanding `PING`s without a `PONG` response before the server disconnects a client. |
### Subscription limits
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `MaxSubs` | `int` | `0` (unlimited) | Maximum subscriptions allowed per client connection. `0` disables the limit. |
| `MaxSubTokens` | `int` | `0` (unlimited) | Maximum number of tokens (dot-separated segments) allowed in a subject. `0` disables the limit. |
### Monitoring
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `MonitorPort` | `int` | `0` (disabled) | HTTP monitoring port. Set to `8222` for the standard NATS monitoring port. |
| `MonitorHost` | `string` | `"0.0.0.0"` | Bind address for the HTTP monitoring listener. |
| `MonitorBasePath` | `string?` | `null` | Optional URL path prefix for all monitoring endpoints (e.g., `"/nats"`). |
| `MonitorHttpsPort` | `int` | `0` (disabled) | HTTPS monitoring port. Requires TLS configuration to be set. |
### Lifecycle
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `MaxConnections` | `int` | `65536` | Maximum concurrent client connections. |
| `MaxPayload` | `int` | `1048576` | Maximum message payload in bytes. |
| `MaxPending` | `long` | `67108864` (64 MB) | Maximum bytes buffered per client before the server applies back-pressure. Matches Go `MAX_PENDING_SIZE`. |
| `WriteDeadline` | `TimeSpan` | `10 seconds` | Deadline for a single write operation to a client socket. Slow clients that cannot consume within this window are disconnected. |
| `LameDuckDuration` | `TimeSpan` | `2 minutes` | How long the server remains in lame-duck mode, draining existing clients before shutting down. |
| `LameDuckGracePeriod` | `TimeSpan` | `10 seconds` | Grace period at the start of lame-duck mode before the server begins rejecting new connections. |
### File paths
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `ConfigFile` | `string?` | `null` | Path to the NATS config file loaded at startup via `-c`. |
| `PidFile` | `string?` | `null` | Path where the server writes its process ID. |
| `PortsFileDir` | `string?` | `null` | Directory where the server writes a JSON file listing its bound ports. |
### Logging
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `Debug` | `bool` | `false` | Enables debug-level log output. Sets Serilog minimum level to `Debug`. |
| `Trace` | `bool` | `false` | Enables trace-level (verbose) log output. Sets Serilog minimum level to `Verbose`, overriding `Debug`. |
| `TraceVerbose` | `bool` | `false` | Enables verbose protocol tracing including message payload content. |
| `Logtime` | `bool` | `true` | Includes timestamps in log output. |
| `LogtimeUTC` | `bool` | `false` | Uses UTC timestamps instead of local time when `Logtime` is `true`. |
| `LogFile` | `string?` | `null` | Path to a log file. When set, the Serilog file sink is activated alongside the console sink. |
| `LogSizeLimit` | `long` | `0` (unlimited) | Maximum log file size in bytes before rotation. `0` disables size-based rotation. |
| `LogMaxFiles` | `int` | `0` (unlimited) | Number of rotated log files to retain. `0` retains all files. |
| `Syslog` | `bool` | `false` | Writes logs to the local syslog daemon. |
| `RemoteSyslog` | `string?` | `null` | UDP endpoint for remote syslog (e.g., `"udp://logs.example.com:514"`). Activates the UDP syslog sink. |
| `LogOverrides` | `Dictionary<string, string>?` | `null` | Per-namespace minimum level overrides applied to Serilog (e.g., `"NATS.Server.NatsClient" -> "Warning"`). |
### Authentication
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `Username` | `string?` | `null` | Single-user password auth: username. |
| `Password` | `string?` | `null` | Single-user password auth: password. |
| `Authorization` | `string?` | `null` | Single shared token auth. Equivalent to `token` in the Go config. |
| `Users` | `IReadOnlyList<User>?` | `null` | Multi-user list with per-user passwords and permissions. |
| `NKeys` | `IReadOnlyList<NKeyUser>?` | `null` | NKey-based user list. Each entry carries a public NKey and optional permissions. |
| `NoAuthUser` | `string?` | `null` | Username of the user to authenticate unauthenticated connections as. Must exist in `Users`. |
| `AuthTimeout` | `TimeSpan` | `2 seconds` | Time allowed for a client to complete the auth handshake. |
### JWT / Operator mode
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `TrustedKeys` | `string[]?` | `null` | Operator public NKeys that are permitted to sign account JWTs. |
| `AccountResolver` | `IAccountResolver?` | `null` | Pluggable resolver used to look up account JWTs by account public key. |
### TLS
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `TlsCert` | `string?` | `null` | Path to the server TLS certificate file (PEM). |
| `TlsKey` | `string?` | `null` | Path to the server TLS private key file (PEM). |
| `TlsCaCert` | `string?` | `null` | Path to the CA certificate file used to verify client certificates. |
| `TlsVerify` | `bool` | `false` | Requires clients to present a valid certificate signed by the CA. |
| `TlsMap` | `bool` | `false` | Maps the TLS client certificate subject to a NATS username for auth. |
| `TlsTimeout` | `TimeSpan` | `2 seconds` | Deadline for completing the TLS handshake. |
| `TlsHandshakeFirst` | `bool` | `false` | Performs the TLS handshake before the NATS `INFO`/`CONNECT` exchange. |
| `TlsHandshakeFirstFallback` | `TimeSpan` | `50 ms` | Time to wait for a TLS client hello before falling back to plain-text when `TlsHandshakeFirst` is `true`. |
| `AllowNonTls` | `bool` | `false` | Accepts non-TLS connections alongside TLS connections. |
| `TlsRateLimit` | `long` | `0` (unlimited) | Maximum new TLS handshakes per second. `0` disables rate limiting. |
| `TlsPinnedCerts` | `HashSet<string>?` | `null` | Set of SHA-256 certificate fingerprints that are permitted. Connections presenting other certs are rejected. |
| `TlsMinVersion` | `SslProtocols` | `Tls12` | Minimum TLS protocol version accepted. |
### OCSP stapling
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `OcspConfig` | `OcspConfig?` | `null` | OCSP stapling settings. When `null`, stapling is disabled. The `OcspConfig` type exposes `Mode` (`Auto`, `Always`, `Must`, `Never`) and `OverrideUrls`. |
| `OcspPeerVerify` | `bool` | `false` | Requires OCSP staples from connecting clients when mutual TLS is enabled. |
### Clustering
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `Cluster` | `ClusterOptions?` | `null` | Cluster listener and route configuration. When `null`, clustering is disabled. `ClusterOptions` exposes `Name`, `Host` (`"0.0.0.0"`), `Port` (`6222`), and `Routes` (list of seed endpoints in bare `host:port` form, not `nats-route://` URLs). |
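| `Gateway` | `GatewayOptions?` | `null` | Gateway configuration for bridging to other clusters. `GatewayManager` is currently a stub, so no gateway listener is started. `GatewayOptions` exposes `Name`, `Host`, and `Port`. |
| `LeafNode` | `LeafNodeOptions?` | `null` | Leaf node configuration. `LeafNodeManager` is currently a stub, so no leaf node listener is started. `LeafNodeOptions` exposes `Host` and `Port`. |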
### JetStream
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `JetStream` | `JetStreamOptions?` | `null` | Enables and configures JetStream persistence. When `null`, JetStream is disabled. `JetStreamOptions` exposes `StoreDir` (base directory for file-backed streams), `MaxMemoryStore` (bytes, `0` = unlimited), and `MaxFileStore` (bytes, `0` = unlimited). |
### MQTT
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `Mqtt` | `MqttOptions?` | `null` | MQTT protocol configuration. Config is parsed and stored but no MQTT listener is started yet. `MqttOptions` exposes network (`Host`, `Port`), auth (`Username`, `Password`, `Token`, `NoAuthUser`), TLS, and JetStream integration fields (`JsDomain`, `StreamReplicas`, `AckWait`). |
### WebSocket
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `WebSocket` | `WebSocketOptions` | `new()` | WebSocket transport configuration. Always present; the listener is inactive when `Port` is `-1` (the default). `WebSocketOptions` exposes `Host`, `Port`, `NoTls`, `SameOrigin`, `AllowedOrigins`, `Compression`, `HandshakeTimeout`, per-connection auth fields, and TLS cert paths. |
### Advanced
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `NoHeaderSupport` | `bool` | `false` | Disables NATS header support. Clients are informed via the `INFO` message; `HPUB`/`HMSG` commands are rejected. |
| `DisableSublistCache` | `bool` | `false` | Disables the `SubList` match cache. Useful in benchmarks to isolate raw matching cost. |
| `NoSystemAccount` | `bool` | `false` | Suppresses creation of the built-in `$SYS` account used for system events. |
| `SystemAccount` | `string?` | `null` | Name of the account to use as the system account instead of the built-in default. |
| `MaxClosedClients` | `int` | `10000` | Number of recently closed client records retained for monitoring (`/connz?closed=true`). |
| `ConnectErrorReports` | `int` | `3600` | How often (in attempts) connection errors to routes/gateways are logged. |
| `ReconnectErrorReports` | `int` | `1` | How often reconnect errors are logged. `1` logs every attempt. |
| `MaxTracedMsgLen` | `int` | `0` (unlimited) | Truncation length for message payloads in trace-level logs. `0` logs the full payload. |
| `Tags` | `Dictionary<string, string>?` | `null` | Arbitrary key-value tags exposed via the `/varz` monitoring endpoint. |
| `ClientAdvertise` | `string?` | `null` | Alternative `host:port` advertised to cluster peers for client connections (NAT traversal). |
| `SubjectMappings` | `Dictionary<string, string>?` | `null` | Subject transform rules mapping source patterns to destination templates. |
| `InCmdLine` | `HashSet<string>` | `[]` | Tracks which property names were set via CLI flags. Used during config reload to prevent file-based values from overwriting CLI-supplied ones. Not a user-settable option. |
### How ServerName is resolved
`NatsServer` constructs the `ServerInfo` sent to each client at connection time. If `ServerName` is `null`, it uses `nats-dotnet-{Environment.MachineName}`:
@@ -59,13 +200,39 @@ _serverInfo = new ServerInfo
## CLI Arguments
`Program.cs` parses command-line arguments before creating `NatsServer`. The three supported flags map directly to `NatsOptions` fields:
`Program.cs` parses command-line arguments in two passes before creating `NatsServer`. The first pass scans for `-c` to load a config file as the base `NatsOptions`. The second pass applies all remaining flags on top of the loaded options. Every flag that is processed is recorded in `options.InCmdLine` so that config-file reloads cannot overwrite values that were explicitly supplied on the command line.
| Flag | Alias | Field | Example |
|------|-------|-------|---------|
| `-c` | — | `ConfigFile` (load only) | `-c /etc/nats/server.conf` |
| `-p` | `--port` | `Port` | `-p 14222` |
| `-a` | `--addr` | `Host` | `-a 127.0.0.1` |
| `-n` | `--name` | `ServerName` | `-n my-server` |
| `-m` | `--http_port` | `MonitorPort` | `-m 8222` |
| — | `--http_base_path` | `MonitorBasePath` | `--http_base_path /nats` |
| — | `--https_port` | `MonitorHttpsPort` | `--https_port 8443` |
| — | `--pid` | `PidFile` | `--pid /var/run/nats.pid` |
| — | `--ports_file_dir` | `PortsFileDir` | `--ports_file_dir /tmp` |
| — | `--tlscert` | `TlsCert` | `--tlscert server.pem` |
| — | `--tlskey` | `TlsKey` | `--tlskey server-key.pem` |
| — | `--tlscacert` | `TlsCaCert` | `--tlscacert ca.pem` |
| — | `--tlsverify` | `TlsVerify` | `--tlsverify` |
| `-D` | `--debug` | `Debug` | `-D` |
| `-V` / `-T` | `--trace` | `Trace` | `-V` |
| `-DV` | — | `Debug` + `Trace` | `-DV` |
| `-l` | `--log` / `--log_file` | `LogFile` | `-l /var/log/nats.log` |
| — | `--log_size_limit` | `LogSizeLimit` | `--log_size_limit 104857600` |
| — | `--log_max_files` | `LogMaxFiles` | `--log_max_files 5` |
| — | `--logtime` | `Logtime` | `--logtime false` |
| — | `--logtime_utc` | `LogtimeUTC` | `--logtime_utc` |
| — | `--syslog` | `Syslog` | `--syslog` |
| — | `--remote_syslog` | `RemoteSyslog` | `--remote_syslog udp://logs.example.com:514` |
| — | `--log_level_override` | `LogOverrides` | `--log_level_override NATS.Server.NatsClient=Warning` |
| — | `--service` | Windows Service mode | `--service` |
The `-c` flag is consumed in the first pass and silently skipped in the second pass. Unrecognized flags are silently ignored. There is no `--help` output.
The `InCmdLine` set is used after startup to establish reload precedence. When a config-file reload is triggered (e.g., via `SIGHUP`), `ConfigReloader.MergeCliOverrides` copies the CLI-supplied field values back over the reloaded options, ensuring flags like `-p` or `-D` cannot be reverted by a config change.
```csharp
for (int i = 0; i < args.Length; i++)
@@ -74,19 +241,20 @@ for (int i = 0; i < args.Length; i++)
{
case "-p" or "--port" when i + 1 < args.Length:
options.Port = int.Parse(args[++i]);
options.InCmdLine.Add("Port");
break;
case "-a" or "--addr" when i + 1 < args.Length:
options.Host = args[++i];
options.InCmdLine.Add("Host");
break;
case "-n" or "--name" when i + 1 < args.Length:
options.ServerName = args[++i];
options.InCmdLine.Add("ServerName");
break;
}
}
```
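The loop above records each flag in `InCmdLine`. The reload precedence this enables can be sketched as below. This is a minimal illustration under stated assumptions: options are modelled as a name-to-value dictionary rather than the real `NatsOptions` type, and this `MergeCliOverrides` is a hypothetical stand-in for `ConfigReloader.MergeCliOverrides`, whose real signature is not shown in this document.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for NatsOptions instances: option name -> value.
var running = new Dictionary<string, object> { ["Port"] = 14222, ["Debug"] = true, ["MaxPayload"] = 1048576 };
var reloaded = new Dictionary<string, object> { ["Port"] = 4222, ["Debug"] = false, ["MaxPayload"] = 2097152 };

// Names recorded while parsing "-p 14222 -D".
var inCmdLine = new HashSet<string> { "Port", "Debug" };

// Copies every CLI-supplied value from the running options over the freshly reloaded ones.
void MergeCliOverrides(Dictionary<string, object> from, Dictionary<string, object> into, HashSet<string> cliKeys)
{
    foreach (var key in cliKeys)
        if (from.TryGetValue(key, out var value))
            into[key] = value;
}

MergeCliOverrides(running, reloaded, inCmdLine);

// Port and Debug keep their CLI values; MaxPayload takes the reloaded file value.
Console.WriteLine($"{reloaded["Port"]} {reloaded["Debug"]} {reloaded["MaxPayload"]}");
```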
Unrecognized flags are silently ignored. There is no `--help` output.
---
## Protocol Constants
@@ -118,6 +286,35 @@ public static class NatsProtocol
## Logging Configuration
### Debug and Trace flags
`NatsOptions` exposes two boolean flags that control the Serilog minimum log level. `Debug` sets the minimum level to `Debug`; `Trace` sets it to `Verbose` (Serilog's finest level, matching NATS protocol tracing). When both are present, `Trace` wins because `Verbose` is finer than `Debug`. Neither flag changes log output format — only the minimum severity threshold.
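The precedence between the two flags amounts to a small decision function, sketched here with level names as plain strings so the example does not depend on Serilog. The `Information` default for the case where neither flag is set is an assumption; this section does not state the actual default.

```csharp
using System;

// Hypothetical helper: picks the minimum level name from the two flags.
// Trace is checked first, so it wins when both flags are set.
string MinimumLevel(bool debug, bool trace) =>
    trace ? "Verbose" : debug ? "Debug" : "Information"; // "Information" default is assumed

Console.WriteLine(MinimumLevel(debug: true, trace: false)); // prints "Debug"
Console.WriteLine(MinimumLevel(debug: true, trace: true));  // prints "Verbose"
```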
`TraceVerbose` is a separate flag that enables payload content in protocol traces. It is not wired to a Serilog level; components that check it emit additional `Verbose`-level log entries that include message body bytes.
### LogOverrides dictionary
`LogOverrides` is a `Dictionary<string, string>?` on `NatsOptions` that maps .NET logger category name prefixes to Serilog level names (`Verbose`, `Debug`, `Information`, `Warning`, `Error`, `Fatal`). Each entry becomes a `MinimumLevel.Override(ns, level)` call in the Serilog configuration:
```csharp
if (options.LogOverrides is not null)
{
foreach (var (ns, level) in options.LogOverrides)
{
if (Enum.TryParse<Serilog.Events.LogEventLevel>(level, true, out var serilogLevel))
logConfig.MinimumLevel.Override(ns, serilogLevel);
}
}
```
This maps directly to Serilog's per-source overrides: for loggers whose category name matches the prefix, the override level replaces the global minimum level. A useful override pattern is silencing the high-volume per-client category while keeping server-level events visible:
```
--log_level_override NATS.Server.NatsClient=Warning
```
The `--log_level_override` CLI flag sets a single entry in `LogOverrides` using `key=value` format. Multiple flags may be supplied to add multiple overrides.
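One way the `key=value` argument could be turned into a `LogOverrides` entry is sketched below. `TryAddOverride` is a hypothetical helper, not the parser in `Program.cs`, and rejecting malformed input by returning `false` is an assumption; the second category name is only an example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: splits "Category=Level" at the first '=' and stores the pair.
bool TryAddOverride(Dictionary<string, string> overrides, string arg)
{
    var parts = arg.Split('=', 2, StringSplitOptions.TrimEntries);
    if (parts.Length != 2 || parts[0].Length == 0 || parts[1].Length == 0)
        return false; // malformed input is skipped (assumed behaviour)
    overrides[parts[0]] = parts[1];
    return true;
}

var logOverrides = new Dictionary<string, string>();
TryAddOverride(logOverrides, "NATS.Server.NatsClient=Warning");
TryAddOverride(logOverrides, "NATS.Server.Routes=Debug"); // a second flag adds a second entry

foreach (var (category, level) in logOverrides)
    Console.WriteLine($"{category} -> {level}");
```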
### Serilog setup
Logging uses [Serilog](https://serilog.net/) with the console sink, configured in `Program.cs` before any other code runs:
@@ -182,4 +379,4 @@ finally
- [Operations Overview](../Operations/Overview.md)
- [Server Overview](../Server/Overview.md)
<!-- Last verified against codebase: 2026-02-22 -->
<!-- Last verified against codebase: 2026-02-23 -->


@@ -6,7 +6,7 @@ This document describes the overall architecture of the NATS .NET server — its
This project is a port of the [NATS server](https://github.com/nats-io/nats-server) (`golang/nats-server/`) to .NET 10 / C#. The Go source in `golang/nats-server/server/` is the authoritative reference.
Current scope: base publish-subscribe server with wildcard subject matching and queue groups. Authentication, clustering (routes, gateways, leaf nodes), JetStream, and HTTP monitoring are not yet implemented.
Current scope: core pub/sub with wildcard subject matching and queue groups; authentication (username/password, token, NKey, JWT, TLS client certificate mapping); TLS transport; WebSocket transport; config file parsing with hot reload; clustering via routes (in-process subscription propagation and message routing); gateway and leaf node managers (bootstrapped, protocol stubs); JetStream (streams, consumers, file and memory storage, RAFT consensus); and HTTP monitoring endpoints (`/varz`, `/connz`, `/routez`, `/jsz`, etc.).
---
@@ -15,10 +15,27 @@ Current scope: base publish-subscribe server with wildcard subject matching and
```
NatsDotNet.slnx
src/
NATS.Server/ # Core server library — no executable entry point
NATS.Server.Host/ # Console application — wires logging, parses CLI args, starts server
NATS.Server/ # Core server library — no executable entry point
Auth/ # Auth mechanisms: username/password, token, NKey, JWT, TLS mapping
Configuration/ # Config file lexer/parser, ClusterOptions, JetStreamOptions, etc.
Events/ # Internal event system (connect/disconnect advisory subjects)
Gateways/ # GatewayManager, GatewayConnection (inter-cluster bridge)
Imports/ # Account import/export maps, service latency tracking
JetStream/ # Streams, consumers, storage, API routing, RAFT meta-group
LeafNodes/ # LeafNodeManager, LeafConnection (hub-and-spoke topology)
Monitoring/ # HTTP monitoring server: /varz, /connz, /jsz, /subsz
Protocol/ # NatsParser state machine, NatsProtocol constants and wire helpers
Raft/ # RaftNode, RaftLog, RaftReplicator, snapshot support
Routes/ # RouteManager, RouteConnection (cluster routes; seed list only, no full-mesh discovery)
Subscriptions/ # SubList trie, SubjectMatch, Subscription, SubListResult
Tls/ # TLS handshake wrapper, OCSP stapling, TlsRateLimiter
WebSocket/ # WsUpgrade, WsConnection, frame writer and compression
NatsClient.cs # Per-connection client: I/O pipeline, command dispatch, sub tracking
NatsServer.cs # Server orchestrator: accept loop, client registry, message routing
NatsOptions.cs # Top-level configuration model
NATS.Server.Host/ # Console application — wires logging, parses CLI args, starts server
tests/
NATS.Server.Tests/ # xUnit test project — unit and integration tests
NATS.Server.Tests/ # xUnit test project — 92 .cs test files covering all subsystems
```
`NATS.Server` depends only on `Microsoft.Extensions.Logging.Abstractions`. All Serilog wiring is in `NATS.Server.Host`. This keeps the core library testable without a console host.
@@ -68,16 +85,29 @@ Command dispatch in `NatsClient.DispatchCommandAsync` covers: `Connect`, `Ping`/
### NatsClient
`NatsClient` (`NatsClient.cs`) handles a single TCP connection. On `RunAsync`, it sends the initial `INFO` frame and then starts two concurrent tasks:
`NatsClient` (`NatsClient.cs`) handles a single TCP connection. On `RunAsync`, it sends the initial `INFO` frame and then starts two concurrent tasks: `FillPipeAsync` (socket → `PipeWriter`) and `ProcessCommandsAsync` (`PipeReader` → parser → dispatch). The tasks share a `Pipe` from `System.IO.Pipelines`. Either task completing (EOF, cancellation, or error) causes `RunAsync` to return, which triggers cleanup via `Router.RemoveClient(this)`.
Key fields:
```csharp
var fillTask = FillPipeAsync(pipe.Writer, ct); // socket → PipeWriter
var processTask = ProcessCommandsAsync(pipe.Reader, ct); // PipeReader → parser → dispatch
public sealed class NatsClient : INatsClient, IDisposable
{
private readonly Socket _socket;
private readonly Stream _stream; // plain NetworkStream or TlsConnectionWrapper
private readonly NatsParser _parser;
private readonly Channel<ReadOnlyMemory<byte>> _outbound = Channel.CreateBounded<ReadOnlyMemory<byte>>(
new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });
private long _pendingBytes; // bytes queued but not yet written
private readonly ClientFlagHolder _flags = new(); // ConnectReceived, TraceMode, etc.
private readonly Dictionary<string, Subscription> _subs = new();
public ulong Id { get; }
public ClientKind Kind { get; } // CLIENT, ROUTER, LEAF, SYSTEM
public Account? Account { get; private set; }
}
```
`FillPipeAsync` reads from the `NetworkStream` into a `PipeWriter` in 4,096-byte chunks. `ProcessCommandsAsync` reads from the `PipeReader`, calls `NatsParser.TryParse` in a loop, and dispatches each `ParsedCommand`. The tasks share a `Pipe` instance from `System.IO.Pipelines`. Either task completing (EOF, cancellation, or error) causes `RunAsync` to return, which triggers cleanup via `Router.RemoveClient(this)`.
Write serialization uses a `SemaphoreSlim(1,1)` (`_writeLock`). All outbound writes (`SendMessageAsync`, `WriteAsync`) acquire this lock before touching the `NetworkStream`, preventing interleaved writes from concurrent message deliveries.
Write serialization uses a bounded `Channel<ReadOnlyMemory<byte>>(8192)` (`_outbound`). All outbound message deliveries enqueue a pre-encoded frame into this channel. A dedicated write loop drains the channel sequentially, preventing interleaved writes from concurrent message deliveries. A `_pendingBytes` counter tracks bytes queued but not yet written, enabling slow-consumer detection and back-pressure enforcement.
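The channel-based write queue can be illustrated in isolation. In this sketch a `MemoryStream` stands in for the client's network stream, and `WriteLoopAsync` and `pendingBytes` are hypothetical names mirroring the description rather than the actual `NatsClient` members.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded queue with a single reader, configured like the _outbound field shown above.
var outbound = Channel.CreateBounded<ReadOnlyMemory<byte>>(
    new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });
var socket = new MemoryStream(); // stand-in for the client's network stream
long pendingBytes = 0;           // bytes queued but not yet written

// Single writer task: drains frames in order, so frames never interleave on the stream.
async Task WriteLoopAsync()
{
    await foreach (var frame in outbound.Reader.ReadAllAsync())
    {
        await socket.WriteAsync(frame);
        Interlocked.Add(ref pendingBytes, -frame.Length);
    }
}

var writer = WriteLoopAsync();

// Producers enqueue pre-encoded frames; WriteAsync waits when the queue is full.
foreach (var text in new[] { "MSG orders.new 1 2\r\nhi\r\n", "PONG\r\n" })
{
    byte[] frame = Encoding.ASCII.GetBytes(text);
    Interlocked.Add(ref pendingBytes, frame.Length);
    await outbound.Writer.WriteAsync(frame);
}

outbound.Writer.Complete(); // no more frames; lets the write loop finish
await writer;
Console.WriteLine($"written={socket.Length} pending={Interlocked.Read(ref pendingBytes)}");
```

A counter like `pendingBytes` is what would let a server compare queued bytes against a limit such as `MaxPending` to detect slow consumers; that comparison is omitted here.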
Subscription state is a `Dictionary<string, Subscription>` keyed by SID. This dictionary is accessed only from the single processing task, so no locking is needed. `SUB` inserts into this dictionary and into `SubList`; `UNSUB` either sets `MaxMessages` for auto-unsubscribe or immediately removes from both.
@@ -101,27 +131,46 @@ public interface ISubListAccess
### NatsServer
`NatsServer` (`NatsServer.cs`) owns the TCP listener, the shared `SubList`, and the client registry. Its `StartAsync` method runs the accept loop:
`NatsServer` (`NatsServer.cs`) owns the TCP listener, the shared `SubList`, and the client registry. Each accepted connection gets a unique `clientId` (incremented via `Interlocked.Increment`), a scoped logger, and a `NatsClient` instance registered in `_clients`. `RunClientAsync` is fired as a detached task — the accept loop does not await it.
Key fields:
```csharp
public async Task StartAsync(CancellationToken ct)
public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.Bind(new IPEndPoint(
_options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host),
_options.Port));
_listener.Listen(128);
// ...
while (!ct.IsCancellationRequested)
{
var socket = await _listener.AcceptAsync(ct);
// create NatsClient, fire-and-forget RunClientAsync
}
// Client registry
private readonly ConcurrentDictionary<ulong, NatsClient> _clients = new();
private readonly ConcurrentQueue<ClosedClient> _closedClients = new();
private ulong _nextClientId;
private int _activeClientCount;
// Account system
private readonly ConcurrentDictionary<string, Account> _accounts = new(StringComparer.Ordinal);
private readonly Account _globalAccount;
private readonly Account _systemAccount;
private AuthService _authService;
// Subsystem managers
private readonly RouteManager? _routeManager;
private readonly GatewayManager? _gatewayManager;
private readonly LeafNodeManager? _leafNodeManager;
private readonly JetStreamService? _jetStreamService;
private MonitorServer? _monitorServer;
// TLS / transport
private readonly SslServerAuthenticationOptions? _sslOptions;
private readonly TlsRateLimiter? _tlsRateLimiter;
private Socket? _listener;
private Socket? _wsListener;
// Shutdown coordination
private readonly CancellationTokenSource _quitCts = new();
private readonly TaskCompletionSource _shutdownComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);
private int _shutdown;
private int _lameDuck;
}
```
Each accepted connection gets a unique `clientId` (incremented via `Interlocked.Increment`), a scoped logger, and a `NatsClient` instance registered in `_clients` (`ConcurrentDictionary<ulong, NatsClient>`). `RunClientAsync` is fired as a detached task — the accept loop does not await it.
Message delivery happens in `ProcessMessage`:
1. Call `_subList.Match(subject)` to get a `SubListResult`.
@@ -152,7 +201,7 @@ Client sends: PUB orders.new 12\r\nhello world\r\n
→ DeliverMessage(sub2, ...) → sub2.Client.SendMessageAsync(...)
→ round-robin pick from [sub3, sub4], e.g. sub3
→ DeliverMessage(sub3, ...) → sub3.Client.SendMessageAsync(...)
7. SendMessageAsync acquires _writeLock, writes MSG frame to socket
7. SendMessageAsync enqueues encoded MSG frame into _outbound channel; write loop drains to socket
```
---
@@ -165,7 +214,16 @@ Client sends: PUB orders.new 12\r\nhello world\r\n
| `server/parser.go` | `src/NATS.Server/Protocol/NatsParser.cs` |
| `server/client.go` | `src/NATS.Server/NatsClient.cs` |
| `server/server.go` | `src/NATS.Server/NatsServer.cs` |
| `server/opts.go` | `src/NATS.Server/NatsOptions.cs` |
| `server/opts.go` | `src/NATS.Server/NatsOptions.cs` + `src/NATS.Server/Configuration/` |
| `server/auth.go` | `src/NATS.Server/Auth/AuthService.cs` |
| `server/route.go` | `src/NATS.Server/Routes/RouteManager.cs` |
| `server/gateway.go` | `src/NATS.Server/Gateways/GatewayManager.cs` |
| `server/leafnode.go` | `src/NATS.Server/LeafNodes/LeafNodeManager.cs` |
| `server/jetstream.go` | `src/NATS.Server/JetStream/JetStreamService.cs` |
| `server/stream.go` | `src/NATS.Server/JetStream/StreamManager.cs` (via `JetStreamService`) |
| `server/consumer.go` | `src/NATS.Server/JetStream/ConsumerManager.cs` |
| `server/raft.go` | `src/NATS.Server/Raft/RaftNode.cs` |
| `server/monitor.go` | `src/NATS.Server/Monitoring/MonitorServer.cs` |
The Go `sublist.go` uses atomic generation counters to invalidate a result cache. The .NET `SubList` uses a different strategy: it maintains the cache under `ReaderWriterLockSlim` and does targeted invalidation at insert/remove time, avoiding the need for generation counters.
@@ -180,7 +238,7 @@ The Go `client.go` uses goroutines for `readLoop` and `writeLoop`. The .NET equi
| I/O buffering | `System.IO.Pipelines` (`Pipe`, `PipeReader`, `PipeWriter`) | Zero-copy buffer management; backpressure built in |
| SubList thread safety | `ReaderWriterLockSlim` | Multiple concurrent readers (match), exclusive writers (insert/remove) |
| Client registry | `ConcurrentDictionary<ulong, NatsClient>` | Lock-free concurrent access from accept loop and cleanup tasks |
| Write serialization | `SemaphoreSlim(1,1)` per client | Prevents interleaved MSG frames from concurrent deliveries |
| Write serialization | `Channel<ReadOnlyMemory<byte>>(8192)` bounded queue per client with `_pendingBytes` slow-consumer tracking | Sequential drain by a single writer task prevents interleaved MSG frames; bounded capacity enables back-pressure |
| Concurrency | `async/await` + `Task` | Maps Go goroutines to .NET task-based async; no dedicated threads per connection |
| Protocol constants | `NatsProtocol` static class | Pre-encoded byte arrays (`PongBytes`, `CrLf`, etc.) avoid per-call allocations |
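The write-serialization row above can be illustrated with a minimal sketch. It assumes a bounded `Channel<ReadOnlyMemory<byte>>` drained by one writer task; the variable names, the `Enqueue` helper, the use of `TryWrite`, and the list standing in for the socket are all hypothetical and not taken from `NatsClient`.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the per-client outbound queue; capacity matches the table above.
var outbound = Channel.CreateBounded<ReadOnlyMemory<byte>>(new BoundedChannelOptions(8192)
{
    SingleReader = true,                     // exactly one write loop drains the queue
    FullMode = BoundedChannelFullMode.Wait,  // with this mode, TryWrite returns false when full
});

var written = new List<string>();            // stands in for the socket

// Single writer task: frames are drained in FIFO order, so they never interleave.
var writeLoop = Task.Run(async () =>
{
    await foreach (var frame in outbound.Reader.ReadAllAsync())
        written.Add(Encoding.ASCII.GetString(frame.ToArray()));
});

// Producers enqueue encoded frames and never touch the socket directly.
bool Enqueue(string frame) => outbound.Writer.TryWrite(Encoding.ASCII.GetBytes(frame));

var ok1 = Enqueue("MSG a 1 2\r\nhi\r\n");
var ok2 = Enqueue("MSG b 2 2\r\nyo\r\n");

outbound.Writer.Complete();                  // lets ReadAllAsync finish once the queue is empty
await writeLoop;
```

A `false` return from `TryWrite` is one place a slow-consumer policy could hook in; how the real client reacts to a full queue is not shown in this document.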
@@ -194,4 +252,4 @@ The Go `client.go` uses goroutines for `readLoop` and `writeLoop`. The .NET equi
- [Server Overview](../Server/Overview.md)
- [Configuration Overview](../Configuration/Overview.md)
<!-- Last verified against codebase: 2026-02-22 -->
<!-- Last verified against codebase: 2026-02-23 -->


@@ -104,7 +104,7 @@ dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~SubListTests"
| Package | Version | Purpose |
|---------|---------|---------|
| `xunit` | 2.9.3 | Test framework |
| `xunit` (xUnit 3) | 2.9.3 | Test framework |
| `xunit.runner.visualstudio` | 3.1.4 | VS/Rider test runner integration |
| `Shouldly` | 4.3.0 | Assertion library |
| `NSubstitute` | 5.3.0 | Mocking |
@@ -115,14 +115,7 @@ Do not use FluentAssertions or Moq — the project uses Shouldly and NSubstitute
### Test Files
| File | Covers |
|------|--------|
| `ParserTests.cs` | `NatsParser.TryParse` for each command type |
| `SubjectMatchTests.cs` | `SubjectMatch` validation and wildcard matching |
| `SubListTests.cs` | `SubList` trie insert, remove, match, and cache behaviour |
| `ClientTests.cs` | `NatsClient` command dispatch and subscription tracking |
| `ServerTests.cs` | `NatsServer` pub/sub, wildcards, queue groups |
| `IntegrationTests.cs` | End-to-end tests using `NATS.Client.Core` against a live server |
The test project contains 100 test files organised by subsystem:
- Authentication and TLS tests cover token, username/password, NKey, JWT, and OCSP authenticators, account isolation, client permissions, TLS connection wrapping, and TLS rate limiting (`AuthProtocolTests.cs`, `AuthServiceTests.cs`, `AuthIntegrationTests.cs`, `AccountIsolationTests.cs`, `NKeyAuthenticatorTests.cs`, `JwtAuthenticatorTests.cs`, `TlsServerTests.cs`, `TlsHelperTests.cs`, `TlsConnectionWrapperTests.cs`, `TlsMapAuthenticatorTests.cs`, `TlsRateLimiterTests.cs`, and related files).
- JetStream and RAFT tests cover stream and consumer APIs, publish, pull and push consumers, ack and redelivery, retention policies, mirror/source replication, cluster reload, JWT limits, RAFT election, replication, and snapshot catchup (`JetStreamStreamApiTests.cs`, `JetStreamConsumerApiTests.cs`, `JetStreamPublishTests.cs`, `JetStreamPullConsumerTests.cs`, `JetStreamPushConsumerTests.cs`, `RaftElectionTests.cs`, `RaftReplicationTests.cs`, `RaftSnapshotCatchupTests.cs`, and related files).
- Clustering and routing tests cover route handshake, subscription propagation, gateway and leaf node bootstrap, cluster JetStream config, and response routing (`RouteHandshakeTests.cs`, `RouteSubscriptionPropagationTests.cs`, `GatewayLeafBootstrapTests.cs`, `ResponseRoutingTests.cs`).
- Monitoring and configuration tests cover config file parsing and reloading, options processing, monitoring endpoints, account stats, server stats, and subject-transform config (`MonitorTests.cs`, `NatsConfParserTests.cs`, `ConfigReloadTests.cs`, `ConfigProcessorTests.cs`, `SubjectTransformTests.cs`, and related files).
- WebSocket tests cover frame read/write, compression, upgrade handshake, origin checking, and integration (`WebSocket/WsFrameReadTests.cs`, `WebSocket/WsFrameWriterTests.cs`, `WebSocket/WsCompressionTests.cs`, `WebSocket/WsUpgradeTests.cs`, `WebSocket/WsIntegrationTests.cs`, and related files).
- Protocol and parser tests cover `NatsParser.TryParse` for each command type, header parsing, subject matching, and the `SubList` trie (`ParserTests.cs`, `NatsHeaderParserTests.cs`, `SubjectMatchTests.cs`, `SubListTests.cs`).
- Client lifecycle tests cover command dispatch, subscription tracking, write loop, verbose mode, no-responders, trace mode, client flags, and closed-reason handling (`ClientTests.cs`, `WriteLoopTests.cs`, `VerboseModeTests.cs`, `ClientFlagsTests.cs`, `ClientClosedReasonTests.cs`, and related files).
- Integration tests run end-to-end scenarios against a live server instance using `NATS.Client.Core` (`IntegrationTests.cs`, `AuthIntegrationTests.cs`, `NKeyIntegrationTests.cs`, `PermissionIntegrationTests.cs`, `SubjectTransformIntegrationTests.cs`, `ConfigIntegrationTests.cs`, `WebSocket/WsIntegrationTests.cs`).
---
@@ -180,4 +173,4 @@ To adjust log levels at runtime, modify the `LoggerConfiguration` in `Program.cs
- [Protocol Overview](../Protocol/Overview.md)
- [Server Overview](../Server/Overview.md)
<!-- Last verified against codebase: 2026-02-22 -->
<!-- Last verified against codebase: 2026-02-23 -->


@@ -0,0 +1,463 @@
# JetStream Overview
JetStream is the persistence layer of NATS. Clients publish to subjects that match a configured stream; the server stores those messages and delivers them to consumers on demand (pull) or proactively (push). This document describes the current .NET implementation: what is built, how the pieces connect, and where it falls short of the Go reference.
---
## Architecture
### Component diagram
```
NATS PUB message
      │
      ▼
JetStreamPublisher.TryCapture()
      │  duplicate check (PublishPreconditions)
      │  subject → stream lookup (StreamManager.FindBySubject)
      ▼
StreamManager.Capture()
      ├── StreamReplicaGroup.ProposeAsync()             ← in-process RAFT only
      ├── IStreamStore.AppendAsync()                    ← MemStore or FileStore
      ├── EnforceLimits()                               ← MaxMsgs trim
      └── ReplicateIfConfigured()
            ├── MirrorCoordinator.OnOriginAppendAsync() ← in-process only
            └── SourceCoordinator.OnOriginAppendAsync() ← in-process only

$JS.API.* request
      │
      ▼
JetStreamApiRouter.Route()
      ├── $JS.API.STREAM.CREATE.*   → StreamApiHandlers.HandleCreate()   → StreamManager.CreateOrUpdate()
      ├── $JS.API.STREAM.INFO.*     → StreamApiHandlers.HandleInfo()     → StreamManager.GetInfo()
      ├── $JS.API.CONSUMER.CREATE.* → ConsumerApiHandlers.HandleCreate() → ConsumerManager.CreateOrUpdate()
      ├── $JS.API.CONSUMER.INFO.*   → ConsumerApiHandlers.HandleInfo()   → ConsumerManager.GetInfo()
      └── anything else             → JetStreamApiResponse.NotFound()

Consumer delivery
      ├── Pull: ConsumerManager.FetchAsync()  → PullConsumerEngine.FetchAsync() → IStreamStore.LoadAsync()
      └── Push: ConsumerManager.OnPublished() → PushConsumerEngine.Enqueue()    → ConsumerHandle.PushFrames (queue)
```
### API dispatch
`JetStreamApiRouter.Route` is the single entry point for all `$JS.API.*` requests. It dispatches by prefix matching on the subject string:
```csharp
// JetStreamApiRouter.cs
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
{
    if (subject.StartsWith("$JS.API.STREAM.CREATE.", StringComparison.Ordinal))
        return StreamApiHandlers.HandleCreate(subject, payload, _streamManager);

    if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal))
        return StreamApiHandlers.HandleInfo(subject, _streamManager);

    if (subject.StartsWith("$JS.API.CONSUMER.CREATE.", StringComparison.Ordinal))
        return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager);

    if (subject.StartsWith("$JS.API.CONSUMER.INFO.", StringComparison.Ordinal))
        return ConsumerApiHandlers.HandleInfo(subject, _consumerManager);

    return JetStreamApiResponse.NotFound(subject);
}
```
The stream or consumer name is the trailing token after the fixed prefix — e.g., `$JS.API.STREAM.CREATE.ORDERS` creates a stream named `ORDERS`.
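As a rough illustration of the trailing-token convention, the sketch below strips a fixed prefix from a subject. `TrailingName` is a hypothetical helper written for this example; the actual parsing inside `StreamApiHandlers` and `ConsumerApiHandlers` is not shown in this document and may differ.

```csharp
using System;

// Hypothetical helper: returns everything after a fixed API prefix,
// or an empty string when the prefix does not match or nothing follows it.
string TrailingName(string subject, string prefix) =>
    subject.StartsWith(prefix, StringComparison.Ordinal) && subject.Length > prefix.Length
        ? subject[prefix.Length..]
        : "";

var streamName = TrailingName("$JS.API.STREAM.CREATE.ORDERS", "$JS.API.STREAM.CREATE.");

// Consumer subjects carry two trailing tokens: <stream>.<name>.
var consumerPath = TrailingName("$JS.API.CONSUMER.CREATE.ORDERS.worker", "$JS.API.CONSUMER.CREATE.");
var parts = consumerPath.Split('.', 2);
```

Here `streamName` is `ORDERS`, and `parts` holds the stream name followed by the consumer name.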
---
## API Surface
The following `$JS.API.*` subjects are handled. Every other subject returns a not-found error response.
| Subject prefix | Handler | Description |
|---|---|---|
| `$JS.API.STREAM.CREATE.<name>` | `StreamApiHandlers.HandleCreate` | Create or update a stream |
| `$JS.API.STREAM.INFO.<name>` | `StreamApiHandlers.HandleInfo` | Get stream info and state |
| `$JS.API.CONSUMER.CREATE.<stream>.<name>` | `ConsumerApiHandlers.HandleCreate` | Create or update a durable consumer |
| `$JS.API.CONSUMER.INFO.<stream>.<name>` | `ConsumerApiHandlers.HandleInfo` | Get consumer info |
Subjects such as `$JS.API.STREAM.LIST`, `$JS.API.STREAM.DELETE`, `$JS.API.CONSUMER.LIST`, `$JS.API.CONSUMER.DELETE`, and `$JS.API.CONSUMER.PAUSE` are not handled and return not-found.
---
## Streams
### StreamConfig fields
`StreamConfig` (`src/NATS.Server/JetStream/Models/StreamConfig.cs`) defines what the server stores for a stream:
| Field | Type | Default | Description |
|---|---|---|---|
| `Name` | `string` | `""` | Stream name. Required; rejected if empty or whitespace. |
| `Subjects` | `List<string>` | `[]` | Subject filter patterns. Messages published to matching subjects are captured. |
| `MaxMsgs` | `int` | `0` | Maximum number of messages to retain. `0` means unlimited. Enforced by trimming oldest messages after each append. |
| `Replicas` | `int` | `1` | Number of in-process RAFT nodes to create for this stream. Has no network effect. |
| `Mirror` | `string?` | `null` | Name of another stream in the same `StreamManager` to mirror from. In-process only. |
| `Source` | `string?` | `null` | Name of another stream in the same `StreamManager` to source from. In-process only. |
The Go reference supports many additional fields: `RetentionPolicy`, `Storage`, `MaxBytes`, `MaxAge`, `MaxMsgSize`, `Discard`, `DuplicateWindow`, `Placement`, `SubjectTransform`, and more. None of these are present in this implementation.
### Subject matching and capture
`StreamManager.FindBySubject` scans all registered streams and uses `SubjectMatch.MatchLiteral` to find the first stream whose `Subjects` list matches the incoming publish subject. `StreamManager.Capture` then appends the message to that stream's store:
```csharp
// StreamManager.cs
public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
{
    var stream = FindBySubject(subject);
    if (stream == null)
        return null;

    if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
        _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();

    var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
    EnforceLimits(stream);

    var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
    if (stored != null)
        ReplicateIfConfigured(stream.Config.Name, stored);

    return new PubAck { Stream = stream.Config.Name, Seq = seq };
}
```
`EnforceLimits` trims the store to `MaxMsgs` after each append, calling `TrimToMaxMessages` on `MemStore` or `FileStore`. No other limit types (`MaxBytes`, `MaxAge`) are enforced.
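For the subject lookup step in `FindBySubject`, the following sketch shows what matching a literal publish subject against a stream's subject patterns can look like. It assumes standard NATS wildcard semantics (`*` matches exactly one token, `>` matches one or more trailing tokens). The `Matches` function is hypothetical and is not the source of `SubjectMatch.MatchLiteral`, which is not reproduced in this document.

```csharp
using System;

// Hypothetical matcher: does a literal subject match a pattern containing NATS wildcards?
bool Matches(string literal, string pattern)
{
    var lit = literal.Split('.');
    var pat = pattern.Split('.');

    for (var i = 0; i < pat.Length; i++)
    {
        if (pat[i] == ">")
            return i == pat.Length - 1 && lit.Length > i;  // '>' must be last and needs at least one token
        if (i >= lit.Length)
            return false;                                  // pattern is longer than the subject
        if (pat[i] != "*" && pat[i] != lit[i])
            return false;                                  // literal token mismatch
    }

    return lit.Length == pat.Length;                       // no unmatched trailing tokens
}

var exact = Matches("orders.new", "orders.new");
var star = Matches("orders.new", "orders.*");
var tail = Matches("orders.new.eu", "orders.>");
var tooLong = Matches("orders.new.eu", "orders.*");
var emptyTail = Matches("orders", "orders.>");
```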
---
## Consumers
### ConsumerConfig fields
`ConsumerConfig` (`src/NATS.Server/JetStream/Models/ConsumerConfig.cs`) defines consumer behavior:
| Field | Type | Default | Description |
|---|---|---|---|
| `DurableName` | `string` | `""` | Consumer name. Required; rejected if empty or whitespace. |
| `FilterSubject` | `string?` | `null` | Subject filter. Stored but not applied during fetch — all messages in the stream are delivered regardless. |
| `AckPolicy` | `AckPolicy` | `None` | `None` (no ack tracking) or `Explicit` (pending ack tracking with redelivery). |
| `AckWaitMs` | `int` | `30000` | Milliseconds before an unacknowledged message is considered expired and eligible for redelivery. |
| `MaxDeliver` | `int` | `1` | Stored but not enforced — redelivery count is not capped. |
| `Push` | `bool` | `false` | If `true`, the consumer receives messages via `PushConsumerEngine` on publish. |
| `HeartbeatMs` | `int` | `0` | If positive, a heartbeat `PushFrame` is enqueued after each data frame. Not transmitted over the wire. |
The Go reference supports additional fields: `DeliverSubject`, `DeliverGroup`, `DeliverPolicy`, `OptStartSeq`, `OptStartTime`, `ReplayPolicy`, `FlowControl`, `IdleHeartbeat`, `HeadersOnly`, `MaxWaiting`, `MaxAckPending`, `BackOff`, priority groups, and pause. None are present here.
### Pull delivery
`PullConsumerEngine.FetchAsync` reads up to `batch` messages starting from `consumer.NextSequence`. With `AckPolicy.Explicit`, it first checks `AckProcessor.NextExpired()` and redelivers one expired message before advancing the cursor:
```csharp
// PullConsumerEngine.cs
public async ValueTask<PullFetchBatch> FetchAsync(
    StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct)
{
    var messages = new List<StoredMessage>(batch);

    if (consumer.Config.AckPolicy == AckPolicy.Explicit)
    {
        var expired = consumer.AckProcessor.NextExpired();
        if (expired is { } expiredSequence)
        {
            var redelivery = await stream.Store.LoadAsync(expiredSequence, ct);
            if (redelivery != null)
                messages.Add(new StoredMessage { /* ... Redelivered = true */ });
            return new PullFetchBatch(messages);
        }

        if (consumer.AckProcessor.HasPending)
            return new PullFetchBatch(messages);
    }

    var sequence = consumer.NextSequence;
    for (var i = 0; i < batch; i++)
    {
        var message = await stream.Store.LoadAsync(sequence, ct);
        if (message == null) break;
        messages.Add(message);
        if (consumer.Config.AckPolicy == AckPolicy.Explicit)
            consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
        sequence++;
    }

    consumer.NextSequence = sequence;
    return new PullFetchBatch(messages);
}
```
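Fetch is gated on pending acks rather than blocking: if any message is registered and has not yet expired, `FetchAsync` returns an empty batch immediately, and new messages are returned only when `HasPending` is `false`. Because `AckProcessor` currently has no ack method (see Ack processing below), `HasPending` does not return to `false` once a message has been registered. At most one expired message is redelivered per fetch call.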
### Push delivery
`PushConsumerEngine.Enqueue` places messages onto `ConsumerHandle.PushFrames`, a plain `Queue<PushFrame>`. These frames are not transmitted to any NATS subject. `ConsumerManager.ReadPushFrame` allows callers to dequeue frames in-process:
```csharp
// PushConsumerEngine.cs
public void Enqueue(ConsumerHandle consumer, StoredMessage message)
{
    consumer.PushFrames.Enqueue(new PushFrame { IsData = true, Message = message });

    if (consumer.Config.AckPolicy == AckPolicy.Explicit)
        consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);

    if (consumer.Config.HeartbeatMs > 0)
        consumer.PushFrames.Enqueue(new PushFrame { IsHeartbeat = true });
}
```
Push delivery is not wired to the NATS protocol layer. A connected NATS client subscribing to a `DeliverSubject` will not receive messages from a push consumer. The queue is only accessible through `ConsumerManager.ReadPushFrame`.
### Ack processing
`AckProcessor` is a per-consumer dictionary of sequence numbers to deadline timestamps. `PullConsumerEngine` uses it to register fetched messages and to check for expired ones; `PushConsumerEngine` uses it to register newly enqueued messages:
```csharp
// AckProcessor.cs
public sealed class AckProcessor
{
    private readonly Dictionary<ulong, DateTime> _pending = new();

    public void Register(ulong sequence, int ackWaitMs)
    {
        _pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1));
    }

    public ulong? NextExpired()
    {
        foreach (var (seq, deadline) in _pending)
        {
            if (DateTime.UtcNow >= deadline)
                return seq;
        }
        return null;
    }

    public bool HasPending => _pending.Count > 0;
}
```
Expiry detection is lazy — `NextExpired()` is only called from `PullConsumerEngine.FetchAsync`. There is no background timer or active expiry sweep. Acknowledged messages are never removed from `_pending` because there is no `Ack(ulong sequence)` method on `AckProcessor`. This means `HasPending` is always `true` once any message has been registered, and pending acks accumulate without bound.
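To make the missing piece concrete, here is a minimal sketch of what an ack path could look like. It is an assumption about one possible fix, not code from the repository: the `Ack` function is hypothetical, and the dictionary and local functions only mimic the shape of `AckProcessor`.

```csharp
using System;
using System.Collections.Generic;

var pending = new Dictionary<ulong, DateTime>();

// Mirrors AckProcessor.Register from the snippet above.
void Register(ulong sequence, int ackWaitMs) =>
    pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1));

// Hypothetical addition: AckProcessor has no such method today.
// Dictionary.Remove returns false when the key is absent, so a double ack is a no-op.
bool Ack(ulong sequence) => pending.Remove(sequence);

bool HasPending() => pending.Count > 0;

Register(1, 30_000);
Register(2, 30_000);
var beforeAck = HasPending();
var removedFirst = Ack(1);
var removedSecond = Ack(2);
var removedAgain = Ack(2);   // second ack of the same sequence
var afterAck = HasPending();
```

With an ack path like this, `HasPending` can return to `false`, which is what the gate in `PullConsumerEngine.FetchAsync` needs in order to advance the cursor again.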
---
## Storage
### IStreamStore interface
```csharp
// IStreamStore.cs
public interface IStreamStore
{
    ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
    ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct);
    ValueTask PurgeAsync(CancellationToken ct);
    ValueTask<StreamState> GetStateAsync(CancellationToken ct);
}
```
`AppendAsync` returns the assigned sequence number. `LoadAsync` returns `null` if the sequence does not exist (trimmed or never written). The interface does not expose delete-by-sequence, range scans, or subject filtering. `TrimToMaxMessages` is implemented on the concrete types but is not part of the interface.
### MemStore
`MemStore` holds all messages in a `Dictionary<ulong, StoredMessage>` under a single `object` lock. Every operation acquires that lock synchronously:
```csharp
// MemStore.cs
public ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
    lock (_gate)
    {
        _last++;
        _messages[_last] = new StoredMessage
        {
            Sequence = _last,
            Subject = subject,
            Payload = payload,
        };
        return ValueTask.FromResult(_last);
    }
}
```
`TrimToMaxMessages` removes entries one at a time starting from the minimum key, calling `_messages.Keys.Min()` on each iteration, which is an O(n) scan per removal. `MemStore` is the default store allocated by `StreamManager.CreateOrUpdate`, so messages survive only for the lifetime of the process.
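One way to avoid the per-removal scan is to track the lowest retained sequence directly. The sketch below is a hypothetical alternative, not the repository's code, and it assumes sequences are contiguous (which holds while nothing deletes by sequence, as `IStreamStore` does not expose such an operation). All names here are illustrative.

```csharp
using System.Collections.Generic;

var messages = new Dictionary<ulong, string>();
ulong first = 1, last = 0;   // lowest and highest retained sequence

void Append(string payload) => messages[++last] = payload;

// Hypothetical alternative to Keys.Min(): advance a cursor instead of scanning the key set.
// Each removal is O(1), so trimming k messages costs O(k).
void TrimToMaxMessages(ulong maxMessages)
{
    while ((ulong)messages.Count > maxMessages)
        messages.Remove(first++);
}

for (var i = 0; i < 5; i++)
    Append($"m{i}");

TrimToMaxMessages(2);        // drops sequences 1 to 3, keeps 4 and 5
```

A purge would need to reset `first` to `last + 1` for the cursor to stay valid.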
### FileStore
`FileStore` appends messages to a JSONL file (`messages.jsonl`) and keeps a full in-memory index (`Dictionary<ulong, StoredMessage>`) identical in structure to `MemStore`. It is not production-safe for several reasons:
- **No locking**: `AppendAsync`, `LoadAsync`, `GetStateAsync`, and `TrimToMaxMessages` are not synchronized. Concurrent access from `StreamManager.Capture` and `PullConsumerEngine.FetchAsync` is unsafe.
- **Per-write file I/O**: Each `AppendAsync` calls `File.AppendAllTextAsync`, issuing a separate file open/write/close per message.
- **Full rewrite on trim**: `TrimToMaxMessages` calls `RewriteDataFile()`, which rewrites the entire file from the in-memory index. This is O(n) in message count and blocking.
- **Full in-memory index**: The in-memory dictionary holds every undeleted message payload; there is no paging or streaming read path.
```csharp
// FileStore.cs
public void TrimToMaxMessages(ulong maxMessages)
{
    while ((ulong)_messages.Count > maxMessages)
    {
        var first = _messages.Keys.Min();
        _messages.Remove(first);
    }
    RewriteDataFile();
}

private void RewriteDataFile()
{
    var lines = new List<string>(_messages.Count);
    foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
    {
        lines.Add(JsonSerializer.Serialize(new FileRecord
        {
            Sequence = message.Sequence,
            Subject = message.Subject,
            PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()),
        }));
    }
    File.WriteAllLines(_dataFilePath, lines);
}
```
The Go reference (`filestore.go`) uses block-based binary storage with S2 compression, per-block indexes, and memory-mapped I/O. This implementation shares none of those properties.
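For reference, the JSONL record shape implied by `RewriteDataFile` can be sketched as a round trip. This is an illustration only: it uses an anonymous object with the same three property names as `FileRecord`, and it assumes default `System.Text.Json` naming; the real `FileRecord` type and any serializer options are not shown in this document.

```csharp
using System;
using System.Text;
using System.Text.Json;

// One line of the data file: sequence, subject, and the payload encoded as Base64.
var line = JsonSerializer.Serialize(new
{
    Sequence = 1UL,
    Subject = "orders.new",
    PayloadBase64 = Convert.ToBase64String(Encoding.UTF8.GetBytes("hello world")),
});

// Reading it back: parse the line and decode the payload.
using var doc = JsonDocument.Parse(line);
var subject = doc.RootElement.GetProperty("Subject").GetString();
var payload = Encoding.UTF8.GetString(
    Convert.FromBase64String(doc.RootElement.GetProperty("PayloadBase64").GetString() ?? ""));
```

Base64 inflates each payload by roughly a third, which adds to the per-write and rewrite costs listed above.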
---
## In-Process RAFT
The RAFT implementation has no network transport. All `RaftNode` instances live in the same process, and replication is a direct in-memory method call.
### RaftNode.ProposeAsync
`ProposeAsync` requires the caller to be the leader (`Role == RaftRole.Leader`). It appends the command to the local `RaftLog`, calls `RaftReplicator.Replicate` to fan out synchronously to all peer nodes held in `_cluster`, and commits if a quorum of acknowledgements is reached:
```csharp
// RaftNode.cs
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
{
    if (Role != RaftRole.Leader)
        throw new InvalidOperationException("Only leader can propose entries.");

    var entry = Log.Append(TermState.CurrentTerm, command);
    var followers = _cluster.Where(n => n.Id != Id).ToList();
    var acknowledgements = _replicator.Replicate(entry, followers);

    var quorum = (_cluster.Count / 2) + 1;
    if (acknowledgements + 1 >= quorum)
    {
        AppliedIndex = entry.Index;
        foreach (var node in _cluster)
            node.AppliedIndex = Math.Max(node.AppliedIndex, entry.Index);
    }

    await Task.CompletedTask;
    return entry.Index;
}
```
`Task.CompletedTask` is awaited unconditionally — the method is synchronous in practice. The log is not persisted; snapshots are stored via `RaftSnapshotStore` but that type's persistence behavior is not visible from `RaftNode` alone. Leader election uses `StartElection` / `GrantVote` / `ReceiveVote`, all of which are direct method calls within the same process.
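The quorum rule in `ProposeAsync` is plain integer arithmetic and can be checked in isolation. The two functions below are hypothetical helpers that restate the expressions from the snippet above; they are not part of `RaftNode`.

```csharp
// Majority size for a cluster: integer division, then add one.
int Quorum(int clusterSize) => (clusterSize / 2) + 1;

// The leader counts itself, hence the "+ 1" on the follower acknowledgements.
bool Commits(int clusterSize, int followerAcks) => followerAcks + 1 >= Quorum(clusterSize);

var q1 = Quorum(1);                    // single node: quorum of 1
var q3 = Quorum(3);                    // quorum of 2
var q5 = Quorum(5);                    // quorum of 3
var threeNodeOneAck = Commits(3, 1);   // leader + 1 follower = 2 of 3
var fiveNodeOneAck = Commits(5, 1);    // leader + 1 follower = 2 of 5
```

A single-replica stream therefore commits every proposal without any follower acknowledgement.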
### StreamReplicaGroup
`StreamReplicaGroup` creates `Math.Max(replicas, 1)` `RaftNode` instances when a stream is created and immediately elects a leader via `StartElection`:
```csharp
// StreamReplicaGroup.cs
public StreamReplicaGroup(string streamName, int replicas)
{
    var nodeCount = Math.Max(replicas, 1);
    _nodes = Enumerable.Range(1, nodeCount)
        .Select(i => new RaftNode($"{streamName.ToLowerInvariant()}-r{i}"))
        .ToList();

    foreach (var node in _nodes)
        node.ConfigureCluster(_nodes);

    Leader = ElectLeader(_nodes[0]);
}
```
`ProposeAsync` on the group delegates to the leader node. `StepDownAsync` forces a leader change by calling `RequestStepDown()` on the current leader and electing the next node in the list. All of this is in-process; there is no coordination across server instances.
### JetStreamMetaGroup
`JetStreamMetaGroup` is a thin registry that tracks stream names and the declared cluster size. It does not use `RaftNode` internally. `ProposeCreateStreamAsync` records a stream name in a `ConcurrentDictionary` and returns immediately:
```csharp
// JetStreamMetaGroup.cs
public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct)
{
    _streams[config.Name] = 0;
    return Task.CompletedTask;
}
```
Its purpose is to provide `GetState()` — a sorted list of known stream names and the configured cluster size — for monitoring or coordination callers. It does not replicate metadata across nodes.
---
## Mirror and Source
`MirrorCoordinator` and `SourceCoordinator` are structurally identical: each holds a reference to a target `IStreamStore` and appends messages to it when notified of an origin append. Both operate entirely in-process within a single `StreamManager`:
```csharp
// MirrorCoordinator.cs
public sealed class MirrorCoordinator
{
    private readonly IStreamStore _targetStore;

    public MirrorCoordinator(IStreamStore targetStore) { _targetStore = targetStore; }

    public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
        => _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
}
```
`StreamManager.RebuildReplicationCoordinators` rebuilds the coordinator lists whenever a stream is created or updated. A stream configured with `Mirror = "ORDERS"` receives a copy of every message appended to `ORDERS`, but only if `ORDERS` exists in the same `StreamManager` instance. There is no subscription to an external NATS subject, no replay of historical messages on coordinator setup, and no cross-server replication.
---
## Configuration
`JetStreamOptions` (`src/NATS.Server/Configuration/JetStreamOptions.cs`) holds the configuration model for JetStream:
| Field | Type | Default | Description |
|---|---|---|---|
| `StoreDir` | `string` | `""` | Directory path for `FileStore`. Not currently used to switch the default store; `StreamManager.CreateOrUpdate` always allocates a `MemStore`. |
| `MaxMemoryStore` | `long` | `0` | Maximum bytes for in-memory storage. Not enforced. |
| `MaxFileStore` | `long` | `0` | Maximum bytes for file storage. Not enforced. |
None of the three fields currently affect runtime behavior. `StoreDir` would need to be wired into `StreamManager` to cause `FileStore` allocation. `MaxMemoryStore` and `MaxFileStore` have no enforcement path.
---
## What Is Not Implemented
The following features are present in the Go reference (`golang/nats-server/server/`) but absent from this implementation:
- **Stream delete and update**: `$JS.API.STREAM.DELETE.*` and `$JS.API.STREAM.UPDATE.*` are not handled. `CreateOrUpdate` accepts updates but there is no delete path.
- **Stream list**: `$JS.API.STREAM.LIST` and `$JS.API.STREAM.NAMES` return not-found.
- **Consumer delete, list, and pause**: `$JS.API.CONSUMER.DELETE.*`, `$JS.API.CONSUMER.LIST.*`, and `$JS.API.CONSUMER.PAUSE.*` are not handled.
- **Retention policies**: Only `MaxMsgs` trimming is enforced. `Limits`, `Interest`, and `WorkQueue` retention semantics are not implemented. `MaxBytes` and `MaxAge` are not enforced.
- **Ephemeral consumers**: `ConsumerManager.CreateOrUpdate` requires a non-empty `DurableName`. There is no support for unnamed ephemeral consumers.
- **Push delivery over the NATS wire**: Push consumers enqueue `PushFrame` objects into an in-memory queue. No MSG is written to any connected NATS client's TCP socket.
- **Consumer filter subject enforcement**: `FilterSubject` is stored on `ConsumerConfig` but is never applied in `PullConsumerEngine.FetchAsync`. All messages in the stream are returned regardless of filter.
- **FileStore production safety**: No locking, per-write file I/O, full-rewrite-on-trim, and full in-memory index make `FileStore` unsuitable for production use.
- **RAFT persistence and networking**: `RaftNode` log entries are not persisted across restarts. Replication uses direct in-process method calls; there is no network transport for multi-server consensus.
- **Cross-server replication**: Mirror and source coordinators work only within one `StreamManager` in one process. Messages published on a remote server are not replicated.
- **Duplicate message window**: `PublishPreconditions` tracks message IDs for deduplication but there is no configurable `DuplicateWindow` TTL to expire old IDs.
- **Subject transforms, placement, and mirroring policies**: None of the stream configuration fields beyond `Name`, `Subjects`, `MaxMsgs`, `Replicas`, `Mirror`, and `Source` are processed.
---
## Related Documentation
- [Server Overview](../Server/Overview.md)
- [Subscriptions Overview](../Subscriptions/Overview.md)
- [Configuration Overview](../Configuration/Overview.md)
- [Protocol Overview](../Protocol/Overview.md)
<!-- Last verified against codebase: 2026-02-23 -->


@@ -0,0 +1,435 @@
# Monitoring Overview
The monitoring subsystem exposes an HTTP server that reports server state, connection details, subscription counts, and JetStream statistics. It is the .NET port of the monitoring endpoints in `golang/nats-server/server/monitor.go`.
## Enabling Monitoring
Monitoring is disabled by default. Set `MonitorPort` to a non-zero value to enable it. The standard NATS monitoring port is `8222`.
### Configuration options
| `NatsOptions` field | CLI flag | Default | Description |
|---|---|---|---|
| `MonitorPort` | `-m` / `--http_port` | `0` (disabled) | HTTP port for the monitoring server |
| `MonitorHost` | _(none)_ | `"0.0.0.0"` | Address the monitoring server binds to |
| `MonitorBasePath` | `--http_base_path` | `""` | URL prefix prepended to all endpoint paths |
| `MonitorHttpsPort` | `--https_port` | `0` (disabled) | HTTPS port (reported in `/varz`; HTTPS listener not yet implemented) |
Starting with a custom port:
```bash
dotnet run --project src/NATS.Server.Host -- -m 8222
```
With a base path (all endpoints become `/monitor/varz`, `/monitor/connz`, etc.):
```bash
dotnet run --project src/NATS.Server.Host -- -m 8222 --http_base_path /monitor
```
### MonitorServer startup
`MonitorServer` uses `WebApplication.CreateSlimBuilder`, the minimal ASP.NET Core host, without MVC, Razor, or additional middleware. The web host's logging providers are cleared so that monitoring HTTP request logs do not appear in the NATS server's Serilog output. The `ILogger<MonitorServer>` created from the injected `ILoggerFactory` is used only for the startup confirmation message.
```csharp
public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory)
{
    _logger = loggerFactory.CreateLogger<MonitorServer>();

    var builder = WebApplication.CreateSlimBuilder();
    builder.WebHost.UseUrls($"http://{options.MonitorHost}:{options.MonitorPort}");
    builder.Logging.ClearProviders();
    _app = builder.Build();

    var basePath = options.MonitorBasePath ?? "";
    _varzHandler = new VarzHandler(server, options);
    _connzHandler = new ConnzHandler(server);
    _subszHandler = new SubszHandler(server);
    _jszHandler = new JszHandler(server, options);
    // ... endpoint registration follows
}

public async Task StartAsync(CancellationToken ct)
{
    await _app.StartAsync(ct);
    _logger.LogInformation("Monitoring listening on {Urls}", string.Join(", ", _app.Urls));
}
```
`MonitorServer` implements `IAsyncDisposable`. `DisposeAsync` stops the web application and disposes the `VarzHandler` (which holds a `SemaphoreSlim`).
## Architecture
### Endpoint-to-handler mapping
| Path | Handler | Status |
|---|---|---|
| `GET /` | Inline lambda | Implemented |
| `GET /healthz` | Inline lambda | Implemented |
| `GET /varz` | `VarzHandler.HandleVarzAsync` | Implemented |
| `GET /connz` | `ConnzHandler.HandleConnz` | Implemented |
| `GET /subz` | `SubszHandler.HandleSubsz` | Implemented |
| `GET /subscriptionsz` | `SubszHandler.HandleSubsz` | Implemented (alias for `/subz`) |
| `GET /jsz` | `JszHandler.Build` | Implemented (summary only) |
| `GET /routez` | Inline lambda | Stub — returns `{}` |
| `GET /gatewayz` | Inline lambda | Stub — returns `{}` |
| `GET /leafz` | Inline lambda | Stub — returns `{}` |
| `GET /accountz` | Inline lambda | Stub — returns `{}` |
| `GET /accstatz` | Inline lambda | Stub — returns `{}` |
All endpoints are registered with `MonitorBasePath` prepended when set.
### Request counting
Every endpoint increments `ServerStats.HttpReqStats` — a `ConcurrentDictionary<string, long>` — using `AddOrUpdate`. The path string (e.g., `"/varz"`) is the key. These counts are included in `/varz` responses as the `http_req_stats` field, allowing external tooling to track monitoring traffic over time.
```csharp
// ServerStats.cs
public readonly ConcurrentDictionary<string, long> HttpReqStats = new();
// MonitorServer.cs — pattern used for every endpoint
stats.HttpReqStats.AddOrUpdate("/varz", 1, (_, v) => v + 1);
```
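The `AddOrUpdate` pattern stays correct under concurrent requests, which the following self-contained sketch demonstrates. The `Count` helper and the local dictionary are hypothetical stand-ins for the per-endpoint calls against `ServerStats.HttpReqStats`.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

var httpReqStats = new ConcurrentDictionary<string, long>();

// Hypothetical helper wrapping the pattern: insert 1 for a new path, otherwise increment.
void Count(string path) => httpReqStats.AddOrUpdate(path, 1, (_, v) => v + 1);

// Simulate many concurrent monitoring requests hitting the same endpoint.
Parallel.For(0, 1000, _ => Count("/varz"));
Count("/connz");
```

`AddOrUpdate` may invoke the update delegate more than once under contention, but each stored value reflects exactly one increment, so no requests are lost from the count.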
## Endpoints
### `GET /`
Returns a JSON object listing the available endpoint paths. The list is static and does not reflect which endpoints are currently implemented.
```json
{
  "endpoints": [
    "/varz", "/connz", "/healthz", "/routez",
    "/gatewayz", "/leafz", "/subz", "/accountz", "/jsz"
  ]
}
```
### `GET /healthz`
Returns HTTP 200 with the plain text body `"ok"`. This is a liveness probe: if the monitoring HTTP server responds, the process is alive. It does not check message delivery, subscription state, or JetStream health.
### `GET /varz`
Returns a `Varz` JSON object containing server identity, configuration limits, runtime metrics, and traffic counters. The response is built by `VarzHandler.HandleVarzAsync`, which holds a `SemaphoreSlim` (`_varzMu`) to serialize concurrent requests.
#### CPU sampling
CPU usage is calculated by comparing `Process.TotalProcessorTime` samples. Results are cached for one second; requests within that window return the previous sample.
```csharp
// VarzHandler.cs
if ((now - _lastCpuSampleTime).TotalSeconds >= 1.0)
{
    var currentCpu = proc.TotalProcessorTime;
    var elapsed = now - _lastCpuSampleTime;
    _cachedCpuPercent = (currentCpu - _lastCpuUsage).TotalMilliseconds
        / elapsed.TotalMilliseconds / Environment.ProcessorCount * 100.0;
    _lastCpuSampleTime = now;
    _lastCpuUsage = currentCpu;
}
```
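Dividing by `Environment.ProcessorCount` normalizes the value to a share of total machine capacity: a range of 0 to 100 across all cores, equivalent to the average utilization per core. The result is then rounded to two decimal places.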
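A worked example of the CPU arithmetic, written as a pure function so the numbers can be checked by hand. `CpuPercent` is a hypothetical helper, and the use of `Math.Round` is an assumption about how the two-decimal rounding is applied in `VarzHandler`.

```csharp
using System;

// Hypothetical pure function mirroring the arithmetic in the VarzHandler snippet:
// CPU time consumed, divided by wall-clock time, divided by core count, as a percentage.
double CpuPercent(TimeSpan cpuDelta, TimeSpan wallDelta, int processorCount) =>
    Math.Round(cpuDelta.TotalMilliseconds / wallDelta.TotalMilliseconds / processorCount * 100.0, 2);

// 500 ms of CPU time over a 1 s window on 4 cores: 0.5 / 4 * 100 = 12.5
var halfOfOneCore = CpuPercent(TimeSpan.FromMilliseconds(500), TimeSpan.FromSeconds(1), 4);

// All 4 cores saturated for 1 s: 4000 ms of CPU time, 4 / 4 * 100 = 100
var saturated = CpuPercent(TimeSpan.FromMilliseconds(4000), TimeSpan.FromSeconds(1), 4);
```

Without the division by core count, a fully loaded 4-core machine would report 400 rather than 100.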
#### TLS certificate expiry
When `HasTls` is true and `TlsCert` is set, the handler loads the certificate file with `X509CertificateLoader.LoadCertificateFromFile` and reads `NotAfter`. Load failures are silently swallowed; the field defaults to `DateTime.MinValue` in that case.
#### Field reference
**Identity**
| JSON key | C# property | Description |
|---|---|---|
| `server_id` | `Id` | 20-char uppercase alphanumeric server ID |
| `server_name` | `Name` | Server name from options or generated default |
| `version` | `Version` | Server version string |
| `proto` | `Proto` | Protocol version integer |
| `go` | `GoVersion` | Reports `"dotnet {RuntimeInformation.FrameworkDescription}"` |
| `host` | `Host` | Bound client host |
| `port` | `Port` | Bound client port |
| `git_commit` | `GitCommit` | Always empty in this port |
**Network**
| JSON key | C# property | Description |
|---|---|---|
| `ip` | `Ip` | Resolved IP (empty if not set) |
| `connect_urls` | `ConnectUrls` | Advertised client URLs |
| `ws_connect_urls` | `WsConnectUrls` | Advertised WebSocket URLs |
| `http_host` | `HttpHost` | Monitoring bind host |
| `http_port` | `HttpPort` | Monitoring HTTP port |
| `http_base_path` | `HttpBasePath` | Monitoring base path |
| `https_port` | `HttpsPort` | Monitoring HTTPS port |
**Security**
| JSON key | C# property | Description |
|---|---|---|
| `auth_required` | `AuthRequired` | Whether auth is required |
| `tls_required` | `TlsRequired` | `HasTls && !AllowNonTls` |
| `tls_verify` | `TlsVerify` | Client certificate verification |
| `tls_ocsp_peer_verify` | `TlsOcspPeerVerify` | OCSP peer verification |
| `auth_timeout` | `AuthTimeout` | Auth timeout in seconds |
| `tls_timeout` | `TlsTimeout` | TLS handshake timeout in seconds |
| `tls_cert_not_after` | `TlsCertNotAfter` | TLS certificate expiry date |
**Limits**
| JSON key | C# property | Description |
|---|---|---|
| `max_connections` | `MaxConnections` | Max simultaneous connections |
| `max_subscriptions` | `MaxSubscriptions` | Max subscriptions (0 = unlimited) |
| `max_payload` | `MaxPayload` | Max message payload in bytes |
| `max_pending` | `MaxPending` | Max pending bytes per client |
| `max_control_line` | `MaxControlLine` | Max control line length in bytes |
| `ping_max` | `MaxPingsOut` | Max outstanding pings before disconnect |
**Timing**
| JSON key | C# property | Type | Description |
|---|---|---|---|
| `ping_interval` | `PingInterval` | `long` (nanoseconds) | Ping send interval |
| `write_deadline` | `WriteDeadline` | `long` (nanoseconds) | Write deadline |
| `start` | `Start` | `DateTime` | Server start time |
| `now` | `Now` | `DateTime` | Time of this response |
| `uptime` | `Uptime` | `string` | Human-readable uptime (e.g., `"2d4h30m10s"`) |
| `config_load_time` | `ConfigLoadTime` | `DateTime` | Currently set to server start time |
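
Two of the timing fields use Go-style encodings that have no direct .NET equivalent: durations as nanosecond integers and uptime as a compact string. The sketch below shows one way to produce both. `VarzTiming`, `ToNanoseconds`, and `FormatUptime` are hypothetical names, and the uptime layout is inferred from the examples in this document (`"2d4h30m10s"`, `"5m0s"`, `"10s"`) rather than taken from the server source.

```csharp
using System;

// Hypothetical helpers illustrating the two encodings.
public static class VarzTiming
{
    // A TimeSpan tick is 100 ns, so nanoseconds = ticks * 100.
    public static long ToNanoseconds(TimeSpan value) => value.Ticks * 100;

    // Assumed layout: leading zero-valued units are dropped, trailing ones are kept.
    public static string FormatUptime(TimeSpan up)
    {
        if (up.Days > 0) return $"{up.Days}d{up.Hours}h{up.Minutes}m{up.Seconds}s";
        if (up.Hours > 0) return $"{up.Hours}h{up.Minutes}m{up.Seconds}s";
        if (up.Minutes > 0) return $"{up.Minutes}m{up.Seconds}s";
        return $"{up.Seconds}s";
    }
}
```

With these definitions a 2-minute ping interval serializes as `120000000000`.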
**Runtime**
| JSON key | C# property | Description |
|---|---|---|
| `mem` | `Mem` | Process working set in bytes |
| `cpu` | `Cpu` | CPU usage percentage (1-second cache) |
| `cores` | `Cores` | `Environment.ProcessorCount` |
| `gomaxprocs` | `MaxProcs` | `ThreadPool.ThreadCount` |
**Traffic and connections**
| JSON key | C# property | Description |
|---|---|---|
| `connections` | `Connections` | Current open client count |
| `total_connections` | `TotalConnections` | Cumulative connections since start |
| `routes` | `Routes` | Current cluster route count |
| `remotes` | `Remotes` | Remote cluster count |
| `leafnodes` | `Leafnodes` | Leaf node count |
| `in_msgs` | `InMsgs` | Total messages received |
| `out_msgs` | `OutMsgs` | Total messages sent |
| `in_bytes` | `InBytes` | Total bytes received |
| `out_bytes` | `OutBytes` | Total bytes sent |
| `slow_consumers` | `SlowConsumers` | Slow consumer disconnect count |
| `slow_consumer_stats` | `SlowConsumerStats` | Breakdown by connection type |
| `stale_connections` | `StaleConnections` | Stale connection count |
| `stale_connection_stats` | `StaleConnectionStatsDetail` | Breakdown by connection type |
| `subscriptions` | `Subscriptions` | Current subscription count |
**HTTP**
| JSON key | C# property | Description |
|---|---|---|
| `http_req_stats` | `HttpReqStats` | Per-path request counts since start |
**Subsystems**
| JSON key | C# property | Type |
|---|---|---|
| `cluster` | `Cluster` | `ClusterOptsVarz` |
| `gateway` | `Gateway` | `GatewayOptsVarz` |
| `leaf` | `Leaf` | `LeafNodeOptsVarz` |
| `mqtt` | `Mqtt` | `MqttOptsVarz` |
| `websocket` | `Websocket` | `WebsocketOptsVarz` |
| `jetstream` | `JetStream` | `JetStreamVarz` |
The `JetStreamVarz` object contains a `config` object (`JetStreamConfig`) with `max_memory`, `max_storage`, and `store_dir`, and a `stats` object (`JetStreamStats`) with `accounts`, `ha_assets`, `streams`, `consumers`, and an `api` sub-object with `total` and `errors`.
### `GET /connz`
Returns a `Connz` JSON object with a paged list of connection details. Handled by `ConnzHandler.HandleConnz`.
#### Query parameters
| Parameter | Values | Default | Description |
|---|---|---|---|
| `sort` | `cid`, `start`, `subs`, `pending`, `msgs_to`, `msgs_from`, `bytes_to`, `bytes_from`, `last`, `idle`, `uptime`, `rtt`, `stop`, `reason` | `cid` | Sort order; `stop` and `reason` are silently coerced to `cid` when `state=open` |
| `subs` | `true`, `1`, `detail` | _(omitted)_ | Include subscription list; `detail` adds per-subscription message counts and queue group names |
| `state` | `open`, `closed`, `all` | `open` | Which connections to include |
| `offset` | integer | `0` | Pagination offset |
| `limit` | integer | `1024` | Max connections per response |
| `mqtt_client` | string | _(omitted)_ | Filter to a specific MQTT client ID |
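
The defaults and the sort coercion in the table can be expressed as a small parsing step. This is a sketch under stated assumptions, not `ConnzHandler`: `ConnzQuery` and `ConnzQueryParser` are hypothetical types, and the handling of invalid values (unknown `state`, non-numeric or negative `offset`/`limit` fall back to the defaults) is an assumption.

```csharp
using System.Collections.Generic;

// Hypothetical result type for the parsed query.
public sealed record ConnzQuery(string Sort, string State, int Offset, int Limit);

public static class ConnzQueryParser
{
    public static ConnzQuery Parse(IReadOnlyDictionary<string, string> query)
    {
        var state = query.TryGetValue("state", out var rawState) ? rawState.ToLowerInvariant() : "open";
        if (state is not ("open" or "closed" or "all"))
            state = "open"; // assumption: unknown values fall back to the default

        var sort = query.TryGetValue("sort", out var rawSort) ? rawSort.ToLowerInvariant() : "cid";

        // stop and reason only exist on closed connections, so they are
        // coerced to cid when only open connections are requested.
        if (state == "open" && (sort == "stop" || sort == "reason"))
            sort = "cid";

        return new ConnzQuery(sort, state, ReadInt(query, "offset", 0), ReadInt(query, "limit", 1024));
    }

    // Assumption: missing, non-numeric, or negative values use the default.
    private static int ReadInt(IReadOnlyDictionary<string, string> query, string key, int fallback) =>
        query.TryGetValue(key, out var raw) && int.TryParse(raw, out var value) && value >= 0
            ? value
            : fallback;
}
```

A request for `?sort=stop` with no `state` therefore behaves like `?sort=cid&state=open&offset=0&limit=1024`.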
#### Response shape
```json
{
"server_id": "NABCDEFGHIJ1234567890",
"now": "2026-02-23T12:00:00Z",
"num_connections": 2,
"total": 2,
"offset": 0,
"limit": 1024,
"connections": [
{
"cid": 1,
"kind": "Client",
"type": "Client",
"ip": "127.0.0.1",
"port": 52100,
"start": "2026-02-23T11:55:00Z",
"last_activity": "2026-02-23T11:59:50Z",
"uptime": "5m0s",
"idle": "10s",
"pending_bytes": 0,
"in_msgs": 100,
"out_msgs": 50,
"in_bytes": 4096,
"out_bytes": 2048,
"subscriptions": 3,
"name": "my-client",
"lang": "go",
"version": "1.20.0",
"rtt": "1.234ms"
}
]
}
```
When `subs=true`, `ConnInfo` includes `subscriptions_list: string[]`. When `subs=detail`, it includes `subscriptions_list_detail: SubDetail[]` where each entry has `subject`, `qgroup`, `sid`, `msgs`, `max`, and `cid`.
#### Closed connection tracking
`NatsServer` maintains a bounded ring buffer of `ClosedClient` records (capacity set by `NatsOptions.MaxClosedClients`, default `10_000`). When a client disconnects, a `ClosedClient` record is captured with the final counters, timestamps, and disconnect reason. These records are included when `state=closed` or `state=all`.
`ClosedClient` is a `sealed record` with `init`-only properties:
```csharp
public sealed record ClosedClient
{
public required ulong Cid { get; init; }
public string Ip { get; init; } = "";
public int Port { get; init; }
public DateTime Start { get; init; }
public DateTime Stop { get; init; }
public string Reason { get; init; } = "";
public long InMsgs { get; init; }
public long OutMsgs { get; init; }
// ... additional fields
}
```
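The bounded ring buffer idea can be sketched as follows. `ClosedRing<T>` is a hypothetical type written for illustration; the field layout and locking strategy used by `NatsServer` are not shown in this document and may differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical fixed-capacity buffer: once full, each new record
// overwrites the oldest one.
public sealed class ClosedRing<T>
{
    private readonly T[] _items;
    private readonly object _gate = new();
    private long _total; // number of records ever added

    public ClosedRing(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _items = new T[capacity];
    }

    public void Add(T item)
    {
        lock (_gate)
        {
            _items[_total % _items.Length] = item;
            _total++;
        }
    }

    // Returns the retained records ordered oldest to newest.
    public List<T> Snapshot()
    {
        lock (_gate)
        {
            var count = (int)Math.Min(_total, _items.Length);
            var start = _total - count;
            var result = new List<T>(count);
            for (var i = 0; i < count; i++)
                result.Add(_items[(start + i) % _items.Length]);
            return result;
        }
    }
}
```

With a capacity of 3, adding five records leaves only the last three available to a `state=closed` query, which is why memory use stays bounded regardless of connection churn.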
### `GET /subz` and `GET /subscriptionsz`
Both paths are handled by `SubszHandler.HandleSubsz`. Returns a `Subsz` JSON object with subscription counts and an optional subscription listing.
#### Query parameters
| Parameter | Values | Default | Description |
|---|---|---|---|
| `subs` | `true`, `1`, `detail` | _(omitted)_ | Include individual subscription records |
| `offset` | integer | `0` | Pagination offset into the subscription list |
| `limit` | integer | `1024` | Max subscriptions returned |
| `acc` | account name | _(omitted)_ | Restrict results to a single account |
| `test` | subject literal | _(omitted)_ | Filter to subscriptions that match this literal subject |
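
The `test` filter asks, for each subscription, whether its subject (which may contain wildcards) would receive a message published to the given literal subject. The sketch below illustrates that check using standard NATS wildcard rules (`*` matches exactly one token, `>` matches one or more trailing tokens). `SubjectFilter.Matches` is a hypothetical helper and is not the server's own matching code.

```csharp
// Hypothetical helper illustrating literal-versus-subscription matching.
public static class SubjectFilter
{
    public static bool Matches(string literal, string subscription)
    {
        var lit = literal.Split('.');
        var sub = subscription.Split('.');

        for (var i = 0; i < sub.Length; i++)
        {
            if (sub[i] == ">")
                // > is only valid as the last token and needs at least one token to match.
                return i == sub.Length - 1 && lit.Length > i;

            if (i >= lit.Length)
                return false; // the subscription has more tokens than the literal

            if (sub[i] != "*" && sub[i] != lit[i])
                return false; // literal token mismatch
        }

        // Without a trailing >, token counts must be equal.
        return lit.Length == sub.Length;
    }
}
```

Under these rules `test=orders.new.eu` selects a subscription on `orders.>` but not one on `orders.*`.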
#### `$SYS` account exclusion
When `acc` is not specified, the `$SYS` system account is excluded from results. Its subscriptions are internal infrastructure (server event routing) and are not user-visible. To inspect `$SYS` subscriptions explicitly, pass `acc=$SYS`.
```csharp
// SubszHandler.cs
if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS")
continue;
```
#### Cache fields
`num_cache` in the response is the sum of `SubList.CacheCount` across all included accounts. This reflects the number of cached `Match()` results currently held in the subscription trie. It is informational — a high cache count is normal and expected after traffic warms up the cache.
#### Response shape
```json
{
"server_id": "NABCDEFGHIJ1234567890",
"now": "2026-02-23T12:00:00Z",
"num_subscriptions": 42,
"num_cache": 18,
"total": 42,
"offset": 0,
"limit": 1024,
"subscriptions": []
}
```
When `subs=true` or `subs=1`, the `subscriptions` array is populated with `SubDetail` objects:
```json
{
"subject": "orders.>",
"qgroup": "",
"sid": "1",
"msgs": 500,
"max": 0,
"cid": 3
}
```
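A client that consumes this endpoint can map the JSON onto plain types with `System.Text.Json`. The sketch below is written from the response shapes shown above. `SubszDto`, `SubDetailDto`, and `SubszReader` are hypothetical client-side names, not the server's own `Subsz` and `SubDetail` types, and only a subset of the fields is mapped.

```csharp
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical client-side DTOs for the /subz response.
public sealed class SubDetailDto
{
    [JsonPropertyName("subject")] public string Subject { get; init; } = "";
    [JsonPropertyName("qgroup")] public string Qgroup { get; init; } = "";
    [JsonPropertyName("sid")] public string Sid { get; init; } = "";
    [JsonPropertyName("msgs")] public long Msgs { get; init; }
    [JsonPropertyName("max")] public long Max { get; init; }
    [JsonPropertyName("cid")] public ulong Cid { get; init; }
}

public sealed class SubszDto
{
    [JsonPropertyName("num_subscriptions")] public int NumSubscriptions { get; init; }
    [JsonPropertyName("num_cache")] public int NumCache { get; init; }
    [JsonPropertyName("total")] public int Total { get; init; }
    [JsonPropertyName("subscriptions")] public List<SubDetailDto> Subscriptions { get; init; } = new();
}

public static class SubszReader
{
    // Unknown JSON properties (server_id, now, offset, limit) are ignored by default.
    public static SubszDto Parse(string json) =>
        JsonSerializer.Deserialize<SubszDto>(json) ?? new SubszDto();
}
```

Because the `subscriptions` list defaults to empty, the same reader works whether or not the request included the `subs` parameter.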
### `GET /jsz`
Returns a `JszResponse` JSON object built by `JszHandler.Build`. Reports whether JetStream is enabled and summarises stream and consumer counts.
```json
{
"server_id": "NABCDEFGHIJ1234567890",
"now": "2026-02-23T12:00:00Z",
"enabled": true,
"memory": 0,
"storage": 0,
"streams": 5,
"consumers": 12,
"config": {
"max_memory": 1073741824,
"max_storage": 10737418240,
"store_dir": "/var/nats/jetstream"
}
}
```
`memory` and `storage` are always `0` in the current implementation — per-stream byte accounting is not yet wired up. `streams` and `consumers` reflect live counts from `NatsServer.JetStreamStreams` and `NatsServer.JetStreamConsumers`.
For full JetStream documentation, see [JetStream](../JetStream/Overview.md).
### Stub endpoints
The following endpoints exist and respond with HTTP 200 and an empty JSON object (`{}`). They increment `HttpReqStats` but return no data. They are placeholders for future implementation once the corresponding subsystems are ported.
| Endpoint | Subsystem |
|---|---|
| `/routez` | Cluster routes |
| `/gatewayz` | Gateways |
| `/leafz` | Leaf nodes |
| `/accountz` | Account listing |
| `/accstatz` | Per-account statistics |
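
Because a stub still answers with HTTP 200, tooling cannot use the status code to tell "no data" from "not implemented". One option is to check for the empty object explicitly. The sketch below assumes the caller already has the response body as a string; `StubEndpoints.IsStubBody` is a hypothetical helper and not part of the server.

```csharp
using System.Linq;
using System.Text.Json;

// Hypothetical client-side check for the empty-object stub response.
public static class StubEndpoints
{
    public static bool IsStubBody(string json)
    {
        using var doc = JsonDocument.Parse(json);

        // A stub is a JSON object with no properties at all.
        return doc.RootElement.ValueKind == JsonValueKind.Object
            && !doc.RootElement.EnumerateObject().Any();
    }
}
```

Parsing the body rather than comparing it to the literal text `{}` keeps the check correct if whitespace is ever added to the response.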
## Go Compatibility
The JSON shapes are designed to match the Go server's monitoring responses so that existing NATS tooling (e.g., `nats-top`, Prometheus exporters, Grafana dashboards) can consume them without modification, except where the differences listed below apply.
Known differences from the Go server:
- The `go` field in `/varz` reports the .NET runtime description (e.g., `"dotnet .NET 10.0.0"`) rather than a Go version string. Tools that parse this field for display only are unaffected; tools that parse it to gate on runtime type will see a different value.
- `/varz` `config_load_time` is currently set to server start time rather than the time the configuration file was last loaded.
- `/varz` `mem` reports `Process.WorkingSet64` (the OS working set). The Go server reports the process resident set size (RSS) sampled from the operating system. The two values are comparable in meaning but are not guaranteed to be identical.
- `/varz` `gomaxprocs` is mapped to `ThreadPool.ThreadCount`. The Go field is the configured parallelism limit (`GOMAXPROCS`); the .NET value is the number of thread pool threads that currently exist, which changes at runtime, so treat it as an approximation rather than a fixed limit.
- `/jsz` `memory` and `storage` are always `0`. The Go server reports actual byte usage per stream.
- `/routez`, `/gatewayz`, `/leafz`, `/accountz`, `/accstatz` return `{}` instead of structured data.
## Related Documentation
- [Configuration Overview](../Configuration/Overview.md)
- [Server Overview](../Server/Overview.md)
- [Subscriptions Overview](../Subscriptions/Overview.md)
<!-- Last verified against codebase: 2026-02-23 -->

@@ -29,64 +29,70 @@ On startup, the server logs the address it is listening on:
### Full host setup
`Program.cs` initializes Serilog, parses CLI arguments, starts the server, and handles graceful shutdown:
`Program.cs` initializes Serilog, parses CLI arguments, starts the server, and handles graceful shutdown. The startup sequence does two passes over `args`: the first scans for `-c` to load a config file as the base `NatsOptions`, and the second applies remaining CLI flags on top (CLI flags always win over the config file):
```csharp
using NATS.Server;
using Serilog;
// First pass: scan args for -c flag to get config file path
string? configFile = null;
for (int i = 0; i < args.Length; i++)
{
if (args[i] == "-c" && i + 1 < args.Length)
{
configFile = args[++i];
break;
}
}
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.Enrich.FromLogContext()
.WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}")
.CreateLogger();
var options = new NatsOptions();
var options = configFile != null
? ConfigProcessor.ProcessConfigFile(configFile)
: new NatsOptions();
// Second pass: apply CLI args on top of config-loaded options
for (int i = 0; i < args.Length; i++)
{
switch (args[i])
{
case "-p" or "--port" when i + 1 < args.Length:
options.Port = int.Parse(args[++i]);
options.InCmdLine.Add("Port");
break;
case "-a" or "--addr" when i + 1 < args.Length:
options.Host = args[++i];
options.InCmdLine.Add("Host");
break;
case "-n" or "--name" when i + 1 < args.Length:
options.ServerName = args[++i];
options.InCmdLine.Add("ServerName");
break;
// ... additional flags: -m, --tls*, -D/-V/-DV, -l, -c, --pid, etc.
}
}
using var loggerFactory = new Serilog.Extensions.Logging.SerilogLoggerFactory(Log.Logger);
var server = new NatsServer(options, loggerFactory);
using var server = new NatsServer(options, loggerFactory);
server.HandleSignals();
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
_ = Task.Run(async () => await server.ShutdownAsync());
};
try
{
await server.StartAsync(cts.Token);
}
catch (OperationCanceledException) { }
finally
{
Log.CloseAndFlush();
}
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
server.WaitForShutdown();
```
`InCmdLine` tracks which options were supplied on the command line so that a subsequent config-file reload does not overwrite them.
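How such a set can protect CLI values during a reload is sketched below. `DemoOptions` and `ReloadMerge` are hypothetical stand-ins with only two properties; the real reload path in the server is not shown in this document and may be structured differently.

```csharp
using System.Collections.Generic;

// Hypothetical, reduced stand-in for NatsOptions.
public sealed class DemoOptions
{
    public int Port { get; set; } = 4222;
    public string Host { get; set; } = "0.0.0.0";

    // Names of properties that were set on the command line.
    public HashSet<string> InCmdLine { get; } = new();
}

public static class ReloadMerge
{
    // Copies reloaded values onto the live options, skipping anything
    // that was pinned by a CLI flag at startup.
    public static void Apply(DemoOptions live, DemoOptions reloaded)
    {
        if (!live.InCmdLine.Contains("Port")) live.Port = reloaded.Port;
        if (!live.InCmdLine.Contains("Host")) live.Host = reloaded.Host;
    }
}
```

If the server was started with `-p 5000` and the config file later changes both `port` and `host`, only `Host` is updated by `Apply`.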
---
## Graceful Shutdown
Pressing Ctrl+C triggers `Console.CancelKeyPress`. The handler sets `e.Cancel = true` — this prevents the process from terminating immediately — and calls `cts.Cancel()` to signal the `CancellationToken` passed to `server.StartAsync`.
Pressing Ctrl+C triggers `Console.CancelKeyPress`. The handler sets `e.Cancel = true` — this prevents the process from terminating immediately — and dispatches `server.ShutdownAsync()` on a background task. `WaitForShutdown()` blocks the main thread until shutdown completes. The `finally` block runs `Log.CloseAndFlush()` to ensure all buffered log output is written before the process exits.
`NatsServer.StartAsync` exits its accept loop on cancellation. In-flight client connections are left to drain naturally. After `StartAsync` returns (via `OperationCanceledException` which is caught), the `finally` block runs `Log.CloseAndFlush()` to ensure all buffered log output is written before the process exits.
`server.HandleSignals()` registers additional OS signal handlers (SIGHUP for config reload, SIGUSR1 for log file reopen on Unix) before the main loop starts.
---
@@ -102,30 +108,30 @@ The test project is at `tests/NATS.Server.Tests/`. It uses xUnit with Shouldly f
### Test summary
69 tests across 6 test files:
The test project contains 99 test files across eight areas:
| File | Tests | Coverage |
|------|-------|----------|
| `SubjectMatchTests.cs` | 33 | Subject validation and wildcard matching |
| `SubListTests.cs` | 12 | Trie insert, remove, match, queue groups, cache |
| `ParserTests.cs` | 14 | All command types, split packets, case insensitivity |
| `ClientTests.cs` | 2 | Socket-level INFO on connect, PING/PONG |
| `ServerTests.cs` | 3 | End-to-end accept, pub/sub, wildcard delivery |
| `IntegrationTests.cs` | 5 | NATS.Client.Core protocol compatibility |
- **Auth/TLS** (23 files) — authenticators (token, username/password, NKey, JWT), client permissions, OCSP, TLS connection wrapping, TLS rate limiting, account isolation, permission integration
- **JetStream/RAFT** (23 files) — stream API, consumer API, publish, pull/push delivery, ack redelivery, retention policies, mirroring/sourcing, config validation, FileStore, MemStore, store contract, RAFT election, replication, and snapshot catchup
- **Monitoring/Config** (15 files) — HTTP monitor endpoints, `/jsz`, config file parsing (lexer + parser), config reload, `NatsOptions`, server stats, subsz, account stats, account resolver, logging, Go parity runner
- **Client lifecycle** (12 files) — `NatsClient` flags, closed-reason tracking, trace mode, write loop, no-responders, verbose mode, RTT, response tracker, internal client, event system, import/export, response routing
- **Protocol/Parser** (7 files) — `NatsParser` commands, subject validation and wildcard matching, `SubList` trie, NATS header parser, subject transforms
- **Clustering** (4 files) — route handshake, route subscription propagation, gateway/leaf bootstrap, cluster JetStream config processor
- **WebSocket** (9 files in `WebSocket/`) — frame read/write, compression, upgrade handshake, origin checking, connection handling, integration, options, constants
- **Integration** (6 files) — end-to-end tests using `NATS.Client.Core`, system events, system request-reply, auth integration, NKey integration, permission integration
### Test categories
**SubjectMatchTests** — 33 `[Theory]` cases verifying `SubjectMatch.IsValidSubject` (16 cases), `SubjectMatch.IsValidPublishSubject` (6 cases), and `SubjectMatch.MatchLiteral` (11 cases). Covers empty strings, leading/trailing dots, embedded spaces, `>` in non-terminal position, and all wildcard combinations.
**SubjectMatchTests** — `[Theory]` cases verifying `SubjectMatch.IsValidSubject`, `SubjectMatch.IsValidPublishSubject`, and `SubjectMatch.MatchLiteral`. Covers empty strings, leading/trailing dots, embedded spaces, `>` in non-terminal position, and all wildcard combinations.
**SubListTests** — 12 `[Fact]` tests exercising the `SubList` trie directly: literal insert and match, empty result, `*` wildcard at various token levels, `>` wildcard, root `>`, multiple overlapping subscriptions, remove, queue group grouping, `Count` tracking, and cache invalidation after a wildcard insert.
**SubListTests** — `[Fact]` tests exercising the `SubList` trie directly: literal insert and match, empty result, `*` wildcard at various token levels, `>` wildcard, root `>`, multiple overlapping subscriptions, remove, queue group grouping, `Count` tracking, and cache invalidation after a wildcard insert.
**ParserTests** — 14 `async [Fact]` tests that write protocol bytes into a `Pipe` and assert on the resulting `ParsedCommand` list. Covers `PING`, `PONG`, `CONNECT`, `SUB` (with and without queue group), `UNSUB` (with and without `max-messages`), `PUB` (with payload, with reply-to, zero payload), `HPUB` (with header), `INFO`, multiple commands in a single buffer, and case-insensitive parsing.
**ParserTests** — `async [Fact]` tests that write protocol bytes into a `Pipe` and assert on the resulting `ParsedCommand` list. Covers `PING`, `PONG`, `CONNECT`, `SUB` (with and without queue group), `UNSUB` (with and without `max-messages`), `PUB` (with payload, with reply-to, zero payload), `HPUB` (with header), `INFO`, multiple commands in a single buffer, and case-insensitive parsing.
**ClientTests** — 2 `async [Fact]` tests using a real loopback socket pair. Verifies that `NatsClient` sends an `INFO` frame immediately on connection, and that it responds `PONG` to a `PING` after `CONNECT`.
**ClientTests** — `async [Fact]` tests using a real loopback socket pair. Verifies that `NatsClient` sends an `INFO` frame immediately on connection, and that it responds `PONG` to a `PING` after `CONNECT`.
**ServerTests** — 3 `async [Fact]` tests that start `NatsServer` on a random port. Verifies `INFO` on connect, basic pub/sub delivery (`MSG` format), and wildcard subscription matching.
**ServerTests** — `async [Fact]` tests that start `NatsServer` on a random port. Verifies `INFO` on connect, basic pub/sub delivery (`MSG` format), and wildcard subscription matching.
**IntegrationTests** — 5 `async [Fact]` tests using the official `NATS.Client.Core` v2.7.2 NuGet package. Verifies end-to-end protocol compatibility with a real NATS client library: basic pub/sub, `*` wildcard delivery, `>` wildcard delivery, fan-out to two subscribers, and `PingAsync`.
**IntegrationTests** — `async [Fact]` tests using the official `NATS.Client.Core` NuGet package. Verifies end-to-end protocol compatibility with a real NATS client library: basic pub/sub, `*` wildcard delivery, `>` wildcard delivery, fan-out to two subscribers, and `PingAsync`.
Integration tests use `NullLoggerFactory.Instance` for the server so test output is not cluttered with server logs.
@@ -171,16 +177,17 @@ The Go server is useful for verifying that the .NET port produces identical prot
---
## Current Limitations
## Known Gaps vs Go Reference
The following features present in the Go reference server are not yet ported:
The following areas have partial or stub implementations compared to the Go reference server:
- Authentication — no username/password, token, NKey, or JWT support
- Clustering — no routes, gateways, or leaf nodes
- JetStream — no persistent streaming, streams, consumers, or RAFT
- Monitoring — no HTTP endpoints (`/varz`, `/connz`, `/healthz`, etc.)
- TLS — all connections are plaintext
- WebSocket — no WebSocket transport
- **MQTT listener** — config is parsed and the option is recognized, but no MQTT transport is implemented
- **Route message routing** — the route TCP connection and handshake are established, but `RMSG` forwarding is not implemented; messages are not relayed to peer nodes
- **Gateways** — the listener stub accepts connections, but no inter-cluster bridging or interest-only filtering is implemented
- **Leaf nodes** — the listener stub accepts connections, but no hub-and-spoke topology or subject sharing is implemented
- **JetStream API surface** — only `STREAM.CREATE`, `STREAM.INFO`, `CONSUMER.CREATE`, and `CONSUMER.INFO` API subjects are handled; all others return a not-found error response
- **FileStore durability** — the file store maintains a full in-memory index, performs per-write I/O without batching, and rewrites the full block on trim; it is not production-safe under load
- **RAFT network transport** — the RAFT implementation uses in-process message passing only; there is no network transport, so consensus does not survive process restarts or span multiple server instances
---
@@ -190,4 +197,4 @@ The following features present in the Go reference server are not yet ported:
- [Server Overview](../Server/Overview.md)
- [Protocol Overview](../Protocol/Overview.md)
<!-- Last verified against codebase: 2026-02-22 -->
<!-- Last verified against codebase: 2026-02-23 -->

@@ -7,32 +7,40 @@
### Fields and properties
```csharp
public sealed class NatsClient : IDisposable
public sealed class NatsClient : INatsClient, IDisposable
{
private static readonly ClientCommandMatrix CommandMatrix = new();
private readonly Socket _socket;
private readonly NetworkStream _stream;
private readonly Stream _stream;
private readonly NatsOptions _options;
private readonly ServerInfo _serverInfo;
private readonly AuthService _authService;
private readonly NatsParser _parser;
private readonly SemaphoreSlim _writeLock = new(1, 1);
private readonly Channel<ReadOnlyMemory<byte>> _outbound = Channel.CreateBounded<ReadOnlyMemory<byte>>(
new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });
private long _pendingBytes;
private CancellationTokenSource? _clientCts;
private readonly Dictionary<string, Subscription> _subs = new();
private readonly ILogger _logger;
private ClientPermissions? _permissions;
private readonly ServerStats _serverStats;
public ulong Id { get; }
public ClientKind Kind { get; }
public ClientOptions? ClientOpts { get; private set; }
public IMessageRouter? Router { get; set; }
public bool ConnectReceived { get; private set; }
public long InMsgs;
public long OutMsgs;
public long InBytes;
public long OutBytes;
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
public Account? Account { get; private set; }
public DateTime StartTime { get; }
private readonly ClientFlagHolder _flags = new();
public bool ConnectReceived => _flags.HasFlag(ClientFlags.ConnectReceived);
public ClientClosedReason CloseReason { get; private set; }
}
```
`_writeLock` is a `SemaphoreSlim(1, 1)` — a binary semaphore that serializes all writes to `_stream`. Without it, concurrent `SendMessageAsync` calls from different publisher threads would interleave bytes on the wire. See [Write serialization](#write-serialization) below.
`_stream` is typed as `Stream` rather than `NetworkStream` because the server passes in a pre-wrapped stream: plain `NetworkStream` for unencrypted connections, `SslStream` for TLS, or a WebSocket framing adapter. `NatsClient` does not know or care which transport is underneath.
`_outbound` is a bounded `Channel<ReadOnlyMemory<byte>>(8192)` with `SingleReader = true` and `FullMode = BoundedChannelFullMode.Wait`. The channel is the sole path for all outbound frames. Slow consumer detection uses `_pendingBytes` — an `Interlocked`-maintained counter of bytes queued but not yet flushed — checked against `_options.MaxPending` in `QueueOutbound`. See [Write Serialization](#write-serialization) below.
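The queue-and-drain pattern described here can be sketched in isolation. `OutboundQueue` is a hypothetical class written for illustration: `TryQueue` stands in for `QueueOutbound`, the limit passed to the constructor stands in for `_options.MaxPending`, and the exact accounting order in `NatsClient` may differ.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch of a channel-backed write queue with a pending-bytes cap.
public sealed class OutboundQueue
{
    private readonly Channel<ReadOnlyMemory<byte>> _outbound =
        Channel.CreateBounded<ReadOnlyMemory<byte>>(
            new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });

    private readonly long _maxPending;
    private long _pendingBytes;

    public OutboundQueue(long maxPending) => _maxPending = maxPending;

    public long PendingBytes => Interlocked.Read(ref _pendingBytes);

    // Returns false when the frame would exceed the backlog limit or the
    // channel is full or completed; a caller would treat that as a slow consumer.
    public bool TryQueue(ReadOnlyMemory<byte> frame)
    {
        var pending = Interlocked.Add(ref _pendingBytes, frame.Length);
        if (pending > _maxPending || !_outbound.Writer.TryWrite(frame))
        {
            Interlocked.Add(ref _pendingBytes, -frame.Length); // undo the reservation
            return false;
        }
        return true;
    }

    public void Complete() => _outbound.Writer.TryComplete();

    // Single reader: drains whatever is queued, then flushes once per batch.
    public async Task RunWriteLoopAsync(Stream stream, CancellationToken ct)
    {
        var reader = _outbound.Reader;
        while (await reader.WaitToReadAsync(ct))
        {
            while (reader.TryRead(out var frame))
            {
                await stream.WriteAsync(frame, ct);
                Interlocked.Add(ref _pendingBytes, -frame.Length);
            }
            await stream.FlushAsync(ct);
        }
    }
}
```

Because only the write loop touches the stream, producers never need a write lock; they contend only on the channel and the `Interlocked` counter.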
`_flags` is a `ClientFlagHolder` (a thin wrapper around an `int` with atomic bit operations). Protocol-level boolean state — `ConnectReceived`, `CloseConnection`, `IsSlowConsumer`, `TraceMode`, and others — is stored as flag bits rather than separate fields, keeping the state machine manipulation thread-safe without separate locks.
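A flag holder of this kind can be built on `Interlocked.Or` and `Interlocked.And` (available since .NET 5). The sketch below uses hypothetical names (`DemoFlags`, `DemoFlagHolder`) and a reduced flag set; having `SetFlag` return whether the caller flipped the bit is an assumption added for illustration and may not match the real `ClientFlagHolder` signature.

```csharp
using System;
using System.Threading;

// Hypothetical, reduced flag set.
[Flags]
public enum DemoFlags
{
    None = 0,
    ConnectReceived = 1 << 0,
    CloseConnection = 1 << 1,
    IsSlowConsumer = 1 << 2,
}

// Hypothetical holder: all state lives in one int updated atomically.
public sealed class DemoFlagHolder
{
    private int _bits;

    // Returns true only for the caller that actually set the bit.
    public bool SetFlag(DemoFlags flag)
    {
        var previous = Interlocked.Or(ref _bits, (int)flag);
        return (previous & (int)flag) == 0;
    }

    public void ClearFlag(DemoFlags flag) => Interlocked.And(ref _bits, ~(int)flag);

    public bool HasFlag(DemoFlags flag) => (Volatile.Read(ref _bits) & (int)flag) != 0;
}
```

The boolean result gives first-writer-wins semantics for free: when two threads race to set `CloseConnection`, exactly one of them sees `true` and can run the close path.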
`_subs` maps subscription IDs (SIDs) to `Subscription` objects. SIDs are client-assigned strings; `Dictionary<string, Subscription>` gives O(1) lookup for UNSUB processing.
@@ -43,21 +51,30 @@ The four stat fields (`InMsgs`, `OutMsgs`, `InBytes`, `OutBytes`) are `long` fie
### Constructor
```csharp
public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, ILogger logger)
public NatsClient(ulong id, Stream stream, Socket socket, NatsOptions options, ServerInfo serverInfo,
AuthService authService, byte[]? nonce, ILogger logger, ServerStats serverStats,
ClientKind kind = ClientKind.Client)
{
Id = id;
Kind = kind;
_socket = socket;
_stream = new NetworkStream(socket, ownsSocket: false);
_stream = stream;
_options = options;
_serverInfo = serverInfo;
_authService = authService;
_logger = logger;
_parser = new NatsParser(options.MaxPayload);
_serverStats = serverStats;
_parser = new NatsParser(options.MaxPayload, options.Trace ? logger : null);
StartTime = DateTime.UtcNow;
}
```
`NetworkStream` is created with `ownsSocket: false`. This keeps socket lifetime management in `NatsServer`, which disposes the socket explicitly in `Dispose`. If `ownsSocket` were `true`, disposing the `NetworkStream` would close the socket, potentially racing with the disposal path in `NatsServer`.
The `stream` parameter is passed in by `NatsServer` already wrapped for the appropriate transport. For a plain TCP connection it is a `NetworkStream`; after a TLS handshake it is an `SslStream`; for WebSocket connections it is a WebSocket framing adapter. `NatsClient` writes to `Stream` throughout and is unaware of which transport is underneath.
`NatsParser` is constructed with `MaxPayload` from options. The parser enforces this limit: a payload larger than `MaxPayload` causes a `ProtocolViolationException` and terminates the connection.
`authService` is the shared `AuthService` instance. `NatsClient` calls `authService.IsAuthRequired` and `authService.Authenticate(context)` during CONNECT processing rather than performing authentication checks inline. `serverStats` is a shared `ServerStats` instance updated via `Interlocked` operations on the hot path (message counts, slow consumer counts, stale connections).
`byte[]? nonce` carries a pre-generated challenge value for NKey authentication. When non-null, it is embedded in the INFO payload sent to the client. After `ProcessConnectAsync` completes, the nonce is zeroed via `CryptographicOperations.ZeroMemory` as a defense-in-depth measure.
`NatsParser` is constructed with `MaxPayload` from options. The parser enforces this limit: a payload larger than `MaxPayload` causes the connection to be closed with `ClientClosedReason.MaxPayloadExceeded`.
## Connection Lifecycle
@@ -68,23 +85,28 @@ public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serve
```csharp
public async Task RunAsync(CancellationToken ct)
{
_clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var pipe = new Pipe();
try
{
await SendInfoAsync(ct);
if (!InfoAlreadySent)
SendInfo();
var fillTask = FillPipeAsync(pipe.Writer, ct);
var processTask = ProcessCommandsAsync(pipe.Reader, ct);
var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
var pingTask = RunPingTimerAsync(_clientCts.Token);
var writeTask = RunWriteLoopAsync(_clientCts.Token);
await Task.WhenAny(fillTask, processTask);
await Task.WhenAny(fillTask, processTask, pingTask, writeTask);
}
catch (OperationCanceledException) { }
catch (Exception ex)
catch (OperationCanceledException)
{
_logger.LogDebug(ex, "Client {ClientId} connection error", Id);
MarkClosed(ClientClosedReason.ServerShutdown);
}
finally
{
MarkClosed(ClientClosedReason.ClientClosed);
_outbound.Writer.TryComplete();
Router?.RemoveClient(this);
}
}
@@ -92,10 +114,17 @@ public async Task RunAsync(CancellationToken ct)
The method:
1. Sends `INFO {json}\r\n` immediately on connect — required by the NATS protocol before the client sends CONNECT.
2. Creates a `System.IO.Pipelines.Pipe` and starts two concurrent tasks: `FillPipeAsync` reads bytes from the socket into the pipe's write end; `ProcessCommandsAsync` reads from the pipe's read end and dispatches commands.
3. Awaits `Task.WhenAny`. Either task completing signals the connection is done — either the socket closed (fill task returns) or a protocol error caused the process task to throw.
4. In `finally`, calls `Router?.RemoveClient(this)` to clean up subscriptions and remove the client from the server's client dictionary.
1. Creates `_clientCts` as a `CancellationTokenSource.CreateLinkedTokenSource(ct)`. This gives the client its own cancellation scope linked to the server-wide token. `CloseWithReasonAsync` cancels `_clientCts` to tear down only this connection without affecting the rest of the server.
2. Calls `SendInfo()` unless `InfoAlreadySent` is set — TLS negotiation sends INFO before handing the `SslStream` to `RunAsync`, so the flag prevents a duplicate INFO on TLS connections.
3. Starts four concurrent tasks using `_clientCts.Token`:
- `FillPipeAsync` — reads bytes from `_stream` into the pipe's write end.
- `ProcessCommandsAsync` — reads from the pipe's read end and dispatches commands.
- `RunPingTimerAsync` — sends periodic PING frames and enforces stale-connection detection via `_options.MaxPingsOut`.
- `RunWriteLoopAsync` — drains `_outbound` channel frames and writes them to `_stream`.
4. Awaits `Task.WhenAny`. Any task completing signals the connection is ending — the socket closed, a protocol error was detected, or the server is shutting down.
5. In `finally`, calls `MarkClosed(ClientClosedReason.ClientClosed)` (first-writer-wins: if an earlier error path already recorded a reason, that reason is kept), completes the outbound channel writer so `RunWriteLoopAsync` can drain and exit, then calls `Router?.RemoveClient(this)` to remove subscriptions and deregister the client.
`CloseWithReasonAsync(reason, errMessage)` is the coordinated close path used by protocol violations and slow consumer detection. It sets `CloseReason`, optionally queues a `-ERR` frame, marks the `CloseConnection` flag, completes the channel writer, waits 50 ms for the write loop to flush the error frame, then cancels `_clientCts`. `MarkClosed(reason)` is the lighter first-writer-wins setter used by the `RunAsync` catch blocks.
`Router?.RemoveClient(this)` uses a null-conditional because `Router` could be null if the client is used in a test context without a server.
@@ -166,48 +195,53 @@ private async Task ProcessCommandsAsync(PipeReader reader, CancellationToken ct)
## Command Dispatch
`DispatchCommandAsync` switches on the `CommandType` returned by the parser:
`DispatchCommandAsync` first consults `CommandMatrix` to verify the command is permitted for this client's `Kind`, then dispatches by `CommandType`:
```csharp
private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct)
{
Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
if (!CommandMatrix.IsAllowed(Kind, cmd.Operation))
{
await SendErrAndCloseAsync("Parser Error");
return;
}
switch (cmd.Type)
{
case CommandType.Connect:
ProcessConnect(cmd);
await ProcessConnectAsync(cmd);
break;
case CommandType.Ping:
await WriteAsync(NatsProtocol.PongBytes, ct);
WriteProtocol(NatsProtocol.PongBytes);
break;
case CommandType.Pong:
// Update RTT tracking (placeholder)
Interlocked.Exchange(ref _pingsOut, 0);
Interlocked.Exchange(ref _rtt, DateTime.UtcNow.Ticks - Interlocked.Read(ref _rttStartTicks));
_flags.SetFlag(ClientFlags.FirstPongSent);
break;
case CommandType.Sub:
ProcessSub(cmd);
break;
case CommandType.Unsub:
ProcessUnsub(cmd);
break;
case CommandType.Pub:
case CommandType.HPub:
ProcessPub(cmd);
break;
}
}
```
`CommandMatrix` is a static lookup table keyed by `ClientKind`; each kind maps to the set of commands it is allowed to send. Clients whose `Kind` is `Client` accept the standard client command set (CONNECT, PING, PONG, SUB, UNSUB, PUB, HPUB). Router-kind clients additionally accept the `RS+` and `RS-` messages used to exchange subscriptions between cluster routes. If a command is not allowed for the current kind, the connection is closed with `Parser Error`.
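A per-kind allow-list of this shape could be sketched as follows. The enum and class names (`SketchKind`, `SketchCommand`, `CommandMatrixSketch`) are hypothetical; the real matrix may key on operation strings rather than an enum, as the `cmd.Operation` argument in the snippet above suggests.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical enums for illustration; not the real ClientKind / CommandType.
public enum SketchKind { Client, Router }
public enum SketchCommand { Connect, Ping, Pong, Sub, Unsub, Pub, HPub, RsPlus, RsMinus }

public static class CommandMatrixSketch
{
    // Commands every ordinary client may send.
    private static readonly HashSet<SketchCommand> ClientSet = new()
    {
        SketchCommand.Connect, SketchCommand.Ping, SketchCommand.Pong,
        SketchCommand.Sub, SketchCommand.Unsub, SketchCommand.Pub, SketchCommand.HPub,
    };

    // Routers get the client set plus the RS+/RS- subscription propagation commands.
    private static readonly Dictionary<SketchKind, HashSet<SketchCommand>> Allowed = new()
    {
        [SketchKind.Client] = ClientSet,
        [SketchKind.Router] = new HashSet<SketchCommand>(ClientSet)
        {
            SketchCommand.RsPlus, SketchCommand.RsMinus,
        },
    };

    // Unknown kinds are rejected rather than allowed by default.
    public static bool IsAllowed(SketchKind kind, SketchCommand command) =>
        Allowed.TryGetValue(kind, out var set) && set.Contains(command);
}
```

With this table, `IsAllowed(SketchKind.Client, SketchCommand.RsPlus)` is false while the same command is accepted for `SketchKind.Router`.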
Every command dispatch updates `_lastActivityTicks` via `Interlocked.Exchange`. The ping timer in `RunPingTimerAsync` reads `_lastIn` (updated on every received byte batch) to decide whether the client was recently active; `_lastActivityTicks` is the higher-level timestamp exposed as `LastActivity` on the public interface.
### CONNECT
`ProcessConnect` deserializes the JSON payload into a `ClientOptions` record and sets `ConnectReceived = true`. `ClientOptions` carries the `echo` flag (default `true`), the client name, language, and version strings.
### PING / PONG
PING is responded to immediately with the pre-allocated `NatsProtocol.PongBytes` (`"PONG\r\n"`). The response goes through `WriteAsync`, which acquires the write lock. PONG handling is currently a placeholder for future RTT tracking.
PING is responded to immediately with the pre-allocated `NatsProtocol.PongBytes` (`"PONG\r\n"`) via `WriteProtocol`, which calls `QueueOutbound`. PONG resets `_pingsOut` to 0 (preventing stale-connection closure), records RTT by comparing the current tick count against `_rttStartTicks` set when the PING was sent, and sets the `ClientFlags.FirstPongSent` flag to unblock the initial ping timer delay.
### SUB
@@ -284,49 +318,74 @@ Stats are updated before routing. For HPUB, the combined payload from the parser
## Write Serialization
Multiple concurrent `SendMessageAsync` calls can arrive from different publisher connections at the same time. Without coordination, their writes would interleave on the socket and corrupt the message stream for the receiving client. `_writeLock` prevents this:
All outbound frames flow through a bounded `Channel<ReadOnlyMemory<byte>>` named `_outbound`. The channel has a capacity of 8192 entries, `SingleReader = true`, and `FullMode = BoundedChannelFullMode.Wait`. Every caller that wants to send bytes — protocol responses, MSG deliveries, PING frames, INFO, ERR — calls `QueueOutbound(data)`, which performs two checks before writing to the channel:
```csharp
public async Task SendMessageAsync(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct)
public bool QueueOutbound(ReadOnlyMemory<byte> data)
{
Interlocked.Increment(ref OutMsgs);
Interlocked.Add(ref OutBytes, payload.Length + headers.Length);
if (_flags.HasFlag(ClientFlags.CloseConnection))
return false;
byte[] line;
if (headers.Length > 0)
var pending = Interlocked.Add(ref _pendingBytes, data.Length);
if (pending > _options.MaxPending)
{
int totalSize = headers.Length + payload.Length;
line = Encoding.ASCII.GetBytes(
$"HMSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{headers.Length} {totalSize}\r\n");
}
else
{
line = Encoding.ASCII.GetBytes(
$"MSG {subject} {sid} {(replyTo != null ? replyTo + " " : "")}{payload.Length}\r\n");
Interlocked.Add(ref _pendingBytes, -data.Length);
_flags.SetFlag(ClientFlags.IsSlowConsumer);
Interlocked.Increment(ref _serverStats.SlowConsumers);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
return false;
}
await _writeLock.WaitAsync(ct);
try
if (!_outbound.Writer.TryWrite(data))
{
await _stream.WriteAsync(line, ct);
if (headers.Length > 0)
await _stream.WriteAsync(headers, ct);
if (payload.Length > 0)
await _stream.WriteAsync(payload, ct);
await _stream.WriteAsync(NatsProtocol.CrLf, ct);
await _stream.FlushAsync(ct);
// Channel is full (all 8192 slots taken) -- slow consumer
_flags.SetFlag(ClientFlags.IsSlowConsumer);
_ = CloseWithReasonAsync(ClientClosedReason.SlowConsumerPendingBytes, NatsProtocol.ErrSlowConsumer);
return false;
}
finally
return true;
}
```
`_pendingBytes` is an `Interlocked`-maintained counter. When it exceeds `_options.MaxPending`, the client is classified as a slow consumer and `CloseWithReasonAsync` is called. If `TryWrite` fails (all 8192 channel slots are occupied), the same slow consumer path fires. In either case the connection is closed with `-ERR 'Slow Consumer'`.
`RunWriteLoopAsync` is the sole reader of the channel, running as one of the four concurrent tasks in `RunAsync`:
```csharp
private async Task RunWriteLoopAsync(CancellationToken ct)
{
var reader = _outbound.Reader;
while (await reader.WaitToReadAsync(ct))
{
_writeLock.Release();
long batchBytes = 0;
while (reader.TryRead(out var data))
{
await _stream.WriteAsync(data, ct);
batchBytes += data.Length;
}
using var flushCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
flushCts.CancelAfter(_options.WriteDeadline);
try
{
await _stream.FlushAsync(flushCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
// Flush timed out -- slow consumer on the write side
await CloseWithReasonAsync(ClientClosedReason.SlowConsumerWriteDeadline, NatsProtocol.ErrSlowConsumer);
return;
}
Interlocked.Add(ref _pendingBytes, -batchBytes);
}
}
```
The control line is constructed before acquiring the lock so the string formatting work happens outside the critical section. Once the lock is held, all writes for one message — control line, optional headers, payload, and trailing `\r\n` — happen atomically from the perspective of other writers.
`WaitToReadAsync` yields until at least one frame is available. The inner `TryRead` loop drains as many frames as are available without yielding, batching them into a single `FlushAsync`. This amortizes the flush cost over multiple frames when the client is keeping up. After the flush, `_pendingBytes` is decremented by the batch size.
Stats (`OutMsgs`, `OutBytes`) are updated before the lock because they are independent of the write ordering constraint.
If `FlushAsync` does not complete within `_options.WriteDeadline`, the write-deadline slow consumer path fires. `WriteDeadline` is distinct from `MaxPending`: `MaxPending` catches a client whose channel is backing up due to slow reads; `WriteDeadline` catches a client whose OS socket send buffer is stalled (e.g. the TCP window is closed).
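The channel-full branch of `QueueOutbound` relies on how a bounded channel behaves in `Wait` mode: `TryWrite` never blocks and never drops an item; it simply returns `false` when no slot is free. The standalone sketch below shows that behaviour with a capacity of 2 instead of 8192 so the full condition is easy to reach. It is an illustration only and contains none of the `NatsClient` bookkeeping.

```csharp
using System;
using System.Threading.Channels;

public static class BoundedChannelSketch
{
    public static void Main()
    {
        // Same options as described above, but with a tiny capacity for demonstration.
        var channel = Channel.CreateBounded<ReadOnlyMemory<byte>>(new BoundedChannelOptions(2)
        {
            SingleReader = true,
            FullMode = BoundedChannelFullMode.Wait,
        });

        var frame = new ReadOnlyMemory<byte>(new byte[] { 0x50, 0x49, 0x4E, 0x47 }); // "PING"

        Console.WriteLine(channel.Writer.TryWrite(frame)); // True
        Console.WriteLine(channel.Writer.TryWrite(frame)); // True
        Console.WriteLine(channel.Writer.TryWrite(frame)); // False: channel is full

        // Once the reader drains a slot, writes succeed again.
        channel.Reader.TryRead(out _);
        Console.WriteLine(channel.Writer.TryWrite(frame)); // True
    }
}
```

Because a failed `TryWrite` is immediate, the caller can treat it as a slow-consumer signal without ever awaiting on the publish path.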
## Subscription Cleanup
@@ -348,22 +407,25 @@ This removes every subscription this client holds from the shared `SubList` trie
```csharp
public void Dispose()
{
_permissions?.Dispose();
_outbound.Writer.TryComplete();
_clientCts?.Dispose();
_stream.Dispose();
_socket.Dispose();
_writeLock.Dispose();
}
```
Disposing `_stream` closes the network stream. Disposing `_socket` closes the OS socket. Any in-flight `ReadAsync` or `WriteAsync` will fault with an `ObjectDisposedException` or `IOException`, which causes the read/write tasks to terminate. `_writeLock` is disposed last to release the `SemaphoreSlim`'s internal handle.
`_outbound.Writer.TryComplete()` is called before disposing the stream so that `RunWriteLoopAsync` can observe channel completion and exit cleanly rather than faulting on a disposed stream. `_clientCts` is disposed to release the linked token registration. Disposing `_stream` and `_socket` closes the underlying transport; any in-flight `ReadAsync` or `WriteAsync` will fault with an `ObjectDisposedException` or `IOException`, which causes the remaining tasks to terminate.
## Go Reference
The Go counterpart is `golang/nats-server/server/client.go`. Key differences in the .NET port:
- Go uses separate goroutines for `readLoop` and `writeLoop`; the .NET port uses `FillPipeAsync` and `ProcessCommandsAsync` as concurrent `Task`s sharing a `Pipe`.
- Go uses separate goroutines for `readLoop` and `writeLoop`; the .NET port uses `FillPipeAsync`, `ProcessCommandsAsync`, `RunPingTimerAsync`, and `RunWriteLoopAsync` as four concurrent `Task`s all linked to `_clientCts`.
- Go uses dynamic buffer sizing (512 to 65536 bytes) in `readLoop`; the .NET port requests 4096-byte chunks from the `PipeWriter`.
- Go uses a mutex for write serialization (`c.mu`); the .NET port uses `SemaphoreSlim(1,1)` to allow `await`-based waiting without blocking a thread.
- The `System.IO.Pipelines` `Pipe` replaces Go's direct `net.Conn` reads. This separates the I/O pump from command parsing and avoids partial-read handling in the parser itself.
- Go parses directly out of its per-client read buffer; the .NET port uses `System.IO.Pipelines` for zero-copy parsing. The pipe separates the I/O pump from command parsing, avoids partial-read handling in the parser, and allows the `PipeReader` backpressure mechanism to control how much data is buffered between fill and process.
- Go's `flushOutbound()` batches queued writes and flushes them under `c.mu`; the .NET port uses a bounded `Channel<ReadOnlyMemory<byte>>(8192)` write queue with a `_pendingBytes` counter for backpressure. `RunWriteLoopAsync` is the sole reader: it drains all available frames in one batch and calls `FlushAsync` once per batch, with a `WriteDeadline` timeout to detect stale write-side connections.
- Go uses `c.mu` (a `sync.Mutex`) for write serialization; the .NET port eliminates the write lock entirely. `RunWriteLoopAsync` is the only task that writes to `_stream`, so no locking is required on the write path.
## Related Documentation
@@ -372,4 +434,4 @@ The Go counterpart is `golang/nats-server/server/client.go`. Key differences in
- [SubList Trie](../Subscriptions/SubList.md)
- [Subscriptions Overview](../Subscriptions/Overview.md)
<!-- Last verified against codebase: 2026-02-22 -->
<!-- Last verified against codebase: 2026-02-23 -->


@@ -31,20 +31,46 @@ Defining them separately makes unit testing straightforward: a test can supply a
```csharp
public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
private readonly NatsOptions _options;
// Client registry
private readonly ConcurrentDictionary<ulong, NatsClient> _clients = new();
private readonly SubList _subList = new();
private readonly ServerInfo _serverInfo;
private readonly ILogger<NatsServer> _logger;
private readonly ILoggerFactory _loggerFactory;
private Socket? _listener;
private readonly ConcurrentQueue<ClosedClient> _closedClients = new();
private ulong _nextClientId;
private int _activeClientCount;
public SubList SubList => _subList;
// Account system
private readonly ConcurrentDictionary<string, Account> _accounts = new(StringComparer.Ordinal);
private readonly Account _globalAccount;
private readonly Account _systemAccount;
private AuthService _authService;
// Subsystem managers (null when not configured)
private readonly RouteManager? _routeManager;
private readonly GatewayManager? _gatewayManager;
private readonly LeafNodeManager? _leafNodeManager;
private readonly JetStreamService? _jetStreamService;
private readonly JetStreamPublisher? _jetStreamPublisher;
private MonitorServer? _monitorServer;
// TLS / transport
private readonly SslServerAuthenticationOptions? _sslOptions;
private readonly TlsRateLimiter? _tlsRateLimiter;
private Socket? _listener;
private Socket? _wsListener;
// Shutdown coordination
private readonly CancellationTokenSource _quitCts = new();
private readonly TaskCompletionSource _shutdownComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _acceptLoopExited = new(TaskCreationOptions.RunContinuationsAsynchronously);
private int _shutdown;
private int _lameDuck;
public SubList SubList => _globalAccount.SubList;
}
```
`_clients` tracks every live connection. `_nextClientId` is incremented with `Interlocked.Increment` for each accepted socket, producing monotonically increasing client IDs without a lock. `_loggerFactory` is retained so per-client loggers can be created at accept time, each tagged with the client ID.
`_clients` tracks every live connection. `_closedClients` holds a capped ring of recently disconnected client snapshots (used by `/connz`). `_nextClientId` is incremented with `Interlocked.Increment` for each accepted socket, producing monotonically increasing client IDs without a lock. `_loggerFactory` is retained so per-client loggers can be created at accept time, each tagged with the client ID.
Each subsystem manager field (`_routeManager`, `_gatewayManager`, `_leafNodeManager`, `_jetStreamService`, `_monitorServer`) is `null` when the corresponding options section is absent from the configuration. Code that interacts with these managers always guards with a null check.
### Constructor
@@ -70,6 +96,10 @@ public NatsServer(NatsOptions options, ILoggerFactory loggerFactory)
The `ServerId` is derived from a GUID — taking the first 20 characters of its `"N"` format (32 hex digits, no hyphens) and uppercasing them. This matches the fixed-length alphanumeric server ID format used by the Go server.
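The derivation described above amounts to a one-line expression. The sketch below is an assumption about its shape rather than a copy of the constructor; `ServerIdSketch` and `NewServerId` are hypothetical names.

```csharp
using System;

public static class ServerIdSketch
{
    // "N" format yields 32 hex digits without hyphens; keep the first 20 and uppercase them.
    public static string NewServerId() =>
        Guid.NewGuid().ToString("N")[..20].ToUpperInvariant();

    public static void Main()
    {
        var id = NewServerId();
        Console.WriteLine(id.Length); // 20
    }
}
```

Note that the result contains only the characters `0-9` and `A-F`, which is a subset of the alphanumeric alphabet.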
Subsystem managers are instantiated in the constructor if the corresponding options sections are non-null: `options.Cluster != null` creates a `RouteManager`, `options.Gateway != null` creates a `GatewayManager`, `options.LeafNode != null` creates a `LeafNodeManager`, and `options.JetStream != null` creates `JetStreamService`, `JetStreamApiRouter`, `StreamManager`, `ConsumerManager`, and `JetStreamPublisher`. TLS options are compiled into `SslServerAuthenticationOptions` via `TlsHelper.BuildServerAuthOptions` when `options.HasTls` is true.
Before entering the accept loop, `StartAsync` starts the monitoring server, WebSocket listener, route connections, gateway connections, leaf node listener, and JetStream service.
## Accept Loop
`StartAsync` binds the socket, enables `SO_REUSEADDR` so the port can be reused immediately after a restart, and enters an async accept loop:
@@ -103,6 +133,37 @@ public async Task StartAsync(CancellationToken ct)
The backlog of 128 passed to `Listen` controls the OS-level queue of unaccepted connections — matching the Go server default.
### TLS wrapping and WebSocket upgrade
After `AcceptAsync` returns a socket, the connection is handed to `AcceptClientAsync`, which performs transport negotiation before constructing `NatsClient`:
```csharp
private async Task AcceptClientAsync(Socket socket, ulong clientId, CancellationToken ct)
{
if (_tlsRateLimiter != null)
await _tlsRateLimiter.WaitAsync(ct);
var networkStream = new NetworkStream(socket, ownsSocket: false);
// TlsConnectionWrapper performs the TLS handshake if _sslOptions is set;
// returns the raw NetworkStream unchanged when TLS is not configured.
var (stream, infoAlreadySent) = await TlsConnectionWrapper.NegotiateAsync(
socket, networkStream, _options, _sslOptions, _serverInfo,
_loggerFactory.CreateLogger("NATS.Server.Tls"), ct);
// ...auth nonce generation, TLS state extraction...
var client = new NatsClient(clientId, stream, socket, _options, clientInfo,
_authService, nonce, clientLogger, _stats);
client.Router = this;
client.TlsState = tlsState;
client.InfoAlreadySent = infoAlreadySent;
_clients[clientId] = client;
}
```
WebSocket connections follow a parallel path through `AcceptWebSocketClientAsync`. After optional TLS negotiation via `TlsConnectionWrapper`, the HTTP upgrade handshake is performed by `WsUpgrade.TryUpgradeAsync`. On success, the raw stream is wrapped in a `WsConnection` that handles WebSocket framing, masking, and per-message compression before `NatsClient` is constructed.
## Message Routing
`ProcessMessage` is called by `NatsClient` for every PUB or HPUB command. It is the hot path: called once per published message.
@@ -175,9 +236,11 @@ private static void DeliverMessage(Subscription sub, string subject, string? rep
}
```
`MessageCount` is incremented atomically before the send. If it exceeds `MaxMessages` (set by an UNSUB with a message count argument), the message is silently dropped. The subscription itself is not removed here — removal happens when the client processes the count limit through `ProcessUnsub`, or when the client disconnects and `RemoveAllSubscriptions` is called.
`MessageCount` is incremented atomically before the send. If it exceeds `MaxMessages` (set by an UNSUB with a message count argument), the subscription is removed from the trie immediately (`subList.Remove(sub)`) and from the client's tracking table (`client.RemoveSubscription(sub.Sid)`), then the message is dropped without delivery.
`SendMessageAsync` is again fire-and-forget. Multiple deliveries to different clients happen concurrently.
`SendMessage` enqueues the serialized wire bytes on the client's outbound channel. Multiple deliveries to different clients happen concurrently.
After local delivery, `ProcessMessage` offers the message to the JetStream publisher: if the subject matches a configured stream, `TryCaptureJetStreamPublish` stores the message and a `PubAck` is sent back to the publisher via `sender.RecordJetStreamPubAck`. Route forwarding is handled separately by `OnLocalSubscription`, which calls `_routeManager?.PropagateLocalSubscription` when a new subscription is added. This keeps remote peers informed of local interest without re-routing individual messages inside `ProcessMessage`.
## Client Removal
@@ -193,17 +256,34 @@ public void RemoveClient(NatsClient client)
## Shutdown and Dispose
Graceful shutdown is initiated by `ShutdownAsync`. It uses `_quitCts` — a `CancellationTokenSource` shared between `StartAsync` and all subsystem managers — to signal all internal loops to stop:
```csharp
public void Dispose()
public async Task ShutdownAsync()
{
_listener?.Dispose();
foreach (var client in _clients.Values)
client.Dispose();
_subList.Dispose();
if (Interlocked.CompareExchange(ref _shutdown, 1, 0) != 0)
return; // Already shutting down
// Signal all internal loops to stop
await _quitCts.CancelAsync();
// Close listeners to stop accept loops
_listener?.Close();
_wsListener?.Close();
if (_routeManager != null) await _routeManager.DisposeAsync();
if (_gatewayManager != null) await _gatewayManager.DisposeAsync();
if (_leafNodeManager != null) await _leafNodeManager.DisposeAsync();
if (_jetStreamService != null) await _jetStreamService.DisposeAsync();
// Wait for accept loops to exit, flush and close clients, drain active tasks...
if (_monitorServer != null) await _monitorServer.DisposeAsync();
_shutdownComplete.TrySetResult();
}
```
Disposing the listener socket causes `AcceptAsync` to throw, which unwinds `StartAsync`. Client sockets are disposed, which closes their `NetworkStream` and causes their read loops to terminate. `SubList.Dispose` releases its `ReaderWriterLockSlim`.
Lame-duck mode is a two-phase variant initiated by `LameDuckShutdownAsync`. The `_lameDuck` field (checked via `IsLameDuckMode`) is set first, which stops the accept loops from receiving new connections while existing clients are given a grace period (`options.LameDuckGracePeriod`) to disconnect naturally. After the grace period, remaining clients are stagger-closed over `options.LameDuckDuration` to avoid a thundering herd of reconnects, then `ShutdownAsync` completes the teardown.
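The stagger-close phase can be sketched as spreading the closes evenly across the configured window. This is an illustration under stated assumptions: the even spacing, the `LameDuckSketch` class, and the `closers` delegate list are all hypothetical, and the real `LameDuckShutdownAsync` may pace disconnects differently.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class LameDuckSketch
{
    // Assumed pacing: divide the window evenly by the number of remaining clients.
    public static TimeSpan IntervalFor(TimeSpan duration, int clientCount) =>
        clientCount <= 1 ? TimeSpan.Zero : TimeSpan.FromTicks(duration.Ticks / clientCount);

    // Closes each client in turn, pausing between closes so reconnects do not arrive all at once.
    public static async Task StaggerCloseAsync(
        IReadOnlyList<Func<Task>> closers, TimeSpan duration, CancellationToken ct)
    {
        var interval = IntervalFor(duration, closers.Count);
        foreach (var close in closers)
        {
            await close();
            if (interval > TimeSpan.Zero)
                await Task.Delay(interval, ct);
        }
    }
}
```

For example, with a 10-second window and 5 remaining clients, `IntervalFor` returns 2 seconds between closes.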
`Dispose` is a synchronous fallback. If `ShutdownAsync` has not already run, `Dispose` blocks until it completes. It then disposes `_quitCts`, `_tlsRateLimiter`, the listener sockets, all subsystem managers (route, gateway, leaf node, JetStream), all connected clients, and all accounts. The `PosixSignalRegistration` instances are also disposed, which deregisters the signal handlers.
## Go Reference
@@ -212,6 +292,7 @@ The Go counterpart is `golang/nats-server/server/server.go`. Key differences in
- Go uses goroutines for the accept loop and per-client read/write loops; the .NET port uses `async`/`await` with `Task`.
- Go uses `sync/atomic` for client ID generation; the .NET port uses `Interlocked.Increment`.
- Go passes the server to clients via the `srv` field on the client struct; the .NET port uses the `IMessageRouter` interface through the `Router` property.
- POSIX signal handlers — `SIGTERM`/`SIGQUIT` for shutdown, `SIGHUP` for config reload, `SIGUSR1` for log file reopen, `SIGUSR2` for lame-duck mode — are registered in `HandleSignals` via `PosixSignalRegistration.Create`. `SIGUSR1` and `SIGUSR2` are skipped on Windows. Registrations are stored in `_signalRegistrations` and disposed during `Dispose`.
## Related Documentation
@@ -220,4 +301,4 @@ The Go counterpart is `golang/nats-server/server/server.go`. Key differences in
- [Protocol Overview](../Protocol/Overview.md)
- [Configuration](../Configuration/Overview.md)
<!-- Last verified against codebase: 2026-02-22 -->
<!-- Last verified against codebase: 2026-02-23 -->