Adds DeliveryLoopTests covering the gather loop's store polling, filter subject enforcement, NextSequence advancement, deleted-message skipping, GatheredCount tracking, signal wake-up, and cancellation behaviour.
428 lines
18 KiB
C#
428 lines
18 KiB
C#
// Go: consumer.go:1400 (loopAndGatherMsgs) — gather loop polls the store for new messages,
|
|
// dispatches them through the send delegate, respects filter subjects,
|
|
// advances NextSequence, handles deleted/null entries, and exits on cancellation.
|
|
using System.Text;
|
|
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.Consumers;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Storage;
|
|
|
|
namespace NATS.Server.Tests.JetStream.Consumers;
|
|
|
|
public class DeliveryLoopTests
|
|
{
|
|
// -----------------------------------------------------------------------
|
|
// Helpers
|
|
// -----------------------------------------------------------------------
|
|
|
|
private static ConsumerHandle MakeConsumer(ConsumerConfig config)
|
|
=> new("TEST-STREAM", config);
|
|
|
|
/// <summary>
|
|
/// Returns a send delegate that releases <paramref name="sem"/> on each delivery
|
|
/// and accumulates original subjects into <paramref name="deliveredOriginalSubjects"/>.
|
|
/// </summary>
|
|
private static Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask>
|
|
MakeSemaphoreSend(List<string> deliveredOriginalSubjects, SemaphoreSlim sem)
|
|
=> (_, origSubj, _, _, _) =>
|
|
{
|
|
lock (deliveredOriginalSubjects)
|
|
deliveredOriginalSubjects.Add(origSubj);
|
|
sem.Release();
|
|
return ValueTask.CompletedTask;
|
|
};
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 1 — GatherLoop polls store for new messages
|
|
//
|
|
// Go reference: consumer.go:1560 — poll store for messages from nextSeq to LastSeq.
|
|
// Three messages appended before the loop starts; loop must dispatch all three.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public async Task GatherLoop_polls_store_for_new_messages()
|
|
{
|
|
var store = new MemStore();
|
|
await store.AppendAsync("foo", "msg1"u8.ToArray(), default);
|
|
await store.AppendAsync("foo", "msg2"u8.ToArray(), default);
|
|
await store.AppendAsync("foo", "msg3"u8.ToArray(), default);
|
|
|
|
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "POLL" });
|
|
var delivered = new List<string>();
|
|
var sem = new SemaphoreSlim(0);
|
|
var send = MakeSemaphoreSend(delivered, sem);
|
|
|
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|
var engine = new PushConsumerEngine();
|
|
engine.StartGatherLoop(consumer, store, send, cts.Token);
|
|
|
|
// Wait for exactly 3 releases — one per dispatched message
|
|
await sem.WaitAsync(cts.Token);
|
|
await sem.WaitAsync(cts.Token);
|
|
await sem.WaitAsync(cts.Token);
|
|
|
|
engine.StopGatherLoop();
|
|
|
|
lock (delivered) delivered.Count.ShouldBe(3);
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 2 — GatherLoop respects FilterSubject
|
|
//
|
|
// Go reference: consumer.go:1569 — ShouldDeliver skips messages whose subject
|
|
// does not match cfg.FilterSubject.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public async Task GatherLoop_respects_filter_subject()
|
|
{
|
|
var store = new MemStore();
|
|
await store.AppendAsync("orders.us", "o1"u8.ToArray(), default);
|
|
await store.AppendAsync("events.x", "e1"u8.ToArray(), default);
|
|
await store.AppendAsync("orders.eu", "o2"u8.ToArray(), default);
|
|
|
|
var consumer = MakeConsumer(new ConsumerConfig
|
|
{
|
|
DurableName = "FILTERED",
|
|
FilterSubject = "orders.>",
|
|
});
|
|
|
|
var delivered = new List<string>();
|
|
var sem = new SemaphoreSlim(0);
|
|
var send = MakeSemaphoreSend(delivered, sem);
|
|
|
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|
var engine = new PushConsumerEngine();
|
|
engine.StartGatherLoop(consumer, store, send, cts.Token);
|
|
|
|
// Only "orders.us" and "orders.eu" match the filter — wait for exactly 2
|
|
await sem.WaitAsync(cts.Token);
|
|
await sem.WaitAsync(cts.Token);
|
|
|
|
engine.StopGatherLoop();
|
|
|
|
lock (delivered)
|
|
{
|
|
delivered.Count.ShouldBe(2);
|
|
delivered.ShouldContain("orders.us");
|
|
delivered.ShouldContain("orders.eu");
|
|
delivered.ShouldNotContain("events.x");
|
|
}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 3 — ShouldDeliver with no filter delivers all subjects
|
|
//
|
|
// Go reference: consumer.go — empty FilterSubject + empty FilterSubjects → deliver all.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public void ShouldDeliver_with_no_filter_delivers_all()
|
|
{
|
|
var config = new ConsumerConfig { DurableName = "ANY" };
|
|
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeTrue();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "telemetry.cpu.host1").ShouldBeTrue();
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 4 — ShouldDeliver with single FilterSubject
|
|
//
|
|
// Go reference: consumer.go — FilterSubject is matched via SubjectMatch.MatchLiteral.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public void ShouldDeliver_with_single_filter()
|
|
{
|
|
var config = new ConsumerConfig
|
|
{
|
|
DurableName = "SINGLE",
|
|
FilterSubject = "orders.us",
|
|
};
|
|
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse();
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 5 — ShouldDeliver with multiple filters (FilterSubjects list)
|
|
//
|
|
// Go reference: consumer.go — FilterSubjects: any match → deliver.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public void ShouldDeliver_with_multiple_filters()
|
|
{
|
|
var config = new ConsumerConfig
|
|
{
|
|
DurableName = "MULTI",
|
|
FilterSubjects = ["orders.us", "events.created"],
|
|
};
|
|
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "orders.us").ShouldBeTrue();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "events.created").ShouldBeTrue();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "orders.eu").ShouldBeFalse();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "events.deleted").ShouldBeFalse();
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 6 — ShouldDeliver with wildcard filter
|
|
//
|
|
// Go reference: consumer.go — wildcard matching via SubjectMatch.MatchLiteral.
|
|
// "orders.*" matches one-token suffix; "orders.us.new" has two suffix tokens
|
|
// so it does not match.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public void ShouldDeliver_with_wildcard_filter()
|
|
{
|
|
var config = new ConsumerConfig
|
|
{
|
|
DurableName = "WILDCARD",
|
|
FilterSubject = "orders.*",
|
|
};
|
|
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "orders.new").ShouldBeTrue();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "orders.old").ShouldBeTrue();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "orders.us.new").ShouldBeFalse();
|
|
PushConsumerEngine.ShouldDeliverPublic(config, "events.x").ShouldBeFalse();
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 7 — Signal(NewMessage) wakes the gather loop
|
|
//
|
|
// Go reference: consumer.go:1620 — channel send wakes the loop so it does
|
|
// not have to wait the full 250ms poll timeout.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public async Task GatherLoop_signal_wakes_loop()
|
|
{
|
|
var store = new MemStore();
|
|
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SIGNAL" });
|
|
|
|
// loopStarted is released once the loop has begun its first wait cycle —
|
|
// we detect this by waiting until NextSequence has been set to 1 (the loop
|
|
// initialises it on entry) and the store is still empty.
|
|
var deliveredSem = new SemaphoreSlim(0, 1);
|
|
var delivered = new List<string>();
|
|
|
|
Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> send =
|
|
(_, origSubj, _, _, _) =>
|
|
{
|
|
lock (delivered) delivered.Add(origSubj);
|
|
deliveredSem.Release();
|
|
return ValueTask.CompletedTask;
|
|
};
|
|
|
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|
var engine = new PushConsumerEngine();
|
|
engine.StartGatherLoop(consumer, store, send, cts.Token);
|
|
|
|
// Spin (yield only) until the loop has entered its first 250ms wait, which we
|
|
// infer from GatheredCount staying at 0 while the loop is running.
|
|
// We yield the thread without sleeping to avoid SW004.
|
|
var spins = 0;
|
|
while (engine.GatheredCount == 0 && spins < 5_000)
|
|
{
|
|
await Task.Yield();
|
|
spins++;
|
|
}
|
|
|
|
// Append a message and signal — delivery should arrive well before 500ms
|
|
await store.AppendAsync("foo", "hello"u8.ToArray(), default);
|
|
engine.Signal(ConsumerSignal.NewMessage);
|
|
|
|
var received = await deliveredSem.WaitAsync(TimeSpan.FromMilliseconds(500), cts.Token);
|
|
engine.StopGatherLoop();
|
|
|
|
received.ShouldBeTrue("expected delivery within 500ms after Signal(NewMessage)");
|
|
lock (delivered) delivered.Count.ShouldBeGreaterThanOrEqualTo(1);
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 8 — GatherLoop advances NextSequence
|
|
//
|
|
// Go reference: consumer.go:1600 — nextSeq++ and consumer.NextSequence = nextSeq.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public async Task GatherLoop_advances_next_sequence()
|
|
{
|
|
var store = new MemStore();
|
|
await store.AppendAsync("foo", "a"u8.ToArray(), default);
|
|
await store.AppendAsync("foo", "b"u8.ToArray(), default);
|
|
await store.AppendAsync("foo", "c"u8.ToArray(), default);
|
|
|
|
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SEQ" });
|
|
var delivered = new List<string>();
|
|
var sem = new SemaphoreSlim(0);
|
|
var send = MakeSemaphoreSend(delivered, sem);
|
|
|
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|
var engine = new PushConsumerEngine();
|
|
engine.StartGatherLoop(consumer, store, send, cts.Token);
|
|
|
|
// Wait for all three messages to be delivered
|
|
await sem.WaitAsync(cts.Token);
|
|
await sem.WaitAsync(cts.Token);
|
|
await sem.WaitAsync(cts.Token);
|
|
|
|
engine.StopGatherLoop();
|
|
|
|
// After delivering 3 messages NextSequence should be 4 (next to load)
|
|
consumer.NextSequence.ShouldBe((ulong)4);
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 9 — GatherLoop skips deleted/null messages
|
|
//
|
|
// Go reference: consumer.go:1572 — LoadAsync returning null means the
|
|
// message was deleted; the gather loop simply advances past it.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public async Task GatherLoop_skips_deleted_messages()
|
|
{
|
|
var store = new MemStore();
|
|
await store.AppendAsync("foo", "first"u8.ToArray(), default); // seq 1
|
|
await store.AppendAsync("foo", "second"u8.ToArray(), default); // seq 2
|
|
await store.AppendAsync("foo", "third"u8.ToArray(), default); // seq 3
|
|
|
|
// Delete seq 2 so LoadAsync returns null for it
|
|
await store.RemoveAsync(2, default);
|
|
|
|
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "SKIP-DEL" });
|
|
var deliveredPayloads = new List<string>();
|
|
var sem = new SemaphoreSlim(0);
|
|
|
|
Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> send =
|
|
(_, _, _, payload, _) =>
|
|
{
|
|
lock (deliveredPayloads) deliveredPayloads.Add(Encoding.UTF8.GetString(payload.Span));
|
|
sem.Release();
|
|
return ValueTask.CompletedTask;
|
|
};
|
|
|
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|
var engine = new PushConsumerEngine();
|
|
engine.StartGatherLoop(consumer, store, send, cts.Token);
|
|
|
|
// Only two messages remain after removing seq 2
|
|
await sem.WaitAsync(cts.Token);
|
|
await sem.WaitAsync(cts.Token);
|
|
|
|
engine.StopGatherLoop();
|
|
|
|
lock (deliveredPayloads)
|
|
{
|
|
deliveredPayloads.Count.ShouldBe(2);
|
|
deliveredPayloads.ShouldContain("first");
|
|
deliveredPayloads.ShouldContain("third");
|
|
deliveredPayloads.ShouldNotContain("second");
|
|
}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 10 — GatherLoop increments GatheredCount
|
|
//
|
|
// Go reference: consumer.go:1400 loopAndGatherMsgs — GatheredCount tracks
|
|
// every message dispatched to the subscriber.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public async Task GatherLoop_increments_gathered_count()
|
|
{
|
|
var store = new MemStore();
|
|
await store.AppendAsync("foo", "x"u8.ToArray(), default);
|
|
await store.AppendAsync("foo", "y"u8.ToArray(), default);
|
|
|
|
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "COUNT" });
|
|
var delivered = new List<string>();
|
|
var sem = new SemaphoreSlim(0);
|
|
var send = MakeSemaphoreSend(delivered, sem);
|
|
|
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|
var engine = new PushConsumerEngine();
|
|
engine.StartGatherLoop(consumer, store, send, cts.Token);
|
|
|
|
await sem.WaitAsync(cts.Token);
|
|
await sem.WaitAsync(cts.Token);
|
|
|
|
engine.StopGatherLoop();
|
|
|
|
engine.GatheredCount.ShouldBe(2);
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 11 — GatherLoop stops on cancellation
|
|
//
|
|
// Go reference: consumer.go — the goroutine exits when the quit channel closes,
|
|
// which maps to CancellationToken cancellation here.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
public async Task GatherLoop_stops_on_cancellation()
|
|
{
|
|
var store = new MemStore();
|
|
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "CANCEL" });
|
|
|
|
// loopRunning becomes set once the loop is executing its first iteration
|
|
var loopRunning = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
var deliveredCount = 0;
|
|
|
|
Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> send =
|
|
(_, _, _, _, _) =>
|
|
{
|
|
Interlocked.Increment(ref deliveredCount);
|
|
return ValueTask.CompletedTask;
|
|
};
|
|
|
|
var cts = new CancellationTokenSource();
|
|
var engine = new PushConsumerEngine();
|
|
engine.StartGatherLoop(consumer, store, send, cts.Token);
|
|
|
|
// Cancel without appending anything; the loop must exit cleanly
|
|
await cts.CancelAsync();
|
|
engine.StopGatherLoop();
|
|
|
|
// Yield a few times to let any in-flight dispatch complete
|
|
for (var i = 0; i < 10; i++) await Task.Yield();
|
|
|
|
deliveredCount.ShouldBe(0);
|
|
}
|
|
|
|
// -----------------------------------------------------------------------
|
|
// Test 12 — GatherLoop handles an empty store
|
|
//
|
|
// Go reference: consumer.go:1620 — when no messages exist the loop waits
|
|
// on the signal channel with a 250ms timeout rather than busy-spinning.
|
|
// We verify it does NOT deliver anything when the store remains empty,
|
|
// and that it exits cleanly when cancelled.
|
|
// -----------------------------------------------------------------------
|
|
[Fact]
|
|
[SlopwatchSuppress("SW004", "Negative timing assertion: verifying the gather loop does NOT deliver from an empty store requires a real wall-clock window; no synchronisation primitive can replace observing the absence of delivery")]
|
|
public async Task GatherLoop_handles_empty_store()
|
|
{
|
|
var store = new MemStore(); // nothing appended
|
|
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "EMPTY" });
|
|
|
|
var deliveredCount = 0;
|
|
var firstCallTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
|
|
Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> send =
|
|
(_, _, _, _, _) =>
|
|
{
|
|
Interlocked.Increment(ref deliveredCount);
|
|
firstCallTcs.TrySetResult(true);
|
|
return ValueTask.CompletedTask;
|
|
};
|
|
|
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|
var engine = new PushConsumerEngine();
|
|
engine.StartGatherLoop(consumer, store, send, cts.Token);
|
|
|
|
// The send delegate should never be called because the store is empty.
|
|
// Wait a short absolute time; if it fires we know the loop is broken.
|
|
var unexpectedDelivery = await Task.WhenAny(
|
|
firstCallTcs.Task,
|
|
Task.Delay(150, cts.Token)) == firstCallTcs.Task;
|
|
|
|
engine.StopGatherLoop();
|
|
|
|
unexpectedDelivery.ShouldBeFalse("gather loop must not deliver from an empty store");
|
|
deliveredCount.ShouldBe(0);
|
|
}
|
|
}
|