Add ResetToSequence to ConsumerManager that updates NextSequence, clears AckProcessor state via new ClearAll(), and zeroes PendingBytes. Add AckProcessor.SetAckFloor() that prunes pending entries below the new floor. Go reference: consumer.go:4241 processResetReq.
245 lines
8.0 KiB
C#
245 lines
8.0 KiB
C#
// Go reference: consumer.go:4241 (processResetReq)
|
|
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.Consumers;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Storage;
|
|
using System.Text;
|
|
|
|
namespace NATS.Server.Tests.JetStream.Consumers;
|
|
|
|
/// <summary>
|
|
/// Tests for consumer reset-to-sequence (Gap 3.12) and AckProcessor.ClearAll / SetAckFloor.
|
|
/// Go reference: consumer.go:4241 processResetReq.
|
|
/// </summary>
|
|
public class ConsumerResetTests
|
|
{
|
|
private static ConsumerManager CreateManager() => new();
|
|
|
|
private static void CreateConsumer(ConsumerManager mgr, string stream, string name,
|
|
Action<ConsumerConfig>? configure = null)
|
|
{
|
|
var config = new ConsumerConfig { DurableName = name };
|
|
configure?.Invoke(config);
|
|
mgr.CreateOrUpdate(stream, config);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// ResetToSequence tests
|
|
// -------------------------------------------------------------------------
|
|
|
|
// Go reference: consumer.go:4241 — processResetReq sets consumer.sseq to
|
|
// the requested sequence so the next fetch starts there.
|
|
[Fact]
|
|
public void ResetToSequence_updates_next_sequence()
|
|
{
|
|
var mgr = CreateManager();
|
|
CreateConsumer(mgr, "ORDERS", "oc1");
|
|
|
|
// Advance the consumer naturally so NextSequence is not 1
|
|
mgr.TryGet("ORDERS", "oc1", out var before);
|
|
before.NextSequence = 10;
|
|
|
|
mgr.ResetToSequence("ORDERS", "oc1", 5);
|
|
|
|
mgr.TryGet("ORDERS", "oc1", out var after);
|
|
after.NextSequence.ShouldBe(5UL);
|
|
}
|
|
|
|
// Go reference: consumer.go:4241 — reset clears the pending ack map so
|
|
// stale ack tokens from before the reset cannot be accepted.
|
|
[Fact]
|
|
public void ResetToSequence_clears_pending_acks()
|
|
{
|
|
var mgr = CreateManager();
|
|
CreateConsumer(mgr, "ORDERS", "oc2");
|
|
|
|
mgr.TryGet("ORDERS", "oc2", out var handle);
|
|
handle.AckProcessor.Register(3, ackWaitMs: 5000);
|
|
handle.AckProcessor.Register(7, ackWaitMs: 5000);
|
|
handle.AckProcessor.PendingCount.ShouldBe(2);
|
|
|
|
mgr.ResetToSequence("ORDERS", "oc2", 1);
|
|
|
|
handle.AckProcessor.PendingCount.ShouldBe(0);
|
|
}
|
|
|
|
// Go reference: consumer.go:4241 — pendingBytes must be zeroed on reset
|
|
// so the idle heartbeat header is correct after the reset.
|
|
[Fact]
|
|
public void ResetToSequence_clears_pending_bytes()
|
|
{
|
|
var mgr = CreateManager();
|
|
CreateConsumer(mgr, "ORDERS", "oc3");
|
|
|
|
mgr.TryGet("ORDERS", "oc3", out var handle);
|
|
handle.PendingBytes = 12345;
|
|
|
|
mgr.ResetToSequence("ORDERS", "oc3", 1);
|
|
|
|
handle.PendingBytes.ShouldBe(0L);
|
|
}
|
|
|
|
// Go reference: consumer.go:4241 — returns false when the consumer does
|
|
// not exist (unknown stream or durable name).
|
|
[Fact]
|
|
public void ResetToSequence_returns_false_for_missing_consumer()
|
|
{
|
|
var mgr = CreateManager();
|
|
|
|
mgr.ResetToSequence("NO-STREAM", "NO-CONSUMER", 1).ShouldBeFalse();
|
|
}
|
|
|
|
// Go reference: consumer.go:4241 — returns true when the consumer exists
|
|
// and the reset is applied.
|
|
[Fact]
|
|
public void ResetToSequence_returns_true_for_existing_consumer()
|
|
{
|
|
var mgr = CreateManager();
|
|
CreateConsumer(mgr, "ORDERS", "oc4");
|
|
|
|
mgr.ResetToSequence("ORDERS", "oc4", 42).ShouldBeTrue();
|
|
}
|
|
|
|
// Go reference: consumer.go:4241 — consumer config (subject filters, ack
|
|
// policy, etc.) is immutable during reset; only positional / tracking state
|
|
// is cleared.
|
|
[Fact]
|
|
public void ResetToSequence_preserves_config()
|
|
{
|
|
var mgr = CreateManager();
|
|
CreateConsumer(mgr, "ORDERS", "oc5", cfg =>
|
|
{
|
|
cfg.FilterSubject = "orders.>";
|
|
cfg.AckPolicy = AckPolicy.Explicit;
|
|
});
|
|
|
|
mgr.ResetToSequence("ORDERS", "oc5", 1);
|
|
|
|
mgr.TryGet("ORDERS", "oc5", out var handle);
|
|
handle.Config.FilterSubject.ShouldBe("orders.>");
|
|
handle.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
|
|
}
|
|
|
|
// Go reference: consumer.go:4241 — after reset the push engine can
|
|
// re-enqueue messages starting at the reset sequence.
|
|
[Fact]
|
|
public void ResetToSequence_allows_re_delivery_from_sequence()
|
|
{
|
|
var mgr = CreateManager();
|
|
CreateConsumer(mgr, "ORDERS", "oc6", cfg =>
|
|
{
|
|
cfg.Push = true;
|
|
cfg.DeliverSubject = "deliver.test";
|
|
});
|
|
|
|
mgr.TryGet("ORDERS", "oc6", out var handle);
|
|
handle.NextSequence = 50;
|
|
|
|
mgr.ResetToSequence("ORDERS", "oc6", 10);
|
|
|
|
// After reset the consumer reads from sequence 10
|
|
handle.NextSequence.ShouldBe(10UL);
|
|
|
|
// Simulate re-enqueueing a message at that sequence via OnPublished
|
|
var msg = new StoredMessage
|
|
{
|
|
Sequence = 10,
|
|
Subject = "orders.new",
|
|
Payload = Encoding.UTF8.GetBytes("data"),
|
|
TimestampUtc = DateTime.UtcNow,
|
|
};
|
|
mgr.OnPublished("ORDERS", msg);
|
|
|
|
// Message should be in the push frame queue
|
|
handle.PushFrames.Count.ShouldBeGreaterThan(0);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// AckProcessor.ClearAll tests
|
|
// -------------------------------------------------------------------------
|
|
|
|
// Go reference: consumer.go processResetReq — pending ack map cleared
|
|
[Fact]
|
|
public void ClearAll_clears_pending()
|
|
{
|
|
var processor = new AckProcessor();
|
|
processor.Register(1, ackWaitMs: 5000);
|
|
processor.Register(2, ackWaitMs: 5000);
|
|
processor.Register(3, ackWaitMs: 5000);
|
|
processor.PendingCount.ShouldBe(3);
|
|
|
|
processor.ClearAll();
|
|
|
|
processor.PendingCount.ShouldBe(0);
|
|
}
|
|
|
|
// Go reference: consumer.go processResetReq — terminated set cleared
|
|
[Fact]
|
|
public void ClearAll_clears_terminated()
|
|
{
|
|
var processor = new AckProcessor();
|
|
processor.Register(1, ackWaitMs: 5000);
|
|
processor.Register(2, ackWaitMs: 5000);
|
|
processor.ProcessTerm(1);
|
|
processor.ProcessTerm(2);
|
|
processor.TerminatedCount.ShouldBe(2);
|
|
|
|
processor.ClearAll();
|
|
|
|
processor.TerminatedCount.ShouldBe(0);
|
|
}
|
|
|
|
// Go reference: consumer.go processResetReq — ack floor reset to 0
|
|
[Fact]
|
|
public void ClearAll_resets_ack_floor()
|
|
{
|
|
var processor = new AckProcessor();
|
|
processor.Register(1, ackWaitMs: 5000);
|
|
processor.Register(2, ackWaitMs: 5000);
|
|
processor.AckSequence(1);
|
|
processor.AckSequence(2);
|
|
processor.AckFloor.ShouldBeGreaterThan(0UL);
|
|
|
|
processor.ClearAll();
|
|
|
|
processor.AckFloor.ShouldBe(0UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// AckProcessor.SetAckFloor tests
|
|
// -------------------------------------------------------------------------
|
|
|
|
// Go reference: consumer.go processResetReq — ack floor can be set to a
|
|
// specific sequence to reflect the stream state after reset.
|
|
[Fact]
|
|
public void SetAckFloor_updates_floor()
|
|
{
|
|
var processor = new AckProcessor();
|
|
|
|
processor.SetAckFloor(99);
|
|
|
|
processor.AckFloor.ShouldBe(99UL);
|
|
}
|
|
|
|
// Go reference: consumer.go processResetReq — any pending sequences below
|
|
// the new floor are irrelevant (already delivered before the floor) and
|
|
// must be pruned to avoid ghost acks.
|
|
[Fact]
|
|
public void SetAckFloor_removes_entries_below_floor()
|
|
{
|
|
var processor = new AckProcessor();
|
|
processor.Register(1, ackWaitMs: 5000);
|
|
processor.Register(2, ackWaitMs: 5000);
|
|
processor.Register(5, ackWaitMs: 5000);
|
|
processor.Register(10, ackWaitMs: 5000);
|
|
processor.PendingCount.ShouldBe(4);
|
|
|
|
processor.SetAckFloor(5);
|
|
|
|
// Sequences 1, 2, and 5 (<=5) are below or at the new floor and must be removed
|
|
processor.PendingCount.ShouldBe(1);
|
|
// Sequence 10 is above the floor and must remain
|
|
processor.HasPending.ShouldBeTrue();
|
|
}
|
|
}
|