Files
Joseph Doherty 88a82ee860 docs: add XML doc comments to server types and fix flaky test timings
Add XML doc comments to public properties across EventTypes, Connz, Varz,
NatsOptions, StreamConfig, IStreamStore, FileStore, MqttListener,
MqttSessionStore, MessageTraceContext, and JetStreamApiResponse. Fix flaky
tests by increasing timing margins (ResponseTracker expiry 1ms→50ms,
sleep 50ms→200ms) and document known flaky test patterns in tests.md.
2026-03-13 18:47:48 -04:00

556 lines
20 KiB
C#

// Reference: golang/nats-server/server/store_test.go
// Tests ported in this file:
// TestStoreLoadNextMsgWildcardStartBeforeFirstMatch → LoadNextMsg_WildcardStartBeforeFirstMatch
// TestStoreSubjectStateConsistency → SubjectStateConsistency_UpdatesFirstAndLast
// TestStoreCompactCleansUpDmap → Compact_CleansUpDmap (parameterised)
// TestStoreTruncateCleansUpDmap → Truncate_CleansUpDmap (parameterised)
// TestStorePurgeExZero → PurgeEx_ZeroSeq_EquivalentToPurge
// TestStoreUpdateConfigTTLState → UpdateConfigTTLState_MessageSurvivesWhenTtlDisabled
// TestStoreStreamInteriorDeleteAccounting → InteriorDeleteAccounting_StateIsCorrect (MemStore subset without FileStore restart)
// TestStoreGetSeqFromTimeWithInteriorDeletesGap → GetSeqFromTime_WithInteriorDeletesGap
// TestStoreGetSeqFromTimeWithTrailingDeletes → GetSeqFromTime_WithTrailingDeletes
// TestStoreMaxMsgsPerUpdateBug → MaxMsgsPerUpdateBug_ReducesOnConfigUpdate
// TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState → MultiLastSeqs_AndLoadLastMsg_WithLazySubjectState
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using NATS.Server.TestUtilities;
namespace NATS.Server.JetStream.Tests.JetStream.Storage;
/// <summary>
/// IStreamStore interface contract tests. Validates behaviour shared by all store
/// implementations using MemStore (the simplest implementation); the final test
/// additionally checks header/payload round-tripping through FileStore.
/// Tests that carry a "Go:" reference mirror the named Go test from
/// golang/nats-server/server/store_test.go.
/// </summary>
public sealed class StoreInterfaceTests
{
// Helper: hands back the given MemStore typed as IStreamStore, so that tests
// call the store through the interface's members.
private static IStreamStore Sync(MemStore ms)
{
    return ms;
}
// -------------------------------------------------------------------------
// LoadNextMsg — wildcard start before first match
// -------------------------------------------------------------------------
// Go: TestStoreLoadNextMsgWildcardStartBeforeFirstMatch server/store_test.go:118
/// <summary>
/// Stores 100 messages on "bar.N" subjects followed by a single "foo.1"
/// message, then loads with the "foo.*" filter starting from sequence 1 and
/// checks the returned message and skip count.
/// </summary>
[Fact]
public void LoadNextMsg_WildcardStartBeforeFirstMatch()
{
    var store = Sync(new MemStore());

    // Non-matching subjects go in first, so the only "foo.*" match sits
    // strictly after the start sequence used below.
    var n = 0;
    while (n < 100)
    {
        store.StoreMsg($"bar.{n}", null, [], 0);
        n++;
    }

    var (fooSeq, _) = store.StoreMsg("foo.1", null, [], 0);
    fooSeq.ShouldBe(101UL);

    // Starting at seq 1, the "foo.*" filter must land on seq 101.
    var scratch = new StoreMsg();
    var (found, skipped) = store.LoadNextMsg("foo.*", true, 1, scratch);
    found.Subject.ShouldBe("foo.1");
    // skip = seq - start, i.e. 101 - 1 = 100.
    skipped.ShouldBe(100UL);
    found.Sequence.ShouldBe(101UL);

    // Nothing matches "foo.*" past seq 101, so the next load must throw.
    Should.Throw<KeyNotFoundException>(() => store.LoadNextMsg("foo.*", true, 102, null));
}
// -------------------------------------------------------------------------
// SubjectStateConsistency
// -------------------------------------------------------------------------
// Go: TestStoreSubjectStateConsistency server/store_test.go:179
/// <summary>
/// Stores and removes messages on subject "foo" and, after each round of
/// mutations, checks that the per-subject state (Msgs/First/Last) agrees with
/// the sequences returned by LoadNextMsg and LoadLastMsg.
/// </summary>
[Fact]
public void SubjectStateConsistency_UpdatesFirstAndLast()
{
    var store = Sync(new MemStore());

    // Per-subject state entry for "foo" as reported by SubjectsState.
    SimpleState FooState()
    {
        var all = store.SubjectsState("foo");
        all.TryGetValue("foo", out var entry);
        return entry;
    }

    // Sequence of the first "foo" message found when loading from seq 0.
    ulong FirstLoadedSeq()
    {
        var scratch = new StoreMsg();
        var (first, _) = store.LoadNextMsg("foo", false, 0, scratch);
        return first.Sequence;
    }

    // Sequence of the message returned by LoadLastMsg for "foo".
    ulong LastLoadedSeq()
    {
        var scratch = new StoreMsg();
        var last = store.LoadLastMsg("foo", scratch);
        return last.Sequence;
    }

    // Stores `count` empty messages on "foo".
    void Publish(int count)
    {
        for (var n = 0; n < count; n++)
        {
            store.StoreMsg("foo", null, [], 0);
        }
    }

    // Removes one sequence and requires the removal to report success.
    void Remove(ulong seq) => store.RemoveMsg(seq).ShouldBeTrue();

    // Checks the subject state and the loaded first/last sequences, in the
    // order: Msgs, First, loaded first, Last, loaded last.
    void Expect(ulong msgs, ulong first, ulong last)
    {
        var state = FooState();
        state.Msgs.ShouldBe(msgs);
        state.First.ShouldBe(first);
        FirstLoadedSeq().ShouldBe(first);
        state.Last.ShouldBe(last);
        LastLoadedSeq().ShouldBe(last);
    }

    // Initial batch: 4 msgs, first=1, last=4.
    Publish(4);
    Expect(msgs: 4UL, first: 1UL, last: 4UL);

    // Dropping the first message moves first to seq 2.
    Remove(1);
    Expect(msgs: 3UL, first: 2UL, last: 4UL);

    // Dropping the last message moves last to seq 3.
    Remove(4);
    Expect(msgs: 2UL, first: 2UL, last: 3UL);

    // Dropping the first message again leaves a single message, so first
    // and last must both be 3.
    Remove(2);
    Expect(msgs: 1UL, first: 3UL, last: 3UL);

    // Add more messages to set up the next scenario.
    Publish(3);
    Expect(msgs: 4UL, first: 3UL, last: 7UL);

    // Remove the last sequence, then the first, then the (now) first seq 5.
    Remove(7);
    Remove(3);
    Remove(5);

    // First and last should both be recalculated and equal each other.
    Expect(msgs: 1UL, first: 6UL, last: 6UL);

    // Store a message and remove it straight away (marks lastNeedsUpdate),
    // then store another one, which becomes the real last.
    Publish(1); // seq 8
    Remove(8);
    Publish(1); // seq 9
    Expect(msgs: 2UL, first: 6UL, last: 9UL);
}
// -------------------------------------------------------------------------
// CompactCleansUpDmap — parameterised over compact sequences 2, 3, 4
// -------------------------------------------------------------------------
// Go: TestStoreCompactCleansUpDmap server/store_test.go:449
/// <summary>
/// Creates one interior delete (seq 2 of 3) and checks that compacting at
/// <paramref name="compactSeq"/> leaves no deleted entries in the store state.
/// </summary>
/// <param name="compactSeq">Sequence passed to Compact.</param>
[Theory]
[InlineData(2UL)]
[InlineData(3UL)]
[InlineData(4UL)]
public void Compact_CleansUpDmap(ulong compactSeq)
{
    var store = Sync(new MemStore());

    // Three messages, nothing deleted yet.
    for (var n = 0; n < 3; n++)
    {
        store.StoreMsg("foo", null, [], 0);
    }

    var snapshot = store.State();
    snapshot.NumDeleted.ShouldBe(0);
    snapshot.Deleted.ShouldBeNull();

    // Taking out the middle message produces an interior delete.
    store.RemoveMsg(2).ShouldBeTrue();
    snapshot = store.State();
    snapshot.NumDeleted.ShouldBe(1);
    snapshot.Deleted.ShouldNotBeNull();
    snapshot.Deleted!.Length.ShouldBe(1);

    // Whatever the compact sequence, the interior delete must be gone afterwards.
    store.Compact(compactSeq);
    snapshot = store.State();
    snapshot.NumDeleted.ShouldBe(0);
    snapshot.Deleted.ShouldBeNull();

    // First sequence is the larger of 3 and the compact sequence; last stays 3.
    var wantFirst = compactSeq > 3UL ? compactSeq : 3UL;
    snapshot.FirstSeq.ShouldBe(wantFirst);
    snapshot.LastSeq.ShouldBe(3UL);
}
// -------------------------------------------------------------------------
// TruncateCleansUpDmap — parameterised over truncate sequences 0, 1
// -------------------------------------------------------------------------
// Go: TestStoreTruncateCleansUpDmap server/store_test.go:500
/// <summary>
/// Creates one interior delete (seq 2 of 3) and checks that truncating at
/// <paramref name="truncateSeq"/> leaves no deleted entries in the store state.
/// </summary>
/// <param name="truncateSeq">Sequence passed to Truncate.</param>
[Theory]
[InlineData(0UL)]
[InlineData(1UL)]
public void Truncate_CleansUpDmap(ulong truncateSeq)
{
    var store = Sync(new MemStore());

    // Three messages, nothing deleted yet.
    for (var n = 0; n < 3; n++)
    {
        store.StoreMsg("foo", null, [], 0);
    }

    var snapshot = store.State();
    snapshot.NumDeleted.ShouldBe(0);
    snapshot.Deleted.ShouldBeNull();

    // Taking out the middle message produces an interior delete.
    store.RemoveMsg(2).ShouldBeTrue();
    snapshot = store.State();
    snapshot.NumDeleted.ShouldBe(1);
    snapshot.Deleted.ShouldNotBeNull();
    snapshot.Deleted!.Length.ShouldBe(1);

    // Whatever the truncate sequence, the interior delete must be gone afterwards.
    store.Truncate(truncateSeq);
    snapshot = store.State();
    snapshot.NumDeleted.ShouldBe(0);
    snapshot.Deleted.ShouldBeNull();

    // First sequence is the smaller of 1 and the truncate sequence; last
    // equals the truncate sequence.
    var wantFirst = truncateSeq < 1UL ? truncateSeq : 1UL;
    snapshot.FirstSeq.ShouldBe(wantFirst);
    snapshot.LastSeq.ShouldBe(truncateSeq);
}
// -------------------------------------------------------------------------
// PurgeEx with zero sequence — must equal Purge
// -------------------------------------------------------------------------
// Go: TestStorePurgeExZero server/store_test.go:552
/// <summary>
/// Purges a freshly created store with Purge and then with PurgeEx("", 0, 0),
/// expecting FirstSeq=1 and LastSeq=0 after each call.
/// </summary>
[Fact]
public void PurgeEx_ZeroSeq_EquivalentToPurge()
{
    var store = Sync(new MemStore());

    // An empty stream reports first seq = last seq + 1.
    void ExpectEmptyAtStart()
    {
        var snapshot = store.State();
        snapshot.FirstSeq.ShouldBe(1UL);
        snapshot.LastSeq.ShouldBe(0UL);
    }

    // Plain purge-all.
    store.Purge();
    ExpectEmptyAtStart();

    // PurgeEx with seq=0 has to end in the same state.
    store.PurgeEx("", 0, 0);
    ExpectEmptyAtStart();
}
// -------------------------------------------------------------------------
// UpdateConfig TTL state — message survives when TTL is disabled
// -------------------------------------------------------------------------
// Go: TestStoreUpdateConfigTTLState server/store_test.go:574
/// <summary>
/// Toggles AllowMsgTtl via UpdateConfig (off → on → off) and checks that a
/// message stored with a TTL argument of 1 stays loadable while TTLs are
/// disabled and stops being loadable while they are enabled.
/// </summary>
[Fact]
public async Task UpdateConfigTTLState_MessageSurvivesWhenTtlDisabled()
{
    // How long to wait before concluding a message was not expired.
    const int SurvivalWaitMs = 2_500;

    var cfg = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        Storage = StorageType.Memory,
        AllowMsgTtl = false,
    };
    var store = Sync(new MemStore(cfg));

