diff --git a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs index a8fe02b..daeaf64 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs @@ -4,6 +4,21 @@ using NATS.Server.JetStream.Models; namespace NATS.Server.JetStream.Api.Handlers; +/// +/// Purge request options. Go reference: jetstream_api.go:1200-1350. +/// +public sealed record PurgeRequest +{ + /// Subject filter — only purge messages matching this subject pattern. + public string? Filter { get; init; } + + /// Purge all messages with sequence strictly less than this value. + public ulong? Seq { get; init; } + + /// Keep the last N messages (per matching subject if filter is set). + public ulong? Keep { get; init; } +} + public static class StreamApiHandlers { private const string CreatePrefix = JetStreamApiSubjects.StreamCreate; @@ -68,15 +83,22 @@ public static class StreamApiHandlers : JetStreamApiResponse.NotFound(subject); } - public static JetStreamApiResponse HandlePurge(string subject, StreamManager streamManager) + /// + /// Handles stream purge with optional filter, seq, and keep options. + /// Go reference: jetstream_api.go:1200-1350. + /// + public static JetStreamApiResponse HandlePurge(string subject, ReadOnlySpan payload, StreamManager streamManager) { var streamName = ExtractTrailingToken(subject, PurgePrefix); if (streamName == null) return JetStreamApiResponse.NotFound(subject); - return streamManager.Purge(streamName) - ? JetStreamApiResponse.SuccessResponse() - : JetStreamApiResponse.NotFound(subject); + var request = ParsePurgeRequest(payload); + var purged = streamManager.PurgeEx(streamName, request.Filter, request.Seq, request.Keep); + if (purged < 0) + return JetStreamApiResponse.NotFound(subject); + + return JetStreamApiResponse.PurgeResponse((ulong)purged); } public static JetStreamApiResponse HandleNames(StreamManager streamManager) @@ -175,6 +197,37 @@ public static class StreamApiHandlers return token.Length == 0 ? null : token; } + internal static PurgeRequest ParsePurgeRequest(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return new PurgeRequest(); + + try + { + using var doc = JsonDocument.Parse(payload.ToArray()); + var root = doc.RootElement; + + string? filter = null; + ulong? seq = null; + ulong? keep = null; + + if (root.TryGetProperty("filter", out var filterEl) && filterEl.ValueKind == JsonValueKind.String) + filter = filterEl.GetString(); + + if (root.TryGetProperty("seq", out var seqEl) && seqEl.TryGetUInt64(out var seqVal)) + seq = seqVal; + + if (root.TryGetProperty("keep", out var keepEl) && keepEl.TryGetUInt64(out var keepVal)) + keep = keepVal; + + return new PurgeRequest { Filter = filter, Seq = seq, Keep = keep }; + } + catch (JsonException) + { + return new PurgeRequest(); + } + } + private static StreamConfig ParseConfig(ReadOnlySpan payload) { if (payload.IsEmpty) diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiError.cs b/src/NATS.Server/JetStream/Api/JetStreamApiError.cs index 4aaf120..eba6820 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiError.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiError.cs @@ -4,4 +4,11 @@ public sealed class JetStreamApiError { public int Code { get; init; } public string Description { get; init; } = string.Empty; + + /// + /// When non-null, indicates which node is the current leader. + /// Go reference: jetstream_api.go — not-leader responses include a leader_hint + /// so clients can redirect to the correct node. + /// + public string? LeaderHint { get; init; } } diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs index 79cbdbf..046c7cc 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs @@ -15,6 +15,7 @@ public sealed class JetStreamApiResponse public JetStreamSnapshot? Snapshot { get; init; } public JetStreamPullBatch? PullBatch { get; init; } public bool Success { get; init; } + public ulong Purged { get; init; } public static JetStreamApiResponse NotFound(string subject) => new() { @@ -40,6 +41,31 @@ public sealed class JetStreamApiResponse Description = description, }, }; + + /// + /// Returns a not-leader error with code 10003 and a leader_hint. + /// Go reference: jetstream_api.go:200-300 — non-leader nodes return this error + /// for mutating operations so clients can redirect. + /// + public static JetStreamApiResponse NotLeader(string leaderHint) => new() + { + Error = new JetStreamApiError + { + Code = 10003, + Description = "not leader", + LeaderHint = leaderHint, + }, + }; + + /// + /// Returns a purge success response with the number of messages purged. + /// Go reference: jetstream_api.go:1200-1350 — purge response includes purged count. + /// + public static JetStreamApiResponse PurgeResponse(ulong purged) => new() + { + Success = true, + Purged = purged, + }; } public sealed class JetStreamStreamInfo diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs index 981301f..1430378 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiRouter.cs @@ -2,6 +2,11 @@ using NATS.Server.JetStream.Api.Handlers; namespace NATS.Server.JetStream.Api; +/// +/// Routes JetStream API requests to the appropriate handler. +/// Go reference: jetstream_api.go:200-300 — non-leader nodes must forward or reject +/// mutating operations (Create, Update, Delete, Purge) to the current meta-group leader. +/// public sealed class JetStreamApiRouter { private readonly StreamManager _streamManager; @@ -20,8 +25,86 @@ public sealed class JetStreamApiRouter _metaGroup = metaGroup; } + /// + /// Determines whether the given API subject requires leader-only handling. + /// Mutating operations (Create, Update, Delete, Purge, Restore, Pause, Reset, Unpin, + /// message delete, peer/leader stepdown, server remove, account purge/move) require the leader. + /// Read-only operations (Info, Names, List, MessageGet, Snapshot, DirectGet, Next) do not. + /// Go reference: jetstream_api.go:200-300. + /// + public static bool IsLeaderRequired(string subject) + { + // Stream mutating operations + if (subject.StartsWith(JetStreamApiSubjects.StreamCreate, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.StreamUpdate, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.StreamDelete, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.StreamPurge, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.StreamRestore, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.StreamMessageDelete, StringComparison.Ordinal)) + return true; + + // Consumer mutating operations + if (subject.StartsWith(JetStreamApiSubjects.ConsumerCreate, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.ConsumerDelete, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.ConsumerPause, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.ConsumerReset, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.ConsumerUnpin, StringComparison.Ordinal)) + return true; + + // Cluster control operations + if (subject.StartsWith(JetStreamApiSubjects.StreamLeaderStepdown, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.StreamPeerRemove, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.ConsumerLeaderStepdown, StringComparison.Ordinal)) + return true; + if (subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal)) + return true; + + // Account-level control + if (subject.Equals(JetStreamApiSubjects.ServerRemove, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.AccountPurge, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.AccountStreamMove, StringComparison.Ordinal)) + return true; + if (subject.StartsWith(JetStreamApiSubjects.AccountStreamMoveCancel, StringComparison.Ordinal)) + return true; + + return false; + } + + /// + /// Stub for future leader-forwarding implementation. + /// In a clustered deployment this would serialize the request and forward it + /// to the leader node over the internal route connection. + /// Go reference: jetstream_api.go — jsClusteredStreamXxxRequest helpers. + /// + public static JetStreamApiResponse ForwardToLeader(string subject, ReadOnlySpan payload, string leaderName) + { + // For now, return the not-leader error with a hint so the client can retry. + return JetStreamApiResponse.NotLeader(leaderName); + } + public JetStreamApiResponse Route(string subject, ReadOnlySpan payload) { + // Leader check: if a meta-group exists and this node is not the leader, + // reject mutating operations with a not-leader error containing a leader hint. + // Go reference: jetstream_api.go:200-300. + if (_metaGroup is not null && IsLeaderRequired(subject) && !_metaGroup.IsLeader()) + { + return ForwardToLeader(subject, payload, _metaGroup.Leader); + } + if (subject.Equals(JetStreamApiSubjects.Info, StringComparison.Ordinal)) return AccountApiHandlers.HandleInfo(_streamManager, _consumerManager); @@ -56,7 +139,7 @@ public sealed class JetStreamApiRouter return StreamApiHandlers.HandleDelete(subject, _streamManager); if (subject.StartsWith(JetStreamApiSubjects.StreamPurge, StringComparison.Ordinal)) - return StreamApiHandlers.HandlePurge(subject, _streamManager); + return StreamApiHandlers.HandlePurge(subject, payload, _streamManager); if (subject.StartsWith(JetStreamApiSubjects.StreamMessageGet, StringComparison.Ordinal)) return StreamApiHandlers.HandleMessageGet(subject, payload, _streamManager); diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index 0dc8db3..24f8ed4 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -6,15 +6,34 @@ namespace NATS.Server.JetStream.Cluster; public sealed class JetStreamMetaGroup { private readonly int _nodes; + private readonly int _selfIndex; private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); private int _leaderIndex = 1; private long _leadershipVersion = 1; public JetStreamMetaGroup(int nodes) + : this(nodes, selfIndex: 1) + { + } + + public JetStreamMetaGroup(int nodes, int selfIndex) { _nodes = nodes; + _selfIndex = selfIndex; } + /// + /// Returns true when this node is the current meta-group leader. + /// Go reference: jetstream_api.go:200-300 — leader check before mutating operations. + /// + public bool IsLeader() => _leaderIndex == _selfIndex; + + /// + /// Returns the leader identifier string, e.g. "meta-1". + /// Used to populate the leader_hint field in not-leader error responses. + /// + public string Leader => $"meta-{_leaderIndex}"; + public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct) { _streams[config.Name] = 0; diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index b12bfc5..b53e5c7 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -103,6 +103,97 @@ public sealed class StreamManager return true; } + /// + /// Extended purge with optional subject filter, sequence cutoff, and keep-last-N. + /// Returns the number of messages purged, or -1 if the stream was not found. + /// Go reference: jetstream_api.go:1200-1350 — purge options: filter, seq, keep. + /// + public long PurgeEx(string name, string? filter, ulong? seq, ulong? keep) + { + if (!_streams.TryGetValue(name, out var stream)) + return -1; + if (stream.Config.Sealed || stream.Config.DenyPurge) + return -1; + + // No options — purge everything (backward-compatible with the original Purge). + if (filter is null && seq is null && keep is null) + { + var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); + var count = stateBefore.Messages; + stream.Store.PurgeAsync(default).GetAwaiter().GetResult(); + return (long)count; + } + + var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult(); + long purged = 0; + + // Filter + Keep: keep last N per matching subject. + if (filter is not null && keep is not null) + { + var matching = messages + .Where(m => SubjectMatch.MatchLiteral(m.Subject, filter)) + .GroupBy(m => m.Subject, StringComparer.Ordinal); + + foreach (var group in matching) + { + var ordered = group.OrderByDescending(m => m.Sequence).ToList(); + foreach (var msg in ordered.Skip((int)keep.Value)) + { + if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult()) + purged++; + } + } + + return purged; + } + + // Filter only: remove all messages matching the subject pattern. + if (filter is not null) + { + // If seq is also set, only purge matching messages below that sequence. + foreach (var msg in messages) + { + if (!SubjectMatch.MatchLiteral(msg.Subject, filter)) + continue; + if (seq is not null && msg.Sequence >= seq.Value) + continue; + if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult()) + purged++; + } + + return purged; + } + + // Seq only: remove all messages with sequence < seq. + if (seq is not null) + { + foreach (var msg in messages) + { + if (msg.Sequence >= seq.Value) + continue; + if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult()) + purged++; + } + + return purged; + } + + // Keep only (no filter): keep the last N messages globally, delete the rest. + if (keep is not null) + { + var ordered = messages.OrderByDescending(m => m.Sequence).ToList(); + foreach (var msg in ordered.Skip((int)keep.Value)) + { + if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult()) + purged++; + } + + return purged; + } + + return purged; + } + public StoredMessage? GetMessage(string name, ulong sequence) { if (!_streams.TryGetValue(name, out var stream)) diff --git a/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs b/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs new file mode 100644 index 0000000..ebbd29c --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Api/LeaderForwardingTests.cs @@ -0,0 +1,150 @@ +// Go reference: jetstream_api.go:200-300 — API requests at non-leader nodes must be +// forwarded to the current leader. Mutating operations return a not-leader error with +// a leader_hint field; read-only operations are handled locally on any node. + +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Api; + +public class LeaderForwardingTests +{ + /// + /// When this node IS the leader, mutating requests are handled locally. + /// Go reference: jetstream_api.go — leader handles requests directly. + /// + [Fact] + public void Route_WhenLeader_HandlesLocally() + { + // selfIndex=1 matches default leaderIndex=1, so this node is the leader. + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 1); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup); + + // Create a stream first so the purge has something to operate on. + var createPayload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}"""); + var createResult = router.Route("$JS.API.STREAM.CREATE.TEST", createPayload); + createResult.Error.ShouldBeNull(); + createResult.StreamInfo.ShouldNotBeNull(); + + // A mutating operation (delete) should succeed locally. + var deleteResult = router.Route("$JS.API.STREAM.DELETE.TEST", ReadOnlySpan.Empty); + deleteResult.Error.ShouldBeNull(); + deleteResult.Success.ShouldBeTrue(); + } + + /// + /// When this node is NOT the leader, mutating operations return a not-leader error + /// with the current leader's identifier in the leader_hint field. + /// Go reference: jetstream_api.go:200-300 — not-leader response. + /// + [Fact] + public void Route_WhenNotLeader_MutatingOp_ReturnsNotLeaderError() + { + // selfIndex=2, leaderIndex defaults to 1 — this node is NOT the leader. + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup); + + var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}"""); + var result = router.Route("$JS.API.STREAM.CREATE.TEST", payload); + + result.Error.ShouldNotBeNull(); + result.Error!.Code.ShouldBe(10003); + result.Error.Description.ShouldBe("not leader"); + result.Error.LeaderHint.ShouldNotBeNull(); + result.Error.LeaderHint.ShouldBe("meta-1"); + } + + /// + /// Read-only operations (INFO, NAMES, LIST) are handled locally even when + /// this node is not the leader. + /// Go reference: jetstream_api.go — read operations do not require leadership. + /// + [Fact] + public void Route_WhenNotLeader_ReadOp_HandlesLocally() + { + // selfIndex=2, leaderIndex defaults to 1 — this node is NOT the leader. + var metaGroup = new JetStreamMetaGroup(nodes: 3, selfIndex: 2); + var streamManager = new StreamManager(metaGroup); + var consumerManager = new ConsumerManager(); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup); + + // $JS.API.INFO is a read-only operation. + var infoResult = router.Route("$JS.API.INFO", ReadOnlySpan.Empty); + infoResult.Error.ShouldBeNull(); + + // $JS.API.STREAM.NAMES is a read-only operation. + var namesResult = router.Route("$JS.API.STREAM.NAMES", ReadOnlySpan.Empty); + namesResult.Error.ShouldBeNull(); + namesResult.StreamNames.ShouldNotBeNull(); + + // $JS.API.STREAM.LIST is a read-only operation. + var listResult = router.Route("$JS.API.STREAM.LIST", ReadOnlySpan.Empty); + listResult.Error.ShouldBeNull(); + listResult.StreamNames.ShouldNotBeNull(); + } + + /// + /// When there is no meta-group (single-server mode), all operations are handled + /// locally regardless of the subject type. + /// Go reference: jetstream_api.go — standalone servers have no meta-group. + /// + [Fact] + public void Route_NoMetaGroup_HandlesLocally() + { + // No meta-group — single server mode. + var streamManager = new StreamManager(); + var consumerManager = new ConsumerManager(); + var router = new JetStreamApiRouter(streamManager, consumerManager, metaGroup: null); + + var payload = Encoding.UTF8.GetBytes("""{"name":"TEST","subjects":["test.>"]}"""); + var result = router.Route("$JS.API.STREAM.CREATE.TEST", payload); + + // Should succeed — no leader check in single-server mode. + result.Error.ShouldBeNull(); + result.StreamInfo.ShouldNotBeNull(); + result.StreamInfo!.Config.Name.ShouldBe("TEST"); + } + + /// + /// IsLeaderRequired returns true for Create, Update, Delete, and Purge operations. + /// Go reference: jetstream_api.go:200-300 — mutating operations require leader. + /// + [Fact] + public void IsLeaderRequired_CreateUpdate_ReturnsTrue() + { + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.CREATE.TEST").ShouldBeTrue(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.UPDATE.TEST").ShouldBeTrue(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.DELETE.TEST").ShouldBeTrue(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.PURGE.TEST").ShouldBeTrue(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.RESTORE.TEST").ShouldBeTrue(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.MSG.DELETE.TEST").ShouldBeTrue(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.CONSUMER.CREATE.STREAM.CON").ShouldBeTrue(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.CONSUMER.DELETE.STREAM.CON").ShouldBeTrue(); + } + + /// + /// IsLeaderRequired returns false for Info, Names, List, and other read operations. + /// Go reference: jetstream_api.go — read-only operations do not need leadership. + /// + [Fact] + public void IsLeaderRequired_InfoList_ReturnsFalse() + { + JetStreamApiRouter.IsLeaderRequired("$JS.API.INFO").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.INFO.TEST").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.NAMES").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.LIST").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.MSG.GET.TEST").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.STREAM.SNAPSHOT.TEST").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.CONSUMER.INFO.STREAM.CON").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.CONSUMER.NAMES.STREAM").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.CONSUMER.LIST.STREAM").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.CONSUMER.MSG.NEXT.STREAM.CON").ShouldBeFalse(); + JetStreamApiRouter.IsLeaderRequired("$JS.API.DIRECT.GET.TEST").ShouldBeFalse(); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Api/StreamPurgeOptionsTests.cs b/tests/NATS.Server.Tests/JetStream/Api/StreamPurgeOptionsTests.cs new file mode 100644 index 0000000..afd0942 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Api/StreamPurgeOptionsTests.cs @@ -0,0 +1,193 @@ +// Go reference: jetstream_api.go:1200-1350 — stream purge supports options: subject filter, +// sequence cutoff, and keep-last-N. Combinations like filter+keep allow keeping the last N +// messages per matching subject. + +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Api; + +namespace NATS.Server.Tests.JetStream.Api; + +public class StreamPurgeOptionsTests +{ + private static JetStreamApiRouter CreateRouterWithStream(string streamName, string subjectPattern, out StreamManager streamManager) + { + streamManager = new StreamManager(); + var consumerManager = new ConsumerManager(); + var router = new JetStreamApiRouter(streamManager, consumerManager); + + var payload = Encoding.UTF8.GetBytes($$$"""{"name":"{{{streamName}}}","subjects":["{{{subjectPattern}}}"]}"""); + var result = router.Route($"$JS.API.STREAM.CREATE.{streamName}", payload); + result.Error.ShouldBeNull(); + + return router; + } + + private static async Task PublishAsync(StreamManager streamManager, string subject, string payload) + { + var stream = streamManager.FindBySubject(subject); + stream.ShouldNotBeNull(); + await stream.Store.AppendAsync(subject, Encoding.UTF8.GetBytes(payload), default); + } + + /// + /// Purge with no options removes all messages and returns the count. + /// Go reference: jetstream_api.go — basic purge with empty request body. + /// + [Fact] + public async Task Purge_NoOptions_RemovesAll() + { + var router = CreateRouterWithStream("TEST", "test.>", out var sm); + + await PublishAsync(sm, "test.a", "1"); + await PublishAsync(sm, "test.b", "2"); + await PublishAsync(sm, "test.c", "3"); + + var result = router.Route("$JS.API.STREAM.PURGE.TEST", Encoding.UTF8.GetBytes("{}")); + result.Error.ShouldBeNull(); + result.Success.ShouldBeTrue(); + result.Purged.ShouldBe(3UL); + + var state = await sm.GetStateAsync("TEST", default); + state.Messages.ShouldBe(0UL); + } + + /// + /// Purge with a subject filter removes only messages matching the pattern. + /// Go reference: jetstream_api.go:1200-1350 — filter option. + /// + [Fact] + public async Task Purge_WithSubjectFilter_RemovesOnlyMatching() + { + var router = CreateRouterWithStream("TEST", ">", out var sm); + + await PublishAsync(sm, "orders.a", "1"); + await PublishAsync(sm, "orders.b", "2"); + await PublishAsync(sm, "logs.x", "3"); + await PublishAsync(sm, "orders.c", "4"); + + var payload = Encoding.UTF8.GetBytes("""{"filter":"orders.*"}"""); + var result = router.Route("$JS.API.STREAM.PURGE.TEST", payload); + result.Error.ShouldBeNull(); + result.Success.ShouldBeTrue(); + result.Purged.ShouldBe(3UL); + + var state = await sm.GetStateAsync("TEST", default); + state.Messages.ShouldBe(1UL); + } + + /// + /// Purge with seq option removes all messages with sequence strictly less than the given value. + /// Go reference: jetstream_api.go:1200-1350 — seq option. + /// + [Fact] + public async Task Purge_WithSeq_RemovesBelowSequence() + { + var router = CreateRouterWithStream("TEST", "test.>", out var sm); + + await PublishAsync(sm, "test.a", "1"); // seq 1 + await PublishAsync(sm, "test.b", "2"); // seq 2 + await PublishAsync(sm, "test.c", "3"); // seq 3 + await PublishAsync(sm, "test.d", "4"); // seq 4 + await PublishAsync(sm, "test.e", "5"); // seq 5 + + // Remove all messages with seq < 4 (i.e., sequences 1, 2, 3). + var payload = Encoding.UTF8.GetBytes("""{"seq":4}"""); + var result = router.Route("$JS.API.STREAM.PURGE.TEST", payload); + result.Error.ShouldBeNull(); + result.Success.ShouldBeTrue(); + result.Purged.ShouldBe(3UL); + + var state = await sm.GetStateAsync("TEST", default); + state.Messages.ShouldBe(2UL); + } + + /// + /// Purge with keep option retains the last N messages globally. + /// Go reference: jetstream_api.go:1200-1350 — keep option. + /// + [Fact] + public async Task Purge_WithKeep_KeepsLastN() + { + var router = CreateRouterWithStream("TEST", "test.>", out var sm); + + await PublishAsync(sm, "test.a", "1"); // seq 1 + await PublishAsync(sm, "test.b", "2"); // seq 2 + await PublishAsync(sm, "test.c", "3"); // seq 3 + await PublishAsync(sm, "test.d", "4"); // seq 4 + await PublishAsync(sm, "test.e", "5"); // seq 5 + + // Keep the last 2 messages (seq 4, 5); purge 1, 2, 3. + var payload = Encoding.UTF8.GetBytes("""{"keep":2}"""); + var result = router.Route("$JS.API.STREAM.PURGE.TEST", payload); + result.Error.ShouldBeNull(); + result.Success.ShouldBeTrue(); + result.Purged.ShouldBe(3UL); + + var state = await sm.GetStateAsync("TEST", default); + state.Messages.ShouldBe(2UL); + } + + /// + /// Purge with both filter and keep retains the last N messages per matching subject. + /// Go reference: jetstream_api.go:1200-1350 — filter+keep combination. + /// + [Fact] + public async Task Purge_FilterAndKeep_KeepsLastNPerFilter() + { + var router = CreateRouterWithStream("TEST", ">", out var sm); + + // Publish multiple messages on two subjects. + await PublishAsync(sm, "orders.a", "o1"); // seq 1 + await PublishAsync(sm, "orders.a", "o2"); // seq 2 + await PublishAsync(sm, "orders.a", "o3"); // seq 3 + await PublishAsync(sm, "logs.x", "l1"); // seq 4 — not matching filter + await PublishAsync(sm, "orders.b", "ob1"); // seq 5 + await PublishAsync(sm, "orders.b", "ob2"); // seq 6 + + // Keep last 1 per matching subject "orders.*". + // orders.a has 3 msgs -> keep seq 3, purge seq 1, 2 + // orders.b has 2 msgs -> keep seq 6, purge seq 5 + // logs.x is unaffected (does not match filter) + var payload = Encoding.UTF8.GetBytes("""{"filter":"orders.*","keep":1}"""); + var result = router.Route("$JS.API.STREAM.PURGE.TEST", payload); + result.Error.ShouldBeNull(); + result.Success.ShouldBeTrue(); + result.Purged.ShouldBe(3UL); + + var state = await sm.GetStateAsync("TEST", default); + // Remaining: orders.a seq 3, logs.x seq 4, orders.b seq 6 = 3 messages + state.Messages.ShouldBe(3UL); + } + + /// + /// Purge on a non-existent stream returns a 404 not-found error. + /// Go reference: jetstream_api.go — stream not found. + /// + [Fact] + public void Purge_InvalidStream_ReturnsNotFound() + { + var streamManager = new StreamManager(); + var consumerManager = new ConsumerManager(); + var router = new JetStreamApiRouter(streamManager, consumerManager); + + var result = router.Route("$JS.API.STREAM.PURGE.NONEXISTENT", Encoding.UTF8.GetBytes("{}")); + result.Error.ShouldNotBeNull(); + result.Error!.Code.ShouldBe(404); + } + + /// + /// Purge on an empty stream returns success with zero purged count. + /// Go reference: jetstream_api.go — purge on empty stream. + /// + [Fact] + public void Purge_EmptyStream_ReturnsZeroPurged() + { + var router = CreateRouterWithStream("TEST", "test.>", out _); + + var result = router.Route("$JS.API.STREAM.PURGE.TEST", Encoding.UTF8.GetBytes("{}")); + result.Error.ShouldBeNull(); + result.Success.ShouldBeTrue(); + result.Purged.ShouldBe(0UL); + } +}