// Copyright 2021-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/ipqueue.go in the NATS server Go source.
using System.Collections.Concurrent;
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server.Internal;
///
/// Error singletons for IpQueue limit violations.
/// Mirrors errIPQLenLimitReached and errIPQSizeLimitReached.
///
public static class IpQueueErrors
{
public static readonly Exception LenLimitReached =
new InvalidOperationException("IPQ len limit reached");
public static readonly Exception SizeLimitReached =
new InvalidOperationException("IPQ size limit reached");
}
///
/// Generic intra-process queue with a one-slot notification channel.
/// Mirrors ipQueue[T] in server/ipqueue.go.
///
public sealed class IpQueue
{
/// Default maximum size of the recycled backing-list capacity.
public const int DefaultMaxRecycleSize = 4 * 1024;
///
/// Functional option type used by .
/// Mirrors Go ipQueueOpt.
///
public delegate void IpQueueOption(IpQueueOptions options);
///
/// Option bag used by .
/// Mirrors Go ipQueueOpts.
///
public sealed class IpQueueOptions
{
public int MaxRecycleSize { get; set; } = DefaultMaxRecycleSize;
public Func? SizeCalc { get; set; }
public ulong MaxSize { get; set; }
public int MaxLen { get; set; }
}
private long _inprogress;
private readonly object _lock = new();
// Backing list with a logical start position (mirrors slice + pos).
private List? _elts;
private int _pos;
private ulong _sz;
private readonly string _name;
private readonly ConcurrentDictionary? _registry;
// One-slot notification channel (mirrors chan struct{} with capacity 1).
private readonly Channel _ch;
// Single-slot list pool to amortise allocations.
private List? _pooled;
// Options
/// Maximum list capacity to allow recycling.
public int MaxRecycleSize { get; }
private readonly Func? _calc;
private readonly ulong _msz; // size limit
private readonly int _mlen; // length limit
/// Notification channel reader — wait on this to learn items were added.
public ChannelReader Ch => _ch.Reader;
///
/// Option helper that configures maximum recycled backing-list size.
/// Mirrors Go ipqMaxRecycleSize.
///
public static IpQueueOption IpqMaxRecycleSize(int max) =>
options => options.MaxRecycleSize = max;
///
/// Option helper that enables size accounting for queue elements.
/// Mirrors Go ipqSizeCalculation.
///
public static IpQueueOption IpqSizeCalculation(Func calc) =>
options => options.SizeCalc = calc;
///
/// Option helper that limits queue pushes by total accounted size.
/// Mirrors Go ipqLimitBySize.
///
public static IpQueueOption IpqLimitBySize(ulong max) =>
options => options.MaxSize = max;
///
/// Option helper that limits queue pushes by element count.
/// Mirrors Go ipqLimitByLen.
///
public static IpQueueOption IpqLimitByLen(int max) =>
options => options.MaxLen = max;
///
/// Factory wrapper for Go parity.
/// Mirrors newIPQueue.
///
public static IpQueue NewIPQueue(
string name,
ConcurrentDictionary? registry = null,
params IpQueueOption[] options)
{
var opts = new IpQueueOptions();
foreach (var option in options)
option(opts);
return new IpQueue(
name,
registry,
opts.MaxRecycleSize,
opts.SizeCalc,
opts.MaxSize,
opts.MaxLen);
}
///
/// Creates a new queue, optionally registering it in .
/// Mirrors newIPQueue.
///
public IpQueue(
string name,
ConcurrentDictionary? registry = null,
int maxRecycleSize = DefaultMaxRecycleSize,
Func? sizeCalc = null,
ulong maxSize = 0,
int maxLen = 0)
{
MaxRecycleSize = maxRecycleSize;
_calc = sizeCalc;
_msz = maxSize;
_mlen = maxLen;
_name = name;
_registry = registry;
_ch = Channel.CreateBounded(new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.DropWrite,
SingleReader = false,
SingleWriter = false,
});
registry?.TryAdd(name, this);
}
///
/// Adds an element to the queue.
/// Returns the new logical length and an error if a limit was hit.
/// Mirrors ipQueue.push.
///
public (int len, Exception? error) Push(T e)
{
bool shouldSignal;
int resultLen;
lock (_lock)
{
var l = (_elts?.Count ?? 0) - _pos;
if (_mlen > 0 && l == _mlen)
return (l, IpQueueErrors.LenLimitReached);
if (_calc != null)
{
var sz = _calc(e);
if (_msz > 0 && _sz + sz > _msz)
return (l, IpQueueErrors.SizeLimitReached);
_sz += sz;
}
if (_elts == null)
{
_elts = _pooled ?? new List(32);
_pooled = null;
_pos = 0;
}
_elts.Add(e);
resultLen = _elts.Count - _pos;
shouldSignal = l == 0;
}
if (shouldSignal)
_ch.Writer.TryWrite(true);
return (resultLen, null);
}
///
/// Returns all pending elements and empties the queue.
/// Increments the in-progress counter by the returned count.
/// Mirrors ipQueue.pop.
///
public T[]? Pop()
{
lock (_lock)
{
if (_elts == null) return null;
var count = _elts.Count - _pos;
if (count == 0) return null;
var result = _pos == 0
? _elts.ToArray()
: _elts.GetRange(_pos, count).ToArray();
Interlocked.Add(ref _inprogress, result.Length);
_elts = null;
_pos = 0;
_sz = 0;
return result;
}
}
///
/// Returns the first pending element without bulk-removing the rest.
/// Does NOT affect the in-progress counter.
/// Re-signals the notification channel if more elements remain.
/// Mirrors ipQueue.popOne.
///
public (T value, bool ok) PopOne()
{
lock (_lock)
{
if (_elts == null || _elts.Count - _pos == 0)
return (default!, false);
var e = _elts[_pos];
var remaining = _elts.Count - _pos - 1;
if (_calc != null)
_sz -= _calc(e);
if (remaining > 0)
{
_pos++;
_ch.Writer.TryWrite(true); // re-signal: more items pending
}
else
{
// All consumed — try to pool the backing list.
if (_elts.Capacity <= MaxRecycleSize)
{
_elts.Clear();
_pooled = _elts;
}
_elts = null;
_pos = 0;
_sz = 0;
}
return (e, true);
}
}
///
/// Returns the array obtained via to the pool and
/// decrements the in-progress counter.
/// Mirrors ipQueue.recycle.
///
public void Recycle(T[]? elts)
{
if (elts == null || elts.Length == 0) return;
Interlocked.Add(ref _inprogress, -elts.Length);
}
/// Returns the current logical queue length. Mirrors ipQueue.len.
public int Len()
{
lock (_lock) return (_elts?.Count ?? 0) - _pos;
}
///
/// Returns the total calculated size (only meaningful when a size-calc function was provided).
/// Mirrors ipQueue.size.
///
public ulong Size()
{
lock (_lock) return _sz;
}
///
/// Empties the queue and consumes any pending notification signal.
/// Returns the number of items drained.
/// Mirrors ipQueue.drain.
///
public int Drain()
{
lock (_lock)
{
var count = (_elts?.Count ?? 0) - _pos;
_elts = null;
_pos = 0;
_sz = 0;
_ch.Reader.TryRead(out _); // consume signal
return count;
}
}
///
/// Returns the number of elements currently being processed (popped but not yet recycled).
/// Mirrors ipQueue.inProgress.
///
public long InProgress() => Interlocked.Read(ref _inprogress);
///
/// Removes this queue from the server registry.
/// Push/pop operations remain valid.
/// Mirrors ipQueue.unregister.
///
public void Unregister() => _registry?.TryRemove(_name, out _);
}