From 2c4bebfb8d60c7fd9402504422cc02b3bbaddeb9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:14:26 -0500 Subject: [PATCH 1/6] chore(batch28): start jetstream api batch --- porting.db | Bin 6750208 -> 6750208 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/porting.db b/porting.db index 1ce78f22a679b4f5e0a7439d53119440d3f46d9d..7accf8f75e740346bf2eac5bb45849e2f10249ba 100644 GIT binary patch delta 464 zcmYk$J8Tkh0LStBbHD+|{m){RT0q-EMb1YnKC9IF0&9K0pC>AwkHoguRwoyRuXb`i zV>Icao1IJ;42gq@Nds)F6YHZi~;n<=t|t!!gE zJJ`t(yVy;MVfHY>UiPt{101AGg+q*Tm@$rUlw%y{1SdJgY0hw#bDZY_7rDe`u5k6O zJusJS%BS*;@zMNgT4urw%U^OyF3N>QWzHU!;A5EQhCuhs{kAinjp0tI782n!2SjsM8x zTm}bR92hznk`5-MBqk;#OeTz)xVW*wU}EBh5X10GPR_Ff{$4`JDA;Ls3 zFo|Lzu{l5$)g0syhp8dP5o(F!kl-lCI8Gh)G;o4OnrJ4;Nm96+;xsLsp_Mk;>EJA# zba9UJbkoBHE|R8~OZ0J>D_rFo*U6CO1~<9IZTcBtkUQLEh+*zA!hIg_kVib`2~T;( z^JUe#7<+Ghv$t&5uC{~nkKB|Sa!r1gpUT-q)hoqHk&%847<4_lK53ox$bcU|=aw<6rQ%=oNj~x8fVI7sOvXEx*W& z^;`RHy|Y|xHu^O>Zf=XW!ZoLi?P7hMXx4Lavz%U4o2HRVm|C(&2;mgxDk>G)J1v$f uDrw{$Q%hxb$Db9|_J546s-}5&U3p%3a-Nr-QO}rX+>`fAl->2ojgEiw1)fg; From bf115b116e0edef7b294e7dc185b2ee783c4b723 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:24:10 -0500 Subject: [PATCH 2/6] feat(batch28): implement jetstream api dispatch and account request core --- .../Accounts/Account.JetStream.cs | 66 +++ .../JetStream/JetStreamApi.cs | 160 +++++++ .../JetStream/JetStreamEngine.cs | 89 ++++ .../JetStream/JsAccount.Core.cs | 32 ++ .../NatsServer.JetStreamApi.cs | 430 ++++++++++++++++++ .../NatsServer.JetStreamCore.cs | 6 +- porting.db | Bin 6750208 -> 6754304 bytes 7 files changed, 778 insertions(+), 5 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index c0c8865..9560f66 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -127,6 +127,72 @@ public sealed partial class Account return (server, jsa, null); } + internal void TrackAPI() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa is null) + return; + + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.UsageApi++; + jsa.ApiTotal++; + jsa.SendClusterUsageUpdate(); + + if (jsa.Js is JetStream js) + Interlocked.Add(ref js.ApiTotal, 1); + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + } + + internal void TrackAPIErr() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa is null) + return; + + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.UsageApi++; + jsa.ApiTotal++; + jsa.UsageErr++; + jsa.ApiErrors++; + jsa.SendClusterUsageUpdate(); + + if (jsa.Js is JetStream js) + { + Interlocked.Add(ref js.ApiTotal, 1); + Interlocked.Add(ref js.ApiErrors, 1); + } + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + } + + internal (bool Enabled, bool ShouldError) CheckJetStream() + { + _mu.EnterReadLock(); + try + { + return (JetStream is not null, _nleafs + _nrleafs == 0); + } + finally + { + _mu.ExitReadLock(); + } + } + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs new file mode 100644 index 0000000..04cda3f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs @@ -0,0 +1,160 @@ +using System.Text.Json; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal delegate void JetStreamApiHandler(ClientConnection c, Account acc, string subject, string reply, byte[] message); + +internal sealed class JetStreamApiSubscription +{ + public required string Subject { get; init; } + public required JetStreamApiHandler Handler { get; init; } +} + +internal sealed class JsApiRoutedRequest +{ + public required JetStreamApiSubscription Subscription { get; init; } + public required Account Account { get; init; } + public required string Subject { get; init; } + public required string Reply { get; init; } + public required byte[] Message { get; init; } +} + +internal sealed class DelayedApiResponse +{ + public ClientInfo? ClientInfo { get; init; } + public required Account Account { get; init; } + public required string Subject { get; init; } + public required string Reply { get; init; } + public string Request { get; init; } = string.Empty; + public byte[]? Header { get; init; } + public required string Response { get; init; } + public DateTime DeadlineUtc { get; init; } + public bool RawResponse { get; init; } + public DelayedApiResponse? Next { get; set; } +} + +internal static class JetStreamApi +{ + internal static readonly TimeSpan ErrorResponseDelay = TimeSpan.FromMilliseconds(500); + + internal static Dictionary GenerateJSMappingTable(string domain) + { + var mappings = new Dictionary(StringComparer.Ordinal) + { + ["INFO"] = JsApiSubjects.JsApiAccountInfo, + ["STREAM.>"] = "$JS.API.STREAM.>", + ["CONSUMER.>"] = "$JS.API.CONSUMER.>", + ["DIRECT.>"] = "$JS.API.DIRECT.>", + ["META.>"] = "$JS.API.META.>", + ["SERVER.>"] = "$JS.API.SERVER.>", + ["ACCOUNT.>"] = "$JS.API.ACCOUNT.>", + ["$KV.>"] = "$KV.>", + ["$OBJ.>"] = "$OBJ.>", + }; + + var table = new Dictionary(StringComparer.Ordinal); + foreach (var (suffix, target) in mappings) + table[$"$JS.{domain}.API.{suffix}"] = target; + + return table; + } + + internal static void AddDelayedResponse(ref DelayedApiResponse? head, ref DelayedApiResponse? tail, DelayedApiResponse response) + { + if (head is null) + { + head = response; + tail = response; + return; + } + + if (tail is not null && response.DeadlineUtc >= tail.DeadlineUtc) + { + tail.Next = response; + tail = response; + return; + } + + DelayedApiResponse? previous = null; + for (var current = head; current is not null; current = current.Next) + { + if (response.DeadlineUtc < current.DeadlineUtc) + { + response.Next = current; + if (previous is null) + head = response; + else + previous.Next = response; + return; + } + + previous = current; + } + + if (tail is not null) + tail.Next = response; + tail = response; + } + + internal static string StreamNameFromSubject(string subject) => + SubscriptionIndex.TokenAt(subject, 5); + + internal static string ConsumerNameFromSubject(string subject) => + SubscriptionIndex.TokenAt(subject, 6); + + internal static bool IsJSONObjectOrArray(ReadOnlySpan payload) + { + for (var i = 0; i < payload.Length; i++) + { + var ch = payload[i]; + if (ch is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + continue; + + return ch is (byte)'{' or (byte)'['; + } + + return false; + } + + internal static bool SubjectMatches(string pattern, string subject) + { + var p = pattern.Split('.', StringSplitOptions.None); + var s = subject.Split('.', StringSplitOptions.None); + + for (var i = 0; i < p.Length; i++) + { + if (i >= s.Length) + return p[i] == ">"; + + var token = p[i]; + if (token == ">") + return true; + if (token == "*") + continue; + if (!string.Equals(token, s[i], StringComparison.Ordinal)) + return false; + } + + return p.Length == s.Length; + } + + internal static T? DeserializeStrict(byte[] payload, bool strictMode) + { + var strictOptions = new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true, + UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow, + }; + + try + { + return JsonSerializer.Deserialize(payload, strictOptions); + } + catch when (!strictMode) + { + return JsonSerializer.Deserialize(payload); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 522bc08..38aae48 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1,3 +1,6 @@ +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + namespace ZB.MOM.NatsNet.Server; internal sealed class JetStreamEngine(JetStream state) @@ -7,6 +10,7 @@ internal sealed class JetStreamEngine(JetStream state) private const string JsWillExtend = "will_extend"; private const string JsNoExtend = "no_extend"; private const string JsDomainApiTemplate = "$JS.{0}.API.>"; + private const string JsApiAccountPrefix = "$JS.API.ACCOUNT."; internal void SetStarted() { @@ -21,6 +25,91 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal void ApiDispatch(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + // These meta/system directives are handled elsewhere. + if (subject == JsApiSubjects.JsApiLeaderStepDown || + subject == JsApiSubjects.JsApiRemoveServer || + subject.StartsWith(JsApiAccountPrefix, StringComparison.Ordinal)) + { + return; + } + + if (_state.Server is not NatsServer server) + return; + + var route = server.MatchJetStreamApiSubscription(subject); + var (header, payload) = c.MsgParts(rawMessage); + var requestBody = Encoding.UTF8.GetString(payload); + + if (NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, header) is not { Length: > 0 }) + { + var systemAccount = server.SystemAccount(); + if (!ReferenceEquals(systemAccount, acc)) + return; + + if (subject != JsApiSubjects.JsApiAccountInfo) + { + if (c.Kind is ClientKind.Client or ClientKind.Leaf) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiSystemResponseType, + Error = JsApiErrors.NewJSNotEnabledForAccountError(), + }; + server.SendAPIErrResponse(null, acc, subject, reply, requestBody, server.JsonResponse(response)); + } + + return; + } + } + + if (route is null) + { + if (c.Kind is ClientKind.Client or ClientKind.Leaf) + { + var (_, requestAcc, _, _, _) = server.GetRequestInfo(c, rawMessage); + if (requestAcc is not null && !ReferenceEquals(requestAcc, server.SystemAccount())) + { + var response = new ApiResponse + { + Type = JsApiSubjects.JsApiSystemResponseType, + Error = JsApiErrors.NewJSBadRequestError(), + }; + server.SendAPIErrResponse(null, requestAcc, subject, reply, requestBody, server.JsonResponse(response)); + } + } + + return; + } + + Interlocked.Add(ref _state.ApiInflight, 1); + + var request = new JsApiRoutedRequest + { + Subscription = route, + Account = acc, + Subject = subject, + Reply = reply, + Message = rawMessage.ToArray(), + }; + + var (queued, dropped) = server.EnqueueJSApiRequest(_state, request); + if (!queued) + { + if (dropped > 0) + { + server.PublishAdvisory( + server.SystemAccount(), + JsApiSubjects.JsAdvisoryApiLimitReached, + new { Type = "io.nats.jetstream.advisory.v1.api_limit_reached", Dropped = dropped, Time = DateTime.UtcNow }); + } + + route.Handler(c, acc, subject, reply, rawMessage); + Interlocked.Add(ref _state.ApiInflight, -1); + } + } + internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0; internal void SetJetStreamStandAlone(bool isStandAlone) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs index 7437ae5..e6ea739 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -218,6 +218,38 @@ internal sealed partial class JsAccount } } + internal long TieredReservation(string tier, StreamConfig cfg) + { + long reservation = 0; + + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + var streamCfg = stream.Config; + if (streamCfg.MaxBytes <= 0 || string.Equals(streamCfg.Name, cfg.Name, StringComparison.Ordinal)) + continue; + + if (string.IsNullOrEmpty(tier)) + { + if (streamCfg.Storage == cfg.Storage) + reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes; + continue; + } + + if (streamCfg.Replicas == cfg.Replicas && JetStreamEngine.IsSameTier(streamCfg, cfg)) + reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes; + } + + return reservation; + } + finally + { + Lock.ExitReadLock(); + } + } + internal (ulong Mem, ulong Store) StorageTotals() { UsageLock.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs new file mode 100644 index 0000000..680dfd2 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -0,0 +1,430 @@ +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private readonly Lock _jsApiSubsLock = new(); + private readonly List _jsApiSubscriptions = []; + private IpQueue? _jsApiRoutedReqs; + private IpQueue? _delayedApiResponses; + + internal void ProcessJSAPIRoutedRequests() + { + var queue = _jsApiRoutedReqs; + if (queue is null) + return; + + var client = CreateInternalJetStreamClient(); + var token = _quitCts.Token; + + while (!token.IsCancellationRequested) + { + try + { + if (!queue.Ch.WaitToReadAsync(token).AsTask().GetAwaiter().GetResult()) + break; + + while (true) + { + var (request, ok) = queue.PopOne(); + if (!ok) + break; + + var start = DateTime.UtcNow; + request.Subscription.Handler(client, request.Account, request.Subject, request.Reply, request.Message); + var elapsed = DateTime.UtcNow - start; + if (elapsed >= TimeSpan.FromSeconds(1)) + Warnf("Internal subscription on '{0}' took too long: {1}", request.Subject, elapsed); + + var js = GetJetStreamState(); + if (js is not null) + Interlocked.Add(ref js.ApiInflight, -1); + } + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + Warnf("JetStream routed API worker failed: {0}", ex.Message); + } + } + } + + internal Exception? SetJetStreamExportSubs() + { + var js = GetJetStreamState(); + if (js is null) + return new InvalidOperationException(JsApiErrors.NewJSNotEnabledError().Description ?? "jetstream not enabled"); + + var workers = Math.Min(Math.Max(Environment.ProcessorCount, 1), 16); + + _jsApiRoutedReqs = IpQueue.NewIPQueue("Routed JS API Requests"); + _delayedApiResponses = IpQueue.NewIPQueue("Delayed JS API Responses"); + Interlocked.Exchange(ref js.QueueLimit, JsApiSubjects.JsDefaultRequestQueueLimit); + + for (var i = 0; i < workers; i++) + StartGoRoutine(ProcessJSAPIRoutedRequests); + + StartGoRoutine(DelayedAPIResponder); + + lock (_jsApiSubsLock) + { + _jsApiSubscriptions.Clear(); + _jsApiSubscriptions.Add(new JetStreamApiSubscription + { + Subject = JsApiSubjects.JsApiAccountInfo, + Handler = JsAccountInfoRequest, + }); + } + + var sys = SystemAccount(); + if (sys is null) + return null; + + var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + if (err is not null) + Warnf("Error setting up jetstream service exports: {0}", err.Message); + return err; + } + + internal JetStreamApiSubscription? MatchJetStreamApiSubscription(string subject) + { + lock (_jsApiSubsLock) + { + foreach (var sub in _jsApiSubscriptions) + { + if (JetStreamApi.SubjectMatches(sub.Subject, subject)) + return sub; + } + } + + return null; + } + + internal (bool Queued, long Dropped) EnqueueJSApiRequest(JetStream js, JsApiRoutedRequest request) + { + var queue = _jsApiRoutedReqs; + if (queue is null) + return (false, 0); + + var (pending, error) = queue.Push(request); + var limit = Interlocked.Read(ref js.QueueLimit); + if (limit <= 0) + limit = JsApiSubjects.JsDefaultRequestQueueLimit; + + if (error is null && pending < limit) + return (true, 0); + + RateLimitFormatWarnf("JetStream API queue limit reached, dropping {0} requests", pending); + var drained = queue.Drain(); + if (drained > 0) + Interlocked.Add(ref js.ApiInflight, -drained); + + return (false, drained); + } + + internal void SendAPIResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response) + { + acc.TrackAPI(); + if (!string.IsNullOrWhiteSpace(reply)) + _ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false); + + SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response); + } + + internal void SendAPIErrResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response) + { + acc.TrackAPIErr(); + if (!string.IsNullOrWhiteSpace(reply)) + _ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false); + + SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response); + } + + internal void DelayedAPIResponder() + { + var queue = _delayedApiResponses; + if (queue is null) + return; + + var token = _quitCts.Token; + DelayedApiResponse? head = null; + DelayedApiResponse? tail = null; + + while (!token.IsCancellationRequested) + { + try + { + while (true) + { + var (entry, ok) = queue.PopOne(); + if (!ok) + break; + JetStreamApi.AddDelayedResponse(ref head, ref tail, entry); + } + + var now = DateTime.UtcNow; + if (head is not null && head.DeadlineUtc <= now) + { + var entry = head; + head = head.Next; + if (head is null) + tail = null; + + if (entry.RawResponse) + _ = SendInternalAccountMsgWithReply(entry.Account, entry.Subject, string.Empty, entry.Header, entry.Response, false); + else + SendAPIErrResponse(entry.ClientInfo, entry.Account, entry.Subject, entry.Reply, entry.Request, entry.Response); + + continue; + } + + var wait = head is null + ? TimeSpan.FromSeconds(1) + : head.DeadlineUtc - now; + + if (wait < TimeSpan.Zero) + wait = TimeSpan.Zero; + + _ = Task.WhenAny( + queue.Ch.WaitToReadAsync(token).AsTask(), + Task.Delay(wait, token)).GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + Warnf("Delayed JetStream API responder failed: {0}", ex.Message); + } + } + } + + internal void SendDelayedAPIErrResponse( + ClientInfo? ci, + Account acc, + string subject, + string reply, + string request, + string response, + TimeSpan duration) + { + _delayedApiResponses?.Push(new DelayedApiResponse + { + ClientInfo = ci, + Account = acc, + Subject = subject, + Reply = reply, + Request = request, + Response = response, + DeadlineUtc = DateTime.UtcNow.Add(duration), + RawResponse = false, + }); + } + + internal void SendDelayedErrResponse(Account acc, string subject, byte[]? header, string response, TimeSpan duration) + { + _delayedApiResponses?.Push(new DelayedApiResponse + { + Account = acc, + Subject = subject, + Reply = string.Empty, + Header = header, + Response = response, + DeadlineUtc = DateTime.UtcNow.Add(duration), + RawResponse = true, + }); + } + + internal (ClientInfo? ClientInfo, Account? Account, byte[] Header, byte[] Message, Exception? Error) GetRequestInfo(ClientConnection c, byte[] raw) + { + var (hdr, msg) = c.MsgParts(raw); + + ClientInfo? clientInfo = null; + var clientInfoBytes = NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, hdr); + if (clientInfoBytes is { Length: > 0 }) + { + try + { + clientInfo = JsonSerializer.Deserialize(clientInfoBytes); + } + catch (Exception ex) + { + return (null, null, hdr, msg, ex); + } + } + + Account? acc = null; + var serviceAccount = clientInfo?.ServiceAccount(); + if (!string.IsNullOrWhiteSpace(serviceAccount)) + (acc, _) = LookupAccount(serviceAccount); + + if (acc is null) + { + acc = c.Account() as Account; + acc ??= SystemAccount(); + } + + if (acc is null) + return (clientInfo, null, hdr, msg, ServerErrors.ErrMissingAccount); + + return (clientInfo, acc, hdr, msg, null); + } + + internal Exception? UnmarshalRequest(ClientConnection c, Account acc, string subject, byte[] message, out T? value) + { + var strictMode = JetStreamConfig()?.Strict ?? true; + value = JetStreamApi.DeserializeStrict(message, strictMode); + if (value is not null) + return null; + + var err = new InvalidOperationException("unable to deserialize JetStream API request"); + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, err.Message); + return err; + } + + internal Exception? UnmarshalRequest(ClientConnection c, Account acc, string subject, byte[] message, object destination) + { + var strictMode = JetStreamConfig()?.Strict ?? true; + object? parsed; + + try + { + parsed = JsonSerializer.Deserialize(message, destination.GetType(), new JsonSerializerOptions + { + PropertyNameCaseInsensitive = true, + UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow, + }); + } + catch (Exception ex) when (!strictMode) + { + try + { + parsed = JsonSerializer.Deserialize(message, destination.GetType()); + } + catch + { + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message); + return ex; + } + } + catch (Exception ex) + { + c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message); + return ex; + } + + if (parsed is null) + return new InvalidOperationException("empty JetStream API request body"); + + foreach (var property in destination.GetType().GetProperties()) + { + if (!property.CanRead || !property.CanWrite) + continue; + property.SetValue(destination, property.GetValue(parsed)); + } + + return null; + } + + internal void JsAccountInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + if (!JetStreamEnabled()) + return; + + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + { + Warnf("Malformed JetStream API Request: {0}", Encoding.UTF8.GetString(msg)); + return; + } + + var response = new JsApiAccountInfoResponse + { + Type = JsApiSubjects.JsApiAccountInfoResponseType, + }; + + var requiredApiLevel = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr); + var reqApiHeader = requiredApiLevel is null ? null : Encoding.ASCII.GetString(requiredApiLevel); + if (JetStreamVersioning.ErrorOnRequiredApiLevel(reqApiHeader)) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (!shouldError) + return; + + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var stats = requestAcc.JetStreamUsage(); + response.Memory = stats.Memory; + response.Store = stats.Store; + response.ReservedMemory = stats.ReservedMemory; + response.ReservedStore = stats.ReservedStore; + response.Streams = stats.Streams; + response.Consumers = stats.Consumers; + response.Limits = stats.Limits; + response.Domain = stats.Domain; + response.Api = stats.Api; + response.Tiers = stats.Tiers; + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal static string StreamNameFromSubject(string subject) => JetStreamApi.StreamNameFromSubject(subject); + + internal static string ConsumerNameFromSubject(string subject) => JetStreamApi.ConsumerNameFromSubject(subject); + + internal string JsonResponse(object response) + { + try + { + return JsonSerializer.Serialize(response); + } + catch (Exception ex) + { + Warnf("Problem marshaling JSON for JetStream API: {0}", ex.Message); + return string.Empty; + } + } + + internal void SendJetStreamAPIAuditAdvisory(ClientInfo? ci, Account acc, string subject, string request, string response) + { + _ = ci; + _ = acc; + _ = subject; + _ = request; + _ = response; + } + + private Exception? SendInternalAccountMsgWithReply(Account account, string subject, string reply, byte[]? header, string response, bool trackApi) + { + _ = trackApi; + + try + { + var sendQueue = account.GetSendQueue() ?? NewSendQueue(account); + SendQueue.Send(sendQueue, subject, reply, header ?? [], Encoding.UTF8.GetBytes(response)); + return null; + } + catch (Exception ex) + { + return ex; + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs index f6ee4db..9a5f983 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -268,11 +268,7 @@ public sealed partial class NatsServer internal void SetupJetStreamExports() { - var sys = SystemAccount(); - if (sys == null) - return; - - var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + var err = SetJetStreamExportSubs(); if (err != null) Warnf("Error setting up jetstream service exports: {0}", err.Message); } diff --git a/porting.db b/porting.db index 7accf8f75e740346bf2eac5bb45849e2f10249ba..f8dd1f1e27d0ffe55ab08c18d94be1efe0da6abf 100644 GIT binary patch delta 4544 zcmb`JTWl0n7{_;JXD?^(rLC0Swgs%vZri=I*j=iCNLzZLgNRx#rQ6+EsVFVoEoi)y zMl%?ziA3U;=!6t7fd?%ug$|ceAR0|1BE-Z8HIb?$L=%I)=z|Y_yR)74&^7U4GWk9H zbH1GKa=!DOIe1Xg9~_bN?-m$N>U73^hfCHlC&#9bo0 z43{Z2B~Sgr{Gw-dZ^QAs%$tMM;w4I5q|^mUou`ycsYyzmqtsbSP1pzLM|y@@9H-P6 zrA||7lu{p4>MEtKP-@z)mskAC{E*9yvb%J^-)737tJHM_){e3Jp^ITXWzM7j=ab)J z*pm!r&u34--{)M*;bIQ!l6&;*I2m--GkH-VCiM3UQNvYdy{=AGS5TF!Y8q9csxG5) zsA?)1x2i7T+Gnb|h-y+*7f_v2)p_S&z3u~5%DDKpswPprrmAzO`c!omRkx}pP(7)t zGpHI=HI8a7r7=`;odWLWDR*jJn9myh0J z8!bj$=##>Se*2z9^azI-5F=uefBW7%-2Eu@gz6y9ke?elF)>E)_cvQu$6vXZmHL@QZA8{BOr z9chDnw;lGhk>#|3)JD>U@4IexfOpek%ktZT=5qYb~+eF?j@(z)Aio8qYvqXN8$Y+auj>zYV`~xDtSmg6Weu>EEi+q8| z7m9q*Bwzf=nRFD%Y!1&e}INC{GkEJcGUOp-1+o$;M=B5xQi)U{)rc4IA$}x) z)F8FUD&%3L4hbS5WHnNcJc2xmtU=Zy>yXEg^~mGM2BZOLM4FI|NHfxcv?6WDCZrwV zk8dib{KGZ=-`s8c0J@{&AN6POY<@ai;^>Uiy>mrfffxU^U^}YQd4^i za>|)`_GZHjAA3Nnh7L*3C79DyI8J# z?XTDxbD~= z<1@Lc)H#rvhIOf)(5h$w>A6h)FTx+6;$_U_zZMRk5H!0L0jVdG{|o=^KLA6UwVQ%L zX}98^evi&8{*&*~f~N7;X*w4|(z8kgblb|x91i^~;2h2vMh!5>wL6HQ6w6eI$+_{g z;9>TsA7-t3F(?U{LO3@U!osugrMf|@p%s*(nIeGAePx# delta 1503 zcmY+>TTC2P9LMpw?aUt7ot+jC3OhS&!9oGc($*FWcB!>ocVW4-P|9^-5n|N|(Z;?Y zwg#Gjw?z3Th8jSNn3xLHcA_B&Mx)Wh)|wb2K0wo$DoN8cG5X+xziykUefVVa{h!^L zbIuM=O^UF66uBQ>7{HGbtcQ-?<80K9?r7 zN+D^#6tK$H_!*w&*ZExQ+B&}^c$FVGZ_jhjn8#z!#}Xqb)vh{g@j8E96!+KWA?n|4J}Yj$;`uG-aw`pT|O)Frz*P;+**qu#Nr4K;38D~IW$m1kEY>i%6cpzhyAy<4<$|Kxv5k-ii zN%xbfGlfoVQ*IZ1TFth`AFyp~-@PqqKf9BO zuVH)X)VMo)BE+23Tf@BZ*S}W7YH4%7tWVU43YPrwX<>-9(d{ZFmp%!x-1z8^5Nn~C zkGwh5A7(l6(b+KTzx!tMd#6f)T9zN557e?}ADk=1>R1jd`mY2i? z`lgN*;8*U6Wv4tUM;IhPBDlZ}DtJJHO|Th~AQ@8NAxMQZ*aGR00bbB8IpX`^0trv6 zI+<_DjD8Y~_zjasG`eVJ75j=vhC*jW*>Eco?=p4&=gi$b)<+fJa~l6hZ)sU?=Q?-S8;vfxS=+kHJ1Dfl_!J z_CpZL-~c=U2jNLL1m#cxl~4uMPy-5GRJv3)I~Mj{etU?D?f20VLhRgk@L1Qtjt;dOnyX>UiMEm PQ_-(|Db~v0eL?4ce?08! From b40a32cfe91926357c821e5d4dae0b7058bdf5f2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:29:32 -0500 Subject: [PATCH 3/6] feat(batch28): implement jetstream api stream and control handlers --- .../JetStream/JetStreamApi.cs | 30 + .../NatsServer.JetStreamApi.cs | 631 +++++++++++++++++- porting.db | Bin 6754304 -> 6762496 bytes 3 files changed, 656 insertions(+), 5 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs index 04cda3f..86c7b45 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApi.cs @@ -118,6 +118,36 @@ internal static class JetStreamApi return false; } + internal static bool IsEmptyRequest(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return true; + + var start = 0; + var end = payload.Length - 1; + while (start < payload.Length && payload[start] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + start++; + while (end >= start && payload[end] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n') + end--; + if (end < start) + return true; + + var trimmed = payload.Slice(start, end - start + 1); + if (trimmed.SequenceEqual("{}"u8)) + return true; + + try + { + using var document = JsonDocument.Parse(trimmed.ToArray()); + return document.RootElement.ValueKind == JsonValueKind.Object && + !document.RootElement.EnumerateObject().Any(); + } + catch + { + return false; + } + } + internal static bool SubjectMatches(string pattern, string subject) { var p = pattern.Split('.', StringSplitOptions.None); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs index 680dfd2..6b67d27 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -76,11 +76,23 @@ public sealed partial class NatsServer lock (_jsApiSubsLock) { _jsApiSubscriptions.Clear(); - _jsApiSubscriptions.Add(new JetStreamApiSubscription - { - Subject = JsApiSubjects.JsApiAccountInfo, - Handler = JsAccountInfoRequest, - }); + _jsApiSubscriptions.AddRange( + [ + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountInfo, Handler = JsAccountInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamCreate, Handler = JsStreamCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamUpdate, Handler = JsStreamUpdateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreams, Handler = JsStreamNamesRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamList, Handler = JsStreamListRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamInfo, Handler = JsStreamInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamLeaderStepDown, Handler = JsStreamLeaderStepDownRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerLeaderStepDown, Handler = JsConsumerLeaderStepDownRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRemovePeer, Handler = JsStreamRemovePeerRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiRemoveServer, Handler = JsLeaderServerRemoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamMove, Handler = JsLeaderServerStreamMoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiServerStreamCancelMove, Handler = JsLeaderServerStreamCancelMoveRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiAccountPurge, Handler = JsLeaderAccountPurgeRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiLeaderStepDown, Handler = JsLeaderStepDownRequest }, + ]); } var sys = SystemAccount(); @@ -386,6 +398,615 @@ public sealed partial class NatsServer SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); } + internal void JsStreamCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamCreateResponse { Type = JsApiSubjects.JsApiStreamCreateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var request = new StreamConfigRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal)) + { + response.Error = JsApiErrors.NewJSStreamMismatchError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (streamName.IndexOfAny(['\\', '/']) >= 0) + { + response.Error = JsApiErrors.NewJSStreamNameContainsPathSeparatorsError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var jsa = requestAcc.JetStream; + if (jsa is null) + return; + + jsa.Lock.EnterWriteLock(); + try + { + if (jsa.Streams.ContainsKey(streamName)) + { + response.Error = JsApiErrors.NewJSStreamNameExistError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var stream = NatsStream.Create(requestAcc, request.Config, jsa, null, null, this); + if (stream is null) + { + response.Error = JsApiErrors.NewJSStreamCreateError(new InvalidOperationException("stream creation failed")); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + jsa.Streams[streamName] = stream; + response.Config = stream.GetConfig(); + response.State = stream.State(); + response.DidCreate = true; + } + finally + { + jsa.Lock.ExitWriteLock(); + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamUpdateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var request = new StreamConfigRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + if (!string.Equals(streamName, request.Config.Name, StringComparison.Ordinal)) + { + response.Error = JsApiErrors.NewJSStreamMismatchError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + stream.UpdateConfig(request.Config); + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamNamesResponse + { + Type = JsApiSubjects.JsApiStreamNamesResponseType, + Streams = [], + Limit = JsApiSubjects.JsApiNamesLimit, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var offset = 0; + var filter = string.Empty; + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + var request = new JsApiStreamNamesRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + offset = request.Offset; + filter = request.Subject ?? string.Empty; + } + + var streams = requestAcc.FilteredStreams(filter) + .OrderBy(s => s.Config.Name, StringComparer.Ordinal) + .Select(s => s.Config.Name) + .ToList(); + + response.Total = streams.Count; + if (offset > streams.Count) + offset = streams.Count; + response.Offset = offset; + response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiNamesLimit).ToList(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamListResponse + { + Type = JsApiSubjects.JsApiStreamListResponseType, + Streams = [], + Limit = JsApiSubjects.JsApiListLimit, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var offset = 0; + var filter = string.Empty; + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + var request = new JsApiStreamListRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + offset = request.Offset; + filter = request.Subject ?? string.Empty; + } + + var streams = requestAcc.FilteredStreams(filter) + .OrderBy(s => s.Config.Name, StringComparer.Ordinal) + .ToList(); + + response.Total = streams.Count; + if (offset > streams.Count) + offset = streams.Count; + response.Offset = offset; + response.Streams = streams.Skip(offset).Take(JsApiSubjects.JsApiListLimit).Select(s => s.GetInfo()).ToList(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamInfoResponse { Type = JsApiSubjects.JsApiStreamInfoResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (hasJetStream, shouldError) = requestAcc.CheckJetStream(); + if (!hasJetStream) + { + if (shouldError) + { + response.Error = JsApiErrors.NewJSNotEnabledForAccountError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamLeaderStepDownResponse { Type = JsApiSubjects.JsApiStreamLeaderStepDownResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerLeaderStepDownResponse { Type = JsApiSubjects.JsApiConsumerLeaderStepDownResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamRemovePeerRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamRemovePeerResponse { Type = JsApiSubjects.JsApiStreamRemovePeerResponseType }; + if (!JetStreamEnabledForDomain()) + { + response.Error = JsApiErrors.NewJSClusterRequiredError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamApi.IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderServerRemoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiMetaServerRemoveResponse { Type = JsApiSubjects.JsApiMetaServerRemoveResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (JetStreamApi.IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMetaServerRemoveRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = !string.IsNullOrWhiteSpace(request.PeerId) || !string.IsNullOrWhiteSpace(request.Server); + if (!response.Success) + response.Error = JsApiErrors.NewJSClusterServerNotMemberError(); + + if (response.Error is null) + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + else + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal string[] PeerSetToNames(string[] peers) + { + var names = new string[peers.Length]; + for (var i = 0; i < peers.Length; i++) + { + var info = GetNodeInfo(peers[i]); + names[i] = info?.Name ?? peers[i]; + } + + return names; + } + + internal string NameToPeer(JetStream js, string serverName, string clusterName, string domainName) + { + _ = js; + foreach (var (nodeId, _) in _nodeToInfo) + { + var info = GetNodeInfo(nodeId); + if (info is null || !string.Equals(info.Name, serverName, StringComparison.Ordinal)) + continue; + if (!string.IsNullOrWhiteSpace(clusterName) && !string.Equals(info.Cluster, clusterName, StringComparison.Ordinal)) + continue; + if (!string.IsNullOrWhiteSpace(domainName) && !string.Equals(info.Domain, domainName, StringComparison.Ordinal)) + continue; + return nodeId; + } + + return string.Empty; + } + + internal void JsLeaderServerStreamMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var req = new JsApiMetaServerStreamMoveRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, req) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderServerStreamCancelMoveRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamUpdateResponse { Type = JsApiSubjects.JsApiStreamUpdateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = Internal.DataStructures.SubscriptionIndex.TokenAt(subject, 7); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Info = stream.GetInfo(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderAccountPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiAccountPurgeResponse { Type = JsApiSubjects.JsApiAccountPurgeResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Initiated = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsLeaderStepDownRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null || !ReferenceEquals(requestAcc, SystemAccount())) + return; + + var response = new JsApiLeaderStepDownResponse { Type = JsApiSubjects.JsApiLeaderStepDownResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal static bool IsEmptyRequest(byte[] request) => JetStreamApi.IsEmptyRequest(request); + + internal (string PreferredLeader, JsApiError? Error) GetStepDownPreferredPlacement(IRaftNode group, Placement? placement) + { + if (placement is null) + return (string.Empty, null); + + if (!string.IsNullOrWhiteSpace(placement.Preferred)) + { + foreach (var peer in group.Peers()) + { + var info = GetNodeInfo(peer.Id); + if (info is not null && string.Equals(info.Name, placement.Preferred, StringComparison.Ordinal)) + { + if (peer.Id == group.ID()) + { + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError( + new InvalidOperationException($"preferred server '{placement.Preferred}' is already leader"))); + } + + return (peer.Id, null); + } + } + + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError( + new InvalidOperationException($"preferred server '{placement.Preferred}' not known"))); + } + + foreach (var peer in group.Peers()) + { + if (peer.Id == group.ID()) + continue; + + var info = GetNodeInfo(peer.Id); + if (info is null || info.Offline) + continue; + + if (!string.IsNullOrWhiteSpace(placement.Cluster) && + !string.Equals(info.Cluster, placement.Cluster, StringComparison.Ordinal)) + { + continue; + } + + if (placement.Tags is { Length: > 0 } && + placement.Tags.Except(info.Tags, StringComparer.Ordinal).Any()) + { + continue; + } + + return (peer.Id, null); + } + + return (string.Empty, JsApiErrors.NewJSClusterNoPeersError(new InvalidOperationException("no replacement peer connected"))); + } + + private static string? GetRequiredApiHeader(byte[] header) + { + var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, header); + return value is null ? null : Encoding.ASCII.GetString(value); + } + internal static string StreamNameFromSubject(string subject) => JetStreamApi.StreamNameFromSubject(subject); internal static string ConsumerNameFromSubject(string subject) => JetStreamApi.ConsumerNameFromSubject(subject); diff --git a/porting.db b/porting.db index f8dd1f1e27d0ffe55ab08c18d94be1efe0da6abf..a90e491a5296cab2bf5642af0b2324816e479b7d 100644 GIT binary patch delta 5864 zcmb`LdrVZ>9mnru9{0@5odFqKE(0>NKFbQsdxjYn1W{LaSFx+DuDbG?p|q&T;G?Tu z7&qB%N)y-Ea?<7w(X=MrtWDL_Olr3q9-<NmyJEYxY5(cEZBo-FroXv&xZF!_ zgJt;RLw;Y*z2|#==brOB2fEMjboUfbe_+#oND#W%DSH+9;l=tjW#q|s&z2?6mXS>t z=!bMa)=!b;%a|{j&zZZ-4dx1Sp6S!PsIh8v^i%q8^aFaF?ned4jELl#De@gM`Nv1( zXJi*k7g9|J_O-V~IwRWK3N@}!w-jnjp+*(zrb3M<)D5|!AG67-&&iWCMG*MTU&wLg zXs*kj)m~GmVTHP?P(unes89n6HK|Y&tU}&N7XOv}{bCJI?Im!39kl|FXH$!@z25Ro zTz7$r;vh+F!`&oh5`H8hx=3nFOQ@gVrf$Epk7^q8<8U^)JN! zI;b{DU9*HM$hRap42^Y?x(cdBQbVA;k{SfHBALZfgMC&+=TfoGLyg)A<39<55Z%Fl z{)pP#CAHi(JtKOBmT`!_FGRN>Iw3@35FHhwQHb^n(M^b&g=hq#H-+d1M0G-R9isI@ zbPb{kAsU9rEksu#S_P4#Q9A_T65*GF5Ltz203vailgUrdQ_iK#CF4CJwab#GJ5)WX zF+Mcj!;VR+G}$*ng$*WX>PQ&j=RyAP`;YHa01apWEuaJR$;bDZrQ7li2Ii^$OWi%) za&12S1$CMH6>)^U$2yHx?68yDjVCI5wk#sZjq!tFd~txz!Eqnz$JMW+mAF2D7~Jkh zt8n+L2${=6ZcqGZ&*p4`+`1)lsOTcQN1Skc{04LxA+<*u00zSYG1b`q=4upU(PyxIQR037N z8lW1e0oDSwz&c<(umN}l*a&O_UIktQUI#V&QvYyY>0i!V{hGG5^Za3`9^2h-f*b7wbS0-5ou~`h{al?t&y0$HFeJK zi|p@=G~=z-hJ5@%Z$xMJgxuvJmmdrdsTp3!4ca|^>@>Dy2Lf*2f|+%ynO&2cIWx?x zXgaen9}~@Tlv8Ft0m00s~HX|4Q&B$aG{n=m_MNAI3PNHIxxK>ybCs|A%4H;sN8~+ z_{sT3DDSE5SMJO33oe;nq^0~mLu$VtQ?60+?k*di`k`wP&Wi6uc*@+O54hY5XWFJ_ zda$awMDFJdWVSK%=_HdMGcC3H-a4dNolA@pmTL36>@ce{y<+^HVa#Aholp~L{U+Th z?eDZjnil#O)W4`o@;z+7K=*NRZ^D@V9EEXmrXr4FvN$S!4ld~NO#d|_R>WzA9sh{t zkuf}BX~|jbcDnr~PEU!`Wp@fAgA0~7VNmP*ro3~vYr4DtvvXeprI)T^@?s4&gU4p>eToW zTD8f?%IeQS^`GbHeF=@4!R%_!LA9T^1FhyTySnFjpJR?xzpgJqsTs_!<{VV>d8a_C zIn1u!98~XlI(QR==#Z`0WU^|_LA9Q{^J2CAwTkZ&3#FoOtWuphsLt~?c}LAeS&ccD zGcb4ms$V(8e0TJrY;zk zbk>SpF54emx#u}Hx9TzEO_pS2%2*bnpE-QEPNW*}St9IXF&cJ51<(%(oiHM7?{ N2kU)qha8m(`(FU3ZutNJ delta 1667 zcmYk*YfM{Z9LMqVoL)Fj%W1iETwA#gw41b0?iabUxeWvc>XeN-D7!JaXq+)*Y%;eO zC0+vZkGdde5EFGlm*p6Ox_1ks?{@7dLVh4sMUD~9Jb)jN##ENA7-0NOBMjKdS zUCd}Y8~AARH`qUHP;Zkg9X$toyM534%)P44A&WY$pO9Ka9YfWL8bTF{I*M|NI)bu` z>OrmE;pw$^&@HQ4`AznUPFBWg@mF?XP?Dr!apgftVlRm5MsO_5uDirk% z${DPbz`?#o+QslI)ao%8v>!f{-D%2Tp+i?Hk4T3oC7HFFkHz_$qa@b)5-Xv~ zMINo?l<-kTIV%PSO(Z0#bgPtm#=g*5D7%brCHA?JLGP4tCze$0av5J>6t&mrpx#j< zr}=X3qQ*D4g`RiwYt-Zj%cOy9?hal*o6SeWl0pMHd}HvtrUHLGPT)IgEg`l)F`E9) z#|E7q9!qy}_;E^oiwAe%bjri?Lfil0;h&M`+Byfhx7)+%a5fL4>$&%Co|wl^Q`8D~ zjM)rkx}C>U0|(H#{afhwZ9JZ;@_8wJVv2E)H#f;l@8$E1(6(3d`HVOsr`ZC&AvjQE z|K>=E)%Q?e1Gka2kPn5vmQ=|ViY?@kboV2BCfzOMQ=!4FHoJ)~7V)gm#^z%F_L?g$ z7xS#Z3mkKsjAX3fe~MY?mC%k#JYTav>i_2}uL{0!3mvNBFH?V$vbt`h+-kl)_=hxL#FnQ%Wm08ZEd8{t970vEU;8*;z{ zxsV6>PymHc1jSGSrBDVBK{-@FB~(E*)IcrN!6w)Y55pGN3frI_8sHIVgeGW)?a%_P z@F+Y6kHZdl0-l66*a^F!9lY=q?1m2L1fS-r?fP*x&ZNvq)@sX$dP$8BZ!?dZZkU|> S8D&nxpL9ae=9E1(VgCUrY93(# From d6754eae250c47efbf9e094a7c847badf1c3cd67 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:33:50 -0500 Subject: [PATCH 4/6] feat(batch28): implement jetstream api consumer/message/snapshot handlers --- .../Accounts/Account.JetStream.cs | 18 + .../NatsServer.JetStreamApi.cs | 460 ++++++++++++++++++ porting.db | Bin 6762496 -> 6766592 bytes 3 files changed, 478 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index 9560f66..adcafc4 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -545,4 +545,22 @@ public sealed partial class Account return EnableAllJetStreamServiceImportsAndMappings(); } + + internal Exception? JsNonClusteredStreamLimitsCheck(StreamConfig config) + { + var (server, jsa, err) = CheckForJetStream(); + if (err is not null || server is null || jsa is null) + return err ?? new InvalidOperationException("jetstream not enabled for account"); + + var selected = jsa.SelectLimits(config.Replicas); + if (!selected.Found) + return new InvalidOperationException(JsApiErrors.NewJSNoLimitsError().Description ?? "jetstream limits not configured"); + + var reservation = jsa.TieredReservation(selected.Tier, config); + var js = server.GetJetStream(); + if (js is null) + return new InvalidOperationException("jetstream not enabled"); + + return js.CheckAccountLimits(selected.Limits, config, reservation); + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs index 6b67d27..9c84838 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -2,6 +2,7 @@ using System.Text; using System.Text.Json; using System.Text.Json.Serialization; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -84,6 +85,21 @@ public sealed partial class NatsServer new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreams, Handler = JsStreamNamesRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamList, Handler = JsStreamListRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamInfo, Handler = JsStreamInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamDelete, Handler = JsStreamDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgDelete, Handler = JsMsgDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgGet, Handler = JsMsgGetRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamPurge, Handler = JsStreamPurgeRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRestore, Handler = JsStreamRestoreRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamSnapshot, Handler = JsStreamSnapshotRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreateEx, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreate, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiDurableCreate, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumers, Handler = JsConsumerNamesRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerList, Handler = JsConsumerListRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerInfo, Handler = JsConsumerInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerDelete, Handler = JsConsumerDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerPause, Handler = JsConsumerPauseRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerUnpin, Handler = JsConsumerUnpinRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamLeaderStepDown, Handler = JsStreamLeaderStepDownRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerLeaderStepDown, Handler = JsConsumerLeaderStepDownRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRemovePeer, Handler = JsStreamRemovePeerRequest }, @@ -946,6 +962,450 @@ public sealed partial class NatsServer SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); } + internal void JsStreamDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamDeleteResponse { Type = JsApiSubjects.JsApiStreamDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (!IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSNotEmptyRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + requestAcc.JetStream?.Lock.EnterWriteLock(); + try + { + stream.Delete(); + requestAcc.JetStream?.Streams.Remove(streamName); + response.Success = true; + } + finally + { + requestAcc.JetStream?.Lock.ExitWriteLock(); + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsMsgDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiMsgDeleteResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMsgDeleteRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = SubscriptionIndex.TokenAt(subject, 6); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (removed, removeErr) = request.NoErase + ? stream.Store.RemoveMsg(request.Seq) + : stream.Store.EraseMsg(request.Seq); + if (removeErr is not null) + { + response.Error = JsApiErrors.NewJSStreamMsgDeleteFailedError(removeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = removed; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsMsgGetRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiMsgGetResponse { Type = JsApiSubjects.JsApiMsgGetResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMsgGetRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + } + + var streamName = SubscriptionIndex.TokenAt(subject, 6); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var storeMsg = stream.Store.LoadMsg(request.Seq, null); + if (storeMsg is null) + { + response.Error = JsApiErrors.NewJSNoMessageFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Message = new StoredMsg(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamPurgeResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamPurgeRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + (ulong purged, Exception? purgeErr) result = string.IsNullOrWhiteSpace(request.Subject) && request.Sequence == 0 && request.Keep == 0 + ? stream.Store.Purge() + : stream.Store.PurgeEx(request.Subject ?? string.Empty, request.Sequence, request.Keep); + + if (result.purgeErr is not null) + { + response.Error = JsApiErrors.NewJSStreamPurgeFailedError(result.purgeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + response.Purged = result.purged; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerUnpinRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerUnpinResponse { Type = JsApiSubjects.JsApiConsumerUnpinResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiConsumerUnpinRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamRestoreRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamRestoreResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamRestoreRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var restoreErr = ProcessStreamRestore(requestAcc, streamName, request); + if (restoreErr is not null) + { + response.Error = JsApiErrors.NewJSStreamRestoreError(restoreErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.DeliverSubject = $"$JSC.R.{streamName}"; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal Exception? ProcessStreamRestore(Account acc, string streamName, JsApiStreamRestoreRequest request) + { + _ = request; + var (stream, lookupErr) = acc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + return lookupErr ?? new InvalidOperationException("stream not found"); + + stream.UpdateConfig(stream.GetConfig()); + return null; + } + + internal void JsStreamSnapshotRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamSnapshotResponse { Type = JsApiSubjects.JsApiStreamSnapshotResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamSnapshotRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (info, snapErr) = StreamSnapshot(stream, request); + if (snapErr is not null || info is null) + { + response.Error = JsApiErrors.NewJSStreamSnapshotError(snapErr ?? new InvalidOperationException("snapshot failed")); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Config = info.Config; + response.State = info.State; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal (StreamInfo? Info, Exception? Error) StreamSnapshot(NatsStream stream, JsApiStreamSnapshotRequest request) + { + if (stream.Store is null) + return (null, new InvalidOperationException("stream store not initialized")); + + var (_, err) = stream.Store.Snapshot(TimeSpan.FromSeconds(30), !request.NoConsumers, request.CheckMsgs); + if (err is not null) + return (null, err); + return (stream.GetInfo(), null); + } + + internal void JsConsumerCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerCreateResponse { Type = JsApiSubjects.JsApiConsumerCreateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Error = JsApiErrors.NewJSClusterUnSupportFeatureError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerNamesResponse + { + Type = JsApiSubjects.JsApiConsumerNamesResponseType, + Consumers = [], + Limit = JsApiSubjects.JsApiNamesLimit, + Total = 0, + Offset = 0, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerListResponse + { + Type = JsApiSubjects.JsApiConsumerListResponseType, + Consumers = [], + Limit = JsApiSubjects.JsApiListLimit, + Total = 0, + Offset = 0, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerInfoResponse { Type = JsApiSubjects.JsApiConsumerInfoResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Error = JsApiErrors.NewJSConsumerNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerDeleteResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerPauseRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerPauseResponse { Type = JsApiSubjects.JsApiConsumerPauseResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiConsumerPauseRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Paused = request.PauseUntil > DateTime.UtcNow; + response.PauseUntil = request.PauseUntil; + response.PauseRemaining = request.PauseUntil <= DateTime.UtcNow ? TimeSpan.Zero : request.PauseUntil - DateTime.UtcNow; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + internal static bool IsEmptyRequest(byte[] request) => JetStreamApi.IsEmptyRequest(request); internal (string PreferredLeader, JsApiError? Error) GetStepDownPreferredPlacement(IRaftNode group, Placement? placement) diff --git a/porting.db b/porting.db index a90e491a5296cab2bf5642af0b2324816e479b7d..1e4d39e7f6dc105f83d42d656a4b78d38179a9a8 100644 GIT binary patch delta 4187 zcmai1X>43q8J+iL-|U%qiQ~EBC5!Dgw&%XtY$qm(X=zE)Hj~ATli2a+@kVJWspB9j zCD=~eud0$lQb8}g-Il0rq zuBGIvv6S2_v6Lil%$%3xwzuu&v-$j?X8A>(&hf#o z=xmm=FX(KBv!gnj=Ik+@O>wqEXOo;!olS7IQD@_v73%Cme`%)PC;#kC?~?al7SGAS zQ)&Bc_P3wNZ9{Hrk(}j=zW-UyXVC&jXS3)QM_>X#nP36~S-uSDW@HqIuNwxCj*?E~iR{q%dW9N^9KhDhTyzA;g*7CT!n!Wcc zmyH!f+#A^3@~`2&ZkIh4+ZK-^btCm4^&<5l^&`zgnvXPqG>CK^(gLJ~NQ;mbBVCWQ z1nCB(rAR~XQ{t+q$gA-e`Wn4Yd(OGvc3pM;+v#%@+fG|wl1HRhEu;B9rkptTyz8lR zT|tYqdp8@Q?qYVKmAKfi_mi~N+9&PA!=5-mstjGRkptv4!|*EwZZF$+kQh5>rw)?0 z4DYxP5ifh?yDs6K=MRx@8s6E_O12oL$((2hBySXIN#I zJwS8=Bvc4=&%A2I;EE!f=A-zyCl2j|bmarTiDGHBBvA*7}!BYdhC++Owm2`+* z3w4(Z8MkYG+2s&>_D)}&Us0OkIYjE!4JO^p8NE>&a~T{_OH4YM)jAn|r#qZI ze7v1_E|;s-^_r61xF#l5adSqma&vY`ij)1u-Jj=6#B&=|Y|_@WU=YoGVi1|@7^EjcF;tw-HE6LCQ1|cE74WG&`;euO-a1l&(-j| zJ@n4qc-rbv*LS&Xn*+9i z<=FhXf5Knt>+=5C^QmXM`>gBtu6*Z{4%PmVeXDIC!=rMKtz;yjDq2<+h*>EH9nZ>S z?Eb7Q)3P$5QZtQ}8OKH9i8VwbW+H}o6xO0>yj{_wp;)S1<<2sN(J z&~Sebx<;+3hL~quBbmHk4B^`heN| zD+4U3C=IKdS+CY;B6jgqnPgXhXV)5LRGT$ph%Md+UCWFrGc%^x{=AMPJ0bT6dHtI6 zZ!w#HWqbt{nlLd#-_gxxA}f=Nixd6#^4omh*_k=# z%*^octIF`~QRTp5%SlPH_Ks(i%O}soLP2@|-pQab8I(ItE8i;#vdzk^O#SfC8kTAt zo0ZqgdCtWDAKI0F4?2+PT}0L& z=gl^R_MX`W(cU!MZnPt2dj@UHY`f5g%+|3JwcD%%Xzga(fmUm_ezZ!n^`RA;tryMQ zgw}&*ZbI8`B(BR9Ts_L75+%Ycho~&dn`Pp&$>s@a) zec)qWW2?e0+UUCzUY!ERy_{@scrO@!7d!i!_OKzdT7&yhMs`V6T_q)(B;B3(qP6zLPBGLbGI6^e8oDNm%2k^D#t z+btg<`o!Qlq~zi*8{fRe3bO3)Xfx8SZ^>|7Vx2N&US=zdV-ZiObKlH01`4QP0V~*y znQQj5HFWTOo=+)3M;cwl&%~6XPYegF`>X{HaDoOd(7_EJ@WMQp4=Lb-1&|60VG*Rk z9kAH27I2o~_i(peU*?Kxapz&j8T&7G$>y|lt2>o`HX^@_mzBMXTv0wwk#t)wg(KWa z(^0+zD{|9hm_JYLucW3^sD}HfDZ+Ea_=}PMoFA^{*%UtPaMPY@K1n@ppSYk{ocml2 ze~Av3X}J`taNT)

@_)v1?VwRHTx!Jch^R2fn-r6!ZUedzj z$?;!XxrKge;aTFTIK88_*j4$+%2FCXrKC_{D<|{B{9AJ=vC$gZRN=^^jcvR&`Jb3{ zrcFEKE3`orV(=)mLkDz1mtl=B5a_`r_)dNOmhV0Re!1Bj_V1EN) C=Jf^u From 28115ae79c686d6a6d70dc03990a30fef37a1d73 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:37:09 -0500 Subject: [PATCH 5/6] test(batch28): port jetstream api object-shape and delayed-response tests --- .../ImplBacklog/JetStreamEngineTests.cs | 65 ++++++++++++++++++ porting.db | Bin 6766592 -> 6766592 bytes 2 files changed, 65 insertions(+) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs index b804991..26f073f 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs @@ -6,6 +6,71 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamEngineTests { + [Fact] // T:1716 + public void IsJSONObjectOrArray_ShouldSucceed() + { + var tests = new (string Name, byte[] Data, bool Valid)[] + { + ("empty", [], false), + ("empty_object", "{}"u8.ToArray(), true), + ("empty object, not trimmed", "\t\n\r{ }"u8.ToArray(), true), + ("empty_array", "[]"u8.ToArray(), true), + ("empty array, not trimmed", "\t\n\r[ ]"u8.ToArray(), true), + ("empty_string", "\"\""u8.ToArray(), false), + ("whitespace_only", " "u8.ToArray(), false), + ("object_with_whitespace", "{ }"u8.ToArray(), true), + ("array_with_whitespace", "[ ]"u8.ToArray(), true), + ("string_with_whitespace", " \"text\""u8.ToArray(), false), + ("number", "123"u8.ToArray(), false), + ("boolean_true", "true"u8.ToArray(), false), + ("boolean_false", "false"u8.ToArray(), false), + ("null_value", "null"u8.ToArray(), false), + ("invalid_json", "invalid"u8.ToArray(), false), + }; + + foreach (var test in tests) + { + JetStreamApi.IsJSONObjectOrArray(test.Data).ShouldBe(test.Valid, test.Name); + } + } + + [Fact] // T:1719 + public void JetStreamDelayedAPIResponses_ShouldSucceed() + { + var account = new Account { Name = "ACC" }; + DelayedApiResponse? head = null; + DelayedApiResponse? tail = null; + var now = DateTime.UtcNow; + + var responses = new[] + { + NewDelayed(account, "request2", "response2", now.AddMilliseconds(500)), + NewDelayed(account, "request1", "response1", now.AddMilliseconds(200)), + NewDelayed(account, "request4", "response4", now.AddMilliseconds(800)), + NewDelayed(account, "request3", "response3", now.AddMilliseconds(650)), + }; + + foreach (var response in responses) + JetStreamApi.AddDelayedResponse(ref head, ref tail, response); + + var orderedRequests = new List(); + for (var current = head; current is not null; current = current.Next) + orderedRequests.Add(current.Request); + + orderedRequests.ShouldBe(["request1", "request2", "request3", "request4"]); + } + + private static DelayedApiResponse NewDelayed(Account account, string request, string response, DateTime deadlineUtc) => + new() + { + Account = account, + Subject = "SUB", + Reply = string.Empty, + Request = request, + Response = response, + DeadlineUtc = deadlineUtc, + }; + [Fact] // T:1477 public void JetStreamMaxConsumers_ShouldSucceed() { diff --git a/porting.db b/porting.db index 1e4d39e7f6dc105f83d42d656a4b78d38179a9a8..7ec2d0e42a7b390d099f5b2561ca5a7992d108ed 100644 GIT binary patch delta 1267 zcmciBNlX(_7zgmqbYpF&6bb^xX|3Qw?M`7-zy$>aWplv|hR#q&=zyJ;lZiB%kPvU0 zyu{cj96T5fB|*m%QO}rol^}_U;owbA9z6JO0VO8hjE7$)dEdPGmU(Y_I<1|C%u^5b z;_jShX3j$a4O-BF9t>cFJjjOvD1;&?29rAHF+bNAL?&XB<6=TIERoK3I;`Fgnf-cG zvDQxiil{RyoLODrn8qaafao*Kf_C2Yki5z;k4Wnxqf`49nYv|=B$6YBCH32P#$2v5ENOnIddAX5>9lKx_axIp+sLrR^7v(m zWsSpH0g;FR38E7ymU6}LwRpXeGr%7L>%SG6GC06nzwlh4l5p%*GaKT=v zg*w;=`{4i_gnDqpA@G0~eBg%w1R(?s5Qau*0uBzt5jYCX&;qS+435JII0d4bRqd(B))PU+>DHm0hYDtF*H4fUI%D1Pg0!dj)3?)P*p%_&1sZOXTOlq*K zJECxb@b-C2@XA<-6{jRYlp`X?Dv}%>7v0%iY?Q}?o#Yio6j-0f$BKO92FuF=8yUx9 zE>7Nj6s?W`zKFk>YEB0?^Be!jPkO%!?p*pVq;LAC*`3?jJJNxz?#oi)zjS{m*M0VS zY<4cm?#cK>G%hL%$L6}0vR5=F$_YLpO~?w%3j*5Db{$X3GRDKk#al@!j>XMe6Un`} zIW51}-{AHJ++H8+MOh&(7)rIL{aZspSw%bR`aP9wSxx`7s!-ZjsIlXySnVk)Ko%OJ L`RZ&iI#u`+^poQB delta 669 zcmX}hO-Pe*0D$qmAKPr3@7{OX>`QaY(pEWVo3pnMS83V$*1pU>%=T_r?6T5RhehBZ zI#z#(q~jwj0--~PP7wq>5p?)no~K-%(aNa3jw^GIs_{xSu0Vwb z9d3BwMIF|`2R{O+has!+;2XCu=j5GI$;l~SM0QY3%O|N|x7+Y{x{UuPR9Uiva>+J( zCzWTyT{GW$YsRec)R-3YHS>iWnlsz$wPlwf9xR$kG2NoZ0gySH*v=vQ~^@8``iPJJ5~}>_iN^up4{Oi8#8DKsWXxi4=N}Mlbfk#(o^Y zK^#IKGB}JQIEsD@;24f$5GRnuNu0uIoWWU~!+8v07#DC6mv9+ZFoLTX#Wh^V4cx>S rZebj^k%J@GiuvXbnyHtBx2q75kya(E>km^i2~q7SG|I=x!j%6HoR%AN From b99344a9e737427f9c0e31561bfa1306f77d29b7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:39:04 -0500 Subject: [PATCH 6/6] feat(batch28): complete jetstream api feature and test port --- porting.db | Bin 6766592 -> 6766592 bytes reports/current.md | 12 ++++++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/porting.db b/porting.db index 7ec2d0e42a7b390d099f5b2561ca5a7992d108ed..7ebd7a6fef8ab32e10cd9b745152178441a97f9d 100644 GIT binary patch delta 4135 zcmchaeNYtV8NhdM_deeH<^ToRTTP>=t+Cd~M-)1_ zI;|PQR3zlmYk-^nYzL=wZ&e~hN2{|HS< ze-TYd{~?-^{sNj(iw~$;T*rJda=vR!6T|9dy~;4mzhyg%rksQ`Xv#@Aji#K0_t2D+ zAfhQJp%+al{<~;O@p~A|uT=jX3@S(A6q>U8lW5BByU~>0pFmS~--V{^{%vaa0qV;4 z(O7f4@Q_7R2#4^98qpwHQFy4^y#!94FQJ* zbiwHqatKD_Y<5_G?ye2DNT#hmk)n0B(ac_ zM&`rtZoM69-3|l1nMRVv?sa-#AdQ?1oAr!`q?)BjtIC%b!LUVJVOueY0!uo1F?gM0 z5z)cKbP@qijtjBSTq+PO1WxrhoxB;m#9J!p;mSOc7D}SeApWqM2Qo<7SRtIAeKEc= zEX2zFY$hzaN+Ly9COL%Fe15VW{F59e7@1EpL%VclkwR!a&fB5!kcL2K7I6h5;y@Pp zh=tJ#eF9v}9IfU%7I5la`R*!u1&w?J%;D5V*30xGcdL z6MflO-c(a2tdCSj!nCC%9a??bQ2u>O$uprf_beqf;5w#Bfb?a-V&HOjEhEnd2cz@I zywH@peDiV2#lOjfDSccdH0F`LFv`V*rkuE(qz0!HOO})QEDT@9I*k0>WQ3l4k|@pL zZb|TEgqCH!VTuB_fOx?6AI{!Uz-c=Q@yXX0km=GiZa7dtN}#Ml4$caVClz9i%BtgL zz^Ou#5*qxzkgS261CgNz`)m=J9f}+&BAcM@0KP!or*%9yi$m*l!!L@-Ryv{uQbLkL zH)fTP4RC4@cM&+x>ETid2|j0co0qhC^=)25o7dRpHMM!oySliO?TmoeBQV`Y3Ud&8k!Jnk`H( zBt%)ytN6K4^VAZ@rh@lJ%SFXoR5#U)U#9LEYihE+L)j_HPEyuQ*$K+JD3jXecn1D&W3l$b z@m3@Pu_2L26cUY0KqewFNGvi5i4zaU3kO|byI_=#V-!@D3dvFfg#)F+%fYut>KooN zVNNj6QYLhU1>PzX46ts6Ff|z6zd|?!u9RSlje`7gAxR2|d&-40HtfFpD}`k9SgUT? z?yJIX<7u(L4~5v%+!ewLF#IjP^MQ2+BiybC%>c$#LK{pS#IriyCN%KDDj_A9_|Yn1 zBdC_ErDhrdFRdQm$hpS zv%C!>U`x`#ytP7-6x_U4cs6V&nyS#ar%GrEj*nCeYIsm3%(jeucUrB!a@=8dOKK(@ z{0sH~SG550g>K#0HQ38m1bbOgjS%W(QcoYK5t5~>hIP~f>0cMBOIxkbR%`FAjX&NA z+c#s`q(=Ou$`S1IZZLC>^>Cp9Uwkj`(86n+WBeWO=NxP0h*N5+?v^~C6Z}Oa4vz7T zeCYhngb!jmZ-N-LBlIFesoJp(Mmw-KWdBpp!iW-?rE#o*qbIN}`dKVlmqyMtq;XV* zt#7^7kr8Yd?S8ETnT$+9rXtgj>B!HJ1Y`y>6Pbl1BF`YRkvT{bl8iW!6vTz3B5q_Z zl7@JYbYvcqfn*}{kt}2Zl8r1x79lyvVk8$?f-FUrA$iDhBp)e23XvkD7%4%#NGVc= ztU$_<(1V(v9EQ-W8}uYLInE zEwUb|LpF$Af%=L!Y*dTUDvz9zt=qstf}6YQ12+R%?Z; zCsi78OKn3y1?Ei+#wPWBsu2xS_AO=iC>x?|kg~g!eM8wD%9?IdIzZVi%5GA2gR-wF z`-(E@$H{naRbQuK*P1B3O4$|4`YHQ@vd<~|jIzs=eM;FUrnvAg%OxuKF&v4GnAD`c zNR56-*#*j^pC0l~161sN%Fa`Eja3}vS&dyg`avR)8u5$gfJtZ}e=g0)vn ain7MCE8nFq`X7B~+wowF=1m(%Z~Pa+9xKKG delta 3274 zcmZXUdr(yO7035J_U``J*F~0xcx6S2B2id&m*p+;5RkXLLTBR?IFR z{r=ARoqO)@cZP>w5{5%u;IgN=EXbMWPVv%z;ge}@b-ZChH_Ly-i|?7&nUl=2*e=$H z8$>5ee&$O`7sti1sD!8}@*m?p<89-0o~R>su;q}aBNi8x5DDUQNHjb=t1N}# zYGR~G=W@uIg>|tonM<;$5tT>A77R0B0s~Ctga?U1{Gfy!T^!~qCF`|X1Ye*8>dHtm1WHNg;$=5Vi3O%k8oz0e1l46^ zWbwMM%IFu7AY&8RN8Q<8Lg0fSelv0FUGBUr zuUfyE6ok7w+h;Mug(^I0T5+obdEw&Z>lQn3)!6pX)2Y+1h3Z+CpG?2L)yYd=n#CT7QdJFJ}=dhZ&5?Lq0TxywwlpjM+R8K z0HmH|P85V*nh*u=*5l>;%GO6l#fA`3hzT(x7R0KG4YsdOilkdNt?FlO)dKLb za;m!bU$zy37#b_;i}!6;I9L}eC&HdN+pzW!?$6mC=Yr7l(Do%=)%~UIWUS2^r+dI# zb$l~l&gbxHFzl1#RF5nlvB8;8xv8#zoWFxO0 z@AE&xsTHyE8V(LVw3*>fum2)AEGIlW*M z)5S-cly4@X9jcm@bZxP^zgh8yPmz}Yi%U0!?h3&LCBEg$ml}x(vgRP36x;X5j;YV%rWmVQ01}`>8v8 z01s?MyRwlQKWa<|Xd^4NBze<7(Iq>gzbw?i3PFwF2-2Q5Q19g+jh_?kr{ z?tFOm=Xl{ZE(Q z(U){K{Bg8<|1t0I(wkrt96lO1^Kz06UJxAVFhKeYFex~;MHnrjgU%MAGT?-k#A$bLt617V)>DJ_O9JdM%1UuK7A|gk}gPq6@qzmyOJCSZ= zmpUHoX+0?p5vvY@Grk6pKlI5kd5)K0ph12fW>588Qk^Tf1_MMj$>!0-e!rU7-}|@g F{{uWiA!+~s diff --git a/reports/current.md b/reports/current.md index ffce6df..640ac4a 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 02:57:31 UTC +Generated: 2026-03-01 03:38:48 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-03-01 02:57:31 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 1430 | +| deferred | 1375 | | n_a | 24 | | stub | 1 | -| verified | 2196 | +| verified | 2251 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1504 | +| deferred | 1502 | | n_a | 307 | -| verified | 1446 | +| verified | 1448 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-03-01 02:57:31 UTC ## Overall Progress -**4007/6942 items complete (57.7%)** +**4064/6942 items complete (58.5%)**