Add ValidateConfigUpdate to StreamManager with immutability rules for storage type, mirror, sources, and retention policy; a sealed-stream guard; MaxConsumers decrease prevention; even-replica rejection; and subject overlap detection against peer streams. Wire the check into CreateOrUpdate for all update paths. Twelve new tests in ConfigUpdateValidationTests.cs cover every rule, including one integration test that exercises the check through StreamManager.CreateOrUpdate.
332 lines
10 KiB
C#
332 lines
10 KiB
C#
// Ported from golang/nats-server/server/jetstream_test.go
|
|
// Go reference: server/stream.go:1500-1600 (stream.update immutable field validation)
|
|
// Covers: TestJetStreamStreamUpdate, TestJetStreamStreamUpdateMaxConsumers
|
|
|
|
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.Api;
|
|
using NATS.Server.JetStream.Models;
|
|
using Shouldly;
|
|
|
|
namespace NATS.Server.Tests.JetStream.Streams;
|
|
|
|
public class ConfigUpdateValidationTests
{
    // Go ref: server/stream.go:1500-1600 (stream.update)
    // Baseline case: the two configs differ only in MaxMsgs, so no validation
    // errors are expected.
    [Fact]
    public void ValidateConfigUpdate_allows_valid_changes()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Retention = RetentionPolicy.Limits,
            Subjects = ["orders.*"],
            MaxMsgs = 100,
        };
        StreamConfig requested = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Retention = RetentionPolicy.Limits,
            Subjects = ["orders.*"],
            MaxMsgs = 500,
        };

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested);

        // Assert
        violations.ShouldBeEmpty();
    }

    // Go ref: server/stream.go:1511-1513 (storage type immutability check)
    // Switching Storage from Memory to File must surface a "storage type" error.
    [Fact]
    public void ValidateConfigUpdate_rejects_storage_type_change()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.*"],
        };
        StreamConfig requested = new()
        {
            Name = "ORDERS",
            Storage = StorageType.File,
            Subjects = ["orders.*"],
        };

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested);

        // Assert
        violations.ShouldContain(message => message.Contains("storage type"));
    }

    // Go ref: server/stream.go:1530-1535 (mirror immutability)
    // Pointing the mirror at a different origin must surface a
    // "mirror configuration" error.
    [Fact]
    public void ValidateConfigUpdate_rejects_mirror_change()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "MIRROR_STREAM",
            Storage = StorageType.Memory,
            Mirror = "ORIGIN_A",
        };
        StreamConfig requested = new()
        {
            Name = "MIRROR_STREAM",
            Storage = StorageType.Memory,
            Mirror = "ORIGIN_B",
        };

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested);

        // Assert
        violations.ShouldContain(message => message.Contains("mirror configuration"));
    }

    // Go ref: server/stream.go:1520-1525 (retention policy immutability)
    // Moving from Limits to WorkQueue retention must surface a
    // "retention policy" error.
    [Fact]
    public void ValidateConfigUpdate_rejects_retention_change()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Retention = RetentionPolicy.Limits,
            Subjects = ["orders.*"],
        };
        StreamConfig requested = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Retention = RetentionPolicy.WorkQueue,
            Subjects = ["orders.*"],
        };

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested);

        // Assert
        violations.ShouldContain(message => message.Contains("retention policy"));
    }

    // Go ref: server/stream.go:1500-1502 (sealed stream guard)
    // Both configs are sealed; the only difference is the subject list, and that
    // alone must surface a "sealed stream" error.
    [Fact]
    public void ValidateConfigUpdate_rejects_sealed_stream_changes()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "SEALED",
            Storage = StorageType.Memory,
            Sealed = true,
            Subjects = ["sealed.*"],
        };
        StreamConfig requested = new()
        {
            Name = "SEALED",
            Storage = StorageType.Memory,
            Sealed = true,
            Subjects = ["sealed.new.*"],
        };

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested);

        // Assert
        violations.ShouldContain(message => message.Contains("sealed stream"));
    }

    // Go ref: server/stream.go:1537-1542 (sources immutability)
    // Replacing SRC_B with SRC_C in the source list must surface a
    // "sources cannot be changed" error.
    [Fact]
    public void ValidateConfigUpdate_rejects_source_change()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "AGG",
            Storage = StorageType.Memory,
            Sources =
            [
                new StreamSourceConfig { Name = "SRC_A" },
                new StreamSourceConfig { Name = "SRC_B" },
            ],
        };
        StreamConfig requested = new()
        {
            Name = "AGG",
            Storage = StorageType.Memory,
            Sources =
            [
                new StreamSourceConfig { Name = "SRC_A" },
                new StreamSourceConfig { Name = "SRC_C" },
            ],
        };

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested);

        // Assert
        violations.ShouldContain(message => message.Contains("sources cannot be changed"));
    }

    // Go ref: server/jetstream.go — subject overlap detection between streams.
    // Widening ORDERS to "orders.>" collides with ARCHIVE's "orders.archived";
    // the reported error is expected to name the conflicting stream.
    [Fact]
    public void ValidateConfigUpdate_detects_subject_overlap()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.*"],
        };
        StreamConfig requested = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.>"],
        };
        StreamConfig[] peers =
        [
            new StreamConfig
            {
                Name = "ARCHIVE",
                Storage = StorageType.Memory,
                Subjects = ["orders.archived"],
            },
        ];

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested, peers);

        // Assert
        violations.ShouldContain(message => message.Contains("ARCHIVE"));
    }

    // Go ref: server/jetstream.go — no error for non-overlapping subject sets.
    // The only peer stream listens on "events.*", which is disjoint from
    // "orders.>", so no validation errors are expected.
    [Fact]
    public void ValidateConfigUpdate_allows_non_overlapping_subjects()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.*"],
        };
        StreamConfig requested = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.>"],
        };
        StreamConfig[] peers =
        [
            new StreamConfig
            {
                Name = "EVENTS",
                Storage = StorageType.Memory,
                Subjects = ["events.*"],
            },
        ];

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested, peers);

        // Assert
        violations.ShouldBeEmpty();
    }

    // Go ref: server/stream.go — MaxConsumers may not be decreased.
    // Lowering MaxConsumers from 10 to 5 must surface a
    // "max consumers can only be increased" error.
    [Fact]
    public void ValidateConfigUpdate_rejects_max_consumers_decrease()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.*"],
            MaxConsumers = 10,
        };
        StreamConfig requested = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.*"],
            MaxConsumers = 5,
        };

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested);

        // Assert
        violations.ShouldContain(message => message.Contains("max consumers can only be increased"));
    }

    // Go ref: server/stream.go — MaxConsumers may be raised without restriction.
    // Raising MaxConsumers from 5 to 20 is expected to pass validation cleanly.
    [Fact]
    public void ValidateConfigUpdate_allows_max_consumers_increase()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.*"],
            MaxConsumers = 5,
        };
        StreamConfig requested = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.*"],
            MaxConsumers = 20,
        };

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested);

        // Assert
        violations.ShouldBeEmpty();
    }

    // Go ref: server/stream.go — RAFT consensus requires an odd number of replicas.
    // Moving Replicas from 1 to 2 must surface a "replicas must be odd" error.
    [Fact]
    public void ValidateConfigUpdate_rejects_even_replicas()
    {
        // Arrange
        StreamConfig current = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.*"],
            Replicas = 1,
        };
        StreamConfig requested = new()
        {
            Name = "ORDERS",
            Storage = StorageType.Memory,
            Subjects = ["orders.*"],
            Replicas = 2,
        };

        // Act
        var violations = StreamManager.ValidateConfigUpdate(current, requested);

        // Assert
        violations.ShouldContain(message => message.Contains("replicas must be odd"));
    }

    // Go ref: server/stream.go:1500-1600 (stream.update) — integration via StreamManager.
    // End-to-end check: after a stream is created, a second CreateOrUpdate call
    // that flips its storage type must come back with a "storage type" error.
    [Fact]
    public void CreateOrUpdate_rejects_invalid_config_update()
    {
        // Arrange: register the stream with in-memory storage.
        var streams = new StreamManager();
        StreamConfig initial = new()
        {
            Name = "EVENTS",
            Storage = StorageType.Memory,
            Subjects = ["events.*"],
        };

        var created = streams.CreateOrUpdate(initial);
        created.Error.ShouldBeNull();

        // Act: resubmit the same stream name with file storage.
        StreamConfig conflicting = new()
        {
            Name = "EVENTS",
            Storage = StorageType.File,
            Subjects = ["events.*"],
        };

        var updated = streams.CreateOrUpdate(conflicting);

        // Assert
        updated.Error.ShouldNotBeNull();
        updated.Error!.Description.ShouldContain("storage type");
    }
}
|