// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
// Covers: consumer creation, ack propagation, consumer state,
// ephemeral consumers, consumer scaling, pull/push delivery,
// redelivery, ack policies, filter subjects.
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;

namespace NATS.Server.JetStream.Tests.JetStream.Cluster;

/// <summary>
/// Tests covering clustered JetStream consumer creation, leader election,
/// ack propagation, delivery policies, ephemeral consumers, and scaling.
/// Ported from Go jetstream_cluster_1_test.go and jetstream_cluster_2_test.go.
/// </summary>
public class JetStreamClusterConsumerTests
{
    // Pause between fetches in the redelivery tests; those consumers are created with ackWaitMs: 1.
    private const int AckExpiryDelayMs = 50;

    // Publishes `count` messages to `subject` with payloads "msg-0", "msg-1", ... in order.
    private static async Task PublishSequenceAsync(ClusterConsumerFixture fixture, string subject, int count)
    {
        for (var index = 0; index < count; index++)
        {
            await fixture.PublishAsync(subject, $"msg-{index}");
        }
    }

    // Go: TestJetStreamClusterConsumerState server/jetstream_cluster_1_test.go:700
    // Fetching 3 of 5 published messages with explicit acks leaves 3 pending.
    [Fact]
    public async Task Consumer_state_tracks_pending_after_fetch()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("CSTATE", ["cs.>"], replicas: 3);
        await cluster.CreateConsumerAsync(
            "CSTATE",
            "track",
            filterSubject: "cs.>",
            ackPolicy: AckPolicy.Explicit);
        await PublishSequenceAsync(cluster, "cs.event", 5);

        var fetched = await cluster.FetchAsync("CSTATE", "track", 3);

