- git mv JetStreamApiFixture, JetStreamClusterFixture, LeafFixture, Parity utilities, and TestData from NATS.Server.Tests to NATS.Server.TestUtilities - Update namespaces to NATS.Server.TestUtilities (and .Parity sub-ns) - Make fixture classes public for cross-project access - Add PollHelper to replace Task.Delay polling with SemaphoreSlim waits - Refactor all fixture polling loops to use PollHelper - Add 'using NATS.Server.TestUtilities;' to ~75 consuming test files - Rename local fixture duplicates (MetaGroupTestFixture, LeafProtocolTestFixture) to avoid shadowing shared fixtures - Remove TestData entry from NATS.Server.Tests.csproj (moved to TestUtilities)
1253 lines
53 KiB
C#
1253 lines
53 KiB
C#
// Go reference: golang/nats-server/server/consumer_test.go
|
|
// golang/nats-server/server/jetstream_test.go
|
|
//
|
|
// Ports Go consumer lifecycle tests for pause/resume, replay rate, priority pull
|
|
// requests, consumer state, max delivery underflow, inactive threshold, and
|
|
// miscellaneous lifecycle correctness.
|
|
|
|
using System.Text;
|
|
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.Api;
|
|
using NATS.Server.JetStream.Consumers;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Storage;
|
|
using NATS.Server.TestUtilities;
|
|
|
|
namespace NATS.Server.Tests.JetStream.Consumers;
|
|
|
|
/// <summary>
|
|
/// Consumer lifecycle tests: pause/resume, replay, priority pull, state persistence,
|
|
/// max-delivery underflow, inactive threshold, and assorted edge cases.
|
|
/// </summary>
|
|
public class ConsumerLifecycleTests
|
|
{
|
|
// =========================================================================
|
|
// TestJetStreamConsumerPauseViaConfig — consumer_test.go
|
|
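// Creating a consumer with Paused=true should store the paused flag.
// NOTE: this port pauses through the CONSUMER.PAUSE API after creation and
// only asserts that the pause call succeeds and the consumer still exists.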
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Creates durable consumer "C1" on stream "PSC", pauses it through the
/// CONSUMER.PAUSE API subject, and confirms the consumer can still be looked up.
/// </summary>
[Fact]
public async Task Pause_via_config_creates_paused_consumer()
{
    // Go: TestJetStreamConsumerPauseViaConfig consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("PSC", "psc.>");

    // Step 1: create the consumer through the fixture helper.
    var created = await fixture.CreateConsumerAsync("PSC", "C1", "psc.>");
    created.Error.ShouldBeNull();
    created.ConsumerInfo.ShouldNotBeNull();

    // Step 2: pause it with an API request (not through the creation config).
    var pauseResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.PAUSE.PSC.C1",
        """{"pause":true}""");
    pauseResponse.Success.ShouldBeTrue();

    // Step 3: the consumer must still be retrievable under its durable name.
    var consumerInfo = await fixture.GetConsumerInfoAsync("PSC", "C1");
    consumerInfo.Config.DurableName.ShouldBe("C1");
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerPauseViaEndpoint — consumer_test.go
|
|
// Pausing via the PAUSE API endpoint sets the paused flag.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Sends a pause request for consumer "PAUSER" on stream "PVE" and expects an
/// error-free, successful API response.
/// </summary>
[Fact]
public async Task Pause_via_endpoint_marks_consumer_paused()
{
    // Go: TestJetStreamConsumerPauseViaEndpoint consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("PVE", "pve.>");
    _ = await fixture.CreateConsumerAsync("PVE", "PAUSER", "pve.>");

    var pauseResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.PAUSE.PVE.PAUSER",
        """{"pause":true}""");

    // Only the response envelope is inspected here.
    pauseResponse.Error.ShouldBeNull();
    pauseResponse.Success.ShouldBeTrue();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerPauseResumeViaEndpoint — consumer_test.go
|
|
// A paused consumer can be resumed by setting pause=false.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Pauses and then resumes consumer "C1" on stream "PRV" through the same
/// CONSUMER.PAUSE subject, expecting both requests to succeed.
/// </summary>
[Fact]
public async Task Pause_then_resume_via_endpoint()
{
    // Go: TestJetStreamConsumerPauseResumeViaEndpoint consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("PRV", "prv.>");
    _ = await fixture.CreateConsumerAsync("PRV", "C1", "prv.>");

    // pause=true followed by pause=false on the same subject.
    var pauseResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.PAUSE.PRV.C1",
        """{"pause":true}""");
    pauseResponse.Success.ShouldBeTrue();

    var resumeResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.PAUSE.PRV.C1",
        """{"pause":false}""");
    resumeResponse.Success.ShouldBeTrue();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerPauseAdvisories — consumer_test.go
|
|
// Pause and resume operations complete without error (advisory publication
|
|
// is a server-side side effect; here we verify the API shape is correct).
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Verifies the response shape of a pause followed by a resume on consumer
/// "ADV": each call must report success and carry no error.
/// </summary>
[Fact]
public async Task Pause_and_resume_operations_return_success()
{
    // Go: TestJetStreamConsumerPauseAdvisories consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("PA", "pa.>");
    _ = await fixture.CreateConsumerAsync("PA", "ADV", "pa.>");

    // Pause request.
    var pauseResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.PAUSE.PA.ADV",
        """{"pause":true}""");
    pauseResponse.Success.ShouldBeTrue();
    pauseResponse.Error.ShouldBeNull();

    // Resume request.
    var resumeResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.PAUSE.PA.ADV",
        """{"pause":false}""");
    resumeResponse.Success.ShouldBeTrue();
    resumeResponse.Error.ShouldBeNull();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerReplayRate — consumer_test.go
|
|
// ReplayPolicy.Original delays deliveries to match original timing.
|
|
// The config must be stored and the policy must be respected.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Creates a consumer whose JSON config requests replay_policy "original" and
/// checks that the consumer info reports <c>ReplayPolicy.Original</c>.
/// </summary>
[Fact]
public async Task Replay_original_policy_is_stored_on_consumer()
{
    // Go: TestJetStreamConsumerReplayRate consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("RPR", "rpr.>");

    // The consumer is created with a raw API request so the replay policy
    // travels through JSON parsing.
    var createResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.CREATE.RPR.C1",
        """{"durable_name":"C1","filter_subject":"rpr.>","replay_policy":"original","ack_policy":"explicit"}""");
    createResponse.Error.ShouldBeNull();

    var consumerInfo = await fixture.GetConsumerInfoAsync("RPR", "C1");
    consumerInfo.Config.ReplayPolicy.ShouldBe(ReplayPolicy.Original);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerReplayQuit — consumer_test.go
|
|
// Replay original policy: consumer can fetch messages (delivery works).
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Publishes one message and fetches from consumer "RO" on stream "ORDERS",
/// expecting at least one message to be delivered.
/// </summary>
[Fact]
public async Task Replay_original_consumer_can_fetch_messages()
{
    // Go: TestJetStreamConsumerReplayQuit consumer_test.go
    // NOTE(review): the fixture presumably creates stream "ORDERS" and consumer
    // "RO" with replay_policy=original — confirm against the fixture.
    await using var fixture = await JetStreamApiFixture.StartWithReplayOriginalConsumerAsync();

    _ = await fixture.PublishAndGetAckAsync("orders.created", "msg-for-replay");

    // Ask for up to 5; the fixture may also have published during setup, so
    // only a lower bound of one message is asserted.
    var fetched = await fixture.FetchAsync("ORDERS", "RO", 5);
    fetched.Messages.Count.ShouldBeGreaterThanOrEqualTo(1);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerPrioritized — consumer_test.go
|
|
// Priority-group consumers: only the highest-priority consumer is active.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Registers three consumers in one priority group and expects the one with the
/// numerically lowest priority value to be reported as active.
/// </summary>
[Fact]
public void Consumer_prioritized_active_is_lowest_priority_number()
{
    // Go: TestJetStreamConsumerPrioritized consumer_test.go
    const string group = "work-group";
    var manager = new PriorityGroupManager();

    // Registration order is deliberately not sorted by priority.
    manager.Register(group, "consumer-A", priority: 10);
    manager.Register(group, "consumer-B", priority: 1);
    manager.Register(group, "consumer-C", priority: 5);

    // consumer-B holds priority 1, the smallest value registered.
    var activeConsumer = manager.GetActiveConsumer(group);
    activeConsumer.ShouldBe("consumer-B");
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerPriorityPullRequests — consumer_test.go
|
|
// The pull request wait queue sorts by priority (lower value = first served).
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Enqueues pull requests with priorities 5, 1 and 3 and expects them to be
/// dequeued in ascending priority order (1, 3, 5).
/// </summary>
[Fact]
public void Consumer_priority_pull_requests_served_in_priority_order()
{
    // Go: TestJetStreamConsumerPriorityPullRequests consumer_test.go
    var waitQueue = new PullRequestWaitQueue();

    // Enqueue out of order so the dequeue order demonstrates sorting.
    foreach (var priority in new[] { 5, 1, 3 })
    {
        waitQueue.Enqueue(new PullWaitingRequest { Priority = priority, Batch = 1, RemainingBatch = 1 });
    }

    // Lower number is served first.
    foreach (var expectedPriority in new[] { 1, 3, 5 })
    {
        var next = waitQueue.Dequeue();
        next.ShouldNotBeNull();
        next!.Priority.ShouldBe(expectedPriority);
    }
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerMaxDeliverUnderflow — consumer_test.go
|
|
// A consumer with MaxDeliver set can deliver messages up to that count,
|
|
// then stops re-delivering (underflow: delivered < max_deliver is OK).
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Creates a consumer with max_deliver=3 and a 50 ms ack wait, fetches one
/// published message, and checks the stored MaxDeliver and AckPolicy values.
/// </summary>
[Fact]
public async Task Max_deliver_underflow_consumer_delivers_up_to_limit()
{
    // Go: TestJetStreamConsumerMaxDeliverUnderflow consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("MDU", "mdu.>");

    // max_deliver and ack_wait_ms are supplied through the raw JSON request.
    var createResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.CREATE.MDU.C1",
        """{"durable_name":"C1","filter_subject":"mdu.>","ack_policy":"explicit","max_deliver":3,"ack_wait_ms":50}""");
    createResponse.Error.ShouldBeNull();

    _ = await fixture.PublishAndGetAckAsync("mdu.x", "payload");

    // A single delivery is well below the limit of three.
    var fetched = await fixture.FetchAsync("MDU", "C1", 1);
    fetched.Messages.Count.ShouldBe(1);

    // The configuration must read back as submitted.
    var consumerInfo = await fixture.GetConsumerInfoAsync("MDU", "C1");
    consumerInfo.Config.MaxDeliver.ShouldBe(3);
    consumerInfo.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerNoWaitNoMessagesOnEos — consumer_test.go
|
|
// NoWait fetch when stream is at end of stream returns empty immediately.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Performs a no-wait fetch against a stream that has never received a
/// message and expects an empty batch.
/// </summary>
[Fact]
public async Task NoWait_at_eos_returns_empty()
{
    // Go: TestJetStreamConsumerNoWaitNoMessagesOnEos consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("NWEOS", "nweos.>");
    _ = await fixture.CreateConsumerAsync("NWEOS", "C1", "nweos.>");

    // Nothing was published, so there is nothing to deliver.
    var fetched = await fixture.FetchWithNoWaitAsync("NWEOS", "C1", 5);
    fetched.Messages.Count.ShouldBe(0);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs — consumer_test.go
|
|
// After consuming all messages, NoWait at EOS returns empty.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Drains two published messages with a regular fetch, then expects a no-wait
/// fetch on the same consumer to return an empty batch.
/// </summary>
[Fact]
public async Task NoWait_after_all_messages_consumed_returns_empty()
{
    // Go: TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("NWDM", "nwdm.>");
    _ = await fixture.CreateConsumerAsync("NWDM", "C1", "nwdm.>");

    _ = await fixture.PublishAndGetAckAsync("nwdm.x", "msg1");
    _ = await fixture.PublishAndGetAckAsync("nwdm.x", "msg2");

    // The first fetch asks for more than exists and receives both messages.
    var drained = await fixture.FetchAsync("NWDM", "C1", 5);
    drained.Messages.Count.ShouldBe(2);

    // The second fetch has nothing left to deliver.
    var afterDrain = await fixture.FetchWithNoWaitAsync("NWDM", "C1", 5);
    afterDrain.Messages.Count.ShouldBe(0);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerActionsViaAPI — consumer_test.go
|
|
// Consumer API allows creating, reading info, and deleting consumers.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Walks consumer "DUR1" on stream "CAV" through create, info, names listing
/// and delete, asserting each step succeeds.
/// </summary>
[Fact]
public async Task Consumer_actions_create_info_delete_via_api()
{
    // Go: TestJetStreamConsumerActionsViaAPI consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CAV", "cav.>");

    // Create
    var created = await fixture.CreateConsumerAsync("CAV", "DUR1", "cav.>");
    created.Error.ShouldBeNull();
    created.ConsumerInfo.ShouldNotBeNull();

    // Info
    var consumerInfo = await fixture.GetConsumerInfoAsync("CAV", "DUR1");
    consumerInfo.Config.DurableName.ShouldBe("DUR1");

    // List
    var listing = await fixture.RequestLocalAsync("$JS.API.CONSUMER.NAMES.CAV", "{}");
    listing.ConsumerNames.ShouldNotBeNull();
    listing.ConsumerNames!.ShouldContain("DUR1");

    // Delete
    var deleted = await fixture.RequestLocalAsync("$JS.API.CONSUMER.DELETE.CAV.DUR1", "{}");
    deleted.Success.ShouldBeTrue();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerActionsUnmarshal — consumer_test.go
|
|
// Consumer config fields survive round-trip JSON serialisation.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Submits a consumer config as JSON (ack policy, ack wait, max deliver and
/// backoff list) and checks each field reads back unchanged from consumer info.
/// </summary>
[Fact]
public async Task Consumer_config_fields_survive_serialisation()
{
    // Go: TestJetStreamConsumerActionsUnmarshal consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CAU", "cau.>");

    var createResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.CREATE.CAU.DUR2",
        """{"durable_name":"DUR2","filter_subject":"cau.>","ack_policy":"explicit","ack_wait_ms":5000,"max_deliver":5,"backoff_ms":[100,200,400]}""");
    createResponse.Error.ShouldBeNull();

    // Each expected value mirrors the JSON above.
    var expectedBackoff = new List<int> { 100, 200, 400 };
    var storedConfig = (await fixture.GetConsumerInfoAsync("CAU", "DUR2")).Config;

    storedConfig.AckPolicy.ShouldBe(AckPolicy.Explicit);
    storedConfig.AckWaitMs.ShouldBe(5000);
    storedConfig.MaxDeliver.ShouldBe(5);
    storedConfig.BackOffMs.ShouldBe(expectedBackoff);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerWorkQueuePolicyOverlap — consumer_test.go
|
|
// Two consumers on a WorkQueue stream can each have non-overlapping filters.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// On a WorkQueue-retention stream, creates two consumers with disjoint filter
/// subjects and checks each receives only the message matching its own filter.
/// </summary>
[Fact]
public async Task WorkQueue_consumers_with_non_overlapping_filters()
{
    // Go: TestJetStreamConsumerWorkQueuePolicyOverlap consumer_test.go
    var streamConfig = new StreamConfig
    {
        Name = "WQO",
        Subjects = ["wqo.>"],
        Retention = RetentionPolicy.WorkQueue,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    // Both consumers must be accepted because their filters do not overlap.
    var consumerA = await fixture.CreateConsumerAsync("WQO", "C1", "wqo.a");
    consumerA.Error.ShouldBeNull();

    var consumerB = await fixture.CreateConsumerAsync("WQO", "C2", "wqo.b");
    consumerB.Error.ShouldBeNull();

    _ = await fixture.PublishAndGetAckAsync("wqo.a", "for-c1");
    _ = await fixture.PublishAndGetAckAsync("wqo.b", "for-c2");

    // C1 sees only the wqo.a message.
    var fetchedByA = await fixture.FetchAsync("WQO", "C1", 5);
    fetchedByA.Messages.Count.ShouldBe(1);
    fetchedByA.Messages[0].Subject.ShouldBe("wqo.a");

    // C2 sees only the wqo.b message.
    var fetchedByB = await fixture.FetchAsync("WQO", "C2", 5);
    fetchedByB.Messages.Count.ShouldBe(1);
    fetchedByB.Messages[0].Subject.ShouldBe("wqo.b");
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerLongSubjectHang — consumer_test.go
|
|
// Consumer with a long filter subject (no hang, fetch works normally).
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Uses a 22-token filter subject ("a." + twenty "x." + "end") for both the
/// consumer filter and the published message, and expects a normal fetch.
/// </summary>
[Fact]
public async Task Consumer_with_long_filter_subject_works()
{
    // Go: TestJetStreamConsumerLongSubjectHang consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("LSH", ">");

    // Build "a.x.x. ... x.end" with twenty "x." segments in the middle.
    var middle = string.Concat(Enumerable.Repeat("x.", 20));
    var longSubject = $"a.{middle}end";

    _ = await fixture.CreateConsumerAsync("LSH", "LONG", longSubject);
    _ = await fixture.PublishAndGetAckAsync(longSubject, "payload");

    var fetched = await fixture.FetchAsync("LSH", "LONG", 5);
    fetched.Messages.Count.ShouldBe(1);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerStuckAckPending — consumer_test.go
|
|
// After acking all messages the pending count returns to 0.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// With an AckPolicy.All consumer, fetches five messages, confirms the pending
/// count is positive, acks up to the last fetched sequence, and expects the
/// pending count to drop to zero.
/// </summary>
[Fact]
public async Task Stuck_ack_pending_clears_after_ack_all()
{
    // Go: TestJetStreamConsumerStuckAckPending consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("SAP", "sap.>");
    _ = await fixture.CreateConsumerAsync("SAP", "C1", "sap.>", ackPolicy: AckPolicy.All);

    foreach (var index in Enumerable.Range(0, 5))
    {
        _ = await fixture.PublishAndGetAckAsync("sap.x", $"msg-{index}");
    }

    var fetched = await fixture.FetchAsync("SAP", "C1", 10);
    fetched.Messages.Count.ShouldBe(5);

    // Delivered-but-unacked messages must show up as pending.
    var pendingBeforeAck = await fixture.GetPendingCountAsync("SAP", "C1");
    pendingBeforeAck.ShouldBeGreaterThan(0);

    // Acknowledge everything up to and including the last delivered sequence.
    var lastSequence = fetched.Messages[^1].Sequence;
    await fixture.AckAllAsync("SAP", "C1", lastSequence);

    var pendingAfterAck = await fixture.GetPendingCountAsync("SAP", "C1");
    pendingAfterAck.ShouldBe(0);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerSwitchLeaderDuringInflightAck — consumer_test.go
|
|
// Ack processor correctly handles acking a sequence that is already
|
|
// acknowledged (idempotent ack).
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Registers two sequences with an <c>AckProcessor</c>, acks the first one
/// twice, and expects exactly one sequence to remain pending.
/// </summary>
[Fact]
public void Ack_processor_idempotent_double_ack()
{
    // Go: TestJetStreamConsumerSwitchLeaderDuringInflightAck consumer_test.go
    var processor = new AckProcessor();
    processor.Register(1, 30_000);
    processor.Register(2, 30_000);

    // The repeated ack of sequence 1 must neither throw nor change the count.
    processor.AckSequence(1);
    processor.AckSequence(1);

    // Sequence 2 was never acked, so it is the only pending entry.
    processor.PendingCount.ShouldBe(1);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerEphemeralRecoveryAfterServerRestart — consumer_test.go
|
|
// ConsumerFileStore can persist and reload state across a "restart"
|
|
// (i.e., create a new ConsumerFileStore for the same file path).
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Writes delivered state through one <c>ConsumerFileStore</c>, stops it, opens
/// a second store on the same file, and expects the delivered consumer
/// sequence to read back as 1.
/// </summary>
[Fact]
public void Ephemeral_consumer_state_persists_and_recovers()
{
    // Go: TestJetStreamConsumerEphemeralRecoveryAfterServerRestart consumer_test.go
    // Each run gets its own temp directory so parallel tests cannot collide.
    var tempDir = Path.Combine(Path.GetTempPath(), $"nats-consumer-test-{Guid.NewGuid():N}");
    Directory.CreateDirectory(tempDir);
    var statePath = Path.Combine(tempDir, "state.bin");

    try
    {
        var config = new ConsumerConfig
        {
            DurableName = "EPHEMERAL",
            AckPolicy = AckPolicy.Explicit,
        };

        // First store instance: record one delivery, then stop.
        var writer = new ConsumerFileStore(statePath, config);
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000;
        writer.UpdateDelivered(1, 1, 1, timestamp);
        writer.Stop();

        // Second store instance on the same path stands in for a restart.
        var reader = new ConsumerFileStore(statePath, config);
        var recovered = reader.State();
        reader.Stop();

        // The delivered consumer sequence must have survived the reload.
        recovered.Delivered.Consumer.ShouldBe(1UL);
    }
    finally
    {
        // Clean up even when an assertion above fails.
        if (Directory.Exists(tempDir))
        {
            Directory.Delete(tempDir, recursive: true);
        }
    }
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerMaxDeliveryAndServerRestart — consumer_test.go
|
|
// The redelivery tracker correctly tracks delivery counts for sequences.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Schedules sequence 1 in a <c>RedeliveryTracker</c> first with delivery
/// count 1 and then with count 3, checking <c>IsMaxDeliveries</c> against a
/// limit of 3 after each step.
/// </summary>
[Fact]
public void Redelivery_tracker_tracks_max_delivery_count()
{
    // Go: TestJetStreamConsumerMaxDeliveryAndServerRestart consumer_test.go
    const int maxDeliver = 3;
    var redelivery = new RedeliveryTracker(backoffMs: [50, 100, 200]);

    // First delivery: tracked, but below the limit.
    redelivery.Schedule(1, 1, ackWaitMs: 5000);
    redelivery.IsTracking(1).ShouldBeTrue();
    redelivery.IsMaxDeliveries(1, maxDeliver).ShouldBeFalse();

    // Re-schedule with the delivery count at the limit.
    redelivery.Schedule(1, 3, ackWaitMs: 5000);
    redelivery.IsMaxDeliveries(1, maxDeliver).ShouldBeTrue();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerDeleteAndServerRestart — consumer_test.go
|
|
// Deleting a consumer removes it from the manager.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Creates consumer "TODEL", confirms it appears in the names listing, deletes
/// it, and confirms the listing no longer contains it.
/// </summary>
[Fact]
public async Task Delete_consumer_removes_it()
{
    // Go: TestJetStreamConsumerDeleteAndServerRestart consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("DCR", "dcr.>");
    _ = await fixture.CreateConsumerAsync("DCR", "TODEL", "dcr.>");

    // Before deletion the consumer must be listed.
    var listedBefore = await fixture.RequestLocalAsync("$JS.API.CONSUMER.NAMES.DCR", "{}");
    listedBefore.ConsumerNames.ShouldNotBeNull();
    listedBefore.ConsumerNames!.ShouldContain("TODEL");

    var deleted = await fixture.RequestLocalAsync("$JS.API.CONSUMER.DELETE.DCR.TODEL", "{}");
    deleted.Success.ShouldBeTrue();

    // After deletion either no list is returned or the name is absent.
    var listedAfter = await fixture.RequestLocalAsync("$JS.API.CONSUMER.NAMES.DCR", "{}");
    var stillListed = listedAfter.ConsumerNames != null && listedAfter.ConsumerNames.Contains("TODEL");
    stillListed.ShouldBeFalse();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerDurableReconnectWithOnlyPending — consumer_test.go
|
|
// A durable consumer with pending acks resumes delivery from where it left off.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Publishes three messages, fetches two without acking on an explicit-ack
/// durable consumer, and expects the pending count to be 2.
/// </summary>
[Fact]
public async Task Durable_consumer_resumes_with_pending_acks()
{
    // Go: TestJetStreamConsumerDurableReconnectWithOnlyPending consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("DRWP", "drwp.>");
    _ = await fixture.CreateConsumerAsync(
        "DRWP",
        "DUR",
        "drwp.>",
        ackPolicy: AckPolicy.Explicit,
        ackWaitMs: 30_000);

    foreach (var number in Enumerable.Range(1, 3))
    {
        _ = await fixture.PublishAndGetAckAsync("drwp.x", $"msg-{number}");
    }

    // Take only two of the three messages and do not ack them.
    var firstBatch = await fixture.FetchAsync("DRWP", "DUR", 2);
    firstBatch.Messages.Count.ShouldBe(2);

    // Both fetched messages are still awaiting acknowledgement.
    var pendingCount = await fixture.GetPendingCountAsync("DRWP", "DUR");
    pendingCount.ShouldBe(2);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerDurableFilteredSubjectReconnect — consumer_test.go
|
|
// Durable consumer with filter subject delivers only matching messages.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Publishes to "dfs.a" and "dfs.b" and expects a consumer filtered on "dfs.a"
/// to receive exactly the two matching messages.
/// </summary>
[Fact]
public async Task Durable_filtered_consumer_delivers_matching_on_reconnect()
{
    // Go: TestJetStreamConsumerDurableFilteredSubjectReconnect consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("DFS", "dfs.>");
    _ = await fixture.CreateConsumerAsync("DFS", "DUR", "dfs.a");

    // Two matching messages with one non-matching message in between.
    _ = await fixture.PublishAndGetAckAsync("dfs.a", "for-dur");
    _ = await fixture.PublishAndGetAckAsync("dfs.b", "not-for-dur");
    _ = await fixture.PublishAndGetAckAsync("dfs.a", "also-for-dur");

    var fetched = await fixture.FetchAsync("DFS", "DUR", 10);
    fetched.Messages.Count.ShouldBe(2);

    // No delivered message may carry a subject other than the filter.
    var hasForeignSubject = fetched.Messages.Any(message => message.Subject != "dfs.a");
    hasForeignSubject.ShouldBeFalse();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerInactiveNoDeadlock — consumer_test.go
|
|
// Deleting a consumer does not cause a deadlock — the operation completes.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Deletes an ephemeral consumer and requires the delete request to complete
/// within five seconds and report success.
/// </summary>
/// <remarks>
/// The time limit is enforced with <c>Task.WaitAsync</c>, which throws
/// <see cref="TimeoutException"/> (failing the test) if the delete does not
/// finish in time. This replaces a hand-rolled <c>Task.WhenAny</c> race against
/// a never-completing delay task.
/// </remarks>
[Fact]
public async Task Consumer_delete_does_not_deadlock()
{
    // Go: TestJetStreamConsumerInactiveNoDeadlock consumer_test.go
    await using var fx = await JetStreamApiFixture.StartWithStreamAsync("IND", "ind.>");
    _ = await fx.CreateConsumerAsync("IND", "EPHEM", "ind.>", ephemeral: true);

    // Delete should return promptly (no deadlock). Start the request first,
    // then bound how long we are willing to wait for it.
    var delTask = fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.IND.EPHEM", "{}");
    var del = await delTask.WaitAsync(TimeSpan.FromSeconds(5));

    del.Success.ShouldBeTrue();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerReconnect — consumer_test.go
|
|
// A consumer that delivers messages then pauses/resumes still works.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Fetches one message, pauses and resumes the consumer, publishes another
/// message, and expects the next fetch to return only the post-resume message.
/// </summary>
[Fact]
public async Task Consumer_reconnect_after_pause_delivers_messages()
{
    // Go: TestJetStreamConsumerReconnect consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CRC", "crc.>");
    _ = await fixture.CreateConsumerAsync("CRC", "C1", "crc.>");

    // Deliver one message before touching the pause state.
    _ = await fixture.PublishAndGetAckAsync("crc.x", "pre-pause");
    var beforePause = await fixture.FetchAsync("CRC", "C1", 1);
    beforePause.Messages.Count.ShouldBe(1);

    // Pause, then resume, through the same API subject.
    var pauseResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.PAUSE.CRC.C1",
        """{"pause":true}""");
    pauseResponse.Success.ShouldBeTrue();

    var resumeResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.PAUSE.CRC.C1",
        """{"pause":false}""");
    resumeResponse.Success.ShouldBeTrue();

    // A message published after the resume must be the only one delivered.
    _ = await fixture.PublishAndGetAckAsync("crc.x", "post-resume");
    var afterResume = await fixture.FetchAsync("CRC", "C1", 5);
    afterResume.Messages.Count.ShouldBe(1);

    var payloadText = Encoding.UTF8.GetString(afterResume.Messages[0].Payload.Span);
    payloadText.ShouldBe("post-resume");
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerDeliverNewMaxRedeliveriesAndServerRestart — consumer_test.go
|
|
// Consumer with DeliverPolicy.New: the initial sequence is resolved at first
|
|
// fetch to LastSeq+1. Messages published before the first fetch are skipped
|
|
// because the engine resolves the start position when NextSequence==1.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Creates a deliver_policy "new" consumer on a stream that already holds two
/// messages, checks the policy is stored, and expects a no-wait fetch to
/// return nothing.
/// </summary>
[Fact]
public async Task Deliver_new_policy_config_is_stored_correctly()
{
    // Go: TestJetStreamConsumerDeliverNewMaxRedeliveriesAndServerRestart consumer_test.go
    // Per the notes carried with this port, the .NET engine anchors a
    // DeliverPolicy.New cursor at LastSeq+1 on the first fetch, so messages
    // that predate that fetch are never delivered to this consumer.
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("DNMR", "dnmr.>");

    // These two messages exist before the consumer does.
    _ = await fixture.PublishAndGetAckAsync("dnmr.x", "pre-creation");
    _ = await fixture.PublishAndGetAckAsync("dnmr.x", "pre-creation-2");

    var createResponse = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.CREATE.DNMR.C1",
        """{"durable_name":"C1","filter_subject":"dnmr.>","deliver_policy":"new"}""");
    createResponse.Error.ShouldBeNull();

    // The policy must read back exactly as requested.
    var consumerInfo = await fixture.GetConsumerInfoAsync("DNMR", "C1");
    consumerInfo.Config.DeliverPolicy.ShouldBe(DeliverPolicy.New);

    // Nothing was published after creation, so the fetch comes back empty.
    var fetched = await fixture.FetchWithNoWaitAsync("DNMR", "C1", 10);
    fetched.Messages.Count.ShouldBe(0);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerEOFBugNewFileStore — consumer_test.go
|
|
// A consumer whose stream has messages at the boundary can still fetch.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Publishes ten messages and expects a single fetch of ten to return all of
/// them.
/// </summary>
[Fact]
public async Task Consumer_can_fetch_from_stream_with_messages_at_boundary()
{
    // Go: TestJetStreamConsumerEOFBugNewFileStore consumer_test.go
    const int messageCount = 10;

    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("EOFC", "eofc.>");
    _ = await fixture.CreateConsumerAsync("EOFC", "C1", "eofc.>");

    // Payloads are zero-padded to four digits: msg-0000 .. msg-0009.
    foreach (var index in Enumerable.Range(0, messageCount))
    {
        _ = await fixture.PublishAndGetAckAsync("eofc.x", $"msg-{index:D4}");
    }

    // Requesting exactly as many messages as were published.
    var fetched = await fixture.FetchAsync("EOFC", "C1", messageCount);
    fetched.Messages.Count.ShouldBe(messageCount);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerCleanupWithRetentionPolicy — consumer_test.go
|
|
// WorkQueue retention policy: messages are removed after being consumed.
|
|
// Consumer can still be deleted cleanly.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// On a WorkQueue-retention stream, fetches one published message with
/// consumer "WORKER" and then expects the consumer delete request to succeed.
/// </summary>
[Fact]
public async Task WorkQueue_consumer_cleanup_works_with_retention_policy()
{
    // Go: TestJetStreamConsumerCleanupWithRetentionPolicy consumer_test.go
    var streamConfig = new StreamConfig
    {
        Name = "CWRP",
        Subjects = ["cwrp.>"],
        Retention = RetentionPolicy.WorkQueue,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    _ = await fixture.CreateConsumerAsync("CWRP", "WORKER", "cwrp.>");
    _ = await fixture.PublishAndGetAckAsync("cwrp.x", "job1");

    var fetched = await fixture.FetchAsync("CWRP", "WORKER", 5);
    fetched.Messages.Count.ShouldBe(1);

    // Deleting the consumer after it has received work must succeed.
    var deleted = await fixture.RequestLocalAsync("$JS.API.CONSUMER.DELETE.CWRP.WORKER", "{}");
    deleted.Success.ShouldBeTrue();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerPendingBugWithKV — consumer_test.go
|
|
// Pending count returns 0 when no messages have been fetched.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Publishes five messages without fetching any and expects the consumer's
/// pending count to be zero.
/// </summary>
[Fact]
public async Task Pending_count_is_zero_before_fetch()
{
    // Go: TestJetStreamConsumerPendingBugWithKV consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("PCBK", "pcbk.>");
    _ = await fixture.CreateConsumerAsync("PCBK", "C1", "pcbk.>", ackPolicy: AckPolicy.Explicit);

    foreach (var index in Enumerable.Range(0, 5))
    {
        _ = await fixture.PublishAndGetAckAsync("pcbk.x", $"msg-{index}");
    }

    // No fetch has happened, so nothing is awaiting acknowledgement.
    var pendingCount = await fixture.GetPendingCountAsync("PCBK", "C1");
    pendingCount.ShouldBe(0);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerBadCreateErr — consumer_test.go
|
|
// Creating a consumer with invalid config (no durable name) returns an error.
|
|
// =========================================================================
|
|
|
|
/// <summary>
/// Sends a create request with an empty consumer token in the subject and no
/// durable_name in the body, and expects the response not to be a clean
/// success (either an error is set or no consumer info is returned).
/// </summary>
[Fact]
public async Task Consumer_bad_create_returns_error()
{
    // Go: TestJetStreamConsumerBadCreateErr consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("BCE", "bce.>");

    // The subject ends in "." (empty consumer name) and the JSON body carries
    // only a filter subject.
    var response = await fixture.RequestLocalAsync(
        "$JS.API.CONSUMER.CREATE.BCE.",
        """{"filter_subject":"bce.>"}""");

    // A clean success would mean no error together with populated consumer info.
    var cleanSuccess = response.Error == null && response.ConsumerInfo != null;
    cleanSuccess.ShouldBeFalse();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerInternalClientLeak — consumer_test.go
|
|
// Creating and deleting many consumers does not leak handles.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_create_delete_cycle_no_leak()
{
    // Go: TestJetStreamConsumerInternalClientLeak consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("ICL", "icl.>");

    // Ten create/delete rounds, each with its own zero-padded consumer name.
    for (var round = 0; round < 10; round++)
    {
        var consumerName = $"C{round:D3}";

        var created = await fixture.CreateConsumerAsync("ICL", consumerName, "icl.>");
        created.Error.ShouldBeNull();

        var deleteSubject = $"$JS.API.CONSUMER.DELETE.ICL.{consumerName}";
        var deleted = await fixture.RequestLocalAsync(deleteSubject, "{}");
        deleted.Success.ShouldBeTrue();
    }

    // The names listing must be absent or empty once every consumer is gone.
    var listing = await fixture.RequestLocalAsync("$JS.API.CONSUMER.NAMES.ICL", "{}");
    var remaining = listing.ConsumerNames;
    (remaining == null || remaining.Count == 0).ShouldBeTrue();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerEventingRaceOnShutdown — consumer_test.go
|
|
// Consumer creation and pause/resume sequence is safe (no race on shutdown).
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_pause_resume_sequence_is_safe()
{
    // Go: TestJetStreamConsumerEventingRaceOnShutdown consumer_test.go
    const string pauseSubject = "$JS.API.CONSUMER.PAUSE.EROS.C1";

    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("EROS", "eros.>");
    await fixture.CreateConsumerAsync("EROS", "C1", "eros.>");
    await fixture.PublishAndGetAckAsync("eros.x", "msg");

    // Toggle pause on and off five times back to back; every request must succeed.
    var cycle = 0;
    while (cycle < 5)
    {
        var paused = await fixture.RequestLocalAsync(pauseSubject, """{"pause":true}""");
        paused.Success.ShouldBeTrue();

        var resumed = await fixture.RequestLocalAsync(pauseSubject, """{"pause":false}""");
        resumed.Success.ShouldBeTrue();

        cycle++;
    }

    // The single published message is still fetchable afterwards.
    var fetched = await fixture.FetchAsync("EROS", "C1", 5);
    fetched.Messages.Count.ShouldBe(1);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerNoMsgPayload — consumer_test.go
|
|
// A message with empty payload can be published and fetched correctly.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_can_receive_empty_payload_message()
{
    // Go: TestJetStreamConsumerNoMsgPayload consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("NMP", "nmp.>");
    await fixture.CreateConsumerAsync("NMP", "C1", "nmp.>");

    // Publish a message whose payload is the empty string.
    await fixture.PublishAndGetAckAsync("nmp.x", "");

    // Exactly one message comes back and its payload has zero length.
    var fetched = await fixture.FetchAsync("NMP", "C1", 1);
    fetched.Messages.Count.ShouldBe(1);

    var onlyMessage = fetched.Messages[0];
    onlyMessage.Payload.Length.ShouldBe(0);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerUpdateSurvival — consumer_test.go
|
|
// Updating a consumer preserves messages already in the stream.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_update_does_not_lose_messages()
{
    // Go: TestJetStreamConsumerUpdateSurvival consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CUS", "cus.>");

    // Three messages exist in the stream before any consumer does.
    for (var n = 0; n < 3; n++)
    {
        await fixture.PublishAndGetAckAsync("cus.x", $"msg-{n}");
    }

    // First call creates the consumer; the second repeats the identical
    // configuration and acts as an idempotent update.
    await fixture.CreateConsumerAsync("CUS", "C1", "cus.>");
    await fixture.CreateConsumerAsync("CUS", "C1", "cus.>");

    var fetched = await fixture.FetchAsync("CUS", "C1", 10);
    fetched.Messages.Count.ShouldBe(3);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerAckSamplingSpecifiedUsingUpdateConsumer — consumer_test.go
|
|
// Updating a consumer's ack wait (an allowed update) preserves existing messages.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_update_preserves_ack_policy_and_messages()
{
    // Go: TestJetStreamConsumerAckSamplingSpecifiedUsingUpdateConsumer consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CAS", "cas.>");

    await fixture.CreateConsumerAsync("CAS", "C1", "cas.>", ackPolicy: AckPolicy.Explicit);
    await fixture.PublishAndGetAckAsync("cas.x", "msg1");

    // Re-create with the same ack policy plus an explicit ack wait; this is
    // expected to be accepted as an update.
    var updated = await fixture.CreateConsumerAsync(
        "CAS",
        "C1",
        "cas.>",
        ackPolicy: AckPolicy.Explicit,
        ackWaitMs: 10_000);
    updated.Error.ShouldBeNull();

    // The message published before the update is still delivered.
    var fetched = await fixture.FetchAsync("CAS", "C1", 5);
    fetched.Messages.Count.ShouldBe(1);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerStreamUpdate — consumer_test.go
|
|
// Adding subjects to a stream does not break existing consumers.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Stream_update_does_not_break_consumer()
{
    // Go: TestJetStreamConsumerStreamUpdate consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CSU", "csu.a");

    await fixture.CreateConsumerAsync("CSU", "C1", "csu.a");
    await fixture.PublishAndGetAckAsync("csu.a", "before-update");

    // Widen the stream's subject list from csu.a to csu.a + csu.b.
    const string updateSubject = "$JS.API.STREAM.UPDATE.CSU";
    var streamUpdate = await fixture.RequestLocalAsync(
        updateSubject,
        """{"name":"CSU","subjects":["csu.a","csu.b"]}""");
    streamUpdate.Error.ShouldBeNull();

    await fixture.PublishAndGetAckAsync("csu.a", "after-update");

    // Both the pre-update and post-update messages reach the consumer.
    var fetched = await fixture.FetchAsync("CSU", "C1", 10);
    fetched.Messages.Count.ShouldBe(2);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerAndStreamNamesWithPathSeparators — consumer_test.go
|
|
// Stream and consumer names containing hyphens are accepted (dots and slashes are not exercised here).
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_and_stream_names_with_hyphens_are_valid()
{
    // Go: TestJetStreamConsumerAndStreamNamesWithPathSeparators consumer_test.go
    // Creates a hyphenated consumer on a hyphenated stream and checks the
    // durable name echoed back in the create response.
    await using var fx = await JetStreamApiFixture.StartWithStreamAsync("STREAM-ONE", "sone.>");

    var resp = await fx.CreateConsumerAsync("STREAM-ONE", "CONSUMER-ONE", "sone.>");
    resp.Error.ShouldBeNull();

    // Assert presence first so a missing ConsumerInfo fails as an assertion
    // rather than as a NullReferenceException on the dereference below.
    resp.ConsumerInfo.ShouldNotBeNull();
    resp.ConsumerInfo!.Config.DurableName.ShouldBe("CONSUMER-ONE");
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerUpdateFilterSubject — consumer_test.go
|
|
// Updating a consumer's filter subject is reflected in info.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_filter_subject_can_be_updated()
{
    // Go: TestJetStreamConsumerUpdateFilterSubject consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CUFS", "cufs.>");

    // Start with a filter on cufs.a ...
    await fixture.CreateConsumerAsync("CUFS", "C1", "cufs.a");

    // ... then re-create the same consumer filtered on cufs.b.
    var updated = await fixture.CreateConsumerAsync("CUFS", "C1", "cufs.b");
    updated.Error.ShouldBeNull();

    // Consumer info reports the new filter subject.
    var consumerInfo = await fixture.GetConsumerInfoAsync("CUFS", "C1");
    consumerInfo.Config.FilterSubject.ShouldBe("cufs.b");
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerAndStreamMetadata — consumer_test.go
|
|
// Consumer info includes config metadata.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_info_includes_config_metadata()
{
    // Go: TestJetStreamConsumerAndStreamMetadata consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CISM", "cism.>");

    var created = await fixture.CreateConsumerAsync(
        "CISM",
        "META",
        "cism.>",
        ackPolicy: AckPolicy.Explicit,
        maxAckPending: 100);
    created.Error.ShouldBeNull();

    // The stored config echoes back the name, ack policy and max-ack-pending
    // that were supplied at creation.
    var consumerInfo = await fixture.GetConsumerInfoAsync("CISM", "META");
    var storedConfig = consumerInfo.Config;
    storedConfig.DurableName.ShouldBe("META");
    storedConfig.AckPolicy.ShouldBe(AckPolicy.Explicit);
    storedConfig.MaxAckPending.ShouldBe(100);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerDefaultsFromStream — consumer_test.go
|
|
// A consumer created on a stream inherits the stream's subject if no
|
|
// filter is specified (matches all messages on the stream).
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_without_filter_receives_all_stream_messages()
{
    // Go: TestJetStreamConsumerDefaultsFromStream consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CDFS", "cdfs.>");

    // A null filter subject: the consumer is created without any filter.
    await fixture.CreateConsumerAsync("CDFS", "C1", null);

    // Three messages, each on a different subject under cdfs.>.
    await fixture.PublishAndGetAckAsync("cdfs.a", "1");
    await fixture.PublishAndGetAckAsync("cdfs.b", "2");
    await fixture.PublishAndGetAckAsync("cdfs.c", "3");

    var fetched = await fixture.FetchAsync("CDFS", "C1", 10);
    fetched.Messages.Count.ShouldBe(3);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerWithFormattingSymbol — consumer_test.go
|
|
// Consumer durable names can contain underscores and hyphens.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_durable_name_with_special_chars_is_valid()
{
    // Go: TestJetStreamConsumerWithFormattingSymbol consumer_test.go
    // Creates a consumer whose durable name mixes an underscore and a hyphen
    // and checks the name echoed back in the create response.
    await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CWFS", "cwfs.>");

    var resp = await fx.CreateConsumerAsync("CWFS", "my_consumer-v2", "cwfs.>");
    resp.Error.ShouldBeNull();

    // Assert presence first so a missing ConsumerInfo fails as an assertion
    // rather than as a NullReferenceException on the dereference below.
    resp.ConsumerInfo.ShouldNotBeNull();
    resp.ConsumerInfo!.Config.DurableName.ShouldBe("my_consumer-v2");
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerWithCorruptStateIsDeleted — consumer_test.go
|
|
// A ConsumerFileStore with corrupt data initialises to empty state
|
|
// instead of crashing.
|
|
// =========================================================================
|
|
|
|
[Fact]
public void ConsumerFileStore_with_corrupt_state_initializes_empty()
{
    // Go: TestJetStreamConsumerWithCorruptStateIsDeleted consumer_test.go
    // Seeds a state file with bytes that are not a valid encoded state, opens
    // a ConsumerFileStore on it, and expects a zeroed delivered sequence.
    var dir = Path.Combine(Path.GetTempPath(), $"nats-corrupt-{Guid.NewGuid():N}");
    Directory.CreateDirectory(dir);
    var stateFile = Path.Combine(dir, "state.bin");

    try
    {
        // Write corrupt data
        File.WriteAllBytes(stateFile, [0xFF, 0xFE, 0x00, 0x01, 0x02, 0x03]);

        var cfg = new ConsumerConfig
        {
            DurableName = "C",
            AckPolicy = AckPolicy.Explicit,
        };

        // Should not throw — corrupt state means empty state
        var store = new ConsumerFileStore(stateFile, cfg);
        var state = StopAfter(store, s => s.State());

        // Corrupt state → empty/default state
        state.Delivered.Consumer.ShouldBe(0UL);
    }
    finally
    {
        if (Directory.Exists(dir))
        {
            Directory.Delete(dir, recursive: true);
        }
    }

    // Runs `read` against the store and always stops the store afterwards,
    // even when `read` throws, so a failing test does not leave the store
    // running while the temp directory is being deleted.
    static T StopAfter<T>(ConsumerFileStore target, Func<ConsumerFileStore, T> read)
    {
        try
        {
            return read(target);
        }
        finally
        {
            target.Stop();
        }
    }
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange — consumer_test.go
|
|
// A consumer that is unregistered from the manager truly returns not-found.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_not_found_after_delete()
{
    // Go: TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("NDSL", "ndsl.>");
    await fixture.CreateConsumerAsync("NDSL", "C1", "ndsl.>");

    // Delete the consumer through the API and confirm the delete succeeded.
    var deleted = await fixture.RequestLocalAsync("$JS.API.CONSUMER.DELETE.NDSL.C1", "{}");
    deleted.Success.ShouldBeTrue();

    // A subsequent info request for the same consumer must carry an error.
    var lookup = await fixture.RequestLocalAsync("$JS.API.CONSUMER.INFO.NDSL.C1", "{}");
    lookup.Error.ShouldNotBeNull();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerResetToSequenceConstraintOnStartSeq — consumer_test.go
|
|
// DeliverPolicy.ByStartSequence delivers from the specified sequence onward.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_reset_to_sequence_start_seq()
{
    // Go: TestJetStreamConsumerResetToSequenceConstraintOnStartSeq consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CRSS", "crss.>");

    // Publish msg-1 .. msg-5.
    var number = 1;
    while (number <= 5)
    {
        await fixture.PublishAndGetAckAsync("crss.x", $"msg-{number}");
        number++;
    }

    // Create a durable that starts delivering at stream sequence 3.
    const string createSubject = "$JS.API.CONSUMER.CREATE.CRSS.C1";
    var created = await fixture.RequestLocalAsync(
        createSubject,
        """{"durable_name":"C1","filter_subject":"crss.>","deliver_policy":"by_start_sequence","opt_start_seq":3}""");
    created.Error.ShouldBeNull();

    // Three messages are delivered, the first of which has sequence 3.
    var fetched = await fixture.FetchAsync("CRSS", "C1", 10);
    fetched.Messages.Count.ShouldBe(3);
    fetched.Messages[0].Sequence.ShouldBe(3UL);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerResetToSequenceConstraintOnStartTime — consumer_test.go
|
|
// DeliverPolicy.ByStartTime delivers from messages at or after the start time.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_reset_to_sequence_start_time()
{
    // Go: TestJetStreamConsumerResetToSequenceConstraintOnStartTime consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("CRST", "crst.>");

    await fixture.PublishAndGetAckAsync("crst.x", "before");
    await fixture.PublishAndGetAckAsync("crst.x", "before-2");

    // Round-trip ("O") timestamp one second ahead of the current UTC time.
    var cutoff = DateTime.UtcNow.AddSeconds(1);
    var startTime = cutoff.ToString("O");

    // Short pause before the third publish.
    // NOTE(review): startTime is one second in the future, so this message is
    // still published before startTime despite its payload — confirm intent.
    await Task.Delay(10);
    await fixture.PublishAndGetAckAsync("crst.x", "after");

    var createBody =
        $"{{\"durable_name\":\"C1\",\"filter_subject\":\"crst.>\",\"deliver_policy\":\"by_start_time\",\"opt_start_time_utc\":\"{startTime}\"}}";
    var created = await fixture.RequestLocalAsync("$JS.API.CONSUMER.CREATE.CRST.C1", createBody);
    created.Error.ShouldBeNull();

    // Only the stored deliver policy is verified; no delivery assertions are
    // made because by-start-time resolution is approximate.
    var consumerInfo = await fixture.GetConsumerInfoAsync("CRST", "C1");
    consumerInfo.Config.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartTime);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerLegacyDurableCreateSetsConsumerName — consumer_test.go
|
|
// Consumer name is populated from durable_name field.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Legacy_durable_create_sets_consumer_name()
{
    // Go: TestJetStreamConsumerLegacyDurableCreateSetsConsumerName consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("LDCS", "ldcs.>");

    // Create via the raw API subject, naming the durable in the JSON body.
    const string createSubject = "$JS.API.CONSUMER.CREATE.LDCS.LEGACY";
    var created = await fixture.RequestLocalAsync(
        createSubject,
        """{"durable_name":"LEGACY","filter_subject":"ldcs.>"}""");

    // The response is error-free and reports the durable name from the body.
    created.Error.ShouldBeNull();
    created.ConsumerInfo.ShouldNotBeNull();
    created.ConsumerInfo!.Config.DurableName.ShouldBe("LEGACY");
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamConsumerNotInactiveDuringAckWait — consumer_test.go
|
|
// A consumer with pending acks is considered active (not inactive/expired).
|
|
// After acking all pending messages, pending count returns to 0.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Consumer_not_inactive_during_ack_wait()
{
    // Go: TestJetStreamConsumerNotInactiveDuringAckWait consumer_test.go
    await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("NIDAW", "nidaw.>");
    await fixture.CreateConsumerAsync(
        "NIDAW",
        "C1",
        "nidaw.>",
        ackPolicy: AckPolicy.Explicit,
        ackWaitMs: 30_000);

    await fixture.PublishAndGetAckAsync("nidaw.x", "msg");

    var fetched = await fixture.FetchAsync("NIDAW", "C1", 1);
    fetched.Messages.Count.ShouldBe(1);

    // The fetched-but-unacked message counts as one pending ack.
    var pendingBeforeAck = await fixture.GetPendingCountAsync("NIDAW", "C1");
    pendingBeforeAck.ShouldBe(1);

    // Acking up to that message's sequence brings the pending count back to zero.
    await fixture.AckAllAsync("NIDAW", "C1", fetched.Messages[0].Sequence);
    var pendingAfterAck = await fixture.GetPendingCountAsync("NIDAW", "C1");
    pendingAfterAck.ShouldBe(0);
}
|
|
|
|
// =========================================================================
|
|
// PullRequestWaitQueue — priority group round-robin within same priority
|
|
// =========================================================================
|
|
|
|
[Fact]
public void Pull_request_queue_round_robins_within_same_priority()
{
    // Go: TestJetStreamConsumerPriorityPullRequests — pop-and-requeue within priority
    var waitQueue = new PullRequestWaitQueue();

    // Two requests at the same priority, each asking for a batch of three.
    waitQueue.Enqueue(SamePriorityRequest("reply-A"));
    waitQueue.Enqueue(SamePriorityRequest("reply-B"));

    // The first pop yields A, with its remaining batch reduced from 3 to 2.
    var poppedFirst = waitQueue.PopAndRequeue();
    poppedFirst.ShouldNotBeNull();
    poppedFirst!.Reply.ShouldBe("reply-A");
    poppedFirst.RemainingBatch.ShouldBe(2);

    // The next pop yields B, showing A was requeued behind it.
    var poppedSecond = waitQueue.PopAndRequeue();
    poppedSecond.ShouldNotBeNull();
    poppedSecond!.Reply.ShouldBe("reply-B");

    // Builds a priority-1 request for three messages with the given reply subject.
    static PullWaitingRequest SamePriorityRequest(string reply) =>
        new PullWaitingRequest { Priority = 1, Batch = 3, RemainingBatch = 3, Reply = reply };
}
|
|
|
|
// =========================================================================
|
|
// RedeliveryTracker — due entries returned after deadline passes
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task Redelivery_tracker_returns_due_entries_after_deadline()
{
    // Go: TestJetStreamConsumerMaxDeliveryAndServerRestart — redelivery mechanics
    var redeliveries = new RedeliveryTracker(backoffMs: [50]);
    redeliveries.Schedule(10, 1, ackWaitMs: 50);

    // Immediately after scheduling, nothing is reported as due.
    redeliveries.GetDue().Count.ShouldBe(0);

    // Sleep for 100 ms, twice the 50 ms passed to Schedule above.
    await Task.Delay(100);

    // Sequence 10 is now the only due entry.
    var dueNow = redeliveries.GetDue();
    dueNow.Count.ShouldBe(1);
    dueNow[0].ShouldBe(10UL);

    // Once acknowledged, the tracker no longer tracks the sequence.
    redeliveries.Acknowledge(10);
    redeliveries.IsTracking(10).ShouldBeFalse();
}
|
|
|
|
// =========================================================================
|
|
// ConsumerFileStore — encode/decode round-trip
|
|
// =========================================================================
|
|
|
|
[Fact]
public void ConsumerFileStore_state_encode_decode_roundtrip()
{
    // Go: TestJetStreamConsumerWithCorruptStateIsDeleted + state persistence tests
    // Records two deliveries through one store, stops it, then opens a second
    // store on the same file and expects the delivered sequences to be 2/2.
    var dir = Path.Combine(Path.GetTempPath(), $"nats-rt-{Guid.NewGuid():N}");
    Directory.CreateDirectory(dir);
    var stateFile = Path.Combine(dir, "state.bin");

    try
    {
        var cfg = new ConsumerConfig
        {
            DurableName = "RT",
            AckPolicy = AckPolicy.Explicit,
        };

        var store = new ConsumerFileStore(stateFile, cfg);
        try
        {
            // Millisecond-resolution wall clock scaled to nanoseconds.
            long tsNs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000;
            store.UpdateDelivered(1, 1, 1, tsNs);
            store.UpdateDelivered(2, 2, 1, tsNs + 1);
        }
        finally
        {
            // Always stop the writer, even if an update throws, so a failing
            // test does not leave it running while the directory is deleted.
            store.Stop();
        }

        var store2 = new ConsumerFileStore(stateFile, cfg);
        var state = StopAfter(store2, s => s.State());

        state.Delivered.Consumer.ShouldBe(2UL);
        state.Delivered.Stream.ShouldBe(2UL);
    }
    finally
    {
        if (Directory.Exists(dir))
        {
            Directory.Delete(dir, recursive: true);
        }
    }

    // Runs `read` against the store and always stops the store afterwards,
    // even when `read` throws.
    static T StopAfter<T>(ConsumerFileStore target, Func<ConsumerFileStore, T> read)
    {
        try
        {
            return read(target);
        }
        finally
        {
            target.Stop();
        }
    }
}
|
|
}
|