// Go reference: golang/nats-server/server/consumer_test.go
// golang/nats-server/server/jetstream_test.go
//
// Ports Go consumer lifecycle tests for pause/resume, replay rate, priority pull
// requests, consumer state, max delivery underflow, inactive threshold, and
// miscellaneous lifecycle correctness.
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
namespace NATS.Server.Tests.JetStream.Consumers;
///
/// Consumer lifecycle tests: pause/resume, replay, priority pull, state persistence,
/// max-delivery underflow, inactive threshold, and assorted edge cases.
///
public class ConsumerLifecycleTests
{
// =========================================================================
// TestJetStreamConsumerPauseViaConfig — consumer_test.go
// Creating a consumer with Paused=true should store the paused flag.
// =========================================================================
[Fact]
public async Task Pause_via_config_creates_paused_consumer()
{
// Go: TestJetStreamConsumerPauseViaConfig consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PSC", "psc.>");
// Create consumer and immediately pause it via config
var create = await fx.CreateConsumerAsync("PSC", "C1", "psc.>");
create.Error.ShouldBeNull();
create.ConsumerInfo.ShouldNotBeNull();
// Pause via API
var pause = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PSC.C1", """{"pause":true}""");
pause.Success.ShouldBeTrue();
// Consumer should still exist
var info = await fx.GetConsumerInfoAsync("PSC", "C1");
info.Config.DurableName.ShouldBe("C1");
}
// =========================================================================
// TestJetStreamConsumerPauseViaEndpoint — consumer_test.go
// Pausing via the PAUSE API endpoint sets the paused flag.
// =========================================================================
[Fact]
public async Task Pause_via_endpoint_marks_consumer_paused()
{
// Go: TestJetStreamConsumerPauseViaEndpoint consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PVE", "pve.>");
_ = await fx.CreateConsumerAsync("PVE", "PAUSER", "pve.>");
var resp = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PVE.PAUSER", """{"pause":true}""");
resp.Error.ShouldBeNull();
resp.Success.ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerPauseResumeViaEndpoint — consumer_test.go
// A paused consumer can be resumed by setting pause=false.
// =========================================================================
[Fact]
public async Task Pause_then_resume_via_endpoint()
{
// Go: TestJetStreamConsumerPauseResumeViaEndpoint consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PRV", "prv.>");
_ = await fx.CreateConsumerAsync("PRV", "C1", "prv.>");
var pause = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PRV.C1", """{"pause":true}""");
pause.Success.ShouldBeTrue();
var resume = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PRV.C1", """{"pause":false}""");
resume.Success.ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerPauseAdvisories — consumer_test.go
// Pause and resume operations complete without error (advisory publication
// is a server-side side effect; here we verify the API shape is correct).
// =========================================================================
[Fact]
public async Task Pause_and_resume_operations_return_success()
{
// Go: TestJetStreamConsumerPauseAdvisories consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PA", "pa.>");
_ = await fx.CreateConsumerAsync("PA", "ADV", "pa.>");
// Pause
var p1 = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PA.ADV", """{"pause":true}""");
p1.Success.ShouldBeTrue();
p1.Error.ShouldBeNull();
// Resume
var p2 = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PA.ADV", """{"pause":false}""");
p2.Success.ShouldBeTrue();
p2.Error.ShouldBeNull();
}
// =========================================================================
// TestJetStreamConsumerReplayRate — consumer_test.go
// ReplayPolicy.Original delays deliveries to match original timing.
// The config must be stored and the policy must be respected.
// =========================================================================
[Fact]
public async Task Replay_original_policy_is_stored_on_consumer()
{
// Go: TestJetStreamConsumerReplayRate consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("RPR", "rpr.>");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.RPR.C1",
"""{"durable_name":"C1","filter_subject":"rpr.>","replay_policy":"original","ack_policy":"explicit"}""");
resp.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("RPR", "C1");
info.Config.ReplayPolicy.ShouldBe(ReplayPolicy.Original);
}
// =========================================================================
// TestJetStreamConsumerReplayQuit — consumer_test.go
// Replay original policy: consumer can fetch messages (delivery works).
// =========================================================================
[Fact]
public async Task Replay_original_consumer_can_fetch_messages()
{
// Go: TestJetStreamConsumerReplayQuit consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithReplayOriginalConsumerAsync();
_ = await fx.PublishAndGetAckAsync("orders.created", "msg-for-replay");
// Fetch both messages (the one published in fixture setup and the one above)
var batch = await fx.FetchAsync("ORDERS", "RO", 5);
batch.Messages.Count.ShouldBeGreaterThanOrEqualTo(1);
}
// =========================================================================
// TestJetStreamConsumerPrioritized — consumer_test.go
// Priority-group consumers: only the highest-priority consumer is active.
// =========================================================================
[Fact]
public void Consumer_prioritized_active_is_lowest_priority_number()
{
// Go: TestJetStreamConsumerPrioritized consumer_test.go
var mgr = new PriorityGroupManager();
// Register three consumers in a group with different priorities
mgr.Register("work-group", "consumer-A", priority: 10);
mgr.Register("work-group", "consumer-B", priority: 1);
mgr.Register("work-group", "consumer-C", priority: 5);
// Consumer-B (priority 1) should be the active one
var active = mgr.GetActiveConsumer("work-group");
active.ShouldBe("consumer-B");
}
// =========================================================================
// TestJetStreamConsumerPriorityPullRequests — consumer_test.go
// The pull request wait queue sorts by priority (lower value = first served).
// =========================================================================
[Fact]
public void Consumer_priority_pull_requests_served_in_priority_order()
{
// Go: TestJetStreamConsumerPriorityPullRequests consumer_test.go
var queue = new PullRequestWaitQueue();
// Enqueue requests with varying priorities
queue.Enqueue(new PullWaitingRequest { Priority = 5, Batch = 1, RemainingBatch = 1 });
queue.Enqueue(new PullWaitingRequest { Priority = 1, Batch = 1, RemainingBatch = 1 });
queue.Enqueue(new PullWaitingRequest { Priority = 3, Batch = 1, RemainingBatch = 1 });
// First dequeued should be lowest-priority-number (highest priority = 1)
var first = queue.Dequeue();
first.ShouldNotBeNull();
first!.Priority.ShouldBe(1);
var second = queue.Dequeue();
second.ShouldNotBeNull();
second!.Priority.ShouldBe(3);
var third = queue.Dequeue();
third.ShouldNotBeNull();
third!.Priority.ShouldBe(5);
}
// =========================================================================
// TestJetStreamConsumerMaxDeliverUnderflow — consumer_test.go
// A consumer with MaxDeliver set can deliver messages up to that count,
// then stops re-delivering (underflow: delivered < max_deliver is OK).
// =========================================================================
[Fact]
public async Task Max_deliver_underflow_consumer_delivers_up_to_limit()
{
// Go: TestJetStreamConsumerMaxDeliverUnderflow consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MDU", "mdu.>");
// Create consumer with max_deliver=3, ack_wait short for redelivery
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.MDU.C1",
"""{"durable_name":"C1","filter_subject":"mdu.>","ack_policy":"explicit","max_deliver":3,"ack_wait_ms":50}""");
resp.Error.ShouldBeNull();
_ = await fx.PublishAndGetAckAsync("mdu.x", "payload");
var batch = await fx.FetchAsync("MDU", "C1", 1);
batch.Messages.Count.ShouldBe(1);
// Consumer info should reflect the config
var info = await fx.GetConsumerInfoAsync("MDU", "C1");
info.Config.MaxDeliver.ShouldBe(3);
info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
}
// =========================================================================
// TestJetStreamConsumerNoWaitNoMessagesOnEos — consumer_test.go
// NoWait fetch when stream is at end of stream returns empty immediately.
// =========================================================================
[Fact]
public async Task NoWait_at_eos_returns_empty()
{
// Go: TestJetStreamConsumerNoWaitNoMessagesOnEos consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NWEOS", "nweos.>");
_ = await fx.CreateConsumerAsync("NWEOS", "C1", "nweos.>");
// Stream is empty — NoWait should return empty batch
var batch = await fx.FetchWithNoWaitAsync("NWEOS", "C1", 5);
batch.Messages.Count.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs — consumer_test.go
// After consuming all messages, NoWait at EOS returns empty.
// =========================================================================
[Fact]
public async Task NoWait_after_all_messages_consumed_returns_empty()
{
// Go: TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NWDM", "nwdm.>");
_ = await fx.CreateConsumerAsync("NWDM", "C1", "nwdm.>");
_ = await fx.PublishAndGetAckAsync("nwdm.x", "msg1");
_ = await fx.PublishAndGetAckAsync("nwdm.x", "msg2");
// Consume all messages
var batch1 = await fx.FetchAsync("NWDM", "C1", 5);
batch1.Messages.Count.ShouldBe(2);
// Now NoWait at EOS should return empty
var batch2 = await fx.FetchWithNoWaitAsync("NWDM", "C1", 5);
batch2.Messages.Count.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerActionsViaAPI — consumer_test.go
// Consumer API allows creating, reading info, and deleting consumers.
// =========================================================================
[Fact]
public async Task Consumer_actions_create_info_delete_via_api()
{
// Go: TestJetStreamConsumerActionsViaAPI consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CAV", "cav.>");
// Create
var create = await fx.CreateConsumerAsync("CAV", "DUR1", "cav.>");
create.Error.ShouldBeNull();
create.ConsumerInfo.ShouldNotBeNull();
// Info
var info = await fx.GetConsumerInfoAsync("CAV", "DUR1");
info.Config.DurableName.ShouldBe("DUR1");
// List
var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.CAV", "{}");
names.ConsumerNames.ShouldNotBeNull();
names.ConsumerNames!.ShouldContain("DUR1");
// Delete
var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.CAV.DUR1", "{}");
del.Success.ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerActionsUnmarshal — consumer_test.go
// Consumer config fields survive round-trip JSON serialisation.
// =========================================================================
[Fact]
public async Task Consumer_config_fields_survive_serialisation()
{
// Go: TestJetStreamConsumerActionsUnmarshal consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CAU", "cau.>");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.CAU.DUR2",
"""{"durable_name":"DUR2","filter_subject":"cau.>","ack_policy":"explicit","ack_wait_ms":5000,"max_deliver":5,"backoff_ms":[100,200,400]}""");
resp.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("CAU", "DUR2");
info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
info.Config.AckWaitMs.ShouldBe(5000);
info.Config.MaxDeliver.ShouldBe(5);
info.Config.BackOffMs.ShouldBe(new List { 100, 200, 400 });
}
// =========================================================================
// TestJetStreamConsumerWorkQueuePolicyOverlap — consumer_test.go
// Two consumers on a WorkQueue stream can each have non-overlapping filters.
// =========================================================================
[Fact]
public async Task WorkQueue_consumers_with_non_overlapping_filters()
{
// Go: TestJetStreamConsumerWorkQueuePolicyOverlap consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{
Name = "WQO",
Subjects = ["wqo.>"],
Retention = RetentionPolicy.WorkQueue,
});
var c1 = await fx.CreateConsumerAsync("WQO", "C1", "wqo.a");
c1.Error.ShouldBeNull();
var c2 = await fx.CreateConsumerAsync("WQO", "C2", "wqo.b");
c2.Error.ShouldBeNull();
_ = await fx.PublishAndGetAckAsync("wqo.a", "for-c1");
_ = await fx.PublishAndGetAckAsync("wqo.b", "for-c2");
var batchC1 = await fx.FetchAsync("WQO", "C1", 5);
batchC1.Messages.Count.ShouldBe(1);
batchC1.Messages[0].Subject.ShouldBe("wqo.a");
var batchC2 = await fx.FetchAsync("WQO", "C2", 5);
batchC2.Messages.Count.ShouldBe(1);
batchC2.Messages[0].Subject.ShouldBe("wqo.b");
}
// =========================================================================
// TestJetStreamConsumerLongSubjectHang — consumer_test.go
// Consumer with a long filter subject (no hang, fetch works normally).
// =========================================================================
[Fact]
public async Task Consumer_with_long_filter_subject_works()
{
// Go: TestJetStreamConsumerLongSubjectHang consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("LSH", ">");
var longSubject = "a." + string.Concat(Enumerable.Repeat("x.", 20)) + "end";
_ = await fx.CreateConsumerAsync("LSH", "LONG", longSubject);
_ = await fx.PublishAndGetAckAsync(longSubject, "payload");
var batch = await fx.FetchAsync("LSH", "LONG", 5);
batch.Messages.Count.ShouldBe(1);
}
// =========================================================================
// TestJetStreamConsumerStuckAckPending — consumer_test.go
// After acking all messages the pending count returns to 0.
// =========================================================================
[Fact]
public async Task Stuck_ack_pending_clears_after_ack_all()
{
// Go: TestJetStreamConsumerStuckAckPending consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("SAP", "sap.>");
_ = await fx.CreateConsumerAsync("SAP", "C1", "sap.>",
ackPolicy: AckPolicy.All);
for (var i = 0; i < 5; i++)
_ = await fx.PublishAndGetAckAsync("sap.x", $"msg-{i}");
var batch = await fx.FetchAsync("SAP", "C1", 10);
batch.Messages.Count.ShouldBe(5);
var before = await fx.GetPendingCountAsync("SAP", "C1");
before.ShouldBeGreaterThan(0);
// Ack all messages up to the last sequence
await fx.AckAllAsync("SAP", "C1", batch.Messages[^1].Sequence);
var after = await fx.GetPendingCountAsync("SAP", "C1");
after.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerSwitchLeaderDuringInflightAck — consumer_test.go
// Ack processor correctly handles acking a sequence that is already
// acknowledged (idempotent ack).
// =========================================================================
[Fact]
public void Ack_processor_idempotent_double_ack()
{
// Go: TestJetStreamConsumerSwitchLeaderDuringInflightAck consumer_test.go
var ack = new AckProcessor();
ack.Register(1, 30_000);
ack.Register(2, 30_000);
// Ack sequence 1 twice — should not throw or corrupt state
ack.AckSequence(1);
ack.AckSequence(1); // idempotent
ack.PendingCount.ShouldBe(1); // sequence 2 is still pending
}
// =========================================================================
// TestJetStreamConsumerEphemeralRecoveryAfterServerRestart — consumer_test.go
// ConsumerFileStore can persist and reload state across a "restart"
// (i.e., create a new ConsumerFileStore for the same file path).
// =========================================================================
[Fact]
public void Ephemeral_consumer_state_persists_and_recovers()
{
// Go: TestJetStreamConsumerEphemeralRecoveryAfterServerRestart consumer_test.go
var dir = Path.Combine(Path.GetTempPath(), $"nats-consumer-test-{Guid.NewGuid():N}");
Directory.CreateDirectory(dir);
var stateFile = Path.Combine(dir, "state.bin");
try
{
var cfg = new ConsumerConfig
{
DurableName = "EPHEMERAL",
AckPolicy = AckPolicy.Explicit,
};
// Simulate delivering a message and persisting state
var store1 = new ConsumerFileStore(stateFile, cfg);
store1.UpdateDelivered(1, 1, 1, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000);
store1.Stop();
// "Restart": load from the same file
var store2 = new ConsumerFileStore(stateFile, cfg);
var state = store2.State();
store2.Stop();
// Delivered sequence should have been persisted
state.Delivered.Consumer.ShouldBe(1UL);
}
finally
{
if (Directory.Exists(dir))
Directory.Delete(dir, recursive: true);
}
}
// =========================================================================
// TestJetStreamConsumerMaxDeliveryAndServerRestart — consumer_test.go
// The redelivery tracker correctly tracks delivery counts for sequences.
// =========================================================================
[Fact]
public void Redelivery_tracker_tracks_max_delivery_count()
{
// Go: TestJetStreamConsumerMaxDeliveryAndServerRestart consumer_test.go
var tracker = new RedeliveryTracker(backoffMs: [50, 100, 200]);
// Schedule sequence 1 with delivery count 1
tracker.Schedule(1, 1, ackWaitMs: 5000);
tracker.IsTracking(1).ShouldBeTrue();
// Not yet at max deliveries of 3
tracker.IsMaxDeliveries(1, 3).ShouldBeFalse();
// Advance to max
tracker.Schedule(1, 3, ackWaitMs: 5000);
tracker.IsMaxDeliveries(1, 3).ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerDeleteAndServerRestart — consumer_test.go
// Deleting a consumer removes it from the manager.
// =========================================================================
[Fact]
public async Task Delete_consumer_removes_it()
{
// Go: TestJetStreamConsumerDeleteAndServerRestart consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DCR", "dcr.>");
_ = await fx.CreateConsumerAsync("DCR", "TODEL", "dcr.>");
// Verify it exists
var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.DCR", "{}");
names.ConsumerNames.ShouldNotBeNull();
names.ConsumerNames!.ShouldContain("TODEL");
// Delete
var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.DCR.TODEL", "{}");
del.Success.ShouldBeTrue();
// Should no longer exist
var names2 = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.DCR", "{}");
(names2.ConsumerNames == null || !names2.ConsumerNames.Contains("TODEL")).ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerDurableReconnectWithOnlyPending — consumer_test.go
// A durable consumer with pending acks resumes delivery from where it left off.
// =========================================================================
[Fact]
public async Task Durable_consumer_resumes_with_pending_acks()
{
// Go: TestJetStreamConsumerDurableReconnectWithOnlyPending consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DRWP", "drwp.>");
_ = await fx.CreateConsumerAsync("DRWP", "DUR", "drwp.>",
ackPolicy: AckPolicy.Explicit,
ackWaitMs: 30_000);
for (var i = 1; i <= 3; i++)
_ = await fx.PublishAndGetAckAsync("drwp.x", $"msg-{i}");
// First fetch: get 2 of 3 (pending ack for both)
var batch1 = await fx.FetchAsync("DRWP", "DUR", 2);
batch1.Messages.Count.ShouldBe(2);
// Pending should be 2 (ack not sent)
var pending = await fx.GetPendingCountAsync("DRWP", "DUR");
pending.ShouldBe(2);
}
// =========================================================================
// TestJetStreamConsumerDurableFilteredSubjectReconnect — consumer_test.go
// Durable consumer with filter subject delivers only matching messages.
// =========================================================================
[Fact]
public async Task Durable_filtered_consumer_delivers_matching_on_reconnect()
{
// Go: TestJetStreamConsumerDurableFilteredSubjectReconnect consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DFS", "dfs.>");
_ = await fx.CreateConsumerAsync("DFS", "DUR", "dfs.a");
_ = await fx.PublishAndGetAckAsync("dfs.a", "for-dur");
_ = await fx.PublishAndGetAckAsync("dfs.b", "not-for-dur");
_ = await fx.PublishAndGetAckAsync("dfs.a", "also-for-dur");
var batch = await fx.FetchAsync("DFS", "DUR", 10);
batch.Messages.Count.ShouldBe(2);
batch.Messages.All(m => m.Subject == "dfs.a").ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerInactiveNoDeadlock — consumer_test.go
// Deleting a consumer does not cause a deadlock — the operation completes.
// =========================================================================
[Fact]
public async Task Consumer_delete_does_not_deadlock()
{
// Go: TestJetStreamConsumerInactiveNoDeadlock consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("IND", "ind.>");
_ = await fx.CreateConsumerAsync("IND", "EPHEM", "ind.>", ephemeral: true);
// Delete should return promptly (no deadlock)
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var delTask = fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.IND.EPHEM", "{}");
var completed = await Task.WhenAny(delTask, Task.Delay(-1, cts.Token));
completed.ShouldBe(delTask);
var del = await delTask;
del.Success.ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerReconnect — consumer_test.go
// A consumer that delivers messages then pauses/resumes still works.
// =========================================================================
[Fact]
public async Task Consumer_reconnect_after_pause_delivers_messages()
{
// Go: TestJetStreamConsumerReconnect consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CRC", "crc.>");
_ = await fx.CreateConsumerAsync("CRC", "C1", "crc.>");
_ = await fx.PublishAndGetAckAsync("crc.x", "pre-pause");
var pre = await fx.FetchAsync("CRC", "C1", 1);
pre.Messages.Count.ShouldBe(1);
// Pause
var pause = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.CRC.C1", """{"pause":true}""");
pause.Success.ShouldBeTrue();
// Resume
var resume = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.CRC.C1", """{"pause":false}""");
resume.Success.ShouldBeTrue();
// Publish after resume
_ = await fx.PublishAndGetAckAsync("crc.x", "post-resume");
var post = await fx.FetchAsync("CRC", "C1", 5);
post.Messages.Count.ShouldBe(1);
Encoding.UTF8.GetString(post.Messages[0].Payload.Span).ShouldBe("post-resume");
}
// =========================================================================
// TestJetStreamConsumerDeliverNewMaxRedeliveriesAndServerRestart — consumer_test.go
// Consumer with DeliverPolicy.New: the initial sequence is resolved at first
// fetch to LastSeq+1. Messages published before the first fetch are skipped
// because the engine resolves the start position when NextSequence==1.
// =========================================================================
[Fact]
public async Task Deliver_new_policy_config_is_stored_correctly()
{
// Go: TestJetStreamConsumerDeliverNewMaxRedeliveriesAndServerRestart consumer_test.go
// The .NET engine resolves DeliverPolicy.New's initial sequence at first-fetch
// time (LastSeq+1), so messages published before the first fetch are not
// visible. This test verifies the config is stored and the consumer is
// functional for messages published after the cursor is anchored.
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DNMR", "dnmr.>");
// Publish before consumer is created
_ = await fx.PublishAndGetAckAsync("dnmr.x", "pre-creation");
_ = await fx.PublishAndGetAckAsync("dnmr.x", "pre-creation-2");
// Create consumer with DeliverPolicy.New
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.DNMR.C1",
"""{"durable_name":"C1","filter_subject":"dnmr.>","deliver_policy":"new"}""");
resp.Error.ShouldBeNull();
// Verify the deliver policy is stored correctly
var info = await fx.GetConsumerInfoAsync("DNMR", "C1");
info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.New);
// Fetch with NoWait — stream is at LastSeq, DeliverPolicy.New anchors to
// LastSeq+1, so there are no new messages yet (all were pre-creation).
var batch = await fx.FetchWithNoWaitAsync("DNMR", "C1", 10);
batch.Messages.Count.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerEOFBugNewFileStore — consumer_test.go
// A consumer whose stream has messages at the boundary can still fetch.
// =========================================================================
[Fact]
public async Task Consumer_can_fetch_from_stream_with_messages_at_boundary()
{
// Go: TestJetStreamConsumerEOFBugNewFileStore consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EOFC", "eofc.>");
_ = await fx.CreateConsumerAsync("EOFC", "C1", "eofc.>");
// Publish exactly the number of messages that would fit in one block
for (var i = 0; i < 10; i++)
_ = await fx.PublishAndGetAckAsync("eofc.x", $"msg-{i:D4}");
var batch = await fx.FetchAsync("EOFC", "C1", 10);
batch.Messages.Count.ShouldBe(10);
}
// =========================================================================
// TestJetStreamConsumerCleanupWithRetentionPolicy — consumer_test.go
// WorkQueue retention policy: messages are removed after being consumed.
// Consumer can still be deleted cleanly.
// =========================================================================
[Fact]
public async Task WorkQueue_consumer_cleanup_works_with_retention_policy()
{
// Go: TestJetStreamConsumerCleanupWithRetentionPolicy consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{
Name = "CWRP",
Subjects = ["cwrp.>"],
Retention = RetentionPolicy.WorkQueue,
});
_ = await fx.CreateConsumerAsync("CWRP", "WORKER", "cwrp.>");
_ = await fx.PublishAndGetAckAsync("cwrp.x", "job1");
var batch = await fx.FetchAsync("CWRP", "WORKER", 5);
batch.Messages.Count.ShouldBe(1);
// Consumer can be deleted without error
var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.CWRP.WORKER", "{}");
del.Success.ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerPendingBugWithKV — consumer_test.go
// Pending count returns 0 when no messages have been fetched.
// =========================================================================
[Fact]
public async Task Pending_count_is_zero_before_fetch()
{
// Go: TestJetStreamConsumerPendingBugWithKV consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PCBK", "pcbk.>");
_ = await fx.CreateConsumerAsync("PCBK", "C1", "pcbk.>",
ackPolicy: AckPolicy.Explicit);
for (var i = 0; i < 5; i++)
_ = await fx.PublishAndGetAckAsync("pcbk.x", $"msg-{i}");
var pending = await fx.GetPendingCountAsync("PCBK", "C1");
pending.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerBadCreateErr — consumer_test.go
// Creating a consumer with invalid config (no durable name) returns an error.
// =========================================================================
[Fact]
public async Task Consumer_bad_create_returns_error()
{
// Go: TestJetStreamConsumerBadCreateErr consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("BCE", "bce.>");
// No durable_name specified and not marked ephemeral — should fail
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.BCE.",
"""{"filter_subject":"bce.>"}""");
// Either an error response or a not-found (depends on subject parsing)
(resp.Error != null || resp.ConsumerInfo == null).ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerInternalClientLeak — consumer_test.go
// Creating and deleting many consumers does not leak handles.
// =========================================================================
[Fact]
public async Task Consumer_create_delete_cycle_no_leak()
{
// Go: TestJetStreamConsumerInternalClientLeak consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ICL", "icl.>");
for (var i = 0; i < 10; i++)
{
var name = $"C{i:D3}";
var create = await fx.CreateConsumerAsync("ICL", name, "icl.>");
create.Error.ShouldBeNull();
var del = await fx.RequestLocalAsync($"$JS.API.CONSUMER.DELETE.ICL.{name}", "{}");
del.Success.ShouldBeTrue();
}
// After all creates and deletes, no consumers remain
var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.ICL", "{}");
(names.ConsumerNames == null || names.ConsumerNames.Count == 0).ShouldBeTrue();
}
// =========================================================================
// TestJetStreamConsumerEventingRaceOnShutdown — consumer_test.go
// Consumer creation and pause/resume sequence is safe (no race on shutdown).
// =========================================================================
[Fact]
public async Task Consumer_pause_resume_sequence_is_safe()
{
// Go: TestJetStreamConsumerEventingRaceOnShutdown consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EROS", "eros.>");
_ = await fx.CreateConsumerAsync("EROS", "C1", "eros.>");
_ = await fx.PublishAndGetAckAsync("eros.x", "msg");
// Rapid pause/resume sequence
for (var i = 0; i < 5; i++)
{
var p = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.EROS.C1", """{"pause":true}""");
p.Success.ShouldBeTrue();
var r = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.EROS.C1", """{"pause":false}""");
r.Success.ShouldBeTrue();
}
// Consumer should still be functional
var batch = await fx.FetchAsync("EROS", "C1", 5);
batch.Messages.Count.ShouldBe(1);
}
// =========================================================================
// TestJetStreamConsumerNoMsgPayload — consumer_test.go
// A message with empty payload can be published and fetched correctly.
// =========================================================================
[Fact]
public async Task Consumer_can_receive_empty_payload_message()
{
// Go: TestJetStreamConsumerNoMsgPayload consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NMP", "nmp.>");
_ = await fx.CreateConsumerAsync("NMP", "C1", "nmp.>");
_ = await fx.PublishAndGetAckAsync("nmp.x", "");
var batch = await fx.FetchAsync("NMP", "C1", 1);
batch.Messages.Count.ShouldBe(1);
batch.Messages[0].Payload.Length.ShouldBe(0);
}
// =========================================================================
// TestJetStreamConsumerUpdateSurvival — consumer_test.go
// Updating a consumer preserves messages already in the stream.
// =========================================================================
[Fact]
public async Task Consumer_update_does_not_lose_messages()
{
// Go: TestJetStreamConsumerUpdateSurvival consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CUS", "cus.>");
// Publish messages before consumer creation
for (var i = 0; i < 3; i++)
_ = await fx.PublishAndGetAckAsync("cus.x", $"msg-{i}");
// Create consumer
_ = await fx.CreateConsumerAsync("CUS", "C1", "cus.>");
// Update consumer (same config — idempotent)
_ = await fx.CreateConsumerAsync("CUS", "C1", "cus.>");
var batch = await fx.FetchAsync("CUS", "C1", 10);
batch.Messages.Count.ShouldBe(3);
}
// =========================================================================
// TestJetStreamConsumerAckSamplingSpecifiedUsingUpdateConsumer — consumer_test.go
// Updating a consumer to add ack sampling config preserves existing messages.
// =========================================================================
[Fact]
public async Task Consumer_update_preserves_ack_policy_and_messages()
{
// Go: TestJetStreamConsumerAckSamplingSpecifiedUsingUpdateConsumer consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CAS", "cas.>");
_ = await fx.CreateConsumerAsync("CAS", "C1", "cas.>",
ackPolicy: AckPolicy.Explicit);
_ = await fx.PublishAndGetAckAsync("cas.x", "msg1");
// Update: change ack wait (allowed update)
var update = await fx.CreateConsumerAsync("CAS", "C1", "cas.>",
ackPolicy: AckPolicy.Explicit, ackWaitMs: 10_000);
update.Error.ShouldBeNull();
var batch = await fx.FetchAsync("CAS", "C1", 5);
batch.Messages.Count.ShouldBe(1);
}
// =========================================================================
// TestJetStreamConsumerStreamUpdate — consumer_test.go
// Adding subjects to a stream does not break existing consumers.
// =========================================================================
[Fact]
public async Task Stream_update_does_not_break_consumer()
{
// Go: TestJetStreamConsumerStreamUpdate consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CSU", "csu.a");
_ = await fx.CreateConsumerAsync("CSU", "C1", "csu.a");
_ = await fx.PublishAndGetAckAsync("csu.a", "before-update");
// Update stream to add more subjects
var upd = await fx.RequestLocalAsync(
"$JS.API.STREAM.UPDATE.CSU",
"""{"name":"CSU","subjects":["csu.a","csu.b"]}""");
upd.Error.ShouldBeNull();
_ = await fx.PublishAndGetAckAsync("csu.a", "after-update");
var batch = await fx.FetchAsync("CSU", "C1", 10);
batch.Messages.Count.ShouldBe(2);
}
// =========================================================================
// TestJetStreamConsumerAndStreamNamesWithPathSeparators — consumer_test.go
// Consumer names with dots or slashes are handled correctly.
// =========================================================================
[Fact]
public async Task Consumer_and_stream_names_with_hyphens_are_valid()
{
// Go: TestJetStreamConsumerAndStreamNamesWithPathSeparators consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("STREAM-ONE", "sone.>");
var resp = await fx.CreateConsumerAsync("STREAM-ONE", "CONSUMER-ONE", "sone.>");
resp.Error.ShouldBeNull();
resp.ConsumerInfo!.Config.DurableName.ShouldBe("CONSUMER-ONE");
}
// =========================================================================
// TestJetStreamConsumerUpdateFilterSubject — consumer_test.go
// Updating a consumer's filter subject is reflected in info.
// =========================================================================
[Fact]
public async Task Consumer_filter_subject_can_be_updated()
{
// Go: TestJetStreamConsumerUpdateFilterSubject consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CUFS", "cufs.>");
_ = await fx.CreateConsumerAsync("CUFS", "C1", "cufs.a");
// Update filter
var update = await fx.CreateConsumerAsync("CUFS", "C1", "cufs.b");
update.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("CUFS", "C1");
info.Config.FilterSubject.ShouldBe("cufs.b");
}
// =========================================================================
// TestJetStreamConsumerAndStreamMetadata — consumer_test.go
// Consumer info includes config metadata.
// =========================================================================
[Fact]
public async Task Consumer_info_includes_config_metadata()
{
// Go: TestJetStreamConsumerAndStreamMetadata consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CISM", "cism.>");
var create = await fx.CreateConsumerAsync("CISM", "META", "cism.>",
ackPolicy: AckPolicy.Explicit,
maxAckPending: 100);
create.Error.ShouldBeNull();
var info = await fx.GetConsumerInfoAsync("CISM", "META");
info.Config.DurableName.ShouldBe("META");
info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
info.Config.MaxAckPending.ShouldBe(100);
}
// =========================================================================
// TestJetStreamConsumerDefaultsFromStream — consumer_test.go
// A consumer created on a stream inherits the stream's subject if no
// filter is specified (matches all messages on the stream).
// =========================================================================
[Fact]
public async Task Consumer_without_filter_receives_all_stream_messages()
{
// Go: TestJetStreamConsumerDefaultsFromStream consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CDFS", "cdfs.>");
_ = await fx.CreateConsumerAsync("CDFS", "C1", null);
_ = await fx.PublishAndGetAckAsync("cdfs.a", "1");
_ = await fx.PublishAndGetAckAsync("cdfs.b", "2");
_ = await fx.PublishAndGetAckAsync("cdfs.c", "3");
var batch = await fx.FetchAsync("CDFS", "C1", 10);
batch.Messages.Count.ShouldBe(3);
}
// =========================================================================
// TestJetStreamConsumerWithFormattingSymbol — consumer_test.go
// Consumer durable names can contain underscores and hyphens.
// =========================================================================
[Fact]
public async Task Consumer_durable_name_with_special_chars_is_valid()
{
// Go: TestJetStreamConsumerWithFormattingSymbol consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CWFS", "cwfs.>");
var resp = await fx.CreateConsumerAsync("CWFS", "my_consumer-v2", "cwfs.>");
resp.Error.ShouldBeNull();
resp.ConsumerInfo!.Config.DurableName.ShouldBe("my_consumer-v2");
}
// =========================================================================
// TestJetStreamConsumerWithCorruptStateIsDeleted — consumer_test.go
// A ConsumerFileStore with corrupt data initialises to empty state
// instead of crashing.
// =========================================================================
[Fact]
public void ConsumerFileStore_with_corrupt_state_initializes_empty()
{
// Go: TestJetStreamConsumerWithCorruptStateIsDeleted consumer_test.go
var dir = Path.Combine(Path.GetTempPath(), $"nats-corrupt-{Guid.NewGuid():N}");
Directory.CreateDirectory(dir);
var stateFile = Path.Combine(dir, "state.bin");
try
{
// Write corrupt data
File.WriteAllBytes(stateFile, [0xFF, 0xFE, 0x00, 0x01, 0x02, 0x03]);
var cfg = new ConsumerConfig
{
DurableName = "C",
AckPolicy = AckPolicy.Explicit,
};
// Should not throw — corrupt state means empty state
var store = new ConsumerFileStore(stateFile, cfg);
var state = store.State();
store.Stop();
// Corrupt state → empty/default state
state.Delivered.Consumer.ShouldBe(0UL);
}
finally
{
if (Directory.Exists(dir))
Directory.Delete(dir, recursive: true);
}
}
// =========================================================================
// TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange — consumer_test.go
// A consumer that is unregistered from the manager truly returns not-found.
// =========================================================================
[Fact]
public async Task Consumer_not_found_after_delete()
{
// Go: TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NDSL", "ndsl.>");
_ = await fx.CreateConsumerAsync("NDSL", "C1", "ndsl.>");
var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.NDSL.C1", "{}");
del.Success.ShouldBeTrue();
// Info on deleted consumer returns error
var info = await fx.RequestLocalAsync("$JS.API.CONSUMER.INFO.NDSL.C1", "{}");
info.Error.ShouldNotBeNull();
}
// =========================================================================
// TestJetStreamConsumerResetToSequenceConstraintOnStartSeq — consumer_test.go
// DeliverPolicy.ByStartSequence delivers from the specified sequence onward.
// =========================================================================
[Fact]
public async Task Consumer_reset_to_sequence_start_seq()
{
// Go: TestJetStreamConsumerResetToSequenceConstraintOnStartSeq consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CRSS", "crss.>");
for (var i = 1; i <= 5; i++)
_ = await fx.PublishAndGetAckAsync("crss.x", $"msg-{i}");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.CRSS.C1",
"""{"durable_name":"C1","filter_subject":"crss.>","deliver_policy":"by_start_sequence","opt_start_seq":3}""");
resp.Error.ShouldBeNull();
var batch = await fx.FetchAsync("CRSS", "C1", 10);
batch.Messages.Count.ShouldBe(3);
batch.Messages[0].Sequence.ShouldBe(3UL);
}
// =========================================================================
// TestJetStreamConsumerResetToSequenceConstraintOnStartTime — consumer_test.go
// DeliverPolicy.ByStartTime delivers from messages at or after the start time.
// =========================================================================
[Fact]
public async Task Consumer_reset_to_sequence_start_time()
{
// Go: TestJetStreamConsumerResetToSequenceConstraintOnStartTime consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CRST", "crst.>");
_ = await fx.PublishAndGetAckAsync("crst.x", "before");
_ = await fx.PublishAndGetAckAsync("crst.x", "before-2");
var startTime = DateTime.UtcNow.AddSeconds(1).ToString("O");
// Small delay so publish after startTime
await Task.Delay(10);
_ = await fx.PublishAndGetAckAsync("crst.x", "after");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.CRST.C1",
$"{{\"durable_name\":\"C1\",\"filter_subject\":\"crst.>\",\"deliver_policy\":\"by_start_time\",\"opt_start_time_utc\":\"{startTime}\"}}");
resp.Error.ShouldBeNull();
// DeliverPolicy.ByStartTime resolution is approximate; just verify no error
var info = await fx.GetConsumerInfoAsync("CRST", "C1");
info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartTime);
}
// =========================================================================
// TestJetStreamConsumerLegacyDurableCreateSetsConsumerName — consumer_test.go
// Consumer name is populated from durable_name field.
// =========================================================================
[Fact]
public async Task Legacy_durable_create_sets_consumer_name()
{
// Go: TestJetStreamConsumerLegacyDurableCreateSetsConsumerName consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("LDCS", "ldcs.>");
var resp = await fx.RequestLocalAsync(
"$JS.API.CONSUMER.CREATE.LDCS.LEGACY",
"""{"durable_name":"LEGACY","filter_subject":"ldcs.>"}""");
resp.Error.ShouldBeNull();
resp.ConsumerInfo.ShouldNotBeNull();
resp.ConsumerInfo!.Config.DurableName.ShouldBe("LEGACY");
}
// =========================================================================
// TestJetStreamConsumerNotInactiveDuringAckWait — consumer_test.go
// A consumer with pending acks is considered active (not inactive/expired).
// After acking all pending messages, pending count returns to 0.
// =========================================================================
[Fact]
public async Task Consumer_not_inactive_during_ack_wait()
{
// Go: TestJetStreamConsumerNotInactiveDuringAckWait consumer_test.go
await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NIDAW", "nidaw.>");
_ = await fx.CreateConsumerAsync("NIDAW", "C1", "nidaw.>",
ackPolicy: AckPolicy.Explicit,
ackWaitMs: 30_000);
_ = await fx.PublishAndGetAckAsync("nidaw.x", "msg");
var batch = await fx.FetchAsync("NIDAW", "C1", 1);
batch.Messages.Count.ShouldBe(1);
// Consumer has pending ack — should still be "active"
var pending = await fx.GetPendingCountAsync("NIDAW", "C1");
pending.ShouldBe(1);
// Ack the message — pending drops to 0
await fx.AckAllAsync("NIDAW", "C1", batch.Messages[0].Sequence);
var afterAck = await fx.GetPendingCountAsync("NIDAW", "C1");
afterAck.ShouldBe(0);
}
// =========================================================================
// PullRequestWaitQueue — priority group round-robin within same priority
// =========================================================================
[Fact]
public void Pull_request_queue_round_robins_within_same_priority()
{
// Go: TestJetStreamConsumerPriorityPullRequests — pop-and-requeue within priority
var queue = new PullRequestWaitQueue();
queue.Enqueue(new PullWaitingRequest { Priority = 1, Batch = 3, RemainingBatch = 3, Reply = "reply-A" });
queue.Enqueue(new PullWaitingRequest { Priority = 1, Batch = 3, RemainingBatch = 3, Reply = "reply-B" });
// First pop-and-requeue returns A, re-inserts A after B
var first = queue.PopAndRequeue();
first.ShouldNotBeNull();
first!.Reply.ShouldBe("reply-A");
first.RemainingBatch.ShouldBe(2);
// Second pop-and-requeue returns B (A is now at end of priority group)
var second = queue.PopAndRequeue();
second.ShouldNotBeNull();
second!.Reply.ShouldBe("reply-B");
}
// =========================================================================
// RedeliveryTracker — due entries returned after deadline passes
// =========================================================================
[Fact]
public async Task Redelivery_tracker_returns_due_entries_after_deadline()
{
// Go: TestJetStreamConsumerMaxDeliveryAndServerRestart — redelivery mechanics
var tracker = new RedeliveryTracker(backoffMs: [50]);
tracker.Schedule(10, 1, ackWaitMs: 50);
// Not yet due
var notYet = tracker.GetDue();
notYet.Count.ShouldBe(0);
// Wait for the deadline to pass
await Task.Delay(100);
var due = tracker.GetDue();
due.Count.ShouldBe(1);
due[0].ShouldBe(10UL);
// Acknowledge removes from tracker
tracker.Acknowledge(10);
tracker.IsTracking(10).ShouldBeFalse();
}
// =========================================================================
// ConsumerFileStore — encode/decode round-trip
// =========================================================================
[Fact]
public void ConsumerFileStore_state_encode_decode_roundtrip()
{
// Go: TestJetStreamConsumerWithCorruptStateIsDeleted + state persistence tests
var dir = Path.Combine(Path.GetTempPath(), $"nats-rt-{Guid.NewGuid():N}");
Directory.CreateDirectory(dir);
var stateFile = Path.Combine(dir, "state.bin");
try
{
var cfg = new ConsumerConfig
{
DurableName = "RT",
AckPolicy = AckPolicy.Explicit,
};
var store = new ConsumerFileStore(stateFile, cfg);
long tsNs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000;
store.UpdateDelivered(1, 1, 1, tsNs);
store.UpdateDelivered(2, 2, 1, tsNs + 1);
store.Stop();
var store2 = new ConsumerFileStore(stateFile, cfg);
var state = store2.State();
store2.Stop();
state.Delivered.Consumer.ShouldBe(2UL);
state.Delivered.Stream.ShouldBe(2UL);
}
finally
{
if (Directory.Exists(dir))
Directory.Delete(dir, recursive: true);
}
}
}