diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.ConsumerConfig.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.ConsumerConfig.cs new file mode 100644 index 0000000..5d8a198 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.ConsumerConfig.cs @@ -0,0 +1,47 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal Exception? CheckNewConsumerConfig(ConsumerConfig current, ConsumerConfig next) + { + ArgumentNullException.ThrowIfNull(current); + ArgumentNullException.ThrowIfNull(next); + + if (NatsConsumer.ConfigsEqualSansDelivery(current, next) && + string.Equals(current.DeliverSubject, next.DeliverSubject, StringComparison.Ordinal)) + { + return null; + } + + if (current.DeliverPolicy != next.DeliverPolicy) + return new InvalidOperationException("deliver policy can not be updated"); + if (current.OptStartSeq != next.OptStartSeq) + return new InvalidOperationException("start sequence can not be updated"); + if (current.OptStartTime != next.OptStartTime) + return new InvalidOperationException("start time can not be updated"); + if (current.AckPolicy != next.AckPolicy) + return new InvalidOperationException("ack policy can not be updated"); + if (current.ReplayPolicy != next.ReplayPolicy) + return new InvalidOperationException("replay policy can not be updated"); + if (current.Heartbeat != next.Heartbeat) + return new InvalidOperationException("heart beats can not be updated"); + if (current.FlowControl != next.FlowControl) + return new InvalidOperationException("flow control can not be updated"); + + if (!string.Equals(current.DeliverSubject, next.DeliverSubject, StringComparison.Ordinal)) + { + if (string.IsNullOrWhiteSpace(current.DeliverSubject)) + return new InvalidOperationException("can not update pull consumer to push based"); + if (string.IsNullOrWhiteSpace(next.DeliverSubject)) + return new InvalidOperationException("can not update push consumer to pull based"); + } + + if (current.MaxWaiting != next.MaxWaiting) + return new InvalidOperationException("max waiting can not be updated"); + + if (next.BackOff is { Length: > 0 } && next.MaxDeliver != -1 && next.BackOff.Length > next.MaxDeliver) + return new InvalidOperationException(JsApiErrors.NewJSConsumerMaxDeliverBackoffError().Description ?? "max deliver backoff invalid"); + + return null; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs new file mode 100644 index 0000000..f7cea2b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs @@ -0,0 +1,191 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private static readonly byte[] AckAck = "+ACK"u8.ToArray(); + private static readonly byte[] AckOk = "+OK"u8.ToArray(); + private static readonly byte[] AckNak = "-NAK"u8.ToArray(); + private static readonly byte[] AckProgress = "+WPI"u8.ToArray(); + private static readonly byte[] AckNext = "+NXT"u8.ToArray(); + + private readonly Queue _ackQueue = new(); + private string? _lastAckReplySubject; + + internal void SendAckReply(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return; + + _mu.EnterWriteLock(); + try + { + _lastAckReplySubject = subject; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static JsAckMsg NewJSAckMsg(string subject, string reply, int headerBytes, byte[] msg) + => new() + { + Subject = subject, + Reply = reply, + HeaderBytes = headerBytes, + Msg = msg, + }; + + internal void PushAck(string subject, string reply, int headerBytes, byte[] rawMessage) + { + ArgumentNullException.ThrowIfNull(rawMessage); + _mu.EnterWriteLock(); + try + { + _ackQueue.Enqueue(NewJSAckMsg(subject, reply, headerBytes, (byte[])rawMessage.Clone())); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ProcessAck(string subject, string reply, int headerBytes, byte[] rawMessage) + { + ArgumentNullException.ThrowIfNull(rawMessage); + + var msg = headerBytes > 0 && headerBytes <= rawMessage.Length + ? rawMessage[headerBytes..] + : rawMessage; + + var (streamSeq, deliverySeq, deliveryCount) = AckReplyInfo(subject); + var skipAckReply = streamSeq == 0; + + if (msg.Length == 0 || msg.SequenceEqual(AckAck) || msg.SequenceEqual(AckOk)) + { + ProcessAckMessage(streamSeq, deliverySeq, deliveryCount, reply); + } + else if (StartsWith(msg, AckNext)) + { + ProcessAckMessage(streamSeq, deliverySeq, deliveryCount, string.Empty); + skipAckReply = true; + } + else if (StartsWith(msg, AckNak)) + { + _state.Redelivered ??= new Dictionary(); + _state.Redelivered[streamSeq] = _state.Redelivered.TryGetValue(streamSeq, out var redeliveries) + ? redeliveries + 1 + : 1UL; + skipAckReply = true; + } + else if (msg.SequenceEqual(AckProgress)) + { + ProgressUpdate(streamSeq); + } + + if (!string.IsNullOrWhiteSpace(reply) && !skipAckReply) + SendAckReply(reply); + } + + internal void ProgressUpdate(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _state.Pending ??= new Dictionary(); + if (_state.Pending.TryGetValue(sequence, out var pending)) + { + pending.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + _state.Pending[sequence] = pending; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdateSkipped(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sequence > 0 ? sequence - 1 : 0); + _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, sequence > 0 ? sequence - 1 : 0); + _updateChannel.Writer.TryWrite(true); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private void ProcessAckMessage(ulong streamSeq, ulong deliverySeq, ulong deliveryCount, string reply) + { + _mu.EnterWriteLock(); + try + { + _state.Pending ??= new Dictionary(); + _state.Pending.Remove(streamSeq); + _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSeq); + _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, deliverySeq); + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSeq); + _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, deliverySeq); + + _state.Redelivered ??= new Dictionary(); + if (deliveryCount > 1) + _state.Redelivered[streamSeq] = deliveryCount; + + if (!string.IsNullOrWhiteSpace(reply)) + _lastAckReplySubject = reply; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private static (ulong StreamSequence, ulong DeliverySequence, ulong DeliveryCount) AckReplyInfo(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return (0, 0, 0); + + var numbers = subject + .Split('.', StringSplitOptions.RemoveEmptyEntries) + .Select(static token => ulong.TryParse(token, out var value) ? (ulong?)value : null) + .Where(static value => value.HasValue) + .Select(static value => value!.Value) + .ToArray(); + + if (numbers.Length >= 3) + return (numbers[^2], numbers[^3], numbers[^1]); + if (numbers.Length == 2) + return (numbers[1], numbers[0], 1); + if (numbers.Length == 1) + return (numbers[0], numbers[0], 1); + + return (0, 0, 0); + } + + private static bool StartsWith(byte[] message, byte[] prefix) + { + if (message.Length < prefix.Length) + return false; + + for (var i = 0; i < prefix.Length; i++) + { + if (message[i] != prefix[i]) + return false; + } + + return true; + } +} + +internal sealed class JsAckMsg +{ + internal string Subject { get; set; } = string.Empty; + internal string Reply { get; set; } = string.Empty; + internal int HeaderBytes { get; set; } + internal byte[] Msg { get; set; } = []; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs index 75256d5..07ded34 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs @@ -134,17 +134,6 @@ internal sealed partial class NatsConsumer internal bool HasDeliveryInterest() { - _mu.EnterReadLock(); - try - { - if (_closed || string.IsNullOrWhiteSpace(Config.DeliverSubject)) - return false; - - return _internalSubscriptions.Contains(Config.DeliverSubject!); - } - finally - { - _mu.ExitReadLock(); - } + return HasDeliveryInterest(_hasLocalDeliveryInterest); } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs new file mode 100644 index 0000000..6ff6351 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -0,0 +1,342 @@ +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private static readonly TimeSpan DefaultGatewayInterestInterval = TimeSpan.FromSeconds(1); + + internal bool UpdateDeliveryInterest(bool localInterest) + { + var interest = HasDeliveryInterest(localInterest); + + _mu.EnterWriteLock(); + try + { + _hasLocalDeliveryInterest = localInterest; + if (_closed || IsPullMode()) + return false; + + var wasActive = !IsPullMode() && _isLeader; + if (interest && !wasActive) + _updateChannel.Writer.TryWrite(true); + + if (!interest) + _isPaused = false; + + if (_deleteTimer != null && _deleteThreshold > TimeSpan.Zero && !interest) + return true; + + _deleteTimer?.Dispose(); + _deleteTimer = null; + + if (!interest && _deleteThreshold > TimeSpan.Zero) + { + _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, _deleteThreshold, Timeout.InfiniteTimeSpan); + return true; + } + + return false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void DeleteNotActive() + { + _mu.EnterReadLock(); + try + { + if (_closed) + return; + + if (IsPushMode()) + { + if (HasDeliveryInterest()) + return; + } + else + { + if (_state.Pending is { Count: > 0 }) + return; + } + } + finally + { + _mu.ExitReadLock(); + } + + Delete(); + } + + internal void WatchGWinterest() + { + var wasActive = HasDeliveryInterest(); + if (!_hasLocalDeliveryInterest) + { + UpdateDeliveryInterest(localInterest: false); + if (!wasActive && HasDeliveryInterest()) + _updateChannel.Writer.TryWrite(true); + } + + _mu.EnterWriteLock(); + try + { + _gatewayWatchTimer?.Dispose(); + _gatewayWatchTimer = new Timer(static s => ((NatsConsumer)s!).WatchGWinterest(), this, DefaultGatewayInterestInterval, Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool HasMaxDeliveries(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (Config.MaxDeliver <= 0) + return false; + + _state.Redelivered ??= new Dictionary(); + _state.Pending ??= new Dictionary(); + + var deliveryCount = _state.Redelivered.TryGetValue(sequence, out var redeliveries) + ? redeliveries + 1 + : 1UL; + + if (deliveryCount < (ulong)Config.MaxDeliver) + { + _state.Redelivered[sequence] = deliveryCount; + return false; + } + + _state.Redelivered[sequence] = deliveryCount; + _state.Pending.Remove(sequence); + _updateChannel.Writer.TryWrite(true); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ForceExpirePending() + { + _mu.EnterWriteLock(); + try + { + if (_state.Pending is not { Count: > 0 }) + return; + + _state.Redelivered ??= new Dictionary(); + foreach (var seq in _state.Pending.Keys.ToArray()) + { + if (HasMaxDeliveries(seq)) + continue; + + _state.Redelivered[seq] = _state.Redelivered.TryGetValue(seq, out var current) + ? current + 1 + : 1UL; + + _state.Pending[seq].Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } + + _updateChannel.Writer.TryWrite(true); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetRateLimitNeedsLocks() + { + _mu.EnterWriteLock(); + try + { + SetRateLimit(Config.RateLimit); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetRateLimit(ulong bitsPerSecond) + { + if (bitsPerSecond == 0) + { + Interlocked.Exchange(ref _rateLimitBitsPerSecond, 0); + _rateLimitBurstBytes = 0; + return; + } + + Interlocked.Exchange(ref _rateLimitBitsPerSecond, (long)bitsPerSecond); + var configuredMax = _streamRef?.Config.MaxMsgSize ?? 0; + _rateLimitBurstBytes = configuredMax > 0 + ? configuredMax + : (int)Math.Max(1024UL, bitsPerSecond / 8UL); + } + + internal bool UpdateDeliverSubject(string? newDeliver) + { + _mu.EnterWriteLock(); + try + { + return UpdateDeliverSubjectLocked(newDeliver); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool UpdateDeliverSubjectLocked(string? newDeliver) + { + if (_closed || IsPullMode() || string.Equals(Config.DeliverSubject, newDeliver, StringComparison.Ordinal)) + return false; + + if (_state.Pending is { Count: > 0 }) + ForceExpirePending(); + + if (!string.IsNullOrWhiteSpace(Config.DeliverSubject)) + _internalSubscriptions.Remove(Config.DeliverSubject!); + + Config.DeliverSubject = string.IsNullOrWhiteSpace(newDeliver) ? null : newDeliver; + + if (!string.IsNullOrWhiteSpace(Config.DeliverSubject)) + _internalSubscriptions.Add(Config.DeliverSubject!); + + _updateChannel.Writer.TryWrite(true); + return true; + } + + internal static bool ConfigsEqualSansDelivery(ConsumerConfig left, ConsumerConfig right) + { + var l = CloneConfig(left); + var r = CloneConfig(right); + l.DeliverSubject = null; + r.DeliverSubject = null; + + return JsonSerializer.Serialize(l) == JsonSerializer.Serialize(r); + } + + internal (ulong Sequence, bool CanRespond, Exception? Error) ResetStartingSeq(ulong sequence, string? reply) + { + _mu.EnterWriteLock(); + try + { + if (sequence == 0) + { + sequence = _state.AckFloor.Stream + 1; + } + else + { + switch (Config.DeliverPolicy) + { + case DeliverPolicy.DeliverAll: + break; + case DeliverPolicy.DeliverByStartSequence when sequence < Config.OptStartSeq: + return (0, false, new InvalidOperationException("below start seq")); + case DeliverPolicy.DeliverByStartTime when Config.OptStartTime.HasValue: + if (sequence == 0) + return (0, false, new InvalidOperationException("below start time")); + break; + default: + return (0, false, new InvalidOperationException("not allowed")); + } + } + + if (sequence == 0) + sequence = 1; + + _state.Delivered.Stream = sequence; + _state.Delivered.Consumer = sequence; + _state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0; + _state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0; + _state.Pending = new Dictionary(); + _state.Redelivered = new Dictionary(); + _updateChannel.Writer.TryWrite(true); + + _ = reply; + return (sequence, true, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private bool HasDeliveryInterest(bool localInterest) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrWhiteSpace(Config.DeliverSubject)) + return false; + + if (localInterest) + return true; + + if (_streamRef?.Account is not { } account) + return false; + + if (account.Server is NatsServer server && server.HasGatewayInterest(account, Config.DeliverSubject!)) + return true; + + return _internalSubscriptions.Contains(Config.DeliverSubject!); + } + finally + { + _mu.ExitReadLock(); + } + } + + private bool IsPullMode() => string.IsNullOrWhiteSpace(Config.DeliverSubject); + + private bool IsPushMode() => !IsPullMode(); + + private static ConsumerConfig CloneConfig(ConsumerConfig cfg) => + new() + { + Durable = cfg.Durable, + Name = cfg.Name, + Description = cfg.Description, + DeliverPolicy = cfg.DeliverPolicy, + OptStartSeq = cfg.OptStartSeq, + OptStartTime = cfg.OptStartTime, + AckPolicy = cfg.AckPolicy, + AckWait = cfg.AckWait, + MaxDeliver = cfg.MaxDeliver, + BackOff = cfg.BackOff?.ToArray(), + FilterSubject = cfg.FilterSubject, + FilterSubjects = cfg.FilterSubjects?.ToArray(), + ReplayPolicy = cfg.ReplayPolicy, + RateLimit = cfg.RateLimit, + SampleFrequency = cfg.SampleFrequency, + MaxWaiting = cfg.MaxWaiting, + MaxAckPending = cfg.MaxAckPending, + FlowControl = cfg.FlowControl, + HeadersOnly = cfg.HeadersOnly, + MaxRequestBatch = cfg.MaxRequestBatch, + MaxRequestExpires = cfg.MaxRequestExpires, + MaxRequestMaxBytes = cfg.MaxRequestMaxBytes, + DeliverSubject = cfg.DeliverSubject, + DeliverGroup = cfg.DeliverGroup, + Heartbeat = cfg.Heartbeat, + InactiveThreshold = cfg.InactiveThreshold, + Replicas = cfg.Replicas, + MemoryStorage = cfg.MemoryStorage, + Direct = cfg.Direct, + Metadata = cfg.Metadata is null ? null : new Dictionary(cfg.Metadata), + PauseUntil = cfg.PauseUntil, + PriorityGroups = cfg.PriorityGroups?.ToArray(), + PriorityPolicy = cfg.PriorityPolicy, + PinnedTTL = cfg.PinnedTTL, + }; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 32062b9..57b2b68 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -43,6 +43,11 @@ internal sealed partial class NatsConsumer : IDisposable private DateTime _lostQuorumSent; private TimeSpan _deleteThreshold; private bool _isPaused; + private Timer? _deleteTimer; + private Timer? _gatewayWatchTimer; + private bool _hasLocalDeliveryInterest; + private long _rateLimitBitsPerSecond; + private int _rateLimitBurstBytes; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index d3b1952..8512e52 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -239,6 +239,16 @@ public sealed class JsPubMsg /// Sync/ack channel (opaque, set at runtime). public object? Sync { get; set; } + + public void ReturnToPool() + { + Subject = string.Empty; + Reply = null; + Hdr = null; + Msg = null; + Pa = null; + Sync = null; + } } /// diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs index 198002e..328ea67 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -296,4 +296,149 @@ public sealed class NatsConsumerTests consumer.HandleClusterConsumerInfoRequest().ShouldNotBeNull(); consumer.ClearNode(); } + + [Fact] + public void UpdateDeliveryInterest_AndDeleteNotActive_ShouldReflectInterestState() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo", InactiveThreshold = TimeSpan.FromMilliseconds(20) }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromMilliseconds(20) }); + consumer.UpdateDeliveryInterest(localInterest: false).ShouldBeTrue(); + Thread.Sleep(40); + consumer.IsClosed().ShouldBeTrue(); + } + + [Fact] + public void WatchGWinterest_AndRateLimit_ShouldExecuteWithoutErrors() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"], MaxMsgSize = 4096 }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo", RateLimit = 8_000 }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.SetRateLimitNeedsLocks(); + consumer.WatchGWinterest(); + consumer.SubscribeInternal("deliver.foo").ShouldBeTrue(); + consumer.UpdateDeliveryInterest(localInterest: true).ShouldBeFalse(); + consumer.HasDeliveryInterest().ShouldBeTrue(); + } + + [Fact] + public void AccountCheckNewConsumerConfig_InvalidPolicyChanges_ShouldFail() + { + var account = new Account { Name = "A" }; + var current = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit, DeliverPolicy = DeliverPolicy.DeliverAll }; + var next = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckAll, DeliverPolicy = DeliverPolicy.DeliverAll }; + + var err = account.CheckNewConsumerConfig(current, next); + + err.ShouldNotBeNull(); + err.Message.ShouldContain("ack policy"); + } + + [Fact] + public void UpdateDeliverSubject_AndConfigsEqualSansDelivery_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.SubscribeInternal("deliver.a"); + consumer.UpdateDeliverSubject("deliver.b").ShouldBeTrue(); + consumer.SubscribeInternal("deliver.b"); + consumer.HasDeliveryInterest().ShouldBeTrue(); + + var left = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit }; + var right = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.b", AckPolicy = AckPolicy.AckExplicit }; + NatsConsumer.ConfigsEqualSansDelivery(left, right).ShouldBeTrue(); + } + + [Fact] + public void AckFlow_NewMessage_Push_Process_Progress_UpdateSkipped_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + var ack = NatsConsumer.NewJSAckMsg("a.b.10.20.1", "reply", 0, "+ACK"u8.ToArray()); + ack.Subject.ShouldBe("a.b.10.20.1"); + + consumer!.PushAck("a.b.10.20.1", "reply", 0, "+ACK"u8.ToArray()); + consumer.ProcessAck("a.b.10.20.1", "reply", 0, "+ACK"u8.ToArray()); + consumer.ProgressUpdate(10); + consumer.UpdateSkipped(25); + + var state = consumer.GetConsumerState(); + state.AckFloor.Stream.ShouldBeGreaterThanOrEqualTo(10UL); + } + + [Fact] + public void HasMaxDeliveries_ForceExpirePending_AndResetStartingSeq_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit, MaxDeliver = 2, DeliverPolicy = DeliverPolicy.DeliverAll }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.ProcessAck("a.b.5.7.1", "reply", 0, "-NAK"u8.ToArray()); + consumer.HasMaxDeliveries(5).ShouldBeFalse(); + consumer.HasMaxDeliveries(5).ShouldBeTrue(); + consumer.ForceExpirePending(); + + var (seq, canRespond, err) = consumer.ResetStartingSeq(10, "reply"); + err.ShouldBeNull(); + seq.ShouldBe(10UL); + canRespond.ShouldBeTrue(); + } + + [Fact] + public void JsPubMsg_ReturnToPool_ShouldResetState() + { + var msg = new JsPubMsg + { + Subject = "foo", + Reply = "bar", + Hdr = [1, 2], + Msg = [3, 4], + Pa = new object(), + Sync = new object(), + }; + + msg.ReturnToPool(); + + msg.Subject.ShouldBe(string.Empty); + msg.Reply.ShouldBeNull(); + msg.Hdr.ShouldBeNull(); + msg.Msg.ShouldBeNull(); + msg.Pa.ShouldBeNull(); + msg.Sync.ShouldBeNull(); + } } diff --git a/porting.db b/porting.db index e3a156e..2b19db4 100644 Binary files a/porting.db and b/porting.db differ