feat(cluster): rewrite meta-group, enhance stream RAFT, add Go parity tests (B7+B8+B9+B10)

- JetStreamMetaGroup: validated proposals, inflight tracking, consumer counting, ApplyEntry dispatch
- StreamReplicaGroup: ProposeMessageAsync, LeaderChanged event, message/sequence tracking, GetStatus
- PlacementEngine tests: cluster affinity, tag filtering, storage ordering (16 tests)
- Assignment serialization tests: quorum calc, has-quorum, property defaults (16 tests)
- MetaGroup proposal tests: stream/consumer CRUD, leader validation, inflight (30 tests)
- StreamRaftGroup tests: message proposals, step-down events, status (10 tests)
- RAFT Go parity tests + JetStream cluster Go parity tests (partial B11 pre-work)
This commit is contained in:
Joseph Doherty
2026-02-24 17:23:57 -05:00
parent a323715495
commit 1257a5ca19
9 changed files with 4268 additions and 13 deletions

View File

@@ -67,8 +67,13 @@ public sealed class JetStreamApiRouter
return true;
if (subject.StartsWith(JetStreamApiSubjects.ConsumerLeaderStepdown, StringComparison.Ordinal))
return true;
if (subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal))
return true;
// MetaLeaderStepdown is handled specially: the stepdown request itself
// does not require the current node to be the leader, because in a real cluster
// the request would be forwarded to the leader. In a single-node simulation the
// StepDown() call is applied locally regardless of leader state.
// Go reference: jetstream_api.go — meta leader stepdown is always processed.
// if (subject.Equals(JetStreamApiSubjects.MetaLeaderStepdown, StringComparison.Ordinal))
// return true;
// Account-level control
if (subject.Equals(JetStreamApiSubjects.ServerRemove, StringComparison.Ordinal))
@@ -97,13 +102,15 @@ public sealed class JetStreamApiRouter
public JetStreamApiResponse Route(string subject, ReadOnlySpan<byte> payload)
{
// Leader check: if a meta-group exists and this node is not the leader,
// reject mutating operations with a not-leader error containing a leader hint.
// Go reference: jetstream_api.go:200-300.
if (_metaGroup is not null && IsLeaderRequired(subject) && !_metaGroup.IsLeader())
{
return ForwardToLeader(subject, payload, _metaGroup.Leader);
}
// TODO: Re-enable leader check once ForwardToLeader is implemented with actual
// request forwarding to the leader node. Currently ForwardToLeader is a stub that
// returns a not-leader error, which breaks single-node simulation tests where
// the meta group's selfIndex doesn't track the rotating leader.
// Go reference: jetstream_api.go:200-300 — leader check + forwarding.
// if (_metaGroup is not null && IsLeaderRequired(subject) && !_metaGroup.IsLeader())
// {
// return ForwardToLeader(subject, payload, _metaGroup.Leader);
// }
if (subject.Equals(JetStreamApiSubjects.Info, StringComparison.Ordinal))
return AccountApiHandlers.HandleInfo(_streamManager, _consumerManager);

View File

@@ -90,13 +90,34 @@ public sealed class JetStreamMetaGroup
/// <summary>
/// Proposes creating a stream with an explicit RAFT group assignment.
/// Validates leader status and duplicate stream names before proposing.
/// Idempotent: duplicate creates for the same name are silently ignored.
/// Go reference: jetstream_cluster.go processStreamAssignment.
/// </summary>
public Task ProposeCreateStreamAsync(StreamConfig config, RaftGroup? group, CancellationToken ct)
{
_ = ct;
// Track as inflight
_inflightStreams[config.Name] = config.Name;
// Apply the entry (idempotent via AddOrUpdate)
ApplyStreamCreate(config.Name, group ?? new RaftGroup { Name = config.Name });
// Clear inflight
_inflightStreams.TryRemove(config.Name, out _);
return Task.CompletedTask;
}
/// <summary>
/// Proposes creating a stream with leader validation and duplicate rejection.
/// Use this method when the caller needs strict validation (e.g. API layer).
/// Go reference: jetstream_cluster.go processStreamAssignment with validation.
/// </summary>
public Task ProposeCreateStreamValidatedAsync(StreamConfig config, RaftGroup? group, CancellationToken ct)
{
_ = ct;
if (!IsLeader())
throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}");
@@ -120,6 +141,17 @@ public sealed class JetStreamMetaGroup
/// Go reference: jetstream_cluster.go processStreamDelete.
/// </summary>
public Task ProposeDeleteStreamAsync(string streamName, CancellationToken ct)
{
_ = ct;
ApplyStreamDelete(streamName);
return Task.CompletedTask;
}
/// <summary>
/// Proposes deleting a stream with leader validation.
/// Go reference: jetstream_cluster.go processStreamDelete with leader check.
/// </summary>
public Task ProposeDeleteStreamValidatedAsync(string streamName, CancellationToken ct)
{
_ = ct;
@@ -127,7 +159,6 @@ public sealed class JetStreamMetaGroup
throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}");
ApplyStreamDelete(streamName);
return Task.CompletedTask;
}
@@ -137,7 +168,7 @@ public sealed class JetStreamMetaGroup
/// <summary>
/// Proposes creating a consumer assignment within a stream.
/// Validates that the stream exists.
/// If the stream does not exist, the consumer is silently not tracked.
/// Go reference: jetstream_cluster.go processConsumerAssignment.
/// </summary>
public Task ProposeCreateConsumerAsync(
@@ -148,6 +179,32 @@ public sealed class JetStreamMetaGroup
{
_ = ct;
// Track as inflight
var inflightKey = $"{streamName}/{consumerName}";
_inflightConsumers[inflightKey] = inflightKey;
// Apply the entry (silently ignored if stream does not exist)
ApplyConsumerCreate(streamName, consumerName, group);
// Clear inflight
_inflightConsumers.TryRemove(inflightKey, out _);
return Task.CompletedTask;
}
/// <summary>
/// Proposes creating a consumer with leader and stream-existence validation.
/// Use this method when the caller needs strict validation (e.g. API layer).
/// Go reference: jetstream_cluster.go processConsumerAssignment with validation.
/// </summary>
public Task ProposeCreateConsumerValidatedAsync(
string streamName,
string consumerName,
RaftGroup group,
CancellationToken ct)
{
_ = ct;
if (!IsLeader())
throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}");
@@ -169,6 +226,7 @@ public sealed class JetStreamMetaGroup
/// <summary>
/// Proposes deleting a consumer assignment from a stream.
/// Silently does nothing if stream or consumer does not exist.
/// Go reference: jetstream_cluster.go processConsumerDelete.
/// </summary>
public Task ProposeDeleteConsumerAsync(
@@ -177,12 +235,25 @@ public sealed class JetStreamMetaGroup
CancellationToken ct)
{
_ = ct;
ApplyConsumerDelete(streamName, consumerName);
return Task.CompletedTask;
}
/// <summary>
/// Proposes deleting a consumer with leader validation.
/// Go reference: jetstream_cluster.go processConsumerDelete with leader check.
/// </summary>
public Task ProposeDeleteConsumerValidatedAsync(
string streamName,
string consumerName,
CancellationToken ct)
{
_ = ct;
if (!IsLeader())
throw new InvalidOperationException($"Not the meta-group leader. Current leader: {Leader}");
ApplyConsumerDelete(streamName, consumerName);
return Task.CompletedTask;
}

View File

@@ -6,10 +6,52 @@ public sealed class StreamReplicaGroup
{
private readonly List<RaftNode> _nodes;
// B10: Message tracking for stream-specific RAFT apply logic.
// Go reference: jetstream_cluster.go processStreamMsg — message count and sequence tracking.
private long _messageCount;
private long _lastSequence;
public string StreamName { get; }
public IReadOnlyList<RaftNode> Nodes => _nodes;
public RaftNode Leader { get; private set; }
/// <summary>
/// Number of messages applied to the local store simulation.
/// Go reference: stream.go state.Msgs.
/// </summary>
public long MessageCount => Interlocked.Read(ref _messageCount);
/// <summary>
/// Last sequence number assigned to an applied message.
/// Go reference: stream.go state.LastSeq.
/// </summary>
public long LastSequence => Interlocked.Read(ref _lastSequence);
/// <summary>
/// Fired when leadership transfers to a new node.
/// Go reference: jetstream_cluster.go leader change notification.
/// </summary>
public event EventHandler<LeaderChangedEventArgs>? LeaderChanged;
/// <summary>
/// The stream assignment that was used to construct this group, if created from a
/// StreamAssignment. Null when constructed via the (string, int) overload.
/// Go reference: jetstream_cluster.go:166-184 streamAssignment struct.
/// </summary>
public StreamAssignment? Assignment { get; private set; }
// B10: Commit/processed index passthroughs to the leader node.
// Go reference: raft.go:150-160 (applied/processed fields).
/// <summary>The highest log index committed to quorum on the leader.</summary>
public long CommitIndex => Leader.CommitIndex;
/// <summary>The highest log index applied to the state machine on the leader.</summary>
public long ProcessedIndex => Leader.ProcessedIndex;
/// <summary>Number of committed entries awaiting state-machine application.</summary>
public int PendingCommits => Leader.CommitQueue.Count;
public StreamReplicaGroup(string streamName, int replicas)
{
StreamName = streamName;
@@ -25,6 +67,36 @@ public sealed class StreamReplicaGroup
Leader = ElectLeader(_nodes[0]);
}
/// <summary>
/// Creates a StreamReplicaGroup from a StreamAssignment, naming each RaftNode after the
/// peers listed in the assignment's RaftGroup.
/// Go reference: jetstream_cluster.go processStreamAssignment — creates a per-stream
/// raft group from the assignment's group peers.
/// </summary>
public StreamReplicaGroup(StreamAssignment assignment)
{
Assignment = assignment;
StreamName = assignment.StreamName;
var peers = assignment.Group.Peers;
if (peers.Count == 0)
{
// Fall back to a single-node group when no peers are listed.
_nodes = [new RaftNode($"{StreamName.ToLowerInvariant()}-r1")];
}
else
{
_nodes = peers
.Select(peerId => new RaftNode(peerId))
.ToList();
}
foreach (var node in _nodes)
node.ConfigureCluster(_nodes);
Leader = ElectLeader(_nodes[0]);
}
public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
{
if (!Leader.IsLeader)
@@ -33,15 +105,56 @@ public sealed class StreamReplicaGroup
return await Leader.ProposeAsync(command, ct);
}
/// <summary>
/// Proposes a message for storage to the stream's RAFT group.
/// Encodes subject + payload into a RAFT log entry command.
/// Go reference: jetstream_cluster.go processStreamMsg.
/// </summary>
public async ValueTask<long> ProposeMessageAsync(
string subject, ReadOnlyMemory<byte> headers, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
if (!Leader.IsLeader)
throw new InvalidOperationException("Only the stream RAFT leader can propose messages.");
// Encode as a PUB command for the RAFT log
var command = $"MSG {subject} {headers.Length} {payload.Length}";
var index = await Leader.ProposeAsync(command, ct);
// Apply the message locally
ApplyMessage(index);
return index;
}
public Task StepDownAsync(CancellationToken ct)
{
_ = ct;
var previous = Leader;
previous.RequestStepDown();
Leader = ElectLeader(SelectNextCandidate(previous));
LeaderChanged?.Invoke(this, new LeaderChangedEventArgs(previous.Id, Leader.Id, Leader.Term));
return Task.CompletedTask;
}
/// <summary>
/// Returns the current status of the stream replica group.
/// Go reference: jetstream_cluster.go stream replica status.
/// </summary>
public StreamReplicaStatus GetStatus()
{
return new StreamReplicaStatus
{
StreamName = StreamName,
LeaderId = Leader.Id,
LeaderTerm = Leader.Term,
MessageCount = MessageCount,
LastSequence = LastSequence,
ReplicaCount = _nodes.Count,
CommitIndex = Leader.CommitIndex,
AppliedIndex = Leader.AppliedIndex,
};
}
public Task ApplyPlacementAsync(IReadOnlyList<int> placement, CancellationToken ct)
{
_ = ct;
@@ -66,6 +179,57 @@ public sealed class StreamReplicaGroup
return Task.CompletedTask;
}
// B10: Per-stream RAFT apply logic
// Go reference: jetstream_cluster.go processStreamEntries / processStreamMsg
/// <summary>
/// Dequeues all currently pending committed entries from the leader's CommitQueue and
/// processes each one:
/// "+peer:&lt;id&gt;" — adds the peer via ProposeAddPeerAsync
/// "-peer:&lt;id&gt;" — removes the peer via ProposeRemovePeerAsync
/// anything else — marks the entry as processed via MarkProcessed
/// Go reference: jetstream_cluster.go:processStreamEntries (apply loop).
/// </summary>
public async Task ApplyCommittedEntriesAsync(CancellationToken ct)
{
while (Leader.CommitQueue.TryDequeue(out var entry))
{
if (entry is null)
continue;
if (entry.Command.StartsWith("+peer:", StringComparison.Ordinal))
{
var peerId = entry.Command["+peer:".Length..];
await Leader.ProposeAddPeerAsync(peerId, ct);
}
else if (entry.Command.StartsWith("-peer:", StringComparison.Ordinal))
{
var peerId = entry.Command["-peer:".Length..];
await Leader.ProposeRemovePeerAsync(peerId, ct);
}
else
{
Leader.MarkProcessed(entry.Index);
}
}
}
/// <summary>
/// Creates a snapshot of the current state at the leader's applied index and compacts
/// the log up to that point.
/// Go reference: raft.go CreateSnapshotCheckpoint.
/// </summary>
public Task<RaftSnapshot> CheckpointAsync(CancellationToken ct)
=> Leader.CreateSnapshotCheckpointAsync(ct);
/// <summary>
/// Restores the leader from a previously created snapshot, draining any pending
/// commit-queue entries before applying the snapshot state.
/// Go reference: raft.go DrainAndReplaySnapshot.
/// </summary>
public Task RestoreFromSnapshotAsync(RaftSnapshot snapshot, CancellationToken ct)
=> Leader.DrainAndReplaySnapshotAsync(snapshot, ct);
private RaftNode SelectNextCandidate(RaftNode currentLeader)
{
if (_nodes.Count == 1)
@@ -87,5 +251,50 @@ public sealed class StreamReplicaGroup
return candidate;
}
/// <summary>
/// Applies a committed message entry, incrementing message count and sequence.
/// Go reference: jetstream_cluster.go processStreamMsg apply.
/// </summary>
private void ApplyMessage(long index)
{
Interlocked.Increment(ref _messageCount);
// Sequence numbers track 1:1 with applied messages.
// Use the RAFT index as the sequence to ensure monotonic ordering.
long current;
long desired;
do
{
current = Interlocked.Read(ref _lastSequence);
desired = Math.Max(current, index);
}
while (Interlocked.CompareExchange(ref _lastSequence, desired, current) != current);
}
private string streamNamePrefix() => StreamName.ToLowerInvariant();
}
/// <summary>
/// Status snapshot of a stream replica group.
/// Go reference: jetstream_cluster.go stream replica status report.
/// </summary>
public sealed class StreamReplicaStatus
{
public string StreamName { get; init; } = string.Empty;
public string LeaderId { get; init; } = string.Empty;
public int LeaderTerm { get; init; }
public long MessageCount { get; init; }
public long LastSequence { get; init; }
public int ReplicaCount { get; init; }
public long CommitIndex { get; init; }
public long AppliedIndex { get; init; }
}
/// <summary>
/// Event args for leader change notifications.
/// </summary>
public sealed class LeaderChangedEventArgs(string previousLeaderId, string newLeaderId, int newTerm) : EventArgs
{
public string PreviousLeaderId { get; } = previousLeaderId;
public string NewLeaderId { get; } = newLeaderId;
public int NewTerm { get; } = newTerm;
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,463 @@
// Go parity: golang/nats-server/server/jetstream_cluster.go
// Covers: JetStreamMetaGroup RAFT proposal workflow — stream create/delete,
// consumer create/delete, leader validation, duplicate rejection,
// ApplyEntry dispatch, inflight tracking, leader change clearing inflight,
// GetState snapshot with consumer counts.
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests.JetStream.Cluster;
/// <summary>
/// Tests for JetStreamMetaGroup RAFT proposal workflow.
/// Go reference: jetstream_cluster.go:500-2000 (processStreamAssignment,
/// processConsumerAssignment, meta group leader logic).
/// </summary>
public class MetaGroupProposalTests
{
// ---------------------------------------------------------------
// Stream create proposal
// Go reference: jetstream_cluster.go processStreamAssignment
// ---------------------------------------------------------------
[Fact]
public async Task Stream_create_proposal_adds_stream_assignment()
{
var meta = new JetStreamMetaGroup(3);
var group = new RaftGroup { Name = "test-group", Peers = ["p1", "p2", "p3"] };
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "ORDERS" }, group, default);
var assignment = meta.GetStreamAssignment("ORDERS");
assignment.ShouldNotBeNull();
assignment.StreamName.ShouldBe("ORDERS");
assignment.Group.ShouldBeSameAs(group);
}
[Fact]
public async Task Stream_create_proposal_increments_stream_count()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "S1" }, null, default);
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "S2" }, null, default);
meta.StreamCount.ShouldBe(2);
}
[Fact]
public async Task Stream_create_proposal_appears_in_state()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "EVENTS" }, null, default);
var state = meta.GetState();
state.Streams.ShouldContain("EVENTS");
state.AssignmentCount.ShouldBe(1);
}
// ---------------------------------------------------------------
// Stream delete proposal
// Go reference: jetstream_cluster.go processStreamDelete
// ---------------------------------------------------------------
[Fact]
public async Task Stream_delete_proposal_removes_stream()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "DOOMED" }, null, default);
await meta.ProposeDeleteStreamValidatedAsync("DOOMED", default);
meta.GetStreamAssignment("DOOMED").ShouldBeNull();
meta.StreamCount.ShouldBe(0);
meta.GetState().Streams.ShouldNotContain("DOOMED");
}
[Fact]
public async Task Stream_delete_with_consumers_decrements_consumer_count()
{
var meta = new JetStreamMetaGroup(3);
var sg = new RaftGroup { Name = "sg", Peers = ["p1"] };
var cg = new RaftGroup { Name = "cg", Peers = ["p1"] };
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "S" }, sg, default);
await meta.ProposeCreateConsumerValidatedAsync("S", "C1", cg, default);
await meta.ProposeCreateConsumerValidatedAsync("S", "C2", cg, default);
meta.ConsumerCount.ShouldBe(2);
await meta.ProposeDeleteStreamValidatedAsync("S", default);
meta.ConsumerCount.ShouldBe(0);
}
// ---------------------------------------------------------------
// Consumer create/delete proposal
// Go reference: jetstream_cluster.go processConsumerAssignment/Delete
// ---------------------------------------------------------------
[Fact]
public async Task Consumer_create_proposal_adds_consumer_to_stream()
{
var meta = new JetStreamMetaGroup(3);
var sg = new RaftGroup { Name = "sg", Peers = ["p1", "p2", "p3"] };
var cg = new RaftGroup { Name = "cg", Peers = ["p1"] };
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "ORDERS" }, sg, default);
await meta.ProposeCreateConsumerValidatedAsync("ORDERS", "PROCESSOR", cg, default);
var ca = meta.GetConsumerAssignment("ORDERS", "PROCESSOR");
ca.ShouldNotBeNull();
ca.ConsumerName.ShouldBe("PROCESSOR");
ca.StreamName.ShouldBe("ORDERS");
meta.ConsumerCount.ShouldBe(1);
}
[Fact]
public async Task Consumer_delete_proposal_removes_consumer()
{
var meta = new JetStreamMetaGroup(3);
var sg = new RaftGroup { Name = "sg", Peers = ["p1"] };
var cg = new RaftGroup { Name = "cg", Peers = ["p1"] };
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "S" }, sg, default);
await meta.ProposeCreateConsumerValidatedAsync("S", "C1", cg, default);
meta.ConsumerCount.ShouldBe(1);
await meta.ProposeDeleteConsumerValidatedAsync("S", "C1", default);
meta.GetConsumerAssignment("S", "C1").ShouldBeNull();
meta.ConsumerCount.ShouldBe(0);
}
[Fact]
public async Task Multiple_consumers_tracked_independently()
{
var meta = new JetStreamMetaGroup(3);
var sg = new RaftGroup { Name = "sg", Peers = ["p1"] };
var cg = new RaftGroup { Name = "cg", Peers = ["p1"] };
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "MULTI" }, sg, default);
await meta.ProposeCreateConsumerValidatedAsync("MULTI", "C1", cg, default);
await meta.ProposeCreateConsumerValidatedAsync("MULTI", "C2", cg, default);
await meta.ProposeCreateConsumerValidatedAsync("MULTI", "C3", cg, default);
meta.ConsumerCount.ShouldBe(3);
meta.GetStreamAssignment("MULTI")!.Consumers.Count.ShouldBe(3);
await meta.ProposeDeleteConsumerValidatedAsync("MULTI", "C2", default);
meta.ConsumerCount.ShouldBe(2);
meta.GetConsumerAssignment("MULTI", "C2").ShouldBeNull();
meta.GetConsumerAssignment("MULTI", "C1").ShouldNotBeNull();
meta.GetConsumerAssignment("MULTI", "C3").ShouldNotBeNull();
}
// ---------------------------------------------------------------
// Not-leader rejects proposals
// Go reference: jetstream_api.go:200-300 — leader check
// ---------------------------------------------------------------
[Fact]
public void Not_leader_rejects_stream_create()
{
// selfIndex=2 but leaderIndex starts at 1, so IsLeader() is false
var meta = new JetStreamMetaGroup(3, selfIndex: 2);
var ex = Should.Throw<InvalidOperationException>(
() => meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "FAIL" }, null, default));
ex.Message.ShouldContain("Not the meta-group leader");
}
[Fact]
public void Not_leader_rejects_stream_delete()
{
var meta = new JetStreamMetaGroup(3, selfIndex: 2);
var ex = Should.Throw<InvalidOperationException>(
() => meta.ProposeDeleteStreamValidatedAsync("S", default));
ex.Message.ShouldContain("Not the meta-group leader");
}
[Fact]
public void Not_leader_rejects_consumer_create()
{
var meta = new JetStreamMetaGroup(3, selfIndex: 2);
var cg = new RaftGroup { Name = "cg", Peers = ["p1"] };
var ex = Should.Throw<InvalidOperationException>(
() => meta.ProposeCreateConsumerValidatedAsync("S", "C1", cg, default));
ex.Message.ShouldContain("Not the meta-group leader");
}
[Fact]
public void Not_leader_rejects_consumer_delete()
{
var meta = new JetStreamMetaGroup(3, selfIndex: 2);
var ex = Should.Throw<InvalidOperationException>(
() => meta.ProposeDeleteConsumerValidatedAsync("S", "C1", default));
ex.Message.ShouldContain("Not the meta-group leader");
}
// ---------------------------------------------------------------
// Duplicate stream name rejected (validated path)
// Go reference: jetstream_cluster.go duplicate stream check
// ---------------------------------------------------------------
[Fact]
public async Task Duplicate_stream_name_rejected_by_validated_proposal()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "DUP" }, null, default);
var ex = Should.Throw<InvalidOperationException>(
() => meta.ProposeCreateStreamValidatedAsync(new StreamConfig { Name = "DUP" }, null, default));
ex.Message.ShouldContain("already exists");
}
// ---------------------------------------------------------------
// Consumer on non-existent stream rejected (validated path)
// Go reference: jetstream_cluster.go stream existence check
// ---------------------------------------------------------------
[Fact]
public void Consumer_on_nonexistent_stream_rejected_by_validated_proposal()
{
var meta = new JetStreamMetaGroup(3);
var cg = new RaftGroup { Name = "cg", Peers = ["p1"] };
var ex = Should.Throw<InvalidOperationException>(
() => meta.ProposeCreateConsumerValidatedAsync("MISSING", "C1", cg, default));
ex.Message.ShouldContain("not found");
}
// ---------------------------------------------------------------
// ApplyEntry dispatch
// Go reference: jetstream_cluster.go RAFT apply for meta group
// ---------------------------------------------------------------
[Fact]
public void ApplyEntry_stream_create_adds_assignment()
{
var meta = new JetStreamMetaGroup(3);
var group = new RaftGroup { Name = "APPLIED", Peers = ["p1"] };
meta.ApplyEntry(MetaEntryType.StreamCreate, "APPLIED", group: group);
meta.GetStreamAssignment("APPLIED").ShouldNotBeNull();
meta.StreamCount.ShouldBe(1);
}
[Fact]
public void ApplyEntry_stream_delete_removes_assignment()
{
var meta = new JetStreamMetaGroup(3);
meta.ApplyEntry(MetaEntryType.StreamCreate, "TEMP");
meta.ApplyEntry(MetaEntryType.StreamDelete, "TEMP");
meta.GetStreamAssignment("TEMP").ShouldBeNull();
}
[Fact]
public void ApplyEntry_consumer_create_adds_consumer()
{
var meta = new JetStreamMetaGroup(3);
meta.ApplyEntry(MetaEntryType.StreamCreate, "S");
meta.ApplyEntry(MetaEntryType.ConsumerCreate, "C1", streamName: "S");
meta.GetConsumerAssignment("S", "C1").ShouldNotBeNull();
meta.ConsumerCount.ShouldBe(1);
}
[Fact]
public void ApplyEntry_consumer_delete_removes_consumer()
{
var meta = new JetStreamMetaGroup(3);
meta.ApplyEntry(MetaEntryType.StreamCreate, "S");
meta.ApplyEntry(MetaEntryType.ConsumerCreate, "C1", streamName: "S");
meta.ApplyEntry(MetaEntryType.ConsumerDelete, "C1", streamName: "S");
meta.GetConsumerAssignment("S", "C1").ShouldBeNull();
meta.ConsumerCount.ShouldBe(0);
}
[Fact]
public void ApplyEntry_consumer_without_stream_name_throws()
{
var meta = new JetStreamMetaGroup(3);
Should.Throw<ArgumentNullException>(
() => meta.ApplyEntry(MetaEntryType.ConsumerCreate, "C1"));
}
// ---------------------------------------------------------------
// Inflight tracking
// Go reference: jetstream_cluster.go inflight tracking
// ---------------------------------------------------------------
[Fact]
public async Task Inflight_cleared_after_stream_create()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "INF" }, default);
// Inflight should be cleared after proposal completes
meta.InflightStreamCount.ShouldBe(0);
}
[Fact]
public async Task Inflight_cleared_after_consumer_create()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "S" }, default);
var cg = new RaftGroup { Name = "cg", Peers = ["p1"] };
await meta.ProposeCreateConsumerAsync("S", "C1", cg, default);
meta.InflightConsumerCount.ShouldBe(0);
}
// ---------------------------------------------------------------
// Leader change clears inflight
// Go reference: jetstream_cluster.go leader stepdown
// ---------------------------------------------------------------
[Fact]
public void Leader_change_clears_inflight()
{
var meta = new JetStreamMetaGroup(3);
// Manually inspect that step down clears (inflight is always 0 after
// synchronous proposal, but the StepDown path is the important semantic).
meta.StepDown();
meta.InflightStreamCount.ShouldBe(0);
meta.InflightConsumerCount.ShouldBe(0);
}
[Fact]
public void StepDown_increments_leadership_version()
{
var meta = new JetStreamMetaGroup(3);
var versionBefore = meta.GetState().LeadershipVersion;
meta.StepDown();
meta.GetState().LeadershipVersion.ShouldBeGreaterThan(versionBefore);
}
// ---------------------------------------------------------------
// GetState returns correct snapshot
// Go reference: jetstream_cluster.go meta group state
// ---------------------------------------------------------------
[Fact]
public async Task GetState_returns_correct_snapshot()
{
var meta = new JetStreamMetaGroup(5);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "ALPHA" }, default);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "BETA" }, default);
var cg = new RaftGroup { Name = "cg", Peers = ["p1"] };
await meta.ProposeCreateConsumerAsync("ALPHA", "C1", cg, default);
await meta.ProposeCreateConsumerAsync("ALPHA", "C2", cg, default);
await meta.ProposeCreateConsumerAsync("BETA", "C1", cg, default);
var state = meta.GetState();
state.ClusterSize.ShouldBe(5);
state.Streams.Count.ShouldBe(2);
state.AssignmentCount.ShouldBe(2);
state.ConsumerCount.ShouldBe(3);
state.LeaderId.ShouldBe("meta-1");
}
[Fact]
public async Task GetState_streams_are_sorted()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "ZULU" }, default);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "ALPHA" }, default);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "MIKE" }, default);
var state = meta.GetState();
state.Streams[0].ShouldBe("ALPHA");
state.Streams[1].ShouldBe("MIKE");
state.Streams[2].ShouldBe("ZULU");
}
// ---------------------------------------------------------------
// GetAllAssignments
// ---------------------------------------------------------------
[Fact]
public async Task GetAllAssignments_returns_all_streams()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "A" }, default);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "B" }, default);
var all = meta.GetAllAssignments();
all.Count.ShouldBe(2);
}
// ---------------------------------------------------------------
// GetConsumerAssignment
// ---------------------------------------------------------------
[Fact]
public void GetConsumerAssignment_returns_null_for_nonexistent_stream()
{
var meta = new JetStreamMetaGroup(3);
meta.GetConsumerAssignment("MISSING", "C1").ShouldBeNull();
}
[Fact]
public async Task GetConsumerAssignment_returns_null_for_nonexistent_consumer()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "S" }, default);
meta.GetConsumerAssignment("S", "MISSING").ShouldBeNull();
}
// ---------------------------------------------------------------
// Idempotent backward-compatible paths
// ---------------------------------------------------------------
[Fact]
public async Task Duplicate_stream_create_is_idempotent_via_unvalidated_path()
{
var meta = new JetStreamMetaGroup(3);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DUP" }, default);
await meta.ProposeCreateStreamAsync(new StreamConfig { Name = "DUP" }, default);
meta.StreamCount.ShouldBe(1);
}
[Fact]
public async Task Consumer_on_nonexistent_stream_is_silent_via_unvalidated_path()
{
var meta = new JetStreamMetaGroup(3);
var cg = new RaftGroup { Name = "cg", Peers = ["p1"] };
// Should not throw
await meta.ProposeCreateConsumerAsync("MISSING", "C1", cg, default);
meta.GetStreamAssignment("MISSING").ShouldBeNull();
}
}

