Port batch 32 JS cluster meta feature implementations

This commit is contained in:
Joseph Doherty
2026-02-28 22:30:33 -05:00
parent e878246bed
commit 7e8b81b648
5 changed files with 1038 additions and 0 deletions

View File

@@ -127,6 +127,57 @@ public sealed partial class Account
return (server, jsa, null);
}
internal (NatsServer? Server, JetStream? JetStream, JsAccount? JetStreamAccount) GetJetStreamFromAccount()
{
_mu.EnterReadLock();
var jetStreamAccount = JetStream;
_mu.ExitReadLock();
if (jetStreamAccount == null)
return (null, null, null);
jetStreamAccount.Lock.EnterReadLock();
var jetStream = jetStreamAccount.Js as JetStream;
jetStreamAccount.Lock.ExitReadLock();
if (jetStream == null)
return (null, null, null);
return (jetStream.Server as NatsServer, jetStream, jetStreamAccount);
}
internal bool JetStreamIsStreamLeader(string stream)
{
var (server, jetStream, _) = GetJetStreamFromAccount();
if (server == null || jetStream == null)
return false;
jetStream.Lock.EnterReadLock();
try
{
return (jetStream.Cluster as JetStreamCluster)?.IsStreamLeader(Name, stream) == true;
}
finally
{
jetStream.Lock.ExitReadLock();
}
}
internal bool JetStreamIsConsumerLeader(string stream, string consumer)
{
var (server, jetStream, _) = GetJetStreamFromAccount();
if (server == null || jetStream == null)
return false;
jetStream.Lock.EnterReadLock();
try
{
return (jetStream.Cluster as JetStreamCluster)?.IsConsumerLeader(Name, stream, consumer) == true;
}
finally
{
jetStream.Lock.ExitReadLock();
}
}
internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg)
{
_mu.EnterReadLock();

View File

@@ -90,6 +90,252 @@ internal sealed class JetStreamCluster
/// <summary>Duration of last meta-snapshot (nanoseconds).</summary>
public long LastMetaSnapDuration { get; set; }
internal static UnsupportedStreamAssignment NewUnsupportedStreamAssignment(
NatsServer server,
StreamAssignment assignment,
Exception? error)
{
var reason = "stopped";
if (error != null)
{
var msg = error.Message;
reason = msg.StartsWith("json:", StringComparison.Ordinal)
? $"unsupported - config error: {msg["json:".Length..].TrimStart()}"
: $"stopped - {msg}";
}
else if (assignment.Config != null && !JetStreamVersioning.SupportsRequiredApiLevel(assignment.Config.Metadata))
{
var required = JetStreamVersioning.GetRequiredApiLevel(assignment.Config.Metadata);
if (!string.IsNullOrEmpty(required))
reason = $"unsupported - required API level: {required}, current API level: {JetStreamVersioning.JsApiLevel}";
}
var config = assignment.Config ?? new StreamConfig { Name = assignment.Subject ?? string.Empty };
return new UnsupportedStreamAssignment
{
Reason = reason,
Info = new StreamInfo
{
Created = assignment.Created,
Config = JetStreamVersioning.SetDynamicStreamMetadata(config),
},
};
}
internal static UnsupportedConsumerAssignment NewUnsupportedConsumerAssignment(
ConsumerAssignment assignment,
Exception? error)
{
var reason = "stopped";
if (error != null)
{
var msg = error.Message;
reason = msg.StartsWith("json:", StringComparison.Ordinal)
? $"unsupported - config error: {msg["json:".Length..].TrimStart()}"
: $"stopped - {msg}";
}
else if (assignment.Config != null && !JetStreamVersioning.SupportsRequiredApiLevel(assignment.Config.Metadata))
{
var required = JetStreamVersioning.GetRequiredApiLevel(assignment.Config.Metadata);
if (!string.IsNullOrEmpty(required))
reason = $"unsupported - required API level: {required}, current API level: {JetStreamVersioning.JsApiLevel}";
}
var config = assignment.Config ?? new ConsumerConfig { Name = assignment.Name };
return new UnsupportedConsumerAssignment
{
Reason = reason,
Info = new ConsumerInfo
{
Stream = assignment.Stream,
Name = assignment.Name,
Created = assignment.Created,
Config = JetStreamVersioning.SetDynamicConsumerMetadata(config),
TimeStamp = DateTime.UtcNow,
},
};
}
internal bool IsLeader()
{
if (this == null)
return true;
return Meta != null && Meta.Leader();
}
internal bool IsStreamCurrent(string account, string stream)
{
if (Meta == null)
return false;
if (!Streams.TryGetValue(account, out var accountAssignments))
return false;
if (!accountAssignments.TryGetValue(stream, out var assignment))
return false;
if (assignment.Group == null)
return false;
return assignment.Group.Node == null || assignment.Group.Node.Current();
}
internal bool IsStreamAssigned(Account account, string stream)
{
if (Meta == null)
return false;
if (!Streams.TryGetValue(account.Name, out var accountAssignments))
return false;
if (!accountAssignments.TryGetValue(stream, out var assignment))
return false;
var peers = assignment.Group?.Peers ?? [];
return peers.Contains(Meta.ID(), StringComparer.Ordinal);
}
internal bool IsStreamLeader(string account, string stream)
{
if (Meta == null)
return false;
if (!Streams.TryGetValue(account, out var accountAssignments))
return false;
if (!accountAssignments.TryGetValue(stream, out var assignment))
return false;
var group = assignment.Group;
if (group == null)
return false;
var ourId = Meta.ID();
if (!group.Peers.Contains(ourId, StringComparer.Ordinal))
return false;
return group.Peers.Length == 1 || (group.Node != null && group.Node.Leader());
}
internal bool IsConsumerLeader(string account, string stream, string consumer)
{
if (Meta == null)
return false;
if (!Streams.TryGetValue(account, out var accountAssignments))
return false;
if (!accountAssignments.TryGetValue(stream, out var assignment))
return false;
if (assignment.Consumers == null || !assignment.Consumers.TryGetValue(consumer, out var consumerAssignment))
return false;
var group = consumerAssignment.Group;
if (group == null)
return false;
var ourId = Meta.ID();
if (!group.Peers.Contains(ourId, StringComparer.Ordinal))
return false;
return group.Peers.Length == 1 || (group.Node != null && group.Node.Leader());
}
internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted)
{
if (!InflightStreams.TryGetValue(accountName, out var streams))
{
streams = new Dictionary<string, InflightStreamInfo>(StringComparer.Ordinal);
InflightStreams[accountName] = streams;
}
var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty;
if (streams.TryGetValue(streamName, out var inflight))
{
inflight.Ops++;
inflight.Deleted = deleted;
inflight.Assignment = assignment;
return;
}
streams[streamName] = new InflightStreamInfo
{
Ops = 1,
Deleted = deleted,
Assignment = assignment,
};
}
internal void RemoveInflightStreamProposal(string accountName, string streamName)
{
if (!InflightStreams.TryGetValue(accountName, out var streams))
return;
if (!streams.TryGetValue(streamName, out var inflight))
return;
if (inflight.Ops > 1)
{
inflight.Ops--;
return;
}
streams.Remove(streamName);
if (streams.Count == 0)
InflightStreams.Remove(accountName);
}
internal void TrackInflightConsumerProposal(
string accountName,
string streamName,
ConsumerAssignment assignment,
bool deleted)
{
if (!InflightConsumers.TryGetValue(accountName, out var streams))
{
streams = new Dictionary<string, Dictionary<string, InflightConsumerInfo>>(StringComparer.Ordinal);
InflightConsumers[accountName] = streams;
}
if (!streams.TryGetValue(streamName, out var consumers))
{
consumers = new Dictionary<string, InflightConsumerInfo>(StringComparer.Ordinal);
streams[streamName] = consumers;
}
if (consumers.TryGetValue(assignment.Name, out var inflight))
{
inflight.Ops++;
inflight.Deleted = deleted;
inflight.Assignment = assignment;
return;
}
consumers[assignment.Name] = new InflightConsumerInfo
{
Ops = 1,
Deleted = deleted,
Assignment = assignment,
};
}
internal void RemoveInflightConsumerProposal(string accountName, string streamName, string consumerName)
{
if (!InflightConsumers.TryGetValue(accountName, out var streams))
return;
if (!streams.TryGetValue(streamName, out var consumers))
return;
if (!consumers.TryGetValue(consumerName, out var inflight))
return;
if (inflight.Ops > 1)
{
inflight.Ops--;
return;
}
consumers.Remove(consumerName);
if (consumers.Count == 0)
streams.Remove(streamName);
if (streams.Count == 0)
InflightConsumers.Remove(accountName);
}
}
// ============================================================================
@@ -255,6 +501,28 @@ internal sealed class UnsupportedStreamAssignment
public object? SysClient { get; set; }
/// <summary>Info subscription (object to avoid session dep).</summary>
public object? InfoSub { get; set; }
internal void SetupInfoSub(NatsServer server, StreamAssignment assignment)
{
if (InfoSub != null)
return;
SysClient = server.CreateInternalJetStreamClient();
InfoSub = $"{assignment.Client?.ServiceAccount() ?? string.Empty}:{assignment.Config?.Name ?? string.Empty}";
}
internal StreamInfoClusterResponse HandleClusterStreamInfoRequest() =>
new()
{
OfflineReason = Reason,
StreamInfo = Info,
};
internal void CloseInfoSub()
{
InfoSub = null;
SysClient = null;
}
}
// ============================================================================
@@ -301,6 +569,28 @@ internal sealed class UnsupportedConsumerAssignment
public object? SysClient { get; set; }
/// <summary>Info subscription (object to avoid session dep).</summary>
public object? InfoSub { get; set; }
internal void SetupInfoSub(NatsServer server, ConsumerAssignment assignment)
{
if (InfoSub != null)
return;
SysClient = server.CreateInternalJetStreamClient();
InfoSub = $"{assignment.Client?.ServiceAccount() ?? string.Empty}:{assignment.Stream}:{assignment.Name}";
}
internal ConsumerInfoClusterResponse HandleClusterConsumerInfoRequest() =>
new()
{
OfflineReason = Reason,
ConsumerInfo = Info,
};
internal void CloseInfoSub()
{
InfoSub = null;
SysClient = null;
}
}
// ============================================================================
@@ -374,6 +664,85 @@ internal sealed class RecoveryUpdates
public Dictionary<string, StreamAssignment> AddStreams { get; set; } = new();
public Dictionary<string, StreamAssignment> UpdateStreams { get; set; } = new();
public Dictionary<string, Dictionary<string, ConsumerAssignment>> UpdateConsumers { get; set; } = new();
internal void RemoveStream(StreamAssignment assignment)
{
var key = StreamRecoveryKey(assignment);
RemoveStreams[key] = assignment;
AddStreams.Remove(key);
UpdateStreams.Remove(key);
UpdateConsumers.Remove(key);
RemoveConsumers.Remove(key);
}
internal void AddStream(StreamAssignment assignment)
{
var key = StreamRecoveryKey(assignment);
AddStreams[key] = assignment;
}
internal void UpdateStream(StreamAssignment assignment)
{
var key = StreamRecoveryKey(assignment);
UpdateStreams[key] = assignment;
}
internal void RemoveConsumer(ConsumerAssignment assignment)
{
var streamKey = ConsumerStreamRecoveryKey(assignment);
var consumerKey = ConsumerRecoveryKey(assignment);
if (!RemoveConsumers.TryGetValue(streamKey, out var consumers))
{
consumers = new Dictionary<string, ConsumerAssignment>(StringComparer.Ordinal);
RemoveConsumers[streamKey] = consumers;
}
consumers[consumerKey] = assignment;
if (UpdateConsumers.TryGetValue(streamKey, out var updates))
updates.Remove(consumerKey);
}
internal void AddOrUpdateConsumer(ConsumerAssignment assignment)
{
var streamKey = ConsumerStreamRecoveryKey(assignment);
var consumerKey = ConsumerRecoveryKey(assignment);
if (!UpdateConsumers.TryGetValue(streamKey, out var consumers))
{
consumers = new Dictionary<string, ConsumerAssignment>(StringComparer.Ordinal);
UpdateConsumers[streamKey] = consumers;
}
consumers[consumerKey] = assignment;
}
private static string StreamRecoveryKey(StreamAssignment assignment) =>
$"{assignment.Client?.ServiceAccount() ?? string.Empty}:{assignment.Config?.Name ?? assignment.Subject ?? string.Empty}";
private static string ConsumerStreamRecoveryKey(ConsumerAssignment assignment) =>
$"{assignment.Client?.ServiceAccount() ?? string.Empty}:{assignment.Stream}";
private static string ConsumerRecoveryKey(ConsumerAssignment assignment) =>
$"{assignment.Stream}:{assignment.Name}";
}
internal sealed class StreamInfoClusterResponse
{
[JsonPropertyName("offline_reason")]
public string? OfflineReason { get; set; }
[JsonPropertyName("stream_info")]
public StreamInfo? StreamInfo { get; set; }
}
internal sealed class ConsumerInfoClusterResponse
{
[JsonPropertyName("offline_reason")]
public string? OfflineReason { get; set; }
[JsonPropertyName("consumer_info")]
public ConsumerInfo? ConsumerInfo { get; set; }
}
// ============================================================================

