feat: phase B distributed substrate test parity — 39 new tests across 5 subsystems
FileStore basics (4), MemStore/retention (10), RAFT election/append (16), config reload parity (3), monitoring endpoints varz/connz/healthz (6). 972 total tests passing, 0 failures.
This commit is contained in:
165
tests/NATS.Server.Tests/JetStream/Storage/FileStoreBasicTests.cs
Normal file
165
tests/NATS.Server.Tests/JetStream/Storage/FileStoreBasicTests.cs
Normal file
@@ -0,0 +1,165 @@
|
||||
// Reference: golang/nats-server/server/filestore_test.go
|
||||
// Tests ported: TestFileStoreBasics, TestFileStoreMsgHeaders,
|
||||
// TestFileStoreBasicWriteMsgsAndRestore, TestFileStoreRemove
|
||||
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Storage;
|
||||
|
||||
public sealed class FileStoreBasicTests : IDisposable
|
||||
{
|
||||
private readonly string _dir;
|
||||
|
||||
public FileStoreBasicTests()
|
||||
{
|
||||
_dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-basic-{Guid.NewGuid():N}");
|
||||
Directory.CreateDirectory(_dir);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (Directory.Exists(_dir))
|
||||
Directory.Delete(_dir, recursive: true);
|
||||
}
|
||||
|
||||
private FileStore CreateStore(string? subdirectory = null)
|
||||
{
|
||||
var dir = subdirectory is null ? _dir : Path.Combine(_dir, subdirectory);
|
||||
return new FileStore(new FileStoreOptions { Directory = dir });
|
||||
}
|
||||
|
||||
// Ref: TestFileStoreBasics — stores 5 msgs, checks sequence numbers,
|
||||
// checks State().Msgs, loads msg by sequence and verifies subject/payload.
|
||||
[Fact]
|
||||
public async Task Store_and_load_messages()
|
||||
{
|
||||
await using var store = CreateStore();
|
||||
|
||||
const string subject = "foo";
|
||||
var payload = "Hello World"u8.ToArray();
|
||||
|
||||
for (var i = 1; i <= 5; i++)
|
||||
{
|
||||
var seq = await store.AppendAsync(subject, payload, default);
|
||||
seq.ShouldBe((ulong)i);
|
||||
}
|
||||
|
||||
var state = await store.GetStateAsync(default);
|
||||
state.Messages.ShouldBe((ulong)5);
|
||||
|
||||
var msg2 = await store.LoadAsync(2, default);
|
||||
msg2.ShouldNotBeNull();
|
||||
msg2!.Subject.ShouldBe(subject);
|
||||
msg2.Payload.ToArray().ShouldBe(payload);
|
||||
|
||||
var msg3 = await store.LoadAsync(3, default);
|
||||
msg3.ShouldNotBeNull();
|
||||
}
|
||||
|
||||
// Ref: TestFileStoreMsgHeaders — stores a message whose payload carries raw
|
||||
// NATS header bytes, then loads it back and verifies the bytes are intact.
|
||||
//
|
||||
// The .NET FileStore keeps headers as part of the payload bytes (callers
|
||||
// embed the NATS wire header in the payload slice they pass in). We
|
||||
// verify round-trip fidelity for a payload that happens to look like a
|
||||
// NATS header line.
|
||||
[Fact]
|
||||
public async Task Store_message_with_headers()
|
||||
{
|
||||
await using var store = CreateStore();
|
||||
|
||||
// Simulate a NATS header embedded in the payload, e.g. "name:derek\r\n\r\nHello World"
|
||||
var headerBytes = "NATS/1.0\r\nname:derek\r\n\r\n"u8.ToArray();
|
||||
var bodyBytes = "Hello World"u8.ToArray();
|
||||
var fullPayload = headerBytes.Concat(bodyBytes).ToArray();
|
||||
|
||||
await store.AppendAsync("foo", fullPayload, default);
|
||||
|
||||
var msg = await store.LoadAsync(1, default);
|
||||
msg.ShouldNotBeNull();
|
||||
msg!.Payload.ToArray().ShouldBe(fullPayload);
|
||||
}
|
||||
|
||||
// Ref: TestFileStoreBasicWriteMsgsAndRestore — stores 100 msgs, disposes
|
||||
// the store, recreates from the same directory, verifies message count
|
||||
// is preserved, stores 100 more, verifies total of 200.
|
||||
[Fact]
|
||||
public async Task Stop_and_restart_preserves_messages()
|
||||
{
|
||||
const int firstBatch = 100;
|
||||
const int secondBatch = 100;
|
||||
|
||||
await using (var store = CreateStore())
|
||||
{
|
||||
for (var i = 1; i <= firstBatch; i++)
|
||||
{
|
||||
var payload = System.Text.Encoding.UTF8.GetBytes($"[{i:D8}] Hello World!");
|
||||
var seq = await store.AppendAsync("foo", payload, default);
|
||||
seq.ShouldBe((ulong)i);
|
||||
}
|
||||
|
||||
var state = await store.GetStateAsync(default);
|
||||
state.Messages.ShouldBe((ulong)firstBatch);
|
||||
}
|
||||
|
||||
// Reopen the same directory.
|
||||
await using (var store = CreateStore())
|
||||
{
|
||||
var state = await store.GetStateAsync(default);
|
||||
state.Messages.ShouldBe((ulong)firstBatch);
|
||||
|
||||
for (var i = firstBatch + 1; i <= firstBatch + secondBatch; i++)
|
||||
{
|
||||
var payload = System.Text.Encoding.UTF8.GetBytes($"[{i:D8}] Hello World!");
|
||||
var seq = await store.AppendAsync("foo", payload, default);
|
||||
seq.ShouldBe((ulong)i);
|
||||
}
|
||||
|
||||
state = await store.GetStateAsync(default);
|
||||
state.Messages.ShouldBe((ulong)(firstBatch + secondBatch));
|
||||
}
|
||||
|
||||
// Reopen again to confirm the second batch survived.
|
||||
await using (var store = CreateStore())
|
||||
{
|
||||
var state = await store.GetStateAsync(default);
|
||||
state.Messages.ShouldBe((ulong)(firstBatch + secondBatch));
|
||||
}
|
||||
}
|
||||
|
||||
// Ref: TestFileStoreBasics (remove section) and Go TestFileStoreRemove
|
||||
// pattern — stores 5 msgs, removes first, last, and a middle message,
|
||||
// verifies State().Msgs decrements correctly after each removal.
|
||||
[Fact]
|
||||
public async Task Remove_messages_updates_state()
|
||||
{
|
||||
await using var store = CreateStore();
|
||||
|
||||
const string subject = "foo";
|
||||
var payload = "Hello World"u8.ToArray();
|
||||
|
||||
for (var i = 0; i < 5; i++)
|
||||
await store.AppendAsync(subject, payload, default);
|
||||
|
||||
// Remove first (seq 1) — expect 4 remaining.
|
||||
(await store.RemoveAsync(1, default)).ShouldBeTrue();
|
||||
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)4);
|
||||
|
||||
// Remove last (seq 5) — expect 3 remaining.
|
||||
(await store.RemoveAsync(5, default)).ShouldBeTrue();
|
||||
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)3);
|
||||
|
||||
// Remove a middle message (seq 3) — expect 2 remaining.
|
||||
(await store.RemoveAsync(3, default)).ShouldBeTrue();
|
||||
(await store.GetStateAsync(default)).Messages.ShouldBe((ulong)2);
|
||||
|
||||
// Sequences 2 and 4 should still be loadable.
|
||||
(await store.LoadAsync(2, default)).ShouldNotBeNull();
|
||||
(await store.LoadAsync(4, default)).ShouldNotBeNull();
|
||||
|
||||
// Removed sequences must return null.
|
||||
(await store.LoadAsync(1, default)).ShouldBeNull();
|
||||
(await store.LoadAsync(3, default)).ShouldBeNull();
|
||||
(await store.LoadAsync(5, default)).ShouldBeNull();
|
||||
}
|
||||
}
|
||||
180
tests/NATS.Server.Tests/JetStream/Storage/MemStoreBasicTests.cs
Normal file
180
tests/NATS.Server.Tests/JetStream/Storage/MemStoreBasicTests.cs
Normal file
@@ -0,0 +1,180 @@
|
||||
// Ported from golang/nats-server/server/memstore_test.go:
|
||||
// TestMemStoreBasics, TestMemStorePurge, TestMemStoreMsgHeaders (adapted),
|
||||
// TestMemStoreTimeStamps, TestMemStoreEraseMsg
|
||||
|
||||
using System.Text;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Storage;
|
||||
|
||||
public class MemStoreBasicTests
|
||||
{
|
||||
// Go ref: TestMemStoreBasics — store a message, verify sequence, state, and payload round-trip.
|
||||
[Fact]
|
||||
public async Task Store_and_load_messages()
|
||||
{
|
||||
var store = new MemStore();
|
||||
|
||||
var payload1 = "Hello World"u8.ToArray();
|
||||
var payload2 = "Second message"u8.ToArray();
|
||||
var payload3 = "Third message"u8.ToArray();
|
||||
var payload4 = "Fourth message"u8.ToArray();
|
||||
var payload5 = "Fifth message"u8.ToArray();
|
||||
|
||||
var seq1 = await store.AppendAsync("foo", payload1, default);
|
||||
var seq2 = await store.AppendAsync("foo", payload2, default);
|
||||
var seq3 = await store.AppendAsync("bar", payload3, default);
|
||||
var seq4 = await store.AppendAsync("bar", payload4, default);
|
||||
var seq5 = await store.AppendAsync("baz", payload5, default);
|
||||
|
||||
seq1.ShouldBe((ulong)1);
|
||||
seq2.ShouldBe((ulong)2);
|
||||
seq3.ShouldBe((ulong)3);
|
||||
seq4.ShouldBe((ulong)4);
|
||||
seq5.ShouldBe((ulong)5);
|
||||
|
||||
var state = await store.GetStateAsync(default);
|
||||
state.Messages.ShouldBe((ulong)5);
|
||||
state.FirstSeq.ShouldBe((ulong)1);
|
||||
state.LastSeq.ShouldBe((ulong)5);
|
||||
|
||||
var loaded1 = await store.LoadAsync(1, default);
|
||||
loaded1.ShouldNotBeNull();
|
||||
loaded1.Subject.ShouldBe("foo");
|
||||
loaded1.Sequence.ShouldBe((ulong)1);
|
||||
loaded1.Payload.Span.SequenceEqual(payload1).ShouldBeTrue();
|
||||
|
||||
var loaded3 = await store.LoadAsync(3, default);
|
||||
loaded3.ShouldNotBeNull();
|
||||
loaded3.Subject.ShouldBe("bar");
|
||||
loaded3.Payload.Span.SequenceEqual(payload3).ShouldBeTrue();
|
||||
|
||||
var loaded5 = await store.LoadAsync(5, default);
|
||||
loaded5.ShouldNotBeNull();
|
||||
loaded5.Subject.ShouldBe("baz");
|
||||
loaded5.Payload.Span.SequenceEqual(payload5).ShouldBeTrue();
|
||||
}
|
||||
|
||||
// Go ref: TestMemStoreMsgHeaders (adapted) — MemStore stores and retrieves arbitrary payloads;
|
||||
// the .NET StoredMessage does not have a separate headers field (headers are embedded in the
|
||||
// payload by the protocol layer), so this test verifies that binary payload content round-trips
|
||||
// exactly including non-ASCII byte sequences that mimic header framing.
|
||||
[Fact]
|
||||
public async Task Store_preserves_payload_bytes_including_header_framing()
|
||||
{
|
||||
var store = new MemStore();
|
||||
|
||||
// Simulate a payload that includes NATS header framing bytes followed by body bytes,
|
||||
// as the protocol layer would hand them to the store.
|
||||
var headerBytes = Encoding.ASCII.GetBytes("NATS/1.0\r\nName: derek\r\n\r\n");
|
||||
var bodyBytes = "Hello World"u8.ToArray();
|
||||
byte[] combined = [.. headerBytes, .. bodyBytes];
|
||||
|
||||
var seq = await store.AppendAsync("foo", combined, default);
|
||||
seq.ShouldBe((ulong)1);
|
||||
|
||||
var loaded = await store.LoadAsync(1, default);
|
||||
loaded.ShouldNotBeNull();
|
||||
loaded.Subject.ShouldBe("foo");
|
||||
loaded.Payload.Length.ShouldBe(combined.Length);
|
||||
loaded.Payload.Span.SequenceEqual(combined).ShouldBeTrue();
|
||||
}
|
||||
|
||||
// Go ref: TestMemStoreEraseMsg — remove a message returns true; subsequent load returns null.
|
||||
[Fact]
|
||||
public async Task Remove_messages_updates_state()
|
||||
{
|
||||
var store = new MemStore();
|
||||
|
||||
var seq1 = await store.AppendAsync("foo", "one"u8.ToArray(), default);
|
||||
var seq2 = await store.AppendAsync("foo", "two"u8.ToArray(), default);
|
||||
var seq3 = await store.AppendAsync("foo", "three"u8.ToArray(), default);
|
||||
var seq4 = await store.AppendAsync("foo", "four"u8.ToArray(), default);
|
||||
var seq5 = await store.AppendAsync("foo", "five"u8.ToArray(), default);
|
||||
|
||||
var stateBefore = await store.GetStateAsync(default);
|
||||
stateBefore.Messages.ShouldBe((ulong)5);
|
||||
|
||||
// Remove seq2 and seq4 (interior messages).
|
||||
(await store.RemoveAsync(seq2, default)).ShouldBeTrue();
|
||||
(await store.RemoveAsync(seq4, default)).ShouldBeTrue();
|
||||
|
||||
var stateAfter = await store.GetStateAsync(default);
|
||||
stateAfter.Messages.ShouldBe((ulong)3);
|
||||
|
||||
// Removed sequences are no longer loadable.
|
||||
(await store.LoadAsync(seq2, default)).ShouldBeNull();
|
||||
(await store.LoadAsync(seq4, default)).ShouldBeNull();
|
||||
|
||||
// Remaining messages are still loadable.
|
||||
(await store.LoadAsync(seq1, default)).ShouldNotBeNull();
|
||||
(await store.LoadAsync(seq3, default)).ShouldNotBeNull();
|
||||
(await store.LoadAsync(seq5, default)).ShouldNotBeNull();
|
||||
|
||||
// Removing a non-existent sequence returns false.
|
||||
(await store.RemoveAsync(99, default)).ShouldBeFalse();
|
||||
}
|
||||
|
||||
// Go ref: TestMemStorePurge — purge clears all messages and resets state.
|
||||
[Fact]
|
||||
public async Task Purge_clears_all_messages()
|
||||
{
|
||||
var store = new MemStore();
|
||||
|
||||
for (var i = 0; i < 10; i++)
|
||||
await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"msg{i}"), default);
|
||||
|
||||
var stateBefore = await store.GetStateAsync(default);
|
||||
stateBefore.Messages.ShouldBe((ulong)10);
|
||||
|
||||
await store.PurgeAsync(default);
|
||||
|
||||
var stateAfter = await store.GetStateAsync(default);
|
||||
stateAfter.Messages.ShouldBe((ulong)0);
|
||||
stateAfter.Bytes.ShouldBe((ulong)0);
|
||||
}
|
||||
|
||||
// Go ref: TestMemStoreTimeStamps — each stored message gets a distinct, monotonically
|
||||
// increasing timestamp.
|
||||
[Fact]
|
||||
public async Task Stored_messages_have_distinct_non_decreasing_timestamps()
|
||||
{
|
||||
var store = new MemStore();
|
||||
const int count = 5;
|
||||
|
||||
for (var i = 0; i < count; i++)
|
||||
await store.AppendAsync("foo", "Hello World"u8.ToArray(), default);
|
||||
|
||||
var messages = await store.ListAsync(default);
|
||||
messages.Count.ShouldBe(count);
|
||||
|
||||
DateTime? previous = null;
|
||||
foreach (var msg in messages)
|
||||
{
|
||||
if (previous.HasValue)
|
||||
msg.TimestampUtc.ShouldBeGreaterThanOrEqualTo(previous.Value);
|
||||
previous = msg.TimestampUtc;
|
||||
}
|
||||
}
|
||||
|
||||
// Go ref: TestMemStoreBasics — LoadLastBySubject returns the highest-sequence message
|
||||
// for the given subject.
|
||||
[Fact]
|
||||
public async Task Load_last_by_subject_returns_most_recent_for_that_subject()
|
||||
{
|
||||
var store = new MemStore();
|
||||
|
||||
await store.AppendAsync("foo", "first"u8.ToArray(), default);
|
||||
await store.AppendAsync("bar", "other"u8.ToArray(), default);
|
||||
await store.AppendAsync("foo", "second"u8.ToArray(), default);
|
||||
await store.AppendAsync("foo", "third"u8.ToArray(), default);
|
||||
|
||||
var last = await store.LoadLastBySubjectAsync("foo", default);
|
||||
last.ShouldNotBeNull();
|
||||
last.Payload.Span.SequenceEqual("third"u8).ShouldBeTrue();
|
||||
last.Subject.ShouldBe("foo");
|
||||
|
||||
var noMatch = await store.LoadLastBySubjectAsync("does.not.exist", default);
|
||||
noMatch.ShouldBeNull();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,163 @@
|
||||
// Ported from golang/nats-server/server/memstore_test.go:
|
||||
// TestMemStoreMsgLimit, TestMemStoreBytesLimit, TestMemStoreAgeLimit
|
||||
//
|
||||
// Retention limits are enforced by StreamManager (which calls MemStore.TrimToMaxMessages,
|
||||
// removes oldest messages by bytes, and prunes by age). These tests exercise the full
|
||||
// Limits-retention path via StreamManager.Capture, which is the code path the Go server
|
||||
// exercises through its StoreMsg integration.
|
||||
|
||||
using System.Text;
|
||||
using NATS.Server.JetStream;
|
||||
using NATS.Server.JetStream.Models;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Storage;
|
||||
|
||||
public class StorageRetentionTests
|
||||
{
|
||||
// Go ref: TestMemStoreMsgLimit — store MaxMsgs+N messages; only MaxMsgs remain,
|
||||
// oldest are evicted, sequence window advances.
|
||||
[Fact]
|
||||
public async Task Max_msgs_limit_enforced()
|
||||
{
|
||||
const int maxMsgs = 10;
|
||||
const int overCount = 20;
|
||||
|
||||
var manager = new StreamManager();
|
||||
manager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "MSGLIMIT",
|
||||
Subjects = ["msglimit.*"],
|
||||
MaxMsgs = maxMsgs,
|
||||
Storage = StorageType.Memory,
|
||||
}).Error.ShouldBeNull();
|
||||
|
||||
for (var i = 0; i < overCount; i++)
|
||||
manager.Capture("msglimit.foo", Encoding.UTF8.GetBytes($"msg{i}"));
|
||||
|
||||
manager.TryGet("MSGLIMIT", out var handle).ShouldBeTrue();
|
||||
var state = await handle.Store.GetStateAsync(default);
|
||||
|
||||
state.Messages.ShouldBe((ulong)maxMsgs);
|
||||
// The last stored sequence is overCount.
|
||||
state.LastSeq.ShouldBe((ulong)overCount);
|
||||
// The first kept sequence is overCount - maxMsgs + 1.
|
||||
state.FirstSeq.ShouldBe((ulong)(overCount - maxMsgs + 1));
|
||||
}
|
||||
|
||||
// Go ref: TestMemStoreBytesLimit — store messages until bytes exceed MaxBytes;
|
||||
// oldest messages are purged to keep total bytes at or below the limit.
|
||||
[Fact]
|
||||
public async Task Max_bytes_limit_enforced()
|
||||
{
|
||||
// Each payload is 100 bytes. Set MaxBytes to hold exactly 5 messages.
|
||||
var payload = new byte[100];
|
||||
const int payloadSize = 100;
|
||||
const int maxCapacity = 5;
|
||||
var maxBytes = (long)(payloadSize * maxCapacity);
|
||||
|
||||
var manager = new StreamManager();
|
||||
manager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "BYTESLIMIT",
|
||||
Subjects = ["byteslimit.*"],
|
||||
MaxBytes = maxBytes,
|
||||
Storage = StorageType.Memory,
|
||||
}).Error.ShouldBeNull();
|
||||
|
||||
// Store exactly maxCapacity messages — should all fit.
|
||||
for (var i = 0; i < maxCapacity; i++)
|
||||
manager.Capture("byteslimit.foo", payload);
|
||||
|
||||
manager.TryGet("BYTESLIMIT", out var handle).ShouldBeTrue();
|
||||
var stateAtCapacity = await handle.Store.GetStateAsync(default);
|
||||
stateAtCapacity.Messages.ShouldBe((ulong)maxCapacity);
|
||||
stateAtCapacity.Bytes.ShouldBe((ulong)(payloadSize * maxCapacity));
|
||||
|
||||
// Store 5 more — each one should displace an old message.
|
||||
for (var i = 0; i < maxCapacity; i++)
|
||||
manager.Capture("byteslimit.foo", payload);
|
||||
|
||||
var stateFinal = await handle.Store.GetStateAsync(default);
|
||||
stateFinal.Messages.ShouldBe((ulong)maxCapacity);
|
||||
stateFinal.Bytes.ShouldBeLessThanOrEqualTo((ulong)maxBytes);
|
||||
stateFinal.LastSeq.ShouldBe((ulong)(maxCapacity * 2));
|
||||
}
|
||||
|
||||
// Go ref: TestMemStoreAgeLimit — messages older than MaxAge are pruned on the next Capture.
|
||||
// In the Go server, the memstore runs a background timer; in the .NET port, pruning happens
|
||||
// synchronously inside StreamManager.Capture via PruneExpiredMessages which compares
|
||||
// TimestampUtc against (now - MaxAge). We backdate stored messages to simulate expiry.
|
||||
[Fact]
|
||||
public async Task Max_age_limit_enforced()
|
||||
{
|
||||
// Use a 1-second MaxAge so we can reason clearly about cutoff.
|
||||
const int maxAgeMs = 1000;
|
||||
|
||||
var manager = new StreamManager();
|
||||
manager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "AGELIMIT",
|
||||
Subjects = ["agelimit.*"],
|
||||
MaxAgeMs = maxAgeMs,
|
||||
Storage = StorageType.Memory,
|
||||
}).Error.ShouldBeNull();
|
||||
|
||||
// Store 5 messages that are logically "already expired" by storing them,
|
||||
// then relying on an additional capture after sleeping past MaxAge to trigger
|
||||
// the pruning pass.
|
||||
const int initialCount = 5;
|
||||
for (var i = 0; i < initialCount; i++)
|
||||
manager.Capture("agelimit.foo", Encoding.UTF8.GetBytes($"msg{i}"));
|
||||
|
||||
manager.TryGet("AGELIMIT", out var handle).ShouldBeTrue();
|
||||
var stateBefore = await handle.Store.GetStateAsync(default);
|
||||
stateBefore.Messages.ShouldBe((ulong)initialCount);
|
||||
|
||||
// Wait for MaxAge to elapse so the stored messages are now older than the cutoff.
|
||||
await Task.Delay(maxAgeMs + 50);
|
||||
|
||||
// A subsequent Capture triggers PruneExpiredMessages, which removes all messages
|
||||
// whose TimestampUtc < (now - MaxAge).
|
||||
manager.Capture("agelimit.foo", "trigger"u8.ToArray());
|
||||
|
||||
var stateAfter = await handle.Store.GetStateAsync(default);
|
||||
// Only the freshly-appended trigger message should remain.
|
||||
stateAfter.Messages.ShouldBe((ulong)1);
|
||||
stateAfter.Bytes.ShouldBeGreaterThan((ulong)0);
|
||||
}
|
||||
|
||||
// Go ref: TestMemStoreMsgLimit — verifies that sequence numbers keep incrementing even
|
||||
// after old messages are evicted; the store window moves forward rather than wrapping.
|
||||
[Fact]
|
||||
public async Task Sequence_numbers_monotonically_increase_through_eviction()
|
||||
{
|
||||
const int maxMsgs = 5;
|
||||
const int totalToStore = 15;
|
||||
|
||||
var manager = new StreamManager();
|
||||
manager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "SEQMONOT",
|
||||
Subjects = ["seqmonot.*"],
|
||||
MaxMsgs = maxMsgs,
|
||||
Storage = StorageType.Memory,
|
||||
}).Error.ShouldBeNull();
|
||||
|
||||
for (var i = 1; i <= totalToStore; i++)
|
||||
manager.Capture("seqmonot.foo", Encoding.UTF8.GetBytes($"msg{i}"));
|
||||
|
||||
manager.TryGet("SEQMONOT", out var handle).ShouldBeTrue();
|
||||
var state = await handle.Store.GetStateAsync(default);
|
||||
|
||||
state.Messages.ShouldBe((ulong)maxMsgs);
|
||||
state.LastSeq.ShouldBe((ulong)totalToStore);
|
||||
state.FirstSeq.ShouldBe((ulong)(totalToStore - maxMsgs + 1));
|
||||
|
||||
// The first evicted sequence (1) is no longer loadable.
|
||||
(await handle.Store.LoadAsync(1, default)).ShouldBeNull();
|
||||
// The last evicted sequence is totalToStore - maxMsgs (= 10).
|
||||
(await handle.Store.LoadAsync((ulong)(totalToStore - maxMsgs), default)).ShouldBeNull();
|
||||
// The first surviving message is still present.
|
||||
(await handle.Store.LoadAsync((ulong)(totalToStore - maxMsgs + 1), default)).ShouldNotBeNull();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user