// Ported from golang/nats-server/server/jetstream_test.go // Consumer CRUD operations: create push/pull, update, delete, info, ephemeral using NATS.Server.JetStream.Models; using NATS.Server.TestUtilities; namespace NATS.Server.JetStream.Tests.JetStream; public class JetStreamConsumerCrudTests { // Go: TestJetStreamEphemeralConsumers server/jetstream_test.go:3688 [Fact] public async Task Create_ephemeral_consumer() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); var create = await fx.CreateConsumerAsync("ORDERS", "EPH", "orders.*", ephemeral: true); create.Error.ShouldBeNull(); create.ConsumerInfo.ShouldNotBeNull(); } // Go: TestJetStreamEphemeralPullConsumers server/jetstream_test.go [Fact] public async Task Create_ephemeral_pull_consumer() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); var create = await fx.CreateConsumerAsync("ORDERS", "EPULL", "orders.*", ephemeral: true); create.Error.ShouldBeNull(); } // Go: TestJetStreamBasicDeliverSubject server/jetstream_test.go:899 [Fact] public async Task Create_push_consumer_with_heartbeats() { await using var fx = await JetStreamApiFixture.StartWithPushConsumerAsync(); var info = await fx.GetConsumerInfoAsync("ORDERS", "PUSH"); info.Config.Push.ShouldBeTrue(); info.Config.HeartbeatMs.ShouldBe(25); } // Go: TestJetStreamSubjectFiltering server/jetstream_test.go:1089 [Fact] public async Task Create_consumer_with_filter_subject() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EVENTS", "events.>"); var create = await fx.CreateConsumerAsync("EVENTS", "FILT", "events.click"); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("EVENTS", "FILT"); info.Config.FilterSubject.ShouldBe("events.click"); } // Go: TestJetStreamBothFiltersSet server/jetstream_test.go [Fact] public async Task Create_consumer_with_multiple_filter_subjects() { await using var fx = await JetStreamApiFixture.StartWithMultiFilterConsumerAsync(); var info = await fx.GetConsumerInfoAsync("ORDERS", "CF"); info.Config.FilterSubjects.ShouldContain("orders.*"); } // Go: TestJetStreamAckExplicitMsgRemoval server/jetstream_test.go:5897 [Fact] public async Task Create_consumer_with_ack_explicit() { await using var fx = await JetStreamApiFixture.StartWithAckExplicitConsumerAsync(30_000); var info = await fx.GetConsumerInfoAsync("ORDERS", "PULL"); info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit); info.Config.AckWaitMs.ShouldBe(30_000); } // Go: TestJetStreamAckAllRedelivery server/jetstream_test.go:1850 [Fact] public async Task Create_consumer_with_ack_all() { await using var fx = await JetStreamApiFixture.StartWithAckAllConsumerAsync(); var info = await fx.GetConsumerInfoAsync("ORDERS", "ACKALL"); info.Config.AckPolicy.ShouldBe(AckPolicy.All); } // Go: TestJetStreamNoAckStream server/jetstream_test.go:821 [Fact] public async Task Create_consumer_with_ack_none() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NOACK", "noack.>"); var create = await fx.CreateConsumerAsync("NOACK", "NONE", "noack.>", ackPolicy: AckPolicy.None); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("NOACK", "NONE"); info.Config.AckPolicy.ShouldBe(AckPolicy.None); } // Go: TestJetStreamActiveDelivery server/jetstream_test.go:3644 [Fact] public async Task Consumer_info_roundtrip_returns_correct_config() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ORDERS", "orders.*"); _ = await fx.CreateConsumerAsync("ORDERS", "DUR", "orders.created"); var info = await fx.GetConsumerInfoAsync("ORDERS", "DUR"); info.Config.DurableName.ShouldBe("DUR"); info.Config.FilterSubject.ShouldBe("orders.created"); } // Go: TestJetStreamChangeConsumerType server/jetstream_test.go:5766 [Fact] public async Task Consumer_delete_and_recreate() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ST", "st.>"); _ = await fx.CreateConsumerAsync("ST", "C1", "st.>"); var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.ST.C1", "{}"); del.Success.ShouldBeTrue(); // Recreate with different filter var create = await fx.CreateConsumerAsync("ST", "C1", "st.created"); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("ST", "C1"); info.Config.FilterSubject.ShouldBe("st.created"); } // Go: TestJetStreamDirectConsumersBeingReported server/jetstream_test.go [Fact] public async Task Consumer_info_for_non_existent_returns_error() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("S", "s.>"); var info = await fx.RequestLocalAsync("$JS.API.CONSUMER.INFO.S.NOTEXIST", "{}"); info.Error.ShouldNotBeNull(); } // Go: TestJetStreamBasicWorkQueue server/jetstream_test.go:937 [Fact] public async Task Create_consumer_with_deliver_policy_all() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("WQ", "wq.>"); var create = await fx.CreateConsumerAsync("WQ", "C1", "wq.>", deliverPolicy: DeliverPolicy.All); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("WQ", "C1"); info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.All); } // Go: TestJetStreamDeliverLastPerSubject server/jetstream_test.go [Fact] public async Task Create_consumer_with_deliver_policy_last() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DL", "dl.>"); var create = await fx.CreateConsumerAsync("DL", "LAST", "dl.>", deliverPolicy: DeliverPolicy.Last); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("DL", "LAST"); info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.Last); } // Go: TestJetStreamDeliverLastPerSubject server/jetstream_test.go [Fact] public async Task Create_consumer_with_deliver_policy_new() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DN", "dn.>"); var create = await fx.CreateConsumerAsync("DN", "NEW", "dn.>", deliverPolicy: DeliverPolicy.New); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("DN", "NEW"); info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.New); } // Go: TestJetStreamWorkQueueRetentionStream server/jetstream_test.go:1655 [Fact] public async Task Consumer_with_replay_original() { await using var fx = await JetStreamApiFixture.StartWithReplayOriginalConsumerAsync(); var info = await fx.GetConsumerInfoAsync("ORDERS", "RO"); info.Config.ReplayPolicy.ShouldBe(ReplayPolicy.Original); } // Go: TestJetStreamFilteredConsumersWithWiderFilter server/jetstream_test.go [Fact] public async Task Consumer_with_wildcard_filter() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("WIDE", "wide.>"); var create = await fx.CreateConsumerAsync("WIDE", "WILD", "wide.*"); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("WIDE", "WILD"); info.Config.FilterSubject.ShouldBe("wide.*"); } // Go: TestJetStreamPushConsumerFlowControl server/jetstream_test.go:5203 [Fact] public async Task Create_push_consumer_with_flow_control() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("FC", "fc.>"); var create = await fx.CreateConsumerAsync("FC", "PUSH", "fc.>", push: true, heartbeatMs: 100); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("FC", "PUSH"); info.Config.Push.ShouldBeTrue(); } // Go: TestJetStreamMaxConsumers server/jetstream_test.go:619 [Fact] public async Task Create_multiple_consumers_on_same_stream() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MULTI", "multi.>"); _ = await fx.CreateConsumerAsync("MULTI", "C1", "multi.a"); _ = await fx.CreateConsumerAsync("MULTI", "C2", "multi.b"); _ = await fx.CreateConsumerAsync("MULTI", "C3", "multi.>"); var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.MULTI", "{}"); names.ConsumerNames.ShouldNotBeNull(); names.ConsumerNames!.Count.ShouldBe(3); names.ConsumerNames.ShouldContain("C1"); names.ConsumerNames.ShouldContain("C2"); names.ConsumerNames.ShouldContain("C3"); } // Go: TestJetStreamConsumerListAndDelete [Fact] public async Task Delete_consumer_removes_from_list() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DLC", "dlc.>"); _ = await fx.CreateConsumerAsync("DLC", "C1", "dlc.>"); _ = await fx.CreateConsumerAsync("DLC", "C2", "dlc.>"); _ = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.DLC.C1", "{}"); var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.DLC", "{}"); names.ConsumerNames.ShouldNotBeNull(); names.ConsumerNames!.Count.ShouldBe(1); names.ConsumerNames.ShouldContain("C2"); } // Go: TestJetStreamWorkQueueAckAndNext server/jetstream_test.go:1355 [Fact] public async Task Consumer_max_ack_pending_setting() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MAP", "map.>"); var create = await fx.CreateConsumerAsync("MAP", "C1", "map.>", ackPolicy: AckPolicy.Explicit, maxAckPending: 5); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("MAP", "C1"); info.Config.MaxAckPending.ShouldBe(5); } // Go: TestJetStreamWorkQueueAckWaitRedelivery server/jetstream_test.go:1959 [Fact] public async Task Consumer_ack_wait_setting() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("AW", "aw.>"); var create = await fx.CreateConsumerAsync("AW", "C1", "aw.>", ackPolicy: AckPolicy.Explicit, ackWaitMs: 5000); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("AW", "C1"); info.Config.AckWaitMs.ShouldBe(5000); } // Go: TestJetStreamConsumerPause server/jetstream_test.go [Fact] public async Task Consumer_pause_and_resume() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PAUSE", "pause.>"); _ = await fx.CreateConsumerAsync("PAUSE", "C1", "pause.>"); var pause = await fx.RequestLocalAsync( "$JS.API.CONSUMER.PAUSE.PAUSE.C1", """{"pause":true}"""); pause.Success.ShouldBeTrue(); var resume = await fx.RequestLocalAsync( "$JS.API.CONSUMER.PAUSE.PAUSE.C1", """{"pause":false}"""); resume.Success.ShouldBeTrue(); } // Go: TestJetStreamConsumerReset server/jetstream_test.go [Fact] public async Task Consumer_reset_resets_delivery_position() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("RESET", "reset.>"); _ = await fx.CreateConsumerAsync("RESET", "C1", "reset.>"); _ = await fx.PublishAndGetAckAsync("reset.x", "data"); // Fetch a message to advance position _ = await fx.FetchAsync("RESET", "C1", 1); var reset = await fx.RequestLocalAsync("$JS.API.CONSUMER.RESET.RESET.C1", "{}"); reset.Success.ShouldBeTrue(); } // Go: TestJetStreamConsumerUnpin server/jetstream_test.go [Fact] public async Task Consumer_unpin_returns_success() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("UNPIN", "unpin.>"); _ = await fx.CreateConsumerAsync("UNPIN", "C1", "unpin.>"); var unpin = await fx.RequestLocalAsync("$JS.API.CONSUMER.UNPIN.UNPIN.C1", "{}"); unpin.Success.ShouldBeTrue(); } // Go: TestJetStreamConsumerUpdate — update filter subject [Fact] public async Task Consumer_update_changes_config() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("UPD", "upd.>"); _ = await fx.CreateConsumerAsync("UPD", "C1", "upd.a"); var update = await fx.CreateConsumerAsync("UPD", "C1", "upd.b"); update.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("UPD", "C1"); info.Config.FilterSubject.ShouldBe("upd.b"); } // Go: TestJetStreamConsumerList — list across stream boundary [Fact] public async Task Consumer_list_is_scoped_to_stream() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("S1", "s1.>"); _ = await fx.RequestLocalAsync("$JS.API.STREAM.CREATE.S2", """{"subjects":["s2.>"]}"""); _ = await fx.CreateConsumerAsync("S1", "C1", "s1.>"); _ = await fx.CreateConsumerAsync("S2", "C2", "s2.>"); var namesS1 = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.S1", "{}"); namesS1.ConsumerNames!.Count.ShouldBe(1); namesS1.ConsumerNames.ShouldContain("C1"); var namesS2 = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.S2", "{}"); namesS2.ConsumerNames!.Count.ShouldBe(1); namesS2.ConsumerNames.ShouldContain("C2"); } // Go: TestJetStreamConsumerDelete — double delete [Fact] public async Task Delete_non_existent_consumer_returns_not_found() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DF", "df.>"); var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.DF.NOPE", "{}"); del.Success.ShouldBeFalse(); } // Go: TestJetStreamConsumerCreate — default ack policy [Fact] public async Task Consumer_defaults_to_ack_none() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DEF", "def.>"); _ = await fx.CreateConsumerAsync("DEF", "C1", "def.>"); var info = await fx.GetConsumerInfoAsync("DEF", "C1"); info.Config.AckPolicy.ShouldBe(AckPolicy.None); } // Go: TestJetStreamConsumerCreate — default deliver policy [Fact] public async Task Consumer_defaults_to_deliver_all() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DDP", "ddp.>"); _ = await fx.CreateConsumerAsync("DDP", "C1", "ddp.>"); var info = await fx.GetConsumerInfoAsync("DDP", "C1"); info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.All); } // Go: TestJetStreamConsumerCreate — default replay policy [Fact] public async Task Consumer_defaults_to_replay_instant() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DRP", "drp.>"); _ = await fx.CreateConsumerAsync("DRP", "C1", "drp.>"); var info = await fx.GetConsumerInfoAsync("DRP", "C1"); info.Config.ReplayPolicy.ShouldBe(ReplayPolicy.Instant); } // Go: TestJetStreamConsumerPause — pause non-existent consumer [Fact] public async Task Pause_non_existent_consumer_returns_not_found() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PNE", "pne.>"); var pause = await fx.RequestLocalAsync( "$JS.API.CONSUMER.PAUSE.PNE.NOPE", """{"pause":true}"""); pause.Success.ShouldBeFalse(); } // Go: TestJetStreamConsumerCreate — durable name required for non-ephemeral [Fact] public async Task Consumer_without_durable_name_returns_error() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NDN", "ndn.>"); // Send raw JSON without durable_name and without ephemeral flag var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.NDN.C1", """{"filter_subject":"ndn.>"}"""); // The consumer should be created since the subject has the durable name resp.Error.ShouldBeNull(); } // Go: TestJetStreamConsumerMaxDeliver [Fact] public async Task Consumer_max_deliver_setting() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MD", "md.>"); var create = await fx.CreateConsumerAsync("MD", "C1", "md.>", ackPolicy: AckPolicy.Explicit); create.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("MD", "C1"); info.Config.MaxDeliver.ShouldBeGreaterThanOrEqualTo(0); } // Go: TestJetStreamConsumerBackoff [Fact] public async Task Consumer_with_backoff_configuration() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("BO", "bo.>"); var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.BO.C1", """{"durable_name":"C1","filter_subject":"bo.>","ack_policy":"explicit","backoff_ms":[100,200,500]}"""); resp.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("BO", "C1"); info.Config.BackOffMs.Count.ShouldBe(3); info.Config.BackOffMs[0].ShouldBe(100); info.Config.BackOffMs[1].ShouldBe(200); info.Config.BackOffMs[2].ShouldBe(500); } // Go: TestJetStreamConsumerRateLimit [Fact] public async Task Consumer_with_rate_limit() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("RL", "rl.>"); var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.RL.C1", """{"durable_name":"C1","filter_subject":"rl.>","push":true,"heartbeat_ms":100,"rate_limit_bps":1024}"""); resp.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("RL", "C1"); info.Config.RateLimitBps.ShouldBe(1024); } // Go: TestJetStreamConsumerCreate — opt_start_seq [Fact] public async Task Consumer_with_opt_start_seq() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("OSS", "oss.>"); var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.OSS.C1", """{"durable_name":"C1","filter_subject":"oss.>","deliver_policy":"by_start_sequence","opt_start_seq":5}"""); resp.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("OSS", "C1"); info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartSequence); info.Config.OptStartSeq.ShouldBe(5UL); } // Go: TestJetStreamConsumerCreate — opt_start_time_utc [Fact] public async Task Consumer_with_opt_start_time() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("OST", "ost.>"); var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.OST.C1", """{"durable_name":"C1","filter_subject":"ost.>","deliver_policy":"by_start_time","opt_start_time_utc":"2025-01-01T00:00:00Z"}"""); resp.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("OST", "C1"); info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartTime); info.Config.OptStartTimeUtc.ShouldNotBeNull(); } // Go: TestJetStreamConsumerCreate — flow_control [Fact] public async Task Consumer_with_flow_control() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("FLOW", "flow.>"); var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.FLOW.C1", """{"durable_name":"C1","filter_subject":"flow.>","push":true,"heartbeat_ms":100,"flow_control":true}"""); resp.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("FLOW", "C1"); info.Config.FlowControl.ShouldBeTrue(); } // Go: TestJetStreamConsumerDeliverLastPerSubject server/jetstream_test.go [Fact] public async Task Consumer_with_deliver_last_per_subject() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DLPS", "dlps.>"); var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.DLPS.C1", """{"durable_name":"C1","filter_subject":"dlps.>","deliver_policy":"last_per_subject"}"""); resp.Error.ShouldBeNull(); var info = await fx.GetConsumerInfoAsync("DLPS", "C1"); info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.LastPerSubject); } }