feat(gateways): implement GatewayInterestTracker for interest-only mode state machine (D1)

Ports the gateway interest-only mode from Go (gateway.go:100-150, 1500-1600):

- Add GatewayInterestTracker with Optimistic/Transitioning/InterestOnly modes
- In Optimistic mode, track no-interest set; switch to InterestOnly when set
  exceeds threshold (default 1000, matching Go defaultGatewayMaxRUnsubThreshold)
- In InterestOnly mode, only forward subjects with tracked RS+ interest;
  use SubjectMatch.MatchLiteral for wildcard pattern support
- Integrate tracker into GatewayConnection: A+/A- messages update tracker,
  SendMessageAsync skips send when ShouldForward returns false
- Expose InterestTracker property on GatewayConnection for observability
- Add 13 unit tests covering all 8 specified behaviors plus edge cases
This commit is contained in:
Joseph Doherty
2026-02-24 15:00:23 -05:00
parent b6c373c5e4
commit 27faf64548
3 changed files with 452 additions and 4 deletions

View File

@@ -9,6 +9,7 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
private readonly NetworkStream _stream = new(socket, ownsSocket: true);
private readonly SemaphoreSlim _writeGate = new(1, 1);
private readonly CancellationTokenSource _closedCts = new();
private readonly GatewayInterestTracker _interestTracker = new();
private Task? _loopTask;
public string? RemoteId { get; private set; }
@@ -16,6 +17,12 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
public Func<RemoteSubscription, Task>? RemoteSubscriptionReceived { get; set; }
public Func<GatewayMessage, Task>? MessageReceived { get; set; }
/// <summary>
/// Per-connection interest mode tracker.
/// Go: gateway.go:100-150 — each outbound gateway connection maintains its own interest state.
/// </summary>
public GatewayInterestTracker InterestTracker => _interestTracker;
public async Task PerformOutboundHandshakeAsync(string serverId, CancellationToken ct)
{
await WriteLineAsync($"GATEWAY {serverId}", ct);
@@ -50,6 +57,10 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
public async Task SendMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
// Go: gateway.go:2900 (shouldForwardMsg) — check interest tracker before sending
if (!_interestTracker.ShouldForward(account, subject))
return;
var reply = string.IsNullOrEmpty(replyTo) ? "-" : replyTo;
await _writeGate.WaitAsync(ct);
try
@@ -94,9 +105,12 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
if (line.StartsWith("A+ ", StringComparison.Ordinal))
{
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
if (TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
{
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount));
// Go: gateway.go:1540 — track positive interest on A+
_interestTracker.TrackInterest(parsedAccount, parsedSubject);
if (RemoteSubscriptionReceived != null)
await RemoteSubscriptionReceived(new RemoteSubscription(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount));
}
continue;
}
@@ -104,9 +118,12 @@ public sealed class GatewayConnection(Socket socket) : IAsyncDisposable
if (line.StartsWith("A- ", StringComparison.Ordinal))
{
var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (RemoteSubscriptionReceived != null && TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
if (TryParseAccountScopedInterest(parts, out var parsedAccount, out var parsedSubject, out var queue))
{
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount));
// Go: gateway.go:1560 — track no-interest on A-, may trigger mode switch
_interestTracker.TrackNoInterest(parsedAccount, parsedSubject);
if (RemoteSubscriptionReceived != null)
await RemoteSubscriptionReceived(RemoteSubscription.Removal(parsedSubject, queue, RemoteId ?? string.Empty, parsedAccount));
}
continue;
}

View File

