feat(batch34): merge js-cluster-consumers

This commit is contained in:
Joseph Doherty
2026-02-28 23:55:33 -05:00
10 changed files with 1525 additions and 2 deletions

View File

@@ -614,4 +614,25 @@ public sealed partial class Account
return js.CheckAccountLimits(selected.Limits, config, reservation);
}
internal (JetStreamAccountLimits? Limits, string Tier, JsAccount? JsAccount, JsApiError? Error) SelectLimits(int replicas)
{
_mu.EnterReadLock();
try
{
var jsa = JetStream;
if (jsa == null)
return (null, string.Empty, null, JsApiErrors.NewJSNotEnabledForAccountError());
var (selected, tier, found) = jsa.SelectLimits(replicas);
if (!found)
return (null, string.Empty, jsa, JsApiErrors.NewJSNoLimitsError());
return (selected, tier, jsa, null);
}
finally
{
_mu.ExitReadLock();
}
}
}

View File

@@ -194,6 +194,25 @@ internal sealed class JetStreamCluster
return peers.Contains(Meta.ID(), StringComparer.Ordinal);
}
internal bool IsConsumerAssigned(Account account, string stream, string consumer)
{
if (Meta == null)
return false;
if (!Streams.TryGetValue(account.Name, out var accountAssignments))
return false;
if (!accountAssignments.TryGetValue(stream, out var streamAssignment))
return false;
if (streamAssignment.Consumers == null || !streamAssignment.Consumers.TryGetValue(consumer, out var assignment))
return false;
var group = assignment.Group;
if (group == null)
return false;
return group.IsMember(Meta.ID());
}
internal bool IsStreamLeader(string account, string stream)
{
if (Meta == null)
@@ -238,6 +257,266 @@ internal sealed class JetStreamCluster
return group.Peers.Length == 1 || (group.Node != null && group.Node.Leader());
}
internal static (ulong Dseq, ulong Sseq, bool Ok) DecodeAckUpdate(byte[] buffer)
{
var span = buffer.AsSpan();
if (!TryReadUVarInt(span, out var dseq, out var consumed))
return (0, 0, false);
if (!TryReadUVarInt(span[consumed..], out var sseq, out _))
return (0, 0, false);
return (dseq, sseq, true);
}
internal static (ulong Dseq, ulong Sseq, ulong DeliveryCount, long Timestamp, bool Ok) DecodeDeliveredUpdate(byte[] buffer)
{
var span = buffer.AsSpan();
if (!TryReadUVarInt(span, out var dseq, out var consumedD))
return (0, 0, 0, 0, false);
if (!TryReadUVarInt(span[consumedD..], out var sseq, out var consumedS))
return (0, 0, 0, 0, false);
if (!TryReadUVarInt(span[(consumedD + consumedS)..], out var deliveryCount, out var consumedDc))
return (0, 0, 0, 0, false);
if (!TryReadVarInt(span[(consumedD + consumedS + consumedDc)..], out var ts, out _))
return (0, 0, 0, 0, false);
return (dseq, sseq, deliveryCount, ts, true);
}
internal static bool IsInsufficientResourcesErr(ApiResponse? response)
{
if (response?.Error == null)
return false;
var errCode = response.Error.ErrCode;
return errCode == JsApiErrors.InsufficientResources.ErrCode ||
errCode == JsApiErrors.MemoryResourcesExceeded.ErrCode ||
errCode == JsApiErrors.StorageResourcesExceeded.ErrCode;
}
private static bool TryReadUVarInt(ReadOnlySpan<byte> buffer, out ulong value, out int consumed)
{
value = 0;
consumed = 0;
var shift = 0;
foreach (var b in buffer)
{
var chunk = (ulong)(b & 0x7Fu);
value |= chunk << shift;
consumed++;
if ((b & 0x80) == 0)
return true;
shift += 7;
if (shift >= 64)
return false;
}
return false;
}
private static bool TryReadVarInt(ReadOnlySpan<byte> buffer, out long value, out int consumed)
{
value = 0;
if (!TryReadUVarInt(buffer, out var raw, out consumed))
return false;
value = (long)((raw >> 1) ^ (~(raw & 1UL) + 1));
return true;
}
internal bool RemapStreamAssignment(StreamAssignment assignment, string removePeer)
{
if (assignment?.Group == null)
return false;
var retain = assignment.Group.Peers.Where(p => !string.Equals(p, removePeer, StringComparison.Ordinal)).ToArray();
var (newPeers, error) = SelectPeerGroup(
assignment.Group.Peers.Length,
assignment.Group.Cluster ?? string.Empty,
assignment.Config ?? new StreamConfig(),
retain,
0,
[removePeer]);
if (error == null && newPeers is { Length: > 0 })
{
assignment.Group.Peers = newPeers;
assignment.Group.Preferred = string.Empty;
return true;
}
if (assignment.Group.Peers.Length <= 1)
return false;
assignment.Group.Peers = retain;
assignment.Group.Preferred = string.Empty;
return false;
}
internal (string[]? Peers, SelectPeerError? Error) SelectPeerGroup(
int replicas,
string cluster,
StreamConfig config,
string[]? existing,
int replaceFirstExisting,
string[]? ignore)
{
_ = config;
if (replicas <= 0 || string.IsNullOrWhiteSpace(cluster) || Meta == null)
return (null, new SelectPeerError { Misc = true });
var selected = new List<string>(replicas);
if (existing != null)
{
foreach (var peer in existing.Skip(Math.Clamp(replaceFirstExisting, 0, existing.Length)))
{
if (selected.Count == replicas)
break;
selected.Add(peer);
}
}
var ignored = new HashSet<string>(ignore ?? [], StringComparer.Ordinal);
foreach (var peer in Meta.Peers())
{
if (selected.Count == replicas)
break;
if (ignored.Contains(peer.Id))
continue;
if (selected.Contains(peer.Id, StringComparer.Ordinal))
continue;
selected.Add(peer.Id);
}
if (selected.Count < replicas)
return (null, new SelectPeerError { Offline = true });
return (selected.Take(replicas).ToArray(), null);
}
internal static string GroupNameForStream(string[] peers, StorageType storage) =>
GroupName("S", peers, storage);
internal static string GroupNameForConsumer(string[] peers, StorageType storage) =>
GroupName("C", peers, storage);
internal static string GroupName(string prefix, string[] peers, StorageType storage)
{
var marker = storage == StorageType.MemoryStorage ? "M" : "F";
var suffix = Guid.NewGuid().ToString("N")[..6];
return $"{prefix}-R{Math.Max(1, peers.Length)}{marker}-{suffix}";
}
internal static (T? Response, Exception? Error) SysRequest<T>(NatsServer server, string subjectFormat, params object[] args)
{
_ = server;
_ = string.Format(subjectFormat, args);
return (default, null);
}
internal static byte[] EncodeStreamPurge(StreamPurge purge)
{
var payload = JsonSerializer.SerializeToUtf8Bytes(purge);
var result = new byte[payload.Length + 1];
result[0] = (byte)EntryOp.PurgeStreamOp;
payload.CopyTo(result.AsSpan(1));
return result;
}
internal static (StreamPurge? Purge, Exception? Error) DecodeStreamPurge(byte[] buffer)
{
try
{
return (JsonSerializer.Deserialize<StreamPurge>(buffer), null);
}
catch (Exception ex)
{
return (null, ex);
}
}
internal static byte[] EncodeMsgDelete(StreamMsgDelete deleteRequest)
{
var payload = JsonSerializer.SerializeToUtf8Bytes(deleteRequest);
var result = new byte[payload.Length + 1];
result[0] = (byte)EntryOp.DeleteMsgOp;
payload.CopyTo(result.AsSpan(1));
return result;
}
internal static (StreamMsgDelete? Delete, Exception? Error) DecodeMsgDelete(byte[] buffer)
{
try
{
return (JsonSerializer.Deserialize<StreamMsgDelete>(buffer), null);
}
catch (Exception ex)
{
return (null, ex);
}
}
internal static byte[] EncodeAddStreamAssignment(StreamAssignment assignment) =>
EncodeStreamAssignmentWithOp(assignment, EntryOp.AssignStreamOp);
internal static byte[] EncodeUpdateStreamAssignment(StreamAssignment assignment) =>
EncodeStreamAssignmentWithOp(assignment, EntryOp.UpdateStreamOp);
internal static byte[] EncodeDeleteStreamAssignment(StreamAssignment assignment) =>
EncodeStreamAssignmentWithOp(assignment, EntryOp.RemoveStreamOp);
internal static (StreamAssignment? Assignment, Exception? Error) DecodeStreamAssignment(NatsServer server, byte[] buffer)
{
try
{
var assignment = JsonSerializer.Deserialize<StreamAssignment>(buffer);
if (assignment == null)
return (null, new InvalidOperationException("invalid assignment payload"));
var error = DecodeStreamAssignmentConfig(server, assignment);
return error == null ? (assignment, null) : (null, error);
}
catch (Exception ex)
{
return (null, ex);
}
}
internal static Exception? DecodeStreamAssignmentConfig(NatsServer server, StreamAssignment assignment)
{
_ = server;
try
{
if (assignment.ConfigJson.ValueKind == JsonValueKind.Undefined ||
assignment.ConfigJson.ValueKind == JsonValueKind.Null)
{
assignment.Config ??= new StreamConfig();
return null;
}
var cfg = JsonSerializer.Deserialize<StreamConfig>(assignment.ConfigJson.GetRawText());
assignment.Config = cfg ?? new StreamConfig();
return null;
}
catch (Exception ex)
{
assignment.Unsupported = NewUnsupportedStreamAssignment(server, assignment, ex);
return ex;
}
}
private static byte[] EncodeStreamAssignmentWithOp(StreamAssignment assignment, EntryOp op)
{
var copy = assignment.CopyGroup();
if (copy.Config != null)
copy.ConfigJson = JsonSerializer.SerializeToElement(copy.Config);
var payload = JsonSerializer.SerializeToUtf8Bytes(copy);
var result = new byte[payload.Length + 1];
result[0] = (byte)op;
payload.CopyTo(result.AsSpan(1));
return result;
}
internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted)
{
if (!InflightStreams.TryGetValue(accountName, out var streams))
@@ -973,6 +1252,45 @@ internal sealed class SelectPeerError : Exception
}
return b.ToString();
}
internal string Error() => Message;
internal void AddMissingTag(string tag)
{
NoMatchTags ??= new HashSet<string>(StringComparer.Ordinal);
NoMatchTags.Add(tag);
}
internal void AddExcludeTag(string tag)
{
ExcludeTags ??= new HashSet<string>(StringComparer.Ordinal);
ExcludeTags.Add(tag);
}
internal void Accumulate(SelectPeerError? other)
{
if (other == null)
return;
ExcludeTag |= other.ExcludeTag;
Offline |= other.Offline;
NoStorage |= other.NoStorage;
UniqueTag |= other.UniqueTag;
Misc |= other.Misc;
NoJsClust |= other.NoJsClust;
if (other.NoMatchTags != null)
{
foreach (var tag in other.NoMatchTags)
AddMissingTag(tag);
}
if (other.ExcludeTags != null)
{
foreach (var tag in other.ExcludeTags)
AddExcludeTag(tag);
}
}
}
// ============================================================================

