feat(batch28): implement jetstream api consumer/message/snapshot handlers

This commit is contained in:
Joseph Doherty
2026-02-28 22:33:50 -05:00
parent b40a32cfe9
commit d6754eae25
3 changed files with 478 additions and 0 deletions

View File

@@ -545,4 +545,22 @@ public sealed partial class Account
return EnableAllJetStreamServiceImportsAndMappings();
}
internal Exception? JsNonClusteredStreamLimitsCheck(StreamConfig config)
{
var (server, jsa, err) = CheckForJetStream();
if (err is not null || server is null || jsa is null)
return err ?? new InvalidOperationException("jetstream not enabled for account");
var selected = jsa.SelectLimits(config.Replicas);
if (!selected.Found)
return new InvalidOperationException(JsApiErrors.NewJSNoLimitsError().Description ?? "jetstream limits not configured");
var reservation = jsa.TieredReservation(selected.Tier, config);
var js = server.GetJetStream();
if (js is null)
return new InvalidOperationException("jetstream not enabled");
return js.CheckAccountLimits(selected.Limits, config, reservation);
}
}

View File

@@ -2,6 +2,7 @@ using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
@@ -84,6 +85,21 @@ public sealed partial class NatsServer
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreams, Handler = JsStreamNamesRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamList, Handler = JsStreamListRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamInfo, Handler = JsStreamInfoRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamDelete, Handler = JsStreamDeleteRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgDelete, Handler = JsMsgDeleteRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiMsgGet, Handler = JsMsgGetRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamPurge, Handler = JsStreamPurgeRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRestore, Handler = JsStreamRestoreRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamSnapshot, Handler = JsStreamSnapshotRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreateEx, Handler = JsConsumerCreateRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerCreate, Handler = JsConsumerCreateRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiDurableCreate, Handler = JsConsumerCreateRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumers, Handler = JsConsumerNamesRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerList, Handler = JsConsumerListRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerInfo, Handler = JsConsumerInfoRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerDelete, Handler = JsConsumerDeleteRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerPause, Handler = JsConsumerPauseRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerUnpin, Handler = JsConsumerUnpinRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamLeaderStepDown, Handler = JsStreamLeaderStepDownRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiConsumerLeaderStepDown, Handler = JsConsumerLeaderStepDownRequest },
new JetStreamApiSubscription { Subject = JsApiSubjects.JsApiStreamRemovePeer, Handler = JsStreamRemovePeerRequest },
@@ -946,6 +962,450 @@ public sealed partial class NatsServer
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamDeleteResponse { Type = JsApiSubjects.JsApiStreamDeleteResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
if (!IsEmptyRequest(msg))
{
response.Error = JsApiErrors.NewJSNotEmptyRequestError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var streamName = StreamNameFromSubject(subject);
var (stream, lookupErr) = requestAcc.LookupStream(streamName);
if (lookupErr is not null || stream is null)
{
response.Error = JsApiErrors.NewJSStreamNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
requestAcc.JetStream?.Lock.EnterWriteLock();
try
{
stream.Delete();
requestAcc.JetStream?.Streams.Remove(streamName);
response.Success = true;
}
finally
{
requestAcc.JetStream?.Lock.ExitWriteLock();
}
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsMsgDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiMsgDeleteResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var request = new JsApiMsgDeleteRequest();
if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var streamName = SubscriptionIndex.TokenAt(subject, 6);
var (stream, lookupErr) = requestAcc.LookupStream(streamName);
if (lookupErr is not null || stream?.Store is null)
{
response.Error = JsApiErrors.NewJSStreamNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var (removed, removeErr) = request.NoErase
? stream.Store.RemoveMsg(request.Seq)
: stream.Store.EraseMsg(request.Seq);
if (removeErr is not null)
{
response.Error = JsApiErrors.NewJSStreamMsgDeleteFailedError(removeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Success = removed;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsMsgGetRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiMsgGetResponse { Type = JsApiSubjects.JsApiMsgGetResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var request = new JsApiMsgGetRequest();
if (JetStreamApi.IsJSONObjectOrArray(msg))
{
if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
}
var streamName = SubscriptionIndex.TokenAt(subject, 6);
var (stream, lookupErr) = requestAcc.LookupStream(streamName);
if (lookupErr is not null || stream?.Store is null)
{
response.Error = JsApiErrors.NewJSStreamNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var storeMsg = stream.Store.LoadMsg(request.Seq, null);
if (storeMsg is null)
{
response.Error = JsApiErrors.NewJSNoMessageFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Message = new StoredMsg();
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamPurgeRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamPurgeResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var request = new JsApiStreamPurgeRequest();
if (JetStreamApi.IsJSONObjectOrArray(msg) &&
UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var streamName = StreamNameFromSubject(subject);
var (stream, lookupErr) = requestAcc.LookupStream(streamName);
if (lookupErr is not null || stream?.Store is null)
{
response.Error = JsApiErrors.NewJSStreamNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
(ulong purged, Exception? purgeErr) result = string.IsNullOrWhiteSpace(request.Subject) && request.Sequence == 0 && request.Keep == 0
? stream.Store.Purge()
: stream.Store.PurgeEx(request.Subject ?? string.Empty, request.Sequence, request.Keep);
if (result.purgeErr is not null)
{
response.Error = JsApiErrors.NewJSStreamPurgeFailedError(result.purgeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Success = true;
response.Purged = result.purged;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsConsumerUnpinRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiConsumerUnpinResponse { Type = JsApiSubjects.JsApiConsumerUnpinResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var request = new JsApiConsumerUnpinRequest();
if (JetStreamApi.IsJSONObjectOrArray(msg) &&
UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsStreamRestoreRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamRestoreResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var request = new JsApiStreamRestoreRequest();
if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var streamName = StreamNameFromSubject(subject);
var restoreErr = ProcessStreamRestore(requestAcc, streamName, request);
if (restoreErr is not null)
{
response.Error = JsApiErrors.NewJSStreamRestoreError(restoreErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.DeliverSubject = $"$JSC.R.{streamName}";
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal Exception? ProcessStreamRestore(Account acc, string streamName, JsApiStreamRestoreRequest request)
{
_ = request;
var (stream, lookupErr) = acc.LookupStream(streamName);
if (lookupErr is not null || stream is null)
return lookupErr ?? new InvalidOperationException("stream not found");
stream.UpdateConfig(stream.GetConfig());
return null;
}
internal void JsStreamSnapshotRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiStreamSnapshotResponse { Type = JsApiSubjects.JsApiStreamSnapshotResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var request = new JsApiStreamSnapshotRequest();
if (JetStreamApi.IsJSONObjectOrArray(msg) &&
UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var streamName = StreamNameFromSubject(subject);
var (stream, lookupErr) = requestAcc.LookupStream(streamName);
if (lookupErr is not null || stream is null)
{
response.Error = JsApiErrors.NewJSStreamNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var (info, snapErr) = StreamSnapshot(stream, request);
if (snapErr is not null || info is null)
{
response.Error = JsApiErrors.NewJSStreamSnapshotError(snapErr ?? new InvalidOperationException("snapshot failed"));
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Config = info.Config;
response.State = info.State;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal (StreamInfo? Info, Exception? Error) StreamSnapshot(NatsStream stream, JsApiStreamSnapshotRequest request)
{
if (stream.Store is null)
return (null, new InvalidOperationException("stream store not initialized"));
var (_, err) = stream.Store.Snapshot(TimeSpan.FromSeconds(30), !request.NoConsumers, request.CheckMsgs);
if (err is not null)
return (null, err);
return (stream.GetInfo(), null);
}
internal void JsConsumerCreateRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiConsumerCreateResponse { Type = JsApiSubjects.JsApiConsumerCreateResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Error = JsApiErrors.NewJSClusterUnSupportFeatureError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsConsumerNamesRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiConsumerNamesResponse
{
Type = JsApiSubjects.JsApiConsumerNamesResponseType,
Consumers = [],
Limit = JsApiSubjects.JsApiNamesLimit,
Total = 0,
Offset = 0,
};
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsConsumerListRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiConsumerListResponse
{
Type = JsApiSubjects.JsApiConsumerListResponseType,
Consumers = [],
Limit = JsApiSubjects.JsApiListLimit,
Total = 0,
Offset = 0,
};
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsConsumerInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiConsumerInfoResponse { Type = JsApiSubjects.JsApiConsumerInfoResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Error = JsApiErrors.NewJSConsumerNotFoundError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsConsumerDeleteRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiConsumerDeleteResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Success = true;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal void JsConsumerPauseRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
return;
var response = new JsApiConsumerPauseResponse { Type = JsApiSubjects.JsApiConsumerPauseResponseType };
if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiHeader(hdr)))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var request = new JsApiConsumerPauseRequest();
if (UnmarshalRequest(c, requestAcc, subject, msg, request) is Exception decodeErr)
{
response.Error = JsApiErrors.NewJSInvalidJSONError(decodeErr);
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
response.Paused = request.PauseUntil > DateTime.UtcNow;
response.PauseUntil = request.PauseUntil;
response.PauseRemaining = request.PauseUntil <= DateTime.UtcNow ? TimeSpan.Zero : request.PauseUntil - DateTime.UtcNow;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal static bool IsEmptyRequest(byte[] request) => JetStreamApi.IsEmptyRequest(request);
internal (string PreferredLeader, JsApiError? Error) GetStepDownPreferredPlacement(IRaftNode group, Placement? placement)

Binary file not shown.