@@ -0,0 +1,190 @@
// Go: gateway.go:100-150 (InterestMode enum)
// Go: gateway.go:1500-1600 (switchToInterestOnlyMode)
using System.Collections.Concurrent;
using NATS.Server.Subscriptions;
namespace NATS.Server.Gateways;
/// <summary>
/// Tracks the interest mode for each account on a gateway connection.
/// In Optimistic mode, all messages are forwarded unless a subject is in the
/// no-interest set. Once the no-interest set exceeds the threshold (1000),
/// the account switches to InterestOnly mode where only subjects with tracked
/// RS+ interest are forwarded.
/// </summary>
public enum GatewayInterestMode
{
/// <summary>Forward everything (initial state). Track subjects with no interest.</summary>
Optimistic,
/// <summary>Mode transition in progress.</summary>
Transitioning,
/// <summary>Only forward subjects with known remote interest (RS+ received).</summary>
InterestOnly,
}
/// <summary>
/// Per-account interest state machine for a gateway connection.
/// Go reference: gateway.go:100-150 (struct srvGateway, interestMode fields),
/// gateway.go:1500-1600 (switchToInterestOnlyMode, processGatewayAccountUnsub).
/// </summary>
public sealed class GatewayInterestTracker
{
/// <summary>
/// Number of no-interest subjects before switching to InterestOnly mode.
/// Go: gateway.go:134 (defaultGatewayMaxRUnsubThreshold = 1000)
/// </summary>
public const int DefaultNoInterestThreshold = 1000;
private readonly int _noInterestThreshold;
// Per-account state: mode + no-interest set (Optimistic) or positive interest set (InterestOnly)
private readonly ConcurrentDictionary<string, AccountState> _accounts = new(StringComparer.Ordinal);
public GatewayInterestTracker(int noInterestThreshold = DefaultNoInterestThreshold)
{
_noInterestThreshold = noInterestThreshold;
}
/// <summary>
/// Returns the current interest mode for the given account.
/// Accounts default to Optimistic until the no-interest threshold is exceeded.
/// </summary>
public GatewayInterestMode GetMode(string account)
=> _accounts.TryGetValue(account, out var state) ? state.Mode : GatewayInterestMode.Optimistic;
/// <summary>
/// Track a positive interest (RS+ received from remote) for an account/subject.
/// Go: gateway.go:1540 (processGatewayAccountSub — adds to interest set)
/// </summary>
public void TrackInterest(string account, string subject)
{
var state = GetOrCreateState(account);
lock (state)
{
// In Optimistic mode, remove from no-interest set if present
if (state.Mode == GatewayInterestMode.Optimistic)
{
state.NoInterestSet.Remove(subject);
return;
}
// In InterestOnly mode, add to the positive interest set
if (state.Mode == GatewayInterestMode.InterestOnly)
{
state.InterestSet.Add(subject);
}
}
}
/// <summary>
/// Track a no-interest event (RS- received from remote) for an account/subject.
/// When the no-interest set crosses the threshold, switches to InterestOnly mode.
/// Go: gateway.go:1560 (processGatewayAccountUnsub — tracks no-interest, triggers switch)
/// </summary>
public void TrackNoInterest(string account, string subject)
{
var state = GetOrCreateState(account);
lock (state)
{
if (state.Mode == GatewayInterestMode.InterestOnly)
{
// In InterestOnly mode, remove from positive interest set
state.InterestSet.Remove(subject);
return;
}
if (state.Mode == GatewayInterestMode.Optimistic)
{
state.NoInterestSet.Add(subject);
if (state.NoInterestSet.Count >= _noInterestThreshold)
DoSwitchToInterestOnly(state);
}
}
}
/// <summary>
/// Determines whether a message should be forwarded to the remote gateway
/// for the given account and subject.
/// Go: gateway.go:2900 (shouldForwardMsg — checks mode and interest)
/// </summary>
public bool ShouldForward(string account, string subject)
{
if (!_accounts.TryGetValue(account, out var state))
return true; // Optimistic by default — no state yet means forward
lock (state)
{
return state.Mode switch
{
GatewayInterestMode.Optimistic =>
// Forward unless subject is in no-interest set
!state.NoInterestSet.Contains(subject),
GatewayInterestMode.Transitioning =>
// During transition, be conservative and forward
true,
GatewayInterestMode.InterestOnly =>
// Only forward if at least one interest pattern matches
MatchesAnyInterest(state, subject),
_ => true,
};
}
}
/// <summary>
/// Explicitly switch an account to InterestOnly mode.
/// Called when the remote signals it is in interest-only mode.
/// Go: gateway.go:1500 (switchToInterestOnlyMode)
/// </summary>
public void SwitchToInterestOnly(string account)
{
var state = GetOrCreateState(account);
lock (state)
{
if (state.Mode != GatewayInterestMode.InterestOnly)
DoSwitchToInterestOnly(state);
}
}
// ── Private helpers ────────────────────────────────────────────────
private AccountState GetOrCreateState(string account)
=> _accounts.GetOrAdd(account, _ => new AccountState());
private static void DoSwitchToInterestOnly(AccountState state)
{
// Go: gateway.go:1510-1530 — clear no-interest, build positive interest from what remains
state.Mode = GatewayInterestMode.InterestOnly;
state.NoInterestSet.Clear();
// InterestSet starts empty; subsequent RS+ events will populate it
}
private static bool MatchesAnyInterest(AccountState state, string subject)
{
foreach (var pattern in state.InterestSet)
{
// Use SubjectMatch.MatchLiteral to support wildcard patterns in the interest set
if (SubjectMatch.MatchLiteral(subject, pattern))
return true;
}
return false;
}
/// <summary>Per-account mutable state. All access must be under the instance lock.</summary>
private sealed class AccountState
{
public GatewayInterestMode Mode { get; set; } = GatewayInterestMode.Optimistic;
/// <summary>Subjects with no remote interest (used in Optimistic mode).</summary>
public HashSet<string> NoInterestSet { get; } = new(StringComparer.Ordinal);
/// <summary>Subjects/patterns with positive remote interest (used in InterestOnly mode).</summary>
public HashSet<string> InterestSet { get; } = new(StringComparer.Ordinal);
}
}

