feat: add consumer reset to specific sequence (Gap 3.12)

Add ResetToSequence to ConsumerManager that updates NextSequence,
clears AckProcessor state via new ClearAll(), and zeroes PendingBytes.
Add AckProcessor.SetAckFloor() that prunes pending entries below the
new floor. Go reference: consumer.go:4241 processResetReq.
This commit is contained in:
Joseph Doherty
2026-02-25 11:15:33 -05:00
parent b9aa62ae99
commit 778687cf6f
3 changed files with 292 additions and 0 deletions

View File

@@ -225,6 +225,28 @@ public sealed class ConsumerManager : IDisposable
return true;
}
/// <summary>
/// Resets a consumer's position to the specified sequence.
/// Clears pending acks and redelivery state.
/// Go reference: consumer.go:4241 processResetReq.
/// </summary>
public bool ResetToSequence(string stream, string durableName, ulong sequence)
{
if (!_consumers.TryGetValue((stream, durableName), out var handle))
return false;
// Update the consumer's next sequence
handle.NextSequence = sequence;
// Clear pending acks — all outstanding acks are invalid after reset
handle.AckProcessor.ClearAll();
// Clear pending bytes
handle.PendingBytes = 0;
return true;
}
public bool Unpin(string stream, string durableName)
{
return _consumers.ContainsKey((stream, durableName));

View File

@@ -294,6 +294,32 @@ public sealed class AckProcessor
_pending.Remove(sequence);
}
/// <summary>
/// Clears all pending acks, terminated set, and resets the ack floor.
/// Used during consumer reset to specific sequence.
/// Go reference: consumer.go processResetReq — clear all tracking state.
/// </summary>
public void ClearAll()
{
_pending.Clear();
_terminated.Clear();
AckFloor = 0;
TerminatedCount = 0;
_exceededSequences.Clear();
}
/// <summary>
/// Resets the ack floor to the specified value.
/// Used during consumer reset.
/// </summary>
public void SetAckFloor(ulong floor)
{
AckFloor = floor;
// Remove any pending entries below or at the new floor
foreach (var key in _pending.Keys.Where(k => k <= floor).ToArray())
_pending.Remove(key);
}
public bool HasPending => _pending.Count > 0;
public int PendingCount => _pending.Count;

View File

