Move 25 gateway-related test files from NATS.Server.Tests into a dedicated NATS.Server.Gateways.Tests project. Update namespaces, replace private ReadUntilAsync with SocketTestHelper from TestUtilities, inline TestServerFactory usage, add InternalsVisibleTo, and register the project in the solution file. All 261 tests pass.
899 lines
35 KiB
C#
899 lines
35 KiB
C#
using System.Net;
|
|
using System.Net.Sockets;
|
|
using System.Text;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using NATS.Client.Core;
|
|
using NATS.Server.Configuration;
|
|
using NATS.Server.Gateways;
|
|
using NATS.Server.Subscriptions;
|
|
|
|
namespace NATS.Server.Gateways.Tests.Gateways;
|
|
|
|
/// <summary>
|
|
/// Gateway connection establishment, handshake, lifecycle, and reconnection tests.
|
|
/// Ported from golang/nats-server/server/gateway_test.go.
|
|
/// </summary>
|
|
public class GatewayConnectionTests
|
|
{
|
|
// ── Handshake and Connection Establishment ──────────────────────────
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// Outbound handshake: the test expects the local id on the wire before it replies,
// and the id sent in the reply must end up in RemoteId.
[Fact]
public async Task Gateway_outbound_handshake_sets_remote_id()
{
    // Loopback socket pair: "peer" plays the remote side, "accepted" backs the connection under test.
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL-SERVER", deadline.Token);

    var announced = await ReadLineAsync(peer, deadline.Token);
    announced.ShouldBe("GATEWAY LOCAL-SERVER");

    await WriteLineAsync(peer, "GATEWAY REMOTE-SERVER", deadline.Token);
    await pendingHandshake;

    connection.RemoteId.ShouldBe("REMOTE-SERVER");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// Inbound handshake: the test sends the peer's greeting, expects the local id back,
// and the id from the greeting must end up in RemoteId.
[Fact]
public async Task Gateway_inbound_handshake_sets_remote_id()
{
    // Loopback socket pair: "peer" plays the remote side, "accepted" backs the connection under test.
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformInboundHandshakeAsync("LOCAL-SERVER", deadline.Token);

    await WriteLineAsync(peer, "GATEWAY REMOTE-CLIENT", deadline.Token);
    var reply = await ReadLineAsync(peer, deadline.Token);
    reply.ShouldBe("GATEWAY LOCAL-SERVER");

    await pendingHandshake;

    connection.RemoteId.ShouldBe("REMOTE-CLIENT");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// A greeting that does not start with the GATEWAY verb must fail the inbound handshake.
[Fact]
public async Task Gateway_handshake_rejects_invalid_protocol()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformInboundHandshakeAsync("LOCAL", deadline.Token);
    await WriteLineAsync(peer, "INVALID protocol", deadline.Token);

    // The failure surfaces when the handshake task is awaited.
    await Should.ThrowAsync<InvalidOperationException>(async () => await pendingHandshake);
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// A GATEWAY greeting that carries no id after the verb must fail the inbound handshake.
[Fact]
public async Task Gateway_handshake_rejects_empty_id()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformInboundHandshakeAsync("LOCAL", deadline.Token);

    // Note the trailing space: the verb is present but the id is empty.
    await WriteLineAsync(peer, "GATEWAY ", deadline.Token);

    await Should.ThrowAsync<InvalidOperationException>(async () => await pendingHandshake);
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// Both servers started by the fixture must report a positive gateway count.
[Fact]
public async Task Two_clusters_establish_gateway_connections()
{
    await using var clusters = await GatewayConnectionFixture.StartAsync();

    var localLinks = clusters.Local.Stats.Gateways;
    localLinks.ShouldBeGreaterThan(0);

    var remoteLinks = clusters.Remote.Stats.Gateways;
    remoteLinks.ShouldBeGreaterThan(0);
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// The gateway counter exposed through Stats must be at least one on each side of the link.
[Fact]
public async Task Gateway_connection_count_tracked_in_stats()
{
    await using var clusters = await GatewayConnectionFixture.StartAsync();

    var localLinks = clusters.Local.Stats.Gateways;
    localLinks.ShouldBeGreaterThanOrEqualTo(1);

    var remoteLinks = clusters.Remote.Stats.Gateways;
    remoteLinks.ShouldBeGreaterThanOrEqualTo(1);
}
|
|
|
|
// Go: TestGatewayDoesntSendBackToItself server/gateway_test.go:2150
// A message published on LOCAL must reach a REMOTE subscriber exactly once.
[Fact]
public async Task Gateway_does_not_create_echo_cycle()
{
    const string subject = "cycle.test";

    await using var clusters = await GatewayConnectionFixture.StartAsync();

    await using var remoteClient = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{clusters.Remote.Port}",
    });
    await remoteClient.ConnectAsync();

    await using var localClient = new NatsConnection(new NatsOpts
    {
        Url = $"nats://127.0.0.1:{clusters.Local.Port}",
    });
    await localClient.ConnectAsync();

    // Subscribe on REMOTE and wait until LOCAL reports interest in the subject before publishing.
    await using var subscription = await remoteClient.SubscribeCoreAsync<string>(subject);
    await remoteClient.PingAsync();
    await clusters.WaitForRemoteInterestOnLocalAsync(subject);

    await localClient.PublishAsync(subject, "ping");
    await localClient.PingAsync();

    using var firstDeadline = new CancellationTokenSource(TimeSpan.FromSeconds(3));
    var delivered = await subscription.Msgs.ReadAsync(firstDeadline.Token);
    delivered.Data.ShouldBe("ping");

    // Give any echoed copy time to arrive, then require that a further read times out.
    await Task.Delay(200);
    using var quietWindow = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
    await Should.ThrowAsync<OperationCanceledException>(async () =>
        await subscription.Msgs.ReadAsync(quietWindow.Token));
}
|
|
|
|
// Go: TestGatewaySolicitShutdown server/gateway_test.go:784
// A manager configured with a remote it cannot reach must still dispose promptly and without error.
[Fact]
public async Task Gateway_manager_shutdown_does_not_hang()
{
    var options = new GatewayOptions
    {
        Name = "TEST",
        Host = "127.0.0.1",
        Port = 0,
        Remotes = ["127.0.0.1:19999"], // Assumed to be a port with no listener, so the dial keeps failing
    };
    var manager = new GatewayManager(
        options,
        new ServerStats(),
        "S1",
        _ => { },
        _ => { },
        NullLogger<GatewayManager>.Instance);

    await manager.StartAsync(CancellationToken.None);

    // Dispose should complete promptly even with pending reconnect attempts
    var disposeTask = manager.DisposeAsync().AsTask();
    var completed = await Task.WhenAny(disposeTask, Task.Delay(TimeSpan.FromSeconds(5)));
    completed.ShouldBe(disposeTask, "DisposeAsync should complete within timeout");

    // Task.WhenAny also returns a faulted task, so await it to make a failing dispose fail the test
    // instead of leaving its exception unobserved.
    await disposeTask;
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399 (reconnection part)
// Starts LOCAL, links REMOTE to it, shuts REMOTE down, then checks that a fresh REMOTE2 can link to LOCAL.
[Fact]
public async Task Gateway_reconnects_after_remote_shutdown()
{
    using var localCts = new CancellationTokenSource();
    using var remoteCts = new CancellationTokenSource();
    using var remote2Cts = new CancellationTokenSource();

    // Servers that still need disposing. The finally block cleans up whatever is left here,
    // so a failed assertion does not leave servers running.
    var running = new List<NatsServer>();

    try
    {
        var local = new NatsServer(
            new NatsOptions
            {
                Host = "127.0.0.1",
                Port = 0,
                Gateway = new GatewayOptions
                {
                    Name = "LOCAL",
                    Host = "127.0.0.1",
                    Port = 0,
                },
            },
            NullLoggerFactory.Instance);
        running.Add(local);
        _ = local.StartAsync(localCts.Token);
        await local.WaitForReadyAsync();

        // Options for a server whose gateway solicits a connection to LOCAL's gateway listener.
        NatsOptions OptionsSolicitingLocal(string gatewayName) => new()
        {
            Host = "127.0.0.1",
            Port = 0,
            Gateway = new GatewayOptions
            {
                Name = gatewayName,
                Host = "127.0.0.1",
                Port = 0,
                Remotes = [local.GatewayListen!],
            },
        };

        // Start remote
        var remote = new NatsServer(OptionsSolicitingLocal("REMOTE"), NullLoggerFactory.Instance);
        running.Add(remote);
        _ = remote.StartAsync(remoteCts.Token);
        await remote.WaitForReadyAsync();

        await WaitUntilAsync(() => local.Stats.Gateways != 0 && remote.Stats.Gateways != 0);
        local.Stats.Gateways.ShouldBeGreaterThan(0);
        remote.Stats.Gateways.ShouldBeGreaterThan(0);

        // Shutdown remote
        await remoteCts.CancelAsync();
        remote.Dispose();
        running.Remove(remote);

        // Wait for gateway count to drop, and fail if it never does: otherwise the
        // final assertions could pass on the stale link rather than on a new one.
        await WaitUntilAsync(() => local.Stats.Gateways <= 0);
        (local.Stats.Gateways > 0).ShouldBeFalse("LOCAL should drop its gateway link after REMOTE shuts down");

        // Restart remote connecting to local
        var remote2 = new NatsServer(OptionsSolicitingLocal("REMOTE2"), NullLoggerFactory.Instance);
        running.Add(remote2);
        _ = remote2.StartAsync(remote2Cts.Token);
        await remote2.WaitForReadyAsync();

        // Wait for new gateway link
        await WaitUntilAsync(() => local.Stats.Gateways != 0 && remote2.Stats.Gateways != 0);
        local.Stats.Gateways.ShouldBeGreaterThan(0);
        remote2.Stats.Gateways.ShouldBeGreaterThan(0);
    }
    finally
    {
        await localCts.CancelAsync();
        await remoteCts.CancelAsync();
        await remote2Cts.CancelAsync();
        foreach (var server in running)
        {
            server.Dispose();
        }
    }

    // Polls every 50 ms until the condition holds or 5 s elapse; returns silently on timeout
    // so the caller's assertion produces the failure message.
    static async Task WaitUntilAsync(Func<bool> condition)
    {
        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        while (!timeout.IsCancellationRequested && !condition())
        {
            // The empty continuation swallows the cancellation of the final delay.
            await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
        }
    }
}
|
|
|
|
// Go: TestGatewayNoReconnectOnClose server/gateway_test.go:1735
// After the handshake, a GMSG frame written by the peer must be surfaced through MessageReceived.
[Fact]
public async Task Connection_read_loop_starts_and_processes_messages()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    // Perform handshake
    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    var firstMessage = new TaskCompletionSource<GatewayMessage>();
    connection.MessageReceived = incoming =>
    {
        firstMessage.TrySetResult(incoming);
        return Task.CompletedTask;
    };
    connection.StartLoop(deadline.Token);

    // Send a GMSG frame as three writes: header line, payload, trailing CRLF.
    var body = "hello-gateway"u8.ToArray();
    var header = Encoding.ASCII.GetBytes($"GMSG $G test.subject - {body.Length}\r\n");
    await peer.SendAsync(header, SocketFlags.None, deadline.Token);
    await peer.SendAsync(body, SocketFlags.None, deadline.Token);
    await peer.SendAsync("\r\n"u8.ToArray(), SocketFlags.None, deadline.Token);

    var observed = await firstMessage.Task.WaitAsync(deadline.Token);
    observed.Subject.ShouldBe("test.subject");
    observed.ReplyTo.ShouldBeNull();
    Encoding.UTF8.GetString(observed.Payload.Span).ShouldBe("hello-gateway");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// A GMSG frame that names a reply subject must expose it through ReplyTo, along with the account.
[Fact]
public async Task Connection_read_loop_processes_gmsg_with_reply()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    var firstMessage = new TaskCompletionSource<GatewayMessage>();
    connection.MessageReceived = incoming =>
    {
        firstMessage.TrySetResult(incoming);
        return Task.CompletedTask;
    };
    connection.StartLoop(deadline.Token);

    // Frame is written as header line, payload, trailing CRLF.
    var body = "data"u8.ToArray();
    var header = Encoding.ASCII.GetBytes($"GMSG $G test.subject _INBOX.abc {body.Length}\r\n");
    await peer.SendAsync(header, SocketFlags.None, deadline.Token);
    await peer.SendAsync(body, SocketFlags.None, deadline.Token);
    await peer.SendAsync("\r\n"u8.ToArray(), SocketFlags.None, deadline.Token);

    var observed = await firstMessage.Task.WaitAsync(deadline.Token);
    observed.Subject.ShouldBe("test.subject");
    observed.ReplyTo.ShouldBe("_INBOX.abc");
    observed.Account.ShouldBe("$G");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// The account token of a GMSG frame must be reported on the received message.
[Fact]
public async Task Connection_read_loop_processes_account_scoped_gmsg()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    var firstMessage = new TaskCompletionSource<GatewayMessage>();
    connection.MessageReceived = incoming =>
    {
        firstMessage.TrySetResult(incoming);
        return Task.CompletedTask;
    };
    connection.StartLoop(deadline.Token);

    // Frame is written as header line, payload, trailing CRLF.
    var body = "msg"u8.ToArray();
    var header = Encoding.ASCII.GetBytes($"GMSG ACCT test.subject - {body.Length}\r\n");
    await peer.SendAsync(header, SocketFlags.None, deadline.Token);
    await peer.SendAsync(body, SocketFlags.None, deadline.Token);
    await peer.SendAsync("\r\n"u8.ToArray(), SocketFlags.None, deadline.Token);

    var observed = await firstMessage.Task.WaitAsync(deadline.Token);
    observed.Account.ShouldBe("ACCT");
    observed.Subject.ShouldBe("test.subject");
}
|
|
|
|
// Go: TestGatewayDontSendSubInterest server/gateway_test.go:1755
// An "A+" line from the peer must be reported as a non-removal remote subscription.
[Fact]
public async Task Connection_read_loop_processes_aplus_interest()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    var firstInterest = new TaskCompletionSource<RemoteSubscription>();
    connection.RemoteSubscriptionReceived = update =>
    {
        firstInterest.TrySetResult(update);
        return Task.CompletedTask;
    };
    connection.StartLoop(deadline.Token);

    await WriteLineAsync(peer, "A+ MYACC orders.>", deadline.Token);

    var interest = await firstInterest.Task.WaitAsync(deadline.Token);
    interest.Subject.ShouldBe("orders.>");
    interest.Account.ShouldBe("MYACC");
    interest.IsRemoval.ShouldBeFalse();
}
|
|
|
|
// Go: TestGatewayAccountUnsub server/gateway_test.go:1912
// An "A+" followed by an "A-" must be reported in that order, the second one flagged as a removal.
[Fact]
public async Task Connection_read_loop_processes_aminus_interest()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    // Collect updates in arrival order and signal once both have been seen.
    var observed = new List<RemoteSubscription>();
    var bothSeen = new TaskCompletionSource();
    connection.RemoteSubscriptionReceived = update =>
    {
        observed.Add(update);
        if (observed.Count >= 2)
        {
            bothSeen.TrySetResult();
        }

        return Task.CompletedTask;
    };
    connection.StartLoop(deadline.Token);

    await WriteLineAsync(peer, "A+ ACC foo.*", deadline.Token);
    await WriteLineAsync(peer, "A- ACC foo.*", deadline.Token);

    await bothSeen.Task.WaitAsync(deadline.Token);
    observed[0].IsRemoval.ShouldBeFalse();
    observed[1].IsRemoval.ShouldBeTrue();
}
|
|
|
|
// Go: TestGatewayQueueSub server/gateway_test.go:2265
// An "A+" line with a third token must report that token as the queue group.
[Fact]
public async Task Connection_read_loop_processes_aplus_with_queue()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    var firstInterest = new TaskCompletionSource<RemoteSubscription>();
    connection.RemoteSubscriptionReceived = update =>
    {
        firstInterest.TrySetResult(update);
        return Task.CompletedTask;
    };
    connection.StartLoop(deadline.Token);

    await WriteLineAsync(peer, "A+ $G foo.bar workers", deadline.Token);

    var interest = await firstInterest.Task.WaitAsync(deadline.Token);
    interest.Subject.ShouldBe("foo.bar");
    interest.Queue.ShouldBe("workers");
    interest.Account.ShouldBe("$G");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// SendMessageAsync must put a GMSG header with account, subject and reply on the wire, followed by the payload.
