Move 225 JetStream-related test files from NATS.Server.Tests into a dedicated NATS.Server.JetStream.Tests project. This includes root-level JetStream*.cs files, storage test files (FileStore, MemStore, StreamStoreContract), and the full JetStream/ subfolder tree (Api, Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams). Also update all namespaces, add InternalsVisibleTo, register the new project in the solution file, and add the JETSTREAM_INTEGRATION_MATRIX define.
645 lines
26 KiB
C#
645 lines
26 KiB
C#
// Go parity: golang/nats-server/server/jetstream_cluster_1_test.go and jetstream_cluster_2_test.go
|
|
// Covers: consumer creation, ack propagation, consumer state,
|
|
// ephemeral consumers, consumer scaling, pull/push delivery,
|
|
// redelivery, ack policies, filter subjects.
|
|
using System.Text;
|
|
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.Api;
|
|
using NATS.Server.JetStream.Cluster;
|
|
using NATS.Server.JetStream.Consumers;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Publish;
|
|
|
|
namespace NATS.Server.JetStream.Tests.JetStream.Cluster;
|
|
|
|
/// <summary>
/// Tests covering clustered JetStream consumers: creation (durable, ephemeral, push),
/// pending/ack tracking, redelivery and max-deliver limits, delivery policies,
/// filter subjects, and the consumer delete/pause/reset/unpin API requests.
/// Ported from Go jetstream_cluster_1_test.go and jetstream_cluster_2_test.go.
/// </summary>
public class JetStreamClusterConsumerTests
{
    /// <summary>Ack wait (in milliseconds) used by the redelivery tests so that fetched messages expire almost immediately.</summary>
    private const int ShortAckWaitMs = 1;

    /// <summary>Pause inserted between fetches so that a <see cref="ShortAckWaitMs"/> ack wait has elapsed.</summary>
    private static readonly TimeSpan AckExpiryDelay = TimeSpan.FromMilliseconds(50);

    /// <summary>
    /// Publishes <paramref name="count"/> messages to <paramref name="subject"/> sequentially,
    /// with payloads "msg-0", "msg-1", and so on.
    /// </summary>
    private static async Task PublishManyAsync(ClusterConsumerFixture fx, string subject, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await fx.PublishAsync(subject, $"msg-{i}");
        }
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerState server/jetstream_cluster_1_test.go:700
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_state_tracks_pending_after_fetch()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("CSTATE", ["cs.>"], replicas: 3);
        await fx.CreateConsumerAsync("CSTATE", "track", filterSubject: "cs.>", ackPolicy: AckPolicy.Explicit);

        await PublishManyAsync(fx, "cs.event", 5);

        var batch = await fx.FetchAsync("CSTATE", "track", 3);
        batch.Messages.Count.ShouldBe(3);

        // Nothing has been acked, so every fetched message is still pending.
        var pending = fx.GetPendingCount("CSTATE", "track");
        pending.ShouldBe(3);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerRedeliveredInfo server/jetstream_cluster_1_test.go:659
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_redelivery_marks_messages_as_redelivered()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("REDELIV", ["rd.>"], replicas: 3);
        await fx.CreateConsumerAsync("REDELIV", "rdc", filterSubject: "rd.>",
            ackPolicy: AckPolicy.Explicit, ackWaitMs: ShortAckWaitMs, maxDeliver: 5);

        await fx.PublishAsync("rd.event", "will-redeliver");

        // First fetch should get the message
        var batch1 = await fx.FetchAsync("REDELIV", "rdc", 1);
        batch1.Messages.Count.ShouldBe(1);
        batch1.Messages[0].Redelivered.ShouldBeFalse();

        // Wait for ack timeout
        await Task.Delay(AckExpiryDelay);

        // Second fetch should get redelivered message
        var batch2 = await fx.FetchAsync("REDELIV", "rdc", 1);
        batch2.Messages.Count.ShouldBe(1);
        batch2.Messages[0].Redelivered.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterFullConsumerState server/jetstream_cluster_1_test.go:795
    // ---------------------------------------------------------------

