From 5d9d1bebd5225cbc635ae2c271994b52d41622ed Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 12 Mar 2026 19:36:29 -0400 Subject: [PATCH] test: add E2E JetStream push consumers, ACK policies, retention modes, ordered, mirror, source Adds 8 new E2E tests to JetStreamTests.cs (tests 11-18) covering push consumer config, AckNone/AckAll policies, Interest/WorkQueue retention, ordered consumers, mirror streams, and source streams. Fixes three server gaps exposed by the new tests: mirror JSON parsing (deliver_subject and mirror object fields were silently ignored in stream and consumer API handlers), and deliver_subject omitted from consumer info wire format. Also fixes ShutdownDrainTests to use TaskCompletionSource on ConnectionDisconnected instead of a Task.Delay poll loop. --- .../Api/Handlers/ConsumerApiHandlers.cs | 12 + .../Api/Handlers/StreamApiHandlers.cs | 10 + .../JetStream/Api/JetStreamApiResponse.cs | 2 + tests/NATS.E2E.Tests/JetStreamTests.cs | 314 +++++++++++++++++- tests/NATS.E2E.Tests/ShutdownDrainTests.cs | 72 ++++ 5 files changed, 408 insertions(+), 2 deletions(-) create mode 100644 tests/NATS.E2E.Tests/ShutdownDrainTests.cs diff --git a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs index 757febb..605e342 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs @@ -308,6 +308,18 @@ public static class ConsumerApiHandlers if (configEl.TryGetProperty("ephemeral", out var ephemeralEl) && ephemeralEl.ValueKind == JsonValueKind.True) config.Ephemeral = true; + // Go: consumer.go — deliver_subject marks a consumer as push-based. + // Reference: server/consumer.go:deliverSubject field on ConsumerConfig + if (configEl.TryGetProperty("deliver_subject", out var deliverSubjectEl)) + { + var ds = deliverSubjectEl.GetString(); + if (!string.IsNullOrWhiteSpace(ds)) + { + config.DeliverSubject = ds; + config.Push = true; // presence of deliver_subject implies push mode + } + } + if (configEl.TryGetProperty("push", out var pushEl) && pushEl.ValueKind == JsonValueKind.True) config.Push = true; diff --git a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs index 82fab59..65f1c59 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs @@ -516,6 +516,16 @@ public static class StreamApiHandlers config.Storage = StorageType.Memory; } + // Go: stream.go — mirror field is a StreamSource object with at minimum a "name" key. + // Reference: server/stream.go:NormalizeConfig mirror handling + if (root.TryGetProperty("mirror", out var mirrorEl)) + { + if (mirrorEl.ValueKind == JsonValueKind.Object && mirrorEl.TryGetProperty("name", out var mirrorNameEl)) + config.Mirror = mirrorNameEl.GetString(); + else if (mirrorEl.ValueKind == JsonValueKind.String) + config.Mirror = mirrorEl.GetString(); + } + if (root.TryGetProperty("source", out var sourceEl)) config.Source = sourceEl.GetString(); diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs index 71f972a..b815e0e 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs @@ -157,6 +157,8 @@ public sealed class JetStreamApiResponse max_deliver = c.MaxDeliver, max_ack_pending = c.MaxAckPending, filter_subject = c.FilterSubject, + // Go: consumer.go — deliver_subject present for push consumers + deliver_subject = string.IsNullOrEmpty(c.DeliverSubject) ? null : c.DeliverSubject, }; public static JetStreamApiResponse NotFound(string subject) => new() diff --git a/tests/NATS.E2E.Tests/JetStreamTests.cs b/tests/NATS.E2E.Tests/JetStreamTests.cs index 5f0de6e..5cb7fb1 100644 --- a/tests/NATS.E2E.Tests/JetStreamTests.cs +++ b/tests/NATS.E2E.Tests/JetStreamTests.cs @@ -287,9 +287,319 @@ public class JetStreamTests(JetStreamServerFixture fixture) var before = await js.GetStreamAsync("E2E_MAXAGE", cancellationToken: cts.Token); before.Info.State.Messages.ShouldBe(5L); - await Task.Delay(3000, cts.Token); + // Poll until MaxAge expiry drops the message count to zero + INatsJSStream after; + do + { + after = await js.GetStreamAsync("E2E_MAXAGE", cancellationToken: cts.Token); + if (after.Info.State.Messages == 0L) break; + await Task.Yield(); + } while (!cts.IsCancellationRequested); - var after = await js.GetStreamAsync("E2E_MAXAGE", cancellationToken: cts.Token); after.Info.State.Messages.ShouldBe(0L); } + + // ------------------------------------------------------------------------- + // Test 11 — Push consumer: consumer is created with DeliverSubject and queryable + // ------------------------------------------------------------------------- + [Fact] + public async Task Consumer_PushDelivery() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var streamName = $"E2E_PUSH_{Random.Shared.Next(100000)}"; + var deliverSubject = $"_deliver.{streamName}"; + await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.push.{streamName}.>"]), cts.Token); + + for (var i = 0; i < 3; i++) + await js.PublishAsync($"js.push.{streamName}.{i}", $"push{i}", cancellationToken: cts.Token); + + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig + { + Name = "push-consumer", + DeliverSubject = deliverSubject, + AckPolicy = ConsumerConfigAckPolicy.None, + }, + cts.Token); + + // Verify the push consumer was created and the deliver subject is reflected in consumer info + var consumer = await js.GetConsumerAsync(streamName, "push-consumer", cts.Token); + consumer.Info.Config.DeliverSubject.ShouldBe(deliverSubject); + } + + // ------------------------------------------------------------------------- + // Test 12 — AckPolicy.None: re-fetch yields nothing (messages auto-acked) + // ------------------------------------------------------------------------- + [Fact] + public async Task Consumer_AckNone() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var streamName = $"E2E_ACKNONE_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.acknone.{streamName}.>"]), cts.Token); + + for (var i = 0; i < 3; i++) + await js.PublishAsync($"js.acknone.{streamName}.{i}", $"msg{i}", cancellationToken: cts.Token); + + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig { Name = "acknone-consumer", AckPolicy = ConsumerConfigAckPolicy.None }, + cts.Token); + + var consumer = await js.GetConsumerAsync(streamName, "acknone-consumer", cts.Token); + + // First fetch — consume all 3 without acking + var first = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 3 }, cancellationToken: cts.Token)) + first.Add(msg.Data); + + first.Count.ShouldBe(3); + + // Second fetch — AckNone means server considers them delivered; nothing left + var second = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 3, Expires = TimeSpan.FromSeconds(1) }, cancellationToken: cts.Token)) + second.Add(msg.Data); + + second.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Test 13 — AckPolicy.All: acking last message acks all prior ones + // ------------------------------------------------------------------------- + [Fact] + public async Task Consumer_AckAll() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var streamName = $"E2E_ACKALL_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.ackall.{streamName}.>"]), cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.ackall.{streamName}.{i}", $"msg{i}", cancellationToken: cts.Token); + + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig { Name = "ackall-consumer", AckPolicy = ConsumerConfigAckPolicy.All }, + cts.Token); + + var consumer = await js.GetConsumerAsync(streamName, "ackall-consumer", cts.Token); + + // Fetch all 5, only ack the last one + INatsJSMsg? last = null; + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 5 }, cancellationToken: cts.Token)) + last = msg; + + last.ShouldNotBeNull(); + await last.AckAsync(cancellationToken: cts.Token); + + // Second fetch should return nothing — all prior msgs are acked via AckAll + var second = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 5, Expires = TimeSpan.FromSeconds(1) }, cancellationToken: cts.Token)) + second.Add(msg.Data); + + second.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Test 14 — Interest retention: consumer created before publish receives all msgs + // ------------------------------------------------------------------------- + [Fact] + public async Task Retention_Interest() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var streamName = $"E2E_INTEREST_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync( + new StreamConfig(streamName, [$"js.interest.{streamName}.>"]) + { + Retention = StreamConfigRetention.Interest, + }, + cts.Token); + + // Create consumer BEFORE publishing so the server tracks interest + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig { Name = "interest-consumer", AckPolicy = ConsumerConfigAckPolicy.Explicit }, + cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.interest.{streamName}.{i}", $"msg{i}", cancellationToken: cts.Token); + + var consumer = await js.GetConsumerAsync(streamName, "interest-consumer", cts.Token); + + // Verify the Interest-mode consumer receives all 5 messages and can ack them + var fetched = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 5 }, cancellationToken: cts.Token)) + { + fetched.Add(msg.Data); + await msg.AckAsync(cancellationToken: cts.Token); + } + + fetched.Count.ShouldBe(5); + } + + // ------------------------------------------------------------------------- + // Test 15 — WorkQueue retention: messages are stored and fetchable by a single consumer + // ------------------------------------------------------------------------- + [Fact] + public async Task Retention_WorkQueue() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var streamName = $"E2E_WQ_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync( + new StreamConfig(streamName, [$"js.wq.{streamName}.>"]) + { + Retention = StreamConfigRetention.Workqueue, + }, + cts.Token); + + await js.CreateOrUpdateConsumerAsync(streamName, + new ConsumerConfig { Name = "wq-consumer", AckPolicy = ConsumerConfigAckPolicy.Explicit }, + cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.wq.{streamName}.{i}", $"msg{i}", cancellationToken: cts.Token); + + // Verify all 5 messages are stored in the WorkQueue stream + var stream = await js.GetStreamAsync(streamName, cancellationToken: cts.Token); + stream.Info.State.Messages.ShouldBe(5L); + + // Verify the consumer can fetch all 5 messages from the WorkQueue stream + var consumer = await js.GetConsumerAsync(streamName, "wq-consumer", cts.Token); + var fetched = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 5 }, cancellationToken: cts.Token)) + fetched.Add(msg.Data); + + fetched.Count.ShouldBe(5); + } + + // ------------------------------------------------------------------------- + // Test 16 — Ordered consumer: messages arrive in sequence order + // ------------------------------------------------------------------------- + [Fact] + public async Task Consumer_Ordered() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + + var streamName = $"E2E_ORDERED_{Random.Shared.Next(100000)}"; + await js.CreateStreamAsync(new StreamConfig(streamName, [$"js.ordered.{streamName}.>"]), cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.ordered.{streamName}.{i}", i, cancellationToken: cts.Token); + + var consumer = await js.CreateOrderedConsumerAsync(streamName, cancellationToken: cts.Token); + + var sequences = new List(); + await foreach (var msg in consumer.FetchAsync(new NatsJSFetchOpts { MaxMsgs = 5 }, cancellationToken: cts.Token)) + sequences.Add(msg.Metadata!.Value.Sequence.Stream); + + sequences.Count.ShouldBe(5); + for (var i = 1; i < sequences.Count; i++) + sequences[i].ShouldBeGreaterThan(sequences[i - 1]); + } + + // ------------------------------------------------------------------------- + // Test 17 — Mirror stream: replicates messages from a source stream + // ------------------------------------------------------------------------- + [Fact] + public async Task Stream_Mirror() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var suffix = Random.Shared.Next(100000); + var sourceName = $"E2E_MIRROR_SRC_{suffix}"; + var mirrorName = $"E2E_MIRROR_DST_{suffix}"; + + await js.CreateStreamAsync(new StreamConfig(sourceName, [$"js.mirror.{suffix}.>"]), cts.Token); + + // Create mirror BEFORE publishing so the replication coordinator captures all messages + await js.CreateStreamAsync( + new StreamConfig(mirrorName, []) + { + Mirror = new StreamSource { Name = sourceName }, + }, + cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.mirror.{suffix}.{i}", $"msg{i}", cancellationToken: cts.Token); + + // Poll until replication completes; outer 30s CTS is the deadline + INatsJSStream mirror; + do + { + mirror = await js.GetStreamAsync(mirrorName, cancellationToken: cts.Token); + if (mirror.Info.State.Messages == 5L) break; + await Task.Yield(); + } while (!cts.IsCancellationRequested); + + mirror.Info.State.Messages.ShouldBe(5L); + } + + // ------------------------------------------------------------------------- + // Test 18 — Source stream: aggregate stream pulls from a source stream + // ------------------------------------------------------------------------- + [Fact] + public async Task Stream_Source() + { + await using var client = fixture.CreateClient(); + await client.ConnectAsync(); + var js = new NatsJSContext(client); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var suffix = Random.Shared.Next(100000); + var srcName = $"E2E_SOURCE_SRC_{suffix}"; + var aggName = $"E2E_SOURCE_AGG_{suffix}"; + + await js.CreateStreamAsync(new StreamConfig(srcName, [$"js.source.{suffix}.>"]), cts.Token); + + // Create aggregate stream BEFORE publishing so the source coordinator captures all messages + await js.CreateStreamAsync( + new StreamConfig(aggName, []) + { + Sources = [new StreamSource { Name = srcName }], + }, + cts.Token); + + for (var i = 0; i < 5; i++) + await js.PublishAsync($"js.source.{suffix}.{i}", $"msg{i}", cancellationToken: cts.Token); + + // Poll until sourcing completes; outer 30s CTS is the deadline + INatsJSStream agg; + do + { + agg = await js.GetStreamAsync(aggName, cancellationToken: cts.Token); + if (agg.Info.State.Messages == 5L) break; + await Task.Yield(); + } while (!cts.IsCancellationRequested); + + agg.Info.State.Messages.ShouldBe(5L); + } } diff --git a/tests/NATS.E2E.Tests/ShutdownDrainTests.cs b/tests/NATS.E2E.Tests/ShutdownDrainTests.cs new file mode 100644 index 0000000..47b24fb --- /dev/null +++ b/tests/NATS.E2E.Tests/ShutdownDrainTests.cs @@ -0,0 +1,72 @@ +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +public class ShutdownDrainTests +{ + [Fact] + public async Task ClientDrain_CompletesInFlightMessages() + { + await using var server = new NatsServerProcess(); + await server.StartAsync(); + + await using var pub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{server.Port}" }); + var sub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{server.Port}" }); + await using var _ = sub; + + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.drain.>"); + await sub.PingAsync(); + + for (var i = 0; i < 10; i++) + await pub.PublishAsync($"e2e.drain.{i}", $"msg{i}"); + await pub.PingAsync(); + + var received = new List(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + for (var i = 0; i < 10; i++) + { + var msg = await subscription.Msgs.ReadAsync(cts.Token); + received.Add(msg.Data); + } + + received.Count.ShouldBe(10); + + await sub.DisposeAsync(); + } + + [Fact] + public async Task ServerShutdown_ClientDetectsDisconnection() + { + var server = new NatsServerProcess(); + await server.StartAsync(); + + await using var client = new NatsConnection(new NatsOpts + { + Url = $"nats://127.0.0.1:{server.Port}", + MaxReconnectRetry = 0, + }); + await client.ConnectAsync(); + await client.PingAsync(); + + client.ConnectionState.ShouldBe(NatsConnectionState.Open); + + var disconnected = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + client.ConnectionDisconnected += (_, _) => + { + disconnected.TrySetResult(); + return ValueTask.CompletedTask; + }; + + await server.DisposeAsync(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await disconnected.Task.WaitAsync(cts.Token); + + client.ConnectionState.ShouldNotBe(NatsConnectionState.Open); + } +}