From c2a73b49f7b3075429ea5298d1726d739aa1e88c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 07:17:01 -0500 Subject: [PATCH] feat(batch2): verify memstore remainder features --- .../JetStream/MemStore.cs | 50 ++++++++++--- .../JetStream/JetStreamMemoryStoreTests.cs | 67 +++++++++++++++++- .../JetStream/StorageEngineTests.cs | 2 +- porting.db | Bin 6356992 -> 6356992 bytes reports/current.md | 8 +-- reports/report_1a0ac59.md | 37 ++++++++++ 6 files changed, 147 insertions(+), 17 deletions(-) create mode 100644 reports/report_1a0ac59.md diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs index ff828cb..e92739f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs @@ -103,6 +103,23 @@ public sealed class JetStreamMemStore : IStreamStore _scheduling = new MsgScheduling(RunMsgScheduling); } + /// + /// Factory that mirrors Go newMemStore mapping semantics. + /// + public static JetStreamMemStore NewMemStore(StreamConfig cfg) + { + var ms = new JetStreamMemStore(cfg); + + if (cfg.FirstSeq > 0) + { + var (_, err) = ms.PurgeInternal(cfg.FirstSeq); + if (err != null) + throw err; + } + + return ms; + } + // ----------------------------------------------------------------------- // IStreamStore — store / load // ----------------------------------------------------------------------- @@ -1133,17 +1150,7 @@ public sealed class JetStreamMemStore : IStreamStore _mu.EnterReadLock(); try { - if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null); - var seqs = new List(_fss.Size()); - _fss.IterFast((subj, ss) => - { - if (ss.LastNeedsUpdate) - RecalculateForSubj(Encoding.UTF8.GetString(subj), ss); - seqs.Add(ss.Last); - return true; - }); - seqs.Sort(); - return (seqs.ToArray(), null); + return AllLastSeqsLocked(); } finally { @@ -1151,6 +1158,27 @@ public sealed class JetStreamMemStore : IStreamStore } } + /// + /// Returns sorted per-subject last sequences without taking locks. + /// Mirrors Go allLastSeqsLocked. + /// + private (ulong[] Seqs, Exception? Error) AllLastSeqsLocked() + { + if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null); + + var seqs = new List(_fss.Size()); + _fss.IterFast((subj, ss) => + { + if (ss.LastNeedsUpdate) + RecalculateForSubj(Encoding.UTF8.GetString(subj), ss); + seqs.Add(ss.Last); + return true; + }); + + seqs.Sort(); + return (seqs.ToArray(), null); + } + /// public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed) { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs index 0d7ca72..e39670b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs @@ -32,7 +32,7 @@ public class JetStreamMemoryStoreTests private static JetStreamMemStore NewMemStore(StreamConfig? cfg = null) { cfg ??= new StreamConfig { Storage = StorageType.MemoryStorage, Name = "test" }; - return new JetStreamMemStore(cfg); + return JetStreamMemStore.NewMemStore(cfg); } private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s); @@ -41,6 +41,71 @@ public class JetStreamMemoryStoreTests private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg) => (ulong)(subj.Length + (hdr?.Length ?? 0) + (msg?.Length ?? 0) + 16); + [Fact] + public void NewMemStore_FactoryMethod_InitializesFirstSequence() + { + var method = typeof(JetStreamMemStore).GetMethod( + "NewMemStore", + System.Reflection.BindingFlags.Static | System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.NonPublic); + method.ShouldNotBeNull(); + + var cfg = new StreamConfig + { + Name = "factory-init", + Storage = StorageType.MemoryStorage, + FirstSeq = 1000, + }; + + var created = method!.Invoke(null, new object?[] { cfg }); + created.ShouldBeOfType(); + + var ms = (JetStreamMemStore)created!; + var state = ms.State(); + state.FirstSeq.ShouldBe(1000UL); + state.LastSeq.ShouldBe(999UL); + ms.Stop(); + } + + [Fact] + public void AllLastSeqsLocked_MatchesPublicAllLastSeqsOrdering() + { + var cfg = new StreamConfig + { + Name = "locked-all-last-seqs", + Subjects = new[] { "*.*" }, + MaxMsgsPer = 50, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var subjs = new[] { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" }; + var msg = Bytes("abc"); + var rng = new Random(11); + + for (var i = 0; i < 1_000; i++) + { + var subj = subjs[rng.Next(subjs.Length)]; + ms.StoreMsg(subj, null, msg, 0); + } + + var method = typeof(JetStreamMemStore).GetMethod( + "AllLastSeqsLocked", + System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic); + method.ShouldNotBeNull(); + + var result = method!.Invoke(ms, Array.Empty()); + result.ShouldNotBeNull(); + var (lockedSeqs, lockedErr) = ((ValueTuple)result!); + + var (publicSeqs, publicErr) = ms.AllLastSeqs(); + + lockedErr.ShouldBeNull(); + publicErr.ShouldBeNull(); + lockedSeqs.SequenceEqual(publicSeqs).ShouldBeTrue(); + + ms.Stop(); + } + // ----------------------------------------------------------------------- // TestMemStoreBasics (T:2023) // ----------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs index 4198b4c..13f5095 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs @@ -33,7 +33,7 @@ public class StorageEngineTests private static JetStreamMemStore NewMemStore(StreamConfig cfg) { cfg.Storage = StorageType.MemoryStorage; - return new JetStreamMemStore(cfg); + return JetStreamMemStore.NewMemStore(cfg); } private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s); diff --git a/porting.db b/porting.db index 6bbe23bec6f13c1c4c597054a54204115de6401d..301f66ec07c2a8501c2eb755a7791b8b3885632d 100644 GIT binary patch delta 1039 zcmbu-TS!xJ90%~7|8|a>m+d&WW$Em=bl%eKoOx?rnq5pS)2Vqkop#irPT7t@K5bM8 z8uSwUp$IY}BuIRyPEX;3Q1~Q0Nl;kyEWLD5U-~L!MUTP%1OMM2zI^$?KN`KEjl%K+ z7xU`+f=gX+F*(nP-l?vAX1rs%*Tkryn^96tYn#a%YW9YAUhgn6j0x01J7 zUEDj3kv{Gq#X7?#^F#g6L9*H{2CbDul<7ge)7Yzt-@jRAZA%2SpaWL5Epaz9sky*p zr&D*hWZJOCCD13O84kLV$+E0i;+0BYtaCPjurW3D3uj`PKAQWL>!Rg5oR3PoNp`w& zpG%>OJ6!hfOtC6_<3_cF@z2g}a}Wo5FhD#cKq4f8QJvd1&DeC?d&{~-BG6o(HxqJ~ zdU@VK$St*(=kGCeH<=&Nkwr~D-D>3}I-J#GvRe#BP5Mn2~|)HC&33ba0*Vt88{2Ia1QFA9va|0T!2Qn2u;uoerSPKXoE}8 z4js@5UC<3Z&#^MLQY!^GBUOfAMoNgGLtiN1uH_-BSKgQ!>H)HsQI%I>mt zD)WgrSZmBRUmdTp6Z^D1X7voq0cAQY)Bf^qI}KeE&3vOg(W*>_<)Wd8R}{FsW#Wup7!0<{zNq0=}spSLa*y^ z5?UtXRmo*@!X>olGqbBEJn+H?KLQ9M1WV1XhNt7k%D=~LX_1sE+m6b~ylu%r+m5M{ zZ9mXmZ^eTA6|pA_pQVk VFMLbBw0Fv*l>SZJXI4rB-oHFC{Zjw{ diff --git a/reports/current.md b/reports/current.md index 506c986..5cf2338 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 12:11:43 UTC +Generated: 2026-02-28 12:17:02 UTC ## Modules (12 total) @@ -12,10 +12,10 @@ Generated: 2026-02-28 12:11:43 UTC | Status | Count | |--------|-------| -| deferred | 2361 | +| deferred | 2359 | | n_a | 24 | | stub | 1 | -| verified | 1287 | +| verified | 1289 | ## Unit Tests (3257 total) @@ -34,4 +34,4 @@ Generated: 2026-02-28 12:11:43 UTC ## Overall Progress -**2489/6942 items complete (35.9%)** +**2491/6942 items complete (35.9%)** diff --git a/reports/report_1a0ac59.md b/reports/report_1a0ac59.md new file mode 100644 index 0000000..5cf2338 --- /dev/null +++ b/reports/report_1a0ac59.md @@ -0,0 +1,37 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-28 12:17:02 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| verified | 12 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| deferred | 2359 | +| n_a | 24 | +| stub | 1 | +| verified | 1289 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| deferred | 2091 | +| n_a | 187 | +| verified | 979 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**2491/6942 items complete (35.9%)**