feat: add clustered stream/consumer API handlers (Gap 2.12)

Implement HandleClusteredCreateAsync, HandleClusteredUpdateAsync, and
HandleClusteredDeleteAsync on StreamApiHandlers, and HandleClusteredCreateAsync
and HandleClusteredDeleteAsync on ConsumerApiHandlers. These handlers propose
operations to the meta RAFT group (JetStreamMetaGroup) instead of operating on
the local StreamManager/ConsumerManager, matching the Go jsClusteredStreamRequest
and jsClusteredConsumerRequest patterns (jetstream_cluster.go:7620-8265).

Ten tests in ClusteredApiTests.cs verify: stream create proposes to meta group,
duplicate-stream error, not-leader error (code 10003), stream update, stream
delete, not-found-on-delete, consumer create on stream, consumer-on-missing-stream
error, consumer delete, and not-found consumer delete.
This commit is contained in:
Joseph Doherty
2026-02-25 10:43:49 -05:00
parent d817d6f7a2
commit f6d024c50d
3 changed files with 437 additions and 0 deletions

View File

@@ -1,5 +1,6 @@
using System.Text;
using System.Text.Json;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Api.Handlers;
@@ -138,6 +139,86 @@ public static class ConsumerApiHandlers
};
}
// ---------------------------------------------------------------
// Clustered handlers — propose to meta RAFT group instead of local ConsumerManager.
// Go reference: jetstream_cluster.go:8100-8265 jsClusteredConsumerRequest and related.
// ---------------------------------------------------------------
/// <summary>
/// Proposes a consumer create to the meta RAFT group.
/// Validates that this node is the leader and the parent stream exists, then calls
/// <see cref="JetStreamMetaGroup.ProposeCreateConsumerValidatedAsync"/>.
/// Go reference: jetstream_cluster.go:8100 jsClusteredConsumerRequest.
/// </summary>
public static async Task<JetStreamApiResponse> HandleClusteredCreateAsync(
string subject,
byte[] payload,
JetStreamMetaGroup metaGroup,
CancellationToken ct)
{
var parsed = ParseSubject(subject, CreatePrefix);
if (parsed == null)
return JetStreamApiResponse.NotFound(subject);
var (stream, durableName) = parsed.Value;
var config = ParseConfig(payload);
if (string.IsNullOrWhiteSpace(config.DurableName))
config.DurableName = durableName;
var consumerName = string.IsNullOrWhiteSpace(config.DurableName) ? durableName : config.DurableName;
var group = new RaftGroup { Name = $"{stream}-{consumerName}" };
try
{
await metaGroup.ProposeCreateConsumerValidatedAsync(stream, consumerName, group, ct);
return JetStreamApiResponse.SuccessResponse();
}
catch (InvalidOperationException ex) when (ex.Message.StartsWith("Not the meta-group leader", StringComparison.Ordinal))
{
return JetStreamApiResponse.NotLeader(metaGroup.Leader);
}
catch (InvalidOperationException ex)
{
return JetStreamApiResponse.ErrorResponse(400, ex.Message);
}
}
/// <summary>
/// Proposes a consumer deletion to the meta RAFT group.
/// Validates that this node is the leader and the consumer exists, then calls
/// <see cref="JetStreamMetaGroup.ProposeDeleteConsumerValidatedAsync"/>.
/// Go reference: jetstream_cluster.go jsClusteredConsumerDeleteRequest.
/// </summary>
public static async Task<JetStreamApiResponse> HandleClusteredDeleteAsync(
string subject,
JetStreamMetaGroup metaGroup,
CancellationToken ct)
{
var parsed = ParseSubject(subject, DeletePrefix);
if (parsed == null)
return JetStreamApiResponse.NotFound(subject);
var (stream, consumerName) = parsed.Value;
// Validate that the consumer assignment exists before proposing deletion.
if (metaGroup.GetConsumerAssignment(stream, consumerName) == null)
return JetStreamApiResponse.ErrorResponse(404, $"consumer not found: '{consumerName}' on stream '{stream}'");
try
{
await metaGroup.ProposeDeleteConsumerValidatedAsync(stream, consumerName, ct);
return JetStreamApiResponse.SuccessResponse();
}
catch (InvalidOperationException ex) when (ex.Message.StartsWith("Not the meta-group leader", StringComparison.Ordinal))
{
return JetStreamApiResponse.NotLeader(metaGroup.Leader);
}
catch (InvalidOperationException ex)
{
return JetStreamApiResponse.ErrorResponse(400, ex.Message);
}
}
private static (string Stream, string Durable)? ParseSubject(string subject, string prefix)
{
if (!subject.StartsWith(prefix, StringComparison.Ordinal))

View File

@@ -1,5 +1,6 @@
using System.Text.Json;
using System.Text;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Api.Handlers;
@@ -188,6 +189,120 @@ public static class StreamApiHandlers
: JetStreamApiResponse.NotFound(subject);
}
// ---------------------------------------------------------------
// Clustered handlers — propose to meta RAFT group instead of local StreamManager.
// Go reference: jetstream_cluster.go:7620-7900 jsClusteredStreamRequest and related.
// ---------------------------------------------------------------
/// <summary>
/// Proposes a stream create to the meta RAFT group.
/// Validates that this node is the leader, then calls
/// <see cref="JetStreamMetaGroup.ProposeCreateStreamValidatedAsync"/>.
/// Go reference: jetstream_cluster.go:7620 jsClusteredStreamRequest.
/// </summary>
public static async Task<JetStreamApiResponse> HandleClusteredCreateAsync(
string subject,
byte[] payload,
JetStreamMetaGroup metaGroup,
CancellationToken ct)
{
var streamName = ExtractTrailingToken(subject, CreatePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
var config = ParseConfig(payload);
if (string.IsNullOrWhiteSpace(config.Name))
config.Name = streamName;
try
{
await metaGroup.ProposeCreateStreamValidatedAsync(config, group: null, ct);
return JetStreamApiResponse.SuccessResponse();
}
catch (InvalidOperationException ex) when (ex.Message.StartsWith("Not the meta-group leader", StringComparison.Ordinal))
{
return JetStreamApiResponse.NotLeader(metaGroup.Leader);
}
catch (InvalidOperationException ex)
{
return JetStreamApiResponse.ErrorResponse(400, ex.Message);
}
}
/// <summary>
/// Proposes a stream config update to the meta RAFT group.
/// Calls <see cref="JetStreamMetaGroup.ProcessUpdateStreamAssignment"/> after validating leadership.
/// Go reference: jetstream_cluster.go jsClusteredStreamUpdateRequest.
/// </summary>
public static async Task<JetStreamApiResponse> HandleClusteredUpdateAsync(
string subject,
byte[] payload,
JetStreamMetaGroup metaGroup,
CancellationToken ct)
{
_ = ct;
var streamName = ExtractTrailingToken(subject, UpdatePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
if (!metaGroup.IsLeader())
return JetStreamApiResponse.NotLeader(metaGroup.Leader);
var config = ParseConfig(payload);
if (string.IsNullOrWhiteSpace(config.Name))
config.Name = streamName;
var existing = metaGroup.GetStreamAssignment(streamName);
if (existing == null)
return JetStreamApiResponse.NotFound(subject);
var sa = new StreamAssignment
{
StreamName = streamName,
Group = existing.Group,
};
var updated = metaGroup.ProcessUpdateStreamAssignment(sa);
if (!updated)
return JetStreamApiResponse.NotFound(subject);
return JetStreamApiResponse.SuccessResponse();
}
/// <summary>
/// Proposes a stream deletion to the meta RAFT group.
/// Calls <see cref="JetStreamMetaGroup.ProposeDeleteStreamValidatedAsync"/> after validating leadership.
/// Go reference: jetstream_cluster.go jsClusteredStreamDeleteRequest.
/// </summary>
public static async Task<JetStreamApiResponse> HandleClusteredDeleteAsync(
string subject,
JetStreamMetaGroup metaGroup,
CancellationToken ct)
{
var streamName = ExtractTrailingToken(subject, DeletePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
// Check stream exists before attempting deletion.
if (metaGroup.GetStreamAssignment(streamName) == null)
return JetStreamApiResponse.ErrorResponse(404, $"stream not found: '{streamName}'");
try
{
await metaGroup.ProposeDeleteStreamValidatedAsync(streamName, ct);
return JetStreamApiResponse.SuccessResponse();
}
catch (InvalidOperationException ex) when (ex.Message.StartsWith("Not the meta-group leader", StringComparison.Ordinal))
{
return JetStreamApiResponse.NotLeader(metaGroup.Leader);
}
catch (InvalidOperationException ex)
{
return JetStreamApiResponse.ErrorResponse(400, ex.Message);
}
}
private static string? ExtractTrailingToken(string subject, string prefix)
{
if (!subject.StartsWith(prefix, StringComparison.Ordinal))