feat: add stream config update validation (Gap 4.8)

Add ValidateConfigUpdate to StreamManager with immutability rules for storage type,
mirror, sources, and retention policy; sealed stream guard; MaxConsumers decrease
prevention; even-replica rejection; and subject overlap detection against peer streams.
Wire the check into CreateOrUpdate for all update paths. 12 new tests in
ConfigUpdateValidationTests.cs cover all rules including the StreamManager integration test.
This commit is contained in:
Joseph Doherty
2026-02-25 11:25:38 -05:00
parent 79a3ccba4c
commit 5e49006cfa
2 changed files with 464 additions and 0 deletions

View File

@@ -83,6 +83,18 @@ public sealed class StreamManager
if (isCreate && _account is not null && !_account.TryReserveStream())
return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");
// Go: stream.go:update — validate immutable fields on update.
// Reference: server/stream.go:1500-1600 (stream.update)
if (!isCreate && _streams.TryGetValue(normalized.Name, out var existingHandle))
{
var otherStreams = _streams.Values
.Where(s => !string.Equals(s.Config.Name, normalized.Name, StringComparison.Ordinal))
.Select(s => s.Config);
var updateErrors = ValidateConfigUpdate(existingHandle.Config, normalized, otherStreams);
if (updateErrors.Count > 0)
return JetStreamApiResponse.ErrorResponse(400, updateErrors[0]);
}
var handle = _streams.AddOrUpdate(
normalized.Name,
_ => new StreamHandle(normalized, CreateStore(normalized)),
@@ -462,6 +474,86 @@ public sealed class StreamManager
return null;
}
/// <summary>
/// Validates that <paramref name="proposed"/> is a legal update of <paramref name="existing"/>.
/// Returns an empty list when the update is valid; otherwise returns one or more error strings.
/// The <paramref name="otherStreams"/> parameter is used to detect subject overlap with peer streams.
/// Go reference: server/stream.go:1500-1600 (stream.update immutable-field checks).
/// </summary>
public static IReadOnlyList<string> ValidateConfigUpdate(
StreamConfig existing,
StreamConfig proposed,
IEnumerable<StreamConfig>? otherStreams = null)
{
List<string> errors = [];
// Sealed streams reject all modifications.
if (existing.Sealed)
{
errors.Add("sealed stream cannot be modified");
return errors;
}
// Storage type is immutable.
if (existing.Storage != proposed.Storage)
errors.Add("storage type cannot be changed");
// Mirror is immutable — if the existing stream has a mirror, the proposed must keep it.
if (!string.IsNullOrWhiteSpace(existing.Mirror)
&& !string.Equals(existing.Mirror, proposed.Mirror, StringComparison.Ordinal))
{
errors.Add("mirror configuration cannot be changed");
}
// Sources are immutable after creation — the set of source names must be unchanged.
if (existing.Sources.Count > 0)
{
var existingNames = existing.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
var proposedNames = proposed.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
if (!existingNames.SequenceEqual(proposedNames, StringComparer.Ordinal))
errors.Add("sources cannot be changed after creation");
}
// Retention policy is immutable.
if (existing.Retention != proposed.Retention)
errors.Add("retention policy cannot be changed");
// MaxConsumers may only be increased (or left unlimited).
if (existing.MaxConsumers > 0 && proposed.MaxConsumers > 0
&& proposed.MaxConsumers < existing.MaxConsumers)
{
errors.Add("max consumers can only be increased");
}
// Replicas must be odd (for RAFT consensus).
if (proposed.Replicas > 1 && proposed.Replicas % 2 == 0)
errors.Add("replicas must be odd for RAFT consensus");
// Subject overlap detection with peer streams.
if (otherStreams is not null && proposed.Subjects.Count > 0)
{
foreach (var otherStream in otherStreams)
{
foreach (var proposed_subj in proposed.Subjects)
{
foreach (var other_subj in otherStream.Subjects)
{
if (SubjectMatch.MatchLiteral(proposed_subj, other_subj)
|| SubjectMatch.MatchLiteral(other_subj, proposed_subj)
|| SubjectMatch.SubjectsCollide(proposed_subj, other_subj))
{
errors.Add($"subjects overlap with stream '{otherStream.Name}'");
goto nextStream;
}
}
}
nextStream:;
}
}
return errors;
}
private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
{
var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
@@ -630,6 +722,47 @@ public sealed class StreamManager
};
}
/// <summary>
/// Returns mirror monitoring info for the given stream, or null if the stream does not exist
/// or is not configured as a mirror.
/// Go reference: server/stream.go:2739-2743 (mirrorInfo)
/// </summary>
public MirrorInfoResponse? GetMirrorInfo(string streamName)
{
if (!_streams.TryGetValue(streamName, out var stream))
return null;
if (string.IsNullOrWhiteSpace(stream.Config.Mirror))
return null;
if (!_mirrorsByOrigin.TryGetValue(stream.Config.Mirror, out var coordinators))
return null;
var first = coordinators.Count > 0 ? coordinators[0] : null;
return first?.GetMirrorInfo(streamName);
}
/// <summary>
/// Returns source monitoring info for all sources configured on the given stream.
/// Returns an empty array when the stream does not exist or has no sources.
/// Go reference: server/stream.go:2687-2695 (sourcesInfo)
/// </summary>
public SourceInfoResponse[] GetSourceInfos(string streamName)
{
if (!_streams.TryGetValue(streamName, out _))
return [];
var results = new List<SourceInfoResponse>();
foreach (var (_, coordinators) in _sourcesByOrigin)
{
foreach (var coord in coordinators)
results.Add(coord.GetSourceInfo());
}
return [.. results];
}
private static IStreamStore CreateStore(StreamConfig config)
{
return config.Storage switch

View File

@@ -0,0 +1,331 @@
// Ported from golang/nats-server/server/jetstream_test.go
// Go reference: server/stream.go:1500-1600 (stream.update immutable field validation)
// Covers: TestJetStreamStreamUpdate, TestJetStreamStreamUpdateMaxConsumers
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Models;
using Shouldly;
namespace NATS.Server.Tests.JetStream.Streams;
public class ConfigUpdateValidationTests
{
// Go ref: server/stream.go:1500-1600 (stream.update)
// A valid update that only changes mutable fields (MaxMsgs) should produce no errors.
[Fact]
public void ValidateConfigUpdate_allows_valid_changes()
{
var existing = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Retention = RetentionPolicy.Limits,
Subjects = ["orders.*"],
MaxMsgs = 100,
};
var proposed = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Retention = RetentionPolicy.Limits,
Subjects = ["orders.*"],
MaxMsgs = 500,
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
errors.ShouldBeEmpty();
}
// Go ref: server/stream.go:1511-1513 (storage type immutability check)
// Changing storage type from Memory to File must be rejected.
[Fact]
public void ValidateConfigUpdate_rejects_storage_type_change()
{
var existing = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.*"],
};
var proposed = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.File,
Subjects = ["orders.*"],
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
errors.ShouldContain(e => e.Contains("storage type"));
}
// Go ref: server/stream.go:1530-1535 (mirror immutability)
// Changing the mirror origin must be rejected.
[Fact]
public void ValidateConfigUpdate_rejects_mirror_change()
{
var existing = new StreamConfig
{
Name = "MIRROR_STREAM",
Storage = StorageType.Memory,
Mirror = "ORIGIN_A",
};
var proposed = new StreamConfig
{
Name = "MIRROR_STREAM",
Storage = StorageType.Memory,
Mirror = "ORIGIN_B",
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
errors.ShouldContain(e => e.Contains("mirror configuration"));
}
// Go ref: server/stream.go:1520-1525 (retention policy immutability)
// Changing the retention policy must be rejected.
[Fact]
public void ValidateConfigUpdate_rejects_retention_change()
{
var existing = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Retention = RetentionPolicy.Limits,
Subjects = ["orders.*"],
};
var proposed = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Retention = RetentionPolicy.WorkQueue,
Subjects = ["orders.*"],
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
errors.ShouldContain(e => e.Contains("retention policy"));
}
// Go ref: server/stream.go:1500-1502 (sealed stream guard)
// Any modification attempt on a sealed stream must be rejected.
[Fact]
public void ValidateConfigUpdate_rejects_sealed_stream_changes()
{
var existing = new StreamConfig
{
Name = "SEALED",
Storage = StorageType.Memory,
Sealed = true,
Subjects = ["sealed.*"],
};
var proposed = new StreamConfig
{
Name = "SEALED",
Storage = StorageType.Memory,
Sealed = true,
Subjects = ["sealed.new.*"],
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
errors.ShouldContain(e => e.Contains("sealed stream"));
}
// Go ref: server/stream.go:1537-1542 (sources immutability)
// Changing the sources list after creation must be rejected.
[Fact]
public void ValidateConfigUpdate_rejects_source_change()
{
var existing = new StreamConfig
{
Name = "AGG",
Storage = StorageType.Memory,
Sources =
[
new StreamSourceConfig { Name = "SRC_A" },
new StreamSourceConfig { Name = "SRC_B" },
],
};
var proposed = new StreamConfig
{
Name = "AGG",
Storage = StorageType.Memory,
Sources =
[
new StreamSourceConfig { Name = "SRC_A" },
new StreamSourceConfig { Name = "SRC_C" },
],
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
errors.ShouldContain(e => e.Contains("sources cannot be changed"));
}
// Go ref: server/jetstream.go — subject overlap detection between streams.
// Proposing subjects that collide with another stream's subjects must be rejected.
[Fact]
public void ValidateConfigUpdate_detects_subject_overlap()
{
var existing = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.*"],
};
var proposed = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.>"],
};
var otherStreams = new[]
{
new StreamConfig
{
Name = "ARCHIVE",
Storage = StorageType.Memory,
Subjects = ["orders.archived"],
},
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed, otherStreams);
errors.ShouldContain(e => e.Contains("ARCHIVE"));
}
// Go ref: server/jetstream.go — no error for non-overlapping subject sets.
// Proposing subjects that do not overlap with other streams must succeed.
[Fact]
public void ValidateConfigUpdate_allows_non_overlapping_subjects()
{
var existing = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.*"],
};
var proposed = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.>"],
};
var otherStreams = new[]
{
new StreamConfig
{
Name = "EVENTS",
Storage = StorageType.Memory,
Subjects = ["events.*"],
},
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed, otherStreams);
errors.ShouldBeEmpty();
}
// Go ref: server/stream.go — MaxConsumers may not be decreased.
// Decreasing MaxConsumers from a positive value must be rejected.
[Fact]
public void ValidateConfigUpdate_rejects_max_consumers_decrease()
{
var existing = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.*"],
MaxConsumers = 10,
};
var proposed = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.*"],
MaxConsumers = 5,
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
errors.ShouldContain(e => e.Contains("max consumers can only be increased"));
}
// Go ref: server/stream.go — MaxConsumers may be raised without restriction.
[Fact]
public void ValidateConfigUpdate_allows_max_consumers_increase()
{
var existing = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.*"],
MaxConsumers = 5,
};
var proposed = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.*"],
MaxConsumers = 20,
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
errors.ShouldBeEmpty();
}
// Go ref: server/stream.go — RAFT consensus requires an odd number of replicas.
// Setting replicas to an even number must be rejected.
[Fact]
public void ValidateConfigUpdate_rejects_even_replicas()
{
var existing = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.*"],
Replicas = 1,
};
var proposed = new StreamConfig
{
Name = "ORDERS",
Storage = StorageType.Memory,
Subjects = ["orders.*"],
Replicas = 2,
};
var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
errors.ShouldContain(e => e.Contains("replicas must be odd"));
}
// Go ref: server/stream.go:1500-1600 (stream.update) — integration via StreamManager.
// CreateOrUpdate must reject an update that changes storage type.
[Fact]
public void CreateOrUpdate_rejects_invalid_config_update()
{
var manager = new StreamManager();
var createResult = manager.CreateOrUpdate(new StreamConfig
{
Name = "EVENTS",
Storage = StorageType.Memory,
Subjects = ["events.*"],
});
createResult.Error.ShouldBeNull();
var updateResult = manager.CreateOrUpdate(new StreamConfig
{
Name = "EVENTS",
Storage = StorageType.File,
Subjects = ["events.*"],
});
updateResult.Error.ShouldNotBeNull();
updateResult.Error!.Description.ShouldContain("storage type");
}
}