Files
natsdotnet/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/ConsumerReplicaGroupTests.cs
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

523 lines
21 KiB
C#

// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go
// golang/nats-server/server/jetstream_cluster_2_test.go
// Covers: per-consumer RAFT groups, consumer assignment, ack state
// replication, consumer failover, pull request forwarding, ephemeral
// consumer lifecycle, delivery policy handling.
using System.Collections.Concurrent;
using System.Reflection;
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.JetStream.Tests.JetStream.Cluster;
/// <summary>
/// Exercises consumer behaviour against a three-node <see cref="ConsumerReplicaFixture"/>:
/// consumer creation and lookup, pending/ack bookkeeping, redelivery, fetching
/// across a stream leader step-down, ephemeral consumers, consumer management
/// API requests, filter subjects and deliver policies.
/// Each test cites the Go test it was ported from
/// (jetstream_cluster_1_test.go / jetstream_cluster_2_test.go).
/// </summary>
public class ConsumerReplicaGroupTests
{
    // Go: TestJetStreamClusterConsumerState server/jetstream_cluster_1_test.go:700
    // Creating a durable consumer returns consumer info carrying the durable name.
    [Fact]
    public async Task Consumer_creation_registers_in_manager()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("REG", ["reg.>"], replicas: 3);

        var created = await cluster.CreateConsumerAsync("REG", "d1");