[Fact]
public async Task Send_message_writes_gmsg_protocol()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    var body = Encoding.UTF8.GetBytes("payload-data");
    await connection.SendMessageAsync("$G", "test.subject", "_INBOX.reply", body, deadline.Token);

    // Accumulate reads until the payload text shows up, the peer closes, or the 2 s read deadline cancels.
    var chunk = new byte[4096];
    var wire = new StringBuilder();
    using var readDeadline = new CancellationTokenSource(TimeSpan.FromSeconds(2));
    int count;
    while ((count = await peer.ReceiveAsync(chunk, SocketFlags.None, readDeadline.Token)) != 0)
    {
        wire.Append(Encoding.ASCII.GetString(chunk, 0, count));
        if (wire.ToString().Contains("payload-data", StringComparison.Ordinal))
        {
            break;
        }
    }

    var received = wire.ToString();
    received.ShouldContain("GMSG $G test.subject _INBOX.reply");
    received.ShouldContain("payload-data");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// SendAPlusAsync without a queue must write "A+ <account> <subject>" as a single line.
[Fact]
public async Task Send_aplus_writes_interest_protocol()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    await connection.SendAPlusAsync("$G", "orders.>", null, deadline.Token);

    var written = await ReadLineAsync(peer, deadline.Token);
    written.ShouldBe("A+ $G orders.>");
}
|
|
|
|
// Go: TestGatewayQueueSub server/gateway_test.go:2265
// SendAPlusAsync with a queue must append the queue name as a third token.
[Fact]
public async Task Send_aplus_with_queue_writes_interest_protocol()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    await connection.SendAPlusAsync("$G", "foo", "workers", deadline.Token);

    var written = await ReadLineAsync(peer, deadline.Token);
    written.ShouldBe("A+ $G foo workers");
}
|
|
|
|
// Go: TestGatewayAccountUnsub server/gateway_test.go:1912
// SendAMinusAsync without a queue must write "A- <account> <subject>" as a single line.
[Fact]
public async Task Send_aminus_writes_unsubscribe_interest_protocol()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    await connection.SendAMinusAsync("$G", "orders.>", null, deadline.Token);

    var written = await ReadLineAsync(peer, deadline.Token);
    written.ShouldBe("A- $G orders.>");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// With a null reply subject, the GMSG header must carry "-" in the reply position.
