using System.Threading.Channels; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; internal sealed partial class NatsStream { private readonly object _consumersSync = new(); private readonly Dictionary _consumers = new(StringComparer.Ordinal); private List _consumerList = []; private readonly IpQueue _sigQueue = new("js-signal"); private readonly Channel _signalWake = Channel.CreateBounded(1); private readonly Channel _internalWake = Channel.CreateBounded(1); private readonly JsOutQ _outq = new(); internal static CMsg NewCMsg(string subject, ulong seq) { var msg = CMsg.Rent(); msg.Subject = subject; msg.Seq = seq; return msg; } internal void SignalConsumersLoop(CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) { if (!_signalWake.Reader.TryRead(out _)) { Thread.Sleep(1); continue; } var messages = _sigQueue.Pop(); if (messages == null) continue; foreach (var msg in messages) { SignalConsumers(msg.Subject, msg.Seq); msg.ReturnToPool(); } } } internal void SignalConsumers(string subject, ulong seq) { _ = subject; _ = seq; lock (_consumersSync) { _ = _consumerList.Count; } } internal static JsPubMsg NewJSPubMsg(string destinationSubject, string subject, string? reply, byte[]? hdr, byte[]? msg, NatsConsumer? consumer, ulong seq) { var pub = GetJSPubMsgFromPool(); pub.Subject = destinationSubject; pub.Reply = reply; pub.Hdr = hdr; pub.Msg = msg; pub.Pa = new StoreMsg { Subject = subject, Seq = seq, Hdr = hdr ?? [], Msg = msg ?? [], Buf = [], }; pub.Sync = consumer; return pub; } internal static JsPubMsg GetJSPubMsgFromPool() => JsPubMsg.Rent(); internal void SetupSendCapabilities() { _ = _outq; _signalWake.Writer.TryWrite(true); } internal string AccName() => Account.Name; internal string NameLocked() => Name; internal void InternalLoop(CancellationToken cancellationToken = default) { while (!cancellationToken.IsCancellationRequested) { if (!_internalWake.Reader.TryRead(out _)) { Thread.Sleep(1); continue; } var messages = _sigQueue.Pop(); if (messages == null) continue; foreach (var msg in messages) { SignalConsumers(msg.Subject, msg.Seq); msg.ReturnToPool(); } } } internal void ResetAndWaitOnConsumers() { List snapshot; lock (_consumersSync) { snapshot = [.. _consumerList]; } foreach (var consumer in snapshot) consumer.Stop(); } internal StoredMsg? GetMsg(ulong seq) { StoredMsg? result = null; if (Store == null) return result; var loaded = Store.LoadMsg(seq, new StoreMsg()); if (loaded == null) return result; result = new StoredMsg { Subject = loaded.Subject, Sequence = loaded.Seq, Header = loaded.Hdr, Data = loaded.Msg, Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime, }; return result; } internal List GetConsumers() { lock (_consumersSync) { return [.. _consumerList]; } } internal int NumPublicConsumers() { lock (_consumersSync) { return _consumerList.Count(c => !c.Config.Direct); } } internal List GetPublicConsumers() { lock (_consumersSync) { return [.. _consumerList.Where(c => !c.Config.Direct)]; } } internal List GetDirectConsumers() { lock (_consumersSync) { return [.. _consumerList.Where(c => c.Config.Direct)]; } } internal void CheckInterestState() { if (!IsInterestRetention() || Store == null) return; var consumers = GetConsumers(); if (consumers.Count == 0) return; ulong floor = ulong.MaxValue; foreach (var consumer in consumers) { var ack = Interlocked.Read(ref consumer.AckFloor); if (ack > 0 && (ulong)ack < floor) floor = (ulong)ack; } if (floor != ulong.MaxValue) Store.Compact(floor); } internal bool IsInterestRetention() => Config.Retention != RetentionPolicy.LimitsPolicy; internal int NumConsumers() { lock (_consumersSync) { return _consumerList.Count; } } internal void SetConsumer(NatsConsumer consumer) { ArgumentNullException.ThrowIfNull(consumer); lock (_consumersSync) { _consumers[consumer.Name] = consumer; if (_consumerList.All(c => !ReferenceEquals(c, consumer))) _consumerList.Add(consumer); } } internal void RemoveConsumer(NatsConsumer consumer) { ArgumentNullException.ThrowIfNull(consumer); lock (_consumersSync) { _consumers.Remove(consumer.Name); _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); } } internal Exception? DeleteConsumer(NatsConsumer consumer) { ArgumentNullException.ThrowIfNull(consumer); lock (_consumersSync) { _consumers.Remove(consumer.Name); _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); } consumer.DeleteWithoutAdvisory(); return null; } internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters) { _ = consumer; _ = newFilters; } internal NatsConsumer? LookupConsumer(string name) { if (string.IsNullOrWhiteSpace(name)) return _consumers.GetValueOrDefault(string.Empty); lock (_consumersSync) { return _consumers.GetValueOrDefault(name); } } internal int NumDirectConsumers() { lock (_consumersSync) { return _consumerList.Count(c => c.Config.Direct); } } internal StreamState StateWithDetail(bool details) { _ = details; return State(); } internal bool PartitionUnique(string name, string[] partitions) { lock (_consumersSync) { foreach (var partition in partitions) { foreach (var existing in _consumerList) { if (existing.Name == name) continue; var filters = existing.Config.FilterSubjects ?? (string.IsNullOrWhiteSpace(existing.Config.FilterSubject) ? [] : [existing.Config.FilterSubject!]); foreach (var filter in filters) { if (SubscriptionIndex.SubjectsCollide(partition, filter)) return false; } } } } return true; } internal bool PotentialFilteredConsumers() { var subjects = Config.Subjects ?? []; if (subjects.Length == 0) return false; lock (_consumersSync) { if (_consumerList.Count == 0) return false; } if (subjects.Length > 1) return true; return SubscriptionIndex.SubjectHasWildcard(subjects[0]); } internal bool NoInterest(ulong seq, NatsConsumer? observingConsumer) { _ = seq; lock (_consumersSync) { return _consumerList.All(c => ReferenceEquals(c, observingConsumer)); } } internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAction( ConsumerConfig config, string oname, ConsumerAction action, bool pedantic = false) => AddConsumerWithAssignment(config, oname, null, isRecovering: false, action, pedantic); internal (NatsConsumer? Consumer, Exception? Error) AddConsumer( ConsumerConfig config, string oname, bool pedantic = false) => AddConsumerWithAssignment(config, oname, null, isRecovering: false, ConsumerAction.CreateOrUpdate, pedantic); internal (NatsConsumer? Consumer, Exception? Error) AddConsumerWithAssignment( ConsumerConfig config, string oname, ConsumerAssignment? assignment, bool isRecovering, ConsumerAction action, bool pedantic = false) { ArgumentNullException.ThrowIfNull(config); _mu.EnterWriteLock(); try { if (_closed) return (null, new InvalidOperationException("stream closed")); var name = !string.IsNullOrWhiteSpace(oname) ? oname : (!string.IsNullOrWhiteSpace(config.Name) ? config.Name! : (config.Durable ?? string.Empty)); if (string.IsNullOrWhiteSpace(name)) return (null, new InvalidOperationException("consumer name required")); config.Name = name; config.Durable ??= name; var defaultsErr = NatsConsumer.SetConsumerConfigDefaults(config, Config, null, pedantic); if (defaultsErr is not null) return (null, new InvalidOperationException(defaultsErr.Description ?? "consumer defaults invalid")); var cfgErr = NatsConsumer.CheckConsumerCfg(config, Config, null, isRecovering); if (cfgErr is not null) return (null, new InvalidOperationException(cfgErr.Description ?? "consumer config invalid")); if (_consumers.TryGetValue(name, out var existing)) { if (action == ConsumerAction.Create) return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerAlreadyExistsError().Description ?? "consumer exists")); existing.UpdateConfig(config); if (assignment is not null) existing.SetConsumerAssignment(assignment); return (existing, null); } if (action == ConsumerAction.Update) return (null, new InvalidOperationException(JsApiErrors.NewJSConsumerDoesNotExistError().Description ?? "consumer does not exist")); var consumer = NatsConsumer.Create(this, config, action, assignment); if (consumer is null) return (null, new InvalidOperationException("consumer create failed")); consumer.SetConsumerAssignment(assignment); consumer.UpdateInactiveThreshold(config); consumer.UpdatePauseState(config); _consumers[name] = consumer; return (consumer, null); } finally { _mu.ExitWriteLock(); } } }