From 5e49006cfa7ea21c270b0abbf20d98f16754ba99 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 11:25:38 -0500 Subject: [PATCH] feat: add stream config update validation (Gap 4.8) Add ValidateConfigUpdate to StreamManager with immutability rules for storage type, mirror, sources, and retention policy; sealed stream guard; MaxConsumers decrease prevention; even-replica rejection; and subject overlap detection against peer streams. Wire the check into CreateOrUpdate for all update paths. 12 new tests in ConfigUpdateValidationTests.cs cover all rules including the StreamManager integration test. --- src/NATS.Server/JetStream/StreamManager.cs | 133 +++++++ .../Streams/ConfigUpdateValidationTests.cs | 331 ++++++++++++++++++ 2 files changed, 464 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 987933b..671d5a2 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -83,6 +83,18 @@ public sealed class StreamManager if (isCreate && _account is not null && !_account.TryReserveStream()) return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded"); + // Go: stream.go:update — validate immutable fields on update. + // Reference: server/stream.go:1500-1600 (stream.update) + if (!isCreate && _streams.TryGetValue(normalized.Name, out var existingHandle)) + { + var otherStreams = _streams.Values + .Where(s => !string.Equals(s.Config.Name, normalized.Name, StringComparison.Ordinal)) + .Select(s => s.Config); + var updateErrors = ValidateConfigUpdate(existingHandle.Config, normalized, otherStreams); + if (updateErrors.Count > 0) + return JetStreamApiResponse.ErrorResponse(400, updateErrors[0]); + } + var handle = _streams.AddOrUpdate( normalized.Name, _ => new StreamHandle(normalized, CreateStore(normalized)), @@ -462,6 +474,86 @@ public sealed class StreamManager return null; } + /// + /// Validates that is a legal update of . + /// Returns an empty list when the update is valid; otherwise returns one or more error strings. + /// The parameter is used to detect subject overlap with peer streams. + /// Go reference: server/stream.go:1500-1600 (stream.update immutable-field checks). + /// + public static IReadOnlyList ValidateConfigUpdate( + StreamConfig existing, + StreamConfig proposed, + IEnumerable? otherStreams = null) + { + List errors = []; + + // Sealed streams reject all modifications. + if (existing.Sealed) + { + errors.Add("sealed stream cannot be modified"); + return errors; + } + + // Storage type is immutable. + if (existing.Storage != proposed.Storage) + errors.Add("storage type cannot be changed"); + + // Mirror is immutable — if the existing stream has a mirror, the proposed must keep it. + if (!string.IsNullOrWhiteSpace(existing.Mirror) + && !string.Equals(existing.Mirror, proposed.Mirror, StringComparison.Ordinal)) + { + errors.Add("mirror configuration cannot be changed"); + } + + // Sources are immutable after creation — the set of source names must be unchanged. + if (existing.Sources.Count > 0) + { + var existingNames = existing.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList(); + var proposedNames = proposed.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList(); + if (!existingNames.SequenceEqual(proposedNames, StringComparer.Ordinal)) + errors.Add("sources cannot be changed after creation"); + } + + // Retention policy is immutable. + if (existing.Retention != proposed.Retention) + errors.Add("retention policy cannot be changed"); + + // MaxConsumers may only be increased (or left unlimited). + if (existing.MaxConsumers > 0 && proposed.MaxConsumers > 0 + && proposed.MaxConsumers < existing.MaxConsumers) + { + errors.Add("max consumers can only be increased"); + } + + // Replicas must be odd (for RAFT consensus). + if (proposed.Replicas > 1 && proposed.Replicas % 2 == 0) + errors.Add("replicas must be odd for RAFT consensus"); + + // Subject overlap detection with peer streams. + if (otherStreams is not null && proposed.Subjects.Count > 0) + { + foreach (var otherStream in otherStreams) + { + foreach (var proposed_subj in proposed.Subjects) + { + foreach (var other_subj in otherStream.Subjects) + { + if (SubjectMatch.MatchLiteral(proposed_subj, other_subj) + || SubjectMatch.MatchLiteral(other_subj, proposed_subj) + || SubjectMatch.SubjectsCollide(proposed_subj, other_subj)) + { + errors.Add($"subjects overlap with stream '{otherStream.Name}'"); + goto nextStream; + } + } + } + nextStream:; + } + } + + return errors; + } + private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle) { var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult(); @@ -630,6 +722,47 @@ public sealed class StreamManager }; } + /// + /// Returns mirror monitoring info for the given stream, or null if the stream does not exist + /// or is not configured as a mirror. + /// Go reference: server/stream.go:2739-2743 (mirrorInfo) + /// + public MirrorInfoResponse? GetMirrorInfo(string streamName) + { + if (!_streams.TryGetValue(streamName, out var stream)) + return null; + + if (string.IsNullOrWhiteSpace(stream.Config.Mirror)) + return null; + + if (!_mirrorsByOrigin.TryGetValue(stream.Config.Mirror, out var coordinators)) + return null; + + var first = coordinators.Count > 0 ? coordinators[0] : null; + return first?.GetMirrorInfo(streamName); + } + + /// + /// Returns source monitoring info for all sources configured on the given stream. + /// Returns an empty array when the stream does not exist or has no sources. + /// Go reference: server/stream.go:2687-2695 (sourcesInfo) + /// + public SourceInfoResponse[] GetSourceInfos(string streamName) + { + if (!_streams.TryGetValue(streamName, out _)) + return []; + + var results = new List(); + + foreach (var (_, coordinators) in _sourcesByOrigin) + { + foreach (var coord in coordinators) + results.Add(coord.GetSourceInfo()); + } + + return [.. results]; + } + private static IStreamStore CreateStore(StreamConfig config) { return config.Storage switch diff --git a/tests/NATS.Server.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs b/tests/NATS.Server.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs new file mode 100644 index 0000000..10b484a --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs @@ -0,0 +1,331 @@ +// Ported from golang/nats-server/server/jetstream_test.go +// Go reference: server/stream.go:1500-1600 (stream.update immutable field validation) +// Covers: TestJetStreamStreamUpdate, TestJetStreamStreamUpdateMaxConsumers + +using NATS.Server.JetStream; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Models; +using Shouldly; + +namespace NATS.Server.Tests.JetStream.Streams; + +public class ConfigUpdateValidationTests +{ + // Go ref: server/stream.go:1500-1600 (stream.update) + // A valid update that only changes mutable fields (MaxMsgs) should produce no errors. + [Fact] + public void ValidateConfigUpdate_allows_valid_changes() + { + var existing = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Retention = RetentionPolicy.Limits, + Subjects = ["orders.*"], + MaxMsgs = 100, + }; + var proposed = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Retention = RetentionPolicy.Limits, + Subjects = ["orders.*"], + MaxMsgs = 500, + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed); + + errors.ShouldBeEmpty(); + } + + // Go ref: server/stream.go:1511-1513 (storage type immutability check) + // Changing storage type from Memory to File must be rejected. + [Fact] + public void ValidateConfigUpdate_rejects_storage_type_change() + { + var existing = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.*"], + }; + var proposed = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.File, + Subjects = ["orders.*"], + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed); + + errors.ShouldContain(e => e.Contains("storage type")); + } + + // Go ref: server/stream.go:1530-1535 (mirror immutability) + // Changing the mirror origin must be rejected. + [Fact] + public void ValidateConfigUpdate_rejects_mirror_change() + { + var existing = new StreamConfig + { + Name = "MIRROR_STREAM", + Storage = StorageType.Memory, + Mirror = "ORIGIN_A", + }; + var proposed = new StreamConfig + { + Name = "MIRROR_STREAM", + Storage = StorageType.Memory, + Mirror = "ORIGIN_B", + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed); + + errors.ShouldContain(e => e.Contains("mirror configuration")); + } + + // Go ref: server/stream.go:1520-1525 (retention policy immutability) + // Changing the retention policy must be rejected. + [Fact] + public void ValidateConfigUpdate_rejects_retention_change() + { + var existing = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Retention = RetentionPolicy.Limits, + Subjects = ["orders.*"], + }; + var proposed = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Retention = RetentionPolicy.WorkQueue, + Subjects = ["orders.*"], + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed); + + errors.ShouldContain(e => e.Contains("retention policy")); + } + + // Go ref: server/stream.go:1500-1502 (sealed stream guard) + // Any modification attempt on a sealed stream must be rejected. + [Fact] + public void ValidateConfigUpdate_rejects_sealed_stream_changes() + { + var existing = new StreamConfig + { + Name = "SEALED", + Storage = StorageType.Memory, + Sealed = true, + Subjects = ["sealed.*"], + }; + var proposed = new StreamConfig + { + Name = "SEALED", + Storage = StorageType.Memory, + Sealed = true, + Subjects = ["sealed.new.*"], + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed); + + errors.ShouldContain(e => e.Contains("sealed stream")); + } + + // Go ref: server/stream.go:1537-1542 (sources immutability) + // Changing the sources list after creation must be rejected. + [Fact] + public void ValidateConfigUpdate_rejects_source_change() + { + var existing = new StreamConfig + { + Name = "AGG", + Storage = StorageType.Memory, + Sources = + [ + new StreamSourceConfig { Name = "SRC_A" }, + new StreamSourceConfig { Name = "SRC_B" }, + ], + }; + var proposed = new StreamConfig + { + Name = "AGG", + Storage = StorageType.Memory, + Sources = + [ + new StreamSourceConfig { Name = "SRC_A" }, + new StreamSourceConfig { Name = "SRC_C" }, + ], + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed); + + errors.ShouldContain(e => e.Contains("sources cannot be changed")); + } + + // Go ref: server/jetstream.go — subject overlap detection between streams. + // Proposing subjects that collide with another stream's subjects must be rejected. + [Fact] + public void ValidateConfigUpdate_detects_subject_overlap() + { + var existing = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.*"], + }; + var proposed = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.>"], + }; + var otherStreams = new[] + { + new StreamConfig + { + Name = "ARCHIVE", + Storage = StorageType.Memory, + Subjects = ["orders.archived"], + }, + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed, otherStreams); + + errors.ShouldContain(e => e.Contains("ARCHIVE")); + } + + // Go ref: server/jetstream.go — no error for non-overlapping subject sets. + // Proposing subjects that do not overlap with other streams must succeed. + [Fact] + public void ValidateConfigUpdate_allows_non_overlapping_subjects() + { + var existing = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.*"], + }; + var proposed = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.>"], + }; + var otherStreams = new[] + { + new StreamConfig + { + Name = "EVENTS", + Storage = StorageType.Memory, + Subjects = ["events.*"], + }, + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed, otherStreams); + + errors.ShouldBeEmpty(); + } + + // Go ref: server/stream.go — MaxConsumers may not be decreased. + // Decreasing MaxConsumers from a positive value must be rejected. + [Fact] + public void ValidateConfigUpdate_rejects_max_consumers_decrease() + { + var existing = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.*"], + MaxConsumers = 10, + }; + var proposed = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.*"], + MaxConsumers = 5, + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed); + + errors.ShouldContain(e => e.Contains("max consumers can only be increased")); + } + + // Go ref: server/stream.go — MaxConsumers may be raised without restriction. + [Fact] + public void ValidateConfigUpdate_allows_max_consumers_increase() + { + var existing = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.*"], + MaxConsumers = 5, + }; + var proposed = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.*"], + MaxConsumers = 20, + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed); + + errors.ShouldBeEmpty(); + } + + // Go ref: server/stream.go — RAFT consensus requires an odd number of replicas. + // Setting replicas to an even number must be rejected. + [Fact] + public void ValidateConfigUpdate_rejects_even_replicas() + { + var existing = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.*"], + Replicas = 1, + }; + var proposed = new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.Memory, + Subjects = ["orders.*"], + Replicas = 2, + }; + + var errors = StreamManager.ValidateConfigUpdate(existing, proposed); + + errors.ShouldContain(e => e.Contains("replicas must be odd")); + } + + // Go ref: server/stream.go:1500-1600 (stream.update) — integration via StreamManager. + // CreateOrUpdate must reject an update that changes storage type. + [Fact] + public void CreateOrUpdate_rejects_invalid_config_update() + { + var manager = new StreamManager(); + + var createResult = manager.CreateOrUpdate(new StreamConfig + { + Name = "EVENTS", + Storage = StorageType.Memory, + Subjects = ["events.*"], + }); + createResult.Error.ShouldBeNull(); + + var updateResult = manager.CreateOrUpdate(new StreamConfig + { + Name = "EVENTS", + Storage = StorageType.File, + Subjects = ["events.*"], + }); + + updateResult.Error.ShouldNotBeNull(); + updateResult.Error!.Description.ShouldContain("storage type"); + } +}