Files
natsdotnet/tests/NATS.Server.Gateways.Tests/Gateways/GatewayGoParityTests.cs
Joseph Doherty 9972b74bc3 refactor: extract NATS.Server.Gateways.Tests project
Move 25 gateway-related test files from NATS.Server.Tests into a
dedicated NATS.Server.Gateways.Tests project. Update namespaces,
replace private ReadUntilAsync with SocketTestHelper from TestUtilities,
inline TestServerFactory usage, add InternalsVisibleTo, and register
the project in the solution file. All 261 tests pass.
2026-03-12 15:10:50 -04:00

1314 lines
49 KiB
C#

using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Client.Core;
using NATS.Server.Configuration;
using NATS.Server.Gateways;
using NATS.Server.Subscriptions;
namespace NATS.Server.Gateways.Tests.Gateways;
/// <summary>
/// Go-parity tests for gateway functionality, ported from
/// golang/nats-server/server/gateway_test.go.
/// Covers TestGatewayBasic, TestGatewayTLS (stub), TestGatewayAuth (stub),
/// TestGatewayQueueSubs, TestGatewayInterestOnlyMode, TestGatewayReconnect,
/// TestGatewayURLs, TestGatewayConnectionEvents, and more.
/// </summary>
public class GatewayGoParityTests
{
// ── TestGatewayBasic ────────────────────────────────────────────────
// Go: TestGatewayBasic (gateway_test.go:399)
/// <summary>
/// Starts a two-cluster fixture ("A" and "B") and checks that each server
/// reports at least one gateway connection.
/// </summary>
[Fact]
public async Task GatewayBasic_outbound_and_inbound_both_established()
{
    await using var fixture = await GatewayParityFixture.StartAsync("A", "B");

    var gatewaysOnA = fixture.A.Stats.Gateways;
    gatewaysOnA.ShouldBeGreaterThan(0);

    var gatewaysOnB = fixture.B.Stats.Gateways;
    gatewaysOnB.ShouldBeGreaterThan(0);
}
// Go: TestGatewayBasic (gateway_test.go:399) — gateway count drops after shutdown
/// <summary>
/// Shuts down server B and polls A's gateway counter (every 30 ms, for at
/// most 5 seconds) until it reaches zero.
/// </summary>
[Fact]
public async Task GatewayBasic_gateway_count_drops_when_remote_shuts_down()
{
    await using var fixture = await GatewayParityFixture.StartAsync("A", "B");
    fixture.A.Stats.Gateways.ShouldBeGreaterThan(0);

    await fixture.ShutdownBAsync();

    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    while (true)
    {
        if (deadline.IsCancellationRequested || fixture.A.Stats.Gateways <= 0)
        {
            break;
        }

        // A cancelled delay only means the deadline expired; the loop condition handles it.
        await Task.Delay(30, deadline.Token).ContinueWith(_ => { }, TaskScheduler.Default);
    }

    fixture.A.Stats.Gateways.ShouldBe(0);
}
// Go: TestGatewayBasic (gateway_test.go:399) — gateway reconnects after restart
/// <summary>
/// Shuts down server B, waits for A's gateway counter to drop, then starts a
/// fresh server named "B" that dials A's gateway listener and checks that the
/// new server reports a gateway connection.
/// </summary>
[Fact]
public async Task GatewayBasic_reconnects_after_remote_server_restarts()
{
    await using var fx = await GatewayParityFixture.StartAsync("A", "B");
    var aListen = fx.A.GatewayListen!;
    await fx.ShutdownBAsync();

    // Give A up to 5 seconds to notice that B went away.
    using var dropTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    while (!dropTimeout.IsCancellationRequested && fx.A.Stats.Gateways > 0)
    {
        await Task.Delay(30, dropTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
    }

    // Restart B connecting back to A
    var bOptions = new NatsOptions
    {
        Host = "127.0.0.1",
        Port = 0,
        Gateway = new GatewayOptions
        {
            Name = "B",
            Host = "127.0.0.1",
            Port = 0,
            Remotes = [aListen],
        },
    };
    var bRestarted = new NatsServer(bOptions, NullLoggerFactory.Instance);
    var bCts = new CancellationTokenSource();
    try
    {
        _ = bRestarted.StartAsync(bCts.Token);
        await bRestarted.WaitForReadyAsync();

        using var reconTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        while (!reconTimeout.IsCancellationRequested && bRestarted.Stats.Gateways == 0)
        {
            await Task.Delay(30, reconTimeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
        }

        bRestarted.Stats.Gateways.ShouldBeGreaterThan(0);
    }
    finally
    {
        // Always stop the restarted server, even when an assertion above fails,
        // so a failing test does not leave a live server behind.
        await bCts.CancelAsync();
        bRestarted.Dispose();
        bCts.Dispose();
    }
}
// ── TestGatewayDontSendSubInterest ──────────────────────────────────
// Go: TestGatewayDontSendSubInterest (gateway_test.go:1755)
/// <summary>
/// Creates a subscription on server B, waits 200 ms, and asserts that A still
/// reports a gateway connection.
/// </summary>
[Fact]
public async Task DontSendSubInterest_subscription_on_remote_not_echoed_back()
{
    await using var fixture = await GatewayParityFixture.StartAsync("A", "B");

    var clientOpts = new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.B.Port}",
    };
    await using var client = new NatsConnection(clientOpts);
    await client.ConnectAsync();

    await using var subscription = await client.SubscribeCoreAsync<string>("dont.send.test");
    await client.PingAsync();

    // Leave time for any (unwanted) propagation to happen.
    await Task.Delay(200);

    // NOTE(review): the test name and the original comments talk about A's
    // subscription count, but the only thing asserted is that the gateway
    // link is still up.
    fixture.A.Stats.Gateways.ShouldBeGreaterThan(0);
}
// ── TestGatewayDoesntSendBackToItself ───────────────────────────────
// Go: TestGatewayDoesntSendBackToItself (gateway_test.go:2150)
/// <summary>
/// Subscribes to the same subject on both clusters, publishes once on A, and
/// checks that each subscriber gets the message and that nothing further
/// arrives on the local subscription afterwards.
/// </summary>
[Fact]
public async Task DoesntSendBackToItself_no_echo_cycle_between_clusters()
{
    const string subject = "cycle.subject";

    await using var fixture = await GatewayParityFixture.StartAsync("A", "B");

    await using var connA = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.A.Port}",
    });
    await connA.ConnectAsync();

    await using var connB = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.B.Port}",
    });
    await connB.ConnectAsync();

    await using var subOnB = await connB.SubscribeCoreAsync<string>(subject);
    await connB.PingAsync();
    await using var subOnA = await connA.SubscribeCoreAsync<string>(subject);
    await connA.PingAsync();

    await fixture.WaitForRemoteInterestOnAAsync(subject);

    await connA.PublishAsync(subject, "ping");
    await connA.PingAsync();

    // Both reads share a single 3-second budget.
    using var receiveBudget = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    var onA = await subOnA.Msgs.ReadAsync(receiveBudget.Token);
    onA.Data.ShouldBe("ping");
    var onB = await subOnB.Msgs.ReadAsync(receiveBudget.Token);
    onB.Data.ShouldBe("ping");

    await Task.Delay(200);

    // A further read on the local subscription must time out: no echoed copy.
    using var quietWindow = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
    await Should.ThrowAsync<OperationCanceledException>(async () =>
        await subOnA.Msgs.ReadAsync(quietWindow.Token));
}
// ── TestGatewaySolicitShutdown ──────────────────────────────────────
// Go: TestGatewaySolicitShutdown (gateway_test.go:784)
/// <summary>
/// Starts a <c>GatewayManager</c> configured with three remotes on ports
/// 19991-19993, then checks that <c>DisposeAsync</c> wins a race against a
/// 5-second delay and finishes in under 4 seconds.
/// </summary>
[Fact]
public async Task SolicitShutdown_manager_disposes_promptly_with_unreachable_remotes()
{
    var gatewayOptions = new GatewayOptions
    {
        Name = "SHUTDOWN-TEST",
        Host = "127.0.0.1",
        Port = 0,
        Remotes = ["127.0.0.1:19991", "127.0.0.1:19992", "127.0.0.1:19993"],
    };
    var manager = new GatewayManager(
        gatewayOptions,
        new ServerStats(),
        "S1",
        _ => { },
        _ => { },
        NullLogger<GatewayManager>.Instance);
    await manager.StartAsync(CancellationToken.None);

    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
    var disposal = manager.DisposeAsync().AsTask();
    var limit = Task.Delay(TimeSpan.FromSeconds(5));
    var winner = await Task.WhenAny(disposal, limit);
    stopwatch.Stop();

    winner.ShouldBe(disposal, "DisposeAsync should complete within 5 seconds");
    stopwatch.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(4));
}
// ── TestGatewayAuth (stub — auth not yet wired to gateway handshake) ──
// Go: TestGatewayAuth (gateway_test.go:970)
/// <summary>
/// Stub for the Go auth test: only verifies that two gateways connect when no
/// TLS or auth is configured. Full auth wiring is tracked in
/// docs/structuregaps.md.
/// </summary>
[Fact]
public async Task Auth_gateway_with_correct_credentials_connects()
{
    await using var fixture = await GatewayParityFixture.StartAsync("AUTH-A", "AUTH-B");

    var gatewaysOnA = fixture.A.Stats.Gateways;
    gatewaysOnA.ShouldBeGreaterThan(0);

    var gatewaysOnB = fixture.B.Stats.Gateways;
    gatewaysOnB.ShouldBeGreaterThan(0);
}
// Go: TestGatewayAuth (gateway_test.go:970) — wrong credentials fail to connect
/// <summary>
/// Starts a <c>GatewayManager</c> whose only remote is 127.0.0.1:1, lets it
/// run for 200 ms, and disposes it. The test passes as long as nothing throws.
/// </summary>
[Fact]
public async Task Auth_gateway_manager_does_not_crash_on_bad_remote()
{
    // Connects to a non-NATS port (should fail gracefully and retry).
    var gatewayOptions = new GatewayOptions
    {
        Name = "AUTH-FAIL",
        Host = "127.0.0.1",
        Port = 0,
        Remotes = ["127.0.0.1:1"], // port 1 — will be refused
    };
    var stats = new ServerStats();
    var manager = new GatewayManager(
        gatewayOptions,
        stats,
        "FAIL-SERVER",
        _ => { },
        _ => { },
        NullLogger<GatewayManager>.Instance);

    await manager.StartAsync(CancellationToken.None);
    await Task.Delay(200);

    // No gateway connections are expected since the remote is invalid;
    // nothing is asserted beyond start and dispose completing without throwing.
    await manager.DisposeAsync();
}
// ── TestGatewayTLS (stub) ───────────────────────────────────────────
// Go: TestGatewayTLS (gateway_test.go:1014)
/// <summary>
/// Stub for the Go TLS test: TLS gateway testing needs certificate fixtures,
/// so this only verifies the non-TLS baseline. TLS support is tracked in
/// structuregaps.md.
/// </summary>
[Fact]
public async Task TLS_stub_two_plaintext_gateways_connect_without_tls()
{
    await using var fixture = await GatewayParityFixture.StartAsync("TLS-A", "TLS-B");

    var gatewaysOnA = fixture.A.Stats.Gateways;
    gatewaysOnA.ShouldBeGreaterThan(0);

    var gatewaysOnB = fixture.B.Stats.Gateways;
    gatewaysOnB.ShouldBeGreaterThan(0);
}
// ── TestGatewayQueueSub ─────────────────────────────────────────────
// Go: TestGatewayQueueSub (gateway_test.go:2265) — queue sub propagated across gateway
/// <summary>
/// Plays the remote side of a gateway link on a raw TCP listener, completes
/// the GATEWAY handshake, and checks that propagating a local queue
/// subscription writes an "A+" line carrying account, subject and queue name.
/// </summary>
[Fact]
public async Task QueueSub_queue_subscription_propagated_to_remote_via_aplus()
{
    using var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    var port = ((IPEndPoint)listener.LocalEndpoint).Port;
    var options = new GatewayOptions
    {
        Name = "QSUB-LOCAL",
        Host = "127.0.0.1",
        Port = 0,
        Remotes = [$"127.0.0.1:{port}"],
    };
    var manager = new GatewayManager(
        options,
        new ServerStats(),
        "QSUB-SERVER",
        _ => { },
        _ => { },
        NullLogger<GatewayManager>.Instance);
    Socket? gwSocket = null;
    try
    {
        await manager.StartAsync(CancellationToken.None);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        gwSocket = await listener.AcceptSocketAsync(cts.Token);

        // Complete handshake
        var line = await SocketReadLineAsync(gwSocket, cts.Token);
        line.ShouldStartWith("GATEWAY ");
        await SocketWriteLineAsync(gwSocket, "GATEWAY REMOTE-QSUB", cts.Token);
        await Task.Delay(200);

        // Propagate a queue subscription
        manager.PropagateLocalSubscription("$G", "foo.bar", "workers");
        await Task.Delay(100);
        var aplusLine = await SocketReadLineAsync(gwSocket, cts.Token);
        aplusLine.ShouldBe("A+ $G foo.bar workers");
    }
    finally
    {
        // Dispose the manager even when a read times out or an assertion fails,
        // then release the accepted socket (same order as the success path had).
        await manager.DisposeAsync();
        gwSocket?.Dispose();
    }
}
// Go: TestGatewayQueueSub (gateway_test.go:2265) — unsubscribe queue group sends A-
/// <summary>
/// Plays the remote side of a gateway link on a raw TCP listener, completes
/// the GATEWAY handshake, and checks that propagating a local queue
/// unsubscription writes an "A-" line carrying account, subject and queue name.
/// </summary>
[Fact]
public async Task QueueSub_unsubscribe_sends_aminus_with_queue()
{
    using var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    var port = ((IPEndPoint)listener.LocalEndpoint).Port;
    var options = new GatewayOptions
    {
        Name = "QSUB-UNSUB",
        Host = "127.0.0.1",
        Port = 0,
        Remotes = [$"127.0.0.1:{port}"],
    };
    var manager = new GatewayManager(
        options,
        new ServerStats(),
        "QSUB-UNSUB-SERVER",
        _ => { },
        _ => { },
        NullLogger<GatewayManager>.Instance);
    Socket? gwSocket = null;
    try
    {
        await manager.StartAsync(CancellationToken.None);
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        gwSocket = await listener.AcceptSocketAsync(cts.Token);

        // Complete handshake
        var line = await SocketReadLineAsync(gwSocket, cts.Token);
        line.ShouldStartWith("GATEWAY ");
        await SocketWriteLineAsync(gwSocket, "GATEWAY REMOTE-QSUB-UNSUB", cts.Token);
        await Task.Delay(200);

        manager.PropagateLocalUnsubscription("$G", "foo.bar", "workers");
        await Task.Delay(100);
        var aminusLine = await SocketReadLineAsync(gwSocket, cts.Token);
        aminusLine.ShouldBe("A- $G foo.bar workers");
    }
    finally
    {
        // Dispose the manager even when a read times out or an assertion fails,
        // then release the accepted socket (same order as the success path had).
        await manager.DisposeAsync();
        gwSocket?.Dispose();
    }
}
// Go: TestGatewayQueueSub (gateway_test.go:2265) — local queue sub preferred over remote
/// <summary>
/// Subscribes to the same subject on A and B, publishes five messages on A,
/// and checks that the subscriber on A received at least one of them.
/// </summary>
[Fact]
public async Task QueueSub_messages_delivered_to_local_queue_sub_when_available()
{
    const string subject = "qsub.test";

    await using var fixture = await GatewayParityFixture.StartAsync("QS-A", "QS-B");

    await using var connA = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.A.Port}",
    });
    await connA.ConnectAsync();

    await using var connB = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.B.Port}",
    });
    await connB.ConnectAsync();

    var receivedLocally = 0;

    // NOTE(review): despite the test name, neither subscription passes a
    // queue group, and the subscription on B is never read.
    await using var subOnA = await connA.SubscribeCoreAsync<string>(subject);
    await connA.PingAsync();
    await using var subOnB = await connB.SubscribeCoreAsync<string>(subject);
    await connB.PingAsync();

    await fixture.WaitForRemoteInterestOnAAsync(subject);

    // Publish several messages on A
    var index = 0;
    while (index < 5)
    {
        await connA.PublishAsync(subject, $"msg{index}");
        index++;
    }

    await connA.PingAsync();

    // Drain the local subscription: each read gets 200 ms, the whole drain 3 s.
    using var drainBudget = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    try
    {
        while (true)
        {
            using var perRead = CancellationTokenSource.CreateLinkedTokenSource(drainBudget.Token);
            perRead.CancelAfter(200);
            await subOnA.Msgs.ReadAsync(perRead.Token);
            receivedLocally += 1;
        }
    }
    catch (OperationCanceledException)
    {
        // A timed-out read ends the drain.
    }

    // Local sub should have received all messages (or at least some)
    receivedLocally.ShouldBeGreaterThan(0);
}
// ── TestGatewayInterestOnlyMode ─────────────────────────────────────
// Go: TestGatewaySwitchToInterestOnlyModeImmediately (gateway_test.go:6934)
/// <summary>
/// A freshly constructed tracker reports Optimistic mode for the "$G" account.
/// </summary>
[Fact]
public void InterestOnly_starts_in_optimistic_mode()
{
    var freshTracker = new GatewayInterestTracker();

    var mode = freshTracker.GetMode("$G");

    mode.ShouldBe(GatewayInterestMode.Optimistic);
}
// Go: TestGatewaySwitchToInterestOnlyModeImmediately (gateway_test.go:6934)
/// <summary>
/// A freshly constructed tracker says a subject it has never seen should be forwarded.
/// </summary>
[Fact]
public void InterestOnly_optimistic_mode_forwards_unknown_subjects()
{
    var freshTracker = new GatewayInterestTracker();

    var forward = freshTracker.ShouldForward("$G", "any.subject");

    forward.ShouldBeTrue();
}
// Go: TestGatewaySubjectInterest (gateway_test.go:1972)
/// <summary>
/// After no-interest is recorded for "foo", that subject stops forwarding
/// while the unrelated subject "bar" still forwards.
/// </summary>
[Fact]
public void InterestOnly_optimistic_mode_suppresses_subject_after_no_interest()
{
    const string account = "$G";
    var interest = new GatewayInterestTracker();

    interest.TrackNoInterest(account, "foo");

    var forwardFoo = interest.ShouldForward(account, "foo");
    forwardFoo.ShouldBeFalse();
    var forwardBar = interest.ShouldForward(account, "bar");
    forwardBar.ShouldBeTrue();
}
// Go: TestGatewaySendAllSubs (gateway_test.go:3423) — switches to interest-only mode
/// <summary>
/// With a no-interest threshold of 5, recording no-interest for five distinct
/// subjects puts the account into InterestOnly mode.
/// </summary>
[Fact]
public void InterestOnly_switches_to_interest_only_after_threshold()
{
    const string account = "$G";
    var interest = new GatewayInterestTracker(noInterestThreshold: 5);

    var index = 0;
    while (index < 5)
    {
        interest.TrackNoInterest(account, $"subject.{index}");
        index++;
    }

    var mode = interest.GetMode(account);
    mode.ShouldBe(GatewayInterestMode.InterestOnly);
}
// Go: TestGatewaySendAllSubs (gateway_test.go:3423)
/// <summary>
/// With a threshold of 2, two no-interest records switch the account to
/// InterestOnly mode, after which an unseen subject is not forwarded.
/// </summary>
[Fact]
public void InterestOnly_interest_only_mode_blocks_unknown_subjects()
{
    const string account = "$G";
    var interest = new GatewayInterestTracker(noInterestThreshold: 2);

    interest.TrackNoInterest(account, "s1");
    interest.TrackNoInterest(account, "s2");

    var mode = interest.GetMode(account);
    mode.ShouldBe(GatewayInterestMode.InterestOnly);
    var forwardUnknown = interest.ShouldForward(account, "unknown.subject");
    forwardUnknown.ShouldBeFalse();
}
// Go: TestGatewaySwitchToInterestOnlyModeImmediately (gateway_test.go:6934)
/// <summary>
/// After an explicit switch to InterestOnly mode a subject is not forwarded
/// until interest in it has been tracked.
/// </summary>
[Fact]
public void InterestOnly_explicit_switch_allows_forwarding_after_interest_registered()
{
    const string account = "$G";
    const string subject = "any.subject";
    var interest = new GatewayInterestTracker();

    interest.SwitchToInterestOnly(account);
    var mode = interest.GetMode(account);
    mode.ShouldBe(GatewayInterestMode.InterestOnly);

    // Nothing tracked yet, so should not forward
    var forwardBefore = interest.ShouldForward(account, subject);
    forwardBefore.ShouldBeFalse();

    // Track interest
    interest.TrackInterest(account, subject);
    var forwardAfter = interest.ShouldForward(account, subject);
    forwardAfter.ShouldBeTrue();
}
// Go: TestGatewaySendAllSubs (gateway_test.go:3423)
/// <summary>
/// In InterestOnly mode (threshold 1), a subject with tracked interest
/// forwards, and stops forwarding once no-interest is recorded for it.
/// </summary>
[Fact]
public void InterestOnly_removing_interest_stops_forwarding()
{
    const string account = "$G";
    const string subject = "wanted";
    var interest = new GatewayInterestTracker(noInterestThreshold: 1);

    interest.TrackNoInterest(account, "x");
    var mode = interest.GetMode(account);
    mode.ShouldBe(GatewayInterestMode.InterestOnly);

    interest.TrackInterest(account, subject);
    var forwardWhileInterested = interest.ShouldForward(account, subject);
    forwardWhileInterested.ShouldBeTrue();

    interest.TrackNoInterest(account, subject);
    var forwardAfterRemoval = interest.ShouldForward(account, subject);
    forwardAfterRemoval.ShouldBeFalse();
}
// Go: TestGatewaySwitchToInterestOnlyModeImmediately (gateway_test.go:6934)
/// <summary>
/// Records no-interest for two subjects (below the threshold of 3), switches
/// explicitly to InterestOnly mode, and checks that subject "a" is not forwarded.
/// </summary>
[Fact]
public void InterestOnly_switching_clears_no_interest_set()
{
    const string account = "$G";
    var interest = new GatewayInterestTracker(noInterestThreshold: 3);

    interest.TrackNoInterest(account, "a");
    interest.TrackNoInterest(account, "b");
    interest.SwitchToInterestOnly(account);

    // After switch, previously blocked subjects are not tracked
    // Without explicit interest, nothing forwards
    var forwardA = interest.ShouldForward(account, "a");
    forwardA.ShouldBeFalse();
}
// ── TestGatewayAccountInterest ──────────────────────────────────────
// Go: TestGatewayAccountInterest (gateway_test.go:1794)
/// <summary>
/// No-interest recorded for one account does not affect forwarding of the
/// same subject for a different account.
/// </summary>
[Fact]
public void AccountInterest_interest_scoped_to_account()
{
    const string subject = "foo";
    var interest = new GatewayInterestTracker();

    interest.TrackNoInterest("ACCT_A", subject);

    // ACCT_A has no interest in "foo"
    var forwardForA = interest.ShouldForward("ACCT_A", subject);
    forwardForA.ShouldBeFalse();

    // ACCT_B is unaffected
    var forwardForB = interest.ShouldForward("ACCT_B", subject);
    forwardForB.ShouldBeTrue();
}
// Go: TestGatewayAccountInterest (gateway_test.go:1794)
/// <summary>
/// With a threshold of 2, ACCT_A reaches InterestOnly mode after two
/// no-interest records while ACCT_B stays Optimistic.
/// </summary>
[Fact]
public void AccountInterest_each_account_switches_to_interest_only_independently()
{
    var interest = new GatewayInterestTracker(noInterestThreshold: 2);

    interest.TrackNoInterest("ACCT_A", "s1");
    interest.TrackNoInterest("ACCT_A", "s2");

    var modeA = interest.GetMode("ACCT_A");
    modeA.ShouldBe(GatewayInterestMode.InterestOnly);
    var modeB = interest.GetMode("ACCT_B");
    modeB.ShouldBe(GatewayInterestMode.Optimistic);
}
// ── TestGatewayAccountUnsub ─────────────────────────────────────────
// Go: TestGatewayAccountUnsub (gateway_test.go:1912)
/// <summary>
/// Tracking interest in a subject after no-interest was recorded makes that
/// subject forward again.
/// </summary>
[Fact]
public void AccountUnsub_positive_interest_clears_no_interest_in_optimistic_mode()
{
    const string account = "$G";
    const string subject = "foo";
    var interest = new GatewayInterestTracker();

    interest.TrackNoInterest(account, subject);
    var forwardWhileBlocked = interest.ShouldForward(account, subject);
    forwardWhileBlocked.ShouldBeFalse();

    interest.TrackInterest(account, subject);
    var forwardAfterInterest = interest.ShouldForward(account, subject);
    forwardAfterInterest.ShouldBeTrue();
}
// Go: TestGatewayAccountUnsub (gateway_test.go:1912)
/// <summary>
/// Drives a <c>GatewayConnection</c> over a loopback socket pair: after the
/// inbound handshake, an "A+" line followed by an "A-" line for the same
/// subject must surface as two remote-subscription callbacks, the second one
/// flagged as a removal.
/// </summary>
[Fact]
public async Task AccountUnsub_gateway_connection_processes_aminus_and_removes_interest()
{
    using var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    var port = ((IPEndPoint)listener.LocalEndpoint).Port;
    using var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await clientSocket.ConnectAsync(IPAddress.Loopback, port);
    using var serverSocket = await listener.AcceptSocketAsync();
    await using var gw = new GatewayConnection(serverSocket);
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    // Handshake: the client announces itself and reads the connection's reply.
    var handshake = gw.PerformInboundHandshakeAsync("LOCAL", cts.Token);
    await SocketWriteLineAsync(clientSocket, "GATEWAY REMOTE", cts.Token);
    await SocketReadLineAsync(clientSocket, cts.Token);
    await handshake;

    var receivedSubs = new List<RemoteSubscription>();

    // RunContinuationsAsynchronously keeps the rest of this test from running
    // inline inside the connection's callback when the second event arrives.
    var bothReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    gw.RemoteSubscriptionReceived = sub =>
    {
        receivedSubs.Add(sub);
        if (receivedSubs.Count >= 2)
        {
            bothReceived.TrySetResult();
        }

        return Task.CompletedTask;
    };
    gw.StartLoop(cts.Token);

    await SocketWriteLineAsync(clientSocket, "A+ $G events.>", cts.Token);
    await SocketWriteLineAsync(clientSocket, "A- $G events.>", cts.Token);
    await bothReceived.Task.WaitAsync(cts.Token);

    receivedSubs[0].IsRemoval.ShouldBeFalse();
    receivedSubs[0].Subject.ShouldBe("events.>");
    receivedSubs[1].IsRemoval.ShouldBeTrue();
    receivedSubs[1].Subject.ShouldBe("events.>");
}
// ── TestGatewayReconnect ────────────────────────────────────────────
// Go: TestGatewayBasic (gateway_test.go:399) reconnect part; TestGatewayImplicitReconnect (gateway_test.go:1286)
/// <summary>
/// Starts server A and a server B that dials A's gateway listener, stops B,
/// waits for A's gateway counter to drop, then starts a second "RC-B" server
/// and checks that it establishes a gateway connection.
/// </summary>
[Fact]
public async Task Reconnect_gateway_relinks_after_remote_restarts()
{
    var aOptions = new NatsOptions
    {
        Host = "127.0.0.1",
        Port = 0,
        Gateway = new GatewayOptions
        {
            Name = "RC-A",
            Host = "127.0.0.1",
            Port = 0,
        },
    };
    var a = new NatsServer(aOptions, NullLoggerFactory.Instance);
    var aCts = new CancellationTokenSource();
    NatsServer? b = null;
    var bCts = new CancellationTokenSource();
    NatsServer? b2 = null;
    var b2Cts = new CancellationTokenSource();

    // Both incarnations of B use the same options: gateway "RC-B" dialing A.
    NatsOptions BuildBOptions() => new NatsOptions
    {
        Host = "127.0.0.1",
        Port = 0,
        Gateway = new GatewayOptions
        {
            Name = "RC-B",
            Host = "127.0.0.1",
            Port = 0,
            Remotes = [a.GatewayListen!],
        },
    };

    try
    {
        _ = a.StartAsync(aCts.Token);
        await a.WaitForReadyAsync();

        b = new NatsServer(BuildBOptions(), NullLoggerFactory.Instance);
        _ = b.StartAsync(bCts.Token);
        await b.WaitForReadyAsync();

        using var waitInitial = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        while (!waitInitial.IsCancellationRequested && (a.Stats.Gateways == 0 || b.Stats.Gateways == 0))
        {
            await Task.Delay(30, waitInitial.Token).ContinueWith(_ => { }, TaskScheduler.Default);
        }

        a.Stats.Gateways.ShouldBeGreaterThan(0);

        // Shutdown B
        await bCts.CancelAsync();
        b.Dispose();
        b = null; // already disposed; keep the finally block from disposing it twice

        using var waitDrop = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        while (!waitDrop.IsCancellationRequested && a.Stats.Gateways > 0)
        {
            await Task.Delay(30, waitDrop.Token).ContinueWith(_ => { }, TaskScheduler.Default);
        }

        // Restart B
        b2 = new NatsServer(BuildBOptions(), NullLoggerFactory.Instance);
        _ = b2.StartAsync(b2Cts.Token);
        await b2.WaitForReadyAsync();

        using var waitRecon = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        while (!waitRecon.IsCancellationRequested && b2.Stats.Gateways == 0)
        {
            await Task.Delay(30, waitRecon.Token).ContinueWith(_ => { }, TaskScheduler.Default);
        }

        b2.Stats.Gateways.ShouldBeGreaterThan(0);
    }
    finally
    {
        // Stop whatever is still running, even when an assertion above failed,
        // so a failing test does not leave live servers behind.
        await aCts.CancelAsync();
        await bCts.CancelAsync();
        await b2Cts.CancelAsync();
        a.Dispose();
        b?.Dispose();
        b2?.Dispose();
        aCts.Dispose();
        b2Cts.Dispose();
        bCts.Dispose();
    }
}
// ── TestGatewayURLs ─────────────────────────────────────────────────
// Go: TestGatewayURLsFromClusterSentInINFO (gateway_test.go:1506)
/// <summary>
/// Starts a server with a gateway configured on an ephemeral port and checks
/// that <c>GatewayListen</c> is a "127.0.0.1:port" string with a positive port.
/// </summary>
[Fact]
public async Task URLs_listen_endpoint_exposed_after_start()
{
    var options = new NatsOptions
    {
        Host = "127.0.0.1",
        Port = 0,
        Gateway = new GatewayOptions
        {
            Name = "URL-TEST",
            Host = "127.0.0.1",
            Port = 0,
        },
    };
    var server = new NatsServer(options, NullLoggerFactory.Instance);
    var cts = new CancellationTokenSource();
    try
    {
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        server.GatewayListen.ShouldNotBeNull();
        server.GatewayListen.ShouldStartWith("127.0.0.1:");
        var parts = server.GatewayListen.Split(':');
        int.TryParse(parts[1], out var port).ShouldBeTrue();
        port.ShouldBeGreaterThan(0);
    }
    finally
    {
        // Stop the server even when an assertion above fails.
        await cts.CancelAsync();
        server.Dispose();
        cts.Dispose();
    }
}
// Go: TestGatewayAdvertise (gateway_test.go:935)
/// <summary>
/// Starts a server without any gateway options and checks that
/// <c>GatewayListen</c> is null.
/// </summary>
[Fact]
public async Task URLs_gateway_listen_is_null_when_no_gateway_configured()
{
    var options = new NatsOptions
    {
        Host = "127.0.0.1",
        Port = 0,
    };
    var server = new NatsServer(options, NullLoggerFactory.Instance);
    var cts = new CancellationTokenSource();
    try
    {
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        server.GatewayListen.ShouldBeNull();
    }
    finally
    {
        // Stop the server even when the assertion above fails.
        await cts.CancelAsync();
        server.Dispose();
        cts.Dispose();
    }
}
// ── TestGatewayConnectionEvents ─────────────────────────────────────
// Go: TestGatewayConnectEvents (gateway_test.go:7039)
/// <summary>
/// Once the two-cluster fixture is up, each server's gateway counter is at least 1.
/// </summary>
[Fact]
public async Task ConnectionEvents_gateway_count_increments_on_connect()
{
    await using var fixture = await GatewayParityFixture.StartAsync("EV-A", "EV-B");

    var gatewaysOnA = fixture.A.Stats.Gateways;
    gatewaysOnA.ShouldBeGreaterThanOrEqualTo(1);

    var gatewaysOnB = fixture.B.Stats.Gateways;
    gatewaysOnB.ShouldBeGreaterThanOrEqualTo(1);
}
// Go: TestGatewayConnectEvents (gateway_test.go:7039)
/// <summary>
/// Captures A's gateway counter, shuts down B, and polls (every 30 ms, for at
/// most 5 seconds) until the counter falls below the captured value.
/// </summary>
[Fact]
public async Task ConnectionEvents_gateway_count_decrements_on_disconnect()
{
    await using var fixture = await GatewayParityFixture.StartAsync("DEC-A", "DEC-B");
    var before = fixture.A.Stats.Gateways;
    before.ShouldBeGreaterThan(0);

    await fixture.ShutdownBAsync();

    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    while (true)
    {
        if (deadline.IsCancellationRequested || fixture.A.Stats.Gateways < before)
        {
            break;
        }

        // A cancelled delay only means the deadline expired; the loop condition handles it.
        await Task.Delay(30, deadline.Token).ContinueWith(_ => { }, TaskScheduler.Default);
    }

    fixture.A.Stats.Gateways.ShouldBeLessThan(before);
}
// ── TestGatewayNoReconnectOnClose ───────────────────────────────────
// Go: TestGatewayNoReconnectOnClose (gateway_test.go:1735)
/// <summary>
/// Completes an outbound handshake over a loopback socket pair, starts the
/// connection loop, and checks that <c>DisposeAsync</c> wins a race against a
/// 3-second delay.
/// </summary>
[Fact]
public async Task NoReconnect_connection_loop_terminates_cleanly_on_dispose()
{
    using var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    var listenEndpoint = (IPEndPoint)listener.LocalEndpoint;
    var port = listenEndpoint.Port;

    using var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await clientSocket.ConnectAsync(IPAddress.Loopback, port);
    using var serverSocket = await listener.AcceptSocketAsync();

    var connection = new GatewayConnection(serverSocket);
    using var budget = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    // Outbound handshake: the connection speaks first, the client replies.
    var handshake = connection.PerformOutboundHandshakeAsync("LOCAL", budget.Token);
    await SocketReadLineAsync(clientSocket, budget.Token);
    await SocketWriteLineAsync(clientSocket, "GATEWAY REMOTE", budget.Token);
    await handshake;

    connection.StartLoop(budget.Token);

    // Dispose should complete without hanging
    var disposal = connection.DisposeAsync().AsTask();
    var limit = Task.Delay(TimeSpan.FromSeconds(3));
    var winner = await Task.WhenAny(disposal, limit);
    winner.ShouldBe(disposal);
}
// ── TestGatewayMsgSentOnlyOnce ──────────────────────────────────────
// Go: TestGatewayMsgSentOnlyOnce (gateway_test.go:2993)
/// <summary>
/// Subscribes on B, publishes one message on A, and checks that the
/// subscriber receives it once and that a follow-up read times out.
/// </summary>
[Fact]
public async Task MsgSentOnlyOnce_message_forwarded_only_once_to_interested_remote()
{
    const string subject = "once.test";

    await using var fixture = await GatewayParityFixture.StartAsync("ONCE-A", "ONCE-B");

    await using var subscriberConn = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.B.Port}",
    });
    await subscriberConn.ConnectAsync();

    await using var publisherConn = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.A.Port}",
    });
    await publisherConn.ConnectAsync();

    await using var subscription = await subscriberConn.SubscribeCoreAsync<string>(subject);
    await subscriberConn.PingAsync();
    await fixture.WaitForRemoteInterestOnAAsync(subject);

    await publisherConn.PublishAsync(subject, "payload");
    await publisherConn.PingAsync();

    using var receiveBudget = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    var first = await subscription.Msgs.ReadAsync(receiveBudget.Token);
    first.Data.ShouldBe("payload");

    // Verify no duplicate arrives
    await Task.Delay(200);
    using var quietWindow = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
    await Should.ThrowAsync<OperationCanceledException>(async () =>
        await subscription.Msgs.ReadAsync(quietWindow.Token));
}
// ── TestGatewaySendsToNonLocalSubs ──────────────────────────────────
// Go: TestGatewaySendsToNonLocalSubs (gateway_test.go:3140)
/// <summary>
/// Subscribes on B, waits until A sees the remote interest, publishes on A,
/// and checks that the subscriber on B receives the payload within 3 seconds.
/// </summary>
[Fact]
public async Task SendsToNonLocalSubs_message_delivered_to_subscriber_on_remote_cluster()
{
    const string subject = "non.local.test";

    await using var fixture = await GatewayParityFixture.StartAsync("NL-A", "NL-B");

    await using var connB = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.B.Port}",
    });
    await connB.ConnectAsync();

    await using var connA = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.A.Port}",
    });
    await connA.ConnectAsync();

    await using var subscription = await connB.SubscribeCoreAsync<string>(subject);
    await connB.PingAsync();
    await fixture.WaitForRemoteInterestOnAAsync(subject);

    await connA.PublishAsync(subject, "delivered");

    using var receiveBudget = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    var received = await subscription.Msgs.ReadAsync(receiveBudget.Token);
    received.Data.ShouldBe("delivered");
}
// ── TestGatewayRaceBetweenPubAndSub ────────────────────────────────
// Go: TestGatewayRaceBetweenPubAndSub (gateway_test.go:3357)
/// <summary>
/// Publishes 50 messages on A while a subscription is concurrently being
/// created and read on B. Nothing is asserted about how many messages
/// arrive; the test only requires that both tasks finish without throwing.
/// </summary>
[Fact]
public async Task RacePubSub_concurrent_pub_and_sub_does_not_crash()
{
    await using var fx = await GatewayParityFixture.StartAsync("RACE-A", "RACE-B");
    await using var pubConn = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fx.A.Port}",
    });
    await pubConn.ConnectAsync();
    await using var subConn = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fx.B.Port}",
    });
    await subConn.ConnectAsync();

    // Counted for debugging only; deliberately never asserted.
    var received = 0;

    // Disposed at method exit, after Task.WhenAll has observed both tasks,
    // so the token is never used after its source is gone.
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));

    // Start publishing concurrently
    var pubTask = Task.Run(async () =>
    {
        for (int i = 0; i < 50; i++)
        {
            await pubConn.PublishAsync("race.test", $"msg{i}");
            await Task.Delay(5);
        }
    });

    // Start subscribing concurrently
    var subTask = Task.Run(async () =>
    {
        await using var sub = await subConn.SubscribeCoreAsync<string>("race.test");
        await subConn.PingAsync();
        try
        {
            while (!cts.Token.IsCancellationRequested)
            {
                // Each read gets at most 500 ms, bounded by the overall 3-second budget.
                using var itemCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
                itemCts.CancelAfter(500);
                await sub.Msgs.ReadAsync(itemCts.Token);
                Interlocked.Increment(ref received);
            }
        }
        catch (OperationCanceledException)
        {
            // A timed-out read ends the receive loop.
        }
    });

    await Task.WhenAll(pubTask, subTask);
    // No assertion on count; just verifying no crashes/deadlocks
}
// ── TestGatewayHandshake protocol details ───────────────────────────
// Go: TestGatewayBasic (gateway_test.go:399) — handshake sets remote ID
/// <summary>
/// Outbound handshake: the connection sends "GATEWAY CLUSTER-A" first, the
/// peer answers "GATEWAY CLUSTER-B", and <c>RemoteId</c> ends up "CLUSTER-B".
/// </summary>
[Fact]
public async Task Handshake_outbound_handshake_sets_remote_id_correctly()
{
    using var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    using var peerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var listenEndpoint = (IPEndPoint)listener.LocalEndpoint;
    await peerSocket.ConnectAsync(IPAddress.Loopback, listenEndpoint.Port);
    using var acceptedSocket = await listener.AcceptSocketAsync();

    await using var connection = new GatewayConnection(acceptedSocket);
    using var budget = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var handshake = connection.PerformOutboundHandshakeAsync("CLUSTER-A", budget.Token);

    var announced = await SocketReadLineAsync(peerSocket, budget.Token);
    announced.ShouldBe("GATEWAY CLUSTER-A");

    await SocketWriteLineAsync(peerSocket, "GATEWAY CLUSTER-B", budget.Token);
    await handshake;

    connection.RemoteId.ShouldBe("CLUSTER-B");
}
// Go: TestGatewayBasic (gateway_test.go:399) — inbound handshake
/// <summary>
/// Inbound handshake: the peer sends "GATEWAY CLUSTER-REMOTE" first, the
/// connection answers "GATEWAY CLUSTER-LOCAL", and <c>RemoteId</c> ends up
/// "CLUSTER-REMOTE".
/// </summary>
[Fact]
public async Task Handshake_inbound_handshake_sets_remote_id_correctly()
{
    using var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    using var peerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var listenEndpoint = (IPEndPoint)listener.LocalEndpoint;
    await peerSocket.ConnectAsync(IPAddress.Loopback, listenEndpoint.Port);
    using var acceptedSocket = await listener.AcceptSocketAsync();

    await using var connection = new GatewayConnection(acceptedSocket);
    using var budget = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var handshake = connection.PerformInboundHandshakeAsync("CLUSTER-LOCAL", budget.Token);

    await SocketWriteLineAsync(peerSocket, "GATEWAY CLUSTER-REMOTE", budget.Token);
    var reply = await SocketReadLineAsync(peerSocket, budget.Token);
    reply.ShouldBe("GATEWAY CLUSTER-LOCAL");

    await handshake;

    connection.RemoteId.ShouldBe("CLUSTER-REMOTE");
}
// Go: TestGatewayBasic (gateway_test.go:399) — bad handshake is rejected
/// <summary>
/// An inbound handshake that receives a line not starting with the GATEWAY
/// verb must fail with <see cref="InvalidOperationException"/>.
/// </summary>
[Fact]
public async Task Handshake_invalid_protocol_throws_exception()
{
    using var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    using var peerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    var listenEndpoint = (IPEndPoint)listener.LocalEndpoint;
    await peerSocket.ConnectAsync(IPAddress.Loopback, listenEndpoint.Port);
    using var acceptedSocket = await listener.AcceptSocketAsync();

    await using var connection = new GatewayConnection(acceptedSocket);
    using var budget = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var handshake = connection.PerformInboundHandshakeAsync("LOCAL", budget.Token);
    await SocketWriteLineAsync(peerSocket, "BADPROTOCOL here", budget.Token);

    await Should.ThrowAsync<InvalidOperationException>(async () => await handshake);
}
// ── TestGatewaySubjectInterest ──────────────────────────────────────
// Go: TestGatewaySubjectInterest (gateway_test.go:1972)
/// <summary>
/// Subscribes with a wildcard on B, waits until A reports remote interest
/// for matching subjects, then publishes on A and checks delivery on B.
/// </summary>
[Fact]
public async Task SubjectInterest_message_forwarded_when_remote_has_wildcard_sub()
{
    await using var fixture = await GatewayParityFixture.StartAsync("SI-A", "SI-B");

    await using var connB = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.B.Port}",
    });
    await connB.ConnectAsync();

    await using var connA = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fixture.A.Port}",
    });
    await connA.ConnectAsync();

    // Subscribe with wildcard on remote
    await using var wildcardSub = await connB.SubscribeCoreAsync<string>("orders.>");
    await connB.PingAsync();

    // Poll every 30 ms, for at most 5 seconds, until A knows about the interest.
    using var interestDeadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    while (true)
    {
        if (interestDeadline.IsCancellationRequested || fixture.A.HasRemoteInterest("orders.created"))
        {
            break;
        }

        await Task.Delay(30, interestDeadline.Token).ContinueWith(_ => { }, TaskScheduler.Default);
    }

    fixture.A.HasRemoteInterest("orders.created").ShouldBeTrue();
    fixture.A.HasRemoteInterest("orders.shipped").ShouldBeTrue();

    await connA.PublishAsync("orders.created", "placed");

    using var receiveBudget = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    var delivered = await wildcardSub.Msgs.ReadAsync(receiveBudget.Token);
    delivered.Data.ShouldBe("placed");
}
// ── TestGatewayOrderedOutbounds ─────────────────────────────────────
// Go: TestGatewayOrderedOutbounds (gateway_test.go:2190)
/// <summary>
/// Starts a server with a gateway configured but no remotes and checks that
/// its gateway counter is zero.
/// NOTE(review): the test name mentions multiple remotes, but none are configured here.
/// </summary>
[Fact]
public async Task OrderedOutbounds_gateway_tracks_stats_for_multiple_remotes()
{
    // Verify server starts with 0 gateway connections
    var options = new NatsOptions
    {
        Host = "127.0.0.1",
        Port = 0,
        Gateway = new GatewayOptions
        {
            Name = "ORD-A",
            Host = "127.0.0.1",
            Port = 0,
        },
    };
    var server = new NatsServer(options, NullLoggerFactory.Instance);
    var cts = new CancellationTokenSource();
    try
    {
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        server.Stats.Gateways.ShouldBe(0);
    }
    finally
    {
        // Stop the server even when the assertion above fails.
        await cts.CancelAsync();
        server.Dispose();
        cts.Dispose();
    }
}
// ── TestGatewaySendQSubsOnGatewayConnect ────────────────────────────
// Go: TestGatewaySendQSubsOnGatewayConnect (gateway_test.go:2581)
[Fact]
public async Task SendQSubsOnConnect_queue_subs_propagated_on_gateway_connect()
{
    await using var fx = await GatewayParityFixture.StartAsync("SQS-A", "SQS-B");

    await using var connB = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{fx.B.Port}",
    });
    await connB.ConnectAsync();

    // Create a real queue subscription on B. Without a queue group this would be
    // a plain subscription and the test would not exercise queue-sub propagation.
    await using var sub = await connB.SubscribeCoreAsync<string>("qconn.test", queueGroup: "qconn-workers");
    await connB.PingAsync();

    // Shared fixture helper: throws a TimeoutException naming the subject if
    // A never learns about the interest within the polling window.
    await fx.WaitForRemoteInterestOnAAsync("qconn.test");

    fx.A.HasRemoteInterest("qconn.test").ShouldBeTrue();
}
// ── TestGatewayReplyMapper ──────────────────────────────────────────
// Go: TestGatewayMapReplyOnlyForRecentSub (gateway_test.go:5070)
[Fact]
public void ReplyMapper_gateway_reply_prefix_detected_correctly()
{
    // A fully wrapped reply subject carries the gateway prefix.
    var wrappedReply = ReplyMapper.HasGatewayReplyPrefix("_GR_.cluster1.123._INBOX.abc");
    wrappedReply.ShouldBeTrue();

    // A plain inbox subject does not.
    var plainInbox = ReplyMapper.HasGatewayReplyPrefix("_INBOX.abc");
    plainInbox.ShouldBeFalse();

    // The bare prefix on its own still counts as prefixed.
    var barePrefix = ReplyMapper.HasGatewayReplyPrefix("_GR_.");
    barePrefix.ShouldBeTrue();

    // Null and empty inputs are reported as not prefixed.
    var nullSubject = ReplyMapper.HasGatewayReplyPrefix(null);
    nullSubject.ShouldBeFalse();

    var emptySubject = ReplyMapper.HasGatewayReplyPrefix("");
    emptySubject.ShouldBeFalse();
}
// Go: TestGatewaySendReplyAcrossGateways (gateway_test.go:5165)
[Fact]
public void ReplyMapper_to_gateway_reply_formats_correctly()
{
    // Expected layout: "_GR_." + cluster id + "." + hash + "." + original reply.
    const string originalReply = "_INBOX.abc";
    const string clusterId = "cluster-a";
    const long hash = 42L;

    ReplyMapper.ToGatewayReply(originalReply, clusterId, hash)
        .ShouldBe("_GR_.cluster-a.42._INBOX.abc");
}
// Go: TestGatewaySendReplyAcrossGateways (gateway_test.go:5165)
[Fact]
public void ReplyMapper_restore_gateway_reply_unwraps_prefix()
{
    // Restoring a wrapped reply must succeed and yield the original inbox subject.
    const string wrapped = "_GR_.clusterX.123._INBOX.response";

    ReplyMapper.TryRestoreGatewayReply(wrapped, out var unwrapped).ShouldBeTrue();

    unwrapped.ShouldBe("_INBOX.response");
}
// Go: TestGatewaySendReplyAcrossGateways (gateway_test.go:5165)
[Fact]
public void ReplyMapper_extract_cluster_id_from_gateway_reply()
{
    // The cluster id is the token that follows the "_GR_." prefix.
    const string wrapped = "_GR_.my-cluster.456._INBOX.test";

    ReplyMapper.TryExtractClusterId(wrapped, out var extracted).ShouldBeTrue();

    extracted.ShouldBe("my-cluster");
}
// Go: TestGatewaySendReplyAcrossGateways (gateway_test.go:5165)
[Fact]
public void ReplyMapper_compute_hash_is_deterministic()
{
    const string subject = "_INBOX.test";

    // Hash the same subject twice.
    var first = ReplyMapper.ComputeReplyHash(subject);
    var second = ReplyMapper.ComputeReplyHash(subject);

    // Both runs must agree, and the value must be strictly positive.
    first.ShouldBe(second);
    first.ShouldBeGreaterThan(0);
}
// ── TestGatewayClientsDontReceiveMsgsOnGWPrefix ─────────────────────
// Go: TestGatewayClientsDontReceiveMsgsOnGWPrefix (gateway_test.go:5586)
[Fact]
public void GwPrefix_reply_mapper_does_not_prefix_non_reply_subjects()
{
    // Ordinary subjects must never be reported as carrying the gateway reply prefix.
    foreach (var subject in new[] { "foo.bar", "test.subject" })
    {
        ReplyMapper.HasGatewayReplyPrefix(subject).ShouldBeFalse();
    }
}
// ── TestGatewayForwardJetStreamCluster ──────────────────────────────
// Go: JetStreamCrossClusterGateway (various jetstream + gateway tests)
[Fact]
public async Task JetStream_forwarded_cluster_message_increments_counter()
{
    // NOTE(review): despite the test name, no counter is inspected here; the test
    // only checks that a message published on A reaches a subscriber on B.
    await using var fx = await GatewayParityFixture.StartAsync("JS-A", "JS-B");

    // Publisher attaches to A.
    var publisherOpts = new NatsOpts { Url = $"nats://127.0.0.1:{fx.A.Port}" };
    await using var publisher = new NatsConnection(publisherOpts);
    await publisher.ConnectAsync();

    // Subscriber attaches to B.
    var subscriberOpts = new NatsOpts { Url = $"nats://127.0.0.1:{fx.B.Port}" };
    await using var subscriber = new NatsConnection(subscriberOpts);
    await subscriber.ConnectAsync();

    await using var subscription = await subscriber.SubscribeCoreAsync<string>("js.cluster.test");
    await subscriber.PingAsync();

    // Do not publish until A knows about B's interest.
    await fx.WaitForRemoteInterestOnAAsync("js.cluster.test");
    await publisher.PublishAsync("js.cluster.test", "jscluster");

    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    var received = await subscription.Msgs.ReadAsync(deadline.Token);
    received.Data.ShouldBe("jscluster");
}
// ── TestGatewayInterestTracker concurrent safety ─────────────────────
// Go: TestGatewayRaceOnClose (gateway_test.go:3674)
[Fact]
public async Task InterestTracker_concurrent_track_and_forward_is_safe()
{
    const int workerCount = 10;
    const int subjectsPerWorker = 50;

    var tracker = new GatewayInterestTracker(noInterestThreshold: 100);

    // Each worker hammers the tracker with its own set of subjects on account "$G".
    var workers = new Task[workerCount];
    for (var index = 0; index < workerCount; index++)
    {
        var workerId = index; // per-iteration copy so the closure sees a stable value
        workers[index] = Task.Run(() =>
        {
            for (var n = 0; n < subjectsPerWorker; n++)
            {
                var subject = $"subject.{workerId}.{n}";
                tracker.TrackNoInterest("$G", subject);
                tracker.TrackInterest("$G", subject);
                tracker.ShouldForward("$G", subject);
            }
        });
    }

    // The test passes when every worker completes without throwing inside 5 seconds.
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    await Task.WhenAll(workers).WaitAsync(deadline.Token);
}
// Go: TestGatewaySwitchToInterestOnlyModeImmediately (gateway_test.go:6934)
[Fact]
public void InterestTracker_switch_to_interest_only_is_idempotent()
{
    var tracker = new GatewayInterestTracker();

    // Switching twice must neither throw nor leave the account in another mode.
    for (var attempt = 0; attempt < 2; attempt++)
    {
        tracker.SwitchToInterestOnly("$G");
    }

    var mode = tracker.GetMode("$G");
    mode.ShouldBe(GatewayInterestMode.InterestOnly);
}
// ── Helpers ─────────────────────────────────────────────────────────
/// <summary>
/// Reads one line from <paramref name="socket"/>, one byte at a time so nothing
/// past the line terminator is consumed.
/// </summary>
/// <param name="socket">Connected socket to read from.</param>
/// <param name="ct">Token passed to every receive call.</param>
/// <returns>
/// The bytes received before the first '\n' (or before the peer closed the
/// connection), with every '\r' dropped, decoded as ASCII.
/// </returns>
private static async Task<string> SocketReadLineAsync(Socket socket, CancellationToken ct)
{
    var line = new List<byte>(64);
    var buffer = new byte[1];

    for (;;)
    {
        var received = await socket.ReceiveAsync(buffer, SocketFlags.None, ct);

        // Stop on end-of-stream or on the line feed that terminates the line.
        if (received == 0 || buffer[0] == (byte)'\n')
            break;

        // Carriage returns are skipped rather than stored.
        if (buffer[0] == (byte)'\r')
            continue;

        line.Add(buffer[0]);
    }

    return Encoding.ASCII.GetString(line.ToArray());
}
/// <summary>
/// Sends <paramref name="line"/> followed by "\r\n" over <paramref name="socket"/>,
/// encoded as ASCII.
/// </summary>
/// <param name="socket">Connected socket to write to.</param>
/// <param name="line">Line content without its terminator.</param>
/// <param name="ct">Token passed to the send call.</param>
/// <returns>A task that completes when the send call completes.</returns>
private static Task SocketWriteLineAsync(Socket socket, string line, CancellationToken ct)
{
    var payload = Encoding.ASCII.GetBytes($"{line}\r\n");
    var pending = socket.SendAsync(payload, SocketFlags.None, ct);
    return pending.AsTask();
}
}
/// <summary>
/// Shared two-cluster gateway fixture for GatewayGoParityTests.
/// Starts server A (no remotes) and server B (remotes → A), both listening on
/// 127.0.0.1 with <c>Port = 0</c>, and tears both down on disposal.
/// </summary>
internal sealed class GatewayParityFixture : IAsyncDisposable
{
    // Every wait in this fixture polls its condition at this cadence for at most this long.
    private static readonly TimeSpan PollTimeout = TimeSpan.FromSeconds(5);
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(30);

