From 27faf64548c1debb68378815f1d2ab76dbd09262 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 15:00:23 -0500 Subject: [PATCH] feat(gateways): implement GatewayInterestTracker for interest-only mode state machine (D1) Ports the gateway interest-only mode from Go (gateway.go:100-150, 1500-1600): - Add GatewayInterestTracker with Optimistic/Transitioning/InterestOnly modes - In Optimistic mode, track no-interest set; switch to InterestOnly when set exceeds threshold (default 1000, matching Go defaultGatewayMaxRUnsubThreshold) - In InterestOnly mode, only forward subjects with tracked RS+ interest; use SubjectMatch.MatchLiteral for wildcard pattern support - Integrate tracker into GatewayConnection: A+/A- messages update tracker, SendMessageAsync skips send when ShouldForward returns false - Expose InterestTracker property on GatewayConnection for observability - Add 13 unit tests covering all 8 specified behaviors plus edge cases --- src/NATS.Server/Gateways/GatewayConnection.cs | 25 +- .../Gateways/GatewayInterestTracker.cs | 190 ++++++++++++++ .../Gateways/GatewayInterestTrackerTests.cs | 241 ++++++++++++++++++ 3 files changed, 452 insertions(+), 4 deletions(-) create mode 100644 src/NATS.Server/Gateways/GatewayInterestTracker.cs create mode 100644 tests/NATS.Server.Tests/Gateways/GatewayInterestTrackerTests.cs diff --git a/src/NATS.Server/Gateways/GatewayConnection.cs b/src/NATS.Server/Gateways/GatewayConnection.cs index 001a724..123cf21 100644 --- a/src/NATS.Server/Gateways/GatewayConnection.cs +++ b/src/NATS.Server/Gateways/GatewayConnection.cs @@ -9,6 +9,7 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable private readonly NetworkStream _stream = new(socket, ownsSocket: true); private readonly SemaphoreSlim _writeGate = new(1, 1); private readonly CancellationTokenSource _closedCts = new(); + private readonly GatewayInterestTracker _interestTracker = new(); private Task? _loopTask; public string? RemoteId { get; private set; } @@ -16,6 +17,12 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable public Func? RemoteSubscriptionReceived { get; set; } public Func? MessageReceived { get; set; } + /// + /// Per-connection interest mode tracker. + /// Go: gateway.go:100-150 — each outbound gateway connection maintains its own interest state. + /// + public GatewayInterestTracker InterestTracker => _interestTracker; + public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct) { await WriteLineAsync($"GATEWAY {serverId}", ct); @@ -50,6 +57,10 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable public async Task SendMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) { + // Go: gateway.go:2900 (shouldForwardMsg) — check interest tracker before sending + if (!_interestTracker.ShouldForward(account, subject)) + return; + var reply = string.IsNullOrEmpty(replyTo) ? "-" : replyTo; await _writeGate.WaitAsync(ct); try @@ -94,9 +105,12 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("A+ ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) + if (TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); + // Go: gateway.go:1540 — track positive interest on A+ + _interestTracker.TrackInterest(parsedAccount, parsedSubject); + if (RemoteSubscriptionReceived != null) + await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } @@ -104,9 +118,12 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable if (line.StartsWith("A- ", StringComparison.Ordinal)) { var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries); - if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) + if (TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue)) { - await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); + // Go: gateway.go:1560 — track no-interest on A-, may trigger mode switch + _interestTracker.TrackNoInterest(parsedAccount, parsedSubject); + if (RemoteSubscriptionReceived != null) + await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount)); } continue; } diff --git a/src/NATS.Server/Gateways/GatewayInterestTracker.cs b/src/NATS.Server/Gateways/GatewayInterestTracker.cs new file mode 100644 index 0000000..cf77e64 --- /dev/null +++ b/src/NATS.Server/Gateways/GatewayInterestTracker.cs @@ -0,0 +1,190 @@ +// Go: gateway.go:100-150 (InterestMode enum) +// Go: gateway.go:1500-1600 (switchToInterestOnlyMode) +using System.Collections.Concurrent; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Gateways; + +/// +/// Tracks the interest mode for each account on a gateway connection. +/// In Optimistic mode, all messages are forwarded unless a subject is in the +/// no-interest set. Once the no-interest set exceeds the threshold (1000), +/// the account switches to InterestOnly mode where only subjects with tracked +/// RS+ interest are forwarded. +/// +public enum GatewayInterestMode +{ + /// Forward everything (initial state). Track subjects with no interest. + Optimistic, + + /// Mode transition in progress. + Transitioning, + + /// Only forward subjects with known remote interest (RS+ received). + InterestOnly, +} + +/// +/// Per-account interest state machine for a gateway connection. +/// Go reference: gateway.go:100-150 (struct srvGateway, interestMode fields), +/// gateway.go:1500-1600 (switchToInterestOnlyMode, processGatewayAccountUnsub). +/// +public sealed class GatewayInterestTracker +{ + /// + /// Number of no-interest subjects before switching to InterestOnly mode. + /// Go: gateway.go:134 (defaultGatewayMaxRUnsubThreshold = 1000) + /// + public const int DefaultNoInterestThreshold = 1000; + + private readonly int _noInterestThreshold; + + // Per-account state: mode + no-interest set (Optimistic) or positive interest set (InterestOnly) + private readonly ConcurrentDictionary _accounts = new(StringComparer.Ordinal); + + public GatewayInterestTracker(int noInterestThreshold = DefaultNoInterestThreshold) + { + _noInterestThreshold = noInterestThreshold; + } + + /// + /// Returns the current interest mode for the given account. + /// Accounts default to Optimistic until the no-interest threshold is exceeded. + /// + public GatewayInterestMode GetMode(string account) + => _accounts.TryGetValue(account, out var state) ? state.Mode : GatewayInterestMode.Optimistic; + + /// + /// Track a positive interest (RS+ received from remote) for an account/subject. + /// Go: gateway.go:1540 (processGatewayAccountSub — adds to interest set) + /// + public void TrackInterest(string account, string subject) + { + var state = GetOrCreateState(account); + lock (state) + { + // In Optimistic mode, remove from no-interest set if present + if (state.Mode == GatewayInterestMode.Optimistic) + { + state.NoInterestSet.Remove(subject); + return; + } + + // In InterestOnly mode, add to the positive interest set + if (state.Mode == GatewayInterestMode.InterestOnly) + { + state.InterestSet.Add(subject); + } + } + } + + /// + /// Track a no-interest event (RS- received from remote) for an account/subject. + /// When the no-interest set crosses the threshold, switches to InterestOnly mode. + /// Go: gateway.go:1560 (processGatewayAccountUnsub — tracks no-interest, triggers switch) + /// + public void TrackNoInterest(string account, string subject) + { + var state = GetOrCreateState(account); + lock (state) + { + if (state.Mode == GatewayInterestMode.InterestOnly) + { + // In InterestOnly mode, remove from positive interest set + state.InterestSet.Remove(subject); + return; + } + + if (state.Mode == GatewayInterestMode.Optimistic) + { + state.NoInterestSet.Add(subject); + + if (state.NoInterestSet.Count >= _noInterestThreshold) + DoSwitchToInterestOnly(state); + } + } + } + + /// + /// Determines whether a message should be forwarded to the remote gateway + /// for the given account and subject. + /// Go: gateway.go:2900 (shouldForwardMsg — checks mode and interest) + /// + public bool ShouldForward(string account, string subject) + { + if (!_accounts.TryGetValue(account, out var state)) + return true; // Optimistic by default — no state yet means forward + + lock (state) + { + return state.Mode switch + { + GatewayInterestMode.Optimistic => + // Forward unless subject is in no-interest set + !state.NoInterestSet.Contains(subject), + + GatewayInterestMode.Transitioning => + // During transition, be conservative and forward + true, + + GatewayInterestMode.InterestOnly => + // Only forward if at least one interest pattern matches + MatchesAnyInterest(state, subject), + + _ => true, + }; + } + } + + /// + /// Explicitly switch an account to InterestOnly mode. + /// Called when the remote signals it is in interest-only mode. + /// Go: gateway.go:1500 (switchToInterestOnlyMode) + /// + public void SwitchToInterestOnly(string account) + { + var state = GetOrCreateState(account); + lock (state) + { + if (state.Mode != GatewayInterestMode.InterestOnly) + DoSwitchToInterestOnly(state); + } + } + + // ── Private helpers ──────────────────────────────────────────────── + + private AccountState GetOrCreateState(string account) + => _accounts.GetOrAdd(account, _ => new AccountState()); + + private static void DoSwitchToInterestOnly(AccountState state) + { + // Go: gateway.go:1510-1530 — clear no-interest, build positive interest from what remains + state.Mode = GatewayInterestMode.InterestOnly; + state.NoInterestSet.Clear(); + // InterestSet starts empty; subsequent RS+ events will populate it + } + + private static bool MatchesAnyInterest(AccountState state, string subject) + { + foreach (var pattern in state.InterestSet) + { + // Use SubjectMatch.MatchLiteral to support wildcard patterns in the interest set + if (SubjectMatch.MatchLiteral(subject, pattern)) + return true; + } + + return false; + } + + /// Per-account mutable state. All access must be under the instance lock. + private sealed class AccountState + { + public GatewayInterestMode Mode { get; set; } = GatewayInterestMode.Optimistic; + + /// Subjects with no remote interest (used in Optimistic mode). + public HashSet NoInterestSet { get; } = new(StringComparer.Ordinal); + + /// Subjects/patterns with positive remote interest (used in InterestOnly mode). + public HashSet InterestSet { get; } = new(StringComparer.Ordinal); + } +} diff --git a/tests/NATS.Server.Tests/Gateways/GatewayInterestTrackerTests.cs b/tests/NATS.Server.Tests/Gateways/GatewayInterestTrackerTests.cs new file mode 100644 index 0000000..06d8af6 --- /dev/null +++ b/tests/NATS.Server.Tests/Gateways/GatewayInterestTrackerTests.cs @@ -0,0 +1,241 @@ +// Go: gateway.go:100-150 (InterestMode enum), gateway.go:1500-1600 (switchToInterestOnlyMode) +using NATS.Server.Gateways; + +namespace NATS.Server.Tests.Gateways; + +/// +/// Unit tests for GatewayInterestTracker — the per-connection interest mode state machine. +/// Covers Optimistic/InterestOnly modes, threshold-based switching, and per-account isolation. +/// Go reference: gateway_test.go, TestGatewaySwitchToInterestOnlyModeImmediately (line 6934), +/// TestGatewayAccountInterest (line 1794), TestGatewayAccountUnsub (line 1912). +/// +public class GatewayInterestTrackerTests +{ + // Go: TestGatewayBasic server/gateway_test.go:399 — initial state is Optimistic + [Fact] + public void StartsInOptimisticMode() + { + var tracker = new GatewayInterestTracker(); + + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.Optimistic); + tracker.GetMode("ACCT_A").ShouldBe(GatewayInterestMode.Optimistic); + tracker.GetMode("ANY_ACCOUNT").ShouldBe(GatewayInterestMode.Optimistic); + } + + // Go: TestGatewayBasic server/gateway_test.go:399 — optimistic mode forwards everything + [Fact] + public void OptimisticForwardsEverything() + { + var tracker = new GatewayInterestTracker(); + + tracker.ShouldForward("$G", "any.subject").ShouldBeTrue(); + tracker.ShouldForward("$G", "orders.created").ShouldBeTrue(); + tracker.ShouldForward("$G", "deeply.nested.subject.path").ShouldBeTrue(); + tracker.ShouldForward("ACCT", "foo").ShouldBeTrue(); + } + + // Go: TestGatewayAccountUnsub server/gateway_test.go:1912 — RS- adds to no-interest + [Fact] + public void TrackNoInterest_AddsToNoInterestSet() + { + var tracker = new GatewayInterestTracker(); + + tracker.TrackNoInterest("$G", "orders.created"); + + // Should not forward that specific subject in Optimistic mode + tracker.ShouldForward("$G", "orders.created").ShouldBeFalse(); + // Other subjects still forwarded + tracker.ShouldForward("$G", "orders.updated").ShouldBeTrue(); + tracker.ShouldForward("$G", "payments.created").ShouldBeTrue(); + } + + // Go: TestGatewaySwitchToInterestOnlyModeImmediately server/gateway_test.go:6934 — threshold switch + [Fact] + public void SwitchesToInterestOnlyAfterThreshold() + { + const int threshold = 10; + var tracker = new GatewayInterestTracker(noInterestThreshold: threshold); + + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.Optimistic); + + // Add subjects up to (but not reaching) the threshold + for (int i = 0; i < threshold - 1; i++) + tracker.TrackNoInterest("$G", $"subject.{i}"); + + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.Optimistic); + + // One more crosses the threshold + tracker.TrackNoInterest("$G", $"subject.{threshold - 1}"); + + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly); + } + + // Go: TestGatewaySwitchToInterestOnlyModeImmediately server/gateway_test.go:6934 + [Fact] + public void InterestOnlyMode_OnlyForwardsTrackedSubjects() + { + const int threshold = 5; + var tracker = new GatewayInterestTracker(noInterestThreshold: threshold); + + // Trigger mode switch + for (int i = 0; i < threshold; i++) + tracker.TrackNoInterest("$G", $"noise.{i}"); + + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly); + + // Nothing forwarded until interest is explicitly tracked + tracker.ShouldForward("$G", "orders.created").ShouldBeFalse(); + + // Track a positive interest + tracker.TrackInterest("$G", "orders.created"); + + // Now only that subject is forwarded + tracker.ShouldForward("$G", "orders.created").ShouldBeTrue(); + tracker.ShouldForward("$G", "orders.updated").ShouldBeFalse(); + tracker.ShouldForward("$G", "payments.done").ShouldBeFalse(); + } + + // Go: TestGatewaySubjectInterest server/gateway_test.go:1972 — wildcard interest in InterestOnly + [Fact] + public void InterestOnlyMode_SupportsWildcards() + { + const int threshold = 3; + var tracker = new GatewayInterestTracker(noInterestThreshold: threshold); + + // Trigger InterestOnly mode + for (int i = 0; i < threshold; i++) + tracker.TrackNoInterest("$G", $"x.{i}"); + + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly); + + // Register a wildcard interest + tracker.TrackInterest("$G", "foo.>"); + + // Matching subjects are forwarded + tracker.ShouldForward("$G", "foo.bar").ShouldBeTrue(); + tracker.ShouldForward("$G", "foo.bar.baz").ShouldBeTrue(); + tracker.ShouldForward("$G", "foo.anything.deep.nested").ShouldBeTrue(); + + // Non-matching subjects are not forwarded + tracker.ShouldForward("$G", "other.subject").ShouldBeFalse(); + tracker.ShouldForward("$G", "foo").ShouldBeFalse(); // "foo.>" requires at least one token after "foo" + } + + // Go: TestGatewayAccountInterest server/gateway_test.go:1794 — per-account mode isolation + [Fact] + public void ModePerAccount() + { + const int threshold = 5; + var tracker = new GatewayInterestTracker(noInterestThreshold: threshold); + + // Switch ACCT_A to InterestOnly + for (int i = 0; i < threshold; i++) + tracker.TrackNoInterest("ACCT_A", $"noise.{i}"); + + tracker.GetMode("ACCT_A").ShouldBe(GatewayInterestMode.InterestOnly); + + // ACCT_B remains Optimistic + tracker.GetMode("ACCT_B").ShouldBe(GatewayInterestMode.Optimistic); + + // ACCT_A blocks unknown subjects, ACCT_B forwards + tracker.ShouldForward("ACCT_A", "orders.created").ShouldBeFalse(); + tracker.ShouldForward("ACCT_B", "orders.created").ShouldBeTrue(); + } + + // Go: TestGatewaySwitchToInterestOnlyModeImmediately server/gateway_test.go:6934 + [Fact] + public void ModePersistsAfterSwitch() + { + const int threshold = 3; + var tracker = new GatewayInterestTracker(noInterestThreshold: threshold); + + // Trigger switch + for (int i = 0; i < threshold; i++) + tracker.TrackNoInterest("$G", $"y.{i}"); + + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly); + + // TrackInterest in InterestOnly mode — mode stays InterestOnly + tracker.TrackInterest("$G", "orders.created"); + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly); + + // TrackNoInterest in InterestOnly mode — mode stays InterestOnly + tracker.TrackNoInterest("$G", "something.else"); + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly); + } + + // Go: TestGatewayAccountInterest server/gateway_test.go:1794 — explicit SwitchToInterestOnly + [Fact] + public void ExplicitSwitchToInterestOnly_SetsMode() + { + var tracker = new GatewayInterestTracker(); + + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.Optimistic); + + tracker.SwitchToInterestOnly("$G"); + + tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly); + } + + // Go: TestGatewayAccountUnsub server/gateway_test.go:1912 — RS+ restores interest after RS- + [Fact] + public void TrackInterest_InOptimisticMode_RemovesFromNoInterestSet() + { + var tracker = new GatewayInterestTracker(); + + // Mark no interest + tracker.TrackNoInterest("$G", "orders.created"); + tracker.ShouldForward("$G", "orders.created").ShouldBeFalse(); + + // Remote re-subscribes — track interest again + tracker.TrackInterest("$G", "orders.created"); + tracker.ShouldForward("$G", "orders.created").ShouldBeTrue(); + } + + // Go: TestGatewaySwitchToInterestOnlyModeImmediately server/gateway_test.go:6934 + [Fact] + public void InterestOnlyMode_TrackNoInterest_RemovesFromInterestSet() + { + const int threshold = 3; + var tracker = new GatewayInterestTracker(noInterestThreshold: threshold); + + // Trigger InterestOnly + for (int i = 0; i < threshold; i++) + tracker.TrackNoInterest("$G", $"z.{i}"); + + tracker.TrackInterest("$G", "orders.created"); + tracker.ShouldForward("$G", "orders.created").ShouldBeTrue(); + + // Remote unsubscribes — subject removed from interest set + tracker.TrackNoInterest("$G", "orders.created"); + tracker.ShouldForward("$G", "orders.created").ShouldBeFalse(); + } + + // Go: TestGatewaySubjectInterest server/gateway_test.go:1972 — pwc wildcard in InterestOnly + [Fact] + public void InterestOnlyMode_SupportsPwcWildcard() + { + const int threshold = 3; + var tracker = new GatewayInterestTracker(noInterestThreshold: threshold); + + for (int i = 0; i < threshold; i++) + tracker.TrackNoInterest("$G", $"n.{i}"); + + tracker.TrackInterest("$G", "orders.*"); + + tracker.ShouldForward("$G", "orders.created").ShouldBeTrue(); + tracker.ShouldForward("$G", "orders.deleted").ShouldBeTrue(); + tracker.ShouldForward("$G", "orders.deep.nested").ShouldBeFalse(); // * is single token + tracker.ShouldForward("$G", "payments.created").ShouldBeFalse(); + } + + // Go: TestGatewayAccountInterest server/gateway_test.go:1794 — unknown account defaults optimistic + [Fact] + public void UnknownAccount_DefaultsToOptimisticForwarding() + { + var tracker = new GatewayInterestTracker(); + + // Account never seen — should forward everything + tracker.ShouldForward("BRAND_NEW_ACCOUNT", "any.subject").ShouldBeTrue(); + } +}