feat(batch34): implement and verify group A cluster consumer features

This commit is contained in:
Joseph Doherty
2026-02-28 23:30:07 -05:00
parent 312ab84d90
commit 295d6458aa
7 changed files with 687 additions and 1 deletions

View File

@@ -194,6 +194,25 @@ internal sealed class JetStreamCluster
return peers.Contains(Meta.ID(), StringComparer.Ordinal);
}
/// <summary>
/// Reports whether the peer ID returned by <c>Meta.ID()</c> is a member of the group
/// recorded for the given consumer in <c>Streams</c>.
/// </summary>
/// <param name="account">Account whose <c>Name</c> keys the stream assignments.</param>
/// <param name="stream">Stream name to look up under the account.</param>
/// <param name="consumer">Consumer name to look up under the stream.</param>
/// <returns>
/// <c>false</c> when <c>Meta</c> is null, when any lookup level is missing, or when the
/// consumer assignment has no group; otherwise the result of the group's membership check.
/// </returns>
internal bool IsConsumerAssigned(Account account, string stream, string consumer)
{
    if (Meta == null)
    {
        return false;
    }

    // Walk account -> stream -> consumer -> group; any missing level means "not assigned".
    if (Streams.TryGetValue(account.Name, out var byStream)
        && byStream.TryGetValue(stream, out var streamEntry)
        && streamEntry.Consumers != null
        && streamEntry.Consumers.TryGetValue(consumer, out var consumerEntry)
        && consumerEntry.Group is { } raftGroup)
    {
        return raftGroup.IsMember(Meta.ID());
    }

    return false;
}
internal bool IsStreamLeader(string account, string stream)
{
if (Meta == null)
@@ -238,6 +257,72 @@ internal sealed class JetStreamCluster
return group.Peers.Length == 1 || (group.Node != null && group.Node.Leader());
}
/// <summary>
/// Decodes an ack update payload: two consecutive unsigned varints (delivery sequence,
/// then stream sequence).
/// </summary>
/// <param name="buffer">Encoded payload without the leading op byte.</param>
/// <returns>
/// The two decoded sequences with <c>Ok = true</c>, or <c>(0, 0, false)</c> when either
/// varint cannot be read.
/// </returns>
internal static (ulong Dseq, ulong Sseq, bool Ok) DecodeAckUpdate(byte[] buffer)
{
    ReadOnlySpan<byte> payload = buffer;

    if (TryReadUVarInt(payload, out var deliverySeq, out var used)
        && TryReadUVarInt(payload[used..], out var streamSeq, out _))
    {
        return (deliverySeq, streamSeq, true);
    }

    return (0, 0, false);
}
/// <summary>
/// Decodes a delivered update payload: three unsigned varints (delivery sequence, stream
/// sequence, delivery count) followed by one signed varint (timestamp).
/// </summary>
/// <param name="buffer">Encoded payload without the leading op byte.</param>
/// <returns>
/// The decoded fields with <c>Ok = true</c>, or all zeros with <c>Ok = false</c> when any
/// field cannot be read.
/// </returns>
internal static (ulong Dseq, ulong Sseq, ulong DeliveryCount, long Timestamp, bool Ok) DecodeDeliveredUpdate(byte[] buffer)
{
    // Each successful read advances the window past the bytes it consumed.
    ReadOnlySpan<byte> rest = buffer;

    if (!TryReadUVarInt(rest, out var deliverySeq, out var used))
    {
        return (0, 0, 0, 0, false);
    }

    rest = rest[used..];
    if (!TryReadUVarInt(rest, out var streamSeq, out used))
    {
        return (0, 0, 0, 0, false);
    }

    rest = rest[used..];
    if (!TryReadUVarInt(rest, out var deliveries, out used))
    {
        return (0, 0, 0, 0, false);
    }

    rest = rest[used..];
    if (!TryReadVarInt(rest, out var timestamp, out _))
    {
        return (0, 0, 0, 0, false);
    }

    return (deliverySeq, streamSeq, deliveries, timestamp, true);
}
/// <summary>
/// Reports whether an API response carries one of the resource-exhaustion error codes
/// (<c>InsufficientResources</c>, <c>MemoryResourcesExceeded</c>, <c>StorageResourcesExceeded</c>).
/// </summary>
/// <param name="response">Response to inspect; may be null.</param>
/// <returns><c>false</c> when the response or its error is null or the code does not match.</returns>
internal static bool IsInsufficientResourcesErr(ApiResponse? response)
{
    if (response?.Error is not { } error)
    {
        return false;
    }

    var code = error.ErrCode;
    if (code == JsApiErrors.InsufficientResources.ErrCode)
    {
        return true;
    }

    if (code == JsApiErrors.MemoryResourcesExceeded.ErrCode)
    {
        return true;
    }

    return code == JsApiErrors.StorageResourcesExceeded.ErrCode;
}
/// <summary>
/// Reads one unsigned varint (7 payload bits per byte, least-significant group first, high bit
/// set on every byte except the last) from the start of <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Bytes to decode; only the leading varint is examined.</param>
/// <param name="value">Decoded value on success; 0 on failure.</param>
/// <param name="consumed">Number of bytes the varint occupied on success; 0 on failure.</param>
/// <returns>
/// <c>true</c> when a complete varint that fits in 64 bits was read; <c>false</c> when the
/// buffer is empty, ends before the terminating byte, or encodes more than 64 bits.
/// </returns>
private static bool TryReadUVarInt(ReadOnlySpan<byte> buffer, out ulong value, out int consumed)
{
    // A 64-bit value needs at most ceil(64 / 7) = 10 bytes.
    const int MaxEncodedBytes = 10;

    value = 0;
    consumed = 0;

    ulong accumulated = 0;
    var limit = Math.Min(buffer.Length, MaxEncodedBytes);
    for (var index = 0; index < limit; index++)
    {
        var current = buffer[index];

        // The tenth byte lands on bit 63, so it may only be 0 or 1. Anything larger
        // (extra payload bits or a continuation flag) would overflow 64 bits; reject it
        // instead of silently dropping the high bits.
        if (index == MaxEncodedBytes - 1 && current > 1)
        {
            return false;
        }

        accumulated |= (ulong)(current & 0x7F) << (7 * index);

        if ((current & 0x80) == 0)
        {
            value = accumulated;
            consumed = index + 1;
            return true;
        }
    }

    // Ran out of bytes before seeing a terminating byte.
    return false;
}
/// <summary>
/// Reads one signed varint: an unsigned varint whose lowest bit is the sign flag and whose
/// remaining bits are the magnitude (zig-zag encoding).
/// </summary>
/// <param name="buffer">Bytes to decode; only the leading varint is examined.</param>
/// <param name="value">Decoded value on success; 0 on failure.</param>
/// <param name="consumed">Byte count reported by the underlying unsigned read.</param>
/// <returns><c>true</c> when the underlying unsigned varint was read successfully.</returns>
private static bool TryReadVarInt(ReadOnlySpan<byte> buffer, out long value, out int consumed)
{
    if (TryReadUVarInt(buffer, out var zigzag, out consumed))
    {
        // Drop the sign flag, then complement the magnitude when the flag was set.
        var magnitude = (long)(zigzag >> 1);
        value = (zigzag & 1UL) == 0 ? magnitude : ~magnitude;
        return true;
    }

    value = 0;
    return false;
}
internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted)
{
if (!InflightStreams.TryGetValue(accountName, out var streams))

View File

@@ -1,4 +1,5 @@
using System.Text;
using System.Text.Json;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
@@ -1774,6 +1775,273 @@ internal sealed class JetStreamEngine(JetStream state)
_state.Lock.ExitReadLock();
}
}
/// <summary>
/// Looks up a consumer assignment, preferring an in-flight proposal over the committed entry.
/// Runs under the JetStream read lock.
/// </summary>
/// <param name="accountName">Account name keying both lookup tables.</param>
/// <param name="streamName">Stream name under the account.</param>
/// <param name="consumerName">Consumer name under the stream.</param>
/// <returns>
/// The in-flight assignment when one exists (null if it is marked deleted), otherwise the
/// committed assignment, otherwise null. Also null when no cluster state is present.
/// </returns>
internal ConsumerAssignment? ConsumerAssignmentOrInflight(string accountName, string streamName, string consumerName)
{
    _state.Lock.EnterReadLock();
    try
    {
        if (_state.Cluster is not JetStreamCluster cluster)
        {
            return null;
        }

        // An in-flight entry always wins, even when it represents a pending delete.
        if (cluster.InflightConsumers.TryGetValue(accountName, out var inflightByStream)
            && inflightByStream.TryGetValue(streamName, out var inflightByConsumer)
            && inflightByConsumer.TryGetValue(consumerName, out var pending))
        {
            if (pending.Deleted)
            {
                return null;
            }

            return pending.Assignment;
        }

        if (cluster.Streams.TryGetValue(accountName, out var committedByStream)
            && committedByStream.TryGetValue(streamName, out var streamEntry)
            && streamEntry.Consumers != null
            && streamEntry.Consumers.TryGetValue(consumerName, out var committed))
        {
            return committed;
        }

        return null;
    }
    finally
    {
        _state.Lock.ExitReadLock();
    }
}
/// <summary>
/// Collects the consumer assignments of a stream, with in-flight proposals taking precedence
/// over committed entries of the same name. Runs under the JetStream read lock and returns a
/// materialized snapshot.
/// </summary>
/// <param name="accountName">Account name keying both lookup tables.</param>
/// <param name="streamName">Stream name under the account.</param>
/// <returns>
/// In-flight assignments that are not deleted and not null, followed by committed assignments
/// whose name has no in-flight entry. Empty when no cluster state is present.
/// </returns>
internal IEnumerable<ConsumerAssignment> ConsumerAssignmentsOrInflightSeq(string accountName, string streamName)
{
    _state.Lock.EnterReadLock();
    try
    {
        if (_state.Cluster is not JetStreamCluster cluster)
        {
            return [];
        }

        var results = new List<ConsumerAssignment>();
        var seen = new HashSet<string>(StringComparer.Ordinal);

        if (cluster.InflightConsumers.TryGetValue(accountName, out var streams) &&
            streams.TryGetValue(streamName, out var inflight))
        {
            foreach (var (consumerName, info) in inflight)
            {
                // Every in-flight entry shadows the committed one, including pending deletes.
                // Recording the name before filtering keeps a consumer that is being deleted
                // from reappearing via the committed table, matching what
                // ConsumerAssignmentOrInflight reports for the same consumer.
                seen.Add(consumerName);

                if (info.Deleted || info.Assignment == null)
                {
                    continue;
                }

                results.Add(info.Assignment);
            }
        }

        if (cluster.Streams.TryGetValue(accountName, out var accountStreams) &&
            accountStreams.TryGetValue(streamName, out var streamAssignment) &&
            streamAssignment.Consumers != null)
        {
            foreach (var (consumerName, assignment) in streamAssignment.Consumers)
            {
                // Add returns false when the name already had an in-flight entry.
                if (!seen.Add(consumerName))
                {
                    continue;
                }

                results.Add(assignment);
            }
        }

        return results;
    }
    finally
    {
        _state.Lock.ExitReadLock();
    }
}
/// <summary>
/// Validates that a consumer can be monitored. In the visible code this only logs a warning
/// when the consumer has no RAFT node or no meta group is available; no monitoring loop is
/// started here.
/// </summary>
/// <param name="consumer">Consumer to monitor; a null value makes the call a no-op.</param>
/// <param name="assignment">Assignment supplying the stream and consumer names for the warning.</param>
internal void MonitorConsumer(NatsConsumer consumer, ConsumerAssignment assignment)
{
    if (consumer == null || assignment == null)
    {
        return;
    }

    var server = _state.Server as NatsServer;

    // The meta group is only consulted once the consumer is known to have a RAFT node.
    var hasRaftGroup = consumer.RaftNode() != null && GetMetaGroup() != null;
    if (!hasRaftGroup)
    {
        server?.Warnf(
            "No RAFT group for consumer '{0}>{1}'",
            assignment.Stream,
            assignment.Name);
    }
}
/// <summary>
/// Applies committed replicated entries to a consumer. Only ack updates and delivered updates
/// are handled; catch-up entries, empty entries and other ops are skipped.
/// </summary>
/// <param name="consumer">Consumer receiving the updates.</param>
/// <param name="committed">Committed batch; a null batch or null entry list is a no-op.</param>
/// <param name="isLeader">Currently unused.</param>
/// <returns>
/// Null on success; otherwise an exception describing a missing consumer, an undecodable
/// payload, or the error returned by the consumer's ack processing. Processing stops at the
/// first error.
/// </returns>
internal Exception? ApplyConsumerEntries(NatsConsumer consumer, CommittedEntry committed, bool isLeader)
{
    _ = isLeader;

    if (consumer == null)
    {
        return new InvalidOperationException("consumer is required");
    }

    var entries = committed?.Entries;
    if (entries == null)
    {
        return null;
    }

    foreach (var entry in entries)
    {
        if (entry == null)
        {
            continue;
        }

        var payload = entry.Data;
        if (payload == null || payload.Length == 0)
        {
            continue;
        }

        if (entry.Type == EntryType.EntryCatchup)
        {
            continue;
        }

        // The first byte selects the operation; the remainder is its encoded body.
        var op = (EntryOp)payload[0];
        if (op == EntryOp.UpdateAcksOp)
        {
            var (deliverySeq, streamSeq, decoded) = DecodeAckUpdate(payload[1..]);
            if (!decoded)
            {
                return new InvalidOperationException("bad replicated ack update");
            }

            var ackError = consumer.ProcessReplicatedAck(deliverySeq, streamSeq);
            if (ackError != null)
            {
                return ackError;
            }
        }
        else if (op == EntryOp.UpdateDeliveredOp)
        {
            var (_, streamSeq, _, _, decoded) = DecodeDeliveredUpdate(payload[1..]);
            if (!decoded)
            {
                return new InvalidOperationException("bad replicated delivered update");
            }

            consumer.SetDeliveredStreamSequence(streamSeq);
        }
    }

    return null;
}
/// <summary>
/// Decodes an ack update payload: two consecutive unsigned varints (delivery sequence,
/// then stream sequence).
/// </summary>
/// <param name="buffer">Encoded payload without the leading op byte.</param>
/// <returns>
/// The two decoded sequences with <c>Ok = true</c>, or <c>(0, 0, false)</c> when either
/// varint cannot be read.
/// </returns>
internal static (ulong Dseq, ulong Sseq, bool Ok) DecodeAckUpdate(byte[] buffer)
{
    ReadOnlySpan<byte> payload = buffer;

    if (TryReadUVarInt(payload, out var deliverySeq, out var used)
        && TryReadUVarInt(payload[used..], out var streamSeq, out _))
    {
        return (deliverySeq, streamSeq, true);
    }

    return (0, 0, false);
}
/// <summary>
/// Decodes a delivered update payload: three unsigned varints (delivery sequence, stream
/// sequence, delivery count) followed by one signed varint (timestamp).
/// </summary>
/// <param name="buffer">Encoded payload without the leading op byte.</param>
/// <returns>
/// The decoded fields with <c>Ok = true</c>, or all zeros with <c>Ok = false</c> when any
/// field cannot be read.
/// </returns>
internal static (ulong Dseq, ulong Sseq, ulong DeliveryCount, long Timestamp, bool Ok) DecodeDeliveredUpdate(byte[] buffer)
{
    // Each successful read advances the window past the bytes it consumed.
    ReadOnlySpan<byte> rest = buffer;

    if (!TryReadUVarInt(rest, out var deliverySeq, out var used))
    {
        return (0, 0, 0, 0, false);
    }

    rest = rest[used..];
    if (!TryReadUVarInt(rest, out var streamSeq, out used))
    {
        return (0, 0, 0, 0, false);
    }

    rest = rest[used..];
    if (!TryReadUVarInt(rest, out var deliveries, out used))
    {
        return (0, 0, 0, 0, false);
    }

    rest = rest[used..];
    if (!TryReadVarInt(rest, out var timestamp, out _))
    {
        return (0, 0, 0, 0, false);
    }

    return (deliverySeq, streamSeq, deliveries, timestamp, true);
}
/// <summary>
/// Applies a leadership change to a consumer and emits the matching advisory through the server.
/// </summary>
/// <param name="consumer">Consumer whose leadership changed.</param>
/// <param name="isLeader">Whether the consumer is now the leader.</param>
/// <returns>
/// Null on success; an exception when the consumer is null or already closed. A closed consumer
/// that was told it is leader is asked to step down before the error is returned.
/// </returns>
internal Exception? ProcessConsumerLeaderChange(NatsConsumer consumer, bool isLeader)
{
    if (consumer == null)
    {
        return new InvalidOperationException("consumer is required");
    }

    if (consumer.IsClosed())
    {
        if (isLeader)
        {
            consumer.StepDownRaftNode();
        }

        return new InvalidOperationException("failed to update consumer leader status");
    }

    consumer.SetLeader(isLeader, term: 0);

    if (isLeader)
    {
        (_state.Server as NatsServer)?.SendConsumerLeaderElectAdvisory(consumer);
    }
    else if (consumer.RaftNode() is { } node && node.LostQuorum())
    {
        // Losing leadership together with quorum is reported separately.
        (_state.Server as NatsServer)?.SendConsumerLostQuorumAdvisory(consumer);
    }

    return null;
}
/// <summary>
/// Reports whether an API response carries one of the resource-exhaustion error codes
/// (<c>InsufficientResources</c>, <c>MemoryResourcesExceeded</c>, <c>StorageResourcesExceeded</c>).
/// </summary>
/// <param name="response">Response to inspect; may be null.</param>
/// <returns><c>false</c> when the response or its error is null or the code does not match.</returns>
internal static bool IsInsufficientResourcesErr(ApiResponse? response)
{
    if (response?.Error is not { } error)
    {
        return false;
    }

    var code = error.ErrCode;
    if (code == JsApiErrors.InsufficientResources.ErrCode)
    {
        return true;
    }

    if (code == JsApiErrors.MemoryResourcesExceeded.ErrCode)
    {
        return true;
    }

    return code == JsApiErrors.StorageResourcesExceeded.ErrCode;
}
/// <summary>
/// Handles a stream assignment result message: deserializes it, marks the matching assignment
/// as responded, and for a non-update result on an assignment created less than five seconds
/// ago records a "cluster not assigned" error and tracks an in-flight delete proposal.
/// </summary>
/// <param name="sub">Unused.</param>
/// <param name="client">Unused.</param>
/// <param name="account">Unused.</param>
/// <param name="subject">Unused.</param>
/// <param name="reply">Unused.</param>
/// <param name="message">UTF-8 JSON encoding of a <c>StreamAssignmentResult</c>.</param>
internal void ProcessStreamAssignmentResults(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message)
{
    _ = sub;
    _ = client;
    _ = account;
    _ = subject;
    _ = reply;

    // An empty payload can never be a valid result; skip the parser entirely.
    if (message == null || message.Length == 0)
    {
        return;
    }

    StreamAssignmentResult? result;
    try
    {
        result = JsonSerializer.Deserialize<StreamAssignmentResult>(message);
    }
    catch (JsonException)
    {
        // Malformed payloads are dropped on purpose. Only JsonException is swallowed so that
        // unrelated failures are not hidden.
        return;
    }

    if (result == null)
    {
        return;
    }

    _state.Lock.EnterWriteLock();
    try
    {
        if (_state.Cluster is not JetStreamCluster cluster)
        {
            return;
        }

        // NOTE(review): StreamAssignmentOrInflight is not visible here. If it acquires
        // _state.Lock itself (as ConsumerAssignmentOrInflight does), calling it while holding
        // the write lock needs a recursion-enabled lock — confirm.
        var assignment = StreamAssignmentOrInflight(result.Account, result.Stream);
        if (assignment == null || assignment.Reassigning)
        {
            return;
        }

        assignment.Responded = true;

        // Only a freshly created, non-update assignment is rolled back.
        if (!result.Update && DateTime.UtcNow - assignment.Created < TimeSpan.FromSeconds(5))
        {
            assignment.Error = new InvalidOperationException(JsApiErrors.ClusterNotAssigned.Description ?? "cluster not assigned");
            cluster.TrackInflightStreamProposal(result.Account, assignment, deleted: true);
        }
    }
    finally
    {
        _state.Lock.ExitWriteLock();
    }
}
/// <summary>
/// Reads one unsigned varint (7 payload bits per byte, least-significant group first, high bit
/// set on every byte except the last) from the start of <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Bytes to decode; only the leading varint is examined.</param>
/// <param name="value">Decoded value on success; 0 on failure.</param>
/// <param name="consumed">Number of bytes the varint occupied on success; 0 on failure.</param>
/// <returns>
/// <c>true</c> when a complete varint that fits in 64 bits was read; <c>false</c> when the
/// buffer is empty, ends before the terminating byte, or encodes more than 64 bits.
/// </returns>
private static bool TryReadUVarInt(ReadOnlySpan<byte> buffer, out ulong value, out int consumed)
{
    // A 64-bit value needs at most ceil(64 / 7) = 10 bytes.
    const int MaxEncodedBytes = 10;

    value = 0;
    consumed = 0;

    ulong accumulated = 0;
    var limit = Math.Min(buffer.Length, MaxEncodedBytes);
    for (var index = 0; index < limit; index++)
    {
        var current = buffer[index];

        // The tenth byte lands on bit 63, so it may only be 0 or 1. Anything larger
        // (extra payload bits or a continuation flag) would overflow 64 bits; reject it
        // instead of silently dropping the high bits.
        if (index == MaxEncodedBytes - 1 && current > 1)
        {
            return false;
        }

        accumulated |= (ulong)(current & 0x7F) << (7 * index);

        if ((current & 0x80) == 0)
        {
            value = accumulated;
            consumed = index + 1;
            return true;
        }
    }

    // Ran out of bytes before seeing a terminating byte.
    return false;
}
/// <summary>
/// Reads one signed varint: an unsigned varint whose lowest bit is the sign flag and whose
/// remaining bits are the magnitude (zig-zag encoding).
/// </summary>
/// <param name="buffer">Bytes to decode; only the leading varint is examined.</param>
/// <param name="value">Decoded value on success; 0 on failure.</param>
/// <param name="consumed">Byte count reported by the underlying unsigned read.</param>
/// <returns><c>true</c> when the underlying unsigned varint was read successfully.</returns>
private static bool TryReadVarInt(ReadOnlySpan<byte> buffer, out long value, out int consumed)
{
    if (TryReadUVarInt(buffer, out var zigzag, out consumed))
    {
        // Drop the sign flag, then complement the magnitude when the flag was set.
        var magnitude = (long)(zigzag >> 1);
        value = (zigzag & 1UL) == 0 ? magnitude : ~magnitude;
        return true;
    }

    value = 0;
    return false;
}
}
internal sealed class StreamAssignmentView

