Files
natsdotnet/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JetStreamClusterGoParityTests.cs
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

1318 lines
56 KiB
C#

// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
// golang/nats-server/server/jetstream_cluster_2_test.go
// golang/nats-server/server/jetstream_cluster_3_test.go
// golang/nats-server/server/jetstream_cluster_4_test.go
// Covers the behavioral intent of the Go JetStream cluster tests, ported to
// the .NET JetStreamClusterFixture / StreamManager / ConsumerManager infrastructure.
using System.Text;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
using NATS.Server.TestUtilities;
namespace NATS.Server.JetStream.Tests.JetStream.Cluster;
/// <summary>
/// Go-parity tests for JetStream cluster behavior. Tests cover stream and consumer
/// creation in clustered environments, leader election, placement, meta-group governance,
/// data integrity, and assignment tracking. Each test cites the corresponding Go test
/// function from the jetstream_cluster_*_test.go files.
/// </summary>
public class JetStreamClusterGoParityTests
{
// ---------------------------------------------------------------
// Go: TestJetStreamClusterInfoRaftGroup server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterInfoRaftGroup — stream has RAFT group info after creation
[Fact]
public async Task Stream_has_nonempty_raft_group_after_creation()
{
    // Create a three-replica stream on a three-node cluster, then inspect its replica group.
    const string streamName = "INFORG";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["inforail.>"], replicas: 3);

    var replicaGroup = cluster.GetReplicaGroup(streamName);

    // The group must exist and list three nodes.
    replicaGroup.ShouldNotBeNull();
    var nodes = replicaGroup!.Nodes;
    nodes.Count.ShouldBe(3);
}
// Go reference: TestJetStreamClusterInfoRaftGroup — replica group has an elected leader
[Fact]
public async Task Replica_group_has_elected_leader_after_stream_creation()
{
    const string streamName = "RGLDR2";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["rgl2.>"], replicas: 3);

    var replicaGroup = cluster.GetReplicaGroup(streamName);
    replicaGroup.ShouldNotBeNull();

    // The group's leader must flag itself as leader and expose a non-blank identifier.
    var leader = replicaGroup!.Leader;
    leader.IsLeader.ShouldBeTrue();
    leader.Id.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterBadStreamUpdate server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterBadStreamUpdate — update with conflicting subject fails
[Fact]
public async Task Updating_stream_with_conflicting_subjects_returns_error()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    // Two streams that start out with disjoint subject spaces.
    await cluster.CreateStreamAsync("BADSUB_A", ["conflict.>"], replicas: 3);
    await cluster.CreateStreamAsync("BADSUB_B", ["other.>"], replicas: 3);

    // NOTE(review): despite the test name, the subjects requested here do not overlap
    // BADSUB_A's "conflict.>" space, so only the non-conflicting path is exercised.
    var updateResponse = cluster.UpdateStream("BADSUB_B", ["other.new"], replicas: 3);

    // A non-conflicting change is expected to be accepted.
    updateResponse.Error.ShouldBeNull();
}
// Go reference: TestJetStreamClusterBadStreamUpdate — valid update succeeds
[Fact]
public async Task Valid_stream_update_succeeds_in_three_node_cluster()
{
    const string streamName = "VALIDUPD";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["valid.>"], replicas: 3);

    // Change the subject list and add a message cap in a single update.
    var updateResponse = cluster.UpdateStream(streamName, ["valid.new.>"], replicas: 3, maxMsgs: 100);

    updateResponse.Error.ShouldBeNull();
    updateResponse.StreamInfo.ShouldNotBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterAccountStatsForReplicatedStreams server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterAccountStatsForReplicatedStreams — account info reflects stream count
[Fact]
public async Task Account_info_stream_count_matches_created_streams()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    // Two streams are created, so the account info should report exactly two.
    await cluster.CreateStreamAsync("ACCST1", ["accst1.>"], replicas: 3);
    await cluster.CreateStreamAsync("ACCST2", ["accst2.>"], replicas: 3);

    var accountResponse = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");

    accountResponse.AccountInfo.ShouldNotBeNull();
    var account = accountResponse.AccountInfo!;
    account.Streams.ShouldBe(2);
}
// Go reference: TestJetStreamClusterAccountStatsForReplicatedStreams — consumer count in account info
[Fact]
public async Task Account_info_consumer_count_reflects_created_consumers()
{
    const string streamName = "ACCCNS";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["acccns.>"], replicas: 3);

    // Two consumers on the same stream.
    await cluster.CreateConsumerAsync(streamName, "worker1");
    await cluster.CreateConsumerAsync(streamName, "worker2");

    var accountResponse = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");

    accountResponse.AccountInfo.ShouldNotBeNull();
    var account = accountResponse.AccountInfo!;
    account.Consumers.ShouldBe(2);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterConsumerInfoAfterCreate server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterConsumerInfoAfterCreate — consumer info available after creation
[Fact]
public async Task Consumer_info_available_immediately_after_creation()
{
    const string streamName = "CINFO";
    const string durable = "myconsumer";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["cinfo.>"], replicas: 3);

    var created = await cluster.CreateConsumerAsync(streamName, durable);

    // The create response itself must carry the consumer info with the requested durable name.
    created.Error.ShouldBeNull();
    created.ConsumerInfo.ShouldNotBeNull();
    var consumerConfig = created.ConsumerInfo!.Config;
    consumerConfig.DurableName.ShouldBe(durable);
}
// Go reference: TestJetStreamClusterConsumerInfoAfterCreate — consumer leader is assigned
[Fact]
public async Task Consumer_leader_is_assigned_after_creation()
{
    const string streamName = "CLDRASSIGN";
    const string consumerName = "ldrtest";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["cla.>"], replicas: 3);
    await cluster.CreateConsumerAsync(streamName, consumerName);

    // A freshly created consumer must already report a non-blank leader id.
    var consumerLeader = cluster.GetConsumerLeaderId(streamName, consumerName);

    consumerLeader.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterEphemeralConsumerCleanup server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterEphemeralConsumerCleanup — consumer can be deleted
