// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
//            golang/nats-server/server/jetstream_cluster_2_test.go
// Covers: per-consumer RAFT groups, consumer assignment, ack state
//         replication, consumer failover, pull request forwarding, ephemeral
//         consumer lifecycle, delivery policy handling.
using System.Collections.Concurrent;
using System.Reflection;
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Storage;

namespace NATS.Server.JetStream.Tests.JetStream.Cluster;

/// <summary>
/// Tests covering per-consumer RAFT groups: consumer assignment, ack state
/// replication, consumer failover, pull request forwarding, ephemeral
/// consumer lifecycle, and delivery policy handling in clustered mode.
/// Ported from Go jetstream_cluster_1_test.go and jetstream_cluster_2_test.go.
/// </summary>
public class ConsumerReplicaGroupTests
{
    /// <summary>
    /// Publishes <paramref name="count"/> messages to <paramref name="subject"/>, one at a
    /// time and in order, with payloads <c>msg-0</c>, <c>msg-1</c>, ... up to <c>count - 1</c>.
    /// </summary>
    /// <param name="fx">Fixture used to publish.</param>
    /// <param name="subject">Subject every message is published to.</param>
    /// <param name="count">Number of messages to publish.</param>
    private static async Task PublishSeriesAsync(ConsumerReplicaFixture fx, string subject, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await fx.PublishAsync(subject, $"msg-{i}");
        }
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerState server/jetstream_cluster_1_test.go:700
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_creation_registers_in_manager()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("REG", ["reg.>"], replicas: 3);

        var resp = await fx.CreateConsumerAsync("REG", "d1");

        resp.ConsumerInfo.ShouldNotBeNull();
        resp.ConsumerInfo!.Config.DurableName.ShouldBe("d1");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerState server/jetstream_cluster_1_test.go:700
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_pending_count_tracks_unacked_messages()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PEND", ["pend.>"], replicas: 3);
        await fx.CreateConsumerAsync("PEND", "acker", filterSubject: "pend.>", ackPolicy: AckPolicy.Explicit);
        await PublishSeriesAsync(fx, "pend.event", 5);

        var batch = await fx.FetchAsync("PEND", "acker", 3);

        // Three of the five messages were fetched and none acknowledged.
        batch.Messages.Count.ShouldBe(3);
        fx.GetPendingCount("PEND", "acker").ShouldBe(3);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterFullConsumerState server/jetstream_cluster_1_test.go:795
    // ---------------------------------------------------------------

    [Fact]
    public async Task AckAll_reduces_pending_count()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("ACKRED", ["ar.>"], replicas: 3);
        await fx.CreateConsumerAsync("ACKRED", "acker", filterSubject: "ar.>", ackPolicy: AckPolicy.All);
        await PublishSeriesAsync(fx, "ar.event", 10);
        await fx.FetchAsync("ACKRED", "acker", 10);

        fx.AckAll("ACKRED", "acker", 7);

        // Ten fetched, acked through sequence 7: three remain pending.
        fx.GetPendingCount("ACKRED", "acker").ShouldBe(3);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterFullConsumerState server/jetstream_cluster_1_test.go:795
    // ---------------------------------------------------------------

    [Fact]
    public async Task AckAll_to_last_seq_clears_all_pending()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("ACKCLEAR", ["ac.>"], replicas: 3);
        await fx.CreateConsumerAsync("ACKCLEAR", "acker", filterSubject: "ac.>", ackPolicy: AckPolicy.All);
        await PublishSeriesAsync(fx, "ac.event", 5);
        await fx.FetchAsync("ACKCLEAR", "acker", 5);

        fx.AckAll("ACKCLEAR", "acker", 5);

        fx.GetPendingCount("ACKCLEAR", "acker").ShouldBe(0);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerRedeliveredInfo server/jetstream_cluster_1_test.go:659
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_redelivery_sets_redelivered_flag()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("REDEL", ["rd.>"], replicas: 3);
        await fx.CreateConsumerAsync("REDEL", "rdc", filterSubject: "rd.>",
            ackPolicy: AckPolicy.Explicit, ackWaitMs: 1, maxDeliver: 5);
        await fx.PublishAsync("rd.event", "will-redeliver");

