using NATS.Server.Raft;

namespace NATS.Server.JetStream.Cluster;

/// <summary>
/// Models the RAFT replica group backing a single stream: owns the set of
/// <see cref="RaftNode"/> instances, tracks which one is the leader, and keeps simple
/// counters describing the entries that have been applied.
/// </summary>
public sealed class StreamReplicaGroup
{
    private readonly List<RaftNode> _nodes;

    // B10: Message tracking for stream-specific RAFT apply logic.
    // Go reference: jetstream_cluster.go processStreamMsg — message count and sequence tracking.
    private long _messageCount;
    private long _lastSequence;

    // Track ack/nak/deliver counts for consumer entry apply.
    // Go reference: jetstream_cluster.go processConsumerEntries.
    private long _ackCount;
    private long _nakCount;
    private long _deliverCount;

    // Last command that did not match a known entry type (used for diagnostics / unknown-op logging).
    private string _lastUnknownCommand = string.Empty;

    /// <summary>Name of the stream, exactly as supplied at construction.</summary>
    public string StreamName { get; }

    /// <summary>The RAFT nodes that currently make up this group.</summary>
    public IReadOnlyList<RaftNode> Nodes => _nodes;

    /// <summary>The node most recently elected as leader by this group.</summary>
    public RaftNode Leader { get; private set; }

    /// <summary>
    /// Number of messages applied to the local store simulation.
    /// Go reference: stream.go state.Msgs.
    /// </summary>
    public long MessageCount => Interlocked.Read(ref _messageCount);

    /// <summary>
    /// Last sequence number assigned to an applied message.
    /// Go reference: stream.go state.LastSeq.
    /// </summary>
    public long LastSequence => Interlocked.Read(ref _lastSequence);

    /// <summary>
    /// Number of consumer acknowledgement ops applied.
    /// Go reference: jetstream_cluster.go processConsumerEntries — ack tracking.
    /// </summary>
    public long AckCount => Interlocked.Read(ref _ackCount);

    /// <summary>
    /// Number of consumer negative-acknowledgement (nak) ops applied.
    /// Go reference: jetstream_cluster.go processConsumerEntries — nak tracking.
    /// </summary>
    public long NakCount => Interlocked.Read(ref _nakCount);

    /// <summary>
    /// Number of consumer deliver ops applied.
    /// Go reference: jetstream_cluster.go processConsumerEntries — deliver tracking.
    /// </summary>
    public long DeliverCount => Interlocked.Read(ref _deliverCount);

    /// <summary>
    /// The last command string that did not match any known entry type.
    /// Used to verify graceful unknown-entry handling.
    /// </summary>
    public string LastUnknownCommand => _lastUnknownCommand;

    /// <summary>
    /// Fired when leadership transfers to a new node.
    /// Go reference: jetstream_cluster.go leader change notification.
    /// </summary>
    public event EventHandler<LeaderChangedEventArgs>? LeaderChanged;

    /// <summary>
    /// The stream assignment that was used to construct this group, if created from a
    /// StreamAssignment. Null when constructed via the (string, int) overload.
    /// Go reference: jetstream_cluster.go:166-184 streamAssignment struct.
    /// </summary>
    public StreamAssignment? Assignment { get; private set; }

    // B10: Commit/processed index passthroughs to the leader node.
    // Go reference: raft.go:150-160 (applied/processed fields).

    /// <summary>The highest log index committed to quorum on the leader.</summary>
    public long CommitIndex => Leader.CommitIndex;

    /// <summary>The highest log index applied to the state machine on the leader.</summary>
    public long ProcessedIndex => Leader.ProcessedIndex;

    /// <summary>Number of committed entries awaiting state-machine application.</summary>
    public int PendingCommits => Leader.CommitQueue.Count;

