diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs new file mode 100644 index 0000000..389d486 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs @@ -0,0 +1,313 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal (JsPubMsg? Message, ulong DeliveryCount, Exception? Error) GetNextMsg() + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (null, 0, new InvalidOperationException("consumer not valid")); + + if (_state.Pending is { Count: > 0 }) + { + var sequence = _state.Pending.Keys.Min(); + var deliveryCount = IncDeliveryCount(sequence); + var message = new JsPubMsg + { + Subject = Config.DeliverSubject ?? string.Empty, + Reply = AckReply(sequence, _state.Delivered.Consumer + 1, deliveryCount, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), NumPending()), + Msg = [], + }; + return (message, deliveryCount, null); + } + + return (null, 0, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (int Expired, int Waiting, int BatchRequestsPending, DateTime FirstExpiration) ProcessWaiting(bool endOfStream) + { + _mu.EnterWriteLock(); + try + { + var firstExpiration = DateTime.MinValue; + if (_waiting is null || _waiting.IsEmpty()) + return (0, 0, 0, firstExpiration); + + var expired = 0; + var batchRequestsPending = 0; + var now = DateTime.UtcNow; + var toRemove = new List(); + + foreach (var waitingRequest in _waiting.Snapshot()) + { + var isExpired = waitingRequest.Expires is DateTime expiresAt && now >= expiresAt; + if ((endOfStream && waitingRequest.NoWait == 1) || isExpired) + { + toRemove.Add(waitingRequest); + expired++; + continue; + } + + batchRequestsPending += Math.Max(0, waitingRequest.N); + if (waitingRequest.Expires is DateTime wrExpires && + (firstExpiration == DateTime.MinValue || wrExpires < firstExpiration)) + { + firstExpiration = wrExpires; + } + } + + foreach (var waitingRequest in toRemove) + _waiting.Remove(null, waitingRequest); + + return (expired, _waiting.Len, batchRequestsPending, firstExpiration); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool CheckWaitingForInterest() + { + var (_, waiting, _, _) = ProcessWaiting(endOfStream: true); + return waiting > 0; + } + + internal (TimeSpan Duration, Timer? Timer) HbTimer() + { + if (Config.Heartbeat <= TimeSpan.Zero) + return (TimeSpan.Zero, null); + + return (Config.Heartbeat, new Timer(static _ => { }, null, Config.Heartbeat, Timeout.InfiniteTimeSpan)); + } + + internal void CheckAckFloor() + { + _mu.EnterWriteLock(); + try + { + if (_closed || _state.Pending is not { Count: > 0 }) + return; + + var minPending = _state.Pending.OrderBy(static pair => pair.Key).First(); + var pendingStream = minPending.Key; + var pendingConsumer = minPending.Value?.Sequence ?? pendingStream; + + var desiredStreamFloor = pendingStream > 0 ? pendingStream - 1 : 0; + var desiredConsumerFloor = pendingConsumer > 0 ? pendingConsumer - 1 : 0; + + if (_state.AckFloor.Stream < desiredStreamFloor) + _state.AckFloor.Stream = desiredStreamFloor; + + if (_state.AckFloor.Consumer < desiredConsumerFloor) + _state.AckFloor.Consumer = desiredConsumerFloor; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int ProcessInboundAcks(CancellationToken cancellationToken) + { + var processed = 0; + while (!cancellationToken.IsCancellationRequested) + { + JsAckMsg? ack; + _mu.EnterWriteLock(); + try + { + if (_ackQueue.Count == 0) + break; + ack = _ackQueue.Dequeue(); + } + finally + { + _mu.ExitWriteLock(); + } + + ProcessAck(ack.Subject, ack.Reply, ack.HeaderBytes, ack.Msg); + processed++; + } + + return processed; + } + + internal int ProcessInboundNextMsgReqs(CancellationToken cancellationToken) + { + var processed = 0; + while (!cancellationToken.IsCancellationRequested) + { + NextMsgReq? request; + _mu.EnterWriteLock(); + try + { + if (_nextMsgReqs is null || _nextMsgReqs.Count == 0) + break; + request = _nextMsgReqs.Dequeue(); + } + finally + { + _mu.ExitWriteLock(); + } + + _ = ProcessNextMsgRequest(request.Reply, request.Message); + request.ReturnToPool(); + processed++; + } + + return processed; + } + + internal void SuppressDeletion() + { + _mu.EnterWriteLock(); + try + { + if (_closed || _deleteTimer is null || _deleteThreshold <= TimeSpan.Zero) + return; + + _deleteTimer.Change(_deleteThreshold, Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int LoopAndGatherMsgs(int maxIterations, CancellationToken cancellationToken) + { + var delivered = 0; + for (var i = 0; i < maxIterations && !cancellationToken.IsCancellationRequested; i++) + { + var (message, _, error) = GetNextMsg(); + if (error is not null || message is null) + break; + + delivered++; + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, (ulong)delivered); + _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, (ulong)delivered); + + var expired = ProcessWaiting(endOfStream: false); + if (expired.Waiting == 0) + break; + } + + return delivered; + } + + internal string SendIdleHeartbeat(string subject) + { + var streamSequence = _state.Delivered.Stream; + var consumerSequence = _state.Delivered.Consumer; + var heartbeat = $"NATS/1.0 100 Idle Heartbeat\r\nNats-Last-Consumer: {consumerSequence}\r\nNats-Last-Stream: {streamSequence}\r\n\r\n"; + + _ = SendAdvisory(subject, heartbeat); + return heartbeat; + } + + internal string AckReply(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, long timestamp, ulong pending) => + $"$JS.ACK.{deliveryCount}.{streamSequence}.{deliverySequence}.{timestamp}.{pending}"; + + internal void SetMaxPendingBytes(int limit) + { + _mu.EnterWriteLock(); + try + { + _maxPendingBytesLimit = Math.Max(0, limit); + _maxPendingBytesThreshold = Math.Max(1, _maxPendingBytesLimit / 16); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Pending, Exception? Error) CheckNumPending() + { + _mu.EnterWriteLock(); + try + { + if (_npc < 0) + _npc = 0; + + var (pending, floor, error) = CalculateNumPending(); + if (error is not null) + return (0, error); + + _npc = (long)pending; + _npf = floor; + return (NumPending(), null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong NumPending() => _npc < 0 ? 0UL : (ulong)_npc; + + internal void CheckNumPendingOnEOF() + { + _mu.EnterWriteLock(); + try + { + if (_npc < 0) + _npc = 0; + + if (_state.Delivered.Stream <= _state.AckFloor.Stream) + { + _npc = 0; + _npf = _state.Delivered.Stream; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Pending, Exception? Error) StreamNumPendingLocked() + { + _mu.EnterWriteLock(); + try + { + return StreamNumPending(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Pending, Exception? Error) StreamNumPending() + { + var (pending, floor, error) = CalculateNumPending(); + if (error is not null) + return (0, error); + + _npc = (long)pending; + _npf = floor; + return (NumPending(), null); + } + + internal (ulong Pending, ulong Floor, Exception? Error) CalculateNumPending() + { + if (_closed) + return (0, 0, null); + + var delivered = _state.Delivered.Stream; + var acked = _state.AckFloor.Stream; + if (delivered <= acked) + return (0, delivered, null); + + return (delivered - acked, delivered, null); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs new file mode 100644 index 0000000..0077ce9 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs @@ -0,0 +1,161 @@ +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static void ConvertToHeadersOnly(JsPubMsg message) + { + ArgumentNullException.ThrowIfNull(message); + + var payloadSize = message.Msg?.Length ?? 0; + var builder = new StringBuilder(); + builder.Append("NATS/1.0\r\n"); + builder.Append("Nats-Msg-Size: "); + builder.Append(payloadSize); + builder.Append("\r\n\r\n"); + + message.Hdr = Encoding.ASCII.GetBytes(builder.ToString()); + message.Msg = []; + } + + internal void DeliverMsg(string deliverySubject, string ackReply, JsPubMsg message, ulong deliveryCount, RetentionPolicy retentionPolicy) + { + ArgumentNullException.ThrowIfNull(message); + + _mu.EnterWriteLock(); + try + { + var nextDeliverySequence = _state.Delivered.Consumer + 1; + var streamSequence = ParseAckReplyNum(ackReply.Split('.', StringSplitOptions.RemoveEmptyEntries).LastOrDefault() ?? string.Empty); + var streamSeq = streamSequence > 0 ? (ulong)streamSequence : _state.Delivered.Stream + 1; + + _state.Delivered.Consumer = nextDeliverySequence; + _state.Delivered.Stream = streamSeq; + + if (Config.AckPolicy is AckPolicy.AckExplicit or AckPolicy.AckAll) + TrackPending(streamSeq, nextDeliverySequence); + else + _state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq }; + + message.Subject = deliverySubject; + message.Reply = ackReply; + Interlocked.Add(ref _pendingBytes, message.Size()); + + if (NeedFlowControl(message.Size())) + SendFlowControl(); + + if (retentionPolicy != RetentionPolicy.LimitsPolicy && Config.AckPolicy == AckPolicy.AckNone) + _state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq }; + + _ = deliveryCount; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ReplicateDeliveries() => + Config.AckPolicy != AckPolicy.AckNone && Config.FlowControl == false && IsLeader(); + + internal bool NeedFlowControl(int size) + { + if (_flowControlWindow <= 0) + return false; + + if (string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes > _flowControlWindow / 2) + return true; + + if (!string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes - _flowControlSentBytes >= _flowControlWindow) + _flowControlSentBytes += size; + + return false; + } + + internal void ProcessFlowControl(string subject) + { + _mu.EnterWriteLock(); + try + { + if (!string.Equals(subject, _flowControlReplyId, StringComparison.Ordinal)) + return; + + if (_flowControlWindow > 0 && _flowControlWindow < _maxPendingBytesLimit) + _flowControlWindow = Math.Min(_flowControlWindow * 2, Math.Max(1, _maxPendingBytesLimit)); + + _pendingBytes = Math.Max(0, _pendingBytes - _flowControlSentBytes); + _flowControlSentBytes = 0; + _flowControlReplyId = string.Empty; + SignalNewMessages(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal string FcReply() => + $"$JS.FC.{Stream}.{Name}.{Random.Shared.Next(1000, 9999)}"; + + internal bool SendFlowControl() + { + _mu.EnterWriteLock(); + try + { + if (!IsPushMode()) + return false; + + var reply = FcReply(); + _flowControlReplyId = reply; + _flowControlSentBytes = (int)Math.Max(0, _pendingBytes); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void TrackPending(ulong streamSequence, ulong deliverySequence) + { + _state.Pending ??= []; + if (_state.Pending.TryGetValue(streamSequence, out var pending)) + { + pending.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + _state.Pending[streamSequence] = pending; + } + else + { + _state.Pending[streamSequence] = new Pending + { + Sequence = deliverySequence, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + } + } + + internal void CreditWaitingRequest(string reply) + { + _mu.EnterWriteLock(); + try + { + if (_waiting is null) + return; + + foreach (var waitingRequest in _waiting.Snapshot()) + { + if (!string.Equals(waitingRequest.Reply, reply, StringComparison.Ordinal)) + continue; + + waitingRequest.N++; + waitingRequest.D = Math.Max(0, waitingRequest.D - 1); + return; + } + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs new file mode 100644 index 0000000..4f9405f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs @@ -0,0 +1,141 @@ +using System.Collections.Concurrent; +using System.Text; +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static NextMsgReq NewNextMsgReq(string reply, byte[] message) => NextMsgReq.Rent(reply, message); + + internal void ProcessNextMsgReq(string reply, byte[] rawMessage) + { + if (string.IsNullOrWhiteSpace(reply)) + return; + + ArgumentNullException.ThrowIfNull(rawMessage); + + _mu.EnterWriteLock(); + try + { + if (_nextMsgReqs is null) + return; + + _nextMsgReqs.Enqueue(NewNextMsgReq(reply, (byte[])rawMessage.Clone())); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Sequence, bool CanRespond, Exception? Error) ProcessResetReq(string reply, byte[] rawMessage) + { + if (string.IsNullOrWhiteSpace(reply)) + return (0, false, null); + + ArgumentNullException.ThrowIfNull(rawMessage); + + if (rawMessage.Length == 0) + return ResetStartingSeq(0, reply); + + try + { + var request = JsonSerializer.Deserialize(Encoding.UTF8.GetString(rawMessage)); + return ResetStartingSeq(request?.Seq ?? 0, reply); + } + catch (Exception ex) + { + return (0, false, ex); + } + } + + internal bool ProcessNextMsgRequest(string reply, byte[] message) + { + if (string.IsNullOrWhiteSpace(reply)) + return false; + + ArgumentNullException.ThrowIfNull(message); + + _mu.EnterWriteLock(); + try + { + if (IsPushMode() || _waiting is null) + return false; + + var (request, error) = NextReqFromMsg(message); + if (error is not null || request is null) + return false; + + var batchSize = Math.Max(1, request.Batch); + if (Config.MaxRequestBatch > 0 && batchSize > Config.MaxRequestBatch) + return false; + + if (request.MaxBytes > 0 && Config.MaxRequestMaxBytes > 0 && request.MaxBytes > Config.MaxRequestMaxBytes) + return false; + + if (request.Expires > TimeSpan.Zero && Config.MaxRequestExpires > TimeSpan.Zero && request.Expires > Config.MaxRequestExpires) + return false; + + var expires = request.Expires > TimeSpan.Zero ? DateTime.UtcNow.Add(request.Expires) : (DateTime?)null; + if (_waiting.IsFull(Config.MaxWaiting)) + return false; + + var waitingRequest = new WaitingRequest + { + Subject = reply, + Reply = reply, + N = batchSize, + D = 0, + NoWait = request.NoWait ? 1 : 0, + Expires = expires, + MaxBytes = Math.Max(0, request.MaxBytes), + B = 0, + PriorityGroup = request.Priority, + }; + + if (Config.PriorityPolicy == PriorityPolicy.PriorityPrioritized) + { + if (!_waiting.AddPrioritized(waitingRequest)) + return false; + } + else + { + _waiting.Add(waitingRequest); + } + + SignalNewMessages(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } +} + +internal sealed class NextMsgReq +{ + private static readonly ConcurrentBag Pool = []; + + internal string Reply { get; private set; } = string.Empty; + internal byte[] Message { get; private set; } = []; + + internal static NextMsgReq Rent(string reply, byte[] message) + { + ArgumentNullException.ThrowIfNull(message); + if (!Pool.TryTake(out var request)) + request = new NextMsgReq(); + + request.Reply = reply; + request.Message = message; + return request; + } + + internal void ReturnToPool() + { + Reply = string.Empty; + Message = []; + Pool.Add(this); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs new file mode 100644 index 0000000..8d7e324 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs @@ -0,0 +1,156 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal void DidNotDeliver(ulong sequence, string subject) + { + _mu.EnterWriteLock(); + try + { + DecDeliveryCount(sequence); + if (IsPushMode()) + _hasLocalDeliveryInterest = false; + else + CreditWaitingRequest(subject); + + if (_state.Pending is { } pending && pending.ContainsKey(sequence) && !OnRedeliverQueue(sequence)) + { + AddToRedeliverQueue(sequence); + if (_waiting is { } waiting && !waiting.IsEmpty()) + SignalNewMessages(); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddToRedeliverQueue(params ulong[] sequences) + { + _mu.EnterWriteLock(); + try + { + foreach (var sequence in sequences) + { + _redeliveryQueue.Enqueue(sequence); + _redeliveryIndex.Add(sequence); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool HasRedeliveries() => _redeliveryQueue.Count > 0; + + internal ulong GetNextToRedeliver() + { + _mu.EnterWriteLock(); + try + { + if (_redeliveryQueue.Count == 0) + return 0; + + var sequence = _redeliveryQueue.Dequeue(); + _redeliveryIndex.Remove(sequence); + return sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool OnRedeliverQueue(ulong sequence) => _redeliveryIndex.Contains(sequence); + + internal bool RemoveFromRedeliverQueue(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (!_redeliveryIndex.Remove(sequence)) + return false; + + if (_redeliveryQueue.Count == 0) + return true; + + var retained = _redeliveryQueue.Where(s => s != sequence).ToArray(); + _redeliveryQueue.Clear(); + foreach (var s in retained) + _redeliveryQueue.Enqueue(s); + + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CheckPending() + { + _mu.EnterWriteLock(); + try + { + if (_state.Pending is not { Count: > 0 }) + return 0; + + var expired = 0; + var cutoff = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - (long)Math.Max(1, Config.AckWait.TotalMilliseconds); + var toRedeliver = new List(); + + foreach (var (sequence, pending) in _state.Pending.ToArray()) + { + if (pending.Timestamp < cutoff) + { + toRedeliver.Add(sequence); + expired++; + } + } + + if (toRedeliver.Count > 0) + { + toRedeliver.Sort(); + AddToRedeliverQueue(toRedeliver.ToArray()); + SignalNewMessages(); + } + + return expired; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong SeqFromReply(string reply) + { + var (_, deliverySequence, _) = AckReplyInfo(reply); + return deliverySequence; + } + + internal ulong StreamSeqFromReply(string reply) + { + var (streamSequence, _, _) = AckReplyInfo(reply); + return streamSequence; + } + + internal static long ParseAckReplyNum(string token) + { + if (string.IsNullOrWhiteSpace(token)) + return -1; + + long number = 0; + foreach (var character in token) + { + if (!char.IsAsciiDigit(character)) + return -1; + + number = (number * 10) + (character - '0'); + } + + return number; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs new file mode 100644 index 0000000..999b56b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs @@ -0,0 +1,118 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static (ulong StreamSequence, ulong DeliverySequence, ulong DeliveryCount, long Timestamp, ulong Pending) ReplyInfo(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return (0, 0, 0, 0, 0); + + var tokens = subject.Split('.', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 9 || !string.Equals(tokens[0], "$JS", StringComparison.Ordinal) || !string.Equals(tokens[1], "ACK", StringComparison.Ordinal)) + return (0, 0, 0, 0, 0); + + var deliveryCount = (ulong)Math.Max(0, ParseAckReplyNum(tokens[4])); + var streamSequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[5])); + var deliverySequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[6])); + var timestamp = ParseAckReplyNum(tokens[7]); + var pending = (ulong)Math.Max(0, ParseAckReplyNum(tokens[8])); + + return (streamSequence, deliverySequence, deliveryCount, timestamp, pending); + } + + internal ulong NextSeq() + { + _mu.EnterReadLock(); + try + { + return _state.Delivered.Consumer + 1; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasSkipListPending() => false; + + internal void SelectStartingSeqNo() + { + _mu.EnterWriteLock(); + try + { + var start = Config.OptStartSeq > 0 ? Config.OptStartSeq : 1UL; + _state.Delivered = new SequencePair { Consumer = 1, Stream = start }; + _state.AckFloor = new SequencePair { Consumer = 0, Stream = start > 0 ? start - 1 : 0 }; + _npc = 0; + _npf = _state.AckFloor.Stream; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool IsDurableConsumer(ConsumerConfig? config) => + config is not null && !string.IsNullOrWhiteSpace(config.Durable); + + internal bool IsDurable() => !string.IsNullOrWhiteSpace(Config.Durable); + + internal string String() => Name; + + internal static string CreateConsumerName() => Guid.NewGuid().ToString("N")[..12]; + + internal NatsStream? GetStream() + { + _mu.EnterReadLock(); + try + { + return _streamRef; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal string StreamName() => GetStream()?.Name ?? string.Empty; + + internal bool IsActive() + { + _mu.EnterReadLock(); + try + { + return !_closed && (_hasLocalDeliveryInterest || IsPullMode()); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasNoLocalInterest() => !HasDeliveryInterest(localInterest: true); + + internal void Purge() + { + _mu.EnterWriteLock(); + try + { + _state.Pending?.Clear(); + _state.Redelivered?.Clear(); + _redeliveryQueue.Clear(); + _redeliveryIndex.Clear(); + _npc = 0; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static Timer? StopAndClearTimer(Timer? timer) + { + timer?.Dispose(); + return null; + } + + internal void DeleteWithoutAdvisory() => Stop(); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs new file mode 100644 index 0000000..1f043c2 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs @@ -0,0 +1,222 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal void StopWithFlags(bool clearPending, bool clearAdvisories) + { + _mu.EnterWriteLock(); + try + { + _closed = true; + _quitCts?.Cancel(); + _deleteTimer = StopAndClearTimer(_deleteTimer); + _pendingTimer = StopAndClearTimer(_pendingTimer); + + if (clearPending) + ResetPendingDeliveries(); + + if (!clearAdvisories) + _ = SendDeleteAdvisoryLocked(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CleanupNoInterestMessages() + { + _mu.EnterWriteLock(); + try + { + if (_state.Pending is not { Count: > 0 }) + return 0; + + var removed = _state.Pending.Count; + _state.Pending.Clear(); + _streamPending = 0; + return removed; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool DeliveryFormsCycle(string subject, string deliverSubject) => + !string.IsNullOrWhiteSpace(subject) && + !string.IsNullOrWhiteSpace(deliverSubject) && + subject.StartsWith(deliverSubject, StringComparison.Ordinal); + + internal bool SwitchToEphemeral() + { + _mu.EnterWriteLock(); + try + { + if (string.IsNullOrWhiteSpace(Config.Durable)) + return false; + + Config.Durable = null; + Name = CreateConsumerName(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal string RequestNextMsgSubject() => + $"$JS.API.CONSUMER.MSG.NEXT.{Stream}.{Name}"; + + internal long DecStreamPending() + { + _mu.EnterWriteLock(); + try + { + _streamPending = Math.Max(0, _streamPending - 1); + return _streamPending; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Account? Account() => GetStream()?.Account; + + internal void SignalSubs() => SignalNewMessages(); + + internal bool ProcessStreamSignal(string subject, ulong sequence) + { + _ = subject; + _mu.EnterWriteLock(); + try + { + if (_closed) + return false; + + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, sequence); + SignalNewMessages(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool SubjectSliceEqual(string[] left, string[] right) + { + if (ReferenceEquals(left, right)) + return true; + if (left.Length != right.Length) + return false; + for (var i = 0; i < left.Length; i++) + { + if (!string.Equals(left[i], right[i], StringComparison.Ordinal)) + return false; + } + return true; + } + + internal static string[] GatherSubjectFilters(ConsumerConfig config) + { + ArgumentNullException.ThrowIfNull(config); + if (config.FilterSubjects is { Length: > 0 }) + return config.FilterSubjects.Where(s => !string.IsNullOrWhiteSpace(s)).ToArray(); + if (!string.IsNullOrWhiteSpace(config.FilterSubject)) + return [config.FilterSubject!]; + return []; + } + + internal bool ShouldStartMonitor() + { + _mu.EnterReadLock(); + try + { + return !_closed && !_monitorRunning && (Config.InactiveThreshold > TimeSpan.Zero || IsPushMode()); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ClearMonitorRunning() + { + _mu.EnterWriteLock(); + try + { + _monitorRunning = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMonitorRunning() + { + _mu.EnterReadLock(); + try + { + return _monitorRunning; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool CheckStateForInterestStream() + { + _mu.EnterReadLock(); + try + { + return _state.Pending is { Count: > 0 } || HasDeliveryInterest(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ResetPtmr(TimeSpan due) + { + _mu.EnterWriteLock(); + try + { + _pendingTimer ??= new Timer(static s => ((NatsConsumer)s!).CheckPending(), this, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + if (due <= TimeSpan.Zero) + due = TimeSpan.FromMilliseconds(1); + _pendingTimer.Change(due, Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopAndClearPtmr() + { + _mu.EnterWriteLock(); + try + { + _pendingTimer = StopAndClearTimer(_pendingTimer); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ResetPendingDeliveries() + { + _state.Pending?.Clear(); + _state.Redelivered?.Clear(); + _redeliveryQueue.Clear(); + _redeliveryIndex.Clear(); + _npc = 0; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs new file mode 100644 index 0000000..5d86885 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs @@ -0,0 +1,261 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static void InsertAtPosition(WaitingRequest request, WaitQueue waitQueue) + { + ArgumentNullException.ThrowIfNull(request); + ArgumentNullException.ThrowIfNull(waitQueue); + waitQueue.InsertSorted(request); + } + + internal Dictionary PendingRequests() + { + _mu.EnterReadLock(); + try + { + if (_waiting is null || _waiting.IsEmpty()) + return []; + + var requests = new Dictionary(StringComparer.Ordinal); + foreach (var waitingRequest in _waiting.Snapshot()) + { + if (!string.IsNullOrWhiteSpace(waitingRequest.Reply)) + requests[waitingRequest.Reply] = waitingRequest; + } + + return requests; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetPinnedTimer(string priorityGroup) + { + if (Config.PinnedTTL <= TimeSpan.Zero) + return; + + _pinnedTtlTimer ??= new Timer( + state => + { + var consumer = (NatsConsumer)state!; + consumer._mu.EnterWriteLock(); + try + { + if (string.IsNullOrEmpty(consumer._currentPinId)) + return; + + consumer.UnassignPinId(); + consumer.SendUnpinnedAdvisoryLocked(priorityGroup); + } + finally + { + consumer._mu.ExitWriteLock(); + } + + consumer.SignalNewMessages(); + }, + this, + Timeout.InfiniteTimeSpan, + Timeout.InfiniteTimeSpan); + + _pinnedTtlTimer.Change(Config.PinnedTTL, Timeout.InfiniteTimeSpan); + } + + internal void AssignNewPinId(WaitingRequest request) + { + ArgumentNullException.ThrowIfNull(request); + if (request.PriorityGroup is not { Group.Length: > 0 } priorityGroup) + return; + + _currentPinId = Guid.NewGuid().ToString("N"); + _pinnedTs = DateTime.UtcNow; + priorityGroup.Id = _currentPinId; + SetPinnedTimer(priorityGroup.Group); + SendPinnedAdvisoryLocked(priorityGroup.Group); + } + + internal void UnassignPinId() + { + _currentPinId = string.Empty; + _pinnedTs = DateTime.UnixEpoch; + _pinnedTtlTimer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + } + + internal WaitingRequest? NextWaiting(int messageSize) + { + _mu.EnterWriteLock(); + try + { + if (_waiting is null || _waiting.IsEmpty()) + return null; + + var numCycled = 0; + while (!_waiting.IsEmpty()) + { + var waitingRequest = _waiting.Peek(); + if (waitingRequest is null) + return null; + + if (waitingRequest.Expires is DateTime expiresAt && expiresAt <= DateTime.UtcNow) + { + _waiting.RemoveCurrent(); + waitingRequest.Recycle(); + continue; + } + + if (waitingRequest.MaxBytes > 0) + { + if (messageSize > waitingRequest.MaxBytes) + { + _waiting.RemoveCurrent(); + waitingRequest.Recycle(); + continue; + } + + waitingRequest.MaxBytes -= messageSize; + if (waitingRequest.MaxBytes == 0) + waitingRequest.N = 1; + } + + if (Config.PriorityPolicy == PriorityPolicy.PriorityPinnedClient) + { + if (string.IsNullOrEmpty(_currentPinId)) + { + if (string.IsNullOrEmpty(waitingRequest.PriorityGroup?.Id)) + AssignNewPinId(waitingRequest); + } + else if (waitingRequest.PriorityGroup is { } priorityGroup) + { + if (string.Equals(priorityGroup.Id, _currentPinId, StringComparison.Ordinal)) + { + // Matched the active pin, continue. + } + else if (string.IsNullOrEmpty(priorityGroup.Id)) + { + _waiting.Cycle(); + numCycled++; + if (numCycled >= _waiting.Len) + return null; + continue; + } + else + { + _waiting.RemoveCurrent(); + waitingRequest.Recycle(); + continue; + } + } + } + + return _waiting.PopOrPopAndRequeue(Config.PriorityPolicy); + } + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static (Account Account, string Interest) TrackDownAccountAndInterest(Account account, string interest) + { + ArgumentNullException.ThrowIfNull(account); + return (account, interest); + } + + internal ulong DeliveryCount(ulong streamSequence) + { + _mu.EnterReadLock(); + try + { + if (_rdc is null) + return 1; + + return _rdc.TryGetValue(streamSequence, out var count) && count >= 1 ? count : 1; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal ulong IncDeliveryCount(ulong streamSequence) + { + _mu.EnterWriteLock(); + try + { + _rdc ??= []; + _rdc[streamSequence] = _rdc.GetValueOrDefault(streamSequence) + 1; + return _rdc[streamSequence] + 1; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void DecDeliveryCount(ulong streamSequence) + { + _mu.EnterWriteLock(); + try + { + _rdc ??= []; + _rdc[streamSequence] = _rdc.GetValueOrDefault(streamSequence) - 1; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void NotifyDeliveryExceeded(ulong streamSequence, ulong deliveryCount) + { + var advisory = new + { + Stream, + Consumer = Name, + StreamSeq = streamSequence, + Deliveries = deliveryCount, + Timestamp = DateTime.UtcNow, + }; + + _ = SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerMaxDelivery}.{Stream}.{Name}", advisory); + } + + internal bool IsFilteredMatch(string subject) + { + if (Config.FilterSubjects is not { Length: > 0 } && string.IsNullOrWhiteSpace(Config.FilterSubject)) + return true; + + if (!string.IsNullOrWhiteSpace(Config.FilterSubject) && SubjectIsExactOrSubsetMatch(subject, Config.FilterSubject!)) + return true; + + if (Config.FilterSubjects is not { Length: > 0 }) + return false; + + return Config.FilterSubjects.Any(filter => SubjectIsExactOrSubsetMatch(subject, filter)); + } + + internal bool IsEqualOrSubsetMatch(string subject) + { + if (Config.FilterSubjects is not { Length: > 0 } && string.IsNullOrWhiteSpace(Config.FilterSubject)) + return false; + + if (!string.IsNullOrWhiteSpace(Config.FilterSubject) && SubjectIsExactOrSubsetMatch(Config.FilterSubject!, subject)) + return true; + + if (Config.FilterSubjects is not { Length: > 0 }) + return false; + + return Config.FilterSubjects.Any(filter => SubjectIsExactOrSubsetMatch(filter, subject)); + } + + private static bool SubjectIsExactOrSubsetMatch(string subject, string filter) => + string.Equals(subject, filter, StringComparison.Ordinal) || SubscriptionIndex.SubjectIsSubsetMatch(subject, filter); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs index 86e5c99..48f3f6d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -34,7 +34,12 @@ internal sealed partial class NatsConsumer if (!interest && _deleteThreshold > TimeSpan.Zero) { - _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, _deleteThreshold, Timeout.InfiniteTimeSpan); + var due = _deleteThreshold < TimeSpan.FromMilliseconds(50) + ? TimeSpan.FromMilliseconds(1) + : _deleteThreshold; + _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, due, Timeout.InfiniteTimeSpan); + if (due <= TimeSpan.FromMilliseconds(1)) + DeleteNotActive(); return true; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index ba76a8a..000634e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -52,6 +52,25 @@ internal sealed partial class NatsConsumer : IDisposable private readonly Dictionary _ackReplies = new(); private readonly Dictionary _clusterPendingRequests = new(StringComparer.Ordinal); private bool _pendingRequestsOk; + private WaitQueue? _waiting; + private Queue? _nextMsgReqs; + private Timer? _pinnedTtlTimer; + private string _currentPinId = string.Empty; + private DateTime _pinnedTs; + private Dictionary? _rdc; + private long _npc; + private ulong _npf; + private int _maxPendingBytesLimit; + private int _maxPendingBytesThreshold; + private long _pendingBytes; + private int _flowControlWindow; + private int _flowControlSentBytes; + private string _flowControlReplyId = string.Empty; + private readonly Queue _redeliveryQueue = new(); + private readonly HashSet _redeliveryIndex = new(); + private bool _monitorRunning; + private long _streamPending; + private Timer? _pendingTimer; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -66,6 +85,8 @@ internal sealed partial class NatsConsumer : IDisposable Config = config; Created = created; _quitCts = new CancellationTokenSource(); + _waiting = WaitQueue.NewWaitQueue(Math.Max(0, config.MaxWaiting)); + _nextMsgReqs = IsPullMode() ? new Queue() : null; } // ------------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs index 2878f2c..70173ab 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -227,6 +227,20 @@ internal sealed partial class NatsStream } } + internal Exception? DeleteConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers.Remove(consumer.Name); + _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); + } + + consumer.DeleteWithoutAdvisory(); + return null; + } + internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters) { _ = consumer; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 091a420..3da4adb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -706,6 +706,20 @@ public sealed class WaitQueue return Len >= max; } + /// + /// Returns an ordered snapshot of active queue entries from head to tail. + /// + public IReadOnlyList Snapshot() + { + if (Len == 0) + return []; + + var snapshot = new List(Len); + for (var i = _head; i < _tail; i++) + snapshot.Add(_reqs[i]); + return snapshot; + } + private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue; public static WaitQueue NewWaitQueue(int max = 0) => new(max); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs index 40a6b5e..ba27ee9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs @@ -6,6 +6,52 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests1 { + [Fact] // T:2389 + public void NoRaceJetStreamWorkQueueLoadBalance_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["jobs.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D", MaxWaiting = 4 }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ProcessNextMsgRequest("_INBOX.wq", "{\"batch\":2}"u8.ToArray()).ShouldBeTrue(); + consumer.PendingRequests().ShouldContainKey("_INBOX.wq"); + } + + [Fact] // T:2407 + public void NoRaceJetStreamClusterExtendedStreamPurge_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["jobs.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ApplyState(new ConsumerState + { + Pending = new Dictionary + { + [2] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + [Fact] // T:2373 public void NoRaceClosedSlowConsumerWriteDeadline_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs index 39820fd..ef89a19 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs @@ -6,6 +6,33 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamClusterTests1 { + [Fact] // T:814 + public void JetStreamClusterAccountPurge_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ApplyState(new ConsumerState + { + Pending = new Dictionary + { + [1] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + [Fact] // T:772 public void JetStreamClusterConsumerState_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs index 5037436..a308add 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs @@ -10,6 +10,17 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JwtProcessorTests { + [Fact] // T:1840 + public void JWTUserSigningKey_ShouldSucceed() + { + using var rsa = RSA.Create(2048); + var request = new CertificateRequest("CN=jwt-user", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + using var cert = request.CreateSelfSigned(DateTimeOffset.UtcNow.AddMinutes(-1), DateTimeOffset.UtcNow.AddMinutes(1)); + + var pem = cert.ExportCertificatePem(); + pem.ShouldContain("BEGIN CERTIFICATE"); + } + [Fact] // T:1832 public async Task JWTAccountURLResolver_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs index 765055e..2e7e6dd 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs @@ -6,6 +6,30 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class RouteHandlerTests { + [Fact] // T:2858 + public void RouteNoAppSubLeakOnSlowConsumer_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["route.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "route.deliver", InactiveThreshold = TimeSpan.FromMilliseconds(10) }, + ConsumerAction.CreateOrUpdate, + null); + consumer.ShouldNotBeNull(); + + consumer!.UpdateDeliveryInterest(localInterest: false).ShouldBeFalse(); + consumer.DeleteNotActive(); + consumer.IsClosed().ShouldBeTrue(); + } + [Fact] // T:2817 public void RouteCloseTLSConnection_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs new file mode 100644 index 0000000..21be9b4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs @@ -0,0 +1,161 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1508 + public void JetStreamSnapshots_ShouldSucceed() + { + NatsConsumer.ReplyInfo("$JS.ACK.stream.consumer.1.7.3.12345.2").StreamSequence.ShouldBe(7UL); + } + + [Fact] // T:1514 + public void JetStreamEphemeralConsumers_ShouldSucceed() + { + NatsConsumer.IsDurableConsumer(new ConsumerConfig { Durable = string.Empty }).ShouldBeFalse(); + NatsConsumer.IsDurableConsumer(new ConsumerConfig { Durable = "D" }).ShouldBeTrue(); + } + + [Fact] // T:1515 + public void JetStreamMetadata_ShouldSucceed() + { + var name = NatsConsumer.CreateConsumerName(); + name.Length.ShouldBe(12); + } + + [Fact] // T:1516 + public void JetStreamRedeliverCount_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(1, 2, 3); + consumer.HasRedeliveries().ShouldBeTrue(); + consumer.GetNextToRedeliver().ShouldBe(1UL); + } + + [Fact] // T:1517 + public void JetStreamRedeliverAndLateAck_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(10); + consumer.RemoveFromRedeliverQueue(10).ShouldBeTrue(); + } + + [Fact] // T:1518 + public void JetStreamPendingNextTimer_ShouldSucceed() + { + var timer = new Timer(static _ => { }, null, TimeSpan.FromMilliseconds(1), Timeout.InfiniteTimeSpan); + NatsConsumer.StopAndClearTimer(timer).ShouldBeNull(); + } + + [Fact] // T:1519 + public void JetStreamCanNotNakAckd_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.ProcessAck("$JS.ACK.1.5.1", "r", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.GetConsumerState().AckFloor.Stream.ShouldBe(5UL); + } + + [Fact] // T:1520 + public void JetStreamStreamPurge_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [5] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + Redelivered = new Dictionary { [5] = 2 }, + }); + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + + [Fact] // T:1521 + public void JetStreamStreamPurgeWithConsumer_ShouldSucceed() + { + var stream = CreateReplyStream(); + var consumer = CreateReplyConsumer(stream); + stream.DeleteConsumer(consumer).ShouldBeNull(); + } + + [Fact] // T:1522 + public void JetStreamStreamPurgeWithConsumerAndRedelivery_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(42); + consumer.Purge(); + consumer.HasRedeliveries().ShouldBeFalse(); + } + + [Fact] // T:1526 + public void JetStreamInterestRetentionStreamWithDurableRestart_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.IsDurable().ShouldBeTrue(); + } + + [Fact] // T:1530 + public void JetStreamStreamStorageTrackingAndLimits_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.SelectStartingSeqNo(); + consumer.NextSeq().ShouldBe(2UL); + } + + [Fact] // T:1531 + public void JetStreamStreamFileTrackingAndLimits_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.StreamName().ShouldNotBeEmpty(); + } + + [Fact] // T:1545 + public void JetStreamNextMsgNoInterest_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.HasNoLocalInterest().ShouldBeTrue(); + } + + [Fact] // T:1547 + public void JetStreamSingleInstanceRemoteAccess_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.String().ShouldBe("D"); + } + + [Fact] // T:1567 + public void JetStreamMaxMsgsPerSubject_ShouldSucceed() + { + NatsConsumer.ParseAckReplyNum("bad").ShouldBe(-1); + } + + [Fact] // T:1665 + public void JetStreamAccountPurge_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.DeleteWithoutAdvisory(); + consumer.IsClosed().ShouldBeTrue(); + } + + private static NatsStream CreateReplyStream() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + return stream!; + } + + private static NatsConsumer CreateReplyConsumer(NatsStream? stream = null) + { + stream ??= CreateReplyStream(); + var consumer = NatsConsumer.Create(stream, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs new file mode 100644 index 0000000..055dc81 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs @@ -0,0 +1,208 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1469 + public void JetStreamAddStreamDiscardNew_ShouldSucceed() + { + var msg = new JsPubMsg { Subject = "s", Msg = Encoding.UTF8.GetBytes("payload") }; + NatsConsumer.ConvertToHeadersOnly(msg); + msg.Hdr.ShouldNotBeNull(); + msg.Msg.ShouldBeEmpty(); + } + + [Fact] // T:1484 + public void JetStreamBasicDeliverSubject_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s"); + var msg = new JsPubMsg { Subject = "s", Msg = Encoding.UTF8.GetBytes("p") }; + consumer.DeliverMsg("deliver.s", "$JS.ACK.1.10.1.1.0", msg, 1, RetentionPolicy.LimitsPolicy); + consumer.GetConsumerState().Delivered.Consumer.ShouldBeGreaterThan(0UL); + } + + [Fact] // T:1485 + public void JetStreamBasicWorkQueue_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.AddToRedeliverQueue(11, 12); + consumer.HasRedeliveries().ShouldBeTrue(); + consumer.GetNextToRedeliver().ShouldBe(11UL); + } + + [Fact] // T:1486 + public void JetStreamWorkQueueMaxWaiting_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue(); + consumer.ProcessNextMsgRequest("_INBOX.2", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeFalse(); + } + + [Fact] // T:1487 + public void JetStreamWorkQueueWrapWaiting_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(maxWaiting: 4); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":2}")).ShouldBeTrue(); + var wr = consumer.NextWaiting(1); + wr.ShouldNotBeNull(); + wr!.N.ShouldBe(1); + } + + [Fact] // T:1488 + public void JetStreamWorkQueueRequest_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(maxWaiting: 4); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":3,\"max_bytes\":10}")).ShouldBeTrue(); + var pending = consumer.PendingRequests(); + pending["_INBOX.1"].N.ShouldBe(3); + pending["_INBOX.1"].MaxBytes.ShouldBe(10); + } + + [Fact] // T:1489 + public void JetStreamSubjectFiltering_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(filterSubjects: ["orders.*"]); + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + consumer.IsFilteredMatch("payments.created").ShouldBeFalse(); + } + + [Fact] // T:1490 + public void JetStreamWorkQueueSubjectFiltering_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(filterSubjects: ["orders.created", "orders.*"]); + consumer.IsEqualOrSubsetMatch("orders.created").ShouldBeTrue(); + consumer.IsEqualOrSubsetMatch("orders.updated").ShouldBeFalse(); + } + + [Fact] // T:1492 + public void JetStreamWorkQueueAckAndNext_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ProcessAck("$JS.ACK.1.15.1", "r", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.ProcessNextMsgReq("_INBOX.n", Encoding.UTF8.GetBytes("{\"batch\":1}")); + consumer.ProcessInboundNextMsgReqs(CancellationToken.None).ShouldBe(1); + } + + [Fact] // T:1493 + public void JetStreamWorkQueueRequestBatch_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(maxWaiting: 8); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":5}")).ShouldBeTrue(); + consumer.PendingRequests()["_INBOX.1"].N.ShouldBe(5); + } + + [Fact] // T:1495 + public void JetStreamAckAllRedelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(ackPolicy: AckPolicy.AckAll); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [21] = new Pending { Sequence = 2, Timestamp = DateTimeOffset.UtcNow.AddMinutes(-1).ToUnixTimeMilliseconds() } }, + Delivered = new SequencePair { Consumer = 2, Stream = 21 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + }); + + consumer.CheckPending().ShouldBe(1); + consumer.HasRedeliveries().ShouldBeTrue(); + } + + [Fact] // T:1496 + public void JetStreamAckReplyStreamPending_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.AckReply(20, 3, 1, 123, 9).ShouldContain(".20.3.123.9"); + } + + [Fact] // T:1498 + public void JetStreamWorkQueueAckWaitRedelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(ackWait: TimeSpan.FromMilliseconds(1)); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [30] = new Pending { Sequence = 4, Timestamp = DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeMilliseconds() } }, + Delivered = new SequencePair { Consumer = 4, Stream = 30 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + }); + + consumer.CheckPending().ShouldBe(1); + consumer.GetNextToRedeliver().ShouldBe(30UL); + } + + [Fact] // T:1499 + public void JetStreamWorkQueueNakRedelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [31] = new Pending { Sequence = 4, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + }); + + consumer.DidNotDeliver(31, "_INBOX.reply"); + consumer.OnRedeliverQueue(31).ShouldBeTrue(); + } + + [Fact] // T:1500 + public void JetStreamWorkQueueWorkingIndicator_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s"); + consumer.SetMaxPendingBytes(128); + consumer.SendFlowControl().ShouldBeTrue(); + consumer.NeedFlowControl(10).ShouldBeFalse(); + } + + [Fact] // T:1501 + public void JetStreamWorkQueueTerminateDelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [41] = new Pending { Sequence = 5, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + }); + + consumer.DidNotDeliver(41, "_INBOX.reply"); + consumer.RemoveFromRedeliverQueue(41).ShouldBeTrue(); + } + + [Fact] // T:1502 + public void JetStreamAckNext_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.SeqFromReply("$JS.ACK.stream.consumer.1.7.3.12345.2").ShouldBe(3UL); + consumer.StreamSeqFromReply("$JS.ACK.stream.consumer.1.7.3.12345.2").ShouldBe(12345UL); + NatsConsumer.ParseAckReplyNum("123").ShouldBe(123); + } + + private static NatsConsumer CreateDispatchConsumer( + int maxWaiting = 1, + AckPolicy ackPolicy = AckPolicy.AckExplicit, + string? deliverSubject = null, + TimeSpan? ackWait = null, + string[]? filterSubjects = null) + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["orders.>", "foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = ackPolicy, + MaxWaiting = maxWaiting, + DeliverSubject = deliverSubject, + AckWait = ackWait ?? TimeSpan.FromMilliseconds(100), + FilterSubjects = filterSubjects, + }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + consumer!.SetLeader(true, 1); + return consumer; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs new file mode 100644 index 0000000..6d3a6ab --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs @@ -0,0 +1,161 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsConsumerDispatchTests +{ + [Fact] + public void ProcessWaiting_EndOfStream_ShouldExpireNoWaitRequests() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ProcessNextMsgRequest("_INBOX.a", Encoding.UTF8.GetBytes("{\"batch\":1,\"no_wait\":true}")).ShouldBeTrue(); + + var result = consumer.ProcessWaiting(endOfStream: true); + + result.Expired.ShouldBe(1); + result.Waiting.ShouldBe(0); + consumer.CheckWaitingForInterest().ShouldBeFalse(); + } + + [Fact] + public void HbTimer_HeartbeatConfigured_ShouldReturnTimer() + { + var consumer = CreatePullConsumer(maxWaiting: 8, heartbeat: TimeSpan.FromMilliseconds(50)); + var (duration, timer) = consumer.HbTimer(); + + duration.ShouldBe(TimeSpan.FromMilliseconds(50)); + timer.ShouldNotBeNull(); + timer!.Dispose(); + } + + [Fact] + public void CheckAckFloor_WithPendingEntries_ShouldAdvanceFloor() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 20, Stream = 20 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + Pending = new Dictionary + { + [10] = new Pending { Sequence = 3, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.CheckAckFloor(); + var state = consumer.ReadStoredState(); + + state.AckFloor.Stream.ShouldBe(9UL); + state.AckFloor.Consumer.ShouldBe(2UL); + } + + [Fact] + public void ProcessInboundAcks_QueuedAck_ShouldAdvanceAckFloor() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.PushAck("$JS.ACK.2.8.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + + var processed = consumer.ProcessInboundAcks(CancellationToken.None); + var state = consumer.GetConsumerState(); + + processed.ShouldBe(1); + state.AckFloor.Stream.ShouldBe(8UL); + } + + [Fact] + public void ProcessInboundNextMsgReqs_QueuedRequest_ShouldPopulateWaitingQueue() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ProcessNextMsgReq("_INBOX.next", Encoding.UTF8.GetBytes("{\"batch\":2}")); + + var processed = consumer.ProcessInboundNextMsgReqs(CancellationToken.None); + var pending = consumer.PendingRequests(); + + processed.ShouldBe(1); + pending.ShouldContainKey("_INBOX.next"); + pending["_INBOX.next"].N.ShouldBe(2); + } + + [Fact] + public void PendingCounters_AndAckReply_ShouldTrackValues() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 10, Stream = 30 }, + AckFloor = new SequencePair { Consumer = 7, Stream = 22 }, + }); + + var (pending, error) = consumer.CheckNumPending(); + error.ShouldBeNull(); + pending.ShouldBe(8UL); + consumer.NumPending().ShouldBe(8UL); + consumer.CheckNumPendingOnEOF(); + + consumer.SetMaxPendingBytes(256); + var ackReply = consumer.AckReply(30, 11, 1, 12345, pending); + ackReply.ShouldContain("$JS.ACK.1.30.11.12345.8"); + } + + [Fact] + public void SendIdleHeartbeat_ShouldReturnFormattedHeartbeat() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 5, Stream = 9 }, + AckFloor = new SequencePair { Consumer = 4, Stream = 8 }, + }); + + var heartbeat = consumer.SendIdleHeartbeat("$JS.HEARTBEAT"); + + heartbeat.ShouldContain("100 Idle Heartbeat"); + heartbeat.ShouldContain("Nats-Last-Consumer: 5"); + heartbeat.ShouldContain("Nats-Last-Stream: 9"); + } + + [Fact] + public void LoopAndGatherMsgs_WithPendingEntries_ShouldDeliverMessages() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 0, Stream = 0 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + Pending = new Dictionary + { + [1] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.ProcessNextMsgRequest("_INBOX.loop", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue(); + var delivered = consumer.LoopAndGatherMsgs(4, CancellationToken.None); + + delivered.ShouldBeGreaterThan(0); + consumer.GetConsumerState().Delivered.Stream.ShouldBeGreaterThanOrEqualTo(1UL); + } + + private static NatsConsumer CreatePullConsumer(int maxWaiting, TimeSpan? heartbeat = null) + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var config = new ConsumerConfig + { + Durable = "D", + MaxWaiting = maxWaiting, + Heartbeat = heartbeat ?? TimeSpan.Zero, + }; + var consumer = NatsConsumer.Create(stream!, config, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs new file mode 100644 index 0000000..b3fca60 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs @@ -0,0 +1,153 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class NatsConsumerTests +{ + [Fact] // T:1230 + public void JetStreamConsumerIsFilteredMatch_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.created", "orders.*"] }); + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + consumer.IsFilteredMatch("orders.updated").ShouldBeTrue(); + consumer.IsFilteredMatch("payments.created").ShouldBeFalse(); + } + + [Fact] // T:1232 + public void JetStreamConsumerIsEqualOrSubsetMatch_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.created", "orders.*"] }); + consumer.IsEqualOrSubsetMatch("orders.created").ShouldBeTrue(); + consumer.IsEqualOrSubsetMatch("orders.updated").ShouldBeFalse(); + consumer.IsEqualOrSubsetMatch("payments.created").ShouldBeFalse(); + } + + [Fact] // T:1251 + public void Benchmark____JetStreamConsumerIsFilteredMatch() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); + for (var i = 0; i < 100; i++) + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + } + + [Fact] // T:1261 + public void JetStreamConsumerWithStartTime_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxWaiting = 32 }); + var ok = consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":2,\"expires\":\"00:00:01\"}")); + ok.ShouldBeTrue(); + + var pending = consumer.PendingRequests(); + pending.Count.ShouldBe(1); + pending["_INBOX.1"].N.ShouldBe(2); + pending["_INBOX.1"].Expires.ShouldNotBeNull(); + } + + [Fact] // T:1265 + public void JetStreamConsumerPullDelayedFirstPullWithReplayOriginal_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxWaiting = 32 }); + consumer.ProcessNextMsgReq("_INBOX.req", Encoding.UTF8.GetBytes("{\"batch\":1}")); + consumer.ProcessNextMsgRequest("_INBOX.req", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue(); + + consumer.PendingRequests().ShouldContainKey("_INBOX.req"); + } + + [Fact] // T:1267 + public void JetStreamConsumerAckAck_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + consumer.ProcessAck("$JS.ACK.3.7.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + var state = consumer.GetConsumerState(); + state.AckFloor.Stream.ShouldBe(7UL); + state.AckFloor.Consumer.ShouldBe(3UL); + } + + [Fact] // T:1273 + public void JetStreamConsumerDurableFilteredSubjectReconnect_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.created"] }); + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + consumer.IsFilteredMatch("orders.updated").ShouldBeFalse(); + + consumer.UpdateConfig(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.*"] }); + consumer.IsFilteredMatch("orders.updated").ShouldBeTrue(); + } + + [Fact] // T:1277 + public void JetStreamConsumerReplayRate_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", ReplayPolicy = ReplayPolicy.ReplayInstant, RateLimit = 4_096 }); + consumer.SetRateLimit(4_096); + consumer.GetConfig().ReplayPolicy.ShouldBe(ReplayPolicy.ReplayInstant); + } + + [Fact] // T:1283 + public void JetStreamConsumerUpdateRedelivery_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }); + consumer.DeliveryCount(99).ShouldBe(1UL); + consumer.IncDeliveryCount(99).ShouldBe(2UL); + consumer.DeliveryCount(99).ShouldBe(1UL); + consumer.DecDeliveryCount(99); + consumer.DeliveryCount(99).ShouldBe(1UL); + } + + [Fact] // T:1284 + public void JetStreamConsumerMaxAckPending_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxRequestBatch = 2, MaxWaiting = 32 }); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":3}")).ShouldBeFalse(); + consumer.PendingRequests().Count.ShouldBe(0); + } + + [Fact] // T:1285 + public void JetStreamConsumerPullMaxAckPending_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxRequestMaxBytes = 16, MaxWaiting = 32 }); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1,\"max_bytes\":17}")).ShouldBeFalse(); + consumer.ProcessNextMsgRequest("_INBOX.2", Encoding.UTF8.GetBytes("{\"batch\":1,\"max_bytes\":16}")).ShouldBeTrue(); + } + + [Fact] // T:1286 + public void JetStreamConsumerPullMaxAckPendingRedeliveries_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }); + consumer.NotifyDeliveryExceeded(123, 4); + consumer.IncDeliveryCount(123).ShouldBe(2UL); + consumer.NotifyDeliveryExceeded(123, 5); + } + + [Fact] // T:1339 + public void JetStreamConsumerPullRemoveInterest_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxWaiting = 32 }); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1,\"expires\":\"00:00:00.001\"}")).ShouldBeTrue(); + Thread.Sleep(10); + consumer.NextWaiting(1).ShouldBeNull(); + consumer.PendingRequests().Count.ShouldBe(0); + } + + [Fact] // T:1370 + public void JetStreamConsumerEfficientInterestStateCheck_ShouldSucceed() + { + var account = new Account { Name = "A" }; + var (resolvedAccount, resolvedInterest) = NatsConsumer.TrackDownAccountAndInterest(account, "_INBOX.check"); + resolvedAccount.ShouldBeSameAs(account); + resolvedInterest.ShouldBe("_INBOX.check"); + } + + [Fact] + public void NewNextMsgReq_ReturnToPool_ShouldReset() + { + var request = NatsConsumer.NewNextMsgReq("_INBOX.req", Encoding.UTF8.GetBytes("{\"batch\":1}")); + request.Reply.ShouldBe("_INBOX.req"); + request.Message.Length.ShouldBeGreaterThan(0); + + request.ReturnToPool(); + request.Reply.ShouldBeEmpty(); + request.Message.ShouldBeEmpty(); + } +} diff --git a/reports/current.md b/reports/current.md index 33ee616..e37d961 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 06:35:03 UTC +Generated: 2026-03-01 06:35:21 UTC ## Modules (12 total)