From a41d0f453cf25142acff2d436a451ba5cf8a6405 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 17:09:32 -0500 Subject: [PATCH] feat(cluster): add placement engine with topology-aware peer selection (B9-prep) --- .../JetStream/Cluster/PlacementEngine.cs | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 src/NATS.Server/JetStream/Cluster/PlacementEngine.cs diff --git a/src/NATS.Server/JetStream/Cluster/PlacementEngine.cs b/src/NATS.Server/JetStream/Cluster/PlacementEngine.cs new file mode 100644 index 0000000..c752dc1 --- /dev/null +++ b/src/NATS.Server/JetStream/Cluster/PlacementEngine.cs @@ -0,0 +1,80 @@ +namespace NATS.Server.JetStream.Cluster; + +/// +/// Topology-aware peer selection for stream/consumer replica placement. +/// Go reference: jetstream_cluster.go:7212 selectPeerGroup. +/// +public static class PlacementEngine +{ + /// + /// Selects peers for a new replica group based on available nodes, tags, and cluster affinity. + /// Filters unavailable peers, applies cluster/tag/exclude-tag policy, then picks the top N + /// peers ordered by available storage descending. + /// + public static RaftGroup SelectPeerGroup( + string groupName, + int replicas, + IReadOnlyList availablePeers, + PlacementPolicy? policy = null) + { + // 1. Filter out unavailable peers. + IEnumerable candidates = availablePeers.Where(p => p.Available); + + // 2. If policy has Cluster, filter to matching cluster. + if (policy?.Cluster is { Length: > 0 } cluster) + candidates = candidates.Where(p => string.Equals(p.Cluster, cluster, StringComparison.OrdinalIgnoreCase)); + + // 3. If policy has Tags, filter to peers that have ALL required tags. + if (policy?.Tags is { Count: > 0 } requiredTags) + candidates = candidates.Where(p => requiredTags.All(tag => p.Tags.Contains(tag))); + + // 4. If policy has ExcludeTags, filter out peers with any of those tags. + if (policy?.ExcludeTags is { Count: > 0 } excludeTags) + candidates = candidates.Where(p => !excludeTags.Any(tag => p.Tags.Contains(tag))); + + // 5. If not enough peers after filtering, throw InvalidOperationException. + var filtered = candidates.ToList(); + if (filtered.Count < replicas) + throw new InvalidOperationException( + $"Not enough peers available to satisfy replica count {replicas}. " + + $"Available after policy filtering: {filtered.Count}."); + + // 6. Sort remaining by available storage descending. + var selected = filtered + .OrderByDescending(p => p.AvailableStorage) + .Take(replicas) + .Select(p => p.PeerId) + .ToList(); + + // 7. Return RaftGroup with selected peer IDs. + return new RaftGroup + { + Name = groupName, + Peers = selected, + }; + } +} + +/// +/// Describes a peer node available for placement consideration. +/// Go reference: jetstream_cluster.go peerInfo — peer.id, peer.offline, peer.storage. +/// +public sealed class PeerInfo +{ + public required string PeerId { get; init; } + public string Cluster { get; set; } = string.Empty; + public HashSet Tags { get; init; } = new(StringComparer.OrdinalIgnoreCase); + public bool Available { get; set; } = true; + public long AvailableStorage { get; set; } = long.MaxValue; +} + +/// +/// Placement policy specifying cluster affinity and tag constraints. +/// Go reference: jetstream_cluster.go Placement struct — cluster, tags. +/// +public sealed class PlacementPolicy +{ + public string? Cluster { get; set; } + public HashSet? Tags { get; set; } + public HashSet? ExcludeTags { get; set; } +}