From 11c0b92fbdc7cbd2d979344041014b6e849f3201 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 26 Feb 2026 09:39:36 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20port=20session=2002=20=E2=80=94=20Utili?= =?UTF-8?q?ties=20&=20Queues=20(util,=20ipqueue,=20scheduler,=20subject=5F?= =?UTF-8?q?transform)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ServerUtilities: version helpers, parseSize/parseInt64, parseHostPort, URL redaction, comma formatting, refCountedUrlSet, TCP helpers, parallelTaskQueue - IpQueue: generic intra-process queue with 1-slot Channel notification signal, optional size/len limits, ConcurrentDictionary registry, single-slot List pool - MsgScheduling: per-subject scheduled message tracking via HashWheel TTLs, binary encode/decode with zigzag varint, Timer-based firing - SubjectTransform: full NATS subject mapping engine (11 transform types: Wildcard, Partition, SplitFromLeft, SplitFromRight, SliceFromLeft, SliceFromRight, Split, Left, Right, Random, NoTransform), FNV-1a partition hash - 20 tests (7 util, 9 ipqueue, 4 subject_transform); 45 benchmarks/split tests marked n/a - All 113 tests pass (112 unit + 1 integration) - DB: features 328/3673 complete, tests 139/3257 complete (8.7% overall) --- .../ZB.MOM.NatsNet.Server/Internal/IpQueue.cs | 265 ++++++ .../Internal/MsgScheduling.cs | 431 +++++++++ .../Internal/ServerUtilities.cs | 437 +++++++++ .../Internal/SubjectTransform.cs | 842 ++++++++++++++++++ .../Internal/IpQueueTests.cs | 316 +++++++ .../Internal/ServerUtilitiesTests.cs | 194 ++++ .../Internal/SubjectTransformTests.cs | 256 ++++++ porting.db | Bin 2461696 -> 2465792 bytes reports/current.md | 16 +- reports/report_8050ee1.md | 37 + 10 files changed, 2786 insertions(+), 8 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Internal/IpQueue.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Internal/ServerUtilities.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/IpQueueTests.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/ServerUtilitiesTests.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SubjectTransformTests.cs create mode 100644 reports/report_8050ee1.md diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/IpQueue.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/IpQueue.cs new file mode 100644 index 0000000..ac1d4bc --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/IpQueue.cs @@ -0,0 +1,265 @@ +// Copyright 2021-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/ipqueue.go in the NATS server Go source. + +using System.Collections.Concurrent; +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server.Internal; + +/// +/// Error singletons for IpQueue limit violations. +/// Mirrors errIPQLenLimitReached and errIPQSizeLimitReached. +/// +public static class IpQueueErrors +{ + public static readonly Exception LenLimitReached = + new InvalidOperationException("IPQ len limit reached"); + + public static readonly Exception SizeLimitReached = + new InvalidOperationException("IPQ size limit reached"); +} + +/// +/// Generic intra-process queue with a one-slot notification channel. +/// Mirrors ipQueue[T] in server/ipqueue.go. +/// +public sealed class IpQueue +{ + /// Default maximum size of the recycled backing-list capacity. + public const int DefaultMaxRecycleSize = 4 * 1024; + + private long _inprogress; + private readonly object _lock = new(); + + // Backing list with a logical start position (mirrors slice + pos). + private List? _elts; + private int _pos; + private ulong _sz; + + private readonly string _name; + private readonly ConcurrentDictionary? _registry; + + // One-slot notification channel (mirrors chan struct{} with capacity 1). + private readonly Channel _ch; + + // Single-slot list pool to amortise allocations. + private List? _pooled; + + // Options + /// Maximum list capacity to allow recycling. + public int MaxRecycleSize { get; } + + private readonly Func? _calc; + private readonly ulong _msz; // size limit + private readonly int _mlen; // length limit + + /// Notification channel reader — wait on this to learn items were added. + public ChannelReader Ch => _ch.Reader; + + /// + /// Creates a new queue, optionally registering it in . + /// Mirrors newIPQueue. + /// + public IpQueue( + string name, + ConcurrentDictionary? registry = null, + int maxRecycleSize = DefaultMaxRecycleSize, + Func? sizeCalc = null, + ulong maxSize = 0, + int maxLen = 0) + { + MaxRecycleSize = maxRecycleSize; + _calc = sizeCalc; + _msz = maxSize; + _mlen = maxLen; + _name = name; + _registry = registry; + + _ch = Channel.CreateBounded(new BoundedChannelOptions(1) + { + FullMode = BoundedChannelFullMode.DropWrite, + SingleReader = false, + SingleWriter = false, + }); + + registry?.TryAdd(name, this); + } + + /// + /// Adds an element to the queue. + /// Returns the new logical length and an error if a limit was hit. + /// Mirrors ipQueue.push. + /// + public (int len, Exception? error) Push(T e) + { + bool shouldSignal; + int resultLen; + + lock (_lock) + { + var l = (_elts?.Count ?? 0) - _pos; + + if (_mlen > 0 && l == _mlen) + return (l, IpQueueErrors.LenLimitReached); + + if (_calc != null) + { + var sz = _calc(e); + if (_msz > 0 && _sz + sz > _msz) + return (l, IpQueueErrors.SizeLimitReached); + _sz += sz; + } + + if (_elts == null) + { + _elts = _pooled ?? new List(32); + _pooled = null; + _pos = 0; + } + + _elts.Add(e); + resultLen = _elts.Count - _pos; + shouldSignal = l == 0; + } + + if (shouldSignal) + _ch.Writer.TryWrite(true); + + return (resultLen, null); + } + + /// + /// Returns all pending elements and empties the queue. + /// Increments the in-progress counter by the returned count. + /// Mirrors ipQueue.pop. + /// + public T[]? Pop() + { + lock (_lock) + { + if (_elts == null) return null; + var count = _elts.Count - _pos; + if (count == 0) return null; + + var result = _pos == 0 + ? _elts.ToArray() + : _elts.GetRange(_pos, count).ToArray(); + + Interlocked.Add(ref _inprogress, result.Length); + _elts = null; + _pos = 0; + _sz = 0; + return result; + } + } + + /// + /// Returns the first pending element without bulk-removing the rest. + /// Does NOT affect the in-progress counter. + /// Re-signals the notification channel if more elements remain. + /// Mirrors ipQueue.popOne. + /// + public (T value, bool ok) PopOne() + { + lock (_lock) + { + if (_elts == null || _elts.Count - _pos == 0) + return (default!, false); + + var e = _elts[_pos]; + var remaining = _elts.Count - _pos - 1; + + if (_calc != null) + _sz -= _calc(e); + + if (remaining > 0) + { + _pos++; + _ch.Writer.TryWrite(true); // re-signal: more items pending + } + else + { + // All consumed — try to pool the backing list. + if (_elts.Capacity <= MaxRecycleSize) + { + _elts.Clear(); + _pooled = _elts; + } + _elts = null; + _pos = 0; + _sz = 0; + } + + return (e, true); + } + } + + /// + /// Returns the array obtained via to the pool and + /// decrements the in-progress counter. + /// Mirrors ipQueue.recycle. + /// + public void Recycle(T[]? elts) + { + if (elts == null || elts.Length == 0) return; + Interlocked.Add(ref _inprogress, -elts.Length); + } + + /// Returns the current logical queue length. Mirrors ipQueue.len. + public int Len() + { + lock (_lock) return (_elts?.Count ?? 0) - _pos; + } + + /// + /// Returns the total calculated size (only meaningful when a size-calc function was provided). + /// Mirrors ipQueue.size. + /// + public ulong Size() + { + lock (_lock) return _sz; + } + + /// + /// Empties the queue and consumes any pending notification signal. + /// Returns the number of items drained. + /// Mirrors ipQueue.drain. + /// + public int Drain() + { + lock (_lock) + { + var count = (_elts?.Count ?? 0) - _pos; + _elts = null; + _pos = 0; + _sz = 0; + _ch.Reader.TryRead(out _); // consume signal + return count; + } + } + + /// + /// Returns the number of elements currently being processed (popped but not yet recycled). + /// Mirrors ipQueue.inProgress. + /// + public long InProgress() => Interlocked.Read(ref _inprogress); + + /// + /// Removes this queue from the server registry. + /// Push/pop operations remain valid. + /// Mirrors ipQueue.unregister. + /// + public void Unregister() => _registry?.TryRemove(_name, out _); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs new file mode 100644 index 0000000..e621cc7 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs @@ -0,0 +1,431 @@ +// Copyright 2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/scheduler.go in the NATS server Go source. + +using System.Buffers.Binary; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Internal; + +/// +/// Error for when we try to decode a binary-encoded message schedule with an unknown version number. +/// Mirrors ErrMsgScheduleInvalidVersion. +/// +public static class MsgSchedulingErrors +{ + public static readonly Exception ErrMsgScheduleInvalidVersion = + new InvalidOperationException("msg scheduling: encoded version not known"); +} + +/// +/// A single scheduled message entry. +/// Mirrors the unnamed struct in the schedules map in scheduler.go. +/// +internal sealed class MsgSchedule +{ + public ulong Seq; + public long Ts; +} + +/// +/// Tracks per-subject scheduled messages using a hash wheel for TTL management. +/// Mirrors MsgScheduling in server/scheduler.go. +/// Note: getScheduledMessages is deferred to session 08/19 (requires JetStream types). +/// +public sealed class MsgScheduling +{ + private const int HeaderLen = 17; // 1 magic + 2 × uint64 + + private readonly Action _run; + private readonly HashWheel _ttls; + private Timer? _timer; + // _running is set to true by the run callback when getScheduledMessages is active (session 08/19). +#pragma warning disable CS0649 + private bool _running; +#pragma warning restore CS0649 + private long _deadline; + private readonly Dictionary _schedules; + private readonly Dictionary _seqToSubj; + private readonly HashSet _inflight; + + /// + /// Creates a new with the given callback. + /// Mirrors newMsgScheduling. + /// + public MsgScheduling(Action run) + { + _run = run; + _ttls = HashWheel.NewHashWheel(); + _schedules = new Dictionary(); + _seqToSubj = new Dictionary(); + _inflight = new HashSet(); + } + + /// + /// Adds a schedule entry and resets the timer. + /// Mirrors MsgScheduling.add. + /// + public void Add(ulong seq, string subj, long ts) + { + Init(seq, subj, ts); + ResetTimer(); + } + + /// + /// Inserts or updates the schedule entry for the given subject. + /// Mirrors MsgScheduling.init. + /// + public void Init(ulong seq, string subj, long ts) + { + if (_schedules.TryGetValue(subj, out var sched)) + { + _seqToSubj.Remove(sched.Seq); + _ttls.Remove(sched.Seq, sched.Ts); + _ttls.Add(seq, ts); + sched.Ts = ts; + sched.Seq = seq; + } + else + { + _ttls.Add(seq, ts); + _schedules[subj] = new MsgSchedule { Seq = seq, Ts = ts }; + } + _seqToSubj[seq] = subj; + _inflight.Remove(subj); + } + + /// + /// Updates the timestamp for an existing schedule without changing the sequence. + /// Mirrors MsgScheduling.update. + /// + public void Update(string subj, long ts) + { + if (!_schedules.TryGetValue(subj, out var sched)) return; + _ttls.Remove(sched.Seq, sched.Ts); + _ttls.Add(sched.Seq, ts); + sched.Ts = ts; + _inflight.Remove(subj); + ResetTimer(); + } + + /// + /// Marks a subject as in-flight (being processed). + /// Mirrors MsgScheduling.markInflight. + /// + public void MarkInflight(string subj) + { + if (_schedules.ContainsKey(subj)) + _inflight.Add(subj); + } + + /// + /// Returns true if the subject is currently in-flight. + /// Mirrors MsgScheduling.isInflight. + /// + public bool IsInflight(string subj) => _inflight.Contains(subj); + + /// + /// Removes the schedule entry for the given sequence number. + /// Mirrors MsgScheduling.remove. + /// + public void Remove(ulong seq) + { + if (!_seqToSubj.TryGetValue(seq, out var subj)) return; + _seqToSubj.Remove(seq); + _schedules.Remove(subj); + } + + /// + /// Removes the schedule entry for the given subject. + /// Mirrors MsgScheduling.removeSubject. + /// + public void RemoveSubject(string subj) + { + if (!_schedules.TryGetValue(subj, out var sched)) return; + _ttls.Remove(sched.Seq, sched.Ts); + _schedules.Remove(subj); + _seqToSubj.Remove(sched.Seq); + } + + /// + /// Clears all in-flight markers. + /// Mirrors MsgScheduling.clearInflight. + /// + public void ClearInflight() => _inflight.Clear(); + + /// + /// Arms or resets the internal timer to fire at the next scheduled expiration. + /// Mirrors MsgScheduling.resetTimer. + /// + public void ResetTimer() + { + if (_running) return; + + var next = _ttls.GetNextExpiration(long.MaxValue); + if (next == long.MaxValue) + { + ClearTimer(ref _timer); + return; + } + + // Convert nanosecond timestamp to DateTime (1 tick = 100 ns). + var nextTicks = DateTime.UnixEpoch.Ticks + next / 100L; + var nextUtc = new DateTime(nextTicks, DateTimeKind.Utc); + var fireIn = nextUtc - DateTime.UtcNow; + + // Clamp minimum interval. + if (fireIn < TimeSpan.FromMilliseconds(250)) + fireIn = TimeSpan.FromMilliseconds(250); + + var deadline = DateTime.UtcNow.Ticks + fireIn.Ticks; + if (_deadline > 0 && deadline > _deadline) return; + + _deadline = deadline; + if (_timer != null) + _timer.Change(fireIn, Timeout.InfiniteTimeSpan); + else + _timer = new Timer(_ => _run(), null, fireIn, Timeout.InfiniteTimeSpan); + } + + // getScheduledMessages is deferred to session 08/19 — requires JetStream inMsg, StoreMsg types. + + /// + /// Encodes the current schedule state to a binary snapshot. + /// Mirrors MsgScheduling.encode. + /// + public byte[] Encode(ulong highSeq) + { + var count = (ulong)_schedules.Count; + var buf = new List(HeaderLen + (int)(count * 20)); + + buf.Add(1); // magic version + AppendUInt64(buf, count); + AppendUInt64(buf, highSeq); + + foreach (var (subj, sched) in _schedules) + { + var slen = (ushort)Math.Min((ulong)subj.Length, ushort.MaxValue); + AppendUInt16(buf, slen); + buf.AddRange(System.Text.Encoding.Latin1.GetBytes(subj[..slen])); + AppendVarint(buf, sched.Ts); + AppendUvarint(buf, sched.Seq); + } + + return [.. buf]; + } + + /// + /// Decodes a binary snapshot into the current schedule. + /// Returns the high-sequence stamp or throws on error. + /// Mirrors MsgScheduling.decode. + /// + public (ulong highSeq, Exception? err) Decode(byte[] b) + { + if (b.Length < HeaderLen) + return (0, new System.IO.EndOfStreamException("short buffer")); + + if (b[0] != 1) + return (0, MsgSchedulingErrors.ErrMsgScheduleInvalidVersion); + + var count = BinaryPrimitives.ReadUInt64LittleEndian(b.AsSpan(1)); + var stamp = BinaryPrimitives.ReadUInt64LittleEndian(b.AsSpan(9)); + var offset = HeaderLen; + + for (ulong i = 0; i < count; i++) + { + if (offset + 2 > b.Length) + return (0, new System.IO.EndOfStreamException("unexpected EOF")); + + var sl = BinaryPrimitives.ReadUInt16LittleEndian(b.AsSpan(offset)); + offset += 2; + + if (offset + sl > b.Length) + return (0, new System.IO.EndOfStreamException("unexpected EOF")); + + var subj = System.Text.Encoding.Latin1.GetString(b, offset, sl); + offset += sl; + + var (ts, tn) = ReadVarint(b, offset); + if (tn < 0) return (0, new System.IO.EndOfStreamException("unexpected EOF")); + offset += tn; + + var (seq, vn) = ReadUvarint(b, offset); + if (vn < 0) return (0, new System.IO.EndOfStreamException("unexpected EOF")); + offset += vn; + + Init(seq, subj, ts); + } + + return (stamp, null); + } + + /// + /// Parses a message schedule pattern and returns the next fire time, + /// whether it repeats, and whether the pattern was valid. + /// Mirrors parseMsgSchedule. + /// + public static (DateTime next, bool repeat, bool ok) ParseMsgSchedule(string pattern, long ts) + { + if (pattern == string.Empty) + return (default, false, true); + + if (pattern.StartsWith("@at ", StringComparison.Ordinal)) + { + if (DateTime.TryParseExact( + pattern[4..], + "yyyy-MM-ddTHH:mm:ssK", + System.Globalization.CultureInfo.InvariantCulture, + System.Globalization.DateTimeStyles.AdjustToUniversal, + out var t)) + return (t, false, true); + return (default, false, false); + } + + if (pattern.StartsWith("@every ", StringComparison.Ordinal)) + { + if (!TryParseDuration(pattern[7..], out var dur)) + return (default, false, false); + + if (dur.TotalSeconds < 1) + return (default, false, false); + + // Advance past a stale next tick the same way Go does. + var prev = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000).UtcDateTime; + var next = RoundToSecond(prev).Add(dur); + var now = RoundToSecond(DateTime.UtcNow); + if (next < now) + next = now.Add(dur); + + return (next, true, true); + } + + return (default, false, false); + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + private static void ClearTimer(ref Timer? timer) + { + var t = timer; + if (t == null) return; + t.Dispose(); + timer = null; + } + + private static DateTime RoundToSecond(DateTime dt) => + new(dt.Year, dt.Month, dt.Day, dt.Hour, dt.Minute, dt.Second, DateTimeKind.Utc); + + // Naive duration parser for strings like "1s", "500ms", "2m", "1h30m". + private static bool TryParseDuration(string s, out TimeSpan result) + { + result = default; + if (s.EndsWith("ms", StringComparison.Ordinal) && + double.TryParse(s[..^2], out var ms)) + { + result = TimeSpan.FromMilliseconds(ms); + return true; + } + if (s.EndsWith('s') && double.TryParse(s[..^1], out var sec)) + { + result = TimeSpan.FromSeconds(sec); + return true; + } + if (s.EndsWith('m') && double.TryParse(s[..^1], out var min)) + { + result = TimeSpan.FromMinutes(min); + return true; + } + if (s.EndsWith('h') && double.TryParse(s[..^1], out var hr)) + { + result = TimeSpan.FromHours(hr); + return true; + } + // Try .NET TimeSpan.Parse as a fallback. + return TimeSpan.TryParse(s, out result); + } + + // ------------------------------------------------------------------------- + // Binary encoding helpers (mirrors encoding/binary in Go) + // ------------------------------------------------------------------------- + + private static void AppendUInt64(List buf, ulong v) + { + Span tmp = stackalloc byte[8]; + BinaryPrimitives.WriteUInt64LittleEndian(tmp, v); + buf.AddRange(tmp.ToArray()); + } + + private static void AppendUInt16(List buf, ushort v) + { + Span tmp = stackalloc byte[2]; + BinaryPrimitives.WriteUInt16LittleEndian(tmp, v); + buf.AddRange(tmp.ToArray()); + } + + /// Appends a zigzag-encoded signed varint (mirrors binary.AppendVarint). + private static void AppendVarint(List buf, long x) + { + var ux = (ulong)(x << 1); + if (x < 0) ux = ~ux; + AppendUvarint(buf, ux); + } + + /// Appends an unsigned varint (mirrors binary.AppendUvarint). + private static void AppendUvarint(List buf, ulong x) + { + while (x >= 0x80) + { + buf.Add((byte)(x | 0x80)); + x >>= 7; + } + buf.Add((byte)x); + } + + /// + /// Reads a zigzag signed varint from starting at . + /// Returns (value, bytesRead); bytesRead is negative on overflow. + /// + private static (long value, int n) ReadVarint(byte[] b, int offset) + { + var (ux, n) = ReadUvarint(b, offset); + var x = (long)(ux >> 1); + if ((ux & 1) != 0) x = ~x; + return (x, n); + } + + /// + /// Reads an unsigned varint from starting at . + /// Returns (value, bytesRead); bytesRead is negative on overflow. + /// + private static (ulong value, int n) ReadUvarint(byte[] b, int offset) + { + ulong x = 0; + var s = 0; + for (var i = offset; i < b.Length; i++) + { + var by = b[i]; + if (i - offset == 10) return (0, -(i - offset + 1)); // overflow + if (by < 0x80) + { + if (i - offset == 9 && by > 1) return (0, -(i - offset + 1)); + return (x | ((ulong)by << s), i - offset + 1); + } + x |= (ulong)(by & 0x7F) << s; + s += 7; + } + return (0, 0); // short buffer + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/ServerUtilities.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/ServerUtilities.cs new file mode 100644 index 0000000..88ed9be --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/ServerUtilities.cs @@ -0,0 +1,437 @@ +// Copyright 2012-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/util.go in the NATS server Go source. + +using System.Net; +using System.Text.RegularExpressions; + +namespace ZB.MOM.NatsNet.Server.Internal; + +/// +/// General-purpose server utility methods. +/// Mirrors server/util.go. +/// +public static class ServerUtilities +{ + // Semver validation regex — mirrors semVerRe in const.go. + private static readonly Regex SemVerRe = new( + @"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$", + RegexOptions.Compiled); + + // ------------------------------------------------------------------------- + // Version helpers + // ------------------------------------------------------------------------- + + /// + /// Parses a semver string into major/minor/patch components. + /// Returns an error if the string is not a valid semver. + /// Mirrors versionComponents. + /// + public static (int major, int minor, int patch, Exception? err) VersionComponents(string version) + { + var m = SemVerRe.Match(version); + if (!m.Success) + return (0, 0, 0, new InvalidOperationException("invalid semver")); + + if (!int.TryParse(m.Groups[1].Value, out var major) || + !int.TryParse(m.Groups[2].Value, out var minor) || + !int.TryParse(m.Groups[3].Value, out var patch)) + return (-1, -1, -1, new InvalidOperationException("invalid semver component")); + + return (major, minor, patch, null); + } + + /// + /// Returns (true, nil) if is at least major.minor.patch. + /// Mirrors versionAtLeastCheckError. + /// + public static (bool ok, Exception? err) VersionAtLeastCheckError( + string version, int emajor, int eminor, int epatch) + { + var (major, minor, patch, err) = VersionComponents(version); + if (err != null) return (false, err); + + if (major > emajor) return (true, null); + if (major == emajor && minor > eminor) return (true, null); + if (major == emajor && minor == eminor && patch >= epatch) return (true, null); + return (false, null); + } + + /// + /// Returns true if is at least major.minor.patch. + /// Mirrors versionAtLeast. + /// + public static bool VersionAtLeast(string version, int emajor, int eminor, int epatch) + { + var (ok, _) = VersionAtLeastCheckError(version, emajor, eminor, epatch); + return ok; + } + + // ------------------------------------------------------------------------- + // Integer parsing helpers (used for NATS protocol parsing) + // ------------------------------------------------------------------------- + + /// + /// Parses a decimal positive integer from ASCII bytes. + /// Returns -1 on error or if the input contains non-digit characters. + /// Mirrors parseSize. + /// + public static int ParseSize(ReadOnlySpan d) + { + const int MaxParseSizeLen = 9; // 999M + + if (d.IsEmpty || d.Length > MaxParseSizeLen) return -1; + + var n = 0; + foreach (var dec in d) + { + if (dec < '0' || dec > '9') return -1; + n = n * 10 + (dec - '0'); + } + return n; + } + + /// + /// Parses a decimal positive int64 from ASCII bytes. + /// Returns -1 on error. + /// Mirrors parseInt64. + /// + public static long ParseInt64(ReadOnlySpan d) + { + if (d.IsEmpty) return -1; + + long n = 0; + foreach (var dec in d) + { + if (dec < '0' || dec > '9') return -1; + n = n * 10 + (dec - '0'); + } + return n; + } + + // ------------------------------------------------------------------------- + // Duration / network helpers + // ------------------------------------------------------------------------- + + /// + /// Converts float64 seconds to a . + /// Mirrors secondsToDuration. + /// + public static TimeSpan SecondsToDuration(double seconds) => + TimeSpan.FromSeconds(seconds); + + /// + /// Splits "host:port" into components, using + /// when no port (or port 0 / -1) is present. + /// Mirrors parseHostPort. + /// + public static (string host, int port, Exception? err) ParseHostPort(string hostPort, int defaultPort) + { + if (string.IsNullOrEmpty(hostPort)) + return ("", -1, new InvalidOperationException("no hostport specified")); + + // Try splitting; if port is missing, append the default and retry. + string host, sPort; + try + { + var ep = ParseEndpoint(hostPort); + host = ep.host; + sPort = ep.port; + } + catch + { + try + { + var ep = ParseEndpoint($"{hostPort}:{defaultPort}"); + host = ep.host; + sPort = ep.port; + } + catch (Exception ex) + { + return ("", -1, ex); + } + } + + if (!int.TryParse(sPort.Trim(), out var port)) + return ("", -1, new InvalidOperationException($"invalid port: {sPort}")); + + if (port == 0 || port == -1) + port = defaultPort; + + return (host.Trim(), port, null); + } + + private static (string host, string port) ParseEndpoint(string hostPort) + { + // net.SplitHostPort equivalent — handles IPv6 [::1]:port + if (hostPort.StartsWith('[')) + { + var closeIdx = hostPort.IndexOf(']'); + if (closeIdx < 0 || closeIdx + 1 >= hostPort.Length || hostPort[closeIdx + 1] != ':') + throw new InvalidOperationException($"missing port in address {hostPort}"); + return (hostPort[1..closeIdx], hostPort[(closeIdx + 2)..]); + } + + var lastColon = hostPort.LastIndexOf(':'); + if (lastColon < 0) + throw new InvalidOperationException($"missing port in address {hostPort}"); + + var host = hostPort[..lastColon]; + var port = hostPort[(lastColon + 1)..]; + + // Reject bare IPv6 addresses (multiple colons without brackets). + if (host.Contains(':')) + throw new InvalidOperationException($"too many colons in address {hostPort}"); + + return (host, port); + } + + /// + /// Returns true if two instances represent the same URL. + /// Mirrors urlsAreEqual. + /// + public static bool UrlsAreEqual(Uri? u1, Uri? u2) => + u1 == u2 || (u1 != null && u2 != null && u1.ToString() == u2.ToString()); + + // ------------------------------------------------------------------------- + // Comma formatting + // ------------------------------------------------------------------------- + + /// + /// Formats an int64 with comma thousands separators. + /// Mirrors comma in util.go. + /// + public static string Comma(long v) + { + if (v == long.MinValue) return "-9,223,372,036,854,775,808"; + + var sign = ""; + if (v < 0) { sign = "-"; v = -v; } + + var parts = new string[7]; + var j = parts.Length - 1; + + while (v > 999) + { + var part = (v % 1000).ToString(); + parts[j--] = part.Length switch { 2 => "0" + part, 1 => "00" + part, _ => part }; + v /= 1000; + } + parts[j] = v.ToString(); + + return sign + string.Join(",", parts.Skip(j)); + } + + // ------------------------------------------------------------------------- + // TCP helpers + // ------------------------------------------------------------------------- + + /// + /// Creates a TCP listener with keepalives disabled (NATS server default). + /// Mirrors natsListen. + /// + public static System.Net.Sockets.TcpListener NatsListen(string address, int port) + { + // .NET TcpListener does not set keepalive by default; the socket can be + // further configured after creation if needed. + var listener = new System.Net.Sockets.TcpListener(IPAddress.Parse(address), port); + return listener; + } + + /// + /// Opens a TCP connection with the given timeout and keepalives disabled. + /// Mirrors natsDialTimeout. + /// + public static async Task NatsDialTimeoutAsync( + string host, int port, TimeSpan timeout) + { + var client = new System.Net.Sockets.TcpClient(); + // Disable keepalive to match Go 1.12 behavior. + client.Client.SetSocketOption( + System.Net.Sockets.SocketOptionLevel.Socket, + System.Net.Sockets.SocketOptionName.KeepAlive, + false); + + using var cts = new CancellationTokenSource(timeout); + await client.ConnectAsync(host, port, cts.Token); + return client; + } + + // ------------------------------------------------------------------------- + // URL redaction + // ------------------------------------------------------------------------- + + /// + /// Returns a copy of where any URL that + /// contains a password has its password replaced with "xxxxx". + /// Mirrors redactURLList. + /// + public static Uri[] RedactUrlList(Uri[] unredacted) + { + var r = new Uri[unredacted.Length]; + var needCopy = false; + + for (var i = 0; i < unredacted.Length; i++) + { + var u = unredacted[i]; + if (u?.UserInfo?.Contains(':') == true) + { + needCopy = true; + var ui = u.UserInfo; + var colon = ui.IndexOf(':'); + var username = ui[..colon]; + var b = new UriBuilder(u) { Password = "xxxxx", UserName = username }; + r[i] = b.Uri; + } + else + { + r[i] = u!; + } + } + + return needCopy ? r : unredacted; + } + + /// + /// Returns the URL string with the password component redacted ("xxxxx"). + /// Returns the original string if no password is present or it cannot be parsed. + /// Mirrors redactURLString. + /// + public static string RedactUrlString(string raw) + { + if (!raw.Contains('@')) return raw; + if (!Uri.TryCreate(raw, UriKind.Absolute, out var u)) return raw; + + if (!u.UserInfo.Contains(':')) return raw; + + var colon = u.UserInfo.IndexOf(':'); + var username = u.UserInfo[..colon]; + var b = new UriBuilder(u) { Password = "xxxxx", UserName = username }; + var result = b.Uri.ToString(); + // UriBuilder adds a trailing slash for authority-only URLs; strip it if the input had none. + if (!raw.EndsWith('/') && result.EndsWith('/')) + result = result[..^1]; + return result; + } + + /// + /// Returns the Host part of each URL in the list. + /// Mirrors getURLsAsString. + /// + public static string[] GetUrlsAsString(Uri[] urls) + { + var result = new string[urls.Length]; + for (var i = 0; i < urls.Length; i++) + result[i] = urls[i].Authority; // host:port + return result; + } + + // ------------------------------------------------------------------------- + // Copy helpers + // ------------------------------------------------------------------------- + + /// + /// Returns a copy of , or null if src is empty. + /// Mirrors copyBytes. + /// + public static byte[]? CopyBytes(byte[]? src) + { + if (src == null || src.Length == 0) return null; + var dst = new byte[src.Length]; + src.CopyTo(dst, 0); + return dst; + } + + /// + /// Returns a copy of , or null if src is null. + /// Mirrors copyStrings. + /// + public static string[]? CopyStrings(string[]? src) + { + if (src == null) return null; + var dst = new string[src.Length]; + src.CopyTo(dst, 0); + return dst; + } + + // ------------------------------------------------------------------------- + // Parallel task queue + // ------------------------------------------------------------------------- + + /// + /// Creates a bounded channel onto which tasks can be posted for parallel + /// execution across a pool of dedicated threads. Close the returned channel + /// to signal workers to stop (after queued items complete). + /// Mirrors parallelTaskQueue. + /// + public static System.Threading.Channels.ChannelWriter CreateParallelTaskQueue(int maxParallelism = 0) + { + var mp = maxParallelism <= 0 ? Environment.ProcessorCount : Math.Max(Environment.ProcessorCount, maxParallelism); + var channel = System.Threading.Channels.Channel.CreateBounded(mp); + + for (var i = 0; i < mp; i++) + { + Task.Run(async () => + { + await foreach (var fn in channel.Reader.ReadAllAsync()) + fn(); + }); + } + + return channel.Writer; + } +} + +// ------------------------------------------------------------------------- +// RefCountedUrlSet (mirrors refCountedUrlSet map[string]int in util.go) +// ------------------------------------------------------------------------- + +/// +/// A reference-counted set of URL strings used for gossip URL management. +/// Mirrors refCountedUrlSet in server/util.go. +/// +public sealed class RefCountedUrlSet +{ + private readonly Dictionary _map = new(); + + /// + /// Adds . Returns true if it was added for the first time. + /// Mirrors refCountedUrlSet.addUrl. + /// + public bool AddUrl(string urlStr) + { + _map.TryGetValue(urlStr, out var count); + _map[urlStr] = count + 1; + return count == 0; + } + + /// + /// Decrements the reference count for . + /// Returns true if this was the last reference (entry removed). + /// Mirrors refCountedUrlSet.removeUrl. + /// + public bool RemoveUrl(string urlStr) + { + if (!_map.TryGetValue(urlStr, out var count)) return false; + if (count == 1) { _map.Remove(urlStr); return true; } + _map[urlStr] = count - 1; + return false; + } + + /// + /// Returns the unique URL strings currently in the set. + /// Mirrors refCountedUrlSet.getAsStringSlice. + /// + public string[] GetAsStringSlice() => [.. _map.Keys]; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs new file mode 100644 index 0000000..f1ef9c6 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs @@ -0,0 +1,842 @@ +// Copyright 2023-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/subject_transform.go in the NATS server Go source. + +using System.Text.RegularExpressions; + +namespace ZB.MOM.NatsNet.Server.Internal; + +// ------------------------------------------------------------------------- +// Subject token constants (mirrors const block in server/sublist.go) +// ------------------------------------------------------------------------- + +internal static class SubjectTokens +{ + internal const char Pwc = '*'; // partial wildcard character + internal const string Pwcs = "*"; // partial wildcard string + internal const char Fwc = '>'; // full wildcard character + internal const string Fwcs = ">"; // full wildcard string + internal const string Tsep = "."; // token separator string + internal const char Btsep = '.'; // token separator character + internal const string Empty = ""; // _EMPTY_ +} + +// ------------------------------------------------------------------------- +// Transform type constants (mirrors enum in subject_transform.go) +// ------------------------------------------------------------------------- + +internal static class TransformType +{ + internal const short NoTransform = 0; + internal const short BadTransform = 1; + internal const short Partition = 2; + internal const short Wildcard = 3; + internal const short SplitFromLeft = 4; + internal const short SplitFromRight = 5; + internal const short SliceFromLeft = 6; + internal const short SliceFromRight = 7; + internal const short Split = 8; + internal const short Left = 9; + internal const short Right = 10; + internal const short Random = 11; +} + +// ------------------------------------------------------------------------- +// ISubjectTransformer interface (mirrors SubjectTransformer in Go) +// ------------------------------------------------------------------------- + +/// +/// Transforms NATS subjects according to a source-to-destination mapping. +/// Mirrors SubjectTransformer in server/subject_transform.go. +/// +public interface ISubjectTransformer +{ + (string result, Exception? err) Match(string subject); + string TransformSubject(string subject); + string TransformTokenizedSubject(string[] tokens); +} + +// ------------------------------------------------------------------------- +// SubjectTransform class +// ------------------------------------------------------------------------- + +/// +/// Subject mapping and transform engine. +/// Mirrors subjectTransform in server/subject_transform.go. +/// +public sealed class SubjectTransform : ISubjectTransformer +{ + private readonly string _src; + private readonly string _dest; + private readonly string[] _dtoks; // destination tokens + private readonly string[] _stoks; // source tokens + private readonly short[] _dtokmftypes; + private readonly int[][] _dtokmftokindexesargs; + private readonly int[] _dtokmfintargs; + private readonly string[] _dtokmfstringargs; + + // Subject mapping function regexes (mirrors var block in Go). + private static readonly Regex CommaSep = new(@",\s*", RegexOptions.Compiled); + private static readonly Regex PartitionRe = new(@"\{\{\s*[pP]artition\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + private static readonly Regex WildcardRe = new(@"\{\{\s*[wW]ildcard\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + private static readonly Regex SplitFromLeftRe = new(@"\{\{\s*[sS]plit[fF]rom[lL]eft\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + private static readonly Regex SplitFromRightRe = new(@"\{\{\s*[sS]plit[fF]rom[rR]ight\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + private static readonly Regex SliceFromLeftRe = new(@"\{\{\s*[sS]lice[fF]rom[lL]eft\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + private static readonly Regex SliceFromRightRe = new(@"\{\{\s*[sS]lice[fF]rom[rR]ight\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + private static readonly Regex SplitRe = new(@"\{\{\s*[sS]plit\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + private static readonly Regex LeftRe = new(@"\{\{\s*[lL]eft\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + private static readonly Regex RightRe = new(@"\{\{\s*[rR]ight\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + private static readonly Regex RandomRe = new(@"\{\{\s*[rR]andom\s*\((.*)\)\s*\}\}", RegexOptions.Compiled); + + private SubjectTransform( + string src, string dest, + string[] dtoks, string[] stoks, + short[] dtokmftypes, int[][] dtokmftokindexesargs, + int[] dtokmfintargs, string[] dtokmfstringargs) + { + _src = src; + _dest = dest; + _dtoks = dtoks; + _stoks = stoks; + _dtokmftypes = dtokmftypes; + _dtokmftokindexesargs = dtokmftokindexesargs; + _dtokmfintargs = dtokmfintargs; + _dtokmfstringargs = dtokmfstringargs; + } + + /// + /// Creates a new transform with optional strict mode. + /// Returns (null, null) when dest is empty (no transform needed). + /// Mirrors NewSubjectTransformWithStrict. + /// + public static (SubjectTransform? transform, Exception? err) NewWithStrict( + string src, string dest, bool strict) + { + if (dest == SubjectTokens.Empty) + return (null, null); + + if (src == SubjectTokens.Empty) + src = SubjectTokens.Fwcs; + + var (sv, stokens, npwcs, hasFwc) = SubjectInfo(src); + var (dv, dtokens, dnpwcs, dHasFwc) = SubjectInfo(dest); + + if (!sv || !dv || dnpwcs > 0 || hasFwc != dHasFwc) + return (null, ServerErrors.ErrBadSubject); + + var dtokMfTypes = new List(); + var dtokMfIndexes = new List(); + var dtokMfIntArgs = new List(); + var dtokMfStringArgs = new List(); + + if (npwcs > 0 || hasFwc) + { + // Build source-token index map for partial wildcards. + var sti = new Dictionary(); + for (var i = 0; i < stokens.Length; i++) + { + if (stokens[i].Length == 1 && stokens[i][0] == SubjectTokens.Pwc) + sti[sti.Count + 1] = i; + } + + var nphs = 0; + foreach (var token in dtokens) + { + var (tt, tidxs, tint, tstr, terr) = IndexPlaceHolders(token); + if (terr != null) return (null, terr); + + if (strict && tt != TransformType.NoTransform && tt != TransformType.Wildcard) + return (null, new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotSupportedForImport)); + + if (tt == TransformType.NoTransform) + { + dtokMfTypes.Add(TransformType.NoTransform); + dtokMfIndexes.Add([-1]); + dtokMfIntArgs.Add(-1); + dtokMfStringArgs.Add(SubjectTokens.Empty); + } + else if (tt == TransformType.Random) + { + dtokMfTypes.Add(TransformType.Random); + dtokMfIndexes.Add([]); + dtokMfIntArgs.Add(tint); + dtokMfStringArgs.Add(SubjectTokens.Empty); + } + else + { + nphs += tidxs.Length; + var stis = new List(); + foreach (var wildcardIndex in tidxs) + { + if (wildcardIndex > npwcs) + return (null, new MappingDestinationException( + $"{token}: [{wildcardIndex}]", + ServerErrors.ErrMappingDestinationIndexOutOfRange)); + stis.Add(sti.GetValueOrDefault(wildcardIndex, 0)); + } + dtokMfTypes.Add(tt); + dtokMfIndexes.Add([.. stis]); + dtokMfIntArgs.Add(tint); + dtokMfStringArgs.Add(tstr); + } + } + + if (strict && nphs < npwcs) + return (null, new MappingDestinationException(dest, ServerErrors.ErrMappingDestinationNotUsingAllWildcards)); + } + else + { + foreach (var token in dtokens) + { + var (tt, _, tint, _, terr) = IndexPlaceHolders(token); + if (terr != null) return (null, terr); + + if (tt == TransformType.NoTransform) + { + dtokMfTypes.Add(TransformType.NoTransform); + dtokMfIndexes.Add([-1]); + dtokMfIntArgs.Add(-1); + dtokMfStringArgs.Add(SubjectTokens.Empty); + } + else if (tt == TransformType.Random || tt == TransformType.Partition) + { + dtokMfTypes.Add(tt); + dtokMfIndexes.Add([]); + dtokMfIntArgs.Add(tint); + dtokMfStringArgs.Add(SubjectTokens.Empty); + } + else + { + return (null, new MappingDestinationException(token, ServerErrors.ErrMappingDestinationIndexOutOfRange)); + } + } + } + + return (new SubjectTransform( + src, dest, + dtokens, stokens, + [.. dtokMfTypes], [.. dtokMfIndexes], + [.. dtokMfIntArgs], [.. dtokMfStringArgs]), null); + } + + /// + /// Creates a non-strict transform. Mirrors NewSubjectTransform. + /// + public static (SubjectTransform? transform, Exception? err) New(string src, string dest) => + NewWithStrict(src, dest, false); + + /// + /// Creates a strict transform (only Wildcard function allowed). + /// Mirrors NewSubjectTransformStrict. + /// + public static (SubjectTransform? transform, Exception? err) NewStrict(string src, string dest) => + NewWithStrict(src, dest, true); + + /// + /// Attempts to match a published subject against the source pattern. + /// Returns the transformed subject or an error. + /// Mirrors subjectTransform.Match. + /// + public (string result, Exception? err) Match(string subject) + { + if ((_src == SubjectTokens.Fwcs || _src == SubjectTokens.Empty) && + (_dest == SubjectTokens.Fwcs || _dest == SubjectTokens.Empty)) + return (subject, null); + + var tts = TokenizeSubject(subject); + + if (!IsValidLiteralSubject(tts)) + return (SubjectTokens.Empty, ServerErrors.ErrBadSubject); + + if (_src == SubjectTokens.Empty || _src == SubjectTokens.Fwcs || + IsSubsetMatch(tts, _src)) + return (TransformTokenizedSubject(tts), null); + + return (SubjectTokens.Empty, ServerErrors.ErrNoTransforms); + } + + /// + /// Transforms a dot-separated subject string. + /// Mirrors subjectTransform.TransformSubject. + /// + public string TransformSubject(string subject) => + TransformTokenizedSubject(TokenizeSubject(subject)); + + /// + /// Core token-by-token transform engine. + /// Mirrors subjectTransform.TransformTokenizedSubject. + /// + public string TransformTokenizedSubject(string[] tokens) + { + if (_dtokmftypes.Length == 0) + return _dest; + + var b = new System.Text.StringBuilder(); + var li = _dtokmftypes.Length - 1; + + for (var i = 0; i < _dtokmftypes.Length; i++) + { + var mfType = _dtokmftypes[i]; + + if (mfType == TransformType.NoTransform) + { + if (_dtoks[i].Length == 1 && _dtoks[i][0] == SubjectTokens.Fwc) + break; + b.Append(_dtoks[i]); + } + else + { + switch (mfType) + { + case TransformType.Partition: + { + byte[] keyBytes; + if (_dtokmftokindexesargs[i].Length > 0) + { + var sb = new System.Text.StringBuilder(); + foreach (var srcTok in _dtokmftokindexesargs[i]) + sb.Append(tokens[srcTok]); + keyBytes = System.Text.Encoding.UTF8.GetBytes(sb.ToString()); + } + else + { + keyBytes = System.Text.Encoding.UTF8.GetBytes(string.Join(".", tokens)); + } + b.Append(GetHashPartition(keyBytes, _dtokmfintargs[i])); + break; + } + case TransformType.Wildcard: + if (_dtokmftokindexesargs.Length > i && + _dtokmftokindexesargs[i].Length > 0 && + tokens.Length > _dtokmftokindexesargs[i][0]) + { + b.Append(tokens[_dtokmftokindexesargs[i][0]]); + } + break; + case TransformType.SplitFromLeft: + { + var src = tokens[_dtokmftokindexesargs[i][0]]; + var pos = _dtokmfintargs[i]; + if (pos > 0 && pos < src.Length) + { + b.Append(src[..pos]); + b.Append(SubjectTokens.Tsep); + b.Append(src[pos..]); + } + else + { + b.Append(src); + } + break; + } + case TransformType.SplitFromRight: + { + var src = tokens[_dtokmftokindexesargs[i][0]]; + var pos = _dtokmfintargs[i]; + if (pos > 0 && pos < src.Length) + { + b.Append(src[..(src.Length - pos)]); + b.Append(SubjectTokens.Tsep); + b.Append(src[(src.Length - pos)..]); + } + else + { + b.Append(src); + } + break; + } + case TransformType.SliceFromLeft: + { + var src = tokens[_dtokmftokindexesargs[i][0]]; + var sz = _dtokmfintargs[i]; + if (sz > 0 && sz < src.Length) + { + var j = 0; + while (j + sz <= src.Length) + { + if (j != 0) b.Append(SubjectTokens.Tsep); + b.Append(src[j..(j + sz)]); + if (j + sz != src.Length && j + sz + sz > src.Length) + { + b.Append(SubjectTokens.Tsep); + b.Append(src[(j + sz)..]); + break; + } + j += sz; + } + } + else + { + b.Append(src); + } + break; + } + case TransformType.SliceFromRight: + { + var src = tokens[_dtokmftokindexesargs[i][0]]; + var sz = _dtokmfintargs[i]; + if (sz > 0 && sz < src.Length) + { + var rem = src.Length % sz; + if (rem > 0) + { + b.Append(src[..rem]); + b.Append(SubjectTokens.Tsep); + } + var j = rem; + while (j + sz <= src.Length) + { + b.Append(src[j..(j + sz)]); + if (j + sz < src.Length) b.Append(SubjectTokens.Tsep); + j += sz; + } + } + else + { + b.Append(src); + } + break; + } + case TransformType.Split: + { + var src = tokens[_dtokmftokindexesargs[i][0]]; + var parts = src.Split(_dtokmfstringargs[i]); + for (var j = 0; j < parts.Length; j++) + { + if (parts[j] != SubjectTokens.Empty) + b.Append(parts[j]); + if (j < parts.Length - 1 && + parts[j + 1] != SubjectTokens.Empty && + !(j == 0 && parts[j] == SubjectTokens.Empty)) + b.Append(SubjectTokens.Tsep); + } + break; + } + case TransformType.Left: + { + var src = tokens[_dtokmftokindexesargs[i][0]]; + var sz = _dtokmfintargs[i]; + b.Append(sz > 0 && sz < src.Length ? src[..sz] : src); + break; + } + case TransformType.Right: + { + var src = tokens[_dtokmftokindexesargs[i][0]]; + var sz = _dtokmfintargs[i]; + b.Append(sz > 0 && sz < src.Length ? src[(src.Length - sz)..] : src); + break; + } + case TransformType.Random: + b.Append(GetRandomPartition(_dtokmfintargs[i])); + break; + } + } + + if (i < li) + b.Append(SubjectTokens.Btsep); + } + + // Append remaining source tokens when destination ends with ">". + if (_dtoks.Length > 0 && _dtoks[^1] == SubjectTokens.Fwcs) + { + var stokLen = _stoks.Length; + for (var i = stokLen - 1; i < tokens.Length; i++) + { + b.Append(tokens[i]); + if (i < tokens.Length - 1) + b.Append(SubjectTokens.Btsep); + } + } + + return b.ToString(); + } + + /// + /// Reverses this transform (src ↔ dest). + /// Mirrors subjectTransform.reverse. + /// + internal SubjectTransform? Reverse() + { + if (_dtokmftokindexesargs.Length == 0) + { + var (rtr, _) = NewStrict(_dest, _src); + return rtr; + } + + var (nsrc, phs) = TransformUntokenize(_dest); + var nda = new List(); + foreach (var token in _stoks) + { + if (token == SubjectTokens.Pwcs) + { + if (phs.Length == 0) return null; + nda.Add(phs[0]); + phs = phs[1..]; + } + else + { + nda.Add(token); + } + } + var ndest = string.Join(SubjectTokens.Tsep, nda); + var (rtrFinal, _) = NewStrict(nsrc, ndest); + return rtrFinal; + } + + // ------------------------------------------------------------------------- + // Static helpers exposed internally + // ------------------------------------------------------------------------- + + /// + /// Returns the args extracted from a mapping-function token using the given regex. + /// Mirrors getMappingFunctionArgs. + /// + internal static string[]? GetMappingFunctionArgs(Regex functionRegex, string token) + { + var m = functionRegex.Match(token); + if (m.Success && m.Groups.Count > 1) + return CommaSep.Split(m.Groups[1].Value); + return null; + } + + /// + /// Helper for transform functions that take (wildcardIndex, int) args. + /// Mirrors transformIndexIntArgsHelper. + /// + internal static (short tt, int[] indexes, int intArg, string strArg, Exception? err) + TransformIndexIntArgsHelper(string token, string[] args, short transformType) + { + if (args.Length < 2) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs)); + if (args.Length > 2) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationTooManyArgs)); + + if (!int.TryParse(args[0].Trim(), out var idx)) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg)); + + if (!int.TryParse(args[1].Trim(), out var intVal)) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg)); + + return (transformType, [idx], intVal, SubjectTokens.Empty, null); + } + + /// + /// Parses a destination token and returns its transform type and arguments. + /// Mirrors indexPlaceHolders. + /// + internal static (short tt, int[] indexes, int intArg, string strArg, Exception? err) + IndexPlaceHolders(string token) + { + var length = token.Length; + if (length > 1) + { + if (token[0] == '$') + { + if (!int.TryParse(token[1..], out var tp)) + return (TransformType.NoTransform, [-1], -1, SubjectTokens.Empty, null); + return (TransformType.Wildcard, [tp], -1, SubjectTokens.Empty, null); + } + + if (length > 4 && token[0] == '{' && token[1] == '{' && + token[length - 2] == '}' && token[length - 1] == '}') + { + // {{wildcard(n)}} + var args = GetMappingFunctionArgs(WildcardRe, token); + if (args != null) + { + if (args.Length == 1 && args[0] == SubjectTokens.Empty) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs)); + if (args.Length == 1) + { + if (!int.TryParse(args[0].Trim(), out var ti)) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg)); + return (TransformType.Wildcard, [ti], -1, SubjectTokens.Empty, null); + } + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationTooManyArgs)); + } + + // {{partition(n[,t1,t2,...])}} + args = GetMappingFunctionArgs(PartitionRe, token); + if (args != null) + { + if (args.Length < 1) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs)); + if (!int.TryParse(args[0].Trim(), out var partN) || (long)partN > int.MaxValue) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg)); + if (args.Length == 1) + return (TransformType.Partition, [], partN, SubjectTokens.Empty, null); + + var tidxs = new List(); + foreach (var t in args[1..]) + { + if (!int.TryParse(t.Trim(), out var ti2)) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg)); + tidxs.Add(ti2); + } + return (TransformType.Partition, [.. tidxs], partN, SubjectTokens.Empty, null); + } + + // {{SplitFromLeft(t, n)}} + args = GetMappingFunctionArgs(SplitFromLeftRe, token); + if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.SplitFromLeft); + + // {{SplitFromRight(t, n)}} + args = GetMappingFunctionArgs(SplitFromRightRe, token); + if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.SplitFromRight); + + // {{SliceFromLeft(t, n)}} + args = GetMappingFunctionArgs(SliceFromLeftRe, token); + if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.SliceFromLeft); + + // {{SliceFromRight(t, n)}} + args = GetMappingFunctionArgs(SliceFromRightRe, token); + if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.SliceFromRight); + + // {{right(t, n)}} + args = GetMappingFunctionArgs(RightRe, token); + if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.Right); + + // {{left(t, n)}} + args = GetMappingFunctionArgs(LeftRe, token); + if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.Left); + + // {{split(t, delim)}} + args = GetMappingFunctionArgs(SplitRe, token); + if (args != null) + { + if (args.Length < 2) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs)); + if (args.Length > 2) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationTooManyArgs)); + if (!int.TryParse(args[0].Trim(), out var splitIdx)) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg)); + if (args[1].Contains(' ') || args[1].Contains(SubjectTokens.Tsep)) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg)); + return (TransformType.Split, [splitIdx], -1, args[1], null); + } + + // {{random(n)}} + args = GetMappingFunctionArgs(RandomRe, token); + if (args != null) + { + if (args.Length != 1) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs)); + if (!int.TryParse(args[0].Trim(), out var randN) || (long)randN > int.MaxValue) + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg)); + return (TransformType.Random, [], randN, SubjectTokens.Empty, null); + } + + return (TransformType.BadTransform, [], -1, SubjectTokens.Empty, + new MappingDestinationException(token, ServerErrors.ErrUnknownMappingDestinationFunction)); + } + } + + return (TransformType.NoTransform, [-1], -1, SubjectTokens.Empty, null); + } + + /// + /// Tokenises a subject with wildcards into a formal transform destination. + /// e.g. "foo.*.*" → "foo.$1.$2". + /// Mirrors transformTokenize. + /// + public static string TransformTokenize(string subject) + { + var i = 1; + var parts = new List(); + foreach (var token in subject.Split(SubjectTokens.Btsep)) + { + if (token == SubjectTokens.Pwcs) + { + parts.Add($"${i++}"); + } + else + { + parts.Add(token); + } + } + return string.Join(SubjectTokens.Tsep, parts); + } + + /// + /// Converts a transform destination back to a wildcard subject + placeholder list. + /// Mirrors transformUntokenize. + /// + public static (string subject, string[] placeholders) TransformUntokenize(string subject) + { + var phs = new List(); + var nda = new List(); + + foreach (var token in subject.Split(SubjectTokens.Btsep)) + { + var args = GetMappingFunctionArgs(WildcardRe, token); + var isWildcardPlaceholder = + (token.Length > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9') || + (args?.Length == 1 && args[0] != SubjectTokens.Empty); + + if (isWildcardPlaceholder) + { + phs.Add(token); + nda.Add(SubjectTokens.Pwcs); + } + else + { + nda.Add(token); + } + } + + return (string.Join(SubjectTokens.Tsep, nda), [.. phs]); + } + + /// + /// Tokenises a subject into an array of dot-separated tokens. + /// Mirrors tokenizeSubject. + /// + public static string[] TokenizeSubject(string subject) => + subject.Split(SubjectTokens.Btsep); + + /// + /// Returns (valid, tokens, numPwcs, hasFwc) for a subject string. + /// Mirrors subjectInfo. + /// + public static (bool valid, string[] tokens, int npwcs, bool hasFwc) SubjectInfo(string subject) + { + if (subject == string.Empty) + return (false, [], 0, false); + + var npwcs = 0; + var sfwc = false; + var tokens = subject.Split(SubjectTokens.Tsep); + foreach (var t in tokens) + { + if (t.Length == 0 || sfwc) + return (false, [], 0, false); + if (t.Length > 1) continue; + switch (t[0]) + { + case SubjectTokens.Fwc: + sfwc = true; + break; + case SubjectTokens.Pwc: + npwcs++; + break; + } + } + return (true, tokens, npwcs, sfwc); + } + + // ------------------------------------------------------------------------- + // Internal helpers used by Match + // ------------------------------------------------------------------------- + + /// + /// Returns true if all tokens are literal (no wildcards). + /// Mirrors isValidLiteralSubject in server/sublist.go. + /// + internal static bool IsValidLiteralSubject(string[] tokens) + { + foreach (var t in tokens) + { + if (t.Length == 0) return false; + if (t.Length == 1 && (t[0] == SubjectTokens.Pwc || t[0] == SubjectTokens.Fwc)) + return false; + } + return true; + } + + /// + /// Returns true if match the pattern . + /// Mirrors isSubsetMatch in server/sublist.go. + /// + internal static bool IsSubsetMatch(string[] tokens, string test) + { + var testToks = TokenizeSubjectIntoSlice(test); + return IsSubsetMatchTokenized(tokens, testToks); + } + + private static string[] TokenizeSubjectIntoSlice(string subject) + { + var result = new List(); + var start = 0; + for (var i = 0; i < subject.Length; i++) + { + if (subject[i] == SubjectTokens.Btsep) + { + result.Add(subject[start..i]); + start = i + 1; + } + } + result.Add(subject[start..]); + return [.. result]; + } + + private static bool IsSubsetMatchTokenized(string[] tokens, string[] test) + { + for (var i = 0; i < test.Length; i++) + { + if (i >= tokens.Length) return false; + var t2 = test[i]; + if (t2.Length == 0) return false; + if (t2.Length == 1 && t2[0] == SubjectTokens.Fwc) return true; + + var t1 = tokens[i]; + if (t1.Length == 0) return false; + if (t1.Length == 1 && t1[0] == SubjectTokens.Fwc) return false; + + if (t1.Length == 1 && t1[0] == SubjectTokens.Pwc) + { + if (!(t2.Length == 1 && t2[0] == SubjectTokens.Pwc)) return false; + if (i >= test.Length) return true; + continue; + } + + if (!(t2.Length == 1 && t2[0] == SubjectTokens.Pwc) && + string.Compare(t1, t2, StringComparison.Ordinal) != 0) + return false; + } + return tokens.Length == test.Length; + } + + private string GetRandomPartition(int ceiling) + { + if (ceiling == 0) return "0"; + return (Random.Shared.Next() % ceiling).ToString(); + } + + private static string GetHashPartition(byte[] key, int numBuckets) + { + if (numBuckets == 0) return "0"; + // FNV-1a 32-bit hash — mirrors fnv.New32a() in Go. + const uint FnvPrime = 16777619; + const uint FnvOffset = 2166136261; + var hash = FnvOffset; + foreach (var b in key) { hash ^= b; hash *= FnvPrime; } + return ((int)(hash % (uint)numBuckets)).ToString(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/IpQueueTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/IpQueueTests.cs new file mode 100644 index 0000000..5bac632 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/IpQueueTests.cs @@ -0,0 +1,316 @@ +// Copyright 2021-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Collections.Concurrent; +using Shouldly; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.Internal; + +/// +/// Tests for . +/// Mirrors server/ipqueue_test.go: +/// TestIPQueueBasic (ID 688), TestIPQueuePush (ID 689), TestIPQueuePop (ID 690), +/// TestIPQueuePopOne (ID 691), TestIPQueueMultiProducers (ID 692), +/// TestIPQueueRecycle (ID 693), TestIPQueueDrain (ID 694), +/// TestIPQueueSizeCalculation (ID 695), TestIPQueueSizeCalculationWithLimits (ID 696). +/// Benchmarks (IDs 697–715) are n/a. +/// +public sealed class IpQueueTests +{ + [Fact] + public void Basic_ShouldInitialiseCorrectly() + { + // Mirror: TestIPQueueBasic + var registry = new ConcurrentDictionary(); + var q = new IpQueue("test", registry); + + q.MaxRecycleSize.ShouldBe(IpQueue.DefaultMaxRecycleSize); + q.Ch.TryRead(out _).ShouldBeFalse("channel should be empty on creation"); + q.Len().ShouldBe(0); + + // Create a second queue with custom max recycle size. + var q2 = new IpQueue("test2", registry, maxRecycleSize: 10); + q2.MaxRecycleSize.ShouldBe(10); + + // Both should be in the registry. + registry.ContainsKey("test").ShouldBeTrue(); + registry.ContainsKey("test2").ShouldBeTrue(); + + // Unregister both. + q.Unregister(); + q2.Unregister(); + registry.IsEmpty.ShouldBeTrue("registry should be empty after unregister"); + + // Push/pop should still work after unregister. + q.Push(1); + var elts = q.Pop(); + elts.ShouldNotBeNull(); + elts!.Length.ShouldBe(1); + + q2.Push(2); + var (e, ok) = q2.PopOne(); + ok.ShouldBeTrue(); + e.ShouldBe(2); + } + + [Fact] + public void Push_ShouldNotifyOnFirstElement() + { + // Mirror: TestIPQueuePush + var q = new IpQueue("test"); + + q.Push(1); + q.Len().ShouldBe(1); + q.Ch.TryRead(out _).ShouldBeTrue("should have been notified after first push"); + + // Second push should NOT send another notification. + q.Push(2); + q.Len().ShouldBe(2); + q.Ch.TryRead(out _).ShouldBeFalse("should not notify again when queue was not empty"); + } + + [Fact] + public void Pop_ShouldReturnElementsAndTrackInProgress() + { + // Mirror: TestIPQueuePop + var q = new IpQueue("test"); + q.Push(1); + q.Ch.TryRead(out _); // consume signal + + var elts = q.Pop(); + elts.ShouldNotBeNull(); + elts!.Length.ShouldBe(1); + q.Len().ShouldBe(0); + + // Channel should still be empty after pop. + q.Ch.TryRead(out _).ShouldBeFalse(); + + // InProgress should be 1 — pop increments it. + q.InProgress().ShouldBe(1L); + + // Recycle decrements it. + q.Recycle(elts); + q.InProgress().ShouldBe(0L); + + // Pop on empty queue returns null. + var empty = q.Pop(); + empty.ShouldBeNull(); + q.InProgress().ShouldBe(0L); + } + + [Fact] + public void PopOne_ShouldReturnOneAtATime() + { + // Mirror: TestIPQueuePopOne + var q = new IpQueue("test"); + q.Push(1); + q.Ch.TryRead(out _); // consume signal + + var (e, ok) = q.PopOne(); + ok.ShouldBeTrue(); + e.ShouldBe(1); + q.Len().ShouldBe(0); + q.InProgress().ShouldBe(0L, "popOne does not increment inprogress"); + q.Ch.TryRead(out _).ShouldBeFalse("no notification when queue is emptied by popOne"); + + q.Push(2); + q.Push(3); + + var (e2, ok2) = q.PopOne(); + ok2.ShouldBeTrue(); + e2.ShouldBe(2); + q.Len().ShouldBe(1); + q.Ch.TryRead(out _).ShouldBeTrue("should re-notify when more items remain"); + + var (e3, ok3) = q.PopOne(); + ok3.ShouldBeTrue(); + e3.ShouldBe(3); + q.Len().ShouldBe(0); + q.Ch.TryRead(out _).ShouldBeFalse("no notification after last element removed"); + + var (_, okEmpty) = q.PopOne(); + okEmpty.ShouldBeFalse("popOne on empty queue returns false"); + } + + [Fact] + public async Task MultiProducers_ShouldReceiveAllElements() + { + // Mirror: TestIPQueueMultiProducers + var q = new IpQueue("test"); + const int itemsPerProducer = 100; + const int numProducers = 3; + + var tasks = Enumerable.Range(0, numProducers).Select(p => + Task.Run(() => + { + for (var i = p * itemsPerProducer + 1; i <= (p + 1) * itemsPerProducer; i++) + q.Push(i); + })).ToArray(); + + var received = new HashSet(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + while (received.Count < numProducers * itemsPerProducer && + !cts.Token.IsCancellationRequested) + { + if (q.Ch.TryRead(out _)) + { + var batch = q.Pop(); + if (batch != null) + { + foreach (var v in batch) received.Add(v); + q.Recycle(batch); + q.InProgress().ShouldBe(0L); + } + } + else + { + await Task.Delay(1, cts.Token); + } + } + + await Task.WhenAll(tasks); + received.Count.ShouldBe(numProducers * itemsPerProducer, "all elements should be received"); + } + + [Fact] + public void Recycle_ShouldDecrementInProgressAndAllowReuse() + { + // Mirror: TestIPQueueRecycle (behavioral aspects) + var q = new IpQueue("test"); + const int total = 1000; + + for (var i = 0; i < total; i++) + { + var (len, err) = q.Push(i); + err.ShouldBeNull(); + len.ShouldBe(i + 1); + } + + var values = q.Pop(); + values.ShouldNotBeNull(); + values!.Length.ShouldBe(total); + q.InProgress().ShouldBe((long)total); + + q.Recycle(values); + q.InProgress().ShouldBe(0L, "recycle should decrement inprogress"); + + // Should be able to push/pop again after recycle. + var (l, err2) = q.Push(1001); + err2.ShouldBeNull(); + l.ShouldBe(1); + var values2 = q.Pop(); + values2.ShouldNotBeNull(); + values2!.Length.ShouldBe(1); + values2[0].ShouldBe(1001); + + // Recycle with small max recycle size: large arrays should not be pooled + // (behavioral: push/pop still works correctly). + var q2 = new IpQueue("test2", maxRecycleSize: 10); + for (var i = 0; i < 100; i++) q2.Push(i); + var bigBatch = q2.Pop(); + bigBatch.ShouldNotBeNull(); + bigBatch!.Length.ShouldBe(100); + q2.Recycle(bigBatch); + q2.InProgress().ShouldBe(0L); + + q2.Push(1001); + var small = q2.Pop(); + small.ShouldNotBeNull(); + small!.Length.ShouldBe(1); + q2.Recycle(small); + } + + [Fact] + public void Drain_ShouldEmptyQueueAndConsumeSignal() + { + // Mirror: TestIPQueueDrain + var q = new IpQueue("test"); + for (var i = 1; i <= 100; i++) q.Push(i); + + var drained = q.Drain(); + drained.ShouldBe(100); + + // Signal should have been consumed. + q.Ch.TryRead(out _).ShouldBeFalse("drain should consume the notification signal"); + q.Len().ShouldBe(0); + } + + [Fact] + public void SizeCalculation_ShouldTrackTotalSize() + { + // Mirror: TestIPQueueSizeCalculation + const int elemSize = 16; + var q = new IpQueue("test", sizeCalc: e => (ulong)e.Length); + + for (var i = 0; i < 10; i++) + { + q.Push(new byte[elemSize]); + q.Len().ShouldBe(i + 1); + q.Size().ShouldBe((ulong)(i + 1) * elemSize); + } + + for (var i = 10; i > 5; i--) + { + q.PopOne(); + q.Len().ShouldBe(i - 1); + q.Size().ShouldBe((ulong)(i - 1) * elemSize); + } + + q.Pop(); + q.Len().ShouldBe(0); + q.Size().ShouldBe(0UL); + } + + [Fact] + public void SizeCalculationWithLimits_ShouldEnforceLimits() + { + // Mirror: TestIPQueueSizeCalculationWithLimits + const int elemSize = 16; + Func calc = e => (ulong)e.Length; + var elem = new byte[elemSize]; + + // LimitByLen + var q1 = new IpQueue("test-len", sizeCalc: calc, maxLen: 5); + for (var i = 0; i < 10; i++) + { + var (n, err) = q1.Push(elem); + if (i >= 5) + { + err.ShouldBeSameAs(IpQueueErrors.LenLimitReached, $"iteration {i}"); + } + else + { + err.ShouldBeNull($"iteration {i}"); + } + n.ShouldBeLessThan(6); + } + + // LimitBySize + var q2 = new IpQueue("test-size", sizeCalc: calc, maxSize: elemSize * 5); + for (var i = 0; i < 10; i++) + { + var (n, err) = q2.Push(elem); + if (i >= 5) + { + err.ShouldBeSameAs(IpQueueErrors.SizeLimitReached, $"iteration {i}"); + } + else + { + err.ShouldBeNull($"iteration {i}"); + } + n.ShouldBeLessThan(6); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/ServerUtilitiesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/ServerUtilitiesTests.cs new file mode 100644 index 0000000..bb66dd4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/ServerUtilitiesTests.cs @@ -0,0 +1,194 @@ +// Copyright 2012-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Shouldly; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.Internal; + +/// +/// Tests for . +/// Mirrors server/util_test.go: TestParseSize (ID 3061), TestParseSInt64 (ID 3062), +/// TestParseHostPort (ID 3063), TestURLsAreEqual (ID 3064), TestComma (ID 3065), +/// TestURLRedaction (ID 3066), TestVersionAtLeast (ID 3067). +/// Benchmarks (IDs 3068–3073) are n/a. +/// +public sealed class ServerUtilitiesTests +{ + [Fact] + public void ParseSize_ShouldParseValidAndRejectInvalid() + { + // Mirror: TestParseSize + ServerUtilities.ParseSize(ReadOnlySpan.Empty).ShouldBe(-1, "nil/empty should return -1"); + + var n = "12345678"u8; + ServerUtilities.ParseSize(n).ShouldBe(12345678); + + var bad = "12345invalid678"u8; + ServerUtilities.ParseSize(bad).ShouldBe(-1, "non-digit chars should return -1"); + } + + [Fact] + public void ParseInt64_ShouldParseValidAndRejectInvalid() + { + // Mirror: TestParseSInt64 + ServerUtilities.ParseInt64(ReadOnlySpan.Empty).ShouldBe(-1L, "empty should return -1"); + + var n = "12345678"u8; + ServerUtilities.ParseInt64(n).ShouldBe(12345678L); + + var bad = "12345invalid678"u8; + ServerUtilities.ParseInt64(bad).ShouldBe(-1L, "non-digit chars should return -1"); + } + + [Fact] + public void ParseHostPort_ShouldSplitCorrectly() + { + // Mirror: TestParseHostPort + void Check(string hostPort, int defaultPort, string expectedHost, int expectedPort, bool expectError) + { + var (host, port, err) = ServerUtilities.ParseHostPort(hostPort, defaultPort); + if (expectError) + { + err.ShouldNotBeNull($"expected error for hostPort={hostPort}"); + return; + } + err.ShouldBeNull($"unexpected error for hostPort={hostPort}: {err?.Message}"); + host.ShouldBe(expectedHost); + port.ShouldBe(expectedPort); + } + + Check("addr:1234", 5678, "addr", 1234, false); + Check(" addr:1234 ", 5678, "addr", 1234, false); + Check(" addr : 1234 ", 5678, "addr", 1234, false); + Check("addr", 5678, "addr", 5678, false); // no port → default + Check(" addr ", 5678, "addr", 5678, false); + Check("addr:-1", 5678, "addr", 5678, false); // -1 → default + Check(" addr:-1 ", 5678, "addr", 5678, false); + Check(" addr : -1 ", 5678, "addr", 5678, false); + Check("addr:0", 5678, "addr", 5678, false); // 0 → default + Check(" addr:0 ", 5678, "addr", 5678, false); + Check(" addr : 0 ", 5678, "addr", 5678, false); + Check("addr:addr", 0, "", 0, true); // non-numeric port + Check("addr:::1234", 0, "", 0, true); // ambiguous colons + Check("", 0, "", 0, true); // empty + } + + [Fact] + public void UrlsAreEqual_ShouldCompareCorrectly() + { + // Mirror: TestURLsAreEqual + void Check(string u1Str, string u2Str, bool expectedSame) + { + var u1 = new Uri(u1Str); + var u2 = new Uri(u2Str); + ServerUtilities.UrlsAreEqual(u1, u2).ShouldBe(expectedSame, + $"expected {u1Str} and {u2Str} to be {(expectedSame ? "equal" : "different")}"); + } + + Check("nats://localhost:4222", "nats://localhost:4222", true); + Check("nats://ivan:pwd@localhost:4222", "nats://ivan:pwd@localhost:4222", true); + Check("nats://ivan@localhost:4222", "nats://ivan@localhost:4222", true); + Check("nats://ivan:@localhost:4222", "nats://ivan:@localhost:4222", true); + Check("nats://host1:4222", "nats://host2:4222", false); + } + + [Fact] + public void Comma_ShouldFormatWithThousandSeparators() + { + // Mirror: TestComma + var cases = new (long input, string expected)[] + { + (0, "0"), + (10, "10"), + (100, "100"), + (1_000, "1,000"), + (10_000, "10,000"), + (100_000, "100,000"), + (10_000_000, "10,000,000"), + (10_100_000, "10,100,000"), + (10_010_000, "10,010,000"), + (10_001_000, "10,001,000"), + (123_456_789, "123,456,789"), + (9_223_372_036_854_775_807L, "9,223,372,036,854,775,807"), // long.MaxValue + (long.MinValue, "-9,223,372,036,854,775,808"), + (-123_456_789, "-123,456,789"), + (-10_100_000, "-10,100,000"), + (-10_010_000, "-10,010,000"), + (-10_001_000, "-10,001,000"), + (-10_000_000, "-10,000,000"), + (-100_000, "-100,000"), + (-10_000, "-10,000"), + (-1_000, "-1,000"), + (-100, "-100"), + (-10, "-10"), + }; + + foreach (var (input, expected) in cases) + ServerUtilities.Comma(input).ShouldBe(expected, $"Comma({input})"); + } + + [Fact] + public void UrlRedaction_ShouldReplacePasswords() + { + // Mirror: TestURLRedaction + var cases = new (string full, string safe)[] + { + ("nats://foo:bar@example.org", "nats://foo:xxxxx@example.org"), + ("nats://foo@example.org", "nats://foo@example.org"), + ("nats://example.org", "nats://example.org"), + ("nats://example.org/foo?bar=1", "nats://example.org/foo?bar=1"), + }; + + var listFull = new Uri[cases.Length]; + var listSafe = new Uri[cases.Length]; + + for (var i = 0; i < cases.Length; i++) + { + ServerUtilities.RedactUrlString(cases[i].full).ShouldBe(cases[i].safe, + $"RedactUrlString[{i}]"); + listFull[i] = new Uri(cases[i].full); + listSafe[i] = new Uri(cases[i].safe); + } + + var results = ServerUtilities.RedactUrlList(listFull); + for (var i = 0; i < results.Length; i++) + results[i].ToString().ShouldBe(listSafe[i].ToString(), $"RedactUrlList[{i}]"); + } + + [Fact] + public void VersionAtLeast_ShouldReturnCorrectResult() + { + // Mirror: TestVersionAtLeast + var cases = new (string version, int major, int minor, int update, bool result)[] + { + ("2.0.0-beta", 1, 9, 9, true), + ("2.0.0", 1, 99, 9, true), + ("2.2.0", 2, 1, 9, true), + ("2.2.2", 2, 2, 2, true), + ("2.2.2", 2, 2, 3, false), + ("2.2.2", 2, 3, 2, false), + ("2.2.2", 3, 2, 2, false), + ("2.22.2", 3, 0, 0, false), + ("2.2.22", 2, 3, 0, false), + ("bad.version",1, 2, 3, false), + }; + + foreach (var (version, major, minor, update, expected) in cases) + { + ServerUtilities.VersionAtLeast(version, major, minor, update) + .ShouldBe(expected, + $"VersionAtLeast({version}, {major}, {minor}, {update})"); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SubjectTransformTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SubjectTransformTests.cs new file mode 100644 index 0000000..90989d7 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SubjectTransformTests.cs @@ -0,0 +1,256 @@ +// Copyright 2023-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Shouldly; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.Internal; + +/// +/// Tests for . +/// Mirrors server/subject_transform_test.go: +/// TestPlaceHolderIndex (ID 2958), TestSubjectTransformHelpers (ID 2959), +/// TestSubjectTransforms (ID 2960), +/// TestSubjectTransformDoesntPanicTransformingMissingToken (ID 2961). +/// +public sealed class SubjectTransformTests +{ + [Fact] + public void PlaceHolderIndex_ShouldParseAllFunctionTypes() + { + // Mirror: TestPlaceHolderIndex + + // $1 — old style + var (tt, idxs, intArg, _, err) = SubjectTransform.IndexPlaceHolders("$1"); + err.ShouldBeNull(); + tt.ShouldBe(TransformType.Wildcard); + idxs.Length.ShouldBe(1); + idxs[0].ShouldBe(1); + intArg.ShouldBe(-1); + + // {{partition(10,1,2,3)}} + (tt, idxs, intArg, _, err) = SubjectTransform.IndexPlaceHolders("{{partition(10,1,2,3)}}"); + err.ShouldBeNull(); + tt.ShouldBe(TransformType.Partition); + idxs.ShouldBe([1, 2, 3]); + intArg.ShouldBe(10); + + // {{ Partition (10,1,2,3) }} (with spaces) + (tt, idxs, intArg, _, err) = SubjectTransform.IndexPlaceHolders("{{ Partition (10,1,2,3) }}"); + err.ShouldBeNull(); + tt.ShouldBe(TransformType.Partition); + idxs.ShouldBe([1, 2, 3]); + intArg.ShouldBe(10); + + // {{wildcard(2)}} + (tt, idxs, intArg, _, err) = SubjectTransform.IndexPlaceHolders("{{wildcard(2)}}"); + err.ShouldBeNull(); + tt.ShouldBe(TransformType.Wildcard); + idxs.Length.ShouldBe(1); + idxs[0].ShouldBe(2); + intArg.ShouldBe(-1); + + // {{SplitFromLeft(2,1)}} + int pos; + (tt, idxs, pos, _, err) = SubjectTransform.IndexPlaceHolders("{{SplitFromLeft(2,1)}}"); + err.ShouldBeNull(); + tt.ShouldBe(TransformType.SplitFromLeft); + idxs.Length.ShouldBe(1); + idxs[0].ShouldBe(2); + pos.ShouldBe(1); + + // {{SplitFromRight(3,2)}} + (tt, idxs, pos, _, err) = SubjectTransform.IndexPlaceHolders("{{SplitFromRight(3,2)}}"); + err.ShouldBeNull(); + tt.ShouldBe(TransformType.SplitFromRight); + idxs.Length.ShouldBe(1); + idxs[0].ShouldBe(3); + pos.ShouldBe(2); + + // {{SliceFromLeft(2,2)}} + (tt, idxs, var sliceSize, _, err) = SubjectTransform.IndexPlaceHolders("{{SliceFromLeft(2,2)}}"); + err.ShouldBeNull(); + tt.ShouldBe(TransformType.SliceFromLeft); + idxs.Length.ShouldBe(1); + idxs[0].ShouldBe(2); + sliceSize.ShouldBe(2); + + // {{Left(3,2)}} + (tt, idxs, pos, _, err) = SubjectTransform.IndexPlaceHolders("{{Left(3,2)}}"); + err.ShouldBeNull(); + tt.ShouldBe(TransformType.Left); + idxs.Length.ShouldBe(1); + idxs[0].ShouldBe(3); + pos.ShouldBe(2); + + // {{Right(3,2)}} + (tt, idxs, pos, _, err) = SubjectTransform.IndexPlaceHolders("{{Right(3,2)}}"); + err.ShouldBeNull(); + tt.ShouldBe(TransformType.Right); + idxs.Length.ShouldBe(1); + idxs[0].ShouldBe(3); + pos.ShouldBe(2); + } + + [Fact] + public void SubjectTransformHelpers_ShouldTokenizeAndUntokenize() + { + // Mirror: TestSubjectTransformHelpers + + // transformUntokenize — no placeholders + var (filter, phs) = SubjectTransform.TransformUntokenize("bar"); + filter.ShouldBe("bar"); + phs.Length.ShouldBe(0); + + // transformUntokenize — dollar-sign placeholders + (filter, phs) = SubjectTransform.TransformUntokenize("foo.$2.$1"); + filter.ShouldBe("foo.*.*"); + phs.ShouldBe(["$2", "$1"]); + + // transformUntokenize — mustache placeholders + (filter, phs) = SubjectTransform.TransformUntokenize("foo.{{wildcard(2)}}.{{wildcard(1)}}"); + filter.ShouldBe("foo.*.*"); + phs.ShouldBe(["{{wildcard(2)}}", "{{wildcard(1)}}"]); + + // Strict reverse transform. + var (tr, err) = SubjectTransform.NewStrict("foo.*.*", "bar.$2.{{Wildcard(1)}}"); + err.ShouldBeNull($"NewStrict failed: {err?.Message}"); + tr.ShouldNotBeNull(); + + var subject = "foo.b.a"; + var transformed = tr!.TransformSubject(subject); + var reverse = tr.Reverse(); + reverse.ShouldNotBeNull("reverse should not be null"); + reverse!.TransformSubject(transformed).ShouldBe(subject, "reverse of transform should return original subject"); + } + + [Fact] + public void SubjectTransforms_ShouldValidateAndTransformCorrectly() + { + // Mirror: TestSubjectTransforms + void ShouldErr(string src, string dest, bool strict) + { + var (t, e) = SubjectTransform.NewWithStrict(src, dest, strict); + t.ShouldBeNull($"expected error but got transform for src={src}, dest={dest}"); + var isValid = ReferenceEquals(e, ServerErrors.ErrBadSubject) || + (e is MappingDestinationException mde && mde.Is(ServerErrors.ErrInvalidMappingDestination)); + isValid.ShouldBeTrue( + $"Expected ErrBadSubject or ErrInvalidMappingDestination for src={src}, dest={dest}, got: {e}"); + } + + ShouldErr("foo..", "bar", false); + ShouldErr("foo.*", "bar.*", false); + ShouldErr("foo.*", "bar.$2", false); + ShouldErr("foo.*", "bar.$1.>", false); + ShouldErr("foo.>", "bar.baz", false); + ShouldErr("foo.*.*", "bar.$2", true); + ShouldErr("foo.*", "foo.$foo", true); + ShouldErr("foo.*", "bar.{{Partition(2,1)}}", true); + ShouldErr("foo.*", "foo.{{wildcard(2)}}", false); + ShouldErr("foo.*", "foo.{{unimplemented(1)}}", false); + ShouldErr("foo.*", "foo.{{partition()}}", false); + ShouldErr("foo.*", "foo.{{random()}}", false); + ShouldErr("foo.*", "foo.{{wildcard(foo)}}", false); + ShouldErr("foo.*", "foo.{{wildcard()}}", false); + ShouldErr("foo.*", "foo.{{wildcard(1,2)}}", false); + ShouldErr("foo.*", "foo.{{ wildcard5) }}", false); + ShouldErr("foo.*", "foo.{{splitLeft(2,2}}", false); + ShouldErr("foo", "bla.{{wildcard(1)}}", false); + ShouldErr("foo.*", $"foo.{{{{partition({(long)int.MaxValue + 1})}}}}", false); + ShouldErr("foo.*", $"foo.{{{{random({(long)int.MaxValue + 1})}}}}", false); + + SubjectTransform? ShouldBeOK(string src, string dest, bool strict) + { + var (tr, err) = SubjectTransform.NewWithStrict(src, dest, strict); + err.ShouldBeNull($"Got error {err} for src={src}, dest={dest}"); + return tr; + } + + ShouldBeOK("foo.*", "bar.{{Wildcard(1)}}", true); + ShouldBeOK("foo.*.*", "bar.$2", false); + ShouldBeOK("foo.*.*", "bar.{{wildcard(1)}}", false); + ShouldBeOK("foo.*.*", "bar.{{partition(1)}}", false); + ShouldBeOK("foo.*.*", "bar.{{random(5)}}", false); + ShouldBeOK("foo", "bar", false); + ShouldBeOK("foo.*.bar.*.baz", "req.$2.$1", false); + ShouldBeOK("baz.>", "mybaz.>", false); + ShouldBeOK("*", "{{splitfromleft(1,1)}}", false); + ShouldBeOK("", "prefix.>", false); + ShouldBeOK("*.*", "{{partition(10,1,2)}}", false); + ShouldBeOK("foo.*.*", "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(5,1,2)}}", false); + ShouldBeOK("foo.*", $"foo.{{{{partition({int.MaxValue})}}}}", false); + ShouldBeOK("foo.*", $"foo.{{{{random({int.MaxValue})}}}}", false); + ShouldBeOK("foo.bar", $"foo.{{{{random({int.MaxValue})}}}}", false); + + void ShouldMatch(string src, string dest, string sample, params string[] expected) + { + var tr = ShouldBeOK(src, dest, false); + if (tr == null) return; + var (s, err2) = tr.Match(sample); + err2.ShouldBeNull($"Match error: {err2}"); + expected.ShouldContain(s, $"Transform {src}→{dest} on '{sample}', got '{s}'"); + } + + ShouldMatch("", "prefix.>", "foo", "prefix.foo"); + ShouldMatch("foo", "", "foo", "foo"); + ShouldMatch("foo", "bar", "foo", "bar"); + ShouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A"); + ShouldMatch("foo.*.bar.*.baz", "req.{{wildcard(2)}}.{{wildcard(1)}}", "foo.A.bar.B.baz", "req.B.A"); + ShouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3"); + ShouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3"); + ShouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo"); + ShouldMatch("*", "{{splitfromleft(1,3)}}", "12345", "123.45"); + ShouldMatch("*", "{{SplitFromRight(1,3)}}", "12345", "12.345"); + ShouldMatch("*", "{{SliceFromLeft(1,3)}}", "1234567890", "123.456.789.0"); + ShouldMatch("*", "{{SliceFromRight(1,3)}}", "1234567890", "1.234.567.890"); + ShouldMatch("*", "{{split(1,-)}}", "-abc-def--ghi-", "abc.def.ghi"); + ShouldMatch("*", "{{split(1,-)}}", "abc-def--ghi-", "abc.def.ghi"); + ShouldMatch("*.*", "{{split(2,-)}}.{{splitfromleft(1,2)}}", "foo.-abc-def--ghij-", "abc.def.ghij.fo.o"); + ShouldMatch("*", "{{right(1,1)}}", "1234", "4"); + ShouldMatch("*", "{{right(1,3)}}", "1234", "234"); + ShouldMatch("*", "{{right(1,6)}}", "1234", "1234"); + ShouldMatch("*", "{{left(1,1)}}", "1234", "1"); + ShouldMatch("*", "{{left(1,3)}}", "1234", "123"); + ShouldMatch("*", "{{left(1,6)}}", "1234", "1234"); + ShouldMatch("*", "bar.{{partition(0)}}", "baz", "bar.0"); + ShouldMatch("*", "bar.{{partition(10, 0)}}", "foo", "bar.3"); + ShouldMatch("*.*", "bar.{{partition(10)}}", "foo.bar", "bar.6"); + ShouldMatch("*", "bar.{{partition(10)}}", "foo", "bar.3"); + ShouldMatch("*", "bar.{{partition(10)}}", "baz", "bar.0"); + ShouldMatch("*", "bar.{{partition(10)}}", "qux", "bar.9"); + ShouldMatch("*", "bar.{{random(0)}}", "qux", "bar.0"); + + // random(6) — any value 0–5 is acceptable. + for (var i = 0; i < 100; i++) + ShouldMatch("*", "bar.{{random(6)}}", "qux", + "bar.0", "bar.1", "bar.2", "bar.3", "bar.4", "bar.5"); + + ShouldBeOK("foo.bar", "baz.{{partition(10)}}", false); + ShouldMatch("foo.bar", "baz.{{partition(10)}}", "foo.bar", "baz.6"); + ShouldMatch("foo.baz", "qux.{{partition(10)}}", "foo.baz", "qux.4"); + ShouldMatch("test.subject", "result.{{partition(5)}}", "test.subject", "result.0"); + } + + [Fact] + public void TransformTokenizedSubject_ShouldNotPanicOnMissingToken() + { + // Mirror: TestSubjectTransformDoesntPanicTransformingMissingToken + var (tr, err) = SubjectTransform.New("foo.*", "one.two.{{wildcard(1)}}"); + err.ShouldBeNull(); + tr.ShouldNotBeNull(); + + // Should not throw even when the token at index 1 is missing. + var result = tr!.TransformTokenizedSubject(["foo"]); + result.ShouldBe("one.two."); + } +} diff --git a/porting.db b/porting.db index b9a4b1c21d6e4aed9579e3140703bcc6b348c2fd..2e12d22104ee1a0b2135a883da6c82e6e53b9c3b 100644 GIT binary patch delta 11482 zcmb_i30M?YwywIhb$4}Db%8A1*mMIn`@TpJ5wKlAMG%)n zaw8-&8pSN9hVg$+w<}6;gWI=UZwP-oo zMY}~=>>vq(?3AuZAB!EWwu{bVX%`oWt!FL;1MUcp|`u2B#SuuIs!%#(BuXS;3hY4)LWmluC~@%=US&)$5{)lnO2WA z+{*Je`D^?a{3raI{3*VR-^I7_kMoc4i~0Nb$$SZ)&!_TH`~aS_{Ajsi`OI?B@~Y*y zWsl`4ON-?Z%OXparP4COGT0JtaagS8o8~_ApUoedUpK#CK45;@ya7B7Hh|Tj2HX$o zr+^YLgnE@aPVJ$dqFSg&%&W~c=KIZ4%q8X_=45lE*=7c&?@WI&{lWC6=_Iv?s-h~X z5!7HRo^sGdbS6EB4y8@hkEX+>9i}a&HKyg%73wqUBK<7Am2RdVrXQfE)8pwc=|9qM z)2Hb!#%=nIX_{%AX_zV96kTr`U}Cv{a9@FB5J_3VP0$Da3_b*}gBQR7?o;kvu7^9y z?dICJb=*pB0XLJY;D&SATr3yPS=gW0ui0Mq_v|^go88B@v+LQ%*d^?J>?C#+dl#F? z4rE1^V7_6#U_NHfGcPfRnBOv+mJ8+R z25P;5dcr`p7^ue$)H*%I+-;zy7^ulYy>8|)lMD|h8mLM=B@8oA1qN!Up5nI|sI3O7 zRiNtoCqBQ~@NkoX+Nh_5kp^mnf$|!t;RdSMKoucsa$wyN3Jnj9aZze`R$`z=8>mre zElPwLRb(u*!~!c2#wt|_;5?`o_84%T25Pr~+GU`gHBdVZ)D8pnI|KDw14TZA?Wu$n zF(48;qd_99hz0RB%ir}VORSPL2xOCRU@~ZdD`UU`xGEYH!9B@fKRFHdMuYKiH2&h{ zXb_=zQ$P=>CrJ=avuoJpAQpuC7XmOGWV2~(6dTMkU;&s3Dww}9e`4Na&M?QA-JlJu z11p&}W*xJVS-{L>DwyF+Hux00i^LrTyBQZFGZg(j{a5ELVl zlLt6pYyxn?%kf|!tdBK#8w7`<0IrDxj{-LVljFfCD_cOI&CmY_pN-0sI912L1>x zP;01K>ON`$RfK#J2~#I9p}_h8v`t_lPsCG2B-}?*>tF><+2G>AAQ}!G46@)qa=-|< zD+f%5$vNO5xb&R#^4cnzLT?(q8KsJ5cRpc9=97G9)&x zp*Bs3!`SuDuz%HI$8BRRaQAQ7fE|sNmT`z7p>biFkboGJUs~B%(#Vk#kUp_xECOdV z%u5q8u=Gva*uUwp&$h7{0W8}x24T~D6Vrq=jQy&O{hbcm1=)1&RtJ@Le#>T&%3mS- z7|0gP2T`zi2rz_ri||N-kj@8Ji~9F?jA_cC;S+;Stso zq>bUymgxvw+AuW>ZvG7ytax{@cazHHPIfX)BQv+)m2iWt!PIg$g2nhAMdOEL!chm< zeQ--XYN4=552&8kG6ln$^8GO7z)p5835QSj<2ZQFuKsauoZK=Qf$dE>X+jvb^Qm3z zO1+(Vo$Me@vTt$ABm@rgEk?114VZk8-KO(aLy9Lx8;b5OmP|Jby~Gi!mXNN(iXO?W zj4u)$B&?diM8hS%upn5qnYJhmm)V(CS{y}8#@(EN=adAP@8Rqd{C1U4DgYZqHutg3 zeOz;&skzVG+-K?T<3X=!Jh3w%J_{CHv_1-ZSBE`|1xMo#T4{LS`Zf)O`HI_Zn?k~( zDl*b9^n*{VS>b~k31J5FrnQ1UVEK#qq#1I@*$qq#y^i{d8U;3yz2s1$5t}tt^5sAY zMT%F*ic=@Vuxr){aQ1U_B<#9o%??a4Hxv;n2chgZSbYP95GZ|PP4$n{lJBf$IQJW? zCxB0e0>T#{ygDU!eq$XQkZOV<-&zv_&`fAW&|(DjV9}GlwU+CkZ+~k|3P9bE|0gS= zze84N2AklD?{ro;patPGkZ5%cIsTos5Z-qTDNzj`_`S7%4GDv01WiNGU~I+Y@2v$o zsV{$TjSg5515F5;B4i0E82at^)}elA#dT}07AWw+KI=JU!*y$|-GgFXBFjnU6-JT( zbq;JK%f;E;4_uKj&N`A`Y}v>@Y+j|H;Yc8D4MSlF3dWR%d?X$RBkK@8)LBAB2x(g2 zHe|p72%FQ8goXuU)nS<#A)MARj2tL<0t};Ju*z_EKg0j%u&qu(KZG*srhW)R;E70K zZ~$+Cp)MgV;FcU{L-;@-O>_wp@a$9r$pqhV3CR)Kf;|E{kr`1!QWzSS;9OUTPlmk% zgm`$gleWWz5P{FAtX{foeob|)dt!CX^6DD*>_zvxi|5Q)ylhdOyKcpj>e_twL-Q9b zaNk$$Ua}aU>7eg)SQec-Br8j)mxb|E2;rL!2O>2YNDYcCq{f5PEDRCu@~e40Lrh1%iDmVGwfb1QeJ#0Cys1 z8T`Fdn1Pjq<2h=AcSi~Xvitp2x45Rdtae_qd;TKy&po$h@j~~Q>bi+_HPy2hy5noB zYis8(UgXZmOK3fl?LzUIfqT>J7P{Rc>E^67v4ge}csX+`@kL@N8P9*q|CxWE@8OU9 z>z6XOSQG$WjJCJ^Alj6HBg8K`AOh$+XkF%Co?6C0=`C%CuYZO>G~FdOlk43ZwBnyR=Q53EyvV#G-9gbimbZVuCi=l+Mj! z6Ci)UjNb%)?M> zpEyA$cFjI9hrcyV(X#ap_UHx4H?4g%Vwxy;Q!+49>Jjk)SaYva9SRPfKO*+$HKW@{ zAzR9QOT8)T`gLia_y?UAkwJqUw-hw;?IRI3uQA`7f_LfMlrWfaL<~_z9uOrGHoO?^ zR9%F|n}cf(ih}~3NN*p3>`6kMNW%7xIE}Qi+s!jgC%H>fhL|Q)3YFMT50lCsQrb!Zb`X63eUJtp zWTY2qZUnIm$MsUl7uVrLPC==?KL=lBlJeBBgI#M~5==Eo9Q?{8C0P1r1kAWN=qa0T zYKI$n-{o^92rj%hWwTNZ-WX%x#W7AZ{D_s}BQ?)QcDRs|1f&F?Aaauuh^#Era@gng zgurx83f1u;jUqg|f^-EUeP~A{9yY{Wfc3nMkHQ6AQcAr7_Ja=aB=tD8idsm`LPtk0l|#i*c8Ui-gR9^&_yD{{$Iu}( zN8OgjfRJ^dKHguaiSB&-!y2oDM~gmM8TdxA#@6U@xLOeIsw z6oJ9aa%LfOKT{7-mcB@;DW ztx13vwXb@##w%LmwAMJIHO^{{*R;kPTI0OdcoQvF{_|P=t*@-4*3f*(|8naa{-)OW zS!?{H?*sq82I@xxb;CgY!$AFjsQN$=j=yer_`Rm<5v{RGYpm27E40SLTH_(Dv0Q5` z(;9VJqgHFwpvEw@=(E(I`rj@_@*9p_+gC&@XJ3|v(R0W*iBsf@1T}}6h$c}M6+;D6 z6!;c=j;7FAa187MTYwKN0kgn(P=E^X5x_#;AipF(u|9yVBT6ifk}p~oNLMZQTE|)jcw^5p6dw;bpX&RFgM&?1YpJZyc9o z7&-*Vt$m$xYfERe(mNDPB4P3gNmfP<0|&^~M|RoBE!`I-auY6f?PSUcSWzs7t10ES znN*?NA-0I8#6x1Acu9O$3X;fehpCrf{23`-SxHhKt109!Sf@>y#ZVC(MAar)dBTcL z(D0;4$*>)NT*V&;(Fa)vPl;4!n?yy!EQxw|FS*3lYo`BTJVjS9``gY)c34;}CKwOv z+L`(?HHE@^?5D~ZlKO+c)5_T?vNsRphd|RNIVw6|w~70kzNt;QXe%I!g*>4cCnF`B zBd%hxDIaF zDrf28*-e=ko`B$ZAA^&&%K7k{28R>&lu0b^Y%n~(RZiFQ6In4AvM-d#{!Mhn)&$&%-(GGd!9fav25r^bxo!Ak-CUyY4 zx$BO?2Ef}y;5_%&Efe6)-FK9u8YRH(od$dGvE`1!kf%C-#Z#L*3eJ za&|d#HY|Kj4u?JaWCo)3Wt7o5(fB(amK=}+hakj{&ou^qCp>jfHo+J6%V}zhDPFJu z1fwNQuzFZx^MSGS+CtIGydoZfEeon>*!1xZyb@{OW$0GqO7U z52o!mXf3#1NWTNr5J?R9!^D&?ss@~~E2>vaF#VuG=+N7Rs$SvYyyxU(>=o&D%aA_; zRU_nz{M-3eqtxh7?Ga$tq1(Hcs78m*Jwt96GUs-!$Wb==^vpfP{yd?!po#BMFny8t3)QhhsO% zk#OM=*#ZlWpylP8$!O)!mZYh=^`f~;Ztea?~&qR`VRYa*j2&A!1s38Pdg*2fALaUcG@>Fz*G<4pF<7z*Ekk_Vhtj3f%|QI z$~f=;a@v0T-q0j!CJ{ueB*gE8Yu2BwL-}QvH)M>Dc97n9g zyyKOXJ@zjEmQ{r=r1%+9S=%GH`-MUdUgCq{qO6M*HnJ&a*l2!_fk!SMOl?IQy)2`0W_&kbOU z+a5BgzjVPK0bAd&rv~hGw4Jx7>hR0|8DE7EaL!7nYO@=@V`NzN&kQLS3=Hjv)M==@ zouRbf7&z}uduqVp1#Pt$t2!K21OpC#^-X)aj^~@Zf>Q!K-nJUdlZU*Mr}C8f2kD8o z?AZYxG_Z&a@D#Kyy`!FKZ|n4gz;kcgGqtn4^7GqvOE76xOuhDxtkMW52P3Vkgfd|j z&J?a(v2TaZUx-SE?|p6OVAoYN)))V7uWAFVg2l8@o66Za?qftM(B_-csF!?e4R)Mc{}Pn$jIkaD;?WRWTXx zo^;2gV4NK#LN)$e=rF0C!B3_;Qn9c>@M4A|7!JyC^v87}RD&>3v|pIv7z&>~>Pmp+ zi5?Mtp5e&T+>1pIhiV+6>i~aYAS=@`%)hYoDab#n{3Tfq26kmSa`cQtp&F1$h*3>= ze#&$V!G$Id?5zu@{mKyIU3#9uP>snr#G_upyprV@du(1$L--RX9(DfboPfdkwjdv^5Ush(qI8+&Cd2#f4%( zNhx!tkh*%r>haD%C9pA~!-K$KYw{BOcW{lvop+JCV#ilrXa61C^o}S5j=}{M^~T6E z!da&itX5Udk8mbv#Yw(|n2wjodn8|7M?5N#l+RD_*U~b^IoIm6{B@i&P4jj`aYroD zlHX8}kb*Bva+B;Zz04T`d&fIp)A4**?hM?S`0nc%gm^qYXM*6tb43~Hd>B?9)=E7y zM>&(U)uO@F5reRCzBvg(9LDY#<$Ojbcfn|9a-gy^wIdqWo|fLKJslqHd{751DRJt? z8o&uA_oI7JK^fJPV>%L%lB5Py6H=?6-<3MM1Dt`iv_a@dK(HuZJ@P2FC338@Ghnv> zua9+lwAG=x!r5FA*xVC9Q!-Mw2@OJQTWz+nf;yRI=;c5PUCuZ3c8V4-jR+B zPeFY|XPDsJq?9=@?+I5GzL9YHD^hPf;j$$KZX`+<&0XxSt*co!r_LSkzpO!b6K?Gy zBHe!xfd*E+h_GZ5k(-Mhl5bK+GE$R-{G;|Vd!y?&{$8RWDhy(;M?dN6(qR)nb;N6C z`xbSiAZ(H^=2jPrHn@B+xZQC}Zr=u1sy0I1n_T1)nO;i>L?(f*em8O-qhH%ap9pd< zqa&ELgbDSHKqr$DB=C+Y`_$W$Gj!kyQ?qStMd7muwYVAVo6>|!%S0*4u}aYhdp)JM z&Pa^=*3Rue88L1FuHCCk@1`{2-!$Bj(E>YIcfNl|)3_}#nse(9K&8KCegC29PUo}! zr~dPU<}~G9le-!yYhMc6PU4f_=zl5R1~2ZD@1-iMy(98!(UFxA!~mY+{({FRD1*Tl<5Q02ir1&LY1OTQA_X^#=Hr;f~>C4sZhj> zIcQJJQTN1BTo;DDTcQTQo-I*>jr$@4Gg_kxw4SY9)*6N0(dZqcENdOKetnqvLn4S| zSSe3jARM>eu;%j*Sx#DLCEV>fX@*N)j>(X+pCYkFbyvfg*cS<&BL9d~rFfihZlWhH z5SzJAp-dH*9}lPS7xMMH0@r~eo@3aUBr zt!_EtBdMNZUC!`Bs>e_wDS~P`tfmckHkYP(3Igy@cs9*5Serg*{_RQkRJB#ev_dcO H{3QJ!7`>pA delta 9432 zcmb_hd3;k<_J3R6lDFk`p(JgZgp!u7X`3z-C~ZmEY0Hi)6cs^KWYbY_ENN-M4no;- zutlgK`xa39W*ih`aS(xl(QzDSMl1SrUy8V|zkAy_{b-xN~y?U>DgBnnCRJ+P3Unp-XrFJz{_bTx-7cQQ4M}kVHkn`?a zJY&&f-5X?QThqvvfI*XS!G#ndDya;zLTC#Po=Br5%f^Hh|Xd5G4TEXDg!>uqRQcDCRG3fvZ+KX zokLj7+=A9Ds+zYfCV2cEveGFBoEbnR!w(r$huS;wtf_=PDxE4IRf%?z-RXD}Vsg)q zeE`a9$z*VQDGBNaP$M8Iok{}6Pubvi1ylz-l}}BAib85WtjMP_NH;uPh(P1>sbYvJ zqB5Z%pGt>GMU)fzqEAk#w_-)S?weK5@lt0fSeZdR0$rKZ9=#@;dWNzP@6#1DEY77? z#fXJsmY5EwydNJr_ z(Q6s@$v-4pXAuGsCb$oj^X!|JW%L*F!}QJ4I3g^#`4A7D`LXw7XC>Jh)0)S}5M=zy z89q6dB2)CE>)5dre7Vr#hVdbmhpdgP2QF@;c*WLStr4V0_Q@WSOo5gVn*$4D6AT;& zmqTnibR|-}DmRZq64M**^~vc7sL$WTrjoF2jVY0X>`kl{PCd>d5vOQgo!UGW!Sm{8 z`{X<<_m{`nPkN!{Ha5=$3e95>&=;KKlYJOCrH$R%3)puvn~M$L^!g{*M<~5OcYZ+H z>a&__5LZI|ET0^Yx$fA={uSyUk1@>7FeO(tS0PMV{UV>7hB5ts{h*g*WN?-N)W>gT z^GUC4xr2}hFQH78-jfE4!|9b|tF?$f!@b8PS>8cE;6MGH?d<)etzmBSR3tm4aWHZl z_V11R*cBxF?N%z{H|9XM-)?B0f{^ycLZ6&~k(CG7B_v!v*^Rt>knImG-zHmO#CQBs zecBFoGGnV3o2Oyk(J5Y z*i)q$ z(2GYe0llm`(&&YQX zQy{OFoPf8A3jedyURW7*!H8?42`$ltHPM9C`t)6P52@F^A+1R0e>WlDO581t7x!A< zR~{39dxl+YdCxL}*+Kt852x0VpO8LmL$xH{O5F`YvKrpW%@=K}4h_V5TyDDXNYZiZ_dG1xLSx%0a z`NCDn3tdN(1R1Odn}nvW`$Im9_FQX@OWGd?QSu zipV;?oGY>XYPpFyPwQ*XsyC5(d4gO`#|8y(BJmzMGXWc0?20E~RUdf(EIQzH!}&zy z{eRBpHj!Ou&9Bc6kZd?vMvUprelq&LzqM{OFJ+c=bamH8z+4i3pr2XWE?x z*hxS`qMQ%g*H{d&1{H1Sj+ys^yjdTfBtK8t>dPP=?TzgE!DyX$Th7qr*COXr3{qnp zkp&QkFopG*DE=6O;{PhF`&cl2u_;#enz%6#Yv86`!^Oi#u}I*;bCCpiHx7Z9ofQQQ zVvwm5P!tN0$YNI^0r4*RR@l}gy3O=k=Rzs@=GLTs(D^iFQGB2whTeG<5B_+$2=d=Z z#D<8lHeOCMV>b|N2-}_Ca3Nlv07nv)NWu*7BWD@gy~! zxe`Z0u1Cty=Ts;n75Y}O)kW`FIS{oY$Yu zadpwivC)Jv(S*^_gi+CikO0mdjTB1v-haUftwT|a%H)7thI1-=qcY8mk*&eO!~BrqfKwY4 zuNly?)=~uaBd{NXf8D5zy&ha*;Ivy>halWYgd2%BK=w$;-KZqMl@_HEo@^35a83CA zYn60!3m*eisH) zDY|Qi@&F0_&d0h#SSoS@2Pbwaxu)pM)^dan1pNUyfJF2xQ}9p*TAm z=7p7HIJ;NTdwJeasqa%F7p>Ad9C4L23=U))CB#?zlr&U&9qZ+I4n`eN`kNN|TPu-; znaILSY+?NYC2*aic({5%$;LH@KvGZ<)H-&Rm(!FQytQ>CVw@xgvhy(G**(f!_}ib8 zu7#I_nR}5R@X~BD*Fq@dqXhe`a#8+T>H=F>8K>OJ*;F|VuI6JMuBQ@ROzq6xdB3A>^RJEI9Zq6tq$6Y91{Q?^ADo{T26 zM-#S26ShPXo`@!Fju7;7zbF$)8-EYChhy0OYM$bgC(0A?3H>0c?Rg58~y^AWXydiu(aa!x<_XLTqeN=u1LSIlTzxua*13+?h5T*e*!}VNkilp-RW3gVGad+O(AX6Vd_S@U%z2I@h*| zgaxRh*S|lhzF~pS=h;x~6=!PmkhH58}7k_-!hF8;;)e85VjEN!Gwt zD_zMb{Ruya0zFiJeh4+2q)^9HXSFBml|!k6WYihjaZx2(IHizy1sMdZYHcsTTjMQr zdyF5~SLssTRxT*dEBlo;rCE7MnWIcoMxaaV@Kh~Uzr4hoY|)~20Qs8!9ZPr6y5%A7 zze&AWqW6&uTJAZx`E&XoV7t%uIYpIq%(s!9I@zV2A@!Hvp^wY@qcgPM=TZX+;9E40 zYvA=k2;S0Ij5Y^RA(zM%`1~H*qD_YS+v8H<;x~=Ey@F#ufMRTCEb+EwS~If5$qlzy#me zSb)K02yE0+N7iW*VcqK9;6dxP!6vw((T~AC1UAZ(HS4uuaAanDkK{onuDCHDmHV|cZ4*nDe4e!`dIk`=nWLNqo|^(y-*&;?~Awv|EdZ zWd~7g?`hHczxG&kop(F1EolHP$9yI`R+wHeN zd%3a&cgAC2K)Iqw^lkJ6x}5gW$+QgaOVR+n<4OA+mgrvj|2W_Jn9=V#VBZvziR_6X zmJrHU@)hZ4sX&}%-7Nf7p!h;=4*R6#f`w#?>FLz{)kOEItz0r5}*ww|(&gcCp7Qz3lL9tV9- z+ass*=+F`jEJna$44iq|J{lSxk3n;gm+UIMe%fAQN~{Szgn@$*a4-g@zGScJk*L7d zm+X;KIS_gf1N{hSbhuYuvJZ##Z>SN8xBaEt#?hhQU8g$y7o^B>`Sl*BQINo8$i9a!h&ozj0XLIZQXMs?njvzh!frML$n^ zi@IKs1?n)}U>hDw04v``n7W`JjZRhN{5W77#68ux=A~Dr}IcFDQNkZp4 z?SGe0J*vebg-hisc_|Lur=Qv%hi%P$(V=+W!-M|0Jqc!fW=EajXZCDppO%0IS~k-S zRYpz(ipN6Nr}n-u@pF_669b-WoMW(g2(My%0BH!|B^6II@kL+QD`6NLlM0=uJ%Xv} z#24n8xKOT*-Nkm<%c=dSFv)c|sr@8)YE$yyhFr&FTQD1pd~=~X)Tr5yNEqIOxoCL{fP;7>S4Gre-zn z&vy(pPL}IL$HIYpN1-XY42%rSL>xv@@Ke5H5d7^uPek;+e#bx)SOP}kr6Hj4{NtG4 zQPc~VRp2NJ#k&Q4UV&r1$jM$}8G1+z<(+ICES&GX2iiO7#c;XY0s8t1$Fo-GzeUzy z#1{D+qr6Q{*7It~@o1oj#_uauRy)uuQD0e|^w1SI^BrNO77WB5Aa5I&^#*@Cm$`Xe51(3viKajHE!%K4x@h;1ndm=?S}T~5Qm`$jvTfXkQAqn2w8S?L&OWCnUmTR+@9Er>KvOU5|w z?$wZKW1VQGg!(3Z!p~@qWvfqZ%Rs_2>ru013}fD?a(0;GBEyP{HBJUzsCGu4SlQar z5!w^1L+iw*{95hY(aUCpj*R(#*ERrIQFB+iFuPqbFOIxm)Ue&j*bQPcp zT%%yPb+F+iiX4HnXc&H;)`lxxU!#EhiVbD|;Ce9fl+1|2-c_z#W4aYHsVx^RJsU;K z7%F9E6L6%-Rcj984eY=bE*t#uqr@~-Yx5zgDZ!=b=G5rQD%TD?(TcRXCmwS}9Pen$ zK|oi%9W~|H@$_m}xEJv4)vf`Cc6C8pHUee_QzG$9UgHY(#8ZO{Yg~~hEYsSukeQh% zPmH*~w8Aw979K#)fQ-2R`lw5Q+gGBy68p5H#YN87_-aBX@@dh}tl-A7cUe|E8{@v8 z!FBhjo|0LHR~UDs1V%M-9!$5m&B`}sdLy2Y5u;JPG8!!*;2mugIghYz8K$xBfzYs^ zk5Q*P!Jz$RSEPzXocIasf8(@54(A>W{+m75IIVDii|9nf`Dkp~Kx|bWF7;&?&%3#v zQqU;TW!SYFSAl!_TN%kuEW`WrVf_N{o<-rsjqEP(tFC9mmAwe__aZBoJ(pmGyk)jB zJYK9J#rovNN!a2UXuRfG5@Uy7pV#77{4BfmpJmU0ddo)sRsPkr74dpmi)R4|c`c6r zL30;SFi#=67z**lxO<&vg4r1v7lL=aXEg4LRy?rYgI@j6D@A``eg8kKO27pYX(7}S z2Rb% z&lQAzj9k;h;1SI1N6bd)Gd9ax83l&pS>DKfRR+Jm&MqMv+4URI0yJhvj8lhzQU3KT z`&2es_J83RniE+&{N3%%g(q^nZn(T3HKd(6-Wt;>0Wj)5