feat(batch27): implement jetstream engine state and account enable core

This commit is contained in:
Joseph Doherty
2026-02-28 21:01:11 -05:00
parent 4b7fac7957
commit 12b8c9b4c5
4 changed files with 370 additions and 0 deletions

View File

@@ -2,6 +2,254 @@ namespace ZB.MOM.NatsNet.Server;
public sealed partial class Account
{
private static Dictionary<string, object> DefaultJetStreamAccountTiers()
{
return new Dictionary<string, object>(StringComparer.Ordinal)
{
[string.Empty] = new JetStreamAccountLimits
{
MaxMemory = -1,
MaxStore = -1,
MaxStreams = -1,
MaxConsumers = -1,
MaxAckPending = -1,
MemoryMaxStreamBytes = -1,
StoreMaxStreamBytes = -1,
},
};
}
private static Dictionary<string, JetStreamAccountLimits> ToTypedLimits(Dictionary<string, object> limits)
{
var typed = new Dictionary<string, JetStreamAccountLimits>(StringComparer.Ordinal);
foreach (var (tier, value) in limits)
{
if (value is JetStreamAccountLimits v)
typed[tier] = v;
}
return typed;
}
private static JetStreamAccountLimits SelectLimits(Dictionary<string, JetStreamAccountLimits> limits)
{
if (limits.TryGetValue(string.Empty, out var selected))
return selected;
foreach (var (_, value) in limits)
return value;
return new JetStreamAccountLimits();
}
internal void AssignJetStreamLimits(Dictionary<string, object> limits)
{
_mu.EnterWriteLock();
try
{
JetStreamLimits = limits;
}
finally
{
_mu.ExitWriteLock();
}
}
internal Exception? EnableJetStream(Dictionary<string, object>? limits)
{
_mu.EnterReadLock();
var server = Server as NatsServer;
_mu.ExitReadLock();
if (server == null)
return new InvalidOperationException("jetstream account not registered");
if (ReferenceEquals(server.SystemAccount(), this))
return new InvalidOperationException("jetstream can not be enabled on the system account");
limits ??= DefaultJetStreamAccountTiers();
if (limits.Count == 0)
limits = DefaultJetStreamAccountTiers();
AssignJetStreamLimits(limits);
var typedLimits = ToTypedLimits(limits);
var js = server.GetJetStreamState();
if (js == null)
return new InvalidOperationException("jetstream not enabled");
js.Lock.EnterWriteLock();
try
{
if (js.Accounts.TryGetValue(Name, out var existing))
{
_mu.EnterWriteLock();
JetStream = existing;
_mu.ExitWriteLock();
return EnableAllJetStreamServiceImportsAndMappings();
}
var jsa = new JsAccount
{
Js = js,
Account = this,
StoreDir = Path.Combine(js.Config.StoreDir, Name),
};
foreach (var (tier, tierLimits) in typedLimits)
jsa.Limits[tier] = tierLimits;
js.Accounts[Name] = jsa;
_mu.EnterWriteLock();
JetStream = jsa;
_mu.ExitWriteLock();
}
finally
{
js.Lock.ExitWriteLock();
}
return EnableAllJetStreamServiceImportsAndMappings();
}
internal (bool MaxBytesRequired, long MaxStreamBytes) MaxBytesLimits(StreamConfig? cfg)
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa == null)
return (false, 0);
jsa.UsageLock.EnterReadLock();
try
{
var selected = SelectLimits(jsa.Limits);
var maxStreamBytes = cfg?.Storage == StorageType.MemoryStorage
? selected.MemoryMaxStreamBytes
: selected.StoreMaxStreamBytes;
return (selected.MaxBytesRequired, maxStreamBytes);
}
finally
{
jsa.UsageLock.ExitReadLock();
}
}
internal int NumStreams()
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa == null)
return 0;
jsa.Lock.EnterReadLock();
try
{
return jsa.Streams.Count;
}
finally
{
jsa.Lock.ExitReadLock();
}
}
internal List<NatsStream> Streams() => FilteredStreams(string.Empty);
internal List<NatsStream> FilteredStreams(string filter)
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa == null)
return [];
jsa.Lock.EnterReadLock();
try
{
var streams = new List<NatsStream>();
foreach (var stream in jsa.Streams.Values.OfType<NatsStream>())
{
if (string.IsNullOrWhiteSpace(filter))
{
streams.Add(stream);
continue;
}
foreach (var subject in stream.Config.Subjects ?? [])
{
if (Internal.DataStructures.SubscriptionIndex.SubjectsCollide(filter, subject))
{
streams.Add(stream);
break;
}
}
}
return streams;
}
finally
{
jsa.Lock.ExitReadLock();
}
}
internal (NatsStream? Stream, Exception? Error) LookupStream(string name)
{
_mu.EnterReadLock();
var jsa = JetStream;
_mu.ExitReadLock();
if (jsa == null)
return (null, new InvalidOperationException("jetstream not enabled for account"));
jsa.Lock.EnterReadLock();
try
{
if (jsa.Streams.TryGetValue(name, out var stream) && stream is NatsStream ns)
return (ns, null);
return (null, new InvalidOperationException("stream not found"));
}
finally
{
jsa.Lock.ExitReadLock();
}
}
internal Exception? UpdateJetStreamLimits(Dictionary<string, object>? limits)
{
_mu.EnterReadLock();
var server = Server as NatsServer;
var jsa = JetStream;
_mu.ExitReadLock();
if (server == null)
return new InvalidOperationException("jetstream account not registered");
if (server.GetJetStreamState() == null)
return new InvalidOperationException("jetstream not enabled");
if (jsa == null)
return new InvalidOperationException("jetstream not enabled for account");
limits ??= DefaultJetStreamAccountTiers();
if (limits.Count == 0)
limits = DefaultJetStreamAccountTiers();
AssignJetStreamLimits(limits);
var typed = ToTypedLimits(limits);
jsa.UsageLock.EnterWriteLock();
try
{
jsa.Limits.Clear();
foreach (var (tier, tierLimits) in typed)
jsa.Limits[tier] = tierLimits;
}
finally
{
jsa.UsageLock.ExitWriteLock();
}
return null;
}
internal Exception? EnableAllJetStreamServiceImportsAndMappings()
{
_mu.EnterReadLock();

View File

@@ -0,0 +1,47 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed class JetStreamEngine(JetStream state)
{
private readonly JetStream _state = state;
internal void SetStarted()
{
_state.Lock.EnterWriteLock();
try
{
_state.Started = DateTime.UtcNow;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal bool IsEnabled() => Interlocked.CompareExchange(ref _state.Disabled, 0, 0) == 0;
internal void SetJetStreamStandAlone(bool isStandAlone)
{
_state.Lock.EnterWriteLock();
try
{
_state.StandAlone = isStandAlone;
}
finally
{
_state.Lock.ExitWriteLock();
}
}
internal bool IsShuttingDown()
{
_state.Lock.EnterReadLock();
try
{
return _state.ShuttingDown;
}
finally
{
_state.Lock.ExitReadLock();
}
}
}

View File

@@ -424,4 +424,79 @@ public sealed partial class NatsServer
var js = _jetStream;
return js != null && Interlocked.CompareExchange(ref js.Disabled, 0, 0) == 0;
}
public bool JetStreamEnabledForDomain()
{
if (JetStreamEnabled())
return true;
foreach (var value in _nodeToInfo.Values)
{
if (value is NodeInfo { Js: true })
return true;
}
return false;
}
public JetStreamConfig? JetStreamConfig()
{
var js = _jetStream;
if (js == null)
return null;
var cfg = js.Config;
return new JetStreamConfig
{
MaxMemory = cfg.MaxMemory,
MaxStore = cfg.MaxStore,
StoreDir = cfg.StoreDir,
SyncInterval = cfg.SyncInterval,
SyncAlways = cfg.SyncAlways,
Domain = cfg.Domain,
CompressOK = cfg.CompressOK,
UniqueTag = cfg.UniqueTag,
Strict = cfg.Strict,
};
}
public string StoreDir()
{
var js = _jetStream;
return js == null ? string.Empty : js.Config.StoreDir;
}
public int JetStreamNumAccounts()
{
var js = _jetStream;
if (js == null)
return 0;
js.Lock.EnterReadLock();
try
{
return js.Accounts.Count;
}
finally
{
js.Lock.ExitReadLock();
}
}
public (long MemReserved, long StoreReserved, Exception? Error) JetStreamReservedResources()
{
var js = _jetStream;
if (js == null)
return (-1, -1, new InvalidOperationException("jetstream not enabled"));
return (
Interlocked.Read(ref js.MemReserved),
Interlocked.Read(ref js.StoreReserved),
null);
}
internal JetStreamEngine? GetJetStream() =>
_jetStream == null ? null : new JetStreamEngine(_jetStream);
internal JetStream? GetJetStreamState() => _jetStream;
}

Binary file not shown.