    /// <summary>
    /// Creates a group of <paramref name="replicas"/> nodes (at least one) named
    /// "{lower-cased stream name}-r{1..n}" and elects the first node as leader.
    /// </summary>
    /// <param name="streamName">Stream name; also the prefix of every node id.</param>
    /// <param name="replicas">Requested replica count; values below 1 are treated as 1.</param>
    /// <exception cref="ArgumentNullException"><paramref name="streamName"/> is null.</exception>
    public StreamReplicaGroup(string streamName, int replicas)
    {
        ArgumentNullException.ThrowIfNull(streamName);

        StreamName = streamName;

        var nodeCount = Math.Max(replicas, 1);
        var prefix = StreamNamePrefix();
        _nodes = Enumerable.Range(1, nodeCount)
            .Select(i => new RaftNode($"{prefix}-r{i}"))
            .ToList();

        foreach (var node in _nodes)
            node.ConfigureCluster(_nodes);

        Leader = ElectLeader(_nodes[0]);
    }

    /// <summary>
    /// Creates a StreamReplicaGroup from a StreamAssignment, naming each RaftNode after the
    /// peers listed in the assignment's RaftGroup.
    /// Go reference: jetstream_cluster.go processStreamAssignment — creates a per-stream
    /// raft group from the assignment's group peers.
    /// </summary>
    /// <param name="assignment">Assignment supplying the stream name and peer ids.</param>
    /// <exception cref="ArgumentNullException"><paramref name="assignment"/> is null.</exception>
    public StreamReplicaGroup(StreamAssignment assignment)
    {
        ArgumentNullException.ThrowIfNull(assignment);

        Assignment = assignment;
        StreamName = assignment.StreamName;

        var peers = assignment.Group.Peers;
        if (peers.Count == 0)
        {
            // Fall back to a single-node group when no peers are listed.
            _nodes = [new RaftNode($"{StreamNamePrefix()}-r1")];
        }
        else
        {
            _nodes = peers
                .Select(peerId => new RaftNode(peerId))
                .ToList();
        }

        foreach (var node in _nodes)
            node.ConfigureCluster(_nodes);

        Leader = ElectLeader(_nodes[0]);
    }

    /// <summary>
    /// Proposes a raw command through the current leader. If the tracked leader no longer
    /// reports itself as leader, the next node in the list is elected first.
    /// </summary>
    /// <param name="command">Command text to append to the RAFT log.</param>
    /// <param name="ct">Cancellation token forwarded to the leader's propose call.</param>
    /// <returns>The value returned by the leader's propose call (used as a log index elsewhere in this class).</returns>
    public async ValueTask<long> ProposeAsync(string command, CancellationToken ct)
    {
        if (!Leader.IsLeader)
            Leader = ElectLeader(SelectNextCandidate(Leader));

        return await Leader.ProposeAsync(command, ct);
    }

    /// <summary>
    /// Proposes a message for storage to the stream's RAFT group.
    /// Encodes the subject plus header/payload lengths into a RAFT log entry command.
    /// Go reference: jetstream_cluster.go processStreamMsg.
    /// </summary>
    /// <param name="subject">Subject of the message.</param>
    /// <param name="headers">Message headers; only the length is encoded.</param>
    /// <param name="payload">Message payload; only the length is encoded.</param>
    /// <param name="ct">Cancellation token forwarded to the leader's propose call.</param>
    /// <returns>The index returned by the leader for the proposed entry.</returns>
    /// <exception cref="InvalidOperationException">The tracked leader is not currently the leader.</exception>
    public async ValueTask<long> ProposeMessageAsync(
        string subject,
        ReadOnlyMemory<byte> headers,
        ReadOnlyMemory<byte> payload,
        CancellationToken ct)
    {
        if (!Leader.IsLeader)
            throw new InvalidOperationException("Only the stream RAFT leader can propose messages.");

        // Encode as a MSG command for the RAFT log.
        var command = $"MSG {subject} {headers.Length} {payload.Length}";
        var index = await Leader.ProposeAsync(command, ct);

        // Apply the message locally: bumps the message count and advances the last sequence.
        ApplyMessage(index);

        return index;
    }