        var batch1 = await fx.FetchAsync("REDEL", "rdc", 1);
        batch1.Messages.Count.ShouldBe(1);
        batch1.Messages[0].Redelivered.ShouldBeFalse();

        // The consumer was created with a 1 ms ack wait; pause well past it so the
        // unacknowledged message is offered again on the next fetch.
        await Task.Delay(50);

        var batch2 = await fx.FetchAsync("REDEL", "rdc", 1);
        batch2.Messages.Count.ShouldBe(1);
        batch2.Messages[0].Redelivered.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterRestoreSingleConsumer server/jetstream_cluster_1_test.go:1028
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_survives_stream_leader_stepdown()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CSURV", ["csv.>"], replicas: 3);
        await fx.CreateConsumerAsync("CSURV", "durable1", filterSubject: "csv.>");
        await PublishSeriesAsync(fx, "csv.event", 10);

        var batch1 = await fx.FetchAsync("CSURV", "durable1", 5);
        batch1.Messages.Count.ShouldBe(5);

        await fx.StepDownStreamLeaderAsync("CSURV");

        // The remaining five messages must still be deliverable after the step-down.
        var batch2 = await fx.FetchAsync("CSURV", "durable1", 5);
        batch2.Messages.Count.ShouldBe(5);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterPullConsumerLeakedSubs server/jetstream_cluster_2_test.go:2239
    // ---------------------------------------------------------------

    [Fact]
    public async Task Pull_consumer_fetch_returns_correct_batch()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PULL", ["pull.>"], replicas: 3);
        await fx.CreateConsumerAsync("PULL", "puller", filterSubject: "pull.>");
        await PublishSeriesAsync(fx, "pull.event", 20);

        var batch = await fx.FetchAsync("PULL", "puller", 5);

        batch.Messages.Count.ShouldBe(5);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerLastActiveReporting server/jetstream_cluster_2_test.go:2371
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_info_returns_correct_config()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("INFO", ["ci.>"], replicas: 3);
        await fx.CreateConsumerAsync("INFO", "info_dur", filterSubject: "ci.>", ackPolicy: AckPolicy.Explicit);

        var info = await fx.GetConsumerInfoAsync("INFO", "info_dur");

        info.Config.DurableName.ShouldBe("info_dur");
        info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterEphemeralConsumerNoImmediateInterest server/jetstream_cluster_1_test.go:2481
    // ---------------------------------------------------------------

    [Fact]
    public async Task Ephemeral_consumer_creation_succeeds()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("EPHEM", ["eph.>"], replicas: 3);

        var resp = await fx.CreateConsumerAsync("EPHEM", null, ephemeral: true);

        resp.ConsumerInfo.ShouldNotBeNull();
        resp.ConsumerInfo!.Config.DurableName.ShouldNotBeNullOrEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterEphemeralConsumersNotReplicated server/jetstream_cluster_1_test.go:2599
    // ---------------------------------------------------------------

    [Fact]
    public async Task Ephemeral_consumers_get_unique_names()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("UNIQ", ["u.>"], replicas: 3);

        var resp1 = await fx.CreateConsumerAsync("UNIQ", null, ephemeral: true);
        var resp2 = await fx.CreateConsumerAsync("UNIQ", null, ephemeral: true);

        // Assert presence first so a missing ConsumerInfo fails as an assertion,
        // not as a NullReferenceException on the dereference below.
        resp1.ConsumerInfo.ShouldNotBeNull();
        resp2.ConsumerInfo.ShouldNotBeNull();
        resp1.ConsumerInfo!.Config.DurableName
            .ShouldNotBe(resp2.ConsumerInfo!.Config.DurableName);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterCreateConcurrentDurableConsumers server/jetstream_cluster_2_test.go:1572
    // ---------------------------------------------------------------

    [Fact]
    public async Task Durable_consumer_create_is_idempotent()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("IDEMP", ["id.>"], replicas: 3);

        var resp1 = await fx.CreateConsumerAsync("IDEMP", "same");
        var resp2 = await fx.CreateConsumerAsync("IDEMP", "same");

