Files
natsdotnet/tests/NATS.Server.Tests/JetStream/Consumers/PushConsumerDeliveryTests.cs
Joseph Doherty 612b15c781 feat: add PushConsumer delivery loop and RedeliveryTracker (C3+C4)
C3 – PushConsumerEngine delivery dispatch:
- Add DeliverSubject property (mirrors consumer.go:1131 dsubj field)
- Add StartDeliveryLoop / StopDeliveryLoop: background Task that drains
  ConsumerHandle.PushFrames and calls a sendMessage delegate per frame
- Delivery loop honours AvailableAtUtc for rate-limiting (consumer.go:5120)
- Data frames: HMSG headers Nats-Sequence, Nats-Time-Stamp, Nats-Subject
  (stream.go:586 JSSequence / JSTimeStamp / JSSubject constants)
- Flow-control frames: "NATS/1.0 100 FlowControl Request" (consumer.go:5501)
- Heartbeat frames: "NATS/1.0 100 Idle Heartbeat" (consumer.go:5222)
- Add DeliverSubject field to ConsumerConfig (consumer.go:115)

C4 – RedeliveryTracker with backoff schedules:
- Schedule(seq, deliveryCount, ackWaitMs): computes deadline using backoff
  array indexed by (deliveryCount-1), clamped at last entry (consumer.go:5540)
- GetDue(): returns sequences whose deadline has passed
- Acknowledge(seq): removes sequence from tracking
- IsMaxDeliveries(seq, maxDeliver): checks threshold for drop decision
- Empty backoff array falls back to ackWaitMs

Tests: 7 PushConsumerDelivery tests + 10 RedeliveryTracker tests (17 total)
2026-02-24 15:01:15 -05:00

318 lines
12 KiB
C#

