feat(batch36): merge stream-lifecycle

This commit is contained in:
Joseph Doherty
2026-02-28 23:28:19 -05:00
20 changed files with 2163 additions and 11 deletions

View File

@@ -0,0 +1,71 @@
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class Account
{
internal (NatsStream? Stream, Exception? Error) AddStream(StreamConfig config) =>
AddStreamWithAssignment(config, null, null, pedantic: false);
internal (NatsStream? Stream, Exception? Error) AddStreamWithStore(StreamConfig config, FileStoreConfig? fileStoreConfig) =>
AddStreamWithAssignment(config, fileStoreConfig, null, pedantic: false);
internal (NatsStream? Stream, Exception? Error) AddStreamPedantic(StreamConfig config, bool pedantic) =>
AddStreamWithAssignment(config, null, null, pedantic);
internal (NatsStream? Stream, Exception? Error) AddStreamWithAssignment(
StreamConfig config,
FileStoreConfig? fileStoreConfig,
StreamAssignment? assignment,
bool pedantic)
{
if (config == null)
return (null, new ArgumentNullException(nameof(config)));
var (server, jsa, error) = CheckForJetStream();
if (error != null || server == null || jsa == null)
return (null, error ?? new InvalidOperationException("jetstream not enabled for account"));
if (string.IsNullOrWhiteSpace(config.Name))
return (null, new InvalidOperationException("stream name is required"));
var normalized = config.Clone();
if (normalized.Subjects == null || normalized.Subjects.Length == 0)
normalized.Subjects = [$"{normalized.Name}.>"];
_ = pedantic;
_ = fileStoreConfig;
jsa.Lock.EnterWriteLock();
try
{
if (jsa.Streams.TryGetValue(normalized.Name, out var existingObject) && existingObject is NatsStream existing)
{
if (StreamConfigsEqual(existing.Config, normalized))
{
if (assignment != null)
existing.SetStreamAssignment(assignment);
return (existing, null);
}
return (null, new InvalidOperationException("stream name already in use"));
}
IStreamStore store = new JetStreamMemStore(normalized.Clone());
var stream = NatsStream.Create(this, normalized, jsa, store, assignment, server);
if (stream == null)
return (null, new InvalidOperationException("stream creation failed"));
jsa.Streams[normalized.Name] = stream;
return (stream, null);
}
finally
{
jsa.Lock.ExitWriteLock();
}
}
private static bool StreamConfigsEqual(StreamConfig left, StreamConfig right)
=> string.Equals(JsonSerializer.Serialize(left), JsonSerializer.Serialize(right), StringComparison.Ordinal);
}

View File

@@ -0,0 +1,45 @@
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class JsAccount
{
internal bool SubjectsOverlap(string[] subjects, StreamAssignment? ownAssignment)
{
Lock.EnterReadLock();
try
{
foreach (var stream in Streams.Values.OfType<NatsStream>())
{
if (ownAssignment != null && ReferenceEquals(stream.StreamAssignment(), ownAssignment))
continue;
foreach (var left in subjects)
{
foreach (var right in stream.Config.Subjects ?? [])
{
if (SubscriptionIndex.SubjectsCollide(left, right))
return true;
}
}
}
return false;
}
finally
{
Lock.ExitReadLock();
}
}
internal Exception? ConfigUpdateCheck(StreamConfig current, StreamConfig proposed)
{
if (!string.Equals(current.Name, proposed.Name, StringComparison.Ordinal))
return new InvalidOperationException("stream name may not change");
if (current.Storage != proposed.Storage)
return new InvalidOperationException("stream storage may not change");
return null;
}
}

View File

