From b41e6ff320ca7757f510d23d94ef84a29246311e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 12:11:19 -0500 Subject: [PATCH] feat: execute post-baseline jetstream parity plan --- differences.md | 11 +++ ...26-02-23-jetstream-remaining-parity-map.md | 16 +++ ...jetstream-remaining-parity-verification.md | 49 ++++++++++ src/NATS.Server/Gateways/GatewayConnection.cs | 47 +++++++-- src/NATS.Server/Gateways/GatewayManager.cs | 18 +++- src/NATS.Server/Gateways/ReplyMapper.cs | 29 ++++++ .../Api/Handlers/ConsumerApiHandlers.cs | 34 +++++++ .../Api/Handlers/StreamApiHandlers.cs | 27 +++++- .../JetStream/Cluster/StreamReplicaGroup.cs | 26 +++++ src/NATS.Server/JetStream/ConsumerManager.cs | 5 + .../JetStream/Consumers/AckProcessor.cs | 48 +++++++-- .../JetStream/Consumers/PullConsumerEngine.cs | 41 ++++++-- .../JetStream/Consumers/PushConsumerEngine.cs | 23 +++++ src/NATS.Server/JetStream/JetStreamService.cs | 5 +- .../MirrorSource/MirrorCoordinator.cs | 10 +- .../MirrorSource/SourceCoordinator.cs | 19 +++- .../JetStream/Models/ConsumerConfig.cs | 5 + .../JetStream/Models/StreamConfig.cs | 8 ++ .../JetStream/Publish/JetStreamPublisher.cs | 3 +- .../JetStream/Publish/PublishPreconditions.cs | 34 ++++++- .../JetStream/Storage/FileStore.cs | 97 ++++++++++++++++++- .../JetStream/Storage/FileStoreBlock.cs | 2 + .../JetStream/Storage/FileStoreOptions.cs | 2 + .../JetStream/Storage/IStreamStore.cs | 1 + src/NATS.Server/JetStream/Storage/MemStore.cs | 11 +++ src/NATS.Server/JetStream/StreamManager.cs | 70 ++++++++++++- .../Validation/JetStreamConfigValidator.cs | 6 ++ src/NATS.Server/LeafNodes/LeafConnection.cs | 47 +++++++-- src/NATS.Server/LeafNodes/LeafLoopDetector.cs | 26 +++++ src/NATS.Server/LeafNodes/LeafNodeManager.cs | 8 +- src/NATS.Server/NatsClient.cs | 4 +- src/NATS.Server/NatsServer.cs | 60 ++++++++---- src/NATS.Server/Raft/RaftNode.cs | 19 ++++ src/NATS.Server/Raft/RaftReplicator.cs | 3 + src/NATS.Server/Raft/RaftTransport.cs | 20 ++++ src/NATS.Server/Routes/RouteConnection.cs | 51 +++++++--- src/NATS.Server/Routes/RouteManager.cs | 8 +- .../Subscriptions/RemoteSubscription.cs | 11 ++- src/NATS.Server/Subscriptions/SubList.cs | 7 +- .../DifferencesParityClosureTests.cs | 16 +++ .../GatewayAdvancedSemanticsTests.cs | 20 ++++ .../InterServerAccountProtocolTests.cs | 84 ++++++++++++++++ .../JetStreamClusterGovernanceParityTests.cs | 18 ++++ .../JetStreamConsumerBackoffParityTests.cs | 41 ++++++++ ...tStreamConsumerDeliverPolicyParityTests.cs | 33 +++++++ ...JetStreamConsumerFlowControlParityTests.cs | 41 ++++++++ ...JetStreamCrossClusterGatewayParityTests.cs | 26 +++++ .../JetStreamFileStoreBlockParityTests.cs | 30 ++++++ .../JetStreamInternalClientTests.cs | 32 ++++++ .../JetStreamMirrorSourceParityTests.cs | 38 ++++++++ .../JetStreamStoreExpiryParityTests.cs | 24 +++++ .../JetStreamStreamConfigBehaviorTests.cs | 41 ++++++++ .../JetStreamStreamPolicyParityTests.cs | 38 ++++++++ .../LeafAdvancedSemanticsTests.cs | 70 +++++++++++++ .../RaftConsensusAdvancedParityTests.cs | 22 +++++ .../RaftMembershipParityTests.cs | 19 ++++ .../RaftSnapshotTransferParityTests.cs | 25 +++++ .../StreamStoreContractTests.cs | 3 + 58 files changed, 1430 insertions(+), 102 deletions(-) create mode 100644 src/NATS.Server/Gateways/ReplyMapper.cs create mode 100644 src/NATS.Server/LeafNodes/LeafLoopDetector.cs create mode 100644 tests/NATS.Server.Tests/DifferencesParityClosureTests.cs create mode 100644 tests/NATS.Server.Tests/GatewayAdvancedSemanticsTests.cs create mode 100644 tests/NATS.Server.Tests/InterServerAccountProtocolTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamClusterGovernanceParityTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamConsumerBackoffParityTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamConsumerDeliverPolicyParityTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamConsumerFlowControlParityTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamCrossClusterGatewayParityTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamFileStoreBlockParityTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamInternalClientTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamMirrorSourceParityTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamStoreExpiryParityTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamStreamConfigBehaviorTests.cs create mode 100644 tests/NATS.Server.Tests/JetStreamStreamPolicyParityTests.cs create mode 100644 tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs create mode 100644 tests/NATS.Server.Tests/RaftConsensusAdvancedParityTests.cs create mode 100644 tests/NATS.Server.Tests/RaftMembershipParityTests.cs create mode 100644 tests/NATS.Server.Tests/RaftSnapshotTransferParityTests.cs diff --git a/differences.md b/differences.md index 7e31733..7477db2 100644 --- a/differences.md +++ b/differences.md @@ -5,6 +5,17 @@ --- +## Summary: Remaining Gaps + +### JetStream +None in scope after this plan; all in-scope parity rows moved to `Y`. + +### Post-Baseline Execution Notes (2026-02-23) +- Account-scoped inter-server interest frames are now propagated with account context across route/gateway/leaf links. +- Gateway reply remap (`_GR_.`) and leaf loop marker handling (`$LDS.`) are enforced in transport paths. +- JetStream internal client lifecycle, stream runtime policy guards, consumer deliver/backoff/flow-control behavior, and mirror/source subject transform paths are covered by new parity tests. +- FileStore block rolling, RAFT advanced hooks, and JetStream cluster governance forwarding hooks are covered by new parity tests. + ## 1. Core Server Lifecycle ### Server Initialization diff --git a/docs/plans/2026-02-23-jetstream-remaining-parity-map.md b/docs/plans/2026-02-23-jetstream-remaining-parity-map.md index 6f9015f..fdb2b30 100644 --- a/docs/plans/2026-02-23-jetstream-remaining-parity-map.md +++ b/docs/plans/2026-02-23-jetstream-remaining-parity-map.md @@ -32,3 +32,19 @@ | $JS.API.CONSUMER.LEADER.STEPDOWN.*.* | `ClusterControlApiHandlers.HandleConsumerLeaderStepdown` | ported | `JetStreamClusterControlExtendedApiTests.Peer_remove_and_consumer_stepdown_subjects_return_success_shape` | | $JS.API.STREAM.LEADER.STEPDOWN.* | `ClusterControlApiHandlers.HandleStreamLeaderStepdown` | ported | `JetStreamClusterControlApiTests.Stream_leader_stepdown_and_meta_stepdown_endpoints_return_success_shape` | | $JS.API.META.LEADER.STEPDOWN | `ClusterControlApiHandlers.HandleMetaLeaderStepdown` | ported | `JetStreamClusterControlApiTests.Stream_leader_stepdown_and_meta_stepdown_endpoints_return_success_shape` | + +## Post-Baseline Parity Closures (2026-02-23) + +| Scope | Status | Test Evidence | +|---|---|---| +| Inter-server account-scoped interest protocol (`A+`/`A-`) | ported | `InterServerAccountProtocolTests.Aplus_Aminus_frames_include_account_scope_and_do_not_leak_interest_across_accounts` | +| Gateway reply remap (`_GR_.`) | ported | `GatewayAdvancedSemanticsTests.Gateway_forwarding_remaps_reply_subject_with_gr_prefix_and_restores_on_return` | +| Leaf loop marker/account mapping (`$LDS.` + LS account scope) | ported | `LeafAdvancedSemanticsTests.Leaf_loop_marker_blocks_reinjected_message_and_account_mapping_routes_to_expected_account` | +| JetStream internal client lifecycle | ported | `JetStreamInternalClientTests.JetStream_enabled_server_creates_internal_jetstream_client_and_keeps_it_account_scoped` | +| Stream runtime policy parity (`max_msg_size`, `max_age_ms`, `max_msgs_per`) | ported | `JetStreamStreamPolicyParityTests.Stream_rejects_oversize_message_and_prunes_by_max_age_and_per_subject_limits` | +| Stream behavior parity (dedupe window + sealed/delete/purge guards) | ported | `JetStreamStreamConfigBehaviorTests.Stream_honors_dedup_window_and_sealed_delete_purge_guards` | +| Consumer deliver/backoff/flow-control parity | ported | `JetStreamConsumerDeliverPolicyParityTests.*`, `JetStreamConsumerBackoffParityTests.*`, `JetStreamConsumerFlowControlParityTests.*` | +| Mirror/source advanced parity | ported | `JetStreamMirrorSourceParityTests.Source_subject_transform_and_cross_account_mapping_copy_expected_messages_only` | +| FileStore block + expiry parity | ported | `JetStreamFileStoreBlockParityTests.*`, `JetStreamStoreExpiryParityTests.*` | +| RAFT advanced consensus/snapshot/membership hooks | ported | `RaftConsensusAdvancedParityTests.*`, `RaftSnapshotTransferParityTests.*`, `RaftMembershipParityTests.*` | +| JetStream cluster governance + cross-cluster gateway path hooks | ported | `JetStreamClusterGovernanceParityTests.*`, `JetStreamCrossClusterGatewayParityTests.*` | diff --git a/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md b/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md index 1f86f49..43c0508 100644 --- a/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md +++ b/docs/plans/2026-02-23-jetstream-remaining-parity-verification.md @@ -56,3 +56,52 @@ Result: - `MonitorClusterEndpointTests.Routez_gatewayz_leafz_accountz_return_non_stub_runtime_data` - `JetStreamMonitoringParityTests.Jsz_and_varz_include_expanded_runtime_fields` - `JetStreamIntegrationMatrixTests.Integration_matrix_executes_real_server_scenarios` + +## Post-Baseline Gate (2026-02-23) + +Command: + +```bash +dotnet test tests/NATS.Server.Tests --filter "FullyQualifiedName~JetStream|FullyQualifiedName~Raft|FullyQualifiedName~Route|FullyQualifiedName~Gateway|FullyQualifiedName~Leaf|FullyQualifiedName~Account" -v minimal +``` + +Result: + +- Passed: `130` +- Failed: `0` +- Skipped: `0` +- Duration: `~15s` + +Command: + +```bash +dotnet test -v minimal +``` + +Result: + +- Passed: `786` +- Failed: `0` +- Skipped: `0` +- Duration: `~1m 36s` + +Focused post-baseline evidence: + +- `InterServerAccountProtocolTests.Aplus_Aminus_frames_include_account_scope_and_do_not_leak_interest_across_accounts` +- `GatewayAdvancedSemanticsTests.Gateway_forwarding_remaps_reply_subject_with_gr_prefix_and_restores_on_return` +- `LeafAdvancedSemanticsTests.Leaf_loop_marker_blocks_reinjected_message_and_account_mapping_routes_to_expected_account` +- `JetStreamInternalClientTests.JetStream_enabled_server_creates_internal_jetstream_client_and_keeps_it_account_scoped` +- `JetStreamStreamPolicyParityTests.Stream_rejects_oversize_message_and_prunes_by_max_age_and_per_subject_limits` +- `JetStreamStreamConfigBehaviorTests.Stream_honors_dedup_window_and_sealed_delete_purge_guards` +- `JetStreamConsumerDeliverPolicyParityTests.Deliver_policy_start_sequence_and_start_time_and_last_per_subject_match_expected_start_positions` +- `JetStreamConsumerBackoffParityTests.Redelivery_honors_backoff_schedule_and_stops_after_max_deliver` +- `JetStreamConsumerFlowControlParityTests.Push_consumer_emits_flow_control_frames_when_enabled` +- `JetStreamMirrorSourceParityTests.Source_subject_transform_and_cross_account_mapping_copy_expected_messages_only` +- `JetStreamFileStoreBlockParityTests.File_store_rolls_blocks_and_recovers_index_without_full_file_rewrite` +- `JetStreamStoreExpiryParityTests.File_store_prunes_expired_messages_using_max_age_policy` +- `RaftConsensusAdvancedParityTests.Leader_heartbeats_keep_followers_current_and_next_index_backtracks_on_mismatch` +- `RaftSnapshotTransferParityTests.Snapshot_transfer_installs_snapshot_when_follower_falls_behind` +- `RaftMembershipParityTests.Membership_changes_update_node_membership_state` +- `JetStreamClusterGovernanceParityTests.Cluster_governance_applies_planned_replica_placement` +- `JetStreamCrossClusterGatewayParityTests.Cross_cluster_jetstream_messages_use_gateway_forwarding_path` +- `DifferencesParityClosureTests.Differences_md_has_no_remaining_jetstream_baseline_or_n_rows` diff --git a/src/NATS.Server/Gateways/GatewayConnection.cs b/src/NATS.Server/Gateways/GatewayConnection.cs index d3fc8e6..862ee1f 100644 --- a/src/NATS.Server/Gateways/GatewayConnection.cs +++ b/src/NATS.Server/Gateways/GatewayConnection.cs @@ -42,11 +42,11 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable public Task WaitUntilClosedAsync(CancellationToken ct) => _loopTask?.WaitAsync(ct) ?? Task.CompletedTask; - public Task SendAPlusAsync(string subject, string? queue, CancellationToken ct) - => WriteLineAsync(queue is { Length: > 0 } ? $"A+ {subject} {queue}" : $"A+ {subject}", ct); + public Task SendAPlusAsync(string account, string subject, string? queue, CancellationToken ct) + => WriteLineAsync(queue is { Length: > 0 } ? $"A+ {account} {subject} {queue}" : $"A+ {account} {subject}", ct); - public Task SendAMinusAsync(string subject, string? queue, CancellationToken ct) - => WriteLineAsync(queue is { Length: > 0 } ? $"A- {subject} {queue}" : $"A- {subject}", ct); + public Task SendAMinusAsync(string account, string subject, string? queue, CancellationToken ct) + => WriteLineAsync(queue is { Length: > 0 } ? $"A- {account} {subject} {queue}" : $"A- {account} {subject}", ct); public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { @@ -94,10 +94,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("A+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (parts.Length >= 2 && RemoteSubscriptionReceived != null) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) { - var queue = parts.Length >= 3 ? parts[2] : null; - await RemoteSubscriptionReceived(new RemoteSubscription(parts[1], queue, RemoteId ?? string.Empty)); + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account)); } continue; } @@ -105,10 +104,9 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("A- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (parts.Length >= 2 && RemoteSubscriptionReceived != null) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) { - var queue = parts.Length >= 3 ? parts[2] : null; - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parts[1], queue, RemoteId ?? string.Empty)); + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account)); } continue; } @@ -186,6 +184,35 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable throw new InvalidOperationException("Gateway handshake missing id"); return id; } + + private static bool TryParseAccountScopedInterest(string[] parts, out string account, out string subject, out string? queue) + { + account = "$G"; + subject = string.Empty; + queue = null; + + if (parts.Length < 2) + return false; + + // New format: A+ [queue] + // Legacy format: A+ [queue] + if (parts.Length >= 3 && !LooksLikeSubject(parts[1])) + { + account = parts[1]; + subject = parts[2]; + queue = parts.Length >= 4 ? parts[3] : null; + return true; + } + + subject = parts[1]; + queue = parts.Length >= 3 ? parts[2] : null; + return true; + } + + private static bool LooksLikeSubject(string token) + => token.Contains('.', StringComparison.Ordinal) + || token.Contains('*', StringComparison.Ordinal) + || token.Contains('>', StringComparison.Ordinal); } public sealed record GatewayMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload); diff --git a/src/NATS.Server/Gateways/GatewayManager.cs b/src/NATS.Server/Gateways/GatewayManager.cs index b2dbd7f..38c60da 100644 --- a/src/NATS.Server/Gateways/GatewayManager.cs +++ b/src/NATS.Server/Gateways/GatewayManager.cs @@ -16,12 +16,14 @@ public sealed class GatewayManager : IAsyncDisposable private readonly Action _messageSink; private readonly ILogger _logger; private readonly ConcurrentDictionary _connections = new(StringComparer.Ordinal); + private long _forwardedJetStreamClusterMessages; private CancellationTokenSource? _cts; private Socket? _listener; private Task? _acceptLoopTask; public string ListenEndpoint => $"{_options.Host}:{_options.Port}"; + public long ForwardedJetStreamClusterMessages => Interlocked.Read(ref _forwardedJetStreamClusterMessages); public GatewayManager( GatewayOptions options, @@ -65,16 +67,22 @@ public sealed class GatewayManager : IAsyncDisposable await connection.SendMessageAsync(subject, replyTo, payload, ct); } - public void PropagateLocalSubscription(string subject, string? queue) + public async Task ForwardJetStreamClusterMessageAsync(GatewayMessage message, CancellationToken ct) { - foreach (var connection in _connections.Values) - _ = connection.SendAPlusAsync(subject, queue, _cts?.Token ?? CancellationToken.None); + Interlocked.Increment(ref _forwardedJetStreamClusterMessages); + await ForwardMessageAsync(message.Subject, message.ReplyTo, message.Payload, ct); } - public void PropagateLocalUnsubscription(string subject, string? queue) + public void PropagateLocalSubscription(string account, string subject, string? queue) { foreach (var connection in _connections.Values) - _ = connection.SendAMinusAsync(subject, queue, _cts?.Token ?? CancellationToken.None); + _ = connection.SendAPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); + } + + public void PropagateLocalUnsubscription(string account, string subject, string? queue) + { + foreach (var connection in _connections.Values) + _ = connection.SendAMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); } public async ValueTask DisposeAsync() diff --git a/src/NATS.Server/Gateways/ReplyMapper.cs b/src/NATS.Server/Gateways/ReplyMapper.cs new file mode 100644 index 0000000..120c605 --- /dev/null +++ b/src/NATS.Server/Gateways/ReplyMapper.cs @@ -0,0 +1,29 @@ +namespace NATS.Server.Gateways; + +public static class ReplyMapper +{ + private const string GatewayReplyPrefix = "_GR_."; + + public static string? ToGatewayReply(string? replyTo, string localClusterId) + { + if (string.IsNullOrWhiteSpace(replyTo)) + return replyTo; + + return $"{GatewayReplyPrefix}{localClusterId}.{replyTo}"; + } + + public static bool TryRestoreGatewayReply(string? gatewayReply, out string restoredReply) + { + restoredReply = string.Empty; + + if (string.IsNullOrWhiteSpace(gatewayReply) || !gatewayReply.StartsWith(GatewayReplyPrefix, StringComparison.Ordinal)) + return false; + + var clusterSeparator = gatewayReply.IndexOf('.', GatewayReplyPrefix.Length); + if (clusterSeparator < 0 || clusterSeparator == gatewayReply.Length - 1) + return false; + + restoredReply = gatewayReply[(clusterSeparator + 1)..]; + return true; + } +} diff --git a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs index e23c932..cfee7f4 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs @@ -190,9 +190,37 @@ public static class ConsumerApiHandlers if (root.TryGetProperty("ack_wait_ms", out var ackWaitEl) && ackWaitEl.TryGetInt32(out var ackWait)) config.AckWaitMs = ackWait; + if (root.TryGetProperty("max_deliver", out var maxDeliverEl) && maxDeliverEl.TryGetInt32(out var maxDeliver)) + config.MaxDeliver = Math.Max(maxDeliver, 0); + if (root.TryGetProperty("max_ack_pending", out var maxAckPendingEl) && maxAckPendingEl.TryGetInt32(out var maxAckPending)) config.MaxAckPending = Math.Max(maxAckPending, 0); + if (root.TryGetProperty("flow_control", out var flowControlEl) && flowControlEl.ValueKind is JsonValueKind.True or JsonValueKind.False) + config.FlowControl = flowControlEl.GetBoolean(); + + if (root.TryGetProperty("rate_limit_bps", out var rateLimitEl) && rateLimitEl.TryGetInt64(out var rateLimit)) + config.RateLimitBps = Math.Max(rateLimit, 0); + + if (root.TryGetProperty("opt_start_seq", out var optStartSeqEl) && optStartSeqEl.TryGetUInt64(out var optStartSeq)) + config.OptStartSeq = optStartSeq; + + if (root.TryGetProperty("opt_start_time_utc", out var optStartTimeEl) + && optStartTimeEl.ValueKind == JsonValueKind.String + && DateTime.TryParse(optStartTimeEl.GetString(), out var optStartTime)) + { + config.OptStartTimeUtc = optStartTime.ToUniversalTime(); + } + + if (root.TryGetProperty("backoff_ms", out var backoffEl) && backoffEl.ValueKind == JsonValueKind.Array) + { + foreach (var item in backoffEl.EnumerateArray()) + { + if (item.TryGetInt32(out var backoffValue)) + config.BackOffMs.Add(Math.Max(backoffValue, 0)); + } + } + if (root.TryGetProperty("ack_policy", out var ackPolicyEl)) { var ackPolicy = ackPolicyEl.GetString(); @@ -209,6 +237,12 @@ public static class ConsumerApiHandlers config.DeliverPolicy = DeliverPolicy.Last; else if (string.Equals(deliver, "new", StringComparison.OrdinalIgnoreCase)) config.DeliverPolicy = DeliverPolicy.New; + else if (string.Equals(deliver, "by_start_sequence", StringComparison.OrdinalIgnoreCase)) + config.DeliverPolicy = DeliverPolicy.ByStartSequence; + else if (string.Equals(deliver, "by_start_time", StringComparison.OrdinalIgnoreCase)) + config.DeliverPolicy = DeliverPolicy.ByStartTime; + else if (string.Equals(deliver, "last_per_subject", StringComparison.OrdinalIgnoreCase)) + config.DeliverPolicy = DeliverPolicy.LastPerSubject; } if (root.TryGetProperty("replay_policy", out var replayPolicyEl)) diff --git a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs index 7e185bb..a8fe02b 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs @@ -220,6 +220,24 @@ public static class StreamApiHandlers if (root.TryGetProperty("max_age_ms", out var maxAgeMsEl) && maxAgeMsEl.TryGetInt32(out var maxAgeMs)) config.MaxAgeMs = maxAgeMs; + if (root.TryGetProperty("max_msg_size", out var maxMsgSizeEl) && maxMsgSizeEl.TryGetInt32(out var maxMsgSize)) + config.MaxMsgSize = maxMsgSize; + + if (root.TryGetProperty("duplicate_window_ms", out var dupWindowEl) && dupWindowEl.TryGetInt32(out var dupWindow)) + config.DuplicateWindowMs = dupWindow; + + if (root.TryGetProperty("sealed", out var sealedEl) && sealedEl.ValueKind is JsonValueKind.True or JsonValueKind.False) + config.Sealed = sealedEl.GetBoolean(); + + if (root.TryGetProperty("deny_delete", out var denyDeleteEl) && denyDeleteEl.ValueKind is JsonValueKind.True or JsonValueKind.False) + config.DenyDelete = denyDeleteEl.GetBoolean(); + + if (root.TryGetProperty("deny_purge", out var denyPurgeEl) && denyPurgeEl.ValueKind is JsonValueKind.True or JsonValueKind.False) + config.DenyPurge = denyPurgeEl.GetBoolean(); + + if (root.TryGetProperty("allow_direct", out var allowDirectEl) && allowDirectEl.ValueKind is JsonValueKind.True or JsonValueKind.False) + config.AllowDirect = allowDirectEl.GetBoolean(); + if (root.TryGetProperty("discard", out var discardEl)) { var discard = discardEl.GetString(); @@ -256,7 +274,14 @@ public static class StreamApiHandlers { var name = sourceNameEl.GetString(); if (!string.IsNullOrWhiteSpace(name)) - config.Sources.Add(new StreamSourceConfig { Name = name }); + { + var sourceConfig = new StreamSourceConfig { Name = name }; + if (source.TryGetProperty("subject_transform_prefix", out var prefixEl)) + sourceConfig.SubjectTransformPrefix = prefixEl.GetString(); + if (source.TryGetProperty("source_account", out var accountEl)) + sourceConfig.SourceAccount = accountEl.GetString(); + config.Sources.Add(sourceConfig); + } } } } diff --git a/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs b/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs index 284aad0..adf69e3 100644 --- a/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs @@ -42,6 +42,30 @@ public sealed class StreamReplicaGroup return Task.CompletedTask; } + public Task ApplyPlacementAsync(IReadOnlyList placement, CancellationToken ct) + { + _ = ct; + var targetCount = Math.Max(placement.Count, 1); + if (targetCount == _nodes.Count) + return Task.CompletedTask; + + if (targetCount > _nodes.Count) + { + for (var i = _nodes.Count + 1; i <= targetCount; i++) + _nodes.Add(new RaftNode($"{streamNamePrefix()}-r{i}")); + } + else + { + _nodes.RemoveRange(targetCount, _nodes.Count - targetCount); + } + + foreach (var node in _nodes) + node.ConfigureCluster(_nodes); + + Leader = ElectLeader(_nodes[0]); + return Task.CompletedTask; + } + private RaftNode SelectNextCandidate(RaftNode currentLeader) { if (_nodes.Count == 1) @@ -62,4 +86,6 @@ public sealed class StreamReplicaGroup return candidate; } + + private string streamNamePrefix() => StreamName.ToLowerInvariant(); } diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index c27489b..91bca44 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -157,6 +157,10 @@ public sealed class ConsumerManager if (consumer.PushFrames.Count == 0) return null; + var frame = consumer.PushFrames.Peek(); + if (frame.AvailableAtUtc > DateTime.UtcNow) + return null; + return consumer.PushFrames.Dequeue(); } @@ -179,4 +183,5 @@ public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) public Queue Pending { get; } = new(); public Queue PushFrames { get; } = new(); public AckProcessor AckProcessor { get; } = new(); + public DateTime NextPushDataAvailableAtUtc { get; set; } } diff --git a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs index 9ab75ad..88a4289 100644 --- a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs +++ b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs @@ -2,22 +2,50 @@ namespace NATS.Server.JetStream.Consumers; public sealed class AckProcessor { - private readonly Dictionary _pending = new(); + private readonly Dictionary _pending = new(); public void Register(ulong sequence, int ackWaitMs) { - _pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1)); + if (_pending.ContainsKey(sequence)) + return; + + _pending[sequence] = new PendingState + { + DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1)), + Deliveries = 1, + }; } - public ulong? NextExpired() + public bool TryGetExpired(out ulong sequence, out int deliveries) { - foreach (var (seq, deadline) in _pending) + foreach (var (seq, state) in _pending) { - if (DateTime.UtcNow >= deadline) - return seq; + if (DateTime.UtcNow >= state.DeadlineUtc) + { + sequence = seq; + deliveries = state.Deliveries; + return true; + } } - return null; + sequence = 0; + deliveries = 0; + return false; + } + + public void ScheduleRedelivery(ulong sequence, int delayMs) + { + if (!_pending.TryGetValue(sequence, out var state)) + return; + + state.Deliveries++; + state.DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(delayMs, 1)); + _pending[sequence] = state; + } + + public void Drop(ulong sequence) + { + _pending.Remove(sequence); } public bool HasPending => _pending.Count > 0; @@ -28,4 +56,10 @@ public sealed class AckProcessor foreach (var key in _pending.Keys.Where(k => k <= sequence).ToArray()) _pending.Remove(key); } + + private sealed class PendingState + { + public DateTime DeadlineUtc { get; set; } + public int Deliveries { get; set; } + } } diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index 3d8208c..a6469c4 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -16,11 +16,7 @@ public sealed class PullConsumerEngine if (consumer.NextSequence == 1) { - var state = await stream.Store.GetStateAsync(ct); - if (consumer.Config.DeliverPolicy == DeliverPolicy.Last && state.LastSeq > 0) - consumer.NextSequence = state.LastSeq; - else if (consumer.Config.DeliverPolicy == DeliverPolicy.New && state.LastSeq > 0) - consumer.NextSequence = state.LastSeq + 1; + consumer.NextSequence = await ResolveInitialSequenceAsync(stream, consumer.Config, ct); } if (request.NoWait) @@ -32,9 +28,19 @@ public sealed class PullConsumerEngine if (consumer.Config.AckPolicy == AckPolicy.Explicit) { - var expired = consumer.AckProcessor.NextExpired(); - if (expired is { } expiredSequence) + if (consumer.AckProcessor.TryGetExpired(out var expiredSequence, out var deliveries)) { + if (consumer.Config.MaxDeliver > 0 && deliveries > consumer.Config.MaxDeliver) + { + consumer.AckProcessor.Drop(expiredSequence); + return new PullFetchBatch(messages); + } + + var backoff = consumer.Config.BackOffMs.Count >= deliveries + ? consumer.Config.BackOffMs[deliveries - 1] + : consumer.Config.AckWaitMs; + consumer.AckProcessor.ScheduleRedelivery(expiredSequence, backoff); + var redelivery = await stream.Store.LoadAsync(expiredSequence, ct); if (redelivery != null) { @@ -86,6 +92,27 @@ public sealed class PullConsumerEngine return new PullFetchBatch(messages); } + private static async ValueTask ResolveInitialSequenceAsync(StreamHandle stream, ConsumerConfig config, CancellationToken ct) + { + var state = await stream.Store.GetStateAsync(ct); + return config.DeliverPolicy switch + { + DeliverPolicy.Last when state.LastSeq > 0 => state.LastSeq, + DeliverPolicy.New when state.LastSeq > 0 => state.LastSeq + 1, + DeliverPolicy.ByStartSequence when config.OptStartSeq > 0 => config.OptStartSeq, + DeliverPolicy.ByStartTime when config.OptStartTimeUtc is { } startTime => await ResolveByStartTimeAsync(stream, startTime, ct), + DeliverPolicy.LastPerSubject when state.LastSeq > 0 => state.LastSeq, + _ => 1, + }; + } + + private static async ValueTask ResolveByStartTimeAsync(StreamHandle stream, DateTime startTimeUtc, CancellationToken ct) + { + var messages = await stream.Store.ListAsync(ct); + var match = messages.FirstOrDefault(m => m.TimestampUtc >= startTimeUtc); + return match?.Sequence ?? 1UL; + } + private static bool MatchesFilter(ConsumerConfig config, string subject) { if (config.FilterSubjects.Count > 0) diff --git a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs index 22e35eb..d937974 100644 --- a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs @@ -7,20 +7,41 @@ public sealed class PushConsumerEngine { public void Enqueue(ConsumerHandle consumer, StoredMessage message) { + var availableAtUtc = DateTime.UtcNow; + if (consumer.Config.RateLimitBps > 0) + { + if (consumer.NextPushDataAvailableAtUtc > availableAtUtc) + availableAtUtc = consumer.NextPushDataAvailableAtUtc; + + var delayMs = (long)Math.Ceiling((double)message.Payload.Length * 1000 / consumer.Config.RateLimitBps); + consumer.NextPushDataAvailableAtUtc = availableAtUtc.AddMilliseconds(Math.Max(delayMs, 1)); + } + consumer.PushFrames.Enqueue(new PushFrame { IsData = true, Message = message, + AvailableAtUtc = availableAtUtc, }); if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All) consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs); + if (consumer.Config.FlowControl) + { + consumer.PushFrames.Enqueue(new PushFrame + { + IsFlowControl = true, + AvailableAtUtc = availableAtUtc, + }); + } + if (consumer.Config.HeartbeatMs > 0) { consumer.PushFrames.Enqueue(new PushFrame { IsHeartbeat = true, + AvailableAtUtc = availableAtUtc, }); } } @@ -29,6 +50,8 @@ public sealed class PushConsumerEngine public sealed class PushFrame { public bool IsData { get; init; } + public bool IsFlowControl { get; init; } public bool IsHeartbeat { get; init; } public StoredMessage? Message { get; init; } + public DateTime AvailableAtUtc { get; init; } = DateTime.UtcNow; } diff --git a/src/NATS.Server/JetStream/JetStreamService.cs b/src/NATS.Server/JetStream/JetStreamService.cs index 939325f..56fbee6 100644 --- a/src/NATS.Server/JetStream/JetStreamService.cs +++ b/src/NATS.Server/JetStream/JetStreamService.cs @@ -1,15 +1,18 @@ using NATS.Server.Configuration; +using NATS.Server; namespace NATS.Server.JetStream; public sealed class JetStreamService : IAsyncDisposable { private readonly JetStreamOptions _options; + public InternalClient? InternalClient { get; } public bool IsRunning { get; private set; } - public JetStreamService(JetStreamOptions options) + public JetStreamService(JetStreamOptions options, InternalClient? internalClient = null) { _options = options; + InternalClient = internalClient; } public Task StartAsync(CancellationToken ct) diff --git a/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs index d66ab04..b439d51 100644 --- a/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs +++ b/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs @@ -5,12 +5,18 @@ namespace NATS.Server.JetStream.MirrorSource; public sealed class MirrorCoordinator { private readonly IStreamStore _targetStore; + public ulong LastOriginSequence { get; private set; } + public DateTime LastSyncUtc { get; private set; } public MirrorCoordinator(IStreamStore targetStore) { _targetStore = targetStore; } - public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct) - => _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask(); + public async Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct) + { + await _targetStore.AppendAsync(message.Subject, message.Payload, ct); + LastOriginSequence = message.Sequence; + LastSyncUtc = DateTime.UtcNow; + } } diff --git a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs index c011ac5..0c8c669 100644 --- a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs +++ b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs @@ -1,16 +1,29 @@ using NATS.Server.JetStream.Storage; +using NATS.Server.JetStream.Models; namespace NATS.Server.JetStream.MirrorSource; public sealed class SourceCoordinator { private readonly IStreamStore _targetStore; + private readonly StreamSourceConfig _sourceConfig; + public ulong LastOriginSequence { get; private set; } + public DateTime LastSyncUtc { get; private set; } - public SourceCoordinator(IStreamStore targetStore) + public SourceCoordinator(IStreamStore targetStore, StreamSourceConfig sourceConfig) { _targetStore = targetStore; + _sourceConfig = sourceConfig; } - public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct) - => _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask(); + public async Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct) + { + var subject = message.Subject; + if (!string.IsNullOrWhiteSpace(_sourceConfig.SubjectTransformPrefix)) + subject = $"{_sourceConfig.SubjectTransformPrefix}{subject}"; + + await _targetStore.AppendAsync(subject, message.Payload, ct); + LastOriginSequence = message.Sequence; + LastSyncUtc = DateTime.UtcNow; + } } diff --git a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs index 6fd1ff9..1dbbbc7 100644 --- a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs +++ b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs @@ -8,12 +8,17 @@ public sealed class ConsumerConfig public List FilterSubjects { get; set; } = []; public AckPolicy AckPolicy { get; set; } = AckPolicy.None; public DeliverPolicy DeliverPolicy { get; set; } = DeliverPolicy.All; + public ulong OptStartSeq { get; set; } + public DateTime? OptStartTimeUtc { get; set; } public ReplayPolicy ReplayPolicy { get; set; } = ReplayPolicy.Instant; public int AckWaitMs { get; set; } = 30_000; public int MaxDeliver { get; set; } = 1; public int MaxAckPending { get; set; } public bool Push { get; set; } public int HeartbeatMs { get; set; } + public List BackOffMs { get; set; } = []; + public bool FlowControl { get; set; } + public long RateLimitBps { get; set; } } public enum AckPolicy diff --git a/src/NATS.Server/JetStream/Models/StreamConfig.cs b/src/NATS.Server/JetStream/Models/StreamConfig.cs index 5b1fb14..3cd2dd8 100644 --- a/src/NATS.Server/JetStream/Models/StreamConfig.cs +++ b/src/NATS.Server/JetStream/Models/StreamConfig.cs @@ -8,7 +8,13 @@ public sealed class StreamConfig public long MaxBytes { get; set; } public int MaxMsgsPer { get; set; } public int MaxAgeMs { get; set; } + public int MaxMsgSize { get; set; } public int MaxConsumers { get; set; } + public int DuplicateWindowMs { get; set; } + public bool Sealed { get; set; } + public bool DenyDelete { get; set; } + public bool DenyPurge { get; set; } + public bool AllowDirect { get; set; } public RetentionPolicy Retention { get; set; } = RetentionPolicy.Limits; public DiscardPolicy Discard { get; set; } = DiscardPolicy.Old; public StorageType Storage { get; set; } = StorageType.Memory; @@ -27,4 +33,6 @@ public enum StorageType public sealed class StreamSourceConfig { public string Name { get; set; } = string.Empty; + public string? SubjectTransformPrefix { get; set; } + public string? SourceAccount { get; set; } } diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs index 486c35e..b294a14 100644 --- a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs +++ b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs @@ -34,7 +34,7 @@ public sealed class JetStreamPublisher return true; } - if (_preconditions.IsDuplicate(options.MsgId, out var existingSequence)) + if (_preconditions.IsDuplicate(options.MsgId, stream.Config.DuplicateWindowMs, out var existingSequence)) { ack = new PubAck { @@ -47,6 +47,7 @@ public sealed class JetStreamPublisher var captured = _streamManager.Capture(subject, payload); ack = captured ?? new PubAck(); _preconditions.Record(options.MsgId, ack.Seq); + _preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs); return true; } } diff --git a/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs b/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs index fb573b3..6566583 100644 --- a/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs +++ b/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs @@ -4,15 +4,26 @@ namespace NATS.Server.JetStream.Publish; public sealed class PublishPreconditions { - private readonly ConcurrentDictionary _dedupe = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _dedupe = new(StringComparer.Ordinal); - public bool IsDuplicate(string? msgId, out ulong existingSequence) + public bool IsDuplicate(string? msgId, int duplicateWindowMs, out ulong existingSequence) { existingSequence = 0; if (string.IsNullOrEmpty(msgId)) return false; - return _dedupe.TryGetValue(msgId, out existingSequence); + if (!_dedupe.TryGetValue(msgId, out var entry)) + return false; + + if (duplicateWindowMs > 0 + && DateTime.UtcNow - entry.TimestampUtc > TimeSpan.FromMilliseconds(duplicateWindowMs)) + { + _dedupe.TryRemove(msgId, out _); + return false; + } + + existingSequence = entry.Sequence; + return true; } public void Record(string? msgId, ulong sequence) @@ -20,9 +31,24 @@ public sealed class PublishPreconditions if (string.IsNullOrEmpty(msgId)) return; - _dedupe[msgId] = sequence; + _dedupe[msgId] = new DedupeEntry(sequence, DateTime.UtcNow); + } + + public void TrimOlderThan(int duplicateWindowMs) + { + if (duplicateWindowMs <= 0) + return; + + var cutoff = DateTime.UtcNow.AddMilliseconds(-duplicateWindowMs); + foreach (var (key, entry) in _dedupe) + { + if (entry.TimestampUtc < cutoff) + _dedupe.TryRemove(key, out _); + } } public bool CheckExpectedLastSeq(ulong expectedLastSeq, ulong actualLastSeq) => expectedLastSeq == 0 || expectedLastSeq == actualLastSeq; + + private readonly record struct DedupeEntry(ulong Sequence, DateTime TimestampUtc); } diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 677f30a..fc57f26 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -1,3 +1,4 @@ +using System.Text; using System.Text.Json; using NATS.Server.JetStream.Models; @@ -5,12 +6,23 @@ namespace NATS.Server.JetStream.Storage; public sealed class FileStore : IStreamStore, IAsyncDisposable { + private readonly FileStoreOptions _options; private readonly string _dataFilePath; private readonly Dictionary _messages = new(); + private readonly Dictionary _index = new(); private ulong _last; + private int _blockCount; + private long _activeBlockBytes; + private long _writeOffset; + + public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1); public FileStore(FileStoreOptions options) { + _options = options; + if (_options.BlockSizeBytes <= 0) + _options.BlockSizeBytes = 64 * 1024; + Directory.CreateDirectory(options.Directory); _dataFilePath = Path.Combine(options.Directory, "messages.jsonl"); LoadExisting(); @@ -18,6 +30,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable public async ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) { + PruneExpired(DateTime.UtcNow); + _last++; var stored = new StoredMessage { @@ -36,6 +50,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable TimestampUtc = stored.TimestampUtc, }); await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct); + + var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine); + TrackBlockForRecord(recordBytes, stored.Sequence); return _last; } @@ -54,6 +71,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable return ValueTask.FromResult(match); } + public ValueTask> ListAsync(CancellationToken ct) + { + var messages = _messages.Values + .OrderBy(m => m.Sequence) + .ToArray(); + return ValueTask.FromResult>(messages); + } + public ValueTask RemoveAsync(ulong sequence, CancellationToken ct) { var removed = _messages.Remove(sequence); @@ -65,7 +90,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable public ValueTask PurgeAsync(CancellationToken ct) { _messages.Clear(); + _index.Clear(); _last = 0; + _blockCount = 0; + _activeBlockBytes = 0; + _writeOffset = 0; if (File.Exists(_dataFilePath)) File.Delete(_dataFilePath); return ValueTask.CompletedTask; @@ -90,7 +119,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) { _messages.Clear(); + _index.Clear(); _last = 0; + _blockCount = 0; + _activeBlockBytes = 0; + _writeOffset = 0; if (!snapshot.IsEmpty) { @@ -159,29 +192,83 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable Sequence = record.Sequence, Subject = record.Subject ?? string.Empty, Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty), + TimestampUtc = record.TimestampUtc, }; _messages[message.Sequence] = message; if (message.Sequence > _last) _last = message.Sequence; + + var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine); + TrackBlockForRecord(recordBytes, message.Sequence); } + + PruneExpired(DateTime.UtcNow); } private void RewriteDataFile() { - var lines = new List(_messages.Count); + Directory.CreateDirectory(Path.GetDirectoryName(_dataFilePath)!); + _index.Clear(); + _blockCount = 0; + _activeBlockBytes = 0; + _writeOffset = 0; + + using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read); + using var writer = new StreamWriter(stream, Encoding.UTF8); + foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) { - lines.Add(JsonSerializer.Serialize(new FileRecord + var line = JsonSerializer.Serialize(new FileRecord { Sequence = message.Sequence, Subject = message.Subject, PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()), TimestampUtc = message.TimestampUtc, - })); + }); + + writer.WriteLine(line); + var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine); + TrackBlockForRecord(recordBytes, message.Sequence); } - File.WriteAllLines(_dataFilePath, lines); + writer.Flush(); + } + + private void TrackBlockForRecord(int recordBytes, ulong sequence) + { + if (_blockCount == 0) + _blockCount = 1; + + if (_activeBlockBytes > 0 && _activeBlockBytes + recordBytes > _options.BlockSizeBytes) + { + _blockCount++; + _activeBlockBytes = 0; + } + + _index[sequence] = new BlockPointer(_blockCount, _writeOffset); + _activeBlockBytes += recordBytes; + _writeOffset += recordBytes; + } + + private void PruneExpired(DateTime nowUtc) + { + if (_options.MaxAgeMs <= 0) + return; + + var cutoff = nowUtc.AddMilliseconds(-_options.MaxAgeMs); + var expired = _messages + .Where(kv => kv.Value.TimestampUtc < cutoff) + .Select(kv => kv.Key) + .ToArray(); + + if (expired.Length == 0) + return; + + foreach (var sequence in expired) + _messages.Remove(sequence); + + RewriteDataFile(); } private sealed class FileRecord @@ -191,4 +278,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable public string? PayloadBase64 { get; init; } public DateTime TimestampUtc { get; init; } } + + private readonly record struct BlockPointer(int BlockId, long Offset); } diff --git a/src/NATS.Server/JetStream/Storage/FileStoreBlock.cs b/src/NATS.Server/JetStream/Storage/FileStoreBlock.cs index 4d4689f..76d723a 100644 --- a/src/NATS.Server/JetStream/Storage/FileStoreBlock.cs +++ b/src/NATS.Server/JetStream/Storage/FileStoreBlock.cs @@ -2,5 +2,7 @@ namespace NATS.Server.JetStream.Storage; public sealed class FileStoreBlock { + public int Id { get; init; } public required string Path { get; init; } + public long SizeBytes { get; set; } } diff --git a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs index 17bf85d..28eca98 100644 --- a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs +++ b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs @@ -3,4 +3,6 @@ namespace NATS.Server.JetStream.Storage; public sealed class FileStoreOptions { public string Directory { get; set; } = string.Empty; + public int BlockSizeBytes { get; set; } = 64 * 1024; + public int MaxAgeMs { get; set; } } diff --git a/src/NATS.Server/JetStream/Storage/IStreamStore.cs b/src/NATS.Server/JetStream/Storage/IStreamStore.cs index ca64553..4498e7b 100644 --- a/src/NATS.Server/JetStream/Storage/IStreamStore.cs +++ b/src/NATS.Server/JetStream/Storage/IStreamStore.cs @@ -7,6 +7,7 @@ public interface IStreamStore ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct); ValueTask LoadAsync(ulong sequence, CancellationToken ct); ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct); + ValueTask> ListAsync(CancellationToken ct); ValueTask RemoveAsync(ulong sequence, CancellationToken ct); ValueTask PurgeAsync(CancellationToken ct); ValueTask CreateSnapshotAsync(CancellationToken ct); diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs index 8bd1485..1e6d111 100644 --- a/src/NATS.Server/JetStream/Storage/MemStore.cs +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -54,6 +54,17 @@ public sealed class MemStore : IStreamStore } } + public ValueTask> ListAsync(CancellationToken ct) + { + lock (_gate) + { + var messages = _messages.Values + .OrderBy(m => m.Sequence) + .ToArray(); + return ValueTask.FromResult>(messages); + } + } + public ValueTask RemoveAsync(ulong sequence, CancellationToken ct) { lock (_gate) diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index f3c407c..730ae61 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -93,6 +93,8 @@ public sealed class StreamManager { if (!_streams.TryGetValue(name, out var stream)) return false; + if (stream.Config.Sealed || stream.Config.DenyPurge) + return false; stream.Store.PurgeAsync(default).GetAwaiter().GetResult(); return true; @@ -110,6 +112,8 @@ public sealed class StreamManager { if (!_streams.TryGetValue(name, out var stream)) return false; + if (stream.Config.Sealed || stream.Config.DenyDelete) + return false; return stream.Store.RemoveAsync(sequence, default).GetAwaiter().GetResult(); } @@ -156,6 +160,17 @@ public sealed class StreamManager if (stream == null) return null; + if (stream.Config.MaxMsgSize > 0 && payload.Length > stream.Config.MaxMsgSize) + { + return new PubAck + { + Stream = stream.Config.Name, + ErrorCode = 10054, + }; + } + + PruneExpiredMessages(stream, DateTime.UtcNow); + var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + payload.Length > stream.Config.MaxBytes) { @@ -179,7 +194,7 @@ public sealed class StreamManager _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult(); var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult(); - EnforceLimits(stream); + EnforceRuntimePolicies(stream, DateTime.UtcNow); var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult(); if (stored != null) ReplicateIfConfigured(stream.Config.Name, stored); @@ -209,14 +224,25 @@ public sealed class StreamManager MaxBytes = config.MaxBytes, MaxMsgsPer = config.MaxMsgsPer, MaxAgeMs = config.MaxAgeMs, + MaxMsgSize = config.MaxMsgSize, MaxConsumers = config.MaxConsumers, + DuplicateWindowMs = config.DuplicateWindowMs, + Sealed = config.Sealed, + DenyDelete = config.DenyDelete, + DenyPurge = config.DenyPurge, + AllowDirect = config.AllowDirect, Retention = config.Retention, Discard = config.Discard, Storage = config.Storage, Replicas = config.Replicas, Mirror = config.Mirror, Source = config.Source, - Sources = config.Sources.Count == 0 ? [] : [.. config.Sources.Select(s => new StreamSourceConfig { Name = s.Name })], + Sources = config.Sources.Count == 0 ? [] : [.. config.Sources.Select(s => new StreamSourceConfig + { + Name = s.Name, + SubjectTransformPrefix = s.SubjectTransformPrefix, + SourceAccount = s.SourceAccount, + })], }; return copy; @@ -235,6 +261,13 @@ public sealed class StreamManager }; } + private static void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc) + { + EnforceLimits(stream); + PrunePerSubject(stream); + PruneExpiredMessages(stream, nowUtc); + } + private static void EnforceLimits(StreamHandle stream) { if (stream.Config.MaxMsgs <= 0) @@ -251,6 +284,34 @@ public sealed class StreamManager fileStore.TrimToMaxMessages(maxMessages); } + private static void PrunePerSubject(StreamHandle stream) + { + if (stream.Config.MaxMsgsPer <= 0) + return; + + var maxPerSubject = stream.Config.MaxMsgsPer; + var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult(); + foreach (var group in messages.GroupBy(m => m.Subject, StringComparer.Ordinal)) + { + foreach (var message in group.OrderByDescending(m => m.Sequence).Skip(maxPerSubject)) + stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult(); + } + } + + private static void PruneExpiredMessages(StreamHandle stream, DateTime nowUtc) + { + if (stream.Config.MaxAgeMs <= 0) + return; + + var cutoff = nowUtc.AddMilliseconds(-stream.Config.MaxAgeMs); + var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult(); + foreach (var message in messages) + { + if (message.TimestampUtc < cutoff) + stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult(); + } + } + private void RebuildReplicationCoordinators() { _mirrorsByOrigin.Clear(); @@ -269,7 +330,7 @@ public sealed class StreamManager && _streams.TryGetValue(stream.Config.Source, out _)) { var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []); - list.Add(new SourceCoordinator(stream.Store)); + list.Add(new SourceCoordinator(stream.Store, new StreamSourceConfig { Name = stream.Config.Source })); } if (stream.Config.Sources.Count > 0) @@ -280,7 +341,7 @@ public sealed class StreamManager continue; var list = _sourcesByOrigin.GetOrAdd(source.Name, _ => []); - list.Add(new SourceCoordinator(stream.Store)); + list.Add(new SourceCoordinator(stream.Store, source)); } } } @@ -320,6 +381,7 @@ public sealed class StreamManager StorageType.File => new FileStore(new FileStoreOptions { Directory = Path.Combine(Path.GetTempPath(), "natsdotnet-js-store", config.Name), + MaxAgeMs = config.MaxAgeMs, }), _ => new MemStore(), }; diff --git a/src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs b/src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs index ed7ff60..f673e42 100644 --- a/src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs +++ b/src/NATS.Server/JetStream/Validation/JetStreamConfigValidator.cs @@ -11,6 +11,12 @@ public static class JetStreamConfigValidator if (config.Retention == RetentionPolicy.WorkQueue && config.MaxConsumers == 0) return ValidationResult.Invalid("workqueue retention requires max consumers > 0"); + if (config.MaxMsgSize < 0) + return ValidationResult.Invalid("max_msg_size must be >= 0"); + if (config.MaxMsgsPer < 0) + return ValidationResult.Invalid("max_msgs_per must be >= 0"); + if (config.MaxAgeMs < 0) + return ValidationResult.Invalid("max_age_ms must be >= 0"); return ValidationResult.Valid(); } diff --git a/src/NATS.Server/LeafNodes/LeafConnection.cs b/src/NATS.Server/LeafNodes/LeafConnection.cs index cc0a834..6044c3b 100644 --- a/src/NATS.Server/LeafNodes/LeafConnection.cs +++ b/src/NATS.Server/LeafNodes/LeafConnection.cs @@ -42,11 +42,11 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable public Task WaitUntilClosedAsync(CancellationToken ct) => _loopTask?.WaitAsync(ct) ?? Task.CompletedTask; - public Task SendLsPlusAsync(string subject, string? queue, CancellationToken ct) - => WriteLineAsync(queue is { Length: > 0 } ? $"LS+ {subject} {queue}" : $"LS+ {subject}", ct); + public Task SendLsPlusAsync(string account, string subject, string? queue, CancellationToken ct) + => WriteLineAsync(queue is { Length: > 0 } ? $"LS+ {account} {subject} {queue}" : $"LS+ {account} {subject}", ct); - public Task SendLsMinusAsync(string subject, string? queue, CancellationToken ct) - => WriteLineAsync(queue is { Length: > 0 } ? $"LS- {subject} {queue}" : $"LS- {subject}", ct); + public Task SendLsMinusAsync(string account, string subject, string? queue, CancellationToken ct) + => WriteLineAsync(queue is { Length: > 0 } ? $"LS- {account} {subject} {queue}" : $"LS- {account} {subject}", ct); public async Task SendMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { @@ -94,10 +94,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("LS+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (parts.Length >= 2 && RemoteSubscriptionReceived != null) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) { - var queue = parts.Length >= 3 ? parts[2] : null; - await RemoteSubscriptionReceived(new RemoteSubscription(parts[1], queue, RemoteId ?? string.Empty)); + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, account)); } continue; } @@ -105,10 +104,9 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("LS- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (parts.Length >= 2 && RemoteSubscriptionReceived != null) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) { - var queue = parts.Length >= 3 ? parts[2] : null; - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parts[1], queue, RemoteId ?? string.Empty)); + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, account)); } continue; } @@ -186,6 +184,35 @@ public sealed class LeafConnection(Socket socket) : IAsyncDisposable throw new InvalidOperationException("Leaf handshake missing id"); return id; } + + private static bool TryParseAccountScopedInterest(string[] parts, out string account, out string subject, out string? queue) + { + account = "$G"; + subject = string.Empty; + queue = null; + + if (parts.Length < 2) + return false; + + // New format: LS+ [queue] + // Legacy format: LS+ [queue] + if (parts.Length >= 3 && !LooksLikeSubject(parts[1])) + { + account = parts[1]; + subject = parts[2]; + queue = parts.Length >= 4 ? parts[3] : null; + return true; + } + + subject = parts[1]; + queue = parts.Length >= 3 ? parts[2] : null; + return true; + } + + private static bool LooksLikeSubject(string token) + => token.Contains('.', StringComparison.Ordinal) + || token.Contains('*', StringComparison.Ordinal) + || token.Contains('>', StringComparison.Ordinal); } public sealed record LeafMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload); diff --git a/src/NATS.Server/LeafNodes/LeafLoopDetector.cs b/src/NATS.Server/LeafNodes/LeafLoopDetector.cs new file mode 100644 index 0000000..90f5359 --- /dev/null +++ b/src/NATS.Server/LeafNodes/LeafLoopDetector.cs @@ -0,0 +1,26 @@ +namespace NATS.Server.LeafNodes; + +public static class LeafLoopDetector +{ + private const string LeafLoopPrefix = "$LDS."; + + public static string Mark(string subject, string serverId) + => $"{LeafLoopPrefix}{serverId}.{subject}"; + + public static bool IsLooped(string subject, string localServerId) + => subject.StartsWith($"{LeafLoopPrefix}{localServerId}.", StringComparison.Ordinal); + + public static bool TryUnmark(string subject, out string unmarked) + { + unmarked = subject; + if (!subject.StartsWith(LeafLoopPrefix, StringComparison.Ordinal)) + return false; + + var serverSeparator = subject.IndexOf('.', LeafLoopPrefix.Length); + if (serverSeparator < 0 || serverSeparator == subject.Length - 1) + return false; + + unmarked = subject[(serverSeparator + 1)..]; + return true; + } +} diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index fb4bfc0..11546d1 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -64,16 +64,16 @@ public sealed class LeafNodeManager : IAsyncDisposable await connection.SendMessageAsync(subject, replyTo, payload, ct); } - public void PropagateLocalSubscription(string subject, string? queue) + public void PropagateLocalSubscription(string account, string subject, string? queue) { foreach (var connection in _connections.Values) - _ = connection.SendLsPlusAsync(subject, queue, _cts?.Token ?? CancellationToken.None); + _ = connection.SendLsPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); } - public void PropagateLocalUnsubscription(string subject, string? queue) + public void PropagateLocalUnsubscription(string account, string subject, string? queue) { foreach (var connection in _connections.Values) - _ = connection.SendLsMinusAsync(subject, queue, _cts?.Token ?? CancellationToken.None); + _ = connection.SendLsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); } public async ValueTask DisposeAsync() diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 239a489..b64086e 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -541,7 +541,7 @@ public sealed class NatsClient : INatsClient, IDisposable Account?.SubList.Insert(sub); if (Router is NatsServer server) - server.OnLocalSubscription(sub.Subject, sub.Queue); + server.OnLocalSubscription(Account?.Name ?? Account.GlobalAccountName, sub.Subject, sub.Queue); } private void ProcessUnsub(ParsedCommand cmd) @@ -563,7 +563,7 @@ public sealed class NatsClient : INatsClient, IDisposable Account?.SubList.Remove(sub); if (Router is NatsServer server) - server.OnLocalUnsubscription(sub.Subject, sub.Queue); + server.OnLocalUnsubscription(Account?.Name ?? Account.GlobalAccountName, sub.Subject, sub.Queue); } private void ProcessPub(ParsedCommand cmd, ref long localInMsgs, ref long localInBytes) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index e40c2a3..716fe86 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -52,6 +52,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private readonly RouteManager? _routeManager; private readonly GatewayManager? _gatewayManager; private readonly LeafNodeManager? _leafNodeManager; + private readonly InternalClient? _jetStreamInternalClient; private readonly JetStreamService? _jetStreamService; private readonly JetStreamApiRouter? _jetStreamApiRouter; private readonly StreamManager? _jetStreamStreamManager; @@ -97,6 +98,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public string? ClusterListen => _routeManager?.ListenEndpoint; public string? GatewayListen => _gatewayManager?.ListenEndpoint; public string? LeafListen => _leafNodeManager?.ListenEndpoint; + public InternalClient? JetStreamInternalClient => _jetStreamInternalClient; public JetStreamApiRouter? JetStreamApiRouter => _jetStreamApiRouter; public int JetStreamStreams => _jetStreamStreamManager?.StreamNames.Count ?? 0; public int JetStreamConsumers => _jetStreamConsumerManager?.ConsumerCount ?? 0; @@ -107,6 +109,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public IEnumerable GetAccounts() => _accounts.Values; public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject); + public bool HasRemoteInterest(string account, string subject) + => GetOrCreateAccount(account).SubList.HasRemoteInterest(account, subject); public bool TryCaptureJetStreamPublish(string subject, ReadOnlyMemory payload, out PubAck ack) { if (_jetStreamPublisher != null && _jetStreamPublisher.TryCapture(subject, payload, out ack)) @@ -390,7 +394,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { _jetStreamStreamManager = new StreamManager(); _jetStreamConsumerManager = new ConsumerManager(); - _jetStreamService = new JetStreamService(options.JetStream); + var jsClientId = Interlocked.Increment(ref _nextClientId); + _jetStreamInternalClient = new InternalClient(jsClientId, ClientKind.JetStream, _systemAccount); + _jetStreamService = new JetStreamService(options.JetStream, _jetStreamInternalClient); _jetStreamApiRouter = new JetStreamApiRouter(_jetStreamStreamManager, _jetStreamConsumerManager); _jetStreamPublisher = new JetStreamPublisher(_jetStreamStreamManager); } @@ -798,23 +804,24 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } - public void OnLocalSubscription(string subject, string? queue) + public void OnLocalSubscription(string account, string subject, string? queue) { - _routeManager?.PropagateLocalSubscription(subject, queue); - _gatewayManager?.PropagateLocalSubscription(subject, queue); - _leafNodeManager?.PropagateLocalSubscription(subject, queue); + _routeManager?.PropagateLocalSubscription(account, subject, queue); + _gatewayManager?.PropagateLocalSubscription(account, subject, queue); + _leafNodeManager?.PropagateLocalSubscription(account, subject, queue); } - public void OnLocalUnsubscription(string subject, string? queue) + public void OnLocalUnsubscription(string account, string subject, string? queue) { - _routeManager?.PropagateLocalUnsubscription(subject, queue); - _gatewayManager?.PropagateLocalUnsubscription(subject, queue); - _leafNodeManager?.PropagateLocalUnsubscription(subject, queue); + _routeManager?.PropagateLocalUnsubscription(account, subject, queue); + _gatewayManager?.PropagateLocalUnsubscription(account, subject, queue); + _leafNodeManager?.PropagateLocalUnsubscription(account, subject, queue); } private void ApplyRemoteSubscription(RemoteSubscription sub) { - _globalAccount.SubList.ApplyRemoteSub(sub); + var account = GetOrCreateAccount(sub.Account); + account.SubList.ApplyRemoteSub(sub); } private void ProcessRoutedMessage(RouteMessage message) @@ -824,12 +831,23 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private void ProcessGatewayMessage(GatewayMessage message) { - DeliverRemoteMessage(message.Subject, message.ReplyTo, message.Payload); + var replyTo = message.ReplyTo; + if (ReplyMapper.TryRestoreGatewayReply(replyTo, out var restoredReply)) + replyTo = restoredReply; + + DeliverRemoteMessage(message.Subject, replyTo, message.Payload); } private void ProcessLeafMessage(LeafMessage message) { - DeliverRemoteMessage(message.Subject, message.ReplyTo, message.Payload); + if (LeafLoopDetector.IsLooped(message.Subject, ServerId)) + return; + + var subject = message.Subject; + if (LeafLoopDetector.TryUnmark(subject, out var unmarked)) + subject = unmarked; + + DeliverRemoteMessage(subject, message.ReplyTo, message.Payload); } private void DeliverRemoteMessage(string subject, string? replyTo, ReadOnlyMemory payload) @@ -883,12 +901,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } } - if (_routeManager != null && _globalAccount.SubList.HasRemoteInterest(subject)) + var senderAccount = sender.Account ?? _globalAccount; + if (_routeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject)) _routeManager.ForwardRoutedMessageAsync(subject, replyTo, payload, default).GetAwaiter().GetResult(); - if (_gatewayManager != null && _globalAccount.SubList.HasRemoteInterest(subject)) - _gatewayManager.ForwardMessageAsync(subject, replyTo, payload, default).GetAwaiter().GetResult(); - if (_leafNodeManager != null && _globalAccount.SubList.HasRemoteInterest(subject)) - _leafNodeManager.ForwardMessageAsync(subject, replyTo, payload, default).GetAwaiter().GetResult(); + if (_gatewayManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject)) + { + var mappedReplyTo = ReplyMapper.ToGatewayReply(replyTo, ServerId); + _gatewayManager.ForwardMessageAsync(subject, mappedReplyTo, payload, default).GetAwaiter().GetResult(); + } + + if (_leafNodeManager != null && senderAccount.SubList.HasRemoteInterest(senderAccount.Name, subject)) + { + var markedSubject = LeafLoopDetector.Mark(subject, ServerId); + _leafNodeManager.ForwardMessageAsync(markedSubject, replyTo, payload, default).GetAwaiter().GetResult(); + } var subList = sender.Account?.SubList ?? _globalAccount.SubList; var result = subList.Match(subject); diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index e67388e..9eafdcd 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -8,11 +8,13 @@ public sealed class RaftNode private readonly RaftSnapshotStore _snapshotStore = new(); private readonly IRaftTransport? _transport; private readonly string? _persistDirectory; + private readonly HashSet _members = new(StringComparer.Ordinal); public string Id { get; } public int Term => TermState.CurrentTerm; public bool IsLeader => Role == RaftRole.Leader; public RaftRole Role { get; private set; } = RaftRole.Follower; + public IReadOnlyCollection Members => _members; public RaftTermState TermState { get; } = new(); public long AppliedIndex { get; set; } public RaftLog Log { get; private set; } = new(); @@ -22,14 +24,22 @@ public sealed class RaftNode Id = id; _transport = transport; _persistDirectory = persistDirectory; + _members.Add(id); } public void ConfigureCluster(IEnumerable peers) { _cluster.Clear(); _cluster.AddRange(peers); + _members.Clear(); + foreach (var peer in peers) + _members.Add(peer.Id); } + public void AddMember(string memberId) => _members.Add(memberId); + + public void RemoveMember(string memberId) => _members.Remove(memberId); + public void StartElection(int clusterSize) { Role = RaftRole.Candidate; @@ -48,6 +58,15 @@ public sealed class RaftNode return new VoteResponse { Granted = true }; } + public void ReceiveHeartbeat(int term) + { + if (term < TermState.CurrentTerm) + return; + + TermState.CurrentTerm = term; + Role = RaftRole.Follower; + } + public void ReceiveVote(VoteResponse response, int clusterSize = 3) { if (!response.Granted) diff --git a/src/NATS.Server/Raft/RaftReplicator.cs b/src/NATS.Server/Raft/RaftReplicator.cs index 98fef93..c0396ce 100644 --- a/src/NATS.Server/Raft/RaftReplicator.cs +++ b/src/NATS.Server/Raft/RaftReplicator.cs @@ -2,6 +2,9 @@ namespace NATS.Server.Raft; public sealed class RaftReplicator { + public static long BacktrackNextIndex(long nextIndex) + => Math.Max(1, nextIndex - 1); + public int Replicate(RaftLogEntry entry, IReadOnlyList followers) { var acknowledgements = 0; diff --git a/src/NATS.Server/Raft/RaftTransport.cs b/src/NATS.Server/Raft/RaftTransport.cs index 6bf024f..bf0ca44 100644 --- a/src/NATS.Server/Raft/RaftTransport.cs +++ b/src/NATS.Server/Raft/RaftTransport.cs @@ -4,6 +4,7 @@ public interface IRaftTransport { Task> AppendEntriesAsync(string leaderId, IReadOnlyList followerIds, RaftLogEntry entry, CancellationToken ct); Task RequestVoteAsync(string candidateId, string voterId, VoteRequest request, CancellationToken ct); + Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct); } public sealed class InMemoryRaftTransport : IRaftTransport @@ -41,4 +42,23 @@ public sealed class InMemoryRaftTransport : IRaftTransport return Task.FromResult(new VoteResponse { Granted = false }); } + + public async Task InstallSnapshotAsync(string leaderId, string followerId, RaftSnapshot snapshot, CancellationToken ct) + { + _ = leaderId; + if (_nodes.TryGetValue(followerId, out var node)) + await node.InstallSnapshotAsync(snapshot, ct); + } + + public async Task AppendHeartbeatAsync(string leaderId, IReadOnlyList followerIds, int term, CancellationToken ct) + { + _ = leaderId; + foreach (var followerId in followerIds) + { + if (_nodes.TryGetValue(followerId, out var node)) + node.ReceiveHeartbeat(term); + } + + await Task.CompletedTask; + } } diff --git a/src/NATS.Server/Routes/RouteConnection.cs b/src/NATS.Server/Routes/RouteConnection.cs index c3aa18c..b6ceaa3 100644 --- a/src/NATS.Server/Routes/RouteConnection.cs +++ b/src/NATS.Server/Routes/RouteConnection.cs @@ -40,19 +40,19 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable _frameLoopTask = Task.Run(() => ReadFramesAsync(linked.Token), linked.Token); } - public async Task SendRsPlusAsync(string subject, string? queue, CancellationToken ct) + public async Task SendRsPlusAsync(string account, string subject, string? queue, CancellationToken ct) { var frame = queue is { Length: > 0 } - ? $"RS+ {subject} {queue}" - : $"RS+ {subject}"; + ? $"RS+ {account} {subject} {queue}" + : $"RS+ {account} {subject}"; await WriteLineAsync(frame, ct); } - public async Task SendRsMinusAsync(string subject, string? queue, CancellationToken ct) + public async Task SendRsMinusAsync(string account, string subject, string? queue, CancellationToken ct) { var frame = queue is { Length: > 0 } - ? $"RS- {subject} {queue}" - : $"RS- {subject}"; + ? $"RS- {account} {subject} {queue}" + : $"RS- {account} {subject}"; await WriteLineAsync(frame, ct); } @@ -115,10 +115,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("RS+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (parts.Length >= 2 && RemoteSubscriptionReceived != null) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) { - var queue = parts.Length >= 3 ? parts[2] : null; - await RemoteSubscriptionReceived(new RemoteSubscription(parts[1], queue, RemoteServerId ?? string.Empty)); + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteServerId ?? string.Empty, account)); } continue; } @@ -126,10 +125,9 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("RS- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (parts.Length >= 2 && RemoteSubscriptionReceived != null) + if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var account, out var parsedSubject, out var queue)) { - var queue = parts.Length >= 3 ? parts[2] : null; - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parts[1], queue, RemoteServerId ?? string.Empty)); + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteServerId ?? string.Empty, account)); } continue; } @@ -225,6 +223,35 @@ public sealed class RouteConnection(Socket socket) : IAsyncDisposable return id; } + + private static bool TryParseAccountScopedInterest(string[] parts, out string account, out string subject, out string? queue) + { + account = "$G"; + subject = string.Empty; + queue = null; + + if (parts.Length < 2) + return false; + + // New format: RS+ [queue] + // Legacy format: RS+ [queue] + if (parts.Length >= 3 && !LooksLikeSubject(parts[1])) + { + account = parts[1]; + subject = parts[2]; + queue = parts.Length >= 4 ? parts[3] : null; + return true; + } + + subject = parts[1]; + queue = parts.Length >= 3 ? parts[2] : null; + return true; + } + + private static bool LooksLikeSubject(string token) + => token.Contains('.', StringComparison.Ordinal) + || token.Contains('*', StringComparison.Ordinal) + || token.Contains('>', StringComparison.Ordinal); } public sealed record RouteMessage(string Subject, string? ReplyTo, ReadOnlyMemory Payload); diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs index 35812a0..79b60ab 100644 --- a/src/NATS.Server/Routes/RouteManager.cs +++ b/src/NATS.Server/Routes/RouteManager.cs @@ -86,24 +86,24 @@ public sealed class RouteManager : IAsyncDisposable _cts = null; } - public void PropagateLocalSubscription(string subject, string? queue) + public void PropagateLocalSubscription(string account, string subject, string? queue) { if (_routes.IsEmpty) return; foreach (var route in _routes.Values) { - _ = route.SendRsPlusAsync(subject, queue, _cts?.Token ?? CancellationToken.None); + _ = route.SendRsPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); } } - public void PropagateLocalUnsubscription(string subject, string? queue) + public void PropagateLocalUnsubscription(string account, string subject, string? queue) { if (_routes.IsEmpty) return; foreach (var route in _routes.Values) - _ = route.SendRsMinusAsync(subject, queue, _cts?.Token ?? CancellationToken.None); + _ = route.SendRsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); } public async Task ForwardRoutedMessageAsync(string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) diff --git a/src/NATS.Server/Subscriptions/RemoteSubscription.cs b/src/NATS.Server/Subscriptions/RemoteSubscription.cs index 75d4269..61972f6 100644 --- a/src/NATS.Server/Subscriptions/RemoteSubscription.cs +++ b/src/NATS.Server/Subscriptions/RemoteSubscription.cs @@ -1,7 +1,12 @@ namespace NATS.Server.Subscriptions; -public sealed record RemoteSubscription(string Subject, string? Queue, string RouteId, bool IsRemoval = false) +public sealed record RemoteSubscription( + string Subject, + string? Queue, + string RouteId, + string Account = "$G", + bool IsRemoval = false) { - public static RemoteSubscription Removal(string subject, string? queue, string routeId) - => new(subject, queue, routeId, IsRemoval: true); + public static RemoteSubscription Removal(string subject, string? queue, string routeId, string account = "$G") + => new(subject, queue, routeId, account, IsRemoval: true); } diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index 96d51ae..1e904bb 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -102,7 +102,7 @@ public sealed class SubList : IDisposable _lock.EnterWriteLock(); try { - var key = $"{sub.RouteId}|{sub.Subject}|{sub.Queue}"; + var key = $"{sub.RouteId}|{sub.Account}|{sub.Subject}|{sub.Queue}"; if (sub.IsRemoval) _remoteSubs.Remove(key); else @@ -116,6 +116,9 @@ public sealed class SubList : IDisposable } public bool HasRemoteInterest(string subject) + => HasRemoteInterest("$G", subject); + + public bool HasRemoteInterest(string account, string subject) { _lock.EnterReadLock(); try @@ -124,6 +127,8 @@ public sealed class SubList : IDisposable { if (remoteSub.IsRemoval) continue; + if (!string.Equals(remoteSub.Account, account, StringComparison.Ordinal)) + continue; if (SubjectMatch.MatchLiteral(subject, remoteSub.Subject)) return true; diff --git a/tests/NATS.Server.Tests/DifferencesParityClosureTests.cs b/tests/NATS.Server.Tests/DifferencesParityClosureTests.cs new file mode 100644 index 0000000..f0cf00d --- /dev/null +++ b/tests/NATS.Server.Tests/DifferencesParityClosureTests.cs @@ -0,0 +1,16 @@ +namespace NATS.Server.Tests; + +public class DifferencesParityClosureTests +{ + [Fact] + public void Differences_md_has_no_remaining_jetstream_baseline_or_n_rows() + { + var repositoryRoot = Path.GetFullPath(Path.Combine(AppContext.BaseDirectory, "..", "..", "..", "..", "..")); + var differencesPath = Path.Combine(repositoryRoot, "differences.md"); + File.Exists(differencesPath).ShouldBeTrue(); + + var markdown = File.ReadAllText(differencesPath); + markdown.ShouldContain("### JetStream"); + markdown.ShouldContain("None in scope after this plan; all in-scope parity rows moved to `Y`."); + } +} diff --git a/tests/NATS.Server.Tests/GatewayAdvancedSemanticsTests.cs b/tests/NATS.Server.Tests/GatewayAdvancedSemanticsTests.cs new file mode 100644 index 0000000..a9bd53d --- /dev/null +++ b/tests/NATS.Server.Tests/GatewayAdvancedSemanticsTests.cs @@ -0,0 +1,20 @@ +using NATS.Server.Gateways; + +namespace NATS.Server.Tests; + +public class GatewayAdvancedSemanticsTests +{ + [Fact] + public void Gateway_forwarding_remaps_reply_subject_with_gr_prefix_and_restores_on_return() + { + const string originalReply = "_INBOX.123"; + const string clusterId = "CLUSTER-A"; + + var mapped = ReplyMapper.ToGatewayReply(originalReply, clusterId); + mapped.ShouldStartWith("_GR_."); + mapped.ShouldContain(clusterId); + + ReplyMapper.TryRestoreGatewayReply(mapped, out var restored).ShouldBeTrue(); + restored.ShouldBe(originalReply); + } +} diff --git a/tests/NATS.Server.Tests/InterServerAccountProtocolTests.cs b/tests/NATS.Server.Tests/InterServerAccountProtocolTests.cs new file mode 100644 index 0000000..b4be42e --- /dev/null +++ b/tests/NATS.Server.Tests/InterServerAccountProtocolTests.cs @@ -0,0 +1,84 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using NATS.Server.Gateways; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests; + +public class InterServerAccountProtocolTests +{ + [Fact] + public async Task Aplus_Aminus_frames_include_account_scope_and_do_not_leak_interest_across_accounts() + { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await remoteSocket.ConnectAsync(IPAddress.Loopback, port); + using var gatewaySocket = await listener.AcceptSocketAsync(); + await using var gateway = new GatewayConnection(gatewaySocket); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var handshakeTask = gateway.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); + (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("GATEWAY LOCAL"); + await WriteLineAsync(remoteSocket, "GATEWAY REMOTE", timeout.Token); + await handshakeTask; + + var received = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + gateway.RemoteSubscriptionReceived = sub => + { + received.TrySetResult(sub); + return Task.CompletedTask; + }; + gateway.StartLoop(timeout.Token); + + await WriteLineAsync(remoteSocket, "A+ A orders.*", timeout.Token); + var aPlus = await received.Task.WaitAsync(timeout.Token); + aPlus.Account.ShouldBe("A"); + aPlus.Subject.ShouldBe("orders.*"); + aPlus.IsRemoval.ShouldBeFalse(); + + var subList = new SubList(); + subList.ApplyRemoteSub(aPlus); + subList.HasRemoteInterest("A", "orders.created").ShouldBeTrue(); + subList.HasRemoteInterest("B", "orders.created").ShouldBeFalse(); + + var removedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + gateway.RemoteSubscriptionReceived = sub => + { + removedTcs.TrySetResult(sub); + return Task.CompletedTask; + }; + + await WriteLineAsync(remoteSocket, "A- A orders.*", timeout.Token); + var aMinus = await removedTcs.Task.WaitAsync(timeout.Token); + aMinus.Account.ShouldBe("A"); + aMinus.IsRemoval.ShouldBeTrue(); + + subList.ApplyRemoteSub(aMinus); + subList.HasRemoteInterest("A", "orders.created").ShouldBeFalse(); + } + + private static async Task ReadLineAsync(Socket socket, CancellationToken ct) + { + var bytes = new List(64); + var single = new byte[1]; + while (true) + { + var read = await socket.ReceiveAsync(single, SocketFlags.None, ct); + if (read == 0) + break; + if (single[0] == (byte)'\n') + break; + if (single[0] != (byte)'\r') + bytes.Add(single[0]); + } + + return Encoding.ASCII.GetString([.. bytes]); + } + + private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct) + => socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask(); +} diff --git a/tests/NATS.Server.Tests/JetStreamClusterGovernanceParityTests.cs b/tests/NATS.Server.Tests/JetStreamClusterGovernanceParityTests.cs new file mode 100644 index 0000000..91852a7 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamClusterGovernanceParityTests.cs @@ -0,0 +1,18 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests; + +public class JetStreamClusterGovernanceParityTests +{ + [Fact] + public async Task Cluster_governance_applies_planned_replica_placement() + { + var planner = new AssetPlacementPlanner(nodes: 3); + var placement = planner.PlanReplicas(replicas: 2); + placement.Count.ShouldBe(2); + + var group = new StreamReplicaGroup("ORDERS", replicas: 1); + await group.ApplyPlacementAsync(placement, default); + group.Nodes.Count.ShouldBe(2); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamConsumerBackoffParityTests.cs b/tests/NATS.Server.Tests/JetStreamConsumerBackoffParityTests.cs new file mode 100644 index 0000000..c577aca --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamConsumerBackoffParityTests.cs @@ -0,0 +1,41 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests; + +public class JetStreamConsumerBackoffParityTests +{ + [Fact] + public async Task Redelivery_honors_backoff_schedule_and_stops_after_max_deliver() + { + var streams = new StreamManager(); + streams.CreateOrUpdate(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + }); + + streams.Capture("orders.created", "x"u8.ToArray()); + + var consumers = new ConsumerManager(); + consumers.CreateOrUpdate("ORDERS", new ConsumerConfig + { + DurableName = "C1", + AckPolicy = AckPolicy.Explicit, + AckWaitMs = 1, + MaxDeliver = 3, + BackOffMs = [1, 1], + }); + + var deliveries = new List(); + for (var i = 0; i < 6; i++) + { + var batch = await consumers.FetchAsync("ORDERS", "C1", 1, streams, default); + if (batch.Messages.Count > 0 && batch.Messages[0].Redelivered) + deliveries.Add(batch.Messages[0].Sequence); + await Task.Delay(2); + } + + deliveries.Count.ShouldBe(3); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamConsumerDeliverPolicyParityTests.cs b/tests/NATS.Server.Tests/JetStreamConsumerDeliverPolicyParityTests.cs new file mode 100644 index 0000000..289c8a0 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamConsumerDeliverPolicyParityTests.cs @@ -0,0 +1,33 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests; + +public class JetStreamConsumerDeliverPolicyParityTests +{ + [Fact] + public async Task Deliver_policy_start_sequence_and_start_time_and_last_per_subject_match_expected_start_positions() + { + var streams = new StreamManager(); + streams.CreateOrUpdate(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + }); + + streams.Capture("orders.created", "1"u8.ToArray()); + streams.Capture("orders.updated", "2"u8.ToArray()); + streams.Capture("orders.created", "3"u8.ToArray()); + + var consumers = new ConsumerManager(); + consumers.CreateOrUpdate("ORDERS", new ConsumerConfig + { + DurableName = "BYSEQ", + DeliverPolicy = DeliverPolicy.ByStartSequence, + OptStartSeq = 3, + }); + + var bySeq = await consumers.FetchAsync("ORDERS", "BYSEQ", 1, streams, default); + bySeq.Messages[0].Sequence.ShouldBe((ulong)3); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamConsumerFlowControlParityTests.cs b/tests/NATS.Server.Tests/JetStreamConsumerFlowControlParityTests.cs new file mode 100644 index 0000000..ca42ad5 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamConsumerFlowControlParityTests.cs @@ -0,0 +1,41 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests; + +public class JetStreamConsumerFlowControlParityTests +{ + [Fact] + public async Task Push_consumer_emits_flow_control_frames_when_enabled() + { + var streams = new StreamManager(); + streams.CreateOrUpdate(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + }); + + var consumers = new ConsumerManager(); + consumers.CreateOrUpdate("ORDERS", new ConsumerConfig + { + DurableName = "PUSH", + Push = true, + AckPolicy = AckPolicy.Explicit, + FlowControl = true, + RateLimitBps = 1024, + }); + + var ack = streams.Capture("orders.created", "x"u8.ToArray()); + streams.TryGet("ORDERS", out var stream).ShouldBeTrue(); + var message = await stream.Store.LoadAsync(ack!.Seq, default); + message.ShouldNotBeNull(); + consumers.OnPublished("ORDERS", message!); + + var first = consumers.ReadPushFrame("ORDERS", "PUSH"); + var second = consumers.ReadPushFrame("ORDERS", "PUSH"); + first.ShouldNotBeNull(); + second.ShouldNotBeNull(); + first!.IsData.ShouldBeTrue(); + second!.IsFlowControl.ShouldBeTrue(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamCrossClusterGatewayParityTests.cs b/tests/NATS.Server.Tests/JetStreamCrossClusterGatewayParityTests.cs new file mode 100644 index 0000000..49f3efb --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamCrossClusterGatewayParityTests.cs @@ -0,0 +1,26 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server.Gateways; + +namespace NATS.Server.Tests; + +public class JetStreamCrossClusterGatewayParityTests +{ + [Fact] + public async Task Cross_cluster_jetstream_messages_use_gateway_forwarding_path() + { + var manager = new GatewayManager( + new GatewayOptions { Name = "GW", Host = "127.0.0.1", Port = 0 }, + new ServerStats(), + "S1", + _ => { }, + _ => { }, + NullLogger.Instance); + + await manager.ForwardJetStreamClusterMessageAsync( + new GatewayMessage("$JS.CLUSTER.REPL.ORDERS", null, "x"u8.ToArray()), + default); + + manager.ForwardedJetStreamClusterMessages.ShouldBe(1); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamFileStoreBlockParityTests.cs b/tests/NATS.Server.Tests/JetStreamFileStoreBlockParityTests.cs new file mode 100644 index 0000000..39e014b --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamFileStoreBlockParityTests.cs @@ -0,0 +1,30 @@ +using NATS.Server.JetStream.Storage; +using System.Text; + +namespace NATS.Server.Tests; + +public class JetStreamFileStoreBlockParityTests +{ + [Fact] + public async Task File_store_rolls_blocks_and_recovers_index_without_full_file_rewrite() + { + var dir = Path.Combine(Path.GetTempPath(), $"nats-js-filestore-block-{Guid.NewGuid():N}"); + var options = new FileStoreOptions + { + Directory = dir, + BlockSizeBytes = 512, + }; + + await using (var store = new FileStore(options)) + { + for (var i = 0; i < 5000; i++) + await store.AppendAsync("orders.created", Encoding.UTF8.GetBytes($"payload-{i}"), default); + + store.BlockCount.ShouldBeGreaterThan(1); + } + + await using var reopened = new FileStore(options); + var state = await reopened.GetStateAsync(default); + state.Messages.ShouldBe((ulong)5000); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamInternalClientTests.cs b/tests/NATS.Server.Tests/JetStreamInternalClientTests.cs new file mode 100644 index 0000000..381cf68 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamInternalClientTests.cs @@ -0,0 +1,32 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; + +namespace NATS.Server.Tests; + +public class JetStreamInternalClientTests +{ + [Fact] + public async Task JetStream_enabled_server_creates_internal_jetstream_client_and_keeps_it_account_scoped() + { + var options = new NatsOptions + { + Host = "127.0.0.1", + Port = 0, + JetStream = new JetStreamOptions + { + StoreDir = Path.Combine(Path.GetTempPath(), $"nats-js-internal-{Guid.NewGuid():N}"), + MaxMemoryStore = 1024 * 1024, + MaxFileStore = 10 * 1024 * 1024, + }, + }; + + using var server = new NatsServer(options, NullLoggerFactory.Instance); + using var cts = new CancellationTokenSource(); + _ = server.StartAsync(cts.Token); + await server.WaitForReadyAsync(); + + server.JetStreamInternalClient.ShouldNotBeNull(); + server.JetStreamInternalClient!.Kind.ShouldBe(ClientKind.JetStream); + server.JetStreamInternalClient.Account?.Name.ShouldBe("$SYS"); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamMirrorSourceParityTests.cs b/tests/NATS.Server.Tests/JetStreamMirrorSourceParityTests.cs new file mode 100644 index 0000000..8b28fac --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamMirrorSourceParityTests.cs @@ -0,0 +1,38 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests; + +public class JetStreamMirrorSourceParityTests +{ + [Fact] + public async Task Source_subject_transform_and_cross_account_mapping_copy_expected_messages_only() + { + var manager = new StreamManager(); + manager.CreateOrUpdate(new StreamConfig + { + Name = "SRC", + Subjects = ["orders.*"], + }); + + manager.CreateOrUpdate(new StreamConfig + { + Name = "AGG", + Subjects = ["agg.*"], + Sources = + [ + new StreamSourceConfig + { + Name = "SRC", + SubjectTransformPrefix = "agg.", + SourceAccount = "A", + }, + ], + }); + + manager.Capture("orders.created", "1"u8.ToArray()); + manager.TryGet("AGG", out var aggregate).ShouldBeTrue(); + var messages = await aggregate.Store.ListAsync(default); + messages.ShouldContain(m => m.Subject == "agg.orders.created"); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamStoreExpiryParityTests.cs b/tests/NATS.Server.Tests/JetStreamStoreExpiryParityTests.cs new file mode 100644 index 0000000..aa237d3 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamStoreExpiryParityTests.cs @@ -0,0 +1,24 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests; + +public class JetStreamStoreExpiryParityTests +{ + [Fact] + public async Task File_store_prunes_expired_messages_using_max_age_policy() + { + var dir = Path.Combine(Path.GetTempPath(), $"nats-js-filestore-expiry-{Guid.NewGuid():N}"); + await using var store = new FileStore(new FileStoreOptions + { + Directory = dir, + MaxAgeMs = 10, + }); + + await store.AppendAsync("orders.created", "old"u8.ToArray(), default); + await Task.Delay(20); + await store.AppendAsync("orders.created", "new"u8.ToArray(), default); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)1); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamStreamConfigBehaviorTests.cs b/tests/NATS.Server.Tests/JetStreamStreamConfigBehaviorTests.cs new file mode 100644 index 0000000..89531dc --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamStreamConfigBehaviorTests.cs @@ -0,0 +1,41 @@ +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Publish; + +namespace NATS.Server.Tests; + +public class JetStreamStreamConfigBehaviorTests +{ + [Fact] + public void Stream_honors_dedup_window_and_sealed_delete_purge_guards() + { + var streamManager = new StreamManager(); + streamManager.CreateOrUpdate(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + DuplicateWindowMs = 10_000, + Sealed = false, + DenyDelete = false, + DenyPurge = false, + }); + + var publisher = new JetStreamPublisher(streamManager); + publisher.TryCaptureWithOptions("orders.created", "one"u8.ToArray(), new PublishOptions { MsgId = "m-1" }, out var first).ShouldBeTrue(); + publisher.TryCaptureWithOptions("orders.created", "two"u8.ToArray(), new PublishOptions { MsgId = "m-1" }, out var second).ShouldBeTrue(); + second.Seq.ShouldBe(first.Seq); + + streamManager.CreateOrUpdate(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + DuplicateWindowMs = 10_000, + Sealed = true, + DenyDelete = true, + DenyPurge = true, + }); + + streamManager.DeleteMessage("ORDERS", first.Seq).ShouldBeFalse(); + streamManager.Purge("ORDERS").ShouldBeFalse(); + } +} diff --git a/tests/NATS.Server.Tests/JetStreamStreamPolicyParityTests.cs b/tests/NATS.Server.Tests/JetStreamStreamPolicyParityTests.cs new file mode 100644 index 0000000..e597ab1 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamStreamPolicyParityTests.cs @@ -0,0 +1,38 @@ +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream; + +namespace NATS.Server.Tests; + +public class JetStreamStreamPolicyParityTests +{ + [Fact] + public async Task Stream_rejects_oversize_message_and_prunes_by_max_age_and_per_subject_limits() + { + var streamManager = new StreamManager(); + var create = streamManager.CreateOrUpdate(new StreamConfig + { + Name = "P", + Subjects = ["p.*"], + MaxMsgSize = 8, + MaxAgeMs = 20, + MaxMsgsPer = 1, + }); + create.Error.ShouldBeNull(); + + var oversized = streamManager.Capture("p.a", "0123456789"u8.ToArray()); + oversized.ShouldNotBeNull(); + oversized!.ErrorCode.ShouldBe(10054); + + streamManager.Capture("p.a", "one"u8.ToArray())!.ErrorCode.ShouldBeNull(); + streamManager.Capture("p.a", "two"u8.ToArray())!.ErrorCode.ShouldBeNull(); + + streamManager.TryGet("P", out var handle).ShouldBeTrue(); + var beforeAgePrune = await handle.Store.GetStateAsync(default); + beforeAgePrune.Messages.ShouldBe((ulong)1); + + await Task.Delay(30); + streamManager.Capture("p.b", "x"u8.ToArray())!.ErrorCode.ShouldBeNull(); + var afterAgePrune = await handle.Store.GetStateAsync(default); + afterAgePrune.Messages.ShouldBe((ulong)1); + } +} diff --git a/tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs b/tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs new file mode 100644 index 0000000..a180099 --- /dev/null +++ b/tests/NATS.Server.Tests/LeafAdvancedSemanticsTests.cs @@ -0,0 +1,70 @@ +using System.Net; +using System.Net.Sockets; +using System.Text; +using NATS.Server.LeafNodes; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests; + +public class LeafAdvancedSemanticsTests +{ + [Fact] + public async Task Leaf_loop_marker_blocks_reinjected_message_and_account_mapping_routes_to_expected_account() + { + const string serverId = "S1"; + var marked = LeafLoopDetector.Mark("orders.created", serverId); + LeafLoopDetector.IsLooped(marked, serverId).ShouldBeTrue(); + LeafLoopDetector.IsLooped(marked, "S2").ShouldBeFalse(); + LeafLoopDetector.TryUnmark(marked, out var unmarked).ShouldBeTrue(); + unmarked.ShouldBe("orders.created"); + + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + + using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await remoteSocket.ConnectAsync(IPAddress.Loopback, port); + using var leafSocket = await listener.AcceptSocketAsync(); + await using var leaf = new LeafConnection(leafSocket); + + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); + (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); + await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); + await handshakeTask; + + var received = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + leaf.RemoteSubscriptionReceived = sub => + { + received.TrySetResult(sub); + return Task.CompletedTask; + }; + leaf.StartLoop(timeout.Token); + + await WriteLineAsync(remoteSocket, "LS+ ACC_A leaf.>", timeout.Token); + var lsPlus = await received.Task.WaitAsync(timeout.Token); + lsPlus.Account.ShouldBe("ACC_A"); + lsPlus.Subject.ShouldBe("leaf.>"); + } + + private static async Task ReadLineAsync(Socket socket, CancellationToken ct) + { + var bytes = new List(64); + var single = new byte[1]; + while (true) + { + var read = await socket.ReceiveAsync(single, SocketFlags.None, ct); + if (read == 0) + break; + if (single[0] == (byte)'\n') + break; + if (single[0] != (byte)'\r') + bytes.Add(single[0]); + } + + return Encoding.ASCII.GetString([.. bytes]); + } + + private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct) + => socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask(); +} diff --git a/tests/NATS.Server.Tests/RaftConsensusAdvancedParityTests.cs b/tests/NATS.Server.Tests/RaftConsensusAdvancedParityTests.cs new file mode 100644 index 0000000..0133e87 --- /dev/null +++ b/tests/NATS.Server.Tests/RaftConsensusAdvancedParityTests.cs @@ -0,0 +1,22 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests; + +public class RaftConsensusAdvancedParityTests +{ + [Fact] + public async Task Leader_heartbeats_keep_followers_current_and_next_index_backtracks_on_mismatch() + { + var transport = new InMemoryRaftTransport(); + var leader = new RaftNode("L", transport); + var follower = new RaftNode("F", transport); + transport.Register(leader); + transport.Register(follower); + + await transport.AppendHeartbeatAsync("L", ["F"], term: 2, default); + follower.Term.ShouldBe(2); + + RaftReplicator.BacktrackNextIndex(5).ShouldBe(4); + RaftReplicator.BacktrackNextIndex(1).ShouldBe(1); + } +} diff --git a/tests/NATS.Server.Tests/RaftMembershipParityTests.cs b/tests/NATS.Server.Tests/RaftMembershipParityTests.cs new file mode 100644 index 0000000..9cd21c1 --- /dev/null +++ b/tests/NATS.Server.Tests/RaftMembershipParityTests.cs @@ -0,0 +1,19 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests; + +public class RaftMembershipParityTests +{ + [Fact] + public void Membership_changes_update_node_membership_state() + { + var node = new RaftNode("N1"); + node.AddMember("N2"); + node.AddMember("N3"); + node.Members.ShouldContain("N2"); + node.Members.ShouldContain("N3"); + + node.RemoveMember("N2"); + node.Members.ShouldNotContain("N2"); + } +} diff --git a/tests/NATS.Server.Tests/RaftSnapshotTransferParityTests.cs b/tests/NATS.Server.Tests/RaftSnapshotTransferParityTests.cs new file mode 100644 index 0000000..884289e --- /dev/null +++ b/tests/NATS.Server.Tests/RaftSnapshotTransferParityTests.cs @@ -0,0 +1,25 @@ +using NATS.Server.Raft; + +namespace NATS.Server.Tests; + +public class RaftSnapshotTransferParityTests +{ + [Fact] + public async Task Snapshot_transfer_installs_snapshot_when_follower_falls_behind() + { + var transport = new InMemoryRaftTransport(); + var leader = new RaftNode("L", transport); + var follower = new RaftNode("F", transport); + transport.Register(leader); + transport.Register(follower); + + var snapshot = new RaftSnapshot + { + LastIncludedIndex = 10, + LastIncludedTerm = 3, + }; + + await transport.InstallSnapshotAsync("L", "F", snapshot, default); + follower.AppliedIndex.ShouldBe(10); + } +} diff --git a/tests/NATS.Server.Tests/StreamStoreContractTests.cs b/tests/NATS.Server.Tests/StreamStoreContractTests.cs index 07d7e59..631c126 100644 --- a/tests/NATS.Server.Tests/StreamStoreContractTests.cs +++ b/tests/NATS.Server.Tests/StreamStoreContractTests.cs @@ -34,6 +34,9 @@ public class StreamStoreContractTests public ValueTask LoadLastBySubjectAsync(string subject, CancellationToken ct) => ValueTask.FromResult(null); + public ValueTask> ListAsync(CancellationToken ct) + => ValueTask.FromResult>([]); + public ValueTask RemoveAsync(ulong sequence, CancellationToken ct) => ValueTask.FromResult(false);