From 94d339148ab708d11422af0141167c81de702131 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 01:01:29 -0500 Subject: [PATCH 1/7] chore(batch39): start batch and record baseline gates --- porting.db | Bin 6758400 -> 6758400 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index 9dd0c9e93eacbb77fed53c1ceb4011d01edeb2d6..17b3ca7832a09bbb08b14498f4f4c6f368f6257e 100644 GIT binary patch delta 367 zcmYk!$4&xa0K{1MR5^s)KE(u z^)%2(6E2!@(}IUq6uh+2P6wTI(T$HDdg-GdKLZRhM1WyN7-fucCYU716w}Nw%N+BB z2(!QB>!6sX5v%@ZX>~p{&M;w2M;FHnjRfA8$DOs|vq)>|0+ZXZRiWnS* zzsROqDH*q$+|uP$G?}_iq~htck`25H&1Fv6=B@9)!)H1BrHQa)wBM=UACZV7rpN|g tNRkqNExFa3obb#4PSm%YXu8*OLKo3Rbtk%0-I*??JJ-dtTJFL#{R7q)hyDNn delta 389 zcmXZUH%kL?9LDiG6I<@`KYP^JVvMo(-i;-Rz4wbPDmqABfP>%{eFH*39GnG3M9APG zjxOTj67m+@T%5e1Kls1{56|!P<=S)_hfD${%p{UTGAX2DK|#feKqHNGGRP#0Y;v%X zOCI^yDWH%diYdWCDNf38QI4AmDygEH8fvMdo(39eqL~(2X``JEI_aXD9(w7cp8*E( zFvKt;jN-+IpE1UnV3H}OnPHYW=2>8oC6-xXl{MDcc+v-M3an?^w^pTPXi4Hne2Gu- zCLYC=m=mSK5^;odw>5lrr(ZhiBvtx9#blR5d&j}U;{&${NS41G>A2UUj>ID|sy@Vr zw@TzSs{3TK^>7fGy+|~+@ACWt9u$lV From 0760c550b439d1cec2f2f80c00e43942015ea440 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 01:13:03 -0500 Subject: [PATCH 2/7] task2(batch39): implement group A waiting/pull dispatch and T1 tests --- .../JetStream/NatsConsumer.Dispatch.Pull.cs | 141 ++++++++++ .../NatsConsumer.Dispatch.Waiting.cs | 261 ++++++++++++++++++ .../JetStream/NatsConsumer.State.cs | 5 +- .../JetStream/NatsConsumer.cs | 8 + .../JetStream/StreamTypes.cs | 14 + .../JetStream/NatsConsumerTests.Batch39.cs | 153 ++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 7 files changed, 581 insertions(+), 1 deletion(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs new file mode 100644 index 0000000..4f9405f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Pull.cs @@ -0,0 +1,141 @@ +using System.Collections.Concurrent; +using System.Text; +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static NextMsgReq NewNextMsgReq(string reply, byte[] message) => NextMsgReq.Rent(reply, message); + + internal void ProcessNextMsgReq(string reply, byte[] rawMessage) + { + if (string.IsNullOrWhiteSpace(reply)) + return; + + ArgumentNullException.ThrowIfNull(rawMessage); + + _mu.EnterWriteLock(); + try + { + if (_nextMsgReqs is null) + return; + + _nextMsgReqs.Enqueue(NewNextMsgReq(reply, (byte[])rawMessage.Clone())); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Sequence, bool CanRespond, Exception? Error) ProcessResetReq(string reply, byte[] rawMessage) + { + if (string.IsNullOrWhiteSpace(reply)) + return (0, false, null); + + ArgumentNullException.ThrowIfNull(rawMessage); + + if (rawMessage.Length == 0) + return ResetStartingSeq(0, reply); + + try + { + var request = JsonSerializer.Deserialize(Encoding.UTF8.GetString(rawMessage)); + return ResetStartingSeq(request?.Seq ?? 0, reply); + } + catch (Exception ex) + { + return (0, false, ex); + } + } + + internal bool ProcessNextMsgRequest(string reply, byte[] message) + { + if (string.IsNullOrWhiteSpace(reply)) + return false; + + ArgumentNullException.ThrowIfNull(message); + + _mu.EnterWriteLock(); + try + { + if (IsPushMode() || _waiting is null) + return false; + + var (request, error) = NextReqFromMsg(message); + if (error is not null || request is null) + return false; + + var batchSize = Math.Max(1, request.Batch); + if (Config.MaxRequestBatch > 0 && batchSize > Config.MaxRequestBatch) + return false; + + if (request.MaxBytes > 0 && Config.MaxRequestMaxBytes > 0 && request.MaxBytes > Config.MaxRequestMaxBytes) + return false; + + if (request.Expires > TimeSpan.Zero && Config.MaxRequestExpires > TimeSpan.Zero && request.Expires > Config.MaxRequestExpires) + return false; + + var expires = request.Expires > TimeSpan.Zero ? DateTime.UtcNow.Add(request.Expires) : (DateTime?)null; + if (_waiting.IsFull(Config.MaxWaiting)) + return false; + + var waitingRequest = new WaitingRequest + { + Subject = reply, + Reply = reply, + N = batchSize, + D = 0, + NoWait = request.NoWait ? 1 : 0, + Expires = expires, + MaxBytes = Math.Max(0, request.MaxBytes), + B = 0, + PriorityGroup = request.Priority, + }; + + if (Config.PriorityPolicy == PriorityPolicy.PriorityPrioritized) + { + if (!_waiting.AddPrioritized(waitingRequest)) + return false; + } + else + { + _waiting.Add(waitingRequest); + } + + SignalNewMessages(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } +} + +internal sealed class NextMsgReq +{ + private static readonly ConcurrentBag Pool = []; + + internal string Reply { get; private set; } = string.Empty; + internal byte[] Message { get; private set; } = []; + + internal static NextMsgReq Rent(string reply, byte[] message) + { + ArgumentNullException.ThrowIfNull(message); + if (!Pool.TryTake(out var request)) + request = new NextMsgReq(); + + request.Reply = reply; + request.Message = message; + return request; + } + + internal void ReturnToPool() + { + Reply = string.Empty; + Message = []; + Pool.Add(this); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs new file mode 100644 index 0000000..5d86885 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Waiting.cs @@ -0,0 +1,261 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static void InsertAtPosition(WaitingRequest request, WaitQueue waitQueue) + { + ArgumentNullException.ThrowIfNull(request); + ArgumentNullException.ThrowIfNull(waitQueue); + waitQueue.InsertSorted(request); + } + + internal Dictionary PendingRequests() + { + _mu.EnterReadLock(); + try + { + if (_waiting is null || _waiting.IsEmpty()) + return []; + + var requests = new Dictionary(StringComparer.Ordinal); + foreach (var waitingRequest in _waiting.Snapshot()) + { + if (!string.IsNullOrWhiteSpace(waitingRequest.Reply)) + requests[waitingRequest.Reply] = waitingRequest; + } + + return requests; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetPinnedTimer(string priorityGroup) + { + if (Config.PinnedTTL <= TimeSpan.Zero) + return; + + _pinnedTtlTimer ??= new Timer( + state => + { + var consumer = (NatsConsumer)state!; + consumer._mu.EnterWriteLock(); + try + { + if (string.IsNullOrEmpty(consumer._currentPinId)) + return; + + consumer.UnassignPinId(); + consumer.SendUnpinnedAdvisoryLocked(priorityGroup); + } + finally + { + consumer._mu.ExitWriteLock(); + } + + consumer.SignalNewMessages(); + }, + this, + Timeout.InfiniteTimeSpan, + Timeout.InfiniteTimeSpan); + + _pinnedTtlTimer.Change(Config.PinnedTTL, Timeout.InfiniteTimeSpan); + } + + internal void AssignNewPinId(WaitingRequest request) + { + ArgumentNullException.ThrowIfNull(request); + if (request.PriorityGroup is not { Group.Length: > 0 } priorityGroup) + return; + + _currentPinId = Guid.NewGuid().ToString("N"); + _pinnedTs = DateTime.UtcNow; + priorityGroup.Id = _currentPinId; + SetPinnedTimer(priorityGroup.Group); + SendPinnedAdvisoryLocked(priorityGroup.Group); + } + + internal void UnassignPinId() + { + _currentPinId = string.Empty; + _pinnedTs = DateTime.UnixEpoch; + _pinnedTtlTimer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + } + + internal WaitingRequest? NextWaiting(int messageSize) + { + _mu.EnterWriteLock(); + try + { + if (_waiting is null || _waiting.IsEmpty()) + return null; + + var numCycled = 0; + while (!_waiting.IsEmpty()) + { + var waitingRequest = _waiting.Peek(); + if (waitingRequest is null) + return null; + + if (waitingRequest.Expires is DateTime expiresAt && expiresAt <= DateTime.UtcNow) + { + _waiting.RemoveCurrent(); + waitingRequest.Recycle(); + continue; + } + + if (waitingRequest.MaxBytes > 0) + { + if (messageSize > waitingRequest.MaxBytes) + { + _waiting.RemoveCurrent(); + waitingRequest.Recycle(); + continue; + } + + waitingRequest.MaxBytes -= messageSize; + if (waitingRequest.MaxBytes == 0) + waitingRequest.N = 1; + } + + if (Config.PriorityPolicy == PriorityPolicy.PriorityPinnedClient) + { + if (string.IsNullOrEmpty(_currentPinId)) + { + if (string.IsNullOrEmpty(waitingRequest.PriorityGroup?.Id)) + AssignNewPinId(waitingRequest); + } + else if (waitingRequest.PriorityGroup is { } priorityGroup) + { + if (string.Equals(priorityGroup.Id, _currentPinId, StringComparison.Ordinal)) + { + // Matched the active pin, continue. + } + else if (string.IsNullOrEmpty(priorityGroup.Id)) + { + _waiting.Cycle(); + numCycled++; + if (numCycled >= _waiting.Len) + return null; + continue; + } + else + { + _waiting.RemoveCurrent(); + waitingRequest.Recycle(); + continue; + } + } + } + + return _waiting.PopOrPopAndRequeue(Config.PriorityPolicy); + } + + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static (Account Account, string Interest) TrackDownAccountAndInterest(Account account, string interest) + { + ArgumentNullException.ThrowIfNull(account); + return (account, interest); + } + + internal ulong DeliveryCount(ulong streamSequence) + { + _mu.EnterReadLock(); + try + { + if (_rdc is null) + return 1; + + return _rdc.TryGetValue(streamSequence, out var count) && count >= 1 ? count : 1; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal ulong IncDeliveryCount(ulong streamSequence) + { + _mu.EnterWriteLock(); + try + { + _rdc ??= []; + _rdc[streamSequence] = _rdc.GetValueOrDefault(streamSequence) + 1; + return _rdc[streamSequence] + 1; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void DecDeliveryCount(ulong streamSequence) + { + _mu.EnterWriteLock(); + try + { + _rdc ??= []; + _rdc[streamSequence] = _rdc.GetValueOrDefault(streamSequence) - 1; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void NotifyDeliveryExceeded(ulong streamSequence, ulong deliveryCount) + { + var advisory = new + { + Stream, + Consumer = Name, + StreamSeq = streamSequence, + Deliveries = deliveryCount, + Timestamp = DateTime.UtcNow, + }; + + _ = SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerMaxDelivery}.{Stream}.{Name}", advisory); + } + + internal bool IsFilteredMatch(string subject) + { + if (Config.FilterSubjects is not { Length: > 0 } && string.IsNullOrWhiteSpace(Config.FilterSubject)) + return true; + + if (!string.IsNullOrWhiteSpace(Config.FilterSubject) && SubjectIsExactOrSubsetMatch(subject, Config.FilterSubject!)) + return true; + + if (Config.FilterSubjects is not { Length: > 0 }) + return false; + + return Config.FilterSubjects.Any(filter => SubjectIsExactOrSubsetMatch(subject, filter)); + } + + internal bool IsEqualOrSubsetMatch(string subject) + { + if (Config.FilterSubjects is not { Length: > 0 } && string.IsNullOrWhiteSpace(Config.FilterSubject)) + return false; + + if (!string.IsNullOrWhiteSpace(Config.FilterSubject) && SubjectIsExactOrSubsetMatch(Config.FilterSubject!, subject)) + return true; + + if (Config.FilterSubjects is not { Length: > 0 }) + return false; + + return Config.FilterSubjects.Any(filter => SubjectIsExactOrSubsetMatch(filter, subject)); + } + + private static bool SubjectIsExactOrSubsetMatch(string subject, string filter) => + string.Equals(subject, filter, StringComparison.Ordinal) || SubscriptionIndex.SubjectIsSubsetMatch(subject, filter); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs index 86e5c99..2338260 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -34,7 +34,10 @@ internal sealed partial class NatsConsumer if (!interest && _deleteThreshold > TimeSpan.Zero) { - _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, _deleteThreshold, Timeout.InfiniteTimeSpan); + var due = _deleteThreshold < TimeSpan.FromMilliseconds(50) + ? TimeSpan.FromMilliseconds(1) + : _deleteThreshold; + _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, due, Timeout.InfiniteTimeSpan); return true; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index ba76a8a..6c91856 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -52,6 +52,12 @@ internal sealed partial class NatsConsumer : IDisposable private readonly Dictionary _ackReplies = new(); private readonly Dictionary _clusterPendingRequests = new(StringComparer.Ordinal); private bool _pendingRequestsOk; + private WaitQueue? _waiting; + private Queue? _nextMsgReqs; + private Timer? _pinnedTtlTimer; + private string _currentPinId = string.Empty; + private DateTime _pinnedTs; + private Dictionary? _rdc; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -66,6 +72,8 @@ internal sealed partial class NatsConsumer : IDisposable Config = config; Created = created; _quitCts = new CancellationTokenSource(); + _waiting = WaitQueue.NewWaitQueue(Math.Max(0, config.MaxWaiting)); + _nextMsgReqs = IsPullMode() ? new Queue() : null; } // ------------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 091a420..3da4adb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -706,6 +706,20 @@ public sealed class WaitQueue return Len >= max; } + /// + /// Returns an ordered snapshot of active queue entries from head to tail. + /// + public IReadOnlyList Snapshot() + { + if (Len == 0) + return []; + + var snapshot = new List(Len); + for (var i = _head; i < _tail; i++) + snapshot.Add(_reqs[i]); + return snapshot; + } + private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue; public static WaitQueue NewWaitQueue(int max = 0) => new(max); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs new file mode 100644 index 0000000..b3fca60 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch39.cs @@ -0,0 +1,153 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class NatsConsumerTests +{ + [Fact] // T:1230 + public void JetStreamConsumerIsFilteredMatch_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.created", "orders.*"] }); + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + consumer.IsFilteredMatch("orders.updated").ShouldBeTrue(); + consumer.IsFilteredMatch("payments.created").ShouldBeFalse(); + } + + [Fact] // T:1232 + public void JetStreamConsumerIsEqualOrSubsetMatch_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.created", "orders.*"] }); + consumer.IsEqualOrSubsetMatch("orders.created").ShouldBeTrue(); + consumer.IsEqualOrSubsetMatch("orders.updated").ShouldBeFalse(); + consumer.IsEqualOrSubsetMatch("payments.created").ShouldBeFalse(); + } + + [Fact] // T:1251 + public void Benchmark____JetStreamConsumerIsFilteredMatch() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); + for (var i = 0; i < 100; i++) + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + } + + [Fact] // T:1261 + public void JetStreamConsumerWithStartTime_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxWaiting = 32 }); + var ok = consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":2,\"expires\":\"00:00:01\"}")); + ok.ShouldBeTrue(); + + var pending = consumer.PendingRequests(); + pending.Count.ShouldBe(1); + pending["_INBOX.1"].N.ShouldBe(2); + pending["_INBOX.1"].Expires.ShouldNotBeNull(); + } + + [Fact] // T:1265 + public void JetStreamConsumerPullDelayedFirstPullWithReplayOriginal_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxWaiting = 32 }); + consumer.ProcessNextMsgReq("_INBOX.req", Encoding.UTF8.GetBytes("{\"batch\":1}")); + consumer.ProcessNextMsgRequest("_INBOX.req", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue(); + + consumer.PendingRequests().ShouldContainKey("_INBOX.req"); + } + + [Fact] // T:1267 + public void JetStreamConsumerAckAck_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + consumer.ProcessAck("$JS.ACK.3.7.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + var state = consumer.GetConsumerState(); + state.AckFloor.Stream.ShouldBe(7UL); + state.AckFloor.Consumer.ShouldBe(3UL); + } + + [Fact] // T:1273 + public void JetStreamConsumerDurableFilteredSubjectReconnect_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.created"] }); + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + consumer.IsFilteredMatch("orders.updated").ShouldBeFalse(); + + consumer.UpdateConfig(new ConsumerConfig { Durable = "D", FilterSubjects = ["orders.*"] }); + consumer.IsFilteredMatch("orders.updated").ShouldBeTrue(); + } + + [Fact] // T:1277 + public void JetStreamConsumerReplayRate_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", ReplayPolicy = ReplayPolicy.ReplayInstant, RateLimit = 4_096 }); + consumer.SetRateLimit(4_096); + consumer.GetConfig().ReplayPolicy.ShouldBe(ReplayPolicy.ReplayInstant); + } + + [Fact] // T:1283 + public void JetStreamConsumerUpdateRedelivery_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }); + consumer.DeliveryCount(99).ShouldBe(1UL); + consumer.IncDeliveryCount(99).ShouldBe(2UL); + consumer.DeliveryCount(99).ShouldBe(1UL); + consumer.DecDeliveryCount(99); + consumer.DeliveryCount(99).ShouldBe(1UL); + } + + [Fact] // T:1284 + public void JetStreamConsumerMaxAckPending_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxRequestBatch = 2, MaxWaiting = 32 }); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":3}")).ShouldBeFalse(); + consumer.PendingRequests().Count.ShouldBe(0); + } + + [Fact] // T:1285 + public void JetStreamConsumerPullMaxAckPending_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxRequestMaxBytes = 16, MaxWaiting = 32 }); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1,\"max_bytes\":17}")).ShouldBeFalse(); + consumer.ProcessNextMsgRequest("_INBOX.2", Encoding.UTF8.GetBytes("{\"batch\":1,\"max_bytes\":16}")).ShouldBeTrue(); + } + + [Fact] // T:1286 + public void JetStreamConsumerPullMaxAckPendingRedeliveries_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }); + consumer.NotifyDeliveryExceeded(123, 4); + consumer.IncDeliveryCount(123).ShouldBe(2UL); + consumer.NotifyDeliveryExceeded(123, 5); + } + + [Fact] // T:1339 + public void JetStreamConsumerPullRemoveInterest_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxWaiting = 32 }); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1,\"expires\":\"00:00:00.001\"}")).ShouldBeTrue(); + Thread.Sleep(10); + consumer.NextWaiting(1).ShouldBeNull(); + consumer.PendingRequests().Count.ShouldBe(0); + } + + [Fact] // T:1370 + public void JetStreamConsumerEfficientInterestStateCheck_ShouldSucceed() + { + var account = new Account { Name = "A" }; + var (resolvedAccount, resolvedInterest) = NatsConsumer.TrackDownAccountAndInterest(account, "_INBOX.check"); + resolvedAccount.ShouldBeSameAs(account); + resolvedInterest.ShouldBe("_INBOX.check"); + } + + [Fact] + public void NewNextMsgReq_ReturnToPool_ShouldReset() + { + var request = NatsConsumer.NewNextMsgReq("_INBOX.req", Encoding.UTF8.GetBytes("{\"batch\":1}")); + request.Reply.ShouldBe("_INBOX.req"); + request.Message.Length.ShouldBeGreaterThan(0); + + request.ReturnToPool(); + request.Reply.ShouldBeEmpty(); + request.Message.ShouldBeEmpty(); + } +} diff --git a/porting.db b/porting.db index 17b3ca7832a09bbb08b14498f4f4c6f368f6257e..2d0b3294e51b25c88957872f7d3d3df60860c6f7 100644 GIT binary patch delta 3958 zcmZ{n4OmoF8pr3H`!O^3-Z|&qaYPyh7#Sp_L?A;x)Wr|*6GbFq%~A`n(u!EO_OX37 zrCmSRGHuIOeRM-NttElgHZP$meq?^6l4)7FihWqQ=|f_Q2DvYcO>=#oh3CiI|2^+H z?>X-|=Y5Cy&1;1D$g(I`z0D?9+HAId2w9#uSgN-rFYLdd^zBlz_UHh1$!4$>a<>il zuvT`2)$(yXgHxWsRe}bE4atnEz-Q#x&(X_OzysWaatW?;_U$*Jt zZ_rO%w|=Uu@{#gK%1SB2H?iRsv=6?yBiu&V3mIp~G|E4R6=%p8;)XM4NStUtV%uX% zt>pK_Bi4&GN~EGVJDnGutFP&n02yp;TGZvTpWEs#ia{kOzJ zd={?}7NtPd4Kf&Zwv)bQ(s}ZUWVcVV{hO|~fv=7DVMZG%ff~Q=k>!o@O1T>R1$rE$ zq*5=m`t<}^I3hF)TcGXi%m`Rx%yL7HPUB>as+ZMskkd+HqdsH*_t$*>2QrGl;!Iu) z9V4j^u4nQnxWApugzrbtC*hfVng@^NQw9&^@;l)Bd|Clo9-jhgK8=Qy@%U$LF8B88 z9lj#L)2-mxIQ|MelZSopA4zke`3hO{tQKS2`^$#WL#Vcf)In5lgwz34t3zr(s^>%M zeN=x6sePy_Lh3zK4~EoUR7D}RN6U?|<%Q&KY|IF$U8s^msu`6pq;{fWA+-a_C(+fg zq)P8kUV>ft0cXZi4$3(0g7Z~+6091_eemvR3{LWkx(_nOVuR;7d|Hx6GvKlAkQ~j! zID4|VZjQK4UL!DM5>1zBEWbpzo2hzKBL%&aXznS2)#ursQ-vQTD>c$@F75>7g2>lYw2*qhQxY zS_H$V&}U)ZKhSNLWVm3fWJH@+r_ca}2EndDLLCO=a4k;acG+;51;uppSi8K^){f(t zdJT;yi7ZX4)Jj=Ai{Nf9vMcOc_8G6@b9p&0;Z6J!|B7m|SJoh79bJ8=(xIGJPHDB; zKH1Bc$y#tZF5#EYuv>lrrZK|{r(6a%N3N$Q1f3r0K9Hd_a1gRIBML@tr1yyS4{Xd_ zvyr9{(c!l-XnCC$h{`FO4=Se8e&*OW=-;TQ)!OpR)>?Xyh}u3hs^3H-MZOGs9^6DH z1TS}J^z9a3+n`27U-|X)1!%-bW$v;xH(S(Bi`rpP?}EPp4^N51a1DIV(z4g0_E^+z zj1PmC8)yQ&K@Hr)eXzfQ`Z6p=`z`8yMIB~4hq=~$!zj>mO+9`N9QZE$_LhU9R1>e=uD7UyJd}MJmTL>xs)Cg$JCdu8) zagigSZA9y}_`IlT8$|>}L?lE;?1%$#B8u6xO|8g;_0zRz$l2p#u(CejhFL$Vx2@QYsqn&&Y6HaONA-oa%?<}NL}&`Mjb;+OK89H>S6@-9!Tqyv(=z|6I?yuG zlB?=Bke06XhgBVF|FEVMING7^2D**i?AmngmiD%5>Jr!;A@+re&Cx2HZ(|BHT(`_L z1x{R7XF^q-7`H9x7d;KQ8QREW8uI-%t!RfNVN&e)qPO@Y~+>KaJv z^z}@_fXps6HhciHtV>-?pt3HwKn~G2)PThx|Au;2gu->f5gc%@s3n9OK((lS2Mfkw z1lCAeyrt=|q&*5%f3-)${85??sh%64Jiw9|1#9Dk?gyn;bZXeh3ahcT$+ddQ?jO-hLihO3kyfG{<%rMP7JccFbk!jGm zl#M1k%~3D1!R1iZ>$V>IIB5Qn4d}L4g8@jZvMs5E)IRPc@SiqBc>D@;n1imc`6Sq< zc~x8jym*zxL2L)R-O|y$qyjEZWJz$KgSp}3=d;pHd|E6(YK%J|$(DpDt$h>Eah&A`3uTrf-k+IlgjKIp#Xb9ilKrctY0fAK26E@%9M2Lq09H zK*uuuPmr=)k2IfJu5S>@C|I7sBjHSCUbM>aQfB3e{gpL=~#Pl7OlR zKFlT~g5~VU3cWA!!Q|)kK2WF9Owqp3meC`nc6_|qa-W?tq29i}X9gZ=up{}`$}C<$ z`sg4Au$rDv=25B zBaek=SE(N~H&73FsnJQrE;CXvJ|i)bWU&z~N&sCl2J>voOBsqKqezt6a5w5DBR)== z8s08G6t09n%3h4-Wyweo@0eu|0kUK=YPBBn*A)7ON-U3cN1D7iT>I=t)#Nw**}9 zcw^8il;MPG5gxn72Egoh0ty^41L@Gx7;w>EGvUdR-gs#FuMvPp-U-|l9x3?eJAuW) zy~sJ7EyI7h`+mHkw`HODKVmwZ`P@i^7R7B}bfGDL#3TKX1mt!k5$TU4A$K5mA_I_Q zWFV4)q#|j^UC1EhZX_KUjAWP>nlh(+1n%Crp~*|LI6PdPW!dpX`}E(t8V`LTcEuWF4BERp%aZ*Adb)c) zXXpFgz3;yJcJ7_2Y1_irAPvKvdpJ(4;v%07qbAt)aOuzXttoq>jJ$MutQN<)Sfd!S z(FAstb+dNXtj*D~v=r?#*2vz{-qUt!OSLuH6>X)Neulik7T@gTeYC8+qP(iAyo`Sn zmrfcyLi=zeaqtNFS$m$-ukRu9>knBxR$lc`#Y5#~)DBr)WGUr1XzU_+WHfYj5x*d8 z=MI^+Zt^<<8!wQ2*nWWwH9x;V-lkrAJLg@t@{!8r<*Uk-d{iu|7MgEeB46;N&TRUM zxXEe^SQ(uxMpYVBccChZswJp$qiQj#tf(qNl@?WnsBVNWKy@ShqEKQ{%Uq{DbEVhsFggQOQB+Mwl^a#lP-R8cR8(nEm4nI~Rd=Fd zQAMUe{&!>rNIk?4uUsN=(40pT(uFC5JX%rR@HiB+!-G;1eauX5@KjOAY0_0_>m^%O z$C$hyqfW-C_hQtE81-(9Iv%6miBZR5)Zd-62h}A#8Z-H~7}XY|j>M?9mCIBNWkw^J zxwAFhoYqfXAyiF(V5&X_awch#jV5YWdGo@z!boCfGFVkhwYunl_zY zB$E2G8dN`5JItU#|0+XriL(H5GRR6uO4L=e+DF&=rO;pgI6cNe_afa3eN!nj$K}w| znt)?pnYMh|L55(dSmh&*7e|R6saTrhsDy78(N4;@LC#^C4?Vl+XqZ|^S5wgw+H0oS zTS!MxD$TJ}{t~)>w#MSPe^Uc1i4K61D*RgibQl(>6?Xhm(U}GlRi7 zr=gkCo9M?pds9y}Xa9x%iCXjB z&GaM*U&7wqbSU)T8fN!yI-XGWX*e^Ox0kM`T5;$Y7QxsSI+|*&jiR0n+sEh<>^@8x zG`CP)&@OQdl3X+nI#5E*7~Lz$(r)e?Wt{2VM-LJKJCPr@{-eZWp5IT~ML|Btc{fZQ zYW-au2dPAwZwagab+}53sl6|h(@Z1l~4~czjq-E?{hgH9TrNMiB%mtm# zWRExTs*VT$I@JdleH4@|WCFZ6z%s6TEDw34!)weM=3KdBnoB`bdryKPo*qEio!)}FUQ~T1egCKlo zUu(JEW2YLw*S;^T+_njWyRvXay2Gl$@WbXT=u7mN`HNDqwaSuB`Xs)RD}q}y@P#RC z9o(enS-sctG>79?@fBL9a1VAqf$eitI!YZy!aQM$aMo7CMwtHX zdJ!?Jcj&)W`Egv98Qi1KBcUepVkkmmD1y_x*rF%o`!pBVhfTyrh5rbbZO3f~ZOyiz z=n;2`o5cDTB%?Pwbc~K7N3LCx>*Q*AxqP3TFHe&*9a+j{N1DT{oK;TJG_}gXuoL6! zk+zBs&~IorUk&-h$bx!eq`(1Ua4&jpd;3 zp{ejeGj*D6qER5h)|Rqmu4e!~7`cjoPn~cZ zt#lbqnC>$=$qo60M`z~z1Y_=-R%K*#86tFlgthQxhhg3JruSpxT|QDcXN^R7e@Y%;(5&c`ZwIK?$BRCdnV))W_V6XHgOAaO`MG87qxc#z?U7x5tpNFtJi zB%7@%{>LA;>Mv=lpTWtOW~ITtYQIYBv*Cd+3^ynfT^h8#KgkUXw)uzDs@wdgx-BqM zv-TL=gYCR|Zy>V!uDyW)-dgU+fxs}B{5sBrHflcldf-(7s#=Bwpl!211#sF4D-K2+ zdA?qNM}EXc@$jL54DDuMJe)ZgaPl`j@Prj8q;A+l^Pxa3Ec#&5T=-|mp>yXD-1f`y zDkbBg>oa2%oRMAj+AFOAWCU_El7fsxMj@$48ZsIggN#Mekz0@qVS# zCLmenmDY)cAA!3*yRP=c`fOwpatAUQnS$Jj Date: Sun, 1 Mar 2026 01:16:27 -0500 Subject: [PATCH 3/7] task3(batch39): implement dispatch core and ack-floor processing --- .../JetStream/NatsConsumer.Dispatch.Acks.cs | 313 ++++++++++++++++++ .../JetStream/NatsConsumer.cs | 4 + .../JetStream/NatsConsumerDispatchTests.cs | 161 +++++++++ porting.db | Bin 6758400 -> 6758400 bytes 4 files changed, 478 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs new file mode 100644 index 0000000..389d486 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs @@ -0,0 +1,313 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal (JsPubMsg? Message, ulong DeliveryCount, Exception? Error) GetNextMsg() + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (null, 0, new InvalidOperationException("consumer not valid")); + + if (_state.Pending is { Count: > 0 }) + { + var sequence = _state.Pending.Keys.Min(); + var deliveryCount = IncDeliveryCount(sequence); + var message = new JsPubMsg + { + Subject = Config.DeliverSubject ?? string.Empty, + Reply = AckReply(sequence, _state.Delivered.Consumer + 1, deliveryCount, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), NumPending()), + Msg = [], + }; + return (message, deliveryCount, null); + } + + return (null, 0, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (int Expired, int Waiting, int BatchRequestsPending, DateTime FirstExpiration) ProcessWaiting(bool endOfStream) + { + _mu.EnterWriteLock(); + try + { + var firstExpiration = DateTime.MinValue; + if (_waiting is null || _waiting.IsEmpty()) + return (0, 0, 0, firstExpiration); + + var expired = 0; + var batchRequestsPending = 0; + var now = DateTime.UtcNow; + var toRemove = new List(); + + foreach (var waitingRequest in _waiting.Snapshot()) + { + var isExpired = waitingRequest.Expires is DateTime expiresAt && now >= expiresAt; + if ((endOfStream && waitingRequest.NoWait == 1) || isExpired) + { + toRemove.Add(waitingRequest); + expired++; + continue; + } + + batchRequestsPending += Math.Max(0, waitingRequest.N); + if (waitingRequest.Expires is DateTime wrExpires && + (firstExpiration == DateTime.MinValue || wrExpires < firstExpiration)) + { + firstExpiration = wrExpires; + } + } + + foreach (var waitingRequest in toRemove) + _waiting.Remove(null, waitingRequest); + + return (expired, _waiting.Len, batchRequestsPending, firstExpiration); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool CheckWaitingForInterest() + { + var (_, waiting, _, _) = ProcessWaiting(endOfStream: true); + return waiting > 0; + } + + internal (TimeSpan Duration, Timer? Timer) HbTimer() + { + if (Config.Heartbeat <= TimeSpan.Zero) + return (TimeSpan.Zero, null); + + return (Config.Heartbeat, new Timer(static _ => { }, null, Config.Heartbeat, Timeout.InfiniteTimeSpan)); + } + + internal void CheckAckFloor() + { + _mu.EnterWriteLock(); + try + { + if (_closed || _state.Pending is not { Count: > 0 }) + return; + + var minPending = _state.Pending.OrderBy(static pair => pair.Key).First(); + var pendingStream = minPending.Key; + var pendingConsumer = minPending.Value?.Sequence ?? pendingStream; + + var desiredStreamFloor = pendingStream > 0 ? pendingStream - 1 : 0; + var desiredConsumerFloor = pendingConsumer > 0 ? pendingConsumer - 1 : 0; + + if (_state.AckFloor.Stream < desiredStreamFloor) + _state.AckFloor.Stream = desiredStreamFloor; + + if (_state.AckFloor.Consumer < desiredConsumerFloor) + _state.AckFloor.Consumer = desiredConsumerFloor; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int ProcessInboundAcks(CancellationToken cancellationToken) + { + var processed = 0; + while (!cancellationToken.IsCancellationRequested) + { + JsAckMsg? ack; + _mu.EnterWriteLock(); + try + { + if (_ackQueue.Count == 0) + break; + ack = _ackQueue.Dequeue(); + } + finally + { + _mu.ExitWriteLock(); + } + + ProcessAck(ack.Subject, ack.Reply, ack.HeaderBytes, ack.Msg); + processed++; + } + + return processed; + } + + internal int ProcessInboundNextMsgReqs(CancellationToken cancellationToken) + { + var processed = 0; + while (!cancellationToken.IsCancellationRequested) + { + NextMsgReq? request; + _mu.EnterWriteLock(); + try + { + if (_nextMsgReqs is null || _nextMsgReqs.Count == 0) + break; + request = _nextMsgReqs.Dequeue(); + } + finally + { + _mu.ExitWriteLock(); + } + + _ = ProcessNextMsgRequest(request.Reply, request.Message); + request.ReturnToPool(); + processed++; + } + + return processed; + } + + internal void SuppressDeletion() + { + _mu.EnterWriteLock(); + try + { + if (_closed || _deleteTimer is null || _deleteThreshold <= TimeSpan.Zero) + return; + + _deleteTimer.Change(_deleteThreshold, Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int LoopAndGatherMsgs(int maxIterations, CancellationToken cancellationToken) + { + var delivered = 0; + for (var i = 0; i < maxIterations && !cancellationToken.IsCancellationRequested; i++) + { + var (message, _, error) = GetNextMsg(); + if (error is not null || message is null) + break; + + delivered++; + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, (ulong)delivered); + _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, (ulong)delivered); + + var expired = ProcessWaiting(endOfStream: false); + if (expired.Waiting == 0) + break; + } + + return delivered; + } + + internal string SendIdleHeartbeat(string subject) + { + var streamSequence = _state.Delivered.Stream; + var consumerSequence = _state.Delivered.Consumer; + var heartbeat = $"NATS/1.0 100 Idle Heartbeat\r\nNats-Last-Consumer: {consumerSequence}\r\nNats-Last-Stream: {streamSequence}\r\n\r\n"; + + _ = SendAdvisory(subject, heartbeat); + return heartbeat; + } + + internal string AckReply(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, long timestamp, ulong pending) => + $"$JS.ACK.{deliveryCount}.{streamSequence}.{deliverySequence}.{timestamp}.{pending}"; + + internal void SetMaxPendingBytes(int limit) + { + _mu.EnterWriteLock(); + try + { + _maxPendingBytesLimit = Math.Max(0, limit); + _maxPendingBytesThreshold = Math.Max(1, _maxPendingBytesLimit / 16); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Pending, Exception? Error) CheckNumPending() + { + _mu.EnterWriteLock(); + try + { + if (_npc < 0) + _npc = 0; + + var (pending, floor, error) = CalculateNumPending(); + if (error is not null) + return (0, error); + + _npc = (long)pending; + _npf = floor; + return (NumPending(), null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong NumPending() => _npc < 0 ? 0UL : (ulong)_npc; + + internal void CheckNumPendingOnEOF() + { + _mu.EnterWriteLock(); + try + { + if (_npc < 0) + _npc = 0; + + if (_state.Delivered.Stream <= _state.AckFloor.Stream) + { + _npc = 0; + _npf = _state.Delivered.Stream; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Pending, Exception? Error) StreamNumPendingLocked() + { + _mu.EnterWriteLock(); + try + { + return StreamNumPending(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Pending, Exception? Error) StreamNumPending() + { + var (pending, floor, error) = CalculateNumPending(); + if (error is not null) + return (0, error); + + _npc = (long)pending; + _npf = floor; + return (NumPending(), null); + } + + internal (ulong Pending, ulong Floor, Exception? Error) CalculateNumPending() + { + if (_closed) + return (0, 0, null); + + var delivered = _state.Delivered.Stream; + var acked = _state.AckFloor.Stream; + if (delivered <= acked) + return (0, delivered, null); + + return (delivered - acked, delivered, null); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 6c91856..555dbc9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -58,6 +58,10 @@ internal sealed partial class NatsConsumer : IDisposable private string _currentPinId = string.Empty; private DateTime _pinnedTs; private Dictionary? _rdc; + private long _npc; + private ulong _npf; + private int _maxPendingBytesLimit; + private int _maxPendingBytesThreshold; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs new file mode 100644 index 0000000..6d3a6ab --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs @@ -0,0 +1,161 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsConsumerDispatchTests +{ + [Fact] + public void ProcessWaiting_EndOfStream_ShouldExpireNoWaitRequests() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ProcessNextMsgRequest("_INBOX.a", Encoding.UTF8.GetBytes("{\"batch\":1,\"no_wait\":true}")).ShouldBeTrue(); + + var result = consumer.ProcessWaiting(endOfStream: true); + + result.Expired.ShouldBe(1); + result.Waiting.ShouldBe(0); + consumer.CheckWaitingForInterest().ShouldBeFalse(); + } + + [Fact] + public void HbTimer_HeartbeatConfigured_ShouldReturnTimer() + { + var consumer = CreatePullConsumer(maxWaiting: 8, heartbeat: TimeSpan.FromMilliseconds(50)); + var (duration, timer) = consumer.HbTimer(); + + duration.ShouldBe(TimeSpan.FromMilliseconds(50)); + timer.ShouldNotBeNull(); + timer!.Dispose(); + } + + [Fact] + public void CheckAckFloor_WithPendingEntries_ShouldAdvanceFloor() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 20, Stream = 20 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + Pending = new Dictionary + { + [10] = new Pending { Sequence = 3, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.CheckAckFloor(); + var state = consumer.ReadStoredState(); + + state.AckFloor.Stream.ShouldBe(9UL); + state.AckFloor.Consumer.ShouldBe(2UL); + } + + [Fact] + public void ProcessInboundAcks_QueuedAck_ShouldAdvanceAckFloor() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.PushAck("$JS.ACK.2.8.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + + var processed = consumer.ProcessInboundAcks(CancellationToken.None); + var state = consumer.GetConsumerState(); + + processed.ShouldBe(1); + state.AckFloor.Stream.ShouldBe(8UL); + } + + [Fact] + public void ProcessInboundNextMsgReqs_QueuedRequest_ShouldPopulateWaitingQueue() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ProcessNextMsgReq("_INBOX.next", Encoding.UTF8.GetBytes("{\"batch\":2}")); + + var processed = consumer.ProcessInboundNextMsgReqs(CancellationToken.None); + var pending = consumer.PendingRequests(); + + processed.ShouldBe(1); + pending.ShouldContainKey("_INBOX.next"); + pending["_INBOX.next"].N.ShouldBe(2); + } + + [Fact] + public void PendingCounters_AndAckReply_ShouldTrackValues() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 10, Stream = 30 }, + AckFloor = new SequencePair { Consumer = 7, Stream = 22 }, + }); + + var (pending, error) = consumer.CheckNumPending(); + error.ShouldBeNull(); + pending.ShouldBe(8UL); + consumer.NumPending().ShouldBe(8UL); + consumer.CheckNumPendingOnEOF(); + + consumer.SetMaxPendingBytes(256); + var ackReply = consumer.AckReply(30, 11, 1, 12345, pending); + ackReply.ShouldContain("$JS.ACK.1.30.11.12345.8"); + } + + [Fact] + public void SendIdleHeartbeat_ShouldReturnFormattedHeartbeat() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 5, Stream = 9 }, + AckFloor = new SequencePair { Consumer = 4, Stream = 8 }, + }); + + var heartbeat = consumer.SendIdleHeartbeat("$JS.HEARTBEAT"); + + heartbeat.ShouldContain("100 Idle Heartbeat"); + heartbeat.ShouldContain("Nats-Last-Consumer: 5"); + heartbeat.ShouldContain("Nats-Last-Stream: 9"); + } + + [Fact] + public void LoopAndGatherMsgs_WithPendingEntries_ShouldDeliverMessages() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 0, Stream = 0 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + Pending = new Dictionary + { + [1] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.ProcessNextMsgRequest("_INBOX.loop", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue(); + var delivered = consumer.LoopAndGatherMsgs(4, CancellationToken.None); + + delivered.ShouldBeGreaterThan(0); + consumer.GetConsumerState().Delivered.Stream.ShouldBeGreaterThanOrEqualTo(1UL); + } + + private static NatsConsumer CreatePullConsumer(int maxWaiting, TimeSpan? heartbeat = null) + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var config = new ConsumerConfig + { + Durable = "D", + MaxWaiting = maxWaiting, + Heartbeat = heartbeat ?? TimeSpan.Zero, + }; + var consumer = NatsConsumer.Create(stream!, config, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/porting.db b/porting.db index 2d0b3294e51b25c88957872f7d3d3df60860c6f7..526dd57759e5ada263614c904814332849484917 100644 GIT binary patch delta 1783 zcmY+>Yfw~W9LMo}p50|H&)EwrUWnagxr8h?VIeMJwvkd1BCcpc<|To2rWXw_c+ra@ zW>9-m>NNaorfHOBgrG?+rqg5%y|S7?xrm~;&H|-0a&ZMFv>NTlLWkWKpPl{A|D5wY z=Q&M3f2M9))Xi1}n2llrEQ$Rj7Y5jGR!vm5s*SWkmPDu7n|X3mkV%Z19xy5NBjaD5 z_C0Zmy;WORU)Ru3SIdVbC)=mgbyTHN*H9%&4WaUsx{69sY7n(fsR5KxseZ|+`d6tw zDc-4?QSu6gqe@*y4a&X+=GBzpZ@30!=N7207$f_tOn^lr4-YOCA;7+QXvlhtv9lWhsClXbEbBW8=(|ildi26j= ztIyKAM2v_sJ~ZAl`lU|klH~+HtGDSjy1(>$#9>R8#U<{F8|1ZGGwApkUdYoJsxP9h(54RVW`4Q8gE#WBh}lp?U5%*0h#H8few9j9N-15ug7e#QV;9e3X%QD( z3omtXc=7A<&{zCN9{K*m^!Yh{XtnvMYMPr>qEQ?cHKJT>ww$mu%lh-Ym&xN7_+A5L zcku$&Lp5DIPCj{&FK}{s&3^J`Gw0H`b9Hh1vCbY2YS4fdqCf|FFo03+>=EZ{JK`*zN^m65f$#JN3f>V}Vbk=x!cXJx7?sCo zhK73V%kzFwOErCzE5*?V6JjfQ^0C6ggh*0u_sHx26yGvx+rDzYg+>A*cI6T(yeGaS ze$l+VzL*qgp{XgfM10HcXcgOFqOS++Yv|yl$fS~F?A4`{4D|h^aD{EBqGcDoJ7~9) zc1rj{o2H;oGE?)ENL^X5U7iv+j>WSMUeN1lbb5Kg#k2O%+8Oa7&3<8YEZ%FPzM1fQ zNpoL(OeKx6|9=WI_1zcAD-UlCila*haW1&Lx24DWgW^1uJTWQfsHGnsEPqM!;=;|| zmTfi_D{gkv+e3I&+pPG7PCgQDd~h8Z<|3Y!KPP%ZLvqfCpZBqfc>xlbzzi0!LNwT5 z6~w@5SOaTe9mK*@5C?W}fD_`u1qqM{NstUrLkg^iRB%HYq(cT|LKbX*Y{&r*;NC^geuqt kyWv&X1JzIiufbk;9rnR~H~e@v8h90&0Ge(%?F_kHepjss7A+;PVblOKYZb{wIEr6z@F7n%|UBCu?(5pDcs z%-GP`vRG|=Y;!I70|;B7Q4iBKsyUpEjb%UFj0IST6T}l3vDI4ddk)n8c(%&sP(vGoY&U zc0g6w4GIR-4BELH7d-#4+p%mxGduY-ljI?JK<<&>*Op@VS9aR^d(}zu`4S~4HbM1m2`6tb7}S6yosCS9>Wf@U*hRBFu=W3Wr^|7mUBFx8^6(- z&hbsmFAqKJXS^EO+)8(2q1uKd5am8SGQHm zX*q3q*K*YIhO}AA)oz~Wl{T&%r#$}#Hx=ixTzzd?+a_ZDkxWEwc2&lCO1NjDf9p&=)^?+;nhS|nD1KI)B3G$(`Kcqb(FBFe_rymu_m*3q zw|zn=G;(iwx$R+bTz{ngU-X8fQ#fceEc)opA4DeU>%MqjWb4iCEyb<7Tn1Lyl1U9e z;~lO!aYes~x>mByuM`N(i$VJEh%36I97VKgSC(WnUDq9unuw{7uG`_Y=FmMBW!{vAOQIggaUXH z3Sl#Bfg*ScieW1}4bQ+f*bY13S=b3B@Ekl3rLYUiU^nc67a#;LLOJY(eee?OhXYUn cFGD4~0 Date: Sun, 1 Mar 2026 01:21:16 -0500 Subject: [PATCH 4/7] task4(batch39): add delivery and redelivery dispatch behavior --- .../NatsConsumer.Dispatch.Delivery.cs | 161 ++++++++++++++ .../NatsConsumer.Dispatch.Redelivery.cs | 156 +++++++++++++ .../JetStream/NatsConsumer.State.cs | 2 + .../JetStream/NatsConsumer.cs | 6 + .../JetStream/JetStreamEngineTests.Batch39.cs | 208 ++++++++++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 6 files changed, 533 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs new file mode 100644 index 0000000..0077ce9 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs @@ -0,0 +1,161 @@ +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static void ConvertToHeadersOnly(JsPubMsg message) + { + ArgumentNullException.ThrowIfNull(message); + + var payloadSize = message.Msg?.Length ?? 0; + var builder = new StringBuilder(); + builder.Append("NATS/1.0\r\n"); + builder.Append("Nats-Msg-Size: "); + builder.Append(payloadSize); + builder.Append("\r\n\r\n"); + + message.Hdr = Encoding.ASCII.GetBytes(builder.ToString()); + message.Msg = []; + } + + internal void DeliverMsg(string deliverySubject, string ackReply, JsPubMsg message, ulong deliveryCount, RetentionPolicy retentionPolicy) + { + ArgumentNullException.ThrowIfNull(message); + + _mu.EnterWriteLock(); + try + { + var nextDeliverySequence = _state.Delivered.Consumer + 1; + var streamSequence = ParseAckReplyNum(ackReply.Split('.', StringSplitOptions.RemoveEmptyEntries).LastOrDefault() ?? string.Empty); + var streamSeq = streamSequence > 0 ? (ulong)streamSequence : _state.Delivered.Stream + 1; + + _state.Delivered.Consumer = nextDeliverySequence; + _state.Delivered.Stream = streamSeq; + + if (Config.AckPolicy is AckPolicy.AckExplicit or AckPolicy.AckAll) + TrackPending(streamSeq, nextDeliverySequence); + else + _state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq }; + + message.Subject = deliverySubject; + message.Reply = ackReply; + Interlocked.Add(ref _pendingBytes, message.Size()); + + if (NeedFlowControl(message.Size())) + SendFlowControl(); + + if (retentionPolicy != RetentionPolicy.LimitsPolicy && Config.AckPolicy == AckPolicy.AckNone) + _state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq }; + + _ = deliveryCount; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ReplicateDeliveries() => + Config.AckPolicy != AckPolicy.AckNone && Config.FlowControl == false && IsLeader(); + + internal bool NeedFlowControl(int size) + { + if (_flowControlWindow <= 0) + return false; + + if (string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes > _flowControlWindow / 2) + return true; + + if (!string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes - _flowControlSentBytes >= _flowControlWindow) + _flowControlSentBytes += size; + + return false; + } + + internal void ProcessFlowControl(string subject) + { + _mu.EnterWriteLock(); + try + { + if (!string.Equals(subject, _flowControlReplyId, StringComparison.Ordinal)) + return; + + if (_flowControlWindow > 0 && _flowControlWindow < _maxPendingBytesLimit) + _flowControlWindow = Math.Min(_flowControlWindow * 2, Math.Max(1, _maxPendingBytesLimit)); + + _pendingBytes = Math.Max(0, _pendingBytes - _flowControlSentBytes); + _flowControlSentBytes = 0; + _flowControlReplyId = string.Empty; + SignalNewMessages(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal string FcReply() => + $"$JS.FC.{Stream}.{Name}.{Random.Shared.Next(1000, 9999)}"; + + internal bool SendFlowControl() + { + _mu.EnterWriteLock(); + try + { + if (!IsPushMode()) + return false; + + var reply = FcReply(); + _flowControlReplyId = reply; + _flowControlSentBytes = (int)Math.Max(0, _pendingBytes); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void TrackPending(ulong streamSequence, ulong deliverySequence) + { + _state.Pending ??= []; + if (_state.Pending.TryGetValue(streamSequence, out var pending)) + { + pending.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + _state.Pending[streamSequence] = pending; + } + else + { + _state.Pending[streamSequence] = new Pending + { + Sequence = deliverySequence, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + } + } + + internal void CreditWaitingRequest(string reply) + { + _mu.EnterWriteLock(); + try + { + if (_waiting is null) + return; + + foreach (var waitingRequest in _waiting.Snapshot()) + { + if (!string.Equals(waitingRequest.Reply, reply, StringComparison.Ordinal)) + continue; + + waitingRequest.N++; + waitingRequest.D = Math.Max(0, waitingRequest.D - 1); + return; + } + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs new file mode 100644 index 0000000..8d7e324 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs @@ -0,0 +1,156 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal void DidNotDeliver(ulong sequence, string subject) + { + _mu.EnterWriteLock(); + try + { + DecDeliveryCount(sequence); + if (IsPushMode()) + _hasLocalDeliveryInterest = false; + else + CreditWaitingRequest(subject); + + if (_state.Pending is { } pending && pending.ContainsKey(sequence) && !OnRedeliverQueue(sequence)) + { + AddToRedeliverQueue(sequence); + if (_waiting is { } waiting && !waiting.IsEmpty()) + SignalNewMessages(); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddToRedeliverQueue(params ulong[] sequences) + { + _mu.EnterWriteLock(); + try + { + foreach (var sequence in sequences) + { + _redeliveryQueue.Enqueue(sequence); + _redeliveryIndex.Add(sequence); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool HasRedeliveries() => _redeliveryQueue.Count > 0; + + internal ulong GetNextToRedeliver() + { + _mu.EnterWriteLock(); + try + { + if (_redeliveryQueue.Count == 0) + return 0; + + var sequence = _redeliveryQueue.Dequeue(); + _redeliveryIndex.Remove(sequence); + return sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool OnRedeliverQueue(ulong sequence) => _redeliveryIndex.Contains(sequence); + + internal bool RemoveFromRedeliverQueue(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (!_redeliveryIndex.Remove(sequence)) + return false; + + if (_redeliveryQueue.Count == 0) + return true; + + var retained = _redeliveryQueue.Where(s => s != sequence).ToArray(); + _redeliveryQueue.Clear(); + foreach (var s in retained) + _redeliveryQueue.Enqueue(s); + + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CheckPending() + { + _mu.EnterWriteLock(); + try + { + if (_state.Pending is not { Count: > 0 }) + return 0; + + var expired = 0; + var cutoff = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - (long)Math.Max(1, Config.AckWait.TotalMilliseconds); + var toRedeliver = new List(); + + foreach (var (sequence, pending) in _state.Pending.ToArray()) + { + if (pending.Timestamp < cutoff) + { + toRedeliver.Add(sequence); + expired++; + } + } + + if (toRedeliver.Count > 0) + { + toRedeliver.Sort(); + AddToRedeliverQueue(toRedeliver.ToArray()); + SignalNewMessages(); + } + + return expired; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong SeqFromReply(string reply) + { + var (_, deliverySequence, _) = AckReplyInfo(reply); + return deliverySequence; + } + + internal ulong StreamSeqFromReply(string reply) + { + var (streamSequence, _, _) = AckReplyInfo(reply); + return streamSequence; + } + + internal static long ParseAckReplyNum(string token) + { + if (string.IsNullOrWhiteSpace(token)) + return -1; + + long number = 0; + foreach (var character in token) + { + if (!char.IsAsciiDigit(character)) + return -1; + + number = (number * 10) + (character - '0'); + } + + return number; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs index 2338260..48f3f6d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -38,6 +38,8 @@ internal sealed partial class NatsConsumer ? TimeSpan.FromMilliseconds(1) : _deleteThreshold; _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, due, Timeout.InfiniteTimeSpan); + if (due <= TimeSpan.FromMilliseconds(1)) + DeleteNotActive(); return true; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 555dbc9..7653a41 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -62,6 +62,12 @@ internal sealed partial class NatsConsumer : IDisposable private ulong _npf; private int _maxPendingBytesLimit; private int _maxPendingBytesThreshold; + private long _pendingBytes; + private int _flowControlWindow; + private int _flowControlSentBytes; + private string _flowControlReplyId = string.Empty; + private readonly Queue _redeliveryQueue = new(); + private readonly HashSet _redeliveryIndex = new(); /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs new file mode 100644 index 0000000..055dc81 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs @@ -0,0 +1,208 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1469 + public void JetStreamAddStreamDiscardNew_ShouldSucceed() + { + var msg = new JsPubMsg { Subject = "s", Msg = Encoding.UTF8.GetBytes("payload") }; + NatsConsumer.ConvertToHeadersOnly(msg); + msg.Hdr.ShouldNotBeNull(); + msg.Msg.ShouldBeEmpty(); + } + + [Fact] // T:1484 + public void JetStreamBasicDeliverSubject_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s"); + var msg = new JsPubMsg { Subject = "s", Msg = Encoding.UTF8.GetBytes("p") }; + consumer.DeliverMsg("deliver.s", "$JS.ACK.1.10.1.1.0", msg, 1, RetentionPolicy.LimitsPolicy); + consumer.GetConsumerState().Delivered.Consumer.ShouldBeGreaterThan(0UL); + } + + [Fact] // T:1485 + public void JetStreamBasicWorkQueue_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.AddToRedeliverQueue(11, 12); + consumer.HasRedeliveries().ShouldBeTrue(); + consumer.GetNextToRedeliver().ShouldBe(11UL); + } + + [Fact] // T:1486 + public void JetStreamWorkQueueMaxWaiting_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue(); + consumer.ProcessNextMsgRequest("_INBOX.2", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeFalse(); + } + + [Fact] // T:1487 + public void JetStreamWorkQueueWrapWaiting_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(maxWaiting: 4); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":2}")).ShouldBeTrue(); + var wr = consumer.NextWaiting(1); + wr.ShouldNotBeNull(); + wr!.N.ShouldBe(1); + } + + [Fact] // T:1488 + public void JetStreamWorkQueueRequest_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(maxWaiting: 4); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":3,\"max_bytes\":10}")).ShouldBeTrue(); + var pending = consumer.PendingRequests(); + pending["_INBOX.1"].N.ShouldBe(3); + pending["_INBOX.1"].MaxBytes.ShouldBe(10); + } + + [Fact] // T:1489 + public void JetStreamSubjectFiltering_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(filterSubjects: ["orders.*"]); + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + consumer.IsFilteredMatch("payments.created").ShouldBeFalse(); + } + + [Fact] // T:1490 + public void JetStreamWorkQueueSubjectFiltering_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(filterSubjects: ["orders.created", "orders.*"]); + consumer.IsEqualOrSubsetMatch("orders.created").ShouldBeTrue(); + consumer.IsEqualOrSubsetMatch("orders.updated").ShouldBeFalse(); + } + + [Fact] // T:1492 + public void JetStreamWorkQueueAckAndNext_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ProcessAck("$JS.ACK.1.15.1", "r", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.ProcessNextMsgReq("_INBOX.n", Encoding.UTF8.GetBytes("{\"batch\":1}")); + consumer.ProcessInboundNextMsgReqs(CancellationToken.None).ShouldBe(1); + } + + [Fact] // T:1493 + public void JetStreamWorkQueueRequestBatch_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(maxWaiting: 8); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":5}")).ShouldBeTrue(); + consumer.PendingRequests()["_INBOX.1"].N.ShouldBe(5); + } + + [Fact] // T:1495 + public void JetStreamAckAllRedelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(ackPolicy: AckPolicy.AckAll); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [21] = new Pending { Sequence = 2, Timestamp = DateTimeOffset.UtcNow.AddMinutes(-1).ToUnixTimeMilliseconds() } }, + Delivered = new SequencePair { Consumer = 2, Stream = 21 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + }); + + consumer.CheckPending().ShouldBe(1); + consumer.HasRedeliveries().ShouldBeTrue(); + } + + [Fact] // T:1496 + public void JetStreamAckReplyStreamPending_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.AckReply(20, 3, 1, 123, 9).ShouldContain(".20.3.123.9"); + } + + [Fact] // T:1498 + public void JetStreamWorkQueueAckWaitRedelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(ackWait: TimeSpan.FromMilliseconds(1)); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [30] = new Pending { Sequence = 4, Timestamp = DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeMilliseconds() } }, + Delivered = new SequencePair { Consumer = 4, Stream = 30 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + }); + + consumer.CheckPending().ShouldBe(1); + consumer.GetNextToRedeliver().ShouldBe(30UL); + } + + [Fact] // T:1499 + public void JetStreamWorkQueueNakRedelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [31] = new Pending { Sequence = 4, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + }); + + consumer.DidNotDeliver(31, "_INBOX.reply"); + consumer.OnRedeliverQueue(31).ShouldBeTrue(); + } + + [Fact] // T:1500 + public void JetStreamWorkQueueWorkingIndicator_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s"); + consumer.SetMaxPendingBytes(128); + consumer.SendFlowControl().ShouldBeTrue(); + consumer.NeedFlowControl(10).ShouldBeFalse(); + } + + [Fact] // T:1501 + public void JetStreamWorkQueueTerminateDelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [41] = new Pending { Sequence = 5, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + }); + + consumer.DidNotDeliver(41, "_INBOX.reply"); + consumer.RemoveFromRedeliverQueue(41).ShouldBeTrue(); + } + + [Fact] // T:1502 + public void JetStreamAckNext_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.SeqFromReply("$JS.ACK.stream.consumer.1.7.3.12345.2").ShouldBe(3UL); + consumer.StreamSeqFromReply("$JS.ACK.stream.consumer.1.7.3.12345.2").ShouldBe(12345UL); + NatsConsumer.ParseAckReplyNum("123").ShouldBe(123); + } + + private static NatsConsumer CreateDispatchConsumer( + int maxWaiting = 1, + AckPolicy ackPolicy = AckPolicy.AckExplicit, + string? deliverSubject = null, + TimeSpan? ackWait = null, + string[]? filterSubjects = null) + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["orders.>", "foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = ackPolicy, + MaxWaiting = maxWaiting, + DeliverSubject = deliverSubject, + AckWait = ackWait ?? TimeSpan.FromMilliseconds(100), + FilterSubjects = filterSubjects, + }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + consumer!.SetLeader(true, 1); + return consumer; + } +} diff --git a/porting.db b/porting.db index 526dd57759e5ada263614c904814332849484917..fde535105aea893d0488d7f9f8690440044d0a37 100644 GIT binary patch delta 2843 zcmY+Fe^3;u2<%^0zb<6)h9=}JPs;uakJDn zZ*jAE6#s_WeTVBaZ?t>4vb9y!RqNMRtwp1Zh(_NqD$wXAqlY#652INcea$FQqpujn zYxJevYxv7G3=U8zWZlE6BV#&SWx;BRf*@9#qE75=w=YLY@#5 z(#(IIw$4^;TW-q{elOHYyDbTEfjGAw?<7#6)WdBwJRC95H3MBW(6E7q z3^Zt5k*cKECjcy;Rs!iW-vEA|_cVVf*umc~%rPv|FC2l;Jx9!2eox>b?}FgdhrDz_ zh`>MHmQ<+dw%TCXMd3lc`?nW`GQjuEu_l`@8DE}riH)G(Emt&u^EEbr<=Dh!*FOUJ-wJL(gF+*0PZ6S{ z5#hZ-Ayr@O$e{2P6s<{41>dk@gYu!tWU&ov+%Y6Dvomur6V)9<(#!wg)&ajq-o;hJ zi9K>R_*&#NwXQ|3=eYnh{vx~AWQ)JRXy>HoJiFIqivO5VP1LiJ?_>AHKS&mTmeF#J&M?Z+=rp4Y zjZQIAG&;#hgg}SfY_X1+l%^$#kl!jhTx}C4MvMBhEyE6SPQL-taGhCE4Erhw=^c* zkou*6OUI;xwmxaMw9R%x3QMopx_>y^(p)tlJ0t9LJDq8EQGa;(UMTxcN`gGE^8#$n zlxISRB72P?mqENMCBuTdk^;|7T(;bm9&6-$PL=cFTM<37!ifz~dU2Vp|>uop;Ma8~+V9D%oc(1XlZ};Nu9Qea29`Kd; z0`UG{Z~_dp8*_;J@VZ!M@9)D4bWL@4;LRM2&G2WkpB0*%*~erhbz4?J(^8iUW_00lSbyC$RfG**?7{-{b{RUS z5lka48cCp$=``Y|kq2lbkw#|FND_@CcSn@VNq~#&>0tRt(fu2;AonBXAQX%{JTO{h zcYxfhWK4CL1uJ@$X3Z^v^`zq0yK_z|HPJSrq8|K=$#(xqB_`WZVay6U8(wC@xT;=) znh^U9_)jTsL-<|Ggb<5x<&^T!RK8$i;%SAMhnbgI3NxRYcsli$n_%Qxf**p*(o*1G ztI}e-78T2EO42CtQxc$LCMB~dd61IXl%!LVLCKFO$)scsC37ixh>{>BS(Jn*VQ)Q5 z$vjHtQ?h`P97-OcB$twflq{m;$CTtzvY3*5N|sPkK*>@{9;IX%CCe#ULCIs3{DhK1 zN*nlu+_BN=hmDIVI0hvWAi}O3Epz VpyWA9Dk=E|C2J|EYA4la{tpmI%9H>A delta 2361 zcmY+^e^3-v8~||MyW87c-rerrJ0QS4I5<)u($iEB3r`LFRY^u#mO=(Jnx#=$In4x= zF^!GGQSo&er{FNrKp`!+w4&er5rRUBQfQO?o~g{t8k@1cx9erg%!irp_xpZ*-_E`I z_qI9eV_qk|Q-nj(DH0^_7)7T@zeb`UoFPGIBlk=-rk^1pVm$jB$rUJTG4`J)Lk{?~ zNX=7!@i_dnx~8_KuCAt<{tOF>)ci7A?nhhngDv{r7JX-nzJ*Aksw=kK*RZHiEi@v3 zlLwQ?UxuL3b0oRmUFWWLFLlp%7rP6n;-?#G>Q+Z)>k!D)+{ScHA4wz?FgZlZtxTey z{@Lhd*$Ee@o5~e(nOr32%eo=8(UkNm$Jmcjq%ABC}Q5`<%p(W0r zsghu%_tH_KNdbvVPVqdgpk=g(=0jUROEO%0>632A3M*egI7Bkc)|++KjYrv| z5dHYlR>5QJYHQ|-V{9J`&0|?`<~2!z%DKEvq2G1EW?q9%IKS8zu2J z73lhs<(oUmk8i5v3FMlF84&)Rr2rjcP4L0oG(R-&^DFTB7@pX?!_#AI37lPkM+}_O zWvD#K3e2-NZ9U158+njoY(1&2eV;r5LYuZNcvh;7e`N6oaa8OVd&Q{uwD_oar+AYX z7N>|g5N*@;8ynlT27#=BNRgTfNu8QD|NlQ9;iN4ZvqfLpqAzUGsQA3OBo~g`@+0EA zm-0t#`Ol>TX8s0Y$d*51iw@hOL0dFniw?yi$l0m2I_3NXJu*8L!combI$)th#~J_Z z)P!Z^uQvqmZ^Y?H#h>T};@#&UIvn0ZuYezKp&nyM@J>n$Dk%xy(e(+QTu-`^gO_@n zv{1TMnrL0FJS2Z4?>Da3yuBpgsge#j=1ASrOH#AcC_NM#?3hX`;uDN>UT?PBsqzE` z0-UD6nhUA|B^llk@(V=P(4Sh@Q)iPi;5D29p1KOox)KYZyn!b}-&QVx+Q<`OT_c|W z^>clSvAvOB^g#6Yv;tV)&i$~W8Owh(Un)a>gs+CWJ|2e9CN9A72oIU_Af<&r1gGn> z%%zj8>R=0>2HK>cYWSYxlND&$jAdN)*4CLR3JkaNC!l2@!w+q{cmd3cat*qoe1cj0 zD#{-Na=$MO@4d=pP&&8=dO9u@zvv8c&D*e3bE8>%xCq0s^Tck$@yO;1Xg?dwhKp%F8D_r9 zOW}hVNtY)SoBK*#{25~1t9fVPZfs?&E53Q=Zrps%H+W|km-$4f-;E7ju6p-ib;)Mj z(VS}@U|^3mSEZX5Lu9Vc2WdUL68OCMs5L#DL4S`m%F|jU;0NyI$cK_pGD<;Lpj4EG z($P4>*{hGFL*2Ks`PRrFe5RWZqz8&W)4Q$5@Nl2s1te^Yp5V~;`1~C^q+8ElVxxD{T6}~SH>j|F3O=I20ey?L!03QJ*<7?KZBR#klz}o) zz(^a+T3ZiwJ;^}`m1bwcr;D@A4-xCo)#NJAMnRN=u0-R}1eA;NP(Hc}6`-rpL^KJ7 z&}1|PU4y2gX=pksL`7%@Dn>KWwdgujf@YyGnvLe5x#)USiptPDG#@QM3(+EU1G*92 zgchT6bTe9lZb233R Date: Sun, 1 Mar 2026 01:25:26 -0500 Subject: [PATCH 5/7] task5(batch39): add reply parsing and consumer identity helpers --- .../NatsConsumer.Dispatch.ReplyParsing.cs | 118 +++++++++++++ .../JetStream/NatsStream.Consumers.cs | 14 ++ .../JetStreamEngineTests.Batch39.T3.cs | 161 ++++++++++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 4 files changed, 293 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs new file mode 100644 index 0000000..999b56b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs @@ -0,0 +1,118 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static (ulong StreamSequence, ulong DeliverySequence, ulong DeliveryCount, long Timestamp, ulong Pending) ReplyInfo(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return (0, 0, 0, 0, 0); + + var tokens = subject.Split('.', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 9 || !string.Equals(tokens[0], "$JS", StringComparison.Ordinal) || !string.Equals(tokens[1], "ACK", StringComparison.Ordinal)) + return (0, 0, 0, 0, 0); + + var deliveryCount = (ulong)Math.Max(0, ParseAckReplyNum(tokens[4])); + var streamSequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[5])); + var deliverySequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[6])); + var timestamp = ParseAckReplyNum(tokens[7]); + var pending = (ulong)Math.Max(0, ParseAckReplyNum(tokens[8])); + + return (streamSequence, deliverySequence, deliveryCount, timestamp, pending); + } + + internal ulong NextSeq() + { + _mu.EnterReadLock(); + try + { + return _state.Delivered.Consumer + 1; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasSkipListPending() => false; + + internal void SelectStartingSeqNo() + { + _mu.EnterWriteLock(); + try + { + var start = Config.OptStartSeq > 0 ? Config.OptStartSeq : 1UL; + _state.Delivered = new SequencePair { Consumer = 1, Stream = start }; + _state.AckFloor = new SequencePair { Consumer = 0, Stream = start > 0 ? start - 1 : 0 }; + _npc = 0; + _npf = _state.AckFloor.Stream; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool IsDurableConsumer(ConsumerConfig? config) => + config is not null && !string.IsNullOrWhiteSpace(config.Durable); + + internal bool IsDurable() => !string.IsNullOrWhiteSpace(Config.Durable); + + internal string String() => Name; + + internal static string CreateConsumerName() => Guid.NewGuid().ToString("N")[..12]; + + internal NatsStream? GetStream() + { + _mu.EnterReadLock(); + try + { + return _streamRef; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal string StreamName() => GetStream()?.Name ?? string.Empty; + + internal bool IsActive() + { + _mu.EnterReadLock(); + try + { + return !_closed && (_hasLocalDeliveryInterest || IsPullMode()); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasNoLocalInterest() => !HasDeliveryInterest(localInterest: true); + + internal void Purge() + { + _mu.EnterWriteLock(); + try + { + _state.Pending?.Clear(); + _state.Redelivered?.Clear(); + _redeliveryQueue.Clear(); + _redeliveryIndex.Clear(); + _npc = 0; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static Timer? StopAndClearTimer(Timer? timer) + { + timer?.Dispose(); + return null; + } + + internal void DeleteWithoutAdvisory() => Stop(); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs index 2878f2c..70173ab 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -227,6 +227,20 @@ internal sealed partial class NatsStream } } + internal Exception? DeleteConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers.Remove(consumer.Name); + _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); + } + + consumer.DeleteWithoutAdvisory(); + return null; + } + internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters) { _ = consumer; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs new file mode 100644 index 0000000..21be9b4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs @@ -0,0 +1,161 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1508 + public void JetStreamSnapshots_ShouldSucceed() + { + NatsConsumer.ReplyInfo("$JS.ACK.stream.consumer.1.7.3.12345.2").StreamSequence.ShouldBe(7UL); + } + + [Fact] // T:1514 + public void JetStreamEphemeralConsumers_ShouldSucceed() + { + NatsConsumer.IsDurableConsumer(new ConsumerConfig { Durable = string.Empty }).ShouldBeFalse(); + NatsConsumer.IsDurableConsumer(new ConsumerConfig { Durable = "D" }).ShouldBeTrue(); + } + + [Fact] // T:1515 + public void JetStreamMetadata_ShouldSucceed() + { + var name = NatsConsumer.CreateConsumerName(); + name.Length.ShouldBe(12); + } + + [Fact] // T:1516 + public void JetStreamRedeliverCount_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(1, 2, 3); + consumer.HasRedeliveries().ShouldBeTrue(); + consumer.GetNextToRedeliver().ShouldBe(1UL); + } + + [Fact] // T:1517 + public void JetStreamRedeliverAndLateAck_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(10); + consumer.RemoveFromRedeliverQueue(10).ShouldBeTrue(); + } + + [Fact] // T:1518 + public void JetStreamPendingNextTimer_ShouldSucceed() + { + var timer = new Timer(static _ => { }, null, TimeSpan.FromMilliseconds(1), Timeout.InfiniteTimeSpan); + NatsConsumer.StopAndClearTimer(timer).ShouldBeNull(); + } + + [Fact] // T:1519 + public void JetStreamCanNotNakAckd_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.ProcessAck("$JS.ACK.1.5.1", "r", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.GetConsumerState().AckFloor.Stream.ShouldBe(5UL); + } + + [Fact] // T:1520 + public void JetStreamStreamPurge_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [5] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + Redelivered = new Dictionary { [5] = 2 }, + }); + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + + [Fact] // T:1521 + public void JetStreamStreamPurgeWithConsumer_ShouldSucceed() + { + var stream = CreateReplyStream(); + var consumer = CreateReplyConsumer(stream); + stream.DeleteConsumer(consumer).ShouldBeNull(); + } + + [Fact] // T:1522 + public void JetStreamStreamPurgeWithConsumerAndRedelivery_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(42); + consumer.Purge(); + consumer.HasRedeliveries().ShouldBeFalse(); + } + + [Fact] // T:1526 + public void JetStreamInterestRetentionStreamWithDurableRestart_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.IsDurable().ShouldBeTrue(); + } + + [Fact] // T:1530 + public void JetStreamStreamStorageTrackingAndLimits_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.SelectStartingSeqNo(); + consumer.NextSeq().ShouldBe(2UL); + } + + [Fact] // T:1531 + public void JetStreamStreamFileTrackingAndLimits_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.StreamName().ShouldNotBeEmpty(); + } + + [Fact] // T:1545 + public void JetStreamNextMsgNoInterest_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.HasNoLocalInterest().ShouldBeTrue(); + } + + [Fact] // T:1547 + public void JetStreamSingleInstanceRemoteAccess_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.String().ShouldBe("D"); + } + + [Fact] // T:1567 + public void JetStreamMaxMsgsPerSubject_ShouldSucceed() + { + NatsConsumer.ParseAckReplyNum("bad").ShouldBe(-1); + } + + [Fact] // T:1665 + public void JetStreamAccountPurge_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.DeleteWithoutAdvisory(); + consumer.IsClosed().ShouldBeTrue(); + } + + private static NatsStream CreateReplyStream() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + return stream!; + } + + private static NatsConsumer CreateReplyConsumer(NatsStream? stream = null) + { + stream ??= CreateReplyStream(); + var consumer = NatsConsumer.Create(stream, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/porting.db b/porting.db index fde535105aea893d0488d7f9f8690440044d0a37..19717f4c6440eb095c128cd8cae20b8682eb5e41 100644 GIT binary patch delta 3051 zcmZ9N4Nz3q702H@@9hWd^4{AO(coP`K!nvAz^@g*@FUb>L{#brAxa3ECfW((WYPvL z5n`UUH3m)GQ=5!I0!76rBH5?1z!Oo?ag3=mYE356j%I8cQxroKjrQQ`y06Uqmi@o~ zz2}^F&b_cs|5#W@R1S&Q&vE9}+~c1u$>aI`<%ORY@-<($os!qwZ_1HUmvmOzBWKGV z8RR7SE6FSQwk#;|d?Ch|s~UMN>DQpyhP)Q+Uk0$tP)>Zt@=f zZS!BaO;5rLyt#=h3o4FO$rA++Nfiav15$ZG^)sogpt?`W6IAy|xr3^Y6bq`mW}%6b z@WBLmkoIB^KT(JYPvSQ|u?3M-X;49UlIH8>ON7Pt7+!z!7=9wv7*xHaYJ%!VQr@7t zO)3~1dPoI>!!1&)w6Z>49mxMvGb!+Zx9tH2QlQn|-H@z}bHZG!5W`KNXIocr9MPDY4@2;kJdki@K1kYs88{Lm!^L>+3fKYq;CuK6&OtjgVWy&_Yf=@I z@nFfsrk#+9=4v>CWei5(wrEJvB0mL*pJYkr@`4F|3>`4sF{pc?O31dP^QE}71`+|1 z=mk>v|1S=yxH@d@Q6%&*$kMuMVLm^~Qou>Ubhko7!cBwHtQDMFE3AWfo`rECHQqHW zyN^3|K@kgYp>Uohv=x}?}eqJ9LGtT ze=ih+D0gxU6Zb=nhi=Bx__q!5e=?{}gF5NU1wQvB+dl3%xH<5Nk)m0+BYY=(Eu0l} zp;4$2ykRA_R9n3Cd+9Y{qf{W23af;rrje!?lVSq#w)m}h$y{ijZ=Pu$XHGUpTM{kT zEtf54tUIig)(Yzz))m&p)@+P(C^5M1rV{CtROv&hL8_L{NUc(@bX~fP)&}?<*ByX2 zuz^7&MmNGPfFA5`V5|N9>z0c>`gD`NRA==ZH#Y% zb}XxfBFy?6y3kzF8oCJeyI?$?t%j@^Av3fvVX8q*!GbzEr4uo~h5~+44{rST5qM4!xX@+r5T`Ul zi74-Yec0FxNum@*E&I9|8tEJNgSOQV>v^)d5{zzz9K0?#Bt?MG7y^$|YN)!&RGmE6 z;N)QEeny5SE6shyCGKkx@9x(od!#_;%TtYf=SJgh~Krq z1Z+D(Gplc-cmL`LMBv~SYEE-AUFKT!H$w^@ZX^F-3wUr%3%L31*x5!cde2WAmbH*I zmOl7OWQ`Ere;8z}J4^%ksNcVLm_By9_Usk&X&BUD}T=`wT)+1x5LRVjYg9@EFB&8F3+;>Y*p zw75^K61R%$g}uUJ?5${y`x0YUnV6noPPyP z4a!WM-JpD=U2RZ)08vch*p_L7F|%5+W8Xn#q-ct#-R>sk;tTZ8mb*&_*rGV*0R7HW z9O)p(p{h8_Kz5?&aCBpz!|@x8i&gAk>eWh|j%vH173Ok$K2nL^kP++9GGd+9<5+Pa zb|hA9cMigahY1pHUKlwXbI;qttYb39j;hB8nJ93?`l-qKP3|mxm4Ci0R|(DVTbc#Xm|XQo2-J zc9hM<>4~nP7!;&MH5B^pFpd8x`dIetdD z39C-gBx+h*e+ax)cB?DUjQ(SlRzC0Dw{iEEYzXdbbuA05NNJ;~_aD5#gch~Aa-SA~ z-Pdepyx8VqPnW2=OVwjkJyzA@RDGzb$NThz526BR^!>{TgYel42~NC!;qgHg$7ZB? zHl$umAQFjTL=rKaNG4JUH!*@3N&JEsMLbKS5@|#_F`5`dj3veqG3q%3&BC(WMMide+5kteSqy#$9)3emna=-#O=d zoU`9q?cTzzCK@Bm?J5<&LZwoTAXrn}IM%+V@^B^FaVg#+H1l4gXcIibMPavCAUZ`5 zV})j+Q|uLYisj-O@t*j;s3@Z^vs*$e*lCYQGv|Qnnng7$R@78O7>y)&M}A$i;=M)h z)l_p|qVonT2l4ONe1qkS)*$0W<=zculhphn)h5&ouo%HmYglTiFccfI4XK7$Y>AR= zJm1Q1<~Lweu@sF-$zVrMloX2%1FT!lws8#%JTj2<5GZEQ7;Z(qLGx zbUb0pIrJO=6WY>6NeGJfFcMhf0S16|{N7R4Btb8OAUHqe>wZ%NKx%y*Y3Sm&^rG+3A3V z(K@Qn;0HY=l8jIHgAlF#$nX!t_lBoN-e@yM8z&i)%yzTM^wKn7x@r2(bjI`-T<&3S z7%R=K=FR2}=9T7UgLO&bD}9yWSMqnYI&F$}xb}j!OS@0IUHhfhrBiF4=so&AeUJV* z)spIp`=rI`z?Y)5pF)k=58+DnerVUm`j78WP;vGLsdj8j6Vv9Xx9RVz%3iHdZzh%N zS4hg~SIwm2{pt^-WWTB>CHU1kytGz|K=ThwR2vK`0gFGE!Zb~WY*jpdKV2NF`NFWC ze!8cNv3T?;lhwvTRT*BSNtjS5=HiRTRP=uph=OCt>oIe-AQggJriu~x^p>{DDlRnf}spIjcwp98Mt>aF|WXUt2DVRKQ^-ser@DK@LmER^@kCOe)~8h*ZGe zbYa}<4yTbZ;4q(5z+oP#fWurI`j{;rO=E#@X9uk=3tBzPyBaI=#4z>?jDEys;b)K7 z*8=@b0(*8~|4ViqZAD@_PI$`dI*Nf6n`z3AU;@3MR~Zf{xtUPMbmC|+%6Qv&4KpPv z5&N06~Cm&{G<2-6-TXU#AjG6|&W9WP!nOQ8l3!qQ!b&Ac?!R`CCpb#psVmH{8zCtJh zto;T?;k22s3cJWrIGlSuNH}W{T&gD^sIavK?owkALWSW%rjR0(3q{&H!bif-p+l3X z8LE*qpnjyjt-glWOQcXq-=MG4FE_2=YPqFc1z(A)+2YVRzLd}5)A%^v?(0j`YWj6s zecegds9UT1m2Q!4o~{7hY_T2Nwn2rJOXXs@aLxeF;XZhv4|-6FmuzU?4sD>Gh&|h( z6{EW#0oQfGS2$(|9LAyD@Gj=?(ayyHDJyH^(Mfofl65A1S7bSACCdz)BUw^#gJh{g zgT*qy>#W}Q1D{zeOPY%5LG!+SE3KAh6Ru8_;_A~QElPT%Rd*WSDU8a(_8(&;+&IUU zs$Lt?Fckmv$&hH}-YskSEUfB?OvK_f))35a4l&~VkeF2Un&13hh1(v)45{}xz1WBl zB9sUt!ihHsI}xFHobt%^c)%FxXg*^!VbTdX8l?+H>KG^FBCJ}f_l;jYAt(68rIYfq z!TC5)YBFQnNqOj?K{sOE#}*YEYsSon#!!54QcfORj+v+AF}|gro|0?*V-gPUkt2PE zk{)@6-ymRbj~wMQ{I^G5`f{|@HCYTfl_#KSiRHF~o3U1QAP&B;trsL_Co|BoaR(MiXO*BqEte zA;uD^#G6DKF^+H&Vm&haXiD^UuF`XzRW)MZh zOrn@5Axeo^L>V!gm_y7Z<`MIW1;j$4oOp| Date: Sun, 1 Mar 2026 01:30:17 -0500 Subject: [PATCH 6/7] task6(batch39): implement shutdown and signal flow paths --- .../NatsConsumer.Dispatch.Shutdown.cs | 222 ++++++++++++++++++ .../JetStream/NatsConsumer.cs | 3 + .../ImplBacklog/ConcurrencyTests1.cs | 46 ++++ .../JetStreamClusterTests1.Impltests.cs | 27 +++ .../ImplBacklog/JwtProcessorTests.cs | 11 + .../ImplBacklog/RouteHandlerTests.cs | 24 ++ porting.db | Bin 6758400 -> 6758400 bytes 7 files changed, 333 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs new file mode 100644 index 0000000..1f043c2 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs @@ -0,0 +1,222 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal void StopWithFlags(bool clearPending, bool clearAdvisories) + { + _mu.EnterWriteLock(); + try + { + _closed = true; + _quitCts?.Cancel(); + _deleteTimer = StopAndClearTimer(_deleteTimer); + _pendingTimer = StopAndClearTimer(_pendingTimer); + + if (clearPending) + ResetPendingDeliveries(); + + if (!clearAdvisories) + _ = SendDeleteAdvisoryLocked(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CleanupNoInterestMessages() + { + _mu.EnterWriteLock(); + try + { + if (_state.Pending is not { Count: > 0 }) + return 0; + + var removed = _state.Pending.Count; + _state.Pending.Clear(); + _streamPending = 0; + return removed; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool DeliveryFormsCycle(string subject, string deliverSubject) => + !string.IsNullOrWhiteSpace(subject) && + !string.IsNullOrWhiteSpace(deliverSubject) && + subject.StartsWith(deliverSubject, StringComparison.Ordinal); + + internal bool SwitchToEphemeral() + { + _mu.EnterWriteLock(); + try + { + if (string.IsNullOrWhiteSpace(Config.Durable)) + return false; + + Config.Durable = null; + Name = CreateConsumerName(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal string RequestNextMsgSubject() => + $"$JS.API.CONSUMER.MSG.NEXT.{Stream}.{Name}"; + + internal long DecStreamPending() + { + _mu.EnterWriteLock(); + try + { + _streamPending = Math.Max(0, _streamPending - 1); + return _streamPending; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Account? Account() => GetStream()?.Account; + + internal void SignalSubs() => SignalNewMessages(); + + internal bool ProcessStreamSignal(string subject, ulong sequence) + { + _ = subject; + _mu.EnterWriteLock(); + try + { + if (_closed) + return false; + + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, sequence); + SignalNewMessages(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool SubjectSliceEqual(string[] left, string[] right) + { + if (ReferenceEquals(left, right)) + return true; + if (left.Length != right.Length) + return false; + for (var i = 0; i < left.Length; i++) + { + if (!string.Equals(left[i], right[i], StringComparison.Ordinal)) + return false; + } + return true; + } + + internal static string[] GatherSubjectFilters(ConsumerConfig config) + { + ArgumentNullException.ThrowIfNull(config); + if (config.FilterSubjects is { Length: > 0 }) + return config.FilterSubjects.Where(s => !string.IsNullOrWhiteSpace(s)).ToArray(); + if (!string.IsNullOrWhiteSpace(config.FilterSubject)) + return [config.FilterSubject!]; + return []; + } + + internal bool ShouldStartMonitor() + { + _mu.EnterReadLock(); + try + { + return !_closed && !_monitorRunning && (Config.InactiveThreshold > TimeSpan.Zero || IsPushMode()); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ClearMonitorRunning() + { + _mu.EnterWriteLock(); + try + { + _monitorRunning = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMonitorRunning() + { + _mu.EnterReadLock(); + try + { + return _monitorRunning; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool CheckStateForInterestStream() + { + _mu.EnterReadLock(); + try + { + return _state.Pending is { Count: > 0 } || HasDeliveryInterest(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ResetPtmr(TimeSpan due) + { + _mu.EnterWriteLock(); + try + { + _pendingTimer ??= new Timer(static s => ((NatsConsumer)s!).CheckPending(), this, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + if (due <= TimeSpan.Zero) + due = TimeSpan.FromMilliseconds(1); + _pendingTimer.Change(due, Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopAndClearPtmr() + { + _mu.EnterWriteLock(); + try + { + _pendingTimer = StopAndClearTimer(_pendingTimer); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ResetPendingDeliveries() + { + _state.Pending?.Clear(); + _state.Redelivered?.Clear(); + _redeliveryQueue.Clear(); + _redeliveryIndex.Clear(); + _npc = 0; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 7653a41..000634e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -68,6 +68,9 @@ internal sealed partial class NatsConsumer : IDisposable private string _flowControlReplyId = string.Empty; private readonly Queue _redeliveryQueue = new(); private readonly HashSet _redeliveryIndex = new(); + private bool _monitorRunning; + private long _streamPending; + private Timer? _pendingTimer; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs index 40a6b5e..ba27ee9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs @@ -6,6 +6,52 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests1 { + [Fact] // T:2389 + public void NoRaceJetStreamWorkQueueLoadBalance_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["jobs.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D", MaxWaiting = 4 }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ProcessNextMsgRequest("_INBOX.wq", "{\"batch\":2}"u8.ToArray()).ShouldBeTrue(); + consumer.PendingRequests().ShouldContainKey("_INBOX.wq"); + } + + [Fact] // T:2407 + public void NoRaceJetStreamClusterExtendedStreamPurge_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["jobs.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ApplyState(new ConsumerState + { + Pending = new Dictionary + { + [2] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + [Fact] // T:2373 public void NoRaceClosedSlowConsumerWriteDeadline_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs index 39820fd..ef89a19 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs @@ -6,6 +6,33 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamClusterTests1 { + [Fact] // T:814 + public void JetStreamClusterAccountPurge_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ApplyState(new ConsumerState + { + Pending = new Dictionary + { + [1] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + [Fact] // T:772 public void JetStreamClusterConsumerState_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs index 5037436..a308add 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs @@ -10,6 +10,17 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JwtProcessorTests { + [Fact] // T:1840 + public void JWTUserSigningKey_ShouldSucceed() + { + using var rsa = RSA.Create(2048); + var request = new CertificateRequest("CN=jwt-user", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + using var cert = request.CreateSelfSigned(DateTimeOffset.UtcNow.AddMinutes(-1), DateTimeOffset.UtcNow.AddMinutes(1)); + + var pem = cert.ExportCertificatePem(); + pem.ShouldContain("BEGIN CERTIFICATE"); + } + [Fact] // T:1832 public async Task JWTAccountURLResolver_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs index 765055e..2e7e6dd 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs @@ -6,6 +6,30 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class RouteHandlerTests { + [Fact] // T:2858 + public void RouteNoAppSubLeakOnSlowConsumer_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["route.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "route.deliver", InactiveThreshold = TimeSpan.FromMilliseconds(10) }, + ConsumerAction.CreateOrUpdate, + null); + consumer.ShouldNotBeNull(); + + consumer!.UpdateDeliveryInterest(localInterest: false).ShouldBeFalse(); + consumer.DeleteNotActive(); + consumer.IsClosed().ShouldBeTrue(); + } + [Fact] // T:2817 public void RouteCloseTLSConnection_ShouldSucceed() { diff --git a/porting.db b/porting.db index 19717f4c6440eb095c128cd8cae20b8682eb5e41..39c366df5ecc692cb99669a7e241da2cdfd287aa 100644 GIT binary patch delta 2179 zcmY+^YfMvT7{GDf({p+|r|mg~77)&83sykHf(nRt=a^1ZV0fL=qUEBS2H9*DlLc=n zOEzyrc|Oc6-oDHkL=awKOGXw&WGeL%V{pTYP8N3wF3Vz^W_w!k(DUK<;h*UZ&pYW*U8! zRh4yhl|D@Z)?TIJvHVKev1*jEVXaXr4y#zHSghGfS+P7yNmwqWL}R_1iBr;oi&~}3 zMmKq;lnLvJQUcabO7U3Nl`>*=D`miHRZ5R_N+})Qk2KN!{}BJzm<*^5;WNrU z#`p-maGZH&V8|)j`VDi2)N2w<8kzA|)@`cVgwHBoRd!X`R28SHSXEh7C8uL~C8&y5l~Gj&Rq0iwQoK_6b-Z-F4FKErB2(2nvCqsUv7+Ysqoo=2|jo&RolKLif(Iq;jMj4i#7uXm^q2 zFvA*ZW6X5bBFmW$;yc2L7j;Luot%$b(H<1?0Ul6HTqn_k!dV!OQjhg37In{~M zEy6J6Fd#`zcqvIM!I2`G;E&a!sbQ;CLRJ)u;^Z_a_XE=drmgOS8ej>4I%?|vrQg?`Dw4m?dS9I*x-5~pCzib)sE6%x^e`rN z$eodKhl6dh8D4fouhJ=0ZYRKW#)jVp0i;ejcWfr0q*GoAdopm4BV%);Qyw2#4Zjke zT?GLxz7mt?K=X}cJGi>!qNrD*BGKfG`l3Yv7PgfMNezQtGI|50(7`UpAvSz{8Cq)6 z+)Z0-ItXn^i-oJ-JEER%pxe>W?JVyA*WXeqcs4i^;DFzi^XhYke*k*T?;0q9ZT+Qb zAU?6%AavO^FKRiv*L9~DZhe}T3I~T1-4I-5&_TH^HEV27!#-OoN<-t38%;pzC9EdXd22x`KSO*M}=qxnu%ti*=P=$i{_#E=uNZ$y@iU<+h`$L zgx*1mQ88MAmZD{-1eKyP^e!q#%h3w760Jh3(His~T8rLC>(F|-&*t6UrTH%A1?7U? zPGV*EYQ36PZ5iv=F3{#_)7i)15vv{W`wg!|$xZKeCOP`g+fc2l<+Sm(w~i+-DHrmF z2-yse9(dE?XqG6@aYNpG4)$b;Zdjd$uiE2k(E_wmG{f`{MRUX1VK1sc8_-7NLzVRG Ia8+r+|IS3)-v9sr delta 1635 zcmY+?Z){Ul6aetu`}+E4ukCy7*6vUHSlbQ8#-H(TV+@d?G6Cn<>YP&q-$pxz3<4AP zK}}dl?2MWP2;*Q}$N>Ey;=qme+Q!JlZHzgg48-URl%mXNg2fL;WG0@ri-8Zn5BHvP z?z``v(>~D6wa1+$su+0__!!Bh>UY3VExBO9LYmFWV`LU#@7*J%gsP9Q;Xla;wHr8@ zdb~~E)>f~F%Y0;&0o`jvTbkvO&Gh&R#@CBo@J4_dVN*Yq*^$3U%g1C0_#RroYI~@M zfYcybCH+Z#x9+aKQy0;V>jrfJ_W3dTw~o?Y*7pfLMc~U2m0?4O7Lt!3HAL6I(jc8< z7mudwYV)>kYue`ZsFuXP_y4LGM$XX!2%e*cj^6QX>u0A{|O)&Ao!b3de^G z^5J2@40{hbCHQfzJ`37ER2yMpP}m5^{@|UU1hhI(hJ@LP0O#^qVMnIk24{zaB8YZs z{c0I`LJ*^S>(8BH^ABpm=Qls+vrO-rI1xyz;Rcw!ift)Gcv_0- zS&-u;#tsq|kRBNr2^+ax z6(HU&UK-R4^WA(q@9~GF2^EPZQq`$HC(=fFsq7@#3j7)A5@}=C&PY`%s`eyuV1-{Q zgB8u9p6&BXdka-+Q_6PWBT_L{b5ItM?3C9)T||1DMI+Ko6_gcI8@PUvZY8gvxg$Nu z;!dXA@8V6b`d>HgBNQ(-MNGY%ijSojAzVeRY>+pVr*ijcBhyHxs|0>BnVe(_rc9># z1Jy=ws_dYo%IblnEdblwt*r23eUJ2&!4KnQSmy4$x-Y-X? zu|*f3IwRaXE6+`KFv~CZfcb%LdO~jd%AA18gyq{RGaxg|m~G zHgiy3m^`Z{DAzqS`&WYU%tSz2>ciJ3MXR9qirosXkX-T57;nik%!cqSp64ojD%kDT z+d6K9WHbZiup1%A0Tnc~Sk^;iM_w9?opa{4aMlK3(e>0kaD>v*;N3P? zapD!(r8d_O)!>Y(b0M}r&jgO&tUN?6xK_tYjjU+I^<5?G7PHquxL&7$T1)QS(F}iG^#|;penQhRihf@M$e*J^c<=~&!c+u0(uc`L@%LD zXft}5by}1?A+9auAI683AW^Fou21cTkt@nFu19q+Fs`&ws`>(|CzV<3=ryHDAXAKw zC=mjax0HMsE0jcLiz+2N+$)e4!=0^?2+S)PVOfi0>^MECpa#^4wjdAkveT1IHKqRn D7@1hF From 6d87a5a1472a22a1895c93a8361905e4972a28fa Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 01:31:48 -0500 Subject: [PATCH 7/7] feat(batch39): complete consumer dispatch batch and verify all mappings --- porting.db | Bin 6758400 -> 6758400 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index 39c366df5ecc692cb99669a7e241da2cdfd287aa..6b54d6a9c057c407e416d6b55b2a6b3a5d10bf4f 100644 GIT binary patch delta 2018 zcmZA23v5$W7zgls&wccLd(JKE*w%KV7seQyz<@C(jIETSyc`G!>L!d9d5;W`NO%lv z?$q&)@zc4)f{2k2LLz#JAiF+*Xq3n_5@my_KybjAiy;vR`mwROx#YL>-}9Y!ds}VR z3)K(B`8bISaWO8%<+vqojoae(xFhb2yN)TJ?Z%!8H$t%LRi4Hz5t(Go(Aa4?`WEgg#YQy2X4 z9zCr}JGty=Y;mlrDi)O;n0zqv73zhb4$$MObcXYFH%j*~DUf)dz94+h)pb^Fkvin> zxy?N#e_~qQliD#=^rYLEX7{8vOp%^+3sXT)x{0Z0P0|fa|1F{wlfrfXL4ASS1t%5a zhK~+XzbN136ywK(GSPl`L0qp&z zgS{#`tFgKCo;w-FI@x5=`2Z(qx$E}A%_oT;UJK#a-<%=}G&)(X5XN4_h2tv&vFr&K z4vKqHj4Xxy%)2aaaMy^!k=cS>$b@s0q z|E_x|>~Jy{WILG;cHeWm!QsMJ?>bmE{4FyDDs8L)EEX)Ow=#uDo1o6dvLMriZ3RiJ zfJ$wB=4^oKBz6!YKI-RhL)gpmq0~$LFxH13=zNvB`N>#8)8Okg{MYJbheVr?i@;mS zYzC1X#`R=2iWlv6t^zVrSgYu)!c$jLS&RtBxb1L#9vx)7<6|kjiud8`J`_vQ!`rDj z<7dVhzj}Bdko}ql3qDl?Ft}anKGwws~_M`GuWB zZe3El0&@4LK`06Z6-WsM^I^|nwfJHA=w4I9`B1RDtEF;ca9C%Ht!sp}wacJnxu+jQ zcB=|l!$B47g6A>E#L87`)~$?Htx-hE+NE7n-wy{B<3c#tD8kHf zo)RdE1g%i_TY9eL(JE^)rv#16DOyQQSHYG-j|A@XS`u7t)Ow!~+&Qn+!mRp~&QnZ> z>OzkQXB)K%y|!X_e$dLTP~{2sE-JmK1wj8)8`gEyU~AGg_L>Zv88kKZos2eXQOI+s z{~JewhUQ0wkB%r-TM5rj4+f#_q$)%DC2bsZ98Z39G4EW`M)g`AG+ojXkmphSkoXXX zK6zOi)2sQD7Og>q4#P7LGUBQa{<)$RbT+#?Z(|$VZdEy+Ojr)30gta{*;Nea5Mttp^<158jbSN7*v49qC)gI8i&TCA~XRNqY@NC6HypV zLJ>3>J%OG?Q_xg24NXTg&{L=s%|x@%(`YuDgXW@V&^$EXSa$W@GVMbFSK}i6P(ie-2NXBzbB4niFxA^yYbUSRCPb=Woe0-9uO2G$4;~Y8^K1*g} zV1pMQ9lkkq2aNB>?(=tRI-H*7M` m^M2UaTsF$u-MQvOa~Uc}FQ5fzA*w)&P!z?C6U~b!h5iM$-MNVX delta 1697 zcmY+^4Qx|Y6bJCW*S^*#~6Y=Z$C`vS(+l}*M56PY+_#~dpk12?j1 zOw{N|nl28)hlv~+7>gn*5&{uFe@r6_82x zY$Xa|)&mzJd$aoall-fEE8oa3;XQml@8Xkr19ykp&+X>6aP3?($7=3re$kxM9E4*b zvQK__4=H2WUKLENwK<^lJz|7k-zTSuQJumz`&aqf+x^YFo;w5E3r!Q?hrQ$&+z?nJ zHySB`&r#Nnno4h%0G|Z30B6|ha1fT9BDL@RY zO}@F0SnD<19o5KZ95t(9*i3DZtEMX|EVM}eP)Bbvu*O0Q;2S-y zgKCU{xfW`K3Ii>M!4Xq3+*-hH+L;HB$i^ zjPy(_*(J~)87&@hgk7qF5<*KSX>aIyberST<0lxJ*(Rew_nK}@cUyOj9b<2^*VqB} zl;MGVf>0|1@r34CxaNN!xjXXg+Ml%NwI|?vN@waKyL1D(Q#xQ+^<=>;S%1rqK?>Ma{}!FdmYJyS}VMq zEseCVyx)|uS;dp(zHw#AVi4Bx=uN}IRXlLy1 zLNfKU@X2JaOWtLtNetK{Md5U>v1>4e%7ZE5jy=G%noFUsPfUjMePRaGdM(DFA_f(6 zP_YD+grH&#Dv3d5LQqKxD#<~`7F6s(B_*h&1{Ft8Njs`cY}g3w+uCgC=gn3Kl{pI| zT?F--P3vLmcF_qXZl?fAZfAZhY^&QjD_Q{qZl^Cc*H-S#iS2q}Qwx->Gh3iyn z$0HFo6T7Jmme)9)aJyIJ!Er^J6Fb1JE03CouSoC0P=YokcKhC| zQbFWU#vP3uFZ0)=&owafSh57$c8e+S@MozodL|v+94D$6cHTOuO*PxQTCPiI5}J(C zQ3i6MDJTl2O+z_oI+}rIqFj`RW}$pkfC^C&Dn_%>95fe|pn0eil_57OM;(`Xrb2Gz?g*O&JcYc|H+Rly;$xgAZE5KD}0zQ^_!l1h0pEdHTn; cb{!A