Files
natsdotnet/tests/NATS.Server.Tests/JetStream/Cluster/EntryApplicationTests.cs
Joseph Doherty 0f8f34afaa feat: add entry application pipeline for meta and stream RAFT groups (Gap 2.7)
Extend ApplyEntry in JetStreamMetaGroup with PeerAdd/PeerRemove dispatch
to the existing Task 12 peer management methods. Add StreamMsgOp (Store,
Remove, Purge) and ConsumerOp (Ack, Nak, Deliver, Term, Progress) enums
plus ApplyStreamMsgOp and ApplyConsumerEntry methods to StreamReplicaGroup.
Extend ApplyCommittedEntriesAsync to parse smsg:/centry: command prefixes
and route to the new apply methods. Add MetaEntryType.PeerAdd/PeerRemove
enum values. 35 new tests in EntryApplicationTests.cs, all passing.
2026-02-25 08:38:21 -05:00

497 lines
17 KiB
C#

// Go parity: golang/nats-server/server/jetstream_cluster.go:2474-4261
// Covers: entry application pipeline for JetStreamMetaGroup and StreamReplicaGroup —
// meta entry dispatch (StreamCreate, StreamDelete, ConsumerCreate, ConsumerDelete,
// PeerAdd, PeerRemove), stream-level message ops (Store, Remove, Purge),
// consumer-level ops (Ack, Nak, Deliver, Term, Progress), unknown-entry handling.
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests.JetStream.Cluster;
/// <summary>
/// Tests for the entry application pipeline in JetStreamMetaGroup and StreamReplicaGroup.
/// Go reference: jetstream_cluster.go:2474-4261 processStreamEntries / processConsumerEntries.
/// </summary>
public class EntryApplicationTests
{
// ---------------------------------------------------------------
// ApplyEntry — StreamCreate (existing behaviour verification)
// Go reference: jetstream_cluster.go processStreamAssignment apply
// ---------------------------------------------------------------
[Fact]
public void ApplyEntry_StreamCreate_creates_stream()
{
// Go ref: jetstream_cluster.go:4541 processStreamAssignment — apply creates stream.
var meta = new JetStreamMetaGroup(3);
var group = new RaftGroup { Name = "orders-group", Peers = ["p1", "p2", "p3"] };
meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS", group: group);
meta.StreamCount.ShouldBe(1);
meta.GetStreamAssignment("ORDERS").ShouldNotBeNull();
}
[Fact]
public void ApplyEntry_StreamDelete_removes_stream()
{
// Go ref: jetstream_cluster.go processStreamRemoval apply.
var meta = new JetStreamMetaGroup(3);
meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS");
meta.ApplyEntry(MetaEntryType.StreamDelete, "ORDERS");
meta.StreamCount.ShouldBe(0);
meta.GetStreamAssignment("ORDERS").ShouldBeNull();
}
[Fact]
public void ApplyEntry_ConsumerCreate_creates_consumer_on_stream()
{
// Go ref: jetstream_cluster.go:5300 processConsumerAssignment apply.
var meta = new JetStreamMetaGroup(3);
meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS");
meta.ApplyEntry(MetaEntryType.ConsumerCreate, "push-consumer", streamName: "ORDERS");
meta.ConsumerCount.ShouldBe(1);
meta.GetConsumerAssignment("ORDERS", "push-consumer").ShouldNotBeNull();
}
[Fact]
public void ApplyEntry_ConsumerDelete_removes_consumer()
{
// Go ref: jetstream_cluster.go processConsumerRemoval apply.
var meta = new JetStreamMetaGroup(3);
meta.ApplyEntry(MetaEntryType.StreamCreate, "ORDERS");
meta.ApplyEntry(MetaEntryType.ConsumerCreate, "push-consumer", streamName: "ORDERS");
meta.ApplyEntry(MetaEntryType.ConsumerDelete, "push-consumer", streamName: "ORDERS");
meta.ConsumerCount.ShouldBe(0);
}
[Fact]
public void ApplyEntry_ConsumerCreate_without_streamName_throws()
{
var meta = new JetStreamMetaGroup(3);
Should.Throw<ArgumentNullException>(() =>
meta.ApplyEntry(MetaEntryType.ConsumerCreate, "consumer"));
}
// ---------------------------------------------------------------
// ApplyEntry — PeerAdd (new entry type dispatch)
// Go reference: jetstream_cluster.go:2290 processAddPeer
// ---------------------------------------------------------------
[Fact]
public void ApplyEntry_PeerAdd_triggers_peer_processing()
{
// Go ref: jetstream_cluster.go:2290 processAddPeer — peer registered on apply.
var meta = new JetStreamMetaGroup(3);
// Should not throw and should register the peer.
meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-42");
meta.GetKnownPeers().ShouldContain("peer-42");
}
[Fact]
public void ApplyEntry_PeerAdd_registers_multiple_peers()
{
var meta = new JetStreamMetaGroup(3);
meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-A");
meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-B");
meta.GetKnownPeers().ShouldContain("peer-A");
meta.GetKnownPeers().ShouldContain("peer-B");
}
[Fact]
public void ApplyEntry_PeerAdd_is_idempotent_for_same_peer()
{
var meta = new JetStreamMetaGroup(3);
meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-X");
meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-X");
// HashSet deduplicates — exactly one entry.
meta.GetKnownPeers().Count(p => p == "peer-X").ShouldBe(1);
}
// ---------------------------------------------------------------
// ApplyEntry — PeerRemove (new entry type dispatch)
// Go reference: jetstream_cluster.go:2342 processRemovePeer
// ---------------------------------------------------------------
[Fact]
public void ApplyEntry_PeerRemove_triggers_peer_processing()
{
// Go ref: jetstream_cluster.go:2342 processRemovePeer — peer removed on apply.
var meta = new JetStreamMetaGroup(3);
meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-42");
meta.ApplyEntry(MetaEntryType.PeerRemove, "peer-42");
meta.GetKnownPeers().ShouldNotContain("peer-42");
}
[Fact]
public void ApplyEntry_PeerRemove_triggers_stream_reassignment()
{
// Go ref: jetstream_cluster.go:2342 processRemovePeer — affected streams identified.
var meta = new JetStreamMetaGroup(3);
var group = new RaftGroup { Name = "stream-group", Peers = ["peer-1", "peer-2", "peer-3"] };
meta.ApplyEntry(MetaEntryType.StreamCreate, "EVENTS", group: group);
meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-1");
meta.ApplyEntry(MetaEntryType.PeerAdd, "peer-replacement");
// Removing peer-1: the stream that had peer-1 should be reassigned.
meta.ApplyEntry(MetaEntryType.PeerRemove, "peer-1");
// peer-1 should no longer be in the known peers set.
meta.GetKnownPeers().ShouldNotContain("peer-1");
}
// ---------------------------------------------------------------
// ApplyStreamMsgOp — Store
// Go reference: jetstream_cluster.go processStreamMsg store
// ---------------------------------------------------------------
[Fact]
public void ApplyStreamMsgOp_Store_increments_message_count()
{
// Go ref: jetstream_cluster.go:2474 processStreamEntries — store op increments Msgs.
var srg = new StreamReplicaGroup("ORDERS", 1);
var before = srg.MessageCount;
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1);
srg.MessageCount.ShouldBe(before + 1);
}
[Fact]
public void ApplyStreamMsgOp_Store_advances_last_sequence()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 42);
srg.LastSequence.ShouldBe(42L);
}
[Fact]
public void ApplyStreamMsgOp_Store_multiple_times_accumulates_count()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 3);
srg.MessageCount.ShouldBe(3L);
srg.LastSequence.ShouldBe(3L);
}
// ---------------------------------------------------------------
// ApplyStreamMsgOp — Remove
// Go reference: jetstream_cluster.go processStreamMsg remove
// ---------------------------------------------------------------
[Fact]
public void ApplyStreamMsgOp_Remove_decrements_message_count()
{
// Go ref: jetstream_cluster.go:3100 processStreamEntries — remove op decrements Msgs.
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2);
srg.ApplyStreamMsgOp(StreamMsgOp.Remove);
srg.MessageCount.ShouldBe(1L);
}
[Fact]
public void ApplyStreamMsgOp_Remove_does_not_go_below_zero()
{
// Go ref: jetstream_cluster.go — safe guard on remove when already empty.
var srg = new StreamReplicaGroup("ORDERS", 1);
// Remove from empty — should not underflow.
srg.ApplyStreamMsgOp(StreamMsgOp.Remove);
srg.MessageCount.ShouldBe(0L);
}
// ---------------------------------------------------------------
// ApplyStreamMsgOp — Purge
// Go reference: jetstream_cluster.go processStreamMsg purge
// ---------------------------------------------------------------
[Fact]
public void ApplyStreamMsgOp_Purge_clears_messages()
{
// Go ref: jetstream_cluster.go:3200 processStreamEntries — purge resets state.
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 1);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 2);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 3);
srg.ApplyStreamMsgOp(StreamMsgOp.Purge);
srg.MessageCount.ShouldBe(0L);
srg.LastSequence.ShouldBe(0L);
}
[Fact]
public void ApplyStreamMsgOp_Purge_then_Store_increments_from_zero()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 5);
srg.ApplyStreamMsgOp(StreamMsgOp.Purge);
srg.ApplyStreamMsgOp(StreamMsgOp.Store, index: 6);
srg.MessageCount.ShouldBe(1L);
}
// ---------------------------------------------------------------
// ApplyConsumerEntries — Ack
// Go reference: jetstream_cluster.go processConsumerEntries ack
// ---------------------------------------------------------------
[Fact]
public void ApplyConsumerEntries_Ack_processes_acknowledgment()
{
// Go ref: jetstream_cluster.go:3500 processConsumerEntries — ack increments ack floor.
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyConsumerEntry(ConsumerOp.Ack);
srg.AckCount.ShouldBe(1L);
}
[Fact]
public void ApplyConsumerEntries_Ack_accumulates_across_multiple_calls()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyConsumerEntry(ConsumerOp.Ack);
srg.ApplyConsumerEntry(ConsumerOp.Ack);
srg.ApplyConsumerEntry(ConsumerOp.Ack);
srg.AckCount.ShouldBe(3L);
}
// ---------------------------------------------------------------
// ApplyConsumerEntries — Nak
// Go reference: jetstream_cluster.go processConsumerEntries nak
// ---------------------------------------------------------------
[Fact]
public void ApplyConsumerEntries_Nak_processes_negative_acknowledgment()
{
// Go ref: jetstream_cluster.go:3520 processConsumerEntries — nak schedules redelivery.
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyConsumerEntry(ConsumerOp.Nak);
srg.NakCount.ShouldBe(1L);
}
// ---------------------------------------------------------------
// ApplyConsumerEntries — Deliver
// Go reference: jetstream_cluster.go processConsumerEntries deliver
// ---------------------------------------------------------------
[Fact]
public void ApplyConsumerEntries_Deliver_processes_delivery()
{
// Go ref: jetstream_cluster.go:3540 processConsumerEntries — deliver advances dseq.
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyConsumerEntry(ConsumerOp.Deliver);
srg.DeliverCount.ShouldBe(1L);
}
[Fact]
public void ApplyConsumerEntries_Term_does_not_throw()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
// Term is valid but has no counter in this model.
srg.ApplyConsumerEntry(ConsumerOp.Term);
}
[Fact]
public void ApplyConsumerEntries_Progress_does_not_throw()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
srg.ApplyConsumerEntry(ConsumerOp.Progress);
}
// ---------------------------------------------------------------
// ApplyCommittedEntriesAsync — smsg: dispatch
// Go reference: jetstream_cluster.go processStreamEntries command routing
// ---------------------------------------------------------------
[Fact]
public async Task ApplyCommittedEntriesAsync_smsg_store_increments_count()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
await srg.Leader.ProposeAsync("smsg:store", default);
await srg.ApplyCommittedEntriesAsync(default);
srg.MessageCount.ShouldBe(1L);
}
[Fact]
public async Task ApplyCommittedEntriesAsync_smsg_purge_clears_messages()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
await srg.Leader.ProposeAsync("smsg:store", default);
await srg.Leader.ProposeAsync("smsg:store", default);
await srg.ApplyCommittedEntriesAsync(default);
await srg.Leader.ProposeAsync("smsg:purge", default);
await srg.ApplyCommittedEntriesAsync(default);
srg.MessageCount.ShouldBe(0L);
}
[Fact]
public async Task ApplyCommittedEntriesAsync_smsg_remove_decrements_count()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
await srg.Leader.ProposeAsync("smsg:store", default);
await srg.Leader.ProposeAsync("smsg:store", default);
await srg.ApplyCommittedEntriesAsync(default);
await srg.Leader.ProposeAsync("smsg:remove", default);
await srg.ApplyCommittedEntriesAsync(default);
srg.MessageCount.ShouldBe(1L);
}
// ---------------------------------------------------------------
// ApplyCommittedEntriesAsync — centry: dispatch
// Go reference: jetstream_cluster.go processConsumerEntries command routing
// ---------------------------------------------------------------
[Fact]
public async Task ApplyCommittedEntriesAsync_centry_ack_increments_ack_count()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
await srg.Leader.ProposeAsync("centry:ack", default);
await srg.ApplyCommittedEntriesAsync(default);
srg.AckCount.ShouldBe(1L);
}
[Fact]
public async Task ApplyCommittedEntriesAsync_centry_nak_increments_nak_count()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
await srg.Leader.ProposeAsync("centry:nak", default);
await srg.ApplyCommittedEntriesAsync(default);
srg.NakCount.ShouldBe(1L);
}
[Fact]
public async Task ApplyCommittedEntriesAsync_centry_deliver_increments_deliver_count()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
await srg.Leader.ProposeAsync("centry:deliver", default);
await srg.ApplyCommittedEntriesAsync(default);
srg.DeliverCount.ShouldBe(1L);
}
// ---------------------------------------------------------------
// Unknown entry type — logged and skipped
// Go reference: jetstream_cluster.go default case in apply loop
// ---------------------------------------------------------------
[Fact]
public async Task Unknown_entry_type_logged_and_skipped()
{
// Go ref: jetstream_cluster.go processStreamEntries — unknown ops are skipped.
var srg = new StreamReplicaGroup("ORDERS", 1);
await srg.Leader.ProposeAsync("smsg:unknown-op", default);
// Should not throw.
await srg.ApplyCommittedEntriesAsync(default);
// Message count unchanged; unknown command is recorded.
srg.MessageCount.ShouldBe(0L);
srg.LastUnknownCommand.ShouldBe("smsg:unknown-op");
}
[Fact]
public async Task Unknown_centry_op_logged_and_skipped()
{
var srg = new StreamReplicaGroup("ORDERS", 1);
await srg.Leader.ProposeAsync("centry:bogus", default);
await srg.ApplyCommittedEntriesAsync(default);
srg.AckCount.ShouldBe(0L);
srg.LastUnknownCommand.ShouldBe("centry:bogus");
}
[Fact]
public async Task Completely_unknown_prefix_is_logged_and_skipped()
{
// A command with an entirely unrecognised prefix is recorded and skipped.
var srg = new StreamReplicaGroup("ORDERS", 1);
await srg.Leader.ProposeAsync("xyzzy:something", default);
await srg.ApplyCommittedEntriesAsync(default);
srg.LastUnknownCommand.ShouldBe("xyzzy:something");
}
// ---------------------------------------------------------------
// MetaEntryType enum values exist
// ---------------------------------------------------------------
[Fact]
public void MetaEntryType_enum_includes_PeerAdd_and_PeerRemove()
{
// Compile-time check: ensures the enum values exist.
_ = MetaEntryType.PeerAdd;
_ = MetaEntryType.PeerRemove;
}
// ---------------------------------------------------------------
// StreamMsgOp and ConsumerOp enum values exist
// ---------------------------------------------------------------
[Fact]
public void StreamMsgOp_enum_has_expected_values()
{
_ = StreamMsgOp.Store;
_ = StreamMsgOp.Remove;
_ = StreamMsgOp.Purge;
}
[Fact]
public void ConsumerOp_enum_has_expected_values()
{
_ = ConsumerOp.Ack;
_ = ConsumerOp.Nak;
_ = ConsumerOp.Deliver;
_ = ConsumerOp.Term;
_ = ConsumerOp.Progress;
}
}