diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs index a62381e..c0c8865 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -111,6 +111,22 @@ public sealed partial class Account return EnableAllJetStreamServiceImportsAndMappings(); } + internal (NatsServer? Server, JsAccount? JetStreamAccount, Exception? Error) CheckForJetStream() + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + var jsa = JetStream; + _mu.ExitReadLock(); + + if (server == null || jsa == null) + { + var description = JsApiErrors.NewJSNotEnabledForAccountError().Description ?? "jetstream not enabled for account"; + return (null, null, new InvalidOperationException(description)); + } + + return (server, jsa, null); + } + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index ba08743..522bc08 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -4,6 +4,9 @@ internal sealed class JetStreamEngine(JetStream state) { private readonly JetStream _state = state; private static readonly TimeSpan MinUsageUpdateWindow = TimeSpan.FromMilliseconds(250); + private const string JsWillExtend = "will_extend"; + private const string JsNoExtend = "no_extend"; + private const string JsDomainApiTemplate = "$JS.{0}.API.>"; internal void SetStarted() { @@ -320,6 +323,119 @@ internal sealed class JetStreamEngine(JetStream state) else if (cfg.Storage == StorageType.FileStorage) Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes); } + + internal static string FriendlyBytes(T bytes) + where T : struct, IConvertible + { + var value = Convert.ToDouble(bytes); + const int baseValue = 1024; + var units = new[] { "K", "M", "G", "T", "P", "E" }; + + if (value < baseValue) + return $"{value} B"; + + var exp = (int)(Math.Log(value) / Math.Log(baseValue)); + var index = Math.Clamp(exp - 1, 0, units.Length - 1); + return $"{value / Math.Pow(baseValue, exp):0.00} {units[index]}B"; + } + + internal static bool IsValidName(string name) + { + if (string.IsNullOrWhiteSpace(name)) + return false; + + return name.IndexOfAny([' ', '\t', '\r', '\n', '\f', '.', '*', '>']) < 0; + } + + internal static Exception? ValidateJetStreamOptions(ServerOptions options) + { + foreach (var (account, domain) in options.JsAccDefaultDomain) + { + var exists = false; + if (ServerOptions.IsReservedAccount(account)) + { + exists = true; + } + else if (options.TrustedOperators.Count == 0) + { + foreach (var configured in options.Accounts) + { + if (!string.Equals(configured.GetName(), account, StringComparison.Ordinal)) + continue; + + if (configured.JetStreamLimits?.Count > 0 && !string.IsNullOrEmpty(domain)) + return new InvalidOperationException($"default_js_domain contains account name \"{account}\" with enabled JetStream"); + + exists = true; + break; + } + } + else + { + exists = IsLikelyPublicAccountNkey(account); + } + + if (!exists) + return new InvalidOperationException($"in non operator mode, `default_js_domain` references non existing account \"{account}\""); + } + + foreach (var (account, domain) in options.JsAccDefaultDomain) + { + var systemAccount = string.IsNullOrWhiteSpace(options.SystemAccount) + ? ServerConstants.DefaultSystemAccount + : options.SystemAccount; + + if (string.Equals(account, systemAccount, StringComparison.Ordinal)) + return new InvalidOperationException($"system account \"{account}\" can not be in default_js_domain"); + + if (string.IsNullOrWhiteSpace(domain)) + continue; + + var subject = string.Format(JsDomainApiTemplate, domain); + if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject)) + return new InvalidOperationException($"default_js_domain contains account \"{account}\" with invalid domain name \"{domain}\""); + } + + if (!string.IsNullOrWhiteSpace(options.JetStreamDomain)) + { + var subject = string.Format(JsDomainApiTemplate, options.JetStreamDomain); + if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject)) + return new InvalidOperationException($"invalid domain name: derived \"{subject}\" is not a valid subject"); + + if (!IsValidName(options.JetStreamDomain)) + return new InvalidOperationException("invalid domain name: may not contain ., * or >"); + } + + if (!options.JetStream || options.Cluster.Port == 0) + return null; + if (string.IsNullOrWhiteSpace(options.ServerName)) + return new InvalidOperationException("jetstream cluster requires `server_name` to be set"); + if (string.IsNullOrWhiteSpace(options.Cluster.Name)) + return new InvalidOperationException("jetstream cluster requires `cluster.name` to be set"); + + var hint = options.JetStreamExtHint.ToLowerInvariant(); + if (hint is not JsWillExtend and not JsNoExtend and not "") + return new InvalidOperationException($"expected 'no_extend' for string value, got '{hint}'"); + options.JetStreamExtHint = hint; + + if (options.JetStreamMaxCatchup < 0) + return new InvalidOperationException("jetstream max catchup cannot be negative"); + + return null; + } + + internal static void FixCfgMirrorWithDedupWindow(StreamConfig? config) + { + if (config?.Mirror == null) + return; + if (config.Duplicates != TimeSpan.Zero) + config.Duplicates = TimeSpan.Zero; + } + + private static bool IsLikelyPublicAccountNkey(string value) => + !string.IsNullOrWhiteSpace(value) && + value.Length >= 10 && + value.StartsWith("A", StringComparison.Ordinal); } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs index 3a740da..83c3c2f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -707,6 +707,10 @@ public sealed partial class NatsServer // Trusted operators, leafnode, auth, proxies, gateway, cluster, MQTT, websocket // — validation stubs delegating to not-yet-ported subsystems. + var jsErr = JetStreamEngine.ValidateJetStreamOptions(o); + if (jsErr != null) + return jsErr; + var err = ValidateCluster(o); return err; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs index 625632b..f6ee4db 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -7,6 +7,7 @@ namespace ZB.MOM.NatsNet.Server; public sealed partial class NatsServer { private const string JetStreamStoreDir = "jetstream"; + private const long JetStreamMaxMemDefault = 1024L * 1024L * 256L; public Exception? EnableJetStream(JetStreamConfig? config) { @@ -495,6 +496,81 @@ public sealed partial class NatsServer null); } + internal JetStreamConfig DynJetStreamConfig(string storeDir, long maxStore, long maxMem) + { + var cfg = new JetStreamConfig(); + if (!string.IsNullOrWhiteSpace(storeDir)) + { + cfg.StoreDir = Path.Combine(storeDir, JetStreamStoreDir); + } + else + { + cfg.StoreDir = Path.Combine(Path.GetTempPath(), "nats", JetStreamStoreDir); + Warnf("Temporary storage directory used, data could be lost on system reboot"); + } + + var opts = GetOpts(); + cfg.Strict = !opts.NoJetStreamStrict; + cfg.SyncInterval = opts.SyncInterval; + cfg.SyncAlways = opts.SyncAlways; + + cfg.MaxStore = opts.MaxStoreSet && maxStore >= 0 + ? maxStore + : DiskAvailability.DiskAvailable(cfg.StoreDir); + + if (opts.MaxMemSet && maxMem >= 0) + { + cfg.MaxMemory = maxMem; + } + else + { + var totalAvailable = GC.GetGCMemoryInfo().TotalAvailableMemoryBytes; + cfg.MaxMemory = totalAvailable > 0 && totalAvailable < long.MaxValue + ? totalAvailable / 4 * 3 + : JetStreamMaxMemDefault; + } + + return cfg; + } + + internal void ResourcesExceededError(StorageType storeType) + { + var didAlert = false; + lock (_resourceErrorLock) + { + var now = DateTime.UtcNow; + if (now - _resourceErrorLastUtc > TimeSpan.FromSeconds(10)) + { + var storeName = storeType switch + { + StorageType.MemoryStorage => "memory", + StorageType.FileStorage => "file", + _ => storeType.ToString().ToLowerInvariant(), + }; + + Errorf("JetStream {0} resource limits exceeded for server", storeName); + _resourceErrorLastUtc = now; + didAlert = true; + } + } + + if (!didAlert) + return; + + var js = GetJetStreamState(); + if (js?.Cluster is JetStreamCluster { Meta: not null } cluster) + cluster.Meta.StepDown(); + } + + internal void HandleWritePermissionError() + { + if (!JetStreamEnabled()) + return; + + Errorf("File system permission denied while writing, disabling JetStream"); + _ = Task.Run(() => DisableJetStream()); + } + internal JetStreamEngine? GetJetStream() => _jetStream == null ? null : new JetStreamEngine(_jetStream); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index fec5439..73ffa88 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -254,6 +254,8 @@ public sealed partial class NatsServer : INatsServer private readonly ConcurrentDictionary _nodeToInfo = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _raftNodes = new(StringComparer.Ordinal); private JetStream? _jetStream; + private readonly Lock _resourceErrorLock = new(); + private DateTime _resourceErrorLastUtc; private readonly Dictionary _routesToSelf = []; private INetResolver? _routeResolver; private readonly ConcurrentDictionary _rateLimitLogging = new(); diff --git a/porting.db b/porting.db index 82f20b1..57bd442 100644 Binary files a/porting.db and b/porting.db differ