// Copyright 2012-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0

using System.Text;
using Shouldly;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;

namespace ZB.MOM.NatsNet.Server.Tests.Internal.DataStructures;

/// <summary>
/// Tests for SubscriptionIndex — mirrors tests from server/sublist_test.go.
/// Cache/no-cache pairs are collapsed into Theory with InlineData.
/// </summary>
public class SubscriptionIndexTests
{
    // Helper factories matching Go's newSub/newQSub/newRemoteQSub.

    /// <summary>
    /// Builds a plain subscription whose subject is the ASCII encoding of
    /// <paramref name="subject"/> and whose client is of kind <c>ClientKind.Client</c>.
    /// </summary>
    private static Subscription NewSub(string subject) =>
        new()
        {
            Subject = Encoding.ASCII.GetBytes(subject),
            Client = new NatsClient { Kind = ClientKind.Client },
        };

    /// <summary>
    /// Builds a queue subscription (no client attached). An empty
    /// <paramref name="queue"/> falls back to <see cref="NewSub"/>.
    /// </summary>
    private static Subscription NewQSub(string subject, string queue) =>
        queue.Length > 0
            ? new()
            {
                Subject = Encoding.ASCII.GetBytes(subject),
                Queue = Encoding.ASCII.GetBytes(queue),
            }
            : NewSub(subject);

    /// <summary>
    /// Builds a queue subscription with weight <paramref name="weight"/> owned by a
    /// router-kind client. An empty <paramref name="queue"/> falls back to <see cref="NewSub"/>.
    /// </summary>
    private static Subscription NewRemoteQSub(string subject, string queue, int weight) =>
        queue.Length > 0
            ? new()
            {
                Subject = Encoding.ASCII.GetBytes(subject),
                Queue = Encoding.ASCII.GetBytes(queue),
                Qw = weight,
                Client = new NatsClient { Kind = ClientKind.Router },
            }
            : NewSub(subject);

    /// <summary>Creates a subscription index with or without the match cache.</summary>
    private static SubscriptionIndex MakeSl(bool cache) =>
        cache ? SubscriptionIndex.NewSublistWithCache() : SubscriptionIndex.NewSublistNoCache();

    /// <summary>Asserts that <paramref name="expected"/> is present in <paramref name="subs"/>.</summary>
    private static void VerifyMember(List<Subscription> subs, Subscription expected)
    {
        subs.ShouldContain(expected, $"Subscription for [{Encoding.ASCII.GetString(expected.Subject)}] not found in results");
    }

    /// <summary>
    /// Asserts that the queue group located via <c>FindQSlot</c> for the expected
    /// subscription's queue exists and contains <paramref name="expected"/>.
    /// </summary>
    private static void VerifyQMember(List<List<Subscription>> qsubs, Subscription expected)
    {
        var idx = SubscriptionIndex.FindQSlot(expected.Queue, qsubs);
        idx.ShouldBeGreaterThanOrEqualTo(0);
        qsubs[idx].ShouldContain(expected);
    }

    // -------------------------------------------------------------------------
    // Init & Count (T:2962-2964)
    // -------------------------------------------------------------------------

    [Fact] // T:2962
    public void Init_StartsWithZeroCount()
    {
        var s = SubscriptionIndex.NewSublistWithCache();
        s.Count().ShouldBe(0u);
    }

    [Theory] // T:2963, T:2964
    [InlineData(true)]
    [InlineData(false)]
    public void InsertCount_ReturnsCorrectCount(bool cache)
    {
        var s = MakeSl(cache);
        s.Insert(NewSub("foo"));
        s.Insert(NewSub("bar"));
        s.Insert(NewSub("foo.bar"));
        s.Count().ShouldBe(3u);
    }

    // -------------------------------------------------------------------------
    // Simple Match (T:2965-2968)
    // -------------------------------------------------------------------------

    [Theory] // T:2965, T:2966
    [InlineData(true)]
    [InlineData(false)]
    public void Simple_SingleTokenMatch(bool cache)
    {
        var s = MakeSl(cache);
        var sub = NewSub("foo");
        s.Insert(sub);
        var r = s.Match("foo");
        r.PSubs.Count.ShouldBe(1);
        VerifyMember(r.PSubs, sub);
    }

    [Theory] // T:2967, T:2968
    [InlineData(true)]
    [InlineData(false)]
    public void Simple_MultiTokenMatch(bool cache)
    {
        var s = MakeSl(cache);
        var sub = NewSub("foo.bar.baz");
        s.Insert(sub);
        var r = s.Match("foo.bar.baz");
        r.PSubs.Count.ShouldBe(1);
        VerifyMember(r.PSubs, sub);
    }

    // -------------------------------------------------------------------------
    // Wildcard Match (T:2969-2974)
    // -------------------------------------------------------------------------

    [Theory] // T:2969, T:2970
    [InlineData(true)]
    [InlineData(false)]
    public void PartialWildcard_MatchesCorrectly(bool cache)
    {
        var s = MakeSl(cache);
        var lsub = NewSub("a.b.c");
        var psub = NewSub("a.*.c");
        s.Insert(lsub);
        s.Insert(psub);
        var r = s.Match("a.b.c");
        r.PSubs.Count.ShouldBe(2);
        VerifyMember(r.PSubs, lsub);
        VerifyMember(r.PSubs, psub);
    }

    [Theory] // T:2971, T:2972
    [InlineData(true)]
    [InlineData(false)]
    public void PartialWildcardAtEnd_MatchesCorrectly(bool cache)
    {
        var s = MakeSl(cache);
        var lsub = NewSub("a.b.c");
        var psub = NewSub("a.b.*");
        s.Insert(lsub);
        s.Insert(psub);
        var r = s.Match("a.b.c");
        r.PSubs.Count.ShouldBe(2);
        VerifyMember(r.PSubs, lsub);
        VerifyMember(r.PSubs, psub);
    }

