diff --git a/src/NATS.Server/JetStream/Consumers/FilterSkipTracker.cs b/src/NATS.Server/JetStream/Consumers/FilterSkipTracker.cs new file mode 100644 index 0000000..154a8c6 --- /dev/null +++ b/src/NATS.Server/JetStream/Consumers/FilterSkipTracker.cs @@ -0,0 +1,115 @@ +// Go: consumer.go isFilteredMatch, skipMsgs tracking (~line 4200) +namespace NATS.Server.JetStream.Consumers; + +using NATS.Server.Subscriptions; + +/// +/// Tracks filter matching for consumer message delivery. +/// Uses SubjectMatch.MatchLiteral() for NATS token-based filter matching (not Regex). +/// Maintains a sorted set of skipped sequences for gap tracking. +/// Go reference: consumer.go isFilteredMatch, skipMsgs tracking. +/// +public sealed class FilterSkipTracker +{ + private readonly string? _filterSubject; + private readonly IReadOnlyList _filterSubjects; + private readonly SortedSet _skippedSequences = new(); + private long _matchCount; + private long _skipCount; + + public FilterSkipTracker(string? filterSubject = null, IReadOnlyList? filterSubjects = null) + { + _filterSubject = filterSubject; + _filterSubjects = filterSubjects ?? Array.Empty(); + } + + /// Number of messages that matched the filter. + public long MatchCount => Interlocked.Read(ref _matchCount); + + /// Number of messages that were skipped (didn't match filter). + public long SkipCount => Interlocked.Read(ref _skipCount); + + /// Number of currently tracked skipped sequences. + public int SkippedSequenceCount => _skippedSequences.Count; + + /// + /// Returns true if the filter is active (has at least one filter subject). + /// + public bool HasFilter => !string.IsNullOrEmpty(_filterSubject) || _filterSubjects.Count > 0; + + /// + /// Checks if a message subject matches the filter. + /// Returns true (should deliver) when: + /// - No filter is configured + /// - Subject matches the single filter + /// - Subject matches any of the multiple filters + /// Uses SubjectMatch.MatchLiteral for NATS token-based matching. + /// Go reference: consumer.go isFilteredMatch. + /// + public bool ShouldDeliver(string subject) + { + if (!HasFilter) + { + Interlocked.Increment(ref _matchCount); + return true; + } + + if (!string.IsNullOrEmpty(_filterSubject)) + { + if (SubjectMatch.MatchLiteral(subject, _filterSubject)) + { + Interlocked.Increment(ref _matchCount); + return true; + } + } + + foreach (var filter in _filterSubjects) + { + if (SubjectMatch.MatchLiteral(subject, filter)) + { + Interlocked.Increment(ref _matchCount); + return true; + } + } + + Interlocked.Increment(ref _skipCount); + return false; + } + + /// + /// Records a skipped sequence for gap tracking. + /// + public void RecordSkip(ulong sequence) + { + _skippedSequences.Add(sequence); + } + + /// + /// Returns the next unskipped sequence >= startSeq. + /// Used to find the next deliverable message efficiently. + /// + public ulong NextUnskippedSequence(ulong startSeq) + { + var seq = startSeq; + while (_skippedSequences.Contains(seq)) + seq++; + return seq; + } + + /// + /// Clears skipped sequences below the given floor (e.g., ack floor). + /// Prevents unbounded growth. + /// + public void PurgeBelow(ulong floor) + { + _skippedSequences.RemoveWhere(s => s < floor); + } + + /// Resets all state. + public void Reset() + { + _skippedSequences.Clear(); + Interlocked.Exchange(ref _matchCount, 0); + Interlocked.Exchange(ref _skipCount, 0); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/FilterSkipTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/FilterSkipTests.cs new file mode 100644 index 0000000..bab2377 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/FilterSkipTests.cs @@ -0,0 +1,245 @@ +// Go: consumer.go isFilteredMatch, skipMsgs tracking +// FilterSkipTracker tests — verifies NATS token-based filter matching +// and skip sequence gap tracking. +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class FilterSkipTests +{ + // ------------------------------------------------------------------------- + // Test 1 — No filter always matches every subject + // + // Go reference: consumer.go isFilteredMatch — when no filter subjects are + // configured all messages are delivered. + // ------------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_no_filter_always_matches() + { + var tracker = new FilterSkipTracker(); + + tracker.ShouldDeliver("orders.us").ShouldBeTrue(); + tracker.ShouldDeliver("events.payment").ShouldBeTrue(); + tracker.ShouldDeliver("anything").ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 2 — Single exact filter matches only the matching subject + // + // Go reference: consumer.go isFilteredMatch — literal subject match. + // ------------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_single_filter_exact_match() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.us"); + + tracker.ShouldDeliver("orders.us").ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 3 — Single filter does not match a different subject + // + // Go reference: consumer.go isFilteredMatch — non-matching subjects are + // skipped. + // ------------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_single_filter_no_match() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.us"); + + tracker.ShouldDeliver("events.x").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 4 — Star wildcard matches a single token + // + // Go reference: consumer.go isFilteredMatch — SubjectMatch.MatchLiteral + // treats '*' as a single-token wildcard, so "orders.*" matches "orders.us" + // but not "orders.us.east" (two remaining tokens). + // ------------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_wildcard_star() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.*"); + + tracker.ShouldDeliver("orders.us").ShouldBeTrue(); + tracker.ShouldDeliver("orders.us.east").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 5 — Greater-than wildcard matches remaining tokens + // + // Go reference: consumer.go isFilteredMatch — '>' matches one or more + // remaining tokens. + // ------------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_wildcard_gt() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.>"); + + tracker.ShouldDeliver("orders.us.east").ShouldBeTrue(); + tracker.ShouldDeliver("orders.eu").ShouldBeTrue(); + tracker.ShouldDeliver("events.x").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 6 — Multiple filter subjects: matches any of them + // + // Go reference: consumer.go isFilteredMatch — when FilterSubjects is + // populated, a message matches if any entry matches. + // ------------------------------------------------------------------------- + [Fact] + public void ShouldDeliver_multiple_filters() + { + var tracker = new FilterSkipTracker(filterSubjects: ["orders.>", "events.>"]); + + tracker.ShouldDeliver("orders.us").ShouldBeTrue(); + tracker.ShouldDeliver("events.payment").ShouldBeTrue(); + tracker.ShouldDeliver("metrics.cpu").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 7 — MatchCount increments when message is delivered + // + // Go reference: consumer.go — consumer tracks matched message counts. + // ------------------------------------------------------------------------- + [Fact] + public void MatchCount_increments_on_match() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.us"); + + tracker.ShouldDeliver("orders.us"); + tracker.ShouldDeliver("orders.us"); + + tracker.MatchCount.ShouldBe(2L); + tracker.SkipCount.ShouldBe(0L); + } + + // ------------------------------------------------------------------------- + // Test 8 — SkipCount increments when message does not match + // + // Go reference: consumer.go skipMsgs — non-matching messages are counted. + // ------------------------------------------------------------------------- + [Fact] + public void SkipCount_increments_on_skip() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.us"); + + tracker.ShouldDeliver("events.x"); + tracker.ShouldDeliver("events.y"); + + tracker.SkipCount.ShouldBe(2L); + tracker.MatchCount.ShouldBe(0L); + } + + // ------------------------------------------------------------------------- + // Test 9 — RecordSkip stores a sequence number in the skipped set + // + // Go reference: consumer.go skipMsgs — stores gap sequences for later + // resolution during delivery. + // ------------------------------------------------------------------------- + [Fact] + public void RecordSkip_tracks_sequence() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.us"); + + tracker.RecordSkip(5UL); + tracker.RecordSkip(7UL); + + tracker.SkippedSequenceCount.ShouldBe(2); + } + + // ------------------------------------------------------------------------- + // Test 10 — NextUnskippedSequence skips over all recorded sequences + // + // Go reference: consumer.go — finding the next deliverable sequence after + // gaps caused by filter skips. + // ------------------------------------------------------------------------- + [Fact] + public void NextUnskippedSequence_skips_recorded() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.us"); + + tracker.RecordSkip(2UL); + tracker.RecordSkip(3UL); + + // seq 1 is not skipped + tracker.NextUnskippedSequence(1UL).ShouldBe(1UL); + // seq 2 and 3 are skipped → next unskipped is 4 + tracker.NextUnskippedSequence(2UL).ShouldBe(4UL); + // seq 4 is not skipped + tracker.NextUnskippedSequence(4UL).ShouldBe(4UL); + } + + // ------------------------------------------------------------------------- + // Test 11 — PurgeBelow removes entries below the floor sequence + // + // Go reference: consumer.go — ack floor advancement purges old skip entries + // to prevent unbounded growth. + // ------------------------------------------------------------------------- + [Fact] + public void PurgeBelow_removes_old_entries() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.us"); + + tracker.RecordSkip(1UL); + tracker.RecordSkip(3UL); + tracker.RecordSkip(5UL); + tracker.RecordSkip(7UL); + + tracker.PurgeBelow(5UL); + + // sequences 1 and 3 should be gone (< 5); 5 and 7 remain (>= 5) + tracker.SkippedSequenceCount.ShouldBe(2); + tracker.NextUnskippedSequence(5UL).ShouldBe(6UL); // 5 still skipped + tracker.NextUnskippedSequence(1UL).ShouldBe(1UL); // 1 was purged + } + + // ------------------------------------------------------------------------- + // Test 12 — HasFilter is false when no filter is configured + // + // Go reference: consumer.go — no filter means deliver all messages. + // ------------------------------------------------------------------------- + [Fact] + public void HasFilter_false_when_empty() + { + var tracker = new FilterSkipTracker(); + + tracker.HasFilter.ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 13 — HasFilter is true when a single filter is configured + // + // Go reference: consumer.go — FilterSubject set means selective delivery. + // ------------------------------------------------------------------------- + [Fact] + public void HasFilter_true_with_single_filter() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.us"); + + tracker.HasFilter.ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 14 — Reset clears all counters and skipped sequences + // + // Go reference: consumer.go — consumer state reset on reconfiguration. + // ------------------------------------------------------------------------- + [Fact] + public void Reset_clears_all_state() + { + var tracker = new FilterSkipTracker(filterSubject: "orders.us"); + + tracker.ShouldDeliver("orders.us"); + tracker.ShouldDeliver("events.x"); + tracker.RecordSkip(10UL); + tracker.RecordSkip(11UL); + + tracker.Reset(); + + tracker.MatchCount.ShouldBe(0L); + tracker.SkipCount.ShouldBe(0L); + tracker.SkippedSequenceCount.ShouldBe(0); + } +}