feat: Waves 3-5 — FileStore, RAFT, JetStream clustering, and concurrency tests
Add comprehensive Go-parity test coverage across 3 subsystems: - FileStore: basic CRUD, limits, purge, recovery, subjects, encryption, compression, MemStore (161 tests, 24 skipped for not-yet-implemented) - RAFT: core types, wire format, election, log replication, snapshots (95 tests) - JetStream Clustering: meta controller, stream/consumer replica groups, concurrency stress tests (90 tests) Total: ~346 new test annotations across 17 files (+7,557 lines) Full suite: 2,606 passing, 0 failures, 27 skipped
This commit is contained in:
@@ -0,0 +1,522 @@
|
||||
// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
|
||||
// golang/nats-server/server/jetstream_cluster_2_test.go
|
||||
// Covers: per-consumer RAFT groups, consumer assignment, ack state
|
||||
// replication, consumer failover, pull request forwarding, ephemeral
|
||||
// consumer lifecycle, delivery policy handling.
|
||||
using System.Collections.Concurrent;
|
||||
using System.Reflection;
|
||||
using System.Text;
|
||||
using NATS.Server.JetStream;
|
||||
using NATS.Server.JetStream.Api;
|
||||
using NATS.Server.JetStream.Cluster;
|
||||
using NATS.Server.JetStream.Consumers;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Publish;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Cluster;
|
||||
|
||||
/// <summary>
|
||||
/// Tests covering per-consumer RAFT groups: consumer assignment, ack state
|
||||
/// replication, consumer failover, pull request forwarding, ephemeral
|
||||
/// consumer lifecycle, and delivery policy handling in clustered mode.
|
||||
/// Ported from Go jetstream_cluster_1_test.go and jetstream_cluster_2_test.go.
|
||||
/// </summary>
|
||||
public class ConsumerReplicaGroupTests
|
||||
{
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerState server/jetstream_cluster_1_test.go:700
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_creation_registers_in_manager()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("REG", ["reg.>"], replicas: 3);
|
||||
|
||||
var resp = await fx.CreateConsumerAsync("REG", "d1");
|
||||
resp.ConsumerInfo.ShouldNotBeNull();
|
||||
resp.ConsumerInfo!.Config.DurableName.ShouldBe("d1");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerState server/jetstream_cluster_1_test.go:700
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_pending_count_tracks_unacked_messages()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("PEND", ["pend.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("PEND", "acker", filterSubject: "pend.>", ackPolicy: AckPolicy.Explicit);
|
||||
|
||||
for (var i = 0; i < 5; i++)
|
||||
await fx.PublishAsync("pend.event", $"msg-{i}");
|
||||
|
||||
var batch = await fx.FetchAsync("PEND", "acker", 3);
|
||||
batch.Messages.Count.ShouldBe(3);
|
||||
|
||||
fx.GetPendingCount("PEND", "acker").ShouldBe(3);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterFullConsumerState server/jetstream_cluster_1_test.go:795
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task AckAll_reduces_pending_count()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("ACKRED", ["ar.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("ACKRED", "acker", filterSubject: "ar.>", ackPolicy: AckPolicy.All);
|
||||
|
||||
for (var i = 0; i < 10; i++)
|
||||
await fx.PublishAsync("ar.event", $"msg-{i}");
|
||||
|
||||
await fx.FetchAsync("ACKRED", "acker", 10);
|
||||
fx.AckAll("ACKRED", "acker", 7);
|
||||
|
||||
fx.GetPendingCount("ACKRED", "acker").ShouldBe(3);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterFullConsumerState server/jetstream_cluster_1_test.go:795
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task AckAll_to_last_seq_clears_all_pending()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("ACKCLEAR", ["ac.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("ACKCLEAR", "acker", filterSubject: "ac.>", ackPolicy: AckPolicy.All);
|
||||
|
||||
for (var i = 0; i < 5; i++)
|
||||
await fx.PublishAsync("ac.event", $"msg-{i}");
|
||||
|
||||
await fx.FetchAsync("ACKCLEAR", "acker", 5);
|
||||
fx.AckAll("ACKCLEAR", "acker", 5);
|
||||
|
||||
fx.GetPendingCount("ACKCLEAR", "acker").ShouldBe(0);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerRedeliveredInfo server/jetstream_cluster_1_test.go:659
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_redelivery_sets_redelivered_flag()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("REDEL", ["rd.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("REDEL", "rdc", filterSubject: "rd.>",
|
||||
ackPolicy: AckPolicy.Explicit, ackWaitMs: 1, maxDeliver: 5);
|
||||
|
||||
await fx.PublishAsync("rd.event", "will-redeliver");
|
||||
|
||||
var batch1 = await fx.FetchAsync("REDEL", "rdc", 1);
|
||||
batch1.Messages.Count.ShouldBe(1);
|
||||
batch1.Messages[0].Redelivered.ShouldBeFalse();
|
||||
|
||||
await Task.Delay(50);
|
||||
|
||||
var batch2 = await fx.FetchAsync("REDEL", "rdc", 1);
|
||||
batch2.Messages.Count.ShouldBe(1);
|
||||
batch2.Messages[0].Redelivered.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterRestoreSingleConsumer server/jetstream_cluster_1_test.go:1028
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_survives_stream_leader_stepdown()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("CSURV", ["csv.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("CSURV", "durable1", filterSubject: "csv.>");
|
||||
|
||||
for (var i = 0; i < 10; i++)
|
||||
await fx.PublishAsync("csv.event", $"msg-{i}");
|
||||
|
||||
var batch1 = await fx.FetchAsync("CSURV", "durable1", 5);
|
||||
batch1.Messages.Count.ShouldBe(5);
|
||||
|
||||
await fx.StepDownStreamLeaderAsync("CSURV");
|
||||
|
||||
var batch2 = await fx.FetchAsync("CSURV", "durable1", 5);
|
||||
batch2.Messages.Count.ShouldBe(5);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterPullConsumerLeakedSubs server/jetstream_cluster_2_test.go:2239
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Pull_consumer_fetch_returns_correct_batch()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("PULL", ["pull.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("PULL", "puller", filterSubject: "pull.>");
|
||||
|
||||
for (var i = 0; i < 20; i++)
|
||||
await fx.PublishAsync("pull.event", $"msg-{i}");
|
||||
|
||||
var batch = await fx.FetchAsync("PULL", "puller", 5);
|
||||
batch.Messages.Count.ShouldBe(5);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerLastActiveReporting server/jetstream_cluster_2_test.go:2371
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_info_returns_correct_config()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("INFO", ["ci.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("INFO", "info_dur", filterSubject: "ci.>", ackPolicy: AckPolicy.Explicit);
|
||||
|
||||
var info = await fx.GetConsumerInfoAsync("INFO", "info_dur");
|
||||
info.Config.DurableName.ShouldBe("info_dur");
|
||||
info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterEphemeralConsumerNoImmediateInterest server/jetstream_cluster_1_test.go:2481
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Ephemeral_consumer_creation_succeeds()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("EPHEM", ["eph.>"], replicas: 3);
|
||||
|
||||
var resp = await fx.CreateConsumerAsync("EPHEM", null, ephemeral: true);
|
||||
resp.ConsumerInfo.ShouldNotBeNull();
|
||||
resp.ConsumerInfo!.Config.DurableName.ShouldNotBeNullOrEmpty();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterEphemeralConsumersNotReplicated server/jetstream_cluster_1_test.go:2599
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Ephemeral_consumers_get_unique_names()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("UNIQ", ["u.>"], replicas: 3);
|
||||
|
||||
var resp1 = await fx.CreateConsumerAsync("UNIQ", null, ephemeral: true);
|
||||
var resp2 = await fx.CreateConsumerAsync("UNIQ", null, ephemeral: true);
|
||||
|
||||
resp1.ConsumerInfo!.Config.DurableName
|
||||
.ShouldNotBe(resp2.ConsumerInfo!.Config.DurableName);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterCreateConcurrentDurableConsumers server/jetstream_cluster_2_test.go:1572
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Durable_consumer_create_is_idempotent()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("IDEMP", ["id.>"], replicas: 3);
|
||||
|
||||
var resp1 = await fx.CreateConsumerAsync("IDEMP", "same");
|
||||
var resp2 = await fx.CreateConsumerAsync("IDEMP", "same");
|
||||
|
||||
resp1.ConsumerInfo!.Config.DurableName.ShouldBe("same");
|
||||
resp2.ConsumerInfo!.Config.DurableName.ShouldBe("same");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMaxConsumers server/jetstream_cluster_2_test.go:1978
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_delete_succeeds()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("DEL", ["del.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("DEL", "to_delete");
|
||||
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}DEL.to_delete", "{}");
|
||||
resp.Success.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerPause server/jetstream_cluster_1_test.go:4203
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_pause_and_resume_via_api()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("PAUSE", ["pause.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("PAUSE", "pausable");
|
||||
|
||||
var pause = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable", """{"pause":true}""");
|
||||
pause.Success.ShouldBeTrue();
|
||||
|
||||
var resume = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable", """{"pause":false}""");
|
||||
resume.Success.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerResetPendingDeliveriesOnMaxAckPendingUpdate
|
||||
// server/jetstream_cluster_1_test.go:8696
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_reset_resets_sequence_to_beginning()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("RESET", ["reset.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("RESET", "resettable", filterSubject: "reset.>");
|
||||
|
||||
for (var i = 0; i < 5; i++)
|
||||
await fx.PublishAsync("reset.event", $"msg-{i}");
|
||||
|
||||
// Advance the consumer
|
||||
await fx.FetchAsync("RESET", "resettable", 3);
|
||||
|
||||
// Reset
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerReset}RESET.resettable", "{}");
|
||||
resp.Success.ShouldBeTrue();
|
||||
|
||||
// After reset should re-deliver from sequence 1
|
||||
var batch = await fx.FetchAsync("RESET", "resettable", 5);
|
||||
batch.Messages.Count.ShouldBe(5);
|
||||
batch.Messages[0].Sequence.ShouldBe(1UL);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterFlowControlRequiresHeartbeats server/jetstream_cluster_2_test.go:2712
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_with_filter_subject_delivers_matching_only()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("FILT", ["filt.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("FILT", "filtered", filterSubject: "filt.alpha");
|
||||
|
||||
await fx.PublishAsync("filt.alpha", "match");
|
||||
await fx.PublishAsync("filt.beta", "no-match");
|
||||
await fx.PublishAsync("filt.alpha", "match2");
|
||||
|
||||
var batch = await fx.FetchAsync("FILT", "filtered", 10);
|
||||
batch.Messages.Count.ShouldBe(2);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerDeliverPolicy server/jetstream_cluster_2_test.go:550
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task DeliverPolicy_Last_starts_at_last_message()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("DLAST", ["dl.>"], replicas: 3);
|
||||
|
||||
for (var i = 0; i < 5; i++)
|
||||
await fx.PublishAsync("dl.event", $"msg-{i}");
|
||||
|
||||
await fx.CreateConsumerAsync("DLAST", "last_c", filterSubject: "dl.>",
|
||||
deliverPolicy: DeliverPolicy.Last);
|
||||
|
||||
var batch = await fx.FetchAsync("DLAST", "last_c", 10);
|
||||
batch.Messages.Count.ShouldBe(1);
|
||||
batch.Messages[0].Sequence.ShouldBe(5UL);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerDeliverPolicy server/jetstream_cluster_2_test.go:550
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task DeliverPolicy_New_skips_existing_messages()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("DNEW", ["dn.>"], replicas: 3);
|
||||
|
||||
for (var i = 0; i < 5; i++)
|
||||
await fx.PublishAsync("dn.event", $"msg-{i}");
|
||||
|
||||
await fx.CreateConsumerAsync("DNEW", "new_c", filterSubject: "dn.>",
|
||||
deliverPolicy: DeliverPolicy.New);
|
||||
|
||||
var batch = await fx.FetchAsync("DNEW", "new_c", 10);
|
||||
batch.Messages.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerDeliverPolicy server/jetstream_cluster_2_test.go:550
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task DeliverPolicy_ByStartSequence_starts_at_given_seq()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("DSTART", ["ds.>"], replicas: 3);
|
||||
|
||||
for (var i = 0; i < 10; i++)
|
||||
await fx.PublishAsync("ds.event", $"msg-{i}");
|
||||
|
||||
await fx.CreateConsumerAsync("DSTART", "start_c", filterSubject: "ds.>",
|
||||
deliverPolicy: DeliverPolicy.ByStartSequence, optStartSeq: 7);
|
||||
|
||||
var batch = await fx.FetchAsync("DSTART", "start_c", 10);
|
||||
batch.Messages.Count.ShouldBe(4);
|
||||
batch.Messages[0].Sequence.ShouldBe(7UL);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerUnpin server/jetstream_cluster_1_test.go:4109
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_unpin_api_returns_success()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("UNPIN", ["unpin.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("UNPIN", "pinned");
|
||||
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerUnpin}UNPIN.pinned", "{}");
|
||||
resp.Success.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerLeaderStepdown server/jetstream_cluster_2_test.go:1400
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_leader_stepdown_api_returns_success()
|
||||
{
|
||||
await using var fx = await ConsumerReplicaFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("CLS", ["cls.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("CLS", "dur1");
|
||||
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}CLS.dur1", "{}");
|
||||
resp.Success.ShouldBeTrue();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Self-contained fixture for consumer replica group tests.
|
||||
/// </summary>
|
||||
internal sealed class ConsumerReplicaFixture : IAsyncDisposable
|
||||
{
|
||||
private readonly JetStreamMetaGroup _metaGroup;
|
||||
private readonly StreamManager _streamManager;
|
||||
private readonly ConsumerManager _consumerManager;
|
||||
private readonly JetStreamApiRouter _router;
|
||||
private readonly JetStreamPublisher _publisher;
|
||||
|
||||
private ConsumerReplicaFixture(
|
||||
JetStreamMetaGroup metaGroup,
|
||||
StreamManager streamManager,
|
||||
ConsumerManager consumerManager,
|
||||
JetStreamApiRouter router,
|
||||
JetStreamPublisher publisher)
|
||||
{
|
||||
_metaGroup = metaGroup;
|
||||
_streamManager = streamManager;
|
||||
_consumerManager = consumerManager;
|
||||
_router = router;
|
||||
_publisher = publisher;
|
||||
}
|
||||
|
||||
public static Task<ConsumerReplicaFixture> StartAsync(int nodes)
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(nodes);
|
||||
var consumerManager = new ConsumerManager(meta);
|
||||
var streamManager = new StreamManager(meta, consumerManager: consumerManager);
|
||||
var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
|
||||
var publisher = new JetStreamPublisher(streamManager);
|
||||
return Task.FromResult(new ConsumerReplicaFixture(meta, streamManager, consumerManager, router, publisher));
|
||||
}
|
||||
|
||||
public Task CreateStreamAsync(string name, string[] subjects, int replicas)
|
||||
{
|
||||
var response = _streamManager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = name,
|
||||
Subjects = [.. subjects],
|
||||
Replicas = replicas,
|
||||
});
|
||||
if (response.Error is not null)
|
||||
throw new InvalidOperationException(response.Error.Description);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<JetStreamApiResponse> CreateConsumerAsync(
|
||||
string stream,
|
||||
string? durableName,
|
||||
string? filterSubject = null,
|
||||
AckPolicy ackPolicy = AckPolicy.None,
|
||||
int ackWaitMs = 30_000,
|
||||
int maxDeliver = 1,
|
||||
bool ephemeral = false,
|
||||
DeliverPolicy deliverPolicy = DeliverPolicy.All,
|
||||
ulong optStartSeq = 0)
|
||||
{
|
||||
var config = new ConsumerConfig
|
||||
{
|
||||
DurableName = durableName ?? string.Empty,
|
||||
AckPolicy = ackPolicy,
|
||||
AckWaitMs = ackWaitMs,
|
||||
MaxDeliver = maxDeliver,
|
||||
Ephemeral = ephemeral,
|
||||
DeliverPolicy = deliverPolicy,
|
||||
OptStartSeq = optStartSeq,
|
||||
};
|
||||
if (!string.IsNullOrWhiteSpace(filterSubject))
|
||||
config.FilterSubject = filterSubject;
|
||||
|
||||
return Task.FromResult(_consumerManager.CreateOrUpdate(stream, config));
|
||||
}
|
||||
|
||||
public Task<PubAck> PublishAsync(string subject, string payload)
|
||||
{
|
||||
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
|
||||
{
|
||||
if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var handle))
|
||||
{
|
||||
var stored = handle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
|
||||
if (stored != null)
|
||||
_consumerManager.OnPublished(ack.Stream, stored);
|
||||
}
|
||||
|
||||
return Task.FromResult(ack);
|
||||
}
|
||||
|
||||
throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");
|
||||
}
|
||||
|
||||
public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
|
||||
=> _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
|
||||
|
||||
public void AckAll(string stream, string durableName, ulong sequence)
|
||||
=> _consumerManager.AckAll(stream, durableName, sequence);
|
||||
|
||||
public int GetPendingCount(string stream, string durableName)
|
||||
=> _consumerManager.GetPendingCount(stream, durableName);
|
||||
|
||||
public Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
|
||||
{
|
||||
var resp = _consumerManager.GetInfo(stream, durableName);
|
||||
if (resp.ConsumerInfo == null)
|
||||
throw new InvalidOperationException("Consumer not found.");
|
||||
return Task.FromResult(resp.ConsumerInfo);
|
||||
}
|
||||
|
||||
public Task StepDownStreamLeaderAsync(string stream)
|
||||
=> _streamManager.StepDownStreamLeaderAsync(stream, default);
|
||||
|
||||
public Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
|
||||
=> Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
||||
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
}
|
||||
@@ -0,0 +1,631 @@
|
||||
// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
|
||||
// Covers: meta group leadership, API routing through meta leader,
|
||||
// stream/consumer placement decisions, asset distribution,
|
||||
// R1/R3 placement, preferred tags, cluster-wide operations.
|
||||
using System.Collections.Concurrent;
|
||||
using System.Reflection;
|
||||
using System.Text;
|
||||
using NATS.Server.JetStream;
|
||||
using NATS.Server.JetStream.Api;
|
||||
using NATS.Server.JetStream.Cluster;
|
||||
using NATS.Server.JetStream.Consumers;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Publish;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Cluster;
|
||||
|
||||
/// <summary>
|
||||
/// Tests covering JetStream meta controller leadership, API routing through
|
||||
/// the meta leader, stream/consumer placement decisions, asset distribution,
|
||||
/// R1/R3 placement, and cluster-wide operations.
|
||||
/// Ported from Go jetstream_cluster_1_test.go and jetstream_cluster_2_test.go.
|
||||
/// </summary>
|
||||
public class JetStreamMetaControllerTests
|
||||
{
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Meta_group_initial_leader_is_meta_1()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
var state = meta.GetState();
|
||||
|
||||
state.LeaderId.ShouldBe("meta-1");
|
||||
state.ClusterSize.ShouldBe(3);
|
||||
state.LeadershipVersion.ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Meta_group_stepdown_advances_leader_id()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
meta.GetState().LeaderId.ShouldBe("meta-1");
|
||||
|
||||
meta.StepDown();
|
||||
meta.GetState().LeaderId.ShouldBe("meta-2");
|
||||
|
||||
meta.StepDown();
|
||||
meta.GetState().LeaderId.ShouldBe("meta-3");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Meta_group_stepdown_wraps_around_to_first_node()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
meta.StepDown(); // meta-2
|
||||
meta.StepDown(); // meta-3
|
||||
meta.StepDown(); // meta-1 (wrap)
|
||||
|
||||
meta.GetState().LeaderId.ShouldBe("meta-1");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Meta_group_leadership_version_increments_on_each_stepdown()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
for (var i = 1; i <= 5; i++)
|
||||
{
|
||||
meta.GetState().LeadershipVersion.ShouldBe(i);
|
||||
meta.StepDown();
|
||||
}
|
||||
|
||||
meta.GetState().LeadershipVersion.ShouldBe(6);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConfig server/jetstream_cluster_1_test.go:43
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Meta_group_propose_creates_stream_record()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "TEST" }, default);
|
||||
|
||||
var state = meta.GetState();
|
||||
state.Streams.Count.ShouldBe(1);
|
||||
state.Streams.ShouldContain("TEST");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Meta_group_tracks_multiple_stream_proposals()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(5);
|
||||
|
||||
for (var i = 0; i < 10; i++)
|
||||
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = $"S{i}" }, default);
|
||||
|
||||
var state = meta.GetState();
|
||||
state.Streams.Count.ShouldBe(10);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Meta_group_streams_are_sorted_alphabetically()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "ZULU" }, default);
|
||||
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "ALPHA" }, default);
|
||||
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "MIKE" }, default);
|
||||
|
||||
var state = meta.GetState();
|
||||
state.Streams[0].ShouldBe("ALPHA");
|
||||
state.Streams[1].ShouldBe("MIKE");
|
||||
state.Streams[2].ShouldBe("ZULU");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConfig server/jetstream_cluster_1_test.go:43
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Meta_group_duplicate_stream_proposal_is_idempotent()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
|
||||
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DUP" }, default);
|
||||
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DUP" }, default);
|
||||
|
||||
meta.GetState().Streams.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Meta_group_single_node_cluster_has_leader()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(1);
|
||||
var state = meta.GetState();
|
||||
|
||||
state.ClusterSize.ShouldBe(1);
|
||||
state.LeaderId.ShouldBe("meta-1");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Meta_group_single_node_stepdown_returns_to_same_leader()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(1);
|
||||
meta.StepDown();
|
||||
|
||||
meta.GetState().LeaderId.ShouldBe("meta-1");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterLeaderStepdown server/jetstream_cluster_1_test.go:5464
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Api_meta_leader_stepdown_changes_leader_and_preserves_streams()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
await fx.CreateStreamAsync("KEEPME", ["keep.>"], replicas: 3);
|
||||
|
||||
var before = fx.GetMetaState();
|
||||
var leaderBefore = before.LeaderId;
|
||||
|
||||
var resp = await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}");
|
||||
resp.Success.ShouldBeTrue();
|
||||
|
||||
var after = fx.GetMetaState();
|
||||
after.LeaderId.ShouldNotBe(leaderBefore);
|
||||
after.Streams.ShouldContain("KEEPME");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterAccountInfo server/jetstream_cluster_1_test.go:94
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Api_routing_through_meta_leader_returns_account_info()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
await fx.CreateStreamAsync("A", ["a.>"], replicas: 3);
|
||||
await fx.CreateStreamAsync("B", ["b.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("A", "c1");
|
||||
|
||||
var resp = await fx.RequestAsync(JetStreamApiSubjects.Info, "{}");
|
||||
resp.AccountInfo.ShouldNotBeNull();
|
||||
resp.AccountInfo!.Streams.ShouldBe(2);
|
||||
resp.AccountInfo.Consumers.ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamLimitWithAccountDefaults server/jetstream_cluster_1_test.go:124
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Placement_planner_r1_creates_single_node_placement()
|
||||
{
|
||||
var planner = new AssetPlacementPlanner(nodes: 5);
|
||||
var placement = planner.PlanReplicas(replicas: 1);
|
||||
|
||||
placement.Count.ShouldBe(1);
|
||||
placement[0].ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Placement_planner_r3_creates_three_node_placement()
|
||||
{
|
||||
var planner = new AssetPlacementPlanner(nodes: 5);
|
||||
var placement = planner.PlanReplicas(replicas: 3);
|
||||
|
||||
placement.Count.ShouldBe(3);
|
||||
placement[0].ShouldBe(1);
|
||||
placement[1].ShouldBe(2);
|
||||
placement[2].ShouldBe(3);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Placement_planner_caps_replicas_at_cluster_size()
|
||||
{
|
||||
var planner = new AssetPlacementPlanner(nodes: 3);
|
||||
var placement = planner.PlanReplicas(replicas: 7);
|
||||
|
||||
placement.Count.ShouldBe(3);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Placement_planner_negative_replicas_returns_one()
|
||||
{
|
||||
var planner = new AssetPlacementPlanner(nodes: 5);
|
||||
var placement = planner.PlanReplicas(replicas: -1);
|
||||
|
||||
placement.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConfig server/jetstream_cluster_1_test.go:43
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Placement_planner_zero_nodes_returns_one()
|
||||
{
|
||||
var planner = new AssetPlacementPlanner(nodes: 0);
|
||||
var placement = planner.PlanReplicas(replicas: 3);
|
||||
|
||||
placement.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamCreate server/jetstream_cluster_1_test.go:160
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Stream_create_via_meta_leader_sets_replica_group()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 5);
|
||||
|
||||
var resp = await fx.CreateStreamAsync("REPGRP", ["rg.>"], replicas: 3);
|
||||
resp.Error.ShouldBeNull();
|
||||
|
||||
// The stream manager creates a replica group internally
|
||||
var meta = fx.GetMetaState();
|
||||
meta.Streams.ShouldContain("REPGRP");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMaxStreamsReached server/jetstream_cluster_1_test.go:3177
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Multiple_stream_creates_all_tracked_in_meta_group()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
for (var i = 0; i < 20; i++)
|
||||
await fx.CreateStreamAsync($"MS{i}", [$"ms{i}.>"], replicas: 3);
|
||||
|
||||
var meta = fx.GetMetaState();
|
||||
meta.Streams.Count.ShouldBe(20);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamNames server/jetstream_cluster_1_test.go:1284
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Stream_names_api_returns_all_streams_through_meta_leader()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
await fx.CreateStreamAsync("S1", ["s1.>"], replicas: 3);
|
||||
await fx.CreateStreamAsync("S2", ["s2.>"], replicas: 1);
|
||||
await fx.CreateStreamAsync("S3", ["s3.>"], replicas: 3);
|
||||
|
||||
var resp = await fx.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
|
||||
resp.StreamNames.ShouldNotBeNull();
|
||||
resp.StreamNames!.Count.ShouldBe(3);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterDelete server/jetstream_cluster_1_test.go:472
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Stream_delete_removes_from_active_names()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
await fx.CreateStreamAsync("DEL1", ["d1.>"], replicas: 3);
|
||||
await fx.CreateStreamAsync("DEL2", ["d2.>"], replicas: 3);
|
||||
|
||||
var del = await fx.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DEL1", "{}");
|
||||
del.Success.ShouldBeTrue();
|
||||
|
||||
var names = await fx.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
|
||||
names.StreamNames!.Count.ShouldBe(1);
|
||||
names.StreamNames.ShouldContain("DEL2");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterDoubleAdd server/jetstream_cluster_1_test.go:1551
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Stream_create_idempotent_with_same_config()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
var first = await fx.CreateStreamAsync("IDEM", ["idem.>"], replicas: 3);
|
||||
first.Error.ShouldBeNull();
|
||||
|
||||
var second = await fx.CreateStreamAsync("IDEM", ["idem.>"], replicas: 3);
|
||||
second.Error.ShouldBeNull();
|
||||
|
||||
var meta = fx.GetMetaState();
|
||||
meta.Streams.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamInfoList server/jetstream_cluster_1_test.go:1284
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_create_tracked_in_cluster()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
await fx.CreateStreamAsync("CC", ["cc.>"], replicas: 3);
|
||||
await fx.CreateConsumerAsync("CC", "d1");
|
||||
await fx.CreateConsumerAsync("CC", "d2");
|
||||
|
||||
var names = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerNames}CC", "{}");
|
||||
names.ConsumerNames.ShouldNotBeNull();
|
||||
names.ConsumerNames!.Count.ShouldBe(2);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterPeerRemovalAPI server/jetstream_cluster_1_test.go:3469
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Peer_removal_api_routed_through_meta()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("PR", ["pr.>"], replicas: 3);
|
||||
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.StreamPeerRemove}PR", """{"peer":"n2"}""");
|
||||
resp.Success.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMetaSnapshotsAndCatchup server/jetstream_cluster_1_test.go:833
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Meta_state_preserved_across_multiple_stepdowns()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
await fx.CreateStreamAsync("M1", ["m1.>"], replicas: 3);
|
||||
await fx.CreateStreamAsync("M2", ["m2.>"], replicas: 3);
|
||||
|
||||
(await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
|
||||
(await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
|
||||
(await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
|
||||
|
||||
var state = fx.GetMetaState();
|
||||
state.Streams.ShouldContain("M1");
|
||||
state.Streams.ShouldContain("M2");
|
||||
state.LeadershipVersion.ShouldBe(4);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMetaSnapshotsMultiChange server/jetstream_cluster_1_test.go:881
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Create_and_delete_across_stepdowns_reflected_in_names()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
await fx.CreateStreamAsync("A", ["a.>"], replicas: 3);
|
||||
(await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
|
||||
|
||||
await fx.CreateStreamAsync("B", ["b.>"], replicas: 3);
|
||||
(await fx.RequestAsync($"{JetStreamApiSubjects.StreamDelete}A", "{}")).Success.ShouldBeTrue();
|
||||
|
||||
(await fx.RequestAsync(JetStreamApiSubjects.MetaLeaderStepdown, "{}")).Success.ShouldBeTrue();
|
||||
|
||||
var names = await fx.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
|
||||
names.StreamNames!.Count.ShouldBe(1);
|
||||
names.StreamNames.ShouldContain("B");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamCreate server/jetstream_cluster_1_test.go:160
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Stream_info_for_nonexistent_stream_returns_404()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.StreamInfo}MISSING", "{}");
|
||||
resp.Error.ShouldNotBeNull();
|
||||
resp.Error!.Code.ShouldBe(404);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterConsumerCreate server/jetstream_cluster_1_test.go:700
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Consumer_info_for_nonexistent_consumer_returns_404()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("NOCON", ["nc.>"], replicas: 3);
|
||||
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}NOCON.MISSING", "{}");
|
||||
resp.Error.ShouldNotBeNull();
|
||||
resp.Error!.Code.ShouldBe(404);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamCreate server/jetstream_cluster_1_test.go:160
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Stream_create_without_name_returns_error()
|
||||
{
|
||||
var streamManager = new StreamManager();
|
||||
var resp = streamManager.CreateOrUpdate(new StreamConfig { Name = "" });
|
||||
|
||||
resp.Error.ShouldNotBeNull();
|
||||
resp.Error!.Description.ShouldContain("name");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamCreate server/jetstream_cluster_1_test.go:160
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Unknown_api_subject_returns_404()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
var resp = await fx.RequestAsync("$JS.API.UNKNOWN.SUBJECT", "{}");
|
||||
resp.Error.ShouldNotBeNull();
|
||||
resp.Error!.Code.ShouldBe(404);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterAccountPurge server/jetstream_cluster_1_test.go:3891
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Account_purge_via_meta_returns_success()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
await fx.CreateStreamAsync("P", ["p.>"], replicas: 3);
|
||||
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.AccountPurge}GLOBAL", "{}");
|
||||
resp.Success.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterServerRemove server/jetstream_cluster_1_test.go:3620
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Server_remove_via_meta_returns_success()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
var resp = await fx.RequestAsync(JetStreamApiSubjects.ServerRemove, "{}");
|
||||
resp.Success.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterAccountStreamMove server/jetstream_cluster_1_test.go:3750
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Account_stream_move_via_meta_returns_success()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.AccountStreamMove}TEST", "{}");
|
||||
resp.Success.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterAccountStreamMoveCancel server/jetstream_cluster_1_test.go:3780
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Account_stream_move_cancel_via_meta_returns_success()
|
||||
{
|
||||
await using var fx = await MetaControllerFixture.StartAsync(nodes: 3);
|
||||
|
||||
var resp = await fx.RequestAsync($"{JetStreamApiSubjects.AccountStreamMoveCancel}TEST", "{}");
|
||||
resp.Success.ShouldBeTrue();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Self-contained fixture for JetStream meta controller tests.
|
||||
/// </summary>
|
||||
internal sealed class MetaControllerFixture : IAsyncDisposable
|
||||
{
|
||||
private readonly JetStreamMetaGroup _metaGroup;
|
||||
private readonly StreamManager _streamManager;
|
||||
private readonly ConsumerManager _consumerManager;
|
||||
private readonly JetStreamApiRouter _router;
|
||||
private readonly JetStreamPublisher _publisher;
|
||||
|
||||
private MetaControllerFixture(
|
||||
JetStreamMetaGroup metaGroup,
|
||||
StreamManager streamManager,
|
||||
ConsumerManager consumerManager,
|
||||
JetStreamApiRouter router,
|
||||
JetStreamPublisher publisher)
|
||||
{
|
||||
_metaGroup = metaGroup;
|
||||
_streamManager = streamManager;
|
||||
_consumerManager = consumerManager;
|
||||
_router = router;
|
||||
_publisher = publisher;
|
||||
}
|
||||
|
||||
public static Task<MetaControllerFixture> StartAsync(int nodes)
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(nodes);
|
||||
var consumerManager = new ConsumerManager(meta);
|
||||
var streamManager = new StreamManager(meta, consumerManager: consumerManager);
|
||||
var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
|
||||
var publisher = new JetStreamPublisher(streamManager);
|
||||
return Task.FromResult(new MetaControllerFixture(meta, streamManager, consumerManager, router, publisher));
|
||||
}
|
||||
|
||||
public Task<JetStreamApiResponse> CreateStreamAsync(string name, string[] subjects, int replicas)
|
||||
{
|
||||
var response = _streamManager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = name,
|
||||
Subjects = [.. subjects],
|
||||
Replicas = replicas,
|
||||
});
|
||||
return Task.FromResult(response);
|
||||
}
|
||||
|
||||
public Task<JetStreamApiResponse> CreateConsumerAsync(string stream, string durableName)
|
||||
{
|
||||
return Task.FromResult(_consumerManager.CreateOrUpdate(stream, new ConsumerConfig
|
||||
{
|
||||
DurableName = durableName,
|
||||
}));
|
||||
}
|
||||
|
||||
public MetaGroupState GetMetaState() => _metaGroup.GetState();
|
||||
|
||||
public Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
|
||||
=> Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
||||
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
}
|
||||
@@ -0,0 +1,381 @@
|
||||
// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
|
||||
// Covers: per-stream RAFT groups, stream assignment proposal, replica count
|
||||
// enforcement, leader election for stream group, data replication across
|
||||
// stream replicas, placement scaling, stepdown behavior.
|
||||
using System.Collections.Concurrent;
|
||||
using System.Reflection;
|
||||
using System.Text;
|
||||
using NATS.Server.JetStream;
|
||||
using NATS.Server.JetStream.Api;
|
||||
using NATS.Server.JetStream.Cluster;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Publish;
|
||||
using NATS.Server.Raft;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Cluster;
|
||||
|
||||
/// <summary>
|
||||
/// Tests covering per-stream RAFT groups: stream assignment proposal,
|
||||
/// replica count enforcement, leader election, data replication across
|
||||
/// replicas, placement scaling, and stepdown behavior.
|
||||
/// Ported from Go jetstream_cluster_1_test.go.
|
||||
/// </summary>
|
||||
public class StreamReplicaGroupTests
|
||||
{
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Replica_group_r3_creates_three_raft_nodes()
|
||||
{
|
||||
var group = new StreamReplicaGroup("TEST", replicas: 3);
|
||||
|
||||
group.Nodes.Count.ShouldBe(3);
|
||||
group.StreamName.ShouldBe("TEST");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterSingleReplicaStreams server/jetstream_cluster_1_test.go:223
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Replica_group_r1_creates_single_raft_node()
|
||||
{
|
||||
var group = new StreamReplicaGroup("R1S", replicas: 1);
|
||||
|
||||
group.Nodes.Count.ShouldBe(1);
|
||||
group.Leader.IsLeader.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Replica_group_zero_replicas_creates_one_node()
|
||||
{
|
||||
var group = new StreamReplicaGroup("ZERO", replicas: 0);
|
||||
|
||||
group.Nodes.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Replica_group_negative_replicas_creates_one_node()
|
||||
{
|
||||
var group = new StreamReplicaGroup("NEG", replicas: -1);
|
||||
|
||||
group.Nodes.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Replica_group_elects_initial_leader_on_creation()
|
||||
{
|
||||
var group = new StreamReplicaGroup("ELECT", replicas: 3);
|
||||
|
||||
group.Leader.ShouldNotBeNull();
|
||||
group.Leader.IsLeader.ShouldBeTrue();
|
||||
group.Leader.Role.ShouldBe(RaftRole.Leader);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Replica_group_leader_id_follows_naming_convention()
|
||||
{
|
||||
var group = new StreamReplicaGroup("MY_STREAM", replicas: 3);
|
||||
|
||||
group.Leader.Id.ShouldStartWith("my_stream-r");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Replica_group_stepdown_changes_leader()
|
||||
{
|
||||
var group = new StreamReplicaGroup("STEP", replicas: 3);
|
||||
var before = group.Leader.Id;
|
||||
|
||||
await group.StepDownAsync(default);
|
||||
|
||||
group.Leader.Id.ShouldNotBe(before);
|
||||
group.Leader.IsLeader.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Replica_group_consecutive_stepdowns_cycle_leaders()
|
||||
{
|
||||
var group = new StreamReplicaGroup("CYCLE", replicas: 3);
|
||||
var leaders = new List<string> { group.Leader.Id };
|
||||
|
||||
await group.StepDownAsync(default);
|
||||
leaders.Add(group.Leader.Id);
|
||||
|
||||
await group.StepDownAsync(default);
|
||||
leaders.Add(group.Leader.Id);
|
||||
|
||||
leaders[1].ShouldNotBe(leaders[0]);
|
||||
leaders[2].ShouldNotBe(leaders[1]);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterLeader server/jetstream_cluster_1_test.go:73
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Replica_group_stepdown_wraps_around()
|
||||
{
|
||||
var group = new StreamReplicaGroup("WRAP", replicas: 3);
|
||||
var ids = new HashSet<string>();
|
||||
|
||||
for (var i = 0; i < 6; i++)
|
||||
{
|
||||
ids.Add(group.Leader.Id);
|
||||
await group.StepDownAsync(default);
|
||||
}
|
||||
|
||||
// Should have cycled through all 3 nodes
|
||||
ids.Count.ShouldBe(3);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Replica_group_leader_accepts_proposals()
|
||||
{
|
||||
var group = new StreamReplicaGroup("PROPOSE", replicas: 3);
|
||||
|
||||
var index = await group.ProposeAsync("PUB test.1", default);
|
||||
|
||||
index.ShouldBeGreaterThan(0);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Replica_group_sequential_proposals_have_increasing_indices()
|
||||
{
|
||||
var group = new StreamReplicaGroup("SEQPROP", replicas: 3);
|
||||
|
||||
var idx1 = await group.ProposeAsync("PUB test.1", default);
|
||||
var idx2 = await group.ProposeAsync("PUB test.2", default);
|
||||
var idx3 = await group.ProposeAsync("PUB test.3", default);
|
||||
|
||||
idx2.ShouldBeGreaterThan(idx1);
|
||||
idx3.ShouldBeGreaterThan(idx2);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamNormalCatchup server/jetstream_cluster_1_test.go:1607
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Replica_group_proposals_survive_stepdown()
|
||||
{
|
||||
var group = new StreamReplicaGroup("SURVIVE", replicas: 3);
|
||||
|
||||
await group.ProposeAsync("PUB a.1", default);
|
||||
await group.ProposeAsync("PUB a.2", default);
|
||||
|
||||
await group.StepDownAsync(default);
|
||||
|
||||
// New leader should accept proposals
|
||||
var idx = await group.ProposeAsync("PUB a.3", default);
|
||||
idx.ShouldBeGreaterThan(0);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Replica_group_apply_placement_scales_up()
|
||||
{
|
||||
var group = new StreamReplicaGroup("SCALEUP", replicas: 1);
|
||||
group.Nodes.Count.ShouldBe(1);
|
||||
|
||||
await group.ApplyPlacementAsync([1, 2, 3], default);
|
||||
|
||||
group.Nodes.Count.ShouldBe(3);
|
||||
group.Leader.IsLeader.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Replica_group_apply_placement_scales_down()
|
||||
{
|
||||
var group = new StreamReplicaGroup("SCALEDN", replicas: 5);
|
||||
group.Nodes.Count.ShouldBe(5);
|
||||
|
||||
await group.ApplyPlacementAsync([1, 2], default);
|
||||
|
||||
group.Nodes.Count.ShouldBe(2);
|
||||
group.Leader.IsLeader.ShouldBeTrue();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterExpandCluster server/jetstream_cluster_1_test.go:86
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Replica_group_apply_same_size_is_noop()
|
||||
{
|
||||
var group = new StreamReplicaGroup("NOOP", replicas: 3);
|
||||
var leaderBefore = group.Leader.Id;
|
||||
|
||||
await group.ApplyPlacementAsync([1, 2, 3], default);
|
||||
|
||||
group.Nodes.Count.ShouldBe(3);
|
||||
// Leader should remain the same since placement is a no-op
|
||||
group.Leader.Id.ShouldBe(leaderBefore);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterMultiReplicaStreams server/jetstream_cluster_1_test.go:299
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Replica_group_all_nodes_share_cluster()
|
||||
{
|
||||
var group = new StreamReplicaGroup("SHARED", replicas: 3);
|
||||
|
||||
foreach (var node in group.Nodes)
|
||||
node.Members.Count.ShouldBe(3);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamSynchedTimeStamps server/jetstream_cluster_1_test.go:977
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Stream_manager_creates_replica_group_on_stream_create()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
var streamManager = new StreamManager(meta);
|
||||
|
||||
streamManager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "REPL",
|
||||
Subjects = ["repl.>"],
|
||||
Replicas = 3,
|
||||
});
|
||||
|
||||
// Use reflection to verify internal replica group was created
|
||||
var field = typeof(StreamManager)
|
||||
.GetField("_replicaGroups", BindingFlags.NonPublic | BindingFlags.Instance)!;
|
||||
var groups = (ConcurrentDictionary<string, StreamReplicaGroup>)field.GetValue(streamManager)!;
|
||||
|
||||
groups.ContainsKey("REPL").ShouldBeTrue();
|
||||
groups["REPL"].Nodes.Count.ShouldBe(3);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamLeaderStepDown server/jetstream_cluster_1_test.go:4925
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task Stream_leader_stepdown_via_stream_manager_changes_leader()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
var streamManager = new StreamManager(meta);
|
||||
|
||||
streamManager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "SD",
|
||||
Subjects = ["sd.>"],
|
||||
Replicas = 3,
|
||||
});
|
||||
|
||||
var field = typeof(StreamManager)
|
||||
.GetField("_replicaGroups", BindingFlags.NonPublic | BindingFlags.Instance)!;
|
||||
var groups = (ConcurrentDictionary<string, StreamReplicaGroup>)field.GetValue(streamManager)!;
|
||||
var leaderBefore = groups["SD"].Leader.Id;
|
||||
|
||||
await streamManager.StepDownStreamLeaderAsync("SD", default);
|
||||
|
||||
groups["SD"].Leader.Id.ShouldNotBe(leaderBefore);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamDelete server/jetstream_cluster_1_test.go:472
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Stream_delete_removes_replica_group()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
var streamManager = new StreamManager(meta);
|
||||
|
||||
streamManager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "DELRG",
|
||||
Subjects = ["delrg.>"],
|
||||
Replicas = 3,
|
||||
});
|
||||
|
||||
streamManager.Delete("DELRG").ShouldBeTrue();
|
||||
|
||||
var field = typeof(StreamManager)
|
||||
.GetField("_replicaGroups", BindingFlags.NonPublic | BindingFlags.Instance)!;
|
||||
var groups = (ConcurrentDictionary<string, StreamReplicaGroup>)field.GetValue(streamManager)!;
|
||||
|
||||
groups.ContainsKey("DELRG").ShouldBeFalse();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Go: TestJetStreamClusterStreamUpdate server/jetstream_cluster_1_test.go:1433
|
||||
// ---------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void Stream_update_preserves_replica_group_when_replicas_unchanged()
|
||||
{
|
||||
var meta = new JetStreamMetaGroup(3);
|
||||
var streamManager = new StreamManager(meta);
|
||||
|
||||
streamManager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "UPD",
|
||||
Subjects = ["upd.>"],
|
||||
Replicas = 3,
|
||||
});
|
||||
|
||||
var field = typeof(StreamManager)
|
||||
.GetField("_replicaGroups", BindingFlags.NonPublic | BindingFlags.Instance)!;
|
||||
var groups = (ConcurrentDictionary<string, StreamReplicaGroup>)field.GetValue(streamManager)!;
|
||||
var groupBefore = groups["UPD"];
|
||||
|
||||
streamManager.CreateOrUpdate(new StreamConfig
|
||||
{
|
||||
Name = "UPD",
|
||||
Subjects = ["upd.>", "upd2.>"],
|
||||
Replicas = 3,
|
||||
MaxMsgs = 100,
|
||||
});
|
||||
|
||||
// Same replica count means the group reference should be the same
|
||||
groups["UPD"].ShouldBeSameAs(groupBefore);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user