Compare commits

...

10 Commits

Author SHA1 Message Date
Joseph Doherty
a0926c3a50 docs: add design doc for SYSTEM and ACCOUNT connection types
Covers 6 implementation layers: ClientKind enum + INatsClient interface,
event infrastructure with Channel<T>, system event publishing, request-reply
monitoring services, import/export model with ACCOUNT client, and response
routing with latency tracking.
2026-02-23 05:03:17 -05:00
Joseph Doherty
ad336167b9 docs: update differences.md to reflect config parsing and hot reload implementation 2026-02-23 04:58:53 -05:00
Joseph Doherty
684ee222ad feat: integrate config file loading and SIGHUP hot reload
Wire up the config parsing infrastructure into the server:
- NatsServer: add ReloadConfig() with digest-based change detection,
  diff/validate, CLI override preservation, and side-effect triggers
- Program.cs: two-pass CLI parsing — load config file first, then
  apply CLI args on top with InCmdLine tracking for reload precedence
- SIGHUP handler upgraded from stub warning to actual reload
- Remove config file "not yet supported" warning from StartAsync
- Add integration tests for config loading, CLI overrides, and
  reload validation
2026-02-23 04:57:34 -05:00
Joseph Doherty
d21243bc8a feat: add config reloader with diff, validate, and CLI merge
Port of Go server/reload.go option interface and diffing logic. Compares
NatsOptions property-by-property to detect changes, tags each with category
flags (logging, auth, TLS, non-reloadable), validates that non-reloadable
options (Host, Port, ServerName) are not changed at runtime, and provides
MergeCliOverrides to ensure CLI flags always take precedence over config
file values during hot reload.
2026-02-23 04:53:25 -05:00
Joseph Doherty
8a2ded8e48 feat: add config processor mapping parsed config to NatsOptions
Port of Go server/opts.go processConfigFileLine switch. Maps parsed
NATS config dictionaries to NatsOptions fields including:
- Core options (port, host, server_name, limits, ping, write_deadline)
- Logging (debug, trace, logfile, log rotation)
- Authorization (single user, users array with permissions)
- TLS (cert/key/ca, verify, pinned_certs, handshake_first)
- Monitoring (http_port, https_port, http/https listen, base_path)
- Lifecycle (lame_duck_duration/grace_period)
- Server tags, file paths, system account options

Includes error collection (not fail-fast), duration parsing (ms/s/m/h
strings and numeric seconds), host:port listen parsing, and 56 tests
covering all config sections plus validation edge cases.
2026-02-23 04:47:54 -05:00
Joseph Doherty
5219f77f9b fix: add include depth limit, fix PopContext guard, add SetValue fallback
- Add MaxIncludeDepth = 10 constant and thread _includeDepth through ParserState
  constructors, ProcessInclude, ParseFile (private overload), and ParseEnvValue
  to prevent StackOverflowException from recursive includes
- Fix PopContext to check _ctxs.Count <= 1 instead of == 0 so the root context
  is never popped, replacing silent crash with clear InvalidOperationException
- Add else throw in SetValue so unknown context types surface as bugs rather
  than silently dropping values
2026-02-23 04:42:37 -05:00
Joseph Doherty
9f66ef72c6 feat: add NATS config file parser (port of Go conf/parse.go)
Implements NatsConfParser with Parse, ParseFile, and ParseFileWithDigest
methods. Supports nested maps/arrays, variable resolution with block
scoping and environment fallback, bcrypt password literals, integer
suffix multipliers, include directives, and cycle detection.
2026-02-23 04:35:46 -05:00
Joseph Doherty
ae043136a1 fix: address lexer code review findings (newline handling, emit cleanup, null guard) 2026-02-23 04:30:36 -05:00
Joseph Doherty
f952e6afab feat: add new NatsOptions fields for Go config parity
Adds 10 new fields to NatsOptions (ClientAdvertise, TraceVerbose, MaxTracedMsgLen,
DisableSublistCache, ConnectErrorReports, ReconnectErrorReports, NoHeaderSupport,
MaxClosedClients, NoSystemAccount, SystemAccount) plus InCmdLine tracking set.
Moves MaxClosedClients from a private constant in NatsServer to a configurable option.
2026-02-23 04:23:27 -05:00
Joseph Doherty
9fff5709c4 feat: add NATS config file lexer (port of Go conf/lex.go)
Port the NATS configuration file lexer from Go's conf/lex.go to C#.
The lexer is a state-machine tokenizer that supports the NATS config
format: key-value pairs with =, :, or whitespace separators; nested
maps {}; arrays []; single and double quoted strings with escape
sequences; block strings (); variables $VAR; include directives;
comments (# and //); booleans; integers with size suffixes (kb, mb, gb);
floats; ISO8601 datetimes; and IP addresses.
2026-02-23 04:20:56 -05:00
22 changed files with 5035 additions and 21 deletions

View File

@@ -12,7 +12,7 @@
|---------|:--:|:----:|-------|
| NKey generation (server identity) | Y | Y | Ed25519 key pair via NATS.NKeys at startup |
| System account setup | Y | Y | `$SYS` account created; no event publishing yet (stub) |
| Config file validation on startup | Y | Stub | `-c` flag parsed, `ConfigFile` stored, but no config parser |
| Config file validation on startup | Y | Y | Full config parsing with error collection via `ConfigProcessor` |
| PID file writing | Y | Y | Written on startup, deleted on shutdown |
| Profiling HTTP endpoint (`/debug/pprof`) | Y | Stub | `ProfPort` option exists but endpoint not implemented |
| Ports file output | Y | Y | JSON ports file written to `PortsFileDir` on startup |
@@ -42,7 +42,7 @@
| SIGTERM | Y | Y | `PosixSignalRegistration` triggers `ShutdownAsync()` |
| SIGUSR1 (reopen logs) | Y | Y | SIGUSR1 handler calls ReOpenLogFile |
| SIGUSR2 (lame duck mode) | Y | Y | Triggers `LameDuckShutdownAsync()` |
| SIGHUP (config reload) | Y | Stub | Signal registered, handler logs "not yet implemented" |
| SIGHUP (config reload) | Y | Y | Re-parses config, diffs options, applies reloadable subset; CLI flags preserved |
| Windows Service integration | Y | Y | `--service` flag with `Microsoft.Extensions.Hosting.WindowsServices` |
---
@@ -247,7 +247,7 @@ Go implements a sophisticated slow consumer detection system:
| `-a/--addr` | Y | Y | |
| `-n/--name` (ServerName) | Y | Y | |
| `-m/--http_port` (monitoring) | Y | Y | |
| `-c` (config file) | Y | Stub | Flag parsed, stored in `ConfigFile`, no config parser |
| `-c` (config file) | Y | Y | Full config parsing: lexer → parser → processor; CLI args override config |
| `-D/-V/-DV` (debug/trace) | Y | Y | `-D`/`--debug` for debug, `-V`/`-T`/`--trace` for trace, `-DV` for both |
| `--tlscert/--tlskey/--tlscacert` | Y | Y | |
| `--tlsverify` | Y | Y | |
@@ -257,10 +257,10 @@ Go implements a sophisticated slow consumer detection system:
### Configuration System
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| Config file parsing | Y | N | Go has custom `conf` parser with includes |
| Hot reload (SIGHUP) | Y | N | |
| Config change detection | Y | N | Go tracks `inConfig`/`inCmdLine` origins |
| ~450 option fields | Y | ~62 | .NET covers core + debug/trace/logging/limits/tags options |
| Config file parsing | Y | Y | Custom NATS conf lexer/parser ported from Go; supports includes, variables, blocks |
| Hot reload (SIGHUP) | Y | Y | Reloads logging, auth, limits, TLS certs on SIGHUP; rejects non-reloadable changes |
| Config change detection | Y | Y | SHA256 digest comparison; `InCmdLine` tracks CLI flag precedence |
| ~450 option fields | Y | ~72 | .NET covers core + all single-server options; cluster/JetStream keys silently ignored |
### Missing Options Categories
- ~~Logging options~~ — file logging, rotation, syslog, debug/trace, color, timestamps, per-subsystem log control all implemented
@@ -404,10 +404,8 @@ The following items from the original gap list have been implemented:
- **Permission templates** — `PermissionTemplates.Expand()` with 6 functions and cartesian product
- **Bearer tokens** — `UserClaims.BearerToken` skips nonce verification
- **User revocation** — per-account tracking with wildcard (`*`) revocation
### Remaining High Priority
1. **Config file parsing** — needed for production deployment (CLI stub exists)
2. **Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists)
- **Config file parsing** — custom lexer/parser ported from Go; supports includes, variables, nested blocks, size suffixes
- **Hot reload (SIGHUP)** — re-parses config, diffs changes, validates reloadable set, applies with CLI precedence
### Remaining Lower Priority
3. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections
1. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections

View File

