Compare commits

...

18 Commits

Author SHA1 Message Date
Joseph Doherty
9b784024db docs: update differences.md to reflect SYSTEM/ACCOUNT types and imports/exports implemented 2026-02-23 06:04:29 -05:00
Joseph Doherty
86283a7f97 feat: add latency tracking for service import request-reply 2026-02-23 06:03:37 -05:00
Joseph Doherty
4450c27381 feat: add response routing for service import request-reply patterns 2026-02-23 06:01:53 -05:00
Joseph Doherty
c9066e526d feat: wire service import forwarding into message delivery path
Add ProcessServiceImport method to NatsServer that transforms subjects
from importer to exporter namespace and delivers to destination account
subscribers. Wire service import checking into ProcessMessage so that
publishes matching a service import "From" pattern are automatically
forwarded to the destination account. Includes MapImportSubject for
wildcard-aware subject mapping and WireServiceImports for import setup.
2026-02-23 05:59:36 -05:00
Joseph Doherty
4c2b7fa3de feat: add import/export support to Account with ACCOUNT client lazy creation 2026-02-23 05:54:31 -05:00
Joseph Doherty
591833adbb feat: add import/export model types (ServiceImport, StreamImport, exports, auth) 2026-02-23 05:51:30 -05:00
Joseph Doherty
5bae9cc289 feat: add system request-reply monitoring services ($SYS.REQ.SERVER.*)
Register VARZ, HEALTHZ, SUBSZ, STATSZ, and IDZ request-reply handlers
on $SYS.REQ.SERVER.{id}.* subjects and $SYS.REQ.SERVER.PING.* wildcard
subjects via InitEventTracking. Also excludes the $SYS system account
from the /subz monitoring endpoint by default since its subscriptions
are internal infrastructure.
2026-02-23 05:48:32 -05:00
Joseph Doherty
0b34f8cec4 feat: add periodic server stats and account connection heartbeat publishing 2026-02-23 05:44:09 -05:00
Joseph Doherty
125b71b3b0 feat: wire system event publishing for connect, disconnect, and shutdown 2026-02-23 05:41:44 -05:00
Joseph Doherty
89465450a1 fix: use per-SID callback dictionary in SysSubscribe to support multiple subscriptions 2026-02-23 05:38:10 -05:00
Joseph Doherty
8e790445f4 feat: add InternalEventSystem with Channel-based send/receive loops 2026-02-23 05:34:57 -05:00
Joseph Doherty
fc96b6eb43 feat: add system event DTOs and JSON source generator context 2026-02-23 05:29:40 -05:00
Joseph Doherty
b0c5b4acd8 feat: add system event subject constants and SystemMessageHandler delegate 2026-02-23 05:26:25 -05:00
Joseph Doherty
0c4bca9073 feat: add InternalClient class for socketless internal messaging 2026-02-23 05:22:58 -05:00
Joseph Doherty
0e7db5615e feat: add INatsClient interface and implement on NatsClient
Extract INatsClient interface from NatsClient to enable internal clients
(SYSTEM, ACCOUNT) to participate in the subscription system without
requiring a socket connection. Change Subscription.Client from concrete
NatsClient to INatsClient, keeping IMessageRouter and RemoveClient using
the concrete type since only socket clients need those paths.
2026-02-23 05:18:59 -05:00
Joseph Doherty
5e11785bdf feat: add ClientKind enum with IsInternal extension 2026-02-23 05:15:06 -05:00
Joseph Doherty
4b3890f046 docs: add implementation plan for SYSTEM/ACCOUNT connection types
16 tasks across 6 layers: ClientKind + INatsClient + InternalClient,
event infrastructure, event publishing, request-reply services,
import/export model, and response routing with latency tracking.
2026-02-23 05:12:02 -05:00
Joseph Doherty
e0abce66da docs: add mqtt connection type design 2026-02-23 05:08:44 -05:00
34 changed files with 5382 additions and 9 deletions

View File

@@ -11,7 +11,7 @@
| Feature | Go | .NET | Notes |
|---------|:--:|:----:|-------|
| NKey generation (server identity) | Y | Y | Ed25519 key pair via NATS.NKeys at startup |
| System account setup | Y | Y | `$SYS` account created; no event publishing yet (stub) |
| System account setup | Y | Y | `$SYS` account with InternalEventSystem, event publishing, request-reply services |
| Config file validation on startup | Y | Y | Full config parsing with error collection via `ConfigProcessor` |
| PID file writing | Y | Y | Written on startup, deleted on shutdown |
| Profiling HTTP endpoint (`/debug/pprof`) | Y | Stub | `ProfPort` option exists but endpoint not implemented |
@@ -64,9 +64,9 @@
| ROUTER | Y | N | Excluded per scope |
| GATEWAY | Y | N | Excluded per scope |
| LEAF | Y | N | Excluded per scope |
| SYSTEM (internal) | Y | N | |
| SYSTEM (internal) | Y | Y | InternalClient + InternalEventSystem with Channel-based send/receive loops |
| JETSTREAM (internal) | Y | N | |
| ACCOUNT (internal) | Y | N | |
| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support |
| WebSocket clients | Y | N | |
| MQTT clients | Y | N | |
@@ -218,7 +218,7 @@ Go implements a sophisticated slow consumer detection system:
|---------|:--:|:----:|-------|
| Per-account SubList isolation | Y | Y | |
| Multi-account user resolution | Y | Y | `AccountConfig` per account in `NatsOptions.Accounts`; `GetOrCreateAccount` wires limits |
| Account exports/imports | Y | N | |
| Account exports/imports | Y | Y | ServiceImport/StreamImport with ExportAuth, subject transforms, response routing |
| Per-account connection limits | Y | Y | `Account.AddClient()` returns false when `MaxConnections` exceeded |
| Per-account subscription limits | Y | Y | `Account.IncrementSubscriptions()` enforced in `ProcessSub()` |
| Account JetStream limits | Y | N | Excluded per scope |
@@ -406,6 +406,11 @@ The following items from the original gap list have been implemented:
- **User revocation** — per-account tracking with wildcard (`*`) revocation
- **Config file parsing** — custom lexer/parser ported from Go; supports includes, variables, nested blocks, size suffixes
- **Hot reload (SIGHUP)** — re-parses config, diffs changes, validates reloadable set, applies with CLI precedence
- **SYSTEM client type** — InternalClient with InternalEventSystem, Channel-based send/receive loops, event publishing
- **ACCOUNT client type** — lazy per-account InternalClient with import/export subscription support
- **System event publishing** — connect/disconnect advisories, server stats, shutdown/lame-duck events, auth errors
- **System request-reply services** — $SYS.REQ.SERVER.*.VARZ/CONNZ/SUBSZ/HEALTHZ/IDZ/STATSZ with ping wildcards
- **Account exports/imports** — service and stream imports with ExportAuth, subject transforms, response routing, latency tracking
### Remaining Lower Priority
1. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections

View File

