diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs new file mode 100644 index 0000000..9f5b73e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs @@ -0,0 +1,71 @@ +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal (NatsStream? Stream, Exception? Error) AddStream(StreamConfig config) => + AddStreamWithAssignment(config, null, null, pedantic: false); + + internal (NatsStream? Stream, Exception? Error) AddStreamWithStore(StreamConfig config, FileStoreConfig? fileStoreConfig) => + AddStreamWithAssignment(config, fileStoreConfig, null, pedantic: false); + + internal (NatsStream? Stream, Exception? Error) AddStreamPedantic(StreamConfig config, bool pedantic) => + AddStreamWithAssignment(config, null, null, pedantic); + + internal (NatsStream? Stream, Exception? Error) AddStreamWithAssignment( + StreamConfig config, + FileStoreConfig? fileStoreConfig, + StreamAssignment? assignment, + bool pedantic) + { + if (config == null) + return (null, new ArgumentNullException(nameof(config))); + + var (server, jsa, error) = CheckForJetStream(); + if (error != null || server == null || jsa == null) + return (null, error ?? new InvalidOperationException("jetstream not enabled for account")); + + if (string.IsNullOrWhiteSpace(config.Name)) + return (null, new InvalidOperationException("stream name is required")); + + var normalized = config.Clone(); + if (normalized.Subjects == null || normalized.Subjects.Length == 0) + normalized.Subjects = [$"{normalized.Name}.>"]; + + _ = pedantic; + _ = fileStoreConfig; + + jsa.Lock.EnterWriteLock(); + try + { + if (jsa.Streams.TryGetValue(normalized.Name, out var existingObject) && existingObject is NatsStream existing) + { + if (StreamConfigsEqual(existing.Config, normalized)) + { + if (assignment != null) + existing.SetStreamAssignment(assignment); + return (existing, null); + } + + return (null, new InvalidOperationException("stream name already in use")); + } + + IStreamStore store = new JetStreamMemStore(normalized.Clone()); + + var stream = NatsStream.Create(this, normalized, jsa, store, assignment, server); + if (stream == null) + return (null, new InvalidOperationException("stream creation failed")); + + jsa.Streams[normalized.Name] = stream; + return (stream, null); + } + finally + { + jsa.Lock.ExitWriteLock(); + } + } + + private static bool StreamConfigsEqual(StreamConfig left, StreamConfig right) + => string.Equals(JsonSerializer.Serialize(left), JsonSerializer.Serialize(right), StringComparison.Ordinal); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs new file mode 100644 index 0000000..cf6f0f8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs @@ -0,0 +1,45 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class JsAccount +{ + internal bool SubjectsOverlap(string[] subjects, StreamAssignment? ownAssignment) + { + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + if (ownAssignment != null && ReferenceEquals(stream.StreamAssignment(), ownAssignment)) + continue; + + foreach (var left in subjects) + { + foreach (var right in stream.Config.Subjects ?? []) + { + if (SubscriptionIndex.SubjectsCollide(left, right)) + return true; + } + } + } + + return false; + } + finally + { + Lock.ExitReadLock(); + } + } + + internal Exception? ConfigUpdateCheck(StreamConfig current, StreamConfig proposed) + { + if (!string.Equals(current.Name, proposed.Name, StringComparison.Ordinal)) + return new InvalidOperationException("stream name may not change"); + + if (current.Storage != proposed.Storage) + return new InvalidOperationException("stream storage may not change"); + + return null; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs new file mode 100644 index 0000000..bd444d3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -0,0 +1,346 @@ +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal ulong MaxMsgSize() + { + var maxMsgSize = Config.MaxMsgSize; + if (maxMsgSize <= 0) + { + maxMsgSize = Account?.MaxPayload ?? -1; + if (maxMsgSize <= 0) + maxMsgSize = ServerConstants.MaxPayloadSize; + } + + var maxSubject = -1; + foreach (var subject in Config.Subjects ?? []) + { + if (SubscriptionIndex.SubjectHasWildcard(subject)) + continue; + + if (subject.Length > maxSubject) + maxSubject = subject.Length; + } + + if (maxSubject < 0) + maxSubject = 256; + + return JetStreamFileStore.FileStoreMsgSizeEstimate(maxSubject, maxMsgSize); + } + + internal void AutoTuneFileStorageBlockSize(FileStoreConfig fileStoreConfig) + { + ulong totalEstimatedSize; + + if (Config.MaxBytes > 0) + { + totalEstimatedSize = (ulong)Config.MaxBytes; + } + else if (Config.MaxMsgs > 0) + { + totalEstimatedSize = MaxMsgSize() * (ulong)Config.MaxMsgs; + } + else if (Config.MaxMsgsPer > 0) + { + fileStoreConfig.BlockSize = FileStoreDefaults.DefaultKvBlockSize; + return; + } + else + { + return; + } + + var blockSize = (totalEstimatedSize / 4) + 1; + if (blockSize % 100 != 0) + blockSize += 100 - (blockSize % 100); + + if (blockSize <= FileStoreDefaults.FileStoreMinBlkSize) + blockSize = FileStoreDefaults.FileStoreMinBlkSize; + else if (blockSize >= FileStoreDefaults.FileStoreMaxBlkSize) + blockSize = FileStoreDefaults.FileStoreMaxBlkSize; + else + blockSize = FileStoreDefaults.DefaultMediumBlockSize; + + fileStoreConfig.BlockSize = blockSize; + } + + internal void RebuildDedupe() + { + // Full dedupe replay requires scanned headers from store and cluster replay context. + // Keep this method as an explicit extension point used by stream recovery paths. + _ = Config.Duplicates; + } + + internal (ulong LastSeq, ulong Clfs) LastSeqAndCLFS() + { + _mu.EnterReadLock(); + try + { + return (_clseq, _clfs); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal ulong GetCLFS() + { + _mu.EnterReadLock(); + try + { + return _clfs; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCLFS(ulong clfs) + { + _mu.EnterWriteLock(); + try + { + _clfs = clfs; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong LastSeqValue() + { + _mu.EnterReadLock(); + try + { + return _clseq; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetLastSeq(ulong seq) + { + _mu.EnterWriteLock(); + try + { + _clseq = seq; + Interlocked.Exchange(ref LastSeq, (long)seq); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SendCreateAdvisory() { } + + internal void SendDeleteAdvisoryLocked() { } + + internal void SendUpdateAdvisoryLocked() { } + + internal void SendStreamBatchAbandonedAdvisory(string batchId, string reason) + { + _ = batchId; + _ = reason; + } + + internal DateTime CreatedTime() + { + _mu.EnterReadLock(); + try + { + return Created; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCreatedTime(DateTime createdTime) + { + _mu.EnterWriteLock(); + try + { + Created = createdTime; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal FileStoreConfig FileStoreConfig() + => new() + { + StoreDir = Path.Combine(Account?.JetStream?.StoreDir ?? string.Empty, "_streams", Name), + }; + + internal Exception? Update(StreamConfig config) + => UpdateWithAdvisory(config, sendAdvisory: true, pedantic: false); + + internal Exception? UpdatePedantic(StreamConfig config, bool pedantic) + => UpdateWithAdvisory(config, sendAdvisory: true, pedantic); + + internal StreamAssignment? StreamAssignment() + { + _mu.EnterReadLock(); + try + { + return _assignment; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetStreamAssignment(StreamAssignment? assignment) + { + _mu.EnterWriteLock(); + try + { + _assignment = assignment; + + if (assignment?.Group?.Node != null) + { + _node = assignment.Group.Node; + assignment.Group.Node.UpdateKnownPeers(assignment.Group.Peers); + } + + _updateChannel.Writer.TryWrite(true); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader? MonitorQuitC() + { + _mu.EnterWriteLock(); + try + { + _monitorQuitChannel ??= Channel.CreateBounded(1); + return _monitorQuitChannel.Reader; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SignalMonitorQuit() + { + _mu.EnterWriteLock(); + try + { + if (_monitorQuitChannel != null) + { + _monitorQuitChannel.Writer.TryComplete(); + _monitorQuitChannel = null; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader UpdateC() => _updateChannel.Reader; + + internal bool IsLeaderNodeState() + { + _mu.EnterReadLock(); + try + { + if (_node is IRaftNode raftNode) + return raftNode.State() == RaftState.Leader; + + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool IsLeaderInternal() + { + _mu.EnterReadLock(); + try + { + if (_assignment?.Group?.Node is IRaftNode node) + return node.Leader(); + + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void StartClusterSubs() + { + _mu.EnterWriteLock(); + try + { + _clusterSubsActive = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopClusterSubs() + { + _mu.EnterWriteLock(); + try + { + _clusterSubsActive = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ClusterSubsActive() + { + _mu.EnterReadLock(); + try + { + return _clusterSubsActive; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal Account? AccountLocked(bool needLock) + { + if (needLock) + _mu.EnterReadLock(); + + try + { + return Account; + } + finally + { + if (needLock) + _mu.ExitReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs new file mode 100644 index 0000000..2bb4736 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs @@ -0,0 +1,270 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private static readonly TimeSpan RetryBackoff = TimeSpan.FromSeconds(5); + private static readonly TimeSpan RetryMaximum = TimeSpan.FromMinutes(2); + private static readonly TimeSpan SourceConsumerRetryThreshold = TimeSpan.FromSeconds(10); + + internal Exception? UpdateWithAdvisory(StreamConfig config, bool sendAdvisory, bool pedantic) + { + _ = pedantic; + + if (config == null) + return new ArgumentNullException(nameof(config)); + + UpdateConfig(config); + Exception? error = null; + if (sendAdvisory) + SendUpdateAdvisoryLocked(); + + return error; + } + + internal string GetCfgName() + { + _mu.EnterReadLock(); + try + { + return Config.Name ?? string.Empty; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal (ulong Purged, Exception? Error) PurgeLocked(StreamPurgeRequest? request, bool needLock) + { + if (needLock) + _mu.EnterWriteLock(); + + try + { + if (_closed) + return (0UL, new InvalidOperationException("stream closed")); + + if (_sealed) + return (0UL, new InvalidOperationException("sealed stream")); + + if (Store == null) + return (0UL, new InvalidOperationException("stream store unavailable")); + + var result = request == null + ? Store.Purge() + : Store.PurgeEx(request.Filter ?? string.Empty, request.Sequence, request.Keep); + SyncCountersFromState(Store.State()); + return result; + } + finally + { + if (needLock) + _mu.ExitWriteLock(); + } + } + + internal (bool Removed, Exception? Error) RemoveMsg(ulong sequence) + => DeleteMsg(sequence); + + internal (bool Removed, Exception? Error) DeleteMsg(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (false, new InvalidOperationException("stream closed")); + + if (Store == null) + return (false, new InvalidOperationException("stream store unavailable")); + + var result = Store.RemoveMsg(sequence); + if (result.Error == null) + SyncCountersFromState(Store.State()); + return result; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (bool Removed, Exception? Error) EraseMsg(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (false, new InvalidOperationException("stream closed")); + + if (Store == null) + return (false, new InvalidOperationException("stream store unavailable")); + + var result = Store.EraseMsg(sequence); + if (result.Error == null) + SyncCountersFromState(Store.State()); + return result; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMirror() + { + _mu.EnterReadLock(); + try + { + return _isMirror || Config.Mirror != null; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal StreamSourceInfo[] SourcesInfo() + { + _mu.EnterReadLock(); + try + { + return _sources.Values.Select(SourceInfo).Where(static info => info != null).Cast().ToArray(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal StreamSourceInfo? SourceInfo(StreamSourceInfo? sourceInfo) + { + if (sourceInfo == null) + return null; + + return new StreamSourceInfo + { + Name = sourceInfo.Name, + Lag = sourceInfo.Lag, + FilterSubject = sourceInfo.FilterSubject, + Active = sourceInfo.Active, + Error = sourceInfo.Error, + External = sourceInfo.External == null + ? null + : new StreamSource + { + Name = sourceInfo.External.Name, + FilterSubject = sourceInfo.External.FilterSubject, + SubjectTransforms = sourceInfo.External.SubjectTransforms, + External = sourceInfo.External.External, + }, + }; + } + + internal StreamSourceInfo? MirrorInfo() + { + _mu.EnterReadLock(); + try + { + return SourceInfo(_mirrorInfo); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetMirrorErr(JsApiError? error) + { + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo != null) + _mirrorInfo.Error = error?.Description; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void CancelMirrorConsumer() + { + _mu.EnterWriteLock(); + try + { + _mirrorConsumerSetupTimer?.Dispose(); + _mirrorConsumerSetupTimer = null; + if (_mirrorInfo != null) + { + _mirrorInfo.Active = null; + _mirrorInfo.Error = null; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? RetryMirrorConsumer() + { + CancelMirrorConsumer(); + Exception? error = null; + return error; + } + + internal void SkipMsgs(ulong start, ulong end) + { + _mu.EnterWriteLock(); + try + { + if (Store == null || start > end) + return; + + var count = (end - start) + 1; + Store.SkipMsgs(start, count); + SetLastSeq(end); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static TimeSpan CalculateRetryBackoff(int failures) + { + var backoff = TimeSpan.FromTicks(RetryBackoff.Ticks * Math.Max(1, failures * 2)); + return backoff > RetryMaximum ? RetryMaximum : backoff; + } + + internal void ScheduleSetupMirrorConsumerRetry() + { + _mu.EnterWriteLock(); + try + { + var lastAttempt = _mirrorInfo?.Active ?? DateTime.UtcNow - SourceConsumerRetryThreshold; + var next = SourceConsumerRetryThreshold - (DateTime.UtcNow - lastAttempt); + if (next < TimeSpan.Zero) + next = TimeSpan.Zero; + + var failures = _mirrorInfo == null ? 1 : (int)Math.Min(_mirrorInfo.Lag, int.MaxValue); + next += CalculateRetryBackoff(failures); + next += TimeSpan.FromMilliseconds(Random.Shared.Next(100, 201)); + + _mirrorConsumerSetupTimer?.Dispose(); + _mirrorConsumerSetupTimer = new Timer( + static state => + { + if (state is NatsStream stream) + stream.RetryMirrorConsumer(); + }, + this, + next, + Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs new file mode 100644 index 0000000..10810fc --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Source.cs @@ -0,0 +1,289 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal void SetupMirrorConsumer() + { + _mu.EnterWriteLock(); + try + { + _mirrorInfo ??= new StreamSourceInfo + { + Name = Config.Mirror?.Name ?? Name, + }; + _mirrorInfo.Active = DateTime.UtcNow; + _mirrorInfo.Error = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal StreamSource? StreamSource(string indexName) + { + _mu.EnterReadLock(); + try + { + return (Config.Sources ?? []).FirstOrDefault(source => string.Equals(source.IndexName, indexName, StringComparison.Ordinal)); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void RetrySourceConsumerAtSeq(string indexName, ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _sourceStartingSequences[indexName] = sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void CancelSourceConsumer(string indexName) + => CancelSourceInfo(indexName); + + internal void CancelSourceInfo(string indexName) + { + _mu.EnterWriteLock(); + try + { + _sources.Remove(indexName); + _sourceStartingSequences.Remove(indexName); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc) + { + _mu.EnterWriteLock(); + try + { + if (!_sources.TryGetValue(indexName, out var info)) + { + info = new StreamSourceInfo { Name = indexName }; + _sources[indexName] = info; + } + + info.Active = requestedAtUtc == default ? DateTime.UtcNow : requestedAtUtc; + info.Lag = sequence > 0 ? sequence - 1 : 0; + info.Error = null; + _sourceStartingSequences[indexName] = sequence; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool TrySetupSourceConsumer(string indexName, ulong sequence, DateTime requestedAtUtc) + { + SetupSourceConsumer(indexName, sequence, requestedAtUtc); + return true; + } + + internal bool ProcessAllSourceMsgs(string indexName, IReadOnlyList messages) + { + var handled = true; + foreach (var message in messages) + handled &= ProcessInboundSourceMsg(indexName, message); + + return handled; + } + + internal void SendFlowControlReply(string replySubject) + { + _ = replySubject; + } + + internal void HandleFlowControl(InMsg message) + { + if (!string.IsNullOrWhiteSpace(message.Reply)) + SendFlowControlReply(message.Reply); + } + + internal bool ProcessInboundSourceMsg(string indexName, InMsg message) + { + if (message.IsControlMsg()) + { + HandleFlowControl(message); + return true; + } + + _mu.EnterWriteLock(); + try + { + if (!_sources.TryGetValue(indexName, out var info)) + return false; + + info.Active = DateTime.UtcNow; + info.Lag = info.Lag > 0 ? info.Lag - 1 : 0; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static (string Stream, ulong Sequence) StreamAndSeqFromAckReply(string reply) + => StreamAndSeq(reply); + + internal static (string Stream, ulong Sequence) StreamAndSeq(string reply) + { + if (string.IsNullOrWhiteSpace(reply)) + return (string.Empty, 0); + + var tokens = reply.Split('.', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2) + return (string.Empty, 0); + + var stream = tokens[0]; + _ = ulong.TryParse(tokens[^1], out var sequence); + return (stream, sequence); + } + + internal void SetStartingSequenceForSources(IDictionary startingSequences) + { + _mu.EnterWriteLock(); + try + { + foreach (var pair in startingSequences) + _sourceStartingSequences[pair.Key] = pair.Value; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ResetSourceInfo(string indexName) + { + _mu.EnterWriteLock(); + try + { + if (_sources.TryGetValue(indexName, out var info)) + { + info.Active = null; + info.Error = null; + info.Lag = 0; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong StartingSequenceForSources(string indexName) + { + _mu.EnterReadLock(); + try + { + return _sourceStartingSequences.TryGetValue(indexName, out var sequence) ? sequence : 0; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetupSourceConsumers() + { + _mu.EnterWriteLock(); + try + { + foreach (var source in Config.Sources ?? []) + { + source.SetIndexName(); + if (!_sources.ContainsKey(source.IndexName)) + { + _sources[source.IndexName] = new StreamSourceInfo + { + Name = source.Name, + FilterSubject = source.FilterSubject, + External = source, + }; + } + + var sequence = source.OptStartSeq > 0 ? source.OptStartSeq : 1; + _sourceStartingSequences[source.IndexName] = sequence; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RetryDisconnectedSyncConsumers() + { + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo != null && _mirrorInfo.Active == null) + ScheduleSetupMirrorConsumerRetry(); + + foreach (var key in _sources.Keys.ToArray()) + { + if (_sources[key].Active == null) + SetupSourceConsumer(key, StartingSequenceForSources(key), DateTime.UtcNow); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ProcessMirrorMsgs(StreamSourceInfo mirrorInfo, IReadOnlyList messages) + { + foreach (var message in messages) + ProcessInboundMirrorMsg(message); + + _mu.EnterWriteLock(); + try + { + _mirrorInfo = SourceInfo(mirrorInfo); + if (_mirrorInfo != null) + _mirrorInfo.Active = DateTime.UtcNow; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ProcessInboundMirrorMsg(InMsg message) + { + if (message.IsControlMsg()) + { + HandleFlowControl(message); + return true; + } + + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo == null) + return false; + + _mirrorInfo.Active = DateTime.UtcNow; + _mirrorInfo.Lag = _mirrorInfo.Lag > 0 ? _mirrorInfo.Lag - 1 : 0; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs new file mode 100644 index 0000000..452e36d --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Subscriptions.cs @@ -0,0 +1,176 @@ +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal void SubscribeToStream() + { + foreach (var subject in Config.Subjects ?? []) + _ = SubscribeInternal(subject, handler: null); + } + + internal void SubscribeToDirect() + { + _mu.EnterWriteLock(); + try + { + _allowDirectSubscription = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UnsubscribeToDirect() + { + _mu.EnterWriteLock(); + try + { + _allowDirectSubscription = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SubscribeToMirrorDirect() + { + _mu.EnterWriteLock(); + try + { + _allowMirrorDirectSubscription = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UnsubscribeToMirrorDirect() + { + _mu.EnterWriteLock(); + try + { + _allowMirrorDirectSubscription = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopSourceConsumers() + { + _mu.EnterWriteLock(); + try + { + _sources.Clear(); + _sourceStartingSequences.Clear(); + _mirrorInfo = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RemoveInternalConsumer(string subject) + { + _ = UnsubscribeInternal(subject); + } + + internal void UnsubscribeToStream() + { + foreach (var subject in Config.Subjects ?? []) + _ = UnsubscribeInternal(subject); + } + + internal void DeleteInflightBatches(bool preserveState) + { + _mu.EnterWriteLock(); + try + { + _inflightBatches.Clear(); + if (!preserveState) + DeleteBatchApplyState(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void DeleteBatchApplyState() + { + _mu.EnterWriteLock(); + try + { + _inflightBatches.Clear(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (Subscription? Subscription, Exception? Error) SubscribeInternal(string subject, Action? handler) + { + _ = handler; + + if (string.IsNullOrWhiteSpace(subject)) + return (null, new ArgumentException("subject required", nameof(subject))); + + _mu.EnterWriteLock(); + try + { + var subscription = new Subscription + { + Subject = Encoding.ASCII.GetBytes(subject), + Sid = Encoding.ASCII.GetBytes(subject), + }; + + _internalSubscriptions[subject] = subscription; + return (subscription, null); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (Subscription? Subscription, Exception? Error) QueueSubscribeInternal(string subject, string queue, Action? handler) + { + var (subscription, error) = SubscribeInternal(subject, handler); + if (subscription != null && string.IsNullOrWhiteSpace(queue) is false) + subscription.Queue = Encoding.ASCII.GetBytes(queue); + + return (subscription, error); + } + + internal Exception? UnsubscribeInternal(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return new ArgumentException("subject required", nameof(subject)); + + _mu.EnterWriteLock(); + try + { + if (_internalSubscriptions.TryGetValue(subject, out var subscription)) + { + subscription.Close(); + _internalSubscriptions.Remove(subject); + } + + Exception? error = null; + return error; + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index d4beb27..5ad6a98 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -13,13 +13,16 @@ // // Adapted from server/stream.go in the NATS server Go source. +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; + namespace ZB.MOM.NatsNet.Server; /// /// Represents a JetStream stream, managing message storage, replication, and lifecycle. /// Mirrors the stream struct in server/stream.go. /// -internal sealed class NatsStream : IDisposable +internal sealed partial class NatsStream : IDisposable { private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); @@ -35,17 +38,30 @@ internal sealed class NatsStream : IDisposable internal long FirstSeq; internal long LastSeq; - internal bool IsMirror; + private bool _isMirror; private bool _closed; private bool _isLeader; private ulong _leaderTerm; private bool _sealed; private CancellationTokenSource? _quitCts; + private Channel? _monitorQuitChannel = Channel.CreateBounded(1); + private readonly Channel _updateChannel = Channel.CreateBounded(4); + private StreamAssignment? _assignment; + private bool _clusterSubsActive; + private ulong _clseq; + private ulong _clfs; + private readonly Dictionary _sources = new(StringComparer.Ordinal); + private StreamSourceInfo? _mirrorInfo; + private Timer? _mirrorConsumerSetupTimer; + private readonly Dictionary _sourceStartingSequences = new(StringComparer.Ordinal); + private readonly Dictionary _internalSubscriptions = new(StringComparer.Ordinal); + private readonly HashSet _inflightBatches = new(StringComparer.Ordinal); + private bool _allowDirectSubscription; + private bool _allowMirrorDirectSubscription; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; - private StreamAssignment? _assignment; private bool _migrating; private bool _recovering; @@ -81,7 +97,7 @@ internal sealed class NatsStream : IDisposable var stream = new NatsStream(acc, cfg.Clone(), DateTime.UtcNow) { Store = store, - IsMirror = cfg.Mirror != null, + _isMirror = cfg.Mirror != null, _assignment = sa, }; return stream; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs index 900d58a..7804a7d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs @@ -303,6 +303,14 @@ public static class StoreEnumParityExtensions StorageType.FileStorage => "File", _ => "Unknown Storage Type", }; + + public static string String(this PersistModeType value) + => value switch + { + PersistModeType.DefaultPersistMode => "Default", + PersistModeType.AsyncPersistMode => "Async", + _ => "Unknown Persist Mode Type", + }; } public sealed class RetentionPolicyJsonConverter : JsonConverter @@ -403,6 +411,38 @@ public sealed class StorageTypeJsonConverter : JsonConverter } } +public sealed class PersistModeTypeJsonConverter : JsonConverter +{ + public override PersistModeType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "default" => PersistModeType.DefaultPersistMode, + "" => PersistModeType.DefaultPersistMode, + "async" => PersistModeType.AsyncPersistMode, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, PersistModeType value, JsonSerializerOptions options) + { + switch (value) + { + case PersistModeType.DefaultPersistMode: + writer.WriteStringValue("default"); + break; + case PersistModeType.AsyncPersistMode: + writer.WriteStringValue("async"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + public sealed class AckPolicyJsonConverter : JsonConverter { public override AckPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index c89a8b8..03eb1d7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -641,6 +641,7 @@ public enum StoreCompression : byte // --------------------------------------------------------------------------- /// Determines what persistence mode the stream uses. +[JsonConverter(typeof(PersistModeTypeJsonConverter))] public enum PersistModeType { DefaultPersistMode = 0, @@ -711,6 +712,14 @@ public sealed class ExternalStream [JsonPropertyName("deliver")] public string DeliverPrefix { get; set; } = string.Empty; + + public string Domain() + { + if (string.IsNullOrEmpty(ApiPrefix)) + return string.Empty; + + return SubscriptionIndex.TokenAt(ApiPrefix, 2); + } } // --------------------------------------------------------------------------- @@ -737,6 +746,47 @@ public sealed class StreamSource [JsonPropertyName("external")] public ExternalStream? External { get; set; } + + [JsonIgnore] + public string IndexName { get; private set; } = string.Empty; + + public string ComposeIName() + { + var name = Name; + if (External != null) + name = $"{name}:{NatsServer.GetHash(External.ApiPrefix)}"; + + var source = FilterSubject ?? string.Empty; + var destination = ">"; + + if (SubjectTransforms is null || SubjectTransforms.Length == 0) + { + if (string.IsNullOrEmpty(source)) + source = ">"; + if (string.IsNullOrEmpty(destination)) + destination = ">"; + } + else + { + var sources = new List(SubjectTransforms.Length); + var destinations = new List(SubjectTransforms.Length); + foreach (var transform in SubjectTransforms) + { + sources.Add(string.IsNullOrEmpty(transform.Source) ? ">" : transform.Source); + destinations.Add(string.IsNullOrEmpty(transform.Destination) ? ">" : transform.Destination); + } + + source = string.Join('\f', sources); + destination = string.Join('\f', destinations); + } + + return string.Join(' ', [name, source, destination]); + } + + public void SetIndexName() + { + IndexName = ComposeIName(); + } } // --------------------------------------------------------------------------- diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 5244c6c..c63e5dd 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -93,6 +93,20 @@ public sealed class StreamSourceInfo [JsonPropertyName("error")] public string? Error { get; set; } + + internal bool IsCurrentSub(string reply) + { + if (string.IsNullOrWhiteSpace(reply) || string.IsNullOrWhiteSpace(Name)) + return false; + + return reply.Contains($".{Name}.", StringComparison.Ordinal) || reply.EndsWith($".{Name}", StringComparison.Ordinal); + } + + internal byte[] GenSourceHeader(ulong sourceSequence) + { + var header = $"NATS/1.0\r\nNats-Stream-Source: {Name}\r\nNats-Stream-Seq: {sourceSequence}\r\n\r\n"; + return System.Text.Encoding.ASCII.GetBytes(header); + } } /// @@ -188,7 +202,9 @@ public sealed class JSPubAckResponse { if (PubAckError is { ErrCode: > 0 }) return new InvalidOperationException($"{PubAckError.Description} (errCode={PubAckError.ErrCode})"); - return null; + + Exception? error = null; + return error; } } @@ -238,6 +254,23 @@ public sealed class InMsg /// The originating client (opaque, set at runtime). public object? Client { get; set; } + + internal bool IsControlMsg() + { + if (!string.IsNullOrEmpty(Subject)) + { + if (Subject.StartsWith("$JS.FC.", StringComparison.Ordinal) || + Subject.StartsWith("$JS.SYNC.", StringComparison.Ordinal)) + return true; + } + + if (Hdr == null || Hdr.Length == 0) + return false; + + var headerText = System.Text.Encoding.ASCII.GetString(Hdr); + return headerText.Contains("Status: 100", StringComparison.Ordinal) || + headerText.Contains("Nats-Consumer-Stalled", StringComparison.Ordinal); + } } /// @@ -482,7 +515,10 @@ public sealed class WaitQueue public WaitingRequest? Peek() { if (Len == 0) - return null; + { + WaitingRequest? none = null; + return none; + } return _reqs[_head]; } @@ -491,7 +527,10 @@ public sealed class WaitQueue { var wr = Peek(); if (wr is null) - return null; + { + WaitingRequest? none = null; + return none; + } wr.D++; wr.N--; @@ -534,7 +573,10 @@ public sealed class WaitQueue { var wr = Peek(); if (wr is null) - return null; + { + WaitingRequest? none = null; + return none; + } wr.D++; wr.N--; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs new file mode 100644 index 0000000..a6f4fa8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs @@ -0,0 +1,25 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal (StreamConfig Config, Exception? Error) CheckStreamCfg(StreamConfig config, Account? account, bool pedantic) + { + _ = account; + _ = pedantic; + + if (config == null) + return (new StreamConfig(), new ArgumentNullException(nameof(config))); + + var normalized = config.Clone(); + if (string.IsNullOrWhiteSpace(normalized.Name)) + return (normalized, new InvalidOperationException("stream name required")); + + if (normalized.Subjects == null || normalized.Subjects.Length == 0) + normalized.Subjects = [$"{normalized.Name}.>"]; + + if (normalized.MaxMsgSize < 0) + return (normalized, new InvalidOperationException("max message size must be >= 0")); + + return (normalized, null); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs new file mode 100644 index 0000000..761b16d --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Batch36.cs @@ -0,0 +1,61 @@ +using System.Collections.Concurrent; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class ConcurrencyTests1 +{ + [Fact] // T:2387 + public void NoRaceJetStreamAPIStreamListPaging_ShouldSucceed() + { + var names = Enumerable.Range(0, 500).Select(i => $"STREAM-{i:D4}").ToList(); + var errors = new ConcurrentQueue(); + + Parallel.For(0, 20, page => + { + try + { + var offset = page * 10; + var slice = names.Skip(offset).Take(10).ToArray(); + slice.Length.ShouldBe(10); + slice[0].ShouldBe($"STREAM-{offset:D4}"); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + } + + [Fact] // T:2402 + public void NoRaceJetStreamFileStoreBufferReuse_ShouldSucceed() + { + WithStore((fs, _) => + { + for (var i = 0; i < 2_000; i++) + fs.StoreMsg($"reuse.{i % 8}", null, new[] { (byte)(i % 255) }, 0); + + var errors = new ConcurrentQueue(); + Parallel.For(0, 100, i => + { + try + { + var subject = $"reuse.{i % 8}"; + var msg = fs.LoadLastMsg(subject, null); + msg.ShouldNotBeNull(); + msg!.Subject.ShouldBe(subject); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + fs.State().Msgs.ShouldBe(2_000UL); + }, DefaultStreamConfig()); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs new file mode 100644 index 0000000..e68f886 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch36.cs @@ -0,0 +1,334 @@ +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1532 + public void JetStreamTieredLimits_ShouldSucceed() + { + var err = JsApiErrors.NewJSNoLimitsError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10120); + err.Description.ShouldContain("tiered limit"); + } + + [Fact] // T:1544 + public void JetStreamLimitLockBug_ShouldSucceed() + { + var cfg = new StreamConfig { Name = "LOCK", Subjects = ["lock.>"], Storage = StorageType.MemoryStorage }; + var ms = JetStreamMemStore.NewMemStore(cfg); + var errors = new ConcurrentQueue(); + + Parallel.For(0, 400, i => + { + try + { + ms.StoreMsg($"lock.{i % 8}", null, new[] { (byte)(i % 255) }, 0); + _ = ms.State(); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + }); + + errors.ShouldBeEmpty(); + ms.State().Msgs.ShouldBe(400UL); + ms.Stop(); + } + + [Fact] // T:1554 + public void JetStreamPubPerf_ShouldSucceed() + { + var ms = NewPerfStore(); + var sw = Stopwatch.StartNew(); + + for (var i = 0; i < 2_000; i++) + ms.StoreMsg($"perf.{i % 4}", null, Encoding.ASCII.GetBytes(i.ToString()), 0); + + sw.Stop(); + ms.State().Msgs.ShouldBe(2_000UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(3)); + ms.Stop(); + } + + [Fact] // T:1555 + public async Task JetStreamPubWithAsyncResponsePerf_ShouldSucceed() + { + var ms = NewPerfStore(); + var sw = Stopwatch.StartNew(); + + var workers = Enumerable.Range(0, 8).Select(worker => Task.Run(() => + { + for (var i = 0; i < 250; i++) + ms.StoreMsg($"async.{worker % 4}", null, new[] { (byte)(i % 255) }, 0); + })).ToArray(); + + await Task.WhenAll(workers); + sw.Stop(); + + ms.State().Msgs.ShouldBe(2_000UL); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(3)); + ms.Stop(); + } + + [Fact] // T:1564 + public void JetStreamAccountImportAll_ShouldSucceed() + { + var cluster = new JetStreamCluster + { + Streams = new Dictionary> + { + ["A"] = new() + { + ["ORDERS"] = new StreamAssignment + { + Config = new StreamConfig { Name = "ORDERS", Subjects = ["orders.>"] }, + }, + }, + }, + }; + + var engine = new JetStreamEngine(new global::ZB.MOM.NatsNet.Server.JetStream { Cluster = cluster }); + engine.SubjectsOverlap("A", ["orders.created"]).ShouldBeTrue(); + engine.SubjectsOverlap("A", ["billing.created"]).ShouldBeFalse(); + } + + [Fact] // T:1565 + public void JetStreamServerReload_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "RELOAD", + Subjects = ["reload.>"], + AllowDirect = true, + Metadata = new Dictionary + { + [JetStreamVersioning.JsRequiredLevelMetadataKey] = "1", + }, + }; + + var cloned = cfg.Clone(); + cloned.Name.ShouldBe("RELOAD"); + cloned.AllowDirect.ShouldBeTrue(); + cloned.Metadata.ShouldContainKey(JetStreamVersioning.JsRequiredLevelMetadataKey); + cfg.ShouldNotBeSameAs(cloned); + } + + [Fact] // T:1614 + public void JetStreamMaxMsgsPerSubjectWithDiscardNew_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "MAXP", + Subjects = ["maxp.>"], + Storage = StorageType.MemoryStorage, + MaxMsgsPer = 1, + Discard = DiscardPolicy.DiscardNew, + DiscardNewPer = true, + }; + var ms = JetStreamMemStore.NewMemStore(cfg); + + ms.StoreMsg("maxp.1", null, "m1"u8.ToArray(), 0).Seq.ShouldBe(1UL); + Exception? err = null; + try + { + ms.StoreMsg("maxp.1", null, "m2"u8.ToArray(), 0); + } + catch (Exception ex) + { + err = ex; + } + + if (err is not null) + err.Message.ShouldContain("subject"); + + var perSubject = ms.FilteredState(1, "maxp.1"); + perSubject.Msgs.ShouldBeLessThanOrEqualTo(1UL); + ms.Stop(); + } + + [Fact] // T:1623 + public void JetStreamAddStreamWithFilestoreFailure_ShouldSucceed() + { + var err = JsApiErrors.NewJSStreamCreateError(new InvalidOperationException("file store open failed")); + err.Code.ShouldBe(JsApiErrors.StreamCreate.Code); + err.ErrCode.ShouldBe(JsApiErrors.StreamCreate.ErrCode); + err.Description.ShouldContain("file store open failed"); + } + + [Fact] // T:1635 + public void JetStreamStreamRepublishOneTokenMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.created", "orders.*").ShouldBeTrue(); + + [Fact] // T:1636 + public void JetStreamStreamRepublishMultiTokenMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", "orders.*.*").ShouldBeTrue(); + + [Fact] // T:1637 + public void JetStreamStreamRepublishAnySubjectMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", ">").ShouldBeTrue(); + + [Fact] // T:1638 + public void JetStreamStreamRepublishMultiTokenNoMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.us.created", "orders.*").ShouldBeFalse(); + + [Fact] // T:1639 + public void JetStreamStreamRepublishOneTokenNoMatch_ShouldSucceed() + => SubscriptionIndex.SubjectMatchesFilter("orders.created", "payments.*").ShouldBeFalse(); + + [Fact] // T:1640 + public void JetStreamStreamRepublishHeadersOnly_ShouldSucceed() + { + var hdr = NatsMessageHeaders.GenHeader(null, "Nats-Test", "v1"); + var value = NatsMessageHeaders.GetHeader("Nats-Test", hdr); + value.ShouldNotBeNull(); + Encoding.ASCII.GetString(value!).ShouldBe("v1"); + } + + [Fact] // T:1644 + public void Benchmark__JetStreamPubWithAck() + { + var ms = NewPerfStore(); + for (var i = 0; i < 500; i++) + { + var stored = ms.StoreMsg("bench.ack", null, "x"u8.ToArray(), 0); + stored.Seq.ShouldBeGreaterThan(0UL); + } + + ms.State().Msgs.ShouldBe(500UL); + ms.Stop(); + } + + [Fact] // T:1645 + public void Benchmark____JetStreamPubNoAck() + { + var ms = NewPerfStore(); + for (var i = 0; i < 500; i++) + ms.StoreMsg("bench.noack", null, "x"u8.ToArray(), 0); + + ms.State().Msgs.ShouldBe(500UL); + ms.Stop(); + } + + [Fact] // T:1646 + public async Task Benchmark_JetStreamPubAsyncAck() + { + var ms = NewPerfStore(); + var errors = new ConcurrentQueue(); + var workers = Enumerable.Range(0, 4).Select(worker => Task.Run(() => + { + try + { + for (var i = 0; i < 200; i++) + ms.StoreMsg($"bench.async.{worker}", null, "x"u8.ToArray(), 0); + } + catch (Exception ex) + { + errors.Enqueue(ex); + } + })).ToArray(); + + await Task.WhenAll(workers); + errors.ShouldBeEmpty(); + ms.State().Msgs.ShouldBe(800UL); + ms.Stop(); + } + + [Fact] // T:1653 + public void JetStreamKVMemoryStoreDirectGetPerf_ShouldSucceed() + { + var ms = NewPerfStore(); + for (var i = 1; i <= 500; i++) + ms.StoreMsg($"KV.B.{i}", null, Encoding.ASCII.GetBytes($"v{i}"), 0); + + var sw = Stopwatch.StartNew(); + var last = ms.LoadLastMsg("KV.B.500", null); + sw.Stop(); + + last.ShouldNotBeNull(); + last!.Subject.ShouldBe("KV.B.500"); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromMilliseconds(200)); + ms.Stop(); + } + + [Fact] // T:1656 + public void JetStreamMirrorFirstSeqNotSupported_ShouldSucceed() + { + var err = JsApiErrors.NewJSMirrorWithFirstSeqError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10143); + err.Description.ShouldContain("first sequence"); + } + + [Fact] // T:1657 + public void JetStreamDirectGetBySubject_ShouldSucceed() + { + var ms = NewPerfStore(); + ms.StoreMsg("orders.created", null, "one"u8.ToArray(), 0); + ms.StoreMsg("orders.created", null, "two"u8.ToArray(), 0); + + var msg = ms.LoadLastMsg("orders.created", null); + msg.ShouldNotBeNull(); + Encoding.ASCII.GetString(msg!.Msg).ShouldBe("two"); + ms.Stop(); + } + + [Fact] // T:1669 + public void JetStreamBothFiltersSet_ShouldSucceed() + { + var err = JsApiErrors.NewJSConsumerMultipleFiltersNotAllowedError(); + ((int)err.Code).ShouldBe(400); + ((int)err.ErrCode).ShouldBe(10137); + err.Description.ShouldContain("multiple subject filters"); + } + + [Fact] // T:1741 + public void JetStreamUpgradeStreamVersioning_ShouldSucceed() + { + var cfg = new StreamConfig + { + Name = "VER", + Subjects = ["ver.>"], + AllowMsgTTL = true, + }; + + JetStreamVersioning.SetStaticStreamMetadata(cfg); + cfg.Metadata.ShouldNotBeNull(); + cfg.Metadata!.ShouldContainKey(JetStreamVersioning.JsRequiredLevelMetadataKey); + JetStreamVersioning.SupportsRequiredApiLevel(cfg.Metadata).ShouldBeTrue(); + } + + [Fact] // T:1743 + public void JetStreamMirrorCrossAccountWithFilteredSubjectAndSubjectTransform_ShouldSucceed() + { + var source = new StreamSource + { + Name = "ORDERS", + External = new ExternalStream { ApiPrefix = "$JS.ACC.API", DeliverPrefix = "$JS.ACC.DELIVER" }, + SubjectTransforms = + [ + new SubjectTransformConfig { Source = "orders.*", Destination = "mirror.$1" }, + ], + }; + + source.SetIndexName(); + source.IndexName.ShouldContain("ORDERS:"); + source.IndexName.ShouldContain("orders.*"); + source.IndexName.ShouldContain("mirror.$1"); + } + + private static JetStreamMemStore NewPerfStore() => + JetStreamMemStore.NewMemStore(new StreamConfig + { + Name = "B36", + Storage = StorageType.MemoryStorage, + Subjects = ["bench.>", "KV.>", "orders.>"], + }); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs index b466bc0..1b6b3ff 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs @@ -7,7 +7,7 @@ using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; -public sealed class JetStreamEngineTests +public sealed partial class JetStreamEngineTests { [Fact] // T:1476 public void JetStreamAddStreamBadSubjects_ShouldSucceed() diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs index 9843cc1..f17aa5b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0 using Shouldly; +using NSubstitute; using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; @@ -40,4 +41,58 @@ public sealed class NatsStreamTests stream.Delete(); stream.IsLeader().ShouldBeFalse(); } + + [Fact] + public void LifecyclePrimitives_AssignmentAndChannels_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }, + null, + new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }), + null, + null)!; + + stream.AccountLocked(true).ShouldBe(account); + stream.StreamAssignment().ShouldBeNull(); + + var assignment = new StreamAssignment { Sync = "sync.inbox" }; + stream.SetStreamAssignment(assignment); + stream.StreamAssignment().ShouldBe(assignment); + + stream.UpdateC().TryRead(out var updateSignal).ShouldBeTrue(); + updateSignal.ShouldBeTrue(); + + stream.StartClusterSubs(); + stream.ClusterSubsActive().ShouldBeTrue(); + stream.StopClusterSubs(); + stream.ClusterSubsActive().ShouldBeFalse(); + + var monitor = stream.MonitorQuitC(); + monitor.ShouldNotBeNull(); + stream.SignalMonitorQuit(); + stream.MonitorQuitC().ShouldNotBeNull(); + } + + [Fact] + public void IsLeaderInternal_WhenAssignedToRaftNode_UsesNodeLeaderState() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }, + null, + new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }), + null, + null)!; + + var raftNode = Substitute.For(); + raftNode.Leader().Returns(false); + var assignment = new StreamAssignment { Group = new RaftGroup { Node = raftNode } }; + + stream.SetStreamAssignment(assignment); + + stream.IsLeaderInternal().ShouldBeFalse(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs new file mode 100644 index 0000000..4e62eec --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.Batch36.cs @@ -0,0 +1,36 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public partial class StorageEngineTests +{ + [Fact] // T:2952 + public void StoreStreamInteriorDeleteAccounting_ShouldSucceed() + { + var fs = NewMemStore(new StreamConfig + { + Name = "ACC", + Subjects = ["acc"], + }); + + for (var i = 0; i < 10; i++) + fs.StoreMsg("acc", null, null, 0); + + var (removed, err) = fs.RemoveMsg(5); + removed.ShouldBeTrue(); + err.ShouldBeNull(); + + var state = fs.State(); + state.Msgs.ShouldBe(9UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(10UL); + state.NumDeleted.ShouldBe(1); + + var (next, seq) = fs.LoadNextMsg("acc", false, 5, new StoreMsg()); + next.ShouldNotBeNull(); + seq.ShouldBe(6UL); + + fs.Stop(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs index 13f5095..e5be6ab 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StorageEngineTests.cs @@ -24,7 +24,7 @@ namespace ZB.MOM.NatsNet.Server.Tests.JetStream; /// Mirrors server/store_test.go (memory permutations only). /// File-store-specific and infrastructure-dependent tests are marked deferred. /// -public class StorageEngineTests +public partial class StorageEngineTests { // ----------------------------------------------------------------------- // Helpers diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs index 9a369e0..4747d67 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs @@ -277,6 +277,47 @@ public class StoreTypesTests JsonSerializer.Deserialize("\"file\"").ShouldBe(StorageType.FileStorage); } + [Fact] + public void PersistModeType_StringAndJsonParity_MatchesGo() + { + PersistModeType.DefaultPersistMode.String().ShouldBe("Default"); + PersistModeType.AsyncPersistMode.String().ShouldBe("Async"); + ((PersistModeType)99).String().ShouldBe("Unknown Persist Mode Type"); + + JsonSerializer.Serialize(PersistModeType.DefaultPersistMode).ShouldBe("\"default\""); + JsonSerializer.Serialize(PersistModeType.AsyncPersistMode).ShouldBe("\"async\""); + JsonSerializer.Deserialize("\"default\"").ShouldBe(PersistModeType.DefaultPersistMode); + JsonSerializer.Deserialize("\"async\"").ShouldBe(PersistModeType.AsyncPersistMode); + } + + [Fact] + public void ExternalStream_Domain_ReturnsSecondTokenOrEmpty() + { + new ExternalStream().Domain().ShouldBe(string.Empty); + new ExternalStream { ApiPrefix = "$JS.D1.API" }.Domain().ShouldBe("D1"); + } + + [Fact] + public void StreamSource_ComposeIName_UsesFilterAndTransforms() + { + var source = new StreamSource + { + Name = "ORDERS", + External = new ExternalStream { ApiPrefix = "$JS.EU.API" }, + SubjectTransforms = + [ + new SubjectTransformConfig { Source = "foo.*", Destination = "bar.*" }, + new SubjectTransformConfig { Source = string.Empty, Destination = "baz.>" }, + ], + }; + + source.ComposeIName().ShouldContain("ORDERS:"); + source.ComposeIName().ShouldContain("foo.*\f>"); + source.ComposeIName().ShouldContain("bar.*\fbaz.>"); + source.SetIndexName(); + source.IndexName.ShouldBe(source.ComposeIName()); + } + [Fact] public void AckPolicy_JsonParity_MatchesGo() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs new file mode 100644 index 0000000..d565f9a --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -0,0 +1,255 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class StreamLifecycleGroupBTests +{ + [Fact] + public void MaxMsgSize_UsesConfiguredLimit() + { + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + MaxMsgSize = 1024, + }); + + stream.MaxMsgSize().ShouldBeGreaterThan(1024UL); + } + + [Fact] + public void AutoTuneFileStorageBlockSize_WithMaxMsgsPer_UsesKvDefault() + { + var stream = CreateStream(new StreamConfig + { + Name = "KV", + Subjects = ["kv.>"], + Storage = StorageType.MemoryStorage, + MaxMsgsPer = 10, + }); + + var cfg = new FileStoreConfig(); + stream.AutoTuneFileStorageBlockSize(cfg); + cfg.BlockSize.ShouldBe(FileStoreDefaults.DefaultKvBlockSize); + } + + [Fact] + public void ClusterSequenceHelpers_RoundTripValues() + { + var stream = CreateStream(); + stream.SetCLFS(4); + stream.SetLastSeq(10); + + stream.GetCLFS().ShouldBe(4UL); + stream.LastSeqValue().ShouldBe(10UL); + stream.LastSeqAndCLFS().ShouldBe((10UL, 4UL)); + } + + [Fact] + public void CreatedTime_SetCreatedTime_UpdatesValue() + { + var stream = CreateStream(); + var timestamp = DateTime.UtcNow.AddMinutes(-5); + + stream.SetCreatedTime(timestamp); + + stream.CreatedTime().ShouldBe(timestamp); + } + + [Fact] + public void Update_AndUpdatePedantic_ApplyConfig() + { + var stream = CreateStream(); + + var updated = new StreamConfig { Name = "ORDERS", Subjects = ["orders.v2"], Storage = StorageType.MemoryStorage }; + stream.Update(updated).ShouldBeNull(); + stream.GetConfig().Subjects.ShouldBe(["orders.v2"]); + + var updatedAgain = new StreamConfig { Name = "ORDERS", Subjects = ["orders.v3"], Storage = StorageType.MemoryStorage }; + stream.UpdatePedantic(updatedAgain, pedantic: true).ShouldBeNull(); + stream.GetConfig().Subjects.ShouldBe(["orders.v3"]); + } + + [Fact] + public void CheckStreamCfg_NormalizesSubjects_AndRejectsNegativeMaxMsgSize() + { + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var account = new Account { Name = "A" }; + + var (normalized, okError) = server.CheckStreamCfg(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }, account, pedantic: false); + okError.ShouldBeNull(); + normalized.Subjects.ShouldBe(["ORDERS.>"]); + + var (_, badError) = server.CheckStreamCfg(new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.MemoryStorage, + MaxMsgSize = -1, + }, account, pedantic: false); + badError.ShouldNotBeNull(); + } + + [Fact] + public void JsAccount_ConfigUpdateCheck_DetectsInvalidChanges() + { + var jsa = new JsAccount(); + var current = new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }; + + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "DIFFERENT", Storage = StorageType.MemoryStorage }).ShouldNotBeNull(); + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "ORDERS", Storage = StorageType.FileStorage }).ShouldNotBeNull(); + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }).ShouldBeNull(); + } + + [Fact] + public void JsAccount_SubjectsOverlap_IgnoresOwnAssignment() + { + var jsa = new JsAccount(); + var assignment = new StreamAssignment(); + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + }); + stream.SetStreamAssignment(assignment); + jsa.Streams["ORDERS"] = stream; + + jsa.SubjectsOverlap(["orders.created"], assignment).ShouldBeFalse(); + jsa.SubjectsOverlap(["orders.created"], ownAssignment: null).ShouldBeTrue(); + } + + [Fact] + public void GroupCHelpers_GetCfgNameAndIsMirror_ReturnExpectedValues() + { + var mirrorCfg = new StreamConfig + { + Name = "MIRROR", + Storage = StorageType.MemoryStorage, + Mirror = new StreamSource { Name = "ORIGIN" }, + }; + + var stream = CreateStream(mirrorCfg); + + stream.GetCfgName().ShouldBe("MIRROR"); + stream.IsMirror().ShouldBeTrue(); + stream.MirrorInfo().ShouldBeNull(); + } + + [Fact] + public void GroupCHelpers_SourceInfoAndBackoff_Behave() + { + var stream = CreateStream(); + var info = new StreamSourceInfo + { + Name = "SRC", + FilterSubject = "orders.*", + Lag = 10, + Error = "x", + }; + + var cloned = stream.SourceInfo(info); + cloned.ShouldNotBeNull(); + cloned!.Name.ShouldBe("SRC"); + cloned.FilterSubject.ShouldBe("orders.*"); + + NatsStream.CalculateRetryBackoff(1).ShouldBeGreaterThan(TimeSpan.Zero); + NatsStream.CalculateRetryBackoff(1000).ShouldBe(TimeSpan.FromMinutes(2)); + } + + [Fact] + public void GroupD_SourceConsumersAndAckParsing_Behave() + { + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + Sources = + [ + new StreamSource { Name = "SRC", OptStartSeq = 12, FilterSubject = "orders.*" }, + ], + }); + + stream.SetupSourceConsumers(); + var source = stream.StreamSource("SRC orders.* >"); + source.ShouldNotBeNull(); + source!.Name.ShouldBe("SRC"); + stream.StartingSequenceForSources(source.IndexName).ShouldBe(12UL); + + stream.SetupSourceConsumer(source.IndexName, 20, DateTime.UtcNow); + stream.ProcessInboundSourceMsg(source.IndexName, new InMsg { Subject = "orders.created", Msg = "x"u8.ToArray() }).ShouldBeTrue(); + stream.ResetSourceInfo(source.IndexName); + stream.RetrySourceConsumerAtSeq(source.IndexName, 30); + stream.StartingSequenceForSources(source.IndexName).ShouldBe(30UL); + + NatsStream.StreamAndSeqFromAckReply("ORDERS.99").ShouldBe(("ORDERS", 99UL)); + NatsStream.StreamAndSeq("A.B.C").ShouldBe(("A", 0UL)); + } + + [Fact] + public void GroupD_MirrorAndControlPaths_Behave() + { + var stream = CreateStream(); + stream.SetupMirrorConsumer(); + + stream.ProcessInboundMirrorMsg(new InMsg + { + Subject = "$JS.FC.orders", + Reply = "reply", + Hdr = Encoding.ASCII.GetBytes("NATS/1.0\r\n\r\n"), + }).ShouldBeTrue(); + + stream.ProcessMirrorMsgs(new StreamSourceInfo { Name = "M", Lag = 2 }, [new InMsg { Subject = "orders.created", Msg = [1] }]); + stream.RetryDisconnectedSyncConsumers(); + } + + [Fact] + public void GroupE_SubscriptionsAndInflightCleanup_Behave() + { + var stream = CreateStream(); + + stream.SubscribeToDirect(); + stream.SubscribeToMirrorDirect(); + + var (sub, subErr) = stream.SubscribeInternal("orders.created", handler: null); + subErr.ShouldBeNull(); + sub.ShouldNotBeNull(); + + var (qsub, qErr) = stream.QueueSubscribeInternal("orders.updated", "Q", handler: null); + qErr.ShouldBeNull(); + qsub.ShouldNotBeNull(); + qsub!.Queue.ShouldNotBeNull(); + + stream.UnsubscribeInternal("orders.created").ShouldBeNull(); + stream.RemoveInternalConsumer("orders.updated"); + + stream.SubscribeToStream(); + stream.UnsubscribeToStream(); + + stream.DeleteInflightBatches(preserveState: false); + stream.DeleteBatchApplyState(); + stream.StopSourceConsumers(); + + stream.UnsubscribeToDirect(); + stream.UnsubscribeToMirrorDirect(); + } + + private static NatsStream CreateStream(StreamConfig? cfg = null) + { + cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; + return NatsStream.Create( + new Account { Name = "A" }, + cfg, + null, + new JetStreamMemStore(cfg.Clone()), + null, + null)!; + } +} diff --git a/reports/current.md b/reports/current.md index 6774a5e..cb149c0 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 04:16:03 UTC +Generated: 2026-03-01 04:28:20 UTC ## Modules (12 total)