diff --git a/CLAUDE.md b/CLAUDE.md
index 31ce8b4..7d8caf7 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -59,6 +59,64 @@ All .NET code must follow the rules in [`docs/standards/dotnet-standards.md`](do
- **Logging**: `Microsoft.Extensions.Logging` with Serilog provider; use `LogContext.PushProperty` for contextual enrichment
- **Naming**: PascalCase for all public members; `ZB.MOM.NatsNet.Server.[Module]` namespace hierarchy
+## Using Codex via tmux-cli
+
+Use `tmux-cli` to launch and interact with OpenAI Codex (`ccc` alias) in a separate tmux pane. This lets you delegate tasks to Codex without blocking your current session.
+
+**Do NOT use the Codex MCP server (`mcp__codex__codex`) for long-running commands.** Use `tmux-cli` + `ccc` instead — it runs in a separate pane, won't time out, and lets you capture output when ready.
+
+### The `ccc` alias
+
+`ccc` runs `codex --dangerously-bypass-approvals-and-sandbox` (with Node/nvm auto-setup). Use it for tasks you want Codex to handle autonomously.
+
+### Workflow
+
+**Always launch a shell pane first** — if a command errors without a shell, the pane closes and output is lost:
+
+```bash
+# 1. Launch a shell (returns pane ID, e.g. "2")
+tmux-cli launch "zsh"
+
+# 2. Send a codex command to that pane
+tmux-cli send 'ccc "your prompt here"' --pane=2
+
+# 3. Wait for codex to finish (no output for N seconds = idle)
+tmux-cli wait_idle --pane=2 --idle-time=10.0
+
+# 4. Capture the output
+tmux-cli capture --pane=2
+
+# 5. Clean up when done
+tmux-cli kill --pane=2
+```
+
+### Key tmux-cli commands
+
+| Command | Purpose |
+|---------|---------|
+| `tmux-cli launch "zsh"` | Start a shell pane (do this FIRST) |
+| `tmux-cli send "cmd" --pane=N` | Send text + Enter to pane N |
+| `tmux-cli send "text" --pane=N --enter=False` | Send text without pressing Enter |
+| `tmux-cli send "text" --pane=N --delay-enter=0.5` | Custom delay before Enter (default 1.5s) |
+| `tmux-cli wait_idle --pane=N --idle-time=3.0` | Wait until pane has no output for N seconds |
+| `tmux-cli capture --pane=N` | Capture current pane output |
+| `tmux-cli list_panes` | List all panes (JSON with IDs and status) |
+| `tmux-cli kill --pane=N` | Kill a pane (cannot kill your own) |
+| `tmux-cli interrupt --pane=N` | Send Ctrl+C to a pane |
+| `tmux-cli escape --pane=N` | Send Escape to a pane |
+
+### Pane identifiers
+
+- Just the pane number: `2` (current window)
+- Full format: `session:window.pane` (e.g. `myapp:1.2`)
+
+### Tips
+
+- Use `wait_idle` instead of polling with repeated `capture` calls
+- Save the pane ID returned by `launch` for subsequent commands
+- Use `capture` to check state before sending input
+- For long-running Codex tasks, increase `--idle-time` (e.g. `--idle-time=15.0`)
+
## Reports
- `reports/current.md` always has the latest porting status.
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs
index 00a9682..f35f978 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs
@@ -1158,6 +1158,58 @@ public sealed partial class ClientConnection
TraceInOp("PRE", pre);
}
+ // =========================================================================
+ // Parser compatibility wrappers (features 2588, 2590, 2591)
+ // =========================================================================
+
+ ///
+ /// Parses protocol bytes using the shared parser state for this connection.
+ /// Mirrors Go client.parse.
+ ///
+ internal Exception? Parse(byte[] buf, IProtocolHandler handler)
+ {
+ ArgumentNullException.ThrowIfNull(buf);
+ ArgumentNullException.ThrowIfNull(handler);
+
+ ParseCtx.Kind = Kind;
+ ParseCtx.HasHeaders = Headers;
+ if (_mcl > 0)
+ ParseCtx.MaxControlLine = _mcl;
+ if (_mpay != 0)
+ ParseCtx.MaxPayload = _mpay;
+
+ return ProtocolParser.Parse(ParseCtx, handler, buf);
+ }
+
+ ///
+ /// Checks max control line enforcement for the current connection kind.
+ /// Mirrors Go client.overMaxControlLineLimit.
+ ///
+ internal Exception? OverMaxControlLineLimit(byte[] arg, int mcl, IProtocolHandler handler)
+ {
+ ArgumentNullException.ThrowIfNull(arg);
+ ArgumentNullException.ThrowIfNull(handler);
+
+ ParseCtx.Kind = Kind;
+ return ProtocolParser.OverMaxControlLineLimit(ParseCtx, handler, arg, mcl);
+ }
+
+ ///
+ /// Re-processes stored pub args for split-buffer message payload handling.
+ /// Mirrors Go client.clonePubArg.
+ ///
+ internal Exception? ClonePubArg(IProtocolHandler handler, bool lmsg)
+ {
+ ArgumentNullException.ThrowIfNull(handler);
+
+ ParseCtx.Kind = Kind;
+ ParseCtx.HasHeaders = Headers;
+ if (_mpay != 0)
+ ParseCtx.MaxPayload = _mpay;
+
+ return ProtocolParser.ClonePubArg(ParseCtx, handler, lmsg);
+ }
+
///
/// Generates the INFO JSON bytes sent to the client on connect.
/// Stub — full implementation in session 09.
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs
index 8cb1bdc..0a5d6db 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/Internal/DataStructures/SubscriptionIndex.cs
@@ -58,7 +58,7 @@ public sealed class SubscriptionIndex
private long _cacheHits;
private long _inserts;
private long _removes;
- private SublistLevel _root;
+ private Level _root;
private Dictionary? _cache;
private int _ccSweep;
private NotifyMaps? _notify;
@@ -70,7 +70,7 @@ public sealed class SubscriptionIndex
private SubscriptionIndex(bool enableCache)
{
- _root = new SublistLevel();
+ _root = new Level();
_cache = enableCache ? new Dictionary() : null;
}
@@ -116,7 +116,7 @@ public sealed class SubscriptionIndex
try
{
bool sfwc = false, haswc = false, isnew = false;
- SublistNode? n = null;
+ Node? n = null;
var l = _root;
var start = 0;
@@ -135,7 +135,7 @@ public sealed class SubscriptionIndex
{
if (!l.Nodes.TryGetValue(t, out n))
{
- n = new SublistNode();
+ n = new Node();
l.Nodes[t] = n;
}
}
@@ -148,7 +148,7 @@ public sealed class SubscriptionIndex
haswc = true;
if (n == null)
{
- n = new SublistNode();
+ n = new Node();
l.Pwc = n;
}
break;
@@ -158,21 +158,21 @@ public sealed class SubscriptionIndex
sfwc = true;
if (n == null)
{
- n = new SublistNode();
+ n = new Node();
l.Fwc = n;
}
break;
default:
if (!l.Nodes.TryGetValue(t, out n))
{
- n = new SublistNode();
+ n = new Node();
l.Nodes[t] = n;
}
break;
}
}
- n.Next ??= new SublistLevel();
+ n.Next ??= new Level();
l = n.Next;
start = i + 1;
@@ -446,7 +446,7 @@ public sealed class SubscriptionIndex
try
{
bool sfwc = false, haswc = false;
- SublistNode? n = null;
+ Node? n = null;
var l = _root;
var lnts = new LevelNodeToken[32];
@@ -535,7 +535,7 @@ public sealed class SubscriptionIndex
}
}
- private static (bool found, bool last) RemoveFromNode(SublistNode? n, Subscription sub)
+ private static (bool found, bool last) RemoveFromNode(Node? n, Subscription sub)
{
if (n == null) return (false, true);
@@ -743,9 +743,9 @@ public sealed class SubscriptionIndex
// Private: Trie matching (matchLevel)
// -------------------------------------------------------------------------
- private static void MatchLevel(SublistLevel? l, string[] toks, SubscriptionIndexResult results)
+ private static void MatchLevel(Level? l, string[] toks, SubscriptionIndexResult results)
{
- SublistNode? pwc = null, n = null;
+ Node? pwc = null, n = null;
for (int i = 0; i < toks.Length; i++)
{
if (l == null) return;
@@ -760,9 +760,9 @@ public sealed class SubscriptionIndex
if (pwc != null) AddNodeToResults(pwc, results);
}
- private static bool MatchLevelForAny(SublistLevel? l, ReadOnlySpan toks, ref int np, ref int nq)
+ private static bool MatchLevelForAny(Level? l, ReadOnlySpan toks, ref int np, ref int nq)
{
- SublistNode? pwc = null, n = null;
+ Node? pwc = null, n = null;
for (int i = 0; i < toks.Length; i++)
{
if (l == null) return false;
@@ -804,7 +804,7 @@ public sealed class SubscriptionIndex
// Private: Reverse match
// -------------------------------------------------------------------------
- private static void ReverseMatchLevel(SublistLevel? l, ReadOnlySpan toks, SublistNode? n, SubscriptionIndexResult results)
+ private static void ReverseMatchLevel(Level? l, ReadOnlySpan toks, Node? n, SubscriptionIndexResult results)
{
if (l == null) return;
for (int i = 0; i < toks.Length; i++)
@@ -848,7 +848,7 @@ public sealed class SubscriptionIndex
if (n != null) AddNodeToResults(n, results);
}
- private static void GetAllNodes(SublistLevel? l, SubscriptionIndexResult results)
+ private static void GetAllNodes(Level? l, SubscriptionIndexResult results)
{
if (l == null) return;
if (l.Pwc != null) AddNodeToResults(l.Pwc, results);
@@ -864,7 +864,7 @@ public sealed class SubscriptionIndex
// Private: addNodeToResults
// -------------------------------------------------------------------------
- private static void AddNodeToResults(SublistNode n, SubscriptionIndexResult results)
+ private static void AddNodeToResults(Node n, SubscriptionIndexResult results)
{
// Plain subscriptions.
if (n.PList != null)
@@ -940,7 +940,7 @@ public sealed class SubscriptionIndex
}
}
- private static void AddNodeToSubsLocal(SublistNode n, List subs, bool includeLeafHubs)
+ private static void AddNodeToSubsLocal(Node n, List subs, bool includeLeafHubs)
{
if (n.PList != null)
{
@@ -960,7 +960,7 @@ public sealed class SubscriptionIndex
}
}
- private static void CollectLocalSubs(SublistLevel? l, List subs, bool includeLeafHubs)
+ private static void CollectLocalSubs(Level? l, List subs, bool includeLeafHubs)
{
if (l == null) return;
foreach (var n in l.Nodes.Values)
@@ -972,7 +972,7 @@ public sealed class SubscriptionIndex
if (l.Fwc != null) { AddNodeToSubsLocal(l.Fwc, subs, includeLeafHubs); CollectLocalSubs(l.Fwc.Next, subs, includeLeafHubs); }
}
- private static void AddAllNodeToSubs(SublistNode n, List subs)
+ private static void AddAllNodeToSubs(Node n, List subs)
{
if (n.PList != null)
subs.AddRange(n.PList);
@@ -986,7 +986,7 @@ public sealed class SubscriptionIndex
subs.Add(sub);
}
- private static void CollectAllSubs(SublistLevel? l, List subs)
+ private static void CollectAllSubs(Level? l, List subs)
{
if (l == null) return;
foreach (var n in l.Nodes.Values)
@@ -1204,7 +1204,7 @@ public sealed class SubscriptionIndex
// Private: visitLevel (depth calculation for tests)
// -------------------------------------------------------------------------
- private static int VisitLevel(SublistLevel? l, int depth)
+ private static int VisitLevel(Level? l, int depth)
{
if (l == null || l.NumNodes() == 0) return depth;
depth++;
@@ -1529,18 +1529,18 @@ public sealed class SubscriptionIndex
}
// -------------------------------------------------------------------------
- // Nested types: SublistNode, SublistLevel, NotifyMaps
+ // Nested types: Node, Level, NotifyMaps
// -------------------------------------------------------------------------
- internal sealed class SublistNode
+ internal sealed class Node
{
public readonly Dictionary PSubs = new(ReferenceEqualityComparer.Instance);
public Dictionary>? QSubs;
public List? PList;
- public SublistLevel? Next;
+ public Level? Next;
/// Factory method matching Go's newNode().
- public static SublistNode NewNode() => new();
+ public static Node NewNode() => new();
public bool IsEmpty()
{
@@ -1549,14 +1549,14 @@ public sealed class SubscriptionIndex
}
}
- internal sealed class SublistLevel
+ internal sealed class Level
{
- public readonly Dictionary Nodes = new();
- public SublistNode? Pwc;
- public SublistNode? Fwc;
+ public readonly Dictionary Nodes = new();
+ public Node? Pwc;
+ public Node? Fwc;
/// Factory method matching Go's newLevel().
- public static SublistLevel NewLevel() => new();
+ public static Level NewLevel() => new();
public int NumNodes()
{
@@ -1566,8 +1566,9 @@ public sealed class SubscriptionIndex
return num;
}
- public void PruneNode(SublistNode n, string t)
+ public void PruneNode(Node? n, string t)
{
+ if (n == null) return;
if (ReferenceEquals(n, Fwc)) Fwc = null;
else if (ReferenceEquals(n, Pwc)) Pwc = null;
else Nodes.Remove(t);
@@ -1582,11 +1583,11 @@ public sealed class SubscriptionIndex
private readonly struct LevelNodeToken
{
- public readonly SublistLevel Level;
- public readonly SublistNode Node;
+ public readonly Level Level;
+ public readonly Node Node;
public readonly string Token;
- public LevelNodeToken(SublistLevel level, SublistNode node, string token)
+ public LevelNodeToken(Level level, Node node, string token)
{
Level = level;
Node = node;
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs
index ff828cb..e92739f 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs
@@ -103,6 +103,23 @@ public sealed class JetStreamMemStore : IStreamStore
_scheduling = new MsgScheduling(RunMsgScheduling);
}
+ ///
+ /// Factory that mirrors Go newMemStore mapping semantics.
+ ///
+ public static JetStreamMemStore NewMemStore(StreamConfig cfg)
+ {
+ var ms = new JetStreamMemStore(cfg);
+
+ if (cfg.FirstSeq > 0)
+ {
+ var (_, err) = ms.PurgeInternal(cfg.FirstSeq);
+ if (err != null)
+ throw err;
+ }
+
+ return ms;
+ }
+
// -----------------------------------------------------------------------
// IStreamStore — store / load
// -----------------------------------------------------------------------
@@ -1133,17 +1150,7 @@ public sealed class JetStreamMemStore : IStreamStore
_mu.EnterReadLock();
try
{
- if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null);
- var seqs = new List(_fss.Size());
- _fss.IterFast((subj, ss) =>
- {
- if (ss.LastNeedsUpdate)
- RecalculateForSubj(Encoding.UTF8.GetString(subj), ss);
- seqs.Add(ss.Last);
- return true;
- });
- seqs.Sort();
- return (seqs.ToArray(), null);
+ return AllLastSeqsLocked();
}
finally
{
@@ -1151,6 +1158,27 @@ public sealed class JetStreamMemStore : IStreamStore
}
}
+ ///
+ /// Returns sorted per-subject last sequences without taking locks.
+ /// Mirrors Go allLastSeqsLocked.
+ ///
+ private (ulong[] Seqs, Exception? Error) AllLastSeqsLocked()
+ {
+ if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null);
+
+ var seqs = new List(_fss.Size());
+ _fss.IterFast((subj, ss) =>
+ {
+ if (ss.LastNeedsUpdate)
+ RecalculateForSubj(Encoding.UTF8.GetString(subj), ss);
+ seqs.Add(ss.Last);
+ return true;
+ });
+
+ seqs.Sort();
+ return (seqs.ToArray(), null);
+ }
+
///
public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed)
{
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs
index fe11062..cdbc26e 100644
--- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/DataStructures/SubscriptionIndexTests.cs
@@ -219,6 +219,51 @@ public class SubscriptionIndexTests
s.NumLevels().ShouldBe(0);
}
+ [Fact]
+ public void PruneNode_NullNode_DoesNotMutateLiteralNodes()
+ {
+ var level = new SubscriptionIndex.Level();
+ var literal = new SubscriptionIndex.Node();
+ level.Nodes["foo"] = literal;
+ level.Fwc = new SubscriptionIndex.Node();
+ level.Pwc = new SubscriptionIndex.Node();
+
+ level.PruneNode(null!, "foo");
+
+ level.Nodes.Count.ShouldBe(1);
+ level.Nodes["foo"].ShouldBeSameAs(literal);
+ }
+
+ [Fact]
+ public void IsEmpty_WithAndWithoutChildren_TracksNodeEmptiness()
+ {
+ var node = new SubscriptionIndex.Node();
+ node.IsEmpty().ShouldBeTrue();
+
+ node.Next = new SubscriptionIndex.Level();
+ node.Next.Nodes["bar"] = new SubscriptionIndex.Node();
+ node.IsEmpty().ShouldBeFalse();
+
+ node.Next.Nodes.Clear();
+ node.IsEmpty().ShouldBeTrue();
+ }
+
+ [Fact]
+ public void NumNodes_WithLiteralAndWildcardEntries_CountsAllBranches()
+ {
+ var level = new SubscriptionIndex.Level();
+ level.Nodes["foo"] = new SubscriptionIndex.Node();
+ level.Pwc = new SubscriptionIndex.Node();
+ level.Fwc = new SubscriptionIndex.Node();
+
+ level.NumNodes().ShouldBe(3);
+
+ level.PruneNode(level.Pwc, SubscriptionIndex.Pwcs);
+ level.PruneNode(level.Fwc, SubscriptionIndex.Fwcs);
+ level.PruneNode(level.Nodes["foo"], "foo");
+ level.NumNodes().ShouldBe(0);
+ }
+
[Theory] // T:2983, T:2984
[InlineData(true)]
[InlineData(false)]
diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs
index 0d7ca72..e39670b 100644
--- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs
+++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs
@@ -32,7 +32,7 @@ public class JetStreamMemoryStoreTests
private static JetStreamMemStore NewMemStore(StreamConfig? cfg = null)
{
cfg ??= new StreamConfig { Storage = StorageType.MemoryStorage, Name = "test" };
- return new JetStreamMemStore(cfg);
+ return JetStreamMemStore.NewMemStore(cfg);
}
private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s);
@@ -41,6 +41,71 @@ public class JetStreamMemoryStoreTests
private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg)
=> (ulong)(subj.Length + (hdr?.Length ?? 0) + (msg?.Length ?? 0) + 16);
+ [Fact]
+ public void NewMemStore_FactoryMethod_InitializesFirstSequence()
+ {
+ var method = typeof(JetStreamMemStore).GetMethod(
+ "NewMemStore",
+ System.Reflection.BindingFlags.Static | System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.NonPublic);
+ method.ShouldNotBeNull();
+
+ var cfg = new StreamConfig
+ {
+ Name = "factory-init",
+ Storage = StorageType.MemoryStorage,
+ FirstSeq = 1000,
+ };
+
+ var created = method!.Invoke(null, new object?[] { cfg });
+ created.ShouldBeOfType();
+
+ var ms = (JetStreamMemStore)created!;
+ var state = ms.State();
+ state.FirstSeq.ShouldBe(1000UL);
+ state.LastSeq.ShouldBe(999UL);
+ ms.Stop();
+ }
+
+ [Fact]
+ public void AllLastSeqsLocked_MatchesPublicAllLastSeqsOrdering()
+ {
+ var cfg = new StreamConfig
+ {
+ Name = "locked-all-last-seqs",
+ Subjects = new[] { "*.*" },
+ MaxMsgsPer = 50,
+ Storage = StorageType.MemoryStorage,
+ };
+ var ms = NewMemStore(cfg);
+
+ var subjs = new[] { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" };
+ var msg = Bytes("abc");
+ var rng = new Random(11);
+
+ for (var i = 0; i < 1_000; i++)
+ {
+ var subj = subjs[rng.Next(subjs.Length)];
+ ms.StoreMsg(subj, null, msg, 0);
+ }
+
+ var method = typeof(JetStreamMemStore).GetMethod(
+ "AllLastSeqsLocked",
+ System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);
+ method.ShouldNotBeNull();
+
+ var result = method!.Invoke(ms, Array.Empty