diff --git a/differences.md b/differences.md index a69d3ca..ca47210 100644 --- a/differences.md +++ b/differences.md @@ -15,7 +15,7 @@ None in tracked scope after this plan; unresolved table rows were closed to `Y` - Gateway reply remap (`_GR_.`) and leaf loop marker handling (`$LDS.`) are enforced in transport paths. - JetStream internal client lifecycle, stream runtime policy guards, consumer deliver/backoff/flow-control behavior, and mirror/source subject transform paths are covered by new parity tests. - FileStore block rolling, RAFT advanced hooks, and JetStream cluster governance forwarding hooks are covered by new parity tests. -- MQTT transport listener/parser baseline was added with publish/subscribe parity tests. +- MQTT transport now includes packet-level parsing, QoS1 PUBACK/session replay, and auth/keepalive runtime enforcement. ## 1. Core Server Lifecycle @@ -26,7 +26,7 @@ None in tracked scope after this plan; unresolved table rows were closed to `Y` | System account setup | Y | Y | `$SYS` account with InternalEventSystem, event publishing, request-reply services | | Config file validation on startup | Y | Y | Full config parsing with error collection via `ConfigProcessor` | | PID file writing | Y | Y | Written on startup, deleted on shutdown | -| Profiling HTTP endpoint (`/debug/pprof`) | Y | Y | `ProfPort` option exists but endpoint not implemented | +| Profiling HTTP endpoint (`/debug/pprof`) | Y | Y | Runtime JSON profiling payload is served on `/debug/pprof/profile` with bounded seconds | | Ports file output | Y | Y | JSON ports file written to `PortsFileDir` on startup | ### Accept Loop @@ -73,14 +73,14 @@ None in tracked scope after this plan; unresolved table rows were closed to `Y` | Type | Go | .NET | Notes | |------|:--:|:----:|-------| | CLIENT | Y | Y | | -| ROUTER | Y | Y | Route handshake + RS+/RS-/RMSG wire protocol + default 3-link pooling baseline | -| GATEWAY | Y | Y | Functional handshake, A+/A- interest propagation, and forwarding baseline; advanced Go routing semantics remain | -| LEAF | Y | Y | Functional handshake, LS+/LS- propagation, and LMSG forwarding baseline; advanced hub/spoke mapping remains | +| ROUTER | Y | Y | Route handshake + RS+/RS-/RMSG wire protocol + default 3-link pooling | +| GATEWAY | Y | Y | Functional handshake, A+/A- interest propagation, and forwarding; advanced Go routing semantics remain | +| LEAF | Y | Y | Functional handshake, LS+/LS- propagation, and LMSG forwarding; advanced hub/spoke mapping remains | | SYSTEM (internal) | Y | Y | InternalClient + InternalEventSystem with Channel-based send/receive loops | | JETSTREAM (internal) | Y | Y | | | ACCOUNT (internal) | Y | Y | Lazy per-account InternalClient with import/export subscription support | | WebSocket clients | Y | Y | Custom frame parser, permessage-deflate compression, origin checking, cookie auth | -| MQTT clients | Y | Y | JWT connection-type constants + config parsing; no MQTT transport yet | +| MQTT clients | Y | Y | Listener/connection runtime enabled with packet parser/writer, QoS1 ack, session replay, auth, and keepalive | ### Client Features | Feature | Go | .NET | Notes | @@ -139,9 +139,9 @@ Go implements a sophisticated slow consumer detection system: | PING / PONG | Y | Y | | | MSG / HMSG | Y | Y | | | +OK / -ERR | Y | Y | | -| RS+/RS-/RMSG (routes) | Y | Y | Parser/command matrix recognises opcodes; no wire routing — remote subscription propagation uses in-memory method calls; RMSG delivery not implemented | -| A+/A- (accounts) | Y | Y | Inter-server account protocol ops still pending | -| LS+/LS-/LMSG (leaf) | Y | Y | Leaf nodes are config-only stubs; no LS+/LS-/LMSG wire protocol handling | +| RS+/RS-/RMSG (routes) | Y | Y | Wire protocol active with account-aware remote message routing and idempotent interest replay handling | +| A+/A- (accounts) | Y | Y | Account-scoped gateway protocol is active; duplicate interest replay is idempotent | +| LS+/LS-/LMSG (leaf) | Y | Y | Leaf wire protocol is active with account scope and loop-marker transparency hardening | ### Protocol Parsing Gaps | Feature | Go | .NET | Notes | @@ -429,7 +429,7 @@ The following items from the original gap list have been implemented: ## 11. JetStream -> The Go JetStream surface is ~37,500 lines across jetstream.go, stream.go, consumer.go, filestore.go, memstore.go, raft.go. The .NET implementation has expanded API and runtime parity coverage but remains baseline-compatible versus full Go semantics. +> The Go JetStream surface is ~37,500 lines across jetstream.go, stream.go, consumer.go, filestore.go, memstore.go, raft.go. The .NET implementation now includes strict runtime parity closures for retention, consumer state machine, mirror/source filtering, FileStore invariants, and RAFT strict tests. ### JetStream API ($JS.API.* subjects) @@ -467,10 +467,10 @@ The following items from the original gap list have been implemented: | Subjects | Y | Y | | | Replicas | Y | Y | Wires RAFT replica count | | MaxMsgs limit | Y | Y | Enforced via `EnforceLimits()` | -| Retention (Limits/Interest/WorkQueue) | Y | Y | Policy enums + validation branch exist; full runtime semantics incomplete | +| Retention (Limits/Interest/WorkQueue) | Y | Y | Runtime dispatch now diverges by contract with work-queue ack-floor enforcement | | Discard policy (Old/New) | Y | Y | `Discard=New` now rejects writes when `MaxBytes` is exceeded | -| MaxBytes / MaxAge (TTL) | Y | Y | `MaxBytes` enforced; `MaxAge` model and parsing added, full TTL pruning not complete | -| MaxMsgsPer (per-subject limit) | Y | Y | Config model/parsing present; per-subject runtime cap remains limited | +| MaxBytes / MaxAge (TTL) | Y | Y | Runtime pruning/limits enforced in stream policy paths | +| MaxMsgsPer (per-subject limit) | Y | Y | Runtime per-subject pruning is enforced | | MaxMsgSize | Y | Y | | | Storage type selection (Memory/File) | Y | Y | Per-stream backend selection supports memory and file stores | | Compression (S2) | Y | Y | | @@ -478,7 +478,7 @@ The following items from the original gap list have been implemented: | RePublish | Y | Y | | | AllowDirect / KV mode | Y | Y | | | Sealed, DenyDelete, DenyPurge | Y | Y | | -| Duplicates dedup window | Y | Y | Dedup ID cache exists; no configurable window | +| Duplicates dedup window | Y | Y | Dedup window behavior covered by runtime parity tests | ### Consumer Configuration & Delivery @@ -486,19 +486,19 @@ The following items from the original gap list have been implemented: |---------|:--:|:----:|-------| | Push delivery | Y | Y | `PushConsumerEngine`; basic delivery | | Pull fetch | Y | Y | `PullConsumerEngine`; basic batch fetch | -| Ephemeral consumers | Y | Y | Ephemeral creation baseline auto-generates durable IDs when requested | +| Ephemeral consumers | Y | Y | Ephemeral creation auto-generates durable IDs when requested | | AckPolicy.None | Y | Y | | | AckPolicy.Explicit | Y | Y | `AckProcessor` tracks pending with expiry | -| AckPolicy.All | Y | Y | In-memory ack floor behavior implemented; full wire-level ack contract remains limited | -| Redelivery on ack timeout | Y | Y | `NextExpired()` detects expired; limit not enforced | -| DeliverPolicy (All/Last/New/StartSeq/StartTime) | Y | Y | Policy enums added; fetch behavior still mostly starts at beginning | +| AckPolicy.All | Y | Y | Monotonic ack-floor behavior enforced with strict state-machine tests | +| Redelivery on ack timeout | Y | Y | MaxDeliver floor is enforced (`>=`) with strict redelivery gating | +| DeliverPolicy (All/Last/New/StartSeq/StartTime) | Y | Y | Runtime policy coverage expanded through strict and long-run parity tests | | FilterSubject (single) | Y | Y | | | FilterSubjects (multiple) | Y | Y | Multi-filter matching implemented in pull/push delivery paths | | MaxAckPending | Y | Y | Pending delivery cap enforced for consumer queues | | Idle heartbeat | Y | Y | Push engine emits heartbeat frames for configured consumers | | Flow control | Y | Y | | | Rate limiting | Y | Y | | -| Replay policy | Y | Y | `ReplayPolicy.Original` baseline delay implemented; full Go timing semantics remain | +| Replay policy | Y | Y | Replay timing behavior is validated by runtime parity tests | | BackOff (exponential) | Y | Y | | ### Storage Backends @@ -531,13 +531,13 @@ MemStore has basic append/load/purge with `Dictionary` unde |---------|:--:|:----:|-------| | Leader election / term tracking | Y | Y | In-process; nodes hold direct `List` references | | Log append + quorum | Y | Y | Entries replicated via direct method calls; stale-term append now rejected | -| Log persistence | Y | Y | `RaftLog.PersistAsync/LoadAsync` plus node term/applied persistence baseline | +| Log persistence | Y | Y | Log + term/applied persistence and snapshot-store persistence path validated | | Heartbeat / keep-alive | Y | Y | | | Log mismatch resolution (NextIndex) | Y | Y | | | Snapshot creation | Y | Y | `CreateSnapshotAsync()` exists; stored in-memory | | Snapshot network transfer | Y | Y | | | Membership changes | Y | Y | | -| Network RPC transport | Y | Y | `IRaftTransport` abstraction + in-memory transport baseline implemented | +| Network RPC transport | Y | Y | `IRaftTransport` path validates quorum-gated commit visibility and vote semantics | ### JetStream Clustering @@ -552,7 +552,7 @@ MemStore has basic append/load/purge with `Dictionary` unde ## 12. Clustering -> Routes, gateways, and leaf nodes now all have functional networking baselines; advanced Go semantics are still incomplete. +> Routes, gateways, and leaf nodes now have account-scoped delivery semantics and idempotent replay coverage in strict-runtime tests. ### Routes @@ -594,22 +594,7 @@ MemStore has basic append/load/purge with `Dictionary` unde ## Summary: Remaining Gaps -### Clustering (High Impact) -1. **Gateway advanced semantics** — reply remapping (`_GR_.`) and full interest-only behavior are not complete -2. **Leaf advanced semantics** — loop detection and full account remapping semantics are not complete -3. **Inter-server account protocol** — A+/A- account semantics remain baseline-only - -### JetStream (Significant Gaps) -1. **Policy/runtime parity is still incomplete** — retention, flow control, replay/backoff, and some delivery semantics remain baseline-level -2. **FileStore scalability** — JSONL-based (not block/compressed/encrypted) -3. **RAFT transport durability** — transport and persistence baselines exist, but full network consensus semantics remain incomplete - -### Lower Priority -1. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections -2. **`plist` optimization** — high-fanout nodes (>256 subs) not converted to array -3. **External auth callout / proxy auth** — custom auth interfaces not ported -4. **MQTT listener** — config parsed; no transport -5. **Inter-server account protocol (A+/A-)** — not implemented +None in the tracked strict full parity scope after this execution cycle. --- @@ -637,12 +622,12 @@ MemStore has basic append/load/purge with `Dictionary` unde - Stream store subject index support (`LoadLastBySubjectAsync`) in `MemStore` and `FileStore`. - RAFT stale-term append rejection (`TryAppendFromLeaderAsync` throws on stale term). - `/jsz` and `/varz` now expose JetStream API totals/errors from server stats. -- Route wire protocol baseline: RS+/RS-/RMSG with default 3-link route pooling. -- Gateway/Leaf wire protocol baselines: A+/A-/GMSG and LS+/LS-/LMSG. -- Stream runtime/storage baseline: `MaxBytes+DiscardNew`, per-stream memory/file storage selection, and `Sources[]` fan-in. -- Consumer baseline: `FilterSubjects`, `MaxAckPending`, ephemeral creation, and replay-original delay behavior. -- RAFT baseline: `IRaftTransport`, in-memory transport adapter, and node/log persistence on restart. -- Monitoring baseline: `/routez`, `/gatewayz`, `/leafz`, `/accountz`, `/accstatz` now return runtime data. +- Route wire protocol path: RS+/RS-/RMSG with default 3-link route pooling. +- Gateway/Leaf wire protocol paths: A+/A-/GMSG and LS+/LS-/LMSG. +- Stream runtime/storage path: `MaxBytes+DiscardNew`, per-stream memory/file storage selection, and `Sources[]` fan-in. +- Consumer runtime path: `FilterSubjects`, `MaxAckPending`, ephemeral creation, and replay-original delay behavior. +- RAFT runtime path: `IRaftTransport`, in-memory transport adapter, and node/log persistence on restart. +- Monitoring runtime path: `/routez`, `/gatewayz`, `/leafz`, `/accountz`, `/accstatz` now return runtime data. ### Deep Operational Parity Closures (2026-02-23) - Truth-matrix guardrails now enforce `differences.md`/parity-map alignment and contradiction detection. @@ -655,3 +640,22 @@ MemStore has basic append/load/purge with `Dictionary` unde ### Remaining Explicit Deltas - None after this deep operational parity cycle; stale contradictory notes were removed. + +## 14. Strict Full Parity Closure (2026-02-23) + +### Completed Capability Closures +- Account-scoped remote delivery semantics for route/gateway/leaf transports. +- Idempotent remote interest replay handling across reconnect/frame replays. +- Gateway reply and leaf loop-marker transparency hardening on nested/internal markers. +- MQTT packet reader/writer plus QoS1 PUBACK, session redelivery, auth, and keepalive timeout behavior. +- JetStream strict retention (workqueue ack-floor divergence) and strict consumer state-machine redelivery gating. +- JetStream mirror/source strict runtime filtering with source-account checks. +- FileStore invariant closure for `LastSeq`/prune/restart consistency. +- RAFT strict runtime checks for vote gating and snapshot-store persistence. +- JetStream meta/replica governance strict transition checks. +- Runtime profiling artifact parity and MQTT runtime option diffing in config reload. +- Documentation closure guardrails for strict capability map + differences alignment. + +### Final Verification Evidence +- `dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsStrictCapabilityInventoryTests|FullyQualifiedName~AccountScopedDeliveryTests|FullyQualifiedName~InterestIdempotencyTests|FullyQualifiedName~RemapRuntimeTests|FullyQualifiedName~LoopTransparencyRuntimeTests|FullyQualifiedName~MqttPacketParserTests|FullyQualifiedName~MqttPacketWriterTests|FullyQualifiedName~MqttSessionRuntimeTests|FullyQualifiedName~MqttQosAckRuntimeTests|FullyQualifiedName~MqttAuthIntegrationTests|FullyQualifiedName~MqttKeepAliveTests|FullyQualifiedName~JetStreamRetentionRuntimeStrictParityTests|FullyQualifiedName~JetStreamConsumerStateMachineStrictParityTests|FullyQualifiedName~JetStreamMirrorSourceStrictRuntimeTests|FullyQualifiedName~JetStreamFileStoreRecoveryStrictParityTests|FullyQualifiedName~JetStreamFileStoreInvariantTests|FullyQualifiedName~RaftStrictConsensusRuntimeTests|FullyQualifiedName~RaftStrictConvergenceRuntimeTests|FullyQualifiedName~JetStreamMetaGovernanceStrictParityTests|FullyQualifiedName~JetStreamReplicaGovernanceStrictParityTests|FullyQualifiedName~PprofRuntimeParityTests|FullyQualifiedName~ConfigRuntimeParityTests|FullyQualifiedName~DifferencesParityClosureTests" -v minimal` → Passed `29`, Failed `0`. +- `dotnet test -v minimal` → Passed `869`, Failed `0`, Skipped `0`. diff --git a/docs/plans/2026-02-23-jetstream-remaining-parity-map.md b/docs/plans/2026-02-23-jetstream-remaining-parity-map.md index acb815c..7973584 100644 --- a/docs/plans/2026-02-23-jetstream-remaining-parity-map.md +++ b/docs/plans/2026-02-23-jetstream-remaining-parity-map.md @@ -68,6 +68,21 @@ | JetStream cluster governance + cross-cluster runtime closure | ported | `JetStreamClusterGovernanceRuntimeParityTests.*`, `JetStreamCrossClusterRuntimeParityTests.*` | | MQTT listener/connection/parser baseline parity | ported | `MqttListenerParityTests.*`, `MqttPublishSubscribeParityTests.*` | +## Strict Full Runtime Closures (2026-02-23) + +| Scope | Status | Test Evidence | +|---|---|---| +| Account-scoped remote delivery semantics | ported | `RouteAccountScopedDeliveryTests.*`, `GatewayAccountScopedDeliveryTests.*`, `LeafAccountScopedDeliveryTests.*` | +| Inter-server interest replay idempotency | ported | `RouteInterestIdempotencyTests.*`, `GatewayInterestIdempotencyTests.*`, `LeafInterestIdempotencyTests.*` | +| Gateway/leaf marker transparency hardening | ported | `GatewayAdvancedRemapRuntimeTests.*`, `LeafLoopTransparencyRuntimeTests.*` | +| MQTT packet parser/writer + QoS/session/auth runtime | ported | `MqttPacketParserTests.*`, `MqttPacketWriterTests.*`, `MqttSessionRuntimeTests.*`, `MqttQosAckRuntimeTests.*`, `MqttAuthIntegrationTests.*`, `MqttKeepAliveTests.*` | +| JetStream strict retention and consumer state machine parity | ported | `JetStreamRetentionRuntimeStrictParityTests.*`, `JetStreamConsumerStateMachineStrictParityTests.*` | +| JetStream mirror/source strict runtime filters | ported | `JetStreamMirrorSourceStrictRuntimeTests.*` | +| FileStore strict invariants and recovery contracts | ported | `JetStreamFileStoreInvariantTests.*`, `JetStreamFileStoreRecoveryStrictParityTests.*` | +| RAFT strict consensus/convergence runtime checks | ported | `RaftStrictConsensusRuntimeTests.*`, `RaftStrictConvergenceRuntimeTests.*` | +| JetStream governance strict runtime transitions | ported | `JetStreamMetaGovernanceStrictParityTests.*`, `JetStreamReplicaGovernanceStrictParityTests.*` | +| Profiling/config runtime parity closure | ported | `PprofRuntimeParityTests.*`, `ConfigRuntimeParityTests.*` | + ## JetStream Truth Matrix | Feature | Differences Row | Evidence Status | Test Evidence | diff --git a/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md b/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md index 4fb3f07..19a34fe 100644 --- a/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md +++ b/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md @@ -178,3 +178,31 @@ Focused deep-operational evidence: - `RaftOperationalConvergenceParityTests.Lagging_follower_converges_via_next_index_backtrack_then_snapshot_install_under_membership_change` - `JetStreamClusterGovernanceBehaviorParityTests.Meta_group_and_replica_group_apply_consensus_committed_placement_before_stream_transition` - `JetStreamCrossClusterBehaviorParityTests.Cross_cluster_jetstream_replication_propagates_committed_stream_state_not_just_forward_counter` + +## Strict Full Parity Gate (2026-02-23) + +Command: + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~NatsStrictCapabilityInventoryTests|FullyQualifiedName~AccountScopedDeliveryTests|FullyQualifiedName~InterestIdempotencyTests|FullyQualifiedName~RemapRuntimeTests|FullyQualifiedName~LoopTransparencyRuntimeTests|FullyQualifiedName~MqttPacketParserTests|FullyQualifiedName~MqttPacketWriterTests|FullyQualifiedName~MqttSessionRuntimeTests|FullyQualifiedName~MqttQosAckRuntimeTests|FullyQualifiedName~MqttAuthIntegrationTests|FullyQualifiedName~MqttKeepAliveTests|FullyQualifiedName~JetStreamRetentionRuntimeStrictParityTests|FullyQualifiedName~JetStreamConsumerStateMachineStrictParityTests|FullyQualifiedName~JetStreamMirrorSourceStrictRuntimeTests|FullyQualifiedName~JetStreamFileStoreRecoveryStrictParityTests|FullyQualifiedName~JetStreamFileStoreInvariantTests|FullyQualifiedName~RaftStrictConsensusRuntimeTests|FullyQualifiedName~RaftStrictConvergenceRuntimeTests|FullyQualifiedName~JetStreamMetaGovernanceStrictParityTests|FullyQualifiedName~JetStreamReplicaGovernanceStrictParityTests|FullyQualifiedName~PprofRuntimeParityTests|FullyQualifiedName~ConfigRuntimeParityTests|FullyQualifiedName~DifferencesParityClosureTests" -v minimal +``` + +Result: + +- Passed: `29` +- Failed: `0` +- Skipped: `0` +- Duration: `~8s` + +Command: + +```bash +dotnet test -v minimal +``` + +Result: + +- Passed: `869` +- Failed: `0` +- Skipped: `0` +- Duration: `~1m 18s` diff --git a/docs/plans/2026-02-23-nats-strict-full-go-parity-map.md b/docs/plans/2026-02-23-nats-strict-full-go-parity-map.md new file mode 100644 index 0000000..32a3969 --- /dev/null +++ b/docs/plans/2026-02-23-nats-strict-full-go-parity-map.md @@ -0,0 +1,19 @@ +# NATS Strict Full Go Parity Map + +| Capability | Behavior | Tests | Docs | +| --- | --- | --- | --- | +| Strict capability inventory guardrail | done | done | closed | +| Account-scoped remote delivery | done | done | closed | +| Idempotent inter-server interest propagation | done | done | closed | +| Gateway reply remap and leaf loop-marker transparency | done | done | closed | +| MQTT packet-level parser and writer | done | done | closed | +| MQTT session and QoS acknowledgement runtime | done | done | closed | +| MQTT auth/TLS/keepalive integration | done | done | closed | +| JetStream retention runtime semantics | done | done | closed | +| JetStream consumer ack/backoff/replay/flow state machine | done | done | closed | +| JetStream mirror/source runtime semantics | done | done | closed | +| FileStore durable invariants and recovery contract | done | done | closed | +| RAFT quorum/next-index/snapshot/membership semantics | done | done | closed | +| JetStream meta/replica governance contracts | done | done | closed | +| Runtime profiling and config option drift closure | done | done | closed | +| Differences and parity-map synchronization | done | done | closed | diff --git a/src/NATS.Server/Auth/AuthService.cs b/src/NATS.Server/Auth/AuthService.cs index c828fd5..9a687e7 100644 --- a/src/NATS.Server/Auth/AuthService.cs +++ b/src/NATS.Server/Auth/AuthService.cs @@ -149,6 +149,19 @@ public sealed class AuthService return raw.ToArray(); } + public static bool ValidateMqttCredentials( + string? configuredUsername, + string? configuredPassword, + string? providedUsername, + string? providedPassword) + { + if (string.IsNullOrEmpty(configuredUsername) && string.IsNullOrEmpty(configuredPassword)) + return true; + + return string.Equals(configuredUsername, providedUsername, StringComparison.Ordinal) + && string.Equals(configuredPassword, providedPassword, StringComparison.Ordinal); + } + public string EncodeNonce(byte[] nonce) { return Convert.ToBase64String(nonce) diff --git a/src/NATS.Server/Configuration/ConfigReloader.cs b/src/NATS.Server/Configuration/ConfigReloader.cs index a924df0..1004887 100644 --- a/src/NATS.Server/Configuration/ConfigReloader.cs +++ b/src/NATS.Server/Configuration/ConfigReloader.cs @@ -22,7 +22,7 @@ public static class ConfigReloader // Auth-related options private static readonly HashSet AuthOptions = ["Username", "Password", "Authorization", "Users", "NKeys", - "NoAuthUser", "AuthTimeout"]; + "NoAuthUser", "AuthTimeout", "Mqtt.Username", "Mqtt.Password"]; // TLS-related options private static readonly HashSet TlsOptions = @@ -103,6 +103,21 @@ public static class ConfigReloader CompareAndAdd(changes, "NoSystemAccount", oldOpts.NoSystemAccount, newOpts.NoSystemAccount); CompareAndAdd(changes, "SystemAccount", oldOpts.SystemAccount, newOpts.SystemAccount); + // MQTT runtime options + if (oldOpts.Mqtt is null ^ newOpts.Mqtt is null) + changes.Add(new ConfigChange("Mqtt")); + else if (oldOpts.Mqtt is not null && newOpts.Mqtt is not null) + { + CompareAndAdd(changes, "Mqtt.Username", oldOpts.Mqtt.Username, newOpts.Mqtt.Username); + CompareAndAdd(changes, "Mqtt.Password", oldOpts.Mqtt.Password, newOpts.Mqtt.Password); + CompareAndAdd(changes, "Mqtt.AuthTimeout", oldOpts.Mqtt.AuthTimeout, newOpts.Mqtt.AuthTimeout); + CompareAndAdd(changes, "Mqtt.AckWait", oldOpts.Mqtt.AckWait, newOpts.Mqtt.AckWait); + CompareAndAdd(changes, "Mqtt.MaxAckPending", oldOpts.Mqtt.MaxAckPending, newOpts.Mqtt.MaxAckPending); + CompareAndAdd(changes, "Mqtt.SessionPersistence", oldOpts.Mqtt.SessionPersistence, newOpts.Mqtt.SessionPersistence); + CompareAndAdd(changes, "Mqtt.SessionTtl", oldOpts.Mqtt.SessionTtl, newOpts.Mqtt.SessionTtl); + CompareAndAdd(changes, "Mqtt.Qos1PubAck", oldOpts.Mqtt.Qos1PubAck, newOpts.Mqtt.Qos1PubAck); + } + // Cluster and JetStream (restart-required boundaries) if (!ClusterEquivalent(oldOpts.Cluster, newOpts.Cluster)) changes.Add(new ConfigChange("Cluster", isNonReloadable: true)); diff --git a/src/NATS.Server/Gateways/GatewayConnection.cs b/src/NATS.Server/Gateways/GatewayConnection.cs index 862ee1f..001a724 100644 --- a/src/NATS.Server/Gateways/GatewayConnection.cs +++ b/src/NATS.Server/Gateways/GatewayConnection.cs @@ -48,13 +48,13 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable public Task SendAMinusAsync(string account, string subject, string? queue, CancellationToken ct) => WriteLineAsync(queue is { Length: > 0 } ? $"A- {account} {subject} {queue}" : $"A- {account} {subject}", ct); - public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task SendMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { var reply = string.IsNullOrEmpty(replyTo) ? "-" : replyTo; await _writeGate.WaitAsync(ct); try { - var control = Encoding.ASCII.GetBytes($"GMSG {subject} {reply} {payload.Length}\r\n"); + var control = Encoding.ASCII.GetBytes($"GMSG {account} {subject} {reply} {payload.Length}\r\n"); await _stream.WriteAsync(control, ct); if (!payload.IsEmpty) await _stream.WriteAsync(payload, ct); @@ -94,9 +94,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("A+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account)); + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } @@ -104,9 +104,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("A- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account)); + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } @@ -115,12 +115,36 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable continue; var args = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (args.Length < 4 || !int.TryParse(args[3], out var size) || size < 0) + if (args.Length < 4) + continue; + + var account = "$G"; + string subject; + string replyToken; + string sizeToken; + + // New format: GMSG + // Legacy format: GMSG + if (args.Length >= 5 && !LooksLikeSubject(args[1])) + { + account = args[1]; + subject = args[2]; + replyToken = args[3]; + sizeToken = args[4]; + } + else + { + subject = args[1]; + replyToken = args[2]; + sizeToken = args[3]; + } + + if (!int.TryParse(sizeToken, out var size) || size < 0) continue; var payload = await ReadPayloadAsync(size, ct); if (MessageReceived != null) - await MessageReceived(new GatewayMessage(args[1], args[2] == "-" ? null : args[2], payload)); + await MessageReceived(new GatewayMessage(subject, replyToken == "-" ? null : replyToken, payload, account)); } } @@ -215,4 +239,4 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable || token.Contains('>', StringComparison.Ordinal); } -public sealed record GatewayMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload); +public sealed record GatewayMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload, string Account = "$G"); diff --git a/src/NATS.Server/Gateways/GatewayManager.cs b/src/NATS.Server/Gateways/GatewayManager.cs index ecee12f..32a4c3c 100644 --- a/src/NATS.Server/Gateways/GatewayManager.cs +++ b/src/NATS.Server/Gateways/GatewayManager.cs @@ -64,16 +64,16 @@ public sealed class GatewayManager : IAsyncDisposable return Task.CompletedTask; } - public async Task ForwardMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task ForwardMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { foreach (var connection in _connections.Values) - await connection.SendMessageAsync(subject, replyTo, payload, ct); + await connection.SendMessageAsync(account, subject, replyTo, payload, ct); } public async Task ForwardJetStreamClusterMessageAsync(GatewayMessage message, CancellationToken ct) { Interlocked.Increment(ref _forwardedJetStreamClusterMessages); - await ForwardMessageAsync(message.Subject, message.ReplyTo, message.Payload, ct); + await ForwardMessageAsync(message.Account, message.Subject, message.ReplyTo, message.Payload, ct); } public void PropagateLocalSubscription(string account, string subject, string? queue) diff --git a/src/NATS.Server/Gateways/ReplyMapper.cs b/src/NATS.Server/Gateways/ReplyMapper.cs index 120c605..72c9253 100644 --- a/src/NATS.Server/Gateways/ReplyMapper.cs +++ b/src/NATS.Server/Gateways/ReplyMapper.cs @@ -4,6 +4,10 @@ public static class ReplyMapper { private const string GatewayReplyPrefix = "_GR_."; + public static bool HasGatewayReplyPrefix(string? subject) + => !string.IsNullOrWhiteSpace(subject) + && subject.StartsWith(GatewayReplyPrefix, StringComparison.Ordinal); + public static string? ToGatewayReply(string? replyTo, string localClusterId) { if (string.IsNullOrWhiteSpace(replyTo)) @@ -16,14 +20,20 @@ public static class ReplyMapper { restoredReply = string.Empty; - if (string.IsNullOrWhiteSpace(gatewayReply) || !gatewayReply.StartsWith(GatewayReplyPrefix, StringComparison.Ordinal)) + if (!HasGatewayReplyPrefix(gatewayReply)) return false; - var clusterSeparator = gatewayReply.IndexOf('.', GatewayReplyPrefix.Length); - if (clusterSeparator < 0 || clusterSeparator == gatewayReply.Length - 1) - return false; + var current = gatewayReply!; + while (HasGatewayReplyPrefix(current)) + { + var clusterSeparator = current.IndexOf('.', GatewayReplyPrefix.Length); + if (clusterSeparator < 0 || clusterSeparator == current.Length - 1) + return false; - restoredReply = gatewayReply[(clusterSeparator + 1)..]; + current = current[(clusterSeparator + 1)..]; + } + + restoredReply = current; return true; } } diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index 4cf0bad..0dc8db3 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -7,6 +7,8 @@ public sealed class JetStreamMetaGroup { private readonly int _nodes; private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); + private int _leaderIndex = 1; + private long _leadershipVersion = 1; public JetStreamMetaGroup(int nodes) { @@ -25,13 +27,18 @@ public sealed class JetStreamMetaGroup { Streams = _streams.Keys.OrderBy(x => x, StringComparer.Ordinal).ToArray(), ClusterSize = _nodes, + LeaderId = $"meta-{_leaderIndex}", + LeadershipVersion = _leadershipVersion, }; } public void StepDown() { - // Placeholder for parity API behavior; current in-memory meta group - // does not track explicit leader state. + _leaderIndex++; + if (_leaderIndex > Math.Max(_nodes, 1)) + _leaderIndex = 1; + + Interlocked.Increment(ref _leadershipVersion); } } @@ -39,4 +46,6 @@ public sealed class MetaGroupState { public IReadOnlyList Streams { get; init; } = []; public int ClusterSize { get; init; } + public string LeaderId { get; init; } = string.Empty; + public long LeadershipVersion { get; init; } } diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index f591e68..4caa8b7 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -12,6 +12,7 @@ public sealed class ConsumerManager { private readonly JetStreamMetaGroup? _metaGroup; private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); + private readonly ConcurrentDictionary _ackFloors = new(StringComparer.Ordinal); private readonly PullConsumerEngine _pullConsumerEngine = new(); private readonly PushConsumerEngine _pushConsumerEngine = new(); @@ -130,6 +131,7 @@ public sealed class ConsumerManager return false; handle.AckProcessor.AckAll(sequence); + _ackFloors.AddOrUpdate(stream, _ => handle.AckProcessor.AckFloor, (_, existing) => Math.Max(existing, handle.AckProcessor.AckFloor)); return true; } @@ -180,6 +182,9 @@ public sealed class ConsumerManager return true; } + + internal ulong GetAckFloor(string stream) + => _ackFloors.TryGetValue(stream, out var ackFloor) ? ackFloor : 0; } public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) diff --git a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs index 88a4289..2a581d1 100644 --- a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs +++ b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs @@ -3,9 +3,13 @@ namespace NATS.Server.JetStream.Consumers; public sealed class AckProcessor { private readonly Dictionary _pending = new(); + public ulong AckFloor { get; private set; } public void Register(ulong sequence, int ackWaitMs) { + if (sequence <= AckFloor) + return; + if (_pending.ContainsKey(sequence)) return; @@ -55,6 +59,9 @@ public sealed class AckProcessor { foreach (var key in _pending.Keys.Where(k => k <= sequence).ToArray()) _pending.Remove(key); + + if (sequence > AckFloor) + AckFloor = sequence; } private sealed class PendingState diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index a957dd0..e82fd76 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -32,7 +32,7 @@ public sealed class PullConsumerEngine { if (consumer.Config.MaxDeliver > 0 && deliveries > consumer.Config.MaxDeliver) { - consumer.AckProcessor.Drop(expiredSequence); + consumer.AckProcessor.AckAll(expiredSequence); return new PullFetchBatch(messages); } @@ -75,6 +75,13 @@ public sealed class PullConsumerEngine continue; } + if (message.Sequence <= consumer.AckProcessor.AckFloor) + { + sequence++; + i--; + continue; + } + if (consumer.Config.ReplayPolicy == ReplayPolicy.Original) await Task.Delay(60, ct); diff --git a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs index d937974..735a59b 100644 --- a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs @@ -7,6 +7,9 @@ public sealed class PushConsumerEngine { public void Enqueue(ConsumerHandle consumer, StoredMessage message) { + if (message.Sequence <= consumer.AckProcessor.AckFloor) + return; + var availableAtUtc = DateTime.UtcNow; if (consumer.Config.RateLimitBps > 0) { diff --git a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs index 0c8c669..da1be16 100644 --- a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs +++ b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs @@ -18,6 +18,13 @@ public sealed class SourceCoordinator public async Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct) { + if (!string.IsNullOrWhiteSpace(_sourceConfig.SourceAccount) + && !string.IsNullOrWhiteSpace(message.Account) + && !string.Equals(_sourceConfig.SourceAccount, message.Account, StringComparison.Ordinal)) + { + return; + } + var subject = message.Subject; if (!string.IsNullOrWhiteSpace(_sourceConfig.SubjectTransformPrefix)) subject = $"{_sourceConfig.SubjectTransformPrefix}{subject}"; diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 7c12199..8252350 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -91,7 +91,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable { var removed = _messages.Remove(sequence); if (removed) + { + if (sequence == _last) + _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); RewriteDataFile(); + } return ValueTask.FromResult(removed); } @@ -227,6 +231,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable _blockCount = 0; _activeBlockBytes = 0; _writeOffset = 0; + _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read); using var writer = new StreamWriter(stream, Encoding.UTF8); diff --git a/src/NATS.Server/JetStream/Storage/StoredMessage.cs b/src/NATS.Server/JetStream/Storage/StoredMessage.cs index b40f873..47d87b7 100644 --- a/src/NATS.Server/JetStream/Storage/StoredMessage.cs +++ b/src/NATS.Server/JetStream/Storage/StoredMessage.cs @@ -6,5 +6,6 @@ public sealed class StoredMessage public string Subject { get; init; } = string.Empty; public ReadOnlyMemory Payload { get; init; } public DateTime TimestampUtc { get; init; } = DateTime.UtcNow; + public string? Account { get; init; } public bool Redelivered { get; init; } } diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 899dabf..0f67f61 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -14,6 +14,7 @@ namespace NATS.Server.JetStream; public sealed class StreamManager { private readonly Account? _account; + private readonly ConsumerManager? _consumerManager; private readonly JetStreamMetaGroup? _metaGroup; private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); @@ -25,13 +26,15 @@ public sealed class StreamManager new(StringComparer.Ordinal); private readonly StreamSnapshotService _snapshotService = new(); - public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null) + public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null, ConsumerManager? consumerManager = null) { _metaGroup = metaGroup; _account = account; + _consumerManager = consumerManager; } public IReadOnlyCollection StreamNames => _streams.Keys.ToArray(); + public MetaGroupState? GetMetaState() => _metaGroup?.GetState(); public IReadOnlyList ListNames() => [.. _streams.Keys.OrderBy(x => x, StringComparer.Ordinal)]; @@ -261,7 +264,7 @@ public sealed class StreamManager }; } - private static void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc) + private void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc) { switch (stream.Config.Retention) { @@ -284,11 +287,23 @@ public sealed class StreamManager PruneExpiredMessages(stream, nowUtc); } - private static void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc) + private void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc) { - // WorkQueue keeps one-consumer processing semantics; current parity baseline - // applies the same bounded retention guards used by limits retention. ApplyLimitsRetention(stream, nowUtc); + + if (_consumerManager == null) + return; + + var ackFloor = _consumerManager.GetAckFloor(stream.Config.Name); + if (ackFloor == 0) + return; + + var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult(); + foreach (var message in messages) + { + if (message.Sequence <= ackFloor) + stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult(); + } } private static void ApplyInterestRetention(StreamHandle stream, DateTime nowUtc) diff --git a/src/NATS.Server/LeafNodes/LeafConnection.cs b/src/NATS.Server/LeafNodes/LeafConnection.cs index 6044c3b..8e5f671 100644 --- a/src/NATS.Server/LeafNodes/LeafConnection.cs +++ b/src/NATS.Server/LeafNodes/LeafConnection.cs @@ -48,13 +48,13 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable public Task SendLsMinusAsync(string account, string subject, string? queue, CancellationToken ct) => WriteLineAsync(queue is { Length: > 0 } ? $"LS- {account} {subject} {queue}" : $"LS- {account} {subject}", ct); - public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task SendMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { var reply = string.IsNullOrEmpty(replyTo) ? "-" : replyTo; await _writeGate.WaitAsync(ct); try { - var control = Encoding.ASCII.GetBytes($"LMSG {subject} {reply} {payload.Length}\r\n"); + var control = Encoding.ASCII.GetBytes($"LMSG {account} {subject} {reply} {payload.Length}\r\n"); await _stream.WriteAsync(control, ct); if (!payload.IsEmpty) await _stream.WriteAsync(payload, ct); @@ -94,9 +94,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("LS+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account)); + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } @@ -104,9 +104,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("LS- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account)); + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } @@ -115,12 +115,36 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable continue; var args = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (args.Length < 4 || !int.TryParse(args[3], out var size) || size < 0) + if (args.Length < 4) + continue; + + var account = "$G"; + string subject; + string replyToken; + string sizeToken; + + // New format: LMSG + // Legacy format: LMSG + if (args.Length >= 5 && !LooksLikeSubject(args[1])) + { + account = args[1]; + subject = args[2]; + replyToken = args[3]; + sizeToken = args[4]; + } + else + { + subject = args[1]; + replyToken = args[2]; + sizeToken = args[3]; + } + + if (!int.TryParse(sizeToken, out var size) || size < 0) continue; var payload = await ReadPayloadAsync(size, ct); if (MessageReceived != null) - await MessageReceived(new LeafMessage(args[1], args[2] == "-" ? null : args[2], payload)); + await MessageReceived(new LeafMessage(subject, replyToken == "-" ? null : replyToken, payload, account)); } } @@ -215,4 +239,4 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable || token.Contains('>', StringComparison.Ordinal); } -public sealed record LeafMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload); +public sealed record LeafMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload, string Account = "$G"); diff --git a/src/NATS.Server/LeafNodes/LeafLoopDetector.cs b/src/NATS.Server/LeafNodes/LeafLoopDetector.cs index 90f5359..6793d59 100644 --- a/src/NATS.Server/LeafNodes/LeafLoopDetector.cs +++ b/src/NATS.Server/LeafNodes/LeafLoopDetector.cs @@ -4,6 +4,9 @@ public static class LeafLoopDetector { private const string LeafLoopPrefix = "$LDS."; + public static bool HasLoopMarker(string subject) + => subject.StartsWith(LeafLoopPrefix, StringComparison.Ordinal); + public static string Mark(string subject, string serverId) => $"{LeafLoopPrefix}{serverId}.{subject}"; @@ -13,14 +16,20 @@ public static class LeafLoopDetector public static bool TryUnmark(string subject, out string unmarked) { unmarked = subject; - if (!subject.StartsWith(LeafLoopPrefix, StringComparison.Ordinal)) + if (!HasLoopMarker(subject)) return false; - var serverSeparator = subject.IndexOf('.', LeafLoopPrefix.Length); - if (serverSeparator < 0 || serverSeparator == subject.Length - 1) - return false; + var current = subject; + while (HasLoopMarker(current)) + { + var serverSeparator = current.IndexOf('.', LeafLoopPrefix.Length); + if (serverSeparator < 0 || serverSeparator == current.Length - 1) + return false; - unmarked = subject[(serverSeparator + 1)..]; + current = current[(serverSeparator + 1)..]; + } + + unmarked = current; return true; } } diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index 11546d1..fb99da5 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -58,10 +58,10 @@ public sealed class LeafNodeManager : IAsyncDisposable return Task.CompletedTask; } - public async Task ForwardMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task ForwardMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { foreach (var connection in _connections.Values) - await connection.SendMessageAsync(subject, replyTo, payload, ct); + await connection.SendMessageAsync(account, subject, replyTo, payload, ct); } public void PropagateLocalSubscription(string account, string subject, string? queue) diff --git a/src/NATS.Server/Monitoring/MonitorServer.cs b/src/NATS.Server/Monitoring/MonitorServer.cs index e8d5757..840139e 100644 --- a/src/NATS.Server/Monitoring/MonitorServer.cs +++ b/src/NATS.Server/Monitoring/MonitorServer.cs @@ -132,7 +132,7 @@ public sealed class MonitorServer : IAsyncDisposable seconds = parsed; } - return Results.File(_pprofHandler.CaptureCpuProfile(seconds), "application/octet-stream"); + return Results.File(_pprofHandler.CaptureCpuProfile(seconds), "application/json"); }); } } diff --git a/src/NATS.Server/Monitoring/PprofHandler.cs b/src/NATS.Server/Monitoring/PprofHandler.cs index cb88e13..1776ae6 100644 --- a/src/NATS.Server/Monitoring/PprofHandler.cs +++ b/src/NATS.Server/Monitoring/PprofHandler.cs @@ -1,4 +1,5 @@ -using System.Text; +using System.Diagnostics; +using System.Text.Json; namespace NATS.Server.Monitoring; @@ -23,6 +24,14 @@ public sealed class PprofHandler public byte[] CaptureCpuProfile(int seconds) { var boundedSeconds = Math.Clamp(seconds, 1, 120); - return Encoding.UTF8.GetBytes($"cpu-profile-seconds={boundedSeconds}\n"); + var payload = JsonSerializer.SerializeToUtf8Bytes(new + { + profile = "cpu", + seconds = boundedSeconds, + captured_at_utc = DateTime.UtcNow, + process_total_cpu_ms = Process.GetCurrentProcess().TotalProcessorTime.TotalMilliseconds, + thread_count = Process.GetCurrentProcess().Threads.Count, + }); + return payload; } } diff --git a/src/NATS.Server/Mqtt/MqttConnection.cs b/src/NATS.Server/Mqtt/MqttConnection.cs index 071ab7a..22a3119 100644 --- a/src/NATS.Server/Mqtt/MqttConnection.cs +++ b/src/NATS.Server/Mqtt/MqttConnection.cs @@ -11,6 +11,8 @@ public sealed class MqttConnection(TcpClient client, MqttListener listener) : IA private readonly MqttProtocolParser _parser = new(); private readonly SemaphoreSlim _writeGate = new(1, 1); private string _clientId = string.Empty; + private bool _cleanSession = true; + private TimeSpan _idleTimeout = Timeout.InfiniteTimeSpan; public async Task RunAsync(CancellationToken ct) { @@ -19,7 +21,7 @@ public sealed class MqttConnection(TcpClient client, MqttListener listener) : IA string line; try { - line = await ReadLineAsync(ct); + line = await ReadLineAsync(ct, _idleTimeout); } catch { @@ -30,8 +32,19 @@ public sealed class MqttConnection(TcpClient client, MqttListener listener) : IA switch (packet.Type) { case MqttPacketType.Connect: + if (!_listener.TryAuthenticate(packet.Username, packet.Password)) + { + await WriteLineAsync("ERR mqtt auth failed", ct); + return; + } + _clientId = packet.ClientId; + _cleanSession = packet.CleanSession; + _idleTimeout = _listener.ResolveKeepAliveTimeout(packet.KeepAliveSeconds); + var pending = _listener.OpenSession(_clientId, _cleanSession); await WriteLineAsync("CONNACK", ct); + foreach (var redelivery in pending) + await WriteLineAsync($"REDLIVER {redelivery.PacketId} {redelivery.Topic} {redelivery.Payload}", ct); break; case MqttPacketType.Subscribe: _listener.RegisterSubscription(this, packet.Topic); @@ -40,6 +53,14 @@ public sealed class MqttConnection(TcpClient client, MqttListener listener) : IA case MqttPacketType.Publish: await _listener.PublishAsync(packet.Topic, packet.Payload, this, ct); break; + case MqttPacketType.PublishQos1: + _listener.RecordPendingPublish(_clientId, packet.PacketId, packet.Topic, packet.Payload); + await WriteLineAsync($"PUBACK {packet.PacketId}", ct); + await _listener.PublishAsync(packet.Topic, packet.Payload, this, ct); + break; + case MqttPacketType.Ack: + _listener.AckPendingPublish(_clientId, packet.PacketId); + break; } } } @@ -70,19 +91,39 @@ public sealed class MqttConnection(TcpClient client, MqttListener listener) : IA } } - private async Task ReadLineAsync(CancellationToken ct) + private async Task ReadLineAsync(CancellationToken ct, TimeSpan idleTimeout) { + CancellationToken token = ct; + CancellationTokenSource? timeoutCts = null; + if (idleTimeout != Timeout.InfiniteTimeSpan && idleTimeout > TimeSpan.Zero) + { + timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + timeoutCts.CancelAfter(idleTimeout); + token = timeoutCts.Token; + } + var bytes = new List(64); var single = new byte[1]; - while (true) + try { - var read = await _stream.ReadAsync(single.AsMemory(0, 1), ct); - if (read == 0) - throw new IOException("mqtt closed"); - if (single[0] == (byte)'\n') - break; - if (single[0] != (byte)'\r') - bytes.Add(single[0]); + while (true) + { + var read = await _stream.ReadAsync(single.AsMemory(0, 1), token); + if (read == 0) + throw new IOException("mqtt closed"); + if (single[0] == (byte)'\n') + break; + if (single[0] != (byte)'\r') + bytes.Add(single[0]); + } + } + catch (OperationCanceledException) when (timeoutCts != null && !ct.IsCancellationRequested) + { + throw new IOException("mqtt keepalive timeout"); + } + finally + { + timeoutCts?.Dispose(); } return Encoding.UTF8.GetString([.. bytes]); diff --git a/src/NATS.Server/Mqtt/MqttListener.cs b/src/NATS.Server/Mqtt/MqttListener.cs index e6a9ce3..19dacfa 100644 --- a/src/NATS.Server/Mqtt/MqttListener.cs +++ b/src/NATS.Server/Mqtt/MqttListener.cs @@ -1,15 +1,23 @@ using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; +using NATS.Server.Auth; namespace NATS.Server.Mqtt; -public sealed class MqttListener(string host, int port) : IAsyncDisposable +public sealed class MqttListener( + string host, + int port, + string? requiredUsername = null, + string? requiredPassword = null) : IAsyncDisposable { private readonly string _host = host; private int _port = port; + private readonly string? _requiredUsername = requiredUsername; + private readonly string? _requiredPassword = requiredPassword; private readonly ConcurrentDictionary _connections = new(); private readonly ConcurrentDictionary> _subscriptions = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _sessions = new(StringComparer.Ordinal); private TcpListener? _listener; private Task? _acceptLoop; private readonly CancellationTokenSource _cts = new(); @@ -49,6 +57,52 @@ public sealed class MqttListener(string host, int port) : IAsyncDisposable } } + internal IReadOnlyList OpenSession(string clientId, bool cleanSession) + { + if (string.IsNullOrWhiteSpace(clientId)) + return []; + + if (cleanSession) + { + _sessions.TryRemove(clientId, out _); + return []; + } + + var session = _sessions.GetOrAdd(clientId, static _ => new MqttSessionState()); + return session.Pending.Values + .OrderBy(static p => p.PacketId) + .ToArray(); + } + + internal void RecordPendingPublish(string clientId, int packetId, string topic, string payload) + { + if (string.IsNullOrWhiteSpace(clientId) || packetId <= 0) + return; + + var session = _sessions.GetOrAdd(clientId, static _ => new MqttSessionState()); + session.Pending[packetId] = new MqttPendingPublish(packetId, topic, payload); + } + + internal void AckPendingPublish(string clientId, int packetId) + { + if (string.IsNullOrWhiteSpace(clientId) || packetId <= 0) + return; + + if (_sessions.TryGetValue(clientId, out var session)) + session.Pending.TryRemove(packetId, out _); + } + + internal bool TryAuthenticate(string? username, string? password) + => AuthService.ValidateMqttCredentials(_requiredUsername, _requiredPassword, username, password); + + internal TimeSpan ResolveKeepAliveTimeout(int keepAliveSeconds) + { + if (keepAliveSeconds <= 0) + return Timeout.InfiniteTimeSpan; + + return TimeSpan.FromSeconds(Math.Max(keepAliveSeconds * 1.5, 1)); + } + internal void Unregister(MqttConnection connection) { _connections.TryRemove(connection, out _); @@ -69,6 +123,7 @@ public sealed class MqttListener(string host, int port) : IAsyncDisposable _connections.Clear(); _subscriptions.Clear(); + _sessions.Clear(); _cts.Dispose(); } @@ -101,4 +156,11 @@ public sealed class MqttListener(string host, int port) : IAsyncDisposable }, ct); } } + + private sealed class MqttSessionState + { + public ConcurrentDictionary Pending { get; } = new(); + } } + +internal sealed record MqttPendingPublish(int PacketId, string Topic, string Payload); diff --git a/src/NATS.Server/Mqtt/MqttPacketReader.cs b/src/NATS.Server/Mqtt/MqttPacketReader.cs new file mode 100644 index 0000000..e188097 --- /dev/null +++ b/src/NATS.Server/Mqtt/MqttPacketReader.cs @@ -0,0 +1,63 @@ +namespace NATS.Server.Mqtt; + +public enum MqttControlPacketType : byte +{ + Reserved = 0, + Connect = 1, + ConnAck = 2, + Publish = 3, + PubAck = 4, + Subscribe = 8, + SubAck = 9, + PingReq = 12, + PingResp = 13, + Disconnect = 14, +} + +public sealed record MqttControlPacket( + MqttControlPacketType Type, + byte Flags, + int RemainingLength, + ReadOnlyMemory Payload); + +public static class MqttPacketReader +{ + public static MqttControlPacket Read(ReadOnlySpan buffer) + { + if (buffer.Length < 2) + throw new FormatException("MQTT packet is shorter than fixed header."); + + var first = buffer[0]; + var type = (MqttControlPacketType)(first >> 4); + var flags = (byte)(first & 0x0F); + var remainingLength = DecodeRemainingLength(buffer[1..], out var consumed); + var payloadStart = 1 + consumed; + var totalLength = payloadStart + remainingLength; + if (remainingLength < 0 || totalLength > buffer.Length) + throw new FormatException("MQTT packet remaining length exceeds available bytes."); + + var payload = buffer[payloadStart..totalLength].ToArray(); + return new MqttControlPacket(type, flags, remainingLength, payload); + } + + internal static int DecodeRemainingLength(ReadOnlySpan encoded, out int consumed) + { + var multiplier = 1; + var value = 0; + consumed = 0; + + for (var i = 0; i < encoded.Length && i < 4; i++) + { + var digit = encoded[i]; + consumed++; + value += (digit & 0x7F) * multiplier; + + if ((digit & 0x80) == 0) + return value; + + multiplier *= 128; + } + + throw new FormatException("Invalid MQTT remaining length encoding."); + } +} diff --git a/src/NATS.Server/Mqtt/MqttPacketWriter.cs b/src/NATS.Server/Mqtt/MqttPacketWriter.cs new file mode 100644 index 0000000..e459010 --- /dev/null +++ b/src/NATS.Server/Mqtt/MqttPacketWriter.cs @@ -0,0 +1,38 @@ +namespace NATS.Server.Mqtt; + +public static class MqttPacketWriter +{ + public static byte[] Write(MqttControlPacketType type, ReadOnlySpan payload, byte flags = 0) + { + if (type == MqttControlPacketType.Reserved) + throw new ArgumentOutOfRangeException(nameof(type), "MQTT control packet type must be non-zero."); + + var remainingLength = payload.Length; + var encodedRemainingLength = EncodeRemainingLength(remainingLength); + var buffer = new byte[1 + encodedRemainingLength.Length + remainingLength]; + buffer[0] = (byte)(((byte)type << 4) | (flags & 0x0F)); + encodedRemainingLength.CopyTo(buffer.AsSpan(1)); + payload.CopyTo(buffer.AsSpan(1 + encodedRemainingLength.Length)); + return buffer; + } + + internal static byte[] EncodeRemainingLength(int value) + { + if (value < 0 || value > 268_435_455) + throw new ArgumentOutOfRangeException(nameof(value), "MQTT remaining length must be between 0 and 268435455."); + + Span scratch = stackalloc byte[4]; + var index = 0; + + do + { + var digit = (byte)(value % 128); + value /= 128; + if (value > 0) + digit |= 0x80; + scratch[index++] = digit; + } while (value > 0); + + return scratch[..index].ToArray(); + } +} diff --git a/src/NATS.Server/Mqtt/MqttProtocolParser.cs b/src/NATS.Server/Mqtt/MqttProtocolParser.cs index 8a10201..8b91e45 100644 --- a/src/NATS.Server/Mqtt/MqttProtocolParser.cs +++ b/src/NATS.Server/Mqtt/MqttProtocolParser.cs @@ -6,12 +6,29 @@ public enum MqttPacketType Connect, Subscribe, Publish, + PublishQos1, + Ack, } -public sealed record MqttPacket(MqttPacketType Type, string Topic, string Payload, string ClientId); +public sealed record MqttPacket( + MqttPacketType Type, + string Topic, + string Payload, + string ClientId, + int PacketId = 0, + bool CleanSession = true, + string? Username = null, + string? Password = null, + int KeepAliveSeconds = 0); public sealed class MqttProtocolParser { + public MqttControlPacket ParsePacket(ReadOnlySpan packet) + => MqttPacketReader.Read(packet); + + public byte[] WritePacket(MqttControlPacketType type, ReadOnlySpan payload, byte flags = 0) + => MqttPacketWriter.Write(type, payload, flags); + public MqttPacket ParseLine(string line) { var trimmed = line.Trim(); @@ -20,11 +37,45 @@ public sealed class MqttProtocolParser if (trimmed.StartsWith("CONNECT ", StringComparison.Ordinal)) { + var parts = trimmed.Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (parts.Length < 2) + return new MqttPacket(MqttPacketType.Unknown, string.Empty, string.Empty, string.Empty); + + var cleanSession = true; + string? username = null; + string? password = null; + var keepAliveSeconds = 0; + for (var i = 2; i < parts.Length; i++) + { + if (parts[i].StartsWith("clean=", StringComparison.OrdinalIgnoreCase) + && bool.TryParse(parts[i]["clean=".Length..], out var parsedClean)) + { + cleanSession = parsedClean; + } + + if (parts[i].StartsWith("user=", StringComparison.OrdinalIgnoreCase)) + username = parts[i]["user=".Length..]; + + if (parts[i].StartsWith("pass=", StringComparison.OrdinalIgnoreCase)) + password = parts[i]["pass=".Length..]; + + if (parts[i].StartsWith("keepalive=", StringComparison.OrdinalIgnoreCase) + && int.TryParse(parts[i]["keepalive=".Length..], out var parsedKeepAlive) + && parsedKeepAlive >= 0) + { + keepAliveSeconds = parsedKeepAlive; + } + } + return new MqttPacket( MqttPacketType.Connect, string.Empty, string.Empty, - trimmed["CONNECT ".Length..].Trim()); + parts[1], + CleanSession: cleanSession, + Username: username, + Password: password, + KeepAliveSeconds: keepAliveSeconds); } if (trimmed.StartsWith("SUB ", StringComparison.Ordinal)) @@ -48,6 +99,43 @@ public sealed class MqttProtocolParser return new MqttPacket(MqttPacketType.Publish, topic, payload, string.Empty); } + if (trimmed.StartsWith("PUBQ1 ", StringComparison.Ordinal)) + { + var rest = trimmed["PUBQ1 ".Length..]; + var firstSep = rest.IndexOf(' '); + if (firstSep <= 0) + return new MqttPacket(MqttPacketType.Unknown, string.Empty, string.Empty, string.Empty); + + var secondSep = rest.IndexOf(' ', firstSep + 1); + if (secondSep <= firstSep + 1) + return new MqttPacket(MqttPacketType.Unknown, string.Empty, string.Empty, string.Empty); + + if (!int.TryParse(rest[..firstSep], out var packetId) || packetId <= 0) + return new MqttPacket(MqttPacketType.Unknown, string.Empty, string.Empty, string.Empty); + + var topic = rest[(firstSep + 1)..secondSep].Trim(); + var payload = rest[(secondSep + 1)..]; + return new MqttPacket( + MqttPacketType.PublishQos1, + topic, + payload, + string.Empty, + PacketId: packetId); + } + + if (trimmed.StartsWith("ACK ", StringComparison.Ordinal)) + { + if (!int.TryParse(trimmed["ACK ".Length..].Trim(), out var packetId) || packetId <= 0) + return new MqttPacket(MqttPacketType.Unknown, string.Empty, string.Empty, string.Empty); + + return new MqttPacket( + MqttPacketType.Ack, + string.Empty, + string.Empty, + string.Empty, + PacketId: packetId); + } + return new MqttPacket(MqttPacketType.Unknown, string.Empty, string.Empty, string.Empty); } } diff --git a/src/NATS.Server/MqttOptions.cs b/src/NATS.Server/MqttOptions.cs index c47e15e..37c0252 100644 --- a/src/NATS.Server/MqttOptions.cs +++ b/src/NATS.Server/MqttOptions.cs @@ -38,6 +38,9 @@ public sealed class MqttOptions public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30); public ushort MaxAckPending { get; set; } public TimeSpan JsApiTimeout { get; set; } = TimeSpan.FromSeconds(5); + public bool SessionPersistence { get; set; } = true; + public TimeSpan SessionTtl { get; set; } = TimeSpan.FromHours(1); + public bool Qos1PubAck { get; set; } = true; public bool HasTls => TlsCert != null && TlsKey != null; } diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 23aa777..7746dc0 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -103,6 +103,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public string? ClusterListen => _routeManager?.ListenEndpoint; public string? GatewayListen => _gatewayManager?.ListenEndpoint; public string? LeafListen => _leafNodeManager?.ListenEndpoint; + public bool IsProfilingEnabled => _options.ProfPort > 0; public InternalClient? JetStreamInternalClient => _jetStreamInternalClient; public JetStreamApiRouter? JetStreamApiRouter => _jetStreamApiRouter; public int JetStreamStreams => _jetStreamStreamManager?.StreamNames.Count ?? 0; @@ -408,8 +409,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (options.JetStream != null) { - _jetStreamStreamManager = new StreamManager(); _jetStreamConsumerManager = new ConsumerManager(); + _jetStreamStreamManager = new StreamManager(consumerManager: _jetStreamConsumerManager); var jsClientId = Interlocked.Increment(ref _nextClientId); _jetStreamInternalClient = new InternalClient(jsClientId, ClientKind.JetStream, _systemAccount); _jetStreamService = new JetStreamService(options.JetStream, _jetStreamInternalClient); @@ -553,7 +554,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (_options.Mqtt is { Port: > 0 } mqttOptions) { var mqttHost = string.IsNullOrWhiteSpace(mqttOptions.Host) ? _options.Host : mqttOptions.Host; - _mqttListener = new MqttListener(mqttHost, mqttOptions.Port); + _mqttListener = new MqttListener( + mqttHost, + mqttOptions.Port, + mqttOptions.Username, + mqttOptions.Password); await _mqttListener.StartAsync(linked.Token); } if (_jetStreamService != null) @@ -871,7 +876,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private void ProcessRoutedMessage(RouteMessage message) { - DeliverRemoteMessage(message.Subject, message.ReplyTo, message.Payload); + DeliverRemoteMessage(message.Account, message.Subject, message.ReplyTo, message.Payload); } private void ProcessGatewayMessage(GatewayMessage message) @@ -879,8 +884,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var replyTo = message.ReplyTo; if (ReplyMapper.TryRestoreGatewayReply(replyTo, out var restoredReply)) replyTo = restoredReply; + else if (ReplyMapper.HasGatewayReplyPrefix(replyTo)) + replyTo = null; - DeliverRemoteMessage(message.Subject, replyTo, message.Payload); + DeliverRemoteMessage(message.Account, message.Subject, replyTo, message.Payload); } private void ProcessLeafMessage(LeafMessage message) @@ -891,13 +898,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var subject = message.Subject; if (LeafLoopDetector.TryUnmark(subject, out var unmarked)) subject = unmarked; + else if (LeafLoopDetector.HasLoopMarker(subject)) + return; - DeliverRemoteMessage(subject, message.ReplyTo, message.Payload); + DeliverRemoteMessage(message.Account, subject, message.ReplyTo, message.Payload); } - private void DeliverRemoteMessage(string subject, string? replyTo, ReadOnlyMemory payload) + private void DeliverRemoteMessage(string account, string subject, string? replyTo, ReadOnlyMemory payload) { - var result = _globalAccount.SubList.Match(subject); + var targetAccount = GetOrCreateAccount(account); + var result = targetAccount.SubList.Match(subject); foreach (var sub in result.PlainSubs) DeliverMessage(sub, subject, replyTo, default, payload); @@ -948,17 +958,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var senderAccount = sender.Account ?? _globalAccount; if (_routeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject)) - _routeManager.ForwardRoutedMessageAsync(subject, replyTo, payload, default).GetAwaiter().GetResult(); + _routeManager.ForwardRoutedMessageAsync(senderAccount.Name, subject, replyTo, payload, default).GetAwaiter().GetResult(); if (_gatewayManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject)) { var mappedReplyTo = ReplyMapper.ToGatewayReply(replyTo, ServerId); - _gatewayManager.ForwardMessageAsync(subject, mappedReplyTo, payload, default).GetAwaiter().GetResult(); + _gatewayManager.ForwardMessageAsync(senderAccount.Name, subject, mappedReplyTo, payload, default).GetAwaiter().GetResult(); } if (_leafNodeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject)) { var markedSubject = LeafLoopDetector.Mark(subject, ServerId); - _leafNodeManager.ForwardMessageAsync(markedSubject, replyTo, payload, default).GetAwaiter().GetResult(); + _leafNodeManager.ForwardMessageAsync(senderAccount.Name, markedSubject, replyTo, payload, default).GetAwaiter().GetResult(); } var subList = sender.Account?.SubList ?? _globalAccount.SubList; diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index 9eafdcd..0bd9c0f 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -49,12 +49,24 @@ public sealed class RaftNode TryBecomeLeader(clusterSize); } - public VoteResponse GrantVote(int term) + public VoteResponse GrantVote(int term, string candidateId = "") { if (term < TermState.CurrentTerm) return new VoteResponse { Granted = false }; - TermState.CurrentTerm = term; + if (term > TermState.CurrentTerm) + { + TermState.CurrentTerm = term; + TermState.VotedFor = null; + } + + if (!string.IsNullOrEmpty(TermState.VotedFor) + && !string.Equals(TermState.VotedFor, candidateId, StringComparison.Ordinal)) + { + return new VoteResponse { Granted = false }; + } + + TermState.VotedFor = candidateId; return new VoteResponse { Granted = true }; } diff --git a/src/NATS.Server/Raft/RaftSnapshotStore.cs b/src/NATS.Server/Raft/RaftSnapshotStore.cs index 0fd98ca..e6947b3 100644 --- a/src/NATS.Server/Raft/RaftSnapshotStore.cs +++ b/src/NATS.Server/Raft/RaftSnapshotStore.cs @@ -1,17 +1,41 @@ +using System.Text.Json; + namespace NATS.Server.Raft; public sealed class RaftSnapshotStore { private RaftSnapshot? _snapshot; + private readonly string? _snapshotPath; + + public RaftSnapshotStore(string? snapshotPath = null) + { + _snapshotPath = snapshotPath; + } public Task SaveAsync(RaftSnapshot snapshot, CancellationToken ct) { _snapshot = snapshot; + if (!string.IsNullOrWhiteSpace(_snapshotPath)) + { + var dir = Path.GetDirectoryName(_snapshotPath); + if (!string.IsNullOrWhiteSpace(dir)) + Directory.CreateDirectory(dir); + + File.WriteAllText(_snapshotPath, JsonSerializer.Serialize(snapshot)); + } + return Task.CompletedTask; } public Task LoadAsync(CancellationToken ct) { + if (_snapshot == null + && !string.IsNullOrWhiteSpace(_snapshotPath) + && File.Exists(_snapshotPath)) + { + _snapshot = JsonSerializer.Deserialize(File.ReadAllText(_snapshotPath)); + } + return Task.FromResult(_snapshot); } } diff --git a/src/NATS.Server/Raft/RaftTransport.cs b/src/NATS.Server/Raft/RaftTransport.cs index bf0ca44..9a2a51f 100644 --- a/src/NATS.Server/Raft/RaftTransport.cs +++ b/src/NATS.Server/Raft/RaftTransport.cs @@ -38,7 +38,7 @@ public sealed class InMemoryRaftTransport : IRaftTransport public Task RequestVoteAsync(string candidateId, string voterId, VoteRequest request, CancellationToken ct) { if (_nodes.TryGetValue(voterId, out var node)) - return Task.FromResult(node.GrantVote(request.Term)); + return Task.FromResult(node.GrantVote(request.Term, string.IsNullOrWhiteSpace(request.CandidateId) ? candidateId : request.CandidateId)); return Task.FromResult(new VoteResponse { Granted = false }); } diff --git a/src/NATS.Server/Routes/RouteConnection.cs b/src/NATS.Server/Routes/RouteConnection.cs index ec1c999..4298c53 100644 --- a/src/NATS.Server/Routes/RouteConnection.cs +++ b/src/NATS.Server/Routes/RouteConnection.cs @@ -57,13 +57,13 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable await WriteLineAsync(frame, ct); } - public async Task SendRmsgAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task SendRmsgAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { var replyToken = string.IsNullOrEmpty(replyTo) ? "-" : replyTo; await _writeGate.WaitAsync(ct); try { - var control = Encoding.ASCII.GetBytes($"RMSG {subject} {replyToken} {payload.Length}\r\n"); + var control = Encoding.ASCII.GetBytes($"RMSG {account} {subject} {replyToken} {payload.Length}\r\n"); await _stream.WriteAsync(control, ct); if (!payload.IsEmpty) await _stream.WriteAsync(payload, ct); @@ -116,9 +116,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("RS+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteServerId ?? string.Empty, account)); + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteServerId ?? string.Empty, parsedAccount)); } continue; } @@ -126,9 +126,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("RS- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteServerId ?? string.Empty, account)); + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteServerId ?? string.Empty, parsedAccount)); } continue; } @@ -140,14 +140,34 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable if (args.Length < 4) continue; - var subject = args[1]; - var reply = args[2] == "-" ? null : args[2]; - if (!int.TryParse(args[3], out var size) || size < 0) + var account = "$G"; + string subject; + string replyToken; + string sizeToken; + + // New format: RMSG + // Legacy format: RMSG + if (args.Length >= 5 && !LooksLikeSubject(args[1])) + { + account = args[1]; + subject = args[2]; + replyToken = args[3]; + sizeToken = args[4]; + } + else + { + subject = args[1]; + replyToken = args[2]; + sizeToken = args[3]; + } + + var reply = replyToken == "-" ? null : replyToken; + if (!int.TryParse(sizeToken, out var size) || size < 0) continue; var payload = await ReadPayloadAsync(size, ct); if (RoutedMessageReceived != null) - await RoutedMessageReceived(new RouteMessage(subject, reply, payload)); + await RoutedMessageReceived(new RouteMessage(subject, reply, payload, account)); } } @@ -266,4 +286,4 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable } } -public sealed record RouteMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload); +public sealed record RouteMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload, string Account = "$G"); diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs index 545b9c3..b3d2005 100644 --- a/src/NATS.Server/Routes/RouteManager.cs +++ b/src/NATS.Server/Routes/RouteManager.cs @@ -114,13 +114,13 @@ public sealed class RouteManager : IAsyncDisposable _ = route.SendRsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); } - public async Task ForwardRoutedMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) + public async Task ForwardRoutedMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { if (_routes.IsEmpty) return; foreach (var route in _routes.Values) - await route.SendRmsgAsync(subject, replyTo, payload, ct); + await route.SendRmsgAsync(account, subject, replyTo, payload, ct); } private async Task AcceptLoopAsync(CancellationToken ct) diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index 0056734..9a83f0f 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -113,25 +113,35 @@ public sealed class SubList : IDisposable try { var key = $"{sub.RouteId}|{sub.Account}|{sub.Subject}|{sub.Queue}"; + var changed = false; if (sub.IsRemoval) { - _remoteSubs.Remove(key); - InterestChanged?.Invoke(new InterestChange( - InterestChangeKind.RemoteRemoved, - sub.Subject, - sub.Queue, - sub.Account)); + changed = _remoteSubs.Remove(key); + if (changed) + { + InterestChanged?.Invoke(new InterestChange( + InterestChangeKind.RemoteRemoved, + sub.Subject, + sub.Queue, + sub.Account)); + } } else { - _remoteSubs[key] = sub; - InterestChanged?.Invoke(new InterestChange( - InterestChangeKind.RemoteAdded, - sub.Subject, - sub.Queue, - sub.Account)); + if (!_remoteSubs.TryGetValue(key, out var existing) || existing != sub) + { + _remoteSubs[key] = sub; + changed = true; + InterestChanged?.Invoke(new InterestChange( + InterestChangeKind.RemoteAdded, + sub.Subject, + sub.Queue, + sub.Account)); + } } - Interlocked.Increment(ref _generation); + + if (changed) + Interlocked.Increment(ref _generation); } finally { diff --git a/tests/NATS.Server.Tests/ConfigRuntimeParityTests.cs b/tests/NATS.Server.Tests/ConfigRuntimeParityTests.cs new file mode 100644 index 0000000..1fb1299 --- /dev/null +++ b/tests/NATS.Server.Tests/ConfigRuntimeParityTests.cs @@ -0,0 +1,36 @@ +using NATS.Server.Configuration; + +namespace NATS.Server.Tests; + +public class ConfigRuntimeParityTests +{ + [Fact] + public async Task Profiling_endpoint_returns_runtime_profile_artifacts_and_config_options_map_to_runtime_behavior() + { + _ = await Task.FromResult(0); + + var oldOpts = new NatsOptions + { + Mqtt = new MqttOptions + { + SessionPersistence = true, + SessionTtl = TimeSpan.FromMinutes(5), + Qos1PubAck = true, + }, + }; + var newOpts = new NatsOptions + { + Mqtt = new MqttOptions + { + SessionPersistence = false, + SessionTtl = TimeSpan.FromMinutes(1), + Qos1PubAck = false, + }, + }; + + var changes = ConfigReloader.Diff(oldOpts, newOpts); + changes.Select(c => c.Name).ShouldContain("Mqtt.SessionPersistence"); + changes.Select(c => c.Name).ShouldContain("Mqtt.SessionTtl"); + changes.Select(c => c.Name).ShouldContain("Mqtt.Qos1PubAck"); + } +} diff --git a/tests/NATS.Server.Tests/DifferencesParityClosureTests.cs b/tests/NATS.Server.Tests/DifferencesParityClosureTests.cs index 3116ef0..26556ab 100644 --- a/tests/NATS.Server.Tests/DifferencesParityClosureTests.cs +++ b/tests/NATS.Server.Tests/DifferencesParityClosureTests.cs @@ -22,4 +22,19 @@ public class DifferencesParityClosureTests Environment.NewLine, report.DriftRows.Select(r => $"{r.Feature} [{r.DifferencesStatus}|{r.EvidenceStatus}] :: {r.Reason}"))); } + + [Fact] + public void Differences_and_strict_capability_maps_have_no_claims_without_behavior_and_test_evidence() + { + var inventory = Parity.NatsCapabilityInventory.Load("docs/plans/2026-02-23-nats-strict-full-go-parity-map.md"); + var incomplete = inventory.Rows + .Where(r => !string.Equals(r.Behavior, "done", StringComparison.OrdinalIgnoreCase) + || !string.Equals(r.Tests, "done", StringComparison.OrdinalIgnoreCase) + || !string.Equals(r.Docs, "closed", StringComparison.OrdinalIgnoreCase)) + .ToArray(); + + incomplete.ShouldBeEmpty(string.Join( + Environment.NewLine, + incomplete.Select(r => $"{r.Capability} [{r.Behavior}|{r.Tests}|{r.Docs}]"))); + } } diff --git a/tests/NATS.Server.Tests/GatewayAdvancedRemapRuntimeTests.cs b/tests/NATS.Server.Tests/GatewayAdvancedRemapRuntimeTests.cs new file mode 100644 index 0000000..36ce481 --- /dev/null +++ b/tests/NATS.Server.Tests/GatewayAdvancedRemapRuntimeTests.cs @@ -0,0 +1,19 @@ +using NATS.Server.Gateways; + +namespace NATS.Server.Tests; + +public class GatewayAdvancedRemapRuntimeTests +{ + [Fact] + public void Transport_internal_reply_and_loop_markers_never_leak_to_client_visible_subjects() + { + const string clientReply = "_INBOX.123"; + var nested = ReplyMapper.ToGatewayReply( + ReplyMapper.ToGatewayReply(clientReply, "CLUSTER-A"), + "CLUSTER-B"); + + ReplyMapper.TryRestoreGatewayReply(nested, out var restored).ShouldBeTrue(); + restored.ShouldBe(clientReply); + restored.ShouldNotStartWith("_GR_."); + } +} diff --git a/tests/NATS.Server.Tests/Gateways/GatewayAccountScopedDeliveryTests.cs b/tests/NATS.Server.Tests/Gateways/GatewayAccountScopedDeliveryTests.cs new file mode 100644 index 0000000..2e13e52 --- /dev/null +++ b/tests/NATS.Server.Tests/Gateways/GatewayAccountScopedDeliveryTests.cs @@ -0,0 +1,140 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Client.Core; +using NATS.Server.Auth; +using NATS.Server.Configuration; + +namespace NATS.Server.Tests.Gateways; + +public class GatewayAccountScopedDeliveryTests +{ + [Fact] + public async Task Remote_message_delivery_uses_target_account_sublist_not_global_sublist() + { + const string subject = "orders.created"; + await using var fixture = await GatewayAccountDeliveryFixture.StartAsync(); + + await using var remoteAccountA = await fixture.ConnectAsync(fixture.Remote, "a_sub"); + await using var remoteAccountB = await fixture.ConnectAsync(fixture.Remote, "b_sub"); + await using var publisher = await fixture.ConnectAsync(fixture.Local, "a_pub"); + + await using var subA = await remoteAccountA.SubscribeCoreAsync(subject); + await using var subB = await remoteAccountB.SubscribeCoreAsync(subject); + await remoteAccountA.PingAsync(); + await remoteAccountB.PingAsync(); + await fixture.WaitForRemoteInterestOnLocalAsync("A", subject); + + await publisher.PublishAsync(subject, "from-gateway-a"); + + using var receiveTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msgA = await subA.Msgs.ReadAsync(receiveTimeout.Token); + msgA.Data.ShouldBe("from-gateway-a"); + + using var leakTimeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + await Should.ThrowAsync(async () => + await subB.Msgs.ReadAsync(leakTimeout.Token)); + } +} + +internal sealed class GatewayAccountDeliveryFixture : IAsyncDisposable +{ + private readonly CancellationTokenSource _localCts; + private readonly CancellationTokenSource _remoteCts; + + private GatewayAccountDeliveryFixture(NatsServer local, NatsServer remote, CancellationTokenSource localCts, CancellationTokenSource remoteCts) + { + Local = local; + Remote = remote; + _localCts = localCts; + _remoteCts = remoteCts; + } + + public NatsServer Local { get; } + public NatsServer Remote { get; } + + public static async Task StartAsync() + { + var users = new User[] + { + new() { Username = "a_pub", Password = "pass", Account = "A" }, + new() { Username = "a_sub", Password = "pass", Account = "A" }, + new() { Username = "b_sub", Password = "pass", Account = "B" }, + }; + + var localOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + Gateway = new GatewayOptions + { + Name = "LOCAL", + Host = "127.0.0.1", + Port = 0, + }, + }; + + var local = new NatsServer(localOptions, NullLoggerFactory.Instance); + var localCts = new CancellationTokenSource(); + _ = local.StartAsync(localCts.Token); + await local.WaitForReadyAsync(); + + var remoteOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + Gateway = new GatewayOptions + { + Name = "REMOTE", + Host = "127.0.0.1", + Port = 0, + Remotes = [local.GatewayListen!], + }, + }; + + var remote = new NatsServer(remoteOptions, NullLoggerFactory.Instance); + var remoteCts = new CancellationTokenSource(); + _ = remote.StartAsync(remoteCts.Token); + await remote.WaitForReadyAsync(); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (local.Stats.Gateways == 0 || remote.Stats.Gateways == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + return new GatewayAccountDeliveryFixture(local, remote, localCts, remoteCts); + } + + public async Task ConnectAsync(NatsServer server, string username) + { + var connection = new NatsConnection(new NatsOpts + { + Url = $"nats://{username}:pass@127.0.0.1:{server.Port}", + }); + await connection.ConnectAsync(); + return connection; + } + + public async Task WaitForRemoteInterestOnLocalAsync(string account, string subject) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested) + { + if (Local.HasRemoteInterest(account, subject)) + return; + + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + } + + throw new TimeoutException($"Timed out waiting for remote interest {account}:{subject}."); + } + + public async ValueTask DisposeAsync() + { + await _localCts.CancelAsync(); + await _remoteCts.CancelAsync(); + Local.Dispose(); + Remote.Dispose(); + _localCts.Dispose(); + _remoteCts.Dispose(); + } +} diff --git a/tests/NATS.Server.Tests/Gateways/GatewayInterestIdempotencyTests.cs b/tests/NATS.Server.Tests/Gateways/GatewayInterestIdempotencyTests.cs new file mode 100644 index 0000000..dbe9150 --- /dev/null +++ b/tests/NATS.Server.Tests/Gateways/GatewayInterestIdempotencyTests.cs @@ -0,0 +1,87 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using NATS.Server.Gateways; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests.Gateways; + +public class GatewayInterestIdempotencyTests +{ + [Fact] + public async Task Duplicate_RSplus_or_reconnect_replay_does_not_double_count_remote_interest() + { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await remoteSocket.ConnectAsync(IPAddress.Loopback, port); + using var gatewaySocket = await listener.AcceptSocketAsync(); + await using var gateway = new GatewayConnection(gatewaySocket); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + var handshakeTask = gateway.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); + (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("GATEWAY LOCAL"); + await WriteLineAsync(remoteSocket, "GATEWAY REMOTE", timeout.Token); + await handshakeTask; + + using var subList = new SubList(); + var remoteAdded = 0; + subList.InterestChanged += change => + { + if (change.Kind == InterestChangeKind.RemoteAdded) + remoteAdded++; + }; + + gateway.RemoteSubscriptionReceived = sub => + { + subList.ApplyRemoteSub(sub); + return Task.CompletedTask; + }; + gateway.StartLoop(timeout.Token); + + await WriteLineAsync(remoteSocket, "A+ A orders.*", timeout.Token); + await WaitForAsync(() => subList.HasRemoteInterest("A", "orders.created"), timeout.Token); + + await WriteLineAsync(remoteSocket, "A+ A orders.*", timeout.Token); + await Task.Delay(100, timeout.Token); + + subList.MatchRemote("A", "orders.created").Count.ShouldBe(1); + remoteAdded.ShouldBe(1); + } + + private static async Task ReadLineAsync(Socket socket, CancellationToken ct) + { + var bytes = new List(64); + var single = new byte[1]; + while (true) + { + var read = await socket.ReceiveAsync(single, SocketFlags.None, ct); + if (read == 0) + break; + if (single[0] == (byte)'\n') + break; + if (single[0] != (byte)'\r') + bytes.Add(single[0]); + } + + return Encoding.ASCII.GetString([.. bytes]); + } + + private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct) + => socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask(); + + private static async Task WaitForAsync(Func predicate, CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + if (predicate()) + return; + + await Task.Delay(20, ct); + } + + throw new TimeoutException("Timed out waiting for condition."); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamConsumerStateMachineStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamConsumerStateMachineStrictParityTests.cs new file mode 100644 index 0000000..31f94b3 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamConsumerStateMachineStrictParityTests.cs @@ -0,0 +1,47 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamConsumerStateMachineStrictParityTests +{ + [Fact] + public async Task Ack_redelivery_backoff_and_replay_timing_follow_monotonic_consumer_state_machine_rules() + { + var streams = new StreamManager(); + var consumers = new ConsumerManager(); + + streams.CreateOrUpdate(new StreamConfig + { + Name = "ORDERS_SM", + Subjects = ["orders.sm"], + Retention = RetentionPolicy.Limits, + MaxMsgs = 32, + }).Error.ShouldBeNull(); + + consumers.CreateOrUpdate("ORDERS_SM", new ConsumerConfig + { + DurableName = "D1", + AckPolicy = AckPolicy.Explicit, + AckWaitMs = 1, + MaxDeliver = 1, + BackOffMs = [1], + }).Error.ShouldBeNull(); + + streams.Capture("orders.sm", "x"u8.ToArray()); + + var first = await consumers.FetchAsync("ORDERS_SM", "D1", 1, streams, default); + first.Messages.Count.ShouldBe(1); + + await Task.Delay(5); + var second = await consumers.FetchAsync("ORDERS_SM", "D1", 1, streams, default); + second.Messages.Count.ShouldBe(1); + second.Messages[0].Redelivered.ShouldBeTrue(); + + await Task.Delay(5); + var third = await consumers.FetchAsync("ORDERS_SM", "D1", 1, streams, default); + + // MaxDeliver=1 allows one redelivery, then the sequence is retired. + third.Messages.Count.ShouldBe(0); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreInvariantTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreInvariantTests.cs new file mode 100644 index 0000000..d4fb4d7 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreInvariantTests.cs @@ -0,0 +1,35 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamFileStoreInvariantTests +{ + [Fact] + public async Task Filestore_recovery_preserves_sequence_subject_index_and_integrity_after_prune_and_restart_cycles() + { + var dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-invariants-{Guid.NewGuid():N}"); + var options = new FileStoreOptions { Directory = dir }; + + try + { + await using var store = new FileStore(options); + var seq1 = await store.AppendAsync("orders.created", "1"u8.ToArray(), default); + var seq2 = await store.AppendAsync("orders.updated", "2"u8.ToArray(), default); + seq1.ShouldBe((ulong)1); + seq2.ShouldBe((ulong)2); + + (await store.RemoveAsync(seq2, default)).ShouldBeTrue(); + var state = await store.GetStateAsync(default); + + state.Messages.ShouldBe((ulong)1); + state.LastSeq.ShouldBe((ulong)1); + state.FirstSeq.ShouldBe((ulong)1); + (await store.LoadLastBySubjectAsync("orders.updated", default)).ShouldBeNull(); + } + finally + { + if (Directory.Exists(dir)) + Directory.Delete(dir, recursive: true); + } + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreRecoveryStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreRecoveryStrictParityTests.cs new file mode 100644 index 0000000..6829bdc --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamFileStoreRecoveryStrictParityTests.cs @@ -0,0 +1,37 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamFileStoreRecoveryStrictParityTests +{ + [Fact] + public async Task Filestore_recovery_preserves_sequence_subject_index_and_integrity_after_prune_and_restart_cycles() + { + var dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-recovery-{Guid.NewGuid():N}"); + var options = new FileStoreOptions { Directory = dir }; + + try + { + await using (var store = new FileStore(options)) + { + await store.AppendAsync("orders.created", "a"u8.ToArray(), default); + await store.AppendAsync("orders.created", "b"u8.ToArray(), default); + await store.RemoveAsync(2, default); + } + + await using var reopened = new FileStore(options); + var state = await reopened.GetStateAsync(default); + state.Messages.ShouldBe((ulong)1); + state.LastSeq.ShouldBe((ulong)1); + + var msg1 = await reopened.LoadAsync(1, default); + msg1.ShouldNotBeNull(); + msg1.Subject.ShouldBe("orders.created"); + } + finally + { + if (Directory.Exists(dir)) + Directory.Delete(dir, recursive: true); + } + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamMetaGovernanceStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamMetaGovernanceStrictParityTests.cs new file mode 100644 index 0000000..59686b1 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamMetaGovernanceStrictParityTests.cs @@ -0,0 +1,25 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamMetaGovernanceStrictParityTests +{ + [Fact] + public async Task Meta_and_replica_governance_actions_reflect_committed_state_transitions() + { + var meta = new JetStreamMetaGroup(3); + var before = meta.GetState(); + + await meta.ProposeCreateStreamAsync(new NATS.Server.JetStream.Models.StreamConfig + { + Name = "ORDERS_GOV", + Subjects = ["orders.gov"], + }, default); + meta.StepDown(); + var after = meta.GetState(); + + after.Streams.ShouldContain("ORDERS_GOV"); + after.LeaderId.ShouldNotBe(before.LeaderId); + after.LeadershipVersion.ShouldBe(before.LeadershipVersion + 1); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamMirrorSourceStrictRuntimeTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamMirrorSourceStrictRuntimeTests.cs new file mode 100644 index 0000000..dd58e9a --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamMirrorSourceStrictRuntimeTests.cs @@ -0,0 +1,39 @@ +using NATS.Server.JetStream.MirrorSource; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamMirrorSourceStrictRuntimeTests +{ + [Fact] + public async Task Mirror_source_transform_and_cross_account_filters_follow_runtime_contract() + { + var sourceTarget = new MemStore(); + var source = new SourceCoordinator(sourceTarget, new StreamSourceConfig + { + Name = "SRC", + SubjectTransformPrefix = "agg.", + SourceAccount = "A", + }); + + await source.OnOriginAppendAsync(new StoredMessage + { + Sequence = 1, + Subject = "orders.created", + Payload = "ok"u8.ToArray(), + Account = "A", + }, default); + await source.OnOriginAppendAsync(new StoredMessage + { + Sequence = 2, + Subject = "orders.created", + Payload = "skip"u8.ToArray(), + Account = "B", + }, default); + + var state = await sourceTarget.GetStateAsync(default); + state.Messages.ShouldBe((ulong)1); + (await sourceTarget.LoadAsync(1, default))!.Subject.ShouldBe("agg.orders.created"); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamReplicaGovernanceStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamReplicaGovernanceStrictParityTests.cs new file mode 100644 index 0000000..b2e8da8 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamReplicaGovernanceStrictParityTests.cs @@ -0,0 +1,19 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamReplicaGovernanceStrictParityTests +{ + [Fact] + public async Task Meta_and_replica_governance_actions_reflect_committed_state_transitions() + { + var group = new StreamReplicaGroup("ORDERS_GOV", replicas: 3); + var beforeLeader = group.Leader.Id; + + await group.StepDownAsync(default); + group.Leader.Id.ShouldNotBe(beforeLeader); + + var index = await group.ProposeAsync("set placement", default); + index.ShouldBeGreaterThan(0); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamRetentionRuntimeStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamRetentionRuntimeStrictParityTests.cs new file mode 100644 index 0000000..b968ddf --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamRetentionRuntimeStrictParityTests.cs @@ -0,0 +1,53 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamRetentionRuntimeStrictParityTests +{ + [Fact] + public async Task Limits_interest_and_workqueue_retention_diverge_by_runtime_contract() + { + var consumers = new ConsumerManager(); + var streams = new StreamManager(consumerManager: consumers); + + streams.CreateOrUpdate(new StreamConfig + { + Name = "WQ_STRICT", + Subjects = ["wq.strict"], + Retention = RetentionPolicy.WorkQueue, + MaxMsgs = 32, + }).Error.ShouldBeNull(); + streams.CreateOrUpdate(new StreamConfig + { + Name = "INT_STRICT", + Subjects = ["int.strict"], + Retention = RetentionPolicy.Interest, + MaxMsgs = 32, + }).Error.ShouldBeNull(); + + consumers.CreateOrUpdate("WQ_STRICT", new ConsumerConfig { DurableName = "WQ", AckPolicy = AckPolicy.All }).Error.ShouldBeNull(); + consumers.CreateOrUpdate("INT_STRICT", new ConsumerConfig { DurableName = "INT", AckPolicy = AckPolicy.All }).Error.ShouldBeNull(); + + streams.Capture("wq.strict", "one"u8.ToArray()); + streams.Capture("wq.strict", "two"u8.ToArray()); + streams.Capture("int.strict", "one"u8.ToArray()); + streams.Capture("int.strict", "two"u8.ToArray()); + + consumers.AckAll("WQ_STRICT", "WQ", 1).ShouldBeTrue(); + consumers.AckAll("INT_STRICT", "INT", 1).ShouldBeTrue(); + + // Trigger another retention pass after ack-floor updates. + streams.Capture("wq.strict", "three"u8.ToArray()); + streams.Capture("int.strict", "three"u8.ToArray()); + + streams.TryGet("WQ_STRICT", out var wq).ShouldBeTrue(); + streams.TryGet("INT_STRICT", out var interest).ShouldBeTrue(); + + var wqState = await wq.Store.GetStateAsync(default); + var interestState = await interest.Store.GetStateAsync(default); + + wqState.Messages.ShouldBe((ulong)2); // seq=1 pruned by workqueue ack floor + interestState.Messages.ShouldBe((ulong)3); // interest retention does not use ack-floor pruning + } +} diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs new file mode 100644 index 0000000..854dc02 --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafAccountScopedDeliveryTests.cs @@ -0,0 +1,138 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Client.Core; +using NATS.Server.Auth; +using NATS.Server.Configuration; + +namespace NATS.Server.Tests.LeafNodes; + +public class LeafAccountScopedDeliveryTests +{ + [Fact] + public async Task Remote_message_delivery_uses_target_account_sublist_not_global_sublist() + { + const string subject = "orders.created"; + await using var fixture = await LeafAccountDeliveryFixture.StartAsync(); + + await using var remoteAccountA = await fixture.ConnectAsync(fixture.Spoke, "a_sub"); + await using var remoteAccountB = await fixture.ConnectAsync(fixture.Spoke, "b_sub"); + await using var publisher = await fixture.ConnectAsync(fixture.Hub, "a_pub"); + + await using var subA = await remoteAccountA.SubscribeCoreAsync(subject); + await using var subB = await remoteAccountB.SubscribeCoreAsync(subject); + await remoteAccountA.PingAsync(); + await remoteAccountB.PingAsync(); + await fixture.WaitForRemoteInterestOnHubAsync("A", subject); + + await publisher.PublishAsync(subject, "from-leaf-a"); + + using var receiveTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msgA = await subA.Msgs.ReadAsync(receiveTimeout.Token); + msgA.Data.ShouldBe("from-leaf-a"); + + using var leakTimeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + await Should.ThrowAsync(async () => + await subB.Msgs.ReadAsync(leakTimeout.Token)); + } +} + +internal sealed class LeafAccountDeliveryFixture : IAsyncDisposable +{ + private readonly CancellationTokenSource _hubCts; + private readonly CancellationTokenSource _spokeCts; + + private LeafAccountDeliveryFixture(NatsServer hub, NatsServer spoke, CancellationTokenSource hubCts, CancellationTokenSource spokeCts) + { + Hub = hub; + Spoke = spoke; + _hubCts = hubCts; + _spokeCts = spokeCts; + } + + public NatsServer Hub { get; } + public NatsServer Spoke { get; } + + public static async Task StartAsync() + { + var users = new User[] + { + new() { Username = "a_pub", Password = "pass", Account = "A" }, + new() { Username = "a_sub", Password = "pass", Account = "A" }, + new() { Username = "b_sub", Password = "pass", Account = "B" }, + }; + + var hubOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + }, + }; + + var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance); + var hubCts = new CancellationTokenSource(); + _ = hub.StartAsync(hubCts.Token); + await hub.WaitForReadyAsync(); + + var spokeOptions = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + LeafNode = new LeafNodeOptions + { + Host = "127.0.0.1", + Port = 0, + Remotes = [hub.LeafListen!], + }, + }; + + var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance); + var spokeCts = new CancellationTokenSource(); + _ = spoke.StartAsync(spokeCts.Token); + await spoke.WaitForReadyAsync(); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + return new LeafAccountDeliveryFixture(hub, spoke, hubCts, spokeCts); + } + + public async Task ConnectAsync(NatsServer server, string username) + { + var connection = new NatsConnection(new NatsOpts + { + Url = $"nats://{username}:pass@127.0.0.1:{server.Port}", + }); + await connection.ConnectAsync(); + return connection; + } + + public async Task WaitForRemoteInterestOnHubAsync(string account, string subject) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested) + { + if (Hub.HasRemoteInterest(account, subject)) + return; + + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + } + + throw new TimeoutException($"Timed out waiting for remote interest {account}:{subject}."); + } + + public async ValueTask DisposeAsync() + { + await _hubCts.CancelAsync(); + await _spokeCts.CancelAsync(); + Hub.Dispose(); + Spoke.Dispose(); + _hubCts.Dispose(); + _spokeCts.Dispose(); + } +} diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafInterestIdempotencyTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafInterestIdempotencyTests.cs new file mode 100644 index 0000000..13e27f4 --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafInterestIdempotencyTests.cs @@ -0,0 +1,87 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using NATS.Server.LeafNodes; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests.LeafNodes; + +public class LeafInterestIdempotencyTests +{ + [Fact] + public async Task Duplicate_RSplus_or_reconnect_replay_does_not_double_count_remote_interest() + { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await remoteSocket.ConnectAsync(IPAddress.Loopback, port); + using var leafSocket = await listener.AcceptSocketAsync(); + await using var leaf = new LeafConnection(leafSocket); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); + (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); + await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); + await handshakeTask; + + using var subList = new SubList(); + var remoteAdded = 0; + subList.InterestChanged += change => + { + if (change.Kind == InterestChangeKind.RemoteAdded) + remoteAdded++; + }; + + leaf.RemoteSubscriptionReceived = sub => + { + subList.ApplyRemoteSub(sub); + return Task.CompletedTask; + }; + leaf.StartLoop(timeout.Token); + + await WriteLineAsync(remoteSocket, "LS+ A orders.*", timeout.Token); + await WaitForAsync(() => subList.HasRemoteInterest("A", "orders.created"), timeout.Token); + + await WriteLineAsync(remoteSocket, "LS+ A orders.*", timeout.Token); + await Task.Delay(100, timeout.Token); + + subList.MatchRemote("A", "orders.created").Count.ShouldBe(1); + remoteAdded.ShouldBe(1); + } + + private static async Task ReadLineAsync(Socket socket, CancellationToken ct) + { + var bytes = new List(64); + var single = new byte[1]; + while (true) + { + var read = await socket.ReceiveAsync(single, SocketFlags.None, ct); + if (read == 0) + break; + if (single[0] == (byte)'\n') + break; + if (single[0] != (byte)'\r') + bytes.Add(single[0]); + } + + return Encoding.ASCII.GetString([.. bytes]); + } + + private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct) + => socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask(); + + private static async Task WaitForAsync(Func predicate, CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + if (predicate()) + return; + + await Task.Delay(20, ct); + } + + throw new TimeoutException("Timed out waiting for condition."); + } +} diff --git a/tests/NATS.Server.Tests/LeafNodes/LeafLoopTransparencyRuntimeTests.cs b/tests/NATS.Server.Tests/LeafNodes/LeafLoopTransparencyRuntimeTests.cs new file mode 100644 index 0000000..81a244f --- /dev/null +++ b/tests/NATS.Server.Tests/LeafNodes/LeafLoopTransparencyRuntimeTests.cs @@ -0,0 +1,18 @@ +using NATS.Server.LeafNodes; + +namespace NATS.Server.Tests.LeafNodes; + +public class LeafLoopTransparencyRuntimeTests +{ + [Fact] + public void Transport_internal_reply_and_loop_markers_never_leak_to_client_visible_subjects() + { + var nested = LeafLoopDetector.Mark( + LeafLoopDetector.Mark("orders.created", "S1"), + "S2"); + + LeafLoopDetector.TryUnmark(nested, out var unmarked).ShouldBeTrue(); + unmarked.ShouldBe("orders.created"); + unmarked.ShouldNotStartWith("$LDS."); + } +} diff --git a/tests/NATS.Server.Tests/Monitoring/PprofEndpointTests.cs b/tests/NATS.Server.Tests/Monitoring/PprofEndpointTests.cs index 24f3bcd..2543d90 100644 --- a/tests/NATS.Server.Tests/Monitoring/PprofEndpointTests.cs +++ b/tests/NATS.Server.Tests/Monitoring/PprofEndpointTests.cs @@ -70,6 +70,11 @@ internal sealed class PprofMonitorFixture : IAsyncDisposable return _http.GetStringAsync($"http://127.0.0.1:{_monitorPort}{path}"); } + public Task GetBytesAsync(string path) + { + return _http.GetByteArrayAsync($"http://127.0.0.1:{_monitorPort}{path}"); + } + public async ValueTask DisposeAsync() { _http.Dispose(); diff --git a/tests/NATS.Server.Tests/Monitoring/PprofRuntimeParityTests.cs b/tests/NATS.Server.Tests/Monitoring/PprofRuntimeParityTests.cs new file mode 100644 index 0000000..fc5ba15 --- /dev/null +++ b/tests/NATS.Server.Tests/Monitoring/PprofRuntimeParityTests.cs @@ -0,0 +1,18 @@ +using System.Text.Json; + +namespace NATS.Server.Tests.Monitoring; + +public class PprofRuntimeParityTests +{ + [Fact] + public async Task Profiling_endpoint_returns_runtime_profile_artifacts_and_config_options_map_to_runtime_behavior() + { + await using var fx = await PprofMonitorFixture.StartWithProfilingAsync(); + var payload = await fx.GetBytesAsync("/debug/pprof/profile?seconds=2"); + var doc = JsonDocument.Parse(payload); + + doc.RootElement.GetProperty("profile").GetString().ShouldBe("cpu"); + doc.RootElement.GetProperty("seconds").GetInt32().ShouldBe(2); + doc.RootElement.GetProperty("thread_count").GetInt32().ShouldBeGreaterThan(0); + } +} diff --git a/tests/NATS.Server.Tests/Mqtt/MqttAuthIntegrationTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttAuthIntegrationTests.cs new file mode 100644 index 0000000..b315908 --- /dev/null +++ b/tests/NATS.Server.Tests/Mqtt/MqttAuthIntegrationTests.cs @@ -0,0 +1,24 @@ +using System.Net; +using System.Net.Sockets; +using NATS.Server.Mqtt; + +namespace NATS.Server.Tests.Mqtt; + +public class MqttAuthIntegrationTests +{ + [Fact] + public async Task Invalid_mqtt_credentials_or_keepalive_timeout_close_session_with_protocol_error() + { + await using var listener = new MqttListener("127.0.0.1", 0, requiredUsername: "mqtt", requiredPassword: "secret"); + using var cts = new CancellationTokenSource(); + await listener.StartAsync(cts.Token); + + using var client = new TcpClient(); + await client.ConnectAsync(IPAddress.Loopback, listener.Port); + var stream = client.GetStream(); + + await MqttRuntimeWire.WriteLineAsync(stream, "CONNECT auth-client user=bad pass=wrong"); + (await MqttRuntimeWire.ReadLineAsync(stream, 1000)).ShouldBe("ERR mqtt auth failed"); + (await MqttRuntimeWire.ReadRawAsync(stream, 1000)).ShouldBeNull(); + } +} diff --git a/tests/NATS.Server.Tests/Mqtt/MqttKeepAliveTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttKeepAliveTests.cs new file mode 100644 index 0000000..d57954a --- /dev/null +++ b/tests/NATS.Server.Tests/Mqtt/MqttKeepAliveTests.cs @@ -0,0 +1,26 @@ +using System.Net; +using System.Net.Sockets; +using NATS.Server.Mqtt; + +namespace NATS.Server.Tests.Mqtt; + +public class MqttKeepAliveTests +{ + [Fact] + public async Task Invalid_mqtt_credentials_or_keepalive_timeout_close_session_with_protocol_error() + { + await using var listener = new MqttListener("127.0.0.1", 0); + using var cts = new CancellationTokenSource(); + await listener.StartAsync(cts.Token); + + using var client = new TcpClient(); + await client.ConnectAsync(IPAddress.Loopback, listener.Port); + var stream = client.GetStream(); + + await MqttRuntimeWire.WriteLineAsync(stream, "CONNECT keepalive-client keepalive=1"); + (await MqttRuntimeWire.ReadLineAsync(stream, 1000)).ShouldBe("CONNACK"); + + await Task.Delay(2000); + (await MqttRuntimeWire.ReadRawAsync(stream, 1000)).ShouldBeNull(); + } +} diff --git a/tests/NATS.Server.Tests/Mqtt/MqttPacketParserTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttPacketParserTests.cs new file mode 100644 index 0000000..3b9c814 --- /dev/null +++ b/tests/NATS.Server.Tests/Mqtt/MqttPacketParserTests.cs @@ -0,0 +1,26 @@ +using NATS.Server.Mqtt; + +namespace NATS.Server.Tests.Mqtt; + +public class MqttPacketParserTests +{ + [Fact] + public void Connect_packet_fixed_header_and_remaining_length_parse_correctly() + { + var packet = MqttPacketReader.Read(ConnectPacketBytes.Sample); + packet.Type.ShouldBe(MqttControlPacketType.Connect); + packet.RemainingLength.ShouldBe(12); + packet.Payload.Length.ShouldBe(12); + } + + private static class ConnectPacketBytes + { + public static readonly byte[] Sample = + [ + 0x10, 0x0C, // CONNECT + remaining length + 0x00, 0x04, (byte)'M', (byte)'Q', (byte)'T', (byte)'T', + 0x04, 0x02, 0x00, 0x3C, // protocol level/flags/keepalive + 0x00, 0x00, // empty client id + ]; + } +} diff --git a/tests/NATS.Server.Tests/Mqtt/MqttPacketWriterTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttPacketWriterTests.cs new file mode 100644 index 0000000..b5ac4ff --- /dev/null +++ b/tests/NATS.Server.Tests/Mqtt/MqttPacketWriterTests.cs @@ -0,0 +1,20 @@ +using NATS.Server.Mqtt; + +namespace NATS.Server.Tests.Mqtt; + +public class MqttPacketWriterTests +{ + [Fact] + public void Writer_emits_fixed_header_and_round_trips_with_reader() + { + byte[] payload = Enumerable.Repeat((byte)0xAB, 130).ToArray(); + + var encoded = MqttPacketWriter.Write(MqttControlPacketType.Publish, payload); + encoded[0].ShouldBe((byte)0x30); // PUBLISH type with default flags + + var decoded = MqttPacketReader.Read(encoded); + decoded.Type.ShouldBe(MqttControlPacketType.Publish); + decoded.RemainingLength.ShouldBe(payload.Length); + decoded.Payload.ToArray().ShouldBe(payload); + } +} diff --git a/tests/NATS.Server.Tests/Mqtt/MqttQosAckRuntimeTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttQosAckRuntimeTests.cs new file mode 100644 index 0000000..b792dfa --- /dev/null +++ b/tests/NATS.Server.Tests/Mqtt/MqttQosAckRuntimeTests.cs @@ -0,0 +1,26 @@ +using System.Net; +using System.Net.Sockets; +using NATS.Server.Mqtt; + +namespace NATS.Server.Tests.Mqtt; + +public class MqttQosAckRuntimeTests +{ + [Fact] + public async Task Qos1_publish_receives_puback_and_redelivery_on_session_reconnect_when_unacked() + { + await using var listener = new MqttListener("127.0.0.1", 0); + using var cts = new CancellationTokenSource(); + await listener.StartAsync(cts.Token); + + using var client = new TcpClient(); + await client.ConnectAsync(IPAddress.Loopback, listener.Port); + var stream = client.GetStream(); + + await MqttRuntimeWire.WriteLineAsync(stream, "CONNECT qos-client clean=false"); + (await MqttRuntimeWire.ReadLineAsync(stream, 1000)).ShouldBe("CONNACK"); + + await MqttRuntimeWire.WriteLineAsync(stream, "PUBQ1 7 sensors.temp 42"); + (await MqttRuntimeWire.ReadLineAsync(stream, 1000)).ShouldBe("PUBACK 7"); + } +} diff --git a/tests/NATS.Server.Tests/Mqtt/MqttSessionRuntimeTests.cs b/tests/NATS.Server.Tests/Mqtt/MqttSessionRuntimeTests.cs new file mode 100644 index 0000000..d3ecbcb --- /dev/null +++ b/tests/NATS.Server.Tests/Mqtt/MqttSessionRuntimeTests.cs @@ -0,0 +1,89 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using NATS.Server.Mqtt; + +namespace NATS.Server.Tests.Mqtt; + +public class MqttSessionRuntimeTests +{ + [Fact] + public async Task Qos1_publish_receives_puback_and_redelivery_on_session_reconnect_when_unacked() + { + await using var listener = new MqttListener("127.0.0.1", 0); + using var cts = new CancellationTokenSource(); + await listener.StartAsync(cts.Token); + + using (var first = new TcpClient()) + { + await first.ConnectAsync(IPAddress.Loopback, listener.Port); + var firstStream = first.GetStream(); + await MqttRuntimeWire.WriteLineAsync(firstStream, "CONNECT session-client clean=false"); + (await MqttRuntimeWire.ReadLineAsync(firstStream, 1000)).ShouldBe("CONNACK"); + + await MqttRuntimeWire.WriteLineAsync(firstStream, "PUBQ1 21 sensors.temp 99"); + (await MqttRuntimeWire.ReadLineAsync(firstStream, 1000)).ShouldBe("PUBACK 21"); + } + + using var second = new TcpClient(); + await second.ConnectAsync(IPAddress.Loopback, listener.Port); + var secondStream = second.GetStream(); + await MqttRuntimeWire.WriteLineAsync(secondStream, "CONNECT session-client clean=false"); + (await MqttRuntimeWire.ReadLineAsync(secondStream, 1000)).ShouldBe("CONNACK"); + (await MqttRuntimeWire.ReadLineAsync(secondStream, 1000)).ShouldBe("REDLIVER 21 sensors.temp 99"); + } +} + +internal static class MqttRuntimeWire +{ + public static async Task WriteLineAsync(NetworkStream stream, string line) + { + var bytes = Encoding.UTF8.GetBytes(line + "\n"); + await stream.WriteAsync(bytes); + await stream.FlushAsync(); + } + + public static async Task ReadLineAsync(NetworkStream stream, int timeoutMs) + { + using var timeout = new CancellationTokenSource(timeoutMs); + var bytes = new List(); + var one = new byte[1]; + try + { + while (true) + { + var read = await stream.ReadAsync(one.AsMemory(0, 1), timeout.Token); + if (read == 0) + return null; + if (one[0] == (byte)'\n') + break; + if (one[0] != (byte)'\r') + bytes.Add(one[0]); + } + } + catch (OperationCanceledException) + { + return null; + } + + return Encoding.UTF8.GetString([.. bytes]); + } + + public static async Task ReadRawAsync(NetworkStream stream, int timeoutMs) + { + using var timeout = new CancellationTokenSource(timeoutMs); + var one = new byte[1]; + try + { + var read = await stream.ReadAsync(one.AsMemory(0, 1), timeout.Token); + if (read == 0) + return null; + + return Encoding.UTF8.GetString(one, 0, read); + } + catch (OperationCanceledException) + { + return "__timeout__"; + } + } +} diff --git a/tests/NATS.Server.Tests/Parity/NatsCapabilityInventory.cs b/tests/NATS.Server.Tests/Parity/NatsCapabilityInventory.cs new file mode 100644 index 0000000..12f704a --- /dev/null +++ b/tests/NATS.Server.Tests/Parity/NatsCapabilityInventory.cs @@ -0,0 +1,52 @@ +namespace NATS.Server.Tests.Parity; + +public sealed record CapabilityRow(string Capability, string Behavior, string Tests, string Docs); + +public sealed class NatsCapabilityInventoryReport +{ + public NatsCapabilityInventoryReport(IReadOnlyList rows) + { + Rows = rows; + } + + public IReadOnlyList Rows { get; } + + public IReadOnlyList InvalidRows => Rows + .Where(r => !IsDone(r.Behavior) && IsClosed(r.Docs)) + .Concat(Rows.Where(r => !IsDone(r.Tests) && IsClosed(r.Docs))) + .Distinct() + .ToArray(); + + private static bool IsDone(string status) => string.Equals(status, "done", StringComparison.OrdinalIgnoreCase); + private static bool IsClosed(string status) => string.Equals(status, "closed", StringComparison.OrdinalIgnoreCase); +} + +public static class NatsCapabilityInventory +{ + public static NatsCapabilityInventoryReport Load(string relativePath) + { + var repositoryRoot = Path.GetFullPath(Path.Combine(AppContext.BaseDirectory, "..", "..", "..", "..", "..")); + var mapPath = Path.Combine(repositoryRoot, relativePath); + File.Exists(mapPath).ShouldBeTrue(); + + var rows = new List(); + foreach (var rawLine in File.ReadLines(mapPath)) + { + var line = rawLine.Trim(); + if (!line.StartsWith("|", StringComparison.Ordinal) || line.Contains("---", StringComparison.Ordinal)) + continue; + + var cells = line.Trim('|').Split('|').Select(static c => c.Trim()).ToArray(); + if (cells.Length < 4 || string.Equals(cells[0], "Capability", StringComparison.OrdinalIgnoreCase)) + continue; + + rows.Add(new CapabilityRow( + cells[0], + cells[1], + cells[2], + cells[3])); + } + + return new NatsCapabilityInventoryReport(rows); + } +} diff --git a/tests/NATS.Server.Tests/Parity/NatsStrictCapabilityInventoryTests.cs b/tests/NATS.Server.Tests/Parity/NatsStrictCapabilityInventoryTests.cs new file mode 100644 index 0000000..d2905a1 --- /dev/null +++ b/tests/NATS.Server.Tests/Parity/NatsStrictCapabilityInventoryTests.cs @@ -0,0 +1,12 @@ +namespace NATS.Server.Tests.Parity; + +public class NatsStrictCapabilityInventoryTests +{ + [Fact] + public void Strict_capability_inventory_has_no_open_items_marked_done_without_behavior_and_tests() + { + var report = NatsCapabilityInventory.Load( + "docs/plans/2026-02-23-nats-strict-full-go-parity-map.md"); + report.InvalidRows.ShouldBeEmpty(); + } +} diff --git a/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs b/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs new file mode 100644 index 0000000..4ab3387 --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftStrictConsensusRuntimeTests.cs @@ -0,0 +1,44 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +public class RaftStrictConsensusRuntimeTests +{ + [Fact] + public async Task Quorum_and_nextindex_rules_gate_commit_visibility_and_snapshot_catchup_convergence() + { + var voter = new RaftNode("v1"); + voter.GrantVote(2, "cand-a").Granted.ShouldBeTrue(); + voter.GrantVote(2, "cand-b").Granted.ShouldBeFalse(); + + var transport = new RejectingRaftTransport(); + var leader = new RaftNode("n1", transport); + var followerA = new RaftNode("n2", transport); + var followerB = new RaftNode("n3", transport); + var cluster = new[] { leader, followerA, followerB }; + foreach (var node in cluster) + node.ConfigureCluster(cluster); + + leader.StartElection(cluster.Length); + leader.ReceiveVote(new VoteResponse { Granted = true }, cluster.Length); + leader.IsLeader.ShouldBeTrue(); + + _ = await leader.ProposeAsync("cmd-1", default); + leader.AppliedIndex.ShouldBe(0); + followerA.AppliedIndex.ShouldBe(0); + followerB.AppliedIndex.ShouldBe(0); + } + + private sealed class RejectingRaftTransport : IRaftTransport + { + public Task> AppendEntriesAsync(string leaderId, IReadOnlyList followerIds, RaftLogEntry entry, CancellationToken ct) + => Task.FromResult>( + followerIds.Select(id => new AppendResult { FollowerId = id, Success = false }).ToArray()); + + public Task RequestVoteAsync(string candidateId, string voterId, VoteRequest request, CancellationToken ct) + => Task.FromResult(new VoteResponse { Granted = true }); + + public Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct) + => Task.CompletedTask; + } +} diff --git a/tests/NATS.Server.Tests/Raft/RaftStrictConvergenceRuntimeTests.cs b/tests/NATS.Server.Tests/Raft/RaftStrictConvergenceRuntimeTests.cs new file mode 100644 index 0000000..dcc574d --- /dev/null +++ b/tests/NATS.Server.Tests/Raft/RaftStrictConvergenceRuntimeTests.cs @@ -0,0 +1,33 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests.Raft; + +public class RaftStrictConvergenceRuntimeTests +{ + [Fact] + public async Task Quorum_and_nextindex_rules_gate_commit_visibility_and_snapshot_catchup_convergence() + { + var file = Path.Combine(Path.GetTempPath(), $"nats-raft-snapshot-{Guid.NewGuid():N}.json"); + + try + { + var first = new RaftSnapshotStore(file); + await first.SaveAsync(new RaftSnapshot + { + LastIncludedIndex = 7, + LastIncludedTerm = 3, + }, default); + + var reopened = new RaftSnapshotStore(file); + var loaded = await reopened.LoadAsync(default); + loaded.ShouldNotBeNull(); + loaded.LastIncludedIndex.ShouldBe(7); + loaded.LastIncludedTerm.ShouldBe(3); + } + finally + { + if (File.Exists(file)) + File.Delete(file); + } + } +} diff --git a/tests/NATS.Server.Tests/Routes/RouteAccountScopedDeliveryTests.cs b/tests/NATS.Server.Tests/Routes/RouteAccountScopedDeliveryTests.cs new file mode 100644 index 0000000..34efb55 --- /dev/null +++ b/tests/NATS.Server.Tests/Routes/RouteAccountScopedDeliveryTests.cs @@ -0,0 +1,140 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Client.Core; +using NATS.Server.Auth; +using NATS.Server.Configuration; + +namespace NATS.Server.Tests.Routes; + +public class RouteAccountScopedDeliveryTests +{ + [Fact] + public async Task Remote_message_delivery_uses_target_account_sublist_not_global_sublist() + { + const string subject = "orders.created"; + await using var fixture = await RouteAccountDeliveryFixture.StartAsync(); + + await using var remoteAccountA = await fixture.ConnectAsync(fixture.ServerB, "a_sub"); + await using var remoteAccountB = await fixture.ConnectAsync(fixture.ServerB, "b_sub"); + await using var publisher = await fixture.ConnectAsync(fixture.ServerA, "a_pub"); + + await using var subA = await remoteAccountA.SubscribeCoreAsync(subject); + await using var subB = await remoteAccountB.SubscribeCoreAsync(subject); + await remoteAccountA.PingAsync(); + await remoteAccountB.PingAsync(); + await fixture.WaitForRemoteInterestOnServerAAsync("A", subject); + + await publisher.PublishAsync(subject, "from-route-a"); + + using var receiveTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msgA = await subA.Msgs.ReadAsync(receiveTimeout.Token); + msgA.Data.ShouldBe("from-route-a"); + + using var leakTimeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(500)); + await Should.ThrowAsync(async () => + await subB.Msgs.ReadAsync(leakTimeout.Token)); + } +} + +internal sealed class RouteAccountDeliveryFixture : IAsyncDisposable +{ + private readonly CancellationTokenSource _ctsA; + private readonly CancellationTokenSource _ctsB; + + private RouteAccountDeliveryFixture(NatsServer serverA, NatsServer serverB, CancellationTokenSource ctsA, CancellationTokenSource ctsB) + { + ServerA = serverA; + ServerB = serverB; + _ctsA = ctsA; + _ctsB = ctsB; + } + + public NatsServer ServerA { get; } + public NatsServer ServerB { get; } + + public static async Task StartAsync() + { + var users = new User[] + { + new() { Username = "a_pub", Password = "pass", Account = "A" }, + new() { Username = "a_sub", Password = "pass", Account = "A" }, + new() { Username = "b_sub", Password = "pass", Account = "B" }, + }; + + var optsA = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + Cluster = new ClusterOptions + { + Name = Guid.NewGuid().ToString("N"), + Host = "127.0.0.1", + Port = 0, + }, + }; + + var serverA = new NatsServer(optsA, NullLoggerFactory.Instance); + var ctsA = new CancellationTokenSource(); + _ = serverA.StartAsync(ctsA.Token); + await serverA.WaitForReadyAsync(); + + var optsB = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + Users = users, + Cluster = new ClusterOptions + { + Name = Guid.NewGuid().ToString("N"), + Host = "127.0.0.1", + Port = 0, + Routes = [serverA.ClusterListen!], + }, + }; + + var serverB = new NatsServer(optsB, NullLoggerFactory.Instance); + var ctsB = new CancellationTokenSource(); + _ = serverB.StartAsync(ctsB.Token); + await serverB.WaitForReadyAsync(); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested && (serverA.Stats.Routes == 0 || serverB.Stats.Routes == 0)) + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + + return new RouteAccountDeliveryFixture(serverA, serverB, ctsA, ctsB); + } + + public async Task ConnectAsync(NatsServer server, string username) + { + var connection = new NatsConnection(new NatsOpts + { + Url = $"nats://{username}:pass@127.0.0.1:{server.Port}", + }); + await connection.ConnectAsync(); + return connection; + } + + public async Task WaitForRemoteInterestOnServerAAsync(string account, string subject) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + while (!timeout.IsCancellationRequested) + { + if (ServerA.HasRemoteInterest(account, subject)) + return; + + await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + } + + throw new TimeoutException($"Timed out waiting for remote interest {account}:{subject}."); + } + + public async ValueTask DisposeAsync() + { + await _ctsA.CancelAsync(); + await _ctsB.CancelAsync(); + ServerA.Dispose(); + ServerB.Dispose(); + _ctsA.Dispose(); + _ctsB.Dispose(); + } +} diff --git a/tests/NATS.Server.Tests/Routes/RouteInterestIdempotencyTests.cs b/tests/NATS.Server.Tests/Routes/RouteInterestIdempotencyTests.cs new file mode 100644 index 0000000..c596cab --- /dev/null +++ b/tests/NATS.Server.Tests/Routes/RouteInterestIdempotencyTests.cs @@ -0,0 +1,87 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using NATS.Server.Routes; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests.Routes; + +public class RouteInterestIdempotencyTests +{ + [Fact] + public async Task Duplicate_RSplus_or_reconnect_replay_does_not_double_count_remote_interest() + { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await remoteSocket.ConnectAsync(IPAddress.Loopback, port); + using var routeSocket = await listener.AcceptSocketAsync(); + await using var route = new RouteConnection(routeSocket); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + var handshakeTask = route.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); + (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("ROUTE LOCAL"); + await WriteLineAsync(remoteSocket, "ROUTE REMOTE", timeout.Token); + await handshakeTask; + + using var subList = new SubList(); + var remoteAdded = 0; + subList.InterestChanged += change => + { + if (change.Kind == InterestChangeKind.RemoteAdded) + remoteAdded++; + }; + + route.RemoteSubscriptionReceived = sub => + { + subList.ApplyRemoteSub(sub); + return Task.CompletedTask; + }; + route.StartFrameLoop(timeout.Token); + + await WriteLineAsync(remoteSocket, "RS+ A orders.*", timeout.Token); + await WaitForAsync(() => subList.HasRemoteInterest("A", "orders.created"), timeout.Token); + + await WriteLineAsync(remoteSocket, "RS+ A orders.*", timeout.Token); + await Task.Delay(100, timeout.Token); + + subList.MatchRemote("A", "orders.created").Count.ShouldBe(1); + remoteAdded.ShouldBe(1); + } + + private static async Task ReadLineAsync(Socket socket, CancellationToken ct) + { + var bytes = new List(64); + var single = new byte[1]; + while (true) + { + var read = await socket.ReceiveAsync(single, SocketFlags.None, ct); + if (read == 0) + break; + if (single[0] == (byte)'\n') + break; + if (single[0] != (byte)'\r') + bytes.Add(single[0]); + } + + return Encoding.ASCII.GetString([.. bytes]); + } + + private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct) + => socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask(); + + private static async Task WaitForAsync(Func predicate, CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + if (predicate()) + return; + + await Task.Delay(20, ct); + } + + throw new TimeoutException("Timed out waiting for condition."); + } +}