# Clustering Overview

This document describes how clustering is implemented in the .NET NATS server port. The Go reference server supports three distinct connection types for clustering: routes, gateways, and leaf nodes. This implementation has partial route support and stub managers for gateways and leaf nodes.

---

## Cluster Topology

The Go reference server uses three connection types, each serving a different topological purpose:

| Connection Type | Default Port | Go Reference | .NET Status |
|----------------|-------------|--------------|-------------|
| Routes | 6222 | Full-mesh TCP connections between servers in a cluster; propagate subscriptions via the `RS+`/`RS-` wire protocol; route messages with `RMSG` | TCP handshake and in-process subscription propagation only; no `RMSG`, no `RS+`/`RS-` wire protocol |
| Gateways | 7222 | Inter-cluster bridges with interest-only optimization; reply subject remapping via the `_GR_.` prefix | Stub only: `GatewayManager.StartAsync` logs and returns |
| Leaf Nodes | 7422 | Hub-and-spoke edge connections; only subscribed subjects are shared with the hub | Stub only: `LeafNodeManager.StartAsync` logs and returns |

---

## Routes

### What the Go reference does

In the Go server, routes form a full-mesh TCP connection pool between every pair of cluster peers. The route layer has three key pieces:

- `RS+`/`RS-`: subscribe/unsubscribe propagation, so every server knows the full interest set of all peers
- `RMSG`: message forwarding to a peer when matching subscribers live on that peer rather than on the publisher's server
- Route pooling: the Go server maintains 3 TCP connections per peer by default to parallelize traffic

Because both subscription interest (`RS+`/`RS-`) and messages (`RMSG`) travel over the route TCP connections, a client connected to server A can receive a message published on server B even when the two servers share no memory.
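To make the wire traffic concrete, the sketch below formats the kind of lines a Go server writes on a route connection. It is a simplified illustration that assumes the NATS 2.x shapes `RS+ <account> <subject> [<queue> <weight>]`, `RS- <account> <subject> [<queue>]`, and `RMSG <account> <subject> [<reply>] <size>` followed by the payload. It leaves out headers (`HMSG`), queue-group routing on `RMSG`, and pinned-account pools. The helper names `RoutedSub`, `RoutedUnsub`, and `RoutedMsg` are hypothetical, and nothing like them exists in the .NET port.

```csharp
using System;
using System.Text;

// Hypothetical helpers sketching Go route protocol lines (NATS 2.x shapes, simplified).
static string RoutedSub(string account, string subject, string? queue = null, int weight = 1) =>
    queue is null
        ? $"RS+ {account} {subject}\r\n"
        : $"RS+ {account} {subject} {queue} {weight}\r\n"; // queue subscriptions carry a weight

static string RoutedUnsub(string account, string subject, string? queue = null) =>
    queue is null
        ? $"RS- {account} {subject}\r\n"
        : $"RS- {account} {subject} {queue}\r\n";

// Builds an RMSG header line, the payload, and a trailing CRLF; size is the payload length in bytes.
static byte[] RoutedMsg(string account, string subject, string? reply, byte[] payload)
{
    var header = reply is null
        ? $"RMSG {account} {subject} {payload.Length}\r\n"
        : $"RMSG {account} {subject} {reply} {payload.Length}\r\n";
    var head = Encoding.ASCII.GetBytes(header);
    var frame = new byte[head.Length + payload.Length + 2];
    head.CopyTo(frame, 0);
    payload.CopyTo(frame, head.Length);
    frame[^2] = (byte)'\r';
    frame[^1] = (byte)'\n';
    return frame;
}

// Server A tells server B about local interest in the global account "$G", then forwards a message.
Console.Write(RoutedSub("$G", "orders.created"));
Console.Write(RoutedSub("$G", "orders.created", "workers"));
Console.Write(Encoding.ASCII.GetString(
    RoutedMsg("$G", "orders.created", null, Encoding.ASCII.GetBytes("hello"))));
```

Each line is plain text on an ordinary TCP stream, so the peers need no shared process state. The .NET port does not have this property.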
### What this implementation does

This implementation opens real TCP connections between route peers and completes a handshake. Subscription propagation, however, happens entirely in-process through a static `ConcurrentDictionary`, and messages are never forwarded over the wire. Clustering therefore only works when all servers share the same process. That is a test/development topology, not a production one.

### RouteManager

`RouteManager` (`src/NATS.Server/Routes/RouteManager.cs`) owns the listener socket and the set of active `RouteConnection` instances. It also holds the process-wide registry of all `RouteManager` instances, which is the mechanism used for in-process subscription propagation.

**`AcceptLoopAsync`** accepts inbound TCP connections from peers:

```csharp
private async Task AcceptLoopAsync(CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        Socket socket;
        try
        {
            socket = await _listener!.AcceptAsync(ct);
        }
        catch (OperationCanceledException)
        {
            break; // shutdown requested
        }
        catch (ObjectDisposedException)
        {
            break; // listener disposed during shutdown
        }
        catch (Exception ex)
        {
            // Any other error is logged at Debug level and also stops the accept loop.
            _logger.LogDebug(ex, "Route accept loop error");
            break;
        }

        // Handshake and register each peer on its own task so the loop keeps accepting.
        _ = Task.Run(() => HandleInboundRouteAsync(socket, ct), ct);
    }
}
```

**`ConnectToRouteWithRetryAsync`** dials each configured seed route, with a fixed 250 ms delay between attempts:

```csharp
private async Task ConnectToRouteWithRetryAsync(string route, CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        try
        {
            var endPoint = ParseRouteEndpoint(route);
            var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
            var connection = new RouteConnection(socket);
            await connection.PerformOutboundHandshakeAsync(_serverId, ct);
            Register(connection);
            return; // connected and registered; stop retrying
        }
        catch (OperationCanceledException)
        {
            return;
        }
        catch (Exception ex)
        {
            // Parse, connect, and handshake failures all land here;
            // the socket from a failed attempt is not disposed.
            _logger.LogDebug(ex, "Failed to connect route seed {Route}", route);
        }

        try
        {
            await Task.Delay(250, ct); // fixed delay, no exponential growth
        }
        catch (OperationCanceledException)
        {
            return;
        }
    }
}
```

The 250 ms delay is fixed. There is no exponential backoff, and retries continue until the seed accepts or `ct` is cancelled. `ParseRouteEndpoint` runs inside the `try`, so a malformed seed entry is not rejected at startup. It is logged at `Debug` level and retried every 250 ms.

### PropagateLocalSubscription

When a client on the local server subscribes, `NatsServer` calls `RouteManager.PropagateLocalSubscription`. This sends no bytes over TCP. Instead, it looks up peer `RouteManager` instances in the static `Managers` dictionary and calls `ReceiveRemoteSubscription` directly on each one:

```csharp
public void PropagateLocalSubscription(string subject, string? queue)
{
    // No connected peers: nothing to propagate.
    if (_connectedServerIds.IsEmpty)
        return;

    var remoteSub = new RemoteSubscription(subject, queue, _serverId);
    foreach (var peerId in _connectedServerIds.Keys)
    {
        // A peer running in another process is never in this static map,
        // so it never learns about the subscription.
        if (Managers.TryGetValue(peerId, out var peer))
            peer.ReceiveRemoteSubscription(remoteSub);
    }
}
```

`RemoteSubscription` is a record: `record RemoteSubscription(string Subject, string? Queue, string RouteId)`. The receiving manager calls `_remoteSubSink(sub)`, which `NatsServer` wires to `SubList.AddRemoteSubscription`.

This design means subscription propagation works only when peer servers run in the same .NET process. No subscription state is exchanged over the TCP connection.

### RouteConnection handshake

`RouteConnection` (`src/NATS.Server/Routes/RouteConnection.cs`) wraps a `Socket` and a `NetworkStream`. The handshake is a single-line exchange in each direction: `ROUTE <serverId>\r\n`. The initiating side sends first and then reads. The accepting side reads first and then sends, so the two sides never both block waiting to read.

```csharp
public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
{
    // Dialing side: announce our ID, then read the peer's.
    await WriteLineAsync($"ROUTE {serverId}", ct);
    var line = await ReadLineAsync(ct);
    RemoteServerId = ParseHandshake(line);
}

public async Task PerformInboundHandshakeAsync(string serverId, CancellationToken ct)
{
    // Accepting side: read the peer's ID first, then reply with ours.
    var line = await ReadLineAsync(ct);
    RemoteServerId = ParseHandshake(line);
    await WriteLineAsync($"ROUTE {serverId}", ct);
}
```

`ParseHandshake` validates that the line starts with `"ROUTE "` (case-insensitive) and extracts the server ID from `line[6..]`.
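The parsing rules just described can be sketched as a standalone function. This is a minimal approximation, not the code in `RouteConnection.cs`. It assumes that a line without the `ROUTE ` prefix also throws `InvalidOperationException` and that surrounding whitespace is trimmed from the ID; the description above confirms neither. The `Rejects` helper is hypothetical.

```csharp
using System;

// Sketch of the described ParseHandshake rules.
// Assumptions: trimming the ID, and the exception type used for a missing prefix.
static string ParseHandshake(string? line)
{
    const string Prefix = "ROUTE ";
    if (line is null || !line.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase))
        throw new InvalidOperationException($"Expected a 'ROUTE <serverId>' line, got '{line}'.");

    var serverId = line[Prefix.Length..].Trim(); // same slice as line[6..]
    if (serverId.Length == 0)
        throw new InvalidOperationException("Route handshake is missing a server ID.");
    return serverId;
}

// Hypothetical helper: true when ParseHandshake rejects the line.
static bool Rejects(string? line)
{
    try { ParseHandshake(line); return false; }
    catch (InvalidOperationException) { return true; }
}

Console.WriteLine(ParseHandshake("ROUTE NAQ3XYZ")); // the peer's server ID
Console.WriteLine(ParseHandshake("route NAQ3XYZ")); // prefix match ignores case
Console.WriteLine(Rejects("ROUTE "));               // empty ID is rejected
Console.WriteLine(Rejects("INFO {}"));              // a JSON INFO line is not a route handshake
```

Either side only needs to run its peer's first line through this check. A Go server's route connection starts with JSON `INFO`/`CONNECT` lines instead, and the prefix check rejects those.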
`ParseHandshake` throws `InvalidOperationException` when the server ID is empty or missing. This handshake is not compatible with the Go server's route protocol, which exchanges a JSON `INFO` block and processes `CONNECT` options.

### WaitUntilClosedAsync

After the handshake completes and the connection is registered, `RouteManager` calls `WaitUntilClosedAsync` on a background task. It reads from the socket in a loop and discards every byte. It returns when the remote end closes the connection (a zero-byte read). Cancelling `ct` also ends the wait, because a pending `ReadAsync` throws `OperationCanceledException`.

```csharp
public async Task WaitUntilClosedAsync(CancellationToken ct)
{
    var buffer = new byte[1024];
    while (!ct.IsCancellationRequested)
    {
        // Any bytes the peer sends are read and discarded.
        var bytesRead = await _stream.ReadAsync(buffer, ct);
        if (bytesRead == 0)
            return; // peer closed the connection
    }
}
```

No messages are ever sent over a route connection after the handshake, so this is the entire post-handshake read loop.

### Deduplication

`Register` rejects duplicate route connections. The deduplication key combines the remote server ID and the remote TCP endpoint:

```csharp
private void Register(RouteConnection route)
{
    var key = $"{route.RemoteServerId}:{route.RemoteEndpoint}";
    if (!_routes.TryAdd(key, route))
    {
        // Same server ID and endpoint already registered: drop the newcomer.
        _ = route.DisposeAsync();
        return;
    }

    // Peers that PropagateLocalSubscription notifies, keyed by server ID only.
    if (route.RemoteServerId is { Length: > 0 } remoteServerId)
        _connectedServerIds[remoteServerId] = 0;

    Interlocked.Increment(ref _stats.Routes);
    _ = Task.Run(() => WatchRouteAsync(key, route, _cts!.Token));
}
```

Because the key includes the remote endpoint, it only collapses connections that share both the server ID and the peer address, such as two outbound dials to the same seed address. On an accepted connection, the remote endpoint is the peer's ephemeral source port. If two servers dial each other at the same time, the inbound and outbound connections therefore get different keys and both are registered. `_connectedServerIds` is keyed by server ID alone, so the extra connection does not cause duplicate subscription propagation. `RemoteEndpoint` falls back to a new GUID string when the socket's `RemoteEndPoint` is null, so such connections still get a distinct key.

---

## Gateways

`GatewayManager` (`src/NATS.Server/Gateways/GatewayManager.cs`) is a stub. `StartAsync` logs the configured name and listen address at `Debug` level, resets the gateway count in `ServerStats` to zero, and returns a completed task.
No socket is bound and no connections are made:

```csharp
public Task StartAsync(CancellationToken ct)
{
    _logger.LogDebug("Gateway manager started (name={Name}, listen={Host}:{Port})",
        _options.Name, _options.Host, _options.Port);
    Interlocked.Exchange(ref _stats.Gateways, 0); // always report zero gateways
    return Task.CompletedTask;
}
```

`GatewayConnection` exists as a skeleton class with only a `RemoteEndpoint` string property. It contains no networking or protocol logic.

---

## Leaf Nodes

`LeafNodeManager` (`src/NATS.Server/LeafNodes/LeafNodeManager.cs`) is a stub. `StartAsync` logs the configured listen address at `Debug` level, resets the leaf count in `ServerStats` to zero, and returns a completed task. No socket is bound:

```csharp
public Task StartAsync(CancellationToken ct)
{
    _logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port);
    Interlocked.Exchange(ref _stats.Leafs, 0); // always report zero leaf connections
    return Task.CompletedTask;
}
```

`LeafConnection` follows the same skeleton pattern as `GatewayConnection`.
---

## Configuration

### ClusterOptions

`ClusterOptions` (`src/NATS.Server/Configuration/ClusterOptions.cs`) controls route clustering:

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `Name` | `string?` | `null` | Cluster name; currently unused at runtime |
| `Host` | `string` | `"0.0.0.0"` | Listen address for inbound route connections |
| `Port` | `int` | `6222` | Listen port; set to 0 for an OS-assigned port (updated after bind) |
| `Routes` | `List<string>` | `[]` | Seed route endpoints (`host:port`) to dial on startup |

### GatewayOptions

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `Name` | `string?` | `null` | Gateway cluster name |
| `Host` | `string` | `"0.0.0.0"` | Listen address (not used; stub only) |
| `Port` | `int` | `0` | Listen port (not used; stub only) |

### LeafNodeOptions

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `Host` | `string` | `"0.0.0.0"` | Listen address (not used; stub only) |
| `Port` | `int` | `0` | Listen port (not used; stub only) |

### Route endpoint format

`ParseRouteEndpoint` in `RouteManager` parses entries in `ClusterOptions.Routes`. The format is a bare `host:port` string, **not** the `nats-route://host:port` URL scheme that the Go server config file uses:

```csharp
private static IPEndPoint ParseRouteEndpoint(string route)
{
    var trimmed = route.Trim();
    // Split on the first ':' only, giving "host" and "port".
    var parts = trimmed.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
    if (parts.Length != 2)
        throw new FormatException($"Invalid route endpoint: '{route}'");

    // No DNS lookup: the host must be an IP literal.
    return new IPEndPoint(IPAddress.Parse(parts[0]), int.Parse(parts[1]));
}
```

In practice only IPv4 literals work, for three reasons:

- `IPAddress.Parse` is called directly on `parts[0]` with no hostname resolution, so hostname-based seeds such as `localhost:6222` throw `FormatException`.
- Splitting on the first `:` does not handle IPv6 literals.
- `ConnectToRouteWithRetryAsync` dials with an `AddressFamily.InterNetwork` (IPv4) socket.
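The sketch below copies `ParseRouteEndpoint` as shown above into a standalone program and feeds it a few seed strings, to show which ones are accepted. The `Describe` wrapper and the sample addresses are illustrative only.

```csharp
using System;
using System.Net;

// Copy of ParseRouteEndpoint as shown above, lifted into a local function.
static IPEndPoint ParseRouteEndpoint(string route)
{
    var trimmed = route.Trim();
    var parts = trimmed.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
    if (parts.Length != 2)
        throw new FormatException($"Invalid route endpoint: '{route}'");
    return new IPEndPoint(IPAddress.Parse(parts[0]), int.Parse(parts[1]));
}

// Hypothetical wrapper: the parsed endpoint as text, or the exception type name on failure.
static string Describe(string route)
{
    try { return ParseRouteEndpoint(route).ToString(); }
    catch (FormatException ex) { return ex.GetType().Name; }
}

var seeds = new[]
{
    "127.0.0.1:6222",              // accepted
    " 10.0.0.5 : 6223 ",           // accepted; whitespace around host and port is trimmed
    "localhost:6222",              // hostname: IPAddress.Parse fails
    "nats-route://127.0.0.1:6222", // URL scheme: "nats-route" is parsed as the host
    "127.0.0.1",                   // no port
};

foreach (var seed in seeds)
    Console.WriteLine($"{seed,-30} -> {Describe(seed)}");
```

Seeds written as `nats-route://` URLs or hostnames in a Go-style config must therefore be converted to `ip:port` literals before they go into `ClusterOptions.Routes`.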
---

## What Is Not Implemented

The following features from the Go reference are not present in this codebase:

- **RMSG wire routing**: messages are never sent over a route TCP connection; cross-server delivery only works in-process
- **RS+/RS- wire protocol**: subscription interest is propagated by direct in-process method calls, not over the wire
- **Route pooling**: the Go server opens 3 TCP connections per peer by default; this implementation dials a single connection per configured seed
- **Route compression**: the Go server can compress route traffic with S2; no compression is implemented here
- **Implicit route discovery**: when a Go server connects to a seed, the seed shares the addresses of other cluster members so the new server can connect to them and complete the full mesh; this does not occur here
- **Full-mesh auto-formation**: beyond the configured seed list, no additional peer discovery or mesh formation happens
- **Gateways**: no inter-cluster bridge networking; `GatewayManager` is a logging stub
- **Leaf nodes**: no edge node networking; `LeafNodeManager` is a logging stub
- **Route-compatible INFO/CONNECT handshake**: the custom `ROUTE <serverId>` handshake is not compatible with the Go server's route protocol

---

## Related Documentation

- [Server Overview](../Server/Overview.md)
- [Subscriptions Overview](../Subscriptions/Overview.md)
- [Configuration Overview](../Configuration/Overview.md)