// Go parity: golang/nats-server/server/jetstream_cluster.go
// Covers: RaftGroup quorum semantics, StreamAssignment/ConsumerAssignment initialization,
// JetStreamMetaGroup proposal workflow (create/delete stream + consumer), GetStreamAssignment,
// GetAllAssignments, and PlacementEngine peer selection with topology filtering.
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests.JetStream.Cluster;
/// <summary>
/// Tests for B7 (ClusterAssignmentTypes), B8 (JetStreamMetaGroup proposal workflow),
/// and B9 (PlacementEngine peer selection).
/// Go reference: jetstream_cluster.go raftGroup, streamAssignment, consumerAssignment,
/// selectPeerGroup (line 7212).
/// </summary>
public class ClusterAssignmentAndPlacementTests
{
    // ---------------------------------------------------------------
    // B7: RaftGroup — quorum and HasQuorum
    // Go: jetstream_cluster.go:154 raftGroup struct
    // ---------------------------------------------------------------

    /// <summary>A one-peer group reports a quorum size of 1.</summary>
    [Fact]
    public void RaftGroup_quorum_size_for_single_node_is_one()
    {
        var group = new RaftGroup
        {
            Name = "R1",
            Peers = ["n1"],
        };

        group.QuorumSize.ShouldBe(1);
    }

    /// <summary>A three-peer group reports a quorum size of 2.</summary>
    [Fact]
    public void RaftGroup_quorum_size_for_three_nodes_is_two()
    {
        var group = new RaftGroup
        {
            Name = "R3",
            Peers = ["n1", "n2", "n3"],
        };

        group.QuorumSize.ShouldBe(2);
    }

    /// <summary>A five-peer group reports a quorum size of 3.</summary>
    [Fact]
    public void RaftGroup_quorum_size_for_five_nodes_is_three()
    {
        var group = new RaftGroup
        {
            Name = "R5",
            Peers = ["n1", "n2", "n3", "n4", "n5"],
        };

        group.QuorumSize.ShouldBe(3);
    }

    /// <summary>Two acks out of three peers satisfy HasQuorum.</summary>
    [Fact]
    public void RaftGroup_has_quorum_with_majority_acks()
    {
        var group = new RaftGroup
        {
            Name = "R3",
            Peers = ["n1", "n2", "n3"],
        };

        // Quorum = 2; 2 acks is sufficient.
        group.HasQuorum(2).ShouldBeTrue();
    }

    /// <summary>One ack out of three peers does not satisfy HasQuorum.</summary>
    [Fact]
    public void RaftGroup_no_quorum_with_minority_acks()
    {
        var group = new RaftGroup
        {
            Name = "R3",
            Peers = ["n1", "n2", "n3"],
        };

        // Quorum = 2; 1 ack is not sufficient.
        group.HasQuorum(1).ShouldBeFalse();
    }

    /// <summary>Five acks out of five peers satisfy HasQuorum.</summary>
    [Fact]
    public void RaftGroup_has_quorum_with_all_acks()
    {
        var group = new RaftGroup
        {
            Name = "R5",
            Peers = ["n1", "n2", "n3", "n4", "n5"],
        };

        group.HasQuorum(5).ShouldBeTrue();
    }

    /// <summary>Zero acks never satisfy HasQuorum for a three-peer group.</summary>
    [Fact]
    public void RaftGroup_no_quorum_with_zero_acks()
    {
        var group = new RaftGroup
        {
            Name = "R3",
            Peers = ["n1", "n2", "n3"],
        };

        group.HasQuorum(0).ShouldBeFalse();
    }

    // ---------------------------------------------------------------
    // B7: StreamAssignment — initialization and consumer tracking
    // Go: jetstream_cluster.go:166 streamAssignment struct
    // ---------------------------------------------------------------

