diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs
index 9ca53c9..d5ebab0 100644
--- a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs
+++ b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs
@@ -2,6 +2,64 @@ using NATS.Server.JetStream.Api.Handlers;
namespace NATS.Server.JetStream.Api;
+///
+/// Abstraction for forwarding a JetStream API request to the meta-group leader.
+/// Allows the forwarding behaviour to be replaced with a test double.
+/// Go reference: jetstream_api.go — jsClusteredStreamXxxRequest helpers forward to the leader.
+///
+public interface ILeaderForwarder
+{
+ ///
+ /// Forwards the request to the current meta-group leader.
+ /// Returns the leader's response, or null when forwarding is not available
+ /// (e.g. no route to leader) so the caller can fall back to a NotLeader error.
+ ///
+ Task ForwardAsync(
+ string subject,
+ ReadOnlyMemory payload,
+ string leaderName,
+ CancellationToken ct);
+}
+
+///
+/// Default implementation of .
+/// In a real cluster, this would send the request to the leader over the internal
+/// route connection using a request-reply pattern.
+/// The current implementation returns null (forwarding unavailable) so the caller
+/// falls back to a NotLeader error response, matching the stub behaviour from before Task 19.
+/// Go reference: jetstream_api.go — jsClusteredStreamRequest request-reply to leader.
+///
+public sealed class DefaultLeaderForwarder
+{
+ ///
+ /// How long to wait for a response from the leader before giving up.
+ ///
+ public TimeSpan Timeout { get; }
+
+ public DefaultLeaderForwarder(TimeSpan? timeout = null)
+ {
+ Timeout = timeout ?? TimeSpan.FromSeconds(5);
+ }
+
+ ///
+ public Task ForwardAsync(
+ string subject,
+ ReadOnlyMemory payload,
+ string leaderName,
+ CancellationToken ct)
+ {
+ // In a real cluster this would serialise the request and forward it to
+ // the leader over the internal route connection using request-reply.
+ // Returning null signals "forwarding unavailable" — the router falls back
+ // to a NotLeader error so the client can retry against the leader directly.
+ _ = subject;
+ _ = payload;
+ _ = leaderName;
+ _ = ct;
+ return Task.FromResult(null);
+ }
+}
+
///
/// Routes JetStream API requests to the appropriate handler.
/// Go reference: jetstream_api.go:200-300 — non-leader nodes must forward or reject
@@ -12,19 +70,33 @@ public sealed class JetStreamApiRouter
private readonly StreamManager _streamManager;
private readonly ConsumerManager _consumerManager;
private readonly JetStream.Cluster.JetStreamMetaGroup? _metaGroup;
+ private readonly ILeaderForwarder? _forwarder;
+ private long _forwardedCount;
public JetStreamApiRouter()
: this(new StreamManager(), new ConsumerManager(), null)
{
}
- public JetStreamApiRouter(StreamManager streamManager, ConsumerManager consumerManager, JetStream.Cluster.JetStreamMetaGroup? metaGroup = null)
+ public JetStreamApiRouter(
+ StreamManager streamManager,
+ ConsumerManager consumerManager,
+ JetStream.Cluster.JetStreamMetaGroup? metaGroup = null,
+ ILeaderForwarder? forwarder = null)
{
_streamManager = streamManager;
_consumerManager = consumerManager;
_metaGroup = metaGroup;
+ _forwarder = forwarder;
}
+ ///
+ /// Number of requests that were successfully forwarded to the leader (non-null response).
+ /// Useful for monitoring leader-forwarding activity.
+ /// Go reference: jetstream_api.go — observable for forwarded API calls.
+ ///
+ public long ForwardedCount => Interlocked.Read(ref _forwardedCount);
+
///
/// Determines whether the given API subject requires leader-only handling.
/// Mutating operations (Create, Update, Delete, Purge, Restore, Pause, Reset, Unpin,
@@ -89,17 +161,64 @@ public sealed class JetStreamApiRouter
}
///
- /// Stub for future leader-forwarding implementation.
- /// In a clustered deployment this would serialize the request and forward it
- /// to the leader node over the internal route connection.
+ /// Returns a not-leader error for backward-compatible synchronous callers.
+ /// Async callers should use which also attempts forwarding.
/// Go reference: jetstream_api.go — jsClusteredStreamXxxRequest helpers.
///
public static JetStreamApiResponse ForwardToLeader(string subject, ReadOnlySpan payload, string leaderName)
{
- // For now, return the not-leader error with a hint so the client can retry.
+ _ = subject;
+ _ = payload;
return JetStreamApiResponse.NotLeader(leaderName);
}
+ ///
+ /// Routes a JetStream API request asynchronously.
+ /// When this node is not the meta-group leader and the subject requires leadership,
+ /// attempts to forward the request to the leader via .
+ /// Falls back to a NotLeader error when no forwarder is configured or when the
+ /// forwarder returns null or throws .
+ /// Read-only operations are always handled locally regardless of leadership.
+ /// Go reference: jetstream_api.go:200-300 — leader-forwarding path.
+ ///
+ public async Task RouteAsync(
+ string subject,
+ ReadOnlyMemory payload,
+ CancellationToken ct = default)
+ {
+ // Go reference: jetstream_api.go:200-300 — leader check + forwarding.
+ if (_metaGroup is not null && IsLeaderRequired(subject) && !_metaGroup.IsLeader())
+ {
+ if (_forwarder is not null)
+ {
+ JetStreamApiResponse? forwarded = null;
+ bool forwardTimedOut = false;
+ try
+ {
+ forwarded = await _forwarder.ForwardAsync(subject, payload, _metaGroup.Leader, ct);
+ }
+ catch (OperationCanceledException ex) when (!ct.IsCancellationRequested)
+ {
+ // Forward attempt timed out internally (not cancelled by the caller).
+ // Fall through to the NotLeader response below.
+ forwardTimedOut = true;
+ _ = ex; // acknowledged: timeout during forward, falling back to NotLeader
+ }
+
+ if (!forwardTimedOut && forwarded is not null)
+ {
+ Interlocked.Increment(ref _forwardedCount);
+ return forwarded;
+ }
+ }
+
+ return JetStreamApiResponse.NotLeader(_metaGroup.Leader);
+ }
+
+ // Handle locally (leader or read-only operation).
+ return Route(subject, payload.Span);
+ }
+
public JetStreamApiResponse Route(string subject, ReadOnlySpan payload)
{
// Go reference: jetstream_api.go:200-300 — leader check + forwarding.
diff --git a/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs b/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs
index ebbd29c..fe9311f 100644
--- a/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs
+++ b/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs
@@ -9,6 +9,38 @@ using NATS.Server.JetStream.Cluster;
namespace NATS.Server.Tests.JetStream.Api;
+///
+/// Simple test double for ILeaderForwarder.
+/// Returns a predetermined response or null depending on the constructor.
+///
+file sealed class StubForwarder(JetStreamApiResponse? response) : ILeaderForwarder
+{
+ public int CallCount { get; private set; }
+ public string? LastSubject { get; private set; }
+ public ReadOnlyMemory LastPayload { get; private set; }
+ public string? LastLeaderName { get; private set; }
+
+ public Task ForwardAsync(
+ string subject, ReadOnlyMemory payload, string leaderName, CancellationToken ct)
+ {
+ CallCount++;
+ LastSubject = subject;
+ LastPayload = payload;
+ LastLeaderName = leaderName;
+ return Task.FromResult(response);
+ }
+}
+
+///
+/// Test double that throws OperationCanceledException to simulate a timeout.
+///
+file sealed class TimeoutForwarder : ILeaderForwarder
+{
+ public Task ForwardAsync(
+ string subject, ReadOnlyMemory payload, string leaderName, CancellationToken ct)
+ => Task.FromException(new OperationCanceledException("simulated timeout"));
+}
+
public class LeaderForwardingTests
{
///
@@ -147,4 +179,234 @@ public class LeaderForwardingTests
JetStreamApiRouter.IsLeaderRequired("$JS.API.CONSUMER.MSG.NEXT.STREAM.CON").ShouldBeFalse();
JetStreamApiRouter.IsLeaderRequired("$JS.API.DIRECT.GET.TEST").ShouldBeFalse();
}
+
+ // ---------------------------------------------------------------
+ // New tests for Task 19: Leader Forwarding (Gap 7.1)
+ // Go reference: jetstream_api.go:200-300 — jsClusteredStreamXxxRequest helpers.
+ // ---------------------------------------------------------------
+
+ ///
+ /// When not leader and a forwarder is provided, RouteAsync calls forward for mutating ops.
+ /// Go reference: jetstream_api.go — non-leader nodes forward mutating ops to the leader.
+ ///
+ [Fact]
+ public async Task Route_forwards_mutating_request_when_not_leader()
+ {
+ var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
+ var streamManager = new StreamManager(metaGroup);
+ var consumerManager = new ConsumerManager();
+ var forwarded = JetStreamApiResponse.SuccessResponse();
+ var forwarder = new StubForwarder(forwarded);
+ var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
+
+ var payload = Encoding.UTF8.GetBytes("""{"name":"FWD","subjects":["fwd.>"]}""");
+ var result = await router.RouteAsync("$JS.API.STREAM.CREATE.FWD", payload.AsMemory());
+
+ forwarder.CallCount.ShouldBe(1);
+ forwarder.LastSubject.ShouldBe("$JS.API.STREAM.CREATE.FWD");
+ forwarder.LastLeaderName.ShouldBe("meta-1");
+ result.ShouldBeSameAs(forwarded);
+ }
+
+ ///
+ /// When not leader and no forwarder is provided, RouteAsync returns a NotLeader error.
+ /// Go reference: jetstream_api.go — fallback to not-leader error when no forwarder.
+ ///
+ [Fact]
+ public async Task Route_returns_not_leader_when_no_forwarder()
+ {
+ var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
+ var streamManager = new StreamManager(metaGroup);
+ var consumerManager = new ConsumerManager();
+ var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder: null);
+
+ var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}""");
+ var result = await router.RouteAsync("$JS.API.STREAM.CREATE.TEST", payload.AsMemory());
+
+ result.Error.ShouldNotBeNull();
+ result.Error!.Code.ShouldBe(10003);
+ result.Error.Description.ShouldBe("not leader");
+ result.Error.LeaderHint.ShouldBe("meta-1");
+ }
+
+ ///
+ /// Read-only operations are handled locally even when not leader and a forwarder is set.
+ /// Go reference: jetstream_api.go — read ops do not require leadership, never forwarded.
+ ///
+ [Fact]
+ public async Task Route_does_not_forward_read_only_requests()
+ {
+ var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
+ var streamManager = new StreamManager(metaGroup);
+ var consumerManager = new ConsumerManager();
+ var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse());
+ var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
+
+ // $JS.API.INFO — read only
+ var infoResult = await router.RouteAsync("$JS.API.INFO", ReadOnlyMemory.Empty);
+ infoResult.Error.ShouldBeNull();
+
+ // $JS.API.STREAM.NAMES — read only
+ var namesResult = await router.RouteAsync("$JS.API.STREAM.NAMES", ReadOnlyMemory.Empty);
+ namesResult.Error.ShouldBeNull();
+
+ // Forwarder should never have been called for read-only subjects.
+ forwarder.CallCount.ShouldBe(0);
+ }
+
+ ///
+ /// When the forwarder returns null, RouteAsync falls back to a NotLeader response.
+ /// Go reference: jetstream_api.go — null forward result means forwarding unavailable.
+ ///
+ [Fact]
+ public async Task Route_handles_forward_returning_null_gracefully()
+ {
+ var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
+ var streamManager = new StreamManager(metaGroup);
+ var consumerManager = new ConsumerManager();
+ var forwarder = new StubForwarder(null); // returns null
+ var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
+
+ var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}""");
+ var result = await router.RouteAsync("$JS.API.STREAM.CREATE.TEST", payload.AsMemory());
+
+ forwarder.CallCount.ShouldBe(1);
+ result.Error.ShouldNotBeNull();
+ result.Error!.Code.ShouldBe(10003);
+ result.Error.Description.ShouldBe("not leader");
+ }
+
+ ///
+ /// When the forwarder throws OperationCanceledException (timeout), RouteAsync falls back to NotLeader.
+ /// Go reference: jetstream_api.go — timeout/cancellation during forwarding falls back gracefully.
+ ///
+ [Fact]
+ public async Task Route_handles_forward_timeout()
+ {
+ var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
+ var streamManager = new StreamManager(metaGroup);
+ var consumerManager = new ConsumerManager();
+ var forwarder = new TimeoutForwarder();
+ var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
+
+ var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}""");
+ var result = await router.RouteAsync("$JS.API.STREAM.CREATE.TEST", payload.AsMemory());
+
+ result.Error.ShouldNotBeNull();
+ result.Error!.Code.ShouldBe(10003);
+ result.Error.Description.ShouldBe("not leader");
+ }
+
+ ///
+ /// ForwardedCount increments on each successful (non-null) forward result.
+ /// Go reference: jetstream_api.go — monitoring/observability for forwarded requests.
+ ///
+ [Fact]
+ public async Task ForwardedCount_increments_on_successful_forward()
+ {
+ var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
+ var streamManager = new StreamManager(metaGroup);
+ var consumerManager = new ConsumerManager();
+ var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse());
+ var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
+
+ router.ForwardedCount.ShouldBe(0);
+
+ var payload = Encoding.UTF8.GetBytes("""{"name":"A","subjects":["a.>"]}""");
+ await router.RouteAsync("$JS.API.STREAM.CREATE.A", payload.AsMemory());
+ router.ForwardedCount.ShouldBe(1);
+
+ await router.RouteAsync("$JS.API.STREAM.DELETE.A", ReadOnlyMemory.Empty);
+ router.ForwardedCount.ShouldBe(2);
+ }
+
+ ///
+ /// When this node is the leader, RouteAsync handles requests locally and does not call the forwarder.
+ /// Go reference: jetstream_api.go — leader handles requests directly.
+ ///
+ [Fact]
+ public async Task Route_processes_locally_when_leader()
+ {
+ var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 1); // IS leader
+ var streamManager = new StreamManager(metaGroup);
+ var consumerManager = new ConsumerManager();
+ var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse());
+ var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
+
+ var payload = Encoding.UTF8.GetBytes("""{"name":"LOCAL","subjects":["local.>"]}""");
+ var result = await router.RouteAsync("$JS.API.STREAM.CREATE.LOCAL", payload.AsMemory());
+
+ forwarder.CallCount.ShouldBe(0);
+ result.Error.ShouldBeNull();
+ result.StreamInfo.ShouldNotBeNull();
+ result.StreamInfo!.Config.Name.ShouldBe("LOCAL");
+ }
+
+ ///
+ /// When no meta-group is configured (single-server), RouteAsync handles all requests locally.
+ /// Go reference: jetstream_api.go — standalone servers have no meta-group.
+ ///
+ [Fact]
+ public async Task Route_processes_locally_when_no_meta_group()
+ {
+ var streamManager = new StreamManager();
+ var consumerManager = new ConsumerManager();
+ var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse());
+ var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup: null, forwarder);
+
+ var payload = Encoding.UTF8.GetBytes("""{"name":"SOLO","subjects":["solo.>"]}""");
+ var result = await router.RouteAsync("$JS.API.STREAM.CREATE.SOLO", payload.AsMemory());
+
+ forwarder.CallCount.ShouldBe(0);
+ result.Error.ShouldBeNull();
+ result.StreamInfo.ShouldNotBeNull();
+ result.StreamInfo!.Config.Name.ShouldBe("SOLO");
+ }
+
+ ///
+ /// RouteAsync passes the payload bytes verbatim to the forwarder.
+ /// Go reference: jetstream_api.go — forwarded request includes the original payload.
+ ///
+ [Fact]
+ public async Task RouteAsync_forwards_to_leader_with_payload()
+ {
+ var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
+ var streamManager = new StreamManager(metaGroup);
+ var consumerManager = new ConsumerManager();
+ var forwarded = JetStreamApiResponse.SuccessResponse();
+ var forwarder = new StubForwarder(forwarded);
+ var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
+
+ var payloadBytes = Encoding.UTF8.GetBytes("""{"name":"PAYLOAD","subjects":["p.>"]}""");
+ await router.RouteAsync("$JS.API.STREAM.CREATE.PAYLOAD", payloadBytes.AsMemory());
+
+ forwarder.LastPayload.Length.ShouldBe(payloadBytes.Length);
+ var receivedBytes = forwarder.LastPayload.ToArray();
+ receivedBytes.ShouldBe(payloadBytes);
+ }
+
+ ///
+ /// DefaultLeaderForwarder accepts a custom timeout value.
+ /// Go reference: jetstream_api.go — configurable forward timeout for slow leader responses.
+ ///
+ [Fact]
+ public void Forward_timeout_configurable()
+ {
+ var customTimeout = TimeSpan.FromSeconds(10);
+ var forwarder = new DefaultLeaderForwarder(customTimeout);
+
+ forwarder.Timeout.ShouldBe(customTimeout);
+ }
+
+ ///
+ /// DefaultLeaderForwarder uses a 5-second default timeout when none is provided.
+ /// Go reference: jetstream_api.go — default forward timeout.
+ ///
+ [Fact]
+ public void Forward_timeout_defaults_to_five_seconds()
+ {
+ var forwarder = new DefaultLeaderForwarder();
+
+ forwarder.Timeout.ShouldBe(TimeSpan.FromSeconds(5));
+ }
}