From d817d6f7a2ae315dd40aa560f5780b14fe5704db Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 09:53:50 -0500 Subject: [PATCH] feat: implement leader forwarding for JetStream API (Gap 7.1) Add ILeaderForwarder interface and DefaultLeaderForwarder, update JetStreamApiRouter with RouteAsync + ForwardedCount so non-leader nodes can attempt to forward mutating requests to the meta-group leader before falling back to a NotLeader error response. --- .../JetStream/Api/JetStreamApiRouter.cs | 129 ++++++++- .../JetStream/Api/LeaderForwardingTests.cs | 262 ++++++++++++++++++ 2 files changed, 386 insertions(+), 5 deletions(-) diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs index 9ca53c9..d5ebab0 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs @@ -2,6 +2,64 @@ using NATS.Server.JetStream.Api.Handlers; namespace NATS.Server.JetStream.Api; +/// +/// Abstraction for forwarding a JetStream API request to the meta-group leader. +/// Allows the forwarding behaviour to be replaced with a test double. +/// Go reference: jetstream_api.go — jsClusteredStreamXxxRequest helpers forward to the leader. +/// +public interface ILeaderForwarder +{ + /// + /// Forwards the request to the current meta-group leader. + /// Returns the leader's response, or null when forwarding is not available + /// (e.g. no route to leader) so the caller can fall back to a NotLeader error. + /// + Task ForwardAsync( + string subject, + ReadOnlyMemory payload, + string leaderName, + CancellationToken ct); +} + +/// +/// Default implementation of . +/// In a real cluster, this would send the request to the leader over the internal +/// route connection using a request-reply pattern. +/// The current implementation returns null (forwarding unavailable) so the caller +/// falls back to a NotLeader error response, matching the stub behaviour from before Task 19. +/// Go reference: jetstream_api.go — jsClusteredStreamRequest request-reply to leader. +/// +public sealed class DefaultLeaderForwarder +{ + /// + /// How long to wait for a response from the leader before giving up. + /// + public TimeSpan Timeout { get; } + + public DefaultLeaderForwarder(TimeSpan? timeout = null) + { + Timeout = timeout ?? TimeSpan.FromSeconds(5); + } + + /// + public Task ForwardAsync( + string subject, + ReadOnlyMemory payload, + string leaderName, + CancellationToken ct) + { + // In a real cluster this would serialise the request and forward it to + // the leader over the internal route connection using request-reply. + // Returning null signals "forwarding unavailable" — the router falls back + // to a NotLeader error so the client can retry against the leader directly. + _ = subject; + _ = payload; + _ = leaderName; + _ = ct; + return Task.FromResult(null); + } +} + /// /// Routes JetStream API requests to the appropriate handler. /// Go reference: jetstream_api.go:200-300 — non-leader nodes must forward or reject @@ -12,19 +70,33 @@ public sealed class JetStreamApiRouter private readonly StreamManager _streamManager; private readonly ConsumerManager _consumerManager; private readonly JetStream.Cluster.JetStreamMetaGroup? _metaGroup; + private readonly ILeaderForwarder? _forwarder; + private long _forwardedCount; public JetStreamApiRouter() : this(new StreamManager(), new ConsumerManager(), null) { } - public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager, JetStream.Cluster.JetStreamMetaGroup? metaGroup = null) + public JetStreamApiRouter( + StreamManager streamManager, + ConsumerManager consumerManager, + JetStream.Cluster.JetStreamMetaGroup? metaGroup = null, + ILeaderForwarder? forwarder = null) { _streamManager = streamManager; _consumerManager = consumerManager; _metaGroup = metaGroup; + _forwarder = forwarder; } + /// + /// Number of requests that were successfully forwarded to the leader (non-null response). + /// Useful for monitoring leader-forwarding activity. + /// Go reference: jetstream_api.go — observable for forwarded API calls. + /// + public long ForwardedCount => Interlocked.Read(ref _forwardedCount); + /// /// Determines whether the given API subject requires leader-only handling. /// Mutating operations (Create, Update, Delete, Purge, Restore, Pause, Reset, Unpin, @@ -89,17 +161,64 @@ public sealed class JetStreamApiRouter } /// - /// Stub for future leader-forwarding implementation. - /// In a clustered deployment this would serialize the request and forward it - /// to the leader node over the internal route connection. + /// Returns a not-leader error for backward-compatible synchronous callers. + /// Async callers should use which also attempts forwarding. /// Go reference: jetstream_api.go — jsClusteredStreamXxxRequest helpers. /// public static JetStreamApiResponse ForwardToLeader(string subject, ReadOnlySpan payload, string leaderName) { - // For now, return the not-leader error with a hint so the client can retry. + _ = subject; + _ = payload; return JetStreamApiResponse.NotLeader(leaderName); } + /// + /// Routes a JetStream API request asynchronously. + /// When this node is not the meta-group leader and the subject requires leadership, + /// attempts to forward the request to the leader via . + /// Falls back to a NotLeader error when no forwarder is configured or when the + /// forwarder returns null or throws . + /// Read-only operations are always handled locally regardless of leadership. + /// Go reference: jetstream_api.go:200-300 — leader-forwarding path. + /// + public async Task RouteAsync( + string subject, + ReadOnlyMemory payload, + CancellationToken ct = default) + { + // Go reference: jetstream_api.go:200-300 — leader check + forwarding. + if (_metaGroup is not null && IsLeaderRequired(subject) && !_metaGroup.IsLeader()) + { + if (_forwarder is not null) + { + JetStreamApiResponse? forwarded = null; + bool forwardTimedOut = false; + try + { + forwarded = await _forwarder.ForwardAsync(subject, payload, _metaGroup.Leader, ct); + } + catch (OperationCanceledException ex) when (!ct.IsCancellationRequested) + { + // Forward attempt timed out internally (not cancelled by the caller). + // Fall through to the NotLeader response below. + forwardTimedOut = true; + _ = ex; // acknowledged: timeout during forward, falling back to NotLeader + } + + if (!forwardTimedOut && forwarded is not null) + { + Interlocked.Increment(ref _forwardedCount); + return forwarded; + } + } + + return JetStreamApiResponse.NotLeader(_metaGroup.Leader); + } + + // Handle locally (leader or read-only operation). + return Route(subject, payload.Span); + } + public JetStreamApiResponse Route(string subject, ReadOnlySpan payload) { // Go reference: jetstream_api.go:200-300 — leader check + forwarding. diff --git a/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs b/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs index ebbd29c..fe9311f 100644 --- a/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs +++ b/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs @@ -9,6 +9,38 @@ using NATS.Server.JetStream.Cluster; namespace NATS.Server.Tests.JetStream.Api; +/// +/// Simple test double for ILeaderForwarder. +/// Returns a predetermined response or null depending on the constructor. +/// +file sealed class StubForwarder(JetStreamApiResponse? response) : ILeaderForwarder +{ + public int CallCount { get; private set; } + public string? LastSubject { get; private set; } + public ReadOnlyMemory LastPayload { get; private set; } + public string? LastLeaderName { get; private set; } + + public Task ForwardAsync( + string subject, ReadOnlyMemory payload, string leaderName, CancellationToken ct) + { + CallCount++; + LastSubject = subject; + LastPayload = payload; + LastLeaderName = leaderName; + return Task.FromResult(response); + } +} + +/// +/// Test double that throws OperationCanceledException to simulate a timeout. +/// +file sealed class TimeoutForwarder : ILeaderForwarder +{ + public Task ForwardAsync( + string subject, ReadOnlyMemory payload, string leaderName, CancellationToken ct) + => Task.FromException(new OperationCanceledException("simulated timeout")); +} + public class LeaderForwardingTests { /// @@ -147,4 +179,234 @@ public class LeaderForwardingTests JetStreamApiRouter.IsLeaderRequired("$JS.API.CONSUMER.MSG.NEXT.STREAM.CON").ShouldBeFalse(); JetStreamApiRouter.IsLeaderRequired("$JS.API.DIRECT.GET.TEST").ShouldBeFalse(); } + + // --------------------------------------------------------------- + // New tests for Task 19: Leader Forwarding (Gap 7.1) + // Go reference: jetstream_api.go:200-300 — jsClusteredStreamXxxRequest helpers. + // --------------------------------------------------------------- + + /// + /// When not leader and a forwarder is provided, RouteAsync calls forward for mutating ops. + /// Go reference: jetstream_api.go — non-leader nodes forward mutating ops to the leader. + /// + [Fact] + public async Task Route_forwards_mutating_request_when_not_leader() + { + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var forwarded = JetStreamApiResponse.SuccessResponse(); + var forwarder = new StubForwarder(forwarded); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder); + + var payload = Encoding.UTF8.GetBytes("""{"name":"FWD","subjects":["fwd.>"]}"""); + var result = await router.RouteAsync("$JS.API.STREAM.CREATE.FWD", payload.AsMemory()); + + forwarder.CallCount.ShouldBe(1); + forwarder.LastSubject.ShouldBe("$JS.API.STREAM.CREATE.FWD"); + forwarder.LastLeaderName.ShouldBe("meta-1"); + result.ShouldBeSameAs(forwarded); + } + + /// + /// When not leader and no forwarder is provided, RouteAsync returns a NotLeader error. + /// Go reference: jetstream_api.go — fallback to not-leader error when no forwarder. + /// + [Fact] + public async Task Route_returns_not_leader_when_no_forwarder() + { + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder: null); + + var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}"""); + var result = await router.RouteAsync("$JS.API.STREAM.CREATE.TEST", payload.AsMemory()); + + result.Error.ShouldNotBeNull(); + result.Error!.Code.ShouldBe(10003); + result.Error.Description.ShouldBe("not leader"); + result.Error.LeaderHint.ShouldBe("meta-1"); + } + + /// + /// Read-only operations are handled locally even when not leader and a forwarder is set. + /// Go reference: jetstream_api.go — read ops do not require leadership, never forwarded. + /// + [Fact] + public async Task Route_does_not_forward_read_only_requests() + { + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse()); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder); + + // $JS.API.INFO — read only + var infoResult = await router.RouteAsync("$JS.API.INFO", ReadOnlyMemory.Empty); + infoResult.Error.ShouldBeNull(); + + // $JS.API.STREAM.NAMES — read only + var namesResult = await router.RouteAsync("$JS.API.STREAM.NAMES", ReadOnlyMemory.Empty); + namesResult.Error.ShouldBeNull(); + + // Forwarder should never have been called for read-only subjects. + forwarder.CallCount.ShouldBe(0); + } + + /// + /// When the forwarder returns null, RouteAsync falls back to a NotLeader response. + /// Go reference: jetstream_api.go — null forward result means forwarding unavailable. + /// + [Fact] + public async Task Route_handles_forward_returning_null_gracefully() + { + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var forwarder = new StubForwarder(null); // returns null + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder); + + var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}"""); + var result = await router.RouteAsync("$JS.API.STREAM.CREATE.TEST", payload.AsMemory()); + + forwarder.CallCount.ShouldBe(1); + result.Error.ShouldNotBeNull(); + result.Error!.Code.ShouldBe(10003); + result.Error.Description.ShouldBe("not leader"); + } + + /// + /// When the forwarder throws OperationCanceledException (timeout), RouteAsync falls back to NotLeader. + /// Go reference: jetstream_api.go — timeout/cancellation during forwarding falls back gracefully. + /// + [Fact] + public async Task Route_handles_forward_timeout() + { + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var forwarder = new TimeoutForwarder(); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder); + + var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}"""); + var result = await router.RouteAsync("$JS.API.STREAM.CREATE.TEST", payload.AsMemory()); + + result.Error.ShouldNotBeNull(); + result.Error!.Code.ShouldBe(10003); + result.Error.Description.ShouldBe("not leader"); + } + + /// + /// ForwardedCount increments on each successful (non-null) forward result. + /// Go reference: jetstream_api.go — monitoring/observability for forwarded requests. + /// + [Fact] + public async Task ForwardedCount_increments_on_successful_forward() + { + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse()); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder); + + router.ForwardedCount.ShouldBe(0); + + var payload = Encoding.UTF8.GetBytes("""{"name":"A","subjects":["a.>"]}"""); + await router.RouteAsync("$JS.API.STREAM.CREATE.A", payload.AsMemory()); + router.ForwardedCount.ShouldBe(1); + + await router.RouteAsync("$JS.API.STREAM.DELETE.A", ReadOnlyMemory.Empty); + router.ForwardedCount.ShouldBe(2); + } + + /// + /// When this node is the leader, RouteAsync handles requests locally and does not call the forwarder. + /// Go reference: jetstream_api.go — leader handles requests directly. + /// + [Fact] + public async Task Route_processes_locally_when_leader() + { + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 1); // IS leader + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse()); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder); + + var payload = Encoding.UTF8.GetBytes("""{"name":"LOCAL","subjects":["local.>"]}"""); + var result = await router.RouteAsync("$JS.API.STREAM.CREATE.LOCAL", payload.AsMemory()); + + forwarder.CallCount.ShouldBe(0); + result.Error.ShouldBeNull(); + result.StreamInfo.ShouldNotBeNull(); + result.StreamInfo!.Config.Name.ShouldBe("LOCAL"); + } + + /// + /// When no meta-group is configured (single-server), RouteAsync handles all requests locally. + /// Go reference: jetstream_api.go — standalone servers have no meta-group. + /// + [Fact] + public async Task Route_processes_locally_when_no_meta_group() + { + var streamManager = new StreamManager(); + var consumerManager = new ConsumerManager(); + var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse()); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup: null, forwarder); + + var payload = Encoding.UTF8.GetBytes("""{"name":"SOLO","subjects":["solo.>"]}"""); + var result = await router.RouteAsync("$JS.API.STREAM.CREATE.SOLO", payload.AsMemory()); + + forwarder.CallCount.ShouldBe(0); + result.Error.ShouldBeNull(); + result.StreamInfo.ShouldNotBeNull(); + result.StreamInfo!.Config.Name.ShouldBe("SOLO"); + } + + /// + /// RouteAsync passes the payload bytes verbatim to the forwarder. + /// Go reference: jetstream_api.go — forwarded request includes the original payload. + /// + [Fact] + public async Task RouteAsync_forwards_to_leader_with_payload() + { + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var forwarded = JetStreamApiResponse.SuccessResponse(); + var forwarder = new StubForwarder(forwarded); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder); + + var payloadBytes = Encoding.UTF8.GetBytes("""{"name":"PAYLOAD","subjects":["p.>"]}"""); + await router.RouteAsync("$JS.API.STREAM.CREATE.PAYLOAD", payloadBytes.AsMemory()); + + forwarder.LastPayload.Length.ShouldBe(payloadBytes.Length); + var receivedBytes = forwarder.LastPayload.ToArray(); + receivedBytes.ShouldBe(payloadBytes); + } + + /// + /// DefaultLeaderForwarder accepts a custom timeout value. + /// Go reference: jetstream_api.go — configurable forward timeout for slow leader responses. + /// + [Fact] + public void Forward_timeout_configurable() + { + var customTimeout = TimeSpan.FromSeconds(10); + var forwarder = new DefaultLeaderForwarder(customTimeout); + + forwarder.Timeout.ShouldBe(customTimeout); + } + + /// + /// DefaultLeaderForwarder uses a 5-second default timeout when none is provided. + /// Go reference: jetstream_api.go — default forward timeout. + /// + [Fact] + public void Forward_timeout_defaults_to_five_seconds() + { + var forwarder = new DefaultLeaderForwarder(); + + forwarder.Timeout.ShouldBe(TimeSpan.FromSeconds(5)); + } }