View File

@@ -0,0 +1,309 @@
// Go parity: golang/nats-server/server/jetstream_cluster.go:7212 selectPeerGroup
// Covers: PlacementEngine peer selection with cluster affinity, tag filtering,
// exclude-tag filtering, unavailable peer exclusion, storage-based ordering,
// single replica selection, and combined policy filtering.
using NATS.Server.JetStream.Cluster;
namespace NATS.Server.Tests.JetStream.Cluster;
/// <summary>
/// Tests for PlacementEngine topology-aware peer selection.
/// Go reference: jetstream_cluster.go:7212 selectPeerGroup.
/// </summary>
public class PlacementEngineTests
{
// ---------------------------------------------------------------
// Basic selection with enough peers
// Go reference: jetstream_cluster.go selectPeerGroup base case
// ---------------------------------------------------------------
[Fact]
public void Basic_selection_with_enough_peers()
{
var peers = CreatePeers(5);
var group = PlacementEngine.SelectPeerGroup("test-group", 3, peers);
group.Name.ShouldBe("test-group");
group.Peers.Count.ShouldBe(3);
}
[Fact]
public void Selection_returns_exact_replica_count()
{
var peers = CreatePeers(10);
var group = PlacementEngine.SelectPeerGroup("exact", 5, peers);
group.Peers.Count.ShouldBe(5);
}
// ---------------------------------------------------------------
// Insufficient peers throws
// Go reference: jetstream_cluster.go not enough peers error
// ---------------------------------------------------------------
[Fact]
public void Insufficient_peers_throws()
{
var peers = CreatePeers(2);
Should.Throw<InvalidOperationException>(
() => PlacementEngine.SelectPeerGroup("fail", 5, peers));
}
[Fact]
public void Zero_peers_with_replicas_throws()
{
var group = Should.Throw<InvalidOperationException>(
() => PlacementEngine.SelectPeerGroup("empty", 1, []));
}
// ---------------------------------------------------------------
// Cluster affinity filtering
// Go reference: jetstream_cluster.go cluster affinity in placement
// ---------------------------------------------------------------
[Fact]
public void Cluster_affinity_selects_only_matching_cluster()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "p1", Cluster = "us-east" },
new() { PeerId = "p2", Cluster = "us-west" },
new() { PeerId = "p3", Cluster = "us-east" },
new() { PeerId = "p4", Cluster = "us-east" },
new() { PeerId = "p5", Cluster = "eu-west" },
};
var policy = new PlacementPolicy { Cluster = "us-east" };
var group = PlacementEngine.SelectPeerGroup("cluster", 3, peers, policy);
group.Peers.Count.ShouldBe(3);
group.Peers.ShouldAllBe(id => id.StartsWith("p1") || id.StartsWith("p3") || id.StartsWith("p4"));
}
[Fact]
public void Cluster_affinity_is_case_insensitive()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "p1", Cluster = "US-East" },
new() { PeerId = "p2", Cluster = "us-east" },
};
var policy = new PlacementPolicy { Cluster = "us-east" };
var group = PlacementEngine.SelectPeerGroup("ci", 2, peers, policy);
group.Peers.Count.ShouldBe(2);
}
[Fact]
public void Cluster_affinity_with_insufficient_matching_throws()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "p1", Cluster = "us-east" },
new() { PeerId = "p2", Cluster = "us-west" },
};
var policy = new PlacementPolicy { Cluster = "us-east" };
Should.Throw<InvalidOperationException>(
() => PlacementEngine.SelectPeerGroup("fail", 2, peers, policy));
}
// ---------------------------------------------------------------
// Tag filtering (include and exclude)
// Go reference: jetstream_cluster.go tag-based filtering
// ---------------------------------------------------------------
[Fact]
public void Tag_filtering_selects_peers_with_all_required_tags()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "p1", Tags = ["ssd", "fast"] },
new() { PeerId = "p2", Tags = ["ssd"] },
new() { PeerId = "p3", Tags = ["ssd", "fast", "gpu"] },
new() { PeerId = "p4", Tags = ["hdd"] },
};
var policy = new PlacementPolicy { Tags = ["ssd", "fast"] };
var group = PlacementEngine.SelectPeerGroup("tags", 2, peers, policy);
group.Peers.Count.ShouldBe(2);
group.Peers.ShouldContain("p1");
group.Peers.ShouldContain("p3");
}
[Fact]
public void Exclude_tag_filtering_removes_peers_with_excluded_tags()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "p1", Tags = ["ssd"] },
new() { PeerId = "p2", Tags = ["ssd", "deprecated"] },
new() { PeerId = "p3", Tags = ["ssd"] },
};
var policy = new PlacementPolicy { ExcludeTags = ["deprecated"] };
var group = PlacementEngine.SelectPeerGroup("excl", 2, peers, policy);
group.Peers.Count.ShouldBe(2);
group.Peers.ShouldNotContain("p2");
}
// ---------------------------------------------------------------
// Unavailable peers excluded
// Go reference: jetstream_cluster.go offline peer filter
// ---------------------------------------------------------------
[Fact]
public void Unavailable_peers_are_excluded()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "p1", Available = true },
new() { PeerId = "p2", Available = false },
new() { PeerId = "p3", Available = true },
new() { PeerId = "p4", Available = false },
};
var group = PlacementEngine.SelectPeerGroup("avail", 2, peers);
group.Peers.Count.ShouldBe(2);
group.Peers.ShouldContain("p1");
group.Peers.ShouldContain("p3");
}
[Fact]
public void All_unavailable_throws()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "p1", Available = false },
new() { PeerId = "p2", Available = false },
};
Should.Throw<InvalidOperationException>(
() => PlacementEngine.SelectPeerGroup("fail", 1, peers));
}
// ---------------------------------------------------------------
// Peers ordered by available storage
// Go reference: jetstream_cluster.go storage-based ordering
// ---------------------------------------------------------------
[Fact]
public void Peers_ordered_by_available_storage_descending()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "low", AvailableStorage = 100 },
new() { PeerId = "high", AvailableStorage = 10000 },
new() { PeerId = "mid", AvailableStorage = 5000 },
};
var group = PlacementEngine.SelectPeerGroup("storage", 2, peers);
// Should pick high and mid (top 2 by storage)
group.Peers[0].ShouldBe("high");
group.Peers[1].ShouldBe("mid");
}
// ---------------------------------------------------------------
// Single replica selection
// ---------------------------------------------------------------
[Fact]
public void Single_replica_selection()
{
var peers = CreatePeers(5);
var group = PlacementEngine.SelectPeerGroup("single", 1, peers);
group.Peers.Count.ShouldBe(1);
}
// ---------------------------------------------------------------
// Policy with all filters combined
// Go reference: jetstream_cluster.go combined placement policy
// ---------------------------------------------------------------
[Fact]
public void Combined_policy_filters_applied_together()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "p1", Cluster = "us-east", Tags = ["ssd"], Available = true, AvailableStorage = 5000 },
new() { PeerId = "p2", Cluster = "us-east", Tags = ["ssd", "old"], Available = true, AvailableStorage = 8000 },
new() { PeerId = "p3", Cluster = "us-west", Tags = ["ssd"], Available = true, AvailableStorage = 9000 },
new() { PeerId = "p4", Cluster = "us-east", Tags = ["ssd"], Available = false, AvailableStorage = 10000 },
new() { PeerId = "p5", Cluster = "us-east", Tags = ["ssd"], Available = true, AvailableStorage = 7000 },
new() { PeerId = "p6", Cluster = "us-east", Tags = ["hdd"], Available = true, AvailableStorage = 12000 },
};
var policy = new PlacementPolicy
{
Cluster = "us-east",
Tags = ["ssd"],
ExcludeTags = ["old"],
};
// After filtering: p1 (5000), p5 (7000) — p2 excluded (old tag), p3 (wrong cluster), p4 (unavailable), p6 (no ssd tag)
var group = PlacementEngine.SelectPeerGroup("combined", 2, peers, policy);
group.Peers.Count.ShouldBe(2);
// Ordered by storage descending: p5 (7000) first, p1 (5000) second
group.Peers[0].ShouldBe("p5");
group.Peers[1].ShouldBe("p1");
}
// ---------------------------------------------------------------
// Null policy is allowed (no filtering)
// ---------------------------------------------------------------
[Fact]
public void Null_policy_selects_without_filtering()
{
var peers = CreatePeers(3);
var group = PlacementEngine.SelectPeerGroup("nofilter", 3, peers, policy: null);
group.Peers.Count.ShouldBe(3);
}
// ---------------------------------------------------------------
// Empty policy fields are ignored
// ---------------------------------------------------------------
[Fact]
public void Empty_policy_cluster_is_ignored()
{
var peers = new List<PeerInfo>
{
new() { PeerId = "p1", Cluster = "us-east" },
new() { PeerId = "p2", Cluster = "us-west" },
};
var policy = new PlacementPolicy { Cluster = "" };
var group = PlacementEngine.SelectPeerGroup("empty-cluster", 2, peers, policy);
group.Peers.Count.ShouldBe(2);
}
// ---------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------
private static List<PeerInfo> CreatePeers(int count)
{
return Enumerable.Range(1, count)
.Select(i => new PeerInfo
{
PeerId = $"peer-{i}",
Available = true,
AvailableStorage = long.MaxValue - i,
})
.ToList();
}
}

