feat(batch39): merge consumer-dispatch

This commit is contained in:
Joseph Doherty
2026-03-01 01:35:20 -05:00
20 changed files with 2219 additions and 2 deletions

View File

@@ -0,0 +1,313 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal (JsPubMsg? Message, ulong DeliveryCount, Exception? Error) GetNextMsg()
{
_mu.EnterWriteLock();
try
{
if (_closed)
return (null, 0, new InvalidOperationException("consumer not valid"));
if (_state.Pending is { Count: > 0 })
{
var sequence = _state.Pending.Keys.Min();
var deliveryCount = IncDeliveryCount(sequence);
var message = new JsPubMsg
{
Subject = Config.DeliverSubject ?? string.Empty,
Reply = AckReply(sequence, _state.Delivered.Consumer + 1, deliveryCount, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), NumPending()),
Msg = [],
};
return (message, deliveryCount, null);
}
return (null, 0, null);
}
finally
{
_mu.ExitWriteLock();
}
}
internal (int Expired, int Waiting, int BatchRequestsPending, DateTime FirstExpiration) ProcessWaiting(bool endOfStream)
{
_mu.EnterWriteLock();
try
{
var firstExpiration = DateTime.MinValue;
if (_waiting is null || _waiting.IsEmpty())
return (0, 0, 0, firstExpiration);
var expired = 0;
var batchRequestsPending = 0;
var now = DateTime.UtcNow;
var toRemove = new List<WaitingRequest>();
foreach (var waitingRequest in _waiting.Snapshot())
{
var isExpired = waitingRequest.Expires is DateTime expiresAt && now >= expiresAt;
if ((endOfStream && waitingRequest.NoWait == 1) || isExpired)
{
toRemove.Add(waitingRequest);
expired++;
continue;
}
batchRequestsPending += Math.Max(0, waitingRequest.N);
if (waitingRequest.Expires is DateTime wrExpires &&
(firstExpiration == DateTime.MinValue || wrExpires < firstExpiration))
{
firstExpiration = wrExpires;
}
}
foreach (var waitingRequest in toRemove)
_waiting.Remove(null, waitingRequest);
return (expired, _waiting.Len, batchRequestsPending, firstExpiration);
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool CheckWaitingForInterest()
{
var (_, waiting, _, _) = ProcessWaiting(endOfStream: true);
return waiting > 0;
}
internal (TimeSpan Duration, Timer? Timer) HbTimer()
{
if (Config.Heartbeat <= TimeSpan.Zero)
return (TimeSpan.Zero, null);
return (Config.Heartbeat, new Timer(static _ => { }, null, Config.Heartbeat, Timeout.InfiniteTimeSpan));
}
internal void CheckAckFloor()
{
_mu.EnterWriteLock();
try
{
if (_closed || _state.Pending is not { Count: > 0 })
return;
var minPending = _state.Pending.OrderBy(static pair => pair.Key).First();
var pendingStream = minPending.Key;
var pendingConsumer = minPending.Value?.Sequence ?? pendingStream;
var desiredStreamFloor = pendingStream > 0 ? pendingStream - 1 : 0;
var desiredConsumerFloor = pendingConsumer > 0 ? pendingConsumer - 1 : 0;
if (_state.AckFloor.Stream < desiredStreamFloor)
_state.AckFloor.Stream = desiredStreamFloor;
if (_state.AckFloor.Consumer < desiredConsumerFloor)
_state.AckFloor.Consumer = desiredConsumerFloor;
}
finally
{
_mu.ExitWriteLock();
}
}
internal int ProcessInboundAcks(CancellationToken cancellationToken)
{
var processed = 0;
while (!cancellationToken.IsCancellationRequested)
{
JsAckMsg? ack;
_mu.EnterWriteLock();
try
{
if (_ackQueue.Count == 0)
break;
ack = _ackQueue.Dequeue();
}
finally
{
_mu.ExitWriteLock();
}
ProcessAck(ack.Subject, ack.Reply, ack.HeaderBytes, ack.Msg);
processed++;
}
return processed;
}
internal int ProcessInboundNextMsgReqs(CancellationToken cancellationToken)
{
var processed = 0;
while (!cancellationToken.IsCancellationRequested)
{
NextMsgReq? request;
_mu.EnterWriteLock();
try
{
if (_nextMsgReqs is null || _nextMsgReqs.Count == 0)
break;
request = _nextMsgReqs.Dequeue();
}
finally
{
_mu.ExitWriteLock();
}
_ = ProcessNextMsgRequest(request.Reply, request.Message);
request.ReturnToPool();
processed++;
}
return processed;
}
internal void SuppressDeletion()
{
_mu.EnterWriteLock();
try
{
if (_closed || _deleteTimer is null || _deleteThreshold <= TimeSpan.Zero)
return;
_deleteTimer.Change(_deleteThreshold, Timeout.InfiniteTimeSpan);
}
finally
{
_mu.ExitWriteLock();
}
}
internal int LoopAndGatherMsgs(int maxIterations, CancellationToken cancellationToken)
{
var delivered = 0;
for (var i = 0; i < maxIterations && !cancellationToken.IsCancellationRequested; i++)
{
var (message, _, error) = GetNextMsg();
if (error is not null || message is null)
break;
delivered++;
_state.Delivered.Stream = Math.Max(_state.Delivered.Stream, (ulong)delivered);
_state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, (ulong)delivered);
var expired = ProcessWaiting(endOfStream: false);
if (expired.Waiting == 0)
break;
}
return delivered;
}
internal string SendIdleHeartbeat(string subject)
{
var streamSequence = _state.Delivered.Stream;
var consumerSequence = _state.Delivered.Consumer;
var heartbeat = $"NATS/1.0 100 Idle Heartbeat\r\nNats-Last-Consumer: {consumerSequence}\r\nNats-Last-Stream: {streamSequence}\r\n\r\n";
_ = SendAdvisory(subject, heartbeat);
return heartbeat;
}
internal string AckReply(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, long timestamp, ulong pending) =>
$"$JS.ACK.{deliveryCount}.{streamSequence}.{deliverySequence}.{timestamp}.{pending}";
internal void SetMaxPendingBytes(int limit)
{
_mu.EnterWriteLock();
try
{
_maxPendingBytesLimit = Math.Max(0, limit);
_maxPendingBytesThreshold = Math.Max(1, _maxPendingBytesLimit / 16);
}
finally
{
_mu.ExitWriteLock();
}
}
internal (ulong Pending, Exception? Error) CheckNumPending()
{
_mu.EnterWriteLock();
try
{
if (_npc < 0)
_npc = 0;
var (pending, floor, error) = CalculateNumPending();
if (error is not null)
return (0, error);
_npc = (long)pending;
_npf = floor;
return (NumPending(), null);
}
finally
{
_mu.ExitWriteLock();
}
}
internal ulong NumPending() => _npc < 0 ? 0UL : (ulong)_npc;
internal void CheckNumPendingOnEOF()
{
_mu.EnterWriteLock();
try
{
if (_npc < 0)
_npc = 0;
if (_state.Delivered.Stream <= _state.AckFloor.Stream)
{
_npc = 0;
_npf = _state.Delivered.Stream;
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal (ulong Pending, Exception? Error) StreamNumPendingLocked()
{
_mu.EnterWriteLock();
try
{
return StreamNumPending();
}
finally
{
_mu.ExitWriteLock();
}
}
internal (ulong Pending, Exception? Error) StreamNumPending()
{
var (pending, floor, error) = CalculateNumPending();
if (error is not null)
return (0, error);
_npc = (long)pending;
_npf = floor;
return (NumPending(), null);
}
internal (ulong Pending, ulong Floor, Exception? Error) CalculateNumPending()
{
if (_closed)
return (0, 0, null);
var delivered = _state.Delivered.Stream;
var acked = _state.AckFloor.Stream;
if (delivered <= acked)
return (0, delivered, null);
return (delivered - acked, delivered, null);
}
}

View File

@@ -0,0 +1,161 @@
using System.Text;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal static void ConvertToHeadersOnly(JsPubMsg message)
{
ArgumentNullException.ThrowIfNull(message);
var payloadSize = message.Msg?.Length ?? 0;
var builder = new StringBuilder();
builder.Append("NATS/1.0\r\n");
builder.Append("Nats-Msg-Size: ");
builder.Append(payloadSize);
builder.Append("\r\n\r\n");
message.Hdr = Encoding.ASCII.GetBytes(builder.ToString());
message.Msg = [];
}
internal void DeliverMsg(string deliverySubject, string ackReply, JsPubMsg message, ulong deliveryCount, RetentionPolicy retentionPolicy)
{
ArgumentNullException.ThrowIfNull(message);
_mu.EnterWriteLock();
try
{
var nextDeliverySequence = _state.Delivered.Consumer + 1;
var streamSequence = ParseAckReplyNum(ackReply.Split('.', StringSplitOptions.RemoveEmptyEntries).LastOrDefault() ?? string.Empty);
var streamSeq = streamSequence > 0 ? (ulong)streamSequence : _state.Delivered.Stream + 1;
_state.Delivered.Consumer = nextDeliverySequence;
_state.Delivered.Stream = streamSeq;
if (Config.AckPolicy is AckPolicy.AckExplicit or AckPolicy.AckAll)
TrackPending(streamSeq, nextDeliverySequence);
else
_state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq };
message.Subject = deliverySubject;
message.Reply = ackReply;
Interlocked.Add(ref _pendingBytes, message.Size());
if (NeedFlowControl(message.Size()))
SendFlowControl();
if (retentionPolicy != RetentionPolicy.LimitsPolicy && Config.AckPolicy == AckPolicy.AckNone)
_state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq };
_ = deliveryCount;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool ReplicateDeliveries() =>
Config.AckPolicy != AckPolicy.AckNone && Config.FlowControl == false && IsLeader();
internal bool NeedFlowControl(int size)
{
if (_flowControlWindow <= 0)
return false;
if (string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes > _flowControlWindow / 2)
return true;
if (!string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes - _flowControlSentBytes >= _flowControlWindow)
_flowControlSentBytes += size;
return false;
}
internal void ProcessFlowControl(string subject)
{
_mu.EnterWriteLock();
try
{
if (!string.Equals(subject, _flowControlReplyId, StringComparison.Ordinal))
return;
if (_flowControlWindow > 0 && _flowControlWindow < _maxPendingBytesLimit)
_flowControlWindow = Math.Min(_flowControlWindow * 2, Math.Max(1, _maxPendingBytesLimit));
_pendingBytes = Math.Max(0, _pendingBytes - _flowControlSentBytes);
_flowControlSentBytes = 0;
_flowControlReplyId = string.Empty;
SignalNewMessages();
}
finally
{
_mu.ExitWriteLock();
}
}
internal string FcReply() =>
$"$JS.FC.{Stream}.{Name}.{Random.Shared.Next(1000, 9999)}";
internal bool SendFlowControl()
{
_mu.EnterWriteLock();
try
{
if (!IsPushMode())
return false;
var reply = FcReply();
_flowControlReplyId = reply;
_flowControlSentBytes = (int)Math.Max(0, _pendingBytes);
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void TrackPending(ulong streamSequence, ulong deliverySequence)
{
_state.Pending ??= [];
if (_state.Pending.TryGetValue(streamSequence, out var pending))
{
pending.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
_state.Pending[streamSequence] = pending;
}
else
{
_state.Pending[streamSequence] = new Pending
{
Sequence = deliverySequence,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};
}
}
internal void CreditWaitingRequest(string reply)
{
_mu.EnterWriteLock();
try
{
if (_waiting is null)
return;
foreach (var waitingRequest in _waiting.Snapshot())
{
if (!string.Equals(waitingRequest.Reply, reply, StringComparison.Ordinal))
continue;
waitingRequest.N++;
waitingRequest.D = Math.Max(0, waitingRequest.D - 1);
return;
}
}
finally
{
_mu.ExitWriteLock();
}
}
}

View File

@@ -0,0 +1,141 @@
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal static NextMsgReq NewNextMsgReq(string reply, byte[] message) => NextMsgReq.Rent(reply, message);
internal void ProcessNextMsgReq(string reply, byte[] rawMessage)
{
if (string.IsNullOrWhiteSpace(reply))
return;
ArgumentNullException.ThrowIfNull(rawMessage);
_mu.EnterWriteLock();
try
{
if (_nextMsgReqs is null)
return;
_nextMsgReqs.Enqueue(NewNextMsgReq(reply, (byte[])rawMessage.Clone()));
}
finally
{
_mu.ExitWriteLock();
}
}
internal (ulong Sequence, bool CanRespond, Exception? Error) ProcessResetReq(string reply, byte[] rawMessage)
{
if (string.IsNullOrWhiteSpace(reply))
return (0, false, null);
ArgumentNullException.ThrowIfNull(rawMessage);
if (rawMessage.Length == 0)
return ResetStartingSeq(0, reply);
try
{
var request = JsonSerializer.Deserialize<JsApiConsumerResetRequest>(Encoding.UTF8.GetString(rawMessage));
return ResetStartingSeq(request?.Seq ?? 0, reply);
}
catch (Exception ex)
{
return (0, false, ex);
}
}
internal bool ProcessNextMsgRequest(string reply, byte[] message)
{
if (string.IsNullOrWhiteSpace(reply))
return false;
ArgumentNullException.ThrowIfNull(message);
_mu.EnterWriteLock();
try
{
if (IsPushMode() || _waiting is null)
return false;
var (request, error) = NextReqFromMsg(message);
if (error is not null || request is null)
return false;
var batchSize = Math.Max(1, request.Batch);
if (Config.MaxRequestBatch > 0 && batchSize > Config.MaxRequestBatch)
return false;
if (request.MaxBytes > 0 && Config.MaxRequestMaxBytes > 0 && request.MaxBytes > Config.MaxRequestMaxBytes)
return false;
if (request.Expires > TimeSpan.Zero && Config.MaxRequestExpires > TimeSpan.Zero && request.Expires > Config.MaxRequestExpires)
return false;
var expires = request.Expires > TimeSpan.Zero ? DateTime.UtcNow.Add(request.Expires) : (DateTime?)null;
if (_waiting.IsFull(Config.MaxWaiting))
return false;
var waitingRequest = new WaitingRequest
{
Subject = reply,
Reply = reply,
N = batchSize,
D = 0,
NoWait = request.NoWait ? 1 : 0,
Expires = expires,
MaxBytes = Math.Max(0, request.MaxBytes),
B = 0,
PriorityGroup = request.Priority,
};
if (Config.PriorityPolicy == PriorityPolicy.PriorityPrioritized)
{
if (!_waiting.AddPrioritized(waitingRequest))
return false;
}
else
{
_waiting.Add(waitingRequest);
}
SignalNewMessages();
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
}
internal sealed class NextMsgReq
{
private static readonly ConcurrentBag<NextMsgReq> Pool = [];
internal string Reply { get; private set; } = string.Empty;
internal byte[] Message { get; private set; } = [];
internal static NextMsgReq Rent(string reply, byte[] message)
{
ArgumentNullException.ThrowIfNull(message);
if (!Pool.TryTake(out var request))
request = new NextMsgReq();
request.Reply = reply;
request.Message = message;
return request;
}
internal void ReturnToPool()
{
Reply = string.Empty;
Message = [];
Pool.Add(this);
}
}

View File

@@ -0,0 +1,156 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal void DidNotDeliver(ulong sequence, string subject)
{
_mu.EnterWriteLock();
try
{
DecDeliveryCount(sequence);
if (IsPushMode())
_hasLocalDeliveryInterest = false;
else
CreditWaitingRequest(subject);
if (_state.Pending is { } pending && pending.ContainsKey(sequence) && !OnRedeliverQueue(sequence))
{
AddToRedeliverQueue(sequence);
if (_waiting is { } waiting && !waiting.IsEmpty())
SignalNewMessages();
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal void AddToRedeliverQueue(params ulong[] sequences)
{
_mu.EnterWriteLock();
try
{
foreach (var sequence in sequences)
{
_redeliveryQueue.Enqueue(sequence);
_redeliveryIndex.Add(sequence);
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool HasRedeliveries() => _redeliveryQueue.Count > 0;
internal ulong GetNextToRedeliver()
{
_mu.EnterWriteLock();
try
{
if (_redeliveryQueue.Count == 0)
return 0;
var sequence = _redeliveryQueue.Dequeue();
_redeliveryIndex.Remove(sequence);
return sequence;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool OnRedeliverQueue(ulong sequence) => _redeliveryIndex.Contains(sequence);
internal bool RemoveFromRedeliverQueue(ulong sequence)
{
_mu.EnterWriteLock();
try
{
if (!_redeliveryIndex.Remove(sequence))
return false;
if (_redeliveryQueue.Count == 0)
return true;
var retained = _redeliveryQueue.Where(s => s != sequence).ToArray();
_redeliveryQueue.Clear();
foreach (var s in retained)
_redeliveryQueue.Enqueue(s);
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal int CheckPending()
{
_mu.EnterWriteLock();
try
{
if (_state.Pending is not { Count: > 0 })
return 0;
var expired = 0;
var cutoff = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - (long)Math.Max(1, Config.AckWait.TotalMilliseconds);
var toRedeliver = new List<ulong>();
foreach (var (sequence, pending) in _state.Pending.ToArray())
{
if (pending.Timestamp < cutoff)
{
toRedeliver.Add(sequence);
expired++;
}
}
if (toRedeliver.Count > 0)
{
toRedeliver.Sort();
AddToRedeliverQueue(toRedeliver.ToArray());
SignalNewMessages();
}
return expired;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ulong SeqFromReply(string reply)
{
var (_, deliverySequence, _) = AckReplyInfo(reply);
return deliverySequence;
}
internal ulong StreamSeqFromReply(string reply)
{
var (streamSequence, _, _) = AckReplyInfo(reply);
return streamSequence;
}
internal static long ParseAckReplyNum(string token)
{
if (string.IsNullOrWhiteSpace(token))
return -1;
long number = 0;
foreach (var character in token)
{
if (!char.IsAsciiDigit(character))
return -1;
number = (number * 10) + (character - '0');
}
return number;
}
}

View File

@@ -0,0 +1,118 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal static (ulong StreamSequence, ulong DeliverySequence, ulong DeliveryCount, long Timestamp, ulong Pending) ReplyInfo(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return (0, 0, 0, 0, 0);
var tokens = subject.Split('.', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 9 || !string.Equals(tokens[0], "$JS", StringComparison.Ordinal) || !string.Equals(tokens[1], "ACK", StringComparison.Ordinal))
return (0, 0, 0, 0, 0);
var deliveryCount = (ulong)Math.Max(0, ParseAckReplyNum(tokens[4]));
var streamSequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[5]));
var deliverySequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[6]));
var timestamp = ParseAckReplyNum(tokens[7]);
var pending = (ulong)Math.Max(0, ParseAckReplyNum(tokens[8]));
return (streamSequence, deliverySequence, deliveryCount, timestamp, pending);
}
internal ulong NextSeq()
{
_mu.EnterReadLock();
try
{
return _state.Delivered.Consumer + 1;
}
finally
{
_mu.ExitReadLock();
}
}
internal bool HasSkipListPending() => false;
internal void SelectStartingSeqNo()
{
_mu.EnterWriteLock();
try
{
var start = Config.OptStartSeq > 0 ? Config.OptStartSeq : 1UL;
_state.Delivered = new SequencePair { Consumer = 1, Stream = start };
_state.AckFloor = new SequencePair { Consumer = 0, Stream = start > 0 ? start - 1 : 0 };
_npc = 0;
_npf = _state.AckFloor.Stream;
}
finally
{
_mu.ExitWriteLock();
}
}
internal static bool IsDurableConsumer(ConsumerConfig? config) =>
config is not null && !string.IsNullOrWhiteSpace(config.Durable);
internal bool IsDurable() => !string.IsNullOrWhiteSpace(Config.Durable);
internal string String() => Name;
internal static string CreateConsumerName() => Guid.NewGuid().ToString("N")[..12];
internal NatsStream? GetStream()
{
_mu.EnterReadLock();
try
{
return _streamRef;
}
finally
{
_mu.ExitReadLock();
}
}
internal string StreamName() => GetStream()?.Name ?? string.Empty;
internal bool IsActive()
{
_mu.EnterReadLock();
try
{
return !_closed && (_hasLocalDeliveryInterest || IsPullMode());
}
finally
{
_mu.ExitReadLock();
}
}
internal bool HasNoLocalInterest() => !HasDeliveryInterest(localInterest: true);
internal void Purge()
{
_mu.EnterWriteLock();
try
{
_state.Pending?.Clear();
_state.Redelivered?.Clear();
_redeliveryQueue.Clear();
_redeliveryIndex.Clear();
_npc = 0;
}
finally
{
_mu.ExitWriteLock();
}
}
internal static Timer? StopAndClearTimer(Timer? timer)
{
timer?.Dispose();
return null;
}
internal void DeleteWithoutAdvisory() => Stop();
}

View File

@@ -0,0 +1,222 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal void StopWithFlags(bool clearPending, bool clearAdvisories)
{
_mu.EnterWriteLock();
try
{
_closed = true;
_quitCts?.Cancel();
_deleteTimer = StopAndClearTimer(_deleteTimer);
_pendingTimer = StopAndClearTimer(_pendingTimer);
if (clearPending)
ResetPendingDeliveries();
if (!clearAdvisories)
_ = SendDeleteAdvisoryLocked();
}
finally
{
_mu.ExitWriteLock();
}
}
internal int CleanupNoInterestMessages()
{
_mu.EnterWriteLock();
try
{
if (_state.Pending is not { Count: > 0 })
return 0;
var removed = _state.Pending.Count;
_state.Pending.Clear();
_streamPending = 0;
return removed;
}
finally
{
_mu.ExitWriteLock();
}
}
internal static bool DeliveryFormsCycle(string subject, string deliverSubject) =>
!string.IsNullOrWhiteSpace(subject) &&
!string.IsNullOrWhiteSpace(deliverSubject) &&
subject.StartsWith(deliverSubject, StringComparison.Ordinal);
internal bool SwitchToEphemeral()
{
_mu.EnterWriteLock();
try
{
if (string.IsNullOrWhiteSpace(Config.Durable))
return false;
Config.Durable = null;
Name = CreateConsumerName();
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal string RequestNextMsgSubject() =>
$"$JS.API.CONSUMER.MSG.NEXT.{Stream}.{Name}";
internal long DecStreamPending()
{
_mu.EnterWriteLock();
try
{
_streamPending = Math.Max(0, _streamPending - 1);
return _streamPending;
}
finally
{
_mu.ExitWriteLock();
}
}
internal Account? Account() => GetStream()?.Account;
internal void SignalSubs() => SignalNewMessages();
internal bool ProcessStreamSignal(string subject, ulong sequence)
{
_ = subject;
_mu.EnterWriteLock();
try
{
if (_closed)
return false;
_state.Delivered.Stream = Math.Max(_state.Delivered.Stream, sequence);
SignalNewMessages();
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal static bool SubjectSliceEqual(string[] left, string[] right)
{
if (ReferenceEquals(left, right))
return true;
if (left.Length != right.Length)
return false;
for (var i = 0; i < left.Length; i++)
{
if (!string.Equals(left[i], right[i], StringComparison.Ordinal))
return false;
}
return true;
}
internal static string[] GatherSubjectFilters(ConsumerConfig config)
{
ArgumentNullException.ThrowIfNull(config);
if (config.FilterSubjects is { Length: > 0 })
return config.FilterSubjects.Where(s => !string.IsNullOrWhiteSpace(s)).ToArray();
if (!string.IsNullOrWhiteSpace(config.FilterSubject))
return [config.FilterSubject!];
return [];
}
internal bool ShouldStartMonitor()
{
_mu.EnterReadLock();
try
{
return !_closed && !_monitorRunning && (Config.InactiveThreshold > TimeSpan.Zero || IsPushMode());
}
finally
{
_mu.ExitReadLock();
}
}
internal void ClearMonitorRunning()
{
_mu.EnterWriteLock();
try
{
_monitorRunning = false;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool IsMonitorRunning()
{
_mu.EnterReadLock();
try
{
return _monitorRunning;
}
finally
{
_mu.ExitReadLock();
}
}
internal bool CheckStateForInterestStream()
{
_mu.EnterReadLock();
try
{
return _state.Pending is { Count: > 0 } || HasDeliveryInterest();
}
finally
{
_mu.ExitReadLock();
}
}
internal void ResetPtmr(TimeSpan due)
{
_mu.EnterWriteLock();
try
{
_pendingTimer ??= new Timer(static s => ((NatsConsumer)s!).CheckPending(), this, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
if (due <= TimeSpan.Zero)
due = TimeSpan.FromMilliseconds(1);
_pendingTimer.Change(due, Timeout.InfiniteTimeSpan);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void StopAndClearPtmr()
{
_mu.EnterWriteLock();
try
{
_pendingTimer = StopAndClearTimer(_pendingTimer);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ResetPendingDeliveries()
{
_state.Pending?.Clear();
_state.Redelivered?.Clear();
_redeliveryQueue.Clear();
_redeliveryIndex.Clear();
_npc = 0;
}
}

View File

@@ -0,0 +1,261 @@
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsConsumer
{
internal static void InsertAtPosition(WaitingRequest request, WaitQueue waitQueue)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(waitQueue);
waitQueue.InsertSorted(request);
}
internal Dictionary<string, WaitingRequest> PendingRequests()
{
_mu.EnterReadLock();
try
{
if (_waiting is null || _waiting.IsEmpty())
return [];
var requests = new Dictionary<string, WaitingRequest>(StringComparer.Ordinal);
foreach (var waitingRequest in _waiting.Snapshot())
{
if (!string.IsNullOrWhiteSpace(waitingRequest.Reply))
requests[waitingRequest.Reply] = waitingRequest;
}
return requests;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetPinnedTimer(string priorityGroup)
{
if (Config.PinnedTTL <= TimeSpan.Zero)
return;
_pinnedTtlTimer ??= new Timer(
state =>
{
var consumer = (NatsConsumer)state!;
consumer._mu.EnterWriteLock();
try
{
if (string.IsNullOrEmpty(consumer._currentPinId))
return;
consumer.UnassignPinId();
consumer.SendUnpinnedAdvisoryLocked(priorityGroup);
}
finally
{
consumer._mu.ExitWriteLock();
}
consumer.SignalNewMessages();
},
this,
Timeout.InfiniteTimeSpan,
Timeout.InfiniteTimeSpan);
_pinnedTtlTimer.Change(Config.PinnedTTL, Timeout.InfiniteTimeSpan);
}
internal void AssignNewPinId(WaitingRequest request)
{
ArgumentNullException.ThrowIfNull(request);
if (request.PriorityGroup is not { Group.Length: > 0 } priorityGroup)
return;
_currentPinId = Guid.NewGuid().ToString("N");
_pinnedTs = DateTime.UtcNow;
priorityGroup.Id = _currentPinId;
SetPinnedTimer(priorityGroup.Group);
SendPinnedAdvisoryLocked(priorityGroup.Group);
}
internal void UnassignPinId()
{
_currentPinId = string.Empty;
_pinnedTs = DateTime.UnixEpoch;
_pinnedTtlTimer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
}
internal WaitingRequest? NextWaiting(int messageSize)
{
_mu.EnterWriteLock();
try
{
if (_waiting is null || _waiting.IsEmpty())
return null;
var numCycled = 0;
while (!_waiting.IsEmpty())
{
var waitingRequest = _waiting.Peek();
if (waitingRequest is null)
return null;
if (waitingRequest.Expires is DateTime expiresAt && expiresAt <= DateTime.UtcNow)
{
_waiting.RemoveCurrent();
waitingRequest.Recycle();
continue;
}
if (waitingRequest.MaxBytes > 0)
{
if (messageSize > waitingRequest.MaxBytes)
{
_waiting.RemoveCurrent();
waitingRequest.Recycle();
continue;
}
waitingRequest.MaxBytes -= messageSize;
if (waitingRequest.MaxBytes == 0)
waitingRequest.N = 1;
}
if (Config.PriorityPolicy == PriorityPolicy.PriorityPinnedClient)
{
if (string.IsNullOrEmpty(_currentPinId))
{
if (string.IsNullOrEmpty(waitingRequest.PriorityGroup?.Id))
AssignNewPinId(waitingRequest);
}
else if (waitingRequest.PriorityGroup is { } priorityGroup)
{
if (string.Equals(priorityGroup.Id, _currentPinId, StringComparison.Ordinal))
{
// Matched the active pin, continue.
}
else if (string.IsNullOrEmpty(priorityGroup.Id))
{
_waiting.Cycle();
numCycled++;
if (numCycled >= _waiting.Len)
return null;
continue;
}
else
{
_waiting.RemoveCurrent();
waitingRequest.Recycle();
continue;
}
}
}
return _waiting.PopOrPopAndRequeue(Config.PriorityPolicy);
}
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal static (Account Account, string Interest) TrackDownAccountAndInterest(Account account, string interest)
{
ArgumentNullException.ThrowIfNull(account);
return (account, interest);
}
internal ulong DeliveryCount(ulong streamSequence)
{
_mu.EnterReadLock();
try
{
if (_rdc is null)
return 1;
return _rdc.TryGetValue(streamSequence, out var count) && count >= 1 ? count : 1;
}
finally
{
_mu.ExitReadLock();
}
}
internal ulong IncDeliveryCount(ulong streamSequence)
{
_mu.EnterWriteLock();
try
{
_rdc ??= [];
_rdc[streamSequence] = _rdc.GetValueOrDefault(streamSequence) + 1;
return _rdc[streamSequence] + 1;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void DecDeliveryCount(ulong streamSequence)
{
_mu.EnterWriteLock();
try
{
_rdc ??= [];
_rdc[streamSequence] = _rdc.GetValueOrDefault(streamSequence) - 1;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void NotifyDeliveryExceeded(ulong streamSequence, ulong deliveryCount)
{
var advisory = new
{
Stream,
Consumer = Name,
StreamSeq = streamSequence,
Deliveries = deliveryCount,
Timestamp = DateTime.UtcNow,
};
_ = SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerMaxDelivery}.{Stream}.{Name}", advisory);
}
internal bool IsFilteredMatch(string subject)
{
if (Config.FilterSubjects is not { Length: > 0 } && string.IsNullOrWhiteSpace(Config.FilterSubject))
return true;
if (!string.IsNullOrWhiteSpace(Config.FilterSubject) && SubjectIsExactOrSubsetMatch(subject, Config.FilterSubject!))
return true;
if (Config.FilterSubjects is not { Length: > 0 })
return false;
return Config.FilterSubjects.Any(filter => SubjectIsExactOrSubsetMatch(subject, filter));
}
internal bool IsEqualOrSubsetMatch(string subject)
{
if (Config.FilterSubjects is not { Length: > 0 } && string.IsNullOrWhiteSpace(Config.FilterSubject))
return false;
if (!string.IsNullOrWhiteSpace(Config.FilterSubject) && SubjectIsExactOrSubsetMatch(Config.FilterSubject!, subject))
return true;
if (Config.FilterSubjects is not { Length: > 0 })
return false;
return Config.FilterSubjects.Any(filter => SubjectIsExactOrSubsetMatch(filter, subject));
}
private static bool SubjectIsExactOrSubsetMatch(string subject, string filter) =>
string.Equals(subject, filter, StringComparison.Ordinal) || SubscriptionIndex.SubjectIsSubsetMatch(subject, filter);
}

View File

@@ -34,7 +34,12 @@ internal sealed partial class NatsConsumer
if (!interest && _deleteThreshold > TimeSpan.Zero)
{
_deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, _deleteThreshold, Timeout.InfiniteTimeSpan);
var due = _deleteThreshold < TimeSpan.FromMilliseconds(50)
? TimeSpan.FromMilliseconds(1)
: _deleteThreshold;
_deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, due, Timeout.InfiniteTimeSpan);
if (due <= TimeSpan.FromMilliseconds(1))
DeleteNotActive();
return true;
}

View File

@@ -52,6 +52,25 @@ internal sealed partial class NatsConsumer : IDisposable
private readonly Dictionary<ulong, string> _ackReplies = new();
private readonly Dictionary<string, DateTime> _clusterPendingRequests = new(StringComparer.Ordinal);
private bool _pendingRequestsOk;
private WaitQueue? _waiting;
private Queue<NextMsgReq>? _nextMsgReqs;
private Timer? _pinnedTtlTimer;
private string _currentPinId = string.Empty;
private DateTime _pinnedTs;
private Dictionary<ulong, ulong>? _rdc;
private long _npc;
private ulong _npf;
private int _maxPendingBytesLimit;
private int _maxPendingBytesThreshold;
private long _pendingBytes;
private int _flowControlWindow;
private int _flowControlSentBytes;
private string _flowControlReplyId = string.Empty;
private readonly Queue<ulong> _redeliveryQueue = new();
private readonly HashSet<ulong> _redeliveryIndex = new();
private bool _monitorRunning;
private long _streamPending;
private Timer? _pendingTimer;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;
@@ -66,6 +85,8 @@ internal sealed partial class NatsConsumer : IDisposable
Config = config;
Created = created;
_quitCts = new CancellationTokenSource();
_waiting = WaitQueue.NewWaitQueue(Math.Max(0, config.MaxWaiting));
_nextMsgReqs = IsPullMode() ? new Queue<NextMsgReq>() : null;
}
// -------------------------------------------------------------------------

View File

@@ -227,6 +227,20 @@ internal sealed partial class NatsStream
}
}
internal Exception? DeleteConsumer(NatsConsumer consumer)
{
ArgumentNullException.ThrowIfNull(consumer);
lock (_consumersSync)
{
_consumers.Remove(consumer.Name);
_consumerList.RemoveAll(c => ReferenceEquals(c, consumer));
}
consumer.DeleteWithoutAdvisory();
return null;
}
internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters)
{
_ = consumer;

View File

@@ -706,6 +706,20 @@ public sealed class WaitQueue
return Len >= max;
}
/// <summary>
/// Returns an ordered snapshot of active queue entries from head to tail.
/// </summary>
public IReadOnlyList<WaitingRequest> Snapshot()
{
if (Len == 0)
return [];
var snapshot = new List<WaitingRequest>(Len);
for (var i = _head; i < _tail; i++)
snapshot.Add(_reqs[i]);
return snapshot;
}
private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue;
public static WaitQueue NewWaitQueue(int max = 0) => new(max);