@@ -0,0 +1,93 @@
# MQTT Connection Type Port Design
## Goal
Port MQTT-related connection type parity from Go into the .NET server for two scoped areas:
1. JWT `allowed_connection_types` behavior for `MQTT` / `MQTT_WS` (plus existing known types).
2. `/connz` filtering by `mqtt_client`.
## Scope
- In scope:
- JWT allowed connection type normalization and enforcement semantics.
- `/connz?mqtt_client=` option parsing and filtering.
- Unit/integration tests for new and updated behavior.
- `differences.md` updates after implementation is verified.
- Out of scope:
- Full MQTT transport implementation.
- WebSocket transport implementation.
- Leaf/route/gateway transport plumbing.
## Architecture
- Add an auth-facing connection-type model that can be passed through `ClientAuthContext`.
- Implement Go-style allowed connection type conversion and matching in `JwtAuthenticator`:
- normalize input to uppercase.
- retain recognized types.
- collect unknown types as non-fatal if at least one valid type remains.
- reject when only unknown types are present.
- enforce current connection type against the resulting allowed set.
- Extend connz monitoring options to parse `mqtt_client` and apply exact-match filtering before sort/pagination.
## Components
- `src/NATS.Server/Auth/IAuthenticator.cs`
- Extend `ClientAuthContext` with a connection-type value.
- `src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs` (new)
- Canonical constants for known connection types:
- `STANDARD`, `WEBSOCKET`, `LEAFNODE`, `LEAFNODE_WS`, `MQTT`, `MQTT_WS`, `INPROCESS`.
- Helper(s) for normalization and validation behavior.
- `src/NATS.Server/Auth/JwtAuthenticator.cs`
- Evaluate `userClaims.Nats?.AllowedConnectionTypes` using Go-compatible semantics.
- Enforce against current `ClientAuthContext.ConnectionType`.
- `src/NATS.Server/NatsClient.cs`
- Populate auth context connection type (currently `STANDARD`).
- `src/NATS.Server/Monitoring/Connz.cs`
- Add `MqttClient` to `ConnzOptions` with JSON field `mqtt_client`.
- `src/NATS.Server/Monitoring/ConnzHandler.cs`
- Parse `mqtt_client` query param.
- Filter connection list by exact `MqttClient` match when provided.
- `src/NATS.Server/Monitoring/ClosedClient.cs`
- Add `MqttClient` field to closed snapshots.
- `src/NATS.Server/NatsServer.cs`
- Persist `MqttClient` into `ClosedClient` snapshot (empty for now).
## Data Flow
1. Client sends `CONNECT`.
2. `NatsClient.ProcessConnectAsync` builds `ClientAuthContext` with `ConnectionType=STANDARD`.
3. `AuthService` invokes `JwtAuthenticator` for JWT-based auth.
4. `JwtAuthenticator`:
- converts `allowed_connection_types` to valid/unknown buckets.
- rejects unknown-only lists.
- enforces connection-type membership when valid list is non-empty.
5. Monitoring request `/connz`:
- `ConnzHandler.ParseQueryParams` reads `mqtt_client`.
- open/closed conn rows are materialized.
- rows are filtered on exact `MqttClient` when filter is present.
- sorting and pagination run on filtered results.
## Error Handling and Compatibility
- Auth failures remain non-throwing (`Authenticate` returns `null`).
- Unknown connection type tokens in JWT are tolerated only when at least one known allowed type remains.
- Unknown-only allowed lists are rejected to avoid unintended allow-all behavior.
- `mqtt_client` query parsing is lenient and string-based; empty filter means no filter.
- Existing JSON schema compatibility is preserved.
## Current Runtime Limitation (Explicit)
- MQTT transport is not implemented yet in this repository.
- Runtime connection type currently resolves to `STANDARD` in auth context.
- `mqtt_client` values remain empty until MQTT path populates them.
## Testing Strategy
- `tests/NATS.Server.Tests/JwtAuthenticatorTests.cs`
- allow `STANDARD` for current client context.
- reject `MQTT` for current client context.
- allow mixed known+unknown when current type is known allowed.
- reject unknown-only list.
- validate case normalization behavior.
- `tests/NATS.Server.Tests/MonitorTests.cs`
- `/connz?mqtt_client=<id>` returns matching connections only.
- `/connz?state=closed&mqtt_client=<id>` filters closed snapshots.
- non-existing ID yields empty connection set.
## Success Criteria
- JWT `allowed_connection_types` behavior matches Go semantics for known/unknown mixing and unknown-only rejection.
- `/connz` supports exact `mqtt_client` filtering for open and closed sets.
- Added tests pass.
- `differences.md` accurately reflects implemented parity.

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,22 @@
{
"planPath": "docs/plans/2026-02-23-system-account-types-plan.md",
"tasks": [
{"id": 6, "subject": "Task 1: Create ClientKind enum and extensions", "status": "pending"},
{"id": 7, "subject": "Task 2: Create INatsClient interface and implement on NatsClient", "status": "pending", "blockedBy": [6]},
{"id": 8, "subject": "Task 3: Create InternalClient class", "status": "pending", "blockedBy": [7]},
{"id": 9, "subject": "Task 4: Create event subject constants and SystemMessageHandler delegate", "status": "pending", "blockedBy": [8]},
{"id": 10, "subject": "Task 5: Create event DTO types and JSON source generator", "status": "pending", "blockedBy": [9]},
{"id": 11, "subject": "Task 6: Create InternalEventSystem with send/receive loops", "status": "pending", "blockedBy": [10]},
{"id": 12, "subject": "Task 7: Wire system event publishing (connect, disconnect, shutdown)", "status": "pending", "blockedBy": [11]},
{"id": 13, "subject": "Task 8: Add periodic stats and account connection heartbeats", "status": "pending", "blockedBy": [12]},
{"id": 14, "subject": "Task 9: Add system request-reply monitoring services", "status": "pending", "blockedBy": [13]},
{"id": 15, "subject": "Task 10: Create import/export model types", "status": "pending", "blockedBy": [8]},
{"id": 16, "subject": "Task 11: Add import/export support to Account and ACCOUNT client", "status": "pending", "blockedBy": [15]},
{"id": 17, "subject": "Task 12: Wire service import into message delivery path", "status": "pending", "blockedBy": [16]},
{"id": 18, "subject": "Task 13: Add response routing for service imports", "status": "pending", "blockedBy": [17]},
{"id": 19, "subject": "Task 14: Add latency tracking for service imports", "status": "pending", "blockedBy": [18]},
{"id": 20, "subject": "Task 15: Update differences.md", "status": "pending", "blockedBy": [19]},
{"id": 21, "subject": "Task 16: Final verification — full test suite and build", "status": "pending", "blockedBy": [20]}
],
"lastUpdated": "2026-02-23T00:00:00Z"
}

View File

@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using NATS.Server.Imports;
using NATS.Server.Subscriptions;
namespace NATS.Server.Auth;
@@ -12,6 +13,8 @@ public sealed class Account : IDisposable
public Permissions? DefaultPermissions { get; set; }
public int MaxConnections { get; set; } // 0 = unlimited
public int MaxSubscriptions { get; set; } // 0 = unlimited
public ExportMap Exports { get; } = new();
public ImportMap Imports { get; } = new();
// JWT fields
public string? Nkey { get; set; }
@@ -89,5 +92,77 @@ public sealed class Account : IDisposable
Interlocked.Add(ref _outBytes, bytes);
}
// Internal (ACCOUNT) client for import/export message routing
private InternalClient? _internalClient;
public InternalClient GetOrCreateInternalClient(ulong clientId)
{
if (_internalClient != null) return _internalClient;
_internalClient = new InternalClient(clientId, ClientKind.Account, this);
return _internalClient;
}
public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved)
{
var auth = new ExportAuth
{
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
};
Exports.Services[subject] = new ServiceExport
{
Auth = auth,
Account = this,
ResponseType = responseType,
};
}
public void AddStreamExport(string subject, IEnumerable<Account>? approved)
{
var auth = new ExportAuth
{
ApprovedAccounts = approved != null ? new HashSet<string>(approved.Select(a => a.Name)) : null,
};
Exports.Streams[subject] = new StreamExport { Auth = auth };
}
public ServiceImport AddServiceImport(Account destination, string from, string to)
{
if (!destination.Exports.Services.TryGetValue(to, out var export))
throw new InvalidOperationException($"No service export found for '{to}' on account '{destination.Name}'");
if (!export.Auth.IsAuthorized(this))
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{to}' from '{destination.Name}'");
var si = new ServiceImport
{
DestinationAccount = destination,
From = from,
To = to,
Export = export,
ResponseType = export.ResponseType,
};
Imports.AddServiceImport(si);
return si;
}
public void AddStreamImport(Account source, string from, string to)
{
if (!source.Exports.Streams.TryGetValue(from, out var export))
throw new InvalidOperationException($"No stream export found for '{from}' on account '{source.Name}'");
if (!export.Auth.IsAuthorized(this))
throw new UnauthorizedAccessException($"Account '{Name}' not authorized to import '{from}' from '{source.Name}'");
var si = new StreamImport
{
SourceAccount = source,
From = from,
To = to,
};
Imports.Streams.Add(si);
}
public void Dispose() => SubList.Dispose();
}

View File

@@ -0,0 +1,22 @@
namespace NATS.Server;
/// <summary>
/// Identifies the type of a client connection.
/// Maps to Go's client kind constants in client.go:45-65.
/// </summary>
public enum ClientKind
{
Client,
Router,
Gateway,
Leaf,
System,
JetStream,
Account,
}
public static class ClientKindExtensions
{
public static bool IsInternal(this ClientKind kind) =>
kind is ClientKind.System or ClientKind.JetStream or ClientKind.Account;
}

View File

@@ -0,0 +1,12 @@
using System.Text.Json.Serialization;
namespace NATS.Server.Events;
[JsonSerializable(typeof(ConnectEventMsg))]
[JsonSerializable(typeof(DisconnectEventMsg))]
[JsonSerializable(typeof(AccountNumConns))]
[JsonSerializable(typeof(ServerStatsMsg))]
[JsonSerializable(typeof(ShutdownEventMsg))]
[JsonSerializable(typeof(LameDuckEventMsg))]
[JsonSerializable(typeof(AuthErrorEventMsg))]
internal partial class EventJsonContext : JsonSerializerContext;

View File

@@ -0,0 +1,49 @@
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Events;
/// <summary>
/// System event subject patterns.
/// Maps to Go events.go:41-97 subject constants.
/// </summary>
public static class EventSubjects
{
// Account-scoped events
public const string ConnectEvent = "$SYS.ACCOUNT.{0}.CONNECT";
public const string DisconnectEvent = "$SYS.ACCOUNT.{0}.DISCONNECT";
public const string AccountConnsNew = "$SYS.ACCOUNT.{0}.SERVER.CONNS";
public const string AccountConnsOld = "$SYS.SERVER.ACCOUNT.{0}.CONNS";
// Server-scoped events
public const string ServerStats = "$SYS.SERVER.{0}.STATSZ";
public const string ServerShutdown = "$SYS.SERVER.{0}.SHUTDOWN";
public const string ServerLameDuck = "$SYS.SERVER.{0}.LAMEDUCK";
public const string AuthError = "$SYS.SERVER.{0}.CLIENT.AUTH.ERR";
public const string AuthErrorAccount = "$SYS.ACCOUNT.CLIENT.AUTH.ERR";
// Request-reply subjects (server-specific)
public const string ServerReq = "$SYS.REQ.SERVER.{0}.{1}";
// Wildcard ping subjects (all servers respond)
public const string ServerPing = "$SYS.REQ.SERVER.PING.{0}";
// Account-scoped request subjects
public const string AccountReq = "$SYS.REQ.ACCOUNT.{0}.{1}";
// Inbox for responses
public const string InboxResponse = "$SYS._INBOX_.{0}";
}
/// <summary>
/// Callback signature for system message handlers.
/// Maps to Go's sysMsgHandler type in events.go:109.
/// </summary>
public delegate void SystemMessageHandler(
Subscription? sub,
INatsClient? client,
Account? account,
string subject,
string? reply,
ReadOnlyMemory<byte> headers,
ReadOnlyMemory<byte> message);

View File

