diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index 31c216a..57e1376 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -225,6 +225,28 @@ public sealed class ConsumerManager : IDisposable return true; } + /// + /// Resets a consumer's position to the specified sequence. + /// Clears pending acks and redelivery state. + /// Go reference: consumer.go:4241 processResetReq. + /// + public bool ResetToSequence(string stream, string durableName, ulong sequence) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + // Update the consumer's next sequence + handle.NextSequence = sequence; + + // Clear pending acks — all outstanding acks are invalid after reset + handle.AckProcessor.ClearAll(); + + // Clear pending bytes + handle.PendingBytes = 0; + + return true; + } + public bool Unpin(string stream, string durableName) { return _consumers.ContainsKey((stream, durableName)); diff --git a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs index 8f63e9b..30bca35 100644 --- a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs +++ b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs @@ -294,6 +294,32 @@ public sealed class AckProcessor _pending.Remove(sequence); } + /// + /// Clears all pending acks, terminated set, and resets the ack floor. + /// Used during consumer reset to specific sequence. + /// Go reference: consumer.go processResetReq — clear all tracking state. + /// + public void ClearAll() + { + _pending.Clear(); + _terminated.Clear(); + AckFloor = 0; + TerminatedCount = 0; + _exceededSequences.Clear(); + } + + /// + /// Resets the ack floor to the specified value. + /// Used during consumer reset. + /// + public void SetAckFloor(ulong floor) + { + AckFloor = floor; + // Remove any pending entries below or at the new floor + foreach (var key in _pending.Keys.Where(k => k <= floor).ToArray()) + _pending.Remove(key); + } + public bool HasPending => _pending.Count > 0; public int PendingCount => _pending.Count; diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerResetTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerResetTests.cs new file mode 100644 index 0000000..2f75048 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerResetTests.cs @@ -0,0 +1,244 @@ +// Go reference: consumer.go:4241 (processResetReq) +using NATS.Server.JetStream; +using NATS.Server.JetStream.Consumers; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; +using System.Text; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Tests for consumer reset-to-sequence (Gap 3.12) and AckProcessor.ClearAll / SetAckFloor. +/// Go reference: consumer.go:4241 processResetReq. +/// +public class ConsumerResetTests +{ + private static ConsumerManager CreateManager() => new(); + + private static void CreateConsumer(ConsumerManager mgr, string stream, string name, + Action? configure = null) + { + var config = new ConsumerConfig { DurableName = name }; + configure?.Invoke(config); + mgr.CreateOrUpdate(stream, config); + } + + // ------------------------------------------------------------------------- + // ResetToSequence tests + // ------------------------------------------------------------------------- + + // Go reference: consumer.go:4241 — processResetReq sets consumer.sseq to + // the requested sequence so the next fetch starts there. + [Fact] + public void ResetToSequence_updates_next_sequence() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "ORDERS", "oc1"); + + // Advance the consumer naturally so NextSequence is not 1 + mgr.TryGet("ORDERS", "oc1", out var before); + before.NextSequence = 10; + + mgr.ResetToSequence("ORDERS", "oc1", 5); + + mgr.TryGet("ORDERS", "oc1", out var after); + after.NextSequence.ShouldBe(5UL); + } + + // Go reference: consumer.go:4241 — reset clears the pending ack map so + // stale ack tokens from before the reset cannot be accepted. + [Fact] + public void ResetToSequence_clears_pending_acks() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "ORDERS", "oc2"); + + mgr.TryGet("ORDERS", "oc2", out var handle); + handle.AckProcessor.Register(3, ackWaitMs: 5000); + handle.AckProcessor.Register(7, ackWaitMs: 5000); + handle.AckProcessor.PendingCount.ShouldBe(2); + + mgr.ResetToSequence("ORDERS", "oc2", 1); + + handle.AckProcessor.PendingCount.ShouldBe(0); + } + + // Go reference: consumer.go:4241 — pendingBytes must be zeroed on reset + // so the idle heartbeat header is correct after the reset. + [Fact] + public void ResetToSequence_clears_pending_bytes() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "ORDERS", "oc3"); + + mgr.TryGet("ORDERS", "oc3", out var handle); + handle.PendingBytes = 12345; + + mgr.ResetToSequence("ORDERS", "oc3", 1); + + handle.PendingBytes.ShouldBe(0L); + } + + // Go reference: consumer.go:4241 — returns false when the consumer does + // not exist (unknown stream or durable name). + [Fact] + public void ResetToSequence_returns_false_for_missing_consumer() + { + var mgr = CreateManager(); + + mgr.ResetToSequence("NO-STREAM", "NO-CONSUMER", 1).ShouldBeFalse(); + } + + // Go reference: consumer.go:4241 — returns true when the consumer exists + // and the reset is applied. + [Fact] + public void ResetToSequence_returns_true_for_existing_consumer() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "ORDERS", "oc4"); + + mgr.ResetToSequence("ORDERS", "oc4", 42).ShouldBeTrue(); + } + + // Go reference: consumer.go:4241 — consumer config (subject filters, ack + // policy, etc.) is immutable during reset; only positional / tracking state + // is cleared. + [Fact] + public void ResetToSequence_preserves_config() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "ORDERS", "oc5", cfg => + { + cfg.FilterSubject = "orders.>"; + cfg.AckPolicy = AckPolicy.Explicit; + }); + + mgr.ResetToSequence("ORDERS", "oc5", 1); + + mgr.TryGet("ORDERS", "oc5", out var handle); + handle.Config.FilterSubject.ShouldBe("orders.>"); + handle.Config.AckPolicy.ShouldBe(AckPolicy.Explicit); + } + + // Go reference: consumer.go:4241 — after reset the push engine can + // re-enqueue messages starting at the reset sequence. + [Fact] + public void ResetToSequence_allows_re_delivery_from_sequence() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "ORDERS", "oc6", cfg => + { + cfg.Push = true; + cfg.DeliverSubject = "deliver.test"; + }); + + mgr.TryGet("ORDERS", "oc6", out var handle); + handle.NextSequence = 50; + + mgr.ResetToSequence("ORDERS", "oc6", 10); + + // After reset the consumer reads from sequence 10 + handle.NextSequence.ShouldBe(10UL); + + // Simulate re-enqueueing a message at that sequence via OnPublished + var msg = new StoredMessage + { + Sequence = 10, + Subject = "orders.new", + Payload = Encoding.UTF8.GetBytes("data"), + TimestampUtc = DateTime.UtcNow, + }; + mgr.OnPublished("ORDERS", msg); + + // Message should be in the push frame queue + handle.PushFrames.Count.ShouldBeGreaterThan(0); + } + + // ------------------------------------------------------------------------- + // AckProcessor.ClearAll tests + // ------------------------------------------------------------------------- + + // Go reference: consumer.go processResetReq — pending ack map cleared + [Fact] + public void ClearAll_clears_pending() + { + var processor = new AckProcessor(); + processor.Register(1, ackWaitMs: 5000); + processor.Register(2, ackWaitMs: 5000); + processor.Register(3, ackWaitMs: 5000); + processor.PendingCount.ShouldBe(3); + + processor.ClearAll(); + + processor.PendingCount.ShouldBe(0); + } + + // Go reference: consumer.go processResetReq — terminated set cleared + [Fact] + public void ClearAll_clears_terminated() + { + var processor = new AckProcessor(); + processor.Register(1, ackWaitMs: 5000); + processor.Register(2, ackWaitMs: 5000); + processor.ProcessTerm(1); + processor.ProcessTerm(2); + processor.TerminatedCount.ShouldBe(2); + + processor.ClearAll(); + + processor.TerminatedCount.ShouldBe(0); + } + + // Go reference: consumer.go processResetReq — ack floor reset to 0 + [Fact] + public void ClearAll_resets_ack_floor() + { + var processor = new AckProcessor(); + processor.Register(1, ackWaitMs: 5000); + processor.Register(2, ackWaitMs: 5000); + processor.AckSequence(1); + processor.AckSequence(2); + processor.AckFloor.ShouldBeGreaterThan(0UL); + + processor.ClearAll(); + + processor.AckFloor.ShouldBe(0UL); + } + + // ------------------------------------------------------------------------- + // AckProcessor.SetAckFloor tests + // ------------------------------------------------------------------------- + + // Go reference: consumer.go processResetReq — ack floor can be set to a + // specific sequence to reflect the stream state after reset. + [Fact] + public void SetAckFloor_updates_floor() + { + var processor = new AckProcessor(); + + processor.SetAckFloor(99); + + processor.AckFloor.ShouldBe(99UL); + } + + // Go reference: consumer.go processResetReq — any pending sequences below + // the new floor are irrelevant (already delivered before the floor) and + // must be pruned to avoid ghost acks. + [Fact] + public void SetAckFloor_removes_entries_below_floor() + { + var processor = new AckProcessor(); + processor.Register(1, ackWaitMs: 5000); + processor.Register(2, ackWaitMs: 5000); + processor.Register(5, ackWaitMs: 5000); + processor.Register(10, ackWaitMs: 5000); + processor.PendingCount.ShouldBe(4); + + processor.SetAckFloor(5); + + // Sequences 1, 2, and 5 (<=5) are below or at the new floor and must be removed + processor.PendingCount.ShouldBe(1); + // Sequence 10 is above the floor and must remain + processor.HasPending.ShouldBeTrue(); + } +}