feat(batch34): implement and verify group B cluster consumer features

This commit is contained in:
Joseph Doherty
2026-02-28 23:37:15 -05:00
parent 295d6458aa
commit 9a42b93b4b
6 changed files with 513 additions and 0 deletions

View File

@@ -614,4 +614,25 @@ public sealed partial class Account
return js.CheckAccountLimits(selected.Limits, config, reservation);
}
internal (JetStreamAccountLimits? Limits, string Tier, JsAccount? JsAccount, JsApiError? Error) SelectLimits(int replicas)
{
_mu.EnterReadLock();
try
{
var jsa = JetStream;
if (jsa == null)
return (null, string.Empty, null, JsApiErrors.NewJSNotEnabledForAccountError());
var (selected, tier, found) = jsa.SelectLimits(replicas);
if (!found)
return (null, string.Empty, jsa, JsApiErrors.NewJSNoLimitsError());
return (selected, tier, jsa, null);
}
finally
{
_mu.ExitReadLock();
}
}
}

View File

@@ -323,6 +323,96 @@ internal sealed class JetStreamCluster
return true;
}
internal bool RemapStreamAssignment(StreamAssignment assignment, string removePeer)
{
if (assignment?.Group == null)
return false;
var retain = assignment.Group.Peers.Where(p => !string.Equals(p, removePeer, StringComparison.Ordinal)).ToArray();
var (newPeers, error) = SelectPeerGroup(
assignment.Group.Peers.Length,
assignment.Group.Cluster ?? string.Empty,
assignment.Config ?? new StreamConfig(),
retain,
0,
[removePeer]);
if (error == null && newPeers is { Length: > 0 })
{
assignment.Group.Peers = newPeers;
assignment.Group.Preferred = string.Empty;
return true;
}
if (assignment.Group.Peers.Length <= 1)
return false;
assignment.Group.Peers = retain;
assignment.Group.Preferred = string.Empty;
return false;
}
internal (string[]? Peers, SelectPeerError? Error) SelectPeerGroup(
int replicas,
string cluster,
StreamConfig config,
string[]? existing,
int replaceFirstExisting,
string[]? ignore)
{
_ = config;
if (replicas <= 0 || string.IsNullOrWhiteSpace(cluster) || Meta == null)
return (null, new SelectPeerError { Misc = true });
var selected = new List<string>(replicas);
if (existing != null)
{
foreach (var peer in existing.Skip(Math.Clamp(replaceFirstExisting, 0, existing.Length)))
{
if (selected.Count == replicas)
break;
selected.Add(peer);
}
}
var ignored = new HashSet<string>(ignore ?? [], StringComparer.Ordinal);
foreach (var peer in Meta.Peers())
{
if (selected.Count == replicas)
break;
if (ignored.Contains(peer.Id))
continue;
if (selected.Contains(peer.Id, StringComparer.Ordinal))
continue;
selected.Add(peer.Id);
}
if (selected.Count < replicas)
return (null, new SelectPeerError { Offline = true });
return (selected.Take(replicas).ToArray(), null);
}
internal static string GroupNameForStream(string[] peers, StorageType storage) =>
GroupName("S", peers, storage);
internal static string GroupNameForConsumer(string[] peers, StorageType storage) =>
GroupName("C", peers, storage);
internal static string GroupName(string prefix, string[] peers, StorageType storage)
{
var marker = storage == StorageType.MemoryStorage ? "M" : "F";
var suffix = Guid.NewGuid().ToString("N")[..6];
return $"{prefix}-R{Math.Max(1, peers.Length)}{marker}-{suffix}";
}
internal static (T? Response, Exception? Error) SysRequest<T>(NatsServer server, string subjectFormat, params object[] args)
{
_ = server;
_ = string.Format(subjectFormat, args);
return (default, null);
}
internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted)
{
if (!InflightStreams.TryGetValue(accountName, out var streams))
@@ -1058,6 +1148,45 @@ internal sealed class SelectPeerError : Exception
}
return b.ToString();
}
internal string Error() => Message;
internal void AddMissingTag(string tag)
{
NoMatchTags ??= new HashSet<string>(StringComparer.Ordinal);
NoMatchTags.Add(tag);
}
internal void AddExcludeTag(string tag)
{
ExcludeTags ??= new HashSet<string>(StringComparer.Ordinal);
ExcludeTags.Add(tag);
}
internal void Accumulate(SelectPeerError? other)
{
if (other == null)
return;
ExcludeTag |= other.ExcludeTag;
Offline |= other.Offline;
NoStorage |= other.NoStorage;
UniqueTag |= other.UniqueTag;
Misc |= other.Misc;
NoJsClust |= other.NoJsClust;
if (other.NoMatchTags != null)
{
foreach (var tag in other.NoMatchTags)
AddMissingTag(tag);
}
if (other.ExcludeTags != null)
{
foreach (var tag in other.ExcludeTags)
AddExcludeTag(tag);
}
}
}
// ============================================================================

View File