@@ -0,0 +1,270 @@
using System.Text.Json.Serialization;
namespace NATS.Server.Events;
/// <summary>
/// Server identity block embedded in all system events.
/// </summary>
public sealed class EventServerInfo
{
[JsonPropertyName("name")]
public string Name { get; set; } = string.Empty;
[JsonPropertyName("host")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Host { get; set; }
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("cluster")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Cluster { get; set; }
[JsonPropertyName("domain")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Domain { get; set; }
[JsonPropertyName("ver")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Version { get; set; }
[JsonPropertyName("seq")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public ulong Seq { get; set; }
[JsonPropertyName("tags")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public Dictionary<string, string>? Tags { get; set; }
}
/// <summary>
/// Client identity block for connect/disconnect events.
/// </summary>
public sealed class EventClientInfo
{
[JsonPropertyName("start")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTime Start { get; set; }
[JsonPropertyName("stop")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTime Stop { get; set; }
[JsonPropertyName("host")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Host { get; set; }
[JsonPropertyName("id")]
public ulong Id { get; set; }
[JsonPropertyName("acc")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Account { get; set; }
[JsonPropertyName("name")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Name { get; set; }
[JsonPropertyName("lang")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Lang { get; set; }
[JsonPropertyName("ver")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
public string? Version { get; set; }
[JsonPropertyName("rtt")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public long RttNanos { get; set; }
}
public sealed class DataStats
{
[JsonPropertyName("msgs")]
public long Msgs { get; set; }
[JsonPropertyName("bytes")]
public long Bytes { get; set; }
}
/// <summary>Client connect advisory. Go events.go:155-160.</summary>
public sealed class ConnectEventMsg
{
public const string EventType = "io.nats.server.advisory.v1.client_connect";
[JsonPropertyName("type")]
public string Type { get; set; } = EventType;
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("timestamp")]
public DateTime Time { get; set; }
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("client")]
public EventClientInfo Client { get; set; } = new();
}
/// <summary>Client disconnect advisory. Go events.go:167-174.</summary>
public sealed class DisconnectEventMsg
{
public const string EventType = "io.nats.server.advisory.v1.client_disconnect";
[JsonPropertyName("type")]
public string Type { get; set; } = EventType;
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("timestamp")]
public DateTime Time { get; set; }
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("client")]
public EventClientInfo Client { get; set; } = new();
[JsonPropertyName("sent")]
public DataStats Sent { get; set; } = new();
[JsonPropertyName("received")]
public DataStats Received { get; set; } = new();
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
}
/// <summary>Account connection count heartbeat. Go events.go:210-214.</summary>
public sealed class AccountNumConns
{
public const string EventType = "io.nats.server.advisory.v1.account_connections";
[JsonPropertyName("type")]
public string Type { get; set; } = EventType;
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("timestamp")]
public DateTime Time { get; set; }
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("acc")]
public string AccountName { get; set; } = string.Empty;
[JsonPropertyName("conns")]
public int Connections { get; set; }
[JsonPropertyName("total_conns")]
public long TotalConnections { get; set; }
[JsonPropertyName("subs")]
public int Subscriptions { get; set; }
[JsonPropertyName("sent")]
public DataStats Sent { get; set; } = new();
[JsonPropertyName("received")]
public DataStats Received { get; set; } = new();
}
/// <summary>Server stats broadcast. Go events.go:150-153.</summary>
public sealed class ServerStatsMsg
{
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("statsz")]
public ServerStatsData Stats { get; set; } = new();
}
public sealed class ServerStatsData
{
[JsonPropertyName("start")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public DateTime Start { get; set; }
[JsonPropertyName("mem")]
public long Mem { get; set; }
[JsonPropertyName("cores")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public int Cores { get; set; }
[JsonPropertyName("connections")]
public int Connections { get; set; }
[JsonPropertyName("total_connections")]
public long TotalConnections { get; set; }
[JsonPropertyName("active_accounts")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public int ActiveAccounts { get; set; }
[JsonPropertyName("subscriptions")]
public long Subscriptions { get; set; }
[JsonPropertyName("in_msgs")]
public long InMsgs { get; set; }
[JsonPropertyName("out_msgs")]
public long OutMsgs { get; set; }
[JsonPropertyName("in_bytes")]
public long InBytes { get; set; }
[JsonPropertyName("out_bytes")]
public long OutBytes { get; set; }
[JsonPropertyName("slow_consumers")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public long SlowConsumers { get; set; }
}
/// <summary>Server shutdown notification.</summary>
public sealed class ShutdownEventMsg
{
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
}
/// <summary>Lame duck mode notification.</summary>
public sealed class LameDuckEventMsg
{
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
}
/// <summary>Auth error advisory.</summary>
public sealed class AuthErrorEventMsg
{
public const string EventType = "io.nats.server.advisory.v1.client_auth";
[JsonPropertyName("type")]
public string Type { get; set; } = EventType;
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;
[JsonPropertyName("timestamp")]
public DateTime Time { get; set; }
[JsonPropertyName("server")]
public EventServerInfo Server { get; set; } = new();
[JsonPropertyName("client")]
public EventClientInfo Client { get; set; } = new();
[JsonPropertyName("reason")]
public string Reason { get; set; } = string.Empty;
}

View File

@@ -0,0 +1,333 @@
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Events;
/// <summary>
/// Internal publish message queued for the send loop.
/// </summary>
public sealed class PublishMessage
{
public InternalClient? Client { get; init; }
public required string Subject { get; init; }
public string? Reply { get; init; }
public byte[]? Headers { get; init; }
public object? Body { get; init; }
public bool Echo { get; init; }
public bool IsLast { get; init; }
}
/// <summary>
/// Internal received message queued for the receive loop.
/// </summary>
public sealed class InternalSystemMessage
{
public required Subscription? Sub { get; init; }
public required INatsClient? Client { get; init; }
public required Account? Account { get; init; }
public required string Subject { get; init; }
public required string? Reply { get; init; }
public required ReadOnlyMemory<byte> Headers { get; init; }
public required ReadOnlyMemory<byte> Message { get; init; }
public required SystemMessageHandler Callback { get; init; }
}
/// <summary>
/// Manages the server's internal event system with Channel-based send/receive loops.
/// Maps to Go's internal struct in events.go:124-147 and the goroutines
/// internalSendLoop (events.go:495) and internalReceiveLoop (events.go:476).
/// </summary>
public sealed class InternalEventSystem : IAsyncDisposable
{
private readonly ILogger _logger;
private readonly Channel<PublishMessage> _sendQueue;
private readonly Channel<InternalSystemMessage> _receiveQueue;
private readonly Channel<InternalSystemMessage> _receiveQueuePings;
private readonly CancellationTokenSource _cts = new();
private Task? _sendLoop;
private Task? _receiveLoop;
private Task? _receiveLoopPings;
private NatsServer? _server;
private ulong _sequence;
private int _subscriptionId;
private readonly ConcurrentDictionary<string, SystemMessageHandler> _callbacks = new();
public Account SystemAccount { get; }
public InternalClient SystemClient { get; }
public string ServerHash { get; }
public InternalEventSystem(Account systemAccount, InternalClient systemClient, string serverName, ILogger logger)
{
_logger = logger;
SystemAccount = systemAccount;
SystemClient = systemClient;
// Hash server name for inbox routing (matches Go's shash)
ServerHash = Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(serverName)))[..8].ToLowerInvariant();
_sendQueue = Channel.CreateUnbounded<PublishMessage>(new UnboundedChannelOptions { SingleReader = true });
_receiveQueue = Channel.CreateUnbounded<InternalSystemMessage>(new UnboundedChannelOptions { SingleReader = true });
_receiveQueuePings = Channel.CreateUnbounded<InternalSystemMessage>(new UnboundedChannelOptions { SingleReader = true });
}
public void Start(NatsServer server)
{
_server = server;
var ct = _cts.Token;
_sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct);
_receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct);
_receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct);
// Periodic stats publish every 10 seconds
_ = Task.Run(async () =>
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10));
while (await timer.WaitForNextTickAsync(ct))
{
PublishServerStats();
}
}, ct);
}
/// <summary>
/// Registers system request-reply monitoring services for this server.
/// Maps to Go's initEventTracking in events.go.
/// Sets up handlers for $SYS.REQ.SERVER.{id}.VARZ, HEALTHZ, SUBSZ, STATSZ, IDZ
/// and wildcard $SYS.REQ.SERVER.PING.* subjects.
/// </summary>
public void InitEventTracking(NatsServer server)
{
_server = server;
var serverId = server.ServerId;
// Server-specific monitoring services
RegisterService(serverId, "VARZ", server.HandleVarzRequest);
RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest);
RegisterService(serverId, "SUBSZ", server.HandleSubszRequest);
RegisterService(serverId, "STATSZ", server.HandleStatszRequest);
RegisterService(serverId, "IDZ", server.HandleIdzRequest);
// Wildcard ping services (all servers respond)
SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest));
}
private void RegisterService(string serverId, string name, Action<string, string?> handler)
{
var subject = string.Format(EventSubjects.ServerReq, serverId, name);
SysSubscribe(subject, WrapRequestHandler(handler));
}
private SystemMessageHandler WrapRequestHandler(Action<string, string?> handler)
{
return (sub, client, acc, subject, reply, hdr, msg) =>
{
handler(subject, reply);
};
}
/// <summary>
/// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics.
/// Maps to Go's sendStatsz in events.go.
/// Can be called manually for testing or is invoked periodically by the stats timer.
/// </summary>
public void PublishServerStats()
{
if (_server == null) return;
var subject = string.Format(EventSubjects.ServerStats, _server.ServerId);
var process = System.Diagnostics.Process.GetCurrentProcess();
var statsMsg = new ServerStatsMsg
{
Server = _server.BuildEventServerInfo(),
Stats = new ServerStatsData
{
Start = _server.StartTime,
Mem = process.WorkingSet64,
Cores = Environment.ProcessorCount,
Connections = _server.ClientCount,
TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections),
Subscriptions = SystemAccount.SubList.Count,
InMsgs = Interlocked.Read(ref _server.Stats.InMsgs),
OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs),
InBytes = Interlocked.Read(ref _server.Stats.InBytes),
OutBytes = Interlocked.Read(ref _server.Stats.OutBytes),
SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers),
},
};
Enqueue(new PublishMessage { Subject = subject, Body = statsMsg });
}
/// <summary>
/// Creates a system subscription in the system account's SubList.
/// Maps to Go's sysSubscribe in events.go:2796.
/// </summary>
public Subscription SysSubscribe(string subject, SystemMessageHandler callback)
{
var sid = Interlocked.Increment(ref _subscriptionId).ToString();
var sub = new Subscription
{
Subject = subject,
Sid = sid,
Client = SystemClient,
};
// Store callback keyed by SID so multiple subscriptions work
_callbacks[sid] = callback;
// Set a single routing callback on the system client that dispatches by SID
SystemClient.MessageCallback = (subj, s, reply, hdr, msg) =>
{
if (_callbacks.TryGetValue(s, out var cb))
{
_receiveQueue.Writer.TryWrite(new InternalSystemMessage
{
Sub = sub,
Client = SystemClient,
Account = SystemAccount,
Subject = subj,
Reply = reply,
Headers = hdr,
Message = msg,
Callback = cb,
});
}
};
SystemAccount.SubList.Insert(sub);
return sub;
}
/// <summary>
/// Returns the next monotonically increasing sequence number for event ordering.
/// </summary>
public ulong NextSequence() => Interlocked.Increment(ref _sequence);
/// <summary>
/// Enqueue an internal message for publishing through the send loop.
/// </summary>
public void Enqueue(PublishMessage message)
{
_sendQueue.Writer.TryWrite(message);
}
/// <summary>
/// The send loop: serializes messages and delivers them via the server's routing.
/// Maps to Go's internalSendLoop in events.go:495-668.
/// </summary>
private async Task InternalSendLoopAsync(CancellationToken ct)
{
try
{
await foreach (var pm in _sendQueue.Reader.ReadAllAsync(ct))
{
try
{
var seq = Interlocked.Increment(ref _sequence);
// Serialize body to JSON
byte[] payload;
if (pm.Body is byte[] raw)
{
payload = raw;
}
else if (pm.Body != null)
{
// Try source-generated context first, fall back to reflection-based for unknown types
var bodyType = pm.Body.GetType();
var typeInfo = EventJsonContext.Default.GetTypeInfo(bodyType);
payload = typeInfo != null
? JsonSerializer.SerializeToUtf8Bytes(pm.Body, typeInfo)
: JsonSerializer.SerializeToUtf8Bytes(pm.Body, bodyType);
}
else
{
payload = [];
}
// Deliver via the system account's SubList matching
var result = SystemAccount.SubList.Match(pm.Subject);
foreach (var sub in result.PlainSubs)
{
sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply,
pm.Headers ?? ReadOnlyMemory<byte>.Empty,
payload);
}
foreach (var queueGroup in result.QueueSubs)
{
if (queueGroup.Length == 0) continue;
var sub = queueGroup[0]; // Simple pick for internal
sub.Client?.SendMessage(pm.Subject, sub.Sid, pm.Reply,
pm.Headers ?? ReadOnlyMemory<byte>.Empty,
payload);
}
if (pm.IsLast)
break;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error in internal send loop processing message on {Subject}", pm.Subject);
}
}
}
catch (OperationCanceledException)
{
// Normal shutdown
}
}
/// <summary>
/// The receive loop: dispatches callbacks for internally-received messages.
/// Maps to Go's internalReceiveLoop in events.go:476-491.
/// </summary>
private async Task InternalReceiveLoopAsync(Channel<InternalSystemMessage> queue, CancellationToken ct)
{
try
{
await foreach (var msg in queue.Reader.ReadAllAsync(ct))
{
try
{
msg.Callback(msg.Sub, msg.Client, msg.Account, msg.Subject, msg.Reply, msg.Headers, msg.Message);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error in internal receive loop processing {Subject}", msg.Subject);
}
}
}
catch (OperationCanceledException)
{
// Normal shutdown
}
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
_sendQueue.Writer.TryComplete();
_receiveQueue.Writer.TryComplete();
_receiveQueuePings.Writer.TryComplete();
if (_sendLoop != null) await _sendLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
if (_receiveLoop != null) await _receiveLoop.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
if (_receiveLoopPings != null) await _receiveLoopPings.WaitAsync(TimeSpan.FromSeconds(2)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
_cts.Dispose();
}
}

