feat(jetstream): add API leader forwarding and stream purge options (C7+C8)

C7: JetStreamApiRouter now checks leadership before mutating operations.
Non-leader nodes return error code 10003 with a leader_hint field.
JetStreamMetaGroup gains IsLeader() and Leader for cluster-aware routing.

C8: StreamApiHandlers.HandlePurge accepts PurgeRequest options (filter,
seq, keep). StreamManager.PurgeEx implements subject-filtered purge,
sequence-based purge, keep-last-N, and filter+keep combinations.
This commit is contained in:
Joseph Doherty
2026-02-24 15:22:22 -05:00
parent d259a2d03e
commit 7116988d03
8 changed files with 627 additions and 5 deletions

View File

@@ -4,6 +4,21 @@ using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Api.Handlers;
/// <summary>
/// Purge request options. Go reference: jetstream_api.go:1200-1350.
/// </summary>
public sealed record PurgeRequest
{
/// <summary>Subject filter — only purge messages matching this subject pattern.</summary>
public string? Filter { get; init; }
/// <summary>Purge all messages with sequence strictly less than this value.</summary>
public ulong? Seq { get; init; }
/// <summary>Keep the last N messages (per matching subject if filter is set).</summary>
public ulong? Keep { get; init; }
}
public static class StreamApiHandlers
{
private const string CreatePrefix = JetStreamApiSubjects.StreamCreate;
@@ -68,15 +83,22 @@ public static class StreamApiHandlers
: JetStreamApiResponse.NotFound(subject);
}
public static JetStreamApiResponse HandlePurge(string subject, StreamManager streamManager)
/// <summary>
/// Handles stream purge with optional filter, seq, and keep options.
/// Go reference: jetstream_api.go:1200-1350.
/// </summary>
public static JetStreamApiResponse HandlePurge(string subject, ReadOnlySpan<byte> payload, StreamManager streamManager)
{
var streamName = ExtractTrailingToken(subject, PurgePrefix);
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
return streamManager.Purge(streamName)
? JetStreamApiResponse.SuccessResponse()
: JetStreamApiResponse.NotFound(subject);
var request = ParsePurgeRequest(payload);
var purged = streamManager.PurgeEx(streamName, request.Filter, request.Seq, request.Keep);
if (purged < 0)
return JetStreamApiResponse.NotFound(subject);
return JetStreamApiResponse.PurgeResponse((ulong)purged);
}
public static JetStreamApiResponse HandleNames(StreamManager streamManager)
@@ -175,6 +197,37 @@ public static class StreamApiHandlers
return token.Length == 0 ? null : token;
}
internal static PurgeRequest ParsePurgeRequest(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return new PurgeRequest();
try
{
using var doc = JsonDocument.Parse(payload.ToArray());
var root = doc.RootElement;
string? filter = null;
ulong? seq = null;
ulong? keep = null;
if (root.TryGetProperty("filter", out var filterEl) && filterEl.ValueKind == JsonValueKind.String)
filter = filterEl.GetString();
if (root.TryGetProperty("seq", out var seqEl) && seqEl.TryGetUInt64(out var seqVal))
seq = seqVal;
if (root.TryGetProperty("keep", out var keepEl) && keepEl.TryGetUInt64(out var keepVal))
keep = keepVal;
return new PurgeRequest { Filter = filter, Seq = seq, Keep = keep };
}
catch (JsonException)
{
return new PurgeRequest();
}
}
private static StreamConfig ParseConfig(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)

View File

@@ -4,4 +4,11 @@ public sealed class JetStreamApiError
{
public int Code { get; init; }
public string Description { get; init; } = string.Empty;
/// <summary>
/// When non-null, indicates which node is the current leader.
/// Go reference: jetstream_api.go — not-leader responses include a leader_hint
/// so clients can redirect to the correct node.
/// </summary>
public string? LeaderHint { get; init; }
}

View File

