Files
natsdotnet/tests/NATS.Server.Tests/JetStream/RetentionPolicyParityTests.cs
Joseph Doherty 61b1a00800 feat: phase C jetstream depth test parity — 34 new tests across 7 subsystems
Stream lifecycle, publish/ack, consumer delivery, retention policy,
API endpoints, cluster formation, and leader failover tests ported
from Go nats-server reference. 1006 total tests passing.
2026-02-23 19:55:31 -05:00

236 lines
9.5 KiB
C#

// Ported from golang/nats-server/server/jetstream_test.go:
// TestJetStreamLimitsRetention, TestJetStreamInterestStream,
// TestJetStreamWorkQueueRetention, TestJetStreamWorkQueueAckAll
//
// These tests exercise the three JetStream retention policies through
// StreamManager.Capture, which is the same code path the Go server uses
// when routing published messages into a stream store.
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Validation;
namespace NATS.Server.Tests.JetStream;
public class RetentionPolicyParityTests
{
// Go ref: TestJetStreamLimitsRetention — Limits retention keeps messages up to
// configured MaxMsgs cap, evicting oldest first. MaxMsgsPer limits per-subject depth.
// Sequence numbers advance monotonically even as old messages are dropped.
[Fact]
public async Task Limits_retention_evicts_oldest_when_max_msgs_exceeded()
{
const int maxMsgs = 3;
var manager = new StreamManager();
manager.CreateOrUpdate(new StreamConfig
{
Name = "LIMITS",
Subjects = ["limits.*"],
Retention = RetentionPolicy.Limits,
MaxMsgs = maxMsgs,
Storage = StorageType.Memory,
}).Error.ShouldBeNull();
// Publish more messages than the cap allows.
for (var i = 1; i <= 6; i++)
manager.Capture("limits.foo", Encoding.UTF8.GetBytes($"msg{i}"));
manager.TryGet("LIMITS", out var handle).ShouldBeTrue();
var state = await handle.Store.GetStateAsync(default);
// Only the last maxMsgs messages remain.
state.Messages.ShouldBe((ulong)maxMsgs);
// Sequence numbers are monotonically increasing — they do not wrap.
state.LastSeq.ShouldBe((ulong)6);
state.FirstSeq.ShouldBe((ulong)(6 - maxMsgs + 1));
// The evicted messages are no longer retrievable.
(await handle.Store.LoadAsync(1, default)).ShouldBeNull();
(await handle.Store.LoadAsync(2, default)).ShouldBeNull();
(await handle.Store.LoadAsync(3, default)).ShouldBeNull();
}
// Go ref: TestJetStreamLimitsRetention — MaxMsgsPer prunes per-subject depth independently
// of the global MaxMsgs cap under Limits retention.
[Fact]
public async Task Limits_retention_prunes_per_subject_depth_independently()
{
var manager = new StreamManager();
manager.CreateOrUpdate(new StreamConfig
{
Name = "LIMITS_PER",
Subjects = ["lper.*"],
Retention = RetentionPolicy.Limits,
MaxMsgsPer = 1,
Storage = StorageType.Memory,
}).Error.ShouldBeNull();
// Publish two messages to the same subject — only the latest survives.
manager.Capture("lper.a", "first"u8.ToArray());
manager.Capture("lper.a", "second"u8.ToArray());
// Publish to a different subject — it keeps its own slot.
manager.Capture("lper.b", "only"u8.ToArray());
manager.TryGet("LIMITS_PER", out var handle).ShouldBeTrue();
var state = await handle.Store.GetStateAsync(default);
// One message per subject: lper.a (seq=2), lper.b (seq=3).
state.Messages.ShouldBe((ulong)2);
// The first lper.a message was pruned.
(await handle.Store.LoadAsync(1, default)).ShouldBeNull();
// The second lper.a and the lper.b message survive.
(await handle.Store.LoadAsync(2, default)).ShouldNotBeNull();
(await handle.Store.LoadAsync(3, default)).ShouldNotBeNull();
}
// Go ref: TestJetStreamInterestStream — Interest retention behaves like Limits for
// bounded pruning (MaxMsgs, MaxMsgsPer, MaxAgeMs still apply). It does NOT use an
// ack-floor to remove messages; pruning is driven purely by limit configuration.
[Fact]
public async Task Interest_retention_applies_limits_pruning_but_not_ack_floor_pruning()
{
var consumers = new ConsumerManager();
var manager = new StreamManager(consumerManager: consumers);
manager.CreateOrUpdate(new StreamConfig
{
Name = "INTEREST",
Subjects = ["interest.*"],
Retention = RetentionPolicy.Interest,
MaxMsgs = 5,
Storage = StorageType.Memory,
}).Error.ShouldBeNull();
consumers.CreateOrUpdate("INTEREST", new ConsumerConfig
{
DurableName = "C1",
AckPolicy = AckPolicy.All,
}).Error.ShouldBeNull();
// Publish 3 messages and acknowledge through seq=2.
manager.Capture("interest.foo", "one"u8.ToArray());
manager.Capture("interest.foo", "two"u8.ToArray());
manager.Capture("interest.foo", "three"u8.ToArray());
consumers.AckAll("INTEREST", "C1", 2);
// Trigger a retention pass via another publish.
manager.Capture("interest.foo", "four"u8.ToArray());
manager.TryGet("INTEREST", out var handle).ShouldBeTrue();
var state = await handle.Store.GetStateAsync(default);
// Interest retention does NOT remove messages based on ack floor —
// all 4 messages remain because MaxMsgs=5 has not been exceeded.
state.Messages.ShouldBe((ulong)4);
}
// Go ref: TestJetStreamWorkQueueRetention — WorkQueue validation rejects a stream whose
// MaxConsumers is 0 (Go: ErrJetStreamWorkQueueMaxConsumers).
[Fact]
public void WorkQueue_retention_validation_rejects_zero_max_consumers()
{
var result = JetStreamConfigValidator.Validate(new StreamConfig
{
Name = "WQ_INVALID",
Subjects = ["wq.invalid"],
Retention = RetentionPolicy.WorkQueue,
MaxConsumers = 0,
});
result.IsValid.ShouldBeFalse();
result.Message.ShouldNotBeNullOrWhiteSpace();
}
// Go ref: TestJetStreamWorkQueueRetention — WorkQueue retention removes messages once
// a consumer's ack floor advances past them. Messages below the ack floor are pruned
// on the next Capture call; messages above it remain available.
[Fact]
public async Task WorkQueue_retention_removes_messages_below_ack_floor_on_next_publish()
{
var consumers = new ConsumerManager();
var manager = new StreamManager(consumerManager: consumers);
manager.CreateOrUpdate(new StreamConfig
{
Name = "WQ",
Subjects = ["wq.*"],
Retention = RetentionPolicy.WorkQueue,
MaxConsumers = 1,
Storage = StorageType.Memory,
}).Error.ShouldBeNull();
consumers.CreateOrUpdate("WQ", new ConsumerConfig
{
DurableName = "WORKER",
AckPolicy = AckPolicy.All,
}).Error.ShouldBeNull();
// Publish three messages.
manager.Capture("wq.a", "first"u8.ToArray());
manager.Capture("wq.a", "second"u8.ToArray());
manager.Capture("wq.a", "third"u8.ToArray());
// Acknowledge through seq=2 — floor advances to 2.
consumers.AckAll("WQ", "WORKER", 2).ShouldBeTrue();
// Next publish triggers the WorkQueue retention pass.
manager.Capture("wq.a", "fourth"u8.ToArray());
manager.TryGet("WQ", out var handle).ShouldBeTrue();
var state = await handle.Store.GetStateAsync(default);
// Messages 1 and 2 were at or below the ack floor and must be removed.
// Messages 3 and 4 are above the floor and must still be present.
state.Messages.ShouldBe((ulong)2);
(await handle.Store.LoadAsync(1, default)).ShouldBeNull();
(await handle.Store.LoadAsync(2, default)).ShouldBeNull();
(await handle.Store.LoadAsync(3, default)).ShouldNotBeNull();
(await handle.Store.LoadAsync(4, default)).ShouldNotBeNull();
}
// Go ref: TestJetStreamWorkQueueAckAll — a full AckAll to the last sequence causes
// all previously stored messages to be pruned on the next Capture. The stream then
// contains only the newly published message.
[Fact]
public async Task WorkQueue_retention_prunes_all_messages_when_ack_floor_reaches_last_seq()
{
var consumers = new ConsumerManager();
var manager = new StreamManager(consumerManager: consumers);
manager.CreateOrUpdate(new StreamConfig
{
Name = "WQ_FULL",
Subjects = ["wqf.*"],
Retention = RetentionPolicy.WorkQueue,
MaxConsumers = 1,
Storage = StorageType.Memory,
}).Error.ShouldBeNull();
consumers.CreateOrUpdate("WQ_FULL", new ConsumerConfig
{
DurableName = "WORKER",
AckPolicy = AckPolicy.All,
}).Error.ShouldBeNull();
manager.Capture("wqf.a", "one"u8.ToArray());
manager.Capture("wqf.a", "two"u8.ToArray());
manager.Capture("wqf.a", "three"u8.ToArray());
// Acknowledge through the last sequence — floor reaches seq=3.
consumers.AckAll("WQ_FULL", "WORKER", 3).ShouldBeTrue();
// Trigger retention pass.
manager.Capture("wqf.a", "four"u8.ToArray());
manager.TryGet("WQ_FULL", out var handle).ShouldBeTrue();
var state = await handle.Store.GetStateAsync(default);
// All three previously stored messages are pruned; only seq=4 remains.
state.Messages.ShouldBe((ulong)1);
state.LastSeq.ShouldBe((ulong)4);
(await handle.Store.LoadAsync(1, default)).ShouldBeNull();
(await handle.Store.LoadAsync(2, default)).ShouldBeNull();
(await handle.Store.LoadAsync(3, default)).ShouldBeNull();
(await handle.Store.LoadAsync(4, default)).ShouldNotBeNull();
}
}