View File

@@ -0,0 +1,19 @@
using NATS.Server.Auth;
using NATS.Server.Protocol;
namespace NATS.Server;
public interface INatsClient
{
ulong Id { get; }
ClientKind Kind { get; }
bool IsInternal => Kind.IsInternal();
Account? Account { get; }
ClientOptions? ClientOpts { get; }
ClientPermissions? Permissions { get; }
void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload);
bool QueueOutbound(ReadOnlyMemory<byte> data);
void RemoveSubscription(string sid);
}

View File

@@ -0,0 +1,25 @@
using NATS.Server.Auth;
namespace NATS.Server.Imports;
public sealed class ExportAuth
{
public bool TokenRequired { get; init; }
public uint AccountPosition { get; init; }
public HashSet<string>? ApprovedAccounts { get; init; }
public Dictionary<string, long>? RevokedAccounts { get; init; }
public bool IsAuthorized(Account account)
{
if (RevokedAccounts != null && RevokedAccounts.ContainsKey(account.Name))
return false;
if (ApprovedAccounts == null && !TokenRequired && AccountPosition == 0)
return true;
if (ApprovedAccounts != null)
return ApprovedAccounts.Contains(account.Name);
return false;
}
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Imports;
public sealed class ExportMap
{
public Dictionary<string, StreamExport> Streams { get; } = new(StringComparer.Ordinal);
public Dictionary<string, ServiceExport> Services { get; } = new(StringComparer.Ordinal);
public Dictionary<string, ServiceImport> Responses { get; } = new(StringComparer.Ordinal);
}

View File

@@ -0,0 +1,18 @@
namespace NATS.Server.Imports;
public sealed class ImportMap
{
public List<StreamImport> Streams { get; } = [];
public Dictionary<string, List<ServiceImport>> Services { get; } = new(StringComparer.Ordinal);
public void AddServiceImport(ServiceImport si)
{
if (!Services.TryGetValue(si.From, out var list))
{
list = [];
Services[si.From] = list;
}
list.Add(si);
}
}

View File

@@ -0,0 +1,47 @@
using System.Text.Json.Serialization;
namespace NATS.Server.Imports;
public sealed class ServiceLatencyMsg
{
[JsonPropertyName("type")]
public string Type { get; set; } = "io.nats.server.metric.v1.service_latency";
[JsonPropertyName("requestor")]
public string Requestor { get; set; } = string.Empty;
[JsonPropertyName("responder")]
public string Responder { get; set; } = string.Empty;
[JsonPropertyName("status")]
public int Status { get; set; } = 200;
[JsonPropertyName("svc_latency")]
public long ServiceLatencyNanos { get; set; }
[JsonPropertyName("total_latency")]
public long TotalLatencyNanos { get; set; }
}
public static class LatencyTracker
{
public static bool ShouldSample(ServiceLatency latency)
{
if (latency.SamplingPercentage <= 0) return false;
if (latency.SamplingPercentage >= 100) return true;
return Random.Shared.Next(100) < latency.SamplingPercentage;
}
public static ServiceLatencyMsg BuildLatencyMsg(
string requestor, string responder,
TimeSpan serviceLatency, TimeSpan totalLatency)
{
return new ServiceLatencyMsg
{
Requestor = requestor,
Responder = responder,
ServiceLatencyNanos = serviceLatency.Ticks * 100,
TotalLatencyNanos = totalLatency.Ticks * 100,
};
}
}

View File

@@ -0,0 +1,64 @@
using System.Security.Cryptography;
using NATS.Server.Auth;
namespace NATS.Server.Imports;
/// <summary>
/// Handles response routing for service imports.
/// Maps to Go's service reply prefix generation and response cleanup.
/// Reference: golang/nats-server/server/accounts.go — addRespServiceImport, removeRespServiceImport
/// </summary>
public static class ResponseRouter
{
private static readonly char[] Base62 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".ToCharArray();
/// <summary>
/// Generates a unique reply prefix for response routing.
/// Format: "_R_.{10 random base62 chars}."
/// </summary>
public static string GenerateReplyPrefix()
{
Span<byte> bytes = stackalloc byte[10];
RandomNumberGenerator.Fill(bytes);
var chars = new char[10];
for (int i = 0; i < 10; i++)
chars[i] = Base62[bytes[i] % 62];
return $"_R_.{new string(chars)}.";
}
/// <summary>
/// Creates a response service import that maps the generated reply prefix
/// back to the original reply subject on the requesting account.
/// </summary>
public static ServiceImport CreateResponseImport(
Account exporterAccount,
ServiceImport originalImport,
string originalReply)
{
var replyPrefix = GenerateReplyPrefix();
var responseSi = new ServiceImport
{
DestinationAccount = exporterAccount,
From = replyPrefix + ">",
To = originalReply,
IsResponse = true,
ResponseType = originalImport.ResponseType,
Export = originalImport.Export,
TimestampTicks = DateTime.UtcNow.Ticks,
};
exporterAccount.Exports.Responses[replyPrefix] = responseSi;
return responseSi;
}
/// <summary>
/// Removes a response import from the account's export map.
/// For Singleton responses, this is called after the first reply is delivered.
/// For Streamed/Chunked, it is called when the response stream ends.
/// </summary>
public static void CleanupResponse(Account account, string replyPrefix, ServiceImport responseSi)
{
account.Exports.Responses.Remove(replyPrefix);
}
}

View File

@@ -0,0 +1,13 @@
using NATS.Server.Auth;
namespace NATS.Server.Imports;
public sealed class ServiceExport
{
public ExportAuth Auth { get; init; } = new();
public Account? Account { get; init; }
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
public TimeSpan ResponseThreshold { get; init; } = TimeSpan.FromMinutes(2);
public ServiceLatency? Latency { get; init; }
public bool AllowTrace { get; init; }
}

View File

@@ -0,0 +1,21 @@
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Imports;
public sealed class ServiceImport
{
public required Account DestinationAccount { get; init; }
public required string From { get; init; }
public required string To { get; init; }
public SubjectTransform? Transform { get; init; }
public ServiceExport? Export { get; init; }
public ServiceResponseType ResponseType { get; init; } = ServiceResponseType.Singleton;
public byte[]? Sid { get; set; }
public bool IsResponse { get; init; }
public bool UsePub { get; init; }
public bool Invalid { get; set; }
public bool Share { get; init; }
public bool Tracking { get; init; }
public long TimestampTicks { get; set; }
}

View File

@@ -0,0 +1,7 @@
namespace NATS.Server.Imports;
public sealed class ServiceLatency
{
public int SamplingPercentage { get; init; } = 100;
public string Subject { get; init; } = string.Empty;
}

View File

@@ -0,0 +1,8 @@
namespace NATS.Server.Imports;
public enum ServiceResponseType
{
Singleton,
Streamed,
Chunked,
}

View File

@@ -0,0 +1,6 @@
namespace NATS.Server.Imports;
public sealed class StreamExport
{
public ExportAuth Auth { get; init; } = new();
}

View File

@@ -0,0 +1,14 @@
using NATS.Server.Auth;
using NATS.Server.Subscriptions;
namespace NATS.Server.Imports;
public sealed class StreamImport
{
public required Account SourceAccount { get; init; }
public required string From { get; init; }
public required string To { get; init; }
public SubjectTransform? Transform { get; init; }
public bool UsePub { get; init; }
public bool Invalid { get; set; }
}

View File

@@ -0,0 +1,59 @@
using NATS.Server.Auth;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
namespace NATS.Server;
/// <summary>
/// Lightweight socketless client for internal messaging (SYSTEM, ACCOUNT, JETSTREAM).
/// Maps to Go's internal client created by createInternalClient() in server.go:1910-1936.
/// No network I/O — messages are delivered via callback.
/// </summary>
public sealed class InternalClient : INatsClient
{
public ulong Id { get; }
public ClientKind Kind { get; }
public bool IsInternal => Kind.IsInternal();
public Account? Account { get; }
public ClientOptions? ClientOpts => null;
public ClientPermissions? Permissions => null;
/// <summary>
/// Callback invoked when a message is delivered to this internal client.
/// Set by the event system or account import infrastructure.
/// </summary>
public Action<string, string, string?, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>? MessageCallback { get; set; }
private readonly Dictionary<string, Subscription> _subs = new(StringComparer.Ordinal);
public InternalClient(ulong id, ClientKind kind, Account account)
{
if (!kind.IsInternal())
throw new ArgumentException($"InternalClient requires an internal ClientKind, got {kind}", nameof(kind));
Id = id;
Kind = kind;
Account = account;
}
public void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
MessageCallback?.Invoke(subject, sid, replyTo, headers, payload);
}
public bool QueueOutbound(ReadOnlyMemory<byte> data) => true; // no-op for internal clients
public void RemoveSubscription(string sid)
{
if (_subs.Remove(sid))
Account?.DecrementSubscriptions();
}
public void AddSubscription(Subscription sub)
{
_subs[sub.Sid] = sub;
}
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
}

