using System.Collections.Concurrent;
using System.Text;
using NATS.Server.Auth;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Snapshots;
using NATS.Server.JetStream.Storage;
using NATS.Server.JetStream.Validation;
using NATS.Server.Subscriptions;

namespace NATS.Server.JetStream;

/// <summary>
/// Owns the set of streams known to this manager: creation/update validation, deletion, purge,
/// message capture (publish), retention enforcement, and mirror/source replication fan-out.
/// A background timer prunes expired messages from streams with MaxAge configured.
/// </summary>
public sealed class StreamManager : IDisposable
{
    private readonly Account? _account;
    private readonly ConsumerManager? _consumerManager;
    private readonly JetStreamMetaGroup? _metaGroup;
    private readonly ConcurrentDictionary<string, StreamHandle> _streams = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, StreamReplicaGroup> _replicaGroups = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin = new(StringComparer.Ordinal);
    private readonly StreamSnapshotService _snapshotService = new();
    private readonly CancellationTokenSource _expiryTimerCts = new();
    private readonly string? _storeDir;
    private Task? _expiryTimerTask;

    // 0 until Dispose has run once; guards against double-dispose.
    private int _disposed;

    /// <summary>
    /// Creates the manager and starts the background expiry timer.
    /// </summary>
    /// <param name="metaGroup">Optional meta group that stream create/delete proposals are sent to.</param>
    /// <param name="account">Optional account used to reserve/release stream slots.</param>
    /// <param name="consumerManager">Optional consumer manager queried for the ack floor under work-queue retention.</param>
    /// <param name="storeDir">Optional base directory for file-backed stores; a temp-path default is used when null.</param>
    public StreamManager(
        JetStreamMetaGroup? metaGroup = null,
        Account? account = null,
        ConsumerManager? consumerManager = null,
        string? storeDir = null)
    {
        _metaGroup = metaGroup;
        _account = account;
        _consumerManager = consumerManager;
        _storeDir = storeDir;
        _expiryTimerTask = RunExpiryTimerAsync(_expiryTimerCts.Token);
    }

    /// <summary>
    /// Stops the expiry timer. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        // Cancel() on an already-disposed CancellationTokenSource throws, and Dispose must not
        // throw on a repeated call — so only the first caller proceeds.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
            return;

