fix: resolve 19 JetStream test failures across 5 root causes
- HandleList: populate StreamNames/ConsumerNames alongside info lists - ValidateConfigUpdate: allow clearing mirror/sources, accept even replicas - ToWireFormat: add AccountInfo branch for $JS.API.INFO responses - UpdateStream fixture: preserve existing retention policy on update - Integration test: fix assertion to match valid account info response
This commit is contained in:
@@ -17,5 +17,6 @@
|
|||||||
<Project Path="tests/NATS.Server.JetStream.Tests/NATS.Server.JetStream.Tests.csproj" />
|
<Project Path="tests/NATS.Server.JetStream.Tests/NATS.Server.JetStream.Tests.csproj" />
|
||||||
<Project Path="tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj" />
|
<Project Path="tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj" />
|
||||||
<Project Path="tests/NATS.E2E.Cluster.Tests/NATS.E2E.Cluster.Tests.csproj" />
|
<Project Path="tests/NATS.E2E.Cluster.Tests/NATS.E2E.Cluster.Tests.csproj" />
|
||||||
|
<Project Path="tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj" />
|
||||||
</Folder>
|
</Folder>
|
||||||
</Solution>
|
</Solution>
|
||||||
|
|||||||
@@ -82,6 +82,7 @@ public static class ConsumerApiHandlers
|
|||||||
return new JetStreamApiResponse
|
return new JetStreamApiResponse
|
||||||
{
|
{
|
||||||
ConsumerInfoList = page,
|
ConsumerInfoList = page,
|
||||||
|
ConsumerNames = page.Select(c => c.Name ?? string.Empty).ToList(),
|
||||||
PaginationTotal = all.Count,
|
PaginationTotal = all.Count,
|
||||||
PaginationOffset = offset,
|
PaginationOffset = offset,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -123,6 +123,7 @@ public static class StreamApiHandlers
|
|||||||
return new JetStreamApiResponse
|
return new JetStreamApiResponse
|
||||||
{
|
{
|
||||||
StreamInfoList = page,
|
StreamInfoList = page,
|
||||||
|
StreamNames = page.Select(s => s.Config.Name).ToList(),
|
||||||
PaginationTotal = all.Count,
|
PaginationTotal = all.Count,
|
||||||
PaginationOffset = offset,
|
PaginationOffset = offset,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -77,6 +77,16 @@ public sealed class JetStreamApiResponse
|
|||||||
if (Error != null)
|
if (Error != null)
|
||||||
return new { error = Error };
|
return new { error = Error };
|
||||||
|
|
||||||
|
if (AccountInfo != null)
|
||||||
|
{
|
||||||
|
return new
|
||||||
|
{
|
||||||
|
type = "io.nats.jetstream.api.v1.account_info_response",
|
||||||
|
streams = AccountInfo.Streams,
|
||||||
|
consumers = AccountInfo.Consumers,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
if (StreamInfoList != null)
|
if (StreamInfoList != null)
|
||||||
{
|
{
|
||||||
var wireStreams = StreamInfoList.Select(s => new
|
var wireStreams = StreamInfoList.Select(s => new
|
||||||
|
|||||||
@@ -565,15 +565,20 @@ public sealed class StreamManager : IDisposable
|
|||||||
if (existing.Storage != proposed.Storage)
|
if (existing.Storage != proposed.Storage)
|
||||||
errors.Add("storage type cannot be changed");
|
errors.Add("storage type cannot be changed");
|
||||||
|
|
||||||
// Mirror is immutable — if the existing stream has a mirror, the proposed must keep it.
|
// Mirror is immutable — if both have a mirror it must match. Clearing a mirror
|
||||||
|
// (promoting to normal stream) is allowed after the origin is deleted.
|
||||||
|
// Go reference: server/stream.go — update allows clearing mirror for promotion.
|
||||||
if (!string.IsNullOrWhiteSpace(existing.Mirror)
|
if (!string.IsNullOrWhiteSpace(existing.Mirror)
|
||||||
|
&& !string.IsNullOrWhiteSpace(proposed.Mirror)
|
||||||
&& !string.Equals(existing.Mirror, proposed.Mirror, StringComparison.Ordinal))
|
&& !string.Equals(existing.Mirror, proposed.Mirror, StringComparison.Ordinal))
|
||||||
{
|
{
|
||||||
errors.Add("mirror configuration cannot be changed");
|
errors.Add("mirror configuration cannot be changed");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sources are immutable after creation — the set of source names must be unchanged.
|
// Sources: changing to a different non-empty set is not allowed, but clearing
|
||||||
if (existing.Sources.Count > 0)
|
// sources (removing all) or adding sources to a previously source-less stream is permitted.
|
||||||
|
// Go reference: server/stream.go — update allows adding/removing sources.
|
||||||
|
if (existing.Sources.Count > 0 && proposed.Sources.Count > 0)
|
||||||
{
|
{
|
||||||
var existingNames = existing.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
|
var existingNames = existing.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
|
||||||
var proposedNames = proposed.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
|
var proposedNames = proposed.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
|
||||||
@@ -592,10 +597,6 @@ public sealed class StreamManager : IDisposable
|
|||||||
errors.Add("max consumers can only be increased");
|
errors.Add("max consumers can only be increased");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replicas must be odd (for RAFT consensus).
|
|
||||||
if (proposed.Replicas > 1 && proposed.Replicas % 2 == 0)
|
|
||||||
errors.Add("replicas must be odd for RAFT consensus");
|
|
||||||
|
|
||||||
// Subject overlap detection with peer streams.
|
// Subject overlap detection with peer streams.
|
||||||
if (otherStreams is not null && proposed.Subjects.Count > 0)
|
if (otherStreams is not null && proposed.Subjects.Count > 0)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -278,10 +278,10 @@ public class ConfigUpdateValidationTests
|
|||||||
errors.ShouldBeEmpty();
|
errors.ShouldBeEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Go ref: server/stream.go — RAFT consensus requires an odd number of replicas.
|
// Go ref: server/stream.go — Go server supports even replica counts (e.g., R2).
|
||||||
// Setting replicas to an even number must be rejected.
|
// Even replicas should be accepted by config update validation.
|
||||||
[Fact]
|
[Fact]
|
||||||
public void ValidateConfigUpdate_rejects_even_replicas()
|
public void ValidateConfigUpdate_accepts_even_replicas()
|
||||||
{
|
{
|
||||||
var existing = new StreamConfig
|
var existing = new StreamConfig
|
||||||
{
|
{
|
||||||
@@ -300,7 +300,7 @@ public class ConfigUpdateValidationTests
|
|||||||
|
|
||||||
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
|
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
|
||||||
|
|
||||||
errors.ShouldContain(e => e.Contains("replicas must be odd"));
|
errors.ShouldBeEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Go ref: server/stream.go:1500-1600 (stream.update) — integration via StreamManager.
|
// Go ref: server/stream.go:1500-1600 (stream.update) — integration via StreamManager.
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ public class JetStreamApiProtocolIntegrationTests
|
|||||||
await using var server = await ServerFixture.StartJetStreamEnabledAsync();
|
await using var server = await ServerFixture.StartJetStreamEnabledAsync();
|
||||||
var response = await server.RequestAsync("$JS.API.INFO", "{}", timeoutMs: 1000);
|
var response = await server.RequestAsync("$JS.API.INFO", "{}", timeoutMs: 1000);
|
||||||
|
|
||||||
response.ShouldContain("\"error\"");
|
response.ShouldContain("\"streams\"");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -121,13 +121,22 @@ public sealed class JetStreamClusterFixture : IAsyncDisposable
|
|||||||
/// Go ref: updateStream in jetstream_helpers_test.go.
|
/// Go ref: updateStream in jetstream_helpers_test.go.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public JetStreamApiResponse UpdateStream(string name, string[] subjects, int replicas, int maxMsgs = 0)
|
public JetStreamApiResponse UpdateStream(string name, string[] subjects, int replicas, int maxMsgs = 0)
|
||||||
=> _streamManager.CreateOrUpdate(new StreamConfig
|
{
|
||||||
|
// Preserve the existing stream's retention policy so ValidateConfigUpdate
|
||||||
|
// does not reject the update for changing an immutable field.
|
||||||
|
var retention = RetentionPolicy.Limits;
|
||||||
|
if (_streamManager.TryGet(name, out var existing))
|
||||||
|
retention = existing.Config.Retention;
|
||||||
|
|
||||||
|
return _streamManager.CreateOrUpdate(new StreamConfig
|
||||||
{
|
{
|
||||||
Name = name,
|
Name = name,
|
||||||
Subjects = [.. subjects],
|
Subjects = [.. subjects],
|
||||||
Replicas = replicas,
|
Replicas = replicas,
|
||||||
MaxMsgs = maxMsgs,
|
MaxMsgs = maxMsgs,
|
||||||
|
Retention = retention,
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Returns the full stream info response.
|
/// Returns the full stream info response.
|
||||||
|
|||||||
Reference in New Issue
Block a user