diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs
index 987933b..671d5a2 100644
--- a/src/NATS.Server/JetStream/StreamManager.cs
+++ b/src/NATS.Server/JetStream/StreamManager.cs
@@ -83,6 +83,18 @@ public sealed class StreamManager
if (isCreate && _account is not null && !_account.TryReserveStream())
return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");
+ // Go: stream.go:update — validate immutable fields on update.
+ // Reference: server/stream.go:1500-1600 (stream.update)
+ if (!isCreate && _streams.TryGetValue(normalized.Name, out var existingHandle))
+ {
+ var otherStreams = _streams.Values
+ .Where(s => !string.Equals(s.Config.Name, normalized.Name, StringComparison.Ordinal))
+ .Select(s => s.Config);
+ var updateErrors = ValidateConfigUpdate(existingHandle.Config, normalized, otherStreams);
+ if (updateErrors.Count > 0)
+ return JetStreamApiResponse.ErrorResponse(400, updateErrors[0]);
+ }
+
var handle = _streams.AddOrUpdate(
normalized.Name,
_ => new StreamHandle(normalized, CreateStore(normalized)),
@@ -462,6 +474,86 @@ public sealed class StreamManager
return null;
}
+ ///
+ /// Validates that is a legal update of .
+ /// Returns an empty list when the update is valid; otherwise returns one or more error strings.
+ /// The parameter is used to detect subject overlap with peer streams.
+ /// Go reference: server/stream.go:1500-1600 (stream.update immutable-field checks).
+ ///
+ public static IReadOnlyList ValidateConfigUpdate(
+ StreamConfig existing,
+ StreamConfig proposed,
+ IEnumerable? otherStreams = null)
+ {
+ List errors = [];
+
+ // Sealed streams reject all modifications.
+ if (existing.Sealed)
+ {
+ errors.Add("sealed stream cannot be modified");
+ return errors;
+ }
+
+ // Storage type is immutable.
+ if (existing.Storage != proposed.Storage)
+ errors.Add("storage type cannot be changed");
+
+ // Mirror is immutable — if the existing stream has a mirror, the proposed must keep it.
+ if (!string.IsNullOrWhiteSpace(existing.Mirror)
+ && !string.Equals(existing.Mirror, proposed.Mirror, StringComparison.Ordinal))
+ {
+ errors.Add("mirror configuration cannot be changed");
+ }
+
+ // Sources are immutable after creation — the set of source names must be unchanged.
+ if (existing.Sources.Count > 0)
+ {
+ var existingNames = existing.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
+ var proposedNames = proposed.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
+ if (!existingNames.SequenceEqual(proposedNames, StringComparer.Ordinal))
+ errors.Add("sources cannot be changed after creation");
+ }
+
+ // Retention policy is immutable.
+ if (existing.Retention != proposed.Retention)
+ errors.Add("retention policy cannot be changed");
+
+ // MaxConsumers may only be increased (or left unlimited).
+ if (existing.MaxConsumers > 0 && proposed.MaxConsumers > 0
+ && proposed.MaxConsumers < existing.MaxConsumers)
+ {
+ errors.Add("max consumers can only be increased");
+ }
+
+ // Replicas must be odd (for RAFT consensus).
+ if (proposed.Replicas > 1 && proposed.Replicas % 2 == 0)
+ errors.Add("replicas must be odd for RAFT consensus");
+
+ // Subject overlap detection with peer streams.
+ if (otherStreams is not null && proposed.Subjects.Count > 0)
+ {
+ foreach (var otherStream in otherStreams)
+ {
+ foreach (var proposed_subj in proposed.Subjects)
+ {
+ foreach (var other_subj in otherStream.Subjects)
+ {
+ if (SubjectMatch.MatchLiteral(proposed_subj, other_subj)
+ || SubjectMatch.MatchLiteral(other_subj, proposed_subj)
+ || SubjectMatch.SubjectsCollide(proposed_subj, other_subj))
+ {
+ errors.Add($"subjects overlap with stream '{otherStream.Name}'");
+ goto nextStream;
+ }
+ }
+ }
+ nextStream:;
+ }
+ }
+
+ return errors;
+ }
+
private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
{
var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
@@ -630,6 +722,47 @@ public sealed class StreamManager
};
}
+ ///
+ /// Returns mirror monitoring info for the given stream, or null if the stream does not exist
+ /// or is not configured as a mirror.
+ /// Go reference: server/stream.go:2739-2743 (mirrorInfo)
+ ///
+ public MirrorInfoResponse? GetMirrorInfo(string streamName)
+ {
+ if (!_streams.TryGetValue(streamName, out var stream))
+ return null;
+
+ if (string.IsNullOrWhiteSpace(stream.Config.Mirror))
+ return null;
+
+ if (!_mirrorsByOrigin.TryGetValue(stream.Config.Mirror, out var coordinators))
+ return null;
+
+ var first = coordinators.Count > 0 ? coordinators[0] : null;
+ return first?.GetMirrorInfo(streamName);
+ }
+
+ ///
+ /// Returns source monitoring info for all sources configured on the given stream.
+ /// Returns an empty array when the stream does not exist or has no sources.
+ /// Go reference: server/stream.go:2687-2695 (sourcesInfo)
+ ///
+ public SourceInfoResponse[] GetSourceInfos(string streamName)
+ {
+ if (!_streams.TryGetValue(streamName, out _))
+ return [];
+
+ var results = new List();
+
+ foreach (var (_, coordinators) in _sourcesByOrigin)
+ {
+ foreach (var coord in coordinators)
+ results.Add(coord.GetSourceInfo());
+ }
+
+ return [.. results];
+ }
+
private static IStreamStore CreateStore(StreamConfig config)
{
return config.Storage switch
diff --git a/tests/NATS.Server.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs b/tests/NATS.Server.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs
new file mode 100644
index 0000000..10b484a
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Streams/ConfigUpdateValidationTests.cs
@@ -0,0 +1,331 @@
+// Ported from golang/nats-server/server/jetstream_test.go
+// Go reference: server/stream.go:1500-1600 (stream.update immutable field validation)
+// Covers: TestJetStreamStreamUpdate, TestJetStreamStreamUpdateMaxConsumers
+
+using NATS.Server.JetStream;
+using NATS.Server.JetStream.Api;
+using NATS.Server.JetStream.Models;
+using Shouldly;
+
+namespace NATS.Server.Tests.JetStream.Streams;
+
+public class ConfigUpdateValidationTests
+{
+ // Go ref: server/stream.go:1500-1600 (stream.update)
+ // A valid update that only changes mutable fields (MaxMsgs) should produce no errors.
+ [Fact]
+ public void ValidateConfigUpdate_allows_valid_changes()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Retention = RetentionPolicy.Limits,
+ Subjects = ["orders.*"],
+ MaxMsgs = 100,
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Retention = RetentionPolicy.Limits,
+ Subjects = ["orders.*"],
+ MaxMsgs = 500,
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
+
+ errors.ShouldBeEmpty();
+ }
+
+ // Go ref: server/stream.go:1511-1513 (storage type immutability check)
+ // Changing storage type from Memory to File must be rejected.
+ [Fact]
+ public void ValidateConfigUpdate_rejects_storage_type_change()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.*"],
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.File,
+ Subjects = ["orders.*"],
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
+
+ errors.ShouldContain(e => e.Contains("storage type"));
+ }
+
+ // Go ref: server/stream.go:1530-1535 (mirror immutability)
+ // Changing the mirror origin must be rejected.
+ [Fact]
+ public void ValidateConfigUpdate_rejects_mirror_change()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "MIRROR_STREAM",
+ Storage = StorageType.Memory,
+ Mirror = "ORIGIN_A",
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "MIRROR_STREAM",
+ Storage = StorageType.Memory,
+ Mirror = "ORIGIN_B",
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
+
+ errors.ShouldContain(e => e.Contains("mirror configuration"));
+ }
+
+ // Go ref: server/stream.go:1520-1525 (retention policy immutability)
+ // Changing the retention policy must be rejected.
+ [Fact]
+ public void ValidateConfigUpdate_rejects_retention_change()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Retention = RetentionPolicy.Limits,
+ Subjects = ["orders.*"],
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Retention = RetentionPolicy.WorkQueue,
+ Subjects = ["orders.*"],
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
+
+ errors.ShouldContain(e => e.Contains("retention policy"));
+ }
+
+ // Go ref: server/stream.go:1500-1502 (sealed stream guard)
+ // Any modification attempt on a sealed stream must be rejected.
+ [Fact]
+ public void ValidateConfigUpdate_rejects_sealed_stream_changes()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "SEALED",
+ Storage = StorageType.Memory,
+ Sealed = true,
+ Subjects = ["sealed.*"],
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "SEALED",
+ Storage = StorageType.Memory,
+ Sealed = true,
+ Subjects = ["sealed.new.*"],
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
+
+ errors.ShouldContain(e => e.Contains("sealed stream"));
+ }
+
+ // Go ref: server/stream.go:1537-1542 (sources immutability)
+ // Changing the sources list after creation must be rejected.
+ [Fact]
+ public void ValidateConfigUpdate_rejects_source_change()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "AGG",
+ Storage = StorageType.Memory,
+ Sources =
+ [
+ new StreamSourceConfig { Name = "SRC_A" },
+ new StreamSourceConfig { Name = "SRC_B" },
+ ],
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "AGG",
+ Storage = StorageType.Memory,
+ Sources =
+ [
+ new StreamSourceConfig { Name = "SRC_A" },
+ new StreamSourceConfig { Name = "SRC_C" },
+ ],
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
+
+ errors.ShouldContain(e => e.Contains("sources cannot be changed"));
+ }
+
+ // Go ref: server/jetstream.go — subject overlap detection between streams.
+ // Proposing subjects that collide with another stream's subjects must be rejected.
+ [Fact]
+ public void ValidateConfigUpdate_detects_subject_overlap()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.*"],
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.>"],
+ };
+ var otherStreams = new[]
+ {
+ new StreamConfig
+ {
+ Name = "ARCHIVE",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.archived"],
+ },
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed, otherStreams);
+
+ errors.ShouldContain(e => e.Contains("ARCHIVE"));
+ }
+
+ // Go ref: server/jetstream.go — no error for non-overlapping subject sets.
+ // Proposing subjects that do not overlap with other streams must succeed.
+ [Fact]
+ public void ValidateConfigUpdate_allows_non_overlapping_subjects()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.*"],
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.>"],
+ };
+ var otherStreams = new[]
+ {
+ new StreamConfig
+ {
+ Name = "EVENTS",
+ Storage = StorageType.Memory,
+ Subjects = ["events.*"],
+ },
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed, otherStreams);
+
+ errors.ShouldBeEmpty();
+ }
+
+ // Go ref: server/stream.go — MaxConsumers may not be decreased.
+ // Decreasing MaxConsumers from a positive value must be rejected.
+ [Fact]
+ public void ValidateConfigUpdate_rejects_max_consumers_decrease()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.*"],
+ MaxConsumers = 10,
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.*"],
+ MaxConsumers = 5,
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
+
+ errors.ShouldContain(e => e.Contains("max consumers can only be increased"));
+ }
+
+ // Go ref: server/stream.go — MaxConsumers may be raised without restriction.
+ [Fact]
+ public void ValidateConfigUpdate_allows_max_consumers_increase()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.*"],
+ MaxConsumers = 5,
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.*"],
+ MaxConsumers = 20,
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
+
+ errors.ShouldBeEmpty();
+ }
+
+ // Go ref: server/stream.go — RAFT consensus requires an odd number of replicas.
+ // Setting replicas to an even number must be rejected.
+ [Fact]
+ public void ValidateConfigUpdate_rejects_even_replicas()
+ {
+ var existing = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.*"],
+ Replicas = 1,
+ };
+ var proposed = new StreamConfig
+ {
+ Name = "ORDERS",
+ Storage = StorageType.Memory,
+ Subjects = ["orders.*"],
+ Replicas = 2,
+ };
+
+ var errors = StreamManager.ValidateConfigUpdate(existing, proposed);
+
+ errors.ShouldContain(e => e.Contains("replicas must be odd"));
+ }
+
+ // Go ref: server/stream.go:1500-1600 (stream.update) — integration via StreamManager.
+ // CreateOrUpdate must reject an update that changes storage type.
+ [Fact]
+ public void CreateOrUpdate_rejects_invalid_config_update()
+ {
+ var manager = new StreamManager();
+
+ var createResult = manager.CreateOrUpdate(new StreamConfig
+ {
+ Name = "EVENTS",
+ Storage = StorageType.Memory,
+ Subjects = ["events.*"],
+ });
+ createResult.Error.ShouldBeNull();
+
+ var updateResult = manager.CreateOrUpdate(new StreamConfig
+ {
+ Name = "EVENTS",
+ Storage = StorageType.File,
+ Subjects = ["events.*"],
+ });
+
+ updateResult.Error.ShouldNotBeNull();
+ updateResult.Error!.Description.ShouldContain("storage type");
+ }
+}