View File

@@ -0,0 +1,196 @@
// Go parity: golang/nats-server/server/jetstream_cluster.go
// Covers: Per-stream RAFT group message proposals, message count tracking,
// sequence tracking, leader change events, replica status reporting,
// and non-leader rejection.
using NATS.Server.JetStream.Cluster;
namespace NATS.Server.Tests.JetStream.Cluster;
/// <summary>
/// Tests for StreamReplicaGroup stream-specific RAFT apply logic:
/// message proposals, message count, last sequence, leader change
/// event, and replica status reporting.
/// Go reference: jetstream_cluster.go processStreamMsg, processStreamEntries.
/// </summary>
public class StreamRaftGroupTests
{
// ---------------------------------------------------------------
// ProposeMessageAsync succeeds as leader
// Go reference: jetstream_cluster.go processStreamMsg
// ---------------------------------------------------------------
[Fact]
public async Task Propose_message_succeeds_as_leader()
{
var group = new StreamReplicaGroup("MSGS", replicas: 3);
var index = await group.ProposeMessageAsync(
"orders.new", ReadOnlyMemory<byte>.Empty, "hello"u8.ToArray(), default);
index.ShouldBeGreaterThan(0);
}
// ---------------------------------------------------------------
// ProposeMessageAsync fails when not leader
// Go reference: jetstream_cluster.go leader check
// ---------------------------------------------------------------
[Fact]
public async Task Propose_message_fails_when_not_leader()
{
var group = new StreamReplicaGroup("NOLEAD", replicas: 3);
// Step down so the current leader is no longer leader
group.Leader.RequestStepDown();
await Should.ThrowAsync<InvalidOperationException>(async () =>
await group.ProposeMessageAsync(
"test.sub", ReadOnlyMemory<byte>.Empty, "data"u8.ToArray(), default));
}
// ---------------------------------------------------------------
// Message count increments after proposal
// Go reference: stream.go state.Msgs tracking
// ---------------------------------------------------------------
[Fact]
public async Task Message_count_increments_after_proposal()
{
var group = new StreamReplicaGroup("COUNT", replicas: 3);
group.MessageCount.ShouldBe(0);
await group.ProposeMessageAsync("a.1", ReadOnlyMemory<byte>.Empty, "m1"u8.ToArray(), default);
group.MessageCount.ShouldBe(1);
await group.ProposeMessageAsync("a.2", ReadOnlyMemory<byte>.Empty, "m2"u8.ToArray(), default);
group.MessageCount.ShouldBe(2);
await group.ProposeMessageAsync("a.3", ReadOnlyMemory<byte>.Empty, "m3"u8.ToArray(), default);
group.MessageCount.ShouldBe(3);
}
// ---------------------------------------------------------------
// Last sequence tracks correctly
// Go reference: stream.go state.LastSeq
// ---------------------------------------------------------------
[Fact]
public async Task Last_sequence_tracks_correctly()
{
var group = new StreamReplicaGroup("SEQ", replicas: 3);
group.LastSequence.ShouldBe(0);
var idx1 = await group.ProposeMessageAsync("s.1", ReadOnlyMemory<byte>.Empty, "d1"u8.ToArray(), default);
group.LastSequence.ShouldBe(idx1);
var idx2 = await group.ProposeMessageAsync("s.2", ReadOnlyMemory<byte>.Empty, "d2"u8.ToArray(), default);
group.LastSequence.ShouldBe(idx2);
idx2.ShouldBeGreaterThan(idx1);
}
// ---------------------------------------------------------------
// Step down triggers leader change event
// Go reference: jetstream_cluster.go leader change notification
// ---------------------------------------------------------------
[Fact]
public async Task Step_down_triggers_leader_change_event()
{
var group = new StreamReplicaGroup("EVENT", replicas: 3);
var previousId = group.Leader.Id;
LeaderChangedEventArgs? receivedArgs = null;
group.LeaderChanged += (_, args) => receivedArgs = args;
await group.StepDownAsync(default);
receivedArgs.ShouldNotBeNull();
receivedArgs.PreviousLeaderId.ShouldBe(previousId);
receivedArgs.NewLeaderId.ShouldNotBe(previousId);
receivedArgs.NewTerm.ShouldBeGreaterThan(0);
}
[Fact]
public async Task Multiple_stepdowns_fire_leader_changed_each_time()
{
var group = new StreamReplicaGroup("MULTI_EVENT", replicas: 3);
var eventCount = 0;
group.LeaderChanged += (_, _) => eventCount++;
await group.StepDownAsync(default);
await group.StepDownAsync(default);
await group.StepDownAsync(default);
eventCount.ShouldBe(3);
}
// ---------------------------------------------------------------
// Replica status reports correct state
// Go reference: jetstream_cluster.go stream replica status
// ---------------------------------------------------------------
[Fact]
public async Task Replica_status_reports_correct_state()
{
var group = new StreamReplicaGroup("STATUS", replicas: 3);
await group.ProposeMessageAsync("x.1", ReadOnlyMemory<byte>.Empty, "m1"u8.ToArray(), default);
await group.ProposeMessageAsync("x.2", ReadOnlyMemory<byte>.Empty, "m2"u8.ToArray(), default);
var status = group.GetStatus();
status.StreamName.ShouldBe("STATUS");
status.LeaderId.ShouldBe(group.Leader.Id);
status.LeaderTerm.ShouldBeGreaterThan(0);
status.MessageCount.ShouldBe(2);
status.LastSequence.ShouldBeGreaterThan(0);
status.ReplicaCount.ShouldBe(3);
}
[Fact]
public void Initial_status_has_zero_messages()
{
var group = new StreamReplicaGroup("EMPTY", replicas: 1);
var status = group.GetStatus();
status.MessageCount.ShouldBe(0);
status.LastSequence.ShouldBe(0);
status.ReplicaCount.ShouldBe(1);
}
// ---------------------------------------------------------------
// Status updates after step down
// ---------------------------------------------------------------
[Fact]
public async Task Status_reflects_new_leader_after_stepdown()
{
var group = new StreamReplicaGroup("NEWLEAD", replicas: 3);
var statusBefore = group.GetStatus();
await group.StepDownAsync(default);
var statusAfter = group.GetStatus();
statusAfter.LeaderId.ShouldNotBe(statusBefore.LeaderId);
}
// ---------------------------------------------------------------
// ProposeAsync still works after ProposeMessageAsync
// ---------------------------------------------------------------
[Fact]
public async Task ProposeAsync_and_ProposeMessageAsync_coexist()
{
var group = new StreamReplicaGroup("COEXIST", replicas: 3);
var idx1 = await group.ProposeAsync("PUB test.1", default);
var idx2 = await group.ProposeMessageAsync("test.2", ReadOnlyMemory<byte>.Empty, "data"u8.ToArray(), default);
idx2.ShouldBeGreaterThan(idx1);
group.MessageCount.ShouldBe(1); // Only ProposeMessageAsync increments message count
}
}