    /// <summary>
    /// A freshly initialized StreamAssignment has no consumers, a "{}" config,
    /// and all status flags cleared.
    /// </summary>
    [Fact]
    public void StreamAssignment_initializes_with_empty_consumers()
    {
        var group = new RaftGroup { Name = "g1", Peers = ["n1", "n2", "n3"] };

        var assignment = new StreamAssignment
        {
            StreamName = "ORDERS",
            Group = group,
        };

        assignment.StreamName.ShouldBe("ORDERS");
        assignment.Consumers.ShouldBeEmpty();
        assignment.ConfigJson.ShouldBe("{}");
        assignment.Responded.ShouldBeFalse();
        assignment.Recovering.ShouldBeFalse();
        assignment.Reassigning.ShouldBeFalse();
    }

    /// <summary>
    /// StreamAssignment.Created falls inside a one-second window on either side
    /// of the construction time (UTC).
    /// </summary>
    [Fact]
    public void StreamAssignment_created_timestamp_is_recent()
    {
        var before = DateTime.UtcNow.AddSeconds(-1);
        var group = new RaftGroup { Name = "g1", Peers = ["n1"] };

        var assignment = new StreamAssignment
        {
            StreamName = "TS_STREAM",
            Group = group,
        };

        var after = DateTime.UtcNow.AddSeconds(1);
        assignment.Created.ShouldBeGreaterThan(before);
        assignment.Created.ShouldBeLessThan(after);
    }

    /// <summary>Consumer lookups on a StreamAssignment are case-sensitive.</summary>
    [Fact]
    public void StreamAssignment_consumers_dict_is_ordinal_keyed()
    {
        var group = new RaftGroup { Name = "g1", Peers = ["n1"] };
        var assignment = new StreamAssignment
        {
            StreamName = "S",
            Group = group,
        };
        var consGroup = new RaftGroup { Name = "cg", Peers = ["n1"] };

        assignment.Consumers["ALPHA"] = new ConsumerAssignment
        {
            ConsumerName = "ALPHA",
            StreamName = "S",
            Group = consGroup,
        };

        assignment.Consumers.ContainsKey("ALPHA").ShouldBeTrue();
        assignment.Consumers.ContainsKey("alpha").ShouldBeFalse();
    }

    // ---------------------------------------------------------------
    // B7: ConsumerAssignment — initialization
    // Go: jetstream_cluster.go:250 consumerAssignment struct
    // ---------------------------------------------------------------

    /// <summary>
    /// A freshly initialized ConsumerAssignment keeps the supplied names and group,
    /// has a "{}" config, and has its status flags cleared.
    /// </summary>
    [Fact]
    public void ConsumerAssignment_initializes_correctly()
    {
        var group = new RaftGroup { Name = "cg1", Peers = ["n1", "n2"] };

        var assignment = new ConsumerAssignment
        {
            ConsumerName = "PUSH_CONSUMER",
            StreamName = "EVENTS",
            Group = group,
        };

        assignment.ConsumerName.ShouldBe("PUSH_CONSUMER");
        assignment.StreamName.ShouldBe("EVENTS");
        assignment.Group.ShouldBeSameAs(group);
        assignment.ConfigJson.ShouldBe("{}");
        assignment.Responded.ShouldBeFalse();
        assignment.Recovering.ShouldBeFalse();
    }

    /// <summary>
    /// ConsumerAssignment.Created falls inside a one-second window on either side
    /// of the construction time (UTC).
    /// </summary>
    [Fact]
    public void ConsumerAssignment_created_timestamp_is_recent()
    {
        var before = DateTime.UtcNow.AddSeconds(-1);
        var group = new RaftGroup { Name = "cg", Peers = ["n1"] };

        var assignment = new ConsumerAssignment
        {
            ConsumerName = "C",
            StreamName = "S",
            Group = group,
        };

        var after = DateTime.UtcNow.AddSeconds(1);
        assignment.Created.ShouldBeGreaterThan(before);
        assignment.Created.ShouldBeLessThan(after);
    }

    // ---------------------------------------------------------------
    // B8: JetStreamMetaGroup — ProposeCreateStreamAsync with assignment
    // Go: jetstream_cluster.go processStreamAssignment
    // ---------------------------------------------------------------