@@ -0,0 +1,565 @@
# Design: SYSTEM and ACCOUNT Connection Types
**Date:** 2026-02-23
**Status:** Approved
**Approach:** Bottom-Up Layered Build (6 layers)
## Overview
Port the SYSTEM and ACCOUNT internal connection types from the Go NATS server to .NET. This includes:
- Client type differentiation (ClientKind enum)
- Internal client infrastructure (socketless clients with callback-based delivery)
- Full system event publishing ($SYS.ACCOUNT.*.CONNECT, DISCONNECT, STATSZ, etc.)
- System request-reply monitoring services ($SYS.REQ.SERVER.*.VARZ, CONNZ, etc.)
- Account service/stream imports and exports (cross-account message routing)
- Response routing for service imports with latency tracking
**Go reference files:**
- `golang/nats-server/server/client.go` — client type constants (lines 45-65), `isInternalClient()`, message delivery (lines 3789-3803)
- `golang/nats-server/server/server.go` — system account setup (lines 1822-1892), `createInternalClient()` (lines 1910-1936)
- `golang/nats-server/server/events.go``internal` struct (lines 124-147), event subjects (lines 41-97), send/receive loops (lines 474-668), event publishing, subscriptions (lines 1172-1495)
- `golang/nats-server/server/accounts.go``Account` struct (lines 52-119), import/export structs (lines 142-263), `addServiceImport()` (lines 1560-2112), `addServiceImportSub()` (lines 2156-2187), `internalClient()` (lines 2114-2122)
---
## Layer 1: ClientKind Enum + INatsClient Interface + InternalClient
### ClientKind Enum
**New file:** `src/NATS.Server/ClientKind.cs`
```csharp
public enum ClientKind
{
Client, // End user connection
Router, // Cluster peer (out of scope)
Gateway, // Inter-cluster bridge (out of scope)
Leaf, // Leaf node (out of scope)
System, // Internal system client
JetStream, // Internal JetStream client (out of scope)
Account, // Internal per-account client
}
public static class ClientKindExtensions
{
public static bool IsInternal(this ClientKind kind) =>
kind is ClientKind.System or ClientKind.JetStream or ClientKind.Account;
}
```
### INatsClient Interface
Extract from `NatsClient` the surface used by `Subscription`, `DeliverMessage`, `ProcessMessage`:
```csharp
public interface INatsClient
{
ulong Id { get; }
ClientKind Kind { get; }
bool IsInternal { get; }
Account? Account { get; }
ClientOptions? ClientOpts { get; }
ClientPermissions? Permissions { get; }
void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload);
bool QueueOutbound(ReadOnlyMemory<byte> data);
}
```
### InternalClient Class
**New file:** `src/NATS.Server/InternalClient.cs`
Lightweight, socketless client for internal messaging:
- `ClientKind Kind` — System, Account, or JetStream
- `Account Account` — associated account
- `ulong Id` — unique client ID from server's ID counter
- Headers always enabled, echo always disabled
- `SendMessage` invokes internal callback delegate or pushes to Channel
- No socket, no read/write loops, no parser
- `QueueOutbound` is a no-op (internal clients don't write wire protocol)
### Subscription Change
`Subscription.Client` changes from `NatsClient?` to `INatsClient?`. This is the biggest refactoring step — all code referencing `sub.Client` as `NatsClient` needs updating.
`NatsClient` implements `INatsClient` with `Kind = ClientKind.Client`.
---
## Layer 2: System Event Infrastructure
### InternalEventSystem Class
**New file:** `src/NATS.Server/Events/InternalEventSystem.cs`
Core class managing the server's internal event system, mirroring Go's `internal` struct:
```csharp
public sealed class InternalEventSystem : IAsyncDisposable
{
// Core state
public Account SystemAccount { get; }
public InternalClient SystemClient { get; }
private ulong _sequence;
private int _subscriptionId;
private readonly string _serverHash;
private readonly string _inboxPrefix;
// Message queues (Channel<T>-based)
private readonly Channel<PublishMessage> _sendQueue;
private readonly Channel<InternalSystemMessage> _receiveQueue;
private readonly Channel<InternalSystemMessage> _receiveQueuePings;
// Background tasks
private Task? _sendLoop;
private Task? _receiveLoop;
private Task? _receiveLoopPings;
// Remote server tracking
private readonly ConcurrentDictionary<string, ServerUpdate> _remoteServers = new();
// Timers
private PeriodicTimer? _statszTimer; // 10s interval
private PeriodicTimer? _accountConnsTimer; // 30s interval
private PeriodicTimer? _orphanSweeper; // 90s interval
}
```
### Message Types
```csharp
public record PublishMessage(
InternalClient? Client, // Use specific client or default to system client
string Subject,
string? Reply,
ServerInfo? Info,
byte[]? Headers,
object? Body, // JSON-serializable
bool Echo = false,
bool IsLast = false);
public record InternalSystemMessage(
Subscription? Sub,
INatsClient? Client,
Account? Account,
string Subject,
string? Reply,
ReadOnlyMemory<byte> Headers,
ReadOnlyMemory<byte> Message,
Action<Subscription?, INatsClient?, Account?, string, string?, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>> Callback);
```
### Lifecycle
- `StartAsync(NatsServer server)` — creates system client, starts 3 background Tasks
- `StopAsync()` — publishes shutdown event with `IsLast=true`, signals channels complete, awaits all tasks
### Send Loop
Consumes from `_sendQueue`:
1. Fills in ServerInfo metadata (name, host, ID, sequence, version, tags)
2. Serializes body to JSON using source-generated serializer
3. Calls `server.ProcessMessage()` on the system account to deliver locally
4. Handles compression if configured
### Receive Loop(s)
Two instances (general + pings) consuming from their respective channels:
- Pop messages, invoke callbacks
- Exit on cancellation
### APIs on NatsServer
```csharp
public void SendInternalMsg(string subject, string? reply, object? msg);
public void SendInternalAccountMsg(Account account, string subject, object? msg);
public Subscription SysSubscribe(string subject, SystemMessageHandler callback);
public Subscription SysSubscribeInternal(string subject, SystemMessageHandler callback);
```
### noInlineCallback Pattern
Wraps a `SystemMessageHandler` so that instead of executing inline during message delivery, it enqueues to `_receiveQueue` for async dispatch. This prevents system event handlers from blocking the publishing path.
---
## Layer 3: System Event Publishing
### Event Types (DTOs)
**New folder:** `src/NATS.Server/Events/`
All events embed a `TypedEvent` base:
```csharp
public record TypedEvent(string Type, string Id, DateTime Time);
```
| Event Class | Type String | Published On |
|-------------|-------------|-------------|
| `ConnectEventMsg` | `io.nats.server.advisory.v1.client_connect` | `$SYS.ACCOUNT.{acc}.CONNECT` |
| `DisconnectEventMsg` | `io.nats.server.advisory.v1.client_disconnect` | `$SYS.ACCOUNT.{acc}.DISCONNECT` |
| `AccountNumConns` | `io.nats.server.advisory.v1.account_connections` | `$SYS.ACCOUNT.{acc}.SERVER.CONNS` |
| `ServerStatsMsg` | (stats) | `$SYS.SERVER.{id}.STATSZ` |
| `ShutdownEventMsg` | (shutdown) | `$SYS.SERVER.{id}.SHUTDOWN` |
| `LameDuckEventMsg` | (lameduck) | `$SYS.SERVER.{id}.LAMEDUCK` |
| `AuthErrorEventMsg` | `io.nats.server.advisory.v1.client_auth` | `$SYS.SERVER.{id}.CLIENT.AUTH.ERR` |
### Integration Points
| Location | Event | Trigger |
|----------|-------|---------|
| `NatsServer.HandleClientAsync()` after auth | `ConnectEventMsg` | Client authenticated |
| `NatsServer.RemoveClient()` | `DisconnectEventMsg` | Client disconnected |
| `NatsServer.ShutdownAsync()` | `ShutdownEventMsg` | Server shutting down |
| `NatsServer.LameDuckShutdownAsync()` | `LameDuckEventMsg` | Lame duck mode |
| Auth failure in `NatsClient.ProcessConnect()` | `AuthErrorEventMsg` | Auth rejected |
| Periodic timer (10s) | `ServerStatsMsg` | Timer tick |
| Periodic timer (30s) | `AccountNumConns` | Timer tick, for each account with connections |
### JSON Serialization
`System.Text.Json` source generator context:
```csharp
[JsonSerializable(typeof(ConnectEventMsg))]
[JsonSerializable(typeof(DisconnectEventMsg))]
[JsonSerializable(typeof(ServerStatsMsg))]
// ... etc
internal partial class EventJsonContext : JsonSerializerContext { }
```
---
## Layer 4: System Request-Reply Services
### Subscriptions Created in initEventTracking()
Server-specific (only this server responds):
| Subject | Handler | Response |
|---------|---------|----------|
| `$SYS.REQ.SERVER.{id}.IDZ` | `IdzReq` | Server identity |
| `$SYS.REQ.SERVER.{id}.STATSZ` | `StatszReq` | Server stats (same as /varz stats) |
| `$SYS.REQ.SERVER.{id}.VARZ` | `VarzReq` | Same as /varz JSON |
| `$SYS.REQ.SERVER.{id}.CONNZ` | `ConnzReq` | Same as /connz JSON |
| `$SYS.REQ.SERVER.{id}.SUBSZ` | `SubszReq` | Same as /subz JSON |
| `$SYS.REQ.SERVER.{id}.HEALTHZ` | `HealthzReq` | Health status |
| `$SYS.REQ.SERVER.{id}.ACCOUNTZ` | `AccountzReq` | Account info |
Wildcard ping (all servers respond):
| Subject | Handler |
|---------|---------|
| `$SYS.REQ.SERVER.PING.STATSZ` | `StatszReq` |
| `$SYS.REQ.SERVER.PING.VARZ` | `VarzReq` |
| `$SYS.REQ.SERVER.PING.IDZ` | `IdzReq` |
| `$SYS.REQ.SERVER.PING.HEALTHZ` | `HealthzReq` |
Account-scoped:
| Subject | Handler |
|---------|---------|
| `$SYS.REQ.ACCOUNT.*.CONNZ` | `AccountConnzReq` |
| `$SYS.REQ.ACCOUNT.*.SUBSZ` | `AccountSubszReq` |
| `$SYS.REQ.ACCOUNT.*.INFO` | `AccountInfoReq` |
| `$SYS.REQ.ACCOUNT.*.STATZ` | `AccountStatzReq` |
### Implementation
Handlers reuse existing `MonitorServer` data builders. The request body (if present) is parsed for options (e.g., sort, limit for CONNZ). Response is serialized to JSON and published on the request's reply subject via `SendInternalMsg`.
---
## Layer 5: Import/Export Model + ACCOUNT Client
### Export Types
**New file:** `src/NATS.Server/Imports/StreamExport.cs`
```csharp
public sealed class StreamExport
{
public ExportAuth Auth { get; init; } = new();
}
```
**New file:** `src/NATS.Server/Imports/ServiceExport.cs`
```csharp
public sealed class ServiceExport
{
public ExportAuth Auth { get; init; } = new();
public Account? Account { get; init; }
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2);
public ServiceLatency? Latency { get; init; }
public bool AllowTrace { get; init; }
}
```
**New file:** `src/NATS.Server/Imports/ExportAuth.cs`
```csharp
public sealed class ExportAuth
{
public bool TokenRequired { get; init; }
public uint AccountPosition { get; init; }
public HashSet<string>? ApprovedAccounts { get; init; }
public Dictionary<string, long>? RevokedAccounts { get; init; }
public bool IsAuthorized(Account account) { ... }
}
```
### Import Types
**New file:** `src/NATS.Server/Imports/StreamImport.cs`
```csharp
public sealed class StreamImport
{
public required Account SourceAccount { get; init; }
public required string From { get; init; }
public required string To { get; init; }
public SubjectTransform? Transform { get; init; }
public bool UsePub { get; init; }
public bool Invalid { get; set; }
}
```
**New file:** `src/NATS.Server/Imports/ServiceImport.cs`
```csharp
public sealed class ServiceImport
{
public required Account DestinationAccount { get; init; }
public required string From { get; init; }
public required string To { get; init; }
public SubjectTransform? Transform { get; init; }
public ServiceExport? Export { get; init; }
public ServiceResponseType ResponseType { get; init; }
public byte[]? Sid { get; set; }
public bool IsResponse { get; init; }
public bool UsePub { get; init; }
public bool Invalid { get; set; }
public bool Share { get; init; }
public bool Tracking { get; init; }
}
```
### Account Extensions
Add to `Account`:
```csharp
// Export/Import maps
public ExportMap Exports { get; } = new();
public ImportMap Imports { get; } = new();
// Internal ACCOUNT client (lazy)
private InternalClient? _internalClient;
public InternalClient GetOrCreateInternalClient(NatsServer server) { ... }
// Internal subscription management
private ulong _internalSubId;
public Subscription SubscribeInternal(string subject, SystemMessageHandler callback) { ... }
// Import/Export APIs
public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved);
public void AddStreamExport(string subject, IEnumerable<Account>? approved);
public ServiceImport AddServiceImport(Account destination, string from, string to);
public void AddStreamImport(Account source, string from, string to);
```
### ExportMap / ImportMap
```csharp
public sealed class ExportMap
{
public Dictionary<string, StreamExport> Streams { get; } = new(StringComparer.Ordinal);
public Dictionary<string, ServiceExport> Services { get; } = new(StringComparer.Ordinal);
public Dictionary<string, ServiceImport> Responses { get; } = new(StringComparer.Ordinal);
}
public sealed class ImportMap
{
public List<StreamImport> Streams { get; } = [];
public Dictionary<string, List<ServiceImport>> Services { get; } = new(StringComparer.Ordinal);
}
```
### Service Import Subscription Flow
1. `account.AddServiceImport(dest, "requests.>", "api.>")` called
2. Account creates its `InternalClient` (Kind=Account) if needed
3. Creates subscription on `"requests.>"` in account's SubList with `Client = internalClient`
4. Subscription carries a `ServiceImport` reference
5. When message matches, `DeliverMessage` detects internal client → invokes `ProcessServiceImport`
### ProcessServiceImport Callback
1. Transform subject if transform configured
2. Match against destination account's SubList
3. Deliver to destination subscribers (rewriting reply subject for response routing)
4. If reply present: set up response service import (see Layer 6)
### Stream Import Delivery
In `DeliverMessage`, before sending to subscriber:
- If subscription has `StreamImport` reference, apply subject transform
- Deliver with transformed subject
### Message Delivery Path Changes
`NatsServer.ProcessMessage` needs modification:
- After matching local account SubList, also check for service imports that might forward to other accounts
- For subscriptions with `sub.StreamImport != null`, transform subject before delivery
---
## Layer 6: Response Routing + Latency Tracking
### Service Reply Prefix
Generated per request: `_R_.{random10chars}.` — unique reply namespace in the exporting account.
### Response Service Import Creation
When `ProcessServiceImport` handles a request with a reply subject:
1. Generate new reply prefix: `_R_.{random}.`
2. Create response `ServiceImport` in the exporting account:
- `From = newReplyPrefix + ">"` (wildcard to catch all responses)
- `To = originalReply` (original reply subject in importing account)
- `IsResponse = true`
3. Subscribe to new prefix in exporting account
4. Rewrite reply in forwarded message to new prefix
5. Store in `ExportMap.Responses[newPrefix]`
### Response Delivery
When exporting account service responds on the rewritten reply:
1. Response matches the `_R_.{random}.>` subscription
2. Response service import callback fires
3. Transforms reply back to original subject
4. Delivers to original account's subscribers
### Cleanup
- **Singleton:** Remove response import after first response delivery
- **Streamed:** Track timestamp, clean up via timer after `ResponseThreshold` (default 2 min)
- **Chunked:** Same as Streamed
Timer runs periodically (every 30s), checks `ServiceImport.Timestamp` against threshold, removes stale entries.
### Latency Tracking
```csharp
public sealed class ServiceLatency
{
public int SamplingPercentage { get; init; } // 1-100
public string Subject { get; init; } = string.Empty; // where to publish metrics
}
public record ServiceLatencyMsg(
TypedEvent Event,
string Status,
string Requestor, // Account name
string Responder, // Account name
TimeSpan RequestStart,
TimeSpan ServiceLatency,
TimeSpan TotalLatency);
```
When tracking is enabled:
1. Record request timestamp when creating response import
2. On response delivery, calculate latency
3. Publish `ServiceLatencyMsg` to configured subject
4. Sampling: only track if `Random.Shared.Next(100) < SamplingPercentage`
---
## Testing Strategy
### Layer 1 Tests
- Verify `ClientKind.IsInternal()` for all kinds
- Create `InternalClient`, verify properties (Kind, Id, Account, IsInternal)
- Verify `INatsClient` interface on both `NatsClient` and `InternalClient`
### Layer 2 Tests
- Start/stop `InternalEventSystem` lifecycle
- `SysSubscribe` creates subscription in system account SubList
- `SendInternalMsg` delivers to system subscribers via send loop
- `noInlineCallback` queues to receive loop rather than executing inline
- Concurrent publish/subscribe stress test
### Layer 3 Tests
- Connect event published on `$SYS.ACCOUNT.{acc}.CONNECT` when client authenticates
- Disconnect event published when client closes
- Server stats published every 10s on `$SYS.SERVER.{id}.STATSZ`
- Account conns published every 30s for accounts with connections
- Shutdown event published during shutdown
- Auth error event published on auth failure
- Event JSON structure matches Go format
### Layer 4 Tests
- Subscribe to `$SYS.REQ.SERVER.{id}.VARZ`, send request, verify response matches /varz
- Subscribe to `$SYS.REQ.SERVER.{id}.CONNZ`, verify response
- Ping wildcard `$SYS.REQ.SERVER.PING.HEALTHZ` receives response
- Account-scoped requests work
### Layer 5 Tests
- `AddServiceExport` + `AddServiceImport` creates internal subscription
- Message published on import subject is forwarded to export account
- Wildcard imports with subject transforms
- Authorization: only approved accounts can import
- Stream import with subject transform
- Cycle detection in service imports
- Account internal client lazy creation
### Layer 6 Tests
- Service import request-reply: request forwarded with rewritten reply, response routed back
- Singleton response: import cleaned up after one response
- Streamed response: multiple responses, cleaned up after timeout
- Latency tracking: metrics published to configured subject
- Response threshold timer cleans up stale entries
---
## Files to Create/Modify
### New Files
- `src/NATS.Server/ClientKind.cs`
- `src/NATS.Server/INatsClient.cs`
- `src/NATS.Server/InternalClient.cs`
- `src/NATS.Server/Events/InternalEventSystem.cs`
- `src/NATS.Server/Events/EventTypes.cs` (all event DTOs)
- `src/NATS.Server/Events/EventJsonContext.cs` (source gen)
- `src/NATS.Server/Events/EventSubjects.cs` (subject constants)
- `src/NATS.Server/Imports/ServiceImport.cs`
- `src/NATS.Server/Imports/StreamImport.cs`
- `src/NATS.Server/Imports/ServiceExport.cs`
- `src/NATS.Server/Imports/StreamExport.cs`
- `src/NATS.Server/Imports/ExportAuth.cs`
- `src/NATS.Server/Imports/ExportMap.cs`
- `src/NATS.Server/Imports/ImportMap.cs`
- `src/NATS.Server/Imports/ServiceResponseType.cs`
- `src/NATS.Server/Imports/ServiceLatency.cs`
- `tests/NATS.Server.Tests/InternalClientTests.cs`
- `tests/NATS.Server.Tests/EventSystemTests.cs`
- `tests/NATS.Server.Tests/SystemEventsTests.cs`
- `tests/NATS.Server.Tests/SystemRequestReplyTests.cs`
- `tests/NATS.Server.Tests/ImportExportTests.cs`
- `tests/NATS.Server.Tests/ResponseRoutingTests.cs`
### Modified Files
- `src/NATS.Server/NatsClient.cs` — implement `INatsClient`, add `Kind` property
- `src/NATS.Server/NatsServer.cs` — integrate event system, add import/export message path, system event publishing
- `src/NATS.Server/Auth/Account.cs` — add exports/imports, internal client, subscription APIs
- `src/NATS.Server/Subscriptions/Subscription.cs``Client``INatsClient?`, add `ServiceImport?`, `StreamImport?`
- `src/NATS.Server/Subscriptions/SubList.cs` — work with `INatsClient` if needed
- `src/NATS.Server/Monitoring/MonitorServer.cs` — expose data builders for request-reply handlers
- `differences.md` — update SYSTEM, ACCOUNT, import/export status

View File

@@ -1,86 +1,126 @@
using NATS.Server;
using NATS.Server.Configuration;
using Serilog;
using Serilog.Sinks.SystemConsole.Themes;
var options = new NatsOptions();
// First pass: scan args for -c flag to get config file path
string? configFile = null;
for (int i = 0; i < args.Length; i++)
{
if (args[i] == "-c" && i + 1 < args.Length)
{
configFile = args[++i];
break;
}
}
var windowsService = false;
// Parse ALL CLI flags into NatsOptions first
// If config file specified, load it as the base options
var options = configFile != null
? ConfigProcessor.ProcessConfigFile(configFile)
: new NatsOptions();
// Second pass: apply CLI args on top of config-loaded options, tracking InCmdLine
for (int i = 0; i < args.Length; i++)
{
switch (args[i])
{
case "-p" or "--port" when i + 1 < args.Length:
options.Port = int.Parse(args[++i]);
options.InCmdLine.Add("Port");
break;
case "-a" or "--addr" when i + 1 < args.Length:
options.Host = args[++i];
options.InCmdLine.Add("Host");
break;
case "-n" or "--name" when i + 1 < args.Length:
options.ServerName = args[++i];
options.InCmdLine.Add("ServerName");
break;
case "-m" or "--http_port" when i + 1 < args.Length:
options.MonitorPort = int.Parse(args[++i]);
options.InCmdLine.Add("MonitorPort");
break;
case "--http_base_path" when i + 1 < args.Length:
options.MonitorBasePath = args[++i];
options.InCmdLine.Add("MonitorBasePath");
break;
case "--https_port" when i + 1 < args.Length:
options.MonitorHttpsPort = int.Parse(args[++i]);
options.InCmdLine.Add("MonitorHttpsPort");
break;
case "-c" when i + 1 < args.Length:
options.ConfigFile = args[++i];
// Already handled in first pass; skip the value
i++;
break;
case "--pid" when i + 1 < args.Length:
options.PidFile = args[++i];
options.InCmdLine.Add("PidFile");
break;
case "--ports_file_dir" when i + 1 < args.Length:
options.PortsFileDir = args[++i];
options.InCmdLine.Add("PortsFileDir");
break;
case "--tls":
break;
case "--tlscert" when i + 1 < args.Length:
options.TlsCert = args[++i];
options.InCmdLine.Add("TlsCert");
break;
case "--tlskey" when i + 1 < args.Length:
options.TlsKey = args[++i];
options.InCmdLine.Add("TlsKey");
break;
case "--tlscacert" when i + 1 < args.Length:
options.TlsCaCert = args[++i];
options.InCmdLine.Add("TlsCaCert");
break;
case "--tlsverify":
options.TlsVerify = true;
options.InCmdLine.Add("TlsVerify");
break;
case "-D" or "--debug":
options.Debug = true;
options.InCmdLine.Add("Debug");
break;
case "-V" or "-T" or "--trace":
options.Trace = true;
options.InCmdLine.Add("Trace");
break;
case "-DV":
options.Debug = true;
options.Trace = true;
options.InCmdLine.Add("Debug");
options.InCmdLine.Add("Trace");
break;
case "-l" or "--log" or "--log_file" when i + 1 < args.Length:
options.LogFile = args[++i];
options.InCmdLine.Add("LogFile");
break;
case "--log_size_limit" when i + 1 < args.Length:
options.LogSizeLimit = long.Parse(args[++i]);
options.InCmdLine.Add("LogSizeLimit");
break;
case "--log_max_files" when i + 1 < args.Length:
options.LogMaxFiles = int.Parse(args[++i]);
options.InCmdLine.Add("LogMaxFiles");
break;
case "--logtime" when i + 1 < args.Length:
options.Logtime = bool.Parse(args[++i]);
options.InCmdLine.Add("Logtime");
break;
case "--logtime_utc":
options.LogtimeUTC = true;
options.InCmdLine.Add("LogtimeUTC");
break;
case "--syslog":
options.Syslog = true;
options.InCmdLine.Add("Syslog");
break;
case "--remote_syslog" when i + 1 < args.Length:
options.RemoteSyslog = args[++i];
options.InCmdLine.Add("RemoteSyslog");
break;
case "--service":
windowsService = true;
@@ -163,6 +203,14 @@ if (windowsService)
using var loggerFactory = new Serilog.Extensions.Logging.SerilogLoggerFactory(Log.Logger);
using var server = new NatsServer(options, loggerFactory);
// Store CLI snapshot for reload precedence (CLI flags always win over config file)
if (configFile != null && options.InCmdLine.Count > 0)
{
var cliSnapshot = new NatsOptions();
ConfigReloader.MergeCliOverrides(cliSnapshot, options, options.InCmdLine);
server.SetCliSnapshot(cliSnapshot, options.InCmdLine);
}
// Register signal handlers
server.HandleSignals();

View File

@@ -0,0 +1,685 @@
// Port of Go server/opts.go processConfigFileLine — maps parsed config dictionaries
// to NatsOptions. Reference: golang/nats-server/server/opts.go lines 1050-1400.
using System.Globalization;
using System.Text.RegularExpressions;
using NATS.Server.Auth;
namespace NATS.Server.Configuration;
/// <summary>
/// Maps a parsed NATS configuration dictionary (produced by <see cref="NatsConfParser"/>)
/// into a fully populated <see cref="NatsOptions"/> instance. Collects all validation
/// errors rather than failing on the first one.
/// </summary>
public static class ConfigProcessor
{
/// <summary>
/// Parses a configuration file and returns the populated options.
/// </summary>
public static NatsOptions ProcessConfigFile(string filePath)
{
var config = NatsConfParser.ParseFile(filePath);
var opts = new NatsOptions { ConfigFile = filePath };
ApplyConfig(config, opts);
return opts;
}
/// <summary>
/// Parses configuration text (not from a file) and returns the populated options.
/// </summary>
public static NatsOptions ProcessConfig(string configText)
{
var config = NatsConfParser.Parse(configText);
var opts = new NatsOptions();
ApplyConfig(config, opts);
return opts;
}
/// <summary>
/// Applies a parsed configuration dictionary to existing options.
/// Throws <see cref="ConfigProcessorException"/> if any validation errors are collected.
/// </summary>
public static void ApplyConfig(Dictionary<string, object?> config, NatsOptions opts)
{
var errors = new List<string>();
foreach (var (key, value) in config)
{
try
{
ProcessKey(key, value, opts, errors);
}
catch (Exception ex)
{
errors.Add($"Error processing '{key}': {ex.Message}");
}
}
if (errors.Count > 0)
{
throw new ConfigProcessorException("Configuration errors", errors);
}
}
private static void ProcessKey(string key, object? value, NatsOptions opts, List<string> errors)
{
// Keys are already case-insensitive from the parser (OrdinalIgnoreCase dictionaries),
// but we normalize here for the switch statement.
switch (key.ToLowerInvariant())
{
case "listen":
ParseListen(value, opts);
break;
case "port":
opts.Port = ToInt(value);
break;
case "host" or "net":
opts.Host = ToString(value);
break;
case "server_name":
var name = ToString(value);
if (name.Contains(' '))
errors.Add("server_name cannot contain spaces");
else
opts.ServerName = name;
break;
case "client_advertise":
opts.ClientAdvertise = ToString(value);
break;
// Logging
case "debug":
opts.Debug = ToBool(value);
break;
case "trace":
opts.Trace = ToBool(value);
break;
case "trace_verbose":
opts.TraceVerbose = ToBool(value);
if (opts.TraceVerbose)
opts.Trace = true;
break;
case "logtime":
opts.Logtime = ToBool(value);
break;
case "logtime_utc":
opts.LogtimeUTC = ToBool(value);
break;
case "logfile" or "log_file":
opts.LogFile = ToString(value);
break;
case "log_size_limit":
opts.LogSizeLimit = ToLong(value);
break;
case "log_max_num":
opts.LogMaxFiles = ToInt(value);
break;
case "syslog":
opts.Syslog = ToBool(value);
break;
case "remote_syslog":
opts.RemoteSyslog = ToString(value);
break;
// Limits
case "max_payload":
opts.MaxPayload = ToInt(value);
break;
case "max_control_line":
opts.MaxControlLine = ToInt(value);
break;
case "max_connections" or "max_conn":
opts.MaxConnections = ToInt(value);
break;
case "max_pending":
opts.MaxPending = ToLong(value);
break;
case "max_subs" or "max_subscriptions":
opts.MaxSubs = ToInt(value);
break;
case "max_sub_tokens" or "max_subscription_tokens":
var tokens = ToInt(value);
if (tokens > 256)
errors.Add("max_sub_tokens cannot exceed 256");
else
opts.MaxSubTokens = tokens;
break;
case "max_traced_msg_len":
opts.MaxTracedMsgLen = ToInt(value);
break;
case "max_closed_clients":
opts.MaxClosedClients = ToInt(value);
break;
case "disable_sublist_cache" or "no_sublist_cache":
opts.DisableSublistCache = ToBool(value);
break;
case "write_deadline":
opts.WriteDeadline = ParseDuration(value);
break;
// Ping
case "ping_interval":
opts.PingInterval = ParseDuration(value);
break;
case "ping_max" or "ping_max_out":
opts.MaxPingsOut = ToInt(value);
break;
// Monitoring
case "http_port" or "monitor_port":
opts.MonitorPort = ToInt(value);
break;
case "https_port":
opts.MonitorHttpsPort = ToInt(value);
break;
case "http":
ParseMonitorListen(value, opts, isHttps: false);
break;
case "https":
ParseMonitorListen(value, opts, isHttps: true);
break;
case "http_base_path":
opts.MonitorBasePath = ToString(value);
break;
// Lifecycle
case "lame_duck_duration":
opts.LameDuckDuration = ParseDuration(value);
break;
case "lame_duck_grace_period":
opts.LameDuckGracePeriod = ParseDuration(value);
break;
// Files
case "pidfile" or "pid_file":
opts.PidFile = ToString(value);
break;
case "ports_file_dir":
opts.PortsFileDir = ToString(value);
break;
// Auth
case "authorization":
if (value is Dictionary<string, object?> authDict)
ParseAuthorization(authDict, opts, errors);
break;
case "no_auth_user":
opts.NoAuthUser = ToString(value);
break;
// TLS
case "tls":
if (value is Dictionary<string, object?> tlsDict)
ParseTls(tlsDict, opts, errors);
break;
case "allow_non_tls":
opts.AllowNonTls = ToBool(value);
break;
// Tags
case "server_tags":
if (value is Dictionary<string, object?> tagsDict)
ParseTags(tagsDict, opts);
break;
// Profiling
case "prof_port":
opts.ProfPort = ToInt(value);
break;
// System account
case "system_account":
opts.SystemAccount = ToString(value);
break;
case "no_system_account":
opts.NoSystemAccount = ToBool(value);
break;
case "no_header_support":
opts.NoHeaderSupport = ToBool(value);
break;
case "connect_error_reports":
opts.ConnectErrorReports = ToInt(value);
break;
case "reconnect_error_reports":
opts.ReconnectErrorReports = ToInt(value);
break;
// Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.)
default:
break;
}
}
// ─── Listen parsing ────────────────────────────────────────────
/// <summary>
/// Parses a "listen" value that can be:
/// <list type="bullet">
/// <item><c>":4222"</c> — port only</item>
/// <item><c>"0.0.0.0:4222"</c> — host + port</item>
/// <item><c>"4222"</c> — bare number (port only)</item>
/// <item><c>4222</c> — integer (port only)</item>
/// </list>
/// </summary>
private static void ParseListen(object? value, NatsOptions opts)
{
var (host, port) = ParseHostPort(value);
if (host is not null)
opts.Host = host;
if (port is not null)
opts.Port = port.Value;
}
/// <summary>
/// Parses a monitor listen value. For "http" the port goes to MonitorPort;
/// for "https" the port goes to MonitorHttpsPort.
/// </summary>
private static void ParseMonitorListen(object? value, NatsOptions opts, bool isHttps)
{
var (host, port) = ParseHostPort(value);
if (host is not null)
opts.MonitorHost = host;
if (port is not null)
{
if (isHttps)
opts.MonitorHttpsPort = port.Value;
else
opts.MonitorPort = port.Value;
}
}
/// <summary>
/// Shared host:port parsing logic.
/// </summary>
private static (string? Host, int? Port) ParseHostPort(object? value)
{
if (value is long l)
return (null, (int)l);
var str = ToString(value);
// Try bare integer
if (int.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out var barePort))
return (null, barePort);
// Check for host:port
var colonIdx = str.LastIndexOf(':');
if (colonIdx >= 0)
{
var hostPart = str[..colonIdx];
var portPart = str[(colonIdx + 1)..];
if (int.TryParse(portPart, NumberStyles.Integer, CultureInfo.InvariantCulture, out var p))
{
var host = hostPart.Length > 0 ? hostPart : null;
return (host, p);
}
}
throw new FormatException($"Cannot parse listen value: '{str}'");
}
// ─── Duration parsing ──────────────────────────────────────────
/// <summary>
/// Parses a duration value. Accepts:
/// <list type="bullet">
/// <item>A string with unit suffix: "30s", "2m", "1h", "500ms"</item>
/// <item>A number (long/double) treated as seconds</item>
/// </list>
/// </summary>
internal static TimeSpan ParseDuration(object? value)
{
return value switch
{
long seconds => TimeSpan.FromSeconds(seconds),
double seconds => TimeSpan.FromSeconds(seconds),
string s => ParseDurationString(s),
_ => throw new FormatException($"Cannot parse duration from {value?.GetType().Name ?? "null"}"),
};
}
private static readonly Regex DurationPattern = new(
@"^(-?\d+(?:\.\d+)?)\s*(ms|s|m|h)$",
RegexOptions.Compiled | RegexOptions.IgnoreCase);
private static TimeSpan ParseDurationString(string s)
{
var match = DurationPattern.Match(s);
if (!match.Success)
throw new FormatException($"Cannot parse duration: '{s}'");
var amount = double.Parse(match.Groups[1].Value, CultureInfo.InvariantCulture);
var unit = match.Groups[2].Value.ToLowerInvariant();
return unit switch
{
"ms" => TimeSpan.FromMilliseconds(amount),
"s" => TimeSpan.FromSeconds(amount),
"m" => TimeSpan.FromMinutes(amount),
"h" => TimeSpan.FromHours(amount),
_ => throw new FormatException($"Unknown duration unit: '{unit}'"),
};
}
// ─── Authorization parsing ─────────────────────────────────────
private static void ParseAuthorization(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
{
foreach (var (key, value) in dict)
{
switch (key.ToLowerInvariant())
{
case "user" or "username":
opts.Username = ToString(value);
break;
case "pass" or "password":
opts.Password = ToString(value);
break;
case "token":
opts.Authorization = ToString(value);
break;
case "timeout":
opts.AuthTimeout = value switch
{
long l => TimeSpan.FromSeconds(l),
double d => TimeSpan.FromSeconds(d),
string s => ParseDuration(s),
_ => throw new FormatException($"Invalid auth timeout type: {value?.GetType().Name}"),
};
break;
case "users":
if (value is List<object?> userList)
opts.Users = ParseUsers(userList, errors);
break;
default:
// Unknown auth keys silently ignored
break;
}
}
}
private static List<User> ParseUsers(List<object?> list, List<string> errors)
{
var users = new List<User>();
foreach (var item in list)
{
if (item is not Dictionary<string, object?> userDict)
{
errors.Add("Expected user entry to be a map");
continue;
}
string? username = null;
string? password = null;
string? account = null;
Permissions? permissions = null;
foreach (var (key, value) in userDict)
{
switch (key.ToLowerInvariant())
{
case "user" or "username":
username = ToString(value);
break;
case "pass" or "password":
password = ToString(value);
break;
case "account":
account = ToString(value);
break;
case "permissions" or "permission":
if (value is Dictionary<string, object?> permDict)
permissions = ParsePermissions(permDict, errors);
break;
}
}
if (username is null)
{
errors.Add("User entry missing 'user' field");
continue;
}
users.Add(new User
{
Username = username,
Password = password ?? string.Empty,
Account = account,
Permissions = permissions,
});
}
return users;
}
private static Permissions ParsePermissions(Dictionary<string, object?> dict, List<string> errors)
{
SubjectPermission? publish = null;
SubjectPermission? subscribe = null;
ResponsePermission? response = null;
foreach (var (key, value) in dict)
{
switch (key.ToLowerInvariant())
{
case "publish" or "pub":
publish = ParseSubjectPermission(value, errors);
break;
case "subscribe" or "sub":
subscribe = ParseSubjectPermission(value, errors);
break;
case "resp" or "response":
if (value is Dictionary<string, object?> respDict)
response = ParseResponsePermission(respDict);
break;
}
}
return new Permissions
{
Publish = publish,
Subscribe = subscribe,
Response = response,
};
}
private static SubjectPermission? ParseSubjectPermission(object? value, List<string> errors)
{
// Can be a simple list of strings (treated as allow) or a dict with allow/deny
if (value is Dictionary<string, object?> dict)
{
IReadOnlyList<string>? allow = null;
IReadOnlyList<string>? deny = null;
foreach (var (key, v) in dict)
{
switch (key.ToLowerInvariant())
{
case "allow":
allow = ToStringList(v);
break;
case "deny":
deny = ToStringList(v);
break;
}
}
return new SubjectPermission { Allow = allow, Deny = deny };
}
if (value is List<object?> list)
{
return new SubjectPermission { Allow = ToStringList(list) };
}
if (value is string s)
{
return new SubjectPermission { Allow = [s] };
}
return null;
}
private static ResponsePermission ParseResponsePermission(Dictionary<string, object?> dict)
{
var maxMsgs = 0;
var expires = TimeSpan.Zero;
foreach (var (key, value) in dict)
{
switch (key.ToLowerInvariant())
{
case "max_msgs" or "max":
maxMsgs = ToInt(value);
break;
case "expires" or "ttl":
expires = ParseDuration(value);
break;
}
}
return new ResponsePermission { MaxMsgs = maxMsgs, Expires = expires };
}
// ─── TLS parsing ───────────────────────────────────────────────
private static void ParseTls(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
{
foreach (var (key, value) in dict)
{
switch (key.ToLowerInvariant())
{
case "cert_file":
opts.TlsCert = ToString(value);
break;
case "key_file":
opts.TlsKey = ToString(value);
break;
case "ca_file":
opts.TlsCaCert = ToString(value);
break;
case "verify":
opts.TlsVerify = ToBool(value);
break;
case "verify_and_map":
var map = ToBool(value);
opts.TlsMap = map;
if (map)
opts.TlsVerify = true;
break;
case "timeout":
opts.TlsTimeout = value switch
{
long l => TimeSpan.FromSeconds(l),
double d => TimeSpan.FromSeconds(d),
string s => ParseDuration(s),
_ => throw new FormatException($"Invalid TLS timeout type: {value?.GetType().Name}"),
};
break;
case "connection_rate_limit":
opts.TlsRateLimit = ToLong(value);
break;
case "pinned_certs":
if (value is List<object?> pinnedList)
{
var certs = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
foreach (var item in pinnedList)
{
if (item is string s)
certs.Add(s.ToLowerInvariant());
}
opts.TlsPinnedCerts = certs;
}
break;
case "handshake_first" or "first" or "immediate":
opts.TlsHandshakeFirst = ToBool(value);
break;
case "handshake_first_fallback":
opts.TlsHandshakeFirstFallback = ParseDuration(value);
break;
default:
// Unknown TLS keys silently ignored
break;
}
}
}
// ─── Tags parsing ──────────────────────────────────────────────
private static void ParseTags(Dictionary<string, object?> dict, NatsOptions opts)
{
var tags = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
foreach (var (key, value) in dict)
{
tags[key] = ToString(value);
}
opts.Tags = tags;
}
// ─── Type conversion helpers ───────────────────────────────────
private static int ToInt(object? value) => value switch
{
long l => (int)l,
int i => i,
double d => (int)d,
string s when int.TryParse(s, NumberStyles.Integer, CultureInfo.InvariantCulture, out var i) => i,
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to int"),
};
private static long ToLong(object? value) => value switch
{
long l => l,
int i => i,
double d => (long)d,
string s when long.TryParse(s, NumberStyles.Integer, CultureInfo.InvariantCulture, out var l) => l,
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to long"),
};
private static bool ToBool(object? value) => value switch
{
bool b => b,
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to bool"),
};
private static string ToString(object? value) => value switch
{
string s => s,
long l => l.ToString(CultureInfo.InvariantCulture),
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to string"),
};
private static IReadOnlyList<string> ToStringList(object? value)
{
if (value is List<object?> list)
{
var result = new List<string>(list.Count);
foreach (var item in list)
{
if (item is string s)
result.Add(s);
}
return result;
}
if (value is string str)
return [str];
return [];
}
}
/// <summary>
/// Thrown when one or more configuration validation errors are detected.
/// All errors are collected rather than failing on the first one.
/// </summary>
public sealed class ConfigProcessorException(string message, List<string> errors)
: Exception(message)
{
public IReadOnlyList<string> Errors => errors;
}

View File

@@ -0,0 +1,341 @@
// Port of Go server/reload.go — config diffing, validation, and CLI override merging
// for hot reload support. Reference: golang/nats-server/server/reload.go.
namespace NATS.Server.Configuration;
/// <summary>
/// Provides static methods for comparing two <see cref="NatsOptions"/> instances,
/// validating that detected changes are reloadable, and merging CLI overrides
/// so that command-line flags always take precedence over config file values.
/// </summary>
public static class ConfigReloader
{
// Non-reloadable options (match Go server — Host, Port, ServerName require restart)
private static readonly HashSet<string> NonReloadable = ["Host", "Port", "ServerName"];
// Logging-related options
private static readonly HashSet<string> LoggingOptions =
["Debug", "Trace", "TraceVerbose", "Logtime", "LogtimeUTC", "LogFile",
"LogSizeLimit", "LogMaxFiles", "Syslog", "RemoteSyslog"];
// Auth-related options
private static readonly HashSet<string> AuthOptions =
["Username", "Password", "Authorization", "Users", "NKeys",
"NoAuthUser", "AuthTimeout"];
// TLS-related options
private static readonly HashSet<string> TlsOptions =
["TlsCert", "TlsKey", "TlsCaCert", "TlsVerify", "TlsMap",
"TlsTimeout", "TlsHandshakeFirst", "TlsHandshakeFirstFallback",
"AllowNonTls", "TlsRateLimit", "TlsPinnedCerts"];
/// <summary>
/// Compares two <see cref="NatsOptions"/> instances property by property and returns
/// a list of <see cref="IConfigChange"/> for every property that differs. Each change
/// is tagged with the appropriate category flags.
/// </summary>
public static List<IConfigChange> Diff(NatsOptions oldOpts, NatsOptions newOpts)
{
var changes = new List<IConfigChange>();
// Non-reloadable
CompareAndAdd(changes, "Host", oldOpts.Host, newOpts.Host);
CompareAndAdd(changes, "Port", oldOpts.Port, newOpts.Port);
CompareAndAdd(changes, "ServerName", oldOpts.ServerName, newOpts.ServerName);
// Logging
CompareAndAdd(changes, "Debug", oldOpts.Debug, newOpts.Debug);
CompareAndAdd(changes, "Trace", oldOpts.Trace, newOpts.Trace);
CompareAndAdd(changes, "TraceVerbose", oldOpts.TraceVerbose, newOpts.TraceVerbose);
CompareAndAdd(changes, "Logtime", oldOpts.Logtime, newOpts.Logtime);
CompareAndAdd(changes, "LogtimeUTC", oldOpts.LogtimeUTC, newOpts.LogtimeUTC);
CompareAndAdd(changes, "LogFile", oldOpts.LogFile, newOpts.LogFile);
CompareAndAdd(changes, "LogSizeLimit", oldOpts.LogSizeLimit, newOpts.LogSizeLimit);
CompareAndAdd(changes, "LogMaxFiles", oldOpts.LogMaxFiles, newOpts.LogMaxFiles);
CompareAndAdd(changes, "Syslog", oldOpts.Syslog, newOpts.Syslog);
CompareAndAdd(changes, "RemoteSyslog", oldOpts.RemoteSyslog, newOpts.RemoteSyslog);
// Auth
CompareAndAdd(changes, "Username", oldOpts.Username, newOpts.Username);
CompareAndAdd(changes, "Password", oldOpts.Password, newOpts.Password);
CompareAndAdd(changes, "Authorization", oldOpts.Authorization, newOpts.Authorization);
CompareCollectionAndAdd(changes, "Users", oldOpts.Users, newOpts.Users);
CompareCollectionAndAdd(changes, "NKeys", oldOpts.NKeys, newOpts.NKeys);
CompareAndAdd(changes, "NoAuthUser", oldOpts.NoAuthUser, newOpts.NoAuthUser);
CompareAndAdd(changes, "AuthTimeout", oldOpts.AuthTimeout, newOpts.AuthTimeout);
// TLS
CompareAndAdd(changes, "TlsCert", oldOpts.TlsCert, newOpts.TlsCert);
CompareAndAdd(changes, "TlsKey", oldOpts.TlsKey, newOpts.TlsKey);
CompareAndAdd(changes, "TlsCaCert", oldOpts.TlsCaCert, newOpts.TlsCaCert);
CompareAndAdd(changes, "TlsVerify", oldOpts.TlsVerify, newOpts.TlsVerify);
CompareAndAdd(changes, "TlsMap", oldOpts.TlsMap, newOpts.TlsMap);
CompareAndAdd(changes, "TlsTimeout", oldOpts.TlsTimeout, newOpts.TlsTimeout);
CompareAndAdd(changes, "TlsHandshakeFirst", oldOpts.TlsHandshakeFirst, newOpts.TlsHandshakeFirst);
CompareAndAdd(changes, "TlsHandshakeFirstFallback", oldOpts.TlsHandshakeFirstFallback, newOpts.TlsHandshakeFirstFallback);
CompareAndAdd(changes, "AllowNonTls", oldOpts.AllowNonTls, newOpts.AllowNonTls);
CompareAndAdd(changes, "TlsRateLimit", oldOpts.TlsRateLimit, newOpts.TlsRateLimit);
CompareCollectionAndAdd(changes, "TlsPinnedCerts", oldOpts.TlsPinnedCerts, newOpts.TlsPinnedCerts);
// Limits
CompareAndAdd(changes, "MaxConnections", oldOpts.MaxConnections, newOpts.MaxConnections);
CompareAndAdd(changes, "MaxPayload", oldOpts.MaxPayload, newOpts.MaxPayload);
CompareAndAdd(changes, "MaxPending", oldOpts.MaxPending, newOpts.MaxPending);
CompareAndAdd(changes, "WriteDeadline", oldOpts.WriteDeadline, newOpts.WriteDeadline);
CompareAndAdd(changes, "PingInterval", oldOpts.PingInterval, newOpts.PingInterval);
CompareAndAdd(changes, "MaxPingsOut", oldOpts.MaxPingsOut, newOpts.MaxPingsOut);
CompareAndAdd(changes, "MaxControlLine", oldOpts.MaxControlLine, newOpts.MaxControlLine);
CompareAndAdd(changes, "MaxSubs", oldOpts.MaxSubs, newOpts.MaxSubs);
CompareAndAdd(changes, "MaxSubTokens", oldOpts.MaxSubTokens, newOpts.MaxSubTokens);
CompareAndAdd(changes, "MaxTracedMsgLen", oldOpts.MaxTracedMsgLen, newOpts.MaxTracedMsgLen);
CompareAndAdd(changes, "MaxClosedClients", oldOpts.MaxClosedClients, newOpts.MaxClosedClients);
// Misc
CompareCollectionAndAdd(changes, "Tags", oldOpts.Tags, newOpts.Tags);
CompareAndAdd(changes, "LameDuckDuration", oldOpts.LameDuckDuration, newOpts.LameDuckDuration);
CompareAndAdd(changes, "LameDuckGracePeriod", oldOpts.LameDuckGracePeriod, newOpts.LameDuckGracePeriod);
CompareAndAdd(changes, "ClientAdvertise", oldOpts.ClientAdvertise, newOpts.ClientAdvertise);
CompareAndAdd(changes, "DisableSublistCache", oldOpts.DisableSublistCache, newOpts.DisableSublistCache);
CompareAndAdd(changes, "ConnectErrorReports", oldOpts.ConnectErrorReports, newOpts.ConnectErrorReports);
CompareAndAdd(changes, "ReconnectErrorReports", oldOpts.ReconnectErrorReports, newOpts.ReconnectErrorReports);
CompareAndAdd(changes, "NoHeaderSupport", oldOpts.NoHeaderSupport, newOpts.NoHeaderSupport);
CompareAndAdd(changes, "NoSystemAccount", oldOpts.NoSystemAccount, newOpts.NoSystemAccount);
CompareAndAdd(changes, "SystemAccount", oldOpts.SystemAccount, newOpts.SystemAccount);
return changes;
}
/// <summary>
/// Validates a list of config changes and returns error messages for any
/// non-reloadable changes (properties that require a server restart).
/// </summary>
public static List<string> Validate(List<IConfigChange> changes)
{
var errors = new List<string>();
foreach (var change in changes)
{
if (change.IsNonReloadable)
{
errors.Add($"Config reload: '{change.Name}' cannot be changed at runtime (requires restart)");
}
}
return errors;
}
/// <summary>
/// Merges CLI overrides into a freshly-parsed config so that command-line flags
/// always take precedence. Only properties whose names appear in <paramref name="cliFlags"/>
/// are copied from <paramref name="cliValues"/> to <paramref name="fromConfig"/>.
/// </summary>
public static void MergeCliOverrides(NatsOptions fromConfig, NatsOptions cliValues, HashSet<string> cliFlags)
{
foreach (var flag in cliFlags)
{
switch (flag)
{
// Non-reloadable
case "Host":
fromConfig.Host = cliValues.Host;
break;
case "Port":
fromConfig.Port = cliValues.Port;
break;
case "ServerName":
fromConfig.ServerName = cliValues.ServerName;
break;
// Logging
case "Debug":
fromConfig.Debug = cliValues.Debug;
break;
case "Trace":
fromConfig.Trace = cliValues.Trace;
break;
case "TraceVerbose":
fromConfig.TraceVerbose = cliValues.TraceVerbose;
break;
case "Logtime":
fromConfig.Logtime = cliValues.Logtime;
break;
case "LogtimeUTC":
fromConfig.LogtimeUTC = cliValues.LogtimeUTC;
break;
case "LogFile":
fromConfig.LogFile = cliValues.LogFile;
break;
case "LogSizeLimit":
fromConfig.LogSizeLimit = cliValues.LogSizeLimit;
break;
case "LogMaxFiles":
fromConfig.LogMaxFiles = cliValues.LogMaxFiles;
break;
case "Syslog":
fromConfig.Syslog = cliValues.Syslog;
break;
case "RemoteSyslog":
fromConfig.RemoteSyslog = cliValues.RemoteSyslog;
break;
// Auth
case "Username":
fromConfig.Username = cliValues.Username;
break;
case "Password":
fromConfig.Password = cliValues.Password;
break;
case "Authorization":
fromConfig.Authorization = cliValues.Authorization;
break;
case "Users":
fromConfig.Users = cliValues.Users;
break;
case "NKeys":
fromConfig.NKeys = cliValues.NKeys;
break;
case "NoAuthUser":
fromConfig.NoAuthUser = cliValues.NoAuthUser;
break;
case "AuthTimeout":
fromConfig.AuthTimeout = cliValues.AuthTimeout;
break;
// TLS
case "TlsCert":
fromConfig.TlsCert = cliValues.TlsCert;
break;
case "TlsKey":
fromConfig.TlsKey = cliValues.TlsKey;
break;
case "TlsCaCert":
fromConfig.TlsCaCert = cliValues.TlsCaCert;
break;
case "TlsVerify":
fromConfig.TlsVerify = cliValues.TlsVerify;
break;
case "TlsMap":
fromConfig.TlsMap = cliValues.TlsMap;
break;
case "TlsTimeout":
fromConfig.TlsTimeout = cliValues.TlsTimeout;
break;
case "TlsHandshakeFirst":
fromConfig.TlsHandshakeFirst = cliValues.TlsHandshakeFirst;
break;
case "TlsHandshakeFirstFallback":
fromConfig.TlsHandshakeFirstFallback = cliValues.TlsHandshakeFirstFallback;
break;
case "AllowNonTls":
fromConfig.AllowNonTls = cliValues.AllowNonTls;
break;
case "TlsRateLimit":
fromConfig.TlsRateLimit = cliValues.TlsRateLimit;
break;
case "TlsPinnedCerts":
fromConfig.TlsPinnedCerts = cliValues.TlsPinnedCerts;
break;
// Limits
case "MaxConnections":
fromConfig.MaxConnections = cliValues.MaxConnections;
break;
case "MaxPayload":
fromConfig.MaxPayload = cliValues.MaxPayload;
break;
case "MaxPending":
fromConfig.MaxPending = cliValues.MaxPending;
break;
case "WriteDeadline":
fromConfig.WriteDeadline = cliValues.WriteDeadline;
break;
case "PingInterval":
fromConfig.PingInterval = cliValues.PingInterval;
break;
case "MaxPingsOut":
fromConfig.MaxPingsOut = cliValues.MaxPingsOut;
break;
case "MaxControlLine":
fromConfig.MaxControlLine = cliValues.MaxControlLine;
break;
case "MaxSubs":
fromConfig.MaxSubs = cliValues.MaxSubs;
break;
case "MaxSubTokens":
fromConfig.MaxSubTokens = cliValues.MaxSubTokens;
break;
case "MaxTracedMsgLen":
fromConfig.MaxTracedMsgLen = cliValues.MaxTracedMsgLen;
break;
case "MaxClosedClients":
fromConfig.MaxClosedClients = cliValues.MaxClosedClients;
break;
// Misc
case "Tags":
fromConfig.Tags = cliValues.Tags;
break;
case "LameDuckDuration":
fromConfig.LameDuckDuration = cliValues.LameDuckDuration;
break;
case "LameDuckGracePeriod":
fromConfig.LameDuckGracePeriod = cliValues.LameDuckGracePeriod;
break;
case "ClientAdvertise":
fromConfig.ClientAdvertise = cliValues.ClientAdvertise;
break;
case "DisableSublistCache":
fromConfig.DisableSublistCache = cliValues.DisableSublistCache;
break;
case "ConnectErrorReports":
fromConfig.ConnectErrorReports = cliValues.ConnectErrorReports;
break;
case "ReconnectErrorReports":
fromConfig.ReconnectErrorReports = cliValues.ReconnectErrorReports;
break;
case "NoHeaderSupport":
fromConfig.NoHeaderSupport = cliValues.NoHeaderSupport;
break;
case "NoSystemAccount":
fromConfig.NoSystemAccount = cliValues.NoSystemAccount;
break;
case "SystemAccount":
fromConfig.SystemAccount = cliValues.SystemAccount;
break;
}
}
}
// ─── Comparison helpers ─────────────────────────────────────────
private static void CompareAndAdd<T>(List<IConfigChange> changes, string name, T oldVal, T newVal)
{
if (!Equals(oldVal, newVal))
{
changes.Add(new ConfigChange(
name,
isLoggingChange: LoggingOptions.Contains(name),
isAuthChange: AuthOptions.Contains(name),
isTlsChange: TlsOptions.Contains(name),
isNonReloadable: NonReloadable.Contains(name)));
}
}
private static void CompareCollectionAndAdd<T>(List<IConfigChange> changes, string name, T? oldVal, T? newVal)
where T : class
{
// For collections we compare by reference and null state.
// A change from null to non-null (or vice versa), or a different reference, counts as changed.
if (ReferenceEquals(oldVal, newVal))
return;
if (oldVal is null || newVal is null || !ReferenceEquals(oldVal, newVal))
{
changes.Add(new ConfigChange(
name,
isLoggingChange: LoggingOptions.Contains(name),
isAuthChange: AuthOptions.Contains(name),
isTlsChange: TlsOptions.Contains(name),
isNonReloadable: NonReloadable.Contains(name)));
}
}
}

View File

@@ -0,0 +1,54 @@
// Port of Go server/reload.go option interface — represents a single detected
// configuration change with category flags for reload handling.
// Reference: golang/nats-server/server/reload.go lines 42-74.
namespace NATS.Server.Configuration;
/// <summary>
/// Represents a single detected configuration change during a hot reload.
/// Category flags indicate what kind of reload action is needed.
/// </summary>
public interface IConfigChange
{
/// <summary>
/// The property name that changed (matches NatsOptions property name).
/// </summary>
string Name { get; }
/// <summary>
/// Whether this change requires reloading the logger.
/// </summary>
bool IsLoggingChange { get; }
/// <summary>
/// Whether this change requires reloading authorization.
/// </summary>
bool IsAuthChange { get; }
/// <summary>
/// Whether this change requires reloading TLS configuration.
/// </summary>
bool IsTlsChange { get; }
/// <summary>
/// Whether this option cannot be changed at runtime (requires restart).
/// </summary>
bool IsNonReloadable { get; }
}
/// <summary>
/// Default implementation of <see cref="IConfigChange"/> using a primary constructor.
/// </summary>
public sealed class ConfigChange(
string name,
bool isLoggingChange = false,
bool isAuthChange = false,
bool isTlsChange = false,
bool isNonReloadable = false) : IConfigChange
{
public string Name => name;
public bool IsLoggingChange => isLoggingChange;
public bool IsAuthChange => isAuthChange;
public bool IsTlsChange => isTlsChange;
public bool IsNonReloadable => isNonReloadable;
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,421 @@
// Port of Go conf/parse.go — recursive-descent parser for NATS config files.
// Reference: golang/nats-server/conf/parse.go
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
namespace NATS.Server.Configuration;
/// <summary>
/// Parses NATS configuration data (tokenized by <see cref="NatsConfLexer"/>) into
/// a <c>Dictionary&lt;string, object?&gt;</c> tree. Supports nested maps, arrays,
/// variable references (block-scoped + environment), include directives, bcrypt
/// password literals, and integer suffix multipliers.
/// </summary>
public static class NatsConfParser
{
// Bcrypt hashes start with $2a$ or $2b$. The lexer consumes the leading '$'
// and emits a Variable token whose value begins with "2a$" or "2b$".
private const string BcryptPrefix2A = "2a$";
private const string BcryptPrefix2B = "2b$";
// Maximum nesting depth for include directives to prevent infinite recursion.
private const int MaxIncludeDepth = 10;
/// <summary>
/// Parses a NATS configuration string into a dictionary.
/// </summary>
public static Dictionary<string, object?> Parse(string data)
{
var tokens = NatsConfLexer.Tokenize(data);
var state = new ParserState(tokens, baseDir: string.Empty);
state.Run();
return state.Mapping;
}
/// <summary>
/// Parses a NATS configuration file into a dictionary.
/// </summary>
public static Dictionary<string, object?> ParseFile(string filePath) =>
ParseFile(filePath, includeDepth: 0);
private static Dictionary<string, object?> ParseFile(string filePath, int includeDepth)
{
var data = File.ReadAllText(filePath);
var tokens = NatsConfLexer.Tokenize(data);
var baseDir = Path.GetDirectoryName(Path.GetFullPath(filePath)) ?? string.Empty;
var state = new ParserState(tokens, baseDir, [], includeDepth);
state.Run();
return state.Mapping;
}
/// <summary>
/// Parses a NATS configuration file and returns the parsed config plus a
/// SHA-256 digest of the raw file content formatted as "sha256:&lt;hex&gt;".
/// </summary>
public static (Dictionary<string, object?> Config, string Digest) ParseFileWithDigest(string filePath)
{
var rawBytes = File.ReadAllBytes(filePath);
var hashBytes = SHA256.HashData(rawBytes);
var digest = "sha256:" + Convert.ToHexStringLower(hashBytes);
var data = Encoding.UTF8.GetString(rawBytes);
var tokens = NatsConfLexer.Tokenize(data);
var baseDir = Path.GetDirectoryName(Path.GetFullPath(filePath)) ?? string.Empty;
var state = new ParserState(tokens, baseDir, [], includeDepth: 0);
state.Run();
return (state.Mapping, digest);
}
/// <summary>
/// Internal: parse an environment variable value by wrapping it in a synthetic
/// key-value assignment and parsing it. Shares the parent's env var cycle tracker.
/// </summary>
private static Dictionary<string, object?> ParseEnvValue(string value, HashSet<string> envVarReferences, int includeDepth)
{
var synthetic = $"pk={value}";
var tokens = NatsConfLexer.Tokenize(synthetic);
var state = new ParserState(tokens, baseDir: string.Empty, envVarReferences, includeDepth);
state.Run();
return state.Mapping;
}
/// <summary>
/// Encapsulates the mutable parsing state: context stack, key stack, token cursor.
/// Mirrors the Go <c>parser</c> struct from conf/parse.go.
/// </summary>
private sealed class ParserState
{
private readonly IReadOnlyList<Token> _tokens;
private readonly string _baseDir;
private readonly HashSet<string> _envVarReferences;
private readonly int _includeDepth;
private int _pos;
// The context stack holds either Dictionary<string, object?> (map) or List<object?> (array).
private readonly List<object> _ctxs = new(4);
private object _ctx = null!;
// Key stack for map assignments.
private readonly List<string> _keys = new(4);
public Dictionary<string, object?> Mapping { get; } = new(StringComparer.OrdinalIgnoreCase);
public ParserState(IReadOnlyList<Token> tokens, string baseDir)
: this(tokens, baseDir, [], includeDepth: 0)
{
}
public ParserState(IReadOnlyList<Token> tokens, string baseDir, HashSet<string> envVarReferences, int includeDepth)
{
_tokens = tokens;
_baseDir = baseDir;
_envVarReferences = envVarReferences;
_includeDepth = includeDepth;
}
public void Run()
{
PushContext(Mapping);
Token prevToken = default;
while (true)
{
var token = Next();
if (token.Type == TokenType.Eof)
{
// Allow a trailing '}' (JSON-like configs) — mirror Go behavior.
if (prevToken.Type == TokenType.Key && prevToken.Value != "}")
{
throw new FormatException($"Config is invalid at line {token.Line}:{token.Position}");
}
break;
}
prevToken = token;
ProcessItem(token);
}
}
private Token Next()
{
if (_pos >= _tokens.Count)
{
return new Token(TokenType.Eof, string.Empty, 0, 0);
}
return _tokens[_pos++];
}
private void PushContext(object ctx)
{
_ctxs.Add(ctx);
_ctx = ctx;
}
private object PopContext()
{
if (_ctxs.Count <= 1)
{
throw new InvalidOperationException("BUG in parser, context stack underflow");
}
var last = _ctxs[^1];
_ctxs.RemoveAt(_ctxs.Count - 1);
_ctx = _ctxs[^1];
return last;
}
private void PushKey(string key) => _keys.Add(key);
private string PopKey()
{
if (_keys.Count == 0)
{
throw new InvalidOperationException("BUG in parser, keys stack empty");
}
var last = _keys[^1];
_keys.RemoveAt(_keys.Count - 1);
return last;
}
private void SetValue(object? val)
{
// Array context: append the value.
if (_ctx is List<object?> array)
{
array.Add(val);
return;
}
// Map context: pop the pending key and assign.
if (_ctx is Dictionary<string, object?> map)
{
var key = PopKey();
map[key] = val;
return;
}
throw new InvalidOperationException($"BUG in parser, unexpected context type {_ctx?.GetType().Name ?? "null"}");
}
private void ProcessItem(Token token)
{
switch (token.Type)
{
case TokenType.Error:
throw new FormatException($"Parse error on line {token.Line}: '{token.Value}'");
case TokenType.Key:
PushKey(token.Value);
break;
case TokenType.String:
SetValue(token.Value);
break;
case TokenType.Bool:
SetValue(ParseBool(token.Value));
break;
case TokenType.Integer:
SetValue(ParseInteger(token.Value));
break;
case TokenType.Float:
SetValue(ParseFloat(token.Value));
break;
case TokenType.DateTime:
SetValue(DateTimeOffset.Parse(token.Value, CultureInfo.InvariantCulture));
break;
case TokenType.ArrayStart:
PushContext(new List<object?>());
break;
case TokenType.ArrayEnd:
{
var array = _ctx;
PopContext();
SetValue(array);
break;
}
case TokenType.MapStart:
PushContext(new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase));
break;
case TokenType.MapEnd:
SetValue(PopContext());
break;
case TokenType.Variable:
ResolveVariable(token);
break;
case TokenType.Include:
ProcessInclude(token.Value);
break;
case TokenType.Comment:
// Skip comments entirely.
break;
case TokenType.Eof:
// Handled in the Run loop; should not reach here.
break;
default:
throw new FormatException($"Unexpected token type {token.Type} on line {token.Line}");
}
}
private static bool ParseBool(string value) =>
value.ToLowerInvariant() switch
{
"true" or "yes" or "on" => true,
"false" or "no" or "off" => false,
_ => throw new FormatException($"Expected boolean value, but got '{value}'"),
};
/// <summary>
/// Parses an integer token value, handling optional size suffixes
/// (k, kb, m, mb, g, gb, t, tb, etc.) exactly as the Go reference does.
/// </summary>
private static long ParseInteger(string value)
{
// Find where digits end and potential suffix begins.
var lastDigit = 0;
foreach (var c in value)
{
if (!char.IsDigit(c) && c != '-')
{
break;
}
lastDigit++;
}
var numStr = value[..lastDigit];
if (!long.TryParse(numStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out var num))
{
throw new FormatException($"Expected integer, but got '{value}'");
}
var suffix = value[lastDigit..].Trim().ToLowerInvariant();
return suffix switch
{
"" => num,
"k" => num * 1000,
"kb" or "ki" or "kib" => num * 1024,
"m" => num * 1_000_000,
"mb" or "mi" or "mib" => num * 1024 * 1024,
"g" => num * 1_000_000_000,
"gb" or "gi" or "gib" => num * 1024 * 1024 * 1024,
"t" => num * 1_000_000_000_000,
"tb" or "ti" or "tib" => num * 1024L * 1024 * 1024 * 1024,
"p" => num * 1_000_000_000_000_000,
"pb" or "pi" or "pib" => num * 1024L * 1024 * 1024 * 1024 * 1024,
"e" => num * 1_000_000_000_000_000_000,
"eb" or "ei" or "eib" => num * 1024L * 1024 * 1024 * 1024 * 1024 * 1024,
_ => throw new FormatException($"Unknown integer suffix '{suffix}' in '{value}'"),
};
}
private static double ParseFloat(string value)
{
if (!double.TryParse(value, NumberStyles.Float | NumberStyles.AllowLeadingSign, CultureInfo.InvariantCulture, out var result))
{
throw new FormatException($"Expected float, but got '{value}'");
}
return result;
}
/// <summary>
/// Resolves a variable reference using block scoping: walks the context stack
/// top-down looking in map contexts, then falls back to environment variables.
/// Detects bcrypt password literals and reference cycles.
/// </summary>
private void ResolveVariable(Token token)
{
var varName = token.Value;
// Special case: raw bcrypt strings ($2a$... or $2b$...).
// The lexer consumed the leading '$', so the variable value starts with "2a$" or "2b$".
if (varName.StartsWith(BcryptPrefix2A, StringComparison.Ordinal) ||
varName.StartsWith(BcryptPrefix2B, StringComparison.Ordinal))
{
SetValue("$" + varName);
return;
}
// Walk context stack from top (innermost scope) to bottom (outermost).
for (var i = _ctxs.Count - 1; i >= 0; i--)
{
if (_ctxs[i] is Dictionary<string, object?> map && map.TryGetValue(varName, out var found))
{
SetValue(found);
return;
}
}
// Not found in any context map. Check environment variables.
// First, detect cycles.
if (!_envVarReferences.Add(varName))
{
throw new FormatException($"Variable reference cycle for '{varName}'");
}
try
{
var envValue = Environment.GetEnvironmentVariable(varName);
if (envValue is not null)
{
// Parse the env value through the full parser to get correct typing
// (e.g., "42" becomes long 42, "true" becomes bool, etc.).
var subResult = ParseEnvValue(envValue, _envVarReferences, _includeDepth);
if (subResult.TryGetValue("pk", out var parsedValue))
{
SetValue(parsedValue);
return;
}
}
}
finally
{
_envVarReferences.Remove(varName);
}
// Not found anywhere.
throw new FormatException(
$"Variable reference for '{varName}' on line {token.Line} can not be found");
}
/// <summary>
/// Processes an include directive by parsing the referenced file and merging
/// all its top-level keys into the current context.
/// </summary>
private void ProcessInclude(string includePath)
{
if (_includeDepth >= MaxIncludeDepth)
{
throw new FormatException(
$"Include depth limit of {MaxIncludeDepth} exceeded while processing '{includePath}'");
}
var fullPath = Path.Combine(_baseDir, includePath);
var includeResult = ParseFile(fullPath, _includeDepth + 1);
foreach (var (key, value) in includeResult)
{
PushKey(key);
SetValue(value);
}
}
}
}

View File

@@ -0,0 +1,24 @@
// Port of Go conf/lex.go token types.
namespace NATS.Server.Configuration;
public enum TokenType
{
Error,
Eof,
Key,
String,
Bool,
Integer,
Float,
DateTime,
ArrayStart,
ArrayEnd,
MapStart,
MapEnd,
Variable,
Include,
Comment,
}
public readonly record struct Token(TokenType Type, string Value, int Line, int Position);

View File

@@ -72,6 +72,21 @@ public sealed class NatsOptions
// Profiling (0 = disabled)
public int ProfPort { get; set; }
// Extended options for Go parity
public string? ClientAdvertise { get; set; }
public bool TraceVerbose { get; set; }
public int MaxTracedMsgLen { get; set; }
public bool DisableSublistCache { get; set; }
public int ConnectErrorReports { get; set; } = 3600;
public int ReconnectErrorReports { get; set; } = 1;
public bool NoHeaderSupport { get; set; }
public int MaxClosedClients { get; set; } = 10_000;
public bool NoSystemAccount { get; set; }
public string? SystemAccount { get; set; }
// Tracks which fields were set via CLI flags (for reload precedence)
public HashSet<string> InCmdLine { get; } = [];
// TLS
public string? TlsCert { get; set; }
public string? TlsKey { get; set; }

View File

@@ -8,6 +8,7 @@ using System.Text;
using Microsoft.Extensions.Logging;
using NATS.NKeys;
using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.Monitoring;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
@@ -20,14 +21,18 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly NatsOptions _options;
private readonly ConcurrentDictionary<ulong, NatsClient> _clients = new();
private readonly ConcurrentQueue<ClosedClient> _closedClients = new();
private const int MaxClosedClients = 10_000;
private readonly ServerInfo _serverInfo;
private readonly ILogger<NatsServer> _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly ServerStats _stats = new();
private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly AuthService _authService;
private AuthService _authService;
private readonly ConcurrentDictionary<string, Account> _accounts = new(StringComparer.Ordinal);
// Config reload state
private NatsOptions? _cliSnapshot;
private HashSet<string> _cliFlags = [];
private string? _configDigest;
private readonly Account _globalAccount;
private readonly Account _systemAccount;
private readonly SslServerAuthenticationOptions? _sslOptions;
@@ -224,7 +229,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_signalRegistrations.Add(PosixSignalRegistration.Create(PosixSignal.SIGHUP, ctx =>
{
ctx.Cancel = true;
_logger.LogWarning("Trapped SIGHUP signal — config reload not yet supported");
_logger.LogInformation("Trapped SIGHUP signal — reloading configuration");
_ = Task.Run(() => ReloadConfig());
}));
// SIGUSR1 and SIGUSR2 only on non-Windows
@@ -320,6 +326,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
BuildCachedInfo();
// Store initial config digest for reload change detection
if (options.ConfigFile != null)
{
try
{
var (_, digest) = NatsConfParser.ParseFileWithDigest(options.ConfigFile);
_configDigest = digest;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Could not compute initial config digest for {ConfigFile}", options.ConfigFile);
}
}
}
private void BuildCachedInfo()
@@ -354,8 +374,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
// Warn about stub features
if (_options.ConfigFile != null)
_logger.LogWarning("Config file parsing not yet supported (file: {ConfigFile})", _options.ConfigFile);
if (_options.ProfPort > 0)
_logger.LogWarning("Profiling endpoint not yet supported (port: {ProfPort})", _options.ProfPort);
@@ -696,7 +714,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
});
// Cap closed clients list
while (_closedClients.Count > MaxClosedClients)
while (_closedClients.Count > _options.MaxClosedClients)
_closedClients.TryDequeue(out _);
var subList = client.Account?.SubList ?? _globalAccount.SubList;
@@ -766,6 +784,155 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
/// <summary>
/// Stores the CLI snapshot and flags so that command-line overrides
/// always take precedence during config reload.
/// </summary>
public void SetCliSnapshot(NatsOptions cliSnapshot, HashSet<string> cliFlags)
{
_cliSnapshot = cliSnapshot;
_cliFlags = cliFlags;
}
/// <summary>
/// Reloads the configuration file, diffs against current options, validates
/// the changes, and applies reloadable settings. CLI overrides are preserved.
/// </summary>
public void ReloadConfig()
{
if (_options.ConfigFile == null)
{
_logger.LogWarning("No config file specified, cannot reload");
return;
}
try
{
var (newConfig, digest) = NatsConfParser.ParseFileWithDigest(_options.ConfigFile);
if (digest == _configDigest)
{
_logger.LogInformation("Config file unchanged, no reload needed");
return;
}
var newOpts = new NatsOptions { ConfigFile = _options.ConfigFile };
ConfigProcessor.ApplyConfig(newConfig, newOpts);
// CLI flags override config
if (_cliSnapshot != null)
ConfigReloader.MergeCliOverrides(newOpts, _cliSnapshot, _cliFlags);
var changes = ConfigReloader.Diff(_options, newOpts);
var errors = ConfigReloader.Validate(changes);
if (errors.Count > 0)
{
foreach (var err in errors)
_logger.LogError("Config reload error: {Error}", err);
return;
}
// Apply changes to running options
ApplyConfigChanges(changes, newOpts);
_configDigest = digest;
_logger.LogInformation("Config reloaded successfully ({Count} changes applied)", changes.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to reload config file: {ConfigFile}", _options.ConfigFile);
}
}
private void ApplyConfigChanges(List<IConfigChange> changes, NatsOptions newOpts)
{
bool hasLoggingChanges = false;
bool hasAuthChanges = false;
foreach (var change in changes)
{
if (change.IsLoggingChange) hasLoggingChanges = true;
if (change.IsAuthChange) hasAuthChanges = true;
}
// Copy reloadable values from newOpts to _options
CopyReloadableOptions(newOpts);
// Trigger side effects
if (hasLoggingChanges)
{
ReOpenLogFile?.Invoke();
_logger.LogInformation("Logging configuration reloaded");
}
if (hasAuthChanges)
{
// Rebuild auth service with new options
_authService = AuthService.Build(_options);
_logger.LogInformation("Authorization configuration reloaded");
}
}
private void CopyReloadableOptions(NatsOptions newOpts)
{
// Logging
_options.Debug = newOpts.Debug;
_options.Trace = newOpts.Trace;
_options.TraceVerbose = newOpts.TraceVerbose;
_options.Logtime = newOpts.Logtime;
_options.LogtimeUTC = newOpts.LogtimeUTC;
_options.LogFile = newOpts.LogFile;
_options.LogSizeLimit = newOpts.LogSizeLimit;
_options.LogMaxFiles = newOpts.LogMaxFiles;
_options.Syslog = newOpts.Syslog;
_options.RemoteSyslog = newOpts.RemoteSyslog;
// Auth
_options.Username = newOpts.Username;
_options.Password = newOpts.Password;
_options.Authorization = newOpts.Authorization;
_options.Users = newOpts.Users;
_options.NKeys = newOpts.NKeys;
_options.NoAuthUser = newOpts.NoAuthUser;
_options.AuthTimeout = newOpts.AuthTimeout;
// Limits
_options.MaxConnections = newOpts.MaxConnections;
_options.MaxPayload = newOpts.MaxPayload;
_options.MaxPending = newOpts.MaxPending;
_options.WriteDeadline = newOpts.WriteDeadline;
_options.PingInterval = newOpts.PingInterval;
_options.MaxPingsOut = newOpts.MaxPingsOut;
_options.MaxControlLine = newOpts.MaxControlLine;
_options.MaxSubs = newOpts.MaxSubs;
_options.MaxSubTokens = newOpts.MaxSubTokens;
_options.MaxTracedMsgLen = newOpts.MaxTracedMsgLen;
_options.MaxClosedClients = newOpts.MaxClosedClients;
// TLS
_options.TlsCert = newOpts.TlsCert;
_options.TlsKey = newOpts.TlsKey;
_options.TlsCaCert = newOpts.TlsCaCert;
_options.TlsVerify = newOpts.TlsVerify;
_options.TlsMap = newOpts.TlsMap;
_options.TlsTimeout = newOpts.TlsTimeout;
_options.TlsHandshakeFirst = newOpts.TlsHandshakeFirst;
_options.TlsHandshakeFirstFallback = newOpts.TlsHandshakeFirstFallback;
_options.AllowNonTls = newOpts.AllowNonTls;
_options.TlsRateLimit = newOpts.TlsRateLimit;
_options.TlsPinnedCerts = newOpts.TlsPinnedCerts;
// Misc
_options.Tags = newOpts.Tags;
_options.LameDuckDuration = newOpts.LameDuckDuration;
_options.LameDuckGracePeriod = newOpts.LameDuckGracePeriod;
_options.ClientAdvertise = newOpts.ClientAdvertise;
_options.DisableSublistCache = newOpts.DisableSublistCache;
_options.ConnectErrorReports = newOpts.ConnectErrorReports;
_options.ReconnectErrorReports = newOpts.ReconnectErrorReports;
_options.NoHeaderSupport = newOpts.NoHeaderSupport;
_options.NoSystemAccount = newOpts.NoSystemAccount;
_options.SystemAccount = newOpts.SystemAccount;
}
public void Dispose()
{
if (!IsShuttingDown)

View File

@@ -0,0 +1,76 @@
using NATS.Server.Configuration;
namespace NATS.Server.Tests;
public class ConfigIntegrationTests
{
[Fact]
public void Server_WithConfigFile_LoadsOptionsFromFile()
{
var dir = Path.Combine(Path.GetTempPath(), $"nats_test_{Guid.NewGuid():N}");
Directory.CreateDirectory(dir);
try
{
var confPath = Path.Combine(dir, "test.conf");
File.WriteAllText(confPath, "port: 14222\nmax_payload: 2mb\ndebug: true");
var opts = ConfigProcessor.ProcessConfigFile(confPath);
opts.Port.ShouldBe(14222);
opts.MaxPayload.ShouldBe(2 * 1024 * 1024);
opts.Debug.ShouldBeTrue();
}
finally
{
Directory.Delete(dir, true);
}
}
[Fact]
public void Server_CliOverridesConfig()
{
var dir = Path.Combine(Path.GetTempPath(), $"nats_test_{Guid.NewGuid():N}");
Directory.CreateDirectory(dir);
try
{
var confPath = Path.Combine(dir, "test.conf");
File.WriteAllText(confPath, "port: 14222\ndebug: true");
var opts = ConfigProcessor.ProcessConfigFile(confPath);
opts.Port.ShouldBe(14222);
// Simulate CLI override: user passed -p 5222 on command line
var cliSnapshot = new NatsOptions { Port = 5222 };
var cliFlags = new HashSet<string> { "Port" };
ConfigReloader.MergeCliOverrides(opts, cliSnapshot, cliFlags);
opts.Port.ShouldBe(5222);
opts.Debug.ShouldBeTrue(); // Config file value preserved
}
finally
{
Directory.Delete(dir, true);
}
}
[Fact]
public void Reload_ChangingPort_ReturnsError()
{
var oldOpts = new NatsOptions { Port = 4222 };
var newOpts = new NatsOptions { Port = 5222 };
var changes = ConfigReloader.Diff(oldOpts, newOpts);
var errors = ConfigReloader.Validate(changes);
errors.Count.ShouldBeGreaterThan(0);
errors[0].ShouldContain("Port");
}
[Fact]
public void Reload_ChangingDebug_IsValid()
{
var oldOpts = new NatsOptions { Debug = false };
var newOpts = new NatsOptions { Debug = true };
var changes = ConfigReloader.Diff(oldOpts, newOpts);
var errors = ConfigReloader.Validate(changes);
errors.ShouldBeEmpty();
changes.ShouldContain(c => c.IsLoggingChange);
}
}

View File

@@ -0,0 +1,504 @@
using NATS.Server;
using NATS.Server.Configuration;
namespace NATS.Server.Tests;
public class ConfigProcessorTests
{
private static string TestDataPath(string fileName) =>
Path.Combine(AppContext.BaseDirectory, "TestData", fileName);
// ─── Basic config ──────────────────────────────────────────────
[Fact]
public void BasicConf_Port()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.Port.ShouldBe(4222);
}
[Fact]
public void BasicConf_Host()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.Host.ShouldBe("0.0.0.0");
}
[Fact]
public void BasicConf_ServerName()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.ServerName.ShouldBe("test-server");
}
[Fact]
public void BasicConf_MaxPayload()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.MaxPayload.ShouldBe(2 * 1024 * 1024);
}
[Fact]
public void BasicConf_MaxConnections()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.MaxConnections.ShouldBe(1000);
}
[Fact]
public void BasicConf_Debug()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.Debug.ShouldBeTrue();
}
[Fact]
public void BasicConf_Trace()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.Trace.ShouldBeFalse();
}
[Fact]
public void BasicConf_PingInterval()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.PingInterval.ShouldBe(TimeSpan.FromSeconds(30));
}
[Fact]
public void BasicConf_MaxPingsOut()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.MaxPingsOut.ShouldBe(3);
}
[Fact]
public void BasicConf_WriteDeadline()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.WriteDeadline.ShouldBe(TimeSpan.FromSeconds(5));
}
[Fact]
public void BasicConf_MaxSubs()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.MaxSubs.ShouldBe(100);
}
[Fact]
public void BasicConf_MaxSubTokens()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.MaxSubTokens.ShouldBe(16);
}
[Fact]
public void BasicConf_MaxControlLine()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.MaxControlLine.ShouldBe(2048);
}
[Fact]
public void BasicConf_MaxPending()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.MaxPending.ShouldBe(32L * 1024 * 1024);
}
[Fact]
public void BasicConf_LameDuckDuration()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.LameDuckDuration.ShouldBe(TimeSpan.FromSeconds(60));
}
[Fact]
public void BasicConf_LameDuckGracePeriod()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.LameDuckGracePeriod.ShouldBe(TimeSpan.FromSeconds(5));
}
[Fact]
public void BasicConf_MonitorPort()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.MonitorPort.ShouldBe(8222);
}
[Fact]
public void BasicConf_Logtime()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("basic.conf"));
opts.Logtime.ShouldBeTrue();
opts.LogtimeUTC.ShouldBeFalse();
}
// ─── Auth config ───────────────────────────────────────────────
[Fact]
public void AuthConf_SimpleUser()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("auth.conf"));
opts.Username.ShouldBe("admin");
opts.Password.ShouldBe("s3cret");
}
[Fact]
public void AuthConf_AuthTimeout()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("auth.conf"));
opts.AuthTimeout.ShouldBe(TimeSpan.FromSeconds(5));
}
[Fact]
public void AuthConf_NoAuthUser()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("auth.conf"));
opts.NoAuthUser.ShouldBe("guest");
}
[Fact]
public void AuthConf_UsersArray()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("auth.conf"));
opts.Users.ShouldNotBeNull();
opts.Users.Count.ShouldBe(2);
}
[Fact]
public void AuthConf_AliceUser()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("auth.conf"));
var alice = opts.Users!.First(u => u.Username == "alice");
alice.Password.ShouldBe("pw1");
alice.Permissions.ShouldNotBeNull();
alice.Permissions!.Publish.ShouldNotBeNull();
alice.Permissions.Publish!.Allow.ShouldNotBeNull();
alice.Permissions.Publish.Allow!.ShouldContain("foo.>");
alice.Permissions.Subscribe.ShouldNotBeNull();
alice.Permissions.Subscribe!.Allow.ShouldNotBeNull();
alice.Permissions.Subscribe.Allow!.ShouldContain(">");
}
[Fact]
public void AuthConf_BobUser()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("auth.conf"));
var bob = opts.Users!.First(u => u.Username == "bob");
bob.Password.ShouldBe("pw2");
bob.Permissions.ShouldBeNull();
}
// ─── TLS config ────────────────────────────────────────────────
[Fact]
public void TlsConf_CertFiles()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
opts.TlsCert.ShouldBe("/path/to/cert.pem");
opts.TlsKey.ShouldBe("/path/to/key.pem");
opts.TlsCaCert.ShouldBe("/path/to/ca.pem");
}
[Fact]
public void TlsConf_Verify()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
opts.TlsVerify.ShouldBeTrue();
opts.TlsMap.ShouldBeTrue();
}
[Fact]
public void TlsConf_Timeout()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
opts.TlsTimeout.ShouldBe(TimeSpan.FromSeconds(3));
}
[Fact]
public void TlsConf_RateLimit()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
opts.TlsRateLimit.ShouldBe(100);
}
[Fact]
public void TlsConf_PinnedCerts()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
opts.TlsPinnedCerts.ShouldNotBeNull();
opts.TlsPinnedCerts!.Count.ShouldBe(1);
opts.TlsPinnedCerts.ShouldContain("abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789");
}
[Fact]
public void TlsConf_HandshakeFirst()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
opts.TlsHandshakeFirst.ShouldBeTrue();
}
[Fact]
public void TlsConf_AllowNonTls()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
opts.AllowNonTls.ShouldBeFalse();
}
// ─── Full config ───────────────────────────────────────────────
[Fact]
public void FullConf_CoreOptions()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("full.conf"));
opts.Port.ShouldBe(4222);
opts.Host.ShouldBe("0.0.0.0");
opts.ServerName.ShouldBe("full-test");
opts.ClientAdvertise.ShouldBe("nats://public.example.com:4222");
}
[Fact]
public void FullConf_Limits()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("full.conf"));
opts.MaxPayload.ShouldBe(1024 * 1024);
opts.MaxControlLine.ShouldBe(4096);
opts.MaxConnections.ShouldBe(65536);
opts.MaxPending.ShouldBe(64L * 1024 * 1024);
opts.MaxSubs.ShouldBe(0);
opts.MaxSubTokens.ShouldBe(0);
opts.MaxTracedMsgLen.ShouldBe(1024);
opts.DisableSublistCache.ShouldBeFalse();
opts.MaxClosedClients.ShouldBe(5000);
}
[Fact]
public void FullConf_Logging()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("full.conf"));
opts.Debug.ShouldBeFalse();
opts.Trace.ShouldBeFalse();
opts.TraceVerbose.ShouldBeFalse();
opts.Logtime.ShouldBeTrue();
opts.LogtimeUTC.ShouldBeFalse();
opts.LogFile.ShouldBe("/var/log/nats.log");
opts.LogSizeLimit.ShouldBe(100L * 1024 * 1024);
opts.LogMaxFiles.ShouldBe(5);
}
[Fact]
public void FullConf_Monitoring()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("full.conf"));
opts.MonitorPort.ShouldBe(8222);
opts.MonitorBasePath.ShouldBe("/nats");
}
[Fact]
public void FullConf_Files()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("full.conf"));
opts.PidFile.ShouldBe("/var/run/nats.pid");
opts.PortsFileDir.ShouldBe("/var/run");
}
[Fact]
public void FullConf_Lifecycle()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("full.conf"));
opts.LameDuckDuration.ShouldBe(TimeSpan.FromMinutes(2));
opts.LameDuckGracePeriod.ShouldBe(TimeSpan.FromSeconds(10));
}
[Fact]
public void FullConf_Tags()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("full.conf"));
opts.Tags.ShouldNotBeNull();
opts.Tags!["region"].ShouldBe("us-east");
opts.Tags["env"].ShouldBe("production");
}
[Fact]
public void FullConf_Auth()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("full.conf"));
opts.Username.ShouldBe("admin");
opts.Password.ShouldBe("secret");
opts.AuthTimeout.ShouldBe(TimeSpan.FromSeconds(2));
}
[Fact]
public void FullConf_Tls()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("full.conf"));
opts.TlsCert.ShouldBe("/path/to/cert.pem");
opts.TlsKey.ShouldBe("/path/to/key.pem");
opts.TlsCaCert.ShouldBe("/path/to/ca.pem");
opts.TlsVerify.ShouldBeTrue();
opts.TlsTimeout.ShouldBe(TimeSpan.FromSeconds(2));
opts.TlsHandshakeFirst.ShouldBeTrue();
}
// ─── Listen combined format ────────────────────────────────────
[Fact]
public void ListenCombined_HostAndPort()
{
var opts = ConfigProcessor.ProcessConfig("listen: \"10.0.0.1:5222\"");
opts.Host.ShouldBe("10.0.0.1");
opts.Port.ShouldBe(5222);
}
[Fact]
public void ListenCombined_PortOnly()
{
var opts = ConfigProcessor.ProcessConfig("listen: \":5222\"");
opts.Port.ShouldBe(5222);
}
[Fact]
public void ListenCombined_BarePort()
{
var opts = ConfigProcessor.ProcessConfig("listen: 5222");
opts.Port.ShouldBe(5222);
}
// ─── HTTP combined format ──────────────────────────────────────
[Fact]
public void HttpCombined_HostAndPort()
{
var opts = ConfigProcessor.ProcessConfig("http: \"10.0.0.1:8333\"");
opts.MonitorHost.ShouldBe("10.0.0.1");
opts.MonitorPort.ShouldBe(8333);
}
[Fact]
public void HttpsCombined_HostAndPort()
{
var opts = ConfigProcessor.ProcessConfig("https: \"10.0.0.1:8444\"");
opts.MonitorHost.ShouldBe("10.0.0.1");
opts.MonitorHttpsPort.ShouldBe(8444);
}
// ─── Duration as number ────────────────────────────────────────
[Fact]
public void DurationAsNumber_TreatedAsSeconds()
{
var opts = ConfigProcessor.ProcessConfig("ping_interval: 60");
opts.PingInterval.ShouldBe(TimeSpan.FromSeconds(60));
}
[Fact]
public void DurationAsString_Milliseconds()
{
var opts = ConfigProcessor.ProcessConfig("write_deadline: \"500ms\"");
opts.WriteDeadline.ShouldBe(TimeSpan.FromMilliseconds(500));
}
[Fact]
public void DurationAsString_Hours()
{
var opts = ConfigProcessor.ProcessConfig("ping_interval: \"1h\"");
opts.PingInterval.ShouldBe(TimeSpan.FromHours(1));
}
// ─── Unknown keys ──────────────────────────────────────────────
[Fact]
public void UnknownKeys_SilentlyIgnored()
{
var opts = ConfigProcessor.ProcessConfig("""
port: 4222
cluster { name: "my-cluster" }
jetstream { store_dir: "/tmp/js" }
unknown_key: "whatever"
""");
opts.Port.ShouldBe(4222);
}
// ─── Server name validation ────────────────────────────────────
[Fact]
public void ServerNameWithSpaces_ReportsError()
{
var ex = Should.Throw<ConfigProcessorException>(() =>
ConfigProcessor.ProcessConfig("server_name: \"my server\""));
ex.Errors.ShouldContain(e => e.Contains("server_name cannot contain spaces"));
}
// ─── Max sub tokens validation ─────────────────────────────────
[Fact]
public void MaxSubTokens_ExceedsLimit_ReportsError()
{
var ex = Should.Throw<ConfigProcessorException>(() =>
ConfigProcessor.ProcessConfig("max_sub_tokens: 300"));
ex.Errors.ShouldContain(e => e.Contains("max_sub_tokens cannot exceed 256"));
}
// ─── ProcessConfig from string ─────────────────────────────────
[Fact]
public void ProcessConfig_FromString()
{
var opts = ConfigProcessor.ProcessConfig("""
port: 9222
host: "127.0.0.1"
debug: true
""");
opts.Port.ShouldBe(9222);
opts.Host.ShouldBe("127.0.0.1");
opts.Debug.ShouldBeTrue();
}
// ─── TraceVerbose sets Trace ────────────────────────────────────
[Fact]
public void TraceVerbose_AlsoSetsTrace()
{
var opts = ConfigProcessor.ProcessConfig("trace_verbose: true");
opts.TraceVerbose.ShouldBeTrue();
opts.Trace.ShouldBeTrue();
}
// ─── Error collection (not fail-fast) ──────────────────────────
[Fact]
public void MultipleErrors_AllCollected()
{
var ex = Should.Throw<ConfigProcessorException>(() =>
ConfigProcessor.ProcessConfig("""
server_name: "bad name"
max_sub_tokens: 999
"""));
ex.Errors.Count.ShouldBe(2);
ex.Errors.ShouldContain(e => e.Contains("server_name"));
ex.Errors.ShouldContain(e => e.Contains("max_sub_tokens"));
}
// ─── ConfigFile path tracking ──────────────────────────────────
[Fact]
public void ProcessConfigFile_SetsConfigFilePath()
{
var path = TestDataPath("basic.conf");
var opts = ConfigProcessor.ProcessConfigFile(path);
opts.ConfigFile.ShouldBe(path);
}
// ─── HasTls derived property ───────────────────────────────────
[Fact]
public void HasTls_TrueWhenCertAndKeySet()
{
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
opts.HasTls.ShouldBeTrue();
}
}

