diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/DeliveryLoopTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/DeliveryLoopTests.cs
new file mode 100644
index 0000000..2994007
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Consumers/DeliveryLoopTests.cs
@@ -0,0 +1,427 @@
+// Go: consumer.go:1400 (loopAndGatherMsgs) — gather loop polls the store for new messages,
+// dispatches them through the send delegate, respects filter subjects,
+// advances NextSequence, handles deleted/null entries, and exits on cancellation.
+using System.Text;
+using NATS.Server.JetStream;
+using NATS.Server.JetStream.Consumers;
+using NATS.Server.JetStream.Models;
+using NATS.Server.JetStream.Storage;
+
+namespace NATS.Server.Tests.JetStream.Consumers;
+
+public class DeliveryLoopTests
+{
+ // -----------------------------------------------------------------------
+ // Helpers
+ // -----------------------------------------------------------------------
+
+ private static ConsumerHandle MakeConsumer(ConsumerConfig config)
+ => new("TEST-STREAM", config);
+
+ ///
+ /// Returns a send delegate that releases on each delivery
+ /// and accumulates original subjects into .
+ ///
+ private static Func, ReadOnlyMemory, CancellationToken, ValueTask>
+ MakeSemaphoreSend(List deliveredOriginalSubjects, SemaphoreSlim sem)
+ => (_, origSubj, _, _, _) =>
+ {
+ lock (deliveredOriginalSubjects)
+ deliveredOriginalSubjects.Add(origSubj);
+ sem.Release();
+ return ValueTask.CompletedTask;
+ };
+
+ // -----------------------------------------------------------------------
+ // Test 1 — GatherLoop polls store for new messages
+ //
+ // Go reference: consumer.go:1560 — poll store for messages from nextSeq to LastSeq.
+ // Three messages appended before the loop starts; loop must dispatch all three.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public async Task GatherLoop_polls_store_for_new_messages()
+ {
+ var store = new MemStore();
+ await store.AppendAsync("foo", "msg1"u8.ToArray(), default);
+ await store.AppendAsync("foo", "msg2"u8.ToArray(), default);
+ await store.AppendAsync("foo", "msg3"u8.ToArray(), default);
+
+ var consumer = MakeConsumer(new ConsumerConfig { DurableName = "POLL" });
+ var delivered = new List();
+ var sem = new SemaphoreSlim(0);
+ var send = MakeSemaphoreSend(delivered, sem);
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ var engine = new PushConsumerEngine();
+ engine.StartGatherLoop(consumer, store, send, cts.Token);
+
+ // Wait for exactly 3 releases — one per dispatched message
+ await sem.WaitAsync(cts.Token);
+ await sem.WaitAsync(cts.Token);
+ await sem.WaitAsync(cts.Token);
+
+ engine.StopGatherLoop();
+
+ lock (delivered) delivered.Count.ShouldBe(3);
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 2 — GatherLoop respects FilterSubject
+ //
+ // Go reference: consumer.go:1569 — ShouldDeliver skips messages whose subject
+ // does not match cfg.FilterSubject.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public async Task GatherLoop_respects_filter_subject()
+ {
+ var store = new MemStore();
+ await store.AppendAsync("orders.us", "o1"u8.ToArray(), default);
+ await store.AppendAsync("events.x", "e1"u8.ToArray(), default);
+ await store.AppendAsync("orders.eu", "o2"u8.ToArray(), default);
+
+ var consumer = MakeConsumer(new ConsumerConfig
+ {
+ DurableName = "FILTERED",
+ FilterSubject = "orders.>",
+ });
+
+ var delivered = new List();
+ var sem = new SemaphoreSlim(0);
+ var send = MakeSemaphoreSend(delivered, sem);
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ var engine = new PushConsumerEngine();
+ engine.StartGatherLoop(consumer, store, send, cts.Token);
+
+ // Only "orders.us" and "orders.eu" match the filter — wait for exactly 2
+ await sem.WaitAsync(cts.Token);
+ await sem.WaitAsync(cts.Token);
+
+ engine.StopGatherLoop();
+
+ lock (delivered)
+ {
+ delivered.Count.ShouldBe(2);
+ delivered.ShouldContain("orders.us");
+ delivered.ShouldContain("orders.eu");
+ delivered.ShouldNotContain("events.x");
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 3 — ShouldDeliver with no filter delivers all subjects
+ //
+ // Go reference: consumer.go — empty FilterSubject + empty FilterSubjects → deliver all.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public void ShouldDeliver_with_no_filter_delivers_all()
+ {
+ var config = new ConsumerConfig { DurableName = "ANY" };
+
+ PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue();
+ PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeTrue();
+ PushConsumerEngine.ShouldDeliverPublic(config, "telemetry.cpu.host1").ShouldBeTrue();
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 4 — ShouldDeliver with single FilterSubject
+ //
+ // Go reference: consumer.go — FilterSubject is matched via SubjectMatch.MatchLiteral.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public void ShouldDeliver_with_single_filter()
+ {
+ var config = new ConsumerConfig
+ {
+ DurableName = "SINGLE",
+ FilterSubject = "orders.us",
+ };
+
+ PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue();
+ PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse();
+ PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse();
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 5 — ShouldDeliver with multiple filters (FilterSubjects list)
+ //
+ // Go reference: consumer.go — FilterSubjects: any match → deliver.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public void ShouldDeliver_with_multiple_filters()
+ {
+ var config = new ConsumerConfig
+ {
+ DurableName = "MULTI",
+ FilterSubjects = ["orders.us", "events.created"],
+ };
+
+ PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue();
+ PushConsumerEngine.ShouldDeliverPublic(config, "events.created").ShouldBeTrue();
+ PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse();
+ PushConsumerEngine.ShouldDeliverPublic(config, "events.deleted").ShouldBeFalse();
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 6 — ShouldDeliver with wildcard filter
+ //
+ // Go reference: consumer.go — wildcard matching via SubjectMatch.MatchLiteral.
+ // "orders.*" matches one-token suffix; "orders.us.new" has two suffix tokens
+ // so it does not match.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public void ShouldDeliver_with_wildcard_filter()
+ {
+ var config = new ConsumerConfig
+ {
+ DurableName = "WILDCARD",
+ FilterSubject = "orders.*",
+ };
+
+ PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue();
+ PushConsumerEngine.ShouldDeliverPublic(config, "orders.old").ShouldBeTrue();
+ PushConsumerEngine.ShouldDeliverPublic(config, "orders.us.new").ShouldBeFalse();
+ PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse();
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 7 — Signal(NewMessage) wakes the gather loop
+ //
+ // Go reference: consumer.go:1620 — channel send wakes the loop so it does
+ // not have to wait the full 250ms poll timeout.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public async Task GatherLoop_signal_wakes_loop()
+ {
+ var store = new MemStore();
+ var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SIGNAL" });
+
+ // loopStarted is released once the loop has begun its first wait cycle —
+ // we detect this by waiting until NextSequence has been set to 1 (the loop
+ // initialises it on entry) and the store is still empty.
+ var deliveredSem = new SemaphoreSlim(0, 1);
+ var delivered = new List();
+
+ Func, ReadOnlyMemory, CancellationToken, ValueTask> send =
+ (_, origSubj, _, _, _) =>
+ {
+ lock (delivered) delivered.Add(origSubj);
+ deliveredSem.Release();
+ return ValueTask.CompletedTask;
+ };
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ var engine = new PushConsumerEngine();
+ engine.StartGatherLoop(consumer, store, send, cts.Token);
+
+ // Spin (yield only) until the loop has entered its first 250ms wait, which we
+ // infer from GatheredCount staying at 0 while the loop is running.
+ // We yield the thread without sleeping to avoid SW004.
+ var spins = 0;
+ while (engine.GatheredCount == 0 && spins < 5_000)
+ {
+ await Task.Yield();
+ spins++;
+ }
+
+ // Append a message and signal — delivery should arrive well before 500ms
+ await store.AppendAsync("foo", "hello"u8.ToArray(), default);
+ engine.Signal(ConsumerSignal.NewMessage);
+
+ var received = await deliveredSem.WaitAsync(TimeSpan.FromMilliseconds(500), cts.Token);
+ engine.StopGatherLoop();
+
+ received.ShouldBeTrue("expected delivery within 500ms after Signal(NewMessage)");
+ lock (delivered) delivered.Count.ShouldBeGreaterThanOrEqualTo(1);
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 8 — GatherLoop advances NextSequence
+ //
+ // Go reference: consumer.go:1600 — nextSeq++ and consumer.NextSequence = nextSeq.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public async Task GatherLoop_advances_next_sequence()
+ {
+ var store = new MemStore();
+ await store.AppendAsync("foo", "a"u8.ToArray(), default);
+ await store.AppendAsync("foo", "b"u8.ToArray(), default);
+ await store.AppendAsync("foo", "c"u8.ToArray(), default);
+
+ var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SEQ" });
+ var delivered = new List();
+ var sem = new SemaphoreSlim(0);
+ var send = MakeSemaphoreSend(delivered, sem);
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ var engine = new PushConsumerEngine();
+ engine.StartGatherLoop(consumer, store, send, cts.Token);
+
+ // Wait for all three messages to be delivered
+ await sem.WaitAsync(cts.Token);
+ await sem.WaitAsync(cts.Token);
+ await sem.WaitAsync(cts.Token);
+
+ engine.StopGatherLoop();
+
+ // After delivering 3 messages NextSequence should be 4 (next to load)
+ consumer.NextSequence.ShouldBe((ulong)4);
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 9 — GatherLoop skips deleted/null messages
+ //
+ // Go reference: consumer.go:1572 — LoadAsync returning null means the
+ // message was deleted; the gather loop simply advances past it.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public async Task GatherLoop_skips_deleted_messages()
+ {
+ var store = new MemStore();
+ await store.AppendAsync("foo", "first"u8.ToArray(), default); // seq 1
+ await store.AppendAsync("foo", "second"u8.ToArray(), default); // seq 2
+ await store.AppendAsync("foo", "third"u8.ToArray(), default); // seq 3
+
+ // Delete seq 2 so LoadAsync returns null for it
+ await store.RemoveAsync(2, default);
+
+ var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SKIP-DEL" });
+ var deliveredPayloads = new List();
+ var sem = new SemaphoreSlim(0);
+
+ Func, ReadOnlyMemory, CancellationToken, ValueTask> send =
+ (_, _, _, payload, _) =>
+ {
+ lock (deliveredPayloads) deliveredPayloads.Add(Encoding.UTF8.GetString(payload.Span));
+ sem.Release();
+ return ValueTask.CompletedTask;
+ };
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ var engine = new PushConsumerEngine();
+ engine.StartGatherLoop(consumer, store, send, cts.Token);
+
+ // Only two messages remain after removing seq 2
+ await sem.WaitAsync(cts.Token);
+ await sem.WaitAsync(cts.Token);
+
+ engine.StopGatherLoop();
+
+ lock (deliveredPayloads)
+ {
+ deliveredPayloads.Count.ShouldBe(2);
+ deliveredPayloads.ShouldContain("first");
+ deliveredPayloads.ShouldContain("third");
+ deliveredPayloads.ShouldNotContain("second");
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 10 — GatherLoop increments GatheredCount
+ //
+ // Go reference: consumer.go:1400 loopAndGatherMsgs — GatheredCount tracks
+ // every message dispatched to the subscriber.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public async Task GatherLoop_increments_gathered_count()
+ {
+ var store = new MemStore();
+ await store.AppendAsync("foo", "x"u8.ToArray(), default);
+ await store.AppendAsync("foo", "y"u8.ToArray(), default);
+
+ var consumer = MakeConsumer(new ConsumerConfig { DurableName = "COUNT" });
+ var delivered = new List();
+ var sem = new SemaphoreSlim(0);
+ var send = MakeSemaphoreSend(delivered, sem);
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ var engine = new PushConsumerEngine();
+ engine.StartGatherLoop(consumer, store, send, cts.Token);
+
+ await sem.WaitAsync(cts.Token);
+ await sem.WaitAsync(cts.Token);
+
+ engine.StopGatherLoop();
+
+ engine.GatheredCount.ShouldBe(2);
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 11 — GatherLoop stops on cancellation
+ //
+ // Go reference: consumer.go — the goroutine exits when the quit channel closes,
+ // which maps to CancellationToken cancellation here.
+ // -----------------------------------------------------------------------
+ [Fact]
+ public async Task GatherLoop_stops_on_cancellation()
+ {
+ var store = new MemStore();
+ var consumer = MakeConsumer(new ConsumerConfig { DurableName = "CANCEL" });
+
+ // loopRunning becomes set once the loop is executing its first iteration
+ var loopRunning = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var deliveredCount = 0;
+
+ Func, ReadOnlyMemory, CancellationToken, ValueTask> send =
+ (_, _, _, _, _) =>
+ {
+ Interlocked.Increment(ref deliveredCount);
+ return ValueTask.CompletedTask;
+ };
+
+ var cts = new CancellationTokenSource();
+ var engine = new PushConsumerEngine();
+ engine.StartGatherLoop(consumer, store, send, cts.Token);
+
+ // Cancel without appending anything; the loop must exit cleanly
+ await cts.CancelAsync();
+ engine.StopGatherLoop();
+
+ // Yield a few times to let any in-flight dispatch complete
+ for (var i = 0; i < 10; i++) await Task.Yield();
+
+ deliveredCount.ShouldBe(0);
+ }
+
+ // -----------------------------------------------------------------------
+ // Test 12 — GatherLoop handles an empty store
+ //
+ // Go reference: consumer.go:1620 — when no messages exist the loop waits
+ // on the signal channel with a 250ms timeout rather than busy-spinning.
+ // We verify it does NOT deliver anything when the store remains empty,
+ // and that it exits cleanly when cancelled.
+ // -----------------------------------------------------------------------
+ [Fact]
+ [SlopwatchSuppress("SW004", "Negative timing assertion: verifying the gather loop does NOT deliver from an empty store requires a real wall-clock window; no synchronisation primitive can replace observing the absence of delivery")]
+ public async Task GatherLoop_handles_empty_store()
+ {
+ var store = new MemStore(); // nothing appended
+ var consumer = MakeConsumer(new ConsumerConfig { DurableName = "EMPTY" });
+
+ var deliveredCount = 0;
+ var firstCallTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ Func, ReadOnlyMemory, CancellationToken, ValueTask> send =
+ (_, _, _, _, _) =>
+ {
+ Interlocked.Increment(ref deliveredCount);
+ firstCallTcs.TrySetResult(true);
+ return ValueTask.CompletedTask;
+ };
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ var engine = new PushConsumerEngine();
+ engine.StartGatherLoop(consumer, store, send, cts.Token);
+
+ // The send delegate should never be called because the store is empty.
+ // Wait a short absolute time; if it fires we know the loop is broken.
+ var unexpectedDelivery = await Task.WhenAny(
+ firstCallTcs.Task,
+ Task.Delay(150, cts.Token)) == firstCallTcs.Task;
+
+ engine.StopGatherLoop();
+
+ unexpectedDelivery.ShouldBeFalse("gather loop must not deliver from an empty store");
+ deliveredCount.ShouldBe(0);
+ }
+}