    [Theory] // T:2973, T:2974
    [InlineData(true)]
    [InlineData(false)]
    public void FullWildcard_MatchesAll(bool cache)
    {
        var s = MakeSl(cache);
        var lsub = NewSub("a.b.c");
        var fsub = NewSub("a.>");
        s.Insert(lsub);
        s.Insert(fsub);
        var r = s.Match("a.b.c");
        r.PSubs.Count.ShouldBe(2);
        VerifyMember(r.PSubs, lsub);
        VerifyMember(r.PSubs, fsub);

        r = s.Match("a.>");
        r.PSubs.Count.ShouldBe(1);
        VerifyMember(r.PSubs, fsub);
    }

    // -------------------------------------------------------------------------
    // Remove (T:2975-2984)
    // -------------------------------------------------------------------------

    [Theory] // T:2975, T:2976
    [InlineData(true)]
    [InlineData(false)]
    public void Remove_BasicRemoval(bool cache)
    {
        var s = MakeSl(cache);
        var sub = NewSub("a.b.c.d");
        s.Insert(sub);
        s.Count().ShouldBe(1u);
        s.Match("a.b.c.d").PSubs.Count.ShouldBe(1);

        s.Remove(NewSub("a.b.c")); // wrong sub, no-op
        s.Count().ShouldBe(1u);

        s.Remove(sub);
        s.Count().ShouldBe(0u);
        s.Match("a.b.c.d").PSubs.Count.ShouldBe(0);
    }

    [Theory] // T:2977, T:2978
    [InlineData(true)]
    [InlineData(false)]
    public void RemoveWildcard_CorrectCountsAndMatch(bool cache)
    {
        var s = MakeSl(cache);
        var sub = NewSub("a.b.c.d");
        var psub = NewSub("a.b.*.d");
        var fsub = NewSub("a.b.>");
        s.Insert(sub);
        s.Insert(psub);
        s.Insert(fsub);
        s.Count().ShouldBe(3u);
        s.Match("a.b.c.d").PSubs.Count.ShouldBe(3);

        s.Remove(sub);
        s.Count().ShouldBe(2u);
        s.Remove(fsub);
        s.Count().ShouldBe(1u);
        s.Remove(psub);
        s.Count().ShouldBe(0u);
        s.Match("a.b.c.d").PSubs.Count.ShouldBe(0);
    }

    [Theory] // T:2979, T:2980
    [InlineData(true)]
    [InlineData(false)]
    public void RemoveCleanup_LevelsReclaimed(bool cache)
    {
        var s = MakeSl(cache);
        var sub = NewSub("a.b.c.d.e.f");
        s.NumLevels().ShouldBe(0);
        s.Insert(sub);
        s.NumLevels().ShouldBe(6);
        s.Remove(sub);
        s.NumLevels().ShouldBe(0);
    }

    [Theory] // T:2981, T:2982
    [InlineData(true)]
    [InlineData(false)]
    public void RemoveCleanupWildcards_LevelsReclaimed(bool cache)
    {
        var s = MakeSl(cache);
        var sub = NewSub("a.b.*.d.e.>");
        s.NumLevels().ShouldBe(0);
        s.Insert(sub);
        s.NumLevels().ShouldBe(6);
        s.Remove(sub);
        s.NumLevels().ShouldBe(0);
    }

    [Fact]
    public void PruneNode_NullNode_DoesNotMutateLiteralNodes()
    {
        var level = new SubscriptionIndex.Level();
        var literal = new SubscriptionIndex.Node();
        level.Nodes["foo"] = literal;
        level.Fwc = new SubscriptionIndex.Node();
        level.Pwc = new SubscriptionIndex.Node();

        level.PruneNode(null!, "foo");

        level.Nodes.Count.ShouldBe(1);
        level.Nodes["foo"].ShouldBeSameAs(literal);
    }

    [Fact]
    public void IsEmpty_WithAndWithoutChildren_TracksNodeEmptiness()
    {
        var node = new SubscriptionIndex.Node();
        node.IsEmpty().ShouldBeTrue();

        node.Next = new SubscriptionIndex.Level();
        node.Next.Nodes["bar"] = new SubscriptionIndex.Node();
        node.IsEmpty().ShouldBeFalse();

        node.Next.Nodes.Clear();
        node.IsEmpty().ShouldBeTrue();
    }

    [Fact]
    public void NumNodes_WithLiteralAndWildcardEntries_CountsAllBranches()
    {
        var level = new SubscriptionIndex.Level();
        level.Nodes["foo"] = new SubscriptionIndex.Node();
        level.Pwc = new SubscriptionIndex.Node();
        level.Fwc = new SubscriptionIndex.Node();
        level.NumNodes().ShouldBe(3);

        level.PruneNode(level.Pwc, SubscriptionIndex.Pwcs);
        level.PruneNode(level.Fwc, SubscriptionIndex.Fwcs);
        level.PruneNode(level.Nodes["foo"], "foo");
        level.NumNodes().ShouldBe(0);
    }

