feat(batch28): merge jetstream-api

This commit is contained in:
Joseph Doherty
2026-02-28 22:41:44 -05:00
8 changed files with 1974 additions and 6 deletions

View File

@@ -178,6 +178,72 @@ public sealed partial class Account
}
}
internal void TrackAPI()
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa is null)
return;
jsa.UsageLock.EnterWriteLock();
try
{
jsa.UsageApi++;
jsa.ApiTotal++;
jsa.SendClusterUsageUpdate();
if (jsa.Js is JetStream js)
Interlocked.Add(ref js.ApiTotal, 1);
}
finally
{
jsa.UsageLock.ExitWriteLock();
}
}
internal void TrackAPIErr()
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa is null)
return;
jsa.UsageLock.EnterWriteLock();
try
{
jsa.UsageApi++;
jsa.ApiTotal++;
jsa.UsageErr++;
jsa.ApiErrors++;
jsa.SendClusterUsageUpdate();
if (jsa.Js is JetStream js)
{
Interlocked.Add(ref js.ApiTotal, 1);
Interlocked.Add(ref js.ApiErrors, 1);
}
}
finally
{
jsa.UsageLock.ExitWriteLock();
}
}
internal (bool Enabled, bool ShouldError) CheckJetStream()
{
_mu.EnterReadLock();
try
{
return (JetStream is not null, _nleafs + _nrleafs == 0);
}
finally
{
_mu.ExitReadLock();
}
}
internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg)
{
_mu.EnterReadLock();
@@ -530,4 +596,22 @@ public sealed partial class Account
return EnableAllJetStreamServiceImportsAndMappings();
}
internal Exception? JsNonClusteredStreamLimitsCheck(StreamConfig config)
{
var (server, jsa, err) = CheckForJetStream();
if (err is not null || server is null || jsa is null)
return err ?? new InvalidOperationException("jetstream not enabled for account");
var selected = jsa.SelectLimits(config.Replicas);
if (!selected.Found)
return new InvalidOperationException(JsApiErrors.NewJSNoLimitsError().Description ?? "jetstream limits not configured");
var reservation = jsa.TieredReservation(selected.Tier, config);
var js = server.GetJetStream();
if (js is null)
return new InvalidOperationException("jetstream not enabled");
return js.CheckAccountLimits(selected.Limits, config, reservation);
}
}

View File

