feat: add store interface contract tests & fix Compact dmap cleanup (Task 4)
Port Go store_test.go contract tests for IStreamStore interface using MemStore. Fix CompactInternal to correctly walk forward to find new FirstSeq before backward cleanup (matching Go behavior). 21 new tests ported from store_test.go.
This commit is contained in:
@@ -264,10 +264,15 @@ public sealed class MemStore : IStreamStore
|
|||||||
{
|
{
|
||||||
lock (_gate)
|
lock (_gate)
|
||||||
{
|
{
|
||||||
|
// Go: memStore state — when empty but FirstSeq was configured, report it.
|
||||||
|
// Reference: server/memstore.go — State() preserves first.seq watermark.
|
||||||
|
var firstSeqForApi = _st.Msgs == 0
|
||||||
|
? (_st.FirstSeq > 0 ? _st.FirstSeq : 0UL)
|
||||||
|
: _st.FirstSeq;
|
||||||
return ValueTask.FromResult(new ApiStreamState
|
return ValueTask.FromResult(new ApiStreamState
|
||||||
{
|
{
|
||||||
Messages = _st.Msgs,
|
Messages = _st.Msgs,
|
||||||
FirstSeq = _st.Msgs == 0 ? 0UL : _st.FirstSeq,
|
FirstSeq = firstSeqForApi,
|
||||||
LastSeq = _st.LastSeq,
|
LastSeq = _st.LastSeq,
|
||||||
Bytes = _st.Bytes,
|
Bytes = _st.Bytes,
|
||||||
});
|
});
|
||||||
@@ -1048,19 +1053,22 @@ public sealed class MemStore : IStreamStore
|
|||||||
if (seq <= _st.LastSeq)
|
if (seq <= _st.LastSeq)
|
||||||
{
|
{
|
||||||
var fseq = _st.FirstSeq;
|
var fseq = _st.FirstSeq;
|
||||||
// Find the actual new first seq
|
// Go: walk forward from seq to find first real message, mutating seq in-place.
|
||||||
for (var s = seq; s <= _st.LastSeq; s++)
|
// The backward cleanup loop must start from (newFirstSeq - 1) to correctly
|
||||||
|
// remove any dmap entries at the original compact seq that are no longer interior.
|
||||||
|
var newFirst = seq;
|
||||||
|
for (; newFirst <= _st.LastSeq; newFirst++)
|
||||||
{
|
{
|
||||||
if (_msgs.TryGetValue(s, out var nm))
|
if (_msgs.TryGetValue(newFirst, out var nm))
|
||||||
{
|
{
|
||||||
_st.FirstSeq = s;
|
_st.FirstSeq = newFirst;
|
||||||
_st.FirstTime = new DateTime(nm.Ts / 100L, DateTimeKind.Utc);
|
_st.FirstTime = new DateTime(nm.Ts / 100L, DateTimeKind.Utc);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ulong purged = 0;
|
ulong purged = 0;
|
||||||
for (var s = seq - 1; s >= fseq && s < ulong.MaxValue; s--)
|
for (var s = newFirst - 1; s >= fseq && s < ulong.MaxValue; s--)
|
||||||
{
|
{
|
||||||
if (_msgs.TryGetValue(s, out var m))
|
if (_msgs.TryGetValue(s, out var m))
|
||||||
{
|
{
|
||||||
|
|||||||
536
tests/NATS.Server.Tests/JetStream/Storage/StoreInterfaceTests.cs
Normal file
536
tests/NATS.Server.Tests/JetStream/Storage/StoreInterfaceTests.cs
Normal file
@@ -0,0 +1,536 @@
|
|||||||
|
// Reference: golang/nats-server/server/store_test.go
|
||||||
|
// Tests ported in this file:
|
||||||
|
// TestStoreLoadNextMsgWildcardStartBeforeFirstMatch → LoadNextMsg_WildcardStartBeforeFirstMatch
|
||||||
|
// TestStoreSubjectStateConsistency → SubjectStateConsistency_UpdatesFirstAndLast
|
||||||
|
// TestStoreCompactCleansUpDmap → Compact_CleansUpDmap (parameterised)
|
||||||
|
// TestStoreTruncateCleansUpDmap → Truncate_CleansUpDmap (parameterised)
|
||||||
|
// TestStorePurgeExZero → PurgeEx_ZeroSeq_EquivalentToPurge
|
||||||
|
// TestStoreUpdateConfigTTLState → UpdateConfigTTLState_MessageSurvivesWhenTtlDisabled
|
||||||
|
// TestStoreStreamInteriorDeleteAccounting → InteriorDeleteAccounting_MemStore (subset without FileStore restart)
|
||||||
|
// TestStoreGetSeqFromTimeWithInteriorDeletesGap → GetSeqFromTime_WithInteriorDeletesGap
|
||||||
|
// TestStoreGetSeqFromTimeWithTrailingDeletes → GetSeqFromTime_WithTrailingDeletes
|
||||||
|
// TestStoreMaxMsgsPerUpdateBug → MaxMsgsPerUpdateBug_ReducesOnConfigUpdate
|
||||||
|
// TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState → MultiLastSeqs_AndLoadLastMsg_WithLazySubjectState
|
||||||
|
|
||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests.JetStream.Storage;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// IStreamStore interface contract tests. Validates behaviour shared by all store
|
||||||
|
/// implementations using MemStore (the simplest implementation).
|
||||||
|
/// Each test mirrors a specific Go test from golang/nats-server/server/store_test.go.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class StoreInterfaceTests
|
||||||
|
{
|
||||||
|
// Helper: cast MemStore to IStreamStore to access sync interface methods.
|
||||||
|
private static IStreamStore Sync(MemStore ms) => ms;
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// LoadNextMsg — wildcard start before first match
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStoreLoadNextMsgWildcardStartBeforeFirstMatch server/store_test.go:118
|
||||||
|
[Fact]
|
||||||
|
public void LoadNextMsg_WildcardStartBeforeFirstMatch()
|
||||||
|
{
|
||||||
|
var ms = new MemStore();
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
// Fill non-matching subjects first so the first wildcard match starts
|
||||||
|
// strictly after the requested start sequence.
|
||||||
|
for (var i = 0; i < 100; i++)
|
||||||
|
s.StoreMsg($"bar.{i}", null, [], 0);
|
||||||
|
|
||||||
|
var (seq, _) = s.StoreMsg("foo.1", null, [], 0);
|
||||||
|
seq.ShouldBe(101UL);
|
||||||
|
|
||||||
|
// Loading with wildcard "foo.*" from seq 1 should find the message at seq 101.
|
||||||
|
var sm = new StoreMsg();
|
||||||
|
var (msg, skip) = s.LoadNextMsg("foo.*", true, 1, sm);
|
||||||
|
msg.Subject.ShouldBe("foo.1");
|
||||||
|
// skip = seq - start, so seq 101 - start 1 = 100
|
||||||
|
skip.ShouldBe(100UL);
|
||||||
|
msg.Sequence.ShouldBe(101UL);
|
||||||
|
|
||||||
|
// Loading after seq 101 should throw — no more foo.* messages.
|
||||||
|
Should.Throw<KeyNotFoundException>(() => s.LoadNextMsg("foo.*", true, 102, null));
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// SubjectStateConsistency
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStoreSubjectStateConsistency server/store_test.go:179
|
||||||
|
[Fact]
|
||||||
|
public void SubjectStateConsistency_UpdatesFirstAndLast()
|
||||||
|
{
|
||||||
|
var ms = new MemStore();
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
SimpleState GetSubjectState()
|
||||||
|
{
|
||||||
|
var ss = s.SubjectsState("foo");
|
||||||
|
ss.TryGetValue("foo", out var state);
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
ulong LoadFirstSeq()
|
||||||
|
{
|
||||||
|
var sm = new StoreMsg();
|
||||||
|
var (msg, _) = s.LoadNextMsg("foo", false, 0, sm);
|
||||||
|
return msg.Sequence;
|
||||||
|
}
|
||||||
|
|
||||||
|
ulong LoadLastSeq()
|
||||||
|
{
|
||||||
|
var sm = new StoreMsg();
|
||||||
|
var msg = s.LoadLastMsg("foo", sm);
|
||||||
|
return msg.Sequence;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish an initial batch of messages.
|
||||||
|
for (var i = 0; i < 4; i++)
|
||||||
|
s.StoreMsg("foo", null, [], 0);
|
||||||
|
|
||||||
|
// Expect 4 msgs, with first=1, last=4.
|
||||||
|
var ss = GetSubjectState();
|
||||||
|
ss.Msgs.ShouldBe(4UL);
|
||||||
|
ss.First.ShouldBe(1UL);
|
||||||
|
LoadFirstSeq().ShouldBe(1UL);
|
||||||
|
ss.Last.ShouldBe(4UL);
|
||||||
|
LoadLastSeq().ShouldBe(4UL);
|
||||||
|
|
||||||
|
// Remove first message — first should update to seq 2.
|
||||||
|
s.RemoveMsg(1).ShouldBeTrue();
|
||||||
|
|
||||||
|
ss = GetSubjectState();
|
||||||
|
ss.Msgs.ShouldBe(3UL);
|
||||||
|
ss.First.ShouldBe(2UL);
|
||||||
|
LoadFirstSeq().ShouldBe(2UL);
|
||||||
|
ss.Last.ShouldBe(4UL);
|
||||||
|
LoadLastSeq().ShouldBe(4UL);
|
||||||
|
|
||||||
|
// Remove last message — last should update to seq 3.
|
||||||
|
s.RemoveMsg(4).ShouldBeTrue();
|
||||||
|
|
||||||
|
ss = GetSubjectState();
|
||||||
|
ss.Msgs.ShouldBe(2UL);
|
||||||
|
ss.First.ShouldBe(2UL);
|
||||||
|
LoadFirstSeq().ShouldBe(2UL);
|
||||||
|
ss.Last.ShouldBe(3UL);
|
||||||
|
LoadLastSeq().ShouldBe(3UL);
|
||||||
|
|
||||||
|
// Remove first message again.
|
||||||
|
s.RemoveMsg(2).ShouldBeTrue();
|
||||||
|
|
||||||
|
// Only one message left — first and last should both equal 3.
|
||||||
|
ss = GetSubjectState();
|
||||||
|
ss.Msgs.ShouldBe(1UL);
|
||||||
|
ss.First.ShouldBe(3UL);
|
||||||
|
LoadFirstSeq().ShouldBe(3UL);
|
||||||
|
ss.Last.ShouldBe(3UL);
|
||||||
|
LoadLastSeq().ShouldBe(3UL);
|
||||||
|
|
||||||
|
// Publish some more messages so we can test another scenario.
|
||||||
|
for (var i = 0; i < 3; i++)
|
||||||
|
s.StoreMsg("foo", null, [], 0);
|
||||||
|
|
||||||
|
ss = GetSubjectState();
|
||||||
|
ss.Msgs.ShouldBe(4UL);
|
||||||
|
ss.First.ShouldBe(3UL);
|
||||||
|
LoadFirstSeq().ShouldBe(3UL);
|
||||||
|
ss.Last.ShouldBe(7UL);
|
||||||
|
LoadLastSeq().ShouldBe(7UL);
|
||||||
|
|
||||||
|
// Remove last sequence.
|
||||||
|
s.RemoveMsg(7).ShouldBeTrue();
|
||||||
|
|
||||||
|
// Remove first sequence.
|
||||||
|
s.RemoveMsg(3).ShouldBeTrue();
|
||||||
|
|
||||||
|
// Remove (now) first sequence 5.
|
||||||
|
s.RemoveMsg(5).ShouldBeTrue();
|
||||||
|
|
||||||
|
// ss.First and ss.Last should both be recalculated and equal each other.
|
||||||
|
ss = GetSubjectState();
|
||||||
|
ss.Msgs.ShouldBe(1UL);
|
||||||
|
ss.First.ShouldBe(6UL);
|
||||||
|
LoadFirstSeq().ShouldBe(6UL);
|
||||||
|
ss.Last.ShouldBe(6UL);
|
||||||
|
LoadLastSeq().ShouldBe(6UL);
|
||||||
|
|
||||||
|
// Store a new message and immediately remove it (marks lastNeedsUpdate),
|
||||||
|
// then store another — that new one becomes the real last.
|
||||||
|
s.StoreMsg("foo", null, [], 0); // seq 8
|
||||||
|
s.RemoveMsg(8).ShouldBeTrue();
|
||||||
|
s.StoreMsg("foo", null, [], 0); // seq 9
|
||||||
|
|
||||||
|
ss = GetSubjectState();
|
||||||
|
ss.Msgs.ShouldBe(2UL);
|
||||||
|
ss.First.ShouldBe(6UL);
|
||||||
|
LoadFirstSeq().ShouldBe(6UL);
|
||||||
|
ss.Last.ShouldBe(9UL);
|
||||||
|
LoadLastSeq().ShouldBe(9UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// CompactCleansUpDmap — parameterised over compact sequences 2, 3, 4
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStoreCompactCleansUpDmap server/store_test.go:449
|
||||||
|
[Theory]
|
||||||
|
[InlineData(2UL)]
|
||||||
|
[InlineData(3UL)]
|
||||||
|
[InlineData(4UL)]
|
||||||
|
public void Compact_CleansUpDmap(ulong compactSeq)
|
||||||
|
{
|
||||||
|
var ms = new MemStore();
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
// Publish 3 messages — no interior deletes.
|
||||||
|
for (var i = 0; i < 3; i++)
|
||||||
|
s.StoreMsg("foo", null, [], 0);
|
||||||
|
|
||||||
|
var state = s.State();
|
||||||
|
state.NumDeleted.ShouldBe(0);
|
||||||
|
state.Deleted.ShouldBeNull();
|
||||||
|
|
||||||
|
// Removing the middle message creates an interior delete.
|
||||||
|
s.RemoveMsg(2).ShouldBeTrue();
|
||||||
|
state = s.State();
|
||||||
|
state.NumDeleted.ShouldBe(1);
|
||||||
|
state.Deleted.ShouldNotBeNull();
|
||||||
|
state.Deleted!.Length.ShouldBe(1);
|
||||||
|
|
||||||
|
// Compacting must always clean up the interior delete.
|
||||||
|
s.Compact(compactSeq);
|
||||||
|
state = s.State();
|
||||||
|
state.NumDeleted.ShouldBe(0);
|
||||||
|
state.Deleted.ShouldBeNull();
|
||||||
|
|
||||||
|
// Validate first/last sequence.
|
||||||
|
var expectedFirst = Math.Max(3UL, compactSeq);
|
||||||
|
state.FirstSeq.ShouldBe(expectedFirst);
|
||||||
|
state.LastSeq.ShouldBe(3UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// TruncateCleansUpDmap — parameterised over truncate sequences 0, 1
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStoreTruncateCleansUpDmap server/store_test.go:500
|
||||||
|
[Theory]
|
||||||
|
[InlineData(0UL)]
|
||||||
|
[InlineData(1UL)]
|
||||||
|
public void Truncate_CleansUpDmap(ulong truncateSeq)
|
||||||
|
{
|
||||||
|
var ms = new MemStore();
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
// Publish 3 messages — no interior deletes.
|
||||||
|
for (var i = 0; i < 3; i++)
|
||||||
|
s.StoreMsg("foo", null, [], 0);
|
||||||
|
|
||||||
|
var state = s.State();
|
||||||
|
state.NumDeleted.ShouldBe(0);
|
||||||
|
state.Deleted.ShouldBeNull();
|
||||||
|
|
||||||
|
// Removing the middle message creates an interior delete.
|
||||||
|
s.RemoveMsg(2).ShouldBeTrue();
|
||||||
|
state = s.State();
|
||||||
|
state.NumDeleted.ShouldBe(1);
|
||||||
|
state.Deleted.ShouldNotBeNull();
|
||||||
|
state.Deleted!.Length.ShouldBe(1);
|
||||||
|
|
||||||
|
// Truncating must always clean up the interior delete.
|
||||||
|
s.Truncate(truncateSeq);
|
||||||
|
state = s.State();
|
||||||
|
state.NumDeleted.ShouldBe(0);
|
||||||
|
state.Deleted.ShouldBeNull();
|
||||||
|
|
||||||
|
// Validate first/last sequence after truncate.
|
||||||
|
var expectedFirst = Math.Min(1UL, truncateSeq);
|
||||||
|
state.FirstSeq.ShouldBe(expectedFirst);
|
||||||
|
state.LastSeq.ShouldBe(truncateSeq);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// PurgeEx with zero sequence — must equal Purge
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStorePurgeExZero server/store_test.go:552
|
||||||
|
[Fact]
|
||||||
|
public void PurgeEx_ZeroSeq_EquivalentToPurge()
|
||||||
|
{
|
||||||
|
var ms = new MemStore();
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
// Simple purge all — stream should be empty but first seq = last seq + 1.
|
||||||
|
s.Purge();
|
||||||
|
var ss = s.State();
|
||||||
|
ss.FirstSeq.ShouldBe(1UL);
|
||||||
|
ss.LastSeq.ShouldBe(0UL);
|
||||||
|
|
||||||
|
// PurgeEx with seq=0 must produce the same result.
|
||||||
|
s.PurgeEx("", 0, 0);
|
||||||
|
ss = s.State();
|
||||||
|
ss.FirstSeq.ShouldBe(1UL);
|
||||||
|
ss.LastSeq.ShouldBe(0UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// UpdateConfig TTL state — message survives when TTL is disabled
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStoreUpdateConfigTTLState server/store_test.go:574
|
||||||
|
[Fact]
|
||||||
|
public void UpdateConfigTTLState_MessageSurvivesWhenTtlDisabled()
|
||||||
|
{
|
||||||
|
var cfg = new StreamConfig
|
||||||
|
{
|
||||||
|
Name = "TEST",
|
||||||
|
Subjects = ["foo"],
|
||||||
|
Storage = StorageType.Memory,
|
||||||
|
AllowMsgTtl = false,
|
||||||
|
};
|
||||||
|
var ms = new MemStore(cfg);
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
// TTLs disabled — message with ttl=1s should survive even after 2s.
|
||||||
|
var (seq, _) = s.StoreMsg("foo", null, [], 1);
|
||||||
|
Thread.Sleep(2_000);
|
||||||
|
// Should not throw — message should still be present.
|
||||||
|
var loaded = s.LoadMsg(seq, null);
|
||||||
|
loaded.Sequence.ShouldBe(seq);
|
||||||
|
|
||||||
|
// Now enable TTLs.
|
||||||
|
cfg.AllowMsgTtl = true;
|
||||||
|
s.UpdateConfig(cfg);
|
||||||
|
|
||||||
|
// TTLs enabled — message with ttl=1s should expire.
|
||||||
|
var (seq2, _) = s.StoreMsg("foo", null, [], 1);
|
||||||
|
Thread.Sleep(2_500);
|
||||||
|
// Should throw — message should have expired.
|
||||||
|
Should.Throw<KeyNotFoundException>(() => s.LoadMsg(seq2, null));
|
||||||
|
|
||||||
|
// Now disable TTLs again.
|
||||||
|
cfg.AllowMsgTtl = false;
|
||||||
|
s.UpdateConfig(cfg);
|
||||||
|
|
||||||
|
// TTLs disabled — message with ttl=1s should survive.
|
||||||
|
var (seq3, _) = s.StoreMsg("foo", null, [], 1);
|
||||||
|
Thread.Sleep(2_000);
|
||||||
|
// Should not throw — TTL wheel is gone so message stays.
|
||||||
|
var loaded3 = s.LoadMsg(seq3, null);
|
||||||
|
loaded3.Sequence.ShouldBe(seq3);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// StreamInteriorDeleteAccounting — MemStore subset (no FileStore restart)
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStoreStreamInteriorDeleteAccounting server/store_test.go:621
|
||||||
|
// Tests TruncateWithRemove and TruncateWithErase variants on MemStore.
|
||||||
|
[Theory]
|
||||||
|
[InlineData(false, "TruncateWithRemove")]
|
||||||
|
[InlineData(false, "TruncateWithErase")]
|
||||||
|
[InlineData(false, "SkipMsg")]
|
||||||
|
[InlineData(false, "SkipMsgs")]
|
||||||
|
[InlineData(true, "TruncateWithRemove")]
|
||||||
|
[InlineData(true, "TruncateWithErase")]
|
||||||
|
[InlineData(true, "SkipMsg")]
|
||||||
|
[InlineData(true, "SkipMsgs")]
|
||||||
|
public void InteriorDeleteAccounting_StateIsCorrect(bool empty, string actionTitle)
|
||||||
|
{
|
||||||
|
var ms = new MemStore();
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
ulong lseq = 0;
|
||||||
|
if (!empty)
|
||||||
|
{
|
||||||
|
var (storedSeq, _) = s.StoreMsg("foo", null, [], 0);
|
||||||
|
storedSeq.ShouldBe(1UL);
|
||||||
|
lseq = storedSeq;
|
||||||
|
}
|
||||||
|
lseq++;
|
||||||
|
|
||||||
|
switch (actionTitle)
|
||||||
|
{
|
||||||
|
case "TruncateWithRemove":
|
||||||
|
{
|
||||||
|
var (storedSeq, _) = s.StoreMsg("foo", null, [], 0);
|
||||||
|
storedSeq.ShouldBe(lseq);
|
||||||
|
s.RemoveMsg(lseq).ShouldBeTrue();
|
||||||
|
s.Truncate(lseq);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "TruncateWithErase":
|
||||||
|
{
|
||||||
|
var (storedSeq, _) = s.StoreMsg("foo", null, [], 0);
|
||||||
|
storedSeq.ShouldBe(lseq);
|
||||||
|
s.EraseMsg(lseq).ShouldBeTrue();
|
||||||
|
s.Truncate(lseq);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "SkipMsg":
|
||||||
|
{
|
||||||
|
s.SkipMsg(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "SkipMsgs":
|
||||||
|
{
|
||||||
|
s.SkipMsgs(lseq, 1);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Confirm state.
|
||||||
|
var before = s.State();
|
||||||
|
if (empty)
|
||||||
|
{
|
||||||
|
before.Msgs.ShouldBe(0UL);
|
||||||
|
before.FirstSeq.ShouldBe(2UL);
|
||||||
|
before.LastSeq.ShouldBe(1UL);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
before.Msgs.ShouldBe(1UL);
|
||||||
|
before.FirstSeq.ShouldBe(1UL);
|
||||||
|
before.LastSeq.ShouldBe(2UL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// GetSeqFromTime with interior deletes gap
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStoreGetSeqFromTimeWithInteriorDeletesGap server/store_test.go:874
|
||||||
|
[Fact]
|
||||||
|
public void GetSeqFromTime_WithInteriorDeletesGap()
|
||||||
|
{
|
||||||
|
var ms = new MemStore();
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
long startTs = 0;
|
||||||
|
for (var i = 0; i < 10; i++)
|
||||||
|
{
|
||||||
|
var (_, ts) = s.StoreMsg("foo", null, [], 0);
|
||||||
|
if (i == 1)
|
||||||
|
startTs = ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a delete gap in the middle: seqs 4-7 deleted.
|
||||||
|
// A naive binary search would hit deleted sequences and return wrong result.
|
||||||
|
for (var seq = 4UL; seq <= 7UL; seq++)
|
||||||
|
s.RemoveMsg(seq).ShouldBeTrue();
|
||||||
|
|
||||||
|
// Convert nanoseconds timestamp to DateTime.
|
||||||
|
var t = new DateTime(startTs / 100L, DateTimeKind.Utc);
|
||||||
|
var found = s.GetSeqFromTime(t);
|
||||||
|
found.ShouldBe(2UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// GetSeqFromTime with trailing deletes
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStoreGetSeqFromTimeWithTrailingDeletes server/store_test.go:900
|
||||||
|
[Fact]
|
||||||
|
public void GetSeqFromTime_WithTrailingDeletes()
|
||||||
|
{
|
||||||
|
var ms = new MemStore();
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
long startTs = 0;
|
||||||
|
for (var i = 0; i < 3; i++)
|
||||||
|
{
|
||||||
|
var (_, ts) = s.StoreMsg("foo", null, [], 0);
|
||||||
|
if (i == 1)
|
||||||
|
startTs = ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete last message — trailing delete.
|
||||||
|
s.RemoveMsg(3).ShouldBeTrue();
|
||||||
|
|
||||||
|
var t = new DateTime(startTs / 100L, DateTimeKind.Utc);
|
||||||
|
var found = s.GetSeqFromTime(t);
|
||||||
|
found.ShouldBe(2UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// MaxMsgsPerUpdateBug — per-subject limit enforced on config update
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestStoreMaxMsgsPerUpdateBug server/store_test.go:405
|
||||||
|
[Fact]
|
||||||
|
public void MaxMsgsPerUpdateBug_ReducesOnConfigUpdate()
|
||||||
|
{
|
||||||
|
var cfg = new StreamConfig
|
||||||
|
{
|
||||||
|
Name = "TEST",
|
||||||
|
Subjects = ["foo"],
|
||||||
|
MaxMsgsPer = 0,
|
||||||
|
Storage = StorageType.Memory,
|
||||||
|
};
|
||||||
|
var ms = new MemStore(cfg);
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
for (var i = 0; i < 5; i++)
|
||||||
|
s.StoreMsg("foo", null, [], 0);
|
||||||
|
|
||||||
|
var state = s.State();
|
||||||
|
state.Msgs.ShouldBe(5UL);
|
||||||
|
state.FirstSeq.ShouldBe(1UL);
|
||||||
|
state.LastSeq.ShouldBe(5UL);
|
||||||
|
|
||||||
|
// Update max messages per-subject from 0 (infinite) to 1.
|
||||||
|
// Since the per-subject limit was not specified before, messages should be
|
||||||
|
// removed upon config update, leaving only the most recent.
|
||||||
|
cfg.MaxMsgsPer = 1;
|
||||||
|
s.UpdateConfig(cfg);
|
||||||
|
|
||||||
|
state = s.State();
|
||||||
|
state.Msgs.ShouldBe(1UL);
|
||||||
|
state.FirstSeq.ShouldBe(5UL);
|
||||||
|
state.LastSeq.ShouldBe(5UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// MultiLastSeqs and LoadLastMsg with lazy subject state
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Go: TestFileStoreMultiLastSeqsAndLoadLastMsgWithLazySubjectState server/store_test.go:921
|
||||||
|
[Fact]
|
||||||
|
public void MultiLastSeqs_AndLoadLastMsg_WithLazySubjectState()
|
||||||
|
{
|
||||||
|
var ms = new MemStore();
|
||||||
|
var s = Sync(ms);
|
||||||
|
|
||||||
|
for (var i = 0; i < 3; i++)
|
||||||
|
s.StoreMsg("foo", null, [], 0);
|
||||||
|
|
||||||
|
// MultiLastSeqs for "foo" should return [3].
|
||||||
|
var seqs = s.MultiLastSeqs(["foo"], 0, -1);
|
||||||
|
seqs.Length.ShouldBe(1);
|
||||||
|
seqs[0].ShouldBe(3UL);
|
||||||
|
|
||||||
|
// Remove last message — lazy last needs update.
|
||||||
|
s.RemoveMsg(3).ShouldBeTrue();
|
||||||
|
|
||||||
|
seqs = s.MultiLastSeqs(["foo"], 0, -1);
|
||||||
|
seqs.Length.ShouldBe(1);
|
||||||
|
seqs[0].ShouldBe(2UL);
|
||||||
|
|
||||||
|
// Store another and load it as last.
|
||||||
|
s.StoreMsg("foo", null, [], 0); // seq 4
|
||||||
|
var lastMsg = s.LoadLastMsg("foo", null);
|
||||||
|
lastMsg.Sequence.ShouldBe(4UL);
|
||||||
|
|
||||||
|
// Remove seq 4 — lazy last update again.
|
||||||
|
s.RemoveMsg(4).ShouldBeTrue();
|
||||||
|
lastMsg = s.LoadLastMsg("foo", null);
|
||||||
|
lastMsg.Sequence.ShouldBe(2UL);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user