View File

@@ -324,6 +324,297 @@ internal sealed class JetStreamEngine(JetStream state)
Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes);
}
internal bool IsStreamHealthy(Account? account, StreamAssignment? assignment)
{
if (assignment?.Unsupported != null)
return true;
if (_state.Cluster is not JetStreamCluster)
return true;
if (assignment?.Group == null || assignment.Config == null || account == null)
return false;
var (stream, _) = account.LookupStream(assignment.Config.Name);
if (stream == null)
return false;
if (assignment.Config.Replicas <= 1)
return true;
var node = assignment.Group.Node;
return node != null && node.Healthy();
}
internal bool IsConsumerHealthy(NatsStream? stream, string consumer, ConsumerAssignment? assignment)
{
if (assignment?.Unsupported != null)
return true;
if (_state.Cluster is not JetStreamCluster)
return true;
if (stream == null || assignment?.Group == null)
return false;
if (assignment.Group.Peers.Length <= 1)
return true;
return assignment.Group.Node?.Healthy() == true;
}
internal bool SubjectsOverlap(string account, IEnumerable<string> subjects, StreamAssignment? ownAssignment = null)
{
if (_state.Cluster is not JetStreamCluster cluster)
return false;
var targetSubjects = subjects.ToArray();
if (!cluster.Streams.TryGetValue(account, out var accountAssignments))
return false;
foreach (var assignment in accountAssignments.Values)
{
if (ownAssignment != null &&
assignment.Config?.Name != null &&
assignment.Config.Name == ownAssignment.Config?.Name)
{
continue;
}
var existingSubjects = assignment.Config?.Subjects ?? [];
foreach (var existing in existingSubjects)
{
foreach (var target in targetSubjects)
{
if (Internal.DataStructures.SubscriptionIndex.SubjectsCollide(target, existing))
return true;
}
}
}
return false;
}
internal bool IsClustered() => _state.Cluster is JetStreamCluster;
internal bool IsClusteredNoLock() => Interlocked.CompareExchange(ref _state.Clustered, 0, 0) == 1;
internal Exception? SetupMetaGroup()
{
var server = Server();
if (server == null)
return new InvalidOperationException("jetstream server unavailable");
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is JetStreamCluster)
{
Interlocked.Exchange(ref _state.Clustered, 1);
return null;
}
var systemAccount = server.SystemAccount();
if (systemAccount == null)
return ServerErrors.ErrNoSysAccount;
var storeDir = Path.Combine(_state.Config.StoreDir, systemAccount.Name, "_js_", "_meta_");
Directory.CreateDirectory(storeDir);
var raftConfig = new RaftConfig
{
Name = "_meta_",
Store = storeDir,
Recovering = true,
};
var (meta, error) = server.StartRaftNode(systemAccount.Name, raftConfig);
if (error != null)
return error;
_state.Cluster = new JetStreamCluster
{
Meta = meta,
Streams = new Dictionary<string, Dictionary<string, StreamAssignment>>(StringComparer.Ordinal),
Server = server,
Client = server.CreateInternalJetStreamClient(),
Qch = System.Threading.Channels.Channel.CreateUnbounded<bool>(),
Stopped = System.Threading.Channels.Channel.CreateUnbounded<bool>(),
};
_state.MetaRecovering = true;
Interlocked.Exchange(ref _state.Clustered, 1);
return null;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal IRaftNode? GetMetaGroup()
{
_state.Lock.EnterReadLock();
try
{
return (_state.Cluster as JetStreamCluster)?.Meta;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal NatsServer? Server() => _state.Server as NatsServer;
internal bool IsLeaderless()
{
var meta = GetMetaGroup();
if (meta == null)
return false;
if (!meta.Leaderless())
return false;
return DateTime.UtcNow - meta.Created() > TimeSpan.FromSeconds(10);
}
internal bool IsGroupLeaderless(RaftGroup? group)
{
if (group?.Node == null)
return false;
var meta = GetMetaGroup();
if (meta == null)
return false;
if (!group.Peers.Contains(meta.ID(), StringComparer.Ordinal))
return false;
if (!group.Node.Leaderless())
return false;
return DateTime.UtcNow - group.Node.Created() > TimeSpan.FromSeconds(10);
}
internal System.Threading.Channels.Channel<bool>? ClusterQuitC()
{
_state.Lock.EnterReadLock();
try
{
return (_state.Cluster as JetStreamCluster)?.Qch;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal System.Threading.Channels.Channel<bool>? ClusterStoppedC()
{
_state.Lock.EnterReadLock();
try
{
return (_state.Cluster as JetStreamCluster)?.Stopped;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal void SetMetaRecovering()
{
_state.Lock.EnterWriteLock();
try
{
if (_state.Cluster is JetStreamCluster)
_state.MetaRecovering = true;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal void ClearMetaRecovering()
{
_state.Lock.EnterWriteLock();
try
{
_state.MetaRecovering = false;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal bool IsMetaRecovering()
{
_state.Lock.EnterReadLock();
try
{
return _state.MetaRecovering;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal void CheckForOrphans()
{
var server = Server();
if (server == null)
return;
_state.Lock.EnterReadLock();
var meta = (_state.Cluster as JetStreamCluster)?.Meta;
_state.Lock.ExitReadLock();
if (meta == null || meta.Leaderless() || !meta.Healthy())
return;
var (streams, consumers) = GetOrphans();
foreach (var stream in streams)
stream.Delete();
foreach (var consumer in consumers)
consumer.Delete();
}
internal (List<NatsStream> Streams, List<NatsConsumer> Consumers) GetOrphans()
{
var orphanStreams = new List<NatsStream>();
var orphanConsumers = new List<NatsConsumer>();
_state.Lock.EnterReadLock();
try
{
if (_state.Cluster is not JetStreamCluster cluster)
return (orphanStreams, orphanConsumers);
foreach (var (accountName, jsa) in _state.Accounts)
{
cluster.Streams.TryGetValue(accountName, out var assignments);
assignments ??= new Dictionary<string, StreamAssignment>(StringComparer.Ordinal);
jsa.Lock.EnterReadLock();
try
{
foreach (var (streamName, streamValue) in jsa.Streams)
{
if (!assignments.ContainsKey(streamName) && streamValue is NatsStream stream)
orphanStreams.Add(stream);
}
}
finally
{
jsa.Lock.ExitReadLock();
}
}
}
finally
{
_state.Lock.ExitReadLock();
}
return (orphanStreams, orphanConsumers);
}
internal static string FriendlyBytes<T>(T bytes)
where T : struct, IConvertible
{

View File

@@ -174,6 +174,33 @@ internal sealed partial class JsAccount
}
}
internal bool StreamAssigned(string stream)
{
Lock.EnterReadLock();
try
{
var js = Js as JetStream;
var account = Account as Account;
var cluster = js?.Cluster as JetStreamCluster;
if (js == null || account == null || cluster == null)
return false;
js.Lock.EnterReadLock();
try
{
return cluster.IsStreamAssigned(account, stream);
}
finally
{
js.Lock.ExitReadLock();
}
}
finally
{
Lock.ExitReadLock();
}
}
internal Account? Acc() => Account as Account;
internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas)

View File

@@ -0,0 +1,300 @@
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal (int JetStreamEnabled, int Total) TrackedJetStreamServers()
{
if (!IsRunning() || _sys == null)
return (-1, -1);
var js = 0;
var total = 0;
foreach (var value in _nodeToInfo.Values)
{
if (value is not NodeInfo info)
continue;
if (info.Js)
js++;
total++;
}
return (js, total);
}
internal (JetStream? JetStream, JetStreamCluster? Cluster) GetJetStreamCluster()
{
if (IsShuttingDown())
return (null, null);
var js = GetJetStreamState();
if (js == null)
return (null, null);
return (js, js.Cluster as JetStreamCluster);
}
public bool JetStreamIsClustered()
{
var js = GetJetStreamState();
return js != null && Interlocked.CompareExchange(ref js.Clustered, 0, 0) == 1;
}
public bool JetStreamIsLeader()
{
var (_, cluster) = GetJetStreamCluster();
return cluster?.IsLeader() == true;
}
public bool JetStreamIsCurrent()
{
var js = GetJetStreamState();
if (js == null)
return false;
js.Lock.EnterReadLock();
try
{
if (js.Cluster is not JetStreamCluster cluster)
return true;
return cluster.Meta?.Current() == true;
}
finally
{
js.Lock.ExitReadLock();
}
}
public Exception? JetStreamSnapshotMeta()
{
var js = GetJetStreamState();
if (js == null)
return ToException(JsApiErrors.NewJSNotEnabledError());
js.Lock.EnterReadLock();
JetStreamCluster? cluster;
IRaftNode? meta;
try
{
cluster = js.Cluster as JetStreamCluster;
meta = cluster?.Meta;
if (cluster == null || meta == null)
return ToException(JsApiErrors.NewJSClusterNotActiveError());
if (!cluster.IsLeader())
return ToException(JsApiErrors.NewJSClusterNotLeaderError());
}
finally
{
js.Lock.ExitReadLock();
}
var snapshot = JsonSerializer.SerializeToUtf8Bytes(cluster!.Streams);
meta!.InstallSnapshot(snapshot, force: false);
return null;
}
public Exception? JetStreamStepdownStream(string account, string stream)
{
var (_, cluster) = GetJetStreamCluster();
if (cluster == null)
return ToException(JsApiErrors.NewJSClusterNotActiveError());
var (acc, error) = LookupAccount(account);
if (error != null)
return error;
if (acc == null)
return new InvalidOperationException("account not found");
var (_, streamError) = acc.LookupStream(stream);
if (streamError != null)
return streamError;
if (cluster.Streams.TryGetValue(account, out var assignments) &&
assignments.TryGetValue(stream, out var assignment))
{
assignment.Group?.Node?.StepDown();
}
return null;
}
public Exception? JetStreamStepdownConsumer(string account, string stream, string consumer)
{
var (_, cluster) = GetJetStreamCluster();
if (cluster == null)
return ToException(JsApiErrors.NewJSClusterNotActiveError());
var (acc, error) = LookupAccount(account);
if (error != null)
return error;
if (acc == null)
return new InvalidOperationException("account not found");
var (_, streamError) = acc.LookupStream(stream);
if (streamError != null)
return streamError;
if (!cluster.Streams.TryGetValue(account, out var assignments) ||
!assignments.TryGetValue(stream, out var assignment) ||
assignment.Consumers == null ||
!assignment.Consumers.TryGetValue(consumer, out var consumerAssignment))
{
return ToException(JsApiErrors.NewJSConsumerNotFoundError());
}
consumerAssignment.Group?.Node?.StepDown();
return null;
}
public Exception? JetStreamSnapshotStream(string account, string stream)
{
var (_, cluster) = GetJetStreamCluster();
if (cluster == null)
return ToException(JsApiErrors.NewJSClusterNotActiveError());
var (acc, error) = LookupAccount(account);
if (error != null)
return error;
if (acc == null)
return new InvalidOperationException("account not found");
var (natsStream, streamError) = acc.LookupStream(stream);
if (streamError != null)
return streamError;
if (natsStream == null)
return null;
if (cluster.Streams.TryGetValue(account, out var assignments) &&
assignments.TryGetValue(stream, out var assignment))
{
var snapshot = JsonSerializer.SerializeToUtf8Bytes(natsStream.State());
assignment.Group?.Node?.InstallSnapshot(snapshot, force: false);
}
return null;
}
public string[] JetStreamClusterPeers()
{
var js = GetJetStreamState();
if (js == null)
return [];
js.Lock.EnterReadLock();
try
{
if (js.Cluster is not JetStreamCluster cluster || cluster.Meta == null || !cluster.IsLeader())
return [];
var names = new List<string>();
foreach (var peer in cluster.Meta.Peers())
{
if (!_nodeToInfo.TryGetValue(peer.Id, out var value) || value is not NodeInfo info)
continue;
if (info.Offline || !info.Js || info.Stats == null)
continue;
names.Add(info.Name);
}
return [.. names];
}
finally
{
js.Lock.ExitReadLock();
}
}
public bool JetStreamIsStreamLeader(string account, string stream)
{
var (js, cluster) = GetJetStreamCluster();
if (js == null || cluster == null)
return false;
js.Lock.EnterReadLock();
try
{
return cluster.IsStreamLeader(account, stream);
}
finally
{
js.Lock.ExitReadLock();
}
}
public bool JetStreamIsStreamCurrent(string account, string stream)
{
var js = GetJetStreamState();
if (js == null)
return false;
js.Lock.EnterReadLock();
try
{
return (js.Cluster as JetStreamCluster)?.IsStreamCurrent(account, stream) ?? false;
}
finally
{
js.Lock.ExitReadLock();
}
}
public bool JetStreamIsConsumerLeader(string account, string stream, string consumer)
{
var (js, cluster) = GetJetStreamCluster();
if (js == null || cluster == null)
return false;
js.Lock.EnterReadLock();
try
{
return cluster.IsConsumerLeader(account, stream, consumer);
}
finally
{
js.Lock.ExitReadLock();
}
}
internal Exception? EnableJetStreamClustering()
{
if (!IsRunning())
return null;
var js = GetJetStream();
if (js == null)
return ToException(JsApiErrors.NewJSNotEnabledForAccountError());
if (js.IsClustered())
return null;
return js.SetupMetaGroup();
}
public bool JetStreamIsStreamAssigned(string account, string stream)
{
var (js, cluster) = GetJetStreamCluster();
if (js == null || cluster == null)
return false;
var (acc, _) = LookupAccount(account);
if (acc == null)
return false;
js.Lock.EnterReadLock();
try
{
return cluster.IsStreamAssigned(acc, stream);
}
finally
{
js.Lock.ExitReadLock();
}
}
private static Exception ToException(JsApiError error) =>
new(error.Description ?? "jetstream error");
}