    [Fact]
    public async Task Full_consumer_state_reflects_ack_floor_after_ack_all()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("FULLCS", ["fcs.>"], replicas: 3);
        await fx.CreateConsumerAsync("FULLCS", "full", filterSubject: "fcs.>", ackPolicy: AckPolicy.All);

        await PublishManyAsync(fx, "fcs.event", 10);

        var batch = await fx.FetchAsync("FULLCS", "full", 10);
        batch.Messages.Count.ShouldBe(10);

        // Ack all up to sequence 5
        fx.AckAll("FULLCS", "full", 5);

        var pending = fx.GetPendingCount("FULLCS", "full");
        pending.ShouldBe(5);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterEphemeralConsumerNoImmediateInterest server/jetstream_cluster_1_test.go:2481
    // ---------------------------------------------------------------

    [Fact]
    public async Task Ephemeral_consumer_creation_succeeds()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("EPHEM", ["eph.>"], replicas: 3);
        var resp = await fx.CreateConsumerAsync("EPHEM", null, ephemeral: true);

        // No durable name was supplied, so the response is expected to carry a generated one.
        resp.ConsumerInfo.ShouldNotBeNull();
        resp.ConsumerInfo!.Config.DurableName.ShouldNotBeNullOrEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterEphemeralConsumersNotReplicated server/jetstream_cluster_1_test.go:2599
    // ---------------------------------------------------------------

    [Fact]
    public async Task Multiple_ephemeral_consumers_have_unique_names()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("EPHUNIQ", ["eu.>"], replicas: 3);

        var resp1 = await fx.CreateConsumerAsync("EPHUNIQ", null, ephemeral: true);
        var resp2 = await fx.CreateConsumerAsync("EPHUNIQ", null, ephemeral: true);

        // Assert creation succeeded first so a failed create reports an assertion
        // failure rather than a NullReferenceException on the comparison below.
        resp1.ConsumerInfo.ShouldNotBeNull();
        resp2.ConsumerInfo.ShouldNotBeNull();
        resp1.ConsumerInfo!.Config.DurableName.ShouldNotBe(resp2.ConsumerInfo!.Config.DurableName);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterCreateConcurrentDurableConsumers server/jetstream_cluster_2_test.go:1572
    // ---------------------------------------------------------------

    [Fact]
    public async Task Concurrent_durable_consumer_creation_is_idempotent()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("CONC", ["conc.>"], replicas: 3);

        // Create same consumer twice; both should succeed
        var resp1 = await fx.CreateConsumerAsync("CONC", "same");
        var resp2 = await fx.CreateConsumerAsync("CONC", "same");

        resp1.ConsumerInfo.ShouldNotBeNull();
        resp2.ConsumerInfo.ShouldNotBeNull();
        resp1.ConsumerInfo!.Config.DurableName.ShouldBe("same");
        resp2.ConsumerInfo!.Config.DurableName.ShouldBe("same");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterPullConsumerLeakedSubs server/jetstream_cluster_2_test.go:2239
    // ---------------------------------------------------------------

    [Fact]
    public async Task Pull_consumer_fetch_returns_correct_batch_size()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("PULLBS", ["pb.>"], replicas: 3);
        await fx.CreateConsumerAsync("PULLBS", "puller", filterSubject: "pb.>", ackPolicy: AckPolicy.None);

        await PublishManyAsync(fx, "pb.event", 20);

        // More messages are available than requested; the batch must be capped at the requested size.
        var batch = await fx.FetchAsync("PULLBS", "puller", 5);
        batch.Messages.Count.ShouldBe(5);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerLastActiveReporting server/jetstream_cluster_2_test.go:2371
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_info_returns_config_after_creation()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("CINFO", ["ci.>"], replicas: 3);
        await fx.CreateConsumerAsync("CINFO", "info_dur", filterSubject: "ci.>", ackPolicy: AckPolicy.Explicit);

