From d6754eae250c47efbf9e094a7c847badf1c3cd67 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:33:50 -0500 Subject: [PATCH] feat(batch28): implement jetstream api consumer/message/snapshot handlers --- .../Accounts/Account.JetStream.cs | 18 + .../NatsServer.JetStreamApi.cs | 460 ++++++++++++++++++ porting.db | Bin 6762496 -> 6766592 bytes 3 files changed, 478 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index 9560f66..adcafc4 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -545,4 +545,22 @@ public sealed partial class Account return EnableAllJetStreamServiceImportsAndMappings(); } + + internal Exception? JsNonClusteredStreamLimitsCheck(StreamConfig config) + { + var (server, jsa, err) = CheckForJetStream(); + if (err is not null || server is null || jsa is null) + return err ?? new InvalidOperationException("jetstream not enabled for account"); + + var selected = jsa.SelectLimits(config.Replicas); + if (!selected.Found) + return new InvalidOperationException(JsApiErrors.NewJSNoLimitsError().Description ?? "jetstream limits not configured"); + + var reservation = jsa.TieredReservation(selected.Tier, config); + var js = server.GetJetStream(); + if (js is null) + return new InvalidOperationException("jetstream not enabled"); + + return js.CheckAccountLimits(selected.Limits, config, reservation); + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs index 6b67d27..9c84838 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -2,6 +2,7 @@ using System.Text; using System.Text.Json; using System.Text.Json.Serialization; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -84,6 +85,21 @@ public sealed partial class NatsServer new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreams, Handler = JsStreamNamesRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamList, Handler = JsStreamListRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamInfo, Handler = JsStreamInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamDelete, Handler = JsStreamDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgDelete, Handler = JsMsgDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgGet, Handler = JsMsgGetRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamPurge, Handler = JsStreamPurgeRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRestore, Handler = JsStreamRestoreRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamSnapshot, Handler = JsStreamSnapshotRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreateEx, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreate, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiDurableCreate, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumers, Handler = JsConsumerNamesRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerList, Handler = JsConsumerListRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerInfo, Handler = JsConsumerInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerDelete, Handler = JsConsumerDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerPause, Handler = JsConsumerPauseRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerUnpin, Handler = JsConsumerUnpinRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamLeaderStepDown, Handler = JsStreamLeaderStepDownRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerLeaderStepDown, Handler = JsConsumerLeaderStepDownRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRemovePeer, Handler = JsStreamRemovePeerRequest }, @@ -946,6 +962,450 @@ public sealed partial class NatsServer SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); } + internal void JsStreamDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamDeleteResponse { Type = JsApiSubjects.JsApiStreamDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (!IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSNotEmptyRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + requestAcc.JetStream?.Lock.EnterWriteLock(); + try + { + stream.Delete(); + requestAcc.JetStream?.Streams.Remove(streamName); + response.Success = true; + } + finally + { + requestAcc.JetStream?.Lock.ExitWriteLock(); + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsMsgDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiMsgDeleteResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMsgDeleteRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = SubscriptionIndex.TokenAt(subject, 6); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (removed, removeErr) = request.NoErase + ? stream.Store.RemoveMsg(request.Seq) + : stream.Store.EraseMsg(request.Seq); + if (removeErr is not null) + { + response.Error = JsApiErrors.NewJSStreamMsgDeleteFailedError(removeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = removed; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsMsgGetRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiMsgGetResponse { Type = JsApiSubjects.JsApiMsgGetResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMsgGetRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + } + + var streamName = SubscriptionIndex.TokenAt(subject, 6); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var storeMsg = stream.Store.LoadMsg(request.Seq, null); + if (storeMsg is null) + { + response.Error = JsApiErrors.NewJSNoMessageFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Message = new StoredMsg(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamPurgeResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamPurgeRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + (ulong purged, Exception? purgeErr) result = string.IsNullOrWhiteSpace(request.Subject) && request.Sequence == 0 && request.Keep == 0 + ? stream.Store.Purge() + : stream.Store.PurgeEx(request.Subject ?? string.Empty, request.Sequence, request.Keep); + + if (result.purgeErr is not null) + { + response.Error = JsApiErrors.NewJSStreamPurgeFailedError(result.purgeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + response.Purged = result.purged; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerUnpinRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerUnpinResponse { Type = JsApiSubjects.JsApiConsumerUnpinResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiConsumerUnpinRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamRestoreRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamRestoreResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamRestoreRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var restoreErr = ProcessStreamRestore(requestAcc, streamName, request); + if (restoreErr is not null) + { + response.Error = JsApiErrors.NewJSStreamRestoreError(restoreErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.DeliverSubject = $"$JSC.R.{streamName}"; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal Exception? ProcessStreamRestore(Account acc, string streamName, JsApiStreamRestoreRequest request) + { + _ = request; + var (stream, lookupErr) = acc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + return lookupErr ?? new InvalidOperationException("stream not found"); + + stream.UpdateConfig(stream.GetConfig()); + return null; + } + + internal void JsStreamSnapshotRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamSnapshotResponse { Type = JsApiSubjects.JsApiStreamSnapshotResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamSnapshotRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (info, snapErr) = StreamSnapshot(stream, request); + if (snapErr is not null || info is null) + { + response.Error = JsApiErrors.NewJSStreamSnapshotError(snapErr ?? new InvalidOperationException("snapshot failed")); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Config = info.Config; + response.State = info.State; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal (StreamInfo? Info, Exception? Error) StreamSnapshot(NatsStream stream, JsApiStreamSnapshotRequest request) + { + if (stream.Store is null) + return (null, new InvalidOperationException("stream store not initialized")); + + var (_, err) = stream.Store.Snapshot(TimeSpan.FromSeconds(30), !request.NoConsumers, request.CheckMsgs); + if (err is not null) + return (null, err); + return (stream.GetInfo(), null); + } + + internal void JsConsumerCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerCreateResponse { Type = JsApiSubjects.JsApiConsumerCreateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Error = JsApiErrors.NewJSClusterUnSupportFeatureError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerNamesResponse + { + Type = JsApiSubjects.JsApiConsumerNamesResponseType, + Consumers = [], + Limit = JsApiSubjects.JsApiNamesLimit, + Total = 0, + Offset = 0, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerListResponse + { + Type = JsApiSubjects.JsApiConsumerListResponseType, + Consumers = [], + Limit = JsApiSubjects.JsApiListLimit, + Total = 0, + Offset = 0, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerInfoResponse { Type = JsApiSubjects.JsApiConsumerInfoResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Error = JsApiErrors.NewJSConsumerNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerDeleteResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerPauseRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerPauseResponse { Type = JsApiSubjects.JsApiConsumerPauseResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiConsumerPauseRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Paused = request.PauseUntil > DateTime.UtcNow; + response.PauseUntil = request.PauseUntil; + response.PauseRemaining = request.PauseUntil <= DateTime.UtcNow ? TimeSpan.Zero : request.PauseUntil - DateTime.UtcNow; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + internal static bool IsEmptyRequest(byte[] request) => JetStreamApi.IsEmptyRequest(request); internal (string PreferredLeader, JsApiError? Error) GetStepDownPreferredPlacement(IRaftNode group, Placement? placement) diff --git a/porting.db b/porting.db index a90e491a5296cab2bf5642af0b2324816e479b7d..1e4d39e7f6dc105f83d42d656a4b78d38179a9a8 100644 GIT binary patch delta 4187 zcmai1X>43q8J+iL-|U%qiQ~EBC5!Dgw&%XtY$qm(X=zE)Hj~ATli2a+@kVJWspB9j zCD=~eud0$lQb8}g-Il0rq zuBGIvv6S2_v6Lil%$%3xwzuu&v-$j?X8A>(&hf#o z=xmm=FX(KBv!gnj=Ik+@O>wqEXOo;!olS7IQD@_v73%Cme`%)PC;#kC?~?al7SGAS zQ)&Bc_P3wNZ9{Hrk(}j=zW-UyXVC&jXS3)QM_>X#nP36~S-uSDW@HqIuNwxCj*?E~iR{q%dW9N^9KhDhTyzA;g*7CT!n!Wcc zmyH!f+#A^3@~`2&ZkIh4+ZK-^btCm4^&<5l^&`zgnvXPqG>CK^(gLJ~NQ;mbBVCWQ z1nCB(rAR~XQ{t+q$gA-e`Wn4Yd(OGvc3pM;+v#%@+fG|wl1HRhEu;B9rkptTyz8lR zT|tYqdp8@Q?qYVKmAKfi_mi~N+9&PA!=5-mstjGRkptv4!|*EwZZF$+kQh5>rw)?0 z4DYxP5ifh?yDs6K=MRx@8s6E_O12oL$((2hBySXIN#I zJwS8=Bvc4=&%A2I;EE!f=A-zyCl2j|bmarTiDGHBBvA*7}!BYdhC++Owm2`+* z3w4(Z8MkYG+2s&>_D)}&Us0OkIYjE!4JO^p8NE>&a~T{_OH4YM)jAn|r#qZI ze7v1_E|;s-^_r61xF#l5adSqma&vY`ij)1u-Jj=6#B&=|Y|_@WU=YoGVi1|@7^EjcF;tw-HE6LCQ1|cE74WG&`;euO-a1l&(-j| zJ@n4qc-rbv*LS&Xn*+9i z<=FhXf5Knt>+=5C^QmXM`>gBtu6*Z{4%PmVeXDIC!=rMKtz;yjDq2<+h*>EH9nZ>S z?Eb7Q)3P$5QZtQ}8OKH9i8VwbW+H}o6xO0>yj{_wp;)S1<<2sN(J z&~Sebx<;+3hL~quBbmHk4B^`heN| zD+4U3C=IKdS+CY;B6jgqnPgXhXV)5LRGT$ph%Md+UCWFrGc%^x{=AMPJ0bT6dHtI6 zZ!w#HWqbt{nlLd#-_gxxA}f=Nixd6#^4omh*_k=# z%*^octIF`~QRTp5%SlPH_Ks(i%O}soLP2@|-pQab8I(ItE8i;#vdzk^O#SfC8kTAt zo0ZqgdCtWDAKI0F4?2+PT}0L& z=gl^R_MX`W(cU!MZnPt2dj@UHY`f5g%+|3JwcD%%Xzga(fmUm_ezZ!n^`RA;tryMQ zgw}&*ZbI8`B(BR9Ts_L75+%Ycho~&dn`Pp&$>s@a) zec)qWW2?e0+UUCzUY!ERy_{@scrO@!7d!i!_OKzdT7&yhMs`V6T_q)(B;B3(qP6zLPBGLbGI6^e8oDNm%2k^D#t z+btg<`o!Qlq~zi*8{fRe3bO3)Xfx8SZ^>|7Vx2N&US=zdV-ZiObKlH01`4QP0V~*y znQQj5HFWTOo=+)3M;cwl&%~6XPYegF`>X{HaDoOd(7_EJ@WMQp4=Lb-1&|60VG*Rk z9kAH27I2o~_i(peU*?Kxapz&j8T&7G$>y|lt2>o`HX^@_mzBMXTv0wwk#t)wg(KWa z(^0+zD{|9hm_JYLucW3^sD}HfDZ+Ea_=}PMoFA^{*%UtPaMPY@K1n@ppSYk{ocml2 ze~Av3X}J`taNT)

@_)v1?VwRHTx!Jch^R2fn-r6!ZUedzj z$?;!XxrKge;aTFTIK88_*j4$+%2FCXrKC_{D<|{B{9AJ=vC$gZRN=^^jcvR&`Jb3{ zrcFEKE3`orV(=)mLkDz1mtl=B5a_`r_)dNOmhV0Re!1Bj_V1EN) C=Jf^u