[Fact]
public async Task Send_message_with_no_reply_uses_dash()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    await using var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    await connection.SendMessageAsync("$G", "test.subject", null, new byte[] { 0x41 }, deadline.Token);

    // Accumulate reads until more than 20 characters including a CRLF have arrived,
    // the peer closes, or the 2 s read deadline cancels.
    var chunk = new byte[4096];
    var wire = new StringBuilder();
    using var readDeadline = new CancellationTokenSource(TimeSpan.FromSeconds(2));
    int count;
    while ((count = await peer.ReceiveAsync(chunk, SocketFlags.None, readDeadline.Token)) != 0)
    {
        wire.Append(Encoding.ASCII.GetString(chunk, 0, count));
        var soFar = wire.ToString();
        if (soFar.Length > 20 && soFar.Contains("\r\n", StringComparison.Ordinal))
        {
            break;
        }
    }

    wire.ToString().ShouldContain("GMSG $G test.subject - 1");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// An empty payload must be announced with a length of 0 in the GMSG header.
[Fact]
public async Task Send_message_with_empty_payload()
{
    using var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    var port = ((IPEndPoint)listener.LocalEndpoint).Port;

    using var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await clientSocket.ConnectAsync(IPAddress.Loopback, port);
    using var serverSocket = await listener.AcceptSocketAsync();

    await using var gw = new GatewayConnection(serverSocket);
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var handshake = gw.PerformOutboundHandshakeAsync("LOCAL", cts.Token);
    await ReadLineAsync(clientSocket, cts.Token);
    await WriteLineAsync(clientSocket, "GATEWAY REMOTE", cts.Token);
    await handshake;

    await gw.SendMessageAsync("$G", "test.empty", null, ReadOnlyMemory<byte>.Empty, cts.Token);

    var buf = new byte[4096];
    var total = new StringBuilder();
    using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
    while (true)
    {
        var n = await clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
        if (n == 0)
        {
            break;
        }

        total.Append(Encoding.ASCII.GetString(buf, 0, n));

        // TCP may deliver the header in pieces, so keep reading until the header line
        // is terminated. Stopping at the first "GMSG" could cut the line short of the
        // length field asserted below.
        if (total.ToString().Contains("\r\n", StringComparison.Ordinal))
        {
            break;
        }
    }

    total.ToString().ShouldContain("GMSG $G test.empty - 0");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// Disposing a connection whose read loop is running must complete without throwing.
[Fact]
public async Task Connection_dispose_cleans_up_gracefully()
{
    using var acceptor = new TcpListener(IPAddress.Loopback, 0);
    acceptor.Start();
    var endpoint = (IPEndPoint)acceptor.LocalEndpoint;

    using var peer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await peer.ConnectAsync(endpoint);
    using var accepted = await acceptor.AcceptSocketAsync();

    // Disposed explicitly below, so no "await using" here.
    var connection = new GatewayConnection(accepted);
    using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var pendingHandshake = connection.PerformOutboundHandshakeAsync("LOCAL", deadline.Token);
    await ReadLineAsync(peer, deadline.Token);
    await WriteLineAsync(peer, "GATEWAY REMOTE", deadline.Token);
    await pendingHandshake;

    connection.StartLoop(deadline.Token);
    await connection.DisposeAsync(); // Should not throw

    // The id negotiated during the handshake is still readable on the disposed connection.
    connection.RemoteId.ShouldBe("REMOTE");
}
|
|
|
|
// Go: TestGatewayBasic server/gateway_test.go:399
// Ten sends issued without awaiting in between must all show up on the wire.
[Fact]
public async Task Multiple_concurrent_sends_are_serialized()
{
    const int messageCount = 10;

    using var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    var port = ((IPEndPoint)listener.LocalEndpoint).Port;

    using var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    await clientSocket.ConnectAsync(IPAddress.Loopback, port);
    using var serverSocket = await listener.AcceptSocketAsync();

    await using var gw = new GatewayConnection(serverSocket);
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var handshake = gw.PerformOutboundHandshakeAsync("LOCAL", cts.Token);
    await ReadLineAsync(clientSocket, cts.Token);
    await WriteLineAsync(clientSocket, "GATEWAY REMOTE", cts.Token);
    await handshake;

    // Fire off concurrent sends
    var tasks = new List<Task>(messageCount);
    for (var i = 0; i < messageCount; i++)
    {
        tasks.Add(gw.SendMessageAsync("$G", $"sub.{i}", null, Encoding.UTF8.GetBytes($"msg-{i}"), cts.Token));
    }

    await Task.WhenAll(tasks);

    // Drain the socket until every expected subject has been seen. The read deadline
    // (or the peer closing) still ends the loop when something is missing, so the
    // pass/fail outcome is unchanged, but a healthy run no longer waits the full 2 s.
    var buf = new byte[8192];
    var total = new StringBuilder();
    using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
    while (!ContainsEverySubject(total.ToString()))
    {
        try
        {
            var n = await clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
            if (n == 0)
            {
                break;
            }

            total.Append(Encoding.ASCII.GetString(buf, 0, n));
        }
        catch (OperationCanceledException)
        {
            break;
        }
    }

    // All 10 messages should be present
    var received = total.ToString();
    for (var i = 0; i < messageCount; i++)
    {
        received.ShouldContain($"sub.{i}");
    }

    // True once the text contains the subject of every message that was sent.
    bool ContainsEverySubject(string text)
    {
        for (var i = 0; i < messageCount; i++)
        {
            if (!text.Contains($"sub.{i}", StringComparison.Ordinal))
            {
                return false;
            }
        }

        return true;
    }
}
|
|
|
|
// ── Helpers ─────────────────────────────────────────────────────────
|
|
|
|
// Reads one line from the socket a byte at a time. Stops at '\n' or when the peer closes,
// drops every '\r', and decodes the collected bytes as ASCII.
private static async Task<string> ReadLineAsync(Socket socket, CancellationToken ct)
{
    var collected = new List<byte>(64);
    var cell = new byte[1];

    // A zero-length receive means the peer closed; whatever was collected so far is returned.
    while (await socket.ReceiveAsync(cell, SocketFlags.None, ct) != 0)
    {
        var current = cell[0];
        if (current == (byte)'\n')
        {
            break;
        }

        if (current == (byte)'\r')
        {
            continue;
        }

        collected.Add(current);
    }

    return Encoding.ASCII.GetString(collected.ToArray());
}
|
|
|
|
// Sends the line followed by CRLF, ASCII-encoded, in a single socket write.
private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct)
{
    var frame = Encoding.ASCII.GetBytes(line + "\r\n");
    return socket.SendAsync(frame, SocketFlags.None, ct).AsTask();
}
|
|
}
|
|
|
|
/// <summary>
/// Shared fixture for gateway connection tests that need two running server clusters.
/// LOCAL only listens for gateway connections; REMOTE lists LOCAL's gateway listener as a remote.
/// </summary>
internal sealed class GatewayConnectionFixture : IAsyncDisposable
{
    private readonly CancellationTokenSource _localCts;
    private readonly CancellationTokenSource _remoteCts;

