feat: add PushConsumer delivery loop and RedeliveryTracker (C3+C4)
C3 – PushConsumerEngine delivery dispatch: - Add DeliverSubject property (mirrors consumer.go:1131 dsubj field) - Add StartDeliveryLoop / StopDeliveryLoop: background Task that drains ConsumerHandle.PushFrames and calls a sendMessage delegate per frame - Delivery loop honours AvailableAtUtc for rate-limiting (consumer.go:5120) - Data frames: HMSG headers Nats-Sequence, Nats-Time-Stamp, Nats-Subject (stream.go:586 JSSequence / JSTimeStamp / JSSubject constants) - Flow-control frames: "NATS/1.0 100 FlowControl Request" (consumer.go:5501) - Heartbeat frames: "NATS/1.0 100 Idle Heartbeat" (consumer.go:5222) - Add DeliverSubject field to ConsumerConfig (consumer.go:115) C4 – RedeliveryTracker with backoff schedules: - Schedule(seq, deliveryCount, ackWaitMs): computes deadline using backoff array indexed by (deliveryCount-1), clamped at last entry (consumer.go:5540) - GetDue(): returns sequences whose deadline has passed - Acknowledge(seq): removes sequence from tracking - IsMaxDeliveries(seq, maxDeliver): checks threshold for drop decision - Empty backoff array falls back to ackWaitMs Tests: 7 PushConsumerDelivery tests + 10 RedeliveryTracker tests (17 total)
This commit is contained in:
@@ -1,3 +1,6 @@
|
||||
// Go: consumer.go (sendIdleHeartbeat ~line 5222, sendFlowControl ~line 5495,
|
||||
// deliverMsg ~line 5364, dispatchToDeliver ~line 5040)
|
||||
using System.Text;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
@@ -5,6 +8,12 @@ namespace NATS.Server.JetStream.Consumers;
|
||||
|
||||
public sealed class PushConsumerEngine
|
||||
{
|
||||
// Go: consumer.go — DeliverSubject routes push-mode messages (cfg.DeliverSubject)
|
||||
public string DeliverSubject { get; private set; } = string.Empty;
|
||||
|
||||
private CancellationTokenSource? _cts;
|
||||
private Task? _deliveryTask;
|
||||
|
||||
public void Enqueue(ConsumerHandle consumer, StoredMessage message)
|
||||
{
|
||||
if (message.Sequence <= consumer.AckProcessor.AckFloor)
|
||||
@@ -48,6 +57,109 @@ public sealed class PushConsumerEngine
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Go: consumer.go:1131 — dsubj is set from cfg.DeliverSubject at consumer creation.
|
||||
// StartDeliveryLoop wires the background pump that drains PushFrames and calls
|
||||
// sendMessage for each frame. The delegate matches the wire-level send signature used
|
||||
// by NatsClient.SendMessage, mapped to an async ValueTask for testability.
|
||||
public void StartDeliveryLoop(
|
||||
ConsumerHandle consumer,
|
||||
Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> sendMessage,
|
||||
CancellationToken ct)
|
||||
{
|
||||
DeliverSubject = consumer.Config.DeliverSubject;
|
||||
|
||||
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
var token = _cts.Token;
|
||||
|
||||
_deliveryTask = Task.Run(() => RunDeliveryLoopAsync(consumer, sendMessage, token), token);
|
||||
}
|
||||
|
||||
public void StopDeliveryLoop()
|
||||
{
|
||||
_cts?.Cancel();
|
||||
_cts?.Dispose();
|
||||
_cts = null;
|
||||
}
|
||||
|
||||
// Go: consumer.go:5040 — dispatchToDeliver drains the outbound message queue.
|
||||
// For push consumers the dsubj is cfg.DeliverSubject; each stored message is
|
||||
// formatted as an HMSG with JetStream metadata headers.
|
||||
private static async Task RunDeliveryLoopAsync(
|
||||
ConsumerHandle consumer,
|
||||
Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> sendMessage,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var deliverSubject = consumer.Config.DeliverSubject;
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
if (consumer.PushFrames.Count == 0)
|
||||
{
|
||||
// Yield to avoid busy-spin when the queue is empty
|
||||
await Task.Delay(1, ct).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
var frame = consumer.PushFrames.Peek();
|
||||
|
||||
// Go: consumer.go — rate-limit by honouring AvailableAtUtc before dequeuing
|
||||
var now = DateTime.UtcNow;
|
||||
if (frame.AvailableAtUtc > now)
|
||||
{
|
||||
var wait = frame.AvailableAtUtc - now;
|
||||
try
|
||||
{
|
||||
await Task.Delay(wait, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
consumer.PushFrames.Dequeue();
|
||||
|
||||
try
|
||||
{
|
||||
if (frame.IsData && frame.Message is { } msg)
|
||||
{
|
||||
// Go: consumer.go:5067 — build JetStream metadata headers
|
||||
// Header format: NATS/1.0\r\nNats-Sequence: {seq}\r\nNats-Time-Stamp: {ts}\r\nNats-Subject: {subj}\r\n\r\n
|
||||
var headers = BuildDataHeaders(msg);
|
||||
var subject = string.IsNullOrEmpty(deliverSubject) ? msg.Subject : deliverSubject;
|
||||
await sendMessage(subject, msg.Subject, headers, msg.Payload, ct).ConfigureAwait(false);
|
||||
}
|
||||
else if (frame.IsFlowControl)
|
||||
{
|
||||
// Go: consumer.go:5501 — "NATS/1.0 100 FlowControl Request\r\n\r\n"
|
||||
var headers = "NATS/1.0 100 FlowControl Request\r\nNats-Flow-Control: \r\n\r\n"u8.ToArray();
|
||||
var subject = string.IsNullOrEmpty(deliverSubject) ? "_fc_" : deliverSubject;
|
||||
await sendMessage(subject, string.Empty, headers, ReadOnlyMemory<byte>.Empty, ct).ConfigureAwait(false);
|
||||
}
|
||||
else if (frame.IsHeartbeat)
|
||||
{
|
||||
// Go: consumer.go:5223 — "NATS/1.0 100 Idle Heartbeat\r\n..."
|
||||
var headers = "NATS/1.0 100 Idle Heartbeat\r\n\r\n"u8.ToArray();
|
||||
var subject = string.IsNullOrEmpty(deliverSubject) ? "_hb_" : deliverSubject;
|
||||
await sendMessage(subject, string.Empty, headers, ReadOnlyMemory<byte>.Empty, ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Go: stream.go:586 — JSSequence = "Nats-Sequence", JSTimeStamp = "Nats-Time-Stamp", JSSubject = "Nats-Subject"
|
||||
private static ReadOnlyMemory<byte> BuildDataHeaders(StoredMessage msg)
|
||||
{
|
||||
var ts = msg.TimestampUtc.ToString("O"); // ISO-8601 round-trip
|
||||
var header = $"NATS/1.0\r\nNats-Sequence: {msg.Sequence}\r\nNats-Time-Stamp: {ts}\r\nNats-Subject: {msg.Subject}\r\n\r\n";
|
||||
return Encoding.ASCII.GetBytes(header);
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class PushFrame
|
||||
|
||||
92
src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs
Normal file
92
src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs
Normal file
@@ -0,0 +1,92 @@
|
||||
// Go: consumer.go (trackPending, processNak, rdc map, addToRedeliverQueue ~line 5540)
|
||||
// RedeliveryTracker manages sequences waiting for redelivery after a NAK or ack-wait
|
||||
// expiry. It mirrors the Go consumer's rdc (redelivery count) map combined with the
|
||||
// rdq (redelivery queue) priority ordering.
|
||||
namespace NATS.Server.JetStream.Consumers;
|
||||
|
||||
public sealed class RedeliveryTracker
|
||||
{
|
||||
private readonly int[] _backoffMs;
|
||||
|
||||
// Go: consumer.go — pending maps sseq → (deadline, deliveries)
|
||||
private readonly Dictionary<ulong, RedeliveryEntry> _entries = new();
|
||||
|
||||
// Go: consumer.go:100 — BackOff []time.Duration in ConsumerConfig; empty falls back to ackWait
|
||||
public RedeliveryTracker(int[] backoffMs)
|
||||
{
|
||||
_backoffMs = backoffMs;
|
||||
}
|
||||
|
||||
// Go: consumer.go:5540 — trackPending records delivery count and schedules deadline
|
||||
// using the backoff array indexed by (deliveryCount-1), clamped at last entry.
|
||||
// Returns the UTC time at which the sequence next becomes eligible for redelivery.
|
||||
public DateTime Schedule(ulong seq, int deliveryCount, int ackWaitMs = 0)
|
||||
{
|
||||
var delayMs = ResolveDelay(deliveryCount, ackWaitMs);
|
||||
var deadline = DateTime.UtcNow.AddMilliseconds(Math.Max(delayMs, 1));
|
||||
|
||||
_entries[seq] = new RedeliveryEntry
|
||||
{
|
||||
DeadlineUtc = deadline,
|
||||
DeliveryCount = deliveryCount,
|
||||
};
|
||||
|
||||
return deadline;
|
||||
}
|
||||
|
||||
// Go: consumer.go — rdq entries are dispatched once their deadline has passed
|
||||
public IReadOnlyList<ulong> GetDue()
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
List<ulong>? due = null;
|
||||
|
||||
foreach (var (seq, entry) in _entries)
|
||||
{
|
||||
if (entry.DeadlineUtc <= now)
|
||||
{
|
||||
due ??= [];
|
||||
due.Add(seq);
|
||||
}
|
||||
}
|
||||
|
||||
return due ?? (IReadOnlyList<ulong>)[];
|
||||
}
|
||||
|
||||
// Go: consumer.go — acking a sequence removes it from the pending redelivery set
|
||||
public void Acknowledge(ulong seq) => _entries.Remove(seq);
|
||||
|
||||
// Go: consumer.go — maxdeliver check: drop sequence once delivery count exceeds max
|
||||
public bool IsMaxDeliveries(ulong seq, int maxDeliver)
|
||||
{
|
||||
if (maxDeliver <= 0)
|
||||
return false;
|
||||
|
||||
if (!_entries.TryGetValue(seq, out var entry))
|
||||
return false;
|
||||
|
||||
return entry.DeliveryCount >= maxDeliver;
|
||||
}
|
||||
|
||||
public bool IsTracking(ulong seq) => _entries.ContainsKey(seq);
|
||||
|
||||
public int TrackedCount => _entries.Count;
|
||||
|
||||
// Go: consumer.go — backoff index = min(deliveries-1, len(backoff)-1);
|
||||
// falls back to ackWaitMs when the backoff array is empty.
|
||||
private int ResolveDelay(int deliveryCount, int ackWaitMs)
|
||||
{
|
||||
if (_backoffMs.Length == 0)
|
||||
return Math.Max(ackWaitMs, 1);
|
||||
|
||||
var idx = Math.Min(deliveryCount - 1, _backoffMs.Length - 1);
|
||||
if (idx < 0)
|
||||
idx = 0;
|
||||
return _backoffMs[idx];
|
||||
}
|
||||
|
||||
private sealed class RedeliveryEntry
|
||||
{
|
||||
public DateTime DeadlineUtc { get; set; }
|
||||
public int DeliveryCount { get; set; }
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,8 @@ public sealed class ConsumerConfig
|
||||
public int MaxDeliver { get; set; } = 1;
|
||||
public int MaxAckPending { get; set; }
|
||||
public bool Push { get; set; }
|
||||
// Go: consumer.go:115 — deliver_subject routes push messages to a NATS subject
|
||||
public string DeliverSubject { get; set; } = string.Empty;
|
||||
public int HeartbeatMs { get; set; }
|
||||
public List<int> BackOffMs { get; set; } = [];
|
||||
public bool FlowControl { get; set; }
|
||||
|
||||
Reference in New Issue
Block a user