diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index c0c8865..55401f2 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -127,6 +127,57 @@ public sealed partial class Account return (server, jsa, null); } + internal (NatsServer? Server, JetStream? JetStream, JsAccount? JetStreamAccount) GetJetStreamFromAccount() + { + _mu.EnterReadLock(); + var jetStreamAccount = JetStream; + _mu.ExitReadLock(); + if (jetStreamAccount == null) + return (null, null, null); + + jetStreamAccount.Lock.EnterReadLock(); + var jetStream = jetStreamAccount.Js as JetStream; + jetStreamAccount.Lock.ExitReadLock(); + if (jetStream == null) + return (null, null, null); + + return (jetStream.Server as NatsServer, jetStream, jetStreamAccount); + } + + internal bool JetStreamIsStreamLeader(string stream) + { + var (server, jetStream, _) = GetJetStreamFromAccount(); + if (server == null || jetStream == null) + return false; + + jetStream.Lock.EnterReadLock(); + try + { + return (jetStream.Cluster as JetStreamCluster)?.IsStreamLeader(Name, stream) == true; + } + finally + { + jetStream.Lock.ExitReadLock(); + } + } + + internal bool JetStreamIsConsumerLeader(string stream, string consumer) + { + var (server, jetStream, _) = GetJetStreamFromAccount(); + if (server == null || jetStream == null) + return false; + + jetStream.Lock.EnterReadLock(); + try + { + return (jetStream.Cluster as JetStreamCluster)?.IsConsumerLeader(Name, stream, consumer) == true; + } + finally + { + jetStream.Lock.ExitReadLock(); + } + } + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index d43900e..d563da5 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -90,6 +90,252 @@ internal sealed class JetStreamCluster /// Duration of last meta-snapshot (nanoseconds). public long LastMetaSnapDuration { get; set; } + + internal static UnsupportedStreamAssignment NewUnsupportedStreamAssignment( + NatsServer server, + StreamAssignment assignment, + Exception? error) + { + var reason = "stopped"; + if (error != null) + { + var msg = error.Message; + reason = msg.StartsWith("json:", StringComparison.Ordinal) + ? $"unsupported - config error: {msg["json:".Length..].TrimStart()}" + : $"stopped - {msg}"; + } + else if (assignment.Config != null && !JetStreamVersioning.SupportsRequiredApiLevel(assignment.Config.Metadata)) + { + var required = JetStreamVersioning.GetRequiredApiLevel(assignment.Config.Metadata); + if (!string.IsNullOrEmpty(required)) + reason = $"unsupported - required API level: {required}, current API level: {JetStreamVersioning.JsApiLevel}"; + } + + var config = assignment.Config ?? new StreamConfig { Name = assignment.Subject ?? string.Empty }; + return new UnsupportedStreamAssignment + { + Reason = reason, + Info = new StreamInfo + { + Created = assignment.Created, + Config = JetStreamVersioning.SetDynamicStreamMetadata(config), + }, + }; + } + + internal static UnsupportedConsumerAssignment NewUnsupportedConsumerAssignment( + ConsumerAssignment assignment, + Exception? error) + { + var reason = "stopped"; + if (error != null) + { + var msg = error.Message; + reason = msg.StartsWith("json:", StringComparison.Ordinal) + ? $"unsupported - config error: {msg["json:".Length..].TrimStart()}" + : $"stopped - {msg}"; + } + else if (assignment.Config != null && !JetStreamVersioning.SupportsRequiredApiLevel(assignment.Config.Metadata)) + { + var required = JetStreamVersioning.GetRequiredApiLevel(assignment.Config.Metadata); + if (!string.IsNullOrEmpty(required)) + reason = $"unsupported - required API level: {required}, current API level: {JetStreamVersioning.JsApiLevel}"; + } + + var config = assignment.Config ?? new ConsumerConfig { Name = assignment.Name }; + return new UnsupportedConsumerAssignment + { + Reason = reason, + Info = new ConsumerInfo + { + Stream = assignment.Stream, + Name = assignment.Name, + Created = assignment.Created, + Config = JetStreamVersioning.SetDynamicConsumerMetadata(config), + TimeStamp = DateTime.UtcNow, + }, + }; + } + + internal bool IsLeader() + { + if (this == null) + return true; + + return Meta != null && Meta.Leader(); + } + + internal bool IsStreamCurrent(string account, string stream) + { + if (Meta == null) + return false; + + if (!Streams.TryGetValue(account, out var accountAssignments)) + return false; + if (!accountAssignments.TryGetValue(stream, out var assignment)) + return false; + if (assignment.Group == null) + return false; + + return assignment.Group.Node == null || assignment.Group.Node.Current(); + } + + internal bool IsStreamAssigned(Account account, string stream) + { + if (Meta == null) + return false; + + if (!Streams.TryGetValue(account.Name, out var accountAssignments)) + return false; + if (!accountAssignments.TryGetValue(stream, out var assignment)) + return false; + + var peers = assignment.Group?.Peers ?? []; + return peers.Contains(Meta.ID(), StringComparer.Ordinal); + } + + internal bool IsStreamLeader(string account, string stream) + { + if (Meta == null) + return false; + + if (!Streams.TryGetValue(account, out var accountAssignments)) + return false; + if (!accountAssignments.TryGetValue(stream, out var assignment)) + return false; + + var group = assignment.Group; + if (group == null) + return false; + + var ourId = Meta.ID(); + if (!group.Peers.Contains(ourId, StringComparer.Ordinal)) + return false; + + return group.Peers.Length == 1 || (group.Node != null && group.Node.Leader()); + } + + internal bool IsConsumerLeader(string account, string stream, string consumer) + { + if (Meta == null) + return false; + + if (!Streams.TryGetValue(account, out var accountAssignments)) + return false; + if (!accountAssignments.TryGetValue(stream, out var assignment)) + return false; + if (assignment.Consumers == null || !assignment.Consumers.TryGetValue(consumer, out var consumerAssignment)) + return false; + + var group = consumerAssignment.Group; + if (group == null) + return false; + + var ourId = Meta.ID(); + if (!group.Peers.Contains(ourId, StringComparer.Ordinal)) + return false; + + return group.Peers.Length == 1 || (group.Node != null && group.Node.Leader()); + } + + internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted) + { + if (!InflightStreams.TryGetValue(accountName, out var streams)) + { + streams = new Dictionary(StringComparer.Ordinal); + InflightStreams[accountName] = streams; + } + + var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty; + if (streams.TryGetValue(streamName, out var inflight)) + { + inflight.Ops++; + inflight.Deleted = deleted; + inflight.Assignment = assignment; + return; + } + + streams[streamName] = new InflightStreamInfo + { + Ops = 1, + Deleted = deleted, + Assignment = assignment, + }; + } + + internal void RemoveInflightStreamProposal(string accountName, string streamName) + { + if (!InflightStreams.TryGetValue(accountName, out var streams)) + return; + if (!streams.TryGetValue(streamName, out var inflight)) + return; + + if (inflight.Ops > 1) + { + inflight.Ops--; + return; + } + + streams.Remove(streamName); + if (streams.Count == 0) + InflightStreams.Remove(accountName); + } + + internal void TrackInflightConsumerProposal( + string accountName, + string streamName, + ConsumerAssignment assignment, + bool deleted) + { + if (!InflightConsumers.TryGetValue(accountName, out var streams)) + { + streams = new Dictionary>(StringComparer.Ordinal); + InflightConsumers[accountName] = streams; + } + + if (!streams.TryGetValue(streamName, out var consumers)) + { + consumers = new Dictionary(StringComparer.Ordinal); + streams[streamName] = consumers; + } + + if (consumers.TryGetValue(assignment.Name, out var inflight)) + { + inflight.Ops++; + inflight.Deleted = deleted; + inflight.Assignment = assignment; + return; + } + + consumers[assignment.Name] = new InflightConsumerInfo + { + Ops = 1, + Deleted = deleted, + Assignment = assignment, + }; + } + + internal void RemoveInflightConsumerProposal(string accountName, string streamName, string consumerName) + { + if (!InflightConsumers.TryGetValue(accountName, out var streams)) + return; + if (!streams.TryGetValue(streamName, out var consumers)) + return; + if (!consumers.TryGetValue(consumerName, out var inflight)) + return; + + if (inflight.Ops > 1) + { + inflight.Ops--; + return; + } + + consumers.Remove(consumerName); + if (consumers.Count == 0) + streams.Remove(streamName); + if (streams.Count == 0) + InflightConsumers.Remove(accountName); + } } // ============================================================================ @@ -255,6 +501,28 @@ internal sealed class UnsupportedStreamAssignment public object? SysClient { get; set; } /// Info subscription (object to avoid session dep). public object? InfoSub { get; set; } + + internal void SetupInfoSub(NatsServer server, StreamAssignment assignment) + { + if (InfoSub != null) + return; + + SysClient = server.CreateInternalJetStreamClient(); + InfoSub = $"{assignment.Client?.ServiceAccount() ?? string.Empty}:{assignment.Config?.Name ?? string.Empty}"; + } + + internal StreamInfoClusterResponse HandleClusterStreamInfoRequest() => + new() + { + OfflineReason = Reason, + StreamInfo = Info, + }; + + internal void CloseInfoSub() + { + InfoSub = null; + SysClient = null; + } } // ============================================================================ @@ -301,6 +569,28 @@ internal sealed class UnsupportedConsumerAssignment public object? SysClient { get; set; } /// Info subscription (object to avoid session dep). public object? InfoSub { get; set; } + + internal void SetupInfoSub(NatsServer server, ConsumerAssignment assignment) + { + if (InfoSub != null) + return; + + SysClient = server.CreateInternalJetStreamClient(); + InfoSub = $"{assignment.Client?.ServiceAccount() ?? string.Empty}:{assignment.Stream}:{assignment.Name}"; + } + + internal ConsumerInfoClusterResponse HandleClusterConsumerInfoRequest() => + new() + { + OfflineReason = Reason, + ConsumerInfo = Info, + }; + + internal void CloseInfoSub() + { + InfoSub = null; + SysClient = null; + } } // ============================================================================ @@ -374,6 +664,85 @@ internal sealed class RecoveryUpdates public Dictionary AddStreams { get; set; } = new(); public Dictionary UpdateStreams { get; set; } = new(); public Dictionary> UpdateConsumers { get; set; } = new(); + + internal void RemoveStream(StreamAssignment assignment) + { + var key = StreamRecoveryKey(assignment); + RemoveStreams[key] = assignment; + AddStreams.Remove(key); + UpdateStreams.Remove(key); + UpdateConsumers.Remove(key); + RemoveConsumers.Remove(key); + } + + internal void AddStream(StreamAssignment assignment) + { + var key = StreamRecoveryKey(assignment); + AddStreams[key] = assignment; + } + + internal void UpdateStream(StreamAssignment assignment) + { + var key = StreamRecoveryKey(assignment); + UpdateStreams[key] = assignment; + } + + internal void RemoveConsumer(ConsumerAssignment assignment) + { + var streamKey = ConsumerStreamRecoveryKey(assignment); + var consumerKey = ConsumerRecoveryKey(assignment); + + if (!RemoveConsumers.TryGetValue(streamKey, out var consumers)) + { + consumers = new Dictionary(StringComparer.Ordinal); + RemoveConsumers[streamKey] = consumers; + } + + consumers[consumerKey] = assignment; + if (UpdateConsumers.TryGetValue(streamKey, out var updates)) + updates.Remove(consumerKey); + } + + internal void AddOrUpdateConsumer(ConsumerAssignment assignment) + { + var streamKey = ConsumerStreamRecoveryKey(assignment); + var consumerKey = ConsumerRecoveryKey(assignment); + + if (!UpdateConsumers.TryGetValue(streamKey, out var consumers)) + { + consumers = new Dictionary(StringComparer.Ordinal); + UpdateConsumers[streamKey] = consumers; + } + + consumers[consumerKey] = assignment; + } + + private static string StreamRecoveryKey(StreamAssignment assignment) => + $"{assignment.Client?.ServiceAccount() ?? string.Empty}:{assignment.Config?.Name ?? assignment.Subject ?? string.Empty}"; + + private static string ConsumerStreamRecoveryKey(ConsumerAssignment assignment) => + $"{assignment.Client?.ServiceAccount() ?? string.Empty}:{assignment.Stream}"; + + private static string ConsumerRecoveryKey(ConsumerAssignment assignment) => + $"{assignment.Stream}:{assignment.Name}"; +} + +internal sealed class StreamInfoClusterResponse +{ + [JsonPropertyName("offline_reason")] + public string? OfflineReason { get; set; } + + [JsonPropertyName("stream_info")] + public StreamInfo? StreamInfo { get; set; } +} + +internal sealed class ConsumerInfoClusterResponse +{ + [JsonPropertyName("offline_reason")] + public string? OfflineReason { get; set; } + + [JsonPropertyName("consumer_info")] + public ConsumerInfo? ConsumerInfo { get; set; } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 522bc08..6237f85 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -324,6 +324,297 @@ internal sealed class JetStreamEngine(JetStream state) Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes); } + internal bool IsStreamHealthy(Account? account, StreamAssignment? assignment) + { + if (assignment?.Unsupported != null) + return true; + if (_state.Cluster is not JetStreamCluster) + return true; + if (assignment?.Group == null || assignment.Config == null || account == null) + return false; + + var (stream, _) = account.LookupStream(assignment.Config.Name); + if (stream == null) + return false; + + if (assignment.Config.Replicas <= 1) + return true; + + var node = assignment.Group.Node; + return node != null && node.Healthy(); + } + + internal bool IsConsumerHealthy(NatsStream? stream, string consumer, ConsumerAssignment? assignment) + { + if (assignment?.Unsupported != null) + return true; + if (_state.Cluster is not JetStreamCluster) + return true; + if (stream == null || assignment?.Group == null) + return false; + + if (assignment.Group.Peers.Length <= 1) + return true; + + return assignment.Group.Node?.Healthy() == true; + } + + internal bool SubjectsOverlap(string account, IEnumerable subjects, StreamAssignment? ownAssignment = null) + { + if (_state.Cluster is not JetStreamCluster cluster) + return false; + + var targetSubjects = subjects.ToArray(); + if (!cluster.Streams.TryGetValue(account, out var accountAssignments)) + return false; + + foreach (var assignment in accountAssignments.Values) + { + if (ownAssignment != null && + assignment.Config?.Name != null && + assignment.Config.Name == ownAssignment.Config?.Name) + { + continue; + } + + var existingSubjects = assignment.Config?.Subjects ?? []; + foreach (var existing in existingSubjects) + { + foreach (var target in targetSubjects) + { + if (Internal.DataStructures.SubscriptionIndex.SubjectsCollide(target, existing)) + return true; + } + } + } + + return false; + } + + internal bool IsClustered() => _state.Cluster is JetStreamCluster; + + internal bool IsClusteredNoLock() => Interlocked.CompareExchange(ref _state.Clustered, 0, 0) == 1; + + internal Exception? SetupMetaGroup() + { + var server = Server(); + if (server == null) + return new InvalidOperationException("jetstream server unavailable"); + + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is JetStreamCluster) + { + Interlocked.Exchange(ref _state.Clustered, 1); + return null; + } + + var systemAccount = server.SystemAccount(); + if (systemAccount == null) + return ServerErrors.ErrNoSysAccount; + + var storeDir = Path.Combine(_state.Config.StoreDir, systemAccount.Name, "_js_", "_meta_"); + Directory.CreateDirectory(storeDir); + + var raftConfig = new RaftConfig + { + Name = "_meta_", + Store = storeDir, + Recovering = true, + }; + + var (meta, error) = server.StartRaftNode(systemAccount.Name, raftConfig); + if (error != null) + return error; + + _state.Cluster = new JetStreamCluster + { + Meta = meta, + Streams = new Dictionary>(StringComparer.Ordinal), + Server = server, + Client = server.CreateInternalJetStreamClient(), + Qch = System.Threading.Channels.Channel.CreateUnbounded(), + Stopped = System.Threading.Channels.Channel.CreateUnbounded(), + }; + + _state.MetaRecovering = true; + Interlocked.Exchange(ref _state.Clustered, 1); + return null; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal IRaftNode? GetMetaGroup() + { + _state.Lock.EnterReadLock(); + try + { + return (_state.Cluster as JetStreamCluster)?.Meta; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal NatsServer? Server() => _state.Server as NatsServer; + + internal bool IsLeaderless() + { + var meta = GetMetaGroup(); + if (meta == null) + return false; + if (!meta.Leaderless()) + return false; + + return DateTime.UtcNow - meta.Created() > TimeSpan.FromSeconds(10); + } + + internal bool IsGroupLeaderless(RaftGroup? group) + { + if (group?.Node == null) + return false; + + var meta = GetMetaGroup(); + if (meta == null) + return false; + if (!group.Peers.Contains(meta.ID(), StringComparer.Ordinal)) + return false; + if (!group.Node.Leaderless()) + return false; + + return DateTime.UtcNow - group.Node.Created() > TimeSpan.FromSeconds(10); + } + + internal System.Threading.Channels.Channel? ClusterQuitC() + { + _state.Lock.EnterReadLock(); + try + { + return (_state.Cluster as JetStreamCluster)?.Qch; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal System.Threading.Channels.Channel? ClusterStoppedC() + { + _state.Lock.EnterReadLock(); + try + { + return (_state.Cluster as JetStreamCluster)?.Stopped; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal void SetMetaRecovering() + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is JetStreamCluster) + _state.MetaRecovering = true; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ClearMetaRecovering() + { + _state.Lock.EnterWriteLock(); + try + { + _state.MetaRecovering = false; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool IsMetaRecovering() + { + _state.Lock.EnterReadLock(); + try + { + return _state.MetaRecovering; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal void CheckForOrphans() + { + var server = Server(); + if (server == null) + return; + + _state.Lock.EnterReadLock(); + var meta = (_state.Cluster as JetStreamCluster)?.Meta; + _state.Lock.ExitReadLock(); + + if (meta == null || meta.Leaderless() || !meta.Healthy()) + return; + + var (streams, consumers) = GetOrphans(); + foreach (var stream in streams) + stream.Delete(); + foreach (var consumer in consumers) + consumer.Delete(); + } + + internal (List Streams, List Consumers) GetOrphans() + { + var orphanStreams = new List(); + var orphanConsumers = new List(); + + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return (orphanStreams, orphanConsumers); + + foreach (var (accountName, jsa) in _state.Accounts) + { + cluster.Streams.TryGetValue(accountName, out var assignments); + assignments ??= new Dictionary(StringComparer.Ordinal); + + jsa.Lock.EnterReadLock(); + try + { + foreach (var (streamName, streamValue) in jsa.Streams) + { + if (!assignments.ContainsKey(streamName) && streamValue is NatsStream stream) + orphanStreams.Add(stream); + } + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + } + finally + { + _state.Lock.ExitReadLock(); + } + + return (orphanStreams, orphanConsumers); + } + internal static string FriendlyBytes(T bytes) where T : struct, IConvertible { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs index 7437ae5..d8978cc 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -174,6 +174,33 @@ internal sealed partial class JsAccount } } + internal bool StreamAssigned(string stream) + { + Lock.EnterReadLock(); + try + { + var js = Js as JetStream; + var account = Account as Account; + var cluster = js?.Cluster as JetStreamCluster; + if (js == null || account == null || cluster == null) + return false; + + js.Lock.EnterReadLock(); + try + { + return cluster.IsStreamAssigned(account, stream); + } + finally + { + js.Lock.ExitReadLock(); + } + } + finally + { + Lock.ExitReadLock(); + } + } + internal Account? Acc() => Account as Account; internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterMeta.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterMeta.cs new file mode 100644 index 0000000..c6de5e7 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterMeta.cs @@ -0,0 +1,300 @@ +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal (int JetStreamEnabled, int Total) TrackedJetStreamServers() + { + if (!IsRunning() || _sys == null) + return (-1, -1); + + var js = 0; + var total = 0; + foreach (var value in _nodeToInfo.Values) + { + if (value is not NodeInfo info) + continue; + + if (info.Js) + js++; + total++; + } + + return (js, total); + } + + internal (JetStream? JetStream, JetStreamCluster? Cluster) GetJetStreamCluster() + { + if (IsShuttingDown()) + return (null, null); + + var js = GetJetStreamState(); + if (js == null) + return (null, null); + + return (js, js.Cluster as JetStreamCluster); + } + + public bool JetStreamIsClustered() + { + var js = GetJetStreamState(); + return js != null && Interlocked.CompareExchange(ref js.Clustered, 0, 0) == 1; + } + + public bool JetStreamIsLeader() + { + var (_, cluster) = GetJetStreamCluster(); + return cluster?.IsLeader() == true; + } + + public bool JetStreamIsCurrent() + { + var js = GetJetStreamState(); + if (js == null) + return false; + + js.Lock.EnterReadLock(); + try + { + if (js.Cluster is not JetStreamCluster cluster) + return true; + + return cluster.Meta?.Current() == true; + } + finally + { + js.Lock.ExitReadLock(); + } + } + + public Exception? JetStreamSnapshotMeta() + { + var js = GetJetStreamState(); + if (js == null) + return ToException(JsApiErrors.NewJSNotEnabledError()); + + js.Lock.EnterReadLock(); + JetStreamCluster? cluster; + IRaftNode? meta; + try + { + cluster = js.Cluster as JetStreamCluster; + meta = cluster?.Meta; + if (cluster == null || meta == null) + return ToException(JsApiErrors.NewJSClusterNotActiveError()); + if (!cluster.IsLeader()) + return ToException(JsApiErrors.NewJSClusterNotLeaderError()); + } + finally + { + js.Lock.ExitReadLock(); + } + + var snapshot = JsonSerializer.SerializeToUtf8Bytes(cluster!.Streams); + meta!.InstallSnapshot(snapshot, force: false); + return null; + } + + public Exception? JetStreamStepdownStream(string account, string stream) + { + var (_, cluster) = GetJetStreamCluster(); + if (cluster == null) + return ToException(JsApiErrors.NewJSClusterNotActiveError()); + + var (acc, error) = LookupAccount(account); + if (error != null) + return error; + if (acc == null) + return new InvalidOperationException("account not found"); + + var (_, streamError) = acc.LookupStream(stream); + if (streamError != null) + return streamError; + + if (cluster.Streams.TryGetValue(account, out var assignments) && + assignments.TryGetValue(stream, out var assignment)) + { + assignment.Group?.Node?.StepDown(); + } + + return null; + } + + public Exception? JetStreamStepdownConsumer(string account, string stream, string consumer) + { + var (_, cluster) = GetJetStreamCluster(); + if (cluster == null) + return ToException(JsApiErrors.NewJSClusterNotActiveError()); + + var (acc, error) = LookupAccount(account); + if (error != null) + return error; + if (acc == null) + return new InvalidOperationException("account not found"); + + var (_, streamError) = acc.LookupStream(stream); + if (streamError != null) + return streamError; + + if (!cluster.Streams.TryGetValue(account, out var assignments) || + !assignments.TryGetValue(stream, out var assignment) || + assignment.Consumers == null || + !assignment.Consumers.TryGetValue(consumer, out var consumerAssignment)) + { + return ToException(JsApiErrors.NewJSConsumerNotFoundError()); + } + + consumerAssignment.Group?.Node?.StepDown(); + return null; + } + + public Exception? JetStreamSnapshotStream(string account, string stream) + { + var (_, cluster) = GetJetStreamCluster(); + if (cluster == null) + return ToException(JsApiErrors.NewJSClusterNotActiveError()); + + var (acc, error) = LookupAccount(account); + if (error != null) + return error; + if (acc == null) + return new InvalidOperationException("account not found"); + + var (natsStream, streamError) = acc.LookupStream(stream); + if (streamError != null) + return streamError; + if (natsStream == null) + return null; + + if (cluster.Streams.TryGetValue(account, out var assignments) && + assignments.TryGetValue(stream, out var assignment)) + { + var snapshot = JsonSerializer.SerializeToUtf8Bytes(natsStream.State()); + assignment.Group?.Node?.InstallSnapshot(snapshot, force: false); + } + + return null; + } + + public string[] JetStreamClusterPeers() + { + var js = GetJetStreamState(); + if (js == null) + return []; + + js.Lock.EnterReadLock(); + try + { + if (js.Cluster is not JetStreamCluster cluster || cluster.Meta == null || !cluster.IsLeader()) + return []; + + var names = new List(); + foreach (var peer in cluster.Meta.Peers()) + { + if (!_nodeToInfo.TryGetValue(peer.Id, out var value) || value is not NodeInfo info) + continue; + if (info.Offline || !info.Js || info.Stats == null) + continue; + + names.Add(info.Name); + } + + return [.. names]; + } + finally + { + js.Lock.ExitReadLock(); + } + } + + public bool JetStreamIsStreamLeader(string account, string stream) + { + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster == null) + return false; + + js.Lock.EnterReadLock(); + try + { + return cluster.IsStreamLeader(account, stream); + } + finally + { + js.Lock.ExitReadLock(); + } + } + + public bool JetStreamIsStreamCurrent(string account, string stream) + { + var js = GetJetStreamState(); + if (js == null) + return false; + + js.Lock.EnterReadLock(); + try + { + return (js.Cluster as JetStreamCluster)?.IsStreamCurrent(account, stream) ?? false; + } + finally + { + js.Lock.ExitReadLock(); + } + } + + public bool JetStreamIsConsumerLeader(string account, string stream, string consumer) + { + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster == null) + return false; + + js.Lock.EnterReadLock(); + try + { + return cluster.IsConsumerLeader(account, stream, consumer); + } + finally + { + js.Lock.ExitReadLock(); + } + } + + internal Exception? EnableJetStreamClustering() + { + if (!IsRunning()) + return null; + + var js = GetJetStream(); + if (js == null) + return ToException(JsApiErrors.NewJSNotEnabledForAccountError()); + + if (js.IsClustered()) + return null; + + return js.SetupMetaGroup(); + } + + public bool JetStreamIsStreamAssigned(string account, string stream) + { + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster == null) + return false; + + var (acc, _) = LookupAccount(account); + if (acc == null) + return false; + + js.Lock.EnterReadLock(); + try + { + return cluster.IsStreamAssigned(acc, stream); + } + finally + { + js.Lock.ExitReadLock(); + } + } + + private static Exception ToException(JsApiError error) => + new(error.Description ?? "jetstream error"); +}