diff --git a/src/NATS.Server/JetStream/Cluster/PlacementEngine.cs b/src/NATS.Server/JetStream/Cluster/PlacementEngine.cs
index c752dc1..f937560 100644
--- a/src/NATS.Server/JetStream/Cluster/PlacementEngine.cs
+++ b/src/NATS.Server/JetStream/Cluster/PlacementEngine.cs
@@ -7,15 +7,36 @@ namespace NATS.Server.JetStream.Cluster;
public static class PlacementEngine
{
///
- /// Selects peers for a new replica group based on available nodes, tags, and cluster affinity.
- /// Filters unavailable peers, applies cluster/tag/exclude-tag policy, then picks the top N
- /// peers ordered by available storage descending.
+ /// Default cost in bytes subtracted from AvailableStorage per assigned asset
+ /// when computing weighted placement scores.
+ /// Go reference: jetstream_cluster.go:7469 sort by avail then ns (number of streams).
+ ///
+ public const long DefaultAssetCostWeight = 1_073_741_824L; // 1 GiB
+
+ ///
+ /// Selects peers for a new replica group based on available nodes, tags, cluster
+ /// affinity, HA asset limits, and weighted scoring.
+ ///
+ /// Selection pipeline:
+ /// 1. Filter out unavailable peers.
+ /// 2. Apply cluster affinity filter.
+ /// 3. Apply required tag filter.
+ /// 4. Apply exclude-tag filter.
+ /// 5. Separate overloaded peers (CurrentAssets >= MaxAssetsPerPeer when MaxAssetsPerPeer > 0)
+ /// from preferred candidates.
+ /// 6. Within each candidate group, sort by weighted score descending:
+ /// score = AvailableStorage - (CurrentAssets * assetCostWeight)
+ /// 7. Apply UniqueTag constraint: greedily select from scored list, skipping any
+ /// peer whose tag-value for the UniqueTag prefix is already represented.
+ /// Overloaded peers are tried only after preferred candidates are exhausted.
+ /// 8. Throw InvalidOperationException if fewer than replicas peers can be selected.
///
public static RaftGroup SelectPeerGroup(
string groupName,
int replicas,
IReadOnlyList availablePeers,
- PlacementPolicy? policy = null)
+ PlacementPolicy? policy = null,
+ long assetCostWeight = DefaultAssetCostWeight)
{
// 1. Filter out unavailable peers.
IEnumerable candidates = availablePeers.Where(p => p.Available);
@@ -32,32 +53,151 @@ public static class PlacementEngine
if (policy?.ExcludeTags is { Count: > 0 } excludeTags)
candidates = candidates.Where(p => !excludeTags.Any(tag => p.Tags.Contains(tag)));
- // 5. If not enough peers after filtering, throw InvalidOperationException.
var filtered = candidates.ToList();
- if (filtered.Count < replicas)
- throw new InvalidOperationException(
- $"Not enough peers available to satisfy replica count {replicas}. " +
- $"Available after policy filtering: {filtered.Count}.");
- // 6. Sort remaining by available storage descending.
- var selected = filtered
- .OrderByDescending(p => p.AvailableStorage)
- .Take(replicas)
- .Select(p => p.PeerId)
- .ToList();
+ // 5. Separate preferred candidates from overloaded (HA-limited) ones.
+ // Overloaded peers are deprioritized but NOT excluded entirely — they serve
+ // as a fallback when no preferred options remain.
+ // Go reference: jetstream_cluster.go:7428 maxHaAssets check (deprioritize).
+ var (preferred, overloaded) = SplitByHaLimit(filtered);
- // 7. Return RaftGroup with selected peer IDs.
+ // 6. Sort each group by weighted score descending.
+ // score = AvailableStorage - (CurrentAssets * assetCostWeight)
+ // Go reference: jetstream_cluster.go:7469 sort by avail then ns.
+ var sortedPreferred = SortByScore(preferred, assetCostWeight);
+ var sortedOverloaded = SortByScore(overloaded, assetCostWeight);
+
+ // 7. Apply UniqueTag constraint (if set) via greedy selection over the
+ // combined ordered list: preferred first, overloaded as fallback.
+ // Go reference: jetstream_cluster.go:7251 uniqueTagPrefix / checkUniqueTag.
+ string? uniqueTagPrefix = policy?.UniqueTag is { Length: > 0 } ut ? ut : null;
+
+ List selected;
+ if (uniqueTagPrefix is not null)
+ {
+ selected = SelectWithUniqueTag(
+ sortedPreferred, sortedOverloaded, replicas, uniqueTagPrefix);
+ }
+ else
+ {
+ // No unique-tag constraint — just take the top N from the combined list.
+ var all = sortedPreferred.Concat(sortedOverloaded).ToList();
+ if (all.Count < replicas)
+ ThrowInsufficientPeers(replicas, all.Count);
+
+ selected = all.Take(replicas).Select(p => p.PeerId).ToList();
+ }
+
+ // 8. Return RaftGroup with selected peer IDs.
return new RaftGroup
{
Name = groupName,
Peers = selected,
};
}
+
+ // ---------------------------------------------------------------
+ // Private helpers
+ // ---------------------------------------------------------------
+
+ ///
+ /// Splits candidates into (preferred, overloaded) where overloaded peers have
+ /// CurrentAssets >= MaxAssetsPerPeer (when MaxAssetsPerPeer > 0).
+ ///
+ private static (List preferred, List overloaded) SplitByHaLimit(
+ List peers)
+ {
+ var preferred = new List(peers.Count);
+ var overloaded = new List();
+
+ foreach (var p in peers)
+ {
+ bool isOverloaded = p.MaxAssetsPerPeer > 0 && p.CurrentAssets >= p.MaxAssetsPerPeer;
+ if (isOverloaded)
+ overloaded.Add(p);
+ else
+ preferred.Add(p);
+ }
+
+ return (preferred, overloaded);
+ }
+
+ ///
+ /// Sorts peers by weighted score descending.
+ /// score = AvailableStorage - (CurrentAssets * assetCostWeight)
+ ///
+ private static List SortByScore(List peers, long assetCostWeight) =>
+ [.. peers.OrderByDescending(p => p.AvailableStorage - (p.CurrentAssets * assetCostWeight))];
+
+ ///
+ /// Greedy unique-tag selection.
+ /// Iterates over preferred peers first, then overloaded peers as fallback.
+ /// For each candidate, the peer is accepted only if no previously selected peer
+ /// shares the same value for the tag that starts with .
+ /// A peer without any matching tag is rejected (same as Go behaviour: the prefix
+ /// must be present).
+ /// Go reference: jetstream_cluster.go:7263 checkUniqueTag.
+ ///
+ private static List SelectWithUniqueTag(
+ IEnumerable preferred,
+ IEnumerable overloaded,
+ int replicas,
+ string uniqueTagPrefix)
+ {
+ var seenTagValues = new HashSet(StringComparer.OrdinalIgnoreCase);
+ var selected = new List(replicas);
+
+ TrySelect(preferred);
+ if (selected.Count < replicas)
+ TrySelect(overloaded);
+
+ if (selected.Count < replicas)
+ ThrowInsufficientPeers(replicas, selected.Count);
+
+ return selected;
+
+ void TrySelect(IEnumerable source)
+ {
+ foreach (var peer in source)
+ {
+ if (selected.Count >= replicas)
+ break;
+
+ // Find the first tag on this peer that starts with the unique prefix.
+ // Go reference: jetstream_cluster.go:7264 strings.HasPrefix(t, uniqueTagPrefix)
+ string? tagValue = null;
+ foreach (var tag in peer.Tags)
+ {
+ if (tag.StartsWith(uniqueTagPrefix, StringComparison.OrdinalIgnoreCase))
+ {
+ tagValue = tag;
+ break;
+ }
+ }
+
+ // Peer has no matching tag → reject (same as Go: "unique prefix not present").
+ if (tagValue is null)
+ continue;
+
+ // Tag value already claimed by a previously selected peer → reject.
+ if (!seenTagValues.Add(tagValue))
+ continue;
+
+ selected.Add(peer.PeerId);
+ }
+ }
+ }
+
+ private static void ThrowInsufficientPeers(int required, int available) =>
+ throw new InvalidOperationException(
+ $"Not enough peers available to satisfy replica count {required}. " +
+ $"Available after policy filtering: {available}.");
}
///
/// Describes a peer node available for placement consideration.
-/// Go reference: jetstream_cluster.go peerInfo — peer.id, peer.offline, peer.storage.
+/// Go reference: jetstream_cluster.go peerInfo — peer.id, peer.offline, peer.storage,
+/// peer HAAssets, peer streams count.
///
public sealed class PeerInfo
{
@@ -66,15 +206,38 @@ public sealed class PeerInfo
public HashSet Tags { get; init; } = new(StringComparer.OrdinalIgnoreCase);
public bool Available { get; set; } = true;
public long AvailableStorage { get; set; } = long.MaxValue;
+
+ ///
+ /// Number of assets (streams/consumers) currently assigned to this peer.
+ /// Go reference: jetstream_cluster.go:7311 peerStreams / peerHA maps.
+ ///
+ public int CurrentAssets { get; set; }
+
+ ///
+ /// Maximum HA assets this peer should host before being deprioritized.
+ /// 0 means unlimited (no deprioritization).
+ /// Go reference: jetstream_cluster.go:7328 maxHaAssets.
+ ///
+ public int MaxAssetsPerPeer { get; set; }
}
///
-/// Placement policy specifying cluster affinity and tag constraints.
-/// Go reference: jetstream_cluster.go Placement struct — cluster, tags.
+/// Placement policy specifying cluster affinity, tag constraints, and unique-tag enforcement.
+/// Go reference: jetstream_cluster.go Placement struct — cluster, tags;
+/// server opts JetStreamUniqueTag.
///
public sealed class PlacementPolicy
{
public string? Cluster { get; set; }
public HashSet? Tags { get; set; }
public HashSet? ExcludeTags { get; set; }
+
+ ///
+ /// Tag prefix used to enforce AZ/rack uniqueness across replicas.
+ /// When set, no two replicas may share the same value for a tag that starts
+ /// with this prefix (e.g., "az" matches "az:us-east-1a").
+ /// Null or empty string disables the constraint.
+ /// Go reference: jetstream_cluster.go:7251 JetStreamUniqueTag / uniqueTagPrefix.
+ ///
+ public string? UniqueTag { get; set; }
}
diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/TopologyPlacementTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/TopologyPlacementTests.cs
new file mode 100644
index 0000000..74100b5
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Cluster/TopologyPlacementTests.cs
@@ -0,0 +1,246 @@
+// Go parity: golang/nats-server/server/jetstream_cluster.go:7212 selectPeerGroup
+// Covers: UniqueTag enforcement, HA asset limits, weighted scoring by available resources.
+using NATS.Server.JetStream.Cluster;
+
+namespace NATS.Server.Tests.JetStream.Cluster;
+
+///
+/// Tests for topology-aware placement: JetStreamUniqueTag enforcement,
+/// MaxAssetsPerPeer HA limits, and weighted scoring.
+/// Go reference: jetstream_cluster.go:7212 selectPeerGroup (uniqueTagPrefix, maxHaAssets, weighted sort).
+///
+public class TopologyPlacementTests
+{
+ // ---------------------------------------------------------------
+ // UniqueTag enforcement
+ // Go reference: jetstream_cluster.go:7251 uniqueTagPrefix / checkUniqueTag
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void UniqueTag_prevents_same_tag_value_replicas()
+ {
+ // 3 peers: p1 and p2 in az:us-east-1a, p3 in az:us-east-1b.
+ // R=2 with UniqueTag="az" must pick one from each AZ.
+ var peers = new List
+ {
+ new() { PeerId = "p1", Tags = ["az:us-east-1a"], AvailableStorage = 1000 },
+ new() { PeerId = "p2", Tags = ["az:us-east-1a"], AvailableStorage = 2000 },
+ new() { PeerId = "p3", Tags = ["az:us-east-1b"], AvailableStorage = 900 },
+ };
+ var policy = new PlacementPolicy { UniqueTag = "az" };
+
+ var group = PlacementEngine.SelectPeerGroup("az-group", 2, peers, policy);
+
+ group.Peers.Count.ShouldBe(2);
+ // One peer must be from az:us-east-1a and one from az:us-east-1b.
+ var selectedPeers = peers.Where(p => group.Peers.Contains(p.PeerId)).ToList();
+ var azValues = selectedPeers
+ .SelectMany(p => p.Tags)
+ .Where(t => t.StartsWith("az:", StringComparison.OrdinalIgnoreCase))
+ .ToList();
+ azValues.Distinct(StringComparer.OrdinalIgnoreCase).Count().ShouldBe(2);
+ }
+
+ [Fact]
+ public void UniqueTag_throws_when_not_enough_unique_values()
+ {
+ // All 3 peers share the same AZ tag; R=2 requires 2 unique AZ values → impossible.
+ var peers = new List
+ {
+ new() { PeerId = "p1", Tags = ["az:us-east-1a"] },
+ new() { PeerId = "p2", Tags = ["az:us-east-1a"] },
+ new() { PeerId = "p3", Tags = ["az:us-east-1a"] },
+ };
+ var policy = new PlacementPolicy { UniqueTag = "az" };
+
+ Should.Throw(
+ () => PlacementEngine.SelectPeerGroup("fail", 2, peers, policy));
+ }
+
+ [Fact]
+ public void Tag_prefix_matching_for_unique_constraint()
+ {
+ // UniqueTag="az" should match tags like "az:us-east-1a", "az:us-west-2b", etc.
+ // Go reference: jetstream_cluster.go:7265 strings.HasPrefix(t, uniqueTagPrefix)
+ var peers = new List
+ {
+ new() { PeerId = "p1", Tags = ["az:us-east-1a", "ssd"] },
+ new() { PeerId = "p2", Tags = ["az:us-west-2b", "ssd"] },
+ new() { PeerId = "p3", Tags = ["az:eu-central-1a", "ssd"] },
+ };
+ var policy = new PlacementPolicy { UniqueTag = "az" };
+
+ var group = PlacementEngine.SelectPeerGroup("prefix", 3, peers, policy);
+
+ group.Peers.Count.ShouldBe(3);
+ group.Peers.ShouldContain("p1");
+ group.Peers.ShouldContain("p2");
+ group.Peers.ShouldContain("p3");
+ }
+
+ [Fact]
+ public void Empty_unique_tag_ignored()
+ {
+ // UniqueTag="" or null → no unique constraint applied, normal selection.
+ // Go reference: jetstream_cluster.go:7252 if uniqueTagPrefix != _EMPTY_
+ var peers = new List
+ {
+ new() { PeerId = "p1", Tags = ["az:us-east-1a"] },
+ new() { PeerId = "p2", Tags = ["az:us-east-1a"] },
+ new() { PeerId = "p3", Tags = ["az:us-east-1a"] },
+ };
+
+ // No UniqueTag policy — all 3 peers are valid, R=3 should succeed.
+ var groupNull = PlacementEngine.SelectPeerGroup("no-unique-null", 3, peers, policy: null);
+ groupNull.Peers.Count.ShouldBe(3);
+
+ // Empty string UniqueTag → treated as disabled.
+ var policy = new PlacementPolicy { UniqueTag = "" };
+ var groupEmpty = PlacementEngine.SelectPeerGroup("no-unique-empty", 3, peers, policy);
+ groupEmpty.Peers.Count.ShouldBe(3);
+ }
+
+ [Fact]
+ public void UniqueTag_combined_with_cluster_filter()
+ {
+ // Both cluster filter and UniqueTag must be applied together.
+ // Go reference: jetstream_cluster.go:7346 cluster check before uniqueTag check
+ var peers = new List
+ {
+ new() { PeerId = "p1", Cluster = "us-east", Tags = ["az:us-east-1a"] },
+ new() { PeerId = "p2", Cluster = "us-east", Tags = ["az:us-east-1a"] },
+ new() { PeerId = "p3", Cluster = "us-east", Tags = ["az:us-east-1b"] },
+ new() { PeerId = "p4", Cluster = "us-west", Tags = ["az:us-west-2a"] },
+ };
+ var policy = new PlacementPolicy { Cluster = "us-east", UniqueTag = "az" };
+
+ // Only p1/p2/p3 are in us-east; UniqueTag="az" → picks one from 1a and one from 1b.
+ var group = PlacementEngine.SelectPeerGroup("combo", 2, peers, policy);
+
+ group.Peers.Count.ShouldBe(2);
+ group.Peers.ShouldNotContain("p4");
+ var selectedPeers = peers.Where(p => group.Peers.Contains(p.PeerId)).ToList();
+ var azValues = selectedPeers
+ .SelectMany(p => p.Tags)
+ .Where(t => t.StartsWith("az:", StringComparison.OrdinalIgnoreCase))
+ .Distinct(StringComparer.OrdinalIgnoreCase)
+ .ToList();
+ azValues.Count.ShouldBe(2);
+ }
+
+ // ---------------------------------------------------------------
+ // MaxAssetsPerPeer HA limit deprioritization
+ // Go reference: jetstream_cluster.go:7428 maxHaAssets check (deprioritize vs hard exclude)
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void MaxAssetsPerPeer_deprioritizes_overloaded_peers()
+ {
+ // p1 is at its asset limit but p2 and p3 are not.
+ // With enough non-overloaded candidates, overloaded peer should not be selected.
+ var peers = new List
+ {
+ new() { PeerId = "p1", AvailableStorage = 10_000, CurrentAssets = 5, MaxAssetsPerPeer = 5 },
+ new() { PeerId = "p2", AvailableStorage = 8_000, CurrentAssets = 1, MaxAssetsPerPeer = 5 },
+ new() { PeerId = "p3", AvailableStorage = 6_000, CurrentAssets = 0, MaxAssetsPerPeer = 5 },
+ };
+
+ var group = PlacementEngine.SelectPeerGroup("ha-limit", 2, peers);
+
+ // p1 is deprioritized (at max), so p2 and p3 should be selected over p1.
+ group.Peers.Count.ShouldBe(2);
+ group.Peers.ShouldContain("p2");
+ group.Peers.ShouldContain("p3");
+ group.Peers.ShouldNotContain("p1");
+ }
+
+ [Fact]
+ public void MaxAssetsPerPeer_still_used_when_no_alternatives()
+ {
+ // All peers are at their HA asset limit, but we must still select from them.
+ // Go reference: jetstream_cluster.go — deprioritize (move to end), not hard exclude.
+ var peers = new List
+ {
+ new() { PeerId = "p1", AvailableStorage = 1000, CurrentAssets = 3, MaxAssetsPerPeer = 3 },
+ new() { PeerId = "p2", AvailableStorage = 900, CurrentAssets = 3, MaxAssetsPerPeer = 3 },
+ };
+
+ // Should succeed even though both peers are at max.
+ var group = PlacementEngine.SelectPeerGroup("ha-fallback", 2, peers);
+
+ group.Peers.Count.ShouldBe(2);
+ group.Peers.ShouldContain("p1");
+ group.Peers.ShouldContain("p2");
+ }
+
+ [Fact]
+ public void Zero_MaxAssets_means_unlimited()
+ {
+ // MaxAssetsPerPeer=0 → no asset limit, peer treated as not overloaded regardless of CurrentAssets.
+ var peers = new List
+ {
+ new() { PeerId = "p1", AvailableStorage = 5000, CurrentAssets = 100, MaxAssetsPerPeer = 0 },
+ new() { PeerId = "p2", AvailableStorage = 4000, CurrentAssets = 200, MaxAssetsPerPeer = 0 },
+ };
+
+ var group = PlacementEngine.SelectPeerGroup("unlimited", 2, peers);
+
+ group.Peers.Count.ShouldBe(2);
+ group.Peers.ShouldContain("p1");
+ group.Peers.ShouldContain("p2");
+ }
+
+ // ---------------------------------------------------------------
+ // Weighted score = AvailableStorage - (CurrentAssets * AssetCostWeight)
+ // Go reference: jetstream_cluster.go:7469 sort by avail then ns (stream count)
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public void Weighted_score_prefers_less_loaded_peers()
+ {
+ // p1: more storage but many assets → lower score
+ // p2: less storage but few assets → higher score
+ // With DefaultAssetCostWeight = 1GB, even a small difference in assets
+ // can overcome a moderate storage advantage.
+ const long gb = PlacementEngine.DefaultAssetCostWeight; // 1_073_741_824L
+ var peers = new List
+ {
+ // p1: score = 10*GB - 5*GB = 5*GB
+ new() { PeerId = "p1", AvailableStorage = 10 * gb, CurrentAssets = 5 },
+ // p2: score = 9*GB - 1*GB = 8*GB (wins despite less raw storage)
+ new() { PeerId = "p2", AvailableStorage = 9 * gb, CurrentAssets = 1 },
+ // p3: score = 3*GB - 0 = 3*GB
+ new() { PeerId = "p3", AvailableStorage = 3 * gb, CurrentAssets = 0 },
+ };
+
+ var group = PlacementEngine.SelectPeerGroup("weighted", 2, peers);
+
+ // p2 has the highest score (8*GB), p1 has second (5*GB).
+ group.Peers.Count.ShouldBe(2);
+ group.Peers[0].ShouldBe("p2");
+ group.Peers[1].ShouldBe("p1");
+ }
+
+ [Fact]
+ public void Weighted_score_with_custom_cost_weight()
+ {
+ // Verify score formula: score = AvailableStorage - (CurrentAssets * AssetCostWeight)
+ // Use a fixed, small cost weight to make the math obvious.
+ const long costWeight = 1000L;
+ var peers = new List
+ {
+ // score = 5000 - (3 * 1000) = 2000
+ new() { PeerId = "p1", AvailableStorage = 5000, CurrentAssets = 3 },
+ // score = 4000 - (0 * 1000) = 4000 (wins)
+ new() { PeerId = "p2", AvailableStorage = 4000, CurrentAssets = 0 },
+ // score = 6000 - (5 * 1000) = 1000 (loses)
+ new() { PeerId = "p3", AvailableStorage = 6000, CurrentAssets = 5 },
+ };
+
+ var group = PlacementEngine.SelectPeerGroup("custom-weight", 2, peers, assetCostWeight: costWeight);
+
+ group.Peers.Count.ShouldBe(2);
+ group.Peers[0].ShouldBe("p2"); // score 4000
+ group.Peers[1].ShouldBe("p1"); // score 2000
+ }
+}