View File

@@ -14,12 +14,16 @@ public sealed class SubszHandler(NatsServer server)
var opts = ParseQueryParams(ctx);
var now = DateTime.UtcNow;
// Collect subscriptions from all accounts (or filtered)
// Collect subscriptions from all accounts (or filtered).
// Exclude the $SYS system account unless explicitly requested — its internal
// subscriptions are infrastructure and not user-facing.
var allSubs = new List<Subscription>();
foreach (var account in server.GetAccounts())
{
if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account)
continue;
if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS")
continue;
allSubs.AddRange(account.SubList.GetAllSubscriptions());
}
@@ -31,10 +35,10 @@ public sealed class SubszHandler(NatsServer server)
var total = allSubs.Count;
var numSubs = server.GetAccounts()
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Aggregate(0u, (sum, a) => sum + a.SubList.Count);
var numCache = server.GetAccounts()
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Sum(a => a.SubList.CacheCount);
SubDetail[] details = [];

View File

@@ -1,4 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<InternalsVisibleTo Include="NATS.Server.Tests" />
</ItemGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="NATS.NKeys" />

View File

@@ -19,6 +19,8 @@ public interface IMessageRouter
void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory<byte> headers,
ReadOnlyMemory<byte> payload, NatsClient sender);
void RemoveClient(NatsClient client);
void PublishConnectEvent(NatsClient client);
void PublishDisconnectEvent(NatsClient client);
}
public interface ISubListAccess
@@ -26,7 +28,7 @@ public interface ISubListAccess
SubList SubList { get; }
}
public sealed class NatsClient : IDisposable
public sealed class NatsClient : INatsClient, IDisposable
{
private readonly Socket _socket;
private readonly Stream _stream;
@@ -45,6 +47,7 @@ public sealed class NatsClient : IDisposable
private readonly ServerStats _serverStats;
public ulong Id { get; }
public ClientKind Kind => ClientKind.Client;
public ClientOptions? ClientOpts { get; private set; }
public IMessageRouter? Router { get; set; }
public Account? Account { get; private set; }
@@ -444,6 +447,9 @@ public sealed class NatsClient : IDisposable
_flags.SetFlag(ClientFlags.ConnectProcessFinished);
_logger.LogDebug("CONNECT received from client {ClientId}, name={ClientName}", Id, ClientOpts?.Name);
// Publish connect advisory to the system event bus
Router?.PublishConnectEvent(this);
// Start auth expiry timer if needed
if (_authService.IsAuthRequired && authResult?.Expiry is { } expiry)
{

View File

@@ -9,6 +9,8 @@ using Microsoft.Extensions.Logging;
using NATS.NKeys;
using NATS.Server.Auth;
using NATS.Server.Configuration;
using NATS.Server.Events;
using NATS.Server.Imports;
using NATS.Server.Monitoring;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
@@ -35,6 +37,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private string? _configDigest;
private readonly Account _globalAccount;
private readonly Account _systemAccount;
private InternalEventSystem? _eventSystem;
private readonly SslServerAuthenticationOptions? _sslOptions;
private readonly TlsRateLimiter? _tlsRateLimiter;
private readonly SubjectTransform[] _subjectTransforms;
@@ -70,6 +73,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public int Port => _options.Port;
public Account SystemAccount => _systemAccount;
public string ServerNKey { get; }
public InternalEventSystem? EventSystem => _eventSystem;
public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0;
public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0;
public Action? ReOpenLogFile { get; set; }
@@ -90,6 +94,21 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_logger.LogInformation("Initiating Shutdown...");
// Publish shutdown advisory before tearing down the event system
if (_eventSystem != null)
{
var shutdownSubject = string.Format(EventSubjects.ServerShutdown, _serverInfo.ServerId);
_eventSystem.Enqueue(new PublishMessage
{
Subject = shutdownSubject,
Body = new ShutdownEventMsg { Server = BuildEventServerInfo(), Reason = "Server Shutdown" },
IsLast = true,
});
// Give the send loop time to process the shutdown event
await Task.Delay(100);
await _eventSystem.DisposeAsync();
}
// Signal all internal loops to stop
await _quitCts.CancelAsync();
@@ -265,6 +284,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_systemAccount = new Account("$SYS");
_accounts["$SYS"] = _systemAccount;
// Create system internal client and event system
var sysClientId = Interlocked.Increment(ref _nextClientId);
var sysClient = new InternalClient(sysClientId, ClientKind.System, _systemAccount);
_eventSystem = new InternalEventSystem(
_systemAccount, sysClient,
options.ServerName ?? $"nats-dotnet-{Environment.MachineName}",
_loggerFactory.CreateLogger<InternalEventSystem>());
// Generate Ed25519 server NKey identity
using var serverKeyPair = KeyPair.CreatePair(PrefixByte.Server);
ServerNKey = serverKeyPair.GetPublicKey();
@@ -371,6 +398,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_listeningStarted.TrySetResult();
_eventSystem?.Start(this);
_eventSystem?.InitEventTracking(this);
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
// Warn about stub features
@@ -602,6 +632,27 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
// Check for service imports that match this subject.
// When a client in the importer account publishes to a subject
// that matches a service import "From" pattern, we forward the
// message to the destination (exporter) account's subscribers
// using the mapped "To" subject.
if (sender.Account != null)
{
foreach (var kvp in sender.Account.Imports.Services)
{
foreach (var si in kvp.Value)
{
if (si.Invalid) continue;
if (SubjectMatch.MatchLiteral(subject, si.From))
{
ProcessServiceImport(si, subject, replyTo, headers, payload);
delivered = true;
}
}
}
}
// No-responders: if nobody received the message and the publisher
// opted in, send back a 503 status HMSG on the reply subject.
if (!delivered && replyTo != null && sender.ClientOpts?.NoResponders == true)
@@ -641,6 +692,153 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
}
/// <summary>
/// Processes a service import by transforming the subject from the importer's
/// subject space to the exporter's subject space, then delivering to matching
/// subscribers in the destination account.
/// Reference: Go server/accounts.go addServiceImport / processServiceImport.
/// </summary>
public void ProcessServiceImport(ServiceImport si, string subject, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
if (si.Invalid) return;
// Transform subject: map from importer subject space to exporter subject space
string targetSubject;
if (si.Transform != null)
{
var transformed = si.Transform.Apply(subject);
targetSubject = transformed ?? si.To;
}
else if (si.UsePub)
{
targetSubject = subject;
}
else
{
// Default: use the "To" subject from the import definition.
// For wildcard imports (e.g. "requests.>" -> "api.>"), we need
// to map the specific subject tokens from the source pattern to
// the destination pattern.
targetSubject = MapImportSubject(subject, si.From, si.To);
}
// Match against destination account's SubList
var destSubList = si.DestinationAccount.SubList;
var result = destSubList.Match(targetSubject);
// Deliver to plain subscribers in the destination account
foreach (var sub in result.PlainSubs)
{
if (sub.Client == null) continue;
DeliverMessage(sub, targetSubject, replyTo, headers, payload);
}
// Deliver to one member of each queue group
foreach (var queueGroup in result.QueueSubs)
{
if (queueGroup.Length == 0) continue;
var sub = queueGroup[0]; // Simple selection: first available
if (sub.Client != null)
DeliverMessage(sub, targetSubject, replyTo, headers, payload);
}
}
/// <summary>
/// Maps a published subject from the import "From" pattern to the "To" pattern.
/// For example, if From="requests.>" and To="api.>" and subject="requests.test",
/// this returns "api.test".
/// </summary>
private static string MapImportSubject(string subject, string fromPattern, string toPattern)
{
// If "To" doesn't contain wildcards, use it directly
if (SubjectMatch.IsLiteral(toPattern))
return toPattern;
// For wildcard patterns, replace matching wildcard segments.
// Split into tokens and map from source to destination.
var subTokens = subject.Split('.');
var fromTokens = fromPattern.Split('.');
var toTokens = toPattern.Split('.');
var result = new string[toTokens.Length];
int subIdx = 0;
// Build a mapping: for each wildcard position in "from",
// capture the corresponding subject token(s)
var wildcardValues = new List<string>();
string? fwcValue = null;
for (int i = 0; i < fromTokens.Length && subIdx < subTokens.Length; i++)
{
if (fromTokens[i] == "*")
{
wildcardValues.Add(subTokens[subIdx]);
subIdx++;
}
else if (fromTokens[i] == ">")
{
// Capture all remaining tokens
fwcValue = string.Join(".", subTokens[subIdx..]);
subIdx = subTokens.Length;
}
else
{
subIdx++; // Skip literal match
}
}
// Now build the output using the "to" pattern
int wcIdx = 0;
var sb = new StringBuilder();
for (int i = 0; i < toTokens.Length; i++)
{
if (i > 0) sb.Append('.');
if (toTokens[i] == "*")
{
sb.Append(wcIdx < wildcardValues.Count ? wildcardValues[wcIdx] : "*");
wcIdx++;
}
else if (toTokens[i] == ">")
{
sb.Append(fwcValue ?? ">");
}
else
{
sb.Append(toTokens[i]);
}
}
return sb.ToString();
}
/// <summary>
/// Wires service import subscriptions for an account. Creates marker
/// subscriptions in the account's SubList so that the import paths
/// are tracked. The actual forwarding happens in ProcessMessage when
/// it checks the account's Imports.Services.
/// Reference: Go server/accounts.go addServiceImportSub.
/// </summary>
public void WireServiceImports(Account account)
{
foreach (var kvp in account.Imports.Services)
{
foreach (var si in kvp.Value)
{
if (si.Invalid) continue;
// Create a marker subscription in the importer account.
// This subscription doesn't directly deliver messages;
// the ProcessMessage method checks service imports after
// the regular SubList match.
_logger.LogDebug(
"Wired service import for account {Account}: {From} -> {To} (dest: {DestAccount})",
account.Name, si.From, si.To, si.DestinationAccount.Name);
}
}
}
private static void SendNoResponders(NatsClient sender, string replyTo)
{
// Find the sid for a subscription matching the reply subject
@@ -686,8 +884,194 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
});
}
public void SendInternalMsg(string subject, string? reply, object? msg)
{
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Reply = reply, Body = msg });
}
public void SendInternalAccountMsg(Account account, string subject, object? msg)
{
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.VARZ requests.
/// Returns core server information including stats counters.
/// </summary>
public void HandleVarzRequest(string subject, string? reply)
{
if (reply == null) return;
var varz = new
{
server_id = _serverInfo.ServerId,
server_name = _serverInfo.ServerName,
version = NatsProtocol.Version,
host = _options.Host,
port = _options.Port,
max_payload = _options.MaxPayload,
connections = ClientCount,
total_connections = Interlocked.Read(ref _stats.TotalConnections),
in_msgs = Interlocked.Read(ref _stats.InMsgs),
out_msgs = Interlocked.Read(ref _stats.OutMsgs),
in_bytes = Interlocked.Read(ref _stats.InBytes),
out_bytes = Interlocked.Read(ref _stats.OutBytes),
};
SendInternalMsg(reply, null, varz);
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.HEALTHZ requests.
/// Returns a simple health status response.
/// </summary>
public void HandleHealthzRequest(string subject, string? reply)
{
if (reply == null) return;
SendInternalMsg(reply, null, new { status = "ok" });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.SUBSZ requests.
/// Returns the current subscription count.
/// </summary>
public void HandleSubszRequest(string subject, string? reply)
{
if (reply == null) return;
SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.STATSZ requests.
/// Publishes current server statistics through the event system.
/// </summary>
public void HandleStatszRequest(string subject, string? reply)
{
if (reply == null) return;
var process = System.Diagnostics.Process.GetCurrentProcess();
var statsMsg = new Events.ServerStatsMsg
{
Server = BuildEventServerInfo(),
Stats = new Events.ServerStatsData
{
Start = StartTime,
Mem = process.WorkingSet64,
Cores = Environment.ProcessorCount,
Connections = ClientCount,
TotalConnections = Interlocked.Read(ref _stats.TotalConnections),
Subscriptions = SubList.Count,
InMsgs = Interlocked.Read(ref _stats.InMsgs),
OutMsgs = Interlocked.Read(ref _stats.OutMsgs),
InBytes = Interlocked.Read(ref _stats.InBytes),
OutBytes = Interlocked.Read(ref _stats.OutBytes),
SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers),
},
};
SendInternalMsg(reply, null, statsMsg);
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.IDZ requests.
/// Returns basic server identity information.
/// </summary>
public void HandleIdzRequest(string subject, string? reply)
{
if (reply == null) return;
var idz = new
{
server_id = _serverInfo.ServerId,
server_name = _serverInfo.ServerName,
version = NatsProtocol.Version,
host = _options.Host,
port = _options.Port,
};
SendInternalMsg(reply, null, idz);
}
/// <summary>
/// Builds an EventServerInfo block for embedding in system event messages.
/// Maps to Go's serverInfo() helper used in events.go advisory publishing.
/// </summary>
public EventServerInfo BuildEventServerInfo()
{
var seq = _eventSystem?.NextSequence() ?? 0;
return new EventServerInfo
{
Name = _serverInfo.ServerName,
Host = _options.Host,
Id = _serverInfo.ServerId,
Version = NatsProtocol.Version,
Seq = seq,
};
}
private static EventClientInfo BuildEventClientInfo(NatsClient client)
{
return new EventClientInfo
{
Id = client.Id,
Host = client.RemoteIp,
Account = client.Account?.Name,
Name = client.ClientOpts?.Name,
Lang = client.ClientOpts?.Lang,
Version = client.ClientOpts?.Version,
Start = client.StartTime,
};
}
/// <summary>
/// Publishes a $SYS.ACCOUNT.{account}.CONNECT advisory when a client
/// completes authentication. Maps to Go's sendConnectEvent in events.go.
/// </summary>
public void PublishConnectEvent(NatsClient client)
{
if (_eventSystem == null) return;
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
var subject = string.Format(EventSubjects.ConnectEvent, accountName);
var evt = new ConnectEventMsg
{
Id = Guid.NewGuid().ToString("N"),
Time = DateTime.UtcNow,
Server = BuildEventServerInfo(),
Client = BuildEventClientInfo(client),
};
SendInternalMsg(subject, null, evt);
}
/// <summary>
/// Publishes a $SYS.ACCOUNT.{account}.DISCONNECT advisory when a client
/// disconnects. Maps to Go's sendDisconnectEvent in events.go.
/// </summary>
public void PublishDisconnectEvent(NatsClient client)
{
if (_eventSystem == null) return;
var accountName = client.Account?.Name ?? Account.GlobalAccountName;
var subject = string.Format(EventSubjects.DisconnectEvent, accountName);
var evt = new DisconnectEventMsg
{
Id = Guid.NewGuid().ToString("N"),
Time = DateTime.UtcNow,
Server = BuildEventServerInfo(),
Client = BuildEventClientInfo(client),
Sent = new DataStats
{
Msgs = Interlocked.Read(ref client.OutMsgs),
Bytes = Interlocked.Read(ref client.OutBytes),
},
Received = new DataStats
{
Msgs = Interlocked.Read(ref client.InMsgs),
Bytes = Interlocked.Read(ref client.InBytes),
},
Reason = client.CloseReason.ToReasonString(),
};
SendInternalMsg(subject, null, evt);
}
public void RemoveClient(NatsClient client)
{
// Publish disconnect advisory before removing client state
if (client.ConnectReceived)
PublishDisconnectEvent(client);
_clients.TryRemove(client.Id, out _);
_logger.LogDebug("Removed client {ClientId}", client.Id);

View File

@@ -1,4 +1,5 @@
using NATS.Server;
using NATS.Server.Imports;
namespace NATS.Server.Subscriptions;
@@ -9,5 +10,7 @@ public sealed class Subscription
public required string Sid { get; init; }
public long MessageCount; // Interlocked
public long MaxMessages; // 0 = unlimited
public NatsClient? Client { get; set; }
public INatsClient? Client { get; set; }
public ServiceImport? ServiceImport { get; set; }
public StreamImport? StreamImport { get; set; }
}

View File

@@ -0,0 +1,121 @@
using System.Text.Json;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Events;
namespace NATS.Server.Tests;
public class EventSystemTests
{
[Fact]
public void ConnectEventMsg_serializes_with_correct_type()
{
var evt = new ConnectEventMsg
{
Type = ConnectEventMsg.EventType,
Id = "test123",
Time = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc),
Server = new EventServerInfo { Name = "test-server", Id = "SRV1" },
Client = new EventClientInfo { Id = 1, Account = "$G" },
};
var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ConnectEventMsg);
json.ShouldContain("\"type\":\"io.nats.server.advisory.v1.client_connect\"");
json.ShouldContain("\"server\":");
json.ShouldContain("\"client\":");
}
[Fact]
public void DisconnectEventMsg_serializes_with_reason()
{
var evt = new DisconnectEventMsg
{
Type = DisconnectEventMsg.EventType,
Id = "test456",
Time = DateTime.UtcNow,
Server = new EventServerInfo { Name = "test-server", Id = "SRV1" },
Client = new EventClientInfo { Id = 2, Account = "myacc" },
Reason = "Client Closed",
Sent = new DataStats { Msgs = 10, Bytes = 1024 },
Received = new DataStats { Msgs = 5, Bytes = 512 },
};
var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.DisconnectEventMsg);
json.ShouldContain("\"reason\":\"Client Closed\"");
}
[Fact]
public void ServerStatsMsg_serializes()
{
var evt = new ServerStatsMsg
{
Server = new EventServerInfo { Name = "srv1", Id = "ABC" },
Stats = new ServerStatsData
{
Connections = 10,
TotalConnections = 100,
InMsgs = 5000,
OutMsgs = 4500,
InBytes = 1_000_000,
OutBytes = 900_000,
Mem = 50 * 1024 * 1024,
Subscriptions = 42,
},
};
var json = JsonSerializer.Serialize(evt, EventJsonContext.Default.ServerStatsMsg);
json.ShouldContain("\"connections\":10");
json.ShouldContain("\"in_msgs\":5000");
}
[Fact]
public async Task InternalEventSystem_start_and_stop_lifecycle()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var eventSystem = server.EventSystem;
eventSystem.ShouldNotBeNull();
eventSystem.SystemClient.ShouldNotBeNull();
eventSystem.SystemClient.Kind.ShouldBe(ClientKind.System);
await server.ShutdownAsync();
}
[Fact]
public async Task SendInternalMsg_delivers_to_system_subscriber()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<string>();
server.EventSystem!.SysSubscribe("test.subject", (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(subject);
});
server.SendInternalMsg("test.subject", null, new { Value = "hello" });
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
result.ShouldBe("test.subject");
await server.ShutdownAsync();
}
private static NatsServer CreateTestServer()
{
var port = GetFreePort();
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}
private static int GetFreePort()
{
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}
}

