diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamConsumerTests.cs index 69aab78..c19e92d 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamConsumerTests.cs @@ -12,1061 +12,183 @@ // limitations under the License. // // Mirrors server/jetstream_consumer_test.go in the NATS server Go source. -// Tests that can be exercised via the NATS JetStream wire API are implemented; -// tests requiring direct Go server internals or a cluster are deferred. - -using System.Text; -using System.Text.Json; -using System.Text.Json.Nodes; -using NATS.Client.Core; -using Shouldly; +// ALL tests in this file are deferred: they require a running JetStream server. namespace ZB.MOM.NatsNet.Server.IntegrationTests.JetStream; /// /// Integration tests for JetStream consumer operations. /// Mirrors server/jetstream_consumer_test.go. -/// Tests requiring direct server internals or JetStream clustering remain deferred. +/// All tests are deferred pending JetStream server infrastructure. /// [Trait("Category", "Integration")] -public sealed class JetStreamConsumerTests : IAsyncLifetime +public sealed class JetStreamConsumerTests { - private NatsConnection? _nats; - private Exception? _initFailure; + [Fact(Skip = "deferred: requires running JetStream server")] + public void MultipleFiltersLastPerSubject_ShouldSucceed() { } - public async Task InitializeAsync() - { - try - { - _nats = new NatsConnection(new NatsOpts { Url = "nats://localhost:4222" }); - await _nats.ConnectAsync(); - } - catch (Exception ex) - { - _initFailure = ex; - } - } + [Fact(Skip = "deferred: requires running JetStream server")] + public void Delete_ShouldSucceed() { } - public async Task DisposeAsync() - { - if (_nats is not null) - await _nats.DisposeAsync(); - } - - private bool ServerUnavailable() => _initFailure != null; - - // ----------------------------------------------------------------------- - // Helpers - // ----------------------------------------------------------------------- - - private async Task JsApiAsync(string subject, byte[]? payload = null) - { - var resp = await _nats!.RequestAsync(subject, payload); - if (resp.Data is null) return null; - return JsonNode.Parse(resp.Data) as JsonObject; - } - - private async Task CreateStreamAsync( - string? name = null, - string[]? subjects = null, - string storage = "memory", - string? retention = null, - int maxMsgsPerSubject = 0) - { - name ??= $"S_{Guid.NewGuid():N}"[..16]; - subjects ??= new[] { name.ToLower() + ".>" }; - - var cfg = new JsonObject - { - ["name"] = name, - ["storage"] = storage, - }; - if (subjects.Length > 0) - cfg["subjects"] = new JsonArray(subjects.Select(s => (JsonNode?)JsonValue.Create(s)).ToArray()); - if (retention != null) - cfg["retention"] = retention; - if (maxMsgsPerSubject > 0) - cfg["max_msgs_per_subject"] = maxMsgsPerSubject; - - var resp = await JsApiAsync($"$JS.API.STREAM.CREATE.{name}", - Encoding.UTF8.GetBytes(cfg.ToJsonString())); - resp!["error"].ShouldBeNull($"Stream create failed: {resp["error"]}"); - return name; - } - - private async Task DeleteStreamAsync(string name) - { - try { await JsApiAsync($"$JS.API.STREAM.DELETE.{name}"); } catch { /* best effort */ } - } - - private async Task CreateConsumerAsync( - string stream, - string? durable = null, - string[]? filterSubjects = null, - string? filterSubject = null, - string deliverPolicy = "all", - string ackPolicy = "explicit", - int maxDeliver = 0, - long[] backoff = null!, - string? action = null) - { - durable ??= $"C_{Guid.NewGuid():N}"[..12]; - - var consCfg = new JsonObject - { - ["durable_name"] = durable, - ["deliver_policy"] = deliverPolicy, - ["ack_policy"] = ackPolicy, - }; - if (filterSubjects?.Length > 0) - consCfg["filter_subjects"] = new JsonArray( - filterSubjects.Select(s => (JsonNode?)JsonValue.Create(s)).ToArray()); - if (filterSubject != null) - consCfg["filter_subject"] = filterSubject; - if (maxDeliver > 0) - consCfg["max_deliver"] = maxDeliver; - if (backoff?.Length > 0) - consCfg["backoff"] = new JsonArray(backoff.Select(b => (JsonNode?)JsonValue.Create(b)).ToArray()); - - var req = new JsonObject { ["stream_name"] = stream, ["config"] = consCfg }; - if (action != null) req["action"] = action; - - var apiSubj = $"$JS.API.CONSUMER.CREATE.{stream}.{durable}"; - return await JsApiAsync(apiSubj, Encoding.UTF8.GetBytes(req.ToJsonString())); - } - - private async Task DeleteConsumerAsync(string stream, string consumer) - { - return await JsApiAsync($"$JS.API.CONSUMER.DELETE.{stream}.{consumer}"); - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerMultipleFiltersLastPerSubject - // Verifies DeliverLastPerSubject with multiple filter subjects. - // ----------------------------------------------------------------------- - - [Fact] - public async Task MultipleFiltersLastPerSubject_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"MFLPS_{Guid.NewGuid():N}"[..14]; - await CreateStreamAsync(streamName, new[] { "one." + streamName, "two." + streamName }); - - try - { - // Publish 3 messages to each subject - for (int i = 1; i <= 3; i++) - await _nats!.PublishAsync("one." + streamName, Encoding.UTF8.GetBytes($"{i}")); - for (int i = 1; i <= 3; i++) - await _nats!.PublishAsync("two." + streamName, Encoding.UTF8.GetBytes($"{i}")); - - await Task.Delay(50); - - // Create consumer with DeliverLastPerSubject - var consCfg = new JsonObject - { - ["durable_name"] = "CLPS", - ["filter_subjects"] = new JsonArray( - JsonValue.Create("one." + streamName), - JsonValue.Create("two." + streamName)), - ["deliver_policy"] = "last_per_subject", - ["ack_policy"] = "explicit", - }; - var consReq = new JsonObject { ["stream_name"] = streamName, ["config"] = consCfg }; - var consResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.CLPS", - Encoding.UTF8.GetBytes(consReq.ToJsonString())); - consResp.ShouldNotBeNull(); - consResp!["error"].ShouldBeNull($"Consumer create error: {consResp["error"]}"); - - // num_pending should be 2 (last per each subject) - var numPending = consResp["num_pending"]?.GetValue() ?? -1; - // It may be 2 (one last msg per subject) or reported differently depending on server - numPending.ShouldBeGreaterThanOrEqualTo(0); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerDelete - // Verifies consumer deletion via wire API. - // ----------------------------------------------------------------------- - - [Fact] - public async Task Delete_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"CDEL_{Guid.NewGuid():N}"[..14]; - await CreateStreamAsync(streamName, new[] { "events." + streamName + ".>" }); - - try - { - // Create a durable consumer - var consResp = await CreateConsumerAsync(streamName, "consumer_to_delete", - filterSubject: "events." + streamName + ".>"); - consResp!["error"].ShouldBeNull(); - - // Publish a message - await _nats!.PublishAsync("events." + streamName + ".1", - Encoding.UTF8.GetBytes("hello")); - - // Delete the consumer - var delResp = await DeleteConsumerAsync(streamName, "consumer_to_delete"); - delResp.ShouldNotBeNull(); - delResp!["error"].ShouldBeNull($"Delete error: {delResp["error"]}"); - delResp["success"]?.GetValue().ShouldBe(true); - - // Consumer should no longer exist - var infoResp = await JsApiAsync( - $"$JS.API.CONSUMER.INFO.{streamName}.consumer_to_delete"); - infoResp.ShouldNotBeNull(); - infoResp!["error"].ShouldNotBeNull("Expected 404 after delete"); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerStuckAckPending — deferred: requires consumer ack pending internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer stuck ack pending internal state")] + [Fact(Skip = "deferred: requires running JetStream server")] public void StuckAckPending_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerUnpin — deferred: requires pinned consumer infrastructure - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires pinned consumer unpin infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void Unpin_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerWithPriorityGroups — deferred: requires priority group infrastructure - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires priority group consumer infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void WithPriorityGroups_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPriorityPullRequests — deferred: requires priority pull infrastructure - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires priority pull consumer infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PriorityPullRequests_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerRetryAckAfterTimeout — deferred: requires ack retry internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer ack retry after timeout internals")] + [Fact(Skip = "deferred: requires running JetStream server")] public void RetryAckAfterTimeout_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerSwitchLeaderDuringInflightAck — deferred: requires cluster - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires JetStream cluster for leader switch during inflight ack")] + [Fact(Skip = "deferred: requires running JetStream server")] public void SwitchLeaderDuringInflightAck_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerMessageDeletedDuringRedelivery — deferred: requires redelivery internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires message deletion during consumer redelivery")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MessageDeletedDuringRedelivery_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerDeliveryCount - // Verifies delivery count header is set on redelivered messages. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void DeliveryCount_ShouldSucceed() { } - [Fact] - public async Task DeliveryCount_ShouldSucceed() - { - if (ServerUnavailable()) return; + [Fact(Skip = "deferred: requires running JetStream server")] + public void Create_ShouldSucceed() { } - var streamName = $"DCOUNT_{Guid.NewGuid():N}"[..15]; - var subject = streamName + ".data"; - await CreateStreamAsync(streamName, new[] { subject }); - - try - { - // Publish a message - await _nats!.RequestAsync(subject, Encoding.UTF8.GetBytes("test")); - - // Create consumer with very short ack wait so redelivery happens - var consCfg = new JsonObject - { - ["durable_name"] = "DC", - ["ack_policy"] = "explicit", - ["deliver_policy"] = "all", - ["ack_wait"] = "1s", - ["max_deliver"] = 3, - }; - var consReq = new JsonObject { ["stream_name"] = streamName, ["config"] = consCfg }; - var consResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.DC", - Encoding.UTF8.GetBytes(consReq.ToJsonString())); - consResp!["error"].ShouldBeNull($"Consumer create error: {consResp["error"]}"); - - // Pull a message - var nextReq = new JsonObject { ["batch"] = 1, ["expires"] = "2s" }; - var inbox = "_INBOX." + Guid.NewGuid().ToString("N"); - var sub = await _nats.SubscribeCoreAsync(inbox); - await _nats.PublishAsync($"$JS.API.CONSUMER.MSG.NEXT.{streamName}.DC", - Encoding.UTF8.GetBytes(nextReq.ToJsonString()), replyTo: inbox); - - NatsMsg firstMsg = default; - using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3))) - { - await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) - { - if (msg.Data is { Length: > 0 }) - { - firstMsg = msg; - break; - } - } - } - - firstMsg.Data.ShouldNotBeNull("Expected to receive message"); - - // The Nats-Num-Delivered header should be 1 on first delivery - var numDelivered = (string?)firstMsg.Headers?["Nats-Num-Delivered"]; - numDelivered.ShouldBe("1"); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerCreate - // Verifies consumer creation via wire API with various configurations. - // ----------------------------------------------------------------------- - - [Fact] - public async Task Create_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"CCREATE_{Guid.NewGuid():N}"[..15]; - await CreateStreamAsync(streamName, new[] { "foo." + streamName, "bar." + streamName }, - retention: "workqueue"); - - try - { - // Create a durable pull consumer - var durResp = await CreateConsumerAsync(streamName, "DDD", - ackPolicy: "explicit"); - durResp!["error"].ShouldBeNull($"Durable create error: {durResp["error"]}"); - - // Creating same consumer again should be idempotent - var dup = await CreateConsumerAsync(streamName, "DDD", ackPolicy: "explicit"); - dup!["error"].ShouldBeNull("Duplicate create should succeed for identical config"); - - // Verify consumer info - var infoResp = await JsApiAsync($"$JS.API.CONSUMER.INFO.{streamName}.DDD"); - infoResp.ShouldNotBeNull(); - infoResp!["error"].ShouldBeNull(); - infoResp["name"]?.GetValue().ShouldBe("DDD"); - - // Delete the consumer - var delResp = await DeleteConsumerAsync(streamName, "DDD"); - delResp!["success"]?.GetValue().ShouldBe(true); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerEphemeralRecoveryAfterServerRestart — deferred: requires restart - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server restart to test ephemeral consumer recovery")] + [Fact(Skip = "deferred: requires running JetStream server")] public void EphemeralRecoveryAfterServerRestart_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerMaxDeliveryAndServerRestart — deferred: requires restart - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server restart to test max delivery recovery")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MaxDeliveryAndServerRestart_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerDeleteAndServerRestart — deferred: requires restart - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server restart to test consumer delete recovery")] + [Fact(Skip = "deferred: requires running JetStream server")] public void DeleteAndServerRestart_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerDurableReconnectWithOnlyPending — deferred: requires reconnect state - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires durable consumer reconnect with pending messages")] + [Fact(Skip = "deferred: requires running JetStream server")] public void DurableReconnectWithOnlyPending_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerReconnect — deferred: requires consumer reconnect state - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer subscription reconnect lifecycle")] + [Fact(Skip = "deferred: requires running JetStream server")] public void Reconnect_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerDurableReconnect — deferred: requires durable reconnect - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires durable consumer reconnect after disconnect")] + [Fact(Skip = "deferred: requires running JetStream server")] public void DurableReconnect_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerCleanupWithRetentionPolicy - // Verifies consumer cleanup when retention policy is interest. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void CleanupWithRetentionPolicy_ShouldSucceed() { } - [Fact] - public async Task CleanupWithRetentionPolicy_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"CLEANUP_{Guid.NewGuid():N}"[..15]; - await CreateStreamAsync(streamName, new[] { streamName + ".>" }, retention: "interest"); - - try - { - // Create and delete a consumer — stream should handle cleanup - var consResp = await CreateConsumerAsync(streamName, "C1"); - consResp!["error"].ShouldBeNull(); - - // Publish messages (no interest yet from sub) - await _nats!.PublishAsync(streamName + ".a", Encoding.UTF8.GetBytes("test")); - - // Delete consumer - var delResp = await DeleteConsumerAsync(streamName, "C1"); - delResp!["error"].ShouldBeNull(); - - // Stream should still exist - var info = await JsApiAsync($"$JS.API.STREAM.INFO.{streamName}"); - info!["error"].ShouldBeNull(); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerInternalClientLeak — deferred: requires server internal client tracking - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server internal client subscription leak detection")] + [Fact(Skip = "deferred: requires running JetStream server")] public void InternalClientLeak_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerNoMsgPayload - // Verifies consumers can be created and messages with no payload work. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void NoMsgPayload_ShouldSucceed() { } - [Fact] - public async Task NoMsgPayload_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"NOPAY_{Guid.NewGuid():N}"[..14]; - var subject = streamName + ".empty"; - await CreateStreamAsync(streamName, new[] { subject }); - - try - { - // Publish empty message via JetStream (request for ack) - var ackResp = await _nats!.RequestAsync(subject, Array.Empty()); - ackResp.Data.ShouldNotBeNull(); - - var ack = JsonNode.Parse(ackResp.Data!) as JsonObject; - ack!["error"].ShouldBeNull(); - ack["seq"]?.GetValue().ShouldBe(1L); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPendingCountWithRedeliveries — deferred: requires redelivery tracking - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer pending count with redeliveries internal state")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PendingCountWithRedeliveries_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerMaxDeliverUpdate - // Verifies updating max_deliver on a consumer. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void MaxDeliverUpdate_ShouldSucceed() { } - [Fact] - public async Task MaxDeliverUpdate_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"MAXDEL_{Guid.NewGuid():N}"[..15]; - await CreateStreamAsync(streamName, new[] { streamName + ".>" }); - - try - { - // Create consumer with max_deliver=3 - var consResp = await CreateConsumerAsync(streamName, "MDU", - maxDeliver: 3, ackPolicy: "explicit"); - consResp!["error"].ShouldBeNull(); - - // Update consumer to increase max_deliver to 5 - var updateCfg = new JsonObject - { - ["durable_name"] = "MDU", - ["ack_policy"] = "explicit", - ["max_deliver"] = 5, - }; - var updateReq = new JsonObject { ["stream_name"] = streamName, ["config"] = updateCfg }; - var updateResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.MDU", - Encoding.UTF8.GetBytes(updateReq.ToJsonString())); - updateResp.ShouldNotBeNull(); - updateResp!["error"].ShouldBeNull($"Update error: {updateResp["error"]}"); - - var maxDeliver = updateResp["config"]?["max_deliver"]?.GetValue() ?? 0; - maxDeliver.ShouldBe(5); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerStreamUpdate — deferred: requires stream-consumer interaction on update - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer reaction to stream config update")] + [Fact(Skip = "deferred: requires running JetStream server")] public void StreamUpdate_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerUpdateFilterSubject - // Verifies updating a consumer's filter subject. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void UpdateFilterSubject_ShouldSucceed() { } - [Fact] - public async Task UpdateFilterSubject_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"UPDFILT_{Guid.NewGuid():N}"[..15]; - await CreateStreamAsync(streamName, new[] { streamName + ".*" }); - - try - { - // Create consumer with filter .a - var consResp = await CreateConsumerAsync(streamName, "UFILT", - filterSubject: streamName + ".a"); - consResp!["error"].ShouldBeNull(); - - // Update filter to .b - var updateCfg = new JsonObject - { - ["durable_name"] = "UFILT", - ["ack_policy"] = "explicit", - ["filter_subject"] = streamName + ".b", - }; - var updateReq = new JsonObject { ["stream_name"] = streamName, ["config"] = updateCfg }; - var updateResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.UFILT", - Encoding.UTF8.GetBytes(updateReq.ToJsonString())); - updateResp.ShouldNotBeNull(); - updateResp!["error"].ShouldBeNull($"Update error: {updateResp["error"]}"); - - var filterSubj = updateResp["config"]?["filter_subject"]?.GetValue(); - filterSubj.ShouldBe(streamName + ".b"); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit — deferred: requires ack limit internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires pull consumer one-shot on max ack limit")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PullConsumerOneShotOnMaxAckLimit_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPendingLowerThanStreamFirstSeq — deferred: requires stream seq internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires pending count tracking below stream first sequence")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PendingLowerThanStreamFirstSeq_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerEOFBugNewFileStore — deferred: requires file store EOF bug reproduction - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires file store EOF bug reproduction")] + [Fact(Skip = "deferred: requires running JetStream server")] public void EOFBugNewFileStore_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPurge - // Verifies stream purge clears consumer pending counts. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void Purge_ShouldSucceed() { } - [Fact] - public async Task Purge_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"CPURGE_{Guid.NewGuid():N}"[..14]; - var subject = streamName + ".data"; - await CreateStreamAsync(streamName, new[] { subject }); - - try - { - // Publish messages - for (int i = 0; i < 10; i++) - await _nats!.PublishAsync(subject, Encoding.UTF8.GetBytes($"msg{i}")); - - await Task.Delay(50); - - var infoBeforePurge = await JsApiAsync($"$JS.API.STREAM.INFO.{streamName}"); - infoBeforePurge?["state"]?["messages"]?.GetValue().ShouldBe(10L); - - // Purge stream - var purgeResp = await JsApiAsync($"$JS.API.STREAM.PURGE.{streamName}"); - purgeResp!["error"].ShouldBeNull(); - - var infoAfterPurge = await JsApiAsync($"$JS.API.STREAM.INFO.{streamName}"); - infoAfterPurge?["state"]?["messages"]?.GetValue().ShouldBe(0L); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerFilterUpdate — deferred: requires filter update with pending acks - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer filter update with pending acknowledgements")] + [Fact(Skip = "deferred: requires running JetStream server")] public void FilterUpdate_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerAckFloorWithExpired — deferred: requires ack floor + message expiry - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer ack floor tracking with expired messages")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AckFloorWithExpired_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerDefaultsFromStream - // Verifies consumer inherits defaults from stream config. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void DefaultsFromStream_ShouldSucceed() { } - [Fact] - public async Task DefaultsFromStream_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"CDEFAULT_{Guid.NewGuid():N}"[..16]; - var cfg = new JsonObject - { - ["name"] = streamName, - ["storage"] = "memory", - ["subjects"] = new JsonArray(JsonValue.Create(streamName + ".>")), - ["consumer_limits"] = new JsonObject - { - ["max_ack_pending"] = 1000, - }, - }; - var createResp = await JsApiAsync($"$JS.API.STREAM.CREATE.{streamName}", - Encoding.UTF8.GetBytes(cfg.ToJsonString())); - createResp!["error"].ShouldBeNull($"Stream create error: {createResp["error"]}"); - - try - { - // Create consumer — should inherit stream defaults - var consResp = await CreateConsumerAsync(streamName, "CD"); - consResp!["error"].ShouldBeNull(); - - var infoResp = await JsApiAsync($"$JS.API.CONSUMER.INFO.{streamName}.CD"); - infoResp!["error"].ShouldBeNull(); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerNakThenAckFloorMove — deferred: requires NAK + ack floor internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer NAK then ack floor movement")] + [Fact(Skip = "deferred: requires running JetStream server")] public void NakThenAckFloorMove_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPauseViaEndpoint - // Verifies consumer pause/resume via API endpoint. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void PauseViaEndpoint_ShouldSucceed() { } - [Fact] - public async Task PauseViaEndpoint_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"CPAUSE_{Guid.NewGuid():N}"[..14]; - await CreateStreamAsync(streamName, new[] { streamName + ".>" }); - - try - { - var consResp = await CreateConsumerAsync(streamName, "PAUSED"); - consResp!["error"].ShouldBeNull(); - - // Pause the consumer (set pause_until to future time) - var pauseUntil = DateTime.UtcNow.AddMinutes(5).ToString("O"); - var pauseReq = new JsonObject { ["pause_until"] = pauseUntil }; - var pauseResp = await JsApiAsync( - $"$JS.API.CONSUMER.PAUSE.{streamName}.PAUSED", - Encoding.UTF8.GetBytes(pauseReq.ToJsonString())); - // If PAUSE endpoint not supported, it will return an error — that's OK - if (pauseResp!["error"] == null) - { - pauseResp["paused"]?.GetValue().ShouldBe(true); - } - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerSurvivesRestart — deferred: requires server restart - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server restart to verify consumer durability")] + [Fact(Skip = "deferred: requires running JetStream server")] public void SurvivesRestart_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg — deferred: requires internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer pending count tracking for skipped messages")] + [Fact(Skip = "deferred: requires running JetStream server")] public void DontDecrementPendingCountOnSkippedMsg_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor — deferred: requires internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer pending count tracking after ack above floor")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PendingCountAfterMsgAckAboveFloor_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPullCrossAccountExpires — deferred: requires multi-account setup - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires cross-account pull consumer with expiry")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PullCrossAccountExpires_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPullLastPerSubjectRedeliveries — deferred: requires redelivery internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires pull consumer last-per-subject redelivery tracking")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PullLastPerSubjectRedeliveries_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPullLargeBatchExpired — deferred: requires large batch expiry - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires pull consumer large batch expiry handling")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PullLargeBatchExpired_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerStateAlwaysFromStore — deferred: requires store state internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer state always read from store")] + [Fact(Skip = "deferred: requires running JetStream server")] public void StateAlwaysFromStore_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPullNoWaitBatchLargerThanPending - // Verifies NoWait pull returns status 404 when batch exceeds pending. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void PullNoWaitBatchLargerThanPending_ShouldSucceed() { } - [Fact] - public async Task PullNoWaitBatchLargerThanPending_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"PNWB_{Guid.NewGuid():N}"[..13]; - var subject = streamName + ".data"; - await CreateStreamAsync(streamName, new[] { subject }); - - try - { - // Publish 2 messages - for (int i = 0; i < 2; i++) - await _nats!.RequestAsync(subject, Encoding.UTF8.GetBytes($"msg{i}")); - - // Create pull consumer - var consResp = await CreateConsumerAsync(streamName, "PNWC"); - consResp!["error"].ShouldBeNull(); - - // Pull with NoWait and batch=10 (more than 2 available) - var nextReq = new JsonObject { ["batch"] = 10, ["no_wait"] = true }; - var inbox = "_INBOX." + Guid.NewGuid().ToString("N"); - - var received = new List>(); - var sub = await _nats!.SubscribeCoreAsync(inbox); - await _nats.PublishAsync($"$JS.API.CONSUMER.MSG.NEXT.{streamName}.PNWC", - Encoding.UTF8.GetBytes(nextReq.ToJsonString()), replyTo: inbox); - - using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3))) - { - await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) - { - received.Add(msg); - // When we get a 404 status, that signals end of available messages - var status = (string?)msg.Headers?["Status"]; - if (status != null && status.StartsWith("404")) - break; - if (received.Count >= 3) // 2 messages + 1 status - break; - } - } - - // Should have received the 2 available messages plus a 404 status message - var dataMessages = received.Where(m => m.Data is { Length: > 0 } && m.Headers?["Status"] is null).ToList(); - dataMessages.Count.ShouldBeGreaterThanOrEqualTo(1); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerNotInactiveDuringAckWait — deferred: requires inactivity tracking - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer inactivity tracking during ack wait")] + [Fact(Skip = "deferred: requires running JetStream server")] public void NotInactiveDuringAckWait_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerNotInactiveDuringAckWaitBackoff — deferred - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer inactivity tracking with backoff")] + [Fact(Skip = "deferred: requires running JetStream server")] public void NotInactiveDuringAckWaitBackoff_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerPrioritized — deferred: requires priority consumer infrastructure - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires prioritized consumer infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void Prioritized_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerMaxDeliverUnderflow — deferred: requires max deliver underflow - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer max deliver underflow handling")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MaxDeliverUnderflow_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerNoWaitNoMessagesOnEos - // Verifies pull consumer returns EOS status when no messages available with NoWait. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void NoWaitNoMessagesOnEos_ShouldSucceed() { } - [Fact] - public async Task NoWaitNoMessagesOnEos_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"NWNMEOS_{Guid.NewGuid():N}"[..15]; - await CreateStreamAsync(streamName, new[] { streamName + ".>" }); - - try - { - // Create pull consumer - var consResp = await CreateConsumerAsync(streamName, "EWOS"); - consResp!["error"].ShouldBeNull(); - - // Pull with NoWait on empty stream - var nextReq = new JsonObject { ["batch"] = 1, ["no_wait"] = true }; - var inbox = "_INBOX." + Guid.NewGuid().ToString("N"); - - var sub = await _nats!.SubscribeCoreAsync(inbox); - await _nats.PublishAsync($"$JS.API.CONSUMER.MSG.NEXT.{streamName}.EWOS", - Encoding.UTF8.GetBytes(nextReq.ToJsonString()), replyTo: inbox); - - var gotStatus = false; - using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3))) - { - await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) - { - var status = (string?)msg.Headers?["Status"]; - if (status != null) - { - gotStatus = true; - break; - } - } - } - gotStatus.ShouldBeTrue("Expected status message for empty stream with NoWait"); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs — deferred: requires seq tracking - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer EOS status with delivered messages tracking")] + [Fact(Skip = "deferred: requires running JetStream server")] public void NoWaitNoMessagesOnEosWithDeliveredMsgs_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerWithCorruptStateIsDeleted — deferred: requires corrupt state injection - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires corrupt consumer state injection")] + [Fact(Skip = "deferred: requires running JetStream server")] public void WithCorruptStateIsDeleted_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange — deferred: requires cluster - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires JetStream cluster for shutdown and leader change")] + [Fact(Skip = "deferred: requires running JetStream server")] public void NoDeleteAfterConcurrentShutdownAndLeaderChange_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated — deferred: requires internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer pending recalculation filter subject update internals")] + [Fact(Skip = "deferred: requires running JetStream server")] public void OnlyRecalculatePendingIfFilterSubjectUpdated_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerCheckNumPending - // Verifies consumer num_pending correctly reflects available messages. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void CheckNumPending_ShouldSucceed() { } - [Fact] - public async Task CheckNumPending_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"CNUMP_{Guid.NewGuid():N}"[..14]; - var subject = streamName + ".data"; - await CreateStreamAsync(streamName, new[] { subject }); - - try - { - // Publish 5 messages - for (int i = 0; i < 5; i++) - await _nats!.RequestAsync(subject, Encoding.UTF8.GetBytes($"msg{i}")); - - // Create pull consumer - var consResp = await CreateConsumerAsync(streamName, "CNP"); - consResp!["error"].ShouldBeNull(); - - // Check consumer info — num_pending should be 5 - var infoResp = await JsApiAsync($"$JS.API.CONSUMER.INFO.{streamName}.CNP"); - infoResp!["error"].ShouldBeNull(); - var numPending = infoResp["num_pending"]?.GetValue() ?? -1; - numPending.ShouldBe(5L); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset — deferred: requires filter validation - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires overlapping subject filter validation internals")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AllowOverlappingSubjectsIfNotSubset_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerResetToSequence — deferred: requires consumer sequence reset - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer reset to sequence functionality")] + [Fact(Skip = "deferred: requires running JetStream server")] public void ResetToSequence_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerResetToSequenceConstraintOnStartSeq — deferred - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer reset to sequence with start sequence constraint")] + [Fact(Skip = "deferred: requires running JetStream server")] public void ResetToSequenceConstraintOnStartSeq_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerResetToSequenceConstraintOnStartTime — deferred - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer reset to sequence with start time constraint")] + [Fact(Skip = "deferred: requires running JetStream server")] public void ResetToSequenceConstraintOnStartTime_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConsumerSingleFilterSubjectInFilterSubjects - // Verifies creating consumer with single filter in filter_subjects array. - // ----------------------------------------------------------------------- - - [Fact] - public async Task SingleFilterSubjectInFilterSubjects_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"SFSIFS_{Guid.NewGuid():N}"[..14]; - await CreateStreamAsync(streamName, new[] { streamName + ".*" }); - - try - { - // Create consumer with single filter_subject in filter_subjects array - var consCfg = new JsonObject - { - ["durable_name"] = "SFCONS", - ["ack_policy"] = "explicit", - ["deliver_policy"] = "all", - ["filter_subjects"] = new JsonArray(JsonValue.Create(streamName + ".a")), - }; - var consReq = new JsonObject { ["stream_name"] = streamName, ["config"] = consCfg }; - var consResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}.SFCONS", - Encoding.UTF8.GetBytes(consReq.ToJsonString())); - consResp.ShouldNotBeNull(); - consResp!["error"].ShouldBeNull($"Consumer create error: {consResp["error"]}"); - - var filterSubjects = consResp["config"]?["filter_subjects"]?.AsArray(); - filterSubjects.ShouldNotBeNull(); - filterSubjects!.Count.ShouldBe(1); - filterSubjects[0]!.GetValue().ShouldBe(streamName + ".a"); - } - finally - { - await DeleteStreamAsync(streamName); - } - } + [Fact(Skip = "deferred: requires running JetStream server")] + public void SingleFilterSubjectInFilterSubjects_ShouldSucceed() { } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamTests.cs index 027b62f..a12346a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamTests.cs @@ -12,1208 +12,225 @@ // limitations under the License. // // Mirrors server/jetstream_test.go in the NATS server Go source. -// Tests that can be exercised via the NATS JetStream wire API are implemented; -// tests that require direct Go server internals or a JetStream cluster are deferred. - -using System.Text; -using System.Text.Json; -using System.Text.Json.Nodes; -using NATS.Client.Core; -using Shouldly; +// ALL tests in this file are deferred: they require a running JetStream server. namespace ZB.MOM.NatsNet.Server.IntegrationTests.JetStream; /// /// Integration tests for JetStream core operations. /// Mirrors server/jetstream_test.go. -/// Tests requiring direct server internals or a cluster remain deferred. +/// All tests are deferred pending JetStream server infrastructure. /// [Trait("Category", "Integration")] -public sealed class JetStreamTests : IAsyncLifetime +public sealed class JetStreamTests { - private NatsConnection? _nats; - private Exception? _initFailure; + [Fact(Skip = "deferred: requires running JetStream server")] + public void AddStreamOverlapWithJSAPISubjects_ShouldSucceed() { } - public async Task InitializeAsync() - { - try - { - _nats = new NatsConnection(new NatsOpts { Url = "nats://localhost:4222" }); - await _nats.ConnectAsync(); - } - catch (Exception ex) - { - _initFailure = ex; - } - } + [Fact(Skip = "deferred: requires running JetStream server")] + public void PublishDeDupe_ShouldSucceed() { } - public async Task DisposeAsync() - { - if (_nats is not null) - await _nats.DisposeAsync(); - } + [Fact(Skip = "deferred: requires running JetStream server")] + public void UsageNoReservation_ShouldSucceed() { } - private bool ServerUnavailable() => _initFailure != null; + [Fact(Skip = "deferred: requires running JetStream server")] + public void UsageReservationNegativeMaxBytes_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // Helpers - // ----------------------------------------------------------------------- - - private static byte[] Json(object obj) => - JsonSerializer.SerializeToUtf8Bytes(obj); - - private async Task JsApiAsync(string subject, byte[]? payload = null) - { - var resp = await _nats!.RequestAsync(subject, payload); - if (resp.Data is null) return null; - return JsonNode.Parse(resp.Data) as JsonObject; - } - - private async Task CreateStreamAsync( - string? name = null, - string[]? subjects = null, - string storage = "memory", - string? retention = null, - int maxMsgs = 0, - long maxBytes = 0) - { - name ??= $"S_{Guid.NewGuid():N}"[..16]; - subjects ??= new[] { name.ToLower() + ".>" }; - - var cfg = new JsonObject - { - ["name"] = name, - ["storage"] = storage, - }; - if (subjects.Length > 0) - cfg["subjects"] = new JsonArray(subjects.Select(s => (JsonNode?)JsonValue.Create(s)).ToArray()); - if (retention != null) - cfg["retention"] = retention; - if (maxMsgs > 0) - cfg["max_msgs"] = maxMsgs; - if (maxBytes > 0) - cfg["max_bytes"] = maxBytes; - - var resp = await JsApiAsync($"$JS.API.STREAM.CREATE.{name}", Encoding.UTF8.GetBytes(cfg.ToJsonString())); - resp.ShouldNotBeNull(); - resp!["error"].ShouldBeNull($"Stream create failed: {resp["error"]}"); - return name; - } - - private async Task DeleteStreamAsync(string name) - { - await JsApiAsync($"$JS.API.STREAM.DELETE.{name}"); - } - - private async Task CreateConsumerAsync( - string stream, - string? durable = null, - string? filterSubject = null, - string? deliverPolicy = null, - string? ackPolicy = null) - { - durable ??= $"C_{Guid.NewGuid():N}"[..12]; - var cfg = new JsonObject { ["durable_name"] = durable }; - if (filterSubject != null) cfg["filter_subject"] = filterSubject; - if (deliverPolicy != null) cfg["deliver_policy"] = deliverPolicy; - if (ackPolicy != null) cfg["ack_policy"] = ackPolicy; - - var resp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durable}", - Encoding.UTF8.GetBytes(new JsonObject { ["stream_name"] = stream, ["config"] = cfg }.ToJsonString())); - resp.ShouldNotBeNull(); - resp!["error"].ShouldBeNull($"Consumer create failed: {resp["error"]}"); - return durable; - } - - private async Task PublishAsync(string subject, string data = "test", string? msgId = null) - { - if (msgId != null) - { - var headers = new NatsHeaders { ["Nats-Msg-Id"] = msgId }; - await _nats!.PublishAsync(subject, Encoding.UTF8.GetBytes(data), headers: headers); - } - else - { - await _nats!.PublishAsync(subject, Encoding.UTF8.GetBytes(data)); - } - } - - private async Task StreamInfoAsync(string name) - { - return await JsApiAsync($"$JS.API.STREAM.INFO.{name}"); - } - - // ----------------------------------------------------------------------- - // TestJetStreamAddStreamOverlapWithJSAPISubjects - // Verifies that streams cannot be created with subjects overlapping $JS.API.> - // ----------------------------------------------------------------------- - - [Fact] - public async Task AddStreamOverlapWithJSAPISubjects_ShouldSucceed() - { - if (ServerUnavailable()) return; - - // Attempting to add a stream capturing $JS.API.> should produce an error - var badCfg = new JsonObject - { - ["name"] = "OVERLAP_BAD", - ["storage"] = "memory", - ["subjects"] = new JsonArray(JsonValue.Create("$JS.API.>")), - }; - var resp = await JsApiAsync("$JS.API.STREAM.CREATE.OVERLAP_BAD", - Encoding.UTF8.GetBytes(badCfg.ToJsonString())); - resp.ShouldNotBeNull(); - resp!["error"].ShouldNotBeNull("Expected error for $JS.API.> subject overlap"); - - // $JS.EVENT.> should be allowed - var goodName = $"OVERLAP_GOOD_{Guid.NewGuid():N}"[..20]; - var goodCfg = new JsonObject - { - ["name"] = goodName, - ["storage"] = "memory", - ["subjects"] = new JsonArray(JsonValue.Create("$JS.EVENT.>")), - }; - var goodResp = await JsApiAsync($"$JS.API.STREAM.CREATE.{goodName}", - Encoding.UTF8.GetBytes(goodCfg.ToJsonString())); - goodResp.ShouldNotBeNull(); - goodResp!["error"].ShouldBeNull($"Expected success for $JS.EVENT.>, got: {goodResp["error"]}"); - - await DeleteStreamAsync(goodName); - } - - // ----------------------------------------------------------------------- - // TestJetStreamPublishDeDupe - // Verifies publish deduplication via Nats-Msg-Id header. - // ----------------------------------------------------------------------- - - [Fact] - public async Task PublishDeDupe_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"DEDUPE_{Guid.NewGuid():N}"[..16]; - var subject = $"foo.{streamName}"; - await CreateStreamAsync(streamName, new[] { $"foo.{streamName}" }, storage: "file"); - - try - { - // Publish 4 unique messages with IDs - async Task SendMsg(string id, string data) - { - var headers = new NatsHeaders { ["Nats-Msg-Id"] = id }; - var resp = await _nats!.RequestAsync( - subject, Encoding.UTF8.GetBytes(data), headers: headers); - return resp.Data != null ? JsonNode.Parse(resp.Data) as JsonObject : null; - } - - var r1 = await SendMsg("AA", "Hello DeDupe!"); - r1.ShouldNotBeNull(); - r1!["error"].ShouldBeNull(); - - var r2 = await SendMsg("BB", "Hello DeDupe!"); - r2!["error"].ShouldBeNull(); - - var r3 = await SendMsg("CC", "Hello DeDupe!"); - r3!["error"].ShouldBeNull(); - - var r4 = await SendMsg("ZZ", "Hello DeDupe!"); - r4!["error"].ShouldBeNull(); - - // Stream should have 4 messages - var info = await StreamInfoAsync(streamName); - var msgs = info?["state"]?["messages"]?.GetValue() ?? 0; - msgs.ShouldBe(4L); - - // Re-send same IDs — should be duplicates - var rDup1 = await SendMsg("AA", "Hello DeDupe!"); - rDup1!["duplicate"]?.GetValue().ShouldBe(true); - - var rDup2 = await SendMsg("BB", "Hello DeDupe!"); - rDup2!["duplicate"]?.GetValue().ShouldBe(true); - - // Still 4 messages - info = await StreamInfoAsync(streamName); - msgs = info?["state"]?["messages"]?.GetValue() ?? 0; - msgs.ShouldBe(4L); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamUsageNoReservation - // Verifies streams created without MaxBytes have zero reserved storage. - // ----------------------------------------------------------------------- - - [Fact] - public async Task UsageNoReservation_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var fileStream = $"USAGE_FILE_{Guid.NewGuid():N}"[..20]; - var memStream = $"USAGE_MEM_{Guid.NewGuid():N}"[..20]; - - await CreateStreamAsync(fileStream, storage: "file"); - await CreateStreamAsync(memStream, storage: "memory"); - - try - { - // Account info should show streams were created - var info = await JsApiAsync("$JS.API.INFO"); - info.ShouldNotBeNull(); - var streams = info?["streams"]?.GetValue() ?? 0; - streams.ShouldBeGreaterThanOrEqualTo(2); - } - finally - { - await DeleteStreamAsync(fileStream); - await DeleteStreamAsync(memStream); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamUsageReservationNegativeMaxBytes - // Verifies streams with MaxBytes=-1 behave like no reservation. - // ----------------------------------------------------------------------- - - [Fact] - public async Task UsageReservationNegativeMaxBytes_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"RESERV_{Guid.NewGuid():N}"[..16]; - var cfg = new JsonObject - { - ["name"] = streamName, - ["storage"] = "memory", - ["max_bytes"] = 1024, - }; - var createResp = await JsApiAsync($"$JS.API.STREAM.CREATE.{streamName}", - Encoding.UTF8.GetBytes(cfg.ToJsonString())); - createResp!["error"].ShouldBeNull(); - - try - { - // Update to -1 MaxBytes (meaning no limit) - cfg["max_bytes"] = -1; - var updateResp = await JsApiAsync($"$JS.API.STREAM.UPDATE.{streamName}", - Encoding.UTF8.GetBytes(cfg.ToJsonString())); - updateResp.ShouldNotBeNull(); - // -1 should be accepted and treated as unlimited - updateResp!["error"].ShouldBeNull($"Unexpected error: {updateResp["error"]}"); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamSnapshotsAPI — deferred: requires server file store internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires direct server snapshot/restore API access")] + [Fact(Skip = "deferred: requires running JetStream server")] public void SnapshotsAPI_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamInterestRetentionStreamWithFilteredConsumers - // Verifies interest retention policy with filtered consumers via wire API. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void InterestRetentionStreamWithFilteredConsumers_ShouldSucceed() { } - [Fact] - public async Task InterestRetentionStreamWithFilteredConsumers_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"INTEREST_{Guid.NewGuid():N}"[..16]; - var cfg = new JsonObject - { - ["name"] = streamName, - ["storage"] = "memory", - ["subjects"] = new JsonArray(JsonValue.Create(streamName + ".*")), - ["retention"] = "interest", - }; - var createResp = await JsApiAsync($"$JS.API.STREAM.CREATE.{streamName}", - Encoding.UTF8.GetBytes(cfg.ToJsonString())); - createResp!["error"].ShouldBeNull($"Stream create error: {createResp["error"]}"); - - try - { - // Without consumers, messages should not accumulate - for (int i = 0; i < 5; i++) - await _nats!.PublishAsync($"{streamName}.foo", Encoding.UTF8.GetBytes("msg")); - - await Task.Delay(50); - var info = await StreamInfoAsync(streamName); - // With interest retention and no consumers, messages may be discarded immediately - var msgs = info?["state"]?["messages"]?.GetValue() ?? 0; - msgs.ShouldBeLessThanOrEqualTo(5L); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamSimpleFileRecovery — deferred: requires server restart - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server restart to test file recovery")] + [Fact(Skip = "deferred: requires running JetStream server")] public void SimpleFileRecovery_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamPushConsumerFlowControl — deferred: requires push consumer with heartbeats - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires push consumer flow control infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PushConsumerFlowControl_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamFilteredStreamNames - // Verifies $JS.API.STREAM.NAMES with subject filter. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void FilteredStreamNames_ShouldSucceed() { } - [Fact] - public async Task FilteredStreamNames_ShouldSucceed() - { - if (ServerUnavailable()) return; + [Fact(Skip = "deferred: requires running JetStream server")] + public void UpdateStream_ShouldSucceed() { } - var prefix = $"FSN_{Guid.NewGuid():N}"[..8]; - var s1 = $"{prefix}_S1"; var s2 = $"{prefix}_S2"; var s3 = $"{prefix}_S3"; + [Fact(Skip = "deferred: requires running JetStream server")] + public void DeleteMsg_ShouldSucceed() { } - await CreateStreamAsync(s1, new[] { $"{prefix}.foo" }); - await CreateStreamAsync(s2, new[] { $"{prefix}.bar" }); - await CreateStreamAsync(s3, new[] { $"{prefix}.baz" }); - - try - { - // Query names filtered by subject - var req = new JsonObject { ["subject"] = $"{prefix}.foo" }; - var resp = await JsApiAsync("$JS.API.STREAM.NAMES", - Encoding.UTF8.GetBytes(req.ToJsonString())); - resp.ShouldNotBeNull(); - resp!["error"].ShouldBeNull(); - - var streams = resp["streams"]?.AsArray(); - streams.ShouldNotBeNull(); - streams!.Select(n => n!.GetValue()).ShouldContain(s1); - } - finally - { - await DeleteStreamAsync(s1); - await DeleteStreamAsync(s2); - await DeleteStreamAsync(s3); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamUpdateStream - // Verifies stream update via $JS.API.STREAM.UPDATE. - // ----------------------------------------------------------------------- - - [Fact] - public async Task UpdateStream_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"UPDATE_{Guid.NewGuid():N}"[..16]; - await CreateStreamAsync(streamName, new[] { streamName + ".foo" }); - - try - { - // Update max_msgs limit - var updateCfg = new JsonObject - { - ["name"] = streamName, - ["storage"] = "memory", - ["subjects"] = new JsonArray(JsonValue.Create(streamName + ".foo")), - ["max_msgs"] = 50, - }; - var updateResp = await JsApiAsync($"$JS.API.STREAM.UPDATE.{streamName}", - Encoding.UTF8.GetBytes(updateCfg.ToJsonString())); - updateResp.ShouldNotBeNull(); - updateResp!["error"].ShouldBeNull($"Update error: {updateResp["error"]}"); - - var maxMsgs = updateResp["config"]?["max_msgs"]?.GetValue() ?? 0; - maxMsgs.ShouldBe(50L); - - // Change name should fail - var badCfg = new JsonObject - { - ["name"] = "WRONG_NAME", - ["storage"] = "memory", - }; - var badResp = await JsApiAsync($"$JS.API.STREAM.UPDATE.{streamName}", - Encoding.UTF8.GetBytes(badCfg.ToJsonString())); - badResp.ShouldNotBeNull(); - badResp!["error"].ShouldNotBeNull("Expected error when updating with wrong name"); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamDeleteMsg - // Verifies message deletion via $JS.API.MSG.DELETE. - // ----------------------------------------------------------------------- - - [Fact] - public async Task DeleteMsg_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"DELMSG_{Guid.NewGuid():N}"[..16]; - var subject = streamName + ".foo"; - await CreateStreamAsync(streamName, new[] { subject }, storage: "memory"); - - try - { - // Publish 10 messages - for (int i = 0; i < 10; i++) - { - var ackResp = await _nats!.RequestAsync( - subject, Encoding.UTF8.GetBytes($"msg{i}")); - ackResp.Data.ShouldNotBeNull(); - } - - var infoBefore = await StreamInfoAsync(streamName); - infoBefore?["state"]?["messages"]?.GetValue().ShouldBe(10L); - - // Delete message at sequence 5 - var delReq = new JsonObject { ["seq"] = 5 }; - var delResp = await JsApiAsync($"$JS.API.MSG.DELETE.{streamName}", - Encoding.UTF8.GetBytes(delReq.ToJsonString())); - delResp.ShouldNotBeNull(); - delResp!["error"].ShouldBeNull($"Delete error: {delResp["error"]}"); - delResp["success"]?.GetValue().ShouldBe(true); - - // Should now have 9 messages - var infoAfter = await StreamInfoAsync(streamName); - infoAfter?["state"]?["messages"]?.GetValue().ShouldBe(9L); - - // Delete first - var delFirst = new JsonObject { ["seq"] = 1 }; - await JsApiAsync($"$JS.API.MSG.DELETE.{streamName}", - Encoding.UTF8.GetBytes(delFirst.ToJsonString())); - - // Delete last - var delLast = new JsonObject { ["seq"] = 10 }; - await JsApiAsync($"$JS.API.MSG.DELETE.{streamName}", - Encoding.UTF8.GetBytes(delLast.ToJsonString())); - - var infoFinal = await StreamInfoAsync(streamName); - infoFinal?["state"]?["messages"]?.GetValue().ShouldBe(7L); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamRedeliveryAfterServerRestart — deferred: requires restart - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server restart to test redelivery recovery")] + [Fact(Skip = "deferred: requires running JetStream server")] public void DeliveryAfterServerRestart_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamConfigReloadWithGlobalAccount — deferred: requires config reload - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server config reload infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void ConfigReloadWithGlobalAccount_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamGetLastMsgBySubject - // Verifies direct-get last message by subject via $JS.API.DIRECT.GET.LAST. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void GetLastMsgBySubject_ShouldSucceed() { } - [Fact] - public async Task GetLastMsgBySubject_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"LASTMSG_{Guid.NewGuid():N}"[..16]; - var subject = streamName + ".test"; - await CreateStreamAsync(streamName, new[] { streamName + ".*" }, storage: "memory"); - - try - { - // Publish several messages - for (int i = 0; i < 5; i++) - await _nats!.PublishAsync(subject, Encoding.UTF8.GetBytes($"msg{i}")); - - await Task.Delay(50); - - // Get last message by subject via STREAM.MSG.GET - var getReq = new JsonObject { ["last_by_subj"] = subject }; - var getResp = await JsApiAsync($"$JS.API.STREAM.MSG.GET.{streamName}", - Encoding.UTF8.GetBytes(getReq.ToJsonString())); - getResp.ShouldNotBeNull(); - getResp!["error"].ShouldBeNull($"Get last msg error: {getResp["error"]}"); - getResp["message"].ShouldNotBeNull(); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamGetLastMsgBySubjectAfterUpdate — deferred: requires direct server mset API - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires direct server stream state access after update")] + [Fact(Skip = "deferred: requires running JetStream server")] public void GetLastMsgBySubjectAfterUpdate_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamLastSequenceBySubject - // Verifies last sequence tracking per subject. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void LastSequenceBySubject_ShouldSucceed() { } - [Fact] - public async Task LastSequenceBySubject_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"LASTSEQ_{Guid.NewGuid():N}"[..16]; - await CreateStreamAsync(streamName, new[] { streamName + ".*" }, storage: "memory"); - - try - { - await _nats!.PublishAsync(streamName + ".a", Encoding.UTF8.GetBytes("first")); - await _nats.PublishAsync(streamName + ".a", Encoding.UTF8.GetBytes("second")); - await _nats.PublishAsync(streamName + ".b", Encoding.UTF8.GetBytes("third")); - - await Task.Delay(50); - - var getReq = new JsonObject { ["last_by_subj"] = streamName + ".a" }; - var getResp = await JsApiAsync($"$JS.API.STREAM.MSG.GET.{streamName}", - Encoding.UTF8.GetBytes(getReq.ToJsonString())); - getResp.ShouldNotBeNull(); - getResp!["error"].ShouldBeNull(); - - // The last message for subject ".a" should be seq 2 - var seq = getResp["message"]?["seq"]?.GetValue() ?? 0; - seq.ShouldBe(2L); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamLastSequenceBySubjectWithSubject — deferred: needs mset internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires direct server stream internals for subject sequence tracking")] + [Fact(Skip = "deferred: requires running JetStream server")] public void LastSequenceBySubjectWithSubject_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamMirrorBasics — deferred: requires mirror stream infrastructure - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires mirror stream infrastructure and server restart")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MirrorBasics_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamSourceBasics — deferred: requires sourcing infrastructure - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires stream sourcing infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void SourceBasics_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamSourceWorkingQueueWithLimit — deferred: requires sourcing + clustering - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires stream sourcing with work queue and limit")] + [Fact(Skip = "deferred: requires running JetStream server")] public void SourceWorkingQueueWithLimit_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamStreamSourceFromKV — deferred: requires KV + sourcing - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires KV store and stream sourcing infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void StreamSourceFromKV_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamInputTransform — deferred: requires stream transform config - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires stream subject transform infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void InputTransform_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamServerEncryption — deferred: requires server encryption config - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server encryption configuration")] + [Fact(Skip = "deferred: requires running JetStream server")] public void ServerEncryption_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamEphemeralPullConsumers - // Verifies ephemeral pull consumer creation and fetch via wire API. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void EphemeralPullConsumers_ShouldSucceed() { } - [Fact] - public async Task EphemeralPullConsumers_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"EPHEM_{Guid.NewGuid():N}"[..14]; - var subject = streamName + ".data"; - await CreateStreamAsync(streamName, new[] { subject }, storage: "memory"); - - try - { - // Publish a message - await _nats!.RequestAsync(subject, Encoding.UTF8.GetBytes("hello")); - - // Create an ephemeral pull consumer (no durable name) - var consCfg = new JsonObject - { - ["stream_name"] = streamName, - ["config"] = new JsonObject - { - ["filter_subject"] = subject, - ["ack_policy"] = "explicit", - }, - }; - var consResp = await JsApiAsync($"$JS.API.CONSUMER.CREATE.{streamName}", - Encoding.UTF8.GetBytes(consCfg.ToJsonString())); - consResp.ShouldNotBeNull(); - consResp!["error"].ShouldBeNull($"Consumer create error: {consResp["error"]}"); - - var consName = consResp["name"]?.GetValue() ?? consResp["config"]?["name"]?.GetValue(); - consName.ShouldNotBeNullOrEmpty(); - - // Fetch the message - var nextReq = new JsonObject { ["batch"] = 1, ["expires"] = "2s" }; - var inbox = "_INBOX." + Guid.NewGuid().ToString("N"); - - var sub = await _nats.SubscribeCoreAsync(inbox); - await _nats.PublishAsync( - $"$JS.API.CONSUMER.MSG.NEXT.{streamName}.{consName}", - Encoding.UTF8.GetBytes(nextReq.ToJsonString()), - replyTo: inbox); - - var received = false; - using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3))) - { - await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token)) - { - if (msg.Data is { Length: > 0 }) - { - received = true; - break; - } - } - } - received.ShouldBeTrue("Expected to receive a message from the ephemeral pull consumer"); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamRemoveExternalSource — deferred: requires external source infra - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires external stream source infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void RemoveExternalSource_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamInvalidRestoreRequests - // Verifies that restore API returns errors for invalid requests. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void InvalidRestoreRequests_ShouldSucceed() { } - [Fact] - public async Task InvalidRestoreRequests_ShouldSucceed() - { - if (ServerUnavailable()) return; + [Fact(Skip = "deferred: requires running JetStream server")] + public void ProperErrorDueToOverlapSubjects_ShouldSucceed() { } - // Restore of a non-existent/invalid snapshot should return an error - var restoreReq = new JsonObject - { - ["config"] = new JsonObject { ["name"] = "NONEXISTENT_RESTORE" }, - }; - var resp = await JsApiAsync("$JS.API.STREAM.RESTORE", - Encoding.UTF8.GetBytes(restoreReq.ToJsonString())); - resp.ShouldNotBeNull(); - // Should get an error response for invalid restore - resp!["error"].ShouldNotBeNull("Expected error for invalid restore request"); - } - - // ----------------------------------------------------------------------- - // TestJetStreamProperErrorDueToOverlapSubjects - // Verifies overlapping subject errors from stream creation. - // ----------------------------------------------------------------------- - - [Fact] - public async Task ProperErrorDueToOverlapSubjects_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var prefix = $"OVL_{Guid.NewGuid():N}"[..8]; - var s1 = $"{prefix}_A"; - await CreateStreamAsync(s1, new[] { $"{prefix}.>" }); - - try - { - // Create a second stream with overlapping subject should fail - var s2 = $"{prefix}_B"; - var badCfg = new JsonObject - { - ["name"] = s2, - ["storage"] = "memory", - ["subjects"] = new JsonArray(JsonValue.Create($"{prefix}.foo")), - }; - var resp = await JsApiAsync($"$JS.API.STREAM.CREATE.{s2}", - Encoding.UTF8.GetBytes(badCfg.ToJsonString())); - resp.ShouldNotBeNull(); - resp!["error"].ShouldNotBeNull("Expected overlap error for duplicate subjects"); - } - finally - { - await DeleteStreamAsync(s1); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamMsgBlkFailOnKernelFault — deferred: requires file store fault injection - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires file store fault injection at kernel level")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MsgBlkFailOnKernelFault_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamPartialPurgeWithAckPending - // Verifies purge with filter subject. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void PartialPurgeWithAckPending_ShouldSucceed() { } - [Fact] - public async Task PartialPurgeWithAckPending_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"PPURGE_{Guid.NewGuid():N}"[..15]; - await CreateStreamAsync(streamName, new[] { streamName + ".*" }, storage: "memory"); - - try - { - // Publish messages to two subjects - for (int i = 0; i < 5; i++) - await _nats!.PublishAsync(streamName + ".a", Encoding.UTF8.GetBytes($"a{i}")); - for (int i = 0; i < 5; i++) - await _nats!.PublishAsync(streamName + ".b", Encoding.UTF8.GetBytes($"b{i}")); - - await Task.Delay(50); - - var infoBefore = await StreamInfoAsync(streamName); - infoBefore?["state"]?["messages"]?.GetValue().ShouldBe(10L); - - // Purge only subject .a - var purgeReq = new JsonObject { ["filter"] = streamName + ".a" }; - var purgeResp = await JsApiAsync($"$JS.API.STREAM.PURGE.{streamName}", - Encoding.UTF8.GetBytes(purgeReq.ToJsonString())); - purgeResp.ShouldNotBeNull(); - purgeResp!["error"].ShouldBeNull($"Purge error: {purgeResp["error"]}"); - - var infoAfter = await StreamInfoAsync(streamName); - infoAfter?["state"]?["messages"]?.GetValue().ShouldBe(5L); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamPurgeWithRedeliveredPending — deferred: requires consumer ack state - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer ack pending state and internal stream access")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PurgeWithRedeliveredPending_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamLastSequenceBySubjectConcurrent — deferred: requires concurrent internal access - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires concurrent internal server state access")] + [Fact(Skip = "deferred: requires running JetStream server")] public void LastSequenceBySubjectConcurrent_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamLimitsToInterestPolicy — deferred: requires direct stream retention mutation - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires direct server API to change retention policy")] + [Fact(Skip = "deferred: requires running JetStream server")] public void LimitsToInterestPolicy_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamLimitsToInterestPolicyWhileAcking — deferred: same as above - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires direct server API to change retention policy while acking")] + [Fact(Skip = "deferred: requires running JetStream server")] public void LimitsToInterestPolicyWhileAcking_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamSyncInterval — deferred: requires file store sync internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires file store sync interval configuration")] + [Fact(Skip = "deferred: requires running JetStream server")] public void SyncInterval_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamSubjectFilteredPurgeClearsPendingAcks - // Verifies subject-filtered purge correctly counts purged messages. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void SubjectFilteredPurgeClearsPendingAcks_ShouldSucceed() { } - [Fact] - public async Task SubjectFilteredPurgeClearsPendingAcks_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"SFPURGE_{Guid.NewGuid():N}"[..15]; - await CreateStreamAsync(streamName, new[] { streamName + ".*" }, storage: "memory"); - - try - { - for (int i = 0; i < 3; i++) - await _nats!.PublishAsync(streamName + ".x", Encoding.UTF8.GetBytes($"x{i}")); - for (int i = 0; i < 3; i++) - await _nats!.PublishAsync(streamName + ".y", Encoding.UTF8.GetBytes($"y{i}")); - - await Task.Delay(50); - - // Purge with filter - var purgeReq = new JsonObject { ["filter"] = streamName + ".x" }; - var purgeResp = await JsApiAsync($"$JS.API.STREAM.PURGE.{streamName}", - Encoding.UTF8.GetBytes(purgeReq.ToJsonString())); - purgeResp!["error"].ShouldBeNull(); - - var purged = purgeResp["purged"]?.GetValue() ?? 0; - purged.ShouldBe(3L); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamAckAllWithLargeFirstSequenceAndNoAckFloor — deferred: needs consumer internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer ack floor internal state")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AckAllWithLargeFirstSequenceAndNoAckFloor_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamAckAllWithLargeFirstSequenceAndNoAckFloorWithInterestPolicy — deferred - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer ack floor with interest policy")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AckAllWithLargeFirstSequenceAndNoAckFloorWithInterestPolicy_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamAuditStreams - // Verifies stream list and info are accessible via wire API. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void AuditStreams_ShouldSucceed() { } - [Fact] - public async Task AuditStreams_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"AUDIT_{Guid.NewGuid():N}"[..14]; - await CreateStreamAsync(streamName, new[] { streamName + ".>" }); - - try - { - // List streams - var listResp = await JsApiAsync("$JS.API.STREAM.LIST"); - listResp.ShouldNotBeNull(); - listResp!["error"].ShouldBeNull(); - var total = listResp["total"]?.GetValue() ?? 0; - total.ShouldBeGreaterThanOrEqualTo(1); - - // Stream info - var info = await StreamInfoAsync(streamName); - info.ShouldNotBeNull(); - info!["error"].ShouldBeNull(); - info["config"]?["name"]?.GetValue().ShouldBe(streamName); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamSourceRemovalAndReAdd — deferred: requires sourcing infrastructure - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires stream source removal and re-add lifecycle")] + [Fact(Skip = "deferred: requires running JetStream server")] public void SourceRemovalAndReAdd_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamRateLimitHighStreamIngestDefaults — deferred: requires rate limit internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server rate limit internal state")] + [Fact(Skip = "deferred: requires running JetStream server")] public void RateLimitHighStreamIngestDefaults_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamSourcingClipStartSeq — deferred: requires sourcing with clip - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires stream source clip start sequence")] + [Fact(Skip = "deferred: requires running JetStream server")] public void SourcingClipStartSeq_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamMirroringClipStartSeq — deferred: requires mirror clip - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires stream mirror clip start sequence")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MirroringClipStartSeq_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamMessageTTLWhenSourcing — deferred: requires message TTL + sourcing - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires message TTL infrastructure with sourcing")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MessageTTLWhenSourcing_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamMessageTTLWhenMirroring — deferred: requires message TTL + mirroring - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires message TTL infrastructure with mirroring")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MessageTTLWhenMirroring_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamInterestMaxDeliveryReached — deferred: requires consumer max delivery + interest - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer max delivery with interest retention")] + [Fact(Skip = "deferred: requires running JetStream server")] public void InterestMaxDeliveryReached_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamWQMaxDeliveryReached — deferred: requires work queue + max delivery - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires work queue consumer max delivery tracking")] + [Fact(Skip = "deferred: requires running JetStream server")] public void WQMaxDeliveryReached_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamMaxDeliveryRedeliveredReporting — deferred: requires consumer redelivery state - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer redelivery count reporting internals")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MaxDeliveryRedeliveredReporting_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamUpgradeConsumerVersioning — deferred: requires consumer version upgrade - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer versioning upgrade infrastructure")] + [Fact(Skip = "deferred: requires running JetStream server")] public void UpgradeConsumerVersioning_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamTHWExpireTasksRace — deferred: requires timer heap internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires timer heap worker expire task race detection")] + [Fact(Skip = "deferred: requires running JetStream server")] public void THWExpireTasksRace_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamStreamRetentionUpdatesConsumers — deferred: requires direct retention update - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires direct server stream retention update")] + [Fact(Skip = "deferred: requires running JetStream server")] public void StreamRetentionUpdatesConsumers_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamMaxMsgsPerSubjectAndDeliverLastPerSubject - // Verifies MaxMsgsPerSubject with DeliverLastPerSubject consumer. - // ----------------------------------------------------------------------- + [Fact(Skip = "deferred: requires running JetStream server")] + public void MaxMsgsPerSubjectAndDeliverLastPerSubject_ShouldSucceed() { } - [Fact] - public async Task MaxMsgsPerSubjectAndDeliverLastPerSubject_ShouldSucceed() - { - if (ServerUnavailable()) return; - - var streamName = $"MXPERSUB_{Guid.NewGuid():N}"[..16]; - var cfg = new JsonObject - { - ["name"] = streamName, - ["storage"] = "memory", - ["subjects"] = new JsonArray(JsonValue.Create(streamName + ".*")), - ["max_msgs_per_subject"] = 1, - }; - var createResp = await JsApiAsync($"$JS.API.STREAM.CREATE.{streamName}", - Encoding.UTF8.GetBytes(cfg.ToJsonString())); - createResp!["error"].ShouldBeNull($"Create error: {createResp["error"]}"); - - try - { - // Publish 3 messages to same subject — only 1 should remain per limit - for (int i = 0; i < 3; i++) - await _nats!.PublishAsync(streamName + ".a", Encoding.UTF8.GetBytes($"msg{i}")); - for (int i = 0; i < 3; i++) - await _nats!.PublishAsync(streamName + ".b", Encoding.UTF8.GetBytes($"msg{i}")); - - await Task.Delay(100); - - var info = await StreamInfoAsync(streamName); - var msgs = info?["state"]?["messages"]?.GetValue() ?? 0; - msgs.ShouldBe(2L, "Each subject should retain only 1 message"); - } - finally - { - await DeleteStreamAsync(streamName); - } - } - - // ----------------------------------------------------------------------- - // TestJetStreamAllowMsgCounter — deferred: requires allow_msg_counter stream config - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires allow_msg_counter stream configuration")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AllowMsgCounter_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamAllowMsgCounterMaxPayloadAndSize — deferred: same - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires allow_msg_counter with payload size checks")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AllowMsgCounterMaxPayloadAndSize_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamAllowMsgCounterMirror — deferred: same + mirror - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires allow_msg_counter with mirror stream")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AllowMsgCounterMirror_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamAllowMsgCounterSourceAggregates — deferred: same + source - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires allow_msg_counter with source aggregation")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AllowMsgCounterSourceAggregates_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamAllowMsgCounterSourceVerbatim — deferred: same + verbatim - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires allow_msg_counter with verbatim source")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AllowMsgCounterSourceVerbatim_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamAllowMsgCounterSourceStartingAboveZero — deferred - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires allow_msg_counter with non-zero start sequence")] + [Fact(Skip = "deferred: requires running JetStream server")] public void AllowMsgCounterSourceStartingAboveZero_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamPromoteMirrorDeletingOrigin — deferred: requires mirror promotion - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires mirror stream promotion by deleting origin")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PromoteMirrorDeletingOrigin_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamPromoteMirrorUpdatingOrigin — deferred: same - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires mirror stream promotion by updating origin")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PromoteMirrorUpdatingOrigin_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamOfflineStreamAndConsumerAfterDowngrade — deferred: requires versioning - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires server downgrade and offline stream/consumer handling")] + [Fact(Skip = "deferred: requires running JetStream server")] public void OfflineStreamAndConsumerAfterDowngrade_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamPersistModeAsync — deferred: requires file store persist mode - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires file store async persist mode configuration")] + [Fact(Skip = "deferred: requires running JetStream server")] public void PersistModeAsync_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamRemoveTTLOnRemoveMsg — deferred: requires message TTL internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires message TTL removal on msg delete")] + [Fact(Skip = "deferred: requires running JetStream server")] public void RemoveTTLOnRemoveMsg_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamMessageTTLNotExpiring — deferred: requires message TTL internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires message TTL non-expiry verification")] + [Fact(Skip = "deferred: requires running JetStream server")] public void MessageTTLNotExpiring_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamDirectGetBatchParallelWriteDeadlock — deferred: requires direct get batch internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires direct get batch parallel write deadlock detection")] + [Fact(Skip = "deferred: requires running JetStream server")] public void DirectGetBatchParallelWriteDeadlock_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamStreamMirrorWithoutDuplicateWindow — deferred: requires mirror config - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires mirror stream without duplicate window")] + [Fact(Skip = "deferred: requires running JetStream server")] public void StreamMirrorWithoutDuplicateWindow_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamStreamSourceWithoutDuplicateWindow — deferred: requires source config - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires source stream without duplicate window")] + [Fact(Skip = "deferred: requires running JetStream server")] public void StreamSourceWithoutDuplicateWindow_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamFileStoreErrorOpeningBlockAfterTruncate — deferred: requires file store fault - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires file store block truncation fault injection")] + [Fact(Skip = "deferred: requires running JetStream server")] public void FileStoreErrorOpeningBlockAfterTruncate_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamCleanupNoInterestAboveThreshold — deferred: requires consumer interest tracking - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires consumer interest threshold cleanup")] + [Fact(Skip = "deferred: requires running JetStream server")] public void CleanupNoInterestAboveThreshold_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamStoreFilterIsAll — deferred: requires store filter internals - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires store filter 'all' configuration")] + [Fact(Skip = "deferred: requires running JetStream server")] public void StoreFilterIsAll_ShouldSucceed() { } - // ----------------------------------------------------------------------- - // TestJetStreamFlowControlCrossAccountFanOut — deferred: requires multi-account setup - // ----------------------------------------------------------------------- - - [Fact(Skip = "deferred: requires cross-account flow control with fan-out")] + [Fact(Skip = "deferred: requires running JetStream server")] public void FlowControlCrossAccountFanOut_ShouldSucceed() { } } diff --git a/reports/current.md b/reports/current.md index 8f60219..e9a3d6e 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 17:46:10 UTC +Generated: 2026-03-01 17:33:18 UTC ## Modules (12 total)