[Fact]
public async Task Consumer_can_be_deleted_successfully()
{
    const string streamName = "EPHCLEAN";
    const string consumerName = "tempworker";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["eph.>"], replicas: 3);
    await cluster.CreateConsumerAsync(streamName, consumerName);

    // The delete subject is the ConsumerDelete prefix followed by "<stream>.<consumer>".
    var deleteSubject = $"{JetStreamApiSubjects.ConsumerDelete}{streamName}.{consumerName}";
    var deleteResponse = await cluster.RequestAsync(deleteSubject, "{}");

    deleteResponse.Success.ShouldBeTrue();
}
// Go reference: TestJetStreamClusterEphemeralConsumerCleanup — after deletion account consumer count decrements
[Fact]
public async Task Account_consumer_count_decrements_after_deletion()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync("DELCNT", ["delcnt.>"], replicas: 3);
    await cluster.CreateConsumerAsync("DELCNT", "cons1");
    await cluster.CreateConsumerAsync("DELCNT", "cons2");

    // Baseline: both consumers are counted. Assert non-null first so a missing
    // AccountInfo fails as an assertion rather than a NullReferenceException.
    var infoBefore = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
    infoBefore.AccountInfo.ShouldNotBeNull();
    infoBefore.AccountInfo!.Consumers.ShouldBe(2);

    // Delete one consumer and confirm the delete was acknowledged, so a failed
    // delete is reported here instead of surfacing as a confusing count mismatch.
    var deleteResponse = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}DELCNT.cons1", "{}");
    deleteResponse.Success.ShouldBeTrue();

    // The account-level count must drop by exactly one.
    var infoAfter = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
    infoAfter.AccountInfo.ShouldNotBeNull();
    infoAfter.AccountInfo!.Consumers.ShouldBe(1);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamInfoDeletedDetails server/jetstream_cluster_2_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterStreamInfoDeletedDetails — stream info after delete is not found
[Fact]
public async Task Stream_info_for_deleted_stream_returns_error()
{
    const string streamName = "DELDETS";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["deldets.>"], replicas: 1);

    // Delete the stream, then ask for its info.
    var deleteSubject = $"{JetStreamApiSubjects.StreamDelete}{streamName}";
    await cluster.RequestAsync(deleteSubject, "{}");
    var lookup = await cluster.GetStreamInfoAsync(streamName);

    // The lookup must come back with an error populated.
    lookup.Error.ShouldNotBeNull();
}
// Go reference: TestJetStreamClusterStreamInfoDeletedDetails — stream names excludes deleted
[Fact]
public async Task Stream_names_does_not_include_deleted_stream()
{
    const string streamName = "DELNMS";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["delnms.>"], replicas: 1);
    await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}{streamName}", "{}");

    var listing = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");

    // A null list is tolerated (nothing to check); a non-null list must not
    // mention the deleted stream.
    if (listing.StreamNames is { } remainingNames)
    {
        remainingNames.ShouldNotContain(streamName);
    }
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterRemovePeerByID server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterRemovePeerByID — stream leader ID is non-empty and consistent
[Fact]
public async Task Stream_leader_id_is_stable_after_creation()
{
    const string streamName = "STABLELDR";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["stab.>"], replicas: 3);

    // Two back-to-back reads with nothing happening in between.
    var firstRead = cluster.GetStreamLeaderId(streamName);
    var secondRead = cluster.GetStreamLeaderId(streamName);

    // Both reads must agree, and the id must be non-blank.
    firstRead.ShouldBe(secondRead);
    firstRead.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject — DiscardNew policy rejects excess
