batch37 task5 implement group D consumer registry and interest state

This commit is contained in:
Joseph Doherty
2026-02-28 23:47:48 -05:00
parent 6290b17a82
commit a805af1bea
3 changed files with 264 additions and 1 deletions

View File

@@ -1,5 +1,6 @@
using System.Threading.Channels;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
namespace ZB.MOM.NatsNet.Server;
@@ -118,4 +119,195 @@ internal sealed partial class NatsStream
foreach (var consumer in snapshot)
consumer.Stop();
}
internal StoredMsg? GetMsg(ulong seq)
{
StoredMsg? result = null;
if (Store == null)
return result;
var loaded = Store.LoadMsg(seq, new StoreMsg());
if (loaded == null)
return result;
result = new StoredMsg
{
Subject = loaded.Subject,
Sequence = loaded.Seq,
Header = loaded.Hdr,
Data = loaded.Msg,
Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime,
};
return result;
}
internal List<NatsConsumer> GetConsumers()
{
lock (_consumersSync)
{
return [.. _consumerList];
}
}
internal int NumPublicConsumers()
{
lock (_consumersSync)
{
return _consumerList.Count(c => !c.Config.Direct);
}
}
internal List<NatsConsumer> GetPublicConsumers()
{
lock (_consumersSync)
{
return [.. _consumerList.Where(c => !c.Config.Direct)];
}
}
internal List<NatsConsumer> GetDirectConsumers()
{
lock (_consumersSync)
{
return [.. _consumerList.Where(c => c.Config.Direct)];
}
}
internal void CheckInterestState()
{
if (!IsInterestRetention() || Store == null)
return;
var consumers = GetConsumers();
if (consumers.Count == 0)
return;
ulong floor = ulong.MaxValue;
foreach (var consumer in consumers)
{
var ack = Interlocked.Read(ref consumer.AckFloor);
if (ack > 0 && (ulong)ack < floor)
floor = (ulong)ack;
}
if (floor != ulong.MaxValue)
Store.Compact(floor);
}
internal bool IsInterestRetention() => Config.Retention != RetentionPolicy.LimitsPolicy;
internal int NumConsumers()
{
lock (_consumersSync)
{
return _consumerList.Count;
}
}
internal void SetConsumer(NatsConsumer consumer)
{
ArgumentNullException.ThrowIfNull(consumer);
lock (_consumersSync)
{
_consumers[consumer.Name] = consumer;
if (_consumerList.All(c => !ReferenceEquals(c, consumer)))
_consumerList.Add(consumer);
}
}
internal void RemoveConsumer(NatsConsumer consumer)
{
ArgumentNullException.ThrowIfNull(consumer);
lock (_consumersSync)
{
_consumers.Remove(consumer.Name);
_consumerList.RemoveAll(c => ReferenceEquals(c, consumer));
}
}
internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters)
{
_ = consumer;
_ = newFilters;
}
internal NatsConsumer? LookupConsumer(string name)
{
if (string.IsNullOrWhiteSpace(name))
return _consumers.GetValueOrDefault(string.Empty);
lock (_consumersSync)
{
return _consumers.GetValueOrDefault(name);
}
}
internal int NumDirectConsumers()
{
lock (_consumersSync)
{
return _consumerList.Count(c => c.Config.Direct);
}
}
internal StreamState StateWithDetail(bool details)
{
_ = details;
return State();
}
internal bool PartitionUnique(string name, string[] partitions)
{
lock (_consumersSync)
{
foreach (var partition in partitions)
{
foreach (var existing in _consumerList)
{
if (existing.Name == name)
continue;
var filters = existing.Config.FilterSubjects ??
(string.IsNullOrWhiteSpace(existing.Config.FilterSubject) ? [] : [existing.Config.FilterSubject!]);
foreach (var filter in filters)
{
if (SubscriptionIndex.SubjectsCollide(partition, filter))
return false;
}
}
}
}
return true;
}
internal bool PotentialFilteredConsumers()
{
var subjects = Config.Subjects ?? [];
if (subjects.Length == 0)
return false;
lock (_consumersSync)
{
if (_consumerList.Count == 0)
return false;
}
if (subjects.Length > 1)
return true;
return SubscriptionIndex.SubjectHasWildcard(subjects[0]);
}
internal bool NoInterest(ulong seq, NatsConsumer? observingConsumer)
{
_ = seq;
lock (_consumersSync)
{
return _consumerList.All(c => ReferenceEquals(c, observingConsumer));
}
}
}