        fetched.Messages.Count.ShouldBe(3);
        cluster.GetPendingCount("CSTATE", "track").ShouldBe(3);
    }

    // Go: TestJetStreamClusterConsumerRedeliveredInfo server/jetstream_cluster_1_test.go:659
    // An un-acked message comes back flagged as redelivered once the ack wait has elapsed.
    [Fact]
    public async Task Consumer_redelivery_marks_messages_as_redelivered()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("REDELIV", ["rd.>"], replicas: 3);
        await cluster.CreateConsumerAsync(
            "REDELIV",
            "rdc",
            filterSubject: "rd.>",
            ackPolicy: AckPolicy.Explicit,
            ackWaitMs: 1,
            maxDeliver: 5);
        await cluster.PublishAsync("rd.event", "will-redeliver");

        // Initial delivery: not yet a redelivery.
        var firstDelivery = await cluster.FetchAsync("REDELIV", "rdc", 1);
        firstDelivery.Messages.Count.ShouldBe(1);
        firstDelivery.Messages[0].Redelivered.ShouldBeFalse();

        // Let the ack wait lapse without acknowledging.
        await Task.Delay(AckExpiryDelayMs);

        // The same message is handed out again, now marked as redelivered.
        var secondDelivery = await cluster.FetchAsync("REDELIV", "rdc", 1);
        secondDelivery.Messages.Count.ShouldBe(1);
        secondDelivery.Messages[0].Redelivered.ShouldBeTrue();
    }

    // Go: TestJetStreamClusterFullConsumerState server/jetstream_cluster_1_test.go:795
    // With AckPolicy.All, acking sequence 5 out of 10 fetched leaves 5 pending.
    [Fact]
    public async Task Full_consumer_state_reflects_ack_floor_after_ack_all()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("FULLCS", ["fcs.>"], replicas: 3);
        await cluster.CreateConsumerAsync(
            "FULLCS",
            "full",
            filterSubject: "fcs.>",
            ackPolicy: AckPolicy.All);
        await PublishSequenceAsync(cluster, "fcs.event", 10);

        var fetched = await cluster.FetchAsync("FULLCS", "full", 10);
        fetched.Messages.Count.ShouldBe(10);

        // Acknowledge everything up to and including sequence 5.
        cluster.AckAll("FULLCS", "full", 5);

        cluster.GetPendingCount("FULLCS", "full").ShouldBe(5);
    }

    // Go: TestJetStreamClusterEphemeralConsumerNoImmediateInterest server/jetstream_cluster_1_test.go:2481
    // Creating an ephemeral consumer without a name yields consumer info with a non-empty name.
    [Fact]
    public async Task Ephemeral_consumer_creation_succeeds()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("EPHEM", ["eph.>"], replicas: 3);

        var created = await cluster.CreateConsumerAsync("EPHEM", null, ephemeral: true);

        created.ConsumerInfo.ShouldNotBeNull();
        created.ConsumerInfo!.Config.DurableName.ShouldNotBeNullOrEmpty();
    }

    // Go: TestJetStreamClusterEphemeralConsumersNotReplicated server/jetstream_cluster_1_test.go:2599
    // Two unnamed ephemeral consumers on the same stream must not share a name.
    [Fact]
    public async Task Multiple_ephemeral_consumers_have_unique_names()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("EPHUNIQ", ["eu.>"], replicas: 3);

        var first = await cluster.CreateConsumerAsync("EPHUNIQ", null, ephemeral: true);
        var second = await cluster.CreateConsumerAsync("EPHUNIQ", null, ephemeral: true);

        first.ConsumerInfo!.Config.DurableName.ShouldNotBe(second.ConsumerInfo!.Config.DurableName);
    }

    // Go: TestJetStreamClusterCreateConcurrentDurableConsumers server/jetstream_cluster_2_test.go:1572
    // Creating the same durable twice succeeds both times and reports the same name.
    [Fact]
    public async Task Concurrent_durable_consumer_creation_is_idempotent()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("CONC", ["conc.>"], replicas: 3);

        var first = await cluster.CreateConsumerAsync("CONC", "same");
        var second = await cluster.CreateConsumerAsync("CONC", "same");

        first.ConsumerInfo.ShouldNotBeNull();
        second.ConsumerInfo.ShouldNotBeNull();
        first.ConsumerInfo!.Config.DurableName.ShouldBe("same");
        second.ConsumerInfo!.Config.DurableName.ShouldBe("same");
    }

    // Go: TestJetStreamClusterPullConsumerLeakedSubs server/jetstream_cluster_2_test.go:2239
    // A fetch of 5 against 20 available messages returns exactly 5.
    [Fact]
    public async Task Pull_consumer_fetch_returns_correct_batch_size()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("PULLBS", ["pb.>"], replicas: 3);
        await cluster.CreateConsumerAsync(
            "PULLBS",
            "puller",
            filterSubject: "pb.>",
            ackPolicy: AckPolicy.None);
        await PublishSequenceAsync(cluster, "pb.event", 20);

        var fetched = await cluster.FetchAsync("PULLBS", "puller", 5);

        fetched.Messages.Count.ShouldBe(5);
    }

    // Go: TestJetStreamClusterConsumerLastActiveReporting server/jetstream_cluster_2_test.go:2371
    // Consumer info echoes the durable name and ack policy the consumer was created with.
    [Fact]
    public async Task Consumer_info_returns_config_after_creation()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("CINFO", ["ci.>"], replicas: 3);
        await cluster.CreateConsumerAsync(
            "CINFO",
            "info_dur",
            filterSubject: "ci.>",
            ackPolicy: AckPolicy.Explicit);

        var consumerInfo = await cluster.GetConsumerInfoAsync("CINFO", "info_dur");

        consumerInfo.ShouldNotBeNull();
        consumerInfo.Config.DurableName.ShouldBe("info_dur");
        consumerInfo.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
    }

    // Go: TestJetStreamClusterAckPendingWithExpired server/jetstream_cluster_2_test.go:309
    // A fetched, un-acked message is counted as pending.
    [Fact]
    public async Task Ack_pending_tracks_expired_messages()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("ACKEXP", ["ae.>"], replicas: 3);
        await cluster.CreateConsumerAsync(
            "ACKEXP",
            "acker",
            filterSubject: "ae.>",
            ackPolicy: AckPolicy.Explicit,
            ackWaitMs: 1,
            maxDeliver: 10);
        await cluster.PublishAsync("ae.event", "will-expire");

        // The fetch is what registers the message as pending.
        var fetched = await cluster.FetchAsync("ACKEXP", "acker", 1);

        fetched.Messages.Count.ShouldBe(1);
        cluster.GetPendingCount("ACKEXP", "acker").ShouldBe(1);
    }

    // Go: TestJetStreamClusterAckPendingWithMaxRedelivered server/jetstream_cluster_2_test.go:377
    // With maxDeliver: 2 the message is handed out three times in total, then no more.
    [Fact]
    public async Task Max_deliver_limits_redelivery_attempts()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("MAXRED", ["mr.>"], replicas: 3);

        // maxDeliver=2: allows initial delivery (deliveries=1) + one redelivery (deliveries=2).
        // After ScheduleRedelivery increments to deliveries=2, the next check has
        // deliveries=2 > maxDeliver=2 = false, so it redelivers once more.
        // Only at deliveries=3 > 2 does it stop.
        await cluster.CreateConsumerAsync(
            "MAXRED",
            "maxr",
            filterSubject: "mr.>",
            ackPolicy: AckPolicy.Explicit,
            ackWaitMs: 1,
            maxDeliver: 2);
        await cluster.PublishAsync("mr.event", "limited-redeliver");

        // Fetch 1: initial delivery (Register sets deliveries=1).
        var initial = await cluster.FetchAsync("MAXRED", "maxr", 1);
        initial.Messages.Count.ShouldBe(1);

        await Task.Delay(AckExpiryDelayMs);

        // Fetch 2: TryGetExpired returns deliveries=1, 1 > 2 is false, so redeliver.
        // ScheduleRedelivery increments to deliveries=2.
        var firstRedelivery = await cluster.FetchAsync("MAXRED", "maxr", 1);
        firstRedelivery.Messages.Count.ShouldBe(1);
        firstRedelivery.Messages[0].Redelivered.ShouldBeTrue();

        await Task.Delay(AckExpiryDelayMs);

        // Fetch 3: TryGetExpired returns deliveries=2, 2 > 2 is false, so redeliver again.
        // ScheduleRedelivery increments to deliveries=3.
        var secondRedelivery = await cluster.FetchAsync("MAXRED", "maxr", 1);
        secondRedelivery.Messages.Count.ShouldBe(1);
        secondRedelivery.Messages[0].Redelivered.ShouldBeTrue();

        await Task.Delay(AckExpiryDelayMs);

        // Fetch 4: TryGetExpired returns deliveries=3, 3 > 2 is true, so AckAll triggers
        // and returns empty batch (max deliver exceeded).
        var exhausted = await cluster.FetchAsync("MAXRED", "maxr", 1);
        exhausted.Messages.Count.ShouldBe(0);
    }

    // Go: TestJetStreamClusterMaxConsumers server/jetstream_cluster_2_test.go:1978
    // Deleting a consumer through the API succeeds, and a later info request reports an error.
    [Fact]
    public async Task Consumer_delete_succeeds_in_cluster()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("CDEL", ["cdel.>"], replicas: 3);
        await cluster.CreateConsumerAsync("CDEL", "to_delete");

        var deleteResponse = await cluster.RequestAsync(
            $"{JetStreamApiSubjects.ConsumerDelete}CDEL.to_delete",
            "{}");
        deleteResponse.Success.ShouldBeTrue();

        var infoResponse = await cluster.RequestAsync(
            $"{JetStreamApiSubjects.ConsumerInfo}CDEL.to_delete",
            "{}");
        infoResponse.Error.ShouldNotBeNull();
    }

    // Go: TestJetStreamClusterFlowControlRequiresHeartbeats server/jetstream_cluster_2_test.go:2712
    // A consumer filtered on "filt.alpha" receives only the two messages on that subject.
    [Fact]
    public async Task Consumer_with_filter_subjects_delivers_matching_only()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("FILT", ["filt.>"], replicas: 3);
        await cluster.CreateConsumerAsync("FILT", "filtered", filterSubject: "filt.alpha");

        await cluster.PublishAsync("filt.alpha", "match");
        await cluster.PublishAsync("filt.beta", "no-match");
        await cluster.PublishAsync("filt.alpha", "match2");

        var delivered = await cluster.FetchAsync("FILT", "filtered", 10);

        delivered.Messages.Count.ShouldBe(2);
        delivered.Messages[0].Subject.ShouldBe("filt.alpha");
        delivered.Messages[1].Subject.ShouldBe("filt.alpha");
    }

    // Go: TestJetStreamClusterConsumerScaleUp server/jetstream_cluster_1_test.go:4203
    // Pause and resume requests against an existing consumer both report success.
    [Fact]
    public async Task Consumer_pause_and_resume_via_api()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("PAUSE", ["pause.>"], replicas: 3);
        await cluster.CreateConsumerAsync("PAUSE", "pausable");

        var paused = await cluster.RequestAsync(
            $"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable",
            """{"pause":true}""");
        paused.Success.ShouldBeTrue();

        var resumed = await cluster.RequestAsync(
            $"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable",
            """{"pause":false}""");
        resumed.Success.ShouldBeTrue();
    }

    // Go: TestJetStreamClusterConsumerResetPendingDeliveriesOnMaxAckPendingUpdate
    //     server/jetstream_cluster_1_test.go:8696
    // After a reset request the consumer delivers from sequence 1 again.
    [Fact]
    public async Task Consumer_reset_resets_next_sequence_and_returns_success()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("RESET", ["reset.>"], replicas: 3);
        await cluster.CreateConsumerAsync("RESET", "resettable", filterSubject: "reset.>");
        await PublishSequenceAsync(cluster, "reset.event", 5);

        // Advance the consumer part-way through the stream.
        var beforeReset = await cluster.FetchAsync("RESET", "resettable", 3);
        beforeReset.Messages.Count.ShouldBe(3);

        var resetResponse = await cluster.RequestAsync(
            $"{JetStreamApiSubjects.ConsumerReset}RESET.resettable",
            "{}");
        resetResponse.Success.ShouldBeTrue();

        // All five messages are available again, starting at sequence 1.
        var afterReset = await cluster.FetchAsync("RESET", "resettable", 5);
        afterReset.Messages.Count.ShouldBe(5);
        afterReset.Messages[0].Sequence.ShouldBe(1UL);
    }

    // Go: TestJetStreamClusterPushConsumerQueueGroup server/jetstream_cluster_2_test.go:2300
    // Push mode and heartbeat interval are reflected in the created consumer's config.
    [Fact]
    public async Task Push_consumer_creation_with_heartbeat()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("PUSHHB", ["ph.>"], replicas: 3);

        var created = await cluster.CreateConsumerAsync("PUSHHB", "pusher", push: true, heartbeatMs: 100);

        created.ConsumerInfo.ShouldNotBeNull();
        created.ConsumerInfo!.Config.Push.ShouldBeTrue();
        created.ConsumerInfo.Config.HeartbeatMs.ShouldBe(100);
    }

    // Go: TestJetStreamClusterScaleConsumer server/jetstream_cluster_1_test.go:4109
    // An unpin request against an existing consumer reports success.
    [Fact]
    public async Task Consumer_unpin_via_api()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("UNPIN", ["unpin.>"], replicas: 3);
        await cluster.CreateConsumerAsync("UNPIN", "pinned");

        var unpinResponse = await cluster.RequestAsync(
            $"{JetStreamApiSubjects.ConsumerUnpin}UNPIN.pinned",
            "{}");

        unpinResponse.Success.ShouldBeTrue();
    }

    // Additional: Consumer AckAll policy acks all up to given sequence
    [Fact]
    public async Task AckAll_policy_consumer_acks_all_preceding_messages()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("ACKALL", ["aa.>"], replicas: 3);
        await cluster.CreateConsumerAsync(
            "ACKALL",
            "acker",
            filterSubject: "aa.>",
            ackPolicy: AckPolicy.All);
        await PublishSequenceAsync(cluster, "aa.event", 10);

        var fetched = await cluster.FetchAsync("ACKALL", "acker", 10);
        fetched.Messages.Count.ShouldBe(10);

        // Ack up to seq 7 (all 1-7 should be acked, 8-10 remain pending).
        cluster.AckAll("ACKALL", "acker", 7);

        cluster.GetPendingCount("ACKALL", "acker").ShouldBe(3);
    }

    // Additional: DeliverPolicy.Last consumer starts at last message
    [Fact]
    public async Task DeliverPolicy_Last_consumer_starts_at_last_sequence()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("DLAST", ["dl.>"], replicas: 3);
        await PublishSequenceAsync(cluster, "dl.event", 5);

        // The consumer is created only after all five messages are in the stream.
        await cluster.CreateConsumerAsync(
            "DLAST",
            "last_cons",
            filterSubject: "dl.>",
            deliverPolicy: DeliverPolicy.Last);

        var fetched = await cluster.FetchAsync("DLAST", "last_cons", 10);

        fetched.Messages.Count.ShouldBe(1);
        fetched.Messages[0].Sequence.ShouldBe(5UL);
    }

    // Additional: DeliverPolicy.New consumer skips existing messages
    [Fact]
    public async Task DeliverPolicy_New_consumer_skips_existing()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("DNEW", ["dn.>"], replicas: 3);
        await PublishSequenceAsync(cluster, "dn.event", 5);

        await cluster.CreateConsumerAsync(
            "DNEW",
            "new_cons",
            filterSubject: "dn.>",
            deliverPolicy: DeliverPolicy.New);

        // Should get no messages since consumer starts at LastSeq+1.
        var beforePublish = await cluster.FetchAsync("DNEW", "new_cons", 10);
        beforePublish.Messages.Count.ShouldBe(0);

        // A message published after consumer creation is delivered.
        await cluster.PublishAsync("dn.event", "after-consumer");

        var afterPublish = await cluster.FetchAsync("DNEW", "new_cons", 10);
        afterPublish.Messages.Count.ShouldBe(1);
    }

    // Additional: DeliverPolicy.ByStartSequence
    [Fact]
    public async Task DeliverPolicy_ByStartSequence_starts_at_given_sequence()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("DSTART", ["ds.>"], replicas: 3);
        await PublishSequenceAsync(cluster, "ds.event", 10);

        await cluster.CreateConsumerAsync(
            "DSTART",
            "start_cons",
            filterSubject: "ds.>",
            deliverPolicy: DeliverPolicy.ByStartSequence,
            optStartSeq: 7);

        var fetched = await cluster.FetchAsync("DSTART", "start_cons", 10);

        fetched.Messages.Count.ShouldBe(4); // seq 7, 8, 9, 10
        fetched.Messages[0].Sequence.ShouldBe(7UL);
    }

    // Additional: Multiple filter subjects
    [Fact]
    public async Task Consumer_with_multiple_filter_subjects()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("MFILT", ["mf.>"], replicas: 3);
        await cluster.CreateConsumerAsync(
            "MFILT",
            "multi_filt",
            filterSubjects: ["mf.alpha", "mf.gamma"]);

        await cluster.PublishAsync("mf.alpha", "a");
        await cluster.PublishAsync("mf.beta", "b");
        await cluster.PublishAsync("mf.gamma", "g");
        await cluster.PublishAsync("mf.delta", "d");

        var fetched = await cluster.FetchAsync("MFILT", "multi_filt", 10);

        // Only the alpha and gamma messages match the two filters.
        fetched.Messages.Count.ShouldBe(2);
    }

    // Additional: NoWait fetch returns empty when no messages
    [Fact]
    public async Task NoWait_fetch_returns_empty_when_no_pending()
    {
        await using var cluster = await ClusterConsumerFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("NOWAIT", ["nw.>"], replicas: 3);
        await cluster.CreateConsumerAsync("NOWAIT", "nw_cons", filterSubject: "nw.>");

        var fetched = await cluster.FetchNoWaitAsync("NOWAIT", "nw_cons", 5);

        fetched.Messages.Count.ShouldBe(0);
    }
}