View File

@@ -0,0 +1,89 @@
using NATS.Server;
using NATS.Server.Auth;
using NATS.Server.Configuration;
namespace NATS.Server.Tests;
public class ConfigReloadTests
{
[Fact]
public void Diff_NoChanges_ReturnsEmpty()
{
var old = new NatsOptions { Port = 4222, Debug = true };
var @new = new NatsOptions { Port = 4222, Debug = true };
var changes = ConfigReloader.Diff(old, @new);
changes.ShouldBeEmpty();
}
[Fact]
public void Diff_ReloadableChange_ReturnsChange()
{
var old = new NatsOptions { Debug = false };
var @new = new NatsOptions { Debug = true };
var changes = ConfigReloader.Diff(old, @new);
changes.Count.ShouldBe(1);
changes[0].Name.ShouldBe("Debug");
changes[0].IsLoggingChange.ShouldBeTrue();
}
[Fact]
public void Diff_NonReloadableChange_ReturnsNonReloadableChange()
{
var old = new NatsOptions { Port = 4222 };
var @new = new NatsOptions { Port = 5222 };
var changes = ConfigReloader.Diff(old, @new);
changes.Count.ShouldBe(1);
changes[0].IsNonReloadable.ShouldBeTrue();
}
[Fact]
public void Diff_MultipleChanges_ReturnsAll()
{
var old = new NatsOptions { Debug = false, MaxPayload = 1024 };
var @new = new NatsOptions { Debug = true, MaxPayload = 2048 };
var changes = ConfigReloader.Diff(old, @new);
changes.Count.ShouldBe(2);
}
[Fact]
public void Diff_AuthChange_MarkedCorrectly()
{
var old = new NatsOptions { Username = "alice" };
var @new = new NatsOptions { Username = "bob" };
var changes = ConfigReloader.Diff(old, @new);
changes[0].IsAuthChange.ShouldBeTrue();
}
[Fact]
public void Diff_TlsChange_MarkedCorrectly()
{
var old = new NatsOptions { TlsCert = "/old/cert.pem" };
var @new = new NatsOptions { TlsCert = "/new/cert.pem" };
var changes = ConfigReloader.Diff(old, @new);
changes[0].IsTlsChange.ShouldBeTrue();
}
[Fact]
public void Validate_NonReloadableChanges_ReturnsErrors()
{
var changes = new List<IConfigChange>
{
new ConfigChange("Port", isNonReloadable: true),
};
var errors = ConfigReloader.Validate(changes);
errors.Count.ShouldBe(1);
errors[0].ShouldContain("Port");
}
[Fact]
public void MergeWithCli_CliOverridesConfig()
{
var fromConfig = new NatsOptions { Port = 5222, Debug = true };
var cliFlags = new HashSet<string> { "Port" };
var cliValues = new NatsOptions { Port = 4222 };
ConfigReloader.MergeCliOverrides(fromConfig, cliValues, cliFlags);
fromConfig.Port.ShouldBe(4222); // CLI wins
fromConfig.Debug.ShouldBeTrue(); // config value kept (not in CLI)
}
}