        _expiryTimerCts.Cancel();
        _expiryTimerCts.Dispose();
    }

    /// <summary>
    /// Periodically prunes expired messages from streams with MaxAge configured.
    /// Go reference: stream.go — expireMsgs runs on a timer (checkMaxAge interval).
    /// </summary>
    private async Task RunExpiryTimerAsync(CancellationToken ct)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
        while (!ct.IsCancellationRequested)
        {
            var ticked = false;
            try
            {
                ticked = await timer.WaitForNextTickAsync(ct);
            }
            catch (OperationCanceledException)
            {
                return; // Shutdown requested via Dispose — exit the timer loop
            }

            if (!ticked)
                return;

            var nowUtc = DateTime.UtcNow;
            foreach (var stream in _streams.Values)
            {
                if (stream.Config.MaxAgeMs > 0)
                    PruneExpiredMessages(stream, nowUtc);
            }
        }
    }

    /// <summary>Snapshot of the currently registered stream names (unordered).</summary>
    public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();

    /// <summary>Returns the meta group state, or null when no meta group was supplied.</summary>
    public MetaGroupState? GetMetaState() => _metaGroup?.GetState();

    /// <summary>Returns the stream names in ordinal order.</summary>
    public IReadOnlyList<string> ListNames() => [.. _streams.Keys.OrderBy(x => x, StringComparer.Ordinal)];

    /// <summary>Returns config and current store state for every stream, ordered by name (ordinal).</summary>
    public IReadOnlyList<JetStreamStreamInfo> ListStreamInfos()
    {
        return _streams.OrderBy(kv => kv.Key, StringComparer.Ordinal)
            .Select(kv =>
            {
                var state = kv.Value.Store.GetStateAsync(default).GetAwaiter().GetResult();
                return new JetStreamStreamInfo
                {
                    Config = kv.Value.Config,
                    State = state,
                };
            })
            .ToList();
    }

    /// <summary>
    /// Validates <paramref name="config"/> and either creates a new stream or updates the existing
    /// stream of the same name.
    /// </summary>
    /// <param name="config">The requested stream configuration; it is copied (normalized) before use.</param>
    /// <returns>A stream-info response on success, or an error response describing the first failed check.</returns>
    public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
    {
        if (!JetStreamConfigValidator.IsValidName(config.Name))
            return JetStreamApiResponse.ErrorResponse(400, "invalid stream name");

        if (Encoding.UTF8.GetByteCount(config.Description) > JetStreamApiLimits.JSMaxDescriptionLen)
            return JetStreamApiResponse.ErrorResponse(400, "stream description is too long");

        if (!JetStreamConfigValidator.IsMetadataWithinLimit(config.Metadata))
            return JetStreamApiResponse.ErrorResponse(400, "stream metadata exceeds maximum size");

        var normalized = NormalizeConfig(config);

        // Go: NewJSMirrorWithFirstSeqError — mirror + FirstSeq is invalid.
        // Reference: server/stream.go:1028-1031
        if (!string.IsNullOrWhiteSpace(normalized.Mirror) && normalized.FirstSeq > 0)
            return JetStreamApiResponse.ErrorResponse(10054, "mirror configuration can not have a first sequence set");

        // Go: NewJSMirrorWithMsgSchedulesError / NewJSSourceWithMsgSchedulesError
        // Reference: server/stream.go:1040-1046
        if (normalized.AllowMsgSchedules && !string.IsNullOrWhiteSpace(normalized.Mirror))
            return JetStreamApiResponse.ErrorResponse(10054, "mirror configuration can not have message schedules");

        if (normalized.AllowMsgSchedules && normalized.Sources.Count > 0)
            return JetStreamApiResponse.ErrorResponse(10054, "source configuration can not have message schedules");

        // Go: SubjectDeleteMarkerTTL + Mirror is invalid.
        // Reference: server/stream.go:1050-1053
        if (normalized.SubjectDeleteMarkerTtlMs > 0 && !string.IsNullOrWhiteSpace(normalized.Mirror))
            return JetStreamApiResponse.ErrorResponse(10054, "mirror configuration can not have subject delete marker TTL");

        // Go: NewJSMirrorWithAtomicPublishError (10198) — mirror + AllowAtomicPublish is invalid.
        // Reference: server/stream.go:1735-1737
        if (normalized.AllowAtomicPublish && !string.IsNullOrWhiteSpace(normalized.Mirror))
        {
            return JetStreamApiResponse.ErrorResponse(
                AtomicBatchPublishErrorCodes.MirrorWithAtomicPublish,
                "stream mirrors can not also use atomic publishing");
        }

        // Go: RePublish cycle detection — destination must not overlap stream subjects.
        // Reference: server/stream.go:1060-1080 (checkRePublish)
        if (!string.IsNullOrWhiteSpace(normalized.RePublishDest))
        {
            var cycleError = CheckRepublishCycle(normalized);
            if (cycleError != null)
                return cycleError;
        }

        var isCreate = !_streams.ContainsKey(normalized.Name);
        if (isCreate && _account is not null && !_account.TryReserveStream())
            return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");

        // Go: subject overlap detection on create — reject new streams whose subjects
        // collide with existing streams.
        // Reference: server/stream.go — checkStreamOverlap during addStream.
        if (isCreate && normalized.Subjects.Count > 0)
        {
            foreach (var otherStream in _streams.Values.Select(s => s.Config))
            {
                if (!SubjectsOverlap(normalized.Subjects, otherStream.Subjects))
                    continue;

                // The stream is not going to be created, so hand back the slot reserved above;
                // otherwise every rejected create permanently consumes one of the account's streams.
                _account?.ReleaseStream();
                return JetStreamApiResponse.ErrorResponse(400, $"subjects overlap with stream '{otherStream.Name}'");
            }
        }

        // Go: stream.go:update — validate immutable fields on update.
        // Reference: server/stream.go:1500-1600 (stream.update)
        if (!isCreate && _streams.TryGetValue(normalized.Name, out var existingHandle))
        {
            var otherStreams = _streams.Values
                .Where(s => !string.Equals(s.Config.Name, normalized.Name, StringComparison.Ordinal))
                .Select(s => s.Config);
            var updateErrors = ValidateConfigUpdate(existingHandle.Config, normalized, otherStreams);
            if (updateErrors.Count > 0)
                return JetStreamApiResponse.ErrorResponse(400, updateErrors[0]);
        }

        var handle = _streams.AddOrUpdate(
            normalized.Name,
            _ => new StreamHandle(normalized, CreateStore(normalized)),
            (_, existing) =>
            {
                // Same storage type: keep the existing store and only swap the config.
                if (existing.Config.Storage == normalized.Storage)
                    return existing with { Config = normalized };

                return new StreamHandle(normalized, CreateStore(normalized));
            });

        _replicaGroups.AddOrUpdate(
            normalized.Name,
            _ => new StreamReplicaGroup(normalized.Name, normalized.Replicas),
            (_, existing) => existing.Nodes.Count == Math.Max(normalized.Replicas, 1)
                ? existing
                : new StreamReplicaGroup(normalized.Name, normalized.Replicas));

        RebuildReplicationCoordinators();
        _metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult();

        return BuildStreamInfoResponse(handle);
    }

    /// <summary>Returns stream info for <paramref name="name"/>, or a not-found response.</summary>
    public JetStreamApiResponse GetInfo(string name)
    {
        if (_streams.TryGetValue(name, out var stream))
            return BuildStreamInfoResponse(stream);

        return JetStreamApiResponse.NotFound($"$JS.API.STREAM.INFO.{name}");
    }

    /// <summary>Looks up a stream handle by name.</summary>
    public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);

    /// <summary>Returns true when a stream named <paramref name="name"/> is registered.</summary>
    public bool Exists(string name) => _streams.ContainsKey(name);

    /// <summary>
    /// Removes the stream and its replica group, releases the account slot, rebuilds replication
    /// coordinators and proposes the deletion to the meta group.
    /// </summary>
    /// <returns>False when no stream with that name exists.</returns>
    public bool Delete(string name)
    {
        if (!_streams.TryRemove(name, out _))
            return false;

        _replicaGroups.TryRemove(name, out _);
        _account?.ReleaseStream();
        RebuildReplicationCoordinators();

        // Go: propagate stream deletion to meta group so cluster state is updated.
        // Reference: server/jetstream_cluster.go — processStreamRemoval updates meta state.
        _metaGroup?.ProposeDeleteStreamAsync(name, default).GetAwaiter().GetResult();
        return true;
    }

    /// <summary>
    /// Purges all messages from the stream.
    /// </summary>
    /// <returns>False when the stream is missing, sealed, or has DenyPurge set.</returns>
    public bool Purge(string name)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return false;

        if (stream.Config.Sealed || stream.Config.DenyPurge)
            return false;

        stream.Store.PurgeAsync(default).GetAwaiter().GetResult();
        return true;
    }

    /// <summary>
    /// Extended purge with optional subject filter, sequence cutoff, and keep-last-N.
    /// Returns the number of messages purged, or -1 if the stream was not found
    /// (or is sealed / has DenyPurge set).
    /// Go reference: jetstream_api.go:1200-1350 — purge options: filter, seq, keep.
    /// </summary>
    /// <param name="name">Stream name.</param>
    /// <param name="filter">Optional subject pattern; only matching messages are considered.</param>
    /// <param name="seq">Optional cutoff; only messages with a sequence below it are purged.</param>
    /// <param name="keep">Optional number of newest messages to retain (per subject when combined with <paramref name="filter"/>).</param>
    public long PurgeEx(string name, string? filter, ulong? seq, ulong? keep)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return -1;

        if (stream.Config.Sealed || stream.Config.DenyPurge)
            return -1;

        // No options — purge everything (backward-compatible with the original Purge).
        if (filter is null && seq is null && keep is null)
        {
            var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
            var count = stateBefore.Messages;
            stream.Store.PurgeAsync(default).GetAwaiter().GetResult();
            return (long)count;
        }

        var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
        long purged = 0;

        // Filter + Keep: keep last N per matching subject.
        if (filter is not null && keep is not null)
        {
            var matching = messages
                .Where(m => SubjectMatch.MatchLiteral(m.Subject, filter))
                .GroupBy(m => m.Subject, StringComparer.Ordinal);

            foreach (var group in matching)
            {
                var ordered = group.OrderByDescending(m => m.Sequence).ToList();
                foreach (var msg in ordered.Skip(ToSkipCount(keep.Value)))
                {
                    if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult())
                        purged++;
                }
            }

            return purged;
        }

        // Filter only: remove all messages matching the subject pattern.
        if (filter is not null)
        {
            // If seq is also set, only purge matching messages below that sequence.
            foreach (var msg in messages)
            {
                if (!SubjectMatch.MatchLiteral(msg.Subject, filter))
                    continue;

                if (seq is not null && msg.Sequence >= seq.Value)
                    continue;

                if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult())
                    purged++;
            }

            return purged;
        }

        // Seq only: remove all messages with sequence < seq.
        if (seq is not null)
        {
            foreach (var msg in messages)
            {
                if (msg.Sequence >= seq.Value)
                    continue;

                if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult())
                    purged++;
            }

            return purged;
        }

        // Keep only (no filter): keep the last N messages globally, delete the rest.
        if (keep is not null)
        {
            var ordered = messages.OrderByDescending(m => m.Sequence).ToList();
            foreach (var msg in ordered.Skip(ToSkipCount(keep.Value)))
            {
                if (stream.Store.RemoveAsync(msg.Sequence, default).GetAwaiter().GetResult())
                    purged++;
            }

            return purged;
        }

        return purged;
    }

    // Converts a keep-last-N value into a LINQ Skip count without wrapping. A plain (int) cast of a
    // value above int.MaxValue can turn negative, and Skip treats a negative count as zero — which
    // would purge every message instead of keeping them.
    private static int ToSkipCount(ulong keep) => keep > int.MaxValue ? int.MaxValue : (int)keep;

    /// <summary>Loads one message by sequence, or null when the stream does not exist.</summary>
    public StoredMessage? GetMessage(string name, ulong sequence)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return null;

        return stream.Store.LoadAsync(sequence, default).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Removes one message by sequence.
    /// </summary>
    /// <returns>False when the stream is missing, sealed, has DenyDelete set, or the store did not remove the message.</returns>
    public bool DeleteMessage(string name, ulong sequence)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return false;

        if (stream.Config.Sealed || stream.Config.DenyDelete)
            return false;

        return stream.Store.RemoveAsync(sequence, default).GetAwaiter().GetResult();
    }

    /// <summary>Produces a snapshot of the stream, or null when the stream does not exist.</summary>
    public byte[]? CreateSnapshot(string name)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return null;

        return _snapshotService.SnapshotAsync(stream, default).GetAwaiter().GetResult();
    }

    /// <summary>Restores <paramref name="snapshot"/> into an existing stream; false when the stream does not exist.</summary>
    public bool RestoreSnapshot(string name, ReadOnlyMemory<byte> snapshot)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return false;

        _snapshotService.RestoreAsync(stream, snapshot, default).GetAwaiter().GetResult();
        return true;
    }

    /// <summary>Returns the store state for the stream, or a default-constructed state when it does not exist.</summary>
    public ValueTask<Models.ApiStreamState> GetStateAsync(string name, CancellationToken ct)
    {
        if (_streams.TryGetValue(name, out var stream))
            return stream.Store.GetStateAsync(ct);

        return ValueTask.FromResult(new Models.ApiStreamState());
    }

    /// <summary>Returns the first stream with a configured subject pattern matching <paramref name="subject"/>, or null.</summary>
    public StreamHandle? FindBySubject(string subject)
    {
        foreach (var stream in _streams.Values)
        {
            if (stream.Config.Subjects.Any(p => SubjectMatch.MatchLiteral(subject, p)))
                return stream;
        }

        return null;
    }

    /// <summary>
    /// Stores a published message in the stream whose subjects match <paramref name="subject"/>,
    /// enforcing sealed / size / discard limits first and retention policies afterwards.
    /// </summary>
    /// <returns>
    /// Null when no stream matches; a <see cref="PubAck"/> with ErrorCode 10054 when the publish is
    /// rejected; otherwise a <see cref="PubAck"/> carrying the assigned sequence.
    /// </returns>
    public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
    {
        var stream = FindBySubject(subject);
        if (stream == null)
            return null;

        // Go: sealed stream rejects all publishes.
        // Reference: server/stream.go — processJetStreamMsg checks mset.cfg.Sealed.
        if (stream.Config.Sealed)
            return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };

        if (stream.Config.MaxMsgSize > 0 && payload.Length > stream.Config.MaxMsgSize)
        {
            return new PubAck
            {
                Stream = stream.Config.Name,
                ErrorCode = 10054,
            };
        }

        // Go: memStoreMsgSize — full message size includes subject + headers + payload + 16 bytes overhead.
        var msgSize = subject.Length + payload.Length + 16;

        var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();

        // Go: DiscardPolicy.New — reject when MaxMsgs reached.
        // Reference: server/stream.go — processJetStreamMsg checks discard new + maxMsgs.
        if (stream.Config.MaxMsgs > 0
            && stream.Config.Discard == DiscardPolicy.New
            && (long)stateBefore.Messages >= stream.Config.MaxMsgs)
        {
            return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };
        }

        if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes)
        {
            if (stream.Config.Discard == DiscardPolicy.New)
            {
                return new PubAck
                {
                    Stream = stream.Config.Name,
                    ErrorCode = 10054,
                };
            }

            // Evict from the head until the new message fits.
            while ((long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes && stateBefore.FirstSeq > 0)
            {
                // If the store cannot remove the head message the state will not change, so stop
                // rather than re-reading the same state forever.
                if (!stream.Store.RemoveAsync(stateBefore.FirstSeq, default).GetAwaiter().GetResult())
                    break;

                stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
            }
        }

        // Go: single-replica streams don't use RAFT consensus — skip the propose overhead.
        // Reference: server/stream.go — processJetStreamMsg only proposes when R > 1.
        if (stream.Config.Replicas > 1
            && _replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
        {
            _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
        }

        // Go: stream.go:processMsgSubjectTransform — apply input subject transform before store.
        // Reference: server/stream.go:1810-1830
        var storeSubject = ApplyInputTransform(stream.Config, subject);
        var seq = stream.Store.AppendAsync(storeSubject, payload, default).GetAwaiter().GetResult();
        EnforceRuntimePolicies(stream, DateTime.UtcNow);

        // Wake up any pull consumers waiting at the stream tail.
        stream.NotifyPublish();

        // Only load the stored message when replication is configured (mirror/source).
        // Avoids unnecessary disk I/O on the hot publish path.
        if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name))
        {
            var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
            if (stored != null)
                ReplicateIfConfigured(stream.Config.Name, stored);
        }

        return new PubAck
        {
            Stream = stream.Config.Name,
            Seq = seq,
        };
    }

    /// <summary>
    /// Captures a counter increment message for a stream with AllowMsgCounter=true.
    /// Go reference: server/stream.go — processJetStreamMsg counter path.
    /// The server loads the last stored value for the subject, adds the increment,
    /// and stores the new total as a JSON payload.
    /// </summary>
    /// <returns>
    /// Null when no stream matches; a <see cref="PubAck"/> with ErrorCode 10054 when the stream does
    /// not allow counters or is sealed; otherwise a <see cref="PubAck"/> carrying the assigned sequence.
    /// </returns>
    public PubAck? CaptureCounter(string subject, long increment)
    {
        var stream = FindBySubject(subject);
        if (stream == null)
            return null;

        if (!stream.Config.AllowMsgCounter)
            return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };

        if (stream.Config.Sealed)
            return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };

        // Go: stream.go — counter increment: load last value, add increment, store new total.
        var storeSubject = ApplyInputTransform(stream.Config, subject);
        var lastMsg = stream.Store.LoadLastBySubjectAsync(storeSubject, default).GetAwaiter().GetResult();
        var existing = lastMsg != null
            ? CounterValue.FromPayload(lastMsg.Payload.Span)
            : new CounterValue();

        var newTotal = existing.AsLong() + increment;
        var newPayload = CounterValue.FromLong(newTotal).ToPayload();

        // NOTE(review): unlike Capture, this proposes even when Replicas <= 1 — confirm whether
        // that difference is intentional.
        if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
            _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();

        var seq = stream.Store.AppendAsync(storeSubject, newPayload, default).GetAwaiter().GetResult();
        EnforceRuntimePolicies(stream, DateTime.UtcNow);

        // Wake up any pull consumers waiting at the stream tail.
        stream.NotifyPublish();

        if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name))
        {
            var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
            if (stored != null)
                ReplicateIfConfigured(stream.Config.Name, stored);
        }

        return new PubAck
        {
            Stream = stream.Config.Name,
            Seq = seq,
        };
    }

    /// <summary>Asks the stream's replica group to step down; completes immediately when the stream has no group.</summary>
    public Task StepDownStreamLeaderAsync(string stream, CancellationToken ct)
    {
        if (_replicaGroups.TryGetValue(stream, out var replicaGroup))
            return replicaGroup.StepDownAsync(ct);

        return Task.CompletedTask;
    }

    // Produces an independent copy of the caller's config (lists and metadata are cloned) so later
    // mutation of the caller's instance cannot affect the stored configuration.
    private static StreamConfig NormalizeConfig(StreamConfig config)
    {
        // Go: mirror streams must not carry subject lists — they inherit subjects from origin.
        // Reference: server/stream.go:1020-1025 (clearMirrorSubjects recovery path)
        var subjects = !string.IsNullOrWhiteSpace(config.Mirror)
            ? (List<string>)[]
            : config.Subjects.Count == 0 ? [] : [.. config.Subjects];

        var copy = new StreamConfig
        {
            Name = config.Name,
            Subjects = subjects,
            MaxMsgs = config.MaxMsgs,
            MaxBytes = config.MaxBytes,
            MaxMsgsPer = config.MaxMsgsPer,
            MaxAgeMs = config.MaxAgeMs,
            MaxMsgSize = config.MaxMsgSize,
            MaxConsumers = config.MaxConsumers,
            DuplicateWindowMs = config.DuplicateWindowMs,
            Sealed = config.Sealed,
            DenyDelete = config.DenyDelete,
            DenyPurge = config.DenyPurge,
            AllowDirect = config.AllowDirect,
            AllowMsgTtl = config.AllowMsgTtl,
            FirstSeq = config.FirstSeq,
            Retention = config.Retention,
            Discard = config.Discard,
            Storage = config.Storage,
            Replicas = config.Replicas,
            Mirror = config.Mirror,
            Source = config.Source,
            Sources = config.Sources.Count == 0 ? [] : [.. config.Sources.Select(s => new StreamSourceConfig
            {
                Name = s.Name,
                SubjectTransformPrefix = s.SubjectTransformPrefix,
                SourceAccount = s.SourceAccount,
                FilterSubject = s.FilterSubject,
                DuplicateWindowMs = s.DuplicateWindowMs,
                SubjectTransforms = [.. s.SubjectTransforms.Select(t => new SubjectTransformConfig
                {
                    Source = t.Source,
                    Destination = t.Destination,
                })],
            })],
            // Go: StreamConfig.SubjectTransform
            SubjectTransformSource = config.SubjectTransformSource,
            SubjectTransformDest = config.SubjectTransformDest,
            // Go: StreamConfig.RePublish
            RePublishSource = config.RePublishSource,
            RePublishDest = config.RePublishDest,
            RePublishHeadersOnly = config.RePublishHeadersOnly,
            // Go: StreamConfig.SubjectDeleteMarkerTTL
            SubjectDeleteMarkerTtlMs = config.SubjectDeleteMarkerTtlMs,
            // Go: StreamConfig.AllowMsgSchedules
            AllowMsgSchedules = config.AllowMsgSchedules,
            // Go: StreamConfig.AllowMsgCounter — CRDT counter semantics
            AllowMsgCounter = config.AllowMsgCounter,
            // Go: StreamConfig.AllowAtomicPublish — atomic batch publish
            AllowAtomicPublish = config.AllowAtomicPublish,
            // Go: StreamConfig.PersistMode — async vs sync persistence
            PersistMode = config.PersistMode,
            // Go: StreamConfig.Metadata — user and server key/value metadata
            Metadata = config.Metadata == null ? null : new Dictionary<string, string>(config.Metadata),
        };

        return copy;
    }

    // Go reference: server/stream.go:1810-1830 (processMsgSubjectTransform)
    // Returns the subject to store under: the transformed subject when a transform destination is
    // configured and applies, otherwise the original subject.
    private static string ApplyInputTransform(StreamConfig config, string subject)
    {
        if (string.IsNullOrWhiteSpace(config.SubjectTransformDest))
            return subject;

        // With no explicit source pattern the transform matches everything (">").
        var src = string.IsNullOrWhiteSpace(config.SubjectTransformSource) ? ">" : config.SubjectTransformSource;
        var transform = SubjectTransform.Create(src, config.SubjectTransformDest);
        if (transform == null)
            return subject;

        return transform.Apply(subject) ?? subject;
    }

    // Go reference: server/stream.go:1060-1080 — checks that RePublish destination
    // does not cycle back onto any of the stream's own subjects.
    // Returns an error response when a cycle is found, otherwise null.
    private static JetStreamApiResponse? CheckRepublishCycle(StreamConfig config)
    {
        if (string.IsNullOrWhiteSpace(config.RePublishDest))
            return null;

        foreach (var streamSubject in config.Subjects)
        {
            // If the republish destination matches any stream subject pattern (in either
            // direction), it's a cycle. This check already covers the case where a RePublishSource
            // filter is set, so no separate source-filtered check is needed.
            if (SubjectMatch.MatchLiteral(config.RePublishDest, streamSubject)
                || SubjectMatch.MatchLiteral(streamSubject, config.RePublishDest))
            {
                return JetStreamApiResponse.ErrorResponse(10054, "stream configuration for republish destination forms a cycle");
            }
        }

        return null;
    }

    /// <summary>
    /// Validates that <paramref name="proposed"/> is a legal update of <paramref name="existing"/>.
    /// Returns an empty list when the update is valid; otherwise returns one or more error strings.
    /// The <paramref name="otherStreams"/> parameter is used to detect subject overlap with peer streams.
    /// Go reference: server/stream.go:1500-1600 (stream.update immutable-field checks).
    /// </summary>
    public static IReadOnlyList<string> ValidateConfigUpdate(
        StreamConfig existing,
        StreamConfig proposed,
        IEnumerable<StreamConfig>? otherStreams = null)
    {
        List<string> errors = [];

        // Sealed streams reject all modifications.
        if (existing.Sealed)
        {
            errors.Add("sealed stream cannot be modified");
            return errors;
        }

        // Storage type is immutable.
        if (existing.Storage != proposed.Storage)
            errors.Add("storage type cannot be changed");

        // Mirror is immutable — if both have a mirror it must match. Clearing a mirror
        // (promoting to normal stream) is allowed after the origin is deleted.
        // Go reference: server/stream.go — update allows clearing mirror for promotion.
        if (!string.IsNullOrWhiteSpace(existing.Mirror)
            && !string.IsNullOrWhiteSpace(proposed.Mirror)
            && !string.Equals(existing.Mirror, proposed.Mirror, StringComparison.Ordinal))
        {
            errors.Add("mirror configuration cannot be changed");
        }

        // Sources: changing to a different non-empty set is not allowed, but clearing
        // sources (removing all) or adding sources to a previously source-less stream is permitted.
        // Go reference: server/stream.go — update allows adding/removing sources.
        if (existing.Sources.Count > 0 && proposed.Sources.Count > 0)
        {
            var existingNames = existing.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
            var proposedNames = proposed.Sources.Select(s => s.Name).OrderBy(n => n, StringComparer.Ordinal).ToList();
            if (!existingNames.SequenceEqual(proposedNames, StringComparer.Ordinal))
                errors.Add("sources cannot be changed after creation");
        }

        // Retention policy is immutable.
        if (existing.Retention != proposed.Retention)
            errors.Add("retention policy cannot be changed");

        // MaxConsumers may only be increased (or left unlimited).
        if (existing.MaxConsumers > 0 && proposed.MaxConsumers > 0
            && proposed.MaxConsumers < existing.MaxConsumers)
        {
            errors.Add("max consumers can only be increased");
        }

        // Subject overlap detection with peer streams: at most one error per overlapping stream.
        if (otherStreams is not null && proposed.Subjects.Count > 0)
        {
            foreach (var otherStream in otherStreams)
            {
                if (SubjectsOverlap(proposed.Subjects, otherStream.Subjects))
                    errors.Add($"subjects overlap with stream '{otherStream.Name}'");
            }
        }

        return errors;
    }

    // True when any proposed subject matches (in either direction) or collides with any subject
    // of another stream.
    private static bool SubjectsOverlap(IEnumerable<string> proposedSubjects, IEnumerable<string> otherSubjects)
    {
        foreach (var proposedSubject in proposedSubjects)
        {
            foreach (var otherSubject in otherSubjects)
            {
                if (SubjectMatch.MatchLiteral(proposedSubject, otherSubject)
                    || SubjectMatch.MatchLiteral(otherSubject, proposedSubject)
                    || SubjectMatch.SubjectsCollide(proposedSubject, otherSubject))
                {
                    return true;
                }
            }
        }

        return false;
    }

    // Builds a stream-info response from the handle's config and its current store state.
    private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
    {
        var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
        return new JetStreamApiResponse
        {
            StreamInfo = new JetStreamStreamInfo
            {
                Config = handle.Config,
                State = state,
            },
        };
    }

    // Dispatches to the retention routine matching the stream's retention policy; called after
    // every append.
    private void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc)
    {
        switch (stream.Config.Retention)
        {
            case RetentionPolicy.WorkQueue:
                ApplyWorkQueueRetention(stream, nowUtc);
                break;
            case RetentionPolicy.Interest:
                ApplyInterestRetention(stream, nowUtc);
                break;
            default:
                ApplyLimitsRetention(stream, nowUtc);
                break;
        }
    }

    // Applies MaxMsgs, then MaxMsgsPer, then MaxAge pruning.
    private static void ApplyLimitsRetention(StreamHandle stream, DateTime nowUtc)
    {
        EnforceLimits(stream);

        if (stream.Config.MaxMsgsPer > 0)
            PrunePerSubject(stream);

        if (stream.Config.MaxAgeMs > 0)
            PruneExpiredMessages(stream, nowUtc);
    }

    // Limits retention plus removal of every message at or below the consumer ack floor.
    private void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc)
    {
        ApplyLimitsRetention(stream, nowUtc);

        if (_consumerManager == null)
            return;

        var ackFloor = _consumerManager.GetAckFloor(stream.Config.Name);
        if (ackFloor == 0)
            return;

        var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
        foreach (var message in messages)
        {
            if (message.Sequence <= ackFloor)
                stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
        }
    }

    private static void ApplyInterestRetention(StreamHandle stream, DateTime nowUtc)
    {
        // Interest retention relies on consumer interest lifecycle that is modeled
        // separately; bounded pruning remains aligned with limits retention.
        ApplyLimitsRetention(stream, nowUtc);
    }

    // Trims the store to MaxMsgs. Only MemStore and FileStore are handled; any other store type
    // is left untouched.
    private static void EnforceLimits(StreamHandle stream)
    {
        if (stream.Config.MaxMsgs <= 0)
            return;

        var maxMessages = (ulong)stream.Config.MaxMsgs;
        if (stream.Store is MemStore memStore)
        {
            memStore.TrimToMaxMessages(maxMessages);
            return;
        }

        if (stream.Store is FileStore fileStore)
            fileStore.TrimToMaxMessages(maxMessages);
    }

    // Keeps only the newest MaxMsgsPer messages for each distinct subject.
    private static void PrunePerSubject(StreamHandle stream)
    {
        if (stream.Config.MaxMsgsPer <= 0)
            return;

        var maxPerSubject = stream.Config.MaxMsgsPer;
        var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
        foreach (var group in messages.GroupBy(m => m.Subject, StringComparer.Ordinal))
        {
            foreach (var message in group.OrderByDescending(m => m.Sequence).Skip(maxPerSubject))
                stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
        }
    }

    // Removes every message whose timestamp is older than nowUtc minus MaxAgeMs.
    private static void PruneExpiredMessages(StreamHandle stream, DateTime nowUtc)
    {
        if (stream.Config.MaxAgeMs <= 0)
            return;

        var cutoff = nowUtc.AddMilliseconds(-stream.Config.MaxAgeMs);
        var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
        foreach (var message in messages)
        {
            if (message.TimestampUtc < cutoff)
                stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
        }
    }

    // Discards all mirror/source coordinators and recreates them from the current stream set.
    // A coordinator is only created when its origin stream is currently registered.
    private void RebuildReplicationCoordinators()
    {
        _mirrorsByOrigin.Clear();
        _sourcesByOrigin.Clear();

        foreach (var stream in _streams.Values)
        {
            if (!string.IsNullOrWhiteSpace(stream.Config.Mirror)
                && _streams.TryGetValue(stream.Config.Mirror, out _))
            {
                var list = _mirrorsByOrigin.GetOrAdd(stream.Config.Mirror, _ => []);
                list.Add(new MirrorCoordinator(stream.Store));
            }

            if (!string.IsNullOrWhiteSpace(stream.Config.Source)
                && _streams.TryGetValue(stream.Config.Source, out _))
            {
                var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []);
                list.Add(new SourceCoordinator(stream.Store, new StreamSourceConfig { Name = stream.Config.Source })
                {
                    AllowMsgCounter = stream.Config.AllowMsgCounter,
                });
            }

            if (stream.Config.Sources.Count > 0)
            {
                foreach (var source in stream.Config.Sources)
                {
                    if (string.IsNullOrWhiteSpace(source.Name) || !_streams.TryGetValue(source.Name, out _))
                        continue;

                    var list = _sourcesByOrigin.GetOrAdd(source.Name, _ => []);
                    list.Add(new SourceCoordinator(stream.Store, source)
                    {
                        AllowMsgCounter = stream.Config.AllowMsgCounter,
                    });
                }
            }
        }
    }

    // Forwards a message just appended to originStream to every mirror and source coordinator
    // registered for that origin.
    private void ReplicateIfConfigured(string originStream, StoredMessage stored)
    {
        if (_mirrorsByOrigin.TryGetValue(originStream, out var mirrors))
        {
            foreach (var mirror in mirrors)
                mirror.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
        }

        if (_sourcesByOrigin.TryGetValue(originStream, out var sources))
        {
            foreach (var source in sources)
                source.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Returns "missing" when the stream does not exist, "file" for a <see cref="FileStore"/>,
    /// and "memory" for any other store.
    /// </summary>
    public string GetStoreBackendType(string streamName)
    {
        if (!_streams.TryGetValue(streamName, out var stream))
            return "missing";

        return stream.Store switch
        {
            FileStore => "file",
            _ => "memory",
        };
    }

    /// <summary>
    /// Returns mirror monitoring info for the given stream, or null if the stream does not exist
    /// or is not configured as a mirror.
    /// Go reference: server/stream.go:2739-2743 (mirrorInfo)
    /// </summary>
    public MirrorInfoResponse? GetMirrorInfo(string streamName)
    {
        if (!_streams.TryGetValue(streamName, out var stream))
            return null;

        if (string.IsNullOrWhiteSpace(stream.Config.Mirror))
            return null;

        if (!_mirrorsByOrigin.TryGetValue(stream.Config.Mirror, out var coordinators))
            return null;

        // NOTE(review): this takes the first coordinator registered for the origin, which may
        // belong to a different mirror of the same origin — confirm.
        var first = coordinators.Count > 0 ? coordinators[0] : null;
        return first?.GetMirrorInfo(streamName);
    }

    /// <summary>
    /// Returns source monitoring info for all sources configured on the given stream.
    /// Returns an empty array when the stream does not exist or has no sources.
    /// Go reference: server/stream.go:2687-2695 (sourcesInfo)
    /// </summary>
    public SourceInfoResponse[] GetSourceInfos(string streamName)
    {
        if (!_streams.TryGetValue(streamName, out _))
            return [];

        // NOTE(review): this walks every coordinator for every origin, not only those feeding
        // streamName — confirm against the summary above.
        var results = new List<SourceInfoResponse>();
        foreach (var (_, coordinators) in _sourcesByOrigin)
        {
            foreach (var coord in coordinators)
                results.Add(coord.GetSourceInfo());
        }

        return [.. results];
    }

    // Creates the backing store for a stream: a FileStore under the configured (or temp-path
    // default) directory for file storage, otherwise a MemStore.
    private IStreamStore CreateStore(StreamConfig config)
    {
        return config.Storage switch
        {
            StorageType.File => new FileStore(new FileStoreOptions
            {
                Directory = Path.Combine(
                    _storeDir ?? Path.Combine(Path.GetTempPath(), "natsdotnet-js-store"),
                    config.Name),
                MaxAgeMs = config.MaxAgeMs,
            }),
            // Go: newMemStore — pass full config so FirstSeq, MaxMsgsPer, AllowMsgTtl, etc. apply.
            // Reference: server/memstore.go:99 (newMemStore constructor).
            _ => new MemStore(config),
        };
    }
}

/// <summary>
/// Pairs a stream's configuration with its backing store and carries the publish signal used
/// to wake waiting consumers.
/// </summary>
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store)
{
    // Signal-based wakeup for pull consumers waiting at the stream tail.
    // Go reference: consumer.go — channel signaling from publisher to waiting consumer.
    private volatile TaskCompletionSource _publishSignal = new(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>
    /// Notifies waiting consumers that a new message has been published.
    /// </summary>
    public void NotifyPublish()
    {
        // Swap in a fresh signal first, then complete the old one, so waiters that arrive after
        // the swap wait for the next publish.
        var old = Interlocked.Exchange(
            ref _publishSignal,
            new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously));
        old.TrySetResult();
    }

    /// <summary>
    /// Waits until a new message is published to this stream.
    /// </summary>
    public Task WaitForPublishAsync(CancellationToken ct)
        => _publishSignal.Task.WaitAsync(ct);
}