diff --git a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs index 4bae5d3..2efe797 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs @@ -84,10 +84,23 @@ public static class ConsumerApiHandlers return JetStreamApiResponse.NotFound(subject); var (stream, durableName) = parsed.Value; - var paused = ParsePause(payload); - return consumerManager.Pause(stream, durableName, paused) - ? JetStreamApiResponse.SuccessResponse() - : JetStreamApiResponse.NotFound(subject); + + var (paused, pauseUntil) = ParsePauseRequest(payload); + + bool success; + if (pauseUntil.HasValue) + success = consumerManager.Pause(stream, durableName, pauseUntil.Value); + else if (paused) + success = consumerManager.Pause(stream, durableName, true); + else + success = consumerManager.Resume(stream, durableName); + + if (!success) + return JetStreamApiResponse.NotFound(subject); + + return JetStreamApiResponse.PauseResponse( + consumerManager.IsPaused(stream, durableName), + consumerManager.GetPauseUntil(stream, durableName)); } public static JetStreamApiResponse HandleReset(string subject, ConsumerManager consumerManager) @@ -359,22 +372,44 @@ public static class ConsumerApiHandlers return 1; } - private static bool ParsePause(ReadOnlySpan payload) + /// + /// Parses pause request payload supporting both a boolean pause field and an RFC3339 + /// pause_until deadline. When pause_until is present it implies pause=true. + /// Go reference: server/consumer.go jsConsumerPauseRequest. + /// + private static (bool Paused, DateTime? PauseUntil) ParsePauseRequest(ReadOnlySpan payload) { if (payload.IsEmpty) - return false; + return (false, null); try { using var doc = JsonDocument.Parse(payload.ToArray()); - if (doc.RootElement.TryGetProperty("pause", out var pauseEl)) - return pauseEl.ValueKind == JsonValueKind.True; + var root = doc.RootElement; + + DateTime? pauseUntil = null; + if (root.TryGetProperty("pause_until", out var untilEl) + && untilEl.ValueKind == JsonValueKind.String + && DateTime.TryParse(untilEl.GetString(), null, System.Globalization.DateTimeStyles.RoundtripKind, out var dt)) + { + pauseUntil = dt.ToUniversalTime(); + } + + bool paused = false; + if (root.TryGetProperty("pause", out var pauseEl)) + paused = pauseEl.ValueKind == JsonValueKind.True; + + // pause_until implies pause=true (Go reference: consumer.go pauseConsumer). + if (pauseUntil.HasValue) + paused = true; + + return (paused, pauseUntil); } catch (JsonException) { } - return false; + return (false, null); } private static string? ParseStreamSubject(string subject, string prefix) diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs index 046c7cc..45b1709 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs @@ -17,6 +17,18 @@ public sealed class JetStreamApiResponse public bool Success { get; init; } public ulong Purged { get; init; } + /// + /// Whether the consumer is currently paused. Populated by pause/resume API responses. + /// Go reference: server/consumer.go jsConsumerPauseResponse.paused field. + /// + public bool? Paused { get; init; } + + /// + /// UTC deadline until which the consumer is paused. Null when no deadline is set. + /// Go reference: server/consumer.go jsConsumerPauseResponse.pause_until field. + /// + public DateTime? PauseUntil { get; init; } + public static JetStreamApiResponse NotFound(string subject) => new() { Error = new JetStreamApiError @@ -66,6 +78,17 @@ public sealed class JetStreamApiResponse Success = true, Purged = purged, }; + + /// + /// Returns a pause/resume success response with current pause state. + /// Go reference: server/consumer.go jsConsumerPauseResponse — returned after pause/resume API call. + /// + public static JetStreamApiResponse PauseResponse(bool paused, DateTime? pauseUntil) => new() + { + Success = true, + Paused = paused, + PauseUntil = pauseUntil, + }; } public sealed class JetStreamStreamInfo @@ -102,6 +125,15 @@ public sealed class JetStreamDirectMessage public sealed class JetStreamSnapshot { public string Payload { get; init; } = string.Empty; + + /// Stream name this snapshot was taken from. + public string? StreamName { get; init; } + + /// Number of chunks the snapshot was split into (1 for non-chunked snapshots). + public int NumChunks { get; init; } + + /// Block/chunk size in bytes. + public int BlkSize { get; init; } } public sealed class JetStreamPullBatch diff --git a/tests/NATS.Server.Tests/JetStream/Api/ConsumerPauseApiTests.cs b/tests/NATS.Server.Tests/JetStream/Api/ConsumerPauseApiTests.cs new file mode 100644 index 0000000..7abf1da --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Api/ConsumerPauseApiTests.cs @@ -0,0 +1,181 @@ +// Go reference: server/consumer.go — pauseConsumer / resumeConsumer / isPaused +// Tests for the consumer pause/resume API endpoint, including pause_until (RFC3339) +// time-bounded pauses and response body containing pause state. + +using NATS.Server.JetStream.Api; + +namespace NATS.Server.Tests.JetStream.Api; + +public class ConsumerPauseApiTests : IAsyncLifetime +{ + private JetStreamApiFixture _fx = null!; + + public async Task InitializeAsync() + { + _fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); + _ = await _fx.CreateConsumerAsync("ORDERS", "MON", "orders.created"); + } + + public async Task DisposeAsync() => await _fx.DisposeAsync(); + + // Go ref: consumer.go pauseConsumer — pause=true pauses consumer. + [Fact] + public async Task HandlePause_with_pause_true_pauses_consumer() + { + var resp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ORDERS.MON", + "{\"pause\":true}"); + + resp.Error.ShouldBeNull(); + resp.Success.ShouldBeTrue(); + resp.Paused.ShouldBe(true); + } + + // Go ref: consumer.go resumeConsumer — pause=false resumes consumer. + [Fact] + public async Task HandlePause_with_pause_false_resumes_consumer() + { + // First pause + await _fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.ORDERS.MON", "{\"pause\":true}"); + + // Then resume + var resp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ORDERS.MON", + "{\"pause\":false}"); + + resp.Error.ShouldBeNull(); + resp.Success.ShouldBeTrue(); + resp.Paused.ShouldBe(false); + } + + // Go ref: consumer.go pauseConsumer — pause_until sets deadline UTC datetime. + [Fact] + public async Task HandlePause_with_pause_until_sets_deadline() + { + var future = DateTime.UtcNow.AddHours(1); + var iso = future.ToString("O"); // RFC3339 round-trip format + + var resp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ORDERS.MON", + $"{{\"pause_until\":\"{iso}\"}}"); + + resp.Error.ShouldBeNull(); + resp.PauseUntil.ShouldNotBeNull(); + resp.PauseUntil!.Value.Should_Be_Close_To_Utc(future, tolerance: TimeSpan.FromSeconds(2)); + } + + // Go ref: consumer.go pauseConsumer — pause_until implies pause=true. + [Fact] + public async Task HandlePause_with_pause_until_implies_pause_true() + { + var future = DateTime.UtcNow.AddHours(1); + var iso = future.ToString("O"); + + var resp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ORDERS.MON", + $"{{\"pause_until\":\"{iso}\"}}"); + + resp.Error.ShouldBeNull(); + resp.Paused.ShouldBe(true); + } + + // Go ref: consumer.go isPaused — response includes current pause state. + [Fact] + public async Task HandlePause_returns_pause_state_in_response() + { + var resp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ORDERS.MON", + "{\"pause\":true}"); + + resp.Paused.ShouldBe(true); + + var resumeResp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ORDERS.MON", + "{\"pause\":false}"); + + resumeResp.Paused.ShouldBe(false); + } + + // Go ref: consumer.go pauseUntil — response includes pause_until when set. + [Fact] + public async Task HandlePause_returns_pause_until_in_response() + { + var future = DateTime.UtcNow.AddMinutes(30); + var iso = future.ToString("O"); + + var resp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ORDERS.MON", + $"{{\"pause_until\":\"{iso}\"}}"); + + resp.PauseUntil.ShouldNotBeNull(); + resp.PauseUntil!.Value.Kind.ShouldBe(DateTimeKind.Utc); + } + + // Go ref: consumer.go pauseConsumer — 404 when consumer not found. + [Fact] + public async Task HandlePause_returns_not_found_for_missing_consumer() + { + var resp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ORDERS.NONEXISTENT", + "{\"pause\":true}"); + + resp.Error.ShouldNotBeNull(); + resp.Error!.Code.ShouldBe(404); + } + + // Go ref: consumer.go resumeConsumer — empty payload resumes consumer. + [Fact] + public async Task HandlePause_with_empty_payload_resumes() + { + // Pause first + await _fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.ORDERS.MON", "{\"pause\":true}"); + + // Empty body = resume + var resp = await _fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.ORDERS.MON", ""); + + resp.Error.ShouldBeNull(); + resp.Success.ShouldBeTrue(); + resp.Paused.ShouldBe(false); + } + + // Go ref: consumer.go pauseConsumer — past pause_until auto-resumes immediately. + [Fact] + public async Task HandlePause_with_past_pause_until_auto_resumes() + { + var past = DateTime.UtcNow.AddHours(-1); + var iso = past.ToString("O"); + + var resp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ORDERS.MON", + $"{{\"pause_until\":\"{iso}\"}}"); + + // Deadline already passed — consumer should auto-resume, so paused=false. + resp.Error.ShouldBeNull(); + resp.Success.ShouldBeTrue(); + resp.Paused.ShouldBe(false); + } + + // Go ref: jsConsumerPauseT — bad subject (not matching stream.consumer pattern) returns 404. + [Fact] + public async Task HandlePause_returns_not_found_for_bad_subject() + { + var resp = await _fx.RequestLocalAsync( + "$JS.API.CONSUMER.PAUSE.ONLY_ONE_TOKEN", + "{\"pause\":true}"); + + resp.Error.ShouldNotBeNull(); + resp.Error!.Code.ShouldBe(404); + } +} + +/// +/// Shouldly-compatible extension for DateTime proximity assertions. +/// +internal static class DateTimeAssertExtensions +{ + public static void Should_Be_Close_To_Utc(this DateTime actual, DateTime expected, TimeSpan tolerance) + { + var diff = (actual.ToUniversalTime() - expected.ToUniversalTime()).Duration(); + diff.ShouldBeLessThanOrEqualTo(tolerance); + } +}