feat(batch27): merge jetstream-core

This commit is contained in:
Joseph Doherty
2026-02-28 21:46:00 -05:00
21 changed files with 2445 additions and 5 deletions

View File

@@ -0,0 +1,482 @@
namespace ZB.MOM.NatsNet.Server;
public sealed partial class Account
{
private static Dictionary<string, object> DefaultJetStreamAccountTiers()
{
return new Dictionary<string, object>(StringComparer.Ordinal)
{
[string.Empty] = new JetStreamAccountLimits
{
MaxMemory = -1,
MaxStore = -1,
MaxStreams = -1,
MaxConsumers = -1,
MaxAckPending = -1,
MemoryMaxStreamBytes = -1,
StoreMaxStreamBytes = -1,
},
};
}
private static Dictionary<string, JetStreamAccountLimits> ToTypedLimits(Dictionary<string, object> limits)
{
var typed = new Dictionary<string, JetStreamAccountLimits>(StringComparer.Ordinal);
foreach (var (tier, value) in limits)
{
if (value is JetStreamAccountLimits v)
typed[tier] = v;
}
return typed;
}
private static JetStreamAccountLimits SelectLimits(Dictionary<string, JetStreamAccountLimits> limits)
{
if (limits.TryGetValue(string.Empty, out var selected))
return selected;
foreach (var (_, value) in limits)
return value;
return new JetStreamAccountLimits();
}
internal void AssignJetStreamLimits(Dictionary<string, object> limits)
{
_mu.EnterWriteLock();
try
{
JetStreamLimits = limits;
}
finally
{
_mu.ExitWriteLock();
}
}
internal Exception? EnableJetStream(Dictionary<string, object>? limits)
{
_mu.EnterReadLock();
var server = Server as NatsServer;
_mu.ExitReadLock();
if (server == null)
return new InvalidOperationException("jetstream account not registered");
if (ReferenceEquals(server.SystemAccount(), this))
return new InvalidOperationException("jetstream can not be enabled on the system account");
limits ??= DefaultJetStreamAccountTiers();
if (limits.Count == 0)
limits = DefaultJetStreamAccountTiers();
AssignJetStreamLimits(limits);
var typedLimits = ToTypedLimits(limits);
var js = server.GetJetStreamState();
if (js == null)
return new InvalidOperationException("jetstream not enabled");
js.Lock.EnterWriteLock();
try
{
if (js.Accounts.TryGetValue(Name, out var existing))
{
_mu.EnterWriteLock();
JetStream = existing;
_mu.ExitWriteLock();
return EnableAllJetStreamServiceImportsAndMappings();
}
var jsa = new JsAccount
{
Js = js,
Account = this,
StoreDir = Path.Combine(js.Config.StoreDir, Name),
};
foreach (var (tier, tierLimits) in typedLimits)
jsa.Limits[tier] = tierLimits;
js.Accounts[Name] = jsa;
_mu.EnterWriteLock();
JetStream = jsa;
_mu.ExitWriteLock();
}
finally
{
js.Lock.ExitWriteLock();
}
return EnableAllJetStreamServiceImportsAndMappings();
}
internal (NatsServer? Server, JsAccount? JetStreamAccount, Exception? Error) CheckForJetStream()
{
_mu.EnterReadLock();
var server = Server as NatsServer;
var jsa = JetStream;
_mu.ExitReadLock();
if (server == null || jsa == null)
{
var description = JsApiErrors.NewJSNotEnabledForAccountError().Description ?? "jetstream not enabled for account";
return (null, null, new InvalidOperationException(description));
}
return (server, jsa, null);
}
internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg)
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa == null)
return (false, 0);
jsa.UsageLock.EnterReadLock();
try
{
var selected = SelectLimits(jsa.Limits);
var maxStreamBytes = cfg?.Storage == StorageType.MemoryStorage
? selected.MemoryMaxStreamBytes
: selected.StoreMaxStreamBytes;
return (selected.MaxBytesRequired, maxStreamBytes);
}
finally
{
jsa.UsageLock.ExitReadLock();
}
}
internal int NumStreams()
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa == null)
return 0;
jsa.Lock.EnterReadLock();
try
{
return jsa.Streams.Count;
}
finally
{
jsa.Lock.ExitReadLock();
}
}
internal List<NatsStream> Streams() => FilteredStreams(string.Empty);
internal List<NatsStream> FilteredStreams(string filter)
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa == null)
return [];
jsa.Lock.EnterReadLock();
try
{
var streams = new List<NatsStream>();
foreach (var stream in jsa.Streams.Values.OfType<NatsStream>())
{
if (string.IsNullOrWhiteSpace(filter))
{
streams.Add(stream);
continue;
}
foreach (var subject in stream.Config.Subjects ?? [])
{
if (Internal.DataStructures.SubscriptionIndex.SubjectsCollide(filter, subject))
{
streams.Add(stream);
break;
}
}
}
return streams;
}
finally
{
jsa.Lock.ExitReadLock();
}
}
internal (NatsStream? Stream, Exception? Error) LookupStream(string name)
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa == null)
return (null, new InvalidOperationException("jetstream not enabled for account"));
jsa.Lock.EnterReadLock();
try
{
if (jsa.Streams.TryGetValue(name, out var stream) && stream is NatsStream ns)
return (ns, null);
return (null, new InvalidOperationException("stream not found"));
}
finally
{
jsa.Lock.ExitReadLock();
}
}
internal Exception? UpdateJetStreamLimits(Dictionary<string, object>? limits)
{
_mu.EnterReadLock();
var server = Server as NatsServer;
var jsa = JetStream;
_mu.ExitReadLock();
if (server == null)
return new InvalidOperationException("jetstream account not registered");
if (server.GetJetStreamState() == null)
return new InvalidOperationException("jetstream not enabled");
if (jsa == null)
return new InvalidOperationException("jetstream not enabled for account");
limits ??= DefaultJetStreamAccountTiers();
if (limits.Count == 0)
limits = DefaultJetStreamAccountTiers();
AssignJetStreamLimits(limits);
var typed = ToTypedLimits(limits);
jsa.UsageLock.EnterWriteLock();
try
{
jsa.Limits.Clear();
foreach (var (tier, tierLimits) in typed)
jsa.Limits[tier] = tierLimits;
}
finally
{
jsa.UsageLock.ExitWriteLock();
}
return null;
}
public JetStreamAccountStats JetStreamUsage()
{
_mu.EnterReadLock();
var jsa = JetStream;
var accountName = Name;
var configuredLimits = JetStreamLimits;
_mu.ExitReadLock();
var stats = new JetStreamAccountStats();
if (jsa == null)
return stats;
var (js, _) = jsa.JetStreamAndClustered();
if (js == null)
return stats;
jsa.UsageLock.EnterReadLock();
try
{
long mem = 0;
long store = 0;
foreach (var usage in jsa.Usage.Values)
{
mem += usage.Total.Mem;
store += usage.Total.Store;
}
stats.Memory = (ulong)Math.Max(0, mem);
stats.Store = (ulong)Math.Max(0, store);
stats.Domain = js.Config.Domain;
stats.Api = new JetStreamApiStats
{
Level = JetStreamVersioning.JsApiLevel,
Total = jsa.ApiTotal,
Errors = jsa.ApiErrors,
};
if (jsa.Limits.TryGetValue(string.Empty, out var defaultTier))
{
stats.Limits = defaultTier;
}
else
{
stats.Tiers = new Dictionary<string, JetStreamTier>(StringComparer.Ordinal);
foreach (var (tier, usage) in jsa.Usage)
{
jsa.Limits.TryGetValue(tier, out var tierLimits);
stats.Tiers[tier] = new JetStreamTier
{
Memory = (ulong)Math.Max(0, usage.Total.Mem),
Store = (ulong)Math.Max(0, usage.Total.Store),
Limits = tierLimits ?? new JetStreamAccountLimits(),
};
}
if (configuredLimits != null)
{
foreach (var (tier, value) in configuredLimits)
{
if (stats.Tiers.ContainsKey(tier))
continue;
if (value is not JetStreamAccountLimits lim)
continue;
stats.Tiers[tier] = new JetStreamTier { Limits = lim };
}
}
}
}
finally
{
jsa.UsageLock.ExitReadLock();
}
var allStreams = Streams();
stats.Streams = allStreams.Count;
foreach (var stream in allStreams)
stats.Consumers += stream.State().Consumers;
if (stats.Tiers != null)
{
foreach (var stream in allStreams)
{
var tier = JetStreamEngine.TierName(stream.Config.Replicas);
if (!stats.Tiers.TryGetValue(tier, out var u))
u = new JetStreamTier();
u.Streams++;
u.Consumers += stream.State().Consumers;
stats.Tiers[tier] = u;
}
}
if (stats.Tiers == null || stats.Tiers.Count == 0)
{
var (rmem, rstore) = jsa.ReservedStorage(string.Empty);
stats.ReservedMemory = rmem;
stats.ReservedStore = rstore;
}
else
{
foreach (var tier in stats.Tiers.Keys.ToArray())
{
var tierStats = stats.Tiers[tier];
(tierStats.ReservedMemory, tierStats.ReservedStore) = jsa.ReservedStorage(tier);
stats.Tiers[tier] = tierStats;
}
}
_ = accountName;
return stats;
}
internal Exception? DisableJetStream() => RemoveJetStream();
internal Exception? RemoveJetStream()
{
_mu.EnterWriteLock();
var server = Server as NatsServer;
var jsa = JetStream;
JetStream = null;
_mu.ExitWriteLock();
if (server == null)
return new InvalidOperationException("jetstream account not registered");
var js = server.GetJetStream();
if (js == null)
return new InvalidOperationException("jetstream not enabled for account");
return js.DisableJetStream(jsa);
}
internal bool JetStreamConfigured()
{
_mu.EnterReadLock();
try
{
return JetStreamLimits != null && JetStreamLimits.Count > 0;
}
finally
{
_mu.ExitReadLock();
}
}
internal bool JetStreamEnabled()
{
_mu.EnterReadLock();
try
{
return JetStream != null;
}
finally
{
_mu.ExitReadLock();
}
}
internal Exception? EnableAllJetStreamServiceImportsAndMappings()
{
_mu.EnterReadLock();
var server = Server as NatsServer;
_mu.ExitReadLock();
if (server == null)
return new InvalidOperationException("jetstream account not registered");
var systemAccount = server.SystemAccount();
var destinationName = systemAccount?.Name ?? string.Empty;
if (systemAccount != null && !ServiceImportExists(destinationName, JsApiSubjects.JsAllApi))
{
var err = AddServiceImport(systemAccount, JsApiSubjects.JsAllApi, JsApiSubjects.JsAllApi);
if (err != null)
return new InvalidOperationException($"error setting up jetstream service imports for account: {err.Message}", err);
}
var domain = server.GetOpts().JetStreamDomain;
if (!string.IsNullOrWhiteSpace(domain))
{
var mappings = new Dictionary<string, string>(StringComparer.Ordinal)
{
[$"$JS.{domain}.API.>"] = JsApiSubjects.JsAllApi,
[$"$JS.{domain}.API.INFO"] = JsApiSubjects.JsApiAccountInfo,
};
_mu.EnterReadLock();
try
{
foreach (var mapping in _mappings)
mappings.Remove(mapping.Source);
}
finally
{
_mu.ExitReadLock();
}
foreach (var (src, dest) in mappings)
{
var err = AddMapping(src, dest);
if (err != null)
server.Errorf("Error adding JetStream domain mapping: {0}", err.Message);
}
}
return null;
}
internal Exception? EnableJetStreamInfoServiceImportOnly()
{
if (ServiceImportShadowed(JsApiSubjects.JsApiAccountInfo))
return null;
return EnableAllJetStreamServiceImportsAndMappings();
}
}