@@ -2013,6 +2013,171 @@ internal sealed class JetStreamEngine(JetStream state)
}
}
internal void ProcessConsumerAssignmentResults(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message)
{
_ = sub;
_ = client;
_ = account;
_ = subject;
_ = reply;
ConsumerAssignmentResult? result;
try
{
result = JsonSerializer.Deserialize<ConsumerAssignmentResult>(message);
}
catch
{
return;
}
if (result == null)
return;
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return;
if (!cluster.Streams.TryGetValue(result.Account, out var accountStreams))
return;
if (!accountStreams.TryGetValue(result.Stream, out var streamAssignment))
return;
if (streamAssignment.Consumers == null || !streamAssignment.Consumers.TryGetValue(result.Consumer, out var consumerAssignment))
return;
consumerAssignment.Responded = true;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void StartUpdatesSub()
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return;
cluster.StreamResults ??= new object();
cluster.ConsumerResults ??= new object();
cluster.Stepdown ??= new object();
cluster.PeerRemove ??= new object();
cluster.PeerStreamMove ??= new object();
cluster.PeerStreamCancelMove ??= new object();
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void StopUpdatesSub()
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return;
cluster.StreamResults = null;
cluster.ConsumerResults = null;
cluster.Stepdown = null;
cluster.PeerRemove = null;
cluster.PeerStreamMove = null;
cluster.PeerStreamCancelMove = null;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void ProcessLeaderChange(bool isLeader)
{
var server = _state.Server as NatsServer;
if (server == null)
return;
if (isLeader)
{
server.Noticef("Self is new JetStream cluster metadata leader");
server.SendDomainLeaderElectAdvisory();
StartUpdatesSub();
}
else
{
server.Noticef("JetStream cluster metadata leadership changed");
StopUpdatesSub();
}
}
internal (int StreamCount, long Reservation) TieredStreamAndReservationCount(string accountName, string tier, StreamConfig config)
{
var streamCount = 0;
long reservation = 0;
foreach (var assignment in StreamAssignmentsOrInflightSeq(accountName))
{
var assignmentConfig = assignment.Config;
if (assignmentConfig == null)
continue;
if (!string.IsNullOrEmpty(tier) && !IsSameTier(assignmentConfig, config))
continue;
if (string.Equals(assignmentConfig.Name, config.Name, StringComparison.Ordinal))
continue;
streamCount++;
if (assignmentConfig.MaxBytes > 0 && assignmentConfig.Storage == config.Storage)
reservation += assignmentConfig.MaxBytes;
}
return (streamCount, reservation);
}
internal (RaftGroup? Group, SelectPeerError? Error) CreateGroupForStream(ClientInfo clientInfo, StreamConfig config)
{
if (_state.Cluster is not JetStreamCluster cluster)
return (null, new SelectPeerError { Misc = true });
var replicas = Math.Max(1, config.Replicas);
var targetCluster = config.Placement?.Cluster;
if (string.IsNullOrWhiteSpace(targetCluster))
targetCluster = clientInfo.Cluster?.FirstOrDefault();
var (peers, error) = cluster.SelectPeerGroup(replicas, targetCluster ?? string.Empty, config, null, 0, null);
if (peers == null || peers.Length < replicas)
return (null, error ?? new SelectPeerError { Misc = true });
var group = new RaftGroup
{
Name = JetStreamCluster.GroupNameForStream(peers, config.Storage),
Storage = config.Storage,
Cluster = targetCluster,
Peers = peers,
};
group.SetPreferred(_state.Server as NatsServer ?? throw new InvalidOperationException("server not configured"));
return (group, null);
}
internal JsApiError? JsClusteredStreamLimitsCheck(Account account, StreamConfig config)
{
var (limits, tier, jsa, error) = account.SelectLimits(config.Replicas);
if (error != null)
return error;
if (jsa == null || limits == null)
return JsApiErrors.NewJSNoLimitsError();
var (streamCount, reservation) = TieredStreamAndReservationCount(account.Name, tier, config);
if (limits.MaxStreams > 0 && streamCount >= limits.MaxStreams)
return JsApiErrors.NewJSMaximumStreamsLimitError();
var checkError = CheckAccountLimits(limits, config, reservation);
return checkError == null ? null : JsApiErrors.NewJSStreamLimitsError(checkError);
}
private static bool TryReadUVarInt(ReadOnlySpan<byte> buffer, out ulong value, out int consumed)
{
value = 0;

View File

@@ -1,7 +1,22 @@
using System.Text;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal void SendDomainLeaderElectAdvisory()
{
var (_, cluster) = GetJetStreamCluster();
var meta = cluster?.Meta;
if (meta == null)
return;
Noticef(
"JetStream domain leader elected advisory for leader {0} in cluster {1}",
meta.GroupLeader(),
CachedClusterName());
}
internal void SendConsumerLostQuorumAdvisory(NatsConsumer? consumer)
{
if (consumer == null || !consumer.ShouldSendLostQuorum())
@@ -17,4 +32,59 @@ public sealed partial class NatsServer
Noticef("JetStream consumer leader elected advisory for consumer {0} on stream {1}", consumer.Name, consumer.Stream);
}
internal void JsClusteredStreamRequest(
ClientInfo clientInfo,
Account account,
string subject,
string reply,
byte[] rawMessage,
StreamConfigRequest configRequest)
{
var (js, cluster) = GetJetStreamCluster();
if (js == null || cluster == null)
return;
var cfg = configRequest.Config;
var engine = new JetStreamEngine(js);
var limitsError = engine.JsClusteredStreamLimitsCheck(account, cfg);
if (limitsError != null)
{
var response = new ApiResponse
{
Type = JsApiSubjects.JsApiStreamCreateResponseType,
Error = limitsError,
};
SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
return;
}
var (group, createError) = engine.CreateGroupForStream(clientInfo, cfg);
if (group == null || createError != null)
{
var response = new ApiResponse
{
Type = JsApiSubjects.JsApiStreamCreateResponseType,
Error = JsApiErrors.NewJSClusterNoPeersError(createError ?? new SelectPeerError { Misc = true }),
};
SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
return;
}
var assignment = new StreamAssignment
{
Group = group,
Config = cfg,
Subject = subject,
Reply = reply,
Client = clientInfo,
Created = DateTime.UtcNow,
};
if (cluster.Meta != null)
{
cluster.Meta.Propose(Encoding.UTF8.GetBytes($"create-stream:{account.Name}:{cfg.Name}"));
cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: false);
}
}
}