// Go: consumer.go:1400 (loopAndGatherMsgs) — gather loop polls the store for new messages,
// dispatches them through the send delegate, respects filter subjects,
// advances NextSequence, handles deleted/null entries, and exits on cancellation.

using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;

namespace NATS.Server.Tests.JetStream.Consumers;

/// <summary>
/// Tests for the <see cref="PushConsumerEngine"/> gather loop and its subject-filter check.
/// </summary>
public class DeliveryLoopTests
{
    // -----------------------------------------------------------------------
    // Helpers
    // -----------------------------------------------------------------------

    /// <summary>
    /// Creates a consumer handle bound to the fixed stream name "TEST-STREAM".
    /// </summary>
    /// <param name="config">Consumer configuration under test.</param>
    private static ConsumerHandle MakeConsumer(ConsumerConfig config)
        => new("TEST-STREAM", config);

    /// <summary>
    /// Returns a send delegate that releases <paramref name="sem"/> on each delivery
    /// and accumulates original subjects into <paramref name="deliveredOriginalSubjects"/>.
    /// </summary>
    /// <param name="deliveredOriginalSubjects">
    /// Receives the second delegate argument of every delivery; also used as the lock object,
    /// so readers must lock on the same list.
    /// </param>
    /// <param name="sem">Released once per delivery so tests can await an exact delivery count.</param>
    private static Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask>
        MakeSemaphoreSend(List<string> deliveredOriginalSubjects, SemaphoreSlim sem)
        => (_, origSubj, _, _, _) =>
        {
            lock (deliveredOriginalSubjects)
            {
                deliveredOriginalSubjects.Add(origSubj);
            }

            sem.Release();
            return ValueTask.CompletedTask;
        };

    // -----------------------------------------------------------------------
    // Test 1 — GatherLoop polls store for new messages
    //
    // Go reference: consumer.go:1560 — poll store for messages from nextSeq to LastSeq.
    // Three messages appended before the loop starts; loop must dispatch all three.
    // -----------------------------------------------------------------------
    [Fact]
    public async Task GatherLoop_polls_store_for_new_messages()
    {
        var store = new MemStore();
        await store.AppendAsync("foo", "msg1"u8.ToArray(), default);
        await store.AppendAsync("foo", "msg2"u8.ToArray(), default);
        await store.AppendAsync("foo", "msg3"u8.ToArray(), default);

        var consumer = MakeConsumer(new ConsumerConfig { DurableName = "POLL" });
        var delivered = new List<string>();
        var sem = new SemaphoreSlim(0);
        var send = MakeSemaphoreSend(delivered, sem);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var engine = new PushConsumerEngine();
        engine.StartGatherLoop(consumer, store, send, cts.Token);

        // Wait for exactly 3 releases — one per dispatched message
        await sem.WaitAsync(cts.Token);
        await sem.WaitAsync(cts.Token);
        await sem.WaitAsync(cts.Token);

        engine.StopGatherLoop();

        lock (delivered)
        {
            delivered.Count.ShouldBe(3);
        }
    }

    // -----------------------------------------------------------------------
    // Test 2 — GatherLoop respects FilterSubject
    //
    // Go reference: consumer.go:1569 — ShouldDeliver skips messages whose subject
    // does not match cfg.FilterSubject.
    // -----------------------------------------------------------------------
    [Fact]
    public async Task GatherLoop_respects_filter_subject()
    {
        var store = new MemStore();
        await store.AppendAsync("orders.us", "o1"u8.ToArray(), default);
        await store.AppendAsync("events.x", "e1"u8.ToArray(), default);
        await store.AppendAsync("orders.eu", "o2"u8.ToArray(), default);

        var consumer = MakeConsumer(new ConsumerConfig
        {
            DurableName = "FILTERED",
            FilterSubject = "orders.>",
        });
        var delivered = new List<string>();
        var sem = new SemaphoreSlim(0);
        var send = MakeSemaphoreSend(delivered, sem);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var engine = new PushConsumerEngine();
        engine.StartGatherLoop(consumer, store, send, cts.Token);

        // Only "orders.us" and "orders.eu" match the filter — wait for exactly 2
        await sem.WaitAsync(cts.Token);
        await sem.WaitAsync(cts.Token);

        engine.StopGatherLoop();

        lock (delivered)
        {
            delivered.Count.ShouldBe(2);
            delivered.ShouldContain("orders.us");
            delivered.ShouldContain("orders.eu");
            delivered.ShouldNotContain("events.x");
        }
    }

