Files
natsnet/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs
2026-02-28 22:59:08 -05:00

357 lines
7.5 KiB
C#

using System.Threading.Channels;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
/// <summary>
/// Estimates the size of the largest message this stream may store.
/// </summary>
/// <remarks>
/// The payload limit is the first positive value among <c>Config.MaxMsgSize</c>, the
/// account's <c>MaxPayload</c>, and <c>ServerConstants.MaxPayloadSize</c>. The subject
/// length is that of the longest configured subject without wildcards, or 256 when no
/// such subject exists.
/// </remarks>
/// <returns>The value computed by <c>JetStreamFileStore.FileStoreMsgSizeEstimate</c>.</returns>
internal ulong MaxMsgSize()
{
    var payloadLimit = Config.MaxMsgSize;
    if (payloadLimit <= 0)
    {
        // No stream-level limit: fall back to the account, then to the server default.
        payloadLimit = Account?.MaxPayload ?? -1;
        if (payloadLimit <= 0)
        {
            payloadLimit = ServerConstants.MaxPayloadSize;
        }
    }

    // Only subjects without wildcards contribute to the subject-length estimate.
    var longestLiteral = -1;
    foreach (var candidate in Config.Subjects ?? [])
    {
        if (!SubscriptionIndex.SubjectHasWildcard(candidate) && candidate.Length > longestLiteral)
        {
            longestLiteral = candidate.Length;
        }
    }

    // 256 is the assumed subject length when no literal subject is configured.
    return JetStreamFileStore.FileStoreMsgSizeEstimate(longestLiteral < 0 ? 256 : longestLiteral, payloadLimit);
}
/// <summary>
/// Picks a block size for <paramref name="fileStoreConfig"/> based on the stream limits.
/// </summary>
/// <remarks>
/// <c>Config.MaxBytes</c> takes precedence, then <c>Config.MaxMsgs</c> multiplied by
/// <see cref="MaxMsgSize"/>. When only <c>Config.MaxMsgsPer</c> is set the KV default block
/// size is used; when none of the three is set the configuration is left untouched.
/// </remarks>
/// <param name="fileStoreConfig">Configuration whose <c>BlockSize</c> may be assigned.</param>
internal void AutoTuneFileStorageBlockSize(FileStoreConfig fileStoreConfig)
{
    if (Config.MaxBytes <= 0 && Config.MaxMsgs <= 0)
    {
        // Without a byte or message limit there is no total to estimate from.
        if (Config.MaxMsgsPer > 0)
        {
            fileStoreConfig.BlockSize = FileStoreDefaults.DefaultKvBlockSize;
        }

        return;
    }

    var estimatedTotal = Config.MaxBytes > 0
        ? (ulong)Config.MaxBytes
        : MaxMsgSize() * (ulong)Config.MaxMsgs;

    // A quarter of the estimated total, plus one.
    var candidate = (estimatedTotal / 4) + 1;

    // Round up to the next multiple of 100.
    var remainder = candidate % 100;
    if (remainder != 0)
    {
        candidate += 100 - remainder;
    }

    if (candidate <= FileStoreDefaults.FileStoreMinBlkSize)
    {
        candidate = FileStoreDefaults.FileStoreMinBlkSize;
    }
    else if (candidate >= FileStoreDefaults.FileStoreMaxBlkSize)
    {
        candidate = FileStoreDefaults.FileStoreMaxBlkSize;
    }
    else
    {
        // Anything strictly between the bounds maps to the medium default.
        candidate = FileStoreDefaults.DefaultMediumBlockSize;
    }

    fileStoreConfig.BlockSize = candidate;
}
/// <summary>
/// Extension point for rebuilding duplicate-detection state. At present it only reads
/// <c>Config.Duplicates</c> and discards the value.
/// </summary>
/// <remarks>
/// A full dedupe replay requires scanned headers from the store and cluster replay
/// context; the method is kept as an explicit hook for stream recovery paths.
/// </remarks>
internal void RebuildDedupe() => _ = Config.Duplicates;
/// <summary>
/// Reads <c>_clseq</c> and <c>_clfs</c> together while holding the read lock on <c>_mu</c>.
/// </summary>
/// <returns>A tuple of (<c>_clseq</c>, <c>_clfs</c>) taken under a single lock acquisition.</returns>
internal (ulong LastSeq, ulong Clfs) LastSeqAndCLFS()
{
    (ulong LastSeq, ulong Clfs) snapshot;

    _mu.EnterReadLock();
    try
    {
        snapshot = (_clseq, _clfs);
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return snapshot;
}
/// <summary>
/// Reads <c>_clfs</c> while holding the read lock on <c>_mu</c>.
/// </summary>
/// <returns>The current value of <c>_clfs</c>.</returns>
internal ulong GetCLFS()
{
    ulong current;

    _mu.EnterReadLock();
    try
    {
        current = _clfs;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return current;
}
/// <summary>
/// Stores <paramref name="clfs"/> into <c>_clfs</c> while holding the write lock on <c>_mu</c>.
/// </summary>
/// <param name="clfs">The value to assign.</param>
internal void SetCLFS(ulong clfs)
{
    _mu.EnterWriteLock();
    try { _clfs = clfs; }
    finally { _mu.ExitWriteLock(); }
}
/// <summary>
/// Reads <c>_clseq</c> while holding the read lock on <c>_mu</c>.
/// </summary>
/// <returns>The current value of <c>_clseq</c>.</returns>
internal ulong LastSeqValue()
{
    ulong current;

    _mu.EnterReadLock();
    try
    {
        current = _clseq;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return current;
}
/// <summary>
/// Records <paramref name="seq"/> as the last sequence while holding the write lock on <c>_mu</c>.
/// </summary>
/// <remarks>
/// Both <c>_clseq</c> and the signed <c>LastSeq</c> field are updated; the latter is written
/// with <see cref="Interlocked.Exchange(ref long, long)"/>.
/// </remarks>
/// <param name="seq">The sequence number to store.</param>
internal void SetLastSeq(ulong seq)
{
    _mu.EnterWriteLock();
    try
    {
        _clseq = seq;

        // LastSeq is stored as a signed 64-bit value, hence the cast.
        var signedSeq = (long)seq;
        Interlocked.Exchange(ref LastSeq, signedSeq);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Currently a no-op: the method body is empty.
/// </summary>
/// <remarks>
/// NOTE(review): presumably a placeholder for publishing a stream-created advisory — confirm
/// against the advisory implementation before relying on it.
/// </remarks>
internal void SendCreateAdvisory() { }
/// <summary>
/// Currently a no-op: the method body is empty.
/// </summary>
/// <remarks>
/// NOTE(review): presumably a placeholder for publishing a stream-deleted advisory, and the
/// "Locked" suffix suggests the caller is expected to hold the stream lock — confirm both.
/// </remarks>
internal void SendDeleteAdvisoryLocked() { }
/// <summary>
/// Currently a no-op: the method body is empty.
/// </summary>
/// <remarks>
/// NOTE(review): presumably a placeholder for publishing a stream-updated advisory, and the
/// "Locked" suffix suggests the caller is expected to hold the stream lock — confirm both.
/// </remarks>
internal void SendUpdateAdvisoryLocked() { }
/// <summary>
/// Currently a no-op: both arguments are discarded and nothing is published.
/// </summary>
/// <param name="batchId">Unused at present.</param>
/// <param name="reason">Unused at present.</param>
internal void SendStreamBatchAbandonedAdvisory(string batchId, string reason)
{
    // Discard the arguments explicitly so they do not read as accidentally unused.
    _ = (batchId, reason);
}
/// <summary>
/// Reads <c>Created</c> while holding the read lock on <c>_mu</c>.
/// </summary>
/// <returns>The current value of <c>Created</c>.</returns>
internal DateTime CreatedTime()
{
    DateTime created;

    _mu.EnterReadLock();
    try
    {
        created = Created;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return created;
}
/// <summary>
/// Assigns <paramref name="createdTime"/> to <c>Created</c> while holding the write lock on <c>_mu</c>.
/// </summary>
/// <param name="createdTime">The timestamp to store.</param>
internal void SetCreatedTime(DateTime createdTime)
{
    _mu.EnterWriteLock();
    try { Created = createdTime; }
    finally { _mu.ExitWriteLock(); }
}
/// <summary>
/// Builds a new file-store configuration whose <c>StoreDir</c> points at this stream's directory.
/// </summary>
/// <returns>
/// A configuration with <c>StoreDir</c> set to the account's JetStream store directory
/// (or an empty string when unavailable) combined with <c>"_streams"</c> and <c>Name</c>.
/// </returns>
internal FileStoreConfig FileStoreConfig()
{
    // Fall back to an empty root when the account or its JetStream state is missing.
    var storeRoot = Account?.JetStream?.StoreDir ?? string.Empty;

    return new()
    {
        StoreDir = Path.Combine(storeRoot, "_streams", Name),
    };
}
/// <summary>
/// Applies <paramref name="config"/> through <c>UpdateConfig</c>.
/// </summary>
/// <param name="config">The new stream configuration.</param>
/// <returns>
/// An <see cref="ArgumentNullException"/> instance (returned, not thrown) when
/// <paramref name="config"/> is null; otherwise <c>null</c>.
/// </returns>
internal Exception? Update(StreamConfig config)
{
    if (config is null)
    {
        return new ArgumentNullException(nameof(config));
    }

    UpdateConfig(config);
    return null;
}
/// <summary>
/// Forwards to <see cref="Update"/>; the <paramref name="pedantic"/> flag is ignored.
/// </summary>
/// <param name="config">The new stream configuration.</param>
/// <param name="pedantic">Unused at present.</param>
/// <returns>Whatever <see cref="Update"/> returns for <paramref name="config"/>.</returns>
internal Exception? UpdatePedantic(StreamConfig config, bool pedantic)
{
    // The flag is accepted for signature compatibility but has no effect here.
    _ = pedantic;

    var outcome = Update(config);
    return outcome;
}
/// <summary>
/// Reads <c>_assignment</c> while holding the read lock on <c>_mu</c>.
/// </summary>
/// <returns>The current assignment, or <c>null</c> when none is set.</returns>
internal StreamAssignment? StreamAssignment()
{
    _mu.EnterReadLock();
    try
    {
        var current = _assignment;
        return current;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Replaces <c>_assignment</c> while holding the write lock on <c>_mu</c>, and then attempts
/// to write to the update channel.
/// </summary>
/// <remarks>
/// When the assignment's group carries a non-null node, that node is stored in <c>_node</c>
/// and told about the group's peers via <c>UpdateKnownPeers</c>. The channel write is
/// attempted regardless of whether <paramref name="assignment"/> is null.
/// </remarks>
/// <param name="assignment">The new assignment; may be null.</param>
internal void SetStreamAssignment(StreamAssignment? assignment)
{
    _mu.EnterWriteLock();
    try
    {
        _assignment = assignment;

        var group = assignment?.Group;
        if (group?.Node is { } groupNode)
        {
            _node = groupNode;
            groupNode.UpdateKnownPeers(group.Peers);
        }

        // Best-effort write; the result of TryWrite is intentionally ignored.
        _ = _updateChannel.Writer.TryWrite(true);
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Returns the reader side of the monitor-quit channel, creating the channel on first use.
/// </summary>
/// <remarks>
/// The write lock on <c>_mu</c> is taken because the call may assign <c>_monitorQuitChannel</c>.
/// A newly created channel is bounded with capacity 1.
/// </remarks>
/// <returns>The reader of <c>_monitorQuitChannel</c>.</returns>
internal ChannelReader<bool>? MonitorQuitC()
{
    _mu.EnterWriteLock();
    try
    {
        if (_monitorQuitChannel is null)
        {
            _monitorQuitChannel = Channel.CreateBounded<bool>(1);
        }

        return _monitorQuitChannel.Reader;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Completes the monitor-quit channel's writer, if a channel exists, and clears the field.
/// </summary>
/// <remarks>
/// Runs while holding the write lock on <c>_mu</c>. Does nothing beyond the lock
/// round-trip when <c>_monitorQuitChannel</c> is already null.
/// </remarks>
internal void SignalMonitorQuit()
{
    _mu.EnterWriteLock();
    try
    {
        // TryComplete is skipped when there is no channel; re-assigning null is then harmless.
        _monitorQuitChannel?.Writer.TryComplete();
        _monitorQuitChannel = null;
    }
    finally
    {
        _mu.ExitWriteLock();
    }
}
/// <summary>
/// Exposes the reader side of <c>_updateChannel</c>.
/// </summary>
/// <returns>The reader of <c>_updateChannel</c>.</returns>
internal ChannelReader<bool> UpdateC()
{
    return _updateChannel.Reader;
}
/// <summary>
/// Reports leadership based on the state of <c>_node</c>, read under the read lock on <c>_mu</c>.
/// </summary>
/// <returns>
/// <c>true</c> when <c>_node</c> is not an <c>IRaftNode</c>; otherwise whether the node's
/// <c>State()</c> equals <c>RaftState.Leader</c>.
/// </returns>
internal bool IsLeaderNodeState()
{
    _mu.EnterReadLock();
    try
    {
        // With no raft node attached the stream is treated as leader.
        return _node is not IRaftNode raft || raft.State() == RaftState.Leader;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Reports leadership based on the node attached to the current assignment's group,
/// read under the read lock on <c>_mu</c>.
/// </summary>
/// <returns>
/// <c>true</c> when the assignment, its group, or the group's node is missing (or the node
/// is not an <c>IRaftNode</c>); otherwise the result of the node's <c>Leader()</c>.
/// </returns>
internal bool IsLeaderInternal()
{
    _mu.EnterReadLock();
    try
    {
        // With no raft node on the assignment the stream is treated as leader.
        return _assignment?.Group?.Node is not IRaftNode raft || raft.Leader();
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Sets <c>_clusterSubsActive</c> to <c>true</c> while holding the write lock on <c>_mu</c>.
/// </summary>
internal void StartClusterSubs()
{
    _mu.EnterWriteLock();
    try { _clusterSubsActive = true; }
    finally { _mu.ExitWriteLock(); }
}
/// <summary>
/// Sets <c>_clusterSubsActive</c> to <c>false</c> while holding the write lock on <c>_mu</c>.
/// </summary>
internal void StopClusterSubs()
{
    _mu.EnterWriteLock();
    try { _clusterSubsActive = false; }
    finally { _mu.ExitWriteLock(); }
}
/// <summary>
/// Reads <c>_clusterSubsActive</c> while holding the read lock on <c>_mu</c>.
/// </summary>
/// <returns>The current value of <c>_clusterSubsActive</c>.</returns>
internal bool ClusterSubsActive()
{
    bool active;

    _mu.EnterReadLock();
    try
    {
        active = _clusterSubsActive;
    }
    finally
    {
        _mu.ExitReadLock();
    }

    return active;
}
/// <summary>
/// Returns <c>Account</c>, optionally taking the read lock on <c>_mu</c> around the read.
/// </summary>
/// <param name="needLock">
/// <c>true</c> to acquire and release the read lock here; <c>false</c> to read without locking.
/// NOTE(review): presumably <c>false</c> is meant for callers that already hold the lock — confirm.
/// </param>
/// <returns>The current value of <c>Account</c>.</returns>
internal Account? AccountLocked(bool needLock)
{
    if (!needLock)
    {
        return Account;
    }

    _mu.EnterReadLock();
    try
    {
        return Account;
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
}