        var info = await fx.GetConsumerInfoAsync("CINFO", "info_dur");
        info.ShouldNotBeNull();
        info.Config.DurableName.ShouldBe("info_dur");
        info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterAckPendingWithExpired server/jetstream_cluster_2_test.go:309
    // ---------------------------------------------------------------

    [Fact]
    public async Task Ack_pending_tracks_expired_messages()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("ACKEXP", ["ae.>"], replicas: 3);
        await fx.CreateConsumerAsync("ACKEXP", "acker", filterSubject: "ae.>",
            ackPolicy: AckPolicy.Explicit, ackWaitMs: ShortAckWaitMs, maxDeliver: 10);

        await fx.PublishAsync("ae.event", "will-expire");

        // Fetch to register pending
        var batch1 = await fx.FetchAsync("ACKEXP", "acker", 1);
        batch1.Messages.Count.ShouldBe(1);

        fx.GetPendingCount("ACKEXP", "acker").ShouldBe(1);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterAckPendingWithMaxRedelivered server/jetstream_cluster_2_test.go:377
    // ---------------------------------------------------------------

    [Fact]
    public async Task Max_deliver_limits_redelivery_attempts()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("MAXRED", ["mr.>"], replicas: 3);

        // With maxDeliver=2 this test expects three deliveries in total (the initial one plus
        // two redeliveries). Per the notes carried with this port, the limit check is
        // "deliveries > maxDeliver" and is evaluated on the count recorded before
        // ScheduleRedelivery increments it, so delivery only stops once that count reaches 3.
        await fx.CreateConsumerAsync("MAXRED", "maxr", filterSubject: "mr.>",
            ackPolicy: AckPolicy.Explicit, ackWaitMs: ShortAckWaitMs, maxDeliver: 2);

        await fx.PublishAsync("mr.event", "limited-redeliver");

        // First fetch (initial delivery, Register sets deliveries=1)
        var batch1 = await fx.FetchAsync("MAXRED", "maxr", 1);
        batch1.Messages.Count.ShouldBe(1);

        // Wait for expiry
        await Task.Delay(AckExpiryDelay);

        // Second fetch: TryGetExpired returns deliveries=1, 1 > 2 is false, so redeliver.
        // ScheduleRedelivery increments to deliveries=2.
        var batch2 = await fx.FetchAsync("MAXRED", "maxr", 1);
        batch2.Messages.Count.ShouldBe(1);
        batch2.Messages[0].Redelivered.ShouldBeTrue();

        // Wait for expiry
        await Task.Delay(AckExpiryDelay);

        // Third fetch: TryGetExpired returns deliveries=2, 2 > 2 is false, so redeliver again.
        // ScheduleRedelivery increments to deliveries=3.
        var batch3 = await fx.FetchAsync("MAXRED", "maxr", 1);
        batch3.Messages.Count.ShouldBe(1);
        batch3.Messages[0].Redelivered.ShouldBeTrue();

        // Wait for expiry
        await Task.Delay(AckExpiryDelay);

        // Fourth fetch: TryGetExpired returns deliveries=3, 3 > 2 is true, so AckAll triggers
        // and returns empty batch (max deliver exceeded).
        var batch4 = await fx.FetchAsync("MAXRED", "maxr", 1);
        batch4.Messages.Count.ShouldBe(0);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterMaxConsumers server/jetstream_cluster_2_test.go:1978
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_delete_succeeds_in_cluster()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("CDEL", ["cdel.>"], replicas: 3);
        await fx.CreateConsumerAsync("CDEL", "to_delete");