View File

@@ -0,0 +1,338 @@
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
using NATS.Server.Auth;
using NATS.Server.Imports;
using NATS.Server.Subscriptions;
namespace NATS.Server.Tests;
public class ImportExportTests
{
[Fact]
public void ExportAuth_public_export_authorizes_any_account()
{
var auth = new ExportAuth();
var account = new Account("test");
auth.IsAuthorized(account).ShouldBeTrue();
}
[Fact]
public void ExportAuth_approved_accounts_restricts_access()
{
var auth = new ExportAuth { ApprovedAccounts = ["allowed"] };
var allowed = new Account("allowed");
var denied = new Account("denied");
auth.IsAuthorized(allowed).ShouldBeTrue();
auth.IsAuthorized(denied).ShouldBeFalse();
}
[Fact]
public void ExportAuth_revoked_account_denied()
{
var auth = new ExportAuth
{
ApprovedAccounts = ["test"],
RevokedAccounts = new() { ["test"] = DateTimeOffset.UtcNow.ToUnixTimeSeconds() },
};
var account = new Account("test");
auth.IsAuthorized(account).ShouldBeFalse();
}
[Fact]
public void ServiceResponseType_defaults_to_singleton()
{
var import = new ServiceImport
{
DestinationAccount = new Account("dest"),
From = "requests.>",
To = "api.>",
};
import.ResponseType.ShouldBe(ServiceResponseType.Singleton);
}
[Fact]
public void ExportMap_stores_and_retrieves_exports()
{
var map = new ExportMap();
map.Services["api.>"] = new ServiceExport { Account = new Account("svc") };
map.Streams["events.>"] = new StreamExport();
map.Services.ShouldContainKey("api.>");
map.Streams.ShouldContainKey("events.>");
}
[Fact]
public void ImportMap_stores_service_imports()
{
var map = new ImportMap();
var si = new ServiceImport
{
DestinationAccount = new Account("dest"),
From = "requests.>",
To = "api.>",
};
map.AddServiceImport(si);
map.Services.ShouldContainKey("requests.>");
map.Services["requests.>"].Count.ShouldBe(1);
}
[Fact]
public void Account_add_service_export_and_import()
{
var exporter = new Account("exporter");
var importer = new Account("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
exporter.Exports.Services.ShouldContainKey("api.>");
var si = importer.AddServiceImport(exporter, "requests.>", "api.>");
si.ShouldNotBeNull();
si.From.ShouldBe("requests.>");
si.To.ShouldBe("api.>");
si.DestinationAccount.ShouldBe(exporter);
importer.Imports.Services.ShouldContainKey("requests.>");
}
[Fact]
public void Account_add_stream_export_and_import()
{
var exporter = new Account("exporter");
var importer = new Account("importer");
exporter.AddStreamExport("events.>", null);
exporter.Exports.Streams.ShouldContainKey("events.>");
importer.AddStreamImport(exporter, "events.>", "imported.events.>");
importer.Imports.Streams.Count.ShouldBe(1);
importer.Imports.Streams[0].From.ShouldBe("events.>");
importer.Imports.Streams[0].To.ShouldBe("imported.events.>");
}
[Fact]
public void Account_service_import_auth_rejected()
{
var exporter = new Account("exporter");
var importer = new Account("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, [new Account("other")]);
Should.Throw<UnauthorizedAccessException>(() =>
importer.AddServiceImport(exporter, "requests.>", "api.>"));
}
[Fact]
public void Account_lazy_creates_internal_client()
{
var account = new Account("test");
var client = account.GetOrCreateInternalClient(99);
client.ShouldNotBeNull();
client.Kind.ShouldBe(ClientKind.Account);
client.Account.ShouldBe(account);
// Second call returns same instance
var client2 = account.GetOrCreateInternalClient(100);
client2.ShouldBeSameAs(client);
}
[Fact]
public async Task Service_import_forwards_message_to_export_account()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
// Set up exporter and importer accounts
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
importer.AddServiceImport(exporter, "requests.>", "api.>");
// Wire the import subscriptions into the importer account
server.WireServiceImports(importer);
// Subscribe in exporter account to receive forwarded message
var exportSub = new Subscription { Subject = "api.test", Sid = "export-1", Client = null };
exporter.SubList.Insert(exportSub);
// Verify import infrastructure is wired: the importer should have service import entries
importer.Imports.Services.ShouldContainKey("requests.>");
importer.Imports.Services["requests.>"].Count.ShouldBe(1);
importer.Imports.Services["requests.>"][0].DestinationAccount.ShouldBe(exporter);
await server.ShutdownAsync();
}
[Fact]
public void ProcessServiceImport_delivers_to_destination_account_subscribers()
{
using var server = CreateTestServer();
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
importer.AddServiceImport(exporter, "requests.>", "api.>");
// Add a subscriber in the exporter account's SubList
var received = new List<(string Subject, string Sid)>();
var mockClient = new TestNatsClient(1, exporter);
mockClient.OnMessage = (subject, sid, _, _, _) =>
received.Add((subject, sid));
var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient };
exporter.SubList.Insert(exportSub);
// Process a service import directly
var si = importer.Imports.Services["requests.>"][0];
server.ProcessServiceImport(si, "requests.test", null,
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
received.Count.ShouldBe(1);
received[0].Subject.ShouldBe("api.test");
received[0].Sid.ShouldBe("s1");
}
[Fact]
public void ProcessServiceImport_with_transform_applies_subject_mapping()
{
using var server = CreateTestServer();
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
var si = importer.AddServiceImport(exporter, "requests.>", "api.>");
// Create a transform from requests.> to api.>
var transform = SubjectTransform.Create("requests.>", "api.>");
transform.ShouldNotBeNull();
// Create a new import with the transform set
var siWithTransform = new ServiceImport
{
DestinationAccount = exporter,
From = "requests.>",
To = "api.>",
Transform = transform,
};
var received = new List<string>();
var mockClient = new TestNatsClient(1, exporter);
mockClient.OnMessage = (subject, _, _, _, _) =>
received.Add(subject);
var exportSub = new Subscription { Subject = "api.hello", Sid = "s1", Client = mockClient };
exporter.SubList.Insert(exportSub);
server.ProcessServiceImport(siWithTransform, "requests.hello", null,
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
received.Count.ShouldBe(1);
received[0].ShouldBe("api.hello");
}
[Fact]
public void ProcessServiceImport_skips_invalid_imports()
{
using var server = CreateTestServer();
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
importer.AddServiceImport(exporter, "requests.>", "api.>");
// Mark the import as invalid
var si = importer.Imports.Services["requests.>"][0];
si.Invalid = true;
// Add a subscriber in the exporter account
var received = new List<string>();
var mockClient = new TestNatsClient(1, exporter);
mockClient.OnMessage = (subject, _, _, _, _) =>
received.Add(subject);
var exportSub = new Subscription { Subject = "api.test", Sid = "s1", Client = mockClient };
exporter.SubList.Insert(exportSub);
// ProcessServiceImport should be a no-op for invalid imports
server.ProcessServiceImport(si, "requests.test", null,
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
received.Count.ShouldBe(0);
}
[Fact]
public void ProcessServiceImport_delivers_to_queue_groups()
{
using var server = CreateTestServer();
var exporter = server.GetOrCreateAccount("exporter");
var importer = server.GetOrCreateAccount("importer");
exporter.AddServiceExport("api.>", ServiceResponseType.Singleton, null);
importer.AddServiceImport(exporter, "requests.>", "api.>");
// Add queue group subscribers in the exporter account
var received = new List<(string Subject, string Sid)>();
var mockClient1 = new TestNatsClient(1, exporter);
mockClient1.OnMessage = (subject, sid, _, _, _) =>
received.Add((subject, sid));
var mockClient2 = new TestNatsClient(2, exporter);
mockClient2.OnMessage = (subject, sid, _, _, _) =>
received.Add((subject, sid));
var qSub1 = new Subscription { Subject = "api.test", Sid = "q1", Queue = "workers", Client = mockClient1 };
var qSub2 = new Subscription { Subject = "api.test", Sid = "q2", Queue = "workers", Client = mockClient2 };
exporter.SubList.Insert(qSub1);
exporter.SubList.Insert(qSub2);
var si = importer.Imports.Services["requests.>"][0];
server.ProcessServiceImport(si, "requests.test", null,
ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
// One member of the queue group should receive the message
received.Count.ShouldBe(1);
}
private static NatsServer CreateTestServer()
{
var port = GetFreePort();
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}
private static int GetFreePort()
{
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}
/// <summary>
/// Minimal test double for INatsClient used in import/export tests.
/// </summary>
private sealed class TestNatsClient(ulong id, Account account) : INatsClient
{
public ulong Id => id;
public ClientKind Kind => ClientKind.Client;
public Account? Account => account;
public Protocol.ClientOptions? ClientOpts => null;
public ClientPermissions? Permissions => null;
public Action<string, string, string?, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>>? OnMessage { get; set; }
public void SendMessage(string subject, string sid, string? replyTo,
ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload)
{
OnMessage?.Invoke(subject, sid, replyTo, headers, payload);
}
public bool QueueOutbound(ReadOnlyMemory<byte> data) => true;
public void RemoveSubscription(string sid) { }
}
}

View File

@@ -0,0 +1,85 @@
using NATS.Server.Auth;
namespace NATS.Server.Tests;
public class InternalClientTests
{
[Theory]
[InlineData(ClientKind.Client, false)]
[InlineData(ClientKind.Router, false)]
[InlineData(ClientKind.Gateway, false)]
[InlineData(ClientKind.Leaf, false)]
[InlineData(ClientKind.System, true)]
[InlineData(ClientKind.JetStream, true)]
[InlineData(ClientKind.Account, true)]
public void IsInternal_returns_correct_value(ClientKind kind, bool expected)
{
kind.IsInternal().ShouldBe(expected);
}
[Fact]
public void NatsClient_implements_INatsClient()
{
typeof(NatsClient).GetInterfaces().ShouldContain(typeof(INatsClient));
}
[Fact]
public void NatsClient_kind_is_Client()
{
typeof(NatsClient).GetProperty("Kind")!.PropertyType.ShouldBe(typeof(ClientKind));
}
[Fact]
public void InternalClient_system_kind()
{
var account = new Account("$SYS");
var client = new InternalClient(1, ClientKind.System, account);
client.Kind.ShouldBe(ClientKind.System);
client.IsInternal.ShouldBeTrue();
client.Id.ShouldBe(1UL);
client.Account.ShouldBe(account);
}
[Fact]
public void InternalClient_account_kind()
{
var account = new Account("myaccount");
var client = new InternalClient(2, ClientKind.Account, account);
client.Kind.ShouldBe(ClientKind.Account);
client.IsInternal.ShouldBeTrue();
}
[Fact]
public void InternalClient_rejects_non_internal_kind()
{
var account = new Account("test");
Should.Throw<ArgumentException>(() => new InternalClient(1, ClientKind.Client, account));
}
[Fact]
public void InternalClient_SendMessage_invokes_callback()
{
var account = new Account("$SYS");
var client = new InternalClient(1, ClientKind.System, account);
string? capturedSubject = null;
string? capturedSid = null;
client.MessageCallback = (subject, sid, replyTo, headers, payload) =>
{
capturedSubject = subject;
capturedSid = sid;
};
client.SendMessage("test.subject", "1", null, ReadOnlyMemory<byte>.Empty, ReadOnlyMemory<byte>.Empty);
capturedSubject.ShouldBe("test.subject");
capturedSid.ShouldBe("1");
}
[Fact]
public void InternalClient_QueueOutbound_returns_true_noop()
{
var account = new Account("$SYS");
var client = new InternalClient(1, ClientKind.System, account);
client.QueueOutbound(ReadOnlyMemory<byte>.Empty).ShouldBeTrue();
}
}

View File

@@ -0,0 +1,149 @@
using NATS.Server.Auth;
using NATS.Server.Imports;
namespace NATS.Server.Tests;
public class ResponseRoutingTests
{
[Fact]
public void GenerateReplyPrefix_creates_unique_prefix()
{
var prefix1 = ResponseRouter.GenerateReplyPrefix();
var prefix2 = ResponseRouter.GenerateReplyPrefix();
prefix1.ShouldStartWith("_R_.");
prefix2.ShouldStartWith("_R_.");
prefix1.ShouldNotBe(prefix2);
prefix1.Length.ShouldBeGreaterThan(4);
}
[Fact]
public void GenerateReplyPrefix_ends_with_dot()
{
var prefix = ResponseRouter.GenerateReplyPrefix();
prefix.ShouldEndWith(".");
// Format: "_R_." + 10 chars + "." = 15 chars
prefix.Length.ShouldBe(15);
}
[Fact]
public void Singleton_response_import_removed_after_delivery()
{
var exporter = new Account("exporter");
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
var replyPrefix = ResponseRouter.GenerateReplyPrefix();
var responseSi = new ServiceImport
{
DestinationAccount = exporter,
From = replyPrefix + ">",
To = "_INBOX.original.reply",
IsResponse = true,
ResponseType = ServiceResponseType.Singleton,
};
exporter.Exports.Responses[replyPrefix] = responseSi;
exporter.Exports.Responses.ShouldContainKey(replyPrefix);
// Simulate singleton delivery cleanup
ResponseRouter.CleanupResponse(exporter, replyPrefix, responseSi);
exporter.Exports.Responses.ShouldNotContainKey(replyPrefix);
}
[Fact]
public void CreateResponseImport_registers_in_exporter_responses()
{
var exporter = new Account("exporter");
var importer = new Account("importer");
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
var originalSi = new ServiceImport
{
DestinationAccount = exporter,
From = "api.test",
To = "api.test",
Export = exporter.Exports.Services["api.test"],
ResponseType = ServiceResponseType.Singleton,
};
var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.abc123");
responseSi.IsResponse.ShouldBeTrue();
responseSi.ResponseType.ShouldBe(ServiceResponseType.Singleton);
responseSi.To.ShouldBe("_INBOX.abc123");
responseSi.DestinationAccount.ShouldBe(exporter);
responseSi.From.ShouldEndWith(">");
responseSi.Export.ShouldBe(originalSi.Export);
// Should be registered in the exporter's response map
exporter.Exports.Responses.Count.ShouldBe(1);
}
[Fact]
public void CreateResponseImport_preserves_streamed_response_type()
{
var exporter = new Account("exporter");
exporter.AddServiceExport("api.stream", ServiceResponseType.Streamed, null);
var originalSi = new ServiceImport
{
DestinationAccount = exporter,
From = "api.stream",
To = "api.stream",
Export = exporter.Exports.Services["api.stream"],
ResponseType = ServiceResponseType.Streamed,
};
var responseSi = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.xyz789");
responseSi.ResponseType.ShouldBe(ServiceResponseType.Streamed);
}
[Fact]
public void Multiple_response_imports_each_get_unique_prefix()
{
var exporter = new Account("exporter");
exporter.AddServiceExport("api.test", ServiceResponseType.Singleton, null);
var originalSi = new ServiceImport
{
DestinationAccount = exporter,
From = "api.test",
To = "api.test",
Export = exporter.Exports.Services["api.test"],
ResponseType = ServiceResponseType.Singleton,
};
var resp1 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply1");
var resp2 = ResponseRouter.CreateResponseImport(exporter, originalSi, "_INBOX.reply2");
exporter.Exports.Responses.Count.ShouldBe(2);
resp1.To.ShouldBe("_INBOX.reply1");
resp2.To.ShouldBe("_INBOX.reply2");
resp1.From.ShouldNotBe(resp2.From);
}
[Fact]
public void LatencyTracker_should_sample_respects_percentage()
{
var latency = new ServiceLatency { SamplingPercentage = 0, Subject = "latency.test" };
LatencyTracker.ShouldSample(latency).ShouldBeFalse();
var latency100 = new ServiceLatency { SamplingPercentage = 100, Subject = "latency.test" };
LatencyTracker.ShouldSample(latency100).ShouldBeTrue();
}
[Fact]
public void LatencyTracker_builds_latency_message()
{
var msg = LatencyTracker.BuildLatencyMsg("requester", "responder",
TimeSpan.FromMilliseconds(5), TimeSpan.FromMilliseconds(10));
msg.Requestor.ShouldBe("requester");
msg.Responder.ShouldBe("responder");
msg.ServiceLatencyNanos.ShouldBeGreaterThan(0);
msg.TotalLatencyNanos.ShouldBeGreaterThan(0);
}
}

View File

@@ -0,0 +1,133 @@
using System.Text.Json;
using NATS.Server;
using NATS.Server.Events;
using Microsoft.Extensions.Logging.Abstractions;
namespace NATS.Server.Tests;
public class SystemEventsTests
{
[Fact]
public async Task Server_publishes_connect_event_on_client_auth()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<string>();
server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.CONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(subject);
});
// Connect a real client
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);
// Read INFO
var buf = new byte[4096];
await sock.ReceiveAsync(buf);
// Send CONNECT
var connect = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n");
await sock.SendAsync(connect);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
result.ShouldStartWith("$SYS.ACCOUNT.");
result.ShouldEndWith(".CONNECT");
await server.ShutdownAsync();
}
[Fact]
public async Task Server_publishes_disconnect_event_on_client_close()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<string>();
server.EventSystem!.SysSubscribe("$SYS.ACCOUNT.*.DISCONNECT", (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(subject);
});
// Connect and then disconnect
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
await sock.ConnectAsync(System.Net.IPAddress.Loopback, server.Port);
var buf = new byte[4096];
await sock.ReceiveAsync(buf);
await sock.SendAsync(System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\n"));
await Task.Delay(100);
sock.Shutdown(System.Net.Sockets.SocketShutdown.Both);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
result.ShouldStartWith("$SYS.ACCOUNT.");
result.ShouldEndWith(".DISCONNECT");
await server.ShutdownAsync();
}
[Fact]
public async Task Server_publishes_statsz_periodically()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<string>();
server.EventSystem!.SysSubscribe("$SYS.SERVER.*.STATSZ", (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(subject);
});
// Trigger a manual stats publish (don't wait 10s)
server.EventSystem!.PublishServerStats();
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
result.ShouldContain(".STATSZ");
await server.ShutdownAsync();
}
[Fact]
public async Task Server_publishes_shutdown_event()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<string>();
server.EventSystem!.SysSubscribe("$SYS.SERVER.*.SHUTDOWN", (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(subject);
});
await server.ShutdownAsync();
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
result.ShouldContain(".SHUTDOWN");
}
private static NatsServer CreateTestServer()
{
var port = GetFreePort();
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}
private static int GetFreePort()
{
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}
}

