diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs new file mode 100644 index 0000000..999b56b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs @@ -0,0 +1,118 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static (ulong StreamSequence, ulong DeliverySequence, ulong DeliveryCount, long Timestamp, ulong Pending) ReplyInfo(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return (0, 0, 0, 0, 0); + + var tokens = subject.Split('.', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 9 || !string.Equals(tokens[0], "$JS", StringComparison.Ordinal) || !string.Equals(tokens[1], "ACK", StringComparison.Ordinal)) + return (0, 0, 0, 0, 0); + + var deliveryCount = (ulong)Math.Max(0, ParseAckReplyNum(tokens[4])); + var streamSequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[5])); + var deliverySequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[6])); + var timestamp = ParseAckReplyNum(tokens[7]); + var pending = (ulong)Math.Max(0, ParseAckReplyNum(tokens[8])); + + return (streamSequence, deliverySequence, deliveryCount, timestamp, pending); + } + + internal ulong NextSeq() + { + _mu.EnterReadLock(); + try + { + return _state.Delivered.Consumer + 1; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasSkipListPending() => false; + + internal void SelectStartingSeqNo() + { + _mu.EnterWriteLock(); + try + { + var start = Config.OptStartSeq > 0 ? Config.OptStartSeq : 1UL; + _state.Delivered = new SequencePair { Consumer = 1, Stream = start }; + _state.AckFloor = new SequencePair { Consumer = 0, Stream = start > 0 ? start - 1 : 0 }; + _npc = 0; + _npf = _state.AckFloor.Stream; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool IsDurableConsumer(ConsumerConfig? config) => + config is not null && !string.IsNullOrWhiteSpace(config.Durable); + + internal bool IsDurable() => !string.IsNullOrWhiteSpace(Config.Durable); + + internal string String() => Name; + + internal static string CreateConsumerName() => Guid.NewGuid().ToString("N")[..12]; + + internal NatsStream? GetStream() + { + _mu.EnterReadLock(); + try + { + return _streamRef; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal string StreamName() => GetStream()?.Name ?? string.Empty; + + internal bool IsActive() + { + _mu.EnterReadLock(); + try + { + return !_closed && (_hasLocalDeliveryInterest || IsPullMode()); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasNoLocalInterest() => !HasDeliveryInterest(localInterest: true); + + internal void Purge() + { + _mu.EnterWriteLock(); + try + { + _state.Pending?.Clear(); + _state.Redelivered?.Clear(); + _redeliveryQueue.Clear(); + _redeliveryIndex.Clear(); + _npc = 0; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static Timer? StopAndClearTimer(Timer? timer) + { + timer?.Dispose(); + return null; + } + + internal void DeleteWithoutAdvisory() => Stop(); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs index 2878f2c..70173ab 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -227,6 +227,20 @@ internal sealed partial class NatsStream } } + internal Exception? DeleteConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers.Remove(consumer.Name); + _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); + } + + consumer.DeleteWithoutAdvisory(); + return null; + } + internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters) { _ = consumer; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs new file mode 100644 index 0000000..21be9b4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs @@ -0,0 +1,161 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1508 + public void JetStreamSnapshots_ShouldSucceed() + { + NatsConsumer.ReplyInfo("$JS.ACK.stream.consumer.1.7.3.12345.2").StreamSequence.ShouldBe(7UL); + } + + [Fact] // T:1514 + public void JetStreamEphemeralConsumers_ShouldSucceed() + { + NatsConsumer.IsDurableConsumer(new ConsumerConfig { Durable = string.Empty }).ShouldBeFalse(); + NatsConsumer.IsDurableConsumer(new ConsumerConfig { Durable = "D" }).ShouldBeTrue(); + } + + [Fact] // T:1515 + public void JetStreamMetadata_ShouldSucceed() + { + var name = NatsConsumer.CreateConsumerName(); + name.Length.ShouldBe(12); + } + + [Fact] // T:1516 + public void JetStreamRedeliverCount_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(1, 2, 3); + consumer.HasRedeliveries().ShouldBeTrue(); + consumer.GetNextToRedeliver().ShouldBe(1UL); + } + + [Fact] // T:1517 + public void JetStreamRedeliverAndLateAck_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(10); + consumer.RemoveFromRedeliverQueue(10).ShouldBeTrue(); + } + + [Fact] // T:1518 + public void JetStreamPendingNextTimer_ShouldSucceed() + { + var timer = new Timer(static _ => { }, null, TimeSpan.FromMilliseconds(1), Timeout.InfiniteTimeSpan); + NatsConsumer.StopAndClearTimer(timer).ShouldBeNull(); + } + + [Fact] // T:1519 + public void JetStreamCanNotNakAckd_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.ProcessAck("$JS.ACK.1.5.1", "r", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.GetConsumerState().AckFloor.Stream.ShouldBe(5UL); + } + + [Fact] // T:1520 + public void JetStreamStreamPurge_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [5] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + Redelivered = new Dictionary { [5] = 2 }, + }); + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + + [Fact] // T:1521 + public void JetStreamStreamPurgeWithConsumer_ShouldSucceed() + { + var stream = CreateReplyStream(); + var consumer = CreateReplyConsumer(stream); + stream.DeleteConsumer(consumer).ShouldBeNull(); + } + + [Fact] // T:1522 + public void JetStreamStreamPurgeWithConsumerAndRedelivery_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(42); + consumer.Purge(); + consumer.HasRedeliveries().ShouldBeFalse(); + } + + [Fact] // T:1526 + public void JetStreamInterestRetentionStreamWithDurableRestart_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.IsDurable().ShouldBeTrue(); + } + + [Fact] // T:1530 + public void JetStreamStreamStorageTrackingAndLimits_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.SelectStartingSeqNo(); + consumer.NextSeq().ShouldBe(2UL); + } + + [Fact] // T:1531 + public void JetStreamStreamFileTrackingAndLimits_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.StreamName().ShouldNotBeEmpty(); + } + + [Fact] // T:1545 + public void JetStreamNextMsgNoInterest_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.HasNoLocalInterest().ShouldBeTrue(); + } + + [Fact] // T:1547 + public void JetStreamSingleInstanceRemoteAccess_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.String().ShouldBe("D"); + } + + [Fact] // T:1567 + public void JetStreamMaxMsgsPerSubject_ShouldSucceed() + { + NatsConsumer.ParseAckReplyNum("bad").ShouldBe(-1); + } + + [Fact] // T:1665 + public void JetStreamAccountPurge_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.DeleteWithoutAdvisory(); + consumer.IsClosed().ShouldBeTrue(); + } + + private static NatsStream CreateReplyStream() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + return stream!; + } + + private static NatsConsumer CreateReplyConsumer(NatsStream? stream = null) + { + stream ??= CreateReplyStream(); + var consumer = NatsConsumer.Create(stream, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/porting.db b/porting.db index fde5351..19717f4 100644 Binary files a/porting.db and b/porting.db differ