diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/IpQueue.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/IpQueue.cs
new file mode 100644
index 0000000..ac1d4bc
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/IpQueue.cs
@@ -0,0 +1,265 @@
+// Copyright 2021-2025 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/ipqueue.go in the NATS server Go source.
+
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+
+namespace ZB.MOM.NatsNet.Server.Internal;
+
+///
+/// Error singletons for IpQueue limit violations.
+/// Mirrors errIPQLenLimitReached and errIPQSizeLimitReached.
+///
+public static class IpQueueErrors
+{
+ public static readonly Exception LenLimitReached =
+ new InvalidOperationException("IPQ len limit reached");
+
+ public static readonly Exception SizeLimitReached =
+ new InvalidOperationException("IPQ size limit reached");
+}
+
+///
+/// Generic intra-process queue with a one-slot notification channel.
+/// Mirrors ipQueue[T] in server/ipqueue.go.
+///
+public sealed class IpQueue
+{
+ /// Default maximum size of the recycled backing-list capacity.
+ public const int DefaultMaxRecycleSize = 4 * 1024;
+
+ private long _inprogress;
+ private readonly object _lock = new();
+
+ // Backing list with a logical start position (mirrors slice + pos).
+ private List? _elts;
+ private int _pos;
+ private ulong _sz;
+
+ private readonly string _name;
+ private readonly ConcurrentDictionary? _registry;
+
+ // One-slot notification channel (mirrors chan struct{} with capacity 1).
+ private readonly Channel _ch;
+
+ // Single-slot list pool to amortise allocations.
+ private List? _pooled;
+
+ // Options
+ /// Maximum list capacity to allow recycling.
+ public int MaxRecycleSize { get; }
+
+ private readonly Func? _calc;
+ private readonly ulong _msz; // size limit
+ private readonly int _mlen; // length limit
+
+ /// Notification channel reader — wait on this to learn items were added.
+ public ChannelReader Ch => _ch.Reader;
+
+ ///
+ /// Creates a new queue, optionally registering it in .
+ /// Mirrors newIPQueue.
+ ///
+ public IpQueue(
+ string name,
+ ConcurrentDictionary? registry = null,
+ int maxRecycleSize = DefaultMaxRecycleSize,
+ Func? sizeCalc = null,
+ ulong maxSize = 0,
+ int maxLen = 0)
+ {
+ MaxRecycleSize = maxRecycleSize;
+ _calc = sizeCalc;
+ _msz = maxSize;
+ _mlen = maxLen;
+ _name = name;
+ _registry = registry;
+
+ _ch = Channel.CreateBounded(new BoundedChannelOptions(1)
+ {
+ FullMode = BoundedChannelFullMode.DropWrite,
+ SingleReader = false,
+ SingleWriter = false,
+ });
+
+ registry?.TryAdd(name, this);
+ }
+
+ ///
+ /// Adds an element to the queue.
+ /// Returns the new logical length and an error if a limit was hit.
+ /// Mirrors ipQueue.push.
+ ///
+ public (int len, Exception? error) Push(T e)
+ {
+ bool shouldSignal;
+ int resultLen;
+
+ lock (_lock)
+ {
+ var l = (_elts?.Count ?? 0) - _pos;
+
+ if (_mlen > 0 && l == _mlen)
+ return (l, IpQueueErrors.LenLimitReached);
+
+ if (_calc != null)
+ {
+ var sz = _calc(e);
+ if (_msz > 0 && _sz + sz > _msz)
+ return (l, IpQueueErrors.SizeLimitReached);
+ _sz += sz;
+ }
+
+ if (_elts == null)
+ {
+ _elts = _pooled ?? new List(32);
+ _pooled = null;
+ _pos = 0;
+ }
+
+ _elts.Add(e);
+ resultLen = _elts.Count - _pos;
+ shouldSignal = l == 0;
+ }
+
+ if (shouldSignal)
+ _ch.Writer.TryWrite(true);
+
+ return (resultLen, null);
+ }
+
+ ///
+ /// Returns all pending elements and empties the queue.
+ /// Increments the in-progress counter by the returned count.
+ /// Mirrors ipQueue.pop.
+ ///
+ public T[]? Pop()
+ {
+ lock (_lock)
+ {
+ if (_elts == null) return null;
+ var count = _elts.Count - _pos;
+ if (count == 0) return null;
+
+ var result = _pos == 0
+ ? _elts.ToArray()
+ : _elts.GetRange(_pos, count).ToArray();
+
+ Interlocked.Add(ref _inprogress, result.Length);
+ _elts = null;
+ _pos = 0;
+ _sz = 0;
+ return result;
+ }
+ }
+
+ ///
+ /// Returns the first pending element without bulk-removing the rest.
+ /// Does NOT affect the in-progress counter.
+ /// Re-signals the notification channel if more elements remain.
+ /// Mirrors ipQueue.popOne.
+ ///
+ public (T value, bool ok) PopOne()
+ {
+ lock (_lock)
+ {
+ if (_elts == null || _elts.Count - _pos == 0)
+ return (default!, false);
+
+ var e = _elts[_pos];
+ var remaining = _elts.Count - _pos - 1;
+
+ if (_calc != null)
+ _sz -= _calc(e);
+
+ if (remaining > 0)
+ {
+ _pos++;
+ _ch.Writer.TryWrite(true); // re-signal: more items pending
+ }
+ else
+ {
+ // All consumed — try to pool the backing list.
+ if (_elts.Capacity <= MaxRecycleSize)
+ {
+ _elts.Clear();
+ _pooled = _elts;
+ }
+ _elts = null;
+ _pos = 0;
+ _sz = 0;
+ }
+
+ return (e, true);
+ }
+ }
+
+ ///
+ /// Returns the array obtained via to the pool and
+ /// decrements the in-progress counter.
+ /// Mirrors ipQueue.recycle.
+ ///
+ public void Recycle(T[]? elts)
+ {
+ if (elts == null || elts.Length == 0) return;
+ Interlocked.Add(ref _inprogress, -elts.Length);
+ }
+
+ /// Returns the current logical queue length. Mirrors ipQueue.len.
+ public int Len()
+ {
+ lock (_lock) return (_elts?.Count ?? 0) - _pos;
+ }
+
+ ///
+ /// Returns the total calculated size (only meaningful when a size-calc function was provided).
+ /// Mirrors ipQueue.size.
+ ///
+ public ulong Size()
+ {
+ lock (_lock) return _sz;
+ }
+
+ ///
+ /// Empties the queue and consumes any pending notification signal.
+ /// Returns the number of items drained.
+ /// Mirrors ipQueue.drain.
+ ///
+ public int Drain()
+ {
+ lock (_lock)
+ {
+ var count = (_elts?.Count ?? 0) - _pos;
+ _elts = null;
+ _pos = 0;
+ _sz = 0;
+ _ch.Reader.TryRead(out _); // consume signal
+ return count;
+ }
+ }
+
+ ///
+ /// Returns the number of elements currently being processed (popped but not yet recycled).
+ /// Mirrors ipQueue.inProgress.
+ ///
+ public long InProgress() => Interlocked.Read(ref _inprogress);
+
+ ///
+ /// Removes this queue from the server registry.
+ /// Push/pop operations remain valid.
+ /// Mirrors ipQueue.unregister.
+ ///
+ public void Unregister() => _registry?.TryRemove(_name, out _);
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs
new file mode 100644
index 0000000..e621cc7
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/MsgScheduling.cs
@@ -0,0 +1,431 @@
+// Copyright 2025 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/scheduler.go in the NATS server Go source.
+
+using System.Buffers.Binary;
+using ZB.MOM.NatsNet.Server.Internal.DataStructures;
+
+namespace ZB.MOM.NatsNet.Server.Internal;
+
+///
+/// Error for when we try to decode a binary-encoded message schedule with an unknown version number.
+/// Mirrors ErrMsgScheduleInvalidVersion.
+///
+public static class MsgSchedulingErrors
+{
+ public static readonly Exception ErrMsgScheduleInvalidVersion =
+ new InvalidOperationException("msg scheduling: encoded version not known");
+}
+
+///
+/// A single scheduled message entry.
+/// Mirrors the unnamed struct in the schedules map in scheduler.go.
+///
+internal sealed class MsgSchedule
+{
+ public ulong Seq;
+ public long Ts;
+}
+
+///
+/// Tracks per-subject scheduled messages using a hash wheel for TTL management.
+/// Mirrors MsgScheduling in server/scheduler.go.
+/// Note: getScheduledMessages is deferred to session 08/19 (requires JetStream types).
+///
+public sealed class MsgScheduling
+{
+ private const int HeaderLen = 17; // 1 magic + 2 × uint64
+
+ private readonly Action _run;
+ private readonly HashWheel _ttls;
+ private Timer? _timer;
+ // _running is set to true by the run callback when getScheduledMessages is active (session 08/19).
+#pragma warning disable CS0649
+ private bool _running;
+#pragma warning restore CS0649
+ private long _deadline;
+ private readonly Dictionary _schedules;
+ private readonly Dictionary _seqToSubj;
+ private readonly HashSet _inflight;
+
+ ///
+ /// Creates a new with the given callback.
+ /// Mirrors newMsgScheduling.
+ ///
+ public MsgScheduling(Action run)
+ {
+ _run = run;
+ _ttls = HashWheel.NewHashWheel();
+ _schedules = new Dictionary();
+ _seqToSubj = new Dictionary();
+ _inflight = new HashSet();
+ }
+
+ ///
+ /// Adds a schedule entry and resets the timer.
+ /// Mirrors MsgScheduling.add.
+ ///
+ public void Add(ulong seq, string subj, long ts)
+ {
+ Init(seq, subj, ts);
+ ResetTimer();
+ }
+
+ ///
+ /// Inserts or updates the schedule entry for the given subject.
+ /// Mirrors MsgScheduling.init.
+ ///
+ public void Init(ulong seq, string subj, long ts)
+ {
+ if (_schedules.TryGetValue(subj, out var sched))
+ {
+ _seqToSubj.Remove(sched.Seq);
+ _ttls.Remove(sched.Seq, sched.Ts);
+ _ttls.Add(seq, ts);
+ sched.Ts = ts;
+ sched.Seq = seq;
+ }
+ else
+ {
+ _ttls.Add(seq, ts);
+ _schedules[subj] = new MsgSchedule { Seq = seq, Ts = ts };
+ }
+ _seqToSubj[seq] = subj;
+ _inflight.Remove(subj);
+ }
+
+ ///
+ /// Updates the timestamp for an existing schedule without changing the sequence.
+ /// Mirrors MsgScheduling.update.
+ ///
+ public void Update(string subj, long ts)
+ {
+ if (!_schedules.TryGetValue(subj, out var sched)) return;
+ _ttls.Remove(sched.Seq, sched.Ts);
+ _ttls.Add(sched.Seq, ts);
+ sched.Ts = ts;
+ _inflight.Remove(subj);
+ ResetTimer();
+ }
+
+ ///
+ /// Marks a subject as in-flight (being processed).
+ /// Mirrors MsgScheduling.markInflight.
+ ///
+ public void MarkInflight(string subj)
+ {
+ if (_schedules.ContainsKey(subj))
+ _inflight.Add(subj);
+ }
+
+ ///
+ /// Returns true if the subject is currently in-flight.
+ /// Mirrors MsgScheduling.isInflight.
+ ///
+ public bool IsInflight(string subj) => _inflight.Contains(subj);
+
+ ///
+ /// Removes the schedule entry for the given sequence number.
+ /// Mirrors MsgScheduling.remove.
+ ///
+ public void Remove(ulong seq)
+ {
+ if (!_seqToSubj.TryGetValue(seq, out var subj)) return;
+ _seqToSubj.Remove(seq);
+ _schedules.Remove(subj);
+ }
+
+ ///
+ /// Removes the schedule entry for the given subject.
+ /// Mirrors MsgScheduling.removeSubject.
+ ///
+ public void RemoveSubject(string subj)
+ {
+ if (!_schedules.TryGetValue(subj, out var sched)) return;
+ _ttls.Remove(sched.Seq, sched.Ts);
+ _schedules.Remove(subj);
+ _seqToSubj.Remove(sched.Seq);
+ }
+
+ ///
+ /// Clears all in-flight markers.
+ /// Mirrors MsgScheduling.clearInflight.
+ ///
+ public void ClearInflight() => _inflight.Clear();
+
+ ///
+ /// Arms or resets the internal timer to fire at the next scheduled expiration.
+ /// Mirrors MsgScheduling.resetTimer.
+ ///
+ public void ResetTimer()
+ {
+ if (_running) return;
+
+ var next = _ttls.GetNextExpiration(long.MaxValue);
+ if (next == long.MaxValue)
+ {
+ ClearTimer(ref _timer);
+ return;
+ }
+
+ // Convert nanosecond timestamp to DateTime (1 tick = 100 ns).
+ var nextTicks = DateTime.UnixEpoch.Ticks + next / 100L;
+ var nextUtc = new DateTime(nextTicks, DateTimeKind.Utc);
+ var fireIn = nextUtc - DateTime.UtcNow;
+
+ // Clamp minimum interval.
+ if (fireIn < TimeSpan.FromMilliseconds(250))
+ fireIn = TimeSpan.FromMilliseconds(250);
+
+ var deadline = DateTime.UtcNow.Ticks + fireIn.Ticks;
+ if (_deadline > 0 && deadline > _deadline) return;
+
+ _deadline = deadline;
+ if (_timer != null)
+ _timer.Change(fireIn, Timeout.InfiniteTimeSpan);
+ else
+ _timer = new Timer(_ => _run(), null, fireIn, Timeout.InfiniteTimeSpan);
+ }
+
+ // getScheduledMessages is deferred to session 08/19 — requires JetStream inMsg, StoreMsg types.
+
+ ///
+ /// Encodes the current schedule state to a binary snapshot.
+ /// Mirrors MsgScheduling.encode.
+ ///
+ public byte[] Encode(ulong highSeq)
+ {
+ var count = (ulong)_schedules.Count;
+ var buf = new List(HeaderLen + (int)(count * 20));
+
+ buf.Add(1); // magic version
+ AppendUInt64(buf, count);
+ AppendUInt64(buf, highSeq);
+
+ foreach (var (subj, sched) in _schedules)
+ {
+ var slen = (ushort)Math.Min((ulong)subj.Length, ushort.MaxValue);
+ AppendUInt16(buf, slen);
+ buf.AddRange(System.Text.Encoding.Latin1.GetBytes(subj[..slen]));
+ AppendVarint(buf, sched.Ts);
+ AppendUvarint(buf, sched.Seq);
+ }
+
+ return [.. buf];
+ }
+
+ ///
+ /// Decodes a binary snapshot into the current schedule.
+ /// Returns the high-sequence stamp or throws on error.
+ /// Mirrors MsgScheduling.decode.
+ ///
+ public (ulong highSeq, Exception? err) Decode(byte[] b)
+ {
+ if (b.Length < HeaderLen)
+ return (0, new System.IO.EndOfStreamException("short buffer"));
+
+ if (b[0] != 1)
+ return (0, MsgSchedulingErrors.ErrMsgScheduleInvalidVersion);
+
+ var count = BinaryPrimitives.ReadUInt64LittleEndian(b.AsSpan(1));
+ var stamp = BinaryPrimitives.ReadUInt64LittleEndian(b.AsSpan(9));
+ var offset = HeaderLen;
+
+ for (ulong i = 0; i < count; i++)
+ {
+ if (offset + 2 > b.Length)
+ return (0, new System.IO.EndOfStreamException("unexpected EOF"));
+
+ var sl = BinaryPrimitives.ReadUInt16LittleEndian(b.AsSpan(offset));
+ offset += 2;
+
+ if (offset + sl > b.Length)
+ return (0, new System.IO.EndOfStreamException("unexpected EOF"));
+
+ var subj = System.Text.Encoding.Latin1.GetString(b, offset, sl);
+ offset += sl;
+
+ var (ts, tn) = ReadVarint(b, offset);
+ if (tn < 0) return (0, new System.IO.EndOfStreamException("unexpected EOF"));
+ offset += tn;
+
+ var (seq, vn) = ReadUvarint(b, offset);
+ if (vn < 0) return (0, new System.IO.EndOfStreamException("unexpected EOF"));
+ offset += vn;
+
+ Init(seq, subj, ts);
+ }
+
+ return (stamp, null);
+ }
+
+ ///
+ /// Parses a message schedule pattern and returns the next fire time,
+ /// whether it repeats, and whether the pattern was valid.
+ /// Mirrors parseMsgSchedule.
+ ///
+ public static (DateTime next, bool repeat, bool ok) ParseMsgSchedule(string pattern, long ts)
+ {
+ if (pattern == string.Empty)
+ return (default, false, true);
+
+ if (pattern.StartsWith("@at ", StringComparison.Ordinal))
+ {
+ if (DateTime.TryParseExact(
+ pattern[4..],
+ "yyyy-MM-ddTHH:mm:ssK",
+ System.Globalization.CultureInfo.InvariantCulture,
+ System.Globalization.DateTimeStyles.AdjustToUniversal,
+ out var t))
+ return (t, false, true);
+ return (default, false, false);
+ }
+
+ if (pattern.StartsWith("@every ", StringComparison.Ordinal))
+ {
+ if (!TryParseDuration(pattern[7..], out var dur))
+ return (default, false, false);
+
+ if (dur.TotalSeconds < 1)
+ return (default, false, false);
+
+ // Advance past a stale next tick the same way Go does.
+ var prev = DateTimeOffset.FromUnixTimeMilliseconds(ts / 1_000_000).UtcDateTime;
+ var next = RoundToSecond(prev).Add(dur);
+ var now = RoundToSecond(DateTime.UtcNow);
+ if (next < now)
+ next = now.Add(dur);
+
+ return (next, true, true);
+ }
+
+ return (default, false, false);
+ }
+
+ // -------------------------------------------------------------------------
+ // Private helpers
+ // -------------------------------------------------------------------------
+
+ private static void ClearTimer(ref Timer? timer)
+ {
+ var t = timer;
+ if (t == null) return;
+ t.Dispose();
+ timer = null;
+ }
+
+ private static DateTime RoundToSecond(DateTime dt) =>
+ new(dt.Year, dt.Month, dt.Day, dt.Hour, dt.Minute, dt.Second, DateTimeKind.Utc);
+
+ // Naive duration parser for strings like "1s", "500ms", "2m", "1h30m".
+ private static bool TryParseDuration(string s, out TimeSpan result)
+ {
+ result = default;
+ if (s.EndsWith("ms", StringComparison.Ordinal) &&
+ double.TryParse(s[..^2], out var ms))
+ {
+ result = TimeSpan.FromMilliseconds(ms);
+ return true;
+ }
+ if (s.EndsWith('s') && double.TryParse(s[..^1], out var sec))
+ {
+ result = TimeSpan.FromSeconds(sec);
+ return true;
+ }
+ if (s.EndsWith('m') && double.TryParse(s[..^1], out var min))
+ {
+ result = TimeSpan.FromMinutes(min);
+ return true;
+ }
+ if (s.EndsWith('h') && double.TryParse(s[..^1], out var hr))
+ {
+ result = TimeSpan.FromHours(hr);
+ return true;
+ }
+ // Try .NET TimeSpan.Parse as a fallback.
+ return TimeSpan.TryParse(s, out result);
+ }
+
+ // -------------------------------------------------------------------------
+ // Binary encoding helpers (mirrors encoding/binary in Go)
+ // -------------------------------------------------------------------------
+
+ private static void AppendUInt64(List buf, ulong v)
+ {
+ Span tmp = stackalloc byte[8];
+ BinaryPrimitives.WriteUInt64LittleEndian(tmp, v);
+ buf.AddRange(tmp.ToArray());
+ }
+
+ private static void AppendUInt16(List buf, ushort v)
+ {
+ Span tmp = stackalloc byte[2];
+ BinaryPrimitives.WriteUInt16LittleEndian(tmp, v);
+ buf.AddRange(tmp.ToArray());
+ }
+
+ /// Appends a zigzag-encoded signed varint (mirrors binary.AppendVarint).
+ private static void AppendVarint(List buf, long x)
+ {
+ var ux = (ulong)(x << 1);
+ if (x < 0) ux = ~ux;
+ AppendUvarint(buf, ux);
+ }
+
+ /// Appends an unsigned varint (mirrors binary.AppendUvarint).
+ private static void AppendUvarint(List buf, ulong x)
+ {
+ while (x >= 0x80)
+ {
+ buf.Add((byte)(x | 0x80));
+ x >>= 7;
+ }
+ buf.Add((byte)x);
+ }
+
+ ///
+ /// Reads a zigzag signed varint from starting at .
+ /// Returns (value, bytesRead); bytesRead is negative on overflow.
+ ///
+ private static (long value, int n) ReadVarint(byte[] b, int offset)
+ {
+ var (ux, n) = ReadUvarint(b, offset);
+ var x = (long)(ux >> 1);
+ if ((ux & 1) != 0) x = ~x;
+ return (x, n);
+ }
+
+ ///
+ /// Reads an unsigned varint from starting at .
+ /// Returns (value, bytesRead); bytesRead is negative on overflow.
+ ///
+ private static (ulong value, int n) ReadUvarint(byte[] b, int offset)
+ {
+ ulong x = 0;
+ var s = 0;
+ for (var i = offset; i < b.Length; i++)
+ {
+ var by = b[i];
+ if (i - offset == 10) return (0, -(i - offset + 1)); // overflow
+ if (by < 0x80)
+ {
+ if (i - offset == 9 && by > 1) return (0, -(i - offset + 1));
+ return (x | ((ulong)by << s), i - offset + 1);
+ }
+ x |= (ulong)(by & 0x7F) << s;
+ s += 7;
+ }
+ return (0, 0); // short buffer
+ }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/ServerUtilities.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/ServerUtilities.cs
new file mode 100644
index 0000000..88ed9be
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/ServerUtilities.cs
@@ -0,0 +1,437 @@
+// Copyright 2012-2025 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/util.go in the NATS server Go source.
+
+using System.Net;
+using System.Text.RegularExpressions;
+
+namespace ZB.MOM.NatsNet.Server.Internal;
+
+///
+/// General-purpose server utility methods.
+/// Mirrors server/util.go.
+///
+public static class ServerUtilities
+{
+ // Semver validation regex — mirrors semVerRe in const.go.
+ private static readonly Regex SemVerRe = new(
+ @"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$",
+ RegexOptions.Compiled);
+
+ // -------------------------------------------------------------------------
+ // Version helpers
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Parses a semver string into major/minor/patch components.
+ /// Returns an error if the string is not a valid semver.
+ /// Mirrors versionComponents.
+ ///
+ public static (int major, int minor, int patch, Exception? err) VersionComponents(string version)
+ {
+ var m = SemVerRe.Match(version);
+ if (!m.Success)
+ return (0, 0, 0, new InvalidOperationException("invalid semver"));
+
+ if (!int.TryParse(m.Groups[1].Value, out var major) ||
+ !int.TryParse(m.Groups[2].Value, out var minor) ||
+ !int.TryParse(m.Groups[3].Value, out var patch))
+ return (-1, -1, -1, new InvalidOperationException("invalid semver component"));
+
+ return (major, minor, patch, null);
+ }
+
+ ///
+ /// Returns (true, nil) if is at least major.minor.patch.
+ /// Mirrors versionAtLeastCheckError.
+ ///
+ public static (bool ok, Exception? err) VersionAtLeastCheckError(
+ string version, int emajor, int eminor, int epatch)
+ {
+ var (major, minor, patch, err) = VersionComponents(version);
+ if (err != null) return (false, err);
+
+ if (major > emajor) return (true, null);
+ if (major == emajor && minor > eminor) return (true, null);
+ if (major == emajor && minor == eminor && patch >= epatch) return (true, null);
+ return (false, null);
+ }
+
+ ///
+ /// Returns true if is at least major.minor.patch.
+ /// Mirrors versionAtLeast.
+ ///
+ public static bool VersionAtLeast(string version, int emajor, int eminor, int epatch)
+ {
+ var (ok, _) = VersionAtLeastCheckError(version, emajor, eminor, epatch);
+ return ok;
+ }
+
+ // -------------------------------------------------------------------------
+ // Integer parsing helpers (used for NATS protocol parsing)
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Parses a decimal positive integer from ASCII bytes.
+ /// Returns -1 on error or if the input contains non-digit characters.
+ /// Mirrors parseSize.
+ ///
+ public static int ParseSize(ReadOnlySpan d)
+ {
+ const int MaxParseSizeLen = 9; // 999M
+
+ if (d.IsEmpty || d.Length > MaxParseSizeLen) return -1;
+
+ var n = 0;
+ foreach (var dec in d)
+ {
+ if (dec < '0' || dec > '9') return -1;
+ n = n * 10 + (dec - '0');
+ }
+ return n;
+ }
+
+ ///
+ /// Parses a decimal positive int64 from ASCII bytes.
+ /// Returns -1 on error.
+ /// Mirrors parseInt64.
+ ///
+ public static long ParseInt64(ReadOnlySpan d)
+ {
+ if (d.IsEmpty) return -1;
+
+ long n = 0;
+ foreach (var dec in d)
+ {
+ if (dec < '0' || dec > '9') return -1;
+ n = n * 10 + (dec - '0');
+ }
+ return n;
+ }
+
+ // -------------------------------------------------------------------------
+ // Duration / network helpers
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Converts float64 seconds to a .
+ /// Mirrors secondsToDuration.
+ ///
+ public static TimeSpan SecondsToDuration(double seconds) =>
+ TimeSpan.FromSeconds(seconds);
+
+ ///
+ /// Splits "host:port" into components, using
+ /// when no port (or port 0 / -1) is present.
+ /// Mirrors parseHostPort.
+ ///
+ public static (string host, int port, Exception? err) ParseHostPort(string hostPort, int defaultPort)
+ {
+ if (string.IsNullOrEmpty(hostPort))
+ return ("", -1, new InvalidOperationException("no hostport specified"));
+
+ // Try splitting; if port is missing, append the default and retry.
+ string host, sPort;
+ try
+ {
+ var ep = ParseEndpoint(hostPort);
+ host = ep.host;
+ sPort = ep.port;
+ }
+ catch
+ {
+ try
+ {
+ var ep = ParseEndpoint($"{hostPort}:{defaultPort}");
+ host = ep.host;
+ sPort = ep.port;
+ }
+ catch (Exception ex)
+ {
+ return ("", -1, ex);
+ }
+ }
+
+ if (!int.TryParse(sPort.Trim(), out var port))
+ return ("", -1, new InvalidOperationException($"invalid port: {sPort}"));
+
+ if (port == 0 || port == -1)
+ port = defaultPort;
+
+ return (host.Trim(), port, null);
+ }
+
+ private static (string host, string port) ParseEndpoint(string hostPort)
+ {
+ // net.SplitHostPort equivalent — handles IPv6 [::1]:port
+ if (hostPort.StartsWith('['))
+ {
+ var closeIdx = hostPort.IndexOf(']');
+ if (closeIdx < 0 || closeIdx + 1 >= hostPort.Length || hostPort[closeIdx + 1] != ':')
+ throw new InvalidOperationException($"missing port in address {hostPort}");
+ return (hostPort[1..closeIdx], hostPort[(closeIdx + 2)..]);
+ }
+
+ var lastColon = hostPort.LastIndexOf(':');
+ if (lastColon < 0)
+ throw new InvalidOperationException($"missing port in address {hostPort}");
+
+ var host = hostPort[..lastColon];
+ var port = hostPort[(lastColon + 1)..];
+
+ // Reject bare IPv6 addresses (multiple colons without brackets).
+ if (host.Contains(':'))
+ throw new InvalidOperationException($"too many colons in address {hostPort}");
+
+ return (host, port);
+ }
+
+ ///
+ /// Returns true if two instances represent the same URL.
+ /// Mirrors urlsAreEqual.
+ ///
+ public static bool UrlsAreEqual(Uri? u1, Uri? u2) =>
+ u1 == u2 || (u1 != null && u2 != null && u1.ToString() == u2.ToString());
+
+ // -------------------------------------------------------------------------
+ // Comma formatting
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Formats an int64 with comma thousands separators.
+ /// Mirrors comma in util.go.
+ ///
+ public static string Comma(long v)
+ {
+ if (v == long.MinValue) return "-9,223,372,036,854,775,808";
+
+ var sign = "";
+ if (v < 0) { sign = "-"; v = -v; }
+
+ var parts = new string[7];
+ var j = parts.Length - 1;
+
+ while (v > 999)
+ {
+ var part = (v % 1000).ToString();
+ parts[j--] = part.Length switch { 2 => "0" + part, 1 => "00" + part, _ => part };
+ v /= 1000;
+ }
+ parts[j] = v.ToString();
+
+ return sign + string.Join(",", parts.Skip(j));
+ }
+
+ // -------------------------------------------------------------------------
+ // TCP helpers
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Creates a TCP listener with keepalives disabled (NATS server default).
+ /// Mirrors natsListen.
+ ///
+ public static System.Net.Sockets.TcpListener NatsListen(string address, int port)
+ {
+ // .NET TcpListener does not set keepalive by default; the socket can be
+ // further configured after creation if needed.
+ var listener = new System.Net.Sockets.TcpListener(IPAddress.Parse(address), port);
+ return listener;
+ }
+
+ ///
+ /// Opens a TCP connection with the given timeout and keepalives disabled.
+ /// Mirrors natsDialTimeout.
+ ///
+ public static async Task NatsDialTimeoutAsync(
+ string host, int port, TimeSpan timeout)
+ {
+ var client = new System.Net.Sockets.TcpClient();
+ // Disable keepalive to match Go 1.12 behavior.
+ client.Client.SetSocketOption(
+ System.Net.Sockets.SocketOptionLevel.Socket,
+ System.Net.Sockets.SocketOptionName.KeepAlive,
+ false);
+
+ using var cts = new CancellationTokenSource(timeout);
+ await client.ConnectAsync(host, port, cts.Token);
+ return client;
+ }
+
+ // -------------------------------------------------------------------------
+ // URL redaction
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Returns a copy of where any URL that
+ /// contains a password has its password replaced with "xxxxx".
+ /// Mirrors redactURLList.
+ ///
+ public static Uri[] RedactUrlList(Uri[] unredacted)
+ {
+ var r = new Uri[unredacted.Length];
+ var needCopy = false;
+
+ for (var i = 0; i < unredacted.Length; i++)
+ {
+ var u = unredacted[i];
+ if (u?.UserInfo?.Contains(':') == true)
+ {
+ needCopy = true;
+ var ui = u.UserInfo;
+ var colon = ui.IndexOf(':');
+ var username = ui[..colon];
+ var b = new UriBuilder(u) { Password = "xxxxx", UserName = username };
+ r[i] = b.Uri;
+ }
+ else
+ {
+ r[i] = u!;
+ }
+ }
+
+ return needCopy ? r : unredacted;
+ }
+
+ ///
+ /// Returns the URL string with the password component redacted ("xxxxx").
+ /// Returns the original string if no password is present or it cannot be parsed.
+ /// Mirrors redactURLString.
+ ///
+ public static string RedactUrlString(string raw)
+ {
+ if (!raw.Contains('@')) return raw;
+ if (!Uri.TryCreate(raw, UriKind.Absolute, out var u)) return raw;
+
+ if (!u.UserInfo.Contains(':')) return raw;
+
+ var colon = u.UserInfo.IndexOf(':');
+ var username = u.UserInfo[..colon];
+ var b = new UriBuilder(u) { Password = "xxxxx", UserName = username };
+ var result = b.Uri.ToString();
+ // UriBuilder adds a trailing slash for authority-only URLs; strip it if the input had none.
+ if (!raw.EndsWith('/') && result.EndsWith('/'))
+ result = result[..^1];
+ return result;
+ }
+
+ ///
+ /// Returns the Host part of each URL in the list.
+ /// Mirrors getURLsAsString.
+ ///
+ public static string[] GetUrlsAsString(Uri[] urls)
+ {
+ var result = new string[urls.Length];
+ for (var i = 0; i < urls.Length; i++)
+ result[i] = urls[i].Authority; // host:port
+ return result;
+ }
+
+ // -------------------------------------------------------------------------
+ // Copy helpers
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Returns a copy of , or null if src is empty.
+ /// Mirrors copyBytes.
+ ///
+ public static byte[]? CopyBytes(byte[]? src)
+ {
+ if (src == null || src.Length == 0) return null;
+ var dst = new byte[src.Length];
+ src.CopyTo(dst, 0);
+ return dst;
+ }
+
+ ///
+ /// Returns a copy of , or null if src is null.
+ /// Mirrors copyStrings.
+ ///
+ public static string[]? CopyStrings(string[]? src)
+ {
+ if (src == null) return null;
+ var dst = new string[src.Length];
+ src.CopyTo(dst, 0);
+ return dst;
+ }
+
+ // -------------------------------------------------------------------------
+ // Parallel task queue
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Creates a bounded channel onto which tasks can be posted for parallel
+ /// execution across a pool of dedicated threads. Close the returned channel
+ /// to signal workers to stop (after queued items complete).
+ /// Mirrors parallelTaskQueue.
+ ///
+ public static System.Threading.Channels.ChannelWriter CreateParallelTaskQueue(int maxParallelism = 0)
+ {
+ var mp = maxParallelism <= 0 ? Environment.ProcessorCount : Math.Max(Environment.ProcessorCount, maxParallelism);
+ var channel = System.Threading.Channels.Channel.CreateBounded(mp);
+
+ for (var i = 0; i < mp; i++)
+ {
+ Task.Run(async () =>
+ {
+ await foreach (var fn in channel.Reader.ReadAllAsync())
+ fn();
+ });
+ }
+
+ return channel.Writer;
+ }
+}
+
+// -------------------------------------------------------------------------
+// RefCountedUrlSet (mirrors refCountedUrlSet map[string]int in util.go)
+// -------------------------------------------------------------------------
+
+///
+/// A reference-counted set of URL strings used for gossip URL management.
+/// Mirrors refCountedUrlSet in server/util.go.
+///
+public sealed class RefCountedUrlSet
+{
+ private readonly Dictionary _map = new();
+
+ ///
+ /// Adds . Returns true if it was added for the first time.
+ /// Mirrors refCountedUrlSet.addUrl.
+ ///
+ public bool AddUrl(string urlStr)
+ {
+ _map.TryGetValue(urlStr, out var count);
+ _map[urlStr] = count + 1;
+ return count == 0;
+ }
+
+ ///
+ /// Decrements the reference count for .
+ /// Returns true if this was the last reference (entry removed).
+ /// Mirrors refCountedUrlSet.removeUrl.
+ ///
+ public bool RemoveUrl(string urlStr)
+ {
+ if (!_map.TryGetValue(urlStr, out var count)) return false;
+ if (count == 1) { _map.Remove(urlStr); return true; }
+ _map[urlStr] = count - 1;
+ return false;
+ }
+
+ ///
+ /// Returns the unique URL strings currently in the set.
+ /// Mirrors refCountedUrlSet.getAsStringSlice.
+ ///
+ public string[] GetAsStringSlice() => [.. _map.Keys];
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs
new file mode 100644
index 0000000..f1ef9c6
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/SubjectTransform.cs
@@ -0,0 +1,842 @@
+// Copyright 2023-2025 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Adapted from server/subject_transform.go in the NATS server Go source.
+
+using System.Text.RegularExpressions;
+
+namespace ZB.MOM.NatsNet.Server.Internal;
+
+// -------------------------------------------------------------------------
+// Subject token constants (mirrors const block in server/sublist.go)
+// -------------------------------------------------------------------------
+
+internal static class SubjectTokens
+{
+ internal const char Pwc = '*'; // partial wildcard character
+ internal const string Pwcs = "*"; // partial wildcard string
+ internal const char Fwc = '>'; // full wildcard character
+ internal const string Fwcs = ">"; // full wildcard string
+ internal const string Tsep = "."; // token separator string
+ internal const char Btsep = '.'; // token separator character
+ internal const string Empty = ""; // _EMPTY_
+}
+
+// -------------------------------------------------------------------------
+// Transform type constants (mirrors enum in subject_transform.go)
+// -------------------------------------------------------------------------
+
+internal static class TransformType
+{
+ internal const short NoTransform = 0;
+ internal const short BadTransform = 1;
+ internal const short Partition = 2;
+ internal const short Wildcard = 3;
+ internal const short SplitFromLeft = 4;
+ internal const short SplitFromRight = 5;
+ internal const short SliceFromLeft = 6;
+ internal const short SliceFromRight = 7;
+ internal const short Split = 8;
+ internal const short Left = 9;
+ internal const short Right = 10;
+ internal const short Random = 11;
+}
+
+// -------------------------------------------------------------------------
+// ISubjectTransformer interface (mirrors SubjectTransformer in Go)
+// -------------------------------------------------------------------------
+
+///
+/// Transforms NATS subjects according to a source-to-destination mapping.
+/// Mirrors SubjectTransformer in server/subject_transform.go.
+///
+public interface ISubjectTransformer
+{
+ (string result, Exception? err) Match(string subject);
+ string TransformSubject(string subject);
+ string TransformTokenizedSubject(string[] tokens);
+}
+
+// -------------------------------------------------------------------------
+// SubjectTransform class
+// -------------------------------------------------------------------------
+
+///
+/// Subject mapping and transform engine.
+/// Mirrors subjectTransform in server/subject_transform.go.
+///
+public sealed class SubjectTransform : ISubjectTransformer
+{
+ private readonly string _src;
+ private readonly string _dest;
+ private readonly string[] _dtoks; // destination tokens
+ private readonly string[] _stoks; // source tokens
+ private readonly short[] _dtokmftypes;
+ private readonly int[][] _dtokmftokindexesargs;
+ private readonly int[] _dtokmfintargs;
+ private readonly string[] _dtokmfstringargs;
+
+ // Subject mapping function regexes (mirrors var block in Go).
+ private static readonly Regex CommaSep = new(@",\s*", RegexOptions.Compiled);
+ private static readonly Regex PartitionRe = new(@"\{\{\s*[pP]artition\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+ private static readonly Regex WildcardRe = new(@"\{\{\s*[wW]ildcard\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+ private static readonly Regex SplitFromLeftRe = new(@"\{\{\s*[sS]plit[fF]rom[lL]eft\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+ private static readonly Regex SplitFromRightRe = new(@"\{\{\s*[sS]plit[fF]rom[rR]ight\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+ private static readonly Regex SliceFromLeftRe = new(@"\{\{\s*[sS]lice[fF]rom[lL]eft\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+ private static readonly Regex SliceFromRightRe = new(@"\{\{\s*[sS]lice[fF]rom[rR]ight\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+ private static readonly Regex SplitRe = new(@"\{\{\s*[sS]plit\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+ private static readonly Regex LeftRe = new(@"\{\{\s*[lL]eft\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+ private static readonly Regex RightRe = new(@"\{\{\s*[rR]ight\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+ private static readonly Regex RandomRe = new(@"\{\{\s*[rR]andom\s*\((.*)\)\s*\}\}", RegexOptions.Compiled);
+
+ private SubjectTransform(
+ string src, string dest,
+ string[] dtoks, string[] stoks,
+ short[] dtokmftypes, int[][] dtokmftokindexesargs,
+ int[] dtokmfintargs, string[] dtokmfstringargs)
+ {
+ _src = src;
+ _dest = dest;
+ _dtoks = dtoks;
+ _stoks = stoks;
+ _dtokmftypes = dtokmftypes;
+ _dtokmftokindexesargs = dtokmftokindexesargs;
+ _dtokmfintargs = dtokmfintargs;
+ _dtokmfstringargs = dtokmfstringargs;
+ }
+
+ ///
+ /// Creates a new transform with optional strict mode.
+ /// Returns (null, null) when dest is empty (no transform needed).
+ /// Mirrors NewSubjectTransformWithStrict.
+ ///
+ public static (SubjectTransform? transform, Exception? err) NewWithStrict(
+ string src, string dest, bool strict)
+ {
+ if (dest == SubjectTokens.Empty)
+ return (null, null);
+
+ if (src == SubjectTokens.Empty)
+ src = SubjectTokens.Fwcs;
+
+ var (sv, stokens, npwcs, hasFwc) = SubjectInfo(src);
+ var (dv, dtokens, dnpwcs, dHasFwc) = SubjectInfo(dest);
+
+ if (!sv || !dv || dnpwcs > 0 || hasFwc != dHasFwc)
+ return (null, ServerErrors.ErrBadSubject);
+
+ var dtokMfTypes = new List();
+ var dtokMfIndexes = new List();
+ var dtokMfIntArgs = new List();
+ var dtokMfStringArgs = new List();
+
+ if (npwcs > 0 || hasFwc)
+ {
+ // Build source-token index map for partial wildcards.
+ var sti = new Dictionary();
+ for (var i = 0; i < stokens.Length; i++)
+ {
+ if (stokens[i].Length == 1 && stokens[i][0] == SubjectTokens.Pwc)
+ sti[sti.Count + 1] = i;
+ }
+
+ var nphs = 0;
+ foreach (var token in dtokens)
+ {
+ var (tt, tidxs, tint, tstr, terr) = IndexPlaceHolders(token);
+ if (terr != null) return (null, terr);
+
+ if (strict && tt != TransformType.NoTransform && tt != TransformType.Wildcard)
+ return (null, new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotSupportedForImport));
+
+ if (tt == TransformType.NoTransform)
+ {
+ dtokMfTypes.Add(TransformType.NoTransform);
+ dtokMfIndexes.Add([-1]);
+ dtokMfIntArgs.Add(-1);
+ dtokMfStringArgs.Add(SubjectTokens.Empty);
+ }
+ else if (tt == TransformType.Random)
+ {
+ dtokMfTypes.Add(TransformType.Random);
+ dtokMfIndexes.Add([]);
+ dtokMfIntArgs.Add(tint);
+ dtokMfStringArgs.Add(SubjectTokens.Empty);
+ }
+ else
+ {
+ nphs += tidxs.Length;
+ var stis = new List();
+ foreach (var wildcardIndex in tidxs)
+ {
+ if (wildcardIndex > npwcs)
+ return (null, new MappingDestinationException(
+ $"{token}: [{wildcardIndex}]",
+ ServerErrors.ErrMappingDestinationIndexOutOfRange));
+ stis.Add(sti.GetValueOrDefault(wildcardIndex, 0));
+ }
+ dtokMfTypes.Add(tt);
+ dtokMfIndexes.Add([.. stis]);
+ dtokMfIntArgs.Add(tint);
+ dtokMfStringArgs.Add(tstr);
+ }
+ }
+
+ if (strict && nphs < npwcs)
+ return (null, new MappingDestinationException(dest, ServerErrors.ErrMappingDestinationNotUsingAllWildcards));
+ }
+ else
+ {
+ foreach (var token in dtokens)
+ {
+ var (tt, _, tint, _, terr) = IndexPlaceHolders(token);
+ if (terr != null) return (null, terr);
+
+ if (tt == TransformType.NoTransform)
+ {
+ dtokMfTypes.Add(TransformType.NoTransform);
+ dtokMfIndexes.Add([-1]);
+ dtokMfIntArgs.Add(-1);
+ dtokMfStringArgs.Add(SubjectTokens.Empty);
+ }
+ else if (tt == TransformType.Random || tt == TransformType.Partition)
+ {
+ dtokMfTypes.Add(tt);
+ dtokMfIndexes.Add([]);
+ dtokMfIntArgs.Add(tint);
+ dtokMfStringArgs.Add(SubjectTokens.Empty);
+ }
+ else
+ {
+ return (null, new MappingDestinationException(token, ServerErrors.ErrMappingDestinationIndexOutOfRange));
+ }
+ }
+ }
+
+ return (new SubjectTransform(
+ src, dest,
+ dtokens, stokens,
+ [.. dtokMfTypes], [.. dtokMfIndexes],
+ [.. dtokMfIntArgs], [.. dtokMfStringArgs]), null);
+ }
+
+ ///
+ /// Creates a non-strict transform. Mirrors NewSubjectTransform.
+ ///
+ public static (SubjectTransform? transform, Exception? err) New(string src, string dest) =>
+ NewWithStrict(src, dest, false);
+
+ ///
+ /// Creates a strict transform (only Wildcard function allowed).
+ /// Mirrors NewSubjectTransformStrict.
+ ///
+ public static (SubjectTransform? transform, Exception? err) NewStrict(string src, string dest) =>
+ NewWithStrict(src, dest, true);
+
+ ///
+ /// Attempts to match a published subject against the source pattern.
+ /// Returns the transformed subject or an error.
+ /// Mirrors subjectTransform.Match.
+ ///
+ public (string result, Exception? err) Match(string subject)
+ {
+ if ((_src == SubjectTokens.Fwcs || _src == SubjectTokens.Empty) &&
+ (_dest == SubjectTokens.Fwcs || _dest == SubjectTokens.Empty))
+ return (subject, null);
+
+ var tts = TokenizeSubject(subject);
+
+ if (!IsValidLiteralSubject(tts))
+ return (SubjectTokens.Empty, ServerErrors.ErrBadSubject);
+
+ if (_src == SubjectTokens.Empty || _src == SubjectTokens.Fwcs ||
+ IsSubsetMatch(tts, _src))
+ return (TransformTokenizedSubject(tts), null);
+
+ return (SubjectTokens.Empty, ServerErrors.ErrNoTransforms);
+ }
+
+ ///
+ /// Transforms a dot-separated subject string.
+ /// Mirrors subjectTransform.TransformSubject.
+ ///
+ public string TransformSubject(string subject) =>
+ TransformTokenizedSubject(TokenizeSubject(subject));
+
+ ///
+ /// Core token-by-token transform engine.
+ /// Mirrors subjectTransform.TransformTokenizedSubject.
+ ///
+ public string TransformTokenizedSubject(string[] tokens)
+ {
+ if (_dtokmftypes.Length == 0)
+ return _dest;
+
+ var b = new System.Text.StringBuilder();
+ var li = _dtokmftypes.Length - 1;
+
+ for (var i = 0; i < _dtokmftypes.Length; i++)
+ {
+ var mfType = _dtokmftypes[i];
+
+ if (mfType == TransformType.NoTransform)
+ {
+ if (_dtoks[i].Length == 1 && _dtoks[i][0] == SubjectTokens.Fwc)
+ break;
+ b.Append(_dtoks[i]);
+ }
+ else
+ {
+ switch (mfType)
+ {
+ case TransformType.Partition:
+ {
+ byte[] keyBytes;
+ if (_dtokmftokindexesargs[i].Length > 0)
+ {
+ var sb = new System.Text.StringBuilder();
+ foreach (var srcTok in _dtokmftokindexesargs[i])
+ sb.Append(tokens[srcTok]);
+ keyBytes = System.Text.Encoding.UTF8.GetBytes(sb.ToString());
+ }
+ else
+ {
+ keyBytes = System.Text.Encoding.UTF8.GetBytes(string.Join(".", tokens));
+ }
+ b.Append(GetHashPartition(keyBytes, _dtokmfintargs[i]));
+ break;
+ }
+ case TransformType.Wildcard:
+ if (_dtokmftokindexesargs.Length > i &&
+ _dtokmftokindexesargs[i].Length > 0 &&
+ tokens.Length > _dtokmftokindexesargs[i][0])
+ {
+ b.Append(tokens[_dtokmftokindexesargs[i][0]]);
+ }
+ break;
+ case TransformType.SplitFromLeft:
+ {
+ var src = tokens[_dtokmftokindexesargs[i][0]];
+ var pos = _dtokmfintargs[i];
+ if (pos > 0 && pos < src.Length)
+ {
+ b.Append(src[..pos]);
+ b.Append(SubjectTokens.Tsep);
+ b.Append(src[pos..]);
+ }
+ else
+ {
+ b.Append(src);
+ }
+ break;
+ }
+ case TransformType.SplitFromRight:
+ {
+ var src = tokens[_dtokmftokindexesargs[i][0]];
+ var pos = _dtokmfintargs[i];
+ if (pos > 0 && pos < src.Length)
+ {
+ b.Append(src[..(src.Length - pos)]);
+ b.Append(SubjectTokens.Tsep);
+ b.Append(src[(src.Length - pos)..]);
+ }
+ else
+ {
+ b.Append(src);
+ }
+ break;
+ }
+ case TransformType.SliceFromLeft:
+ {
+ var src = tokens[_dtokmftokindexesargs[i][0]];
+ var sz = _dtokmfintargs[i];
+ if (sz > 0 && sz < src.Length)
+ {
+ var j = 0;
+ while (j + sz <= src.Length)
+ {
+ if (j != 0) b.Append(SubjectTokens.Tsep);
+ b.Append(src[j..(j + sz)]);
+ if (j + sz != src.Length && j + sz + sz > src.Length)
+ {
+ b.Append(SubjectTokens.Tsep);
+ b.Append(src[(j + sz)..]);
+ break;
+ }
+ j += sz;
+ }
+ }
+ else
+ {
+ b.Append(src);
+ }
+ break;
+ }
+ case TransformType.SliceFromRight:
+ {
+ var src = tokens[_dtokmftokindexesargs[i][0]];
+ var sz = _dtokmfintargs[i];
+ if (sz > 0 && sz < src.Length)
+ {
+ var rem = src.Length % sz;
+ if (rem > 0)
+ {
+ b.Append(src[..rem]);
+ b.Append(SubjectTokens.Tsep);
+ }
+ var j = rem;
+ while (j + sz <= src.Length)
+ {
+ b.Append(src[j..(j + sz)]);
+ if (j + sz < src.Length) b.Append(SubjectTokens.Tsep);
+ j += sz;
+ }
+ }
+ else
+ {
+ b.Append(src);
+ }
+ break;
+ }
+ case TransformType.Split:
+ {
+ var src = tokens[_dtokmftokindexesargs[i][0]];
+ var parts = src.Split(_dtokmfstringargs[i]);
+ for (var j = 0; j < parts.Length; j++)
+ {
+ if (parts[j] != SubjectTokens.Empty)
+ b.Append(parts[j]);
+ if (j < parts.Length - 1 &&
+ parts[j + 1] != SubjectTokens.Empty &&
+ !(j == 0 && parts[j] == SubjectTokens.Empty))
+ b.Append(SubjectTokens.Tsep);
+ }
+ break;
+ }
+ case TransformType.Left:
+ {
+ var src = tokens[_dtokmftokindexesargs[i][0]];
+ var sz = _dtokmfintargs[i];
+ b.Append(sz > 0 && sz < src.Length ? src[..sz] : src);
+ break;
+ }
+ case TransformType.Right:
+ {
+ var src = tokens[_dtokmftokindexesargs[i][0]];
+ var sz = _dtokmfintargs[i];
+ b.Append(sz > 0 && sz < src.Length ? src[(src.Length - sz)..] : src);
+ break;
+ }
+ case TransformType.Random:
+ b.Append(GetRandomPartition(_dtokmfintargs[i]));
+ break;
+ }
+ }
+
+ if (i < li)
+ b.Append(SubjectTokens.Btsep);
+ }
+
+ // Append remaining source tokens when destination ends with ">".
+ if (_dtoks.Length > 0 && _dtoks[^1] == SubjectTokens.Fwcs)
+ {
+ var stokLen = _stoks.Length;
+ for (var i = stokLen - 1; i < tokens.Length; i++)
+ {
+ b.Append(tokens[i]);
+ if (i < tokens.Length - 1)
+ b.Append(SubjectTokens.Btsep);
+ }
+ }
+
+ return b.ToString();
+ }
+
+ ///
+ /// Reverses this transform (src ↔ dest).
+ /// Mirrors subjectTransform.reverse.
+ ///
+ internal SubjectTransform? Reverse()
+ {
+ if (_dtokmftokindexesargs.Length == 0)
+ {
+ var (rtr, _) = NewStrict(_dest, _src);
+ return rtr;
+ }
+
+ var (nsrc, phs) = TransformUntokenize(_dest);
+ var nda = new List();
+ foreach (var token in _stoks)
+ {
+ if (token == SubjectTokens.Pwcs)
+ {
+ if (phs.Length == 0) return null;
+ nda.Add(phs[0]);
+ phs = phs[1..];
+ }
+ else
+ {
+ nda.Add(token);
+ }
+ }
+ var ndest = string.Join(SubjectTokens.Tsep, nda);
+ var (rtrFinal, _) = NewStrict(nsrc, ndest);
+ return rtrFinal;
+ }
+
+ // -------------------------------------------------------------------------
+ // Static helpers exposed internally
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Returns the args extracted from a mapping-function token using the given regex.
+ /// Mirrors getMappingFunctionArgs.
+ ///
+ internal static string[]? GetMappingFunctionArgs(Regex functionRegex, string token)
+ {
+ var m = functionRegex.Match(token);
+ if (m.Success && m.Groups.Count > 1)
+ return CommaSep.Split(m.Groups[1].Value);
+ return null;
+ }
+
+ ///
+ /// Helper for transform functions that take (wildcardIndex, int) args.
+ /// Mirrors transformIndexIntArgsHelper.
+ ///
+ internal static (short tt, int[] indexes, int intArg, string strArg, Exception? err)
+ TransformIndexIntArgsHelper(string token, string[] args, short transformType)
+ {
+ if (args.Length < 2)
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs));
+ if (args.Length > 2)
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationTooManyArgs));
+
+ if (!int.TryParse(args[0].Trim(), out var idx))
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg));
+
+ if (!int.TryParse(args[1].Trim(), out var intVal))
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg));
+
+ return (transformType, [idx], intVal, SubjectTokens.Empty, null);
+ }
+
+ ///
+ /// Parses a destination token and returns its transform type and arguments.
+ /// Mirrors indexPlaceHolders.
+ ///
+ internal static (short tt, int[] indexes, int intArg, string strArg, Exception? err)
+ IndexPlaceHolders(string token)
+ {
+ var length = token.Length;
+ if (length > 1)
+ {
+ if (token[0] == '$')
+ {
+ if (!int.TryParse(token[1..], out var tp))
+ return (TransformType.NoTransform, [-1], -1, SubjectTokens.Empty, null);
+ return (TransformType.Wildcard, [tp], -1, SubjectTokens.Empty, null);
+ }
+
+ if (length > 4 && token[0] == '{' && token[1] == '{' &&
+ token[length - 2] == '}' && token[length - 1] == '}')
+ {
+ // {{wildcard(n)}}
+ var args = GetMappingFunctionArgs(WildcardRe, token);
+ if (args != null)
+ {
+ if (args.Length == 1 && args[0] == SubjectTokens.Empty)
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs));
+ if (args.Length == 1)
+ {
+ if (!int.TryParse(args[0].Trim(), out var ti))
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg));
+ return (TransformType.Wildcard, [ti], -1, SubjectTokens.Empty, null);
+ }
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationTooManyArgs));
+ }
+
+ // {{partition(n[,t1,t2,...])}}
+ args = GetMappingFunctionArgs(PartitionRe, token);
+ if (args != null)
+ {
+ if (args.Length < 1)
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs));
+ if (!int.TryParse(args[0].Trim(), out var partN) || (long)partN > int.MaxValue)
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg));
+ if (args.Length == 1)
+ return (TransformType.Partition, [], partN, SubjectTokens.Empty, null);
+
+ var tidxs = new List();
+ foreach (var t in args[1..])
+ {
+ if (!int.TryParse(t.Trim(), out var ti2))
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg));
+ tidxs.Add(ti2);
+ }
+ return (TransformType.Partition, [.. tidxs], partN, SubjectTokens.Empty, null);
+ }
+
+ // {{SplitFromLeft(t, n)}}
+ args = GetMappingFunctionArgs(SplitFromLeftRe, token);
+ if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.SplitFromLeft);
+
+ // {{SplitFromRight(t, n)}}
+ args = GetMappingFunctionArgs(SplitFromRightRe, token);
+ if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.SplitFromRight);
+
+ // {{SliceFromLeft(t, n)}}
+ args = GetMappingFunctionArgs(SliceFromLeftRe, token);
+ if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.SliceFromLeft);
+
+ // {{SliceFromRight(t, n)}}
+ args = GetMappingFunctionArgs(SliceFromRightRe, token);
+ if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.SliceFromRight);
+
+ // {{right(t, n)}}
+ args = GetMappingFunctionArgs(RightRe, token);
+ if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.Right);
+
+ // {{left(t, n)}}
+ args = GetMappingFunctionArgs(LeftRe, token);
+ if (args != null) return TransformIndexIntArgsHelper(token, args, TransformType.Left);
+
+ // {{split(t, delim)}}
+ args = GetMappingFunctionArgs(SplitRe, token);
+ if (args != null)
+ {
+ if (args.Length < 2)
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs));
+ if (args.Length > 2)
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationTooManyArgs));
+ if (!int.TryParse(args[0].Trim(), out var splitIdx))
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg));
+ if (args[1].Contains(' ') || args[1].Contains(SubjectTokens.Tsep))
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg));
+ return (TransformType.Split, [splitIdx], -1, args[1], null);
+ }
+
+ // {{random(n)}}
+ args = GetMappingFunctionArgs(RandomRe, token);
+ if (args != null)
+ {
+ if (args.Length != 1)
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationNotEnoughArgs));
+ if (!int.TryParse(args[0].Trim(), out var randN) || (long)randN > int.MaxValue)
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrMappingDestinationInvalidArg));
+ return (TransformType.Random, [], randN, SubjectTokens.Empty, null);
+ }
+
+ return (TransformType.BadTransform, [], -1, SubjectTokens.Empty,
+ new MappingDestinationException(token, ServerErrors.ErrUnknownMappingDestinationFunction));
+ }
+ }
+
+ return (TransformType.NoTransform, [-1], -1, SubjectTokens.Empty, null);
+ }
+
+ ///
+ /// Tokenises a subject with wildcards into a formal transform destination.
+ /// e.g. "foo.*.*" → "foo.$1.$2".
+ /// Mirrors transformTokenize.
+ ///
+ public static string TransformTokenize(string subject)
+ {
+ var i = 1;
+ var parts = new List();
+ foreach (var token in subject.Split(SubjectTokens.Btsep))
+ {
+ if (token == SubjectTokens.Pwcs)
+ {
+ parts.Add($"${i++}");
+ }
+ else
+ {
+ parts.Add(token);
+ }
+ }
+ return string.Join(SubjectTokens.Tsep, parts);
+ }
+
+ ///
+ /// Converts a transform destination back to a wildcard subject + placeholder list.
+ /// Mirrors transformUntokenize.
+ ///
+ public static (string subject, string[] placeholders) TransformUntokenize(string subject)
+ {
+ var phs = new List();
+ var nda = new List();
+
+ foreach (var token in subject.Split(SubjectTokens.Btsep))
+ {
+ var args = GetMappingFunctionArgs(WildcardRe, token);
+ var isWildcardPlaceholder =
+ (token.Length > 1 && token[0] == '$' && token[1] >= '1' && token[1] <= '9') ||
+ (args?.Length == 1 && args[0] != SubjectTokens.Empty);
+
+ if (isWildcardPlaceholder)
+ {
+ phs.Add(token);
+ nda.Add(SubjectTokens.Pwcs);
+ }
+ else
+ {
+ nda.Add(token);
+ }
+ }
+
+ return (string.Join(SubjectTokens.Tsep, nda), [.. phs]);
+ }
+
+ ///
+ /// Tokenises a subject into an array of dot-separated tokens.
+ /// Mirrors tokenizeSubject.
+ ///
+ public static string[] TokenizeSubject(string subject) =>
+ subject.Split(SubjectTokens.Btsep);
+
+ ///
+ /// Returns (valid, tokens, numPwcs, hasFwc) for a subject string.
+ /// Mirrors subjectInfo.
+ ///
+ public static (bool valid, string[] tokens, int npwcs, bool hasFwc) SubjectInfo(string subject)
+ {
+ if (subject == string.Empty)
+ return (false, [], 0, false);
+
+ var npwcs = 0;
+ var sfwc = false;
+ var tokens = subject.Split(SubjectTokens.Tsep);
+ foreach (var t in tokens)
+ {
+ if (t.Length == 0 || sfwc)
+ return (false, [], 0, false);
+ if (t.Length > 1) continue;
+ switch (t[0])
+ {
+ case SubjectTokens.Fwc:
+ sfwc = true;
+ break;
+ case SubjectTokens.Pwc:
+ npwcs++;
+ break;
+ }
+ }
+ return (true, tokens, npwcs, sfwc);
+ }
+
+ // -------------------------------------------------------------------------
+ // Internal helpers used by Match
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Returns true if all tokens are literal (no wildcards).
+ /// Mirrors isValidLiteralSubject in server/sublist.go.
+ ///
+ internal static bool IsValidLiteralSubject(string[] tokens)
+ {
+ foreach (var t in tokens)
+ {
+ if (t.Length == 0) return false;
+ if (t.Length == 1 && (t[0] == SubjectTokens.Pwc || t[0] == SubjectTokens.Fwc))
+ return false;
+ }
+ return true;
+ }
+
+ ///
+ /// Returns true if match the pattern .
+ /// Mirrors isSubsetMatch in server/sublist.go.
+ ///
+ internal static bool IsSubsetMatch(string[] tokens, string test)
+ {
+ var testToks = TokenizeSubjectIntoSlice(test);
+ return IsSubsetMatchTokenized(tokens, testToks);
+ }
+
+ private static string[] TokenizeSubjectIntoSlice(string subject)
+ {
+ var result = new List();
+ var start = 0;
+ for (var i = 0; i < subject.Length; i++)
+ {
+ if (subject[i] == SubjectTokens.Btsep)
+ {
+ result.Add(subject[start..i]);
+ start = i + 1;
+ }
+ }
+ result.Add(subject[start..]);
+ return [.. result];
+ }
+
+ private static bool IsSubsetMatchTokenized(string[] tokens, string[] test)
+ {
+ for (var i = 0; i < test.Length; i++)
+ {
+ if (i >= tokens.Length) return false;
+ var t2 = test[i];
+ if (t2.Length == 0) return false;
+ if (t2.Length == 1 && t2[0] == SubjectTokens.Fwc) return true;
+
+ var t1 = tokens[i];
+ if (t1.Length == 0) return false;
+ if (t1.Length == 1 && t1[0] == SubjectTokens.Fwc) return false;
+
+ if (t1.Length == 1 && t1[0] == SubjectTokens.Pwc)
+ {
+ if (!(t2.Length == 1 && t2[0] == SubjectTokens.Pwc)) return false;
+ if (i >= test.Length) return true;
+ continue;
+ }
+
+ if (!(t2.Length == 1 && t2[0] == SubjectTokens.Pwc) &&
+ string.Compare(t1, t2, StringComparison.Ordinal) != 0)
+ return false;
+ }
+ return tokens.Length == test.Length;
+ }
+
+ private string GetRandomPartition(int ceiling)
+ {
+ if (ceiling == 0) return "0";
+ return (Random.Shared.Next() % ceiling).ToString();
+ }
+
+ private static string GetHashPartition(byte[] key, int numBuckets)
+ {
+ if (numBuckets == 0) return "0";
+ // FNV-1a 32-bit hash — mirrors fnv.New32a() in Go.
+ const uint FnvPrime = 16777619;
+ const uint FnvOffset = 2166136261;
+ var hash = FnvOffset;
+ foreach (var b in key) { hash ^= b; hash *= FnvPrime; }
+ return ((int)(hash % (uint)numBuckets)).ToString();
+ }
+}
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/IpQueueTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/IpQueueTests.cs
new file mode 100644
index 0000000..5bac632
--- /dev/null
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/IpQueueTests.cs
@@ -0,0 +1,316 @@
+// Copyright 2021-2025 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System.Collections.Concurrent;
+using Shouldly;
+using ZB.MOM.NatsNet.Server.Internal;
+
+namespace ZB.MOM.NatsNet.Server.Tests.Internal;
+
+///
+/// Tests for .
+/// Mirrors server/ipqueue_test.go:
+/// TestIPQueueBasic (ID 688), TestIPQueuePush (ID 689), TestIPQueuePop (ID 690),
+/// TestIPQueuePopOne (ID 691), TestIPQueueMultiProducers (ID 692),
+/// TestIPQueueRecycle (ID 693), TestIPQueueDrain (ID 694),
+/// TestIPQueueSizeCalculation (ID 695), TestIPQueueSizeCalculationWithLimits (ID 696).
+/// Benchmarks (IDs 697–715) are n/a.
+///
+public sealed class IpQueueTests
+{
+ [Fact]
+ public void Basic_ShouldInitialiseCorrectly()
+ {
+ // Mirror: TestIPQueueBasic
+ var registry = new ConcurrentDictionary();
+ var q = new IpQueue("test", registry);
+
+ q.MaxRecycleSize.ShouldBe(IpQueue.DefaultMaxRecycleSize);
+ q.Ch.TryRead(out _).ShouldBeFalse("channel should be empty on creation");
+ q.Len().ShouldBe(0);
+
+ // Create a second queue with custom max recycle size.
+ var q2 = new IpQueue("test2", registry, maxRecycleSize: 10);
+ q2.MaxRecycleSize.ShouldBe(10);
+
+ // Both should be in the registry.
+ registry.ContainsKey("test").ShouldBeTrue();
+ registry.ContainsKey("test2").ShouldBeTrue();
+
+ // Unregister both.
+ q.Unregister();
+ q2.Unregister();
+ registry.IsEmpty.ShouldBeTrue("registry should be empty after unregister");
+
+ // Push/pop should still work after unregister.
+ q.Push(1);
+ var elts = q.Pop();
+ elts.ShouldNotBeNull();
+ elts!.Length.ShouldBe(1);
+
+ q2.Push(2);
+ var (e, ok) = q2.PopOne();
+ ok.ShouldBeTrue();
+ e.ShouldBe(2);
+ }
+
+ [Fact]
+ public void Push_ShouldNotifyOnFirstElement()
+ {
+ // Mirror: TestIPQueuePush
+ var q = new IpQueue("test");
+
+ q.Push(1);
+ q.Len().ShouldBe(1);
+ q.Ch.TryRead(out _).ShouldBeTrue("should have been notified after first push");
+
+ // Second push should NOT send another notification.
+ q.Push(2);
+ q.Len().ShouldBe(2);
+ q.Ch.TryRead(out _).ShouldBeFalse("should not notify again when queue was not empty");
+ }
+
+ [Fact]
+ public void Pop_ShouldReturnElementsAndTrackInProgress()
+ {
+ // Mirror: TestIPQueuePop
+ var q = new IpQueue("test");
+ q.Push(1);
+ q.Ch.TryRead(out _); // consume signal
+
+ var elts = q.Pop();
+ elts.ShouldNotBeNull();
+ elts!.Length.ShouldBe(1);
+ q.Len().ShouldBe(0);
+
+ // Channel should still be empty after pop.
+ q.Ch.TryRead(out _).ShouldBeFalse();
+
+ // InProgress should be 1 — pop increments it.
+ q.InProgress().ShouldBe(1L);
+
+ // Recycle decrements it.
+ q.Recycle(elts);
+ q.InProgress().ShouldBe(0L);
+
+ // Pop on empty queue returns null.
+ var empty = q.Pop();
+ empty.ShouldBeNull();
+ q.InProgress().ShouldBe(0L);
+ }
+
+ [Fact]
+ public void PopOne_ShouldReturnOneAtATime()
+ {
+ // Mirror: TestIPQueuePopOne
+ var q = new IpQueue("test");
+ q.Push(1);
+ q.Ch.TryRead(out _); // consume signal
+
+ var (e, ok) = q.PopOne();
+ ok.ShouldBeTrue();
+ e.ShouldBe(1);
+ q.Len().ShouldBe(0);
+ q.InProgress().ShouldBe(0L, "popOne does not increment inprogress");
+ q.Ch.TryRead(out _).ShouldBeFalse("no notification when queue is emptied by popOne");
+
+ q.Push(2);
+ q.Push(3);
+
+ var (e2, ok2) = q.PopOne();
+ ok2.ShouldBeTrue();
+ e2.ShouldBe(2);
+ q.Len().ShouldBe(1);
+ q.Ch.TryRead(out _).ShouldBeTrue("should re-notify when more items remain");
+
+ var (e3, ok3) = q.PopOne();
+ ok3.ShouldBeTrue();
+ e3.ShouldBe(3);
+ q.Len().ShouldBe(0);
+ q.Ch.TryRead(out _).ShouldBeFalse("no notification after last element removed");
+
+ var (_, okEmpty) = q.PopOne();
+ okEmpty.ShouldBeFalse("popOne on empty queue returns false");
+ }
+
+ [Fact]
+ public async Task MultiProducers_ShouldReceiveAllElements()
+ {
+ // Mirror: TestIPQueueMultiProducers
+ var q = new IpQueue("test");
+ const int itemsPerProducer = 100;
+ const int numProducers = 3;
+
+ var tasks = Enumerable.Range(0, numProducers).Select(p =>
+ Task.Run(() =>
+ {
+ for (var i = p * itemsPerProducer + 1; i <= (p + 1) * itemsPerProducer; i++)
+ q.Push(i);
+ })).ToArray();
+
+ var received = new HashSet();
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+
+ while (received.Count < numProducers * itemsPerProducer &&
+ !cts.Token.IsCancellationRequested)
+ {
+ if (q.Ch.TryRead(out _))
+ {
+ var batch = q.Pop();
+ if (batch != null)
+ {
+ foreach (var v in batch) received.Add(v);
+ q.Recycle(batch);
+ q.InProgress().ShouldBe(0L);
+ }
+ }
+ else
+ {
+ await Task.Delay(1, cts.Token);
+ }
+ }
+
+ await Task.WhenAll(tasks);
+ received.Count.ShouldBe(numProducers * itemsPerProducer, "all elements should be received");
+ }
+
+ [Fact]
+ public void Recycle_ShouldDecrementInProgressAndAllowReuse()
+ {
+ // Mirror: TestIPQueueRecycle (behavioral aspects)
+ var q = new IpQueue("test");
+ const int total = 1000;
+
+ for (var i = 0; i < total; i++)
+ {
+ var (len, err) = q.Push(i);
+ err.ShouldBeNull();
+ len.ShouldBe(i + 1);
+ }
+
+ var values = q.Pop();
+ values.ShouldNotBeNull();
+ values!.Length.ShouldBe(total);
+ q.InProgress().ShouldBe((long)total);
+
+ q.Recycle(values);
+ q.InProgress().ShouldBe(0L, "recycle should decrement inprogress");
+
+ // Should be able to push/pop again after recycle.
+ var (l, err2) = q.Push(1001);
+ err2.ShouldBeNull();
+ l.ShouldBe(1);
+ var values2 = q.Pop();
+ values2.ShouldNotBeNull();
+ values2!.Length.ShouldBe(1);
+ values2[0].ShouldBe(1001);
+
+ // Recycle with small max recycle size: large arrays should not be pooled
+ // (behavioral: push/pop still works correctly).
+ var q2 = new IpQueue("test2", maxRecycleSize: 10);
+ for (var i = 0; i < 100; i++) q2.Push(i);
+ var bigBatch = q2.Pop();
+ bigBatch.ShouldNotBeNull();
+ bigBatch!.Length.ShouldBe(100);
+ q2.Recycle(bigBatch);
+ q2.InProgress().ShouldBe(0L);
+
+ q2.Push(1001);
+ var small = q2.Pop();
+ small.ShouldNotBeNull();
+ small!.Length.ShouldBe(1);
+ q2.Recycle(small);
+ }
+
+ [Fact]
+ public void Drain_ShouldEmptyQueueAndConsumeSignal()
+ {
+ // Mirror: TestIPQueueDrain
+ var q = new IpQueue("test");
+ for (var i = 1; i <= 100; i++) q.Push(i);
+
+ var drained = q.Drain();
+ drained.ShouldBe(100);
+
+ // Signal should have been consumed.
+ q.Ch.TryRead(out _).ShouldBeFalse("drain should consume the notification signal");
+ q.Len().ShouldBe(0);
+ }
+
+ [Fact]
+ public void SizeCalculation_ShouldTrackTotalSize()
+ {
+ // Mirror: TestIPQueueSizeCalculation
+ const int elemSize = 16;
+ var q = new IpQueue("test", sizeCalc: e => (ulong)e.Length);
+
+ for (var i = 0; i < 10; i++)
+ {
+ q.Push(new byte[elemSize]);
+ q.Len().ShouldBe(i + 1);
+ q.Size().ShouldBe((ulong)(i + 1) * elemSize);
+ }
+
+ for (var i = 10; i > 5; i--)
+ {
+ q.PopOne();
+ q.Len().ShouldBe(i - 1);
+ q.Size().ShouldBe((ulong)(i - 1) * elemSize);
+ }
+
+ q.Pop();
+ q.Len().ShouldBe(0);
+ q.Size().ShouldBe(0UL);
+ }
+
+ [Fact]
+ public void SizeCalculationWithLimits_ShouldEnforceLimits()
+ {
+ // Mirror: TestIPQueueSizeCalculationWithLimits
+ const int elemSize = 16;
+ Func calc = e => (ulong)e.Length;
+ var elem = new byte[elemSize];
+
+ // LimitByLen
+ var q1 = new IpQueue("test-len", sizeCalc: calc, maxLen: 5);
+ for (var i = 0; i < 10; i++)
+ {
+ var (n, err) = q1.Push(elem);
+ if (i >= 5)
+ {
+ err.ShouldBeSameAs(IpQueueErrors.LenLimitReached, $"iteration {i}");
+ }
+ else
+ {
+ err.ShouldBeNull($"iteration {i}");
+ }
+ n.ShouldBeLessThan(6);
+ }
+
+ // LimitBySize
+ var q2 = new IpQueue("test-size", sizeCalc: calc, maxSize: elemSize * 5);
+ for (var i = 0; i < 10; i++)
+ {
+ var (n, err) = q2.Push(elem);
+ if (i >= 5)
+ {
+ err.ShouldBeSameAs(IpQueueErrors.SizeLimitReached, $"iteration {i}");
+ }
+ else
+ {
+ err.ShouldBeNull($"iteration {i}");
+ }
+ n.ShouldBeLessThan(6);
+ }
+ }
+}
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/ServerUtilitiesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/ServerUtilitiesTests.cs
new file mode 100644
index 0000000..bb66dd4
--- /dev/null
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/ServerUtilitiesTests.cs
@@ -0,0 +1,194 @@
+// Copyright 2012-2025 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Shouldly;
+using ZB.MOM.NatsNet.Server.Internal;
+
+namespace ZB.MOM.NatsNet.Server.Tests.Internal;
+
+///
+/// Tests for .
+/// Mirrors server/util_test.go: TestParseSize (ID 3061), TestParseSInt64 (ID 3062),
+/// TestParseHostPort (ID 3063), TestURLsAreEqual (ID 3064), TestComma (ID 3065),
+/// TestURLRedaction (ID 3066), TestVersionAtLeast (ID 3067).
+/// Benchmarks (IDs 3068–3073) are n/a.
+///
+public sealed class ServerUtilitiesTests
+{
+ [Fact]
+ public void ParseSize_ShouldParseValidAndRejectInvalid()
+ {
+ // Mirror: TestParseSize
+ ServerUtilities.ParseSize(ReadOnlySpan.Empty).ShouldBe(-1, "nil/empty should return -1");
+
+ var n = "12345678"u8;
+ ServerUtilities.ParseSize(n).ShouldBe(12345678);
+
+ var bad = "12345invalid678"u8;
+ ServerUtilities.ParseSize(bad).ShouldBe(-1, "non-digit chars should return -1");
+ }
+
+ [Fact]
+ public void ParseInt64_ShouldParseValidAndRejectInvalid()
+ {
+ // Mirror: TestParseSInt64
+ ServerUtilities.ParseInt64(ReadOnlySpan.Empty).ShouldBe(-1L, "empty should return -1");
+
+ var n = "12345678"u8;
+ ServerUtilities.ParseInt64(n).ShouldBe(12345678L);
+
+ var bad = "12345invalid678"u8;
+ ServerUtilities.ParseInt64(bad).ShouldBe(-1L, "non-digit chars should return -1");
+ }
+
+ [Fact]
+ public void ParseHostPort_ShouldSplitCorrectly()
+ {
+ // Mirror: TestParseHostPort
+ void Check(string hostPort, int defaultPort, string expectedHost, int expectedPort, bool expectError)
+ {
+ var (host, port, err) = ServerUtilities.ParseHostPort(hostPort, defaultPort);
+ if (expectError)
+ {
+ err.ShouldNotBeNull($"expected error for hostPort={hostPort}");
+ return;
+ }
+ err.ShouldBeNull($"unexpected error for hostPort={hostPort}: {err?.Message}");
+ host.ShouldBe(expectedHost);
+ port.ShouldBe(expectedPort);
+ }
+
+ Check("addr:1234", 5678, "addr", 1234, false);
+ Check(" addr:1234 ", 5678, "addr", 1234, false);
+ Check(" addr : 1234 ", 5678, "addr", 1234, false);
+ Check("addr", 5678, "addr", 5678, false); // no port → default
+ Check(" addr ", 5678, "addr", 5678, false);
+ Check("addr:-1", 5678, "addr", 5678, false); // -1 → default
+ Check(" addr:-1 ", 5678, "addr", 5678, false);
+ Check(" addr : -1 ", 5678, "addr", 5678, false);
+ Check("addr:0", 5678, "addr", 5678, false); // 0 → default
+ Check(" addr:0 ", 5678, "addr", 5678, false);
+ Check(" addr : 0 ", 5678, "addr", 5678, false);
+ Check("addr:addr", 0, "", 0, true); // non-numeric port
+ Check("addr:::1234", 0, "", 0, true); // ambiguous colons
+ Check("", 0, "", 0, true); // empty
+ }
+
+ [Fact]
+ public void UrlsAreEqual_ShouldCompareCorrectly()
+ {
+ // Mirror: TestURLsAreEqual
+ void Check(string u1Str, string u2Str, bool expectedSame)
+ {
+ var u1 = new Uri(u1Str);
+ var u2 = new Uri(u2Str);
+ ServerUtilities.UrlsAreEqual(u1, u2).ShouldBe(expectedSame,
+ $"expected {u1Str} and {u2Str} to be {(expectedSame ? "equal" : "different")}");
+ }
+
+ Check("nats://localhost:4222", "nats://localhost:4222", true);
+ Check("nats://ivan:pwd@localhost:4222", "nats://ivan:pwd@localhost:4222", true);
+ Check("nats://ivan@localhost:4222", "nats://ivan@localhost:4222", true);
+ Check("nats://ivan:@localhost:4222", "nats://ivan:@localhost:4222", true);
+ Check("nats://host1:4222", "nats://host2:4222", false);
+ }
+
+ [Fact]
+ public void Comma_ShouldFormatWithThousandSeparators()
+ {
+ // Mirror: TestComma
+ var cases = new (long input, string expected)[]
+ {
+ (0, "0"),
+ (10, "10"),
+ (100, "100"),
+ (1_000, "1,000"),
+ (10_000, "10,000"),
+ (100_000, "100,000"),
+ (10_000_000, "10,000,000"),
+ (10_100_000, "10,100,000"),
+ (10_010_000, "10,010,000"),
+ (10_001_000, "10,001,000"),
+ (123_456_789, "123,456,789"),
+ (9_223_372_036_854_775_807L, "9,223,372,036,854,775,807"), // long.MaxValue
+ (long.MinValue, "-9,223,372,036,854,775,808"),
+ (-123_456_789, "-123,456,789"),
+ (-10_100_000, "-10,100,000"),
+ (-10_010_000, "-10,010,000"),
+ (-10_001_000, "-10,001,000"),
+ (-10_000_000, "-10,000,000"),
+ (-100_000, "-100,000"),
+ (-10_000, "-10,000"),
+ (-1_000, "-1,000"),
+ (-100, "-100"),
+ (-10, "-10"),
+ };
+
+ foreach (var (input, expected) in cases)
+ ServerUtilities.Comma(input).ShouldBe(expected, $"Comma({input})");
+ }
+
+ [Fact]
+ public void UrlRedaction_ShouldReplacePasswords()
+ {
+ // Mirror: TestURLRedaction
+ var cases = new (string full, string safe)[]
+ {
+ ("nats://foo:bar@example.org", "nats://foo:xxxxx@example.org"),
+ ("nats://foo@example.org", "nats://foo@example.org"),
+ ("nats://example.org", "nats://example.org"),
+ ("nats://example.org/foo?bar=1", "nats://example.org/foo?bar=1"),
+ };
+
+ var listFull = new Uri[cases.Length];
+ var listSafe = new Uri[cases.Length];
+
+ for (var i = 0; i < cases.Length; i++)
+ {
+ ServerUtilities.RedactUrlString(cases[i].full).ShouldBe(cases[i].safe,
+ $"RedactUrlString[{i}]");
+ listFull[i] = new Uri(cases[i].full);
+ listSafe[i] = new Uri(cases[i].safe);
+ }
+
+ var results = ServerUtilities.RedactUrlList(listFull);
+ for (var i = 0; i < results.Length; i++)
+ results[i].ToString().ShouldBe(listSafe[i].ToString(), $"RedactUrlList[{i}]");
+ }
+
+ [Fact]
+ public void VersionAtLeast_ShouldReturnCorrectResult()
+ {
+ // Mirror: TestVersionAtLeast
+ var cases = new (string version, int major, int minor, int update, bool result)[]
+ {
+ ("2.0.0-beta", 1, 9, 9, true),
+ ("2.0.0", 1, 99, 9, true),
+ ("2.2.0", 2, 1, 9, true),
+ ("2.2.2", 2, 2, 2, true),
+ ("2.2.2", 2, 2, 3, false),
+ ("2.2.2", 2, 3, 2, false),
+ ("2.2.2", 3, 2, 2, false),
+ ("2.22.2", 3, 0, 0, false),
+ ("2.2.22", 2, 3, 0, false),
+ ("bad.version",1, 2, 3, false),
+ };
+
+ foreach (var (version, major, minor, update, expected) in cases)
+ {
+ ServerUtilities.VersionAtLeast(version, major, minor, update)
+ .ShouldBe(expected,
+ $"VersionAtLeast({version}, {major}, {minor}, {update})");
+ }
+ }
+}
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SubjectTransformTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SubjectTransformTests.cs
new file mode 100644
index 0000000..90989d7
--- /dev/null
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SubjectTransformTests.cs
@@ -0,0 +1,256 @@
+// Copyright 2023-2025 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using Shouldly;
+using ZB.MOM.NatsNet.Server.Internal;
+
+namespace ZB.MOM.NatsNet.Server.Tests.Internal;
+
+///
+/// Tests for .
+/// Mirrors server/subject_transform_test.go:
+/// TestPlaceHolderIndex (ID 2958), TestSubjectTransformHelpers (ID 2959),
+/// TestSubjectTransforms (ID 2960),
+/// TestSubjectTransformDoesntPanicTransformingMissingToken (ID 2961).
+///
+public sealed class SubjectTransformTests
+{
+ [Fact]
+ public void PlaceHolderIndex_ShouldParseAllFunctionTypes()
+ {
+ // Mirror: TestPlaceHolderIndex
+
+ // $1 — old style
+ var (tt, idxs, intArg, _, err) = SubjectTransform.IndexPlaceHolders("$1");
+ err.ShouldBeNull();
+ tt.ShouldBe(TransformType.Wildcard);
+ idxs.Length.ShouldBe(1);
+ idxs[0].ShouldBe(1);
+ intArg.ShouldBe(-1);
+
+ // {{partition(10,1,2,3)}}
+ (tt, idxs, intArg, _, err) = SubjectTransform.IndexPlaceHolders("{{partition(10,1,2,3)}}");
+ err.ShouldBeNull();
+ tt.ShouldBe(TransformType.Partition);
+ idxs.ShouldBe([1, 2, 3]);
+ intArg.ShouldBe(10);
+
+ // {{ Partition (10,1,2,3) }} (with spaces)
+ (tt, idxs, intArg, _, err) = SubjectTransform.IndexPlaceHolders("{{ Partition (10,1,2,3) }}");
+ err.ShouldBeNull();
+ tt.ShouldBe(TransformType.Partition);
+ idxs.ShouldBe([1, 2, 3]);
+ intArg.ShouldBe(10);
+
+ // {{wildcard(2)}}
+ (tt, idxs, intArg, _, err) = SubjectTransform.IndexPlaceHolders("{{wildcard(2)}}");
+ err.ShouldBeNull();
+ tt.ShouldBe(TransformType.Wildcard);
+ idxs.Length.ShouldBe(1);
+ idxs[0].ShouldBe(2);
+ intArg.ShouldBe(-1);
+
+ // {{SplitFromLeft(2,1)}}
+ int pos;
+ (tt, idxs, pos, _, err) = SubjectTransform.IndexPlaceHolders("{{SplitFromLeft(2,1)}}");
+ err.ShouldBeNull();
+ tt.ShouldBe(TransformType.SplitFromLeft);
+ idxs.Length.ShouldBe(1);
+ idxs[0].ShouldBe(2);
+ pos.ShouldBe(1);
+
+ // {{SplitFromRight(3,2)}}
+ (tt, idxs, pos, _, err) = SubjectTransform.IndexPlaceHolders("{{SplitFromRight(3,2)}}");
+ err.ShouldBeNull();
+ tt.ShouldBe(TransformType.SplitFromRight);
+ idxs.Length.ShouldBe(1);
+ idxs[0].ShouldBe(3);
+ pos.ShouldBe(2);
+
+ // {{SliceFromLeft(2,2)}}
+ (tt, idxs, var sliceSize, _, err) = SubjectTransform.IndexPlaceHolders("{{SliceFromLeft(2,2)}}");
+ err.ShouldBeNull();
+ tt.ShouldBe(TransformType.SliceFromLeft);
+ idxs.Length.ShouldBe(1);
+ idxs[0].ShouldBe(2);
+ sliceSize.ShouldBe(2);
+
+ // {{Left(3,2)}}
+ (tt, idxs, pos, _, err) = SubjectTransform.IndexPlaceHolders("{{Left(3,2)}}");
+ err.ShouldBeNull();
+ tt.ShouldBe(TransformType.Left);
+ idxs.Length.ShouldBe(1);
+ idxs[0].ShouldBe(3);
+ pos.ShouldBe(2);
+
+ // {{Right(3,2)}}
+ (tt, idxs, pos, _, err) = SubjectTransform.IndexPlaceHolders("{{Right(3,2)}}");
+ err.ShouldBeNull();
+ tt.ShouldBe(TransformType.Right);
+ idxs.Length.ShouldBe(1);
+ idxs[0].ShouldBe(3);
+ pos.ShouldBe(2);
+ }
+
+ [Fact]
+ public void SubjectTransformHelpers_ShouldTokenizeAndUntokenize()
+ {
+ // Mirror: TestSubjectTransformHelpers
+
+ // transformUntokenize — no placeholders
+ var (filter, phs) = SubjectTransform.TransformUntokenize("bar");
+ filter.ShouldBe("bar");
+ phs.Length.ShouldBe(0);
+
+ // transformUntokenize — dollar-sign placeholders
+ (filter, phs) = SubjectTransform.TransformUntokenize("foo.$2.$1");
+ filter.ShouldBe("foo.*.*");
+ phs.ShouldBe(["$2", "$1"]);
+
+ // transformUntokenize — mustache placeholders
+ (filter, phs) = SubjectTransform.TransformUntokenize("foo.{{wildcard(2)}}.{{wildcard(1)}}");
+ filter.ShouldBe("foo.*.*");
+ phs.ShouldBe(["{{wildcard(2)}}", "{{wildcard(1)}}"]);
+
+ // Strict reverse transform.
+ var (tr, err) = SubjectTransform.NewStrict("foo.*.*", "bar.$2.{{Wildcard(1)}}");
+ err.ShouldBeNull($"NewStrict failed: {err?.Message}");
+ tr.ShouldNotBeNull();
+
+ var subject = "foo.b.a";
+ var transformed = tr!.TransformSubject(subject);
+ var reverse = tr.Reverse();
+ reverse.ShouldNotBeNull("reverse should not be null");
+ reverse!.TransformSubject(transformed).ShouldBe(subject, "reverse of transform should return original subject");
+ }
+
+ [Fact]
+ public void SubjectTransforms_ShouldValidateAndTransformCorrectly()
+ {
+ // Mirror: TestSubjectTransforms
+ void ShouldErr(string src, string dest, bool strict)
+ {
+ var (t, e) = SubjectTransform.NewWithStrict(src, dest, strict);
+ t.ShouldBeNull($"expected error but got transform for src={src}, dest={dest}");
+ var isValid = ReferenceEquals(e, ServerErrors.ErrBadSubject) ||
+ (e is MappingDestinationException mde && mde.Is(ServerErrors.ErrInvalidMappingDestination));
+ isValid.ShouldBeTrue(
+ $"Expected ErrBadSubject or ErrInvalidMappingDestination for src={src}, dest={dest}, got: {e}");
+ }
+
+ ShouldErr("foo..", "bar", false);
+ ShouldErr("foo.*", "bar.*", false);
+ ShouldErr("foo.*", "bar.$2", false);
+ ShouldErr("foo.*", "bar.$1.>", false);
+ ShouldErr("foo.>", "bar.baz", false);
+ ShouldErr("foo.*.*", "bar.$2", true);
+ ShouldErr("foo.*", "foo.$foo", true);
+ ShouldErr("foo.*", "bar.{{Partition(2,1)}}", true);
+ ShouldErr("foo.*", "foo.{{wildcard(2)}}", false);
+ ShouldErr("foo.*", "foo.{{unimplemented(1)}}", false);
+ ShouldErr("foo.*", "foo.{{partition()}}", false);
+ ShouldErr("foo.*", "foo.{{random()}}", false);
+ ShouldErr("foo.*", "foo.{{wildcard(foo)}}", false);
+ ShouldErr("foo.*", "foo.{{wildcard()}}", false);
+ ShouldErr("foo.*", "foo.{{wildcard(1,2)}}", false);
+ ShouldErr("foo.*", "foo.{{ wildcard5) }}", false);
+ ShouldErr("foo.*", "foo.{{splitLeft(2,2}}", false);
+ ShouldErr("foo", "bla.{{wildcard(1)}}", false);
+ ShouldErr("foo.*", $"foo.{{{{partition({(long)int.MaxValue + 1})}}}}", false);
+ ShouldErr("foo.*", $"foo.{{{{random({(long)int.MaxValue + 1})}}}}", false);
+
+ SubjectTransform? ShouldBeOK(string src, string dest, bool strict)
+ {
+ var (tr, err) = SubjectTransform.NewWithStrict(src, dest, strict);
+ err.ShouldBeNull($"Got error {err} for src={src}, dest={dest}");
+ return tr;
+ }
+
+ ShouldBeOK("foo.*", "bar.{{Wildcard(1)}}", true);
+ ShouldBeOK("foo.*.*", "bar.$2", false);
+ ShouldBeOK("foo.*.*", "bar.{{wildcard(1)}}", false);
+ ShouldBeOK("foo.*.*", "bar.{{partition(1)}}", false);
+ ShouldBeOK("foo.*.*", "bar.{{random(5)}}", false);
+ ShouldBeOK("foo", "bar", false);
+ ShouldBeOK("foo.*.bar.*.baz", "req.$2.$1", false);
+ ShouldBeOK("baz.>", "mybaz.>", false);
+ ShouldBeOK("*", "{{splitfromleft(1,1)}}", false);
+ ShouldBeOK("", "prefix.>", false);
+ ShouldBeOK("*.*", "{{partition(10,1,2)}}", false);
+ ShouldBeOK("foo.*.*", "foo.{{wildcard(1)}}.{{wildcard(2)}}.{{partition(5,1,2)}}", false);
+ ShouldBeOK("foo.*", $"foo.{{{{partition({int.MaxValue})}}}}", false);
+ ShouldBeOK("foo.*", $"foo.{{{{random({int.MaxValue})}}}}", false);
+ ShouldBeOK("foo.bar", $"foo.{{{{random({int.MaxValue})}}}}", false);
+
+ void ShouldMatch(string src, string dest, string sample, params string[] expected)
+ {
+ var tr = ShouldBeOK(src, dest, false);
+ if (tr == null) return;
+ var (s, err2) = tr.Match(sample);
+ err2.ShouldBeNull($"Match error: {err2}");
+ expected.ShouldContain(s, $"Transform {src}→{dest} on '{sample}', got '{s}'");
+ }
+
+ ShouldMatch("", "prefix.>", "foo", "prefix.foo");
+ ShouldMatch("foo", "", "foo", "foo");
+ ShouldMatch("foo", "bar", "foo", "bar");
+ ShouldMatch("foo.*.bar.*.baz", "req.$2.$1", "foo.A.bar.B.baz", "req.B.A");
+ ShouldMatch("foo.*.bar.*.baz", "req.{{wildcard(2)}}.{{wildcard(1)}}", "foo.A.bar.B.baz", "req.B.A");
+ ShouldMatch("baz.>", "my.pre.>", "baz.1.2.3", "my.pre.1.2.3");
+ ShouldMatch("baz.>", "foo.bar.>", "baz.1.2.3", "foo.bar.1.2.3");
+ ShouldMatch("*", "foo.bar.$1", "foo", "foo.bar.foo");
+ ShouldMatch("*", "{{splitfromleft(1,3)}}", "12345", "123.45");
+ ShouldMatch("*", "{{SplitFromRight(1,3)}}", "12345", "12.345");
+ ShouldMatch("*", "{{SliceFromLeft(1,3)}}", "1234567890", "123.456.789.0");
+ ShouldMatch("*", "{{SliceFromRight(1,3)}}", "1234567890", "1.234.567.890");
+ ShouldMatch("*", "{{split(1,-)}}", "-abc-def--ghi-", "abc.def.ghi");
+ ShouldMatch("*", "{{split(1,-)}}", "abc-def--ghi-", "abc.def.ghi");
+ ShouldMatch("*.*", "{{split(2,-)}}.{{splitfromleft(1,2)}}", "foo.-abc-def--ghij-", "abc.def.ghij.fo.o");
+ ShouldMatch("*", "{{right(1,1)}}", "1234", "4");
+ ShouldMatch("*", "{{right(1,3)}}", "1234", "234");
+ ShouldMatch("*", "{{right(1,6)}}", "1234", "1234");
+ ShouldMatch("*", "{{left(1,1)}}", "1234", "1");
+ ShouldMatch("*", "{{left(1,3)}}", "1234", "123");
+ ShouldMatch("*", "{{left(1,6)}}", "1234", "1234");
+ ShouldMatch("*", "bar.{{partition(0)}}", "baz", "bar.0");
+ ShouldMatch("*", "bar.{{partition(10, 0)}}", "foo", "bar.3");
+ ShouldMatch("*.*", "bar.{{partition(10)}}", "foo.bar", "bar.6");
+ ShouldMatch("*", "bar.{{partition(10)}}", "foo", "bar.3");
+ ShouldMatch("*", "bar.{{partition(10)}}", "baz", "bar.0");
+ ShouldMatch("*", "bar.{{partition(10)}}", "qux", "bar.9");
+ ShouldMatch("*", "bar.{{random(0)}}", "qux", "bar.0");
+
+ // random(6) — any value 0–5 is acceptable.
+ for (var i = 0; i < 100; i++)
+ ShouldMatch("*", "bar.{{random(6)}}", "qux",
+ "bar.0", "bar.1", "bar.2", "bar.3", "bar.4", "bar.5");
+
+ ShouldBeOK("foo.bar", "baz.{{partition(10)}}", false);
+ ShouldMatch("foo.bar", "baz.{{partition(10)}}", "foo.bar", "baz.6");
+ ShouldMatch("foo.baz", "qux.{{partition(10)}}", "foo.baz", "qux.4");
+ ShouldMatch("test.subject", "result.{{partition(5)}}", "test.subject", "result.0");
+ }
+
+ [Fact]
+ public void TransformTokenizedSubject_ShouldNotPanicOnMissingToken()
+ {
+ // Mirror: TestSubjectTransformDoesntPanicTransformingMissingToken
+ var (tr, err) = SubjectTransform.New("foo.*", "one.two.{{wildcard(1)}}");
+ err.ShouldBeNull();
+ tr.ShouldNotBeNull();
+
+ // Should not throw even when the token at index 1 is missing.
+ var result = tr!.TransformTokenizedSubject(["foo"]);
+ result.ShouldBe("one.two.");
+ }
+}
diff --git a/porting.db b/porting.db
index b9a4b1c..2e12d22 100644
Binary files a/porting.db and b/porting.db differ
diff --git a/reports/current.md b/reports/current.md
index 7dc7ceb..b385c39 100644
--- a/reports/current.md
+++ b/reports/current.md
@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
-Generated: 2026-02-26 14:15:20 UTC
+Generated: 2026-02-26 14:39:36 UTC
## Modules (12 total)
@@ -13,17 +13,17 @@ Generated: 2026-02-26 14:15:20 UTC
| Status | Count |
|--------|-------|
-| complete | 265 |
-| n_a | 74 |
-| not_started | 3334 |
+| complete | 328 |
+| n_a | 79 |
+| not_started | 3266 |
## Unit Tests (3257 total)
| Status | Count |
|--------|-------|
-| complete | 119 |
-| n_a | 12 |
-| not_started | 3126 |
+| complete | 139 |
+| n_a | 49 |
+| not_started | 3069 |
## Library Mappings (36 total)
@@ -34,4 +34,4 @@ Generated: 2026-02-26 14:15:20 UTC
## Overall Progress
-**481/6942 items complete (6.9%)**
+**606/6942 items complete (8.7%)**
diff --git a/reports/report_8050ee1.md b/reports/report_8050ee1.md
new file mode 100644
index 0000000..b385c39
--- /dev/null
+++ b/reports/report_8050ee1.md
@@ -0,0 +1,37 @@
+# NATS .NET Porting Status Report
+
+Generated: 2026-02-26 14:39:36 UTC
+
+## Modules (12 total)
+
+| Status | Count |
+|--------|-------|
+| complete | 11 |
+| not_started | 1 |
+
+## Features (3673 total)
+
+| Status | Count |
+|--------|-------|
+| complete | 328 |
+| n_a | 79 |
+| not_started | 3266 |
+
+## Unit Tests (3257 total)
+
+| Status | Count |
+|--------|-------|
+| complete | 139 |
+| n_a | 49 |
+| not_started | 3069 |
+
+## Library Mappings (36 total)
+
+| Status | Count |
+|--------|-------|
+| mapped | 36 |
+
+
+## Overall Progress
+
+**606/6942 items complete (8.7%)**