feat(batch27): implement jetstream config validation and error tail

This commit is contained in:
Joseph Doherty
2026-02-28 21:15:57 -05:00
parent 68f32c7b85
commit 5b2d32c503
6 changed files with 214 additions and 0 deletions

View File

@@ -111,6 +111,22 @@ public sealed partial class Account
return EnableAllJetStreamServiceImportsAndMappings();
}
internal (NatsServer? Server, JsAccount? JetStreamAccount, Exception? Error) CheckForJetStream()
{
_mu.EnterReadLock();
var server = Server as NatsServer;
var jsa = JetStream;
_mu.ExitReadLock();
if (server == null || jsa == null)
{
var description = JsApiErrors.NewJSNotEnabledForAccountError().Description ?? "jetstream not enabled for account";
return (null, null, new InvalidOperationException(description));
}
return (server, jsa, null);
}
internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg)
{
_mu.EnterReadLock();

View File

@@ -4,6 +4,9 @@ internal sealed class JetStreamEngine(JetStream state)
{
private readonly JetStream _state = state;
private static readonly TimeSpan MinUsageUpdateWindow = TimeSpan.FromMilliseconds(250);
private const string JsWillExtend = "will_extend";
private const string JsNoExtend = "no_extend";
private const string JsDomainApiTemplate = "$JS.{0}.API.>";
internal void SetStarted()
{
@@ -320,6 +323,119 @@ internal sealed class JetStreamEngine(JetStream state)
else if (cfg.Storage == StorageType.FileStorage)
Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes);
}
internal static string FriendlyBytes<T>(T bytes)
where T : struct, IConvertible
{
var value = Convert.ToDouble(bytes);
const int baseValue = 1024;
var units = new[] { "K", "M", "G", "T", "P", "E" };
if (value < baseValue)
return $"{value} B";
var exp = (int)(Math.Log(value) / Math.Log(baseValue));
var index = Math.Clamp(exp - 1, 0, units.Length - 1);
return $"{value / Math.Pow(baseValue, exp):0.00} {units[index]}B";
}
internal static bool IsValidName(string name)
{
if (string.IsNullOrWhiteSpace(name))
return false;
return name.IndexOfAny([' ', '\t', '\r', '\n', '\f', '.', '*', '>']) < 0;
}
internal static Exception? ValidateJetStreamOptions(ServerOptions options)
{
foreach (var (account, domain) in options.JsAccDefaultDomain)
{
var exists = false;
if (ServerOptions.IsReservedAccount(account))
{
exists = true;
}
else if (options.TrustedOperators.Count == 0)
{
foreach (var configured in options.Accounts)
{
if (!string.Equals(configured.GetName(), account, StringComparison.Ordinal))
continue;
if (configured.JetStreamLimits?.Count > 0 && !string.IsNullOrEmpty(domain))
return new InvalidOperationException($"default_js_domain contains account name \"{account}\" with enabled JetStream");
exists = true;
break;
}
}
else
{
exists = IsLikelyPublicAccountNkey(account);
}
if (!exists)
return new InvalidOperationException($"in non operator mode, `default_js_domain` references non existing account \"{account}\"");
}
foreach (var (account, domain) in options.JsAccDefaultDomain)
{
var systemAccount = string.IsNullOrWhiteSpace(options.SystemAccount)
? ServerConstants.DefaultSystemAccount
: options.SystemAccount;
if (string.Equals(account, systemAccount, StringComparison.Ordinal))
return new InvalidOperationException($"system account \"{account}\" can not be in default_js_domain");
if (string.IsNullOrWhiteSpace(domain))
continue;
var subject = string.Format(JsDomainApiTemplate, domain);
if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject))
return new InvalidOperationException($"default_js_domain contains account \"{account}\" with invalid domain name \"{domain}\"");
}
if (!string.IsNullOrWhiteSpace(options.JetStreamDomain))
{
var subject = string.Format(JsDomainApiTemplate, options.JetStreamDomain);
if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject))
return new InvalidOperationException($"invalid domain name: derived \"{subject}\" is not a valid subject");
if (!IsValidName(options.JetStreamDomain))
return new InvalidOperationException("invalid domain name: may not contain ., * or >");
}
if (!options.JetStream || options.Cluster.Port == 0)
return null;
if (string.IsNullOrWhiteSpace(options.ServerName))
return new InvalidOperationException("jetstream cluster requires `server_name` to be set");
if (string.IsNullOrWhiteSpace(options.Cluster.Name))
return new InvalidOperationException("jetstream cluster requires `cluster.name` to be set");
var hint = options.JetStreamExtHint.ToLowerInvariant();
if (hint is not JsWillExtend and not JsNoExtend and not "")
return new InvalidOperationException($"expected 'no_extend' for string value, got '{hint}'");
options.JetStreamExtHint = hint;
if (options.JetStreamMaxCatchup < 0)
return new InvalidOperationException("jetstream max catchup cannot be negative");
return null;
}
internal static void FixCfgMirrorWithDedupWindow(StreamConfig? config)
{
if (config?.Mirror == null)
return;
if (config.Duplicates != TimeSpan.Zero)
config.Duplicates = TimeSpan.Zero;
}
private static bool IsLikelyPublicAccountNkey(string value) =>
!string.IsNullOrWhiteSpace(value) &&
value.Length >= 10 &&
value.StartsWith("A", StringComparison.Ordinal);
}
internal sealed class StreamAssignmentView

