Port Go NATS server test behaviors to .NET:

- Client pub/sub (5 tests): simple, no-echo, reply, queue distribution, empty body
- Client UNSUB (4 tests): unsub, auto-unsub max, unsub after auto, disconnect cleanup
- Client headers (3 tests): HPUB/HMSG, server info headers, no-responders 503
- Client lifecycle (3 tests): connect proto, max subscriptions, auth timeout
- Client slow consumer (1 test): pending limit detection and disconnect
- Parser edge cases (3 tests + 2 bug fixes): PUB arg variations, malformed protocol, max control line
- SubList concurrency (13 tests): race on remove/insert/match, large lists, invalid subjects, wildcards
- Server config (4 tests): ephemeral port, server name, name defaults, lame duck
- Route config (3 tests): cluster formation, cross-cluster messaging, reconnect
- Gateway basic (2 tests): cross-cluster forwarding, no echo to origin
- Leaf node basic (2 tests): hub-to-spoke and spoke-to-hub forwarding
- Account import/export (2 tests): stream export/import delivery, isolation

Also fixes NatsParser.ParseSub/ParseUnsub to throw ProtocolViolationException for short command lines instead of ArgumentOutOfRangeException.

Full suite: 933 passed, 0 failed (up from 869).
186 lines
6.6 KiB
C#
186 lines
6.6 KiB
C#
using Microsoft.Extensions.Logging.Abstractions;
|
|
using NATS.Client.Core;
|
|
using NATS.Server.Configuration;
|
|
|
|
namespace NATS.Server.Tests.Gateways;
|
|
|
|
/// <summary>
/// Gateway smoke tests ported from TestGatewayBasic and
/// TestGatewayDoesntSendBackToItself in
/// golang/nats-server/server/gateway_test.go.
/// </summary>
public class GatewayBasicTests
{
    /// <summary>
    /// A message published by a client of LOCAL must reach a subscriber that
    /// is connected to REMOTE.
    /// </summary>
    [Fact]
    public async Task Gateway_forwards_messages_between_clusters()
    {
        // Reference: TestGatewayBasic (gateway_test.go:399).
        // Two gateway servers are started; the subscriber lives on REMOTE and
        // the publisher on LOCAL, so delivery can only happen via the gateway.
        const string subject = "gw.test";
        const string payload = "hello-from-local";

        await using var clusters = await TwoClusterFixture.StartAsync();

        await using var remoteClient = CreateClient(clusters.Remote);
        await remoteClient.ConnectAsync();

        await using var localClient = CreateClient(clusters.Local);
        await localClient.ConnectAsync();

        await using var subscription = await remoteClient.SubscribeCoreAsync<string>(subject);
        await remoteClient.PingAsync();

        // Do not publish until LOCAL reports interest coming from REMOTE.
        await clusters.WaitForRemoteInterestOnLocalAsync(subject);

        await localClient.PublishAsync(subject, payload);

        using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var received = await subscription.Msgs.ReadAsync(deadline.Token);
        received.Data.ShouldBe(payload);
    }