View File

@@ -32,7 +32,7 @@ namespace ZB.MOM.NatsNet.Server;
/// <see cref="ClientConnection"/> can interact with it without a hard dependency.
/// Mirrors Go <c>Account</c> struct in server/accounts.go.
/// </summary>
public sealed class Account : INatsAccount
public sealed partial class Account : INatsAccount
{
// -------------------------------------------------------------------------
// Constants
@@ -261,7 +261,7 @@ public sealed class Account : INatsAccount
/// JetStream account state. Mirrors Go <c>js *jsAccount</c>.
/// TODO: session 19 — JetStream implementation.
/// </summary>
internal object? JetStream { get; set; }
internal JsAccount? JetStream { get; set; }
/// <summary>
/// Per-domain JetStream limits. Mirrors Go <c>jsLimits map[string]JetStreamAccountLimits</c>.

View File

@@ -0,0 +1,444 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed class JetStreamEngine(JetStream state)
{
private readonly JetStream _state = state;
private static readonly TimeSpan MinUsageUpdateWindow = TimeSpan.FromMilliseconds(250);
private const string JsWillExtend = "will_extend";
private const string JsNoExtend = "no_extend";
private const string JsDomainApiTemplate = "$JS.{0}.API.>";
internal void SetStarted()
{
_state.Lock.EnterWriteLock();
try
{
_state.Started = DateTime.UtcNow;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0;
internal void SetJetStreamStandAlone(bool isStandAlone)
{
_state.Lock.EnterWriteLock();
try
{
_state.StandAlone = isStandAlone;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal bool IsShuttingDown()
{
_state.Lock.EnterReadLock();
try
{
return _state.ShuttingDown;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal Exception? DisableJetStream(JsAccount? account)
{
if (account?.Account is not Account a)
return new InvalidOperationException("jetstream not enabled for account");
_state.Lock.EnterWriteLock();
try
{
_state.Accounts.Remove(a.Name);
}
finally
{
_state.Lock.ExitWriteLock();
}
account.Delete();
return null;
}
internal bool WouldExceedLimits(StorageType storageType, int size)
{
var total = storageType == StorageType.MemoryStorage
? Interlocked.Read(ref _state.MemUsed)
: Interlocked.Read(ref _state.StoreUsed);
var max = storageType == StorageType.MemoryStorage
? _state.Config.MaxMemory
: _state.Config.MaxStore;
return total + size > max;
}
internal bool LimitsExceeded(StorageType storageType) => WouldExceedLimits(storageType, 0);
internal static string TierName(int replicas) => $"R{(replicas <= 0 ? 1 : replicas)}";
internal static bool IsSameTier(StreamConfig cfgA, StreamConfig cfgB) =>
cfgA.Replicas == cfgB.Replicas;
internal static Dictionary<string, JetStreamAccountLimits> DiffCheckedLimits(
Dictionary<string, JetStreamAccountLimits> a,
Dictionary<string, JetStreamAccountLimits> b)
{
var diff = new Dictionary<string, JetStreamAccountLimits>(StringComparer.Ordinal);
foreach (var (tier, oldLimit) in a)
{
b.TryGetValue(tier, out var newLimit);
newLimit ??= new JetStreamAccountLimits();
diff[tier] = new JetStreamAccountLimits
{
MaxMemory = newLimit.MaxMemory - oldLimit.MaxMemory,
MaxStore = newLimit.MaxStore - oldLimit.MaxStore,
};
}
foreach (var (tier, newLimit) in b)
{
if (a.ContainsKey(tier))
continue;
diff[tier] = new JetStreamAccountLimits
{
MaxMemory = newLimit.MaxMemory,
MaxStore = newLimit.MaxStore,
};
}
return diff;
}
internal static (ulong Mem, ulong Store) ReservedStorage(
Dictionary<string, object?> streamAssignments,
string tier)
{
ulong mem = 0;
ulong store = 0;
foreach (var assignment in streamAssignments.Values.OfType<StreamAssignmentView>())
{
var cfg = assignment.Config;
if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, TierName(cfg.Replicas), StringComparison.Ordinal))
continue;
if (cfg.MaxBytes <= 0)
continue;
if (cfg.Storage == StorageType.FileStorage)
store += (ulong)cfg.MaxBytes;
else if (cfg.Storage == StorageType.MemoryStorage)
mem += (ulong)cfg.MaxBytes;
}
return (mem, store);
}
internal static bool ShouldSendUsageUpdate(DateTime lastUpdateUtc) =>
DateTime.UtcNow - lastUpdateUtc >= MinUsageUpdateWindow;
internal Exception? CheckAccountLimits(
JetStreamAccountLimits selected,
StreamConfig config,
long currentReservation) =>
CheckLimits(selected, config, checkServer: false, currentReservation, 0);
internal Exception? CheckAllLimits(
JetStreamAccountLimits selected,
StreamConfig config,
long currentReservation,
long maxBytesOffset) =>
CheckLimits(selected, config, checkServer: true, currentReservation, maxBytesOffset);
internal Exception? CheckLimits(
JetStreamAccountLimits selected,
StreamConfig config,
bool checkServer,
long currentReservation,
long maxBytesOffset)
{
if (config.MaxConsumers > 0 && selected.MaxConsumers > 0 && config.MaxConsumers > selected.MaxConsumers)
return new InvalidOperationException("maximum consumers limit exceeded");
return CheckBytesLimits(selected, config.MaxBytes, config.Storage, checkServer, currentReservation, maxBytesOffset);
}
internal Exception? CheckBytesLimits(
JetStreamAccountLimits selected,
long addBytes,
StorageType storage,
bool checkServer,
long currentReservation,
long maxBytesOffset)
{
if (addBytes < 0)
addBytes = 1;
var totalBytes = addBytes + maxBytesOffset;
switch (storage)
{
case StorageType.MemoryStorage:
if (selected.MaxMemory >= 0 && currentReservation + totalBytes > selected.MaxMemory)
return new InvalidOperationException("insufficient memory resources");
if (checkServer && Interlocked.Read(ref _state.MemReserved) + totalBytes > _state.Config.MaxMemory)
return new InvalidOperationException("insufficient memory resources");
break;
case StorageType.FileStorage:
if (selected.MaxStore >= 0 && currentReservation + totalBytes > selected.MaxStore)
return new InvalidOperationException("insufficient storage resources");
if (checkServer && Interlocked.Read(ref _state.StoreReserved) + totalBytes > _state.Config.MaxStore)
return new InvalidOperationException("insufficient storage resources");
break;
}
return null;
}
internal JsAccount? LookupAccount(Account? account)
{
if (account == null)
return null;
_state.Lock.EnterReadLock();
try
{
_state.Accounts.TryGetValue(account.Name, out var jsa);
return jsa;
}
finally
{
_state.Lock.ExitReadLock();
}
}
internal JetStreamStats UsageStats()
{
var stats = new JetStreamStats();
_state.Lock.EnterReadLock();
try
{
stats.Accounts = _state.Accounts.Count;
stats.ReservedMemory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemReserved));
stats.ReservedStore = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreReserved));
stats.Api.Level = JetStreamVersioning.JsApiLevel;
stats.Api.Total = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiTotal));
stats.Api.Errors = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiErrors));
stats.Api.Inflight = (ulong)Math.Max(0, Interlocked.Read(ref _state.ApiInflight));
stats.Memory = (ulong)Math.Max(0, Interlocked.Read(ref _state.MemUsed));
stats.Store = (ulong)Math.Max(0, Interlocked.Read(ref _state.StoreUsed));
stats.HAAssets = 0;
}
finally
{
_state.Lock.ExitReadLock();
}
return stats;
}
internal Exception? SufficientResources(Dictionary<string, JetStreamAccountLimits>? limits)
{
if (limits == null || !_state.StandAlone)
return null;
static (long MaxMem, long MaxStore) Totals(Dictionary<string, JetStreamAccountLimits> source)
{
long mem = 0;
long store = 0;
foreach (var lim in source.Values)
{
if (lim.MaxMemory > 0) mem += lim.MaxMemory;
if (lim.MaxStore > 0) store += lim.MaxStore;
}
return (mem, store);
}
var (totalMem, totalStore) = Totals(limits);
if (Interlocked.Read(ref _state.MemReserved) + totalMem > _state.Config.MaxMemory)
return new InvalidOperationException("insufficient memory resources");
if (Interlocked.Read(ref _state.StoreReserved) + totalStore > _state.Config.MaxStore)
return new InvalidOperationException("insufficient storage resources");
long reservedMem = 0;
long reservedStore = 0;
_state.Lock.EnterReadLock();
try
{
foreach (var jsa in _state.Accounts.Values)
{
jsa.UsageLock.EnterReadLock();
try
{
var (m, s) = Totals(jsa.Limits);
reservedMem += m;
reservedStore += s;
}
finally
{
jsa.UsageLock.ExitReadLock();
}
}
}
finally
{
_state.Lock.ExitReadLock();
}
if (reservedMem + totalMem > _state.Config.MaxMemory)
return new InvalidOperationException("insufficient memory resources");
if (reservedStore + totalStore > _state.Config.MaxStore)
return new InvalidOperationException("insufficient storage resources");
return null;
}
internal void ReserveStreamResources(StreamConfig? cfg)
{
if (cfg == null || cfg.MaxBytes <= 0)
return;
if (cfg.Storage == StorageType.MemoryStorage)
Interlocked.Add(ref _state.MemReserved, cfg.MaxBytes);
else if (cfg.Storage == StorageType.FileStorage)
Interlocked.Add(ref _state.StoreReserved, cfg.MaxBytes);
}
internal void ReleaseStreamResources(StreamConfig? cfg)
{
if (cfg == null || cfg.MaxBytes <= 0)
return;
if (cfg.Storage == StorageType.MemoryStorage)
Interlocked.Add(ref _state.MemReserved, -cfg.MaxBytes);
else if (cfg.Storage == StorageType.FileStorage)
Interlocked.Add(ref _state.StoreReserved, -cfg.MaxBytes);
}
internal static string FriendlyBytes<T>(T bytes)
where T : struct, IConvertible
{
var value = Convert.ToDouble(bytes);
const int baseValue = 1024;
var units = new[] { "K", "M", "G", "T", "P", "E" };
if (value < baseValue)
return $"{value} B";
var exp = (int)(Math.Log(value) / Math.Log(baseValue));
var index = Math.Clamp(exp - 1, 0, units.Length - 1);
return $"{value / Math.Pow(baseValue, exp):0.00} {units[index]}B";
}
internal static bool IsValidName(string name)
{
if (string.IsNullOrWhiteSpace(name))
return false;
return name.IndexOfAny([' ', '\t', '\r', '\n', '\f', '.', '*', '>']) < 0;
}
internal static Exception? ValidateJetStreamOptions(ServerOptions options)
{
foreach (var (account, domain) in options.JsAccDefaultDomain)
{
var exists = false;
if (ServerOptions.IsReservedAccount(account))
{
exists = true;
}
else if (options.TrustedOperators.Count == 0)
{
foreach (var configured in options.Accounts)
{
if (!string.Equals(configured.GetName(), account, StringComparison.Ordinal))
continue;
if (configured.JetStreamLimits?.Count > 0 && !string.IsNullOrEmpty(domain))
return new InvalidOperationException($"default_js_domain contains account name \"{account}\" with enabled JetStream");
exists = true;
break;
}
}
else
{
exists = IsLikelyPublicAccountNkey(account);
}
if (!exists)
return new InvalidOperationException($"in non operator mode, `default_js_domain` references non existing account \"{account}\"");
}
foreach (var (account, domain) in options.JsAccDefaultDomain)
{
var systemAccount = string.IsNullOrWhiteSpace(options.SystemAccount)
? ServerConstants.DefaultSystemAccount
: options.SystemAccount;
if (string.Equals(account, systemAccount, StringComparison.Ordinal))
return new InvalidOperationException($"system account \"{account}\" can not be in default_js_domain");
if (string.IsNullOrWhiteSpace(domain))
continue;
var subject = string.Format(JsDomainApiTemplate, domain);
if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject))
return new InvalidOperationException($"default_js_domain contains account \"{account}\" with invalid domain name \"{domain}\"");
}
if (!string.IsNullOrWhiteSpace(options.JetStreamDomain))
{
var subject = string.Format(JsDomainApiTemplate, options.JetStreamDomain);
if (!Internal.DataStructures.SubscriptionIndex.IsValidSubject(subject))
return new InvalidOperationException($"invalid domain name: derived \"{subject}\" is not a valid subject");
if (!IsValidName(options.JetStreamDomain))
return new InvalidOperationException("invalid domain name: may not contain ., * or >");
}
if (!options.JetStream || options.Cluster.Port == 0)
return null;
if (string.IsNullOrWhiteSpace(options.ServerName))
return new InvalidOperationException("jetstream cluster requires `server_name` to be set");
if (string.IsNullOrWhiteSpace(options.Cluster.Name))
return new InvalidOperationException("jetstream cluster requires `cluster.name` to be set");
var hint = options.JetStreamExtHint.ToLowerInvariant();
if (hint is not JsWillExtend and not JsNoExtend and not "")
return new InvalidOperationException($"expected 'no_extend' for string value, got '{hint}'");
options.JetStreamExtHint = hint;
if (options.JetStreamMaxCatchup < 0)
return new InvalidOperationException("jetstream max catchup cannot be negative");
return null;
}
internal static void FixCfgMirrorWithDedupWindow(StreamConfig? config)
{
if (config?.Mirror == null)
return;
if (config.Duplicates != TimeSpan.Zero)
config.Duplicates = TimeSpan.Zero;
}
private static bool IsLikelyPublicAccountNkey(string value) =>
!string.IsNullOrWhiteSpace(value) &&
value.Length >= 10 &&
value.StartsWith("A", StringComparison.Ordinal);
}
internal sealed class StreamAssignmentView
{
public required StreamConfig Config { get; init; }
}

