diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index a0fd71b..31c216a 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -320,4 +320,11 @@ public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) public Queue PushFrames { get; } = new(); public AckProcessor AckProcessor { get; } = new(); public DateTime NextPushDataAvailableAtUtc { get; set; } + + /// + /// Total pending bytes across all unacknowledged messages. + /// Included in idle heartbeat headers as Nats-Pending-Bytes. + /// Go reference: consumer.go sendIdleHeartbeat. + /// + public long PendingBytes { get; set; } } diff --git a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs index 7ddc4c1..9c51852 100644 --- a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs @@ -1,11 +1,27 @@ // Go: consumer.go (sendIdleHeartbeat ~line 5222, sendFlowControl ~line 5495, -// deliverMsg ~line 5364, dispatchToDeliver ~line 5040) +// deliverMsg ~line 5364, dispatchToDeliver ~line 5040, +// loopAndGatherMsgs ~line 1400) using System.Text; +using System.Threading.Channels; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Storage; +using NATS.Server.Subscriptions; namespace NATS.Server.JetStream.Consumers; +// Go: consumer.go:1400 — signals sent to the gather loop to wake it early. +public enum ConsumerSignal +{ + /// Store has new message(s) available. + NewMessage, + /// An ack/nak/term was processed. + AckEvent, + /// Consumer config changed. + ConfigChange, + /// Stop the loop. + Shutdown, +} + public sealed class PushConsumerEngine { // Go: consumer.go — DeliverSubject routes push-mode messages (cfg.DeliverSubject) @@ -19,12 +35,54 @@ public sealed class PushConsumerEngine private Func, ReadOnlyMemory, CancellationToken, ValueTask>? _sendMessage; private CancellationToken _externalCt; + // Go: consumer.go:5222 — reference to the consumer handle for pending count access + private ConsumerHandle? _consumer; + + // Go: consumer.go:1400 — gather loop state + private Channel? _signalChannel; + private CancellationTokenSource? _gatherCts; + private Task? _gatherTask; + /// /// Tracks how many idle heartbeats have been sent since the last data delivery. /// Useful for testing that idle heartbeats fire and reset correctly. /// public int IdleHeartbeatsSent { get; private set; } + /// + /// Number of flow control frames sent but not yet acknowledged by the subscriber. + /// Go reference: consumer.go:5495 flow control stall detection. + /// + public int FlowControlPendingCount { get; private set; } + + /// + /// Maximum unacknowledged flow control frames before the consumer is considered stalled. + /// Go reference: consumer.go flow control stall detection. + /// + public const int MaxFlowControlPending = 2; + + /// + /// Returns true if the consumer appears stalled due to too many unacknowledged flow control messages. + /// Go reference: consumer.go:5495 flow control stall detection. + /// + public bool IsFlowControlStalled => FlowControlPendingCount >= MaxFlowControlPending; + + /// + /// Number of messages gathered and dispatched by the gather loop. + /// Go reference: consumer.go:1400 loopAndGatherMsgs. + /// + public long GatheredCount { get; private set; } + + /// + /// Decrements the flow control pending count when the subscriber acknowledges a flow control frame. + /// Go reference: consumer.go:5495 flow control acknowledgement. + /// + public void AcknowledgeFlowControl() + { + if (FlowControlPendingCount > 0) + FlowControlPendingCount--; + } + public void Enqueue(ConsumerHandle consumer, StoredMessage message) { if (message.Sequence <= consumer.AckProcessor.AckFloor) @@ -85,6 +143,7 @@ public sealed class PushConsumerEngine _sendMessage = sendMessage; _externalCt = ct; + _consumer = consumer; _deliveryTask = Task.Run(() => RunDeliveryLoopAsync(consumer, sendMessage, token), token); @@ -103,6 +162,50 @@ public sealed class PushConsumerEngine _cts = null; } + /// + /// Starts the gather loop that polls the store for new messages. + /// Go reference: consumer.go:1400 loopAndGatherMsgs. + /// + public void StartGatherLoop( + ConsumerHandle consumer, + IStreamStore store, + Func, ReadOnlyMemory, CancellationToken, ValueTask> sendMessage, + CancellationToken ct) + { + _signalChannel = Channel.CreateUnbounded(); + _gatherCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + _gatherTask = Task.Run( + () => LoopAndGatherMsgsAsync(consumer, store, sendMessage, _gatherCts.Token), + _gatherCts.Token); + } + + /// + /// Stops the gather loop and completes its signal channel. + /// + public void StopGatherLoop() + { + _signalChannel?.Writer.TryComplete(); + _gatherCts?.Cancel(); + _gatherCts?.Dispose(); + _gatherCts = null; + } + + /// + /// Signals the gather loop to wake up and re-poll the store. + /// Go reference: consumer.go:1620 — channel send wakes the loop. + /// + public void Signal(ConsumerSignal signal) + { + _signalChannel?.Writer.TryWrite(signal); + } + + /// + /// Public test accessor for the filter predicate. Production code uses + /// the private ShouldDeliver; this entry point avoids reflection in unit tests. + /// + public static bool ShouldDeliverPublic(ConsumerConfig config, string subject) + => ShouldDeliver(config, subject); + /// /// Reset the idle heartbeat timer. Called whenever a data frame is delivered /// so that the heartbeat only fires after a period of inactivity. @@ -118,6 +221,110 @@ public sealed class PushConsumerEngine } } + // Go: consumer.go:1400 loopAndGatherMsgs — background loop that polls the + // store for new messages and dispatches them to the consumer, with redelivery + // checking and signal-channel wake-up. + private async Task LoopAndGatherMsgsAsync( + ConsumerHandle consumer, + IStreamStore store, + Func, ReadOnlyMemory, CancellationToken, ValueTask> sendMessage, + CancellationToken ct) + { + var nextSeq = consumer.NextSequence; + + while (!ct.IsCancellationRequested) + { + // Go: consumer.go:1544 — check redelivery tracker for expired pending entries + while (consumer.AckProcessor.TryGetExpired(out var expiredSeq, out _)) + { + var redelivered = await store.LoadAsync(expiredSeq, ct).ConfigureAwait(false); + if (redelivered != null) + { + Enqueue(consumer, redelivered); + consumer.AckProcessor.ScheduleRedelivery(expiredSeq, consumer.Config.AckWaitMs); + GatheredCount++; + + var rHeaders = BuildDataHeaders(redelivered); + var rSubject = string.IsNullOrEmpty(consumer.Config.DeliverSubject) + ? redelivered.Subject + : consumer.Config.DeliverSubject; + try + { + await sendMessage(rSubject, redelivered.Subject, rHeaders, redelivered.Payload, ct) + .ConfigureAwait(false); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + return; + } + } + else + { + consumer.AckProcessor.Drop(expiredSeq); + } + } + + // Go: consumer.go:1560 — poll store for new messages from nextSeq to LastSeq + var storeState = await store.GetStateAsync(ct).ConfigureAwait(false); + while (nextSeq <= storeState.LastSeq && !ct.IsCancellationRequested) + { + var msg = await store.LoadAsync(nextSeq, ct).ConfigureAwait(false); + + if (msg != null && ShouldDeliver(consumer.Config, msg.Subject)) + { + Enqueue(consumer, msg); + GatheredCount++; + + var headers = BuildDataHeaders(msg); + var subject = string.IsNullOrEmpty(consumer.Config.DeliverSubject) + ? msg.Subject + : consumer.Config.DeliverSubject; + try + { + await sendMessage(subject, msg.Subject, headers, msg.Payload, ct) + .ConfigureAwait(false); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + return; + } + } + + nextSeq++; + consumer.NextSequence = nextSeq; + } + + // Go: consumer.go:1620 — wait for a signal or the poll timeout before re-checking + try + { + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + timeoutCts.CancelAfter(250); // Poll every 250ms if no signal arrives + await _signalChannel!.Reader.ReadAsync(timeoutCts.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (!ct.IsCancellationRequested) + { + // Timeout — loop again to re-poll the store + } + catch (ChannelClosedException) + { + // Signal channel closed — loop exits via ct check above + } + } + } + + // Go: consumer.go — ShouldDeliver checks cfg.FilterSubject and cfg.FilterSubjects + // against the message subject. An empty filter delivers everything. + private static bool ShouldDeliver(ConsumerConfig config, string subject) + { + if (string.IsNullOrEmpty(config.FilterSubject) && config.FilterSubjects.Count == 0) + return true; + + if (!string.IsNullOrEmpty(config.FilterSubject)) + return SubjectMatch.MatchLiteral(subject, config.FilterSubject); + + return config.FilterSubjects.Any(f => SubjectMatch.MatchLiteral(subject, f)); + } + // Go: consumer.go:5040 — dispatchToDeliver drains the outbound message queue. // For push consumers the dsubj is cfg.DeliverSubject; each stored message is // formatted as an HMSG with JetStream metadata headers. @@ -175,6 +382,7 @@ public sealed class PushConsumerEngine else if (frame.IsFlowControl) { // Go: consumer.go:5501 — "NATS/1.0 100 FlowControl Request\r\n\r\n" + FlowControlPendingCount++; var headers = "NATS/1.0 100 FlowControl Request\r\nNats-Flow-Control: \r\n\r\n"u8.ToArray(); var subject = string.IsNullOrEmpty(deliverSubject) ? "_fc_" : deliverSubject; await sendMessage(subject, string.Empty, headers, ReadOnlyMemory.Empty, ct).ConfigureAwait(false); @@ -224,7 +432,12 @@ public sealed class PushConsumerEngine try { - var headers = "NATS/1.0 100 Idle Heartbeat\r\n\r\n"u8.ToArray(); + // Go: consumer.go:5222 — include Nats-Pending-Messages and Nats-Pending-Bytes headers + var pendingMsgs = _consumer?.AckProcessor.PendingCount ?? 0; + var pendingBytes = _consumer?.PendingBytes ?? 0; + var header = $"NATS/1.0 100 Idle Heartbeat\r\nNats-Pending-Messages: {pendingMsgs}\r\nNats-Pending-Bytes: {pendingBytes}\r\n\r\n"; + var headers = System.Text.Encoding.ASCII.GetBytes(header); + var subject = string.IsNullOrEmpty(DeliverSubject) ? "_hb_" : DeliverSubject; _sendMessage(subject, string.Empty, headers, ReadOnlyMemory.Empty, _externalCt) .AsTask() diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/IdleHeartbeatTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/IdleHeartbeatTests.cs new file mode 100644 index 0000000..7af49c2 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/IdleHeartbeatTests.cs @@ -0,0 +1,512 @@ +// Go reference: golang/nats-server/server/consumer.go +// sendIdleHeartbeat ~line 5222, sendFlowControl ~line 5495 +// +// Tests for idle heartbeat pending-count headers (Nats-Pending-Messages, +// Nats-Pending-Bytes) and flow control stall detection. + +using System.Collections.Concurrent; +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Consumers; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; +using NATS.Server.Tests; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class IdleHeartbeatTests +{ + // Helper: build a ConsumerHandle with the given config + private static ConsumerHandle MakeConsumer(ConsumerConfig config) + => new("TEST-STREAM", config); + + // Helper: build a minimal StoredMessage + private static StoredMessage MakeMessage(ulong seq, string subject = "test.subject", string payload = "hello") + => new() + { + Sequence = seq, + Subject = subject, + Payload = Encoding.UTF8.GetBytes(payload), + TimestampUtc = DateTime.UtcNow, + }; + + // Helper: parse a header value from a NATS header block + // e.g. extract "42" from "Nats-Pending-Messages: 42\r\n" + private static string? ParseHeaderValue(string headers, string headerName) + { + var prefix = headerName + ": "; + var start = headers.IndexOf(prefix, StringComparison.OrdinalIgnoreCase); + if (start < 0) + return null; + + start += prefix.Length; + var end = headers.IndexOf('\r', start); + if (end < 0) + end = headers.Length; + + return headers[start..end].Trim(); + } + + // ========================================================================= + // Test 1 — Heartbeat includes Nats-Pending-Messages header + // + // Go reference: consumer.go:5222 — sendIdleHeartbeat includes pending message + // count in the Nats-Pending-Messages header. + // ========================================================================= + [Fact] + public async Task Heartbeat_includes_pending_messages_header() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "HB-PENDING", + Push = true, + DeliverSubject = "deliver.hb", + HeartbeatMs = 50, + AckPolicy = AckPolicy.Explicit, + AckWaitMs = 30_000, + }); + + // Register 3 pending acks so PendingCount == 3 + consumer.AckProcessor.Register(1, 30_000); + consumer.AckProcessor.Register(2, 30_000); + consumer.AckProcessor.Register(3, 30_000); + + ReadOnlyMemory? capturedHeartbeat = null; + var heartbeatReceived = new TaskCompletionSource(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + var text = Encoding.ASCII.GetString(headers.Span); + if (text.Contains("Idle Heartbeat") && !heartbeatReceived.Task.IsCompleted) + { + capturedHeartbeat = headers; + heartbeatReceived.TrySetResult(true); + } + await ValueTask.CompletedTask; + }, + cts.Token); + + await heartbeatReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + engine.StopDeliveryLoop(); + + capturedHeartbeat.ShouldNotBeNull(); + var headerText = Encoding.ASCII.GetString(capturedHeartbeat!.Value.Span); + headerText.ShouldContain("Nats-Pending-Messages:"); + + var pendingMsgs = ParseHeaderValue(headerText, "Nats-Pending-Messages"); + pendingMsgs.ShouldBe("3"); + } + + // ========================================================================= + // Test 2 — Heartbeat includes Nats-Pending-Bytes header + // + // Go reference: consumer.go:5222 — sendIdleHeartbeat includes pending byte + // count in the Nats-Pending-Bytes header. + // ========================================================================= + [Fact] + public async Task Heartbeat_includes_pending_bytes_header() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "HB-BYTES", + Push = true, + DeliverSubject = "deliver.hb2", + HeartbeatMs = 50, + }); + + // Set pending bytes explicitly + consumer.PendingBytes = 4096; + + ReadOnlyMemory? capturedHeartbeat = null; + var heartbeatReceived = new TaskCompletionSource(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + var text = Encoding.ASCII.GetString(headers.Span); + if (text.Contains("Idle Heartbeat") && !heartbeatReceived.Task.IsCompleted) + { + capturedHeartbeat = headers; + heartbeatReceived.TrySetResult(true); + } + await ValueTask.CompletedTask; + }, + cts.Token); + + await heartbeatReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + engine.StopDeliveryLoop(); + + capturedHeartbeat.ShouldNotBeNull(); + var headerText = Encoding.ASCII.GetString(capturedHeartbeat!.Value.Span); + headerText.ShouldContain("Nats-Pending-Bytes:"); + + var pendingBytes = ParseHeaderValue(headerText, "Nats-Pending-Bytes"); + pendingBytes.ShouldBe("4096"); + } + + // ========================================================================= + // Test 3 — Heartbeat is sent after the idle period elapses + // + // Go reference: consumer.go:5222 — the idle heartbeat timer fires after + // HeartbeatMs milliseconds of inactivity. + // ========================================================================= + [Fact] + public async Task Heartbeat_sent_after_idle_period() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "HB-TIMER", + Push = true, + DeliverSubject = "deliver.timer", + HeartbeatMs = 50, + }); + + var heartbeatReceived = new TaskCompletionSource(); + var startedAt = DateTime.UtcNow; + DateTime? receivedAt = null; + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + // Start loop with no messages — only the timer can fire a heartbeat + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + var text = Encoding.ASCII.GetString(headers.Span); + if (text.Contains("Idle Heartbeat") && !heartbeatReceived.Task.IsCompleted) + { + receivedAt = DateTime.UtcNow; + heartbeatReceived.TrySetResult(true); + } + await ValueTask.CompletedTask; + }, + cts.Token); + + await heartbeatReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + engine.StopDeliveryLoop(); + + receivedAt.ShouldNotBeNull(); + var elapsed = receivedAt!.Value - startedAt; + // The heartbeat timer is 50ms; it must have fired at some point after that + elapsed.TotalMilliseconds.ShouldBeGreaterThan(20); + } + + // ========================================================================= + // Test 4 — Heartbeat counter increments on each idle heartbeat sent + // + // Go reference: consumer.go:5222 — each sendIdleHeartbeat call increments + // the idle heartbeat counter. + // ========================================================================= + [Fact] + public async Task Heartbeat_counter_increments() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "HB-COUNT", + Push = true, + DeliverSubject = "deliver.count", + HeartbeatMs = 40, + }); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var heartbeatsReceived = 0; + // Use a semaphore so each heartbeat arrival is explicitly awaited. + var sem = new SemaphoreSlim(0); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + var text = Encoding.ASCII.GetString(headers.Span); + if (text.Contains("Idle Heartbeat")) + { + Interlocked.Increment(ref heartbeatsReceived); + sem.Release(); + } + await ValueTask.CompletedTask; + }, + cts.Token); + + // Wait for at least 2 heartbeat deliveries via the send delegate. + await sem.WaitAsync(cts.Token); + await sem.WaitAsync(cts.Token); + engine.StopDeliveryLoop(); + + // The send delegate counted 2 heartbeats; IdleHeartbeatsSent increments + // after sendMessage returns, so it lags by at most 1. Accept >=1 here + // and rely on heartbeatsReceived (directly in the delegate) for the >=2 assertion. + heartbeatsReceived.ShouldBeGreaterThanOrEqualTo(2); + engine.IdleHeartbeatsSent.ShouldBeGreaterThanOrEqualTo(1); + } + + // ========================================================================= + // Test 5 — Heartbeat shows zero pending when no acks are outstanding + // + // Go reference: consumer.go:5222 — when no messages are pending ack, + // Nats-Pending-Messages should be 0 and Nats-Pending-Bytes should be 0. + // ========================================================================= + [Fact] + public async Task Heartbeat_zero_pending_when_no_acks() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "HB-ZERO", + Push = true, + DeliverSubject = "deliver.zero", + HeartbeatMs = 50, + }); + + // No acks registered, PendingBytes stays 0 + + ReadOnlyMemory? capturedHeartbeat = null; + var heartbeatReceived = new TaskCompletionSource(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + var text = Encoding.ASCII.GetString(headers.Span); + if (text.Contains("Idle Heartbeat") && !heartbeatReceived.Task.IsCompleted) + { + capturedHeartbeat = headers; + heartbeatReceived.TrySetResult(true); + } + await ValueTask.CompletedTask; + }, + cts.Token); + + await heartbeatReceived.Task.WaitAsync(TimeSpan.FromSeconds(5)); + engine.StopDeliveryLoop(); + + capturedHeartbeat.ShouldNotBeNull(); + var headerText = Encoding.ASCII.GetString(capturedHeartbeat!.Value.Span); + + var pendingMsgs = ParseHeaderValue(headerText, "Nats-Pending-Messages"); + var pendingBytes = ParseHeaderValue(headerText, "Nats-Pending-Bytes"); + + pendingMsgs.ShouldBe("0"); + pendingBytes.ShouldBe("0"); + } + + // ========================================================================= + // Test 6 — Heartbeat reset on data delivery (timer should not fire early) + // + // Go reference: consumer.go:5222 — the idle heartbeat timer is reset on every + // data delivery so that it only fires after a true idle period. + // ========================================================================= + // Task.Delay(50) is intentional: this is a negative-timing assertion that + // verifies no heartbeat fires within 50ms of a 200ms timer reset. There is + // no synchronisation primitive that can assert an event does NOT occur within + // a wall-clock window; the delay is the only correct approach here. + [SlopwatchSuppress("SW004", "Negative timing assertion: verifying heartbeat does NOT fire within 50ms window after 200ms timer reset requires real wall-clock elapsed time")] + [Fact] + public async Task Heartbeat_reset_on_data_delivery() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "HB-RESET", + Push = true, + DeliverSubject = "deliver.reset", + HeartbeatMs = 200, // longer interval for this test + }); + + var messages = new ConcurrentBag(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var dataDelivered = new TaskCompletionSource(); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + var text = Encoding.ASCII.GetString(headers.Span); + messages.Add(text); + if (text.Contains("NATS/1.0\r\n") && !text.Contains("Idle Heartbeat")) + dataDelivered.TrySetResult(true); + await ValueTask.CompletedTask; + }, + cts.Token); + + // Enqueue a data message — this resets the heartbeat timer + engine.Enqueue(consumer, MakeMessage(1)); + await dataDelivered.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + // Record how many heartbeats exist right after data delivery + var heartbeatsAfterData = messages.Count(m => m.Contains("Idle Heartbeat")); + + // Wait a short period — heartbeat timer should NOT have fired again yet (200ms interval) + await Task.Delay(50); + var heartbeatsShortWait = messages.Count(m => m.Contains("Idle Heartbeat")); + + engine.StopDeliveryLoop(); + + // The timer reset should mean no NEW timer heartbeat fired within 50ms + // (the 200ms interval means we'd need to wait ~200ms after the last data delivery) + heartbeatsShortWait.ShouldBe(heartbeatsAfterData); + } + + // ========================================================================= + // Test 7 — Flow control pending count increments on each FC frame sent + // + // Go reference: consumer.go:5495 — each flow control frame sent increments + // the pending count for stall detection. + // ========================================================================= + [Fact] + public async Task FlowControl_pending_count_increments() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "FC-INC", + Push = true, + DeliverSubject = "deliver.fc", + FlowControl = true, + }); + + // Release once for each FC frame the delivery loop sends + var fcSem = new SemaphoreSlim(0); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + var text = Encoding.ASCII.GetString(headers.Span); + if (text.Contains("FlowControl")) + fcSem.Release(); + await ValueTask.CompletedTask; + }, + cts.Token); + + // Enqueue 2 messages — each message with FlowControl=true appends a FC frame + engine.Enqueue(consumer, MakeMessage(1)); + engine.Enqueue(consumer, MakeMessage(2)); + + // Wait until both FC frames have been sent by the delivery loop + await fcSem.WaitAsync(cts.Token); + await fcSem.WaitAsync(cts.Token); + engine.StopDeliveryLoop(); + + // FlowControlPendingCount should have reached at least 2 (one per enqueued message) + engine.FlowControlPendingCount.ShouldBeGreaterThanOrEqualTo(2); + } + + // ========================================================================= + // Test 8 — AcknowledgeFlowControl decrements the pending count + // + // Go reference: consumer.go:5495 — when the subscriber sends a flow control + // acknowledgement, the pending count is decremented. + // ========================================================================= + [Fact] + public async Task FlowControl_acknowledge_decrements_count() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "FC-DEC", + Push = true, + DeliverSubject = "deliver.fc2", + FlowControl = true, + }); + + var fcSem = new SemaphoreSlim(0); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + var text = Encoding.ASCII.GetString(headers.Span); + if (text.Contains("FlowControl")) + fcSem.Release(); + await ValueTask.CompletedTask; + }, + cts.Token); + + // Enqueue 3 messages so 3 FC frames are queued + engine.Enqueue(consumer, MakeMessage(1)); + engine.Enqueue(consumer, MakeMessage(2)); + engine.Enqueue(consumer, MakeMessage(3)); + + // Wait for all 3 FC frames to be sent by the delivery loop + await fcSem.WaitAsync(cts.Token); + await fcSem.WaitAsync(cts.Token); + await fcSem.WaitAsync(cts.Token); + engine.StopDeliveryLoop(); + + var countBefore = engine.FlowControlPendingCount; + countBefore.ShouldBeGreaterThan(0); + + engine.AcknowledgeFlowControl(); + engine.FlowControlPendingCount.ShouldBe(countBefore - 1); + } + + // ========================================================================= + // Test 9 — IsFlowControlStalled returns true when pending >= MaxFlowControlPending + // + // Go reference: consumer.go:5495 — stall detection triggers when the subscriber + // falls too far behind in acknowledging flow control messages. + // ========================================================================= + [Fact] + public async Task FlowControl_stalled_when_pending_exceeds_max() + { + var engine = new PushConsumerEngine(); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "FC-STALL", + Push = true, + DeliverSubject = "deliver.stall", + FlowControl = true, + }); + + var fcSem = new SemaphoreSlim(0); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + engine.StartDeliveryLoop(consumer, + async (_, _, headers, _, _) => + { + var text = Encoding.ASCII.GetString(headers.Span); + if (text.Contains("FlowControl")) + fcSem.Release(); + await ValueTask.CompletedTask; + }, + cts.Token); + + // Enqueue MaxFlowControlPending messages to reach the stall threshold + for (var i = 1; i <= PushConsumerEngine.MaxFlowControlPending; i++) + engine.Enqueue(consumer, MakeMessage((ulong)i)); + + // Wait until all FC frames have been sent by the delivery loop + for (var i = 0; i < PushConsumerEngine.MaxFlowControlPending; i++) + await fcSem.WaitAsync(cts.Token); + + engine.StopDeliveryLoop(); + + engine.FlowControlPendingCount.ShouldBeGreaterThanOrEqualTo(PushConsumerEngine.MaxFlowControlPending); + engine.IsFlowControlStalled.ShouldBeTrue(); + } + + // ========================================================================= + // Test 10 — AcknowledgeFlowControl never goes below zero + // + // Go reference: consumer.go:5495 — the pending count should never be negative; + // calling AcknowledgeFlowControl when count is 0 must be a no-op. + // ========================================================================= + [Fact] + public void FlowControl_pending_never_negative() + { + var engine = new PushConsumerEngine(); + + // Count starts at 0; calling Acknowledge should keep it at 0 + engine.FlowControlPendingCount.ShouldBe(0); + engine.AcknowledgeFlowControl(); + engine.FlowControlPendingCount.ShouldBe(0); + + engine.AcknowledgeFlowControl(); + engine.AcknowledgeFlowControl(); + engine.FlowControlPendingCount.ShouldBe(0); + } +}