diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs index 1ed7238..79f2260 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MemStore.cs @@ -78,7 +78,11 @@ public sealed class JetStreamMemStore : IStreamStore _maxp = cfg.MaxMsgsPer; if (cfg.FirstSeq > 0) - Purge(); + { + // Set the initial state so that the first StoreMsg call assigns seq = cfg.FirstSeq. + _state.LastSeq = cfg.FirstSeq - 1; + _state.FirstSeq = cfg.FirstSeq; + } } // ----------------------------------------------------------------------- @@ -92,7 +96,10 @@ public sealed class JetStreamMemStore : IStreamStore try { var seq = _state.LastSeq + 1; - var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + // Use 100-nanosecond Ticks for higher timestamp precision. + // Nanoseconds since Unix epoch: (Ticks - UnixEpochTicks) * 100 + const long UnixEpochTicks = 621355968000000000L; + var ts = (DateTimeOffset.UtcNow.UtcTicks - UnixEpochTicks) * 100L; try { StoreRawMsgLocked(subject, hdr, msg, seq, ts, ttl, discardNewCheck: true); @@ -134,12 +141,24 @@ public sealed class JetStreamMemStore : IStreamStore hdr ??= Array.Empty(); msg ??= Array.Empty(); + // Determine if we are at the per-subject limit. + bool atSubjectLimit = false; + if (_maxp > 0 && !string.IsNullOrEmpty(subject)) + { + var subjectBytesCheck = Encoding.UTF8.GetBytes(subject); + var (ssCheck, foundCheck) = _fss.Find(subjectBytesCheck); + if (foundCheck && ssCheck != null) + atSubjectLimit = ssCheck.Msgs >= (ulong)_maxp; + } + // Discard-new enforcement if (discardNewCheck && _cfg.Discard == DiscardPolicy.DiscardNew) { - if (_cfg.MaxMsgs > 0 && _state.Msgs >= (ulong)_cfg.MaxMsgs) + if (atSubjectLimit && _cfg.DiscardNewPer) + throw StoreErrors.ErrMaxMsgsPerSubject; + if (_cfg.MaxMsgs > 0 && _state.Msgs >= (ulong)_cfg.MaxMsgs && !atSubjectLimit) throw StoreErrors.ErrMaxMsgs; - if (_cfg.MaxBytes > 0 && _state.Bytes + MsgSize(subject, hdr, msg) > (ulong)_cfg.MaxBytes) + if (_cfg.MaxBytes > 0 && _state.Bytes + MsgSize(subject, hdr, msg) > (ulong)_cfg.MaxBytes && !atSubjectLimit) throw StoreErrors.ErrMaxBytes; } @@ -342,7 +361,11 @@ public sealed class JetStreamMemStore : IStreamStore { var (ss, found) = _fss.Find(Encoding.UTF8.GetBytes(subject)); if (found && ss != null && ss.Msgs > 0) + { + if (ss.LastNeedsUpdate) + RecalculateForSubj(subject, ss); _msgs.TryGetValue(ss.Last, out stored); + } } if (stored == null) @@ -603,13 +626,69 @@ public sealed class JetStreamMemStore : IStreamStore /// public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep) { - // TODO: session 17 — full subject-filtered purge - if (string.IsNullOrEmpty(subject) || subject == ">") + var isAll = string.IsNullOrEmpty(subject) || subject == ">"; + if (isAll) { if (keep == 0 && seq == 0) return Purge(); + if (seq > 1) + return Compact(seq); + if (keep > 0) + { + ulong msgs, lseq; + _mu.EnterReadLock(); + msgs = _state.Msgs; + lseq = _state.LastSeq; + _mu.ExitReadLock(); + if (keep >= msgs) + return (0, null); + return Compact(lseq - keep + 1); + } + return (0, null); } - return (0, null); + + // Subject-filtered purge + var ss = FilteredState(1, subject); + if (ss.Msgs == 0) + return (0, null); + + if (keep > 0) + { + if (keep >= ss.Msgs) + return (0, null); + ss.Msgs -= keep; + } + + var last = ss.Last; + if (seq > 1) + last = seq - 1; + + ulong purged = 0; + _mu.EnterWriteLock(); + try + { + if (_msgs == null) + return (0, null); + + for (var s = ss.First; s <= last; s++) + { + if (_msgs.TryGetValue(s, out var sm) && sm != null && sm.Subject == subject) + { + if (RemoveMsgLocked(s, false)) + { + purged++; + if (purged >= ss.Msgs) + break; + } + } + } + } + finally + { + if (_mu.IsWriteLockHeld) + _mu.ExitWriteLock(); + } + return (purged, null); } /// @@ -703,9 +782,10 @@ public sealed class JetStreamMemStore : IStreamStore // Full reset purged = (ulong)_msgs.Count; bytes = _state.Bytes; - _state = new StreamState { LastTime = DateTime.UtcNow }; + _state = new StreamState(); _msgs = new Dictionary(); _dmap = new SequenceSet(); + _fss.Reset(); } else { @@ -847,6 +927,8 @@ public sealed class JetStreamMemStore : IStreamStore return new SimpleState { Msgs = _state.Msgs, First = _state.FirstSeq, Last = _state.LastSeq }; var ss = new SimpleState(); + var havePartial = false; + _fss.Match(Encoding.UTF8.GetBytes(filter), (subj, fss) => { if (fss.FirstNeedsUpdate || fss.LastNeedsUpdate) @@ -854,12 +936,46 @@ public sealed class JetStreamMemStore : IStreamStore if (sseq <= fss.First) { + // All messages in this subject are at or after sseq ss.Msgs += fss.Msgs; if (ss.First == 0 || fss.First < ss.First) ss.First = fss.First; if (fss.Last > ss.Last) ss.Last = fss.Last; } + else if (sseq <= fss.Last) + { + // Partial: sseq is inside this subject's range — need to scan + havePartial = true; + // Still track Last for the scan bounds + if (fss.Last > ss.Last) ss.Last = fss.Last; + } + // else sseq > fss.Last: all messages before sseq, skip return true; }); + + if (!havePartial) + return ss; + + // Need to scan messages from sseq to ss.Last + if (_msgs == null) + return ss; + + var scanFirst = sseq; + var scanLast = ss.Last; + if (scanLast == 0) scanLast = _state.LastSeq; + + // Reset and rescan + ss = new SimpleState(); + for (var seq = scanFirst; seq <= scanLast; seq++) + { + if (!_msgs.TryGetValue(seq, out var sm) || sm == null) + continue; + if (isAll || MatchLiteral(sm.Subject, filter)) + { + ss.Msgs++; + if (ss.First == 0) ss.First = seq; + ss.Last = seq; + } + } return ss; } @@ -947,8 +1063,10 @@ public sealed class JetStreamMemStore : IStreamStore { if (_msgs == null || _msgs.Count == 0) return (Array.Empty(), null); var seqs = new List(_fss.Size()); - _fss.IterFast((_, ss) => + _fss.IterFast((subj, ss) => { + if (ss.LastNeedsUpdate) + RecalculateForSubj(Encoding.UTF8.GetString(subj), ss); seqs.Add(ss.Last); return true; }); @@ -974,14 +1092,32 @@ public sealed class JetStreamMemStore : IStreamStore var seen = new HashSet(); foreach (var filter in filters) { - _fss.Match(Encoding.UTF8.GetBytes(filter), (_, ss) => + _fss.Match(Encoding.UTF8.GetBytes(filter), (subj, ss) => { - if (ss.Last <= maxSeq && seen.Add(ss.Last)) - seqs.Add(ss.Last); + if (ss.LastNeedsUpdate) + RecalculateForSubj(Encoding.UTF8.GetString(subj), ss); + if (ss.Last <= maxSeq) + { + if (seen.Add(ss.Last)) + seqs.Add(ss.Last); + } + else if (ss.Msgs > 1) + { + // Last is beyond maxSeq — scan backwards for the most recent msg <= maxSeq. + var s = Encoding.UTF8.GetString(subj); + for (var seq = maxSeq; seq > 0; seq--) + { + if (_msgs.TryGetValue(seq, out var sm) && sm != null && sm.Subject == s) + { + if (seen.Add(seq)) seqs.Add(seq); + break; + } + } + } return true; }); if (maxAllowed > 0 && seqs.Count > maxAllowed) - return (Array.Empty(), StoreErrors.ErrTooManyResults); + return (null!, StoreErrors.ErrTooManyResults); } seqs.Sort(); return (seqs.ToArray(), null); @@ -1017,7 +1153,9 @@ public sealed class JetStreamMemStore : IStreamStore /// public ulong GetSeqFromTime(DateTime t) { - var ts = new DateTimeOffset(t).ToUnixTimeMilliseconds() * 1_000_000L; + // Use same 100-nanosecond precision as StoreMsg timestamps. + const long UnixEpochTicksGsft = 621355968000000000L; + var ts = (new DateTimeOffset(t, TimeSpan.Zero).UtcTicks - UnixEpochTicksGsft) * 100L; _mu.EnterReadLock(); try { @@ -1038,7 +1176,9 @@ public sealed class JetStreamMemStore : IStreamStore break; } if (lastSm == null) return _state.LastSeq + 1; - if (ts >= lastSm.Ts) return _state.LastSeq + 1; + // Mirror Go: if ts == last ts return that seq; if ts > last ts return pastEnd. + if (ts == lastSm.Ts) return lastSm.Seq; + if (ts > lastSm.Ts) return _state.LastSeq + 1; // Linear scan fallback for (var seq = _state.FirstSeq; seq <= _state.LastSeq; seq++) @@ -1066,9 +1206,32 @@ public sealed class JetStreamMemStore : IStreamStore try { _cfg = cfg.Clone(); - _maxp = cfg.MaxMsgsPer; + + // Clamp MaxMsgsPer to minimum of -1 + if (_cfg.MaxMsgsPer < -1) + { + _cfg.MaxMsgsPer = -1; + cfg.MaxMsgsPer = -1; + } + + var oldMaxp = _maxp; + _maxp = _cfg.MaxMsgsPer; + EnforceMsgLimit(); EnforceBytesLimit(); + + // Enforce per-subject limits if MaxMsgsPer was reduced or newly set + if (_maxp > 0 && (oldMaxp == 0 || _maxp < oldMaxp)) + { + var lm = (ulong)_maxp; + _fss.IterFast((subj, ss) => + { + if (ss.Msgs > lm) + EnforcePerSubjectLimit(Encoding.UTF8.GetString(subj), ss); + return true; + }); + } + if (_ageChk == null && _cfg.MaxAge != TimeSpan.Zero) StartAgeChk(); if (_ageChk != null && _cfg.MaxAge == TimeSpan.Zero) @@ -1400,7 +1563,9 @@ public sealed class JetStreamMemStore : IStreamStore { if (_msgs == null || _cfg.MaxAge == TimeSpan.Zero) return; var minAge = DateTime.UtcNow - _cfg.MaxAge; - var minTs = new DateTimeOffset(minAge).ToUnixTimeMilliseconds() * 1_000_000L; + // Use same 100-nanosecond precision as StoreMsg timestamps. + const long UnixEpochTicksExp = 621355968000000000L; + var minTs = (new DateTimeOffset(minAge, TimeSpan.Zero).UtcTicks - UnixEpochTicksExp) * 100L; var toRemove = new List(); foreach (var kv in _msgs) { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs new file mode 100644 index 0000000..0d7ca72 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamMemoryStoreTests.cs @@ -0,0 +1,1169 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mirrors server/memstore_test.go + +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +/// +/// Unit tests for the in-memory JetStream message store. +/// Mirrors server/memstore_test.go. +/// +public class JetStreamMemoryStoreTests +{ + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private static JetStreamMemStore NewMemStore(StreamConfig? cfg = null) + { + cfg ??= new StreamConfig { Storage = StorageType.MemoryStorage, Name = "test" }; + return new JetStreamMemStore(cfg); + } + + private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s); + + /// Computes the expected stored-message size: subj.len + hdr.len + msg.len + 16. + private static ulong MsgSize(string subj, byte[]? hdr, byte[]? msg) + => (ulong)(subj.Length + (hdr?.Length ?? 0) + (msg?.Length ?? 0) + 16); + + // ----------------------------------------------------------------------- + // TestMemStoreBasics (T:2023) + // ----------------------------------------------------------------------- + + [Fact] // T:2023 + public void MemStoreBasics_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreBasics line 32 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + ms.Stop(); + + ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + var subj = "foo"; + var msg = Bytes("Hello World"); + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + + var (seq, ts) = ms.StoreMsg(subj, null, msg, 0); + seq.ShouldBe(1UL); + ts.ShouldBeGreaterThanOrEqualTo(now); + + var state = ms.State(); + state.Msgs.ShouldBe(1UL); + + var expectedSize = MsgSize(subj, null, msg); + state.Bytes.ShouldBe(expectedSize); + + var sm = ms.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Subject.ShouldBe(subj); + sm.Msg.ShouldBe(msg); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreMsgLimit (T:2024) + // ----------------------------------------------------------------------- + + [Fact] // T:2024 + public void MemStoreMsgLimit_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMsgLimit line 67 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage, MaxMsgs = 10 }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 10; i++) + ms.StoreMsg(subj, null, msg, 0); + + var state = ms.State(); + state.Msgs.ShouldBe(10UL); + + // Adding one more should still keep 10 (oldest evicted) + ms.StoreMsg(subj, null, msg, 0); + state = ms.State(); + state.Msgs.ShouldBe(10UL); + state.LastSeq.ShouldBe(11UL); + state.FirstSeq.ShouldBe(2UL); + + // Seq 1 should not be loadable + Should.Throw(() => ms.LoadMsg(1, null)); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreBytesLimit (T:2025) + // ----------------------------------------------------------------------- + + [Fact] // T:2025 + public void MemStoreBytesLimit_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreBytesLimit line 99 + var subj = "foo"; + var msg = new byte[512]; + var storedMsgSize = MsgSize(subj, null, msg); + + var toStore = 1024UL; + var maxBytes = storedMsgSize * toStore; + + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage, MaxBytes = (long)maxBytes }); + + for (var i = 0UL; i < toStore; i++) + ms.StoreMsg(subj, null, msg, 0); + + var state = ms.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + + // Now send 10 more and check that bytes limit enforced + for (var i = 0; i < 10; i++) + ms.StoreMsg(subj, null, msg, 0); + + state = ms.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + state.FirstSeq.ShouldBe(11UL); + state.LastSeq.ShouldBe(toStore + 10UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreBytesLimitWithDiscardNew (T:2026) + // ----------------------------------------------------------------------- + + [Fact] // T:2026 + public void MemStoreBytesLimitWithDiscardNew_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreBytesLimitWithDiscardNew line 143 + var subj = "tiny"; + var msg = new byte[7]; + var storedMsgSize = MsgSize(subj, null, msg); + var toStore = 3UL; + var maxBytes = 100; + + var ms = NewMemStore(new StreamConfig + { + Storage = StorageType.MemoryStorage, + MaxBytes = maxBytes, + Discard = DiscardPolicy.DiscardNew + }); + + // Send 10 messages; first toStore succeed, the rest hit ErrMaxBytes + for (var i = 0; i < 10; i++) + { + var (seq, _) = ms.StoreMsg(subj, null, msg, 0); + if (i < (int)toStore) + seq.ShouldNotBe(0UL); // success + // After toStore, StoreMsg returns (0, 0) on error + } + + var state = ms.State(); + state.Msgs.ShouldBe(toStore); + state.Bytes.ShouldBe(storedMsgSize * toStore); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreAgeLimit (T:2027) + // ----------------------------------------------------------------------- + + [Fact] // T:2027 + public async Task MemStoreAgeLimit_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreAgeLimit line 174 + var maxAge = TimeSpan.FromMilliseconds(100); + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage, MaxAge = maxAge }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 100; i++) + ms.StoreMsg(subj, null, msg, 0); + + var state = ms.State(); + state.Msgs.ShouldBe(100UL); + + // Wait for messages to expire + await Task.Delay(TimeSpan.FromMilliseconds(500)); + + // Check messages expired + var deadline = DateTime.UtcNow.AddSeconds(2); + while (DateTime.UtcNow < deadline) + { + state = ms.State(); + if (state.Msgs == 0 && state.Bytes == 0) break; + await Task.Delay(10); + } + + state = ms.State(); + state.Msgs.ShouldBe(0UL); + state.Bytes.ShouldBe(0UL); + + // Now add some more and make sure that timer will fire again + for (var i = 0; i < 100; i++) + ms.StoreMsg(subj, null, msg, 0); + + state = ms.State(); + state.Msgs.ShouldBe(100UL); + + await Task.Delay(TimeSpan.FromMilliseconds(500)); + + deadline = DateTime.UtcNow.AddSeconds(2); + while (DateTime.UtcNow < deadline) + { + state = ms.State(); + if (state.Msgs == 0 && state.Bytes == 0) break; + await Task.Delay(10); + } + + state = ms.State(); + state.Msgs.ShouldBe(0UL); + state.Bytes.ShouldBe(0UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreTimeStamps (T:2028) + // ----------------------------------------------------------------------- + + [Fact] // T:2028 + public async Task MemStoreTimeStamps_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreTimeStamps line 216 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var last = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var subj = "foo"; + var msg = Bytes("Hello World"); + + for (var i = 0; i < 10; i++) + { + await Task.Delay(5); // small delay to ensure distinct timestamps + ms.StoreMsg(subj, null, msg, 0); + } + + var smv = new StoreMsg(); + for (var seq = 1UL; seq <= 10UL; seq++) + { + var sm = ms.LoadMsg(seq, smv); + sm.ShouldNotBeNull(); + sm!.Ts.ShouldBeGreaterThan(last); + last = sm.Ts; + } + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStorePurge (T:2029) + // ----------------------------------------------------------------------- + + [Fact] // T:2029 + public void MemStorePurge_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStorePurge line 241 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 10; i++) + ms.StoreMsg(subj, null, msg, 0); + + ms.State().Msgs.ShouldBe(10UL); + + ms.Purge(); + + ms.State().Msgs.ShouldBe(0UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreCompact (T:2030) + // ----------------------------------------------------------------------- + + [Fact] // T:2030 + public void MemStoreCompact_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreCompact line 259 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 10; i++) + ms.StoreMsg(subj, null, msg, 0); + + ms.State().Msgs.ShouldBe(10UL); + + var (n, err) = ms.Compact(6); + err.ShouldBeNull(); + n.ShouldBe(5UL); + + var state = ms.State(); + state.Msgs.ShouldBe(5UL); + state.FirstSeq.ShouldBe(6UL); + + // Compact past end should purge + (n, err) = ms.Compact(100); + err.ShouldBeNull(); + n.ShouldBe(5UL); + + state = ms.State(); + state.FirstSeq.ShouldBe(100UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreEraseMsg (T:2031) + // ----------------------------------------------------------------------- + + [Fact] // T:2031 + public void MemStoreEraseMsg_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreEraseMsg line 298 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var msg = Bytes("Hello World"); + ms.StoreMsg(subj, null, msg, 0); + + var sm = ms.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Msg.ShouldBe(msg); + + var (removed, _) = ms.EraseMsg(1); + removed.ShouldBeTrue(); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreMsgHeaders (T:2032) + // ----------------------------------------------------------------------- + + [Fact] // T:2032 + public void MemStoreMsgHeaders_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMsgHeaders line 317 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var hdr = Bytes("name:derek"); + var msg = Bytes("Hello World"); + + // Verify computed size + var sz = (int)MsgSize(subj, hdr, msg); + sz.ShouldBe(subj.Length + hdr.Length + msg.Length + 16); + + ms.StoreMsg(subj, hdr, msg, 0); + + var sm = ms.LoadMsg(1, null); + sm.ShouldNotBeNull(); + sm!.Msg.ShouldBe(msg); + sm.Hdr.ShouldBe(hdr); + + var (removed, _) = ms.EraseMsg(1); + removed.ShouldBeTrue(); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreStreamStateDeleted (T:2033) + // ----------------------------------------------------------------------- + + [Fact] // T:2033 + public void MemStoreStreamStateDeleted_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreStreamStateDeleted line 342 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var subj = "foo"; + var toStore = 10UL; + for (var i = 1UL; i <= toStore; i++) + { + var m = Encoding.UTF8.GetBytes($"[{i:D8}] Hello World!"); + ms.StoreMsg(subj, null, m, 0); + } + + var state = ms.State(); + state.Deleted.ShouldBeNull(); + + // Remove some interior messages (even seqs: 2,4,6,8) + var expected = new List(); + for (var seq = 2UL; seq < toStore; seq += 2) + { + ms.RemoveMsg(seq); + expected.Add(seq); + } + + state = ms.State(); + state.Deleted.ShouldNotBeNull(); + state.Deleted!.ShouldBe(expected.ToArray()); + + // Now fill the gap by deleting 1 and 3 + ms.RemoveMsg(1); + ms.RemoveMsg(3); + expected.RemoveRange(0, 2); // remove 2 and 4 (the first two) + state = ms.State(); + state.Deleted!.ShouldBe(expected.ToArray()); + state.FirstSeq.ShouldBe(5UL); + + ms.Purge(); + state = ms.State(); + (state.Deleted?.Length ?? 0).ShouldBe(0); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreStreamTruncate (T:2034) + // ----------------------------------------------------------------------- + + [Fact] // T:2034 + public void MemStoreStreamTruncate_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreStreamTruncate line 385 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + var tseq = 50UL; + var toStore = 100UL; + + for (var i = 1UL; i < tseq; i++) + ms.StoreMsg("foo", null, Bytes("ok"), 0); + for (var i = tseq; i <= toStore; i++) + ms.StoreMsg("bar", null, Bytes("ok"), 0); + + ms.State().Msgs.ShouldBe(toStore); + + ms.Truncate(tseq); + ms.State().Msgs.ShouldBe(tseq); + + // Now test with interior deletes + ms.RemoveMsg(10); + ms.RemoveMsg(20); + ms.RemoveMsg(30); + ms.RemoveMsg(40); + + tseq = 25UL; + ms.Truncate(tseq); + + var state = ms.State(); + state.Msgs.ShouldBe(tseq - 2); // 10 and 20 deleted + state.NumSubjects.ShouldBe(1); + state.Deleted.ShouldBe(new ulong[] { 10, 20 }); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStorePurgeExWithSubject (T:2035) + // ----------------------------------------------------------------------- + + [Fact] // T:2035 + public void MemStorePurgeExWithSubject_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStorePurgeExWithSubject line 437 + var ms = NewMemStore(new StreamConfig { Storage = StorageType.MemoryStorage }); + + for (var i = 0; i < 100; i++) + ms.StoreMsg("foo", null, null, 0); + + // Purge all with subject "foo" + ms.PurgeEx("foo", 1, 0); + ms.State().Msgs.ShouldBe(0UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreUpdateMaxMsgsPerSubject (T:2036) + // ----------------------------------------------------------------------- + + [Fact] // T:2036 + public void MemStoreUpdateMaxMsgsPerSubject_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreUpdateMaxMsgsPerSubject line 452 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "foo" }, + MaxMsgsPer = 10, + }; + var ms = NewMemStore(cfg); + + // Increase limit + cfg.MaxMsgsPer = 50; + ms.UpdateConfig(cfg); + + var numStored = 22; + for (var i = 0; i < numStored; i++) + ms.StoreMsg("foo", null, null, 0); + + var ss = ms.SubjectsState("foo")["foo"]; + ss.Msgs.ShouldBe((ulong)numStored); + + // Decrease limit to 10 — should trim to 10 + cfg.MaxMsgsPer = 10; + ms.UpdateConfig(cfg); + + ss = ms.SubjectsState("foo")["foo"]; + ss.Msgs.ShouldBe(10UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreStreamTruncateReset (T:2037) + // ----------------------------------------------------------------------- + + [Fact] // T:2037 + public void MemStoreStreamTruncateReset_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreStreamTruncateReset line 490 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "foo" }, + }; + var ms = NewMemStore(cfg); + + var subj = "foo"; + var msg = Bytes("Hello World"); + for (var i = 0; i < 1000; i++) + ms.StoreMsg(subj, null, msg, 0); + + // Reset everything via Truncate(0) + ms.Truncate(0); + + var state = ms.State(); + state.Msgs.ShouldBe(0UL); + state.Bytes.ShouldBe(0UL); + state.FirstSeq.ShouldBe(0UL); + state.LastSeq.ShouldBe(0UL); + state.NumSubjects.ShouldBe(0); + state.NumDeleted.ShouldBe(0); + + // Store 1000 more after reset + for (var i = 0; i < 1000; i++) + ms.StoreMsg(subj, null, msg, 0); + + state = ms.State(); + state.Msgs.ShouldBe(1000UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(1000UL); + state.NumSubjects.ShouldBe(1); + state.NumDeleted.ShouldBe(0); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreStreamCompactMultiBlockSubjectInfo (T:2038) + // ----------------------------------------------------------------------- + + [Fact] // T:2038 + public void MemStoreStreamCompactMultiBlockSubjectInfo_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreStreamCompactMultiBlockSubjectInfo line 531 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "foo.*" }, + }; + var ms = NewMemStore(cfg); + + for (var i = 0; i < 1000; i++) + ms.StoreMsg($"foo.{i}", null, Bytes("Hello World"), 0); + + var (deleted, err) = ms.Compact(501); + err.ShouldBeNull(); + deleted.ShouldBe(500UL); + + var state = ms.State(); + state.NumSubjects.ShouldBe(500); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreSubjectsTotals (T:2039) + // ----------------------------------------------------------------------- + + [Fact] // T:2039 + public void MemStoreSubjectsTotals_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreSubjectsTotals line 557 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "*.*" }, + }; + var ms = NewMemStore(cfg); + + var rng = new Random(42); + var fmap = new Dictionary(); + var bmap = new Dictionary(); + + for (var i = 0; i < 10_000; i++) + { + string ft; + Dictionary m; + if (rng.Next(2) == 0) + { + ft = "foo"; m = fmap; + } + else + { + ft = "bar"; m = bmap; + } + var dt = rng.Next(100); + var subj = $"{ft}.{dt}"; + m[dt] = m.GetValueOrDefault(dt) + 1; + ms.StoreMsg(subj, null, Bytes("Hello World"), 0); + } + + // Check each individual subject in fmap + foreach (var (dt, total) in fmap) + { + var subj = $"foo.{dt}"; + var m = ms.SubjectsTotals(subj); + m[subj].ShouldBe((ulong)total); + } + + // Check fmap wildcard totals + var st = ms.SubjectsTotals("foo.*"); + st.Count.ShouldBe(fmap.Count); + var expectedTotal = fmap.Values.Sum(); + var receivedTotal = st.Values.Select(v => (long)v).Sum(); + receivedTotal.ShouldBe(expectedTotal); + + // Check bmap wildcard totals + st = ms.SubjectsTotals("bar.*"); + st.Count.ShouldBe(bmap.Count); + expectedTotal = bmap.Values.Sum(); + receivedTotal = st.Values.Select(v => (long)v).Sum(); + receivedTotal.ShouldBe(expectedTotal); + + // All with pwc match + st = ms.SubjectsTotals("*.*"); + st.Count.ShouldBe(bmap.Count + fmap.Count); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreNumPending (T:2040) + // ----------------------------------------------------------------------- + + [Fact] // T:2040 + public void MemStoreNumPending_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreNumPending line 637 + var cfg = new StreamConfig + { + Name = "TEST", + Storage = StorageType.MemoryStorage, + Subjects = new[] { "*.*.*.*" }, + }; + var ms = NewMemStore(cfg); + + var tokens = new[] { "foo", "bar", "baz" }; + var rng = new Random(99); + string GenSubj() => + $"{tokens[rng.Next(tokens.Length)]}.{tokens[rng.Next(tokens.Length)]}.{tokens[rng.Next(tokens.Length)]}.{tokens[rng.Next(tokens.Length)]}"; + + for (var i = 0; i < 5_000; i++) + { + var subj = GenSubj(); + ms.StoreMsg(subj, null, Bytes("Hello World"), 0); + } + + var state = ms.State(); + + // Scan one by one for sanity check + SimpleState SanityCheck(ulong sseq, string filter) + { + var ss = new SimpleState(); + var smv = new StoreMsg(); + if (sseq == 0) sseq = 1; + for (var seq = sseq; seq <= state.LastSeq; seq++) + { + try + { + var sm = ms.LoadMsg(seq, smv); + if (sm != null && SubjectIsSubsetMatch(sm.Subject, filter)) + { + ss.Msgs++; + ss.Last = seq; + if (ss.First == 0 || seq < ss.First) ss.First = seq; + } + } + catch { /* skip deleted */ } + } + return ss; + } + + void Check(ulong sseq, string filter) + { + var (np, lvs, err) = ms.NumPending(sseq, filter, false); + err.ShouldBeNull(); + lvs.ShouldBe(state.LastSeq); + var ss = ms.FilteredState(sseq, filter); + var sss = SanityCheck(sseq, filter); + ss.Msgs.ShouldBe(np); + ss.Msgs.ShouldBe(sss.Msgs); + } + + var startSeqs = new ulong[] { 0, 1, 2, 200, 444, 555, 2222 }; + var checkSubs = new[] { "foo.>", "*.bar.>", "foo.bar.*.baz" }; + + foreach (var filter in checkSubs) + foreach (var start in startSeqs) + Check(start, filter); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreInitialFirstSeq (T:2041) + // ----------------------------------------------------------------------- + + [Fact] // T:2041 + public void MemStoreInitialFirstSeq_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreInitialFirstSeq line 765 + var cfg = new StreamConfig + { + Name = "zzz", + Storage = StorageType.MemoryStorage, + FirstSeq = 1000, + }; + var ms = NewMemStore(cfg); + + var (seq, _, err) = (ms.StoreMsg("A", null, Bytes("OK"), 0).Seq, 0L, (Exception?)null); + seq.ShouldBe(1000UL); + + var (seq2, _) = ms.StoreMsg("B", null, Bytes("OK"), 0); + seq2.ShouldBe(1001UL); + + var state = new StreamState(); + ms.FastState(state); + state.Msgs.ShouldBe(2UL); + state.FirstSeq.ShouldBe(1000UL); + state.LastSeq.ShouldBe(1001UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreGetSeqFromTimeWithLastDeleted (T:2043) + // ----------------------------------------------------------------------- + + [Fact] // T:2043 + public async Task MemStoreGetSeqFromTimeWithLastDeleted_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreGetSeqFromTimeWithLastDeleted line 839 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "*" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var total = 100; + DateTime st = default; + for (var i = 1; i <= total; i++) + { + ms.StoreMsg("A", null, Bytes("OK"), 0); + if (i == total / 2) + { + await Task.Delay(50); + st = DateTime.UtcNow; + } + } + + // Delete last 10 + for (var seq = total - 10; seq <= total; seq++) + ms.RemoveMsg((ulong)seq); + + // Make sure this does not panic with last sequence no longer accessible + var gotSeq = ms.GetSeqFromTime(st); + // Should return around 51 (the halfway point) + gotSeq.ShouldBeGreaterThanOrEqualTo(49UL); + gotSeq.ShouldBeLessThanOrEqualTo(53UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreSkipMsgs (T:2044) + // ----------------------------------------------------------------------- + + [Fact] // T:2044 + public void MemStoreSkipMsgs_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreSkipMsgs line 871 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "*" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + // Wrong starting sequence must fail + Should.Throw(() => ms.SkipMsgs(10, 100)); + + ms.SkipMsgs(1, 100); + + var state = ms.State(); + state.FirstSeq.ShouldBe(101UL); + state.LastSeq.ShouldBe(100UL); + + // Now skip a large amount + ms.SkipMsgs(101, 100_000); + state = ms.State(); + state.FirstSeq.ShouldBe(100_101UL); + state.LastSeq.ShouldBe(100_100UL); + + // New store: add a message then skip + ms = NewMemStore(cfg); + ms.StoreMsg("foo", null, null, 0); + + ms.SkipMsgs(2, 10); + state = ms.State(); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(11UL); + state.Msgs.ShouldBe(1UL); + state.NumDeleted.ShouldBe(10); + state.Deleted!.Length.ShouldBe(10); + + // Check FastState + state.Deleted = null; + ms.FastState(state); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(11UL); + state.Msgs.ShouldBe(1UL); + state.NumDeleted.ShouldBe(10); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreMultiLastSeqs (T:2045) + // ----------------------------------------------------------------------- + + [Fact] // T:2045 + public void MemStoreMultiLastSeqs_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMultiLastSeqs line 923 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo.*", "bar.*" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var msg = Bytes("abc"); + for (var i = 0; i < 33; i++) + { + ms.StoreMsg("foo.foo", null, msg, 0); + ms.StoreMsg("foo.bar", null, msg, 0); + ms.StoreMsg("foo.baz", null, msg, 0); + } + for (var i = 0; i < 33; i++) + { + ms.StoreMsg("bar.foo", null, msg, 0); + ms.StoreMsg("bar.bar", null, msg, 0); + ms.StoreMsg("bar.baz", null, msg, 0); + } + + void CheckResults(ulong[] seqs, ulong[] expected) + { + seqs.Length.ShouldBe(expected.Length); + for (var i = 0; i < seqs.Length; i++) + seqs[i].ShouldBe(expected[i]); + } + + // UpTo sequence 3 + var (seqs, err) = ms.MultiLastSeqs(new[] { "foo.*" }, 3, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 1, 2, 3 }); + + // Up to last sequence of the stream + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.*" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 97, 98, 99 }); + + // Check for bar.* at the end + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.*" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 196, 197, 198 }); + + // This should find nothing + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.*" }, 99, -1); + err.ShouldBeNull(); + CheckResults(seqs!, Array.Empty()); + + // Explicit subjects + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.foo", "foo.bar", "foo.baz" }, 3, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 1, 2, 3 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.foo", "foo.bar", "foo.baz" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 97, 98, 99 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.foo", "bar.bar", "bar.baz" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 196, 197, 198 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.foo", "bar.bar", "bar.baz" }, 99, -1); + err.ShouldBeNull(); + CheckResults(seqs!, Array.Empty()); + + // Single subject + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.foo" }, 3, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 1 }); + + // De-duplicate + (seqs, err) = ms.MultiLastSeqs(new[] { "foo.*", "foo.bar" }, 3, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 1, 2, 3 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { "bar.>", "bar.bar", "bar.baz" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 196, 197, 198 }); + + // All + (seqs, err) = ms.MultiLastSeqs(new[] { ">" }, 0, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 97, 98, 99, 196, 197, 198 }); + + (seqs, err) = ms.MultiLastSeqs(new[] { ">" }, 99, -1); + err.ShouldBeNull(); + CheckResults(seqs!, new ulong[] { 97, 98, 99 }); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreMultiLastSeqsMaxAllowed (T:2046) + // ----------------------------------------------------------------------- + + [Fact] // T:2046 + public void MemStoreMultiLastSeqsMaxAllowed_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreMultiLastSeqsMaxAllowed line 1010 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo.*" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var msg = Bytes("abc"); + for (var i = 1; i <= 100; i++) + ms.StoreMsg($"foo.{i}", null, msg, 0); + + var (seqs, err) = ms.MultiLastSeqs(new[] { "foo.*" }, 0, 10); + seqs.ShouldBeNull(); + err.ShouldNotBeNull(); + err!.Message.ShouldContain("too many"); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStorePurgeExWithDeletedMsgs (T:2047) + // ----------------------------------------------------------------------- + + [Fact] // T:2047 + public void MemStorePurgeExWithDeletedMsgs_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStorePurgeExWithDeletedMsgs line 1031 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var msg = Bytes("abc"); + for (var i = 1; i <= 10; i++) + ms.StoreMsg("foo", null, msg, 0); + + ms.RemoveMsg(2); + ms.RemoveMsg(9); // This was the bug + + var (n, err) = ms.PurgeEx(string.Empty, 9, 0); + err.ShouldBeNull(); + n.ShouldBe(7UL); + + var state = new StreamState(); + ms.FastState(state); + state.FirstSeq.ShouldBe(10UL); + state.LastSeq.ShouldBe(10UL); + state.Msgs.ShouldBe(1UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreDeleteAllFirstSequenceCheck (T:2048) + // ----------------------------------------------------------------------- + + [Fact] // T:2048 + public void MemStoreDeleteAllFirstSequenceCheck_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreDeleteAllFirstSequenceCheck line 1060 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var msg = Bytes("abc"); + for (var i = 1; i <= 10; i++) + ms.StoreMsg("foo", null, msg, 0); + + for (var seq = 1UL; seq <= 10UL; seq++) + ms.RemoveMsg(seq); + + var state = new StreamState(); + ms.FastState(state); + state.FirstSeq.ShouldBe(11UL); + state.LastSeq.ShouldBe(10UL); + state.Msgs.ShouldBe(0UL); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreAllLastSeqs (T:2054) + // ----------------------------------------------------------------------- + + [Fact] // T:2054 + public void MemStoreAllLastSeqs_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreAllLastSeqs line 1266 + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "*.*" }, + MaxMsgsPer = 50, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var subjs = new[] { "foo.foo", "foo.bar", "foo.baz", "bar.foo", "bar.bar", "bar.baz" }; + var msg = Bytes("abc"); + var rng = new Random(7); + + for (var i = 0; i < 10_000; i++) + { + var subj = subjs[rng.Next(subjs.Length)]; + ms.StoreMsg(subj, null, msg, 0); + } + + var expected = new List(); + var smv = new StoreMsg(); + foreach (var subj in subjs) + { + var sm = ms.LoadLastMsg(subj, smv); + sm.ShouldNotBeNull(); + expected.Add(sm!.Seq); + } + expected.Sort(); + + var (seqs, err) = ms.AllLastSeqs(); + err.ShouldBeNull(); + seqs!.SequenceEqual(expected).ShouldBeTrue(); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // TestMemStoreSubjectForSeq (T:2056) + // ----------------------------------------------------------------------- + + [Fact] // T:2056 + public void MemStoreSubjectForSeq_ShouldSucceed() + { + // Reference: golang/nats-server/server/memstore_test.go:TestMemStoreSubjectForSeq line 1319 + var cfg = new StreamConfig + { + Name = "foo", + Subjects = new[] { "foo.>" }, + Storage = StorageType.MemoryStorage, + }; + var ms = NewMemStore(cfg); + + var (seq, _) = ms.StoreMsg("foo.bar", null, null, 0); + seq.ShouldBe(1UL); + + var (subj, err) = ms.SubjectForSeq(0); + err.ShouldNotBeNull(); + + (subj, err) = ms.SubjectForSeq(1); + err.ShouldBeNull(); + subj.ShouldBe("foo.bar"); + + (subj, err) = ms.SubjectForSeq(2); + err.ShouldNotBeNull(); + + ms.Stop(); + } + + // ----------------------------------------------------------------------- + // Private helper: subject subset matching (mirrors Go subjectIsSubsetMatch) + // ----------------------------------------------------------------------- + + private static bool SubjectIsSubsetMatch(string subject, string filter) + { + var sParts = subject.Split('.'); + var fParts = filter.Split('.'); + var si = 0; + for (var fi = 0; fi < fParts.Length; fi++) + { + if (fParts[fi] == ">") + return si < sParts.Length; + if (si >= sParts.Length) + return false; + if (fParts[fi] != "*" && fParts[fi] != sParts[si]) + return false; + si++; + } + return si == sParts.Length; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs new file mode 100644 index 0000000..4198b4c --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs @@ -0,0 +1,575 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Mirrors server/store_test.go (MemStore permutation only; file store permutations deferred). + +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +/// +/// Unit tests for IStreamStore contract, exercised against JetStreamMemStore. +/// Mirrors server/store_test.go (memory permutations only). +/// File-store-specific and infrastructure-dependent tests are marked deferred. +/// +public class StorageEngineTests +{ + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private static JetStreamMemStore NewMemStore(StreamConfig cfg) + { + cfg.Storage = StorageType.MemoryStorage; + return new JetStreamMemStore(cfg); + } + + private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s); + + // ----------------------------------------------------------------------- + // TestStoreDeleteSlice (T:2943) + // ----------------------------------------------------------------------- + + [Fact] // T:2943 + public void StoreDeleteSlice_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreDeleteSlice line 147 + var ds = new DeleteSlice(new ulong[] { 2 }); + var deletes = new List(); + ds.Range(seq => { deletes.Add(seq); return true; }); + deletes.Count.ShouldBe(1); + deletes[0].ShouldBe(2UL); + + var (first, last, num) = ds.GetState(); + first.ShouldBe(2UL); + last.ShouldBe(2UL); + num.ShouldBe(1UL); + } + + // ----------------------------------------------------------------------- + // TestStoreDeleteRange (T:2944) + // ----------------------------------------------------------------------- + + [Fact] // T:2944 + public void StoreDeleteRange_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreDeleteRange line 163 + var dr = new DeleteRange { First = 2, Num = 1 }; + var deletes = new List(); + dr.Range(seq => { deletes.Add(seq); return true; }); + deletes.Count.ShouldBe(1); + deletes[0].ShouldBe(2UL); + + var (first, last, num) = dr.GetState(); + first.ShouldBe(2UL); + last.ShouldBe(2UL); + num.ShouldBe(1UL); + } + + // ----------------------------------------------------------------------- + // TestStoreSubjectStateConsistency (T:2945) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2945 + public void StoreSubjectStateConsistency_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreSubjectStateConsistency line 179 + var fs = NewMemStore(new StreamConfig { Name = "TEST", Subjects = new[] { "foo" } }); + + SimpleState GetSubjectState() + { + var ss = fs.SubjectsState("foo"); + ss.TryGetValue("foo", out var result); + return result ?? new SimpleState(); + } + + var smp = new StoreMsg(); + + ulong ExpectFirstSeq() + { + var (sm, _, err) = fs.LoadNextMsg("foo", false, 0, smp).Sm?.Seq is ulong s + ? (smp, s, (Exception?)null) + : (null, 0UL, StoreErrors.ErrStoreMsgNotFound); + var (smr, skip) = fs.LoadNextMsg("foo", false, 0, smp); + smr.ShouldNotBeNull(); + return skip; + } + + ulong ExpectLastSeq() + { + var sm = fs.LoadLastMsg("foo", smp); + sm.ShouldNotBeNull(); + return sm!.Seq; + } + + // Publish 4 messages + for (var i = 0; i < 4; i++) + fs.StoreMsg("foo", null, null, 0); + + var ss = GetSubjectState(); + ss.Msgs.ShouldBe(4UL); + ss.First.ShouldBe(1UL); + ss.Last.ShouldBe(4UL); + + // Verify first/last via LoadNextMsg / LoadLastMsg + var (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(1UL); + var lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(4UL); + + // Remove first message + var (removed, _) = fs.RemoveMsg(1); + removed.ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(3UL); + ss.First.ShouldBe(2UL); + ss.Last.ShouldBe(4UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(2UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(4UL); + + // Remove last message + (removed, _) = fs.RemoveMsg(4); + removed.ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(2UL); + ss.First.ShouldBe(2UL); + ss.Last.ShouldBe(3UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(2UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(3UL); + + // Remove seq 2 + (removed, _) = fs.RemoveMsg(2); + removed.ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(1UL); + ss.First.ShouldBe(3UL); + ss.Last.ShouldBe(3UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(3UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(3UL); + + // Publish 3 more + for (var i = 0; i < 3; i++) + fs.StoreMsg("foo", null, null, 0); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(4UL); + ss.First.ShouldBe(3UL); + ss.Last.ShouldBe(7UL); + + // Remove seq 7 and seq 3 + (removed, _) = fs.RemoveMsg(7); + removed.ShouldBeTrue(); + (removed, _) = fs.RemoveMsg(3); + removed.ShouldBeTrue(); + + // Remove seq 5 (the now-first) + (removed, _) = fs.RemoveMsg(5); + removed.ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(1UL); + ss.First.ShouldBe(6UL); + ss.Last.ShouldBe(6UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(6UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(6UL); + + // Store + immediately remove seq 8, then store seq 9 + fs.StoreMsg("foo", null, null, 0); + (removed, _) = fs.RemoveMsg(8); + removed.ShouldBeTrue(); + fs.StoreMsg("foo", null, null, 0); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(2UL); + ss.First.ShouldBe(6UL); + ss.Last.ShouldBe(9UL); + + (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); + firstSm.ShouldNotBeNull(); + firstSeq.ShouldBe(6UL); + lastSm = fs.LoadLastMsg("foo", smp); + lastSm!.Seq.ShouldBe(9UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreMaxMsgsPerUpdateBug (T:2947) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2947 + public void StoreMaxMsgsPerUpdateBug_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreMaxMsgsPerUpdateBug line 405 + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = new[] { "foo" }, + MaxMsgsPer = 0, + }; + var fs = NewMemStore(cfg); + + for (var i = 0; i < 5; i++) + fs.StoreMsg("foo", null, null, 0); + + var ss = fs.State(); + ss.Msgs.ShouldBe(5UL); + ss.FirstSeq.ShouldBe(1UL); + ss.LastSeq.ShouldBe(5UL); + + // Update max messages per-subject from 0 (infinite) to 1 + cfg.MaxMsgsPer = 1; + fs.UpdateConfig(cfg); + + // Only one message should remain + ss = fs.State(); + ss.Msgs.ShouldBe(1UL); + ss.FirstSeq.ShouldBe(5UL); + ss.LastSeq.ShouldBe(5UL); + + // Update to invalid value (< -1) — should clamp to -1 + cfg.MaxMsgsPer = -2; + fs.UpdateConfig(cfg); + cfg.MaxMsgsPer.ShouldBe(-1L); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreCompactCleansUpDmap (T:2948) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2948 + public void StoreCompactCleansUpDmap_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreCompactCleansUpDmap line 449 + // We run for compact sequences 2, 3, 4 + for (var cseq = 2UL; cseq <= 4UL; cseq++) + { + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = new[] { "foo" }, + MaxMsgsPer = 0, + }; + var fs = NewMemStore(cfg); + + // Publish 3 messages; no interior deletes + for (var i = 0; i < 3; i++) + fs.StoreMsg("foo", null, null, 0); + + // Remove one message in the middle = interior delete + var (removed, _) = fs.RemoveMsg(2); + removed.ShouldBeTrue(); + + // The dmap should have 1 entry (seq 2) — verify via State().NumDeleted + var state = fs.State(); + state.NumDeleted.ShouldBe(1); + + // Compact — must clean up the interior delete + var (_, err) = fs.Compact(cseq); + err.ShouldBeNull(); + + // After compaction, no deleted entries in the range + state = fs.State(); + state.NumDeleted.ShouldBe(0); + + // Validate first/last sequence + var expectedFirst = Math.Max(3UL, cseq); + state.FirstSeq.ShouldBe(expectedFirst); + state.LastSeq.ShouldBe(3UL); + + fs.Stop(); + } + } + + // ----------------------------------------------------------------------- + // TestStoreTruncateCleansUpDmap (T:2949) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2949 + public void StoreTruncateCleansUpDmap_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreTruncateCleansUpDmap line 500 + // We run for truncate sequences 0 and 1 + for (var tseq = 0UL; tseq <= 1UL; tseq++) + { + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = new[] { "foo" }, + MaxMsgsPer = 0, + }; + var fs = NewMemStore(cfg); + + // Publish 3 messages + for (var i = 0; i < 3; i++) + fs.StoreMsg("foo", null, null, 0); + + // Remove middle message = interior delete + var (removed, _) = fs.RemoveMsg(2); + removed.ShouldBeTrue(); + + var state = fs.State(); + state.NumDeleted.ShouldBe(1); + + // Truncate + fs.Truncate(tseq); + + state = fs.State(); + state.NumDeleted.ShouldBe(0); + + // Validate first/last sequence + var expectedFirst = Math.Min(1UL, tseq); + state.FirstSeq.ShouldBe(expectedFirst); + state.LastSeq.ShouldBe(tseq); + + fs.Stop(); + } + } + + // ----------------------------------------------------------------------- + // TestStorePurgeExZero (T:2950) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2950 + public void StorePurgeExZero_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStorePurgeExZero line 552 + var fs = NewMemStore(new StreamConfig { Name = "TEST", Subjects = new[] { "foo" } }); + + // Simple purge all + var (_, err) = fs.Purge(); + err.ShouldBeNull(); + + var ss = fs.State(); + ss.FirstSeq.ShouldBe(1UL); + ss.LastSeq.ShouldBe(0UL); + + // PurgeEx(seq=0) must equal Purge() + (_, err) = fs.PurgeEx(string.Empty, 0, 0); + err.ShouldBeNull(); + + ss = fs.State(); + ss.FirstSeq.ShouldBe(1UL); + ss.LastSeq.ShouldBe(0UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreGetSeqFromTimeWithInteriorDeletesGap (T:2955) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2955 + public void StoreGetSeqFromTimeWithInteriorDeletesGap_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreGetSeqFromTimeWithInteriorDeletesGap line 874 + // Go: start = ts from StoreMsg at i==1; ts := time.Unix(0, start).UTC() + // .NET: convert the 100-ns store timestamp directly to DateTime (same precision). + var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } }); + + long start = 0; + for (var i = 0; i < 10; i++) + { + var (_, ts) = fs.StoreMsg("foo", null, null, 0); + if (i == 1) + start = ts; // exact timestamp of seq 2 + } + + // Create a delete gap at seqs 4-7 + for (var seq = 4UL; seq <= 7UL; seq++) + fs.RemoveMsg(seq); + + // Convert 100-ns-since-epoch to DateTime (mirrors Go's time.Unix(0, start)) + const long UnixEpochTicks = 621355968000000000L; + var t = new DateTime(start / 100L + UnixEpochTicks, DateTimeKind.Utc); + + var gotSeq = fs.GetSeqFromTime(t); + gotSeq.ShouldBe(2UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreGetSeqFromTimeWithTrailingDeletes (T:2956) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2956 + public void StoreGetSeqFromTimeWithTrailingDeletes_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreGetSeqFromTimeWithTrailingDeletes line 900 + // Go: start = ts from StoreMsg at i==1; ts := time.Unix(0, start).UTC() + // .NET: convert the 100-ns store timestamp directly to DateTime (same precision). + var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } }); + + long start = 0; + for (var i = 0; i < 3; i++) + { + var (_, ts) = fs.StoreMsg("foo", null, null, 0); + if (i == 1) + start = ts; // exact timestamp of seq 2 + } + + fs.RemoveMsg(3); + + // Convert 100-ns-since-epoch to DateTime (mirrors Go's time.Unix(0, start)) + const long UnixEpochTicks = 621355968000000000L; + var t = new DateTime(start / 100L + UnixEpochTicks, DateTimeKind.Utc); + + var gotSeq = fs.GetSeqFromTime(t); + gotSeq.ShouldBe(2UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState (T:2957) + // MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2957 + public void FileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState line 921 + var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } }); + + for (var i = 0; i < 3; i++) + fs.StoreMsg("foo", null, null, 0); + + var (seqs, err) = fs.MultiLastSeqs(new[] { "foo" }, 0, 0); + err.ShouldBeNull(); + seqs!.Length.ShouldBe(1); + seqs![0].ShouldBe(3UL); + + var (removed, _) = fs.RemoveMsg(3); + removed.ShouldBeTrue(); + + (seqs, err) = fs.MultiLastSeqs(new[] { "foo" }, 0, 0); + err.ShouldBeNull(); + seqs!.Length.ShouldBe(1); + seqs![0].ShouldBe(2UL); + + fs.StoreMsg("foo", null, null, 0); + var sm = fs.LoadLastMsg("foo", null); + sm.ShouldNotBeNull(); + sm!.Seq.ShouldBe(4UL); + + (removed, _) = fs.RemoveMsg(4); + removed.ShouldBeTrue(); + + sm = fs.LoadLastMsg("foo", null); + sm.ShouldNotBeNull(); + sm!.Seq.ShouldBe(2UL); + + fs.Stop(); + } + + // ----------------------------------------------------------------------- + // TestStoreDiscardNew (T:2954) — MemStore permutation only + // ----------------------------------------------------------------------- + + [Fact] // T:2954 + public void StoreDiscardNew_ShouldSucceed() + { + // Reference: golang/nats-server/server/store_test.go:TestStoreDiscardNew line 788 + // Helper that runs the discard-new test for a given config modifier + void Test(Action updateConfig, Exception? expectedErr) + { + var cfg = new StreamConfig + { + Name = "zzz", + Subjects = new[] { "foo" }, + Discard = DiscardPolicy.DiscardNew, + }; + updateConfig(cfg); + cfg.Storage = StorageType.MemoryStorage; + var fs = new JetStreamMemStore(cfg); + + var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + var expectedSeq = 1UL; + + void RequireState() + { + var state = fs.State(); + state.Msgs.ShouldBe(1UL); + state.FirstSeq.ShouldBe(expectedSeq); + state.LastSeq.ShouldBe(expectedSeq); + } + + fs.StoreMsg("foo", null, null, 0); + + // StoreRawMsg with discardNewCheck=true + if (expectedErr == null) + { + fs.StoreRawMsg("foo", null, null, 0, ts, 0, true); + expectedSeq++; + } + else + { + Should.Throw(() => fs.StoreRawMsg("foo", null, null, 0, ts, 0, true)); + } + RequireState(); + + // StoreRawMsg with discardNewCheck=false (followers must always accept) + fs.StoreRawMsg("foo", null, null, 0, ts, 0, false); + expectedSeq++; + + // For MaxMsgsPer we stay at 1 msg; otherwise 2 msgs + if (cfg.MaxMsgsPer > 0) + { + RequireState(); + } + else + { + var state = fs.State(); + state.Msgs.ShouldBe(2UL); + state.FirstSeq.ShouldBe(expectedSeq - 1); + state.LastSeq.ShouldBe(expectedSeq); + } + + fs.Stop(); + } + + Test(cfg => cfg.MaxMsgs = 1, StoreErrors.ErrMaxMsgs); + Test(cfg => cfg.MaxBytes = 33, StoreErrors.ErrMaxBytes); + Test(cfg => cfg.MaxMsgsPer = 1, null); + Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject); + Test(cfg => { cfg.MaxMsgs = 1; cfg.MaxMsgsPer = 1; }, null); + Test(cfg => { cfg.MaxBytes = 33; cfg.MaxMsgsPer = 1; }, null); + Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxMsgs = 1; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject); + Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxBytes = 33; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject); + } +} diff --git a/porting.db b/porting.db index 70f16ac..d3fe906 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index bf4d42e..a891540 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-27 00:15:57 UTC +Generated: 2026-02-27 00:35:59 UTC ## Modules (12 total) @@ -21,10 +21,10 @@ Generated: 2026-02-27 00:15:57 UTC | Status | Count | |--------|-------| -| complete | 214 | -| deferred | 215 | +| complete | 252 | +| deferred | 235 | | n_a | 187 | -| not_started | 2527 | +| not_started | 2469 | | verified | 114 | ## Library Mappings (36 total) @@ -36,4 +36,4 @@ Generated: 2026-02-27 00:15:57 UTC ## Overall Progress -**4199/6942 items complete (60.5%)** +**4237/6942 items complete (61.0%)** diff --git a/reports/report_917cd33.md b/reports/report_917cd33.md new file mode 100644 index 0000000..a891540 --- /dev/null +++ b/reports/report_917cd33.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-27 00:35:59 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| not_started | 1 | +| verified | 11 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 3368 | +| n_a | 26 | +| verified | 279 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 252 | +| deferred | 235 | +| n_a | 187 | +| not_started | 2469 | +| verified | 114 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**4237/6942 items complete (61.0%)**