// Copyright 2021-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/ipqueue.go in the NATS server Go source.

using System.Collections.Concurrent;
using System.Threading.Channels;

namespace ZB.MOM.NatsNet.Server.Internal;

/// <summary>
/// Error singletons for IpQueue limit violations.
/// Mirrors <c>errIPQLenLimitReached</c> and <c>errIPQSizeLimitReached</c>.
/// </summary>
/// <remarks>
/// These instances are returned (not thrown) by <c>Push</c>, so callers can
/// compare them by reference.
/// </remarks>
public static class IpQueueErrors
{
    /// <summary>Returned when a push would exceed the configured element-count limit.</summary>
    public static readonly Exception LenLimitReached =
        new InvalidOperationException("IPQ len limit reached");

    /// <summary>Returned when a push would exceed the configured accounted-size limit.</summary>
    public static readonly Exception SizeLimitReached =
        new InvalidOperationException("IPQ size limit reached");
}

/// <summary>
/// Generic intra-process queue with a one-slot notification channel.
/// Mirrors <c>ipQueue[T]</c> in server/ipqueue.go.
/// </summary>
/// <remarks>
/// All queue state is guarded by a private lock; the in-progress counter is
/// updated with <see cref="Interlocked"/> operations.
/// </remarks>
/// <typeparam name="T">Element type stored in the queue.</typeparam>
public sealed class IpQueue<T>
{
    /// <summary>Default maximum size of the recycled backing-list capacity.</summary>
    public const int DefaultMaxRecycleSize = 4 * 1024;

    /// <summary>
    /// Functional option type used by <see cref="NewIPQueue"/>.
    /// Mirrors Go <c>ipQueueOpt</c>.
    /// </summary>
    public delegate void IpQueueOption(IpQueueOptions options);

    /// <summary>
    /// Option bag populated by <see cref="IpQueueOption"/> delegates.
    /// Mirrors Go <c>ipQueueOpts</c>.
    /// </summary>
    public sealed class IpQueueOptions
    {
        /// <summary>Largest backing-list capacity that is kept for reuse.</summary>
        public int MaxRecycleSize { get; set; } = DefaultMaxRecycleSize;

        /// <summary>Optional per-element size function; enables size accounting when set.</summary>
        public Func<T, ulong>? SizeCalc { get; set; }

        /// <summary>Total accounted-size limit; 0 disables the limit.</summary>
        public ulong MaxSize { get; set; }

        /// <summary>Element-count limit; 0 disables the limit.</summary>
        public int MaxLen { get; set; }
    }

    // Number of elements handed out by Pop and not yet returned via Recycle.
    private long _inprogress;

    private readonly object _lock = new();

    // Backing list with a logical start position (mirrors slice + pos).
    private List<T>? _elts;
    private int _pos;
    private ulong _sz;

    private readonly string _name;
    private readonly ConcurrentDictionary<string, object>? _registry;

    // One-slot notification channel (mirrors chan struct{} with capacity 1).
    private readonly Channel<bool> _ch;

    // Single-slot list pool to amortise allocations.
    private List<T>? _pooled;

    // Options

    /// <summary>Maximum list capacity to allow recycling.</summary>
    public int MaxRecycleSize { get; }

    private readonly Func<T, ulong>? _calc;
    private readonly ulong _msz; // size limit
    private readonly int _mlen;  // length limit

    /// <summary>Notification channel reader — wait on this to learn items were added.</summary>
    public ChannelReader<bool> Ch => _ch.Reader;

    /// <summary>
    /// Option helper that configures maximum recycled backing-list size.
    /// Mirrors Go <c>ipqMaxRecycleSize</c>.
    /// </summary>
    public static IpQueueOption IpqMaxRecycleSize(int max) =>
        options => options.MaxRecycleSize = max;

    /// <summary>
    /// Option helper that enables size accounting for queue elements.
    /// Mirrors Go <c>ipqSizeCalculation</c>.
    /// </summary>
    public static IpQueueOption IpqSizeCalculation(Func<T, ulong> calc) =>
        options => options.SizeCalc = calc;

    /// <summary>
    /// Option helper that limits queue pushes by total accounted size.
    /// Mirrors Go <c>ipqLimitBySize</c>.
    /// </summary>
    public static IpQueueOption IpqLimitBySize(ulong max) =>
        options => options.MaxSize = max;