View File

@@ -21,6 +21,10 @@
<Using Include="Shouldly" />
</ItemGroup>
<ItemGroup>
<None Update="TestData\**\*" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Server\NATS.Server.csproj" />
</ItemGroup>

View File

@@ -0,0 +1,221 @@
using NATS.Server.Configuration;
namespace NATS.Server.Tests;
public class NatsConfLexerTests
{
[Fact]
public void Lex_SimpleKeyStringValue_ReturnsKeyAndString()
{
var tokens = NatsConfLexer.Tokenize("foo = \"bar\"").ToList();
tokens[0].Type.ShouldBe(TokenType.Key);
tokens[0].Value.ShouldBe("foo");
tokens[1].Type.ShouldBe(TokenType.String);
tokens[1].Value.ShouldBe("bar");
tokens[2].Type.ShouldBe(TokenType.Eof);
}
[Fact]
public void Lex_SingleQuotedString_ReturnsString()
{
var tokens = NatsConfLexer.Tokenize("foo = 'bar'").ToList();
tokens[1].Type.ShouldBe(TokenType.String);
tokens[1].Value.ShouldBe("bar");
}
[Fact]
public void Lex_IntegerValue_ReturnsInteger()
{
var tokens = NatsConfLexer.Tokenize("port = 4222").ToList();
tokens[0].Type.ShouldBe(TokenType.Key);
tokens[0].Value.ShouldBe("port");
tokens[1].Type.ShouldBe(TokenType.Integer);
tokens[1].Value.ShouldBe("4222");
}
[Fact]
public void Lex_IntegerWithSuffix_ReturnsInteger()
{
var tokens = NatsConfLexer.Tokenize("size = 64mb").ToList();
tokens[1].Type.ShouldBe(TokenType.Integer);
tokens[1].Value.ShouldBe("64mb");
}
[Fact]
public void Lex_BooleanValues_ReturnsBool()
{
foreach (var val in new[] { "true", "false", "yes", "no", "on", "off" })
{
var tokens = NatsConfLexer.Tokenize($"flag = {val}").ToList();
tokens[1].Type.ShouldBe(TokenType.Bool);
}
}
[Fact]
public void Lex_FloatValue_ReturnsFloat()
{
var tokens = NatsConfLexer.Tokenize("rate = 2.5").ToList();
tokens[1].Type.ShouldBe(TokenType.Float);
tokens[1].Value.ShouldBe("2.5");
}
[Fact]
public void Lex_NegativeNumber_ReturnsInteger()
{
var tokens = NatsConfLexer.Tokenize("offset = -10").ToList();
tokens[1].Type.ShouldBe(TokenType.Integer);
tokens[1].Value.ShouldBe("-10");
}
[Fact]
public void Lex_DatetimeValue_ReturnsDatetime()
{
var tokens = NatsConfLexer.Tokenize("ts = 2024-01-15T10:30:00Z").ToList();
tokens[1].Type.ShouldBe(TokenType.DateTime);
}
[Fact]
public void Lex_HashComment_IsIgnored()
{
var tokens = NatsConfLexer.Tokenize("# this is a comment\nfoo = 1").ToList();
var keys = tokens.Where(t => t.Type == TokenType.Key).ToList();
keys.Count.ShouldBe(1);
keys[0].Value.ShouldBe("foo");
}
[Fact]
public void Lex_SlashComment_IsIgnored()
{
var tokens = NatsConfLexer.Tokenize("// comment\nfoo = 1").ToList();
var keys = tokens.Where(t => t.Type == TokenType.Key).ToList();
keys.Count.ShouldBe(1);
}
[Fact]
public void Lex_MapBlock_ReturnsMapStartEnd()
{
var tokens = NatsConfLexer.Tokenize("auth { user: admin }").ToList();
tokens[0].Type.ShouldBe(TokenType.Key);
tokens[0].Value.ShouldBe("auth");
tokens[1].Type.ShouldBe(TokenType.MapStart);
tokens[2].Type.ShouldBe(TokenType.Key);
tokens[2].Value.ShouldBe("user");
tokens[3].Type.ShouldBe(TokenType.String);
tokens[3].Value.ShouldBe("admin");
tokens[4].Type.ShouldBe(TokenType.MapEnd);
}
[Fact]
public void Lex_Array_ReturnsArrayStartEnd()
{
var tokens = NatsConfLexer.Tokenize("items = [1, 2, 3]").ToList();
tokens[1].Type.ShouldBe(TokenType.ArrayStart);
tokens[2].Type.ShouldBe(TokenType.Integer);
tokens[2].Value.ShouldBe("1");
tokens[5].Type.ShouldBe(TokenType.ArrayEnd);
}
[Fact]
public void Lex_Variable_ReturnsVariable()
{
var tokens = NatsConfLexer.Tokenize("secret = $MY_VAR").ToList();
tokens[1].Type.ShouldBe(TokenType.Variable);
tokens[1].Value.ShouldBe("MY_VAR");
}
[Fact]
public void Lex_Include_ReturnsInclude()
{
var tokens = NatsConfLexer.Tokenize("include \"auth.conf\"").ToList();
tokens[0].Type.ShouldBe(TokenType.Include);
tokens[0].Value.ShouldBe("auth.conf");
}
[Fact]
public void Lex_EscapeSequences_AreProcessed()
{
var tokens = NatsConfLexer.Tokenize("msg = \"hello\\tworld\\n\"").ToList();
tokens[1].Type.ShouldBe(TokenType.String);
tokens[1].Value.ShouldBe("hello\tworld\n");
}
[Fact]
public void Lex_HexEscape_IsProcessed()
{
var tokens = NatsConfLexer.Tokenize("val = \"\\x41\\x42\"").ToList();
tokens[1].Value.ShouldBe("AB");
}
[Fact]
public void Lex_ColonSeparator_Works()
{
var tokens = NatsConfLexer.Tokenize("foo: bar").ToList();
tokens[0].Type.ShouldBe(TokenType.Key);
tokens[1].Type.ShouldBe(TokenType.String);
}
[Fact]
public void Lex_WhitespaceSeparator_Works()
{
var tokens = NatsConfLexer.Tokenize("foo bar").ToList();
tokens[0].Type.ShouldBe(TokenType.Key);
tokens[1].Type.ShouldBe(TokenType.String);
}
[Fact]
public void Lex_SemicolonTerminator_IsHandled()
{
var tokens = NatsConfLexer.Tokenize("foo = 1; bar = 2").ToList();
var keys = tokens.Where(t => t.Type == TokenType.Key).ToList();
keys.Count.ShouldBe(2);
}
[Fact]
public void Lex_EmptyInput_ReturnsEof()
{
var tokens = NatsConfLexer.Tokenize("").ToList();
tokens.Count.ShouldBe(1);
tokens[0].Type.ShouldBe(TokenType.Eof);
}
[Fact]
public void Lex_BlockString_ReturnsString()
{
var input = "desc (\nthis is\na block\n)\n";
var tokens = NatsConfLexer.Tokenize(input).ToList();
tokens[0].Type.ShouldBe(TokenType.Key);
tokens[1].Type.ShouldBe(TokenType.String);
}
[Fact]
public void Lex_IPAddress_ReturnsString()
{
var tokens = NatsConfLexer.Tokenize("host = 127.0.0.1").ToList();
tokens[1].Type.ShouldBe(TokenType.String);
tokens[1].Value.ShouldBe("127.0.0.1");
}
[Fact]
public void Lex_TrackLineNumbers()
{
var tokens = NatsConfLexer.Tokenize("a = 1\nb = 2\nc = 3").ToList();
tokens[0].Line.ShouldBe(1); // a
tokens[2].Line.ShouldBe(2); // b
tokens[4].Line.ShouldBe(3); // c
}
[Fact]
public void Lex_UnterminatedString_ReturnsError()
{
var tokens = NatsConfLexer.Tokenize("foo = \"unterminated").ToList();
tokens.ShouldContain(t => t.Type == TokenType.Error);
}
[Fact]
public void Lex_StringStartingWithDigit_TreatedAsString()
{
var tokens = NatsConfLexer.Tokenize("foo = 3xyz").ToList();
tokens[1].Type.ShouldBe(TokenType.String);
tokens[1].Value.ShouldBe("3xyz");
}
}

