- JetStreamMetaGroup: validated proposals, inflight tracking, consumer counting, ApplyEntry dispatch - StreamReplicaGroup: ProposeMessageAsync, LeaderChanged event, message/sequence tracking, GetStatus - PlacementEngine tests: cluster affinity, tag filtering, storage ordering (16 tests) - Assignment serialization tests: quorum calc, has-quorum, property defaults (16 tests) - MetaGroup proposal tests: stream/consumer CRUD, leader validation, inflight (30 tests) - StreamRaftGroup tests: message proposals, step-down events, status (10 tests) - RAFT Go parity tests + JetStream cluster Go parity tests (partial B11 pre-work)
310 lines
11 KiB
C#
310 lines
11 KiB
C#
// Go parity: golang/nats-server/server/jetstream_cluster.go:7212 selectPeerGroup
|
|
// Covers: PlacementEngine peer selection with cluster affinity, tag filtering,
|
|
// exclude-tag filtering, unavailable peer exclusion, storage-based ordering,
|
|
// single replica selection, and combined policy filtering.
|
|
using NATS.Server.JetStream.Cluster;
|
|
|
|
namespace NATS.Server.Tests.JetStream.Cluster;
|
|
|
|
/// <summary>
|
|
/// Tests for PlacementEngine topology-aware peer selection.
|
|
/// Go reference: jetstream_cluster.go:7212 selectPeerGroup.
|
|
/// </summary>
|
|
public class PlacementEngineTests
{
    // ---------------------------------------------------------------
    // Basic selection with enough peers
    // Go reference: jetstream_cluster.go selectPeerGroup base case
    // ---------------------------------------------------------------

    /// <summary>
    /// Requesting 3 replicas from 5 generated peers yields a group that carries
    /// the requested name and exactly 3 peers.
    /// </summary>
    [Fact]
    public void Basic_selection_with_enough_peers()
    {
        var peers = CreatePeers(5);

        var group = PlacementEngine.SelectPeerGroup("test-group", 3, peers);

        group.Name.ShouldBe("test-group");
        group.Peers.Count.ShouldBe(3);
    }

    /// <summary>
    /// Requesting 5 replicas from 10 generated peers yields exactly 5 peers,
    /// i.e. the surplus candidates are not included.
    /// </summary>
    [Fact]
    public void Selection_returns_exact_replica_count()
    {
        var peers = CreatePeers(10);

        var group = PlacementEngine.SelectPeerGroup("exact", 5, peers);

        group.Peers.Count.ShouldBe(5);
    }

    // ---------------------------------------------------------------
    // Insufficient peers throws
    // Go reference: jetstream_cluster.go not enough peers error
    // ---------------------------------------------------------------

    /// <summary>
    /// Requesting 5 replicas when only 2 peers exist is expected to throw
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    [Fact]
    public void Insufficient_peers_throws()
    {
        var peers = CreatePeers(2);

        Should.Throw<InvalidOperationException>(
            () => PlacementEngine.SelectPeerGroup("fail", 5, peers));
    }

    /// <summary>
    /// Requesting a single replica from an empty peer list is expected to throw
    /// <see cref="InvalidOperationException"/>.
    /// </summary>
    [Fact]
    public void Zero_peers_with_replicas_throws()
    {
        // The thrown exception itself is not inspected, so it is not captured.
        Should.Throw<InvalidOperationException>(
            () => PlacementEngine.SelectPeerGroup("empty", 1, []));
    }

    // ---------------------------------------------------------------
    // Cluster affinity filtering
    // Go reference: jetstream_cluster.go cluster affinity in placement
    // ---------------------------------------------------------------

