From b40a32cfe91926357c821e5d4dae0b7058bdf5f2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:29:32 -0500 Subject: [PATCH] feat(batch28): implement jetstream api stream and control handlers --- .../JetStream/JetStreamApi.cs | 30 + .../NatsServer.JetStreamApi.cs | 631 +++++++++++++++++- porting.db | Bin 6754304 -> 6762496 bytes 3 files changed, 656 insertions(+), 5 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs index 04cda3f..86c7b45 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs @@ -118,6 +118,36 @@ internal static class JetStreamApi return false; } + internal static bool IsEmptyRequest(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return true; + + var start = 0; + var end = payload.Length - 1; + while (start < payload.Length && payload[start] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + start++; + while (end >= start && payload[end] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + end--; + if (end < start) + return true; + + var trimmed = payload.Slice(start, end - start + 1); + if (trimmed.SequenceEqual("{}"u8)) + return true; + + try + { + using var document = JsonDocument.Parse(trimmed.ToArray()); + return document.RootElement.ValueKind == JsonValueKind.Object && + !document.RootElement.EnumerateObject().Any(); + } + catch + { + return false; + } + } + internal static bool SubjectMatches(string pattern, string subject) { var p = pattern.Split('.', StringSplitOptions.None); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs index 680dfd2..6b67d27 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -76,11 +76,23 @@ public sealed partial class NatsServer lock (_jsApiSubsLock) { _jsApiSubscriptions.Clear(); - _jsApiSubscriptions.Add(new JetStreamApiSubscription - { - Subject = JsApiSubjects.JsApiAccountInfo, - Handler = JsAccountInfoRequest, - }); + _jsApiSubscriptions.AddRange( + [ + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountInfo, Handler = JsAccountInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamCreate, Handler = JsStreamCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamUpdate, Handler = JsStreamUpdateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreams, Handler = JsStreamNamesRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamList, Handler = JsStreamListRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamInfo, Handler = JsStreamInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamLeaderStepDown, Handler = JsStreamLeaderStepDownRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerLeaderStepDown, Handler = JsConsumerLeaderStepDownRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRemovePeer, Handler = JsStreamRemovePeerRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiRemoveServer, Handler = JsLeaderServerRemoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamMove, Handler = JsLeaderServerStreamMoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamCancelMove, Handler = JsLeaderServerStreamCancelMoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountPurge, Handler = JsLeaderAccountPurgeRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiLeaderStepDown, Handler = JsLeaderStepDownRequest }, + ]); } var sys = SystemAccount(); @@ -386,6 +398,615 @@ public sealed partial class NatsServer SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); } + internal void JsStreamCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamCreateResponse { Type = JsApiSubjects.JsApiStreamCreateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var request = new StreamConfigRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal)) + { + response.Error = JsApiErrors.NewJSStreamMismatchError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (streamName.IndexOfAny(['\\', '/']) >= 0) + { + response.Error = JsApiErrors.NewJSStreamNameContainsPathSeparatorsError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var jsa = requestAcc.JetStream; + if (jsa is null) + return; + + jsa.Lock.EnterWriteLock(); + try + { + if (jsa.Streams.ContainsKey(streamName)) + { + response.Error = JsApiErrors.NewJSStreamNameExistError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var stream = NatsStream.Create(requestAcc, request.Config, jsa, null, null, this); + if (stream is null) + { + response.Error = JsApiErrors.NewJSStreamCreateError(new InvalidOperationException("stream creation failed")); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + jsa.Streams[streamName] = stream; + response.Config = stream.GetConfig(); + response.State = stream.State(); + response.DidCreate = true; + } + finally + { + jsa.Lock.ExitWriteLock(); + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamUpdateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var request = new StreamConfigRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal)) + { + response.Error = JsApiErrors.NewJSStreamMismatchError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + stream.UpdateConfig(request.Config); + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamNamesResponse + { + Type = JsApiSubjects.JsApiStreamNamesResponseType, + Streams = [], + Limit = JsApiSubjects.JsApiNamesLimit, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var offset = 0; + var filter = string.Empty; + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + var request = new JsApiStreamNamesRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + offset = request.Offset; + filter = request.Subject ?? string.Empty; + } + + var streams = requestAcc.FilteredStreams(filter) + .OrderBy(s => s.Config.Name, StringComparer.Ordinal) + .Select(s => s.Config.Name) + .ToList(); + + response.Total = streams.Count; + if (offset > streams.Count) + offset = streams.Count; + response.Offset = offset; + response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiNamesLimit).ToList(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamListResponse + { + Type = JsApiSubjects.JsApiStreamListResponseType, + Streams = [], + Limit = JsApiSubjects.JsApiListLimit, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var offset = 0; + var filter = string.Empty; + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + var request = new JsApiStreamListRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + offset = request.Offset; + filter = request.Subject ?? string.Empty; + } + + var streams = requestAcc.FilteredStreams(filter) + .OrderBy(s => s.Config.Name, StringComparer.Ordinal) + .ToList(); + + response.Total = streams.Count; + if (offset > streams.Count) + offset = streams.Count; + response.Offset = offset; + response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiListLimit).Select(s => s.GetInfo()).ToList(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamInfoResponse { Type = JsApiSubjects.JsApiStreamInfoResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamLeaderStepDownResponse { Type = JsApiSubjects.JsApiStreamLeaderStepDownResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerLeaderStepDownResponse { Type = JsApiSubjects.JsApiConsumerLeaderStepDownResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamRemovePeerRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamRemovePeerResponse { Type = JsApiSubjects.JsApiStreamRemovePeerResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamApi.IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderServerRemoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiMetaServerRemoveResponse { Type = JsApiSubjects.JsApiMetaServerRemoveResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamApi.IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMetaServerRemoveRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = !string.IsNullOrWhiteSpace(request.PeerId) || !string.IsNullOrWhiteSpace(request.Server); + if (!response.Success) + response.Error = JsApiErrors.NewJSClusterServerNotMemberError(); + + if (response.Error is null) + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + else + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal string[] PeerSetToNames(string[] peers) + { + var names = new string[peers.Length]; + for (var i = 0; i < peers.Length; i++) + { + var info = GetNodeInfo(peers[i]); + names[i] = info?.Name ?? peers[i]; + } + + return names; + } + + internal string NameToPeer(JetStream js, string serverName, string clusterName, string domainName) + { + _ = js; + foreach (var (nodeId, _) in _nodeToInfo) + { + var info = GetNodeInfo(nodeId); + if (info is null || !string.Equals(info.Name, serverName, StringComparison.Ordinal)) + continue; + if (!string.IsNullOrWhiteSpace(clusterName) && !string.Equals(info.Cluster, clusterName, StringComparison.Ordinal)) + continue; + if (!string.IsNullOrWhiteSpace(domainName) && !string.Equals(info.Domain, domainName, StringComparison.Ordinal)) + continue; + return nodeId; + } + + return string.Empty; + } + + internal void JsLeaderServerStreamMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var req = new JsApiMetaServerStreamMoveRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, req) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderServerStreamCancelMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderAccountPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiAccountPurgeResponse { Type = JsApiSubjects.JsApiAccountPurgeResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Initiated = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiLeaderStepDownResponse { Type = JsApiSubjects.JsApiLeaderStepDownResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal static bool IsEmptyRequest(byte[] request) => JetStreamApi.IsEmptyRequest(request); + + internal (string PreferredLeader, JsApiError? Error) GetStepDownPreferredPlacement(IRaftNode group, Placement? placement) + { + if (placement is null) + return (string.Empty, null); + + if (!string.IsNullOrWhiteSpace(placement.Preferred)) + { + foreach (var peer in group.Peers()) + { + var info = GetNodeInfo(peer.Id); + if (info is not null && string.Equals(info.Name, placement.Preferred, StringComparison.Ordinal)) + { + if (peer.Id == group.ID()) + { + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError( + new InvalidOperationException($"preferred server '{placement.Preferred}' is already leader"))); + } + + return (peer.Id, null); + } + } + + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError( + new InvalidOperationException($"preferred server '{placement.Preferred}' not known"))); + } + + foreach (var peer in group.Peers()) + { + if (peer.Id == group.ID()) + continue; + + var info = GetNodeInfo(peer.Id); + if (info is null || info.Offline) + continue; + + if (!string.IsNullOrWhiteSpace(placement.Cluster) && + !string.Equals(info.Cluster, placement.Cluster, StringComparison.Ordinal)) + { + continue; + } + + if (placement.Tags is { Length: > 0 } && + placement.Tags.Except(info.Tags, StringComparer.Ordinal).Any()) + { + continue; + } + + return (peer.Id, null); + } + + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError(new InvalidOperationException("no replacement peer connected"))); + } + + private static string? GetRequiredApiHeader(byte[] header) + { + var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, header); + return value is null ? null : Encoding.ASCII.GetString(value); + } + internal static string StreamNameFromSubject(string subject) => JetStreamApi.StreamNameFromSubject(subject); internal static string ConsumerNameFromSubject(string subject) => JetStreamApi.ConsumerNameFromSubject(subject); diff --git a/porting.db b/porting.db index f8dd1f1e27d0ffe55ab08c18d94be1efe0da6abf..a90e491a5296cab2bf5642af0b2324816e479b7d 100644 GIT binary patch delta 5864 zcmb`LdrVZ>9mnru9{0@5odFqKE(0>NKFbQsdxjYn1W{LaSFx+DuDbG?p|q&T;G?Tu z7&qB%N)y-Ea?<7w(X=MrtWDL_Olr3q9-<NmyJEYxY5(cEZBo-FroXv&xZF!_ zgJt;RLw;Y*z2|#==brOB2fEMjboUfbe_+#oND#W%DSH+9;l=tjW#q|s&z2?6mXS>t z=!bMa)=!b;%a|{j&zZZ-4dx1Sp6S!PsIh8v^i%q8^aFaF?ned4jELl#De@gM`Nv1( zXJi*k7g9|J_O-V~IwRWK3N@}!w-jnjp+*(zrb3M<)D5|!AG67-&&iWCMG*MTU&wLg zXs*kj)m~GmVTHP?P(unes89n6HK|Y&tU}&N7XOv}{bCJI?Im!39kl|FXH$!@z25Ro zTz7$r;vh+F!`&oh5`H8hx=3nFOQ@gVrf$Epk7^q8<8U^)JN! zI;b{DU9*HM$hRap42^Y?x(cdBQbVA;k{SfHBALZfgMC&+=TfoGLyg)A<39<55Z%Fl z{)pP#CAHi(JtKOBmT`!_FGRN>Iw3@35FHhwQHb^n(M^b&g=hq#H-+d1M0G-R9isI@ zbPb{kAsU9rEksu#S_P4#Q9A_T65*GF5Ltz203vailgUrdQ_iK#CF4CJwab#GJ5)WX zF+Mcj!;VR+G}$*ng$*WX>PQ&j=RyAP`;YHa01apWEuaJR$;bDZrQ7li2Ii^$OWi%) za&12S1$CMH6>)^U$2yHx?68yDjVCI5wk#sZjq!tFd~txz!Eqnz$JMW+mAF2D7~Jkh zt8n+L2${=6ZcqGZ&*p4`+`1)lsOTcQN1Skc{04LxA+<*u00zSYG1b`q=4upU(PyxIQR037N z8lW1e0oDSwz&c<(umN}l*a&O_UIktQUI#V&QvYyY>0i!V{hGG5^Za3`9^2h-f*b7wbS0-5ou~`h{al?t&y0$HFeJK zi|p@=G~=z-hJ5@%Z$xMJgxuvJmmdrdsTp3!4ca|^>@>Dy2Lf*2f|+%ynO&2cIWx?x zXgaen9}~@Tlv8Ft0m00s~HX|4Q&B$aG{n=m_MNAI3PNHIxxK>ybCs|A%4H;sN8~+ z_{sT3DDSE5SMJO33oe;nq^0~mLu$VtQ?60+?k*di`k`wP&Wi6uc*@+O54hY5XWFJ_ zda$awMDFJdWVSK%=_HdMGcC3H-a4dNolA@pmTL36>@ce{y<+^HVa#Aholp~L{U+Th z?eDZjnil#O)W4`o@;z+7K=*NRZ^D@V9EEXmrXr4FvN$S!4ld~NO#d|_R>WzA9sh{t zkuf}BX~|jbcDnr~PEU!`Wp@fAgA0~7VNmP*ro3~vYr4DtvvXeprI)T^@?s4&gU4p>eToW zTD8f?%IeQS^`GbHeF=@4!R%_!LA9T^1FhyTySnFjpJR?xzpgJqsTs_!<{VV>d8a_C zIn1u!98~XlI(QR==#Z`0WU^|_LA9Q{^J2CAwTkZ&3#FoOtWuphsLt~?c}LAeS&ccD zGcb4ms$V(8e0TJrY;zk zbk>SpF54emx#u}Hx9TzEO_pS2%2*bnpE-QEPNW*}St9IXF&cJ51<(%(oiHM7?{ N2kU)qha8m(`(FU3ZutNJ delta 1667 zcmYk*YfM{Z9LMqVoL)Fj%W1iETwA#gw41b0?iabUxeWvc>XeN-D7!JaXq+)*Y%;eO zC0+vZkGdde5EFGlm*p6Ox_1ks?{@7dLVh4sMUD~9Jb)jN##ENA7-0NOBMjKdS zUCd}Y8~AARH`qUHP;Zkg9X$toyM534%)P44A&WY$pO9Ka9YfWL8bTF{I*M|NI)bu` z>OrmE;pw$^&@HQ4`AznUPFBWg@mF?XP?Dr!apgftVlRm5MsO_5uDirk% z${DPbz`?#o+QslI)ao%8v>!f{-D%2Tp+i?Hk4T3oC7HFFkHz_$qa@b)5-Xv~ zMINo?l<-kTIV%PSO(Z0#bgPtm#=g*5D7%brCHA?JLGP4tCze$0av5J>6t&mrpx#j< zr}=X3qQ*D4g`RiwYt-Zj%cOy9?hal*o6SeWl0pMHd}HvtrUHLGPT)IgEg`l)F`E9) z#|E7q9!qy}_;E^oiwAe%bjri?Lfil0;h&M`+Byfhx7)+%a5fL4>$&%Co|wl^Q`8D~ zjM)rkx}C>U0|(H#{afhwZ9JZ;@_8wJVv2E)H#f;l@8$E1(6(3d`HVOsr`ZC&AvjQE z|K>=E)%Q?e1Gka2kPn5vmQ=|ViY?@kboV2BCfzOMQ=!4FHoJ)~7V)gm#^z%F_L?g$ z7xS#Z3mkKsjAX3fe~MY?mC%k#JYTav>i_2}uL{0!3mvNBFH?V$vbt`h+-kl)_=hxL#FnQ%Wm08ZEd8{t970vEU;8*;z{ zxsV6>PymHc1jSGSrBDVBK{-@FB~(E*)IcrN!6w)Y55pGN3frI_8sHIVgeGW)?a%_P z@F+Y6kHZdl0-l66*a^F!9lY=q?1m2L1fS-r?fP*x&ZNvq)@sX$dP$8BZ!?dZZkU|> S8D&nxpL9ae=9E1(VgCUrY93(#