@@ -0,0 +1,346 @@
using System.Threading.Channels;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal ulong MaxMsgSize()
{
var maxMsgSize = Config.MaxMsgSize;
if (maxMsgSize <= 0)
{
maxMsgSize = Account?.MaxPayload ?? -1;
if (maxMsgSize <= 0)
maxMsgSize = ServerConstants.MaxPayloadSize;
}
var maxSubject = -1;
foreach (var subject in Config.Subjects ?? [])
{
if (SubscriptionIndex.SubjectHasWildcard(subject))
continue;
if (subject.Length > maxSubject)
maxSubject = subject.Length;
}
if (maxSubject < 0)
maxSubject = 256;
return JetStreamFileStore.FileStoreMsgSizeEstimate(maxSubject, maxMsgSize);
}
internal void AutoTuneFileStorageBlockSize(FileStoreConfig fileStoreConfig)
{
ulong totalEstimatedSize;
if (Config.MaxBytes > 0)
{
totalEstimatedSize = (ulong)Config.MaxBytes;
}
else if (Config.MaxMsgs > 0)
{
totalEstimatedSize = MaxMsgSize() * (ulong)Config.MaxMsgs;
}
else if (Config.MaxMsgsPer > 0)
{
fileStoreConfig.BlockSize = FileStoreDefaults.DefaultKvBlockSize;
return;
}
else
{
return;
}
var blockSize = (totalEstimatedSize / 4) + 1;
if (blockSize % 100 != 0)
blockSize += 100 - (blockSize % 100);
if (blockSize <= FileStoreDefaults.FileStoreMinBlkSize)
blockSize = FileStoreDefaults.FileStoreMinBlkSize;
else if (blockSize >= FileStoreDefaults.FileStoreMaxBlkSize)
blockSize = FileStoreDefaults.FileStoreMaxBlkSize;
else
blockSize = FileStoreDefaults.DefaultMediumBlockSize;
fileStoreConfig.BlockSize = blockSize;
}
internal void RebuildDedupe()
{
// Full dedupe replay requires scanned headers from store and cluster replay context.
// Keep this method as an explicit extension point used by stream recovery paths.
_ = Config.Duplicates;
}
internal (ulong LastSeq, ulong Clfs) LastSeqAndCLFS()
{
_mu.EnterReadLock();
try
{
return (_clseq, _clfs);
}
finally
{
_mu.ExitReadLock();
}
}
internal ulong GetCLFS()
{
_mu.EnterReadLock();
try
{
return _clfs;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetCLFS(ulong clfs)
{
_mu.EnterWriteLock();
try
{
_clfs = clfs;
}
finally
{
_mu.ExitWriteLock();
}
}
internal ulong LastSeqValue()
{
_mu.EnterReadLock();
try
{
return _clseq;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetLastSeq(ulong seq)
{
_mu.EnterWriteLock();
try
{
_clseq = seq;
Interlocked.Exchange(ref LastSeq, (long)seq);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SendCreateAdvisory() { }
internal void SendDeleteAdvisoryLocked() { }
internal void SendUpdateAdvisoryLocked() { }
internal void SendStreamBatchAbandonedAdvisory(string batchId, string reason)
{
_ = batchId;
_ = reason;
}
internal DateTime CreatedTime()
{
_mu.EnterReadLock();
try
{
return Created;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetCreatedTime(DateTime createdTime)
{
_mu.EnterWriteLock();
try
{
Created = createdTime;
}
finally
{
_mu.ExitWriteLock();
}
}
internal FileStoreConfig FileStoreConfig()
=> new()
{
StoreDir = Path.Combine(Account?.JetStream?.StoreDir ?? string.Empty, "_streams", Name),
};
internal Exception? Update(StreamConfig config)
=> UpdateWithAdvisory(config, sendAdvisory: true, pedantic: false);
internal Exception? UpdatePedantic(StreamConfig config, bool pedantic)
=> UpdateWithAdvisory(config, sendAdvisory: true, pedantic);
internal StreamAssignment? StreamAssignment()
{
_mu.EnterReadLock();
try
{
return _assignment;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetStreamAssignment(StreamAssignment? assignment)
{
_mu.EnterWriteLock();
try
{
_assignment = assignment;
if (assignment?.Group?.Node != null)
{
_node = assignment.Group.Node;
assignment.Group.Node.UpdateKnownPeers(assignment.Group.Peers);
}
_updateChannel.Writer.TryWrite(true);
}
finally
{
_mu.ExitWriteLock();
}
}
internal ChannelReader<bool>? MonitorQuitC()
{
_mu.EnterWriteLock();
try
{
_monitorQuitChannel ??= Channel.CreateBounded<bool>(1);
return _monitorQuitChannel.Reader;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SignalMonitorQuit()
{
_mu.EnterWriteLock();
try
{
if (_monitorQuitChannel != null)
{
_monitorQuitChannel.Writer.TryComplete();
_monitorQuitChannel = null;
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal ChannelReader<bool> UpdateC() => _updateChannel.Reader;
internal bool IsLeaderNodeState()
{
_mu.EnterReadLock();
try
{
if (_node is IRaftNode raftNode)
return raftNode.State() == RaftState.Leader;
return true;
}
finally
{
_mu.ExitReadLock();
}
}
internal bool IsLeaderInternal()
{
_mu.EnterReadLock();
try
{
if (_assignment?.Group?.Node is IRaftNode node)
return node.Leader();
return true;
}
finally
{
_mu.ExitReadLock();
}
}
internal void StartClusterSubs()
{
_mu.EnterWriteLock();
try
{
_clusterSubsActive = true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void StopClusterSubs()
{
_mu.EnterWriteLock();
try
{
_clusterSubsActive = false;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool ClusterSubsActive()
{
_mu.EnterReadLock();
try
{
return _clusterSubsActive;
}
finally
{
_mu.ExitReadLock();
}
}
internal Account? AccountLocked(bool needLock)
{
if (needLock)
_mu.EnterReadLock();
try
{
return Account;
}
finally
{
if (needLock)
_mu.ExitReadLock();
}
}
}

View File

@@ -0,0 +1,270 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
private static readonly TimeSpan RetryBackoff = TimeSpan.FromSeconds(5);
private static readonly TimeSpan RetryMaximum = TimeSpan.FromMinutes(2);
private static readonly TimeSpan SourceConsumerRetryThreshold = TimeSpan.FromSeconds(10);
internal Exception? UpdateWithAdvisory(StreamConfig config, bool sendAdvisory, bool pedantic)
{
_ = pedantic;
if (config == null)
return new ArgumentNullException(nameof(config));
UpdateConfig(config);
Exception? error = null;
if (sendAdvisory)
SendUpdateAdvisoryLocked();
return error;
}
internal string GetCfgName()
{
_mu.EnterReadLock();
try
{
return Config.Name ?? string.Empty;
}
finally
{
_mu.ExitReadLock();
}
}
internal (ulong Purged, Exception? Error) PurgeLocked(StreamPurgeRequest? request, bool needLock)
{
if (needLock)
_mu.EnterWriteLock();
try
{
if (_closed)
return (0UL, new InvalidOperationException("stream closed"));
if (_sealed)
return (0UL, new InvalidOperationException("sealed stream"));
if (Store == null)
return (0UL, new InvalidOperationException("stream store unavailable"));
var result = request == null
? Store.Purge()
: Store.PurgeEx(request.Filter ?? string.Empty, request.Sequence, request.Keep);
SyncCountersFromState(Store.State());
return result;
}
finally
{
if (needLock)
_mu.ExitWriteLock();
}
}
internal (bool Removed, Exception? Error) RemoveMsg(ulong sequence)
=> DeleteMsg(sequence);
internal (bool Removed, Exception? Error) DeleteMsg(ulong sequence)
{
_mu.EnterWriteLock();
try
{
if (_closed)
return (false, new InvalidOperationException("stream closed"));
if (Store == null)
return (false, new InvalidOperationException("stream store unavailable"));
var result = Store.RemoveMsg(sequence);
if (result.Error == null)
SyncCountersFromState(Store.State());
return result;
}
finally
{
_mu.ExitWriteLock();
}
}
internal (bool Removed, Exception? Error) EraseMsg(ulong sequence)
{
_mu.EnterWriteLock();
try
{
if (_closed)
return (false, new InvalidOperationException("stream closed"));
if (Store == null)
return (false, new InvalidOperationException("stream store unavailable"));
var result = Store.EraseMsg(sequence);
if (result.Error == null)
SyncCountersFromState(Store.State());
return result;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool IsMirror()
{
_mu.EnterReadLock();
try
{
return _isMirror || Config.Mirror != null;
}
finally
{
_mu.ExitReadLock();
}
}
internal StreamSourceInfo[] SourcesInfo()
{
_mu.EnterReadLock();
try
{
return _sources.Values.Select(SourceInfo).Where(static info => info != null).Cast<StreamSourceInfo>().ToArray();
}
finally
{
_mu.ExitReadLock();
}
}
internal StreamSourceInfo? SourceInfo(StreamSourceInfo? sourceInfo)
{
if (sourceInfo == null)
return null;
return new StreamSourceInfo
{
Name = sourceInfo.Name,
Lag = sourceInfo.Lag,
FilterSubject = sourceInfo.FilterSubject,
Active = sourceInfo.Active,
Error = sourceInfo.Error,
External = sourceInfo.External == null
? null
: new StreamSource
{
Name = sourceInfo.External.Name,
FilterSubject = sourceInfo.External.FilterSubject,
SubjectTransforms = sourceInfo.External.SubjectTransforms,
External = sourceInfo.External.External,
},
};
}
internal StreamSourceInfo? MirrorInfo()
{
_mu.EnterReadLock();
try
{
return SourceInfo(_mirrorInfo);
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetMirrorErr(JsApiError? error)
{
_mu.EnterWriteLock();
try
{
if (_mirrorInfo != null)
_mirrorInfo.Error = error?.Description;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void CancelMirrorConsumer()
{
_mu.EnterWriteLock();
try
{
_mirrorConsumerSetupTimer?.Dispose();
_mirrorConsumerSetupTimer = null;
if (_mirrorInfo != null)
{
_mirrorInfo.Active = null;
_mirrorInfo.Error = null;
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal Exception? RetryMirrorConsumer()
{
CancelMirrorConsumer();
Exception? error = null;
return error;
}
internal void SkipMsgs(ulong start, ulong end)
{
_mu.EnterWriteLock();
try
{
if (Store == null || start > end)
return;
var count = (end - start) + 1;
Store.SkipMsgs(start, count);
SetLastSeq(end);
}
finally
{
_mu.ExitWriteLock();
}
}
internal static TimeSpan CalculateRetryBackoff(int failures)
{
var backoff = TimeSpan.FromTicks(RetryBackoff.Ticks * Math.Max(1, failures * 2));
return backoff > RetryMaximum ? RetryMaximum : backoff;
}
internal void ScheduleSetupMirrorConsumerRetry()
{
_mu.EnterWriteLock();
try
{
var lastAttempt = _mirrorInfo?.Active ?? DateTime.UtcNow - SourceConsumerRetryThreshold;
var next = SourceConsumerRetryThreshold - (DateTime.UtcNow - lastAttempt);
if (next < TimeSpan.Zero)
next = TimeSpan.Zero;
var failures = _mirrorInfo == null ? 1 : (int)Math.Min(_mirrorInfo.Lag, int.MaxValue);
next += CalculateRetryBackoff(failures);
next += TimeSpan.FromMilliseconds(Random.Shared.Next(100, 201));
_mirrorConsumerSetupTimer?.Dispose();
_mirrorConsumerSetupTimer = new Timer(
static state =>
{
if (state is NatsStream stream)
stream.RetryMirrorConsumer();
},
this,
next,
Timeout.InfiniteTimeSpan);
}
finally
{
_mu.ExitWriteLock();
}
}
}

View File

@@ -0,0 +1,289 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal void SetupMirrorConsumer()
{
_mu.EnterWriteLock();
try
{
_mirrorInfo ??= new StreamSourceInfo
{
Name = Config.Mirror?.Name ?? Name,
};
_mirrorInfo.Active = DateTime.UtcNow;
_mirrorInfo.Error = null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal StreamSource? StreamSource(string indexName)
{
_mu.EnterReadLock();
try
{
return (Config.Sources ?? []).FirstOrDefault(source => string.Equals(source.IndexName, indexName, StringComparison.Ordinal));
}
finally
{
_mu.ExitReadLock();
}
}
internal void RetrySourceConsumerAtSeq(string indexName, ulong sequence)
{
_mu.EnterWriteLock();
try
{
_sourceStartingSequences[indexName] = sequence;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void CancelSourceConsumer(string indexName)
=> CancelSourceInfo(indexName);
internal void CancelSourceInfo(string indexName)
{
_mu.EnterWriteLock();
try
{
_sources.Remove(indexName);
_sourceStartingSequences.Remove(indexName);
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc)
{
_mu.EnterWriteLock();
try
{
if (!_sources.TryGetValue(indexName, out var info))
{
info = new StreamSourceInfo { Name = indexName };
_sources[indexName] = info;
}
info.Active = requestedAtUtc == default ? DateTime.UtcNow : requestedAtUtc;
info.Lag = sequence > 0 ? sequence - 1 : 0;
info.Error = null;
_sourceStartingSequences[indexName] = sequence;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool TrySetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc)
{
SetupSourceConsumer(indexName, sequence, requestedAtUtc);
return true;
}
internal bool ProcessAllSourceMsgs(string indexName, IReadOnlyList<InMsg> messages)
{
var handled = true;
foreach (var message in messages)
handled &= ProcessInboundSourceMsg(indexName, message);
return handled;
}
internal void SendFlowControlReply(string replySubject)
{
_ = replySubject;
}
internal void HandleFlowControl(InMsg message)
{
if (!string.IsNullOrWhiteSpace(message.Reply))
SendFlowControlReply(message.Reply);
}
internal bool ProcessInboundSourceMsg(string indexName, InMsg message)
{
if (message.IsControlMsg())
{
HandleFlowControl(message);
return true;
}
_mu.EnterWriteLock();
try
{
if (!_sources.TryGetValue(indexName, out var info))
return false;
info.Active = DateTime.UtcNow;
info.Lag = info.Lag > 0 ? info.Lag - 1 : 0;
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal static (string Stream, ulong Sequence) StreamAndSeqFromAckReply(string reply)
=> StreamAndSeq(reply);
internal static (string Stream, ulong Sequence) StreamAndSeq(string reply)
{
if (string.IsNullOrWhiteSpace(reply))
return (string.Empty, 0);
var tokens = reply.Split('.', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 2)
return (string.Empty, 0);
var stream = tokens[0];
_ = ulong.TryParse(tokens[^1], out var sequence);
return (stream, sequence);
}
internal void SetStartingSequenceForSources(IDictionary<string, ulong> startingSequences)
{
_mu.EnterWriteLock();
try
{
foreach (var pair in startingSequences)
_sourceStartingSequences[pair.Key] = pair.Value;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ResetSourceInfo(string indexName)
{
_mu.EnterWriteLock();
try
{
if (_sources.TryGetValue(indexName, out var info))
{
info.Active = null;
info.Error = null;
info.Lag = 0;
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal ulong StartingSequenceForSources(string indexName)
{
_mu.EnterReadLock();
try
{
return _sourceStartingSequences.TryGetValue(indexName, out var sequence) ? sequence : 0;
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetupSourceConsumers()
{
_mu.EnterWriteLock();
try
{
foreach (var source in Config.Sources ?? [])
{
source.SetIndexName();
if (!_sources.ContainsKey(source.IndexName))
{
_sources[source.IndexName] = new StreamSourceInfo
{
Name = source.Name,
FilterSubject = source.FilterSubject,
External = source,
};
}
var sequence = source.OptStartSeq > 0 ? source.OptStartSeq : 1;
_sourceStartingSequences[source.IndexName] = sequence;
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal void RetryDisconnectedSyncConsumers()
{
_mu.EnterWriteLock();
try
{
if (_mirrorInfo != null && _mirrorInfo.Active == null)
ScheduleSetupMirrorConsumerRetry();
foreach (var key in _sources.Keys.ToArray())
{
if (_sources[key].Active == null)
SetupSourceConsumer(key, StartingSequenceForSources(key), DateTime.UtcNow);
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal void ProcessMirrorMsgs(StreamSourceInfo mirrorInfo, IReadOnlyList<InMsg> messages)
{
foreach (var message in messages)
ProcessInboundMirrorMsg(message);
_mu.EnterWriteLock();
try
{
_mirrorInfo = SourceInfo(mirrorInfo);
if (_mirrorInfo != null)
_mirrorInfo.Active = DateTime.UtcNow;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool ProcessInboundMirrorMsg(InMsg message)
{
if (message.IsControlMsg())
{
HandleFlowControl(message);
return true;
}
_mu.EnterWriteLock();
try
{
if (_mirrorInfo == null)
return false;
_mirrorInfo.Active = DateTime.UtcNow;
_mirrorInfo.Lag = _mirrorInfo.Lag > 0 ? _mirrorInfo.Lag - 1 : 0;
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
}

View File

@@ -0,0 +1,176 @@
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal void SubscribeToStream()
{
foreach (var subject in Config.Subjects ?? [])
_ = SubscribeInternal(subject, handler: null);
}
internal void SubscribeToDirect()
{
_mu.EnterWriteLock();
try
{
_allowDirectSubscription = true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void UnsubscribeToDirect()
{
_mu.EnterWriteLock();
try
{
_allowDirectSubscription = false;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SubscribeToMirrorDirect()
{
_mu.EnterWriteLock();
try
{
_allowMirrorDirectSubscription = true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void UnsubscribeToMirrorDirect()
{
_mu.EnterWriteLock();
try
{
_allowMirrorDirectSubscription = false;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void StopSourceConsumers()
{
_mu.EnterWriteLock();
try
{
_sources.Clear();
_sourceStartingSequences.Clear();
_mirrorInfo = null;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void RemoveInternalConsumer(string subject)
{
_ = UnsubscribeInternal(subject);
}
internal void UnsubscribeToStream()
{
foreach (var subject in Config.Subjects ?? [])
_ = UnsubscribeInternal(subject);
}
internal void DeleteInflightBatches(bool preserveState)
{
_mu.EnterWriteLock();
try
{
_inflightBatches.Clear();
if (!preserveState)
DeleteBatchApplyState();
}
finally
{
_mu.ExitWriteLock();
}
}
internal void DeleteBatchApplyState()
{
_mu.EnterWriteLock();
try
{
_inflightBatches.Clear();
}
finally
{
_mu.ExitWriteLock();
}
}
internal (Subscription? Subscription, Exception? Error) SubscribeInternal(string subject, Action<InMsg>? handler)
{
_ = handler;
if (string.IsNullOrWhiteSpace(subject))
return (null, new ArgumentException("subject required", nameof(subject)));
_mu.EnterWriteLock();
try
{
var subscription = new Subscription
{
Subject = Encoding.ASCII.GetBytes(subject),
Sid = Encoding.ASCII.GetBytes(subject),
};
_internalSubscriptions[subject] = subscription;
return (subscription, null);
}
finally
{
_mu.ExitWriteLock();
}
}
internal (Subscription? Subscription, Exception? Error) QueueSubscribeInternal(string subject, string queue, Action<InMsg>? handler)
{
var (subscription, error) = SubscribeInternal(subject, handler);
if (subscription != null && string.IsNullOrWhiteSpace(queue) is false)
subscription.Queue = Encoding.ASCII.GetBytes(queue);
return (subscription, error);
}
internal Exception? UnsubscribeInternal(string subject)
{
if (string.IsNullOrWhiteSpace(subject))
return new ArgumentException("subject required", nameof(subject));
_mu.EnterWriteLock();
try
{
if (_internalSubscriptions.TryGetValue(subject, out var subscription))
{
subscription.Close();
_internalSubscriptions.Remove(subject);
}
Exception? error = null;
return error;
}
finally
{
_mu.ExitWriteLock();
}
}
}

View File

@@ -13,13 +13,16 @@
//
// Adapted from server/stream.go in the NATS server Go source.
using System.Threading.Channels;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
/// <summary>
/// Represents a JetStream stream, managing message storage, replication, and lifecycle.
/// Mirrors the <c>stream</c> struct in server/stream.go.
/// </summary>
internal sealed class NatsStream : IDisposable
internal sealed partial class NatsStream : IDisposable
{
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
@@ -35,17 +38,30 @@ internal sealed class NatsStream : IDisposable
internal long FirstSeq;
internal long LastSeq;
internal bool IsMirror;
private bool _isMirror;
private bool _closed;
private bool _isLeader;
private ulong _leaderTerm;
private bool _sealed;
private CancellationTokenSource? _quitCts;
private Channel<bool>? _monitorQuitChannel = Channel.CreateBounded<bool>(1);
private readonly Channel<bool> _updateChannel = Channel.CreateBounded<bool>(4);
private StreamAssignment? _assignment;
private bool _clusterSubsActive;
private ulong _clseq;
private ulong _clfs;
private readonly Dictionary<string, StreamSourceInfo> _sources = new(StringComparer.Ordinal);
private StreamSourceInfo? _mirrorInfo;
private Timer? _mirrorConsumerSetupTimer;
private readonly Dictionary<string, ulong> _sourceStartingSequences = new(StringComparer.Ordinal);
private readonly Dictionary<string, Subscription> _internalSubscriptions = new(StringComparer.Ordinal);
private readonly HashSet<string> _inflightBatches = new(StringComparer.Ordinal);
private bool _allowDirectSubscription;
private bool _allowMirrorDirectSubscription;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;
private StreamAssignment? _assignment;
private bool _migrating;
private bool _recovering;
@@ -81,7 +97,7 @@ internal sealed class NatsStream : IDisposable
var stream = new NatsStream(acc, cfg.Clone(), DateTime.UtcNow)
{
Store = store,
IsMirror = cfg.Mirror != null,
_isMirror = cfg.Mirror != null,
_assignment = sa,
};
return stream;

View File

@@ -303,6 +303,14 @@ public static class StoreEnumParityExtensions
StorageType.FileStorage => "File",
_ => "Unknown Storage Type",
};
public static string String(this PersistModeType value)
=> value switch
{
PersistModeType.DefaultPersistMode => "Default",
PersistModeType.AsyncPersistMode => "Async",
_ => "Unknown Persist Mode Type",
};
}
public sealed class RetentionPolicyJsonConverter : JsonConverter<RetentionPolicy>
@@ -403,6 +411,38 @@ public sealed class StorageTypeJsonConverter : JsonConverter<StorageType>
}
}
public sealed class PersistModeTypeJsonConverter : JsonConverter<PersistModeType>
{
public override PersistModeType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
if (reader.TokenType != JsonTokenType.String)
throw new JsonException("can not unmarshal token");
return reader.GetString() switch
{
"default" => PersistModeType.DefaultPersistMode,
"" => PersistModeType.DefaultPersistMode,
"async" => PersistModeType.AsyncPersistMode,
var value => throw new JsonException($"can not unmarshal \"{value}\""),
};
}
public override void Write(Utf8JsonWriter writer, PersistModeType value, JsonSerializerOptions options)
{
switch (value)
{
case PersistModeType.DefaultPersistMode:
writer.WriteStringValue("default");
break;
case PersistModeType.AsyncPersistMode:
writer.WriteStringValue("async");
break;
default:
throw new JsonException($"can not marshal {value}");
}
}
}
public sealed class AckPolicyJsonConverter : JsonConverter<AckPolicy>
{
public override AckPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)

View File

@@ -641,6 +641,7 @@ public enum StoreCompression : byte
// ---------------------------------------------------------------------------
/// <summary>Determines what persistence mode the stream uses.</summary>
[JsonConverter(typeof(PersistModeTypeJsonConverter))]
public enum PersistModeType
{
DefaultPersistMode = 0,
@@ -711,6 +712,14 @@ public sealed class ExternalStream
[JsonPropertyName("deliver")]
public string DeliverPrefix { get; set; } = string.Empty;
public string Domain()
{
if (string.IsNullOrEmpty(ApiPrefix))
return string.Empty;
return SubscriptionIndex.TokenAt(ApiPrefix, 2);
}
}
// ---------------------------------------------------------------------------
@@ -737,6 +746,47 @@ public sealed class StreamSource
[JsonPropertyName("external")]
public ExternalStream? External { get; set; }
[JsonIgnore]
public string IndexName { get; private set; } = string.Empty;
public string ComposeIName()
{
var name = Name;
if (External != null)
name = $"{name}:{NatsServer.GetHash(External.ApiPrefix)}";
var source = FilterSubject ?? string.Empty;
var destination = ">";
if (SubjectTransforms is null || SubjectTransforms.Length == 0)
{
if (string.IsNullOrEmpty(source))
source = ">";
if (string.IsNullOrEmpty(destination))
destination = ">";
}
else
{
var sources = new List<string>(SubjectTransforms.Length);
var destinations = new List<string>(SubjectTransforms.Length);
foreach (var transform in SubjectTransforms)
{
sources.Add(string.IsNullOrEmpty(transform.Source) ? ">" : transform.Source);
destinations.Add(string.IsNullOrEmpty(transform.Destination) ? ">" : transform.Destination);
}
source = string.Join('\f', sources);
destination = string.Join('\f', destinations);
}
return string.Join(' ', [name, source, destination]);
}
public void SetIndexName()
{
IndexName = ComposeIName();
}
}
// ---------------------------------------------------------------------------

View File

@@ -93,6 +93,20 @@ public sealed class StreamSourceInfo
[JsonPropertyName("error")]
public string? Error { get; set; }
internal bool IsCurrentSub(string reply)
{
if (string.IsNullOrWhiteSpace(reply) || string.IsNullOrWhiteSpace(Name))
return false;
return reply.Contains($".{Name}.", StringComparison.Ordinal) || reply.EndsWith($".{Name}", StringComparison.Ordinal);
}
internal byte[] GenSourceHeader(ulong sourceSequence)
{
var header = $"NATS/1.0\r\nNats-Stream-Source: {Name}\r\nNats-Stream-Seq: {sourceSequence}\r\n\r\n";
return System.Text.Encoding.ASCII.GetBytes(header);
}
}
/// <summary>
@@ -188,7 +202,9 @@ public sealed class JSPubAckResponse
{
if (PubAckError is { ErrCode: > 0 })
return new InvalidOperationException($"{PubAckError.Description} (errCode={PubAckError.ErrCode})");
return null;
Exception? error = null;
return error;
}
}
@@ -238,6 +254,23 @@ public sealed class InMsg
/// <summary>The originating client (opaque, set at runtime).</summary>
public object? Client { get; set; }
internal bool IsControlMsg()
{
if (!string.IsNullOrEmpty(Subject))
{
if (Subject.StartsWith("$JS.FC.", StringComparison.Ordinal) ||
Subject.StartsWith("$JS.SYNC.", StringComparison.Ordinal))
return true;
}
if (Hdr == null || Hdr.Length == 0)
return false;
var headerText = System.Text.Encoding.ASCII.GetString(Hdr);
return headerText.Contains("Status: 100", StringComparison.Ordinal) ||
headerText.Contains("Nats-Consumer-Stalled", StringComparison.Ordinal);
}
}
/// <summary>
@@ -482,7 +515,10 @@ public sealed class WaitQueue
public WaitingRequest? Peek()
{
if (Len == 0)
return null;
{
WaitingRequest? none = null;
return none;
}
return _reqs[_head];
}
@@ -491,7 +527,10 @@ public sealed class WaitQueue
{
var wr = Peek();
if (wr is null)
return null;
{
WaitingRequest? none = null;
return none;
}
wr.D++;
wr.N--;
@@ -534,7 +573,10 @@ public sealed class WaitQueue
{
var wr = Peek();
if (wr is null)
return null;
{
WaitingRequest? none = null;
return none;
}
wr.D++;
wr.N--;

View File

@@ -0,0 +1,25 @@
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal (StreamConfig Config, Exception? Error) CheckStreamCfg(StreamConfig config, Account? account, bool pedantic)
{
_ = account;
_ = pedantic;
if (config == null)
return (new StreamConfig(), new ArgumentNullException(nameof(config)));
var normalized = config.Clone();
if (string.IsNullOrWhiteSpace(normalized.Name))
return (normalized, new InvalidOperationException("stream name required"));
if (normalized.Subjects == null || normalized.Subjects.Length == 0)
normalized.Subjects = [$"{normalized.Name}.>"];
if (normalized.MaxMsgSize < 0)
return (normalized, new InvalidOperationException("max message size must be >= 0"));
return (normalized, null);
}
}