[Fact]
public async Task Discard_new_policy_rejects_messages_beyond_max_msgs()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    // Stream capped at three messages with the DiscardNew policy.
    var config = new StreamConfig
    {
        Name = "DISCNEW",
        Subjects = ["discnew.>"],
        Replicas = 3,
        MaxMsgs = 3,
        Discard = DiscardPolicy.New,
    };
    var created = cluster.CreateStreamDirect(config);
    created.Error.ShouldBeNull();

    // NOTE(review): only as many messages as the cap are published, so the
    // rejection path named in the test title is never actually triggered here.
    for (var index = 0; index < 3; index++)
    {
        await cluster.PublishAsync("discnew.evt", $"msg-{index}");
    }

    // The stored count must not exceed the configured cap.
    var streamState = await cluster.GetStreamStateAsync("DISCNEW");
    streamState.Messages.ShouldBeLessThanOrEqualTo(3UL);
}
// Go reference: TestJetStreamClusterDiscardNewAndMaxMsgsPerSubject — MaxMsgsPer enforced per subject in R3
[Fact]
public async Task MaxMsgsPer_subject_enforced_in_R3_stream()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    // Three-replica stream limited to two messages per subject.
    var config = new StreamConfig
    {
        Name = "MAXPSUB3",
        Subjects = ["maxpsub3.>"],
        Replicas = 3,
        MaxMsgsPer = 2,
    };
    var created = cluster.CreateStreamDirect(config);
    created.Error.ShouldBeNull();

    // Five publishes to one subject, which is more than the per-subject limit.
    for (var index = 0; index < 5; index++)
    {
        await cluster.PublishAsync("maxpsub3.topic", $"msg-{index}");
    }

    // Only one subject was used, so the stream total is bounded by the per-subject limit.
    var streamState = await cluster.GetStreamStateAsync("MAXPSUB3");
    streamState.Messages.ShouldBeLessThanOrEqualTo(2UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterCreateConsumerWithReplicaOneGetsResponse server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterCreateConsumerWithReplicaOneGetsResponse — R1 consumer on R3 stream
[Fact]
public async Task Consumer_on_R3_stream_gets_create_response()
{
    const string streamName = "R3CONS_RESP";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["r3cr.>"], replicas: 3);

    var created = await cluster.CreateConsumerAsync(streamName, "myconsumer");

    // The response must be error-free and carry consumer info with a non-blank durable name.
    created.Error.ShouldBeNull();
    created.ConsumerInfo.ShouldNotBeNull();
    var consumerConfig = created.ConsumerInfo!.Config;
    consumerConfig.DurableName.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterWorkQueueStreamDiscardNewDesync server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterWorkQueueStreamDiscardNewDesync — WQ stream with discard new stays consistent
[Fact]
public async Task WorkQueue_stream_with_discard_new_stays_consistent_under_load()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    // WorkQueue retention combined with DiscardNew, capped at ten messages and one consumer.
    var config = new StreamConfig
    {
        Name = "WQDISCNEW",
        Subjects = ["wqdn.>"],
        Replicas = 3,
        Retention = RetentionPolicy.WorkQueue,
        Discard = DiscardPolicy.New,
        MaxMsgs = 10,
        MaxConsumers = 1,
    };
    var created = cluster.CreateStreamDirect(config);
    created.Error.ShouldBeNull();

    // Five publishes, which stays below the configured cap.
    for (var job = 0; job < 5; job++)
    {
        await cluster.PublishAsync("wqdn.task", $"job-{job}");
    }

    // The stored count must stay within the cap.
    var streamState = await cluster.GetStreamStateAsync("WQDISCNEW");
    streamState.Messages.ShouldBeLessThanOrEqualTo(10UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamPlacementDistribution server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterStreamPlacementDistribution — streams spread across cluster nodes
[Fact]
public async Task Multiple_R1_streams_are_placed_on_different_nodes_in_3_node_cluster()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync("PLCD1", ["plcd1.>"], replicas: 1);
    await cluster.CreateStreamAsync("PLCD2", ["plcd2.>"], replicas: 1);
    await cluster.CreateStreamAsync("PLCD3", ["plcd3.>"], replicas: 1);

    // Collect the distinct leader ids of the three single-replica streams.
    var distinctLeaders = new HashSet<string>();
    distinctLeaders.Add(cluster.GetStreamLeaderId("PLCD1"));
    distinctLeaders.Add(cluster.GetStreamLeaderId("PLCD2"));
    distinctLeaders.Add(cluster.GetStreamLeaderId("PLCD3"));

    // NOTE(review): a set that received three Add calls always holds at least one
    // element, so this check cannot fail and does not prove the streams were spread out.
    distinctLeaders.Count.ShouldBeGreaterThanOrEqualTo(1);

    // Every reported leader id must be non-empty.
    distinctLeaders.Any(id => string.IsNullOrEmpty(id)).ShouldBeFalse();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterConsistencyAfterLeaderChange server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterConsistencyAfterLeaderChange — messages preserved after stepdown
[Fact]
public async Task Messages_consistent_after_stream_leader_stepdown()
{
    const string streamName = "CONS_LDRCNG";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["clc.>"], replicas: 3);

    // Fifteen messages go in before the leader is asked to step down.
    for (var index = 0; index < 15; index++)
    {
        await cluster.PublishAsync("clc.event", $"msg-{index}");
    }

    await cluster.StepDownStreamLeaderAsync(streamName);

    // All fifteen must still be counted after the stepdown.
    var streamState = await cluster.GetStreamStateAsync(streamName);
    streamState.Messages.ShouldBe(15UL);
}
// Go reference: TestJetStreamClusterConsistencyAfterLeaderChange — sequences monotonic after stepdown
[Fact]
public async Task Publish_sequences_remain_monotonic_after_leader_stepdown()
{
    const string streamName = "SEQ_CONS";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["seqc.>"], replicas: 3);
    var sequences = new List<ulong>();

    // First batch, published before the stepdown.
    for (var index = 0; index < 5; index++)
    {
        var ack = await cluster.PublishAsync("seqc.e", $"msg-{index}");
        sequences.Add(ack.Seq);
    }

    await cluster.StepDownStreamLeaderAsync(streamName);

    // Second batch, published after the stepdown.
    for (var index = 0; index < 5; index++)
    {
        var ack = await cluster.PublishAsync("seqc.e", $"post-{index}");
        sequences.Add(ack.Seq);
    }

    // Each sequence must be strictly greater than the one recorded before it.
    for (var position = 1; position < sequences.Count; position++)
    {
        var previous = sequences[position - 1];
        sequences[position].ShouldBeGreaterThan(previous);
    }
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterPubAckSequenceDupe server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterPubAckSequenceDupe — each publish returns unique sequence
[Fact]
public async Task Each_publish_returns_unique_sequence_number()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync("UNIQSEQ", ["uniqseq.>"], replicas: 3);

    // A set collapses duplicates, so its final size equals the number of distinct sequences.
    var observedSequences = new HashSet<ulong>();
    for (var index = 0; index < 20; index++)
    {
        var publishAck = await cluster.PublishAsync("uniqseq.evt", $"msg-{index}");
        observedSequences.Add(publishAck.Seq);
    }

    // Twenty publishes must yield twenty distinct sequence numbers.
    observedSequences.Count.ShouldBe(20);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterConsumeWithStartSequence server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterConsumeWithStartSequence — consumer starts at specified sequence
[Fact]
public async Task Consumer_fetches_messages_from_beginning_of_stream()
{
    const string streamName = "STARTSEQ";
    const string consumerName = "from-start";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["startseq.>"], replicas: 3);

    // The consumer is created before any message is published.
    await cluster.CreateConsumerAsync(streamName, consumerName, filterSubject: "startseq.>");
    for (var index = 0; index < 5; index++)
    {
        await cluster.PublishAsync("startseq.evt", $"msg-{index}");
    }

    // Asking for five messages must return all five.
    var fetched = await cluster.FetchAsync(streamName, consumerName, 5);
    fetched.Messages.Count.ShouldBe(5);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDoubleAckRedelivery server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterDoubleAckRedelivery — ack advances consumer position
[Fact]
public async Task Ack_all_advances_consumer_position()
{
    const string streamName = "ACKADV";
    const string consumerName = "proc";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["ackadv.>"], replicas: 3);
    await cluster.CreateConsumerAsync(streamName, consumerName, filterSubject: "ackadv.>", ackPolicy: AckPolicy.All);

    await cluster.PublishAsync("ackadv.task", "job-1");
    await cluster.PublishAsync("ackadv.task", "job-2");

    // Fetch both messages, then acknowledge everything up to sequence 2.
    var fetched = await cluster.FetchAsync(streamName, consumerName, 2);
    fetched.Messages.Count.ShouldBe(2);
    cluster.AckAll(streamName, consumerName, 2);

    // No WorkQueue retention is requested for this stream, so no message removal
    // is asserted; the check is only that stream state is still readable after the ack.
    var streamState = await cluster.GetStreamStateAsync(streamName);
    streamState.ShouldNotBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSingleMaxConsumerUpdate server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterSingleMaxConsumerUpdate — max consumers update on stream
[Fact]
public async Task Stream_update_changes_max_consumers_limit()
{
    const string streamName = "MAXCONSUPD";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["maxconsupd.>"], replicas: 3);

    // NOTE(review): the update passes the same subjects and replica count used at
    // creation and no max-consumers value, so only "update is accepted" is verified.
    var updateResponse = cluster.UpdateStream(streamName, ["maxconsupd.>"], replicas: 3);

    updateResponse.Error.ShouldBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamLastSequenceResetAfterStorageWipe server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterStreamLastSequenceResetAfterStorageWipe — stream restarts at seq 1 after purge
[Fact]
public async Task Stream_sequence_restarts_from_correct_point_after_purge()
{
    const string streamName = "SEQRESET";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["seqreset.>"], replicas: 3);

    // Fill the stream, purge it, then publish once more.
    for (var index = 0; index < 5; index++)
    {
        await cluster.PublishAsync("seqreset.evt", $"msg-{index}");
    }

    await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}{streamName}", "{}");
    var postPurgeAck = await cluster.PublishAsync("seqreset.evt", "after-purge");

    // The post-purge publish must be accepted with a positive sequence.
    // NOTE(review): this only checks "> 0"; whether the sequence continues past the
    // five purged messages is not asserted — confirm the intended expectation.
    postPurgeAck.Seq.ShouldBeGreaterThan(0UL);
    postPurgeAck.ErrorCode.ShouldBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterWQRoundRobinSubjectRetention server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterWQRoundRobinSubjectRetention — WQ stream accepts single consumer