View File

@@ -182,7 +182,7 @@ public sealed class JetStreamAccountStats
/// The main JetStream engine, one per server.
/// Mirrors <c>jetStream</c> struct in server/jetstream.go.
/// </summary>
internal sealed class JetStream
internal sealed partial class JetStream
{
// Atomic counters (use Interlocked for thread-safety)
public long ApiInflight;
@@ -238,7 +238,7 @@ internal sealed class JsaStorage
/// A JetStream-enabled account, holding streams, limits and usage tracking.
/// Mirrors <c>jsAccount</c> in server/jetstream.go.
/// </summary>
internal sealed class JsAccount
internal sealed partial class JsAccount
{
private readonly ReaderWriterLockSlim _mu = new();

View File

@@ -0,0 +1,305 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class JsAccount
{
internal (ulong Mem, ulong Store) ReservedStorage(string tier)
{
ulong mem = 0;
ulong store = 0;
Lock.EnterReadLock();
try
{
foreach (var stream in Streams.Values.OfType<NatsStream>())
{
var cfg = stream.Config;
if (!string.IsNullOrEmpty(tier) && !string.Equals(tier, JetStreamEngine.TierName(cfg.Replicas), StringComparison.Ordinal))
continue;
if (cfg.MaxBytes <= 0)
continue;
if (cfg.Storage == StorageType.FileStorage)
store += (ulong)cfg.MaxBytes;
else if (cfg.Storage == StorageType.MemoryStorage)
mem += (ulong)cfg.MaxBytes;
}
}
finally
{
Lock.ExitReadLock();
}
return (mem, store);
}
internal void RemoteUpdateUsage(byte[] message)
{
if (message.Length < 16)
return;
UsageLock.EnterWriteLock();
try
{
if (!Usage.TryGetValue(string.Empty, out var usage))
{
usage = new JsaStorage();
Usage[string.Empty] = usage;
}
usage.Total.Mem = BitConverter.ToInt64(message, 0);
usage.Total.Store = BitConverter.ToInt64(message, 8);
}
finally
{
UsageLock.ExitWriteLock();
}
}
internal void CheckAndSyncUsage(string tier, StorageType storageType)
{
if (Interlocked.CompareExchange(ref Sync, 1, 0) != 0)
return;
try
{
Lock.EnterReadLock();
try
{
long total = 0;
foreach (var stream in Streams.Values.OfType<NatsStream>())
{
if (!string.Equals(JetStreamEngine.TierName(stream.Config.Replicas), tier, StringComparison.Ordinal))
continue;
if (stream.Config.Storage != storageType)
continue;
total += (long)stream.State().Bytes;
}
UsageLock.EnterWriteLock();
try
{
Usage.TryGetValue(tier, out var usage);
usage ??= new JsaStorage();
Usage[tier] = usage;
if (storageType == StorageType.MemoryStorage)
{
usage.Local.Mem = total;
usage.Total.Mem = total;
}
else
{
usage.Local.Store = total;
usage.Total.Store = total;
}
}
finally
{
UsageLock.ExitWriteLock();
}
}
finally
{
Lock.ExitReadLock();
}
}
finally
{
Interlocked.Exchange(ref Sync, 0);
}
}
internal void UpdateUsage(string tier, StorageType storageType, long delta)
{
UsageLock.EnterWriteLock();
try
{
Usage.TryGetValue(tier, out var usage);
usage ??= new JsaStorage();
Usage[tier] = usage;
if (storageType == StorageType.MemoryStorage)
{
usage.Local.Mem += delta;
usage.Total.Mem += delta;
}
else
{
usage.Local.Store += delta;
usage.Total.Store += delta;
}
}
finally
{
UsageLock.ExitWriteLock();
}
}
internal void SendClusterUsageUpdateTimer()
{
UsageLock.EnterWriteLock();
try
{
SendClusterUsageUpdate();
}
finally
{
UsageLock.ExitWriteLock();
}
}
internal void SendClusterUsageUpdate()
{
var now = DateTime.UtcNow;
if (!JetStreamEngine.ShouldSendUsageUpdate(LUpdate))
return;
LUpdate = now;
// Cluster bus publish is wired in later cluster sessions.
UsageApi = ApiTotal;
UsageErr = ApiErrors;
}
internal (JetStream? JetStream, bool Clustered) JetStreamAndClustered()
{
Lock.EnterReadLock();
try
{
var js = Js as JetStream;
return (js, js?.Cluster != null);
}
finally
{
Lock.ExitReadLock();
}
}
internal Account? Acc() => Account as Account;
internal (JetStreamAccountLimits Limits, string Tier, bool Found) SelectLimits(int replicas)
{
UsageLock.EnterReadLock();
try
{
if (Limits.TryGetValue(string.Empty, out var selected))
return (selected, string.Empty, true);
var tier = JetStreamEngine.TierName(replicas);
if (Limits.TryGetValue(tier, out selected))
return (selected, tier, true);
return (new JetStreamAccountLimits(), string.Empty, false);
}
finally
{
UsageLock.ExitReadLock();
}
}
internal int CountStreams(string tier, StreamConfig cfg)
{
Lock.EnterReadLock();
try
{
var count = 0;
foreach (var stream in Streams.Values.OfType<NatsStream>())
{
if ((!string.IsNullOrEmpty(tier) && !JetStreamEngine.IsSameTier(stream.Config, cfg)) ||
string.Equals(stream.Config.Name, cfg.Name, StringComparison.Ordinal))
continue;
count++;
}
return count;
}
finally
{
Lock.ExitReadLock();
}
}
internal (ulong Mem, ulong Store) StorageTotals()
{
UsageLock.EnterReadLock();
try
{
ulong mem = 0;
ulong store = 0;
foreach (var usage in Usage.Values)
{
mem += (ulong)Math.Max(0, usage.Total.Mem);
store += (ulong)Math.Max(0, usage.Total.Store);
}
return (mem, store);
}
finally
{
UsageLock.ExitReadLock();
}
}
internal (bool Exceeded, JsApiError? Error) LimitsExceeded(StorageType storageType, string tier, int replicas) =>
WouldExceedLimits(storageType, tier, replicas, string.Empty, null, null);
internal (bool Exceeded, JsApiError? Error) WouldExceedLimits(
StorageType storageType,
string tier,
int replicas,
string subject,
byte[]? headers,
byte[]? payload)
{
UsageLock.EnterReadLock();
try
{
if (!Limits.TryGetValue(tier, out var selected))
return (true, JsApiErrors.NewJSNoLimitsError());
if (!Usage.TryGetValue(tier, out var inUse))
return (false, null);
var r = Math.Max(1, replicas);
var lr = string.IsNullOrEmpty(tier) ? 1 : r;
var bytes = (subject.Length + (headers?.Length ?? 0) + (payload?.Length ?? 0)) * r;
if (storageType == StorageType.MemoryStorage)
{
var totalMem = inUse.Total.Mem + bytes;
if (selected.MemoryMaxStreamBytes > 0 && totalMem > selected.MemoryMaxStreamBytes * lr)
return (true, null);
if (selected.MaxMemory >= 0 && totalMem > selected.MaxMemory * lr)
return (true, null);
}
else
{
var totalStore = inUse.Total.Store + bytes;
if (selected.StoreMaxStreamBytes > 0 && totalStore > selected.StoreMaxStreamBytes * lr)
return (true, null);
if (selected.MaxStore >= 0 && totalStore > selected.MaxStore * lr)
return (true, null);
}
return (false, null);
}
finally
{
UsageLock.ExitReadLock();
}
}
internal void Delete()
{
Lock.EnterWriteLock();
try
{
Streams.Clear();
Inflight.Clear();
UpdatesSub = null;
UpdatesPub = string.Empty;
}
finally
{
Lock.ExitWriteLock();
}
}
}

View File

@@ -707,6 +707,10 @@ public sealed partial class NatsServer
// Trusted operators, leafnode, auth, proxies, gateway, cluster, MQTT, websocket
// — validation stubs delegating to not-yet-ported subsystems.
var jsErr = JetStreamEngine.ValidateJetStreamOptions(o);
if (jsErr != null)
return jsErr;
var err = ValidateCluster(o);
return err;
}

View File

@@ -0,0 +1,578 @@
using System.Security.Cryptography;
using System.Text;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
private const string JetStreamStoreDir = "jetstream";
private const long JetStreamMaxMemDefault = 1024L * 1024L * 256L;
public Exception? EnableJetStream(JetStreamConfig? config)
{
if (JetStreamEnabled())
return new InvalidOperationException("jetstream already enabled");
Noticef("Starting JetStream");
if (config == null || config.MaxMemory <= 0 || config.MaxStore <= 0)
{
config = new JetStreamConfig
{
StoreDir = string.IsNullOrWhiteSpace(GetOpts().StoreDir)
? Path.Combine(Path.GetTempPath(), JetStreamStoreDir)
: Path.Combine(GetOpts().StoreDir, JetStreamStoreDir),
MaxMemory = GetOpts().JetStreamMaxMemory > 0 ? GetOpts().JetStreamMaxMemory : 1,
MaxStore = GetOpts().JetStreamMaxStore > 0 ? GetOpts().JetStreamMaxStore : 1,
SyncInterval = GetOpts().SyncInterval,
SyncAlways = GetOpts().SyncAlways,
Domain = GetOpts().JetStreamDomain,
};
}
else if (!string.IsNullOrWhiteSpace(config.StoreDir))
{
config.StoreDir = Path.Combine(config.StoreDir, JetStreamStoreDir);
}
if (string.IsNullOrWhiteSpace(config.StoreDir))
{
config.StoreDir = Path.Combine(Path.GetTempPath(), JetStreamStoreDir);
Warnf("Temporary storage directory used, data could be lost on system reboot");
}
var err = CheckStoreDir(config);
if (err != null)
return err;
return EnableJetStreamInternal(config);
}
private KeyGen? JsKeyGen(string jsKey, string info)
{
if (string.IsNullOrEmpty(jsKey))
return null;
return context =>
{
using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(jsKey));
hmac.TransformBlock(Encoding.UTF8.GetBytes(info), 0, info.Length, null, 0);
hmac.TransformFinalBlock(context, 0, context.Length);
return hmac.Hash ?? [];
};
}
internal (byte[]? Plain, bool UsedFallback, Exception? Error) DecryptMeta(
StoreCipher storeCipher,
byte[] encryptedKey,
byte[] encryptedBuffer,
string accountName,
string context)
{
if (encryptedKey.Length == 0)
return (null, false, new InvalidOperationException("encryption key missing"));
var ciphers = storeCipher == StoreCipher.Aes
? new[] { StoreCipher.Aes, StoreCipher.ChaCha }
: new[] { StoreCipher.ChaCha, StoreCipher.Aes };
var candidates = new List<(KeyGen Prf, StoreCipher Cipher)>();
var opts = GetOpts();
var prf = JsKeyGen(opts.JetStreamKey, accountName);
if (prf == null)
return (null, false, new InvalidOperationException("jetstream encryption key is not configured"));
foreach (var cipher in ciphers)
candidates.Add((prf, cipher));
var oldPrf = JsKeyGen(opts.JetStreamOldKey, accountName);
if (oldPrf != null)
{
foreach (var cipher in ciphers)
candidates.Add((oldPrf, cipher));
}
for (var i = 0; i < candidates.Count; i++)
{
try
{
var rb = candidates[i].Prf(Encoding.UTF8.GetBytes(context));
var kek = JetStreamFileStore.GenEncryptionKey(candidates[i].Cipher, rb);
var ns = kek.NonceSize;
if (encryptedKey.Length < ns || encryptedBuffer.Length < ns)
continue;
var seed = kek.Open(encryptedKey.AsSpan(0, ns), encryptedKey.AsSpan(ns));
var aek = JetStreamFileStore.GenEncryptionKey(candidates[i].Cipher, seed);
var plain = aek.Open(encryptedBuffer.AsSpan(0, ns), encryptedBuffer.AsSpan(ns));
return (plain, i > 0, null);
}
catch
{
// Try the next candidate.
}
}
return (null, false, new InvalidOperationException("unable to recover encrypted metadata"));
}
internal Exception? CheckStoreDir(JetStreamConfig cfg)
{
if (string.IsNullOrWhiteSpace(cfg.StoreDir))
return new InvalidOperationException("jetstream store directory is required");
try
{
Directory.CreateDirectory(cfg.StoreDir);
return null;
}
catch (Exception ex)
{
return ex;
}
}
internal Exception? InitJetStreamEncryption()
{
var opts = GetOpts();
if (!string.IsNullOrEmpty(opts.JetStreamKey) && !string.IsNullOrEmpty(opts.JetStreamTpm.KeysFile))
return new InvalidOperationException("JetStream encryption key may not be used with TPM options");
return null;
}
private Exception? EnableJetStreamInternal(JetStreamConfig cfg)
{
var encryptionErr = InitJetStreamEncryption();
if (encryptionErr != null)
return encryptionErr;
try
{
Directory.CreateDirectory(cfg.StoreDir);
}
catch (Exception ex)
{
return ex;
}
var js = new JetStream
{
Server = this,
Config = cfg,
Started = DateTime.UtcNow,
StandAlone = true,
};
_mu.EnterWriteLock();
try
{
_jetStream = js;
_info.JetStream = true;
_info.Domain = cfg.Domain;
}
finally
{
_mu.ExitWriteLock();
}
var err = EnableJetStreamAccounts();
if (err != null)
{
_mu.EnterWriteLock();
try
{
_jetStream = null;
_info.JetStream = false;
}
finally
{
_mu.ExitWriteLock();
}
}
return err;
}
internal bool CanExtendOtherDomain()
{
var opts = GetOpts();
var sysAcc = SystemAccount()?.GetName();
if (string.IsNullOrEmpty(sysAcc))
return false;
foreach (var remote in opts.LeafNode.Remotes)
{
if (!string.Equals(remote.LocalAccount, sysAcc, StringComparison.Ordinal))
continue;
foreach (var denyImport in remote.DenyImports)
{
if (SubscriptionIndex.SubjectIsSubsetMatch(denyImport, JsApiSubjects.JsAllApi))
return false;
}
return true;
}
return false;
}
internal void UpdateJetStreamInfoStatus(bool enabled)
{
_mu.EnterWriteLock();
try
{
_info.JetStream = enabled;
}
finally
{
_mu.ExitWriteLock();
}
}
internal Exception? RestartJetStream()
{
var opts = GetOpts();
var cfg = new JetStreamConfig
{
StoreDir = opts.StoreDir,
SyncInterval = opts.SyncInterval,
SyncAlways = opts.SyncAlways,
MaxMemory = opts.JetStreamMaxMemory,
MaxStore = opts.JetStreamMaxStore,
Domain = opts.JetStreamDomain,
Strict = !opts.NoJetStreamStrict,
};
Noticef("Restarting JetStream");
var err = EnableJetStream(cfg);
if (err != null)
{
Warnf("Can't start JetStream: {0}", err.Message);
_ = DisableJetStream();
return err;
}
UpdateJetStreamInfoStatus(true);
return null;
}
internal void CheckJetStreamExports()
{
if (SystemAccount() != null)
SetupJetStreamExports();
}
internal void SetupJetStreamExports()
{
var sys = SystemAccount();
if (sys == null)
return;
var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null);
if (err != null)
Warnf("Error setting up jetstream service exports: {0}", err.Message);
}
internal bool JetStreamOOSPending()
{
var js = _jetStream;
if (js == null)
return false;
js.Lock.EnterWriteLock();
try
{
var wasPending = js.Oos;
js.Oos = true;
return wasPending;
}
finally
{
js.Lock.ExitWriteLock();
}
}
internal void SetJetStreamDisabled()
{
var js = _jetStream;
if (js != null)
Interlocked.Exchange(ref js.Disabled, 1);
}
internal void HandleOutOfSpace(NatsStream? stream)
{
if (!JetStreamEnabled() || JetStreamOOSPending())
return;
if (stream != null)
Errorf("JetStream out of resources for stream {0}, will be DISABLED", stream.Config.Name);
else
Errorf("JetStream out of resources, will be DISABLED");
_ = Task.Run(() => DisableJetStream());
}
public Exception? DisableJetStream()
{
if (!JetStreamEnabled())
return null;
SetJetStreamDisabled();
UpdateJetStreamInfoStatus(false);
_mu.EnterWriteLock();
try
{
_jetStream = null;
}
finally
{
_mu.ExitWriteLock();
}
ShutdownJetStream();
ShutdownRaftNodes();
return null;
}
private Exception? EnableJetStreamAccounts()
{
if (GlobalAccountOnly())
{
var gacc = GlobalAccount();
if (gacc == null)
return new InvalidOperationException("global account not found");
gacc.JetStreamLimits ??= new Dictionary<string, object>(StringComparer.Ordinal)
{
[string.Empty] = new JetStreamAccountLimits
{
MaxMemory = -1,
MaxStore = -1,
MaxStreams = -1,
MaxConsumers = -1,
MaxAckPending = -1,
MemoryMaxStreamBytes = -1,
StoreMaxStreamBytes = -1,
},
};
return ConfigJetStream(gacc);
}
return ConfigAllJetStreamAccounts();
}
internal Exception? ConfigJetStream(Account? acc)
{
if (acc == null)
return null;
var jsLimits = acc.JetStreamLimits;
if (jsLimits != null)
return acc.EnableAllJetStreamServiceImportsAndMappings();
if (!ReferenceEquals(acc, SystemAccount()))
{
acc.JetStream = null;
return acc.EnableJetStreamInfoServiceImportOnly();
}
return null;
}
internal Exception? ConfigAllJetStreamAccounts()
{
CheckJetStreamExports();
if (_jetStream == null)
return null;
foreach (var acc in _accounts.Values)
{
var err = ConfigJetStream(acc);
if (err != null)
return err;
}
var storeDir = _jetStream.Config.StoreDir;
if (!Directory.Exists(storeDir))
return null;
foreach (var directory in Directory.EnumerateDirectories(storeDir))
{
var accountName = Path.GetFileName(directory);
if (string.IsNullOrWhiteSpace(accountName) || _accounts.ContainsKey(accountName))
continue;
var (resolved, _) = LookupAccount(accountName);
if (resolved == null)
continue;
var err = ConfigJetStream(resolved);
if (err != null)
return err;
}
return null;
}
public bool JetStreamEnabled()
{
var js = _jetStream;
return js != null && Interlocked.CompareExchange(ref js.Disabled, 0, 0) == 0;
}
public bool JetStreamEnabledForDomain()
{
if (JetStreamEnabled())
return true;
foreach (var value in _nodeToInfo.Values)
{
if (value is NodeInfo { Js: true })
return true;
}
return false;
}
public JetStreamConfig? JetStreamConfig()
{
var js = _jetStream;
if (js == null)
return null;
var cfg = js.Config;
return new JetStreamConfig
{
MaxMemory = cfg.MaxMemory,
MaxStore = cfg.MaxStore,
StoreDir = cfg.StoreDir,
SyncInterval = cfg.SyncInterval,
SyncAlways = cfg.SyncAlways,
Domain = cfg.Domain,
CompressOK = cfg.CompressOK,
UniqueTag = cfg.UniqueTag,
Strict = cfg.Strict,
};
}
public string StoreDir()
{
var js = _jetStream;
return js == null ? string.Empty : js.Config.StoreDir;
}
public int JetStreamNumAccounts()
{
var js = _jetStream;
if (js == null)
return 0;
js.Lock.EnterReadLock();
try
{
return js.Accounts.Count;
}
finally
{
js.Lock.ExitReadLock();
}
}
public (long MemReserved, long StoreReserved, Exception? Error) JetStreamReservedResources()
{
var js = _jetStream;
if (js == null)
return (-1, -1, new InvalidOperationException("jetstream not enabled"));
return (
Interlocked.Read(ref js.MemReserved),
Interlocked.Read(ref js.StoreReserved),
null);
}
internal JetStreamConfig DynJetStreamConfig(string storeDir, long maxStore, long maxMem)
{
var cfg = new JetStreamConfig();
if (!string.IsNullOrWhiteSpace(storeDir))
{
cfg.StoreDir = Path.Combine(storeDir, JetStreamStoreDir);
}
else
{
cfg.StoreDir = Path.Combine(Path.GetTempPath(), "nats", JetStreamStoreDir);
Warnf("Temporary storage directory used, data could be lost on system reboot");
}
var opts = GetOpts();
cfg.Strict = !opts.NoJetStreamStrict;
cfg.SyncInterval = opts.SyncInterval;
cfg.SyncAlways = opts.SyncAlways;
cfg.MaxStore = opts.MaxStoreSet && maxStore >= 0
? maxStore
: DiskAvailability.DiskAvailable(cfg.StoreDir);
if (opts.MaxMemSet && maxMem >= 0)
{
cfg.MaxMemory = maxMem;
}
else
{
var totalAvailable = GC.GetGCMemoryInfo().TotalAvailableMemoryBytes;
cfg.MaxMemory = totalAvailable > 0 && totalAvailable < long.MaxValue
? totalAvailable / 4 * 3
: JetStreamMaxMemDefault;
}
return cfg;
}
internal void ResourcesExceededError(StorageType storeType)
{
var didAlert = false;
lock (_resourceErrorLock)
{
var now = DateTime.UtcNow;
if (now - _resourceErrorLastUtc > TimeSpan.FromSeconds(10))
{
var storeName = storeType switch
{
StorageType.MemoryStorage => "memory",
StorageType.FileStorage => "file",
_ => storeType.ToString().ToLowerInvariant(),
};
Errorf("JetStream {0} resource limits exceeded for server", storeName);
_resourceErrorLastUtc = now;
didAlert = true;
}
}
if (!didAlert)
return;
var js = GetJetStreamState();
if (js?.Cluster is JetStreamCluster { Meta: not null } cluster)
cluster.Meta.StepDown();
}
internal void HandleWritePermissionError()
{
if (!JetStreamEnabled())
return;
Errorf("File system permission denied while writing, disabling JetStream");
_ = Task.Run(() => DisableJetStream());
}
internal JetStreamEngine? GetJetStream() =>
_jetStream == null ? null : new JetStreamEngine(_jetStream);
internal JetStream? GetJetStreamState() => _jetStream;
}

View File

@@ -253,6 +253,9 @@ public sealed partial class NatsServer : INatsServer
private long _cproto; // count of INFO-capable clients
private readonly ConcurrentDictionary<string, object?> _nodeToInfo = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, object?> _raftNodes = new(StringComparer.Ordinal);
private JetStream? _jetStream;
private readonly Lock _resourceErrorLock = new();
private DateTime _resourceErrorLastUtc;
private readonly Dictionary<string, string> _routesToSelf = [];
private string _routeTlsName = string.Empty;
private INetResolver? _routeResolver;