    /// <summary>
    /// Asks the current leader to step down, elects the next node in the list and raises
    /// <see cref="LeaderChanged"/> with the previous and new leader ids.
    /// </summary>
    /// <param name="ct">Unused; the operation completes synchronously.</param>
    public Task StepDownAsync(CancellationToken ct)
    {
        _ = ct;

        var previous = Leader;
        previous.RequestStepDown();
        Leader = ElectLeader(SelectNextCandidate(previous));

        LeaderChanged?.Invoke(this, new LeaderChangedEventArgs(previous.Id, Leader.Id, Leader.Term));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Returns the current status of the stream replica group.
    /// Go reference: jetstream_cluster.go stream replica status.
    /// </summary>
    public StreamReplicaStatus GetStatus()
    {
        return new StreamReplicaStatus
        {
            StreamName = StreamName,
            LeaderId = Leader.Id,
            LeaderTerm = Leader.Term,
            MessageCount = MessageCount,
            LastSequence = LastSequence,
            ReplicaCount = _nodes.Count,
            CommitIndex = Leader.CommitIndex,
            AppliedIndex = Leader.AppliedIndex,
        };
    }

    /// <summary>
    /// Grows or shrinks the group so that it has as many nodes as <paramref name="placement"/>
    /// has entries (at least one), then reconfigures every node and elects the first node.
    /// Does nothing when the size already matches.
    /// </summary>
    /// <param name="placement">Placement plan; only its <c>Count</c> is read.</param>
    /// <param name="ct">Unused; the operation completes synchronously.</param>
    // NOTE(review): the element type of `placement` was not visible in the flattened source;
    // `int` is assumed here — confirm against the caller.
    public Task ApplyPlacementAsync(IReadOnlyList<int> placement, CancellationToken ct)
    {
        _ = ct;

        var targetCount = Math.Max(placement.Count, 1);
        if (targetCount == _nodes.Count)
            return Task.CompletedTask;

        if (targetCount > _nodes.Count)
        {
            // New nodes continue the "-r{n}" numbering after the existing ones.
            var prefix = StreamNamePrefix();
            for (var i = _nodes.Count + 1; i <= targetCount; i++)
                _nodes.Add(new RaftNode($"{prefix}-r{i}"));
        }
        else
        {
            // Shrink by dropping nodes from the tail of the list.
            _nodes.RemoveRange(targetCount, _nodes.Count - targetCount);
        }

        foreach (var node in _nodes)
            node.ConfigureCluster(_nodes);

        Leader = ElectLeader(_nodes[0]);
        return Task.CompletedTask;
    }

    // B10: Per-stream RAFT apply logic
    // Go reference: jetstream_cluster.go processStreamEntries / processStreamMsg

    /// <summary>
    /// Dequeues all currently pending committed entries from the leader's CommitQueue and
    /// processes each one:
    ///   "+peer:&lt;id&gt;"   — adds the peer via ProposeAddPeerAsync
    ///   "-peer:&lt;id&gt;"   — removes the peer via ProposeRemovePeerAsync
    ///   "smsg:&lt;op&gt;"    — dispatches a stream message op (store/remove/purge)
    ///   "centry:&lt;op&gt;"  — dispatches a consumer op (ack/nak/deliver/term/progress)
    ///   anything else  — recorded as the last unknown command and marked processed
    /// For "smsg:" and "centry:" the text after the prefix must match an op name exactly;
    /// otherwise the whole command is recorded as unknown. Both are always marked processed.
    /// Go reference: jetstream_cluster.go:processStreamEntries (apply loop).
    /// </summary>
    /// <param name="ct">Cancellation token forwarded to the peer add/remove proposals.</param>
    public async Task ApplyCommittedEntriesAsync(CancellationToken ct)
    {
        while (Leader.CommitQueue.TryDequeue(out var entry))
        {
            if (entry is null)
                continue;

            // NOTE(review): the two peer branches do not call MarkProcessed here; presumably
            // the propose calls account for the entry themselves — confirm in RaftNode.
            if (entry.Command.StartsWith("+peer:", StringComparison.Ordinal))
            {
                var peerId = entry.Command["+peer:".Length..];
                await Leader.ProposeAddPeerAsync(peerId, ct);
            }
            else if (entry.Command.StartsWith("-peer:", StringComparison.Ordinal))
            {
                var peerId = entry.Command["-peer:".Length..];
                await Leader.ProposeRemovePeerAsync(peerId, ct);
            }
            else if (entry.Command.StartsWith("smsg:", StringComparison.Ordinal))
            {
                var opStr = entry.Command["smsg:".Length..];
                if (TryParseStreamMsgOp(opStr, out var msgOp))
                    ApplyStreamMsgOp(msgOp, entry.Index);
                else
                    _lastUnknownCommand = entry.Command;

                Leader.MarkProcessed(entry.Index);
            }
            else if (entry.Command.StartsWith("centry:", StringComparison.Ordinal))
            {
                var opStr = entry.Command["centry:".Length..];
                if (TryParseConsumerOp(opStr, out var consumerOp))
                    ApplyConsumerEntry(consumerOp);
                else
                    _lastUnknownCommand = entry.Command;

                Leader.MarkProcessed(entry.Index);
            }
            else
            {
                _lastUnknownCommand = entry.Command;
                Leader.MarkProcessed(entry.Index);
            }
        }
    }

    /// <summary>
    /// Applies a stream-level message operation (Store, Remove, Purge) to the local state.
    /// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries — per-message ops.
    /// </summary>
    /// <param name="op">Operation to apply; values outside the enum are ignored.</param>
    /// <param name="index">
    /// Log index used as the sequence for Store. When not positive, the current message
    /// count plus one is used instead.
    /// </param>
    public void ApplyStreamMsgOp(StreamMsgOp op, long index = 0)
    {
        switch (op)
        {
            case StreamMsgOp.Store:
                // Increment message count and track the sequence.
                ApplyMessage(index > 0 ? index : Interlocked.Read(ref _messageCount) + 1);
                break;

            case StreamMsgOp.Remove:
                // Decrement message count; clamp to zero (compare-and-swap retry loop).
                long current;
                do
                {
                    current = Interlocked.Read(ref _messageCount);
                    if (current <= 0)
                        return;
                }
                while (Interlocked.CompareExchange(ref _messageCount, current - 1, current) != current);
                break;

            case StreamMsgOp.Purge:
                // Clear all messages: reset count and last sequence.
                Interlocked.Exchange(ref _messageCount, 0);
                Interlocked.Exchange(ref _lastSequence, 0);
                break;
        }
    }

    /// <summary>
    /// Applies a consumer state entry (Ack, Nak, Deliver, Term, Progress).
    /// Go reference: jetstream_cluster.go processConsumerEntries.
    /// </summary>
    /// <param name="op">Operation to apply; Ack, Nak and Deliver bump their counters.</param>
    public void ApplyConsumerEntry(ConsumerOp op)
    {
        switch (op)
        {
            case ConsumerOp.Ack:
                Interlocked.Increment(ref _ackCount);
                break;

            case ConsumerOp.Nak:
                Interlocked.Increment(ref _nakCount);
                break;

            case ConsumerOp.Deliver:
                Interlocked.Increment(ref _deliverCount);
                break;

            // Term and Progress are no-ops in the model but are valid ops.
            case ConsumerOp.Term:
            case ConsumerOp.Progress:
                break;
        }
    }

    // Maps the exact lower-case op name to its enum value; -1 is the "not recognised" sentinel.
    private static bool TryParseStreamMsgOp(string s, out StreamMsgOp op)
    {
        op = s switch
        {
            "store" => StreamMsgOp.Store,
            "remove" => StreamMsgOp.Remove,
            "purge" => StreamMsgOp.Purge,
            _ => (StreamMsgOp)(-1),
        };
        return (int)op >= 0;
    }

    // Maps the exact lower-case op name to its enum value; -1 is the "not recognised" sentinel.
    private static bool TryParseConsumerOp(string s, out ConsumerOp op)
    {
        op = s switch
        {
            "ack" => ConsumerOp.Ack,
            "nak" => ConsumerOp.Nak,
            "deliver" => ConsumerOp.Deliver,
            "term" => ConsumerOp.Term,
            "progress" => ConsumerOp.Progress,
            _ => (ConsumerOp)(-1),
        };
        return (int)op >= 0;
    }

    /// <summary>
    /// Creates a snapshot of the current state at the leader's applied index and compacts
    /// the log up to that point.
    /// Go reference: raft.go CreateSnapshotCheckpoint.
    /// </summary>
    public Task CheckpointAsync(CancellationToken ct)
        => Leader.CreateSnapshotCheckpointAsync(ct);

    /// <summary>
    /// Restores the leader from a previously created snapshot, draining any pending
    /// commit-queue entries before applying the snapshot state.
    /// Go reference: raft.go DrainAndReplaySnapshot.
    /// </summary>
    public Task RestoreFromSnapshotAsync(RaftSnapshot snapshot, CancellationToken ct)
        => Leader.DrainAndReplaySnapshotAsync(snapshot, ct);

    // Picks the node after the current leader (wrapping around); falls back to the first
    // node when the group has a single node or the leader is no longer in the list.
    private RaftNode SelectNextCandidate(RaftNode currentLeader)
    {
        if (_nodes.Count == 1)
            return _nodes[0];

        var index = _nodes.FindIndex(n => n.Id == currentLeader.Id);
        if (index < 0)
            return _nodes[0];

        return _nodes[(index + 1) % _nodes.Count];
    }

    // Starts an election on the candidate and feeds it the vote response of every other node.
    private RaftNode ElectLeader(RaftNode candidate)
    {
        candidate.StartElection(_nodes.Count);
        foreach (var voter in _nodes.Where(n => n.Id != candidate.Id))
            candidate.ReceiveVote(voter.GrantVote(candidate.Term), _nodes.Count);

        return candidate;
    }

    /// <summary>
    /// Applies a committed message entry, incrementing message count and sequence.
    /// Go reference: jetstream_cluster.go processStreamMsg apply.
    /// </summary>
    private void ApplyMessage(long index)
    {
        Interlocked.Increment(ref _messageCount);

        // Use the RAFT index as the sequence; the max() keeps the stored value from
        // moving backwards when a lower index is applied later.
        long current;
        long desired;
        do
        {
            current = Interlocked.Read(ref _lastSequence);
            desired = Math.Max(current, index);
        }
        while (Interlocked.CompareExchange(ref _lastSequence, desired, current) != current);
    }

    // Lower-cased stream name used as the prefix of generated node ids.
    private string StreamNamePrefix() => StreamName.ToLowerInvariant();
}

/// <summary>
/// Status snapshot of a stream replica group.
/// Go reference: jetstream_cluster.go stream replica status report.
/// </summary>
public sealed class StreamReplicaStatus
{
    public string StreamName { get; init; } = string.Empty;
    public string LeaderId { get; init; } = string.Empty;
    public int LeaderTerm { get; init; }
    public long MessageCount { get; init; }
    public long LastSequence { get; init; }
    public int ReplicaCount { get; init; }
    public long CommitIndex { get; init; }
    public long AppliedIndex { get; init; }
}
/// <summary>
/// Event args for leader change notifications.
/// </summary>
/// <param name="previousLeaderId">Id of the node that was leader before the change.</param>
/// <param name="newLeaderId">Id of the node that is leader after the change.</param>
/// <param name="newTerm">Term reported by the new leader.</param>
public sealed class LeaderChangedEventArgs(string previousLeaderId, string newLeaderId, int newTerm) : EventArgs
{
    /// <summary>Id of the node that was leader before the change.</summary>
    public string PreviousLeaderId { get; } = previousLeaderId;