    // -----------------------------------------------------------------------
    // Test 3 — ShouldDeliver with no filter delivers all subjects
    //
    // Go reference: consumer.go — empty FilterSubject + empty FilterSubjects → deliver all.
    // -----------------------------------------------------------------------
    [Fact]
    public void ShouldDeliver_with_no_filter_delivers_all()
    {
        var config = new ConsumerConfig { DurableName = "ANY" };

        PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue();
        PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeTrue();
        PushConsumerEngine.ShouldDeliverPublic(config, "telemetry.cpu.host1").ShouldBeTrue();
    }

    // -----------------------------------------------------------------------
    // Test 4 — ShouldDeliver with single FilterSubject
    //
    // Go reference: consumer.go — FilterSubject is matched via SubjectMatch.MatchLiteral.
    // -----------------------------------------------------------------------
    [Fact]
    public void ShouldDeliver_with_single_filter()
    {
        var config = new ConsumerConfig
        {
            DurableName = "SINGLE",
            FilterSubject = "orders.us",
        };

        PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue();
        PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse();
        PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse();
    }

    // -----------------------------------------------------------------------
    // Test 5 — ShouldDeliver with multiple filters (FilterSubjects list)
    //
    // Go reference: consumer.go — FilterSubjects: any match → deliver.
    // -----------------------------------------------------------------------
    [Fact]
    public void ShouldDeliver_with_multiple_filters()
    {
        var config = new ConsumerConfig
        {
            DurableName = "MULTI",
            FilterSubjects = ["orders.us", "events.created"],
        };

        PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue();
        PushConsumerEngine.ShouldDeliverPublic(config, "events.created").ShouldBeTrue();
        PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse();
        PushConsumerEngine.ShouldDeliverPublic(config, "events.deleted").ShouldBeFalse();
    }

    // -----------------------------------------------------------------------
    // Test 6 — ShouldDeliver with wildcard filter
    //
    // Go reference: consumer.go — wildcard matching via SubjectMatch.MatchLiteral.
    // "orders.*" matches one-token suffix; "orders.us.new" has two suffix tokens
    // so it does not match.
    // -----------------------------------------------------------------------
    [Fact]
    public void ShouldDeliver_with_wildcard_filter()
    {
        var config = new ConsumerConfig
        {
            DurableName = "WILDCARD",
            FilterSubject = "orders.*",
        };

        PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue();
        PushConsumerEngine.ShouldDeliverPublic(config, "orders.old").ShouldBeTrue();
        PushConsumerEngine.ShouldDeliverPublic(config, "orders.us.new").ShouldBeFalse();
        PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse();
    }