@@ -0,0 +1,244 @@
// Go reference: consumer.go:4241 (processResetReq)
using NATS.Server.JetStream;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using System.Text;
namespace NATS.Server.Tests.JetStream.Consumers;
/// <summary>
/// Tests for consumer reset-to-sequence (Gap 3.12) and AckProcessor.ClearAll / SetAckFloor.
/// Go reference: consumer.go:4241 processResetReq.
/// </summary>
public class ConsumerResetTests
{
private static ConsumerManager CreateManager() => new();
private static void CreateConsumer(ConsumerManager mgr, string stream, string name,
Action<ConsumerConfig>? configure = null)
{
var config = new ConsumerConfig { DurableName = name };
configure?.Invoke(config);
mgr.CreateOrUpdate(stream, config);
}
// -------------------------------------------------------------------------
// ResetToSequence tests
// -------------------------------------------------------------------------
// Go reference: consumer.go:4241 — processResetReq sets consumer.sseq to
// the requested sequence so the next fetch starts there.
[Fact]
public void ResetToSequence_updates_next_sequence()
{
var mgr = CreateManager();
CreateConsumer(mgr, "ORDERS", "oc1");
// Advance the consumer naturally so NextSequence is not 1
mgr.TryGet("ORDERS", "oc1", out var before);
before.NextSequence = 10;
mgr.ResetToSequence("ORDERS", "oc1", 5);
mgr.TryGet("ORDERS", "oc1", out var after);
after.NextSequence.ShouldBe(5UL);
}
// Go reference: consumer.go:4241 — reset clears the pending ack map so
// stale ack tokens from before the reset cannot be accepted.
[Fact]
public void ResetToSequence_clears_pending_acks()
{
var mgr = CreateManager();
CreateConsumer(mgr, "ORDERS", "oc2");
mgr.TryGet("ORDERS", "oc2", out var handle);
handle.AckProcessor.Register(3, ackWaitMs: 5000);
handle.AckProcessor.Register(7, ackWaitMs: 5000);
handle.AckProcessor.PendingCount.ShouldBe(2);
mgr.ResetToSequence("ORDERS", "oc2", 1);
handle.AckProcessor.PendingCount.ShouldBe(0);
}
// Go reference: consumer.go:4241 — pendingBytes must be zeroed on reset
// so the idle heartbeat header is correct after the reset.
[Fact]
public void ResetToSequence_clears_pending_bytes()
{
var mgr = CreateManager();
CreateConsumer(mgr, "ORDERS", "oc3");
mgr.TryGet("ORDERS", "oc3", out var handle);
handle.PendingBytes = 12345;
mgr.ResetToSequence("ORDERS", "oc3", 1);
handle.PendingBytes.ShouldBe(0L);
}
// Go reference: consumer.go:4241 — returns false when the consumer does
// not exist (unknown stream or durable name).
[Fact]
public void ResetToSequence_returns_false_for_missing_consumer()
{
var mgr = CreateManager();
mgr.ResetToSequence("NO-STREAM", "NO-CONSUMER", 1).ShouldBeFalse();
}
// Go reference: consumer.go:4241 — returns true when the consumer exists
// and the reset is applied.
[Fact]
public void ResetToSequence_returns_true_for_existing_consumer()
{
var mgr = CreateManager();
CreateConsumer(mgr, "ORDERS", "oc4");
mgr.ResetToSequence("ORDERS", "oc4", 42).ShouldBeTrue();
}
// Go reference: consumer.go:4241 — consumer config (subject filters, ack
// policy, etc.) is immutable during reset; only positional / tracking state
// is cleared.
[Fact]
public void ResetToSequence_preserves_config()
{
var mgr = CreateManager();
CreateConsumer(mgr, "ORDERS", "oc5", cfg =>
{
cfg.FilterSubject = "orders.>";
cfg.AckPolicy = AckPolicy.Explicit;
});
mgr.ResetToSequence("ORDERS", "oc5", 1);
mgr.TryGet("ORDERS", "oc5", out var handle);
handle.Config.FilterSubject.ShouldBe("orders.>");
handle.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
}
// Go reference: consumer.go:4241 — after reset the push engine can
// re-enqueue messages starting at the reset sequence.
[Fact]
public void ResetToSequence_allows_re_delivery_from_sequence()
{
var mgr = CreateManager();
CreateConsumer(mgr, "ORDERS", "oc6", cfg =>
{
cfg.Push = true;
cfg.DeliverSubject = "deliver.test";
});
mgr.TryGet("ORDERS", "oc6", out var handle);
handle.NextSequence = 50;
mgr.ResetToSequence("ORDERS", "oc6", 10);
// After reset the consumer reads from sequence 10
handle.NextSequence.ShouldBe(10UL);
// Simulate re-enqueueing a message at that sequence via OnPublished
var msg = new StoredMessage
{
Sequence = 10,
Subject = "orders.new",
Payload = Encoding.UTF8.GetBytes("data"),
TimestampUtc = DateTime.UtcNow,
};
mgr.OnPublished("ORDERS", msg);
// Message should be in the push frame queue
handle.PushFrames.Count.ShouldBeGreaterThan(0);
}
// -------------------------------------------------------------------------
// AckProcessor.ClearAll tests
// -------------------------------------------------------------------------
// Go reference: consumer.go processResetReq — pending ack map cleared
[Fact]
public void ClearAll_clears_pending()
{
var processor = new AckProcessor();
processor.Register(1, ackWaitMs: 5000);
processor.Register(2, ackWaitMs: 5000);
processor.Register(3, ackWaitMs: 5000);
processor.PendingCount.ShouldBe(3);
processor.ClearAll();
processor.PendingCount.ShouldBe(0);
}
// Go reference: consumer.go processResetReq — terminated set cleared
[Fact]
public void ClearAll_clears_terminated()
{
var processor = new AckProcessor();
processor.Register(1, ackWaitMs: 5000);
processor.Register(2, ackWaitMs: 5000);
processor.ProcessTerm(1);
processor.ProcessTerm(2);
processor.TerminatedCount.ShouldBe(2);
processor.ClearAll();
processor.TerminatedCount.ShouldBe(0);
}
// Go reference: consumer.go processResetReq — ack floor reset to 0
[Fact]
public void ClearAll_resets_ack_floor()
{
var processor = new AckProcessor();
processor.Register(1, ackWaitMs: 5000);
processor.Register(2, ackWaitMs: 5000);
processor.AckSequence(1);
processor.AckSequence(2);
processor.AckFloor.ShouldBeGreaterThan(0UL);
processor.ClearAll();
processor.AckFloor.ShouldBe(0UL);
}
// -------------------------------------------------------------------------
// AckProcessor.SetAckFloor tests
// -------------------------------------------------------------------------
// Go reference: consumer.go processResetReq — ack floor can be set to a
// specific sequence to reflect the stream state after reset.
[Fact]
public void SetAckFloor_updates_floor()
{
var processor = new AckProcessor();
processor.SetAckFloor(99);
processor.AckFloor.ShouldBe(99UL);
}
// Go reference: consumer.go processResetReq — any pending sequences below
// the new floor are irrelevant (already delivered before the floor) and
// must be pruned to avoid ghost acks.
[Fact]
public void SetAckFloor_removes_entries_below_floor()
{
var processor = new AckProcessor();
processor.Register(1, ackWaitMs: 5000);
processor.Register(2, ackWaitMs: 5000);
processor.Register(5, ackWaitMs: 5000);
processor.Register(10, ackWaitMs: 5000);
processor.PendingCount.ShouldBe(4);
processor.SetAckFloor(5);
// Sequences 1, 2, and 5 (<=5) are below or at the new floor and must be removed
processor.PendingCount.ShouldBe(1);
// Sequence 10 is above the floor and must remain
processor.HasPending.ShouldBeTrue();
}
}