feat: add ephemeral port (port=0) support
This commit is contained in:
@@ -38,6 +38,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
public string ServerId => _serverInfo.ServerId;
|
public string ServerId => _serverInfo.ServerId;
|
||||||
public string ServerName => _serverInfo.ServerName;
|
public string ServerName => _serverInfo.ServerName;
|
||||||
public int ClientCount => _clients.Count;
|
public int ClientCount => _clients.Count;
|
||||||
|
public int Port => _options.Port;
|
||||||
public IEnumerable<NatsClient> GetClients() => _clients.Values;
|
public IEnumerable<NatsClient> GetClients() => _clients.Values;
|
||||||
|
|
||||||
public Task WaitForReadyAsync() => _listeningStarted.Task;
|
public Task WaitForReadyAsync() => _listeningStarted.Task;
|
||||||
@@ -82,6 +83,15 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
_options.Port));
|
_options.Port));
|
||||||
Interlocked.Exchange(ref _startTimeTicks, DateTime.UtcNow.Ticks);
|
Interlocked.Exchange(ref _startTimeTicks, DateTime.UtcNow.Ticks);
|
||||||
_listener.Listen(128);
|
_listener.Listen(128);
|
||||||
|
|
||||||
|
// Resolve ephemeral port if port=0
|
||||||
|
if (_options.Port == 0)
|
||||||
|
{
|
||||||
|
var actualPort = ((IPEndPoint)_listener.LocalEndPoint!).Port;
|
||||||
|
_options.Port = actualPort;
|
||||||
|
_serverInfo.Port = actualPort;
|
||||||
|
}
|
||||||
|
|
||||||
_listeningStarted.TrySetResult();
|
_listeningStarted.TrySetResult();
|
||||||
|
|
||||||
_logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port);
|
_logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port);
|
||||||
|
|||||||
@@ -213,6 +213,39 @@ public class ServerTests : IAsyncLifetime
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class EphemeralPortTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Server_resolves_ephemeral_port()
|
||||||
|
{
|
||||||
|
using var cts = new CancellationTokenSource();
|
||||||
|
var server = new NatsServer(new NatsOptions { Port = 0 }, NullLoggerFactory.Instance);
|
||||||
|
|
||||||
|
_ = server.StartAsync(cts.Token);
|
||||||
|
await server.WaitForReadyAsync();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Port should have been resolved to a real port
|
||||||
|
server.Port.ShouldBeGreaterThan(0);
|
||||||
|
|
||||||
|
// Connect a raw socket to prove the port actually works
|
||||||
|
using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
await client.ConnectAsync(IPAddress.Loopback, server.Port);
|
||||||
|
|
||||||
|
var buf = new byte[4096];
|
||||||
|
var n = await client.ReceiveAsync(buf, SocketFlags.None);
|
||||||
|
var response = Encoding.ASCII.GetString(buf, 0, n);
|
||||||
|
response.ShouldStartWith("INFO ");
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
await cts.CancelAsync();
|
||||||
|
server.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public class MaxConnectionsTests : IAsyncLifetime
|
public class MaxConnectionsTests : IAsyncLifetime
|
||||||
{
|
{
|
||||||
private readonly NatsServer _server;
|
private readonly NatsServer _server;
|
||||||
@@ -423,3 +456,72 @@ public class PingKeepaliveTests : IAsyncLifetime
|
|||||||
client.Dispose();
|
client.Dispose();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class CloseReasonTests : IAsyncLifetime
|
||||||
|
{
|
||||||
|
private readonly NatsServer _server;
|
||||||
|
private readonly int _port;
|
||||||
|
private readonly CancellationTokenSource _cts = new();
|
||||||
|
|
||||||
|
public CloseReasonTests()
|
||||||
|
{
|
||||||
|
_port = GetFreePort();
|
||||||
|
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task InitializeAsync()
|
||||||
|
{
|
||||||
|
_ = _server.StartAsync(_cts.Token);
|
||||||
|
await _server.WaitForReadyAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task DisposeAsync()
|
||||||
|
{
|
||||||
|
await _cts.CancelAsync();
|
||||||
|
_server.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int GetFreePort()
|
||||||
|
{
|
||||||
|
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
||||||
|
return ((IPEndPoint)sock.LocalEndPoint!).Port;
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Client_close_reason_set_on_normal_disconnect()
|
||||||
|
{
|
||||||
|
// Connect a raw TCP client
|
||||||
|
using var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||||
|
await client.ConnectAsync(IPAddress.Loopback, _port);
|
||||||
|
|
||||||
|
// Read INFO
|
||||||
|
var buf = new byte[4096];
|
||||||
|
await client.ReceiveAsync(buf, SocketFlags.None);
|
||||||
|
|
||||||
|
// Send CONNECT + PING, wait for PONG
|
||||||
|
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n"));
|
||||||
|
var sb = new StringBuilder();
|
||||||
|
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||||
|
while (!sb.ToString().Contains("PONG"))
|
||||||
|
{
|
||||||
|
var n = await client.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
|
||||||
|
if (n == 0) break;
|
||||||
|
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
|
||||||
|
}
|
||||||
|
sb.ToString().ShouldContain("PONG");
|
||||||
|
|
||||||
|
// Get the NatsClient from the server
|
||||||
|
var natsClient = _server.GetClients().First();
|
||||||
|
|
||||||
|
// Close the TCP socket (normal client disconnect)
|
||||||
|
client.Shutdown(SocketShutdown.Both);
|
||||||
|
client.Close();
|
||||||
|
|
||||||
|
// Wait for the server to detect the disconnect
|
||||||
|
await Task.Delay(500);
|
||||||
|
|
||||||
|
// The close reason should be ClientClosed (normal disconnect falls through to finally)
|
||||||
|
natsClient.CloseReason.ShouldBe(ClosedState.ClientClosed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user