    // -----------------------------------------------------------------------
    // Test 7 — Signal(NewMessage) wakes the gather loop
    //
    // Go reference: consumer.go:1620 — channel send wakes the loop so it does
    // not have to wait the full 250ms poll timeout.
    // -----------------------------------------------------------------------
    [Fact]
    public async Task GatherLoop_signal_wakes_loop()
    {
        var store = new MemStore();
        var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SIGNAL" });

        // deliveredSem is released by the send delegate on every delivery. Its maximum
        // count is 1 because this test appends exactly one message.
        var deliveredSem = new SemaphoreSlim(0, 1);
        var delivered = new List<string>();

        Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> send =
            (_, origSubj, _, _, _) =>
            {
                lock (delivered)
                {
                    delivered.Add(origSubj);
                }

                deliveredSem.Release();
                return ValueTask.CompletedTask;
            };

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var engine = new PushConsumerEngine();
        engine.StartGatherLoop(consumer, store, send, cts.Token);

        // Give the loop a chance to start and reach its idle wait: yield up to 5,000 times
        // while nothing has been gathered. The store is still empty here, so GatheredCount
        // is expected to stay at 0 — this is a bounded yield, not a synchronisation point.
        // We yield the thread without sleeping to avoid SW004.
        var spins = 0;
        while (engine.GatheredCount == 0 && spins < 5_000)
        {
            await Task.Yield();
            spins++;
        }

        // Append a message and signal — delivery should arrive well before 500ms
        await store.AppendAsync("foo", "hello"u8.ToArray(), default);
        engine.Signal(ConsumerSignal.NewMessage);

        var received = await deliveredSem.WaitAsync(TimeSpan.FromMilliseconds(500), cts.Token);

        engine.StopGatherLoop();

        received.ShouldBeTrue("expected delivery within 500ms after Signal(NewMessage)");
        lock (delivered)
        {
            delivered.Count.ShouldBeGreaterThanOrEqualTo(1);
        }
    }

    // -----------------------------------------------------------------------
    // Test 8 — GatherLoop advances NextSequence
    //
    // Go reference: consumer.go:1600 — nextSeq++ and consumer.NextSequence = nextSeq.
    // -----------------------------------------------------------------------
    [Fact]
    public async Task GatherLoop_advances_next_sequence()
    {
        var store = new MemStore();
        await store.AppendAsync("foo", "a"u8.ToArray(), default);
        await store.AppendAsync("foo", "b"u8.ToArray(), default);
        await store.AppendAsync("foo", "c"u8.ToArray(), default);

        var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SEQ" });
        var delivered = new List<string>();
        var sem = new SemaphoreSlim(0);
        var send = MakeSemaphoreSend(delivered, sem);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var engine = new PushConsumerEngine();
        engine.StartGatherLoop(consumer, store, send, cts.Token);

        // Wait for all three messages to be delivered
        await sem.WaitAsync(cts.Token);
        await sem.WaitAsync(cts.Token);
        await sem.WaitAsync(cts.Token);

        engine.StopGatherLoop();

        // After delivering 3 messages NextSequence should be 4 (next to load)
        consumer.NextSequence.ShouldBe((ulong)4);
    }

    // -----------------------------------------------------------------------
    // Test 9 — GatherLoop skips deleted/null messages
    //
    // Go reference: consumer.go:1572 — LoadAsync returning null means the
    // message was deleted; the gather loop simply advances past it.
    // -----------------------------------------------------------------------
    [Fact]
    public async Task GatherLoop_skips_deleted_messages()
    {
        var store = new MemStore();
        await store.AppendAsync("foo", "first"u8.ToArray(), default);  // seq 1
        await store.AppendAsync("foo", "second"u8.ToArray(), default); // seq 2
        await store.AppendAsync("foo", "third"u8.ToArray(), default);  // seq 3

        // Delete seq 2 so LoadAsync returns null for it
        await store.RemoveAsync(2, default);

        var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SKIP-DEL" });
        var deliveredPayloads = new List<string>();
        var sem = new SemaphoreSlim(0);

        // The fourth delegate argument is the payload; decode it so assertions can use text.
        Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> send =
            (_, _, _, payload, _) =>
            {
                lock (deliveredPayloads)
                {
                    deliveredPayloads.Add(Encoding.UTF8.GetString(payload.Span));
                }

                sem.Release();
                return ValueTask.CompletedTask;
            };

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var engine = new PushConsumerEngine();
        engine.StartGatherLoop(consumer, store, send, cts.Token);

        // Only two messages remain after removing seq 2
        await sem.WaitAsync(cts.Token);
        await sem.WaitAsync(cts.Token);

        engine.StopGatherLoop();

        lock (deliveredPayloads)
        {
            deliveredPayloads.Count.ShouldBe(2);
            deliveredPayloads.ShouldContain("first");
            deliveredPayloads.ShouldContain("third");
            deliveredPayloads.ShouldNotContain("second");
        }
    }

