feat: port session 02 — Utilities & Queues (util, ipqueue, scheduler, subject_transform)

- ServerUtilities: version helpers, parseSize/parseInt64, parseHostPort, URL redaction,
  comma formatting, refCountedUrlSet, TCP helpers, parallelTaskQueue
- IpQueue<T>: generic intra-process queue with 1-slot Channel<bool> notification signal,
  optional size/len limits, ConcurrentDictionary registry, single-slot List<T> pool
- MsgScheduling: per-subject scheduled message tracking via HashWheel TTLs,
  binary encode/decode with zigzag varint, Timer-based firing
- SubjectTransform: full NATS subject mapping engine (11 transform types: Wildcard,
  Partition, SplitFromLeft, SplitFromRight, SliceFromLeft, SliceFromRight, Split,
  Left, Right, Random, NoTransform), FNV-1a partition hash
- 20 tests (7 util, 9 ipqueue, 4 subject_transform); 45 benchmarks/split tests marked n/a
- All 113 tests pass (112 unit + 1 integration)
- DB: features 328/3673 complete, tests 139/3257 complete (8.7% overall)
This commit is contained in:
Joseph Doherty
2026-02-26 09:39:36 -05:00
parent 8050ee1897
commit 11c0b92fbd
10 changed files with 2786 additions and 8 deletions

View File