        // Assert presence first so a missing ConsumerInfo fails as an assertion,
        // not as a NullReferenceException on the dereference below.
        resp1.ConsumerInfo.ShouldNotBeNull();
        resp2.ConsumerInfo.ShouldNotBeNull();
        resp1.ConsumerInfo!.Config.DurableName.ShouldBe("same");
        resp2.ConsumerInfo!.Config.DurableName.ShouldBe("same");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterMaxConsumers server/jetstream_cluster_2_test.go:1978
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_delete_succeeds()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("DEL", ["del.>"], replicas: 3);
        await fx.CreateConsumerAsync("DEL", "to_delete");

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}DEL.to_delete", "{}");

        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerPause server/jetstream_cluster_1_test.go:4203
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_pause_and_resume_via_api()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PAUSE", ["pause.>"], replicas: 3);
        await fx.CreateConsumerAsync("PAUSE", "pausable");

        var pause = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable", """{"pause":true}""");
        pause.Success.ShouldBeTrue();

        var resume = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable", """{"pause":false}""");
        resume.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerResetPendingDeliveriesOnMaxAckPendingUpdate
    //     server/jetstream_cluster_1_test.go:8696
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_reset_resets_sequence_to_beginning()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("RESET", ["reset.>"], replicas: 3);
        await fx.CreateConsumerAsync("RESET", "resettable", filterSubject: "reset.>");
        await PublishSeriesAsync(fx, "reset.event", 5);

        // Advance the consumer
        await fx.FetchAsync("RESET", "resettable", 3);

        // Reset
        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerReset}RESET.resettable", "{}");
        resp.Success.ShouldBeTrue();

        // After reset should re-deliver from sequence 1
        var batch = await fx.FetchAsync("RESET", "resettable", 5);
        batch.Messages.Count.ShouldBe(5);
        batch.Messages[0].Sequence.ShouldBe(1UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterFlowControlRequiresHeartbeats server/jetstream_cluster_2_test.go:2712
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_with_filter_subject_delivers_matching_only()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("FILT", ["filt.>"], replicas: 3);
        await fx.CreateConsumerAsync("FILT", "filtered", filterSubject: "filt.alpha");

        await fx.PublishAsync("filt.alpha", "match");
        await fx.PublishAsync("filt.beta", "no-match");
        await fx.PublishAsync("filt.alpha", "match2");

        var batch = await fx.FetchAsync("FILT", "filtered", 10);

        // Only the two filt.alpha messages pass the consumer's filter.
        batch.Messages.Count.ShouldBe(2);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerDeliverPolicy server/jetstream_cluster_2_test.go:550
    // ---------------------------------------------------------------

    [Fact]
    public async Task DeliverPolicy_Last_starts_at_last_message()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("DLAST", ["dl.>"], replicas: 3);
        await PublishSeriesAsync(fx, "dl.event", 5);

        // The consumer is created after the messages already exist.
        await fx.CreateConsumerAsync("DLAST", "last_c", filterSubject: "dl.>", deliverPolicy: DeliverPolicy.Last);

        var batch = await fx.FetchAsync("DLAST", "last_c", 10);
        batch.Messages.Count.ShouldBe(1);
        batch.Messages[0].Sequence.ShouldBe(5UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerDeliverPolicy server/jetstream_cluster_2_test.go:550
    // ---------------------------------------------------------------

    [Fact]
    public async Task DeliverPolicy_New_skips_existing_messages()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("DNEW", ["dn.>"], replicas: 3);
        await PublishSeriesAsync(fx, "dn.event", 5);

        // The consumer is created after the messages already exist.
        await fx.CreateConsumerAsync("DNEW", "new_c", filterSubject: "dn.>", deliverPolicy: DeliverPolicy.New);

        var batch = await fx.FetchAsync("DNEW", "new_c", 10);
        batch.Messages.Count.ShouldBe(0);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerDeliverPolicy server/jetstream_cluster_2_test.go:550
    // ---------------------------------------------------------------

    [Fact]
    public async Task DeliverPolicy_ByStartSequence_starts_at_given_seq()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("DSTART", ["ds.>"], replicas: 3);
        await PublishSeriesAsync(fx, "ds.event", 10);

        await fx.CreateConsumerAsync("DSTART", "start_c", filterSubject: "ds.>",
            deliverPolicy: DeliverPolicy.ByStartSequence, optStartSeq: 7);

        // Sequences 7 through 10 inclusive: four messages.
        var batch = await fx.FetchAsync("DSTART", "start_c", 10);
        batch.Messages.Count.ShouldBe(4);
        batch.Messages[0].Sequence.ShouldBe(7UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerUnpin server/jetstream_cluster_1_test.go:4109
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_unpin_api_returns_success()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("UNPIN", ["unpin.>"], replicas: 3);
        await fx.CreateConsumerAsync("UNPIN", "pinned");

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerUnpin}UNPIN.pinned", "{}");

        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerLeaderStepdown server/jetstream_cluster_2_test.go:1400
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_leader_stepdown_api_returns_success()
    {
        await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CLS", ["cls.>"], replicas: 3);
        await fx.CreateConsumerAsync("CLS", "dur1");

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}CLS.dur1", "{}");

        resp.Success.ShouldBeTrue();
    }
}

/// <summary>Self-contained fixture for consumer replica group tests.</summary>
///
/// <remarks>
/// Wires together one <see cref="JetStreamMetaGroup"/>, <see cref="ConsumerManager"/>,
/// <see cref="StreamManager"/>, <see cref="JetStreamApiRouter"/> and
/// <see cref="JetStreamPublisher"/>, and exposes thin task-returning helpers over them.
/// Instances are obtained through <see cref="StartAsync"/>.
/// </remarks>
internal sealed class ConsumerReplicaFixture : IAsyncDisposable
{
    // NOTE(review): the text this class was reviewed from had lost the generic type
    // arguments on every Task return type. Task<ConsumerReplicaFixture> is certain;
    // JetStreamApiResponse, PubAck, PullFetchBatch and JetStreamConsumerInfo are inferred
    // from how callers use the awaited results - confirm against the NATS.Server.JetStream
    // declarations before merging.

    private readonly JetStreamMetaGroup _metaGroup;
    private readonly StreamManager _streamManager;
    private readonly ConsumerManager _consumerManager;
    private readonly JetStreamApiRouter _router;
    private readonly JetStreamPublisher _publisher;

    private ConsumerReplicaFixture(
        JetStreamMetaGroup metaGroup,
        StreamManager streamManager,
        ConsumerManager consumerManager,
        JetStreamApiRouter router,
        JetStreamPublisher publisher)
    {
        _metaGroup = metaGroup;
        _streamManager = streamManager;
        _consumerManager = consumerManager;
        _router = router;
        _publisher = publisher;
    }

    /// <summary>
    /// Builds the meta group, managers, router and publisher and returns a ready fixture.
    /// </summary>
    /// <param name="nodes">Value passed to the <see cref="JetStreamMetaGroup"/> constructor.</param>
    /// <returns>An already-completed task holding the new fixture.</returns>
    public static Task<ConsumerReplicaFixture> StartAsync(int nodes)
    {
        var meta = new JetStreamMetaGroup(nodes);
        var consumerManager = new ConsumerManager(meta);
        var streamManager = new StreamManager(meta, consumerManager: consumerManager);
        var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
        var publisher = new JetStreamPublisher(streamManager);

        return Task.FromResult(
            new ConsumerReplicaFixture(meta, streamManager, consumerManager, router, publisher));
    }

    /// <summary>
    /// Creates or updates a stream through the stream manager.
    /// </summary>
    /// <param name="name">Stream name.</param>
    /// <param name="subjects">Subjects copied into the stream configuration.</param>
    /// <param name="replicas">Replica count stored in the stream configuration.</param>
    /// <exception cref="InvalidOperationException">
    /// The stream manager's response carried an error; the message is the error description.
    /// </exception>
    public Task CreateStreamAsync(string name, string[] subjects, int replicas)
    {
        var response = _streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = name,
            Subjects = [.. subjects],
            Replicas = replicas,
        });

        if (response.Error is not null)
        {
            throw new InvalidOperationException(response.Error.Description);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Creates or updates a consumer on <paramref name="stream"/> through the consumer manager.
    /// </summary>
    /// <param name="stream">Name of the stream that owns the consumer.</param>
    /// <param name="durableName">Durable name; <c>null</c> is stored as an empty string.</param>
    /// <param name="filterSubject">Filter subject; only applied when not null or whitespace.</param>
    /// <param name="ackPolicy">Acknowledgement policy copied into the configuration.</param>
    /// <param name="ackWaitMs">Ack wait, in milliseconds, copied into the configuration.</param>
    /// <param name="maxDeliver">Maximum delivery count copied into the configuration.</param>
    /// <param name="ephemeral">Value of the configuration's <c>Ephemeral</c> flag.</param>
    /// <param name="deliverPolicy">Delivery policy copied into the configuration.</param>
    /// <param name="optStartSeq">Start sequence copied into the configuration.</param>
    /// <returns>An already-completed task holding the consumer manager's response.</returns>
    public Task<JetStreamApiResponse> CreateConsumerAsync(
        string stream,
        string? durableName,
        string? filterSubject = null,
        AckPolicy ackPolicy = AckPolicy.None,
        int ackWaitMs = 30_000,
        int maxDeliver = 1,
        bool ephemeral = false,
        DeliverPolicy deliverPolicy = DeliverPolicy.All,
        ulong optStartSeq = 0)
    {
        var config = new ConsumerConfig
        {
            DurableName = durableName ?? string.Empty,
            AckPolicy = ackPolicy,
            AckWaitMs = ackWaitMs,
            MaxDeliver = maxDeliver,
            Ephemeral = ephemeral,
            DeliverPolicy = deliverPolicy,
            OptStartSeq = optStartSeq,
        };

        // Leave FilterSubject at whatever ConsumerConfig defaults to unless one was supplied.
        if (!string.IsNullOrWhiteSpace(filterSubject))
        {
            config.FilterSubject = filterSubject;
        }

        return Task.FromResult(_consumerManager.CreateOrUpdate(stream, config));
    }

    /// <summary>
    /// Publishes a UTF-8 encoded payload and, when the publish produced no error code,
    /// loads the stored message back and hands it to the consumer manager.
    /// </summary>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Text payload; encoded as UTF-8.</param>
    /// <returns>The publish acknowledgement produced by the publisher.</returns>
    /// <exception cref="InvalidOperationException">
    /// The publisher did not capture the subject. The exception surfaces when the
    /// returned task is awaited.
    /// </exception>
    public async Task<PubAck> PublishAsync(string subject, string payload)
    {
        if (!_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
        {
            throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");
        }

        if (ack.ErrorCode is null && _streamManager.TryGet(ack.Stream, out var handle))
        {
            // Await the load instead of blocking on GetAwaiter().GetResult().
            var stored = await handle.Store.LoadAsync(ack.Seq, default);
            if (stored is not null)
            {
                _consumerManager.OnPublished(ack.Stream, stored);
            }
        }

        return ack;
    }

    /// <summary>
    /// Fetches up to <paramref name="batch"/> messages for a consumer via the consumer manager.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="batch">Batch size passed to the consumer manager.</param>
    public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
        => _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();

    /// <summary>
    /// Forwards to the consumer manager's <c>AckAll</c> for the given consumer and sequence.
    /// </summary>
    public void AckAll(string stream, string durableName, ulong sequence)
        => _consumerManager.AckAll(stream, durableName, sequence);

    /// <summary>
    /// Returns the consumer manager's pending count for the given consumer.
    /// </summary>
    public int GetPendingCount(string stream, string durableName)
        => _consumerManager.GetPendingCount(stream, durableName);

    /// <summary>
    /// Looks up a consumer's info through the consumer manager.
    /// </summary>
    /// <exception cref="InvalidOperationException">The response carried no consumer info.</exception>
    public Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
    {
        var resp = _consumerManager.GetInfo(stream, durableName);
        if (resp.ConsumerInfo is null)
        {
            throw new InvalidOperationException("Consumer not found.");
        }

        return Task.FromResult(resp.ConsumerInfo);
    }

    /// <summary>
    /// Asks the stream manager to step down the leader of <paramref name="stream"/>.
    /// </summary>
    public Task StepDownStreamLeaderAsync(string stream)
        => _streamManager.StepDownStreamLeaderAsync(stream, default);

    /// <summary>
    /// Routes a raw API request through the router.
    /// </summary>
    /// <param name="subject">API subject to route.</param>
    /// <param name="payload">Request body; encoded as UTF-8.</param>
    /// <returns>An already-completed task holding the router's response.</returns>
    public Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
        => Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));

    /// <summary>
    /// No-op; present so tests can hold the fixture in an <c>await using</c> declaration.
    /// </summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}