View File

@@ -1,4 +1,5 @@
using System.Text;
using System.Text.Json;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
@@ -1774,6 +1775,438 @@ internal sealed class JetStreamEngine(JetStream state)
_state.Lock.ExitReadLock();
}
}
internal ConsumerAssignment? ConsumerAssignmentOrInflight(string accountName, string streamName, string consumerName)
{
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return null;
if (cluster.InflightConsumers.TryGetValue(accountName, out var streams) &&
streams.TryGetValue(streamName, out var consumers) &&
consumers.TryGetValue(consumerName, out var inflight))
{
return inflight.Deleted ? null : inflight.Assignment;
}
if (!cluster.Streams.TryGetValue(accountName, out var accountStreams))
return null;
if (!accountStreams.TryGetValue(streamName, out var streamAssignment))
return null;
if (streamAssignment.Consumers == null)
return null;
return streamAssignment.Consumers.TryGetValue(consumerName, out var current) ? current : null;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal IEnumerable<ConsumerAssignment> ConsumerAssignmentsOrInflightSeq(string accountName, string streamName)
{
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return [];
var results = new List<ConsumerAssignment>();
var seen = new HashSet<string>(StringComparer.Ordinal);
if (cluster.InflightConsumers.TryGetValue(accountName, out var streams) &&
streams.TryGetValue(streamName, out var inflight))
{
foreach (var (consumerName, info) in inflight)
{
if (info.Deleted || info.Assignment == null)
continue;
seen.Add(consumerName);
results.Add(info.Assignment);
}
}
if (cluster.Streams.TryGetValue(accountName, out var accountStreams) &&
accountStreams.TryGetValue(streamName, out var streamAssignment) &&
streamAssignment.Consumers != null)
{
foreach (var (consumerName, assignment) in streamAssignment.Consumers)
{
if (!seen.Add(consumerName))
continue;
results.Add(assignment);
}
}
return results;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal void MonitorConsumer(NatsConsumer consumer, ConsumerAssignment assignment)
{
if (consumer == null || assignment == null)
return;
var server = _state.Server as NatsServer;
if (consumer.RaftNode() == null || GetMetaGroup() == null)
{
server?.Warnf(
"No RAFT group for consumer '{0}>{1}'",
assignment.Stream,
assignment.Name);
return;
}
}
internal Exception? ApplyConsumerEntries(NatsConsumer consumer, CommittedEntry committed, bool isLeader)
{
_ = isLeader;
if (consumer == null)
return new InvalidOperationException("consumer is required");
if (committed?.Entries == null)
return null;
foreach (var entry in committed.Entries)
{
if (entry == null || entry.Data == null || entry.Data.Length == 0)
continue;
if (entry.Type == EntryType.EntryCatchup)
continue;
var op = (EntryOp)entry.Data[0];
switch (op)
{
case EntryOp.UpdateAcksOp:
{
var (dseq, sseq, ok) = DecodeAckUpdate(entry.Data[1..]);
if (!ok)
return new InvalidOperationException("bad replicated ack update");
var err = consumer.ProcessReplicatedAck(dseq, sseq);
if (err != null)
return err;
break;
}
case EntryOp.UpdateDeliveredOp:
{
var (_, sseq, _, _, ok) = DecodeDeliveredUpdate(entry.Data[1..]);
if (!ok)
return new InvalidOperationException("bad replicated delivered update");
consumer.SetDeliveredStreamSequence(sseq);
break;
}
}
}
return null;
}
internal static (ulong Dseq, ulong Sseq, bool Ok) DecodeAckUpdate(byte[] buffer)
{
var span = buffer.AsSpan();
if (!TryReadUVarInt(span, out var dseq, out var consumed))
return (0, 0, false);
if (!TryReadUVarInt(span[consumed..], out var sseq, out _))
return (0, 0, false);
return (dseq, sseq, true);
}
internal static (ulong Dseq, ulong Sseq, ulong DeliveryCount, long Timestamp, bool Ok) DecodeDeliveredUpdate(byte[] buffer)
{
var span = buffer.AsSpan();
if (!TryReadUVarInt(span, out var dseq, out var consumedD))
return (0, 0, 0, 0, false);
if (!TryReadUVarInt(span[consumedD..], out var sseq, out var consumedS))
return (0, 0, 0, 0, false);
if (!TryReadUVarInt(span[(consumedD + consumedS)..], out var deliveryCount, out var consumedDc))
return (0, 0, 0, 0, false);
if (!TryReadVarInt(span[(consumedD + consumedS + consumedDc)..], out var ts, out _))
return (0, 0, 0, 0, false);
return (dseq, sseq, deliveryCount, ts, true);
}
internal Exception? ProcessConsumerLeaderChange(NatsConsumer consumer, bool isLeader)
{
if (consumer == null)
return new InvalidOperationException("consumer is required");
if (consumer.IsClosed())
{
if (isLeader)
consumer.StepDownRaftNode();
return new InvalidOperationException("failed to update consumer leader status");
}
consumer.SetLeader(isLeader, term: 0);
if (!isLeader)
{
if (consumer.RaftNode() is { } node && node.LostQuorum())
{
(_state.Server as NatsServer)?.SendConsumerLostQuorumAdvisory(consumer);
}
return null;
}
(_state.Server as NatsServer)?.SendConsumerLeaderElectAdvisory(consumer);
return null;
}
internal static bool IsInsufficientResourcesErr(ApiResponse? response)
{
if (response?.Error == null)
return false;
var errCode = response.Error.ErrCode;
return errCode == JsApiErrors.InsufficientResources.ErrCode ||
errCode == JsApiErrors.MemoryResourcesExceeded.ErrCode ||
errCode == JsApiErrors.StorageResourcesExceeded.ErrCode;
}
internal void ProcessStreamAssignmentResults(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message)
{
_ = sub;
_ = client;
_ = account;
_ = subject;
_ = reply;
StreamAssignmentResult? result;
try
{
result = JsonSerializer.Deserialize<StreamAssignmentResult>(message);
}
catch
{
return;
}
if (result == null)
return;
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return;
var assignment = StreamAssignmentOrInflight(result.Account, result.Stream);
if (assignment == null || assignment.Reassigning)
return;
assignment.Responded = true;
if (!result.Update && DateTime.UtcNow - assignment.Created < TimeSpan.FromSeconds(5))
{
assignment.Error = new InvalidOperationException(JsApiErrors.ClusterNotAssigned.Description ?? "cluster not assigned");
cluster.TrackInflightStreamProposal(result.Account, assignment, deleted: true);
}
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void ProcessConsumerAssignmentResults(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message)
{
_ = sub;
_ = client;
_ = account;
_ = subject;
_ = reply;
ConsumerAssignmentResult? result;
try
{
result = JsonSerializer.Deserialize<ConsumerAssignmentResult>(message);
}
catch
{
return;
}
if (result == null)
return;
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return;
if (!cluster.Streams.TryGetValue(result.Account, out var accountStreams))
return;
if (!accountStreams.TryGetValue(result.Stream, out var streamAssignment))
return;
if (streamAssignment.Consumers == null || !streamAssignment.Consumers.TryGetValue(result.Consumer, out var consumerAssignment))
return;
consumerAssignment.Responded = true;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void StartUpdatesSub()
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return;
cluster.StreamResults ??= new object();
cluster.ConsumerResults ??= new object();
cluster.Stepdown ??= new object();
cluster.PeerRemove ??= new object();
cluster.PeerStreamMove ??= new object();
cluster.PeerStreamCancelMove ??= new object();
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void StopUpdatesSub()
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return;
cluster.StreamResults = null;
cluster.ConsumerResults = null;
cluster.Stepdown = null;
cluster.PeerRemove = null;
cluster.PeerStreamMove = null;
cluster.PeerStreamCancelMove = null;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void ProcessLeaderChange(bool isLeader)
{
var server = _state.Server as NatsServer;
if (server == null)
return;
if (isLeader)
{
server.Noticef("Self is new JetStream cluster metadata leader");
server.SendDomainLeaderElectAdvisory();
StartUpdatesSub();
}
else
{
server.Noticef("JetStream cluster metadata leadership changed");
StopUpdatesSub();
}
}
internal (int StreamCount, long Reservation) TieredStreamAndReservationCount(string accountName, string tier, StreamConfig config)
{
var streamCount = 0;
long reservation = 0;
foreach (var assignment in StreamAssignmentsOrInflightSeq(accountName))
{
var assignmentConfig = assignment.Config;
if (assignmentConfig == null)
continue;
if (!string.IsNullOrEmpty(tier) && !IsSameTier(assignmentConfig, config))
continue;
if (string.Equals(assignmentConfig.Name, config.Name, StringComparison.Ordinal))
continue;
streamCount++;
if (assignmentConfig.MaxBytes > 0 && assignmentConfig.Storage == config.Storage)
reservation += assignmentConfig.MaxBytes;
}
return (streamCount, reservation);
}
internal (RaftGroup? Group, SelectPeerError? Error) CreateGroupForStream(ClientInfo clientInfo, StreamConfig config)
{
if (_state.Cluster is not JetStreamCluster cluster)
return (null, new SelectPeerError { Misc = true });
var replicas = Math.Max(1, config.Replicas);
var targetCluster = config.Placement?.Cluster;
if (string.IsNullOrWhiteSpace(targetCluster))
targetCluster = clientInfo.Cluster?.FirstOrDefault();
var (peers, error) = cluster.SelectPeerGroup(replicas, targetCluster ?? string.Empty, config, null, 0, null);
if (peers == null || peers.Length < replicas)
return (null, error ?? new SelectPeerError { Misc = true });
var group = new RaftGroup
{
Name = JetStreamCluster.GroupNameForStream(peers, config.Storage),
Storage = config.Storage,
Cluster = targetCluster,
Peers = peers,
};
group.SetPreferred(_state.Server as NatsServer ?? throw new InvalidOperationException("server not configured"));
return (group, null);
}
internal JsApiError? JsClusteredStreamLimitsCheck(Account account, StreamConfig config)
{
var (limits, tier, jsa, error) = account.SelectLimits(config.Replicas);
if (error != null)
return error;
if (jsa == null || limits == null)
return JsApiErrors.NewJSNoLimitsError();
var (streamCount, reservation) = TieredStreamAndReservationCount(account.Name, tier, config);
if (limits.MaxStreams > 0 && streamCount >= limits.MaxStreams)
return JsApiErrors.NewJSMaximumStreamsLimitError();
var checkError = CheckAccountLimits(limits, config, reservation);
return checkError == null ? null : JsApiErrors.NewJSStreamLimitsError(checkError);
}
private static bool TryReadUVarInt(ReadOnlySpan<byte> buffer, out ulong value, out int consumed)
{
value = 0;
consumed = 0;
var shift = 0;
foreach (var b in buffer)
{
var chunk = (ulong)(b & 0x7Fu);
value |= chunk << shift;
consumed++;
if ((b & 0x80) == 0)
return true;
shift += 7;
if (shift >= 64)
return false;
}
return false;
}
private static bool TryReadVarInt(ReadOnlySpan<byte> buffer, out long value, out int consumed)
{
value = 0;
if (!TryReadUVarInt(buffer, out var raw, out consumed))
return false;
value = (long)((raw >> 1) ^ (~(raw & 1UL) + 1));
return true;
}
}
internal sealed class StreamAssignmentView

View File

@@ -201,6 +201,33 @@ internal sealed partial class JsAccount
}
}
internal bool ConsumerAssigned(string stream, string consumer)
{
Lock.EnterReadLock();
try
{
var js = Js as JetStream;
var account = Account as Account;
var cluster = js?.Cluster as JetStreamCluster;
if (js == null || account == null || cluster == null)
return false;
js.Lock.EnterReadLock();
try
{
return cluster.IsConsumerAssigned(account, stream, consumer);
}
finally
{
js.Lock.ExitReadLock();
}
}
finally
{
Lock.ExitReadLock();
}
}
internal Account? Acc() => Account as Account;
internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas)