View File

@@ -0,0 +1,184 @@
using NATS.Server.Configuration;
namespace NATS.Server.Tests;
public class NatsConfParserTests
{
[Fact]
public void Parse_SimpleTopLevel_ReturnsCorrectTypes()
{
var result = NatsConfParser.Parse("foo = '1'; bar = 2.2; baz = true; boo = 22");
result["foo"].ShouldBe("1");
result["bar"].ShouldBe(2.2);
result["baz"].ShouldBe(true);
result["boo"].ShouldBe(22L);
}
[Fact]
public void Parse_Booleans_AllVariants()
{
foreach (var (input, expected) in new[] {
("true", true), ("TRUE", true), ("yes", true), ("on", true),
("false", false), ("FALSE", false), ("no", false), ("off", false)
})
{
var result = NatsConfParser.Parse($"flag = {input}");
result["flag"].ShouldBe(expected);
}
}
[Fact]
public void Parse_IntegerWithSuffix_AppliesMultiplier()
{
var result = NatsConfParser.Parse("a = 1k; b = 2mb; c = 3gb; d = 4kb");
result["a"].ShouldBe(1000L);
result["b"].ShouldBe(2L * 1024 * 1024);
result["c"].ShouldBe(3L * 1024 * 1024 * 1024);
result["d"].ShouldBe(4L * 1024);
}
[Fact]
public void Parse_NestedMap_ReturnsDictionary()
{
var result = NatsConfParser.Parse("auth { user: admin, pass: secret }");
var auth = result["auth"].ShouldBeOfType<Dictionary<string, object?>>();
auth["user"].ShouldBe("admin");
auth["pass"].ShouldBe("secret");
}
[Fact]
public void Parse_Array_ReturnsList()
{
var result = NatsConfParser.Parse("items = [1, 2, 3]");
var items = result["items"].ShouldBeOfType<List<object?>>();
items.Count.ShouldBe(3);
items[0].ShouldBe(1L);
}
[Fact]
public void Parse_Variable_ResolvesFromContext()
{
var result = NatsConfParser.Parse("index = 22\nfoo = $index");
result["foo"].ShouldBe(22L);
}
[Fact]
public void Parse_NestedVariable_UsesBlockScope()
{
var input = "index = 22\nnest {\n index = 11\n foo = $index\n}\nbar = $index";
var result = NatsConfParser.Parse(input);
var nest = result["nest"].ShouldBeOfType<Dictionary<string, object?>>();
nest["foo"].ShouldBe(11L);
result["bar"].ShouldBe(22L);
}
[Fact]
public void Parse_EnvironmentVariable_ResolvesFromEnv()
{
Environment.SetEnvironmentVariable("NATS_TEST_VAR_12345", "hello");
try
{
var result = NatsConfParser.Parse("val = $NATS_TEST_VAR_12345");
result["val"].ShouldBe("hello");
}
finally
{
Environment.SetEnvironmentVariable("NATS_TEST_VAR_12345", null);
}
}
[Fact]
public void Parse_UndefinedVariable_Throws()
{
Should.Throw<FormatException>(() =>
NatsConfParser.Parse("val = $UNDEFINED_VAR_XYZZY_99999"));
}
[Fact]
public void Parse_IncludeDirective_MergesFile()
{
var dir = Path.Combine(Path.GetTempPath(), $"nats_test_{Guid.NewGuid():N}");
Directory.CreateDirectory(dir);
try
{
File.WriteAllText(Path.Combine(dir, "main.conf"), "port = 4222\ninclude \"sub.conf\"");
File.WriteAllText(Path.Combine(dir, "sub.conf"), "host = \"localhost\"");
var result = NatsConfParser.ParseFile(Path.Combine(dir, "main.conf"));
result["port"].ShouldBe(4222L);
result["host"].ShouldBe("localhost");
}
finally
{
Directory.Delete(dir, true);
}
}
[Fact]
public void Parse_MultipleKeySeparators_AllWork()
{
var r1 = NatsConfParser.Parse("a = 1");
var r2 = NatsConfParser.Parse("a : 1");
var r3 = NatsConfParser.Parse("a 1");
r1["a"].ShouldBe(1L);
r2["a"].ShouldBe(1L);
r3["a"].ShouldBe(1L);
}
[Fact]
public void Parse_ErrorOnInvalidInput_Throws()
{
Should.Throw<FormatException>(() => NatsConfParser.Parse("= invalid"));
}
[Fact]
public void Parse_CommentsInsideBlocks_AreIgnored()
{
var input = "auth {\n # comment\n user: admin\n // another comment\n pass: secret\n}";
var result = NatsConfParser.Parse(input);
var auth = result["auth"].ShouldBeOfType<Dictionary<string, object?>>();
auth["user"].ShouldBe("admin");
auth["pass"].ShouldBe("secret");
}
[Fact]
public void Parse_ArrayOfMaps_Works()
{
var input = "users = [\n { user: alice, pass: pw1 }\n { user: bob, pass: pw2 }\n]";
var result = NatsConfParser.Parse(input);
var users = result["users"].ShouldBeOfType<List<object?>>();
users.Count.ShouldBe(2);
var first = users[0].ShouldBeOfType<Dictionary<string, object?>>();
first["user"].ShouldBe("alice");
}
[Fact]
public void Parse_BcryptPassword_HandledAsString()
{
var input = "pass = $2a$04$P/.bd.7unw9Ew7yWJqXsl.f4oNRLQGvadEL2YnqQXbbb.IVQajRdK";
var result = NatsConfParser.Parse(input);
((string)result["pass"]!).ShouldStartWith("$2a$");
}
[Fact]
public void ParseFile_WithDigest_ReturnsStableHash()
{
var dir = Path.Combine(Path.GetTempPath(), $"nats_test_{Guid.NewGuid():N}");
Directory.CreateDirectory(dir);
try
{
var conf = Path.Combine(dir, "test.conf");
File.WriteAllText(conf, "port = 4222\nhost = \"localhost\"");
var (result, digest) = NatsConfParser.ParseFileWithDigest(conf);
result["port"].ShouldBe(4222L);
digest.ShouldStartWith("sha256:");
digest.Length.ShouldBeGreaterThan(10);
var (_, digest2) = NatsConfParser.ParseFileWithDigest(conf);
digest2.ShouldBe(digest);
}
finally
{
Directory.Delete(dir, true);
}
}
}