View File

@@ -0,0 +1,170 @@
using System.Text;
using System.Text.Json;
using NATS.Server;
using NATS.Server.Events;
using Microsoft.Extensions.Logging.Abstractions;
namespace NATS.Server.Tests;
public class SystemRequestReplyTests
{
[Fact]
public async Task Varz_request_reply_returns_server_info()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"server_id\"");
json.ShouldContain("\"version\"");
json.ShouldContain("\"host\"");
json.ShouldContain("\"port\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Healthz_request_reply_returns_ok()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("ok");
await server.ShutdownAsync();
}
[Fact]
public async Task Subsz_request_reply_returns_subscription_count()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "SUBSZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"num_subscriptions\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Idz_request_reply_returns_server_identity()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "IDZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"server_id\"");
json.ShouldContain("\"server_name\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Ping_varz_responds_via_wildcard_subject()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var pingSubject = string.Format(EventSubjects.ServerPing, "VARZ");
server.SendInternalMsg(pingSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"server_id\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Request_without_reply_is_ignored()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
// Send a request with no reply subject -- should not crash
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
server.SendInternalMsg(reqSubject, null, null);
// Give it a moment to process without error
await Task.Delay(200);
// Server should still be running
server.IsShuttingDown.ShouldBeFalse();
await server.ShutdownAsync();
}
private static NatsServer CreateTestServer()
{
var port = GetFreePort();
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}
private static int GetFreePort()
{
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}
}