///
/// Self-contained fixture for JetStream cluster consumer tests.
///
/// Every collaborator is constructed directly in StartAsync and driven through plain
/// method calls on the stream manager, consumer manager, API router and publisher.
internal sealed class ClusterConsumerFixture : IAsyncDisposable
{
    // NOTE(review): the generic type arguments in this class (Task<...>, IReadOnlyList<string>)
    // were missing from the source this was reviewed from and have been reconstructed from
    // how the tests use each result. Confirm JetStreamApiResponse, PubAck, PullFetchBatch and
    // JetStreamConsumerInfo against the project's declarations.
    private readonly JetStreamMetaGroup _metaGroup;
    private readonly StreamManager _streamManager;
    private readonly ConsumerManager _consumerManager;
    private readonly JetStreamApiRouter _router;
    private readonly JetStreamPublisher _publisher;

    private ClusterConsumerFixture(
        JetStreamMetaGroup metaGroup,
        StreamManager streamManager,
        ConsumerManager consumerManager,
        JetStreamApiRouter router,
        JetStreamPublisher publisher)
    {
        _metaGroup = metaGroup;
        _streamManager = streamManager;
        _consumerManager = consumerManager;
        _router = router;
        _publisher = publisher;
    }

    /// <summary>
    /// Builds a fixture whose meta group is created with <paramref name="nodes"/> nodes and
    /// whose managers, router and publisher all share that meta group / stream manager.
    /// </summary>
    /// <param name="nodes">Node count passed to the <c>JetStreamMetaGroup</c> constructor.</param>
    /// <returns>An already-completed task holding the new fixture.</returns>
    public static Task<ClusterConsumerFixture> StartAsync(int nodes)
    {
        var meta = new JetStreamMetaGroup(nodes);
        var consumerManager = new ConsumerManager(meta);
        var streamManager = new StreamManager(meta, consumerManager: consumerManager);
        var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
        var publisher = new JetStreamPublisher(streamManager);

        return Task.FromResult(
            new ClusterConsumerFixture(meta, streamManager, consumerManager, router, publisher));
    }

