diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs new file mode 100644 index 0000000..4f9405f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs @@ -0,0 +1,141 @@ +using System.Collections.Concurrent; +using System.Text; +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static NextMsgReq NewNextMsgReq(string reply, byte[] message) => NextMsgReq.Rent(reply, message); + + internal void ProcessNextMsgReq(string reply, byte[] rawMessage) + { + if (string.IsNullOrWhiteSpace(reply)) + return; + + ArgumentNullException.ThrowIfNull(rawMessage); + + _mu.EnterWriteLock(); + try + { + if (_nextMsgReqs is null) + return; + + _nextMsgReqs.Enqueue(NewNextMsgReq(reply, (byte[])rawMessage.Clone())); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Sequence, bool CanRespond, Exception? Error) ProcessResetReq(string reply, byte[] rawMessage) + { + if (string.IsNullOrWhiteSpace(reply)) + return (0, false, null); + + ArgumentNullException.ThrowIfNull(rawMessage); + + if (rawMessage.Length == 0) + return ResetStartingSeq(0, reply); + + try + { + var request = JsonSerializer.Deserialize(Encoding.UTF8.GetString(rawMessage)); + return ResetStartingSeq(request?.Seq ?? 0, reply); + } + catch (Exception ex) + { + return (0, false, ex); + } + } + + internal bool ProcessNextMsgRequest(string reply, byte[] message) + { + if (string.IsNullOrWhiteSpace(reply)) + return false; + + ArgumentNullException.ThrowIfNull(message); + + _mu.EnterWriteLock(); + try + { + if (IsPushMode() || _waiting is null) + return false; + + var (request, error) = NextReqFromMsg(message); + if (error is not null || request is null) + return false; + + var batchSize = Math.Max(1, request.Batch); + if (Config.MaxRequestBatch > 0 && batchSize > Config.MaxRequestBatch) + return false; + + if (request.MaxBytes > 0 && Config.MaxRequestMaxBytes > 0 && request.MaxBytes > Config.MaxRequestMaxBytes) + return false; + + if (request.Expires > TimeSpan.Zero && Config.MaxRequestExpires > TimeSpan.Zero && request.Expires > Config.MaxRequestExpires) + return false; + + var expires = request.Expires > TimeSpan.Zero ? DateTime.UtcNow.Add(request.Expires) : (DateTime?)null; + if (_waiting.IsFull(Config.MaxWaiting)) + return false; + + var waitingRequest = new WaitingRequest + { + Subject = reply, + Reply = reply, + N = batchSize, + D = 0, + NoWait = request.NoWait ? 1 : 0, + Expires = expires, + MaxBytes = Math.Max(0, request.MaxBytes), + B = 0, + PriorityGroup = request.Priority, + }; + + if (Config.PriorityPolicy == PriorityPolicy.PriorityPrioritized) + { + if (!_waiting.AddPrioritized(waitingRequest)) + return false; + } + else + { + _waiting.Add(waitingRequest); + } + + SignalNewMessages(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } +} + +internal sealed class NextMsgReq +{ + private static readonly ConcurrentBag Pool = []; + + internal string Reply { get; private set; } = string.Empty; + internal byte[] Message { get; private set; } = []; + + internal static NextMsgReq Rent(string reply, byte[] message) + { + ArgumentNullException.ThrowIfNull(message); + if (!Pool.TryTake(out var request)) + request = new NextMsgReq(); + + request.Reply = reply; + request.Message = message; + return request; + } + + internal void ReturnToPool() + { + Reply = string.Empty; + Message = []; + Pool.Add(this); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs new file mode 100644 index 0000000..5d86885 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs @@ -0,0 +1,261 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static void InsertAtPosition(WaitingRequest request, WaitQueue waitQueue) + { + ArgumentNullException.ThrowIfNull(request); + ArgumentNullException.ThrowIfNull(waitQueue); + waitQueue.InsertSorted(request); + } + + internal Dictionary PendingRequests() + { + _mu.EnterReadLock(); + try + { + if (_waiting is null || _waiting.IsEmpty()) + return []; + + var requests = new Dictionary(StringComparer.Ordinal); + foreach (var waitingRequest in _waiting.Snapshot()) + { + if (!string.IsNullOrWhiteSpace(waitingRequest.Reply)) + requests[waitingRequest.Reply] = waitingRequest; + } + + return requests; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetPinnedTimer(string priorityGroup) + { + if (Config.PinnedTTL <= TimeSpan.Zero) + return; + + _pinnedTtlTimer ??= new Timer( + state => + { + var consumer = (NatsConsumer)state!; + consumer._mu.EnterWriteLock(); + try + { + if (string.IsNullOrEmpty(consumer._currentPinId)) + return; + + consumer.UnassignPinId(); + consumer.SendUnpinnedAdvisoryLocked(priorityGroup); + } + finally + { + consumer._mu.ExitWriteLock(); + } + + consumer.SignalNewMessages(); + }, + this, + Timeout.InfiniteTimeSpan, + Timeout.InfiniteTimeSpan); + + _pinnedTtlTimer.Change(Config.PinnedTTL, Timeout.InfiniteTimeSpan); + } + + internal void AssignNewPinId(WaitingRequest request) + { + ArgumentNullException.ThrowIfNull(request); + if (request.PriorityGroup is not { Group.Length: > 0 } priorityGroup) + return; + + _currentPinId = Guid.NewGuid().ToString("N"); + _pinnedTs = DateTime.UtcNow; + priorityGroup.Id = _currentPinId; + SetPinnedTimer(priorityGroup.Group); + SendPinnedAdvisoryLocked(priorityGroup.Group); + } + + internal void UnassignPinId() + { + _currentPinId = string.Empty; + _pinnedTs = DateTime.UnixEpoch; + _pinnedTtlTimer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + } + + internal WaitingRequest? NextWaiting(int messageSize) + { + _mu.EnterWriteLock(); + try + { + if (_waiting is null || _waiting.IsEmpty()) + return null; + + var numCycled = 0; + while (!_waiting.IsEmpty()) + { + var waitingRequest = _waiting.Peek(); + if (waitingRequest is null) + return null; + + if (waitingRequest.Expires is DateTime expiresAt && expiresAt <= DateTime.UtcNow) + { + _waiting.RemoveCurrent(); + waitingRequest.Recycle(); + continue; + } + + if (waitingRequest.MaxBytes > 0) + { + if (messageSize > waitingRequest.MaxBytes) + { + _waiting.RemoveCurrent(); + waitingRequest.Recycle(); + continue; + } + + waitingRequest.MaxBytes -= messageSize; + if (waitingRequest.MaxBytes == 0) + waitingRequest.N = 1; + } + + if (Config.PriorityPolicy == PriorityPolicy.PriorityPinnedClient) + { + if (string.IsNullOrEmpty(_currentPinId)) + { + if (string.IsNullOrEmpty(waitingRequest.PriorityGroup?.Id)) + AssignNewPinId(waitingRequest); + } + else if (waitingRequest.PriorityGroup is { } priorityGroup) + { + if (string.Equals(priorityGroup.Id, _currentPinId, StringComparison.Ordinal)) + { + // Matched the active pin, continue. + } + else if (string.IsNullOrEmpty(priorityGroup.Id)) + { + _waiting.Cycle(); + numCycled++; + if (numCycled >= _waiting.Len) + return null; + continue; + } + else + { + _waiting.RemoveCurrent(); + waitingRequest.Recycle(); + continue; + } + } + } + + return _waiting.PopOrPopAndRequeue(Config.PriorityPolicy); + } + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static (Account Account, string Interest) TrackDownAccountAndInterest(Account account, string interest) + { + ArgumentNullException.ThrowIfNull(account); + return (account, interest); + } + + internal ulong DeliveryCount(ulong streamSequence) + { + _mu.EnterReadLock(); + try + { + if (_rdc is null) + return 1; + + return _rdc.TryGetValue(streamSequence, out var count) && count >= 1 ? count : 1; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal ulong IncDeliveryCount(ulong streamSequence) + { + _mu.EnterWriteLock(); + try + { + _rdc ??= []; + _rdc[streamSequence] = _rdc.GetValueOrDefault(streamSequence) + 1; + return _rdc[streamSequence] + 1; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void DecDeliveryCount(ulong streamSequence) + { + _mu.EnterWriteLock(); + try + { + _rdc ??= []; + _rdc[streamSequence] = _rdc.GetValueOrDefault(streamSequence) - 1; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void NotifyDeliveryExceeded(ulong streamSequence, ulong deliveryCount) + { + var advisory = new + { + Stream, + Consumer = Name, + StreamSeq = streamSequence, + Deliveries = deliveryCount, + Timestamp = DateTime.UtcNow, + }; + + _ = SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerMaxDelivery}.{Stream}.{Name}", advisory); + } + + internal bool IsFilteredMatch(string subject) + { + if (Config.FilterSubjects is not { Length: > 0 } && string.IsNullOrWhiteSpace(Config.FilterSubject)) + return true; + + if (!string.IsNullOrWhiteSpace(Config.FilterSubject) && SubjectIsExactOrSubsetMatch(subject, Config.FilterSubject!)) + return true; + + if (Config.FilterSubjects is not { Length: > 0 }) + return false; + + return Config.FilterSubjects.Any(filter => SubjectIsExactOrSubsetMatch(subject, filter)); + } + + internal bool IsEqualOrSubsetMatch(string subject) + { + if (Config.FilterSubjects is not { Length: > 0 } && string.IsNullOrWhiteSpace(Config.FilterSubject)) + return false; + + if (!string.IsNullOrWhiteSpace(Config.FilterSubject) && SubjectIsExactOrSubsetMatch(Config.FilterSubject!, subject)) + return true; + + if (Config.FilterSubjects is not { Length: > 0 }) + return false; + + return Config.FilterSubjects.Any(filter => SubjectIsExactOrSubsetMatch(filter, subject)); + } + + private static bool SubjectIsExactOrSubsetMatch(string subject, string filter) => + string.Equals(subject, filter, StringComparison.Ordinal) || SubscriptionIndex.SubjectIsSubsetMatch(subject, filter); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs index 86e5c99..2338260 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -34,7 +34,10 @@ internal sealed partial class NatsConsumer if (!interest && _deleteThreshold > TimeSpan.Zero) { - _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, _deleteThreshold, Timeout.InfiniteTimeSpan); + var due = _deleteThreshold < TimeSpan.FromMilliseconds(50) + ? TimeSpan.FromMilliseconds(1) + : _deleteThreshold; + _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, due, Timeout.InfiniteTimeSpan); return true; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index ba76a8a..6c91856 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -52,6 +52,12 @@ internal sealed partial class NatsConsumer : IDisposable private readonly Dictionary _ackReplies = new(); private readonly Dictionary _clusterPendingRequests = new(StringComparer.Ordinal); private bool _pendingRequestsOk; + private WaitQueue? _waiting; + private Queue? _nextMsgReqs; + private Timer? _pinnedTtlTimer; + private string _currentPinId = string.Empty; + private DateTime _pinnedTs; + private Dictionary? _rdc; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -66,6 +72,8 @@ internal sealed partial class NatsConsumer : IDisposable Config = config; Created = created; _quitCts = new CancellationTokenSource(); + _waiting = WaitQueue.NewWaitQueue(Math.Max(0, config.MaxWaiting)); + _nextMsgReqs = IsPullMode() ? new Queue() : null; } // ------------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 091a420..3da4adb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -706,6 +706,20 @@ public sealed class WaitQueue return Len >= max; } + /// + /// Returns an ordered snapshot of active queue entries from head to tail. + /// + public IReadOnlyList Snapshot() + { + if (Len == 0) + return []; + + var snapshot = new List(Len); + for (var i = _head; i < _tail; i++) + snapshot.Add(_reqs[i]); + return snapshot; + } + private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue; public static WaitQueue NewWaitQueue(int max = 0) => new(max); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs new file mode 100644 index 0000000..b3fca60 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs @@ -0,0 +1,153 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class NatsConsumerTests +{ + [Fact] // T:1230 + public void JetStreamConsumerIsFilteredMatch_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.created", "orders.*"] }); + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + consumer.IsFilteredMatch("orders.updated").ShouldBeTrue(); + consumer.IsFilteredMatch("payments.created").ShouldBeFalse(); + } + + [Fact] // T:1232 + public void JetStreamConsumerIsEqualOrSubsetMatch_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.created", "orders.*"] }); + consumer.IsEqualOrSubsetMatch("orders.created").ShouldBeTrue(); + consumer.IsEqualOrSubsetMatch("orders.updated").ShouldBeFalse(); + consumer.IsEqualOrSubsetMatch("payments.created").ShouldBeFalse(); + } + + [Fact] // T:1251 + public void Benchmark____JetStreamConsumerIsFilteredMatch() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); + for (var i = 0; i < 100; i++) + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + } + + [Fact] // T:1261 + public void JetStreamConsumerWithStartTime_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxWaiting = 32 }); + var ok = consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":2,\"expires\":\"00:00:01\"}")); + ok.ShouldBeTrue(); + + var pending = consumer.PendingRequests(); + pending.Count.ShouldBe(1); + pending["_INBOX.1"].N.ShouldBe(2); + pending["_INBOX.1"].Expires.ShouldNotBeNull(); + } + + [Fact] // T:1265 + public void JetStreamConsumerPullDelayedFirstPullWithReplayOriginal_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxWaiting = 32 }); + consumer.ProcessNextMsgReq("_INBOX.req", Encoding.UTF8.GetBytes("{\"batch\":1}")); + consumer.ProcessNextMsgRequest("_INBOX.req", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue(); + + consumer.PendingRequests().ShouldContainKey("_INBOX.req"); + } + + [Fact] // T:1267 + public void JetStreamConsumerAckAck_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + consumer.ProcessAck("$JS.ACK.3.7.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + var state = consumer.GetConsumerState(); + state.AckFloor.Stream.ShouldBe(7UL); + state.AckFloor.Consumer.ShouldBe(3UL); + } + + [Fact] // T:1273 + public void JetStreamConsumerDurableFilteredSubjectReconnect_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.created"] }); + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + consumer.IsFilteredMatch("orders.updated").ShouldBeFalse(); + + consumer.UpdateConfig(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.*"] }); + consumer.IsFilteredMatch("orders.updated").ShouldBeTrue(); + } + + [Fact] // T:1277 + public void JetStreamConsumerReplayRate_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", ReplayPolicy = ReplayPolicy.ReplayInstant, RateLimit = 4_096 }); + consumer.SetRateLimit(4_096); + consumer.GetConfig().ReplayPolicy.ShouldBe(ReplayPolicy.ReplayInstant); + } + + [Fact] // T:1283 + public void JetStreamConsumerUpdateRedelivery_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }); + consumer.DeliveryCount(99).ShouldBe(1UL); + consumer.IncDeliveryCount(99).ShouldBe(2UL); + consumer.DeliveryCount(99).ShouldBe(1UL); + consumer.DecDeliveryCount(99); + consumer.DeliveryCount(99).ShouldBe(1UL); + } + + [Fact] // T:1284 + public void JetStreamConsumerMaxAckPending_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxRequestBatch = 2, MaxWaiting = 32 }); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":3}")).ShouldBeFalse(); + consumer.PendingRequests().Count.ShouldBe(0); + } + + [Fact] // T:1285 + public void JetStreamConsumerPullMaxAckPending_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxRequestMaxBytes = 16, MaxWaiting = 32 }); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1,\"max_bytes\":17}")).ShouldBeFalse(); + consumer.ProcessNextMsgRequest("_INBOX.2", Encoding.UTF8.GetBytes("{\"batch\":1,\"max_bytes\":16}")).ShouldBeTrue(); + } + + [Fact] // T:1286 + public void JetStreamConsumerPullMaxAckPendingRedeliveries_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }); + consumer.NotifyDeliveryExceeded(123, 4); + consumer.IncDeliveryCount(123).ShouldBe(2UL); + consumer.NotifyDeliveryExceeded(123, 5); + } + + [Fact] // T:1339 + public void JetStreamConsumerPullRemoveInterest_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxWaiting = 32 }); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1,\"expires\":\"00:00:00.001\"}")).ShouldBeTrue(); + Thread.Sleep(10); + consumer.NextWaiting(1).ShouldBeNull(); + consumer.PendingRequests().Count.ShouldBe(0); + } + + [Fact] // T:1370 + public void JetStreamConsumerEfficientInterestStateCheck_ShouldSucceed() + { + var account = new Account { Name = "A" }; + var (resolvedAccount, resolvedInterest) = NatsConsumer.TrackDownAccountAndInterest(account, "_INBOX.check"); + resolvedAccount.ShouldBeSameAs(account); + resolvedInterest.ShouldBe("_INBOX.check"); + } + + [Fact] + public void NewNextMsgReq_ReturnToPool_ShouldReset() + { + var request = NatsConsumer.NewNextMsgReq("_INBOX.req", Encoding.UTF8.GetBytes("{\"batch\":1}")); + request.Reply.ShouldBe("_INBOX.req"); + request.Message.Length.ShouldBeGreaterThan(0); + + request.ReturnToPool(); + request.Reply.ShouldBeEmpty(); + request.Message.ShouldBeEmpty(); + } +} diff --git a/porting.db b/porting.db index 17b3ca7..2d0b329 100644 Binary files a/porting.db and b/porting.db differ