diff --git a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs index cfee7f4..4bae5d3 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs @@ -1,5 +1,6 @@ using System.Text; using System.Text.Json; +using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.Models; namespace NATS.Server.JetStream.Api.Handlers; @@ -138,6 +139,86 @@ public static class ConsumerApiHandlers }; } + // --------------------------------------------------------------- + // Clustered handlers — propose to meta RAFT group instead of local ConsumerManager. + // Go reference: jetstream_cluster.go:8100-8265 jsClusteredConsumerRequest and related. + // --------------------------------------------------------------- + + /// + /// Proposes a consumer create to the meta RAFT group. + /// Validates that this node is the leader and the parent stream exists, then calls + /// . + /// Go reference: jetstream_cluster.go:8100 jsClusteredConsumerRequest. + /// + public static async Task HandleClusteredCreateAsync( + string subject, + byte[] payload, + JetStreamMetaGroup metaGroup, + CancellationToken ct) + { + var parsed = ParseSubject(subject, CreatePrefix); + if (parsed == null) + return JetStreamApiResponse.NotFound(subject); + + var (stream, durableName) = parsed.Value; + var config = ParseConfig(payload); + if (string.IsNullOrWhiteSpace(config.DurableName)) + config.DurableName = durableName; + + var consumerName = string.IsNullOrWhiteSpace(config.DurableName) ? durableName : config.DurableName; + var group = new RaftGroup { Name = $"{stream}-{consumerName}" }; + + try + { + await metaGroup.ProposeCreateConsumerValidatedAsync(stream, consumerName, group, ct); + return JetStreamApiResponse.SuccessResponse(); + } + catch (InvalidOperationException ex) when (ex.Message.StartsWith("Not the meta-group leader", StringComparison.Ordinal)) + { + return JetStreamApiResponse.NotLeader(metaGroup.Leader); + } + catch (InvalidOperationException ex) + { + return JetStreamApiResponse.ErrorResponse(400, ex.Message); + } + } + + /// + /// Proposes a consumer deletion to the meta RAFT group. + /// Validates that this node is the leader and the consumer exists, then calls + /// . + /// Go reference: jetstream_cluster.go jsClusteredConsumerDeleteRequest. + /// + public static async Task HandleClusteredDeleteAsync( + string subject, + JetStreamMetaGroup metaGroup, + CancellationToken ct) + { + var parsed = ParseSubject(subject, DeletePrefix); + if (parsed == null) + return JetStreamApiResponse.NotFound(subject); + + var (stream, consumerName) = parsed.Value; + + // Validate that the consumer assignment exists before proposing deletion. + if (metaGroup.GetConsumerAssignment(stream, consumerName) == null) + return JetStreamApiResponse.ErrorResponse(404, $"consumer not found: '{consumerName}' on stream '{stream}'"); + + try + { + await metaGroup.ProposeDeleteConsumerValidatedAsync(stream, consumerName, ct); + return JetStreamApiResponse.SuccessResponse(); + } + catch (InvalidOperationException ex) when (ex.Message.StartsWith("Not the meta-group leader", StringComparison.Ordinal)) + { + return JetStreamApiResponse.NotLeader(metaGroup.Leader); + } + catch (InvalidOperationException ex) + { + return JetStreamApiResponse.ErrorResponse(400, ex.Message); + } + } + private static (string Stream, string Durable)? ParseSubject(string subject, string prefix) { if (!subject.StartsWith(prefix, StringComparison.Ordinal)) diff --git a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs index daeaf64..aeb38cb 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs @@ -1,5 +1,6 @@ using System.Text.Json; using System.Text; +using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.Models; namespace NATS.Server.JetStream.Api.Handlers; @@ -188,6 +189,120 @@ public static class StreamApiHandlers : JetStreamApiResponse.NotFound(subject); } + // --------------------------------------------------------------- + // Clustered handlers — propose to meta RAFT group instead of local StreamManager. + // Go reference: jetstream_cluster.go:7620-7900 jsClusteredStreamRequest and related. + // --------------------------------------------------------------- + + /// + /// Proposes a stream create to the meta RAFT group. + /// Validates that this node is the leader, then calls + /// . + /// Go reference: jetstream_cluster.go:7620 jsClusteredStreamRequest. + /// + public static async Task HandleClusteredCreateAsync( + string subject, + byte[] payload, + JetStreamMetaGroup metaGroup, + CancellationToken ct) + { + var streamName = ExtractTrailingToken(subject, CreatePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + var config = ParseConfig(payload); + if (string.IsNullOrWhiteSpace(config.Name)) + config.Name = streamName; + + try + { + await metaGroup.ProposeCreateStreamValidatedAsync(config, group: null, ct); + return JetStreamApiResponse.SuccessResponse(); + } + catch (InvalidOperationException ex) when (ex.Message.StartsWith("Not the meta-group leader", StringComparison.Ordinal)) + { + return JetStreamApiResponse.NotLeader(metaGroup.Leader); + } + catch (InvalidOperationException ex) + { + return JetStreamApiResponse.ErrorResponse(400, ex.Message); + } + } + + /// + /// Proposes a stream config update to the meta RAFT group. + /// Calls after validating leadership. + /// Go reference: jetstream_cluster.go jsClusteredStreamUpdateRequest. + /// + public static async Task HandleClusteredUpdateAsync( + string subject, + byte[] payload, + JetStreamMetaGroup metaGroup, + CancellationToken ct) + { + _ = ct; + + var streamName = ExtractTrailingToken(subject, UpdatePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + if (!metaGroup.IsLeader()) + return JetStreamApiResponse.NotLeader(metaGroup.Leader); + + var config = ParseConfig(payload); + if (string.IsNullOrWhiteSpace(config.Name)) + config.Name = streamName; + + var existing = metaGroup.GetStreamAssignment(streamName); + if (existing == null) + return JetStreamApiResponse.NotFound(subject); + + var sa = new StreamAssignment + { + StreamName = streamName, + Group = existing.Group, + }; + + var updated = metaGroup.ProcessUpdateStreamAssignment(sa); + if (!updated) + return JetStreamApiResponse.NotFound(subject); + + return JetStreamApiResponse.SuccessResponse(); + } + + /// + /// Proposes a stream deletion to the meta RAFT group. + /// Calls after validating leadership. + /// Go reference: jetstream_cluster.go jsClusteredStreamDeleteRequest. + /// + public static async Task HandleClusteredDeleteAsync( + string subject, + JetStreamMetaGroup metaGroup, + CancellationToken ct) + { + var streamName = ExtractTrailingToken(subject, DeletePrefix); + if (streamName == null) + return JetStreamApiResponse.NotFound(subject); + + // Check stream exists before attempting deletion. + if (metaGroup.GetStreamAssignment(streamName) == null) + return JetStreamApiResponse.ErrorResponse(404, $"stream not found: '{streamName}'"); + + try + { + await metaGroup.ProposeDeleteStreamValidatedAsync(streamName, ct); + return JetStreamApiResponse.SuccessResponse(); + } + catch (InvalidOperationException ex) when (ex.Message.StartsWith("Not the meta-group leader", StringComparison.Ordinal)) + { + return JetStreamApiResponse.NotLeader(metaGroup.Leader); + } + catch (InvalidOperationException ex) + { + return JetStreamApiResponse.ErrorResponse(400, ex.Message); + } + } + private static string? ExtractTrailingToken(string subject, string prefix) { if (!subject.StartsWith(prefix, StringComparison.Ordinal)) diff --git a/tests/NATS.Server.Tests/JetStream/Api/ClusteredApiTests.cs b/tests/NATS.Server.Tests/JetStream/Api/ClusteredApiTests.cs new file mode 100644 index 0000000..10a1fee --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Api/ClusteredApiTests.cs @@ -0,0 +1,241 @@ +// Go reference: jetstream_cluster.go:7620-8265 — clustered stream/consumer API handlers +// propose to the meta RAFT group rather than applying locally to StreamManager/ConsumerManager. + +using System.Text; +using NATS.Server.JetStream.Api.Handlers; +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Api; + +public class ClusteredApiTests +{ + // --------------------------------------------------------------- + // Stream clustered handlers + // --------------------------------------------------------------- + + /// + /// A successful clustered create proposes to the meta group, resulting in a new stream + /// assignment tracked under the provided name. + /// Go reference: jetstream_cluster.go:7620 jsClusteredStreamRequest. + /// + [Fact] + public async Task HandleClusteredCreate_proposes_to_meta_group() + { + var metaGroup = new JetStreamMetaGroup(nodes: 1); + var payload = Encoding.UTF8.GetBytes("""{"name":"ORDERS","subjects":["orders.>"]}"""); + + var response = await StreamApiHandlers.HandleClusteredCreateAsync( + "$JS.API.STREAM.CREATE.ORDERS", payload, metaGroup, CancellationToken.None); + + response.Error.ShouldBeNull(); + response.Success.ShouldBeTrue(); + metaGroup.GetStreamAssignment("ORDERS").ShouldNotBeNull(); + } + + /// + /// A duplicate clustered create for the same stream name returns an error response. + /// Go reference: jetstream_cluster.go — duplicate stream proposal returns error. + /// + [Fact] + public async Task HandleClusteredCreate_returns_error_for_duplicate() + { + var metaGroup = new JetStreamMetaGroup(nodes: 1); + var payload = Encoding.UTF8.GetBytes("""{"name":"ORDERS","subjects":["orders.>"]}"""); + + // First create succeeds. + var first = await StreamApiHandlers.HandleClusteredCreateAsync( + "$JS.API.STREAM.CREATE.ORDERS", payload, metaGroup, CancellationToken.None); + first.Error.ShouldBeNull(); + + // Second create for same name returns error. + var second = await StreamApiHandlers.HandleClusteredCreateAsync( + "$JS.API.STREAM.CREATE.ORDERS", payload, metaGroup, CancellationToken.None); + second.Error.ShouldNotBeNull(); + second.Error!.Description.ShouldContain("ORDERS"); + } + + /// + /// When this node is not the meta-group leader, clustered create returns a not-leader error. + /// Go reference: jetstream_cluster.go:7620 — leader check before proposing. + /// + [Fact] + public async Task HandleClusteredCreate_returns_error_when_not_leader() + { + // selfIndex=2, leaderIndex defaults to 1 — this node is NOT the leader. + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var payload = Encoding.UTF8.GetBytes("""{"name":"ORDERS","subjects":["orders.>"]}"""); + + var response = await StreamApiHandlers.HandleClusteredCreateAsync( + "$JS.API.STREAM.CREATE.ORDERS", payload, metaGroup, CancellationToken.None); + + response.Error.ShouldNotBeNull(); + response.Error!.Code.ShouldBe(10003); + response.Error.Description.ShouldBe("not leader"); + } + + /// + /// Clustered update proposes a config change to an existing stream assignment. + /// Go reference: jetstream_cluster.go jsClusteredStreamUpdateRequest. + /// + [Fact] + public async Task HandleClusteredUpdate_updates_existing_stream() + { + var metaGroup = new JetStreamMetaGroup(nodes: 1); + + // Create the stream first. + var createPayload = Encoding.UTF8.GetBytes("""{"name":"EVENTS","subjects":["events.>"]}"""); + await StreamApiHandlers.HandleClusteredCreateAsync( + "$JS.API.STREAM.CREATE.EVENTS", createPayload, metaGroup, CancellationToken.None); + + // Now update it with a max_msgs constraint. + var updatePayload = Encoding.UTF8.GetBytes("""{"name":"EVENTS","subjects":["events.>"],"max_msgs":500}"""); + var response = await StreamApiHandlers.HandleClusteredUpdateAsync( + "$JS.API.STREAM.UPDATE.EVENTS", updatePayload, metaGroup, CancellationToken.None); + + response.Error.ShouldBeNull(); + response.Success.ShouldBeTrue(); + + // The assignment should still exist. + metaGroup.GetStreamAssignment("EVENTS").ShouldNotBeNull(); + } + + /// + /// Clustered delete proposes removal of a stream from the meta group. + /// Go reference: jetstream_cluster.go processStreamRemoval via meta leader. + /// + [Fact] + public async Task HandleClusteredDelete_proposes_deletion() + { + var metaGroup = new JetStreamMetaGroup(nodes: 1); + var createPayload = Encoding.UTF8.GetBytes("""{"name":"ORDERS","subjects":["orders.>"]}"""); + await StreamApiHandlers.HandleClusteredCreateAsync( + "$JS.API.STREAM.CREATE.ORDERS", createPayload, metaGroup, CancellationToken.None); + + metaGroup.GetStreamAssignment("ORDERS").ShouldNotBeNull(); + + var response = await StreamApiHandlers.HandleClusteredDeleteAsync( + "$JS.API.STREAM.DELETE.ORDERS", metaGroup, CancellationToken.None); + + response.Error.ShouldBeNull(); + response.Success.ShouldBeTrue(); + metaGroup.GetStreamAssignment("ORDERS").ShouldBeNull(); + } + + /// + /// Clustered delete of a non-existent stream returns a 404 not-found error. + /// Go reference: jetstream_cluster.go — delete missing stream returns error. + /// + [Fact] + public async Task HandleClusteredDelete_returns_error_for_missing_stream() + { + var metaGroup = new JetStreamMetaGroup(nodes: 1); + + var response = await StreamApiHandlers.HandleClusteredDeleteAsync( + "$JS.API.STREAM.DELETE.GHOST", metaGroup, CancellationToken.None); + + response.Error.ShouldNotBeNull(); + response.Error!.Code.ShouldBe(404); + } + + // --------------------------------------------------------------- + // Consumer clustered handlers + // --------------------------------------------------------------- + + /// + /// Clustered consumer create proposes to the meta group, adding the consumer to the + /// stream's assignment map. + /// Go reference: jetstream_cluster.go:8100 jsClusteredConsumerRequest. + /// + [Fact] + public async Task Consumer_clustered_create_proposes_to_meta() + { + var metaGroup = new JetStreamMetaGroup(nodes: 1); + + // Create parent stream first. + await StreamApiHandlers.HandleClusteredCreateAsync( + "$JS.API.STREAM.CREATE.ORDERS", + Encoding.UTF8.GetBytes("""{"name":"ORDERS","subjects":["orders.>"]}"""), + metaGroup, + CancellationToken.None); + + var consumerPayload = Encoding.UTF8.GetBytes("""{"durable_name":"MON","filter_subject":"orders.created"}"""); + var response = await ConsumerApiHandlers.HandleClusteredCreateAsync( + "$JS.API.CONSUMER.CREATE.ORDERS.MON", consumerPayload, metaGroup, CancellationToken.None); + + response.Error.ShouldBeNull(); + response.Success.ShouldBeTrue(); + metaGroup.GetConsumerAssignment("ORDERS", "MON").ShouldNotBeNull(); + } + + /// + /// Creating a consumer on a stream that does not exist in the meta group returns an error. + /// Go reference: jetstream_cluster.go — consumer proposal validates stream existence. + /// + [Fact] + public async Task Consumer_clustered_create_returns_error_for_missing_stream() + { + var metaGroup = new JetStreamMetaGroup(nodes: 1); + var payload = Encoding.UTF8.GetBytes("""{"durable_name":"MON","filter_subject":"orders.created"}"""); + + var response = await ConsumerApiHandlers.HandleClusteredCreateAsync( + "$JS.API.CONSUMER.CREATE.GHOST.MON", payload, metaGroup, CancellationToken.None); + + response.Error.ShouldNotBeNull(); + response.Error!.Description.ShouldContain("GHOST"); + } + + /// + /// Clustered consumer delete removes the consumer from the stream assignment. + /// Go reference: jetstream_cluster.go processConsumerRemoval via meta leader. + /// + [Fact] + public async Task Consumer_clustered_delete_removes_consumer() + { + var metaGroup = new JetStreamMetaGroup(nodes: 1); + + // Set up stream and consumer. + await StreamApiHandlers.HandleClusteredCreateAsync( + "$JS.API.STREAM.CREATE.ORDERS", + Encoding.UTF8.GetBytes("""{"name":"ORDERS","subjects":["orders.>"]}"""), + metaGroup, + CancellationToken.None); + + await ConsumerApiHandlers.HandleClusteredCreateAsync( + "$JS.API.CONSUMER.CREATE.ORDERS.MON", + Encoding.UTF8.GetBytes("""{"durable_name":"MON"}"""), + metaGroup, + CancellationToken.None); + + metaGroup.GetConsumerAssignment("ORDERS", "MON").ShouldNotBeNull(); + + var response = await ConsumerApiHandlers.HandleClusteredDeleteAsync( + "$JS.API.CONSUMER.DELETE.ORDERS.MON", metaGroup, CancellationToken.None); + + response.Error.ShouldBeNull(); + response.Success.ShouldBeTrue(); + metaGroup.GetConsumerAssignment("ORDERS", "MON").ShouldBeNull(); + } + + /// + /// Deleting a non-existent consumer returns a 404 not-found error. + /// Go reference: jetstream_cluster.go — consumer delete validates existence. + /// + [Fact] + public async Task Consumer_clustered_delete_returns_not_found_for_missing() + { + var metaGroup = new JetStreamMetaGroup(nodes: 1); + + // Create the stream but not the consumer. + await StreamApiHandlers.HandleClusteredCreateAsync( + "$JS.API.STREAM.CREATE.ORDERS", + Encoding.UTF8.GetBytes("""{"name":"ORDERS","subjects":["orders.>"]}"""), + metaGroup, + CancellationToken.None); + + var response = await ConsumerApiHandlers.HandleClusteredDeleteAsync( + "$JS.API.CONSUMER.DELETE.ORDERS.GHOST", metaGroup, CancellationToken.None); + + response.Error.ShouldNotBeNull(); + response.Error!.Code.ShouldBe(404); + } +}