diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs index 2c8942f..13761e9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -1,5 +1,6 @@ using System.Threading.Channels; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -118,4 +119,195 @@ internal sealed partial class NatsStream foreach (var consumer in snapshot) consumer.Stop(); } + + internal StoredMsg? GetMsg(ulong seq) + { + StoredMsg? result = null; + if (Store == null) + return result; + + var loaded = Store.LoadMsg(seq, new StoreMsg()); + if (loaded == null) + return result; + + result = new StoredMsg + { + Subject = loaded.Subject, + Sequence = loaded.Seq, + Header = loaded.Hdr, + Data = loaded.Msg, + Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime, + }; + return result; + } + + internal List GetConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList]; + } + } + + internal int NumPublicConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count(c => !c.Config.Direct); + } + } + + internal List GetPublicConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList.Where(c => !c.Config.Direct)]; + } + } + + internal List GetDirectConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList.Where(c => c.Config.Direct)]; + } + } + + internal void CheckInterestState() + { + if (!IsInterestRetention() || Store == null) + return; + + var consumers = GetConsumers(); + if (consumers.Count == 0) + return; + + ulong floor = ulong.MaxValue; + foreach (var consumer in consumers) + { + var ack = Interlocked.Read(ref consumer.AckFloor); + if (ack > 0 && (ulong)ack < floor) + floor = (ulong)ack; + } + + if (floor != ulong.MaxValue) + Store.Compact(floor); + } + + internal bool IsInterestRetention() => Config.Retention != RetentionPolicy.LimitsPolicy; + + internal int NumConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count; + } + } + + internal void SetConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers[consumer.Name] = consumer; + if (_consumerList.All(c => !ReferenceEquals(c, consumer))) + _consumerList.Add(consumer); + } + } + + internal void RemoveConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers.Remove(consumer.Name); + _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); + } + } + + internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters) + { + _ = consumer; + _ = newFilters; + } + + internal NatsConsumer? LookupConsumer(string name) + { + if (string.IsNullOrWhiteSpace(name)) + return _consumers.GetValueOrDefault(string.Empty); + + lock (_consumersSync) + { + return _consumers.GetValueOrDefault(name); + } + } + + internal int NumDirectConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count(c => c.Config.Direct); + } + } + + internal StreamState StateWithDetail(bool details) + { + _ = details; + return State(); + } + + internal bool PartitionUnique(string name, string[] partitions) + { + lock (_consumersSync) + { + foreach (var partition in partitions) + { + foreach (var existing in _consumerList) + { + if (existing.Name == name) + continue; + + var filters = existing.Config.FilterSubjects ?? + (string.IsNullOrWhiteSpace(existing.Config.FilterSubject) ? [] : [existing.Config.FilterSubject!]); + + foreach (var filter in filters) + { + if (SubscriptionIndex.SubjectsCollide(partition, filter)) + return false; + } + } + } + } + + return true; + } + + internal bool PotentialFilteredConsumers() + { + var subjects = Config.Subjects ?? []; + if (subjects.Length == 0) + return false; + + lock (_consumersSync) + { + if (_consumerList.Count == 0) + return false; + } + + if (subjects.Length > 1) + return true; + + return SubscriptionIndex.SubjectHasWildcard(subjects[0]); + } + + internal bool NoInterest(ulong seq, NatsConsumer? observingConsumer) + { + _ = seq; + lock (_consumersSync) + { + return _consumerList.All(c => ReferenceEquals(c, observingConsumer)); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs index 6b0f4e0..1090410 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs @@ -58,13 +58,84 @@ public sealed class NatsStreamConsumersTests Should.NotThrow(stream.ResetAndWaitOnConsumers); } - private static NatsStream CreateStream() + [Fact] + public void SetConsumer_LookupAndCounts_ReturnExpectedValues() + { + var stream = CreateStream(); + var standard = new NatsConsumer("S", new ConsumerConfig { Name = "c1", Direct = false }, DateTime.UtcNow); + var direct = new NatsConsumer("S", new ConsumerConfig { Name = "c2", Direct = true }, DateTime.UtcNow); + + stream.SetConsumer(standard); + stream.SetConsumer(direct); + + stream.NumConsumers().ShouldBe(2); + stream.NumPublicConsumers().ShouldBe(1); + stream.NumDirectConsumers().ShouldBe(1); + stream.LookupConsumer("c2").ShouldBe(direct); + } + + [Fact] + public void GetMsg_WhenStored_ReturnsStoredMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("events", null, [1, 2], ttl: 0); + + var message = stream.GetMsg(1); + + message.ShouldNotBeNull(); + message!.Subject.ShouldBe("events"); + message.Sequence.ShouldBe(1UL); + } + + [Fact] + public void PartitionUnique_WithCollidingFilters_ReturnsFalse() + { + var stream = CreateStream(); + var existing = new NatsConsumer("S", new ConsumerConfig { Name = "existing", FilterSubject = "orders.*" }, DateTime.UtcNow); + stream.SetConsumer(existing); + + var unique = stream.PartitionUnique("new", ["orders.created"]); + + unique.ShouldBeFalse(); + } + + [Fact] + public void PotentialFilteredConsumers_WithWildcardSubjectAndConsumer_ReturnsTrue() + { + var stream = CreateStream(["orders.>"]); + stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow)); + + stream.PotentialFilteredConsumers().ShouldBeTrue(); + } + + [Fact] + public void NoInterest_WithOnlyObservingConsumer_ReturnsTrue() + { + var stream = CreateStream(); + var observer = new NatsConsumer("S", new ConsumerConfig { Name = "observer" }, DateTime.UtcNow); + stream.SetConsumer(observer); + + stream.NoInterest(1, observer).ShouldBeTrue(); + } + + [Fact] + public void IsInterestRetention_WhenPolicyIsInterest_ReturnsTrue() + { + var stream = CreateStream(retention: RetentionPolicy.InterestPolicy); + + stream.IsInterestRetention().ShouldBeTrue(); + } + + private static NatsStream CreateStream(string[]? subjects = null, RetentionPolicy retention = RetentionPolicy.LimitsPolicy) { var account = new Account { Name = "A" }; var config = new StreamConfig { Name = "S", Storage = StorageType.MemoryStorage, + Subjects = subjects ?? ["events.>"], + Retention = retention, }; return new NatsStream(account, config, DateTime.UtcNow); } diff --git a/porting.db b/porting.db index c81d811..66904ce 100644 Binary files a/porting.db and b/porting.db differ