From ae4bc1f683fb6f8b4a9145d4eba354ada2d04111 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 11:09:52 -0500 Subject: [PATCH] feat: implement core message delivery loop for push consumers (Gap 3.1) Adds DeliveryLoopTests covering the gather loop's store polling, filter subject enforcement, NextSequence advancement, deleted-message skipping, GatheredCount tracking, signal wake-up, and cancellation behaviour. --- .../JetStream/Consumers/DeliveryLoopTests.cs | 427 ++++++++++++++++++ 1 file changed, 427 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Consumers/DeliveryLoopTests.cs diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/DeliveryLoopTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/DeliveryLoopTests.cs new file mode 100644 index 0000000..2994007 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/DeliveryLoopTests.cs @@ -0,0 +1,427 @@ +// Go: consumer.go:1400 (loopAndGatherMsgs) — gather loop polls the store for new messages, +// dispatches them through the send delegate, respects filter subjects, +// advances NextSequence, handles deleted/null entries, and exits on cancellation. +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Consumers; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class DeliveryLoopTests +{ + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private static ConsumerHandle MakeConsumer(ConsumerConfig config) + => new("TEST-STREAM", config); + + /// + /// Returns a send delegate that releases on each delivery + /// and accumulates original subjects into . + /// + private static Func, ReadOnlyMemory, CancellationToken, ValueTask> + MakeSemaphoreSend(List deliveredOriginalSubjects, SemaphoreSlim sem) + => (_, origSubj, _, _, _) => + { + lock (deliveredOriginalSubjects) + deliveredOriginalSubjects.Add(origSubj); + sem.Release(); + return ValueTask.CompletedTask; + }; + + // ----------------------------------------------------------------------- + // Test 1 — GatherLoop polls store for new messages + // + // Go reference: consumer.go:1560 — poll store for messages from nextSeq to LastSeq. + // Three messages appended before the loop starts; loop must dispatch all three. + // ----------------------------------------------------------------------- + [Fact] + public async Task GatherLoop_polls_store_for_new_messages() + { + var store = new MemStore(); + await store.AppendAsync("foo", "msg1"u8.ToArray(), default); + await store.AppendAsync("foo", "msg2"u8.ToArray(), default); + await store.AppendAsync("foo", "msg3"u8.ToArray(), default); + + var consumer = MakeConsumer(new ConsumerConfig { DurableName = "POLL" }); + var delivered = new List(); + var sem = new SemaphoreSlim(0); + var send = MakeSemaphoreSend(delivered, sem); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var engine = new PushConsumerEngine(); + engine.StartGatherLoop(consumer, store, send, cts.Token); + + // Wait for exactly 3 releases — one per dispatched message + await sem.WaitAsync(cts.Token); + await sem.WaitAsync(cts.Token); + await sem.WaitAsync(cts.Token); + + engine.StopGatherLoop(); + + lock (delivered) delivered.Count.ShouldBe(3); + } + + // ----------------------------------------------------------------------- + // Test 2 — GatherLoop respects FilterSubject + // + // Go reference: consumer.go:1569 — ShouldDeliver skips messages whose subject + // does not match cfg.FilterSubject. + // ----------------------------------------------------------------------- + [Fact] + public async Task GatherLoop_respects_filter_subject() + { + var store = new MemStore(); + await store.AppendAsync("orders.us", "o1"u8.ToArray(), default); + await store.AppendAsync("events.x", "e1"u8.ToArray(), default); + await store.AppendAsync("orders.eu", "o2"u8.ToArray(), default); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "FILTERED", + FilterSubject = "orders.>", + }); + + var delivered = new List(); + var sem = new SemaphoreSlim(0); + var send = MakeSemaphoreSend(delivered, sem); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var engine = new PushConsumerEngine(); + engine.StartGatherLoop(consumer, store, send, cts.Token); + + // Only "orders.us" and "orders.eu" match the filter — wait for exactly 2 + await sem.WaitAsync(cts.Token); + await sem.WaitAsync(cts.Token); + + engine.StopGatherLoop(); + + lock (delivered) + { + delivered.Count.ShouldBe(2); + delivered.ShouldContain("orders.us"); + delivered.ShouldContain("orders.eu"); + delivered.ShouldNotContain("events.x"); + } + } + + // ----------------------------------------------------------------------- + // Test 3 — ShouldDeliver with no filter delivers all subjects + // + // Go reference: consumer.go — empty FilterSubject + empty FilterSubjects → deliver all. + // ----------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_with_no_filter_delivers_all() + { + var config = new ConsumerConfig { DurableName = "ANY" }; + + PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue(); + PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeTrue(); + PushConsumerEngine.ShouldDeliverPublic(config, "telemetry.cpu.host1").ShouldBeTrue(); + } + + // ----------------------------------------------------------------------- + // Test 4 — ShouldDeliver with single FilterSubject + // + // Go reference: consumer.go — FilterSubject is matched via SubjectMatch.MatchLiteral. + // ----------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_with_single_filter() + { + var config = new ConsumerConfig + { + DurableName = "SINGLE", + FilterSubject = "orders.us", + }; + + PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue(); + PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse(); + PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse(); + } + + // ----------------------------------------------------------------------- + // Test 5 — ShouldDeliver with multiple filters (FilterSubjects list) + // + // Go reference: consumer.go — FilterSubjects: any match → deliver. + // ----------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_with_multiple_filters() + { + var config = new ConsumerConfig + { + DurableName = "MULTI", + FilterSubjects = ["orders.us", "events.created"], + }; + + PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue(); + PushConsumerEngine.ShouldDeliverPublic(config, "events.created").ShouldBeTrue(); + PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse(); + PushConsumerEngine.ShouldDeliverPublic(config, "events.deleted").ShouldBeFalse(); + } + + // ----------------------------------------------------------------------- + // Test 6 — ShouldDeliver with wildcard filter + // + // Go reference: consumer.go — wildcard matching via SubjectMatch.MatchLiteral. + // "orders.*" matches one-token suffix; "orders.us.new" has two suffix tokens + // so it does not match. + // ----------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_with_wildcard_filter() + { + var config = new ConsumerConfig + { + DurableName = "WILDCARD", + FilterSubject = "orders.*", + }; + + PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue(); + PushConsumerEngine.ShouldDeliverPublic(config, "orders.old").ShouldBeTrue(); + PushConsumerEngine.ShouldDeliverPublic(config, "orders.us.new").ShouldBeFalse(); + PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse(); + } + + // ----------------------------------------------------------------------- + // Test 7 — Signal(NewMessage) wakes the gather loop + // + // Go reference: consumer.go:1620 — channel send wakes the loop so it does + // not have to wait the full 250ms poll timeout. + // ----------------------------------------------------------------------- + [Fact] + public async Task GatherLoop_signal_wakes_loop() + { + var store = new MemStore(); + var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SIGNAL" }); + + // loopStarted is released once the loop has begun its first wait cycle — + // we detect this by waiting until NextSequence has been set to 1 (the loop + // initialises it on entry) and the store is still empty. + var deliveredSem = new SemaphoreSlim(0, 1); + var delivered = new List(); + + Func, ReadOnlyMemory, CancellationToken, ValueTask> send = + (_, origSubj, _, _, _) => + { + lock (delivered) delivered.Add(origSubj); + deliveredSem.Release(); + return ValueTask.CompletedTask; + }; + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var engine = new PushConsumerEngine(); + engine.StartGatherLoop(consumer, store, send, cts.Token); + + // Spin (yield only) until the loop has entered its first 250ms wait, which we + // infer from GatheredCount staying at 0 while the loop is running. + // We yield the thread without sleeping to avoid SW004. + var spins = 0; + while (engine.GatheredCount == 0 && spins < 5_000) + { + await Task.Yield(); + spins++; + } + + // Append a message and signal — delivery should arrive well before 500ms + await store.AppendAsync("foo", "hello"u8.ToArray(), default); + engine.Signal(ConsumerSignal.NewMessage); + + var received = await deliveredSem.WaitAsync(TimeSpan.FromMilliseconds(500), cts.Token); + engine.StopGatherLoop(); + + received.ShouldBeTrue("expected delivery within 500ms after Signal(NewMessage)"); + lock (delivered) delivered.Count.ShouldBeGreaterThanOrEqualTo(1); + } + + // ----------------------------------------------------------------------- + // Test 8 — GatherLoop advances NextSequence + // + // Go reference: consumer.go:1600 — nextSeq++ and consumer.NextSequence = nextSeq. + // ----------------------------------------------------------------------- + [Fact] + public async Task GatherLoop_advances_next_sequence() + { + var store = new MemStore(); + await store.AppendAsync("foo", "a"u8.ToArray(), default); + await store.AppendAsync("foo", "b"u8.ToArray(), default); + await store.AppendAsync("foo", "c"u8.ToArray(), default); + + var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SEQ" }); + var delivered = new List(); + var sem = new SemaphoreSlim(0); + var send = MakeSemaphoreSend(delivered, sem); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var engine = new PushConsumerEngine(); + engine.StartGatherLoop(consumer, store, send, cts.Token); + + // Wait for all three messages to be delivered + await sem.WaitAsync(cts.Token); + await sem.WaitAsync(cts.Token); + await sem.WaitAsync(cts.Token); + + engine.StopGatherLoop(); + + // After delivering 3 messages NextSequence should be 4 (next to load) + consumer.NextSequence.ShouldBe((ulong)4); + } + + // ----------------------------------------------------------------------- + // Test 9 — GatherLoop skips deleted/null messages + // + // Go reference: consumer.go:1572 — LoadAsync returning null means the + // message was deleted; the gather loop simply advances past it. + // ----------------------------------------------------------------------- + [Fact] + public async Task GatherLoop_skips_deleted_messages() + { + var store = new MemStore(); + await store.AppendAsync("foo", "first"u8.ToArray(), default); // seq 1 + await store.AppendAsync("foo", "second"u8.ToArray(), default); // seq 2 + await store.AppendAsync("foo", "third"u8.ToArray(), default); // seq 3 + + // Delete seq 2 so LoadAsync returns null for it + await store.RemoveAsync(2, default); + + var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SKIP-DEL" }); + var deliveredPayloads = new List(); + var sem = new SemaphoreSlim(0); + + Func, ReadOnlyMemory, CancellationToken, ValueTask> send = + (_, _, _, payload, _) => + { + lock (deliveredPayloads) deliveredPayloads.Add(Encoding.UTF8.GetString(payload.Span)); + sem.Release(); + return ValueTask.CompletedTask; + }; + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var engine = new PushConsumerEngine(); + engine.StartGatherLoop(consumer, store, send, cts.Token); + + // Only two messages remain after removing seq 2 + await sem.WaitAsync(cts.Token); + await sem.WaitAsync(cts.Token); + + engine.StopGatherLoop(); + + lock (deliveredPayloads) + { + deliveredPayloads.Count.ShouldBe(2); + deliveredPayloads.ShouldContain("first"); + deliveredPayloads.ShouldContain("third"); + deliveredPayloads.ShouldNotContain("second"); + } + } + + // ----------------------------------------------------------------------- + // Test 10 — GatherLoop increments GatheredCount + // + // Go reference: consumer.go:1400 loopAndGatherMsgs — GatheredCount tracks + // every message dispatched to the subscriber. + // ----------------------------------------------------------------------- + [Fact] + public async Task GatherLoop_increments_gathered_count() + { + var store = new MemStore(); + await store.AppendAsync("foo", "x"u8.ToArray(), default); + await store.AppendAsync("foo", "y"u8.ToArray(), default); + + var consumer = MakeConsumer(new ConsumerConfig { DurableName = "COUNT" }); + var delivered = new List(); + var sem = new SemaphoreSlim(0); + var send = MakeSemaphoreSend(delivered, sem); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var engine = new PushConsumerEngine(); + engine.StartGatherLoop(consumer, store, send, cts.Token); + + await sem.WaitAsync(cts.Token); + await sem.WaitAsync(cts.Token); + + engine.StopGatherLoop(); + + engine.GatheredCount.ShouldBe(2); + } + + // ----------------------------------------------------------------------- + // Test 11 — GatherLoop stops on cancellation + // + // Go reference: consumer.go — the goroutine exits when the quit channel closes, + // which maps to CancellationToken cancellation here. + // ----------------------------------------------------------------------- + [Fact] + public async Task GatherLoop_stops_on_cancellation() + { + var store = new MemStore(); + var consumer = MakeConsumer(new ConsumerConfig { DurableName = "CANCEL" }); + + // loopRunning becomes set once the loop is executing its first iteration + var loopRunning = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var deliveredCount = 0; + + Func, ReadOnlyMemory, CancellationToken, ValueTask> send = + (_, _, _, _, _) => + { + Interlocked.Increment(ref deliveredCount); + return ValueTask.CompletedTask; + }; + + var cts = new CancellationTokenSource(); + var engine = new PushConsumerEngine(); + engine.StartGatherLoop(consumer, store, send, cts.Token); + + // Cancel without appending anything; the loop must exit cleanly + await cts.CancelAsync(); + engine.StopGatherLoop(); + + // Yield a few times to let any in-flight dispatch complete + for (var i = 0; i < 10; i++) await Task.Yield(); + + deliveredCount.ShouldBe(0); + } + + // ----------------------------------------------------------------------- + // Test 12 — GatherLoop handles an empty store + // + // Go reference: consumer.go:1620 — when no messages exist the loop waits + // on the signal channel with a 250ms timeout rather than busy-spinning. + // We verify it does NOT deliver anything when the store remains empty, + // and that it exits cleanly when cancelled. + // ----------------------------------------------------------------------- + [Fact] + [SlopwatchSuppress("SW004", "Negative timing assertion: verifying the gather loop does NOT deliver from an empty store requires a real wall-clock window; no synchronisation primitive can replace observing the absence of delivery")] + public async Task GatherLoop_handles_empty_store() + { + var store = new MemStore(); // nothing appended + var consumer = MakeConsumer(new ConsumerConfig { DurableName = "EMPTY" }); + + var deliveredCount = 0; + var firstCallTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + Func, ReadOnlyMemory, CancellationToken, ValueTask> send = + (_, _, _, _, _) => + { + Interlocked.Increment(ref deliveredCount); + firstCallTcs.TrySetResult(true); + return ValueTask.CompletedTask; + }; + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var engine = new PushConsumerEngine(); + engine.StartGatherLoop(consumer, store, send, cts.Token); + + // The send delegate should never be called because the store is empty. + // Wait a short absolute time; if it fires we know the loop is broken. + var unexpectedDelivery = await Task.WhenAny( + firstCallTcs.Task, + Task.Delay(150, cts.Token)) == firstCallTcs.Task; + + engine.StopGatherLoop(); + + unexpectedDelivery.ShouldBeFalse("gather loop must not deliver from an empty store"); + deliveredCount.ShouldBe(0); + } +}