diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index c0c8865..9560f66 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -127,6 +127,72 @@ public sealed partial class Account return (server, jsa, null); } + internal void TrackAPI() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa is null) + return; + + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.UsageApi++; + jsa.ApiTotal++; + jsa.SendClusterUsageUpdate(); + + if (jsa.Js is JetStream js) + Interlocked.Add(ref js.ApiTotal, 1); + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + } + + internal void TrackAPIErr() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa is null) + return; + + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.UsageApi++; + jsa.ApiTotal++; + jsa.UsageErr++; + jsa.ApiErrors++; + jsa.SendClusterUsageUpdate(); + + if (jsa.Js is JetStream js) + { + Interlocked.Add(ref js.ApiTotal, 1); + Interlocked.Add(ref js.ApiErrors, 1); + } + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + } + + internal (bool Enabled, bool ShouldError) CheckJetStream() + { + _mu.EnterReadLock(); + try + { + return (JetStream is not null, _nleafs + _nrleafs == 0); + } + finally + { + _mu.ExitReadLock(); + } + } + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs new file mode 100644 index 0000000..04cda3f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs @@ -0,0 +1,160 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal delegate void JetStreamApiHandler(ClientConnection c, Account acc, string subject, string reply, byte[] message); + +internal sealed class JetStreamApiSubscription +{ + public required string Subject { get; init; } + public required JetStreamApiHandler Handler { get; init; } +} + +internal sealed class JsApiRoutedRequest +{ + public required JetStreamApiSubscription Subscription { get; init; } + public required Account Account { get; init; } + public required string Subject { get; init; } + public required string Reply { get; init; } + public required byte[] Message { get; init; } +} + +internal sealed class DelayedApiResponse +{ + public ClientInfo? ClientInfo { get; init; } + public required Account Account { get; init; } + public required string Subject { get; init; } + public required string Reply { get; init; } + public string Request { get; init; } = string.Empty; + public byte[]? Header { get; init; } + public required string Response { get; init; } + public DateTime DeadlineUtc { get; init; } + public bool RawResponse { get; init; } + public DelayedApiResponse? Next { get; set; } +} + +internal static class JetStreamApi +{ + internal static readonly TimeSpan ErrorResponseDelay = TimeSpan.FromMilliseconds(500); + + internal static Dictionary GenerateJSMappingTable(string domain) + { + var mappings = new Dictionary(StringComparer.Ordinal) + { + ["INFO"] = JsApiSubjects.JsApiAccountInfo, + ["STREAM.>"] = "$JS.API.STREAM.>", + ["CONSUMER.>"] = "$JS.API.CONSUMER.>", + ["DIRECT.>"] = "$JS.API.DIRECT.>", + ["META.>"] = "$JS.API.META.>", + ["SERVER.>"] = "$JS.API.SERVER.>", + ["ACCOUNT.>"] = "$JS.API.ACCOUNT.>", + ["$KV.>"] = "$KV.>", + ["$OBJ.>"] = "$OBJ.>", + }; + + var table = new Dictionary(StringComparer.Ordinal); + foreach (var (suffix, target) in mappings) + table[$"$JS.{domain}.API.{suffix}"] = target; + + return table; + } + + internal static void AddDelayedResponse(ref DelayedApiResponse? head, ref DelayedApiResponse? tail, DelayedApiResponse response) + { + if (head is null) + { + head = response; + tail = response; + return; + } + + if (tail is not null && response.DeadlineUtc >= tail.DeadlineUtc) + { + tail.Next = response; + tail = response; + return; + } + + DelayedApiResponse? previous = null; + for (var current = head; current is not null; current = current.Next) + { + if (response.DeadlineUtc < current.DeadlineUtc) + { + response.Next = current; + if (previous is null) + head = response; + else + previous.Next = response; + return; + } + + previous = current; + } + + if (tail is not null) + tail.Next = response; + tail = response; + } + + internal static string StreamNameFromSubject(string subject) => + SubscriptionIndex.TokenAt(subject, 5); + + internal static string ConsumerNameFromSubject(string subject) => + SubscriptionIndex.TokenAt(subject, 6); + + internal static bool IsJSONObjectOrArray(ReadOnlySpan payload) + { + for (var i = 0; i < payload.Length; i++) + { + var ch = payload[i]; + if (ch is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + continue; + + return ch is (byte)'{' or (byte)'['; + } + + return false; + } + + internal static bool SubjectMatches(string pattern, string subject) + { + var p = pattern.Split('.', StringSplitOptions.None); + var s = subject.Split('.', StringSplitOptions.None); + + for (var i = 0; i < p.Length; i++) + { + if (i >= s.Length) + return p[i] == ">"; + + var token = p[i]; + if (token == ">") + return true; + if (token == "*") + continue; + if (!string.Equals(token, s[i], StringComparison.Ordinal)) + return false; + } + + return p.Length == s.Length; + } + + internal static T? DeserializeStrict(byte[] payload, bool strictMode) + { + var strictOptions = new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true, + UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow, + }; + + try + { + return JsonSerializer.Deserialize(payload, strictOptions); + } + catch when (!strictMode) + { + return JsonSerializer.Deserialize(payload); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 522bc08..38aae48 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1,3 +1,6 @@ +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + namespace ZB.MOM.NatsNet.Server; internal sealed class JetStreamEngine(JetStream state) @@ -7,6 +10,7 @@ internal sealed class JetStreamEngine(JetStream state) private const string JsWillExtend = "will_extend"; private const string JsNoExtend = "no_extend"; private const string JsDomainApiTemplate = "$JS.{0}.API.>"; + private const string JsApiAccountPrefix = "$JS.API.ACCOUNT."; internal void SetStarted() { @@ -21,6 +25,91 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal void ApiDispatch(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + // These meta/system directives are handled elsewhere. + if (subject == JsApiSubjects.JsApiLeaderStepDown || + subject == JsApiSubjects.JsApiRemoveServer || + subject.StartsWith(JsApiAccountPrefix, StringComparison.Ordinal)) + { + return; + } + + if (_state.Server is not NatsServer server) + return; + + var route = server.MatchJetStreamApiSubscription(subject); + var (header, payload) = c.MsgParts(rawMessage); + var requestBody = Encoding.UTF8.GetString(payload); + + if (NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, header) is not { Length: > 0 }) + { + var systemAccount = server.SystemAccount(); + if (!ReferenceEquals(systemAccount, acc)) + return; + + if (subject != JsApiSubjects.JsApiAccountInfo) + { + if (c.Kind is ClientKind.Client or ClientKind.Leaf) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiSystemResponseType, + Error = JsApiErrors.NewJSNotEnabledForAccountError(), + }; + server.SendAPIErrResponse(null, acc, subject, reply, requestBody, server.JsonResponse(response)); + } + + return; + } + } + + if (route is null) + { + if (c.Kind is ClientKind.Client or ClientKind.Leaf) + { + var (_, requestAcc, _, _, _) = server.GetRequestInfo(c, rawMessage); + if (requestAcc is not null && !ReferenceEquals(requestAcc, server.SystemAccount())) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiSystemResponseType, + Error = JsApiErrors.NewJSBadRequestError(), + }; + server.SendAPIErrResponse(null, requestAcc, subject, reply, requestBody, server.JsonResponse(response)); + } + } + + return; + } + + Interlocked.Add(ref _state.ApiInflight, 1); + + var request = new JsApiRoutedRequest + { + Subscription = route, + Account = acc, + Subject = subject, + Reply = reply, + Message = rawMessage.ToArray(), + }; + + var (queued, dropped) = server.EnqueueJSApiRequest(_state, request); + if (!queued) + { + if (dropped > 0) + { + server.PublishAdvisory( + server.SystemAccount(), + JsApiSubjects.JsAdvisoryApiLimitReached, + new { Type = "io.nats.jetstream.advisory.v1.api_limit_reached", Dropped = dropped, Time = DateTime.UtcNow }); + } + + route.Handler(c, acc, subject, reply, rawMessage); + Interlocked.Add(ref _state.ApiInflight, -1); + } + } + internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0; internal void SetJetStreamStandAlone(bool isStandAlone) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs index 7437ae5..e6ea739 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -218,6 +218,38 @@ internal sealed partial class JsAccount } } + internal long TieredReservation(string tier, StreamConfig cfg) + { + long reservation = 0; + + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + var streamCfg = stream.Config; + if (streamCfg.MaxBytes <= 0 || string.Equals(streamCfg.Name, cfg.Name, StringComparison.Ordinal)) + continue; + + if (string.IsNullOrEmpty(tier)) + { + if (streamCfg.Storage == cfg.Storage) + reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes; + continue; + } + + if (streamCfg.Replicas == cfg.Replicas && JetStreamEngine.IsSameTier(streamCfg, cfg)) + reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes; + } + + return reservation; + } + finally + { + Lock.ExitReadLock(); + } + } + internal (ulong Mem, ulong Store) StorageTotals() { UsageLock.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs new file mode 100644 index 0000000..680dfd2 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -0,0 +1,430 @@ +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private readonly Lock _jsApiSubsLock = new(); + private readonly List _jsApiSubscriptions = []; + private IpQueue? _jsApiRoutedReqs; + private IpQueue? _delayedApiResponses; + + internal void ProcessJSAPIRoutedRequests() + { + var queue = _jsApiRoutedReqs; + if (queue is null) + return; + + var client = CreateInternalJetStreamClient(); + var token = _quitCts.Token; + + while (!token.IsCancellationRequested) + { + try + { + if (!queue.Ch.WaitToReadAsync(token).AsTask().GetAwaiter().GetResult()) + break; + + while (true) + { + var (request, ok) = queue.PopOne(); + if (!ok) + break; + + var start = DateTime.UtcNow; + request.Subscription.Handler(client, request.Account, request.Subject, request.Reply, request.Message); + var elapsed = DateTime.UtcNow - start; + if (elapsed >= TimeSpan.FromSeconds(1)) + Warnf("Internal subscription on '{0}' took too long: {1}", request.Subject, elapsed); + + var js = GetJetStreamState(); + if (js is not null) + Interlocked.Add(ref js.ApiInflight, -1); + } + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + Warnf("JetStream routed API worker failed: {0}", ex.Message); + } + } + } + + internal Exception? SetJetStreamExportSubs() + { + var js = GetJetStreamState(); + if (js is null) + return new InvalidOperationException(JsApiErrors.NewJSNotEnabledError().Description ?? "jetstream not enabled"); + + var workers = Math.Min(Math.Max(Environment.ProcessorCount, 1), 16); + + _jsApiRoutedReqs = IpQueue.NewIPQueue("Routed JS API Requests"); + _delayedApiResponses = IpQueue.NewIPQueue("Delayed JS API Responses"); + Interlocked.Exchange(ref js.QueueLimit, JsApiSubjects.JsDefaultRequestQueueLimit); + + for (var i = 0; i < workers; i++) + StartGoRoutine(ProcessJSAPIRoutedRequests); + + StartGoRoutine(DelayedAPIResponder); + + lock (_jsApiSubsLock) + { + _jsApiSubscriptions.Clear(); + _jsApiSubscriptions.Add(new JetStreamApiSubscription + { + Subject = JsApiSubjects.JsApiAccountInfo, + Handler = JsAccountInfoRequest, + }); + } + + var sys = SystemAccount(); + if (sys is null) + return null; + + var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + if (err is not null) + Warnf("Error setting up jetstream service exports: {0}", err.Message); + return err; + } + + internal JetStreamApiSubscription? MatchJetStreamApiSubscription(string subject) + { + lock (_jsApiSubsLock) + { + foreach (var sub in _jsApiSubscriptions) + { + if (JetStreamApi.SubjectMatches(sub.Subject, subject)) + return sub; + } + } + + return null; + } + + internal (bool Queued, long Dropped) EnqueueJSApiRequest(JetStream js, JsApiRoutedRequest request) + { + var queue = _jsApiRoutedReqs; + if (queue is null) + return (false, 0); + + var (pending, error) = queue.Push(request); + var limit = Interlocked.Read(ref js.QueueLimit); + if (limit <= 0) + limit = JsApiSubjects.JsDefaultRequestQueueLimit; + + if (error is null && pending < limit) + return (true, 0); + + RateLimitFormatWarnf("JetStream API queue limit reached, dropping {0} requests", pending); + var drained = queue.Drain(); + if (drained > 0) + Interlocked.Add(ref js.ApiInflight, -drained); + + return (false, drained); + } + + internal void SendAPIResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response) + { + acc.TrackAPI(); + if (!string.IsNullOrWhiteSpace(reply)) + _ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false); + + SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response); + } + + internal void SendAPIErrResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response) + { + acc.TrackAPIErr(); + if (!string.IsNullOrWhiteSpace(reply)) + _ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false); + + SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response); + } + + internal void DelayedAPIResponder() + { + var queue = _delayedApiResponses; + if (queue is null) + return; + + var token = _quitCts.Token; + DelayedApiResponse? head = null; + DelayedApiResponse? tail = null; + + while (!token.IsCancellationRequested) + { + try + { + while (true) + { + var (entry, ok) = queue.PopOne(); + if (!ok) + break; + JetStreamApi.AddDelayedResponse(ref head, ref tail, entry); + } + + var now = DateTime.UtcNow; + if (head is not null && head.DeadlineUtc <= now) + { + var entry = head; + head = head.Next; + if (head is null) + tail = null; + + if (entry.RawResponse) + _ = SendInternalAccountMsgWithReply(entry.Account, entry.Subject, string.Empty, entry.Header, entry.Response, false); + else + SendAPIErrResponse(entry.ClientInfo, entry.Account, entry.Subject, entry.Reply, entry.Request, entry.Response); + + continue; + } + + var wait = head is null + ? TimeSpan.FromSeconds(1) + : head.DeadlineUtc - now; + + if (wait < TimeSpan.Zero) + wait = TimeSpan.Zero; + + _ = Task.WhenAny( + queue.Ch.WaitToReadAsync(token).AsTask(), + Task.Delay(wait, token)).GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + Warnf("Delayed JetStream API responder failed: {0}", ex.Message); + } + } + } + + internal void SendDelayedAPIErrResponse( + ClientInfo? ci, + Account acc, + string subject, + string reply, + string request, + string response, + TimeSpan duration) + { + _delayedApiResponses?.Push(new DelayedApiResponse + { + ClientInfo = ci, + Account = acc, + Subject = subject, + Reply = reply, + Request = request, + Response = response, + DeadlineUtc = DateTime.UtcNow.Add(duration), + RawResponse = false, + }); + } + + internal void SendDelayedErrResponse(Account acc, string subject, byte[]? header, string response, TimeSpan duration) + { + _delayedApiResponses?.Push(new DelayedApiResponse + { + Account = acc, + Subject = subject, + Reply = string.Empty, + Header = header, + Response = response, + DeadlineUtc = DateTime.UtcNow.Add(duration), + RawResponse = true, + }); + } + + internal (ClientInfo? ClientInfo, Account? Account, byte[] Header, byte[] Message, Exception? Error) GetRequestInfo(ClientConnection c, byte[] raw) + { + var (hdr, msg) = c.MsgParts(raw); + + ClientInfo? clientInfo = null; + var clientInfoBytes = NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, hdr); + if (clientInfoBytes is { Length: > 0 }) + { + try + { + clientInfo = JsonSerializer.Deserialize(clientInfoBytes); + } + catch (Exception ex) + { + return (null, null, hdr, msg, ex); + } + } + + Account? acc = null; + var serviceAccount = clientInfo?.ServiceAccount(); + if (!string.IsNullOrWhiteSpace(serviceAccount)) + (acc, _) = LookupAccount(serviceAccount); + + if (acc is null) + { + acc = c.Account() as Account; + acc ??= SystemAccount(); + } + + if (acc is null) + return (clientInfo, null, hdr, msg, ServerErrors.ErrMissingAccount); + + return (clientInfo, acc, hdr, msg, null); + } + + internal Exception? UnmarshalRequest(ClientConnection c, Account acc, string subject, byte[] message, out T? value) + { + var strictMode = JetStreamConfig()?.Strict ?? true; + value = JetStreamApi.DeserializeStrict(message, strictMode); + if (value is not null) + return null; + + var err = new InvalidOperationException("unable to deserialize JetStream API request"); + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, err.Message); + return err; + } + + internal Exception? UnmarshalRequest(ClientConnection c, Account acc, string subject, byte[] message, object destination) + { + var strictMode = JetStreamConfig()?.Strict ?? true; + object? parsed; + + try + { + parsed = JsonSerializer.Deserialize(message, destination.GetType(), new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true, + UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow, + }); + } + catch (Exception ex) when (!strictMode) + { + try + { + parsed = JsonSerializer.Deserialize(message, destination.GetType()); + } + catch + { + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message); + return ex; + } + } + catch (Exception ex) + { + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message); + return ex; + } + + if (parsed is null) + return new InvalidOperationException("empty JetStream API request body"); + + foreach (var property in destination.GetType().GetProperties()) + { + if (!property.CanRead || !property.CanWrite) + continue; + property.SetValue(destination, property.GetValue(parsed)); + } + + return null; + } + + internal void JsAccountInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + if (!JetStreamEnabled()) + return; + + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + { + Warnf("Malformed JetStream API Request: {0}", Encoding.UTF8.GetString(msg)); + return; + } + + var response = new JsApiAccountInfoResponse + { + Type = JsApiSubjects.JsApiAccountInfoResponseType, + }; + + var requiredApiLevel = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr); + var reqApiHeader = requiredApiLevel is null ? null : Encoding.ASCII.GetString(requiredApiLevel); + if (JetStreamVersioning.ErrorOnRequiredApiLevel(reqApiHeader)) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (!shouldError) + return; + + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var stats = requestAcc.JetStreamUsage(); + response.Memory = stats.Memory; + response.Store = stats.Store; + response.ReservedMemory = stats.ReservedMemory; + response.ReservedStore = stats.ReservedStore; + response.Streams = stats.Streams; + response.Consumers = stats.Consumers; + response.Limits = stats.Limits; + response.Domain = stats.Domain; + response.Api = stats.Api; + response.Tiers = stats.Tiers; + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal static string StreamNameFromSubject(string subject) => JetStreamApi.StreamNameFromSubject(subject); + + internal static string ConsumerNameFromSubject(string subject) => JetStreamApi.ConsumerNameFromSubject(subject); + + internal string JsonResponse(object response) + { + try + { + return JsonSerializer.Serialize(response); + } + catch (Exception ex) + { + Warnf("Problem marshaling JSON for JetStream API: {0}", ex.Message); + return string.Empty; + } + } + + internal void SendJetStreamAPIAuditAdvisory(ClientInfo? ci, Account acc, string subject, string request, string response) + { + _ = ci; + _ = acc; + _ = subject; + _ = request; + _ = response; + } + + private Exception? SendInternalAccountMsgWithReply(Account account, string subject, string reply, byte[]? header, string response, bool trackApi) + { + _ = trackApi; + + try + { + var sendQueue = account.GetSendQueue() ?? NewSendQueue(account); + SendQueue.Send(sendQueue, subject, reply, header ?? [], Encoding.UTF8.GetBytes(response)); + return null; + } + catch (Exception ex) + { + return ex; + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs index f6ee4db..9a5f983 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -268,11 +268,7 @@ public sealed partial class NatsServer internal void SetupJetStreamExports() { - var sys = SystemAccount(); - if (sys == null) - return; - - var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + var err = SetJetStreamExportSubs(); if (err != null) Warnf("Error setting up jetstream service exports: {0}", err.Message); } diff --git a/porting.db b/porting.db index 7accf8f..f8dd1f1 100644 Binary files a/porting.db and b/porting.db differ