feat(consumer): add priority group pin ID management

Add AssignPinId, ValidatePinId, and UnassignPinId to PriorityGroupManager,
plus CurrentPinId tracking on PriorityGroup, porting Go consumer.go
(setPinnedTimer, assignNewPinId) pin ID semantics. Covered by 7 new tests.
This commit is contained in:
Joseph Doherty
2026-02-25 02:23:02 -05:00
parent dcc3e4460e
commit 2eaa736b21
2 changed files with 135 additions and 0 deletions

View File

@@ -92,10 +92,61 @@ public sealed class PriorityGroupManager
return active != null && string.Equals(active, consumerId, StringComparison.Ordinal);
}
/// <summary>
/// Assign a new pin ID to the named group, replacing any existing pin.
/// Go reference: consumer.go (assignNewPinId).
/// </summary>
/// <returns>The newly generated 22-character pin ID.</returns>
public string AssignPinId(string groupName, string consumerId)
{
if (!_groups.TryGetValue(groupName, out var group))
return string.Empty;
var pinId = Guid.NewGuid().ToString("N")[..22];
lock (group.Lock)
{
group.CurrentPinId = pinId;
}
return pinId;
}
/// <summary>
/// Returns <c>true</c> if the group exists and its current pin ID equals <paramref name="pinId"/>.
/// Go reference: consumer.go (setPinnedTimer).
/// </summary>
public bool ValidatePinId(string groupName, string pinId)
{
if (!_groups.TryGetValue(groupName, out var group))
return false;
lock (group.Lock)
{
return group.CurrentPinId != null &&
string.Equals(group.CurrentPinId, pinId, StringComparison.Ordinal);
}
}
/// <summary>
/// Clear the current pin ID for the named group. No-op if the group does not exist.
/// Go reference: consumer.go (setPinnedTimer).
/// </summary>
public void UnassignPinId(string groupName)
{
if (!_groups.TryGetValue(groupName, out var group))
return;
lock (group.Lock)
{
group.CurrentPinId = null;
}
}
private sealed class PriorityGroup
{
public object Lock { get; } = new();
public List<PriorityMember> Members { get; } = [];
public string? CurrentPinId { get; set; }
}
private record struct PriorityMember(string ConsumerId, int Priority);

View File

@@ -0,0 +1,84 @@
using NATS.Server.JetStream.Consumers;
namespace NATS.Server.Tests.JetStream.Consumers;
/// <summary>
/// Tests for priority group pin ID management.
/// Go reference: consumer.go (setPinnedTimer, assignNewPinId).
/// </summary>
public class PriorityGroupPinningTests
{
[Fact]
public void AssignPinId_generates_unique_ids()
{
var mgr = new PriorityGroupManager();
mgr.Register("group-1", "consumer-a", priority: 0);
var pin1 = mgr.AssignPinId("group-1", "consumer-a");
var pin2 = mgr.AssignPinId("group-1", "consumer-a");
pin1.ShouldNotBeNullOrEmpty();
pin2.ShouldNotBeNullOrEmpty();
pin1.ShouldNotBe(pin2); // each assignment is unique
}
[Fact]
public void ValidatePinId_accepts_current()
{
var mgr = new PriorityGroupManager();
mgr.Register("group-1", "consumer-a", priority: 0);
var pin = mgr.AssignPinId("group-1", "consumer-a");
mgr.ValidatePinId("group-1", pin).ShouldBeTrue();
}
[Fact]
public void ValidatePinId_rejects_expired()
{
var mgr = new PriorityGroupManager();
mgr.Register("group-1", "consumer-a", priority: 0);
var pin1 = mgr.AssignPinId("group-1", "consumer-a");
var pin2 = mgr.AssignPinId("group-1", "consumer-a"); // replaces pin1
mgr.ValidatePinId("group-1", pin1).ShouldBeFalse();
mgr.ValidatePinId("group-1", pin2).ShouldBeTrue();
}
[Fact]
public void UnassignPinId_clears()
{
var mgr = new PriorityGroupManager();
mgr.Register("group-1", "consumer-a", priority: 0);
var pin = mgr.AssignPinId("group-1", "consumer-a");
mgr.UnassignPinId("group-1");
mgr.ValidatePinId("group-1", pin).ShouldBeFalse();
}
[Fact]
public void ValidatePinId_returns_false_for_unknown_group()
{
var mgr = new PriorityGroupManager();
mgr.ValidatePinId("unknown", "any-pin").ShouldBeFalse();
}
[Fact]
public void UnassignPinId_noop_for_unknown_group()
{
var mgr = new PriorityGroupManager();
// Should not throw
Should.NotThrow(() => mgr.UnassignPinId("unknown"));
}
[Fact]
public void PinId_is_22_chars()
{
var mgr = new PriorityGroupManager();
mgr.Register("g1", "c1", priority: 0);
var pin = mgr.AssignPinId("g1", "c1");
pin.Length.ShouldBe(22);
}
}