    private readonly CancellationTokenSource _aCts;
    private readonly CancellationTokenSource _bCts;
    private bool _bShutdown;
    private bool _disposed;

    private GatewayParityFixture(
        NatsServer a,
        NatsServer b,
        CancellationTokenSource aCts,
        CancellationTokenSource bCts)
    {
        A = a;
        B = b;
        _aCts = aCts;
        _bCts = bCts;
    }

    /// <summary>Server started first, with no gateway remotes configured.</summary>
    public NatsServer A { get; }

    /// <summary>Server started second, with A's gateway listen address as its only remote.</summary>
    public NatsServer B { get; }

    /// <summary>
    /// Starts both servers and waits up to five seconds for each to report at least
    /// one gateway connection.
    /// </summary>
    /// <param name="nameA">Gateway name for server A.</param>
    /// <param name="nameB">Gateway name for server B.</param>
    /// <returns>
    /// The running fixture. The connection wait is best-effort: if it times out the
    /// fixture is still returned, so callers assert on <c>Stats.Gateways</c> themselves.
    /// </returns>
    public static async Task<GatewayParityFixture> StartAsync(string nameA, string nameB)
    {
        var aOptions = new NatsOptions
        {
            Host = "127.0.0.1",
            Port = 0,
            Gateway = new GatewayOptions
            {
                Name = nameA,
                Host = "127.0.0.1",
                Port = 0,
            },
        };

        var a = new NatsServer(aOptions, NullLoggerFactory.Instance);
        var aCts = new CancellationTokenSource();
        NatsServer? b = null;
        CancellationTokenSource? bCts = null;

        try
        {
            _ = a.StartAsync(aCts.Token);
            await a.WaitForReadyAsync();

            // B dials A, so A's gateway listen address must be known before B is configured.
            var bOptions = new NatsOptions
            {
                Host = "127.0.0.1",
                Port = 0,
                Gateway = new GatewayOptions
                {
                    Name = nameB,
                    Host = "127.0.0.1",
                    Port = 0,
                    Remotes = [a.GatewayListen!],
                },
            };

            b = new NatsServer(bOptions, NullLoggerFactory.Instance);
            bCts = new CancellationTokenSource();
            _ = b.StartAsync(bCts.Token);
            await b.WaitForReadyAsync();

            // Best-effort wait; the result is deliberately ignored (see <returns>).
            await PollUntilAsync(() => a.Stats.Gateways != 0 && b.Stats.Gateways != 0);

            return new GatewayParityFixture(a, b, aCts, bCts);
        }
        catch
        {
            // Startup failed part-way: release whatever was already created so a
            // half-started server does not leak into later tests.
            await StopAsync(b, bCts);
            await StopAsync(a, aCts);
            throw;
        }
    }

