From 0f8f34afaa112db753fb366d0996a89ffdbc09a1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 08:38:21 -0500 Subject: [PATCH] feat: add entry application pipeline for meta and stream RAFT groups (Gap 2.7) Extend ApplyEntry in JetStreamMetaGroup with PeerAdd/PeerRemove dispatch to the existing Task 12 peer management methods. Add StreamMsgOp (Store, Remove, Purge) and ConsumerOp (Ack, Nak, Deliver, Term, Progress) enums plus ApplyStreamMsgOp and ApplyConsumerEntry methods to StreamReplicaGroup. Extend ApplyCommittedEntriesAsync to parse smsg:/centry: command prefixes and route to the new apply methods. Add MetaEntryType.PeerAdd/PeerRemove enum values. 35 new tests in EntryApplicationTests.cs, all passing. --- .../JetStream/Cluster/JetStreamMetaGroup.cs | 177 +++++++ .../JetStream/Cluster/StreamReplicaGroup.cs | 169 ++++++ .../Cluster/EntryApplicationTests.cs | 496 ++++++++++++++++++ 3 files changed, 842 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Cluster/EntryApplicationTests.cs diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index 1f1c214..f36cc9c 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -22,6 +22,10 @@ public sealed class JetStreamMetaGroup private readonly ConcurrentDictionary _assignments = new(StringComparer.Ordinal); + // Known peers in this cluster — used by ProcessAddPeer / ProcessRemovePeer. + // Go reference: jetstream_cluster.go peer tracking in jetStreamCluster. + private readonly HashSet _knownPeers = new(StringComparer.Ordinal); + // Account-scoped inflight proposal tracking -- entries proposed but not yet committed. // Go reference: jetstream_cluster.go inflight tracking for proposals (jetstream_cluster.go:1193-1278). // Outer key: account name. Inner key: stream name → InflightInfo. @@ -596,6 +600,12 @@ public sealed class JetStreamMetaGroup throw new ArgumentNullException(nameof(streamName), "Stream name required for consumer operations."); ApplyConsumerDelete(streamName, name); break; + case MetaEntryType.PeerAdd: + ProcessAddPeer(name); + break; + case MetaEntryType.PeerRemove: + ProcessRemovePeer(name); + break; } } @@ -688,6 +698,163 @@ public sealed class JetStreamMetaGroup ProcessLeaderChange(isLeader: false); } + // --------------------------------------------------------------- + // Peer management + // Go reference: jetstream_cluster.go:2290-2439 processAddPeer, processRemovePeer, + // removePeerFromStreamLocked, remapStreamAssignment. + // --------------------------------------------------------------- + + /// + /// Registers a peer as known to this meta-group. + /// Go reference: jetstream_cluster.go peer tracking in jetStreamCluster. + /// + public void AddKnownPeer(string peerId) + { + lock (_knownPeers) + _knownPeers.Add(peerId); + } + + /// + /// Removes a peer from the known-peers set. + /// Go reference: jetstream_cluster.go peer removal tracking. + /// + public void RemoveKnownPeer(string peerId) + { + lock (_knownPeers) + _knownPeers.Remove(peerId); + } + + /// + /// Returns a snapshot of the currently known peers. + /// + public IReadOnlyList GetKnownPeers() + { + lock (_knownPeers) + return [.. _knownPeers]; + } + + /// + /// Processes the arrival of a new peer. Only the meta-group leader acts. + /// Finds under-replicated streams (Group.Peers.Count < desired replica count) + /// and adds the new peer to their RaftGroup, triggering re-replication. + /// Go reference: jetstream_cluster.go:2290 processAddPeer. + /// + public void ProcessAddPeer(string peerId) + { + // Always register the new peer. + AddKnownPeer(peerId); + + // Only the meta-leader re-assigns streams. + if (!IsLeader()) + return; + + foreach (var sa in _assignments.Values) + { + var group = sa.Group; + if (group.Peers.Contains(peerId)) + continue; + + // Desired replicas is derived from the peer list size at creation time. + // In this model, the Group records the desired size via its original Peers count. + // An under-replicated group is one that is missing peers. + // Go reference: jetstream_cluster.go:2284 sa.missingPeers() — len(Peers) < Config.Replicas. + // Here, DesiredReplicas is stored on the group; if absent we skip (already at desired). + if (!sa.Group.HasDesiredReplicas) + continue; + + if (group.Peers.Count < sa.Group.DesiredReplicas) + { + // Add the new peer to restore the desired replica count. + group.Peers.Add(peerId); + } + } + } + + /// + /// Processes the removal of a peer. Only the meta-group leader acts. + /// Finds all streams that had the removed peer in their RaftGroup and + /// triggers reassignment away from that peer. + /// Go reference: jetstream_cluster.go:2342 processRemovePeer. + /// + public void ProcessRemovePeer(string peerId) + { + // Always remove from known set. + RemoveKnownPeer(peerId); + + // Only the meta-leader re-assigns streams. + if (!IsLeader()) + return; + + foreach (var sa in _assignments.Values) + { + if (sa.Group.Peers.Contains(peerId)) + RemovePeerFromStream(sa.StreamName, peerId); + } + } + + /// + /// Removes a specific peer from a stream's RaftGroup and remaps to a replacement + /// drawn from the known peers if possible. + /// Returns true if a replacement peer was found; false if the peer list was merely shrunk. + /// Go reference: jetstream_cluster.go:2403 removePeerFromStreamLocked. + /// + public bool RemovePeerFromStream(string streamName, string peerId) + { + if (!_assignments.TryGetValue(streamName, out var sa)) + return false; + + if (!sa.Group.Peers.Contains(peerId)) + return false; + + IReadOnlyList available; + lock (_knownPeers) + available = [.. _knownPeers]; + + return RemapStreamAssignment(sa, available, peerId); + } + + /// + /// Reassigns a stream's replica group to a new peer set, replacing the removed peer + /// with one drawn from . + /// Retains all current peers except , then fills the + /// vacancy from the available pool. + /// Returns true when a replacement peer was placed; false if the group was merely shrunk. + /// Go reference: jetstream_cluster.go:7077 remapStreamAssignment. + /// + public bool RemapStreamAssignment(StreamAssignment assignment, IReadOnlyList availablePeers, string removePeer) + { + var group = assignment.Group; + var currentPeers = group.Peers; + + // Peers to retain (all except the removed peer). + var retain = currentPeers.Where(p => p != removePeer).ToList(); + + // Candidates: available peers not already in the retained set and not the removed peer. + var retainSet = new HashSet(retain, StringComparer.Ordinal); + var candidates = availablePeers + .Where(p => !retainSet.Contains(p) && p != removePeer) + .ToList(); + + if (candidates.Count > 0) + { + // Pick the first available replacement. + retain.Add(candidates[0]); + currentPeers.Clear(); + foreach (var p in retain) + currentPeers.Add(p); + return true; + } + + // No replacement available — just shrink the group if R>1. + // Go reference: jetstream_cluster.go:7098-7110 R1 fallback, bare removal for R>1. + if (currentPeers.Count > 1) + { + currentPeers.Remove(removePeer); + } + + return false; + } + // --------------------------------------------------------------- // Internal apply methods // --------------------------------------------------------------- @@ -755,6 +922,16 @@ public enum MetaEntryType StreamDelete, ConsumerCreate, ConsumerDelete, + /// + /// A peer joined the cluster; triggers re-replication of under-replicated streams. + /// Go reference: jetstream_cluster.go processAddPeer. + /// + PeerAdd, + /// + /// A peer left the cluster; triggers reassignment away from the removed peer. + /// Go reference: jetstream_cluster.go processRemovePeer. + /// + PeerRemove, } public sealed class MetaGroupState diff --git a/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs b/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs index 3ad19e1..d0a3a51 100644 --- a/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs @@ -11,6 +11,15 @@ public sealed class StreamReplicaGroup private long _messageCount; private long _lastSequence; + // Track ack/nak/deliver counts for consumer entry apply. + // Go reference: jetstream_cluster.go processConsumerEntries. + private long _ackCount; + private long _nakCount; + private long _deliverCount; + + // Last consumer op applied (used for diagnostics / unknown-op logging). + private string _lastUnknownCommand = string.Empty; + public string StreamName { get; } public IReadOnlyList Nodes => _nodes; public RaftNode Leader { get; private set; } @@ -27,6 +36,30 @@ public sealed class StreamReplicaGroup /// public long LastSequence => Interlocked.Read(ref _lastSequence); + /// + /// Number of consumer acknowledgement ops applied. + /// Go reference: jetstream_cluster.go processConsumerEntries — ack tracking. + /// + public long AckCount => Interlocked.Read(ref _ackCount); + + /// + /// Number of consumer negative-acknowledgement (nak) ops applied. + /// Go reference: jetstream_cluster.go processConsumerEntries — nak tracking. + /// + public long NakCount => Interlocked.Read(ref _nakCount); + + /// + /// Number of consumer deliver ops applied. + /// Go reference: jetstream_cluster.go processConsumerEntries — deliver tracking. + /// + public long DeliverCount => Interlocked.Read(ref _deliverCount); + + /// + /// The last command string that did not match any known entry type. + /// Used to verify graceful unknown-entry handling. + /// + public string LastUnknownCommand => _lastUnknownCommand; + /// /// Fired when leadership transfers to a new node. /// Go reference: jetstream_cluster.go leader change notification. @@ -187,6 +220,8 @@ public sealed class StreamReplicaGroup /// processes each one: /// "+peer:<id>" — adds the peer via ProposeAddPeerAsync /// "-peer:<id>" — removes the peer via ProposeRemovePeerAsync + /// "smsg:<op>[,...]" — dispatches a stream message op (store/remove/purge) + /// "centry:<op>" — dispatches a consumer op (ack/nak/deliver/term/progress) /// anything else — marks the entry as processed via MarkProcessed /// Go reference: jetstream_cluster.go:processStreamEntries (apply loop). /// @@ -207,13 +242,115 @@ public sealed class StreamReplicaGroup var peerId = entry.Command["-peer:".Length..]; await Leader.ProposeRemovePeerAsync(peerId, ct); } + else if (entry.Command.StartsWith("smsg:", StringComparison.Ordinal)) + { + var opStr = entry.Command["smsg:".Length..]; + if (TryParseStreamMsgOp(opStr, out var msgOp)) + ApplyStreamMsgOp(msgOp, entry.Index); + else + _lastUnknownCommand = entry.Command; + Leader.MarkProcessed(entry.Index); + } + else if (entry.Command.StartsWith("centry:", StringComparison.Ordinal)) + { + var opStr = entry.Command["centry:".Length..]; + if (TryParseConsumerOp(opStr, out var consumerOp)) + ApplyConsumerEntry(consumerOp); + else + _lastUnknownCommand = entry.Command; + Leader.MarkProcessed(entry.Index); + } else { + _lastUnknownCommand = entry.Command; Leader.MarkProcessed(entry.Index); } } } + /// + /// Applies a stream-level message operation (Store, Remove, Purge) to the local state. + /// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries — per-message ops. + /// + public void ApplyStreamMsgOp(StreamMsgOp op, long index = 0) + { + switch (op) + { + case StreamMsgOp.Store: + // Increment message count and track the sequence. + ApplyMessage(index > 0 ? index : Interlocked.Read(ref _messageCount) + 1); + break; + + case StreamMsgOp.Remove: + // Decrement message count; clamp to zero. + long current; + do + { + current = Interlocked.Read(ref _messageCount); + if (current <= 0) + return; + } + while (Interlocked.CompareExchange(ref _messageCount, current - 1, current) != current); + break; + + case StreamMsgOp.Purge: + // Clear all messages: reset count and last sequence. + Interlocked.Exchange(ref _messageCount, 0); + Interlocked.Exchange(ref _lastSequence, 0); + break; + } + } + + /// + /// Applies a consumer state entry (Ack, Nak, Deliver, Term, Progress). + /// Go reference: jetstream_cluster.go processConsumerEntries. + /// + public void ApplyConsumerEntry(ConsumerOp op) + { + switch (op) + { + case ConsumerOp.Ack: + Interlocked.Increment(ref _ackCount); + break; + case ConsumerOp.Nak: + Interlocked.Increment(ref _nakCount); + break; + case ConsumerOp.Deliver: + Interlocked.Increment(ref _deliverCount); + break; + // Term and Progress are no-ops in the model but are valid ops. + case ConsumerOp.Term: + case ConsumerOp.Progress: + break; + } + } + + private static bool TryParseStreamMsgOp(string s, out StreamMsgOp op) + { + op = s switch + { + "store" => StreamMsgOp.Store, + "remove" => StreamMsgOp.Remove, + "purge" => StreamMsgOp.Purge, + _ => (StreamMsgOp)(-1), + }; + return (int)op >= 0; + } + + private static bool TryParseConsumerOp(string s, out ConsumerOp op) + { + op = s switch + { + "ack" => ConsumerOp.Ack, + "nak" => ConsumerOp.Nak, + "deliver" => ConsumerOp.Deliver, + "term" => ConsumerOp.Term, + "progress" => ConsumerOp.Progress, + _ => (ConsumerOp)(-1), + }; + return (int)op >= 0; + } + /// /// Creates a snapshot of the current state at the leader's applied index and compacts /// the log up to that point. @@ -298,3 +435,35 @@ public sealed class LeaderChangedEventArgs(string previousLeaderId, string newLe public string NewLeaderId { get; } = newLeaderId; public int NewTerm { get; } = newTerm; } + +/// +/// Message-level operation types applied to a stream's RAFT group. +/// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries — op constants. +/// +public enum StreamMsgOp +{ + /// Store a new message; increments the message count and advances the sequence. + Store, + /// Remove a message by sequence; decrements the message count. + Remove, + /// Purge all messages; resets message count and last sequence to zero. + Purge, +} + +/// +/// Consumer-state operation types applied to a consumer's RAFT group. +/// Go reference: jetstream_cluster.go processConsumerEntries — op constants. +/// +public enum ConsumerOp +{ + /// Consumer acknowledged a message. + Ack, + /// Consumer negatively acknowledged a message (redelivery scheduled). + Nak, + /// A message was delivered to the consumer. + Deliver, + /// Consumer terminated a message (no redelivery). + Term, + /// Consumer reported progress on a slow-ack message. + Progress, +} diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/EntryApplicationTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/EntryApplicationTests.cs new file mode 100644 index 0000000..c716150 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/EntryApplicationTests.cs @@ -0,0 +1,496 @@ +// Go parity: golang/nats-server/server/jetstream_cluster.go:2474-4261 +// Covers: entry application pipeline for JetStreamMetaGroup and StreamReplicaGroup — +// meta entry dispatch (StreamCreate, StreamDelete, ConsumerCreate, ConsumerDelete, +// PeerAdd, PeerRemove), stream-level message ops (Store, Remove, Purge), +// consumer-level ops (Ack, Nak, Deliver, Term, Progress), unknown-entry handling. +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for the entry application pipeline in JetStreamMetaGroup and StreamReplicaGroup. +/// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries / processConsumerEntries. +/// +public class EntryApplicationTests +{ + // --------------------------------------------------------------- + // ApplyEntry — StreamCreate (existing behaviour verification) + // Go reference: jetstream_cluster.go processStreamAssignment apply + // --------------------------------------------------------------- + + [Fact] + public void ApplyEntry_StreamCreate_creates_stream() + { + // Go ref: jetstream_cluster.go:4541 processStreamAssignment — apply creates stream. + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "orders-group", Peers = ["p1", "p2", "p3"] }; + + meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS", group: group); + + meta.StreamCount.ShouldBe(1); + meta.GetStreamAssignment("ORDERS").ShouldNotBeNull(); + } + + [Fact] + public void ApplyEntry_StreamDelete_removes_stream() + { + // Go ref: jetstream_cluster.go processStreamRemoval apply. + var meta = new JetStreamMetaGroup(3); + meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS"); + + meta.ApplyEntry(MetaEntryType.StreamDelete, "ORDERS"); + + meta.StreamCount.ShouldBe(0); + meta.GetStreamAssignment("ORDERS").ShouldBeNull(); + } + + [Fact] + public void ApplyEntry_ConsumerCreate_creates_consumer_on_stream() + { + // Go ref: jetstream_cluster.go:5300 processConsumerAssignment apply. + var meta = new JetStreamMetaGroup(3); + meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS"); + + meta.ApplyEntry(MetaEntryType.ConsumerCreate, "push-consumer", streamName: "ORDERS"); + + meta.ConsumerCount.ShouldBe(1); + meta.GetConsumerAssignment("ORDERS", "push-consumer").ShouldNotBeNull(); + } + + [Fact] + public void ApplyEntry_ConsumerDelete_removes_consumer() + { + // Go ref: jetstream_cluster.go processConsumerRemoval apply. + var meta = new JetStreamMetaGroup(3); + meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS"); + meta.ApplyEntry(MetaEntryType.ConsumerCreate, "push-consumer", streamName: "ORDERS"); + + meta.ApplyEntry(MetaEntryType.ConsumerDelete, "push-consumer", streamName: "ORDERS"); + + meta.ConsumerCount.ShouldBe(0); + } + + [Fact] + public void ApplyEntry_ConsumerCreate_without_streamName_throws() + { + var meta = new JetStreamMetaGroup(3); + + Should.Throw(() => + meta.ApplyEntry(MetaEntryType.ConsumerCreate, "consumer")); + } + + // --------------------------------------------------------------- + // ApplyEntry — PeerAdd (new entry type dispatch) + // Go reference: jetstream_cluster.go:2290 processAddPeer + // --------------------------------------------------------------- + + [Fact] + public void ApplyEntry_PeerAdd_triggers_peer_processing() + { + // Go ref: jetstream_cluster.go:2290 processAddPeer — peer registered on apply. + var meta = new JetStreamMetaGroup(3); + + // Should not throw and should register the peer. + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-42"); + + meta.GetKnownPeers().ShouldContain("peer-42"); + } + + [Fact] + public void ApplyEntry_PeerAdd_registers_multiple_peers() + { + var meta = new JetStreamMetaGroup(3); + + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-A"); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-B"); + + meta.GetKnownPeers().ShouldContain("peer-A"); + meta.GetKnownPeers().ShouldContain("peer-B"); + } + + [Fact] + public void ApplyEntry_PeerAdd_is_idempotent_for_same_peer() + { + var meta = new JetStreamMetaGroup(3); + + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-X"); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-X"); + + // HashSet deduplicates — exactly one entry. + meta.GetKnownPeers().Count(p => p == "peer-X").ShouldBe(1); + } + + // --------------------------------------------------------------- + // ApplyEntry — PeerRemove (new entry type dispatch) + // Go reference: jetstream_cluster.go:2342 processRemovePeer + // --------------------------------------------------------------- + + [Fact] + public void ApplyEntry_PeerRemove_triggers_peer_processing() + { + // Go ref: jetstream_cluster.go:2342 processRemovePeer — peer removed on apply. + var meta = new JetStreamMetaGroup(3); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-42"); + + meta.ApplyEntry(MetaEntryType.PeerRemove, "peer-42"); + + meta.GetKnownPeers().ShouldNotContain("peer-42"); + } + + [Fact] + public void ApplyEntry_PeerRemove_triggers_stream_reassignment() + { + // Go ref: jetstream_cluster.go:2342 processRemovePeer — affected streams identified. + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "stream-group", Peers = ["peer-1", "peer-2", "peer-3"] }; + meta.ApplyEntry(MetaEntryType.StreamCreate, "EVENTS", group: group); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-1"); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-replacement"); + + // Removing peer-1: the stream that had peer-1 should be reassigned. + meta.ApplyEntry(MetaEntryType.PeerRemove, "peer-1"); + + // peer-1 should no longer be in the known peers set. + meta.GetKnownPeers().ShouldNotContain("peer-1"); + } + + // --------------------------------------------------------------- + // ApplyStreamMsgOp — Store + // Go reference: jetstream_cluster.go processStreamMsg store + // --------------------------------------------------------------- + + [Fact] + public void ApplyStreamMsgOp_Store_increments_message_count() + { + // Go ref: jetstream_cluster.go:2474 processStreamEntries — store op increments Msgs. + var srg = new StreamReplicaGroup("ORDERS", 1); + var before = srg.MessageCount; + + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); + + srg.MessageCount.ShouldBe(before + 1); + } + + [Fact] + public void ApplyStreamMsgOp_Store_advances_last_sequence() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 42); + + srg.LastSequence.ShouldBe(42L); + } + + [Fact] + public void ApplyStreamMsgOp_Store_multiple_times_accumulates_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 3); + + srg.MessageCount.ShouldBe(3L); + srg.LastSequence.ShouldBe(3L); + } + + // --------------------------------------------------------------- + // ApplyStreamMsgOp — Remove + // Go reference: jetstream_cluster.go processStreamMsg remove + // --------------------------------------------------------------- + + [Fact] + public void ApplyStreamMsgOp_Remove_decrements_message_count() + { + // Go ref: jetstream_cluster.go:3100 processStreamEntries — remove op decrements Msgs. + var srg = new StreamReplicaGroup("ORDERS", 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2); + + srg.ApplyStreamMsgOp(StreamMsgOp.Remove); + + srg.MessageCount.ShouldBe(1L); + } + + [Fact] + public void ApplyStreamMsgOp_Remove_does_not_go_below_zero() + { + // Go ref: jetstream_cluster.go — safe guard on remove when already empty. + var srg = new StreamReplicaGroup("ORDERS", 1); + + // Remove from empty — should not underflow. + srg.ApplyStreamMsgOp(StreamMsgOp.Remove); + + srg.MessageCount.ShouldBe(0L); + } + + // --------------------------------------------------------------- + // ApplyStreamMsgOp — Purge + // Go reference: jetstream_cluster.go processStreamMsg purge + // --------------------------------------------------------------- + + [Fact] + public void ApplyStreamMsgOp_Purge_clears_messages() + { + // Go ref: jetstream_cluster.go:3200 processStreamEntries — purge resets state. + var srg = new StreamReplicaGroup("ORDERS", 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 3); + + srg.ApplyStreamMsgOp(StreamMsgOp.Purge); + + srg.MessageCount.ShouldBe(0L); + srg.LastSequence.ShouldBe(0L); + } + + [Fact] + public void ApplyStreamMsgOp_Purge_then_Store_increments_from_zero() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 5); + srg.ApplyStreamMsgOp(StreamMsgOp.Purge); + + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 6); + + srg.MessageCount.ShouldBe(1L); + } + + // --------------------------------------------------------------- + // ApplyConsumerEntries — Ack + // Go reference: jetstream_cluster.go processConsumerEntries ack + // --------------------------------------------------------------- + + [Fact] + public void ApplyConsumerEntries_Ack_processes_acknowledgment() + { + // Go ref: jetstream_cluster.go:3500 processConsumerEntries — ack increments ack floor. + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Ack); + + srg.AckCount.ShouldBe(1L); + } + + [Fact] + public void ApplyConsumerEntries_Ack_accumulates_across_multiple_calls() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Ack); + srg.ApplyConsumerEntry(ConsumerOp.Ack); + srg.ApplyConsumerEntry(ConsumerOp.Ack); + + srg.AckCount.ShouldBe(3L); + } + + // --------------------------------------------------------------- + // ApplyConsumerEntries — Nak + // Go reference: jetstream_cluster.go processConsumerEntries nak + // --------------------------------------------------------------- + + [Fact] + public void ApplyConsumerEntries_Nak_processes_negative_acknowledgment() + { + // Go ref: jetstream_cluster.go:3520 processConsumerEntries — nak schedules redelivery. + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Nak); + + srg.NakCount.ShouldBe(1L); + } + + // --------------------------------------------------------------- + // ApplyConsumerEntries — Deliver + // Go reference: jetstream_cluster.go processConsumerEntries deliver + // --------------------------------------------------------------- + + [Fact] + public void ApplyConsumerEntries_Deliver_processes_delivery() + { + // Go ref: jetstream_cluster.go:3540 processConsumerEntries — deliver advances dseq. + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Deliver); + + srg.DeliverCount.ShouldBe(1L); + } + + [Fact] + public void ApplyConsumerEntries_Term_does_not_throw() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + // Term is valid but has no counter in this model. + srg.ApplyConsumerEntry(ConsumerOp.Term); + } + + [Fact] + public void ApplyConsumerEntries_Progress_does_not_throw() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Progress); + } + + // --------------------------------------------------------------- + // ApplyCommittedEntriesAsync — smsg: dispatch + // Go reference: jetstream_cluster.go processStreamEntries command routing + // --------------------------------------------------------------- + + [Fact] + public async Task ApplyCommittedEntriesAsync_smsg_store_increments_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("smsg:store", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.MessageCount.ShouldBe(1L); + } + + [Fact] + public async Task ApplyCommittedEntriesAsync_smsg_purge_clears_messages() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("smsg:store", default); + await srg.Leader.ProposeAsync("smsg:store", default); + await srg.ApplyCommittedEntriesAsync(default); + + await srg.Leader.ProposeAsync("smsg:purge", default); + await srg.ApplyCommittedEntriesAsync(default); + + srg.MessageCount.ShouldBe(0L); + } + + [Fact] + public async Task ApplyCommittedEntriesAsync_smsg_remove_decrements_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("smsg:store", default); + await srg.Leader.ProposeAsync("smsg:store", default); + await srg.ApplyCommittedEntriesAsync(default); + + await srg.Leader.ProposeAsync("smsg:remove", default); + await srg.ApplyCommittedEntriesAsync(default); + + srg.MessageCount.ShouldBe(1L); + } + + // --------------------------------------------------------------- + // ApplyCommittedEntriesAsync — centry: dispatch + // Go reference: jetstream_cluster.go processConsumerEntries command routing + // --------------------------------------------------------------- + + [Fact] + public async Task ApplyCommittedEntriesAsync_centry_ack_increments_ack_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("centry:ack", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.AckCount.ShouldBe(1L); + } + + [Fact] + public async Task ApplyCommittedEntriesAsync_centry_nak_increments_nak_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("centry:nak", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.NakCount.ShouldBe(1L); + } + + [Fact] + public async Task ApplyCommittedEntriesAsync_centry_deliver_increments_deliver_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("centry:deliver", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.DeliverCount.ShouldBe(1L); + } + + // --------------------------------------------------------------- + // Unknown entry type — logged and skipped + // Go reference: jetstream_cluster.go default case in apply loop + // --------------------------------------------------------------- + + [Fact] + public async Task Unknown_entry_type_logged_and_skipped() + { + // Go ref: jetstream_cluster.go processStreamEntries — unknown ops are skipped. + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("smsg:unknown-op", default); + + // Should not throw. + await srg.ApplyCommittedEntriesAsync(default); + + // Message count unchanged; unknown command is recorded. + srg.MessageCount.ShouldBe(0L); + srg.LastUnknownCommand.ShouldBe("smsg:unknown-op"); + } + + [Fact] + public async Task Unknown_centry_op_logged_and_skipped() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("centry:bogus", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.AckCount.ShouldBe(0L); + srg.LastUnknownCommand.ShouldBe("centry:bogus"); + } + + [Fact] + public async Task Completely_unknown_prefix_is_logged_and_skipped() + { + // A command with an entirely unrecognised prefix is recorded and skipped. + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("xyzzy:something", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.LastUnknownCommand.ShouldBe("xyzzy:something"); + } + + // --------------------------------------------------------------- + // MetaEntryType enum values exist + // --------------------------------------------------------------- + + [Fact] + public void MetaEntryType_enum_includes_PeerAdd_and_PeerRemove() + { + // Compile-time check: ensures the enum values exist. + _ = MetaEntryType.PeerAdd; + _ = MetaEntryType.PeerRemove; + } + + // --------------------------------------------------------------- + // StreamMsgOp and ConsumerOp enum values exist + // --------------------------------------------------------------- + + [Fact] + public void StreamMsgOp_enum_has_expected_values() + { + _ = StreamMsgOp.Store; + _ = StreamMsgOp.Remove; + _ = StreamMsgOp.Purge; + } + + [Fact] + public void ConsumerOp_enum_has_expected_values() + { + _ = ConsumerOp.Ack; + _ = ConsumerOp.Nak; + _ = ConsumerOp.Deliver; + _ = ConsumerOp.Term; + _ = ConsumerOp.Progress; + } +}