    private GatewayConnectionFixture(NatsServer local, NatsServer remote, CancellationTokenSource localCts, CancellationTokenSource remoteCts)
    {
        Local = local;
        Remote = remote;
        _localCts = localCts;
        _remoteCts = remoteCts;
    }

    /// <summary>Server started first; its gateway has no configured remotes.</summary>
    public NatsServer Local { get; }

    /// <summary>Server started second; its gateway is configured with <see cref="Local"/>'s gateway listener as a remote.</summary>
    public NatsServer Remote { get; }

    /// <summary>
    /// Starts both servers and waits up to five seconds for each to report a gateway link.
    /// </summary>
    /// <returns>A fixture that owns both servers; dispose it to stop them.</returns>
    /// <exception cref="TimeoutException">Either server still reports zero gateways after the wait.</exception>
    public static async Task<GatewayConnectionFixture> StartAsync()
    {
        var localCts = new CancellationTokenSource();
        var remoteCts = new CancellationTokenSource();
        NatsServer? local = null;
        NatsServer? remote = null;

        try
        {
            var localOptions = new NatsOptions
            {
                Host = "127.0.0.1",
                Port = 0,
                Gateway = new GatewayOptions
                {
                    Name = "LOCAL",
                    Host = "127.0.0.1",
                    Port = 0,
                },
            };

            local = new NatsServer(localOptions, NullLoggerFactory.Instance);
            _ = local.StartAsync(localCts.Token);
            await local.WaitForReadyAsync();

            var remoteOptions = new NatsOptions
            {
                Host = "127.0.0.1",
                Port = 0,
                Gateway = new GatewayOptions
                {
                    Name = "REMOTE",
                    Host = "127.0.0.1",
                    Port = 0,
                    Remotes = [local.GatewayListen!],
                },
            };

            remote = new NatsServer(remoteOptions, NullLoggerFactory.Instance);
            _ = remote.StartAsync(remoteCts.Token);
            await remote.WaitForReadyAsync();

            using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            while (!timeout.IsCancellationRequested && (local.Stats.Gateways == 0 || remote.Stats.Gateways == 0))
            {
                // The empty continuation swallows the cancellation of the final delay.
                await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
            }

            // Fail here with a clear message instead of handing tests a fixture that never linked.
            if (local.Stats.Gateways == 0 || remote.Stats.Gateways == 0)
            {
                throw new TimeoutException("Timed out waiting for the gateway link between LOCAL and REMOTE.");
            }

            return new GatewayConnectionFixture(local, remote, localCts, remoteCts);
        }
        catch
        {
            // No fixture is returned on this path, so nothing else will stop what was already started.
            await localCts.CancelAsync();
            await remoteCts.CancelAsync();
            local?.Dispose();
            remote?.Dispose();
            localCts.Dispose();
            remoteCts.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Polls <see cref="Local"/> every 50 ms until it reports remote interest in <paramref name="subject"/>.
    /// </summary>
    /// <param name="subject">Subject passed to <c>HasRemoteInterest</c>.</param>
    /// <exception cref="TimeoutException">No interest was reported within five seconds.</exception>
    public async Task WaitForRemoteInterestOnLocalAsync(string subject)
    {
        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        while (!timeout.IsCancellationRequested)
        {
            if (Local.HasRemoteInterest(subject))
            {
                return;
            }

            // The empty continuation swallows the cancellation of the final delay.
            await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
        }

        throw new TimeoutException($"Timed out waiting for remote interest on '{subject}'.");
    }

    /// <summary>Cancels both server tokens, then disposes the servers and their token sources.</summary>
    public async ValueTask DisposeAsync()
    {
        await _localCts.CancelAsync();
        await _remoteCts.CancelAsync();
        Local.Dispose();
        Remote.Dispose();
        _localCts.Dispose();
        _remoteCts.Dispose();
    }
}
|