    /// <summary>Creating a stream with an explicit group stores an assignment carrying that group.</summary>
    [Fact]
    public async Task ProposeCreateStream_with_group_stores_assignment()
    {
        var meta = new JetStreamMetaGroup(3);
        var group = new RaftGroup { Name = "ORDERS_grp", Peers = ["n1", "n2", "n3"] };

        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "ORDERS" }, group, default);

        var assignment = meta.GetStreamAssignment("ORDERS");
        assignment.ShouldNotBeNull();
        assignment!.StreamName.ShouldBe("ORDERS");
        assignment.Group.Peers.Count.ShouldBe(3);
    }

    /// <summary>Creating a stream without a group still stores an assignment with a non-null group.</summary>
    [Fact]
    public async Task ProposeCreateStream_without_group_still_stores_assignment()
    {
        var meta = new JetStreamMetaGroup(3);

        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "NOGROUP" }, default);

        var assignment = meta.GetStreamAssignment("NOGROUP");
        assignment.ShouldNotBeNull();
        assignment!.StreamName.ShouldBe("NOGROUP");
        assignment.Group.ShouldNotBeNull();
    }

    /// <summary>A created stream is visible through GetState (stream names and assignment count).</summary>
    [Fact]
    public async Task ProposeCreateStream_also_appears_in_GetState_streams()
    {
        var meta = new JetStreamMetaGroup(3);
        var group = new RaftGroup { Name = "g", Peers = ["n1"] };

        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "VISIBLE" }, group, default);

        var state = meta.GetState();
        state.Streams.ShouldContain("VISIBLE");
        state.AssignmentCount.ShouldBe(1);
    }

    /// <summary>Proposing the same stream twice leaves exactly one assignment and one stream name.</summary>
    [Fact]
    public async Task ProposeCreateStream_duplicate_is_idempotent()
    {
        var meta = new JetStreamMetaGroup(3);
        var group = new RaftGroup { Name = "g", Peers = ["n1"] };

        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DUP" }, group, default);
        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DUP" }, group, default);

        meta.GetAllAssignments().Count.ShouldBe(1);
        meta.GetState().Streams.Count.ShouldBe(1);
    }

    // ---------------------------------------------------------------
    // B8: JetStreamMetaGroup — ProposeDeleteStreamAsync
    // Go: jetstream_cluster.go processStreamDelete
    // ---------------------------------------------------------------

    /// <summary>Deleting a stream removes both its assignment and its name from the state.</summary>
    [Fact]
    public async Task ProposeDeleteStream_removes_assignment_and_stream_name()
    {
        var meta = new JetStreamMetaGroup(3);
        var group = new RaftGroup { Name = "g", Peers = ["n1"] };
        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DELETEME" }, group, default);
        meta.GetStreamAssignment("DELETEME").ShouldNotBeNull();
        meta.GetState().Streams.ShouldContain("DELETEME");

        await meta.ProposeDeleteStreamAsync("DELETEME", default);

        meta.GetStreamAssignment("DELETEME").ShouldBeNull();
        meta.GetState().Streams.ShouldNotContain("DELETEME");
        meta.GetState().AssignmentCount.ShouldBe(0);
    }

    /// <summary>Deleting an unknown stream does not throw and leaves no assignments.</summary>
    [Fact]
    public async Task ProposeDeleteStream_nonexistent_stream_is_safe()
    {
        var meta = new JetStreamMetaGroup(3);

        // Should not throw.
        await meta.ProposeDeleteStreamAsync("MISSING", default);

        meta.GetAllAssignments().Count.ShouldBe(0);
    }

    /// <summary>Deleting one stream leaves the other tracked stream untouched.</summary>
    [Fact]
    public async Task ProposeDeleteStream_only_removes_target_not_others()
    {
        var meta = new JetStreamMetaGroup(3);
        var group = new RaftGroup { Name = "g", Peers = ["n1"] };
        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "KEEP" }, group, default);
        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "REMOVE" }, group, default);

        await meta.ProposeDeleteStreamAsync("REMOVE", default);

        meta.GetStreamAssignment("KEEP").ShouldNotBeNull();
        meta.GetStreamAssignment("REMOVE").ShouldBeNull();
        meta.GetState().Streams.Count.ShouldBe(1);
    }

    // ---------------------------------------------------------------
    // B8: JetStreamMetaGroup — ProposeCreateConsumerAsync
    // Go: jetstream_cluster.go processConsumerAssignment
    // ---------------------------------------------------------------

    /// <summary>A created consumer is recorded under its stream's assignment with matching names.</summary>
    [Fact]
    public async Task ProposeCreateConsumer_adds_consumer_to_stream_assignment()
    {
        var meta = new JetStreamMetaGroup(3);
        var streamGroup = new RaftGroup { Name = "sg", Peers = ["n1", "n2", "n3"] };
        var consumerGroup = new RaftGroup { Name = "cg", Peers = ["n1", "n2"] };
        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "ORDERS" }, streamGroup, default);

        await meta.ProposeCreateConsumerAsync("ORDERS", "PROCESSOR", consumerGroup, default);

        var assignment = meta.GetStreamAssignment("ORDERS");
        assignment.ShouldNotBeNull();
        assignment!.Consumers.ContainsKey("PROCESSOR").ShouldBeTrue();
        assignment.Consumers["PROCESSOR"].ConsumerName.ShouldBe("PROCESSOR");
        assignment.Consumers["PROCESSOR"].StreamName.ShouldBe("ORDERS");
    }

    /// <summary>Three distinct consumers created on one stream are all tracked.</summary>
    [Fact]
    public async Task ProposeCreateConsumer_multiple_consumers_on_same_stream()
    {
        var meta = new JetStreamMetaGroup(3);
        var sg = new RaftGroup { Name = "sg", Peers = ["n1"] };
        var cg = new RaftGroup { Name = "cg", Peers = ["n1"] };
        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "MULTI" }, sg, default);

        await meta.ProposeCreateConsumerAsync("MULTI", "C1", cg, default);
        await meta.ProposeCreateConsumerAsync("MULTI", "C2", cg, default);
        await meta.ProposeCreateConsumerAsync("MULTI", "C3", cg, default);

        var assignment = meta.GetStreamAssignment("MULTI");
        // Fail with a clear assertion message rather than a NullReferenceException.
        assignment.ShouldNotBeNull();
        assignment!.Consumers.Count.ShouldBe(3);
        assignment.Consumers.ContainsKey("C1").ShouldBeTrue();
        assignment.Consumers.ContainsKey("C2").ShouldBeTrue();
        assignment.Consumers.ContainsKey("C3").ShouldBeTrue();
    }

    /// <summary>Creating a consumer on an unknown stream does not throw and creates no stream assignment.</summary>
    [Fact]
    public async Task ProposeCreateConsumer_on_nonexistent_stream_is_safe()
    {
        var meta = new JetStreamMetaGroup(3);
        var cg = new RaftGroup { Name = "cg", Peers = ["n1"] };

        // Should not throw — stream not found means consumer is simply not tracked.
        await meta.ProposeCreateConsumerAsync("MISSING_STREAM", "C1", cg, default);

        meta.GetStreamAssignment("MISSING_STREAM").ShouldBeNull();
    }

    // ---------------------------------------------------------------
    // B8: JetStreamMetaGroup — ProposeDeleteConsumerAsync
    // Go: jetstream_cluster.go processConsumerDelete
    // ---------------------------------------------------------------

    /// <summary>Deleting a consumer removes it from its stream's assignment.</summary>
    [Fact]
    public async Task ProposeDeleteConsumer_removes_consumer_from_stream()
    {
        var meta = new JetStreamMetaGroup(3);
        var sg = new RaftGroup { Name = "sg", Peers = ["n1"] };
        var cg = new RaftGroup { Name = "cg", Peers = ["n1"] };
        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "EVENTS" }, sg, default);
        await meta.ProposeCreateConsumerAsync("EVENTS", "PUSH", cg, default);
        meta.GetStreamAssignment("EVENTS")!.Consumers.ContainsKey("PUSH").ShouldBeTrue();

        await meta.ProposeDeleteConsumerAsync("EVENTS", "PUSH", default);

        meta.GetStreamAssignment("EVENTS")!.Consumers.ContainsKey("PUSH").ShouldBeFalse();
    }

    /// <summary>Deleting one consumer leaves the stream's other consumer in place.</summary>
    [Fact]
    public async Task ProposeDeleteConsumer_only_removes_target_consumer()
    {
        var meta = new JetStreamMetaGroup(3);
        var sg = new RaftGroup { Name = "sg", Peers = ["n1"] };
        var cg = new RaftGroup { Name = "cg", Peers = ["n1"] };
        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "S" }, sg, default);
        await meta.ProposeCreateConsumerAsync("S", "KEEP", cg, default);
        await meta.ProposeCreateConsumerAsync("S", "REMOVE", cg, default);

        await meta.ProposeDeleteConsumerAsync("S", "REMOVE", default);

        var assignment = meta.GetStreamAssignment("S");
        // Fail with a clear assertion message rather than a NullReferenceException.
        assignment.ShouldNotBeNull();
        assignment!.Consumers.ContainsKey("KEEP").ShouldBeTrue();
        assignment.Consumers.ContainsKey("REMOVE").ShouldBeFalse();
    }

    /// <summary>Deleting an unknown consumer does not throw and leaves the stream with no consumers.</summary>
    [Fact]
    public async Task ProposeDeleteConsumer_on_nonexistent_consumer_is_safe()
    {
        var meta = new JetStreamMetaGroup(3);
        var sg = new RaftGroup { Name = "sg", Peers = ["n1"] };
        await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "S" }, sg, default);

        // Should not throw.
        await meta.ProposeDeleteConsumerAsync("S", "MISSING_CONSUMER", default);

        meta.GetStreamAssignment("S")!.Consumers.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // B8: JetStreamMetaGroup — GetStreamAssignment
    // ---------------------------------------------------------------

    /// <summary>Looking up a stream that was never created returns null.</summary>
    [Fact]
    public void GetStreamAssignment_returns_null_for_missing_stream()
    {
        var meta = new JetStreamMetaGroup(3);

        meta.GetStreamAssignment("NOT_THERE").ShouldBeNull();
    }

    /// <summary>GetAllAssignments returns one entry per created stream.</summary>
    [Fact]
    public async Task GetAllAssignments_returns_all_tracked_streams()
    {
        var meta = new JetStreamMetaGroup(5);
        var group = new RaftGroup { Name = "g", Peers = ["n1", "n2", "n3"] };

        for (var i = 0; i < 5; i++)
        {
            await meta.ProposeCreateStreamAsync(new StreamConfig { Name = $"STREAM{i}" }, group, default);
        }

        meta.GetAllAssignments().Count.ShouldBe(5);
    }

    // ---------------------------------------------------------------
    // B9: PlacementEngine — basic selection
    // Go: jetstream_cluster.go:7212 selectPeerGroup
    //
    // Candidate peers are passed inline as collection expressions so that the
    // element type is inferred from SelectPeerGroup's parameter.
    // ---------------------------------------------------------------

    /// <summary>Requesting 3 replicas from 5 peers yields a group of 3 with the requested name.</summary>
    [Fact]
    public void PlacementEngine_selects_requested_number_of_peers()
    {
        var group = PlacementEngine.SelectPeerGroup(
            "TEST",
            replicas: 3,
            [
                new() { PeerId = "n1" },
                new() { PeerId = "n2" },
                new() { PeerId = "n3" },
                new() { PeerId = "n4" },
                new() { PeerId = "n5" },
            ]);

        group.Peers.Count.ShouldBe(3);
        group.Name.ShouldBe("TEST");
    }

    /// <summary>The returned group carries the name passed to SelectPeerGroup.</summary>
    [Fact]
    public void PlacementEngine_returns_raft_group_with_correct_name()
    {
        var group = PlacementEngine.SelectPeerGroup(
            "MY_GROUP",
            replicas: 1,
            [
                new() { PeerId = "n1" },
                new() { PeerId = "n2" },
            ]);

        group.Name.ShouldBe("MY_GROUP");
    }

    // ---------------------------------------------------------------
    // B9: PlacementEngine — cluster affinity filtering
    // Go: jetstream_cluster.go selectPeerGroup cluster filter
    // ---------------------------------------------------------------

    /// <summary>A cluster policy of "east" selects only the two east peers.</summary>
    [Fact]
    public void PlacementEngine_cluster_affinity_filters_to_matching_cluster()
    {
        var policy = new PlacementPolicy { Cluster = "east" };

        var group = PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 2,
            [
                new() { PeerId = "n1", Cluster = "east" },
                new() { PeerId = "n2", Cluster = "east" },
                new() { PeerId = "n3", Cluster = "west" },
                new() { PeerId = "n4", Cluster = "west" },
            ],
            policy);

        group.Peers.Count.ShouldBe(2);
        group.Peers.ShouldContain("n1");
        group.Peers.ShouldContain("n2");
    }

    /// <summary>A cluster policy of "east" matches a peer whose cluster is "EAST".</summary>
    [Fact]
    public void PlacementEngine_cluster_affinity_is_case_insensitive()
    {
        var policy = new PlacementPolicy { Cluster = "east" };

        var group = PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 1,
            [
                new() { PeerId = "n1", Cluster = "EAST" },
                new() { PeerId = "n2", Cluster = "west" },
            ],
            policy);

        group.Peers.ShouldContain("n1");
    }

    // ---------------------------------------------------------------
    // B9: PlacementEngine — tag filtering
    // Go: jetstream_cluster.go selectPeerGroup tag filter
    // ---------------------------------------------------------------

    /// <summary>Only peers carrying every required tag ("ssd" and "fast") are selected.</summary>
    [Fact]
    public void PlacementEngine_tag_filter_selects_peers_with_all_required_tags()
    {
        var policy = new PlacementPolicy
        {
            Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd", "fast" },
        };

        var group = PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 2,
            [
                new() { PeerId = "n1", Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd", "fast" } },
                new() { PeerId = "n2", Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd" } },
                new() { PeerId = "n3", Tags = new(StringComparer.OrdinalIgnoreCase) { "fast" } },
                new() { PeerId = "n4", Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd", "fast" } },
            ],
            policy);

        group.Peers.Count.ShouldBe(2);
        group.Peers.All(p => p == "n1" || p == "n4").ShouldBeTrue();
    }

    /// <summary>A required tag of "ssd" matches a peer tagged "SSD".</summary>
    [Fact]
    public void PlacementEngine_tag_filter_is_case_insensitive()
    {
        var policy = new PlacementPolicy
        {
            Tags = new(StringComparer.OrdinalIgnoreCase) { "ssd" },
        };

        var group = PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 1,
            [
                new() { PeerId = "n1", Tags = new(StringComparer.OrdinalIgnoreCase) { "SSD" } },
                new() { PeerId = "n2", Tags = new(StringComparer.OrdinalIgnoreCase) { "hdd" } },
            ],
            policy);

        group.Peers.ShouldContain("n1");
    }

    // ---------------------------------------------------------------
    // B9: PlacementEngine — exclude tag filtering
    // Go: jetstream_cluster.go selectPeerGroup exclude-tag logic
    // ---------------------------------------------------------------

    /// <summary>Peers carrying an excluded tag ("spinning") are never selected.</summary>
    [Fact]
    public void PlacementEngine_exclude_tag_filters_out_peers_with_those_tags()
    {
        var policy = new PlacementPolicy
        {
            ExcludeTags = new(StringComparer.OrdinalIgnoreCase) { "spinning" },
        };

        var group = PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 3,
            [
                new() { PeerId = "n1", Tags = new(StringComparer.OrdinalIgnoreCase) { "nvme" } },
                new() { PeerId = "n2", Tags = new(StringComparer.OrdinalIgnoreCase) { "spinning" } },
                new() { PeerId = "n3", Tags = new(StringComparer.OrdinalIgnoreCase) { "nvme" } },
                new() { PeerId = "n4" },
            ],
            policy);

        group.Peers.ShouldNotContain("n2");
        group.Peers.Count.ShouldBe(3);
    }

    /// <summary>An excluded tag of "slow" rejects a peer tagged "SLOW".</summary>
    [Fact]
    public void PlacementEngine_exclude_tag_is_case_insensitive()
    {
        var policy = new PlacementPolicy
        {
            ExcludeTags = new(StringComparer.OrdinalIgnoreCase) { "slow" },
        };

        var group = PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 1,
            [
                new() { PeerId = "n1", Tags = new(StringComparer.OrdinalIgnoreCase) { "SLOW" } },
                new() { PeerId = "n2" },
            ],
            policy);

        group.Peers.ShouldNotContain("n1");
        group.Peers.ShouldContain("n2");
    }

    // ---------------------------------------------------------------
    // B9: PlacementEngine — throws when not enough peers
    // Go: jetstream_cluster.go selectPeerGroup insufficient peer error
    // ---------------------------------------------------------------

    /// <summary>Requesting 3 replicas from a single peer throws.</summary>
    [Fact]
    public void PlacementEngine_throws_when_not_enough_peers()
    {
        var act = () => PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 3,
            [
                new() { PeerId = "n1" },
            ]);

        // NOTE(review): narrow Exception to the concrete type SelectPeerGroup throws — confirm.
        act.ShouldThrow<Exception>();
    }

    /// <summary>Requesting 3 replicas throws when the cluster filter leaves only 2 candidates.</summary>
    [Fact]
    public void PlacementEngine_throws_when_filter_leaves_insufficient_peers()
    {
        var policy = new PlacementPolicy { Cluster = "east" };

        var act = () => PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 3,
            [
                new() { PeerId = "n1", Cluster = "east" },
                new() { PeerId = "n2", Cluster = "east" },
                new() { PeerId = "n3", Cluster = "west" },
            ],
            policy);

        // NOTE(review): narrow Exception to the concrete type SelectPeerGroup throws — confirm.
        act.ShouldThrow<Exception>();
    }

    /// <summary>Requesting 2 replicas throws when only 1 of the 3 peers is available.</summary>
    [Fact]
    public void PlacementEngine_throws_when_unavailable_peers_reduce_below_requested()
    {
        var act = () => PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 2,
            [
                new() { PeerId = "n1", Available = true },
                new() { PeerId = "n2", Available = false },
                new() { PeerId = "n3", Available = false },
            ]);

        // NOTE(review): narrow Exception to the concrete type SelectPeerGroup throws — confirm.
        act.ShouldThrow<Exception>();
    }

    // ---------------------------------------------------------------
    // B9: PlacementEngine — sorts by available storage descending
    // Go: jetstream_cluster.go selectPeerGroup storage sort
    // ---------------------------------------------------------------

    /// <summary>With 2 replicas requested, the two peers with the most available storage win.</summary>
    [Fact]
    public void PlacementEngine_sorts_by_available_storage_descending()
    {
        var group = PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 2,
            [
                new() { PeerId = "small", AvailableStorage = 100 },
                new() { PeerId = "large", AvailableStorage = 10_000 },
                new() { PeerId = "medium", AvailableStorage = 500 },
            ]);

        // Should pick the two with most storage: large and medium.
        group.Peers.ShouldContain("large");
        group.Peers.ShouldContain("medium");
        group.Peers.ShouldNotContain("small");
    }

    /// <summary>Peers flagged as unavailable are left out of the selected group.</summary>
    [Fact]
    public void PlacementEngine_unavailable_peers_are_excluded()
    {
        var group = PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 2,
            [
                new() { PeerId = "online1", Available = true },
                new() { PeerId = "offline1", Available = false },
                new() { PeerId = "online2", Available = true },
            ]);

        group.Peers.ShouldContain("online1");
        group.Peers.ShouldContain("online2");
        group.Peers.ShouldNotContain("offline1");
    }

    /// <summary>Without a policy, requesting 3 replicas from 3 peers selects all of them.</summary>
    [Fact]
    public void PlacementEngine_no_policy_selects_all_available_up_to_replicas()
    {
        var group = PlacementEngine.SelectPeerGroup(
            "G",
            replicas: 3,
            [
                new() { PeerId = "n1" },
                new() { PeerId = "n2" },
                new() { PeerId = "n3" },
            ]);

        group.Peers.Count.ShouldBe(3);
    }
}