Files
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

1683 lines
69 KiB
C#

// Go reference: golang/nats-server/server/jetstream_consumer_test.go
// Ports Go pull consumer queue, state, and filter tests to .NET unit tests.
// Tests that require a full NATS networking server are marked [Fact(Skip=...)]
// with the reason "Requires full NATS server infrastructure".
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using NATS.Server.Subscriptions;
namespace NATS.Server.JetStream.Tests.JetStream.Consumers;
/// <summary>
/// Go parity tests ported from jetstream_consumer_test.go covering pull consumer
/// queue behavior, MaxAckPending enforcement, filter semantics, pending count
/// calculations, and pull request ordering.
/// </summary>
public class ConsumerPullQueueTests
{
// =========================================================================
// Helpers
// =========================================================================
private static StreamHandle MakeStream(MemStore store, string name = "TEST", params string[] subjects)
{
var config = new StreamConfig
{
Name = name,
Subjects = subjects.Length > 0 ? [..subjects] : ["test.>"],
};
return new StreamHandle(config, store);
}
private static ConsumerHandle MakeConsumer(ConsumerConfig? config = null, string stream = "TEST")
=> new(stream, config ?? new ConsumerConfig
{
DurableName = "C1",
AckPolicy = AckPolicy.Explicit,
});
private static async Task AppendAsync(MemStore store, string subject, string? payload = null)
=> await store.AppendAsync(subject,
payload is not null ? Encoding.UTF8.GetBytes(payload) : ReadOnlyMemory<byte>.Empty,
CancellationToken.None);
// =========================================================================
// TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083)
// =========================================================================
[Fact]
public async Task PullMaxAckPending_BatchLimitedToMaxAckPending()
{
// Go: TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083)
// A pull consumer with MaxAckPending=33 should not deliver more than 33
// messages in a single batch, even when batch > maxAckPending.
var store = new MemStore();
var stream = MakeStream(store);
for (int i = 0; i < 100; i++)
await AppendAsync(store, "test.bar", $"MSG: {i + 1}");
const int maxAckPending = 33;
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "d22",
AckPolicy = AckPolicy.Explicit,
MaxAckPending = maxAckPending,
});
var engine = new PullConsumerEngine();
// Fetch 100 — should be capped at maxAckPending+1 (check-after-add semantics)
var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 100 }, CancellationToken.None);
batch.Messages.Count.ShouldBeLessThanOrEqualTo(maxAckPending + 1);
batch.Messages.Count.ShouldBeGreaterThan(0);
// After acking all, a subsequent fetch should again be allowed up to maxAckPending
foreach (var msg in batch.Messages)
consumer.AckProcessor.AckSequence(msg.Sequence);
var batch2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = maxAckPending }, CancellationToken.None);
batch2.Messages.Count.ShouldBeLessThanOrEqualTo(maxAckPending + 1);
batch2.Messages.Count.ShouldBeGreaterThan(0);
}
[Fact]
public async Task PullMaxAckPending_BlocksWhenPendingAtMax()
{
// Go: TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083)
// With MaxAckPending=5 and 10 messages, a fetch of 10 should only return ~5.
var store = new MemStore();
var stream = MakeStream(store);
for (int i = 0; i < 10; i++)
await AppendAsync(store, "test.foo", $"msg-{i}");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "d",
AckPolicy = AckPolicy.Explicit,
MaxAckPending = 5,
});
var engine = new PullConsumerEngine();
var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
// Should stop at MaxAckPending+1 due to check-after-register
batch.Messages.Count.ShouldBeLessThanOrEqualTo(6);
batch.Messages.Count.ShouldBeGreaterThan(0);
}
// =========================================================================
// TestJetStreamConsumerPullConsumerFIFO (jetstream_consumer_test.go:6422)
// =========================================================================
[Fact]
public async Task PullConsumerFIFO_MessagesDeliveredInSequenceOrder()
{
// Go: TestJetStreamConsumerPullConsumerFIFO (jetstream_consumer_test.go:6422)
// Messages must be delivered in FIFO (sequence) order.
var store = new MemStore();
var stream = MakeStream(store, "T", "T");
for (int i = 1; i <= 10; i++)
await AppendAsync(store, "T", $"msg-{i}");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "d",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
batch.Messages.Count.ShouldBe(10);
for (int i = 0; i < batch.Messages.Count; i++)
{
var payload = Encoding.UTF8.GetString(batch.Messages[i].Payload.Span);
payload.ShouldBe($"msg-{i + 1}");
}
}
// =========================================================================
// TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit (6479)
// =========================================================================
[Fact]
public async Task PullOneShotOnMaxAckLimit_NoMoreMessagesAfterLimit()
{
// Go: TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit (jetstream_consumer_test.go:6479)
// When MaxAckPending is reached, subsequent fetches should return empty.
var store = new MemStore();
var stream = MakeStream(store, "T", "T");
for (int i = 0; i < 10; i++)
await AppendAsync(store, "T", "OK");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "d",
AckPolicy = AckPolicy.Explicit,
MaxAckPending = 3,
});
var engine = new PullConsumerEngine();
// First fetch fills up ack pending
var batch1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
batch1.Messages.Count.ShouldBeLessThanOrEqualTo(4);
batch1.Messages.Count.ShouldBeGreaterThan(0);
// Pending is now full — further fetch returns empty
var batch2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
batch2.Messages.Count.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerPullMaxWaiting (jetstream_consumer_test.go:8539)
// =========================================================================
[Fact]
public void PullMaxWaiting_QueueCapacityIsRespected()
{
// Go: TestJetStreamConsumerPullMaxWaiting (jetstream_consumer_test.go:8539)
// PullRequestWaitQueue with maxWaiting=3 should reject the 4th enqueue.
var queue = new PullRequestWaitQueue(maxSize: 3);
queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r1" }).ShouldBeTrue();
queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r2" }).ShouldBeTrue();
queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r3" }).ShouldBeTrue();
queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r4" }).ShouldBeFalse();
queue.Count.ShouldBe(3);
}
[Fact]
public void PullMaxWaiting_DefaultQueueIsUnbounded()
{
// Go: TestJetStreamConsumerPullMaxWaitingOfOne (jetstream_consumer_test.go:8446)
// Default queue (no max) accepts many requests.
var queue = new PullRequestWaitQueue();
for (int i = 0; i < 100; i++)
queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = $"r{i}" }).ShouldBeTrue();
queue.Count.ShouldBe(100);
}
// =========================================================================
// TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043)
// =========================================================================
[Fact]
public async Task PullOneShotBehavior_NoWait_EmptyStream_ReturnsEmpty()
{
// Go: TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043)
// NoWait=true on an empty stream returns immediately with empty batch.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "dlc",
AckPolicy = AckPolicy.Explicit,
FilterSubject = "foo",
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 1,
NoWait = true,
}, CancellationToken.None);
result.Messages.Count.ShouldBe(0);
result.TimedOut.ShouldBeFalse();
}
[Fact]
public async Task PullOneShotBehavior_NoWait_WithMessages_ReturnsMessages()
{
// Go: TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043)
// NoWait=true with available messages returns them immediately.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo");
for (int i = 0; i < 5; i++)
await AppendAsync(store, "foo", "HELLO");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "dlc",
AckPolicy = AckPolicy.Explicit,
FilterSubject = "foo",
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 3,
NoWait = true,
}, CancellationToken.None);
result.Messages.Count.ShouldBe(3);
}
// =========================================================================
// TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465)
// =========================================================================
[Fact]
public async Task PullTimeout_WithExpires_TimesOutWhenNoMessages()
{
// Go: TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465)
// A pull request with Expires and no messages returns timed out.
var store = new MemStore();
var stream = MakeStream(store);
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "pr",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 200,
ExpiresMs = 50,
}, CancellationToken.None);
result.TimedOut.ShouldBeTrue();
result.Messages.Count.ShouldBe(0);
}
[Fact]
public async Task PullTimeout_WithExpiresAndMessages_ReturnsPartialBatch()
{
// Go: TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465)
// When messages trickle in, partial batch is returned on timeout.
var store = new MemStore();
var stream = MakeStream(store);
await AppendAsync(store, "test.a", "one");
await AppendAsync(store, "test.b", "two");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "pr",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 100,
ExpiresMs = 100,
}, CancellationToken.None);
// Gets 2 available messages, then times out waiting for more
result.Messages.Count.ShouldBe(2);
}
// =========================================================================
// TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9522)
// =========================================================================
[Fact]
public async Task PullMaxBytes_LimitsDeliveredMessagesByByteSize()
{
// Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9522)
// MaxBytes caps the total payload bytes returned per pull request.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "TEST");
// Each message ~100 bytes payload + subject
var payload = new string('Z', 100);
for (int i = 0; i < 10; i++)
await AppendAsync(store, "TEST", payload);
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "pr",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
// MaxBytes = 250 → should deliver at most 2 messages (each ~104 bytes total)
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 10,
MaxBytes = 250,
NoWait = true,
}, CancellationToken.None);
result.Messages.Count.ShouldBeLessThanOrEqualTo(2);
result.Messages.Count.ShouldBeGreaterThanOrEqualTo(1);
}
[Fact]
public async Task PullMaxBytes_BatchLimitOverridesWhenSmaller()
{
// Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9579)
// When batch=1 and MaxBytes is very large, batch limit controls.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "TEST");
var payload = new string('Z', 1000);
for (int i = 0; i < 5; i++)
await AppendAsync(store, "TEST", payload);
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "pr",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 1,
MaxBytes = 10_000_000,
NoWait = true,
}, CancellationToken.None);
result.Messages.Count.ShouldBe(1);
}
[Fact]
public async Task PullMaxBytes_MultipleMessagesWithinBudget()
{
// Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9599)
// batch=5 with large MaxBytes returns 5 messages.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "TEST");
var payload = new string('Z', 50);
for (int i = 0; i < 10; i++)
await AppendAsync(store, "TEST", payload);
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "pr",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 5,
MaxBytes = 10_000_000,
NoWait = true,
}, CancellationToken.None);
result.Messages.Count.ShouldBe(5);
}
[Fact]
public async Task PullMaxBytes_LargerBatchLimitedByMaxBytes()
{
// Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9618)
// Large batch limited to MaxBytes/messageSize messages.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "TEST");
// msz~=100_000, dsz=99_950; MaxBytes = msz * 4 allows ~4 messages
var payload = new string('Z', 99_950);
for (int i = 0; i < 20; i++)
await AppendAsync(store, "TEST", payload);
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "pr",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 1000,
MaxBytes = 100_000 * 4,
NoWait = true,
}, CancellationToken.None);
result.Messages.Count.ShouldBeLessThanOrEqualTo(5);
result.Messages.Count.ShouldBeGreaterThanOrEqualTo(1);
}
// =========================================================================
// TestJetStreamConsumerPullRemoveInterest (jetstream_consumer_test.go:8367)
// =========================================================================
[Fact]
public async Task PullRemoveInterest_ConsumerWithFilterSkipsNonMatchingMessages()
{
// Go: TestJetStreamConsumerPullRemoveInterest (jetstream_consumer_test.go:8367)
// Consumer with filter subject only receives matching messages.
var store = new MemStore();
var stream = MakeStream(store, "MYS", "MYS");
await AppendAsync(store, "MYS", "unrelated"); // seq 1
await AppendAsync(store, "MYS", "unrelated"); // seq 2
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "worker",
AckPolicy = AckPolicy.Explicit,
FilterSubject = "FILTERED", // doesn't match MYS
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 5,
NoWait = true,
}, CancellationToken.None);
result.Messages.Count.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerPullDelayedFirstPullWithReplayOriginal (3361)
// =========================================================================
[Fact]
public async Task PullDelayedFirstPullWithReplayOriginal_DeliversMessage()
{
// Go: TestJetStreamConsumerPullDelayedFirstPullWithReplayOriginal (jetstream_consumer_test.go:3361)
// ReplayOriginal consumer with delayed first pull still delivers the message.
var store = new MemStore();
var stream = MakeStream(store, "MY_WQ", "MY_WQ");
await AppendAsync(store, "MY_WQ", "Hello World!");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "d",
AckPolicy = AckPolicy.Explicit,
ReplayPolicy = ReplayPolicy.Original,
});
var engine = new PullConsumerEngine();
// Simulate delay before pull (but keep it short for unit test)
await Task.Delay(50);
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None);
result.Messages.Count.ShouldBe(1);
Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("Hello World!");
}
// =========================================================================
// TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178)
// =========================================================================
[Fact]
public async Task ThreeFilters_OnlyMatchingSubjectsDelivered()
{
// Go: TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178)
// Consumer with 3 filter subjects ignores messages on non-matching subjects.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "events", "data", "other", "ignored");
await AppendAsync(store, "ignored", "100"); // seq 1
await AppendAsync(store, "events", "0"); // seq 2
await AppendAsync(store, "events", "1"); // seq 3
await AppendAsync(store, "data", "2"); // seq 4
await AppendAsync(store, "ignored", "100"); // seq 5
await AppendAsync(store, "data", "3"); // seq 6
await AppendAsync(store, "other", "4"); // seq 7
await AppendAsync(store, "data", "5"); // seq 8
await AppendAsync(store, "other", "6"); // seq 9
await AppendAsync(store, "data", "7"); // seq 10
await AppendAsync(store, "ignored", "100"); // seq 11
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "multi",
AckPolicy = AckPolicy.Explicit,
FilterSubjects = ["events", "data", "other"],
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 8 }, CancellationToken.None);
result.Messages.Count.ShouldBe(8);
var payloads = result.Messages.Select(m => Encoding.UTF8.GetString(m.Payload.Span)).ToList();
payloads.ShouldBe(["0", "1", "2", "3", "4", "5", "6", "7"]);
}
[Fact]
public async Task ThreeFilters_FirstFetchDelivers6()
{
// Go: TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178)
// Consumer with 3 filter subjects: fetch 6 returns first 6 matching messages.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "events", "data", "other", "ignored");
await AppendAsync(store, "ignored", "100");
await AppendAsync(store, "events", "0");
await AppendAsync(store, "events", "1");
await AppendAsync(store, "data", "2");
await AppendAsync(store, "ignored", "100");
await AppendAsync(store, "data", "3");
await AppendAsync(store, "other", "4");
await AppendAsync(store, "data", "5");
await AppendAsync(store, "other", "6");
await AppendAsync(store, "data", "7");
await AppendAsync(store, "ignored", "100");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "multi",
AckPolicy = AckPolicy.Explicit,
FilterSubjects = ["events", "data", "other"],
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 6 }, CancellationToken.None);
result.Messages.Count.ShouldBe(6);
for (int i = 0; i < 6; i++)
Encoding.UTF8.GetString(result.Messages[i].Payload.Span).ShouldBe(i.ToString());
// Ack all 6
foreach (var msg in result.Messages)
consumer.AckProcessor.AckSequence(msg.Sequence);
// Remaining 2 messages (indices 6 and 7)
var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
result2.Messages.Count.ShouldBe(2);
}
// =========================================================================
// TestJetStreamConsumerUpdateFilterSubjects (jetstream_consumer_test.go:7231)
// =========================================================================
[Fact]
public async Task UpdateFilterSubjects_NewFilterReceivesAdditionalSubject()
{
// Go: TestJetStreamConsumerUpdateFilterSubjects (jetstream_consumer_test.go:7231)
// Consumer with ["events","data"] fetches those messages; updated filter
// ["events","data","other"] on same consumer also fetches the "other" messages
// when starting from beginning.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "events", "data", "other");
await AppendAsync(store, "events", "0"); // seq 1
await AppendAsync(store, "events", "1"); // seq 2
await AppendAsync(store, "data", "2"); // seq 3
await AppendAsync(store, "data", "3"); // seq 4
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "multi",
AckPolicy = AckPolicy.Explicit,
FilterSubjects = ["events", "data"],
});
var engine = new PullConsumerEngine();
var result1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
result1.Messages.Count.ShouldBe(4);
for (int i = 0; i < 4; i++)
{
Encoding.UTF8.GetString(result1.Messages[i].Payload.Span).ShouldBe(i.ToString());
consumer.AckProcessor.AckSequence(result1.Messages[i].Sequence);
}
// Add "other" messages after first batch
await AppendAsync(store, "other", "4"); // seq 5
await AppendAsync(store, "data", "5"); // seq 6
// Update consumer filter to include "other"
consumer.Config.FilterSubjects = ["events", "data", "other"];
// Continue fetching from where we left off — should get "other" and "data" messages
var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
result2.Messages.Count.ShouldBe(2);
Encoding.UTF8.GetString(result2.Messages[0].Payload.Span).ShouldBe("4");
Encoding.UTF8.GetString(result2.Messages[1].Payload.Span).ShouldBe("5");
}
// =========================================================================
// TestJetStreamConsumerIsFiltered (jetstream_consumer_test.go:7515)
// =========================================================================
[Theory]
[InlineData(new[] { "one" }, new[] { "one" }, false)]
[InlineData(new[] { "one.>" }, new[] { "one.filter" }, true)]
[InlineData(new[] { "multi", "foo", "bar.>" }, new[] { "multi", "bar.>", "foo" }, false)]
[InlineData(new[] { "events", "data" }, new[] { "data" }, true)]
[InlineData(new[] { "machines", "floors" }, new[] { "machines" }, true)]
public void IsFiltered_CorrectlyDetectsWhetherConsumerFilters(
string[] streamSubjects, string[] filterSubjects, bool expectedIsFiltered)
{
// Go: TestJetStreamConsumerIsFiltered (jetstream_consumer_test.go:7515)
// A consumer is considered filtered if it doesn't receive all stream messages.
var config = new ConsumerConfig
{
FilterSubjects = [..filterSubjects],
};
var filter = CompiledFilter.FromConfig(config);
// A consumer is NOT filtered if all stream subjects match the filter
var allMatch = streamSubjects.All(s => filter.Matches(s));
var isFiltered = !allMatch;
isFiltered.ShouldBe(expectedIsFiltered);
}
// =========================================================================
// TestJetStreamConsumerSingleFilterSubjectInFilterSubjects (11281)
// =========================================================================
[Fact]
public async Task SingleFilterSubjectInFilterSubjects_Works()
{
// Go: TestJetStreamConsumerSingleFilterSubjectInFilterSubjects (jetstream_consumer_test.go:11281)
// FilterSubjects with a single entry behaves identically to FilterSubject.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo", "bar");
await AppendAsync(store, "foo", "match");
await AppendAsync(store, "bar", "skip");
await AppendAsync(store, "foo", "match2");
var consumerViaFilterSubjects = MakeConsumer(new ConsumerConfig
{
DurableName = "c1",
FilterSubjects = ["foo"],
});
var consumerViaFilterSubject = MakeConsumer(new ConsumerConfig
{
DurableName = "c2",
FilterSubject = "foo",
});
var engine = new PullConsumerEngine();
var r1 = await engine.FetchAsync(stream, consumerViaFilterSubjects, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
var r2 = await engine.FetchAsync(stream, consumerViaFilterSubject, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
r1.Messages.Count.ShouldBe(r2.Messages.Count);
r1.Messages.Count.ShouldBe(2);
r1.Messages.All(m => m.Subject == "foo").ShouldBeTrue();
r2.Messages.All(m => m.Subject == "foo").ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerMultipleSubjectsLast (jetstream_consumer_test.go:6715)
// =========================================================================
[Fact]
public async Task MultipleSubjectsLast_DeliverPolicyLastWithFilter()
{
// Go: TestJetStreamConsumerMultipleSubjectsLast (jetstream_consumer_test.go:6715)
// DeliverPolicy.Last with FilterSubjects = ["events","data"] starts at last
// message of the stream and delivers from there.
// When the last message in the stream doesn't match the filter, no messages are delivered.
var store = new MemStore();
var stream = MakeStream(store, "name", "events", "data", "other");
await AppendAsync(store, "events", "1"); // seq 1
await AppendAsync(store, "data", "2"); // seq 2
await AppendAsync(store, "other", "3"); // seq 3
await AppendAsync(store, "events", "4"); // seq 4
await AppendAsync(store, "data", "5"); // seq 5
await AppendAsync(store, "data", "6"); // seq 6
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "durable",
AckPolicy = AckPolicy.Explicit,
DeliverPolicy = DeliverPolicy.Last,
FilterSubjects = ["events", "data"],
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None);
// DeliverPolicy.Last resolves to state.LastSeq = 6 ("data" with payload "6")
// which matches the filter — so it delivers that message.
result.Messages.Count.ShouldBe(1);
result.Messages[0].Subject.ShouldBe("data");
Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("6");
}
// =========================================================================
// TestJetStreamConsumerMultipleSubjectsLastPerSubject (6784)
// =========================================================================
[Fact]
public async Task MultipleSubjectsLastPerSubject_StartsAtLastPerSubject()
{
// Go: TestJetStreamConsumerMultipleSubjectsLastPerSubject (jetstream_consumer_test.go:6784)
// DeliverPolicy.LastPerSubject with FilterSubjects starts at the last msg per subject.
var store = new MemStore();
var stream = MakeStream(store, "name", "events.*", "data.>");
await AppendAsync(store, "events.1", "bad"); // seq 1
await AppendAsync(store, "events.1", "events.1"); // seq 2 — last for events.1
await AppendAsync(store, "data.1", "bad"); // seq 3
await AppendAsync(store, "data.1", "bad"); // seq 4
await AppendAsync(store, "data.1", "data.1"); // seq 5 — last for data.1
await AppendAsync(store, "events.2", "bad"); // seq 6
await AppendAsync(store, "events.2", "events.2"); // seq 7 — last for events.2
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "durable",
AckPolicy = AckPolicy.Explicit,
DeliverPolicy = DeliverPolicy.LastPerSubject,
FilterSubject = "events.1",
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 3 }, CancellationToken.None);
result.Messages.Count.ShouldBeGreaterThan(0);
// First message should be the last "events.1"
result.Messages[0].Subject.ShouldBe("events.1");
Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("events.1");
}
// =========================================================================
// TestJetStreamConsumerMultipleSubjectsWithEmpty (6911)
// =========================================================================
[Fact]
public async Task MultipleSubjectsWithEmpty_FetchOnEmptyFilteredStreamReturnsEmpty()
{
// Go: TestJetStreamConsumerMultipleSubjectsWithEmpty (jetstream_consumer_test.go:6911)
// Consumer with filter on non-existent subject returns empty.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo", "bar");
await AppendAsync(store, "foo", "1");
await AppendAsync(store, "bar", "2");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "c",
FilterSubjects = ["baz"], // no messages on baz
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 5,
NoWait = true,
}, CancellationToken.None);
result.Messages.Count.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerMultipleSubjectsAck (jetstream_consumer_test.go:6998)
// =========================================================================
[Fact]
public async Task MultipleSubjectsAck_AckFloorAdvancesCorrectly()
{
// Go: TestJetStreamConsumerMultipleSubjectsAck (jetstream_consumer_test.go:6998)
// Acking messages from a multi-filter consumer advances the ack floor correctly.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo", "bar", "baz");
await AppendAsync(store, "foo", "f1"); // seq 1
await AppendAsync(store, "bar", "b1"); // seq 2
await AppendAsync(store, "baz", "z1"); // seq 3 — filtered
await AppendAsync(store, "foo", "f2"); // seq 4
await AppendAsync(store, "bar", "b2"); // seq 5
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "c",
AckPolicy = AckPolicy.Explicit,
FilterSubjects = ["foo", "bar"],
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
result.Messages.Count.ShouldBe(4);
// Ack all messages
foreach (var msg in result.Messages)
consumer.AckProcessor.AckSequence(msg.Sequence);
// AckFloor should have advanced past the delivered sequences
consumer.AckProcessor.AckFloor.ShouldBeGreaterThan(0UL);
}
// =========================================================================
// TestJetStreamConsumerMultipleSubjectAndNewAPI (7061)
// =========================================================================
[Fact]
public async Task MultipleSubjectAndNewAPI_FilterSubjectsAndSingleFilterBothWork()
{
// Go: TestJetStreamConsumerMultipleSubjectAndNewAPI (jetstream_consumer_test.go:7061)
// Both FilterSubject and FilterSubjects produce the same results for single filter.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo.>", "bar");
await AppendAsync(store, "foo.1", "a");
await AppendAsync(store, "bar", "b");
await AppendAsync(store, "foo.2", "c");
var consumerA = MakeConsumer(new ConsumerConfig
{
DurableName = "cA",
FilterSubjects = ["foo.>"],
});
var consumerB = MakeConsumer(new ConsumerConfig
{
DurableName = "cB",
FilterSubject = "foo.>",
});
var engine = new PullConsumerEngine();
var rA = await engine.FetchAsync(stream, consumerA, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
var rB = await engine.FetchAsync(stream, consumerB, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
rA.Messages.Count.ShouldBe(rB.Messages.Count);
rA.Messages.Count.ShouldBe(2);
}
// =========================================================================
// TestJetStreamConsumerMultipleSubjectsWithAddedMessages (7099)
// =========================================================================
[Fact]
public async Task MultipleSubjectsWithAddedMessages_NewMessagesDeliveredOnSubsequentFetch()
{
// Go: TestJetStreamConsumerMultipleSubjectsWithAddedMessages (jetstream_consumer_test.go:7099)
// After first fetch, newly appended messages are returned on the next fetch.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "events", "data");
await AppendAsync(store, "events", "e1");
await AppendAsync(store, "data", "d1");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "c",
AckPolicy = AckPolicy.Explicit,
FilterSubjects = ["events", "data"],
});
var engine = new PullConsumerEngine();
var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
r1.Messages.Count.ShouldBe(2);
foreach (var msg in r1.Messages)
consumer.AckProcessor.AckSequence(msg.Sequence);
// Add more messages after first fetch
await AppendAsync(store, "events", "e2");
await AppendAsync(store, "data", "d2");
var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
r2.Messages.Count.ShouldBe(2);
Encoding.UTF8.GetString(r2.Messages[0].Payload.Span).ShouldBe("e2");
Encoding.UTF8.GetString(r2.Messages[1].Payload.Span).ShouldBe("d2");
}
// =========================================================================
// TestJetStreamConsumerBadNumPending (jetstream_consumer_test.go:5263)
// =========================================================================
[Fact]
public async Task BadNumPending_MultipleConsumers_NoPendingAfterFullDelivery()
{
// Go: TestJetStreamConsumerBadNumPending (jetstream_consumer_test.go:5263)
// After all messages are consumed and acked, pending count is 0.
var store = new MemStore();
var stream = MakeStream(store, "ORDERS", "orders.*");
for (int i = 0; i < 10; i++)
await AppendAsync(store, "orders.created", "NEW");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "c",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 100 }, CancellationToken.None);
result.Messages.Count.ShouldBe(10);
foreach (var msg in result.Messages)
consumer.AckProcessor.AckSequence(msg.Sequence);
consumer.AckProcessor.PendingCount.ShouldBe(0);
consumer.AckProcessor.AckFloor.ShouldBe(10UL);
}
// =========================================================================
// TestJetStreamConsumerPendingCountWithRedeliveries (5775)
// =========================================================================
[Fact]
public async Task PendingCountWithRedeliveries_UnackedMsgRedeliveredOnExpiry()
{
// Go: TestJetStreamConsumerPendingCountWithRedeliveries (jetstream_consumer_test.go:5775)
// A message that is not acked within AckWait is redelivered.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo");
await AppendAsync(store, "foo", "msg1");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "test",
AckPolicy = AckPolicy.Explicit,
AckWaitMs = 50,
MaxDeliver = 2,
});
var engine = new PullConsumerEngine();
// First fetch — delivers msg1, starts ack timer
var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None);
r1.Messages.Count.ShouldBe(1);
Encoding.UTF8.GetString(r1.Messages[0].Payload.Span).ShouldBe("msg1");
// Do not ack
// Pending count should be 1 (awaiting ack)
consumer.AckProcessor.PendingCount.ShouldBe(1);
// Wait for ack to expire
await Task.Delay(100);
// Add a second message to simulate the Go test scenario
await AppendAsync(store, "foo", "msg2");
// Second fetch should trigger redelivery of expired message
var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None);
r2.Messages.Count.ShouldBe(1);
r2.Messages[0].Redelivered.ShouldBeTrue();
// After redelivery, ack and check pending = 0
consumer.AckProcessor.AckSequence(r2.Messages[0].Sequence);
consumer.AckProcessor.PendingCount.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor (8311)
// =========================================================================
[Fact]
public async Task PendingCountAfterMsgAckAboveFloor_AckLastMsgClearsCount()
{
// Go: TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor (jetstream_consumer_test.go:8311)
// Acking the last fetched message (above ack floor) reduces pending to 0.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo");
await AppendAsync(store, "foo"); // seq 1
await AppendAsync(store, "foo"); // seq 2
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "CONSUMER",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None);
result.Messages.Count.ShouldBe(2);
// Ack only the second (last) message
consumer.AckProcessor.AckSequence(result.Messages[1].Sequence);
// Pending count = 1 (first message still unacked)
consumer.AckProcessor.PendingCount.ShouldBe(1);
// Ack the first message too
consumer.AckProcessor.AckSequence(result.Messages[0].Sequence);
consumer.AckProcessor.PendingCount.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg (8239)
// =========================================================================
[Fact]
public async Task DontDecrementPendingOnSkippedMsg_FilteredOutMessagesNotCounted()
{
// Go: TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg (jetstream_consumer_test.go:8239)
// Messages that don't match the filter are skipped and don't affect pending count.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo", "bar");
await AppendAsync(store, "foo", "match"); // seq 1
await AppendAsync(store, "bar", "skip"); // seq 2
await AppendAsync(store, "foo", "match2"); // seq 3
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "CONSUMER",
AckPolicy = AckPolicy.Explicit,
FilterSubject = "foo",
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
result.Messages.Count.ShouldBe(2);
result.Messages.All(m => m.Subject == "foo").ShouldBeTrue();
consumer.AckProcessor.PendingCount.ShouldBe(2);
// The "bar" message was skipped — pending count only reflects delivered messages
consumer.AckProcessor.PendingCount.ShouldBe(result.Messages.Count);
}
// =========================================================================
// TestJetStreamConsumerNumPendingWithMaxPerSubjectGreaterThanOne (6282)
// =========================================================================
[Fact]
public async Task NumPendingWithMaxPerSubject_LastPerSubjectConsumerCountsCorrectly()
{
// Go: TestJetStreamConsumerNumPendingWithMaxPerSubjectGreaterThanOne (jetstream_consumer_test.go:6282)
// DeliverLastPerSubject consumer with filtered subject counts the last msg per subject.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "KV.*.*");
await AppendAsync(store, "KV.plans.foo", "OK");
await AppendAsync(store, "KV.plans.bar", "OK");
await AppendAsync(store, "KV.plans.baz", "OK");
// These should be filtered out by the consumer
await AppendAsync(store, "KV.config.foo", "OK");
await AppendAsync(store, "KV.config.bar", "OK");
await AppendAsync(store, "KV.config.baz", "OK");
// Double up plans
await AppendAsync(store, "KV.plans.bar", "OK2");
await AppendAsync(store, "KV.plans.baz", "OK2");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "d",
AckPolicy = AckPolicy.Explicit,
DeliverPolicy = DeliverPolicy.LastPerSubject,
FilterSubject = "KV.plans.*",
});
var engine = new PullConsumerEngine();
// For LastPerSubject with "KV.plans.*", we should get the last message per subject
// (KV.plans.foo seq1, KV.plans.bar seq7, KV.plans.baz seq8)
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
// Result must only contain KV.plans.* subjects
result.Messages.All(m => m.Subject.StartsWith("KV.plans.")).ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerPendingLowerThanStreamFirstSeq (6565)
// =========================================================================
[Fact]
public async Task PendingLowerThanStreamFirstSeq_ConsumerSkipsAheadWhenSequenceGap()
{
// Go: TestJetStreamConsumerPendingLowerThanStreamFirstSeq (jetstream_consumer_test.go:6565)
// When consumer's next sequence is lower than stream's first seq after compaction,
// the consumer should still function correctly.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo");
for (int i = 0; i < 10; i++)
await AppendAsync(store, "foo", "msg");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "dur",
AckPolicy = AckPolicy.Explicit,
DeliverPolicy = DeliverPolicy.All,
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
result.Messages.Count.ShouldBe(10);
foreach (var msg in result.Messages)
consumer.AckProcessor.AckSequence(msg.Sequence);
consumer.AckProcessor.AckFloor.ShouldBe(10UL);
// Simulate consumer starting at a sequence before available messages
consumer.NextSequence = 5;
// After resetting next to already-acked sequences, they should be skipped
var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
result2.Messages.Count.ShouldBe(0); // All sequences <= AckFloor are skipped
}
// =========================================================================
// TestJetStreamConsumerInfoNumPending (jetstream_consumer_test.go:8195)
// =========================================================================
[Fact]
public async Task InfoNumPending_PendingCountMatchesAvailableMessages()
{
// Go: TestJetStreamConsumerInfoNumPending (jetstream_consumer_test.go:8195)
// NumPending reflects the count of messages available for delivery.
var store = new MemStore();
var stream = MakeStream(store, "LIMITS", "js.in.limits");
for (int i = 0; i < 100; i++)
await AppendAsync(store, "js.in.limits", "x");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "PULL",
AckPolicy = AckPolicy.Explicit,
});
// Before any fetch, pending = number of messages in stream
var state = await stream.Store.GetStateAsync(CancellationToken.None);
state.Messages.ShouldBe(100UL);
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 50 }, CancellationToken.None);
result.Messages.Count.ShouldBe(50);
consumer.AckProcessor.PendingCount.ShouldBe(50);
}
// =========================================================================
// TestJetStreamConsumerEfficientInterestStateCheck (10532)
// =========================================================================
[Fact]
public async Task EfficientInterestStateCheck_LargeSequenceGapHandledEfficiently()
{
// Go: TestJetStreamConsumerEfficientInterestStateCheck (jetstream_consumer_test.go:10532)
// A consumer with a large gap in acknowledged sequences should still function
// without excessive computation. Unit test validates skipping below AckFloor.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo");
await AppendAsync(store, "foo", "msg1"); // seq 1
await AppendAsync(store, "foo", "msg2"); // seq 2
// Simulate a large gap in sequences by advancing the ack floor
await AppendAsync(store, "foo", "msg3"); // seq 3 (after gap)
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "CONSUMER",
AckPolicy = AckPolicy.Explicit,
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 3 }, CancellationToken.None);
result.Messages.Count.ShouldBe(3);
// Ack only the first message; others remain pending
consumer.AckProcessor.AckSequence(1);
// Manually set ack floor to simulate a large gap being already acked
consumer.AckProcessor.AckSequence(2);
consumer.AckProcessor.AckSequence(3);
// AckFloor should have advanced
consumer.AckProcessor.AckFloor.ShouldBe(3UL);
consumer.AckProcessor.PendingCount.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated (10694)
// =========================================================================
[Fact]
public async Task OnlyRecalculatePendingIfFilterUpdated_SameConfigDoesNotReset()
{
// Go: TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated (10694)
// Updating consumer with same config should not reset state.
// Updating with new filter should apply from current position.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo");
await AppendAsync(store, "foo", "msg1");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "DURABLE",
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None);
result.Messages.Count.ShouldBe(1);
var sequenceBefore = consumer.NextSequence;
// Simulating "same config update" — no change to filter, NextSequence unchanged
// In our engine the consumer's config state is just modified in place
consumer.Config.FilterSubject = null; // still no filter
var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
consumer.NextSequence.ShouldBeGreaterThanOrEqualTo(sequenceBefore);
// Update filter to "foo"
consumer.Config.FilterSubject = "foo";
// Reset sequence to test from beginning
consumer.NextSequence = 1;
var result3 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
result3.Messages.Count.ShouldBe(1);
result3.Messages.All(m => m.Subject == "foo").ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset (10804)
// =========================================================================
[Fact]
public async Task AllowOverlappingSubjects_TwoPartiallyOverlappingFilters_NoDuplicates()
{
// Go: TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset (jetstream_consumer_test.go:10804)
// Two overlapping wildcard filters (event.foo.* and event.*.foo) deliver
// all matching messages without duplication (7 unique subjects).
var store = new MemStore();
var stream = MakeStream(store, "TEST", "event.>");
var parts = new[] { "foo", "bar", "baz", "oth" };
foreach (var start in parts)
foreach (var end in parts)
await AppendAsync(store, $"event.{start}.{end}");
// Total: 16 messages published
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "DURABLE",
FilterSubjects = ["event.foo.*", "event.*.foo"],
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 20 }, CancellationToken.None);
// event.foo.X (4 subjects) + event.Y.foo where Y != foo (3 subjects) = 7
result.Messages.Count.ShouldBe(7);
// No duplicates
var subjects = result.Messages.Select(m => m.Subject).ToList();
subjects.Distinct().Count().ShouldBe(7);
// All match at least one filter
var filter = new CompiledFilter(["event.foo.*", "event.*.foo"]);
result.Messages.All(m => filter.Matches(m.Subject)).ShouldBeTrue();
}
// =========================================================================
// TestSortingConsumerPullRequests (jetstream_consumer_test.go:10031)
// =========================================================================
[Fact]
public void SortingConsumerPullRequests_BasicPriorityOrdering()
{
// Go: TestSortingConsumerPullRequests (jetstream_consumer_test.go:10031)
// Lower Priority value = higher precedence (earlier in queue).
var queue = new PullRequestWaitQueue(100);
queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = null });
queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = null });
queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = null });
queue.Enqueue(new PullWaitingRequest { Priority = 3, Reply = null });
var expectedPriorities = new[] { 1, 1, 2, 3 };
for (int i = 0; i < expectedPriorities.Length; i++)
{
var item = queue.Dequeue();
item.ShouldNotBeNull();
item!.Priority.ShouldBe(expectedPriorities[i]);
}
}
[Fact]
public void SortingConsumerPullRequests_StableOrderWithinSamePriority()
{
// Go: TestSortingConsumerPullRequests (jetstream_consumer_test.go:10054) "test if sort is stable"
// Items with the same priority maintain insertion order (stable sort).
var queue = new PullRequestWaitQueue(100);
queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1a" });
queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2a" });
queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1b" });
queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2b" });
queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1c" });
queue.Enqueue(new PullWaitingRequest { Priority = 3, Reply = "3a" });
queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2c" });
var expected = new[] { "1a", "1b", "1c", "2a", "2b", "2c", "3a" };
for (int i = 0; i < expected.Length; i++)
{
var item = queue.Dequeue();
item.ShouldNotBeNull();
item!.Reply.ShouldBe(expected[i]);
}
}
// =========================================================================
// TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10109)
// =========================================================================
[Fact]
public void WaitQueuePopAndRequeue_BasicRequeueWithBatches()
{
// Go: TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10109) "basic requeue with batches"
// popAndRequeue decrements n and requeues the item at head until n=0.
var queue = new PullRequestWaitQueue(100);
// 9 items, 3 per priority group, each with n=3
string[] priorities = ["1", "2", "3"];
string[] letters = ["a", "b", "c"];
foreach (var p in priorities)
foreach (var l in letters)
queue.Enqueue(new PullWaitingRequest
{
Priority = int.Parse(p),
Reply = $"{p}{l}",
Batch = 3,
RemainingBatch = 3,
});
// Expected round-robin behavior: 1a,1b,1c pop once each per round; after 3 rounds all are gone
int i = 0;
int j = 0; // priority group index (0=1, 1=2, 2=3)
while (true)
{
var wr1 = queue.PopAndRequeue();
wr1.ShouldNotBeNull();
wr1!.Reply.ShouldBe($"{j + 1}a");
var wr2 = queue.PopAndRequeue();
wr2.ShouldNotBeNull();
wr2!.Reply.ShouldBe($"{j + 1}b");
var wr3 = queue.PopAndRequeue();
wr3.ShouldNotBeNull();
wr3!.Reply.ShouldBe($"{j + 1}c");
i++;
if (i % 3 == 0)
j++;
// Count should match 9 - (j * 3)
queue.Count.ShouldBe(9 - (j * 3));
if (j == 2)
break;
}
}
[Fact]
public void WaitQueuePopAndRequeue_RequestRemovedWhenFullyServed()
{
// Go: TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10152) "request removal when fully served"
var queue = new PullRequestWaitQueue(100);
queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1a", RemainingBatch = 2 });
queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1b", RemainingBatch = 1 });
queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2a", RemainingBatch = 3 });
var initialCount = queue.Count;
initialCount.ShouldBe(3);
// Pop 1a first time (n=2 -> n=1, requeued)
var wr = queue.PopAndRequeue();
wr.ShouldNotBeNull();
wr!.Reply.ShouldBe("1a");
wr.RemainingBatch.ShouldBe(1);
queue.Count.ShouldBe(initialCount); // still 3
// Pop 1b (n=1 -> n=0, removed)
wr = queue.PopAndRequeue();
wr.ShouldNotBeNull();
wr!.Reply.ShouldBe("1b");
wr.RemainingBatch.ShouldBe(0);
queue.Count.ShouldBe(initialCount - 1); // now 2
// Pop 1a second time (n=1 -> n=0, removed)
wr = queue.PopAndRequeue();
wr.ShouldNotBeNull();
wr!.Reply.ShouldBe("1a");
wr.RemainingBatch.ShouldBe(0);
queue.Count.ShouldBe(initialCount - 2); // now 1
// Only 2a should remain
var next = queue.Peek();
next.ShouldNotBeNull();
next!.Reply.ShouldBe("2a");
next.RemainingBatch.ShouldBe(3);
}
// =========================================================================
// TestJetStreamConsumerStateAlwaysFromStore (jetstream_consumer_test.go:9796)
// =========================================================================
[Fact]
public async Task ConsumerStateAlwaysFromStore_DeliveredAndAckFloorTrack()
{
// Go: TestJetStreamConsumerStateAlwaysFromStore (jetstream_consumer_test.go:9796)
// Consumer delivered/ackFloor state tracks correctly through fetch and ack cycles.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo.>");
await AppendAsync(store, "foo.bar"); // seq 1 — filtered
await AppendAsync(store, "foo.other"); // seq 2 — not filtered
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "CONSUMER",
AckPolicy = AckPolicy.Explicit,
FilterSubject = "foo.bar",
});
var engine = new PullConsumerEngine();
// Initial state: no delivered, no ack floor
consumer.AckProcessor.AckFloor.ShouldBe(0UL);
consumer.AckProcessor.PendingCount.ShouldBe(0);
// Fetch — should deliver only foo.bar (seq 1)
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None);
result.Messages.Count.ShouldBe(1);
result.Messages[0].Subject.ShouldBe("foo.bar");
// Delivered but not acked — pending = 1, floor = 0
consumer.AckProcessor.PendingCount.ShouldBe(1);
consumer.AckProcessor.AckFloor.ShouldBe(0UL);
// Ack the message — floor advances to seq 1
consumer.AckProcessor.AckSequence(result.Messages[0].Sequence);
consumer.AckProcessor.PendingCount.ShouldBe(0);
consumer.AckProcessor.AckFloor.ShouldBe(result.Messages[0].Sequence);
}
// =========================================================================
// TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10742)
// =========================================================================
[Fact]
public async Task CheckNumPending_PendingReflectsMessagesFromCurrentSequence()
{
// Go: TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10742)
// NumPending = messages in stream from consumer's current sequence onward.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo");
for (int i = 0; i < 5; i++)
await AppendAsync(store, "foo");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "DURABLE",
});
var engine = new PullConsumerEngine();
// Initial: consumer at seq 1, 5 messages pending
// Fetch 2 messages
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None);
result.Messages.Count.ShouldBe(2);
// NextSequence now points past fetched messages
consumer.NextSequence.ShouldBe(3UL);
// 3 messages remain to be delivered
var state = await stream.Store.GetStateAsync(CancellationToken.None);
var remaining = state.LastSeq - consumer.NextSequence + 1;
remaining.ShouldBe(3UL);
}
[Fact]
public async Task CheckNumPending_ExcessiveSequenceReturnsZero()
{
// Go: TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10782)
// A sequence past the end of stream reports 0 pending.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo");
for (int i = 0; i < 5; i++)
await AppendAsync(store, "foo");
var consumer = MakeConsumer(new ConsumerConfig { DurableName = "DURABLE" });
consumer.NextSequence = 100; // well past end of stream
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest
{
Batch = 5,
NoWait = true,
}, CancellationToken.None);
result.Messages.Count.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerMultipleFiltersRace (jetstream_consumer_test.go:93)
// =========================================================================
[Fact]
public async Task MultipleFiltersRace_ConcurrentFetchAllMessagesDelivered()
{
// Go: TestJetStreamConsumerMultipleFiltersRace (jetstream_consumer_test.go:93)
// Multiple concurrent fetches with filter subjects deliver all matching messages.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo.>", "bar.>");
// Publish 20 messages on foo.* and 20 on bar.*
for (int i = 0; i < 20; i++)
{
await AppendAsync(store, $"foo.{i}", $"f{i}");
await AppendAsync(store, $"bar.{i}", $"b{i}");
}
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "c",
AckPolicy = AckPolicy.Explicit,
FilterSubjects = ["foo.>", "bar.>"],
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 40 }, CancellationToken.None);
result.Messages.Count.ShouldBe(40);
// Verify filter applied correctly
var filter = new CompiledFilter(["foo.>", "bar.>"]);
result.Messages.All(m => filter.Matches(m.Subject)).ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerMultipleFitersWithStartDate (jetstream_consumer_test.go:2533)
// =========================================================================
[Fact]
public async Task MultipleFitersWithStartDate_ByStartTimeOnlyDeliversNewEnoughMessages()
{
// Go: TestJetStreamConsumerMultipleFitersWithStartDate (jetstream_consumer_test.go:2533)
// DeliverPolicy.ByStartTime with a cut-off time only delivers messages after that time.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "events", "data");
var cutoff = DateTime.UtcNow.AddMilliseconds(-50);
// Append old message (before cutoff — at least conceptually; MemStore records current time)
// In unit test we simulate by appending before the cutoff datetime we pass
await AppendAsync(store, "events", "old");
await AppendAsync(store, "data", "old");
await Task.Delay(10); // ensure new messages have newer timestamp
// Append new messages after our reference point
await AppendAsync(store, "events", "new1");
await AppendAsync(store, "data", "new2");
var startTime = DateTime.UtcNow.AddMilliseconds(-5); // very recent
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "c",
AckPolicy = AckPolicy.Explicit,
DeliverPolicy = DeliverPolicy.ByStartTime,
OptStartTimeUtc = startTime,
FilterSubjects = ["events", "data"],
});
var engine = new PullConsumerEngine();
var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
// Only messages at or after startTime should be delivered
result.Messages.All(m => filter_matches_events_data(m.Subject)).ShouldBeTrue();
result.Messages.Count.ShouldBeGreaterThanOrEqualTo(0);
static bool filter_matches_events_data(string s) => s == "events" || s == "data";
}
// =========================================================================
// TestJetStreamConsumerFilterUpdate (jetstream_consumer_test.go:7397)
// =========================================================================
[Fact]
public async Task FilterUpdate_ChangingFilterSubjectChangesDeliveredMessages()
{
// Go: TestJetStreamConsumerFilterUpdate (jetstream_consumer_test.go:7397)
// Changing FilterSubject on a consumer config changes which messages are delivered.
var store = new MemStore();
var stream = MakeStream(store, "TEST", "foo", "bar");
await AppendAsync(store, "foo", "f1");
await AppendAsync(store, "bar", "b1");
await AppendAsync(store, "foo", "f2");
await AppendAsync(store, "bar", "b2");
var consumer = MakeConsumer(new ConsumerConfig
{
DurableName = "c",
AckPolicy = AckPolicy.Explicit,
FilterSubject = "foo",
});
var engine = new PullConsumerEngine();
var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
r1.Messages.Count.ShouldBe(2);
r1.Messages.All(m => m.Subject == "foo").ShouldBeTrue();
foreach (var msg in r1.Messages)
consumer.AckProcessor.AckSequence(msg.Sequence);
// Update filter to "bar" — consumer starts fresh
consumer.Config.FilterSubject = "bar";
consumer.NextSequence = 1;
var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None);
r2.Messages.Count.ShouldBe(2);
r2.Messages.All(m => m.Subject == "bar").ShouldBeTrue();
}
}