// Copyright 2012-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // Mirrors server/store_test.go (MemStore permutation only; file store permutations deferred). using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; /// /// Unit tests for IStreamStore contract, exercised against JetStreamMemStore. /// Mirrors server/store_test.go (memory permutations only). /// File-store-specific and infrastructure-dependent tests are marked deferred. /// public class StorageEngineTests { // ----------------------------------------------------------------------- // Helpers // ----------------------------------------------------------------------- private static JetStreamMemStore NewMemStore(StreamConfig cfg) { cfg.Storage = StorageType.MemoryStorage; return JetStreamMemStore.NewMemStore(cfg); } private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s); // ----------------------------------------------------------------------- // TestStoreDeleteSlice (T:2943) // ----------------------------------------------------------------------- [Fact] // T:2943 public void StoreDeleteSlice_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStoreDeleteSlice line 147 var ds = new DeleteSlice(new ulong[] { 2 }); var deletes = new List(); ds.Range(seq => { deletes.Add(seq); return true; }); deletes.Count.ShouldBe(1); deletes[0].ShouldBe(2UL); var (first, last, num) = ds.GetState(); first.ShouldBe(2UL); last.ShouldBe(2UL); num.ShouldBe(1UL); } // ----------------------------------------------------------------------- // TestStoreDeleteRange (T:2944) // ----------------------------------------------------------------------- [Fact] // T:2944 public void StoreDeleteRange_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStoreDeleteRange line 163 var dr = new DeleteRange { First = 2, Num = 1 }; var deletes = new List(); dr.Range(seq => { deletes.Add(seq); return true; }); deletes.Count.ShouldBe(1); deletes[0].ShouldBe(2UL); var (first, last, num) = dr.GetState(); first.ShouldBe(2UL); last.ShouldBe(2UL); num.ShouldBe(1UL); } // ----------------------------------------------------------------------- // TestStoreSubjectStateConsistency (T:2945) — MemStore permutation only // ----------------------------------------------------------------------- [Fact] // T:2945 public void StoreSubjectStateConsistency_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStoreSubjectStateConsistency line 179 var fs = NewMemStore(new StreamConfig { Name = "TEST", Subjects = new[] { "foo" } }); SimpleState GetSubjectState() { var ss = fs.SubjectsState("foo"); ss.TryGetValue("foo", out var result); return result ?? new SimpleState(); } var smp = new StoreMsg(); ulong ExpectFirstSeq() { var (sm, _, err) = fs.LoadNextMsg("foo", false, 0, smp).Sm?.Seq is ulong s ? (smp, s, (Exception?)null) : (null, 0UL, StoreErrors.ErrStoreMsgNotFound); var (smr, skip) = fs.LoadNextMsg("foo", false, 0, smp); smr.ShouldNotBeNull(); return skip; } ulong ExpectLastSeq() { var sm = fs.LoadLastMsg("foo", smp); sm.ShouldNotBeNull(); return sm!.Seq; } // Publish 4 messages for (var i = 0; i < 4; i++) fs.StoreMsg("foo", null, null, 0); var ss = GetSubjectState(); ss.Msgs.ShouldBe(4UL); ss.First.ShouldBe(1UL); ss.Last.ShouldBe(4UL); // Verify first/last via LoadNextMsg / LoadLastMsg var (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); firstSm.ShouldNotBeNull(); firstSeq.ShouldBe(1UL); var lastSm = fs.LoadLastMsg("foo", smp); lastSm!.Seq.ShouldBe(4UL); // Remove first message var (removed, _) = fs.RemoveMsg(1); removed.ShouldBeTrue(); ss = GetSubjectState(); ss.Msgs.ShouldBe(3UL); ss.First.ShouldBe(2UL); ss.Last.ShouldBe(4UL); (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); firstSm.ShouldNotBeNull(); firstSeq.ShouldBe(2UL); lastSm = fs.LoadLastMsg("foo", smp); lastSm!.Seq.ShouldBe(4UL); // Remove last message (removed, _) = fs.RemoveMsg(4); removed.ShouldBeTrue(); ss = GetSubjectState(); ss.Msgs.ShouldBe(2UL); ss.First.ShouldBe(2UL); ss.Last.ShouldBe(3UL); (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); firstSm.ShouldNotBeNull(); firstSeq.ShouldBe(2UL); lastSm = fs.LoadLastMsg("foo", smp); lastSm!.Seq.ShouldBe(3UL); // Remove seq 2 (removed, _) = fs.RemoveMsg(2); removed.ShouldBeTrue(); ss = GetSubjectState(); ss.Msgs.ShouldBe(1UL); ss.First.ShouldBe(3UL); ss.Last.ShouldBe(3UL); (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); firstSm.ShouldNotBeNull(); firstSeq.ShouldBe(3UL); lastSm = fs.LoadLastMsg("foo", smp); lastSm!.Seq.ShouldBe(3UL); // Publish 3 more for (var i = 0; i < 3; i++) fs.StoreMsg("foo", null, null, 0); ss = GetSubjectState(); ss.Msgs.ShouldBe(4UL); ss.First.ShouldBe(3UL); ss.Last.ShouldBe(7UL); // Remove seq 7 and seq 3 (removed, _) = fs.RemoveMsg(7); removed.ShouldBeTrue(); (removed, _) = fs.RemoveMsg(3); removed.ShouldBeTrue(); // Remove seq 5 (the now-first) (removed, _) = fs.RemoveMsg(5); removed.ShouldBeTrue(); ss = GetSubjectState(); ss.Msgs.ShouldBe(1UL); ss.First.ShouldBe(6UL); ss.Last.ShouldBe(6UL); (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); firstSm.ShouldNotBeNull(); firstSeq.ShouldBe(6UL); lastSm = fs.LoadLastMsg("foo", smp); lastSm!.Seq.ShouldBe(6UL); // Store + immediately remove seq 8, then store seq 9 fs.StoreMsg("foo", null, null, 0); (removed, _) = fs.RemoveMsg(8); removed.ShouldBeTrue(); fs.StoreMsg("foo", null, null, 0); ss = GetSubjectState(); ss.Msgs.ShouldBe(2UL); ss.First.ShouldBe(6UL); ss.Last.ShouldBe(9UL); (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp); firstSm.ShouldNotBeNull(); firstSeq.ShouldBe(6UL); lastSm = fs.LoadLastMsg("foo", smp); lastSm!.Seq.ShouldBe(9UL); fs.Stop(); } // ----------------------------------------------------------------------- // TestStoreMaxMsgsPerUpdateBug (T:2947) — MemStore permutation only // ----------------------------------------------------------------------- [Fact] // T:2947 public void StoreMaxMsgsPerUpdateBug_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStoreMaxMsgsPerUpdateBug line 405 var cfg = new StreamConfig { Name = "TEST", Subjects = new[] { "foo" }, MaxMsgsPer = 0, }; var fs = NewMemStore(cfg); for (var i = 0; i < 5; i++) fs.StoreMsg("foo", null, null, 0); var ss = fs.State(); ss.Msgs.ShouldBe(5UL); ss.FirstSeq.ShouldBe(1UL); ss.LastSeq.ShouldBe(5UL); // Update max messages per-subject from 0 (infinite) to 1 cfg.MaxMsgsPer = 1; fs.UpdateConfig(cfg); // Only one message should remain ss = fs.State(); ss.Msgs.ShouldBe(1UL); ss.FirstSeq.ShouldBe(5UL); ss.LastSeq.ShouldBe(5UL); // Update to invalid value (< -1) — should clamp to -1 cfg.MaxMsgsPer = -2; fs.UpdateConfig(cfg); cfg.MaxMsgsPer.ShouldBe(-1L); fs.Stop(); } // ----------------------------------------------------------------------- // TestStoreCompactCleansUpDmap (T:2948) — MemStore permutation only // ----------------------------------------------------------------------- [Fact] // T:2948 public void StoreCompactCleansUpDmap_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStoreCompactCleansUpDmap line 449 // We run for compact sequences 2, 3, 4 for (var cseq = 2UL; cseq <= 4UL; cseq++) { var cfg = new StreamConfig { Name = "TEST", Subjects = new[] { "foo" }, MaxMsgsPer = 0, }; var fs = NewMemStore(cfg); // Publish 3 messages; no interior deletes for (var i = 0; i < 3; i++) fs.StoreMsg("foo", null, null, 0); // Remove one message in the middle = interior delete var (removed, _) = fs.RemoveMsg(2); removed.ShouldBeTrue(); // The dmap should have 1 entry (seq 2) — verify via State().NumDeleted var state = fs.State(); state.NumDeleted.ShouldBe(1); // Compact — must clean up the interior delete var (_, err) = fs.Compact(cseq); err.ShouldBeNull(); // After compaction, no deleted entries in the range state = fs.State(); state.NumDeleted.ShouldBe(0); // Validate first/last sequence var expectedFirst = Math.Max(3UL, cseq); state.FirstSeq.ShouldBe(expectedFirst); state.LastSeq.ShouldBe(3UL); fs.Stop(); } } // ----------------------------------------------------------------------- // TestStoreTruncateCleansUpDmap (T:2949) — MemStore permutation only // ----------------------------------------------------------------------- [Fact] // T:2949 public void StoreTruncateCleansUpDmap_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStoreTruncateCleansUpDmap line 500 // We run for truncate sequences 0 and 1 for (var tseq = 0UL; tseq <= 1UL; tseq++) { var cfg = new StreamConfig { Name = "TEST", Subjects = new[] { "foo" }, MaxMsgsPer = 0, }; var fs = NewMemStore(cfg); // Publish 3 messages for (var i = 0; i < 3; i++) fs.StoreMsg("foo", null, null, 0); // Remove middle message = interior delete var (removed, _) = fs.RemoveMsg(2); removed.ShouldBeTrue(); var state = fs.State(); state.NumDeleted.ShouldBe(1); // Truncate fs.Truncate(tseq); state = fs.State(); state.NumDeleted.ShouldBe(0); // Validate first/last sequence var expectedFirst = Math.Min(1UL, tseq); state.FirstSeq.ShouldBe(expectedFirst); state.LastSeq.ShouldBe(tseq); fs.Stop(); } } // ----------------------------------------------------------------------- // TestStorePurgeExZero (T:2950) — MemStore permutation only // ----------------------------------------------------------------------- [Fact] // T:2950 public void StorePurgeExZero_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStorePurgeExZero line 552 var fs = NewMemStore(new StreamConfig { Name = "TEST", Subjects = new[] { "foo" } }); // Simple purge all var (_, err) = fs.Purge(); err.ShouldBeNull(); var ss = fs.State(); ss.FirstSeq.ShouldBe(1UL); ss.LastSeq.ShouldBe(0UL); // PurgeEx(seq=0) must equal Purge() (_, err) = fs.PurgeEx(string.Empty, 0, 0); err.ShouldBeNull(); ss = fs.State(); ss.FirstSeq.ShouldBe(1UL); ss.LastSeq.ShouldBe(0UL); fs.Stop(); } // ----------------------------------------------------------------------- // TestStoreGetSeqFromTimeWithInteriorDeletesGap (T:2955) — MemStore permutation only // ----------------------------------------------------------------------- [Fact] // T:2955 public void StoreGetSeqFromTimeWithInteriorDeletesGap_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStoreGetSeqFromTimeWithInteriorDeletesGap line 874 // Go: start = ts from StoreMsg at i==1; ts := time.Unix(0, start).UTC() // .NET: convert the 100-ns store timestamp directly to DateTime (same precision). var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } }); long start = 0; for (var i = 0; i < 10; i++) { var (_, ts) = fs.StoreMsg("foo", null, null, 0); if (i == 1) start = ts; // exact timestamp of seq 2 } // Create a delete gap at seqs 4-7 for (var seq = 4UL; seq <= 7UL; seq++) fs.RemoveMsg(seq); // Convert 100-ns-since-epoch to DateTime (mirrors Go's time.Unix(0, start)) const long UnixEpochTicks = 621355968000000000L; var t = new DateTime(start / 100L + UnixEpochTicks, DateTimeKind.Utc); var gotSeq = fs.GetSeqFromTime(t); gotSeq.ShouldBe(2UL); fs.Stop(); } // ----------------------------------------------------------------------- // TestStoreGetSeqFromTimeWithTrailingDeletes (T:2956) — MemStore permutation only // ----------------------------------------------------------------------- [Fact] // T:2956 public void StoreGetSeqFromTimeWithTrailingDeletes_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStoreGetSeqFromTimeWithTrailingDeletes line 900 // Go: start = ts from StoreMsg at i==1; ts := time.Unix(0, start).UTC() // .NET: convert the 100-ns store timestamp directly to DateTime (same precision). var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } }); long start = 0; for (var i = 0; i < 3; i++) { var (_, ts) = fs.StoreMsg("foo", null, null, 0); if (i == 1) start = ts; // exact timestamp of seq 2 } fs.RemoveMsg(3); // Convert 100-ns-since-epoch to DateTime (mirrors Go's time.Unix(0, start)) const long UnixEpochTicks = 621355968000000000L; var t = new DateTime(start / 100L + UnixEpochTicks, DateTimeKind.Utc); var gotSeq = fs.GetSeqFromTime(t); gotSeq.ShouldBe(2UL); fs.Stop(); } // ----------------------------------------------------------------------- // TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState (T:2957) // MemStore permutation only // ----------------------------------------------------------------------- [Fact] // T:2957 public void FileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState line 921 var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } }); for (var i = 0; i < 3; i++) fs.StoreMsg("foo", null, null, 0); var (seqs, err) = fs.MultiLastSeqs(new[] { "foo" }, 0, 0); err.ShouldBeNull(); seqs!.Length.ShouldBe(1); seqs![0].ShouldBe(3UL); var (removed, _) = fs.RemoveMsg(3); removed.ShouldBeTrue(); (seqs, err) = fs.MultiLastSeqs(new[] { "foo" }, 0, 0); err.ShouldBeNull(); seqs!.Length.ShouldBe(1); seqs![0].ShouldBe(2UL); fs.StoreMsg("foo", null, null, 0); var sm = fs.LoadLastMsg("foo", null); sm.ShouldNotBeNull(); sm!.Seq.ShouldBe(4UL); (removed, _) = fs.RemoveMsg(4); removed.ShouldBeTrue(); sm = fs.LoadLastMsg("foo", null); sm.ShouldNotBeNull(); sm!.Seq.ShouldBe(2UL); fs.Stop(); } // ----------------------------------------------------------------------- // TestStoreDiscardNew (T:2954) — MemStore permutation only // ----------------------------------------------------------------------- [Fact] // T:2954 public void StoreDiscardNew_ShouldSucceed() { // Reference: golang/nats-server/server/store_test.go:TestStoreDiscardNew line 788 // Helper that runs the discard-new test for a given config modifier void Test(Action updateConfig, Exception? expectedErr) { var cfg = new StreamConfig { Name = "zzz", Subjects = new[] { "foo" }, Discard = DiscardPolicy.DiscardNew, }; updateConfig(cfg); cfg.Storage = StorageType.MemoryStorage; var fs = new JetStreamMemStore(cfg); var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; var expectedSeq = 1UL; void RequireState() { var state = fs.State(); state.Msgs.ShouldBe(1UL); state.FirstSeq.ShouldBe(expectedSeq); state.LastSeq.ShouldBe(expectedSeq); } fs.StoreMsg("foo", null, null, 0); // StoreRawMsg with discardNewCheck=true if (expectedErr == null) { fs.StoreRawMsg("foo", null, null, 0, ts, 0, true); expectedSeq++; } else { Should.Throw(() => fs.StoreRawMsg("foo", null, null, 0, ts, 0, true)); } RequireState(); // StoreRawMsg with discardNewCheck=false (followers must always accept) fs.StoreRawMsg("foo", null, null, 0, ts, 0, false); expectedSeq++; // For MaxMsgsPer we stay at 1 msg; otherwise 2 msgs if (cfg.MaxMsgsPer > 0) { RequireState(); } else { var state = fs.State(); state.Msgs.ShouldBe(2UL); state.FirstSeq.ShouldBe(expectedSeq - 1); state.LastSeq.ShouldBe(expectedSeq); } fs.Stop(); } Test(cfg => cfg.MaxMsgs = 1, StoreErrors.ErrMaxMsgs); Test(cfg => cfg.MaxBytes = 33, StoreErrors.ErrMaxBytes); Test(cfg => cfg.MaxMsgsPer = 1, null); Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject); Test(cfg => { cfg.MaxMsgs = 1; cfg.MaxMsgsPer = 1; }, null); Test(cfg => { cfg.MaxBytes = 33; cfg.MaxMsgsPer = 1; }, null); Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxMsgs = 1; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject); Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxBytes = 33; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject); } }