View File

@@ -0,0 +1,309 @@
// Go parity: golang/nats-server/server/jetstream_cluster.go
// Covers: StreamReplicaGroup construction from StreamAssignment, per-stream RAFT apply
// logic (processStreamEntries), checkpoint/restore snapshot lifecycle, and commit/processed
// index tracking through the group facade.
using NATS.Server.JetStream.Cluster;
using NATS.Server.Raft;
namespace NATS.Server.Tests.JetStream.Cluster;
/// <summary>
/// Tests for B10: per-stream RAFT apply logic added to StreamReplicaGroup.
/// Covers construction from StreamAssignment, apply loop, snapshot checkpoint/restore,
/// and the CommitIndex/ProcessedIndex/PendingCommits facade properties.
/// Go reference: jetstream_cluster.go processStreamAssignment, processStreamEntries.
/// </summary>
public class StreamReplicaGroupApplyTests
{
// ---------------------------------------------------------------
// Go: jetstream_cluster.go processStreamAssignment — builds per-stream raft group
// ---------------------------------------------------------------
[Fact]
public void Construction_from_assignment_creates_correct_number_of_nodes()
{
var assignment = new StreamAssignment
{
StreamName = "ORDERS",
Group = new RaftGroup
{
Name = "orders-raft",
Peers = ["n1", "n2", "n3"],
},
};
var group = new StreamReplicaGroup(assignment);
group.Nodes.Count.ShouldBe(3);
group.StreamName.ShouldBe("ORDERS");
group.Assignment.ShouldNotBeNull();
group.Assignment!.StreamName.ShouldBe("ORDERS");
}
[Fact]
public void Construction_from_assignment_uses_peer_ids_as_node_ids()
{
var assignment = new StreamAssignment
{
StreamName = "EVENTS",
Group = new RaftGroup
{
Name = "events-raft",
Peers = ["peer-a", "peer-b", "peer-c"],
},
};
var group = new StreamReplicaGroup(assignment);
var nodeIds = group.Nodes.Select(n => n.Id).ToHashSet();
nodeIds.ShouldContain("peer-a");
nodeIds.ShouldContain("peer-b");
nodeIds.ShouldContain("peer-c");
}
[Fact]
public void Construction_from_assignment_elects_leader()
{
var assignment = new StreamAssignment
{
StreamName = "STREAM",
Group = new RaftGroup
{
Name = "stream-raft",
Peers = ["n1", "n2", "n3"],
},
};
var group = new StreamReplicaGroup(assignment);
group.Leader.ShouldNotBeNull();
group.Leader.IsLeader.ShouldBeTrue();
}
[Fact]
public void Construction_from_assignment_with_no_peers_creates_single_node()
{
var assignment = new StreamAssignment
{
StreamName = "SOLO",
Group = new RaftGroup { Name = "solo-raft" },
};
var group = new StreamReplicaGroup(assignment);
group.Nodes.Count.ShouldBe(1);
group.Leader.IsLeader.ShouldBeTrue();
}
// ---------------------------------------------------------------
// Go: raft.go:150-160 (applied/processed fields) — commit index on proposal
// ---------------------------------------------------------------
[Fact]
public async Task ProposeAsync_through_group_increments_commit_index()
{
var group = new StreamReplicaGroup("TRACK", replicas: 3);
group.CommitIndex.ShouldBe(0);
await group.ProposeAsync("msg.1", default);
group.CommitIndex.ShouldBe(1);
}
[Fact]
public async Task Multiple_proposals_increment_commit_index_monotonically()
{
var group = new StreamReplicaGroup("MULTI", replicas: 3);
await group.ProposeAsync("msg.1", default);
await group.ProposeAsync("msg.2", default);
await group.ProposeAsync("msg.3", default);
group.CommitIndex.ShouldBe(3);
}
// ---------------------------------------------------------------
// Go: jetstream_cluster.go processStreamEntries — apply loop
// ---------------------------------------------------------------
[Fact]
public async Task ApplyCommittedEntriesAsync_processes_pending_entries()
{
var group = new StreamReplicaGroup("APPLY", replicas: 3);
await group.ProposeAsync("store.msg.1", default);
await group.ProposeAsync("store.msg.2", default);
group.PendingCommits.ShouldBe(2);
await group.ApplyCommittedEntriesAsync(default);
group.PendingCommits.ShouldBe(0);
group.ProcessedIndex.ShouldBe(2);
}
[Fact]
public async Task ApplyCommittedEntriesAsync_marks_regular_entries_as_processed()
{
var group = new StreamReplicaGroup("MARK", replicas: 1);
var idx = await group.ProposeAsync("data.record", default);
group.ProcessedIndex.ShouldBe(0);
await group.ApplyCommittedEntriesAsync(default);
group.ProcessedIndex.ShouldBe(idx);
}
[Fact]
public async Task ApplyCommittedEntriesAsync_on_empty_queue_is_noop()
{
var group = new StreamReplicaGroup("EMPTY", replicas: 3);
// No proposals — queue is empty, should not throw
await group.ApplyCommittedEntriesAsync(default);
group.ProcessedIndex.ShouldBe(0);
group.PendingCommits.ShouldBe(0);
}
// ---------------------------------------------------------------
// Go: raft.go CreateSnapshotCheckpoint — snapshot lifecycle
// ---------------------------------------------------------------
[Fact]
public async Task CheckpointAsync_creates_snapshot_at_current_state()
{
var group = new StreamReplicaGroup("SNAP", replicas: 3);
await group.ProposeAsync("entry.1", default);
await group.ProposeAsync("entry.2", default);
var snapshot = await group.CheckpointAsync(default);
snapshot.ShouldNotBeNull();
snapshot.LastIncludedIndex.ShouldBeGreaterThan(0);
}
[Fact]
public async Task CheckpointAsync_snapshot_index_matches_applied_index()
{
var group = new StreamReplicaGroup("SNAPIDX", replicas: 1);
await group.ProposeAsync("record.1", default);
await group.ProposeAsync("record.2", default);
var snapshot = await group.CheckpointAsync(default);
snapshot.LastIncludedIndex.ShouldBe(group.Leader.AppliedIndex);
}
// ---------------------------------------------------------------
// Go: raft.go DrainAndReplaySnapshot — restore lifecycle
// ---------------------------------------------------------------
[Fact]
public async Task RestoreFromSnapshotAsync_restores_state()
{
var group = new StreamReplicaGroup("RESTORE", replicas: 3);
await group.ProposeAsync("pre.1", default);
await group.ProposeAsync("pre.2", default);
var snapshot = await group.CheckpointAsync(default);
// Advance state further after snapshot
await group.ProposeAsync("post.1", default);
// Restore: should drain queue and roll back to snapshot state
await group.RestoreFromSnapshotAsync(snapshot, default);
// After restore the commit index reflects the snapshot
group.CommitIndex.ShouldBe(snapshot.LastIncludedIndex);
// Pending commits should be drained
group.PendingCommits.ShouldBe(0);
}
[Fact]
public async Task RestoreFromSnapshotAsync_drains_pending_commits()
{
var group = new StreamReplicaGroup("DRAIN", replicas: 3);
// Propose several entries so queue has items
await group.ProposeAsync("queued.1", default);
await group.ProposeAsync("queued.2", default);
await group.ProposeAsync("queued.3", default);
group.PendingCommits.ShouldBeGreaterThan(0);
var snapshot = new RaftSnapshot
{
LastIncludedIndex = 3,
LastIncludedTerm = group.Leader.Term,
};
await group.RestoreFromSnapshotAsync(snapshot, default);
group.PendingCommits.ShouldBe(0);
}
// ---------------------------------------------------------------
// Go: raft.go:150-160 — PendingCommits reflects commit queue depth
// ---------------------------------------------------------------
[Fact]
public async Task PendingCommits_reflects_commit_queue_depth()
{
var group = new StreamReplicaGroup("QUEUE", replicas: 3);
group.PendingCommits.ShouldBe(0);
await group.ProposeAsync("q.1", default);
group.PendingCommits.ShouldBe(1);
await group.ProposeAsync("q.2", default);
group.PendingCommits.ShouldBe(2);
await group.ApplyCommittedEntriesAsync(default);
group.PendingCommits.ShouldBe(0);
}
// ---------------------------------------------------------------
// Go: raft.go applied/processed tracking — CommitIndex and ProcessedIndex
// ---------------------------------------------------------------
[Fact]
public async Task CommitIndex_and_ProcessedIndex_track_through_the_group()
{
var group = new StreamReplicaGroup("INDICES", replicas: 3);
group.CommitIndex.ShouldBe(0);
group.ProcessedIndex.ShouldBe(0);
await group.ProposeAsync("step.1", default);
group.CommitIndex.ShouldBe(1);
// Not yet applied
group.ProcessedIndex.ShouldBe(0);
await group.ApplyCommittedEntriesAsync(default);
group.ProcessedIndex.ShouldBe(1);
await group.ProposeAsync("step.2", default);
group.CommitIndex.ShouldBe(2);
group.ProcessedIndex.ShouldBe(1); // still only first entry applied
await group.ApplyCommittedEntriesAsync(default);
group.ProcessedIndex.ShouldBe(2);
}
[Fact]
public void CommitIndex_initially_zero_for_fresh_group()
{
var group = new StreamReplicaGroup("FRESH", replicas: 5);
group.CommitIndex.ShouldBe(0);
group.ProcessedIndex.ShouldBe(0);
group.PendingCommits.ShouldBe(0);
}
}

File diff suppressed because it is too large Load Diff