    // Flips the TTL switch on the shared config and pushes it to the store.
    void SetTtlAllowed(bool allowed)
    {
        cfg.AllowMsgTtl = allowed;
        store.UpdateConfig(cfg);
    }

    // Phase 1 — TTLs disabled: the message must still load after the wait.
    // NOTE(review): the original port describes the TTL argument 1 as "1s" — confirm the unit.
    var (firstSeq, _) = store.StoreMsg("foo", null, [], 1);
    await Task.Delay(SurvivalWaitMs);
    var firstLoaded = store.LoadMsg(firstSeq, null);
    firstLoaded.Sequence.ShouldBe(firstSeq);

    // Phase 2 — TTLs enabled: poll until loading the message throws.
    SetTtlAllowed(true);
    var (expiringSeq, _) = store.StoreMsg("foo", null, [], 1);
    await PollHelper.WaitOrThrowAsync(
        () =>
        {
            try
            {
                store.LoadMsg(expiringSeq, null);
            }
            catch (KeyNotFoundException)
            {
                return true;
            }

            return false;
        },
        "TTL expiry",
        timeoutMs: 10_000,
        intervalMs: 100);

    // Phase 3 — TTLs disabled again: the message must still load after the wait.
    SetTtlAllowed(false);
    var (lastSeq, _) = store.StoreMsg("foo", null, [], 1);
    await Task.Delay(SurvivalWaitMs);
    var lastLoaded = store.LoadMsg(lastSeq, null);
    lastLoaded.Sequence.ShouldBe(lastSeq);
}
// -------------------------------------------------------------------------
// StreamInteriorDeleteAccounting — MemStore subset (no FileStore restart)
// -------------------------------------------------------------------------
// Go: TestStoreStreamInteriorDeleteAccounting server/store_test.go:621
// Covers the TruncateWithRemove, TruncateWithErase, SkipMsg and SkipMsgs
// variants on MemStore, each starting from an empty or a one-message store.
/// <summary>
/// Runs one action at the next sequence and checks the resulting
/// Msgs/FirstSeq/LastSeq of the store state.
/// </summary>
/// <param name="empty">When false, one message is stored at seq 1 before the action runs.</param>
/// <param name="actionTitle">Which action to run; must be one of the four titles handled below.</param>
/// <exception cref="ArgumentOutOfRangeException">The action title is not recognised.</exception>
[Theory]
[InlineData(false, "TruncateWithRemove")]
[InlineData(false, "TruncateWithErase")]
[InlineData(false, "SkipMsg")]
[InlineData(false, "SkipMsgs")]
[InlineData(true, "TruncateWithRemove")]
[InlineData(true, "TruncateWithErase")]
[InlineData(true, "SkipMsg")]
[InlineData(true, "SkipMsgs")]
public void InteriorDeleteAccounting_StateIsCorrect(bool empty, string actionTitle)
{
    var ms = new MemStore();
    var s = Sync(ms);

    // lseq ends up as the sequence the action below operates on:
    // 1 for an empty store, 2 when a message was stored first.
    ulong lseq = 0;
    if (!empty)
    {
        var (storedSeq, _) = s.StoreMsg("foo", null, [], 0);
        storedSeq.ShouldBe(1UL);
        lseq = storedSeq;
    }
    lseq++;

    switch (actionTitle)
    {
        case "TruncateWithRemove":
        {
            var (storedSeq, _) = s.StoreMsg("foo", null, [], 0);
            storedSeq.ShouldBe(lseq);
            s.RemoveMsg(lseq).ShouldBeTrue();
            s.Truncate(lseq);
            break;
        }
        case "TruncateWithErase":
        {
            var (storedSeq, _) = s.StoreMsg("foo", null, [], 0);
            storedSeq.ShouldBe(lseq);
            s.EraseMsg(lseq).ShouldBeTrue();
            s.Truncate(lseq);
            break;
        }
        case "SkipMsg":
        {
            s.SkipMsg(0);
            break;
        }
        case "SkipMsgs":
        {
            s.SkipMsgs(lseq, 1);
            break;
        }
        default:
            // Fail loudly on a mistyped InlineData title instead of running no
            // action and tripping a misleading state assertion below.
            throw new ArgumentOutOfRangeException(nameof(actionTitle), actionTitle, "Unknown interior-delete action.");
    }

    // Confirm state: the action's sequence counts as the last sequence but
    // contributes no message.
    var state = s.State();
    if (empty)
    {
        state.Msgs.ShouldBe(0UL);
        state.FirstSeq.ShouldBe(2UL);
        state.LastSeq.ShouldBe(1UL);
    }
    else
    {
        state.Msgs.ShouldBe(1UL);
        state.FirstSeq.ShouldBe(1UL);
        state.LastSeq.ShouldBe(2UL);
    }
}
// -------------------------------------------------------------------------
// GetSeqFromTime with interior deletes gap
// -------------------------------------------------------------------------
// Go: TestStoreGetSeqFromTimeWithInteriorDeletesGap server/store_test.go:874
/// <summary>
/// Stores 10 messages with explicit timestamps spaced 1,000,000 units apart,
/// removes seqs 4-7, and checks that GetSeqFromTime for the timestamp of
/// seq 2 returns 2.
/// </summary>
[Fact]
public void GetSeqFromTime_WithInteriorDeletesGap()
{
    const long StepNanos = 1_000_000L;

    var store = Sync(new MemStore());

    // Explicit timestamps via StoreRawMsg avoid flakiness under CPU pressure.
    // Ticks are 100ns units, so ticks-since-epoch * 100 gives Unix nanoseconds.
    var epochTicks = DateTime.UnixEpoch.Ticks;
    var baseNanos = (DateTime.UtcNow.Ticks - epochTicks) * 100L;
    for (var offset = 0; offset < 10; offset++)
    {
        var seqToStore = (ulong)(offset + 1);
        store.StoreRawMsg("foo", null, [], seqToStore, baseNanos + offset * StepNanos, 0, false);
    }

    // Timestamp given to seq 2 (offset 1).
    var queryNanos = baseNanos + StepNanos;

    // Punch a hole in the middle (seqs 4-7). A naive binary search would
    // land on deleted sequences and return the wrong result.
    for (var doomed = 4UL; doomed <= 7UL; doomed++)
    {
        store.RemoveMsg(doomed).ShouldBeTrue();
    }

    // Back from Unix nanoseconds to a UTC DateTime.
    var queryTime = new DateTime(queryNanos / 100L + epochTicks, DateTimeKind.Utc);
    var located = store.GetSeqFromTime(queryTime);
    located.ShouldBe(2UL);
}
// -------------------------------------------------------------------------
// GetSeqFromTime with trailing deletes
// -------------------------------------------------------------------------
// Go: TestStoreGetSeqFromTimeWithTrailingDeletes server/store_test.go:900
/// <summary>
/// Stores 3 messages with explicit timestamps spaced 1,000,000 units apart,
/// removes the last one, and checks that GetSeqFromTime for the timestamp of
/// seq 2 returns 2.
/// </summary>
[Fact]
public void GetSeqFromTime_WithTrailingDeletes()
{
    const long StepNanos = 1_000_000L;

    var store = Sync(new MemStore());

    // Explicit timestamps via StoreRawMsg avoid flakiness under CPU pressure.
    // Ticks are 100ns units, so ticks-since-epoch * 100 gives Unix nanoseconds.
    var epochTicks = DateTime.UnixEpoch.Ticks;
    var baseNanos = (DateTime.UtcNow.Ticks - epochTicks) * 100L;
    for (var offset = 0; offset < 3; offset++)
    {
        var seqToStore = (ulong)(offset + 1);
        store.StoreRawMsg("foo", null, [], seqToStore, baseNanos + offset * StepNanos, 0, false);
    }

    // Timestamp given to seq 2 (offset 1).
    var queryNanos = baseNanos + StepNanos;

    // Removing the final message leaves a trailing delete.
    store.RemoveMsg(3).ShouldBeTrue();

    // Back from Unix nanoseconds to a UTC DateTime.
    var queryTime = new DateTime(queryNanos / 100L + epochTicks, DateTimeKind.Utc);
    var located = store.GetSeqFromTime(queryTime);
    located.ShouldBe(2UL);
}
// -------------------------------------------------------------------------
// MaxMsgsPerUpdateBug — per-subject limit enforced on config update
// -------------------------------------------------------------------------
// Go: TestStoreMaxMsgsPerUpdateBug server/store_test.go:405
/// <summary>
/// Stores five messages on one subject with MaxMsgsPer = 0, lowers the limit
/// to 1 through UpdateConfig, and checks that only seq 5 remains.
/// </summary>
[Fact]
public void MaxMsgsPerUpdateBug_ReducesOnConfigUpdate()
{
    var cfg = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        MaxMsgsPer = 0,
        Storage = StorageType.Memory,
    };
    var store = Sync(new MemStore(cfg));

    // Checks message count plus first/last sequence of the current state.
    void ExpectState(ulong msgs, ulong first, ulong last)
    {
        var snapshot = store.State();
        snapshot.Msgs.ShouldBe(msgs);
        snapshot.FirstSeq.ShouldBe(first);
        snapshot.LastSeq.ShouldBe(last);
    }

    for (var n = 0; n < 5; n++)
    {
        store.StoreMsg("foo", null, [], 0);
    }

    ExpectState(msgs: 5UL, first: 1UL, last: 5UL);

    // Go from 0 (infinite) to 1 message per subject. Because no per-subject
    // limit was set before, the update itself has to remove messages,
    // keeping only the most recent one.
    cfg.MaxMsgsPer = 1;
    store.UpdateConfig(cfg);

    ExpectState(msgs: 1UL, first: 5UL, last: 5UL);
}
// -------------------------------------------------------------------------
// MultiLastSeqs and LoadLastMsg with lazy subject state
// -------------------------------------------------------------------------
// Go: TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState server/store_test.go:921
/// <summary>
/// Removes the current last "foo" message twice and checks that MultiLastSeqs
/// and LoadLastMsg each report the remaining last sequence afterwards.
/// </summary>
[Fact]
public void MultiLastSeqs_AndLoadLastMsg_WithLazySubjectState()
{
    var store = Sync(new MemStore());

    // MultiLastSeqs for "foo" must return exactly one entry with this value.
    void ExpectSingleLastSeq(ulong expected)
    {
        var lastSeqs = store.MultiLastSeqs(["foo"], 0, -1);
        lastSeqs.Length.ShouldBe(1);
        lastSeqs[0].ShouldBe(expected);
    }

    for (var n = 0; n < 3; n++)
    {
        store.StoreMsg("foo", null, [], 0);
    }

    // With seqs 1-3 stored, the last for "foo" is 3.
    ExpectSingleLastSeq(3UL);

    // Drop the last message — the lazily tracked last needs an update.
    store.RemoveMsg(3).ShouldBeTrue();
    ExpectSingleLastSeq(2UL);

    // A new message (seq 4) must be what LoadLastMsg returns.
    store.StoreMsg("foo", null, [], 0);
    var newest = store.LoadLastMsg("foo", null);
    newest.Sequence.ShouldBe(4UL);

    // Drop seq 4 — lazy last update again, falling back to seq 2.
    store.RemoveMsg(4).ShouldBeTrue();
    newest = store.LoadLastMsg("foo", null);
    newest.Sequence.ShouldBe(2UL);
}
/// <summary>
/// Stores a message with both a header block and a payload in a FileStore
/// and checks that LoadMsg returns each part unchanged in its own field.
/// </summary>
[Fact]
public void FileStore_LoadMsg_preserves_headers_separately_from_payload()
{
    var dir = Directory.CreateTempSubdirectory();
    try
    {
        // Declared inside the try block so the store is disposed when the
        // block exits, i.e. before the directory is deleted in finally.
        using var store = new FileStore(new FileStoreOptions { Directory = dir.FullName });

        var hdr = "NATS/1.0\r\nX-Test: one\r\n\r\n"u8.ToArray();
        var msg = "payload"u8.ToArray();

        var (seq, _) = store.StoreMsg("foo", hdr, msg, 0L);
        var loaded = store.LoadMsg(seq, null);

        loaded.Header.ShouldNotBeNull();
        loaded.Header.ShouldBe(hdr);
        loaded.Data.ShouldNotBeNull();
        loaded.Data.ShouldBe(msg);
    }
    finally
    {
        // Remove the temp directory so repeated runs do not pile up on disk.
        // Cleanup is best-effort: a failure to delete must not hide the
        // outcome of the assertions above.
        try
        {
            dir.Delete(recursive: true);
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Intentionally ignored — leftover temp files are harmless.
        }
    }
}
}