diff --git a/src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs b/src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs index 021ec52..2318bbe 100644 --- a/src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs +++ b/src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs @@ -92,10 +92,61 @@ public sealed class PriorityGroupManager return active != null && string.Equals(active, consumerId, StringComparison.Ordinal); } + /// + /// Assign a new pin ID to the named group, replacing any existing pin. + /// Go reference: consumer.go (assignNewPinId). + /// + /// The newly generated 22-character pin ID. + public string AssignPinId(string groupName, string consumerId) + { + if (!_groups.TryGetValue(groupName, out var group)) + return string.Empty; + + var pinId = Guid.NewGuid().ToString("N")[..22]; + lock (group.Lock) + { + group.CurrentPinId = pinId; + } + + return pinId; + } + + /// + /// Returns true if the group exists and its current pin ID equals . + /// Go reference: consumer.go (setPinnedTimer). + /// + public bool ValidatePinId(string groupName, string pinId) + { + if (!_groups.TryGetValue(groupName, out var group)) + return false; + + lock (group.Lock) + { + return group.CurrentPinId != null && + string.Equals(group.CurrentPinId, pinId, StringComparison.Ordinal); + } + } + + /// + /// Clear the current pin ID for the named group. No-op if the group does not exist. + /// Go reference: consumer.go (setPinnedTimer). + /// + public void UnassignPinId(string groupName) + { + if (!_groups.TryGetValue(groupName, out var group)) + return; + + lock (group.Lock) + { + group.CurrentPinId = null; + } + } + private sealed class PriorityGroup { public object Lock { get; } = new(); public List Members { get; } = []; + public string? CurrentPinId { get; set; } } private record struct PriorityMember(string ConsumerId, int Priority); diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/PriorityGroupPinningTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/PriorityGroupPinningTests.cs new file mode 100644 index 0000000..b82a942 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/PriorityGroupPinningTests.cs @@ -0,0 +1,84 @@ +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Tests for priority group pin ID management. +/// Go reference: consumer.go (setPinnedTimer, assignNewPinId). +/// +public class PriorityGroupPinningTests +{ + [Fact] + public void AssignPinId_generates_unique_ids() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group-1", "consumer-a", priority: 0); + + var pin1 = mgr.AssignPinId("group-1", "consumer-a"); + var pin2 = mgr.AssignPinId("group-1", "consumer-a"); + + pin1.ShouldNotBeNullOrEmpty(); + pin2.ShouldNotBeNullOrEmpty(); + pin1.ShouldNotBe(pin2); // each assignment is unique + } + + [Fact] + public void ValidatePinId_accepts_current() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group-1", "consumer-a", priority: 0); + + var pin = mgr.AssignPinId("group-1", "consumer-a"); + mgr.ValidatePinId("group-1", pin).ShouldBeTrue(); + } + + [Fact] + public void ValidatePinId_rejects_expired() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group-1", "consumer-a", priority: 0); + + var pin1 = mgr.AssignPinId("group-1", "consumer-a"); + var pin2 = mgr.AssignPinId("group-1", "consumer-a"); // replaces pin1 + + mgr.ValidatePinId("group-1", pin1).ShouldBeFalse(); + mgr.ValidatePinId("group-1", pin2).ShouldBeTrue(); + } + + [Fact] + public void UnassignPinId_clears() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group-1", "consumer-a", priority: 0); + + var pin = mgr.AssignPinId("group-1", "consumer-a"); + mgr.UnassignPinId("group-1"); + + mgr.ValidatePinId("group-1", pin).ShouldBeFalse(); + } + + [Fact] + public void ValidatePinId_returns_false_for_unknown_group() + { + var mgr = new PriorityGroupManager(); + mgr.ValidatePinId("unknown", "any-pin").ShouldBeFalse(); + } + + [Fact] + public void UnassignPinId_noop_for_unknown_group() + { + var mgr = new PriorityGroupManager(); + // Should not throw + Should.NotThrow(() => mgr.UnassignPinId("unknown")); + } + + [Fact] + public void PinId_is_22_chars() + { + var mgr = new PriorityGroupManager(); + mgr.Register("g1", "c1", priority: 0); + + var pin = mgr.AssignPinId("g1", "c1"); + pin.Length.ShouldBe(22); + } +}