diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs new file mode 100644 index 0000000..0077ce9 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Delivery.cs @@ -0,0 +1,161 @@ +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static void ConvertToHeadersOnly(JsPubMsg message) + { + ArgumentNullException.ThrowIfNull(message); + + var payloadSize = message.Msg?.Length ?? 0; + var builder = new StringBuilder(); + builder.Append("NATS/1.0\r\n"); + builder.Append("Nats-Msg-Size: "); + builder.Append(payloadSize); + builder.Append("\r\n\r\n"); + + message.Hdr = Encoding.ASCII.GetBytes(builder.ToString()); + message.Msg = []; + } + + internal void DeliverMsg(string deliverySubject, string ackReply, JsPubMsg message, ulong deliveryCount, RetentionPolicy retentionPolicy) + { + ArgumentNullException.ThrowIfNull(message); + + _mu.EnterWriteLock(); + try + { + var nextDeliverySequence = _state.Delivered.Consumer + 1; + var streamSequence = ParseAckReplyNum(ackReply.Split('.', StringSplitOptions.RemoveEmptyEntries).LastOrDefault() ?? string.Empty); + var streamSeq = streamSequence > 0 ? (ulong)streamSequence : _state.Delivered.Stream + 1; + + _state.Delivered.Consumer = nextDeliverySequence; + _state.Delivered.Stream = streamSeq; + + if (Config.AckPolicy is AckPolicy.AckExplicit or AckPolicy.AckAll) + TrackPending(streamSeq, nextDeliverySequence); + else + _state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq }; + + message.Subject = deliverySubject; + message.Reply = ackReply; + Interlocked.Add(ref _pendingBytes, message.Size()); + + if (NeedFlowControl(message.Size())) + SendFlowControl(); + + if (retentionPolicy != RetentionPolicy.LimitsPolicy && Config.AckPolicy == AckPolicy.AckNone) + _state.AckFloor = new SequencePair { Consumer = nextDeliverySequence, Stream = streamSeq }; + + _ = deliveryCount; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ReplicateDeliveries() => + Config.AckPolicy != AckPolicy.AckNone && Config.FlowControl == false && IsLeader(); + + internal bool NeedFlowControl(int size) + { + if (_flowControlWindow <= 0) + return false; + + if (string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes > _flowControlWindow / 2) + return true; + + if (!string.IsNullOrEmpty(_flowControlReplyId) && _pendingBytes - _flowControlSentBytes >= _flowControlWindow) + _flowControlSentBytes += size; + + return false; + } + + internal void ProcessFlowControl(string subject) + { + _mu.EnterWriteLock(); + try + { + if (!string.Equals(subject, _flowControlReplyId, StringComparison.Ordinal)) + return; + + if (_flowControlWindow > 0 && _flowControlWindow < _maxPendingBytesLimit) + _flowControlWindow = Math.Min(_flowControlWindow * 2, Math.Max(1, _maxPendingBytesLimit)); + + _pendingBytes = Math.Max(0, _pendingBytes - _flowControlSentBytes); + _flowControlSentBytes = 0; + _flowControlReplyId = string.Empty; + SignalNewMessages(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal string FcReply() => + $"$JS.FC.{Stream}.{Name}.{Random.Shared.Next(1000, 9999)}"; + + internal bool SendFlowControl() + { + _mu.EnterWriteLock(); + try + { + if (!IsPushMode()) + return false; + + var reply = FcReply(); + _flowControlReplyId = reply; + _flowControlSentBytes = (int)Math.Max(0, _pendingBytes); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void TrackPending(ulong streamSequence, ulong deliverySequence) + { + _state.Pending ??= []; + if (_state.Pending.TryGetValue(streamSequence, out var pending)) + { + pending.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + _state.Pending[streamSequence] = pending; + } + else + { + _state.Pending[streamSequence] = new Pending + { + Sequence = deliverySequence, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + } + } + + internal void CreditWaitingRequest(string reply) + { + _mu.EnterWriteLock(); + try + { + if (_waiting is null) + return; + + foreach (var waitingRequest in _waiting.Snapshot()) + { + if (!string.Equals(waitingRequest.Reply, reply, StringComparison.Ordinal)) + continue; + + waitingRequest.N++; + waitingRequest.D = Math.Max(0, waitingRequest.D - 1); + return; + } + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs new file mode 100644 index 0000000..8d7e324 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Redelivery.cs @@ -0,0 +1,156 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal void DidNotDeliver(ulong sequence, string subject) + { + _mu.EnterWriteLock(); + try + { + DecDeliveryCount(sequence); + if (IsPushMode()) + _hasLocalDeliveryInterest = false; + else + CreditWaitingRequest(subject); + + if (_state.Pending is { } pending && pending.ContainsKey(sequence) && !OnRedeliverQueue(sequence)) + { + AddToRedeliverQueue(sequence); + if (_waiting is { } waiting && !waiting.IsEmpty()) + SignalNewMessages(); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddToRedeliverQueue(params ulong[] sequences) + { + _mu.EnterWriteLock(); + try + { + foreach (var sequence in sequences) + { + _redeliveryQueue.Enqueue(sequence); + _redeliveryIndex.Add(sequence); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool HasRedeliveries() => _redeliveryQueue.Count > 0; + + internal ulong GetNextToRedeliver() + { + _mu.EnterWriteLock(); + try + { + if (_redeliveryQueue.Count == 0) + return 0; + + var sequence = _redeliveryQueue.Dequeue(); + _redeliveryIndex.Remove(sequence); + return sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool OnRedeliverQueue(ulong sequence) => _redeliveryIndex.Contains(sequence); + + internal bool RemoveFromRedeliverQueue(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (!_redeliveryIndex.Remove(sequence)) + return false; + + if (_redeliveryQueue.Count == 0) + return true; + + var retained = _redeliveryQueue.Where(s => s != sequence).ToArray(); + _redeliveryQueue.Clear(); + foreach (var s in retained) + _redeliveryQueue.Enqueue(s); + + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CheckPending() + { + _mu.EnterWriteLock(); + try + { + if (_state.Pending is not { Count: > 0 }) + return 0; + + var expired = 0; + var cutoff = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - (long)Math.Max(1, Config.AckWait.TotalMilliseconds); + var toRedeliver = new List(); + + foreach (var (sequence, pending) in _state.Pending.ToArray()) + { + if (pending.Timestamp < cutoff) + { + toRedeliver.Add(sequence); + expired++; + } + } + + if (toRedeliver.Count > 0) + { + toRedeliver.Sort(); + AddToRedeliverQueue(toRedeliver.ToArray()); + SignalNewMessages(); + } + + return expired; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong SeqFromReply(string reply) + { + var (_, deliverySequence, _) = AckReplyInfo(reply); + return deliverySequence; + } + + internal ulong StreamSeqFromReply(string reply) + { + var (streamSequence, _, _) = AckReplyInfo(reply); + return streamSequence; + } + + internal static long ParseAckReplyNum(string token) + { + if (string.IsNullOrWhiteSpace(token)) + return -1; + + long number = 0; + foreach (var character in token) + { + if (!char.IsAsciiDigit(character)) + return -1; + + number = (number * 10) + (character - '0'); + } + + return number; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs index 2338260..48f3f6d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -38,6 +38,8 @@ internal sealed partial class NatsConsumer ? TimeSpan.FromMilliseconds(1) : _deleteThreshold; _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, due, Timeout.InfiniteTimeSpan); + if (due <= TimeSpan.FromMilliseconds(1)) + DeleteNotActive(); return true; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 555dbc9..7653a41 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -62,6 +62,12 @@ internal sealed partial class NatsConsumer : IDisposable private ulong _npf; private int _maxPendingBytesLimit; private int _maxPendingBytesThreshold; + private long _pendingBytes; + private int _flowControlWindow; + private int _flowControlSentBytes; + private string _flowControlReplyId = string.Empty; + private readonly Queue _redeliveryQueue = new(); + private readonly HashSet _redeliveryIndex = new(); /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs new file mode 100644 index 0000000..055dc81 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.cs @@ -0,0 +1,208 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1469 + public void JetStreamAddStreamDiscardNew_ShouldSucceed() + { + var msg = new JsPubMsg { Subject = "s", Msg = Encoding.UTF8.GetBytes("payload") }; + NatsConsumer.ConvertToHeadersOnly(msg); + msg.Hdr.ShouldNotBeNull(); + msg.Msg.ShouldBeEmpty(); + } + + [Fact] // T:1484 + public void JetStreamBasicDeliverSubject_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s"); + var msg = new JsPubMsg { Subject = "s", Msg = Encoding.UTF8.GetBytes("p") }; + consumer.DeliverMsg("deliver.s", "$JS.ACK.1.10.1.1.0", msg, 1, RetentionPolicy.LimitsPolicy); + consumer.GetConsumerState().Delivered.Consumer.ShouldBeGreaterThan(0UL); + } + + [Fact] // T:1485 + public void JetStreamBasicWorkQueue_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.AddToRedeliverQueue(11, 12); + consumer.HasRedeliveries().ShouldBeTrue(); + consumer.GetNextToRedeliver().ShouldBe(11UL); + } + + [Fact] // T:1486 + public void JetStreamWorkQueueMaxWaiting_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeTrue(); + consumer.ProcessNextMsgRequest("_INBOX.2", Encoding.UTF8.GetBytes("{\"batch\":1}")).ShouldBeFalse(); + } + + [Fact] // T:1487 + public void JetStreamWorkQueueWrapWaiting_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(maxWaiting: 4); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":2}")).ShouldBeTrue(); + var wr = consumer.NextWaiting(1); + wr.ShouldNotBeNull(); + wr!.N.ShouldBe(1); + } + + [Fact] // T:1488 + public void JetStreamWorkQueueRequest_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(maxWaiting: 4); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":3,\"max_bytes\":10}")).ShouldBeTrue(); + var pending = consumer.PendingRequests(); + pending["_INBOX.1"].N.ShouldBe(3); + pending["_INBOX.1"].MaxBytes.ShouldBe(10); + } + + [Fact] // T:1489 + public void JetStreamSubjectFiltering_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(filterSubjects: ["orders.*"]); + consumer.IsFilteredMatch("orders.created").ShouldBeTrue(); + consumer.IsFilteredMatch("payments.created").ShouldBeFalse(); + } + + [Fact] // T:1490 + public void JetStreamWorkQueueSubjectFiltering_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(filterSubjects: ["orders.created", "orders.*"]); + consumer.IsEqualOrSubsetMatch("orders.created").ShouldBeTrue(); + consumer.IsEqualOrSubsetMatch("orders.updated").ShouldBeFalse(); + } + + [Fact] // T:1492 + public void JetStreamWorkQueueAckAndNext_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ProcessAck("$JS.ACK.1.15.1", "r", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.ProcessNextMsgReq("_INBOX.n", Encoding.UTF8.GetBytes("{\"batch\":1}")); + consumer.ProcessInboundNextMsgReqs(CancellationToken.None).ShouldBe(1); + } + + [Fact] // T:1493 + public void JetStreamWorkQueueRequestBatch_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(maxWaiting: 8); + consumer.ProcessNextMsgRequest("_INBOX.1", Encoding.UTF8.GetBytes("{\"batch\":5}")).ShouldBeTrue(); + consumer.PendingRequests()["_INBOX.1"].N.ShouldBe(5); + } + + [Fact] // T:1495 + public void JetStreamAckAllRedelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(ackPolicy: AckPolicy.AckAll); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [21] = new Pending { Sequence = 2, Timestamp = DateTimeOffset.UtcNow.AddMinutes(-1).ToUnixTimeMilliseconds() } }, + Delivered = new SequencePair { Consumer = 2, Stream = 21 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + }); + + consumer.CheckPending().ShouldBe(1); + consumer.HasRedeliveries().ShouldBeTrue(); + } + + [Fact] // T:1496 + public void JetStreamAckReplyStreamPending_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.AckReply(20, 3, 1, 123, 9).ShouldContain(".20.3.123.9"); + } + + [Fact] // T:1498 + public void JetStreamWorkQueueAckWaitRedelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(ackWait: TimeSpan.FromMilliseconds(1)); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [30] = new Pending { Sequence = 4, Timestamp = DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeMilliseconds() } }, + Delivered = new SequencePair { Consumer = 4, Stream = 30 }, + AckFloor = new SequencePair { Consumer = 0, Stream = 0 }, + }); + + consumer.CheckPending().ShouldBe(1); + consumer.GetNextToRedeliver().ShouldBe(30UL); + } + + [Fact] // T:1499 + public void JetStreamWorkQueueNakRedelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [31] = new Pending { Sequence = 4, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + }); + + consumer.DidNotDeliver(31, "_INBOX.reply"); + consumer.OnRedeliverQueue(31).ShouldBeTrue(); + } + + [Fact] // T:1500 + public void JetStreamWorkQueueWorkingIndicator_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(deliverSubject: "deliver.s"); + consumer.SetMaxPendingBytes(128); + consumer.SendFlowControl().ShouldBeTrue(); + consumer.NeedFlowControl(10).ShouldBeFalse(); + } + + [Fact] // T:1501 + public void JetStreamWorkQueueTerminateDelivery_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [41] = new Pending { Sequence = 5, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + }); + + consumer.DidNotDeliver(41, "_INBOX.reply"); + consumer.RemoveFromRedeliverQueue(41).ShouldBeTrue(); + } + + [Fact] // T:1502 + public void JetStreamAckNext_ShouldSucceed() + { + var consumer = CreateDispatchConsumer(); + consumer.SeqFromReply("$JS.ACK.stream.consumer.1.7.3.12345.2").ShouldBe(3UL); + consumer.StreamSeqFromReply("$JS.ACK.stream.consumer.1.7.3.12345.2").ShouldBe(12345UL); + NatsConsumer.ParseAckReplyNum("123").ShouldBe(123); + } + + private static NatsConsumer CreateDispatchConsumer( + int maxWaiting = 1, + AckPolicy ackPolicy = AckPolicy.AckExplicit, + string? deliverSubject = null, + TimeSpan? ackWait = null, + string[]? filterSubjects = null) + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["orders.>", "foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = ackPolicy, + MaxWaiting = maxWaiting, + DeliverSubject = deliverSubject, + AckWait = ackWait ?? TimeSpan.FromMilliseconds(100), + FilterSubjects = filterSubjects, + }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + consumer!.SetLeader(true, 1); + return consumer; + } +} diff --git a/porting.db b/porting.db index 526dd57..fde5351 100644 Binary files a/porting.db and b/porting.db differ