View File

@@ -14,6 +14,22 @@ public class NatsOptionsTests
opts.LogSizeLimit.ShouldBe(0L);
opts.Tags.ShouldBeNull();
}
[Fact]
public void New_fields_have_correct_defaults()
{
var opts = new NatsOptions();
opts.ClientAdvertise.ShouldBeNull();
opts.TraceVerbose.ShouldBeFalse();
opts.MaxTracedMsgLen.ShouldBe(0);
opts.DisableSublistCache.ShouldBeFalse();
opts.ConnectErrorReports.ShouldBe(3600);
opts.ReconnectErrorReports.ShouldBe(1);
opts.NoHeaderSupport.ShouldBeFalse();
opts.MaxClosedClients.ShouldBe(10_000);
opts.NoSystemAccount.ShouldBeFalse();
opts.SystemAccount.ShouldBeNull();
}
}
public class LogOverrideTests

View File

@@ -0,0 +1,11 @@
authorization {
user: admin
password: "s3cret"
timeout: 5
users = [
{ user: alice, password: "pw1", permissions: { publish: { allow: ["foo.>"] }, subscribe: { allow: [">"] } } }
{ user: bob, password: "pw2" }
]
}
no_auth_user: "guest"

View File

@@ -0,0 +1,19 @@
port: 4222
host: "0.0.0.0"
server_name: "test-server"
max_payload: 2mb
max_connections: 1000
debug: true
trace: false
logtime: true
logtime_utc: false
ping_interval: "30s"
ping_max: 3
write_deadline: "5s"
max_subs: 100
max_sub_tokens: 16
max_control_line: 2048
max_pending: 32mb
lame_duck_duration: "60s"
lame_duck_grace_period: "5s"
http_port: 8222

