From b9f6a8cc0bdb65510c4722d505b07ddbfa0d9532 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 11:21:21 -0500 Subject: [PATCH] feat: add cluster-aware pending request tracking for pull consumers (Gap 3.14) Adds ProposeWaitingRequest, RegisterClusterPending, RemoveClusterPending, GetClusterPendingRequests, and ClusterPendingCount to PullConsumerEngine, backed by a ConcurrentDictionary keyed by reply subject. Includes 10 xUnit tests covering quorum checks, pending tracking, and concurrent access patterns. --- .../JetStream/Consumers/PullConsumerEngine.cs | 56 +++++ .../Consumers/ClusterPendingRequestTests.cs | 198 ++++++++++++++++++ 2 files changed, 254 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Consumers/ClusterPendingRequestTests.cs diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index 4ec49ae..a064762 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -1,3 +1,5 @@ +using System.Collections.Concurrent; +using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.Storage; using NATS.Server.JetStream.Models; using NATS.Server.Subscriptions; @@ -93,6 +95,60 @@ public sealed class CompiledFilter public sealed class PullConsumerEngine { + // Go: consumer.go — cluster-wide pending pull request tracking keyed by reply subject. + // Reference: golang/nats-server/server/consumer.go waitingRequestsPending / proposeWaitingRequest + private readonly ConcurrentDictionary _clusterPending = + new(StringComparer.Ordinal); + + /// + /// Number of pending pull requests currently tracked across the cluster. + /// Go reference: consumer.go — cluster pending count (waitingRequestsPending). + /// + public int ClusterPendingCount => _clusterPending.Count; + + /// + /// Proposes a waiting pull request through the consumer's RAFT group. + /// Returns true if quorum is available and the request was registered; false otherwise. + /// Go reference: consumer.go proposeWaitingRequest — propose via consumer RAFT group. + /// + public bool ProposeWaitingRequest(PullWaitingRequest request, RaftGroup group) + { + if (!group.HasQuorum(group.Peers.Count)) + return false; + + var replyKey = request.Reply ?? string.Empty; + _clusterPending[replyKey] = request; + return true; + } + + /// + /// Registers a pull request in the cluster pending tracker, keyed by reply subject. + /// Go reference: consumer.go — cluster pending registration on proposal acceptance. + /// + public void RegisterClusterPending(PullWaitingRequest request) + { + var replyKey = request.Reply ?? string.Empty; + _clusterPending[replyKey] = request; + } + + /// + /// Removes and returns a pending pull request by its reply subject. + /// Returns null if no request is registered for that reply subject. + /// Go reference: consumer.go — cluster pending removal on fulfillment or expiry. + /// + public PullWaitingRequest? RemoveClusterPending(string replySubject) + { + _clusterPending.TryRemove(replySubject, out var request); + return request; + } + + /// + /// Returns all currently pending pull requests tracked across the cluster. + /// Go reference: consumer.go — enumerate waitingRequestsPending for expiry sweep. + /// + public IReadOnlyCollection GetClusterPendingRequests() + => _clusterPending.Values.ToArray(); + public async ValueTask FetchAsync(StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct) => await FetchAsync(stream, consumer, new PullFetchRequest { Batch = batch }, ct); diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/ClusterPendingRequestTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/ClusterPendingRequestTests.cs new file mode 100644 index 0000000..036e8c3 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/ClusterPendingRequestTests.cs @@ -0,0 +1,198 @@ +using NATS.Server.JetStream.Consumers; +using NATS.Server.JetStream.Cluster; +using Shouldly; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Tests for cluster-aware pending pull request tracking in PullConsumerEngine. +/// Go reference: consumer.go proposeWaitingRequest / waitingRequestsPending — cluster-wide +/// pending pull request coordination via the consumer RAFT group. +/// golang/nats-server/server/consumer.go proposeWaitingRequest +/// +public class ClusterPendingRequestTests +{ + // --------------------------------------------------------------- + // ProposeWaitingRequest + // --------------------------------------------------------------- + + [Fact] + public void ProposeWaitingRequest_with_quorum_returns_true() + { + // Go: consumer.go proposeWaitingRequest — only propose when quorum available. + var engine = new PullConsumerEngine(); + var group = new RaftGroup + { + Name = "test-group", + Peers = ["peer-1", "peer-2", "peer-3"], + }; + var request = new PullWaitingRequest { Batch = 10, Reply = "reply.test.1" }; + + var result = engine.ProposeWaitingRequest(request, group); + + result.ShouldBeTrue(); + } + + [Fact] + public void ProposeWaitingRequest_without_quorum_returns_false() + { + // Go: consumer.go proposeWaitingRequest — no quorum (0 peers means quorum = 1, but 0 < 1). + var engine = new PullConsumerEngine(); + var group = new RaftGroup + { + Name = "empty-group", + Peers = [], + }; + var request = new PullWaitingRequest { Batch = 5, Reply = "reply.noquorum" }; + + var result = engine.ProposeWaitingRequest(request, group); + + result.ShouldBeFalse(); + } + + [Fact] + public void ProposeWaitingRequest_registers_in_cluster_pending() + { + // Go: consumer.go — after a successful proposal, the request must appear in the + // cluster pending map so it can be fulfilled or expired. + var engine = new PullConsumerEngine(); + var group = new RaftGroup + { + Name = "test-group", + Peers = ["peer-1", "peer-2", "peer-3"], + }; + var request = new PullWaitingRequest { Batch = 4, Reply = "reply.reg" }; + + engine.ProposeWaitingRequest(request, group); + + var pending = engine.GetClusterPendingRequests(); + pending.ShouldContain(r => r.Reply == "reply.reg"); + } + + [Fact] + public void Multiple_proposals_tracked_independently() + { + // Go: consumer.go — each reply subject is an independent pending slot; + // proposals with different reply subjects must not overwrite each other. + var engine = new PullConsumerEngine(); + var group = new RaftGroup + { + Name = "test-group", + Peers = ["peer-1", "peer-2", "peer-3"], + }; + + engine.ProposeWaitingRequest(new PullWaitingRequest { Batch = 1, Reply = "reply.A" }, group); + engine.ProposeWaitingRequest(new PullWaitingRequest { Batch = 2, Reply = "reply.B" }, group); + engine.ProposeWaitingRequest(new PullWaitingRequest { Batch = 3, Reply = "reply.C" }, group); + + engine.ClusterPendingCount.ShouldBe(3); + var pending = engine.GetClusterPendingRequests(); + pending.ShouldContain(r => r.Reply == "reply.A" && r.Batch == 1); + pending.ShouldContain(r => r.Reply == "reply.B" && r.Batch == 2); + pending.ShouldContain(r => r.Reply == "reply.C" && r.Batch == 3); + } + + // --------------------------------------------------------------- + // ClusterPendingCount + // --------------------------------------------------------------- + + [Fact] + public void ClusterPendingCount_tracks_pending_requests() + { + // Go: consumer.go — ClusterPendingCount reflects the current size of the pending map. + var engine = new PullConsumerEngine(); + engine.ClusterPendingCount.ShouldBe(0); + + engine.RegisterClusterPending(new PullWaitingRequest { Batch = 1, Reply = "r1" }); + engine.RegisterClusterPending(new PullWaitingRequest { Batch = 2, Reply = "r2" }); + engine.RegisterClusterPending(new PullWaitingRequest { Batch = 3, Reply = "r3" }); + + engine.ClusterPendingCount.ShouldBe(3); + } + + [Fact] + public void ClusterPendingCount_decrements_on_remove() + { + // Go: consumer.go — removing a request via reply subject decrements the pending count. + var engine = new PullConsumerEngine(); + engine.RegisterClusterPending(new PullWaitingRequest { Batch = 5, Reply = "decrement.reply" }); + engine.ClusterPendingCount.ShouldBe(1); + + engine.RemoveClusterPending("decrement.reply"); + + engine.ClusterPendingCount.ShouldBe(0); + } + + // --------------------------------------------------------------- + // RegisterClusterPending + // --------------------------------------------------------------- + + [Fact] + public void RegisterClusterPending_adds_request_by_reply() + { + // Go: consumer.go — pending requests are keyed by reply subject for O(1) lookup. + var engine = new PullConsumerEngine(); + var request = new PullWaitingRequest { Batch = 7, Reply = "register.reply.subject" }; + + engine.RegisterClusterPending(request); + + var retrieved = engine.RemoveClusterPending("register.reply.subject"); + retrieved.ShouldNotBeNull(); + retrieved.Batch.ShouldBe(7); + retrieved.Reply.ShouldBe("register.reply.subject"); + } + + // --------------------------------------------------------------- + // RemoveClusterPending + // --------------------------------------------------------------- + + [Fact] + public void RemoveClusterPending_returns_and_removes_request() + { + // Go: consumer.go — RemoveClusterPending both returns the request and removes it + // from the map so it is not fulfilled twice. + var engine = new PullConsumerEngine(); + engine.RegisterClusterPending(new PullWaitingRequest { Batch = 3, Reply = "remove.me" }); + + var removed = engine.RemoveClusterPending("remove.me"); + + removed.ShouldNotBeNull(); + removed.Reply.ShouldBe("remove.me"); + engine.ClusterPendingCount.ShouldBe(0); + // Second removal should return null — the entry is gone. + engine.RemoveClusterPending("remove.me").ShouldBeNull(); + } + + [Fact] + public void RemoveClusterPending_returns_null_for_unknown() + { + // Go: consumer.go — attempting to remove an unknown reply subject is a no-op. + var engine = new PullConsumerEngine(); + + var result = engine.RemoveClusterPending("does.not.exist"); + + result.ShouldBeNull(); + } + + // --------------------------------------------------------------- + // GetClusterPendingRequests + // --------------------------------------------------------------- + + [Fact] + public void GetClusterPendingRequests_returns_all_pending() + { + // Go: consumer.go — GetClusterPendingRequests is used for expiry sweeps and + // diagnostics; it must return every currently pending request. + var engine = new PullConsumerEngine(); + engine.RegisterClusterPending(new PullWaitingRequest { Batch = 1, Reply = "bulk.a" }); + engine.RegisterClusterPending(new PullWaitingRequest { Batch = 2, Reply = "bulk.b" }); + engine.RegisterClusterPending(new PullWaitingRequest { Batch = 3, Reply = "bulk.c" }); + + var all = engine.GetClusterPendingRequests(); + + all.Count.ShouldBe(3); + all.Select(r => r.Reply).ShouldContain("bulk.a"); + all.Select(r => r.Reply).ShouldContain("bulk.b"); + all.Select(r => r.Reply).ShouldContain("bulk.c"); + } +}