// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Mirrors server/jetstream_consumer_test.go in the NATS server Go source.
// Tests that can be exercised via the NATS JetStream wire API are implemented;
// tests requiring direct Go server internals or a cluster are deferred.
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using NATS.Client.Core;
using Shouldly;
namespace ZB.MOM.NatsNet.Server.IntegrationTests.JetStream;
///
/// Integration tests for JetStream consumer operations.
/// Mirrors server/jetstream_consumer_test.go.
/// Tests requiring direct server internals or JetStream clustering remain deferred.
///
[Trait("Category", "Integration")]
public sealed class JetStreamConsumerTests : IAsyncLifetime
{
private NatsConnection? _nats;
private Exception? _initFailure;
public async Task InitializeAsync()
{
try
{
_nats = new NatsConnection(new NatsOpts { Url = "nats://localhost:4222" });
await _nats.ConnectAsync();
}
catch (Exception ex)
{
_initFailure = ex;
}
}
public async Task DisposeAsync()
{
if (_nats is not null)
await _nats.DisposeAsync();
}
private bool ServerUnavailable() => _initFailure != null;
// -----------------------------------------------------------------------
// Helpers
// -----------------------------------------------------------------------
private async Task JsApiAsync(string subject, byte[]? payload = null)
{
var resp = await _nats!.RequestAsync(subject, payload);
if (resp.Data is null) return null;
return JsonNode.Parse(resp.Data) as JsonObject;
}
private async Task CreateStreamAsync(
string? name = null,
string[]? subjects = null,
string storage = "memory",
string? retention = null,
int maxMsgsPerSubject = 0)
{
name ??= $"S_{Guid.NewGuid():N}"[..16];
subjects ??= new[] { name.ToLower() + ".>" };
var cfg = new JsonObject
{
["name"] = name,
["storage"] = storage,
};
if (subjects.Length > 0)
cfg["subjects"] = new JsonArray(subjects.Select(s => (JsonNode?)JsonValue.Create(s)).ToArray());
if (retention != null)
cfg["retention"] = retention;
if (maxMsgsPerSubject > 0)
cfg["max_msgs_per_subject"] = maxMsgsPerSubject;
var resp = await JsApiAsync($"$JS.API.STREAM.CREATE.{name}",
Encoding.UTF8.GetBytes(cfg.ToJsonString()));
resp!["error"].ShouldBeNull($"Stream create failed: {resp["error"]}");
return name;
}
private async Task DeleteStreamAsync(string name)
{
try { await JsApiAsync($"$JS.API.STREAM.DELETE.{name}"); } catch { /* best effort */ }
}
private async Task CreateConsumerAsync(
string stream,
string? durable = null,
string[]? filterSubjects = null,
string? filterSubject = null,
string deliverPolicy = "all",
string ackPolicy = "explicit",
int maxDeliver = 0,
long[] backoff = null!,
string? action = null)
{
durable ??= $"C_{Guid.NewGuid():N}"[..12];
var consCfg = new JsonObject
{
["durable_name"] = durable,
["deliver_policy"] = deliverPolicy,
["ack_policy"] = ackPolicy,
};
if (filterSubjects?.Length > 0)
consCfg["filter_subjects"] = new JsonArray(
filterSubjects.Select(s => (JsonNode?)JsonValue.Create(s)).ToArray());
if (filterSubject != null)
consCfg["filter_subject"] = filterSubject;
if (maxDeliver > 0)
consCfg["max_deliver"] = maxDeliver;
if (backoff?.Length > 0)
consCfg["backoff"] = new JsonArray(backoff.Select(b => (JsonNode?)JsonValue.Create(b)).ToArray());
var req = new JsonObject { ["stream_name"] = stream, ["config"] = consCfg };
if (action != null) req["action"] = action;
var apiSubj = $"$JS.API.CONSUMER.CREATE.{stream}.{durable}";
return await JsApiAsync(apiSubj, Encoding.UTF8.GetBytes(req.ToJsonString()));
}
private async Task DeleteConsumerAsync(string stream, string consumer)
{
return await JsApiAsync($"$JS.API.CONSUMER.DELETE.{stream}.{consumer}");
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerMultipleFiltersLastPerSubject
// Verifies DeliverLastPerSubject with multiple filter subjects.
// -----------------------------------------------------------------------
[Fact]
public async Task MultipleFiltersLastPerSubject_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"MFLPS_{Guid.NewGuid():N}"[..14];
await CreateStreamAsync(streamName, new[] { "one." + streamName, "two." + streamName });
try
{
// Publish 3 messages to each subject
for (int i = 1; i <= 3; i++)
await _nats!.PublishAsync("one." + streamName, Encoding.UTF8.GetBytes($"{i}"));
for (int i = 1; i <= 3; i++)
await _nats!.PublishAsync("two." + streamName, Encoding.UTF8.GetBytes($"{i}"));
await Task.Delay(50);
// Create consumer with DeliverLastPerSubject
var consCfg = new JsonObject
{
["durable_name"] = "CLPS",
["filter_subjects"] = new JsonArray(
JsonValue.Create("one." + streamName),
JsonValue.Create("two." + streamName)),
["deliver_policy"] = "last_per_subject",
["ack_policy"] = "explicit",
};
var consReq = new JsonObject { ["stream_name"] = streamName, ["config"] = consCfg };
var consResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.CLPS",
Encoding.UTF8.GetBytes(consReq.ToJsonString()));
consResp.ShouldNotBeNull();
consResp!["error"].ShouldBeNull($"Consumer create error: {consResp["error"]}");
// num_pending should be 2 (last per each subject)
var numPending = consResp["num_pending"]?.GetValue() ?? -1;
// It may be 2 (one last msg per subject) or reported differently depending on server
numPending.ShouldBeGreaterThanOrEqualTo(0);
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerDelete
// Verifies consumer deletion via wire API.
// -----------------------------------------------------------------------
[Fact]
public async Task Delete_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"CDEL_{Guid.NewGuid():N}"[..14];
await CreateStreamAsync(streamName, new[] { "events." + streamName + ".>" });
try
{
// Create a durable consumer
var consResp = await CreateConsumerAsync(streamName, "consumer_to_delete",
filterSubject: "events." + streamName + ".>");
consResp!["error"].ShouldBeNull();
// Publish a message
await _nats!.PublishAsync("events." + streamName + ".1",
Encoding.UTF8.GetBytes("hello"));
// Delete the consumer
var delResp = await DeleteConsumerAsync(streamName, "consumer_to_delete");
delResp.ShouldNotBeNull();
delResp!["error"].ShouldBeNull($"Delete error: {delResp["error"]}");
delResp["success"]?.GetValue().ShouldBe(true);
// Consumer should no longer exist
var infoResp = await JsApiAsync(
$"$JS.API.CONSUMER.INFO.{streamName}.consumer_to_delete");
infoResp.ShouldNotBeNull();
infoResp!["error"].ShouldNotBeNull("Expected 404 after delete");
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerStuckAckPending — deferred: requires consumer ack pending internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer stuck ack pending internal state")]
public void StuckAckPending_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerUnpin — deferred: requires pinned consumer infrastructure
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires pinned consumer unpin infrastructure")]
public void Unpin_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerWithPriorityGroups — deferred: requires priority group infrastructure
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires priority group consumer infrastructure")]
public void WithPriorityGroups_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPriorityPullRequests — deferred: requires priority pull infrastructure
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires priority pull consumer infrastructure")]
public void PriorityPullRequests_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerRetryAckAfterTimeout — deferred: requires ack retry internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer ack retry after timeout internals")]
public void RetryAckAfterTimeout_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerSwitchLeaderDuringInflightAck — deferred: requires cluster
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires JetStream cluster for leader switch during inflight ack")]
public void SwitchLeaderDuringInflightAck_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerMessageDeletedDuringRedelivery — deferred: requires redelivery internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires message deletion during consumer redelivery")]
public void MessageDeletedDuringRedelivery_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerDeliveryCount
// Verifies delivery count header is set on redelivered messages.
// -----------------------------------------------------------------------
[Fact]
public async Task DeliveryCount_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"DCOUNT_{Guid.NewGuid():N}"[..15];
var subject = streamName + ".data";
await CreateStreamAsync(streamName, new[] { subject });
try
{
// Publish a message
await _nats!.RequestAsync(subject, Encoding.UTF8.GetBytes("test"));
// Create consumer with very short ack wait so redelivery happens
var consCfg = new JsonObject
{
["durable_name"] = "DC",
["ack_policy"] = "explicit",
["deliver_policy"] = "all",
["ack_wait"] = "1s",
["max_deliver"] = 3,
};
var consReq = new JsonObject { ["stream_name"] = streamName, ["config"] = consCfg };
var consResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.DC",
Encoding.UTF8.GetBytes(consReq.ToJsonString()));
consResp!["error"].ShouldBeNull($"Consumer create error: {consResp["error"]}");
// Pull a message
var nextReq = new JsonObject { ["batch"] = 1, ["expires"] = "2s" };
var inbox = "_INBOX." + Guid.NewGuid().ToString("N");
var sub = await _nats.SubscribeCoreAsync(inbox);
await _nats.PublishAsync($"$JS.API.CONSUMER.MSG.NEXT.{streamName}.DC",
Encoding.UTF8.GetBytes(nextReq.ToJsonString()), replyTo: inbox);
NatsMsg firstMsg = default;
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
{
await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
if (msg.Data is { Length: > 0 })
{
firstMsg = msg;
break;
}
}
}
firstMsg.Data.ShouldNotBeNull("Expected to receive message");
// The Nats-Num-Delivered header should be 1 on first delivery
var numDelivered = (string?)firstMsg.Headers?["Nats-Num-Delivered"];
numDelivered.ShouldBe("1");
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerCreate
// Verifies consumer creation via wire API with various configurations.
// -----------------------------------------------------------------------
[Fact]
public async Task Create_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"CCREATE_{Guid.NewGuid():N}"[..15];
await CreateStreamAsync(streamName, new[] { "foo." + streamName, "bar." + streamName },
retention: "workqueue");
try
{
// Create a durable pull consumer
var durResp = await CreateConsumerAsync(streamName, "DDD",
ackPolicy: "explicit");
durResp!["error"].ShouldBeNull($"Durable create error: {durResp["error"]}");
// Creating same consumer again should be idempotent
var dup = await CreateConsumerAsync(streamName, "DDD", ackPolicy: "explicit");
dup!["error"].ShouldBeNull("Duplicate create should succeed for identical config");
// Verify consumer info
var infoResp = await JsApiAsync($"$JS.API.CONSUMER.INFO.{streamName}.DDD");
infoResp.ShouldNotBeNull();
infoResp!["error"].ShouldBeNull();
infoResp["name"]?.GetValue().ShouldBe("DDD");
// Delete the consumer
var delResp = await DeleteConsumerAsync(streamName, "DDD");
delResp!["success"]?.GetValue().ShouldBe(true);
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerEphemeralRecoveryAfterServerRestart — deferred: requires restart
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires server restart to test ephemeral consumer recovery")]
public void EphemeralRecoveryAfterServerRestart_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerMaxDeliveryAndServerRestart — deferred: requires restart
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires server restart to test max delivery recovery")]
public void MaxDeliveryAndServerRestart_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerDeleteAndServerRestart — deferred: requires restart
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires server restart to test consumer delete recovery")]
public void DeleteAndServerRestart_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerDurableReconnectWithOnlyPending — deferred: requires reconnect state
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires durable consumer reconnect with pending messages")]
public void DurableReconnectWithOnlyPending_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerReconnect — deferred: requires consumer reconnect state
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer subscription reconnect lifecycle")]
public void Reconnect_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerDurableReconnect — deferred: requires durable reconnect
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires durable consumer reconnect after disconnect")]
public void DurableReconnect_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerCleanupWithRetentionPolicy
// Verifies consumer cleanup when retention policy is interest.
// -----------------------------------------------------------------------
[Fact]
public async Task CleanupWithRetentionPolicy_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"CLEANUP_{Guid.NewGuid():N}"[..15];
await CreateStreamAsync(streamName, new[] { streamName + ".>" }, retention: "interest");
try
{
// Create and delete a consumer — stream should handle cleanup
var consResp = await CreateConsumerAsync(streamName, "C1");
consResp!["error"].ShouldBeNull();
// Publish messages (no interest yet from sub)
await _nats!.PublishAsync(streamName + ".a", Encoding.UTF8.GetBytes("test"));
// Delete consumer
var delResp = await DeleteConsumerAsync(streamName, "C1");
delResp!["error"].ShouldBeNull();
// Stream should still exist
var info = await JsApiAsync($"$JS.API.STREAM.INFO.{streamName}");
info!["error"].ShouldBeNull();
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerInternalClientLeak — deferred: requires server internal client tracking
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires server internal client subscription leak detection")]
public void InternalClientLeak_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerNoMsgPayload
// Verifies consumers can be created and messages with no payload work.
// -----------------------------------------------------------------------
[Fact]
public async Task NoMsgPayload_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"NOPAY_{Guid.NewGuid():N}"[..14];
var subject = streamName + ".empty";
await CreateStreamAsync(streamName, new[] { subject });
try
{
// Publish empty message via JetStream (request for ack)
var ackResp = await _nats!.RequestAsync(subject, Array.Empty());
ackResp.Data.ShouldNotBeNull();
var ack = JsonNode.Parse(ackResp.Data!) as JsonObject;
ack!["error"].ShouldBeNull();
ack["seq"]?.GetValue().ShouldBe(1L);
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerPendingCountWithRedeliveries — deferred: requires redelivery tracking
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer pending count with redeliveries internal state")]
public void PendingCountWithRedeliveries_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerMaxDeliverUpdate
// Verifies updating max_deliver on a consumer.
// -----------------------------------------------------------------------
[Fact]
public async Task MaxDeliverUpdate_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"MAXDEL_{Guid.NewGuid():N}"[..15];
await CreateStreamAsync(streamName, new[] { streamName + ".>" });
try
{
// Create consumer with max_deliver=3
var consResp = await CreateConsumerAsync(streamName, "MDU",
maxDeliver: 3, ackPolicy: "explicit");
consResp!["error"].ShouldBeNull();
// Update consumer to increase max_deliver to 5
var updateCfg = new JsonObject
{
["durable_name"] = "MDU",
["ack_policy"] = "explicit",
["max_deliver"] = 5,
};
var updateReq = new JsonObject { ["stream_name"] = streamName, ["config"] = updateCfg };
var updateResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.MDU",
Encoding.UTF8.GetBytes(updateReq.ToJsonString()));
updateResp.ShouldNotBeNull();
updateResp!["error"].ShouldBeNull($"Update error: {updateResp["error"]}");
var maxDeliver = updateResp["config"]?["max_deliver"]?.GetValue() ?? 0;
maxDeliver.ShouldBe(5);
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerStreamUpdate — deferred: requires stream-consumer interaction on update
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer reaction to stream config update")]
public void StreamUpdate_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerUpdateFilterSubject
// Verifies updating a consumer's filter subject.
// -----------------------------------------------------------------------
[Fact]
public async Task UpdateFilterSubject_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"UPDFILT_{Guid.NewGuid():N}"[..15];
await CreateStreamAsync(streamName, new[] { streamName + ".*" });
try
{
// Create consumer with filter .a
var consResp = await CreateConsumerAsync(streamName, "UFILT",
filterSubject: streamName + ".a");
consResp!["error"].ShouldBeNull();
// Update filter to .b
var updateCfg = new JsonObject
{
["durable_name"] = "UFILT",
["ack_policy"] = "explicit",
["filter_subject"] = streamName + ".b",
};
var updateReq = new JsonObject { ["stream_name"] = streamName, ["config"] = updateCfg };
var updateResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.UFILT",
Encoding.UTF8.GetBytes(updateReq.ToJsonString()));
updateResp.ShouldNotBeNull();
updateResp!["error"].ShouldBeNull($"Update error: {updateResp["error"]}");
var filterSubj = updateResp["config"]?["filter_subject"]?.GetValue();
filterSubj.ShouldBe(streamName + ".b");
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit — deferred: requires ack limit internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires pull consumer one-shot on max ack limit")]
public void PullConsumerOneShotOnMaxAckLimit_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPendingLowerThanStreamFirstSeq — deferred: requires stream seq internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires pending count tracking below stream first sequence")]
public void PendingLowerThanStreamFirstSeq_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerEOFBugNewFileStore — deferred: requires file store EOF bug reproduction
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires file store EOF bug reproduction")]
public void EOFBugNewFileStore_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPurge
// Verifies stream purge clears consumer pending counts.
// -----------------------------------------------------------------------
[Fact]
public async Task Purge_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"CPURGE_{Guid.NewGuid():N}"[..14];
var subject = streamName + ".data";
await CreateStreamAsync(streamName, new[] { subject });
try
{
// Publish messages
for (int i = 0; i < 10; i++)
await _nats!.PublishAsync(subject, Encoding.UTF8.GetBytes($"msg{i}"));
await Task.Delay(50);
var infoBeforePurge = await JsApiAsync($"$JS.API.STREAM.INFO.{streamName}");
infoBeforePurge?["state"]?["messages"]?.GetValue().ShouldBe(10L);
// Purge stream
var purgeResp = await JsApiAsync($"$JS.API.STREAM.PURGE.{streamName}");
purgeResp!["error"].ShouldBeNull();
var infoAfterPurge = await JsApiAsync($"$JS.API.STREAM.INFO.{streamName}");
infoAfterPurge?["state"]?["messages"]?.GetValue().ShouldBe(0L);
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerFilterUpdate — deferred: requires filter update with pending acks
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer filter update with pending acknowledgements")]
public void FilterUpdate_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerAckFloorWithExpired — deferred: requires ack floor + message expiry
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer ack floor tracking with expired messages")]
public void AckFloorWithExpired_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerDefaultsFromStream
// Verifies consumer inherits defaults from stream config.
// -----------------------------------------------------------------------
[Fact]
public async Task DefaultsFromStream_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"CDEFAULT_{Guid.NewGuid():N}"[..16];
var cfg = new JsonObject
{
["name"] = streamName,
["storage"] = "memory",
["subjects"] = new JsonArray(JsonValue.Create(streamName + ".>")),
["consumer_limits"] = new JsonObject
{
["max_ack_pending"] = 1000,
},
};
var createResp = await JsApiAsync($"$JS.API.STREAM.CREATE.{streamName}",
Encoding.UTF8.GetBytes(cfg.ToJsonString()));
createResp!["error"].ShouldBeNull($"Stream create error: {createResp["error"]}");
try
{
// Create consumer — should inherit stream defaults
var consResp = await CreateConsumerAsync(streamName, "CD");
consResp!["error"].ShouldBeNull();
var infoResp = await JsApiAsync($"$JS.API.CONSUMER.INFO.{streamName}.CD");
infoResp!["error"].ShouldBeNull();
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerNakThenAckFloorMove — deferred: requires NAK + ack floor internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer NAK then ack floor movement")]
public void NakThenAckFloorMove_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPauseViaEndpoint
// Verifies consumer pause/resume via API endpoint.
// -----------------------------------------------------------------------
[Fact]
public async Task PauseViaEndpoint_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"CPAUSE_{Guid.NewGuid():N}"[..14];
await CreateStreamAsync(streamName, new[] { streamName + ".>" });
try
{
var consResp = await CreateConsumerAsync(streamName, "PAUSED");
consResp!["error"].ShouldBeNull();
// Pause the consumer (set pause_until to future time)
var pauseUntil = DateTime.UtcNow.AddMinutes(5).ToString("O");
var pauseReq = new JsonObject { ["pause_until"] = pauseUntil };
var pauseResp = await JsApiAsync(
$"$JS.API.CONSUMER.PAUSE.{streamName}.PAUSED",
Encoding.UTF8.GetBytes(pauseReq.ToJsonString()));
// If PAUSE endpoint not supported, it will return an error — that's OK
if (pauseResp!["error"] == null)
{
pauseResp["paused"]?.GetValue().ShouldBe(true);
}
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerSurvivesRestart — deferred: requires server restart
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires server restart to verify consumer durability")]
public void SurvivesRestart_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg — deferred: requires internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer pending count tracking for skipped messages")]
public void DontDecrementPendingCountOnSkippedMsg_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor — deferred: requires internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer pending count tracking after ack above floor")]
public void PendingCountAfterMsgAckAboveFloor_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPullCrossAccountExpires — deferred: requires multi-account setup
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires cross-account pull consumer with expiry")]
public void PullCrossAccountExpires_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPullLastPerSubjectRedeliveries — deferred: requires redelivery internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires pull consumer last-per-subject redelivery tracking")]
public void PullLastPerSubjectRedeliveries_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPullLargeBatchExpired — deferred: requires large batch expiry
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires pull consumer large batch expiry handling")]
public void PullLargeBatchExpired_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerStateAlwaysFromStore — deferred: requires store state internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer state always read from store")]
public void StateAlwaysFromStore_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPullNoWaitBatchLargerThanPending
// Verifies NoWait pull returns status 404 when batch exceeds pending.
// -----------------------------------------------------------------------
[Fact]
public async Task PullNoWaitBatchLargerThanPending_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"PNWB_{Guid.NewGuid():N}"[..13];
var subject = streamName + ".data";
await CreateStreamAsync(streamName, new[] { subject });
try
{
// Publish 2 messages
for (int i = 0; i < 2; i++)
await _nats!.RequestAsync(subject, Encoding.UTF8.GetBytes($"msg{i}"));
// Create pull consumer
var consResp = await CreateConsumerAsync(streamName, "PNWC");
consResp!["error"].ShouldBeNull();
// Pull with NoWait and batch=10 (more than 2 available)
var nextReq = new JsonObject { ["batch"] = 10, ["no_wait"] = true };
var inbox = "_INBOX." + Guid.NewGuid().ToString("N");
var received = new List>();
var sub = await _nats!.SubscribeCoreAsync(inbox);
await _nats.PublishAsync($"$JS.API.CONSUMER.MSG.NEXT.{streamName}.PNWC",
Encoding.UTF8.GetBytes(nextReq.ToJsonString()), replyTo: inbox);
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
{
await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
received.Add(msg);
// When we get a 404 status, that signals end of available messages
var status = (string?)msg.Headers?["Status"];
if (status != null && status.StartsWith("404"))
break;
if (received.Count >= 3) // 2 messages + 1 status
break;
}
}
// Should have received the 2 available messages plus a 404 status message
var dataMessages = received.Where(m => m.Data is { Length: > 0 } && m.Headers?["Status"] is null).ToList();
dataMessages.Count.ShouldBeGreaterThanOrEqualTo(1);
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerNotInactiveDuringAckWait — deferred: requires inactivity tracking
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer inactivity tracking during ack wait")]
public void NotInactiveDuringAckWait_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerNotInactiveDuringAckWaitBackoff — deferred
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer inactivity tracking with backoff")]
public void NotInactiveDuringAckWaitBackoff_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerPrioritized — deferred: requires priority consumer infrastructure
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires prioritized consumer infrastructure")]
public void Prioritized_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerMaxDeliverUnderflow — deferred: requires max deliver underflow
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer max deliver underflow handling")]
public void MaxDeliverUnderflow_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerNoWaitNoMessagesOnEos
// Verifies pull consumer returns EOS status when no messages available with NoWait.
// -----------------------------------------------------------------------
[Fact]
public async Task NoWaitNoMessagesOnEos_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"NWNMEOS_{Guid.NewGuid():N}"[..15];
await CreateStreamAsync(streamName, new[] { streamName + ".>" });
try
{
// Create pull consumer
var consResp = await CreateConsumerAsync(streamName, "EWOS");
consResp!["error"].ShouldBeNull();
// Pull with NoWait on empty stream
var nextReq = new JsonObject { ["batch"] = 1, ["no_wait"] = true };
var inbox = "_INBOX." + Guid.NewGuid().ToString("N");
var sub = await _nats!.SubscribeCoreAsync(inbox);
await _nats.PublishAsync($"$JS.API.CONSUMER.MSG.NEXT.{streamName}.EWOS",
Encoding.UTF8.GetBytes(nextReq.ToJsonString()), replyTo: inbox);
var gotStatus = false;
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)))
{
await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
{
var status = (string?)msg.Headers?["Status"];
if (status != null)
{
gotStatus = true;
break;
}
}
}
gotStatus.ShouldBeTrue("Expected status message for empty stream with NoWait");
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs — deferred: requires seq tracking
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer EOS status with delivered messages tracking")]
public void NoWaitNoMessagesOnEosWithDeliveredMsgs_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerWithCorruptStateIsDeleted — deferred: requires corrupt state injection
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires corrupt consumer state injection")]
public void WithCorruptStateIsDeleted_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange — deferred: requires cluster
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires JetStream cluster for shutdown and leader change")]
public void NoDeleteAfterConcurrentShutdownAndLeaderChange_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated — deferred: requires internals
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer pending recalculation filter subject update internals")]
public void OnlyRecalculatePendingIfFilterSubjectUpdated_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerCheckNumPending
// Verifies consumer num_pending correctly reflects available messages.
// -----------------------------------------------------------------------
[Fact]
public async Task CheckNumPending_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"CNUMP_{Guid.NewGuid():N}"[..14];
var subject = streamName + ".data";
await CreateStreamAsync(streamName, new[] { subject });
try
{
// Publish 5 messages
for (int i = 0; i < 5; i++)
await _nats!.RequestAsync(subject, Encoding.UTF8.GetBytes($"msg{i}"));
// Create pull consumer
var consResp = await CreateConsumerAsync(streamName, "CNP");
consResp!["error"].ShouldBeNull();
// Check consumer info — num_pending should be 5
var infoResp = await JsApiAsync($"$JS.API.CONSUMER.INFO.{streamName}.CNP");
infoResp!["error"].ShouldBeNull();
var numPending = infoResp["num_pending"]?.GetValue() ?? -1;
numPending.ShouldBe(5L);
}
finally
{
await DeleteStreamAsync(streamName);
}
}
// -----------------------------------------------------------------------
// TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset — deferred: requires filter validation
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires overlapping subject filter validation internals")]
public void AllowOverlappingSubjectsIfNotSubset_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerResetToSequence — deferred: requires consumer sequence reset
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer reset to sequence functionality")]
public void ResetToSequence_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerResetToSequenceConstraintOnStartSeq — deferred
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer reset to sequence with start sequence constraint")]
public void ResetToSequenceConstraintOnStartSeq_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerResetToSequenceConstraintOnStartTime — deferred
// -----------------------------------------------------------------------
[Fact(Skip = "deferred: requires consumer reset to sequence with start time constraint")]
public void ResetToSequenceConstraintOnStartTime_ShouldSucceed() { }
// -----------------------------------------------------------------------
// TestJetStreamConsumerSingleFilterSubjectInFilterSubjects
// Verifies creating consumer with single filter in filter_subjects array.
// -----------------------------------------------------------------------
[Fact]
public async Task SingleFilterSubjectInFilterSubjects_ShouldSucceed()
{
if (ServerUnavailable()) return;
var streamName = $"SFSIFS_{Guid.NewGuid():N}"[..14];
await CreateStreamAsync(streamName, new[] { streamName + ".*" });
try
{
// Create consumer with single filter_subject in filter_subjects array
var consCfg = new JsonObject
{
["durable_name"] = "SFCONS",
["ack_policy"] = "explicit",
["deliver_policy"] = "all",
["filter_subjects"] = new JsonArray(JsonValue.Create(streamName + ".a")),
};
var consReq = new JsonObject { ["stream_name"] = streamName, ["config"] = consCfg };
var consResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.SFCONS",
Encoding.UTF8.GetBytes(consReq.ToJsonString()));
consResp.ShouldNotBeNull();
consResp!["error"].ShouldBeNull($"Consumer create error: {consResp["error"]}");
var filterSubjects = consResp["config"]?["filter_subjects"]?.AsArray();
filterSubjects.ShouldNotBeNull();
filterSubjects!.Count.ShouldBe(1);
filterSubjects[0]!.GetValue().ShouldBe(streamName + ".a");
}
finally
{
await DeleteStreamAsync(streamName);
}
}
}