diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs index 04cda3f..86c7b45 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs @@ -118,6 +118,36 @@ internal static class JetStreamApi return false; } + internal static bool IsEmptyRequest(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return true; + + var start = 0; + var end = payload.Length - 1; + while (start < payload.Length && payload[start] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + start++; + while (end >= start && payload[end] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + end--; + if (end < start) + return true; + + var trimmed = payload.Slice(start, end - start + 1); + if (trimmed.SequenceEqual("{}"u8)) + return true; + + try + { + using var document = JsonDocument.Parse(trimmed.ToArray()); + return document.RootElement.ValueKind == JsonValueKind.Object && + !document.RootElement.EnumerateObject().Any(); + } + catch + { + return false; + } + } + internal static bool SubjectMatches(string pattern, string subject) { var p = pattern.Split('.', StringSplitOptions.None); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs index 680dfd2..6b67d27 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -76,11 +76,23 @@ public sealed partial class NatsServer lock (_jsApiSubsLock) { _jsApiSubscriptions.Clear(); - _jsApiSubscriptions.Add(new JetStreamApiSubscription - { - Subject = JsApiSubjects.JsApiAccountInfo, - Handler = JsAccountInfoRequest, - }); + _jsApiSubscriptions.AddRange( + [ + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountInfo, Handler = JsAccountInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamCreate, Handler = JsStreamCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamUpdate, Handler = JsStreamUpdateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreams, Handler = JsStreamNamesRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamList, Handler = JsStreamListRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamInfo, Handler = JsStreamInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamLeaderStepDown, Handler = JsStreamLeaderStepDownRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerLeaderStepDown, Handler = JsConsumerLeaderStepDownRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRemovePeer, Handler = JsStreamRemovePeerRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiRemoveServer, Handler = JsLeaderServerRemoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamMove, Handler = JsLeaderServerStreamMoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamCancelMove, Handler = JsLeaderServerStreamCancelMoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountPurge, Handler = JsLeaderAccountPurgeRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiLeaderStepDown, Handler = JsLeaderStepDownRequest }, + ]); } var sys = SystemAccount(); @@ -386,6 +398,615 @@ public sealed partial class NatsServer SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); } + internal void JsStreamCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamCreateResponse { Type = JsApiSubjects.JsApiStreamCreateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var request = new StreamConfigRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal)) + { + response.Error = JsApiErrors.NewJSStreamMismatchError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (streamName.IndexOfAny(['\\', '/']) >= 0) + { + response.Error = JsApiErrors.NewJSStreamNameContainsPathSeparatorsError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var jsa = requestAcc.JetStream; + if (jsa is null) + return; + + jsa.Lock.EnterWriteLock(); + try + { + if (jsa.Streams.ContainsKey(streamName)) + { + response.Error = JsApiErrors.NewJSStreamNameExistError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var stream = NatsStream.Create(requestAcc, request.Config, jsa, null, null, this); + if (stream is null) + { + response.Error = JsApiErrors.NewJSStreamCreateError(new InvalidOperationException("stream creation failed")); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + jsa.Streams[streamName] = stream; + response.Config = stream.GetConfig(); + response.State = stream.State(); + response.DidCreate = true; + } + finally + { + jsa.Lock.ExitWriteLock(); + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamUpdateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var request = new StreamConfigRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal)) + { + response.Error = JsApiErrors.NewJSStreamMismatchError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + stream.UpdateConfig(request.Config); + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamNamesResponse + { + Type = JsApiSubjects.JsApiStreamNamesResponseType, + Streams = [], + Limit = JsApiSubjects.JsApiNamesLimit, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var offset = 0; + var filter = string.Empty; + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + var request = new JsApiStreamNamesRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + offset = request.Offset; + filter = request.Subject ?? string.Empty; + } + + var streams = requestAcc.FilteredStreams(filter) + .OrderBy(s => s.Config.Name, StringComparer.Ordinal) + .Select(s => s.Config.Name) + .ToList(); + + response.Total = streams.Count; + if (offset > streams.Count) + offset = streams.Count; + response.Offset = offset; + response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiNamesLimit).ToList(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamListResponse + { + Type = JsApiSubjects.JsApiStreamListResponseType, + Streams = [], + Limit = JsApiSubjects.JsApiListLimit, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var offset = 0; + var filter = string.Empty; + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + var request = new JsApiStreamListRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + offset = request.Offset; + filter = request.Subject ?? string.Empty; + } + + var streams = requestAcc.FilteredStreams(filter) + .OrderBy(s => s.Config.Name, StringComparer.Ordinal) + .ToList(); + + response.Total = streams.Count; + if (offset > streams.Count) + offset = streams.Count; + response.Offset = offset; + response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiListLimit).Select(s => s.GetInfo()).ToList(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamInfoResponse { Type = JsApiSubjects.JsApiStreamInfoResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamLeaderStepDownResponse { Type = JsApiSubjects.JsApiStreamLeaderStepDownResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerLeaderStepDownResponse { Type = JsApiSubjects.JsApiConsumerLeaderStepDownResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamRemovePeerRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamRemovePeerResponse { Type = JsApiSubjects.JsApiStreamRemovePeerResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamApi.IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderServerRemoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiMetaServerRemoveResponse { Type = JsApiSubjects.JsApiMetaServerRemoveResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamApi.IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMetaServerRemoveRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = !string.IsNullOrWhiteSpace(request.PeerId) || !string.IsNullOrWhiteSpace(request.Server); + if (!response.Success) + response.Error = JsApiErrors.NewJSClusterServerNotMemberError(); + + if (response.Error is null) + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + else + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal string[] PeerSetToNames(string[] peers) + { + var names = new string[peers.Length]; + for (var i = 0; i < peers.Length; i++) + { + var info = GetNodeInfo(peers[i]); + names[i] = info?.Name ?? peers[i]; + } + + return names; + } + + internal string NameToPeer(JetStream js, string serverName, string clusterName, string domainName) + { + _ = js; + foreach (var (nodeId, _) in _nodeToInfo) + { + var info = GetNodeInfo(nodeId); + if (info is null || !string.Equals(info.Name, serverName, StringComparison.Ordinal)) + continue; + if (!string.IsNullOrWhiteSpace(clusterName) && !string.Equals(info.Cluster, clusterName, StringComparison.Ordinal)) + continue; + if (!string.IsNullOrWhiteSpace(domainName) && !string.Equals(info.Domain, domainName, StringComparison.Ordinal)) + continue; + return nodeId; + } + + return string.Empty; + } + + internal void JsLeaderServerStreamMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var req = new JsApiMetaServerStreamMoveRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, req) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderServerStreamCancelMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderAccountPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiAccountPurgeResponse { Type = JsApiSubjects.JsApiAccountPurgeResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Initiated = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiLeaderStepDownResponse { Type = JsApiSubjects.JsApiLeaderStepDownResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal static bool IsEmptyRequest(byte[] request) => JetStreamApi.IsEmptyRequest(request); + + internal (string PreferredLeader, JsApiError? Error) GetStepDownPreferredPlacement(IRaftNode group, Placement? placement) + { + if (placement is null) + return (string.Empty, null); + + if (!string.IsNullOrWhiteSpace(placement.Preferred)) + { + foreach (var peer in group.Peers()) + { + var info = GetNodeInfo(peer.Id); + if (info is not null && string.Equals(info.Name, placement.Preferred, StringComparison.Ordinal)) + { + if (peer.Id == group.ID()) + { + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError( + new InvalidOperationException($"preferred server '{placement.Preferred}' is already leader"))); + } + + return (peer.Id, null); + } + } + + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError( + new InvalidOperationException($"preferred server '{placement.Preferred}' not known"))); + } + + foreach (var peer in group.Peers()) + { + if (peer.Id == group.ID()) + continue; + + var info = GetNodeInfo(peer.Id); + if (info is null || info.Offline) + continue; + + if (!string.IsNullOrWhiteSpace(placement.Cluster) && + !string.Equals(info.Cluster, placement.Cluster, StringComparison.Ordinal)) + { + continue; + } + + if (placement.Tags is { Length: > 0 } && + placement.Tags.Except(info.Tags, StringComparer.Ordinal).Any()) + { + continue; + } + + return (peer.Id, null); + } + + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError(new InvalidOperationException("no replacement peer connected"))); + } + + private static string? GetRequiredApiHeader(byte[] header) + { + var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, header); + return value is null ? null : Encoding.ASCII.GetString(value); + } + internal static string StreamNameFromSubject(string subject) => JetStreamApi.StreamNameFromSubject(subject); internal static string ConsumerNameFromSubject(string subject) => JetStreamApi.ConsumerNameFromSubject(subject); diff --git a/porting.db b/porting.db index f8dd1f1..a90e491 100644 Binary files a/porting.db and b/porting.db differ