merge: batch2 parser-sublist-memstore remainders
This commit is contained in:
58
CLAUDE.md
58
CLAUDE.md
@@ -59,6 +59,64 @@ All .NET code must follow the rules in [`docs/standards/dotnet-standards.md`](do
|
||||
- **Logging**: `Microsoft.Extensions.Logging` with Serilog provider; use `LogContext.PushProperty` for contextual enrichment
|
||||
- **Naming**: PascalCase for all public members; `ZB.MOM.NatsNet.Server.[Module]` namespace hierarchy
|
||||
|
||||
## Using Codex via tmux-cli
|
||||
|
||||
Use `tmux-cli` to launch and interact with OpenAI Codex (`ccc` alias) in a separate tmux pane. This lets you delegate tasks to Codex without blocking your current session.
|
||||
|
||||
**Do NOT use the Codex MCP server (`mcp__codex__codex`) for long-running commands.** Use `tmux-cli` + `ccc` instead — it runs in a separate pane, won't time out, and lets you capture output when ready.
|
||||
|
||||
### The `ccc` alias
|
||||
|
||||
`ccc` runs `codex --dangerously-bypass-approvals-and-sandbox` (with Node/nvm auto-setup). Use it for tasks you want Codex to handle autonomously.
|
||||
|
||||
### Workflow
|
||||
|
||||
**Always launch a shell pane first** — if a command errors without a shell, the pane closes and output is lost:
|
||||
|
||||
```bash
|
||||
# 1. Launch a shell (returns pane ID, e.g. "2")
|
||||
tmux-cli launch "zsh"
|
||||
|
||||
# 2. Send a codex command to that pane
|
||||
tmux-cli send 'ccc "your prompt here"' --pane=2
|
||||
|
||||
# 3. Wait for codex to finish (no output for N seconds = idle)
|
||||
tmux-cli wait_idle --pane=2 --idle-time=10.0
|
||||
|
||||
# 4. Capture the output
|
||||
tmux-cli capture --pane=2
|
||||
|
||||
# 5. Clean up when done
|
||||
tmux-cli kill --pane=2
|
||||
```
|
||||
|
||||
### Key tmux-cli commands
|
||||
|
||||
| Command | Purpose |
|
||||
|---------|---------|
|
||||
| `tmux-cli launch "zsh"` | Start a shell pane (do this FIRST) |
|
||||
| `tmux-cli send "cmd" --pane=N` | Send text + Enter to pane N |
|
||||
| `tmux-cli send "text" --pane=N --enter=False` | Send text without pressing Enter |
|
||||
| `tmux-cli send "text" --pane=N --delay-enter=0.5` | Custom delay before Enter (default 1.5s) |
|
||||
| `tmux-cli wait_idle --pane=N --idle-time=3.0` | Wait until pane has no output for N seconds |
|
||||
| `tmux-cli capture --pane=N` | Capture current pane output |
|
||||
| `tmux-cli list_panes` | List all panes (JSON with IDs and status) |
|
||||
| `tmux-cli kill --pane=N` | Kill a pane (cannot kill your own) |
|
||||
| `tmux-cli interrupt --pane=N` | Send Ctrl+C to a pane |
|
||||
| `tmux-cli escape --pane=N` | Send Escape to a pane |
|
||||
|
||||
### Pane identifiers
|
||||
|
||||
- Just the pane number: `2` (current window)
|
||||
- Full format: `session:window.pane` (e.g. `myapp:1.2`)
|
||||
|
||||
### Tips
|
||||
|
||||
- Use `wait_idle` instead of polling with repeated `capture` calls
|
||||
- Save the pane ID returned by `launch` for subsequent commands
|
||||
- Use `capture` to check state before sending input
|
||||
- For long-running Codex tasks, increase `--idle-time` (e.g. `--idle-time=15.0`)
|
||||
|
||||
## Reports
|
||||
|
||||
- `reports/current.md` always has the latest porting status.
|
||||
|
||||
@@ -1158,6 +1158,58 @@ public sealed partial class ClientConnection
|
||||
TraceInOp("PRE", pre);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Parser compatibility wrappers (features 2588, 2590, 2591)
|
||||
// =========================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Parses protocol bytes using the shared parser state for this connection.
|
||||
/// Mirrors Go <c>client.parse</c>.
|
||||
/// </summary>
|
||||
internal Exception? Parse(byte[] buf, IProtocolHandler handler)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(buf);
|
||||
ArgumentNullException.ThrowIfNull(handler);
|
||||
|
||||
ParseCtx.Kind = Kind;
|
||||
ParseCtx.HasHeaders = Headers;
|
||||
if (_mcl > 0)
|
||||
ParseCtx.MaxControlLine = _mcl;
|
||||
if (_mpay != 0)
|
||||
ParseCtx.MaxPayload = _mpay;
|
||||
|
||||
return ProtocolParser.Parse(ParseCtx, handler, buf);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks max control line enforcement for the current connection kind.
|
||||
/// Mirrors Go <c>client.overMaxControlLineLimit</c>.
|
||||
/// </summary>
|
||||
internal Exception? OverMaxControlLineLimit(byte[] arg, int mcl, IProtocolHandler handler)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(arg);
|
||||
ArgumentNullException.ThrowIfNull(handler);
|
||||
|
||||
ParseCtx.Kind = Kind;
|
||||
return ProtocolParser.OverMaxControlLineLimit(ParseCtx, handler, arg, mcl);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Re-processes stored pub args for split-buffer message payload handling.
|
||||
/// Mirrors Go <c>client.clonePubArg</c>.
|
||||
/// </summary>
|
||||
internal Exception? ClonePubArg(IProtocolHandler handler, bool lmsg)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(handler);
|
||||
|
||||
ParseCtx.Kind = Kind;
|
||||
ParseCtx.HasHeaders = Headers;
|
||||
if (_mpay != 0)
|
||||
ParseCtx.MaxPayload = _mpay;
|
||||
|
||||
return ProtocolParser.ClonePubArg(ParseCtx, handler, lmsg);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Generates the INFO JSON bytes sent to the client on connect.
|
||||
/// Stub — full implementation in session 09.
|
||||
|
||||
@@ -58,7 +58,7 @@ public sealed class SubscriptionIndex
|
||||
private long _cacheHits;
|
||||
private long _inserts;
|
||||
private long _removes;
|
||||
private SublistLevel _root;
|
||||
private Level _root;
|
||||
private Dictionary<string, SubscriptionIndexResult>? _cache;
|
||||
private int _ccSweep;
|
||||
private NotifyMaps? _notify;
|
||||
@@ -70,7 +70,7 @@ public sealed class SubscriptionIndex
|
||||
|
||||
private SubscriptionIndex(bool enableCache)
|
||||
{
|
||||
_root = new SublistLevel();
|
||||
_root = new Level();
|
||||
_cache = enableCache ? new Dictionary<string, SubscriptionIndexResult>() : null;
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ public sealed class SubscriptionIndex
|
||||
try
|
||||
{
|
||||
bool sfwc = false, haswc = false, isnew = false;
|
||||
SublistNode? n = null;
|
||||
Node? n = null;
|
||||
var l = _root;
|
||||
|
||||
var start = 0;
|
||||
@@ -135,7 +135,7 @@ public sealed class SubscriptionIndex
|
||||
{
|
||||
if (!l.Nodes.TryGetValue(t, out n))
|
||||
{
|
||||
n = new SublistNode();
|
||||
n = new Node();
|
||||
l.Nodes[t] = n;
|
||||
}
|
||||
}
|
||||
@@ -148,7 +148,7 @@ public sealed class SubscriptionIndex
|
||||
haswc = true;
|
||||
if (n == null)
|
||||
{
|
||||
n = new SublistNode();
|
||||
n = new Node();
|
||||
l.Pwc = n;
|
||||
}
|
||||
break;
|
||||
@@ -158,21 +158,21 @@ public sealed class SubscriptionIndex
|
||||
sfwc = true;
|
||||
if (n == null)
|
||||
{
|
||||
n = new SublistNode();
|
||||
n = new Node();
|
||||
l.Fwc = n;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
if (!l.Nodes.TryGetValue(t, out n))
|
||||
{
|
||||
n = new SublistNode();
|
||||
n = new Node();
|
||||
l.Nodes[t] = n;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
n.Next ??= new SublistLevel();
|
||||
n.Next ??= new Level();
|
||||
l = n.Next;
|
||||
|
||||
start = i + 1;
|
||||
@@ -446,7 +446,7 @@ public sealed class SubscriptionIndex
|
||||
try
|
||||
{
|
||||
bool sfwc = false, haswc = false;
|
||||
SublistNode? n = null;
|
||||
Node? n = null;
|
||||
var l = _root;
|
||||
|
||||
var lnts = new LevelNodeToken[32];
|
||||
@@ -535,7 +535,7 @@ public sealed class SubscriptionIndex
|
||||
}
|
||||
}
|
||||
|
||||
private static (bool found, bool last) RemoveFromNode(SublistNode? n, Subscription sub)
|
||||
private static (bool found, bool last) RemoveFromNode(Node? n, Subscription sub)
|
||||
{
|
||||
if (n == null) return (false, true);
|
||||
|
||||
@@ -743,9 +743,9 @@ public sealed class SubscriptionIndex
|
||||
// Private: Trie matching (matchLevel)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private static void MatchLevel(SublistLevel? l, string[] toks, SubscriptionIndexResult results)
|
||||
private static void MatchLevel(Level? l, string[] toks, SubscriptionIndexResult results)
|
||||
{
|
||||
SublistNode? pwc = null, n = null;
|
||||
Node? pwc = null, n = null;
|
||||
for (int i = 0; i < toks.Length; i++)
|
||||
{
|
||||
if (l == null) return;
|
||||
@@ -760,9 +760,9 @@ public sealed class SubscriptionIndex
|
||||
if (pwc != null) AddNodeToResults(pwc, results);
|
||||
}
|
||||
|
||||
private static bool MatchLevelForAny(SublistLevel? l, ReadOnlySpan<string> toks, ref int np, ref int nq)
|
||||
private static bool MatchLevelForAny(Level? l, ReadOnlySpan<string> toks, ref int np, ref int nq)
|
||||
{
|
||||
SublistNode? pwc = null, n = null;
|
||||
Node? pwc = null, n = null;
|
||||
for (int i = 0; i < toks.Length; i++)
|
||||
{
|
||||
if (l == null) return false;
|
||||
@@ -804,7 +804,7 @@ public sealed class SubscriptionIndex
|
||||
// Private: Reverse match
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private static void ReverseMatchLevel(SublistLevel? l, ReadOnlySpan<string> toks, SublistNode? n, SubscriptionIndexResult results)
|
||||
private static void ReverseMatchLevel(Level? l, ReadOnlySpan<string> toks, Node? n, SubscriptionIndexResult results)
|
||||
{
|
||||
if (l == null) return;
|
||||
for (int i = 0; i < toks.Length; i++)
|
||||
@@ -848,7 +848,7 @@ public sealed class SubscriptionIndex
|
||||
if (n != null) AddNodeToResults(n, results);
|
||||
}
|
||||
|
||||
private static void GetAllNodes(SublistLevel? l, SubscriptionIndexResult results)
|
||||
private static void GetAllNodes(Level? l, SubscriptionIndexResult results)
|
||||
{
|
||||
if (l == null) return;
|
||||
if (l.Pwc != null) AddNodeToResults(l.Pwc, results);
|
||||
@@ -864,7 +864,7 @@ public sealed class SubscriptionIndex
|
||||
// Private: addNodeToResults
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private static void AddNodeToResults(SublistNode n, SubscriptionIndexResult results)
|
||||
private static void AddNodeToResults(Node n, SubscriptionIndexResult results)
|
||||
{
|
||||
// Plain subscriptions.
|
||||
if (n.PList != null)
|
||||
@@ -940,7 +940,7 @@ public sealed class SubscriptionIndex
|
||||
}
|
||||
}
|
||||
|
||||
private static void AddNodeToSubsLocal(SublistNode n, List<Subscription> subs, bool includeLeafHubs)
|
||||
private static void AddNodeToSubsLocal(Node n, List<Subscription> subs, bool includeLeafHubs)
|
||||
{
|
||||
if (n.PList != null)
|
||||
{
|
||||
@@ -960,7 +960,7 @@ public sealed class SubscriptionIndex
|
||||
}
|
||||
}
|
||||
|
||||
private static void CollectLocalSubs(SublistLevel? l, List<Subscription> subs, bool includeLeafHubs)
|
||||
private static void CollectLocalSubs(Level? l, List<Subscription> subs, bool includeLeafHubs)
|
||||
{
|
||||
if (l == null) return;
|
||||
foreach (var n in l.Nodes.Values)
|
||||
@@ -972,7 +972,7 @@ public sealed class SubscriptionIndex
|
||||
if (l.Fwc != null) { AddNodeToSubsLocal(l.Fwc, subs, includeLeafHubs); CollectLocalSubs(l.Fwc.Next, subs, includeLeafHubs); }
|
||||
}
|
||||
|
||||
private static void AddAllNodeToSubs(SublistNode n, List<Subscription> subs)
|
||||
private static void AddAllNodeToSubs(Node n, List<Subscription> subs)
|
||||
{
|
||||
if (n.PList != null)
|
||||
subs.AddRange(n.PList);
|
||||
@@ -986,7 +986,7 @@ public sealed class SubscriptionIndex
|
||||
subs.Add(sub);
|
||||
}
|
||||
|
||||
private static void CollectAllSubs(SublistLevel? l, List<Subscription> subs)
|
||||
private static void CollectAllSubs(Level? l, List<Subscription> subs)
|
||||
{
|
||||
if (l == null) return;
|
||||
foreach (var n in l.Nodes.Values)
|
||||
@@ -1204,7 +1204,7 @@ public sealed class SubscriptionIndex
|
||||
// Private: visitLevel (depth calculation for tests)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private static int VisitLevel(SublistLevel? l, int depth)
|
||||
private static int VisitLevel(Level? l, int depth)
|
||||
{
|
||||
if (l == null || l.NumNodes() == 0) return depth;
|
||||
depth++;
|
||||
@@ -1529,18 +1529,18 @@ public sealed class SubscriptionIndex
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Nested types: SublistNode, SublistLevel, NotifyMaps
|
||||
// Nested types: Node, Level, NotifyMaps
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
internal sealed class SublistNode
|
||||
internal sealed class Node
|
||||
{
|
||||
public readonly Dictionary<Subscription, byte> PSubs = new(ReferenceEqualityComparer.Instance);
|
||||
public Dictionary<string, Dictionary<Subscription, byte>>? QSubs;
|
||||
public List<Subscription>? PList;
|
||||
public SublistLevel? Next;
|
||||
public Level? Next;
|
||||
|
||||
/// <summary>Factory method matching Go's <c>newNode()</c>.</summary>
|
||||
public static SublistNode NewNode() => new();
|
||||
public static Node NewNode() => new();
|
||||
|
||||
public bool IsEmpty()
|
||||
{
|
||||
@@ -1549,14 +1549,14 @@ public sealed class SubscriptionIndex
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class SublistLevel
|
||||
internal sealed class Level
|
||||
{
|
||||
public readonly Dictionary<string, SublistNode> Nodes = new();
|
||||
public SublistNode? Pwc;
|
||||
public SublistNode? Fwc;
|
||||
public readonly Dictionary<string, Node> Nodes = new();
|
||||
public Node? Pwc;
|
||||
public Node? Fwc;
|
||||
|
||||
/// <summary>Factory method matching Go's <c>newLevel()</c>.</summary>
|
||||
public static SublistLevel NewLevel() => new();
|
||||
public static Level NewLevel() => new();
|
||||
|
||||
public int NumNodes()
|
||||
{
|
||||
@@ -1566,8 +1566,9 @@ public sealed class SubscriptionIndex
|
||||
return num;
|
||||
}
|
||||
|
||||
public void PruneNode(SublistNode n, string t)
|
||||
public void PruneNode(Node? n, string t)
|
||||
{
|
||||
if (n == null) return;
|
||||
if (ReferenceEquals(n, Fwc)) Fwc = null;
|
||||
else if (ReferenceEquals(n, Pwc)) Pwc = null;
|
||||
else Nodes.Remove(t);
|
||||
@@ -1582,11 +1583,11 @@ public sealed class SubscriptionIndex
|
||||
|
||||
private readonly struct LevelNodeToken
|
||||
{
|
||||
public readonly SublistLevel Level;
|
||||
public readonly SublistNode Node;
|
||||
public readonly Level Level;
|
||||
public readonly Node Node;
|
||||
public readonly string Token;
|
||||
|
||||
public LevelNodeToken(SublistLevel level, SublistNode node, string token)
|
||||
public LevelNodeToken(Level level, Node node, string token)
|
||||
{
|
||||
Level = level;
|
||||
Node = node;
|
||||
|
||||
@@ -103,6 +103,23 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
_scheduling = new MsgScheduling(RunMsgScheduling);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Factory that mirrors Go <c>newMemStore</c> mapping semantics.
|
||||
/// </summary>
|
||||
public static JetStreamMemStore NewMemStore(StreamConfig cfg)
|
||||
{
|
||||
var ms = new JetStreamMemStore(cfg);
|
||||
|
||||
if (cfg.FirstSeq > 0)
|
||||
{
|
||||
var (_, err) = ms.PurgeInternal(cfg.FirstSeq);
|
||||
if (err != null)
|
||||
throw err;
|
||||
}
|
||||
|
||||
return ms;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — store / load
|
||||
// -----------------------------------------------------------------------
|
||||
@@ -1133,17 +1150,7 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
if (_msgs == null || _msgs.Count == 0) return (Array.Empty<ulong>(), null);
|
||||
var seqs = new List<ulong>(_fss.Size());
|
||||
_fss.IterFast((subj, ss) =>
|
||||
{
|
||||
if (ss.LastNeedsUpdate)
|
||||
RecalculateForSubj(Encoding.UTF8.GetString(subj), ss);
|
||||
seqs.Add(ss.Last);
|
||||
return true;
|
||||
});
|
||||
seqs.Sort();
|
||||
return (seqs.ToArray(), null);
|
||||
return AllLastSeqsLocked();
|
||||
}
|
||||
finally
|
||||
{
|
||||
@@ -1151,6 +1158,27 @@ public sealed class JetStreamMemStore : IStreamStore
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns sorted per-subject last sequences without taking locks.
|
||||
/// Mirrors Go <c>allLastSeqsLocked</c>.
|
||||
/// </summary>
|
||||
private (ulong[] Seqs, Exception? Error) AllLastSeqsLocked()
|
||||
{
|
||||
if (_msgs == null || _msgs.Count == 0) return (Array.Empty<ulong>(), null);
|
||||
|
||||
var seqs = new List<ulong>(_fss.Size());
|
||||
_fss.IterFast((subj, ss) =>
|
||||
{
|
||||
if (ss.LastNeedsUpdate)
|
||||
RecalculateForSubj(Encoding.UTF8.GetString(subj), ss);
|
||||
seqs.Add(ss.Last);
|
||||
return true;
|
||||
});
|
||||
|
||||
seqs.Sort();
|
||||
return (seqs.ToArray(), null);
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed)
|
||||
{
|
||||
|
||||
@@ -219,6 +219,51 @@ public class SubscriptionIndexTests
|
||||
s.NumLevels().ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void PruneNode_NullNode_DoesNotMutateLiteralNodes()
|
||||
{
|
||||
var level = new SubscriptionIndex.Level();
|
||||
var literal = new SubscriptionIndex.Node();
|
||||
level.Nodes["foo"] = literal;
|
||||
level.Fwc = new SubscriptionIndex.Node();
|
||||
level.Pwc = new SubscriptionIndex.Node();
|
||||
|
||||
level.PruneNode(null!, "foo");
|
||||
|
||||
level.Nodes.Count.ShouldBe(1);
|
||||
level.Nodes["foo"].ShouldBeSameAs(literal);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void IsEmpty_WithAndWithoutChildren_TracksNodeEmptiness()
|
||||
{
|
||||
var node = new SubscriptionIndex.Node();
|
||||
node.IsEmpty().ShouldBeTrue();
|
||||
|
||||
node.Next = new SubscriptionIndex.Level();
|
||||
node.Next.Nodes["bar"] = new SubscriptionIndex.Node();
|
||||
node.IsEmpty().ShouldBeFalse();
|
||||
|
||||
node.Next.Nodes.Clear();
|
||||
node.IsEmpty().ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NumNodes_WithLiteralAndWildcardEntries_CountsAllBranches()
|
||||
{
|
||||
var level = new SubscriptionIndex.Level();
|
||||
level.Nodes["foo"] = new SubscriptionIndex.Node();
|
||||
level.Pwc = new SubscriptionIndex.Node();
|
||||
level.Fwc = new SubscriptionIndex.Node();
|
||||
|
||||
level.NumNodes().ShouldBe(3);
|
||||
|
||||
level.PruneNode(level.Pwc, SubscriptionIndex.Pwcs);
|
||||
level.PruneNode(level.Fwc, SubscriptionIndex.Fwcs);
|
||||
level.PruneNode(level.Nodes["foo"], "foo");
|
||||
level.NumNodes().ShouldBe(0);
|
||||
}
|
||||
|
||||
[Theory] // T:2983, T:2984
|
||||
[InlineData(true)]
|
||||
[InlineData(false)]
|
||||
|
||||
@@ -32,7 +32,7 @@ public class JetStreamMemoryStoreTests
|
||||
private static JetStreamMemStore NewMemStore(StreamConfig? cfg = null)
|
||||
{
|
||||
cfg ??= new StreamConfig { Storage = StorageType.MemoryStorage, Name = "test" };
|
||||
return new JetStreamMemStore(cfg);
|
||||
return JetStreamMemStore.NewMemStore(cfg);
|
||||
}
|
||||
|
||||
private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s);
|
||||
@@ -41,6 +41,71 @@ public class JetStreamMemoryStoreTests
|
||||
private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg)
|
||||
=> (ulong)(subj.Length + (hdr?.Length ?? 0) + (msg?.Length ?? 0) + 16);
|
||||
|
||||
[Fact]
|
||||
public void NewMemStore_FactoryMethod_InitializesFirstSequence()
|
||||
{
|
||||
var method = typeof(JetStreamMemStore).GetMethod(
|
||||
"NewMemStore",
|
||||
System.Reflection.BindingFlags.Static | System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.NonPublic);
|
||||
method.ShouldNotBeNull();
|
||||
|
||||
var cfg = new StreamConfig
|
||||
{
|
||||
Name = "factory-init",
|
||||
Storage = StorageType.MemoryStorage,
|
||||
FirstSeq = 1000,
|
||||
};
|
||||
|
||||
var created = method!.Invoke(null, new object?[] { cfg });
|
||||
created.ShouldBeOfType<JetStreamMemStore>();
|
||||
|
||||
var ms = (JetStreamMemStore)created!;
|
||||
var state = ms.State();
|
||||
state.FirstSeq.ShouldBe(1000UL);
|
||||
state.LastSeq.ShouldBe(999UL);
|
||||
ms.Stop();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AllLastSeqsLocked_MatchesPublicAllLastSeqsOrdering()
|
||||
{
|
||||
var cfg = new StreamConfig
|
||||
{
|
||||
Name = "locked-all-last-seqs",
|
||||
Subjects = new[] { "*.*" },
|
||||
MaxMsgsPer = 50,
|
||||
Storage = StorageType.MemoryStorage,
|
||||
};
|
||||
var ms = NewMemStore(cfg);
|
||||
|
||||
var subjs = new[] { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" };
|
||||
var msg = Bytes("abc");
|
||||
var rng = new Random(11);
|
||||
|
||||
for (var i = 0; i < 1_000; i++)
|
||||
{
|
||||
var subj = subjs[rng.Next(subjs.Length)];
|
||||
ms.StoreMsg(subj, null, msg, 0);
|
||||
}
|
||||
|
||||
var method = typeof(JetStreamMemStore).GetMethod(
|
||||
"AllLastSeqsLocked",
|
||||
System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);
|
||||
method.ShouldNotBeNull();
|
||||
|
||||
var result = method!.Invoke(ms, Array.Empty<object>());
|
||||
result.ShouldNotBeNull();
|
||||
var (lockedSeqs, lockedErr) = ((ValueTuple<ulong[], Exception?>)result!);
|
||||
|
||||
var (publicSeqs, publicErr) = ms.AllLastSeqs();
|
||||
|
||||
lockedErr.ShouldBeNull();
|
||||
publicErr.ShouldBeNull();
|
||||
lockedSeqs.SequenceEqual(publicSeqs).ShouldBeTrue();
|
||||
|
||||
ms.Stop();
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// TestMemStoreBasics (T:2023)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
@@ -33,7 +33,7 @@ public class StorageEngineTests
|
||||
private static JetStreamMemStore NewMemStore(StreamConfig cfg)
|
||||
{
|
||||
cfg.Storage = StorageType.MemoryStorage;
|
||||
return new JetStreamMemStore(cfg);
|
||||
return JetStreamMemStore.NewMemStore(cfg);
|
||||
}
|
||||
|
||||
private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s);
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
|
||||
using System.Text;
|
||||
using Shouldly;
|
||||
using ZB.MOM.NatsNet.Server;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
using ZB.MOM.NatsNet.Server.Protocol;
|
||||
|
||||
@@ -754,6 +755,110 @@ public class ProtocolParserTests
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ParsePub_SplitPayload_ClonesPubArgAndCompletesMessage()
|
||||
{
|
||||
var c = DummyClient();
|
||||
var h = DummyHandler();
|
||||
|
||||
var first = "PUB foo 5\r\nhe"u8.ToArray();
|
||||
ProtocolParser.Parse(c, h, first).ShouldBeNull();
|
||||
c.State.ShouldBe(ParserState.MsgPayload);
|
||||
c.ArgBuf.ShouldNotBeNull();
|
||||
Encoding.ASCII.GetString(c.ArgBuf!).ShouldBe("foo 5");
|
||||
c.MsgBuf.ShouldNotBeNull();
|
||||
Encoding.ASCII.GetString(c.MsgBuf!).ShouldBe("he");
|
||||
h.InboundMessages.Count.ShouldBe(0);
|
||||
|
||||
var second = "llo\r\n"u8.ToArray();
|
||||
ProtocolParser.Parse(c, h, second).ShouldBeNull();
|
||||
c.State.ShouldBe(ParserState.OpStart);
|
||||
h.InboundMessages.Count.ShouldBe(1);
|
||||
Encoding.ASCII.GetString(h.InboundMessages[0]).ShouldBe("hello\r\n");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ClonePubArg_ClientBranch_ReprocessesPubArgs()
|
||||
{
|
||||
var c = DummyClient();
|
||||
var h = DummyHandler();
|
||||
c.Pa.Arg = "foo 5"u8.ToArray();
|
||||
c.Pa.HeaderSize = -1;
|
||||
|
||||
var err = ProtocolParser.ClonePubArg(c, h, lmsg: false);
|
||||
|
||||
err.ShouldBeNull();
|
||||
c.ArgBuf.ShouldNotBeNull();
|
||||
Encoding.ASCII.GetString(c.ArgBuf!).ShouldBe("foo 5");
|
||||
Encoding.ASCII.GetString(c.Pa.Subject!).ShouldBe("foo");
|
||||
c.Pa.Size.ShouldBe(5);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ClonePubArg_RouterLmsgBranch_ReprocessesRoutedOriginArgs()
|
||||
{
|
||||
var c = DummyRouteClient();
|
||||
var h = DummyHandler();
|
||||
c.Pa.Arg = "$G foo 5"u8.ToArray();
|
||||
c.Pa.HeaderSize = -1;
|
||||
|
||||
var err = ProtocolParser.ClonePubArg(c, h, lmsg: true);
|
||||
|
||||
err.ShouldBeNull();
|
||||
c.ArgBuf.ShouldNotBeNull();
|
||||
Encoding.ASCII.GetString(c.ArgBuf!).ShouldBe("$G foo 5");
|
||||
Encoding.ASCII.GetString(c.Pa.Account!).ShouldBe("$G");
|
||||
Encoding.ASCII.GetString(c.Pa.Subject!).ShouldBe("foo");
|
||||
c.Pa.Size.ShouldBe(5);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ClonePubArg_LeafHeaderBranch_ReprocessesLeafHeaderArgs()
|
||||
{
|
||||
var c = new ParseContext { Kind = ClientKind.Leaf, MaxPayload = -1 };
|
||||
var h = DummyHandler();
|
||||
c.Pa.Arg = "$G foo 2 5"u8.ToArray();
|
||||
c.Pa.HeaderSize = 2;
|
||||
|
||||
var err = ProtocolParser.ClonePubArg(c, h, lmsg: false);
|
||||
|
||||
err.ShouldBeNull();
|
||||
c.ArgBuf.ShouldNotBeNull();
|
||||
Encoding.ASCII.GetString(c.ArgBuf!).ShouldBe("$G foo 2 5");
|
||||
Encoding.ASCII.GetString(c.Pa.Account!).ShouldBe("$G");
|
||||
Encoding.ASCII.GetString(c.Pa.Subject!).ShouldBe("foo");
|
||||
c.Pa.HeaderSize.ShouldBe(2);
|
||||
c.Pa.Size.ShouldBe(5);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ClientConnection_ExposesParserCompatibilityMethods()
|
||||
{
|
||||
var flags = System.Reflection.BindingFlags.Instance
|
||||
| System.Reflection.BindingFlags.NonPublic
|
||||
| System.Reflection.BindingFlags.Public;
|
||||
var methods = typeof(ClientConnection).GetMethods(flags);
|
||||
|
||||
methods.Any(method =>
|
||||
method.Name == "Parse" &&
|
||||
method.GetParameters().Length == 2 &&
|
||||
method.GetParameters()[0].ParameterType == typeof(byte[]) &&
|
||||
method.GetParameters()[1].ParameterType == typeof(IProtocolHandler)).ShouldBeTrue();
|
||||
|
||||
methods.Any(method =>
|
||||
method.Name == "OverMaxControlLineLimit" &&
|
||||
method.GetParameters().Length == 3 &&
|
||||
method.GetParameters()[0].ParameterType == typeof(byte[]) &&
|
||||
method.GetParameters()[1].ParameterType == typeof(int) &&
|
||||
method.GetParameters()[2].ParameterType == typeof(IProtocolHandler)).ShouldBeTrue();
|
||||
|
||||
methods.Any(method =>
|
||||
method.Name == "ClonePubArg" &&
|
||||
method.GetParameters().Length == 2 &&
|
||||
method.GetParameters()[0].ParameterType == typeof(IProtocolHandler) &&
|
||||
method.GetParameters()[1].ParameterType == typeof(bool)).ShouldBeTrue();
|
||||
}
|
||||
|
||||
// =====================================================================
|
||||
// TestProtocolHandler — stub handler for tests
|
||||
// =====================================================================
|
||||
@@ -770,6 +875,7 @@ public class ProtocolParserTests
|
||||
public int PingCount { get; private set; }
|
||||
public int PongCount { get; private set; }
|
||||
public byte[]? ConnectArgs { get; private set; }
|
||||
public List<byte[]> InboundMessages { get; } = [];
|
||||
|
||||
public Exception? ProcessConnect(byte[] arg) { ConnectArgs = arg; return null; }
|
||||
public Exception? ProcessInfo(byte[] arg) => null;
|
||||
@@ -786,7 +892,7 @@ public class ProtocolParserTests
|
||||
public Exception? ProcessLeafUnsub(byte[] arg) => null;
|
||||
public Exception? ProcessAccountSub(byte[] arg) => null;
|
||||
public void ProcessAccountUnsub(byte[] arg) { }
|
||||
public void ProcessInboundMsg(byte[] msg) { }
|
||||
public void ProcessInboundMsg(byte[] msg) => InboundMessages.Add(msg);
|
||||
public bool SelectMappedSubject() => false;
|
||||
public void TraceInOp(string name, byte[]? arg) { }
|
||||
public void TraceMsg(byte[] msg) { }
|
||||
|
||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
37
reports/current.md
Normal file
37
reports/current.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# NATS .NET Porting Status Report
|
||||
|
||||
Generated: 2026-02-28 12:18:22 UTC
|
||||
|
||||
## Modules (12 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| verified | 12 |
|
||||
|
||||
## Features (3673 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2359 |
|
||||
| n_a | 24 |
|
||||
| stub | 1 |
|
||||
| verified | 1289 |
|
||||
|
||||
## Unit Tests (3257 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2091 |
|
||||
| n_a | 187 |
|
||||
| verified | 979 |
|
||||
|
||||
## Library Mappings (36 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| mapped | 36 |
|
||||
|
||||
|
||||
## Overall Progress
|
||||
|
||||
**2491/6942 items complete (35.9%)**
|
||||
37
reports/report_1a0ac59.md
Normal file
37
reports/report_1a0ac59.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# NATS .NET Porting Status Report
|
||||
|
||||
Generated: 2026-02-28 12:17:02 UTC
|
||||
|
||||
## Modules (12 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| verified | 12 |
|
||||
|
||||
## Features (3673 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2359 |
|
||||
| n_a | 24 |
|
||||
| stub | 1 |
|
||||
| verified | 1289 |
|
||||
|
||||
## Unit Tests (3257 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2091 |
|
||||
| n_a | 187 |
|
||||
| verified | 979 |
|
||||
|
||||
## Library Mappings (36 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| mapped | 36 |
|
||||
|
||||
|
||||
## Overall Progress
|
||||
|
||||
**2491/6942 items complete (35.9%)**
|
||||
37
reports/report_38b6fc8.md
Normal file
37
reports/report_38b6fc8.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# NATS .NET Porting Status Report
|
||||
|
||||
Generated: 2026-02-28 12:11:43 UTC
|
||||
|
||||
## Modules (12 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| verified | 12 |
|
||||
|
||||
## Features (3673 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2361 |
|
||||
| n_a | 24 |
|
||||
| stub | 1 |
|
||||
| verified | 1287 |
|
||||
|
||||
## Unit Tests (3257 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2091 |
|
||||
| n_a | 187 |
|
||||
| verified | 979 |
|
||||
|
||||
## Library Mappings (36 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| mapped | 36 |
|
||||
|
||||
|
||||
## Overall Progress
|
||||
|
||||
**2489/6942 items complete (35.9%)**
|
||||
37
reports/report_c2a73b4.md
Normal file
37
reports/report_c2a73b4.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# NATS .NET Porting Status Report
|
||||
|
||||
Generated: 2026-02-28 12:17:23 UTC
|
||||
|
||||
## Modules (12 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| verified | 12 |
|
||||
|
||||
## Features (3673 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2359 |
|
||||
| n_a | 24 |
|
||||
| stub | 1 |
|
||||
| verified | 1289 |
|
||||
|
||||
## Unit Tests (3257 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| deferred | 2091 |
|
||||
| n_a | 187 |
|
||||
| verified | 979 |
|
||||
|
||||
## Library Mappings (36 total)
|
||||
|
||||
| Status | Count |
|
||||
|--------|-------|
|
||||
| mapped | 36 |
|
||||
|
||||
|
||||
## Overall Progress
|
||||
|
||||
**2491/6942 items complete (35.9%)**
|
||||
Reference in New Issue
Block a user