    /// <summary>
    /// Cancels and disposes server B ahead of fixture disposal. Calls after the first are no-ops.
    /// </summary>
    public async Task ShutdownBAsync()
    {
        if (_bShutdown)
            return;

        _bShutdown = true;
        await _bCts.CancelAsync();
        B.Dispose();
    }

    /// <summary>Waits up to five seconds for A to report remote interest in <paramref name="subject"/>.</summary>
    /// <param name="subject">Subject to probe with <c>A.HasRemoteInterest</c>.</param>
    /// <exception cref="TimeoutException">Interest did not appear in time.</exception>
    public async Task WaitForRemoteInterestOnAAsync(string subject)
    {
        if (!await PollUntilAsync(() => A.HasRemoteInterest(subject)))
            throw new TimeoutException($"Timed out waiting for remote interest on A for subject '{subject}'.");
    }

    /// <summary>Waits up to five seconds for B to report remote interest in <paramref name="subject"/>.</summary>
    /// <param name="subject">Subject to probe with <c>B.HasRemoteInterest</c>.</param>
    /// <exception cref="TimeoutException">Interest did not appear in time.</exception>
    public async Task WaitForRemoteInterestOnRemoteAsync(string subject)
    {
        if (!await PollUntilAsync(() => B.HasRemoteInterest(subject)))
            throw new TimeoutException($"Timed out waiting for remote interest on B for subject '{subject}'.");
    }

