diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index 9560f66..adcafc4 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -545,4 +545,22 @@ public sealed partial class Account return EnableAllJetStreamServiceImportsAndMappings(); } + + internal Exception? JsNonClusteredStreamLimitsCheck(StreamConfig config) + { + var (server, jsa, err) = CheckForJetStream(); + if (err is not null || server is null || jsa is null) + return err ?? new InvalidOperationException("jetstream not enabled for account"); + + var selected = jsa.SelectLimits(config.Replicas); + if (!selected.Found) + return new InvalidOperationException(JsApiErrors.NewJSNoLimitsError().Description ?? "jetstream limits not configured"); + + var reservation = jsa.TieredReservation(selected.Tier, config); + var js = server.GetJetStream(); + if (js is null) + return new InvalidOperationException("jetstream not enabled"); + + return js.CheckAccountLimits(selected.Limits, config, reservation); + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs index 6b67d27..9c84838 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamApi.cs @@ -2,6 +2,7 @@ using System.Text; using System.Text.Json; using System.Text.Json.Serialization; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -84,6 +85,21 @@ public sealed partial class NatsServer new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreams, Handler = JsStreamNamesRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamList, Handler = JsStreamListRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamInfo, Handler = JsStreamInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamDelete, Handler = JsStreamDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgDelete, Handler = JsMsgDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgGet, Handler = JsMsgGetRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamPurge, Handler = JsStreamPurgeRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRestore, Handler = JsStreamRestoreRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamSnapshot, Handler = JsStreamSnapshotRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreateEx, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreate, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiDurableCreate, Handler = JsConsumerCreateRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumers, Handler = JsConsumerNamesRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerList, Handler = JsConsumerListRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerInfo, Handler = JsConsumerInfoRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerDelete, Handler = JsConsumerDeleteRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerPause, Handler = JsConsumerPauseRequest }, + new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerUnpin, Handler = JsConsumerUnpinRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamLeaderStepDown, Handler = JsStreamLeaderStepDownRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerLeaderStepDown, Handler = JsConsumerLeaderStepDownRequest }, new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRemovePeer, Handler = JsStreamRemovePeerRequest }, @@ -946,6 +962,450 @@ public sealed partial class NatsServer SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); } + internal void JsStreamDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamDeleteResponse { Type = JsApiSubjects.JsApiStreamDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + if (!IsEmptyRequest(msg)) + { + response.Error = JsApiErrors.NewJSNotEmptyRequestError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + requestAcc.JetStream?.Lock.EnterWriteLock(); + try + { + stream.Delete(); + requestAcc.JetStream?.Streams.Remove(streamName); + response.Success = true; + } + finally + { + requestAcc.JetStream?.Lock.ExitWriteLock(); + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsMsgDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiMsgDeleteResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMsgDeleteRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = SubscriptionIndex.TokenAt(subject, 6); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (removed, removeErr) = request.NoErase + ? stream.Store.RemoveMsg(request.Seq) + : stream.Store.EraseMsg(request.Seq); + if (removeErr is not null) + { + response.Error = JsApiErrors.NewJSStreamMsgDeleteFailedError(removeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = removed; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsMsgGetRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiMsgGetResponse { Type = JsApiSubjects.JsApiMsgGetResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiMsgGetRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg)) + { + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + } + + var streamName = SubscriptionIndex.TokenAt(subject, 6); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var storeMsg = stream.Store.LoadMsg(request.Seq, null); + if (storeMsg is null) + { + response.Error = JsApiErrors.NewJSNoMessageFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Message = new StoredMsg(); + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamPurgeResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamPurgeRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream?.Store is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + (ulong purged, Exception? purgeErr) result = string.IsNullOrWhiteSpace(request.Subject) && request.Sequence == 0 && request.Keep == 0 + ? stream.Store.Purge() + : stream.Store.PurgeEx(request.Subject ?? string.Empty, request.Sequence, request.Keep); + + if (result.purgeErr is not null) + { + response.Error = JsApiErrors.NewJSStreamPurgeFailedError(result.purgeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + response.Purged = result.purged; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerUnpinRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerUnpinResponse { Type = JsApiSubjects.JsApiConsumerUnpinResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiConsumerUnpinRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsStreamRestoreRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamRestoreResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamRestoreRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var restoreErr = ProcessStreamRestore(requestAcc, streamName, request); + if (restoreErr is not null) + { + response.Error = JsApiErrors.NewJSStreamRestoreError(restoreErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.DeliverSubject = $"$JSC.R.{streamName}"; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal Exception? ProcessStreamRestore(Account acc, string streamName, JsApiStreamRestoreRequest request) + { + _ = request; + var (stream, lookupErr) = acc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + return lookupErr ?? new InvalidOperationException("stream not found"); + + stream.UpdateConfig(stream.GetConfig()); + return null; + } + + internal void JsStreamSnapshotRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiStreamSnapshotResponse { Type = JsApiSubjects.JsApiStreamSnapshotResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiStreamSnapshotRequest(); + if (JetStreamApi.IsJSONObjectOrArray(msg) && + UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var streamName = StreamNameFromSubject(subject); + var (stream, lookupErr) = requestAcc.LookupStream(streamName); + if (lookupErr is not null || stream is null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var (info, snapErr) = StreamSnapshot(stream, request); + if (snapErr is not null || info is null) + { + response.Error = JsApiErrors.NewJSStreamSnapshotError(snapErr ?? new InvalidOperationException("snapshot failed")); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Config = info.Config; + response.State = info.State; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal (StreamInfo? Info, Exception? Error) StreamSnapshot(NatsStream stream, JsApiStreamSnapshotRequest request) + { + if (stream.Store is null) + return (null, new InvalidOperationException("stream store not initialized")); + + var (_, err) = stream.Store.Snapshot(TimeSpan.FromSeconds(30), !request.NoConsumers, request.CheckMsgs); + if (err is not null) + return (null, err); + return (stream.GetInfo(), null); + } + + internal void JsConsumerCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerCreateResponse { Type = JsApiSubjects.JsApiConsumerCreateResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Error = JsApiErrors.NewJSClusterUnSupportFeatureError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerNamesResponse + { + Type = JsApiSubjects.JsApiConsumerNamesResponseType, + Consumers = [], + Limit = JsApiSubjects.JsApiNamesLimit, + Total = 0, + Offset = 0, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerListResponse + { + Type = JsApiSubjects.JsApiConsumerListResponseType, + Consumers = [], + Limit = JsApiSubjects.JsApiListLimit, + Total = 0, + Offset = 0, + }; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerInfoResponse { Type = JsApiSubjects.JsApiConsumerInfoResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Error = JsApiErrors.NewJSConsumerNotFoundError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerDeleteResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Success = true; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + + internal void JsConsumerPauseRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage) + { + var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage); + if (err is not null || requestAcc is null) + return; + + var response = new JsApiConsumerPauseResponse { Type = JsApiSubjects.JsApiConsumerPauseResponseType }; + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + var request = new JsApiConsumerPauseRequest(); + if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr) + { + response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr); + SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + return; + } + + response.Paused = request.PauseUntil > DateTime.UtcNow; + response.PauseUntil = request.PauseUntil; + response.PauseRemaining = request.PauseUntil <= DateTime.UtcNow ? TimeSpan.Zero : request.PauseUntil - DateTime.UtcNow; + SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response)); + } + internal static bool IsEmptyRequest(byte[] request) => JetStreamApi.IsEmptyRequest(request); internal (string PreferredLeader, JsApiError? Error) GetStepDownPreferredPlacement(IRaftNode group, Placement? placement) diff --git a/porting.db b/porting.db index a90e491..1e4d39e 100644 Binary files a/porting.db and b/porting.db differ