using System.Collections.Concurrent;
using NATS.Server.Auth;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Storage;
using NATS.Server.Subscriptions;

namespace NATS.Server.JetStream;

/// <summary>
/// Tracks streams by name, together with one replica group per stream and the
/// mirror/source coordinators that receive messages appended to an origin stream.
/// </summary>
/// <remarks>
/// The public API is synchronous in several places and blocks on the underlying
/// asynchronous store, replica-group and meta-group calls.
/// </remarks>
public sealed class StreamManager
{
    private readonly Account? _account;
    private readonly JetStreamMetaGroup? _metaGroup;

    // All maps are keyed by stream name using ordinal comparison.
    private readonly ConcurrentDictionary<string, StreamHandle> _streams = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, StreamReplicaGroup> _replicaGroups = new(StringComparer.Ordinal);

    // Keyed by the ORIGIN stream name; the values are the coordinators of the streams
    // that mirror / source from that origin.
    private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin = new(StringComparer.Ordinal);

    /// <summary>Creates a manager.</summary>
    /// <param name="metaGroup">Optional meta group that stream creations are proposed to.</param>
    /// <param name="account">Optional account asked to reserve a slot before a new stream is created.</param>
    public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null)
    {
        _metaGroup = metaGroup;
        _account = account;
    }

    /// <summary>A snapshot (array copy) of the names of all streams currently registered.</summary>
    public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();

    /// <summary>
    /// Creates the stream described by <paramref name="config"/>, or replaces the configuration
    /// of an existing stream with the same name while keeping its store.
    /// </summary>
    /// <param name="config">Requested configuration; a normalized copy is what gets stored.</param>
    /// <returns>
    /// An error response with code 400 when the name is blank, an error response with code 10027
    /// when the account refuses to reserve another stream, otherwise a stream-info response.
    /// </returns>
    public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
    {
        if (string.IsNullOrWhiteSpace(config.Name))
        {
            return JetStreamApiResponse.ErrorResponse(400, "stream name required");
        }

        var normalized = NormalizeConfig(config);

        // NOTE(review): this existence check and the AddOrUpdate below are not atomic; two
        // concurrent creates of the same name could both reserve a slot — confirm callers serialize.
        var isCreate = !_streams.ContainsKey(normalized.Name);
        if (isCreate && _account is not null && !_account.TryReserveStream())
        {
            return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");
        }

        // New streams start with an in-memory store; updates keep the existing store.
        var handle = _streams.AddOrUpdate(
            normalized.Name,
            _ => new StreamHandle(normalized, new MemStore()),
            (_, existing) => existing with { Config = normalized });

        // Keep the current replica group unless the requested replica count (minimum 1)
        // no longer matches its node count.
        _replicaGroups.AddOrUpdate(
            normalized.Name,
            _ => new StreamReplicaGroup(normalized.Name, normalized.Replicas),
            (_, existing) => existing.Nodes.Count == Math.Max(normalized.Replicas, 1)
                ? existing
                : new StreamReplicaGroup(normalized.Name, normalized.Replicas));

        RebuildReplicationCoordinators();

        // Blocks until the meta group (when present) has processed the proposal.
        _metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult();

        return BuildStreamInfoResponse(handle);
    }

    /// <summary>Returns stream info for <paramref name="name"/>, or a not-found response.</summary>
    /// <param name="name">Stream name to look up.</param>
    public JetStreamApiResponse GetInfo(string name)
    {
        if (_streams.TryGetValue(name, out var stream))
        {
            return BuildStreamInfoResponse(stream);
        }

        return JetStreamApiResponse.NotFound($"$JS.API.STREAM.INFO.{name}");
    }

    /// <summary>Looks up a stream handle by name.</summary>
    /// <param name="name">Stream name to look up.</param>
    /// <param name="handle">The handle when found; not meaningful when the method returns false.</param>
    /// <returns>True when a stream with that name is registered.</returns>
    public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);

    /// <summary>
    /// Returns the store state of the named stream, or a default-constructed
    /// <see cref="StreamState"/> when no such stream exists.
    /// </summary>
    /// <param name="name">Stream name to look up.</param>
    /// <param name="ct">Cancellation token forwarded to the store.</param>
    public ValueTask<StreamState> GetStateAsync(string name, CancellationToken ct)
    {
        if (_streams.TryGetValue(name, out var stream))
        {
            return stream.Store.GetStateAsync(ct);
        }

        return ValueTask.FromResult(new StreamState());
    }

    /// <summary>
    /// Returns the first enumerated stream that has a configured subject pattern matching
    /// <paramref name="subject"/>, or null when none matches.
    /// </summary>
    /// <param name="subject">Subject to test against each stream's configured subjects.</param>
    public StreamHandle? FindBySubject(string subject)
    {
        foreach (var stream in _streams.Values)
        {
            if (stream.Config.Subjects.Any(pattern => SubjectMatch.MatchLiteral(subject, pattern)))
            {
                return stream;
            }
        }

        return null;
    }

    /// <summary>
    /// Stores a published message in the stream whose subjects match <paramref name="subject"/>,
    /// applies the stream's message limit and forwards the stored message to any mirror/source
    /// coordinators registered for that stream.
    /// </summary>
    /// <param name="subject">Subject the message was published on.</param>
    /// <param name="payload">Message body.</param>
    /// <returns>An ack carrying the stream name and assigned sequence, or null when no stream matches.</returns>
    public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
    {
        var stream = FindBySubject(subject);
        if (stream is null)
        {
            return null;
        }

        // Propose to the replica group first; the proposal's result is intentionally discarded.
        if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
        {
            _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
        }

        var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
        EnforceLimits(stream);

        // Re-load after trimming: replication only happens if the load still returns the message.
        var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
        if (stored is not null)
        {
            ReplicateIfConfigured(stream.Config.Name, stored);
        }

        return new PubAck
        {
            Stream = stream.Config.Name,
            Seq = seq,
        };
    }

    /// <summary>
    /// Asks the named stream's replica group to step down; completes immediately when the
    /// stream has no replica group.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="ct">Cancellation token forwarded to the replica group.</param>
    public Task StepDownStreamLeaderAsync(string stream, CancellationToken ct)
    {
        if (_replicaGroups.TryGetValue(stream, out var replicaGroup))
        {
            return replicaGroup.StepDownAsync(ct);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Builds a separate config instance carrying Name, Subjects (copied into a new collection),
    /// MaxMsgs, Replicas, Mirror and Source from <paramref name="config"/>.
    /// </summary>
    private static StreamConfig NormalizeConfig(StreamConfig config)
    {
        return new StreamConfig
        {
            Name = config.Name,
            // Spreading an empty collection already yields an empty one, so no special case is needed.
            Subjects = [.. config.Subjects],
            MaxMsgs = config.MaxMsgs,
            Replicas = config.Replicas,
            Mirror = config.Mirror,
            Source = config.Source,
        };
    }

    /// <summary>Builds a stream-info response from the handle's config and its current store state.</summary>
    private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
    {
        var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
        return new JetStreamApiResponse
        {
            StreamInfo = new JetStreamStreamInfo
            {
                Config = handle.Config,
                State = state,
            },
        };
    }

    /// <summary>
    /// Trims the stream's store to <c>MaxMsgs</c> messages. Does nothing when <c>MaxMsgs</c> is
    /// zero or negative, or when the store is neither a <see cref="MemStore"/> nor a <see cref="FileStore"/>.
    /// </summary>
    private static void EnforceLimits(StreamHandle stream)
    {
        if (stream.Config.MaxMsgs <= 0)
        {
            return;
        }

        var maxMessages = (ulong)stream.Config.MaxMsgs;
        if (stream.Store is MemStore memStore)
        {
            memStore.TrimToMaxMessages(maxMessages);
        }
        else if (stream.Store is FileStore fileStore)
        {
            fileStore.TrimToMaxMessages(maxMessages);
        }
    }

    /// <summary>
    /// Discards all mirror/source coordinators and recreates them from the current stream
    /// configurations. A coordinator is only created when the referenced origin stream exists.
    /// </summary>
    private void RebuildReplicationCoordinators()
    {
        // NOTE(review): the per-origin lists are plain List instances and the maps are cleared
        // before being refilled; a concurrent Capture may observe a partial state — confirm
        // whether callers serialize CreateOrUpdate against Capture.
        _mirrorsByOrigin.Clear();
        _sourcesByOrigin.Clear();

        foreach (var stream in _streams.Values)
        {
            var mirrorOrigin = stream.Config.Mirror;
            if (!string.IsNullOrWhiteSpace(mirrorOrigin) && _streams.ContainsKey(mirrorOrigin))
            {
                _mirrorsByOrigin
                    .GetOrAdd(mirrorOrigin, _ => [])
                    .Add(new MirrorCoordinator(stream.Store));
            }

            var sourceOrigin = stream.Config.Source;
            if (!string.IsNullOrWhiteSpace(sourceOrigin) && _streams.ContainsKey(sourceOrigin))
            {
                _sourcesByOrigin
                    .GetOrAdd(sourceOrigin, _ => [])
                    .Add(new SourceCoordinator(stream.Store));
            }
        }
    }

    /// <summary>
    /// Hands <paramref name="stored"/> to every mirror coordinator, then every source coordinator,
    /// registered for <paramref name="originStream"/>, blocking on each call in turn.
    /// </summary>
    private void ReplicateIfConfigured(string originStream, StoredMessage stored)
    {
        if (_mirrorsByOrigin.TryGetValue(originStream, out var mirrors))
        {
            foreach (var mirror in mirrors)
            {
                mirror.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
            }
        }

        if (_sourcesByOrigin.TryGetValue(originStream, out var sources))
        {
            foreach (var source in sources)
            {
                source.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult();
            }
        }
    }
}

/// <summary>Pairs a stream's configuration with the store its messages are appended to.</summary>
/// <param name="Config">The stream's (normalized) configuration.</param>
/// <param name="Store">The message store backing the stream.</param>
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store);