using System.Collections.Concurrent;
using NATS.Server.Auth;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Snapshots;
using NATS.Server.JetStream.Storage;
using NATS.Server.Subscriptions;

namespace NATS.Server.JetStream;

/// <summary>
/// Keeps the registry of streams (keyed by name) together with their replica groups and the
/// mirror/source coordinators that forward appended messages to dependent streams.
/// </summary>
public sealed class StreamManager
{
    // Characters that must never appear in a stream name because the name is used as a
    // directory component by file-backed stores (see CreateStore).
    private static readonly char[] UnsafeNameChars = [.. Path.GetInvalidFileNameChars(), '/', '\\'];

    private readonly Account? _account;
    private readonly ConsumerManager? _consumerManager;
    private readonly JetStreamMetaGroup? _metaGroup;
    private readonly ConcurrentDictionary<string, StreamHandle> _streams = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, StreamReplicaGroup> _replicaGroups = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin = new(StringComparer.Ordinal);
    private readonly StreamSnapshotService _snapshotService = new();

    /// <summary>Creates a manager; every collaborator is optional.</summary>
    /// <param name="metaGroup">Meta group that stream creations are proposed to, if any.</param>
    /// <param name="account">Account whose stream quota is reserved/released, if any.</param>
    /// <param name="consumerManager">Source of the ack floor used by work-queue retention, if any.</param>
    public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null, ConsumerManager? consumerManager = null)
    {
        _metaGroup = metaGroup;
        _account = account;
        _consumerManager = consumerManager;
    }

    /// <summary>Snapshot of the currently registered stream names (unordered).</summary>
    public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();

    /// <summary>Returns the meta group's state, or <c>null</c> when no meta group was supplied.</summary>
    public MetaGroupState? GetMetaState() => _metaGroup?.GetState();

    /// <summary>Returns the registered stream names sorted with ordinal comparison.</summary>
    public IReadOnlyList<string> ListNames() => [.. _streams.Keys.OrderBy(x => x, StringComparer.Ordinal)];

    /// <summary>
    /// Creates the stream described by <paramref name="config"/>, or updates it when a stream with
    /// the same name already exists.
    /// </summary>
    /// <returns>
    /// The stream info on success; an error response when the name is missing (400), the name is
    /// not usable as a storage directory component (400), or the account's stream quota is
    /// exhausted (10027).
    /// </returns>
    public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
    {
        if (string.IsNullOrWhiteSpace(config.Name))
            return JetStreamApiResponse.ErrorResponse(400, "stream name required");

        // The name becomes a path segment for file stores. Reject anything that could escape the
        // store root before a quota slot is reserved or any storage is created.
        if (!IsSafeStreamName(config.Name))
            return JetStreamApiResponse.ErrorResponse(400, "invalid stream name");

        var normalized = NormalizeConfig(config);
        var isCreate = !_streams.ContainsKey(normalized.Name);
        if (isCreate && _account is not null && !_account.TryReserveStream())
            return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");

        // An update keeps the existing store unless the storage type changed, in which case a
        // fresh store of the new type replaces it.
        var handle = _streams.AddOrUpdate(
            normalized.Name,
            _ => new StreamHandle(normalized, CreateStore(normalized)),
            (_, existing) => existing.Config.Storage == normalized.Storage
                ? existing with { Config = normalized }
                : new StreamHandle(normalized, CreateStore(normalized)));

        // The replica group is only rebuilt when the effective replica count changed.
        _replicaGroups.AddOrUpdate(
            normalized.Name,
            _ => new StreamReplicaGroup(normalized.Name, normalized.Replicas),
            (_, existing) => existing.Nodes.Count == Math.Max(normalized.Replicas, 1)
                ? existing
                : new StreamReplicaGroup(normalized.Name, normalized.Replicas));

        RebuildReplicationCoordinators();
        _metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult();

        return BuildStreamInfoResponse(handle);
    }

    /// <summary>Returns the stream info for <paramref name="name"/>, or a not-found response.</summary>
    public JetStreamApiResponse GetInfo(string name)
    {
        return _streams.TryGetValue(name, out var stream)
            ? BuildStreamInfoResponse(stream)
            : JetStreamApiResponse.NotFound($"$JS.API.STREAM.INFO.{name}");
    }

    /// <summary>Looks up a stream handle by name.</summary>
    /// <returns><c>true</c> when the stream exists and <paramref name="handle"/> was set.</returns>
    public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);

    /// <summary>
    /// Removes the stream and its replica group, releases the account's stream slot and rebuilds
    /// the mirror/source coordinators.
    /// </summary>
    /// <returns><c>false</c> when no stream with that name is registered.</returns>
    public bool Delete(string name)
    {
        if (!_streams.TryRemove(name, out _))
            return false;

        _replicaGroups.TryRemove(name, out _);
        _account?.ReleaseStream();
        RebuildReplicationCoordinators();
        return true;
    }

    /// <summary>Purges all messages of the named stream.</summary>
    /// <returns>
    /// <c>false</c> when the stream is unknown or its config is <c>Sealed</c> or <c>DenyPurge</c>.
    /// </returns>
    public bool Purge(string name)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return false;
        if (stream.Config.Sealed || stream.Config.DenyPurge)
            return false;

        stream.Store.PurgeAsync(default).GetAwaiter().GetResult();
        return true;
    }

    /// <summary>Loads one message by sequence; <c>null</c> when the stream is unknown.</summary>
    public StoredMessage? GetMessage(string name, ulong sequence)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return null;

        return stream.Store.LoadAsync(sequence, default).GetAwaiter().GetResult();
    }

    /// <summary>Removes one message by sequence.</summary>
    /// <returns>
    /// <c>false</c> when the stream is unknown or its config is <c>Sealed</c> or <c>DenyDelete</c>;
    /// otherwise the store's removal result.
    /// </returns>
    public bool DeleteMessage(string name, ulong sequence)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return false;
        if (stream.Config.Sealed || stream.Config.DenyDelete)
            return false;

        return stream.Store.RemoveAsync(sequence, default).GetAwaiter().GetResult();
    }

    /// <summary>Produces a snapshot of the named stream; <c>null</c> when the stream is unknown.</summary>
    public byte[]? CreateSnapshot(string name)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return null;

        return _snapshotService.SnapshotAsync(stream, default).GetAwaiter().GetResult();
    }

    /// <summary>Restores the named stream from <paramref name="snapshot"/>.</summary>
    /// <returns><c>false</c> when the stream is unknown.</returns>
    public bool RestoreSnapshot(string name, ReadOnlyMemory<byte> snapshot)
    {
        if (!_streams.TryGetValue(name, out var stream))
            return false;

        _snapshotService.RestoreAsync(stream, snapshot, default).GetAwaiter().GetResult();
        return true;
    }

    /// <summary>
    /// Returns the store state of the named stream, or a default <see cref="StreamState"/> when
    /// the stream is unknown.
    /// </summary>
    public ValueTask<StreamState> GetStateAsync(string name, CancellationToken ct)
    {
        return _streams.TryGetValue(name, out var stream)
            ? stream.Store.GetStateAsync(ct)
            : ValueTask.FromResult(new StreamState());
    }

    /// <summary>
    /// Returns the first stream with a configured subject pattern matching
    /// <paramref name="subject"/>, or <c>null</c> when none matches.
    /// </summary>
    public StreamHandle? FindBySubject(string subject)
    {
        foreach (var stream in _streams.Values)
        {
            if (stream.Config.Subjects.Any(p => SubjectMatch.MatchLiteral(subject, p)))
                return stream;
        }

        return null;
    }

    /// <summary>
    /// Stores a published message in the stream that owns <paramref name="subject"/>, applying
    /// size limits, the discard policy and retention, then forwards it to mirrors/sources.
    /// </summary>
    /// <returns>
    /// <c>null</c> when no stream matches the subject; an ack with <c>ErrorCode</c> 10054 when the
    /// payload exceeds <c>MaxMsgSize</c> or would exceed <c>MaxBytes</c> under
    /// <see cref="DiscardPolicy.New"/>; otherwise an ack carrying the assigned sequence.
    /// </returns>
    public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
    {
        var stream = FindBySubject(subject);
        if (stream is null)
            return null;

        if (stream.Config.MaxMsgSize > 0 && payload.Length > stream.Config.MaxMsgSize)
        {
            return new PubAck
            {
                Stream = stream.Config.Name,
                ErrorCode = 10054,
            };
        }

        PruneExpiredMessages(stream, DateTime.UtcNow);

        var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
        if (stream.Config.MaxBytes > 0 && (long)state.Bytes + payload.Length > stream.Config.MaxBytes)
        {
            if (stream.Config.Discard == DiscardPolicy.New)
            {
                return new PubAck
                {
                    Stream = stream.Config.Name,
                    ErrorCode = 10054,
                };
            }

            // Evict from the head until the new payload fits.
            while ((long)state.Bytes + payload.Length > stream.Config.MaxBytes && state.FirstSeq > 0)
            {
                var removed = stream.Store.RemoveAsync(state.FirstSeq, default).GetAwaiter().GetResult();

                // If the head could not be removed the state will not change; stop instead of
                // spinning forever on the same sequence.
                if (!removed)
                    break;

                state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
            }
        }

        if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
            _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();

        var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
        EnforceRuntimePolicies(stream, DateTime.UtcNow);

        // Retention may already have removed the message; only replicate what is still stored.
        var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
        if (stored is not null)
            ReplicateIfConfigured(stream.Config.Name, stored);

        return new PubAck
        {
            Stream = stream.Config.Name,
            Seq = seq,
        };
    }

    /// <summary>
    /// Asks the stream's replica group to step down its leader; completes immediately when the
    /// stream has no replica group.
    /// </summary>
    public Task StepDownStreamLeaderAsync(string stream, CancellationToken ct)
    {
        return _replicaGroups.TryGetValue(stream, out var replicaGroup)
            ? replicaGroup.StepDownAsync(ct)
            : Task.CompletedTask;
    }

    /// <summary>
    /// Returns the backend kind of the named stream: "file", "memory", or "missing" when the
    /// stream is unknown.
    /// </summary>
    public string GetStoreBackendType(string streamName)
    {
        if (!_streams.TryGetValue(streamName, out var stream))
            return "missing";

        return stream.Store switch
        {
            FileStore => "file",
            _ => "memory",
        };
    }

    /// <summary>
    /// Checks that <paramref name="name"/> is a single, plain path segment: not "." or "..", and
    /// free of directory separators and of characters the platform forbids in file names.
    /// </summary>
    private static bool IsSafeStreamName(string name)
    {
        if (name is "." or "..")
            return false;

        return name.IndexOfAny(UnsafeNameChars) < 0;
    }

    /// <summary>
    /// Copies <paramref name="config"/> into a new instance with its own subject and source lists,
    /// so the stored config does not share those lists with the caller's object.
    /// </summary>
    private static StreamConfig NormalizeConfig(StreamConfig config)
    {
        return new StreamConfig
        {
            Name = config.Name,
            Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects],
            MaxMsgs = config.MaxMsgs,
            MaxBytes = config.MaxBytes,
            MaxMsgsPer = config.MaxMsgsPer,
            MaxAgeMs = config.MaxAgeMs,
            MaxMsgSize = config.MaxMsgSize,
            MaxConsumers = config.MaxConsumers,
            DuplicateWindowMs = config.DuplicateWindowMs,
            Sealed = config.Sealed,
            DenyDelete = config.DenyDelete,
            DenyPurge = config.DenyPurge,
            AllowDirect = config.AllowDirect,
            Retention = config.Retention,
            Discard = config.Discard,
            Storage = config.Storage,
            Replicas = config.Replicas,
            Mirror = config.Mirror,
            Source = config.Source,
            Sources = config.Sources.Count == 0
                ? []
                : [.. config.Sources.Select(s => new StreamSourceConfig
                {
                    Name = s.Name,
                    SubjectTransformPrefix = s.SubjectTransformPrefix,
                    SourceAccount = s.SourceAccount,
                })],
        };
    }

    /// <summary>Builds the stream-info response from the handle's config and current store state.</summary>
    private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
    {
        var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
        return new JetStreamApiResponse
        {
            StreamInfo = new JetStreamStreamInfo
            {
                Config = handle.Config,
                State = state,
            },
        };
    }

    /// <summary>Dispatches to the retention routine that matches the stream's retention policy.</summary>
    private void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc)
    {
        switch (stream.Config.Retention)
        {
            case RetentionPolicy.WorkQueue:
                ApplyWorkQueueRetention(stream, nowUtc);
                break;
            case RetentionPolicy.Interest:
                ApplyInterestRetention(stream, nowUtc);
                break;
            default:
                ApplyLimitsRetention(stream, nowUtc);
                break;
        }
    }

    /// <summary>Applies the message-count, per-subject and max-age limits, in that order.</summary>
    private static void ApplyLimitsRetention(StreamHandle stream, DateTime nowUtc)
    {
        EnforceLimits(stream);
        PrunePerSubject(stream);
        PruneExpiredMessages(stream, nowUtc);
    }

    /// <summary>
    /// Applies the limits retention, then removes every message at or below the consumer
    /// manager's ack floor for the stream. Without a consumer manager, or with an ack floor of 0,
    /// only the limits are applied.
    /// </summary>
    private void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc)
    {
        ApplyLimitsRetention(stream, nowUtc);

        if (_consumerManager is null)
            return;

        var ackFloor = _consumerManager.GetAckFloor(stream.Config.Name);
        if (ackFloor == 0)
            return;

        var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
        foreach (var message in messages)
        {
            if (message.Sequence <= ackFloor)
                stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
        }
    }

    /// <summary>Applies retention for interest-based streams.</summary>
    private static void ApplyInterestRetention(StreamHandle stream, DateTime nowUtc)
    {
        // Interest retention relies on consumer interest lifecycle that is modeled
        // separately; bounded pruning remains aligned with limits retention.
        ApplyLimitsRetention(stream, nowUtc);
    }

    /// <summary>
    /// Trims the store to <c>MaxMsgs</c> messages when that limit is positive. Only
    /// <see cref="MemStore"/> and <see cref="FileStore"/> are trimmed; other stores are left alone.
    /// </summary>
    private static void EnforceLimits(StreamHandle stream)
    {
        if (stream.Config.MaxMsgs <= 0)
            return;

        var maxMessages = (ulong)stream.Config.MaxMsgs;
        switch (stream.Store)
        {
            case MemStore memStore:
                memStore.TrimToMaxMessages(maxMessages);
                break;
            case FileStore fileStore:
                fileStore.TrimToMaxMessages(maxMessages);
                break;
        }
    }

    /// <summary>
    /// Keeps only the <c>MaxMsgsPer</c> highest-sequence messages of each subject when that limit
    /// is positive.
    /// </summary>
    private static void PrunePerSubject(StreamHandle stream)
    {
        if (stream.Config.MaxMsgsPer <= 0)
            return;

        var maxPerSubject = stream.Config.MaxMsgsPer;
        var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
        foreach (var group in messages.GroupBy(m => m.Subject, StringComparer.Ordinal))
        {
            foreach (var message in group.OrderByDescending(m => m.Sequence).Skip(maxPerSubject))
                stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Removes messages whose timestamp is older than <c>MaxAgeMs</c> relative to
    /// <paramref name="nowUtc"/>, when that limit is positive.
    /// </summary>
    private static void PruneExpiredMessages(StreamHandle stream, DateTime nowUtc)
    {
        if (stream.Config.MaxAgeMs <= 0)
            return;

        var cutoff = nowUtc.AddMilliseconds(-stream.Config.MaxAgeMs);
        var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
        foreach (var message in messages)
        {
            if (message.TimestampUtc < cutoff)
                stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Rebuilds the origin-to-coordinator maps from scratch. A coordinator is registered only when
    /// the referenced origin stream is currently registered.
    /// </summary>
    private void RebuildReplicationCoordinators()
    {
        _mirrorsByOrigin.Clear();
        _sourcesByOrigin.Clear();

        foreach (var stream in _streams.Values)
        {
            if (!string.IsNullOrWhiteSpace(stream.Config.Mirror) && _streams.ContainsKey(stream.Config.Mirror))
            {
                var mirrors = _mirrorsByOrigin.GetOrAdd(stream.Config.Mirror, _ => []);
                mirrors.Add(new MirrorCoordinator(stream.Store));
            }

            if (!string.IsNullOrWhiteSpace(stream.Config.Source) && _streams.ContainsKey(stream.Config.Source))
            {
                var sources = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []);
                sources.Add(new SourceCoordinator(stream.Store, new StreamSourceConfig { Name = stream.Config.Source }));
            }

            foreach (var source in stream.Config.Sources)
            {
                if (string.IsNullOrWhiteSpace(source.Name) || !_streams.ContainsKey(source.Name))
                    continue;

                var sources = _sourcesByOrigin.GetOrAdd(source.Name, _ => []);
                sources.Add(new SourceCoordinator(stream.Store, source));
            }
        }
    }

    /// <summary>
    /// Forwards a message appended to <paramref name="originStream"/> to every mirror and source
    /// coordinator registered for that origin.
    /// </summary>
    private void ReplicateIfConfigured(string originStream, StoredMessage stored)
    {
        if (_mirrorsByOrigin.TryGetValue(originStream, out var mirrors))
        {
            foreach (var mirror in mirrors)
                mirror.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
        }

        if (_sourcesByOrigin.TryGetValue(originStream, out var sources))
        {
            foreach (var source in sources)
                source.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
        }
    }

    /// <summary>
    /// Creates the backing store for <paramref name="config"/>: a <see cref="FileStore"/> under
    /// the temp directory for file storage, otherwise a <see cref="MemStore"/>.
    /// </summary>
    private static IStreamStore CreateStore(StreamConfig config)
    {
        return config.Storage switch
        {
            StorageType.File => new FileStore(new FileStoreOptions
            {
                // config.Name is used verbatim as a directory name; CreateOrUpdate validates it.
                Directory = Path.Combine(Path.GetTempPath(), "natsdotnet-js-store", config.Name),
                MaxAgeMs = config.MaxAgeMs,
            }),
            _ => new MemStore(),
        };
    }
}

/// <summary>Pairs a stream's configuration with the store that holds its messages.</summary>
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store);