diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs new file mode 100644 index 0000000..c20d2d9 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.JetStream.cs @@ -0,0 +1,62 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal Exception? EnableAllJetStreamServiceImportsAndMappings() + { + _mu.EnterReadLock(); + var server = Server as NatsServer; + _mu.ExitReadLock(); + + if (server == null) + return new InvalidOperationException("jetstream account not registered"); + + var systemAccount = server.SystemAccount(); + var destinationName = systemAccount?.Name ?? string.Empty; + + if (systemAccount != null && !ServiceImportExists(destinationName, JsApiSubjects.JsAllApi)) + { + var err = AddServiceImport(systemAccount, JsApiSubjects.JsAllApi, JsApiSubjects.JsAllApi); + if (err != null) + return new InvalidOperationException($"error setting up jetstream service imports for account: {err.Message}", err); + } + + var domain = server.GetOpts().JetStreamDomain; + if (!string.IsNullOrWhiteSpace(domain)) + { + var mappings = new Dictionary(StringComparer.Ordinal) + { + [$"$JS.{domain}.API.>"] = JsApiSubjects.JsAllApi, + [$"$JS.{domain}.API.INFO"] = JsApiSubjects.JsApiAccountInfo, + }; + + _mu.EnterReadLock(); + try + { + foreach (var mapping in _mappings) + mappings.Remove(mapping.Source); + } + finally + { + _mu.ExitReadLock(); + } + + foreach (var (src, dest) in mappings) + { + var err = AddMapping(src, dest); + if (err != null) + server.Errorf("Error adding JetStream domain mapping: {0}", err.Message); + } + } + + return null; + } + + internal Exception? EnableJetStreamInfoServiceImportOnly() + { + if (ServiceImportShadowed(JsApiSubjects.JsApiAccountInfo)) + return null; + + return EnableAllJetStreamServiceImportsAndMappings(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index 9718c11..fb75725 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -32,7 +32,7 @@ namespace ZB.MOM.NatsNet.Server; /// can interact with it without a hard dependency. /// Mirrors Go Account struct in server/accounts.go. /// -public sealed class Account : INatsAccount +public sealed partial class Account : INatsAccount { // ------------------------------------------------------------------------- // Constants @@ -261,7 +261,7 @@ public sealed class Account : INatsAccount /// JetStream account state. Mirrors Go js *jsAccount. /// TODO: session 19 — JetStream implementation. /// - internal object? JetStream { get; set; } + internal JsAccount? JetStream { get; set; } /// /// Per-domain JetStream limits. Mirrors Go jsLimits map[string]JetStreamAccountLimits. @@ -2279,7 +2279,7 @@ public sealed class Account : INatsAccount } if (sid is { Length: > 0 } && InternalClient != null) - InternalClient.ProcessUnsub(sid); + InternalClient.RemoveSubscriptionBySid(sid); if (tracking && requestor != null && !delivered) SendBackendErrorTrackingLatency(serviceImport, reason); @@ -2355,7 +2355,7 @@ public sealed class Account : INatsAccount } if (sid != null && InternalClient != null) - InternalClient.ProcessUnsub(sid); + InternalClient.RemoveSubscriptionBySid(sid); } /// @@ -2548,7 +2548,7 @@ public sealed class Account : INatsAccount if (InternalClient == null && Server is NatsServer server) { InternalClient = server.CreateInternalAccountClient(); - InternalClient.Account = this; + InternalClient.SetAccount(this); } return InternalClient; @@ -2573,7 +2573,7 @@ public sealed class Account : INatsAccount _mu.EnterReadLock(); var internalClient = InternalClient; _mu.ExitReadLock(); - internalClient?.ProcessUnsub(sub.Sid); + internalClient?.RemoveSubscriptionBySid(sub.Sid); } /// @@ -2685,7 +2685,7 @@ public sealed class Account : INatsAccount return; foreach (var sid in subscriptionIds) - internalClient.ProcessUnsub(sid); + internalClient.RemoveSubscriptionBySid(sid); internalClient.CloseConnection(ClosedState.InternalClient); } @@ -4170,7 +4170,7 @@ public sealed class Account : INatsAccount return new ClientInfo { Id = client.Cid, - Account = client.Account?.Name ?? string.Empty, + Account = client.Account()?.Name ?? string.Empty, Name = client.Opts.Name ?? string.Empty, Rtt = client.GetRttValue(), Start = client.Start == default ? string.Empty : client.Start.ToUniversalTime().ToString("O"), diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 7888792..7e43765 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -1686,7 +1686,7 @@ public sealed partial class ClientConnection } } - internal void ProcessUnsub(byte[] sid) + internal void RemoveSubscriptionBySid(byte[] sid) { lock (_mu) { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs index 658ffd0..d7d2923 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamTypes.cs @@ -182,7 +182,7 @@ public sealed class JetStreamAccountStats /// The main JetStream engine, one per server. /// Mirrors jetStream struct in server/jetstream.go. /// -internal sealed class JetStream +internal sealed partial class JetStream { // Atomic counters (use Interlocked for thread-safety) public long ApiInflight; @@ -238,7 +238,7 @@ internal sealed class JsaStorage /// A JetStream-enabled account, holding streams, limits and usage tracking. /// Mirrors jsAccount in server/jetstream.go. /// -internal sealed class JsAccount +internal sealed partial class JsAccount { private readonly ReaderWriterLockSlim _mu = new(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs new file mode 100644 index 0000000..774daa8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamCore.cs @@ -0,0 +1,427 @@ +using System.Security.Cryptography; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private const string JetStreamStoreDir = "jetstream"; + + public Exception? EnableJetStream(JetStreamConfig? config) + { + if (JetStreamEnabled()) + return new InvalidOperationException("jetstream already enabled"); + + Noticef("Starting JetStream"); + + if (config == null || config.MaxMemory <= 0 || config.MaxStore <= 0) + { + config = new JetStreamConfig + { + StoreDir = string.IsNullOrWhiteSpace(GetOpts().StoreDir) + ? Path.Combine(Path.GetTempPath(), JetStreamStoreDir) + : Path.Combine(GetOpts().StoreDir, JetStreamStoreDir), + MaxMemory = GetOpts().JetStreamMaxMemory > 0 ? GetOpts().JetStreamMaxMemory : 1, + MaxStore = GetOpts().JetStreamMaxStore > 0 ? GetOpts().JetStreamMaxStore : 1, + SyncInterval = GetOpts().SyncInterval, + SyncAlways = GetOpts().SyncAlways, + Domain = GetOpts().JetStreamDomain, + }; + } + else if (!string.IsNullOrWhiteSpace(config.StoreDir)) + { + config.StoreDir = Path.Combine(config.StoreDir, JetStreamStoreDir); + } + + if (string.IsNullOrWhiteSpace(config.StoreDir)) + { + config.StoreDir = Path.Combine(Path.GetTempPath(), JetStreamStoreDir); + Warnf("Temporary storage directory used, data could be lost on system reboot"); + } + + var err = CheckStoreDir(config); + if (err != null) + return err; + + return EnableJetStreamInternal(config); + } + + private KeyGen? JsKeyGen(string jsKey, string info) + { + if (string.IsNullOrEmpty(jsKey)) + return null; + + return context => + { + using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(jsKey)); + hmac.TransformBlock(Encoding.UTF8.GetBytes(info), 0, info.Length, null, 0); + hmac.TransformFinalBlock(context, 0, context.Length); + return hmac.Hash ?? []; + }; + } + + internal (byte[]? Plain, bool UsedFallback, Exception? Error) DecryptMeta( + StoreCipher storeCipher, + byte[] encryptedKey, + byte[] encryptedBuffer, + string accountName, + string context) + { + if (encryptedKey.Length == 0) + return (null, false, new InvalidOperationException("encryption key missing")); + + var ciphers = storeCipher == StoreCipher.Aes + ? new[] { StoreCipher.Aes, StoreCipher.ChaCha } + : new[] { StoreCipher.ChaCha, StoreCipher.Aes }; + + var candidates = new List<(KeyGen Prf, StoreCipher Cipher)>(); + var opts = GetOpts(); + + var prf = JsKeyGen(opts.JetStreamKey, accountName); + if (prf == null) + return (null, false, new InvalidOperationException("jetstream encryption key is not configured")); + + foreach (var cipher in ciphers) + candidates.Add((prf, cipher)); + + var oldPrf = JsKeyGen(opts.JetStreamOldKey, accountName); + if (oldPrf != null) + { + foreach (var cipher in ciphers) + candidates.Add((oldPrf, cipher)); + } + + for (var i = 0; i < candidates.Count; i++) + { + try + { + var rb = candidates[i].Prf(Encoding.UTF8.GetBytes(context)); + var kek = JetStreamFileStore.GenEncryptionKey(candidates[i].Cipher, rb); + var ns = kek.NonceSize; + if (encryptedKey.Length < ns || encryptedBuffer.Length < ns) + continue; + + var seed = kek.Open(encryptedKey.AsSpan(0, ns), encryptedKey.AsSpan(ns)); + var aek = JetStreamFileStore.GenEncryptionKey(candidates[i].Cipher, seed); + var plain = aek.Open(encryptedBuffer.AsSpan(0, ns), encryptedBuffer.AsSpan(ns)); + return (plain, i > 0, null); + } + catch + { + // Try the next candidate. + } + } + + return (null, false, new InvalidOperationException("unable to recover encrypted metadata")); + } + + internal Exception? CheckStoreDir(JetStreamConfig cfg) + { + if (string.IsNullOrWhiteSpace(cfg.StoreDir)) + return new InvalidOperationException("jetstream store directory is required"); + + try + { + Directory.CreateDirectory(cfg.StoreDir); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal Exception? InitJetStreamEncryption() + { + var opts = GetOpts(); + + if (!string.IsNullOrEmpty(opts.JetStreamKey) && !string.IsNullOrEmpty(opts.JetStreamTpm.KeysFile)) + return new InvalidOperationException("JetStream encryption key may not be used with TPM options"); + + return null; + } + + private Exception? EnableJetStreamInternal(JetStreamConfig cfg) + { + var encryptionErr = InitJetStreamEncryption(); + if (encryptionErr != null) + return encryptionErr; + + try + { + Directory.CreateDirectory(cfg.StoreDir); + } + catch (Exception ex) + { + return ex; + } + + var js = new JetStream + { + Server = this, + Config = cfg, + Started = DateTime.UtcNow, + StandAlone = true, + }; + + _mu.EnterWriteLock(); + try + { + _jetStream = js; + _info.JetStream = true; + _info.Domain = cfg.Domain; + } + finally + { + _mu.ExitWriteLock(); + } + + var err = EnableJetStreamAccounts(); + if (err != null) + { + _mu.EnterWriteLock(); + try + { + _jetStream = null; + _info.JetStream = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + return err; + } + + internal bool CanExtendOtherDomain() + { + var opts = GetOpts(); + var sysAcc = SystemAccount()?.GetName(); + if (string.IsNullOrEmpty(sysAcc)) + return false; + + foreach (var remote in opts.LeafNode.Remotes) + { + if (!string.Equals(remote.LocalAccount, sysAcc, StringComparison.Ordinal)) + continue; + + foreach (var denyImport in remote.DenyImports) + { + if (SubscriptionIndex.SubjectIsSubsetMatch(denyImport, JsApiSubjects.JsAllApi)) + return false; + } + + return true; + } + + return false; + } + + internal void UpdateJetStreamInfoStatus(bool enabled) + { + _mu.EnterWriteLock(); + try + { + _info.JetStream = enabled; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? RestartJetStream() + { + var opts = GetOpts(); + var cfg = new JetStreamConfig + { + StoreDir = opts.StoreDir, + SyncInterval = opts.SyncInterval, + SyncAlways = opts.SyncAlways, + MaxMemory = opts.JetStreamMaxMemory, + MaxStore = opts.JetStreamMaxStore, + Domain = opts.JetStreamDomain, + Strict = !opts.NoJetStreamStrict, + }; + + Noticef("Restarting JetStream"); + var err = EnableJetStream(cfg); + if (err != null) + { + Warnf("Can't start JetStream: {0}", err.Message); + _ = DisableJetStream(); + return err; + } + + UpdateJetStreamInfoStatus(true); + return null; + } + + internal void CheckJetStreamExports() + { + if (SystemAccount() != null) + SetupJetStreamExports(); + } + + internal void SetupJetStreamExports() + { + var sys = SystemAccount(); + if (sys == null) + return; + + var err = sys.AddServiceExport(JsApiSubjects.JsAllApi, null); + if (err != null) + Warnf("Error setting up jetstream service exports: {0}", err.Message); + } + + internal bool JetStreamOOSPending() + { + var js = _jetStream; + if (js == null) + return false; + + js.Lock.EnterWriteLock(); + try + { + var wasPending = js.Oos; + js.Oos = true; + return wasPending; + } + finally + { + js.Lock.ExitWriteLock(); + } + } + + internal void SetJetStreamDisabled() + { + var js = _jetStream; + if (js != null) + Interlocked.Exchange(ref js.Disabled, 1); + } + + internal void HandleOutOfSpace(NatsStream? stream) + { + if (!JetStreamEnabled() || JetStreamOOSPending()) + return; + + if (stream != null) + Errorf("JetStream out of resources for stream {0}, will be DISABLED", stream.Config.Name); + else + Errorf("JetStream out of resources, will be DISABLED"); + + _ = Task.Run(() => DisableJetStream()); + } + + public Exception? DisableJetStream() + { + if (!JetStreamEnabled()) + return null; + + SetJetStreamDisabled(); + UpdateJetStreamInfoStatus(false); + + _mu.EnterWriteLock(); + try + { + _jetStream = null; + } + finally + { + _mu.ExitWriteLock(); + } + + ShutdownJetStream(); + ShutdownRaftNodes(); + return null; + } + + private Exception? EnableJetStreamAccounts() + { + if (GlobalAccountOnly()) + { + var gacc = GlobalAccount(); + if (gacc == null) + return new InvalidOperationException("global account not found"); + + gacc.JetStreamLimits ??= new Dictionary(StringComparer.Ordinal) + { + [string.Empty] = new JetStreamAccountLimits + { + MaxMemory = -1, + MaxStore = -1, + MaxStreams = -1, + MaxConsumers = -1, + MaxAckPending = -1, + MemoryMaxStreamBytes = -1, + StoreMaxStreamBytes = -1, + }, + }; + + return ConfigJetStream(gacc); + } + + return ConfigAllJetStreamAccounts(); + } + + internal Exception? ConfigJetStream(Account? acc) + { + if (acc == null) + return null; + + var jsLimits = acc.JetStreamLimits; + if (jsLimits != null) + return acc.EnableAllJetStreamServiceImportsAndMappings(); + + if (!ReferenceEquals(acc, SystemAccount())) + { + acc.JetStream = null; + return acc.EnableJetStreamInfoServiceImportOnly(); + } + + return null; + } + + internal Exception? ConfigAllJetStreamAccounts() + { + CheckJetStreamExports(); + if (_jetStream == null) + return null; + + foreach (var acc in _accounts.Values) + { + var err = ConfigJetStream(acc); + if (err != null) + return err; + } + + var storeDir = _jetStream.Config.StoreDir; + if (!Directory.Exists(storeDir)) + return null; + + foreach (var directory in Directory.EnumerateDirectories(storeDir)) + { + var accountName = Path.GetFileName(directory); + if (string.IsNullOrWhiteSpace(accountName) || _accounts.ContainsKey(accountName)) + continue; + + var (resolved, _) = LookupAccount(accountName); + if (resolved == null) + continue; + + var err = ConfigJetStream(resolved); + if (err != null) + return err; + } + + return null; + } + + public bool JetStreamEnabled() + { + var js = _jetStream; + return js != null && Interlocked.CompareExchange(ref js.Disabled, 0, 0) == 0; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index ae0aee9..fec5439 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -253,6 +253,7 @@ public sealed partial class NatsServer : INatsServer private long _cproto; // count of INFO-capable clients private readonly ConcurrentDictionary _nodeToInfo = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _raftNodes = new(StringComparer.Ordinal); + private JetStream? _jetStream; private readonly Dictionary _routesToSelf = []; private INetResolver? _routeResolver; private readonly ConcurrentDictionary _rateLimitLogging = new(); diff --git a/porting.db b/porting.db index 9bac02c..86c296c 100644 Binary files a/porting.db and b/porting.db differ