// Go: consumer.go:1400 (loopAndGatherMsgs) — gather loop polls the store for new messages,
// dispatches them through the send delegate, respects filter subjects,
// advances NextSequence, handles deleted/null entries, and exits on cancellation.
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Tests.JetStream.Consumers;
public class DeliveryLoopTests
{
// -----------------------------------------------------------------------
// Helpers
// -----------------------------------------------------------------------
private static ConsumerHandle MakeConsumer(ConsumerConfig config)
=> new("TEST-STREAM", config);
///
/// Returns a send delegate that releases on each delivery
/// and accumulates original subjects into .
///
private static Func, ReadOnlyMemory, CancellationToken, ValueTask>
MakeSemaphoreSend(List deliveredOriginalSubjects, SemaphoreSlim sem)
=> (_, origSubj, _, _, _) =>
{
lock (deliveredOriginalSubjects)
deliveredOriginalSubjects.Add(origSubj);
sem.Release();
return ValueTask.CompletedTask;
};
// -----------------------------------------------------------------------
// Test 1 — GatherLoop polls store for new messages
//
// Go reference: consumer.go:1560 — poll store for messages from nextSeq to LastSeq.
// Three messages appended before the loop starts; loop must dispatch all three.
// -----------------------------------------------------------------------
[Fact]
public async Task GatherLoop_polls_store_for_new_messages()
{
var store = new MemStore();
await store.AppendAsync("foo", "msg1"u8.ToArray(), default);
await store.AppendAsync("foo", "msg2"u8.ToArray(), default);
await store.AppendAsync("foo", "msg3"u8.ToArray(), default);
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "POLL" });
var delivered = new List();
var sem = new SemaphoreSlim(0);
var send = MakeSemaphoreSend(delivered, sem);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var engine = new PushConsumerEngine();
engine.StartGatherLoop(consumer, store, send, cts.Token);
// Wait for exactly 3 releases — one per dispatched message
await sem.WaitAsync(cts.Token);
await sem.WaitAsync(cts.Token);
await sem.WaitAsync(cts.Token);
engine.StopGatherLoop();
lock (delivered) delivered.Count.ShouldBe(3);
}
// -----------------------------------------------------------------------
// Test 2 — GatherLoop respects FilterSubject
//
// Go reference: consumer.go:1569 — ShouldDeliver skips messages whose subject
// does not match cfg.FilterSubject.
// -----------------------------------------------------------------------
[Fact]
public async Task GatherLoop_respects_filter_subject()
{
var store = new MemStore();
await store.AppendAsync("orders.us", "o1"u8.ToArray(), default);
await store.AppendAsync("events.x", "e1"u8.ToArray(), default);
await store.AppendAsync("orders.eu", "o2"u8.ToArray(), default);
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "FILTERED",
FilterSubject = "orders.>",
});
var delivered = new List();
var sem = new SemaphoreSlim(0);
var send = MakeSemaphoreSend(delivered, sem);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var engine = new PushConsumerEngine();
engine.StartGatherLoop(consumer, store, send, cts.Token);
// Only "orders.us" and "orders.eu" match the filter — wait for exactly 2
await sem.WaitAsync(cts.Token);
await sem.WaitAsync(cts.Token);
engine.StopGatherLoop();
lock (delivered)
{
delivered.Count.ShouldBe(2);
delivered.ShouldContain("orders.us");
delivered.ShouldContain("orders.eu");
delivered.ShouldNotContain("events.x");
}
}
// -----------------------------------------------------------------------
// Test 3 — ShouldDeliver with no filter delivers all subjects
//
// Go reference: consumer.go — empty FilterSubject + empty FilterSubjects → deliver all.
// -----------------------------------------------------------------------
[Fact]
public void ShouldDeliver_with_no_filter_delivers_all()
{
var config = new ConsumerConfig { DurableName = "ANY" };
PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue();
PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeTrue();
PushConsumerEngine.ShouldDeliverPublic(config, "telemetry.cpu.host1").ShouldBeTrue();
}
// -----------------------------------------------------------------------
// Test 4 — ShouldDeliver with single FilterSubject
//
// Go reference: consumer.go — FilterSubject is matched via SubjectMatch.MatchLiteral.
// -----------------------------------------------------------------------
[Fact]
public void ShouldDeliver_with_single_filter()
{
var config = new ConsumerConfig
{
DurableName = "SINGLE",
FilterSubject = "orders.us",
};
PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue();
PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse();
PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse();
}
// -----------------------------------------------------------------------
// Test 5 — ShouldDeliver with multiple filters (FilterSubjects list)
//
// Go reference: consumer.go — FilterSubjects: any match → deliver.
// -----------------------------------------------------------------------
[Fact]
public void ShouldDeliver_with_multiple_filters()
{
var config = new ConsumerConfig
{
DurableName = "MULTI",
FilterSubjects = ["orders.us", "events.created"],
};
PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue();
PushConsumerEngine.ShouldDeliverPublic(config, "events.created").ShouldBeTrue();
PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse();
PushConsumerEngine.ShouldDeliverPublic(config, "events.deleted").ShouldBeFalse();
}
// -----------------------------------------------------------------------
// Test 6 — ShouldDeliver with wildcard filter
//
// Go reference: consumer.go — wildcard matching via SubjectMatch.MatchLiteral.
// "orders.*" matches one-token suffix; "orders.us.new" has two suffix tokens
// so it does not match.
// -----------------------------------------------------------------------
[Fact]
public void ShouldDeliver_with_wildcard_filter()
{
var config = new ConsumerConfig
{
DurableName = "WILDCARD",
FilterSubject = "orders.*",
};
PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue();
PushConsumerEngine.ShouldDeliverPublic(config, "orders.old").ShouldBeTrue();
PushConsumerEngine.ShouldDeliverPublic(config, "orders.us.new").ShouldBeFalse();
PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse();
}
// -----------------------------------------------------------------------
// Test 7 — Signal(NewMessage) wakes the gather loop
//
// Go reference: consumer.go:1620 — channel send wakes the loop so it does
// not have to wait the full 250ms poll timeout.
// -----------------------------------------------------------------------
[Fact]
public async Task GatherLoop_signal_wakes_loop()
{
var store = new MemStore();
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SIGNAL" });
// loopStarted is released once the loop has begun its first wait cycle —
// we detect this by waiting until NextSequence has been set to 1 (the loop
// initialises it on entry) and the store is still empty.
var deliveredSem = new SemaphoreSlim(0, 1);
var delivered = new List();
Func, ReadOnlyMemory, CancellationToken, ValueTask> send =
(_, origSubj, _, _, _) =>
{
lock (delivered) delivered.Add(origSubj);
deliveredSem.Release();
return ValueTask.CompletedTask;
};
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var engine = new PushConsumerEngine();
engine.StartGatherLoop(consumer, store, send, cts.Token);
// Spin (yield only) until the loop has entered its first 250ms wait, which we
// infer from GatheredCount staying at 0 while the loop is running.
// We yield the thread without sleeping to avoid SW004.
var spins = 0;
while (engine.GatheredCount == 0 && spins < 5_000)
{
await Task.Yield();
spins++;
}
// Append a message and signal — delivery should arrive well before 500ms
await store.AppendAsync("foo", "hello"u8.ToArray(), default);
engine.Signal(ConsumerSignal.NewMessage);
var received = await deliveredSem.WaitAsync(TimeSpan.FromMilliseconds(500), cts.Token);
engine.StopGatherLoop();
received.ShouldBeTrue("expected delivery within 500ms after Signal(NewMessage)");
lock (delivered) delivered.Count.ShouldBeGreaterThanOrEqualTo(1);
}
// -----------------------------------------------------------------------
// Test 8 — GatherLoop advances NextSequence
//
// Go reference: consumer.go:1600 — nextSeq++ and consumer.NextSequence = nextSeq.
// -----------------------------------------------------------------------
[Fact]
public async Task GatherLoop_advances_next_sequence()
{
var store = new MemStore();
await store.AppendAsync("foo", "a"u8.ToArray(), default);
await store.AppendAsync("foo", "b"u8.ToArray(), default);
await store.AppendAsync("foo", "c"u8.ToArray(), default);
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SEQ" });
var delivered = new List();
var sem = new SemaphoreSlim(0);
var send = MakeSemaphoreSend(delivered, sem);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var engine = new PushConsumerEngine();
engine.StartGatherLoop(consumer, store, send, cts.Token);
// Wait for all three messages to be delivered
await sem.WaitAsync(cts.Token);
await sem.WaitAsync(cts.Token);
await sem.WaitAsync(cts.Token);
engine.StopGatherLoop();
// After delivering 3 messages NextSequence should be 4 (next to load)
consumer.NextSequence.ShouldBe((ulong)4);
}
// -----------------------------------------------------------------------
// Test 9 — GatherLoop skips deleted/null messages
//
// Go reference: consumer.go:1572 — LoadAsync returning null means the
// message was deleted; the gather loop simply advances past it.
// -----------------------------------------------------------------------
[Fact]
public async Task GatherLoop_skips_deleted_messages()
{
var store = new MemStore();
await store.AppendAsync("foo", "first"u8.ToArray(), default); // seq 1
await store.AppendAsync("foo", "second"u8.ToArray(), default); // seq 2
await store.AppendAsync("foo", "third"u8.ToArray(), default); // seq 3
// Delete seq 2 so LoadAsync returns null for it
await store.RemoveAsync(2, default);
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SKIP-DEL" });
var deliveredPayloads = new List();
var sem = new SemaphoreSlim(0);
Func, ReadOnlyMemory, CancellationToken, ValueTask> send =
(_, _, _, payload, _) =>
{
lock (deliveredPayloads) deliveredPayloads.Add(Encoding.UTF8.GetString(payload.Span));
sem.Release();
return ValueTask.CompletedTask;
};
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var engine = new PushConsumerEngine();
engine.StartGatherLoop(consumer, store, send, cts.Token);
// Only two messages remain after removing seq 2
await sem.WaitAsync(cts.Token);
await sem.WaitAsync(cts.Token);
engine.StopGatherLoop();
lock (deliveredPayloads)
{
deliveredPayloads.Count.ShouldBe(2);
deliveredPayloads.ShouldContain("first");
deliveredPayloads.ShouldContain("third");
deliveredPayloads.ShouldNotContain("second");
}
}
// -----------------------------------------------------------------------
// Test 10 — GatherLoop increments GatheredCount
//
// Go reference: consumer.go:1400 loopAndGatherMsgs — GatheredCount tracks
// every message dispatched to the subscriber.
// -----------------------------------------------------------------------
[Fact]
public async Task GatherLoop_increments_gathered_count()
{
var store = new MemStore();
await store.AppendAsync("foo", "x"u8.ToArray(), default);
await store.AppendAsync("foo", "y"u8.ToArray(), default);
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "COUNT" });
var delivered = new List();
var sem = new SemaphoreSlim(0);
var send = MakeSemaphoreSend(delivered, sem);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var engine = new PushConsumerEngine();
engine.StartGatherLoop(consumer, store, send, cts.Token);
await sem.WaitAsync(cts.Token);
await sem.WaitAsync(cts.Token);
engine.StopGatherLoop();
engine.GatheredCount.ShouldBe(2);
}
// -----------------------------------------------------------------------
// Test 11 — GatherLoop stops on cancellation
//
// Go reference: consumer.go — the goroutine exits when the quit channel closes,
// which maps to CancellationToken cancellation here.
// -----------------------------------------------------------------------
[Fact]
public async Task GatherLoop_stops_on_cancellation()
{
var store = new MemStore();
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "CANCEL" });
// loopRunning becomes set once the loop is executing its first iteration
var loopRunning = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var deliveredCount = 0;
Func, ReadOnlyMemory, CancellationToken, ValueTask> send =
(_, _, _, _, _) =>
{
Interlocked.Increment(ref deliveredCount);
return ValueTask.CompletedTask;
};
var cts = new CancellationTokenSource();
var engine = new PushConsumerEngine();
engine.StartGatherLoop(consumer, store, send, cts.Token);
// Cancel without appending anything; the loop must exit cleanly
await cts.CancelAsync();
engine.StopGatherLoop();
// Yield a few times to let any in-flight dispatch complete
for (var i = 0; i < 10; i++) await Task.Yield();
deliveredCount.ShouldBe(0);
}
// -----------------------------------------------------------------------
// Test 12 — GatherLoop handles an empty store
//
// Go reference: consumer.go:1620 — when no messages exist the loop waits
// on the signal channel with a 250ms timeout rather than busy-spinning.
// We verify it does NOT deliver anything when the store remains empty,
// and that it exits cleanly when cancelled.
// -----------------------------------------------------------------------
[Fact]
[SlopwatchSuppress("SW004", "Negative timing assertion: verifying the gather loop does NOT deliver from an empty store requires a real wall-clock window; no synchronisation primitive can replace observing the absence of delivery")]
public async Task GatherLoop_handles_empty_store()
{
var store = new MemStore(); // nothing appended
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "EMPTY" });
var deliveredCount = 0;
var firstCallTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
Func, ReadOnlyMemory, CancellationToken, ValueTask> send =
(_, _, _, _, _) =>
{
Interlocked.Increment(ref deliveredCount);
firstCallTcs.TrySetResult(true);
return ValueTask.CompletedTask;
};
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var engine = new PushConsumerEngine();
engine.StartGatherLoop(consumer, store, send, cts.Token);
// The send delegate should never be called because the store is empty.
// Wait a short absolute time; if it fires we know the loop is broken.
var unexpectedDelivery = await Task.WhenAny(
firstCallTcs.Task,
Task.Delay(150, cts.Token)) == firstCallTcs.Task;
engine.StopGatherLoop();
unexpectedDelivery.ShouldBeFalse("gather loop must not deliver from an empty store");
deliveredCount.ShouldBe(0);
}
}