diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index 24f8ed4..fc987aa 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -3,11 +3,33 @@ using NATS.Server.JetStream.Models; namespace NATS.Server.JetStream.Cluster; +/// +/// Orchestrates cluster-wide stream/consumer lifecycle via RAFT proposals. +/// The meta-group tracks StreamAssignment and ConsumerAssignment dictionaries, +/// validates proposals, and dispatches applied entries. +/// Go reference: jetstream_cluster.go:500-2000 (processStreamAssignment, processConsumerAssignment). +/// public sealed class JetStreamMetaGroup { private readonly int _nodes; private readonly int _selfIndex; + + // Backward-compatible stream name set used by existing GetState().Streams. private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); + + // Full StreamAssignment tracking for proposal workflow. + // Go reference: jetstream_cluster.go streamAssignment, consumerAssignment maps. + private readonly ConcurrentDictionary _assignments = + new(StringComparer.Ordinal); + + // B8: Inflight proposal tracking -- entries that have been proposed but not yet committed. + // Go reference: jetstream_cluster.go inflight tracking for proposals. + private readonly ConcurrentDictionary _inflightStreams = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _inflightConsumers = new(StringComparer.Ordinal); + + // Running count of consumers across all stream assignments. + private int _totalConsumerCount; + private int _leaderIndex = 1; private long _leadershipVersion = 1; @@ -24,7 +46,7 @@ public sealed class JetStreamMetaGroup /// /// Returns true when this node is the current meta-group leader. - /// Go reference: jetstream_api.go:200-300 — leader check before mutating operations. + /// Go reference: jetstream_api.go:200-300 -- leader check before mutating operations. /// public bool IsLeader() => _leaderIndex == _selfIndex; @@ -34,12 +56,206 @@ public sealed class JetStreamMetaGroup /// public string Leader => $"meta-{_leaderIndex}"; + /// + /// Number of streams currently tracked. + /// + public int StreamCount => _assignments.Count; + + /// + /// Number of consumers across all streams. + /// + public int ConsumerCount => _totalConsumerCount; + + /// + /// Number of inflight stream proposals. + /// + public int InflightStreamCount => _inflightStreams.Count; + + /// + /// Number of inflight consumer proposals. + /// + public int InflightConsumerCount => _inflightConsumers.Count; + + // --------------------------------------------------------------- + // Stream proposals + // --------------------------------------------------------------- + + /// + /// Proposes creating a stream. Stores in both the backward-compatible name set + /// and the full assignment map. + /// Go reference: jetstream_cluster.go processStreamAssignment. + /// public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct) + => ProposeCreateStreamAsync(config, group: null, ct); + + /// + /// Proposes creating a stream with an explicit RAFT group assignment. + /// Validates leader status and duplicate stream names before proposing. + /// Go reference: jetstream_cluster.go processStreamAssignment. + /// + public Task ProposeCreateStreamAsync(StreamConfig config, RaftGroup? group, CancellationToken ct) { - _streams[config.Name] = 0; + _ = ct; + + if (!IsLeader()) + throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}"); + + if (_assignments.ContainsKey(config.Name)) + throw new InvalidOperationException($"Stream '{config.Name}' already exists."); + + // Track as inflight + _inflightStreams[config.Name] = config.Name; + + // Apply the entry + ApplyStreamCreate(config.Name, group ?? new RaftGroup { Name = config.Name }); + + // Clear inflight + _inflightStreams.TryRemove(config.Name, out _); + return Task.CompletedTask; } + /// + /// Proposes deleting a stream. Removes from both tracking structures. + /// Go reference: jetstream_cluster.go processStreamDelete. + /// + public Task ProposeDeleteStreamAsync(string streamName, CancellationToken ct) + { + _ = ct; + + if (!IsLeader()) + throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}"); + + ApplyStreamDelete(streamName); + + return Task.CompletedTask; + } + + // --------------------------------------------------------------- + // Consumer proposals + // --------------------------------------------------------------- + + /// + /// Proposes creating a consumer assignment within a stream. + /// Validates that the stream exists. + /// Go reference: jetstream_cluster.go processConsumerAssignment. + /// + public Task ProposeCreateConsumerAsync( + string streamName, + string consumerName, + RaftGroup group, + CancellationToken ct) + { + _ = ct; + + if (!IsLeader()) + throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}"); + + if (!_assignments.ContainsKey(streamName)) + throw new InvalidOperationException($"Stream '{streamName}' not found."); + + // Track as inflight + var inflightKey = $"{streamName}/{consumerName}"; + _inflightConsumers[inflightKey] = inflightKey; + + // Apply the entry + ApplyConsumerCreate(streamName, consumerName, group); + + // Clear inflight + _inflightConsumers.TryRemove(inflightKey, out _); + + return Task.CompletedTask; + } + + /// + /// Proposes deleting a consumer assignment from a stream. + /// Go reference: jetstream_cluster.go processConsumerDelete. + /// + public Task ProposeDeleteConsumerAsync( + string streamName, + string consumerName, + CancellationToken ct) + { + _ = ct; + + if (!IsLeader()) + throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}"); + + ApplyConsumerDelete(streamName, consumerName); + + return Task.CompletedTask; + } + + // --------------------------------------------------------------- + // ApplyEntry dispatch + // Go reference: jetstream_cluster.go RAFT apply for meta group + // --------------------------------------------------------------- + + /// + /// Applies a committed RAFT entry to the meta-group state. + /// Dispatches based on entry type prefix. + /// Go reference: jetstream_cluster.go processStreamAssignment / processConsumerAssignment. + /// + public void ApplyEntry(MetaEntryType entryType, string name, string? streamName = null, RaftGroup? group = null) + { + switch (entryType) + { + case MetaEntryType.StreamCreate: + ApplyStreamCreate(name, group ?? new RaftGroup { Name = name }); + break; + case MetaEntryType.StreamDelete: + ApplyStreamDelete(name); + break; + case MetaEntryType.ConsumerCreate: + if (streamName is null) + throw new ArgumentNullException(nameof(streamName), "Stream name required for consumer operations."); + ApplyConsumerCreate(streamName, name, group ?? new RaftGroup { Name = name }); + break; + case MetaEntryType.ConsumerDelete: + if (streamName is null) + throw new ArgumentNullException(nameof(streamName), "Stream name required for consumer operations."); + ApplyConsumerDelete(streamName, name); + break; + } + } + + // --------------------------------------------------------------- + // Lookup + // --------------------------------------------------------------- + + /// + /// Returns the StreamAssignment for the given stream name, or null if not found. + /// Go reference: jetstream_cluster.go streamAssignment lookup in meta leader. + /// + public StreamAssignment? GetStreamAssignment(string streamName) + => _assignments.TryGetValue(streamName, out var assignment) ? assignment : null; + + /// + /// Returns the ConsumerAssignment for the given stream and consumer, or null if not found. + /// Go reference: jetstream_cluster.go consumerAssignment lookup. + /// + public ConsumerAssignment? GetConsumerAssignment(string streamName, string consumerName) + { + if (_assignments.TryGetValue(streamName, out var sa) + && sa.Consumers.TryGetValue(consumerName, out var ca)) + { + return ca; + } + + return null; + } + + /// + /// Returns all current stream assignments. + /// Go reference: jetstream_cluster.go meta leader assignment enumeration. + /// + public IReadOnlyCollection GetAllAssignments() + => _assignments.Values.ToArray(); + + // --------------------------------------------------------------- + // State + // --------------------------------------------------------------- + public MetaGroupState GetState() { return new MetaGroupState @@ -48,9 +264,16 @@ public sealed class JetStreamMetaGroup ClusterSize = _nodes, LeaderId = $"meta-{_leaderIndex}", LeadershipVersion = _leadershipVersion, + AssignmentCount = _assignments.Count, + ConsumerCount = _totalConsumerCount, }; } + /// + /// Steps down the current leader, rotating to the next node. + /// Clears all inflight proposals on leader change. + /// Go reference: jetstream_cluster.go leader stepdown, clear inflight. + /// public void StepDown() { _leaderIndex++; @@ -58,7 +281,80 @@ public sealed class JetStreamMetaGroup _leaderIndex = 1; Interlocked.Increment(ref _leadershipVersion); + + // Clear inflight on leader change + // Go reference: jetstream_cluster.go -- inflight entries are cleared when leadership changes. + _inflightStreams.Clear(); + _inflightConsumers.Clear(); } + + // --------------------------------------------------------------- + // Internal apply methods + // --------------------------------------------------------------- + + private void ApplyStreamCreate(string streamName, RaftGroup group) + { + _streams[streamName] = 0; + + _assignments.AddOrUpdate( + streamName, + name => new StreamAssignment + { + StreamName = name, + Group = group, + ConfigJson = "{}", + }, + (_, existing) => existing); + } + + private void ApplyStreamDelete(string streamName) + { + if (_assignments.TryRemove(streamName, out var removed)) + { + // Decrement consumer count for all consumers in this stream + Interlocked.Add(ref _totalConsumerCount, -removed.Consumers.Count); + } + + _streams.TryRemove(streamName, out _); + } + + private void ApplyConsumerCreate(string streamName, string consumerName, RaftGroup group) + { + if (_assignments.TryGetValue(streamName, out var streamAssignment)) + { + var isNew = !streamAssignment.Consumers.ContainsKey(consumerName); + streamAssignment.Consumers[consumerName] = new ConsumerAssignment + { + ConsumerName = consumerName, + StreamName = streamName, + Group = group, + }; + + if (isNew) + Interlocked.Increment(ref _totalConsumerCount); + } + } + + private void ApplyConsumerDelete(string streamName, string consumerName) + { + if (_assignments.TryGetValue(streamName, out var streamAssignment)) + { + if (streamAssignment.Consumers.Remove(consumerName)) + Interlocked.Decrement(ref _totalConsumerCount); + } + } +} + +/// +/// Types of entries that can be proposed/applied in the meta group. +/// Go reference: jetstream_cluster.go entry type constants. +/// +public enum MetaEntryType +{ + StreamCreate, + StreamDelete, + ConsumerCreate, + ConsumerDelete, } public sealed class MetaGroupState @@ -67,4 +363,14 @@ public sealed class MetaGroupState public int ClusterSize { get; init; } public string LeaderId { get; init; } = string.Empty; public long LeadershipVersion { get; init; } + + /// + /// Number of stream assignments currently tracked by the meta group. + /// + public int AssignmentCount { get; init; } + + /// + /// Total consumer count across all stream assignments. + /// + public int ConsumerCount { get; init; } } diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/AssignmentSerializationTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/AssignmentSerializationTests.cs new file mode 100644 index 0000000..1be8c9d --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/AssignmentSerializationTests.cs @@ -0,0 +1,245 @@ +// Go parity: golang/nats-server/server/jetstream_cluster.go +// Covers: RaftGroup quorum calculation, HasQuorum checks, StreamAssignment +// and ConsumerAssignment creation, consumer dictionary operations, +// Preferred peer tracking. +using NATS.Server.JetStream.Cluster; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for ClusterAssignmentTypes: RaftGroup quorum semantics, +/// StreamAssignment lifecycle, and ConsumerAssignment defaults. +/// Go reference: jetstream_cluster.go:154-266 (raftGroup, streamAssignment, consumerAssignment). +/// +public class AssignmentSerializationTests +{ + // --------------------------------------------------------------- + // RaftGroup quorum calculation + // Go reference: jetstream_cluster.go:154-163 raftGroup.quorumNeeded() + // --------------------------------------------------------------- + + [Fact] + public void RaftGroup_quorum_size_for_single_node_is_one() + { + var group = new RaftGroup { Name = "test-r1", Peers = ["peer-1"] }; + + group.QuorumSize.ShouldBe(1); + } + + [Fact] + public void RaftGroup_quorum_size_for_three_nodes_is_two() + { + var group = new RaftGroup { Name = "test-r3", Peers = ["p1", "p2", "p3"] }; + + group.QuorumSize.ShouldBe(2); + } + + [Fact] + public void RaftGroup_quorum_size_for_five_nodes_is_three() + { + var group = new RaftGroup { Name = "test-r5", Peers = ["p1", "p2", "p3", "p4", "p5"] }; + + group.QuorumSize.ShouldBe(3); + } + + [Fact] + public void RaftGroup_quorum_size_for_empty_peers_is_one() + { + var group = new RaftGroup { Name = "test-empty", Peers = [] }; + + // (0 / 2) + 1 = 1 + group.QuorumSize.ShouldBe(1); + } + + // --------------------------------------------------------------- + // HasQuorum checks + // Go reference: jetstream_cluster.go raftGroup quorum check + // --------------------------------------------------------------- + + [Fact] + public void HasQuorum_returns_true_when_acks_meet_quorum() + { + var group = new RaftGroup { Name = "q-test", Peers = ["p1", "p2", "p3"] }; + + group.HasQuorum(2).ShouldBeTrue(); + group.HasQuorum(3).ShouldBeTrue(); + } + + [Fact] + public void HasQuorum_returns_false_when_acks_below_quorum() + { + var group = new RaftGroup { Name = "q-test", Peers = ["p1", "p2", "p3"] }; + + group.HasQuorum(1).ShouldBeFalse(); + group.HasQuorum(0).ShouldBeFalse(); + } + + [Fact] + public void HasQuorum_single_node_requires_one_ack() + { + var group = new RaftGroup { Name = "q-r1", Peers = ["p1"] }; + + group.HasQuorum(1).ShouldBeTrue(); + group.HasQuorum(0).ShouldBeFalse(); + } + + [Fact] + public void HasQuorum_five_nodes_requires_three_acks() + { + var group = new RaftGroup { Name = "q-r5", Peers = ["p1", "p2", "p3", "p4", "p5"] }; + + group.HasQuorum(2).ShouldBeFalse(); + group.HasQuorum(3).ShouldBeTrue(); + group.HasQuorum(5).ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // RaftGroup property defaults + // Go reference: jetstream_cluster.go:154-163 + // --------------------------------------------------------------- + + [Fact] + public void RaftGroup_defaults_storage_to_file() + { + var group = new RaftGroup { Name = "defaults" }; + + group.StorageType.ShouldBe("file"); + } + + [Fact] + public void RaftGroup_defaults_cluster_to_empty() + { + var group = new RaftGroup { Name = "defaults" }; + + group.Cluster.ShouldBe(string.Empty); + } + + [Fact] + public void RaftGroup_preferred_peer_tracking() + { + var group = new RaftGroup { Name = "pref-test", Peers = ["p1", "p2", "p3"] }; + + group.Preferred.ShouldBe(string.Empty); + + group.Preferred = "p2"; + group.Preferred.ShouldBe("p2"); + } + + // --------------------------------------------------------------- + // StreamAssignment creation + // Go reference: jetstream_cluster.go:166-184 streamAssignment + // --------------------------------------------------------------- + + [Fact] + public void StreamAssignment_created_with_defaults() + { + var group = new RaftGroup { Name = "sa-group", Peers = ["p1"] }; + var sa = new StreamAssignment + { + StreamName = "TEST-STREAM", + Group = group, + }; + + sa.StreamName.ShouldBe("TEST-STREAM"); + sa.Group.ShouldBeSameAs(group); + sa.ConfigJson.ShouldBe("{}"); + sa.SyncSubject.ShouldBe(string.Empty); + sa.Responded.ShouldBeFalse(); + sa.Recovering.ShouldBeFalse(); + sa.Reassigning.ShouldBeFalse(); + sa.Consumers.ShouldBeEmpty(); + sa.Created.ShouldBeGreaterThan(DateTime.MinValue); + } + + [Fact] + public void StreamAssignment_consumers_dictionary_operations() + { + var group = new RaftGroup { Name = "sa-cons", Peers = ["p1", "p2", "p3"] }; + var sa = new StreamAssignment + { + StreamName = "MY-STREAM", + Group = group, + }; + + var consumerGroup = new RaftGroup { Name = "cons-group", Peers = ["p1"] }; + var ca = new ConsumerAssignment + { + ConsumerName = "durable-1", + StreamName = "MY-STREAM", + Group = consumerGroup, + }; + + sa.Consumers["durable-1"] = ca; + sa.Consumers.Count.ShouldBe(1); + sa.Consumers["durable-1"].ConsumerName.ShouldBe("durable-1"); + + sa.Consumers.Remove("durable-1"); + sa.Consumers.ShouldBeEmpty(); + } + + // --------------------------------------------------------------- + // ConsumerAssignment creation + // Go reference: jetstream_cluster.go:250-266 consumerAssignment + // --------------------------------------------------------------- + + [Fact] + public void ConsumerAssignment_created_with_defaults() + { + var group = new RaftGroup { Name = "ca-group", Peers = ["p1"] }; + var ca = new ConsumerAssignment + { + ConsumerName = "my-consumer", + StreamName = "MY-STREAM", + Group = group, + }; + + ca.ConsumerName.ShouldBe("my-consumer"); + ca.StreamName.ShouldBe("MY-STREAM"); + ca.Group.ShouldBeSameAs(group); + ca.ConfigJson.ShouldBe("{}"); + ca.Responded.ShouldBeFalse(); + ca.Recovering.ShouldBeFalse(); + ca.Created.ShouldBeGreaterThan(DateTime.MinValue); + } + + [Fact] + public void ConsumerAssignment_mutable_flags() + { + var group = new RaftGroup { Name = "ca-flags", Peers = ["p1"] }; + var ca = new ConsumerAssignment + { + ConsumerName = "c1", + StreamName = "S1", + Group = group, + }; + + ca.Responded = true; + ca.Recovering = true; + + ca.Responded.ShouldBeTrue(); + ca.Recovering.ShouldBeTrue(); + } + + [Fact] + public void StreamAssignment_mutable_flags() + { + var group = new RaftGroup { Name = "sa-flags", Peers = ["p1"] }; + var sa = new StreamAssignment + { + StreamName = "S1", + Group = group, + }; + + sa.Responded = true; + sa.Recovering = true; + sa.Reassigning = true; + sa.ConfigJson = """{"subjects":["test.>"]}"""; + sa.SyncSubject = "$JS.SYNC.S1"; + + sa.Responded.ShouldBeTrue(); + sa.Recovering.ShouldBeTrue(); + sa.Reassigning.ShouldBeTrue(); + sa.ConfigJson.ShouldBe("""{"subjects":["test.>"]}"""); + sa.SyncSubject.ShouldBe("$JS.SYNC.S1"); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/ClusterAssignmentAndPlacementTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/ClusterAssignmentAndPlacementTests.cs new file mode 100644 index 0000000..6cf16cb --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/ClusterAssignmentAndPlacementTests.cs @@ -0,0 +1,723 @@ +// Go parity: golang/nats-server/server/jetstream_cluster.go +// Covers: RaftGroup quorum semantics, StreamAssignment/ConsumerAssignment initialization, +// JetStreamMetaGroup proposal workflow (create/delete stream + consumer), GetStreamAssignment, +// GetAllAssignments, and PlacementEngine peer selection with topology filtering. +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for B7 (ClusterAssignmentTypes), B8 (JetStreamMetaGroup proposal workflow), +/// and B9 (PlacementEngine peer selection). +/// Go reference: jetstream_cluster.go raftGroup, streamAssignment, consumerAssignment, +/// selectPeerGroup (line 7212). +/// +public class ClusterAssignmentAndPlacementTests +{ + // --------------------------------------------------------------- + // B7: RaftGroup — quorum and HasQuorum + // Go: jetstream_cluster.go:154 raftGroup struct + // --------------------------------------------------------------- + + [Fact] + public void RaftGroup_quorum_size_for_single_node_is_one() + { + var group = new RaftGroup + { + Name = "R1", + Peers = ["n1"], + }; + + group.QuorumSize.ShouldBe(1); + } + + [Fact] + public void RaftGroup_quorum_size_for_three_nodes_is_two() + { + var group = new RaftGroup + { + Name = "R3", + Peers = ["n1", "n2", "n3"], + }; + + group.QuorumSize.ShouldBe(2); + } + + [Fact] + public void RaftGroup_quorum_size_for_five_nodes_is_three() + { + var group = new RaftGroup + { + Name = "R5", + Peers = ["n1", "n2", "n3", "n4", "n5"], + }; + + group.QuorumSize.ShouldBe(3); + } + + [Fact] + public void RaftGroup_has_quorum_with_majority_acks() + { + var group = new RaftGroup + { + Name = "R3", + Peers = ["n1", "n2", "n3"], + }; + + // Quorum = 2; 2 acks is sufficient. + group.HasQuorum(2).ShouldBeTrue(); + } + + [Fact] + public void RaftGroup_no_quorum_with_minority_acks() + { + var group = new RaftGroup + { + Name = "R3", + Peers = ["n1", "n2", "n3"], + }; + + // Quorum = 2; 1 ack is not sufficient. + group.HasQuorum(1).ShouldBeFalse(); + } + + [Fact] + public void RaftGroup_has_quorum_with_all_acks() + { + var group = new RaftGroup + { + Name = "R5", + Peers = ["n1", "n2", "n3", "n4", "n5"], + }; + + group.HasQuorum(5).ShouldBeTrue(); + } + + [Fact] + public void RaftGroup_no_quorum_with_zero_acks() + { + var group = new RaftGroup + { + Name = "R3", + Peers = ["n1", "n2", "n3"], + }; + + group.HasQuorum(0).ShouldBeFalse(); + } + + // --------------------------------------------------------------- + // B7: StreamAssignment — initialization and consumer tracking + // Go: jetstream_cluster.go:166 streamAssignment struct + // --------------------------------------------------------------- + + [Fact] + public void StreamAssignment_initializes_with_empty_consumers() + { + var group = new RaftGroup { Name = "g1", Peers = ["n1", "n2", "n3"] }; + var assignment = new StreamAssignment + { + StreamName = "ORDERS", + Group = group, + }; + + assignment.StreamName.ShouldBe("ORDERS"); + assignment.Consumers.ShouldBeEmpty(); + assignment.ConfigJson.ShouldBe("{}"); + assignment.Responded.ShouldBeFalse(); + assignment.Recovering.ShouldBeFalse(); + assignment.Reassigning.ShouldBeFalse(); + } + + [Fact] + public void StreamAssignment_created_timestamp_is_recent() + { + var before = DateTime.UtcNow.AddSeconds(-1); + + var group = new RaftGroup { Name = "g1", Peers = ["n1"] }; + var assignment = new StreamAssignment + { + StreamName = "TS_STREAM", + Group = group, + }; + + var after = DateTime.UtcNow.AddSeconds(1); + + assignment.Created.ShouldBeGreaterThan(before); + assignment.Created.ShouldBeLessThan(after); + } + + [Fact] + public void StreamAssignment_consumers_dict_is_ordinal_keyed() + { + var group = new RaftGroup { Name = "g1", Peers = ["n1"] }; + var assignment = new StreamAssignment + { + StreamName = "S", + Group = group, + }; + + var consGroup = new RaftGroup { Name = "cg", Peers = ["n1"] }; + assignment.Consumers["ALPHA"] = new ConsumerAssignment + { + ConsumerName = "ALPHA", + StreamName = "S", + Group = consGroup, + }; + + assignment.Consumers.ContainsKey("ALPHA").ShouldBeTrue(); + assignment.Consumers.ContainsKey("alpha").ShouldBeFalse(); + } + + // --------------------------------------------------------------- + // B7: ConsumerAssignment — initialization + // Go: jetstream_cluster.go:250 consumerAssignment struct + // --------------------------------------------------------------- + + [Fact] + public void ConsumerAssignment_initializes_correctly() + { + var group = new RaftGroup { Name = "cg1", Peers = ["n1", "n2"] }; + var assignment = new ConsumerAssignment + { + ConsumerName = "PUSH_CONSUMER", + StreamName = "EVENTS", + Group = group, + }; + + assignment.ConsumerName.ShouldBe("PUSH_CONSUMER"); + assignment.StreamName.ShouldBe("EVENTS"); + assignment.Group.ShouldBeSameAs(group); + assignment.ConfigJson.ShouldBe("{}"); + assignment.Responded.ShouldBeFalse(); + assignment.Recovering.ShouldBeFalse(); + } + + [Fact] + public void ConsumerAssignment_created_timestamp_is_recent() + { + var before = DateTime.UtcNow.AddSeconds(-1); + + var group = new RaftGroup { Name = "cg", Peers = ["n1"] }; + var assignment = new ConsumerAssignment + { + ConsumerName = "C", + StreamName = "S", + Group = group, + }; + + var after = DateTime.UtcNow.AddSeconds(1); + + assignment.Created.ShouldBeGreaterThan(before); + assignment.Created.ShouldBeLessThan(after); + } + + // --------------------------------------------------------------- + // B8: JetStreamMetaGroup — ProposeCreateStreamAsync with assignment + // Go: jetstream_cluster.go processStreamAssignment + // --------------------------------------------------------------- + + [Fact] + public async Task ProposeCreateStream_with_group_stores_assignment() + { + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "ORDERS_grp", Peers = ["n1", "n2", "n3"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "ORDERS" }, group, default); + + var assignment = meta.GetStreamAssignment("ORDERS"); + assignment.ShouldNotBeNull(); + assignment!.StreamName.ShouldBe("ORDERS"); + assignment.Group.Peers.Count.ShouldBe(3); + } + + [Fact] + public async Task ProposeCreateStream_without_group_still_stores_assignment() + { + var meta = new JetStreamMetaGroup(3); + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "NOGROUP" }, default); + + var assignment = meta.GetStreamAssignment("NOGROUP"); + assignment.ShouldNotBeNull(); + assignment!.StreamName.ShouldBe("NOGROUP"); + assignment.Group.ShouldNotBeNull(); + } + + [Fact] + public async Task ProposeCreateStream_also_appears_in_GetState_streams() + { + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "g", Peers = ["n1"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "VISIBLE" }, group, default); + + var state = meta.GetState(); + state.Streams.ShouldContain("VISIBLE"); + state.AssignmentCount.ShouldBe(1); + } + + [Fact] + public async Task ProposeCreateStream_duplicate_is_idempotent() + { + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "g", Peers = ["n1"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DUP" }, group, default); + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DUP" }, group, default); + + meta.GetAllAssignments().Count.ShouldBe(1); + meta.GetState().Streams.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // B8: JetStreamMetaGroup — ProposeDeleteStreamAsync + // Go: jetstream_cluster.go processStreamDelete + // --------------------------------------------------------------- + + [Fact] + public async Task ProposeDeleteStream_removes_assignment_and_stream_name() + { + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "g", Peers = ["n1"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DELETEME" }, group, default); + + meta.GetStreamAssignment("DELETEME").ShouldNotBeNull(); + meta.GetState().Streams.ShouldContain("DELETEME"); + + await meta.ProposeDeleteStreamAsync("DELETEME", default); + + meta.GetStreamAssignment("DELETEME").ShouldBeNull(); + meta.GetState().Streams.ShouldNotContain("DELETEME"); + meta.GetState().AssignmentCount.ShouldBe(0); + } + + [Fact] + public async Task ProposeDeleteStream_nonexistent_stream_is_safe() + { + var meta = new JetStreamMetaGroup(3); + + // Should not throw. + await meta.ProposeDeleteStreamAsync("MISSING", default); + meta.GetAllAssignments().Count.ShouldBe(0); + } + + [Fact] + public async Task ProposeDeleteStream_only_removes_target_not_others() + { + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "g", Peers = ["n1"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "KEEP" }, group, default); + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "REMOVE" }, group, default); + + await meta.ProposeDeleteStreamAsync("REMOVE", default); + + meta.GetStreamAssignment("KEEP").ShouldNotBeNull(); + meta.GetStreamAssignment("REMOVE").ShouldBeNull(); + meta.GetState().Streams.Count.ShouldBe(1); + } + + // --------------------------------------------------------------- + // B8: JetStreamMetaGroup — ProposeCreateConsumerAsync + // Go: jetstream_cluster.go processConsumerAssignment + // --------------------------------------------------------------- + + [Fact] + public async Task ProposeCreateConsumer_adds_consumer_to_stream_assignment() + { + var meta = new JetStreamMetaGroup(3); + var streamGroup = new RaftGroup { Name = "sg", Peers = ["n1", "n2", "n3"] }; + var consumerGroup = new RaftGroup { Name = "cg", Peers = ["n1", "n2"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "ORDERS" }, streamGroup, default); + await meta.ProposeCreateConsumerAsync("ORDERS", "PROCESSOR", consumerGroup, default); + + var assignment = meta.GetStreamAssignment("ORDERS"); + assignment.ShouldNotBeNull(); + assignment!.Consumers.ContainsKey("PROCESSOR").ShouldBeTrue(); + assignment.Consumers["PROCESSOR"].ConsumerName.ShouldBe("PROCESSOR"); + assignment.Consumers["PROCESSOR"].StreamName.ShouldBe("ORDERS"); + } + + [Fact] + public async Task ProposeCreateConsumer_multiple_consumers_on_same_stream() + { + var meta = new JetStreamMetaGroup(3); + var sg = new RaftGroup { Name = "sg", Peers = ["n1"] }; + var cg = new RaftGroup { Name = "cg", Peers = ["n1"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "MULTI" }, sg, default); + await meta.ProposeCreateConsumerAsync("MULTI", "C1", cg, default); + await meta.ProposeCreateConsumerAsync("MULTI", "C2", cg, default); + await meta.ProposeCreateConsumerAsync("MULTI", "C3", cg, default); + + var assignment = meta.GetStreamAssignment("MULTI"); + assignment!.Consumers.Count.ShouldBe(3); + assignment.Consumers.ContainsKey("C1").ShouldBeTrue(); + assignment.Consumers.ContainsKey("C2").ShouldBeTrue(); + assignment.Consumers.ContainsKey("C3").ShouldBeTrue(); + } + + [Fact] + public async Task ProposeCreateConsumer_on_nonexistent_stream_is_safe() + { + var meta = new JetStreamMetaGroup(3); + var cg = new RaftGroup { Name = "cg", Peers = ["n1"] }; + + // Should not throw — stream not found means consumer is simply not tracked. + await meta.ProposeCreateConsumerAsync("MISSING_STREAM", "C1", cg, default); + meta.GetStreamAssignment("MISSING_STREAM").ShouldBeNull(); + } + + // --------------------------------------------------------------- + // B8: JetStreamMetaGroup — ProposeDeleteConsumerAsync + // Go: jetstream_cluster.go processConsumerDelete + // --------------------------------------------------------------- + + [Fact] + public async Task ProposeDeleteConsumer_removes_consumer_from_stream() + { + var meta = new JetStreamMetaGroup(3); + var sg = new RaftGroup { Name = "sg", Peers = ["n1"] }; + var cg = new RaftGroup { Name = "cg", Peers = ["n1"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "EVENTS" }, sg, default); + await meta.ProposeCreateConsumerAsync("EVENTS", "PUSH", cg, default); + + meta.GetStreamAssignment("EVENTS")!.Consumers.ContainsKey("PUSH").ShouldBeTrue(); + + await meta.ProposeDeleteConsumerAsync("EVENTS", "PUSH", default); + + meta.GetStreamAssignment("EVENTS")!.Consumers.ContainsKey("PUSH").ShouldBeFalse(); + } + + [Fact] + public async Task ProposeDeleteConsumer_only_removes_target_consumer() + { + var meta = new JetStreamMetaGroup(3); + var sg = new RaftGroup { Name = "sg", Peers = ["n1"] }; + var cg = new RaftGroup { Name = "cg", Peers = ["n1"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "S" }, sg, default); + await meta.ProposeCreateConsumerAsync("S", "KEEP", cg, default); + await meta.ProposeCreateConsumerAsync("S", "REMOVE", cg, default); + + await meta.ProposeDeleteConsumerAsync("S", "REMOVE", default); + + var assignment = meta.GetStreamAssignment("S"); + assignment!.Consumers.ContainsKey("KEEP").ShouldBeTrue(); + assignment.Consumers.ContainsKey("REMOVE").ShouldBeFalse(); + } + + [Fact] + public async Task ProposeDeleteConsumer_on_nonexistent_consumer_is_safe() + { + var meta = new JetStreamMetaGroup(3); + var sg = new RaftGroup { Name = "sg", Peers = ["n1"] }; + + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "S" }, sg, default); + + // Should not throw. + await meta.ProposeDeleteConsumerAsync("S", "MISSING_CONSUMER", default); + meta.GetStreamAssignment("S")!.Consumers.ShouldBeEmpty(); + } + + // --------------------------------------------------------------- + // B8: JetStreamMetaGroup — GetStreamAssignment + // --------------------------------------------------------------- + + [Fact] + public void GetStreamAssignment_returns_null_for_missing_stream() + { + var meta = new JetStreamMetaGroup(3); + + meta.GetStreamAssignment("NOT_THERE").ShouldBeNull(); + } + + [Fact] + public async Task GetAllAssignments_returns_all_tracked_streams() + { + var meta = new JetStreamMetaGroup(5); + var group = new RaftGroup { Name = "g", Peers = ["n1", "n2", "n3"] }; + + for (var i = 0; i < 5; i++) + await meta.ProposeCreateStreamAsync(new StreamConfig { Name = $"STREAM{i}" }, group, default); + + meta.GetAllAssignments().Count.ShouldBe(5); + } + + // --------------------------------------------------------------- + // B9: PlacementEngine — basic selection + // Go: jetstream_cluster.go:7212 selectPeerGroup + // --------------------------------------------------------------- + + [Fact] + public void PlacementEngine_selects_requested_number_of_peers() + { + var peers = new List + { + new() { PeerId = "n1" }, + new() { PeerId = "n2" }, + new() { PeerId = "n3" }, + new() { PeerId = "n4" }, + new() { PeerId = "n5" }, + }; + + var group = PlacementEngine.SelectPeerGroup("TEST", replicas: 3, peers); + + group.Peers.Count.ShouldBe(3); + group.Name.ShouldBe("TEST"); + } + + [Fact] + public void PlacementEngine_returns_raft_group_with_correct_name() + { + var peers = new List + { + new() { PeerId = "n1" }, + new() { PeerId = "n2" }, + }; + + var group = PlacementEngine.SelectPeerGroup("MY_GROUP", replicas: 1, peers); + + group.Name.ShouldBe("MY_GROUP"); + } + + // --------------------------------------------------------------- + // B9: PlacementEngine — cluster affinity filtering + // Go: jetstream_cluster.go selectPeerGroup cluster filter + // --------------------------------------------------------------- + + [Fact] + public void PlacementEngine_cluster_affinity_filters_to_matching_cluster() + { + var peers = new List + { + new() { PeerId = "n1", Cluster = "east" }, + new() { PeerId = "n2", Cluster = "east" }, + new() { PeerId = "n3", Cluster = "west" }, + new() { PeerId = "n4", Cluster = "west" }, + }; + + var policy = new PlacementPolicy { Cluster = "east" }; + var group = PlacementEngine.SelectPeerGroup("G", replicas: 2, peers, policy); + + group.Peers.Count.ShouldBe(2); + group.Peers.ShouldContain("n1"); + group.Peers.ShouldContain("n2"); + } + + [Fact] + public void PlacementEngine_cluster_affinity_is_case_insensitive() + { + var peers = new List + { + new() { PeerId = "n1", Cluster = "EAST" }, + new() { PeerId = "n2", Cluster = "west" }, + }; + + var policy = new PlacementPolicy { Cluster = "east" }; + var group = PlacementEngine.SelectPeerGroup("G", replicas: 1, peers, policy); + + group.Peers.ShouldContain("n1"); + } + + // --------------------------------------------------------------- + // B9: PlacementEngine — tag filtering + // Go: jetstream_cluster.go selectPeerGroup tag filter + // --------------------------------------------------------------- + + [Fact] + public void PlacementEngine_tag_filter_selects_peers_with_all_required_tags() + { + var peers = new List + { + new() { PeerId = "n1", Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd", "fast" } }, + new() { PeerId = "n2", Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd" } }, + new() { PeerId = "n3", Tags = new(StringComparer.OrdinalIgnoreCase) { "fast" } }, + new() { PeerId = "n4", Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd", "fast" } }, + }; + + var policy = new PlacementPolicy + { + Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd", "fast" }, + }; + + var group = PlacementEngine.SelectPeerGroup("G", replicas: 2, peers, policy); + + group.Peers.Count.ShouldBe(2); + group.Peers.All(p => p == "n1" || p == "n4").ShouldBeTrue(); + } + + [Fact] + public void PlacementEngine_tag_filter_is_case_insensitive() + { + var peers = new List + { + new() { PeerId = "n1", Tags = new(StringComparer.OrdinalIgnoreCase) { "SSD" } }, + new() { PeerId = "n2", Tags = new(StringComparer.OrdinalIgnoreCase) { "hdd" } }, + }; + + var policy = new PlacementPolicy + { + Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd" }, + }; + + var group = PlacementEngine.SelectPeerGroup("G", replicas: 1, peers, policy); + + group.Peers.ShouldContain("n1"); + } + + // --------------------------------------------------------------- + // B9: PlacementEngine — exclude tag filtering + // Go: jetstream_cluster.go selectPeerGroup exclude-tag logic + // --------------------------------------------------------------- + + [Fact] + public void PlacementEngine_exclude_tag_filters_out_peers_with_those_tags() + { + var peers = new List + { + new() { PeerId = "n1", Tags = new(StringComparer.OrdinalIgnoreCase) { "nvme" } }, + new() { PeerId = "n2", Tags = new(StringComparer.OrdinalIgnoreCase) { "spinning" } }, + new() { PeerId = "n3", Tags = new(StringComparer.OrdinalIgnoreCase) { "nvme" } }, + new() { PeerId = "n4" }, + }; + + var policy = new PlacementPolicy + { + ExcludeTags = new(StringComparer.OrdinalIgnoreCase) { "spinning" }, + }; + + var group = PlacementEngine.SelectPeerGroup("G", replicas: 3, peers, policy); + + group.Peers.ShouldNotContain("n2"); + group.Peers.Count.ShouldBe(3); + } + + [Fact] + public void PlacementEngine_exclude_tag_is_case_insensitive() + { + var peers = new List + { + new() { PeerId = "n1", Tags = new(StringComparer.OrdinalIgnoreCase) { "SLOW" } }, + new() { PeerId = "n2" }, + }; + + var policy = new PlacementPolicy + { + ExcludeTags = new(StringComparer.OrdinalIgnoreCase) { "slow" }, + }; + + var group = PlacementEngine.SelectPeerGroup("G", replicas: 1, peers, policy); + + group.Peers.ShouldNotContain("n1"); + group.Peers.ShouldContain("n2"); + } + + // --------------------------------------------------------------- + // B9: PlacementEngine — throws when not enough peers + // Go: jetstream_cluster.go selectPeerGroup insufficient peer error + // --------------------------------------------------------------- + + [Fact] + public void PlacementEngine_throws_when_not_enough_peers() + { + var peers = new List + { + new() { PeerId = "n1" }, + }; + + var act = () => PlacementEngine.SelectPeerGroup("G", replicas: 3, peers); + + act.ShouldThrow(); + } + + [Fact] + public void PlacementEngine_throws_when_filter_leaves_insufficient_peers() + { + var peers = new List + { + new() { PeerId = "n1", Cluster = "east" }, + new() { PeerId = "n2", Cluster = "east" }, + new() { PeerId = "n3", Cluster = "west" }, + }; + + var policy = new PlacementPolicy { Cluster = "east" }; + var act = () => PlacementEngine.SelectPeerGroup("G", replicas: 3, peers, policy); + + act.ShouldThrow(); + } + + [Fact] + public void PlacementEngine_throws_when_unavailable_peers_reduce_below_requested() + { + var peers = new List + { + new() { PeerId = "n1", Available = true }, + new() { PeerId = "n2", Available = false }, + new() { PeerId = "n3", Available = false }, + }; + + var act = () => PlacementEngine.SelectPeerGroup("G", replicas: 2, peers); + + act.ShouldThrow(); + } + + // --------------------------------------------------------------- + // B9: PlacementEngine — sorts by available storage descending + // Go: jetstream_cluster.go selectPeerGroup storage sort + // --------------------------------------------------------------- + + [Fact] + public void PlacementEngine_sorts_by_available_storage_descending() + { + var peers = new List + { + new() { PeerId = "small", AvailableStorage = 100 }, + new() { PeerId = "large", AvailableStorage = 10_000 }, + new() { PeerId = "medium", AvailableStorage = 500 }, + }; + + var group = PlacementEngine.SelectPeerGroup("G", replicas: 2, peers); + + // Should pick the two with most storage: large and medium. + group.Peers.ShouldContain("large"); + group.Peers.ShouldContain("medium"); + group.Peers.ShouldNotContain("small"); + } + + [Fact] + public void PlacementEngine_unavailable_peers_are_excluded() + { + var peers = new List + { + new() { PeerId = "online1", Available = true }, + new() { PeerId = "offline1", Available = false }, + new() { PeerId = "online2", Available = true }, + }; + + var group = PlacementEngine.SelectPeerGroup("G", replicas: 2, peers); + + group.Peers.ShouldContain("online1"); + group.Peers.ShouldContain("online2"); + group.Peers.ShouldNotContain("offline1"); + } + + [Fact] + public void PlacementEngine_no_policy_selects_all_available_up_to_replicas() + { + var peers = new List + { + new() { PeerId = "n1" }, + new() { PeerId = "n2" }, + new() { PeerId = "n3" }, + }; + + var group = PlacementEngine.SelectPeerGroup("G", replicas: 3, peers); + + group.Peers.Count.ShouldBe(3); + } +}