diff --git a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs index 0425dc0..3466e03 100644 --- a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs @@ -1,3 +1,6 @@ +// Go: consumer.go (sendIdleHeartbeat ~line 5222, sendFlowControl ~line 5495, +// deliverMsg ~line 5364, dispatchToDeliver ~line 5040) +using System.Text; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Storage; @@ -5,6 +8,12 @@ namespace NATS.Server.JetStream.Consumers; public sealed class PushConsumerEngine { + // Go: consumer.go — DeliverSubject routes push-mode messages (cfg.DeliverSubject) + public string DeliverSubject { get; private set; } = string.Empty; + + private CancellationTokenSource? _cts; + private Task? _deliveryTask; + public void Enqueue(ConsumerHandle consumer, StoredMessage message) { if (message.Sequence <= consumer.AckProcessor.AckFloor) @@ -48,6 +57,109 @@ public sealed class PushConsumerEngine }); } } + + // Go: consumer.go:1131 — dsubj is set from cfg.DeliverSubject at consumer creation. + // StartDeliveryLoop wires the background pump that drains PushFrames and calls + // sendMessage for each frame. The delegate matches the wire-level send signature used + // by NatsClient.SendMessage, mapped to an async ValueTask for testability. + public void StartDeliveryLoop( + ConsumerHandle consumer, + Func, ReadOnlyMemory, CancellationToken, ValueTask> sendMessage, + CancellationToken ct) + { + DeliverSubject = consumer.Config.DeliverSubject; + + _cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + var token = _cts.Token; + + _deliveryTask = Task.Run(() => RunDeliveryLoopAsync(consumer, sendMessage, token), token); + } + + public void StopDeliveryLoop() + { + _cts?.Cancel(); + _cts?.Dispose(); + _cts = null; + } + + // Go: consumer.go:5040 — dispatchToDeliver drains the outbound message queue. + // For push consumers the dsubj is cfg.DeliverSubject; each stored message is + // formatted as an HMSG with JetStream metadata headers. + private static async Task RunDeliveryLoopAsync( + ConsumerHandle consumer, + Func, ReadOnlyMemory, CancellationToken, ValueTask> sendMessage, + CancellationToken ct) + { + var deliverSubject = consumer.Config.DeliverSubject; + + while (!ct.IsCancellationRequested) + { + if (consumer.PushFrames.Count == 0) + { + // Yield to avoid busy-spin when the queue is empty + await Task.Delay(1, ct).ConfigureAwait(false); + continue; + } + + var frame = consumer.PushFrames.Peek(); + + // Go: consumer.go — rate-limit by honouring AvailableAtUtc before dequeuing + var now = DateTime.UtcNow; + if (frame.AvailableAtUtc > now) + { + var wait = frame.AvailableAtUtc - now; + try + { + await Task.Delay(wait, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + continue; + } + + consumer.PushFrames.Dequeue(); + + try + { + if (frame.IsData && frame.Message is { } msg) + { + // Go: consumer.go:5067 — build JetStream metadata headers + // Header format: NATS/1.0\r\nNats-Sequence: {seq}\r\nNats-Time-Stamp: {ts}\r\nNats-Subject: {subj}\r\n\r\n + var headers = BuildDataHeaders(msg); + var subject = string.IsNullOrEmpty(deliverSubject) ? msg.Subject : deliverSubject; + await sendMessage(subject, msg.Subject, headers, msg.Payload, ct).ConfigureAwait(false); + } + else if (frame.IsFlowControl) + { + // Go: consumer.go:5501 — "NATS/1.0 100 FlowControl Request\r\n\r\n" + var headers = "NATS/1.0 100 FlowControl Request\r\nNats-Flow-Control: \r\n\r\n"u8.ToArray(); + var subject = string.IsNullOrEmpty(deliverSubject) ? "_fc_" : deliverSubject; + await sendMessage(subject, string.Empty, headers, ReadOnlyMemory.Empty, ct).ConfigureAwait(false); + } + else if (frame.IsHeartbeat) + { + // Go: consumer.go:5223 — "NATS/1.0 100 Idle Heartbeat\r\n..." + var headers = "NATS/1.0 100 Idle Heartbeat\r\n\r\n"u8.ToArray(); + var subject = string.IsNullOrEmpty(deliverSubject) ? "_hb_" : deliverSubject; + await sendMessage(subject, string.Empty, headers, ReadOnlyMemory.Empty, ct).ConfigureAwait(false); + } + } + catch (OperationCanceledException) + { + break; + } + } + } + + // Go: stream.go:586 — JSSequence = "Nats-Sequence", JSTimeStamp = "Nats-Time-Stamp", JSSubject = "Nats-Subject" + private static ReadOnlyMemory BuildDataHeaders(StoredMessage msg) + { + var ts = msg.TimestampUtc.ToString("O"); // ISO-8601 round-trip + var header = $"NATS/1.0\r\nNats-Sequence: {msg.Sequence}\r\nNats-Time-Stamp: {ts}\r\nNats-Subject: {msg.Subject}\r\n\r\n"; + return Encoding.ASCII.GetBytes(header); + } } public sealed class PushFrame diff --git a/src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs b/src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs new file mode 100644 index 0000000..eb003ed --- /dev/null +++ b/src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs @@ -0,0 +1,92 @@ +// Go: consumer.go (trackPending, processNak, rdc map, addToRedeliverQueue ~line 5540) +// RedeliveryTracker manages sequences waiting for redelivery after a NAK or ack-wait +// expiry. It mirrors the Go consumer's rdc (redelivery count) map combined with the +// rdq (redelivery queue) priority ordering. +namespace NATS.Server.JetStream.Consumers; + +public sealed class RedeliveryTracker +{ + private readonly int[] _backoffMs; + + // Go: consumer.go — pending maps sseq → (deadline, deliveries) + private readonly Dictionary _entries = new(); + + // Go: consumer.go:100 — BackOff []time.Duration in ConsumerConfig; empty falls back to ackWait + public RedeliveryTracker(int[] backoffMs) + { + _backoffMs = backoffMs; + } + + // Go: consumer.go:5540 — trackPending records delivery count and schedules deadline + // using the backoff array indexed by (deliveryCount-1), clamped at last entry. + // Returns the UTC time at which the sequence next becomes eligible for redelivery. + public DateTime Schedule(ulong seq, int deliveryCount, int ackWaitMs = 0) + { + var delayMs = ResolveDelay(deliveryCount, ackWaitMs); + var deadline = DateTime.UtcNow.AddMilliseconds(Math.Max(delayMs, 1)); + + _entries[seq] = new RedeliveryEntry + { + DeadlineUtc = deadline, + DeliveryCount = deliveryCount, + }; + + return deadline; + } + + // Go: consumer.go — rdq entries are dispatched once their deadline has passed + public IReadOnlyList GetDue() + { + var now = DateTime.UtcNow; + List? due = null; + + foreach (var (seq, entry) in _entries) + { + if (entry.DeadlineUtc <= now) + { + due ??= []; + due.Add(seq); + } + } + + return due ?? (IReadOnlyList)[]; + } + + // Go: consumer.go — acking a sequence removes it from the pending redelivery set + public void Acknowledge(ulong seq) => _entries.Remove(seq); + + // Go: consumer.go — maxdeliver check: drop sequence once delivery count exceeds max + public bool IsMaxDeliveries(ulong seq, int maxDeliver) + { + if (maxDeliver <= 0) + return false; + + if (!_entries.TryGetValue(seq, out var entry)) + return false; + + return entry.DeliveryCount >= maxDeliver; + } + + public bool IsTracking(ulong seq) => _entries.ContainsKey(seq); + + public int TrackedCount => _entries.Count; + + // Go: consumer.go — backoff index = min(deliveries-1, len(backoff)-1); + // falls back to ackWaitMs when the backoff array is empty. + private int ResolveDelay(int deliveryCount, int ackWaitMs) + { + if (_backoffMs.Length == 0) + return Math.Max(ackWaitMs, 1); + + var idx = Math.Min(deliveryCount - 1, _backoffMs.Length - 1); + if (idx < 0) + idx = 0; + return _backoffMs[idx]; + } + + private sealed class RedeliveryEntry + { + public DateTime DeadlineUtc { get; set; } + public int DeliveryCount { get; set; } + } +} diff --git a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs index 463c8b0..434f227 100644 --- a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs +++ b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs @@ -15,6 +15,8 @@ public sealed class ConsumerConfig public int MaxDeliver { get; set; } = 1; public int MaxAckPending { get; set; } public bool Push { get; set; } + // Go: consumer.go:115 — deliver_subject routes push messages to a NATS subject + public string DeliverSubject { get; set; } = string.Empty; public int HeartbeatMs { get; set; } public List BackOffMs { get; set; } = []; public bool FlowControl { get; set; } diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/PushConsumerDeliveryTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/PushConsumerDeliveryTests.cs new file mode 100644 index 0000000..203f815 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/PushConsumerDeliveryTests.cs @@ -0,0 +1,317 @@ +// Go: consumer.go (dispatchToDeliver ~line 5040, sendFlowControl ~line 5495, +// sendIdleHeartbeat ~line 5222, rate-limit logic ~line 5120) +using System.Collections.Concurrent; +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Consumers; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class PushConsumerDeliveryTests +{ + // Helper: build a ConsumerHandle wired with the given config + private static ConsumerHandle MakeConsumer(ConsumerConfig config) + => new("TEST-STREAM", config); + + // Helper: build a minimal StoredMessage + private static StoredMessage MakeMessage(ulong seq, string subject = "test.subject", string payload = "hello") + => new() + { + Sequence = seq, + Subject = subject, + Payload = Encoding.UTF8.GetBytes(payload), + TimestampUtc = DateTime.UtcNow, + }; + + // ------------------------------------------------------------------------- + // Test 1 — Delivery loop sends messages in FIFO order + // + // Go reference: consumer.go:5040 — dispatchToDeliver processes the outbound + // queue sequentially; messages must arrive in the order they were enqueued. + // ------------------------------------------------------------------------- + [Fact] + public async Task DeliveryLoop_sends_messages_in_FIFO_order() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "PUSH", + Push = true, + DeliverSubject = "deliver.test", + }); + + engine.Enqueue(consumer, MakeMessage(1, payload: "first")); + engine.Enqueue(consumer, MakeMessage(2, payload: "second")); + engine.Enqueue(consumer, MakeMessage(3, payload: "third")); + + var received = new ConcurrentQueue<(string subject, ReadOnlyMemory payload)>(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + engine.StartDeliveryLoop(consumer, + async (subj, _, _, payload, ct) => + { + received.Enqueue((subj, payload)); + await ValueTask.CompletedTask; + }, + cts.Token); + + // Wait until all three messages are delivered + while (received.Count < 3 && !cts.IsCancellationRequested) + await Task.Delay(5, cts.Token); + + engine.StopDeliveryLoop(); + + received.Count.ShouldBe(3); + var items = received.ToArray(); + Encoding.UTF8.GetString(items[0].payload.Span).ShouldBe("first"); + Encoding.UTF8.GetString(items[1].payload.Span).ShouldBe("second"); + Encoding.UTF8.GetString(items[2].payload.Span).ShouldBe("third"); + } + + // ------------------------------------------------------------------------- + // Test 2 — Rate limiting delays delivery + // + // Go reference: consumer.go:5120 — the rate limiter delays sending when + // AvailableAtUtc is in the future. A frame whose AvailableAtUtc is 100ms + // ahead must not be delivered until that deadline has passed. + // The delivery loop honours frame.AvailableAtUtc directly; this test + // injects a frame with a known future timestamp to verify that behaviour. + // ------------------------------------------------------------------------- + [Fact] + public async Task DeliveryLoop_rate_limiting_delays_delivery() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "RATE", + Push = true, + DeliverSubject = "deliver.rate", + }); + + // Inject a frame with AvailableAtUtc 150ms in the future to simulate + // what Enqueue() computes when RateLimitBps produces a delay. + var msg = MakeMessage(1); + consumer.PushFrames.Enqueue(new PushFrame + { + IsData = true, + Message = msg, + AvailableAtUtc = DateTime.UtcNow.AddMilliseconds(150), + }); + + var delivered = new TaskCompletionSource(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + var startedAt = DateTime.UtcNow; + engine.StartDeliveryLoop(consumer, + async (_, _, _, _, _) => + { + delivered.TrySetResult(DateTime.UtcNow); + await ValueTask.CompletedTask; + }, + cts.Token); + + var deliveredAt = await delivered.Task.WaitAsync(TimeSpan.FromSeconds(5)); + engine.StopDeliveryLoop(); + + // The loop must have waited at least ~100ms for AvailableAtUtc to pass + var elapsed = deliveredAt - startedAt; + elapsed.TotalMilliseconds.ShouldBeGreaterThan(100); + } + + // ------------------------------------------------------------------------- + // Test 3 — Heartbeat frames are sent + // + // Go reference: consumer.go:5222 — sendIdleHeartbeat emits a + // "NATS/1.0 100 Idle Heartbeat" status frame on the deliver subject. + // ------------------------------------------------------------------------- + [Fact] + public async Task DeliveryLoop_sends_heartbeat_frames() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "HB", + Push = true, + DeliverSubject = "deliver.hb", + HeartbeatMs = 100, + }); + + // Enqueue one data message; HeartbeatMs > 0 causes Enqueue to also + // append a heartbeat frame immediately after. + engine.Enqueue(consumer, MakeMessage(1)); + + var headerSnapshots = new ConcurrentBag>(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + headerSnapshots.Add(headers); + await ValueTask.CompletedTask; + }, + cts.Token); + + // Wait for both the data frame and the heartbeat frame + while (headerSnapshots.Count < 2 && !cts.IsCancellationRequested) + await Task.Delay(5, cts.Token); + + engine.StopDeliveryLoop(); + + headerSnapshots.Count.ShouldBeGreaterThanOrEqualTo(2); + + // At least one frame must contain "Idle Heartbeat" + var anyHeartbeat = headerSnapshots.Any(h => + Encoding.ASCII.GetString(h.Span).Contains("Idle Heartbeat")); + anyHeartbeat.ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 4 — Flow control frames are sent + // + // Go reference: consumer.go:5495 — sendFlowControl sends a status frame + // "NATS/1.0 100 FlowControl Request" to the deliver subject. + // ------------------------------------------------------------------------- + [Fact] + public async Task DeliveryLoop_sends_flow_control_frames() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "FC", + Push = true, + DeliverSubject = "deliver.fc", + FlowControl = true, + HeartbeatMs = 100, // Go requires heartbeat when flow control is on + }); + + engine.Enqueue(consumer, MakeMessage(1)); + + var headerSnapshots = new ConcurrentBag>(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + headerSnapshots.Add(headers); + await ValueTask.CompletedTask; + }, + cts.Token); + + // data + flow-control + heartbeat = 3 frames + while (headerSnapshots.Count < 3 && !cts.IsCancellationRequested) + await Task.Delay(5, cts.Token); + + engine.StopDeliveryLoop(); + + var anyFlowControl = headerSnapshots.Any(h => + Encoding.ASCII.GetString(h.Span).Contains("FlowControl")); + anyFlowControl.ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 5 — Delivery stops on cancellation + // + // Go reference: consumer.go — the delivery goroutine exits when the qch + // (quit channel) is signalled, which maps to CancellationToken here. + // ------------------------------------------------------------------------- + [Fact] + public async Task DeliveryLoop_stops_on_cancellation() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "CANCEL", + Push = true, + DeliverSubject = "deliver.cancel", + }); + + var deliveryCount = 0; + var cts = new CancellationTokenSource(); + + engine.StartDeliveryLoop(consumer, + async (_, _, _, _, _) => + { + Interlocked.Increment(ref deliveryCount); + await ValueTask.CompletedTask; + }, + cts.Token); + + // Cancel immediately — nothing enqueued so delivery count must stay 0 + await cts.CancelAsync(); + engine.StopDeliveryLoop(); + + // Brief settle — no messages were queued so nothing should have been delivered + await Task.Delay(20); + deliveryCount.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Test 6 — Data frame headers contain JetStream metadata + // + // Go reference: stream.go:586 — JSSequence = "Nats-Sequence", + // JSTimeStamp = "Nats-Time-Stamp", JSSubject = "Nats-Subject" + // ------------------------------------------------------------------------- + [Fact] + public async Task DeliveryLoop_data_frame_headers_contain_jetstream_metadata() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "META", + Push = true, + DeliverSubject = "deliver.meta", + }); + + var msg = MakeMessage(42, subject: "events.created"); + engine.Enqueue(consumer, msg); + + ReadOnlyMemory? capturedHeaders = null; + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var tcs = new TaskCompletionSource(); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + capturedHeaders = headers; + tcs.TrySetResult(true); + await ValueTask.CompletedTask; + }, + cts.Token); + + await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); + engine.StopDeliveryLoop(); + + capturedHeaders.ShouldNotBeNull(); + var headerText = Encoding.ASCII.GetString(capturedHeaders!.Value.Span); + headerText.ShouldContain("Nats-Sequence: 42"); + headerText.ShouldContain("Nats-Subject: events.created"); + headerText.ShouldContain("Nats-Time-Stamp:"); + } + + // ------------------------------------------------------------------------- + // Test 7 — DeliverSubject property is set when StartDeliveryLoop is called + // + // Go reference: consumer.go:1131 — dsubj is set from cfg.DeliverSubject. + // ------------------------------------------------------------------------- + [Fact] + public void DeliverSubject_property_is_set_from_consumer_config() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "DS", + Push = true, + DeliverSubject = "my.deliver.subject", + }); + + using var cts = new CancellationTokenSource(); + engine.StartDeliveryLoop(consumer, + (_, _, _, _, _) => ValueTask.CompletedTask, + cts.Token); + + engine.DeliverSubject.ShouldBe("my.deliver.subject"); + engine.StopDeliveryLoop(); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/RedeliveryTrackerTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/RedeliveryTrackerTests.cs new file mode 100644 index 0000000..013de17 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/RedeliveryTrackerTests.cs @@ -0,0 +1,198 @@ +// Go: consumer.go (trackPending ~line 5540, processNak, rdq/rdc map, +// addToRedeliverQueue, maxdeliver check) +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class RedeliveryTrackerTests +{ + // ------------------------------------------------------------------------- + // Test 1 — Backoff array clamping at last entry for high delivery counts + // + // Go reference: consumer.go — backoff index = min(deliveries-1, len(backoff)-1) + // so that sequences with delivery counts past the array length use the last + // backoff value rather than going out of bounds. + // ------------------------------------------------------------------------- + [Fact] + public async Task Schedule_clamps_backoff_at_last_entry_for_high_delivery_count() + { + var tracker = new RedeliveryTracker([1, 5000]); + + // delivery 1 → backoff[0] = 1ms + tracker.Schedule(seq: 1, deliveryCount: 1); + await Task.Delay(10); + tracker.GetDue().ShouldContain(1UL); + + tracker.Acknowledge(1); + + // delivery 3 → index clamps to 1 → backoff[1] = 5000ms + tracker.Schedule(seq: 1, deliveryCount: 3); + tracker.GetDue().ShouldNotContain(1UL); + } + + // ------------------------------------------------------------------------- + // Test 2 — GetDue returns only entries whose deadline has passed + // + // Go reference: consumer.go — rdq items are eligible for redelivery only + // once their scheduled deadline has elapsed. + // ------------------------------------------------------------------------- + [Fact] + public async Task GetDue_returns_only_expired_entries() + { + var tracker = new RedeliveryTracker([1, 5000]); + + // 1ms backoff → will expire quickly + tracker.Schedule(seq: 10, deliveryCount: 1); + // 5000ms backoff → will not expire in test window + tracker.Schedule(seq: 20, deliveryCount: 2); + + // Neither should be due yet immediately after scheduling + tracker.GetDue().ShouldNotContain(10UL); + + await Task.Delay(15); + + var due = tracker.GetDue(); + due.ShouldContain(10UL); + due.ShouldNotContain(20UL); + } + + // ------------------------------------------------------------------------- + // Test 3 — Acknowledge removes the sequence from tracking + // + // Go reference: consumer.go — acking a sequence removes it from pending map + // so it is never surfaced by GetDue again. + // ------------------------------------------------------------------------- + [Fact] + public async Task Acknowledge_removes_sequence_from_tracking() + { + var tracker = new RedeliveryTracker([1]); + + tracker.Schedule(seq: 5, deliveryCount: 1); + await Task.Delay(10); + + tracker.GetDue().ShouldContain(5UL); + + tracker.Acknowledge(5); + + tracker.IsTracking(5).ShouldBeFalse(); + tracker.GetDue().ShouldNotContain(5UL); + tracker.TrackedCount.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Test 4 — IsMaxDeliveries returns true when threshold is reached + // + // Go reference: consumer.go — when rdc[sseq] >= MaxDeliver the sequence is + // dropped from redelivery and never surfaced again. + // ------------------------------------------------------------------------- + [Fact] + public void IsMaxDeliveries_returns_true_when_delivery_count_meets_threshold() + { + var tracker = new RedeliveryTracker([100]); + + tracker.Schedule(seq: 7, deliveryCount: 3); + + tracker.IsMaxDeliveries(7, maxDeliver: 3).ShouldBeTrue(); + tracker.IsMaxDeliveries(7, maxDeliver: 4).ShouldBeFalse(); + tracker.IsMaxDeliveries(7, maxDeliver: 2).ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 5 — IsMaxDeliveries returns false when maxDeliver is 0 (unlimited) + // + // Go reference: consumer.go — MaxDeliver <= 0 means unlimited redeliveries. + // ------------------------------------------------------------------------- + [Fact] + public void IsMaxDeliveries_returns_false_when_maxDeliver_is_zero() + { + var tracker = new RedeliveryTracker([100]); + + tracker.Schedule(seq: 99, deliveryCount: 1000); + + tracker.IsMaxDeliveries(99, maxDeliver: 0).ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 6 — Empty backoff falls back to ackWait + // + // Go reference: consumer.go — when BackOff is empty the ack-wait duration is + // used as the redelivery delay. + // ------------------------------------------------------------------------- + [Fact] + public async Task Schedule_with_empty_backoff_falls_back_to_ackWait() + { + // Empty backoff array → fall back to ackWaitMs + var tracker = new RedeliveryTracker([]); + + tracker.Schedule(seq: 1, deliveryCount: 1, ackWaitMs: 1); + await Task.Delay(10); + + tracker.GetDue().ShouldContain(1UL); + } + + // ------------------------------------------------------------------------- + // Test 7 — Empty backoff with large ackWait does not expire prematurely + // ------------------------------------------------------------------------- + [Fact] + public void Schedule_with_empty_backoff_and_large_ackWait_does_not_expire() + { + var tracker = new RedeliveryTracker([]); + + tracker.Schedule(seq: 2, deliveryCount: 1, ackWaitMs: 5000); + + tracker.GetDue().ShouldNotContain(2UL); + } + + // ------------------------------------------------------------------------- + // Test 8 — Schedule returns the deadline UTC time + // + // Go reference: consumer.go:5540 — trackPending stores the computed deadline. + // ------------------------------------------------------------------------- + [Fact] + public void Schedule_returns_deadline_in_the_future() + { + var tracker = new RedeliveryTracker([100]); + + var before = DateTime.UtcNow; + var deadline = tracker.Schedule(seq: 3, deliveryCount: 1); + var after = DateTime.UtcNow; + + deadline.ShouldBeGreaterThanOrEqualTo(before); + // Deadline should be ahead of scheduling time by at least the backoff value + (deadline - after).TotalMilliseconds.ShouldBeGreaterThan(0); + } + + // ------------------------------------------------------------------------- + // Test 9 — Multiple sequences tracked independently + // ------------------------------------------------------------------------- + [Fact] + public async Task Multiple_sequences_are_tracked_independently() + { + var tracker = new RedeliveryTracker([1, 5000]); + + tracker.Schedule(seq: 1, deliveryCount: 1); // 1ms → expires soon + tracker.Schedule(seq: 2, deliveryCount: 2); // 5000ms → won't expire + + tracker.TrackedCount.ShouldBe(2); + + await Task.Delay(15); + + var due = tracker.GetDue(); + due.ShouldContain(1UL); + due.ShouldNotContain(2UL); + + tracker.Acknowledge(1); + tracker.TrackedCount.ShouldBe(1); + } + + // ------------------------------------------------------------------------- + // Test 10 — IsMaxDeliveries returns false for untracked sequence + // ------------------------------------------------------------------------- + [Fact] + public void IsMaxDeliveries_returns_false_for_untracked_sequence() + { + var tracker = new RedeliveryTracker([100]); + + tracker.IsMaxDeliveries(999, maxDeliver: 1).ShouldBeFalse(); + } +}