    /// <summary>
    /// A message published on LOCAL is delivered once to a LOCAL subscriber and
    /// once to a REMOTE subscriber, and nothing further arrives afterwards.
    /// </summary>
    [Fact]
    public async Task Gateway_does_not_echo_back_to_origin()
    {
        // Reference: TestGatewayDoesntSendBackToItself (gateway_test.go:2150).
        // Both sides subscribe, LOCAL publishes. Exactly two deliveries are
        // expected (one per subscriber); anything beyond that means a cycle.
        const string subject = "foo";
        const string payload = "cycle";

        await using var clusters = await TwoClusterFixture.StartAsync();

        await using var remoteClient = CreateClient(clusters.Remote);
        await remoteClient.ConnectAsync();

        await using var localClient = CreateClient(clusters.Local);
        await localClient.ConnectAsync();

        await using var remoteSubscription = await remoteClient.SubscribeCoreAsync<string>(subject);
        await remoteClient.PingAsync();

        await using var localSubscription = await localClient.SubscribeCoreAsync<string>(subject);
        await localClient.PingAsync();

        // Do not publish until LOCAL reports interest coming from REMOTE.
        await clusters.WaitForRemoteInterestOnLocalAsync(subject);

        await localClient.PublishAsync(subject, payload);
        await localClient.PingAsync();

        // Both first reads share a single five second budget.
        using var deliveryDeadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        var fromLocal = await localSubscription.Msgs.ReadAsync(deliveryDeadline.Token);
        fromLocal.Data.ShouldBe(payload);

        var fromRemote = await remoteSubscription.Msgs.ReadAsync(deliveryDeadline.Token);
        fromRemote.Data.ShouldBe(payload);

        // Give a potential echo/cycle some time to show up.
        await Task.Delay(TimeSpan.FromMilliseconds(200));

        // Further reads must be cancelled by their timeout: no extra messages.
        using var localQuietWindow = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
        await Should.ThrowAsync<OperationCanceledException>(async () =>
            await localSubscription.Msgs.ReadAsync(localQuietWindow.Token));

        using var remoteQuietWindow = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
        await Should.ThrowAsync<OperationCanceledException>(async () =>
            await remoteSubscription.Msgs.ReadAsync(remoteQuietWindow.Token));
    }

    /// <summary>
    /// Builds (but does not connect) a client pointed at the loopback client
    /// port of <paramref name="server"/>.
    /// </summary>
    private static NatsConnection CreateClient(NatsServer server)
    {
        var options = new NatsOpts
        {
            Url = $"nats://127.0.0.1:{server.Port}",
        };

        return new NatsConnection(options);
    }
}
|
|
|
|
/// <summary>
/// Test fixture that runs two in-process <see cref="NatsServer"/> instances on
/// 127.0.0.1: one with a gateway named "LOCAL" and one with a gateway named
/// "REMOTE" whose remotes list contains LOCAL's gateway listen address.
/// All client and gateway ports are configured as 0.
/// </summary>
internal sealed class TwoClusterFixture : IAsyncDisposable
{
    /// <summary>Upper bound for each polling wait performed by the fixture.</summary>
    private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Pause between two evaluations of a polled condition.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(50);

    private readonly CancellationTokenSource _localCts;
    private readonly CancellationTokenSource _remoteCts;

    private TwoClusterFixture(NatsServer local, NatsServer remote, CancellationTokenSource localCts, CancellationTokenSource remoteCts)
    {
        Local = local;
        Remote = remote;
        _localCts = localCts;
        _remoteCts = remoteCts;
    }

    /// <summary>The server configured with gateway name "LOCAL".</summary>
    public NatsServer Local { get; }

    /// <summary>The server configured with gateway name "REMOTE".</summary>
    public NatsServer Remote { get; }