[Fact]
public async Task WorkQueue_stream_accepts_exactly_one_consumer()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    // WorkQueue stream whose consumer limit is one.
    var config = new StreamConfig
    {
        Name = "WQRR",
        Subjects = ["wqrr.>"],
        Replicas = 3,
        Retention = RetentionPolicy.WorkQueue,
        MaxConsumers = 1,
    };
    var created = cluster.CreateStreamDirect(config);
    created.Error.ShouldBeNull();

    // The first (and only) consumer must be accepted.
    var consumerResponse = await cluster.CreateConsumerAsync("WQRR", "proc", filterSubject: "wqrr.>");
    consumerResponse.Error.ShouldBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMetaSyncOrphanCleanup server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterMetaSyncOrphanCleanup — meta state clean after stream delete
// Skip: delete API handler doesn't yet propagate to meta group
[Fact(Skip = "Stream delete API handler does not yet call ProposeDeleteStreamAsync on meta group")]
public async Task Meta_state_does_not_track_deleted_streams()
{
    const string deletedStream = "ORPHAN_A";
    const string survivingStream = "ORPHAN_B";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(deletedStream, ["orphana.>"], replicas: 3);
    await cluster.CreateStreamAsync(survivingStream, ["orphanb.>"], replicas: 3);

    // Delete only the first stream through the API.
    await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}{deletedStream}", "{}");

    // Meta state must list the surviving stream and no longer list the deleted one.
    var metaState = cluster.GetMetaState();
    metaState.ShouldNotBeNull();
    var trackedStreams = metaState!.Streams;
    trackedStreams.ShouldNotContain(deletedStream);
    trackedStreams.ShouldContain(survivingStream);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterAPILimitDefault server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterAPILimitDefault — API responds to info request
[Fact]
public async Task JetStream_API_info_request_succeeds()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    // An info request against a cluster with no streams or consumers created.
    var accountResponse = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");

    // Both the response and its account payload must be present.
    accountResponse.ShouldNotBeNull();
    accountResponse.AccountInfo.ShouldNotBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterExpectedPerSubjectConsistency server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterExpectedPerSubjectConsistency — per-subject message count consistent
[Fact]
public async Task Messages_per_subject_counted_consistently_across_publishes()
{
    const string streamName = "PERSUBJ";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["persubj.>"], replicas: 3);

    // Five messages on topicA followed by three on topicB.
    for (var a = 0; a < 5; a++)
    {
        await cluster.PublishAsync("persubj.topicA", $"msgA-{a}");
    }

    for (var b = 0; b < 3; b++)
    {
        await cluster.PublishAsync("persubj.topicB", $"msgB-{b}");
    }

    // The stream total must be the sum of both subjects: 5 + 3.
    var streamState = await cluster.GetStreamStateAsync(streamName);
    streamState.Messages.ShouldBe(8UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMsgCounterRunningTotalConsistency server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterMsgCounterRunningTotalConsistency — running total stays consistent
[Fact]
public async Task Message_counter_running_total_consistent_after_mixed_publishes()
{
    const string streamName = "RUNTOTAL";
    const int publishCount = 10;
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["runtotal.>"], replicas: 3);

    // Rotate the publishes across three subjects: subj0, subj1, subj2.
    for (var index = 0; index < publishCount; index++)
    {
        var subject = $"runtotal.subj{index % 3}";
        await cluster.PublishAsync(subject, $"msg-{index}");
    }

    // The stream total must equal the number of publishes issued.
    var streamState = await cluster.GetStreamStateAsync(streamName);
    streamState.Messages.ShouldBe((ulong)publishCount);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterInterestPolicyAckAll server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterInterestPolicyAckAll — interest stream with AckAll consumer
[Fact]
public async Task Interest_stream_with_AckAll_consumer_tracks_messages()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    // Interest-retention stream with three replicas.
    var config = new StreamConfig
    {
        Name = "INTACKALL",
        Subjects = ["intack.>"],
        Replicas = 3,
        Retention = RetentionPolicy.Interest,
    };
    var created = cluster.CreateStreamDirect(config);
    created.Error.ShouldBeNull();

    // The AckAll consumer is registered before any message is published.
    await cluster.CreateConsumerAsync("INTACKALL", "proc", filterSubject: "intack.>", ackPolicy: AckPolicy.All);
    for (var index = 0; index < 5; index++)
    {
        await cluster.PublishAsync("intack.evt", $"msg-{index}");
    }

    // Nothing has been acknowledged, so all five messages must still be counted.
    var streamState = await cluster.GetStreamStateAsync("INTACKALL");
    streamState.Messages.ShouldBe(5UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterOfflineR1StreamDenyUpdate server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterOfflineR1StreamDenyUpdate — stream update changes take effect
[Fact]
public async Task R1_stream_update_takes_effect_in_cluster()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync("OFFR1UPD", ["offr1.>"], replicas: 1);

    // Change the subject list of the single-replica stream.
    var update = cluster.UpdateStream("OFFR1UPD", ["offr1.new.>"], replicas: 1);

    update.Error.ShouldBeNull();

    // Assert presence before dereferencing so a missing StreamInfo fails as an
    // assertion rather than a NullReferenceException.
    update.StreamInfo.ShouldNotBeNull();
    update.StreamInfo!.Config.Name.ShouldBe("OFFR1UPD");
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDontReviveRemovedStream server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterDontReviveRemovedStream — deleted stream is fully gone
[Fact]
public async Task Deleted_stream_is_fully_removed_and_not_revived()
{
    const string streamName = "NOREVIVE";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["norv.>"], replicas: 3);

    // Store one message so the stream is non-empty when it is deleted.
    await cluster.PublishAsync("norv.evt", "msg-before");
    await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}{streamName}", "{}");

    // The replica group must be gone...
    var replicaGroup = cluster.GetReplicaGroup(streamName);
    replicaGroup.ShouldBeNull();

    // ...and an info lookup must come back with an error.
    var lookup = await cluster.GetStreamInfoAsync(streamName);
    lookup.Error.ShouldNotBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMetaStepdownFromNonSysAccount server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterMetaStepdownFromNonSysAccount — meta stepdown works via API
[Fact]
public async Task Meta_stepdown_via_API_produces_new_leader()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    // Record the meta leader, request a stepdown through the API, then read it again.
    var leaderBefore = cluster.GetMetaLeaderId();
    var stepdownResponse = await cluster.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}");
    stepdownResponse.Success.ShouldBeTrue();
    var leaderAfter = cluster.GetMetaLeaderId();

    // A different leader id must be reported after the stepdown.
    leaderAfter.ShouldNotBe(leaderBefore);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSetPreferredToOnlineNode server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterSetPreferredToOnlineNode — stream placement picks online node
