feat: implement leader forwarding for JetStream API (Gap 7.1)

Add ILeaderForwarder interface and DefaultLeaderForwarder, update
JetStreamApiRouter with RouteAsync + ForwardedCount so non-leader nodes
can attempt to forward mutating requests to the meta-group leader before
falling back to a NotLeader error response.
This commit is contained in:
Joseph Doherty
2026-02-25 09:53:50 -05:00
parent aeeb2e6929
commit d817d6f7a2
2 changed files with 386 additions and 5 deletions

View File

@@ -9,6 +9,38 @@ using NATS.Server.JetStream.Cluster;
namespace NATS.Server.Tests.JetStream.Api;
/// <summary>
/// Simple test double for ILeaderForwarder.
/// Returns a predetermined response or null depending on the constructor.
/// </summary>
file sealed class StubForwarder(JetStreamApiResponse? response) : ILeaderForwarder
{
public int CallCount { get; private set; }
public string? LastSubject { get; private set; }
public ReadOnlyMemory<byte> LastPayload { get; private set; }
public string? LastLeaderName { get; private set; }
public Task<JetStreamApiResponse?> ForwardAsync(
string subject, ReadOnlyMemory<byte> payload, string leaderName, CancellationToken ct)
{
CallCount++;
LastSubject = subject;
LastPayload = payload;
LastLeaderName = leaderName;
return Task.FromResult(response);
}
}
/// <summary>
/// Test double that throws OperationCanceledException to simulate a timeout.
/// </summary>
file sealed class TimeoutForwarder : ILeaderForwarder
{
public Task<JetStreamApiResponse?> ForwardAsync(
string subject, ReadOnlyMemory<byte> payload, string leaderName, CancellationToken ct)
=> Task.FromException<JetStreamApiResponse?>(new OperationCanceledException("simulated timeout"));
}
public class LeaderForwardingTests
{
/// <summary>
@@ -147,4 +179,234 @@ public class LeaderForwardingTests
JetStreamApiRouter.IsLeaderRequired("$JS.API.CONSUMER.MSG.NEXT.STREAM.CON").ShouldBeFalse();
JetStreamApiRouter.IsLeaderRequired("$JS.API.DIRECT.GET.TEST").ShouldBeFalse();
}
// ---------------------------------------------------------------
// New tests for Task 19: Leader Forwarding (Gap 7.1)
// Go reference: jetstream_api.go:200-300 — jsClusteredStreamXxxRequest helpers.
// ---------------------------------------------------------------
/// <summary>
/// When not leader and a forwarder is provided, RouteAsync calls forward for mutating ops.
/// Go reference: jetstream_api.go — non-leader nodes forward mutating ops to the leader.
/// </summary>
[Fact]
public async Task Route_forwards_mutating_request_when_not_leader()
{
var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
var streamManager = new StreamManager(metaGroup);
var consumerManager = new ConsumerManager();
var forwarded = JetStreamApiResponse.SuccessResponse();
var forwarder = new StubForwarder(forwarded);
var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
var payload = Encoding.UTF8.GetBytes("""{"name":"FWD","subjects":["fwd.>"]}""");
var result = await router.RouteAsync("$JS.API.STREAM.CREATE.FWD", payload.AsMemory());
forwarder.CallCount.ShouldBe(1);
forwarder.LastSubject.ShouldBe("$JS.API.STREAM.CREATE.FWD");
forwarder.LastLeaderName.ShouldBe("meta-1");
result.ShouldBeSameAs(forwarded);
}
/// <summary>
/// When not leader and no forwarder is provided, RouteAsync returns a NotLeader error.
/// Go reference: jetstream_api.go — fallback to not-leader error when no forwarder.
/// </summary>
[Fact]
public async Task Route_returns_not_leader_when_no_forwarder()
{
var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
var streamManager = new StreamManager(metaGroup);
var consumerManager = new ConsumerManager();
var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder: null);
var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}""");
var result = await router.RouteAsync("$JS.API.STREAM.CREATE.TEST", payload.AsMemory());
result.Error.ShouldNotBeNull();
result.Error!.Code.ShouldBe(10003);
result.Error.Description.ShouldBe("not leader");
result.Error.LeaderHint.ShouldBe("meta-1");
}
/// <summary>
/// Read-only operations are handled locally even when not leader and a forwarder is set.
/// Go reference: jetstream_api.go — read ops do not require leadership, never forwarded.
/// </summary>
[Fact]
public async Task Route_does_not_forward_read_only_requests()
{
var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
var streamManager = new StreamManager(metaGroup);
var consumerManager = new ConsumerManager();
var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse());
var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
// $JS.API.INFO — read only
var infoResult = await router.RouteAsync("$JS.API.INFO", ReadOnlyMemory<byte>.Empty);
infoResult.Error.ShouldBeNull();
// $JS.API.STREAM.NAMES — read only
var namesResult = await router.RouteAsync("$JS.API.STREAM.NAMES", ReadOnlyMemory<byte>.Empty);
namesResult.Error.ShouldBeNull();
// Forwarder should never have been called for read-only subjects.
forwarder.CallCount.ShouldBe(0);
}
/// <summary>
/// When the forwarder returns null, RouteAsync falls back to a NotLeader response.
/// Go reference: jetstream_api.go — null forward result means forwarding unavailable.
/// </summary>
[Fact]
public async Task Route_handles_forward_returning_null_gracefully()
{
var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
var streamManager = new StreamManager(metaGroup);
var consumerManager = new ConsumerManager();
var forwarder = new StubForwarder(null); // returns null
var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}""");
var result = await router.RouteAsync("$JS.API.STREAM.CREATE.TEST", payload.AsMemory());
forwarder.CallCount.ShouldBe(1);
result.Error.ShouldNotBeNull();
result.Error!.Code.ShouldBe(10003);
result.Error.Description.ShouldBe("not leader");
}
/// <summary>
/// When the forwarder throws OperationCanceledException (timeout), RouteAsync falls back to NotLeader.
/// Go reference: jetstream_api.go — timeout/cancellation during forwarding falls back gracefully.
/// </summary>
[Fact]
public async Task Route_handles_forward_timeout()
{
var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
var streamManager = new StreamManager(metaGroup);
var consumerManager = new ConsumerManager();
var forwarder = new TimeoutForwarder();
var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}""");
var result = await router.RouteAsync("$JS.API.STREAM.CREATE.TEST", payload.AsMemory());
result.Error.ShouldNotBeNull();
result.Error!.Code.ShouldBe(10003);
result.Error.Description.ShouldBe("not leader");
}
/// <summary>
/// ForwardedCount increments on each successful (non-null) forward result.
/// Go reference: jetstream_api.go — monitoring/observability for forwarded requests.
/// </summary>
[Fact]
public async Task ForwardedCount_increments_on_successful_forward()
{
var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
var streamManager = new StreamManager(metaGroup);
var consumerManager = new ConsumerManager();
var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse());
var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
router.ForwardedCount.ShouldBe(0);
var payload = Encoding.UTF8.GetBytes("""{"name":"A","subjects":["a.>"]}""");
await router.RouteAsync("$JS.API.STREAM.CREATE.A", payload.AsMemory());
router.ForwardedCount.ShouldBe(1);
await router.RouteAsync("$JS.API.STREAM.DELETE.A", ReadOnlyMemory<byte>.Empty);
router.ForwardedCount.ShouldBe(2);
}
/// <summary>
/// When this node is the leader, RouteAsync handles requests locally and does not call the forwarder.
/// Go reference: jetstream_api.go — leader handles requests directly.
/// </summary>
[Fact]
public async Task Route_processes_locally_when_leader()
{
var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 1); // IS leader
var streamManager = new StreamManager(metaGroup);
var consumerManager = new ConsumerManager();
var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse());
var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
var payload = Encoding.UTF8.GetBytes("""{"name":"LOCAL","subjects":["local.>"]}""");
var result = await router.RouteAsync("$JS.API.STREAM.CREATE.LOCAL", payload.AsMemory());
forwarder.CallCount.ShouldBe(0);
result.Error.ShouldBeNull();
result.StreamInfo.ShouldNotBeNull();
result.StreamInfo!.Config.Name.ShouldBe("LOCAL");
}
/// <summary>
/// When no meta-group is configured (single-server), RouteAsync handles all requests locally.
/// Go reference: jetstream_api.go — standalone servers have no meta-group.
/// </summary>
[Fact]
public async Task Route_processes_locally_when_no_meta_group()
{
var streamManager = new StreamManager();
var consumerManager = new ConsumerManager();
var forwarder = new StubForwarder(JetStreamApiResponse.SuccessResponse());
var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup: null, forwarder);
var payload = Encoding.UTF8.GetBytes("""{"name":"SOLO","subjects":["solo.>"]}""");
var result = await router.RouteAsync("$JS.API.STREAM.CREATE.SOLO", payload.AsMemory());
forwarder.CallCount.ShouldBe(0);
result.Error.ShouldBeNull();
result.StreamInfo.ShouldNotBeNull();
result.StreamInfo!.Config.Name.ShouldBe("SOLO");
}
/// <summary>
/// RouteAsync passes the payload bytes verbatim to the forwarder.
/// Go reference: jetstream_api.go — forwarded request includes the original payload.
/// </summary>
[Fact]
public async Task RouteAsync_forwards_to_leader_with_payload()
{
var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2);
var streamManager = new StreamManager(metaGroup);
var consumerManager = new ConsumerManager();
var forwarded = JetStreamApiResponse.SuccessResponse();
var forwarder = new StubForwarder(forwarded);
var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup, forwarder);
var payloadBytes = Encoding.UTF8.GetBytes("""{"name":"PAYLOAD","subjects":["p.>"]}""");
await router.RouteAsync("$JS.API.STREAM.CREATE.PAYLOAD", payloadBytes.AsMemory());
forwarder.LastPayload.Length.ShouldBe(payloadBytes.Length);
var receivedBytes = forwarder.LastPayload.ToArray();
receivedBytes.ShouldBe(payloadBytes);
}
/// <summary>
/// DefaultLeaderForwarder accepts a custom timeout value.
/// Go reference: jetstream_api.go — configurable forward timeout for slow leader responses.
/// </summary>
[Fact]
public void Forward_timeout_configurable()
{
var customTimeout = TimeSpan.FromSeconds(10);
var forwarder = new DefaultLeaderForwarder(customTimeout);
forwarder.Timeout.ShouldBe(customTimeout);
}
/// <summary>
/// DefaultLeaderForwarder uses a 5-second default timeout when none is provided.
/// Go reference: jetstream_api.go — default forward timeout.
/// </summary>
[Fact]
public void Forward_timeout_defaults_to_five_seconds()
{
var forwarder = new DefaultLeaderForwarder();
forwarder.Timeout.ShouldBe(TimeSpan.FromSeconds(5));
}
}