    /// <summary>Id of the node that is leader after the change.</summary>
    public string NewLeaderId { get; } = newLeaderId;

    /// <summary>Term reported by the new leader.</summary>
    public int NewTerm { get; } = newTerm;
}

/// <summary>
/// Message-level operation types applied to a stream's RAFT group.
/// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries — op constants.
/// </summary>
public enum StreamMsgOp
{
    /// <summary>Store a new message; increments the message count and advances the sequence.</summary>
    Store,

    /// <summary>Remove a message by sequence; decrements the message count.</summary>
    Remove,

    /// <summary>Purge all messages; resets message count and last sequence to zero.</summary>
    Purge,
}

/// <summary>
/// Consumer-state operation types applied to a consumer's RAFT group.
/// Go reference: jetstream_cluster.go processConsumerEntries — op constants.
/// </summary>
public enum ConsumerOp
{
    /// <summary>Consumer acknowledged a message.</summary>
    Ack,

    /// <summary>Consumer negatively acknowledged a message (redelivery scheduled).</summary>
    Nak,

    /// <summary>A message was delivered to the consumer.</summary>
    Deliver,

    /// <summary>Consumer terminated a message (no redelivery).</summary>
    Term,

    /// <summary>Consumer reported progress on a slow-ack message.</summary>
    Progress,
}