View File

@@ -0,0 +1,241 @@
// Go: gateway.go:100-150 (InterestMode enum), gateway.go:1500-1600 (switchToInterestOnlyMode)
using NATS.Server.Gateways;
namespace NATS.Server.Tests.Gateways;
/// <summary>
/// Unit tests for GatewayInterestTracker — the per-connection interest mode state machine.
/// Covers Optimistic/InterestOnly modes, threshold-based switching, and per-account isolation.
/// Go reference: gateway_test.go, TestGatewaySwitchToInterestOnlyModeImmediately (line 6934),
/// TestGatewayAccountInterest (line 1794), TestGatewayAccountUnsub (line 1912).
/// </summary>
public class GatewayInterestTrackerTests
{
// Go: TestGatewayBasic server/gateway_test.go:399 — initial state is Optimistic
[Fact]
public void StartsInOptimisticMode()
{
var tracker = new GatewayInterestTracker();
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.Optimistic);
tracker.GetMode("ACCT_A").ShouldBe(GatewayInterestMode.Optimistic);
tracker.GetMode("ANY_ACCOUNT").ShouldBe(GatewayInterestMode.Optimistic);
}
// Go: TestGatewayBasic server/gateway_test.go:399 — optimistic mode forwards everything
[Fact]
public void OptimisticForwardsEverything()
{
var tracker = new GatewayInterestTracker();
tracker.ShouldForward("$G", "any.subject").ShouldBeTrue();
tracker.ShouldForward("$G", "orders.created").ShouldBeTrue();
tracker.ShouldForward("$G", "deeply.nested.subject.path").ShouldBeTrue();
tracker.ShouldForward("ACCT", "foo").ShouldBeTrue();
}
// Go: TestGatewayAccountUnsub server/gateway_test.go:1912 — RS- adds to no-interest
[Fact]
public void TrackNoInterest_AddsToNoInterestSet()
{
var tracker = new GatewayInterestTracker();
tracker.TrackNoInterest("$G", "orders.created");
// Should not forward that specific subject in Optimistic mode
tracker.ShouldForward("$G", "orders.created").ShouldBeFalse();
// Other subjects still forwarded
tracker.ShouldForward("$G", "orders.updated").ShouldBeTrue();
tracker.ShouldForward("$G", "payments.created").ShouldBeTrue();
}
// Go: TestGatewaySwitchToInterestOnlyModeImmediately server/gateway_test.go:6934 — threshold switch
[Fact]
public void SwitchesToInterestOnlyAfterThreshold()
{
const int threshold = 10;
var tracker = new GatewayInterestTracker(noInterestThreshold: threshold);
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.Optimistic);
// Add subjects up to (but not reaching) the threshold
for (int i = 0; i < threshold - 1; i++)
tracker.TrackNoInterest("$G", $"subject.{i}");
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.Optimistic);
// One more crosses the threshold
tracker.TrackNoInterest("$G", $"subject.{threshold - 1}");
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly);
}
// Go: TestGatewaySwitchToInterestOnlyModeImmediately server/gateway_test.go:6934
[Fact]
public void InterestOnlyMode_OnlyForwardsTrackedSubjects()
{
const int threshold = 5;
var tracker = new GatewayInterestTracker(noInterestThreshold: threshold);
// Trigger mode switch
for (int i = 0; i < threshold; i++)
tracker.TrackNoInterest("$G", $"noise.{i}");
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly);
// Nothing forwarded until interest is explicitly tracked
tracker.ShouldForward("$G", "orders.created").ShouldBeFalse();
// Track a positive interest
tracker.TrackInterest("$G", "orders.created");
// Now only that subject is forwarded
tracker.ShouldForward("$G", "orders.created").ShouldBeTrue();
tracker.ShouldForward("$G", "orders.updated").ShouldBeFalse();
tracker.ShouldForward("$G", "payments.done").ShouldBeFalse();
}
// Go: TestGatewaySubjectInterest server/gateway_test.go:1972 — wildcard interest in InterestOnly
[Fact]
public void InterestOnlyMode_SupportsWildcards()
{
const int threshold = 3;
var tracker = new GatewayInterestTracker(noInterestThreshold: threshold);
// Trigger InterestOnly mode
for (int i = 0; i < threshold; i++)
tracker.TrackNoInterest("$G", $"x.{i}");
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly);
// Register a wildcard interest
tracker.TrackInterest("$G", "foo.>");
// Matching subjects are forwarded
tracker.ShouldForward("$G", "foo.bar").ShouldBeTrue();
tracker.ShouldForward("$G", "foo.bar.baz").ShouldBeTrue();
tracker.ShouldForward("$G", "foo.anything.deep.nested").ShouldBeTrue();
// Non-matching subjects are not forwarded
tracker.ShouldForward("$G", "other.subject").ShouldBeFalse();
tracker.ShouldForward("$G", "foo").ShouldBeFalse(); // "foo.>" requires at least one token after "foo"
}
// Go: TestGatewayAccountInterest server/gateway_test.go:1794 — per-account mode isolation
[Fact]
public void ModePerAccount()
{
const int threshold = 5;
var tracker = new GatewayInterestTracker(noInterestThreshold: threshold);
// Switch ACCT_A to InterestOnly
for (int i = 0; i < threshold; i++)
tracker.TrackNoInterest("ACCT_A", $"noise.{i}");
tracker.GetMode("ACCT_A").ShouldBe(GatewayInterestMode.InterestOnly);
// ACCT_B remains Optimistic
tracker.GetMode("ACCT_B").ShouldBe(GatewayInterestMode.Optimistic);
// ACCT_A blocks unknown subjects, ACCT_B forwards
tracker.ShouldForward("ACCT_A", "orders.created").ShouldBeFalse();
tracker.ShouldForward("ACCT_B", "orders.created").ShouldBeTrue();
}
// Go: TestGatewaySwitchToInterestOnlyModeImmediately server/gateway_test.go:6934
[Fact]
public void ModePersistsAfterSwitch()
{
const int threshold = 3;
var tracker = new GatewayInterestTracker(noInterestThreshold: threshold);
// Trigger switch
for (int i = 0; i < threshold; i++)
tracker.TrackNoInterest("$G", $"y.{i}");
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly);
// TrackInterest in InterestOnly mode — mode stays InterestOnly
tracker.TrackInterest("$G", "orders.created");
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly);
// TrackNoInterest in InterestOnly mode — mode stays InterestOnly
tracker.TrackNoInterest("$G", "something.else");
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly);
}
// Go: TestGatewayAccountInterest server/gateway_test.go:1794 — explicit SwitchToInterestOnly
[Fact]
public void ExplicitSwitchToInterestOnly_SetsMode()
{
var tracker = new GatewayInterestTracker();
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.Optimistic);
tracker.SwitchToInterestOnly("$G");
tracker.GetMode("$G").ShouldBe(GatewayInterestMode.InterestOnly);
}
// Go: TestGatewayAccountUnsub server/gateway_test.go:1912 — RS+ restores interest after RS-
[Fact]
public void TrackInterest_InOptimisticMode_RemovesFromNoInterestSet()
{
var tracker = new GatewayInterestTracker();
// Mark no interest
tracker.TrackNoInterest("$G", "orders.created");
tracker.ShouldForward("$G", "orders.created").ShouldBeFalse();
// Remote re-subscribes — track interest again
tracker.TrackInterest("$G", "orders.created");
tracker.ShouldForward("$G", "orders.created").ShouldBeTrue();
}
// Go: TestGatewaySwitchToInterestOnlyModeImmediately server/gateway_test.go:6934
[Fact]
public void InterestOnlyMode_TrackNoInterest_RemovesFromInterestSet()
{
const int threshold = 3;
var tracker = new GatewayInterestTracker(noInterestThreshold: threshold);
// Trigger InterestOnly
for (int i = 0; i < threshold; i++)
tracker.TrackNoInterest("$G", $"z.{i}");
tracker.TrackInterest("$G", "orders.created");
tracker.ShouldForward("$G", "orders.created").ShouldBeTrue();
// Remote unsubscribes — subject removed from interest set
tracker.TrackNoInterest("$G", "orders.created");
tracker.ShouldForward("$G", "orders.created").ShouldBeFalse();
}
// Go: TestGatewaySubjectInterest server/gateway_test.go:1972 — pwc wildcard in InterestOnly
[Fact]
public void InterestOnlyMode_SupportsPwcWildcard()
{
const int threshold = 3;
var tracker = new GatewayInterestTracker(noInterestThreshold: threshold);
for (int i = 0; i < threshold; i++)
tracker.TrackNoInterest("$G", $"n.{i}");
tracker.TrackInterest("$G", "orders.*");
tracker.ShouldForward("$G", "orders.created").ShouldBeTrue();
tracker.ShouldForward("$G", "orders.deleted").ShouldBeTrue();
tracker.ShouldForward("$G", "orders.deep.nested").ShouldBeFalse(); // * is single token
tracker.ShouldForward("$G", "payments.created").ShouldBeFalse();
}
// Go: TestGatewayAccountInterest server/gateway_test.go:1794 — unknown account defaults optimistic
[Fact]
public void UnknownAccount_DefaultsToOptimisticForwarding()
{
var tracker = new GatewayInterestTracker();
// Account never seen — should forward everything
tracker.ShouldForward("BRAND_NEW_ACCOUNT", "any.subject").ShouldBeTrue();
}
}