    [Theory] // T:2983, T:2984
    [InlineData(true)]
    [InlineData(false)]
    public void RemoveWithLargeSubs_CorrectAfterRemoval(bool cache)
    {
        var s = MakeSl(cache);
        for (int i = 0; i < SubscriptionIndex.PlistMin * 2; i++)
            s.Insert(NewSub("foo"));

        var r = s.Match("foo");
        r.PSubs.Count.ShouldBe(SubscriptionIndex.PlistMin * 2);

        // Remove from the middle, the front, and the back of the matched list.
        s.Remove(r.PSubs[SubscriptionIndex.PlistMin]);
        s.Remove(r.PSubs[0]);
        s.Remove(r.PSubs[^1]);

        r = s.Match("foo");
        r.PSubs.Count.ShouldBe(SubscriptionIndex.PlistMin * 2 - 3);
    }

    // -------------------------------------------------------------------------
    // Invalid Subjects (T:2985, T:2986)
    // -------------------------------------------------------------------------

    [Theory] // T:2985, T:2986
    [InlineData(true)]
    [InlineData(false)]
    public void InvalidSubjects_InsertReturnsError(bool cache)
    {
        var s = MakeSl(cache);
        s.Insert(NewSub(".foo")).ShouldBe(SubscriptionIndex.ErrInvalidSubject);
        s.Insert(NewSub("foo.")).ShouldBe(SubscriptionIndex.ErrInvalidSubject);
        s.Insert(NewSub("foo..bar")).ShouldBe(SubscriptionIndex.ErrInvalidSubject);
        s.Insert(NewSub("foo.bar..baz")).ShouldBe(SubscriptionIndex.ErrInvalidSubject);
        s.Insert(NewSub("foo.>.bar")).ShouldBe(SubscriptionIndex.ErrInvalidSubject);
    }

    // -------------------------------------------------------------------------
    // RemoveBatch (T:2987, T:2988)
    // -------------------------------------------------------------------------

    [Fact] // T:2987
    public void NoCacheRemoveBatch_DoesNotEnableCache()
    {
        var s = SubscriptionIndex.NewSublistNoCache();
        s.Insert(NewSub("foo"));
        var sub = NewSub("bar");
        s.Insert(sub);
        s.RemoveBatch(new[] { sub });

        for (int i = 0; i < 10; i++)
            s.Match("foo");

        s.CacheEnabled().ShouldBeFalse();
    }

    [Fact] // T:2988
    public void RemoveBatchWithError_StillRemovesPresent()
    {
        var s = SubscriptionIndex.NewSublistNoCache();
        var sub1 = NewSub("foo");
        var sub2 = NewSub("bar");
        var sub3 = NewSub("baz");
        s.Insert(sub1);
        s.Insert(sub2);
        s.Insert(sub3);

        var notPresent = NewSub("not.inserted");
        var err = s.RemoveBatch(new[] { notPresent, sub1, sub3 });
        err.ShouldBe(SubscriptionIndex.ErrNotFound);

        s.Count().ShouldBe(1u);
        s.Match("bar").PSubs.Count.ShouldBe(1);
        VerifyMember(s.Match("bar").PSubs, sub2);
        s.Match("foo").PSubs.Count.ShouldBe(0);
        s.Match("baz").PSubs.Count.ShouldBe(0);
    }

    // -------------------------------------------------------------------------
    // Cache behavior (T:2989)
    // -------------------------------------------------------------------------

    [Fact] // T:2989
    public void Cache_BasicBehavior()
    {
        var s = SubscriptionIndex.NewSublistWithCache();
        var sub = NewSub("a.b.c.d");
        var psub = NewSub("a.b.*.d");
        var fsub = NewSub("a.b.>");

        s.Insert(sub);
        s.Match("a.b.c.d").PSubs.Count.ShouldBe(1);

        s.Insert(psub);
        s.Insert(fsub);
        s.Count().ShouldBe(3u);
        s.Match("a.b.c.d").PSubs.Count.ShouldBe(3);

        s.Remove(sub);
        s.Count().ShouldBe(2u);
        s.Remove(fsub);
        s.Count().ShouldBe(1u);
        s.Remove(psub);
        s.Count().ShouldBe(0u);
        s.CacheCount().ShouldBe(0);
        s.Match("a.b.c.d").PSubs.Count.ShouldBe(0);

        // Fill cache beyond max.
        for (int i = 0; i < 2 * SubscriptionIndex.SlCacheMax; i++)
            s.Match($"foo-{i}");

        // Cache sweep runs async; mirror Go test's retry window.
        var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(2);
        while (DateTime.UtcNow < deadline && s.CacheCount() > SubscriptionIndex.SlCacheMax)
            Thread.Sleep(10);
        s.CacheCount().ShouldBeLessThanOrEqualTo(SubscriptionIndex.SlCacheMax);

        // Test wildcard cache update.
        s = SubscriptionIndex.NewSublistWithCache();
        s.Insert(NewSub("foo.*"));
        s.Insert(NewSub("foo.bar"));
        s.Match("foo.baz").PSubs.Count.ShouldBe(1);
        s.Match("foo.bar").PSubs.Count.ShouldBe(2);
        s.Insert(NewSub("foo.>"));
        s.Match("foo.bar").PSubs.Count.ShouldBe(3);
    }

    // -------------------------------------------------------------------------
    // Queue Results (T:2990, T:2991)
    // -------------------------------------------------------------------------

