feat(batch28): implement jetstream api dispatch and account request core

This commit is contained in:
Joseph Doherty
2026-02-28 22:24:10 -05:00
parent 2c4bebfb8d
commit bf115b116e
7 changed files with 778 additions and 5 deletions

View File

@@ -127,6 +127,72 @@ public sealed partial class Account
return (server, jsa, null);
}
internal void TrackAPI()
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa is null)
return;
jsa.UsageLock.EnterWriteLock();
try
{
jsa.UsageApi++;
jsa.ApiTotal++;
jsa.SendClusterUsageUpdate();
if (jsa.Js is JetStream js)
Interlocked.Add(ref js.ApiTotal, 1);
}
finally
{
jsa.UsageLock.ExitWriteLock();
}
}
internal void TrackAPIErr()
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa is null)
return;
jsa.UsageLock.EnterWriteLock();
try
{
jsa.UsageApi++;
jsa.ApiTotal++;
jsa.UsageErr++;
jsa.ApiErrors++;
jsa.SendClusterUsageUpdate();
if (jsa.Js is JetStream js)
{
Interlocked.Add(ref js.ApiTotal, 1);
Interlocked.Add(ref js.ApiErrors, 1);
}
}
finally
{
jsa.UsageLock.ExitWriteLock();
}
}
internal (bool Enabled, bool ShouldError) CheckJetStream()
{
_mu.EnterReadLock();
try
{
return (JetStream is not null, _nleafs + _nrleafs == 0);
}
finally
{
_mu.ExitReadLock();
}
}
internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg)
{
_mu.EnterReadLock();

View File

@@ -0,0 +1,160 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal delegate void JetStreamApiHandler(ClientConnection c, Account acc, string subject, string reply, byte[] message);
internal sealed class JetStreamApiSubscription
{
public required string Subject { get; init; }
public required JetStreamApiHandler Handler { get; init; }
}
internal sealed class JsApiRoutedRequest
{
public required JetStreamApiSubscription Subscription { get; init; }
public required Account Account { get; init; }
public required string Subject { get; init; }
public required string Reply { get; init; }
public required byte[] Message { get; init; }
}
internal sealed class DelayedApiResponse
{
public ClientInfo? ClientInfo { get; init; }
public required Account Account { get; init; }
public required string Subject { get; init; }
public required string Reply { get; init; }
public string Request { get; init; } = string.Empty;
public byte[]? Header { get; init; }
public required string Response { get; init; }
public DateTime DeadlineUtc { get; init; }
public bool RawResponse { get; init; }
public DelayedApiResponse? Next { get; set; }
}
internal static class JetStreamApi
{
internal static readonly TimeSpan ErrorResponseDelay = TimeSpan.FromMilliseconds(500);
internal static Dictionary<string, string> GenerateJSMappingTable(string domain)
{
var mappings = new Dictionary<string, string>(StringComparer.Ordinal)
{
["INFO"] = JsApiSubjects.JsApiAccountInfo,
["STREAM.>"] = "$JS.API.STREAM.>",
["CONSUMER.>"] = "$JS.API.CONSUMER.>",
["DIRECT.>"] = "$JS.API.DIRECT.>",
["META.>"] = "$JS.API.META.>",
["SERVER.>"] = "$JS.API.SERVER.>",
["ACCOUNT.>"] = "$JS.API.ACCOUNT.>",
["$KV.>"] = "$KV.>",
["$OBJ.>"] = "$OBJ.>",
};
var table = new Dictionary<string, string>(StringComparer.Ordinal);
foreach (var (suffix, target) in mappings)
table[$"$JS.{domain}.API.{suffix}"] = target;
return table;
}
internal static void AddDelayedResponse(ref DelayedApiResponse? head, ref DelayedApiResponse? tail, DelayedApiResponse response)
{
if (head is null)
{
head = response;
tail = response;
return;
}
if (tail is not null && response.DeadlineUtc >= tail.DeadlineUtc)
{
tail.Next = response;
tail = response;
return;
}
DelayedApiResponse? previous = null;
for (var current = head; current is not null; current = current.Next)
{
if (response.DeadlineUtc < current.DeadlineUtc)
{
response.Next = current;
if (previous is null)
head = response;
else
previous.Next = response;
return;
}
previous = current;
}
if (tail is not null)
tail.Next = response;
tail = response;
}
internal static string StreamNameFromSubject(string subject) =>
SubscriptionIndex.TokenAt(subject, 5);
internal static string ConsumerNameFromSubject(string subject) =>
SubscriptionIndex.TokenAt(subject, 6);
internal static bool IsJSONObjectOrArray(ReadOnlySpan<byte> payload)
{
for (var i = 0; i < payload.Length; i++)
{
var ch = payload[i];
if (ch is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n')
continue;
return ch is (byte)'{' or (byte)'[';
}
return false;
}
internal static bool SubjectMatches(string pattern, string subject)
{
var p = pattern.Split('.', StringSplitOptions.None);
var s = subject.Split('.', StringSplitOptions.None);
for (var i = 0; i < p.Length; i++)
{
if (i >= s.Length)
return p[i] == ">";
var token = p[i];
if (token == ">")
return true;
if (token == "*")
continue;
if (!string.Equals(token, s[i], StringComparison.Ordinal))
return false;
}
return p.Length == s.Length;
}
internal static T? DeserializeStrict<T>(byte[] payload, bool strictMode)
{
var strictOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow,
};
try
{
return JsonSerializer.Deserialize<T>(payload, strictOptions);
}
catch when (!strictMode)
{
return JsonSerializer.Deserialize<T>(payload);
}
}
}

View File

@@ -1,3 +1,6 @@
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
internal sealed class JetStreamEngine(JetStream state)
@@ -7,6 +10,7 @@ internal sealed class JetStreamEngine(JetStream state)
private const string JsWillExtend = "will_extend";
private const string JsNoExtend = "no_extend";
private const string JsDomainApiTemplate = "$JS.{0}.API.>";
private const string JsApiAccountPrefix = "$JS.API.ACCOUNT.";
internal void SetStarted()
{
@@ -21,6 +25,91 @@ internal sealed class JetStreamEngine(JetStream state)
}
}
internal void ApiDispatch(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
// These meta/system directives are handled elsewhere.
if (subject == JsApiSubjects.JsApiLeaderStepDown ||
subject == JsApiSubjects.JsApiRemoveServer ||
subject.StartsWith(JsApiAccountPrefix, StringComparison.Ordinal))
{
return;
}
if (_state.Server is not NatsServer server)
return;
var route = server.MatchJetStreamApiSubscription(subject);
var (header, payload) = c.MsgParts(rawMessage);
var requestBody = Encoding.UTF8.GetString(payload);
if (NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, header) is not { Length: > 0 })
{
var systemAccount = server.SystemAccount();
if (!ReferenceEquals(systemAccount, acc))
return;
if (subject != JsApiSubjects.JsApiAccountInfo)
{
if (c.Kind is ClientKind.Client or ClientKind.Leaf)
{
var response = new ApiResponse
{
Type = JsApiSubjects.JsApiSystemResponseType,
Error = JsApiErrors.NewJSNotEnabledForAccountError(),
};
server.SendAPIErrResponse(null, acc, subject, reply, requestBody, server.JsonResponse(response));
}
return;
}
}
if (route is null)
{
if (c.Kind is ClientKind.Client or ClientKind.Leaf)
{
var (_, requestAcc, _, _, _) = server.GetRequestInfo(c, rawMessage);
if (requestAcc is not null && !ReferenceEquals(requestAcc, server.SystemAccount()))
{
var response = new ApiResponse
{
Type = JsApiSubjects.JsApiSystemResponseType,
Error = JsApiErrors.NewJSBadRequestError(),
};
server.SendAPIErrResponse(null, requestAcc, subject, reply, requestBody, server.JsonResponse(response));
}
}
return;
}
Interlocked.Add(ref _state.ApiInflight, 1);
var request = new JsApiRoutedRequest
{
Subscription = route,
Account = acc,
Subject = subject,
Reply = reply,
Message = rawMessage.ToArray(),
};
var (queued, dropped) = server.EnqueueJSApiRequest(_state, request);
if (!queued)
{
if (dropped > 0)
{
server.PublishAdvisory(
server.SystemAccount(),
JsApiSubjects.JsAdvisoryApiLimitReached,
new { Type = "io.nats.jetstream.advisory.v1.api_limit_reached", Dropped = dropped, Time = DateTime.UtcNow });
}
route.Handler(c, acc, subject, reply, rawMessage);
Interlocked.Add(ref _state.ApiInflight, -1);
}
}
internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0;
internal void SetJetStreamStandAlone(bool isStandAlone)

