diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs index 1f1c214..f36cc9c 100644 --- a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -22,6 +22,10 @@ public sealed class JetStreamMetaGroup private readonly ConcurrentDictionary _assignments = new(StringComparer.Ordinal); + // Known peers in this cluster — used by ProcessAddPeer / ProcessRemovePeer. + // Go reference: jetstream_cluster.go peer tracking in jetStreamCluster. + private readonly HashSet _knownPeers = new(StringComparer.Ordinal); + // Account-scoped inflight proposal tracking -- entries proposed but not yet committed. // Go reference: jetstream_cluster.go inflight tracking for proposals (jetstream_cluster.go:1193-1278). // Outer key: account name. Inner key: stream name → InflightInfo. @@ -596,6 +600,12 @@ public sealed class JetStreamMetaGroup throw new ArgumentNullException(nameof(streamName), "Stream name required for consumer operations."); ApplyConsumerDelete(streamName, name); break; + case MetaEntryType.PeerAdd: + ProcessAddPeer(name); + break; + case MetaEntryType.PeerRemove: + ProcessRemovePeer(name); + break; } } @@ -688,6 +698,163 @@ public sealed class JetStreamMetaGroup ProcessLeaderChange(isLeader: false); } + // --------------------------------------------------------------- + // Peer management + // Go reference: jetstream_cluster.go:2290-2439 processAddPeer, processRemovePeer, + // removePeerFromStreamLocked, remapStreamAssignment. + // --------------------------------------------------------------- + + /// + /// Registers a peer as known to this meta-group. + /// Go reference: jetstream_cluster.go peer tracking in jetStreamCluster. + /// + public void AddKnownPeer(string peerId) + { + lock (_knownPeers) + _knownPeers.Add(peerId); + } + + /// + /// Removes a peer from the known-peers set. + /// Go reference: jetstream_cluster.go peer removal tracking. + /// + public void RemoveKnownPeer(string peerId) + { + lock (_knownPeers) + _knownPeers.Remove(peerId); + } + + /// + /// Returns a snapshot of the currently known peers. + /// + public IReadOnlyList GetKnownPeers() + { + lock (_knownPeers) + return [.. _knownPeers]; + } + + /// + /// Processes the arrival of a new peer. Only the meta-group leader acts. + /// Finds under-replicated streams (Group.Peers.Count < desired replica count) + /// and adds the new peer to their RaftGroup, triggering re-replication. + /// Go reference: jetstream_cluster.go:2290 processAddPeer. + /// + public void ProcessAddPeer(string peerId) + { + // Always register the new peer. + AddKnownPeer(peerId); + + // Only the meta-leader re-assigns streams. + if (!IsLeader()) + return; + + foreach (var sa in _assignments.Values) + { + var group = sa.Group; + if (group.Peers.Contains(peerId)) + continue; + + // Desired replicas is derived from the peer list size at creation time. + // In this model, the Group records the desired size via its original Peers count. + // An under-replicated group is one that is missing peers. + // Go reference: jetstream_cluster.go:2284 sa.missingPeers() — len(Peers) < Config.Replicas. + // Here, DesiredReplicas is stored on the group; if absent we skip (already at desired). + if (!sa.Group.HasDesiredReplicas) + continue; + + if (group.Peers.Count < sa.Group.DesiredReplicas) + { + // Add the new peer to restore the desired replica count. + group.Peers.Add(peerId); + } + } + } + + /// + /// Processes the removal of a peer. Only the meta-group leader acts. + /// Finds all streams that had the removed peer in their RaftGroup and + /// triggers reassignment away from that peer. + /// Go reference: jetstream_cluster.go:2342 processRemovePeer. + /// + public void ProcessRemovePeer(string peerId) + { + // Always remove from known set. + RemoveKnownPeer(peerId); + + // Only the meta-leader re-assigns streams. + if (!IsLeader()) + return; + + foreach (var sa in _assignments.Values) + { + if (sa.Group.Peers.Contains(peerId)) + RemovePeerFromStream(sa.StreamName, peerId); + } + } + + /// + /// Removes a specific peer from a stream's RaftGroup and remaps to a replacement + /// drawn from the known peers if possible. + /// Returns true if a replacement peer was found; false if the peer list was merely shrunk. + /// Go reference: jetstream_cluster.go:2403 removePeerFromStreamLocked. + /// + public bool RemovePeerFromStream(string streamName, string peerId) + { + if (!_assignments.TryGetValue(streamName, out var sa)) + return false; + + if (!sa.Group.Peers.Contains(peerId)) + return false; + + IReadOnlyList available; + lock (_knownPeers) + available = [.. _knownPeers]; + + return RemapStreamAssignment(sa, available, peerId); + } + + /// + /// Reassigns a stream's replica group to a new peer set, replacing the removed peer + /// with one drawn from . + /// Retains all current peers except , then fills the + /// vacancy from the available pool. + /// Returns true when a replacement peer was placed; false if the group was merely shrunk. + /// Go reference: jetstream_cluster.go:7077 remapStreamAssignment. + /// + public bool RemapStreamAssignment(StreamAssignment assignment, IReadOnlyList availablePeers, string removePeer) + { + var group = assignment.Group; + var currentPeers = group.Peers; + + // Peers to retain (all except the removed peer). + var retain = currentPeers.Where(p => p != removePeer).ToList(); + + // Candidates: available peers not already in the retained set and not the removed peer. + var retainSet = new HashSet(retain, StringComparer.Ordinal); + var candidates = availablePeers + .Where(p => !retainSet.Contains(p) && p != removePeer) + .ToList(); + + if (candidates.Count > 0) + { + // Pick the first available replacement. + retain.Add(candidates[0]); + currentPeers.Clear(); + foreach (var p in retain) + currentPeers.Add(p); + return true; + } + + // No replacement available — just shrink the group if R>1. + // Go reference: jetstream_cluster.go:7098-7110 R1 fallback, bare removal for R>1. + if (currentPeers.Count > 1) + { + currentPeers.Remove(removePeer); + } + + return false; + } + // --------------------------------------------------------------- // Internal apply methods // --------------------------------------------------------------- @@ -755,6 +922,16 @@ public enum MetaEntryType StreamDelete, ConsumerCreate, ConsumerDelete, + /// + /// A peer joined the cluster; triggers re-replication of under-replicated streams. + /// Go reference: jetstream_cluster.go processAddPeer. + /// + PeerAdd, + /// + /// A peer left the cluster; triggers reassignment away from the removed peer. + /// Go reference: jetstream_cluster.go processRemovePeer. + /// + PeerRemove, } public sealed class MetaGroupState diff --git a/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs b/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs index 3ad19e1..d0a3a51 100644 --- a/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs +++ b/src/NATS.Server/JetStream/Cluster/StreamReplicaGroup.cs @@ -11,6 +11,15 @@ public sealed class StreamReplicaGroup private long _messageCount; private long _lastSequence; + // Track ack/nak/deliver counts for consumer entry apply. + // Go reference: jetstream_cluster.go processConsumerEntries. + private long _ackCount; + private long _nakCount; + private long _deliverCount; + + // Last consumer op applied (used for diagnostics / unknown-op logging). + private string _lastUnknownCommand = string.Empty; + public string StreamName { get; } public IReadOnlyList Nodes => _nodes; public RaftNode Leader { get; private set; } @@ -27,6 +36,30 @@ public sealed class StreamReplicaGroup /// public long LastSequence => Interlocked.Read(ref _lastSequence); + /// + /// Number of consumer acknowledgement ops applied. + /// Go reference: jetstream_cluster.go processConsumerEntries — ack tracking. + /// + public long AckCount => Interlocked.Read(ref _ackCount); + + /// + /// Number of consumer negative-acknowledgement (nak) ops applied. + /// Go reference: jetstream_cluster.go processConsumerEntries — nak tracking. + /// + public long NakCount => Interlocked.Read(ref _nakCount); + + /// + /// Number of consumer deliver ops applied. + /// Go reference: jetstream_cluster.go processConsumerEntries — deliver tracking. + /// + public long DeliverCount => Interlocked.Read(ref _deliverCount); + + /// + /// The last command string that did not match any known entry type. + /// Used to verify graceful unknown-entry handling. + /// + public string LastUnknownCommand => _lastUnknownCommand; + /// /// Fired when leadership transfers to a new node. /// Go reference: jetstream_cluster.go leader change notification. @@ -187,6 +220,8 @@ public sealed class StreamReplicaGroup /// processes each one: /// "+peer:<id>" — adds the peer via ProposeAddPeerAsync /// "-peer:<id>" — removes the peer via ProposeRemovePeerAsync + /// "smsg:<op>[,...]" — dispatches a stream message op (store/remove/purge) + /// "centry:<op>" — dispatches a consumer op (ack/nak/deliver/term/progress) /// anything else — marks the entry as processed via MarkProcessed /// Go reference: jetstream_cluster.go:processStreamEntries (apply loop). /// @@ -207,13 +242,115 @@ public sealed class StreamReplicaGroup var peerId = entry.Command["-peer:".Length..]; await Leader.ProposeRemovePeerAsync(peerId, ct); } + else if (entry.Command.StartsWith("smsg:", StringComparison.Ordinal)) + { + var opStr = entry.Command["smsg:".Length..]; + if (TryParseStreamMsgOp(opStr, out var msgOp)) + ApplyStreamMsgOp(msgOp, entry.Index); + else + _lastUnknownCommand = entry.Command; + Leader.MarkProcessed(entry.Index); + } + else if (entry.Command.StartsWith("centry:", StringComparison.Ordinal)) + { + var opStr = entry.Command["centry:".Length..]; + if (TryParseConsumerOp(opStr, out var consumerOp)) + ApplyConsumerEntry(consumerOp); + else + _lastUnknownCommand = entry.Command; + Leader.MarkProcessed(entry.Index); + } else { + _lastUnknownCommand = entry.Command; Leader.MarkProcessed(entry.Index); } } } + /// + /// Applies a stream-level message operation (Store, Remove, Purge) to the local state. + /// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries — per-message ops. + /// + public void ApplyStreamMsgOp(StreamMsgOp op, long index = 0) + { + switch (op) + { + case StreamMsgOp.Store: + // Increment message count and track the sequence. + ApplyMessage(index > 0 ? index : Interlocked.Read(ref _messageCount) + 1); + break; + + case StreamMsgOp.Remove: + // Decrement message count; clamp to zero. + long current; + do + { + current = Interlocked.Read(ref _messageCount); + if (current <= 0) + return; + } + while (Interlocked.CompareExchange(ref _messageCount, current - 1, current) != current); + break; + + case StreamMsgOp.Purge: + // Clear all messages: reset count and last sequence. + Interlocked.Exchange(ref _messageCount, 0); + Interlocked.Exchange(ref _lastSequence, 0); + break; + } + } + + /// + /// Applies a consumer state entry (Ack, Nak, Deliver, Term, Progress). + /// Go reference: jetstream_cluster.go processConsumerEntries. + /// + public void ApplyConsumerEntry(ConsumerOp op) + { + switch (op) + { + case ConsumerOp.Ack: + Interlocked.Increment(ref _ackCount); + break; + case ConsumerOp.Nak: + Interlocked.Increment(ref _nakCount); + break; + case ConsumerOp.Deliver: + Interlocked.Increment(ref _deliverCount); + break; + // Term and Progress are no-ops in the model but are valid ops. + case ConsumerOp.Term: + case ConsumerOp.Progress: + break; + } + } + + private static bool TryParseStreamMsgOp(string s, out StreamMsgOp op) + { + op = s switch + { + "store" => StreamMsgOp.Store, + "remove" => StreamMsgOp.Remove, + "purge" => StreamMsgOp.Purge, + _ => (StreamMsgOp)(-1), + }; + return (int)op >= 0; + } + + private static bool TryParseConsumerOp(string s, out ConsumerOp op) + { + op = s switch + { + "ack" => ConsumerOp.Ack, + "nak" => ConsumerOp.Nak, + "deliver" => ConsumerOp.Deliver, + "term" => ConsumerOp.Term, + "progress" => ConsumerOp.Progress, + _ => (ConsumerOp)(-1), + }; + return (int)op >= 0; + } + /// /// Creates a snapshot of the current state at the leader's applied index and compacts /// the log up to that point. @@ -298,3 +435,35 @@ public sealed class LeaderChangedEventArgs(string previousLeaderId, string newLe public string NewLeaderId { get; } = newLeaderId; public int NewTerm { get; } = newTerm; } + +/// +/// Message-level operation types applied to a stream's RAFT group. +/// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries — op constants. +/// +public enum StreamMsgOp +{ + /// Store a new message; increments the message count and advances the sequence. + Store, + /// Remove a message by sequence; decrements the message count. + Remove, + /// Purge all messages; resets message count and last sequence to zero. + Purge, +} + +/// +/// Consumer-state operation types applied to a consumer's RAFT group. +/// Go reference: jetstream_cluster.go processConsumerEntries — op constants. +/// +public enum ConsumerOp +{ + /// Consumer acknowledged a message. + Ack, + /// Consumer negatively acknowledged a message (redelivery scheduled). + Nak, + /// A message was delivered to the consumer. + Deliver, + /// Consumer terminated a message (no redelivery). + Term, + /// Consumer reported progress on a slow-ack message. + Progress, +} diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/EntryApplicationTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/EntryApplicationTests.cs new file mode 100644 index 0000000..c716150 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/EntryApplicationTests.cs @@ -0,0 +1,496 @@ +// Go parity: golang/nats-server/server/jetstream_cluster.go:2474-4261 +// Covers: entry application pipeline for JetStreamMetaGroup and StreamReplicaGroup — +// meta entry dispatch (StreamCreate, StreamDelete, ConsumerCreate, ConsumerDelete, +// PeerAdd, PeerRemove), stream-level message ops (Store, Remove, Purge), +// consumer-level ops (Ack, Nak, Deliver, Term, Progress), unknown-entry handling. +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests for the entry application pipeline in JetStreamMetaGroup and StreamReplicaGroup. +/// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries / processConsumerEntries. +/// +public class EntryApplicationTests +{ + // --------------------------------------------------------------- + // ApplyEntry — StreamCreate (existing behaviour verification) + // Go reference: jetstream_cluster.go processStreamAssignment apply + // --------------------------------------------------------------- + + [Fact] + public void ApplyEntry_StreamCreate_creates_stream() + { + // Go ref: jetstream_cluster.go:4541 processStreamAssignment — apply creates stream. + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "orders-group", Peers = ["p1", "p2", "p3"] }; + + meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS", group: group); + + meta.StreamCount.ShouldBe(1); + meta.GetStreamAssignment("ORDERS").ShouldNotBeNull(); + } + + [Fact] + public void ApplyEntry_StreamDelete_removes_stream() + { + // Go ref: jetstream_cluster.go processStreamRemoval apply. + var meta = new JetStreamMetaGroup(3); + meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS"); + + meta.ApplyEntry(MetaEntryType.StreamDelete, "ORDERS"); + + meta.StreamCount.ShouldBe(0); + meta.GetStreamAssignment("ORDERS").ShouldBeNull(); + } + + [Fact] + public void ApplyEntry_ConsumerCreate_creates_consumer_on_stream() + { + // Go ref: jetstream_cluster.go:5300 processConsumerAssignment apply. + var meta = new JetStreamMetaGroup(3); + meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS"); + + meta.ApplyEntry(MetaEntryType.ConsumerCreate, "push-consumer", streamName: "ORDERS"); + + meta.ConsumerCount.ShouldBe(1); + meta.GetConsumerAssignment("ORDERS", "push-consumer").ShouldNotBeNull(); + } + + [Fact] + public void ApplyEntry_ConsumerDelete_removes_consumer() + { + // Go ref: jetstream_cluster.go processConsumerRemoval apply. + var meta = new JetStreamMetaGroup(3); + meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS"); + meta.ApplyEntry(MetaEntryType.ConsumerCreate, "push-consumer", streamName: "ORDERS"); + + meta.ApplyEntry(MetaEntryType.ConsumerDelete, "push-consumer", streamName: "ORDERS"); + + meta.ConsumerCount.ShouldBe(0); + } + + [Fact] + public void ApplyEntry_ConsumerCreate_without_streamName_throws() + { + var meta = new JetStreamMetaGroup(3); + + Should.Throw(() => + meta.ApplyEntry(MetaEntryType.ConsumerCreate, "consumer")); + } + + // --------------------------------------------------------------- + // ApplyEntry — PeerAdd (new entry type dispatch) + // Go reference: jetstream_cluster.go:2290 processAddPeer + // --------------------------------------------------------------- + + [Fact] + public void ApplyEntry_PeerAdd_triggers_peer_processing() + { + // Go ref: jetstream_cluster.go:2290 processAddPeer — peer registered on apply. + var meta = new JetStreamMetaGroup(3); + + // Should not throw and should register the peer. + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-42"); + + meta.GetKnownPeers().ShouldContain("peer-42"); + } + + [Fact] + public void ApplyEntry_PeerAdd_registers_multiple_peers() + { + var meta = new JetStreamMetaGroup(3); + + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-A"); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-B"); + + meta.GetKnownPeers().ShouldContain("peer-A"); + meta.GetKnownPeers().ShouldContain("peer-B"); + } + + [Fact] + public void ApplyEntry_PeerAdd_is_idempotent_for_same_peer() + { + var meta = new JetStreamMetaGroup(3); + + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-X"); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-X"); + + // HashSet deduplicates — exactly one entry. + meta.GetKnownPeers().Count(p => p == "peer-X").ShouldBe(1); + } + + // --------------------------------------------------------------- + // ApplyEntry — PeerRemove (new entry type dispatch) + // Go reference: jetstream_cluster.go:2342 processRemovePeer + // --------------------------------------------------------------- + + [Fact] + public void ApplyEntry_PeerRemove_triggers_peer_processing() + { + // Go ref: jetstream_cluster.go:2342 processRemovePeer — peer removed on apply. + var meta = new JetStreamMetaGroup(3); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-42"); + + meta.ApplyEntry(MetaEntryType.PeerRemove, "peer-42"); + + meta.GetKnownPeers().ShouldNotContain("peer-42"); + } + + [Fact] + public void ApplyEntry_PeerRemove_triggers_stream_reassignment() + { + // Go ref: jetstream_cluster.go:2342 processRemovePeer — affected streams identified. + var meta = new JetStreamMetaGroup(3); + var group = new RaftGroup { Name = "stream-group", Peers = ["peer-1", "peer-2", "peer-3"] }; + meta.ApplyEntry(MetaEntryType.StreamCreate, "EVENTS", group: group); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-1"); + meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-replacement"); + + // Removing peer-1: the stream that had peer-1 should be reassigned. + meta.ApplyEntry(MetaEntryType.PeerRemove, "peer-1"); + + // peer-1 should no longer be in the known peers set. + meta.GetKnownPeers().ShouldNotContain("peer-1"); + } + + // --------------------------------------------------------------- + // ApplyStreamMsgOp — Store + // Go reference: jetstream_cluster.go processStreamMsg store + // --------------------------------------------------------------- + + [Fact] + public void ApplyStreamMsgOp_Store_increments_message_count() + { + // Go ref: jetstream_cluster.go:2474 processStreamEntries — store op increments Msgs. + var srg = new StreamReplicaGroup("ORDERS", 1); + var before = srg.MessageCount; + + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); + + srg.MessageCount.ShouldBe(before + 1); + } + + [Fact] + public void ApplyStreamMsgOp_Store_advances_last_sequence() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 42); + + srg.LastSequence.ShouldBe(42L); + } + + [Fact] + public void ApplyStreamMsgOp_Store_multiple_times_accumulates_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 3); + + srg.MessageCount.ShouldBe(3L); + srg.LastSequence.ShouldBe(3L); + } + + // --------------------------------------------------------------- + // ApplyStreamMsgOp — Remove + // Go reference: jetstream_cluster.go processStreamMsg remove + // --------------------------------------------------------------- + + [Fact] + public void ApplyStreamMsgOp_Remove_decrements_message_count() + { + // Go ref: jetstream_cluster.go:3100 processStreamEntries — remove op decrements Msgs. + var srg = new StreamReplicaGroup("ORDERS", 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2); + + srg.ApplyStreamMsgOp(StreamMsgOp.Remove); + + srg.MessageCount.ShouldBe(1L); + } + + [Fact] + public void ApplyStreamMsgOp_Remove_does_not_go_below_zero() + { + // Go ref: jetstream_cluster.go — safe guard on remove when already empty. + var srg = new StreamReplicaGroup("ORDERS", 1); + + // Remove from empty — should not underflow. + srg.ApplyStreamMsgOp(StreamMsgOp.Remove); + + srg.MessageCount.ShouldBe(0L); + } + + // --------------------------------------------------------------- + // ApplyStreamMsgOp — Purge + // Go reference: jetstream_cluster.go processStreamMsg purge + // --------------------------------------------------------------- + + [Fact] + public void ApplyStreamMsgOp_Purge_clears_messages() + { + // Go ref: jetstream_cluster.go:3200 processStreamEntries — purge resets state. + var srg = new StreamReplicaGroup("ORDERS", 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 3); + + srg.ApplyStreamMsgOp(StreamMsgOp.Purge); + + srg.MessageCount.ShouldBe(0L); + srg.LastSequence.ShouldBe(0L); + } + + [Fact] + public void ApplyStreamMsgOp_Purge_then_Store_increments_from_zero() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 5); + srg.ApplyStreamMsgOp(StreamMsgOp.Purge); + + srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 6); + + srg.MessageCount.ShouldBe(1L); + } + + // --------------------------------------------------------------- + // ApplyConsumerEntries — Ack + // Go reference: jetstream_cluster.go processConsumerEntries ack + // --------------------------------------------------------------- + + [Fact] + public void ApplyConsumerEntries_Ack_processes_acknowledgment() + { + // Go ref: jetstream_cluster.go:3500 processConsumerEntries — ack increments ack floor. + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Ack); + + srg.AckCount.ShouldBe(1L); + } + + [Fact] + public void ApplyConsumerEntries_Ack_accumulates_across_multiple_calls() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Ack); + srg.ApplyConsumerEntry(ConsumerOp.Ack); + srg.ApplyConsumerEntry(ConsumerOp.Ack); + + srg.AckCount.ShouldBe(3L); + } + + // --------------------------------------------------------------- + // ApplyConsumerEntries — Nak + // Go reference: jetstream_cluster.go processConsumerEntries nak + // --------------------------------------------------------------- + + [Fact] + public void ApplyConsumerEntries_Nak_processes_negative_acknowledgment() + { + // Go ref: jetstream_cluster.go:3520 processConsumerEntries — nak schedules redelivery. + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Nak); + + srg.NakCount.ShouldBe(1L); + } + + // --------------------------------------------------------------- + // ApplyConsumerEntries — Deliver + // Go reference: jetstream_cluster.go processConsumerEntries deliver + // --------------------------------------------------------------- + + [Fact] + public void ApplyConsumerEntries_Deliver_processes_delivery() + { + // Go ref: jetstream_cluster.go:3540 processConsumerEntries — deliver advances dseq. + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Deliver); + + srg.DeliverCount.ShouldBe(1L); + } + + [Fact] + public void ApplyConsumerEntries_Term_does_not_throw() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + // Term is valid but has no counter in this model. + srg.ApplyConsumerEntry(ConsumerOp.Term); + } + + [Fact] + public void ApplyConsumerEntries_Progress_does_not_throw() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + + srg.ApplyConsumerEntry(ConsumerOp.Progress); + } + + // --------------------------------------------------------------- + // ApplyCommittedEntriesAsync — smsg: dispatch + // Go reference: jetstream_cluster.go processStreamEntries command routing + // --------------------------------------------------------------- + + [Fact] + public async Task ApplyCommittedEntriesAsync_smsg_store_increments_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("smsg:store", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.MessageCount.ShouldBe(1L); + } + + [Fact] + public async Task ApplyCommittedEntriesAsync_smsg_purge_clears_messages() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("smsg:store", default); + await srg.Leader.ProposeAsync("smsg:store", default); + await srg.ApplyCommittedEntriesAsync(default); + + await srg.Leader.ProposeAsync("smsg:purge", default); + await srg.ApplyCommittedEntriesAsync(default); + + srg.MessageCount.ShouldBe(0L); + } + + [Fact] + public async Task ApplyCommittedEntriesAsync_smsg_remove_decrements_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("smsg:store", default); + await srg.Leader.ProposeAsync("smsg:store", default); + await srg.ApplyCommittedEntriesAsync(default); + + await srg.Leader.ProposeAsync("smsg:remove", default); + await srg.ApplyCommittedEntriesAsync(default); + + srg.MessageCount.ShouldBe(1L); + } + + // --------------------------------------------------------------- + // ApplyCommittedEntriesAsync — centry: dispatch + // Go reference: jetstream_cluster.go processConsumerEntries command routing + // --------------------------------------------------------------- + + [Fact] + public async Task ApplyCommittedEntriesAsync_centry_ack_increments_ack_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("centry:ack", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.AckCount.ShouldBe(1L); + } + + [Fact] + public async Task ApplyCommittedEntriesAsync_centry_nak_increments_nak_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("centry:nak", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.NakCount.ShouldBe(1L); + } + + [Fact] + public async Task ApplyCommittedEntriesAsync_centry_deliver_increments_deliver_count() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("centry:deliver", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.DeliverCount.ShouldBe(1L); + } + + // --------------------------------------------------------------- + // Unknown entry type — logged and skipped + // Go reference: jetstream_cluster.go default case in apply loop + // --------------------------------------------------------------- + + [Fact] + public async Task Unknown_entry_type_logged_and_skipped() + { + // Go ref: jetstream_cluster.go processStreamEntries — unknown ops are skipped. + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("smsg:unknown-op", default); + + // Should not throw. + await srg.ApplyCommittedEntriesAsync(default); + + // Message count unchanged; unknown command is recorded. + srg.MessageCount.ShouldBe(0L); + srg.LastUnknownCommand.ShouldBe("smsg:unknown-op"); + } + + [Fact] + public async Task Unknown_centry_op_logged_and_skipped() + { + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("centry:bogus", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.AckCount.ShouldBe(0L); + srg.LastUnknownCommand.ShouldBe("centry:bogus"); + } + + [Fact] + public async Task Completely_unknown_prefix_is_logged_and_skipped() + { + // A command with an entirely unrecognised prefix is recorded and skipped. + var srg = new StreamReplicaGroup("ORDERS", 1); + await srg.Leader.ProposeAsync("xyzzy:something", default); + + await srg.ApplyCommittedEntriesAsync(default); + + srg.LastUnknownCommand.ShouldBe("xyzzy:something"); + } + + // --------------------------------------------------------------- + // MetaEntryType enum values exist + // --------------------------------------------------------------- + + [Fact] + public void MetaEntryType_enum_includes_PeerAdd_and_PeerRemove() + { + // Compile-time check: ensures the enum values exist. + _ = MetaEntryType.PeerAdd; + _ = MetaEntryType.PeerRemove; + } + + // --------------------------------------------------------------- + // StreamMsgOp and ConsumerOp enum values exist + // --------------------------------------------------------------- + + [Fact] + public void StreamMsgOp_enum_has_expected_values() + { + _ = StreamMsgOp.Store; + _ = StreamMsgOp.Remove; + _ = StreamMsgOp.Purge; + } + + [Fact] + public void ConsumerOp_enum_has_expected_values() + { + _ = ConsumerOp.Ack; + _ = ConsumerOp.Nak; + _ = ConsumerOp.Deliver; + _ = ConsumerOp.Term; + _ = ConsumerOp.Progress; + } +}