    /// <summary>
    /// Cancels and disposes both servers (B only if <see cref="ShutdownBAsync"/> has not
    /// already done so) and releases both token sources. Safe to call more than once.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // A second call would otherwise cancel an already-disposed token source.
        if (_disposed)
            return;
        _disposed = true;

        await _aCts.CancelAsync();
        if (!_bShutdown)
            await _bCts.CancelAsync();

        A.Dispose();
        if (!_bShutdown)
            B.Dispose();

        _aCts.Dispose();
        _bCts.Dispose();
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every <see cref="PollInterval"/> until it holds
    /// or <see cref="PollTimeout"/> elapses.
    /// </summary>
    /// <returns>The value of <paramref name="condition"/> when polling stopped.</returns>
    private static async Task<bool> PollUntilAsync(Func<bool> condition)
    {
        using var timeout = new CancellationTokenSource(PollTimeout);
        while (!timeout.IsCancellationRequested && !condition())
        {
            try
            {
                await Task.Delay(PollInterval, timeout.Token);
            }
            catch (OperationCanceledException)
            {
                // Deadline reached mid-delay; the loop condition ends the wait.
            }
        }

        // Re-evaluate once so a condition that became true right at the deadline still counts.
        return condition();
    }

    /// <summary>Cancels and disposes a server and its token source; null arguments are skipped.</summary>
    private static async Task StopAsync(NatsServer? server, CancellationTokenSource? cts)
    {
        if (cts is not null)
            await cts.CancelAsync();

        server?.Dispose();
        cts?.Dispose();
    }
}