@@ -0,0 +1,190 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal delegate void JetStreamApiHandler(ClientConnection c, Account acc, string subject, string reply, byte[] message);
internal sealed class JetStreamApiSubscription
{
public required string Subject { get; init; }
public required JetStreamApiHandler Handler { get; init; }
}
internal sealed class JsApiRoutedRequest
{
public required JetStreamApiSubscription Subscription { get; init; }
public required Account Account { get; init; }
public required string Subject { get; init; }
public required string Reply { get; init; }
public required byte[] Message { get; init; }
}
internal sealed class DelayedApiResponse
{
public ClientInfo? ClientInfo { get; init; }
public required Account Account { get; init; }
public required string Subject { get; init; }
public required string Reply { get; init; }
public string Request { get; init; } = string.Empty;
public byte[]? Header { get; init; }
public required string Response { get; init; }
public DateTime DeadlineUtc { get; init; }
public bool RawResponse { get; init; }
public DelayedApiResponse? Next { get; set; }
}
internal static class JetStreamApi
{
internal static readonly TimeSpan ErrorResponseDelay = TimeSpan.FromMilliseconds(500);
internal static Dictionary<string, string> GenerateJSMappingTable(string domain)
{
var mappings = new Dictionary<string, string>(StringComparer.Ordinal)
{
["INFO"] = JsApiSubjects.JsApiAccountInfo,
["STREAM.>"] = "$JS.API.STREAM.>",
["CONSUMER.>"] = "$JS.API.CONSUMER.>",
["DIRECT.>"] = "$JS.API.DIRECT.>",
["META.>"] = "$JS.API.META.>",
["SERVER.>"] = "$JS.API.SERVER.>",
["ACCOUNT.>"] = "$JS.API.ACCOUNT.>",
["$KV.>"] = "$KV.>",
["$OBJ.>"] = "$OBJ.>",
};
var table = new Dictionary<string, string>(StringComparer.Ordinal);
foreach (var (suffix, target) in mappings)
table[$"$JS.{domain}.API.{suffix}"] = target;
return table;
}
internal static void AddDelayedResponse(ref DelayedApiResponse? head, ref DelayedApiResponse? tail, DelayedApiResponse response)
{
if (head is null)
{
head = response;
tail = response;
return;
}
if (tail is not null && response.DeadlineUtc >= tail.DeadlineUtc)
{
tail.Next = response;
tail = response;
return;
}
DelayedApiResponse? previous = null;
for (var current = head; current is not null; current = current.Next)
{
if (response.DeadlineUtc < current.DeadlineUtc)
{
response.Next = current;
if (previous is null)
head = response;
else
previous.Next = response;
return;
}
previous = current;
}
if (tail is not null)
tail.Next = response;
tail = response;
}
internal static string StreamNameFromSubject(string subject) =>
SubscriptionIndex.TokenAt(subject, 5);
internal static string ConsumerNameFromSubject(string subject) =>
SubscriptionIndex.TokenAt(subject, 6);
internal static bool IsJSONObjectOrArray(ReadOnlySpan<byte> payload)
{
for (var i = 0; i < payload.Length; i++)
{
var ch = payload[i];
if (ch is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n')
continue;
return ch is (byte)'{' or (byte)'[';
}
return false;
}
internal static bool IsEmptyRequest(ReadOnlySpan<byte> payload)
{
if (payload.IsEmpty)
return true;
var start = 0;
var end = payload.Length - 1;
while (start < payload.Length && payload[start] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n')
start++;
while (end >= start && payload[end] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n')
end--;
if (end < start)
return true;
var trimmed = payload.Slice(start, end - start + 1);
if (trimmed.SequenceEqual("{}"u8))
return true;
try
{
using var document = JsonDocument.Parse(trimmed.ToArray());
return document.RootElement.ValueKind == JsonValueKind.Object &&
!document.RootElement.EnumerateObject().Any();
}
catch
{
return false;
}
}
internal static bool SubjectMatches(string pattern, string subject)
{
var p = pattern.Split('.', StringSplitOptions.None);
var s = subject.Split('.', StringSplitOptions.None);
for (var i = 0; i < p.Length; i++)
{
if (i >= s.Length)
return p[i] == ">";
var token = p[i];
if (token == ">")
return true;
if (token == "*")
continue;
if (!string.Equals(token, s[i], StringComparison.Ordinal))
return false;
}
return p.Length == s.Length;
}
internal static T? DeserializeStrict<T>(byte[] payload, bool strictMode)
{
var strictOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow,
};
try
{
return JsonSerializer.Deserialize<T>(payload, strictOptions);
}
catch when (!strictMode)
{
return JsonSerializer.Deserialize<T>(payload);
}
}
}

View File

@@ -1,3 +1,6 @@
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
internal sealed class JetStreamEngine(JetStream state)
@@ -7,6 +10,7 @@ internal sealed class JetStreamEngine(JetStream state)
private const string JsWillExtend = "will_extend";
private const string JsNoExtend = "no_extend";
private const string JsDomainApiTemplate = "$JS.{0}.API.>";
private const string JsApiAccountPrefix = "$JS.API.ACCOUNT.";
internal void SetStarted()
{
@@ -21,6 +25,91 @@ internal sealed class JetStreamEngine(JetStream state)
}
}
internal void ApiDispatch(ClientConnection c, Account acc, string subject, string reply, byte[] rawMessage)
{
// These meta/system directives are handled elsewhere.
if (subject == JsApiSubjects.JsApiLeaderStepDown ||
subject == JsApiSubjects.JsApiRemoveServer ||
subject.StartsWith(JsApiAccountPrefix, StringComparison.Ordinal))
{
return;
}
if (_state.Server is not NatsServer server)
return;
var route = server.MatchJetStreamApiSubscription(subject);
var (header, payload) = c.MsgParts(rawMessage);
var requestBody = Encoding.UTF8.GetString(payload);
if (NatsMessageHeaders.GetHeader(AccountEventConstants.ClientInfoHeader, header) is not { Length: > 0 })
{
var systemAccount = server.SystemAccount();
if (!ReferenceEquals(systemAccount, acc))
return;
if (subject != JsApiSubjects.JsApiAccountInfo)
{
if (c.Kind is ClientKind.Client or ClientKind.Leaf)
{
var response = new ApiResponse
{
Type = JsApiSubjects.JsApiSystemResponseType,
Error = JsApiErrors.NewJSNotEnabledForAccountError(),
};
server.SendAPIErrResponse(null, acc, subject, reply, requestBody, server.JsonResponse(response));
}
return;
}
}
if (route is null)
{
if (c.Kind is ClientKind.Client or ClientKind.Leaf)
{
var (_, requestAcc, _, _, _) = server.GetRequestInfo(c, rawMessage);
if (requestAcc is not null && !ReferenceEquals(requestAcc, server.SystemAccount()))
{
var response = new ApiResponse
{
Type = JsApiSubjects.JsApiSystemResponseType,
Error = JsApiErrors.NewJSBadRequestError(),
};
server.SendAPIErrResponse(null, requestAcc, subject, reply, requestBody, server.JsonResponse(response));
}
}
return;
}
Interlocked.Add(ref _state.ApiInflight, 1);
var request = new JsApiRoutedRequest
{
Subscription = route,
Account = acc,
Subject = subject,
Reply = reply,
Message = rawMessage.ToArray(),
};
var (queued, dropped) = server.EnqueueJSApiRequest(_state, request);
if (!queued)
{
if (dropped > 0)
{
server.PublishAdvisory(
server.SystemAccount(),
JsApiSubjects.JsAdvisoryApiLimitReached,
new { Type = "io.nats.jetstream.advisory.v1.api_limit_reached", Dropped = dropped, Time = DateTime.UtcNow });
}
route.Handler(c, acc, subject, reply, rawMessage);
Interlocked.Add(ref _state.ApiInflight, -1);
}
}
internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0;
internal void SetJetStreamStandAlone(bool isStandAlone)

View File

@@ -245,6 +245,38 @@ internal sealed partial class JsAccount
}
}
internal long TieredReservation(string tier, StreamConfig cfg)
{
long reservation = 0;
Lock.EnterReadLock();
try
{
foreach (var stream in Streams.Values.OfType<NatsStream>())
{
var streamCfg = stream.Config;
if (streamCfg.MaxBytes <= 0 || string.Equals(streamCfg.Name, cfg.Name, StringComparison.Ordinal))
continue;
if (string.IsNullOrEmpty(tier))
{
if (streamCfg.Storage == cfg.Storage)
reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes;
continue;
}
if (streamCfg.Replicas == cfg.Replicas && JetStreamEngine.IsSameTier(streamCfg, cfg))
reservation += Math.Max(1, streamCfg.Replicas) * streamCfg.MaxBytes;
}
return reservation;
}
finally
{
Lock.ExitReadLock();
}
}
internal (ulong Mem, ulong Store) StorageTotals()
{
UsageLock.EnterReadLock();

File diff suppressed because it is too large Load Diff

View File

@@ -268,11 +268,7 @@ public sealed partial class NatsServer
internal void SetupJetStreamExports()
{
var sys = SystemAccount();
if (sys == null)
return;
var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null);
var err = SetJetStreamExportSubs();
if (err != null)
Warnf("Error setting up jetstream service exports: {0}", err.Message);
}