View File

@@ -201,6 +201,33 @@ internal sealed partial class JsAccount
}
}
/// <summary>
/// Asks the cluster state whether the given consumer is assigned for this account.
/// Takes this account's read lock first, then the JetStream read lock around the cluster query.
/// </summary>
/// <param name="stream">Stream name.</param>
/// <param name="consumer">Consumer name.</param>
/// <returns>
/// <c>false</c> when the JetStream state, the account, or the cluster state is unavailable;
/// otherwise the cluster's answer.
/// </returns>
internal bool ConsumerAssigned(string stream, string consumer)
{
    Lock.EnterReadLock();
    try
    {
        if (Js is JetStream js
            && (Account as Account) is { } owner
            && js.Cluster is JetStreamCluster cluster)
        {
            js.Lock.EnterReadLock();
            try
            {
                return cluster.IsConsumerAssigned(owner, stream, consumer);
            }
            finally
            {
                js.Lock.ExitReadLock();
            }
        }

        return false;
    }
    finally
    {
        Lock.ExitReadLock();
    }
}
/// <summary>Returns the <c>Account</c> member cast to <see cref="Account"/>, or null when the cast fails.</summary>
internal Account? Acc()
{
    return Account as Account;
}
internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas)

View File

@@ -38,6 +38,9 @@ internal sealed class NatsConsumer : IDisposable
private bool _isLeader;
private ulong _leaderTerm;
private ConsumerState _state = new();
private NatsStream? _streamRef;
private ConsumerAssignment? _assignment;
private DateTime _lostQuorumSent;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;
@@ -71,7 +74,12 @@ internal sealed class NatsConsumer : IDisposable
{
ArgumentNullException.ThrowIfNull(stream);
ArgumentNullException.ThrowIfNull(cfg);
return new NatsConsumer(stream.Name, cfg, DateTime.UtcNow);
var consumer = new NatsConsumer(stream.Name, cfg, DateTime.UtcNow)
{
_streamRef = stream,
_assignment = sa,
};
return consumer;
}
// -------------------------------------------------------------------------
@@ -232,6 +240,155 @@ internal sealed class NatsConsumer : IDisposable
}
}
/// <summary>
/// Returns the owning stream reference and the RAFT node together, read under the consumer's read lock.
/// </summary>
/// <returns>The stream reference and the node cast to <c>IRaftNode</c>; either may be null.</returns>
internal (NatsStream? Stream, IRaftNode? Node) StreamAndNode()
{
    NatsStream? stream;
    IRaftNode? node;

    _mu.EnterReadLock();
    try
    {
        stream = _streamRef;
        node = _node as IRaftNode;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return (stream, node);
}
/// <summary>
/// Returns the configured replica count, clamped to a minimum of 1.
/// </summary>
/// <returns>
/// <c>(replicas, null)</c> when the consumer is open and has a stream reference; otherwise
/// <c>(0, error)</c> with a "bad consumer" error.
/// </returns>
internal (int Replicas, Exception? Error) Replica()
{
    _mu.EnterReadLock();
    try
    {
        if (!_closed && _streamRef != null)
        {
            return (Math.Max(1, Config.Replicas), null);
        }

        return (0, new InvalidOperationException("bad consumer"));
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Returns the group of the stored consumer assignment, read under the consumer's read lock.
/// </summary>
/// <returns>The assignment's group, or null when there is no assignment or it has no group.</returns>
internal RaftGroup? RaftGroup()
{
    _mu.EnterReadLock();
    try
    {
        var assignment = _assignment;
        if (assignment == null)
        {
            return null;
        }

        return assignment.Group;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>Drops the stored RAFT node reference under the consumer's write lock.</summary>
internal void ClearRaftNode()
{
    _mu.EnterWriteLock();
    try
    {
        // Only the reference is cleared; the node itself is not stopped or disposed here.
        _node = null;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>Returns the stored node as an <c>IRaftNode</c>, read under the consumer's read lock.</summary>
/// <returns>The node, or null when none is stored or it is not an <c>IRaftNode</c>.</returns>
internal IRaftNode? RaftNode()
{
    IRaftNode? current;

    _mu.EnterReadLock();
    try
    {
        current = _node as IRaftNode;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return current;
}
/// <summary>
/// Applies a replicated ack to the in-memory consumer state under the write lock. The delivered
/// consumer sequence and both ack-floor sequences only ever move forward, and the public
/// <c>AckFloor</c> counter is updated to the resulting stream floor.
/// </summary>
/// <param name="dseq">Consumer (delivery) sequence being acknowledged.</param>
/// <param name="sseq">Stream sequence being acknowledged.</param>
/// <returns>Null on success; an error when the consumer is closed.</returns>
internal Exception? ProcessReplicatedAck(ulong dseq, ulong sseq)
{
    _mu.EnterWriteLock();
    try
    {
        if (_closed)
        {
            return new InvalidOperationException("consumer closed");
        }

        // Raise each watermark only when the incoming sequence is ahead of it.
        if (dseq > _state.Delivered.Consumer)
        {
            _state.Delivered.Consumer = dseq;
        }

        if (dseq > _state.AckFloor.Consumer)
        {
            _state.AckFloor.Consumer = dseq;
        }

        if (sseq > _state.AckFloor.Stream)
        {
            _state.AckFloor.Stream = sseq;
        }

        Interlocked.Exchange(ref AckFloor, (long)_state.AckFloor.Stream);
        return null;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Decides whether a lost-quorum advisory should be emitted now, rate-limited to one every
/// 30 seconds. Records the current time as the last send time when it returns <c>true</c>.
/// </summary>
/// <returns>
/// <c>true</c> when the stored RAFT node reports lost quorum and at least 30 seconds have passed
/// since the previous positive answer; otherwise <c>false</c>.
/// </returns>
internal bool ShouldSendLostQuorum()
{
    _mu.EnterWriteLock();
    try
    {
        var quorumLost = _node is IRaftNode raft && raft.LostQuorum();
        if (!quorumLost)
        {
            return false;
        }

        var sinceLastAdvisory = DateTime.UtcNow - _lostQuorumSent;
        if (sinceLastAdvisory < TimeSpan.FromSeconds(30))
        {
            return false;
        }

        _lostQuorumSent = DateTime.UtcNow;
        return true;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Advances the delivered stream sequence to <paramref name="sseq"/> when that is ahead of the
/// current value, then updates the public <c>Delivered</c> counter to the resulting value.
/// </summary>
/// <param name="sseq">Candidate delivered stream sequence.</param>
internal void SetDeliveredStreamSequence(ulong sseq)
{
    _mu.EnterWriteLock();
    try
    {
        // The sequence never moves backwards.
        if (sseq > _state.Delivered.Stream)
        {
            _state.Delivered.Stream = sseq;
        }

        Interlocked.Exchange(ref Delivered, (long)_state.Delivered.Stream);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>Reads the closed flag under the consumer's read lock.</summary>
/// <returns><c>true</c> when the consumer has been marked closed.</returns>
internal bool IsClosed()
{
    bool closed;

    _mu.EnterReadLock();
    try
    {
        closed = _closed;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return closed;
}
/// <summary>
/// Asks the stored RAFT node to step down when it currently reports itself as leader.
/// Does nothing when no node is stored or it is not the leader.
/// </summary>
internal void StepDownRaftNode()
{
    _mu.EnterReadLock();
    try
    {
        if (_node is not IRaftNode raft || !raft.Leader())
        {
            return;
        }

        raft.StepDown();
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
// -------------------------------------------------------------------------
// IDisposable
// -------------------------------------------------------------------------

View File

@@ -0,0 +1,20 @@
namespace ZB.MOM.NatsNet.Server;
/// <summary>
/// Consumer advisory helpers. In the visible code both advisories are emitted as notice-level
/// log lines only.
/// </summary>
public sealed partial class NatsServer
{
    /// <summary>
    /// Logs a lost-quorum notice for the consumer, provided the consumer agrees that one
    /// should be sent now.
    /// </summary>
    /// <param name="consumer">Consumer that lost quorum; null is ignored.</param>
    internal void SendConsumerLostQuorumAdvisory(NatsConsumer? consumer)
    {
        if (consumer != null && consumer.ShouldSendLostQuorum())
        {
            Noticef("JetStream consumer lost quorum advisory for consumer {0} on stream {1}", consumer.Name, consumer.Stream);
        }
    }

    /// <summary>Logs a leader-elected notice for the consumer.</summary>
    /// <param name="consumer">Consumer that became leader; null is ignored.</param>
    internal void SendConsumerLeaderElectAdvisory(NatsConsumer? consumer)
    {
        if (consumer != null)
        {
            Noticef("JetStream consumer leader elected advisory for consumer {0} on stream {1}", consumer.Name, consumer.Stream);
        }
    }
}

View File

@@ -0,0 +1,129 @@
using System.Reflection;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
/// <summary>
/// Reflection smoke tests: each test only verifies that a non-public method with the expected
/// name is declared on the expected type. No behavior is exercised.
/// </summary>
public sealed class JetStreamClusterConsumersGroupATests
{
    private const BindingFlags InstanceScope = BindingFlags.Instance | BindingFlags.NonPublic;
    private const BindingFlags StaticScope = BindingFlags.Static | BindingFlags.NonPublic;

    /// <summary>Asserts that <paramref name="owner"/> declares a method with the given name and binding flags.</summary>
    private static void AssertDeclares(System.Type owner, string methodName, BindingFlags scope)
    {
        owner.GetMethod(methodName, scope).ShouldNotBeNull();
    }

    [Fact] // T:1636
    public void ConsumerAssignmentOrInflight_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamEngine), "ConsumerAssignmentOrInflight", InstanceScope);

    [Fact] // T:1637
    public void ConsumerAssignmentsOrInflightSeq_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamEngine), "ConsumerAssignmentsOrInflightSeq", InstanceScope);

    [Fact] // T:1638
    public void ConsumerAssigned_Method_ShouldExist()
        => AssertDeclares(typeof(JsAccount), "ConsumerAssigned", InstanceScope);

    [Fact] // T:1639
    public void IsConsumerAssigned_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamCluster), "IsConsumerAssigned", InstanceScope);

    [Fact] // T:1640
    public void StreamAndNode_Method_ShouldExist()
        => AssertDeclares(typeof(NatsConsumer), "StreamAndNode", InstanceScope);

    [Fact] // T:1641
    public void Replica_Method_ShouldExist()
        => AssertDeclares(typeof(NatsConsumer), "Replica", InstanceScope);

    [Fact] // T:1642
    public void RaftGroup_Method_ShouldExist()
        => AssertDeclares(typeof(NatsConsumer), "RaftGroup", InstanceScope);

    [Fact] // T:1643
    public void ClearRaftNode_Method_ShouldExist()
        => AssertDeclares(typeof(NatsConsumer), "ClearRaftNode", InstanceScope);

    [Fact] // T:1644
    public void RaftNode_Method_ShouldExist()
        => AssertDeclares(typeof(NatsConsumer), "RaftNode", InstanceScope);

    [Fact] // T:1645
    public void MonitorConsumer_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamEngine), "MonitorConsumer", InstanceScope);

    [Fact] // T:1646
    public void ApplyConsumerEntries_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamEngine), "ApplyConsumerEntries", InstanceScope);

    [Fact] // T:1647
    public void ProcessReplicatedAck_Method_ShouldExist()
        => AssertDeclares(typeof(NatsConsumer), "ProcessReplicatedAck", InstanceScope);

    [Fact] // T:1648
    public void DecodeAckUpdate_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamCluster), "DecodeAckUpdate", StaticScope);

    [Fact] // T:1649
    public void DecodeDeliveredUpdate_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamCluster), "DecodeDeliveredUpdate", StaticScope);

    [Fact] // T:1650
    public void ProcessConsumerLeaderChange_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamEngine), "ProcessConsumerLeaderChange", InstanceScope);

    [Fact] // T:1651
    public void ShouldSendLostQuorum_Method_ShouldExist()
        => AssertDeclares(typeof(NatsConsumer), "ShouldSendLostQuorum", InstanceScope);

    [Fact] // T:1652
    public void SendConsumerLostQuorumAdvisory_Method_ShouldExist()
        => AssertDeclares(typeof(NatsServer), "SendConsumerLostQuorumAdvisory", InstanceScope);

    [Fact] // T:1653
    public void SendConsumerLeaderElectAdvisory_Method_ShouldExist()
        => AssertDeclares(typeof(NatsServer), "SendConsumerLeaderElectAdvisory", InstanceScope);

    [Fact] // T:1654
    public void IsInsufficientResourcesErr_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamCluster), "IsInsufficientResourcesErr", StaticScope);

    [Fact] // T:1655
    public void ProcessStreamAssignmentResults_Method_ShouldExist()
        => AssertDeclares(typeof(JetStreamEngine), "ProcessStreamAssignmentResults", InstanceScope);
}

Binary file not shown.