using System.Text.Json; using System.Text; namespace ZB.MOM.NatsNet.Server; internal sealed partial class NatsConsumer { private static readonly TimeSpan DefaultGatewayInterestInterval = TimeSpan.FromSeconds(1); private ConsumerInfo? _initialInfo; internal bool UpdateDeliveryInterest(bool localInterest) { var interest = HasDeliveryInterest(localInterest); _mu.EnterWriteLock(); try { _hasLocalDeliveryInterest = localInterest; if (_closed || IsPullMode()) return false; var wasActive = !IsPullMode() && _isLeader; if (interest && !wasActive) _updateChannel.Writer.TryWrite(true); if (!interest) _isPaused = false; if (_deleteTimer != null && _deleteThreshold > TimeSpan.Zero && !interest) return true; _deleteTimer?.Dispose(); _deleteTimer = null; if (!interest && _deleteThreshold > TimeSpan.Zero) { _deleteTimer = new Timer(static s => ((NatsConsumer)s!).DeleteNotActive(), this, _deleteThreshold, Timeout.InfiniteTimeSpan); return true; } return false; } finally { _mu.ExitWriteLock(); } } internal void DeleteNotActive() { _mu.EnterReadLock(); try { if (_closed) return; if (IsPushMode()) { if (HasDeliveryInterest()) return; } else { if (_state.Pending is { Count: > 0 }) return; } } finally { _mu.ExitReadLock(); } Delete(); } internal void WatchGWinterest() { var wasActive = HasDeliveryInterest(); if (!_hasLocalDeliveryInterest) { UpdateDeliveryInterest(localInterest: false); if (!wasActive && HasDeliveryInterest()) _updateChannel.Writer.TryWrite(true); } _mu.EnterWriteLock(); try { _gatewayWatchTimer?.Dispose(); _gatewayWatchTimer = new Timer(static s => ((NatsConsumer)s!).WatchGWinterest(), this, DefaultGatewayInterestInterval, Timeout.InfiniteTimeSpan); } finally { _mu.ExitWriteLock(); } } internal bool HasMaxDeliveries(ulong sequence) { _mu.EnterWriteLock(); try { if (Config.MaxDeliver <= 0) return false; _state.Redelivered ??= new Dictionary(); _state.Pending ??= new Dictionary(); var deliveryCount = _state.Redelivered.TryGetValue(sequence, out var redeliveries) ? redeliveries + 1 : 1UL; if (deliveryCount < (ulong)Config.MaxDeliver) { _state.Redelivered[sequence] = deliveryCount; return false; } _state.Redelivered[sequence] = deliveryCount; _state.Pending.Remove(sequence); _updateChannel.Writer.TryWrite(true); return true; } finally { _mu.ExitWriteLock(); } } internal void ForceExpirePending() { _mu.EnterWriteLock(); try { if (_state.Pending is not { Count: > 0 }) return; _state.Redelivered ??= new Dictionary(); foreach (var seq in _state.Pending.Keys.ToArray()) { if (HasMaxDeliveries(seq)) continue; _state.Redelivered[seq] = _state.Redelivered.TryGetValue(seq, out var current) ? current + 1 : 1UL; _state.Pending[seq].Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); } _updateChannel.Writer.TryWrite(true); } finally { _mu.ExitWriteLock(); } } internal void SetRateLimitNeedsLocks() { _mu.EnterWriteLock(); try { SetRateLimit(Config.RateLimit); } finally { _mu.ExitWriteLock(); } } internal void SetRateLimit(ulong bitsPerSecond) { if (bitsPerSecond == 0) { Interlocked.Exchange(ref _rateLimitBitsPerSecond, 0); _rateLimitBurstBytes = 0; return; } Interlocked.Exchange(ref _rateLimitBitsPerSecond, (long)bitsPerSecond); var configuredMax = _streamRef?.Config.MaxMsgSize ?? 0; _rateLimitBurstBytes = configuredMax > 0 ? configuredMax : (int)Math.Max(1024UL, bitsPerSecond / 8UL); } internal bool UpdateDeliverSubject(string? newDeliver) { _mu.EnterWriteLock(); try { return UpdateDeliverSubjectLocked(newDeliver); } finally { _mu.ExitWriteLock(); } } internal bool UpdateDeliverSubjectLocked(string? newDeliver) { if (_closed || IsPullMode() || string.Equals(Config.DeliverSubject, newDeliver, StringComparison.Ordinal)) return false; if (_state.Pending is { Count: > 0 }) ForceExpirePending(); if (!string.IsNullOrWhiteSpace(Config.DeliverSubject)) _internalSubscriptions.Remove(Config.DeliverSubject!); Config.DeliverSubject = string.IsNullOrWhiteSpace(newDeliver) ? null : newDeliver; if (!string.IsNullOrWhiteSpace(Config.DeliverSubject)) _internalSubscriptions.Add(Config.DeliverSubject!); _updateChannel.Writer.TryWrite(true); return true; } internal static bool ConfigsEqualSansDelivery(ConsumerConfig left, ConsumerConfig right) { var l = CloneConfig(left); var r = CloneConfig(right); l.DeliverSubject = null; r.DeliverSubject = null; return JsonSerializer.Serialize(l) == JsonSerializer.Serialize(r); } internal (ulong Sequence, bool CanRespond, Exception? Error) ResetStartingSeq(ulong sequence, string? reply) { _mu.EnterWriteLock(); try { if (sequence == 0) { sequence = _state.AckFloor.Stream + 1; } else { switch (Config.DeliverPolicy) { case DeliverPolicy.DeliverAll: break; case DeliverPolicy.DeliverByStartSequence when sequence < Config.OptStartSeq: return (0, false, new InvalidOperationException("below start seq")); case DeliverPolicy.DeliverByStartTime when Config.OptStartTime.HasValue: if (sequence == 0) return (0, false, new InvalidOperationException("below start time")); break; default: return (0, false, new InvalidOperationException("not allowed")); } } if (sequence == 0) sequence = 1; _state.Delivered.Stream = sequence; _state.Delivered.Consumer = sequence; _state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0; _state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0; _state.Pending = new Dictionary(); _state.Redelivered = new Dictionary(); _updateChannel.Writer.TryWrite(true); _ = reply; return (sequence, true, null); } finally { _mu.ExitWriteLock(); } } private bool HasDeliveryInterest(bool localInterest) { _mu.EnterReadLock(); try { if (string.IsNullOrWhiteSpace(Config.DeliverSubject)) return false; if (localInterest) return true; if (_streamRef?.Account is not { } account) return false; if (account.Server is NatsServer server && server.HasGatewayInterest(account, Config.DeliverSubject!)) return true; return _internalSubscriptions.Contains(Config.DeliverSubject!); } finally { _mu.ExitReadLock(); } } private bool IsPullMode() => string.IsNullOrWhiteSpace(Config.DeliverSubject); private bool IsPushMode() => !IsPullMode(); private static ConsumerConfig CloneConfig(ConsumerConfig cfg) => new() { Durable = cfg.Durable, Name = cfg.Name, Description = cfg.Description, DeliverPolicy = cfg.DeliverPolicy, OptStartSeq = cfg.OptStartSeq, OptStartTime = cfg.OptStartTime, AckPolicy = cfg.AckPolicy, AckWait = cfg.AckWait, MaxDeliver = cfg.MaxDeliver, BackOff = cfg.BackOff?.ToArray(), FilterSubject = cfg.FilterSubject, FilterSubjects = cfg.FilterSubjects?.ToArray(), ReplayPolicy = cfg.ReplayPolicy, RateLimit = cfg.RateLimit, SampleFrequency = cfg.SampleFrequency, MaxWaiting = cfg.MaxWaiting, MaxAckPending = cfg.MaxAckPending, FlowControl = cfg.FlowControl, HeadersOnly = cfg.HeadersOnly, MaxRequestBatch = cfg.MaxRequestBatch, MaxRequestExpires = cfg.MaxRequestExpires, MaxRequestMaxBytes = cfg.MaxRequestMaxBytes, DeliverSubject = cfg.DeliverSubject, DeliverGroup = cfg.DeliverGroup, Heartbeat = cfg.Heartbeat, InactiveThreshold = cfg.InactiveThreshold, Replicas = cfg.Replicas, MemoryStorage = cfg.MemoryStorage, Direct = cfg.Direct, Metadata = cfg.Metadata is null ? null : new Dictionary(cfg.Metadata), PauseUntil = cfg.PauseUntil, PriorityGroups = cfg.PriorityGroups?.ToArray(), PriorityPolicy = cfg.PriorityPolicy, PinnedTTL = cfg.PinnedTTL, }; internal void ResetLocalStartingSeq(ulong sequence) { _mu.EnterWriteLock(); try { _state.Delivered.Stream = sequence; _state.Delivered.Consumer = sequence; _state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0; _state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0; _state.Pending = []; _state.Redelivered = []; } finally { _mu.ExitWriteLock(); } } internal int LoopAndForwardProposals() { _mu.EnterWriteLock(); try { var count = 0; while (_proposalQueue.TryDequeue(out _)) { count++; } return count; } finally { _mu.ExitWriteLock(); } } internal void Propose(byte[] proposal) { ArgumentNullException.ThrowIfNull(proposal); _mu.EnterWriteLock(); try { _proposalQueue.Enqueue((byte[])proposal.Clone()); } finally { _mu.ExitWriteLock(); } } internal void UpdateDelivered(ulong consumerSequence, ulong streamSequence, ulong deliveryCount, long timestamp) { _mu.EnterWriteLock(); try { _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, consumerSequence); _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSequence); _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, consumerSequence > 0 ? consumerSequence - 1 : 0); _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSequence > 0 ? streamSequence - 1 : 0); _state.Pending ??= []; _state.Pending[streamSequence] = new Pending { Sequence = consumerSequence, Timestamp = timestamp, }; _state.Redelivered ??= []; if (deliveryCount > 1) _state.Redelivered[streamSequence] = deliveryCount; } finally { _mu.ExitWriteLock(); } } internal void AddAckReply(ulong streamSequence, string reply) { if (string.IsNullOrWhiteSpace(reply)) return; _mu.EnterWriteLock(); try { _ackReplies[streamSequence] = reply; } finally { _mu.ExitWriteLock(); } } internal void AddReplicatedQueuedMsg(ulong sequence, JsPubMsg message) { ArgumentNullException.ThrowIfNull(message); _mu.EnterWriteLock(); try { _state.Pending ??= []; _state.Pending[sequence] = new Pending { Sequence = sequence, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), }; } finally { _mu.ExitWriteLock(); } } internal int UpdateAcks() { _mu.EnterWriteLock(); try { var count = _ackReplies.Count; _ackReplies.Clear(); return count; } finally { _mu.ExitWriteLock(); } } internal void AddClusterPendingRequest(string requestId) { if (string.IsNullOrWhiteSpace(requestId)) return; _mu.EnterWriteLock(); try { _clusterPendingRequests[requestId] = DateTime.UtcNow; } finally { _mu.ExitWriteLock(); } } internal void RemoveClusterPendingRequest(string requestId) { if (string.IsNullOrWhiteSpace(requestId)) return; _mu.EnterWriteLock(); try { _clusterPendingRequests.Remove(requestId); } finally { _mu.ExitWriteLock(); } } internal void SetPendingRequestsOk(bool ok) { _mu.EnterWriteLock(); try { _pendingRequestsOk = ok; } finally { _mu.ExitWriteLock(); } } internal bool PendingRequestsOk() { _mu.EnterReadLock(); try { return _pendingRequestsOk; } finally { _mu.ExitReadLock(); } } internal bool CheckAndSetPendingRequestsOk(bool ok) { _mu.EnterWriteLock(); try { var previous = _pendingRequestsOk; _pendingRequestsOk = ok; return previous; } finally { _mu.ExitWriteLock(); } } internal int CheckPendingRequests(TimeSpan maxAge) { _mu.EnterReadLock(); try { var cutoff = DateTime.UtcNow - maxAge; return _clusterPendingRequests.Values.Count(timestamp => timestamp >= cutoff); } finally { _mu.ExitReadLock(); } } internal int ReleaseAnyPendingRequests() { _mu.EnterWriteLock(); try { var released = _clusterPendingRequests.Count; _clusterPendingRequests.Clear(); _pendingRequestsOk = true; return released; } finally { _mu.ExitWriteLock(); } } internal ConsumerState ReadStoredState() { _mu.EnterReadLock(); try { return GetConsumerState(); } finally { _mu.ExitReadLock(); } } internal void ApplyState(ConsumerState state) { ArgumentNullException.ThrowIfNull(state); _mu.EnterWriteLock(); try { _state = new ConsumerState { Delivered = new SequencePair { Consumer = state.Delivered.Consumer, Stream = state.Delivered.Stream, }, AckFloor = new SequencePair { Consumer = state.AckFloor.Consumer, Stream = state.AckFloor.Stream, }, Pending = state.Pending is null ? null : new Dictionary(state.Pending), Redelivered = state.Redelivered is null ? null : new Dictionary(state.Redelivered), }; } finally { _mu.ExitWriteLock(); } } internal void SetStoreState(ConsumerState state) => ApplyState(state); internal ConsumerState WriteStoreState() { _mu.EnterWriteLock(); try { return WriteStoreStateUnlocked(); } finally { _mu.ExitWriteLock(); } } internal ConsumerState WriteStoreStateUnlocked() => GetConsumerState(); internal ConsumerInfo InitialInfo() { _mu.EnterWriteLock(); try { _initialInfo ??= GetInfo(); return _initialInfo; } finally { _mu.ExitWriteLock(); } } internal void ClearInitialInfo() { _mu.EnterWriteLock(); try { _initialInfo = null; } finally { _mu.ExitWriteLock(); } } internal ConsumerInfo Info() => GetInfo(); internal ConsumerInfo InfoWithSnap(ConsumerState? snapshot = null) { if (snapshot is null) return GetInfo(); var info = GetInfo(); info.Delivered = new SequenceInfo { Consumer = snapshot.Delivered.Consumer, Stream = snapshot.Delivered.Stream }; info.AckFloor = new SequenceInfo { Consumer = snapshot.AckFloor.Consumer, Stream = snapshot.AckFloor.Stream }; return info; } internal (ConsumerInfo Info, string ReplySubject) InfoWithSnapAndReply(string replySubject, ConsumerState? snapshot = null) => (InfoWithSnap(snapshot), replySubject); internal void SignalNewMessages() => _updateChannel.Writer.TryWrite(true); internal bool ShouldSample() { if (string.IsNullOrWhiteSpace(Config.SampleFrequency)) return false; var token = Config.SampleFrequency!.Trim().TrimEnd('%'); if (!int.TryParse(token, out var percent)) return false; return percent > 0; } internal bool SampleAck(string ackReply) { if (!ShouldSample()) return false; if (string.IsNullOrWhiteSpace(ackReply)) return false; AddAckReply((ulong)ackReply.Length, ackReply); return true; } internal bool IsFiltered(string subject) { if (string.IsNullOrWhiteSpace(subject)) return false; if (!string.IsNullOrWhiteSpace(Config.FilterSubject)) return Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, Config.FilterSubject!); if (Config.FilterSubjects is not { Length: > 0 }) return false; return Config.FilterSubjects.Any(filter => Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, filter)); } internal bool NeedAck() => Config.AckPolicy != AckPolicy.AckNone; internal static (JsApiConsumerGetNextRequest? Request, Exception? Error) NextReqFromMsg(ReadOnlySpan message) { if (message.Length == 0) return (new JsApiConsumerGetNextRequest { Batch = 1 }, null); try { var text = Encoding.UTF8.GetString(message); if (int.TryParse(text, out var batch)) return (new JsApiConsumerGetNextRequest { Batch = Math.Max(1, batch) }, null); var req = JsonSerializer.Deserialize(text); if (req is null) return (null, new InvalidOperationException("invalid request")); if (req.Batch <= 0) req.Batch = 1; return (req, null); } catch (Exception ex) { return (null, ex); } } }