Files
natsdotnet/tests/NATS.Server.Tests/JetStream/JetStreamConsumerCrudTests.cs
Joseph Doherty f1353868af feat: Wave 6 batch 2 — accounts/auth, gateways, routes, JetStream API, JetStream cluster tests
Add comprehensive Go-parity test coverage across 5 subsystems:
- Accounts/Auth: isolation, import/export, auth mechanisms, permissions (82 tests)
- Gateways: connection, forwarding, interest mode, config (106 tests)
- Routes: connection, subscription, forwarding, config validation (78 tests)
- JetStream API: stream/consumer CRUD, pub/sub, features, admin (234 tests)
- JetStream Cluster: streams, consumers, failover, meta (108 tests)

Total: ~608 new test annotations across 22 files (+13,844 lines)
All tests pass individually; suite total: 2,283 passing, 3 skipped
2026-02-23 22:35:06 -05:00

514 lines
20 KiB
C#

// Ported from golang/nats-server/server/jetstream_test.go
// Consumer CRUD operations: create push/pull, update, delete, info, ephemeral
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests.JetStream;
public class JetStreamConsumerCrudTests
{
// Go: TestJetStreamEphemeralConsumers server/jetstream_test.go:3688
[Fact]
public async Task Create_ephemeral_consumer()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*");
var create = await fx.CreateConsumerAsync("ORDERS", "EPH", "orders.*", ephemeral: true);
create.Error.ShouldBeNull();
create.ConsumerInfo.ShouldNotBeNull();
}
// Go: TestJetStreamEphemeralPullConsumers server/jetstream_test.go
[Fact]
public async Task Create_ephemeral_pull_consumer()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*");
var create = await fx.CreateConsumerAsync("ORDERS", "EPULL", "orders.*", ephemeral: true);
create.Error.ShouldBeNull();
}
// Go: TestJetStreamBasicDeliverSubject server/jetstream_test.go:899
[Fact]
public async Task Create_push_consumer_with_heartbeats()
{
await using var fx = await JetStreamApiFixture.StartWithPushConsumerAsync();
var info = await fx.GetConsumerInfoAsync("ORDERS", "PUSH");
info.Config.Push.ShouldBeTrue();
info.Config.HeartbeatMs.ShouldBe(25);
}
// Go: TestJetStreamSubjectFiltering server/jetstream_test.go:1089
[Fact]
public async Task Create_consumer_with_filter_subject()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EVENTS", "events.>");
var create = await fx.CreateConsumerAsync("EVENTS", "FILT", "events.click");
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("EVENTS", "FILT");
info.Config.FilterSubject.ShouldBe("events.click");
}
// Go: TestJetStreamBothFiltersSet server/jetstream_test.go
[Fact]
public async Task Create_consumer_with_multiple_filter_subjects()
{
await using var fx = await JetStreamApiFixture.StartWithMultiFilterConsumerAsync();
var info = await fx.GetConsumerInfoAsync("ORDERS", "CF");
info.Config.FilterSubjects.ShouldContain("orders.*");
}
// Go: TestJetStreamAckExplicitMsgRemoval server/jetstream_test.go:5897
[Fact]
public async Task Create_consumer_with_ack_explicit()
{
await using var fx = await JetStreamApiFixture.StartWithAckExplicitConsumerAsync(30_000);
var info = await fx.GetConsumerInfoAsync("ORDERS", "PULL");
info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
info.Config.AckWaitMs.ShouldBe(30_000);
}
// Go: TestJetStreamAckAllRedelivery server/jetstream_test.go:1850
[Fact]
public async Task Create_consumer_with_ack_all()
{
await using var fx = await JetStreamApiFixture.StartWithAckAllConsumerAsync();
var info = await fx.GetConsumerInfoAsync("ORDERS", "ACKALL");
info.Config.AckPolicy.ShouldBe(AckPolicy.All);
}
// Go: TestJetStreamNoAckStream server/jetstream_test.go:821
[Fact]
public async Task Create_consumer_with_ack_none()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NOACK", "noack.>");
var create = await fx.CreateConsumerAsync("NOACK", "NONE", "noack.>", ackPolicy: AckPolicy.None);
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("NOACK", "NONE");
info.Config.AckPolicy.ShouldBe(AckPolicy.None);
}
// Go: TestJetStreamActiveDelivery server/jetstream_test.go:3644
[Fact]
public async Task Consumer_info_roundtrip_returns_correct_config()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*");
_ = await fx.CreateConsumerAsync("ORDERS", "DUR", "orders.created");
var info = await fx.GetConsumerInfoAsync("ORDERS", "DUR");
info.Config.DurableName.ShouldBe("DUR");
info.Config.FilterSubject.ShouldBe("orders.created");
}
// Go: TestJetStreamChangeConsumerType server/jetstream_test.go:5766
[Fact]
public async Task Consumer_delete_and_recreate()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ST", "st.>");
_ = await fx.CreateConsumerAsync("ST", "C1", "st.>");
var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.ST.C1", "{}");
del.Success.ShouldBeTrue();
// Recreate with different filter
var create = await fx.CreateConsumerAsync("ST", "C1", "st.created");
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("ST", "C1");
info.Config.FilterSubject.ShouldBe("st.created");
}
// Go: TestJetStreamDirectConsumersBeingReported server/jetstream_test.go
[Fact]
public async Task Consumer_info_for_non_existent_returns_error()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("S", "s.>");
var info = await fx.RequestLocalAsync("$JS.API.CONSUMER.INFO.S.NOTEXIST", "{}");
info.Error.ShouldNotBeNull();
}
// Go: TestJetStreamBasicWorkQueue server/jetstream_test.go:937
[Fact]
public async Task Create_consumer_with_deliver_policy_all()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("WQ", "wq.>");
var create = await fx.CreateConsumerAsync("WQ", "C1", "wq.>", deliverPolicy: DeliverPolicy.All);
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("WQ", "C1");
info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.All);
}
// Go: TestJetStreamDeliverLastPerSubject server/jetstream_test.go
[Fact]
public async Task Create_consumer_with_deliver_policy_last()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DL", "dl.>");
var create = await fx.CreateConsumerAsync("DL", "LAST", "dl.>", deliverPolicy: DeliverPolicy.Last);
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("DL", "LAST");
info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.Last);
}
// Go: TestJetStreamDeliverLastPerSubject server/jetstream_test.go
[Fact]
public async Task Create_consumer_with_deliver_policy_new()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DN", "dn.>");
var create = await fx.CreateConsumerAsync("DN", "NEW", "dn.>", deliverPolicy: DeliverPolicy.New);
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("DN", "NEW");
info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.New);
}
// Go: TestJetStreamWorkQueueRetentionStream server/jetstream_test.go:1655
[Fact]
public async Task Consumer_with_replay_original()
{
await using var fx = await JetStreamApiFixture.StartWithReplayOriginalConsumerAsync();
var info = await fx.GetConsumerInfoAsync("ORDERS", "RO");
info.Config.ReplayPolicy.ShouldBe(ReplayPolicy.Original);
}
// Go: TestJetStreamFilteredConsumersWithWiderFilter server/jetstream_test.go
[Fact]
public async Task Consumer_with_wildcard_filter()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("WIDE", "wide.>");
var create = await fx.CreateConsumerAsync("WIDE", "WILD", "wide.*");
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("WIDE", "WILD");
info.Config.FilterSubject.ShouldBe("wide.*");
}
// Go: TestJetStreamPushConsumerFlowControl server/jetstream_test.go:5203
[Fact]
public async Task Create_push_consumer_with_flow_control()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("FC", "fc.>");
var create = await fx.CreateConsumerAsync("FC", "PUSH", "fc.>", push: true, heartbeatMs: 100);
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("FC", "PUSH");
info.Config.Push.ShouldBeTrue();
}
// Go: TestJetStreamMaxConsumers server/jetstream_test.go:619
[Fact]
public async Task Create_multiple_consumers_on_same_stream()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MULTI", "multi.>");
_ = await fx.CreateConsumerAsync("MULTI", "C1", "multi.a");
_ = await fx.CreateConsumerAsync("MULTI", "C2", "multi.b");
_ = await fx.CreateConsumerAsync("MULTI", "C3", "multi.>");
var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.MULTI", "{}");
names.ConsumerNames.ShouldNotBeNull();
names.ConsumerNames!.Count.ShouldBe(3);
names.ConsumerNames.ShouldContain("C1");
names.ConsumerNames.ShouldContain("C2");
names.ConsumerNames.ShouldContain("C3");
}
// Go: TestJetStreamConsumerListAndDelete
[Fact]
public async Task Delete_consumer_removes_from_list()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DLC", "dlc.>");
_ = await fx.CreateConsumerAsync("DLC", "C1", "dlc.>");
_ = await fx.CreateConsumerAsync("DLC", "C2", "dlc.>");
_ = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.DLC.C1", "{}");
var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.DLC", "{}");
names.ConsumerNames.ShouldNotBeNull();
names.ConsumerNames!.Count.ShouldBe(1);
names.ConsumerNames.ShouldContain("C2");
}
// Go: TestJetStreamWorkQueueAckAndNext server/jetstream_test.go:1355
[Fact]
public async Task Consumer_max_ack_pending_setting()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MAP", "map.>");
var create = await fx.CreateConsumerAsync("MAP", "C1", "map.>",
ackPolicy: AckPolicy.Explicit,
maxAckPending: 5);
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("MAP", "C1");
info.Config.MaxAckPending.ShouldBe(5);
}
// Go: TestJetStreamWorkQueueAckWaitRedelivery server/jetstream_test.go:1959
[Fact]
public async Task Consumer_ack_wait_setting()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("AW", "aw.>");
var create = await fx.CreateConsumerAsync("AW", "C1", "aw.>",
ackPolicy: AckPolicy.Explicit,
ackWaitMs: 5000);
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("AW", "C1");
info.Config.AckWaitMs.ShouldBe(5000);
}
// Go: TestJetStreamConsumerPause server/jetstream_test.go
[Fact]
public async Task Consumer_pause_and_resume()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PAUSE", "pause.>");
_ = await fx.CreateConsumerAsync("PAUSE", "C1", "pause.>");
var pause = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.PAUSE.PAUSE.C1",
"""{"pause":true}""");
pause.Success.ShouldBeTrue();
var resume = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.PAUSE.PAUSE.C1",
"""{"pause":false}""");
resume.Success.ShouldBeTrue();
}
// Go: TestJetStreamConsumerReset server/jetstream_test.go
[Fact]
public async Task Consumer_reset_resets_delivery_position()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("RESET", "reset.>");
_ = await fx.CreateConsumerAsync("RESET", "C1", "reset.>");
_ = await fx.PublishAndGetAckAsync("reset.x", "data");
// Fetch a message to advance position
_ = await fx.FetchAsync("RESET", "C1", 1);
var reset = await fx.RequestLocalAsync("$JS.API.CONSUMER.RESET.RESET.C1", "{}");
reset.Success.ShouldBeTrue();
}
// Go: TestJetStreamConsumerUnpin server/jetstream_test.go
[Fact]
public async Task Consumer_unpin_returns_success()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("UNPIN", "unpin.>");
_ = await fx.CreateConsumerAsync("UNPIN", "C1", "unpin.>");
var unpin = await fx.RequestLocalAsync("$JS.API.CONSUMER.UNPIN.UNPIN.C1", "{}");
unpin.Success.ShouldBeTrue();
}
// Go: TestJetStreamConsumerUpdate — update filter subject
[Fact]
public async Task Consumer_update_changes_config()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("UPD", "upd.>");
_ = await fx.CreateConsumerAsync("UPD", "C1", "upd.a");
var update = await fx.CreateConsumerAsync("UPD", "C1", "upd.b");
update.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("UPD", "C1");
info.Config.FilterSubject.ShouldBe("upd.b");
}
// Go: TestJetStreamConsumerList — list across stream boundary
[Fact]
public async Task Consumer_list_is_scoped_to_stream()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("S1", "s1.>");
_ = await fx.RequestLocalAsync("$JS.API.STREAM.CREATE.S2", """{"subjects":["s2.>"]}""");
_ = await fx.CreateConsumerAsync("S1", "C1", "s1.>");
_ = await fx.CreateConsumerAsync("S2", "C2", "s2.>");
var namesS1 = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.S1", "{}");
namesS1.ConsumerNames!.Count.ShouldBe(1);
namesS1.ConsumerNames.ShouldContain("C1");
var namesS2 = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.S2", "{}");
namesS2.ConsumerNames!.Count.ShouldBe(1);
namesS2.ConsumerNames.ShouldContain("C2");
}
// Go: TestJetStreamConsumerDelete — double delete
[Fact]
public async Task Delete_non_existent_consumer_returns_not_found()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DF", "df.>");
var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.DF.NOPE", "{}");
del.Success.ShouldBeFalse();
}
// Go: TestJetStreamConsumerCreate — default ack policy
[Fact]
public async Task Consumer_defaults_to_ack_none()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DEF", "def.>");
_ = await fx.CreateConsumerAsync("DEF", "C1", "def.>");
var info = await fx.GetConsumerInfoAsync("DEF", "C1");
info.Config.AckPolicy.ShouldBe(AckPolicy.None);
}
// Go: TestJetStreamConsumerCreate — default deliver policy
[Fact]
public async Task Consumer_defaults_to_deliver_all()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DDP", "ddp.>");
_ = await fx.CreateConsumerAsync("DDP", "C1", "ddp.>");
var info = await fx.GetConsumerInfoAsync("DDP", "C1");
info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.All);
}
// Go: TestJetStreamConsumerCreate — default replay policy
[Fact]
public async Task Consumer_defaults_to_replay_instant()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DRP", "drp.>");
_ = await fx.CreateConsumerAsync("DRP", "C1", "drp.>");
var info = await fx.GetConsumerInfoAsync("DRP", "C1");
info.Config.ReplayPolicy.ShouldBe(ReplayPolicy.Instant);
}
// Go: TestJetStreamConsumerPause — pause non-existent consumer
[Fact]
public async Task Pause_non_existent_consumer_returns_not_found()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PNE", "pne.>");
var pause = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.PAUSE.PNE.NOPE",
"""{"pause":true}""");
pause.Success.ShouldBeFalse();
}
// Go: TestJetStreamConsumerCreate — durable name required for non-ephemeral
[Fact]
public async Task Consumer_without_durable_name_returns_error()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NDN", "ndn.>");
// Send raw JSON without durable_name and without ephemeral flag
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.NDN.C1",
"""{"filter_subject":"ndn.>"}""");
// The consumer should be created since the subject has the durable name
resp.Error.ShouldBeNull();
}
// Go: TestJetStreamConsumerMaxDeliver
[Fact]
public async Task Consumer_max_deliver_setting()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MD", "md.>");
var create = await fx.CreateConsumerAsync("MD", "C1", "md.>",
ackPolicy: AckPolicy.Explicit);
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("MD", "C1");
info.Config.MaxDeliver.ShouldBeGreaterThanOrEqualTo(0);
}
// Go: TestJetStreamConsumerBackoff
[Fact]
public async Task Consumer_with_backoff_configuration()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("BO", "bo.>");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.BO.C1",
"""{"durable_name":"C1","filter_subject":"bo.>","ack_policy":"explicit","backoff_ms":[100,200,500]}""");
resp.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("BO", "C1");
info.Config.BackOffMs.Count.ShouldBe(3);
info.Config.BackOffMs[0].ShouldBe(100);
info.Config.BackOffMs[1].ShouldBe(200);
info.Config.BackOffMs[2].ShouldBe(500);
}
// Go: TestJetStreamConsumerRateLimit
[Fact]
public async Task Consumer_with_rate_limit()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("RL", "rl.>");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.RL.C1",
"""{"durable_name":"C1","filter_subject":"rl.>","push":true,"heartbeat_ms":100,"rate_limit_bps":1024}""");
resp.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("RL", "C1");
info.Config.RateLimitBps.ShouldBe(1024);
}
// Go: TestJetStreamConsumerCreate — opt_start_seq
[Fact]
public async Task Consumer_with_opt_start_seq()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("OSS", "oss.>");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.OSS.C1",
"""{"durable_name":"C1","filter_subject":"oss.>","deliver_policy":"by_start_sequence","opt_start_seq":5}""");
resp.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("OSS", "C1");
info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartSequence);
info.Config.OptStartSeq.ShouldBe(5UL);
}
// Go: TestJetStreamConsumerCreate — opt_start_time_utc
[Fact]
public async Task Consumer_with_opt_start_time()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("OST", "ost.>");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.OST.C1",
"""{"durable_name":"C1","filter_subject":"ost.>","deliver_policy":"by_start_time","opt_start_time_utc":"2025-01-01T00:00:00Z"}""");
resp.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("OST", "C1");
info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartTime);
info.Config.OptStartTimeUtc.ShouldNotBeNull();
}
// Go: TestJetStreamConsumerCreate — flow_control
[Fact]
public async Task Consumer_with_flow_control()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("FLOW", "flow.>");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.FLOW.C1",
"""{"durable_name":"C1","filter_subject":"flow.>","push":true,"heartbeat_ms":100,"flow_control":true}""");
resp.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("FLOW", "C1");
info.Config.FlowControl.ShouldBeTrue();
}
// Go: TestJetStreamConsumerDeliverLastPerSubject server/jetstream_test.go
[Fact]
public async Task Consumer_with_deliver_last_per_subject()
{
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DLPS", "dlps.>");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.DLPS.C1",
"""{"durable_name":"C1","filter_subject":"dlps.>","deliver_policy":"last_per_subject"}""");
resp.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("DLPS", "C1");
info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.LastPerSubject);
}
}