// Go parity: golang/nats-server/server/jetstream_cluster.go:2474-4261 // Covers: entry application pipeline for JetStreamMetaGroup and StreamReplicaGroup — // meta entry dispatch (StreamCreate, StreamDelete, ConsumerCreate, ConsumerDelete, // PeerAdd, PeerRemove), stream-level message ops (Store, Remove, Purge), // consumer-level ops (Ack, Nak, Deliver, Term, Progress), unknown-entry handling. using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.Models; namespace NATS.Server.JetStream.Tests.JetStream.Cluster; /// /// Tests for the entry application pipeline in JetStreamMetaGroup and StreamReplicaGroup. /// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries / processConsumerEntries. /// public class EntryApplicationTests { // --------------------------------------------------------------- // ApplyEntry — StreamCreate (existing behaviour verification) // Go reference: jetstream_cluster.go processStreamAssignment apply // --------------------------------------------------------------- [Fact] public void ApplyEntry_StreamCreate_creates_stream() { // Go ref: jetstream_cluster.go:4541 processStreamAssignment — apply creates stream. var meta = new JetStreamMetaGroup(3); var group = new RaftGroup { Name = "orders-group", Peers = ["p1", "p2", "p3"] }; meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS", group: group); meta.StreamCount.ShouldBe(1); meta.GetStreamAssignment("ORDERS").ShouldNotBeNull(); } [Fact] public void ApplyEntry_StreamDelete_removes_stream() { // Go ref: jetstream_cluster.go processStreamRemoval apply. var meta = new JetStreamMetaGroup(3); meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS"); meta.ApplyEntry(MetaEntryType.StreamDelete, "ORDERS"); meta.StreamCount.ShouldBe(0); meta.GetStreamAssignment("ORDERS").ShouldBeNull(); } [Fact] public void ApplyEntry_ConsumerCreate_creates_consumer_on_stream() { // Go ref: jetstream_cluster.go:5300 processConsumerAssignment apply. var meta = new JetStreamMetaGroup(3); meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS"); meta.ApplyEntry(MetaEntryType.ConsumerCreate, "push-consumer", streamName: "ORDERS"); meta.ConsumerCount.ShouldBe(1); meta.GetConsumerAssignment("ORDERS", "push-consumer").ShouldNotBeNull(); } [Fact] public void ApplyEntry_ConsumerDelete_removes_consumer() { // Go ref: jetstream_cluster.go processConsumerRemoval apply. var meta = new JetStreamMetaGroup(3); meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS"); meta.ApplyEntry(MetaEntryType.ConsumerCreate, "push-consumer", streamName: "ORDERS"); meta.ApplyEntry(MetaEntryType.ConsumerDelete, "push-consumer", streamName: "ORDERS"); meta.ConsumerCount.ShouldBe(0); } [Fact] public void ApplyEntry_ConsumerCreate_without_streamName_throws() { var meta = new JetStreamMetaGroup(3); Should.Throw(() => meta.ApplyEntry(MetaEntryType.ConsumerCreate, "consumer")); } // --------------------------------------------------------------- // ApplyEntry — PeerAdd (new entry type dispatch) // Go reference: jetstream_cluster.go:2290 processAddPeer // --------------------------------------------------------------- [Fact] public void ApplyEntry_PeerAdd_triggers_peer_processing() { // Go ref: jetstream_cluster.go:2290 processAddPeer — peer registered on apply. var meta = new JetStreamMetaGroup(3); // Should not throw and should register the peer. meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-42"); meta.GetKnownPeers().ShouldContain("peer-42"); } [Fact] public void ApplyEntry_PeerAdd_registers_multiple_peers() { var meta = new JetStreamMetaGroup(3); meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-A"); meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-B"); meta.GetKnownPeers().ShouldContain("peer-A"); meta.GetKnownPeers().ShouldContain("peer-B"); } [Fact] public void ApplyEntry_PeerAdd_is_idempotent_for_same_peer() { var meta = new JetStreamMetaGroup(3); meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-X"); meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-X"); // HashSet deduplicates — exactly one entry. meta.GetKnownPeers().Count(p => p == "peer-X").ShouldBe(1); } // --------------------------------------------------------------- // ApplyEntry — PeerRemove (new entry type dispatch) // Go reference: jetstream_cluster.go:2342 processRemovePeer // --------------------------------------------------------------- [Fact] public void ApplyEntry_PeerRemove_triggers_peer_processing() { // Go ref: jetstream_cluster.go:2342 processRemovePeer — peer removed on apply. var meta = new JetStreamMetaGroup(3); meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-42"); meta.ApplyEntry(MetaEntryType.PeerRemove, "peer-42"); meta.GetKnownPeers().ShouldNotContain("peer-42"); } [Fact] public void ApplyEntry_PeerRemove_triggers_stream_reassignment() { // Go ref: jetstream_cluster.go:2342 processRemovePeer — affected streams identified. var meta = new JetStreamMetaGroup(3); var group = new RaftGroup { Name = "stream-group", Peers = ["peer-1", "peer-2", "peer-3"] }; meta.ApplyEntry(MetaEntryType.StreamCreate, "EVENTS", group: group); meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-1"); meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-replacement"); // Removing peer-1: the stream that had peer-1 should be reassigned. meta.ApplyEntry(MetaEntryType.PeerRemove, "peer-1"); // peer-1 should no longer be in the known peers set. meta.GetKnownPeers().ShouldNotContain("peer-1"); } // --------------------------------------------------------------- // ApplyStreamMsgOp — Store // Go reference: jetstream_cluster.go processStreamMsg store // --------------------------------------------------------------- [Fact] public void ApplyStreamMsgOp_Store_increments_message_count() { // Go ref: jetstream_cluster.go:2474 processStreamEntries — store op increments Msgs. var srg = new StreamReplicaGroup("ORDERS", 1); var before = srg.MessageCount; srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); srg.MessageCount.ShouldBe(before + 1); } [Fact] public void ApplyStreamMsgOp_Store_advances_last_sequence() { var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 42); srg.LastSequence.ShouldBe(42L); } [Fact] public void ApplyStreamMsgOp_Store_multiple_times_accumulates_count() { var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 3); srg.MessageCount.ShouldBe(3L); srg.LastSequence.ShouldBe(3L); } // --------------------------------------------------------------- // ApplyStreamMsgOp — Remove // Go reference: jetstream_cluster.go processStreamMsg remove // --------------------------------------------------------------- [Fact] public void ApplyStreamMsgOp_Remove_decrements_message_count() { // Go ref: jetstream_cluster.go:3100 processStreamEntries — remove op decrements Msgs. var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2); srg.ApplyStreamMsgOp(StreamMsgOp.Remove); srg.MessageCount.ShouldBe(1L); } [Fact] public void ApplyStreamMsgOp_Remove_does_not_go_below_zero() { // Go ref: jetstream_cluster.go — safe guard on remove when already empty. var srg = new StreamReplicaGroup("ORDERS", 1); // Remove from empty — should not underflow. srg.ApplyStreamMsgOp(StreamMsgOp.Remove); srg.MessageCount.ShouldBe(0L); } // --------------------------------------------------------------- // ApplyStreamMsgOp — Purge // Go reference: jetstream_cluster.go processStreamMsg purge // --------------------------------------------------------------- [Fact] public void ApplyStreamMsgOp_Purge_clears_messages() { // Go ref: jetstream_cluster.go:3200 processStreamEntries — purge resets state. var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 3); srg.ApplyStreamMsgOp(StreamMsgOp.Purge); srg.MessageCount.ShouldBe(0L); srg.LastSequence.ShouldBe(0L); } [Fact] public void ApplyStreamMsgOp_Purge_then_Store_increments_from_zero() { var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 5); srg.ApplyStreamMsgOp(StreamMsgOp.Purge); srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 6); srg.MessageCount.ShouldBe(1L); } // --------------------------------------------------------------- // ApplyConsumerEntries — Ack // Go reference: jetstream_cluster.go processConsumerEntries ack // --------------------------------------------------------------- [Fact] public void ApplyConsumerEntries_Ack_processes_acknowledgment() { // Go ref: jetstream_cluster.go:3500 processConsumerEntries — ack increments ack floor. var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyConsumerEntry(ConsumerOp.Ack); srg.AckCount.ShouldBe(1L); } [Fact] public void ApplyConsumerEntries_Ack_accumulates_across_multiple_calls() { var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyConsumerEntry(ConsumerOp.Ack); srg.ApplyConsumerEntry(ConsumerOp.Ack); srg.ApplyConsumerEntry(ConsumerOp.Ack); srg.AckCount.ShouldBe(3L); } // --------------------------------------------------------------- // ApplyConsumerEntries — Nak // Go reference: jetstream_cluster.go processConsumerEntries nak // --------------------------------------------------------------- [Fact] public void ApplyConsumerEntries_Nak_processes_negative_acknowledgment() { // Go ref: jetstream_cluster.go:3520 processConsumerEntries — nak schedules redelivery. var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyConsumerEntry(ConsumerOp.Nak); srg.NakCount.ShouldBe(1L); } // --------------------------------------------------------------- // ApplyConsumerEntries — Deliver // Go reference: jetstream_cluster.go processConsumerEntries deliver // --------------------------------------------------------------- [Fact] public void ApplyConsumerEntries_Deliver_processes_delivery() { // Go ref: jetstream_cluster.go:3540 processConsumerEntries — deliver advances dseq. var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyConsumerEntry(ConsumerOp.Deliver); srg.DeliverCount.ShouldBe(1L); } [Fact] public void ApplyConsumerEntries_Term_does_not_throw() { var srg = new StreamReplicaGroup("ORDERS", 1); // Term is valid but has no counter in this model. srg.ApplyConsumerEntry(ConsumerOp.Term); } [Fact] public void ApplyConsumerEntries_Progress_does_not_throw() { var srg = new StreamReplicaGroup("ORDERS", 1); srg.ApplyConsumerEntry(ConsumerOp.Progress); } // --------------------------------------------------------------- // ApplyCommittedEntriesAsync — smsg: dispatch // Go reference: jetstream_cluster.go processStreamEntries command routing // --------------------------------------------------------------- [Fact] public async Task ApplyCommittedEntriesAsync_smsg_store_increments_count() { var srg = new StreamReplicaGroup("ORDERS", 1); await srg.Leader.ProposeAsync("smsg:store", default); await srg.ApplyCommittedEntriesAsync(default); srg.MessageCount.ShouldBe(1L); } [Fact] public async Task ApplyCommittedEntriesAsync_smsg_purge_clears_messages() { var srg = new StreamReplicaGroup("ORDERS", 1); await srg.Leader.ProposeAsync("smsg:store", default); await srg.Leader.ProposeAsync("smsg:store", default); await srg.ApplyCommittedEntriesAsync(default); await srg.Leader.ProposeAsync("smsg:purge", default); await srg.ApplyCommittedEntriesAsync(default); srg.MessageCount.ShouldBe(0L); } [Fact] public async Task ApplyCommittedEntriesAsync_smsg_remove_decrements_count() { var srg = new StreamReplicaGroup("ORDERS", 1); await srg.Leader.ProposeAsync("smsg:store", default); await srg.Leader.ProposeAsync("smsg:store", default); await srg.ApplyCommittedEntriesAsync(default); await srg.Leader.ProposeAsync("smsg:remove", default); await srg.ApplyCommittedEntriesAsync(default); srg.MessageCount.ShouldBe(1L); } // --------------------------------------------------------------- // ApplyCommittedEntriesAsync — centry: dispatch // Go reference: jetstream_cluster.go processConsumerEntries command routing // --------------------------------------------------------------- [Fact] public async Task ApplyCommittedEntriesAsync_centry_ack_increments_ack_count() { var srg = new StreamReplicaGroup("ORDERS", 1); await srg.Leader.ProposeAsync("centry:ack", default); await srg.ApplyCommittedEntriesAsync(default); srg.AckCount.ShouldBe(1L); } [Fact] public async Task ApplyCommittedEntriesAsync_centry_nak_increments_nak_count() { var srg = new StreamReplicaGroup("ORDERS", 1); await srg.Leader.ProposeAsync("centry:nak", default); await srg.ApplyCommittedEntriesAsync(default); srg.NakCount.ShouldBe(1L); } [Fact] public async Task ApplyCommittedEntriesAsync_centry_deliver_increments_deliver_count() { var srg = new StreamReplicaGroup("ORDERS", 1); await srg.Leader.ProposeAsync("centry:deliver", default); await srg.ApplyCommittedEntriesAsync(default); srg.DeliverCount.ShouldBe(1L); } // --------------------------------------------------------------- // Unknown entry type — logged and skipped // Go reference: jetstream_cluster.go default case in apply loop // --------------------------------------------------------------- [Fact] public async Task Unknown_entry_type_logged_and_skipped() { // Go ref: jetstream_cluster.go processStreamEntries — unknown ops are skipped. var srg = new StreamReplicaGroup("ORDERS", 1); await srg.Leader.ProposeAsync("smsg:unknown-op", default); // Should not throw. await srg.ApplyCommittedEntriesAsync(default); // Message count unchanged; unknown command is recorded. srg.MessageCount.ShouldBe(0L); srg.LastUnknownCommand.ShouldBe("smsg:unknown-op"); } [Fact] public async Task Unknown_centry_op_logged_and_skipped() { var srg = new StreamReplicaGroup("ORDERS", 1); await srg.Leader.ProposeAsync("centry:bogus", default); await srg.ApplyCommittedEntriesAsync(default); srg.AckCount.ShouldBe(0L); srg.LastUnknownCommand.ShouldBe("centry:bogus"); } [Fact] public async Task Completely_unknown_prefix_is_logged_and_skipped() { // A command with an entirely unrecognised prefix is recorded and skipped. var srg = new StreamReplicaGroup("ORDERS", 1); await srg.Leader.ProposeAsync("xyzzy:something", default); await srg.ApplyCommittedEntriesAsync(default); srg.LastUnknownCommand.ShouldBe("xyzzy:something"); } // --------------------------------------------------------------- // MetaEntryType enum values exist // --------------------------------------------------------------- [Fact] public void MetaEntryType_enum_includes_PeerAdd_and_PeerRemove() { // Compile-time check: ensures the enum values exist. _ = MetaEntryType.PeerAdd; _ = MetaEntryType.PeerRemove; } // --------------------------------------------------------------- // StreamMsgOp and ConsumerOp enum values exist // --------------------------------------------------------------- [Fact] public void StreamMsgOp_enum_has_expected_values() { _ = StreamMsgOp.Store; _ = StreamMsgOp.Remove; _ = StreamMsgOp.Purge; } [Fact] public void ConsumerOp_enum_has_expected_values() { _ = ConsumerOp.Ack; _ = ConsumerOp.Nak; _ = ConsumerOp.Deliver; _ = ConsumerOp.Term; _ = ConsumerOp.Progress; } }