feat(batch27): implement jetstream bootstrap and account wiring
This commit is contained in:
@@ -0,0 +1,62 @@
|
|||||||
|
namespace ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
public sealed partial class Account
|
||||||
|
{
|
||||||
|
internal Exception? EnableAllJetStreamServiceImportsAndMappings()
|
||||||
|
{
|
||||||
|
_mu.EnterReadLock();
|
||||||
|
var server = Server as NatsServer;
|
||||||
|
_mu.ExitReadLock();
|
||||||
|
|
||||||
|
if (server == null)
|
||||||
|
return new InvalidOperationException("jetstream account not registered");
|
||||||
|
|
||||||
|
var systemAccount = server.SystemAccount();
|
||||||
|
var destinationName = systemAccount?.Name ?? string.Empty;
|
||||||
|
|
||||||
|
if (systemAccount != null && !ServiceImportExists(destinationName, JsApiSubjects.JsAllApi))
|
||||||
|
{
|
||||||
|
var err = AddServiceImport(systemAccount, JsApiSubjects.JsAllApi, JsApiSubjects.JsAllApi);
|
||||||
|
if (err != null)
|
||||||
|
return new InvalidOperationException($"error setting up jetstream service imports for account: {err.Message}", err);
|
||||||
|
}
|
||||||
|
|
||||||
|
var domain = server.GetOpts().JetStreamDomain;
|
||||||
|
if (!string.IsNullOrWhiteSpace(domain))
|
||||||
|
{
|
||||||
|
var mappings = new Dictionary<string, string>(StringComparer.Ordinal)
|
||||||
|
{
|
||||||
|
[$"$JS.{domain}.API.>"] = JsApiSubjects.JsAllApi,
|
||||||
|
[$"$JS.{domain}.API.INFO"] = JsApiSubjects.JsApiAccountInfo,
|
||||||
|
};
|
||||||
|
|
||||||
|
_mu.EnterReadLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
foreach (var mapping in _mappings)
|
||||||
|
mappings.Remove(mapping.Source);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitReadLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var (src, dest) in mappings)
|
||||||
|
{
|
||||||
|
var err = AddMapping(src, dest);
|
||||||
|
if (err != null)
|
||||||
|
server.Errorf("Error adding JetStream domain mapping: {0}", err.Message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Exception? EnableJetStreamInfoServiceImportOnly()
|
||||||
|
{
|
||||||
|
if (ServiceImportShadowed(JsApiSubjects.JsApiAccountInfo))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
return EnableAllJetStreamServiceImportsAndMappings();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -32,7 +32,7 @@ namespace ZB.MOM.NatsNet.Server;
|
|||||||
/// <see cref="ClientConnection"/> can interact with it without a hard dependency.
|
/// <see cref="ClientConnection"/> can interact with it without a hard dependency.
|
||||||
/// Mirrors Go <c>Account</c> struct in server/accounts.go.
|
/// Mirrors Go <c>Account</c> struct in server/accounts.go.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class Account : INatsAccount
|
public sealed partial class Account : INatsAccount
|
||||||
{
|
{
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Constants
|
// Constants
|
||||||
@@ -261,7 +261,7 @@ public sealed class Account : INatsAccount
|
|||||||
/// JetStream account state. Mirrors Go <c>js *jsAccount</c>.
|
/// JetStream account state. Mirrors Go <c>js *jsAccount</c>.
|
||||||
/// TODO: session 19 — JetStream implementation.
|
/// TODO: session 19 — JetStream implementation.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal object? JetStream { get; set; }
|
internal JsAccount? JetStream { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Per-domain JetStream limits. Mirrors Go <c>jsLimits map[string]JetStreamAccountLimits</c>.
|
/// Per-domain JetStream limits. Mirrors Go <c>jsLimits map[string]JetStreamAccountLimits</c>.
|
||||||
@@ -2279,7 +2279,7 @@ public sealed class Account : INatsAccount
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (sid is { Length: > 0 } && InternalClient != null)
|
if (sid is { Length: > 0 } && InternalClient != null)
|
||||||
InternalClient.ProcessUnsub(sid);
|
InternalClient.RemoveSubscriptionBySid(sid);
|
||||||
|
|
||||||
if (tracking && requestor != null && !delivered)
|
if (tracking && requestor != null && !delivered)
|
||||||
SendBackendErrorTrackingLatency(serviceImport, reason);
|
SendBackendErrorTrackingLatency(serviceImport, reason);
|
||||||
@@ -2355,7 +2355,7 @@ public sealed class Account : INatsAccount
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (sid != null && InternalClient != null)
|
if (sid != null && InternalClient != null)
|
||||||
InternalClient.ProcessUnsub(sid);
|
InternalClient.RemoveSubscriptionBySid(sid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -2548,7 +2548,7 @@ public sealed class Account : INatsAccount
|
|||||||
if (InternalClient == null && Server is NatsServer server)
|
if (InternalClient == null && Server is NatsServer server)
|
||||||
{
|
{
|
||||||
InternalClient = server.CreateInternalAccountClient();
|
InternalClient = server.CreateInternalAccountClient();
|
||||||
InternalClient.Account = this;
|
InternalClient.SetAccount(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
return InternalClient;
|
return InternalClient;
|
||||||
@@ -2573,7 +2573,7 @@ public sealed class Account : INatsAccount
|
|||||||
_mu.EnterReadLock();
|
_mu.EnterReadLock();
|
||||||
var internalClient = InternalClient;
|
var internalClient = InternalClient;
|
||||||
_mu.ExitReadLock();
|
_mu.ExitReadLock();
|
||||||
internalClient?.ProcessUnsub(sub.Sid);
|
internalClient?.RemoveSubscriptionBySid(sub.Sid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -2685,7 +2685,7 @@ public sealed class Account : INatsAccount
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
foreach (var sid in subscriptionIds)
|
foreach (var sid in subscriptionIds)
|
||||||
internalClient.ProcessUnsub(sid);
|
internalClient.RemoveSubscriptionBySid(sid);
|
||||||
|
|
||||||
internalClient.CloseConnection(ClosedState.InternalClient);
|
internalClient.CloseConnection(ClosedState.InternalClient);
|
||||||
}
|
}
|
||||||
@@ -4170,7 +4170,7 @@ public sealed class Account : INatsAccount
|
|||||||
return new ClientInfo
|
return new ClientInfo
|
||||||
{
|
{
|
||||||
Id = client.Cid,
|
Id = client.Cid,
|
||||||
Account = client.Account?.Name ?? string.Empty,
|
Account = client.Account()?.Name ?? string.Empty,
|
||||||
Name = client.Opts.Name ?? string.Empty,
|
Name = client.Opts.Name ?? string.Empty,
|
||||||
Rtt = client.GetRttValue(),
|
Rtt = client.GetRttValue(),
|
||||||
Start = client.Start == default ? string.Empty : client.Start.ToUniversalTime().ToString("O"),
|
Start = client.Start == default ? string.Empty : client.Start.ToUniversalTime().ToString("O"),
|
||||||
|
|||||||
@@ -1686,7 +1686,7 @@ public sealed partial class ClientConnection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal void ProcessUnsub(byte[] sid)
|
internal void RemoveSubscriptionBySid(byte[] sid)
|
||||||
{
|
{
|
||||||
lock (_mu)
|
lock (_mu)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -182,7 +182,7 @@ public sealed class JetStreamAccountStats
|
|||||||
/// The main JetStream engine, one per server.
|
/// The main JetStream engine, one per server.
|
||||||
/// Mirrors <c>jetStream</c> struct in server/jetstream.go.
|
/// Mirrors <c>jetStream</c> struct in server/jetstream.go.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal sealed class JetStream
|
internal sealed partial class JetStream
|
||||||
{
|
{
|
||||||
// Atomic counters (use Interlocked for thread-safety)
|
// Atomic counters (use Interlocked for thread-safety)
|
||||||
public long ApiInflight;
|
public long ApiInflight;
|
||||||
@@ -238,7 +238,7 @@ internal sealed class JsaStorage
|
|||||||
/// A JetStream-enabled account, holding streams, limits and usage tracking.
|
/// A JetStream-enabled account, holding streams, limits and usage tracking.
|
||||||
/// Mirrors <c>jsAccount</c> in server/jetstream.go.
|
/// Mirrors <c>jsAccount</c> in server/jetstream.go.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal sealed class JsAccount
|
internal sealed partial class JsAccount
|
||||||
{
|
{
|
||||||
private readonly ReaderWriterLockSlim _mu = new();
|
private readonly ReaderWriterLockSlim _mu = new();
|
||||||
|
|
||||||
|
|||||||
427
dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs
Normal file
427
dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs
Normal file
@@ -0,0 +1,427 @@
|
|||||||
|
using System.Security.Cryptography;
|
||||||
|
using System.Text;
|
||||||
|
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
public sealed partial class NatsServer
|
||||||
|
{
|
||||||
|
private const string JetStreamStoreDir = "jetstream";
|
||||||
|
|
||||||
|
public Exception? EnableJetStream(JetStreamConfig? config)
|
||||||
|
{
|
||||||
|
if (JetStreamEnabled())
|
||||||
|
return new InvalidOperationException("jetstream already enabled");
|
||||||
|
|
||||||
|
Noticef("Starting JetStream");
|
||||||
|
|
||||||
|
if (config == null || config.MaxMemory <= 0 || config.MaxStore <= 0)
|
||||||
|
{
|
||||||
|
config = new JetStreamConfig
|
||||||
|
{
|
||||||
|
StoreDir = string.IsNullOrWhiteSpace(GetOpts().StoreDir)
|
||||||
|
? Path.Combine(Path.GetTempPath(), JetStreamStoreDir)
|
||||||
|
: Path.Combine(GetOpts().StoreDir, JetStreamStoreDir),
|
||||||
|
MaxMemory = GetOpts().JetStreamMaxMemory > 0 ? GetOpts().JetStreamMaxMemory : 1,
|
||||||
|
MaxStore = GetOpts().JetStreamMaxStore > 0 ? GetOpts().JetStreamMaxStore : 1,
|
||||||
|
SyncInterval = GetOpts().SyncInterval,
|
||||||
|
SyncAlways = GetOpts().SyncAlways,
|
||||||
|
Domain = GetOpts().JetStreamDomain,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
else if (!string.IsNullOrWhiteSpace(config.StoreDir))
|
||||||
|
{
|
||||||
|
config.StoreDir = Path.Combine(config.StoreDir, JetStreamStoreDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (string.IsNullOrWhiteSpace(config.StoreDir))
|
||||||
|
{
|
||||||
|
config.StoreDir = Path.Combine(Path.GetTempPath(), JetStreamStoreDir);
|
||||||
|
Warnf("Temporary storage directory used, data could be lost on system reboot");
|
||||||
|
}
|
||||||
|
|
||||||
|
var err = CheckStoreDir(config);
|
||||||
|
if (err != null)
|
||||||
|
return err;
|
||||||
|
|
||||||
|
return EnableJetStreamInternal(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
private KeyGen? JsKeyGen(string jsKey, string info)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrEmpty(jsKey))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
return context =>
|
||||||
|
{
|
||||||
|
using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(jsKey));
|
||||||
|
hmac.TransformBlock(Encoding.UTF8.GetBytes(info), 0, info.Length, null, 0);
|
||||||
|
hmac.TransformFinalBlock(context, 0, context.Length);
|
||||||
|
return hmac.Hash ?? [];
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
internal (byte[]? Plain, bool UsedFallback, Exception? Error) DecryptMeta(
|
||||||
|
StoreCipher storeCipher,
|
||||||
|
byte[] encryptedKey,
|
||||||
|
byte[] encryptedBuffer,
|
||||||
|
string accountName,
|
||||||
|
string context)
|
||||||
|
{
|
||||||
|
if (encryptedKey.Length == 0)
|
||||||
|
return (null, false, new InvalidOperationException("encryption key missing"));
|
||||||
|
|
||||||
|
var ciphers = storeCipher == StoreCipher.Aes
|
||||||
|
? new[] { StoreCipher.Aes, StoreCipher.ChaCha }
|
||||||
|
: new[] { StoreCipher.ChaCha, StoreCipher.Aes };
|
||||||
|
|
||||||
|
var candidates = new List<(KeyGen Prf, StoreCipher Cipher)>();
|
||||||
|
var opts = GetOpts();
|
||||||
|
|
||||||
|
var prf = JsKeyGen(opts.JetStreamKey, accountName);
|
||||||
|
if (prf == null)
|
||||||
|
return (null, false, new InvalidOperationException("jetstream encryption key is not configured"));
|
||||||
|
|
||||||
|
foreach (var cipher in ciphers)
|
||||||
|
candidates.Add((prf, cipher));
|
||||||
|
|
||||||
|
var oldPrf = JsKeyGen(opts.JetStreamOldKey, accountName);
|
||||||
|
if (oldPrf != null)
|
||||||
|
{
|
||||||
|
foreach (var cipher in ciphers)
|
||||||
|
candidates.Add((oldPrf, cipher));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (var i = 0; i < candidates.Count; i++)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var rb = candidates[i].Prf(Encoding.UTF8.GetBytes(context));
|
||||||
|
var kek = JetStreamFileStore.GenEncryptionKey(candidates[i].Cipher, rb);
|
||||||
|
var ns = kek.NonceSize;
|
||||||
|
if (encryptedKey.Length < ns || encryptedBuffer.Length < ns)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
var seed = kek.Open(encryptedKey.AsSpan(0, ns), encryptedKey.AsSpan(ns));
|
||||||
|
var aek = JetStreamFileStore.GenEncryptionKey(candidates[i].Cipher, seed);
|
||||||
|
var plain = aek.Open(encryptedBuffer.AsSpan(0, ns), encryptedBuffer.AsSpan(ns));
|
||||||
|
return (plain, i > 0, null);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// Try the next candidate.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return (null, false, new InvalidOperationException("unable to recover encrypted metadata"));
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Exception? CheckStoreDir(JetStreamConfig cfg)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(cfg.StoreDir))
|
||||||
|
return new InvalidOperationException("jetstream store directory is required");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Directory.CreateDirectory(cfg.StoreDir);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Exception? InitJetStreamEncryption()
|
||||||
|
{
|
||||||
|
var opts = GetOpts();
|
||||||
|
|
||||||
|
if (!string.IsNullOrEmpty(opts.JetStreamKey) && !string.IsNullOrEmpty(opts.JetStreamTpm.KeysFile))
|
||||||
|
return new InvalidOperationException("JetStream encryption key may not be used with TPM options");
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Exception? EnableJetStreamInternal(JetStreamConfig cfg)
|
||||||
|
{
|
||||||
|
var encryptionErr = InitJetStreamEncryption();
|
||||||
|
if (encryptionErr != null)
|
||||||
|
return encryptionErr;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Directory.CreateDirectory(cfg.StoreDir);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return ex;
|
||||||
|
}
|
||||||
|
|
||||||
|
var js = new JetStream
|
||||||
|
{
|
||||||
|
Server = this,
|
||||||
|
Config = cfg,
|
||||||
|
Started = DateTime.UtcNow,
|
||||||
|
StandAlone = true,
|
||||||
|
};
|
||||||
|
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_jetStream = js;
|
||||||
|
_info.JetStream = true;
|
||||||
|
_info.Domain = cfg.Domain;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
var err = EnableJetStreamAccounts();
|
||||||
|
if (err != null)
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_jetStream = null;
|
||||||
|
_info.JetStream = false;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
internal bool CanExtendOtherDomain()
|
||||||
|
{
|
||||||
|
var opts = GetOpts();
|
||||||
|
var sysAcc = SystemAccount()?.GetName();
|
||||||
|
if (string.IsNullOrEmpty(sysAcc))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
foreach (var remote in opts.LeafNode.Remotes)
|
||||||
|
{
|
||||||
|
if (!string.Equals(remote.LocalAccount, sysAcc, StringComparison.Ordinal))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
foreach (var denyImport in remote.DenyImports)
|
||||||
|
{
|
||||||
|
if (SubscriptionIndex.SubjectIsSubsetMatch(denyImport, JsApiSubjects.JsAllApi))
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void UpdateJetStreamInfoStatus(bool enabled)
|
||||||
|
{
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_info.JetStream = enabled;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Exception? RestartJetStream()
|
||||||
|
{
|
||||||
|
var opts = GetOpts();
|
||||||
|
var cfg = new JetStreamConfig
|
||||||
|
{
|
||||||
|
StoreDir = opts.StoreDir,
|
||||||
|
SyncInterval = opts.SyncInterval,
|
||||||
|
SyncAlways = opts.SyncAlways,
|
||||||
|
MaxMemory = opts.JetStreamMaxMemory,
|
||||||
|
MaxStore = opts.JetStreamMaxStore,
|
||||||
|
Domain = opts.JetStreamDomain,
|
||||||
|
Strict = !opts.NoJetStreamStrict,
|
||||||
|
};
|
||||||
|
|
||||||
|
Noticef("Restarting JetStream");
|
||||||
|
var err = EnableJetStream(cfg);
|
||||||
|
if (err != null)
|
||||||
|
{
|
||||||
|
Warnf("Can't start JetStream: {0}", err.Message);
|
||||||
|
_ = DisableJetStream();
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
UpdateJetStreamInfoStatus(true);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void CheckJetStreamExports()
|
||||||
|
{
|
||||||
|
if (SystemAccount() != null)
|
||||||
|
SetupJetStreamExports();
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void SetupJetStreamExports()
|
||||||
|
{
|
||||||
|
var sys = SystemAccount();
|
||||||
|
if (sys == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null);
|
||||||
|
if (err != null)
|
||||||
|
Warnf("Error setting up jetstream service exports: {0}", err.Message);
|
||||||
|
}
|
||||||
|
|
||||||
|
internal bool JetStreamOOSPending()
|
||||||
|
{
|
||||||
|
var js = _jetStream;
|
||||||
|
if (js == null)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
js.Lock.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var wasPending = js.Oos;
|
||||||
|
js.Oos = true;
|
||||||
|
return wasPending;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
js.Lock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void SetJetStreamDisabled()
|
||||||
|
{
|
||||||
|
var js = _jetStream;
|
||||||
|
if (js != null)
|
||||||
|
Interlocked.Exchange(ref js.Disabled, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void HandleOutOfSpace(NatsStream? stream)
|
||||||
|
{
|
||||||
|
if (!JetStreamEnabled() || JetStreamOOSPending())
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (stream != null)
|
||||||
|
Errorf("JetStream out of resources for stream {0}, will be DISABLED", stream.Config.Name);
|
||||||
|
else
|
||||||
|
Errorf("JetStream out of resources, will be DISABLED");
|
||||||
|
|
||||||
|
_ = Task.Run(() => DisableJetStream());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Exception? DisableJetStream()
|
||||||
|
{
|
||||||
|
if (!JetStreamEnabled())
|
||||||
|
return null;
|
||||||
|
|
||||||
|
SetJetStreamDisabled();
|
||||||
|
UpdateJetStreamInfoStatus(false);
|
||||||
|
|
||||||
|
_mu.EnterWriteLock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_jetStream = null;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_mu.ExitWriteLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
ShutdownJetStream();
|
||||||
|
ShutdownRaftNodes();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Exception? EnableJetStreamAccounts()
|
||||||
|
{
|
||||||
|
if (GlobalAccountOnly())
|
||||||
|
{
|
||||||
|
var gacc = GlobalAccount();
|
||||||
|
if (gacc == null)
|
||||||
|
return new InvalidOperationException("global account not found");
|
||||||
|
|
||||||
|
gacc.JetStreamLimits ??= new Dictionary<string, object>(StringComparer.Ordinal)
|
||||||
|
{
|
||||||
|
[string.Empty] = new JetStreamAccountLimits
|
||||||
|
{
|
||||||
|
MaxMemory = -1,
|
||||||
|
MaxStore = -1,
|
||||||
|
MaxStreams = -1,
|
||||||
|
MaxConsumers = -1,
|
||||||
|
MaxAckPending = -1,
|
||||||
|
MemoryMaxStreamBytes = -1,
|
||||||
|
StoreMaxStreamBytes = -1,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
return ConfigJetStream(gacc);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ConfigAllJetStreamAccounts();
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Exception? ConfigJetStream(Account? acc)
|
||||||
|
{
|
||||||
|
if (acc == null)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
var jsLimits = acc.JetStreamLimits;
|
||||||
|
if (jsLimits != null)
|
||||||
|
return acc.EnableAllJetStreamServiceImportsAndMappings();
|
||||||
|
|
||||||
|
if (!ReferenceEquals(acc, SystemAccount()))
|
||||||
|
{
|
||||||
|
acc.JetStream = null;
|
||||||
|
return acc.EnableJetStreamInfoServiceImportOnly();
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
internal Exception? ConfigAllJetStreamAccounts()
|
||||||
|
{
|
||||||
|
CheckJetStreamExports();
|
||||||
|
if (_jetStream == null)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
foreach (var acc in _accounts.Values)
|
||||||
|
{
|
||||||
|
var err = ConfigJetStream(acc);
|
||||||
|
if (err != null)
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
var storeDir = _jetStream.Config.StoreDir;
|
||||||
|
if (!Directory.Exists(storeDir))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
foreach (var directory in Directory.EnumerateDirectories(storeDir))
|
||||||
|
{
|
||||||
|
var accountName = Path.GetFileName(directory);
|
||||||
|
if (string.IsNullOrWhiteSpace(accountName) || _accounts.ContainsKey(accountName))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
var (resolved, _) = LookupAccount(accountName);
|
||||||
|
if (resolved == null)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
var err = ConfigJetStream(resolved);
|
||||||
|
if (err != null)
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool JetStreamEnabled()
|
||||||
|
{
|
||||||
|
var js = _jetStream;
|
||||||
|
return js != null && Interlocked.CompareExchange(ref js.Disabled, 0, 0) == 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -253,6 +253,7 @@ public sealed partial class NatsServer : INatsServer
|
|||||||
private long _cproto; // count of INFO-capable clients
|
private long _cproto; // count of INFO-capable clients
|
||||||
private readonly ConcurrentDictionary<string, object?> _nodeToInfo = new(StringComparer.Ordinal);
|
private readonly ConcurrentDictionary<string, object?> _nodeToInfo = new(StringComparer.Ordinal);
|
||||||
private readonly ConcurrentDictionary<string, object?> _raftNodes = new(StringComparer.Ordinal);
|
private readonly ConcurrentDictionary<string, object?> _raftNodes = new(StringComparer.Ordinal);
|
||||||
|
private JetStream? _jetStream;
|
||||||
private readonly Dictionary<string, string> _routesToSelf = [];
|
private readonly Dictionary<string, string> _routesToSelf = [];
|
||||||
private INetResolver? _routeResolver;
|
private INetResolver? _routeResolver;
|
||||||
private readonly ConcurrentDictionary<string, object?> _rateLimitLogging = new();
|
private readonly ConcurrentDictionary<string, object?> _rateLimitLogging = new();
|
||||||
|
|||||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
Reference in New Issue
Block a user