// Go: consumer.go (dispatchToDeliver ~line 5040, sendFlowControl ~line 5495,
// sendIdleHeartbeat ~line 5222, rate-limit logic ~line 5120)
using System.Collections.Concurrent;
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Tests.JetStream.Consumers;
public class PushConsumerDeliveryTests
{
// Helper: build a ConsumerHandle wired with the given config
private static ConsumerHandle MakeConsumer(ConsumerConfig config)
=> new("TEST-STREAM", config);
// Helper: build a minimal StoredMessage
private static StoredMessage MakeMessage(ulong seq, string subject = "test.subject", string payload = "hello")
=> new()
{
Sequence = seq,
Subject = subject,
Payload = Encoding.UTF8.GetBytes(payload),
TimestampUtc = DateTime.UtcNow,
};
// -------------------------------------------------------------------------
// Test 1 — Delivery loop sends messages in FIFO order
//
// Go reference: consumer.go:5040 — dispatchToDeliver processes the outbound
// queue sequentially; messages must arrive in the order they were enqueued.
// -------------------------------------------------------------------------
[Fact]
public async Task DeliveryLoop_sends_messages_in_FIFO_order()
{
var engine = new PushConsumerEngine();
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "PUSH",
Push = true,
DeliverSubject = "deliver.test",
});
engine.Enqueue(consumer, MakeMessage(1, payload: "first"));
engine.Enqueue(consumer, MakeMessage(2, payload: "second"));
engine.Enqueue(consumer, MakeMessage(3, payload: "third"));
var received = new ConcurrentQueue<(string subject, ReadOnlyMemory<byte> payload)>();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
engine.StartDeliveryLoop(consumer,
async (subj, _, _, payload, ct) =>
{
received.Enqueue((subj, payload));
await ValueTask.CompletedTask;
},
cts.Token);
// Wait until all three messages are delivered
while (received.Count < 3 && !cts.IsCancellationRequested)
await Task.Delay(5, cts.Token);
engine.StopDeliveryLoop();
received.Count.ShouldBe(3);
var items = received.ToArray();
Encoding.UTF8.GetString(items[0].payload.Span).ShouldBe("first");
Encoding.UTF8.GetString(items[1].payload.Span).ShouldBe("second");
Encoding.UTF8.GetString(items[2].payload.Span).ShouldBe("third");
}
// -------------------------------------------------------------------------
// Test 2 — Rate limiting delays delivery
//
// Go reference: consumer.go:5120 — the rate limiter delays sending when
// AvailableAtUtc is in the future. A frame whose AvailableAtUtc is 100ms
// ahead must not be delivered until that deadline has passed.
// The delivery loop honours frame.AvailableAtUtc directly; this test
// injects a frame with a known future timestamp to verify that behaviour.
// -------------------------------------------------------------------------
[Fact]
public async Task DeliveryLoop_rate_limiting_delays_delivery()
{
var engine = new PushConsumerEngine();
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "RATE",
Push = true,
DeliverSubject = "deliver.rate",
});
// Inject a frame with AvailableAtUtc 150ms in the future to simulate
// what Enqueue() computes when RateLimitBps produces a delay.
var msg = MakeMessage(1);
consumer.PushFrames.Enqueue(new PushFrame
{
IsData = true,
Message = msg,
AvailableAtUtc = DateTime.UtcNow.AddMilliseconds(150),
});
var delivered = new TaskCompletionSource<DateTime>();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var startedAt = DateTime.UtcNow;
engine.StartDeliveryLoop(consumer,
async (_, _, _, _, _) =>
{
delivered.TrySetResult(DateTime.UtcNow);
await ValueTask.CompletedTask;
},
cts.Token);
var deliveredAt = await delivered.Task.WaitAsync(TimeSpan.FromSeconds(5));
engine.StopDeliveryLoop();
// The loop must have waited at least ~100ms for AvailableAtUtc to pass
var elapsed = deliveredAt - startedAt;
elapsed.TotalMilliseconds.ShouldBeGreaterThan(100);
}
// -------------------------------------------------------------------------
// Test 3 — Heartbeat frames are sent
//
// Go reference: consumer.go:5222 — sendIdleHeartbeat emits a
// "NATS/1.0 100 Idle Heartbeat" status frame on the deliver subject.
// -------------------------------------------------------------------------
[Fact]
public async Task DeliveryLoop_sends_heartbeat_frames()
{
var engine = new PushConsumerEngine();
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "HB",
Push = true,
DeliverSubject = "deliver.hb",
HeartbeatMs = 100,
});
// Enqueue one data message; HeartbeatMs > 0 causes Enqueue to also
// append a heartbeat frame immediately after.
engine.Enqueue(consumer, MakeMessage(1));
var headerSnapshots = new ConcurrentBag<ReadOnlyMemory<byte>>();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
engine.StartDeliveryLoop(consumer,
async (_, _, headers, _, _) =>
{
headerSnapshots.Add(headers);
await ValueTask.CompletedTask;
},
cts.Token);
// Wait for both the data frame and the heartbeat frame
while (headerSnapshots.Count < 2 && !cts.IsCancellationRequested)
await Task.Delay(5, cts.Token);
engine.StopDeliveryLoop();
headerSnapshots.Count.ShouldBeGreaterThanOrEqualTo(2);
// At least one frame must contain "Idle Heartbeat"
var anyHeartbeat = headerSnapshots.Any(h =>
Encoding.ASCII.GetString(h.Span).Contains("Idle Heartbeat"));
anyHeartbeat.ShouldBeTrue();
}
// -------------------------------------------------------------------------
// Test 4 — Flow control frames are sent
//
// Go reference: consumer.go:5495 — sendFlowControl sends a status frame
// "NATS/1.0 100 FlowControl Request" to the deliver subject.
// -------------------------------------------------------------------------
[Fact]
public async Task DeliveryLoop_sends_flow_control_frames()
{
var engine = new PushConsumerEngine();
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "FC",
Push = true,
DeliverSubject = "deliver.fc",
FlowControl = true,
HeartbeatMs = 100, // Go requires heartbeat when flow control is on
});
engine.Enqueue(consumer, MakeMessage(1));
var headerSnapshots = new ConcurrentBag<ReadOnlyMemory<byte>>();
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
engine.StartDeliveryLoop(consumer,
async (_, _, headers, _, _) =>
{
headerSnapshots.Add(headers);
await ValueTask.CompletedTask;
},
cts.Token);
// data + flow-control + heartbeat = 3 frames
while (headerSnapshots.Count < 3 && !cts.IsCancellationRequested)
await Task.Delay(5, cts.Token);
engine.StopDeliveryLoop();
var anyFlowControl = headerSnapshots.Any(h =>
Encoding.ASCII.GetString(h.Span).Contains("FlowControl"));
anyFlowControl.ShouldBeTrue();
}
// -------------------------------------------------------------------------
// Test 5 — Delivery stops on cancellation
//
// Go reference: consumer.go — the delivery goroutine exits when the qch
// (quit channel) is signalled, which maps to CancellationToken here.
// -------------------------------------------------------------------------
[Fact]
public async Task DeliveryLoop_stops_on_cancellation()
{
var engine = new PushConsumerEngine();
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "CANCEL",
Push = true,
DeliverSubject = "deliver.cancel",
});
var deliveryCount = 0;
var cts = new CancellationTokenSource();
engine.StartDeliveryLoop(consumer,
async (_, _, _, _, _) =>
{
Interlocked.Increment(ref deliveryCount);
await ValueTask.CompletedTask;
},
cts.Token);
// Cancel immediately — nothing enqueued so delivery count must stay 0
await cts.CancelAsync();
engine.StopDeliveryLoop();
// Brief settle — no messages were queued so nothing should have been delivered
await Task.Delay(20);
deliveryCount.ShouldBe(0);
}
// -------------------------------------------------------------------------
// Test 6 — Data frame headers contain JetStream metadata
//
// Go reference: stream.go:586 — JSSequence = "Nats-Sequence",
// JSTimeStamp = "Nats-Time-Stamp", JSSubject = "Nats-Subject"
// -------------------------------------------------------------------------
[Fact]
public async Task DeliveryLoop_data_frame_headers_contain_jetstream_metadata()
{
var engine = new PushConsumerEngine();
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "META",
Push = true,
DeliverSubject = "deliver.meta",
});
var msg = MakeMessage(42, subject: "events.created");
engine.Enqueue(consumer, msg);
ReadOnlyMemory<byte>? capturedHeaders = null;
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var tcs = new TaskCompletionSource<bool>();
engine.StartDeliveryLoop(consumer,
async (_, _, headers, _, _) =>
{
capturedHeaders = headers;
tcs.TrySetResult(true);
await ValueTask.CompletedTask;
},
cts.Token);
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
engine.StopDeliveryLoop();
capturedHeaders.ShouldNotBeNull();
var headerText = Encoding.ASCII.GetString(capturedHeaders!.Value.Span);
headerText.ShouldContain("Nats-Sequence: 42");
headerText.ShouldContain("Nats-Subject: events.created");
headerText.ShouldContain("Nats-Time-Stamp:");
}
// -------------------------------------------------------------------------
// Test 7 — DeliverSubject property is set when StartDeliveryLoop is called
//
// Go reference: consumer.go:1131 — dsubj is set from cfg.DeliverSubject.
// -------------------------------------------------------------------------
[Fact]
public void DeliverSubject_property_is_set_from_consumer_config()
{
var engine = new PushConsumerEngine();
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "DS",
Push = true,
DeliverSubject = "my.deliver.subject",
});
using var cts = new CancellationTokenSource();
engine.StartDeliveryLoop(consumer,
(_, _, _, _, _) => ValueTask.CompletedTask,
cts.Token);
engine.DeliverSubject.ShouldBe("my.deliver.subject");
engine.StopDeliveryLoop();
}
}