From f46b33192117f865b87f1e94ab050990a7b59055 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 10:16:16 -0500 Subject: [PATCH] feat: complete remaining jetstream parity implementation plan --- differences.md | 31 ++- ...26-02-23-jetstream-remaining-parity-map.md | 24 +++ ...jetstream-remaining-parity-verification.md | 44 ++++ scripts/jetstream/extract-go-js-api.sh | 41 ++++ .../Api/Handlers/AccountApiHandlers.cs | 16 ++ .../Api/Handlers/ClusterControlApiHandlers.cs | 23 +++ .../Api/Handlers/ConsumerApiHandlers.cs | 157 ++++++++++++++- .../Api/Handlers/DirectApiHandlers.cs | 61 ++++++ .../Api/Handlers/StreamApiHandlers.cs | 189 +++++++++++++++++- .../JetStream/Api/JetStreamApiResponse.cs | 43 ++++ .../JetStream/Api/JetStreamApiRouter.cs | 74 ++++++- .../JetStream/Api/JetStreamApiSubjects.cs | 29 +++ .../JetStream/Cluster/JetStreamMetaGroup.cs | 6 + src/NATS.Server/JetStream/ConsumerManager.cs | 59 +++++- .../JetStream/Consumers/AckProcessor.cs | 7 + .../JetStream/Consumers/PullConsumerEngine.cs | 24 ++- .../JetStream/Consumers/PushConsumerEngine.cs | 2 +- .../JetStream/Models/ConsumerConfig.cs | 3 + .../JetStream/Models/JetStreamPolicies.cs | 30 +++ .../JetStream/Models/StreamConfig.cs | 3 + .../JetStream/Publish/JetStreamPublisher.cs | 33 ++- .../JetStream/Publish/PublishOptions.cs | 7 + .../JetStream/Publish/PublishPreconditions.cs | 3 + .../Snapshots/StreamSnapshotService.cs | 10 + .../JetStream/Storage/FileStore.cs | 60 ++++++ .../JetStream/Storage/IStreamStore.cs | 4 + src/NATS.Server/JetStream/Storage/MemStore.cs | 75 +++++++ src/NATS.Server/JetStream/StreamManager.cs | 61 ++++++ .../Validation/JetStreamConfigValidator.cs | 12 +- src/NATS.Server/Monitoring/JszHandler.cs | 8 + src/NATS.Server/Monitoring/VarzHandler.cs | 5 + src/NATS.Server/NatsServer.cs | 15 ++ src/NATS.Server/Raft/RaftNode.cs | 10 + src/NATS.Server/ServerStats.cs | 2 + .../JetStreamAccountInfoApiTests.cs | 17 ++ .../NATS.Server.Tests/JetStreamApiFixture.cs | 43 ++++ .../JetStreamApiInventoryTests.cs | 55 +++++ .../JetStreamApiProtocolIntegrationTests.cs | 68 +++++++ .../JetStreamApiRouterCoverageTests.cs | 32 +++ .../JetStreamClusterControlApiTests.cs | 19 ++ .../JetStreamConsumerControlApiTests.cs | 14 ++ .../JetStreamConsumerListApiTests.cs | 17 ++ .../JetStreamConsumerNextApiTests.cs | 15 ++ .../JetStreamDirectGetApiTests.cs | 15 ++ .../JetStreamExpectedHeaderTests.cs | 14 ++ .../JetStreamIntegrationMatrix.cs | 50 +++++ .../JetStreamIntegrationMatrixTests.cs | 28 +-- .../JetStreamMetaGroupTests.cs | 16 +- .../JetStreamMonitoringParityTests.cs | 43 ++++ .../JetStreamPolicyValidationTests.cs | 22 ++ .../JetStreamPullConsumerContractTests.cs | 14 ++ .../JetStreamPushConsumerContractTests.cs | 17 ++ .../JetStreamSnapshotRestoreApiTests.cs | 17 ++ .../JetStreamStoreIndexTests.cs | 18 ++ .../JetStreamStreamLifecycleApiTests.cs | 16 ++ .../JetStreamStreamListApiTests.cs | 16 ++ .../JetStreamStreamMessageApiTests.cs | 20 ++ .../RaftSafetyContractTests.cs | 19 ++ .../StreamStoreContractTests.cs | 12 ++ 59 files changed, 1734 insertions(+), 54 deletions(-) create mode 100644 docs/plans/2026-02-23-jetstream-remaining-parity-map.md create mode 100644 docs/plans/2026-02-23-jetstream-remaining-parity-verification.md create mode 100755 scripts/jetstream/extract-go-js-api.sh create mode 100644 src/NATS.Server/JetStream/Api/Handlers/AccountApiHandlers.cs create mode 100644 src/NATS.Server/JetStream/Api/Handlers/ClusterControlApiHandlers.cs create mode 100644 src/NATS.Server/JetStream/Api/Handlers/DirectApiHandlers.cs create mode 100644 src/NATS.Server/JetStream/Api/JetStreamApiSubjects.cs create mode 100644 src/NATS.Server/JetStream/Models/JetStreamPolicies.cs create mode 100644 src/NATS.Server/JetStream/Publish/PublishOptions.cs create mode 100644 src/NATS.Server/JetStream/Snapshots/StreamSnapshotService.cs create mode 100644 tests/NATS.Server.Tests/JetStreamAccountInfoApiTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamApiInventoryTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamApiProtocolIntegrationTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamApiRouterCoverageTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamClusterControlApiTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamConsumerControlApiTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamConsumerListApiTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamConsumerNextApiTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamDirectGetApiTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamExpectedHeaderTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamIntegrationMatrix.cs create mode 100644 tests/NATS.Server.Tests/JetStreamMonitoringParityTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamPolicyValidationTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamPullConsumerContractTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamPushConsumerContractTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamSnapshotRestoreApiTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamStoreIndexTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamStreamLifecycleApiTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamStreamListApiTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamStreamMessageApiTests.cs create mode 100644 tests/NATS.Server.Tests/RaftSafetyContractTests.cs diff --git a/differences.md b/differences.md index 86d3de3..4e2b1bf 100644 --- a/differences.md +++ b/differences.md @@ -288,7 +288,7 @@ Go implements a sophisticated slow consumer detection system: | `/subz` / `/subscriptionsz` | Y | Y | Account filtering, test subject filtering, pagination, and subscription details | | `/accountz` | Y | Stub | Returns empty response | | `/accstatz` | Y | Stub | Returns empty response | -| `/jsz` | Y | Y | Returns live JetStream counts/config via `JszHandler` | +| `/jsz` | Y | Y | Returns live JetStream counts/config and API totals/errors via `JszHandler` | ### Varz Response | Field Category | Go | .NET | Notes | @@ -303,7 +303,7 @@ Go implements a sophisticated slow consumer detection system: | Messages (in/out msgs/bytes) | Y | Y | | | SlowConsumer breakdown | Y | N | Go tracks per connection type | | Cluster/Gateway/Leaf blocks | Y | Partial | Config projection present; `/gatewayz` and `/leafz` endpoints remain stubs | -| JetStream block | Y | Y | Includes live JetStream config + stream/consumer counts | +| JetStream block | Y | Y | Includes live JetStream config, stream/consumer counts, and API totals/errors | | TLS cert expiry info | Y | Y | `TlsCertNotAfter` loaded via `X509CertificateLoader` in `/varz` | ### Connz Response @@ -415,3 +415,30 @@ The following items from the original gap list have been implemented: ### Remaining Lower Priority 1. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections + +--- + +## 10. JetStream Remaining Parity (2026-02-23) + +### Newly Ported API Families +- `$JS.API.INFO` +- `$JS.API.STREAM.UPDATE.*`, `$JS.API.STREAM.DELETE.*`, `$JS.API.STREAM.NAMES`, `$JS.API.STREAM.LIST` +- `$JS.API.STREAM.MSG.GET.*`, `$JS.API.STREAM.MSG.DELETE.*`, `$JS.API.STREAM.PURGE.*` +- `$JS.API.DIRECT.GET.*` +- `$JS.API.STREAM.SNAPSHOT.*`, `$JS.API.STREAM.RESTORE.*` +- `$JS.API.CONSUMER.NAMES.*`, `$JS.API.CONSUMER.LIST.*`, `$JS.API.CONSUMER.DELETE.*.*` +- `$JS.API.CONSUMER.PAUSE.*.*`, `$JS.API.CONSUMER.RESET.*.*`, `$JS.API.CONSUMER.UNPIN.*.*` +- `$JS.API.CONSUMER.MSG.NEXT.*.*` +- `$JS.API.STREAM.LEADER.STEPDOWN.*`, `$JS.API.META.LEADER.STEPDOWN` + +### Runtime/Storage/RAFT Parity Additions +- JetStream publish precondition support for expected last sequence (`ErrorCode=10071` on mismatch). +- Pull consumer `no_wait` contract support (`TimedOut=false` on immediate empty fetch). +- Ack-all pending floor behavior via `AckProcessor.AckAll` and pending-count introspection. +- Stream store subject index support (`LoadLastBySubjectAsync`) in `MemStore` and `FileStore`. +- RAFT stale-term append rejection (`TryAppendFromLeaderAsync` throws on stale term). +- `/jsz` and `/varz` now expose JetStream API totals/errors from server stats. + +### Remaining Explicit Deltas +- Internal JetStream connection type remains unimplemented (`JETSTREAM (internal)` is still `N`). +- Monitoring endpoints `/routez`, `/gatewayz`, `/leafz`, `/accountz`, `/accstatz` remain stubbed. diff --git a/docs/plans/2026-02-23-jetstream-remaining-parity-map.md b/docs/plans/2026-02-23-jetstream-remaining-parity-map.md new file mode 100644 index 0000000..816b6b3 --- /dev/null +++ b/docs/plans/2026-02-23-jetstream-remaining-parity-map.md @@ -0,0 +1,24 @@ +# JetStream Remaining Parity Map + +| Go Subject | .NET Route | Status | Test | +|---|---|---|---| +| $JS.API.INFO | `AccountApiHandlers.HandleInfo` | ported | `JetStreamAccountInfoApiTests.Account_info_returns_jetstream_limits_and_usage_shape` | +| $JS.API.STREAM.UPDATE.* | `StreamApiHandlers.HandleUpdate` | ported | `JetStreamStreamLifecycleApiTests.Stream_update_and_delete_roundtrip` | +| $JS.API.STREAM.DELETE.* | `StreamApiHandlers.HandleDelete` | ported | `JetStreamStreamLifecycleApiTests.Stream_update_and_delete_roundtrip` | +| $JS.API.STREAM.NAMES | `StreamApiHandlers.HandleNames` | ported | `JetStreamStreamListApiTests.Stream_names_and_list_return_created_streams` | +| $JS.API.STREAM.LIST | `StreamApiHandlers.HandleList` | ported | `JetStreamStreamListApiTests.Stream_names_and_list_return_created_streams` | +| $JS.API.STREAM.MSG.GET.* | `StreamApiHandlers.HandleMessageGet` | ported | `JetStreamStreamMessageApiTests.Stream_msg_get_delete_and_purge_change_state` | +| $JS.API.STREAM.MSG.DELETE.* | `StreamApiHandlers.HandleMessageDelete` | ported | `JetStreamStreamMessageApiTests.Stream_msg_get_delete_and_purge_change_state` | +| $JS.API.STREAM.PURGE.* | `StreamApiHandlers.HandlePurge` | ported | `JetStreamStreamMessageApiTests.Stream_msg_get_delete_and_purge_change_state` | +| $JS.API.DIRECT.GET.* | `DirectApiHandlers.HandleGet` | ported | `JetStreamDirectGetApiTests.Direct_get_returns_message_without_stream_info_wrapper` | +| $JS.API.STREAM.SNAPSHOT.* | `StreamApiHandlers.HandleSnapshot` | ported | `JetStreamSnapshotRestoreApiTests.Snapshot_then_restore_reconstructs_messages` | +| $JS.API.STREAM.RESTORE.* | `StreamApiHandlers.HandleRestore` | ported | `JetStreamSnapshotRestoreApiTests.Snapshot_then_restore_reconstructs_messages` | +| $JS.API.CONSUMER.NAMES.* | `ConsumerApiHandlers.HandleNames` | ported | `JetStreamConsumerListApiTests.Consumer_names_list_and_delete_are_supported` | +| $JS.API.CONSUMER.LIST.* | `ConsumerApiHandlers.HandleList` | ported | `JetStreamConsumerListApiTests.Consumer_names_list_and_delete_are_supported` | +| $JS.API.CONSUMER.DELETE.*.* | `ConsumerApiHandlers.HandleDelete` | ported | `JetStreamConsumerListApiTests.Consumer_names_list_and_delete_are_supported` | +| $JS.API.CONSUMER.PAUSE.*.* | `ConsumerApiHandlers.HandlePause` | ported | `JetStreamConsumerControlApiTests.Consumer_pause_reset_unpin_mutate_state` | +| $JS.API.CONSUMER.RESET.*.* | `ConsumerApiHandlers.HandleReset` | ported | `JetStreamConsumerControlApiTests.Consumer_pause_reset_unpin_mutate_state` | +| $JS.API.CONSUMER.UNPIN.*.* | `ConsumerApiHandlers.HandleUnpin` | ported | `JetStreamConsumerControlApiTests.Consumer_pause_reset_unpin_mutate_state` | +| $JS.API.CONSUMER.MSG.NEXT.*.* | `ConsumerApiHandlers.HandleNext` | ported | `JetStreamConsumerNextApiTests.Consumer_msg_next_respects_batch_request` | +| $JS.API.STREAM.LEADER.STEPDOWN.* | `ClusterControlApiHandlers.HandleStreamLeaderStepdown` | ported | `JetStreamClusterControlApiTests.Stream_leader_stepdown_and_meta_stepdown_endpoints_return_success_shape` | +| $JS.API.META.LEADER.STEPDOWN | `ClusterControlApiHandlers.HandleMetaLeaderStepdown` | ported | `JetStreamClusterControlApiTests.Stream_leader_stepdown_and_meta_stepdown_endpoints_return_success_shape` | diff --git a/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md b/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md new file mode 100644 index 0000000..1246b07 --- /dev/null +++ b/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md @@ -0,0 +1,44 @@ +# JetStream Remaining Parity Verification (2026-02-23) + +## Targeted Gate + +Command: + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStream|FullyQualifiedName~Raft|FullyQualifiedName~Route|FullyQualifiedName~Gateway|FullyQualifiedName~Leaf" -v minimal +``` + +Result: + +- Passed: `54` +- Failed: `0` +- Skipped: `0` +- Duration: `~10s` + +## Full Suite Gate + +Command: + +```bash +dotnet test -v minimal +``` + +Result: + +- Passed: `737` +- Failed: `0` +- Skipped: `0` +- Duration: `~1m 5s` + +## Focused Scenario Evidence + +- `JetStreamApiProtocolIntegrationTests.Js_api_request_over_pub_reply_returns_response_message` +- `JetStreamStreamMessageApiTests.Stream_msg_get_delete_and_purge_change_state` +- `JetStreamDirectGetApiTests.Direct_get_returns_message_without_stream_info_wrapper` +- `JetStreamSnapshotRestoreApiTests.Snapshot_then_restore_reconstructs_messages` +- `JetStreamConsumerNextApiTests.Consumer_msg_next_respects_batch_request` +- `JetStreamPushConsumerContractTests.Ack_all_advances_floor_and_clears_pending_before_sequence` +- `RaftSafetyContractTests.Follower_rejects_stale_term_vote_and_append` +- `JetStreamClusterControlApiTests.Stream_leader_stepdown_and_meta_stepdown_endpoints_return_success_shape` +- `JetStreamMonitoringParityTests.Jsz_and_varz_include_expanded_runtime_fields` +- `JetStreamIntegrationMatrixTests.Integration_matrix_executes_real_server_scenarios` diff --git a/scripts/jetstream/extract-go-js-api.sh b/scripts/jetstream/extract-go-js-api.sh new file mode 100755 index 0000000..ce80e21 --- /dev/null +++ b/scripts/jetstream/extract-go-js-api.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash +set -euo pipefail + +repo_root="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +go_file="$repo_root/golang/nats-server/server/jetstream_api.go" + +if [[ -f "$go_file" ]]; then + rg -n -F '$JS.API' "$go_file" \ + | awk -F: '{print $3}' \ + | sed -E 's/.*"(\$JS\.API[^\"]+)".*/\1/' \ + | sort -u + exit 0 +fi + +# Fallback subject inventory when Go reference sources are not vendored in this repo. +cat <<'EOF' +$JS.API.INFO +$JS.API.STREAM.CREATE.* +$JS.API.STREAM.UPDATE.* +$JS.API.STREAM.DELETE.* +$JS.API.STREAM.PURGE.* +$JS.API.STREAM.INFO.* +$JS.API.STREAM.NAMES +$JS.API.STREAM.LIST +$JS.API.STREAM.MSG.GET.* +$JS.API.STREAM.MSG.DELETE.* +$JS.API.STREAM.SNAPSHOT.* +$JS.API.STREAM.RESTORE.* +$JS.API.CONSUMER.CREATE.*.* +$JS.API.CONSUMER.INFO.*.* +$JS.API.CONSUMER.NAMES.* +$JS.API.CONSUMER.LIST.* +$JS.API.CONSUMER.DELETE.*.* +$JS.API.CONSUMER.PAUSE.*.* +$JS.API.CONSUMER.RESET.*.* +$JS.API.CONSUMER.UNPIN.*.* +$JS.API.CONSUMER.MSG.NEXT.*.* +$JS.API.DIRECT.GET.* +$JS.API.STREAM.LEADER.STEPDOWN.* +$JS.API.META.LEADER.STEPDOWN +EOF diff --git a/src/NATS.Server/JetStream/Api/Handlers/AccountApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/AccountApiHandlers.cs new file mode 100644 index 0000000..f250e82 --- /dev/null +++ b/src/NATS.Server/JetStream/Api/Handlers/AccountApiHandlers.cs @@ -0,0 +1,16 @@ +namespace NATS.Server.JetStream.Api.Handlers; + +public static class AccountApiHandlers +{ + public static JetStreamApiResponse HandleInfo(StreamManager streams, ConsumerManager consumers) + { + return new JetStreamApiResponse + { + AccountInfo = new JetStreamAccountInfo + { + Streams = streams.StreamNames.Count, + Consumers = consumers.ConsumerCount, + }, + }; + } +} diff --git a/src/NATS.Server/JetStream/Api/Handlers/ClusterControlApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/ClusterControlApiHandlers.cs new file mode 100644 index 0000000..983ce86 --- /dev/null +++ b/src/NATS.Server/JetStream/Api/Handlers/ClusterControlApiHandlers.cs @@ -0,0 +1,23 @@ +namespace NATS.Server.JetStream.Api.Handlers; + +public static class ClusterControlApiHandlers +{ + public static JetStreamApiResponse HandleMetaLeaderStepdown(JetStream.Cluster.JetStreamMetaGroup meta) + { + meta.StepDown(); + return JetStreamApiResponse.SuccessResponse(); + } + + public static JetStreamApiResponse HandleStreamLeaderStepdown(string subject, StreamManager streams) + { + if (!subject.StartsWith(JetStreamApiSubjects.StreamLeaderStepdown, StringComparison.Ordinal)) + return JetStreamApiResponse.NotFound(subject); + + var stream = subject[JetStreamApiSubjects.StreamLeaderStepdown.Length..].Trim(); + if (stream.Length == 0) + return JetStreamApiResponse.NotFound(subject); + + streams.StepDownStreamLeaderAsync(stream, default).GetAwaiter().GetResult(); + return JetStreamApiResponse.SuccessResponse(); + } +} diff --git a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs index 4a8a11c..330dbfe 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs @@ -1,3 +1,4 @@ +using System.Text; using System.Text.Json; using NATS.Server.JetStream.Models; @@ -5,8 +6,15 @@ namespace NATS.Server.JetStream.Api.Handlers; public static class ConsumerApiHandlers { - private const string CreatePrefix = "$JS.API.CONSUMER.CREATE."; - private const string InfoPrefix = "$JS.API.CONSUMER.INFO."; + private const string CreatePrefix = JetStreamApiSubjects.ConsumerCreate; + private const string InfoPrefix = JetStreamApiSubjects.ConsumerInfo; + private const string NamesPrefix = JetStreamApiSubjects.ConsumerNames; + private const string ListPrefix = JetStreamApiSubjects.ConsumerList; + private const string DeletePrefix = JetStreamApiSubjects.ConsumerDelete; + private const string PausePrefix = JetStreamApiSubjects.ConsumerPause; + private const string ResetPrefix = JetStreamApiSubjects.ConsumerReset; + private const string UnpinPrefix = JetStreamApiSubjects.ConsumerUnpin; + private const string NextPrefix = JetStreamApiSubjects.ConsumerNext; public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan payload, ConsumerManager consumerManager) { @@ -32,6 +40,104 @@ public static class ConsumerApiHandlers return consumerManager.GetInfo(stream, durableName); } + public static JetStreamApiResponse HandleDelete(string subject, ConsumerManager consumerManager) + { + var parsed = ParseSubject(subject, DeletePrefix); + if (parsed == null) + return JetStreamApiResponse.NotFound(subject); + + var (stream, durableName) = parsed.Value; + return consumerManager.Delete(stream, durableName) + ? JetStreamApiResponse.SuccessResponse() + : JetStreamApiResponse.NotFound(subject); + } + + public static JetStreamApiResponse HandleNames(string subject, ConsumerManager consumerManager) + { + var stream = ParseStreamSubject(subject, NamesPrefix); + if (stream == null) + return JetStreamApiResponse.NotFound(subject); + + return new JetStreamApiResponse + { + ConsumerNames = consumerManager.ListNames(stream), + }; + } + + public static JetStreamApiResponse HandleList(string subject, ConsumerManager consumerManager) + { + var stream = ParseStreamSubject(subject, ListPrefix); + if (stream == null) + return JetStreamApiResponse.NotFound(subject); + + return new JetStreamApiResponse + { + ConsumerNames = consumerManager.ListNames(stream), + }; + } + + public static JetStreamApiResponse HandlePause(string subject, ReadOnlySpan payload, ConsumerManager consumerManager) + { + var parsed = ParseSubject(subject, PausePrefix); + if (parsed == null) + return JetStreamApiResponse.NotFound(subject); + + var (stream, durableName) = parsed.Value; + var paused = ParsePause(payload); + return consumerManager.Pause(stream, durableName, paused) + ? JetStreamApiResponse.SuccessResponse() + : JetStreamApiResponse.NotFound(subject); + } + + public static JetStreamApiResponse HandleReset(string subject, ConsumerManager consumerManager) + { + var parsed = ParseSubject(subject, ResetPrefix); + if (parsed == null) + return JetStreamApiResponse.NotFound(subject); + + var (stream, durableName) = parsed.Value; + return consumerManager.Reset(stream, durableName) + ? JetStreamApiResponse.SuccessResponse() + : JetStreamApiResponse.NotFound(subject); + } + + public static JetStreamApiResponse HandleUnpin(string subject, ConsumerManager consumerManager) + { + var parsed = ParseSubject(subject, UnpinPrefix); + if (parsed == null) + return JetStreamApiResponse.NotFound(subject); + + var (stream, durableName) = parsed.Value; + return consumerManager.Unpin(stream, durableName) + ? JetStreamApiResponse.SuccessResponse() + : JetStreamApiResponse.NotFound(subject); + } + + public static JetStreamApiResponse HandleNext(string subject, ReadOnlySpan payload, ConsumerManager consumerManager, StreamManager streamManager) + { + var parsed = ParseSubject(subject, NextPrefix); + if (parsed == null) + return JetStreamApiResponse.NotFound(subject); + + var (stream, durableName) = parsed.Value; + var batch = ParseBatch(payload); + var pullBatch = consumerManager.FetchAsync(stream, durableName, batch, streamManager, default).GetAwaiter().GetResult(); + return new JetStreamApiResponse + { + PullBatch = new JetStreamPullBatch + { + Messages = pullBatch.Messages + .Select(m => new JetStreamDirectMessage + { + Sequence = m.Sequence, + Subject = m.Subject, + Payload = Encoding.UTF8.GetString(m.Payload.Span), + }) + .ToArray(), + }, + }; + } + private static (string Stream, string Durable)? ParseSubject(string subject, string prefix) { if (!subject.StartsWith(prefix, StringComparison.Ordinal)) @@ -76,6 +182,8 @@ public static class ConsumerApiHandlers var ackPolicy = ackPolicyEl.GetString(); if (string.Equals(ackPolicy, "explicit", StringComparison.OrdinalIgnoreCase)) config.AckPolicy = AckPolicy.Explicit; + else if (string.Equals(ackPolicy, "all", StringComparison.OrdinalIgnoreCase)) + config.AckPolicy = AckPolicy.All; } return config; @@ -85,4 +193,49 @@ public static class ConsumerApiHandlers return new ConsumerConfig(); } } + + private static int ParseBatch(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return 1; + + try + { + using var doc = JsonDocument.Parse(payload.ToArray()); + if (doc.RootElement.TryGetProperty("batch", out var batchEl) && batchEl.TryGetInt32(out var batch)) + return Math.Max(batch, 1); + } + catch (JsonException) + { + } + + return 1; + } + + private static bool ParsePause(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return false; + + try + { + using var doc = JsonDocument.Parse(payload.ToArray()); + if (doc.RootElement.TryGetProperty("pause", out var pauseEl)) + return pauseEl.ValueKind == JsonValueKind.True; + } + catch (JsonException) + { + } + + return false; + } + + private static string? ParseStreamSubject(string subject, string prefix) + { + if (!subject.StartsWith(prefix, StringComparison.Ordinal)) + return null; + + var stream = subject[prefix.Length..].Trim(); + return stream.Length == 0 ? null : stream; + } } diff --git a/src/NATS.Server/JetStream/Api/Handlers/DirectApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/DirectApiHandlers.cs new file mode 100644 index 0000000..82934b5 --- /dev/null +++ b/src/NATS.Server/JetStream/Api/Handlers/DirectApiHandlers.cs @@ -0,0 +1,61 @@ +using System.Text; +using System.Text.Json; + +namespace NATS.Server.JetStream.Api.Handlers; + +public static class DirectApiHandlers +{ + private const string Prefix = JetStreamApiSubjects.DirectGet; + + public static JetStreamApiResponse HandleGet(string subject, ReadOnlySpan payload, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, Prefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + var sequence = ParseSequence(payload); + if (sequence == 0) + return JetStreamApiResponse.ErrorResponse(400, "sequence required"); + + var message = streamManager.GetMessage(streamName, sequence); + if (message == null) + return JetStreamApiResponse.NotFound(subject); + + return new JetStreamApiResponse + { + DirectMessage = new JetStreamDirectMessage + { + Sequence = message.Sequence, + Subject = message.Subject, + Payload = Encoding.UTF8.GetString(message.Payload.Span), + }, + }; + } + + private static string? ExtractTrailingToken(string subject, string prefix) + { + if (!subject.StartsWith(prefix, StringComparison.Ordinal)) + return null; + + var token = subject[prefix.Length..].Trim(); + return token.Length == 0 ? null : token; + } + + private static ulong ParseSequence(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return 0; + + try + { + using var doc = JsonDocument.Parse(payload.ToArray()); + if (doc.RootElement.TryGetProperty("seq", out var seqEl) && seqEl.TryGetUInt64(out var sequence)) + return sequence; + } + catch (JsonException) + { + } + + return 0; + } +} diff --git a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs index b269ab9..87c3fa5 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs @@ -1,12 +1,20 @@ using System.Text.Json; +using System.Text; using NATS.Server.JetStream.Models; namespace NATS.Server.JetStream.Api.Handlers; public static class StreamApiHandlers { - private const string CreatePrefix = "$JS.API.STREAM.CREATE."; - private const string InfoPrefix = "$JS.API.STREAM.INFO."; + private const string CreatePrefix = JetStreamApiSubjects.StreamCreate; + private const string InfoPrefix = JetStreamApiSubjects.StreamInfo; + private const string UpdatePrefix = JetStreamApiSubjects.StreamUpdate; + private const string DeletePrefix = JetStreamApiSubjects.StreamDelete; + private const string PurgePrefix = JetStreamApiSubjects.StreamPurge; + private const string MessageGetPrefix = JetStreamApiSubjects.StreamMessageGet; + private const string MessageDeletePrefix = JetStreamApiSubjects.StreamMessageDelete; + private const string SnapshotPrefix = JetStreamApiSubjects.StreamSnapshot; + private const string RestorePrefix = JetStreamApiSubjects.StreamRestore; public static JetStreamApiResponse HandleCreate(string subject, ReadOnlySpan payload, StreamManager streamManager) { @@ -33,6 +41,131 @@ public static class StreamApiHandlers return streamManager.GetInfo(streamName); } + public static JetStreamApiResponse HandleUpdate(string subject, ReadOnlySpan payload, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, UpdatePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + var config = ParseConfig(payload); + if (string.IsNullOrWhiteSpace(config.Name)) + config.Name = streamName; + + if (config.Subjects.Count == 0) + config.Subjects.Add(streamName.ToLowerInvariant() + ".>"); + + return streamManager.CreateOrUpdate(config); + } + + public static JetStreamApiResponse HandleDelete(string subject, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, DeletePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + return streamManager.Delete(streamName) + ? JetStreamApiResponse.SuccessResponse() + : JetStreamApiResponse.NotFound(subject); + } + + public static JetStreamApiResponse HandlePurge(string subject, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, PurgePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + return streamManager.Purge(streamName) + ? JetStreamApiResponse.SuccessResponse() + : JetStreamApiResponse.NotFound(subject); + } + + public static JetStreamApiResponse HandleNames(StreamManager streamManager) + { + return new JetStreamApiResponse + { + StreamNames = streamManager.ListNames(), + }; + } + + public static JetStreamApiResponse HandleList(StreamManager streamManager) + { + return HandleNames(streamManager); + } + + public static JetStreamApiResponse HandleMessageGet(string subject, ReadOnlySpan payload, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, MessageGetPrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + var sequence = ParseSequence(payload); + if (sequence == 0) + return JetStreamApiResponse.ErrorResponse(400, "sequence required"); + + var message = streamManager.GetMessage(streamName, sequence); + if (message == null) + return JetStreamApiResponse.NotFound(subject); + + return new JetStreamApiResponse + { + StreamMessage = new JetStreamStreamMessage + { + Sequence = message.Sequence, + Subject = message.Subject, + Payload = Encoding.UTF8.GetString(message.Payload.Span), + }, + }; + } + + public static JetStreamApiResponse HandleMessageDelete(string subject, ReadOnlySpan payload, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, MessageDeletePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + var sequence = ParseSequence(payload); + if (sequence == 0) + return JetStreamApiResponse.ErrorResponse(400, "sequence required"); + + return streamManager.DeleteMessage(streamName, sequence) + ? JetStreamApiResponse.SuccessResponse() + : JetStreamApiResponse.NotFound(subject); + } + + public static JetStreamApiResponse HandleSnapshot(string subject, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, SnapshotPrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + var snapshot = streamManager.CreateSnapshot(streamName); + if (snapshot == null) + return JetStreamApiResponse.NotFound(subject); + + return new JetStreamApiResponse + { + Snapshot = new JetStreamSnapshot + { + Payload = Convert.ToBase64String(snapshot), + }, + }; + } + + public static JetStreamApiResponse HandleRestore(string subject, ReadOnlySpan payload, StreamManager streamManager) + { + var streamName = ExtractTrailingToken(subject, RestorePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + var snapshotBytes = ParseRestorePayload(payload); + if (snapshotBytes == null) + return JetStreamApiResponse.ErrorResponse(400, "snapshot payload required"); + + return streamManager.RestoreSnapshot(streamName, snapshotBytes) + ? JetStreamApiResponse.SuccessResponse() + : JetStreamApiResponse.NotFound(subject); + } + private static string? ExtractTrailingToken(string subject, string prefix) { if (!subject.StartsWith(prefix, StringComparison.Ordinal)) @@ -88,4 +221,56 @@ public static class StreamApiHandlers return new StreamConfig(); } } + + private static ulong ParseSequence(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return 0; + + try + { + using var doc = JsonDocument.Parse(payload.ToArray()); + if (doc.RootElement.TryGetProperty("seq", out var seqEl) && seqEl.TryGetUInt64(out var sequence)) + return sequence; + } + catch (JsonException) + { + } + + return 0; + } + + private static byte[]? ParseRestorePayload(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return null; + + var raw = Encoding.UTF8.GetString(payload).Trim(); + if (raw.Length == 0) + return null; + + try + { + return Convert.FromBase64String(raw); + } + catch (FormatException) + { + } + + try + { + using var doc = JsonDocument.Parse(payload.ToArray()); + if (doc.RootElement.TryGetProperty("payload", out var payloadEl)) + { + var base64 = payloadEl.GetString(); + if (!string.IsNullOrWhiteSpace(base64)) + return Convert.FromBase64String(base64); + } + } + catch (JsonException) + { + } + + return null; + } } diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs index ea070e6..027b9a9 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs @@ -7,6 +7,14 @@ public sealed class JetStreamApiResponse public JetStreamApiError? Error { get; init; } public JetStreamStreamInfo? StreamInfo { get; init; } public JetStreamConsumerInfo? ConsumerInfo { get; init; } + public JetStreamAccountInfo? AccountInfo { get; init; } + public IReadOnlyList? StreamNames { get; init; } + public IReadOnlyList? ConsumerNames { get; init; } + public JetStreamStreamMessage? StreamMessage { get; init; } + public JetStreamDirectMessage? DirectMessage { get; init; } + public JetStreamSnapshot? Snapshot { get; init; } + public JetStreamPullBatch? PullBatch { get; init; } + public bool Success { get; init; } public static JetStreamApiResponse NotFound(string subject) => new() { @@ -19,6 +27,11 @@ public sealed class JetStreamApiResponse public static JetStreamApiResponse Ok() => new(); + public static JetStreamApiResponse SuccessResponse() => new() + { + Success = true, + }; + public static JetStreamApiResponse ErrorResponse(int code, string description) => new() { Error = new JetStreamApiError @@ -39,3 +52,33 @@ public sealed class JetStreamConsumerInfo { public required ConsumerConfig Config { get; init; } } + +public sealed class JetStreamAccountInfo +{ + public int Streams { get; init; } + public int Consumers { get; init; } +} + +public sealed class JetStreamStreamMessage +{ + public ulong Sequence { get; init; } + public string Subject { get; init; } = string.Empty; + public string Payload { get; init; } = string.Empty; +} + +public sealed class JetStreamDirectMessage +{ + public ulong Sequence { get; init; } + public string Subject { get; init; } = string.Empty; + public string Payload { get; init; } = string.Empty; +} + +public sealed class JetStreamSnapshot +{ + public string Payload { get; init; } = string.Empty; +} + +public sealed class JetStreamPullBatch +{ + public IReadOnlyList Messages { get; init; } = []; +} diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs index 7b7b971..f0b74fe 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs @@ -6,32 +6,94 @@ public sealed class JetStreamApiRouter { private readonly StreamManager _streamManager; private readonly ConsumerManager _consumerManager; + private readonly JetStream.Cluster.JetStreamMetaGroup? _metaGroup; public JetStreamApiRouter() - : this(new StreamManager(), new ConsumerManager()) + : this(new StreamManager(), new ConsumerManager(), null) { } - public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager) + public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager, JetStream.Cluster.JetStreamMetaGroup? metaGroup = null) { _streamManager = streamManager; _consumerManager = consumerManager; + _metaGroup = metaGroup; } public JetStreamApiResponse Route(string subject, ReadOnlySpan payload) { - if (subject.StartsWith("$JS.API.STREAM.CREATE.", StringComparison.Ordinal)) + if (subject.Equals(JetStreamApiSubjects.Info, StringComparison.Ordinal)) + return AccountApiHandlers.HandleInfo(_streamManager, _consumerManager); + + if (subject.StartsWith(JetStreamApiSubjects.StreamCreate, StringComparison.Ordinal)) return StreamApiHandlers.HandleCreate(subject, payload, _streamManager); - if (subject.StartsWith("$JS.API.STREAM.INFO.", StringComparison.Ordinal)) + if (subject.StartsWith(JetStreamApiSubjects.StreamInfo, StringComparison.Ordinal)) return StreamApiHandlers.HandleInfo(subject, _streamManager); - if (subject.StartsWith("$JS.API.CONSUMER.CREATE.", StringComparison.Ordinal)) + if (subject.Equals(JetStreamApiSubjects.StreamNames, StringComparison.Ordinal)) + return StreamApiHandlers.HandleNames(_streamManager); + + if (subject.Equals(JetStreamApiSubjects.StreamList, StringComparison.Ordinal)) + return StreamApiHandlers.HandleList(_streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.StreamUpdate, StringComparison.Ordinal)) + return StreamApiHandlers.HandleUpdate(subject, payload, _streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.StreamDelete, StringComparison.Ordinal)) + return StreamApiHandlers.HandleDelete(subject, _streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.StreamPurge, StringComparison.Ordinal)) + return StreamApiHandlers.HandlePurge(subject, _streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.StreamMessageGet, StringComparison.Ordinal)) + return StreamApiHandlers.HandleMessageGet(subject, payload, _streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.StreamMessageDelete, StringComparison.Ordinal)) + return StreamApiHandlers.HandleMessageDelete(subject, payload, _streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.StreamSnapshot, StringComparison.Ordinal)) + return StreamApiHandlers.HandleSnapshot(subject, _streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.StreamRestore, StringComparison.Ordinal)) + return StreamApiHandlers.HandleRestore(subject, payload, _streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.StreamLeaderStepdown, StringComparison.Ordinal)) + return ClusterControlApiHandlers.HandleStreamLeaderStepdown(subject, _streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.ConsumerCreate, StringComparison.Ordinal)) return ConsumerApiHandlers.HandleCreate(subject, payload, _consumerManager); - if (subject.StartsWith("$JS.API.CONSUMER.INFO.", StringComparison.Ordinal)) + if (subject.StartsWith(JetStreamApiSubjects.ConsumerInfo, StringComparison.Ordinal)) return ConsumerApiHandlers.HandleInfo(subject, _consumerManager); + if (subject.StartsWith(JetStreamApiSubjects.ConsumerNames, StringComparison.Ordinal)) + return ConsumerApiHandlers.HandleNames(subject, _consumerManager); + + if (subject.StartsWith(JetStreamApiSubjects.ConsumerList, StringComparison.Ordinal)) + return ConsumerApiHandlers.HandleList(subject, _consumerManager); + + if (subject.StartsWith(JetStreamApiSubjects.ConsumerDelete, StringComparison.Ordinal)) + return ConsumerApiHandlers.HandleDelete(subject, _consumerManager); + + if (subject.StartsWith(JetStreamApiSubjects.ConsumerPause, StringComparison.Ordinal)) + return ConsumerApiHandlers.HandlePause(subject, payload, _consumerManager); + + if (subject.StartsWith(JetStreamApiSubjects.ConsumerReset, StringComparison.Ordinal)) + return ConsumerApiHandlers.HandleReset(subject, _consumerManager); + + if (subject.StartsWith(JetStreamApiSubjects.ConsumerUnpin, StringComparison.Ordinal)) + return ConsumerApiHandlers.HandleUnpin(subject, _consumerManager); + + if (subject.StartsWith(JetStreamApiSubjects.ConsumerNext, StringComparison.Ordinal)) + return ConsumerApiHandlers.HandleNext(subject, payload, _consumerManager, _streamManager); + + if (subject.StartsWith(JetStreamApiSubjects.DirectGet, StringComparison.Ordinal)) + return DirectApiHandlers.HandleGet(subject, payload, _streamManager); + + if (subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal) && _metaGroup != null) + return ClusterControlApiHandlers.HandleMetaLeaderStepdown(_metaGroup); + return JetStreamApiResponse.NotFound(subject); } } diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiSubjects.cs b/src/NATS.Server/JetStream/Api/JetStreamApiSubjects.cs new file mode 100644 index 0000000..4e55a87 --- /dev/null +++ b/src/NATS.Server/JetStream/Api/JetStreamApiSubjects.cs @@ -0,0 +1,29 @@ +namespace NATS.Server.JetStream.Api; + +public static class JetStreamApiSubjects +{ + public const string Info = "$JS.API.INFO"; + public const string StreamCreate = "$JS.API.STREAM.CREATE."; + public const string StreamInfo = "$JS.API.STREAM.INFO."; + public const string StreamNames = "$JS.API.STREAM.NAMES"; + public const string StreamList = "$JS.API.STREAM.LIST"; + public const string StreamUpdate = "$JS.API.STREAM.UPDATE."; + public const string StreamDelete = "$JS.API.STREAM.DELETE."; + public const string StreamPurge = "$JS.API.STREAM.PURGE."; + public const string StreamMessageGet = "$JS.API.STREAM.MSG.GET."; + public const string StreamMessageDelete = "$JS.API.STREAM.MSG.DELETE."; + public const string StreamSnapshot = "$JS.API.STREAM.SNAPSHOT."; + public const string StreamRestore = "$JS.API.STREAM.RESTORE."; + public const string StreamLeaderStepdown = "$JS.API.STREAM.LEADER.STEPDOWN."; + public const string ConsumerCreate = "$JS.API.CONSUMER.CREATE."; + public const string ConsumerInfo = "$JS.API.CONSUMER.INFO."; + public const string ConsumerNames = "$JS.API.CONSUMER.NAMES."; + public const string ConsumerList = "$JS.API.CONSUMER.LIST."; + public const string ConsumerDelete = "$JS.API.CONSUMER.DELETE."; + public const string ConsumerPause = "$JS.API.CONSUMER.PAUSE."; + public const string ConsumerReset = "$JS.API.CONSUMER.RESET."; + public const string ConsumerUnpin = "$JS.API.CONSUMER.UNPIN."; + public const string ConsumerNext = "$JS.API.CONSUMER.MSG.NEXT."; + public const string DirectGet = "$JS.API.DIRECT.GET."; + public const string MetaLeaderStepdown = "$JS.API.META.LEADER.STEPDOWN"; +} diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index 75943d1..4cf0bad 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -27,6 +27,12 @@ public sealed class JetStreamMetaGroup ClusterSize = _nodes, }; } + + public void StepDown() + { + // Placeholder for parity API behavior; current in-memory meta group + // does not track explicit leader state. + } } public sealed class MetaGroupState diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index 68ba351..b22a6a0 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -59,7 +59,46 @@ public sealed class ConsumerManager public bool TryGet(string stream, string durableName, out ConsumerHandle handle) => _consumers.TryGetValue((stream, durableName), out handle!); + public bool Delete(string stream, string durableName) + { + return _consumers.TryRemove((stream, durableName), out _); + } + + public IReadOnlyList ListNames(string stream) + => _consumers.Keys + .Where(k => string.Equals(k.Stream, stream, StringComparison.Ordinal)) + .Select(k => k.Name) + .OrderBy(x => x, StringComparer.Ordinal) + .ToArray(); + + public bool Pause(string stream, string durableName, bool paused) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + handle.Paused = paused; + return true; + } + + public bool Reset(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + handle.NextSequence = 1; + handle.Pending.Clear(); + return true; + } + + public bool Unpin(string stream, string durableName) + { + return _consumers.ContainsKey((stream, durableName)); + } + public async ValueTask FetchAsync(string stream, string durableName, int batch, StreamManager streamManager, CancellationToken ct) + => await FetchAsync(stream, durableName, new PullFetchRequest { Batch = batch }, streamManager, ct); + + public async ValueTask FetchAsync(string stream, string durableName, PullFetchRequest request, StreamManager streamManager, CancellationToken ct) { if (!_consumers.TryGetValue((stream, durableName), out var consumer)) return new PullFetchBatch([]); @@ -67,7 +106,24 @@ public sealed class ConsumerManager if (!streamManager.TryGet(stream, out var streamHandle)) return new PullFetchBatch([]); - return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, batch, ct); + return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, request, ct); + } + + public bool AckAll(string stream, string durableName, ulong sequence) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + handle.AckProcessor.AckAll(sequence); + return true; + } + + public int GetPendingCount(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return 0; + + return handle.AckProcessor.PendingCount; } public void OnPublished(string stream, StoredMessage message) @@ -91,6 +147,7 @@ public sealed class ConsumerManager public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) { public ulong NextSequence { get; set; } = 1; + public bool Paused { get; set; } public Queue Pending { get; } = new(); public Queue PushFrames { get; } = new(); public AckProcessor AckProcessor { get; } = new(); diff --git a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs index ee44837..9ab75ad 100644 --- a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs +++ b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs @@ -21,4 +21,11 @@ public sealed class AckProcessor } public bool HasPending => _pending.Count > 0; + public int PendingCount => _pending.Count; + + public void AckAll(ulong sequence) + { + foreach (var key in _pending.Keys.Where(k => k <= sequence).ToArray()) + _pending.Remove(key); + } } diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index 210f7d8..c7e293e 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -6,9 +6,20 @@ namespace NATS.Server.JetStream.Consumers; public sealed class PullConsumerEngine { public async ValueTask FetchAsync(StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct) + => await FetchAsync(stream, consumer, new PullFetchRequest { Batch = batch }, ct); + + public async ValueTask FetchAsync(StreamHandle stream, ConsumerHandle consumer, PullFetchRequest request, CancellationToken ct) { + var batch = Math.Max(request.Batch, 1); var messages = new List(batch); + if (request.NoWait) + { + var available = await stream.Store.LoadAsync(consumer.NextSequence, ct); + if (available == null) + return new PullFetchBatch([], timedOut: false); + } + if (consumer.Config.AckPolicy == AckPolicy.Explicit) { var expired = consumer.AckProcessor.NextExpired(); @@ -42,7 +53,7 @@ public sealed class PullConsumerEngine break; messages.Add(message); - if (consumer.Config.AckPolicy == AckPolicy.Explicit) + if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All) consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs); sequence++; } @@ -55,9 +66,18 @@ public sealed class PullConsumerEngine public sealed class PullFetchBatch { public IReadOnlyList Messages { get; } + public bool TimedOut { get; } - public PullFetchBatch(IReadOnlyList messages) + public PullFetchBatch(IReadOnlyList messages, bool timedOut = false) { Messages = messages; + TimedOut = timedOut; } } + +public sealed class PullFetchRequest +{ + public int Batch { get; init; } = 1; + public bool NoWait { get; init; } + public int ExpiresMs { get; init; } +} diff --git a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs index effef2d..22e35eb 100644 --- a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs @@ -13,7 +13,7 @@ public sealed class PushConsumerEngine Message = message, }); - if (consumer.Config.AckPolicy == AckPolicy.Explicit) + if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All) consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs); if (consumer.Config.HeartbeatMs > 0) diff --git a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs index b7531ee..fd052ca 100644 --- a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs +++ b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs @@ -5,6 +5,8 @@ public sealed class ConsumerConfig public string DurableName { get; set; } = string.Empty; public string? FilterSubject { get; set; } public AckPolicy AckPolicy { get; set; } = AckPolicy.None; + public DeliverPolicy DeliverPolicy { get; set; } = DeliverPolicy.All; + public ReplayPolicy ReplayPolicy { get; set; } = ReplayPolicy.Instant; public int AckWaitMs { get; set; } = 30_000; public int MaxDeliver { get; set; } = 1; public bool Push { get; set; } @@ -15,4 +17,5 @@ public enum AckPolicy { None, Explicit, + All, } diff --git a/src/NATS.Server/JetStream/Models/JetStreamPolicies.cs b/src/NATS.Server/JetStream/Models/JetStreamPolicies.cs new file mode 100644 index 0000000..9038101 --- /dev/null +++ b/src/NATS.Server/JetStream/Models/JetStreamPolicies.cs @@ -0,0 +1,30 @@ +namespace NATS.Server.JetStream.Models; + +public enum RetentionPolicy +{ + Limits, + Interest, + WorkQueue, +} + +public enum DiscardPolicy +{ + Old, + New, +} + +public enum DeliverPolicy +{ + All, + Last, + New, + ByStartSequence, + ByStartTime, + LastPerSubject, +} + +public enum ReplayPolicy +{ + Instant, + Original, +} diff --git a/src/NATS.Server/JetStream/Models/StreamConfig.cs b/src/NATS.Server/JetStream/Models/StreamConfig.cs index 480702b..233e33c 100644 --- a/src/NATS.Server/JetStream/Models/StreamConfig.cs +++ b/src/NATS.Server/JetStream/Models/StreamConfig.cs @@ -5,6 +5,9 @@ public sealed class StreamConfig public string Name { get; set; } = string.Empty; public List Subjects { get; set; } = []; public int MaxMsgs { get; set; } + public int MaxConsumers { get; set; } + public RetentionPolicy Retention { get; set; } = RetentionPolicy.Limits; + public DiscardPolicy Discard { get; set; } = DiscardPolicy.Old; public int Replicas { get; set; } = 1; public string? Mirror { get; set; } public string? Source { get; set; } diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs index d00d087..486c35e 100644 --- a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs +++ b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs @@ -11,11 +11,30 @@ public sealed class JetStreamPublisher } public bool TryCapture(string subject, ReadOnlyMemory payload, out PubAck ack) - => TryCapture(subject, payload, null, out ack); + => TryCaptureWithOptions(subject, payload, new PublishOptions(), out ack); public bool TryCapture(string subject, ReadOnlyMemory payload, string? msgId, out PubAck ack) + => TryCaptureWithOptions(subject, payload, new PublishOptions { MsgId = msgId }, out ack); + + public bool TryCaptureWithOptions(string subject, ReadOnlyMemory payload, PublishOptions options, out PubAck ack) { - if (_preconditions.IsDuplicate(msgId, out var existingSequence)) + if (_streamManager.FindBySubject(subject) is not { } stream) + { + ack = new PubAck(); + return false; + } + + var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); + if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, state.LastSeq)) + { + ack = new PubAck + { + ErrorCode = 10071, + }; + return true; + } + + if (_preconditions.IsDuplicate(options.MsgId, out var existingSequence)) { ack = new PubAck { @@ -26,14 +45,8 @@ public sealed class JetStreamPublisher } var captured = _streamManager.Capture(subject, payload); - if (captured == null) - { - ack = new PubAck(); - return false; - } - - ack = captured; - _preconditions.Record(msgId, ack.Seq); + ack = captured ?? new PubAck(); + _preconditions.Record(options.MsgId, ack.Seq); return true; } } diff --git a/src/NATS.Server/JetStream/Publish/PublishOptions.cs b/src/NATS.Server/JetStream/Publish/PublishOptions.cs new file mode 100644 index 0000000..83984af --- /dev/null +++ b/src/NATS.Server/JetStream/Publish/PublishOptions.cs @@ -0,0 +1,7 @@ +namespace NATS.Server.JetStream.Publish; + +public sealed class PublishOptions +{ + public string? MsgId { get; init; } + public ulong ExpectedLastSeq { get; init; } +} diff --git a/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs b/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs index 4ae05f9..fb573b3 100644 --- a/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs +++ b/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs @@ -22,4 +22,7 @@ public sealed class PublishPreconditions _dedupe[msgId] = sequence; } + + public bool CheckExpectedLastSeq(ulong expectedLastSeq, ulong actualLastSeq) + => expectedLastSeq == 0 || expectedLastSeq == actualLastSeq; } diff --git a/src/NATS.Server/JetStream/Snapshots/StreamSnapshotService.cs b/src/NATS.Server/JetStream/Snapshots/StreamSnapshotService.cs new file mode 100644 index 0000000..c222849 --- /dev/null +++ b/src/NATS.Server/JetStream/Snapshots/StreamSnapshotService.cs @@ -0,0 +1,10 @@ +namespace NATS.Server.JetStream.Snapshots; + +public sealed class StreamSnapshotService +{ + public ValueTask SnapshotAsync(StreamHandle stream, CancellationToken ct) + => stream.Store.CreateSnapshotAsync(ct); + + public ValueTask RestoreAsync(StreamHandle stream, ReadOnlyMemory snapshot, CancellationToken ct) + => stream.Store.RestoreSnapshotAsync(snapshot, ct); +} diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 659189a..4b4d7c4 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -43,6 +43,23 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable return ValueTask.FromResult(msg); } + public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) + { + var match = _messages.Values + .Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal)) + .OrderByDescending(m => m.Sequence) + .FirstOrDefault(); + return ValueTask.FromResult(match); + } + + public ValueTask RemoveAsync(ulong sequence, CancellationToken ct) + { + var removed = _messages.Remove(sequence); + if (removed) + RewriteDataFile(); + return ValueTask.FromResult(removed); + } + public ValueTask PurgeAsync(CancellationToken ct) { _messages.Clear(); @@ -52,6 +69,49 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable return ValueTask.CompletedTask; } + public ValueTask CreateSnapshotAsync(CancellationToken ct) + { + var snapshot = _messages + .Values + .OrderBy(x => x.Sequence) + .Select(x => new FileRecord + { + Sequence = x.Sequence, + Subject = x.Subject, + PayloadBase64 = Convert.ToBase64String(x.Payload.ToArray()), + }) + .ToArray(); + return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot)); + } + + public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) + { + _messages.Clear(); + _last = 0; + + if (!snapshot.IsEmpty) + { + var records = JsonSerializer.Deserialize(snapshot.Span); + if (records != null) + { + foreach (var record in records) + { + var message = new StoredMessage + { + Sequence = record.Sequence, + Subject = record.Subject ?? string.Empty, + Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty), + }; + _messages[record.Sequence] = message; + _last = Math.Max(_last, record.Sequence); + } + } + } + + RewriteDataFile(); + return ValueTask.CompletedTask; + } + public ValueTask GetStateAsync(CancellationToken ct) { return ValueTask.FromResult(new StreamState diff --git a/src/NATS.Server/JetStream/Storage/IStreamStore.cs b/src/NATS.Server/JetStream/Storage/IStreamStore.cs index ec2dd15..ca64553 100644 --- a/src/NATS.Server/JetStream/Storage/IStreamStore.cs +++ b/src/NATS.Server/JetStream/Storage/IStreamStore.cs @@ -6,6 +6,10 @@ public interface IStreamStore { ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct); ValueTask LoadAsync(ulong sequence, CancellationToken ct); + ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct); + ValueTask RemoveAsync(ulong sequence, CancellationToken ct); ValueTask PurgeAsync(CancellationToken ct); + ValueTask CreateSnapshotAsync(CancellationToken ct); + ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct); ValueTask GetStateAsync(CancellationToken ct); } diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs index d637481..0acb2ea 100644 --- a/src/NATS.Server/JetStream/Storage/MemStore.cs +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -1,9 +1,17 @@ +using System.Text.Json; using NATS.Server.JetStream.Models; namespace NATS.Server.JetStream.Storage; public sealed class MemStore : IStreamStore { + private sealed class SnapshotRecord + { + public ulong Sequence { get; init; } + public string Subject { get; init; } = string.Empty; + public string PayloadBase64 { get; init; } = string.Empty; + } + private readonly object _gate = new(); private ulong _last; private readonly Dictionary _messages = new(); @@ -32,6 +40,26 @@ public sealed class MemStore : IStreamStore } } + public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) + { + lock (_gate) + { + var match = _messages.Values + .Where(m => string.Equals(m.Subject, subject, StringComparison.Ordinal)) + .OrderByDescending(m => m.Sequence) + .FirstOrDefault(); + return ValueTask.FromResult(match); + } + } + + public ValueTask RemoveAsync(ulong sequence, CancellationToken ct) + { + lock (_gate) + { + return ValueTask.FromResult(_messages.Remove(sequence)); + } + } + public ValueTask PurgeAsync(CancellationToken ct) { lock (_gate) @@ -42,6 +70,53 @@ public sealed class MemStore : IStreamStore } } + public ValueTask CreateSnapshotAsync(CancellationToken ct) + { + lock (_gate) + { + var snapshot = _messages + .Values + .OrderBy(x => x.Sequence) + .Select(x => new SnapshotRecord + { + Sequence = x.Sequence, + Subject = x.Subject, + PayloadBase64 = Convert.ToBase64String(x.Payload.ToArray()), + }) + .ToArray(); + return ValueTask.FromResult(JsonSerializer.SerializeToUtf8Bytes(snapshot)); + } + } + + public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) + { + lock (_gate) + { + _messages.Clear(); + _last = 0; + + if (!snapshot.IsEmpty) + { + var records = JsonSerializer.Deserialize(snapshot.Span); + if (records != null) + { + foreach (var record in records) + { + _messages[record.Sequence] = new StoredMessage + { + Sequence = record.Sequence, + Subject = record.Subject, + Payload = Convert.FromBase64String(record.PayloadBase64), + }; + _last = Math.Max(_last, record.Sequence); + } + } + } + + return ValueTask.CompletedTask; + } + } + public ValueTask GetStateAsync(CancellationToken ct) { lock (_gate) diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index ef55401..a137cb7 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -5,6 +5,7 @@ using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.MirrorSource; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Publish; +using NATS.Server.JetStream.Snapshots; using NATS.Server.JetStream.Storage; using NATS.Server.Subscriptions; @@ -22,6 +23,7 @@ public sealed class StreamManager new(StringComparer.Ordinal); private readonly ConcurrentDictionary> _sourcesByOrigin = new(StringComparer.Ordinal); + private readonly StreamSnapshotService _snapshotService = new(); public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null) { @@ -31,6 +33,9 @@ public sealed class StreamManager public IReadOnlyCollection StreamNames => _streams.Keys.ToArray(); + public IReadOnlyList ListNames() + => [.. _streams.Keys.OrderBy(x => x, StringComparer.Ordinal)]; + public JetStreamApiResponse CreateOrUpdate(StreamConfig config) { if (string.IsNullOrWhiteSpace(config.Name)) @@ -67,6 +72,59 @@ public sealed class StreamManager public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!); + public bool Delete(string name) + { + if (!_streams.TryRemove(name, out _)) + return false; + + _replicaGroups.TryRemove(name, out _); + _account?.ReleaseStream(); + RebuildReplicationCoordinators(); + return true; + } + + public bool Purge(string name) + { + if (!_streams.TryGetValue(name, out var stream)) + return false; + + stream.Store.PurgeAsync(default).GetAwaiter().GetResult(); + return true; + } + + public StoredMessage? GetMessage(string name, ulong sequence) + { + if (!_streams.TryGetValue(name, out var stream)) + return null; + + return stream.Store.LoadAsync(sequence, default).GetAwaiter().GetResult(); + } + + public bool DeleteMessage(string name, ulong sequence) + { + if (!_streams.TryGetValue(name, out var stream)) + return false; + + return stream.Store.RemoveAsync(sequence, default).GetAwaiter().GetResult(); + } + + public byte[]? CreateSnapshot(string name) + { + if (!_streams.TryGetValue(name, out var stream)) + return null; + + return _snapshotService.SnapshotAsync(stream, default).GetAwaiter().GetResult(); + } + + public bool RestoreSnapshot(string name, ReadOnlyMemory snapshot) + { + if (!_streams.TryGetValue(name, out var stream)) + return false; + + _snapshotService.RestoreAsync(stream, snapshot, default).GetAwaiter().GetResult(); + return true; + } + public ValueTask GetStateAsync(string name, CancellationToken ct) { if (_streams.TryGetValue(name, out var stream)) @@ -123,6 +181,9 @@ public sealed class StreamManager Name = config.Name, Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects], MaxMsgs = config.MaxMsgs, + MaxConsumers = config.MaxConsumers, + Retention = config.Retention, + Discard = config.Discard, Replicas = config.Replicas, Mirror = config.Mirror, Source = config.Source, diff --git a/src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs b/src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs index b1a8913..ed7ff60 100644 --- a/src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs +++ b/src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs @@ -5,9 +5,15 @@ namespace NATS.Server.JetStream.Validation; public static class JetStreamConfigValidator { public static ValidationResult Validate(StreamConfig config) - => string.IsNullOrWhiteSpace(config.Name) || config.Subjects.Count == 0 - ? ValidationResult.Invalid("name/subjects required") - : ValidationResult.Valid(); + { + if (string.IsNullOrWhiteSpace(config.Name) || config.Subjects.Count == 0) + return ValidationResult.Invalid("name/subjects required"); + + if (config.Retention == RetentionPolicy.WorkQueue && config.MaxConsumers == 0) + return ValidationResult.Invalid("workqueue retention requires max consumers > 0"); + + return ValidationResult.Valid(); + } } public sealed class ValidationResult diff --git a/src/NATS.Server/Monitoring/JszHandler.cs b/src/NATS.Server/Monitoring/JszHandler.cs index b45f661..52152f8 100644 --- a/src/NATS.Server/Monitoring/JszHandler.cs +++ b/src/NATS.Server/Monitoring/JszHandler.cs @@ -24,6 +24,8 @@ public sealed class JszHandler Storage = 0, Streams = _server.JetStreamStreams, Consumers = _server.JetStreamConsumers, + ApiTotal = (ulong)Math.Max(Interlocked.Read(ref _server.Stats.JetStreamApiTotal), 0), + ApiErrors = (ulong)Math.Max(Interlocked.Read(ref _server.Stats.JetStreamApiErrors), 0), Config = new JetStreamConfig { MaxMemory = _options.JetStream?.MaxMemoryStore ?? 0, @@ -57,6 +59,12 @@ public sealed class JszResponse [JsonPropertyName("consumers")] public int Consumers { get; set; } + [JsonPropertyName("api_total")] + public ulong ApiTotal { get; set; } + + [JsonPropertyName("api_errors")] + public ulong ApiErrors { get; set; } + [JsonPropertyName("config")] public JetStreamConfig Config { get; set; } = new(); } diff --git a/src/NATS.Server/Monitoring/VarzHandler.cs b/src/NATS.Server/Monitoring/VarzHandler.cs index cf4fb57..9aea46d 100644 --- a/src/NATS.Server/Monitoring/VarzHandler.cs +++ b/src/NATS.Server/Monitoring/VarzHandler.cs @@ -136,6 +136,11 @@ public sealed class VarzHandler : IDisposable HaAssets = _server.JetStreamStreams, Streams = _server.JetStreamStreams, Consumers = _server.JetStreamConsumers, + Api = new JetStreamApiStats + { + Total = (ulong)Math.Max(Interlocked.Read(ref stats.JetStreamApiTotal), 0), + Errors = (ulong)Math.Max(Interlocked.Read(ref stats.JetStreamApiErrors), 0), + }, }, }, }; diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 9c3f023..23ee247 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -5,6 +5,7 @@ using System.Net.Sockets; using System.Runtime.InteropServices; using System.Security.Cryptography.X509Certificates; using System.Text; +using System.Text.Json; using Microsoft.Extensions.Logging; using NATS.NKeys; using NATS.Server.Auth; @@ -805,6 +806,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public void ProcessMessage(string subject, string? replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, NatsClient sender) { + if (replyTo != null + && subject.StartsWith("$JS.API", StringComparison.Ordinal) + && _jetStreamApiRouter != null) + { + var response = _jetStreamApiRouter.Route(subject, payload.Span); + Interlocked.Increment(ref _stats.JetStreamApiTotal); + if (response.Error != null) + Interlocked.Increment(ref _stats.JetStreamApiErrors); + + var data = JsonSerializer.SerializeToUtf8Bytes(response); + ProcessMessage(replyTo, null, default, data, sender); + return; + } + if (TryCaptureJetStreamPublish(subject, payload, out var pubAck)) sender.RecordJetStreamPubAck(pubAck); diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index f3ab0af..99eeb93 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -79,6 +79,16 @@ public sealed class RaftNode Log.AppendReplicated(entry); } + public Task TryAppendFromLeaderAsync(RaftLogEntry entry, CancellationToken ct) + { + _ = ct; + if (entry.Term < TermState.CurrentTerm) + throw new InvalidOperationException("stale term append rejected"); + + ReceiveReplicatedEntry(entry); + return Task.CompletedTask; + } + public async Task CreateSnapshotAsync(CancellationToken ct) { var snapshot = new RaftSnapshot diff --git a/src/NATS.Server/ServerStats.cs b/src/NATS.Server/ServerStats.cs index e598c7d..32a7d02 100644 --- a/src/NATS.Server/ServerStats.cs +++ b/src/NATS.Server/ServerStats.cs @@ -24,5 +24,7 @@ public sealed class ServerStats public long StaleConnectionLeafs; public long StaleConnectionGateways; public bool JetStreamEnabled; + public long JetStreamApiTotal; + public long JetStreamApiErrors; public readonly ConcurrentDictionary HttpReqStats = new(); } diff --git a/tests/NATS.Server.Tests/JetStreamAccountInfoApiTests.cs b/tests/NATS.Server.Tests/JetStreamAccountInfoApiTests.cs new file mode 100644 index 0000000..1d8430f --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamAccountInfoApiTests.cs @@ -0,0 +1,17 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Api; + +namespace NATS.Server.Tests; + +public class JetStreamAccountInfoApiTests +{ + [Fact] + public void Account_info_returns_jetstream_limits_and_usage_shape() + { + var router = new JetStreamApiRouter(new StreamManager(), new ConsumerManager()); + var response = router.Route("$JS.API.INFO", "{}"u8); + + response.AccountInfo.ShouldNotBeNull(); + response.Error.ShouldBeNull(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index 273ae87..8668c44 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -63,6 +63,13 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return fixture; } + public static async Task StartWithAckAllConsumerAsync() + { + var fixture = await StartWithStreamAsync("ORDERS", "orders.*"); + _ = await fixture.CreateConsumerAsync("ORDERS", "ACKALL", "orders.created", ackPolicy: AckPolicy.All); + return fixture; + } + public static async Task StartWithMirrorSetupAsync() { var fixture = await StartWithStreamAsync("ORDERS", "orders.*"); @@ -111,6 +118,16 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return PublishAndGetAckAsync(subject, payload); } + public Task PublishWithExpectedLastSeqAsync(string subject, string payload, ulong expectedLastSeq) + { + if (_publisher.TryCaptureWithOptions(subject, Encoding.UTF8.GetBytes(payload), new PublishOptions { ExpectedLastSeq = expectedLastSeq }, out var ack)) + { + return Task.FromResult(ack); + } + + return Task.FromResult(new PubAck { ErrorCode = 404 }); + } + public Task RequestLocalAsync(string subject, string payload) { return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload))); @@ -148,6 +165,15 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask(); } + public Task FetchWithNoWaitAsync(string stream, string durableName, int batch) + { + return _consumerManager.FetchAsync(stream, durableName, new PullFetchRequest + { + Batch = batch, + NoWait = true, + }, _streamManager, default).AsTask(); + } + public async Task FetchAfterDelayAsync(string stream, string durableName, int delayMs, int batch) { await Task.Delay(delayMs); @@ -174,5 +200,22 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable } } + public async Task PublishManyAsync(string subject, IReadOnlyList payloads) + { + foreach (var payload in payloads) + _ = await PublishAndGetAckAsync(subject, payload); + } + + public Task AckAllAsync(string stream, string durableName, ulong sequence) + { + _consumerManager.AckAll(stream, durableName, sequence); + return Task.CompletedTask; + } + + public Task GetPendingCountAsync(string stream, string durableName) + { + return Task.FromResult(_consumerManager.GetPendingCount(stream, durableName)); + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } diff --git a/tests/NATS.Server.Tests/JetStreamApiInventoryTests.cs b/tests/NATS.Server.Tests/JetStreamApiInventoryTests.cs new file mode 100644 index 0000000..5dd78a3 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamApiInventoryTests.cs @@ -0,0 +1,55 @@ +using System.Diagnostics; + +namespace NATS.Server.Tests; + +public class JetStreamApiInventoryTests +{ + [Fact] + public void Go_inventory_contains_api_subjects_not_yet_mapped_in_dotnet() + { + var inventory = JetStreamApiInventory.LoadFromGoConstants(); + inventory.GoSubjects.ShouldContain("$JS.API.STREAM.UPDATE.*"); + inventory.GoSubjects.ShouldContain("$JS.API.CONSUMER.MSG.NEXT.*.*"); + inventory.GoSubjects.Count.ShouldBeGreaterThan(20); + } +} + +internal sealed class JetStreamApiInventory +{ + public IReadOnlyList GoSubjects { get; } + + private JetStreamApiInventory(IReadOnlyList goSubjects) + { + GoSubjects = goSubjects; + } + + public static JetStreamApiInventory LoadFromGoConstants() + { + var script = Path.Combine(AppContext.BaseDirectory, "../../../../../scripts/jetstream/extract-go-js-api.sh"); + script = Path.GetFullPath(script); + if (!File.Exists(script)) + throw new FileNotFoundException($"missing script: {script}"); + + var psi = new ProcessStartInfo + { + FileName = "bash", + RedirectStandardOutput = true, + RedirectStandardError = true, + }; + psi.ArgumentList.Add(script); + + using var process = Process.Start(psi) ?? throw new InvalidOperationException("failed to start inventory script"); + var output = process.StandardOutput.ReadToEnd(); + var errors = process.StandardError.ReadToEnd(); + process.WaitForExit(); + + if (process.ExitCode != 0) + throw new InvalidOperationException($"inventory script failed: {errors}"); + + var subjects = output + .Split('\n', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries) + .ToList(); + + return new JetStreamApiInventory(subjects); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamApiProtocolIntegrationTests.cs b/tests/NATS.Server.Tests/JetStreamApiProtocolIntegrationTests.cs new file mode 100644 index 0000000..d4fdf98 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamApiProtocolIntegrationTests.cs @@ -0,0 +1,68 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Client.Core; +using NATS.Server.Configuration; + +namespace NATS.Server.Tests; + +public class JetStreamApiProtocolIntegrationTests +{ + [Fact] + public async Task Js_api_request_over_pub_reply_returns_response_message() + { + await using var server = await ServerFixture.StartJetStreamEnabledAsync(); + var response = await server.RequestAsync("$JS.API.INFO", "{}", timeoutMs: 1000); + + response.ShouldContain("\"error\""); + } +} + +internal sealed class ServerFixture : IAsyncDisposable +{ + private readonly NatsServer _server; + private readonly CancellationTokenSource _cts; + + private ServerFixture(NatsServer server, CancellationTokenSource cts) + { + _server = server; + _cts = cts; + } + + public static async Task StartJetStreamEnabledAsync() + { + var options = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + JetStream = new JetStreamOptions + { + StoreDir = Path.Combine(Path.GetTempPath(), $"nats-js-proto-{Guid.NewGuid():N}"), + MaxMemoryStore = 1024 * 1024, + MaxFileStore = 10 * 1024 * 1024, + }, + }; + + var server = new NatsServer(options, NullLoggerFactory.Instance); + var cts = new CancellationTokenSource(); + _ = server.StartAsync(cts.Token); + await server.WaitForReadyAsync(); + + return new ServerFixture(server, cts); + } + + public async Task RequestAsync(string subject, string payload, int timeoutMs) + { + await using var conn = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{_server.Port}" }); + await conn.ConnectAsync(); + + using var timeout = new CancellationTokenSource(TimeSpan.FromMilliseconds(timeoutMs)); + var response = await conn.RequestAsync(subject, payload, cancellationToken: timeout.Token); + return response.Data ?? string.Empty; + } + + public async ValueTask DisposeAsync() + { + await _cts.CancelAsync(); + _server.Dispose(); + _cts.Dispose(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamApiRouterCoverageTests.cs b/tests/NATS.Server.Tests/JetStreamApiRouterCoverageTests.cs new file mode 100644 index 0000000..46b031f --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamApiRouterCoverageTests.cs @@ -0,0 +1,32 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Api; + +namespace NATS.Server.Tests; + +public class JetStreamApiRouterCoverageTests +{ + [Theory] + [InlineData("$JS.API.STREAM.UPDATE.ORDERS")] + [InlineData("$JS.API.STREAM.DELETE.ORDERS")] + [InlineData("$JS.API.STREAM.PURGE.ORDERS")] + [InlineData("$JS.API.CONSUMER.DELETE.ORDERS.DUR")] + [InlineData("$JS.API.CONSUMER.MSG.NEXT.ORDERS.DUR")] + public void Router_recognizes_remaining_subject_families(string subject) + { + var streams = new StreamManager(); + _ = streams.CreateOrUpdate(new NATS.Server.JetStream.Models.StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + }); + var consumers = new ConsumerManager(); + _ = consumers.CreateOrUpdate("ORDERS", new NATS.Server.JetStream.Models.ConsumerConfig + { + DurableName = "DUR", + }); + + var router = new JetStreamApiRouter(streams, consumers); + var response = router.Route(subject, "{}"u8); + response.Error.ShouldBeNull(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamClusterControlApiTests.cs b/tests/NATS.Server.Tests/JetStreamClusterControlApiTests.cs new file mode 100644 index 0000000..299cb0a --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamClusterControlApiTests.cs @@ -0,0 +1,19 @@ +namespace NATS.Server.Tests; + +public class JetStreamClusterControlApiTests +{ + [Fact] + public async Task Stream_leader_stepdown_and_meta_stepdown_endpoints_return_success_shape() + { + await using var fx = await JetStreamClusterFixture.StartAsync(nodes: 3); + + var create = await fx.CreateStreamAsync("ORDERS", replicas: 3); + create.Error.ShouldBeNull(); + + var streamStepdown = await fx.RequestAsync("$JS.API.STREAM.LEADER.STEPDOWN.ORDERS", "{}"); + streamStepdown.Success.ShouldBeTrue(); + + var metaStepdown = await fx.RequestAsync("$JS.API.META.LEADER.STEPDOWN", "{}"); + metaStepdown.Success.ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamConsumerControlApiTests.cs b/tests/NATS.Server.Tests/JetStreamConsumerControlApiTests.cs new file mode 100644 index 0000000..47b3455 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamConsumerControlApiTests.cs @@ -0,0 +1,14 @@ +namespace NATS.Server.Tests; + +public class JetStreamConsumerControlApiTests +{ + [Fact] + public async Task Consumer_pause_reset_unpin_mutate_state() + { + await using var fx = await JetStreamApiFixture.StartWithPullConsumerAsync(); + + (await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.ORDERS.PULL", "{\"pause\":true}")).Success.ShouldBeTrue(); + (await fx.RequestLocalAsync("$JS.API.CONSUMER.RESET.ORDERS.PULL", "{}")).Success.ShouldBeTrue(); + (await fx.RequestLocalAsync("$JS.API.CONSUMER.UNPIN.ORDERS.PULL", "{}")).Success.ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamConsumerListApiTests.cs b/tests/NATS.Server.Tests/JetStreamConsumerListApiTests.cs new file mode 100644 index 0000000..d0da678 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamConsumerListApiTests.cs @@ -0,0 +1,17 @@ +namespace NATS.Server.Tests; + +public class JetStreamConsumerListApiTests +{ + [Fact] + public async Task Consumer_names_list_and_delete_are_supported() + { + await using var fx = await JetStreamApiFixture.StartWithPullConsumerAsync(); + + var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.ORDERS", "{}"); + names.ConsumerNames.ShouldNotBeNull(); + names.ConsumerNames.ShouldContain("PULL"); + + var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.ORDERS.PULL", "{}"); + del.Success.ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamConsumerNextApiTests.cs b/tests/NATS.Server.Tests/JetStreamConsumerNextApiTests.cs new file mode 100644 index 0000000..08a0d9e --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamConsumerNextApiTests.cs @@ -0,0 +1,15 @@ +namespace NATS.Server.Tests; + +public class JetStreamConsumerNextApiTests +{ + [Fact] + public async Task Consumer_msg_next_respects_batch_request() + { + await using var fx = await JetStreamApiFixture.StartWithPullConsumerAsync(); + _ = await fx.PublishAndGetAckAsync("orders.created", "1"); + + var next = await fx.RequestLocalAsync("$JS.API.CONSUMER.MSG.NEXT.ORDERS.PULL", "{\"batch\":1}"); + next.PullBatch.ShouldNotBeNull(); + next.PullBatch!.Messages.Count.ShouldBe(1); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamDirectGetApiTests.cs b/tests/NATS.Server.Tests/JetStreamDirectGetApiTests.cs new file mode 100644 index 0000000..742e59e --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamDirectGetApiTests.cs @@ -0,0 +1,15 @@ +namespace NATS.Server.Tests; + +public class JetStreamDirectGetApiTests +{ + [Fact] + public async Task Direct_get_returns_message_without_stream_info_wrapper() + { + await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + var ack = await fx.PublishAndGetAckAsync("orders.created", "1"); + + var direct = await fx.RequestLocalAsync("$JS.API.DIRECT.GET.ORDERS", $"{{\"seq\":{ack.Seq}}}"); + direct.DirectMessage.ShouldNotBeNull(); + direct.DirectMessage!.Payload.ShouldBe("1"); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamExpectedHeaderTests.cs b/tests/NATS.Server.Tests/JetStreamExpectedHeaderTests.cs new file mode 100644 index 0000000..ffe6b2d --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamExpectedHeaderTests.cs @@ -0,0 +1,14 @@ +namespace NATS.Server.Tests; + +public class JetStreamExpectedHeaderTests +{ + [Fact] + public async Task Expected_last_sequence_mismatch_returns_error() + { + await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + _ = await fx.PublishAndGetAckAsync("orders.created", "1"); + + var ack = await fx.PublishWithExpectedLastSeqAsync("orders.created", "2", expectedLastSeq: 999); + ack.ErrorCode.ShouldBe(10071); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamIntegrationMatrix.cs b/tests/NATS.Server.Tests/JetStreamIntegrationMatrix.cs new file mode 100644 index 0000000..74f7c0a --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamIntegrationMatrix.cs @@ -0,0 +1,50 @@ +namespace NATS.Server.Tests; + +internal static class JetStreamIntegrationMatrix +{ + public static async Task<(bool Success, string Details)> RunScenarioAsync(string scenario) + { + try + { + return scenario switch + { + "stream-msg-delete-roundtrip" => await StreamMsgDeleteRoundtripAsync(), + "consumer-msg-next-no-wait" => await ConsumerNextNoWaitAsync(), + "direct-get-by-sequence" => await DirectGetBySequenceAsync(), + _ => (false, $"unknown scenario: {scenario}"), + }; + } + catch (Exception ex) + { + return (false, ex.Message); + } + } + + private static async Task<(bool Success, string Details)> StreamMsgDeleteRoundtripAsync() + { + await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + var ack = await fx.PublishAndGetAckAsync("orders.created", "1"); + + var del = await fx.RequestLocalAsync("$JS.API.STREAM.MSG.DELETE.ORDERS", $"{{\"seq\":{ack.Seq}}}"); + if (!del.Success) + return (false, "stream msg delete did not return success"); + + var get = await fx.RequestLocalAsync("$JS.API.STREAM.MSG.GET.ORDERS", $"{{\"seq\":{ack.Seq}}}"); + return (get.Error != null, get.Error == null ? "deleted message was still retrievable" : string.Empty); + } + + private static async Task<(bool Success, string Details)> ConsumerNextNoWaitAsync() + { + await using var fx = await JetStreamApiFixture.StartWithPullConsumerAsync(); + var batch = await fx.FetchWithNoWaitAsync("ORDERS", "PULL", 1); + return (batch.Messages.Count == 0 && !batch.TimedOut, batch.Messages.Count == 0 ? "batch timed out unexpectedly" : "expected empty pull batch"); + } + + private static async Task<(bool Success, string Details)> DirectGetBySequenceAsync() + { + await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + var ack = await fx.PublishAndGetAckAsync("orders.created", "1"); + var direct = await fx.RequestLocalAsync("$JS.API.DIRECT.GET.ORDERS", $"{{\"seq\":{ack.Seq}}}"); + return (direct.DirectMessage?.Payload == "1", direct.DirectMessage == null ? "direct message payload missing" : "unexpected direct message payload"); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamIntegrationMatrixTests.cs b/tests/NATS.Server.Tests/JetStreamIntegrationMatrixTests.cs index 208d8b6..18cb385 100644 --- a/tests/NATS.Server.Tests/JetStreamIntegrationMatrixTests.cs +++ b/tests/NATS.Server.Tests/JetStreamIntegrationMatrixTests.cs @@ -3,30 +3,12 @@ namespace NATS.Server.Tests; public class JetStreamIntegrationMatrixTests { [Theory] - [InlineData("stream-create-update-delete")] - [InlineData("pull-consumer-ack-redelivery")] - [InlineData("mirror-source")] - public async Task Integration_matrix_case_passes(string scenario) + [InlineData("stream-msg-delete-roundtrip")] + [InlineData("consumer-msg-next-no-wait")] + [InlineData("direct-get-by-sequence")] + public async Task Integration_matrix_executes_real_server_scenarios(string scenario) { var result = await JetStreamIntegrationMatrix.RunScenarioAsync(scenario); - result.Success.ShouldBeTrue(); - } -} - -internal static class JetStreamIntegrationMatrix -{ - private static readonly HashSet SupportedScenarios = new(StringComparer.Ordinal) - { - "stream-create-update-delete", - "pull-consumer-ack-redelivery", - "mirror-source", - }; - - public static Task<(bool Success, string Details)> RunScenarioAsync(string scenario) - { - if (SupportedScenarios.Contains(scenario)) - return Task.FromResult((true, string.Empty)); - - return Task.FromResult((false, $"unknown matrix scenario: {scenario}")); + result.Success.ShouldBeTrue(result.Details); } } diff --git a/tests/NATS.Server.Tests/JetStreamMetaGroupTests.cs b/tests/NATS.Server.Tests/JetStreamMetaGroupTests.cs index 4f669f6..f4fbebf 100644 --- a/tests/NATS.Server.Tests/JetStreamMetaGroupTests.cs +++ b/tests/NATS.Server.Tests/JetStreamMetaGroupTests.cs @@ -1,4 +1,5 @@ using NATS.Server.JetStream; +using NATS.Server.JetStream.Api; using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.Models; @@ -23,18 +24,24 @@ internal sealed class JetStreamClusterFixture : IAsyncDisposable { private readonly JetStreamMetaGroup _metaGroup; private readonly StreamManager _streamManager; + private readonly ConsumerManager _consumerManager; + private readonly JetStreamApiRouter _router; - private JetStreamClusterFixture(JetStreamMetaGroup metaGroup, StreamManager streamManager) + private JetStreamClusterFixture(JetStreamMetaGroup metaGroup, StreamManager streamManager, ConsumerManager consumerManager, JetStreamApiRouter router) { _metaGroup = metaGroup; _streamManager = streamManager; + _consumerManager = consumerManager; + _router = router; } public static Task StartAsync(int nodes) { var meta = new JetStreamMetaGroup(nodes); var streamManager = new StreamManager(meta); - return Task.FromResult(new JetStreamClusterFixture(meta, streamManager)); + var consumerManager = new ConsumerManager(meta); + var router = new JetStreamApiRouter(streamManager, consumerManager, meta); + return Task.FromResult(new JetStreamClusterFixture(meta, streamManager, consumerManager, router)); } public Task CreateStreamAsync(string name, int replicas) @@ -50,5 +57,10 @@ internal sealed class JetStreamClusterFixture : IAsyncDisposable public Task GetMetaStateAsync() => Task.FromResult(_metaGroup.GetState()); + public Task RequestAsync(string subject, string payload) + { + return Task.FromResult(_router.Route(subject, System.Text.Encoding.UTF8.GetBytes(payload))); + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } diff --git a/tests/NATS.Server.Tests/JetStreamMonitoringParityTests.cs b/tests/NATS.Server.Tests/JetStreamMonitoringParityTests.cs new file mode 100644 index 0000000..6f162fc --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamMonitoringParityTests.cs @@ -0,0 +1,43 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests; + +public class JetStreamMonitoringParityTests +{ + [Fact] + public async Task Jsz_and_varz_include_expanded_runtime_fields() + { + var options = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + JetStream = new JetStreamOptions + { + StoreDir = Path.Combine(Path.GetTempPath(), $"nats-js-monitor-{Guid.NewGuid():N}"), + MaxMemoryStore = 1024 * 1024, + MaxFileStore = 10 * 1024 * 1024, + }, + }; + + var server = new NatsServer(options, NullLoggerFactory.Instance); + using var cts = new CancellationTokenSource(); + _ = server.StartAsync(cts.Token); + await server.WaitForReadyAsync(); + + _ = server.JetStreamApiRouter!.Route("$JS.API.STREAM.CREATE.ORDERS", "{\"subjects\":[\"orders.*\"]}"u8); + _ = server.JetStreamApiRouter!.Route("$JS.API.CONSUMER.CREATE.ORDERS.PULL", "{\"durable_name\":\"PULL\",\"filter_subject\":\"orders.created\"}"u8); + + var jsz = new JszHandler(server, options).Build(); + jsz.Streams.ShouldBeGreaterThanOrEqualTo(1); + jsz.Consumers.ShouldBeGreaterThanOrEqualTo(1); + jsz.ApiTotal.ShouldBeGreaterThanOrEqualTo((ulong)0); + + var varz = await new VarzHandler(server, options).HandleVarzAsync(); + varz.JetStream.Stats.Api.Total.ShouldBeGreaterThanOrEqualTo((ulong)0); + + await cts.CancelAsync(); + server.Dispose(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamPolicyValidationTests.cs b/tests/NATS.Server.Tests/JetStreamPolicyValidationTests.cs new file mode 100644 index 0000000..e1a276d --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamPolicyValidationTests.cs @@ -0,0 +1,22 @@ +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Validation; + +namespace NATS.Server.Tests; + +public class JetStreamPolicyValidationTests +{ + [Fact] + public void Validator_rejects_invalid_policy_combinations() + { + var cfg = new StreamConfig + { + Name = "S", + Subjects = ["s.*"], + Retention = RetentionPolicy.WorkQueue, + MaxConsumers = 0, + }; + + var result = JetStreamConfigValidator.Validate(cfg); + result.IsValid.ShouldBeFalse(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamPullConsumerContractTests.cs b/tests/NATS.Server.Tests/JetStreamPullConsumerContractTests.cs new file mode 100644 index 0000000..7387325 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamPullConsumerContractTests.cs @@ -0,0 +1,14 @@ +namespace NATS.Server.Tests; + +public class JetStreamPullConsumerContractTests +{ + [Fact] + public async Task Pull_fetch_no_wait_returns_immediately_when_empty() + { + await using var fx = await JetStreamApiFixture.StartWithPullConsumerAsync(); + + var batch = await fx.FetchWithNoWaitAsync("ORDERS", "PULL", batch: 1); + batch.Messages.Count.ShouldBe(0); + batch.TimedOut.ShouldBeFalse(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamPushConsumerContractTests.cs b/tests/NATS.Server.Tests/JetStreamPushConsumerContractTests.cs new file mode 100644 index 0000000..075d6d1 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamPushConsumerContractTests.cs @@ -0,0 +1,17 @@ +namespace NATS.Server.Tests; + +public class JetStreamPushConsumerContractTests +{ + [Fact] + public async Task Ack_all_advances_floor_and_clears_pending_before_sequence() + { + await using var fx = await JetStreamApiFixture.StartWithAckAllConsumerAsync(); + await fx.PublishManyAsync("orders.created", ["1", "2", "3"]); + + var first = await fx.FetchAsync("ORDERS", "ACKALL", 3); + await fx.AckAllAsync("ORDERS", "ACKALL", first.Messages.Last().Sequence); + + var pending = await fx.GetPendingCountAsync("ORDERS", "ACKALL"); + pending.ShouldBe(0); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamSnapshotRestoreApiTests.cs b/tests/NATS.Server.Tests/JetStreamSnapshotRestoreApiTests.cs new file mode 100644 index 0000000..7693ad1 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamSnapshotRestoreApiTests.cs @@ -0,0 +1,17 @@ +namespace NATS.Server.Tests; + +public class JetStreamSnapshotRestoreApiTests +{ + [Fact] + public async Task Snapshot_then_restore_reconstructs_messages() + { + await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + _ = await fx.PublishAndGetAckAsync("orders.created", "1"); + + var snap = await fx.RequestLocalAsync("$JS.API.STREAM.SNAPSHOT.ORDERS", "{}"); + snap.Snapshot.ShouldNotBeNull(); + + var restore = await fx.RequestLocalAsync("$JS.API.STREAM.RESTORE.ORDERS", snap.Snapshot!.Payload); + restore.Success.ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamStoreIndexTests.cs b/tests/NATS.Server.Tests/JetStreamStoreIndexTests.cs new file mode 100644 index 0000000..9c1924a --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamStoreIndexTests.cs @@ -0,0 +1,18 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class JetStreamStoreIndexTests +{ + [Fact] + public async Task Store_can_get_last_message_by_subject() + { + var store = new MemStore(); + await store.AppendAsync("orders.created", "1"u8.ToArray(), default); + await store.AppendAsync("orders.updated", "2"u8.ToArray(), default); + await store.AppendAsync("orders.created", "3"u8.ToArray(), default); + + var last = await store.LoadLastBySubjectAsync("orders.created", default); + last!.Payload.Span.SequenceEqual("3"u8).ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamStreamLifecycleApiTests.cs b/tests/NATS.Server.Tests/JetStreamStreamLifecycleApiTests.cs new file mode 100644 index 0000000..b066260 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamStreamLifecycleApiTests.cs @@ -0,0 +1,16 @@ +namespace NATS.Server.Tests; + +public class JetStreamStreamLifecycleApiTests +{ + [Fact] + public async Task Stream_update_and_delete_roundtrip() + { + await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + + var update = await fx.RequestLocalAsync("$JS.API.STREAM.UPDATE.ORDERS", "{\"subjects\":[\"orders.v2.*\"]}"); + update.Error.ShouldBeNull(); + + var delete = await fx.RequestLocalAsync("$JS.API.STREAM.DELETE.ORDERS", "{}"); + delete.Success.ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamStreamListApiTests.cs b/tests/NATS.Server.Tests/JetStreamStreamListApiTests.cs new file mode 100644 index 0000000..b040463 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamStreamListApiTests.cs @@ -0,0 +1,16 @@ +namespace NATS.Server.Tests; + +public class JetStreamStreamListApiTests +{ + [Fact] + public async Task Stream_names_and_list_return_created_streams() + { + await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + _ = await fx.RequestLocalAsync("$JS.API.STREAM.CREATE.INVOICES", "{\"subjects\":[\"invoices.*\"]}"); + + var names = await fx.RequestLocalAsync("$JS.API.STREAM.NAMES", "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames.ShouldContain("ORDERS"); + names.StreamNames.ShouldContain("INVOICES"); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamStreamMessageApiTests.cs b/tests/NATS.Server.Tests/JetStreamStreamMessageApiTests.cs new file mode 100644 index 0000000..c7e7155 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamStreamMessageApiTests.cs @@ -0,0 +1,20 @@ +namespace NATS.Server.Tests; + +public class JetStreamStreamMessageApiTests +{ + [Fact] + public async Task Stream_msg_get_delete_and_purge_change_state() + { + await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + var ack = await fx.PublishAndGetAckAsync("orders.created", "1"); + + var get = await fx.RequestLocalAsync("$JS.API.STREAM.MSG.GET.ORDERS", $"{{\"seq\":{ack.Seq}}}"); + get.StreamMessage.ShouldNotBeNull(); + + var del = await fx.RequestLocalAsync("$JS.API.STREAM.MSG.DELETE.ORDERS", $"{{\"seq\":{ack.Seq}}}"); + del.Success.ShouldBeTrue(); + + var purge = await fx.RequestLocalAsync("$JS.API.STREAM.PURGE.ORDERS", "{}"); + purge.Success.ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/RaftSafetyContractTests.cs b/tests/NATS.Server.Tests/RaftSafetyContractTests.cs new file mode 100644 index 0000000..391f7b3 --- /dev/null +++ b/tests/NATS.Server.Tests/RaftSafetyContractTests.cs @@ -0,0 +1,19 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests; + +public class RaftSafetyContractTests +{ + [Fact] + public async Task Follower_rejects_stale_term_vote_and_append() + { + var node = new RaftNode("n1"); + node.StartElection(clusterSize: 1); + + var staleVote = node.GrantVote(term: node.Term - 1); + staleVote.Granted.ShouldBeFalse(); + + await Should.ThrowAsync(async () => + await node.TryAppendFromLeaderAsync(new RaftLogEntry(1, node.Term - 1, "cmd"), default)); + } +} diff --git a/tests/NATS.Server.Tests/StreamStoreContractTests.cs b/tests/NATS.Server.Tests/StreamStoreContractTests.cs index c6ffa18..07d7e59 100644 --- a/tests/NATS.Server.Tests/StreamStoreContractTests.cs +++ b/tests/NATS.Server.Tests/StreamStoreContractTests.cs @@ -31,6 +31,18 @@ public class StreamStoreContractTests public ValueTask LoadAsync(ulong sequence, CancellationToken ct) => ValueTask.FromResult(null); + public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) + => ValueTask.FromResult(null); + + public ValueTask RemoveAsync(ulong sequence, CancellationToken ct) + => ValueTask.FromResult(false); + public ValueTask PurgeAsync(CancellationToken ct) => ValueTask.CompletedTask; + + public ValueTask CreateSnapshotAsync(CancellationToken ct) + => ValueTask.FromResult(Array.Empty()); + + public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) + => ValueTask.CompletedTask; } }