Files
natsdotnet/tests/NATS.E2E.Tests/JetStreamTests.cs
Joseph Doherty c30e67a69d Fix E2E test gaps and add comprehensive E2E + parity test suites
- Fix pull consumer fetch: send original stream subject in HMSG (not inbox)
  so NATS client distinguishes data messages from control messages
- Fix MaxAge expiry: add background timer in StreamManager for periodic pruning
- Fix JetStream wire format: Go-compatible anonymous objects with string enums,
  proper offset-based pagination for stream/consumer list APIs
- Add 42 E2E black-box tests (core messaging, auth, TLS, accounts, JetStream)
- Add ~1000 parity tests across all subsystems (gaps closure)
- Update gap inventory docs to reflect implementation status
2026-03-12 14:09:23 -04:00

296 lines
12 KiB
C#

using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.E2E.Tests.Infrastructure;
namespace NATS.E2E.Tests;
[Collection("E2E-JetStream")]
public class JetStreamTests(JetStreamServerFixture fixture)
{
// -------------------------------------------------------------------------
// Test 1 — Create a stream and verify its reported info matches config
// -------------------------------------------------------------------------
/// <summary>
/// Creates the <c>E2E_CREATE</c> stream and checks that the configuration reported
/// in the creation response carries the requested name and subject filter.
/// </summary>
[Fact]
public async Task Stream_CreateAndInfo()
{
    const string streamName = "E2E_CREATE";
    const string subjectFilter = "js.create.>";

    await using var connection = fixture.CreateClient();
    await connection.ConnectAsync();
    var context = new NatsJSContext(connection);
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    var created = await context.CreateStreamAsync(
        new StreamConfig(streamName, [subjectFilter]),
        timeout.Token);

    // Assert against the config the server reported back, not the one we sent.
    var reported = created.Info.Config;
    reported.Name.ShouldBe(streamName);
    reported.Subjects.ShouldNotBeNull();
    reported.Subjects.ShouldContain(subjectFilter);
}
// -------------------------------------------------------------------------
// Test 2 — List streams and verify all created streams appear
// -------------------------------------------------------------------------
/// <summary>
/// Creates three streams and checks that each of their names shows up when the
/// stream list is enumerated.
/// </summary>
[Fact]
public async Task Stream_ListAndNames()
{
    await using var connection = fixture.CreateClient();
    await connection.ConnectAsync();
    var context = new NatsJSContext(connection);
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    (string Name, string Subject)[] definitions =
    [
        ("E2E_LIST_A", "js.list.a.>"),
        ("E2E_LIST_B", "js.list.b.>"),
        ("E2E_LIST_C", "js.list.c.>"),
    ];

    foreach (var (name, subject) in definitions)
    {
        await context.CreateStreamAsync(new StreamConfig(name, [subject]), timeout.Token);
    }

    // Collect every non-null stream name the listing yields.
    var listedNames = new List<string>();
    await foreach (var listed in context.ListStreamsAsync(cancellationToken: timeout.Token))
    {
        if (listed.Info.Config.Name is { } listedName)
        {
            listedNames.Add(listedName);
        }
    }

    // Only membership is checked; other streams may also be present in the listing.
    foreach (var (name, _) in definitions)
    {
        listedNames.ShouldContain(name);
    }
}
// -------------------------------------------------------------------------
// Test 3 — Delete a stream and verify it is gone
// -------------------------------------------------------------------------
/// <summary>
/// Creates the <c>E2E_DEL</c> stream, deletes it, and checks that a subsequent
/// lookup of that stream fails with a <see cref="NatsJSApiException"/>.
/// </summary>
[Fact]
public async Task Stream_Delete()
{
    const string streamName = "E2E_DEL";

    await using var connection = fixture.CreateClient();
    await connection.ConnectAsync();
    var context = new NatsJSContext(connection);
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    await context.CreateStreamAsync(new StreamConfig(streamName, ["js.del.>"]), timeout.Token);
    await context.DeleteStreamAsync(streamName, timeout.Token);

    // Looking up the deleted stream must surface an API error.
    await Should.ThrowAsync<NatsJSApiException>(async () =>
    {
        await context.GetStreamAsync(streamName, cancellationToken: timeout.Token);
    });
}
// -------------------------------------------------------------------------
// Test 4 — Publish messages and verify stream state reflects them
// -------------------------------------------------------------------------
/// <summary>
/// Publishes three messages to a freshly created, uniquely named stream and checks
/// that the stream state reports a message count of three.
/// </summary>
[Fact]
public async Task Stream_PublishAndGet()
{
    await using var connection = fixture.CreateClient();
    await connection.ConnectAsync();
    var context = new NatsJSContext(connection);
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    // A randomised stream name keeps this test's message count isolated from other tests.
    var uniqueStream = $"E2E_PUB_{Random.Shared.Next(100000)}";
    var subjectRoot = $"js.pub.{uniqueStream}";
    await context.CreateStreamAsync(new StreamConfig(uniqueStream, [$"{subjectRoot}.>"]), timeout.Token);

    (string Leaf, string Body)[] messages =
    [
        ("one", "msg1"),
        ("two", "msg2"),
        ("three", "msg3"),
    ];

    foreach (var (leaf, body) in messages)
    {
        await context.PublishAsync($"{subjectRoot}.{leaf}", body, cancellationToken: timeout.Token);
    }

    var fetched = await context.GetStreamAsync(uniqueStream, cancellationToken: timeout.Token);
    fetched.Info.State.Messages.ShouldBe(3L);
}
// -------------------------------------------------------------------------
// Test 5 — Purge a stream and verify message count drops to zero
// -------------------------------------------------------------------------
/// <summary>
/// Publishes five messages to the <c>E2E_PURGE</c> stream, purges it, and checks
/// that the message count drops from five to zero.
/// </summary>
[Fact]
public async Task Stream_Purge()
{
    const string streamName = "E2E_PURGE";
    const int publishCount = 5;

    await using var client = fixture.CreateClient();
    await client.ConnectAsync();
    var js = new NatsJSContext(client);
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    await js.CreateStreamAsync(new StreamConfig(streamName, ["js.purge.>"]), cts.Token);

    for (var i = 0; i < publishCount; i++)
    {
        await js.PublishAsync($"js.purge.msg{i}", $"data{i}", cancellationToken: cts.Token);
    }

    var stream = await js.GetStreamAsync(streamName, cancellationToken: cts.Token);

    // Precondition: without this check the test would also pass if no message had
    // ever been stored, because an empty stream trivially reports zero after a purge.
    stream.Info.State.Messages.ShouldBe(5L);

    await stream.PurgeAsync(new StreamPurgeRequest(), cts.Token);

    // Re-fetch the stream so the assertion runs against state read after the purge.
    var refreshed = await js.GetStreamAsync(streamName, cancellationToken: cts.Token);
    refreshed.Info.State.Messages.ShouldBe(0L);
}
// -------------------------------------------------------------------------
// Test 6 — Create a durable pull consumer and fetch messages
// -------------------------------------------------------------------------
/// <summary>
/// Publishes five messages, creates the <c>pull-consumer</c> consumer with explicit
/// acknowledgement, and checks that a single fetch of up to five messages yields five.
/// </summary>
[Fact]
public async Task Consumer_CreatePullAndConsume()
{
    const string streamName = "E2E_PULL";
    const string consumerName = "pull-consumer";
    const int messageCount = 5;

    await using var connection = fixture.CreateClient();
    await connection.ConnectAsync();
    var context = new NatsJSContext(connection);
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    await context.CreateStreamAsync(new StreamConfig(streamName, ["js.pull.>"]), timeout.Token);

    for (var index = 0; index < messageCount; index++)
    {
        await context.PublishAsync($"js.pull.msg{index}", $"payload{index}", cancellationToken: timeout.Token);
    }

    var consumerConfig = new ConsumerConfig
    {
        Name = consumerName,
        AckPolicy = ConsumerConfigAckPolicy.Explicit,
    };
    await context.CreateOrUpdateConsumerAsync(streamName, consumerConfig, timeout.Token);

    // The consumer is looked up again by name rather than reusing the creation result.
    var pullConsumer = await context.GetConsumerAsync(streamName, consumerName, timeout.Token);

    var payloads = new List<string?>();
    var fetchOptions = new NatsJSFetchOpts { MaxMsgs = messageCount };
    await foreach (var delivered in pullConsumer.FetchAsync<string>(fetchOptions, cancellationToken: timeout.Token))
    {
        payloads.Add(delivered.Data);
        await delivered.AckAsync(cancellationToken: timeout.Token);
    }

    payloads.Count.ShouldBe(messageCount);
}
// -------------------------------------------------------------------------
// Test 7 — Explicit ack: fetching after ack yields no further messages
// -------------------------------------------------------------------------
/// <summary>
/// Publishes one message, fetches and acknowledges it through an explicit-ack
/// consumer, and checks that a second fetch yields no further messages.
/// </summary>
[Fact]
public async Task Consumer_AckExplicit()
{
    const string streamName = "E2E_ACK";
    const string consumerName = "ack-consumer";

    await using var client = fixture.CreateClient();
    await client.ConnectAsync();
    var js = new NatsJSContext(client);
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    await js.CreateStreamAsync(new StreamConfig(streamName, ["js.ack.>"]), cts.Token);
    await js.PublishAsync("js.ack.one", "hello", cancellationToken: cts.Token);

    await js.CreateOrUpdateConsumerAsync(streamName,
        new ConsumerConfig { Name = consumerName, AckPolicy = ConsumerConfigAckPolicy.Explicit },
        cts.Token);
    var consumer = await js.GetConsumerAsync(streamName, consumerName, cts.Token);

    // Fetch and ack the single message. The delivery itself is asserted so the test
    // cannot succeed on the strength of the empty second fetch alone.
    var acked = 0;
    await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 1 }, cancellationToken: cts.Token))
    {
        msg.Data.ShouldBe("hello");
        await msg.AckAsync(cancellationToken: cts.Token);
        acked++;
    }

    acked.ShouldBe(1);

    // Second fetch should return nothing; the one-second Expires keeps the empty
    // fetch well inside the overall ten-second test timeout.
    var second = new List<string?>();
    await foreach (var msg in consumer.FetchAsync<string>(new NatsJSFetchOpts { MaxMsgs = 1, Expires = TimeSpan.FromSeconds(1) }, cancellationToken: cts.Token))
    {
        second.Add(msg.Data);
    }

    second.Count.ShouldBe(0);
}
// -------------------------------------------------------------------------
// Test 8 — List consumers, delete one, verify count drops
// -------------------------------------------------------------------------
/// <summary>
/// Creates two consumers on the <c>E2E_CONS_LIST</c> stream, deletes one of them,
/// and checks that the consumer listing shrinks from two entries to the remaining one.
/// </summary>
[Fact]
public async Task Consumer_ListAndDelete()
{
    const string streamName = "E2E_CONS_LIST";

    await using var connection = fixture.CreateClient();
    await connection.ConnectAsync();
    var context = new NatsJSContext(connection);
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    await context.CreateStreamAsync(new StreamConfig(streamName, ["js.conslist.>"]), timeout.Token);

    await context.CreateOrUpdateConsumerAsync(
        streamName,
        new ConsumerConfig { Name = "cons-one", AckPolicy = ConsumerConfigAckPolicy.None },
        timeout.Token);
    await context.CreateOrUpdateConsumerAsync(
        streamName,
        new ConsumerConfig { Name = "cons-two", AckPolicy = ConsumerConfigAckPolicy.None },
        timeout.Token);

    var namesBeforeDelete = await CollectConsumerNamesAsync();
    namesBeforeDelete.Count.ShouldBe(2);

    await context.DeleteConsumerAsync(streamName, "cons-one", timeout.Token);

    var namesAfterDelete = await CollectConsumerNamesAsync();
    namesAfterDelete.Count.ShouldBe(1);
    namesAfterDelete.ShouldContain("cons-two");

    // Enumerates the stream's consumers and returns every non-null consumer name.
    async Task<List<string>> CollectConsumerNamesAsync()
    {
        var collected = new List<string>();
        await foreach (var entry in context.ListConsumersAsync(streamName, timeout.Token))
        {
            if (entry.Info.Name is { } consumerName)
            {
                collected.Add(consumerName);
            }
        }

        return collected;
    }
}
// -------------------------------------------------------------------------
// Test 9 — MaxMsgs retention evicts oldest messages when limit is reached
// -------------------------------------------------------------------------
/// <summary>
/// Publishes fifteen messages to a stream configured with <c>MaxMsgs = 10</c> and
/// checks that the stream state reports exactly ten messages afterwards.
/// </summary>
[Fact]
public async Task Retention_LimitsMaxMessages()
{
    const string streamName = "E2E_MAXMSGS";
    const int publishCount = 15;

    await using var connection = fixture.CreateClient();
    await connection.ConnectAsync();
    var context = new NatsJSContext(connection);
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));

    var limitedConfig = new StreamConfig(streamName, ["js.maxmsgs.>"])
    {
        MaxMsgs = 10,
    };
    await context.CreateStreamAsync(limitedConfig, timeout.Token);

    // Publish more messages than the stream is configured to keep.
    for (var sequence = 0; sequence < publishCount; sequence++)
    {
        await context.PublishAsync($"js.maxmsgs.{sequence}", $"val{sequence}", cancellationToken: timeout.Token);
    }

    var fetched = await context.GetStreamAsync(streamName, cancellationToken: timeout.Token);
    fetched.Info.State.Messages.ShouldBe(10L);
}
// -------------------------------------------------------------------------
// Test 10 — MaxAge retention expires messages after the configured window
// -------------------------------------------------------------------------
/// <summary>
/// Publishes five messages to a stream configured with a two-second <c>MaxAge</c>
/// and checks that the stream goes from five messages to zero once they have expired.
/// </summary>
[Fact]
public async Task Retention_MaxAge()
{
    const string streamName = "E2E_MAXAGE";

    await using var client = fixture.CreateClient();
    await client.ConnectAsync();
    var js = new NatsJSContext(client);
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

    await js.CreateStreamAsync(
        new StreamConfig(streamName, ["js.maxage.>"])
        {
            MaxAge = TimeSpan.FromSeconds(2),
        },
        cts.Token);

    for (var i = 0; i < 5; i++)
    {
        await js.PublishAsync($"js.maxage.{i}", $"val{i}", cancellationToken: cts.Token);
    }

    var before = await js.GetStreamAsync(streamName, cancellationToken: cts.Token);
    before.Info.State.Messages.ShouldBe(5L);

    // Poll instead of sleeping once for a fixed three seconds: that left only about
    // one second of slack over MaxAge, which is fragile if the server removes expired
    // messages on a periodic sweep rather than exactly at the deadline.
    // NOTE(review): this accepts expiry any time within ten seconds; tighten the
    // deadline if prompt expiry is itself part of the contract under test.
    var deadline = DateTime.UtcNow.AddSeconds(10);
    var after = await js.GetStreamAsync(streamName, cancellationToken: cts.Token);
    while (after.Info.State.Messages != 0 && DateTime.UtcNow < deadline)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token);
        after = await js.GetStreamAsync(streamName, cancellationToken: cts.Token);
    }

    after.Info.State.Messages.ShouldBe(0L);
}
}