@@ -15,6 +15,7 @@ public sealed class JetStreamApiResponse
public JetStreamSnapshot? Snapshot { get; init; }
public JetStreamPullBatch? PullBatch { get; init; }
public bool Success { get; init; }
public ulong Purged { get; init; }
public static JetStreamApiResponse NotFound(string subject) => new()
{
@@ -40,6 +41,31 @@ public sealed class JetStreamApiResponse
Description = description,
},
};
/// <summary>
/// Returns a not-leader error with code 10003 and a leader_hint.
/// Go reference: jetstream_api.go:200-300 — non-leader nodes return this error
/// for mutating operations so clients can redirect.
/// </summary>
public static JetStreamApiResponse NotLeader(string leaderHint) => new()
{
Error = new JetStreamApiError
{
Code = 10003,
Description = "not leader",
LeaderHint = leaderHint,
},
};
/// <summary>
/// Returns a purge success response with the number of messages purged.
/// Go reference: jetstream_api.go:1200-1350 — purge response includes purged count.
/// </summary>
public static JetStreamApiResponse PurgeResponse(ulong purged) => new()
{
Success = true,
Purged = purged,
};
}
public sealed class JetStreamStreamInfo

View File

@@ -2,6 +2,11 @@ using NATS.Server.JetStream.Api.Handlers;
namespace NATS.Server.JetStream.Api;
/// <summary>
/// Routes JetStream API requests to the appropriate handler.
/// Go reference: jetstream_api.go:200-300 — non-leader nodes must forward or reject
/// mutating operations (Create, Update, Delete, Purge) to the current meta-group leader.
/// </summary>
public sealed class JetStreamApiRouter
{
private readonly StreamManager _streamManager;
@@ -20,8 +25,86 @@ public sealed class JetStreamApiRouter
_metaGroup = metaGroup;
}
/// <summary>
/// Determines whether the given API subject requires leader-only handling.
/// Mutating operations (Create, Update, Delete, Purge, Restore, Pause, Reset, Unpin,
/// message delete, peer/leader stepdown, server remove, account purge/move) require the leader.
/// Read-only operations (Info, Names, List, MessageGet, Snapshot, DirectGet, Next) do not.
/// Go reference: jetstream_api.go:200-300.
/// </summary>
public static bool IsLeaderRequired(string subject)
{
// Stream mutating operations
if (subject.StartsWith(JetStreamApiSubjects.StreamCreate, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.StreamUpdate, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.StreamDelete, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.StreamPurge, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.StreamRestore, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.StreamMessageDelete, StringComparison.Ordinal))
return true;
// Consumer mutating operations
if (subject.StartsWith(JetStreamApiSubjects.ConsumerCreate, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.ConsumerDelete, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.ConsumerPause, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.ConsumerReset, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.ConsumerUnpin, StringComparison.Ordinal))
return true;
// Cluster control operations
if (subject.StartsWith(JetStreamApiSubjects.StreamLeaderStepdown, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.StreamPeerRemove, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.ConsumerLeaderStepdown, StringComparison.Ordinal))
return true;
if (subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal))
return true;
// Account-level control
if (subject.Equals(JetStreamApiSubjects.ServerRemove, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.AccountPurge, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.AccountStreamMove, StringComparison.Ordinal))
return true;
if (subject.StartsWith(JetStreamApiSubjects.AccountStreamMoveCancel, StringComparison.Ordinal))
return true;
return false;
}
/// <summary>
/// Stub for future leader-forwarding implementation.
/// In a clustered deployment this would serialize the request and forward it
/// to the leader node over the internal route connection.
/// Go reference: jetstream_api.go — jsClusteredStreamXxxRequest helpers.
/// </summary>
public static JetStreamApiResponse ForwardToLeader(string subject, ReadOnlySpan<byte> payload, string leaderName)
{
// For now, return the not-leader error with a hint so the client can retry.
return JetStreamApiResponse.NotLeader(leaderName);
}
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
{
// Leader check: if a meta-group exists and this node is not the leader,
// reject mutating operations with a not-leader error containing a leader hint.
// Go reference: jetstream_api.go:200-300.
if (_metaGroup is not null && IsLeaderRequired(subject) && !_metaGroup.IsLeader())
{
return ForwardToLeader(subject, payload, _metaGroup.Leader);
}
if (subject.Equals(JetStreamApiSubjects.Info, StringComparison.Ordinal))
return AccountApiHandlers.HandleInfo(_streamManager, _consumerManager);
@@ -56,7 +139,7 @@ public sealed class JetStreamApiRouter
return StreamApiHandlers.HandleDelete(subject, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamPurge, StringComparison.Ordinal))
return StreamApiHandlers.HandlePurge(subject, _streamManager);
return StreamApiHandlers.HandlePurge(subject, payload, _streamManager);
if (subject.StartsWith(JetStreamApiSubjects.StreamMessageGet, StringComparison.Ordinal))
return StreamApiHandlers.HandleMessageGet(subject, payload, _streamManager);

