From 5b2d32c50316f4ac69bae9490d07c0b71508fedd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:15:57 -0500 Subject: [PATCH] feat(batch27): implement jetstream config validation and error tail --- .../Accounts/Account.JetStream.cs | 16 +++ .../JetStream/JetStreamEngine.cs | 116 ++++++++++++++++++ .../ZB.MOM.NatsNet.Server/NatsServer.Init.cs | 4 + .../NatsServer.JetStreamCore.cs | 76 ++++++++++++ .../src/ZB.MOM.NatsNet.Server/NatsServer.cs | 2 + porting.db | Bin 6717440 -> 6721536 bytes 6 files changed, 214 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index a62381e..c0c8865 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -111,6 +111,22 @@ public sealed partial class Account return EnableAllJetStreamServiceImportsAndMappings(); } + internal (NatsServer? Server, JsAccount? JetStreamAccount, Exception? Error) CheckForJetStream() + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + var jsa = JetStream; + _mu.ExitReadLock(); + + if (server == null || jsa == null) + { + var description = JsApiErrors.NewJSNotEnabledForAccountError().Description ?? "jetstream not enabled for account"; + return (null, null, new InvalidOperationException(description)); + } + + return (server, jsa, null); + } + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index ba08743..522bc08 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -4,6 +4,9 @@ internal sealed class JetStreamEngine(JetStream state) { private readonly JetStream _state = state; private static readonly TimeSpan MinUsageUpdateWindow = TimeSpan.FromMilliseconds(250); + private const string JsWillExtend = "will_extend"; + private const string JsNoExtend = "no_extend"; + private const string JsDomainApiTemplate = "$JS.{0}.API.>"; internal void SetStarted() { @@ -320,6 +323,119 @@ internal sealed class JetStreamEngine(JetStream state) else if (cfg.Storage == StorageType.FileStorage) Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes); } + + internal static string FriendlyBytes(T bytes) + where T : struct, IConvertible + { + var value = Convert.ToDouble(bytes); + const int baseValue = 1024; + var units = new[] { "K", "M", "G", "T", "P", "E" }; + + if (value < baseValue) + return $"{value} B"; + + var exp = (int)(Math.Log(value) / Math.Log(baseValue)); + var index = Math.Clamp(exp - 1, 0, units.Length - 1); + return $"{value / Math.Pow(baseValue, exp):0.00} {units[index]}B"; + } + + internal static bool IsValidName(string name) + { + if (string.IsNullOrWhiteSpace(name)) + return false; + + return name.IndexOfAny([' ', '\t', '\r', '\n', '\f', '.', '*', '>']) < 0; + } + + internal static Exception? ValidateJetStreamOptions(ServerOptions options) + { + foreach (var (account, domain) in options.JsAccDefaultDomain) + { + var exists = false; + if (ServerOptions.IsReservedAccount(account)) + { + exists = true; + } + else if (options.TrustedOperators.Count == 0) + { + foreach (var configured in options.Accounts) + { + if (!string.Equals(configured.GetName(), account, StringComparison.Ordinal)) + continue; + + if (configured.JetStreamLimits?.Count > 0 && !string.IsNullOrEmpty(domain)) + return new InvalidOperationException($"default_js_domain contains account name \"{account}\" with enabled JetStream"); + + exists = true; + break; + } + } + else + { + exists = IsLikelyPublicAccountNkey(account); + } + + if (!exists) + return new InvalidOperationException($"in non operator mode, `default_js_domain` references non existing account \"{account}\""); + } + + foreach (var (account, domain) in options.JsAccDefaultDomain) + { + var systemAccount = string.IsNullOrWhiteSpace(options.SystemAccount) + ? ServerConstants.DefaultSystemAccount + : options.SystemAccount; + + if (string.Equals(account, systemAccount, StringComparison.Ordinal)) + return new InvalidOperationException($"system account \"{account}\" can not be in default_js_domain"); + + if (string.IsNullOrWhiteSpace(domain)) + continue; + + var subject = string.Format(JsDomainApiTemplate, domain); + if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject)) + return new InvalidOperationException($"default_js_domain contains account \"{account}\" with invalid domain name \"{domain}\""); + } + + if (!string.IsNullOrWhiteSpace(options.JetStreamDomain)) + { + var subject = string.Format(JsDomainApiTemplate, options.JetStreamDomain); + if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject)) + return new InvalidOperationException($"invalid domain name: derived \"{subject}\" is not a valid subject"); + + if (!IsValidName(options.JetStreamDomain)) + return new InvalidOperationException("invalid domain name: may not contain ., * or >"); + } + + if (!options.JetStream || options.Cluster.Port == 0) + return null; + if (string.IsNullOrWhiteSpace(options.ServerName)) + return new InvalidOperationException("jetstream cluster requires `server_name` to be set"); + if (string.IsNullOrWhiteSpace(options.Cluster.Name)) + return new InvalidOperationException("jetstream cluster requires `cluster.name` to be set"); + + var hint = options.JetStreamExtHint.ToLowerInvariant(); + if (hint is not JsWillExtend and not JsNoExtend and not "") + return new InvalidOperationException($"expected 'no_extend' for string value, got '{hint}'"); + options.JetStreamExtHint = hint; + + if (options.JetStreamMaxCatchup < 0) + return new InvalidOperationException("jetstream max catchup cannot be negative"); + + return null; + } + + internal static void FixCfgMirrorWithDedupWindow(StreamConfig? config) + { + if (config?.Mirror == null) + return; + if (config.Duplicates != TimeSpan.Zero) + config.Duplicates = TimeSpan.Zero; + } + + private static bool IsLikelyPublicAccountNkey(string value) => + !string.IsNullOrWhiteSpace(value) && + value.Length >= 10 && + value.StartsWith("A", StringComparison.Ordinal); } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs index 3a740da..83c3c2f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -707,6 +707,10 @@ public sealed partial class NatsServer // Trusted operators, leafnode, auth, proxies, gateway, cluster, MQTT, websocket // — validation stubs delegating to not-yet-ported subsystems. + var jsErr = JetStreamEngine.ValidateJetStreamOptions(o); + if (jsErr != null) + return jsErr; + var err = ValidateCluster(o); return err; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs index 625632b..f6ee4db 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -7,6 +7,7 @@ namespace ZB.MOM.NatsNet.Server; public sealed partial class NatsServer { private const string JetStreamStoreDir = "jetstream"; + private const long JetStreamMaxMemDefault = 1024L * 1024L * 256L; public Exception? EnableJetStream(JetStreamConfig? config) { @@ -495,6 +496,81 @@ public sealed partial class NatsServer null); } + internal JetStreamConfig DynJetStreamConfig(string storeDir, long maxStore, long maxMem) + { + var cfg = new JetStreamConfig(); + if (!string.IsNullOrWhiteSpace(storeDir)) + { + cfg.StoreDir = Path.Combine(storeDir, JetStreamStoreDir); + } + else + { + cfg.StoreDir = Path.Combine(Path.GetTempPath(), "nats", JetStreamStoreDir); + Warnf("Temporary storage directory used, data could be lost on system reboot"); + } + + var opts = GetOpts(); + cfg.Strict = !opts.NoJetStreamStrict; + cfg.SyncInterval = opts.SyncInterval; + cfg.SyncAlways = opts.SyncAlways; + + cfg.MaxStore = opts.MaxStoreSet && maxStore >= 0 + ? maxStore + : DiskAvailability.DiskAvailable(cfg.StoreDir); + + if (opts.MaxMemSet && maxMem >= 0) + { + cfg.MaxMemory = maxMem; + } + else + { + var totalAvailable = GC.GetGCMemoryInfo().TotalAvailableMemoryBytes; + cfg.MaxMemory = totalAvailable > 0 && totalAvailable < long.MaxValue + ? totalAvailable / 4 * 3 + : JetStreamMaxMemDefault; + } + + return cfg; + } + + internal void ResourcesExceededError(StorageType storeType) + { + var didAlert = false; + lock (_resourceErrorLock) + { + var now = DateTime.UtcNow; + if (now - _resourceErrorLastUtc > TimeSpan.FromSeconds(10)) + { + var storeName = storeType switch + { + StorageType.MemoryStorage => "memory", + StorageType.FileStorage => "file", + _ => storeType.ToString().ToLowerInvariant(), + }; + + Errorf("JetStream {0} resource limits exceeded for server", storeName); + _resourceErrorLastUtc = now; + didAlert = true; + } + } + + if (!didAlert) + return; + + var js = GetJetStreamState(); + if (js?.Cluster is JetStreamCluster { Meta: not null } cluster) + cluster.Meta.StepDown(); + } + + internal void HandleWritePermissionError() + { + if (!JetStreamEnabled()) + return; + + Errorf("File system permission denied while writing, disabling JetStream"); + _ = Task.Run(() => DisableJetStream()); + } + internal JetStreamEngine? GetJetStream() => _jetStream == null ? null : new JetStreamEngine(_jetStream); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index fec5439..73ffa88 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -254,6 +254,8 @@ public sealed partial class NatsServer : INatsServer private readonly ConcurrentDictionary _nodeToInfo = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _raftNodes = new(StringComparer.Ordinal); private JetStream? _jetStream; + private readonly Lock _resourceErrorLock = new(); + private DateTime _resourceErrorLastUtc; private readonly Dictionary _routesToSelf = []; private INetResolver? _routeResolver; private readonly ConcurrentDictionary _rateLimitLogging = new(); diff --git a/porting.db b/porting.db index 82f20b196d9dc9409eeceec36407b3af5935ec2d..57bd4428c29b7354e5dec4e9664edf74232f1bdd 100644 GIT binary patch delta 2169 zcma*nT})GF7zglk-g5f=B?1FVJ*6OivVKr-fJ$4z4-{mZV`HGuLXUI7fC73kGwF7r zS+)d|C6A%W5apvlrntqOWFMJs%QA1&T`apXi)PW?EDp1qCA-<5R_kfnv0ePg^UwS6 z@V@7~htbh-)#!zB)tF6~;5hY>bB+dn;%blI#ou~&(j`y2_})eSXAVLU>uXgk7*3 z_CPc2g?-Qht*{^3;6-=|UWNnk3bezk@EUZ$>(B{Za1ahbH}pUV!q5wS5RsWhVj-1{ zyF{8n$s_PlK9{4=WQ|nN9}DO>JG1Z4Z>W+Yq2ZC^ks)D*0#2!xuc2e@QgN}zTj%rj z$NZUnv}&@qimsPQnkT)sC%p}sy;I7WLZVMzIwo1oP7}T~llC@eIcIU2KQ%X+elod? zV}?Hs?fON1u5LtoTgILKSX&r%##;5un~kovAAGjk1qyuFyP3wSr4m+MLpR=(@@do_ zGSii>BN|VQ<8Wv=GURo8YbxDUm2Qv2?X63E1<|5dOL82q97lnH4jc7!he~Mie1||i zC#4*f&%1>le-+u_U13~tY+rJmN;z(hN>4ExjmCR($r|mj(4Ds=fmOTzbDT4_H_Kt; zH2Em}^srULUyN*AX-`G5<}62?r5n+3eR3!GZ?Jr@m7mj|EJKL@zH@7G#{YOYw%G3E z3_F4O;1K)Ev|HFY32zE+=K&RB^ zlDkYMrSYMsfOQS~3e(uw-JUd~Cymv=WFJVLVFTPeN&Z=%lRnR|ANru&DYwdZWWQV| zSIQ;E$Sli{*e9A=VsT`fXS?avAJnyftjrjlXQdm=2Cp$uaF|Dv&l3?xZH4+*(iy*M zv@-sTNxi&}{=RX<(P$)h7?&mf-byL`4m&M9cj(LIVuO_1@5nNGG@jj^Pg=Dpuc`TF zo%{rq8LAFcVyGUYVnX!@6&b2Nlr>bn-V>5C+R!qm z60P*R$?2xHFN%$Jm~l(!u-`0-sJ%&xq47agqi<$`pIp9Pm&HqZWyOT!g{sp}StP7X z2nDZd&sN;Dc*$;~*NTW=nOIQ7D9uG#R-Zd*5y_#67Oo5MfEPAFEW|-PB*11!ge2Gk z$&do6uocoE9Wo#jw!wDT0XrcJvLOd@VHe~