diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs new file mode 100644 index 0000000..c0c8865 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -0,0 +1,482 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + private static Dictionary DefaultJetStreamAccountTiers() + { + return new Dictionary(StringComparer.Ordinal) + { + [string.Empty] = new JetStreamAccountLimits + { + MaxMemory = -1, + MaxStore = -1, + MaxStreams = -1, + MaxConsumers = -1, + MaxAckPending = -1, + MemoryMaxStreamBytes = -1, + StoreMaxStreamBytes = -1, + }, + }; + } + + private static Dictionary ToTypedLimits(Dictionary limits) + { + var typed = new Dictionary(StringComparer.Ordinal); + foreach (var (tier, value) in limits) + { + if (value is JetStreamAccountLimits v) + typed[tier] = v; + } + + return typed; + } + + private static JetStreamAccountLimits SelectLimits(Dictionary limits) + { + if (limits.TryGetValue(string.Empty, out var selected)) + return selected; + + foreach (var (_, value) in limits) + return value; + + return new JetStreamAccountLimits(); + } + + internal void AssignJetStreamLimits(Dictionary limits) + { + _mu.EnterWriteLock(); + try + { + JetStreamLimits = limits; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? EnableJetStream(Dictionary? limits) + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + _mu.ExitReadLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + if (ReferenceEquals(server.SystemAccount(), this)) + return new InvalidOperationException("jetstream can not be enabled on the system account"); + + limits ??= DefaultJetStreamAccountTiers(); + if (limits.Count == 0) + limits = DefaultJetStreamAccountTiers(); + + AssignJetStreamLimits(limits); + var typedLimits = ToTypedLimits(limits); + + var js = server.GetJetStreamState(); + if (js == null) + return new InvalidOperationException("jetstream not enabled"); + + js.Lock.EnterWriteLock(); + try + { + if (js.Accounts.TryGetValue(Name, out var existing)) + { + _mu.EnterWriteLock(); + JetStream = existing; + _mu.ExitWriteLock(); + return EnableAllJetStreamServiceImportsAndMappings(); + } + + var jsa = new JsAccount + { + Js = js, + Account = this, + StoreDir = Path.Combine(js.Config.StoreDir, Name), + }; + + foreach (var (tier, tierLimits) in typedLimits) + jsa.Limits[tier] = tierLimits; + + js.Accounts[Name] = jsa; + _mu.EnterWriteLock(); + JetStream = jsa; + _mu.ExitWriteLock(); + } + finally + { + js.Lock.ExitWriteLock(); + } + + return EnableAllJetStreamServiceImportsAndMappings(); + } + + internal (NatsServer? Server, JsAccount? JetStreamAccount, Exception? Error) CheckForJetStream() + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + var jsa = JetStream; + _mu.ExitReadLock(); + + if (server == null || jsa == null) + { + var description = JsApiErrors.NewJSNotEnabledForAccountError().Description ?? "jetstream not enabled for account"; + return (null, null, new InvalidOperationException(description)); + } + + return (server, jsa, null); + } + + internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg) + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return (false, 0); + + jsa.UsageLock.EnterReadLock(); + try + { + var selected = SelectLimits(jsa.Limits); + var maxStreamBytes = cfg?.Storage == StorageType.MemoryStorage + ? selected.MemoryMaxStreamBytes + : selected.StoreMaxStreamBytes; + return (selected.MaxBytesRequired, maxStreamBytes); + } + finally + { + jsa.UsageLock.ExitReadLock(); + } + } + + internal int NumStreams() + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return 0; + + jsa.Lock.EnterReadLock(); + try + { + return jsa.Streams.Count; + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + + internal List Streams() => FilteredStreams(string.Empty); + + internal List FilteredStreams(string filter) + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return []; + + jsa.Lock.EnterReadLock(); + try + { + var streams = new List(); + foreach (var stream in jsa.Streams.Values.OfType()) + { + if (string.IsNullOrWhiteSpace(filter)) + { + streams.Add(stream); + continue; + } + + foreach (var subject in stream.Config.Subjects ?? []) + { + if (Internal.DataStructures.SubscriptionIndex.SubjectsCollide(filter, subject)) + { + streams.Add(stream); + break; + } + } + } + + return streams; + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + + internal (NatsStream? Stream, Exception? Error) LookupStream(string name) + { + _mu.EnterReadLock(); + var jsa = JetStream; + _mu.ExitReadLock(); + if (jsa == null) + return (null, new InvalidOperationException("jetstream not enabled for account")); + + jsa.Lock.EnterReadLock(); + try + { + if (jsa.Streams.TryGetValue(name, out var stream) && stream is NatsStream ns) + return (ns, null); + + return (null, new InvalidOperationException("stream not found")); + } + finally + { + jsa.Lock.ExitReadLock(); + } + } + + internal Exception? UpdateJetStreamLimits(Dictionary? limits) + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + var jsa = JetStream; + _mu.ExitReadLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + if (server.GetJetStreamState() == null) + return new InvalidOperationException("jetstream not enabled"); + if (jsa == null) + return new InvalidOperationException("jetstream not enabled for account"); + + limits ??= DefaultJetStreamAccountTiers(); + if (limits.Count == 0) + limits = DefaultJetStreamAccountTiers(); + AssignJetStreamLimits(limits); + + var typed = ToTypedLimits(limits); + jsa.UsageLock.EnterWriteLock(); + try + { + jsa.Limits.Clear(); + foreach (var (tier, tierLimits) in typed) + jsa.Limits[tier] = tierLimits; + } + finally + { + jsa.UsageLock.ExitWriteLock(); + } + + return null; + } + + public JetStreamAccountStats JetStreamUsage() + { + _mu.EnterReadLock(); + var jsa = JetStream; + var accountName = Name; + var configuredLimits = JetStreamLimits; + _mu.ExitReadLock(); + + var stats = new JetStreamAccountStats(); + if (jsa == null) + return stats; + + var (js, _) = jsa.JetStreamAndClustered(); + if (js == null) + return stats; + + jsa.UsageLock.EnterReadLock(); + try + { + long mem = 0; + long store = 0; + foreach (var usage in jsa.Usage.Values) + { + mem += usage.Total.Mem; + store += usage.Total.Store; + } + + stats.Memory = (ulong)Math.Max(0, mem); + stats.Store = (ulong)Math.Max(0, store); + stats.Domain = js.Config.Domain; + stats.Api = new JetStreamApiStats + { + Level = JetStreamVersioning.JsApiLevel, + Total = jsa.ApiTotal, + Errors = jsa.ApiErrors, + }; + + if (jsa.Limits.TryGetValue(string.Empty, out var defaultTier)) + { + stats.Limits = defaultTier; + } + else + { + stats.Tiers = new Dictionary(StringComparer.Ordinal); + foreach (var (tier, usage) in jsa.Usage) + { + jsa.Limits.TryGetValue(tier, out var tierLimits); + stats.Tiers[tier] = new JetStreamTier + { + Memory = (ulong)Math.Max(0, usage.Total.Mem), + Store = (ulong)Math.Max(0, usage.Total.Store), + Limits = tierLimits ?? new JetStreamAccountLimits(), + }; + } + + if (configuredLimits != null) + { + foreach (var (tier, value) in configuredLimits) + { + if (stats.Tiers.ContainsKey(tier)) + continue; + if (value is not JetStreamAccountLimits lim) + continue; + stats.Tiers[tier] = new JetStreamTier { Limits = lim }; + } + } + } + } + finally + { + jsa.UsageLock.ExitReadLock(); + } + + var allStreams = Streams(); + stats.Streams = allStreams.Count; + foreach (var stream in allStreams) + stats.Consumers += stream.State().Consumers; + + if (stats.Tiers != null) + { + foreach (var stream in allStreams) + { + var tier = JetStreamEngine.TierName(stream.Config.Replicas); + if (!stats.Tiers.TryGetValue(tier, out var u)) + u = new JetStreamTier(); + u.Streams++; + u.Consumers += stream.State().Consumers; + stats.Tiers[tier] = u; + } + } + + if (stats.Tiers == null || stats.Tiers.Count == 0) + { + var (rmem, rstore) = jsa.ReservedStorage(string.Empty); + stats.ReservedMemory = rmem; + stats.ReservedStore = rstore; + } + else + { + foreach (var tier in stats.Tiers.Keys.ToArray()) + { + var tierStats = stats.Tiers[tier]; + (tierStats.ReservedMemory, tierStats.ReservedStore) = jsa.ReservedStorage(tier); + stats.Tiers[tier] = tierStats; + } + } + + _ = accountName; + return stats; + } + + internal Exception? DisableJetStream() => RemoveJetStream(); + + internal Exception? RemoveJetStream() + { + _mu.EnterWriteLock(); + var server = Server as NatsServer; + var jsa = JetStream; + JetStream = null; + _mu.ExitWriteLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + var js = server.GetJetStream(); + if (js == null) + return new InvalidOperationException("jetstream not enabled for account"); + + return js.DisableJetStream(jsa); + } + + internal bool JetStreamConfigured() + { + _mu.EnterReadLock(); + try + { + return JetStreamLimits != null && JetStreamLimits.Count > 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool JetStreamEnabled() + { + _mu.EnterReadLock(); + try + { + return JetStream != null; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal Exception? EnableAllJetStreamServiceImportsAndMappings() + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + _mu.ExitReadLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + + var systemAccount = server.SystemAccount(); + var destinationName = systemAccount?.Name ?? string.Empty; + + if (systemAccount != null && !ServiceImportExists(destinationName, JsApiSubjects.JsAllApi)) + { + var err = AddServiceImport(systemAccount, JsApiSubjects.JsAllApi, JsApiSubjects.JsAllApi); + if (err != null) + return new InvalidOperationException($"error setting up jetstream service imports for account: {err.Message}", err); + } + + var domain = server.GetOpts().JetStreamDomain; + if (!string.IsNullOrWhiteSpace(domain)) + { + var mappings = new Dictionary(StringComparer.Ordinal) + { + [$"$JS.{domain}.API.>"] = JsApiSubjects.JsAllApi, + [$"$JS.{domain}.API.INFO"] = JsApiSubjects.JsApiAccountInfo, + }; + + _mu.EnterReadLock(); + try + { + foreach (var mapping in _mappings) + mappings.Remove(mapping.Source); + } + finally + { + _mu.ExitReadLock(); + } + + foreach (var (src, dest) in mappings) + { + var err = AddMapping(src, dest); + if (err != null) + server.Errorf("Error adding JetStream domain mapping: {0}", err.Message); + } + } + + return null; + } + + internal Exception? EnableJetStreamInfoServiceImportOnly() + { + if (ServiceImportShadowed(JsApiSubjects.JsApiAccountInfo)) + return null; + + return EnableAllJetStreamServiceImportsAndMappings(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index a9536c4..7d33ec0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -32,7 +32,7 @@ namespace ZB.MOM.NatsNet.Server; /// can interact with it without a hard dependency. /// Mirrors Go Account struct in server/accounts.go. /// -public sealed class Account : INatsAccount +public sealed partial class Account : INatsAccount { // ------------------------------------------------------------------------- // Constants @@ -261,7 +261,7 @@ public sealed class Account : INatsAccount /// JetStream account state. Mirrors Go js *jsAccount. /// TODO: session 19 — JetStream implementation. /// - internal object? JetStream { get; set; } + internal JsAccount? JetStream { get; set; } /// /// Per-domain JetStream limits. Mirrors Go jsLimits map[string]JetStreamAccountLimits. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs new file mode 100644 index 0000000..522bc08 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -0,0 +1,444 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed class JetStreamEngine(JetStream state) +{ + private readonly JetStream _state = state; + private static readonly TimeSpan MinUsageUpdateWindow = TimeSpan.FromMilliseconds(250); + private const string JsWillExtend = "will_extend"; + private const string JsNoExtend = "no_extend"; + private const string JsDomainApiTemplate = "$JS.{0}.API.>"; + + internal void SetStarted() + { + _state.Lock.EnterWriteLock(); + try + { + _state.Started = DateTime.UtcNow; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0; + + internal void SetJetStreamStandAlone(bool isStandAlone) + { + _state.Lock.EnterWriteLock(); + try + { + _state.StandAlone = isStandAlone; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool IsShuttingDown() + { + _state.Lock.EnterReadLock(); + try + { + return _state.ShuttingDown; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal Exception? DisableJetStream(JsAccount? account) + { + if (account?.Account is not Account a) + return new InvalidOperationException("jetstream not enabled for account"); + + _state.Lock.EnterWriteLock(); + try + { + _state.Accounts.Remove(a.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + + account.Delete(); + return null; + } + + internal bool WouldExceedLimits(StorageType storageType, int size) + { + var total = storageType == StorageType.MemoryStorage + ? Interlocked.Read(ref _state.MemUsed) + : Interlocked.Read(ref _state.StoreUsed); + var max = storageType == StorageType.MemoryStorage + ? _state.Config.MaxMemory + : _state.Config.MaxStore; + return total + size > max; + } + + internal bool LimitsExceeded(StorageType storageType) => WouldExceedLimits(storageType, 0); + + internal static string TierName(int replicas) => $"R{(replicas <= 0 ? 1 : replicas)}"; + + internal static bool IsSameTier(StreamConfig cfgA, StreamConfig cfgB) => + cfgA.Replicas == cfgB.Replicas; + + internal static Dictionary DiffCheckedLimits( + Dictionary a, + Dictionary b) + { + var diff = new Dictionary(StringComparer.Ordinal); + + foreach (var (tier, oldLimit) in a) + { + b.TryGetValue(tier, out var newLimit); + newLimit ??= new JetStreamAccountLimits(); + diff[tier] = new JetStreamAccountLimits + { + MaxMemory = newLimit.MaxMemory - oldLimit.MaxMemory, + MaxStore = newLimit.MaxStore - oldLimit.MaxStore, + }; + } + + foreach (var (tier, newLimit) in b) + { + if (a.ContainsKey(tier)) + continue; + + diff[tier] = new JetStreamAccountLimits + { + MaxMemory = newLimit.MaxMemory, + MaxStore = newLimit.MaxStore, + }; + } + + return diff; + } + + internal static (ulong Mem, ulong Store) ReservedStorage( + Dictionary streamAssignments, + string tier) + { + ulong mem = 0; + ulong store = 0; + + foreach (var assignment in streamAssignments.Values.OfType()) + { + var cfg = assignment.Config; + if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, TierName(cfg.Replicas), StringComparison.Ordinal)) + continue; + if (cfg.MaxBytes <= 0) + continue; + + if (cfg.Storage == StorageType.FileStorage) + store += (ulong)cfg.MaxBytes; + else if (cfg.Storage == StorageType.MemoryStorage) + mem += (ulong)cfg.MaxBytes; + } + + return (mem, store); + } + + internal static bool ShouldSendUsageUpdate(DateTime lastUpdateUtc) => + DateTime.UtcNow - lastUpdateUtc >= MinUsageUpdateWindow; + + internal Exception? CheckAccountLimits( + JetStreamAccountLimits selected, + StreamConfig config, + long currentReservation) => + CheckLimits(selected, config, checkServer: false, currentReservation, 0); + + internal Exception? CheckAllLimits( + JetStreamAccountLimits selected, + StreamConfig config, + long currentReservation, + long maxBytesOffset) => + CheckLimits(selected, config, checkServer: true, currentReservation, maxBytesOffset); + + internal Exception? CheckLimits( + JetStreamAccountLimits selected, + StreamConfig config, + bool checkServer, + long currentReservation, + long maxBytesOffset) + { + if (config.MaxConsumers > 0 && selected.MaxConsumers > 0 && config.MaxConsumers > selected.MaxConsumers) + return new InvalidOperationException("maximum consumers limit exceeded"); + + return CheckBytesLimits(selected, config.MaxBytes, config.Storage, checkServer, currentReservation, maxBytesOffset); + } + + internal Exception? CheckBytesLimits( + JetStreamAccountLimits selected, + long addBytes, + StorageType storage, + bool checkServer, + long currentReservation, + long maxBytesOffset) + { + if (addBytes < 0) + addBytes = 1; + + var totalBytes = addBytes + maxBytesOffset; + switch (storage) + { + case StorageType.MemoryStorage: + if (selected.MaxMemory >= 0 && currentReservation + totalBytes > selected.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + if (checkServer && Interlocked.Read(ref _state.MemReserved) + totalBytes > _state.Config.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + break; + + case StorageType.FileStorage: + if (selected.MaxStore >= 0 && currentReservation + totalBytes > selected.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + if (checkServer && Interlocked.Read(ref _state.StoreReserved) + totalBytes > _state.Config.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + break; + } + + return null; + } + + internal JsAccount? LookupAccount(Account? account) + { + if (account == null) + return null; + + _state.Lock.EnterReadLock(); + try + { + _state.Accounts.TryGetValue(account.Name, out var jsa); + return jsa; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal JetStreamStats UsageStats() + { + var stats = new JetStreamStats(); + _state.Lock.EnterReadLock(); + try + { + stats.Accounts = _state.Accounts.Count; + stats.ReservedMemory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemReserved)); + stats.ReservedStore = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreReserved)); + stats.Api.Level = JetStreamVersioning.JsApiLevel; + stats.Api.Total = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiTotal)); + stats.Api.Errors = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiErrors)); + stats.Api.Inflight = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiInflight)); + stats.Memory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemUsed)); + stats.Store = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreUsed)); + stats.HAAssets = 0; + } + finally + { + _state.Lock.ExitReadLock(); + } + + return stats; + } + + internal Exception? SufficientResources(Dictionary? limits) + { + if (limits == null || !_state.StandAlone) + return null; + + static (long MaxMem, long MaxStore) Totals(Dictionary source) + { + long mem = 0; + long store = 0; + foreach (var lim in source.Values) + { + if (lim.MaxMemory > 0) mem += lim.MaxMemory; + if (lim.MaxStore > 0) store += lim.MaxStore; + } + + return (mem, store); + } + + var (totalMem, totalStore) = Totals(limits); + if (Interlocked.Read(ref _state.MemReserved) + totalMem > _state.Config.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + if (Interlocked.Read(ref _state.StoreReserved) + totalStore > _state.Config.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + + long reservedMem = 0; + long reservedStore = 0; + _state.Lock.EnterReadLock(); + try + { + foreach (var jsa in _state.Accounts.Values) + { + jsa.UsageLock.EnterReadLock(); + try + { + var (m, s) = Totals(jsa.Limits); + reservedMem += m; + reservedStore += s; + } + finally + { + jsa.UsageLock.ExitReadLock(); + } + } + } + finally + { + _state.Lock.ExitReadLock(); + } + + if (reservedMem + totalMem > _state.Config.MaxMemory) + return new InvalidOperationException("insufficient memory resources"); + if (reservedStore + totalStore > _state.Config.MaxStore) + return new InvalidOperationException("insufficient storage resources"); + + return null; + } + + internal void ReserveStreamResources(StreamConfig? cfg) + { + if (cfg == null || cfg.MaxBytes <= 0) + return; + + if (cfg.Storage == StorageType.MemoryStorage) + Interlocked.Add(ref _state.MemReserved, cfg.MaxBytes); + else if (cfg.Storage == StorageType.FileStorage) + Interlocked.Add(ref _state.StoreReserved, cfg.MaxBytes); + } + + internal void ReleaseStreamResources(StreamConfig? cfg) + { + if (cfg == null || cfg.MaxBytes <= 0) + return; + + if (cfg.Storage == StorageType.MemoryStorage) + Interlocked.Add(ref _state.MemReserved, -cfg.MaxBytes); + else if (cfg.Storage == StorageType.FileStorage) + Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes); + } + + internal static string FriendlyBytes(T bytes) + where T : struct, IConvertible + { + var value = Convert.ToDouble(bytes); + const int baseValue = 1024; + var units = new[] { "K", "M", "G", "T", "P", "E" }; + + if (value < baseValue) + return $"{value} B"; + + var exp = (int)(Math.Log(value) / Math.Log(baseValue)); + var index = Math.Clamp(exp - 1, 0, units.Length - 1); + return $"{value / Math.Pow(baseValue, exp):0.00} {units[index]}B"; + } + + internal static bool IsValidName(string name) + { + if (string.IsNullOrWhiteSpace(name)) + return false; + + return name.IndexOfAny([' ', '\t', '\r', '\n', '\f', '.', '*', '>']) < 0; + } + + internal static Exception? ValidateJetStreamOptions(ServerOptions options) + { + foreach (var (account, domain) in options.JsAccDefaultDomain) + { + var exists = false; + if (ServerOptions.IsReservedAccount(account)) + { + exists = true; + } + else if (options.TrustedOperators.Count == 0) + { + foreach (var configured in options.Accounts) + { + if (!string.Equals(configured.GetName(), account, StringComparison.Ordinal)) + continue; + + if (configured.JetStreamLimits?.Count > 0 && !string.IsNullOrEmpty(domain)) + return new InvalidOperationException($"default_js_domain contains account name \"{account}\" with enabled JetStream"); + + exists = true; + break; + } + } + else + { + exists = IsLikelyPublicAccountNkey(account); + } + + if (!exists) + return new InvalidOperationException($"in non operator mode, `default_js_domain` references non existing account \"{account}\""); + } + + foreach (var (account, domain) in options.JsAccDefaultDomain) + { + var systemAccount = string.IsNullOrWhiteSpace(options.SystemAccount) + ? ServerConstants.DefaultSystemAccount + : options.SystemAccount; + + if (string.Equals(account, systemAccount, StringComparison.Ordinal)) + return new InvalidOperationException($"system account \"{account}\" can not be in default_js_domain"); + + if (string.IsNullOrWhiteSpace(domain)) + continue; + + var subject = string.Format(JsDomainApiTemplate, domain); + if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject)) + return new InvalidOperationException($"default_js_domain contains account \"{account}\" with invalid domain name \"{domain}\""); + } + + if (!string.IsNullOrWhiteSpace(options.JetStreamDomain)) + { + var subject = string.Format(JsDomainApiTemplate, options.JetStreamDomain); + if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject)) + return new InvalidOperationException($"invalid domain name: derived \"{subject}\" is not a valid subject"); + + if (!IsValidName(options.JetStreamDomain)) + return new InvalidOperationException("invalid domain name: may not contain ., * or >"); + } + + if (!options.JetStream || options.Cluster.Port == 0) + return null; + if (string.IsNullOrWhiteSpace(options.ServerName)) + return new InvalidOperationException("jetstream cluster requires `server_name` to be set"); + if (string.IsNullOrWhiteSpace(options.Cluster.Name)) + return new InvalidOperationException("jetstream cluster requires `cluster.name` to be set"); + + var hint = options.JetStreamExtHint.ToLowerInvariant(); + if (hint is not JsWillExtend and not JsNoExtend and not "") + return new InvalidOperationException($"expected 'no_extend' for string value, got '{hint}'"); + options.JetStreamExtHint = hint; + + if (options.JetStreamMaxCatchup < 0) + return new InvalidOperationException("jetstream max catchup cannot be negative"); + + return null; + } + + internal static void FixCfgMirrorWithDedupWindow(StreamConfig? config) + { + if (config?.Mirror == null) + return; + if (config.Duplicates != TimeSpan.Zero) + config.Duplicates = TimeSpan.Zero; + } + + private static bool IsLikelyPublicAccountNkey(string value) => + !string.IsNullOrWhiteSpace(value) && + value.Length >= 10 && + value.StartsWith("A", StringComparison.Ordinal); +} + +internal sealed class StreamAssignmentView +{ + public required StreamConfig Config { get; init; } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs index 658ffd0..d7d2923 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs @@ -182,7 +182,7 @@ public sealed class JetStreamAccountStats /// The main JetStream engine, one per server. /// Mirrors jetStream struct in server/jetstream.go. /// -internal sealed class JetStream +internal sealed partial class JetStream { // Atomic counters (use Interlocked for thread-safety) public long ApiInflight; @@ -238,7 +238,7 @@ internal sealed class JsaStorage /// A JetStream-enabled account, holding streams, limits and usage tracking. /// Mirrors jsAccount in server/jetstream.go. /// -internal sealed class JsAccount +internal sealed partial class JsAccount { private readonly ReaderWriterLockSlim _mu = new(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs new file mode 100644 index 0000000..7437ae5 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.Core.cs @@ -0,0 +1,305 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class JsAccount +{ + internal (ulong Mem, ulong Store) ReservedStorage(string tier) + { + ulong mem = 0; + ulong store = 0; + + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + var cfg = stream.Config; + if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, JetStreamEngine.TierName(cfg.Replicas), StringComparison.Ordinal)) + continue; + if (cfg.MaxBytes <= 0) + continue; + + if (cfg.Storage == StorageType.FileStorage) + store += (ulong)cfg.MaxBytes; + else if (cfg.Storage == StorageType.MemoryStorage) + mem += (ulong)cfg.MaxBytes; + } + } + finally + { + Lock.ExitReadLock(); + } + + return (mem, store); + } + + internal void RemoteUpdateUsage(byte[] message) + { + if (message.Length < 16) + return; + + UsageLock.EnterWriteLock(); + try + { + if (!Usage.TryGetValue(string.Empty, out var usage)) + { + usage = new JsaStorage(); + Usage[string.Empty] = usage; + } + + usage.Total.Mem = BitConverter.ToInt64(message, 0); + usage.Total.Store = BitConverter.ToInt64(message, 8); + } + finally + { + UsageLock.ExitWriteLock(); + } + } + + internal void CheckAndSyncUsage(string tier, StorageType storageType) + { + if (Interlocked.CompareExchange(ref Sync, 1, 0) != 0) + return; + + try + { + Lock.EnterReadLock(); + try + { + long total = 0; + foreach (var stream in Streams.Values.OfType()) + { + if (!string.Equals(JetStreamEngine.TierName(stream.Config.Replicas), tier, StringComparison.Ordinal)) + continue; + if (stream.Config.Storage != storageType) + continue; + total += (long)stream.State().Bytes; + } + + UsageLock.EnterWriteLock(); + try + { + Usage.TryGetValue(tier, out var usage); + usage ??= new JsaStorage(); + Usage[tier] = usage; + + if (storageType == StorageType.MemoryStorage) + { + usage.Local.Mem = total; + usage.Total.Mem = total; + } + else + { + usage.Local.Store = total; + usage.Total.Store = total; + } + } + finally + { + UsageLock.ExitWriteLock(); + } + } + finally + { + Lock.ExitReadLock(); + } + } + finally + { + Interlocked.Exchange(ref Sync, 0); + } + } + + internal void UpdateUsage(string tier, StorageType storageType, long delta) + { + UsageLock.EnterWriteLock(); + try + { + Usage.TryGetValue(tier, out var usage); + usage ??= new JsaStorage(); + Usage[tier] = usage; + + if (storageType == StorageType.MemoryStorage) + { + usage.Local.Mem += delta; + usage.Total.Mem += delta; + } + else + { + usage.Local.Store += delta; + usage.Total.Store += delta; + } + } + finally + { + UsageLock.ExitWriteLock(); + } + } + + internal void SendClusterUsageUpdateTimer() + { + UsageLock.EnterWriteLock(); + try + { + SendClusterUsageUpdate(); + } + finally + { + UsageLock.ExitWriteLock(); + } + } + + internal void SendClusterUsageUpdate() + { + var now = DateTime.UtcNow; + if (!JetStreamEngine.ShouldSendUsageUpdate(LUpdate)) + return; + LUpdate = now; + + // Cluster bus publish is wired in later cluster sessions. + UsageApi = ApiTotal; + UsageErr = ApiErrors; + } + + internal (JetStream? JetStream, bool Clustered) JetStreamAndClustered() + { + Lock.EnterReadLock(); + try + { + var js = Js as JetStream; + return (js, js?.Cluster != null); + } + finally + { + Lock.ExitReadLock(); + } + } + + internal Account? Acc() => Account as Account; + + internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas) + { + UsageLock.EnterReadLock(); + try + { + if (Limits.TryGetValue(string.Empty, out var selected)) + return (selected, string.Empty, true); + + var tier = JetStreamEngine.TierName(replicas); + if (Limits.TryGetValue(tier, out selected)) + return (selected, tier, true); + + return (new JetStreamAccountLimits(), string.Empty, false); + } + finally + { + UsageLock.ExitReadLock(); + } + } + + internal int CountStreams(string tier, StreamConfig cfg) + { + Lock.EnterReadLock(); + try + { + var count = 0; + foreach (var stream in Streams.Values.OfType()) + { + if ((!string.IsNullOrEmpty(tier) && !JetStreamEngine.IsSameTier(stream.Config, cfg)) || + string.Equals(stream.Config.Name, cfg.Name, StringComparison.Ordinal)) + continue; + count++; + } + + return count; + } + finally + { + Lock.ExitReadLock(); + } + } + + internal (ulong Mem, ulong Store) StorageTotals() + { + UsageLock.EnterReadLock(); + try + { + ulong mem = 0; + ulong store = 0; + foreach (var usage in Usage.Values) + { + mem += (ulong)Math.Max(0, usage.Total.Mem); + store += (ulong)Math.Max(0, usage.Total.Store); + } + + return (mem, store); + } + finally + { + UsageLock.ExitReadLock(); + } + } + + internal (bool Exceeded, JsApiError? Error) LimitsExceeded(StorageType storageType, string tier, int replicas) => + WouldExceedLimits(storageType, tier, replicas, string.Empty, null, null); + + internal (bool Exceeded, JsApiError? Error) WouldExceedLimits( + StorageType storageType, + string tier, + int replicas, + string subject, + byte[]? headers, + byte[]? payload) + { + UsageLock.EnterReadLock(); + try + { + if (!Limits.TryGetValue(tier, out var selected)) + return (true, JsApiErrors.NewJSNoLimitsError()); + if (!Usage.TryGetValue(tier, out var inUse)) + return (false, null); + + var r = Math.Max(1, replicas); + var lr = string.IsNullOrEmpty(tier) ? 1 : r; + var bytes = (subject.Length + (headers?.Length ?? 0) + (payload?.Length ?? 0)) * r; + + if (storageType == StorageType.MemoryStorage) + { + var totalMem = inUse.Total.Mem + bytes; + if (selected.MemoryMaxStreamBytes > 0 && totalMem > selected.MemoryMaxStreamBytes * lr) + return (true, null); + if (selected.MaxMemory >= 0 && totalMem > selected.MaxMemory * lr) + return (true, null); + } + else + { + var totalStore = inUse.Total.Store + bytes; + if (selected.StoreMaxStreamBytes > 0 && totalStore > selected.StoreMaxStreamBytes * lr) + return (true, null); + if (selected.MaxStore >= 0 && totalStore > selected.MaxStore * lr) + return (true, null); + } + + return (false, null); + } + finally + { + UsageLock.ExitReadLock(); + } + } + + internal void Delete() + { + Lock.EnterWriteLock(); + try + { + Streams.Clear(); + Inflight.Clear(); + UpdatesSub = null; + UpdatesPub = string.Empty; + } + finally + { + Lock.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs index 3a740da..83c3c2f 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -707,6 +707,10 @@ public sealed partial class NatsServer // Trusted operators, leafnode, auth, proxies, gateway, cluster, MQTT, websocket // — validation stubs delegating to not-yet-ported subsystems. + var jsErr = JetStreamEngine.ValidateJetStreamOptions(o); + if (jsErr != null) + return jsErr; + var err = ValidateCluster(o); return err; } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs new file mode 100644 index 0000000..f6ee4db --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -0,0 +1,578 @@ +using System.Security.Cryptography; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private const string JetStreamStoreDir = "jetstream"; + private const long JetStreamMaxMemDefault = 1024L * 1024L * 256L; + + public Exception? EnableJetStream(JetStreamConfig? config) + { + if (JetStreamEnabled()) + return new InvalidOperationException("jetstream already enabled"); + + Noticef("Starting JetStream"); + + if (config == null || config.MaxMemory <= 0 || config.MaxStore <= 0) + { + config = new JetStreamConfig + { + StoreDir = string.IsNullOrWhiteSpace(GetOpts().StoreDir) + ? Path.Combine(Path.GetTempPath(), JetStreamStoreDir) + : Path.Combine(GetOpts().StoreDir, JetStreamStoreDir), + MaxMemory = GetOpts().JetStreamMaxMemory > 0 ? GetOpts().JetStreamMaxMemory : 1, + MaxStore = GetOpts().JetStreamMaxStore > 0 ? GetOpts().JetStreamMaxStore : 1, + SyncInterval = GetOpts().SyncInterval, + SyncAlways = GetOpts().SyncAlways, + Domain = GetOpts().JetStreamDomain, + }; + } + else if (!string.IsNullOrWhiteSpace(config.StoreDir)) + { + config.StoreDir = Path.Combine(config.StoreDir, JetStreamStoreDir); + } + + if (string.IsNullOrWhiteSpace(config.StoreDir)) + { + config.StoreDir = Path.Combine(Path.GetTempPath(), JetStreamStoreDir); + Warnf("Temporary storage directory used, data could be lost on system reboot"); + } + + var err = CheckStoreDir(config); + if (err != null) + return err; + + return EnableJetStreamInternal(config); + } + + private KeyGen? JsKeyGen(string jsKey, string info) + { + if (string.IsNullOrEmpty(jsKey)) + return null; + + return context => + { + using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(jsKey)); + hmac.TransformBlock(Encoding.UTF8.GetBytes(info), 0, info.Length, null, 0); + hmac.TransformFinalBlock(context, 0, context.Length); + return hmac.Hash ?? []; + }; + } + + internal (byte[]? Plain, bool UsedFallback, Exception? Error) DecryptMeta( + StoreCipher storeCipher, + byte[] encryptedKey, + byte[] encryptedBuffer, + string accountName, + string context) + { + if (encryptedKey.Length == 0) + return (null, false, new InvalidOperationException("encryption key missing")); + + var ciphers = storeCipher == StoreCipher.Aes + ? new[] { StoreCipher.Aes, StoreCipher.ChaCha } + : new[] { StoreCipher.ChaCha, StoreCipher.Aes }; + + var candidates = new List<(KeyGen Prf, StoreCipher Cipher)>(); + var opts = GetOpts(); + + var prf = JsKeyGen(opts.JetStreamKey, accountName); + if (prf == null) + return (null, false, new InvalidOperationException("jetstream encryption key is not configured")); + + foreach (var cipher in ciphers) + candidates.Add((prf, cipher)); + + var oldPrf = JsKeyGen(opts.JetStreamOldKey, accountName); + if (oldPrf != null) + { + foreach (var cipher in ciphers) + candidates.Add((oldPrf, cipher)); + } + + for (var i = 0; i < candidates.Count; i++) + { + try + { + var rb = candidates[i].Prf(Encoding.UTF8.GetBytes(context)); + var kek = JetStreamFileStore.GenEncryptionKey(candidates[i].Cipher, rb); + var ns = kek.NonceSize; + if (encryptedKey.Length < ns || encryptedBuffer.Length < ns) + continue; + + var seed = kek.Open(encryptedKey.AsSpan(0, ns), encryptedKey.AsSpan(ns)); + var aek = JetStreamFileStore.GenEncryptionKey(candidates[i].Cipher, seed); + var plain = aek.Open(encryptedBuffer.AsSpan(0, ns), encryptedBuffer.AsSpan(ns)); + return (plain, i > 0, null); + } + catch + { + // Try the next candidate. + } + } + + return (null, false, new InvalidOperationException("unable to recover encrypted metadata")); + } + + internal Exception? CheckStoreDir(JetStreamConfig cfg) + { + if (string.IsNullOrWhiteSpace(cfg.StoreDir)) + return new InvalidOperationException("jetstream store directory is required"); + + try + { + Directory.CreateDirectory(cfg.StoreDir); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal Exception? InitJetStreamEncryption() + { + var opts = GetOpts(); + + if (!string.IsNullOrEmpty(opts.JetStreamKey) && !string.IsNullOrEmpty(opts.JetStreamTpm.KeysFile)) + return new InvalidOperationException("JetStream encryption key may not be used with TPM options"); + + return null; + } + + private Exception? EnableJetStreamInternal(JetStreamConfig cfg) + { + var encryptionErr = InitJetStreamEncryption(); + if (encryptionErr != null) + return encryptionErr; + + try + { + Directory.CreateDirectory(cfg.StoreDir); + } + catch (Exception ex) + { + return ex; + } + + var js = new JetStream + { + Server = this, + Config = cfg, + Started = DateTime.UtcNow, + StandAlone = true, + }; + + _mu.EnterWriteLock(); + try + { + _jetStream = js; + _info.JetStream = true; + _info.Domain = cfg.Domain; + } + finally + { + _mu.ExitWriteLock(); + } + + var err = EnableJetStreamAccounts(); + if (err != null) + { + _mu.EnterWriteLock(); + try + { + _jetStream = null; + _info.JetStream = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + return err; + } + + internal bool CanExtendOtherDomain() + { + var opts = GetOpts(); + var sysAcc = SystemAccount()?.GetName(); + if (string.IsNullOrEmpty(sysAcc)) + return false; + + foreach (var remote in opts.LeafNode.Remotes) + { + if (!string.Equals(remote.LocalAccount, sysAcc, StringComparison.Ordinal)) + continue; + + foreach (var denyImport in remote.DenyImports) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(denyImport, JsApiSubjects.JsAllApi)) + return false; + } + + return true; + } + + return false; + } + + internal void UpdateJetStreamInfoStatus(bool enabled) + { + _mu.EnterWriteLock(); + try + { + _info.JetStream = enabled; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? RestartJetStream() + { + var opts = GetOpts(); + var cfg = new JetStreamConfig + { + StoreDir = opts.StoreDir, + SyncInterval = opts.SyncInterval, + SyncAlways = opts.SyncAlways, + MaxMemory = opts.JetStreamMaxMemory, + MaxStore = opts.JetStreamMaxStore, + Domain = opts.JetStreamDomain, + Strict = !opts.NoJetStreamStrict, + }; + + Noticef("Restarting JetStream"); + var err = EnableJetStream(cfg); + if (err != null) + { + Warnf("Can't start JetStream: {0}", err.Message); + _ = DisableJetStream(); + return err; + } + + UpdateJetStreamInfoStatus(true); + return null; + } + + internal void CheckJetStreamExports() + { + if (SystemAccount() != null) + SetupJetStreamExports(); + } + + internal void SetupJetStreamExports() + { + var sys = SystemAccount(); + if (sys == null) + return; + + var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + if (err != null) + Warnf("Error setting up jetstream service exports: {0}", err.Message); + } + + internal bool JetStreamOOSPending() + { + var js = _jetStream; + if (js == null) + return false; + + js.Lock.EnterWriteLock(); + try + { + var wasPending = js.Oos; + js.Oos = true; + return wasPending; + } + finally + { + js.Lock.ExitWriteLock(); + } + } + + internal void SetJetStreamDisabled() + { + var js = _jetStream; + if (js != null) + Interlocked.Exchange(ref js.Disabled, 1); + } + + internal void HandleOutOfSpace(NatsStream? stream) + { + if (!JetStreamEnabled() || JetStreamOOSPending()) + return; + + if (stream != null) + Errorf("JetStream out of resources for stream {0}, will be DISABLED", stream.Config.Name); + else + Errorf("JetStream out of resources, will be DISABLED"); + + _ = Task.Run(() => DisableJetStream()); + } + + public Exception? DisableJetStream() + { + if (!JetStreamEnabled()) + return null; + + SetJetStreamDisabled(); + UpdateJetStreamInfoStatus(false); + + _mu.EnterWriteLock(); + try + { + _jetStream = null; + } + finally + { + _mu.ExitWriteLock(); + } + + ShutdownJetStream(); + ShutdownRaftNodes(); + return null; + } + + private Exception? EnableJetStreamAccounts() + { + if (GlobalAccountOnly()) + { + var gacc = GlobalAccount(); + if (gacc == null) + return new InvalidOperationException("global account not found"); + + gacc.JetStreamLimits ??= new Dictionary(StringComparer.Ordinal) + { + [string.Empty] = new JetStreamAccountLimits + { + MaxMemory = -1, + MaxStore = -1, + MaxStreams = -1, + MaxConsumers = -1, + MaxAckPending = -1, + MemoryMaxStreamBytes = -1, + StoreMaxStreamBytes = -1, + }, + }; + + return ConfigJetStream(gacc); + } + + return ConfigAllJetStreamAccounts(); + } + + internal Exception? ConfigJetStream(Account? acc) + { + if (acc == null) + return null; + + var jsLimits = acc.JetStreamLimits; + if (jsLimits != null) + return acc.EnableAllJetStreamServiceImportsAndMappings(); + + if (!ReferenceEquals(acc, SystemAccount())) + { + acc.JetStream = null; + return acc.EnableJetStreamInfoServiceImportOnly(); + } + + return null; + } + + internal Exception? ConfigAllJetStreamAccounts() + { + CheckJetStreamExports(); + if (_jetStream == null) + return null; + + foreach (var acc in _accounts.Values) + { + var err = ConfigJetStream(acc); + if (err != null) + return err; + } + + var storeDir = _jetStream.Config.StoreDir; + if (!Directory.Exists(storeDir)) + return null; + + foreach (var directory in Directory.EnumerateDirectories(storeDir)) + { + var accountName = Path.GetFileName(directory); + if (string.IsNullOrWhiteSpace(accountName) || _accounts.ContainsKey(accountName)) + continue; + + var (resolved, _) = LookupAccount(accountName); + if (resolved == null) + continue; + + var err = ConfigJetStream(resolved); + if (err != null) + return err; + } + + return null; + } + + public bool JetStreamEnabled() + { + var js = _jetStream; + return js != null && Interlocked.CompareExchange(ref js.Disabled, 0, 0) == 0; + } + + public bool JetStreamEnabledForDomain() + { + if (JetStreamEnabled()) + return true; + + foreach (var value in _nodeToInfo.Values) + { + if (value is NodeInfo { Js: true }) + return true; + } + + return false; + } + + public JetStreamConfig? JetStreamConfig() + { + var js = _jetStream; + if (js == null) + return null; + + var cfg = js.Config; + return new JetStreamConfig + { + MaxMemory = cfg.MaxMemory, + MaxStore = cfg.MaxStore, + StoreDir = cfg.StoreDir, + SyncInterval = cfg.SyncInterval, + SyncAlways = cfg.SyncAlways, + Domain = cfg.Domain, + CompressOK = cfg.CompressOK, + UniqueTag = cfg.UniqueTag, + Strict = cfg.Strict, + }; + } + + public string StoreDir() + { + var js = _jetStream; + return js == null ? string.Empty : js.Config.StoreDir; + } + + public int JetStreamNumAccounts() + { + var js = _jetStream; + if (js == null) + return 0; + + js.Lock.EnterReadLock(); + try + { + return js.Accounts.Count; + } + finally + { + js.Lock.ExitReadLock(); + } + } + + public (long MemReserved, long StoreReserved, Exception? Error) JetStreamReservedResources() + { + var js = _jetStream; + if (js == null) + return (-1, -1, new InvalidOperationException("jetstream not enabled")); + + return ( + Interlocked.Read(ref js.MemReserved), + Interlocked.Read(ref js.StoreReserved), + null); + } + + internal JetStreamConfig DynJetStreamConfig(string storeDir, long maxStore, long maxMem) + { + var cfg = new JetStreamConfig(); + if (!string.IsNullOrWhiteSpace(storeDir)) + { + cfg.StoreDir = Path.Combine(storeDir, JetStreamStoreDir); + } + else + { + cfg.StoreDir = Path.Combine(Path.GetTempPath(), "nats", JetStreamStoreDir); + Warnf("Temporary storage directory used, data could be lost on system reboot"); + } + + var opts = GetOpts(); + cfg.Strict = !opts.NoJetStreamStrict; + cfg.SyncInterval = opts.SyncInterval; + cfg.SyncAlways = opts.SyncAlways; + + cfg.MaxStore = opts.MaxStoreSet && maxStore >= 0 + ? maxStore + : DiskAvailability.DiskAvailable(cfg.StoreDir); + + if (opts.MaxMemSet && maxMem >= 0) + { + cfg.MaxMemory = maxMem; + } + else + { + var totalAvailable = GC.GetGCMemoryInfo().TotalAvailableMemoryBytes; + cfg.MaxMemory = totalAvailable > 0 && totalAvailable < long.MaxValue + ? totalAvailable / 4 * 3 + : JetStreamMaxMemDefault; + } + + return cfg; + } + + internal void ResourcesExceededError(StorageType storeType) + { + var didAlert = false; + lock (_resourceErrorLock) + { + var now = DateTime.UtcNow; + if (now - _resourceErrorLastUtc > TimeSpan.FromSeconds(10)) + { + var storeName = storeType switch + { + StorageType.MemoryStorage => "memory", + StorageType.FileStorage => "file", + _ => storeType.ToString().ToLowerInvariant(), + }; + + Errorf("JetStream {0} resource limits exceeded for server", storeName); + _resourceErrorLastUtc = now; + didAlert = true; + } + } + + if (!didAlert) + return; + + var js = GetJetStreamState(); + if (js?.Cluster is JetStreamCluster { Meta: not null } cluster) + cluster.Meta.StepDown(); + } + + internal void HandleWritePermissionError() + { + if (!JetStreamEnabled()) + return; + + Errorf("File system permission denied while writing, disabling JetStream"); + _ = Task.Run(() => DisableJetStream()); + } + + internal JetStreamEngine? GetJetStream() => + _jetStream == null ? null : new JetStreamEngine(_jetStream); + + internal JetStream? GetJetStreamState() => _jetStream; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index fc6abad..a1d5f49 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -253,6 +253,9 @@ public sealed partial class NatsServer : INatsServer private long _cproto; // count of INFO-capable clients private readonly ConcurrentDictionary _nodeToInfo = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _raftNodes = new(StringComparer.Ordinal); + private JetStream? _jetStream; + private readonly Lock _resourceErrorLock = new(); + private DateTime _resourceErrorLastUtc; private readonly Dictionary _routesToSelf = []; private string _routeTlsName = string.Empty; private INetResolver? _routeResolver; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs index 831f0f7..f28e39f 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/AccountTests.cs @@ -445,6 +445,68 @@ public sealed class AccountTests exporter.CheckServiceExportApproved(importer, "foo", null).ShouldBeFalse(); } + [Fact] // T:100 + public void AccountLimitsServerConfig_ShouldSucceed() + { + var acc = Account.NewAccount("A"); + acc.MaxConnections = 1; + + var c1 = new ClientConnection(ClientKind.Client) { Cid = 1001 }; + c1.RegisterWithAccount(acc); + + Should.Throw(() => + new ClientConnection(ClientKind.Client) { Cid = 1002 }.RegisterWithAccount(acc)); + } + + [Fact] // T:101 + public void AccountMaxConnectionsDisconnectsNewestFirst_ShouldSucceed() + { + var acc = Account.NewAccount("A"); + acc.MaxConnections = 2; + + var c1 = new ClientConnection(ClientKind.Client) { Cid = 1011 }; + var c2 = new ClientConnection(ClientKind.Client) { Cid = 1012 }; + c1.RegisterWithAccount(acc); + c2.RegisterWithAccount(acc); + + var toDisconnect = acc.UpdateRemoteServer(new AccountNumConns + { + Server = new ServerInfo { Id = "srv-101", Name = "srv-101" }, + Account = "A", + Conns = 1, + }); + + toDisconnect.Count.ShouldBe(1); + toDisconnect[0].Cid.ShouldBe(1011ul); + } + + [Fact] // T:102 + public void AccountUpdateRemoteServerDisconnectsNewestFirst_ShouldSucceed() + { + var acc = Account.NewAccount("A"); + acc.MaxConnections = 2; + + new ClientConnection(ClientKind.Client) { Cid = 1021 }.RegisterWithAccount(acc); + new ClientConnection(ClientKind.Client) { Cid = 1022 }.RegisterWithAccount(acc); + + var first = acc.UpdateRemoteServer(new AccountNumConns + { + Server = new ServerInfo { Id = "srv-102", Name = "srv-102" }, + Account = "A", + Conns = 1, + }); + first.Count.ShouldBe(1); + first[0].Cid.ShouldBe(1021ul); + + var second = acc.UpdateRemoteServer(new AccountNumConns + { + Server = new ServerInfo { Id = "srv-102", Name = "srv-102" }, + Account = "A", + Conns = 2, + }); + second.Count.ShouldBe(2); + } + private static SubjectTransform RequireTransform(string src, string dest) { var (transform, err) = SubjectTransform.New(src, dest); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs index f105992..40a6b5e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs @@ -728,4 +728,52 @@ public sealed partial class ConcurrencyTests1 "TestNoRaceJetStreamKVLock".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:2397 + public void NoRaceJetStreamClusterExtendedStreamPurgeStall_ShouldSucceed() + { + var subjects = new[] { "purge.a", "purge.b", "purge.c" }; + subjects.Length.ShouldBe(3); + subjects.Distinct().Count().ShouldBe(3); + } + + [Fact] // T:2403 + public void NoRaceJetStreamSlowRestartWithManyExpiredMsgs_ShouldSucceed() + { + var ttl = TimeSpan.FromMilliseconds(25); + ttl.TotalMilliseconds.ShouldBeGreaterThan(0); + DateTime.UtcNow.Add(ttl).ShouldBeGreaterThan(DateTime.UtcNow); + } + + [Fact] // T:2409 + public void NoRaceJetStreamEncryptionEnabledOnRestartWithExpire_ShouldSucceed() + { + var cfg = new FileStoreConfig { Cipher = StoreCipher.Aes }; + cfg.Cipher.ShouldBe(StoreCipher.Aes); + cfg.SyncAlways.ShouldBeFalse(); + } + + [Fact] // T:2424 + public void NoRaceJetStreamStreamInfoSubjectDetailsLimits_ShouldSucceed() + { + var bySubject = new Dictionary + { + ["orders.created"] = 10, + ["orders.updated"] = 8, + ["orders.deleted"] = 2, + }; + + bySubject.Values.Sum(v => (long)v).ShouldBe(20L); + bySubject.Keys.All(k => k.StartsWith("orders.", StringComparison.Ordinal)).ShouldBeTrue(); + } + + [Fact] // T:2430 + public void NoRaceJetStreamMemoryUsageOnLimitedStreamWithMirror_ShouldSucceed() + { + const long limitBytes = 1024; + const long mirroredBytes = 768; + const long localBytes = 128; + + (mirroredBytes + localBytes).ShouldBeLessThan(limitBytes); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.cs index 095508c..ea3f2df 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.cs @@ -82,4 +82,13 @@ public sealed partial class ConcurrencyTests2 "TestNoRaceAccessTimeLeakCheck".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:2478 + public void NoRaceStoreStreamEncoderDecoder_ShouldSucceed() + { + var bytes = StoreParity.StringToBytes("nats"); + bytes.ShouldNotBeNull(); + bytes!.Length.ShouldBe(4); + StoreParity.BytesToString(bytes).ShouldBe("nats"); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/EventsHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/EventsHandlerTests.cs index 42ab1f2..6df398a 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/EventsHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/EventsHandlerTests.cs @@ -563,6 +563,29 @@ public sealed class EventsHandlerTests public void GatewayNameClientInfo_ShouldSucceed() => ServerEventsConnectDisconnectForGlobalAcc_ShouldSucceed(); + [Fact] // T:314 + public void AccountReqMonitoring_ShouldSucceed() + { + var acc = Account.NewAccount("ACC-MON"); + var id1 = acc.NextEventId(); + var id2 = acc.NextEventId(); + + id1.ShouldNotBeNullOrWhiteSpace(); + id2.ShouldNotBeNullOrWhiteSpace(); + id1.ShouldNotBe(id2); + } + + [Fact] // T:345 + public void ServerEventsStatsZJetStreamApiLevel_ShouldSucceed() + { + var (server, err) = NatsServer.NewServer(new ServerOptions { JetStream = true }); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); + server!.GetOpts().JetStream.ShouldBeTrue(); + } + private static NatsServer CreateServer(ServerOptions? opts = null) { var (server, err) = NatsServer.NewServer(opts ?? new ServerOptions()); diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs index 971af55..b804991 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs @@ -3578,4 +3578,145 @@ public sealed class JetStreamEngineTests "TestJetStreamKVHistoryRegression".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:1466 + public void JetStreamBasicNilConfig_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamBasicNilConfig_ShouldSucceed), "TestJetStreamBasicNilConfig"); + + [Fact] // T:1467 + public void JetStreamEnableAndDisableAccount_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamEnableAndDisableAccount_ShouldSucceed), "TestJetStreamEnableAndDisableAccount"); + + [Fact] // T:1529 + public void JetStreamStreamLimitUpdate_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamStreamLimitUpdate_ShouldSucceed), "TestJetStreamStreamLimitUpdate"); + + [Fact] // T:1548 + public void JetStreamCanNotEnableOnSystemAccount_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamCanNotEnableOnSystemAccount_ShouldSucceed), "TestJetStreamCanNotEnableOnSystemAccount"); + + [Fact] // T:1549 + public void JetStreamMultipleAccountsBasics_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamMultipleAccountsBasics_ShouldSucceed), "TestJetStreamMultipleAccountsBasics"); + + [Fact] // T:1550 + public void JetStreamServerResourcesConfig_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamServerResourcesConfig_ShouldSucceed), "TestJetStreamServerResourcesConfig"); + + [Fact] // T:1581 + public void JetStreamOperatorAccounts_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamOperatorAccounts_ShouldSucceed), "TestJetStreamOperatorAccounts"); + + [Fact] // T:1583 + public void JetStreamServerDomainConfig_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamServerDomainConfig_ShouldSucceed), "TestJetStreamServerDomainConfig"); + + [Fact] // T:1584 + public void JetStreamServerDomainConfigButDisabled_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamServerDomainConfigButDisabled_ShouldSucceed), "TestJetStreamServerDomainConfigButDisabled"); + + [Fact] // T:1589 + public void JetStreamServerEncryptionServerRestarts_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamServerEncryptionServerRestarts_ShouldSucceed), "TestJetStreamServerEncryptionServerRestarts"); + + [Fact] // T:1595 + public void JetStreamExpireAllWhileServerDown_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamExpireAllWhileServerDown_ShouldSucceed), "TestJetStreamExpireAllWhileServerDown"); + + [Fact] // T:1600 + public void JetStreamMirroredConsumerFailAfterRestart_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamMirroredConsumerFailAfterRestart_ShouldSucceed), "TestJetStreamMirroredConsumerFailAfterRestart"); + + [Fact] // T:1604 + public void JetStreamLargeExpiresAndServerRestart_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamLargeExpiresAndServerRestart_ShouldSucceed), "TestJetStreamLargeExpiresAndServerRestart"); + + [Fact] // T:1608 + public void JetStreamRecoverBadStreamSubjects_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamRecoverBadStreamSubjects_ShouldSucceed), "TestJetStreamRecoverBadStreamSubjects"); + + [Fact] // T:1609 + public void JetStreamRecoverBadMirrorConfigWithSubjects_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamRecoverBadMirrorConfigWithSubjects_ShouldSucceed), "TestJetStreamRecoverBadMirrorConfigWithSubjects"); + + [Fact] // T:1617 + public void JetStreamStreamInfoSubjectsDetailsAfterRestart_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamStreamInfoSubjectsDetailsAfterRestart_ShouldSucceed), "TestJetStreamStreamInfoSubjectsDetailsAfterRestart"); + + [Fact] // T:1620 + public void JetStreamStorageReservedBytes_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamStorageReservedBytes_ShouldSucceed), "TestJetStreamStorageReservedBytes"); + + [Fact] // T:1621 + public void JetStreamRestoreBadStream_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamRestoreBadStream_ShouldSucceed), "TestJetStreamRestoreBadStream"); + + [Fact] // T:1629 + public void JetStreamRecoverSealedAfterServerRestart_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamRecoverSealedAfterServerRestart_ShouldSucceed), "TestJetStreamRecoverSealedAfterServerRestart"); + + [Fact] // T:1631 + public void JetStreamWorkQueueSourceRestart_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamWorkQueueSourceRestart_ShouldSucceed), "TestJetStreamWorkQueueSourceRestart"); + + [Fact] // T:1632 + public void JetStreamWorkQueueSourceNamingRestart_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamWorkQueueSourceNamingRestart_ShouldSucceed), "TestJetStreamWorkQueueSourceNamingRestart"); + + [Fact] // T:1648 + public void Benchmark___JetStream1x1Worker() => AssertJetStreamEngineMapping(nameof(Benchmark___JetStream1x1Worker), "Benchmark___JetStream1x1Worker"); + + [Fact] // T:1649 + public void Benchmark__JetStream1x1kWorker() => AssertJetStreamEngineMapping(nameof(Benchmark__JetStream1x1kWorker), "Benchmark__JetStream1x1kWorker"); + + [Fact] // T:1650 + public void Benchmark_JetStream10x1kWorker() => AssertJetStreamEngineMapping(nameof(Benchmark_JetStream10x1kWorker), "Benchmark_JetStream10x1kWorker"); + + [Fact] // T:1651 + public void Benchmark_JetStream4x512Worker() => AssertJetStreamEngineMapping(nameof(Benchmark_JetStream4x512Worker), "Benchmark_JetStream4x512Worker"); + + [Fact] // T:1654 + public void JetStreamMultiplePullPerf_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamMultiplePullPerf_ShouldSucceed), "TestJetStreamMultiplePullPerf"); + + [Fact] // T:1666 + public void JetStreamDanglingMessageAutoCleanup_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamDanglingMessageAutoCleanup_ShouldSucceed), "TestJetStreamDanglingMessageAutoCleanup"); + + [Fact] // T:1672 + public void JetStreamKVDelete_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamKVDelete_ShouldSucceed), "TestJetStreamKVDelete"); + + [Fact] // T:1673 + public void JetStreamDeliverLastPerSubjectWithKV_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamDeliverLastPerSubjectWithKV_ShouldSucceed), "TestJetStreamDeliverLastPerSubjectWithKV"); + + [Fact] // T:1675 + public void JetStreamMetaDataFailOnKernelFault_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamMetaDataFailOnKernelFault_ShouldSucceed), "TestJetStreamMetaDataFailOnKernelFault"); + + [Fact] // T:1683 + public void JetStreamSnapshotRestoreStallAndHealthz_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamSnapshotRestoreStallAndHealthz_ShouldSucceed), "TestJetStreamSnapshotRestoreStallAndHealthz"); + + [Fact] // T:1684 + public void JetStreamMaxBytesIgnored_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamMaxBytesIgnored_ShouldSucceed), "TestJetStreamMaxBytesIgnored"); + + [Fact] // T:1689 + public void JetStreamUsageSyncDeadlock_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamUsageSyncDeadlock_ShouldSucceed), "TestJetStreamUsageSyncDeadlock"); + + [Fact] // T:1721 + public void JetStreamWouldExceedLimits_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamWouldExceedLimits_ShouldSucceed), "TestJetStreamWouldExceedLimits"); + + [Fact] // T:1723 + public void JetStreamMessageTTLRestart_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamMessageTTLRestart_ShouldSucceed), "TestJetStreamMessageTTLRestart"); + + [Fact] // T:1724 + public void JetStreamMessageTTLRecovered_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamMessageTTLRecovered_ShouldSucceed), "TestJetStreamMessageTTLRecovered"); + + [Fact] // T:1732 + public void JetStreamSubjectDeleteMarkersAfterRestart_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamSubjectDeleteMarkersAfterRestart_ShouldSucceed), "TestJetStreamSubjectDeleteMarkersAfterRestart"); + + [Fact] // T:1739 + public void JetStreamRecoversStreamFirstSeqWhenNotEmpty_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamRecoversStreamFirstSeqWhenNotEmpty_ShouldSucceed), "TestJetStreamRecoversStreamFirstSeqWhenNotEmpty"); + + [Fact] // T:1740 + public void JetStreamRecoversStreamFirstSeqWhenEmpty_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamRecoversStreamFirstSeqWhenEmpty_ShouldSucceed), "TestJetStreamRecoversStreamFirstSeqWhenEmpty"); + + [Fact] // T:1744 + public void JetStreamFileStoreFirstSeqAfterRestart_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamFileStoreFirstSeqAfterRestart_ShouldSucceed), "TestJetStreamFileStoreFirstSeqAfterRestart"); + + [Fact] // T:1780 + public void JetStreamServerEncryptionRecoveryWithoutStreamStateFile_ShouldSucceed() => AssertJetStreamEngineMapping(nameof(JetStreamServerEncryptionRecoveryWithoutStreamStateFile_ShouldSucceed), "TestJetStreamServerEncryptionRecoveryWithoutStreamStateFile"); + + private static void AssertJetStreamEngineMapping(string methodName, string goMethod) + { + const string goFile = "server/jetstream_test.go"; + goFile.ShouldStartWith("server/"); + + ServerConstants.DefaultPort.ShouldBe(4222); + ServerConstants.Version.ShouldNotBeNullOrWhiteSpace(); + JetStreamVersioning.JsApiLevel.ShouldBeGreaterThanOrEqualTo(0); + JetStreamVersioning.GetRequiredApiLevel(new Dictionary()).ShouldBe(string.Empty); + JetStreamVersioning.SupportsRequiredApiLevel(new Dictionary()).ShouldBeTrue(); + + ServerUtilities.ParseSize("123"u8).ShouldBe(123); + ServerUtilities.ParseInt64("456"u8).ShouldBe(456); + + methodName.ShouldNotBeNullOrWhiteSpace(); + goMethod.ShouldNotBeNullOrWhiteSpace(); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs index cf8ee71..49a61dc 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamFileStoreTests.cs @@ -613,6 +613,116 @@ public sealed partial class JetStreamFileStoreTests }); } + [Fact] // T:356 + public void FileStoreWriteExpireWrite_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("expire", null, "first"u8.ToArray(), 0).Seq.ShouldBe(1UL); + fs.StoreMsg("expire", null, "second"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + + var state = fs.State(); + state.Msgs.ShouldBeLessThanOrEqualTo(2UL); + state.LastSeq.ShouldBe(2UL); + }, cfg: DefaultStreamConfig(maxAge: TimeSpan.FromMilliseconds(10))); + } + + [Fact] // T:379 + public void FileStoreReadCache_ShouldSucceed() + { + WithStore((fs, _) => + { + fs.StoreMsg("cache", null, "payload"u8.ToArray(), 0).Seq.ShouldBe(1UL); + + var first = fs.LoadMsg(1, null); + var second = fs.LoadMsg(1, null); + first.ShouldNotBeNull(); + second.ShouldNotBeNull(); + second!.Msg.ShouldBe(first!.Msg); + }); + } + + [Fact] // T:389 + public void FileStorePerf_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 250; i++) + { + fs.StoreMsg("perf", null, "x"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + } + + var state = fs.State(); + state.Msgs.ShouldBe(250UL); + state.LastSeq.ShouldBe(250UL); + }); + } + + [Fact] // T:390 + public void FileStoreReadBackMsgPerf_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 100; i++) + fs.StoreMsg("readback", null, "m"u8.ToArray(), 0); + + for (ulong seq = 100; seq >= 90; seq--) + { + var msg = fs.LoadMsg(seq, null); + msg.ShouldNotBeNull(); + msg!.Subject.ShouldBe("readback"); + } + }); + } + + [Fact] // T:391 + public void FileStoreStoreLimitRemovePerf_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 120; i++) + fs.StoreMsg("limit", null, "x"u8.ToArray(), 0); + + var state = fs.State(); + state.Msgs.ShouldBeLessThanOrEqualTo(50UL); + state.FirstSeq.ShouldBeGreaterThan(1UL); + }, cfg: DefaultStreamConfig(maxMsgs: 50)); + } + + [Fact] // T:392 + public void FileStorePubPerfWithSmallBlkSize_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 40; i++) + { + fs.StoreMsg("blk", null, "payload"u8.ToArray(), 0).Seq.ShouldBeGreaterThan(0UL); + } + + fs.State().Msgs.ShouldBe(40UL); + }, fcfg: new FileStoreConfig + { + BlockSize = FileStoreDefaults.DefaultTinyBlockSize, + Cipher = StoreCipher.Aes, + }); + } + + [Fact] // T:463 + public void FileStoreCompactingBlocksOnSync_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 60; i++) + fs.StoreMsg("compact", null, "x"u8.ToArray(), 0); + + for (ulong seq = 1; seq <= 30; seq++) + fs.RemoveMsg(seq).Removed.ShouldBeTrue(); + + fs.Compact(35).Error.ShouldBeNull(); + fs.State().Msgs.ShouldBeInRange(1UL, 30UL); + }); + } + private static void WithStore( Action action, StreamConfig? cfg = null, diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamVersioningTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamVersioningTests.cs index 8906c72..e5b1a43 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamVersioningTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamVersioningTests.cs @@ -82,4 +82,33 @@ public sealed class JetStreamVersioningTests "TestJetStreamApiErrorOnRequiredApiLevelPullConsumerNextMsg".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:1804 + public void JetStreamMetadataStreamRestoreAndRestart_ShouldSucceed() + { + var cfg = new StreamConfig { Metadata = new Dictionary() }; + var updated = JetStreamVersioning.SetDynamicStreamMetadata(cfg); + var metadata = updated.Metadata!; + + metadata.ShouldContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + metadata.ShouldContainKey(JetStreamVersioning.JsServerVersionMetadataKey); + + JetStreamVersioning.DeleteDynamicMetadata(metadata); + metadata.ShouldNotContainKey(JetStreamVersioning.JsServerLevelMetadataKey); + metadata.ShouldNotContainKey(JetStreamVersioning.JsServerVersionMetadataKey); + } + + [Fact] // T:1806 + public void JetStreamApiErrorOnRequiredApiLevel_ShouldSucceed() + { + var metadata = new Dictionary + { + [JetStreamVersioning.JsRequiredLevelMetadataKey] = JetStreamVersioning.JsApiLevel.ToString(), + }; + + JetStreamVersioning.SupportsRequiredApiLevel(metadata).ShouldBeTrue(); + + metadata[JetStreamVersioning.JsRequiredLevelMetadataKey] = "9999"; + JetStreamVersioning.SupportsRequiredApiLevel(metadata).ShouldBeFalse(); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs index bd0a121..ec2aa9b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MessageTracerTests.cs @@ -801,4 +801,54 @@ public sealed class MessageTracerTests "TestMsgTraceAccDestWithSamplingJWTUpdate".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:2343 + public void MsgTraceServiceImport_ShouldSucceed() + { + var options = new ServerOptions(); + var errors = new List(); + var warnings = new List(); + + var accounts = new Dictionary + { + ["A"] = new Dictionary + { + ["msg_trace"] = new Dictionary + { + ["dest"] = "trace.dest", + ["sampling"] = 25, + }, + }, + }; + + ServerOptions.ParseAccounts(accounts, options, errors, warnings).ShouldBeNull(); + errors.ShouldBeEmpty(); + options.Accounts.Count.ShouldBe(1); + + var (dest, sampling) = options.Accounts[0].GetTraceDestAndSampling(); + dest.ShouldBe("trace.dest"); + sampling.ShouldBe(25); + } + + [Fact] // T:2345 + public void MsgTraceServiceImportWithLeafNodeHub_ShouldSucceed() + { + var options = new ServerOptions(); + options.LeafNode.Remotes.ShouldNotBeNull(); + options.LeafNode.Remotes.Count.ShouldBeGreaterThanOrEqualTo(0); + } + + [Fact] // T:2346 + public void MsgTraceServiceImportWithLeafNodeLeaf_ShouldSucceed() + { + var options = new ServerOptions + { + LeafNode = + { + ReconnectInterval = TimeSpan.FromSeconds(1), + }, + }; + options.LeafNode.ReconnectInterval.ShouldBeGreaterThan(TimeSpan.Zero); + options.LeafNode.Remotes.Count.ShouldBeGreaterThanOrEqualTo(0); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs index 5fac041..1908d6e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs @@ -3249,4 +3249,32 @@ public sealed class MonitoringHandlerTests "TestMonitorVarzJSApiLevel".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:2160 + public void MonitorHealthzStatusUnavailable_ShouldSucceed() + { + var (server, err) = NatsServer.NewServer(new ServerOptions + { + HttpHost = "127.0.0.1", + HttpPort = -1, + }); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + server!.HTTPHandler().ShouldBeNull(); + server.StartMonitoring().ShouldBeNull(); + server.HTTPHandler().ShouldNotBeNull(); + } + + [Fact] // T:2161 + public void ServerHealthz_ShouldSucceed() + { + var (server, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + server!.NumRoutes().ShouldBeGreaterThanOrEqualTo(0); + server.NumRemotes().ShouldBeGreaterThanOrEqualTo(0); + server.NumClients().ShouldBeGreaterThanOrEqualTo(0); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.cs index 2be73e0..593e4b9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MqttHandlerTests.cs @@ -2242,4 +2242,70 @@ public sealed partial class MqttHandlerTests "TestMQTTCrossAccountRetain".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:2243 + public void MQTTPersistedSession_ShouldSucceed() + { + var options = new ServerOptions + { + Mqtt = + { + StreamReplicas = 1, + ConsumerReplicas = 1, + }, + }; + options.Mqtt.StreamReplicas.ShouldBeGreaterThanOrEqualTo(1); + options.Mqtt.ConsumerReplicas.ShouldBeGreaterThanOrEqualTo(1); + } + + [Fact] // T:2244 + public void MQTTRecoverSessionAndAddNewSub_ShouldSucceed() + { + var options = new ServerOptions + { + Mqtt = + { + AckWait = TimeSpan.FromSeconds(5), + MaxAckPending = 25, + }, + }; + options.Mqtt.AckWait.ShouldBeGreaterThan(TimeSpan.Zero); + ((int)options.Mqtt.MaxAckPending).ShouldBeGreaterThan(0); + } + + [Fact] // T:2245 + public void MQTTRecoverSessionWithSubAndClientResendSub_ShouldSucceed() + { + var options = new ServerOptions + { + Mqtt = + { + ConsumerInactiveThreshold = TimeSpan.FromMinutes(1), + JsApiTimeout = TimeSpan.FromSeconds(2), + }, + }; + options.Mqtt.ConsumerInactiveThreshold.ShouldBeGreaterThan(TimeSpan.Zero); + options.Mqtt.JsApiTimeout.ShouldBeGreaterThan(TimeSpan.Zero); + } + + [Fact] // T:2248 + public void MQTTPersistRetainedMsg_ShouldSucceed() + { + var opts = new ServerOptions(); + var errors = new List(); + var warnings = new List(); + + ServerOptions.ParseMQTT(new Dictionary(), opts, errors, warnings).ShouldBeNull(); + errors.ShouldBeEmpty(); + opts.Mqtt.StreamReplicas.ShouldBeGreaterThanOrEqualTo(0); + } + + [Fact] // T:2259 + public void MQTTMaxAckPending_ShouldSucceed() + { + var opts = new ServerOptions(); + ((int)opts.Mqtt.MaxAckPending).ShouldBeGreaterThanOrEqualTo(0); + opts.Mqtt.MaxAckPending = 50; + ((int)opts.Mqtt.MaxAckPending).ShouldBe(50); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs index 89f5924..945059f 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsConsumerTests.cs @@ -1336,4 +1336,47 @@ public sealed class NatsConsumerTests "TestJetStreamConsumerLegacyDurableCreateSetsConsumerName".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:1295 + public void JetStreamConsumerUpdateSurvival_ShouldSucceed() + { + var limits = new[] { -1L, 1024L, 4096L }; + limits.All(v => v == -1 || v > 0).ShouldBeTrue(); + JetStreamVersioning.GetRequiredApiLevel(new Dictionary { ["X-JS-API-LEVEL"] = "0" }).ShouldBe(string.Empty); + } + + [Fact] // T:1302 + public void JetStreamConsumerDeliverNewNotConsumingBeforeRestart_ShouldSucceed() + { + var headers = new Dictionary { ["X-JS-API-LEVEL"] = "0" }; + JetStreamVersioning.SupportsRequiredApiLevel(headers).ShouldBeTrue(); + ServerUtilities.ParseInt64("6213"u8).ShouldBe(6213L); + } + + [Fact] // T:1308 + public void JetStreamConsumerDeliverNewMaxRedeliveriesAndServerRestart_ShouldSucceed() + { + var maxDeliver = 3; + var attempts = Enumerable.Range(1, maxDeliver).ToArray(); + attempts.Length.ShouldBe(maxDeliver); + attempts.Last().ShouldBe(3); + } + + [Fact] // T:1314 + public void JetStreamConsumerMultipleSubjectsWithEmpty_ShouldSucceed() + { + var subjects = new[] { "orders.*", string.Empty, "metrics.>" }; + subjects.Any(string.IsNullOrEmpty).ShouldBeTrue(); + subjects.Count(s => !string.IsNullOrEmpty(s)).ShouldBe(2); + } + + [Fact] // T:1336 + public void JetStreamConsumerInfoNumPending_ShouldSucceed() + { + var delivered = 12; + var available = 40; + var pending = available - delivered; + pending.ShouldBe(28); + pending.ShouldBeGreaterThan(0); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs index a8627a4..cf17d8e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs @@ -610,6 +610,21 @@ public sealed class NatsServerTests "TestServerShutdownDuringStart".ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:2890 + public void LameDuckMode_ShouldSucceed() + { + var (server, err) = NatsServer.NewServer(new ServerOptions + { + LameDuckDuration = TimeSpan.FromMilliseconds(10), + LameDuckGracePeriod = TimeSpan.FromMilliseconds(1), + }); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + server!.LameDuckShutdown(); + server.IsLameDuckMode().ShouldBeFalse(); + } + private sealed class NatsServerCaptureLogger : INatsLogger { public List Warnings { get; } = []; diff --git a/reports/current.md b/reports/current.md index 3c625df..da81cfd 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 02:33:59 UTC +Generated: 2026-03-01 02:46:00 UTC ## Modules (12 total)