diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index 55401f2..51eeb2f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -178,6 +178,72 @@ public sealed partial class Account } } + internal void TrackAPI() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa is null) + return; + + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.UsageApi++; + jsa.ApiTotal++; + jsa.SendClusterUsageUpdate(); + + if (jsa.Js is JetStream js) + Interlocked.Add(ref js.ApiTotal, 1); + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + } + + internal void TrackAPIErr() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa is null) + return; + + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.UsageApi++; + jsa.ApiTotal++; + jsa.UsageErr++; + jsa.ApiErrors++; + jsa.SendClusterUsageUpdate(); + + if (jsa.Js is JetStream js) + { + Interlocked.Add(ref js.ApiTotal, 1); + Interlocked.Add(ref js.ApiErrors, 1); + } + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + } + + internal (bool Enabled, bool ShouldError) CheckJetStream() + { + _mu.EnterReadLock(); + try + { + return (JetStream is not null, _nleafs + _nrleafs == 0); + } + finally + { + _mu.ExitReadLock(); + } + } + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) { _mu.EnterReadLock(); @@ -530,4 +596,22 @@ public sealed partial class Account return EnableAllJetStreamServiceImportsAndMappings(); } + + internal Exception? JsNonClusteredStreamLimitsCheck(StreamConfig config) + { + var (server, jsa, err) = CheckForJetStream(); + if (err is not null || server is null || jsa is null) + return err ?? new InvalidOperationException("jetstream not enabled for account"); + + var selected = jsa.SelectLimits(config.Replicas); + if (!selected.Found) + return new InvalidOperationException(JsApiErrors.NewJSNoLimitsError().Description ?? "jetstream limits not configured"); + + var reservation = jsa.TieredReservation(selected.Tier, config); + var js = server.GetJetStream(); + if (js is null) + return new InvalidOperationException("jetstream not enabled"); + + return js.CheckAccountLimits(selected.Limits, config, reservation); + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs new file mode 100644 index 0000000..86c7b45 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs @@ -0,0 +1,190 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal delegate void JetStreamApiHandler(ClientConnection c, Account acc, string subject, string reply, byte[] message); + +internal sealed class JetStreamApiSubscription +{ + public required string Subject { get; init; } + public required JetStreamApiHandler Handler { get; init; } +} + +internal sealed class JsApiRoutedRequest +{ + public required JetStreamApiSubscription Subscription { get; init; } + public required Account Account { get; init; } + public required string Subject { get; init; } + public required string Reply { get; init; } + public required byte[] Message { get; init; } +} + +internal sealed class DelayedApiResponse +{ + public ClientInfo? ClientInfo { get; init; } + public required Account Account { get; init; } + public required string Subject { get; init; } + public required string Reply { get; init; } + public string Request { get; init; } = string.Empty; + public byte[]? Header { get; init; } + public required string Response { get; init; } + public DateTime DeadlineUtc { get; init; } + public bool RawResponse { get; init; } + public DelayedApiResponse? Next { get; set; } +} + +internal static class JetStreamApi +{ + internal static readonly TimeSpan ErrorResponseDelay = TimeSpan.FromMilliseconds(500); + + internal static Dictionary GenerateJSMappingTable(string domain) + { + var mappings = new Dictionary(StringComparer.Ordinal) + { + ["INFO"] = JsApiSubjects.JsApiAccountInfo, + ["STREAM.>"] = "$JS.API.STREAM.>", + ["CONSUMER.>"] = "$JS.API.CONSUMER.>", + ["DIRECT.>"] = "$JS.API.DIRECT.>", + ["META.>"] = "$JS.API.META.>", + ["SERVER.>"] = "$JS.API.SERVER.>", + ["ACCOUNT.>"] = "$JS.API.ACCOUNT.>", + ["$KV.>"] = "$KV.>", + ["$OBJ.>"] = "$OBJ.>", + }; + + var table = new Dictionary(StringComparer.Ordinal); + foreach (var (suffix, target) in mappings) + table[$"$JS.{domain}.API.{suffix}"] = target; + + return table; + } + + internal static void AddDelayedResponse(ref DelayedApiResponse? head, ref DelayedApiResponse? tail, DelayedApiResponse response) + { + if (head is null) + { + head = response; + tail = response; + return; + } + + if (tail is not null && response.DeadlineUtc >= tail.DeadlineUtc) + { + tail.Next = response; + tail = response; + return; + } + + DelayedApiResponse? previous = null; + for (var current = head; current is not null; current = current.Next) + { + if (response.DeadlineUtc < current.DeadlineUtc) + { + response.Next = current; + if (previous is null) + head = response; + else + previous.Next = response; + return; + } + + previous = current; + } + + if (tail is not null) + tail.Next = response; + tail = response; + } + + internal static string StreamNameFromSubject(string subject) => + SubscriptionIndex.TokenAt(subject, 5); + + internal static string ConsumerNameFromSubject(string subject) => + SubscriptionIndex.TokenAt(subject, 6); + + internal static bool IsJSONObjectOrArray(ReadOnlySpan payload) + { + for (var i = 0; i < payload.Length; i++) + { + var ch = payload[i]; + if (ch is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + continue; + + return ch is (byte)'{' or (byte)'['; + } + + return false; + } + + internal static bool IsEmptyRequest(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return true; + + var start = 0; + var end = payload.Length - 1; + while (start < payload.Length && payload[start] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + start++; + while (end >= start && payload[end] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + end--; + if (end < start) + return true; + + var trimmed = payload.Slice(start, end - start + 1); + if (trimmed.SequenceEqual("{}"u8)) + return true; + + try + { + using var document = JsonDocument.Parse(trimmed.ToArray()); + return document.RootElement.ValueKind == JsonValueKind.Object && + !document.RootElement.EnumerateObject().Any(); + } + catch + { + return false; + } + } + + internal static bool SubjectMatches(string pattern, string subject) + { + var p = pattern.Split('.', StringSplitOptions.None); + var s = subject.Split('.', StringSplitOptions.None); + + for (var i = 0; i < p.Length; i++) + { + if (i >= s.Length) + return p[i] == ">"; + + var token = p[i]; + if (token == ">") + return true; + if (token == "*") + continue; + if (!string.Equals(token, s[i], StringComparison.Ordinal)) + return false; + } + + return p.Length == s.Length; + } + + internal static T? DeserializeStrict(byte[] payload, bool strictMode) + { + var strictOptions = new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true, + UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow, + }; + + try + { + return JsonSerializer.Deserialize(payload, strictOptions); + } + catch when (!strictMode) + { + return JsonSerializer.Deserialize(payload); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 6237f85..2cfdb00 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1,3 +1,6 @@ +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + namespace ZB.MOM.NatsNet.Server; internal sealed class JetStreamEngine(JetStream state) @@ -7,6 +10,7 @@ internal sealed class JetStreamEngine(JetStream state) private const string JsWillExtend = "will_extend"; private const string JsNoExtend = "no_extend"; private const string JsDomainApiTemplate = "$JS.{0}.API.>"; + private const string JsApiAccountPrefix = "$JS.API.ACCOUNT."; internal void SetStarted() { @@ -21,6 +25,91 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal void ApiDispatch(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + // These meta/system directives are handled elsewhere. + if (subject == JsApiSubjects.JsApiLeaderStepDown || + subject == JsApiSubjects.JsApiRemoveServer || + subject.StartsWith(JsApiAccountPrefix, StringComparison.Ordinal)) + { + return; + } + + if (_state.Server is not NatsServer server) + return; + + var route = server.MatchJetStreamApiSubscription(subject); + var (header, payload) = c.MsgParts(rawMessage); + var requestBody = Encoding.UTF8.GetString(payload); + + if (NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, header) is not { Length: > 0 }) + { + var systemAccount = server.SystemAccount(); + if (!ReferenceEquals(systemAccount, acc)) + return; + + if (subject != JsApiSubjects.JsApiAccountInfo) + { + if (c.Kind is ClientKind.Client or ClientKind.Leaf) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiSystemResponseType, + Error = JsApiErrors.NewJSNotEnabledForAccountError(), + }; + server.SendAPIErrResponse(null, acc, subject, reply, requestBody, server.JsonResponse(response)); + } + + return; + } + } + + if (route is null) + { + if (c.Kind is ClientKind.Client or ClientKind.Leaf) + { + var (_, requestAcc, _, _, _) = server.GetRequestInfo(c, rawMessage); + if (requestAcc is not null && !ReferenceEquals(requestAcc, server.SystemAccount())) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiSystemResponseType, + Error = JsApiErrors.NewJSBadRequestError(), + }; + server.SendAPIErrResponse(null, requestAcc, subject, reply, requestBody, server.JsonResponse(response)); + } + } + + return; + } + + Interlocked.Add(ref _state.ApiInflight, 1); + + var request = new JsApiRoutedRequest + { + Subscription = route, + Account = acc, + Subject = subject, + Reply = reply, + Message = rawMessage.ToArray(), + }; + + var (queued, dropped) = server.EnqueueJSApiRequest(_state, request); + if (!queued) + { + if (dropped > 0) + { + server.PublishAdvisory( + server.SystemAccount(), + JsApiSubjects.JsAdvisoryApiLimitReached, + new { Type = "io.nats.jetstream.advisory.v1.api_limit_reached", Dropped = dropped, Time = DateTime.UtcNow }); + } + + route.Handler(c, acc, subject, reply, rawMessage); + Interlocked.Add(ref _state.ApiInflight, -1); + } + } + internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0; internal void SetJetStreamStandAlone(bool isStandAlone) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs index d8978cc..f9fc6ec 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -245,6 +245,38 @@ internal sealed partial class JsAccount } } + internal long TieredReservation(string tier, StreamConfig cfg) + { + long reservation = 0; + + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + var streamCfg = stream.Config; + if (streamCfg.MaxBytes <= 0 || string.Equals(streamCfg.Name, cfg.Name, StringComparison.Ordinal)) + continue; + + if (string.IsNullOrEmpty(tier)) + { + if (streamCfg.Storage == cfg.Storage) + reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes; + continue; + } + + if (streamCfg.Replicas == cfg.Replicas && JetStreamEngine.IsSameTier(streamCfg, cfg)) + reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes; + } + + return reservation; + } + finally + { + Lock.ExitReadLock(); + } + } + internal (ulong Mem, ulong Store) StorageTotals() { UsageLock.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs new file mode 100644 index 0000000..9c84838 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -0,0 +1,1511 @@ +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private readonly Lock _jsApiSubsLock = new(); + private readonly List _jsApiSubscriptions = []; + private IpQueue? _jsApiRoutedReqs; + private IpQueue? _delayedApiResponses; + + internal void ProcessJSAPIRoutedRequests() + { + var queue = _jsApiRoutedReqs; + if (queue is null) + return; + + var client = CreateInternalJetStreamClient(); + var token = _quitCts.Token; + + while (!token.IsCancellationRequested) + { + try + { + if (!queue.Ch.WaitToReadAsync(token).AsTask().GetAwaiter().GetResult()) + break; + + while (true) + { + var (request, ok) = queue.PopOne(); + if (!ok) + break; + + var start = DateTime.UtcNow; + request.Subscription.Handler(client, request.Account, request.Subject, request.Reply, request.Message); + var elapsed = DateTime.UtcNow - start; + if (elapsed >= TimeSpan.FromSeconds(1)) + Warnf("Internal subscription on '{0}' took too long: {1}", request.Subject, elapsed); + + var js = GetJetStreamState(); + if (js is not null) + Interlocked.Add(ref js.ApiInflight, -1); + } + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + Warnf("JetStream routed API worker failed: {0}", ex.Message); + } + } + } + + internal Exception? SetJetStreamExportSubs() + { + var js = GetJetStreamState(); + if (js is null) + return new InvalidOperationException(JsApiErrors.NewJSNotEnabledError().Description ?? "jetstream not enabled"); + + var workers = Math.Min(Math.Max(Environment.ProcessorCount, 1), 16); + + _jsApiRoutedReqs = IpQueue.NewIPQueue("Routed JS API Requests"); + _delayedApiResponses = IpQueue.NewIPQueue("Delayed JS API Responses"); + Interlocked.Exchange(ref js.QueueLimit, JsApiSubjects.JsDefaultRequestQueueLimit); + + for (var i = 0; i < workers; i++) + StartGoRoutine(ProcessJSAPIRoutedRequests); + + StartGoRoutine(DelayedAPIResponder); + + lock (_jsApiSubsLock) + { + _jsApiSubscriptions.Clear(); + _jsApiSubscriptions.AddRange( + [ + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountInfo, Handler = JsAccountInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamCreate, Handler = JsStreamCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamUpdate, Handler = JsStreamUpdateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreams, Handler = JsStreamNamesRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamList, Handler = JsStreamListRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamInfo, Handler = JsStreamInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamDelete, Handler = JsStreamDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgDelete, Handler = JsMsgDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgGet, Handler = JsMsgGetRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamPurge, Handler = JsStreamPurgeRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRestore, Handler = JsStreamRestoreRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamSnapshot, Handler = JsStreamSnapshotRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreateEx, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreate, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiDurableCreate, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumers, Handler = JsConsumerNamesRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerList, Handler = JsConsumerListRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerInfo, Handler = JsConsumerInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerDelete, Handler = JsConsumerDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerPause, Handler = JsConsumerPauseRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerUnpin, Handler = JsConsumerUnpinRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamLeaderStepDown, Handler = JsStreamLeaderStepDownRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerLeaderStepDown, Handler = JsConsumerLeaderStepDownRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRemovePeer, Handler = JsStreamRemovePeerRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiRemoveServer, Handler = JsLeaderServerRemoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamMove, Handler = JsLeaderServerStreamMoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamCancelMove, Handler = JsLeaderServerStreamCancelMoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountPurge, Handler = JsLeaderAccountPurgeRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiLeaderStepDown, Handler = JsLeaderStepDownRequest }, + ]); + } + + var sys = SystemAccount(); + if (sys is null) + return null; + + var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + if (err is not null) + Warnf("Error setting up jetstream service exports: {0}", err.Message); + return err; + } + + internal JetStreamApiSubscription? MatchJetStreamApiSubscription(string subject) + { + lock (_jsApiSubsLock) + { + foreach (var sub in _jsApiSubscriptions) + { + if (JetStreamApi.SubjectMatches(sub.Subject, subject)) + return sub; + } + } + + return null; + } + + internal (bool Queued, long Dropped) EnqueueJSApiRequest(JetStream js, JsApiRoutedRequest request) + { + var queue = _jsApiRoutedReqs; + if (queue is null) + return (false, 0); + + var (pending, error) = queue.Push(request); + var limit = Interlocked.Read(ref js.QueueLimit); + if (limit <= 0) + limit = JsApiSubjects.JsDefaultRequestQueueLimit; + + if (error is null && pending < limit) + return (true, 0); + + RateLimitFormatWarnf("JetStream API queue limit reached, dropping {0} requests", pending); + var drained = queue.Drain(); + if (drained > 0) + Interlocked.Add(ref js.ApiInflight, -drained); + + return (false, drained); + } + + internal void SendAPIResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response) + { + acc.TrackAPI(); + if (!string.IsNullOrWhiteSpace(reply)) + _ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false); + + SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response); + } + + internal void SendAPIErrResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response) + { + acc.TrackAPIErr(); + if (!string.IsNullOrWhiteSpace(reply)) + _ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false); + + SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response); + } + + internal void DelayedAPIResponder() + { + var queue = _delayedApiResponses; + if (queue is null) + return; + + var token = _quitCts.Token; + DelayedApiResponse? head = null; + DelayedApiResponse? tail = null; + + while (!token.IsCancellationRequested) + { + try + { + while (true) + { + var (entry, ok) = queue.PopOne(); + if (!ok) + break; + JetStreamApi.AddDelayedResponse(ref head, ref tail, entry); + } + + var now = DateTime.UtcNow; + if (head is not null && head.DeadlineUtc <= now) + { + var entry = head; + head = head.Next; + if (head is null) + tail = null; + + if (entry.RawResponse) + _ = SendInternalAccountMsgWithReply(entry.Account, entry.Subject, string.Empty, entry.Header, entry.Response, false); + else + SendAPIErrResponse(entry.ClientInfo, entry.Account, entry.Subject, entry.Reply, entry.Request, entry.Response); + + continue; + } + + var wait = head is null + ? TimeSpan.FromSeconds(1) + : head.DeadlineUtc - now; + + if (wait < TimeSpan.Zero) + wait = TimeSpan.Zero; + + _ = Task.WhenAny( + queue.Ch.WaitToReadAsync(token).AsTask(), + Task.Delay(wait, token)).GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + Warnf("Delayed JetStream API responder failed: {0}", ex.Message); + } + } + } + + internal void SendDelayedAPIErrResponse( + ClientInfo? ci, + Account acc, + string subject, + string reply, + string request, + string response, + TimeSpan duration) + { + _delayedApiResponses?.Push(new DelayedApiResponse + { + ClientInfo = ci, + Account = acc, + Subject = subject, + Reply = reply, + Request = request, + Response = response, + DeadlineUtc = DateTime.UtcNow.Add(duration), + RawResponse = false, + }); + } + + internal void SendDelayedErrResponse(Account acc, string subject, byte[]? header, string response, TimeSpan duration) + { + _delayedApiResponses?.Push(new DelayedApiResponse + { + Account = acc, + Subject = subject, + Reply = string.Empty, + Header = header, + Response = response, + DeadlineUtc = DateTime.UtcNow.Add(duration), + RawResponse = true, + }); + } + + internal (ClientInfo? ClientInfo, Account? Account, byte[] Header, byte[] Message, Exception? Error) GetRequestInfo(ClientConnection c, byte[] raw) + { + var (hdr, msg) = c.MsgParts(raw); + + ClientInfo? clientInfo = null; + var clientInfoBytes = NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, hdr); + if (clientInfoBytes is { Length: > 0 }) + { + try + { + clientInfo = JsonSerializer.Deserialize(clientInfoBytes); + } + catch (Exception ex) + { + return (null, null, hdr, msg, ex); + } + } + + Account? acc = null; + var serviceAccount = clientInfo?.ServiceAccount(); + if (!string.IsNullOrWhiteSpace(serviceAccount)) + (acc, _) = LookupAccount(serviceAccount); + + if (acc is null) + { + acc = c.Account() as Account; + acc ??= SystemAccount(); + } + + if (acc is null) + return (clientInfo, null, hdr, msg, ServerErrors.ErrMissingAccount); + + return (clientInfo, acc, hdr, msg, null); + } + + internal Exception? UnmarshalRequest(ClientConnection c, Account acc, string subject, byte[] message, out T? value) + { + var strictMode = JetStreamConfig()?.Strict ?? true; + value = JetStreamApi.DeserializeStrict(message, strictMode); + if (value is not null) + return null; + + var err = new InvalidOperationException("unable to deserialize JetStream API request"); + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, err.Message); + return err; + } + + internal Exception? UnmarshalRequest(ClientConnection c, Account acc, string subject, byte[] message, object destination) + { + var strictMode = JetStreamConfig()?.Strict ?? true; + object? parsed; + + try + { + parsed = JsonSerializer.Deserialize(message, destination.GetType(), new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true, + UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow, + }); + } + catch (Exception ex) when (!strictMode) + { + try + { + parsed = JsonSerializer.Deserialize(message, destination.GetType()); + } + catch + { + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message); + return ex; + } + } + catch (Exception ex) + { + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message); + return ex; + } + + if (parsed is null) + return new InvalidOperationException("empty JetStream API request body"); + + foreach (var property in destination.GetType().GetProperties()) + { + if (!property.CanRead || !property.CanWrite) + continue; + property.SetValue(destination, property.GetValue(parsed)); + } + + return null; + } + + internal void JsAccountInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + if (!JetStreamEnabled()) + return; + + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + { + Warnf("Malformed JetStream API Request: {0}", Encoding.UTF8.GetString(msg)); + return; + } + + var response = new JsApiAccountInfoResponse + { + Type = JsApiSubjects.JsApiAccountInfoResponseType, + }; + + var requiredApiLevel = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr); + var reqApiHeader = requiredApiLevel is null ? null : Encoding.ASCII.GetString(requiredApiLevel); + if (JetStreamVersioning.ErrorOnRequiredApiLevel(reqApiHeader)) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (!shouldError) + return; + + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var stats = requestAcc.JetStreamUsage(); + response.Memory = stats.Memory; + response.Store = stats.Store; + response.ReservedMemory = stats.ReservedMemory; + response.ReservedStore = stats.ReservedStore; + response.Streams = stats.Streams; + response.Consumers = stats.Consumers; + response.Limits = stats.Limits; + response.Domain = stats.Domain; + response.Api = stats.Api; + response.Tiers = stats.Tiers; + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamCreateResponse { Type = JsApiSubjects.JsApiStreamCreateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var request = new StreamConfigRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal)) + { + response.Error = JsApiErrors.NewJSStreamMismatchError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (streamName.IndexOfAny(['\\', '/']) >= 0) + { + response.Error = JsApiErrors.NewJSStreamNameContainsPathSeparatorsError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var jsa = requestAcc.JetStream; + if (jsa is null) + return; + + jsa.Lock.EnterWriteLock(); + try + { + if (jsa.Streams.ContainsKey(streamName)) + { + response.Error = JsApiErrors.NewJSStreamNameExistError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var stream = NatsStream.Create(requestAcc, request.Config, jsa, null, null, this); + if (stream is null) + { + response.Error = JsApiErrors.NewJSStreamCreateError(new InvalidOperationException("stream creation failed")); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + jsa.Streams[streamName] = stream; + response.Config = stream.GetConfig(); + response.State = stream.State(); + response.DidCreate = true; + } + finally + { + jsa.Lock.ExitWriteLock(); + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamUpdateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var request = new StreamConfigRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal)) + { + response.Error = JsApiErrors.NewJSStreamMismatchError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + stream.UpdateConfig(request.Config); + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamNamesResponse + { + Type = JsApiSubjects.JsApiStreamNamesResponseType, + Streams = [], + Limit = JsApiSubjects.JsApiNamesLimit, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var offset = 0; + var filter = string.Empty; + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + var request = new JsApiStreamNamesRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + offset = request.Offset; + filter = request.Subject ?? string.Empty; + } + + var streams = requestAcc.FilteredStreams(filter) + .OrderBy(s => s.Config.Name, StringComparer.Ordinal) + .Select(s => s.Config.Name) + .ToList(); + + response.Total = streams.Count; + if (offset > streams.Count) + offset = streams.Count; + response.Offset = offset; + response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiNamesLimit).ToList(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamListResponse + { + Type = JsApiSubjects.JsApiStreamListResponseType, + Streams = [], + Limit = JsApiSubjects.JsApiListLimit, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var offset = 0; + var filter = string.Empty; + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + var request = new JsApiStreamListRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + offset = request.Offset; + filter = request.Subject ?? string.Empty; + } + + var streams = requestAcc.FilteredStreams(filter) + .OrderBy(s => s.Config.Name, StringComparer.Ordinal) + .ToList(); + + response.Total = streams.Count; + if (offset > streams.Count) + offset = streams.Count; + response.Offset = offset; + response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiListLimit).Select(s => s.GetInfo()).ToList(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamInfoResponse { Type = JsApiSubjects.JsApiStreamInfoResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamLeaderStepDownResponse { Type = JsApiSubjects.JsApiStreamLeaderStepDownResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerLeaderStepDownResponse { Type = JsApiSubjects.JsApiConsumerLeaderStepDownResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamRemovePeerRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamRemovePeerResponse { Type = JsApiSubjects.JsApiStreamRemovePeerResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamApi.IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderServerRemoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiMetaServerRemoveResponse { Type = JsApiSubjects.JsApiMetaServerRemoveResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamApi.IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMetaServerRemoveRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = !string.IsNullOrWhiteSpace(request.PeerId) || !string.IsNullOrWhiteSpace(request.Server); + if (!response.Success) + response.Error = JsApiErrors.NewJSClusterServerNotMemberError(); + + if (response.Error is null) + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + else + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal string[] PeerSetToNames(string[] peers) + { + var names = new string[peers.Length]; + for (var i = 0; i < peers.Length; i++) + { + var info = GetNodeInfo(peers[i]); + names[i] = info?.Name ?? peers[i]; + } + + return names; + } + + internal string NameToPeer(JetStream js, string serverName, string clusterName, string domainName) + { + _ = js; + foreach (var (nodeId, _) in _nodeToInfo) + { + var info = GetNodeInfo(nodeId); + if (info is null || !string.Equals(info.Name, serverName, StringComparison.Ordinal)) + continue; + if (!string.IsNullOrWhiteSpace(clusterName) && !string.Equals(info.Cluster, clusterName, StringComparison.Ordinal)) + continue; + if (!string.IsNullOrWhiteSpace(domainName) && !string.Equals(info.Domain, domainName, StringComparison.Ordinal)) + continue; + return nodeId; + } + + return string.Empty; + } + + internal void JsLeaderServerStreamMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var req = new JsApiMetaServerStreamMoveRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, req) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderServerStreamCancelMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderAccountPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiAccountPurgeResponse { Type = JsApiSubjects.JsApiAccountPurgeResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Initiated = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiLeaderStepDownResponse { Type = JsApiSubjects.JsApiLeaderStepDownResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamDeleteResponse { Type = JsApiSubjects.JsApiStreamDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (!IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSNotEmptyRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + requestAcc.JetStream?.Lock.EnterWriteLock(); + try + { + stream.Delete(); + requestAcc.JetStream?.Streams.Remove(streamName); + response.Success = true; + } + finally + { + requestAcc.JetStream?.Lock.ExitWriteLock(); + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsMsgDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiMsgDeleteResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMsgDeleteRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = SubscriptionIndex.TokenAt(subject, 6); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (removed, removeErr) = request.NoErase + ? stream.Store.RemoveMsg(request.Seq) + : stream.Store.EraseMsg(request.Seq); + if (removeErr is not null) + { + response.Error = JsApiErrors.NewJSStreamMsgDeleteFailedError(removeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = removed; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsMsgGetRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiMsgGetResponse { Type = JsApiSubjects.JsApiMsgGetResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMsgGetRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + } + + var streamName = SubscriptionIndex.TokenAt(subject, 6); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var storeMsg = stream.Store.LoadMsg(request.Seq, null); + if (storeMsg is null) + { + response.Error = JsApiErrors.NewJSNoMessageFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Message = new StoredMsg(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamPurgeResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamPurgeRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + (ulong purged, Exception? purgeErr) result = string.IsNullOrWhiteSpace(request.Subject) && request.Sequence == 0 && request.Keep == 0 + ? stream.Store.Purge() + : stream.Store.PurgeEx(request.Subject ?? string.Empty, request.Sequence, request.Keep); + + if (result.purgeErr is not null) + { + response.Error = JsApiErrors.NewJSStreamPurgeFailedError(result.purgeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + response.Purged = result.purged; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerUnpinRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerUnpinResponse { Type = JsApiSubjects.JsApiConsumerUnpinResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiConsumerUnpinRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamRestoreRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamRestoreResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamRestoreRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var restoreErr = ProcessStreamRestore(requestAcc, streamName, request); + if (restoreErr is not null) + { + response.Error = JsApiErrors.NewJSStreamRestoreError(restoreErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.DeliverSubject = $"$JSC.R.{streamName}"; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal Exception? ProcessStreamRestore(Account acc, string streamName, JsApiStreamRestoreRequest request) + { + _ = request; + var (stream, lookupErr) = acc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + return lookupErr ?? new InvalidOperationException("stream not found"); + + stream.UpdateConfig(stream.GetConfig()); + return null; + } + + internal void JsStreamSnapshotRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamSnapshotResponse { Type = JsApiSubjects.JsApiStreamSnapshotResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamSnapshotRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (info, snapErr) = StreamSnapshot(stream, request); + if (snapErr is not null || info is null) + { + response.Error = JsApiErrors.NewJSStreamSnapshotError(snapErr ?? new InvalidOperationException("snapshot failed")); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Config = info.Config; + response.State = info.State; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal (StreamInfo? Info, Exception? Error) StreamSnapshot(NatsStream stream, JsApiStreamSnapshotRequest request) + { + if (stream.Store is null) + return (null, new InvalidOperationException("stream store not initialized")); + + var (_, err) = stream.Store.Snapshot(TimeSpan.FromSeconds(30), !request.NoConsumers, request.CheckMsgs); + if (err is not null) + return (null, err); + return (stream.GetInfo(), null); + } + + internal void JsConsumerCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerCreateResponse { Type = JsApiSubjects.JsApiConsumerCreateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Error = JsApiErrors.NewJSClusterUnSupportFeatureError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerNamesResponse + { + Type = JsApiSubjects.JsApiConsumerNamesResponseType, + Consumers = [], + Limit = JsApiSubjects.JsApiNamesLimit, + Total = 0, + Offset = 0, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerListResponse + { + Type = JsApiSubjects.JsApiConsumerListResponseType, + Consumers = [], + Limit = JsApiSubjects.JsApiListLimit, + Total = 0, + Offset = 0, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerInfoResponse { Type = JsApiSubjects.JsApiConsumerInfoResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Error = JsApiErrors.NewJSConsumerNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerDeleteResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerPauseRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerPauseResponse { Type = JsApiSubjects.JsApiConsumerPauseResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiConsumerPauseRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Paused = request.PauseUntil > DateTime.UtcNow; + response.PauseUntil = request.PauseUntil; + response.PauseRemaining = request.PauseUntil <= DateTime.UtcNow ? TimeSpan.Zero : request.PauseUntil - DateTime.UtcNow; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal static bool IsEmptyRequest(byte[] request) => JetStreamApi.IsEmptyRequest(request); + + internal (string PreferredLeader, JsApiError? Error) GetStepDownPreferredPlacement(IRaftNode group, Placement? placement) + { + if (placement is null) + return (string.Empty, null); + + if (!string.IsNullOrWhiteSpace(placement.Preferred)) + { + foreach (var peer in group.Peers()) + { + var info = GetNodeInfo(peer.Id); + if (info is not null && string.Equals(info.Name, placement.Preferred, StringComparison.Ordinal)) + { + if (peer.Id == group.ID()) + { + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError( + new InvalidOperationException($"preferred server '{placement.Preferred}' is already leader"))); + } + + return (peer.Id, null); + } + } + + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError( + new InvalidOperationException($"preferred server '{placement.Preferred}' not known"))); + } + + foreach (var peer in group.Peers()) + { + if (peer.Id == group.ID()) + continue; + + var info = GetNodeInfo(peer.Id); + if (info is null || info.Offline) + continue; + + if (!string.IsNullOrWhiteSpace(placement.Cluster) && + !string.Equals(info.Cluster, placement.Cluster, StringComparison.Ordinal)) + { + continue; + } + + if (placement.Tags is { Length: > 0 } && + placement.Tags.Except(info.Tags, StringComparer.Ordinal).Any()) + { + continue; + } + + return (peer.Id, null); + } + + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError(new InvalidOperationException("no replacement peer connected"))); + } + + private static string? GetRequiredApiHeader(byte[] header) + { + var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, header); + return value is null ? null : Encoding.ASCII.GetString(value); + } + + internal static string StreamNameFromSubject(string subject) => JetStreamApi.StreamNameFromSubject(subject); + + internal static string ConsumerNameFromSubject(string subject) => JetStreamApi.ConsumerNameFromSubject(subject); + + internal string JsonResponse(object response) + { + try + { + return JsonSerializer.Serialize(response); + } + catch (Exception ex) + { + Warnf("Problem marshaling JSON for JetStream API: {0}", ex.Message); + return string.Empty; + } + } + + internal void SendJetStreamAPIAuditAdvisory(ClientInfo? ci, Account acc, string subject, string request, string response) + { + _ = ci; + _ = acc; + _ = subject; + _ = request; + _ = response; + } + + private Exception? SendInternalAccountMsgWithReply(Account account, string subject, string reply, byte[]? header, string response, bool trackApi) + { + _ = trackApi; + + try + { + var sendQueue = account.GetSendQueue() ?? NewSendQueue(account); + SendQueue.Send(sendQueue, subject, reply, header ?? [], Encoding.UTF8.GetBytes(response)); + return null; + } + catch (Exception ex) + { + return ex; + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs index f6ee4db..9a5f983 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -268,11 +268,7 @@ public sealed partial class NatsServer internal void SetupJetStreamExports() { - var sys = SystemAccount(); - if (sys == null) - return; - - var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + var err = SetJetStreamExportSubs(); if (err != null) Warnf("Error setting up jetstream service exports: {0}", err.Message); } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs index 53f6cf2..cc7713a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs @@ -15,6 +15,72 @@ public sealed class JetStreamEngineTests err!.Message.ShouldContain("unavailable"); } + + [Fact] // T:1716 + public void IsJSONObjectOrArray_ShouldSucceed() + { + var tests = new (string Name, byte[] Data, bool Valid)[] + { + ("empty", [], false), + ("empty_object", "{}"u8.ToArray(), true), + ("empty object, not trimmed", "\t\n\r{ }"u8.ToArray(), true), + ("empty_array", "[]"u8.ToArray(), true), + ("empty array, not trimmed", "\t\n\r[ ]"u8.ToArray(), true), + ("empty_string", "\"\""u8.ToArray(), false), + ("whitespace_only", " "u8.ToArray(), false), + ("object_with_whitespace", "{ }"u8.ToArray(), true), + ("array_with_whitespace", "[ ]"u8.ToArray(), true), + ("string_with_whitespace", " \"text\""u8.ToArray(), false), + ("number", "123"u8.ToArray(), false), + ("boolean_true", "true"u8.ToArray(), false), + ("boolean_false", "false"u8.ToArray(), false), + ("null_value", "null"u8.ToArray(), false), + ("invalid_json", "invalid"u8.ToArray(), false), + }; + + foreach (var test in tests) + { + JetStreamApi.IsJSONObjectOrArray(test.Data).ShouldBe(test.Valid, test.Name); + } + } + + [Fact] // T:1719 + public void JetStreamDelayedAPIResponses_ShouldSucceed() + { + var account = new Account { Name = "ACC" }; + DelayedApiResponse? head = null; + DelayedApiResponse? tail = null; + var now = DateTime.UtcNow; + + var responses = new[] + { + NewDelayed(account, "request2", "response2", now.AddMilliseconds(500)), + NewDelayed(account, "request1", "response1", now.AddMilliseconds(200)), + NewDelayed(account, "request4", "response4", now.AddMilliseconds(800)), + NewDelayed(account, "request3", "response3", now.AddMilliseconds(650)), + }; + + foreach (var response in responses) + JetStreamApi.AddDelayedResponse(ref head, ref tail, response); + + var orderedRequests = new List(); + for (var current = head; current is not null; current = current.Next) + orderedRequests.Add(current.Request); + + orderedRequests.ShouldBe(["request1", "request2", "request3", "request4"]); + } + + private static DelayedApiResponse NewDelayed(Account account, string request, string response, DateTime deadlineUtc) => + new() + { + Account = account, + Subject = "SUB", + Reply = string.Empty, + Request = request, + Response = response, + DeadlineUtc = deadlineUtc, + }; + [Fact] // T:1477 public void JetStreamMaxConsumers_ShouldSucceed() { diff --git a/reports/current.md b/reports/current.md index 71e28c1..3e78f1c 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 03:33:10 UTC +Generated: 2026-03-01 03:41:45 UTC ## Modules (12 total)