View File

@@ -15,6 +15,72 @@ public sealed class JetStreamEngineTests
err!.Message.ShouldContain("unavailable");
}
[Fact] // T:1716
public void IsJSONObjectOrArray_ShouldSucceed()
{
var tests = new (string Name, byte[] Data, bool Valid)[]
{
("empty", [], false),
("empty_object", "{}"u8.ToArray(), true),
("empty object, not trimmed", "\t\n\r{ }"u8.ToArray(), true),
("empty_array", "[]"u8.ToArray(), true),
("empty array, not trimmed", "\t\n\r[ ]"u8.ToArray(), true),
("empty_string", "\"\""u8.ToArray(), false),
("whitespace_only", " "u8.ToArray(), false),
("object_with_whitespace", "{ }"u8.ToArray(), true),
("array_with_whitespace", "[ ]"u8.ToArray(), true),
("string_with_whitespace", " \"text\""u8.ToArray(), false),
("number", "123"u8.ToArray(), false),
("boolean_true", "true"u8.ToArray(), false),
("boolean_false", "false"u8.ToArray(), false),
("null_value", "null"u8.ToArray(), false),
("invalid_json", "invalid"u8.ToArray(), false),
};
foreach (var test in tests)
{
JetStreamApi.IsJSONObjectOrArray(test.Data).ShouldBe(test.Valid, test.Name);
}
}
[Fact] // T:1719
public void JetStreamDelayedAPIResponses_ShouldSucceed()
{
var account = new Account { Name = "ACC" };
DelayedApiResponse? head = null;
DelayedApiResponse? tail = null;
var now = DateTime.UtcNow;
var responses = new[]
{
NewDelayed(account, "request2", "response2", now.AddMilliseconds(500)),
NewDelayed(account, "request1", "response1", now.AddMilliseconds(200)),
NewDelayed(account, "request4", "response4", now.AddMilliseconds(800)),
NewDelayed(account, "request3", "response3", now.AddMilliseconds(650)),
};
foreach (var response in responses)
JetStreamApi.AddDelayedResponse(ref head, ref tail, response);
var orderedRequests = new List<string>();
for (var current = head; current is not null; current = current.Next)
orderedRequests.Add(current.Request);
orderedRequests.ShouldBe(["request1", "request2", "request3", "request4"]);
}
private static DelayedApiResponse NewDelayed(Account account, string request, string response, DateTime deadlineUtc) =>
new()
{
Account = account,
Subject = "SUB",
Reply = string.Empty,
Request = request,
Response = response,
DeadlineUtc = deadlineUtc,
};
[Fact] // T:1477
public void JetStreamMaxConsumers_ShouldSucceed()
{

View File

@@ -1,6 +1,6 @@
# NATS .NET Porting Status Report
Generated: 2026-03-01 03:33:10 UTC
Generated: 2026-03-01 03:41:45 UTC
## Modules (12 total)