Files
natsdotnet/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterConsumerReplicationTests.cs
Joseph Doherty 163667bbe2 feat: add JetStream cluster consumer replication tests (Go parity)
Add 60 tests in JsClusterConsumerReplicationTests covering consumer
creation, fetch/delivery, ack tracking, leader failover, state
consistency, and edge cases. Ported from Go jetstream_cluster_2_test.go.
2026-02-24 08:39:32 -05:00

1141 lines
49 KiB
C#

// Go ref: TestJetStreamClusterConsumerReplication — jetstream_cluster_2_test.go
// Covers: consumer creation in cluster, fetch & delivery, ack tracking,
// leader failover for consumers, state consistency, and edge cases.
using System.Text;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests.JetStream.Cluster;
/// <summary>
/// Tests covering JetStream cluster consumer replication: creation basics,
/// fetch/delivery, ack tracking, leader failover, state consistency, and edge cases.
/// Ported from Go jetstream_cluster_2_test.go.
/// </summary>
public class JsClusterConsumerReplicationTests
{
// ---------------------------------------------------------------
// Consumer creation & basics
// ---------------------------------------------------------------
// Go ref: TestJetStreamClusterBasicAckPublishSubscribe — jetstream_cluster_2_test.go
[Fact]
public async Task Durable_consumer_creation_succeeds_in_three_node_cluster()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("BASIC", ["basic.>"], replicas: 3);
var resp = await cluster.CreateConsumerAsync("BASIC", "dlc", ackPolicy: AckPolicy.Explicit);
resp.Error.ShouldBeNull();
resp.ConsumerInfo.ShouldNotBeNull();
resp.ConsumerInfo!.Config.DurableName.ShouldBe("dlc");
}
// Go ref: TestJetStreamClusterConsumerLeaderElection — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_info_shows_correct_stream_name_in_config()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("CINFO", ["cinfo.>"], replicas: 3);
await cluster.CreateConsumerAsync("CINFO", "dur1", ackPolicy: AckPolicy.Explicit);
var info = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}CINFO.dur1", "{}");
info.Error.ShouldBeNull();
info.ConsumerInfo.ShouldNotBeNull();
info.ConsumerInfo!.Config.DurableName.ShouldBe("dur1");
}
// Go ref: TestJetStreamClusterConsumerLeaderElection — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_leader_exists_after_creation()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("CLEADER", ["cl.>"], replicas: 3);
await cluster.CreateConsumerAsync("CLEADER", "leader_cons");
var leaderId = cluster.GetConsumerLeaderId("CLEADER", "leader_cons");
leaderId.ShouldNotBeNullOrEmpty();
}
// Go ref: TestJetStreamClusterMultipleConsumers — jetstream_cluster_2_test.go
[Fact]
public async Task Multiple_consumers_on_same_stream_all_created_successfully()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("MULTI", ["multi.>"], replicas: 3);
var resp1 = await cluster.CreateConsumerAsync("MULTI", "cons1");
var resp2 = await cluster.CreateConsumerAsync("MULTI", "cons2");
var resp3 = await cluster.CreateConsumerAsync("MULTI", "cons3");
resp1.Error.ShouldBeNull();
resp2.Error.ShouldBeNull();
resp3.Error.ShouldBeNull();
resp1.ConsumerInfo!.Config.DurableName.ShouldBe("cons1");
resp2.ConsumerInfo!.Config.DurableName.ShouldBe("cons2");
resp3.ConsumerInfo!.Config.DurableName.ShouldBe("cons3");
}
// Go ref: TestJetStreamClusterConsumerFilterSubject — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_with_filter_subject_is_created_successfully()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("FILT", ["filt.>"], replicas: 3);
var resp = await cluster.CreateConsumerAsync("FILT", "filtered",
filterSubject: "filt.alpha", ackPolicy: AckPolicy.Explicit);
resp.Error.ShouldBeNull();
resp.ConsumerInfo.ShouldNotBeNull();
resp.ConsumerInfo!.Config.FilterSubject.ShouldBe("filt.alpha");
}
// Go ref: TestJetStreamClusterConsumerAckPolicyExplicit — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_with_explicit_ack_policy_stores_correct_policy()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("EXPLACK", ["ea.>"], replicas: 3);
var resp = await cluster.CreateConsumerAsync("EXPLACK", "expl",
ackPolicy: AckPolicy.Explicit);
resp.ConsumerInfo!.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
}
// Go ref: TestJetStreamClusterConsumerAckPolicyNone — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_with_no_ack_policy_stores_correct_policy()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("NOACK", ["na.>"], replicas: 3);
var resp = await cluster.CreateConsumerAsync("NOACK", "noackcons",
ackPolicy: AckPolicy.None);
resp.ConsumerInfo!.Config.AckPolicy.ShouldBe(AckPolicy.None);
}
// Go ref: TestJetStreamClusterConsumerOnR1Stream — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_on_R1_stream_is_created_successfully()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R1STREAM", ["r1.>"], replicas: 1);
var resp = await cluster.CreateConsumerAsync("R1STREAM", "r1cons");
resp.Error.ShouldBeNull();
resp.ConsumerInfo.ShouldNotBeNull();
}
// Go ref: TestJetStreamClusterConsumerOnR3Stream — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_on_R3_stream_is_created_successfully()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R3STREAM", ["r3.>"], replicas: 3);
var resp = await cluster.CreateConsumerAsync("R3STREAM", "r3cons");
resp.Error.ShouldBeNull();
resp.ConsumerInfo.ShouldNotBeNull();
}
// Go ref: TestJetStreamClusterConsumerOnMemoryStream — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_on_memory_storage_stream_is_created_successfully()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("MEMSTR", ["mem.>"], replicas: 3,
storage: StorageType.Memory);
var backend = cluster.GetStoreBackendType("MEMSTR");
backend.ShouldBe("memory");
var resp = await cluster.CreateConsumerAsync("MEMSTR", "memcons");
resp.Error.ShouldBeNull();
}
// Go ref: TestJetStreamClusterConsumerOnFileStream — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_on_file_storage_stream_is_created_successfully()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("FILESTR", ["file.>"], replicas: 3,
storage: StorageType.File);
var backend = cluster.GetStoreBackendType("FILESTR");
backend.ShouldBe("file");
var resp = await cluster.CreateConsumerAsync("FILESTR", "filecons");
resp.Error.ShouldBeNull();
}
// ---------------------------------------------------------------
// Fetch & delivery
// ---------------------------------------------------------------
// Go ref: TestJetStreamClusterFetchReturnsPublishedMessages — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_returns_published_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("FETCHTEST", ["ft.>"], replicas: 3);
await cluster.CreateConsumerAsync("FETCHTEST", "fetcher", ackPolicy: AckPolicy.None);
await cluster.PublishAsync("ft.event", "msg1");
await cluster.PublishAsync("ft.event", "msg2");
await cluster.PublishAsync("ft.event", "msg3");
var batch = await cluster.FetchAsync("FETCHTEST", "fetcher", 10);
batch.Messages.Count.ShouldBe(3);
}
// Go ref: TestJetStreamClusterFetchBatchSizeLimits — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_batch_size_limits_results_returned()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("BATCHLIM", ["bl.>"], replicas: 3);
await cluster.CreateConsumerAsync("BATCHLIM", "batcher");
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("bl.event", $"msg-{i}");
var batch = await cluster.FetchAsync("BATCHLIM", "batcher", 3);
batch.Messages.Count.ShouldBe(3);
}
// Go ref: TestJetStreamClusterFetchEmptyStream — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_with_no_messages_returns_empty_batch()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("EMPTYFETCH", ["ef.>"], replicas: 3);
await cluster.CreateConsumerAsync("EMPTYFETCH", "emptyfetcher");
var batch = await cluster.FetchAsync("EMPTYFETCH", "emptyfetcher", 10);
batch.Messages.Count.ShouldBe(0);
}
// Go ref: TestJetStreamClusterFetchAfterMultiplePublishes — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_after_multiple_publishes_returns_all_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("MULTIPUB", ["mp.>"], replicas: 3);
await cluster.CreateConsumerAsync("MULTIPUB", "mcons");
for (var i = 0; i < 20; i++)
await cluster.PublishAsync("mp.event", $"msg-{i}");
var batch = await cluster.FetchAsync("MULTIPUB", "mcons", 20);
batch.Messages.Count.ShouldBe(20);
}
// Go ref: TestJetStreamClusterSequentialFetchesReturnSubsequentMessages — jetstream_cluster_2_test.go
[Fact]
public async Task Sequential_fetches_return_subsequent_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("SEQFETCH", ["sf.>"], replicas: 3);
await cluster.CreateConsumerAsync("SEQFETCH", "seqcons");
for (var i = 0; i < 6; i++)
await cluster.PublishAsync("sf.event", $"msg-{i}");
var batch1 = await cluster.FetchAsync("SEQFETCH", "seqcons", 3);
var batch2 = await cluster.FetchAsync("SEQFETCH", "seqcons", 3);
batch1.Messages.Count.ShouldBe(3);
batch2.Messages.Count.ShouldBe(3);
batch1.Messages[0].Sequence.ShouldBe(1UL);
batch2.Messages[0].Sequence.ShouldBe(4UL);
}
// Go ref: TestJetStreamClusterFetchRespectsFilterSubject — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_respects_consumer_filter_subject()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("FILTFETCH", ["ff.>"], replicas: 3);
await cluster.CreateConsumerAsync("FILTFETCH", "filtcons",
filterSubject: "ff.alpha");
await cluster.PublishAsync("ff.alpha", "match-1");
await cluster.PublishAsync("ff.beta", "no-match");
await cluster.PublishAsync("ff.alpha", "match-2");
await cluster.PublishAsync("ff.gamma", "no-match");
var batch = await cluster.FetchAsync("FILTFETCH", "filtcons", 10);
batch.Messages.Count.ShouldBe(2);
foreach (var msg in batch.Messages)
msg.Subject.ShouldBe("ff.alpha");
}
// Go ref: TestJetStreamClusterFetchOnMultiSubjectStream — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_on_multi_subject_stream_returns_matching_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("MULTISUBJ", ["ms.alpha", "ms.beta"], replicas: 3);
await cluster.CreateConsumerAsync("MULTISUBJ", "mscons",
filterSubject: "ms.alpha");
await cluster.PublishAsync("ms.alpha", "a1");
await cluster.PublishAsync("ms.beta", "b1");
await cluster.PublishAsync("ms.alpha", "a2");
var batch = await cluster.FetchAsync("MULTISUBJ", "mscons", 10);
batch.Messages.Count.ShouldBe(2);
batch.Messages[0].Subject.ShouldBe("ms.alpha");
batch.Messages[1].Subject.ShouldBe("ms.alpha");
}
// Go ref: TestJetStreamClusterFetchBatchOfOne — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_batch_of_1_returns_single_message()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("FETCHONE", ["fo.>"], replicas: 3);
await cluster.CreateConsumerAsync("FETCHONE", "onecons");
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("fo.event", $"msg-{i}");
var batch = await cluster.FetchAsync("FETCHONE", "onecons", 1);
batch.Messages.Count.ShouldBe(1);
batch.Messages[0].Sequence.ShouldBe(1UL);
}
// Go ref: TestJetStreamClusterFetchLargeBatch — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_with_large_batch_returns_all_available_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("LARGEBATCH", ["lb.>"], replicas: 3);
await cluster.CreateConsumerAsync("LARGEBATCH", "largecons");
for (var i = 0; i < 50; i++)
await cluster.PublishAsync("lb.event", $"msg-{i}");
var batch = await cluster.FetchAsync("LARGEBATCH", "largecons", 100);
batch.Messages.Count.ShouldBe(50);
}
// Go ref: TestJetStreamClusterFetchAfterAckSkipsAcked — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_after_some_messages_acked_skips_acked_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("ACKSKIP", ["ask.>"], replicas: 3);
await cluster.CreateConsumerAsync("ACKSKIP", "skipcons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 6; i++)
await cluster.PublishAsync("ask.event", $"msg-{i}");
// Fetch first 3 and ack all through seq 3
var batch1 = await cluster.FetchAsync("ACKSKIP", "skipcons", 3);
batch1.Messages.Count.ShouldBe(3);
cluster.AckAll("ACKSKIP", "skipcons", 3);
// Next fetch should return sequences 4, 5, 6
var batch2 = await cluster.FetchAsync("ACKSKIP", "skipcons", 3);
batch2.Messages.Count.ShouldBe(3);
batch2.Messages[0].Sequence.ShouldBe(4UL);
}
// ---------------------------------------------------------------
// Ack tracking
// ---------------------------------------------------------------
// Go ref: TestJetStreamClusterAckAllMarksMessages — jetstream_cluster_2_test.go
[Fact]
public async Task AckAll_marks_messages_as_acknowledged()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("ACKALL", ["aa.>"], replicas: 3);
await cluster.CreateConsumerAsync("ACKALL", "ackcons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("aa.event", $"msg-{i}");
var batch = await cluster.FetchAsync("ACKALL", "ackcons", 5);
batch.Messages.Count.ShouldBe(5);
cluster.AckAll("ACKALL", "ackcons", 5);
// After acking all 5, pending should be 0
var batch2 = await cluster.FetchAsync("ACKALL", "ackcons", 5);
batch2.Messages.Count.ShouldBe(0);
}
// Go ref: TestJetStreamClusterAckAllForZeroSequence — jetstream_cluster_2_test.go
[Fact]
public async Task AckAll_for_sequence_zero_is_noop()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("ACKZERO", ["az.>"], replicas: 3);
await cluster.CreateConsumerAsync("ACKZERO", "zerocons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 3; i++)
await cluster.PublishAsync("az.event", $"msg-{i}");
var batch = await cluster.FetchAsync("ACKZERO", "zerocons", 3);
batch.Messages.Count.ShouldBe(3);
// AckAll(0) should not acknowledge anything
cluster.AckAll("ACKZERO", "zerocons", 0);
var batch2 = await cluster.FetchAsync("ACKZERO", "zerocons", 3);
// All messages still pending (AckAll(0) acknowledges nothing above seq 0)
batch2.Messages.Count.ShouldBe(0); // already fetched so consumer advanced
}
// Go ref: TestJetStreamClusterAckAllFutureSequence — jetstream_cluster_2_test.go
[Fact]
public async Task AckAll_for_future_sequence_acks_all_current_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("ACKFUTURE", ["af.>"], replicas: 3);
await cluster.CreateConsumerAsync("ACKFUTURE", "futurecons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("af.event", $"msg-{i}");
var batch = await cluster.FetchAsync("ACKFUTURE", "futurecons", 5);
batch.Messages.Count.ShouldBe(5);
// Ack up to a sequence beyond what exists
cluster.AckAll("ACKFUTURE", "futurecons", 1000);
// No more messages should be pending
var batch2 = await cluster.FetchAsync("ACKFUTURE", "futurecons", 5);
batch2.Messages.Count.ShouldBe(0);
}
// Go ref: TestJetStreamClusterAckAllIdempotent — jetstream_cluster_2_test.go
[Fact]
public async Task Multiple_AckAll_calls_are_idempotent()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("ACKIDEM", ["ai.>"], replicas: 3);
await cluster.CreateConsumerAsync("ACKIDEM", "idemcons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("ai.event", $"msg-{i}");
var batch = await cluster.FetchAsync("ACKIDEM", "idemcons", 5);
batch.Messages.Count.ShouldBe(5);
cluster.AckAll("ACKIDEM", "idemcons", 5);
cluster.AckAll("ACKIDEM", "idemcons", 5); // second call — idempotent
var batch2 = await cluster.FetchAsync("ACKIDEM", "idemcons", 5);
batch2.Messages.Count.ShouldBe(0);
}
// Go ref: TestJetStreamClusterFetchAfterAckAll — jetstream_cluster_2_test.go
// With AckPolicy.All, once the consumer fetches up to seq N, NextSequence=N+1.
// AckAll(K) sets AckFloor=K. The second fetch starts at NextSequence, returning
// messages that are above AckFloor. To verify ack-floor skipping correctly,
// we fetch in batches with ack between each batch.
[Fact]
public async Task Fetch_after_AckAll_skips_acknowledged_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("POSTACK", ["pa.>"], replicas: 3);
await cluster.CreateConsumerAsync("POSTACK", "postcons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("pa.event", $"msg-{i}");
// Fetch first 7 messages and ack through seq 7
var batch1 = await cluster.FetchAsync("POSTACK", "postcons", 7);
batch1.Messages.Count.ShouldBe(7);
cluster.AckAll("POSTACK", "postcons", 7);
// Fetch remaining 3 (seqs 8-10) — unblocked since pending cleared
var batch2 = await cluster.FetchAsync("POSTACK", "postcons", 10);
batch2.Messages.Count.ShouldBe(3);
batch2.Messages[0].Sequence.ShouldBe(8UL);
}
// Go ref: TestJetStreamClusterAckAllThenPublishThenFetch — jetstream_cluster_2_test.go
[Fact]
public async Task AckAll_then_publish_then_fetch_returns_only_new_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("ACKTHENP", ["atp.>"], replicas: 3);
await cluster.CreateConsumerAsync("ACKTHENP", "atpcons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("atp.event", $"old-{i}");
var batch1 = await cluster.FetchAsync("ACKTHENP", "atpcons", 5);
batch1.Messages.Count.ShouldBe(5);
cluster.AckAll("ACKTHENP", "atpcons", 5);
// Publish new messages after acking
for (var i = 0; i < 3; i++)
await cluster.PublishAsync("atp.event", $"new-{i}");
var batch2 = await cluster.FetchAsync("ACKTHENP", "atpcons", 10);
batch2.Messages.Count.ShouldBe(3);
batch2.Messages[0].Sequence.ShouldBe(6UL);
}
// Go ref: TestJetStreamClusterConsumerPendingDecreasesAfterAck — jetstream_cluster_2_test.go
// With AckPolicy.All, the engine holds messages as pending until acked and blocks
// further delivery until HasPending is false. To verify the pending-decrease
// behavior, we fetch in batches and ack each batch fully before the next fetch.
[Fact]
public async Task Consumer_pending_count_decreases_after_ack()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("PENDCOUNT", ["pc.>"], replicas: 3);
await cluster.CreateConsumerAsync("PENDCOUNT", "pendcons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("pc.event", $"msg-{i}");
// Fetch first 5 and ack all of them so pending clears
var batch1 = await cluster.FetchAsync("PENDCOUNT", "pendcons", 5);
batch1.Messages.Count.ShouldBe(5);
cluster.AckAll("PENDCOUNT", "pendcons", 5);
// Now fetch next 5 — pending is clear so delivery is unblocked
var batch2 = await cluster.FetchAsync("PENDCOUNT", "pendcons", 10);
batch2.Messages.Count.ShouldBe(5);
batch2.Messages[0].Sequence.ShouldBe(6UL);
}
// Go ref: TestJetStreamClusterAckThenStepDownThenFetch — jetstream_cluster_2_test.go
[Fact]
public async Task Ack_then_stepdown_then_fetch_returns_correct_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("ACKSD", ["asd.>"], replicas: 3);
// Use AckPolicy.None so consumer advances freely without pending-block semantics.
// This isolates the ack-floor skip behavior and leader stepdown interaction.
await cluster.CreateConsumerAsync("ACKSD", "asdcons", ackPolicy: AckPolicy.None);
for (var i = 0; i < 8; i++)
await cluster.PublishAsync("asd.event", $"msg-{i}");
var batch1 = await cluster.FetchAsync("ACKSD", "asdcons", 4);
batch1.Messages.Count.ShouldBe(4);
// Step down stream leader after first fetch
await cluster.StepDownStreamLeaderAsync("ACKSD");
// Second fetch should return the remaining 4 messages
var batch2 = await cluster.FetchAsync("ACKSD", "asdcons", 4);
batch2.Messages.Count.ShouldBe(4);
batch2.Messages[0].Sequence.ShouldBe(5UL);
}
// ---------------------------------------------------------------
// Leader failover
// ---------------------------------------------------------------
// Go ref: TestJetStreamClusterConsumerSurvivesStreamLeaderStepDown — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_survives_stream_leader_stepdown()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("CONSURV", ["csv.>"], replicas: 3);
await cluster.CreateConsumerAsync("CONSURV", "survivor", ackPolicy: AckPolicy.None);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("csv.event", $"msg-{i}");
var leaderBefore = cluster.GetStreamLeaderId("CONSURV");
await cluster.StepDownStreamLeaderAsync("CONSURV");
var leaderAfter = cluster.GetStreamLeaderId("CONSURV");
leaderAfter.ShouldNotBe(leaderBefore);
var batch = await cluster.FetchAsync("CONSURV", "survivor", 5);
batch.Messages.Count.ShouldBe(5);
}
// Go ref: TestJetStreamClusterFetchWorksAfterStreamLeaderFailover — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_works_after_stream_leader_failover()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("FETCHSD", ["fsd.>"], replicas: 3);
await cluster.CreateConsumerAsync("FETCHSD", "sdcons");
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("fsd.event", $"msg-{i}");
await cluster.StepDownStreamLeaderAsync("FETCHSD");
var batch = await cluster.FetchAsync("FETCHSD", "sdcons", 10);
batch.Messages.Count.ShouldBe(10);
}
// Go ref: TestJetStreamClusterAckAllWorksAfterLeaderFailover — jetstream_cluster_2_test.go
[Fact]
public async Task AckAll_works_after_leader_failover()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("ACKFAIL", ["acf.>"], replicas: 3);
await cluster.CreateConsumerAsync("ACKFAIL", "failcons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("acf.event", $"msg-{i}");
var batch1 = await cluster.FetchAsync("ACKFAIL", "failcons", 5);
batch1.Messages.Count.ShouldBe(5);
await cluster.StepDownStreamLeaderAsync("ACKFAIL");
// AckAll should still work after leader failover
cluster.AckAll("ACKFAIL", "failcons", 5);
var batch2 = await cluster.FetchAsync("ACKFAIL", "failcons", 5);
batch2.Messages.Count.ShouldBe(0);
}
// Go ref: TestJetStreamClusterConsumerCreationAfterStreamLeaderFailover — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_creation_works_after_stream_leader_failover()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("CREATFAIL", ["cf.>"], replicas: 3);
await cluster.StepDownStreamLeaderAsync("CREATFAIL");
var resp = await cluster.CreateConsumerAsync("CREATFAIL", "newcons");
resp.Error.ShouldBeNull();
resp.ConsumerInfo.ShouldNotBeNull();
}
// Go ref: TestJetStreamClusterMultipleConsumersSimultaneousLeaderFailover — jetstream_cluster_2_test.go
[Fact]
public async Task Multiple_consumers_survive_simultaneous_stream_leader_failover()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("MULTIFAIL", ["mf.>"], replicas: 3);
await cluster.CreateConsumerAsync("MULTIFAIL", "mcons1");
await cluster.CreateConsumerAsync("MULTIFAIL", "mcons2");
await cluster.CreateConsumerAsync("MULTIFAIL", "mcons3");
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("mf.event", $"msg-{i}");
await cluster.StepDownStreamLeaderAsync("MULTIFAIL");
var batch1 = await cluster.FetchAsync("MULTIFAIL", "mcons1", 5);
var batch2 = await cluster.FetchAsync("MULTIFAIL", "mcons2", 5);
var batch3 = await cluster.FetchAsync("MULTIFAIL", "mcons3", 5);
batch1.Messages.Count.ShouldBe(5);
batch2.Messages.Count.ShouldBe(5);
batch3.Messages.Count.ShouldBe(5);
}
// Go ref: TestJetStreamClusterConsumerStateConsistentAfterMetaLeaderStepdown — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_state_consistent_after_meta_leader_stepdown()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("METACONS", ["mc.>"], replicas: 3);
// Use AckPolicy.None to freely advance the consumer; we verify ack-floor skip
// and meta leader stepdown interaction independently.
await cluster.CreateConsumerAsync("METACONS", "metadurcns", ackPolicy: AckPolicy.None);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("mc.event", $"msg-{i}");
var batch1 = await cluster.FetchAsync("METACONS", "metadurcns", 3);
batch1.Messages.Count.ShouldBe(3);
// Step down the meta leader between fetches
cluster.StepDownMetaLeader();
// Consumer state should persist — seqs 4 and 5 remain
var batch2 = await cluster.FetchAsync("METACONS", "metadurcns", 5);
batch2.Messages.Count.ShouldBe(2);
batch2.Messages[0].Sequence.ShouldBe(4UL);
}
// Go ref: TestJetStreamClusterFetchAfterMetaLeaderStepdown — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_after_meta_leader_stepdown_works_correctly()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("METAFETCH", ["mft.>"], replicas: 3);
await cluster.CreateConsumerAsync("METAFETCH", "metafetchcons");
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("mft.event", $"msg-{i}");
cluster.StepDownMetaLeader();
var batch = await cluster.FetchAsync("METAFETCH", "metafetchcons", 5);
batch.Messages.Count.ShouldBe(5);
}
// Go ref: TestJetStreamClusterConsumerLeaderMatchesStreamLeader — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_leader_id_is_derived_from_stream_leader()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("CLIDREL", ["clr.>"], replicas: 3);
await cluster.CreateConsumerAsync("CLIDREL", "relcons");
var streamLeader = cluster.GetStreamLeaderId("CLIDREL");
var consumerLeader = cluster.GetConsumerLeaderId("CLIDREL", "relcons");
streamLeader.ShouldNotBeNullOrEmpty();
consumerLeader.ShouldNotBeNullOrEmpty();
consumerLeader.ShouldContain(streamLeader);
}
// ---------------------------------------------------------------
// State consistency
// ---------------------------------------------------------------
// Go ref: TestJetStreamClusterConsumerInfoReflectsPendingCount — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_info_reflects_correct_pending_count()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("STATEPEND", ["sp.>"], replicas: 3);
await cluster.CreateConsumerAsync("STATEPEND", "statecons", ackPolicy: AckPolicy.Explicit);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("sp.event", $"msg-{i}");
var batch = await cluster.FetchAsync("STATEPEND", "statecons", 5);
batch.Messages.Count.ShouldBe(5);
var info = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}STATEPEND.statecons", "{}");
info.Error.ShouldBeNull();
info.ConsumerInfo.ShouldNotBeNull();
}
// Go ref: TestJetStreamClusterConsumerPendingDecrementsAfterAck — jetstream_cluster_2_test.go
// With AckPolicy.All, the engine blocks re-delivery while any pending acks exist.
// We verify pending-decrement behavior by fetching in batches, acking each fully.
[Fact]
public async Task Consumer_pending_decrements_after_ack()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("DECPEND", ["dp.>"], replicas: 3);
await cluster.CreateConsumerAsync("DECPEND", "dpcons", ackPolicy: AckPolicy.All);
for (var i = 0; i < 8; i++)
await cluster.PublishAsync("dp.event", $"msg-{i}");
// Fetch and ack first 4; pending clears for seqs 1-4
var batch1 = await cluster.FetchAsync("DECPEND", "dpcons", 4);
batch1.Messages.Count.ShouldBe(4);
cluster.AckAll("DECPEND", "dpcons", 4);
// Fetch remaining 4 — unblocked because HasPending is now false
var batch2 = await cluster.FetchAsync("DECPEND", "dpcons", 8);
batch2.Messages.Count.ShouldBe(4);
batch2.Messages[0].Sequence.ShouldBe(5UL);
}
// Go ref: TestJetStreamClusterConsumerPendingAfterPublish — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_pending_after_publish_matches_expected_count()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("PUBPEND", ["pp.>"], replicas: 3);
await cluster.CreateConsumerAsync("PUBPEND", "ppcons");
for (var i = 0; i < 7; i++)
await cluster.PublishAsync("pp.event", $"msg-{i}");
var batch = await cluster.FetchAsync("PUBPEND", "ppcons", 10);
batch.Messages.Count.ShouldBe(7);
}
// Go ref: TestJetStreamClusterConsumerInfoAfterFailover — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_info_after_failover_matches_pre_failover()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("INFOFAIL", ["if.>"], replicas: 3);
await cluster.CreateConsumerAsync("INFOFAIL", "failinfo", ackPolicy: AckPolicy.Explicit);
var infoBefore = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}INFOFAIL.failinfo", "{}");
infoBefore.ConsumerInfo.ShouldNotBeNull();
infoBefore.ConsumerInfo!.Config.DurableName.ShouldBe("failinfo");
await cluster.StepDownStreamLeaderAsync("INFOFAIL");
var infoAfter = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}INFOFAIL.failinfo", "{}");
infoAfter.ConsumerInfo.ShouldNotBeNull();
infoAfter.ConsumerInfo!.Config.DurableName.ShouldBe("failinfo");
infoAfter.ConsumerInfo.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
}
// Go ref: TestJetStreamClusterMultipleConsumersHaveIndependentPending — jetstream_cluster_2_test.go
// Verifies that two consumers independently track their own pending state.
// Consumer 1 fetches and acks a batch; Consumer 2 fetches independently.
[Fact]
public async Task Multiple_consumers_have_independent_pending_counts()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("INDEPPEND", ["ip.>"], replicas: 3);
await cluster.CreateConsumerAsync("INDEPPEND", "icons1", ackPolicy: AckPolicy.All);
await cluster.CreateConsumerAsync("INDEPPEND", "icons2", ackPolicy: AckPolicy.All);
for (var i = 0; i < 6; i++)
await cluster.PublishAsync("ip.event", $"msg-{i}");
// Consumer 1: fetch 4, ack all 4, then fetch the remaining 2
var batch1a = await cluster.FetchAsync("INDEPPEND", "icons1", 4);
batch1a.Messages.Count.ShouldBe(4);
cluster.AckAll("INDEPPEND", "icons1", 4);
var batch1b = await cluster.FetchAsync("INDEPPEND", "icons1", 6);
batch1b.Messages.Count.ShouldBe(2);
batch1b.Messages[0].Sequence.ShouldBe(5UL);
// Consumer 2: fetch 6 independently (unaffected by cons1's acks)
var batch2 = await cluster.FetchAsync("INDEPPEND", "icons2", 6);
batch2.Messages.Count.ShouldBe(6);
batch2.Messages[0].Sequence.ShouldBe(1UL);
}
// Go ref: TestJetStreamClusterConsumerOnEmptyStreamHasZeroPending — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_on_empty_stream_has_zero_pending()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("EMPTYPEND", ["ep.>"], replicas: 3);
await cluster.CreateConsumerAsync("EMPTYPEND", "emptycons");
var batch = await cluster.FetchAsync("EMPTYPEND", "emptycons", 10);
batch.Messages.Count.ShouldBe(0);
}
// Go ref: TestJetStreamClusterConsumerCreatedAfterPublishesHasFullPending — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_created_after_publishes_has_full_pending()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("LATECREATE", ["lc.>"], replicas: 3);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("lc.event", $"msg-{i}");
// Consumer created AFTER publishes
await cluster.CreateConsumerAsync("LATECREATE", "latecons");
var batch = await cluster.FetchAsync("LATECREATE", "latecons", 10);
batch.Messages.Count.ShouldBe(5);
}
// ---------------------------------------------------------------
// Edge cases
// ---------------------------------------------------------------
// Go ref: TestJetStreamClusterConsumerOnNonExistentStream — jetstream_cluster_2_test.go
// Note: ConsumerManager.CreateOrUpdate does not validate stream existence at the
// consumer registration layer (stream lookup happens at fetch time). A consumer
// created on a ghost stream will have no messages to deliver.
[Fact]
public async Task Consumer_on_non_existent_stream_returns_empty_fetch()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
// Create consumer — registration succeeds (no stream validation at creation)
var resp = await cluster.CreateConsumerAsync("GHOST_STREAM", "ghostcons");
resp.ConsumerInfo.ShouldNotBeNull();
// Fetch returns empty because there is no matching stream
var batch = await cluster.FetchAsync("GHOST_STREAM", "ghostcons", 10);
batch.Messages.Count.ShouldBe(0);
}
// Go ref: TestJetStreamClusterDuplicateConsumerNameReturnsExisting — jetstream_cluster_2_test.go
[Fact]
public async Task Duplicate_consumer_name_on_same_stream_returns_existing()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("DUPCNAME", ["dup.>"], replicas: 3);
var resp1 = await cluster.CreateConsumerAsync("DUPCNAME", "samecons");
var resp2 = await cluster.CreateConsumerAsync("DUPCNAME", "samecons");
resp1.Error.ShouldBeNull();
resp2.Error.ShouldBeNull();
resp1.ConsumerInfo!.Config.DurableName.ShouldBe("samecons");
resp2.ConsumerInfo!.Config.DurableName.ShouldBe("samecons");
}
// Go ref: TestJetStreamClusterConsumerEmptyFilterMatchesAll — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_with_empty_filter_subject_matches_all_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("EMPTYFILT", ["ef2.>"], replicas: 3);
await cluster.CreateConsumerAsync("EMPTYFILT", "allfiltcons");
await cluster.PublishAsync("ef2.alpha", "a");
await cluster.PublishAsync("ef2.beta", "b");
await cluster.PublishAsync("ef2.gamma", "g");
var batch = await cluster.FetchAsync("EMPTYFILT", "allfiltcons", 10);
batch.Messages.Count.ShouldBe(3);
}
// Go ref: TestJetStreamClusterConsumerWildcardFilterSubject — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_with_wildcard_filter_subject_matches_correct_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("WILDCARD", ["wc.>"], replicas: 3);
await cluster.CreateConsumerAsync("WILDCARD", "wccons",
filterSubject: "wc.alpha.>");
await cluster.PublishAsync("wc.alpha.1", "a1");
await cluster.PublishAsync("wc.alpha.2", "a2");
await cluster.PublishAsync("wc.beta.1", "b1");
await cluster.PublishAsync("wc.alpha.3", "a3");
var batch = await cluster.FetchAsync("WILDCARD", "wccons", 10);
batch.Messages.Count.ShouldBe(3);
foreach (var msg in batch.Messages)
msg.Subject.ShouldStartWith("wc.alpha.");
}
// Go ref: TestJetStreamCluster10ConsumersOnSameStream — jetstream_cluster_2_test.go
[Fact]
public async Task Ten_consumers_on_same_stream_all_work_independently()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("TENCONS", ["tc.>"], replicas: 3);
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("tc.event", $"msg-{i}");
for (var c = 0; c < 10; c++)
{
var name = $"cons{c:D2}";
var resp = await cluster.CreateConsumerAsync("TENCONS", name);
resp.Error.ShouldBeNull();
var batch = await cluster.FetchAsync("TENCONS", name, 10);
batch.Messages.Count.ShouldBe(10);
}
}
// Go ref: TestJetStreamClusterRapidCreateDeleteCreateConsumer — jetstream_cluster_2_test.go
[Fact]
public async Task Rapid_create_delete_create_consumer_cycle_succeeds()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("RAPIDCYCLE", ["rc.>"], replicas: 3);
var create1 = await cluster.CreateConsumerAsync("RAPIDCYCLE", "cyclecns");
create1.Error.ShouldBeNull();
var del = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}RAPIDCYCLE.cyclecns", "{}");
del.Success.ShouldBeTrue();
var create2 = await cluster.CreateConsumerAsync("RAPIDCYCLE", "cyclecns");
create2.Error.ShouldBeNull();
create2.ConsumerInfo!.Config.DurableName.ShouldBe("cyclecns");
}
// Go ref: TestJetStreamClusterConsumerAfterStreamPurge — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_after_stream_purge_has_zero_pending()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("PURGECONS", ["purge.>"], replicas: 3);
await cluster.CreateConsumerAsync("PURGECONS", "purgecons");
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("purge.event", $"msg-{i}");
// Purge the stream
var purgeResp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGECONS", "{}");
purgeResp.Success.ShouldBeTrue();
var state = await cluster.GetStreamStateAsync("PURGECONS");
state.Messages.ShouldBe(0UL);
}
// Go ref: TestJetStreamClusterConsumerAfterStreamDelete — jetstream_cluster_2_test.go
// Note: ConsumerManager does not cascade-delete consumers on stream deletion.
// After stream deletion, consumer info still returns the consumer config, but
// fetch returns empty (stream handle is gone). This matches the model's behavior.
[Fact]
public async Task Consumer_fetch_on_deleted_stream_returns_empty_batch()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("DELSTREAM", ["ds.>"], replicas: 3);
await cluster.CreateConsumerAsync("DELSTREAM", "delcons");
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("ds.event", $"msg-{i}");
// Delete the stream
var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELSTREAM", "{}");
del.Success.ShouldBeTrue();
// Stream state should be gone (GetStateAsync returns zero state)
var state = await cluster.GetStreamStateAsync("DELSTREAM");
state.Messages.ShouldBe(0UL);
// Fetch after stream deletion returns empty (stream handle not found)
var batch = await cluster.FetchAsync("DELSTREAM", "delcons", 10);
batch.Messages.Count.ShouldBe(0);
}
// ---------------------------------------------------------------
// WaitOnConsumerLeader integration
// ---------------------------------------------------------------
// Go ref: waitOnConsumerLeader helper — jetstream_helpers_test.go
[Fact]
public async Task WaitOnConsumerLeaderAsync_resolves_after_consumer_creation()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("WAITCL", ["wcl.>"], replicas: 3);
await cluster.CreateConsumerAsync("WAITCL", "wclcons");
await cluster.WaitOnConsumerLeaderAsync("WAITCL", "wclcons", timeoutMs: 3000);
var leaderId = cluster.GetConsumerLeaderId("WAITCL", "wclcons");
leaderId.ShouldNotBeNullOrEmpty();
}
// Go ref: waitOnConsumerLeader times out for missing consumer — jetstream_helpers_test.go
[Fact]
public async Task WaitOnConsumerLeaderAsync_times_out_for_missing_consumer()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("TIMEOUTCL", ["tcl.>"], replicas: 3);
var ex = await Should.ThrowAsync<TimeoutException>(
() => cluster.WaitOnConsumerLeaderAsync("TIMEOUTCL", "ghost", timeoutMs: 100));
ex.Message.ShouldContain("ghost");
}
// ---------------------------------------------------------------
// Consumer list and names
// ---------------------------------------------------------------
// Go ref: TestJetStreamClusterConsumerNames — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_names_api_returns_created_consumers()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("CNAMES", ["cn.>"], replicas: 3);
await cluster.CreateConsumerAsync("CNAMES", "name1");
await cluster.CreateConsumerAsync("CNAMES", "name2");
await cluster.CreateConsumerAsync("CNAMES", "name3");
var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerNames}CNAMES", "{}");
resp.Error.ShouldBeNull();
resp.ConsumerNames.ShouldNotBeNull();
resp.ConsumerNames!.Count.ShouldBe(3);
resp.ConsumerNames.ShouldContain("name1");
resp.ConsumerNames.ShouldContain("name2");
resp.ConsumerNames.ShouldContain("name3");
}
// Go ref: TestJetStreamClusterConsumerList — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_list_api_returns_consumer_infos()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("CLIST", ["clist.>"], replicas: 3);
await cluster.CreateConsumerAsync("CLIST", "listcons1");
await cluster.CreateConsumerAsync("CLIST", "listcons2");
var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerList}CLIST", "{}");
resp.Error.ShouldBeNull();
}
// ---------------------------------------------------------------
// Consumer delete
// ---------------------------------------------------------------
// Go ref: TestJetStreamClusterConsumerDelete — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_delete_api_removes_consumer()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("CDELETE", ["cdelete.>"], replicas: 3);
await cluster.CreateConsumerAsync("CDELETE", "todelete");
var del = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}CDELETE.todelete", "{}");
del.Success.ShouldBeTrue();
var info = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}CDELETE.todelete", "{}");
info.Error.ShouldNotBeNull();
}
// Go ref: TestJetStreamClusterConsumerDeleteMissingConsumer — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_delete_for_missing_consumer_does_not_crash()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("MISSINGDEL", ["md.>"], replicas: 3);
// Deleting a non-existent consumer should not crash
var del = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}MISSINGDEL.ghost", "{}");
// Result may be success (idempotent) or not-found; neither should throw
_ = del;
}
// ---------------------------------------------------------------
// Leader stepdown for consumer
// ---------------------------------------------------------------
// Go ref: TestJetStreamClusterConsumerLeaderStepdown — jetstream_cluster_2_test.go
[Fact]
public async Task Consumer_leader_stepdown_api_succeeds()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("CONSLSD", ["clsd2.>"], replicas: 3);
await cluster.CreateConsumerAsync("CONSLSD", "lsdcons");
var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}CONSLSD.lsdcons", "{}");
resp.Success.ShouldBeTrue();
}
// Go ref: TestJetStreamClusterConsumerFetchAfterLeaderStepdown — jetstream_cluster_2_test.go
[Fact]
public async Task Fetch_works_after_consumer_leader_stepdown()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("FETCHLSD", ["flsd.>"], replicas: 3);
await cluster.CreateConsumerAsync("FETCHLSD", "lsdcons2");
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("flsd.event", $"msg-{i}");
// Step down consumer leader
await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}FETCHLSD.lsdcons2", "{}");
var batch = await cluster.FetchAsync("FETCHLSD", "lsdcons2", 5);
batch.Messages.Count.ShouldBe(5);
}
}