diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index eb84e6d..1a80c0a 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -233,7 +233,7 @@ public sealed class PullConsumerEngine // is empty or the consumer has caught up to the end of the stream. if (expiresCts is not null) { - message = await WaitForMessageAsync(stream.Store, sequence, effectiveCt); + message = await WaitForMessageAsync(stream, sequence, effectiveCt); } else { @@ -291,19 +291,21 @@ public sealed class PullConsumerEngine } /// - /// Poll-wait for a message to appear at the given sequence, retrying with a - /// short delay until the cancellation token fires (typically from ExpiresMs). + /// Wait for a message to appear at the given sequence using signal-based wakeup. + /// Publishers call after each append, + /// eliminating the 5ms polling delay. + /// Go reference: consumer.go — channel signaling from publisher to waiting consumer. /// - private static async ValueTask WaitForMessageAsync(IStreamStore store, ulong sequence, CancellationToken ct) + private static async ValueTask WaitForMessageAsync(StreamHandle stream, ulong sequence, CancellationToken ct) { while (!ct.IsCancellationRequested) { - var message = await store.LoadAsync(sequence, ct); + var message = await stream.Store.LoadAsync(sequence, ct); if (message is not null) return message; - // Yield briefly before retrying — the ExpiresMs CTS will cancel when time is up - await Task.Delay(5, ct).ConfigureAwait(false); + // Wait for publisher to signal a new message instead of polling. + await stream.WaitForPublishAsync(ct).ConfigureAwait(false); } ct.ThrowIfCancellationRequested(); diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 2cfc40a..82699d0 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -457,6 +457,9 @@ public sealed class StreamManager : IDisposable var seq = stream.Store.AppendAsync(storeSubject, payload, default).GetAwaiter().GetResult(); EnforceRuntimePolicies(stream, DateTime.UtcNow); + // Wake up any pull consumers waiting at the stream tail. + stream.NotifyPublish(); + // Only load the stored message when replication is configured (mirror/source). // Avoids unnecessary disk I/O on the hot publish path. if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name)) @@ -507,6 +510,9 @@ public sealed class StreamManager : IDisposable var seq = stream.Store.AppendAsync(storeSubject, newPayload, default).GetAwaiter().GetResult(); EnforceRuntimePolicies(stream, DateTime.UtcNow); + // Wake up any pull consumers waiting at the stream tail. + stream.NotifyPublish(); + if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name)) { var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult(); @@ -961,4 +967,25 @@ public sealed class StreamManager : IDisposable } } -public sealed record StreamHandle(StreamConfig Config, IStreamStore Store); +public sealed record StreamHandle(StreamConfig Config, IStreamStore Store) +{ + // Signal-based wakeup for pull consumers waiting at the stream tail. + // Go reference: consumer.go — channel signaling from publisher to waiting consumer. + private volatile TaskCompletionSource _publishSignal = new(TaskCreationOptions.RunContinuationsAsynchronously); + + /// + /// Notifies waiting consumers that a new message has been published. + /// + public void NotifyPublish() + { + var old = Interlocked.Exchange(ref _publishSignal, + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)); + old.TrySetResult(); + } + + /// + /// Waits until a new message is published to this stream. + /// + public Task WaitForPublishAsync(CancellationToken ct) + => _publishSignal.Task.WaitAsync(ct); +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 98c6d10..87aea24 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -1694,7 +1694,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable var numPending = batch - delivered; var ackReply = BuildAckReply(ackPrefix, message.Sequence, deliverySeq, tsNanos, numPending); - DeliverMessage(inboxSub, message.Subject, ackReply, minHeaders, message.Payload); + // Bypass DeliverMessage — we already know the target client is sender. + // Skip permission check and auto-unsub overhead for JS delivery inbox. + // Go reference: consumer.go — batch delivery with deferred flush. + sender.SendMessageNoFlush(message.Subject, inboxSub.Sid, ackReply, minHeaders, message.Payload); + + // Batch flush every 64 messages to amortize write-loop wakeup cost. + if ((delivered & 63) == 0) + sender.SignalFlush(); if (consumer.Config.AckPolicy is JetStream.Models.AckPolicy.Explicit or JetStream.Models.AckPolicy.All) { @@ -1708,6 +1715,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } else { + // Flush any buffered messages before blocking on the signal. + // Without this, messages queued via SendMessageNoFlush would sit + // in the buffer until the next batch boundary or loop exit. + sender.SignalFlush(); + // No message available — send idle heartbeat if needed if (DateTime.UtcNow - lastDeliveryTime >= hbInterval) { @@ -1719,8 +1731,17 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable lastDeliveryTime = DateTime.UtcNow; } - // Poll briefly before retrying - await Task.Delay(5, ct).ConfigureAwait(false); + // Wait for publisher to signal a new message, with a heartbeat-interval + // timeout so the heartbeat check is re-evaluated periodically. + // Go reference: consumer.go — channel signaling from publisher. + try + { + await streamHandle.WaitForPublishAsync(ct).WaitAsync(hbInterval, ct).ConfigureAwait(false); + } + catch (TimeoutException) + { + // Heartbeat interval elapsed — loop back to re-check + } } } } @@ -1729,13 +1750,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable // ExpiresMs timeout — expected } + // Final flush for any remaining batched messages + sender.SignalFlush(); + consumer.NextSequence = sequence; - // Send terminal status + // Send terminal status directly to sender (last message, includes flush) ReadOnlyMemory statusHeader = delivered == 0 ? System.Text.Encoding.UTF8.GetBytes("NATS/1.0 404 No Messages\r\n\r\n") : System.Text.Encoding.UTF8.GetBytes("NATS/1.0 408 Request Timeout\r\n\r\n"); - DeliverMessage(inboxSub, replyTo, null, statusHeader, default); + sender.SendMessage(replyTo, inboxSub.Sid, null, statusHeader, default); } private void DeliverFetchedMessages(Subscription inboxSub, string streamName, string consumerName,