        var del = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}CDEL.to_delete", "{}");
        del.Success.ShouldBeTrue();

        // Once deleted, an info request for the same consumer must report an error.
        var info = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}CDEL.to_delete", "{}");
        info.Error.ShouldNotBeNull();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterFlowControlRequiresHeartbeats server/jetstream_cluster_2_test.go:2712
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_with_filter_subjects_delivers_matching_only()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("FILT", ["filt.>"], replicas: 3);
        await fx.CreateConsumerAsync("FILT", "filtered", filterSubject: "filt.alpha");

        await fx.PublishAsync("filt.alpha", "match");
        await fx.PublishAsync("filt.beta", "no-match");
        await fx.PublishAsync("filt.alpha", "match2");

        var batch = await fx.FetchAsync("FILT", "filtered", 10);
        batch.Messages.Count.ShouldBe(2);
        batch.Messages[0].Subject.ShouldBe("filt.alpha");
        batch.Messages[1].Subject.ShouldBe("filt.alpha");
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerScaleUp server/jetstream_cluster_1_test.go:4203
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_pause_and_resume_via_api()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("PAUSE", ["pause.>"], replicas: 3);
        await fx.CreateConsumerAsync("PAUSE", "pausable");

        var pauseResp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable", """{"pause":true}""");
        pauseResp.Success.ShouldBeTrue();

        var resumeResp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerPause}PAUSE.pausable", """{"pause":false}""");
        resumeResp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterConsumerResetPendingDeliveriesOnMaxAckPendingUpdate
    // server/jetstream_cluster_1_test.go:8696
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_reset_resets_next_sequence_and_returns_success()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("RESET", ["reset.>"], replicas: 3);
        await fx.CreateConsumerAsync("RESET", "resettable", filterSubject: "reset.>");

        await PublishManyAsync(fx, "reset.event", 5);

        // Fetch some messages to advance the consumer
        var batch1 = await fx.FetchAsync("RESET", "resettable", 3);
        batch1.Messages.Count.ShouldBe(3);

        // Reset via API
        var resetResp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerReset}RESET.resettable", "{}");
        resetResp.Success.ShouldBeTrue();

        // After reset, consumer should re-deliver from sequence 1
        var batch2 = await fx.FetchAsync("RESET", "resettable", 5);
        batch2.Messages.Count.ShouldBe(5);
        batch2.Messages[0].Sequence.ShouldBe(1UL);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterPushConsumerQueueGroup server/jetstream_cluster_2_test.go:2300
    // ---------------------------------------------------------------

    [Fact]
    public async Task Push_consumer_creation_with_heartbeat()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("PUSHHB", ["ph.>"], replicas: 3);
        var resp = await fx.CreateConsumerAsync("PUSHHB", "pusher", push: true, heartbeatMs: 100);

        resp.ConsumerInfo.ShouldNotBeNull();
        resp.ConsumerInfo!.Config.Push.ShouldBeTrue();
        resp.ConsumerInfo.Config.HeartbeatMs.ShouldBe(100);
    }

    // ---------------------------------------------------------------
    // Go: TestJetStreamClusterScaleConsumer server/jetstream_cluster_1_test.go:4109
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_unpin_via_api()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("UNPIN", ["unpin.>"], replicas: 3);
        await fx.CreateConsumerAsync("UNPIN", "pinned");

        var resp = await fx.RequestAsync($"{JetStreamApiSubjects.ConsumerUnpin}UNPIN.pinned", "{}");
        resp.Success.ShouldBeTrue();
    }

    // ---------------------------------------------------------------
    // Additional: Consumer AckAll policy acks all up to given sequence
    // ---------------------------------------------------------------

    [Fact]
    public async Task AckAll_policy_consumer_acks_all_preceding_messages()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("ACKALL", ["aa.>"], replicas: 3);
        await fx.CreateConsumerAsync("ACKALL", "acker", filterSubject: "aa.>", ackPolicy: AckPolicy.All);

        await PublishManyAsync(fx, "aa.event", 10);

        var batch = await fx.FetchAsync("ACKALL", "acker", 10);
        batch.Messages.Count.ShouldBe(10);

        // Ack up to seq 7 (all 1-7 should be acked, 8-10 remain pending)
        fx.AckAll("ACKALL", "acker", 7);
        fx.GetPendingCount("ACKALL", "acker").ShouldBe(3);
    }

    // ---------------------------------------------------------------
    // Additional: DeliverPolicy.Last consumer starts at last message
    // ---------------------------------------------------------------

    [Fact]
    public async Task DeliverPolicy_Last_consumer_starts_at_last_sequence()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("DLAST", ["dl.>"], replicas: 3);

        // Messages are published before the consumer exists so that the policy decides the start point.
        await PublishManyAsync(fx, "dl.event", 5);

        await fx.CreateConsumerAsync("DLAST", "last_cons", filterSubject: "dl.>",
            deliverPolicy: DeliverPolicy.Last);

        var batch = await fx.FetchAsync("DLAST", "last_cons", 10);
        batch.Messages.Count.ShouldBe(1);
        batch.Messages[0].Sequence.ShouldBe(5UL);
    }

    // ---------------------------------------------------------------
    // Additional: DeliverPolicy.New consumer skips existing messages
    // ---------------------------------------------------------------

    [Fact]
    public async Task DeliverPolicy_New_consumer_skips_existing()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("DNEW", ["dn.>"], replicas: 3);

        await PublishManyAsync(fx, "dn.event", 5);

        await fx.CreateConsumerAsync("DNEW", "new_cons", filterSubject: "dn.>",
            deliverPolicy: DeliverPolicy.New);

        // Should get no messages since consumer starts at LastSeq+1
        var batch = await fx.FetchAsync("DNEW", "new_cons", 10);
        batch.Messages.Count.ShouldBe(0);

        // Publish a new message after consumer creation
        await fx.PublishAsync("dn.event", "after-consumer");

        var batch2 = await fx.FetchAsync("DNEW", "new_cons", 10);
        batch2.Messages.Count.ShouldBe(1);
    }

    // ---------------------------------------------------------------
    // Additional: DeliverPolicy.ByStartSequence
    // ---------------------------------------------------------------

    [Fact]
    public async Task DeliverPolicy_ByStartSequence_starts_at_given_sequence()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("DSTART", ["ds.>"], replicas: 3);

        await PublishManyAsync(fx, "ds.event", 10);

        await fx.CreateConsumerAsync("DSTART", "start_cons", filterSubject: "ds.>",
            deliverPolicy: DeliverPolicy.ByStartSequence, optStartSeq: 7);

        var batch = await fx.FetchAsync("DSTART", "start_cons", 10);
        batch.Messages.Count.ShouldBe(4); // seq 7, 8, 9, 10
        batch.Messages[0].Sequence.ShouldBe(7UL);
    }

    // ---------------------------------------------------------------
    // Additional: Multiple filter subjects
    // ---------------------------------------------------------------

    [Fact]
    public async Task Consumer_with_multiple_filter_subjects()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("MFILT", ["mf.>"], replicas: 3);
        await fx.CreateConsumerAsync("MFILT", "multi_filt",
            filterSubjects: ["mf.alpha", "mf.gamma"]);

        await fx.PublishAsync("mf.alpha", "a");
        await fx.PublishAsync("mf.beta", "b");
        await fx.PublishAsync("mf.gamma", "g");
        await fx.PublishAsync("mf.delta", "d");

        var batch = await fx.FetchAsync("MFILT", "multi_filt", 10);
        batch.Messages.Count.ShouldBe(2);

        // Check which subjects came through, not just how many, so a consumer that
        // delivers two non-matching messages cannot pass. Publish order is expected.
        batch.Messages[0].Subject.ShouldBe("mf.alpha");
        batch.Messages[1].Subject.ShouldBe("mf.gamma");
    }

    // ---------------------------------------------------------------
    // Additional: NoWait fetch returns empty when no messages
    // ---------------------------------------------------------------

    [Fact]
    public async Task NoWait_fetch_returns_empty_when_no_pending()
    {
        await using var fx = await ClusterConsumerFixture.StartAsync(nodes: 3);

        await fx.CreateStreamAsync("NOWAIT", ["nw.>"], replicas: 3);
        await fx.CreateConsumerAsync("NOWAIT", "nw_cons", filterSubject: "nw.>");

        // Nothing was published, so a no-wait fetch must come back empty.
        var batch = await fx.FetchNoWaitAsync("NOWAIT", "nw_cons", 5);
        batch.Messages.Count.ShouldBe(0);
    }
}
|
|
|
|
/// <summary>
/// Self-contained fixture for JetStream cluster consumer tests. It owns a
/// <see cref="JetStreamMetaGroup"/>, <see cref="StreamManager"/>, <see cref="ConsumerManager"/>,
/// <see cref="JetStreamApiRouter"/> and <see cref="JetStreamPublisher"/> that are constructed
/// directly in <see cref="StartAsync"/>, and exposes thin helpers over them.
/// </summary>
internal sealed class ClusterConsumerFixture : IAsyncDisposable
{
    // Not read by the fixture itself; stored alongside the components that were built from it.
    private readonly JetStreamMetaGroup _metaGroup;
    private readonly StreamManager _streamManager;
    private readonly ConsumerManager _consumerManager;
    private readonly JetStreamApiRouter _router;
    private readonly JetStreamPublisher _publisher;

    private ClusterConsumerFixture(
        JetStreamMetaGroup metaGroup,
        StreamManager streamManager,
        ConsumerManager consumerManager,
        JetStreamApiRouter router,
        JetStreamPublisher publisher)
    {
        _metaGroup = metaGroup;
        _streamManager = streamManager;
        _consumerManager = consumerManager;
        _router = router;
        _publisher = publisher;
    }

    /// <summary>Builds a fixture whose meta group is created for <paramref name="nodes"/> nodes.</summary>
    /// <param name="nodes">Node count passed to the <see cref="JetStreamMetaGroup"/> constructor.</param>
    /// <returns>A completed task holding the new fixture.</returns>
    public static Task<ClusterConsumerFixture> StartAsync(int nodes)
    {
        var meta = new JetStreamMetaGroup(nodes);
        var consumerManager = new ConsumerManager(meta);
        var streamManager = new StreamManager(meta, consumerManager: consumerManager);
        var router = new JetStreamApiRouter(streamManager, consumerManager, meta);
        var publisher = new JetStreamPublisher(streamManager);

        return Task.FromResult(new ClusterConsumerFixture(meta, streamManager, consumerManager, router, publisher));
    }

    /// <summary>Creates or updates a stream with the given name, subjects and replica count.</summary>
    /// <exception cref="InvalidOperationException">The stream manager reported an error; the message is the error description.</exception>
    public Task CreateStreamAsync(string name, string[] subjects, int replicas)
    {
        var response = _streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = name,
            Subjects = [.. subjects],
            Replicas = replicas,
        });

        if (response.Error is not null)
        {
            throw new InvalidOperationException(response.Error.Description);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Creates or updates a consumer on <paramref name="stream"/> and returns the raw API response
    /// without inspecting it for errors.
    /// </summary>
    /// <param name="stream">Name of the stream that owns the consumer.</param>
    /// <param name="durableName">Durable name; <c>null</c> is sent as an empty string.</param>
    /// <param name="filterSubject">Single filter subject; only applied when not null or whitespace.</param>
    /// <param name="ackPolicy">Ack policy copied to the consumer config.</param>
    /// <param name="ackWaitMs">Ack wait in milliseconds copied to the consumer config.</param>
    /// <param name="maxDeliver">Max-deliver value copied to the consumer config.</param>
    /// <param name="ephemeral">Ephemeral flag copied to the consumer config.</param>
    /// <param name="push">Push flag copied to the consumer config.</param>
    /// <param name="heartbeatMs">Heartbeat interval in milliseconds copied to the consumer config.</param>
    /// <param name="deliverPolicy">Deliver policy copied to the consumer config.</param>
    /// <param name="optStartSeq">Start sequence copied to the consumer config.</param>
    /// <param name="filterSubjects">Multiple filter subjects; only applied when non-empty.</param>
    public Task<JetStreamApiResponse> CreateConsumerAsync(
        string stream,
        string? durableName,
        string? filterSubject = null,
        AckPolicy ackPolicy = AckPolicy.None,
        int ackWaitMs = 30_000,
        int maxDeliver = 1,
        bool ephemeral = false,
        bool push = false,
        int heartbeatMs = 0,
        DeliverPolicy deliverPolicy = DeliverPolicy.All,
        ulong optStartSeq = 0,
        IReadOnlyList<string>? filterSubjects = null)
    {
        var config = new ConsumerConfig
        {
            DurableName = durableName ?? string.Empty,
            AckPolicy = ackPolicy,
            AckWaitMs = ackWaitMs,
            MaxDeliver = maxDeliver,
            Ephemeral = ephemeral,
            Push = push,
            HeartbeatMs = heartbeatMs,
            DeliverPolicy = deliverPolicy,
            OptStartSeq = optStartSeq,
        };

        // Filters are left at the config's defaults unless the caller actually supplied them.
        if (!string.IsNullOrWhiteSpace(filterSubject))
        {
            config.FilterSubject = filterSubject;
        }

        if (filterSubjects is { Count: > 0 })
        {
            config.FilterSubjects = [.. filterSubjects];
        }

        return Task.FromResult(_consumerManager.CreateOrUpdate(stream, config));
    }

    /// <summary>
    /// Captures a UTF-8 encoded <paramref name="payload"/> on <paramref name="subject"/> and, when the
    /// ack carries no error code and the stream can be resolved, loads the stored message and passes it
    /// to <see cref="ConsumerManager.OnPublished"/>.
    /// </summary>
    /// <returns>The publish ack produced by the publisher.</returns>
    /// <exception cref="InvalidOperationException">The publisher did not capture the subject.</exception>
    public async Task<PubAck> PublishAsync(string subject, string payload)
    {
        if (!_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), null, out var ack))
        {
            throw new InvalidOperationException($"Publish to '{subject}' did not match a stream.");
        }

        if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var handle))
        {
            // Await the load instead of blocking on GetAwaiter().GetResult(): this method already
            // returns a Task, and blocking ties up the calling thread for the duration of the load.
            var stored = await handle.Store.LoadAsync(ack.Seq, default);
            if (stored != null)
            {
                _consumerManager.OnPublished(ack.Stream, stored);
            }
        }

        return ack;
    }

    /// <summary>Fetches up to <paramref name="batch"/> messages for the named consumer.</summary>
    public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
        => _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();

    /// <summary>Fetches up to <paramref name="batch"/> messages using a request with <c>NoWait</c> set.</summary>
    public Task<PullFetchBatch> FetchNoWaitAsync(string stream, string durableName, int batch)
    {
        var request = new PullFetchRequest
        {
            Batch = batch,
            NoWait = true,
        };

        return _consumerManager.FetchAsync(stream, durableName, request, _streamManager, default).AsTask();
    }

    /// <summary>Forwards an ack-all for <paramref name="sequence"/> to the consumer manager.</summary>
    public void AckAll(string stream, string durableName, ulong sequence)
        => _consumerManager.AckAll(stream, durableName, sequence);

    /// <summary>Returns the pending count reported by the consumer manager for the named consumer.</summary>
    public int GetPendingCount(string stream, string durableName)
        => _consumerManager.GetPendingCount(stream, durableName);

    /// <summary>Returns the consumer info for the named consumer.</summary>
    /// <exception cref="InvalidOperationException">The info response contained no consumer info.</exception>
    public Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
    {
        var resp = _consumerManager.GetInfo(stream, durableName);
        if (resp.ConsumerInfo == null)
        {
            throw new InvalidOperationException("Consumer not found.");
        }

        return Task.FromResult(resp.ConsumerInfo);
    }

    /// <summary>Routes a UTF-8 encoded <paramref name="payload"/> to the API router on <paramref name="subject"/>.</summary>
    public Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
        => Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));

    /// <summary>No-op: the fixture does nothing on disposal.</summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
|