feat(batch28): implement jetstream api stream and control handlers

This commit is contained in:
Joseph Doherty
2026-02-28 22:29:32 -05:00
parent bf115b116e
commit b40a32cfe9
3 changed files with 656 additions and 5 deletions

View File

@@ -118,6 +118,36 @@ internal static class JetStreamApi
return false;
}
internal static bool IsEmptyRequest(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return true;
var start = 0;
var end = payload.Length - 1;
while (start < payload.Length && payload[start] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n')
start++;
while (end >= start && payload[end] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n')
end--;
if (end < start)
return true;
var trimmed = payload.Slice(start, end - start + 1);
if (trimmed.SequenceEqual("{}"u8))
return true;
try
{
using var document = JsonDocument.Parse(trimmed.ToArray());
return document.RootElement.ValueKind == JsonValueKind.Object &&
!document.RootElement.EnumerateObject().Any();
}
catch
{
return false;
}
}
internal static bool SubjectMatches(string pattern, string subject)
{
var p = pattern.Split('.', StringSplitOptions.None);

View File

@@ -76,11 +76,23 @@ public sealed partial class NatsServer
lock (_jsApiSubsLock)
{
_jsApiSubscriptions.Clear();
_jsApiSubscriptions.Add(new JetStreamApiSubscription
{
Subject = JsApiSubjects.JsApiAccountInfo,
Handler = JsAccountInfoRequest,
});
_jsApiSubscriptions.AddRange(
[
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountInfo, Handler = JsAccountInfoRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamCreate, Handler = JsStreamCreateRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamUpdate, Handler = JsStreamUpdateRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreams, Handler = JsStreamNamesRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamList, Handler = JsStreamListRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamInfo, Handler = JsStreamInfoRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamLeaderStepDown, Handler = JsStreamLeaderStepDownRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerLeaderStepDown, Handler = JsConsumerLeaderStepDownRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRemovePeer, Handler = JsStreamRemovePeerRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiRemoveServer, Handler = JsLeaderServerRemoveRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamMove, Handler = JsLeaderServerStreamMoveRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamCancelMove, Handler = JsLeaderServerStreamCancelMoveRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountPurge, Handler = JsLeaderAccountPurgeRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiLeaderStepDown, Handler = JsLeaderStepDownRequest },
]);
}
var sys = SystemAccount();
@@ -386,6 +398,615 @@ public sealed partial class NatsServer
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamCreateResponse { Type = JsApiSubjects.JsApiStreamCreateResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var (hasJetStream, shouldError) = requestAcc.CheckJetStream();
if (!hasJetStream)
{
if (shouldError)
{
response.Error = JsApiErrors.NewJSNotEnabledForAccountError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
return;
}
var request = new StreamConfigRequest();
if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var streamName = StreamNameFromSubject(subject);
if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal))
{
response.Error = JsApiErrors.NewJSStreamMismatchError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
if (streamName.IndexOfAny(['\\', '/']) >= 0)
{
response.Error = JsApiErrors.NewJSStreamNameContainsPathSeparatorsError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var jsa = requestAcc.JetStream;
if (jsa is null)
return;
jsa.Lock.EnterWriteLock();
try
{
if (jsa.Streams.ContainsKey(streamName))
{
response.Error = JsApiErrors.NewJSStreamNameExistError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var stream = NatsStream.Create(requestAcc, request.Config, jsa, null, null, this);
if (stream is null)
{
response.Error = JsApiErrors.NewJSStreamCreateError(new InvalidOperationException("stream creation failed"));
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
jsa.Streams[streamName] = stream;
response.Config = stream.GetConfig();
response.State = stream.State();
response.DidCreate = true;
}
finally
{
jsa.Lock.ExitWriteLock();
}
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamUpdateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var (hasJetStream, shouldError) = requestAcc.CheckJetStream();
if (!hasJetStream)
{
if (shouldError)
{
response.Error = JsApiErrors.NewJSNotEnabledForAccountError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
return;
}
var request = new StreamConfigRequest();
if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var streamName = StreamNameFromSubject(subject);
if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal))
{
response.Error = JsApiErrors.NewJSStreamMismatchError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var (stream, lookupErr) = requestAcc.LookupStream(streamName);
if (lookupErr is not null || stream is null)
{
response.Error = JsApiErrors.NewJSStreamNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
stream.UpdateConfig(request.Config);
response.Info = stream.GetInfo();
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamNamesResponse
{
Type = JsApiSubjects.JsApiStreamNamesResponseType,
Streams = [],
Limit = JsApiSubjects.JsApiNamesLimit,
};
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var (hasJetStream, shouldError) = requestAcc.CheckJetStream();
if (!hasJetStream)
{
if (shouldError)
{
response.Error = JsApiErrors.NewJSNotEnabledForAccountError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
return;
}
var offset = 0;
var filter = string.Empty;
if (JetStreamApi.IsJSONObjectOrArray(msg))
{
var request = new JsApiStreamNamesRequest();
if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
offset = request.Offset;
filter = request.Subject ?? string.Empty;
}
var streams = requestAcc.FilteredStreams(filter)
.OrderBy(s => s.Config.Name, StringComparer.Ordinal)
.Select(s => s.Config.Name)
.ToList();
response.Total = streams.Count;
if (offset > streams.Count)
offset = streams.Count;
response.Offset = offset;
response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiNamesLimit).ToList();
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamListResponse
{
Type = JsApiSubjects.JsApiStreamListResponseType,
Streams = [],
Limit = JsApiSubjects.JsApiListLimit,
};
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var (hasJetStream, shouldError) = requestAcc.CheckJetStream();
if (!hasJetStream)
{
if (shouldError)
{
response.Error = JsApiErrors.NewJSNotEnabledForAccountError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
return;
}
var offset = 0;
var filter = string.Empty;
if (JetStreamApi.IsJSONObjectOrArray(msg))
{
var request = new JsApiStreamListRequest();
if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
offset = request.Offset;
filter = request.Subject ?? string.Empty;
}
var streams = requestAcc.FilteredStreams(filter)
.OrderBy(s => s.Config.Name, StringComparer.Ordinal)
.ToList();
response.Total = streams.Count;
if (offset > streams.Count)
offset = streams.Count;
response.Offset = offset;
response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiListLimit).Select(s => s.GetInfo()).ToList();
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamInfoResponse { Type = JsApiSubjects.JsApiStreamInfoResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var (hasJetStream, shouldError) = requestAcc.CheckJetStream();
if (!hasJetStream)
{
if (shouldError)
{
response.Error = JsApiErrors.NewJSNotEnabledForAccountError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
return;
}
var streamName = StreamNameFromSubject(subject);
var (stream, lookupErr) = requestAcc.LookupStream(streamName);
if (lookupErr is not null || stream is null)
{
response.Error = JsApiErrors.NewJSStreamNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Info = stream.GetInfo();
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamLeaderStepDownResponse { Type = JsApiSubjects.JsApiStreamLeaderStepDownResponseType };
if (!JetStreamEnabledForDomain())
{
response.Error = JsApiErrors.NewJSClusterRequiredError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Success = true;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsConsumerLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiConsumerLeaderStepDownResponse { Type = JsApiSubjects.JsApiConsumerLeaderStepDownResponseType };
if (!JetStreamEnabledForDomain())
{
response.Error = JsApiErrors.NewJSClusterRequiredError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Success = true;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamRemovePeerRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamRemovePeerResponse { Type = JsApiSubjects.JsApiStreamRemovePeerResponseType };
if (!JetStreamEnabledForDomain())
{
response.Error = JsApiErrors.NewJSClusterRequiredError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
if (JetStreamApi.IsEmptyRequest(msg))
{
response.Error = JsApiErrors.NewJSBadRequestError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Success = true;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsLeaderServerRemoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount()))
return;
var response = new JsApiMetaServerRemoveResponse { Type = JsApiSubjects.JsApiMetaServerRemoveResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
if (JetStreamApi.IsEmptyRequest(msg))
{
response.Error = JsApiErrors.NewJSBadRequestError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var request = new JsApiMetaServerRemoveRequest();
if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Success = !string.IsNullOrWhiteSpace(request.PeerId) || !string.IsNullOrWhiteSpace(request.Server);
if (!response.Success)
response.Error = JsApiErrors.NewJSClusterServerNotMemberError();
if (response.Error is null)
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
else
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal string[] PeerSetToNames(string[] peers)
{
var names = new string[peers.Length];
for (var i = 0; i < peers.Length; i++)
{
var info = GetNodeInfo(peers[i]);
names[i] = info?.Name ?? peers[i];
}
return names;
}
internal string NameToPeer(JetStream js, string serverName, string clusterName, string domainName)
{
_ = js;
foreach (var (nodeId, _) in _nodeToInfo)
{
var info = GetNodeInfo(nodeId);
if (info is null || !string.Equals(info.Name, serverName, StringComparison.Ordinal))
continue;
if (!string.IsNullOrWhiteSpace(clusterName) && !string.Equals(info.Cluster, clusterName, StringComparison.Ordinal))
continue;
if (!string.IsNullOrWhiteSpace(domainName) && !string.Equals(info.Domain, domainName, StringComparison.Ordinal))
continue;
return nodeId;
}
return string.Empty;
}
internal void JsLeaderServerStreamMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var req = new JsApiMetaServerStreamMoveRequest();
if (UnmarshalRequest(c, requestAcc, subject, msg, req) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7);
var (stream, lookupErr) = requestAcc.LookupStream(streamName);
if (lookupErr is not null || stream is null)
{
response.Error = JsApiErrors.NewJSStreamNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Info = stream.GetInfo();
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsLeaderServerStreamCancelMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7);
var (stream, lookupErr) = requestAcc.LookupStream(streamName);
if (lookupErr is not null || stream is null)
{
response.Error = JsApiErrors.NewJSStreamNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Info = stream.GetInfo();
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsLeaderAccountPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount()))
return;
var response = new JsApiAccountPurgeResponse { Type = JsApiSubjects.JsApiAccountPurgeResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Initiated = true;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount()))
return;
var response = new JsApiLeaderStepDownResponse { Type = JsApiSubjects.JsApiLeaderStepDownResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Success = true;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal static bool IsEmptyRequest(byte[] request) => JetStreamApi.IsEmptyRequest(request);
internal (string PreferredLeader, JsApiError? Error) GetStepDownPreferredPlacement(IRaftNode group, Placement? placement)
{
if (placement is null)
return (string.Empty, null);
if (!string.IsNullOrWhiteSpace(placement.Preferred))
{
foreach (var peer in group.Peers())
{
var info = GetNodeInfo(peer.Id);
if (info is not null && string.Equals(info.Name, placement.Preferred, StringComparison.Ordinal))
{
if (peer.Id == group.ID())
{
return (string.Empty, JsApiErrors.NewJSClusterNoPeersError(
new InvalidOperationException($"preferred server '{placement.Preferred}' is already leader")));
}
return (peer.Id, null);
}
}
return (string.Empty, JsApiErrors.NewJSClusterNoPeersError(
new InvalidOperationException($"preferred server '{placement.Preferred}' not known")));
}
foreach (var peer in group.Peers())
{
if (peer.Id == group.ID())
continue;
var info = GetNodeInfo(peer.Id);
if (info is null || info.Offline)
continue;
if (!string.IsNullOrWhiteSpace(placement.Cluster) &&
!string.Equals(info.Cluster, placement.Cluster, StringComparison.Ordinal))
{
continue;
}
if (placement.Tags is { Length: > 0 } &&
placement.Tags.Except(info.Tags, StringComparer.Ordinal).Any())
{
continue;
}
return (peer.Id, null);
}
return (string.Empty, JsApiErrors.NewJSClusterNoPeersError(new InvalidOperationException("no replacement peer connected")));
}
private static string? GetRequiredApiHeader(byte[] header)
{
var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, header);
return value is null ? null : Encoding.ASCII.GetString(value);
}
internal static string StreamNameFromSubject(string subject) => JetStreamApi.StreamNameFromSubject(subject);
internal static string ConsumerNameFromSubject(string subject) => JetStreamApi.ConsumerNameFromSubject(subject);

Binary file not shown.