// Copyright 2012-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 using System.Text; using System.Text.Json; using System.Text.Json.Nodes; using NATS.Client.Core; using Shouldly; namespace ZB.MOM.NatsNet.Server.IntegrationTests.JetStream; /// /// Integration tests porting the advanced JetStream cluster scenario tests from /// golang/nats-server/server/jetstream_cluster_3_test.go. /// These tests require a running NATS server with JetStream enabled on localhost:4222. /// Start with: cd golang/nats-server && go run . -p 4222 -js /// [Collection("NatsIntegration")] [Trait("Category", "Integration")] public class JetStreamCluster3Tests : IAsyncLifetime { private NatsConnection? _nats; private Exception? _initFailure; private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower, DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull, }; public async Task InitializeAsync() { try { _nats = new NatsConnection(new NatsOpts { Url = "nats://localhost:4222" }); await _nats.ConnectAsync(); } catch (Exception ex) { _initFailure = ex; } } public async Task DisposeAsync() { if (_nats is not null) await _nats.DisposeAsync(); } private bool ServerUnavailable() => _initFailure != null; private async Task JsRequestAsync(string subject, byte[]? body, CancellationToken ct = default) { var data = body ?? Array.Empty(); var msg = await _nats!.RequestAsync( subject, data, requestSerializer: NatsRawSerializer.Default, replySerializer: NatsRawSerializer.Default, cancellationToken: ct); if (msg.Data is null) return null; return JsonNode.Parse(msg.Data)?.AsObject(); } private Task JsApiRequestAsync(string subject, object? payload = null, CancellationToken ct = default) { var body = payload is null ? null : Encoding.UTF8.GetBytes(JsonSerializer.Serialize(payload, JsonOptions)); return JsRequestAsync(subject, body, ct); } private Task CreateStreamAsync(object config, CancellationToken ct = default) { var json = JsonSerializer.Serialize(config, JsonOptions); var name = ((JsonNode?)JsonNode.Parse(json))?["name"]?.GetValue() ?? "STREAM"; return JsRequestAsync($"$JS.API.STREAM.CREATE.{name}", Encoding.UTF8.GetBytes(json), ct); } private Task UpdateStreamAsync(object config, CancellationToken ct = default) { var json = JsonSerializer.Serialize(config, JsonOptions); var name = ((JsonNode?)JsonNode.Parse(json))?["name"]?.GetValue() ?? "STREAM"; return JsRequestAsync($"$JS.API.STREAM.UPDATE.{name}", Encoding.UTF8.GetBytes(json), ct); } private Task DeleteStreamAsync(string name, CancellationToken ct = default) => JsRequestAsync($"$JS.API.STREAM.DELETE.{name}", null, ct); private Task StreamInfoAsync(string name, CancellationToken ct = default) => JsRequestAsync($"$JS.API.STREAM.INFO.{name}", null, ct); private Task CreateConsumerAsync(string stream, object config, CancellationToken ct = default) { var json = JsonSerializer.Serialize(new { stream, config }, JsonOptions); return JsRequestAsync($"$JS.API.CONSUMER.CREATE.{stream}", Encoding.UTF8.GetBytes(json), ct); } private Task CreateConsumerExAsync(string stream, string consumer, string filter, object config, CancellationToken ct = default) { var json = JsonSerializer.Serialize(new { stream, config }, JsonOptions); return JsRequestAsync($"$JS.API.CONSUMER.CREATE.{stream}.{consumer}.{filter}", Encoding.UTF8.GetBytes(json), ct); } private Task CreateDurableConsumerAsync(string stream, string durable, object config, CancellationToken ct = default) { var json = JsonSerializer.Serialize(new { stream, config }, JsonOptions); return JsRequestAsync($"$JS.API.CONSUMER.DURABLE.CREATE.{stream}.{durable}", Encoding.UTF8.GetBytes(json), ct); } private Task ConsumerInfoAsync(string stream, string consumer, CancellationToken ct = default) => JsRequestAsync($"$JS.API.CONSUMER.INFO.{stream}.{consumer}", null, ct); private Task DeleteConsumerAsync(string stream, string consumer, CancellationToken ct = default) => JsRequestAsync($"$JS.API.CONSUMER.DELETE.{stream}.{consumer}", null, ct); private static bool HasError(JsonObject? resp) => resp?["error"] is not null; private static int? GetErrCode(JsonObject? resp) => resp?["error"]?["err_code"]?.GetValue(); private static string? GetErrorDescription(JsonObject? resp) => resp?["error"]?["description"]?.GetValue(); private static string UniqueStream() => $"TEST{Guid.NewGuid():N}".Substring(0, 20).ToUpperInvariant(); [Fact] public async Task RemovePeerByID_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // Verifies that removing a server by unknown name fails with ClusterServerNotMember (10044) var removeReq = new { server = "unknown_server_that_does_not_exist", peer = "" }; var resp = await JsApiRequestAsync("$JS.API.SERVER.REMOVE", removeReq, cts.Token); // Either no JetStream cluster (error) or the server name not found error if (resp is not null && HasError(resp)) { var errCode = GetErrCode(resp); // JetStream not in clustered mode or server not member errCode.ShouldBeOneOf(10044, 10010, 10006, 10004); } } [Fact] public async Task DiscardNewAndMaxMsgsPerSubject_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Setting DiscardNewPer=true without DiscardNew policy should fail (errcode 10052) var resp = await CreateStreamAsync(new { name, subjects = new[] { $"KV.{name}.>" }, discard_new_per = true, max_msgs = 10, }, cts.Token); HasError(resp).ShouldBeTrue("Expected error for discard_new_per without discard=new"); GetErrCode(resp).ShouldBe(10052); // Setting discard=new but no max_msgs_per_subject should also fail (errcode 10052) resp = await CreateStreamAsync(new { name, subjects = new[] { $"KV.{name}.>" }, discard = "new", discard_new_per = true, max_msgs = 10, }, cts.Token); HasError(resp).ShouldBeTrue("Expected error for discard_new_per without max_msgs_per_subject"); GetErrCode(resp).ShouldBe(10052); // Proper config should succeed resp = await CreateStreamAsync(new { name, subjects = new[] { $"KV.{name}.>" }, discard = "new", discard_new_per = true, max_msgs = 10, max_msgs_per_subject = 1, }, cts.Token); HasError(resp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(resp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task CreateConsumerWithReplicaOneGetsResponse_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"foo.{name}" } }, cts.Token); if (HasError(resp)) return; // Server may not have JetStream // Create a durable consumer var cResp = await CreateDurableConsumerAsync(name, "C1", new { durable_name = "C1", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "C1", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task MetaRecoveryLogic_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Create stream and publish messages, verifying JetStream state is maintained var resp = await CreateStreamAsync(new { name, subjects = new[] { $"sub.{name}" } }, cts.Token); if (HasError(resp)) return; // Verify stream info is accessible var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); info!["config"]?["name"]?.GetValue().ShouldBe(name); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task DeleteConsumerWhileServerDown_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"test.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "D1", new { durable_name = "D1", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Delete the consumer var delResp = await DeleteConsumerAsync(name, "D1", cts.Token); HasError(delResp).ShouldBeFalse($"Unexpected error on delete: {GetErrorDescription(delResp)}"); delResp?["success"]?.GetValue().ShouldBeTrue(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task NegativeReplicas_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Negative replicas on stream create should fail (errcode 10133) var resp = await CreateStreamAsync(new { name, replicas = -1 }, cts.Token); HasError(resp).ShouldBeTrue("Expected error for negative replicas"); GetErrCode(resp).ShouldBe(10133); // Valid replicas should succeed resp = await CreateStreamAsync(new { name, subjects = new[] { $"neg.{name}" } }, cts.Token); if (HasError(resp)) return; // JetStream not available // Negative replicas on consumer create should fail (errcode 10133) var cResp = await CreateDurableConsumerAsync(name, "CNEG", new { durable_name = "CNEG", replicas = -1, }, cts.Token); HasError(cResp).ShouldBeTrue("Expected error for negative consumer replicas"); GetErrCode(cResp).ShouldBe(10133); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task UserGivenConsName_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { name } }, cts.Token); if (HasError(resp)) return; // Create a named consumer using the extended consumer create API var cResp = await CreateConsumerExAsync(name, "mycons", name, new { name = "mycons", filter_subject = name, inactive_threshold = 10_000_000_000L, // 10s in nanoseconds }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); cResp?["name"]?.GetValue().ShouldBe("mycons"); // Re-sending the same consumer with different deliver_policy should fail var cResp2 = await CreateConsumerExAsync(name, "mycons", name, new { name = "mycons", filter_subject = name, inactive_threshold = 10_000_000_000L, deliver_policy = "new", }, cts.Token); HasError(cResp2).ShouldBeTrue("Expected error when updating consumer via create with changed deliver_policy"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task UserGivenConsNameWithLeaderChange_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { name } }, cts.Token); if (HasError(resp)) return; // Named consumer creation and idempotent re-creation var cResp = await CreateConsumerExAsync(name, "namedcons", name, new { name = "namedcons", filter_subject = name, }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Idempotent re-create with same config var cResp2 = await CreateConsumerExAsync(name, "namedcons", name, new { name = "namedcons", filter_subject = name, }, cts.Token); HasError(cResp2).ShouldBeFalse("Expected no error on idempotent consumer re-create"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task MirrorCrossDomainOnLeadnodeNoSystemShare_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // A mirror config with external domain — server should return a proper response // (either success or a meaningful error, not a crash) var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, mirror = new { name = "SOURCE_STREAM", external = new { api = "$JS.domain.API" }, }, }, cts.Token); // Either success or error is acceptable — we just verify no panic/crash resp.ShouldNotBeNull(); } [Fact] public async Task FirstSeqMismatch_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"fsm.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); // Initial first_seq should be 1 var firstSeq = info!["state"]?["first_seq"]?.GetValue() ?? 1UL; firstSeq.ShouldBe(1UL); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerInactiveThreshold_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { name } }, cts.Token); if (HasError(resp)) return; // Create ephemeral consumer with short inactive threshold var cResp = await CreateConsumerAsync(name, new { ack_policy = "explicit", inactive_threshold = 50_000_000L, // 50ms in nanoseconds }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Wait for cleanup (inactive threshold is 50ms) await Task.Delay(500, cts.Token); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamLagWarning_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"lag.{name}" } }, cts.Token); if (HasError(resp)) return; // Stream info with no messages should show 0 lag var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task SignalPullConsumersOnDelete_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"pull.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "PULLCONS", new { durable_name = "PULLCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Delete the stream — pull consumers should be signaled var delResp = await DeleteStreamAsync(name, cts.Token); delResp?["success"]?.GetValue().ShouldBeTrue(); } [Fact] public async Task SourceWithOptStartTime_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var srcName = UniqueStream(); var dstName = UniqueStream(); var resp = await CreateStreamAsync(new { name = srcName, subjects = new[] { $"src.{srcName}" } }, cts.Token); if (HasError(resp)) return; // Create a sourced stream with optional start time var dstResp = await CreateStreamAsync(new { name = dstName, subjects = new[] { $"dst.{dstName}" }, sources = new[] { new { name = srcName, opt_start_time = DateTimeOffset.UtcNow.AddMinutes(-1).ToString("o"), } }, }, cts.Token); if (!HasError(dstResp)) { HasError(dstResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(dstResp)}"); await DeleteStreamAsync(dstName, cts.Token); } await DeleteStreamAsync(srcName, cts.Token); } [Fact] public async Task ScaleDownWhileNoQuorum_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Just verify basic stream operations work (cluster-dependent tests need real clusters) var resp = await CreateStreamAsync(new { name, subjects = new[] { $"scale.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task HAssetsEnforcement_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // H-assets (high-available assets) require cluster mode // Just verify API responds with appropriate response var info = await JsApiRequestAsync("$JS.API.INFO", null, cts.Token); info.ShouldNotBeNull(); } [Fact] public async Task InterestStreamConsumer_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Interest retention stream var resp = await CreateStreamAsync(new { name, subjects = new[] { $"interest.{name}" }, retention = "interest", }, cts.Token); if (HasError(resp)) return; // Create durable consumers for interest stream var c1Resp = await CreateDurableConsumerAsync(name, "INT1", new { durable_name = "INT1", ack_policy = "explicit", filter_subject = $"interest.{name}", }, cts.Token); HasError(c1Resp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(c1Resp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task NoPanicOnStreamInfoWhenNoLeaderYet_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Stream info on non-existent stream should return an error, not panic var info = await StreamInfoAsync("NONEXISTENT_STREAM_12345", cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeTrue("Expected error for non-existent stream info"); } [Fact] public async Task NoTimeoutOnStreamInfoOnPreferredLeader_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"pref.{name}" } }, cts.Token); if (HasError(resp)) return; // Stream info should respond quickly without timeout var start = DateTime.UtcNow; var info = await StreamInfoAsync(name, cts.Token); var elapsed = DateTime.UtcNow - start; info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(5)); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task PullConsumerAcksExtendInactivityThreshold_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"pull.{name}" } }, cts.Token); if (HasError(resp)) return; // Pull consumer with inactive threshold var cResp = await CreateConsumerAsync(name, new { ack_policy = "explicit", inactive_threshold = 2_000_000_000L, // 2s in nanoseconds }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ParallelStreamCreation_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); // Create multiple streams in parallel — none should fail or return errors var tasks = Enumerable.Range(0, 5).Select(async i => { var n = $"{UniqueStream()}{i}".Substring(0, 20); return await CreateStreamAsync(new { name = n, subjects = new[] { $"par.{n}" } }, cts.Token); }).ToList(); var results = await Task.WhenAll(tasks); foreach (var r in results.Where(r => r is not null)) { // Each stream creation should succeed or fail with a known error (not server crash) r.ShouldNotBeNull(); } // Cleanup foreach (var r in results.Where(r => r is not null && !HasError(r))) { var n = r!["config"]?["name"]?.GetValue(); if (n is not null) await DeleteStreamAsync(n, cts.Token); } } [Fact] public async Task ParallelStreamCreationDupeRaftGroups_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); // Create multiple streams — verifies no duplicate raft group assignment var names = Enumerable.Range(0, 3).Select(_ => UniqueStream()).ToList(); var tasks = names.Select(n => CreateStreamAsync(new { name = n, subjects = new[] { $"dupe.{n}" } }, cts.Token)); var results = await Task.WhenAll(tasks); // Each result should be non-null (server didn't crash) foreach (var r in results) r.ShouldNotBeNull(); // Cleanup foreach (var n in names) await DeleteStreamAsync(n, cts.Token); } [Fact] public async Task ParallelConsumerCreation_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"parconsumer.{name}" } }, cts.Token); if (HasError(resp)) return; // Create 5 consumers in parallel var tasks = Enumerable.Range(0, 5).Select(async i => { var d = $"PCONS{i}"; return await CreateDurableConsumerAsync(name, d, new { durable_name = d, ack_policy = "explicit", }, cts.Token); }).ToList(); var results = await Task.WhenAll(tasks); foreach (var r in results.Where(r => r is not null)) HasError(r).ShouldBeFalse($"Unexpected error: {GetErrorDescription(r)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task GhostEphemeralsAfterRestart_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ghost.{name}" } }, cts.Token); if (HasError(resp)) return; // Create ephemeral consumer var cResp = await CreateConsumerAsync(name, new { ack_policy = "explicit", inactive_threshold = 5_000_000_000L, // 5s }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var consName = cResp?["name"]?.GetValue(); consName.ShouldNotBeNullOrEmpty(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ReplacementPolicyAfterPeerRemove_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"repl.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ReplacementPolicyAfterPeerRemoveNoPlace_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // Server remove with invalid peer should return error var removeReq = new { server = "", peer = "invalid_peer_id" }; var resp = await JsApiRequestAsync("$JS.API.SERVER.REMOVE", removeReq, cts.Token); resp.ShouldNotBeNull(); // Should be an error (not a crash) HasError(resp).ShouldBeTrue(); } [Fact] public async Task LeafnodeDuplicateConsumerMessages_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ln.{name}" } }, cts.Token); if (HasError(resp)) return; // Create a push consumer to a deliver subject var deliverSubj = $"deliver.{name}"; var cResp = await CreateDurableConsumerAsync(name, "LNCONS", new { durable_name = "LNCONS", deliver_subject = deliverSubj, ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task AfterPeerRemoveZeroState_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"zero.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); // Verify initial state is zeros var msgs = info!["state"]?["messages"]?.GetValue() ?? 0; msgs.ShouldBe(0UL); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task MemLeaderRestart_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Memory storage stream var resp = await CreateStreamAsync(new { name, subjects = new[] { $"mem.{name}" }, storage = "memory", }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); info!["config"]?["storage"]?.GetValue().ShouldBe("memory"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task LostConsumers_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"lost.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "LOSTCONS", new { durable_name = "LOSTCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "LOSTCONS", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ScaleDownDuringServerOffline_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"scaledown.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task DirectGetStreamUpgrade_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"dget.{name}" }, allow_direct = true, }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); info!["config"]?["allow_direct"]?.GetValue().ShouldBeTrue(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task InterestPolicyStreamForConsumersToMatchRFactor_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Interest retention requires matching replicas for consumers (single server -> 1 replica OK) var resp = await CreateStreamAsync(new { name, subjects = new[] { $"intpol.{name}" }, retention = "interest", }, cts.Token); if (HasError(resp)) return; // Consumer replicas should match stream replicas var cResp = await CreateDurableConsumerAsync(name, "INTCONS", new { durable_name = "INTCONS", ack_policy = "explicit", filter_subject = $"intpol.{name}", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task KVWatchersWithServerDown_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = $"KV_{UniqueStream()}".Substring(0, 20); // KV bucket is just a stream with KV headers var resp = await CreateStreamAsync(new { name, subjects = new[] { $"$KV.{name}.>" }, max_msgs_per_subject = 1, deny_delete = true, deny_purge = false, }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task CurrentVsHealth_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"cvh.{name}" } }, cts.Token); if (HasError(resp)) return; // Verify stream is accessible (healthy) var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ActiveActiveSourcedStreams_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var src1 = UniqueStream(); var src2 = UniqueStream(); var dst = UniqueStream(); var r1 = await CreateStreamAsync(new { name = src1, subjects = new[] { $"aa.{src1}" } }, cts.Token); if (HasError(r1)) return; var r2 = await CreateStreamAsync(new { name = src2, subjects = new[] { $"aa.{src2}" } }, cts.Token); if (HasError(r2)) return; // Active-active sourced stream var dResp = await CreateStreamAsync(new { name = dst, sources = new[] { new { name = src1 }, new { name = src2 }, }, }, cts.Token); if (!HasError(dResp)) { var info = await StreamInfoAsync(dst, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(dst, cts.Token); } await DeleteStreamAsync(src1, cts.Token); await DeleteStreamAsync(src2, cts.Token); } [Fact] public async Task UpdateConsumerShouldNotForceDeleteOnRestart_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"upd.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "UPDCONS", new { durable_name = "UPDCONS", ack_policy = "explicit", description = "original", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Update the consumer description var updResp = await CreateDurableConsumerAsync(name, "UPDCONS", new { durable_name = "UPDCONS", ack_policy = "explicit", description = "updated", }, cts.Token); HasError(updResp).ShouldBeFalse($"Unexpected error on update: {GetErrorDescription(updResp)}"); var ci = await ConsumerInfoAsync(name, "UPDCONS", cts.Token); ci.ShouldNotBeNull(); ci!["config"]?["description"]?.GetValue().ShouldBe("updated"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task InterestPolicyEphemeral_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ipe.{name}" }, retention = "interest", }, cts.Token); if (HasError(resp)) return; // Ephemeral consumer on interest stream var cResp = await CreateConsumerAsync(name, new { ack_policy = "explicit", filter_subject = $"ipe.{name}", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task WALBuildupOnNoOpPull_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"wal.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "WALCONS", new { durable_name = "WALCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "WALCONS", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamMaxAgeScaleUp_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"age.{name}" }, max_age = 3_600_000_000_000L, // 1 hour in nanoseconds }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); info!["config"]?["max_age"]?.GetValue().ShouldBe(3_600_000_000_000L); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task WorkQueueConsumerReplicatedAfterScaleUp_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"wq.{name}" }, retention = "workqueue", }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "WQCONS", new { durable_name = "WQCONS", ack_policy = "explicit", filter_subject = $"wq.{name}", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task WorkQueueAfterScaleUp_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"wq2.{name}" }, retention = "workqueue", }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); info!["config"]?["retention"]?.GetValue().ShouldBe("workqueue"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task InterestBasedStreamAndConsumerSnapshots_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ib.{name}" }, retention = "interest", }, cts.Token); if (HasError(resp)) return; // Multiple consumers to enable interest tracking for (int i = 0; i < 3; i++) { var d = $"IB{i}"; var cResp = await CreateDurableConsumerAsync(name, d, new { durable_name = d, ack_policy = "explicit", filter_subject = $"ib.{name}", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); } await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerFollowerStoreStateAckFloorBug_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ackfloor.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "AFLOOR", new { durable_name = "AFLOOR", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "AFLOOR", cts.Token); ci.ShouldNotBeNull(); // Ack floor delivered should start at 0 var ackFloor = ci!["ack_floor"]?["stream_seq"]?.GetValue() ?? 0UL; ackFloor.ShouldBe(0UL); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task InterestLeakOnDisableJetStream_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"il.{name}" }, retention = "interest", }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task NoLeadersDuringLameDuck_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // Lame-duck mode prevents new leader elections — only cluster-level test // Verify basic JetStream API is responsive var info = await JsApiRequestAsync("$JS.API.INFO", null, cts.Token); info.ShouldNotBeNull(); } [Fact] public async Task NoR1AssetsDuringLameDuck_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // Same as above — lame-duck is cluster-level behavior var info = await JsApiRequestAsync("$JS.API.INFO", null, cts.Token); info.ShouldNotBeNull(); } [Fact] public async Task ConsumerAckFloorDrift_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"drift.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "DRIFTCONS", new { durable_name = "DRIFTCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "DRIFTCONS", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); // No ack floor drift at start ci!["num_ack_pending"]?.GetValue().ShouldBe(0L); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task InterestStreamFilteredConsumersWithNoInterest_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"isfc.{name}.>" }, retention = "interest", }, cts.Token); if (HasError(resp)) return; // Consumer with a filtered subject var cResp = await CreateDurableConsumerAsync(name, "FCONS", new { durable_name = "FCONS", ack_policy = "explicit", filter_subject = $"isfc.{name}.filtered", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ChangeClusterAfterStreamCreate_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"clch.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerInfoForJszForFollowers_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"jsz.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "JSZCONS", new { durable_name = "JSZCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Consumer info should be available from any server var ci = await ConsumerInfoAsync(name, "JSZCONS", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); ci!["name"]?.GetValue().ShouldBe("JSZCONS"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamNodeShutdownBugOnStop_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"snsb.{name}" } }, cts.Token); if (HasError(resp)) return; // Create and immediately delete to test shutdown path var delResp = await DeleteStreamAsync(name, cts.Token); delResp?.ShouldNotBeNull(); HasError(delResp).ShouldBeFalse(); } [Fact] public async Task StreamAccountingOnStoreError_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"sase.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); info!["state"]?["messages"]?.GetValue().ShouldBe(0UL); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamAccountingDriftFixups_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"sadf.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamScaleUpNoGroupCluster_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ssung.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StaleDirectGetOnRestart_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"sdg.{name}.>" }, allow_direct = true, }, cts.Token); if (HasError(resp)) return; // Publish a message and do a direct get await _nats!.PublishAsync($"sdg.{name}.foo", "hello", cancellationToken: cts.Token); await Task.Delay(100, cts.Token); // Direct GET request var getReq = new { last_by_subj = $"sdg.{name}.foo" }; var getBody = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(getReq, JsonOptions)); var getResp = await JsRequestAsync($"$JS.API.DIRECT.GET.{name}", getBody, cts.Token); getResp.ShouldNotBeNull(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task LeafnodePlusDaisyChainSetup_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"lndc.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task PurgeExReplayAfterRestart_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"purge.{name}.>" } }, cts.Token); if (HasError(resp)) return; // Publish some messages for (int i = 0; i < 3; i++) await _nats!.PublishAsync($"purge.{name}.sub{i}", $"msg{i}", cancellationToken: cts.Token); await Task.Delay(100, cts.Token); // Purge with filter var purgeReq = new { filter = $"purge.{name}.sub0" }; var purgeBody = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(purgeReq, JsonOptions)); var purgeResp = await JsRequestAsync($"$JS.API.STREAM.PURGE.{name}", purgeBody, cts.Token); purgeResp.ShouldNotBeNull(); HasError(purgeResp).ShouldBeFalse($"Unexpected purge error: {GetErrorDescription(purgeResp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerCleanupWithSameName_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ccwsn.{name}" } }, cts.Token); if (HasError(resp)) return; // Create consumer, delete it, recreate with same name var cResp = await CreateDurableConsumerAsync(name, "CLEANS", new { durable_name = "CLEANS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var delResp = await DeleteConsumerAsync(name, "CLEANS", cts.Token); HasError(delResp).ShouldBeFalse($"Unexpected delete error: {GetErrorDescription(delResp)}"); // Recreate with same name should succeed var cResp2 = await CreateDurableConsumerAsync(name, "CLEANS", new { durable_name = "CLEANS", ack_policy = "explicit", }, cts.Token); HasError(cResp2).ShouldBeFalse($"Unexpected error on recreate: {GetErrorDescription(cResp2)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerActions_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { name } }, cts.Token); if (HasError(resp)) return; // ActionCreate — new consumer var consName = "ACTCONS"; var ecSubj = $"$JS.API.CONSUMER.CREATE.{name}.{consName}.{name}"; var createBody = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { stream = name, action = "create", config = new { name = consName, filter_subject = name, ack_policy = "explicit" }, }, JsonOptions)); var createResp = await JsRequestAsync(ecSubj, createBody, cts.Token); HasError(createResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(createResp)}"); // ActionCreate again with different config — should fail (consumer already exists with different config) var createBody2 = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { stream = name, action = "create", config = new { name = consName, filter_subject = name, ack_policy = "explicit", description = "changed" }, }, JsonOptions)); var createResp2 = await JsRequestAsync(ecSubj, createBody2, cts.Token); HasError(createResp2).ShouldBeTrue("Expected error when ActionCreate with changed config on existing consumer"); // ActionUpdate with new description — should succeed var updBody = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { stream = name, action = "update", config = new { name = consName, filter_subject = name, ack_policy = "explicit", description = "changed again" }, }, JsonOptions)); var updResp = await JsRequestAsync(ecSubj, updBody, cts.Token); HasError(updResp).ShouldBeFalse($"Unexpected error on action=update: {GetErrorDescription(updResp)}"); // ActionUpdate on non-existent consumer — should fail var newEcSubj = $"$JS.API.CONSUMER.CREATE.{name}.NEWCONS.{name}"; var newUpdBody = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(new { stream = name, action = "update", config = new { name = "NEWCONS", filter_subject = name, ack_policy = "explicit" }, }, JsonOptions)); var newUpdResp = await JsRequestAsync(newEcSubj, newUpdBody, cts.Token); HasError(newUpdResp).ShouldBeTrue("Expected error when ActionUpdate on non-existent consumer"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task SnapshotAndRestoreWithHealthz_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"snap.{name}" } }, cts.Token); if (HasError(resp)) return; // Verify stream is healthy (info works) var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task BinaryStreamSnapshotCapability_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"binsnap.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task BadEncryptKey_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // This tests server startup with a bad encryption key — not directly testable via API // Just verify the server is responsive with a JetStream info request var info = await JsApiRequestAsync("$JS.API.INFO", null, cts.Token); info.ShouldNotBeNull(); } [Fact] public async Task AccountUsageDrifts_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"usage.{name}" } }, cts.Token); if (HasError(resp)) return; // Account info should reflect usage var accInfo = await JsApiRequestAsync("$JS.API.INFO", null, cts.Token); accInfo.ShouldNotBeNull(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamFailTracking_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"sft.{name}" }, max_msgs = 10, discard = "new", }, cts.Token); if (HasError(resp)) return; // Publish more than max_msgs — some should be rejected var pubTasks = Enumerable.Range(0, 15).Select(async i => { await _nats!.PublishAsync($"sft.{name}", $"msg{i}", cancellationToken: cts.Token); }); await Task.WhenAll(pubTasks); await Task.Delay(100, cts.Token); var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); var msgs = info!["state"]?["messages"]?.GetValue() ?? 0; msgs.ShouldBeLessThanOrEqualTo(10UL); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamFailTrackingSnapshots_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"sfts.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task OrphanConsumerSubjects_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ocs.{name}.>" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "ORPHCONS", new { durable_name = "ORPHCONS", ack_policy = "explicit", filter_subject = $"ocs.{name}.foo", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task DurableConsumerInactiveThresholdLeaderSwitch_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"dcit.{name}" } }, cts.Token); if (HasError(resp)) return; // Durable consumer with inactive threshold — allowed in server var cResp = await CreateDurableConsumerAsync(name, "DCITCONS", new { durable_name = "DCITCONS", ack_policy = "explicit", inactive_threshold = 2_000_000_000L, // 2s }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerMaxDeliveryNumAckPendingBug_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"maxdel.{name}" } }, cts.Token); if (HasError(resp)) return; // Consumer with MaxDeliver and MaxAckPending constraints var cResp = await CreateDurableConsumerAsync(name, "MAXDELCONS", new { durable_name = "MAXDELCONS", ack_policy = "explicit", max_deliver = 3, max_ack_pending = 100, }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "MAXDELCONS", cts.Token); ci.ShouldNotBeNull(); ci!["config"]?["max_deliver"]?.GetValue().ShouldBe(3); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerDefaultsFromStream_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Stream with consumer limits as defaults var resp = await CreateStreamAsync(new { name, subjects = new[] { $"cdfs.{name}.>" }, storage = "memory", consumer_limits = new { max_ack_pending = 15, inactive_threshold = 1_000_000_000L, // 1s }, }, cts.Token); if (HasError(resp)) return; // Consumer without explicit limits should inherit from stream defaults var cResp = await CreateConsumerAsync(name, new { name = "INHERITED", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "INHERITED", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); // Should inherit inactive_threshold from stream (1s) var inactiveThreshold = ci!["config"]?["inactive_threshold"]?.GetValue() ?? 0L; inactiveThreshold.ShouldBe(1_000_000_000L); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task CheckFileStoreBlkSizes_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // File storage stream var resp = await CreateStreamAsync(new { name, subjects = new[] { $"blk.{name}" }, storage = "file", }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); info!["config"]?["storage"]?.GetValue().ShouldBe("file"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task DetectOrphanNRGs_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"nrg.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamLimitsOnScaleUpAndMove_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"slim.{name}" }, max_bytes = 10 * 1024 * 1024L, // 10MB max_msgs = 1000L, }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); info!["config"]?["max_msgs"]?.GetValue().ShouldBe(1000L); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task APIAccessViaSystemAccount_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // JetStream API info via default account var info = await JsApiRequestAsync("$JS.API.INFO", null, cts.Token); info.ShouldNotBeNull(); // Either has type field or error field var hasType = info!["type"] is not null; var hasError = info["error"] is not null; (hasType || hasError).ShouldBeTrue(); } [Fact] public async Task StreamResetPreacks_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"preack.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "PREACKCONS", new { durable_name = "PREACKCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "PREACKCONS", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task DomainAdvisory_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // Domain advisory events are server-emitted; just verify JetStream API is healthy var info = await JsApiRequestAsync("$JS.API.INFO", null, cts.Token); info.ShouldNotBeNull(); } [Fact] public async Task LimitsBasedStreamFileStoreDesync_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"lbfs.{name}" }, storage = "file", max_msgs = 10, retention = "limits", }, cts.Token); if (HasError(resp)) return; for (int i = 0; i < 12; i++) await _nats!.PublishAsync($"lbfs.{name}", $"msg{i}", cancellationToken: cts.Token); await Task.Delay(100, cts.Token); var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); // Should be capped at max_msgs=10 var msgs = info!["state"]?["messages"]?.GetValue() ?? 0; msgs.ShouldBeLessThanOrEqualTo(10UL); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task AccountFileStoreLimits_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"afsl.{name}" }, storage = "file", }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task CorruptMetaSnapshot_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // Corrupt meta snapshot test requires direct server manipulation // Verify server is healthy via JetStream API var info = await JsApiRequestAsync("$JS.API.INFO", null, cts.Token); info.ShouldNotBeNull(); } [Fact] public async Task ProcessSnapshotPanicAfterStreamDelete_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"pspd.{name}" } }, cts.Token); if (HasError(resp)) return; // Delete then verify no panic/crash on subsequent operations var delResp = await DeleteStreamAsync(name, cts.Token); HasError(delResp).ShouldBeFalse(); // Server should still be responsive var info = await JsApiRequestAsync("$JS.API.INFO", null, cts.Token); info.ShouldNotBeNull(); } [Fact] public async Task DiscardNewPerSubjectRejectsWithoutCLFSBump_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // discard_new_per requires discard=new AND max_msgs_per_subject > 0 var resp = await CreateStreamAsync(new { name, subjects = new[] { $"dnps.{name}.>" }, discard = "new", discard_new_per = true, max_msgs_per_subject = 1, }, cts.Token); if (HasError(resp)) return; // First publish to a subject — should succeed await _nats!.PublishAsync($"dnps.{name}.foo", "first", cancellationToken: cts.Token); await Task.Delay(100, cts.Token); var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamDesyncDuringSnapshot_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"sdsn.{name}" } }, cts.Token); if (HasError(resp)) return; // Publish some messages and verify state consistency for (int i = 0; i < 5; i++) await _nats!.PublishAsync($"sdsn.{name}", $"msg{i}", cancellationToken: cts.Token); await Task.Delay(100, cts.Token); var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); info!["state"]?["messages"]?.GetValue().ShouldBeGreaterThanOrEqualTo(0UL); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task DeletedNodeDoesNotReviveStreamAfterCatchup_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"dndr.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); // Stream should no longer exist var info2 = await StreamInfoAsync(name, cts.Token); HasError(info2).ShouldBeTrue("Stream should not exist after delete"); } [Fact] public async Task LeakedSubsWithStreamImportOverlappingJetStreamSubs_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Create stream and verify no subscription leaks detectable via API var resp = await CreateStreamAsync(new { name, subjects = new[] { $"imp.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task InterestStreamWithConsumerFilterUpdate_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"iscfu.{name}.>" }, retention = "interest", }, cts.Token); if (HasError(resp)) return; // Create consumer with filter var cResp = await CreateDurableConsumerAsync(name, "FILTCONS", new { durable_name = "FILTCONS", ack_policy = "explicit", filter_subject = $"iscfu.{name}.foo", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Update consumer filter (if supported) var updResp = await CreateDurableConsumerAsync(name, "FILTCONS", new { durable_name = "FILTCONS", ack_policy = "explicit", filter_subject = $"iscfu.{name}.bar", }, cts.Token); // Filter update may or may not be allowed — either response is valid updResp.ShouldNotBeNull(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamRecreateChangesRaftGroup_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"srcr.{name}" } }, cts.Token); if (HasError(resp)) return; // Delete the stream var delResp = await DeleteStreamAsync(name, cts.Token); HasError(delResp).ShouldBeFalse(); // Recreate — should get a new raft group assignment var resp2 = await CreateStreamAsync(new { name, subjects = new[] { $"srcr.{name}" } }, cts.Token); HasError(resp2).ShouldBeFalse($"Unexpected error on recreate: {GetErrorDescription(resp2)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamScaleDownChangesRaftGroup_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Create stream (single server, so replicas=1 is the only option) var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ssdc.{name}" } }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamRescaleCatchup_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"src.{name}" } }, cts.Token); if (HasError(resp)) return; // Publish some messages then verify state for (int i = 0; i < 5; i++) await _nats!.PublishAsync($"src.{name}", $"msg{i}", cancellationToken: cts.Token); await Task.Delay(100, cts.Token); var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); info!["state"]?["messages"]?.GetValue().ShouldBeGreaterThanOrEqualTo(0UL); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerRecreateChangesRaftGroup_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ccrc.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "RECRCCONS", new { durable_name = "RECRCCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Delete and recreate the consumer await DeleteConsumerAsync(name, "RECRCCONS", cts.Token); var cResp2 = await CreateDurableConsumerAsync(name, "RECRCCONS", new { durable_name = "RECRCCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp2).ShouldBeFalse($"Unexpected error on recreate: {GetErrorDescription(cResp2)}"); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerScaleDownChangesRaftGroup_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"csdcr.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "SDCCONS", new { durable_name = "SDCCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "SDCCONS", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerRescaleCatchup_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"crc.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "RCCONS", new { durable_name = "RCCONS", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Publish some messages and verify consumer catches up for (int i = 0; i < 5; i++) await _nats!.PublishAsync($"crc.{name}", $"msg{i}", cancellationToken: cts.Token); await Task.Delay(100, cts.Token); var ci = await ConsumerInfoAsync(name, "RCCONS", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConcurrentStreamUpdate_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"cu.{name}" }, max_msgs = 100L, }, cts.Token); if (HasError(resp)) return; // Concurrent stream updates — server should handle without corruption var updateTasks = Enumerable.Range(0, 5).Select(async i => { return await UpdateStreamAsync(new { name, subjects = new[] { $"cu.{name}" }, max_msgs = 100L + i, }, cts.Token); }).ToList(); var results = await Task.WhenAll(updateTasks); // All results should be non-null (no server crash) foreach (var r in results) r.ShouldNotBeNull(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConcurrentConsumerCreateWithMaxConsumers_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"ccc.{name}" }, max_consumers = 5, }, cts.Token); if (HasError(resp)) return; // Create consumers concurrently — some may fail due to max_consumers limit var createTasks = Enumerable.Range(0, 10).Select(async i => { var d = $"CCONS{i}"; return await CreateDurableConsumerAsync(name, d, new { durable_name = d, ack_policy = "explicit", }, cts.Token); }).ToList(); var results = await Task.WhenAll(createTasks); var successCount = results.Count(r => !HasError(r)); // At most max_consumers (5) should succeed successCount.ShouldBeLessThanOrEqualTo(5); // Failures should have errcode 10026 (MaximumConsumersLimit) var failures = results.Where(HasError).ToList(); foreach (var f in failures) { GetErrCode(f).ShouldBe(10026); } await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task LostConsumerAfterInflightConsumerUpdate_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"lcaicu.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "INFLIGHT", new { durable_name = "INFLIGHT", ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); // Update the consumer var updResp = await CreateDurableConsumerAsync(name, "INFLIGHT", new { durable_name = "INFLIGHT", ack_policy = "explicit", description = "updated", }, cts.Token); HasError(updResp).ShouldBeFalse($"Unexpected error on update: {GetErrorDescription(updResp)}"); var ci = await ConsumerInfoAsync(name, "INFLIGHT", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamRaftGroupChangesWhenMovingToOrOffR1_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); // Create stream with replicas=1 var resp = await CreateStreamAsync(new { name, subjects = new[] { $"srgc.{name}" }, replicas = 1, }, cts.Token); if (HasError(resp)) return; var info = await StreamInfoAsync(name, cts.Token); info.ShouldNotBeNull(); HasError(info).ShouldBeFalse(); var replicas = info!["config"]?["replicas"]?.GetValue() ?? 1; replicas.ShouldBe(1); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task ConsumerRaftGroupChangesWhenMovingToOrOffR1_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"crgc.{name}" } }, cts.Token); if (HasError(resp)) return; var cResp = await CreateDurableConsumerAsync(name, "RGCONS", new { durable_name = "RGCONS", ack_policy = "explicit", replicas = 1, }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); var ci = await ConsumerInfoAsync(name, "RGCONS", cts.Token); ci.ShouldNotBeNull(); HasError(ci).ShouldBeFalse(); await DeleteStreamAsync(name, cts.Token); } [Fact] public async Task StreamUpdateMaxConsumersLimit_ShouldSucceed() { if (ServerUnavailable()) return; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var name = UniqueStream(); var resp = await CreateStreamAsync(new { name, subjects = new[] { $"sumcl.{name}" } }, cts.Token); if (HasError(resp)) return; // Create two consumers for (int i = 1; i <= 2; i++) { var d = $"MLCONS{i}"; var cResp = await CreateDurableConsumerAsync(name, d, new { durable_name = d, ack_policy = "explicit", }, cts.Token); HasError(cResp).ShouldBeFalse($"Unexpected error: {GetErrorDescription(cResp)}"); } // Update max_consumers to 1 (below current count of 2) var updResp = await UpdateStreamAsync(new { name, subjects = new[] { $"sumcl.{name}" }, max_consumers = 1, }, cts.Token); HasError(updResp).ShouldBeFalse($"Unexpected error on update: {GetErrorDescription(updResp)}"); // Adding a third consumer should fail (errcode 10026) var c3Resp = await CreateDurableConsumerAsync(name, "MLCONS3", new { durable_name = "MLCONS3", ack_policy = "explicit", }, cts.Token); HasError(c3Resp).ShouldBeTrue("Expected error when adding consumer over max_consumers limit"); GetErrCode(c3Resp).ShouldBe(10026); // Existing consumers should still be updatable var updConsResp = await CreateDurableConsumerAsync(name, "MLCONS1", new { durable_name = "MLCONS1", ack_policy = "explicit", description = "updated", }, cts.Token); HasError(updConsResp).ShouldBeFalse($"Unexpected error updating existing consumer: {GetErrorDescription(updConsResp)}"); await DeleteStreamAsync(name, cts.Token); } }