diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs new file mode 100644 index 0000000..389d486 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Acks.cs @@ -0,0 +1,313 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal (JsPubMsg? Message, ulong DeliveryCount, Exception? Error) GetNextMsg() + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (null, 0, new InvalidOperationException("consumer not valid")); + + if (_state.Pending is { Count: > 0 }) + { + var sequence = _state.Pending.Keys.Min(); + var deliveryCount = IncDeliveryCount(sequence); + var message = new JsPubMsg + { + Subject = Config.DeliverSubject ?? string.Empty, + Reply = AckReply(sequence, _state.Delivered.Consumer + 1, deliveryCount, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), NumPending()), + Msg = [], + }; + return (message, deliveryCount, null); + } + + return (null, 0, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (int Expired, int Waiting, int BatchRequestsPending, DateTime FirstExpiration) ProcessWaiting(bool endOfStream) + { + _mu.EnterWriteLock(); + try + { + var firstExpiration = DateTime.MinValue; + if (_waiting is null || _waiting.IsEmpty()) + return (0, 0, 0, firstExpiration); + + var expired = 0; + var batchRequestsPending = 0; + var now = DateTime.UtcNow; + var toRemove = new List(); + + foreach (var waitingRequest in _waiting.Snapshot()) + { + var isExpired = waitingRequest.Expires is DateTime expiresAt && now >= expiresAt; + if ((endOfStream && waitingRequest.NoWait == 1) || isExpired) + { + toRemove.Add(waitingRequest); + expired++; + continue; + } + + batchRequestsPending += Math.Max(0, waitingRequest.N); + if (waitingRequest.Expires is DateTime wrExpires && + (firstExpiration == DateTime.MinValue || wrExpires < firstExpiration)) + { + firstExpiration = wrExpires; + } + } + + foreach (var waitingRequest in toRemove) + _waiting.Remove(null, waitingRequest); + + return (expired, _waiting.Len, batchRequestsPending, firstExpiration); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool CheckWaitingForInterest() + { + var (_, waiting, _, _) = ProcessWaiting(endOfStream: true); + return waiting > 0; + } + + internal (TimeSpan Duration, Timer? Timer) HbTimer() + { + if (Config.Heartbeat <= TimeSpan.Zero) + return (TimeSpan.Zero, null); + + return (Config.Heartbeat, new Timer(static _ => { }, null, Config.Heartbeat, Timeout.InfiniteTimeSpan)); + } + + internal void CheckAckFloor() + { + _mu.EnterWriteLock(); + try + { + if (_closed || _state.Pending is not { Count: > 0 }) + return; + + var minPending = _state.Pending.OrderBy(static pair => pair.Key).First(); + var pendingStream = minPending.Key; + var pendingConsumer = minPending.Value?.Sequence ?? pendingStream; + + var desiredStreamFloor = pendingStream > 0 ? pendingStream - 1 : 0; + var desiredConsumerFloor = pendingConsumer > 0 ? pendingConsumer - 1 : 0; + + if (_state.AckFloor.Stream < desiredStreamFloor) + _state.AckFloor.Stream = desiredStreamFloor; + + if (_state.AckFloor.Consumer < desiredConsumerFloor) + _state.AckFloor.Consumer = desiredConsumerFloor; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int ProcessInboundAcks(CancellationToken cancellationToken) + { + var processed = 0; + while (!cancellationToken.IsCancellationRequested) + { + JsAckMsg? ack; + _mu.EnterWriteLock(); + try + { + if (_ackQueue.Count == 0) + break; + ack = _ackQueue.Dequeue(); + } + finally + { + _mu.ExitWriteLock(); + } + + ProcessAck(ack.Subject, ack.Reply, ack.HeaderBytes, ack.Msg); + processed++; + } + + return processed; + } + + internal int ProcessInboundNextMsgReqs(CancellationToken cancellationToken) + { + var processed = 0; + while (!cancellationToken.IsCancellationRequested) + { + NextMsgReq? request; + _mu.EnterWriteLock(); + try + { + if (_nextMsgReqs is null || _nextMsgReqs.Count == 0) + break; + request = _nextMsgReqs.Dequeue(); + } + finally + { + _mu.ExitWriteLock(); + } + + _ = ProcessNextMsgRequest(request.Reply, request.Message); + request.ReturnToPool(); + processed++; + } + + return processed; + } + + internal void SuppressDeletion() + { + _mu.EnterWriteLock(); + try + { + if (_closed || _deleteTimer is null || _deleteThreshold <= TimeSpan.Zero) + return; + + _deleteTimer.Change(_deleteThreshold, Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int LoopAndGatherMsgs(int maxIterations, CancellationToken cancellationToken) + { + var delivered = 0; + for (var i = 0; i < maxIterations && !cancellationToken.IsCancellationRequested; i++) + { + var (message, _, error) = GetNextMsg(); + if (error is not null || message is null) + break; + + delivered++; + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, (ulong)delivered); + _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, (ulong)delivered); + + var expired = ProcessWaiting(endOfStream: false); + if (expired.Waiting == 0) + break; + } + + return delivered; + } + + internal string SendIdleHeartbeat(string subject) + { + var streamSequence = _state.Delivered.Stream; + var consumerSequence = _state.Delivered.Consumer; + var heartbeat = $"NATS/1.0 100 Idle Heartbeat\r\nNats-Last-Consumer: {consumerSequence}\r\nNats-Last-Stream: {streamSequence}\r\n\r\n"; + + _ = SendAdvisory(subject, heartbeat); + return heartbeat; + } + + internal string AckReply(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, long timestamp, ulong pending) => + $"$JS.ACK.{deliveryCount}.{streamSequence}.{deliverySequence}.{timestamp}.{pending}"; + + internal void SetMaxPendingBytes(int limit) + { + _mu.EnterWriteLock(); + try + { + _maxPendingBytesLimit = Math.Max(0, limit); + _maxPendingBytesThreshold = Math.Max(1, _maxPendingBytesLimit / 16); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Pending, Exception? Error) CheckNumPending() + { + _mu.EnterWriteLock(); + try + { + if (_npc < 0) + _npc = 0; + + var (pending, floor, error) = CalculateNumPending(); + if (error is not null) + return (0, error); + + _npc = (long)pending; + _npf = floor; + return (NumPending(), null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong NumPending() => _npc < 0 ? 0UL : (ulong)_npc; + + internal void CheckNumPendingOnEOF() + { + _mu.EnterWriteLock(); + try + { + if (_npc < 0) + _npc = 0; + + if (_state.Delivered.Stream <= _state.AckFloor.Stream) + { + _npc = 0; + _npf = _state.Delivered.Stream; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Pending, Exception? Error) StreamNumPendingLocked() + { + _mu.EnterWriteLock(); + try + { + return StreamNumPending(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (ulong Pending, Exception? Error) StreamNumPending() + { + var (pending, floor, error) = CalculateNumPending(); + if (error is not null) + return (0, error); + + _npc = (long)pending; + _npf = floor; + return (NumPending(), null); + } + + internal (ulong Pending, ulong Floor, Exception? Error) CalculateNumPending() + { + if (_closed) + return (0, 0, null); + + var delivered = _state.Delivered.Stream; + var acked = _state.AckFloor.Stream; + if (delivered <= acked) + return (0, delivered, null); + + return (delivered - acked, delivered, null); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 6c91856..555dbc9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -58,6 +58,10 @@ internal sealed partial class NatsConsumer : IDisposable private string _currentPinId = string.Empty; private DateTime _pinnedTs; private Dictionary? _rdc; + private long _npc; + private ulong _npf; + private int _maxPendingBytesLimit; + private int _maxPendingBytesThreshold; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs new file mode 100644 index 0000000..6d3a6ab --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerDispatchTests.cs @@ -0,0 +1,161 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsConsumerDispatchTests +{ + [Fact] + public void ProcessWaiting_EndOfStream_ShouldExpireNoWaitRequests() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ProcessNextMsgRequest("_INBOX.a", Encoding.UTF8.GetBytes("{\"batch\":1,\"no_wait\":true}")).ShouldBeTrue(); + + var result = consumer.ProcessWaiting(endOfStream: true); + + result.Expired.ShouldBe(1); + result.Waiting.ShouldBe(0); + consumer.CheckWaitingForInterest().ShouldBeFalse(); + } + + [Fact] + public void HbTimer_HeartbeatConfigured_ShouldReturnTimer() + { + var consumer = CreatePullConsumer(maxWaiting: 8, heartbeat: TimeSpan.FromMilliseconds(50)); + var (duration, timer) = consumer.HbTimer(); + + duration.ShouldBe(TimeSpan.FromMilliseconds(50)); + timer.ShouldNotBeNull(); + timer!.Dispose(); + } + + [Fact] + public void CheckAckFloor_WithPendingEntries_ShouldAdvanceFloor() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 20, Stream = 20 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + Pending = new Dictionary + { + [10] = new Pending { Sequence = 3, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.CheckAckFloor(); + var state = consumer.ReadStoredState(); + + state.AckFloor.Stream.ShouldBe(9UL); + state.AckFloor.Consumer.ShouldBe(2UL); + } + + [Fact] + public void ProcessInboundAcks_QueuedAck_ShouldAdvanceAckFloor() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.PushAck("$JS.ACK.2.8.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + + var processed = consumer.ProcessInboundAcks(CancellationToken.None); + var state = consumer.GetConsumerState(); + + processed.ShouldBe(1); + state.AckFloor.Stream.ShouldBe(8UL); + } + + [Fact] + public void ProcessInboundNextMsgReqs_QueuedRequest_ShouldPopulateWaitingQueue() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ProcessNextMsgReq("_INBOX.next", Encoding.UTF8.GetBytes("{\"batch\":2}")); + + var processed = consumer.ProcessInboundNextMsgReqs(CancellationToken.None); + var pending = consumer.PendingRequests(); + + processed.ShouldBe(1); + pending.ShouldContainKey("_INBOX.next"); + pending["_INBOX.next"].N.ShouldBe(2); + } + + [Fact] + public void PendingCounters_AndAckReply_ShouldTrackValues() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 10, Stream = 30 }, + AckFloor = new SequencePair { Consumer = 7, Stream = 22 }, + }); + + var (pending, error) = consumer.CheckNumPending(); + error.ShouldBeNull(); + pending.ShouldBe(8UL); + consumer.NumPending().ShouldBe(8UL); + consumer.CheckNumPendingOnEOF(); + + consumer.SetMaxPendingBytes(256); + var ackReply = consumer.AckReply(30, 11, 1, 12345, pending); + ackReply.ShouldContain("$JS.ACK.1.30.11.12345.8"); + } + + [Fact] + public void SendIdleHeartbeat_ShouldReturnFormattedHeartbeat() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 5, Stream = 9 }, + AckFloor = new SequencePair { Consumer = 4, Stream = 8 }, + }); + + var heartbeat = consumer.SendIdleHeartbeat("$JS.HEARTBEAT"); + + heartbeat.ShouldContain("100 Idle Heartbeat"); + heartbeat.ShouldContain("Nats-Last-Consumer: 5"); + heartbeat.ShouldContain("Nats-Last-Stream: 9"); + } + + [Fact] + public void LoopAndGatherMsgs_WithPendingEntries_ShouldDeliverMessages() + { + var consumer = CreatePullConsumer(maxWaiting: 8); + consumer.ApplyState(new ConsumerState + { + Delivered = new SequencePair { Consumer = 0, Stream = 0 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + Pending = new Dictionary + { + [1] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.ProcessNextMsgRequest("_INBOX.loop", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue(); + var delivered = consumer.LoopAndGatherMsgs(4, CancellationToken.None); + + delivered.ShouldBeGreaterThan(0); + consumer.GetConsumerState().Delivered.Stream.ShouldBeGreaterThanOrEqualTo(1UL); + } + + private static NatsConsumer CreatePullConsumer(int maxWaiting, TimeSpan? heartbeat = null) + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var config = new ConsumerConfig + { + Durable = "D", + MaxWaiting = maxWaiting, + Heartbeat = heartbeat ?? TimeSpan.Zero, + }; + var consumer = NatsConsumer.Create(stream!, config, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/porting.db b/porting.db index 2d0b329..526dd57 100644 Binary files a/porting.db and b/porting.db differ