    [Theory] // T:2990, T:2991
    [InlineData(true)]
    [InlineData(false)]
    public void BasicQueueResults_CorrectGrouping(bool cache)
    {
        var s = MakeSl(cache);
        var sub = NewSub("foo");
        var sub1 = NewQSub("foo", "bar");
        var sub2 = NewQSub("foo", "baz");

        s.Insert(sub1);
        var r = s.Match("foo");
        r.PSubs.Count.ShouldBe(0);
        r.QSubs.Count.ShouldBe(1);
        r.QSubs[0].Count.ShouldBe(1);
        VerifyQMember(r.QSubs, sub1);

        s.Insert(sub2);
        r = s.Match("foo");
        r.PSubs.Count.ShouldBe(0);
        r.QSubs.Count.ShouldBe(2);
        VerifyQMember(r.QSubs, sub1);
        VerifyQMember(r.QSubs, sub2);

        s.Insert(sub);
        r = s.Match("foo");
        r.PSubs.Count.ShouldBe(1);
        r.QSubs.Count.ShouldBe(2);

        var sub3 = NewQSub("foo", "bar");
        var sub4 = NewQSub("foo", "baz");
        s.Insert(sub3);
        s.Insert(sub4);
        r = s.Match("foo");
        r.PSubs.Count.ShouldBe(1);
        r.QSubs.Count.ShouldBe(2);

        // Verify each group has 2 members.
        var barIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("bar"), r.QSubs);
        var bazIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("baz"), r.QSubs);
        r.QSubs[barIdx].Count.ShouldBe(2);
        r.QSubs[bazIdx].Count.ShouldBe(2);

        // Removal.
        s.Remove(sub);
        r = s.Match("foo");
        r.PSubs.Count.ShouldBe(0);
        r.QSubs.Count.ShouldBe(2);

        s.Remove(sub1);
        r = s.Match("foo");
        barIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("bar"), r.QSubs);
        bazIdx = SubscriptionIndex.FindQSlot(Encoding.ASCII.GetBytes("baz"), r.QSubs);
        r.QSubs[barIdx].Count.ShouldBe(1);
        r.QSubs[bazIdx].Count.ShouldBe(2);

        s.Remove(sub3); // last bar
        r = s.Match("foo");
        r.QSubs.Count.ShouldBe(1); // only baz remains

        s.Remove(sub2);
        s.Remove(sub4);
        r = s.Match("foo");
        r.PSubs.Count.ShouldBe(0);
        r.QSubs.Count.ShouldBe(0);
    }

    // -------------------------------------------------------------------------
    // Subject Validation (T:2992-2997)
    // -------------------------------------------------------------------------

    [Fact] // T:2992
    public void ValidLiteralSubjects_CorrectResults()
    {
        SubscriptionIndex.IsValidLiteralSubject("foo").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject(".foo").ShouldBeFalse();
        SubscriptionIndex.IsValidLiteralSubject("foo.").ShouldBeFalse();
        SubscriptionIndex.IsValidLiteralSubject("foo..bar").ShouldBeFalse();
        SubscriptionIndex.IsValidLiteralSubject("foo.bar.*").ShouldBeFalse();
        SubscriptionIndex.IsValidLiteralSubject("foo.bar.>").ShouldBeFalse();
        SubscriptionIndex.IsValidLiteralSubject("*").ShouldBeFalse();
        SubscriptionIndex.IsValidLiteralSubject(">").ShouldBeFalse();

        // Wildcard chars that aren't standalone tokens.
        SubscriptionIndex.IsValidLiteralSubject("foo*").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo**").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo.**").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo*bar").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo.*bar").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo*.bar").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("*bar").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo>").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo>>").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo.>>").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo>bar").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo.>bar").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject("foo>.bar").ShouldBeTrue();
        SubscriptionIndex.IsValidLiteralSubject(">bar").ShouldBeTrue();
    }

    [Fact] // T:2993
    public void ValidSubjects_CorrectResults()
    {
        SubscriptionIndex.IsValidSubject(".").ShouldBeFalse();
        SubscriptionIndex.IsValidSubject(".foo").ShouldBeFalse();
        SubscriptionIndex.IsValidSubject("foo.").ShouldBeFalse();
        SubscriptionIndex.IsValidSubject("foo..bar").ShouldBeFalse();
        SubscriptionIndex.IsValidSubject(">.bar").ShouldBeFalse();
        SubscriptionIndex.IsValidSubject("foo.>.bar").ShouldBeFalse();

        SubscriptionIndex.IsValidSubject("foo").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo.bar.*").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo.bar.>").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("*").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject(">").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo*").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo**").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo.**").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo*bar").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo.*bar").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo*.bar").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("*bar").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo>").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo.>>").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo>bar").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo.>bar").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject("foo>.bar").ShouldBeTrue();
        SubscriptionIndex.IsValidSubject(">bar").ShouldBeTrue();
    }

    [Fact] // T:2994
    public void MatchLiterals_CorrectResults()
    {
        SubscriptionIndex.MatchLiteral("foo", "foo").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo", "bar").ShouldBeFalse();
        SubscriptionIndex.MatchLiteral("foo", "*").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo", ">").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo.bar", ">").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo.bar", "foo.>").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo.bar", "bar.>").ShouldBeFalse();
        SubscriptionIndex.MatchLiteral("stats.test.22", "stats.>").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("stats.test.22", "stats.*.*").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo.bar", "foo").ShouldBeFalse();
        SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.test.foos").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.test.foo").ShouldBeFalse();
        SubscriptionIndex.MatchLiteral("stats.test", "stats.test.*").ShouldBeFalse();
        SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.*").ShouldBeFalse();
        SubscriptionIndex.MatchLiteral("stats.test.foos", "stats.*.*.foos").ShouldBeFalse();

        // Wildcards as non-token literals.
        SubscriptionIndex.MatchLiteral("*bar", "*bar").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo*", "foo*").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo*bar", "foo*bar").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo.***.bar", "foo.***.bar").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral(">bar", ">bar").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo>", "foo>").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo>bar", "foo>bar").ShouldBeTrue();
        SubscriptionIndex.MatchLiteral("foo.>>>.bar", "foo.>>>.bar").ShouldBeTrue();
    }

    [Fact] // T:2995
    public void SubjectIsLiteral_CorrectResults()
    {
        SubscriptionIndex.SubjectIsLiteral("foo").ShouldBeTrue();
        SubscriptionIndex.SubjectIsLiteral("foo.bar").ShouldBeTrue();
        SubscriptionIndex.SubjectIsLiteral("foo*.bar").ShouldBeTrue();
        SubscriptionIndex.SubjectIsLiteral("*").ShouldBeFalse();
        SubscriptionIndex.SubjectIsLiteral(">").ShouldBeFalse();
        SubscriptionIndex.SubjectIsLiteral("foo.*").ShouldBeFalse();
        SubscriptionIndex.SubjectIsLiteral("foo.>").ShouldBeFalse();
        SubscriptionIndex.SubjectIsLiteral("foo.*.>").ShouldBeFalse();
        SubscriptionIndex.SubjectIsLiteral("foo.*.bar").ShouldBeFalse();
        SubscriptionIndex.SubjectIsLiteral("foo.bar.>").ShouldBeFalse();
    }

    [Fact] // T:2997
    public void TokenAt_ReturnsCorrectTokens()
    {
        // Indices 1..4 return the tokens in order; 0 and 5 are expected to yield empty.
        SubscriptionIndex.TokenAt("foo.bar.baz.*", 0).ShouldBe(string.Empty);
        SubscriptionIndex.TokenAt("foo.bar.baz.*", 1).ShouldBe("foo");
        SubscriptionIndex.TokenAt("foo.bar.baz.*", 2).ShouldBe("bar");
        SubscriptionIndex.TokenAt("foo.bar.baz.*", 3).ShouldBe("baz");
        SubscriptionIndex.TokenAt("foo.bar.baz.*", 4).ShouldBe("*");
        SubscriptionIndex.TokenAt("foo.bar.baz.*", 5).ShouldBe(string.Empty);
    }

    // -------------------------------------------------------------------------
    // Bad Subject on Remove (T:2998, T:2999)
    // -------------------------------------------------------------------------

    [Theory] // T:2998, T:2999
    [InlineData(true)]
    [InlineData(false)]
    public void BadSubjectOnRemove_ReturnsError(bool cache)
    {
        var s = MakeSl(cache);
        s.Insert(NewSub("a.b..d")).ShouldBe(SubscriptionIndex.ErrInvalidSubject);
        s.Remove(NewSub("a.b..d")).ShouldBe(SubscriptionIndex.ErrInvalidSubject);
        s.Remove(NewSub("a.>.b")).ShouldBe(SubscriptionIndex.ErrInvalidSubject);
    }

    // -------------------------------------------------------------------------
    // Two-token pub vs single-token sub (T:3000, T:3001)
    // -------------------------------------------------------------------------

    [Theory] // T:3000, T:3001
    [InlineData(true)]
    [InlineData(false)]
    public void TwoTokenPub_DoesNotMatchSingleTokenSub(bool cache)
    {
        var s = MakeSl(cache);
        var sub = NewSub("foo");
        s.Insert(sub);
        s.Match("foo").PSubs.Count.ShouldBe(1);
        s.Match("foo.bar").PSubs.Count.ShouldBe(0);
    }

    // -------------------------------------------------------------------------
    // Wildcards as literals (T:3002-3005)
    // -------------------------------------------------------------------------

    [Theory] // T:3002, T:3003
    [InlineData(true)]
    [InlineData(false)]
    public void InsertWithWildcardsAsLiterals_OnlyExactMatch(bool cache)
    {
        var s = MakeSl(cache);
        foreach (var subject in new[] { "foo.*-", "foo.>-" })
        {
            var sub = NewSub(subject);
            s.Insert(sub);
            s.Match("foo.bar").PSubs.Count.ShouldBe(0);
            s.Match(subject).PSubs.Count.ShouldBe(1);
        }
    }

    [Theory] // T:3004, T:3005
    [InlineData(true)]
    [InlineData(false)]
    public void RemoveWithWildcardsAsLiterals_CorrectRemoval(bool cache)
    {
        var s = MakeSl(cache);
        foreach (var subject in new[] { "foo.*-", "foo.>-" })
        {
            var sub = NewSub(subject);
            s.Insert(sub);
            s.Remove(NewSub("foo.bar")); // wrong subject, no-op
            s.Count().ShouldBe(1u);
            s.Remove(sub);
            s.Count().ShouldBe(0u);
        }
    }

    // -------------------------------------------------------------------------
    // Race tests (T:3006-3010)
    // -------------------------------------------------------------------------

    [Theory] // T:3006, T:3007
    [InlineData(true)]
    [InlineData(false)]
    public void RaceOnRemove_NoCorruption(bool cache)
    {
        var s = MakeSl(cache);
        var subs = new Subscription[100];
        for (int i = 0; i < subs.Length; i++)
            subs[i] = NewQSub("foo", "bar");

        for (int iter = 0; iter < 2; iter++)
        {
            foreach (var sub in subs)
                s.Insert(sub);

            // On the second pass, issue an extra Match before taking the result.
            if (iter == 1)
                s.Match("foo");

            var r = s.Match("foo");

            // Remove concurrently while this thread walks the previously returned result.
            var task = Task.Run(() =>
            {
                foreach (var sub in subs)
                    s.Remove(sub);
            });

            foreach (var qsub in r.QSubs)
                for (int i = 0; i < qsub.Count; i++)
                    Encoding.ASCII.GetString(qsub[i].Queue!).ShouldBe("bar");

            task.Wait();
        }
    }

    [Theory] // T:3008, T:3009
    [InlineData(true)]
    [InlineData(false)]
    public void RaceOnInsert_NoCorruption(bool cache)
    {
        var s = MakeSl(cache);
        var subs = new Subscription[100];
        for (int i = 0; i < subs.Length; i++)
            subs[i] = NewQSub("foo", "bar");

        // Insert concurrently while this thread repeatedly matches.
        var task = Task.Run(() =>
        {
            foreach (var sub in subs)
                s.Insert(sub);
        });

        for (int i = 0; i < 1000; i++)
        {
            var r = s.Match("foo");
            foreach (var qsub in r.QSubs)
                foreach (var sub in qsub)
                    Encoding.ASCII.GetString(sub.Queue!).ShouldBe("bar");
        }

        task.Wait();
    }

    [Fact] // T:3010
    public void RaceOnMatch_NoCorruption()
    {
        var s = SubscriptionIndex.NewSublistNoCache();
        s.Insert(NewQSub("foo.*", "workers"));
        s.Insert(NewQSub("foo.bar", "workers"));
        s.Insert(NewSub("foo.*"));
        s.Insert(NewSub("foo.bar"));

        // First failure observed by any worker; later ones are dropped by CompareExchange.
        Exception? error = null;
        var tasks = Enumerable.Range(0, 2).Select(_ => Task.Run(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                var r = s.Match("foo.bar");

                foreach (var sub in r.PSubs)
                    // Ordinal comparison: subjects are protocol identifiers, not linguistic text.
                    if (!Encoding.ASCII.GetString(sub.Subject).StartsWith("foo.", StringComparison.Ordinal))
                        Interlocked.CompareExchange(ref error, new Exception($"Wrong subject: {Encoding.ASCII.GetString(sub.Subject)}"), null);

                foreach (var qsub in r.QSubs)
                    foreach (var sub in qsub)
                        if (Encoding.ASCII.GetString(sub.Queue!) != "workers")
                            Interlocked.CompareExchange(ref error, new Exception($"Wrong queue: {Encoding.ASCII.GetString(sub.Queue!)}"), null);
            }
        })).ToArray();

        Task.WaitAll(tasks);
        error.ShouldBeNull();
    }

    // -------------------------------------------------------------------------
    // Remote Queue Subscriptions (T:3011, T:3012)
    // -------------------------------------------------------------------------

    [Theory] // T:3011, T:3012
    [InlineData(true)]
    [InlineData(false)]
    public void RemoteQueueSubscriptions_WeightExpansion(bool cache)
    {
        var s = MakeSl(cache);
        var s1 = NewQSub("foo", "bar");
        var s2 = NewQSub("foo", "bar");
        s.Insert(s1);
        s.Insert(s2);

        var rs1 = NewRemoteQSub("foo", "bar", 10);
        var rs2 = NewRemoteQSub("foo", "bar", 10);
        s.Insert(rs1);
        s.Insert(rs2);
        s.Count().ShouldBe(4u);

        var r = s.Match("foo");
        r.PSubs.Count.ShouldBe(0);
        r.QSubs.Count.ShouldBe(1);
        r.QSubs[0].Count.ShouldBe(22); // 2 normal + 10 + 10 remote

        s.Remove(s1);
        s.Remove(rs1);
        s.Count().ShouldBe(2u);
        r = s.Match("foo");
        r.QSubs[0].Count.ShouldBe(11); // 1 normal + 10 remote

        Interlocked.Exchange(ref rs2.Qw, 1);
        s.UpdateRemoteQSub(rs2);
        r = s.Match("foo");
        r.QSubs[0].Count.ShouldBe(2); // 1 normal + 1 remote
    }

    // -------------------------------------------------------------------------
    // Shared Empty Result (T:3013)
    // -------------------------------------------------------------------------

    [Fact] // T:3013
    public void SharedEmptyResult_SameReference()
    {
        var s = SubscriptionIndex.NewSublistWithCache();
        var r1 = s.Match("foo");
        var r2 = s.Match("bar");
        r1.PSubs.Count.ShouldBe(0);
        r2.QSubs.Count.ShouldBe(0);
        ReferenceEquals(r1, r2).ShouldBeTrue();
    }

    // -------------------------------------------------------------------------
    // No-cache Stats (T:3014)
    // -------------------------------------------------------------------------

    [Fact] // T:3014
    public void NoCacheStats_CacheCountIsZero()
    {
        var s = SubscriptionIndex.NewSublistNoCache();
        s.Insert(NewSub("foo"));
        s.Insert(NewSub("bar"));
        s.Insert(NewSub("baz"));
        s.Insert(NewSub("foo.bar.baz"));
        s.Match("a.b.c");
        s.Match("bar");
        s.Stats().NumCache.ShouldBe(0u);
    }

    // -------------------------------------------------------------------------
    // All (T:3015)
    // -------------------------------------------------------------------------

    [Fact] // T:3015
    public void All_CollectsAllSubscriptions()
    {
        var s = SubscriptionIndex.NewSublistNoCache();
        var subs = new[]
        {
            NewSub("foo.bar.baz"),
            NewSub("foo"),
            NewSub("baz"),
        };
        subs[0].Client!.Kind = ClientKind.Leaf;
        foreach (var sub in subs)
            s.Insert(sub);

        var output = new List<Subscription>();
        s.All(output);
        output.Count.ShouldBe(3);
    }

    // -------------------------------------------------------------------------
    // IsSubsetMatch (T:3016)
    // -------------------------------------------------------------------------

    [Theory] // T:3016
    [InlineData("foo.bar", "foo.bar", true)]
    [InlineData("foo.*", ">", true)]
    [InlineData("foo.*", "*.*", true)]
    [InlineData("foo.*", "foo.*", true)]
    [InlineData("foo.*", "foo.bar", false)]
    [InlineData("foo.>", ">", true)]
    [InlineData("foo.>", "*.>", true)]
    [InlineData("foo.>", "foo.>", true)]
    [InlineData("foo.>", "foo.bar", false)]
    [InlineData("foo..bar", "foo.*", false)]
    [InlineData("foo.*", "foo..bar", false)]
    public void IsSubsetMatch_CorrectResults(string subject, string test, bool expected)
    {
        SubscriptionIndex.SubjectIsSubsetMatch(subject, test).ShouldBe(expected);
    }

    // -------------------------------------------------------------------------
    // ReverseMatch (T:3018, T:3019)
    // -------------------------------------------------------------------------

    [Fact] // T:3018
    public void ReverseMatch_FindsMatchingSubscriptions()
    {
        var s = SubscriptionIndex.NewSublistWithCache();
        var fooSub = NewSub("foo");
        var barSub = NewSub("bar");
        var fooBarSub = NewSub("foo.bar");
        var fooBazSub = NewSub("foo.baz");
        var fooBarBazSub = NewSub("foo.bar.baz");
        s.Insert(fooSub);
        s.Insert(barSub);
        s.Insert(fooBarSub);
        s.Insert(fooBazSub);
        s.Insert(fooBarBazSub);

        var r = s.ReverseMatch("foo");
        r.PSubs.Count.ShouldBe(1);
        VerifyMember(r.PSubs, fooSub);

        r = s.ReverseMatch("bar");
        r.PSubs.Count.ShouldBe(1);
        VerifyMember(r.PSubs, barSub);

        r = s.ReverseMatch("*");
        r.PSubs.Count.ShouldBe(2);
        VerifyMember(r.PSubs, fooSub);
        VerifyMember(r.PSubs, barSub);

        r = s.ReverseMatch("baz");
        r.PSubs.Count.ShouldBe(0);

        r = s.ReverseMatch("foo.*");
        r.PSubs.Count.ShouldBe(2);
        VerifyMember(r.PSubs, fooBarSub);
        VerifyMember(r.PSubs, fooBazSub);

        r = s.ReverseMatch("*.*");
        r.PSubs.Count.ShouldBe(2);

        r = s.ReverseMatch("*.bar");
        r.PSubs.Count.ShouldBe(1);
        VerifyMember(r.PSubs, fooBarSub);

        r = s.ReverseMatch("bar.*");
        r.PSubs.Count.ShouldBe(0);

        r = s.ReverseMatch("foo.>");
        r.PSubs.Count.ShouldBe(3);

        r = s.ReverseMatch(">");
        r.PSubs.Count.ShouldBe(5);
    }

    [Fact] // T:3019
    public void ReverseMatchWider_MatchesWiderFilter()
    {
        var s = SubscriptionIndex.NewSublistWithCache();
        var sub = NewSub("uplink.*.*.>");
        s.Insert(sub);

        var r = s.ReverseMatch("uplink.1.*.*.>");
        r.PSubs.Count.ShouldBe(1);
        VerifyMember(r.PSubs, sub);

        r = s.ReverseMatch("uplink.1.2.3.>");
        r.PSubs.Count.ShouldBe(1);
        VerifyMember(r.PSubs, sub);
    }

    // -------------------------------------------------------------------------
    // Match with empty tokens (T:3020)
    // -------------------------------------------------------------------------

    [Fact] // T:3020
    public void MatchWithEmptyTokens_ReturnsNoResults()
    {
        var sl = SubscriptionIndex.NewSublistWithCache();
        sl.Insert(NewSub(">"));
        sl.Insert(NewQSub(">", "queue"));

        foreach (var subj in new[] { ".foo", "..foo", "foo..", "foo.", "foo..bar", "foo...bar" })
        {
            var r = sl.Match(subj);
            r.PSubs.Count.ShouldBe(0);
            r.QSubs.Count.ShouldBe(0);
        }
    }

    // -------------------------------------------------------------------------
    // SubjectsCollide (T:3021)
    // -------------------------------------------------------------------------

    [Fact] // T:3021
    public void SubjectsCollide_CorrectResults()
    {
        SubscriptionIndex.SubjectsCollide("foo.*", "foo.*.bar.>").ShouldBeFalse();
        SubscriptionIndex.SubjectsCollide("foo.*.bar.>", "foo.*").ShouldBeFalse();
        SubscriptionIndex.SubjectsCollide("foo.*", "foo.foo").ShouldBeTrue();
        SubscriptionIndex.SubjectsCollide("foo.*", "*.foo").ShouldBeTrue();
        SubscriptionIndex.SubjectsCollide("foo.bar.>", "*.bar.foo").ShouldBeTrue();
    }

    // -------------------------------------------------------------------------
    // Cache hit rate (T:3022)
    // -------------------------------------------------------------------------

    [Fact] // T:3022
    public void AddCacheHitRate_CorrectAggregation()
    {
        var sl1 = SubscriptionIndex.NewSublistWithCache();
        sl1.Insert(NewSub("foo"));
        for (int i = 0; i < 4; i++)
            sl1.Match("foo");
        var stats1 = sl1.Stats();
        stats1.CacheHitRate.ShouldBe(0.75);

        var sl2 = SubscriptionIndex.NewSublistWithCache();
        sl2.Insert(NewSub("bar"));
        for (int i = 0; i < 4; i++)
            sl2.Match("bar");
        var stats2 = sl2.Stats();
        stats2.CacheHitRate.ShouldBe(0.75);

        var ts = new SublistStats();
        ts.Add(stats1);
        ts.Add(stats2);
        ts.CacheHitRate.ShouldBe(0.75);
    }

    // -------------------------------------------------------------------------
    // HasInterest (T:3024)
    // -------------------------------------------------------------------------

    [Fact] // T:3024
    public void HasInterest_CorrectForLiteralsAndWildcards()
    {
        var sl = SubscriptionIndex.NewSublistWithCache();
        var fooSub = NewSub("foo");
        sl.Insert(fooSub);
        sl.HasInterest("foo").ShouldBeTrue();
        sl.HasInterest("bar").ShouldBeFalse();
        sl.Remove(fooSub);
        sl.HasInterest("foo").ShouldBeFalse();

        // Partial wildcard.
        var sub = NewSub("foo.*");
        sl.Insert(sub);
        sl.HasInterest("foo").ShouldBeFalse();
        sl.HasInterest("foo.bar").ShouldBeTrue();
        sl.HasInterest("foo.bar.baz").ShouldBeFalse();
        sl.Remove(sub);

        // Full wildcard.
        sub = NewSub("foo.>");
        sl.Insert(sub);
        sl.HasInterest("foo").ShouldBeFalse();
        sl.HasInterest("foo.bar").ShouldBeTrue();
        sl.HasInterest("foo.bar.baz").ShouldBeTrue();
        sl.Remove(sub);

        // Queue subs.
        var qsub = NewQSub("foo", "bar");
        sl.Insert(qsub);
        sl.HasInterest("foo").ShouldBeTrue();
        sl.HasInterest("foo.bar").ShouldBeFalse();
        sl.Remove(qsub);
        sl.HasInterest("foo").ShouldBeFalse();
    }

    // -------------------------------------------------------------------------
    // HasInterest Overlapping (T:3025)
    // -------------------------------------------------------------------------

    [Fact] // T:3025
    public void HasInterestOverlapping_BothMatch()
    {
        var sl = SubscriptionIndex.NewSublistWithCache();
        sl.Insert(NewSub("stream.A.child"));
        sl.Insert(NewSub("stream.*"));
        sl.HasInterest("stream.A.child").ShouldBeTrue();
        sl.HasInterest("stream.A").ShouldBeTrue();
    }

    // -------------------------------------------------------------------------
    // NumInterest (T:3026)
    // -------------------------------------------------------------------------

    [Fact] // T:3026
    public void NumInterest_CorrectCounts()
    {
        var sl = SubscriptionIndex.NewSublistWithCache();
        sl.Insert(NewSub("foo"));

        var (np, nq) = sl.NumInterest("foo");
        np.ShouldBe(1);
        nq.ShouldBe(0);

        (np, nq) = sl.NumInterest("bar");
        np.ShouldBe(0);
        nq.ShouldBe(0);

        // Add queue sub.
        sl.Insert(NewQSub("foo", "q1"));
        (np, nq) = sl.NumInterest("foo");
        np.ShouldBe(1);
        nq.ShouldBe(1);
    }

    // -------------------------------------------------------------------------
    // Notifications (T:3017) — simplified from the large Go test
    // -------------------------------------------------------------------------

    [Fact] // T:3017
    public void RegisterInterestNotification_BasicFlow()
    {
        var s = SubscriptionIndex.NewSublistWithCache();
        var notifications = new List<bool>();
        Action<bool> notify = b => notifications.Add(b);

        // Wildcards should be rejected.
        s.RegisterNotification("foo.*", notify).ShouldBe(SubscriptionIndex.ErrInvalidSubject);
        s.RegisterNotification(">", notify).ShouldBe(SubscriptionIndex.ErrInvalidSubject);

        // Register for "foo" — no existing interest, should get false.
        s.RegisterNotification("foo", notify).ShouldBeNull();
        notifications.Count.ShouldBe(1);
        notifications[0].ShouldBeFalse();
        notifications.Clear();

        // Insert interest — should get true.
        var sub = NewSub("foo");
        s.Insert(sub);
        notifications.Count.ShouldBe(1);
        notifications[0].ShouldBeTrue();
        notifications.Clear();

        // Insert a second — no notification (already have interest).
        var sub2 = NewSub("foo");
        s.Insert(sub2);
        notifications.Count.ShouldBe(0);

        // Remove all interest — should get false.
        s.Remove(sub);
        s.Remove(sub2);
        notifications.Count.ShouldBe(1);
        notifications[0].ShouldBeFalse();
        notifications.Clear();

        // Clear should return true.
        s.ClearNotification("foo", notify).ShouldBeTrue();

        // Clear again should return false (already cleared).
        s.ClearNotification("foo", notify).ShouldBeFalse();
    }

    [Fact] // T:3017 (queue notification portion)
    public void RegisterQueueNotification_BasicFlow()
    {
        var s = SubscriptionIndex.NewSublistWithCache();
        var notifications = new List<bool>();
        Action<bool> notify = b => notifications.Add(b);

        s.RegisterQueueNotification("foo.bar", "q1", notify).ShouldBeNull();
        notifications.Count.ShouldBe(1);
        notifications[0].ShouldBeFalse();
        notifications.Clear();

        var qsub = NewQSub("foo.bar", "q1");
        s.Insert(qsub);
        notifications.Count.ShouldBe(1);
        notifications[0].ShouldBeTrue();
        notifications.Clear();

        s.Remove(qsub);
        notifications.Count.ShouldBe(1);
        notifications[0].ShouldBeFalse();
        notifications.Clear();

        s.ClearQueueNotification("foo.bar", "q1", notify).ShouldBeTrue();
    }
}