View File

@@ -707,6 +707,10 @@ public sealed partial class NatsServer
// Trusted operators, leafnode, auth, proxies, gateway, cluster, MQTT, websocket
// — validation stubs delegating to not-yet-ported subsystems.
var jsErr = JetStreamEngine.ValidateJetStreamOptions(o);
if (jsErr != null)
return jsErr;
var err = ValidateCluster(o);
return err;
}

View File

@@ -7,6 +7,7 @@ namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
private const string JetStreamStoreDir = "jetstream";
private const long JetStreamMaxMemDefault = 1024L * 1024L * 256L;
public Exception? EnableJetStream(JetStreamConfig? config)
{
@@ -495,6 +496,81 @@ public sealed partial class NatsServer
null);
}
internal JetStreamConfig DynJetStreamConfig(string storeDir, long maxStore, long maxMem)
{
var cfg = new JetStreamConfig();
if (!string.IsNullOrWhiteSpace(storeDir))
{
cfg.StoreDir = Path.Combine(storeDir, JetStreamStoreDir);
}
else
{
cfg.StoreDir = Path.Combine(Path.GetTempPath(), "nats", JetStreamStoreDir);
Warnf("Temporary storage directory used, data could be lost on system reboot");
}
var opts = GetOpts();
cfg.Strict = !opts.NoJetStreamStrict;
cfg.SyncInterval = opts.SyncInterval;
cfg.SyncAlways = opts.SyncAlways;
cfg.MaxStore = opts.MaxStoreSet && maxStore >= 0
? maxStore
: DiskAvailability.DiskAvailable(cfg.StoreDir);
if (opts.MaxMemSet && maxMem >= 0)
{
cfg.MaxMemory = maxMem;
}
else
{
var totalAvailable = GC.GetGCMemoryInfo().TotalAvailableMemoryBytes;
cfg.MaxMemory = totalAvailable > 0 && totalAvailable < long.MaxValue
? totalAvailable / 4 * 3
: JetStreamMaxMemDefault;
}
return cfg;
}
internal void ResourcesExceededError(StorageType storeType)
{
var didAlert = false;
lock (_resourceErrorLock)
{
var now = DateTime.UtcNow;
if (now - _resourceErrorLastUtc > TimeSpan.FromSeconds(10))
{
var storeName = storeType switch
{
StorageType.MemoryStorage => "memory",
StorageType.FileStorage => "file",
_ => storeType.ToString().ToLowerInvariant(),
};
Errorf("JetStream {0} resource limits exceeded for server", storeName);
_resourceErrorLastUtc = now;
didAlert = true;
}
}
if (!didAlert)
return;
var js = GetJetStreamState();
if (js?.Cluster is JetStreamCluster { Meta: not null } cluster)
cluster.Meta.StepDown();
}
internal void HandleWritePermissionError()
{
if (!JetStreamEnabled())
return;
Errorf("File system permission denied while writing, disabling JetStream");
_ = Task.Run(() => DisableJetStream());
}
internal JetStreamEngine? GetJetStream() =>
_jetStream == null ? null : new JetStreamEngine(_jetStream);

View File

@@ -254,6 +254,8 @@ public sealed partial class NatsServer : INatsServer
private readonly ConcurrentDictionary<string, object?> _nodeToInfo = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, object?> _raftNodes = new(StringComparer.Ordinal);
private JetStream? _jetStream;
private readonly Lock _resourceErrorLock = new();
private DateTime _resourceErrorLastUtc;
private readonly Dictionary<string, string> _routesToSelf = [];
private INetResolver? _routeResolver;
private readonly ConcurrentDictionary<string, object?> _rateLimitLogging = new();

Binary file not shown.