Compare commits
9 Commits
3531a87de0
...
6228f748ab
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6228f748ab | ||
|
|
71f7f569b9 | ||
|
|
1269e8b364 | ||
|
|
54207e2906 | ||
|
|
9977a01c56 | ||
|
|
a661e641c6 | ||
|
|
3f48d1c5ee | ||
|
|
4a242f614f | ||
|
|
e562077e4c |
@@ -68,7 +68,7 @@
|
|||||||
| JETSTREAM (internal) | Y | N | |
|
| JETSTREAM (internal) | Y | N | |
|
||||||
| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support |
|
| ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support |
|
||||||
| WebSocket clients | Y | Y | Custom frame parser, permessage-deflate compression, origin checking, cookie auth |
|
| WebSocket clients | Y | Y | Custom frame parser, permessage-deflate compression, origin checking, cookie auth |
|
||||||
| MQTT clients | Y | N | |
|
| MQTT clients | Y | Partial | JWT connection-type constants + config parsing; no MQTT transport yet |
|
||||||
|
|
||||||
### Client Features
|
### Client Features
|
||||||
| Feature | Go | .NET | Notes |
|
| Feature | Go | .NET | Notes |
|
||||||
@@ -204,7 +204,7 @@ Go implements a sophisticated slow consumer detection system:
|
|||||||
| Username/password | Y | Y | |
|
| Username/password | Y | Y | |
|
||||||
| Token | Y | Y | |
|
| Token | Y | Y | |
|
||||||
| NKeys (Ed25519) | Y | Y | .NET has framework but integration is basic |
|
| NKeys (Ed25519) | Y | Y | .NET has framework but integration is basic |
|
||||||
| JWT validation | Y | Y | `NatsJwt` decode/verify, `JwtAuthenticator` with account resolution + revocation |
|
| JWT validation | Y | Y | `NatsJwt` decode/verify, `JwtAuthenticator` with account resolution + revocation + `allowed_connection_types` enforcement |
|
||||||
| Bcrypt password hashing | Y | Y | .NET supports bcrypt (`$2*` prefix) with constant-time fallback |
|
| Bcrypt password hashing | Y | Y | .NET supports bcrypt (`$2*` prefix) with constant-time fallback |
|
||||||
| TLS certificate mapping | Y | Y | X500DistinguishedName with full DN match and CN fallback |
|
| TLS certificate mapping | Y | Y | X500DistinguishedName with full DN match and CN fallback |
|
||||||
| Custom auth interface | Y | N | |
|
| Custom auth interface | Y | N | |
|
||||||
@@ -268,7 +268,7 @@ Go implements a sophisticated slow consumer detection system:
|
|||||||
- ~~Tags/metadata~~ — `Tags` dictionary implemented in `NatsOptions`
|
- ~~Tags/metadata~~ — `Tags` dictionary implemented in `NatsOptions`
|
||||||
- ~~OCSP configuration~~ — `OcspConfig` with 4 modes (Auto/Always/Must/Never), peer verification, and stapling
|
- ~~OCSP configuration~~ — `OcspConfig` with 4 modes (Auto/Always/Must/Never), peer verification, and stapling
|
||||||
- ~~WebSocket options~~ — `WebSocketOptions` with port, compression, origin checking, cookie auth, custom headers
|
- ~~WebSocket options~~ — `WebSocketOptions` with port, compression, origin checking, cookie auth, custom headers
|
||||||
- MQTT options
|
- ~~MQTT options~~ — `mqtt {}` config block parsed with all Go `MQTTOpts` fields; no listener yet
|
||||||
- ~~Operator mode / account resolver~~ — `JwtAuthenticator` + `IAccountResolver` + `MemAccountResolver` with trusted keys
|
- ~~Operator mode / account resolver~~ — `JwtAuthenticator` + `IAccountResolver` + `MemAccountResolver` with trusted keys
|
||||||
|
|
||||||
---
|
---
|
||||||
@@ -317,7 +317,7 @@ Go implements a sophisticated slow consumer detection system:
|
|||||||
| Subscription detail mode | Y | N | |
|
| Subscription detail mode | Y | N | |
|
||||||
| TLS peer certificate info | Y | N | |
|
| TLS peer certificate info | Y | N | |
|
||||||
| JWT/IssuerKey/Tags fields | Y | N | |
|
| JWT/IssuerKey/Tags fields | Y | N | |
|
||||||
| MQTT client ID filtering | Y | N | |
|
| MQTT client ID filtering | Y | Y | `mqtt_client` query param filters open and closed connections |
|
||||||
| Proxy info | Y | N | |
|
| Proxy info | Y | N | |
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|||||||
1642
docs/plans/2026-02-23-jetstream-full-parity-plan.md
Normal file
1642
docs/plans/2026-02-23-jetstream-full-parity-plan.md
Normal file
File diff suppressed because it is too large
Load Diff
933
docs/plans/2026-02-23-mqtt-connection-type-plan.md
Normal file
933
docs/plans/2026-02-23-mqtt-connection-type-plan.md
Normal file
@@ -0,0 +1,933 @@
|
|||||||
|
# MQTT Connection Type Parity + Config Parsing Implementation Plan
|
||||||
|
|
||||||
|
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
|
||||||
|
|
||||||
|
**Goal:** Port Go-compatible MQTT connection-type handling for JWT `allowed_connection_types`, add `/connz` `mqtt_client` filtering, parse all Go `MQTTOpts` config fields, and expand `MqttOptsVarz` monitoring output — with tests and docs updates.
|
||||||
|
|
||||||
|
**Architecture:** Thread a connection-type value into auth context and enforce Go-style allowed-connection-type semantics in `JwtAuthenticator`. Add connz query-option filtering for `mqtt_client` across open and closed connections. Parse the full `mqtt {}` config block into a new `MqttOptions` model following the existing `ParseTls()` pattern in `ConfigProcessor`. Expand `MqttOptsVarz` and wire into `/varz`. Keep behavior backward-compatible and transport-agnostic so MQTT runtime plumbing can be added later without changing auth/monitoring/config semantics.
|
||||||
|
|
||||||
|
**Tech Stack:** .NET 10, xUnit 3, Shouldly, ASP.NET minimal APIs, System.Text.Json.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 1: Add failing JWT connection-type behavior tests
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `tests/NATS.Server.Tests/JwtAuthenticatorTests.cs`
|
||||||
|
|
||||||
|
**Step 1: Write the failing tests**
|
||||||
|
|
||||||
|
Add these 5 test methods to the existing `JwtAuthenticatorTests` class. Each test must build a valid operator/account/user JWT chain (reuse the existing helper pattern from other tests in the file). The user JWT's `nats.allowed_connection_types` array controls which connection types are permitted.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_allows_standard_context()
|
||||||
|
{
|
||||||
|
// Build valid operator/account/user JWT chain.
|
||||||
|
// User JWT includes: "allowed_connection_types":["STANDARD"]
|
||||||
|
// Context sets ConnectionType = "STANDARD".
|
||||||
|
// Assert Authenticate() is not null.
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_rejects_mqtt_only_for_standard_context()
|
||||||
|
{
|
||||||
|
// User JWT includes: "allowed_connection_types":["MQTT"]
|
||||||
|
// Context sets ConnectionType = "STANDARD".
|
||||||
|
// Assert Authenticate() is null.
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_allows_known_even_with_unknown_values()
|
||||||
|
{
|
||||||
|
// User JWT includes: ["STANDARD", "SOME_NEW_TYPE"]
|
||||||
|
// Context sets ConnectionType = "STANDARD".
|
||||||
|
// Assert Authenticate() is not null.
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_rejects_when_only_unknown_values_present()
|
||||||
|
{
|
||||||
|
// User JWT includes: ["SOME_NEW_TYPE"]
|
||||||
|
// Context sets ConnectionType = "STANDARD".
|
||||||
|
// Assert Authenticate() is null.
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_is_case_insensitive_for_input_values()
|
||||||
|
{
|
||||||
|
// User JWT includes: ["standard"]
|
||||||
|
// Context sets ConnectionType = "STANDARD".
|
||||||
|
// Assert Authenticate() is not null.
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~JwtAuthenticatorTests.Allowed_connection_types" -v minimal`
|
||||||
|
Expected: FAIL (current implementation ignores `allowed_connection_types`).
|
||||||
|
|
||||||
|
**Step 3: Commit test-only checkpoint**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add tests/NATS.Server.Tests/JwtAuthenticatorTests.cs
|
||||||
|
git commit -m "test: add failing jwt allowed connection type coverage"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 2: Implement auth connection-type model and Go-style allowed-type conversion
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `src/NATS.Server/Auth/IAuthenticator.cs` (line 11-16: add `ConnectionType` property to `ClientAuthContext`)
|
||||||
|
- Create: `src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs`
|
||||||
|
- Modify: `src/NATS.Server/Auth/JwtAuthenticator.cs` (insert check after step 7 revocation check, before step 8 permissions, around line 97)
|
||||||
|
- Modify: `src/NATS.Server/NatsClient.cs` (line 382-387: add `ConnectionType` to auth context construction)
|
||||||
|
|
||||||
|
**Step 1: Add connection type to auth context**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Auth/IAuthenticator.cs`, add the `ConnectionType` property to `ClientAuthContext`. Note: this requires adding a `using NATS.Server.Auth.Jwt;` at the top of the file.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public sealed class ClientAuthContext
|
||||||
|
{
|
||||||
|
public required ClientOptions Opts { get; init; }
|
||||||
|
public required byte[] Nonce { get; init; }
|
||||||
|
public string ConnectionType { get; init; } = JwtConnectionTypes.Standard;
|
||||||
|
public X509Certificate2? ClientCertificate { get; init; }
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Create JWT connection-type constants + converter helper**
|
||||||
|
|
||||||
|
Create new file `src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs`:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
namespace NATS.Server.Auth.Jwt;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Known connection type constants matching Go server/client.go.
|
||||||
|
/// Used for JWT allowed_connection_types claim validation.
|
||||||
|
/// Reference: golang/nats-server/server/client.go connectionType constants.
|
||||||
|
/// </summary>
|
||||||
|
internal static class JwtConnectionTypes
|
||||||
|
{
|
||||||
|
public const string Standard = "STANDARD";
|
||||||
|
public const string Websocket = "WEBSOCKET";
|
||||||
|
public const string Leafnode = "LEAFNODE";
|
||||||
|
public const string LeafnodeWs = "LEAFNODE_WS";
|
||||||
|
public const string Mqtt = "MQTT";
|
||||||
|
public const string MqttWs = "MQTT_WS";
|
||||||
|
public const string InProcess = "INPROCESS";
|
||||||
|
|
||||||
|
private static readonly HashSet<string> Known =
|
||||||
|
[
|
||||||
|
Standard, Websocket, Leafnode, LeafnodeWs, Mqtt, MqttWs, InProcess,
|
||||||
|
];
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Converts a list of connection type strings (from JWT claims) into a set of
|
||||||
|
/// known valid types plus a flag indicating unknown values were present.
|
||||||
|
/// Reference: Go server/client.go convertAllowedConnectionTypes.
|
||||||
|
/// </summary>
|
||||||
|
public static (HashSet<string> Valid, bool HasUnknown) Convert(IEnumerable<string>? values)
|
||||||
|
{
|
||||||
|
var valid = new HashSet<string>(StringComparer.Ordinal);
|
||||||
|
var hasUnknown = false;
|
||||||
|
if (values is null) return (valid, false);
|
||||||
|
|
||||||
|
foreach (var raw in values)
|
||||||
|
{
|
||||||
|
var up = (raw ?? string.Empty).Trim().ToUpperInvariant();
|
||||||
|
if (up.Length == 0) continue;
|
||||||
|
if (Known.Contains(up)) valid.Add(up);
|
||||||
|
else hasUnknown = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (valid, hasUnknown);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 3: Enforce allowed connection types in JWT auth**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Auth/JwtAuthenticator.cs`, insert the following block after the revocation check (step 7, around line 96) and before the permissions build (step 8):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// 7b. Check allowed connection types
|
||||||
|
var (allowedTypes, hasUnknown) = JwtConnectionTypes.Convert(userClaims.Nats?.AllowedConnectionTypes);
|
||||||
|
|
||||||
|
if (allowedTypes.Count == 0)
|
||||||
|
{
|
||||||
|
if (hasUnknown)
|
||||||
|
return null; // unknown-only list should reject
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var connType = string.IsNullOrWhiteSpace(context.ConnectionType)
|
||||||
|
? JwtConnectionTypes.Standard
|
||||||
|
: context.ConnectionType.ToUpperInvariant();
|
||||||
|
|
||||||
|
if (!allowedTypes.Contains(connType))
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Set auth context connection type in client connect path**
|
||||||
|
|
||||||
|
In `src/NATS.Server/NatsClient.cs` around line 382, add `ConnectionType` to the existing `ClientAuthContext` construction:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
var context = new ClientAuthContext
|
||||||
|
{
|
||||||
|
Opts = ClientOpts,
|
||||||
|
Nonce = _nonce ?? [],
|
||||||
|
ConnectionType = JwtConnectionTypes.Standard,
|
||||||
|
ClientCertificate = TlsState?.PeerCert,
|
||||||
|
};
|
||||||
|
```
|
||||||
|
|
||||||
|
Add `using NATS.Server.Auth.Jwt;` at the top of the file.
|
||||||
|
|
||||||
|
**Step 5: Run tests to verify pass**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~JwtAuthenticatorTests.Allowed_connection_types" -v minimal`
|
||||||
|
Expected: PASS.
|
||||||
|
|
||||||
|
**Step 6: Commit implementation checkpoint**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add src/NATS.Server/Auth/IAuthenticator.cs src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs src/NATS.Server/Auth/JwtAuthenticator.cs src/NATS.Server/NatsClient.cs
|
||||||
|
git commit -m "feat: enforce jwt allowed connection types with go-compatible semantics"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 3: Add failing connz mqtt_client filter tests
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `tests/NATS.Server.Tests/MonitorTests.cs`
|
||||||
|
|
||||||
|
**Step 1: Write the failing tests**
|
||||||
|
|
||||||
|
Add these 2 test methods to the existing `MonitorTests` class. These test the `/connz?mqtt_client=<id>` query parameter filtering.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
[Fact]
|
||||||
|
public async Task Connz_filters_by_mqtt_client_for_open_connections()
|
||||||
|
{
|
||||||
|
// Start server with monitoring port.
|
||||||
|
// Connect a regular NATS client (no MQTT ID).
|
||||||
|
// Query /connz?mqtt_client=some-id.
|
||||||
|
// Assert num_connections == 0 (no client has that MQTT ID).
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Connz_filters_by_mqtt_client_for_closed_connections()
|
||||||
|
{
|
||||||
|
// Start server with monitoring port.
|
||||||
|
// Query /connz?state=closed&mqtt_client=missing-id.
|
||||||
|
// Assert num_connections == 0.
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Run tests to verify expected failure mode**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~MonitorTests.Connz_filters_by_mqtt_client" -v minimal`
|
||||||
|
Expected: FAIL (query option not implemented yet — `mqtt_client` param ignored, so all connections returned).
|
||||||
|
|
||||||
|
**Step 3: Commit test-only checkpoint**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add tests/NATS.Server.Tests/MonitorTests.cs
|
||||||
|
git commit -m "test: add failing connz mqtt_client filter coverage"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 4: Implement connz mqtt_client filtering and closed snapshot support
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `src/NATS.Server/Monitoring/Connz.cs` (line 191-210: add `MqttClient` to `ConnzOptions`)
|
||||||
|
- Modify: `src/NATS.Server/Monitoring/ConnzHandler.cs` (line 148-201: parse query param; line 18-29: apply filter after collection but before sort)
|
||||||
|
- Modify: `src/NATS.Server/Monitoring/ClosedClient.cs` (line 6-25: add `MqttClient` property)
|
||||||
|
- Modify: `src/NATS.Server/NatsServer.cs` (line 695-714: add `MqttClient` to closed snapshot)
|
||||||
|
|
||||||
|
**Step 1: Add `MqttClient` to `ConnzOptions`**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Monitoring/Connz.cs`, add after `FilterSubject` property (line 205):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public string MqttClient { get; set; } = "";
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Parse `mqtt_client` query param in handler**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Monitoring/ConnzHandler.cs` `ParseQueryParams` method, add after the existing `limit` parse block (around line 198):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
if (q.TryGetValue("mqtt_client", out var mqttClient))
|
||||||
|
opts.MqttClient = mqttClient.ToString();
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 3: Apply `mqtt_client` filter in `HandleConnz`**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Monitoring/ConnzHandler.cs` `HandleConnz` method, add after the closed connections collection block (after line 29) and before the sort validation (line 32):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// Filter by MQTT client ID
|
||||||
|
if (!string.IsNullOrEmpty(opts.MqttClient))
|
||||||
|
connInfos = connInfos.Where(c => c.MqttClient == opts.MqttClient).ToList();
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Add `MqttClient` to `ClosedClient` model**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Monitoring/ClosedClient.cs`, add after line 24 (`TlsCipherSuite`):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public string MqttClient { get; init; } = "";
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 5: Add `MqttClient` to closed snapshot creation in `NatsServer.RemoveClient`**
|
||||||
|
|
||||||
|
In `src/NATS.Server/NatsServer.cs` around line 713 (inside the `new ClosedClient { ... }` block), add:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
MqttClient = "", // populated when MQTT transport is implemented
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 6: Add `MqttClient` to `BuildClosedConnInfo`**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Monitoring/ConnzHandler.cs` `BuildClosedConnInfo` method (line 119-146), add to the `new ConnInfo { ... }` initializer:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
MqttClient = closed.MqttClient,
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 7: Run connz mqtt filter tests**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~MonitorTests.Connz_filters_by_mqtt_client" -v minimal`
|
||||||
|
Expected: PASS.
|
||||||
|
|
||||||
|
**Step 8: Commit implementation checkpoint**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add src/NATS.Server/Monitoring/Connz.cs src/NATS.Server/Monitoring/ConnzHandler.cs src/NATS.Server/Monitoring/ClosedClient.cs src/NATS.Server/NatsServer.cs
|
||||||
|
git commit -m "feat: add connz mqtt_client filtering"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 5: Verification checkpoint for JWT + connz tasks
|
||||||
|
|
||||||
|
**Step 1: Run all JWT connection-type tests**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~JwtAuthenticatorTests.Allowed_connection_types" -v minimal`
|
||||||
|
Expected: PASS.
|
||||||
|
|
||||||
|
**Step 2: Run all connz tests**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~MonitorTests.Connz" -v minimal`
|
||||||
|
Expected: PASS.
|
||||||
|
|
||||||
|
**Step 3: Run full test suite**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj -v minimal`
|
||||||
|
Expected: PASS (no regressions).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 6: Add MqttOptions model and config parsing
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Create: `src/NATS.Server/MqttOptions.cs`
|
||||||
|
- Modify: `src/NATS.Server/NatsOptions.cs` (line 116-117: add `Mqtt` property)
|
||||||
|
- Modify: `src/NATS.Server/Configuration/ConfigProcessor.cs` (line 248: add `mqtt` case; add `ParseMqtt` + `ParseMqttAuth` + `ParseMqttTls` + `ToDouble` methods)
|
||||||
|
|
||||||
|
**Step 1: Create `MqttOptions` model**
|
||||||
|
|
||||||
|
Create new file `src/NATS.Server/MqttOptions.cs`. This matches Go `MQTTOpts` struct (golang/nats-server/server/opts.go:613-707):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
namespace NATS.Server;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// MQTT protocol configuration options.
|
||||||
|
/// Corresponds to Go server/opts.go MQTTOpts struct.
|
||||||
|
/// Config is parsed and stored but no MQTT listener is started yet.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class MqttOptions
|
||||||
|
{
|
||||||
|
// Network
|
||||||
|
public string Host { get; set; } = "";
|
||||||
|
public int Port { get; set; }
|
||||||
|
|
||||||
|
// Auth override (MQTT-specific, separate from global auth)
|
||||||
|
public string? NoAuthUser { get; set; }
|
||||||
|
public string? Username { get; set; }
|
||||||
|
public string? Password { get; set; }
|
||||||
|
public string? Token { get; set; }
|
||||||
|
public double AuthTimeout { get; set; }
|
||||||
|
|
||||||
|
// TLS
|
||||||
|
public string? TlsCert { get; set; }
|
||||||
|
public string? TlsKey { get; set; }
|
||||||
|
public string? TlsCaCert { get; set; }
|
||||||
|
public bool TlsVerify { get; set; }
|
||||||
|
public double TlsTimeout { get; set; } = 2.0;
|
||||||
|
public bool TlsMap { get; set; }
|
||||||
|
public HashSet<string>? TlsPinnedCerts { get; set; }
|
||||||
|
|
||||||
|
// JetStream integration
|
||||||
|
public string? JsDomain { get; set; }
|
||||||
|
public int StreamReplicas { get; set; }
|
||||||
|
public int ConsumerReplicas { get; set; }
|
||||||
|
public bool ConsumerMemoryStorage { get; set; }
|
||||||
|
public TimeSpan ConsumerInactiveThreshold { get; set; }
|
||||||
|
|
||||||
|
// QoS
|
||||||
|
public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30);
|
||||||
|
public ushort MaxAckPending { get; set; }
|
||||||
|
public TimeSpan JsApiTimeout { get; set; } = TimeSpan.FromSeconds(5);
|
||||||
|
|
||||||
|
public bool HasTls => TlsCert != null && TlsKey != null;
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Add `Mqtt` property to `NatsOptions`**
|
||||||
|
|
||||||
|
In `src/NATS.Server/NatsOptions.cs`, add before the `HasTls` property (around line 117):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// MQTT configuration (parsed from config, no listener yet)
|
||||||
|
public MqttOptions? Mqtt { get; set; }
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 3: Add `ToDouble` helper to `ConfigProcessor`**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Configuration/ConfigProcessor.cs`, add after the `ToString` helper (around line 654):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
private static double ToDouble(object? value) => value switch
|
||||||
|
{
|
||||||
|
double d => d,
|
||||||
|
long l => l,
|
||||||
|
int i => i,
|
||||||
|
string s when double.TryParse(s, NumberStyles.Float, CultureInfo.InvariantCulture, out var d) => d,
|
||||||
|
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"),
|
||||||
|
};
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Add `mqtt` case to `ProcessKey` switch**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Configuration/ConfigProcessor.cs`, replace the default case comment at line 248:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// MQTT
|
||||||
|
case "mqtt":
|
||||||
|
if (value is Dictionary<string, object?> mqttDict)
|
||||||
|
ParseMqtt(mqttDict, opts, errors);
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.)
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 5: Add `ParseMqtt` method**
|
||||||
|
|
||||||
|
Add this method after `ParseTags` (around line 621). It follows the exact key/alias structure from Go `parseMQTT` (opts.go:5443-5541):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// ─── MQTT parsing ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
private static void ParseMqtt(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
|
||||||
|
{
|
||||||
|
var mqtt = opts.Mqtt ?? new MqttOptions();
|
||||||
|
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
switch (key.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "listen":
|
||||||
|
var (host, port) = ParseHostPort(value);
|
||||||
|
if (host is not null) mqtt.Host = host;
|
||||||
|
if (port is not null) mqtt.Port = port.Value;
|
||||||
|
break;
|
||||||
|
case "port":
|
||||||
|
mqtt.Port = ToInt(value);
|
||||||
|
break;
|
||||||
|
case "host" or "net":
|
||||||
|
mqtt.Host = ToString(value);
|
||||||
|
break;
|
||||||
|
case "no_auth_user":
|
||||||
|
mqtt.NoAuthUser = ToString(value);
|
||||||
|
break;
|
||||||
|
case "tls":
|
||||||
|
if (value is Dictionary<string, object?> tlsDict)
|
||||||
|
ParseMqttTls(tlsDict, mqtt, errors);
|
||||||
|
break;
|
||||||
|
case "authorization" or "authentication":
|
||||||
|
if (value is Dictionary<string, object?> authDict)
|
||||||
|
ParseMqttAuth(authDict, mqtt, errors);
|
||||||
|
break;
|
||||||
|
case "ack_wait" or "ackwait":
|
||||||
|
mqtt.AckWait = ParseDuration(value);
|
||||||
|
break;
|
||||||
|
case "js_api_timeout" or "api_timeout":
|
||||||
|
mqtt.JsApiTimeout = ParseDuration(value);
|
||||||
|
break;
|
||||||
|
case "max_ack_pending" or "max_pending" or "max_inflight":
|
||||||
|
var pending = ToInt(value);
|
||||||
|
if (pending < 0 || pending > 0xFFFF)
|
||||||
|
errors.Add($"mqtt max_ack_pending invalid value {pending}, should be in [0..{0xFFFF}] range");
|
||||||
|
else
|
||||||
|
mqtt.MaxAckPending = (ushort)pending;
|
||||||
|
break;
|
||||||
|
case "js_domain":
|
||||||
|
mqtt.JsDomain = ToString(value);
|
||||||
|
break;
|
||||||
|
case "stream_replicas":
|
||||||
|
mqtt.StreamReplicas = ToInt(value);
|
||||||
|
break;
|
||||||
|
case "consumer_replicas":
|
||||||
|
mqtt.ConsumerReplicas = ToInt(value);
|
||||||
|
break;
|
||||||
|
case "consumer_memory_storage":
|
||||||
|
mqtt.ConsumerMemoryStorage = ToBool(value);
|
||||||
|
break;
|
||||||
|
case "consumer_inactive_threshold" or "consumer_auto_cleanup":
|
||||||
|
mqtt.ConsumerInactiveThreshold = ParseDuration(value);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
// Unknown MQTT keys silently ignored
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
opts.Mqtt = mqtt;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void ParseMqttAuth(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||||
|
{
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
switch (key.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "user" or "username":
|
||||||
|
mqtt.Username = ToString(value);
|
||||||
|
break;
|
||||||
|
case "pass" or "password":
|
||||||
|
mqtt.Password = ToString(value);
|
||||||
|
break;
|
||||||
|
case "token":
|
||||||
|
mqtt.Token = ToString(value);
|
||||||
|
break;
|
||||||
|
case "timeout":
|
||||||
|
mqtt.AuthTimeout = ToDouble(value);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void ParseMqttTls(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||||
|
{
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
switch (key.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "cert_file":
|
||||||
|
mqtt.TlsCert = ToString(value);
|
||||||
|
break;
|
||||||
|
case "key_file":
|
||||||
|
mqtt.TlsKey = ToString(value);
|
||||||
|
break;
|
||||||
|
case "ca_file":
|
||||||
|
mqtt.TlsCaCert = ToString(value);
|
||||||
|
break;
|
||||||
|
case "verify":
|
||||||
|
mqtt.TlsVerify = ToBool(value);
|
||||||
|
break;
|
||||||
|
case "verify_and_map":
|
||||||
|
var map = ToBool(value);
|
||||||
|
mqtt.TlsMap = map;
|
||||||
|
if (map) mqtt.TlsVerify = true;
|
||||||
|
break;
|
||||||
|
case "timeout":
|
||||||
|
mqtt.TlsTimeout = ToDouble(value);
|
||||||
|
break;
|
||||||
|
case "pinned_certs":
|
||||||
|
if (value is List<object?> pinnedList)
|
||||||
|
{
|
||||||
|
var certs = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||||
|
foreach (var item in pinnedList)
|
||||||
|
{
|
||||||
|
if (item is string s)
|
||||||
|
certs.Add(s.ToLowerInvariant());
|
||||||
|
}
|
||||||
|
mqtt.TlsPinnedCerts = certs;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 6: Build to verify compilation**
|
||||||
|
|
||||||
|
Run: `dotnet build`
|
||||||
|
Expected: Build succeeded.
|
||||||
|
|
||||||
|
**Step 7: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add src/NATS.Server/MqttOptions.cs src/NATS.Server/NatsOptions.cs src/NATS.Server/Configuration/ConfigProcessor.cs
|
||||||
|
git commit -m "feat: add mqtt config model and parser for all Go MQTTOpts fields"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 7: Add MQTT config parsing tests
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Create: `tests/NATS.Server.Tests/TestData/mqtt.conf`
|
||||||
|
- Modify: `tests/NATS.Server.Tests/ConfigProcessorTests.cs`
|
||||||
|
|
||||||
|
**Step 1: Create MQTT test config file**
|
||||||
|
|
||||||
|
Create `tests/NATS.Server.Tests/TestData/mqtt.conf`:
|
||||||
|
|
||||||
|
```
|
||||||
|
mqtt {
|
||||||
|
listen: "10.0.0.1:1883"
|
||||||
|
no_auth_user: "mqtt_default"
|
||||||
|
|
||||||
|
authorization {
|
||||||
|
user: "mqtt_user"
|
||||||
|
pass: "mqtt_pass"
|
||||||
|
token: "mqtt_token"
|
||||||
|
timeout: 3.0
|
||||||
|
}
|
||||||
|
|
||||||
|
tls {
|
||||||
|
cert_file: "/path/to/mqtt-cert.pem"
|
||||||
|
key_file: "/path/to/mqtt-key.pem"
|
||||||
|
ca_file: "/path/to/mqtt-ca.pem"
|
||||||
|
verify: true
|
||||||
|
timeout: 5.0
|
||||||
|
}
|
||||||
|
|
||||||
|
ack_wait: "60s"
|
||||||
|
max_ack_pending: 2048
|
||||||
|
js_domain: "mqtt-domain"
|
||||||
|
js_api_timeout: "10s"
|
||||||
|
stream_replicas: 3
|
||||||
|
consumer_replicas: 1
|
||||||
|
consumer_memory_storage: true
|
||||||
|
consumer_inactive_threshold: "5m"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Ensure this file is copied to output: check that `.csproj` has a wildcard for TestData, or add:
|
||||||
|
```xml
|
||||||
|
<ItemGroup>
|
||||||
|
<None Update="TestData\**" CopyToOutputDirectory="PreserveNewest" />
|
||||||
|
</ItemGroup>
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Add MQTT config tests**
|
||||||
|
|
||||||
|
Add to `tests/NATS.Server.Tests/ConfigProcessorTests.cs`:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
// ─── MQTT config ────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_ListenHostAndPort()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.Host.ShouldBe("10.0.0.1");
|
||||||
|
opts.Mqtt.Port.ShouldBe(1883);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_NoAuthUser()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.NoAuthUser.ShouldBe("mqtt_default");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_Authorization()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.Username.ShouldBe("mqtt_user");
|
||||||
|
opts.Mqtt.Password.ShouldBe("mqtt_pass");
|
||||||
|
opts.Mqtt.Token.ShouldBe("mqtt_token");
|
||||||
|
opts.Mqtt.AuthTimeout.ShouldBe(3.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_Tls()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.TlsCert.ShouldBe("/path/to/mqtt-cert.pem");
|
||||||
|
opts.Mqtt.TlsKey.ShouldBe("/path/to/mqtt-key.pem");
|
||||||
|
opts.Mqtt.TlsCaCert.ShouldBe("/path/to/mqtt-ca.pem");
|
||||||
|
opts.Mqtt.TlsVerify.ShouldBeTrue();
|
||||||
|
opts.Mqtt.TlsTimeout.ShouldBe(5.0);
|
||||||
|
opts.Mqtt.HasTls.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_QosSettings()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.AckWait.ShouldBe(TimeSpan.FromSeconds(60));
|
||||||
|
opts.Mqtt.MaxAckPending.ShouldBe((ushort)2048);
|
||||||
|
opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_JetStreamSettings()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.JsDomain.ShouldBe("mqtt-domain");
|
||||||
|
opts.Mqtt.StreamReplicas.ShouldBe(3);
|
||||||
|
opts.Mqtt.ConsumerReplicas.ShouldBe(1);
|
||||||
|
opts.Mqtt.ConsumerMemoryStorage.ShouldBeTrue();
|
||||||
|
opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(5));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_MaxAckPendingValidation_ReportsError()
|
||||||
|
{
|
||||||
|
var ex = Should.Throw<ConfigProcessorException>(() =>
|
||||||
|
ConfigProcessor.ProcessConfig("""
|
||||||
|
mqtt {
|
||||||
|
max_ack_pending: 70000
|
||||||
|
}
|
||||||
|
"""));
|
||||||
|
ex.Errors.ShouldContain(e => e.Contains("max_ack_pending"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_Aliases()
|
||||||
|
{
|
||||||
|
// Test alias keys: "ackwait" (alias for "ack_wait"), "net" (alias for "host"),
|
||||||
|
// "max_inflight" (alias for "max_ack_pending"), "consumer_auto_cleanup" (alias)
|
||||||
|
var opts = ConfigProcessor.ProcessConfig("""
|
||||||
|
mqtt {
|
||||||
|
net: "127.0.0.1"
|
||||||
|
port: 1884
|
||||||
|
ackwait: "45s"
|
||||||
|
max_inflight: 500
|
||||||
|
api_timeout: "8s"
|
||||||
|
consumer_auto_cleanup: "10m"
|
||||||
|
}
|
||||||
|
""");
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.Host.ShouldBe("127.0.0.1");
|
||||||
|
opts.Mqtt.Port.ShouldBe(1884);
|
||||||
|
opts.Mqtt.AckWait.ShouldBe(TimeSpan.FromSeconds(45));
|
||||||
|
opts.Mqtt.MaxAckPending.ShouldBe((ushort)500);
|
||||||
|
opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(8));
|
||||||
|
opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_Absent_ReturnsNull()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfig("port: 4222");
|
||||||
|
opts.Mqtt.ShouldBeNull();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 3: Run MQTT config tests**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj --filter "FullyQualifiedName~ConfigProcessorTests.MqttConf" -v minimal`
|
||||||
|
Expected: PASS.
|
||||||
|
|
||||||
|
**Step 4: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add tests/NATS.Server.Tests/TestData/mqtt.conf tests/NATS.Server.Tests/ConfigProcessorTests.cs
|
||||||
|
git commit -m "test: add mqtt config parsing coverage"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 8: Expand MqttOptsVarz and wire into /varz
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `src/NATS.Server/Monitoring/Varz.cs` (lines 350-360: expand `MqttOptsVarz`)
|
||||||
|
- Modify: `src/NATS.Server/Monitoring/VarzHandler.cs` (line 67-124: populate MQTT block from options)
|
||||||
|
|
||||||
|
**Step 1: Expand `MqttOptsVarz` class**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Monitoring/Varz.cs`, replace the existing minimal `MqttOptsVarz` (lines 350-360) with the full Go-compatible struct (matching Go server/monitor.go:1365-1378):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
/// <summary>
|
||||||
|
/// MQTT configuration monitoring information.
|
||||||
|
/// Corresponds to Go server/monitor.go MQTTOptsVarz struct.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class MqttOptsVarz
|
||||||
|
{
|
||||||
|
[JsonPropertyName("host")]
|
||||||
|
public string Host { get; set; } = "";
|
||||||
|
|
||||||
|
[JsonPropertyName("port")]
|
||||||
|
public int Port { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("no_auth_user")]
|
||||||
|
public string NoAuthUser { get; set; } = "";
|
||||||
|
|
||||||
|
[JsonPropertyName("auth_timeout")]
|
||||||
|
public double AuthTimeout { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("tls_map")]
|
||||||
|
public bool TlsMap { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("tls_timeout")]
|
||||||
|
public double TlsTimeout { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("tls_pinned_certs")]
|
||||||
|
public string[] TlsPinnedCerts { get; set; } = [];
|
||||||
|
|
||||||
|
[JsonPropertyName("js_domain")]
|
||||||
|
public string JsDomain { get; set; } = "";
|
||||||
|
|
||||||
|
[JsonPropertyName("ack_wait")]
|
||||||
|
public long AckWait { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("max_ack_pending")]
|
||||||
|
public ushort MaxAckPending { get; set; }
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Note: Go's `AckWait` is serialized as `time.Duration` (nanoseconds as int64). We follow the same pattern used for `PingInterval` and `WriteDeadline` in the existing Varz class.
|
||||||
|
|
||||||
|
**Step 2: Populate MQTT block in VarzHandler**
|
||||||
|
|
||||||
|
In `src/NATS.Server/Monitoring/VarzHandler.cs`, add MQTT population to the `return new Varz { ... }` block (around line 123, after `HttpReqStats`):
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
Mqtt = BuildMqttVarz(),
|
||||||
|
```
|
||||||
|
|
||||||
|
And add the helper method to `VarzHandler`:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
private MqttOptsVarz BuildMqttVarz()
|
||||||
|
{
|
||||||
|
var mqtt = _options.Mqtt;
|
||||||
|
if (mqtt is null)
|
||||||
|
return new MqttOptsVarz();
|
||||||
|
|
||||||
|
return new MqttOptsVarz
|
||||||
|
{
|
||||||
|
Host = mqtt.Host,
|
||||||
|
Port = mqtt.Port,
|
||||||
|
NoAuthUser = mqtt.NoAuthUser ?? "",
|
||||||
|
AuthTimeout = mqtt.AuthTimeout,
|
||||||
|
TlsMap = mqtt.TlsMap,
|
||||||
|
TlsTimeout = mqtt.TlsTimeout,
|
||||||
|
TlsPinnedCerts = mqtt.TlsPinnedCerts?.ToArray() ?? [],
|
||||||
|
JsDomain = mqtt.JsDomain ?? "",
|
||||||
|
AckWait = (long)mqtt.AckWait.TotalNanoseconds,
|
||||||
|
MaxAckPending = mqtt.MaxAckPending,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 3: Build to verify compilation**
|
||||||
|
|
||||||
|
Run: `dotnet build`
|
||||||
|
Expected: Build succeeded.
|
||||||
|
|
||||||
|
**Step 4: Add varz MQTT test**
|
||||||
|
|
||||||
|
In `tests/NATS.Server.Tests/MonitorTests.cs`, add a test that verifies the MQTT section appears in `/varz` response. If there's an existing varz test pattern, follow it. Otherwise add:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
[Fact]
|
||||||
|
public async Task Varz_includes_mqtt_config_when_set()
|
||||||
|
{
|
||||||
|
// Start server with monitoring enabled and mqtt config set.
|
||||||
|
// GET /varz.
|
||||||
|
// Assert response contains "mqtt" block with expected host/port values.
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The exact test implementation depends on how the existing varz tests create and query the server — follow the existing pattern in MonitorTests.cs.
|
||||||
|
|
||||||
|
**Step 5: Run full test suite**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj -v minimal`
|
||||||
|
Expected: PASS.
|
||||||
|
|
||||||
|
**Step 6: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add src/NATS.Server/Monitoring/Varz.cs src/NATS.Server/Monitoring/VarzHandler.cs tests/NATS.Server.Tests/MonitorTests.cs
|
||||||
|
git commit -m "feat: expand mqtt varz monitoring with all Go-compatible fields"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 9: Final verification and differences.md update
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `differences.md`
|
||||||
|
|
||||||
|
**Step 1: Run full test suite**
|
||||||
|
|
||||||
|
Run: `dotnet test tests/NATS.Server.Tests/NATS.Server.Tests.csproj -v minimal`
|
||||||
|
Expected: PASS (all tests green, no regressions).
|
||||||
|
|
||||||
|
**Step 2: Update parity document**
|
||||||
|
|
||||||
|
Edit `differences.md`:
|
||||||
|
|
||||||
|
1. In the **Connection Types** table (section 2), update the MQTT row:
|
||||||
|
|
||||||
|
```markdown
|
||||||
|
| MQTT clients | Y | Partial | JWT connection-type constants + config parsing; no MQTT transport yet |
|
||||||
|
```
|
||||||
|
|
||||||
|
2. In the **Connz Response** table (section 7), update the MQTT client ID filtering row:
|
||||||
|
|
||||||
|
```markdown
|
||||||
|
| MQTT client ID filtering | Y | Y | `mqtt_client` query param filters open and closed connections |
|
||||||
|
```
|
||||||
|
|
||||||
|
3. In the **Missing Options Categories** (section 6), replace the "WebSocket/MQTT options" line:
|
||||||
|
|
||||||
|
```markdown
|
||||||
|
- WebSocket options
|
||||||
|
- ~~MQTT options~~ — `mqtt {}` config block parsed with all Go `MQTTOpts` fields; no listener yet
|
||||||
|
```
|
||||||
|
|
||||||
|
4. In the **Auth Mechanisms** table (section 5), add note to JWT row:
|
||||||
|
|
||||||
|
```markdown
|
||||||
|
| JWT validation | Y | Y | ... + `allowed_connection_types` enforcement with Go-compatible semantics |
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 3: Commit docs update**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add differences.md
|
||||||
|
git commit -m "docs: update differences.md for mqtt connection type parity"
|
||||||
|
```
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
{
|
||||||
|
"planPath": "docs/plans/2026-02-23-mqtt-connection-type-plan.md",
|
||||||
|
"tasks": [
|
||||||
|
{"id": 2, "subject": "Task 1: Add failing JWT connection-type behavior tests", "status": "pending"},
|
||||||
|
{"id": 3, "subject": "Task 2: Implement auth connection-type model and Go-style conversion", "status": "pending", "blockedBy": [2]},
|
||||||
|
{"id": 4, "subject": "Task 3: Add failing connz mqtt_client filter tests", "status": "pending", "blockedBy": [3]},
|
||||||
|
{"id": 5, "subject": "Task 4: Implement connz mqtt_client filtering", "status": "pending", "blockedBy": [4]},
|
||||||
|
{"id": 6, "subject": "Task 5: Verification checkpoint for JWT + connz tasks", "status": "pending", "blockedBy": [5]},
|
||||||
|
{"id": 7, "subject": "Task 6: Add MqttOptions model and config parsing", "status": "pending", "blockedBy": [6]},
|
||||||
|
{"id": 8, "subject": "Task 7: Add MQTT config parsing tests", "status": "pending", "blockedBy": [7]},
|
||||||
|
{"id": 9, "subject": "Task 8: Expand MqttOptsVarz and wire into /varz", "status": "pending", "blockedBy": [8]},
|
||||||
|
{"id": 10, "subject": "Task 9: Final verification and differences.md update", "status": "pending", "blockedBy": [9]}
|
||||||
|
],
|
||||||
|
"lastUpdated": "2026-02-23T00:00:00Z"
|
||||||
|
}
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
using System.Security.Cryptography.X509Certificates;
|
using System.Security.Cryptography.X509Certificates;
|
||||||
|
using NATS.Server.Auth.Jwt;
|
||||||
using NATS.Server.Protocol;
|
using NATS.Server.Protocol;
|
||||||
|
|
||||||
namespace NATS.Server.Auth;
|
namespace NATS.Server.Auth;
|
||||||
@@ -13,4 +14,11 @@ public sealed class ClientAuthContext
|
|||||||
public required ClientOptions Opts { get; init; }
|
public required ClientOptions Opts { get; init; }
|
||||||
public required byte[] Nonce { get; init; }
|
public required byte[] Nonce { get; init; }
|
||||||
public X509Certificate2? ClientCertificate { get; init; }
|
public X509Certificate2? ClientCertificate { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The type of connection (e.g., "STANDARD", "WEBSOCKET", "MQTT", "LEAFNODE").
|
||||||
|
/// Used by JWT authenticator to enforce allowed_connection_types claims.
|
||||||
|
/// Defaults to "STANDARD" for regular NATS client connections.
|
||||||
|
/// </summary>
|
||||||
|
public string ConnectionType { get; init; } = JwtConnectionTypes.Standard;
|
||||||
}
|
}
|
||||||
|
|||||||
34
src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs
Normal file
34
src/NATS.Server/Auth/Jwt/JwtConnectionTypes.cs
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
namespace NATS.Server.Auth.Jwt;
|
||||||
|
|
||||||
|
internal static class JwtConnectionTypes
|
||||||
|
{
|
||||||
|
public const string Standard = "STANDARD";
|
||||||
|
public const string Websocket = "WEBSOCKET";
|
||||||
|
public const string Leafnode = "LEAFNODE";
|
||||||
|
public const string LeafnodeWs = "LEAFNODE_WS";
|
||||||
|
public const string Mqtt = "MQTT";
|
||||||
|
public const string MqttWs = "MQTT_WS";
|
||||||
|
public const string InProcess = "INPROCESS";
|
||||||
|
|
||||||
|
private static readonly HashSet<string> Known =
|
||||||
|
[
|
||||||
|
Standard, Websocket, Leafnode, LeafnodeWs, Mqtt, MqttWs, InProcess,
|
||||||
|
];
|
||||||
|
|
||||||
|
public static (HashSet<string> Valid, bool HasUnknown) Convert(IEnumerable<string>? values)
|
||||||
|
{
|
||||||
|
var valid = new HashSet<string>(StringComparer.Ordinal);
|
||||||
|
var hasUnknown = false;
|
||||||
|
if (values is null) return (valid, false);
|
||||||
|
|
||||||
|
foreach (var raw in values)
|
||||||
|
{
|
||||||
|
var up = (raw ?? string.Empty).Trim().ToUpperInvariant();
|
||||||
|
if (up.Length == 0) continue;
|
||||||
|
if (Known.Contains(up)) valid.Add(up);
|
||||||
|
else hasUnknown = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (valid, hasUnknown);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -95,6 +95,24 @@ public sealed class JwtAuthenticator : IAuthenticator
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 7b. Check allowed connection types
|
||||||
|
var (allowedTypes, hasUnknown) = JwtConnectionTypes.Convert(userClaims.Nats?.AllowedConnectionTypes);
|
||||||
|
|
||||||
|
if (allowedTypes.Count == 0)
|
||||||
|
{
|
||||||
|
if (hasUnknown)
|
||||||
|
return null; // unknown-only list should reject
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var connType = string.IsNullOrWhiteSpace(context.ConnectionType)
|
||||||
|
? JwtConnectionTypes.Standard
|
||||||
|
: context.ConnectionType.ToUpperInvariant();
|
||||||
|
|
||||||
|
if (!allowedTypes.Contains(connType))
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
// 8. Build permissions from JWT claims
|
// 8. Build permissions from JWT claims
|
||||||
Permissions? permissions = null;
|
Permissions? permissions = null;
|
||||||
var nats = userClaims.Nats;
|
var nats = userClaims.Nats;
|
||||||
|
|||||||
@@ -245,6 +245,12 @@ public static class ConfigProcessor
|
|||||||
opts.ReconnectErrorReports = ToInt(value);
|
opts.ReconnectErrorReports = ToInt(value);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
// MQTT
|
||||||
|
case "mqtt":
|
||||||
|
if (value is Dictionary<string, object?> mqttDict)
|
||||||
|
ParseMqtt(mqttDict, opts, errors);
|
||||||
|
break;
|
||||||
|
|
||||||
// Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.)
|
// Unknown keys silently ignored (cluster, jetstream, gateway, leafnode, etc.)
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
@@ -620,6 +626,145 @@ public static class ConfigProcessor
|
|||||||
opts.Tags = tags;
|
opts.Tags = tags;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── MQTT parsing ────────────────────────────────────────────────
|
||||||
|
// Reference: Go server/opts.go parseMQTT (lines ~5443-5541)
|
||||||
|
|
||||||
|
private static void ParseMqtt(Dictionary<string, object?> dict, NatsOptions opts, List<string> errors)
|
||||||
|
{
|
||||||
|
var mqtt = opts.Mqtt ?? new MqttOptions();
|
||||||
|
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
switch (key.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "listen":
|
||||||
|
var (host, port) = ParseHostPort(value);
|
||||||
|
if (host is not null) mqtt.Host = host;
|
||||||
|
if (port is not null) mqtt.Port = port.Value;
|
||||||
|
break;
|
||||||
|
case "port":
|
||||||
|
mqtt.Port = ToInt(value);
|
||||||
|
break;
|
||||||
|
case "host" or "net":
|
||||||
|
mqtt.Host = ToString(value);
|
||||||
|
break;
|
||||||
|
case "no_auth_user":
|
||||||
|
mqtt.NoAuthUser = ToString(value);
|
||||||
|
break;
|
||||||
|
case "tls":
|
||||||
|
if (value is Dictionary<string, object?> tlsDict)
|
||||||
|
ParseMqttTls(tlsDict, mqtt, errors);
|
||||||
|
break;
|
||||||
|
case "authorization" or "authentication":
|
||||||
|
if (value is Dictionary<string, object?> authDict)
|
||||||
|
ParseMqttAuth(authDict, mqtt, errors);
|
||||||
|
break;
|
||||||
|
case "ack_wait" or "ackwait":
|
||||||
|
mqtt.AckWait = ParseDuration(value);
|
||||||
|
break;
|
||||||
|
case "js_api_timeout" or "api_timeout":
|
||||||
|
mqtt.JsApiTimeout = ParseDuration(value);
|
||||||
|
break;
|
||||||
|
case "max_ack_pending" or "max_pending" or "max_inflight":
|
||||||
|
var pending = ToInt(value);
|
||||||
|
if (pending < 0 || pending > 0xFFFF)
|
||||||
|
errors.Add($"mqtt max_ack_pending invalid value {pending}, should be in [0..{0xFFFF}] range");
|
||||||
|
else
|
||||||
|
mqtt.MaxAckPending = (ushort)pending;
|
||||||
|
break;
|
||||||
|
case "js_domain":
|
||||||
|
mqtt.JsDomain = ToString(value);
|
||||||
|
break;
|
||||||
|
case "stream_replicas":
|
||||||
|
mqtt.StreamReplicas = ToInt(value);
|
||||||
|
break;
|
||||||
|
case "consumer_replicas":
|
||||||
|
mqtt.ConsumerReplicas = ToInt(value);
|
||||||
|
break;
|
||||||
|
case "consumer_memory_storage":
|
||||||
|
mqtt.ConsumerMemoryStorage = ToBool(value);
|
||||||
|
break;
|
||||||
|
case "consumer_inactive_threshold" or "consumer_auto_cleanup":
|
||||||
|
mqtt.ConsumerInactiveThreshold = ParseDuration(value);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
opts.Mqtt = mqtt;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void ParseMqttAuth(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||||
|
{
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
switch (key.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "user" or "username":
|
||||||
|
mqtt.Username = ToString(value);
|
||||||
|
break;
|
||||||
|
case "pass" or "password":
|
||||||
|
mqtt.Password = ToString(value);
|
||||||
|
break;
|
||||||
|
case "token":
|
||||||
|
mqtt.Token = ToString(value);
|
||||||
|
break;
|
||||||
|
case "timeout":
|
||||||
|
mqtt.AuthTimeout = ToDouble(value);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void ParseMqttTls(Dictionary<string, object?> dict, MqttOptions mqtt, List<string> errors)
|
||||||
|
{
|
||||||
|
foreach (var (key, value) in dict)
|
||||||
|
{
|
||||||
|
switch (key.ToLowerInvariant())
|
||||||
|
{
|
||||||
|
case "cert_file":
|
||||||
|
mqtt.TlsCert = ToString(value);
|
||||||
|
break;
|
||||||
|
case "key_file":
|
||||||
|
mqtt.TlsKey = ToString(value);
|
||||||
|
break;
|
||||||
|
case "ca_file":
|
||||||
|
mqtt.TlsCaCert = ToString(value);
|
||||||
|
break;
|
||||||
|
case "verify":
|
||||||
|
mqtt.TlsVerify = ToBool(value);
|
||||||
|
break;
|
||||||
|
case "verify_and_map":
|
||||||
|
var map = ToBool(value);
|
||||||
|
mqtt.TlsMap = map;
|
||||||
|
if (map) mqtt.TlsVerify = true;
|
||||||
|
break;
|
||||||
|
case "timeout":
|
||||||
|
mqtt.TlsTimeout = ToDouble(value);
|
||||||
|
break;
|
||||||
|
case "pinned_certs":
|
||||||
|
if (value is List<object?> pinnedList)
|
||||||
|
{
|
||||||
|
var certs = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||||
|
foreach (var item in pinnedList)
|
||||||
|
{
|
||||||
|
if (item is string s)
|
||||||
|
certs.Add(s.ToLowerInvariant());
|
||||||
|
}
|
||||||
|
|
||||||
|
mqtt.TlsPinnedCerts = certs;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ─── Type conversion helpers ───────────────────────────────────
|
// ─── Type conversion helpers ───────────────────────────────────
|
||||||
|
|
||||||
private static int ToInt(object? value) => value switch
|
private static int ToInt(object? value) => value switch
|
||||||
@@ -653,6 +798,15 @@ public static class ConfigProcessor
|
|||||||
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to string"),
|
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to string"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private static double ToDouble(object? value) => value switch
|
||||||
|
{
|
||||||
|
double d => d,
|
||||||
|
long l => l,
|
||||||
|
int i => i,
|
||||||
|
string s when double.TryParse(s, NumberStyles.Float, CultureInfo.InvariantCulture, out var d) => d,
|
||||||
|
_ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to double"),
|
||||||
|
};
|
||||||
|
|
||||||
private static IReadOnlyList<string> ToStringList(object? value)
|
private static IReadOnlyList<string> ToStringList(object? value)
|
||||||
{
|
{
|
||||||
if (value is List<object?> list)
|
if (value is List<object?> list)
|
||||||
|
|||||||
@@ -22,4 +22,5 @@ public sealed record ClosedClient
|
|||||||
public TimeSpan Rtt { get; init; }
|
public TimeSpan Rtt { get; init; }
|
||||||
public string TlsVersion { get; init; } = "";
|
public string TlsVersion { get; init; } = "";
|
||||||
public string TlsCipherSuite { get; init; } = "";
|
public string TlsCipherSuite { get; init; } = "";
|
||||||
|
public string MqttClient { get; init; } = "";
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -204,6 +204,8 @@ public sealed class ConnzOptions
|
|||||||
|
|
||||||
public string FilterSubject { get; set; } = "";
|
public string FilterSubject { get; set; } = "";
|
||||||
|
|
||||||
|
public string MqttClient { get; set; } = "";
|
||||||
|
|
||||||
public int Offset { get; set; }
|
public int Offset { get; set; }
|
||||||
|
|
||||||
public int Limit { get; set; } = 1024;
|
public int Limit { get; set; } = 1024;
|
||||||
|
|||||||
@@ -28,6 +28,10 @@ public sealed class ConnzHandler(NatsServer server)
|
|||||||
connInfos.AddRange(server.GetClosedClients().Select(c => BuildClosedConnInfo(c, now, opts)));
|
connInfos.AddRange(server.GetClosedClients().Select(c => BuildClosedConnInfo(c, now, opts)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Filter by MQTT client ID
|
||||||
|
if (!string.IsNullOrEmpty(opts.MqttClient))
|
||||||
|
connInfos = connInfos.Where(c => c.MqttClient == opts.MqttClient).ToList();
|
||||||
|
|
||||||
// Validate sort options that require closed state
|
// Validate sort options that require closed state
|
||||||
if (opts.Sort is SortOpt.ByStop or SortOpt.ByReason && opts.State == ConnState.Open)
|
if (opts.Sort is SortOpt.ByStop or SortOpt.ByReason && opts.State == ConnState.Open)
|
||||||
opts.Sort = SortOpt.ByCid; // Fallback
|
opts.Sort = SortOpt.ByCid; // Fallback
|
||||||
@@ -142,6 +146,7 @@ public sealed class ConnzHandler(NatsServer server)
|
|||||||
Rtt = FormatRtt(closed.Rtt),
|
Rtt = FormatRtt(closed.Rtt),
|
||||||
TlsVersion = closed.TlsVersion,
|
TlsVersion = closed.TlsVersion,
|
||||||
TlsCipherSuite = closed.TlsCipherSuite,
|
TlsCipherSuite = closed.TlsCipherSuite,
|
||||||
|
MqttClient = closed.MqttClient,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -197,6 +202,9 @@ public sealed class ConnzHandler(NatsServer server)
|
|||||||
if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l))
|
if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l))
|
||||||
opts.Limit = l;
|
opts.Limit = l;
|
||||||
|
|
||||||
|
if (q.TryGetValue("mqtt_client", out var mqttClient))
|
||||||
|
opts.MqttClient = mqttClient.ToString();
|
||||||
|
|
||||||
return opts;
|
return opts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -355,8 +355,29 @@ public sealed class MqttOptsVarz
|
|||||||
[JsonPropertyName("port")]
|
[JsonPropertyName("port")]
|
||||||
public int Port { get; set; }
|
public int Port { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("no_auth_user")]
|
||||||
|
public string NoAuthUser { get; set; } = "";
|
||||||
|
|
||||||
|
[JsonPropertyName("auth_timeout")]
|
||||||
|
public double AuthTimeout { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("tls_map")]
|
||||||
|
public bool TlsMap { get; set; }
|
||||||
|
|
||||||
[JsonPropertyName("tls_timeout")]
|
[JsonPropertyName("tls_timeout")]
|
||||||
public double TlsTimeout { get; set; }
|
public double TlsTimeout { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("tls_pinned_certs")]
|
||||||
|
public string[] TlsPinnedCerts { get; set; } = [];
|
||||||
|
|
||||||
|
[JsonPropertyName("js_domain")]
|
||||||
|
public string JsDomain { get; set; } = "";
|
||||||
|
|
||||||
|
[JsonPropertyName("ack_wait")]
|
||||||
|
public long AckWait { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("max_ack_pending")]
|
||||||
|
public ushort MaxAckPending { get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -121,6 +121,7 @@ public sealed class VarzHandler : IDisposable
|
|||||||
Subscriptions = _server.SubList.Count,
|
Subscriptions = _server.SubList.Count,
|
||||||
ConfigLoadTime = _server.StartTime,
|
ConfigLoadTime = _server.StartTime,
|
||||||
HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value),
|
HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value),
|
||||||
|
Mqtt = BuildMqttVarz(),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
@@ -134,6 +135,27 @@ public sealed class VarzHandler : IDisposable
|
|||||||
_varzMu.Dispose();
|
_varzMu.Dispose();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private MqttOptsVarz BuildMqttVarz()
|
||||||
|
{
|
||||||
|
var mqtt = _options.Mqtt;
|
||||||
|
if (mqtt is null)
|
||||||
|
return new MqttOptsVarz();
|
||||||
|
|
||||||
|
return new MqttOptsVarz
|
||||||
|
{
|
||||||
|
Host = mqtt.Host,
|
||||||
|
Port = mqtt.Port,
|
||||||
|
NoAuthUser = mqtt.NoAuthUser ?? "",
|
||||||
|
AuthTimeout = mqtt.AuthTimeout,
|
||||||
|
TlsMap = mqtt.TlsMap,
|
||||||
|
TlsTimeout = mqtt.TlsTimeout,
|
||||||
|
TlsPinnedCerts = mqtt.TlsPinnedCerts?.ToArray() ?? [],
|
||||||
|
JsDomain = mqtt.JsDomain ?? "",
|
||||||
|
AckWait = (long)mqtt.AckWait.TotalNanoseconds,
|
||||||
|
MaxAckPending = mqtt.MaxAckPending,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Formats a TimeSpan as a human-readable uptime string matching Go server format.
|
/// Formats a TimeSpan as a human-readable uptime string matching Go server format.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
43
src/NATS.Server/MqttOptions.cs
Normal file
43
src/NATS.Server/MqttOptions.cs
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
namespace NATS.Server;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// MQTT protocol configuration options.
|
||||||
|
/// Corresponds to Go server/opts.go MQTTOpts struct.
|
||||||
|
/// Config is parsed and stored but no MQTT listener is started yet.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class MqttOptions
|
||||||
|
{
|
||||||
|
// Network
|
||||||
|
public string Host { get; set; } = "";
|
||||||
|
public int Port { get; set; }
|
||||||
|
|
||||||
|
// Auth override (MQTT-specific, separate from global auth)
|
||||||
|
public string? NoAuthUser { get; set; }
|
||||||
|
public string? Username { get; set; }
|
||||||
|
public string? Password { get; set; }
|
||||||
|
public string? Token { get; set; }
|
||||||
|
public double AuthTimeout { get; set; }
|
||||||
|
|
||||||
|
// TLS
|
||||||
|
public string? TlsCert { get; set; }
|
||||||
|
public string? TlsKey { get; set; }
|
||||||
|
public string? TlsCaCert { get; set; }
|
||||||
|
public bool TlsVerify { get; set; }
|
||||||
|
public double TlsTimeout { get; set; } = 2.0;
|
||||||
|
public bool TlsMap { get; set; }
|
||||||
|
public HashSet<string>? TlsPinnedCerts { get; set; }
|
||||||
|
|
||||||
|
// JetStream integration
|
||||||
|
public string? JsDomain { get; set; }
|
||||||
|
public int StreamReplicas { get; set; }
|
||||||
|
public int ConsumerReplicas { get; set; }
|
||||||
|
public bool ConsumerMemoryStorage { get; set; }
|
||||||
|
public TimeSpan ConsumerInactiveThreshold { get; set; }
|
||||||
|
|
||||||
|
// QoS
|
||||||
|
public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30);
|
||||||
|
public ushort MaxAckPending { get; set; }
|
||||||
|
public TimeSpan JsApiTimeout { get; set; } = TimeSpan.FromSeconds(5);
|
||||||
|
|
||||||
|
public bool HasTls => TlsCert != null && TlsKey != null;
|
||||||
|
}
|
||||||
@@ -8,6 +8,7 @@ using System.Text.Json;
|
|||||||
using System.Threading.Channels;
|
using System.Threading.Channels;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using NATS.Server.Auth;
|
using NATS.Server.Auth;
|
||||||
|
using NATS.Server.Auth.Jwt;
|
||||||
using NATS.Server.Protocol;
|
using NATS.Server.Protocol;
|
||||||
using NATS.Server.Subscriptions;
|
using NATS.Server.Subscriptions;
|
||||||
using NATS.Server.Tls;
|
using NATS.Server.Tls;
|
||||||
@@ -391,6 +392,7 @@ public sealed class NatsClient : INatsClient, IDisposable
|
|||||||
Opts = ClientOpts,
|
Opts = ClientOpts,
|
||||||
Nonce = _nonce ?? [],
|
Nonce = _nonce ?? [],
|
||||||
ClientCertificate = TlsState?.PeerCert,
|
ClientCertificate = TlsState?.PeerCert,
|
||||||
|
ConnectionType = JwtConnectionTypes.Standard,
|
||||||
};
|
};
|
||||||
|
|
||||||
authResult = _authService.Authenticate(context);
|
authResult = _authService.Authenticate(context);
|
||||||
|
|||||||
@@ -115,6 +115,9 @@ public sealed class NatsOptions
|
|||||||
// Subject mapping / transforms (source pattern -> destination template)
|
// Subject mapping / transforms (source pattern -> destination template)
|
||||||
public Dictionary<string, string>? SubjectMappings { get; set; }
|
public Dictionary<string, string>? SubjectMappings { get; set; }
|
||||||
|
|
||||||
|
// MQTT configuration (parsed from config, no listener yet)
|
||||||
|
public MqttOptions? Mqtt { get; set; }
|
||||||
|
|
||||||
public bool HasTls => TlsCert != null && TlsKey != null;
|
public bool HasTls => TlsCert != null && TlsKey != null;
|
||||||
|
|
||||||
// WebSocket
|
// WebSocket
|
||||||
|
|||||||
@@ -1221,6 +1221,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
Rtt = client.Rtt,
|
Rtt = client.Rtt,
|
||||||
TlsVersion = client.TlsState?.TlsVersion ?? "",
|
TlsVersion = client.TlsState?.TlsVersion ?? "",
|
||||||
TlsCipherSuite = client.TlsState?.CipherSuite ?? "",
|
TlsCipherSuite = client.TlsState?.CipherSuite ?? "",
|
||||||
|
MqttClient = "", // populated when MQTT transport is implemented
|
||||||
});
|
});
|
||||||
|
|
||||||
// Cap closed clients list
|
// Cap closed clients list
|
||||||
|
|||||||
@@ -501,4 +501,112 @@ public class ConfigProcessorTests
|
|||||||
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("tls.conf"));
|
||||||
opts.HasTls.ShouldBeTrue();
|
opts.HasTls.ShouldBeTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ─── MQTT config ────────────────────────────────────────────
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_ListenHostAndPort()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.Host.ShouldBe("10.0.0.1");
|
||||||
|
opts.Mqtt.Port.ShouldBe(1883);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_NoAuthUser()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.NoAuthUser.ShouldBe("mqtt_default");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_Authorization()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.Username.ShouldBe("mqtt_user");
|
||||||
|
opts.Mqtt.Password.ShouldBe("mqtt_pass");
|
||||||
|
opts.Mqtt.Token.ShouldBe("mqtt_token");
|
||||||
|
opts.Mqtt.AuthTimeout.ShouldBe(3.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_Tls()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.TlsCert.ShouldBe("/path/to/mqtt-cert.pem");
|
||||||
|
opts.Mqtt.TlsKey.ShouldBe("/path/to/mqtt-key.pem");
|
||||||
|
opts.Mqtt.TlsCaCert.ShouldBe("/path/to/mqtt-ca.pem");
|
||||||
|
opts.Mqtt.TlsVerify.ShouldBeTrue();
|
||||||
|
opts.Mqtt.TlsTimeout.ShouldBe(5.0);
|
||||||
|
opts.Mqtt.HasTls.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_QosSettings()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.AckWait.ShouldBe(TimeSpan.FromSeconds(60));
|
||||||
|
opts.Mqtt.MaxAckPending.ShouldBe((ushort)2048);
|
||||||
|
opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_JetStreamSettings()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfigFile(TestDataPath("mqtt.conf"));
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.JsDomain.ShouldBe("mqtt-domain");
|
||||||
|
opts.Mqtt.StreamReplicas.ShouldBe(3);
|
||||||
|
opts.Mqtt.ConsumerReplicas.ShouldBe(1);
|
||||||
|
opts.Mqtt.ConsumerMemoryStorage.ShouldBeTrue();
|
||||||
|
opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(5));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_MaxAckPendingValidation_ReportsError()
|
||||||
|
{
|
||||||
|
var ex = Should.Throw<ConfigProcessorException>(() =>
|
||||||
|
ConfigProcessor.ProcessConfig("""
|
||||||
|
mqtt {
|
||||||
|
max_ack_pending: 70000
|
||||||
|
}
|
||||||
|
"""));
|
||||||
|
ex.Errors.ShouldContain(e => e.Contains("max_ack_pending"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_Aliases()
|
||||||
|
{
|
||||||
|
// Test alias keys: "ackwait" (alias for "ack_wait"), "net" (alias for "host"),
|
||||||
|
// "max_inflight" (alias for "max_ack_pending"), "consumer_auto_cleanup" (alias)
|
||||||
|
var opts = ConfigProcessor.ProcessConfig("""
|
||||||
|
mqtt {
|
||||||
|
net: "127.0.0.1"
|
||||||
|
port: 1884
|
||||||
|
ackwait: "45s"
|
||||||
|
max_inflight: 500
|
||||||
|
api_timeout: "8s"
|
||||||
|
consumer_auto_cleanup: "10m"
|
||||||
|
}
|
||||||
|
""");
|
||||||
|
opts.Mqtt.ShouldNotBeNull();
|
||||||
|
opts.Mqtt!.Host.ShouldBe("127.0.0.1");
|
||||||
|
opts.Mqtt.Port.ShouldBe(1884);
|
||||||
|
opts.Mqtt.AckWait.ShouldBe(TimeSpan.FromSeconds(45));
|
||||||
|
opts.Mqtt.MaxAckPending.ShouldBe((ushort)500);
|
||||||
|
opts.Mqtt.JsApiTimeout.ShouldBe(TimeSpan.FromSeconds(8));
|
||||||
|
opts.Mqtt.ConsumerInactiveThreshold.ShouldBe(TimeSpan.FromMinutes(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MqttConf_Absent_ReturnsNull()
|
||||||
|
{
|
||||||
|
var opts = ConfigProcessor.ProcessConfig("port: 4222");
|
||||||
|
opts.Mqtt.ShouldBeNull();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -588,4 +588,279 @@ public class JwtAuthenticatorTests
|
|||||||
|
|
||||||
auth.Authenticate(ctx).ShouldBeNull();
|
auth.Authenticate(ctx).ShouldBeNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// =========================================================================
|
||||||
|
// allowed_connection_types tests
|
||||||
|
// =========================================================================
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_allows_standard_context()
|
||||||
|
{
|
||||||
|
var operatorKp = KeyPair.CreatePair(PrefixByte.Operator);
|
||||||
|
var accountKp = KeyPair.CreatePair(PrefixByte.Account);
|
||||||
|
var userKp = KeyPair.CreatePair(PrefixByte.User);
|
||||||
|
|
||||||
|
var operatorPub = operatorKp.GetPublicKey();
|
||||||
|
var accountPub = accountKp.GetPublicKey();
|
||||||
|
var userPub = userKp.GetPublicKey();
|
||||||
|
|
||||||
|
var accountPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{accountPub}}",
|
||||||
|
"iss":"{{operatorPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{"type":"account","version":2}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var accountJwt = BuildSignedToken(accountPayload, operatorKp);
|
||||||
|
|
||||||
|
var userPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{userPub}}",
|
||||||
|
"iss":"{{accountPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{
|
||||||
|
"type":"user","version":2,
|
||||||
|
"bearer_token":true,
|
||||||
|
"issuer_account":"{{accountPub}}",
|
||||||
|
"allowed_connection_types":["STANDARD"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var userJwt = BuildSignedToken(userPayload, accountKp);
|
||||||
|
|
||||||
|
var resolver = new MemAccountResolver();
|
||||||
|
await resolver.StoreAsync(accountPub, accountJwt);
|
||||||
|
|
||||||
|
var auth = new JwtAuthenticator([operatorPub], resolver);
|
||||||
|
|
||||||
|
var ctx = new ClientAuthContext
|
||||||
|
{
|
||||||
|
Opts = new ClientOptions { JWT = userJwt },
|
||||||
|
Nonce = "nonce"u8.ToArray(),
|
||||||
|
ConnectionType = "STANDARD",
|
||||||
|
};
|
||||||
|
|
||||||
|
var result = auth.Authenticate(ctx);
|
||||||
|
|
||||||
|
result.ShouldNotBeNull();
|
||||||
|
result.Identity.ShouldBe(userPub);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_rejects_mqtt_only_for_standard_context()
|
||||||
|
{
|
||||||
|
var operatorKp = KeyPair.CreatePair(PrefixByte.Operator);
|
||||||
|
var accountKp = KeyPair.CreatePair(PrefixByte.Account);
|
||||||
|
var userKp = KeyPair.CreatePair(PrefixByte.User);
|
||||||
|
|
||||||
|
var operatorPub = operatorKp.GetPublicKey();
|
||||||
|
var accountPub = accountKp.GetPublicKey();
|
||||||
|
var userPub = userKp.GetPublicKey();
|
||||||
|
|
||||||
|
var accountPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{accountPub}}",
|
||||||
|
"iss":"{{operatorPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{"type":"account","version":2}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var accountJwt = BuildSignedToken(accountPayload, operatorKp);
|
||||||
|
|
||||||
|
// User JWT only allows MQTT connections
|
||||||
|
var userPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{userPub}}",
|
||||||
|
"iss":"{{accountPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{
|
||||||
|
"type":"user","version":2,
|
||||||
|
"bearer_token":true,
|
||||||
|
"issuer_account":"{{accountPub}}",
|
||||||
|
"allowed_connection_types":["MQTT"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var userJwt = BuildSignedToken(userPayload, accountKp);
|
||||||
|
|
||||||
|
var resolver = new MemAccountResolver();
|
||||||
|
await resolver.StoreAsync(accountPub, accountJwt);
|
||||||
|
|
||||||
|
var auth = new JwtAuthenticator([operatorPub], resolver);
|
||||||
|
|
||||||
|
var ctx = new ClientAuthContext
|
||||||
|
{
|
||||||
|
Opts = new ClientOptions { JWT = userJwt },
|
||||||
|
Nonce = "nonce"u8.ToArray(),
|
||||||
|
ConnectionType = "STANDARD",
|
||||||
|
};
|
||||||
|
|
||||||
|
// Should reject: STANDARD is not in allowed_connection_types
|
||||||
|
auth.Authenticate(ctx).ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_allows_known_even_with_unknown_values()
|
||||||
|
{
|
||||||
|
var operatorKp = KeyPair.CreatePair(PrefixByte.Operator);
|
||||||
|
var accountKp = KeyPair.CreatePair(PrefixByte.Account);
|
||||||
|
var userKp = KeyPair.CreatePair(PrefixByte.User);
|
||||||
|
|
||||||
|
var operatorPub = operatorKp.GetPublicKey();
|
||||||
|
var accountPub = accountKp.GetPublicKey();
|
||||||
|
var userPub = userKp.GetPublicKey();
|
||||||
|
|
||||||
|
var accountPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{accountPub}}",
|
||||||
|
"iss":"{{operatorPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{"type":"account","version":2}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var accountJwt = BuildSignedToken(accountPayload, operatorKp);
|
||||||
|
|
||||||
|
// User JWT allows STANDARD and an unknown type
|
||||||
|
var userPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{userPub}}",
|
||||||
|
"iss":"{{accountPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{
|
||||||
|
"type":"user","version":2,
|
||||||
|
"bearer_token":true,
|
||||||
|
"issuer_account":"{{accountPub}}",
|
||||||
|
"allowed_connection_types":["STANDARD","SOME_NEW_TYPE"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var userJwt = BuildSignedToken(userPayload, accountKp);
|
||||||
|
|
||||||
|
var resolver = new MemAccountResolver();
|
||||||
|
await resolver.StoreAsync(accountPub, accountJwt);
|
||||||
|
|
||||||
|
var auth = new JwtAuthenticator([operatorPub], resolver);
|
||||||
|
|
||||||
|
var ctx = new ClientAuthContext
|
||||||
|
{
|
||||||
|
Opts = new ClientOptions { JWT = userJwt },
|
||||||
|
Nonce = "nonce"u8.ToArray(),
|
||||||
|
ConnectionType = "STANDARD",
|
||||||
|
};
|
||||||
|
|
||||||
|
var result = auth.Authenticate(ctx);
|
||||||
|
|
||||||
|
result.ShouldNotBeNull();
|
||||||
|
result.Identity.ShouldBe(userPub);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_rejects_when_only_unknown_values_present()
|
||||||
|
{
|
||||||
|
var operatorKp = KeyPair.CreatePair(PrefixByte.Operator);
|
||||||
|
var accountKp = KeyPair.CreatePair(PrefixByte.Account);
|
||||||
|
var userKp = KeyPair.CreatePair(PrefixByte.User);
|
||||||
|
|
||||||
|
var operatorPub = operatorKp.GetPublicKey();
|
||||||
|
var accountPub = accountKp.GetPublicKey();
|
||||||
|
var userPub = userKp.GetPublicKey();
|
||||||
|
|
||||||
|
var accountPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{accountPub}}",
|
||||||
|
"iss":"{{operatorPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{"type":"account","version":2}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var accountJwt = BuildSignedToken(accountPayload, operatorKp);
|
||||||
|
|
||||||
|
// User JWT only allows an unknown connection type
|
||||||
|
var userPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{userPub}}",
|
||||||
|
"iss":"{{accountPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{
|
||||||
|
"type":"user","version":2,
|
||||||
|
"bearer_token":true,
|
||||||
|
"issuer_account":"{{accountPub}}",
|
||||||
|
"allowed_connection_types":["SOME_NEW_TYPE"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var userJwt = BuildSignedToken(userPayload, accountKp);
|
||||||
|
|
||||||
|
var resolver = new MemAccountResolver();
|
||||||
|
await resolver.StoreAsync(accountPub, accountJwt);
|
||||||
|
|
||||||
|
var auth = new JwtAuthenticator([operatorPub], resolver);
|
||||||
|
|
||||||
|
var ctx = new ClientAuthContext
|
||||||
|
{
|
||||||
|
Opts = new ClientOptions { JWT = userJwt },
|
||||||
|
Nonce = "nonce"u8.ToArray(),
|
||||||
|
ConnectionType = "STANDARD",
|
||||||
|
};
|
||||||
|
|
||||||
|
// Should reject: STANDARD is not in allowed_connection_types
|
||||||
|
auth.Authenticate(ctx).ShouldBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Allowed_connection_types_is_case_insensitive_for_input_values()
|
||||||
|
{
|
||||||
|
var operatorKp = KeyPair.CreatePair(PrefixByte.Operator);
|
||||||
|
var accountKp = KeyPair.CreatePair(PrefixByte.Account);
|
||||||
|
var userKp = KeyPair.CreatePair(PrefixByte.User);
|
||||||
|
|
||||||
|
var operatorPub = operatorKp.GetPublicKey();
|
||||||
|
var accountPub = accountKp.GetPublicKey();
|
||||||
|
var userPub = userKp.GetPublicKey();
|
||||||
|
|
||||||
|
var accountPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{accountPub}}",
|
||||||
|
"iss":"{{operatorPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{"type":"account","version":2}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var accountJwt = BuildSignedToken(accountPayload, operatorKp);
|
||||||
|
|
||||||
|
// User JWT allows "standard" (lowercase)
|
||||||
|
var userPayload = $$"""
|
||||||
|
{
|
||||||
|
"sub":"{{userPub}}",
|
||||||
|
"iss":"{{accountPub}}",
|
||||||
|
"iat":1700000000,
|
||||||
|
"nats":{
|
||||||
|
"type":"user","version":2,
|
||||||
|
"bearer_token":true,
|
||||||
|
"issuer_account":"{{accountPub}}",
|
||||||
|
"allowed_connection_types":["standard"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
""";
|
||||||
|
var userJwt = BuildSignedToken(userPayload, accountKp);
|
||||||
|
|
||||||
|
var resolver = new MemAccountResolver();
|
||||||
|
await resolver.StoreAsync(accountPub, accountJwt);
|
||||||
|
|
||||||
|
var auth = new JwtAuthenticator([operatorPub], resolver);
|
||||||
|
|
||||||
|
var ctx = new ClientAuthContext
|
||||||
|
{
|
||||||
|
Opts = new ClientOptions { JWT = userJwt },
|
||||||
|
Nonce = "nonce"u8.ToArray(),
|
||||||
|
ConnectionType = "STANDARD",
|
||||||
|
};
|
||||||
|
|
||||||
|
// Should allow: case-insensitive match of "standard" == "STANDARD"
|
||||||
|
var result = auth.Authenticate(ctx);
|
||||||
|
|
||||||
|
result.ShouldNotBeNull();
|
||||||
|
result.Identity.ShouldBe(userPub);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -203,6 +203,51 @@ public class MonitorTests : IAsyncLifetime
|
|||||||
closed.Reason.ShouldNotBeNullOrEmpty();
|
closed.Reason.ShouldNotBeNullOrEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Connz_filters_by_mqtt_client_for_open_connections()
|
||||||
|
{
|
||||||
|
// Connect a regular NATS client (no MQTT ID)
|
||||||
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
|
||||||
|
using var stream = new NetworkStream(sock);
|
||||||
|
var buf = new byte[4096];
|
||||||
|
_ = await stream.ReadAsync(buf);
|
||||||
|
await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray());
|
||||||
|
await Task.Delay(200);
|
||||||
|
|
||||||
|
// Query for an MQTT client ID that no connection has
|
||||||
|
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?mqtt_client=some-id");
|
||||||
|
response.StatusCode.ShouldBe(HttpStatusCode.OK);
|
||||||
|
|
||||||
|
var connz = await response.Content.ReadFromJsonAsync<Connz>();
|
||||||
|
connz.ShouldNotBeNull();
|
||||||
|
connz.NumConns.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Connz_filters_by_mqtt_client_for_closed_connections()
|
||||||
|
{
|
||||||
|
// Connect then disconnect a client so it appears in closed list
|
||||||
|
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
|
||||||
|
using var stream = new NetworkStream(sock);
|
||||||
|
var buf = new byte[4096];
|
||||||
|
_ = await stream.ReadAsync(buf);
|
||||||
|
await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray());
|
||||||
|
await Task.Delay(200);
|
||||||
|
sock.Shutdown(SocketShutdown.Both);
|
||||||
|
sock.Dispose();
|
||||||
|
await Task.Delay(500);
|
||||||
|
|
||||||
|
// Query closed connections with an MQTT client ID that no connection has
|
||||||
|
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?state=closed&mqtt_client=missing-id");
|
||||||
|
response.StatusCode.ShouldBe(HttpStatusCode.OK);
|
||||||
|
|
||||||
|
var connz = await response.Content.ReadFromJsonAsync<Connz>();
|
||||||
|
connz.ShouldNotBeNull();
|
||||||
|
connz.NumConns.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task Connz_sort_by_stop_requires_closed_state()
|
public async Task Connz_sort_by_stop_requires_closed_state()
|
||||||
{
|
{
|
||||||
@@ -226,6 +271,23 @@ public class MonitorTests : IAsyncLifetime
|
|||||||
response.StatusCode.ShouldBe(HttpStatusCode.OK);
|
response.StatusCode.ShouldBe(HttpStatusCode.OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Varz_includes_mqtt_section()
|
||||||
|
{
|
||||||
|
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/varz");
|
||||||
|
response.StatusCode.ShouldBe(HttpStatusCode.OK);
|
||||||
|
|
||||||
|
var varz = await response.Content.ReadFromJsonAsync<Varz>();
|
||||||
|
varz.ShouldNotBeNull();
|
||||||
|
varz.Mqtt.ShouldNotBeNull();
|
||||||
|
varz.Mqtt.Host.ShouldBe("");
|
||||||
|
varz.Mqtt.Port.ShouldBe(0);
|
||||||
|
varz.Mqtt.NoAuthUser.ShouldBe("");
|
||||||
|
varz.Mqtt.JsDomain.ShouldBe("");
|
||||||
|
varz.Mqtt.AckWait.ShouldBe(0L);
|
||||||
|
varz.Mqtt.MaxAckPending.ShouldBe((ushort)0);
|
||||||
|
}
|
||||||
|
|
||||||
private static int GetFreePort()
|
private static int GetFreePort()
|
||||||
{
|
{
|
||||||
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
|||||||
28
tests/NATS.Server.Tests/TestData/mqtt.conf
Normal file
28
tests/NATS.Server.Tests/TestData/mqtt.conf
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
mqtt {
|
||||||
|
listen: "10.0.0.1:1883"
|
||||||
|
no_auth_user: "mqtt_default"
|
||||||
|
|
||||||
|
authorization {
|
||||||
|
user: "mqtt_user"
|
||||||
|
pass: "mqtt_pass"
|
||||||
|
token: "mqtt_token"
|
||||||
|
timeout: 3.0
|
||||||
|
}
|
||||||
|
|
||||||
|
tls {
|
||||||
|
cert_file: "/path/to/mqtt-cert.pem"
|
||||||
|
key_file: "/path/to/mqtt-key.pem"
|
||||||
|
ca_file: "/path/to/mqtt-ca.pem"
|
||||||
|
verify: true
|
||||||
|
timeout: 5.0
|
||||||
|
}
|
||||||
|
|
||||||
|
ack_wait: "60s"
|
||||||
|
max_ack_pending: 2048
|
||||||
|
js_domain: "mqtt-domain"
|
||||||
|
js_api_timeout: "10s"
|
||||||
|
stream_replicas: 3
|
||||||
|
consumer_replicas: 1
|
||||||
|
consumer_memory_storage: true
|
||||||
|
consumer_inactive_threshold: "5m"
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user