diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs index 43f1817..b671048 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs @@ -87,6 +87,14 @@ internal sealed partial class NatsConsumer SendAckReply(reply); } + internal bool ProcessAckMsg(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, string reply, bool doSample) + { + ProcessAckMessage(streamSequence, deliverySequence, deliveryCount, reply); + if (doSample && NeedAck()) + SampleAck(reply); + return true; + } + internal bool ProcessNak(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, byte[] message) { ArgumentNullException.ThrowIfNull(message); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs index afae093..86e5c99 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -1,10 +1,12 @@ using System.Text.Json; +using System.Text; namespace ZB.MOM.NatsNet.Server; internal sealed partial class NatsConsumer { private static readonly TimeSpan DefaultGatewayInterestInterval = TimeSpan.FromSeconds(1); + private ConsumerInfo? _initialInfo; internal bool UpdateDeliveryInterest(bool localInterest) { @@ -582,4 +584,158 @@ internal sealed partial class NatsConsumer _mu.ExitReadLock(); } } + + internal void ApplyState(ConsumerState state) + { + ArgumentNullException.ThrowIfNull(state); + _mu.EnterWriteLock(); + try + { + _state = new ConsumerState + { + Delivered = new SequencePair + { + Consumer = state.Delivered.Consumer, + Stream = state.Delivered.Stream, + }, + AckFloor = new SequencePair + { + Consumer = state.AckFloor.Consumer, + Stream = state.AckFloor.Stream, + }, + Pending = state.Pending is null ? null : new Dictionary(state.Pending), + Redelivered = state.Redelivered is null ? null : new Dictionary(state.Redelivered), + }; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetStoreState(ConsumerState state) => ApplyState(state); + + internal ConsumerState WriteStoreState() + { + _mu.EnterWriteLock(); + try + { + return WriteStoreStateUnlocked(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerState WriteStoreStateUnlocked() => GetConsumerState(); + + internal ConsumerInfo InitialInfo() + { + _mu.EnterWriteLock(); + try + { + _initialInfo ??= GetInfo(); + return _initialInfo; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearInitialInfo() + { + _mu.EnterWriteLock(); + try + { + _initialInfo = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerInfo Info() => GetInfo(); + + internal ConsumerInfo InfoWithSnap(ConsumerState? snapshot = null) + { + if (snapshot is null) + return GetInfo(); + + var info = GetInfo(); + info.Delivered = new SequenceInfo { Consumer = snapshot.Delivered.Consumer, Stream = snapshot.Delivered.Stream }; + info.AckFloor = new SequenceInfo { Consumer = snapshot.AckFloor.Consumer, Stream = snapshot.AckFloor.Stream }; + return info; + } + + internal (ConsumerInfo Info, string ReplySubject) InfoWithSnapAndReply(string replySubject, ConsumerState? snapshot = null) => + (InfoWithSnap(snapshot), replySubject); + + internal void SignalNewMessages() => _updateChannel.Writer.TryWrite(true); + + internal bool ShouldSample() + { + if (string.IsNullOrWhiteSpace(Config.SampleFrequency)) + return false; + + var token = Config.SampleFrequency!.Trim().TrimEnd('%'); + if (!int.TryParse(token, out var percent)) + return false; + + return percent > 0; + } + + internal bool SampleAck(string ackReply) + { + if (!ShouldSample()) + return false; + + if (string.IsNullOrWhiteSpace(ackReply)) + return false; + + AddAckReply((ulong)ackReply.Length, ackReply); + return true; + } + + internal bool IsFiltered(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return false; + + if (!string.IsNullOrWhiteSpace(Config.FilterSubject)) + return Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, Config.FilterSubject!); + + if (Config.FilterSubjects is not { Length: > 0 }) + return false; + + return Config.FilterSubjects.Any(filter => Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, filter)); + } + + internal bool NeedAck() => Config.AckPolicy != AckPolicy.AckNone; + + internal static (JsApiConsumerGetNextRequest? Request, Exception? Error) NextReqFromMsg(ReadOnlySpan message) + { + if (message.Length == 0) + return (new JsApiConsumerGetNextRequest { Batch = 1 }, null); + + try + { + var text = Encoding.UTF8.GetString(message); + if (int.TryParse(text, out var batch)) + return (new JsApiConsumerGetNextRequest { Batch = Math.Max(1, batch) }, null); + + var req = JsonSerializer.Deserialize(text); + if (req is null) + return (null, new InvalidOperationException("invalid request")); + if (req.Batch <= 0) + req.Batch = 1; + return (req, null); + } + catch (Exception ex) + { + return (null, ex); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs new file mode 100644 index 0000000..e9b9335 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs @@ -0,0 +1,6 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static WaitQueue NewWaitQueue(int max = 0) => WaitQueue.NewWaitQueue(max); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 8512e52..b49deb0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -456,6 +456,42 @@ public sealed class WaitingRequest /// Optional pull request priority group metadata. public PriorityGroup? PriorityGroup { get; set; } + + public bool RecycleIfDone() + { + if (N > 0 || MaxBytes > 0 && B < MaxBytes) + return false; + + Recycle(); + return true; + } + + public void Recycle() + { + Subject = string.Empty; + Reply = null; + N = 0; + D = 0; + NoWait = 0; + Expires = null; + MaxBytes = 0; + B = 0; + PriorityGroup = null; + } +} + +public sealed class WaitingDelivery +{ + public string Reply { get; set; } = string.Empty; + public ulong Sequence { get; set; } + public DateTime Created { get; set; } = DateTime.UtcNow; + + public void Recycle() + { + Reply = string.Empty; + Sequence = 0; + Created = DateTime.UnixEpoch; + } } /// @@ -681,6 +717,8 @@ public sealed class WaitQueue } private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue; + + public static WaitQueue NewWaitQueue(int max = 0) => new(max); } /// diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs index 3f51cf8..13e06cc 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs @@ -78,4 +78,52 @@ public sealed class ConsumerStateTests state.Delivered.Stream.ShouldBe(100UL); state.AckFloor.Stream.ShouldBe(99UL); } + + [Fact] + public void StoreStateAndInfoSamplingAndFiltering_ShouldBehave() + { + var consumer = CreateConsumer(); + var state = new ConsumerState + { + Delivered = new SequencePair { Consumer = 11, Stream = 22 }, + AckFloor = new SequencePair { Consumer = 10, Stream = 21 }, + Pending = new Dictionary { [22] = new Pending { Sequence = 11, Timestamp = 1 } }, + }; + + consumer.ApplyState(state); + consumer.SetStoreState(state); + consumer.WriteStoreState().Delivered.Stream.ShouldBe(22UL); + consumer.WriteStoreStateUnlocked().Delivered.Stream.ShouldBe(22UL); + consumer.ReadStoredState().Delivered.Stream.ShouldBe(22UL); + + consumer.InitialInfo().Stream.ShouldBe("S"); + consumer.ClearInitialInfo(); + consumer.Info().Name.ShouldBe("D"); + consumer.InfoWithSnap(state).Delivered.Stream.ShouldBe(22UL); + var (info, reply) = consumer.InfoWithSnapAndReply("r", state); + info.Stream.ShouldBe("S"); + reply.ShouldBe("r"); + + consumer.SignalNewMessages(); + consumer.UpdateConfig(new ConsumerConfig { Durable = "D", SampleFrequency = "100%", FilterSubject = "foo.*", AckPolicy = AckPolicy.AckExplicit }); + consumer.ShouldSample().ShouldBeTrue(); + consumer.SampleAck("reply").ShouldBeTrue(); + consumer.ProcessAckMsg(22, 11, 2, "reply", doSample: true).ShouldBeTrue(); + consumer.IsFiltered("foo.bar").ShouldBeTrue(); + consumer.NeedAck().ShouldBeTrue(); + } + + [Fact] + public void NextReqFromMsg_ShouldParseBatchAndJson() + { + var (simple, simpleErr) = NatsConsumer.NextReqFromMsg("5"u8); + simpleErr.ShouldBeNull(); + simple.ShouldNotBeNull(); + simple!.Batch.ShouldBe(5); + + var (jsonReq, jsonErr) = NatsConsumer.NextReqFromMsg("{\"batch\":2,\"expires\":\"00:00:01\"}"u8); + jsonErr.ShouldBeNull(); + jsonReq.ShouldNotBeNull(); + jsonReq!.Batch.ShouldBe(2); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs index 215360b..dd6e937 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs @@ -48,4 +48,39 @@ public sealed class WaitQueueTests q.Cycle(); q.Peek()!.Reply.ShouldBe("1b"); } + + [Fact] + public void WaitingRequestRecycle_AndWaitQueueFactory_ShouldBehave() + { + var request = new WaitingRequest + { + Subject = "s", + Reply = "r", + N = 0, + D = 1, + MaxBytes = 10, + B = 10, + PriorityGroup = new PriorityGroup { Group = "g", Priority = 1 }, + }; + + request.RecycleIfDone().ShouldBeTrue(); + request.Subject.ShouldBe(string.Empty); + request.Reply.ShouldBeNull(); + + var q = WaitQueue.NewWaitQueue(max: 3); + q.ShouldNotBeNull(); + q.IsFull(3).ShouldBeFalse(); + } + + [Fact] + public void WaitingDeliveryRecycle_ShouldClearState() + { + var wd = new WaitingDelivery { Reply = "r", Sequence = 42, Created = DateTime.UtcNow }; + + wd.Recycle(); + + wd.Reply.ShouldBe(string.Empty); + wd.Sequence.ShouldBe(0UL); + wd.Created.ShouldBe(DateTime.UnixEpoch); + } } diff --git a/porting.db b/porting.db index 479ad4b..b8fa0aa 100644 Binary files a/porting.db and b/porting.db differ