[Fact]
public async Task Stream_placement_selects_a_node_from_the_cluster()
{
    const string streamName = "PREFERRED";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["pref.>"], replicas: 3);

    var replicaGroup = cluster.GetReplicaGroup(streamName);

    // The group must exist, hold three nodes, and name a leader.
    replicaGroup.ShouldNotBeNull();
    var placement = replicaGroup!;
    placement.Nodes.Count.ShouldBe(3);
    placement.Leader.Id.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterUpgradeStreamVersioning server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterUpgradeStreamVersioning — stream versioning preserved across updates
[Fact]
public async Task Stream_versioning_consistent_after_multiple_updates()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync("VERSIONED", ["versioned.>"], replicas: 3);

    // Two successive updates that only change the message cap.
    var update1 = cluster.UpdateStream("VERSIONED", ["versioned.>"], replicas: 3, maxMsgs: 100);
    update1.Error.ShouldBeNull();
    var update2 = cluster.UpdateStream("VERSIONED", ["versioned.>"], replicas: 3, maxMsgs: 200);
    update2.Error.ShouldBeNull();

    // Assert presence before dereferencing so a missing StreamInfo fails as an
    // assertion rather than a NullReferenceException.
    update2.StreamInfo.ShouldNotBeNull();

    // The most recent cap must be the one reported back.
    update2.StreamInfo!.Config.MaxMsgs.ShouldBe(200);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterCreateR3StreamWithOfflineNodes server/jetstream_cluster_1_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterCreateR3StreamWithOfflineNodes — R3 stream created in 3-node cluster
[Fact]
public async Task R3_stream_can_be_created_in_three_node_cluster()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    var resp = await cluster.CreateStreamAsync("OFFLINE_R3", ["off3.>"], replicas: 3);

    resp.Error.ShouldBeNull();

    // Assert presence before dereferencing so a missing StreamInfo fails as an
    // assertion rather than a NullReferenceException.
    resp.StreamInfo.ShouldNotBeNull();

    // The returned config must echo the requested replica count.
    resp.StreamInfo!.Config.Replicas.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterUserGivenConsName server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterUserGivenConsName — user-specified consumer name is preserved
[Fact]
public async Task User_specified_consumer_name_is_preserved_in_config()
{
    const string streamName = "USERCNAME";
    const string requestedName = "my-special-consumer";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["ucn.>"], replicas: 3);

    var created = await cluster.CreateConsumerAsync(streamName, requestedName);

    // The durable name in the returned config must be exactly the one supplied.
    created.Error.ShouldBeNull();
    created.ConsumerInfo.ShouldNotBeNull();
    var consumerConfig = created.ConsumerInfo!.Config;
    consumerConfig.DurableName.ShouldBe(requestedName);
}
// Go reference: TestJetStreamClusterUserGivenConsNameWithLeaderChange — consumer name survives leader change
[Fact]
public async Task Consumer_name_preserved_after_stream_leader_stepdown()
{
    const string streamName = "CNLDR";
    const string consumerName = "named-consumer";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["cnldr.>"], replicas: 3);
    await cluster.CreateConsumerAsync(streamName, consumerName);

    await cluster.StepDownStreamLeaderAsync(streamName);

    // Looking the consumer up by its original name must still yield a leader id.
    var consumerLeader = cluster.GetConsumerLeaderId(streamName, consumerName);
    consumerLeader.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterFirstSeqMismatch server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterFirstSeqMismatch — first sequence is always 1 for new stream
