diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index eddb230..fd6d82f 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -264,6 +264,101 @@ public sealed class JetStreamMetaGroup return Task.CompletedTask; } + // --------------------------------------------------------------- + // Validated assignment processing + // Go reference: jetstream_cluster.go:4541-5925 + // --------------------------------------------------------------- + + /// + /// Validates and processes a stream assignment. + /// Returns false if the assignment is invalid (empty name, null group). + /// Idempotent: duplicate assignments for the same stream name are accepted. + /// Go reference: jetstream_cluster.go:4541 processStreamAssignment. + /// + public bool ProcessStreamAssignment(StreamAssignment sa) + { + if (string.IsNullOrEmpty(sa.StreamName) || sa.Group == null) + return false; + + AddStreamAssignment(sa); + return true; + } + + /// + /// Updates an existing stream assignment's configuration. + /// Returns false if the stream does not exist. + /// Go reference: jetstream_cluster.go processUpdateStreamAssignment. + /// + public bool ProcessUpdateStreamAssignment(StreamAssignment sa) + { + if (!_assignments.TryGetValue(sa.StreamName, out var existing)) + return false; + + // Update the config while preserving consumers and other state + var updated = new StreamAssignment + { + StreamName = sa.StreamName, + Group = sa.Group, + ConfigJson = sa.ConfigJson, + Created = existing.Created, + SyncSubject = existing.SyncSubject, + }; + // Copy consumers from old to new + foreach (var (name, ca) in existing.Consumers) + updated.Consumers[name] = ca; + + _assignments[sa.StreamName] = updated; + return true; + } + + /// + /// Removes a stream and all its consumers. + /// Returns false if stream didn't exist. Returns true if removed. + /// Go reference: jetstream_cluster.go processStreamRemoval. + /// + public bool ProcessStreamRemoval(string streamName) + { + if (!_assignments.ContainsKey(streamName)) + return false; + + RemoveStreamAssignment(streamName); + return true; + } + + /// + /// Validates and processes a consumer assignment. + /// Returns false if the parent stream does not exist or consumer name is empty. + /// Go reference: jetstream_cluster.go:5300 processConsumerAssignment. + /// + public bool ProcessConsumerAssignment(ConsumerAssignment ca) + { + if (string.IsNullOrEmpty(ca.ConsumerName) || string.IsNullOrEmpty(ca.StreamName)) + return false; + + if (!_assignments.ContainsKey(ca.StreamName)) + return false; + + AddConsumerAssignment(ca.StreamName, ca); + return true; + } + + /// + /// Removes a consumer assignment. + /// Returns false if stream or consumer doesn't exist. + /// Go reference: jetstream_cluster.go processConsumerRemoval. + /// + public bool ProcessConsumerRemoval(string streamName, string consumerName) + { + if (!_assignments.TryGetValue(streamName, out var sa)) + return false; + + if (!sa.Consumers.ContainsKey(consumerName)) + return false; + + RemoveConsumerAssignment(streamName, consumerName); + return true; + } + // --------------------------------------------------------------- // Monitor-facing mutation methods // Called by JetStreamClusterMonitor when processing committed RAFT entries. diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamAssignmentProcessingTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamAssignmentProcessingTests.cs new file mode 100644 index 0000000..5ed8cf9 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JetStreamAssignmentProcessingTests.cs @@ -0,0 +1,169 @@ +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for validated stream/consumer assignment processing. +/// Go reference: jetstream_cluster.go:4541-5925. +/// +public class JetStreamAssignmentProcessingTests +{ + [Fact] + public void ProcessStreamAssignment_validates_config() + { + var meta = new JetStreamMetaGroup(3); + var sa = new StreamAssignment + { + StreamName = "valid-stream", + Group = new RaftGroup { Name = "rg-1", Peers = ["n1", "n2", "n3"] }, + ConfigJson = """{"subjects":["test.>"]}""", + }; + + meta.ProcessStreamAssignment(sa).ShouldBeTrue(); + meta.StreamCount.ShouldBe(1); + } + + [Fact] + public void ProcessStreamAssignment_rejects_empty_name() + { + var meta = new JetStreamMetaGroup(3); + var sa = CreateStreamAssignment("", "rg-1"); + meta.ProcessStreamAssignment(sa).ShouldBeFalse(); + meta.StreamCount.ShouldBe(0); + } + + [Fact] + public void ProcessUpdateStreamAssignment_applies_config_change() + { + var meta = new JetStreamMetaGroup(3); + meta.ProcessStreamAssignment(CreateStreamAssignment("updatable", "rg-u", """{"subjects":["old.>"]}""")); + + var updated = CreateStreamAssignment("updatable", "rg-u", """{"subjects":["new.>"]}"""); + meta.ProcessUpdateStreamAssignment(updated).ShouldBeTrue(); + + var assignment = meta.GetStreamAssignment("updatable"); + assignment!.ConfigJson.ShouldContain("new.>"); + } + + [Fact] + public void ProcessUpdateStreamAssignment_returns_false_for_nonexistent() + { + var meta = new JetStreamMetaGroup(3); + var sa = CreateStreamAssignment("ghost", "rg-g"); + meta.ProcessUpdateStreamAssignment(sa).ShouldBeFalse(); + } + + [Fact] + public void ProcessConsumerAssignment_requires_existing_stream() + { + var meta = new JetStreamMetaGroup(3); + var ca = new ConsumerAssignment + { + ConsumerName = "orphan-consumer", + StreamName = "nonexistent-stream", + Group = new RaftGroup { Name = "rg-c", Peers = ["n1", "n2", "n3"] }, + }; + + meta.ProcessConsumerAssignment(ca).ShouldBeFalse(); + } + + [Fact] + public void ProcessConsumerAssignment_succeeds_with_existing_stream() + { + var meta = new JetStreamMetaGroup(3); + meta.ProcessStreamAssignment(CreateStreamAssignment("s1", "rg-s1")); + + var ca = new ConsumerAssignment + { + ConsumerName = "c1", + StreamName = "s1", + Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2", "n3"] }, + }; + + meta.ProcessConsumerAssignment(ca).ShouldBeTrue(); + meta.ConsumerCount.ShouldBe(1); + } + + [Fact] + public void ProcessStreamRemoval_cascades_to_consumers() + { + var meta = new JetStreamMetaGroup(3); + meta.ProcessStreamAssignment(CreateStreamAssignment("cascade", "rg-cas")); + meta.ProcessConsumerAssignment(new ConsumerAssignment + { + ConsumerName = "c1", + StreamName = "cascade", + Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2", "n3"] }, + }); + + meta.ProcessStreamRemoval("cascade").ShouldBeTrue(); + meta.StreamCount.ShouldBe(0); + meta.ConsumerCount.ShouldBe(0); + } + + [Fact] + public void ProcessStreamRemoval_returns_false_for_nonexistent() + { + var meta = new JetStreamMetaGroup(3); + meta.ProcessStreamRemoval("nope").ShouldBeFalse(); + } + + [Fact] + public void ProcessConsumerRemoval_returns_false_for_nonexistent_stream() + { + var meta = new JetStreamMetaGroup(3); + meta.ProcessConsumerRemoval("ghost", "c1").ShouldBeFalse(); + } + + [Fact] + public void ProcessConsumerRemoval_returns_false_for_nonexistent_consumer() + { + var meta = new JetStreamMetaGroup(3); + meta.ProcessStreamAssignment(CreateStreamAssignment("s1", "rg-s1")); + meta.ProcessConsumerRemoval("s1", "nope").ShouldBeFalse(); + } + + [Fact] + public void ProcessConsumerRemoval_succeeds() + { + var meta = new JetStreamMetaGroup(3); + meta.ProcessStreamAssignment(CreateStreamAssignment("s1", "rg-s1")); + meta.ProcessConsumerAssignment(new ConsumerAssignment + { + ConsumerName = "c1", + StreamName = "s1", + Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2"] }, + }); + + meta.ProcessConsumerRemoval("s1", "c1").ShouldBeTrue(); + meta.ConsumerCount.ShouldBe(0); + } + + [Fact] + public void ProcessUpdateStreamAssignment_preserves_consumers() + { + var meta = new JetStreamMetaGroup(3); + meta.ProcessStreamAssignment(CreateStreamAssignment("s1", "rg-s1", """{"subjects":["old"]}""")); + meta.ProcessConsumerAssignment(new ConsumerAssignment + { + ConsumerName = "c1", + StreamName = "s1", + Group = new RaftGroup { Name = "rg-c1", Peers = ["n1", "n2"] }, + }); + + var updated = CreateStreamAssignment("s1", "rg-s1", """{"subjects":["new"]}"""); + meta.ProcessUpdateStreamAssignment(updated).ShouldBeTrue(); + + meta.ConsumerCount.ShouldBe(1); + meta.GetConsumerAssignment("s1", "c1").ShouldNotBeNull(); + } + + // Helper to create a StreamAssignment (StreamName is `required` so we must always provide it) + private static StreamAssignment CreateStreamAssignment(string name, string groupName, string config = "{}") + => new() + { + StreamName = name, + Group = new RaftGroup { Name = groupName, Peers = ["n1", "n2", "n3"] }, + ConfigJson = config, + }; +}