// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Mirrors server/store_test.go (MemStore permutation only; file store permutations deferred).
using System.Text;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
///
/// Unit tests for IStreamStore contract, exercised against JetStreamMemStore.
/// Mirrors server/store_test.go (memory permutations only).
/// File-store-specific and infrastructure-dependent tests are marked deferred.
///
public class StorageEngineTests
{
// -----------------------------------------------------------------------
// Helpers
// -----------------------------------------------------------------------
private static JetStreamMemStore NewMemStore(StreamConfig cfg)
{
cfg.Storage = StorageType.MemoryStorage;
return JetStreamMemStore.NewMemStore(cfg);
}
private static byte[] Bytes(string s) => Encoding.UTF8.GetBytes(s);
// -----------------------------------------------------------------------
// TestStoreDeleteSlice (T:2943)
// -----------------------------------------------------------------------
[Fact] // T:2943
public void StoreDeleteSlice_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStoreDeleteSlice line 147
var ds = new DeleteSlice(new ulong[] { 2 });
var deletes = new List();
ds.Range(seq => { deletes.Add(seq); return true; });
deletes.Count.ShouldBe(1);
deletes[0].ShouldBe(2UL);
var (first, last, num) = ds.GetState();
first.ShouldBe(2UL);
last.ShouldBe(2UL);
num.ShouldBe(1UL);
}
// -----------------------------------------------------------------------
// TestStoreDeleteRange (T:2944)
// -----------------------------------------------------------------------
[Fact] // T:2944
public void StoreDeleteRange_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStoreDeleteRange line 163
var dr = new DeleteRange { First = 2, Num = 1 };
var deletes = new List();
dr.Range(seq => { deletes.Add(seq); return true; });
deletes.Count.ShouldBe(1);
deletes[0].ShouldBe(2UL);
var (first, last, num) = dr.GetState();
first.ShouldBe(2UL);
last.ShouldBe(2UL);
num.ShouldBe(1UL);
}
// -----------------------------------------------------------------------
// TestStoreSubjectStateConsistency (T:2945) — MemStore permutation only
// -----------------------------------------------------------------------
[Fact] // T:2945
public void StoreSubjectStateConsistency_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStoreSubjectStateConsistency line 179
var fs = NewMemStore(new StreamConfig { Name = "TEST", Subjects = new[] { "foo" } });
SimpleState GetSubjectState()
{
var ss = fs.SubjectsState("foo");
ss.TryGetValue("foo", out var result);
return result ?? new SimpleState();
}
var smp = new StoreMsg();
ulong ExpectFirstSeq()
{
var (sm, _, err) = fs.LoadNextMsg("foo", false, 0, smp).Sm?.Seq is ulong s
? (smp, s, (Exception?)null)
: (null, 0UL, StoreErrors.ErrStoreMsgNotFound);
var (smr, skip) = fs.LoadNextMsg("foo", false, 0, smp);
smr.ShouldNotBeNull();
return skip;
}
ulong ExpectLastSeq()
{
var sm = fs.LoadLastMsg("foo", smp);
sm.ShouldNotBeNull();
return sm!.Seq;
}
// Publish 4 messages
for (var i = 0; i < 4; i++)
fs.StoreMsg("foo", null, null, 0);
var ss = GetSubjectState();
ss.Msgs.ShouldBe(4UL);
ss.First.ShouldBe(1UL);
ss.Last.ShouldBe(4UL);
// Verify first/last via LoadNextMsg / LoadLastMsg
var (firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp);
firstSm.ShouldNotBeNull();
firstSeq.ShouldBe(1UL);
var lastSm = fs.LoadLastMsg("foo", smp);
lastSm!.Seq.ShouldBe(4UL);
// Remove first message
var (removed, _) = fs.RemoveMsg(1);
removed.ShouldBeTrue();
ss = GetSubjectState();
ss.Msgs.ShouldBe(3UL);
ss.First.ShouldBe(2UL);
ss.Last.ShouldBe(4UL);
(firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp);
firstSm.ShouldNotBeNull();
firstSeq.ShouldBe(2UL);
lastSm = fs.LoadLastMsg("foo", smp);
lastSm!.Seq.ShouldBe(4UL);
// Remove last message
(removed, _) = fs.RemoveMsg(4);
removed.ShouldBeTrue();
ss = GetSubjectState();
ss.Msgs.ShouldBe(2UL);
ss.First.ShouldBe(2UL);
ss.Last.ShouldBe(3UL);
(firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp);
firstSm.ShouldNotBeNull();
firstSeq.ShouldBe(2UL);
lastSm = fs.LoadLastMsg("foo", smp);
lastSm!.Seq.ShouldBe(3UL);
// Remove seq 2
(removed, _) = fs.RemoveMsg(2);
removed.ShouldBeTrue();
ss = GetSubjectState();
ss.Msgs.ShouldBe(1UL);
ss.First.ShouldBe(3UL);
ss.Last.ShouldBe(3UL);
(firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp);
firstSm.ShouldNotBeNull();
firstSeq.ShouldBe(3UL);
lastSm = fs.LoadLastMsg("foo", smp);
lastSm!.Seq.ShouldBe(3UL);
// Publish 3 more
for (var i = 0; i < 3; i++)
fs.StoreMsg("foo", null, null, 0);
ss = GetSubjectState();
ss.Msgs.ShouldBe(4UL);
ss.First.ShouldBe(3UL);
ss.Last.ShouldBe(7UL);
// Remove seq 7 and seq 3
(removed, _) = fs.RemoveMsg(7);
removed.ShouldBeTrue();
(removed, _) = fs.RemoveMsg(3);
removed.ShouldBeTrue();
// Remove seq 5 (the now-first)
(removed, _) = fs.RemoveMsg(5);
removed.ShouldBeTrue();
ss = GetSubjectState();
ss.Msgs.ShouldBe(1UL);
ss.First.ShouldBe(6UL);
ss.Last.ShouldBe(6UL);
(firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp);
firstSm.ShouldNotBeNull();
firstSeq.ShouldBe(6UL);
lastSm = fs.LoadLastMsg("foo", smp);
lastSm!.Seq.ShouldBe(6UL);
// Store + immediately remove seq 8, then store seq 9
fs.StoreMsg("foo", null, null, 0);
(removed, _) = fs.RemoveMsg(8);
removed.ShouldBeTrue();
fs.StoreMsg("foo", null, null, 0);
ss = GetSubjectState();
ss.Msgs.ShouldBe(2UL);
ss.First.ShouldBe(6UL);
ss.Last.ShouldBe(9UL);
(firstSm, firstSeq) = fs.LoadNextMsg("foo", false, 0, smp);
firstSm.ShouldNotBeNull();
firstSeq.ShouldBe(6UL);
lastSm = fs.LoadLastMsg("foo", smp);
lastSm!.Seq.ShouldBe(9UL);
fs.Stop();
}
// -----------------------------------------------------------------------
// TestStoreMaxMsgsPerUpdateBug (T:2947) — MemStore permutation only
// -----------------------------------------------------------------------
[Fact] // T:2947
public void StoreMaxMsgsPerUpdateBug_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStoreMaxMsgsPerUpdateBug line 405
var cfg = new StreamConfig
{
Name = "TEST",
Subjects = new[] { "foo" },
MaxMsgsPer = 0,
};
var fs = NewMemStore(cfg);
for (var i = 0; i < 5; i++)
fs.StoreMsg("foo", null, null, 0);
var ss = fs.State();
ss.Msgs.ShouldBe(5UL);
ss.FirstSeq.ShouldBe(1UL);
ss.LastSeq.ShouldBe(5UL);
// Update max messages per-subject from 0 (infinite) to 1
cfg.MaxMsgsPer = 1;
fs.UpdateConfig(cfg);
// Only one message should remain
ss = fs.State();
ss.Msgs.ShouldBe(1UL);
ss.FirstSeq.ShouldBe(5UL);
ss.LastSeq.ShouldBe(5UL);
// Update to invalid value (< -1) — should clamp to -1
cfg.MaxMsgsPer = -2;
fs.UpdateConfig(cfg);
cfg.MaxMsgsPer.ShouldBe(-1L);
fs.Stop();
}
// -----------------------------------------------------------------------
// TestStoreCompactCleansUpDmap (T:2948) — MemStore permutation only
// -----------------------------------------------------------------------
[Fact] // T:2948
public void StoreCompactCleansUpDmap_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStoreCompactCleansUpDmap line 449
// We run for compact sequences 2, 3, 4
for (var cseq = 2UL; cseq <= 4UL; cseq++)
{
var cfg = new StreamConfig
{
Name = "TEST",
Subjects = new[] { "foo" },
MaxMsgsPer = 0,
};
var fs = NewMemStore(cfg);
// Publish 3 messages; no interior deletes
for (var i = 0; i < 3; i++)
fs.StoreMsg("foo", null, null, 0);
// Remove one message in the middle = interior delete
var (removed, _) = fs.RemoveMsg(2);
removed.ShouldBeTrue();
// The dmap should have 1 entry (seq 2) — verify via State().NumDeleted
var state = fs.State();
state.NumDeleted.ShouldBe(1);
// Compact — must clean up the interior delete
var (_, err) = fs.Compact(cseq);
err.ShouldBeNull();
// After compaction, no deleted entries in the range
state = fs.State();
state.NumDeleted.ShouldBe(0);
// Validate first/last sequence
var expectedFirst = Math.Max(3UL, cseq);
state.FirstSeq.ShouldBe(expectedFirst);
state.LastSeq.ShouldBe(3UL);
fs.Stop();
}
}
// -----------------------------------------------------------------------
// TestStoreTruncateCleansUpDmap (T:2949) — MemStore permutation only
// -----------------------------------------------------------------------
[Fact] // T:2949
public void StoreTruncateCleansUpDmap_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStoreTruncateCleansUpDmap line 500
// We run for truncate sequences 0 and 1
for (var tseq = 0UL; tseq <= 1UL; tseq++)
{
var cfg = new StreamConfig
{
Name = "TEST",
Subjects = new[] { "foo" },
MaxMsgsPer = 0,
};
var fs = NewMemStore(cfg);
// Publish 3 messages
for (var i = 0; i < 3; i++)
fs.StoreMsg("foo", null, null, 0);
// Remove middle message = interior delete
var (removed, _) = fs.RemoveMsg(2);
removed.ShouldBeTrue();
var state = fs.State();
state.NumDeleted.ShouldBe(1);
// Truncate
fs.Truncate(tseq);
state = fs.State();
state.NumDeleted.ShouldBe(0);
// Validate first/last sequence
var expectedFirst = Math.Min(1UL, tseq);
state.FirstSeq.ShouldBe(expectedFirst);
state.LastSeq.ShouldBe(tseq);
fs.Stop();
}
}
// -----------------------------------------------------------------------
// TestStorePurgeExZero (T:2950) — MemStore permutation only
// -----------------------------------------------------------------------
[Fact] // T:2950
public void StorePurgeExZero_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStorePurgeExZero line 552
var fs = NewMemStore(new StreamConfig { Name = "TEST", Subjects = new[] { "foo" } });
// Simple purge all
var (_, err) = fs.Purge();
err.ShouldBeNull();
var ss = fs.State();
ss.FirstSeq.ShouldBe(1UL);
ss.LastSeq.ShouldBe(0UL);
// PurgeEx(seq=0) must equal Purge()
(_, err) = fs.PurgeEx(string.Empty, 0, 0);
err.ShouldBeNull();
ss = fs.State();
ss.FirstSeq.ShouldBe(1UL);
ss.LastSeq.ShouldBe(0UL);
fs.Stop();
}
// -----------------------------------------------------------------------
// TestStoreGetSeqFromTimeWithInteriorDeletesGap (T:2955) — MemStore permutation only
// -----------------------------------------------------------------------
[Fact] // T:2955
public void StoreGetSeqFromTimeWithInteriorDeletesGap_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStoreGetSeqFromTimeWithInteriorDeletesGap line 874
// Go: start = ts from StoreMsg at i==1; ts := time.Unix(0, start).UTC()
// .NET: convert the 100-ns store timestamp directly to DateTime (same precision).
var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } });
long start = 0;
for (var i = 0; i < 10; i++)
{
var (_, ts) = fs.StoreMsg("foo", null, null, 0);
if (i == 1)
start = ts; // exact timestamp of seq 2
}
// Create a delete gap at seqs 4-7
for (var seq = 4UL; seq <= 7UL; seq++)
fs.RemoveMsg(seq);
// Convert 100-ns-since-epoch to DateTime (mirrors Go's time.Unix(0, start))
const long UnixEpochTicks = 621355968000000000L;
var t = new DateTime(start / 100L + UnixEpochTicks, DateTimeKind.Utc);
var gotSeq = fs.GetSeqFromTime(t);
gotSeq.ShouldBe(2UL);
fs.Stop();
}
// -----------------------------------------------------------------------
// TestStoreGetSeqFromTimeWithTrailingDeletes (T:2956) — MemStore permutation only
// -----------------------------------------------------------------------
[Fact] // T:2956
public void StoreGetSeqFromTimeWithTrailingDeletes_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStoreGetSeqFromTimeWithTrailingDeletes line 900
// Go: start = ts from StoreMsg at i==1; ts := time.Unix(0, start).UTC()
// .NET: convert the 100-ns store timestamp directly to DateTime (same precision).
var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } });
long start = 0;
for (var i = 0; i < 3; i++)
{
var (_, ts) = fs.StoreMsg("foo", null, null, 0);
if (i == 1)
start = ts; // exact timestamp of seq 2
}
fs.RemoveMsg(3);
// Convert 100-ns-since-epoch to DateTime (mirrors Go's time.Unix(0, start))
const long UnixEpochTicks = 621355968000000000L;
var t = new DateTime(start / 100L + UnixEpochTicks, DateTimeKind.Utc);
var gotSeq = fs.GetSeqFromTime(t);
gotSeq.ShouldBe(2UL);
fs.Stop();
}
// -----------------------------------------------------------------------
// TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState (T:2957)
// MemStore permutation only
// -----------------------------------------------------------------------
[Fact] // T:2957
public void FileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState line 921
var fs = NewMemStore(new StreamConfig { Name = "zzz", Subjects = new[] { "foo" } });
for (var i = 0; i < 3; i++)
fs.StoreMsg("foo", null, null, 0);
var (seqs, err) = fs.MultiLastSeqs(new[] { "foo" }, 0, 0);
err.ShouldBeNull();
seqs!.Length.ShouldBe(1);
seqs![0].ShouldBe(3UL);
var (removed, _) = fs.RemoveMsg(3);
removed.ShouldBeTrue();
(seqs, err) = fs.MultiLastSeqs(new[] { "foo" }, 0, 0);
err.ShouldBeNull();
seqs!.Length.ShouldBe(1);
seqs![0].ShouldBe(2UL);
fs.StoreMsg("foo", null, null, 0);
var sm = fs.LoadLastMsg("foo", null);
sm.ShouldNotBeNull();
sm!.Seq.ShouldBe(4UL);
(removed, _) = fs.RemoveMsg(4);
removed.ShouldBeTrue();
sm = fs.LoadLastMsg("foo", null);
sm.ShouldNotBeNull();
sm!.Seq.ShouldBe(2UL);
fs.Stop();
}
// -----------------------------------------------------------------------
// TestStoreDiscardNew (T:2954) — MemStore permutation only
// -----------------------------------------------------------------------
[Fact] // T:2954
public void StoreDiscardNew_ShouldSucceed()
{
// Reference: golang/nats-server/server/store_test.go:TestStoreDiscardNew line 788
// Helper that runs the discard-new test for a given config modifier
void Test(Action updateConfig, Exception? expectedErr)
{
var cfg = new StreamConfig
{
Name = "zzz",
Subjects = new[] { "foo" },
Discard = DiscardPolicy.DiscardNew,
};
updateConfig(cfg);
cfg.Storage = StorageType.MemoryStorage;
var fs = new JetStreamMemStore(cfg);
var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
var expectedSeq = 1UL;
void RequireState()
{
var state = fs.State();
state.Msgs.ShouldBe(1UL);
state.FirstSeq.ShouldBe(expectedSeq);
state.LastSeq.ShouldBe(expectedSeq);
}
fs.StoreMsg("foo", null, null, 0);
// StoreRawMsg with discardNewCheck=true
if (expectedErr == null)
{
fs.StoreRawMsg("foo", null, null, 0, ts, 0, true);
expectedSeq++;
}
else
{
Should.Throw(() => fs.StoreRawMsg("foo", null, null, 0, ts, 0, true));
}
RequireState();
// StoreRawMsg with discardNewCheck=false (followers must always accept)
fs.StoreRawMsg("foo", null, null, 0, ts, 0, false);
expectedSeq++;
// For MaxMsgsPer we stay at 1 msg; otherwise 2 msgs
if (cfg.MaxMsgsPer > 0)
{
RequireState();
}
else
{
var state = fs.State();
state.Msgs.ShouldBe(2UL);
state.FirstSeq.ShouldBe(expectedSeq - 1);
state.LastSeq.ShouldBe(expectedSeq);
}
fs.Stop();
}
Test(cfg => cfg.MaxMsgs = 1, StoreErrors.ErrMaxMsgs);
Test(cfg => cfg.MaxBytes = 33, StoreErrors.ErrMaxBytes);
Test(cfg => cfg.MaxMsgsPer = 1, null);
Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject);
Test(cfg => { cfg.MaxMsgs = 1; cfg.MaxMsgsPer = 1; }, null);
Test(cfg => { cfg.MaxBytes = 33; cfg.MaxMsgsPer = 1; }, null);
Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxMsgs = 1; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject);
Test(cfg => { cfg.DiscardNewPer = true; cfg.MaxBytes = 33; cfg.MaxMsgsPer = 1; }, StoreErrors.ErrMaxMsgsPerSubject);
}
}