diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs index f7cea2b..43f1817 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs @@ -87,6 +87,69 @@ internal sealed partial class NatsConsumer SendAckReply(reply); } + internal bool ProcessNak(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, byte[] message) + { + ArgumentNullException.ThrowIfNull(message); + _mu.EnterWriteLock(); + try + { + _state.Redelivered ??= []; + _state.Redelivered[streamSequence] = Math.Max(deliveryCount + 1, _state.Redelivered.GetValueOrDefault(streamSequence)); + _state.Pending ??= []; + _state.Pending[streamSequence] = new Pending + { + Sequence = deliverySequence, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ProcessTerm(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, string reason, string reply) + { + _mu.EnterWriteLock(); + try + { + _state.Pending?.Remove(streamSequence); + _state.Redelivered ??= []; + _state.Redelivered[streamSequence] = Math.Max(deliveryCount, _state.Redelivered.GetValueOrDefault(streamSequence)); + if (!string.IsNullOrWhiteSpace(reply)) + _lastAckReplySubject = reply; + _ = reason; + _ = deliverySequence; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal TimeSpan AckWait(TimeSpan backOff) + { + if (backOff > TimeSpan.Zero) + return backOff; + + return Config.AckWait > TimeSpan.Zero ? Config.AckWait : DefaultAckWait; + } + + internal bool CheckRedelivered(ulong streamSequence) + { + _mu.EnterReadLock(); + try + { + return _state.Redelivered?.TryGetValue(streamSequence, out var count) == true && count > 1; + } + finally + { + _mu.ExitReadLock(); + } + } + internal void ProgressUpdate(ulong sequence) { _mu.EnterWriteLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs index 6ff6351..afae093 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -339,4 +339,247 @@ internal sealed partial class NatsConsumer PriorityPolicy = cfg.PriorityPolicy, PinnedTTL = cfg.PinnedTTL, }; + + internal void ResetLocalStartingSeq(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _state.Delivered.Stream = sequence; + _state.Delivered.Consumer = sequence; + _state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0; + _state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0; + _state.Pending = []; + _state.Redelivered = []; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int LoopAndForwardProposals() + { + _mu.EnterWriteLock(); + try + { + var count = 0; + while (_proposalQueue.TryDequeue(out _)) + { + count++; + } + + return count; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void Propose(byte[] proposal) + { + ArgumentNullException.ThrowIfNull(proposal); + _mu.EnterWriteLock(); + try + { + _proposalQueue.Enqueue((byte[])proposal.Clone()); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdateDelivered(ulong consumerSequence, ulong streamSequence, ulong deliveryCount, long timestamp) + { + _mu.EnterWriteLock(); + try + { + _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, consumerSequence); + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSequence); + _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, consumerSequence > 0 ? consumerSequence - 1 : 0); + _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSequence > 0 ? streamSequence - 1 : 0); + _state.Pending ??= []; + _state.Pending[streamSequence] = new Pending + { + Sequence = consumerSequence, + Timestamp = timestamp, + }; + + _state.Redelivered ??= []; + if (deliveryCount > 1) + _state.Redelivered[streamSequence] = deliveryCount; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddAckReply(ulong streamSequence, string reply) + { + if (string.IsNullOrWhiteSpace(reply)) + return; + + _mu.EnterWriteLock(); + try + { + _ackReplies[streamSequence] = reply; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddReplicatedQueuedMsg(ulong sequence, JsPubMsg message) + { + ArgumentNullException.ThrowIfNull(message); + _mu.EnterWriteLock(); + try + { + _state.Pending ??= []; + _state.Pending[sequence] = new Pending + { + Sequence = sequence, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int UpdateAcks() + { + _mu.EnterWriteLock(); + try + { + var count = _ackReplies.Count; + _ackReplies.Clear(); + return count; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddClusterPendingRequest(string requestId) + { + if (string.IsNullOrWhiteSpace(requestId)) + return; + + _mu.EnterWriteLock(); + try + { + _clusterPendingRequests[requestId] = DateTime.UtcNow; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RemoveClusterPendingRequest(string requestId) + { + if (string.IsNullOrWhiteSpace(requestId)) + return; + + _mu.EnterWriteLock(); + try + { + _clusterPendingRequests.Remove(requestId); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetPendingRequestsOk(bool ok) + { + _mu.EnterWriteLock(); + try + { + _pendingRequestsOk = ok; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool PendingRequestsOk() + { + _mu.EnterReadLock(); + try + { + return _pendingRequestsOk; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool CheckAndSetPendingRequestsOk(bool ok) + { + _mu.EnterWriteLock(); + try + { + var previous = _pendingRequestsOk; + _pendingRequestsOk = ok; + return previous; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CheckPendingRequests(TimeSpan maxAge) + { + _mu.EnterReadLock(); + try + { + var cutoff = DateTime.UtcNow - maxAge; + return _clusterPendingRequests.Values.Count(timestamp => timestamp >= cutoff); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal int ReleaseAnyPendingRequests() + { + _mu.EnterWriteLock(); + try + { + var released = _clusterPendingRequests.Count; + _clusterPendingRequests.Clear(); + _pendingRequestsOk = true; + return released; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerState ReadStoredState() + { + _mu.EnterReadLock(); + try + { + return GetConsumerState(); + } + finally + { + _mu.ExitReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 57b2b68..ba76a8a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -48,6 +48,10 @@ internal sealed partial class NatsConsumer : IDisposable private bool _hasLocalDeliveryInterest; private long _rateLimitBitsPerSecond; private int _rateLimitBurstBytes; + private readonly Queue _proposalQueue = new(); + private readonly Dictionary _ackReplies = new(); + private readonly Dictionary _clusterPendingRequests = new(StringComparer.Ordinal); + private bool _pendingRequestsOk; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs new file mode 100644 index 0000000..3f51cf8 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs @@ -0,0 +1,81 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class ConsumerStateTests +{ + private static NatsConsumer CreateConsumer() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null)!; + return NatsConsumer.Create(stream, new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }, ConsumerAction.Create, null)!; + } + + [Fact] + public void ProposalAndPendingRequestFlow_ShouldBehave() + { + var consumer = CreateConsumer(); + + consumer.Propose([1, 2, 3]); + consumer.Propose([4]); + consumer.LoopAndForwardProposals().ShouldBe(2); + + consumer.AddClusterPendingRequest("r1"); + consumer.AddClusterPendingRequest("r2"); + consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(2); + consumer.RemoveClusterPendingRequest("r2"); + consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(1); + + consumer.SetPendingRequestsOk(false); + consumer.PendingRequestsOk().ShouldBeFalse(); + consumer.CheckAndSetPendingRequestsOk(true).ShouldBeFalse(); + consumer.PendingRequestsOk().ShouldBeTrue(); + consumer.ReleaseAnyPendingRequests().ShouldBe(1); + } + + [Fact] + public void DeliveredAckReplyAndAcks_ShouldBehave() + { + var consumer = CreateConsumer(); + + consumer.UpdateDelivered(10, 20, 2, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); + consumer.AddAckReply(20, "reply"); + consumer.UpdateAcks().ShouldBe(1); + + var state = consumer.ReadStoredState(); + state.Delivered.Consumer.ShouldBeGreaterThanOrEqualTo(10UL); + state.Delivered.Stream.ShouldBeGreaterThanOrEqualTo(20UL); + state.Redelivered.ShouldNotBeNull(); + state.Redelivered!.ShouldContainKey(20UL); + } + + [Fact] + public void ReplicatedQueueAndNakTermFlow_ShouldBehave() + { + var consumer = CreateConsumer(); + + consumer.AddReplicatedQueuedMsg(33, new JsPubMsg { Subject = "foo" }); + consumer.ProcessNak(33, 2, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.CheckRedelivered(33).ShouldBeTrue(); + consumer.ProcessNak(33, 2, 2, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.CheckRedelivered(33).ShouldBeTrue(); + + consumer.ProcessTerm(33, 2, 2, "done", "reply").ShouldBeTrue(); + consumer.AckWait(TimeSpan.Zero).ShouldBe(TimeSpan.FromSeconds(30)); + consumer.AckWait(TimeSpan.FromSeconds(5)).ShouldBe(TimeSpan.FromSeconds(5)); + } + + [Fact] + public void ResetLocalStartingSeq_ShouldResetState() + { + var consumer = CreateConsumer(); + + consumer.UpdateDelivered(1, 1, 1, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); + consumer.ResetLocalStartingSeq(100); + + var state = consumer.GetConsumerState(); + state.Delivered.Stream.ShouldBe(100UL); + state.AckFloor.Stream.ShouldBe(99UL); + } +} diff --git a/porting.db b/porting.db index 2b19db4..479ad4b 100644 Binary files a/porting.db and b/porting.db differ