    /// <summary>
    /// Option helper that limits queue pushes by element count.
    /// Mirrors Go <c>ipqLimitByLen</c>.
    /// </summary>
    public static IpQueueOption IpqLimitByLen(int max) =>
        options => options.MaxLen = max;

    /// <summary>
    /// Factory wrapper for Go parity: applies each option to a fresh
    /// <see cref="IpQueueOptions"/> and forwards the result to the constructor.
    /// Mirrors <c>newIPQueue</c>.
    /// </summary>
    /// <param name="name">Name used as the registry key.</param>
    /// <param name="registry">Optional registry the queue adds itself to.</param>
    /// <param name="options">Option delegates, applied in order.</param>
    /// <returns>The newly created queue.</returns>
    public static IpQueue<T> NewIPQueue(
        string name,
        ConcurrentDictionary<string, object>? registry = null,
        params IpQueueOption[] options)
    {
        var opts = new IpQueueOptions();
        foreach (var option in options)
        {
            option(opts);
        }

        return new IpQueue<T>(
            name,
            registry,
            opts.MaxRecycleSize,
            opts.SizeCalc,
            opts.MaxSize,
            opts.MaxLen);
    }

    /// <summary>
    /// Creates a new queue, optionally registering it in <paramref name="registry"/>.
    /// Mirrors <c>newIPQueue</c>.
    /// </summary>
    /// <param name="name">Name used as the registry key.</param>
    /// <param name="registry">
    /// Optional registry. Registration uses <c>TryAdd</c>, so an existing entry
    /// under the same name is left untouched.
    /// </param>
    /// <param name="maxRecycleSize">Largest backing-list capacity kept for reuse.</param>
    /// <param name="sizeCalc">Optional per-element size function.</param>
    /// <param name="maxSize">Total accounted-size limit; 0 disables it.</param>
    /// <param name="maxLen">Element-count limit; 0 disables it.</param>
    public IpQueue(
        string name,
        ConcurrentDictionary<string, object>? registry = null,
        int maxRecycleSize = DefaultMaxRecycleSize,
        Func<T, ulong>? sizeCalc = null,
        ulong maxSize = 0,
        int maxLen = 0)
    {
        MaxRecycleSize = maxRecycleSize;
        _calc = sizeCalc;
        _msz = maxSize;
        _mlen = maxLen;
        _name = name;
        _registry = registry;

        // Capacity 1 + DropWrite: a second signal while one is pending is discarded.
        _ch = Channel.CreateBounded<bool>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.DropWrite,
            SingleReader = false,
            SingleWriter = false,
        });

        registry?.TryAdd(name, this);
    }

    /// <summary>
    /// Adds an element to the queue.
    /// Returns the new logical length and an error if a limit was hit.
    /// Mirrors <c>ipQueue.push</c>.
    /// </summary>
    /// <param name="e">Element to append.</param>
    /// <returns>
    /// On success, the length after the push and a <c>null</c> error. When a limit
    /// is hit, the unchanged length and the matching <see cref="IpQueueErrors"/> singleton.
    /// </returns>
    public (int len, Exception? error) Push(T e)
    {
        bool shouldSignal;
        int resultLen;

        lock (_lock)
        {
            var l = (_elts?.Count ?? 0) - _pos;

            if (_mlen > 0 && l == _mlen)
            {
                return (l, IpQueueErrors.LenLimitReached);
            }

            if (_calc != null)
            {
                var sz = _calc(e);
                if (_msz > 0 && _sz + sz > _msz)
                {
                    return (l, IpQueueErrors.SizeLimitReached);
                }

                _sz += sz;
            }

            if (_elts == null)
            {
                // Reuse the pooled list when one is available.
                _elts = _pooled ?? new List<T>(32);
                _pooled = null;
                _pos = 0;
            }

            _elts.Add(e);
            resultLen = _elts.Count - _pos;

            // Only the empty -> non-empty transition needs a notification.
            shouldSignal = l == 0;
        }

        // Signal outside the lock; TryWrite never blocks.
        if (shouldSignal)
        {
            _ch.Writer.TryWrite(true);
        }

        return (resultLen, null);
    }

    /// <summary>
    /// Returns all pending elements and empties the queue.
    /// Increments the in-progress counter by the returned count.
    /// Mirrors <c>ipQueue.pop</c>.
    /// </summary>
    /// <returns>A new array with the pending elements in order, or <c>null</c> when the queue is empty.</returns>
    public T[]? Pop()
    {
        lock (_lock)
        {
            if (_elts == null)
            {
                return null;
            }

            var count = _elts.Count - _pos;
            if (count == 0)
            {
                return null;
            }

            // Copy the live range [_pos, Count) straight into the result array.
            var result = new T[count];
            _elts.CopyTo(_pos, result, 0, count);
            Interlocked.Add(ref _inprogress, count);

            // The caller owns a copy, so the backing list can be reused by the
            // next Push unless it has grown beyond the recycle threshold.
            if (_elts.Capacity <= MaxRecycleSize)
            {
                _elts.Clear();
                _pooled = _elts;
            }

            _elts = null;
            _pos = 0;
            _sz = 0;
            return result;
        }
    }

    /// <summary>
    /// Returns the first pending element without bulk-removing the rest.
    /// Does NOT affect the in-progress counter.
    /// Re-signals the notification channel if more elements remain.
    /// Mirrors <c>ipQueue.popOne</c>.
    /// </summary>
    /// <returns>
    /// The element and <c>true</c>, or the default value of <typeparamref name="T"/>
    /// and <c>false</c> when the queue is empty.
    /// </returns>
    public (T value, bool ok) PopOne()
    {
        lock (_lock)
        {
            if (_elts == null || _elts.Count - _pos == 0)
            {
                return (default!, false);
            }

            var e = _elts[_pos];
            var remaining = _elts.Count - _pos - 1;

            if (_calc != null)
            {
                _sz -= _calc(e);
            }

            if (remaining > 0)
            {
                // Drop the list's reference to the consumed element so it does
                // not stay reachable until the whole queue is emptied.
                _elts[_pos] = default!;
                _pos++;
                _ch.Writer.TryWrite(true); // re-signal: more items pending
            }
            else
            {
                // All consumed — try to pool the backing list.
                if (_elts.Capacity <= MaxRecycleSize)
                {
                    _elts.Clear();
                    _pooled = _elts;
                }

                _elts = null;
                _pos = 0;
                _sz = 0;
            }

            return (e, true);
        }
    }

    /// <summary>
    /// Marks the elements of an array obtained via <see cref="Pop"/> as processed by
    /// decrementing the in-progress counter by the array length. The array itself
    /// is not retained or reused. A <c>null</c> or empty array is ignored.
    /// Mirrors <c>ipQueue.recycle</c>.
    /// </summary>
    /// <param name="elts">Array previously returned by <see cref="Pop"/>.</param>
    public void Recycle(T[]? elts)
    {
        if (elts == null || elts.Length == 0)
        {
            return;
        }

        Interlocked.Add(ref _inprogress, -elts.Length);
    }

    /// <summary>Returns the current logical queue length. Mirrors <c>ipQueue.len</c>.</summary>
    public int Len()
    {
        lock (_lock)
        {
            return (_elts?.Count ?? 0) - _pos;
        }
    }

    /// <summary>
    /// Returns the total calculated size (only meaningful when a size-calc function was provided).
    /// Mirrors <c>ipQueue.size</c>.
    /// </summary>
    public ulong Size()
    {
        lock (_lock)
        {
            return _sz;
        }
    }

    /// <summary>
    /// Empties the queue and consumes any pending notification signal.
    /// Returns the number of items drained.
    /// Mirrors <c>ipQueue.drain</c>.
    /// </summary>
    public int Drain()
    {
        lock (_lock)
        {
            var count = (_elts?.Count ?? 0) - _pos;
            _elts = null;
            _pos = 0;
            _sz = 0;
            _ch.Reader.TryRead(out _); // consume signal
            return count;
        }
    }

    /// <summary>
    /// Returns the number of elements currently being processed (popped but not yet recycled).
    /// Mirrors <c>ipQueue.inProgress</c>.
    /// </summary>
    public long InProgress() => Interlocked.Read(ref _inprogress);

    /// <summary>
    /// Removes the registry entry stored under this queue's name, if a registry was supplied.
    /// Push/pop operations remain valid.
    /// Mirrors <c>ipQueue.unregister</c>.
    /// </summary>
    public void Unregister() => _registry?.TryRemove(_name, out _);
}