        created.ConsumerInfo.ShouldNotBeNull();
        created.ConsumerInfo!.Config.DurableName.ShouldBe("d1");
    }

    // Go: TestJetStreamClusterConsumerState server/jetstream_cluster_1_test.go:700
    // Fetching 3 of 5 messages with explicit acks leaves 3 messages pending.
    [Fact]
    public async Task Consumer_pending_count_tracks_unacked_messages()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("PEND", ["pend.>"], replicas: 3);
        await cluster.CreateConsumerAsync("PEND", "acker", filterSubject: "pend.>", ackPolicy: AckPolicy.Explicit);
        await PublishSeriesAsync(cluster, "pend.event", 5);

        var fetched = await cluster.FetchAsync("PEND", "acker", 3);

        fetched.Messages.Count.ShouldBe(3);
        cluster.GetPendingCount("PEND", "acker").ShouldBe(3);
    }

    // Go: TestJetStreamClusterFullConsumerState server/jetstream_cluster_1_test.go:795
    // AckAll up to sequence 7 out of 10 fetched messages leaves 3 pending.
    [Fact]
    public async Task AckAll_reduces_pending_count()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("ACKRED", ["ar.>"], replicas: 3);
        await cluster.CreateConsumerAsync("ACKRED", "acker", filterSubject: "ar.>", ackPolicy: AckPolicy.All);
        await PublishSeriesAsync(cluster, "ar.event", 10);
        await cluster.FetchAsync("ACKRED", "acker", 10);

        cluster.AckAll("ACKRED", "acker", 7);

        cluster.GetPendingCount("ACKRED", "acker").ShouldBe(3);
    }

    // Go: TestJetStreamClusterFullConsumerState server/jetstream_cluster_1_test.go:795
    // AckAll at the last fetched sequence leaves nothing pending.
    [Fact]
    public async Task AckAll_to_last_seq_clears_all_pending()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("ACKCLEAR", ["ac.>"], replicas: 3);
        await cluster.CreateConsumerAsync("ACKCLEAR", "acker", filterSubject: "ac.>", ackPolicy: AckPolicy.All);
        await PublishSeriesAsync(cluster, "ac.event", 5);
        await cluster.FetchAsync("ACKCLEAR", "acker", 5);

        cluster.AckAll("ACKCLEAR", "acker", 5);

        cluster.GetPendingCount("ACKCLEAR", "acker").ShouldBe(0);
    }

    // Go: TestJetStreamClusterConsumerRedeliveredInfo server/jetstream_cluster_1_test.go:659
    // An unacked message fetched again after the 1 ms ack wait is flagged as redelivered.
    [Fact]
    public async Task Consumer_redelivery_sets_redelivered_flag()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("REDEL", ["rd.>"], replicas: 3);
        await cluster.CreateConsumerAsync(
            "REDEL",
            "rdc",
            filterSubject: "rd.>",
            ackPolicy: AckPolicy.Explicit,
            ackWaitMs: 1,
            maxDeliver: 5);
        await cluster.PublishAsync("rd.event", "will-redeliver");

        var firstDelivery = await cluster.FetchAsync("REDEL", "rdc", 1);
        firstDelivery.Messages.Count.ShouldBe(1);
        firstDelivery.Messages[0].Redelivered.ShouldBeFalse();

        // Wait well past the 1 ms ack wait before fetching again.
        await Task.Delay(50);

        var secondDelivery = await cluster.FetchAsync("REDEL", "rdc", 1);
        secondDelivery.Messages.Count.ShouldBe(1);
        secondDelivery.Messages[0].Redelivered.ShouldBeTrue();
    }

    // Go: TestJetStreamClusterRestoreSingleConsumer server/jetstream_cluster_1_test.go:1028
    // A durable consumer keeps fetching after the stream leader steps down.
    [Fact]
    public async Task Consumer_survives_stream_leader_stepdown()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("CSURV", ["csv.>"], replicas: 3);
        await cluster.CreateConsumerAsync("CSURV", "durable1", filterSubject: "csv.>");
        await PublishSeriesAsync(cluster, "csv.event", 10);

        var beforeStepDown = await cluster.FetchAsync("CSURV", "durable1", 5);
        beforeStepDown.Messages.Count.ShouldBe(5);

        await cluster.StepDownStreamLeaderAsync("CSURV");

        var afterStepDown = await cluster.FetchAsync("CSURV", "durable1", 5);
        afterStepDown.Messages.Count.ShouldBe(5);
    }

    // Go: TestJetStreamClusterPullConsumerLeakedSubs server/jetstream_cluster_2_test.go:2239
    // A fetch of 5 from a stream holding 20 messages returns exactly 5.
    [Fact]
    public async Task Pull_consumer_fetch_returns_correct_batch()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("PULL", ["pull.>"], replicas: 3);
        await cluster.CreateConsumerAsync("PULL", "puller", filterSubject: "pull.>");
        await PublishSeriesAsync(cluster, "pull.event", 20);

        var fetched = await cluster.FetchAsync("PULL", "puller", 5);

        fetched.Messages.Count.ShouldBe(5);
    }

    // Go: TestJetStreamClusterConsumerLastActiveReporting server/jetstream_cluster_2_test.go:2371
    // Consumer info reports the durable name and ack policy the consumer was created with.
    [Fact]
    public async Task Consumer_info_returns_correct_config()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("INFO", ["ci.>"], replicas: 3);
        await cluster.CreateConsumerAsync("INFO", "info_dur", filterSubject: "ci.>", ackPolicy: AckPolicy.Explicit);

        var consumerInfo = await cluster.GetConsumerInfoAsync("INFO", "info_dur");

        consumerInfo.Config.DurableName.ShouldBe("info_dur");
        consumerInfo.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
    }

    // Go: TestJetStreamClusterEphemeralConsumerNoImmediateInterest server/jetstream_cluster_1_test.go:2481
    // Creating an ephemeral consumer without a name yields info with a non-empty name.
    [Fact]
    public async Task Ephemeral_consumer_creation_succeeds()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("EPHEM", ["eph.>"], replicas: 3);

        var created = await cluster.CreateConsumerAsync("EPHEM", null, ephemeral: true);

        created.ConsumerInfo.ShouldNotBeNull();
        created.ConsumerInfo!.Config.DurableName.ShouldNotBeNullOrEmpty();
    }

    // Go: TestJetStreamClusterEphemeralConsumersNotReplicated server/jetstream_cluster_1_test.go:2599
    // Two ephemeral consumers on the same stream receive different names.
    [Fact]
    public async Task Ephemeral_consumers_get_unique_names()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("UNIQ", ["u.>"], replicas: 3);

        var first = await cluster.CreateConsumerAsync("UNIQ", null, ephemeral: true);
        var second = await cluster.CreateConsumerAsync("UNIQ", null, ephemeral: true);

        var firstName = first.ConsumerInfo!.Config.DurableName;
        var secondName = second.ConsumerInfo!.Config.DurableName;
        firstName.ShouldNotBe(secondName);
    }

    // Go: TestJetStreamClusterCreateConcurrentDurableConsumers server/jetstream_cluster_2_test.go:1572
    // Creating the same durable twice returns the same durable name both times.
    [Fact]
    public async Task Durable_consumer_create_is_idempotent()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("IDEMP", ["id.>"], replicas: 3);

        var first = await cluster.CreateConsumerAsync("IDEMP", "same");
        var second = await cluster.CreateConsumerAsync("IDEMP", "same");

        first.ConsumerInfo!.Config.DurableName.ShouldBe("same");
        second.ConsumerInfo!.Config.DurableName.ShouldBe("same");
    }

    // Go: TestJetStreamClusterMaxConsumers server/jetstream_cluster_2_test.go:1978
    // The consumer delete API request reports success for an existing consumer.
    [Fact]
    public async Task Consumer_delete_succeeds()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("DEL", ["del.>"], replicas: 3);
        await cluster.CreateConsumerAsync("DEL", "to_delete");

        var reply = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}DEL.to_delete", "{}");

        reply.Success.ShouldBeTrue();
    }

    // Go: TestJetStreamClusterConsumerPause server/jetstream_cluster_1_test.go:4203
    // Both the pause request and the following resume request report success.
    [Fact]
    public async Task Consumer_pause_and_resume_via_api()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("PAUSE", ["pause.>"], replicas: 3);
        await cluster.CreateConsumerAsync("PAUSE", "pausable");

        var pauseReply = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable", """{"pause":true}""");
        pauseReply.Success.ShouldBeTrue();

        var resumeReply = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable", """{"pause":false}""");
        resumeReply.Success.ShouldBeTrue();
    }

    // Go: TestJetStreamClusterConsumerResetPendingDeliveriesOnMaxAckPendingUpdate
    //     server/jetstream_cluster_1_test.go:8696
    // After a reset request, the next fetch starts again at sequence 1.
    [Fact]
    public async Task Consumer_reset_resets_sequence_to_beginning()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("RESET", ["reset.>"], replicas: 3);
        await cluster.CreateConsumerAsync("RESET", "resettable", filterSubject: "reset.>");
        await PublishSeriesAsync(cluster, "reset.event", 5);

        // Move the consumer forward so the reset has something to undo.
        await cluster.FetchAsync("RESET", "resettable", 3);

        var resetReply = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerReset}RESET.resettable", "{}");
        resetReply.Success.ShouldBeTrue();

        // All five messages are expected again, beginning with sequence 1.
        var replayed = await cluster.FetchAsync("RESET", "resettable", 5);
        replayed.Messages.Count.ShouldBe(5);
        replayed.Messages[0].Sequence.ShouldBe(1UL);
    }

    // Go: TestJetStreamClusterFlowControlRequiresHeartbeats server/jetstream_cluster_2_test.go:2712
    // A consumer filtered on "filt.alpha" receives only the two matching messages.
    [Fact]
    public async Task Consumer_with_filter_subject_delivers_matching_only()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("FILT", ["filt.>"], replicas: 3);
        await cluster.CreateConsumerAsync("FILT", "filtered", filterSubject: "filt.alpha");
        await cluster.PublishAsync("filt.alpha", "match");
        await cluster.PublishAsync("filt.beta", "no-match");
        await cluster.PublishAsync("filt.alpha", "match2");

        var fetched = await cluster.FetchAsync("FILT", "filtered", 10);

        fetched.Messages.Count.ShouldBe(2);
    }

    // Go: TestJetStreamClusterConsumerDeliverPolicy server/jetstream_cluster_2_test.go:550
    // DeliverPolicy.Last on a stream holding 5 messages delivers only sequence 5.
    [Fact]
    public async Task DeliverPolicy_Last_starts_at_last_message()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("DLAST", ["dl.>"], replicas: 3);
        await PublishSeriesAsync(cluster, "dl.event", 5);
        await cluster.CreateConsumerAsync(
            "DLAST",
            "last_c",
            filterSubject: "dl.>",
            deliverPolicy: DeliverPolicy.Last);

        var fetched = await cluster.FetchAsync("DLAST", "last_c", 10);

        fetched.Messages.Count.ShouldBe(1);
        fetched.Messages[0].Sequence.ShouldBe(5UL);
    }

    // Go: TestJetStreamClusterConsumerDeliverPolicy server/jetstream_cluster_2_test.go:550
    // DeliverPolicy.New delivers none of the messages published before the consumer existed.
    [Fact]
    public async Task DeliverPolicy_New_skips_existing_messages()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("DNEW", ["dn.>"], replicas: 3);
        await PublishSeriesAsync(cluster, "dn.event", 5);
        await cluster.CreateConsumerAsync(
            "DNEW",
            "new_c",
            filterSubject: "dn.>",
            deliverPolicy: DeliverPolicy.New);

        var fetched = await cluster.FetchAsync("DNEW", "new_c", 10);

        fetched.Messages.Count.ShouldBe(0);
    }

    // Go: TestJetStreamClusterConsumerDeliverPolicy server/jetstream_cluster_2_test.go:550
    // DeliverPolicy.ByStartSequence with start 7 of 10 delivers sequences 7 through 10.
    [Fact]
    public async Task DeliverPolicy_ByStartSequence_starts_at_given_seq()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("DSTART", ["ds.>"], replicas: 3);
        await PublishSeriesAsync(cluster, "ds.event", 10);
        await cluster.CreateConsumerAsync(
            "DSTART",
            "start_c",
            filterSubject: "ds.>",
            deliverPolicy: DeliverPolicy.ByStartSequence,
            optStartSeq: 7);

        var fetched = await cluster.FetchAsync("DSTART", "start_c", 10);

        fetched.Messages.Count.ShouldBe(4);
        fetched.Messages[0].Sequence.ShouldBe(7UL);
    }

    // Go: TestJetStreamClusterConsumerUnpin server/jetstream_cluster_1_test.go:4109
    // The consumer unpin API request reports success.
    [Fact]
    public async Task Consumer_unpin_api_returns_success()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("UNPIN", ["unpin.>"], replicas: 3);
        await cluster.CreateConsumerAsync("UNPIN", "pinned");

        var reply = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerUnpin}UNPIN.pinned", "{}");

        reply.Success.ShouldBeTrue();
    }

    // Go: TestJetStreamClusterConsumerLeaderStepdown server/jetstream_cluster_2_test.go:1400
    // The consumer leader step-down API request reports success.
    [Fact]
    public async Task Consumer_leader_stepdown_api_returns_success()
    {
        await using var cluster = await ConsumerReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("CLS", ["cls.>"], replicas: 3);
        await cluster.CreateConsumerAsync("CLS", "dur1");

        var reply = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}CLS.dur1", "{}");

        reply.Success.ShouldBeTrue();
    }

    /// <summary>
    /// Publishes <paramref name="count"/> messages to <paramref name="subject"/>,
    /// one after another, with payloads "msg-0" through "msg-(count-1)".
    /// </summary>
    private static async Task PublishSeriesAsync(ConsumerReplicaFixture fixture, string subject, int count)
    {
        for (var index = 0; index < count; index++)
        {
            await fixture.PublishAsync(subject, $"msg-{index}");
        }
    }
}
/// <summary>
/// Self-contained fixture for consumer replica group tests. It wires together a
/// <see cref="JetStreamMetaGroup"/>, <see cref="StreamManager"/>,
/// <see cref="ConsumerManager"/>, <see cref="JetStreamApiRouter"/> and
/// <see cref="JetStreamPublisher"/>, and exposes thin helpers over them.
/// </summary>
internal sealed class ConsumerReplicaFixture : IAsyncDisposable
{
    private readonly JetStreamMetaGroup _metaGroup;
    private readonly StreamManager _streamManager;
    private readonly ConsumerManager _consumerManager;
    private readonly JetStreamApiRouter _router;
    private readonly JetStreamPublisher _publisher;

    private ConsumerReplicaFixture(
        JetStreamMetaGroup metaGroup,
        StreamManager streamManager,
        ConsumerManager consumerManager,
        JetStreamApiRouter router,
        JetStreamPublisher publisher)
    {
        _metaGroup = metaGroup;
        _streamManager = streamManager;
        _consumerManager = consumerManager;
        _router = router;
        _publisher = publisher;
    }

    /// <summary>
    /// Builds a fixture whose meta group is constructed with <paramref name="nodes"/>.
    /// All construction happens synchronously; the returned task is already completed.
    /// </summary>
    /// <param name="nodes">Value passed to the <see cref="JetStreamMetaGroup"/> constructor.</param>
    /// <returns>A completed task holding the new fixture.</returns>
    public static Task<ConsumerReplicaFixture> StartAsync(int nodes)
    {
        var meta = new JetStreamMetaGroup(nodes);
        var consumerManager = new ConsumerManager(meta);
        var streamManager = new StreamManager(meta, consumerManager: consumerManager);
        var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
        var publisher = new JetStreamPublisher(streamManager);
        return Task.FromResult(new ConsumerReplicaFixture(meta, streamManager, consumerManager, router, publisher));
    }

    /// <summary>
    /// Creates or updates a stream through the stream manager.
    /// </summary>
    /// <param name="name">Stream name.</param>
    /// <param name="subjects">Subjects copied into the stream configuration.</param>
    /// <param name="replicas">Replica count placed in the stream configuration.</param>
    /// <returns>A completed task.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously when the manager's response carries an error; the
    /// message is the error description.
    /// </exception>
    public Task CreateStreamAsync(string name, string[] subjects, int replicas)
    {
        var response = _streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = name,
            Subjects = [.. subjects],
            Replicas = replicas,
        });

        if (response.Error is not null)
        {
            throw new InvalidOperationException(response.Error.Description);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Creates or updates a consumer on <paramref name="stream"/> through the consumer manager.
    /// </summary>
    /// <param name="stream">Name of the stream that owns the consumer.</param>
    /// <param name="durableName">Durable name; <c>null</c> is sent as an empty string.</param>
    /// <param name="filterSubject">Filter subject; only applied when not null or whitespace.</param>
    /// <param name="ackPolicy">Value for <c>ConsumerConfig.AckPolicy</c>.</param>
    /// <param name="ackWaitMs">Value for <c>ConsumerConfig.AckWaitMs</c>.</param>
    /// <param name="maxDeliver">Value for <c>ConsumerConfig.MaxDeliver</c>.</param>
    /// <param name="ephemeral">Value for <c>ConsumerConfig.Ephemeral</c>.</param>
    /// <param name="deliverPolicy">Value for <c>ConsumerConfig.DeliverPolicy</c>.</param>
    /// <param name="optStartSeq">Value for <c>ConsumerConfig.OptStartSeq</c>.</param>
    /// <returns>A completed task holding the manager's response.</returns>
    public Task<JetStreamApiResponse> CreateConsumerAsync(
        string stream,
        string? durableName,
        string? filterSubject = null,
        AckPolicy ackPolicy = AckPolicy.None,
        int ackWaitMs = 30_000,
        int maxDeliver = 1,
        bool ephemeral = false,
        DeliverPolicy deliverPolicy = DeliverPolicy.All,
        ulong optStartSeq = 0)
    {
        var config = new ConsumerConfig
        {
            DurableName = durableName ?? string.Empty,
            AckPolicy = ackPolicy,
            AckWaitMs = ackWaitMs,
            MaxDeliver = maxDeliver,
            Ephemeral = ephemeral,
            DeliverPolicy = deliverPolicy,
            OptStartSeq = optStartSeq,
        };

        // Leave FilterSubject at its default unless a real filter was supplied.
        if (!string.IsNullOrWhiteSpace(filterSubject))
        {
            config.FilterSubject = filterSubject;
        }

        return Task.FromResult(_consumerManager.CreateOrUpdate(stream, config));
    }

    /// <summary>
    /// Publishes a UTF-8 encoded <paramref name="payload"/> to <paramref name="subject"/>
    /// and, when the publish succeeded, hands the stored message to the consumer manager.
    /// </summary>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Message body; encoded as UTF-8.</param>
    /// <returns>The publish acknowledgement produced by the publisher.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously when the publisher does not capture the subject.
    /// </exception>
    public Task<PubAck> PublishAsync(string subject, string payload)
    {
        // Guard first so the "no stream matched" failure still surfaces at call time.
        if (!_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
        {
            throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");
        }

        return NotifyConsumersAsync(ack);
    }

    /// <summary>
    /// Loads the message referenced by <paramref name="ack"/> from its stream's store
    /// and forwards it to the consumer manager. Nothing is forwarded when the ack
    /// carries an error code, the stream cannot be found, or the load returns null.
    /// </summary>
    private async Task<PubAck> NotifyConsumersAsync(PubAck ack)
    {
        if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var handle))
        {
            // Await the load instead of blocking the calling thread on it.
            var stored = await handle.Store.LoadAsync(ack.Seq, default);
            if (stored != null)
            {
                _consumerManager.OnPublished(ack.Stream, stored);
            }
        }

        return ack;
    }

    /// <summary>
    /// Fetches up to <paramref name="batch"/> messages for the named consumer via the consumer manager.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="batch">Batch size passed to the consumer manager.</param>
    /// <returns>The fetch result, converted to a <see cref="Task{TResult}"/>.</returns>
    public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
        => _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();

    /// <summary>
    /// Forwards an ack-all for <paramref name="sequence"/> to the consumer manager.
    /// </summary>
    public void AckAll(string stream, string durableName, ulong sequence)
        => _consumerManager.AckAll(stream, durableName, sequence);

    /// <summary>
    /// Returns the pending count the consumer manager reports for the named consumer.
    /// </summary>
    public int GetPendingCount(string stream, string durableName)
        => _consumerManager.GetPendingCount(stream, durableName);

    /// <summary>
    /// Looks up consumer info through the consumer manager.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <returns>A completed task holding the consumer info.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously when the manager's response has no consumer info.
    /// </exception>
    public Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
    {
        var info = _consumerManager.GetInfo(stream, durableName).ConsumerInfo
            ?? throw new InvalidOperationException("Consumer not found.");
        return Task.FromResult(info);
    }

    /// <summary>
    /// Asks the stream manager to step down the leader of <paramref name="stream"/>.
    /// </summary>
    public Task StepDownStreamLeaderAsync(string stream)
        => _streamManager.StepDownStreamLeaderAsync(stream, default);

    /// <summary>
    /// Routes an API request through the router with a UTF-8 encoded <paramref name="payload"/>.
    /// </summary>
    /// <param name="subject">API subject to route.</param>
    /// <param name="payload">Request body; encoded as UTF-8.</param>
    /// <returns>A completed task holding the router's response.</returns>
    public Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
        => Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));

    /// <summary>
    /// No cleanup is performed; returns a completed <see cref="ValueTask"/>.
    /// </summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}