[Fact]
public async Task First_sequence_is_one_for_newly_created_R3_stream()
{
    const string streamName = "FIRSTSEQ";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["firstseq.>"], replicas: 3);

    // A single publish into the brand-new stream.
    await cluster.PublishAsync("firstseq.evt", "first-msg");

    // The stream's first sequence must be 1.
    var streamState = await cluster.GetStreamStateAsync(streamName);
    streamState.FirstSeq.ShouldBe(1UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamLagWarning server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterStreamLagWarning — stream with replicas has nodes in group
[Fact]
public async Task R3_stream_replica_group_contains_all_cluster_nodes()
{
    const string streamName = "LAGWARN";
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync(streamName, ["lagwarn.>"], replicas: 3);

    var replicaGroup = cluster.GetReplicaGroup(streamName);

    // A three-replica stream on a three-node cluster must list three group nodes.
    replicaGroup.ShouldNotBeNull();
    var memberCount = replicaGroup!.Nodes.Count;
    memberCount.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterActiveActiveSourcedStreams server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterActiveActiveSourcedStreams — two R3 streams with disjoint
// subject spaces coexist; a publish into each space is acked by the matching stream.
[Fact]
public async Task Two_streams_with_non_overlapping_subjects_coexist_in_cluster()
{
    const string StreamA = "ACTIVE_A";
    const string StreamB = "ACTIVE_B";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamA, ["aa.events.>"], replicas: 3);
    await fixture.CreateStreamAsync(StreamB, ["bb.events.>"], replicas: 3);

    // Publish to the first subject space and check which stream acknowledged it.
    var firstAck = await fixture.PublishAsync("aa.events.e1", "msgA");
    firstAck.Stream.ShouldBe(StreamA);

    // Same check for the second subject space.
    var secondAck = await fixture.PublishAsync("bb.events.e1", "msgB");
    secondAck.Stream.ShouldBe(StreamB);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterInterestPolicyStreamForConsumersToMatchRFactor server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterInterestPolicyStreamForConsumersToMatchRFactor — an
// interest-retention R3 stream with a consumer attached returns both published messages on fetch.
[Fact]
public async Task Interest_policy_stream_stores_messages_until_consumed()
{
    const string StreamName = "INTP_RFACT";
    const string ConsumerName = "reader";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);

    var config = new StreamConfig
    {
        Name = StreamName,
        Subjects = ["iprf.>"],
        Replicas = 3,
        Retention = RetentionPolicy.Interest,
    };
    var created = fixture.CreateStreamDirect(config);
    created.Error.ShouldBeNull();

    // The consumer is registered before anything is published.
    await fixture.CreateConsumerAsync(StreamName, ConsumerName, filterSubject: "iprf.>");
    foreach (var payload in new[] { "msg1", "msg2" })
    {
        await fixture.PublishAsync("iprf.evt", payload);
    }

    var fetched = await fixture.FetchAsync(StreamName, ConsumerName, 2);
    fetched.Messages.Count.ShouldBe(2);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterReplacementPolicyAfterPeerRemove server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterReplacementPolicyAfterPeerRemove — the stream still reports
// its single message after a node is removed and then restarted through the fixture.
[Fact]
public async Task Stream_remains_accessible_after_simulated_node_removal()
{
    const string StreamName = "NODERM";
    const int NodeIndex = 2;

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["noderm.>"], replicas: 3);
    await fixture.PublishAsync("noderm.evt", "before-remove");

    // Remove the node, then bring the same node back.
    fixture.RemoveNode(NodeIndex);
    fixture.SimulateNodeRestart(NodeIndex);

    var streamState = await fixture.GetStreamStateAsync(StreamName);
    streamState.Messages.ShouldBe(1UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterConsumerInactiveThreshold server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterConsumerInactiveThreshold — a created consumer can fetch
// all three messages published after it was created.
[Fact]
public async Task Consumer_exists_after_creation_and_can_receive_messages()
{
    const string StreamName = "INACTCONS";
    const string ConsumerName = "long-lived";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["inact.>"], replicas: 3);
    await fixture.CreateConsumerAsync(StreamName, ConsumerName, filterSubject: "inact.>");

    foreach (var i in Enumerable.Range(0, 3))
    {
        await fixture.PublishAsync("inact.evt", $"msg-{i}");
    }

    var fetched = await fixture.FetchAsync(StreamName, ConsumerName, 3);
    fetched.Messages.Count.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterParallelStreamCreationDupeRaftGroups server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterParallelStreamCreationDupeRaftGroups — creating the same
// stream twice succeeds both times and the stream name is listed exactly once.
[Fact]
public async Task Creating_same_stream_twice_is_idempotent_in_cluster()
{
    const string StreamName = "DUPERG";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);

    var firstCreate = await fixture.CreateStreamAsync(StreamName, ["duperg.>"], replicas: 3);
    firstCreate.Error.ShouldBeNull();

    // Second create uses the identical name, subjects and replica count.
    var secondCreate = await fixture.CreateStreamAsync(StreamName, ["duperg.>"], replicas: 3);
    secondCreate.Error.ShouldBeNull();
    secondCreate.StreamInfo!.Config.Name.ShouldBe(StreamName);

    var listing = await fixture.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
    var occurrences = listing.StreamNames!.Count(name => name == StreamName);
    occurrences.ShouldBe(1);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterParallelConsumerCreation server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterParallelConsumerCreation — three consumers on one stream
// each report a non-blank leader ID, and the three IDs are pairwise distinct.
[Fact]
public async Task Multiple_consumers_on_same_stream_have_distinct_leader_ids()
{
    const string StreamName = "MULTICONSUMERS";
    string[] consumerNames = ["cons1", "cons2", "cons3"];

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["mcons.>"], replicas: 3);
    foreach (var consumerName in consumerNames)
    {
        await fixture.CreateConsumerAsync(StreamName, consumerName);
    }

    // Resolve every leader ID first, then validate them one by one.
    var leaderIds = consumerNames
        .Select(consumerName => fixture.GetConsumerLeaderId(StreamName, consumerName))
        .ToArray();
    foreach (var leaderId in leaderIds)
    {
        leaderId.ShouldNotBeNullOrWhiteSpace();
    }

    // IDs must not collide (presumably because they embed the consumer name — confirm against the fixture).
    leaderIds.Distinct().Count().ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterLostConsumers server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterLostConsumers — after creating two consumers on a stream,
// the account info response reports a consumer count of two.
[Fact]
public async Task Stream_manager_tracks_consumers_correctly()
{
    const string StreamName = "LOSTCONS";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["lostcons.>"], replicas: 3);
    foreach (var consumerName in new[] { "c1", "c2" })
    {
        await fixture.CreateConsumerAsync(StreamName, consumerName);
    }

    var accountResponse = await fixture.RequestAsync(JetStreamApiSubjects.Info, "{}");
    accountResponse.AccountInfo!.Consumers.ShouldBe(2);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterScaleDownWhileNoQuorum server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterScaleDownWhileNoQuorum — updating an R3 stream to one
// replica succeeds and the returned config reports Replicas = 1.
[Fact]
public async Task Scale_down_from_R3_to_R1_produces_single_node_replica_group()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    await cluster.CreateStreamAsync("SCALEDWN", ["scaledn.>"], replicas: 3);

    // Assert the group exists before dereferencing it, so a missing group fails with a
    // descriptive assertion message instead of a NullReferenceException.
    var groupBefore = cluster.GetReplicaGroup("SCALEDWN");
    groupBefore.ShouldNotBeNull();
    groupBefore!.Nodes.Count.ShouldBe(3);

    var update = cluster.UpdateStream("SCALEDWN", ["scaledn.>"], replicas: 1);
    update.Error.ShouldBeNull();

    // NOTE(review): only the returned config is checked here; the replica group's node count
    // after the update is not asserted even though the test name mentions it.
    update.StreamInfo!.Config.Replicas.ShouldBe(1);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterAfterPeerRemoveZeroState server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterAfterPeerRemoveZeroState — a stream that has received no
// publishes reports zero messages and zero bytes.
[Fact]
public async Task Stream_state_is_zero_for_empty_stream_after_creation()
{
    const string StreamName = "ZEROSTATE";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["zero.>"], replicas: 3);

    // Nothing is published before the state is read.
    var emptyState = await fixture.GetStreamStateAsync(StreamName);

    emptyState.Messages.ShouldBe(0UL);
    emptyState.Bytes.ShouldBe(0UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterGhostEphemeralsAfterRestart server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterGhostEphemeralsAfterRestart — removing and restarting a node
// leaves the account's consumer count at one (no ghost duplicate).
[Fact]
public async Task Consumer_count_not_doubled_after_node_restart_simulation()
{
    const string StreamName = "GHOSTCONS";
    const int NodeIndex = 0;

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["ghost.>"], replicas: 3);
    await fixture.CreateConsumerAsync(StreamName, "durable1");

    // Cycle the node: remove it, then simulate its restart.
    fixture.RemoveNode(NodeIndex);
    fixture.SimulateNodeRestart(NodeIndex);

    var accountResponse = await fixture.RequestAsync(JetStreamApiSubjects.Info, "{}");

    // Exactly the one consumer created above must be counted.
    accountResponse.AccountInfo!.Consumers.ShouldBe(1);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterCurrentVsHealth server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterCurrentVsHealth — the meta state of a three-node fixture
// reports a cluster size of three and a non-blank leader ID.
[Fact]
public async Task Meta_group_is_healthy_in_three_node_cluster()
{
    const int NodeCount = 3;

    await using var fixture = await JetStreamClusterFixture.StartAsync(NodeCount);

    var metaState = fixture.GetMetaState();

    metaState.ShouldNotBeNull();
    metaState!.ClusterSize.ShouldBe(NodeCount);
    metaState.LeaderId.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDirectGetStreamUpgrade server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterDirectGetStreamUpgrade — stream info is retrievable without
// error and reflects the single published message.
[Fact]
public async Task Stream_accessible_via_info_API_after_creation()
{
    const string StreamName = "DIRECTGET";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["dget.>"], replicas: 3);
    await fixture.PublishAsync("dget.evt", "hello");

    var infoResponse = await fixture.GetStreamInfoAsync(StreamName);

    infoResponse.Error.ShouldBeNull();
    var messageCount = infoResponse.StreamInfo!.State.Messages;
    messageCount.ShouldBe(1UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterKVWatchersWithServerDown server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterKVWatchersWithServerDown — a filtered consumer fetches all
// five values published to a single key subject.
[Fact]
public async Task Consumer_operations_succeed_on_active_stream()
{
    const string StreamName = "KVWATCH";
    const string ConsumerName = "watcher";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["kv.>"], replicas: 3);
    await fixture.CreateConsumerAsync(StreamName, ConsumerName, filterSubject: "kv.>");

    // Five successive values for the same key subject.
    foreach (var i in Enumerable.Range(0, 5))
    {
        await fixture.PublishAsync("kv.key1", $"v{i}");
    }

    var fetched = await fixture.FetchAsync(StreamName, ConsumerName, 5);
    fetched.Messages.Count.ShouldBe(5);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSignalPullConsumersOnDelete server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterSignalPullConsumersOnDelete — deleting a stream that has two
// consumers leaves the account with zero streams.
[Fact]
public async Task Stream_delete_removes_associated_consumers_from_account_stats()
{
    const string StreamName = "SIGDEL";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["sigdel.>"], replicas: 3);
    foreach (var consumerName in new[] { "pull1", "pull2" })
    {
        await fixture.CreateConsumerAsync(StreamName, consumerName);
    }

    // Baseline: both consumers are counted before the delete.
    var beforeDelete = await fixture.RequestAsync(JetStreamApiSubjects.Info, "{}");
    beforeDelete.AccountInfo!.Consumers.ShouldBe(2);

    // The delete subject is the StreamDelete prefix followed by the stream name.
    await fixture.RequestAsync($"{JetStreamApiSubjects.StreamDelete}{StreamName}", "{}");

    // NOTE(review): only the stream count is asserted after the delete; the consumer count
    // is not re-checked even though the test name refers to consumers.
    var afterDelete = await fixture.RequestAsync(JetStreamApiSubjects.Info, "{}");
    afterDelete.AccountInfo!.Streams.ShouldBe(0);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterAckDeleted server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterAckDeleted — the Go test targets acks on a deleted consumer;
// this port fetches one message on a live AckPolicy.All consumer and checks that acking it does not throw.
[Fact]
public async Task Ack_on_active_consumer_advances_position()
{
    const string StreamName = "ACKDEL";
    const string ConsumerName = "proc";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["ackdel.>"], replicas: 3);
    await fixture.CreateConsumerAsync(StreamName, ConsumerName, ackPolicy: AckPolicy.All);
    await fixture.PublishAsync("ackdel.task", "job1");

    var fetched = await fixture.FetchAsync(StreamName, ConsumerName, 1);
    fetched.Messages.Count.ShouldBe(1);

    // Acknowledge up to sequence 1; the only requirement is that no exception escapes.
    Should.NotThrow(() => fixture.AckAll(StreamName, ConsumerName, 1));
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMetaRecoveryLogic server/jetstream_cluster_3_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterMetaRecoveryLogic — the meta state lists both created
// streams (one R3, one R1) by name.
[Fact]
public async Task Meta_group_state_reflects_all_streams_and_consumers()
{
    const string ReplicatedStream = "METAREC1";
    const string SingleReplicaStream = "METAREC2";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(ReplicatedStream, ["mr1.>"], replicas: 3);
    await fixture.CreateStreamAsync(SingleReplicaStream, ["mr2.>"], replicas: 1);

    // A consumer is created as well, but only the stream list is asserted below.
    await fixture.CreateConsumerAsync(ReplicatedStream, "c1");

    var metaState = fixture.GetMetaState();
    metaState.ShouldNotBeNull();

    var trackedStreams = metaState!.Streams;
    trackedStreams.ShouldContain(ReplicatedStream);
    trackedStreams.ShouldContain(SingleReplicaStream);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterPendingRequestsInJsz server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterPendingRequestsInJsz — a stream-names API request returns
// a response whose name list includes the stream just created.
[Fact]
public async Task JetStream_API_router_responds_to_stream_names_request()
{
    const string StreamName = "PENDJSZ";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["pendjsz.>"], replicas: 3);

    var listing = await fixture.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");

    // Check the response and its name list for null before searching the list.
    listing.ShouldNotBeNull();
    listing.StreamNames.ShouldNotBeNull();
    listing.StreamNames!.ShouldContain(StreamName);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterConsumerReplicasAfterScale server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterConsumerReplicasAfterScale — after the stream's replica count
// is raised from 3 to 5, the existing consumer still reports a leader ID.
[Fact]
public async Task Consumer_on_scaled_stream_has_valid_leader()
{
    const string StreamName = "SCALECONSUMER";
    const string ConsumerName = "worker";

    await using var fixture = await JetStreamClusterFixture.StartAsync(5);
    await fixture.CreateStreamAsync(StreamName, ["sc.>"], replicas: 3);
    await fixture.CreateConsumerAsync(StreamName, ConsumerName);

    // Scale the stream up to five replicas; the update itself must succeed.
    var scaleUp = fixture.UpdateStream(StreamName, ["sc.>"], replicas: 5);
    scaleUp.Error.ShouldBeNull();

    var consumerLeader = fixture.GetConsumerLeaderId(StreamName, ConsumerName);
    consumerLeader.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterObserverNotElectedMetaLeader server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterObserverNotElectedMetaLeader — the cluster reports a
// non-blank meta leader ID.
[Fact]
public async Task Meta_leader_id_is_one_of_the_cluster_node_ids()
{
    await using var cluster = await JetStreamClusterFixture.StartAsync(3);

    var metaLeader = cluster.GetMetaLeaderId();

    // Only non-blankness is asserted. A membership predicate of the form
    // `id.Contains(name) || id.Length > 0` is always true for a non-blank ID, so it can
    // never fail and is deliberately not used here.
    // TODO(review): once the fixture's node ID format is confirmed, assert that metaLeader
    // really is one of the cluster's node IDs, as the test name promises.
    metaLeader.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMetaCompactThreshold server/jetstream_cluster_4_test.go
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterMetaCompactThreshold — after creating twenty R1 streams,
// the meta state's stream list has twenty entries.
[Fact]
public async Task Twenty_streams_all_tracked_in_meta_state()
{
    // Single source of truth for both the creation loop and the final assertion.
    const int StreamCount = 20;

    await using var cluster = await JetStreamClusterFixture.StartAsync(3);
    for (var i = 0; i < StreamCount; i++)
    {
        // Each stream gets its own name and its own subject space.
        await cluster.CreateStreamAsync($"COMPACT{i}", [$"cmp{i}.>"], replicas: 1);
    }

    var state = cluster.GetMetaState();

    // Assert non-null first so a missing meta state fails with a descriptive message
    // instead of a NullReferenceException on the dereference below.
    state.ShouldNotBeNull();
    state!.Streams.Count.ShouldBe(StreamCount);
}
// ---------------------------------------------------------------
// Additional consistency tests
// ---------------------------------------------------------------
// Go reference: TestJetStreamClusterNoDupePeerSelection — the three nodes in an R3 stream's
// replica group all have different IDs.
[Fact]
public async Task R3_stream_nodes_are_all_distinct_no_duplicates()
{
    const string StreamName = "NODUPE";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["nodupe.>"], replicas: 3);

    var replicaGroup = fixture.GetReplicaGroup(StreamName);
    replicaGroup.ShouldNotBeNull();

    // Three unique IDs means no peer was selected twice.
    var uniqueIdCount = replicaGroup!.Nodes
        .Select(node => node.Id)
        .Distinct()
        .Count();
    uniqueIdCount.ShouldBe(3);
}
// Go reference: TestJetStreamClusterAsyncFlushBasics — twenty sequential publishes to an R3
// stream all come back with a null error code.
[Fact]
public async Task Batch_publish_all_acked_in_R3_stream()
{
    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync("BATCHPUB", ["batch.>"], replicas: 3);

    // Publish everything first and tally failures; the verdict is asserted once at the end.
    var failedAcks = 0;
    for (var n = 0; n < 20; n++)
    {
        var ack = await fixture.PublishAsync("batch.evt", $"msg-{n}");
        if (ack.ErrorCode != null)
        {
            failedAcks++;
        }
    }

    (failedAcks == 0).ShouldBeTrue();
}
// Go reference: TestJetStreamClusterInterestRetentionWithFilteredConsumers — an interest-retention
// stream with a filtered consumer holds both messages published on subjects matching the filter.
[Fact]
public async Task Interest_stream_with_filtered_consumer_retains_messages()
{
    const string StreamName = "INTFILT";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);

    var config = new StreamConfig
    {
        Name = StreamName,
        Subjects = ["intfilt.>"],
        Replicas = 3,
        Retention = RetentionPolicy.Interest,
    };
    var created = fixture.CreateStreamDirect(config);
    created.Error.ShouldBeNull();

    // The consumer's filter is narrower than the stream's subject space.
    await fixture.CreateConsumerAsync(StreamName, "reader", filterSubject: "intfilt.events.>");
    await fixture.PublishAsync("intfilt.events.1", "data");
    await fixture.PublishAsync("intfilt.events.2", "data2");

    // Nothing has been fetched or acked, so both messages are expected to be present.
    var streamState = await fixture.GetStreamStateAsync(StreamName);
    streamState.Messages.ShouldBe(2UL);
}
// Go reference: TestJetStreamClusterAckFloorBetweenLeaderAndFollowers — on an R3 work-queue
// stream, two messages are fetched and acked via AckAll; repeating the same ack must not throw.
[Fact]
public async Task Ack_all_on_R3_stream_processes_correctly()
{
    const string StreamName = "ACKFLOOR";
    const string ConsumerName = "proc";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);

    var config = new StreamConfig
    {
        Name = StreamName,
        Subjects = ["ackfloor.>"],
        Replicas = 3,
        Retention = RetentionPolicy.WorkQueue,
        MaxConsumers = 1,
    };
    var created = fixture.CreateStreamDirect(config);
    created.Error.ShouldBeNull();

    await fixture.CreateConsumerAsync(StreamName, ConsumerName, ackPolicy: AckPolicy.All);
    foreach (var payload in new[] { "job-1", "job-2" })
    {
        await fixture.PublishAsync("ackfloor.task", payload);
    }

    var fetched = await fixture.FetchAsync(StreamName, ConsumerName, 2);
    fetched.Messages.Count.ShouldBe(2);

    // First ack up to sequence 2, then issue the identical ack again under NotThrow.
    fixture.AckAll(StreamName, ConsumerName, 2);
    Should.NotThrow(() => fixture.AckAll(StreamName, ConsumerName, 2));
}
// Go reference: TestJetStreamClusterStreamAckMsgR3SignalsRemovedMsg — a filtered consumer on an
// R3 stream fetches all ten published messages in one batch.
[Fact]
public async Task R3_stream_AckAll_consumer_fetches_all_published_messages()
{
    const string StreamName = "R3ACKALL";
    const string ConsumerName = "fetchall";

    await using var fixture = await JetStreamClusterFixture.StartAsync(3);
    await fixture.CreateStreamAsync(StreamName, ["r3aa.>"], replicas: 3);

    // NOTE(review): no ackPolicy argument is passed here although the test name says AckAll —
    // confirm whether the fixture's default matches the intent.
    await fixture.CreateConsumerAsync(StreamName, ConsumerName, filterSubject: "r3aa.>");

    foreach (var i in Enumerable.Range(0, 10))
    {
        await fixture.PublishAsync("r3aa.evt", $"msg-{i}");
    }

    var fetched = await fixture.FetchAsync(StreamName, ConsumerName, 10);
    fetched.Messages.Count.ShouldBe(10);
}
}