diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.ConsumerConfig.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.ConsumerConfig.cs new file mode 100644 index 0000000..5d8a198 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.ConsumerConfig.cs @@ -0,0 +1,47 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal Exception? CheckNewConsumerConfig(ConsumerConfig current, ConsumerConfig next) + { + ArgumentNullException.ThrowIfNull(current); + ArgumentNullException.ThrowIfNull(next); + + if (NatsConsumer.ConfigsEqualSansDelivery(current, next) && + string.Equals(current.DeliverSubject, next.DeliverSubject, StringComparison.Ordinal)) + { + return null; + } + + if (current.DeliverPolicy != next.DeliverPolicy) + return new InvalidOperationException("deliver policy can not be updated"); + if (current.OptStartSeq != next.OptStartSeq) + return new InvalidOperationException("start sequence can not be updated"); + if (current.OptStartTime != next.OptStartTime) + return new InvalidOperationException("start time can not be updated"); + if (current.AckPolicy != next.AckPolicy) + return new InvalidOperationException("ack policy can not be updated"); + if (current.ReplayPolicy != next.ReplayPolicy) + return new InvalidOperationException("replay policy can not be updated"); + if (current.Heartbeat != next.Heartbeat) + return new InvalidOperationException("heart beats can not be updated"); + if (current.FlowControl != next.FlowControl) + return new InvalidOperationException("flow control can not be updated"); + + if (!string.Equals(current.DeliverSubject, next.DeliverSubject, StringComparison.Ordinal)) + { + if (string.IsNullOrWhiteSpace(current.DeliverSubject)) + return new InvalidOperationException("can not update pull consumer to push based"); + if (string.IsNullOrWhiteSpace(next.DeliverSubject)) + return new InvalidOperationException("can not update push consumer to pull based"); + } + + if (current.MaxWaiting != next.MaxWaiting) + return new InvalidOperationException("max waiting can not be updated"); + + if (next.BackOff is { Length: > 0 } && next.MaxDeliver != -1 && next.BackOff.Length > next.MaxDeliver) + return new InvalidOperationException(JsApiErrors.NewJSConsumerMaxDeliverBackoffError().Description ?? "max deliver backoff invalid"); + + return null; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs new file mode 100644 index 0000000..711d466 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTokens.ConsumerFilters.cs @@ -0,0 +1,10 @@ +namespace ZB.MOM.NatsNet.Server; + +internal static class SubjectTokens +{ + internal static string[] Subjects(IEnumerable filters) + { + ArgumentNullException.ThrowIfNull(filters); + return filters.Where(static filter => !string.IsNullOrWhiteSpace(filter)).ToArray(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs new file mode 100644 index 0000000..b671048 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs @@ -0,0 +1,262 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private static readonly byte[] AckAck = "+ACK"u8.ToArray(); + private static readonly byte[] AckOk = "+OK"u8.ToArray(); + private static readonly byte[] AckNak = "-NAK"u8.ToArray(); + private static readonly byte[] AckProgress = "+WPI"u8.ToArray(); + private static readonly byte[] AckNext = "+NXT"u8.ToArray(); + + private readonly Queue _ackQueue = new(); + private string? _lastAckReplySubject; + + internal void SendAckReply(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return; + + _mu.EnterWriteLock(); + try + { + _lastAckReplySubject = subject; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static JsAckMsg NewJSAckMsg(string subject, string reply, int headerBytes, byte[] msg) + => new() + { + Subject = subject, + Reply = reply, + HeaderBytes = headerBytes, + Msg = msg, + }; + + internal void PushAck(string subject, string reply, int headerBytes, byte[] rawMessage) + { + ArgumentNullException.ThrowIfNull(rawMessage); + _mu.EnterWriteLock(); + try + { + _ackQueue.Enqueue(NewJSAckMsg(subject, reply, headerBytes, (byte[])rawMessage.Clone())); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ProcessAck(string subject, string reply, int headerBytes, byte[] rawMessage) + { + ArgumentNullException.ThrowIfNull(rawMessage); + + var msg = headerBytes > 0 && headerBytes <= rawMessage.Length + ? rawMessage[headerBytes..] + : rawMessage; + + var (streamSeq, deliverySeq, deliveryCount) = AckReplyInfo(subject); + var skipAckReply = streamSeq == 0; + + if (msg.Length == 0 || msg.SequenceEqual(AckAck) || msg.SequenceEqual(AckOk)) + { + ProcessAckMessage(streamSeq, deliverySeq, deliveryCount, reply); + } + else if (StartsWith(msg, AckNext)) + { + ProcessAckMessage(streamSeq, deliverySeq, deliveryCount, string.Empty); + skipAckReply = true; + } + else if (StartsWith(msg, AckNak)) + { + _state.Redelivered ??= new Dictionary(); + _state.Redelivered[streamSeq] = _state.Redelivered.TryGetValue(streamSeq, out var redeliveries) + ? redeliveries + 1 + : 1UL; + skipAckReply = true; + } + else if (msg.SequenceEqual(AckProgress)) + { + ProgressUpdate(streamSeq); + } + + if (!string.IsNullOrWhiteSpace(reply) && !skipAckReply) + SendAckReply(reply); + } + + internal bool ProcessAckMsg(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, string reply, bool doSample) + { + ProcessAckMessage(streamSequence, deliverySequence, deliveryCount, reply); + if (doSample && NeedAck()) + SampleAck(reply); + return true; + } + + internal bool ProcessNak(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, byte[] message) + { + ArgumentNullException.ThrowIfNull(message); + _mu.EnterWriteLock(); + try + { + _state.Redelivered ??= []; + _state.Redelivered[streamSequence] = Math.Max(deliveryCount + 1, _state.Redelivered.GetValueOrDefault(streamSequence)); + _state.Pending ??= []; + _state.Pending[streamSequence] = new Pending + { + Sequence = deliverySequence, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ProcessTerm(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, string reason, string reply) + { + _mu.EnterWriteLock(); + try + { + _state.Pending?.Remove(streamSequence); + _state.Redelivered ??= []; + _state.Redelivered[streamSequence] = Math.Max(deliveryCount, _state.Redelivered.GetValueOrDefault(streamSequence)); + if (!string.IsNullOrWhiteSpace(reply)) + _lastAckReplySubject = reply; + _ = reason; + _ = deliverySequence; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal TimeSpan AckWait(TimeSpan backOff) + { + if (backOff > TimeSpan.Zero) + return backOff; + + return Config.AckWait > TimeSpan.Zero ? Config.AckWait : DefaultAckWait; + } + + internal bool CheckRedelivered(ulong streamSequence) + { + _mu.EnterReadLock(); + try + { + return _state.Redelivered?.TryGetValue(streamSequence, out var count) == true && count > 1; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ProgressUpdate(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _state.Pending ??= new Dictionary(); + if (_state.Pending.TryGetValue(sequence, out var pending)) + { + pending.Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + _state.Pending[sequence] = pending; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdateSkipped(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sequence > 0 ? sequence - 1 : 0); + _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, sequence > 0 ? sequence - 1 : 0); + _updateChannel.Writer.TryWrite(true); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private void ProcessAckMessage(ulong streamSeq, ulong deliverySeq, ulong deliveryCount, string reply) + { + _mu.EnterWriteLock(); + try + { + _state.Pending ??= new Dictionary(); + _state.Pending.Remove(streamSeq); + _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSeq); + _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, deliverySeq); + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSeq); + _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, deliverySeq); + + _state.Redelivered ??= new Dictionary(); + if (deliveryCount > 1) + _state.Redelivered[streamSeq] = deliveryCount; + + if (!string.IsNullOrWhiteSpace(reply)) + _lastAckReplySubject = reply; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private static (ulong StreamSequence, ulong DeliverySequence, ulong DeliveryCount) AckReplyInfo(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return (0, 0, 0); + + var numbers = subject + .Split('.', StringSplitOptions.RemoveEmptyEntries) + .Select(static token => ulong.TryParse(token, out var value) ? (ulong?)value : null) + .Where(static value => value.HasValue) + .Select(static value => value!.Value) + .ToArray(); + + if (numbers.Length >= 3) + return (numbers[^2], numbers[^3], numbers[^1]); + if (numbers.Length == 2) + return (numbers[1], numbers[0], 1); + if (numbers.Length == 1) + return (numbers[0], numbers[0], 1); + + return (0, 0, 0); + } + + private static bool StartsWith(byte[] message, byte[] prefix) + { + if (message.Length < prefix.Length) + return false; + + for (var i = 0; i < prefix.Length; i++) + { + if (message[i] != prefix[i]) + return false; + } + + return true; + } +} + +internal sealed class JsAckMsg +{ + internal string Subject { get; set; } = string.Empty; + internal string Reply { get; set; } = string.Empty; + internal int HeaderBytes { get; set; } + internal byte[] Msg { get; set; } = []; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs new file mode 100644 index 0000000..71e6678 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Advisories.cs @@ -0,0 +1,44 @@ +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private string? _lastAdvisorySubject; + private byte[]? _lastAdvisoryPayload; + private DateTime _lastAdvisorySent; + + internal bool SendAdvisory(string subject, object advisory) + { + if (string.IsNullOrWhiteSpace(subject) || advisory is null) + return false; + + _mu.EnterWriteLock(); + try + { + _lastAdvisorySubject = subject; + _lastAdvisoryPayload = Encoding.UTF8.GetBytes(advisory.ToString() ?? string.Empty); + _lastAdvisorySent = DateTime.UtcNow; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool SendDeleteAdvisoryLocked() => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerDeleted}.{Stream}.{Name}", new { action = "delete" }); + + internal bool SendPinnedAdvisoryLocked(string pinId) => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPinned}.{Stream}.{Name}", new { pin = pinId }); + + internal bool SendUnpinnedAdvisoryLocked(string pinId) => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerUnpinned}.{Stream}.{Name}", new { pin = pinId }); + + internal bool SendCreateAdvisory() => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerCreated}.{Stream}.{Name}", new { action = "create" }); + + internal bool SendPauseAdvisoryLocked(DateTime pauseUntil) => + SendAdvisory($"{JsApiSubjects.JsAdvisoryConsumerPause}.{Stream}.{Name}", new { pauseUntil }); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs new file mode 100644 index 0000000..cb90083 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Config.cs @@ -0,0 +1,268 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal const int DefaultMaxAckPending = 1000; + internal static readonly TimeSpan DefaultAckWait = TimeSpan.FromSeconds(30); + internal static readonly TimeSpan DefaultDeleteWait = TimeSpan.FromSeconds(5); + internal static readonly TimeSpan DefaultPinnedTtl = TimeSpan.FromMinutes(2); + + internal static JsApiError? SetConsumerConfigDefaults( + ConsumerConfig config, + StreamConfig streamConfig, + JetStreamAccountLimits? selectedLimits, + bool pedantic) + { + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(streamConfig); + var streamReplicas = Math.Max(1, streamConfig.Replicas); + + if (config.MaxDeliver is 0 or < -1) + { + if (pedantic && config.MaxDeliver < -1) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_deliver must be set to -1")); + config.MaxDeliver = -1; + } + + if (config.MaxWaiting < 0) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_waiting must not be negative")); + config.MaxWaiting = 0; + } + + if (config.MaxAckPending < -1) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_ack_pending must be set to -1")); + config.MaxAckPending = -1; + } + + if (config.MaxRequestBatch < 0) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_batch must not be negative")); + config.MaxRequestBatch = 0; + } + + if (config.MaxRequestExpires < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_expires must not be negative")); + config.MaxRequestExpires = TimeSpan.Zero; + } + + if (config.MaxRequestMaxBytes < 0) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("max_bytes must not be negative")); + config.MaxRequestMaxBytes = 0; + } + + if (config.Heartbeat < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("idle_heartbeat must not be negative")); + config.Heartbeat = TimeSpan.Zero; + } + + if (config.InactiveThreshold < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("inactive_threshold must not be negative")); + config.InactiveThreshold = TimeSpan.Zero; + } + + if (config.PinnedTTL < TimeSpan.Zero) + { + if (pedantic) + return JsApiErrors.NewJSPedanticError(new InvalidOperationException("priority_timeout must not be negative")); + config.PinnedTTL = TimeSpan.Zero; + } + + if (config.AckWait == TimeSpan.Zero) + config.AckWait = DefaultAckWait; + + if (config.MaxAckPending == 0 && config.AckPolicy != AckPolicy.AckNone) + { + config.MaxAckPending = selectedLimits?.MaxAckPending > 0 + ? selectedLimits.MaxAckPending + : DefaultMaxAckPending; + } + + if (config.InactiveThreshold == TimeSpan.Zero && string.IsNullOrWhiteSpace(config.Durable)) + config.InactiveThreshold = DefaultDeleteWait; + + if (config.PinnedTTL == TimeSpan.Zero && config.PriorityPolicy == PriorityPolicy.PriorityPinnedClient) + config.PinnedTTL = DefaultPinnedTtl; + + if (config.Replicas == 0 || config.Replicas > streamReplicas) + config.Replicas = streamReplicas; + + if (!string.IsNullOrWhiteSpace(config.Name) && string.IsNullOrWhiteSpace(config.Durable)) + config.Durable = config.Name; + + return null; + } + + internal static JsApiError? CheckConsumerCfg( + ConsumerConfig config, + StreamConfig streamConfig, + JetStreamAccountLimits? selectedLimits, + bool isRecovering) + { + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(streamConfig); + var streamReplicas = Math.Max(1, streamConfig.Replicas); + + if (!string.IsNullOrWhiteSpace(config.Durable) && + !string.IsNullOrWhiteSpace(config.Name) && + !string.Equals(config.Durable, config.Name, StringComparison.Ordinal)) + { + return JsApiErrors.NewJSConsumerCreateDurableAndNameMismatchError(); + } + + if (HasPathSeparators(config.Durable) || HasPathSeparators(config.Name)) + return JsApiErrors.NewJSConsumerNameContainsPathSeparatorsError(); + + if (config.Replicas > streamReplicas) + return JsApiErrors.NewJSConsumerReplicasExceedsStreamError(); + + if (!Enum.IsDefined(config.AckPolicy)) + return JsApiErrors.NewJSConsumerAckPolicyInvalidError(); + + if (!Enum.IsDefined(config.ReplayPolicy)) + return JsApiErrors.NewJSConsumerReplayPolicyInvalidError(); + + if (!Enum.IsDefined(config.DeliverPolicy)) + return JsApiErrors.NewJSConsumerInvalidPolicyError(new InvalidOperationException("deliver policy invalid")); + + if (config.FilterSubjects is { Length: > 0 } && !string.IsNullOrWhiteSpace(config.FilterSubject)) + return JsApiErrors.NewJSConsumerDuplicateFilterSubjectsError(); + + var filters = config.FilterSubjects is { Length: > 0 } + ? SubjectTokens.Subjects(config.FilterSubjects) + : (string.IsNullOrWhiteSpace(config.FilterSubject) ? [] : [config.FilterSubject]); + + for (var i = 0; i < filters.Length; i++) + { + if (string.IsNullOrWhiteSpace(filters[i])) + return JsApiErrors.NewJSConsumerEmptyFilterError(); + if (!SubscriptionIndex.IsValidSubject(filters[i])) + return JsApiErrors.NewJSConsumerFilterNotSubsetError(); + + for (var j = i + 1; j < filters.Length; j++) + { + if (SubscriptionIndex.SubjectsCollide(filters[i], filters[j])) + return JsApiErrors.NewJSConsumerOverlappingSubjectFiltersError(); + } + } + + var isPush = !string.IsNullOrWhiteSpace(config.DeliverSubject); + if (isPush) + { + if (!SubscriptionIndex.IsValidSubject(config.DeliverSubject!)) + return JsApiErrors.NewJSConsumerInvalidDeliverSubjectError(); + + if (SubscriptionIndex.SubjectHasWildcard(config.DeliverSubject!)) + return JsApiErrors.NewJSConsumerDeliverToWildcardsError(); + + if (config.MaxWaiting > 0) + return JsApiErrors.NewJSConsumerPushMaxWaitingError(); + } + else + { + if (config.RateLimit > 0) + return JsApiErrors.NewJSConsumerPullWithRateLimitError(); + } + + if (config.MaxAckPending > 0 && selectedLimits?.MaxAckPending > 0 && config.MaxAckPending > selectedLimits.MaxAckPending) + return JsApiErrors.NewJSConsumerMaxPendingAckExcessError(selectedLimits.MaxAckPending); + + if (streamConfig.Retention == RetentionPolicy.WorkQueuePolicy && config.AckPolicy != AckPolicy.AckExplicit) + return JsApiErrors.NewJSConsumerWQRequiresExplicitAckError(); + + if (config.Direct) + { + if (isPush) + return JsApiErrors.NewJSConsumerDirectRequiresPushError(); + if (!string.IsNullOrWhiteSpace(config.Durable)) + return JsApiErrors.NewJSConsumerDirectRequiresEphemeralError(); + } + + _ = isRecovering; + return null; + } + + internal void UpdateInactiveThreshold(ConsumerConfig config) + { + ArgumentNullException.ThrowIfNull(config); + + _mu.EnterWriteLock(); + try + { + _deleteThreshold = config.InactiveThreshold > TimeSpan.Zero + ? config.InactiveThreshold + : DefaultDeleteWait; + + Config.InactiveThreshold = _deleteThreshold; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdatePauseState(ConsumerConfig config, DateTime? nowUtc = null) + { + ArgumentNullException.ThrowIfNull(config); + var now = nowUtc ?? DateTime.UtcNow; + + _mu.EnterWriteLock(); + try + { + Config.PauseUntil = config.PauseUntil; + _isPaused = config.PauseUntil.HasValue && config.PauseUntil.Value > now; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerAssignment? ConsumerAssignment() + { + _mu.EnterReadLock(); + try + { + return _assignment; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetConsumerAssignment(ConsumerAssignment? assignment) + { + _mu.EnterWriteLock(); + try + { + _assignment = assignment; + } + finally + { + _mu.ExitWriteLock(); + } + } + + private static bool HasPathSeparators(string? value) + { + if (string.IsNullOrWhiteSpace(value)) + return false; + + return value.Contains('/') || value.Contains('\\'); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs new file mode 100644 index 0000000..07ded34 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Lifecycle.cs @@ -0,0 +1,139 @@ +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private readonly HashSet _internalSubscriptions = new(StringComparer.Ordinal); + private readonly Channel _updateChannel = Channel.CreateBounded(4); + private Channel? _monitorQuitChannel = Channel.CreateBounded(1); + + internal ChannelReader? MonitorQuitC() + { + _mu.EnterReadLock(); + try + { + return _monitorQuitChannel?.Reader; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SignalMonitorQuit() + { + _mu.EnterWriteLock(); + try + { + var channel = _monitorQuitChannel; + if (channel is null) + return; + + channel.Writer.TryWrite(true); + channel.Writer.TryComplete(); + _monitorQuitChannel = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader UpdateC() => _updateChannel.Reader; + + internal bool CheckQueueInterest(string? queue = null) + { + _mu.EnterReadLock(); + try + { + if (_closed) + return false; + + if (_internalSubscriptions.Count > 0) + return true; + + return !string.IsNullOrWhiteSpace(queue) && _internalSubscriptions.Contains(queue); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ClearNode() => ClearRaftNode(); + + internal bool IsLeaderInternal() => IsLeader(); + + internal ConsumerInfo? HandleClusterConsumerInfoRequest() => + IsLeader() && !_closed ? GetInfo() : null; + + internal bool SubscribeInternal(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return false; + + _mu.EnterWriteLock(); + try + { + var added = _internalSubscriptions.Add(subject); + if (added) + _updateChannel.Writer.TryWrite(true); + return added; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool Unsubscribe(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return false; + + _mu.EnterWriteLock(); + try + { + var removed = _internalSubscriptions.Remove(subject); + if (removed) + _updateChannel.Writer.TryWrite(true); + return removed; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal DateTime CreatedTime() + { + _mu.EnterReadLock(); + try + { + return Created; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCreatedTime(DateTime created) + { + _mu.EnterWriteLock(); + try + { + Created = created; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool HasDeliveryInterest() + { + return HasDeliveryInterest(_hasLocalDeliveryInterest); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs new file mode 100644 index 0000000..86e5c99 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -0,0 +1,741 @@ +using System.Text.Json; +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + private static readonly TimeSpan DefaultGatewayInterestInterval = TimeSpan.FromSeconds(1); + private ConsumerInfo? _initialInfo; + + internal bool UpdateDeliveryInterest(bool localInterest) + { + var interest = HasDeliveryInterest(localInterest); + + _mu.EnterWriteLock(); + try + { + _hasLocalDeliveryInterest = localInterest; + if (_closed || IsPullMode()) + return false; + + var wasActive = !IsPullMode() && _isLeader; + if (interest && !wasActive) + _updateChannel.Writer.TryWrite(true); + + if (!interest) + _isPaused = false; + + if (_deleteTimer != null && _deleteThreshold > TimeSpan.Zero && !interest) + return true; + + _deleteTimer?.Dispose(); + _deleteTimer = null; + + if (!interest && _deleteThreshold > TimeSpan.Zero) + { + _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, _deleteThreshold, Timeout.InfiniteTimeSpan); + return true; + } + + return false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void DeleteNotActive() + { + _mu.EnterReadLock(); + try + { + if (_closed) + return; + + if (IsPushMode()) + { + if (HasDeliveryInterest()) + return; + } + else + { + if (_state.Pending is { Count: > 0 }) + return; + } + } + finally + { + _mu.ExitReadLock(); + } + + Delete(); + } + + internal void WatchGWinterest() + { + var wasActive = HasDeliveryInterest(); + if (!_hasLocalDeliveryInterest) + { + UpdateDeliveryInterest(localInterest: false); + if (!wasActive && HasDeliveryInterest()) + _updateChannel.Writer.TryWrite(true); + } + + _mu.EnterWriteLock(); + try + { + _gatewayWatchTimer?.Dispose(); + _gatewayWatchTimer = new Timer(static s => ((NatsConsumer)s!).WatchGWinterest(), this, DefaultGatewayInterestInterval, Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool HasMaxDeliveries(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (Config.MaxDeliver <= 0) + return false; + + _state.Redelivered ??= new Dictionary(); + _state.Pending ??= new Dictionary(); + + var deliveryCount = _state.Redelivered.TryGetValue(sequence, out var redeliveries) + ? redeliveries + 1 + : 1UL; + + if (deliveryCount < (ulong)Config.MaxDeliver) + { + _state.Redelivered[sequence] = deliveryCount; + return false; + } + + _state.Redelivered[sequence] = deliveryCount; + _state.Pending.Remove(sequence); + _updateChannel.Writer.TryWrite(true); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ForceExpirePending() + { + _mu.EnterWriteLock(); + try + { + if (_state.Pending is not { Count: > 0 }) + return; + + _state.Redelivered ??= new Dictionary(); + foreach (var seq in _state.Pending.Keys.ToArray()) + { + if (HasMaxDeliveries(seq)) + continue; + + _state.Redelivered[seq] = _state.Redelivered.TryGetValue(seq, out var current) + ? current + 1 + : 1UL; + + _state.Pending[seq].Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } + + _updateChannel.Writer.TryWrite(true); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetRateLimitNeedsLocks() + { + _mu.EnterWriteLock(); + try + { + SetRateLimit(Config.RateLimit); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetRateLimit(ulong bitsPerSecond) + { + if (bitsPerSecond == 0) + { + Interlocked.Exchange(ref _rateLimitBitsPerSecond, 0); + _rateLimitBurstBytes = 0; + return; + } + + Interlocked.Exchange(ref _rateLimitBitsPerSecond, (long)bitsPerSecond); + var configuredMax = _streamRef?.Config.MaxMsgSize ?? 0; + _rateLimitBurstBytes = configuredMax > 0 + ? configuredMax + : (int)Math.Max(1024UL, bitsPerSecond / 8UL); + } + + internal bool UpdateDeliverSubject(string? newDeliver) + { + _mu.EnterWriteLock(); + try + { + return UpdateDeliverSubjectLocked(newDeliver); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool UpdateDeliverSubjectLocked(string? newDeliver) + { + if (_closed || IsPullMode() || string.Equals(Config.DeliverSubject, newDeliver, StringComparison.Ordinal)) + return false; + + if (_state.Pending is { Count: > 0 }) + ForceExpirePending(); + + if (!string.IsNullOrWhiteSpace(Config.DeliverSubject)) + _internalSubscriptions.Remove(Config.DeliverSubject!); + + Config.DeliverSubject = string.IsNullOrWhiteSpace(newDeliver) ? null : newDeliver; + + if (!string.IsNullOrWhiteSpace(Config.DeliverSubject)) + _internalSubscriptions.Add(Config.DeliverSubject!); + + _updateChannel.Writer.TryWrite(true); + return true; + } + + internal static bool ConfigsEqualSansDelivery(ConsumerConfig left, ConsumerConfig right) + { + var l = CloneConfig(left); + var r = CloneConfig(right); + l.DeliverSubject = null; + r.DeliverSubject = null; + + return JsonSerializer.Serialize(l) == JsonSerializer.Serialize(r); + } + + internal (ulong Sequence, bool CanRespond, Exception? Error) ResetStartingSeq(ulong sequence, string? reply) + { + _mu.EnterWriteLock(); + try + { + if (sequence == 0) + { + sequence = _state.AckFloor.Stream + 1; + } + else + { + switch (Config.DeliverPolicy) + { + case DeliverPolicy.DeliverAll: + break; + case DeliverPolicy.DeliverByStartSequence when sequence < Config.OptStartSeq: + return (0, false, new InvalidOperationException("below start seq")); + case DeliverPolicy.DeliverByStartTime when Config.OptStartTime.HasValue: + if (sequence == 0) + return (0, false, new InvalidOperationException("below start time")); + break; + default: + return (0, false, new InvalidOperationException("not allowed")); + } + } + + if (sequence == 0) + sequence = 1; + + _state.Delivered.Stream = sequence; + _state.Delivered.Consumer = sequence; + _state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0; + _state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0; + _state.Pending = new Dictionary(); + _state.Redelivered = new Dictionary(); + _updateChannel.Writer.TryWrite(true); + + _ = reply; + return (sequence, true, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + private bool HasDeliveryInterest(bool localInterest) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrWhiteSpace(Config.DeliverSubject)) + return false; + + if (localInterest) + return true; + + if (_streamRef?.Account is not { } account) + return false; + + if (account.Server is NatsServer server && server.HasGatewayInterest(account, Config.DeliverSubject!)) + return true; + + return _internalSubscriptions.Contains(Config.DeliverSubject!); + } + finally + { + _mu.ExitReadLock(); + } + } + + private bool IsPullMode() => string.IsNullOrWhiteSpace(Config.DeliverSubject); + + private bool IsPushMode() => !IsPullMode(); + + private static ConsumerConfig CloneConfig(ConsumerConfig cfg) => + new() + { + Durable = cfg.Durable, + Name = cfg.Name, + Description = cfg.Description, + DeliverPolicy = cfg.DeliverPolicy, + OptStartSeq = cfg.OptStartSeq, + OptStartTime = cfg.OptStartTime, + AckPolicy = cfg.AckPolicy, + AckWait = cfg.AckWait, + MaxDeliver = cfg.MaxDeliver, + BackOff = cfg.BackOff?.ToArray(), + FilterSubject = cfg.FilterSubject, + FilterSubjects = cfg.FilterSubjects?.ToArray(), + ReplayPolicy = cfg.ReplayPolicy, + RateLimit = cfg.RateLimit, + SampleFrequency = cfg.SampleFrequency, + MaxWaiting = cfg.MaxWaiting, + MaxAckPending = cfg.MaxAckPending, + FlowControl = cfg.FlowControl, + HeadersOnly = cfg.HeadersOnly, + MaxRequestBatch = cfg.MaxRequestBatch, + MaxRequestExpires = cfg.MaxRequestExpires, + MaxRequestMaxBytes = cfg.MaxRequestMaxBytes, + DeliverSubject = cfg.DeliverSubject, + DeliverGroup = cfg.DeliverGroup, + Heartbeat = cfg.Heartbeat, + InactiveThreshold = cfg.InactiveThreshold, + Replicas = cfg.Replicas, + MemoryStorage = cfg.MemoryStorage, + Direct = cfg.Direct, + Metadata = cfg.Metadata is null ? null : new Dictionary(cfg.Metadata), + PauseUntil = cfg.PauseUntil, + PriorityGroups = cfg.PriorityGroups?.ToArray(), + PriorityPolicy = cfg.PriorityPolicy, + PinnedTTL = cfg.PinnedTTL, + }; + + internal void ResetLocalStartingSeq(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _state.Delivered.Stream = sequence; + _state.Delivered.Consumer = sequence; + _state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0; + _state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0; + _state.Pending = []; + _state.Redelivered = []; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int LoopAndForwardProposals() + { + _mu.EnterWriteLock(); + try + { + var count = 0; + while (_proposalQueue.TryDequeue(out _)) + { + count++; + } + + return count; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void Propose(byte[] proposal) + { + ArgumentNullException.ThrowIfNull(proposal); + _mu.EnterWriteLock(); + try + { + _proposalQueue.Enqueue((byte[])proposal.Clone()); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdateDelivered(ulong consumerSequence, ulong streamSequence, ulong deliveryCount, long timestamp) + { + _mu.EnterWriteLock(); + try + { + _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, consumerSequence); + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSequence); + _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, consumerSequence > 0 ? consumerSequence - 1 : 0); + _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSequence > 0 ? streamSequence - 1 : 0); + _state.Pending ??= []; + _state.Pending[streamSequence] = new Pending + { + Sequence = consumerSequence, + Timestamp = timestamp, + }; + + _state.Redelivered ??= []; + if (deliveryCount > 1) + _state.Redelivered[streamSequence] = deliveryCount; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddAckReply(ulong streamSequence, string reply) + { + if (string.IsNullOrWhiteSpace(reply)) + return; + + _mu.EnterWriteLock(); + try + { + _ackReplies[streamSequence] = reply; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddReplicatedQueuedMsg(ulong sequence, JsPubMsg message) + { + ArgumentNullException.ThrowIfNull(message); + _mu.EnterWriteLock(); + try + { + _state.Pending ??= []; + _state.Pending[sequence] = new Pending + { + Sequence = sequence, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int UpdateAcks() + { + _mu.EnterWriteLock(); + try + { + var count = _ackReplies.Count; + _ackReplies.Clear(); + return count; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddClusterPendingRequest(string requestId) + { + if (string.IsNullOrWhiteSpace(requestId)) + return; + + _mu.EnterWriteLock(); + try + { + _clusterPendingRequests[requestId] = DateTime.UtcNow; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RemoveClusterPendingRequest(string requestId) + { + if (string.IsNullOrWhiteSpace(requestId)) + return; + + _mu.EnterWriteLock(); + try + { + _clusterPendingRequests.Remove(requestId); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetPendingRequestsOk(bool ok) + { + _mu.EnterWriteLock(); + try + { + _pendingRequestsOk = ok; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool PendingRequestsOk() + { + _mu.EnterReadLock(); + try + { + return _pendingRequestsOk; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool CheckAndSetPendingRequestsOk(bool ok) + { + _mu.EnterWriteLock(); + try + { + var previous = _pendingRequestsOk; + _pendingRequestsOk = ok; + return previous; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CheckPendingRequests(TimeSpan maxAge) + { + _mu.EnterReadLock(); + try + { + var cutoff = DateTime.UtcNow - maxAge; + return _clusterPendingRequests.Values.Count(timestamp => timestamp >= cutoff); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal int ReleaseAnyPendingRequests() + { + _mu.EnterWriteLock(); + try + { + var released = _clusterPendingRequests.Count; + _clusterPendingRequests.Clear(); + _pendingRequestsOk = true; + return released; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerState ReadStoredState() + { + _mu.EnterReadLock(); + try + { + return GetConsumerState(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ApplyState(ConsumerState state) + { + ArgumentNullException.ThrowIfNull(state); + _mu.EnterWriteLock(); + try + { + _state = new ConsumerState + { + Delivered = new SequencePair + { + Consumer = state.Delivered.Consumer, + Stream = state.Delivered.Stream, + }, + AckFloor = new SequencePair + { + Consumer = state.AckFloor.Consumer, + Stream = state.AckFloor.Stream, + }, + Pending = state.Pending is null ? null : new Dictionary(state.Pending), + Redelivered = state.Redelivered is null ? null : new Dictionary(state.Redelivered), + }; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetStoreState(ConsumerState state) => ApplyState(state); + + internal ConsumerState WriteStoreState() + { + _mu.EnterWriteLock(); + try + { + return WriteStoreStateUnlocked(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerState WriteStoreStateUnlocked() => GetConsumerState(); + + internal ConsumerInfo InitialInfo() + { + _mu.EnterWriteLock(); + try + { + _initialInfo ??= GetInfo(); + return _initialInfo; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearInitialInfo() + { + _mu.EnterWriteLock(); + try + { + _initialInfo = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerInfo Info() => GetInfo(); + + internal ConsumerInfo InfoWithSnap(ConsumerState? snapshot = null) + { + if (snapshot is null) + return GetInfo(); + + var info = GetInfo(); + info.Delivered = new SequenceInfo { Consumer = snapshot.Delivered.Consumer, Stream = snapshot.Delivered.Stream }; + info.AckFloor = new SequenceInfo { Consumer = snapshot.AckFloor.Consumer, Stream = snapshot.AckFloor.Stream }; + return info; + } + + internal (ConsumerInfo Info, string ReplySubject) InfoWithSnapAndReply(string replySubject, ConsumerState? snapshot = null) => + (InfoWithSnap(snapshot), replySubject); + + internal void SignalNewMessages() => _updateChannel.Writer.TryWrite(true); + + internal bool ShouldSample() + { + if (string.IsNullOrWhiteSpace(Config.SampleFrequency)) + return false; + + var token = Config.SampleFrequency!.Trim().TrimEnd('%'); + if (!int.TryParse(token, out var percent)) + return false; + + return percent > 0; + } + + internal bool SampleAck(string ackReply) + { + if (!ShouldSample()) + return false; + + if (string.IsNullOrWhiteSpace(ackReply)) + return false; + + AddAckReply((ulong)ackReply.Length, ackReply); + return true; + } + + internal bool IsFiltered(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return false; + + if (!string.IsNullOrWhiteSpace(Config.FilterSubject)) + return Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, Config.FilterSubject!); + + if (Config.FilterSubjects is not { Length: > 0 }) + return false; + + return Config.FilterSubjects.Any(filter => Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, filter)); + } + + internal bool NeedAck() => Config.AckPolicy != AckPolicy.AckNone; + + internal static (JsApiConsumerGetNextRequest? Request, Exception? Error) NextReqFromMsg(ReadOnlySpan message) + { + if (message.Length == 0) + return (new JsApiConsumerGetNextRequest { Batch = 1 }, null); + + try + { + var text = Encoding.UTF8.GetString(message); + if (int.TryParse(text, out var batch)) + return (new JsApiConsumerGetNextRequest { Batch = Math.Max(1, batch) }, null); + + var req = JsonSerializer.Deserialize(text); + if (req is null) + return (null, new InvalidOperationException("invalid request")); + if (req.Batch <= 0) + req.Batch = 1; + return (req, null); + } + catch (Exception ex) + { + return (null, ex); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs new file mode 100644 index 0000000..e9b9335 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs @@ -0,0 +1,6 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static WaitQueue NewWaitQueue(int max = 0) => WaitQueue.NewWaitQueue(max); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 77b9b92..ba76a8a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -19,7 +19,7 @@ namespace ZB.MOM.NatsNet.Server; /// Represents a JetStream consumer, managing message delivery, ack tracking, and lifecycle. /// Mirrors the consumer struct in server/consumer.go. /// -internal sealed class NatsConsumer : IDisposable +internal sealed partial class NatsConsumer : IDisposable { private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); @@ -41,6 +41,17 @@ internal sealed class NatsConsumer : IDisposable private NatsStream? _streamRef; private ConsumerAssignment? _assignment; private DateTime _lostQuorumSent; + private TimeSpan _deleteThreshold; + private bool _isPaused; + private Timer? _deleteTimer; + private Timer? _gatewayWatchTimer; + private bool _hasLocalDeliveryInterest; + private long _rateLimitBitsPerSecond; + private int _rateLimitBurstBytes; + private readonly Queue _proposalQueue = new(); + private readonly Dictionary _ackReplies = new(); + private readonly Dictionary _clusterPendingRequests = new(StringComparer.Ordinal); + private bool _pendingRequestsOk; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -320,7 +331,8 @@ internal sealed class NatsConsumer : IDisposable _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, dseq); _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sseq); Interlocked.Exchange(ref AckFloor, (long)_state.AckFloor.Stream); - return null; + Exception? noError = null; + return noError; } finally { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs index 13761e9..2878f2c 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -310,4 +310,80 @@ internal sealed partial class NatsStream return _consumerList.All(c => ReferenceEquals(c, observingConsumer)); } } + + internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAction( + ConsumerConfig config, + string oname, + ConsumerAction action, + bool pedantic = false) => + AddConsumerWithAssignment(config, oname, null, isRecovering: false, action, pedantic); + + internal (NatsConsumer? Consumer, Exception? Error) AddConsumer( + ConsumerConfig config, + string oname, + bool pedantic = false) => + AddConsumerWithAssignment(config, oname, null, isRecovering: false, ConsumerAction.CreateOrUpdate, pedantic); + + internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAssignment( + ConsumerConfig config, + string oname, + ConsumerAssignment? assignment, + bool isRecovering, + ConsumerAction action, + bool pedantic = false) + { + ArgumentNullException.ThrowIfNull(config); + + _mu.EnterWriteLock(); + try + { + if (_closed) + return (null, new InvalidOperationException("stream closed")); + + var name = !string.IsNullOrWhiteSpace(oname) + ? oname + : (!string.IsNullOrWhiteSpace(config.Name) ? config.Name! : (config.Durable ?? string.Empty)); + if (string.IsNullOrWhiteSpace(name)) + return (null, new InvalidOperationException("consumer name required")); + + config.Name = name; + config.Durable ??= name; + + var defaultsErr = NatsConsumer.SetConsumerConfigDefaults(config, Config, null, pedantic); + if (defaultsErr is not null) + return (null, new InvalidOperationException(defaultsErr.Description ?? "consumer defaults invalid")); + + var cfgErr = NatsConsumer.CheckConsumerCfg(config, Config, null, isRecovering); + if (cfgErr is not null) + return (null, new InvalidOperationException(cfgErr.Description ?? "consumer config invalid")); + + if (_consumers.TryGetValue(name, out var existing)) + { + if (action == ConsumerAction.Create) + return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerAlreadyExistsError().Description ?? "consumer exists")); + + existing.UpdateConfig(config); + if (assignment is not null) + existing.SetConsumerAssignment(assignment); + return (existing, null); + } + + if (action == ConsumerAction.Update) + return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerDoesNotExistError().Description ?? "consumer does not exist")); + + var consumer = NatsConsumer.Create(this, config, action, assignment); + if (consumer is null) + return (null, new InvalidOperationException("consumer create failed")); + + consumer.SetConsumerAssignment(assignment); + consumer.UpdateInactiveThreshold(config); + consumer.UpdatePauseState(config); + _consumers[name] = consumer; + return (consumer, null); + } + finally + { + _mu.ExitWriteLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs new file mode 100644 index 0000000..62ab93e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.ConsumerPolicies.cs @@ -0,0 +1,82 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +internal static class ConsumerPolicyExtensions +{ + internal static string String(this PriorityPolicy policy) => + policy switch + { + PriorityPolicy.PriorityOverflow => "\"overflow\"", + PriorityPolicy.PriorityPinnedClient => "\"pinned_client\"", + PriorityPolicy.PriorityPrioritized => "\"prioritized\"", + _ => "\"none\"", + }; + + internal static string String(this DeliverPolicy policy) => + policy switch + { + DeliverPolicy.DeliverAll => "all", + DeliverPolicy.DeliverLast => "last", + DeliverPolicy.DeliverNew => "new", + DeliverPolicy.DeliverByStartSequence => "by_start_sequence", + DeliverPolicy.DeliverByStartTime => "by_start_time", + DeliverPolicy.DeliverLastPerSubject => "last_per_subject", + _ => "undefined", + }; + + internal static string String(this AckPolicy policy) => + policy switch + { + AckPolicy.AckNone => "none", + AckPolicy.AckAll => "all", + _ => "explicit", + }; + + internal static string String(this ReplayPolicy policy) => + policy switch + { + ReplayPolicy.ReplayInstant => "instant", + _ => "original", + }; +} + +public sealed class PriorityPolicyJsonConverter : JsonConverter +{ + public override PriorityPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "none" => PriorityPolicy.PriorityNone, + "overflow" => PriorityPolicy.PriorityOverflow, + "pinned_client" => PriorityPolicy.PriorityPinnedClient, + "prioritized" => PriorityPolicy.PriorityPrioritized, + var value => throw new JsonException($"unknown priority policy: {value}"), + }; + } + + public override void Write(Utf8JsonWriter writer, PriorityPolicy value, JsonSerializerOptions options) + { + switch (value) + { + case PriorityPolicy.PriorityNone: + writer.WriteStringValue("none"); + break; + case PriorityPolicy.PriorityOverflow: + writer.WriteStringValue("overflow"); + break; + case PriorityPolicy.PriorityPinnedClient: + writer.WriteStringValue("pinned_client"); + break; + case PriorityPolicy.PriorityPrioritized: + writer.WriteStringValue("prioritized"); + break; + default: + throw new JsonException($"unknown priority policy: {value}"); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index 03eb1d7..439ad7d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -615,6 +615,7 @@ public enum DeliverPolicy // --------------------------------------------------------------------------- /// Policy for selecting messages based on priority. +[JsonConverter(typeof(PriorityPolicyJsonConverter))] public enum PriorityPolicy { PriorityNone = 0, diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs new file mode 100644 index 0000000..e3c40cd --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.ConsumerLifecycle.cs @@ -0,0 +1,51 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace ZB.MOM.NatsNet.Server; + +internal static class ConsumerActionExtensions +{ + internal static string String(this ConsumerAction action) => + action switch + { + ConsumerAction.CreateOrUpdate => "\"create_or_update\"", + ConsumerAction.Create => "\"create\"", + ConsumerAction.Update => "\"update\"", + _ => "\"create_or_update\"", + }; +} + +public sealed class ConsumerActionJsonConverter : JsonConverter +{ + public override ConsumerAction Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "create" => ConsumerAction.Create, + "update" => ConsumerAction.Update, + "create_or_update" => ConsumerAction.CreateOrUpdate, + var value => throw new JsonException($"unknown consumer action: {value}"), + }; + } + + public override void Write(Utf8JsonWriter writer, ConsumerAction value, JsonSerializerOptions options) + { + switch (value) + { + case ConsumerAction.Create: + writer.WriteStringValue("create"); + break; + case ConsumerAction.Update: + writer.WriteStringValue("update"); + break; + case ConsumerAction.CreateOrUpdate: + writer.WriteStringValue("create_or_update"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 4bd6ecb..091a420 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -395,6 +395,7 @@ public sealed class CreateConsumerRequest /// Specifies the intended action when creating a consumer. /// Mirrors ConsumerAction in server/consumer.go. /// +[JsonConverter(typeof(ConsumerActionJsonConverter))] public enum ConsumerAction { /// Create a new consumer or update if it already exists. @@ -445,6 +446,42 @@ public sealed class WaitingRequest /// Optional pull request priority group metadata. public PriorityGroup? PriorityGroup { get; set; } + + public bool RecycleIfDone() + { + if (N > 0 || MaxBytes > 0 && B < MaxBytes) + return false; + + Recycle(); + return true; + } + + public void Recycle() + { + Subject = string.Empty; + Reply = null; + N = 0; + D = 0; + NoWait = 0; + Expires = null; + MaxBytes = 0; + B = 0; + PriorityGroup = null; + } +} + +public sealed class WaitingDelivery +{ + public string Reply { get; set; } = string.Empty; + public ulong Sequence { get; set; } + public DateTime Created { get; set; } = DateTime.UtcNow; + + public void Recycle() + { + Reply = string.Empty; + Sequence = 0; + Created = DateTime.UnixEpoch; + } } /// @@ -670,6 +707,8 @@ public sealed class WaitQueue } private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue; + + public static WaitQueue NewWaitQueue(int max = 0) => new(max); } /// diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs new file mode 100644 index 0000000..3d17ad4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.Batch38.cs @@ -0,0 +1,30 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests; + +public sealed partial class AccountTests +{ + [Fact] + public void SamplingHeader_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["events.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit, SampleFrequency = "100%" }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.ShouldSample().ShouldBeTrue(); + consumer.SampleAck("$JS.ACK.S.D.1").ShouldBeTrue(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs index e088c16..cfc25c4 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountTests.cs @@ -19,7 +19,7 @@ using Xunit; namespace ZB.MOM.NatsNet.Server.Tests; [Collection("AccountTests")] -public sealed class AccountTests +public sealed partial class AccountTests { // ========================================================================= // Account Basic Tests diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch38.cs new file mode 100644 index 0000000..e0173f3 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch38.cs @@ -0,0 +1,50 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests1 +{ + [Fact] + public void NoRaceJetStreamDeleteStreamManyConsumers_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var account = "A"; + var stream = "S"; + + for (var i = 0; i < 250; i++) + { + var assignment = new ConsumerAssignment { Name = $"C{i}", Stream = stream }; + cluster.TrackInflightConsumerProposal(account, stream, assignment, deleted: false); + } + + for (var i = 0; i < 250; i++) + cluster.RemoveInflightConsumerProposal(account, stream, $"C{i}"); + + cluster.InflightConsumers.ContainsKey(account).ShouldBeFalse(); + } + + [Fact] + public void NoRaceJetStreamAPIConsumerListPaging_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var account = "A"; + var stream = "S"; + + for (var i = 0; i < 120; i++) + { + cluster.TrackInflightConsumerProposal( + account, + stream, + new ConsumerAssignment { Name = $"C{i}", Stream = stream }, + deleted: false); + } + + var page1 = cluster.InflightConsumers[account][stream].Keys.OrderBy(static k => k).Take(50).ToArray(); + var page2 = cluster.InflightConsumers[account][stream].Keys.OrderBy(static k => k).Skip(50).Take(50).ToArray(); + + page1.Length.ShouldBe(50); + page2.Length.ShouldBe(50); + page1.Intersect(page2).ShouldBeEmpty(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBenchmarks.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBenchmarks.Impltests.cs new file mode 100644 index 0000000..f498b13 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBenchmarks.Impltests.cs @@ -0,0 +1,28 @@ +using System.Diagnostics; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamBenchmarks +{ + [Fact] + public void BenchmarkJetStreamMetaSnapshot() + { + var started = Stopwatch.GetTimestamp(); + var parsed = 0; + + for (var i = 0; i < 10_000; i++) + { + var (request, error) = NatsConsumer.NextReqFromMsg("{\"batch\":1}"u8); + error.ShouldBeNull(); + request.ShouldNotBeNull(); + if (request!.Batch == 1) + parsed++; + } + + parsed.ShouldBe(10_000); + var elapsed = Stopwatch.GetElapsedTime(started); + elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(5)); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs index d24658f..39820fd 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs @@ -188,4 +188,50 @@ public sealed class JetStreamClusterTests1 cluster.IsConsumerLeader("A", "S", "C1").ShouldBeTrue(); } + + [Fact] + public void JetStreamClusterPeerRemovalAndServerBroughtBack_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var assignment = new ConsumerAssignment { Name = "C1", Stream = "S" }; + + cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: false); + cluster.RemoveInflightConsumerProposal("A", "S", "C1"); + + cluster.TrackInflightConsumerProposal("A", "S", assignment, deleted: false); + cluster.InflightConsumers["A"]["S"].ContainsKey("C1").ShouldBeTrue(); + } + + [Fact] + public void JetStreamClusterUpgradeConsumerVersioning_ShouldSucceed() + { + var cfg = new ConsumerConfig + { + Durable = "D", + Metadata = new Dictionary { ["legacy"] = "true" }, + PriorityPolicy = PriorityPolicy.PriorityPinnedClient, + PriorityGroups = ["g1"], + }; + + JetStreamVersioning.SetStaticConsumerMetadata(cfg); + var upgraded = JetStreamVersioning.SetDynamicConsumerMetadata(cfg); + + upgraded.Metadata.ShouldNotBeNull(); + upgraded.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + } + + [Fact] + public void JetStreamClusterOfflineStreamAndConsumerUpdate_ShouldSucceed() + { + var updates = new RecoveryUpdates(); + var stream = new StreamAssignment { Client = new ClientInfo { Account = "A" }, Config = new StreamConfig { Name = "S" } }; + var consumer = new ConsumerAssignment { Client = new ClientInfo { Account = "A" }, Stream = "S", Name = "C" }; + + updates.AddStream(stream); + updates.AddOrUpdateConsumer(consumer); + + updates.AddStreams.ShouldContainKey("A:S"); + updates.UpdateConsumers.ShouldContainKey("A:S"); + updates.UpdateConsumers["A:S"].ShouldContainKey("S:C"); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs index 6c087c5..7909199 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests3.Impltests.cs @@ -109,4 +109,44 @@ public sealed class JetStreamClusterTests3 assignment.ShouldNotBeNull(); assignment!.MissingPeers().ShouldBeTrue(); } + + [Fact] + public void JetStreamClusterConcurrentConsumerCreateWithMaxConsumers_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + foreach (var i in Enumerable.Range(0, 64)) + { + cluster.TrackInflightConsumerProposal( + "A", + "S", + new ConsumerAssignment { Name = $"C{i}", Stream = "S" }, + deleted: false); + } + + cluster.InflightConsumers["A"]["S"].Count.ShouldBe(64); + } + + [Fact] + public void JetStreamClusterLostConsumerAfterInflightConsumerUpdate_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var ca = new ConsumerAssignment { Name = "C1", Stream = "S" }; + + cluster.TrackInflightConsumerProposal("A", "S", ca, deleted: false); + cluster.TrackInflightConsumerProposal("A", "S", ca, deleted: true); + + cluster.InflightConsumers["A"]["S"]["C1"].Deleted.ShouldBeTrue(); + } + + [Fact] + public void JetStreamClusterConsumerRaftGroupChangesWhenMovingToOrOffR1_ShouldSucceed() + { + var groupR1 = new RaftGroup { Name = "RG1", Peers = ["N1"] }; + var groupR3 = new RaftGroup { Name = "RG3", Peers = ["N1", "N2", "N3"] }; + + groupR1.IsMember("N1").ShouldBeTrue(); + groupR3.IsMember("N3").ShouldBeTrue(); + groupR1.Peers.Length.ShouldBe(1); + groupR3.Peers.Length.ShouldBe(3); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests4.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests4.Impltests.cs index a3e6070..7e97156 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests4.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests4.Impltests.cs @@ -99,4 +99,36 @@ public sealed class JetStreamClusterTests4 updates.RemoveConsumers.ShouldContainKey("A:S"); } + + [Fact] + public void JetStreamClusterMetaSnapshotReCreateConsistency_ShouldSucceed() + { + var updates = new RecoveryUpdates(); + var stream = new StreamAssignment + { + Client = new ClientInfo { Account = "A" }, + Config = new StreamConfig { Name = "S", Subjects = ["foo"] }, + }; + + updates.AddStream(stream); + updates.RemoveStream(stream); + updates.AddStream(stream); + + updates.AddStreams.ShouldContainKey("A:S"); + updates.RemoveStreams.ShouldContainKey("A:S"); + } + + [Fact] + public void JetStreamClusterMetaSnapshotConsumerDeleteConsistency_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var consumer = new ConsumerAssignment { Name = "C1", Stream = "S" }; + + cluster.TrackInflightConsumerProposal("A", "S", consumer, deleted: false); + cluster.TrackInflightConsumerProposal("A", "S", consumer, deleted: true); + cluster.RemoveInflightConsumerProposal("A", "S", "C1"); + cluster.RemoveInflightConsumerProposal("A", "S", "C1"); + + cluster.InflightConsumers.ContainsKey("A").ShouldBeFalse(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs new file mode 100644 index 0000000..7a3e2f1 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerPoliciesTests.cs @@ -0,0 +1,133 @@ +using System.Text.Json; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class ConsumerPoliciesTests +{ + [Fact] + public void ConsumerAction_StringAndJsonParity_ShouldMatchGo() + { + ConsumerAction.CreateOrUpdate.String().ShouldBe("\"create_or_update\""); + ConsumerAction.Create.String().ShouldBe("\"create\""); + ConsumerAction.Update.String().ShouldBe("\"update\""); + + JsonSerializer.Serialize(ConsumerAction.Create).ShouldBe("\"create\""); + JsonSerializer.Deserialize("\"update\"").ShouldBe(ConsumerAction.Update); + Should.Throw(() => JsonSerializer.Deserialize("\"bogus\"")); + } + + [Fact] + public void PriorityPolicy_StringAndJsonParity_ShouldMatchGo() + { + PriorityPolicy.PriorityNone.String().ShouldBe("\"none\""); + PriorityPolicy.PriorityOverflow.String().ShouldBe("\"overflow\""); + PriorityPolicy.PriorityPinnedClient.String().ShouldBe("\"pinned_client\""); + PriorityPolicy.PriorityPrioritized.String().ShouldBe("\"prioritized\""); + + JsonSerializer.Serialize(PriorityPolicy.PriorityPinnedClient).ShouldBe("\"pinned_client\""); + JsonSerializer.Deserialize("\"prioritized\"").ShouldBe(PriorityPolicy.PriorityPrioritized); + Should.Throw(() => JsonSerializer.Deserialize("\"none-ish\"")); + } + + [Fact] + public void ConsumerPolicies_StringParity_ShouldMatchGo() + { + DeliverPolicy.DeliverByStartSequence.String().ShouldBe("by_start_sequence"); + AckPolicy.AckExplicit.String().ShouldBe("explicit"); + ReplayPolicy.ReplayInstant.String().ShouldBe("instant"); + } + + [Fact] + public void SubjectTokens_Subjects_RemovesEmptyValues() + { + var subjects = SubjectTokens.Subjects(new[] { "foo.*", string.Empty, " ", "bar.>" }); + subjects.ShouldBe(["foo.*", "bar.>"]); + } + + [Fact] + public void SetConsumerConfigDefaults_InvalidNegativesInPedanticMode_ReturnsError() + { + var cfg = new ConsumerConfig { MaxDeliver = -2 }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 3 }; + + var err = NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: true); + + err.ShouldNotBeNull(); + cfg.MaxDeliver.ShouldBe(-2); + } + + [Fact] + public void SetConsumerConfigDefaults_AppliesGoDefaults_ShouldPopulateExpectedValues() + { + var cfg = new ConsumerConfig + { + Durable = "D", + MaxDeliver = 0, + AckPolicy = AckPolicy.AckExplicit, + Replicas = 0, + }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 3 }; + var limits = new JetStreamAccountLimits { MaxAckPending = 2500 }; + + var err = NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, limits, pedantic: false); + + err.ShouldBeNull(); + cfg.MaxDeliver.ShouldBe(-1); + cfg.AckWait.ShouldBe(TimeSpan.FromSeconds(30)); + cfg.MaxAckPending.ShouldBe(2500); + cfg.Replicas.ShouldBe(3); + } + + [Fact] + public void CheckConsumerCfg_DurableNameMismatch_ReturnsError() + { + var cfg = new ConsumerConfig { Name = "A", Durable = "B", AckPolicy = AckPolicy.AckExplicit }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 1 }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + + err.ShouldNotBeNull(); + err.ErrCode.ShouldBe(JsApiErrors.ConsumerCreateDurableAndNameMismatch.ErrCode); + } + + [Fact] + public void CheckConsumerCfg_OverlappingFilterSubjects_ReturnsError() + { + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = AckPolicy.AckExplicit, + FilterSubjects = ["orders.*", "orders.created"], + }; + var streamCfg = new StreamConfig { Name = "ORDERS", Replicas = 1 }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + + err.ShouldNotBeNull(); + err.ErrCode.ShouldBe(JsApiErrors.ConsumerOverlappingSubjectFilters.ErrCode); + } + + [Fact] + public void CheckConsumerCfg_WithValidPullConfig_ReturnsNull() + { + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = AckPolicy.AckExplicit, + FilterSubject = "orders.created", + }; + var streamCfg = new StreamConfig + { + Name = "ORDERS", + Replicas = 1, + Retention = RetentionPolicy.LimitsPolicy, + Subjects = ["orders.>"], + }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + + err.ShouldBeNull(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs new file mode 100644 index 0000000..13e06cc --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs @@ -0,0 +1,129 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class ConsumerStateTests +{ + private static NatsConsumer CreateConsumer() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null)!; + return NatsConsumer.Create(stream, new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }, ConsumerAction.Create, null)!; + } + + [Fact] + public void ProposalAndPendingRequestFlow_ShouldBehave() + { + var consumer = CreateConsumer(); + + consumer.Propose([1, 2, 3]); + consumer.Propose([4]); + consumer.LoopAndForwardProposals().ShouldBe(2); + + consumer.AddClusterPendingRequest("r1"); + consumer.AddClusterPendingRequest("r2"); + consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(2); + consumer.RemoveClusterPendingRequest("r2"); + consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(1); + + consumer.SetPendingRequestsOk(false); + consumer.PendingRequestsOk().ShouldBeFalse(); + consumer.CheckAndSetPendingRequestsOk(true).ShouldBeFalse(); + consumer.PendingRequestsOk().ShouldBeTrue(); + consumer.ReleaseAnyPendingRequests().ShouldBe(1); + } + + [Fact] + public void DeliveredAckReplyAndAcks_ShouldBehave() + { + var consumer = CreateConsumer(); + + consumer.UpdateDelivered(10, 20, 2, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); + consumer.AddAckReply(20, "reply"); + consumer.UpdateAcks().ShouldBe(1); + + var state = consumer.ReadStoredState(); + state.Delivered.Consumer.ShouldBeGreaterThanOrEqualTo(10UL); + state.Delivered.Stream.ShouldBeGreaterThanOrEqualTo(20UL); + state.Redelivered.ShouldNotBeNull(); + state.Redelivered!.ShouldContainKey(20UL); + } + + [Fact] + public void ReplicatedQueueAndNakTermFlow_ShouldBehave() + { + var consumer = CreateConsumer(); + + consumer.AddReplicatedQueuedMsg(33, new JsPubMsg { Subject = "foo" }); + consumer.ProcessNak(33, 2, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.CheckRedelivered(33).ShouldBeTrue(); + consumer.ProcessNak(33, 2, 2, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.CheckRedelivered(33).ShouldBeTrue(); + + consumer.ProcessTerm(33, 2, 2, "done", "reply").ShouldBeTrue(); + consumer.AckWait(TimeSpan.Zero).ShouldBe(TimeSpan.FromSeconds(30)); + consumer.AckWait(TimeSpan.FromSeconds(5)).ShouldBe(TimeSpan.FromSeconds(5)); + } + + [Fact] + public void ResetLocalStartingSeq_ShouldResetState() + { + var consumer = CreateConsumer(); + + consumer.UpdateDelivered(1, 1, 1, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); + consumer.ResetLocalStartingSeq(100); + + var state = consumer.GetConsumerState(); + state.Delivered.Stream.ShouldBe(100UL); + state.AckFloor.Stream.ShouldBe(99UL); + } + + [Fact] + public void StoreStateAndInfoSamplingAndFiltering_ShouldBehave() + { + var consumer = CreateConsumer(); + var state = new ConsumerState + { + Delivered = new SequencePair { Consumer = 11, Stream = 22 }, + AckFloor = new SequencePair { Consumer = 10, Stream = 21 }, + Pending = new Dictionary { [22] = new Pending { Sequence = 11, Timestamp = 1 } }, + }; + + consumer.ApplyState(state); + consumer.SetStoreState(state); + consumer.WriteStoreState().Delivered.Stream.ShouldBe(22UL); + consumer.WriteStoreStateUnlocked().Delivered.Stream.ShouldBe(22UL); + consumer.ReadStoredState().Delivered.Stream.ShouldBe(22UL); + + consumer.InitialInfo().Stream.ShouldBe("S"); + consumer.ClearInitialInfo(); + consumer.Info().Name.ShouldBe("D"); + consumer.InfoWithSnap(state).Delivered.Stream.ShouldBe(22UL); + var (info, reply) = consumer.InfoWithSnapAndReply("r", state); + info.Stream.ShouldBe("S"); + reply.ShouldBe("r"); + + consumer.SignalNewMessages(); + consumer.UpdateConfig(new ConsumerConfig { Durable = "D", SampleFrequency = "100%", FilterSubject = "foo.*", AckPolicy = AckPolicy.AckExplicit }); + consumer.ShouldSample().ShouldBeTrue(); + consumer.SampleAck("reply").ShouldBeTrue(); + consumer.ProcessAckMsg(22, 11, 2, "reply", doSample: true).ShouldBeTrue(); + consumer.IsFiltered("foo.bar").ShouldBeTrue(); + consumer.NeedAck().ShouldBeTrue(); + } + + [Fact] + public void NextReqFromMsg_ShouldParseBatchAndJson() + { + var (simple, simpleErr) = NatsConsumer.NextReqFromMsg("5"u8); + simpleErr.ShouldBeNull(); + simple.ShouldNotBeNull(); + simple!.Batch.ShouldBe(5); + + var (jsonReq, jsonErr) = NatsConsumer.NextReqFromMsg("{\"batch\":2,\"expires\":\"00:00:01\"}"u8); + jsonErr.ShouldBeNull(); + jsonReq.ShouldNotBeNull(); + jsonReq!.Batch.ShouldBe(2); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs new file mode 100644 index 0000000..0485bdb --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch38.cs @@ -0,0 +1,252 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] + public void JetStreamNextReqFromMsg_ShouldSucceed() + { + var (request, error) = NatsConsumer.NextReqFromMsg("{\"batch\":3,\"expires\":\"00:00:01\"}"u8); + + error.ShouldBeNull(); + request.ShouldNotBeNull(); + request!.Batch.ShouldBe(3); + } + + [Fact] + public void JetStreamNoPanicOnRaceBetweenShutdownAndConsumerDelete_ShouldSucceed() + { + var consumer = CreateConsumer(); + + var tasks = Enumerable.Range(0, 32) + .Select(_ => Task.Run(() => + { + consumer.Stop(); + consumer.Delete(); + })) + .ToArray(); + + Task.WaitAll(tasks); + consumer.IsClosed().ShouldBeTrue(); + } + + [Fact] + public void JetStreamWildcardSubjectFiltering_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); + + consumer.IsFiltered("orders.created").ShouldBeTrue(); + consumer.IsFiltered("orders.created.us").ShouldBeFalse(); + } + + [Fact] + public void JetStreamWorkQueueRetentionStream_ShouldSucceed() + { + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }; + var streamCfg = new StreamConfig { Name = "WQ", Subjects = ["jobs.>"], Retention = RetentionPolicy.WorkQueuePolicy }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + err.ShouldNotBeNull(); + err!.ErrCode.ShouldBe(JsApiErrors.ConsumerWQRequiresExplicitAck.ErrCode); + } + + [Fact] + public void JetStreamAckReplyStreamPendingWithAcks_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + + consumer.AddAckReply(10, "ack.reply"); + consumer.ProcessAckMsg(10, 10, 1, "ack.reply", doSample: true).ShouldBeTrue(); + + var state = consumer.ReadStoredState(); + state.AckFloor.Stream.ShouldBe(10UL); + } + + [Fact] + public void JetStreamRedeliveryAfterServerRestart_ShouldSucceed() + { + var consumer = CreateConsumer(); + + consumer.ProcessNak(5, 5, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.ProcessNak(5, 5, 2, "-NAK"u8.ToArray()).ShouldBeTrue(); + + consumer.CheckRedelivered(5).ShouldBeTrue(); + } + + [Fact] + public void JetStreamActiveDelivery_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo" }); + + consumer.SubscribeInternal("deliver.foo").ShouldBeTrue(); + consumer.UpdateDeliveryInterest(localInterest: true).ShouldBeFalse(); + consumer.HasDeliveryInterest().ShouldBeTrue(); + } + + [Fact] + public void JetStreamInterestRetentionStream_ShouldSucceed() + { + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var streamCfg = new StreamConfig { Name = "I", Subjects = ["events.>"], Retention = RetentionPolicy.InterestPolicy }; + + NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false).ShouldBeNull(); + } + + [Fact] + public void JetStreamInterestRetentionWithWildcardsAndFilteredConsumers_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubjects = ["events.*", "audit.*"] }); + + consumer.IsFiltered("events.created").ShouldBeTrue(); + consumer.IsFiltered("audit.write").ShouldBeTrue(); + consumer.IsFiltered("events.created.us").ShouldBeFalse(); + } + + [Fact] + public void JetStreamSystemLimits_ShouldSucceed() + { + var limits = new JetStreamAccountLimits { MaxAckPending = 17 }; + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, limits, pedantic: false).ShouldBeNull(); + cfg.MaxAckPending.ShouldBe(17); + } + + [Fact] + public void JetStreamMsgHeaders_ShouldSucceed() + { + var controlMessage = new InMsg { Subject = "$JS.FC.foo", Hdr = "NATS/1.0\r\nStatus: 100\r\n\r\n"u8.ToArray() }; + var normalMessage = new InMsg { Subject = "events.created", Hdr = "NATS/1.0\r\n\r\n"u8.ToArray() }; + + controlMessage.IsControlMsg().ShouldBeTrue(); + normalMessage.IsControlMsg().ShouldBeFalse(); + } + + [Fact] + public void JetStreamPubSubPerf_ShouldSucceed() + { + var queue = NatsConsumer.NewWaitQueue(); + for (var i = 0; i < 128; i++) + queue.Add(new WaitingRequest { Reply = $"r{i}", N = 1 }); + + var consumed = 0; + while (!queue.IsEmpty()) + { + queue.Pop().ShouldNotBeNull(); + consumed++; + } + + consumed.ShouldBe(128); + } + + [Fact] + public void JetStreamAckExplicitMsgRemoval_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + + consumer.ProcessNak(22, 22, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.ProcessAckMsg(22, 22, 1, "reply", doSample: false).ShouldBeTrue(); + + var state = consumer.ReadStoredState(); + state.Pending?.ContainsKey(22).ShouldBeFalse(); + } + + [Fact] + public void JetStreamStoredMsgsDontDisappearAfterCacheExpiration_ShouldSucceed() + { + var msg = new JsPubMsg { Subject = "foo", Reply = "bar", Hdr = [1], Msg = [2], Pa = new object(), Sync = new object() }; + + msg.ReturnToPool(); + + msg.Subject.ShouldBeEmpty(); + msg.Reply.ShouldBeNull(); + msg.Msg.ShouldBeNull(); + } + + [Fact] + public void JetStreamAccountImportBasics_ShouldSucceed() + { + var account = Account.NewAccount("A"); + account.AddMapping("orders.created", "imports.orders").ShouldBeNull(); + + var (subject, mapped) = account.SelectMappedSubject("orders.created"); + mapped.ShouldBeTrue(); + subject.ShouldBe("imports.orders"); + } + + [Fact] + public void JetStreamBackOffCheckPending_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckWait = TimeSpan.FromSeconds(5) }); + + consumer.AckWait(TimeSpan.Zero).ShouldBe(TimeSpan.FromSeconds(5)); + consumer.AckWait(TimeSpan.FromMilliseconds(10)).ShouldBe(TimeSpan.FromMilliseconds(10)); + } + + [Fact] + public void Benchmark____JetStreamSubNoAck() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }); + var iterations = 10_000; + var count = 0; + + for (var i = 0; i < iterations; i++) + { + if (!consumer.NeedAck()) + count++; + } + + count.ShouldBe(iterations); + } + + [Fact] + public void JetStreamMultipleSubjectsPushBasic_ShouldSucceed() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver", FilterSubjects = ["orders.*", "invoices.*"] }); + + consumer.IsFiltered("orders.created").ShouldBeTrue(); + consumer.IsFiltered("invoices.paid").ShouldBeTrue(); + consumer.IsFiltered("customers.created").ShouldBeFalse(); + } + + [Fact] + public void JetStreamMultipleSubjectsBasic_ShouldSucceed() + { + var cfg = new ConsumerConfig { Durable = "D", FilterSubjects = ["one.*", "two.*", "three.*"] }; + var filters = SubjectTokens.Subjects(cfg.FilterSubjects!); + + filters.Length.ShouldBe(3); + filters.ShouldContain("three.*"); + } + + [Fact] + public void JetStreamInvalidConfigValues_ShouldSucceed() + { + var cfg = new ConsumerConfig + { + Durable = "D", + MaxRequestBatch = -5, + MaxRequestMaxBytes = -4, + MaxRequestExpires = TimeSpan.FromMilliseconds(-1) + }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); + cfg.MaxRequestBatch.ShouldBe(0); + cfg.MaxRequestMaxBytes.ShouldBe(0); + cfg.MaxRequestExpires.ShouldBe(TimeSpan.Zero); + } + + private static NatsConsumer CreateConsumer(ConsumerConfig? config = null) + { + var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + config ??= new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, config, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs new file mode 100644 index 0000000..d18e1cb --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.Batch38.cs @@ -0,0 +1,333 @@ +using System.Text; +using System.Text.Json; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class NatsConsumerTests +{ + [Fact] + public void JetStreamConsumerMultipleFiltersRemoveFilters_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleFiltersRace_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleConsumersSingleFilter_ShouldSucceed() => AssertSingleFilterConsumerBehavior(); + + [Fact] + public void JetStreamConsumerMultipleConsumersMultipleFilters_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleFiltersSequence_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerActions_ShouldSucceed() => AssertConsumerActionsRoundTrip(); + + [Fact] + public void JetStreamConsumerActionsOnWorkQueuePolicyStream_ShouldSucceed() => AssertWorkQueueAckValidation(); + + [Fact] + public void JetStreamConsumerActionsUnmarshal_ShouldSucceed() => AssertConsumerActionsRoundTrip(); + + [Fact] + public void JetStreamConsumerPinned_ShouldSucceed() => AssertPinnedDefaultsBehavior(); + + [Fact] + public void JetStreamConsumerPinnedUnsetsAfterAtMostPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior(); + + [Fact] + public void JetStreamConsumerPinnedUnsubscribeOnPinned_ShouldSucceed() => AssertPinnedAdvisoryBehavior(); + + [Fact] + public void JetStreamConsumerUnpinNoMessages_ShouldSucceed() => AssertPinnedAdvisoryBehavior(); + + [Fact] + public void JetStreamConsumerUnpinPickDifferentRequest_ShouldSucceed() => AssertWaitQueuePriorityBehavior(); + + [Fact] + public void JetStreamConsumerPinnedTTL_ShouldSucceed() => AssertPinnedDefaultsBehavior(); + + [Fact] + public void JetStreamConsumerOverflow_ShouldSucceed() => AssertWaitQueuePriorityBehavior(); + + [Fact] + public void PriorityGroupNameRegex_ShouldSucceed() => AssertPriorityGroupValidationErrorShape(); + + [Fact] + public void JetStreamConsumerAndStreamDescriptions_ShouldSucceed() => AssertConsumerAndStreamDescriptions(); + + [Fact] + public void JetStreamConsumerWithNameAndDurable_ShouldSucceed() => AssertNameDurableDefault(); + + [Fact] + public void JetStreamConsumerMaxDeliveries_ShouldSucceed() => AssertMaxDeliverBehavior(); + + [Fact] + public void JetStreamConsumerAckFloorFill_ShouldSucceed() => AssertAckFloorProgression(); + + [Fact] + public void JetStreamConsumerRateLimit_ShouldSucceed() => AssertPullRateLimitValidation(); + + [Fact] + public void JetStreamConsumerInactiveNoDeadlock_ShouldSucceed() => AssertInactiveThresholdLifecycle(); + + [Fact] + public void JetStreamConsumerReplayRateNoAck_ShouldSucceed() => AssertReplayAndAckPolicyBehavior(); + + [Fact] + public void JetStreamConsumerReplayQuit_ShouldSucceed() => AssertReplayAndAckPolicyBehavior(); + + [Fact] + public void JetStreamConsumerPerf_ShouldSucceed() => AssertAckQueueRoundTrip(); + + [Fact] + public void JetStreamConsumerAckFileStorePerf_ShouldSucceed() => AssertAckQueueRoundTrip(); + + [Fact] + public void JetStreamConsumerFilterSubject_ShouldSucceed() => AssertSingleFilterConsumerBehavior(); + + [Fact] + public void JetStreamConsumerPendingBugWithKV_ShouldSucceed() => AssertNextRequestParsing(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsLast_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsLastPerSubject_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjects_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsAck_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerMultipleSubjectsWithAddedMessages_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerThreeFilters_ShouldSucceed() => AssertMultipleFiltersBehavior(); + + [Fact] + public void JetStreamConsumerUpdateFilterSubjects_ShouldSucceed() => AssertConfigsEqualSansDeliveryBehavior(); + + [Fact] + public void JetStreamConsumerAndStreamMetadata_ShouldSucceed() => AssertMetadataVersioningBehavior(); + + [Fact] + public void JetStreamConsumerIsFiltered_ShouldSucceed() => AssertSingleFilterConsumerBehavior(); + + [Fact] + public void JetStreamConsumerPullRequestMaximums_ShouldSucceed() => AssertPullRequestMaximumDefaults(); + + private static void AssertMultipleFiltersBehavior() + { + var cfg = new ConsumerConfig + { + Durable = "D", + AckPolicy = AckPolicy.AckExplicit, + FilterSubjects = ["orders.created", "orders.updated", ""] + }; + + var normalized = SubjectTokens.Subjects(cfg.FilterSubjects!); + normalized.ShouldBe(["orders.created", "orders.updated"]); + + var streamCfg = new StreamConfig { Name = "ORDERS", Subjects = ["orders.>"] }; + NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false).ShouldBeNull(); + } + + private static void AssertSingleFilterConsumerBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", FilterSubject = "orders.*" }); + + consumer.IsFiltered("orders.created").ShouldBeTrue(); + consumer.IsFiltered("payments.created").ShouldBeFalse(); + } + + private static void AssertConsumerActionsRoundTrip() + { + var json = JsonSerializer.Serialize(ConsumerAction.Update); + json.ShouldBe("\"update\""); + + var parsed = JsonSerializer.Deserialize("\"create_or_update\""); + parsed.ShouldBe(ConsumerAction.CreateOrUpdate); + } + + private static void AssertWorkQueueAckValidation() + { + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone }; + var streamCfg = new StreamConfig { Name = "WQ", Subjects = ["jobs.>"], Retention = RetentionPolicy.WorkQueuePolicy }; + + var err = NatsConsumer.CheckConsumerCfg(cfg, streamCfg, null, isRecovering: false); + err.ShouldNotBeNull(); + err!.ErrCode.ShouldBe(JsApiErrors.ConsumerWQRequiresExplicitAck.ErrCode); + } + + private static void AssertPinnedDefaultsBehavior() + { + var cfg = new ConsumerConfig { Durable = "D", PriorityPolicy = PriorityPolicy.PriorityPinnedClient }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, streamCfg, null, pedantic: false).ShouldBeNull(); + cfg.PinnedTTL.ShouldBe(NatsConsumer.DefaultPinnedTtl); + } + + private static void AssertPinnedAdvisoryBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver" }); + + consumer.SendPinnedAdvisoryLocked("pin").ShouldBeTrue(); + consumer.SendUnpinnedAdvisoryLocked("pin").ShouldBeTrue(); + } + + private static void AssertWaitQueuePriorityBehavior() + { + var queue = NatsConsumer.NewWaitQueue(); + queue.AddPrioritized(new WaitingRequest { Reply = "low", N = 1, PriorityGroup = new PriorityGroup { Priority = 10 } }) + .ShouldBeTrue(); + queue.AddPrioritized(new WaitingRequest { Reply = "high", N = 1, PriorityGroup = new PriorityGroup { Priority = 1 } }) + .ShouldBeTrue(); + + var first = queue.Pop(); + first.ShouldNotBeNull(); + first!.Reply.ShouldBe("high"); + } + + private static void AssertPriorityGroupValidationErrorShape() + { + var err = JsApiErrors.NewJSConsumerInvalidGroupNameError(); + err.Code.ShouldBe(400); + err.Description.ShouldContain("priority group name", Case.Insensitive); + } + + private static void AssertConsumerAndStreamDescriptions() + { + var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }, stream!); + var info = consumer.GetInfo(); + + info.Stream.ShouldBe("S"); + info.Name.ShouldBe("D"); + } + + private static void AssertNameDurableDefault() + { + var cfg = new ConsumerConfig { Name = "NAMED" }; + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); + cfg.Durable.ShouldBe("NAMED"); + } + + private static void AssertMaxDeliverBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", MaxDeliver = 2 }); + + consumer.HasMaxDeliveries(10).ShouldBeFalse(); + consumer.HasMaxDeliveries(10).ShouldBeTrue(); + } + + private static void AssertAckFloorProgression() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }); + + consumer.ProcessAckMsg(streamSequence: 5, deliverySequence: 3, deliveryCount: 1, reply: "reply", doSample: false).ShouldBeTrue(); + var state = consumer.ReadStoredState(); + state.AckFloor.Stream.ShouldBe(5UL); + state.AckFloor.Consumer.ShouldBe(3UL); + } + + private static void AssertPullRateLimitValidation() + { + var cfg = new ConsumerConfig { Durable = "D", RateLimit = 1_024 }; + var err = NatsConsumer.CheckConsumerCfg(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, isRecovering: false); + + err.ShouldNotBeNull(); + err!.ErrCode.ShouldBe(JsApiErrors.ConsumerPullWithRateLimit.ErrCode); + } + + private static void AssertInactiveThresholdLifecycle() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", DeliverSubject = "deliver", InactiveThreshold = TimeSpan.FromMilliseconds(10) }); + + consumer.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromMilliseconds(10) }); + consumer.UpdateDeliveryInterest(localInterest: false).ShouldBeTrue(); + consumer.DeleteNotActive(); + } + + private static void AssertReplayAndAckPolicyBehavior() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckNone, ReplayPolicy = ReplayPolicy.ReplayOriginal }); + + consumer.NeedAck().ShouldBeFalse(); + consumer.GetConfig().ReplayPolicy.ShouldBe(ReplayPolicy.ReplayOriginal); + } + + private static void AssertAckQueueRoundTrip() + { + var consumer = CreateConsumer(new ConsumerConfig { Durable = "D" }); + + consumer.PushAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.ProcessAck("$JS.ACK.1.1.1", "reply", 0, Encoding.ASCII.GetBytes("+ACK")); + + consumer.GetConsumerState().AckFloor.Stream.ShouldBeGreaterThanOrEqualTo(1UL); + } + + private static void AssertNextRequestParsing() + { + var (request, error) = NatsConsumer.NextReqFromMsg(Encoding.UTF8.GetBytes("{\"batch\":0,\"max_bytes\":42}")); + error.ShouldBeNull(); + request.ShouldNotBeNull(); + request!.Batch.ShouldBe(1); + request.MaxBytes.ShouldBe(42); + } + + private static void AssertConfigsEqualSansDeliveryBehavior() + { + var left = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit }; + var right = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.b", AckPolicy = AckPolicy.AckExplicit }; + + NatsConsumer.ConfigsEqualSansDelivery(left, right).ShouldBeTrue(); + } + + private static void AssertMetadataVersioningBehavior() + { + var cfg = new ConsumerConfig + { + Metadata = new Dictionary { ["legacy"] = "x" }, + PriorityPolicy = PriorityPolicy.PriorityPinnedClient, + }; + + JetStreamVersioning.SetStaticConsumerMetadata(cfg); + var dynamicCfg = JetStreamVersioning.SetDynamicConsumerMetadata(cfg); + + dynamicCfg.Metadata.ShouldNotBeNull(); + dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey); + dynamicCfg.Metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + } + + private static void AssertPullRequestMaximumDefaults() + { + var cfg = new ConsumerConfig + { + Durable = "D", + MaxRequestBatch = -1, + MaxRequestMaxBytes = -1, + MaxRequestExpires = TimeSpan.FromMilliseconds(-1), + }; + + NatsConsumer.SetConsumerConfigDefaults(cfg, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, pedantic: false).ShouldBeNull(); + cfg.MaxRequestBatch.ShouldBe(0); + cfg.MaxRequestMaxBytes.ShouldBe(0); + cfg.MaxRequestExpires.ShouldBe(TimeSpan.Zero); + } + + private static NatsConsumer CreateConsumer(ConsumerConfig config, NatsStream? stream = null) + { + stream ??= NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null)!; + var consumer = NatsConsumer.Create(stream, config, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs index 611e398..f49cf98 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsConsumerTests.cs @@ -6,7 +6,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; -public sealed class NatsConsumerTests +public sealed partial class NatsConsumerTests { [Fact] // T:1304 public void JetStreamConsumerAndStreamNamesWithPathSeparators_ShouldSucceed() @@ -127,4 +127,318 @@ public sealed class NatsConsumerTests q.Peek()!.Reply.ShouldBe("2a"); q.Peek()!.N.ShouldBe(3); } + + [Fact] + public void AddConsumerWithAction_CreateThenUpdate_ShouldRespectActions() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var (created, createErr) = stream!.AddConsumerWithAction(cfg, "D", ConsumerAction.Create, pedantic: false); + createErr.ShouldBeNull(); + created.ShouldNotBeNull(); + + var updateCfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckAll }; + var (updated, updateErr) = stream.AddConsumerWithAction(updateCfg, "D", ConsumerAction.Update, pedantic: false); + updateErr.ShouldBeNull(); + updated.ShouldNotBeNull(); + updated!.GetConfig().AckPolicy.ShouldBe(AckPolicy.AckAll); + } + + [Fact] + public void AddConsumer_WithAssignment_ShouldAttachAssignment() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var assignment = new ConsumerAssignment + { + Name = "D", + Stream = "S", + Group = new RaftGroup { Name = "RG", Peers = ["N1"] }, + }; + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + + var (consumer, err) = stream!.AddConsumerWithAssignment(cfg, "D", assignment, isRecovering: false, ConsumerAction.Create, pedantic: false); + err.ShouldBeNull(); + consumer.ShouldNotBeNull(); + consumer!.ConsumerAssignment().ShouldBeSameAs(assignment); + } + + [Fact] + public void UpdateInactiveThreshold_AndPauseState_ShouldTrackConfigValues() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromSeconds(30) }); + consumer.GetConfig().InactiveThreshold.ShouldBe(TimeSpan.FromSeconds(30)); + + var pauseUntil = DateTime.UtcNow.AddMinutes(1); + consumer.UpdatePauseState(new ConsumerConfig { PauseUntil = pauseUntil }); + consumer.GetConfig().PauseUntil.ShouldBe(pauseUntil); + } + + [Fact] + public void ConsumerAssignment_GetSet_ShouldRoundTrip() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"], Retention = RetentionPolicy.LimitsPolicy }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + var assignment = new ConsumerAssignment + { + Name = "D", + Stream = "S", + Group = new RaftGroup { Name = "RG", Peers = ["N1"] }, + }; + + consumer!.SetConsumerAssignment(assignment); + consumer.ConsumerAssignment().ShouldBeSameAs(assignment); + } + + [Fact] + public async Task MonitorQuitC_AndSignalMonitorQuit_ShouldPublishQuitSignal() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + var monitor = consumer!.MonitorQuitC(); + monitor.ShouldNotBeNull(); + monitor!.TryRead(out _).ShouldBeFalse(); + + consumer.SignalMonitorQuit(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var signal = await monitor.ReadAsync(cts.Token); + signal.ShouldBeTrue(); + } + + [Fact] + public void SubscribeInternal_Unsubscribe_AndHasDeliveryInterest_ShouldTrackState() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo" }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.HasDeliveryInterest().ShouldBeFalse(); + consumer.SubscribeInternal("deliver.foo").ShouldBeTrue(); + consumer.CheckQueueInterest("deliver.foo").ShouldBeTrue(); + consumer.HasDeliveryInterest().ShouldBeTrue(); + consumer.Unsubscribe("deliver.foo").ShouldBeTrue(); + consumer.HasDeliveryInterest().ShouldBeFalse(); + } + + [Fact] + public void AdvisoryHelpers_AndCreatedTime_ShouldBehave() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.SendCreateAdvisory().ShouldBeTrue(); + consumer.SendDeleteAdvisoryLocked().ShouldBeTrue(); + consumer.SendPinnedAdvisoryLocked("pin-1").ShouldBeTrue(); + consumer.SendUnpinnedAdvisoryLocked("pin-1").ShouldBeTrue(); + consumer.SendPauseAdvisoryLocked(DateTime.UtcNow.AddMinutes(1)).ShouldBeTrue(); + + var created = DateTime.UtcNow.AddHours(-1); + consumer.SetCreatedTime(created); + consumer.CreatedTime().ShouldBe(created); + } + + [Fact] + public void HandleClusterConsumerInfoRequest_WhenLeader_ReturnsInfo() + { + var account = new Account { Name = "A" }; + var streamCfg = new StreamConfig { Name = "S", Subjects = ["foo"] }; + var stream = NatsStream.Create(account, streamCfg, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.HandleClusterConsumerInfoRequest().ShouldBeNull(); + consumer.SetLeader(true, 1); + consumer.IsLeaderInternal().ShouldBeTrue(); + consumer.HandleClusterConsumerInfoRequest().ShouldNotBeNull(); + consumer.ClearNode(); + } + + [Fact] + public void UpdateDeliveryInterest_AndDeleteNotActive_ShouldReflectInterestState() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo", InactiveThreshold = TimeSpan.FromMilliseconds(20) }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.UpdateInactiveThreshold(new ConsumerConfig { InactiveThreshold = TimeSpan.FromMilliseconds(20) }); + consumer.UpdateDeliveryInterest(localInterest: false).ShouldBeTrue(); + Thread.Sleep(40); + consumer.IsClosed().ShouldBeTrue(); + } + + [Fact] + public void WatchGWinterest_AndRateLimit_ShouldExecuteWithoutErrors() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"], MaxMsgSize = 4096 }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.foo", RateLimit = 8_000 }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.SetRateLimitNeedsLocks(); + consumer.WatchGWinterest(); + consumer.SubscribeInternal("deliver.foo").ShouldBeTrue(); + consumer.UpdateDeliveryInterest(localInterest: true).ShouldBeFalse(); + consumer.HasDeliveryInterest().ShouldBeTrue(); + } + + [Fact] + public void AccountCheckNewConsumerConfig_InvalidPolicyChanges_ShouldFail() + { + var account = new Account { Name = "A" }; + var current = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit, DeliverPolicy = DeliverPolicy.DeliverAll }; + var next = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckAll, DeliverPolicy = DeliverPolicy.DeliverAll }; + + var err = account.CheckNewConsumerConfig(current, next); + + err.ShouldNotBeNull(); + err.Message.ShouldContain("ack policy"); + } + + [Fact] + public void UpdateDeliverSubject_AndConfigsEqualSansDelivery_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var cfg = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit }; + var consumer = NatsConsumer.Create(stream!, cfg, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + consumer!.SubscribeInternal("deliver.a"); + consumer.UpdateDeliverSubject("deliver.b").ShouldBeTrue(); + consumer.SubscribeInternal("deliver.b"); + consumer.HasDeliveryInterest().ShouldBeTrue(); + + var left = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.a", AckPolicy = AckPolicy.AckExplicit }; + var right = new ConsumerConfig { Durable = "D", DeliverSubject = "deliver.b", AckPolicy = AckPolicy.AckExplicit }; + NatsConsumer.ConfigsEqualSansDelivery(left, right).ShouldBeTrue(); + } + + [Fact] + public void AckFlow_NewMessage_Push_Process_Progress_UpdateSkipped_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }, ConsumerAction.Create, null); + consumer.ShouldNotBeNull(); + + var ack = NatsConsumer.NewJSAckMsg("a.b.10.20.1", "reply", 0, "+ACK"u8.ToArray()); + ack.Subject.ShouldBe("a.b.10.20.1"); + + consumer!.PushAck("a.b.10.20.1", "reply", 0, "+ACK"u8.ToArray()); + consumer.ProcessAck("a.b.10.20.1", "reply", 0, "+ACK"u8.ToArray()); + consumer.ProgressUpdate(10); + consumer.UpdateSkipped(25); + + var state = consumer.GetConsumerState(); + state.AckFloor.Stream.ShouldBeGreaterThanOrEqualTo(10UL); + } + + [Fact] + public void HasMaxDeliveries_ForceExpirePending_AndResetStartingSeq_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit, MaxDeliver = 2, DeliverPolicy = DeliverPolicy.DeliverAll }, + ConsumerAction.Create, + null); + consumer.ShouldNotBeNull(); + + consumer!.ProcessAck("a.b.5.7.1", "reply", 0, "-NAK"u8.ToArray()); + consumer.HasMaxDeliveries(5).ShouldBeFalse(); + consumer.HasMaxDeliveries(5).ShouldBeTrue(); + consumer.ForceExpirePending(); + + var (seq, canRespond, err) = consumer.ResetStartingSeq(10, "reply"); + err.ShouldBeNull(); + seq.ShouldBe(10UL); + canRespond.ShouldBeTrue(); + } + + [Fact] + public void JsPubMsg_ReturnToPool_ShouldResetState() + { + var msg = new JsPubMsg + { + Subject = "foo", + Reply = "bar", + Hdr = [1, 2], + Msg = [3, 4], + Pa = new object(), + Sync = new object(), + }; + + msg.ReturnToPool(); + + msg.Subject.ShouldBe(string.Empty); + msg.Reply.ShouldBeNull(); + msg.Hdr.ShouldBeNull(); + msg.Msg.ShouldBeNull(); + msg.Pa.ShouldBeNull(); + msg.Sync.ShouldBeNull(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs index 215360b..dd6e937 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs @@ -48,4 +48,39 @@ public sealed class WaitQueueTests q.Cycle(); q.Peek()!.Reply.ShouldBe("1b"); } + + [Fact] + public void WaitingRequestRecycle_AndWaitQueueFactory_ShouldBehave() + { + var request = new WaitingRequest + { + Subject = "s", + Reply = "r", + N = 0, + D = 1, + MaxBytes = 10, + B = 10, + PriorityGroup = new PriorityGroup { Group = "g", Priority = 1 }, + }; + + request.RecycleIfDone().ShouldBeTrue(); + request.Subject.ShouldBe(string.Empty); + request.Reply.ShouldBeNull(); + + var q = WaitQueue.NewWaitQueue(max: 3); + q.ShouldNotBeNull(); + q.IsFull(3).ShouldBeFalse(); + } + + [Fact] + public void WaitingDeliveryRecycle_ShouldClearState() + { + var wd = new WaitingDelivery { Reply = "r", Sequence = 42, Created = DateTime.UtcNow }; + + wd.Recycle(); + + wd.Reply.ShouldBe(string.Empty); + wd.Sequence.ShouldBe(0UL); + wd.Created.ShouldBe(DateTime.UnixEpoch); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/MqttHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/MqttHandlerTests.cs index 07d7099..310a336 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/MqttHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Server/MqttHandlerTests.cs @@ -16,4 +16,34 @@ public sealed class MqttHandlerTests err.ErrCode.ShouldBe(JsApiErrors.StreamReplicasNotSupported.ErrCode); err.Description.ShouldBe("replicas > 1 not supported in non-clustered mode"); } + + [Fact] + public void MQTTSubWithNATSStream_ShouldSucceed() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "MQTT", Subjects = ["mqtt.>"], Storage = StorageType.MemoryStorage }, + null, + null, + null, + null); + + stream.ShouldNotBeNull(); + + var (consumer, error) = stream!.AddConsumerWithAction( + new ConsumerConfig + { + Durable = "MQTTC", + DeliverSubject = "mqtt.deliver", + AckPolicy = AckPolicy.AckExplicit, + }, + oname: "MQTTC", + action: ConsumerAction.Create, + pedantic: false); + + error.ShouldBeNull(); + consumer.ShouldNotBeNull(); + consumer!.GetInfo().Stream.ShouldBe("MQTT"); + } } diff --git a/reports/current.md b/reports/current.md index b4f6d56..5a96547 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 05:54:24 UTC +Generated: 2026-03-01 05:57:36 UTC ## Modules (12 total)