    /// <summary>
    /// Starts LOCAL, then REMOTE (pointing at LOCAL's gateway listener), and
    /// waits up to five seconds for both servers to report a gateway connection.
    /// </summary>
    /// <returns>A fixture owning both running servers.</returns>
    /// <remarks>
    /// If any startup step throws, every server started so far is stopped and
    /// disposed before the exception propagates, so a failed start leaks nothing.
    /// </remarks>
    public static async Task<TwoClusterFixture> StartAsync()
    {
        var localOptions = new NatsOptions
        {
            Host = "127.0.0.1",
            Port = 0,
            Gateway = new GatewayOptions
            {
                Name = "LOCAL",
                Host = "127.0.0.1",
                Port = 0,
            },
        };

        var (local, localCts) = await LaunchAsync(localOptions);

        NatsServer remote;
        CancellationTokenSource remoteCts;
        try
        {
            var remoteOptions = new NatsOptions
            {
                Host = "127.0.0.1",
                Port = 0,
                Gateway = new GatewayOptions
                {
                    Name = "REMOTE",
                    Host = "127.0.0.1",
                    Port = 0,
                    Remotes = [local.GatewayListen!],
                },
            };

            (remote, remoteCts) = await LaunchAsync(remoteOptions);
        }
        catch
        {
            // REMOTE never came up: do not leave LOCAL running behind us.
            await ShutdownAsync(local, localCts);
            throw;
        }

        var fixture = new TwoClusterFixture(local, remote, localCts, remoteCts);
        try
        {
            // Best effort on purpose: a timeout here is not an error. Tests
            // follow up with WaitForRemoteInterestOnLocalAsync, which does
            // throw when the gateway never becomes usable.
            await PollUntilAsync(
                () => local.Stats.Gateways != 0 && remote.Stats.Gateways != 0,
                WaitTimeout);

            return fixture;
        }
        catch
        {
            await fixture.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Polls LOCAL until it reports remote interest in <paramref name="subject"/>.
    /// </summary>
    /// <param name="subject">Subject to query via <c>Local.HasRemoteInterest</c>.</param>
    /// <exception cref="TimeoutException">
    /// No remote interest was reported within five seconds.
    /// </exception>
    public async Task WaitForRemoteInterestOnLocalAsync(string subject)
    {
        if (await PollUntilAsync(() => Local.HasRemoteInterest(subject), WaitTimeout))
        {
            return;
        }

        throw new TimeoutException($"Timed out waiting for remote interest on subject '{subject}'.");
    }

    /// <summary>
    /// Cancels both server tokens, then disposes both servers and both token
    /// sources. Disposal continues even if an earlier step throws.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            await _localCts.CancelAsync();
            await _remoteCts.CancelAsync();
        }
        finally
        {
            try
            {
                Local.Dispose();
            }
            finally
            {
                try
                {
                    Remote.Dispose();
                }
                finally
                {
                    _localCts.Dispose();
                    _remoteCts.Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Creates a server for <paramref name="options"/>, kicks off its start
    /// task without awaiting it, and awaits <c>WaitForReadyAsync</c>.
    /// The server is torn down again if that fails.
    /// </summary>
    private static async Task<(NatsServer Server, CancellationTokenSource Cts)> LaunchAsync(NatsOptions options)
    {
        var server = new NatsServer(options, NullLoggerFactory.Instance);
        var cts = new CancellationTokenSource();
        try
        {
            // The start task is intentionally not awaited here.
            // NOTE(review): presumably it only completes at shutdown - confirm.
            _ = server.StartAsync(cts.Token);
            await server.WaitForReadyAsync();
            return (server, cts);
        }
        catch
        {
            await ShutdownAsync(server, cts);
            throw;
        }
    }

    /// <summary>Cancels one server's token, then disposes the server and the token source.</summary>
    private static async Task ShutdownAsync(NatsServer server, CancellationTokenSource cts)
    {
        try
        {
            await cts.CancelAsync();
        }
        finally
        {
            server.Dispose();
            cts.Dispose();
        }
    }

    /// <summary>
    /// Evaluates <paramref name="condition"/> every 50 ms until it returns
    /// <c>true</c> or <paramref name="timeout"/> elapses.
    /// </summary>
    /// <returns><c>true</c> if the condition was met; <c>false</c> on timeout.</returns>
    private static async Task<bool> PollUntilAsync(Func<bool> condition, TimeSpan timeout)
    {
        using var deadline = new CancellationTokenSource(timeout);
        while (!deadline.IsCancellationRequested)
        {
            if (condition())
            {
                return true;
            }

            try
            {
                await Task.Delay(PollInterval, deadline.Token);
            }
            catch (OperationCanceledException)
            {
                // Deadline hit mid-delay; report the timeout to the caller.
                break;
            }
        }

        return false;
    }
}
|