View File

@@ -6,15 +6,34 @@ namespace NATS.Server.JetStream.Cluster;
public sealed class JetStreamMetaGroup
{
private readonly int _nodes;
private readonly int _selfIndex;
private readonly ConcurrentDictionary<string, byte> _streams = new(StringComparer.Ordinal);
private int _leaderIndex = 1;
private long _leadershipVersion = 1;
public JetStreamMetaGroup(int nodes)
: this(nodes, selfIndex: 1)
{
}
public JetStreamMetaGroup(int nodes, int selfIndex)
{
_nodes = nodes;
_selfIndex = selfIndex;
}
/// <summary>
/// Returns true when this node is the current meta-group leader.
/// Go reference: jetstream_api.go:200-300 — leader check before mutating operations.
/// </summary>
public bool IsLeader() => _leaderIndex == _selfIndex;
/// <summary>
/// Returns the leader identifier string, e.g. "meta-1".
/// Used to populate the leader_hint field in not-leader error responses.
/// </summary>
public string Leader => $"meta-{_leaderIndex}";
public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct)
{
_streams[config.Name] = 0;

View File

@@ -103,6 +103,97 @@ public sealed class StreamManager
return true;
}
/// <summary>
/// Extended purge with optional subject filter, sequence cutoff, and keep-last-N.
/// Returns the number of messages purged, or -1 if the stream was not found.
/// Go reference: jetstream_api.go:1200-1350 — purge options: filter, seq, keep.
/// </summary>
public long PurgeEx(string name, string? filter, ulong? seq, ulong? keep)
{
if (!_streams.TryGetValue(name, out var stream))
return -1;
if (stream.Config.Sealed || stream.Config.DenyPurge)
return -1;
// No options — purge everything (backward-compatible with the original Purge).
if (filter is null && seq is null && keep is null)
{
var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
var count = stateBefore.Messages;
stream.Store.PurgeAsync(default).GetAwaiter().GetResult();
return (long)count;
}
var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
long purged = 0;
// Filter + Keep: keep last N per matching subject.
if (filter is not null && keep is not null)
{
var matching = messages
.Where(m => SubjectMatch.MatchLiteral(m.Subject, filter))
.GroupBy(m => m.Subject, StringComparer.Ordinal);
foreach (var group in matching)
{
var ordered = group.OrderByDescending(m => m.Sequence).ToList();
foreach (var msg in ordered.Skip((int)keep.Value))
{
if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult())
purged++;
}
}
return purged;
}
// Filter only: remove all messages matching the subject pattern.
if (filter is not null)
{
// If seq is also set, only purge matching messages below that sequence.
foreach (var msg in messages)
{
if (!SubjectMatch.MatchLiteral(msg.Subject, filter))
continue;
if (seq is not null && msg.Sequence >= seq.Value)
continue;
if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult())
purged++;
}
return purged;
}
// Seq only: remove all messages with sequence < seq.
if (seq is not null)
{
foreach (var msg in messages)
{
if (msg.Sequence >= seq.Value)
continue;
if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult())
purged++;
}
return purged;
}
// Keep only (no filter): keep the last N messages globally, delete the rest.
if (keep is not null)
{
var ordered = messages.OrderByDescending(m => m.Sequence).ToList();
foreach (var msg in ordered.Skip((int)keep.Value))
{
if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult())
purged++;
}
return purged;
}
return purged;
}
public StoredMessage? GetMessage(string name, ulong sequence)
{
if (!_streams.TryGetValue(name, out var stream))