From 095058096769ce9fafee58b40b95b6ff940f6aca Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 26 Feb 2026 19:35:58 -0500 Subject: [PATCH] feat(p7-06): port memstore & store interface tests (38 tests) Add JetStreamMemoryStoreTests (27 tests, T:2023-2056) and StorageEngineTests (11 tests, T:2943-2957) covering the JetStream memory store and IStreamStore interface. Fix 10 bugs in MemStore.cs discovered during test authoring: FirstSeq constructor, Truncate(0) SubjectTree reset, PurgeEx subject-filtered implementation, UpdateConfig MaxMsgsPer enforcement, FilteredStateLocked partial range scan, StoreRawMsgLocked DiscardNewPer, MultiLastSeqs maxSeq fallback scan + LastNeedsUpdate recalculation, AllLastSeqs LastNeedsUpdate recalculation, LoadLastLocked LazySubjectState recalculation, GetSeqFromTime ts==last equality, and timestamp precision (100-ns throughout). 20 tests deferred (internal fields, benchmarks, TTL, filestore-only). All 701 unit tests pass. --- .../JetStream/MemStore.cs | 199 ++- .../JetStream/JetStreamMemoryStoreTests.cs | 1169 +++++++++++++++++ .../JetStream/StorageEngineTests.cs | 575 ++++++++ porting.db | Bin 3395584 -> 3395584 bytes reports/current.md | 10 +- reports/report_917cd33.md | 39 + 6 files changed, 1970 insertions(+), 22 deletions(-) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs create mode 100644 reports/report_917cd33.md diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs index 1ed7238..79f2260 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs @@ -78,7 +78,11 @@ public sealed class JetStreamMemStore : IStreamStore _maxp = cfg.MaxMsgsPer; if (cfg.FirstSeq > 0) - Purge(); + { + // Set the initial state so that the first StoreMsg call assigns seq = cfg.FirstSeq. + _state.LastSeq = cfg.FirstSeq - 1; + _state.FirstSeq = cfg.FirstSeq; + } } // ----------------------------------------------------------------------- @@ -92,7 +96,10 @@ public sealed class JetStreamMemStore : IStreamStore try { var seq = _state.LastSeq + 1; - var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + // Use 100-nanosecond Ticks for higher timestamp precision. + // Nanoseconds since Unix epoch: (Ticks - UnixEpochTicks) * 100 + const long UnixEpochTicks = 621355968000000000L; + var ts = (DateTimeOffset.UtcNow.UtcTicks - UnixEpochTicks) * 100L; try { StoreRawMsgLocked(subject, hdr, msg, seq, ts, ttl, discardNewCheck: true); @@ -134,12 +141,24 @@ public sealed class JetStreamMemStore : IStreamStore hdr ??= Array.Empty(); msg ??= Array.Empty(); + // Determine if we are at the per-subject limit. + bool atSubjectLimit = false; + if (_maxp > 0 && !string.IsNullOrEmpty(subject)) + { + var subjectBytesCheck = Encoding.UTF8.GetBytes(subject); + var (ssCheck, foundCheck) = _fss.Find(subjectBytesCheck); + if (foundCheck && ssCheck != null) + atSubjectLimit = ssCheck.Msgs >= (ulong)_maxp; + } + // Discard-new enforcement if (discardNewCheck && _cfg.Discard == DiscardPolicy.DiscardNew) { - if (_cfg.MaxMsgs > 0 && _state.Msgs >= (ulong)_cfg.MaxMsgs) + if (atSubjectLimit && _cfg.DiscardNewPer) + throw StoreErrors.ErrMaxMsgsPerSubject; + if (_cfg.MaxMsgs > 0 && _state.Msgs >= (ulong)_cfg.MaxMsgs && !atSubjectLimit) throw StoreErrors.ErrMaxMsgs; - if (_cfg.MaxBytes > 0 && _state.Bytes + MsgSize(subject, hdr, msg) > (ulong)_cfg.MaxBytes) + if (_cfg.MaxBytes > 0 && _state.Bytes + MsgSize(subject, hdr, msg) > (ulong)_cfg.MaxBytes && !atSubjectLimit) throw StoreErrors.ErrMaxBytes; } @@ -342,7 +361,11 @@ public sealed class JetStreamMemStore : IStreamStore { var (ss, found) = _fss.Find(Encoding.UTF8.GetBytes(subject)); if (found && ss != null && ss.Msgs > 0) + { + if (ss.LastNeedsUpdate) + RecalculateForSubj(subject, ss); _msgs.TryGetValue(ss.Last, out stored); + } } if (stored == null) @@ -603,13 +626,69 @@ public sealed class JetStreamMemStore : IStreamStore /// public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep) { - // TODO: session 17 — full subject-filtered purge - if (string.IsNullOrEmpty(subject) || subject == ">") + var isAll = string.IsNullOrEmpty(subject) || subject == ">"; + if (isAll) { if (keep == 0 && seq == 0) return Purge(); + if (seq > 1) + return Compact(seq); + if (keep > 0) + { + ulong msgs, lseq; + _mu.EnterReadLock(); + msgs = _state.Msgs; + lseq = _state.LastSeq; + _mu.ExitReadLock(); + if (keep >= msgs) + return (0, null); + return Compact(lseq - keep + 1); + } + return (0, null); } - return (0, null); + + // Subject-filtered purge + var ss = FilteredState(1, subject); + if (ss.Msgs == 0) + return (0, null); + + if (keep > 0) + { + if (keep >= ss.Msgs) + return (0, null); + ss.Msgs -= keep; + } + + var last = ss.Last; + if (seq > 1) + last = seq - 1; + + ulong purged = 0; + _mu.EnterWriteLock(); + try + { + if (_msgs == null) + return (0, null); + + for (var s = ss.First; s <= last; s++) + { + if (_msgs.TryGetValue(s, out var sm) && sm != null && sm.Subject == subject) + { + if (RemoveMsgLocked(s, false)) + { + purged++; + if (purged >= ss.Msgs) + break; + } + } + } + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + return (purged, null); } /// @@ -703,9 +782,10 @@ public sealed class JetStreamMemStore : IStreamStore // Full reset purged = (ulong)_msgs.Count; bytes = _state.Bytes; - _state = new StreamState { LastTime = DateTime.UtcNow }; + _state = new StreamState(); _msgs = new Dictionary(); _dmap = new SequenceSet(); + _fss.Reset(); } else { @@ -847,6 +927,8 @@ public sealed class JetStreamMemStore : IStreamStore return new SimpleState { Msgs = _state.Msgs, First = _state.FirstSeq, Last = _state.LastSeq }; var ss = new SimpleState(); + var havePartial = false; + _fss.Match(Encoding.UTF8.GetBytes(filter), (subj, fss) => { if (fss.FirstNeedsUpdate || fss.LastNeedsUpdate) @@ -854,12 +936,46 @@ public sealed class JetStreamMemStore : IStreamStore if (sseq <= fss.First) { + // All messages in this subject are at or after sseq ss.Msgs += fss.Msgs; if (ss.First == 0 || fss.First < ss.First) ss.First = fss.First; if (fss.Last > ss.Last) ss.Last = fss.Last; } + else if (sseq <= fss.Last) + { + // Partial: sseq is inside this subject's range — need to scan + havePartial = true; + // Still track Last for the scan bounds + if (fss.Last > ss.Last) ss.Last = fss.Last; + } + // else sseq > fss.Last: all messages before sseq, skip return true; }); + + if (!havePartial) + return ss; + + // Need to scan messages from sseq to ss.Last + if (_msgs == null) + return ss; + + var scanFirst = sseq; + var scanLast = ss.Last; + if (scanLast == 0) scanLast = _state.LastSeq; + + // Reset and rescan + ss = new SimpleState(); + for (var seq = scanFirst; seq <= scanLast; seq++) + { + if (!_msgs.TryGetValue(seq, out var sm) || sm == null) + continue; + if (isAll || MatchLiteral(sm.Subject, filter)) + { + ss.Msgs++; + if (ss.First == 0) ss.First = seq; + ss.Last = seq; + } + } return ss; } @@ -947,8 +1063,10 @@ public sealed class JetStreamMemStore : IStreamStore { if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null); var seqs = new List(_fss.Size()); - _fss.IterFast((_, ss) => + _fss.IterFast((subj, ss) => { + if (ss.LastNeedsUpdate) + RecalculateForSubj(Encoding.UTF8.GetString(subj), ss); seqs.Add(ss.Last); return true; }); @@ -974,14 +1092,32 @@ public sealed class JetStreamMemStore : IStreamStore var seen = new HashSet(); foreach (var filter in filters) { - _fss.Match(Encoding.UTF8.GetBytes(filter), (_, ss) => + _fss.Match(Encoding.UTF8.GetBytes(filter), (subj, ss) => { - if (ss.Last <= maxSeq && seen.Add(ss.Last)) - seqs.Add(ss.Last); + if (ss.LastNeedsUpdate) + RecalculateForSubj(Encoding.UTF8.GetString(subj), ss); + if (ss.Last <= maxSeq) + { + if (seen.Add(ss.Last)) + seqs.Add(ss.Last); + } + else if (ss.Msgs > 1) + { + // Last is beyond maxSeq — scan backwards for the most recent msg <= maxSeq. + var s = Encoding.UTF8.GetString(subj); + for (var seq = maxSeq; seq > 0; seq--) + { + if (_msgs.TryGetValue(seq, out var sm) && sm != null && sm.Subject == s) + { + if (seen.Add(seq)) seqs.Add(seq); + break; + } + } + } return true; }); if (maxAllowed > 0 && seqs.Count > maxAllowed) - return (Array.Empty(), StoreErrors.ErrTooManyResults); + return (null!, StoreErrors.ErrTooManyResults); } seqs.Sort(); return (seqs.ToArray(), null); @@ -1017,7 +1153,9 @@ public sealed class JetStreamMemStore : IStreamStore /// public ulong GetSeqFromTime(DateTime t) { - var ts = new DateTimeOffset(t).ToUnixTimeMilliseconds() * 1_000_000L; + // Use same 100-nanosecond precision as StoreMsg timestamps. + const long UnixEpochTicksGsft = 621355968000000000L; + var ts = (new DateTimeOffset(t, TimeSpan.Zero).UtcTicks - UnixEpochTicksGsft) * 100L; _mu.EnterReadLock(); try { @@ -1038,7 +1176,9 @@ public sealed class JetStreamMemStore : IStreamStore break; } if (lastSm == null) return _state.LastSeq + 1; - if (ts >= lastSm.Ts) return _state.LastSeq + 1; + // Mirror Go: if ts == last ts return that seq; if ts > last ts return pastEnd. + if (ts == lastSm.Ts) return lastSm.Seq; + if (ts > lastSm.Ts) return _state.LastSeq + 1; // Linear scan fallback for (var seq = _state.FirstSeq; seq <= _state.LastSeq; seq++) @@ -1066,9 +1206,32 @@ public sealed class JetStreamMemStore : IStreamStore try { _cfg = cfg.Clone(); - _maxp = cfg.MaxMsgsPer; + + // Clamp MaxMsgsPer to minimum of -1 + if (_cfg.MaxMsgsPer < -1) + { + _cfg.MaxMsgsPer = -1; + cfg.MaxMsgsPer = -1; + } + + var oldMaxp = _maxp; + _maxp = _cfg.MaxMsgsPer; + EnforceMsgLimit(); EnforceBytesLimit(); + + // Enforce per-subject limits if MaxMsgsPer was reduced or newly set + if (_maxp > 0 && (oldMaxp == 0 || _maxp < oldMaxp)) + { + var lm = (ulong)_maxp; + _fss.IterFast((subj, ss) => + { + if (ss.Msgs > lm) + EnforcePerSubjectLimit(Encoding.UTF8.GetString(subj), ss); + return true; + }); + } + if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero) StartAgeChk(); if (_ageChk != null && _cfg.MaxAge == TimeSpan.Zero) @@ -1400,7 +1563,9 @@ public sealed class JetStreamMemStore : IStreamStore { if (_msgs == null || _cfg.MaxAge == TimeSpan.Zero) return; var minAge = DateTime.UtcNow - _cfg.MaxAge; - var minTs = new DateTimeOffset(minAge).ToUnixTimeMilliseconds() * 1_000_000L; + // Use same 100-nanosecond precision as StoreMsg timestamps. + const long UnixEpochTicksExp = 621355968000000000L; + var minTs = (new DateTimeOffset(minAge, TimeSpan.Zero).UtcTicks - UnixEpochTicksExp) * 100L; var toRemove = new List(); foreach (var kv in _msgs) { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs new file mode 100644 index 0000000..0d7ca72 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs @@ -0,0 +1,1169 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mirrors server/memstore_test.go + +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +/// +/// Unit tests for the in-memory JetStream message store. +/// Mirrors server/memstore_test.go. +/// +public class JetStreamMemoryStoreTests +{ + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private static JetStreamMemStore NewMemStore(StreamConfig? cfg = null) + { + cfg ??= new StreamConfig { Storage = StorageType.MemoryStorage, Name = "test" }; + return new JetStreamMemStore(cfg); + } + + private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s); + + /// Computes the expected stored-message size: subj.len + hdr.len + msg.len + 16. + private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg) + => (ulong)(subj.Length + (hdr?.Length ?? 0) + (msg?.Length ?? 0) + 16); + + // ----------------------------------------------------------------------- + // TestMemStoreBasics (T:2023) + // ----------------------------------------------------------------------- + + [Fact] // T:2023 + public void MemStoreBasics_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreBasics line 32 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + ms.Stop(); + + ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + var subj = "foo"; + var msg = Bytes("Hello World"); + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + + var (seq, ts) = ms.StoreMsg(subj, null, msg, 0); + seq.ShouldBe(1UL); + ts.ShouldBeGreaterThanOrEqualTo(now); + + var state = ms.State(); + state.Msgs.ShouldBe(1UL); + + var expectedSize = MsgSize(subj, null, msg); + state.Bytes.ShouldBe(expectedSize); + + var sm = ms.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Subject.ShouldBe(subj); + sm.Msg.ShouldBe(msg); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreMsgLimit (T:2024) + // ----------------------------------------------------------------------- + + [Fact] // T:2024 + public void MemStoreMsgLimit_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMsgLimit line 67 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage, MaxMsgs = 10 }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 10; i++) + ms.StoreMsg(subj, null, msg, 0); + + var state = ms.State(); + state.Msgs.ShouldBe(10UL); + + // Adding one more should still keep 10 (oldest evicted) + ms.StoreMsg(subj, null, msg, 0); + state = ms.State(); + state.Msgs.ShouldBe(10UL); + state.LastSeq.ShouldBe(11UL); + state.FirstSeq.ShouldBe(2UL); + + // Seq 1 should not be loadable + Should.Throw(() => ms.LoadMsg(1, null)); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreBytesLimit (T:2025) + // ----------------------------------------------------------------------- + + [Fact] // T:2025 + public void MemStoreBytesLimit_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreBytesLimit line 99 + var subj = "foo"; + var msg = new byte[512]; + var storedMsgSize = MsgSize(subj, null, msg); + + var toStore = 1024UL; + var maxBytes = storedMsgSize * toStore; + + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage, MaxBytes = (long)maxBytes }); + + for (var i = 0UL; i < toStore; i++) + ms.StoreMsg(subj, null, msg, 0); + + var state = ms.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + + // Now send 10 more and check that bytes limit enforced + for (var i = 0; i < 10; i++) + ms.StoreMsg(subj, null, msg, 0); + + state = ms.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + state.FirstSeq.ShouldBe(11UL); + state.LastSeq.ShouldBe(toStore + 10UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreBytesLimitWithDiscardNew (T:2026) + // ----------------------------------------------------------------------- + + [Fact] // T:2026 + public void MemStoreBytesLimitWithDiscardNew_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreBytesLimitWithDiscardNew line 143 + var subj = "tiny"; + var msg = new byte[7]; + var storedMsgSize = MsgSize(subj, null, msg); + var toStore = 3UL; + var maxBytes = 100; + + var ms = NewMemStore(new StreamConfig + { + Storage = StorageType.MemoryStorage, + MaxBytes = maxBytes, + Discard = DiscardPolicy.DiscardNew + }); + + // Send 10 messages; first toStore succeed, the rest hit ErrMaxBytes + for (var i = 0; i < 10; i++) + { + var (seq, _) = ms.StoreMsg(subj, null, msg, 0); + if (i < (int)toStore) + seq.ShouldNotBe(0UL); // success + // After toStore, StoreMsg returns (0, 0) on error + } + + var state = ms.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreAgeLimit (T:2027) + // ----------------------------------------------------------------------- + + [Fact] // T:2027 + public async Task MemStoreAgeLimit_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreAgeLimit line 174 + var maxAge = TimeSpan.FromMilliseconds(100); + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage, MaxAge = maxAge }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 100; i++) + ms.StoreMsg(subj, null, msg, 0); + + var state = ms.State(); + state.Msgs.ShouldBe(100UL); + + // Wait for messages to expire + await Task.Delay(TimeSpan.FromMilliseconds(500)); + + // Check messages expired + var deadline = DateTime.UtcNow.AddSeconds(2); + while (DateTime.UtcNow < deadline) + { + state = ms.State(); + if (state.Msgs == 0 && state.Bytes == 0) break; + await Task.Delay(10); + } + + state = ms.State(); + state.Msgs.ShouldBe(0UL); + state.Bytes.ShouldBe(0UL); + + // Now add some more and make sure that timer will fire again + for (var i = 0; i < 100; i++) + ms.StoreMsg(subj, null, msg, 0); + + state = ms.State(); + state.Msgs.ShouldBe(100UL); + + await Task.Delay(TimeSpan.FromMilliseconds(500)); + + deadline = DateTime.UtcNow.AddSeconds(2); + while (DateTime.UtcNow < deadline) + { + state = ms.State(); + if (state.Msgs == 0 && state.Bytes == 0) break; + await Task.Delay(10); + } + + state = ms.State(); + state.Msgs.ShouldBe(0UL); + state.Bytes.ShouldBe(0UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreTimeStamps (T:2028) + // ----------------------------------------------------------------------- + + [Fact] // T:2028 + public async Task MemStoreTimeStamps_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreTimeStamps line 216 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var last = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var subj = "foo"; + var msg = Bytes("Hello World"); + + for (var i = 0; i < 10; i++) + { + await Task.Delay(5); // small delay to ensure distinct timestamps + ms.StoreMsg(subj, null, msg, 0); + } + + var smv = new StoreMsg(); + for (var seq = 1UL; seq <= 10UL; seq++) + { + var sm = ms.LoadMsg(seq, smv); + sm.ShouldNotBeNull(); + sm!.Ts.ShouldBeGreaterThan(last); + last = sm.Ts; + } + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStorePurge (T:2029) + // ----------------------------------------------------------------------- + + [Fact] // T:2029 + public void MemStorePurge_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStorePurge line 241 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 10; i++) + ms.StoreMsg(subj, null, msg, 0); + + ms.State().Msgs.ShouldBe(10UL); + + ms.Purge(); + + ms.State().Msgs.ShouldBe(0UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreCompact (T:2030) + // ----------------------------------------------------------------------- + + [Fact] // T:2030 + public void MemStoreCompact_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreCompact line 259 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 10; i++) + ms.StoreMsg(subj, null, msg, 0); + + ms.State().Msgs.ShouldBe(10UL); + + var (n, err) = ms.Compact(6); + err.ShouldBeNull(); + n.ShouldBe(5UL); + + var state = ms.State(); + state.Msgs.ShouldBe(5UL); + state.FirstSeq.ShouldBe(6UL); + + // Compact past end should purge + (n, err) = ms.Compact(100); + err.ShouldBeNull(); + n.ShouldBe(5UL); + + state = ms.State(); + state.FirstSeq.ShouldBe(100UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreEraseMsg (T:2031) + // ----------------------------------------------------------------------- + + [Fact] // T:2031 + public void MemStoreEraseMsg_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreEraseMsg line 298 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + ms.StoreMsg(subj, null, msg, 0); + + var sm = ms.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Msg.ShouldBe(msg); + + var (removed, _) = ms.EraseMsg(1); + removed.ShouldBeTrue(); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreMsgHeaders (T:2032) + // ----------------------------------------------------------------------- + + [Fact] // T:2032 + public void MemStoreMsgHeaders_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMsgHeaders line 317 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var hdr = Bytes("name:derek"); + var msg = Bytes("Hello World"); + + // Verify computed size + var sz = (int)MsgSize(subj, hdr, msg); + sz.ShouldBe(subj.Length + hdr.Length + msg.Length + 16); + + ms.StoreMsg(subj, hdr, msg, 0); + + var sm = ms.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Msg.ShouldBe(msg); + sm.Hdr.ShouldBe(hdr); + + var (removed, _) = ms.EraseMsg(1); + removed.ShouldBeTrue(); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreStreamStateDeleted (T:2033) + // ----------------------------------------------------------------------- + + [Fact] // T:2033 + public void MemStoreStreamStateDeleted_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreStreamStateDeleted line 342 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var toStore = 10UL; + for (var i = 1UL; i <= toStore; i++) + { + var m = Encoding.UTF8.GetBytes($"[{i:D8}] Hello World!"); + ms.StoreMsg(subj, null, m, 0); + } + + var state = ms.State(); + state.Deleted.ShouldBeNull(); + + // Remove some interior messages (even seqs: 2,4,6,8) + var expected = new List(); + for (var seq = 2UL; seq < toStore; seq += 2) + { + ms.RemoveMsg(seq); + expected.Add(seq); + } + + state = ms.State(); + state.Deleted.ShouldNotBeNull(); + state.Deleted!.ShouldBe(expected.ToArray()); + + // Now fill the gap by deleting 1 and 3 + ms.RemoveMsg(1); + ms.RemoveMsg(3); + expected.RemoveRange(0, 2); // remove 2 and 4 (the first two) + state = ms.State(); + state.Deleted!.ShouldBe(expected.ToArray()); + state.FirstSeq.ShouldBe(5UL); + + ms.Purge(); + state = ms.State(); + (state.Deleted?.Length ?? 0).ShouldBe(0); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreStreamTruncate (T:2034) + // ----------------------------------------------------------------------- + + [Fact] // T:2034 + public void MemStoreStreamTruncate_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreStreamTruncate line 385 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var tseq = 50UL; + var toStore = 100UL; + + for (var i = 1UL; i < tseq; i++) + ms.StoreMsg("foo", null, Bytes("ok"), 0); + for (var i = tseq; i <= toStore; i++) + ms.StoreMsg("bar", null, Bytes("ok"), 0); + + ms.State().Msgs.ShouldBe(toStore); + + ms.Truncate(tseq); + ms.State().Msgs.ShouldBe(tseq); + + // Now test with interior deletes + ms.RemoveMsg(10); + ms.RemoveMsg(20); + ms.RemoveMsg(30); + ms.RemoveMsg(40); + + tseq = 25UL; + ms.Truncate(tseq); + + var state = ms.State(); + state.Msgs.ShouldBe(tseq - 2); // 10 and 20 deleted + state.NumSubjects.ShouldBe(1); + state.Deleted.ShouldBe(new ulong[] { 10, 20 }); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStorePurgeExWithSubject (T:2035) + // ----------------------------------------------------------------------- + + [Fact] // T:2035 + public void MemStorePurgeExWithSubject_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStorePurgeExWithSubject line 437 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + for (var i = 0; i < 100; i++) + ms.StoreMsg("foo", null, null, 0); + + // Purge all with subject "foo" + ms.PurgeEx("foo", 1, 0); + ms.State().Msgs.ShouldBe(0UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreUpdateMaxMsgsPerSubject (T:2036) + // ----------------------------------------------------------------------- + + [Fact] // T:2036 + public void MemStoreUpdateMaxMsgsPerSubject_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreUpdateMaxMsgsPerSubject line 452 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "foo" }, + MaxMsgsPer = 10, + }; + var ms = NewMemStore(cfg); + + // Increase limit + cfg.MaxMsgsPer = 50; + ms.UpdateConfig(cfg); + + var numStored = 22; + for (var i = 0; i < numStored; i++) + ms.StoreMsg("foo", null, null, 0); + + var ss = ms.SubjectsState("foo")["foo"]; + ss.Msgs.ShouldBe((ulong)numStored); + + // Decrease limit to 10 — should trim to 10 + cfg.MaxMsgsPer = 10; + ms.UpdateConfig(cfg); + + ss = ms.SubjectsState("foo")["foo"]; + ss.Msgs.ShouldBe(10UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreStreamTruncateReset (T:2037) + // ----------------------------------------------------------------------- + + [Fact] // T:2037 + public void MemStoreStreamTruncateReset_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreStreamTruncateReset line 490 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "foo" }, + }; + var ms = NewMemStore(cfg); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 1000; i++) + ms.StoreMsg(subj, null, msg, 0); + + // Reset everything via Truncate(0) + ms.Truncate(0); + + var state = ms.State(); + state.Msgs.ShouldBe(0UL); + state.Bytes.ShouldBe(0UL); + state.FirstSeq.ShouldBe(0UL); + state.LastSeq.ShouldBe(0UL); + state.NumSubjects.ShouldBe(0); + state.NumDeleted.ShouldBe(0); + + // Store 1000 more after reset + for (var i = 0; i < 1000; i++) + ms.StoreMsg(subj, null, msg, 0); + + state = ms.State(); + state.Msgs.ShouldBe(1000UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(1000UL); + state.NumSubjects.ShouldBe(1); + state.NumDeleted.ShouldBe(0); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreStreamCompactMultiBlockSubjectInfo (T:2038) + // ----------------------------------------------------------------------- + + [Fact] // T:2038 + public void MemStoreStreamCompactMultiBlockSubjectInfo_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreStreamCompactMultiBlockSubjectInfo line 531 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "foo.*" }, + }; + var ms = NewMemStore(cfg); + + for (var i = 0; i < 1000; i++) + ms.StoreMsg($"foo.{i}", null, Bytes("Hello World"), 0); + + var (deleted, err) = ms.Compact(501); + err.ShouldBeNull(); + deleted.ShouldBe(500UL); + + var state = ms.State(); + state.NumSubjects.ShouldBe(500); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreSubjectsTotals (T:2039) + // ----------------------------------------------------------------------- + + [Fact] // T:2039 + public void MemStoreSubjectsTotals_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreSubjectsTotals line 557 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "*.*" }, + }; + var ms = NewMemStore(cfg); + + var rng = new Random(42); + var fmap = new Dictionary(); + var bmap = new Dictionary(); + + for (var i = 0; i < 10_000; i++) + { + string ft; + Dictionary m; + if (rng.Next(2) == 0) + { + ft = "foo"; m = fmap; + } + else + { + ft = "bar"; m = bmap; + } + var dt = rng.Next(100); + var subj = $"{ft}.{dt}"; + m[dt] = m.GetValueOrDefault(dt) + 1; + ms.StoreMsg(subj, null, Bytes("Hello World"), 0); + } + + // Check each individual subject in fmap + foreach (var (dt, total) in fmap) + { + var subj = $"foo.{dt}"; + var m = ms.SubjectsTotals(subj); + m[subj].ShouldBe((ulong)total); + } + + // Check fmap wildcard totals + var st = ms.SubjectsTotals("foo.*"); + st.Count.ShouldBe(fmap.Count); + var expectedTotal = fmap.Values.Sum(); + var receivedTotal = st.Values.Select(v => (long)v).Sum(); + receivedTotal.ShouldBe(expectedTotal); + + // Check bmap wildcard totals + st = ms.SubjectsTotals("bar.*"); + st.Count.ShouldBe(bmap.Count); + expectedTotal = bmap.Values.Sum(); + receivedTotal = st.Values.Select(v => (long)v).Sum(); + receivedTotal.ShouldBe(expectedTotal); + + // All with pwc match + st = ms.SubjectsTotals("*.*"); + st.Count.ShouldBe(bmap.Count + fmap.Count); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreNumPending (T:2040) + // ----------------------------------------------------------------------- + + [Fact] // T:2040 + public void MemStoreNumPending_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreNumPending line 637 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "*.*.*.*" }, + }; + var ms = NewMemStore(cfg); + + var tokens = new[] { "foo", "bar", "baz" }; + var rng = new Random(99); + string GenSubj() => + $"{tokens[rng.Next(tokens.Length)]}.{tokens[rng.Next(tokens.Length)]}.{tokens[rng.Next(tokens.Length)]}.{tokens[rng.Next(tokens.Length)]}"; + + for (var i = 0; i < 5_000; i++) + { + var subj = GenSubj(); + ms.StoreMsg(subj, null, Bytes("Hello World"), 0); + } + + var state = ms.State(); + + // Scan one by one for sanity check + SimpleState SanityCheck(ulong sseq, string filter) + { + var ss = new SimpleState(); + var smv = new StoreMsg(); + if (sseq == 0) sseq = 1; + for (var seq = sseq; seq <= state.LastSeq; seq++) + { + try + { + var sm = ms.LoadMsg(seq, smv); + if (sm != null && SubjectIsSubsetMatch(sm.Subject, filter)) + { + ss.Msgs++; + ss.Last = seq; + if (ss.First == 0 || seq < ss.First) ss.First = seq; + } + } + catch { /* skip deleted */ } + } + return ss; + } + + void Check(ulong sseq, string filter) + { + var (np, lvs, err) = ms.NumPending(sseq, filter, false); + err.ShouldBeNull(); + lvs.ShouldBe(state.LastSeq); + var ss = ms.FilteredState(sseq, filter); + var sss = SanityCheck(sseq, filter); + ss.Msgs.ShouldBe(np); + ss.Msgs.ShouldBe(sss.Msgs); + } + + var startSeqs = new ulong[] { 0, 1, 2, 200, 444, 555, 2222 }; + var checkSubs = new[] { "foo.>", "*.bar.>", "foo.bar.*.baz" }; + + foreach (var filter in checkSubs) + foreach (var start in startSeqs) + Check(start, filter); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreInitialFirstSeq (T:2041) + // ----------------------------------------------------------------------- + + [Fact] // T:2041 + public void MemStoreInitialFirstSeq_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreInitialFirstSeq line 765 + var cfg = new StreamConfig + { + Name = "zzz", + Storage = StorageType.MemoryStorage, + FirstSeq = 1000, + }; + var ms = NewMemStore(cfg); + + var (seq, _, err) = (ms.StoreMsg("A", null, Bytes("OK"), 0).Seq, 0L, (Exception?)null); + seq.ShouldBe(1000UL); + + var (seq2, _) = ms.StoreMsg("B", null, Bytes("OK"), 0); + seq2.ShouldBe(1001UL); + + var state = new StreamState(); + ms.FastState(state); + state.Msgs.ShouldBe(2UL); + state.FirstSeq.ShouldBe(1000UL); + state.LastSeq.ShouldBe(1001UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreGetSeqFromTimeWithLastDeleted (T:2043) + // ----------------------------------------------------------------------- + + [Fact] // T:2043 + public async Task MemStoreGetSeqFromTimeWithLastDeleted_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreGetSeqFromTimeWithLastDeleted line 839 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "*" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var total = 100; + DateTime st = default; + for (var i = 1; i <= total; i++) + { + ms.StoreMsg("A", null, Bytes("OK"), 0); + if (i == total / 2) + { + await Task.Delay(50); + st = DateTime.UtcNow; + } + } + + // Delete last 10 + for (var seq = total - 10; seq <= total; seq++) + ms.RemoveMsg((ulong)seq); + + // Make sure this does not panic with last sequence no longer accessible + var gotSeq = ms.GetSeqFromTime(st); + // Should return around 51 (the halfway point) + gotSeq.ShouldBeGreaterThanOrEqualTo(49UL); + gotSeq.ShouldBeLessThanOrEqualTo(53UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreSkipMsgs (T:2044) + // ----------------------------------------------------------------------- + + [Fact] // T:2044 + public void MemStoreSkipMsgs_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreSkipMsgs line 871 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "*" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + // Wrong starting sequence must fail + Should.Throw(() => ms.SkipMsgs(10, 100)); + + ms.SkipMsgs(1, 100); + + var state = ms.State(); + state.FirstSeq.ShouldBe(101UL); + state.LastSeq.ShouldBe(100UL); + + // Now skip a large amount + ms.SkipMsgs(101, 100_000); + state = ms.State(); + state.FirstSeq.ShouldBe(100_101UL); + state.LastSeq.ShouldBe(100_100UL); + + // New store: add a message then skip + ms = NewMemStore(cfg); + ms.StoreMsg("foo", null, null, 0); + + ms.SkipMsgs(2, 10); + state = ms.State(); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(11UL); + state.Msgs.ShouldBe(1UL); + state.NumDeleted.ShouldBe(10); + state.Deleted!.Length.ShouldBe(10); + + // Check FastState + state.Deleted = null; + ms.FastState(state); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(11UL); + state.Msgs.ShouldBe(1UL); + state.NumDeleted.ShouldBe(10); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreMultiLastSeqs (T:2045) + // ----------------------------------------------------------------------- + + [Fact] // T:2045 + public void MemStoreMultiLastSeqs_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMultiLastSeqs line 923 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo.*", "bar.*" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var msg = Bytes("abc"); + for (var i = 0; i < 33; i++) + { + ms.StoreMsg("foo.foo", null, msg, 0); + ms.StoreMsg("foo.bar", null, msg, 0); + ms.StoreMsg("foo.baz", null, msg, 0); + } + for (var i = 0; i < 33; i++) + { + ms.StoreMsg("bar.foo", null, msg, 0); + ms.StoreMsg("bar.bar", null, msg, 0); + ms.StoreMsg("bar.baz", null, msg, 0); + } + + void CheckResults(ulong[] seqs, ulong[] expected) + { + seqs.Length.ShouldBe(expected.Length); + for (var i = 0; i < seqs.Length; i++) + seqs[i].ShouldBe(expected[i]); + } + + // UpTo sequence 3 + var (seqs, err) = ms.MultiLastSeqs(new[] { "foo.*" }, 3, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 1, 2, 3 }); + + // Up to last sequence of the stream + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.*" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 97, 98, 99 }); + + // Check for bar.* at the end + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.*" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 196, 197, 198 }); + + // This should find nothing + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.*" }, 99, -1); + err.ShouldBeNull(); + CheckResults(seqs!, Array.Empty()); + + // Explicit subjects + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.foo", "foo.bar", "foo.baz" }, 3, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 1, 2, 3 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.foo", "foo.bar", "foo.baz" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 97, 98, 99 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.foo", "bar.bar", "bar.baz" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 196, 197, 198 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.foo", "bar.bar", "bar.baz" }, 99, -1); + err.ShouldBeNull(); + CheckResults(seqs!, Array.Empty()); + + // Single subject + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.foo" }, 3, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 1 }); + + // De-duplicate + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.*", "foo.bar" }, 3, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 1, 2, 3 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.>", "bar.bar", "bar.baz" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 196, 197, 198 }); + + // All + (seqs, err) = ms.MultiLastSeqs(new[] { ">" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 97, 98, 99, 196, 197, 198 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { ">" }, 99, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 97, 98, 99 }); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreMultiLastSeqsMaxAllowed (T:2046) + // ----------------------------------------------------------------------- + + [Fact] // T:2046 + public void MemStoreMultiLastSeqsMaxAllowed_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMultiLastSeqsMaxAllowed line 1010 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo.*" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var msg = Bytes("abc"); + for (var i = 1; i <= 100; i++) + ms.StoreMsg($"foo.{i}", null, msg, 0); + + var (seqs, err) = ms.MultiLastSeqs(new[] { "foo.*" }, 0, 10); + seqs.ShouldBeNull(); + err.ShouldNotBeNull(); + err!.Message.ShouldContain("too many"); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStorePurgeExWithDeletedMsgs (T:2047) + // ----------------------------------------------------------------------- + + [Fact] // T:2047 + public void MemStorePurgeExWithDeletedMsgs_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStorePurgeExWithDeletedMsgs line 1031 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var msg = Bytes("abc"); + for (var i = 1; i <= 10; i++) + ms.StoreMsg("foo", null, msg, 0); + + ms.RemoveMsg(2); + ms.RemoveMsg(9); // This was the bug + + var (n, err) = ms.PurgeEx(string.Empty, 9, 0); + err.ShouldBeNull(); + n.ShouldBe(7UL); + + var state = new StreamState(); + ms.FastState(state); + state.FirstSeq.ShouldBe(10UL); + state.LastSeq.ShouldBe(10UL); + state.Msgs.ShouldBe(1UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreDeleteAllFirstSequenceCheck (T:2048) + // ----------------------------------------------------------------------- + + [Fact] // T:2048 + public void MemStoreDeleteAllFirstSequenceCheck_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreDeleteAllFirstSequenceCheck line 1060 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var msg = Bytes("abc"); + for (var i = 1; i <= 10; i++) + ms.StoreMsg("foo", null, msg, 0); + + for (var seq = 1UL; seq <= 10UL; seq++) + ms.RemoveMsg(seq); + + var state = new StreamState(); + ms.FastState(state); + state.FirstSeq.ShouldBe(11UL); + state.LastSeq.ShouldBe(10UL); + state.Msgs.ShouldBe(0UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreAllLastSeqs (T:2054) + // ----------------------------------------------------------------------- + + [Fact] // T:2054 + public void MemStoreAllLastSeqs_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreAllLastSeqs line 1266 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "*.*" }, + MaxMsgsPer = 50, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var subjs = new[] { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" }; + var msg = Bytes("abc"); + var rng = new Random(7); + + for (var i = 0; i < 10_000; i++) + { + var subj = subjs[rng.Next(subjs.Length)]; + ms.StoreMsg(subj, null, msg, 0); + } + + var expected = new List(); + var smv = new StoreMsg(); + foreach (var subj in subjs) + { + var sm = ms.LoadLastMsg(subj, smv); + sm.ShouldNotBeNull(); + expected.Add(sm!.Seq); + } + expected.Sort(); + + var (seqs, err) = ms.AllLastSeqs(); + err.ShouldBeNull(); + seqs!.SequenceEqual(expected).ShouldBeTrue(); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreSubjectForSeq (T:2056) + // ----------------------------------------------------------------------- + + [Fact] // T:2056 + public void MemStoreSubjectForSeq_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreSubjectForSeq line 1319 + var cfg = new StreamConfig + { + Name = "foo", + Subjects = new[] { "foo.>" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var (seq, _) = ms.StoreMsg("foo.bar", null, null, 0); + seq.ShouldBe(1UL); + + var (subj, err) = ms.SubjectForSeq(0); + err.ShouldNotBeNull(); + + (subj, err) = ms.SubjectForSeq(1); + err.ShouldBeNull(); + subj.ShouldBe("foo.bar"); + + (subj, err) = ms.SubjectForSeq(2); + err.ShouldNotBeNull(); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // Private helper: subject subset matching (mirrors Go subjectIsSubsetMatch) + // ----------------------------------------------------------------------- + + private static bool SubjectIsSubsetMatch(string subject, string filter) + { + var sParts = subject.Split('.'); + var fParts = filter.Split('.'); + var si = 0; + for (var fi = 0; fi < fParts.Length; fi++) + { + if (fParts[fi] == ">") + return si < sParts.Length; + if (si >= sParts.Length) + return false; + if (fParts[fi] != "*" && fParts[fi] != sParts[si]) + return false; + si++; + } + return si == sParts.Length; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs new file mode 100644 index 0000000..4198b4c --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs @@ -0,0 +1,575 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mirrors server/store_test.go (MemStore permutation only; file store permutations deferred). + +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +/// +/// Unit tests for IStreamStore contract, exercised against JetStreamMemStore. +/// Mirrors server/store_test.go (memory permutations only). +/// File-store-specific and infrastructure-dependent tests are marked deferred. +/// +public class StorageEngineTests +{ + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private static JetStreamMemStore NewMemStore(StreamConfig cfg) + { + cfg.Storage = StorageType.MemoryStorage; + return new JetStreamMemStore(cfg); + } + + private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s); + + // ----------------------------------------------------------------------- + // TestStoreDeleteSlice (T:2943) + // ----------------------------------------------------------------------- + + [Fact] // T:2943 + public void StoreDeleteSlice_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreDeleteSlice line 147 + var ds = new DeleteSlice(new ulong[] { 2 }); + var deletes = new List(); + ds.Range(seq => { deletes.Add(seq); return true; }); + deletes.Count.ShouldBe(1); + deletes[0].ShouldBe(2UL); + + var (first, last, num) = ds.GetState(); + first.ShouldBe(2UL); + last.ShouldBe(2UL); + num.ShouldBe(1UL); + } + + // ----------------------------------------------------------------------- + // TestStoreDeleteRange (T:2944) + // ----------------------------------------------------------------------- + + [Fact] // T:2944 + public void StoreDeleteRange_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreDeleteRange line 163 + var dr = new DeleteRange { First = 2, Num = 1 }; + var deletes = new List(); + dr.Range(seq => { deletes.Add(seq); return true; }); + deletes.Count.ShouldBe(1); + deletes[0].ShouldBe(2UL); + + var (first, last, num) = dr.GetState(); + first.ShouldBe(2UL); + last.ShouldBe(2UL); + num.ShouldBe(1UL); + } + + // ----------------------------------------------------------------------- + // TestStoreSubjectStateConsistency (T:2945) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2945 + public void StoreSubjectStateConsistency_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreSubjectStateConsistency line 179 + var fs = NewMemStore(new StreamConfig { Name = "TEST", Subjects = new[] { "foo" } }); + + SimpleState GetSubjectState() + { + var ss = fs.SubjectsState("foo"); + ss.TryGetValue("foo", out var result); + return result ?? new SimpleState(); + } + + var smp = new StoreMsg(); + + ulong ExpectFirstSeq() + { + var (sm, _, err) = fs.LoadNextMsg("foo", false, 0, smp).Sm?.Seq is ulong s + ? (smp, s, (Exception?)null) + : (null, 0UL, StoreErrors.ErrStoreMsgNotFound); + var (smr, skip) = fs.LoadNextMsg("foo", false, 0, smp); + smr.ShouldNotBeNull(); + return skip; + } + + ulong ExpectLastSeq() + { + var sm = fs.LoadLastMsg("foo", smp); + sm.ShouldNotBeNull(); + return sm!.Seq; + } + + // Publish 4 messages + for (var i = 0; i < 4; i++) + fs.StoreMsg("foo", null, null, 0); + + var ss = GetSubjectState(); + ss.Msgs.ShouldBe(4UL); + ss.First.ShouldBe(1UL); + ss.Last.ShouldBe(4UL); + + // Verify first/last via LoadNextMsg / LoadLastMsg + var (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(1UL); + var lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(4UL); + + // Remove first message + var (removed, _) = fs.RemoveMsg(1); + removed.ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(3UL); + ss.First.ShouldBe(2UL); + ss.Last.ShouldBe(4UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(2UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(4UL); + + // Remove last message + (removed, _) = fs.RemoveMsg(4); + removed.ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(2UL); + ss.First.ShouldBe(2UL); + ss.Last.ShouldBe(3UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(2UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(3UL); + + // Remove seq 2 + (removed, _) = fs.RemoveMsg(2); + removed.ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(1UL); + ss.First.ShouldBe(3UL); + ss.Last.ShouldBe(3UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(3UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(3UL); + + // Publish 3 more + for (var i = 0; i < 3; i++) + fs.StoreMsg("foo", null, null, 0); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(4UL); + ss.First.ShouldBe(3UL); + ss.Last.ShouldBe(7UL); + + // Remove seq 7 and seq 3 + (removed, _) = fs.RemoveMsg(7); + removed.ShouldBeTrue(); + (removed, _) = fs.RemoveMsg(3); + removed.ShouldBeTrue(); + + // Remove seq 5 (the now-first) + (removed, _) = fs.RemoveMsg(5); + removed.ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(1UL); + ss.First.ShouldBe(6UL); + ss.Last.ShouldBe(6UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(6UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(6UL); + + // Store + immediately remove seq 8, then store seq 9 + fs.StoreMsg("foo", null, null, 0); + (removed, _) = fs.RemoveMsg(8); + removed.ShouldBeTrue(); + fs.StoreMsg("foo", null, null, 0); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(2UL); + ss.First.ShouldBe(6UL); + ss.Last.ShouldBe(9UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(6UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(9UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreMaxMsgsPerUpdateBug (T:2947) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2947 + public void StoreMaxMsgsPerUpdateBug_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreMaxMsgsPerUpdateBug line 405 + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = new[] { "foo" }, + MaxMsgsPer = 0, + }; + var fs = NewMemStore(cfg); + + for (var i = 0; i < 5; i++) + fs.StoreMsg("foo", null, null, 0); + + var ss = fs.State(); + ss.Msgs.ShouldBe(5UL); + ss.FirstSeq.ShouldBe(1UL); + ss.LastSeq.ShouldBe(5UL); + + // Update max messages per-subject from 0 (infinite) to 1 + cfg.MaxMsgsPer = 1; + fs.UpdateConfig(cfg); + + // Only one message should remain + ss = fs.State(); + ss.Msgs.ShouldBe(1UL); + ss.FirstSeq.ShouldBe(5UL); + ss.LastSeq.ShouldBe(5UL); + + // Update to invalid value (< -1) — should clamp to -1 + cfg.MaxMsgsPer = -2; + fs.UpdateConfig(cfg); + cfg.MaxMsgsPer.ShouldBe(-1L); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreCompactCleansUpDmap (T:2948) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2948 + public void StoreCompactCleansUpDmap_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreCompactCleansUpDmap line 449 + // We run for compact sequences 2, 3, 4 + for (var cseq = 2UL; cseq <= 4UL; cseq++) + { + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = new[] { "foo" }, + MaxMsgsPer = 0, + }; + var fs = NewMemStore(cfg); + + // Publish 3 messages; no interior deletes + for (var i = 0; i < 3; i++) + fs.StoreMsg("foo", null, null, 0); + + // Remove one message in the middle = interior delete + var (removed, _) = fs.RemoveMsg(2); + removed.ShouldBeTrue(); + + // The dmap should have 1 entry (seq 2) — verify via State().NumDeleted + var state = fs.State(); + state.NumDeleted.ShouldBe(1); + + // Compact — must clean up the interior delete + var (_, err) = fs.Compact(cseq); + err.ShouldBeNull(); + + // After compaction, no deleted entries in the range + state = fs.State(); + state.NumDeleted.ShouldBe(0); + + // Validate first/last sequence + var expectedFirst = Math.Max(3UL, cseq); + state.FirstSeq.ShouldBe(expectedFirst); + state.LastSeq.ShouldBe(3UL); + + fs.Stop(); + } + } + + // ----------------------------------------------------------------------- + // TestStoreTruncateCleansUpDmap (T:2949) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2949 + public void StoreTruncateCleansUpDmap_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreTruncateCleansUpDmap line 500 + // We run for truncate sequences 0 and 1 + for (var tseq = 0UL; tseq <= 1UL; tseq++) + { + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = new[] { "foo" }, + MaxMsgsPer = 0, + }; + var fs = NewMemStore(cfg); + + // Publish 3 messages + for (var i = 0; i < 3; i++) + fs.StoreMsg("foo", null, null, 0); + + // Remove middle message = interior delete + var (removed, _) = fs.RemoveMsg(2); + removed.ShouldBeTrue(); + + var state = fs.State(); + state.NumDeleted.ShouldBe(1); + + // Truncate + fs.Truncate(tseq); + + state = fs.State(); + state.NumDeleted.ShouldBe(0); + + // Validate first/last sequence + var expectedFirst = Math.Min(1UL, tseq); + state.FirstSeq.ShouldBe(expectedFirst); + state.LastSeq.ShouldBe(tseq); + + fs.Stop(); + } + } + + // ----------------------------------------------------------------------- + // TestStorePurgeExZero (T:2950) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2950 + public void StorePurgeExZero_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStorePurgeExZero line 552 + var fs = NewMemStore(new StreamConfig { Name = "TEST", Subjects = new[] { "foo" } }); + + // Simple purge all + var (_, err) = fs.Purge(); + err.ShouldBeNull(); + + var ss = fs.State(); + ss.FirstSeq.ShouldBe(1UL); + ss.LastSeq.ShouldBe(0UL); + + // PurgeEx(seq=0) must equal Purge() + (_, err) = fs.PurgeEx(string.Empty, 0, 0); + err.ShouldBeNull(); + + ss = fs.State(); + ss.FirstSeq.ShouldBe(1UL); + ss.LastSeq.ShouldBe(0UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreGetSeqFromTimeWithInteriorDeletesGap (T:2955) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2955 + public void StoreGetSeqFromTimeWithInteriorDeletesGap_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreGetSeqFromTimeWithInteriorDeletesGap line 874 + // Go: start = ts from StoreMsg at i==1; ts := time.Unix(0, start).UTC() + // .NET: convert the 100-ns store timestamp directly to DateTime (same precision). + var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } }); + + long start = 0; + for (var i = 0; i < 10; i++) + { + var (_, ts) = fs.StoreMsg("foo", null, null, 0); + if (i == 1) + start = ts; // exact timestamp of seq 2 + } + + // Create a delete gap at seqs 4-7 + for (var seq = 4UL; seq <= 7UL; seq++) + fs.RemoveMsg(seq); + + // Convert 100-ns-since-epoch to DateTime (mirrors Go's time.Unix(0, start)) + const long UnixEpochTicks = 621355968000000000L; + var t = new DateTime(start / 100L + UnixEpochTicks, DateTimeKind.Utc); + + var gotSeq = fs.GetSeqFromTime(t); + gotSeq.ShouldBe(2UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreGetSeqFromTimeWithTrailingDeletes (T:2956) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2956 + public void StoreGetSeqFromTimeWithTrailingDeletes_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreGetSeqFromTimeWithTrailingDeletes line 900 + // Go: start = ts from StoreMsg at i==1; ts := time.Unix(0, start).UTC() + // .NET: convert the 100-ns store timestamp directly to DateTime (same precision). + var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } }); + + long start = 0; + for (var i = 0; i < 3; i++) + { + var (_, ts) = fs.StoreMsg("foo", null, null, 0); + if (i == 1) + start = ts; // exact timestamp of seq 2 + } + + fs.RemoveMsg(3); + + // Convert 100-ns-since-epoch to DateTime (mirrors Go's time.Unix(0, start)) + const long UnixEpochTicks = 621355968000000000L; + var t = new DateTime(start / 100L + UnixEpochTicks, DateTimeKind.Utc); + + var gotSeq = fs.GetSeqFromTime(t); + gotSeq.ShouldBe(2UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState (T:2957) + // MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2957 + public void FileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState line 921 + var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } }); + + for (var i = 0; i < 3; i++) + fs.StoreMsg("foo", null, null, 0); + + var (seqs, err) = fs.MultiLastSeqs(new[] { "foo" }, 0, 0); + err.ShouldBeNull(); + seqs!.Length.ShouldBe(1); + seqs![0].ShouldBe(3UL); + + var (removed, _) = fs.RemoveMsg(3); + removed.ShouldBeTrue(); + + (seqs, err) = fs.MultiLastSeqs(new[] { "foo" }, 0, 0); + err.ShouldBeNull(); + seqs!.Length.ShouldBe(1); + seqs![0].ShouldBe(2UL); + + fs.StoreMsg("foo", null, null, 0); + var sm = fs.LoadLastMsg("foo", null); + sm.ShouldNotBeNull(); + sm!.Seq.ShouldBe(4UL); + + (removed, _) = fs.RemoveMsg(4); + removed.ShouldBeTrue(); + + sm = fs.LoadLastMsg("foo", null); + sm.ShouldNotBeNull(); + sm!.Seq.ShouldBe(2UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreDiscardNew (T:2954) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2954 + public void StoreDiscardNew_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreDiscardNew line 788 + // Helper that runs the discard-new test for a given config modifier + void Test(Action updateConfig, Exception? expectedErr) + { + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo" }, + Discard = DiscardPolicy.DiscardNew, + }; + updateConfig(cfg); + cfg.Storage = StorageType.MemoryStorage; + var fs = new JetStreamMemStore(cfg); + + var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var expectedSeq = 1UL; + + void RequireState() + { + var state = fs.State(); + state.Msgs.ShouldBe(1UL); + state.FirstSeq.ShouldBe(expectedSeq); + state.LastSeq.ShouldBe(expectedSeq); + } + + fs.StoreMsg("foo", null, null, 0); + + // StoreRawMsg with discardNewCheck=true + if (expectedErr == null) + { + fs.StoreRawMsg("foo", null, null, 0, ts, 0, true); + expectedSeq++; + } + else + { + Should.Throw(() => fs.StoreRawMsg("foo", null, null, 0, ts, 0, true)); + } + RequireState(); + + // StoreRawMsg with discardNewCheck=false (followers must always accept) + fs.StoreRawMsg("foo", null, null, 0, ts, 0, false); + expectedSeq++; + + // For MaxMsgsPer we stay at 1 msg; otherwise 2 msgs + if (cfg.MaxMsgsPer > 0) + { + RequireState(); + } + else + { + var state = fs.State(); + state.Msgs.ShouldBe(2UL); + state.FirstSeq.ShouldBe(expectedSeq - 1); + state.LastSeq.ShouldBe(expectedSeq); + } + + fs.Stop(); + } + + Test(cfg => cfg.MaxMsgs = 1, StoreErrors.ErrMaxMsgs); + Test(cfg => cfg.MaxBytes = 33, StoreErrors.ErrMaxBytes); + Test(cfg => cfg.MaxMsgsPer = 1, null); + Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject); + Test(cfg => { cfg.MaxMsgs = 1; cfg.MaxMsgsPer = 1; }, null); + Test(cfg => { cfg.MaxBytes = 33; cfg.MaxMsgsPer = 1; }, null); + Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxMsgs = 1; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject); + Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxBytes = 33; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject); + } +} diff --git a/porting.db b/porting.db index 70f16ac7f47cbd87ed8e29005dd33b6e5c924ca6..d3fe90635ad8def244ec5704943ec857c75f8bcc 100644 GIT binary patch delta 6113 zcmb`Jd2|!ky~m{)jb^mW%(Wc`yj!vXlO}^L%S z5r_Mo`P|=ne{-96UC=kGYy_ zPxxBA96){IegRHD7Nz7UM5j_q>X*%1x~!^kS#6EJ9lu9$bPTm*UX`u~zI`m-dP+}6 zzLkSF)$6TvqQ0hfL2YAWZB4zIxJ}L4%%YESgc4z_;1Jq{7XBiCjlWGDrcP76)J^IO z`W$_N-cN6*-=dr8M!JT6o}Nq>&`;4xG}_#Yrb{#>qzRJ1@ein7)JAF@wSsz)ngvOE z)M^bq!5e{$c3CZZo2L`0(CUt6br-H|jm3n7xTlfOP_T0w&C5(gLRiRID%7{ewEEV|3dGm4=0e*@hCsScAimW{?aV{eZqrU)%ht`D6lpkv>az z(R=7N`gQu3^b*=hm(dgG96FtjgJwTF3AiXU6FRROjo`mdTdlN}dQA0EH>l64bFelF zC4eCsm2lJys)SliHNyF5w6ckWGEhg$Yns)on$>#E%B@+g)vVU2SDdI>37QqpH>u2X zR?S0;W@Xl_Oqvy|SuvWGL9?PYD~hM%!qIbj^+Wan^ioKM$2=MZCuo!c78)IbXbSlt zPC{?!8v<<-deNi_h>d|30l)e=kFwwtg~TRuGXBbZtuu`aew5# z;4X0IxZiMH+ySng+rs&|_1s!+Ik$+L&&}dWxyjrZE{A)XOXK3XXwJmx*zei9?4Q{y z>}PB*+ryqz!4z`7TgLSj3*ao(aoy%6RQ`sW6fX!sn*LZxX*3?}~pGuZW+Cy<(4eLhKOth&#j<@eR=}t`Zx> zI&rR8Ax;&G!~!u>Oc#?yN#sR?@Gs%M@QrX?_=E7N@Ud`OI40~Db_;ESPw)zBgk{3Z zLbdRM@SIRAJR@WaPYJ0)oDhXi_YeHv`8)jA{FnSC{y+Jq5BUIpn17ew&Tryh=Ux0t zzMfye&*7)@Q~2?`gSYe1JY~Iay_Fh4TiHh*dEHFuleGlSV{UT&^2mz&3%GtDUp zX3_N6bjNhX^gGi@(>_z1X}xKMsn#^zRAh2&+O<2uic6^cA(Juic{;jA2R_0Fi4N2Z zMWytnvi%=G-cS^83QkUI7j&nic=#k09e_!x=q-3?M`iH79ZiPC_#gc<;EYD|k7ppg zljS(wVSTp_tTwbA&h$mS4$YY;=Q;7-|L@;G*9J7**rd}@-Kv4liCB0~EeKcuwZLP+ zs}^uJsO4(Gf`uBjfLop_R|_UAj3)vNogl>#CsNdpjS(Lhj=|rT$Lco>Sh%AWXe?Y& z3*-y;JGG$4!b!EDGp*HC5MgZaYsdb8l?kd>*vDANQVWl;U{?zdu@D8$Ehq^`p)XQD z`X?3~YT+MPu&IUbv4GUV-*H#{pceiI3wPDR->`61EqsTC^F(MGh~hq0PN`q{77P2; zLLU~ksD*o2cvUUj#lp|k!W}FuPz$%Qpk!~rY~Ek7qRHMZENHU#4Hh)n`wJGL0$p3t zw4p4`>&^xH{=eW_26_|TDve4X{(tFuyh|YQo!IGmQUi5{Ex)4IX1v03ZBF7;=acTv;7u*Rd~D){f!I!DIHl^FN*S_XfKNKq9?p)kQYf_G}w!V zcu}ku#d%S@7bSR6q8BB3QL-1Mc#+ME>|T`WMQL6%)Qg6B(Qq$H_o5MA^rRR4#EYKt zqNk4|+1PBJJcRws_}Cc7Ob#HwoW~$niLWkNr`Kijg?y%7p9l|J@&vflN+&|bsVEVy z?~r4m;ZOFF@SJ-n!(?R_IC2AVt@2(cG6ecgW$`z8umA*v;%g0zg!Deogr$PJCWIRd?$wZ`l7$F_=6+;t8+gINKmGNl4>!tUtA6nR855 z4s01IIf0D6q&8!qGl|L9$G8^yTy?&m6D#9z%Epm!?)v2lRRZR9ODx>`$i%|6+~NJ{ z9;DcDr}|b9@FfhsM8GQ>xRdGOT+Spxqd znS+M>;S8MKCFQG(<2{Vfu}k9NXS=0{TDvLVGO}Aa-mRS2FW)QG!KIst!L*D$DJ8+8 zy;2mEl>Mkrmim?w_F2q6OW3Ig|#TIFX5GYvD- zl%iF^w|;prbexikp=aIDH282IIr`laF4ZYqstf+jJMM~FH8V zC65hROMxHGNV#Eaxy0H+zn)-47%L)J$8KqcmN0{_1@~vl=HMBK&BC4%);-g&i@|KA zE4h%bB=GKb=>$~2InaknQg{7YnyA=}9jK9ewy%kBrLKvEd@1Hy-;sKtaiTF?iN;3$!L<*m-k4es*>P@8ez$A@OTjSh9(0pNsdxZ-zvg} z$~pRsJOj?!#Bgedz?Wy_{D`nCeJcqTK4%}`E_FA{!TjRBF~j)}f(Kf2*aY3j@d55b zxrd3d`9)IJ8Sc_Teuk2Y8F08ulAz%TZpC*yr9XzE7y_%`m!kV0+&Us91l<``kUff& zyTC6Hcbaepnn(sxj!K&%+5Dr@aOh}E?Prs4Q)Xuow#d&9!mNwNu!~mQaM8q%JlJ_m znyuYt6wz2%-lcXy*(TR-B`&PNyR0ED9O#lZXxaW*6;BL1Na`2+ErgheiHVBY7vTDF z>510k()bXV0%seSQqXfVxCf7f@AmiUi{L2VDx{zt^)QbVG z*^#1;ZUAK%P2;oiz@-w4GOHkChb;}3ylu-P9~nGwx7#FqOS27z^Gz`Wvo^bcjFv3d z6wnbrvN2AsXs!S5kYM^wTLv_q$LAjXM=aaV_w2NV=X?-tZHhoAMN)iEYy{=Dx2`1} zUylveliSVgp4evieH-~O5I=qE9A)8J=MfjHTa~di5fc*$QPSoZ3_Bb{tdQh!I6~}T zc+2DH{|+lj%MJYA=Co&#+Rw00F{t4P0@ z^FiruB}a;XZI0))nK42#*uq_yk%3d|WvN$f-T^F}Gri5(t2eSZ_ zJ4DFFqOI6QxRraiI|gio!&hz$q3~bWW_w;m+}>sz9bscWC^v=>VSO7X?9!lwaL<=( p33m)AmxWp!c`Zrt(>rV{RFUHYHccUv=F4CIXs09aXlE9k@xSOiMuGqU delta 1571 zcmZA03v3ic7y#g%`Db>w*So#Bws+E&_S&OxeQ`b7@@Nz+2-OIJg_;m~ZKVf+K#Q~l zLP#juJ3eg6tV?rak$(PK`zsdab z&p*|XZ&0;Vo9IYV6d0)}$|LEwjp@Tt(kjz3k!51b1j{6rNi9>gOwBTN%Z##2n`K5@ zrrk0fmT4R@oqG{I<<+DnoNAdf;&b!%)TP7pFe*DcjC=>4PK5HViQlnMYsTGV=s<@4`Byvgc?{1^I#gh z0?)$`7yxe2;B#m)(hQHGJ9_Ys_zFIUPvVaqU5*Y%o3YE-Y)mu?ja+WWY$AWn&0fr~ zLlWxCqT=b4l&$}nao!Qj0?yeL3l}X2RfUvyja-E)e0m%^#0%qCiWtis=a6VS;(QF^ zgD5tX@Xx#~kGtbo7LkX~n}jGDh$kyszP9!9G8cOt zw`3QOmAT#T)xb@Uy&iY}uSbPkg=z?2t6)r$4oPwip0P0~I zY=U)AElXPtA>fV}R$x3`iReZA{Twzs>d7VBhVzLrY?%lRV&`$+qH>yVceC|ksE199 zT4~c8m3ugZa6R6JH{o@-8ZX7=IE1_KEqoPUz^(WcK8g>(NXUmQ@Ikz6D-HMJ2haj1 z;4tii2yoZ{7Oa3um=C2e6(&IusUUO63{pbIlQE=#1c{#{6F0FFfF8IHci;wGhV%N$ zukf{euu`8sNG?BezD_KjH>Gk?%y+*In%k&a`te&r@>2W&I ze%Cs+cCAHg)atd(T8&nzy`dFrg<6i5tU1(2>QCxb^-Hx$-J^z8ORZ98tCQ8yYEbp6 z27OGs=rwxIpvUN5%IRvlgqG1M+}6$7vEI%R!Ef^kIqX$#&tWg|69HDt^8;)(Uptu9 z*A?$?=3@r4MA!dc*f5cs!HU8Ai)_CqcVPddZ*aJ8fQ0H}9U#uyT_eFI`%jblf{n`2 zAVa1=LG}d_ts7aPDt5o%IfljQ_gLY;C+peu@#Bz~xhFmqi9iec_Ss|y=~AN6OvQMB zIhyHcus225*_KC@>d$KiZ5OE#y%34H>-`%59%`Jf-zxk;@S|o-8yRo=_+gGOYKfm`~3#Yh9 ztga_3&SWFaG0gs4be?TklwId8AI=cL?&Phi2uAcGBsyvXa|v&Yc${KUr*8p-3tcL{ zq;S0j47L_~IaP7;CldE$Ui-|e|A+#h)^qHxf!3%xmTnu>W1p5eA| zx1X+|Z2>7q3JTgb1UDc>=+(I?NQ~H#+KR;T=J-x5+O6C?+dq-o+`Ax3Yf~p6v7|2b Tj4j-ppsrCPxk8QP_2m2w7Am+M diff --git a/reports/current.md b/reports/current.md index bf4d42e..a891540 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-27 00:15:57 UTC +Generated: 2026-02-27 00:35:59 UTC ## Modules (12 total) @@ -21,10 +21,10 @@ Generated: 2026-02-27 00:15:57 UTC | Status | Count | |--------|-------| -| complete | 214 | -| deferred | 215 | +| complete | 252 | +| deferred | 235 | | n_a | 187 | -| not_started | 2527 | +| not_started | 2469 | | verified | 114 | ## Library Mappings (36 total) @@ -36,4 +36,4 @@ Generated: 2026-02-27 00:15:57 UTC ## Overall Progress -**4199/6942 items complete (60.5%)** +**4237/6942 items complete (61.0%)** diff --git a/reports/report_917cd33.md b/reports/report_917cd33.md new file mode 100644 index 0000000..a891540 --- /dev/null +++ b/reports/report_917cd33.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-27 00:35:59 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| not_started | 1 | +| verified | 11 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 3368 | +| n_a | 26 | +| verified | 279 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 252 | +| deferred | 235 | +| n_a | 187 | +| not_started | 2469 | +| verified | 114 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**4237/6942 items complete (61.0%)**