    // -----------------------------------------------------------------------
    // Test 10 — GatherLoop increments GatheredCount
    //
    // Go reference: consumer.go:1400 loopAndGatherMsgs — GatheredCount tracks
    // every message dispatched to the subscriber.
    // -----------------------------------------------------------------------
    [Fact]
    public async Task GatherLoop_increments_gathered_count()
    {
        var store = new MemStore();
        await store.AppendAsync("foo", "x"u8.ToArray(), default);
        await store.AppendAsync("foo", "y"u8.ToArray(), default);

        var consumer = MakeConsumer(new ConsumerConfig { DurableName = "COUNT" });
        var delivered = new List<string>();
        var sem = new SemaphoreSlim(0);
        var send = MakeSemaphoreSend(delivered, sem);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var engine = new PushConsumerEngine();
        engine.StartGatherLoop(consumer, store, send, cts.Token);

        await sem.WaitAsync(cts.Token);
        await sem.WaitAsync(cts.Token);

        engine.StopGatherLoop();

        engine.GatheredCount.ShouldBe(2);
    }

    // -----------------------------------------------------------------------
    // Test 11 — GatherLoop stops on cancellation
    //
    // Go reference: consumer.go — the goroutine exits when the quit channel closes,
    // which maps to CancellationToken cancellation here.
    // -----------------------------------------------------------------------
    [Fact]
    public async Task GatherLoop_stops_on_cancellation()
    {
        var store = new MemStore();
        var consumer = MakeConsumer(new ConsumerConfig { DurableName = "CANCEL" });

        var deliveredCount = 0;
        Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> send =
            (_, _, _, _, _) =>
            {
                Interlocked.Increment(ref deliveredCount);
                return ValueTask.CompletedTask;
            };

        // Disposed at the end of the test, like the token sources in the other tests.
        using var cts = new CancellationTokenSource();
        var engine = new PushConsumerEngine();
        engine.StartGatherLoop(consumer, store, send, cts.Token);

        // Cancel without appending anything; the loop must exit cleanly
        await cts.CancelAsync();
        engine.StopGatherLoop();

        // Yield a few times to let any in-flight dispatch complete
        for (var i = 0; i < 10; i++)
        {
            await Task.Yield();
        }

        deliveredCount.ShouldBe(0);
    }

    // -----------------------------------------------------------------------
    // Test 12 — GatherLoop handles an empty store
    //
    // Go reference: consumer.go:1620 — when no messages exist the loop waits
    // on the signal channel with a 250ms timeout rather than busy-spinning.
    // We verify it does NOT deliver anything when the store remains empty,
    // and that it exits cleanly when cancelled.
    // -----------------------------------------------------------------------
    [Fact]
    [SlopwatchSuppress("SW004", "Negative timing assertion: verifying the gather loop does NOT deliver from an empty store requires a real wall-clock window; no synchronisation primitive can replace observing the absence of delivery")]
    public async Task GatherLoop_handles_empty_store()
    {
        var store = new MemStore(); // nothing appended
        var consumer = MakeConsumer(new ConsumerConfig { DurableName = "EMPTY" });

        var deliveredCount = 0;
        var firstCallTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> send =
            (_, _, _, _, _) =>
            {
                Interlocked.Increment(ref deliveredCount);
                firstCallTcs.TrySetResult(true);
                return ValueTask.CompletedTask;
            };

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var engine = new PushConsumerEngine();
        engine.StartGatherLoop(consumer, store, send, cts.Token);

        // The send delegate should never be called because the store is empty.
        // Wait a short absolute time; if it fires we know the loop is broken.
        var unexpectedDelivery = await Task.WhenAny(
            firstCallTcs.Task,
            Task.Delay(150, cts.Token)) == firstCallTcs.Task;

        engine.StopGatherLoop();

        unexpectedDelivery.ShouldBeFalse("gather loop must not deliver from an empty store");
        deliveredCount.ShouldBe(0);
    }
}