diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs index eafade9..71a347f 100644 --- a/src/NATS.Server/JetStream/Storage/MemStore.cs +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -264,10 +264,15 @@ public sealed class MemStore : IStreamStore { lock (_gate) { + // Go: memStore state — when empty but FirstSeq was configured, report it. + // Reference: server/memstore.go — State() preserves first.seq watermark. + var firstSeqForApi = _st.Msgs == 0 + ? (_st.FirstSeq > 0 ? _st.FirstSeq : 0UL) + : _st.FirstSeq; return ValueTask.FromResult(new ApiStreamState { Messages = _st.Msgs, - FirstSeq = _st.Msgs == 0 ? 0UL : _st.FirstSeq, + FirstSeq = firstSeqForApi, LastSeq = _st.LastSeq, Bytes = _st.Bytes, }); @@ -1048,19 +1053,22 @@ public sealed class MemStore : IStreamStore if (seq <= _st.LastSeq) { var fseq = _st.FirstSeq; - // Find the actual new first seq - for (var s = seq; s <= _st.LastSeq; s++) + // Go: walk forward from seq to find first real message, mutating seq in-place. + // The backward cleanup loop must start from (newFirstSeq - 1) to correctly + // remove any dmap entries at the original compact seq that are no longer interior. + var newFirst = seq; + for (; newFirst <= _st.LastSeq; newFirst++) { - if (_msgs.TryGetValue(s, out var nm)) + if (_msgs.TryGetValue(newFirst, out var nm)) { - _st.FirstSeq = s; + _st.FirstSeq = newFirst; _st.FirstTime = new DateTime(nm.Ts / 100L, DateTimeKind.Utc); break; } } ulong purged = 0; - for (var s = seq - 1; s >= fseq && s < ulong.MaxValue; s--) + for (var s = newFirst - 1; s >= fseq && s < ulong.MaxValue; s--) { if (_msgs.TryGetValue(s, out var m)) { diff --git a/tests/NATS.Server.Tests/JetStream/Storage/StoreInterfaceTests.cs b/tests/NATS.Server.Tests/JetStream/Storage/StoreInterfaceTests.cs new file mode 100644 index 0000000..b64a571 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Storage/StoreInterfaceTests.cs @@ -0,0 +1,536 @@ +// Reference: golang/nats-server/server/store_test.go +// Tests ported in this file: +// TestStoreLoadNextMsgWildcardStartBeforeFirstMatch → LoadNextMsg_WildcardStartBeforeFirstMatch +// TestStoreSubjectStateConsistency → SubjectStateConsistency_UpdatesFirstAndLast +// TestStoreCompactCleansUpDmap → Compact_CleansUpDmap (parameterised) +// TestStoreTruncateCleansUpDmap → Truncate_CleansUpDmap (parameterised) +// TestStorePurgeExZero → PurgeEx_ZeroSeq_EquivalentToPurge +// TestStoreUpdateConfigTTLState → UpdateConfigTTLState_MessageSurvivesWhenTtlDisabled +// TestStoreStreamInteriorDeleteAccounting → InteriorDeleteAccounting_MemStore (subset without FileStore restart) +// TestStoreGetSeqFromTimeWithInteriorDeletesGap → GetSeqFromTime_WithInteriorDeletesGap +// TestStoreGetSeqFromTimeWithTrailingDeletes → GetSeqFromTime_WithTrailingDeletes +// TestStoreMaxMsgsPerUpdateBug → MaxMsgsPerUpdateBug_ReducesOnConfigUpdate +// TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState → MultiLastSeqs_AndLoadLastMsg_WithLazySubjectState + +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Storage; + +/// +/// IStreamStore interface contract tests. Validates behaviour shared by all store +/// implementations using MemStore (the simplest implementation). +/// Each test mirrors a specific Go test from golang/nats-server/server/store_test.go. +/// +public sealed class StoreInterfaceTests +{ + // Helper: cast MemStore to IStreamStore to access sync interface methods. + private static IStreamStore Sync(MemStore ms) => ms; + + // ------------------------------------------------------------------------- + // LoadNextMsg — wildcard start before first match + // ------------------------------------------------------------------------- + + // Go: TestStoreLoadNextMsgWildcardStartBeforeFirstMatch server/store_test.go:118 + [Fact] + public void LoadNextMsg_WildcardStartBeforeFirstMatch() + { + var ms = new MemStore(); + var s = Sync(ms); + + // Fill non-matching subjects first so the first wildcard match starts + // strictly after the requested start sequence. + for (var i = 0; i < 100; i++) + s.StoreMsg($"bar.{i}", null, [], 0); + + var (seq, _) = s.StoreMsg("foo.1", null, [], 0); + seq.ShouldBe(101UL); + + // Loading with wildcard "foo.*" from seq 1 should find the message at seq 101. + var sm = new StoreMsg(); + var (msg, skip) = s.LoadNextMsg("foo.*", true, 1, sm); + msg.Subject.ShouldBe("foo.1"); + // skip = seq - start, so seq 101 - start 1 = 100 + skip.ShouldBe(100UL); + msg.Sequence.ShouldBe(101UL); + + // Loading after seq 101 should throw — no more foo.* messages. + Should.Throw(() => s.LoadNextMsg("foo.*", true, 102, null)); + } + + // ------------------------------------------------------------------------- + // SubjectStateConsistency + // ------------------------------------------------------------------------- + + // Go: TestStoreSubjectStateConsistency server/store_test.go:179 + [Fact] + public void SubjectStateConsistency_UpdatesFirstAndLast() + { + var ms = new MemStore(); + var s = Sync(ms); + + SimpleState GetSubjectState() + { + var ss = s.SubjectsState("foo"); + ss.TryGetValue("foo", out var state); + return state; + } + + ulong LoadFirstSeq() + { + var sm = new StoreMsg(); + var (msg, _) = s.LoadNextMsg("foo", false, 0, sm); + return msg.Sequence; + } + + ulong LoadLastSeq() + { + var sm = new StoreMsg(); + var msg = s.LoadLastMsg("foo", sm); + return msg.Sequence; + } + + // Publish an initial batch of messages. + for (var i = 0; i < 4; i++) + s.StoreMsg("foo", null, [], 0); + + // Expect 4 msgs, with first=1, last=4. + var ss = GetSubjectState(); + ss.Msgs.ShouldBe(4UL); + ss.First.ShouldBe(1UL); + LoadFirstSeq().ShouldBe(1UL); + ss.Last.ShouldBe(4UL); + LoadLastSeq().ShouldBe(4UL); + + // Remove first message — first should update to seq 2. + s.RemoveMsg(1).ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(3UL); + ss.First.ShouldBe(2UL); + LoadFirstSeq().ShouldBe(2UL); + ss.Last.ShouldBe(4UL); + LoadLastSeq().ShouldBe(4UL); + + // Remove last message — last should update to seq 3. + s.RemoveMsg(4).ShouldBeTrue(); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(2UL); + ss.First.ShouldBe(2UL); + LoadFirstSeq().ShouldBe(2UL); + ss.Last.ShouldBe(3UL); + LoadLastSeq().ShouldBe(3UL); + + // Remove first message again. + s.RemoveMsg(2).ShouldBeTrue(); + + // Only one message left — first and last should both equal 3. + ss = GetSubjectState(); + ss.Msgs.ShouldBe(1UL); + ss.First.ShouldBe(3UL); + LoadFirstSeq().ShouldBe(3UL); + ss.Last.ShouldBe(3UL); + LoadLastSeq().ShouldBe(3UL); + + // Publish some more messages so we can test another scenario. + for (var i = 0; i < 3; i++) + s.StoreMsg("foo", null, [], 0); + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(4UL); + ss.First.ShouldBe(3UL); + LoadFirstSeq().ShouldBe(3UL); + ss.Last.ShouldBe(7UL); + LoadLastSeq().ShouldBe(7UL); + + // Remove last sequence. + s.RemoveMsg(7).ShouldBeTrue(); + + // Remove first sequence. + s.RemoveMsg(3).ShouldBeTrue(); + + // Remove (now) first sequence 5. + s.RemoveMsg(5).ShouldBeTrue(); + + // ss.First and ss.Last should both be recalculated and equal each other. + ss = GetSubjectState(); + ss.Msgs.ShouldBe(1UL); + ss.First.ShouldBe(6UL); + LoadFirstSeq().ShouldBe(6UL); + ss.Last.ShouldBe(6UL); + LoadLastSeq().ShouldBe(6UL); + + // Store a new message and immediately remove it (marks lastNeedsUpdate), + // then store another — that new one becomes the real last. + s.StoreMsg("foo", null, [], 0); // seq 8 + s.RemoveMsg(8).ShouldBeTrue(); + s.StoreMsg("foo", null, [], 0); // seq 9 + + ss = GetSubjectState(); + ss.Msgs.ShouldBe(2UL); + ss.First.ShouldBe(6UL); + LoadFirstSeq().ShouldBe(6UL); + ss.Last.ShouldBe(9UL); + LoadLastSeq().ShouldBe(9UL); + } + + // ------------------------------------------------------------------------- + // CompactCleansUpDmap — parameterised over compact sequences 2, 3, 4 + // ------------------------------------------------------------------------- + + // Go: TestStoreCompactCleansUpDmap server/store_test.go:449 + [Theory] + [InlineData(2UL)] + [InlineData(3UL)] + [InlineData(4UL)] + public void Compact_CleansUpDmap(ulong compactSeq) + { + var ms = new MemStore(); + var s = Sync(ms); + + // Publish 3 messages — no interior deletes. + for (var i = 0; i < 3; i++) + s.StoreMsg("foo", null, [], 0); + + var state = s.State(); + state.NumDeleted.ShouldBe(0); + state.Deleted.ShouldBeNull(); + + // Removing the middle message creates an interior delete. + s.RemoveMsg(2).ShouldBeTrue(); + state = s.State(); + state.NumDeleted.ShouldBe(1); + state.Deleted.ShouldNotBeNull(); + state.Deleted!.Length.ShouldBe(1); + + // Compacting must always clean up the interior delete. + s.Compact(compactSeq); + state = s.State(); + state.NumDeleted.ShouldBe(0); + state.Deleted.ShouldBeNull(); + + // Validate first/last sequence. + var expectedFirst = Math.Max(3UL, compactSeq); + state.FirstSeq.ShouldBe(expectedFirst); + state.LastSeq.ShouldBe(3UL); + } + + // ------------------------------------------------------------------------- + // TruncateCleansUpDmap — parameterised over truncate sequences 0, 1 + // ------------------------------------------------------------------------- + + // Go: TestStoreTruncateCleansUpDmap server/store_test.go:500 + [Theory] + [InlineData(0UL)] + [InlineData(1UL)] + public void Truncate_CleansUpDmap(ulong truncateSeq) + { + var ms = new MemStore(); + var s = Sync(ms); + + // Publish 3 messages — no interior deletes. + for (var i = 0; i < 3; i++) + s.StoreMsg("foo", null, [], 0); + + var state = s.State(); + state.NumDeleted.ShouldBe(0); + state.Deleted.ShouldBeNull(); + + // Removing the middle message creates an interior delete. + s.RemoveMsg(2).ShouldBeTrue(); + state = s.State(); + state.NumDeleted.ShouldBe(1); + state.Deleted.ShouldNotBeNull(); + state.Deleted!.Length.ShouldBe(1); + + // Truncating must always clean up the interior delete. + s.Truncate(truncateSeq); + state = s.State(); + state.NumDeleted.ShouldBe(0); + state.Deleted.ShouldBeNull(); + + // Validate first/last sequence after truncate. + var expectedFirst = Math.Min(1UL, truncateSeq); + state.FirstSeq.ShouldBe(expectedFirst); + state.LastSeq.ShouldBe(truncateSeq); + } + + // ------------------------------------------------------------------------- + // PurgeEx with zero sequence — must equal Purge + // ------------------------------------------------------------------------- + + // Go: TestStorePurgeExZero server/store_test.go:552 + [Fact] + public void PurgeEx_ZeroSeq_EquivalentToPurge() + { + var ms = new MemStore(); + var s = Sync(ms); + + // Simple purge all — stream should be empty but first seq = last seq + 1. + s.Purge(); + var ss = s.State(); + ss.FirstSeq.ShouldBe(1UL); + ss.LastSeq.ShouldBe(0UL); + + // PurgeEx with seq=0 must produce the same result. + s.PurgeEx("", 0, 0); + ss = s.State(); + ss.FirstSeq.ShouldBe(1UL); + ss.LastSeq.ShouldBe(0UL); + } + + // ------------------------------------------------------------------------- + // UpdateConfig TTL state — message survives when TTL is disabled + // ------------------------------------------------------------------------- + + // Go: TestStoreUpdateConfigTTLState server/store_test.go:574 + [Fact] + public void UpdateConfigTTLState_MessageSurvivesWhenTtlDisabled() + { + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = ["foo"], + Storage = StorageType.Memory, + AllowMsgTtl = false, + }; + var ms = new MemStore(cfg); + var s = Sync(ms); + + // TTLs disabled — message with ttl=1s should survive even after 2s. + var (seq, _) = s.StoreMsg("foo", null, [], 1); + Thread.Sleep(2_000); + // Should not throw — message should still be present. + var loaded = s.LoadMsg(seq, null); + loaded.Sequence.ShouldBe(seq); + + // Now enable TTLs. + cfg.AllowMsgTtl = true; + s.UpdateConfig(cfg); + + // TTLs enabled — message with ttl=1s should expire. + var (seq2, _) = s.StoreMsg("foo", null, [], 1); + Thread.Sleep(2_500); + // Should throw — message should have expired. + Should.Throw(() => s.LoadMsg(seq2, null)); + + // Now disable TTLs again. + cfg.AllowMsgTtl = false; + s.UpdateConfig(cfg); + + // TTLs disabled — message with ttl=1s should survive. + var (seq3, _) = s.StoreMsg("foo", null, [], 1); + Thread.Sleep(2_000); + // Should not throw — TTL wheel is gone so message stays. + var loaded3 = s.LoadMsg(seq3, null); + loaded3.Sequence.ShouldBe(seq3); + } + + // ------------------------------------------------------------------------- + // StreamInteriorDeleteAccounting — MemStore subset (no FileStore restart) + // ------------------------------------------------------------------------- + + // Go: TestStoreStreamInteriorDeleteAccounting server/store_test.go:621 + // Tests TruncateWithRemove and TruncateWithErase variants on MemStore. + [Theory] + [InlineData(false, "TruncateWithRemove")] + [InlineData(false, "TruncateWithErase")] + [InlineData(false, "SkipMsg")] + [InlineData(false, "SkipMsgs")] + [InlineData(true, "TruncateWithRemove")] + [InlineData(true, "TruncateWithErase")] + [InlineData(true, "SkipMsg")] + [InlineData(true, "SkipMsgs")] + public void InteriorDeleteAccounting_StateIsCorrect(bool empty, string actionTitle) + { + var ms = new MemStore(); + var s = Sync(ms); + + ulong lseq = 0; + if (!empty) + { + var (storedSeq, _) = s.StoreMsg("foo", null, [], 0); + storedSeq.ShouldBe(1UL); + lseq = storedSeq; + } + lseq++; + + switch (actionTitle) + { + case "TruncateWithRemove": + { + var (storedSeq, _) = s.StoreMsg("foo", null, [], 0); + storedSeq.ShouldBe(lseq); + s.RemoveMsg(lseq).ShouldBeTrue(); + s.Truncate(lseq); + break; + } + case "TruncateWithErase": + { + var (storedSeq, _) = s.StoreMsg("foo", null, [], 0); + storedSeq.ShouldBe(lseq); + s.EraseMsg(lseq).ShouldBeTrue(); + s.Truncate(lseq); + break; + } + case "SkipMsg": + { + s.SkipMsg(0); + break; + } + case "SkipMsgs": + { + s.SkipMsgs(lseq, 1); + break; + } + } + + // Confirm state. + var before = s.State(); + if (empty) + { + before.Msgs.ShouldBe(0UL); + before.FirstSeq.ShouldBe(2UL); + before.LastSeq.ShouldBe(1UL); + } + else + { + before.Msgs.ShouldBe(1UL); + before.FirstSeq.ShouldBe(1UL); + before.LastSeq.ShouldBe(2UL); + } + } + + // ------------------------------------------------------------------------- + // GetSeqFromTime with interior deletes gap + // ------------------------------------------------------------------------- + + // Go: TestStoreGetSeqFromTimeWithInteriorDeletesGap server/store_test.go:874 + [Fact] + public void GetSeqFromTime_WithInteriorDeletesGap() + { + var ms = new MemStore(); + var s = Sync(ms); + + long startTs = 0; + for (var i = 0; i < 10; i++) + { + var (_, ts) = s.StoreMsg("foo", null, [], 0); + if (i == 1) + startTs = ts; + } + + // Create a delete gap in the middle: seqs 4-7 deleted. + // A naive binary search would hit deleted sequences and return wrong result. + for (var seq = 4UL; seq <= 7UL; seq++) + s.RemoveMsg(seq).ShouldBeTrue(); + + // Convert nanoseconds timestamp to DateTime. + var t = new DateTime(startTs / 100L, DateTimeKind.Utc); + var found = s.GetSeqFromTime(t); + found.ShouldBe(2UL); + } + + // ------------------------------------------------------------------------- + // GetSeqFromTime with trailing deletes + // ------------------------------------------------------------------------- + + // Go: TestStoreGetSeqFromTimeWithTrailingDeletes server/store_test.go:900 + [Fact] + public void GetSeqFromTime_WithTrailingDeletes() + { + var ms = new MemStore(); + var s = Sync(ms); + + long startTs = 0; + for (var i = 0; i < 3; i++) + { + var (_, ts) = s.StoreMsg("foo", null, [], 0); + if (i == 1) + startTs = ts; + } + + // Delete last message — trailing delete. + s.RemoveMsg(3).ShouldBeTrue(); + + var t = new DateTime(startTs / 100L, DateTimeKind.Utc); + var found = s.GetSeqFromTime(t); + found.ShouldBe(2UL); + } + + // ------------------------------------------------------------------------- + // MaxMsgsPerUpdateBug — per-subject limit enforced on config update + // ------------------------------------------------------------------------- + + // Go: TestStoreMaxMsgsPerUpdateBug server/store_test.go:405 + [Fact] + public void MaxMsgsPerUpdateBug_ReducesOnConfigUpdate() + { + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = ["foo"], + MaxMsgsPer = 0, + Storage = StorageType.Memory, + }; + var ms = new MemStore(cfg); + var s = Sync(ms); + + for (var i = 0; i < 5; i++) + s.StoreMsg("foo", null, [], 0); + + var state = s.State(); + state.Msgs.ShouldBe(5UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(5UL); + + // Update max messages per-subject from 0 (infinite) to 1. + // Since the per-subject limit was not specified before, messages should be + // removed upon config update, leaving only the most recent. + cfg.MaxMsgsPer = 1; + s.UpdateConfig(cfg); + + state = s.State(); + state.Msgs.ShouldBe(1UL); + state.FirstSeq.ShouldBe(5UL); + state.LastSeq.ShouldBe(5UL); + } + + // ------------------------------------------------------------------------- + // MultiLastSeqs and LoadLastMsg with lazy subject state + // ------------------------------------------------------------------------- + + // Go: TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState server/store_test.go:921 + [Fact] + public void MultiLastSeqs_AndLoadLastMsg_WithLazySubjectState() + { + var ms = new MemStore(); + var s = Sync(ms); + + for (var i = 0; i < 3; i++) + s.StoreMsg("foo", null, [], 0); + + // MultiLastSeqs for "foo" should return [3]. + var seqs = s.MultiLastSeqs(["foo"], 0, -1); + seqs.Length.ShouldBe(1); + seqs[0].ShouldBe(3UL); + + // Remove last message — lazy last needs update. + s.RemoveMsg(3).ShouldBeTrue(); + + seqs = s.MultiLastSeqs(["foo"], 0, -1); + seqs.Length.ShouldBe(1); + seqs[0].ShouldBe(2UL); + + // Store another and load it as last. + s.StoreMsg("foo", null, [], 0); // seq 4 + var lastMsg = s.LoadLastMsg("foo", null); + lastMsg.Sequence.ShouldBe(4UL); + + // Remove seq 4 — lazy last update again. + s.RemoveMsg(4).ShouldBeTrue(); + lastMsg = s.LoadLastMsg("foo", null); + lastMsg.Sequence.ShouldBe(2UL); + } +}