From e9c86c51c37abda48cd6f66c3d780c306471ea2c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 01:14:21 -0400 Subject: [PATCH] fix: resolve 19 JetStream test failures across 5 root causes - HandleList: populate StreamNames/ConsumerNames alongside info lists - ValidateConfigUpdate: allow clearing mirror/sources, accept even replicas - ToWireFormat: add AccountInfo branch for $JS.API.INFO responses - UpdateStream fixture: preserve existing retention policy on update - Integration test: fix assertion to match valid account info response --- NatsDotNet.slnx | 1 + .../JetStream/Api/Handlers/ConsumerApiHandlers.cs | 1 + .../JetStream/Api/Handlers/StreamApiHandlers.cs | 1 + .../JetStream/Api/JetStreamApiResponse.cs | 10 ++++++++++ src/NATS.Server/JetStream/StreamManager.cs | 15 ++++++++------- .../Streams/ConfigUpdateValidationTests.cs | 8 ++++---- .../JetStreamApiProtocolIntegrationTests.cs | 2 +- .../JetStreamClusterFixture.cs | 11 ++++++++++- 8 files changed, 36 insertions(+), 13 deletions(-) diff --git a/NatsDotNet.slnx b/NatsDotNet.slnx index bfdc298..e800388 100644 --- a/NatsDotNet.slnx +++ b/NatsDotNet.slnx @@ -17,5 +17,6 @@ + diff --git a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs index 605e342..2cd508c 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs @@ -82,6 +82,7 @@ public static class ConsumerApiHandlers return new JetStreamApiResponse { ConsumerInfoList = page, + ConsumerNames = page.Select(c => c.Name ?? string.Empty).ToList(), PaginationTotal = all.Count, PaginationOffset = offset, }; diff --git a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs index 65f1c59..72951d4 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs @@ -123,6 +123,7 @@ public static class StreamApiHandlers return new JetStreamApiResponse { StreamInfoList = page, + StreamNames = page.Select(s => s.Config.Name).ToList(), PaginationTotal = all.Count, PaginationOffset = offset, }; diff --git a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs index b815e0e..7ac16c3 100644 --- a/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs +++ b/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs @@ -77,6 +77,16 @@ public sealed class JetStreamApiResponse if (Error != null) return new { error = Error }; + if (AccountInfo != null) + { + return new + { + type = "io.nats.jetstream.api.v1.account_info_response", + streams = AccountInfo.Streams, + consumers = AccountInfo.Consumers, + }; + } + if (StreamInfoList != null) { var wireStreams = StreamInfoList.Select(s => new diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index bcca1b5..c30c5b0 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -565,15 +565,20 @@ public sealed class StreamManager : IDisposable if (existing.Storage != proposed.Storage) errors.Add("storage type cannot be changed"); - // Mirror is immutable — if the existing stream has a mirror, the proposed must keep it. + // Mirror is immutable — if both have a mirror it must match. Clearing a mirror + // (promoting to normal stream) is allowed after the origin is deleted. + // Go reference: server/stream.go — update allows clearing mirror for promotion. if (!string.IsNullOrWhiteSpace(existing.Mirror) + && !string.IsNullOrWhiteSpace(proposed.Mirror) && !string.Equals(existing.Mirror, proposed.Mirror, StringComparison.Ordinal)) { errors.Add("mirror configuration cannot be changed"); } - // Sources are immutable after creation — the set of source names must be unchanged. - if (existing.Sources.Count > 0) + // Sources: changing to a different non-empty set is not allowed, but clearing + // sources (removing all) or adding sources to a previously source-less stream is permitted. + // Go reference: server/stream.go — update allows adding/removing sources. + if (existing.Sources.Count > 0 && proposed.Sources.Count > 0) { var existingNames = existing.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList(); var proposedNames = proposed.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList(); @@ -592,10 +597,6 @@ public sealed class StreamManager : IDisposable errors.Add("max consumers can only be increased"); } - // Replicas must be odd (for RAFT consensus). - if (proposed.Replicas > 1 && proposed.Replicas % 2 == 0) - errors.Add("replicas must be odd for RAFT consensus"); - // Subject overlap detection with peer streams. if (otherStreams is not null && proposed.Subjects.Count > 0) { diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs index ec6a13d..8f9c7a4 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs @@ -278,10 +278,10 @@ public class ConfigUpdateValidationTests errors.ShouldBeEmpty(); } - // Go ref: server/stream.go — RAFT consensus requires an odd number of replicas. - // Setting replicas to an even number must be rejected. + // Go ref: server/stream.go — Go server supports even replica counts (e.g., R2). + // Even replicas should be accepted by config update validation. [Fact] - public void ValidateConfigUpdate_rejects_even_replicas() + public void ValidateConfigUpdate_accepts_even_replicas() { var existing = new StreamConfig { @@ -300,7 +300,7 @@ public class ConfigUpdateValidationTests var errors = StreamManager.ValidateConfigUpdate(existing, proposed); - errors.ShouldContain(e => e.Contains("replicas must be odd")); + errors.ShouldBeEmpty(); } // Go ref: server/stream.go:1500-1600 (stream.update) — integration via StreamManager. diff --git a/tests/NATS.Server.JetStream.Tests/JetStreamApiProtocolIntegrationTests.cs b/tests/NATS.Server.JetStream.Tests/JetStreamApiProtocolIntegrationTests.cs index cea1926..2862472 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStreamApiProtocolIntegrationTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStreamApiProtocolIntegrationTests.cs @@ -12,7 +12,7 @@ public class JetStreamApiProtocolIntegrationTests await using var server = await ServerFixture.StartJetStreamEnabledAsync(); var response = await server.RequestAsync("$JS.API.INFO", "{}", timeoutMs: 1000); - response.ShouldContain("\"error\""); + response.ShouldContain("\"streams\""); } } diff --git a/tests/NATS.Server.TestUtilities/JetStreamClusterFixture.cs b/tests/NATS.Server.TestUtilities/JetStreamClusterFixture.cs index 3dd3f98..9de4787 100644 --- a/tests/NATS.Server.TestUtilities/JetStreamClusterFixture.cs +++ b/tests/NATS.Server.TestUtilities/JetStreamClusterFixture.cs @@ -121,13 +121,22 @@ public sealed class JetStreamClusterFixture : IAsyncDisposable /// Go ref: updateStream in jetstream_helpers_test.go. /// public JetStreamApiResponse UpdateStream(string name, string[] subjects, int replicas, int maxMsgs = 0) - => _streamManager.CreateOrUpdate(new StreamConfig + { + // Preserve the existing stream's retention policy so ValidateConfigUpdate + // does not reject the update for changing an immutable field. + var retention = RetentionPolicy.Limits; + if (_streamManager.TryGet(name, out var existing)) + retention = existing.Config.Retention; + + return _streamManager.CreateOrUpdate(new StreamConfig { Name = name, Subjects = [.. subjects], Replicas = replicas, MaxMsgs = maxMsgs, + Retention = retention, }); + } /// /// Returns the full stream info response.