    /// <summary>
    /// Creates (or updates) a stream with the given name, subjects and replica count.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The stream manager returned an error; the message is the error's description.
    /// </exception>
    public Task CreateStreamAsync(string name, string[] subjects, int replicas)
    {
        var response = _streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = name,
            Subjects = [.. subjects],
            Replicas = replicas,
        });

        if (response.Error is not null)
            throw new InvalidOperationException(response.Error.Description);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Creates (or updates) a consumer on <paramref name="stream"/> from the supplied options
    /// and returns the consumer manager's response unchanged.
    /// </summary>
    /// <param name="stream">Name of the stream the consumer is attached to.</param>
    /// <param name="durableName">Durable name; <c>null</c> is sent as an empty string.</param>
    /// <param name="filterSubject">Single filter subject; ignored when null or whitespace.</param>
    /// <param name="ackPolicy">Copied to <c>ConsumerConfig.AckPolicy</c>.</param>
    /// <param name="ackWaitMs">Copied to <c>ConsumerConfig.AckWaitMs</c>.</param>
    /// <param name="maxDeliver">Copied to <c>ConsumerConfig.MaxDeliver</c>.</param>
    /// <param name="ephemeral">Copied to <c>ConsumerConfig.Ephemeral</c>.</param>
    /// <param name="push">Copied to <c>ConsumerConfig.Push</c>.</param>
    /// <param name="heartbeatMs">Copied to <c>ConsumerConfig.HeartbeatMs</c>.</param>
    /// <param name="deliverPolicy">Copied to <c>ConsumerConfig.DeliverPolicy</c>.</param>
    /// <param name="optStartSeq">Copied to <c>ConsumerConfig.OptStartSeq</c>.</param>
    /// <param name="filterSubjects">Multiple filter subjects; ignored when null or empty.</param>
    public Task<JetStreamApiResponse> CreateConsumerAsync(
        string stream,
        string? durableName,
        string? filterSubject = null,
        AckPolicy ackPolicy = AckPolicy.None,
        int ackWaitMs = 30_000,
        int maxDeliver = 1,
        bool ephemeral = false,
        bool push = false,
        int heartbeatMs = 0,
        DeliverPolicy deliverPolicy = DeliverPolicy.All,
        ulong optStartSeq = 0,
        IReadOnlyList<string>? filterSubjects = null)
    {
        var config = new ConsumerConfig
        {
            DurableName = durableName ?? string.Empty,
            AckPolicy = ackPolicy,
            AckWaitMs = ackWaitMs,
            MaxDeliver = maxDeliver,
            Ephemeral = ephemeral,
            Push = push,
            HeartbeatMs = heartbeatMs,
            DeliverPolicy = deliverPolicy,
            OptStartSeq = optStartSeq,
        };

        // Filters are only assigned when supplied, leaving the config's own defaults otherwise.
        if (!string.IsNullOrWhiteSpace(filterSubject))
            config.FilterSubject = filterSubject;

        if (filterSubjects is { Count: > 0 })
            config.FilterSubjects = [.. filterSubjects];

        return Task.FromResult(_consumerManager.CreateOrUpdate(stream, config));
    }

    /// <summary>
    /// Publishes a UTF-8 payload and, when the publish was accepted without an error code,
    /// loads the stored message back and hands it to the consumer manager via
    /// <c>OnPublished</c>.
    /// </summary>
    /// <returns>The ack produced by the publisher.</returns>
    /// <exception cref="InvalidOperationException">
    /// The publisher did not capture the subject (no stream matched).
    /// </exception>
    public async Task<PubAck> PublishAsync(string subject, string payload)
    {
        if (!_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
            throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");

        if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var handle))
        {
            // Awaited rather than blocked on with GetAwaiter().GetResult(), so the calling
            // test thread is never parked if the store completes asynchronously.
            var stored = await handle.Store.LoadAsync(ack.Seq, default);
            if (stored != null)
                _consumerManager.OnPublished(ack.Stream, stored);
        }

        return ack;
    }

    /// <summary>Fetches up to <paramref name="batch"/> messages for the named consumer.</summary>
    public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
        => _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();

    /// <summary>
    /// Same as <see cref="FetchAsync"/> but issues a <c>PullFetchRequest</c> with
    /// <c>NoWait = true</c>.
    /// </summary>
    public Task<PullFetchBatch> FetchNoWaitAsync(string stream, string durableName, int batch)
        => _consumerManager.FetchAsync(
            stream,
            durableName,
            new PullFetchRequest
            {
                Batch = batch,
                NoWait = true,
            },
            _streamManager,
            default).AsTask();

    /// <summary>Forwards an ack-all for <paramref name="sequence"/> to the consumer manager.</summary>
    public void AckAll(string stream, string durableName, ulong sequence)
        => _consumerManager.AckAll(stream, durableName, sequence);

    /// <summary>Returns the consumer manager's pending count for the named consumer.</summary>
    public int GetPendingCount(string stream, string durableName)
        => _consumerManager.GetPendingCount(stream, durableName);

    /// <summary>Looks up consumer info through the consumer manager.</summary>
    /// <exception cref="InvalidOperationException">
    /// The response carried no consumer info.
    /// </exception>
    public Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
    {
        var resp = _consumerManager.GetInfo(stream, durableName);
        if (resp.ConsumerInfo == null)
            throw new InvalidOperationException("Consumer not found.");

        return Task.FromResult(resp.ConsumerInfo);
    }

    /// <summary>
    /// Routes a raw API request (subject plus UTF-8 encoded payload) through the API router.
    /// </summary>
    public Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
        => Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));

    /// <summary>Nothing is released here; the method completes immediately.</summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}