@@ -0,0 +1,265 @@
// Copyright 2021-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/ipqueue.go in the NATS server Go source.
using System.Collections.Concurrent;
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server.Internal;
/// <summary>
/// Error singletons for IpQueue limit violations.
/// Mirrors <c>errIPQLenLimitReached</c> and <c>errIPQSizeLimitReached</c>.
/// </summary>
public static class IpQueueErrors
{
public static readonly Exception LenLimitReached =
new InvalidOperationException("IPQ len limit reached");
public static readonly Exception SizeLimitReached =
new InvalidOperationException("IPQ size limit reached");
}
/// <summary>
/// Generic intra-process queue with a one-slot notification channel.
/// Mirrors <c>ipQueue[T]</c> in server/ipqueue.go.
/// </summary>
public sealed class IpQueue<T>
{
/// <summary>Default maximum size of the recycled backing-list capacity.</summary>
public const int DefaultMaxRecycleSize = 4 * 1024;
private long _inprogress;
private readonly object _lock = new();
// Backing list with a logical start position (mirrors slice + pos).
private List<T>? _elts;
private int _pos;
private ulong _sz;
private readonly string _name;
private readonly ConcurrentDictionary<string, object>? _registry;
// One-slot notification channel (mirrors chan struct{} with capacity 1).
private readonly Channel<bool> _ch;
// Single-slot list pool to amortise allocations.
private List<T>? _pooled;
// Options
/// <summary>Maximum list capacity to allow recycling.</summary>
public int MaxRecycleSize { get; }
private readonly Func<T, ulong>? _calc;
private readonly ulong _msz; // size limit
private readonly int _mlen; // length limit
/// <summary>Notification channel reader — wait on this to learn items were added.</summary>
public ChannelReader<bool> Ch => _ch.Reader;
/// <summary>
/// Creates a new queue, optionally registering it in <paramref name="registry"/>.
/// Mirrors <c>newIPQueue</c>.
/// </summary>
public IpQueue(
string name,
ConcurrentDictionary<string, object>? registry = null,
int maxRecycleSize = DefaultMaxRecycleSize,
Func<T, ulong>? sizeCalc = null,
ulong maxSize = 0,
int maxLen = 0)
{
MaxRecycleSize = maxRecycleSize;
_calc = sizeCalc;
_msz = maxSize;
_mlen = maxLen;
_name = name;
_registry = registry;
_ch = Channel.CreateBounded<bool>(new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.DropWrite,
SingleReader = false,
SingleWriter = false,
});
registry?.TryAdd(name, this);
}
/// <summary>
/// Adds an element to the queue.
/// Returns the new logical length and an error if a limit was hit.
/// Mirrors <c>ipQueue.push</c>.
/// </summary>
public (int len, Exception? error) Push(T e)
{
bool shouldSignal;
int resultLen;
lock (_lock)
{
var l = (_elts?.Count ?? 0) - _pos;
if (_mlen > 0 && l == _mlen)
return (l, IpQueueErrors.LenLimitReached);
if (_calc != null)
{
var sz = _calc(e);
if (_msz > 0 && _sz + sz > _msz)
return (l, IpQueueErrors.SizeLimitReached);
_sz += sz;
}
if (_elts == null)
{
_elts = _pooled ?? new List<T>(32);
_pooled = null;
_pos = 0;
}
_elts.Add(e);
resultLen = _elts.Count - _pos;
shouldSignal = l == 0;
}
if (shouldSignal)
_ch.Writer.TryWrite(true);
return (resultLen, null);
}
/// <summary>
/// Returns all pending elements and empties the queue.
/// Increments the in-progress counter by the returned count.
/// Mirrors <c>ipQueue.pop</c>.
/// </summary>
public T[]? Pop()
{
lock (_lock)
{
if (_elts == null) return null;
var count = _elts.Count - _pos;
if (count == 0) return null;
var result = _pos == 0
? _elts.ToArray()
: _elts.GetRange(_pos, count).ToArray();
Interlocked.Add(ref _inprogress, result.Length);
_elts = null;
_pos = 0;
_sz = 0;
return result;
}
}
/// <summary>
/// Returns the first pending element without bulk-removing the rest.
/// Does NOT affect the in-progress counter.
/// Re-signals the notification channel if more elements remain.
/// Mirrors <c>ipQueue.popOne</c>.
/// </summary>
public (T value, bool ok) PopOne()
{
lock (_lock)
{
if (_elts == null || _elts.Count - _pos == 0)
return (default!, false);
var e = _elts[_pos];
var remaining = _elts.Count - _pos - 1;
if (_calc != null)
_sz -= _calc(e);
if (remaining > 0)
{
_pos++;
_ch.Writer.TryWrite(true); // re-signal: more items pending
}
else
{
// All consumed — try to pool the backing list.
if (_elts.Capacity <= MaxRecycleSize)
{
_elts.Clear();
_pooled = _elts;
}
_elts = null;
_pos = 0;
_sz = 0;
}
return (e, true);
}
}
/// <summary>
/// Returns the array obtained via <see cref="Pop"/> to the pool and
/// decrements the in-progress counter.
/// Mirrors <c>ipQueue.recycle</c>.
/// </summary>
public void Recycle(T[]? elts)
{
if (elts == null || elts.Length == 0) return;
Interlocked.Add(ref _inprogress, -elts.Length);
}
/// <summary>Returns the current logical queue length. Mirrors <c>ipQueue.len</c>.</summary>
public int Len()
{
lock (_lock) return (_elts?.Count ?? 0) - _pos;
}
/// <summary>
/// Returns the total calculated size (only meaningful when a size-calc function was provided).
/// Mirrors <c>ipQueue.size</c>.
/// </summary>
public ulong Size()
{
lock (_lock) return _sz;
}
/// <summary>
/// Empties the queue and consumes any pending notification signal.
/// Returns the number of items drained.
/// Mirrors <c>ipQueue.drain</c>.
/// </summary>
public int Drain()
{
lock (_lock)
{
var count = (_elts?.Count ?? 0) - _pos;
_elts = null;
_pos = 0;
_sz = 0;
_ch.Reader.TryRead(out _); // consume signal
return count;
}
}
/// <summary>
/// Returns the number of elements currently being processed (popped but not yet recycled).
/// Mirrors <c>ipQueue.inProgress</c>.
/// </summary>
public long InProgress() => Interlocked.Read(ref _inprogress);
/// <summary>
/// Removes this queue from the server registry.
/// Push/pop operations remain valid.
/// Mirrors <c>ipQueue.unregister</c>.
/// </summary>
public void Unregister() => _registry?.TryRemove(_name, out _);
}