Files
natsdotnet/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JsClusterStreamReplicationTests.cs
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

1065 lines
41 KiB
C#

// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
// Covers: R1 and R3 stream creation, replica group behaviors, publish preservation,
// stream state, multi-stream coexistence, update, delete, purge, max limits,
// subject filtering, wildcard subjects, memory vs file store in cluster.
using System.Text;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
using NATS.Server.TestUtilities;
namespace NATS.Server.JetStream.Tests.JetStream.Cluster;
/// <summary>
/// Tests covering JetStream cluster stream replication semantics:
/// R1 and R3 stream creation, replica group sizes, publish durability,
/// state accuracy, multi-stream coexistence, update/delete/purge, limits,
/// subject filtering, wildcard subjects, and storage type.
/// Ported from Go jetstream_cluster_1_test.go.
/// </summary>
public class JsClusterStreamReplicationTests
{
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223
// ---------------------------------------------------------------
[Fact]
public async Task R1_stream_creation_in_three_node_cluster_succeeds()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var resp = await cluster.CreateStreamAsync("R1BASIC", ["r1basic.>"], replicas: 1);
resp.Error.ShouldBeNull();
resp.StreamInfo.ShouldNotBeNull();
resp.StreamInfo!.Config.Name.ShouldBe("R1BASIC");
resp.StreamInfo.Config.Replicas.ShouldBe(1);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
// ---------------------------------------------------------------
[Fact]
public async Task R3_stream_creation_in_three_node_cluster_succeeds()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var resp = await cluster.CreateStreamAsync("R3BASIC", ["r3basic.>"], replicas: 3);
resp.Error.ShouldBeNull();
resp.StreamInfo.ShouldNotBeNull();
resp.StreamInfo!.Config.Name.ShouldBe("R3BASIC");
resp.StreamInfo.Config.Replicas.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223
// ---------------------------------------------------------------
[Fact]
public async Task R1_stream_has_single_node_replica_group()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R1GROUP", ["r1g.>"], replicas: 1);
var group = cluster.GetReplicaGroup("R1GROUP");
group.ShouldNotBeNull();
group!.Nodes.Count.ShouldBe(1);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
// ---------------------------------------------------------------
[Fact]
public async Task R3_stream_has_three_node_replica_group()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R3GROUP", ["r3g.>"], replicas: 3);
var group = cluster.GetReplicaGroup("R3GROUP");
group.ShouldNotBeNull();
group!.Nodes.Count.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223
// ---------------------------------------------------------------
[Fact]
public async Task R1_replica_group_has_a_leader()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R1LEAD", ["r1lead.>"], replicas: 1);
var leaderId = cluster.GetStreamLeaderId("R1LEAD");
leaderId.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
// ---------------------------------------------------------------
[Fact]
public async Task R3_replica_group_has_a_leader()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R3LEAD", ["r3lead.>"], replicas: 3);
var leaderId = cluster.GetStreamLeaderId("R3LEAD");
leaderId.ShouldNotBeNullOrWhiteSpace();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223
// ---------------------------------------------------------------
[Fact]
public async Task Publish_to_R1_stream_preserves_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R1PUB", ["r1pub.>"], replicas: 1);
for (var i = 0; i < 10; i++)
{
var ack = await cluster.PublishAsync("r1pub.event", $"msg-{i}");
ack.Stream.ShouldBe("R1PUB");
ack.Seq.ShouldBe((ulong)(i + 1));
}
var state = await cluster.GetStreamStateAsync("R1PUB");
state.Messages.ShouldBe(10UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
// ---------------------------------------------------------------
[Fact]
public async Task Publish_to_R3_stream_preserves_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R3PUB", ["r3pub.>"], replicas: 3);
for (var i = 0; i < 10; i++)
{
var ack = await cluster.PublishAsync("r3pub.event", $"msg-{i}");
ack.Stream.ShouldBe("R3PUB");
ack.Seq.ShouldBe((ulong)(i + 1));
}
var state = await cluster.GetStreamStateAsync("R3PUB");
state.Messages.ShouldBe(10UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterExtendedStreamInfo server/jetstream_cluster_1_test.go:1878
// ---------------------------------------------------------------
[Fact]
public async Task Stream_info_consistency_for_R1_replicated_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("INFOR1", ["infor1.>"], replicas: 1);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("infor1.event", $"msg-{i}");
var info = await cluster.GetStreamInfoAsync("INFOR1");
info.Error.ShouldBeNull();
info.StreamInfo.ShouldNotBeNull();
info.StreamInfo!.Config.Name.ShouldBe("INFOR1");
info.StreamInfo.Config.Replicas.ShouldBe(1);
info.StreamInfo.State.Messages.ShouldBe(5UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterExtendedStreamInfo server/jetstream_cluster_1_test.go:1878
// ---------------------------------------------------------------
[Fact]
public async Task Stream_info_consistency_for_R3_replicated_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("INFOR3", ["infor3.>"], replicas: 3);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("infor3.event", $"msg-{i}");
var info = await cluster.GetStreamInfoAsync("INFOR3");
info.Error.ShouldBeNull();
info.StreamInfo.ShouldNotBeNull();
info.StreamInfo!.Config.Name.ShouldBe("INFOR3");
info.StreamInfo.Config.Replicas.ShouldBe(3);
info.StreamInfo.State.Messages.ShouldBe(5UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977
// ---------------------------------------------------------------
[Fact]
public async Task Stream_state_msg_count_accurate_after_publishes_R1()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("STATER1", ["stater1.>"], replicas: 1);
const int count = 25;
for (var i = 0; i < count; i++)
await cluster.PublishAsync("stater1.data", $"payload-{i}");
var state = await cluster.GetStreamStateAsync("STATER1");
state.Messages.ShouldBe((ulong)count);
state.FirstSeq.ShouldBe(1UL);
state.LastSeq.ShouldBe((ulong)count);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977
// ---------------------------------------------------------------
[Fact]
public async Task Stream_state_msg_count_accurate_after_publishes_R3()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("STATER3", ["stater3.>"], replicas: 3);
const int count = 25;
for (var i = 0; i < count; i++)
await cluster.PublishAsync("stater3.data", $"payload-{i}");
var state = await cluster.GetStreamStateAsync("STATER3");
state.Messages.ShouldBe((ulong)count);
state.FirstSeq.ShouldBe(1UL);
state.LastSeq.ShouldBe((ulong)count);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977
// ---------------------------------------------------------------
[Fact]
public async Task Stream_state_bytes_non_zero_after_publishes()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("BYTECHK", ["bytechk.>"], replicas: 3);
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("bytechk.data", new string('X', 100));
var state = await cluster.GetStreamStateAsync("BYTECHK");
state.Messages.ShouldBe(10UL);
state.Bytes.ShouldBeGreaterThan(0UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSingleReplicaStreams / TestJetStreamClusterMultiReplicaStreams
// server/jetstream_cluster_1_test.go:223, 299
// ---------------------------------------------------------------
[Fact]
public async Task R1_and_R3_streams_coexist_in_same_cluster()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var r1 = await cluster.CreateStreamAsync("COEXR1", ["coex.r1.>"], replicas: 1);
var r3 = await cluster.CreateStreamAsync("COEXR3", ["coex.r3.>"], replicas: 3);
r1.Error.ShouldBeNull();
r3.Error.ShouldBeNull();
var groupR1 = cluster.GetReplicaGroup("COEXR1");
var groupR3 = cluster.GetReplicaGroup("COEXR3");
groupR1!.Nodes.Count.ShouldBe(1);
groupR3!.Nodes.Count.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
// ---------------------------------------------------------------
[Fact]
public async Task Multiple_streams_with_different_replica_counts_coexist()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(5);
await cluster.CreateStreamAsync("MIX1", ["mix1.>"], replicas: 1);
await cluster.CreateStreamAsync("MIX3", ["mix3.>"], replicas: 3);
await cluster.CreateStreamAsync("MIX5", ["mix5.>"], replicas: 5);
cluster.GetReplicaGroup("MIX1")!.Nodes.Count.ShouldBe(1);
cluster.GetReplicaGroup("MIX3")!.Nodes.Count.ShouldBe(3);
cluster.GetReplicaGroup("MIX5")!.Nodes.Count.ShouldBe(5);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433
// ---------------------------------------------------------------
[Fact]
public async Task Stream_update_changes_replica_count_from_1_to_3()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("UPDREP", ["updrep.>"], replicas: 1);
cluster.GetReplicaGroup("UPDREP")!.Nodes.Count.ShouldBe(1);
// Update via CreateOrUpdate — new replica group is created if replicas differ
var update = cluster.UpdateStream("UPDREP", ["updrep.>"], replicas: 3);
update.Error.ShouldBeNull();
update.StreamInfo!.Config.Replicas.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433
// ---------------------------------------------------------------
[Fact]
public async Task Stream_update_does_not_lose_existing_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("UPDMSG", ["updmsg.>"], replicas: 3);
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("updmsg.event", $"msg-{i}");
var update = cluster.UpdateStream("UPDMSG", ["updmsg.>"], replicas: 3, maxMsgs: 50);
update.Error.ShouldBeNull();
var state = await cluster.GetStreamStateAsync("UPDMSG");
state.Messages.ShouldBe(10UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472
// ---------------------------------------------------------------
[Fact]
public async Task Stream_delete_removes_stream_and_replica_group()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("DELDEMO", ["deldemo.>"], replicas: 3);
cluster.GetReplicaGroup("DELDEMO").ShouldNotBeNull();
var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELDEMO", "{}");
del.Success.ShouldBeTrue();
// Replica group should be gone
cluster.GetReplicaGroup("DELDEMO").ShouldBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472
// ---------------------------------------------------------------
[Fact]
public async Task Stream_delete_reflected_in_account_info()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("DELACCT", ["delacct.>"], replicas: 3);
await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELACCT", "{}");
var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
info.AccountInfo.ShouldNotBeNull();
info.AccountInfo!.Streams.ShouldBe(0);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522
// ---------------------------------------------------------------
[Fact]
public async Task Stream_purge_clears_all_messages_in_R3_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("PURGER3", ["purger3.>"], replicas: 3);
for (var i = 0; i < 50; i++)
await cluster.PublishAsync("purger3.data", $"msg-{i}");
var before = await cluster.GetStreamStateAsync("PURGER3");
before.Messages.ShouldBe(50UL);
var purge = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGER3", "{}");
purge.Success.ShouldBeTrue();
var after = await cluster.GetStreamStateAsync("PURGER3");
after.Messages.ShouldBe(0UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522
// ---------------------------------------------------------------
[Fact]
public async Task Stream_purge_preserves_stream_config()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("PURGECFG", ["purgecfg.>"], replicas: 3);
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("purgecfg.data", $"msg-{i}");
await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGECFG", "{}");
var info = await cluster.GetStreamInfoAsync("PURGECFG");
info.Error.ShouldBeNull();
info.StreamInfo.ShouldNotBeNull();
info.StreamInfo!.Config.Name.ShouldBe("PURGECFG");
info.StreamInfo.Config.Replicas.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248
// ---------------------------------------------------------------
[Fact]
public async Task Max_messages_enforced_in_R1_replicated_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var resp = cluster.CreateStreamDirect(new StreamConfig
{
Name = "MAXMSGR1",
Subjects = ["maxmsgr1.>"],
Replicas = 1,
MaxMsgs = 5,
});
resp.Error.ShouldBeNull();
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("maxmsgr1.event", $"msg-{i}");
var state = await cluster.GetStreamStateAsync("MAXMSGR1");
state.Messages.ShouldBeLessThanOrEqualTo(5UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248
// ---------------------------------------------------------------
[Fact]
public async Task Max_messages_enforced_in_R3_replicated_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var resp = cluster.CreateStreamDirect(new StreamConfig
{
Name = "MAXMSGR3",
Subjects = ["maxmsgr3.>"],
Replicas = 3,
MaxMsgs = 5,
});
resp.Error.ShouldBeNull();
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("maxmsgr3.event", $"msg-{i}");
var state = await cluster.GetStreamStateAsync("MAXMSGR3");
state.Messages.ShouldBeLessThanOrEqualTo(5UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMaxBytesForStream server/jetstream_cluster_1_test.go:1099
// ---------------------------------------------------------------
[Fact]
public async Task Max_bytes_enforced_in_R3_replicated_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var resp = cluster.CreateStreamDirect(new StreamConfig
{
Name = "MAXBYTESR3",
Subjects = ["maxbytesr3.>"],
Replicas = 3,
MaxBytes = 512,
Discard = DiscardPolicy.Old,
});
resp.Error.ShouldBeNull();
for (var i = 0; i < 20; i++)
await cluster.PublishAsync("maxbytesr3.data", new string('Y', 64));
var state = await cluster.GetStreamStateAsync("MAXBYTESR3");
((long)state.Bytes).ShouldBeLessThanOrEqualTo(512 + 128);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamUpdateSubjects server/jetstream_cluster_1_test.go:571
// ---------------------------------------------------------------
[Fact]
public async Task Subject_filtering_routes_to_correct_R3_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("FILTDERA", ["filter.a.>"], replicas: 3);
await cluster.CreateStreamAsync("FILTDERB", ["filter.b.>"], replicas: 3);
var ackA = await cluster.PublishAsync("filter.a.event", "msgA");
ackA.Stream.ShouldBe("FILTDERA");
var ackB = await cluster.PublishAsync("filter.b.event", "msgB");
ackB.Stream.ShouldBe("FILTDERB");
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamOverlapSubjects server/jetstream_cluster_1_test.go:1248
// ---------------------------------------------------------------
[Fact]
public async Task Multiple_subjects_in_single_R3_stream_all_captured()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("MULTISUB", ["sub.alpha", "sub.beta", "sub.gamma"], replicas: 3);
await cluster.PublishAsync("sub.alpha", "alpha-msg");
await cluster.PublishAsync("sub.beta", "beta-msg");
await cluster.PublishAsync("sub.gamma", "gamma-msg");
var state = await cluster.GetStreamStateAsync("MULTISUB");
state.Messages.ShouldBe(3UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
// ---------------------------------------------------------------
[Fact]
public async Task Wildcard_subject_captures_all_matching_messages_in_R3_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("WILDCARD", ["wc.>"], replicas: 3);
await cluster.PublishAsync("wc.a", "msg1");
await cluster.PublishAsync("wc.b.c", "msg2");
await cluster.PublishAsync("wc.x.y.z", "msg3");
var state = await cluster.GetStreamStateAsync("WILDCARD");
state.Messages.ShouldBe(3UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMemoryStore server/jetstream_cluster_1_test.go:423
// ---------------------------------------------------------------
[Fact]
public async Task Memory_store_R1_stream_reflects_correct_backend_type()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("MEMR1", ["memr1.>"], replicas: 1, storage: StorageType.Memory);
var backend = cluster.GetStoreBackendType("MEMR1");
backend.ShouldBe("memory");
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMemoryStore server/jetstream_cluster_1_test.go:423
// ---------------------------------------------------------------
[Fact]
public async Task Memory_store_R3_stream_reflects_correct_backend_type()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("MEMR3", ["memr3.>"], replicas: 3, storage: StorageType.Memory);
var backend = cluster.GetStoreBackendType("MEMR3");
backend.ShouldBe("memory");
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreamsDefaultFileMem server/jetstream_cluster_1_test.go:355
// ---------------------------------------------------------------
[Fact]
public async Task Default_storage_type_is_memory_for_R3_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var resp = await cluster.CreateStreamAsync("DEFMEM", ["defmem.>"], replicas: 3);
resp.StreamInfo!.Config.Storage.ShouldBe(StorageType.Memory);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977
// ---------------------------------------------------------------
[Fact]
public async Task R3_stream_sequences_are_strictly_monotonic()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("SEQR3", ["seqr3.>"], replicas: 3);
var sequences = new List<ulong>();
for (var i = 0; i < 20; i++)
{
var ack = await cluster.PublishAsync("seqr3.event", $"msg-{i}");
sequences.Add(ack.Seq);
}
for (var i = 1; i < sequences.Count; i++)
sequences[i].ShouldBeGreaterThan(sequences[i - 1]);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223
// ---------------------------------------------------------------
[Fact]
public async Task R1_stream_sequences_are_strictly_monotonic()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("SEQR1", ["seqr1.>"], replicas: 1);
var sequences = new List<ulong>();
for (var i = 0; i < 20; i++)
{
var ack = await cluster.PublishAsync("seqr1.event", $"msg-{i}");
sequences.Add(ack.Seq);
}
for (var i = 1; i < sequences.Count; i++)
sequences[i].ShouldBeGreaterThan(sequences[i - 1]);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDoubleAdd server/jetstream_cluster_1_test.go:1551
// ---------------------------------------------------------------
[Fact]
public async Task R1_stream_creation_is_idempotent()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var first = await cluster.CreateStreamAsync("IDEMP1", ["idemp1.>"], replicas: 1);
first.Error.ShouldBeNull();
var second = await cluster.CreateStreamAsync("IDEMP1", ["idemp1.>"], replicas: 1);
second.Error.ShouldBeNull();
second.StreamInfo!.Config.Name.ShouldBe("IDEMP1");
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDoubleAdd server/jetstream_cluster_1_test.go:1551
// ---------------------------------------------------------------
[Fact]
public async Task R3_stream_creation_is_idempotent()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var first = await cluster.CreateStreamAsync("IDEMP3", ["idemp3.>"], replicas: 3);
first.Error.ShouldBeNull();
var second = await cluster.CreateStreamAsync("IDEMP3", ["idemp3.>"], replicas: 3);
second.Error.ShouldBeNull();
second.StreamInfo!.Config.Name.ShouldBe("IDEMP3");
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284
// ---------------------------------------------------------------
[Fact]
public async Task Stream_names_api_lists_all_replicated_streams()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("LST1", ["lst1.>"], replicas: 1);
await cluster.CreateStreamAsync("LST3A", ["lst3a.>"], replicas: 3);
await cluster.CreateStreamAsync("LST3B", ["lst3b.>"], replicas: 3);
var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
names.StreamNames.ShouldNotBeNull();
names.StreamNames!.Count.ShouldBe(3);
names.StreamNames.ShouldContain("LST1");
names.StreamNames.ShouldContain("LST3A");
names.StreamNames.ShouldContain("LST3B");
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284
// ---------------------------------------------------------------
[Fact]
public async Task Stream_info_via_api_router_returns_replicated_config()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("APIR3", ["apir3.>"], replicas: 3);
var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamInfo}APIR3", "{}");
resp.Error.ShouldBeNull();
resp.StreamInfo.ShouldNotBeNull();
resp.StreamInfo!.Config.Name.ShouldBe("APIR3");
resp.StreamInfo.Config.Replicas.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamPurge server/jetstream_cluster_1_test.go:522
// ---------------------------------------------------------------
[Fact]
public async Task R1_stream_purge_clears_messages_and_stream_exists()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("PURGER1", ["purger1.>"], replicas: 1);
for (var i = 0; i < 20; i++)
await cluster.PublishAsync("purger1.data", $"msg-{i}");
await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGER1", "{}");
var state = await cluster.GetStreamStateAsync("PURGER1");
state.Messages.ShouldBe(0UL);
// Stream still exists after purge
var info = await cluster.GetStreamInfoAsync("PURGER1");
info.Error.ShouldBeNull();
info.StreamInfo.ShouldNotBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
// ---------------------------------------------------------------
[Fact]
public async Task R3_stream_publish_ack_carries_correct_stream_name()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("ACKNAME", ["ackname.>"], replicas: 3);
var ack = await cluster.PublishAsync("ackname.event", "payload");
ack.Stream.ShouldBe("ACKNAME");
ack.ErrorCode.ShouldBeNull();
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamExtendedUpdates server/jetstream_cluster_1_test.go:1513
// ---------------------------------------------------------------
[Fact]
public async Task Update_max_msgs_on_R3_stream_takes_effect()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("UPDMAX", ["updmax.>"], replicas: 3);
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("updmax.event", $"msg-{i}");
var update = cluster.UpdateStream("UPDMAX", ["updmax.>"], replicas: 3, maxMsgs: 5);
update.Error.ShouldBeNull();
update.StreamInfo!.Config.MaxMsgs.ShouldBe(5);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223
// ---------------------------------------------------------------
[Fact]
public async Task R1_stream_info_first_and_last_seq_accurate()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("SEQCHKR1", ["seqchkr1.>"], replicas: 1);
for (var i = 0; i < 8; i++)
await cluster.PublishAsync("seqchkr1.event", $"msg-{i}");
var state = await cluster.GetStreamStateAsync("SEQCHKR1");
state.FirstSeq.ShouldBe(1UL);
state.LastSeq.ShouldBe(8UL);
state.Messages.ShouldBe(8UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
// ---------------------------------------------------------------
[Fact]
public async Task R3_stream_info_first_and_last_seq_accurate()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("SEQCHKR3", ["seqchkr3.>"], replicas: 3);
for (var i = 0; i < 8; i++)
await cluster.PublishAsync("seqchkr3.event", $"msg-{i}");
var state = await cluster.GetStreamStateAsync("SEQCHKR3");
state.FirstSeq.ShouldBe(1UL);
state.LastSeq.ShouldBe(8UL);
state.Messages.ShouldBe(8UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472
// ---------------------------------------------------------------
[Fact]
public async Task Deleting_R1_stream_removes_it_from_stream_names()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("DELR1", ["delr1.>"], replicas: 1);
await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELR1", "{}");
var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
// Either empty list or does not contain deleted stream
if (names.StreamNames != null)
names.StreamNames.ShouldNotContain("DELR1");
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472
// ---------------------------------------------------------------
[Fact]
public async Task Deleting_R3_stream_removes_it_from_stream_names()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("DELR3", ["delr3.>"], replicas: 3);
await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELR3", "{}");
var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
if (names.StreamNames != null)
names.StreamNames.ShouldNotContain("DELR3");
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamPublishWithActiveConsumers server/jetstream_cluster_1_test.go:1132
// ---------------------------------------------------------------
[Fact]
public async Task R1_stream_with_consumer_delivers_all_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R1CONS", ["r1cons.>"], replicas: 1);
await cluster.CreateConsumerAsync("R1CONS", "worker", filterSubject: "r1cons.>");
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("r1cons.task", $"job-{i}");
var batch = await cluster.FetchAsync("R1CONS", "worker", 10);
batch.Messages.Count.ShouldBe(10);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamPublishWithActiveConsumers server/jetstream_cluster_1_test.go:1132
// ---------------------------------------------------------------
[Fact]
public async Task R3_stream_with_consumer_delivers_all_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("R3CONS", ["r3cons.>"], replicas: 3);
await cluster.CreateConsumerAsync("R3CONS", "worker", filterSubject: "r3cons.>");
for (var i = 0; i < 10; i++)
await cluster.PublishAsync("r3cons.task", $"job-{i}");
var batch = await cluster.FetchAsync("R3CONS", "worker", 10);
batch.Messages.Count.ShouldBe(10);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248
// ---------------------------------------------------------------
[Fact]
public async Task Single_token_wildcard_subject_captures_correct_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("STARWILD", ["sw.*"], replicas: 3);
await cluster.PublishAsync("sw.alpha", "msg1");
await cluster.PublishAsync("sw.beta", "msg2");
var state = await cluster.GetStreamStateAsync("STARWILD");
state.Messages.ShouldBe(2UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterInterestRetention server/jetstream_cluster_1_test.go:2109
// ---------------------------------------------------------------
[Fact]
public async Task Interest_retention_R3_stream_stores_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var resp = cluster.CreateStreamDirect(new StreamConfig
{
Name = "INTR3",
Subjects = ["intr3.>"],
Replicas = 3,
Retention = RetentionPolicy.Interest,
});
resp.Error.ShouldBeNull();
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("intr3.event", $"msg-{i}");
var state = await cluster.GetStreamStateAsync("INTR3");
state.Messages.ShouldBe(5UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterWorkQueueRetention server/jetstream_cluster_1_test.go:2179
// ---------------------------------------------------------------
[Fact]
public async Task Work_queue_retention_R1_stream_removes_acked_messages()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var resp = cluster.CreateStreamDirect(new StreamConfig
{
Name = "WQR1",
Subjects = ["wqr1.>"],
Replicas = 1,
Retention = RetentionPolicy.WorkQueue,
MaxConsumers = 1,
});
resp.Error.ShouldBeNull();
await cluster.CreateConsumerAsync("WQR1", "proc", filterSubject: "wqr1.>", ackPolicy: AckPolicy.All);
await cluster.PublishAsync("wqr1.task", "job-1");
var stateBefore = await cluster.GetStreamStateAsync("WQR1");
stateBefore.Messages.ShouldBe(1UL);
cluster.AckAll("WQR1", "proc", 1);
await cluster.PublishAsync("wqr1.task", "job-2");
var stateAfter = await cluster.GetStreamStateAsync("WQR1");
stateAfter.Messages.ShouldBe(1UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284
// ---------------------------------------------------------------
[Fact]
public async Task Ten_streams_with_mixed_replicas_all_tracked()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
for (var i = 0; i < 10; i++)
{
var replicas = i % 2 == 0 ? 1 : 3;
await cluster.CreateStreamAsync($"TEN{i}", [$"ten{i}.>"], replicas: replicas);
}
var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
names.StreamNames.ShouldNotBeNull();
names.StreamNames!.Count.ShouldBe(10);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433
// ---------------------------------------------------------------
[Fact]
public async Task Re_creating_deleted_stream_works_correctly()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("RECREATE", ["recreate.>"], replicas: 3);
for (var i = 0; i < 5; i++)
await cluster.PublishAsync("recreate.event", $"msg-{i}");
await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}RECREATE", "{}");
// Re-create the stream
var resp = await cluster.CreateStreamAsync("RECREATE", ["recreate.>"], replicas: 3);
resp.Error.ShouldBeNull();
resp.StreamInfo.ShouldNotBeNull();
// New stream starts empty
var state = await cluster.GetStreamStateAsync("RECREATE");
state.Messages.ShouldBe(0UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
// ---------------------------------------------------------------
[Fact]
public async Task R3_stream_state_accurate_after_sequential_publishes()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
await cluster.CreateStreamAsync("SEQSTATE", ["seqstate.>"], replicas: 3);
for (var i = 1; i <= 30; i++)
await cluster.PublishAsync("seqstate.event", $"msg-{i}");
var state = await cluster.GetStreamStateAsync("SEQSTATE");
state.Messages.ShouldBe(30UL);
state.FirstSeq.ShouldBe(1UL);
state.LastSeq.ShouldBe(30UL);
}
// ---------------------------------------------------------------
// Go: TestJetStreamClusterStreamLimits server/jetstream_cluster_1_test.go:3248
// ---------------------------------------------------------------
[Fact]
public async Task Max_msgs_per_subject_enforced_in_R3_stream()
{
await using var cluster = await JetStreamClusterFixture.StartAsync(3);
var resp = cluster.CreateStreamDirect(new StreamConfig
{
Name = "MPSUB",
Subjects = ["mpsub.>"],
Replicas = 3,
MaxMsgsPer = 2,
});
resp.Error.ShouldBeNull();
for (var i = 0; i < 6; i++)
await cluster.PublishAsync("mpsub.topic", $"msg-{i}");
var state = await cluster.GetStreamStateAsync("MPSUB");
state.Messages.ShouldBeLessThanOrEqualTo(2UL);
}
}