View File

@@ -218,6 +218,38 @@ internal sealed partial class JsAccount
}
}
internal long TieredReservation(string tier, StreamConfig cfg)
{
long reservation = 0;
Lock.EnterReadLock();
try
{
foreach (var stream in Streams.Values.OfType<NatsStream>())
{
var streamCfg = stream.Config;
if (streamCfg.MaxBytes <= 0 || string.Equals(streamCfg.Name, cfg.Name, StringComparison.Ordinal))
continue;
if (string.IsNullOrEmpty(tier))
{
if (streamCfg.Storage == cfg.Storage)
reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes;
continue;
}
if (streamCfg.Replicas == cfg.Replicas && JetStreamEngine.IsSameTier(streamCfg, cfg))
reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes;
}
return reservation;
}
finally
{
Lock.ExitReadLock();
}
}
internal (ulong Mem, ulong Store) StorageTotals()
{
UsageLock.EnterReadLock();

View File

@@ -0,0 +1,430 @@
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
private readonly Lock _jsApiSubsLock = new();
private readonly List<JetStreamApiSubscription> _jsApiSubscriptions = [];
private IpQueue<JsApiRoutedRequest>? _jsApiRoutedReqs;
private IpQueue<DelayedApiResponse>? _delayedApiResponses;
internal void ProcessJSAPIRoutedRequests()
{
var queue = _jsApiRoutedReqs;
if (queue is null)
return;
var client = CreateInternalJetStreamClient();
var token = _quitCts.Token;
while (!token.IsCancellationRequested)
{
try
{
if (!queue.Ch.WaitToReadAsync(token).AsTask().GetAwaiter().GetResult())
break;
while (true)
{
var (request, ok) = queue.PopOne();
if (!ok)
break;
var start = DateTime.UtcNow;
request.Subscription.Handler(client, request.Account, request.Subject, request.Reply, request.Message);
var elapsed = DateTime.UtcNow - start;
if (elapsed >= TimeSpan.FromSeconds(1))
Warnf("Internal subscription on '{0}' took too long: {1}", request.Subject, elapsed);
var js = GetJetStreamState();
if (js is not null)
Interlocked.Add(ref js.ApiInflight, -1);
}
}
catch (OperationCanceledException)
{
return;
}
catch (Exception ex)
{
Warnf("JetStream routed API worker failed: {0}", ex.Message);
}
}
}
internal Exception? SetJetStreamExportSubs()
{
var js = GetJetStreamState();
if (js is null)
return new InvalidOperationException(JsApiErrors.NewJSNotEnabledError().Description ?? "jetstream not enabled");
var workers = Math.Min(Math.Max(Environment.ProcessorCount, 1), 16);
_jsApiRoutedReqs = IpQueue<JsApiRoutedRequest>.NewIPQueue("Routed JS API Requests");
_delayedApiResponses = IpQueue<DelayedApiResponse>.NewIPQueue("Delayed JS API Responses");
Interlocked.Exchange(ref js.QueueLimit, JsApiSubjects.JsDefaultRequestQueueLimit);
for (var i = 0; i < workers; i++)
StartGoRoutine(ProcessJSAPIRoutedRequests);
StartGoRoutine(DelayedAPIResponder);
lock (_jsApiSubsLock)
{
_jsApiSubscriptions.Clear();
_jsApiSubscriptions.Add(new JetStreamApiSubscription
{
Subject = JsApiSubjects.JsApiAccountInfo,
Handler = JsAccountInfoRequest,
});
}
var sys = SystemAccount();
if (sys is null)
return null;
var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null);
if (err is not null)
Warnf("Error setting up jetstream service exports: {0}", err.Message);
return err;
}
internal JetStreamApiSubscription? MatchJetStreamApiSubscription(string subject)
{
lock (_jsApiSubsLock)
{
foreach (var sub in _jsApiSubscriptions)
{
if (JetStreamApi.SubjectMatches(sub.Subject, subject))
return sub;
}
}
return null;
}
internal (bool Queued, long Dropped) EnqueueJSApiRequest(JetStream js, JsApiRoutedRequest request)
{
var queue = _jsApiRoutedReqs;
if (queue is null)
return (false, 0);
var (pending, error) = queue.Push(request);
var limit = Interlocked.Read(ref js.QueueLimit);
if (limit <= 0)
limit = JsApiSubjects.JsDefaultRequestQueueLimit;
if (error is null && pending < limit)
return (true, 0);
RateLimitFormatWarnf("JetStream API queue limit reached, dropping {0} requests", pending);
var drained = queue.Drain();
if (drained > 0)
Interlocked.Add(ref js.ApiInflight, -drained);
return (false, drained);
}
internal void SendAPIResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response)
{
acc.TrackAPI();
if (!string.IsNullOrWhiteSpace(reply))
_ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false);
SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response);
}
internal void SendAPIErrResponse(ClientInfo? ci, Account acc, string subject, string reply, string request, string response)
{
acc.TrackAPIErr();
if (!string.IsNullOrWhiteSpace(reply))
_ = SendInternalAccountMsgWithReply(acc, reply, string.Empty, null, response, false);
SendJetStreamAPIAuditAdvisory(ci, acc, subject, request, response);
}
internal void DelayedAPIResponder()
{
var queue = _delayedApiResponses;
if (queue is null)
return;
var token = _quitCts.Token;
DelayedApiResponse? head = null;
DelayedApiResponse? tail = null;
while (!token.IsCancellationRequested)
{
try
{
while (true)
{
var (entry, ok) = queue.PopOne();
if (!ok)
break;
JetStreamApi.AddDelayedResponse(ref head, ref tail, entry);
}
var now = DateTime.UtcNow;
if (head is not null && head.DeadlineUtc <= now)
{
var entry = head;
head = head.Next;
if (head is null)
tail = null;
if (entry.RawResponse)
_ = SendInternalAccountMsgWithReply(entry.Account, entry.Subject, string.Empty, entry.Header, entry.Response, false);
else
SendAPIErrResponse(entry.ClientInfo, entry.Account, entry.Subject, entry.Reply, entry.Request, entry.Response);
continue;
}
var wait = head is null
? TimeSpan.FromSeconds(1)
: head.DeadlineUtc - now;
if (wait < TimeSpan.Zero)
wait = TimeSpan.Zero;
_ = Task.WhenAny(
queue.Ch.WaitToReadAsync(token).AsTask(),
Task.Delay(wait, token)).GetAwaiter().GetResult();
}
catch (OperationCanceledException)
{
return;
}
catch (Exception ex)
{
Warnf("Delayed JetStream API responder failed: {0}", ex.Message);
}
}
}
internal void SendDelayedAPIErrResponse(
ClientInfo? ci,
Account acc,
string subject,
string reply,
string request,
string response,
TimeSpan duration)
{
_delayedApiResponses?.Push(new DelayedApiResponse
{
ClientInfo = ci,
Account = acc,
Subject = subject,
Reply = reply,
Request = request,
Response = response,
DeadlineUtc = DateTime.UtcNow.Add(duration),
RawResponse = false,
});
}
internal void SendDelayedErrResponse(Account acc, string subject, byte[]? header, string response, TimeSpan duration)
{
_delayedApiResponses?.Push(new DelayedApiResponse
{
Account = acc,
Subject = subject,
Reply = string.Empty,
Header = header,
Response = response,
DeadlineUtc = DateTime.UtcNow.Add(duration),
RawResponse = true,
});
}
internal (ClientInfo? ClientInfo, Account? Account, byte[] Header, byte[] Message, Exception? Error) GetRequestInfo(ClientConnection c, byte[] raw)
{
var (hdr, msg) = c.MsgParts(raw);
ClientInfo? clientInfo = null;
var clientInfoBytes = NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, hdr);
if (clientInfoBytes is { Length: > 0 })
{
try
{
clientInfo = JsonSerializer.Deserialize<ClientInfo>(clientInfoBytes);
}
catch (Exception ex)
{
return (null, null, hdr, msg, ex);
}
}
Account? acc = null;
var serviceAccount = clientInfo?.ServiceAccount();
if (!string.IsNullOrWhiteSpace(serviceAccount))
(acc, _) = LookupAccount(serviceAccount);
if (acc is null)
{
acc = c.Account() as Account;
acc ??= SystemAccount();
}
if (acc is null)
return (clientInfo, null, hdr, msg, ServerErrors.ErrMissingAccount);
return (clientInfo, acc, hdr, msg, null);
}
internal Exception? UnmarshalRequest<T>(ClientConnection c, Account acc, string subject, byte[] message, out T? value)
{
var strictMode = JetStreamConfig()?.Strict ?? true;
value = JetStreamApi.DeserializeStrict<T>(message, strictMode);
if (value is not null)
return null;
var err = new InvalidOperationException("unable to deserialize JetStream API request");
c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, err.Message);
return err;
}
internal Exception? UnmarshalRequest(ClientConnection c, Account acc, string subject, byte[] message, object destination)
{
var strictMode = JetStreamConfig()?.Strict ?? true;
object? parsed;
try
{
parsed = JsonSerializer.Deserialize(message, destination.GetType(), new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow,
});
}
catch (Exception ex) when (!strictMode)
{
try
{
parsed = JsonSerializer.Deserialize(message, destination.GetType());
}
catch
{
c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message);
return ex;
}
}
catch (Exception ex)
{
c.RateLimitWarnf("Invalid JetStream request '{0} > {1}': {2}", acc.Name, subject, ex.Message);
return ex;
}
if (parsed is null)
return new InvalidOperationException("empty JetStream API request body");
foreach (var property in destination.GetType().GetProperties())
{
if (!property.CanRead || !property.CanWrite)
continue;
property.SetValue(destination, property.GetValue(parsed));
}
return null;
}
internal void JsAccountInfoRequest(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
if (!JetStreamEnabled())
return;
var (ci, requestAcc, hdr, msg, err) = GetRequestInfo(c, rawMessage);
if (err is not null || requestAcc is null)
{
Warnf("Malformed JetStream API Request: {0}", Encoding.UTF8.GetString(msg));
return;
}
var response = new JsApiAccountInfoResponse
{
Type = JsApiSubjects.JsApiAccountInfoResponseType,
};
var requiredApiLevel = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr);
var reqApiHeader = requiredApiLevel is null ? null : Encoding.ASCII.GetString(requiredApiLevel);
if (JetStreamVersioning.ErrorOnRequiredApiLevel(reqApiHeader))
{
response.Error = JsApiErrors.NewJSRequiredApiLevelError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var (hasJetStream, shouldError) = requestAcc.CheckJetStream();
if (!hasJetStream)
{
if (!shouldError)
return;
response.Error = JsApiErrors.NewJSNotEnabledForAccountError();
SendAPIErrResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
return;
}
var stats = requestAcc.JetStreamUsage();
response.Memory = stats.Memory;
response.Store = stats.Store;
response.ReservedMemory = stats.ReservedMemory;
response.ReservedStore = stats.ReservedStore;
response.Streams = stats.Streams;
response.Consumers = stats.Consumers;
response.Limits = stats.Limits;
response.Domain = stats.Domain;
response.Api = stats.Api;
response.Tiers = stats.Tiers;
SendAPIResponse(ci, requestAcc, subject, reply, Encoding.UTF8.GetString(msg), JsonResponse(response));
}
internal static string StreamNameFromSubject(string subject) => JetStreamApi.StreamNameFromSubject(subject);
internal static string ConsumerNameFromSubject(string subject) => JetStreamApi.ConsumerNameFromSubject(subject);
internal string JsonResponse(object response)
{
try
{
return JsonSerializer.Serialize(response);
}
catch (Exception ex)
{
Warnf("Problem marshaling JSON for JetStream API: {0}", ex.Message);
return string.Empty;
}
}
internal void SendJetStreamAPIAuditAdvisory(ClientInfo? ci, Account acc, string subject, string request, string response)
{
_ = ci;
_ = acc;
_ = subject;
_ = request;
_ = response;
}
private Exception? SendInternalAccountMsgWithReply(Account account, string subject, string reply, byte[]? header, string response, bool trackApi)
{
_ = trackApi;
try
{
var sendQueue = account.GetSendQueue() ?? NewSendQueue(account);
SendQueue.Send(sendQueue, subject, reply, header ?? [], Encoding.UTF8.GetBytes(response));
return null;
}
catch (Exception ex)
{
return ex;
}
}
}

View File

@@ -268,11 +268,7 @@ public sealed partial class NatsServer
internal void SetupJetStreamExports()
{
var sys = SystemAccount();
if (sys == null)
return;
var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null);
var err = SetJetStreamExportSubs();
if (err != null)
Warnf("Error setting up jetstream service exports: {0}", err.Message);
}