View File

@@ -38,6 +38,9 @@ internal sealed class NatsConsumer : IDisposable
private bool _isLeader;
private ulong _leaderTerm;
private ConsumerState _state = new();
private NatsStream? _streamRef;
private ConsumerAssignment? _assignment;
private DateTime _lostQuorumSent;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;
@@ -71,7 +74,12 @@ internal sealed class NatsConsumer : IDisposable
{
ArgumentNullException.ThrowIfNull(stream);
ArgumentNullException.ThrowIfNull(cfg);
return new NatsConsumer(stream.Name, cfg, DateTime.UtcNow);
var consumer = new NatsConsumer(stream.Name, cfg, DateTime.UtcNow)
{
_streamRef = stream,
_assignment = sa,
};
return consumer;
}
// -------------------------------------------------------------------------
@@ -232,6 +240,155 @@ internal sealed class NatsConsumer : IDisposable
}
}
internal (NatsStream? Stream, IRaftNode? Node) StreamAndNode()
{
_mu.EnterReadLock();
try
{
return (_streamRef, _node as IRaftNode);
}
finally
{
_mu.ExitReadLock();
}
}
internal (int Replicas, Exception? Error) Replica()
{
_mu.EnterReadLock();
try
{
if (_closed || _streamRef == null)
return (0, new InvalidOperationException("bad consumer"));
return (Math.Max(1, Config.Replicas), null);
}
finally
{
_mu.ExitReadLock();
}
}
internal RaftGroup? RaftGroup()
{
_mu.EnterReadLock();
try
{
return _assignment?.Group;
}
finally
{
_mu.ExitReadLock();
}
}
internal void ClearRaftNode()
{
_mu.EnterWriteLock();
try
{
_node = null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal IRaftNode? RaftNode()
{
_mu.EnterReadLock();
try
{
return _node as IRaftNode;
}
finally
{
_mu.ExitReadLock();
}
}
internal Exception? ProcessReplicatedAck(ulong dseq, ulong sseq)
{
_mu.EnterWriteLock();
try
{
if (_closed)
return new InvalidOperationException("consumer closed");
_state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, dseq);
_state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, dseq);
_state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, sseq);
Interlocked.Exchange(ref AckFloor, (long)_state.AckFloor.Stream);
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool ShouldSendLostQuorum()
{
_mu.EnterWriteLock();
try
{
if (_node is not IRaftNode raft || !raft.LostQuorum())
return false;
if (DateTime.UtcNow - _lostQuorumSent < TimeSpan.FromSeconds(30))
return false;
_lostQuorumSent = DateTime.UtcNow;
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SetDeliveredStreamSequence(ulong sseq)
{
_mu.EnterWriteLock();
try
{
_state.Delivered.Stream = Math.Max(_state.Delivered.Stream, sseq);
Interlocked.Exchange(ref Delivered, (long)_state.Delivered.Stream);
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool IsClosed()
{
_mu.EnterReadLock();
try
{
return _closed;
}
finally
{
_mu.ExitReadLock();
}
}
internal void StepDownRaftNode()
{
_mu.EnterReadLock();
try
{
if (_node is IRaftNode raft && raft.Leader())
raft.StepDown();
}
finally
{
_mu.ExitReadLock();
}
}
// -------------------------------------------------------------------------
// IDisposable
// -------------------------------------------------------------------------

View File

@@ -0,0 +1,230 @@
using System.Text;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal void SendDomainLeaderElectAdvisory()
{
var (_, cluster) = GetJetStreamCluster();
var meta = cluster?.Meta;
if (meta == null)
return;
Noticef(
"JetStream domain leader elected advisory for leader {0} in cluster {1}",
meta.GroupLeader(),
CachedClusterName());
}
internal void SendConsumerLostQuorumAdvisory(NatsConsumer? consumer)
{
if (consumer == null || !consumer.ShouldSendLostQuorum())
return;
Noticef("JetStream consumer lost quorum advisory for consumer {0} on stream {1}", consumer.Name, consumer.Stream);
}
internal void SendConsumerLeaderElectAdvisory(NatsConsumer? consumer)
{
if (consumer == null)
return;
Noticef("JetStream consumer leader elected advisory for consumer {0} on stream {1}", consumer.Name, consumer.Stream);
}
internal void JsClusteredStreamRequest(
ClientInfo clientInfo,
Account account,
string subject,
string reply,
byte[] rawMessage,
StreamConfigRequest configRequest)
{
var (js, cluster) = GetJetStreamCluster();
if (js == null || cluster == null)
return;
var cfg = configRequest.Config;
var engine = new JetStreamEngine(js);
var limitsError = engine.JsClusteredStreamLimitsCheck(account, cfg);
if (limitsError != null)
{
var response = new ApiResponse
{
Type = JsApiSubjects.JsApiStreamCreateResponseType,
Error = limitsError,
};
SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
return;
}
var (group, createError) = engine.CreateGroupForStream(clientInfo, cfg);
if (group == null || createError != null)
{
var response = new ApiResponse
{
Type = JsApiSubjects.JsApiStreamCreateResponseType,
Error = JsApiErrors.NewJSClusterNoPeersError(createError ?? new SelectPeerError { Misc = true }),
};
SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
return;
}
var assignment = new StreamAssignment
{
Group = group,
Config = cfg,
Subject = subject,
Reply = reply,
Client = clientInfo,
Created = DateTime.UtcNow,
};
if (cluster.Meta != null)
{
cluster.Meta.Propose(Encoding.UTF8.GetBytes($"create-stream:{account.Name}:{cfg.Name}"));
cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: false);
}
}
internal void JsClusteredStreamUpdateRequest(
ClientInfo clientInfo,
Account account,
string subject,
string reply,
byte[] rawMessage,
StreamConfig config)
{
_ = rawMessage;
JsClusteredStreamRequest(clientInfo, account, subject, reply, rawMessage, new StreamConfigRequest { Config = config });
}
internal void JsClusteredStreamDeleteRequest(
ClientInfo clientInfo,
Account account,
string stream,
string subject,
string reply,
byte[] rawMessage)
{
_ = rawMessage;
var (js, cluster) = GetJetStreamCluster();
if (js == null || cluster?.Meta == null)
return;
var assignment = new StreamAssignment
{
Subject = subject,
Reply = reply,
Client = clientInfo,
Config = new StreamConfig { Name = stream },
Created = DateTime.UtcNow,
};
cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-stream:{account.Name}:{stream}"));
cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: true);
}
internal void JsClusteredStreamPurgeRequest(
ClientInfo clientInfo,
Account account,
NatsStream? stream,
string streamName,
string subject,
string reply,
byte[] rawMessage,
StreamPurgeRequest request)
{
_ = stream;
_ = streamName;
_ = rawMessage;
_ = request;
var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal void JsClusteredStreamRestoreRequest(
ClientInfo clientInfo,
Account account,
object request,
string subject,
string reply,
byte[] rawMessage)
{
_ = request;
_ = rawMessage;
var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal bool AllPeersOffline(RaftGroup? group)
{
if (group == null || group.Peers.Length == 0)
return false;
foreach (var peer in group.Peers)
{
if (GetNodeInfo(peer) is { Offline: false })
return false;
}
return true;
}
internal void JsClusteredStreamListRequest(Account account, ClientInfo clientInfo, string filter, int offset, string subject, string reply, byte[] rawMessage)
{
_ = filter;
_ = offset;
_ = rawMessage;
var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamListResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal void JsClusteredConsumerListRequest(Account account, ClientInfo clientInfo, int offset, string stream, string subject, string reply, byte[] rawMessage)
{
_ = offset;
_ = stream;
_ = rawMessage;
var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerListResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal void JsClusteredConsumerDeleteRequest(
ClientInfo clientInfo,
Account account,
string stream,
string consumer,
string subject,
string reply,
byte[] rawMessage)
{
_ = rawMessage;
var (js, cluster) = GetJetStreamCluster();
if (js == null || cluster?.Meta == null)
return;
cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-consumer:{account.Name}:{stream}:{consumer}"));
var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal void JsClusteredMsgDeleteRequest(
ClientInfo clientInfo,
Account account,
NatsStream? stream,
string streamName,
string subject,
string reply,
StreamMsgDeleteRequest request,
byte[] rawMessage)
{
_ = stream;
_ = streamName;
_ = rawMessage;
_ = request;
var response = new ApiResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
}

View File

@@ -0,0 +1,129 @@
using System.Reflection;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class JetStreamClusterConsumersGroupATests
{
[Fact] // T:1636
public void ConsumerAssignmentOrInflight_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("ConsumerAssignmentOrInflight", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1637
public void ConsumerAssignmentsOrInflightSeq_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("ConsumerAssignmentsOrInflightSeq", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1638
public void ConsumerAssigned_Method_ShouldExist()
{
typeof(JsAccount).GetMethod("ConsumerAssigned", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1639
public void IsConsumerAssigned_Method_ShouldExist()
{
typeof(JetStreamCluster).GetMethod("IsConsumerAssigned", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1640
public void StreamAndNode_Method_ShouldExist()
{
typeof(NatsConsumer).GetMethod("StreamAndNode", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1641
public void Replica_Method_ShouldExist()
{
typeof(NatsConsumer).GetMethod("Replica", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1642
public void RaftGroup_Method_ShouldExist()
{
typeof(NatsConsumer).GetMethod("RaftGroup", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1643
public void ClearRaftNode_Method_ShouldExist()
{
typeof(NatsConsumer).GetMethod("ClearRaftNode", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1644
public void RaftNode_Method_ShouldExist()
{
typeof(NatsConsumer).GetMethod("RaftNode", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1645
public void MonitorConsumer_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("MonitorConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1646
public void ApplyConsumerEntries_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("ApplyConsumerEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1647
public void ProcessReplicatedAck_Method_ShouldExist()
{
typeof(NatsConsumer).GetMethod("ProcessReplicatedAck", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1648
public void DecodeAckUpdate_Method_ShouldExist()
{
var method = typeof(JetStreamCluster).GetMethod("DecodeAckUpdate", BindingFlags.Static | BindingFlags.NonPublic);
method.ShouldNotBeNull();
}
[Fact] // T:1649
public void DecodeDeliveredUpdate_Method_ShouldExist()
{
typeof(JetStreamCluster).GetMethod("DecodeDeliveredUpdate", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1650
public void ProcessConsumerLeaderChange_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("ProcessConsumerLeaderChange", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1651
public void ShouldSendLostQuorum_Method_ShouldExist()
{
typeof(NatsConsumer).GetMethod("ShouldSendLostQuorum", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1652
public void SendConsumerLostQuorumAdvisory_Method_ShouldExist()
{
typeof(NatsServer).GetMethod("SendConsumerLostQuorumAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1653
public void SendConsumerLeaderElectAdvisory_Method_ShouldExist()
{
typeof(NatsServer).GetMethod("SendConsumerLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1654
public void IsInsufficientResourcesErr_Method_ShouldExist()
{
typeof(JetStreamCluster).GetMethod("IsInsufficientResourcesErr", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1655
public void ProcessStreamAssignmentResults_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("ProcessStreamAssignmentResults", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
}

View File

@@ -0,0 +1,128 @@
using System.Reflection;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class JetStreamClusterConsumersGroupBTests
{
[Fact] // T:1656
public void ProcessConsumerAssignmentResults_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("ProcessConsumerAssignmentResults", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1657
public void StartUpdatesSub_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("StartUpdatesSub", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1658
public void StopUpdatesSub_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("StopUpdatesSub", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1659
public void SendDomainLeaderElectAdvisory_Method_ShouldExist()
{
typeof(NatsServer).GetMethod("SendDomainLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1660
public void ProcessLeaderChange_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("ProcessLeaderChange", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1661
public void RemapStreamAssignment_Method_ShouldExist()
{
typeof(JetStreamCluster).GetMethod("RemapStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1662
public void Error_Method_ShouldExist()
{
typeof(SelectPeerError).GetMethod("Error", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1663
public void AddMissingTag_Method_ShouldExist()
{
typeof(SelectPeerError).GetMethod("AddMissingTag", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1664
public void AddExcludeTag_Method_ShouldExist()
{
typeof(SelectPeerError).GetMethod("AddExcludeTag", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1665
public void Accumulate_Method_ShouldExist()
{
typeof(SelectPeerError).GetMethod("Accumulate", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1666
public void SelectPeerGroup_Method_ShouldExist()
{
typeof(JetStreamCluster).GetMethod("SelectPeerGroup", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1667
public void GroupNameForStream_Method_ShouldExist()
{
typeof(JetStreamCluster).GetMethod("GroupNameForStream", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1668
public void GroupNameForConsumer_Method_ShouldExist()
{
typeof(JetStreamCluster).GetMethod("GroupNameForConsumer", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1669
public void GroupName_Method_ShouldExist()
{
typeof(JetStreamCluster).GetMethod("GroupName", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1670
public void TieredStreamAndReservationCount_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("TieredStreamAndReservationCount", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1671
public void CreateGroupForStream_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("CreateGroupForStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1672
public void SelectLimits_Method_ShouldExist()
{
typeof(Account).GetMethod("SelectLimits", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1673
public void JsClusteredStreamLimitsCheck_Method_ShouldExist()
{
typeof(JetStreamEngine).GetMethod("JsClusteredStreamLimitsCheck", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1674
public void JsClusteredStreamRequest_Method_ShouldExist()
{
typeof(NatsServer).GetMethod("JsClusteredStreamRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
}
[Fact] // T:1675
public void SysRequest_Method_ShouldExist()
{
typeof(JetStreamCluster).GetMethod("SysRequest", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
}
}

View File

@@ -0,0 +1,80 @@
using System.Reflection;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class JetStreamClusterConsumersGroupCTests
{
[Fact] // T:1676
public void JsClusteredStreamUpdateRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamUpdateRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1677
public void JsClusteredStreamDeleteRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1678
public void JsClusteredStreamPurgeRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamPurgeRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1679
public void JsClusteredStreamRestoreRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamRestoreRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1680
public void AllPeersOffline_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("AllPeersOffline", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1681
public void JsClusteredStreamListRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1682
public void JsClusteredConsumerListRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredConsumerListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1683
public void EncodeStreamPurge_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1684
public void DecodeStreamPurge_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("DecodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1685
public void JsClusteredConsumerDeleteRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredConsumerDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1686
public void EncodeMsgDelete_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1687
public void DecodeMsgDelete_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("DecodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1688
public void JsClusteredMsgDeleteRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredMsgDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1689
public void EncodeAddStreamAssignment_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeAddStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1690
public void EncodeUpdateStreamAssignment_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeUpdateStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1691
public void EncodeDeleteStreamAssignment_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeDeleteStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1692
public void DecodeStreamAssignment_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("DecodeStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1693
public void DecodeStreamAssignmentConfig_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("DecodeStreamAssignmentConfig", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
}