diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs index ff828cb..e92739f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs @@ -103,6 +103,23 @@ public sealed class JetStreamMemStore : IStreamStore _scheduling = new MsgScheduling(RunMsgScheduling); } + /// + /// Factory that mirrors Go newMemStore mapping semantics. + /// + public static JetStreamMemStore NewMemStore(StreamConfig cfg) + { + var ms = new JetStreamMemStore(cfg); + + if (cfg.FirstSeq > 0) + { + var (_, err) = ms.PurgeInternal(cfg.FirstSeq); + if (err != null) + throw err; + } + + return ms; + } + // ----------------------------------------------------------------------- // IStreamStore — store / load // ----------------------------------------------------------------------- @@ -1133,17 +1150,7 @@ public sealed class JetStreamMemStore : IStreamStore _mu.EnterReadLock(); try { - if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null); - var seqs = new List(_fss.Size()); - _fss.IterFast((subj, ss) => - { - if (ss.LastNeedsUpdate) - RecalculateForSubj(Encoding.UTF8.GetString(subj), ss); - seqs.Add(ss.Last); - return true; - }); - seqs.Sort(); - return (seqs.ToArray(), null); + return AllLastSeqsLocked(); } finally { @@ -1151,6 +1158,27 @@ public sealed class JetStreamMemStore : IStreamStore } } + /// + /// Returns sorted per-subject last sequences without taking locks. + /// Mirrors Go allLastSeqsLocked. + /// + private (ulong[] Seqs, Exception? Error) AllLastSeqsLocked() + { + if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null); + + var seqs = new List(_fss.Size()); + _fss.IterFast((subj, ss) => + { + if (ss.LastNeedsUpdate) + RecalculateForSubj(Encoding.UTF8.GetString(subj), ss); + seqs.Add(ss.Last); + return true; + }); + + seqs.Sort(); + return (seqs.ToArray(), null); + } + /// public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed) { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs index 0d7ca72..e39670b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs @@ -32,7 +32,7 @@ public class JetStreamMemoryStoreTests private static JetStreamMemStore NewMemStore(StreamConfig? cfg = null) { cfg ??= new StreamConfig { Storage = StorageType.MemoryStorage, Name = "test" }; - return new JetStreamMemStore(cfg); + return JetStreamMemStore.NewMemStore(cfg); } private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s); @@ -41,6 +41,71 @@ public class JetStreamMemoryStoreTests private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg) => (ulong)(subj.Length + (hdr?.Length ?? 0) + (msg?.Length ?? 0) + 16); + [Fact] + public void NewMemStore_FactoryMethod_InitializesFirstSequence() + { + var method = typeof(JetStreamMemStore).GetMethod( + "NewMemStore", + System.Reflection.BindingFlags.Static | System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.NonPublic); + method.ShouldNotBeNull(); + + var cfg = new StreamConfig + { + Name = "factory-init", + Storage = StorageType.MemoryStorage, + FirstSeq = 1000, + }; + + var created = method!.Invoke(null, new object?[] { cfg }); + created.ShouldBeOfType(); + + var ms = (JetStreamMemStore)created!; + var state = ms.State(); + state.FirstSeq.ShouldBe(1000UL); + state.LastSeq.ShouldBe(999UL); + ms.Stop(); + } + + [Fact] + public void AllLastSeqsLocked_MatchesPublicAllLastSeqsOrdering() + { + var cfg = new StreamConfig + { + Name = "locked-all-last-seqs", + Subjects = new[] { "*.*" }, + MaxMsgsPer = 50, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var subjs = new[] { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" }; + var msg = Bytes("abc"); + var rng = new Random(11); + + for (var i = 0; i < 1_000; i++) + { + var subj = subjs[rng.Next(subjs.Length)]; + ms.StoreMsg(subj, null, msg, 0); + } + + var method = typeof(JetStreamMemStore).GetMethod( + "AllLastSeqsLocked", + System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic); + method.ShouldNotBeNull(); + + var result = method!.Invoke(ms, Array.Empty()); + result.ShouldNotBeNull(); + var (lockedSeqs, lockedErr) = ((ValueTuple)result!); + + var (publicSeqs, publicErr) = ms.AllLastSeqs(); + + lockedErr.ShouldBeNull(); + publicErr.ShouldBeNull(); + lockedSeqs.SequenceEqual(publicSeqs).ShouldBeTrue(); + + ms.Stop(); + } + // ----------------------------------------------------------------------- // TestMemStoreBasics (T:2023) // ----------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs index 4198b4c..13f5095 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs @@ -33,7 +33,7 @@ public class StorageEngineTests private static JetStreamMemStore NewMemStore(StreamConfig cfg) { cfg.Storage = StorageType.MemoryStorage; - return new JetStreamMemStore(cfg); + return JetStreamMemStore.NewMemStore(cfg); } private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s); diff --git a/porting.db b/porting.db index 6bbe23b..301f66e 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 506c986..5cf2338 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 12:11:43 UTC +Generated: 2026-02-28 12:17:02 UTC ## Modules (12 total) @@ -12,10 +12,10 @@ Generated: 2026-02-28 12:11:43 UTC | Status | Count | |--------|-------| -| deferred | 2361 | +| deferred | 2359 | | n_a | 24 | | stub | 1 | -| verified | 1287 | +| verified | 1289 | ## Unit Tests (3257 total) @@ -34,4 +34,4 @@ Generated: 2026-02-28 12:11:43 UTC ## Overall Progress -**2489/6942 items complete (35.9%)** +**2491/6942 items complete (35.9%)** diff --git a/reports/report_1a0ac59.md b/reports/report_1a0ac59.md new file mode 100644 index 0000000..5cf2338 --- /dev/null +++ b/reports/report_1a0ac59.md @@ -0,0 +1,37 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-28 12:17:02 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| verified | 12 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| deferred | 2359 | +| n_a | 24 | +| stub | 1 | +| verified | 1289 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| deferred | 2091 | +| n_a | 187 | +| verified | 979 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**2491/6942 items complete (35.9%)**