    /// <summary>
    /// With a policy pinned to "us-east", only the three peers declared in that
    /// cluster (p1, p3, p4) may appear in the selected group.
    /// </summary>
    [Fact]
    public void Cluster_affinity_selects_only_matching_cluster()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "p1", Cluster = "us-east" },
            new() { PeerId = "p2", Cluster = "us-west" },
            new() { PeerId = "p3", Cluster = "us-east" },
            new() { PeerId = "p4", Cluster = "us-east" },
            new() { PeerId = "p5", Cluster = "eu-west" },
        };
        var policy = new PlacementPolicy { Cluster = "us-east" };

        var group = PlacementEngine.SelectPeerGroup("cluster", 3, peers, policy);

        group.Peers.Count.ShouldBe(3);
        // Exact-id checks (rather than prefix matching) so that neither a
        // look-alike id such as "p10" nor a duplicated peer can satisfy the test.
        group.Peers.ShouldContain("p1");
        group.Peers.ShouldContain("p3");
        group.Peers.ShouldContain("p4");
    }

    /// <summary>
    /// A policy cluster of "us-east" must match a peer whose cluster is spelled
    /// "US-East"; both peers are therefore expected in the group.
    /// </summary>
    [Fact]
    public void Cluster_affinity_is_case_insensitive()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "p1", Cluster = "US-East" },
            new() { PeerId = "p2", Cluster = "us-east" },
        };
        var policy = new PlacementPolicy { Cluster = "us-east" };

        var group = PlacementEngine.SelectPeerGroup("ci", 2, peers, policy);

        group.Peers.Count.ShouldBe(2);
        // p1 is the differently-cased peer; pin it explicitly.
        group.Peers.ShouldContain("p1");
        group.Peers.ShouldContain("p2");
    }

    /// <summary>
    /// Only one peer is in "us-east", so asking for 2 replicas under that
    /// cluster policy is expected to throw <see cref="InvalidOperationException"/>.
    /// </summary>
    [Fact]
    public void Cluster_affinity_with_insufficient_matching_throws()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "p1", Cluster = "us-east" },
            new() { PeerId = "p2", Cluster = "us-west" },
        };
        var policy = new PlacementPolicy { Cluster = "us-east" };

        Should.Throw<InvalidOperationException>(
            () => PlacementEngine.SelectPeerGroup("fail", 2, peers, policy));
    }

    // ---------------------------------------------------------------
    // Tag filtering (include and exclude)
    // Go reference: jetstream_cluster.go tag-based filtering
    // ---------------------------------------------------------------

    /// <summary>
    /// A policy requiring both "ssd" and "fast" must select only the peers that
    /// carry every required tag (p1 and p3).
    /// </summary>
    [Fact]
    public void Tag_filtering_selects_peers_with_all_required_tags()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "p1", Tags = ["ssd", "fast"] },
            new() { PeerId = "p2", Tags = ["ssd"] },
            new() { PeerId = "p3", Tags = ["ssd", "fast", "gpu"] },
            new() { PeerId = "p4", Tags = ["hdd"] },
        };
        var policy = new PlacementPolicy { Tags = ["ssd", "fast"] };

        var group = PlacementEngine.SelectPeerGroup("tags", 2, peers, policy);

        group.Peers.Count.ShouldBe(2);
        group.Peers.ShouldContain("p1");
        group.Peers.ShouldContain("p3");
    }

    /// <summary>
    /// A policy excluding "deprecated" must drop p2 and leave p1 and p3 as the
    /// selected peers.
    /// </summary>
    [Fact]
    public void Exclude_tag_filtering_removes_peers_with_excluded_tags()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "p1", Tags = ["ssd"] },
            new() { PeerId = "p2", Tags = ["ssd", "deprecated"] },
            new() { PeerId = "p3", Tags = ["ssd"] },
        };
        var policy = new PlacementPolicy { ExcludeTags = ["deprecated"] };

        var group = PlacementEngine.SelectPeerGroup("excl", 2, peers, policy);

        group.Peers.Count.ShouldBe(2);
        group.Peers.ShouldNotContain("p2");
        group.Peers.ShouldContain("p1");
        group.Peers.ShouldContain("p3");
    }

    // ---------------------------------------------------------------
    // Unavailable peers excluded
    // Go reference: jetstream_cluster.go offline peer filter
    // ---------------------------------------------------------------

    /// <summary>
    /// Peers flagged <c>Available = false</c> (p2, p4) must not be selected; the
    /// two available peers (p1, p3) make up the group.
    /// </summary>
    [Fact]
    public void Unavailable_peers_are_excluded()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "p1", Available = true },
            new() { PeerId = "p2", Available = false },
            new() { PeerId = "p3", Available = true },
            new() { PeerId = "p4", Available = false },
        };

        var group = PlacementEngine.SelectPeerGroup("avail", 2, peers);

        group.Peers.Count.ShouldBe(2);
        group.Peers.ShouldContain("p1");
        group.Peers.ShouldContain("p3");
    }

    /// <summary>
    /// When every peer is unavailable, even a single-replica request is expected
    /// to throw <see cref="InvalidOperationException"/>.
    /// </summary>
    [Fact]
    public void All_unavailable_throws()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "p1", Available = false },
            new() { PeerId = "p2", Available = false },
        };

        Should.Throw<InvalidOperationException>(
            () => PlacementEngine.SelectPeerGroup("fail", 1, peers));
    }

    // ---------------------------------------------------------------
    // Peers ordered by available storage
    // Go reference: jetstream_cluster.go storage-based ordering
    // ---------------------------------------------------------------

    /// <summary>
    /// With storage values 100 / 10000 / 5000 and 2 replicas requested, the group
    /// must list "high" first and "mid" second.
    /// </summary>
    [Fact]
    public void Peers_ordered_by_available_storage_descending()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "low", AvailableStorage = 100 },
            new() { PeerId = "high", AvailableStorage = 10000 },
            new() { PeerId = "mid", AvailableStorage = 5000 },
        };

        var group = PlacementEngine.SelectPeerGroup("storage", 2, peers);

        // Assert the size before indexing so a short result fails as an
        // assertion rather than as an out-of-range exception.
        group.Peers.Count.ShouldBe(2);
        // Should pick high and mid (top 2 by storage)
        group.Peers[0].ShouldBe("high");
        group.Peers[1].ShouldBe("mid");
    }

    // ---------------------------------------------------------------
    // Single replica selection
    // ---------------------------------------------------------------

    /// <summary>
    /// Requesting one replica from 5 generated peers yields exactly one peer.
    /// </summary>
    [Fact]
    public void Single_replica_selection()
    {
        var peers = CreatePeers(5);

        var group = PlacementEngine.SelectPeerGroup("single", 1, peers);

        group.Peers.Count.ShouldBe(1);
    }

    // ---------------------------------------------------------------
    // Policy with all filters combined
    // Go reference: jetstream_cluster.go combined placement policy
    // ---------------------------------------------------------------

    /// <summary>
    /// Applies cluster, required-tag, exclude-tag and availability constraints at
    /// once and checks that the two surviving peers come back ordered by storage.
    /// </summary>
    [Fact]
    public void Combined_policy_filters_applied_together()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "p1", Cluster = "us-east", Tags = ["ssd"], Available = true, AvailableStorage = 5000 },
            new() { PeerId = "p2", Cluster = "us-east", Tags = ["ssd", "old"], Available = true, AvailableStorage = 8000 },
            new() { PeerId = "p3", Cluster = "us-west", Tags = ["ssd"], Available = true, AvailableStorage = 9000 },
            new() { PeerId = "p4", Cluster = "us-east", Tags = ["ssd"], Available = false, AvailableStorage = 10000 },
            new() { PeerId = "p5", Cluster = "us-east", Tags = ["ssd"], Available = true, AvailableStorage = 7000 },
            new() { PeerId = "p6", Cluster = "us-east", Tags = ["hdd"], Available = true, AvailableStorage = 12000 },
        };
        var policy = new PlacementPolicy
        {
            Cluster = "us-east",
            Tags = ["ssd"],
            ExcludeTags = ["old"],
        };

        // After filtering: p1 (5000), p5 (7000) — p2 excluded (old tag), p3 (wrong cluster), p4 (unavailable), p6 (no ssd tag)
        var group = PlacementEngine.SelectPeerGroup("combined", 2, peers, policy);

        group.Peers.Count.ShouldBe(2);
        // Ordered by storage descending: p5 (7000) first, p1 (5000) second
        group.Peers[0].ShouldBe("p5");
        group.Peers[1].ShouldBe("p1");
    }

    // ---------------------------------------------------------------
    // Null policy is allowed (no filtering)
    // ---------------------------------------------------------------

    /// <summary>
    /// Passing <c>policy: null</c> explicitly must still select all 3 of the
    /// 3 generated peers.
    /// </summary>
    [Fact]
    public void Null_policy_selects_without_filtering()
    {
        var peers = CreatePeers(3);

        var group = PlacementEngine.SelectPeerGroup("nofilter", 3, peers, policy: null);

        group.Peers.Count.ShouldBe(3);
    }

    // ---------------------------------------------------------------
    // Empty policy fields are ignored
    // ---------------------------------------------------------------

    /// <summary>
    /// A policy whose <c>Cluster</c> is the empty string must not filter by
    /// cluster: both peers, from different clusters, are selected.
    /// </summary>
    [Fact]
    public void Empty_policy_cluster_is_ignored()
    {
        var peers = new List<PeerInfo>
        {
            new() { PeerId = "p1", Cluster = "us-east" },
            new() { PeerId = "p2", Cluster = "us-west" },
        };
        var policy = new PlacementPolicy { Cluster = "" };

        var group = PlacementEngine.SelectPeerGroup("empty-cluster", 2, peers, policy);

        group.Peers.Count.ShouldBe(2);
    }

    // ---------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------

    /// <summary>
    /// Builds <paramref name="count"/> available peers with ids "peer-1" through
    /// "peer-{count}". Each peer gets a distinct storage value of
    /// <c>long.MaxValue - i</c>, so earlier peers report more available storage.
    /// </summary>
    /// <param name="count">Number of peers to generate.</param>
    /// <returns>A new list containing the generated peers.</returns>
    private static List<PeerInfo> CreatePeers(int count)
    {
        return Enumerable.Range(1, count)
            .Select(i => new PeerInfo
            {
                PeerId = $"peer-{i}",
                Available = true,
                AvailableStorage = long.MaxValue - i,
            })
            .ToList();
    }
}
|