From bf115b116e0edef7b294e7dc185b2ee783c4b723 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:24:10 -0500 Subject: [PATCH] feat(batch28): implement jetstream api dispatch and account request core --- .../Accounts/Account.JetStream.cs | 66 +++ .../JetStream/JetStreamApi.cs | 160 +++++++ .../JetStream/JetStreamEngine.cs | 89 ++++ .../JetStream/JsAccount.Core.cs | 32 ++ .../NatsServer.JetStreamApi.cs | 430 ++++++++++++++++++ .../NatsServer.JetStreamCore.cs | 6 +- porting.db | Bin 6750208 -> 6754304 bytes 7 files changed, 778 insertions(+), 5 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index c0c8865..9560f66 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -127,6 +127,72 @@ public sealed partial class Account return (server, jsa, null); } + internal void TrackAPI() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa is null) + return; + + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.UsageApi++; + jsa.ApiTotal++; + jsa.SendClusterUsageUpdate(); + + if (jsa.Js is JetStream js) + Interlocked.Add(ref js.ApiTotal, 1); + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + } + + internal void TrackAPIErr() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa is null) + return; + + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.UsageApi++; + jsa.ApiTotal++; + jsa.UsageErr++; + jsa.ApiErrors++; + jsa.SendClusterUsageUpdate(); + + if (jsa.Js is JetStream js) + { + Interlocked.Add(ref js.ApiTotal, 1); + Interlocked.Add(ref js.ApiErrors, 1); + } + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + } + + internal (bool Enabled, bool ShouldError) CheckJetStream() + { + _mu.EnterReadLock(); + try + { + return (JetStream is not null, _nleafs + _nrleafs == 0); + } + finally + { + _mu.ExitReadLock(); + } + } + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs new file mode 100644 index 0000000..04cda3f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs @@ -0,0 +1,160 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal delegate void JetStreamApiHandler(ClientConnection c, Account acc, string subject, string reply, byte[] message); + +internal sealed class JetStreamApiSubscription +{ + public required string Subject { get; init; } + public required JetStreamApiHandler Handler { get; init; } +} + +internal sealed class JsApiRoutedRequest +{ + public required JetStreamApiSubscription Subscription { get; init; } + public required Account Account { get; init; } + public required string Subject { get; init; } + public required string Reply { get; init; } + public required byte[] Message { get; init; } +} + +internal sealed class DelayedApiResponse +{ + public ClientInfo? ClientInfo { get; init; } + public required Account Account { get; init; } + public required string Subject { get; init; } + public required string Reply { get; init; } + public string Request { get; init; } = string.Empty; + public byte[]? Header { get; init; } + public required string Response { get; init; } + public DateTime DeadlineUtc { get; init; } + public bool RawResponse { get; init; } + public DelayedApiResponse? Next { get; set; } +} + +internal static class JetStreamApi +{ + internal static readonly TimeSpan ErrorResponseDelay = TimeSpan.FromMilliseconds(500); + + internal static Dictionary GenerateJSMappingTable(string domain) + { + var mappings = new Dictionary(StringComparer.Ordinal) + { + ["INFO"] = JsApiSubjects.JsApiAccountInfo, + ["STREAM.>"] = "$JS.API.STREAM.>", + ["CONSUMER.>"] = "$JS.API.CONSUMER.>", + ["DIRECT.>"] = "$JS.API.DIRECT.>", + ["META.>"] = "$JS.API.META.>", + ["SERVER.>"] = "$JS.API.SERVER.>", + ["ACCOUNT.>"] = "$JS.API.ACCOUNT.>", + ["$KV.>"] = "$KV.>", + ["$OBJ.>"] = "$OBJ.>", + }; + + var table = new Dictionary(StringComparer.Ordinal); + foreach (var (suffix, target) in mappings) + table[$"$JS.{domain}.API.{suffix}"] = target; + + return table; + } + + internal static void AddDelayedResponse(ref DelayedApiResponse? head, ref DelayedApiResponse? tail, DelayedApiResponse response) + { + if (head is null) + { + head = response; + tail = response; + return; + } + + if (tail is not null && response.DeadlineUtc >= tail.DeadlineUtc) + { + tail.Next = response; + tail = response; + return; + } + + DelayedApiResponse? previous = null; + for (var current = head; current is not null; current = current.Next) + { + if (response.DeadlineUtc < current.DeadlineUtc) + { + response.Next = current; + if (previous is null) + head = response; + else + previous.Next = response; + return; + } + + previous = current; + } + + if (tail is not null) + tail.Next = response; + tail = response; + } + + internal static string StreamNameFromSubject(string subject) => + SubscriptionIndex.TokenAt(subject, 5); + + internal static string ConsumerNameFromSubject(string subject) => + SubscriptionIndex.TokenAt(subject, 6); + + internal static bool IsJSONObjectOrArray(ReadOnlySpan payload) + { + for (var i = 0; i < payload.Length; i++) + { + var ch = payload[i]; + if (ch is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + continue; + + return ch is (byte)'{' or (byte)'['; + } + + return false; + } + + internal static bool SubjectMatches(string pattern, string subject) + { + var p = pattern.Split('.', StringSplitOptions.None); + var s = subject.Split('.', StringSplitOptions.None); + + for (var i = 0; i < p.Length; i++) + { + if (i >= s.Length) + return p[i] == ">"; + + var token = p[i]; + if (token == ">") + return true; + if (token == "*") + continue; + if (!string.Equals(token, s[i], StringComparison.Ordinal)) + return false; + } + + return p.Length == s.Length; + } + + internal static T? DeserializeStrict(byte[] payload, bool strictMode) + { + var strictOptions = new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true, + UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow, + }; + + try + { + return JsonSerializer.Deserialize(payload, strictOptions); + } + catch when (!strictMode) + { + return JsonSerializer.Deserialize(payload); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 522bc08..38aae48 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1,3 +1,6 @@ +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + namespace ZB.MOM.NatsNet.Server; internal sealed class JetStreamEngine(JetStream state) @@ -7,6 +10,7 @@ internal sealed class JetStreamEngine(JetStream state) private const string JsWillExtend = "will_extend"; private const string JsNoExtend = "no_extend"; private const string JsDomainApiTemplate = "$JS.{0}.API.>"; + private const string JsApiAccountPrefix = "$JS.API.ACCOUNT."; internal void SetStarted() { @@ -21,6 +25,91 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal void ApiDispatch(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + // These meta/system directives are handled elsewhere. + if (subject == JsApiSubjects.JsApiLeaderStepDown || + subject == JsApiSubjects.JsApiRemoveServer || + subject.StartsWith(JsApiAccountPrefix, StringComparison.Ordinal)) + { + return; + } + + if (_state.Server is not NatsServer server) + return; + + var route = server.MatchJetStreamApiSubscription(subject); + var (header, payload) = c.MsgParts(rawMessage); + var requestBody = Encoding.UTF8.GetString(payload); + + if (NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, header) is not { Length: > 0 }) + { + var systemAccount = server.SystemAccount(); + if (!ReferenceEquals(systemAccount, acc)) + return; + + if (subject != JsApiSubjects.JsApiAccountInfo) + { + if (c.Kind is ClientKind.Client or ClientKind.Leaf) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiSystemResponseType, + Error = JsApiErrors.NewJSNotEnabledForAccountError(), + }; + server.SendAPIErrResponse(null, acc, subject, reply, requestBody, server.JsonResponse(response)); + } + + return; + } + } + + if (route is null) + { + if (c.Kind is ClientKind.Client or ClientKind.Leaf) + { + var (_, requestAcc, _, _, _) = server.GetRequestInfo(c, rawMessage); + if (requestAcc is not null && !ReferenceEquals(requestAcc, server.SystemAccount())) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiSystemResponseType, + Error = JsApiErrors.NewJSBadRequestError(), + }; + server.SendAPIErrResponse(null, requestAcc, subject, reply, requestBody, server.JsonResponse(response)); + } + } + + return; + } + + Interlocked.Add(ref _state.ApiInflight, 1); + + var request = new JsApiRoutedRequest + { + Subscription = route, + Account = acc, + Subject = subject, + Reply = reply, + Message = rawMessage.ToArray(), + }; + + var (queued, dropped) = server.EnqueueJSApiRequest(_state, request); + if (!queued) + { + if (dropped > 0) + { + server.PublishAdvisory( + server.SystemAccount(), + JsApiSubjects.JsAdvisoryApiLimitReached, + new { Type = "io.nats.jetstream.advisory.v1.api_limit_reached", Dropped = dropped, Time = DateTime.UtcNow }); + } + + route.Handler(c, acc, subject, reply, rawMessage); + Interlocked.Add(ref _state.ApiInflight, -1); + } + } + internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0; internal void SetJetStreamStandAlone(bool isStandAlone) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs index 7437ae5..e6ea739 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -218,6 +218,38 @@ internal sealed partial class JsAccount } } + internal long TieredReservation(string tier, StreamConfig cfg) + { + long reservation = 0; + + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + var streamCfg = stream.Config; + if (streamCfg.MaxBytes <= 0 || string.Equals(streamCfg.Name, cfg.Name, StringComparison.Ordinal)) + continue; + + if (string.IsNullOrEmpty(tier)) + { + if (streamCfg.Storage == cfg.Storage) + reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes; + continue; + } + + if (streamCfg.Replicas == cfg.Replicas && JetStreamEngine.IsSameTier(streamCfg, cfg)) + reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes; + } + + return reservation; + } + finally + { + Lock.ExitReadLock(); + } + } + internal (ulong Mem, ulong Store) StorageTotals() { UsageLock.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs new file mode 100644 index 0000000..680dfd2 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -0,0 +1,430 @@ +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private readonly Lock _jsApiSubsLock = new(); + private readonly List _jsApiSubscriptions = []; + private IpQueue? _jsApiRoutedReqs; + private IpQueue? _delayedApiResponses; + + internal void ProcessJSAPIRoutedRequests() + { + var queue = _jsApiRoutedReqs; + if (queue is null) + return; + + var client = CreateInternalJetStreamClient(); + var token = _quitCts.Token; + + while (!token.IsCancellationRequested) + { + try + { + if (!queue.Ch.WaitToReadAsync(token).AsTask().GetAwaiter().GetResult()) + break; + + while (true) + { + var (request, ok) = queue.PopOne(); + if (!ok) + break; + + var start = DateTime.UtcNow; + request.Subscription.Handler(client, request.Account, request.Subject, request.Reply, request.Message); + var elapsed = DateTime.UtcNow - start; + if (elapsed >= TimeSpan.FromSeconds(1)) + Warnf("Internal subscription on '{0}' took too long: {1}", request.Subject, elapsed); + + var js = GetJetStreamState(); + if (js is not null) + Interlocked.Add(ref js.ApiInflight, -1); + } + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + Warnf("JetStream routed API worker failed: {0}", ex.Message); + } + } + } + + internal Exception? SetJetStreamExportSubs() + { + var js = GetJetStreamState(); + if (js is null) + return new InvalidOperationException(JsApiErrors.NewJSNotEnabledError().Description ?? "jetstream not enabled"); + + var workers = Math.Min(Math.Max(Environment.ProcessorCount, 1), 16); + + _jsApiRoutedReqs = IpQueue.NewIPQueue("Routed JS API Requests"); + _delayedApiResponses = IpQueue.NewIPQueue("Delayed JS API Responses"); + Interlocked.Exchange(ref js.QueueLimit, JsApiSubjects.JsDefaultRequestQueueLimit); + + for (var i = 0; i < workers; i++) + StartGoRoutine(ProcessJSAPIRoutedRequests); + + StartGoRoutine(DelayedAPIResponder); + + lock (_jsApiSubsLock) + { + _jsApiSubscriptions.Clear(); + _jsApiSubscriptions.Add(new JetStreamApiSubscription + { + Subject = JsApiSubjects.JsApiAccountInfo, + Handler = JsAccountInfoRequest, + }); + } + + var sys = SystemAccount(); + if (sys is null) + return null; + + var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + if (err is not null) + Warnf("Error setting up jetstream service exports: {0}", err.Message); + return err; + } + + internal JetStreamApiSubscription? MatchJetStreamApiSubscription(string subject) + { + lock (_jsApiSubsLock) + { + foreach (var sub in _jsApiSubscriptions) + { + if (JetStreamApi.SubjectMatches(sub.Subject, subject)) + return sub; + } + } + + return null; + } + + internal (bool Queued, long Dropped) EnqueueJSApiRequest(JetStream js, JsApiRoutedRequest request) + { + var queue = _jsApiRoutedReqs; + if (queue is null) + return (false, 0); + + var (pending, error) = queue.Push(request); + var limit = Interlocked.Read(ref js.QueueLimit); + if (limit <= 0) + limit = JsApiSubjects.JsDefaultRequestQueueLimit; + + if (error is null && pending < limit) + return (true, 0); + + RateLimitFormatWarnf("JetStream API queue limit reached, dropping {0} requests", pending); + var drained = queue.Drain(); + if (drained > 0) + Interlocked.Add(ref js.ApiInflight, -drained); + + return (false, drained); + } + + internal void SendAPIResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response) + { + acc.TrackAPI(); + if (!string.IsNullOrWhiteSpace(reply)) + _ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false); + + SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response); + } + + internal void SendAPIErrResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response) + { + acc.TrackAPIErr(); + if (!string.IsNullOrWhiteSpace(reply)) + _ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false); + + SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response); + } + + internal void DelayedAPIResponder() + { + var queue = _delayedApiResponses; + if (queue is null) + return; + + var token = _quitCts.Token; + DelayedApiResponse? head = null; + DelayedApiResponse? tail = null; + + while (!token.IsCancellationRequested) + { + try + { + while (true) + { + var (entry, ok) = queue.PopOne(); + if (!ok) + break; + JetStreamApi.AddDelayedResponse(ref head, ref tail, entry); + } + + var now = DateTime.UtcNow; + if (head is not null && head.DeadlineUtc <= now) + { + var entry = head; + head = head.Next; + if (head is null) + tail = null; + + if (entry.RawResponse) + _ = SendInternalAccountMsgWithReply(entry.Account, entry.Subject, string.Empty, entry.Header, entry.Response, false); + else + SendAPIErrResponse(entry.ClientInfo, entry.Account, entry.Subject, entry.Reply, entry.Request, entry.Response); + + continue; + } + + var wait = head is null + ? TimeSpan.FromSeconds(1) + : head.DeadlineUtc - now; + + if (wait < TimeSpan.Zero) + wait = TimeSpan.Zero; + + _ = Task.WhenAny( + queue.Ch.WaitToReadAsync(token).AsTask(), + Task.Delay(wait, token)).GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + Warnf("Delayed JetStream API responder failed: {0}", ex.Message); + } + } + } + + internal void SendDelayedAPIErrResponse( + ClientInfo? ci, + Account acc, + string subject, + string reply, + string request, + string response, + TimeSpan duration) + { + _delayedApiResponses?.Push(new DelayedApiResponse + { + ClientInfo = ci, + Account = acc, + Subject = subject, + Reply = reply, + Request = request, + Response = response, + DeadlineUtc = DateTime.UtcNow.Add(duration), + RawResponse = false, + }); + } + + internal void SendDelayedErrResponse(Account acc, string subject, byte[]? header, string response, TimeSpan duration) + { + _delayedApiResponses?.Push(new DelayedApiResponse + { + Account = acc, + Subject = subject, + Reply = string.Empty, + Header = header, + Response = response, + DeadlineUtc = DateTime.UtcNow.Add(duration), + RawResponse = true, + }); + } + + internal (ClientInfo? ClientInfo, Account? Account, byte[] Header, byte[] Message, Exception? Error) GetRequestInfo(ClientConnection c, byte[] raw) + { + var (hdr, msg) = c.MsgParts(raw); + + ClientInfo? clientInfo = null; + var clientInfoBytes = NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, hdr); + if (clientInfoBytes is { Length: > 0 }) + { + try + { + clientInfo = JsonSerializer.Deserialize(clientInfoBytes); + } + catch (Exception ex) + { + return (null, null, hdr, msg, ex); + } + } + + Account? acc = null; + var serviceAccount = clientInfo?.ServiceAccount(); + if (!string.IsNullOrWhiteSpace(serviceAccount)) + (acc, _) = LookupAccount(serviceAccount); + + if (acc is null) + { + acc = c.Account() as Account; + acc ??= SystemAccount(); + } + + if (acc is null) + return (clientInfo, null, hdr, msg, ServerErrors.ErrMissingAccount); + + return (clientInfo, acc, hdr, msg, null); + } + + internal Exception? UnmarshalRequest(ClientConnection c, Account acc, string subject, byte[] message, out T? value) + { + var strictMode = JetStreamConfig()?.Strict ?? true; + value = JetStreamApi.DeserializeStrict(message, strictMode); + if (value is not null) + return null; + + var err = new InvalidOperationException("unable to deserialize JetStream API request"); + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, err.Message); + return err; + } + + internal Exception? UnmarshalRequest(ClientConnection c, Account acc, string subject, byte[] message, object destination) + { + var strictMode = JetStreamConfig()?.Strict ?? true; + object? parsed; + + try + { + parsed = JsonSerializer.Deserialize(message, destination.GetType(), new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true, + UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow, + }); + } + catch (Exception ex) when (!strictMode) + { + try + { + parsed = JsonSerializer.Deserialize(message, destination.GetType()); + } + catch + { + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message); + return ex; + } + } + catch (Exception ex) + { + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message); + return ex; + } + + if (parsed is null) + return new InvalidOperationException("empty JetStream API request body"); + + foreach (var property in destination.GetType().GetProperties()) + { + if (!property.CanRead || !property.CanWrite) + continue; + property.SetValue(destination, property.GetValue(parsed)); + } + + return null; + } + + internal void JsAccountInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + if (!JetStreamEnabled()) + return; + + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + { + Warnf("Malformed JetStream API Request: {0}", Encoding.UTF8.GetString(msg)); + return; + } + + var response = new JsApiAccountInfoResponse + { + Type = JsApiSubjects.JsApiAccountInfoResponseType, + }; + + var requiredApiLevel = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr); + var reqApiHeader = requiredApiLevel is null ? null : Encoding.ASCII.GetString(requiredApiLevel); + if (JetStreamVersioning.ErrorOnRequiredApiLevel(reqApiHeader)) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (!shouldError) + return; + + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var stats = requestAcc.JetStreamUsage(); + response.Memory = stats.Memory; + response.Store = stats.Store; + response.ReservedMemory = stats.ReservedMemory; + response.ReservedStore = stats.ReservedStore; + response.Streams = stats.Streams; + response.Consumers = stats.Consumers; + response.Limits = stats.Limits; + response.Domain = stats.Domain; + response.Api = stats.Api; + response.Tiers = stats.Tiers; + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal static string StreamNameFromSubject(string subject) => JetStreamApi.StreamNameFromSubject(subject); + + internal static string ConsumerNameFromSubject(string subject) => JetStreamApi.ConsumerNameFromSubject(subject); + + internal string JsonResponse(object response) + { + try + { + return JsonSerializer.Serialize(response); + } + catch (Exception ex) + { + Warnf("Problem marshaling JSON for JetStream API: {0}", ex.Message); + return string.Empty; + } + } + + internal void SendJetStreamAPIAuditAdvisory(ClientInfo? ci, Account acc, string subject, string request, string response) + { + _ = ci; + _ = acc; + _ = subject; + _ = request; + _ = response; + } + + private Exception? SendInternalAccountMsgWithReply(Account account, string subject, string reply, byte[]? header, string response, bool trackApi) + { + _ = trackApi; + + try + { + var sendQueue = account.GetSendQueue() ?? NewSendQueue(account); + SendQueue.Send(sendQueue, subject, reply, header ?? [], Encoding.UTF8.GetBytes(response)); + return null; + } + catch (Exception ex) + { + return ex; + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs index f6ee4db..9a5f983 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -268,11 +268,7 @@ public sealed partial class NatsServer internal void SetupJetStreamExports() { - var sys = SystemAccount(); - if (sys == null) - return; - - var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + var err = SetJetStreamExportSubs(); if (err != null) Warnf("Error setting up jetstream service exports: {0}", err.Message); } diff --git a/porting.db b/porting.db index 7accf8f75e740346bf2eac5bb45849e2f10249ba..f8dd1f1e27d0ffe55ab08c18d94be1efe0da6abf 100644 GIT binary patch delta 4544 zcmb`JTWl0n7{_;JXD?^(rLC0Swgs%vZri=I*j=iCNLzZLgNRx#rQ6+EsVFVoEoi)y zMl%?ziA3U;=!6t7fd?%ug$|ceAR0|1BE-Z8HIb?$L=%I)=z|Y_yR)74&^7U4GWk9H zbH1GKa=!DOIe1Xg9~_bN?-m$N>U73^hfCHlC&#9bo0 z43{Z2B~Sgr{Gw-dZ^QAs%$tMM;w4I5q|^mUou`ycsYyzmqtsbSP1pzLM|y@@9H-P6 zrA||7lu{p4>MEtKP-@z)mskAC{E*9yvb%J^-)737tJHM_){e3Jp^ITXWzM7j=ab)J z*pm!r&u34--{)M*;bIQ!l6&;*I2m--GkH-VCiM3UQNvYdy{=AGS5TF!Y8q9csxG5) zsA?)1x2i7T+Gnb|h-y+*7f_v2)p_S&z3u~5%DDKpswPprrmAzO`c!omRkx}pP(7)t zGpHI=HI8a7r7=`;odWLWDR*jJn9myh0J z8!bj$=##>Se*2z9^azI-5F=uefBW7%-2Eu@gz6y9ke?elF)>E)_cvQu$6vXZmHL@QZA8{BOr z9chDnw;lGhk>#|3)JD>U@4IexfOpek%ktZT=5qYb~+eF?j@(z)Aio8qYvqXN8$Y+auj>zYV`~xDtSmg6Weu>EEi+q8| z7m9q*Bwzf=nRFD%Y!1&e}INC{GkEJcGUOp-1+o$;M=B5xQi)U{)rc4IA$}x) z)F8FUD&%3L4hbS5WHnNcJc2xmtU=Zy>yXEg^~mGM2BZOLM4FI|NHfxcv?6WDCZrwV zk8dib{KGZ=-`s8c0J@{&AN6POY<@ai;^>Uiy>mrfffxU^U^}YQd4^i za>|)`_GZHjAA3Nnh7L*3C79DyI8J# z?XTDxbD~= z<1@Lc)H#rvhIOf)(5h$w>A6h)FTx+6;$_U_zZMRk5H!0L0jVdG{|o=^KLA6UwVQ%L zX}98^evi&8{*&*~f~N7;X*w4|(z8kgblb|x91i^~;2h2vMh!5>wL6HQ6w6eI$+_{g z;9>TsA7-t3F(?U{LO3@U!osugrMf|@p%s*(nIeGAePx# delta 1503 zcmY+>TTC2P9LMpw?aUt7ot+jC3OhS&!9oGc($*FWcB!>ocVW4-P|9^-5n|N|(Z;?Y zwg#Gjw?z3Th8jSNn3xLHcA_B&Mx)Wh)|wb2K0wo$DoN8cG5X+xziykUefVVa{h!^L zbIuM=O^UF66uBQ>7{HGbtcQ-?<80K9?r7 zN+D^#6tK$H_!*w&*ZExQ+B&}^c$FVGZ_jhjn8#z!#}Xqb)vh{g@j8E96!+KWA?n|4J}Yj$;`uG-aw`pT|O)Frz*P;+**qu#Nr4K;38D~IW$m1kEY>i%6cpzhyAy<4<$|Kxv5k-ii zN%xbfGlfoVQ*IZ1TFth`AFyp~-@PqqKf9BO zuVH)X)VMo)BE+23Tf@BZ*S}W7YH4%7tWVU43YPrwX<>-9(d{ZFmp%!x-1z8^5Nn~C zkGwh5A7(l6(b+KTzx!tMd#6f)T9zN557e?}ADk=1>R1jd`mY2i? z`lgN*;8*U6Wv4tUM;IhPBDlZ}DtJJHO|Th~AQ@8NAxMQZ*aGR00bbB8IpX`^0trv6 zI+<_DjD8Y~_zjasG`eVJ75j=vhC*jW*>Eco?=p4&=gi$b)<+fJa~l6hZ)sU?=Q?-S8;vfxS=+kHJ1Dfl_!J z_CpZL-~c=U2jNLL1m#cxl~4uMPy-5GRJv3)I~Mj{etU?D?f20VLhRgk@L1Qtjt;dOnyX>UiMEm PQ_-(|Db~v0eL?4ce?08!