View File

@@ -0,0 +1,57 @@
# Full configuration with all supported options
port: 4222
host: "0.0.0.0"
server_name: "full-test"
client_advertise: "nats://public.example.com:4222"
max_payload: 1mb
max_control_line: 4096
max_connections: 65536
max_pending: 64mb
write_deadline: "10s"
max_subs: 0
max_sub_tokens: 0
max_traced_msg_len: 1024
disable_sublist_cache: false
max_closed_clients: 5000
ping_interval: "2m"
ping_max: 2
debug: false
trace: false
trace_verbose: false
logtime: true
logtime_utc: false
logfile: "/var/log/nats.log"
log_size_limit: 100mb
log_max_num: 5
http_port: 8222
http_base_path: "/nats"
pidfile: "/var/run/nats.pid"
ports_file_dir: "/var/run"
lame_duck_duration: "2m"
lame_duck_grace_period: "10s"
server_tags {
region: "us-east"
env: "production"
}
authorization {
user: admin
password: "secret"
timeout: 2
}
tls {
cert_file: "/path/to/cert.pem"
key_file: "/path/to/key.pem"
ca_file: "/path/to/ca.pem"
verify: true
timeout: 2
handshake_first: true
}

View File

@@ -0,0 +1,12 @@
tls {
cert_file: "/path/to/cert.pem"
key_file: "/path/to/key.pem"
ca_file: "/path/to/ca.pem"
verify: true
verify_and_map: true
timeout: 3
connection_rate_limit: 100
pinned_certs: ["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"]
handshake_first: true
}
allow_non_tls: false