diff --git a/.slopwatch/baseline.json b/.slopwatch/baseline.json new file mode 100644 index 0000000..913c18b --- /dev/null +++ b/.slopwatch/baseline.json @@ -0,0 +1,7 @@ +{ + "version": 1, + "createdAt": "2026-02-23T02:13:34.57845+00:00", + "updatedAt": "2026-02-23T02:13:34.57845+00:00", + "description": "Initial baseline created by 'slopwatch init' on 2026-02-23 02:13:34 UTC", + "entries": [] +} \ No newline at end of file diff --git a/src/NATS.Server.Host/Program.cs b/src/NATS.Server.Host/Program.cs index b8ffacd..4c30e50 100644 --- a/src/NATS.Server.Host/Program.cs +++ b/src/NATS.Server.Host/Program.cs @@ -40,7 +40,10 @@ try { await server.StartAsync(cts.Token); } -catch (OperationCanceledException) { } +catch (OperationCanceledException) +{ + Log.Information("Server shutdown requested"); +} finally { Log.CloseAndFlush(); diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index ebc2e2f..21df9ef 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -70,7 +70,10 @@ public sealed class NatsClient : IDisposable await Task.WhenAny(fillTask, processTask); } - catch (OperationCanceledException) { } + catch (OperationCanceledException) + { + _logger.LogDebug("Client {ClientId} operation cancelled", Id); + } catch (Exception ex) { _logger.LogDebug(ex, "Client {ClientId} connection error", Id); diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index fafd641..7e63146 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -15,11 +15,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private readonly ServerInfo _serverInfo; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; + private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously); private Socket? _listener; private ulong _nextClientId; public SubList SubList => _subList; + public Task WaitForReadyAsync() => _listeningStarted.Task; + public NatsServer(NatsOptions options, ILoggerFactory loggerFactory) { _options = options; @@ -44,6 +47,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host), _options.Port)); _listener.Listen(128); + _listeningStarted.TrySetResult(); _logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port); @@ -64,7 +68,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _ = RunClientAsync(client, ct); } } - catch (OperationCanceledException) { } + catch (OperationCanceledException) + { + _logger.LogDebug("Accept loop cancelled, server shutting down"); + } } private async Task RunClientAsync(NatsClient client, CancellationToken ct) @@ -73,9 +80,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { await client.RunAsync(ct); } - catch (Exception) + catch (Exception ex) { - // Client disconnected or errored + _logger.LogDebug(ex, "Client {ClientId} disconnected with error", client.Id); } finally { diff --git a/tests/NATS.Server.Tests/IntegrationTests.cs b/tests/NATS.Server.Tests/IntegrationTests.cs index debb8fd..0117ac9 100644 --- a/tests/NATS.Server.Tests/IntegrationTests.cs +++ b/tests/NATS.Server.Tests/IntegrationTests.cs @@ -7,22 +7,26 @@ using NATS.Server; namespace NATS.Server.Tests; -public class IntegrationTests : IAsyncDisposable +public class IntegrationTests : IAsyncLifetime { private readonly NatsServer _server; private readonly int _port; private readonly CancellationTokenSource _cts = new(); - private readonly Task _serverTask; + private Task _serverTask = null!; public IntegrationTests() { _port = GetFreePort(); _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance); - _serverTask = _server.StartAsync(_cts.Token); - Thread.Sleep(200); // Let server start } - public async ValueTask DisposeAsync() + public async Task InitializeAsync() + { + _serverTask = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() { await _cts.CancelAsync(); _server.Dispose(); @@ -50,23 +54,14 @@ public class IntegrationTests : IAsyncDisposable await pub.ConnectAsync(); await sub.ConnectAsync(); - var received = new TaskCompletionSource(); - - var subscription = Task.Run(async () => - { - await foreach (var msg in sub.SubscribeAsync("test.subject")) - { - received.TrySetResult(msg.Data!); - break; - } - }); - - await Task.Delay(100); // let subscription register + await using var subscription = await sub.SubscribeCoreAsync("test.subject"); + await sub.PingAsync(); await pub.PublishAsync("test.subject", "Hello NATS!"); - var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); - result.ShouldBe("Hello NATS!"); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msg = await subscription.Msgs.ReadAsync(timeout.Token); + msg.Data.ShouldBe("Hello NATS!"); } [Fact] @@ -78,22 +73,14 @@ public class IntegrationTests : IAsyncDisposable await pub.ConnectAsync(); await sub.ConnectAsync(); - var received = new TaskCompletionSource(); + await using var subscription = await sub.SubscribeCoreAsync("test.*"); + await sub.PingAsync(); - var subscription = Task.Run(async () => - { - await foreach (var msg in sub.SubscribeAsync("test.*")) - { - received.TrySetResult(msg.Subject); - break; - } - }); - - await Task.Delay(100); await pub.PublishAsync("test.hello", "data"); - var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); - result.ShouldBe("test.hello"); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msg = await subscription.Msgs.ReadAsync(timeout.Token); + msg.Subject.ShouldBe("test.hello"); } [Fact] @@ -105,22 +92,14 @@ public class IntegrationTests : IAsyncDisposable await pub.ConnectAsync(); await sub.ConnectAsync(); - var received = new TaskCompletionSource(); + await using var subscription = await sub.SubscribeCoreAsync("test.>"); + await sub.PingAsync(); - var subscription = Task.Run(async () => - { - await foreach (var msg in sub.SubscribeAsync("test.>")) - { - received.TrySetResult(msg.Subject); - break; - } - }); - - await Task.Delay(100); await pub.PublishAsync("test.foo.bar.baz", "data"); - var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); - result.ShouldBe("test.foo.bar.baz"); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msg = await subscription.Msgs.ReadAsync(timeout.Token); + msg.Subject.ShouldBe("test.foo.bar.baz"); } [Fact] @@ -134,38 +113,18 @@ public class IntegrationTests : IAsyncDisposable await sub1.ConnectAsync(); await sub2.ConnectAsync(); - var count1 = 0; - var count2 = 0; - var done = new TaskCompletionSource(); + await using var s1 = await sub1.SubscribeCoreAsync("fanout"); + await using var s2 = await sub2.SubscribeCoreAsync("fanout"); + await sub1.PingAsync(); + await sub2.PingAsync(); - var s1 = Task.Run(async () => - { - await foreach (var msg in sub1.SubscribeAsync("fanout")) - { - Interlocked.Increment(ref count1); - if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2) - done.TrySetResult(); - break; - } - }); - - var s2 = Task.Run(async () => - { - await foreach (var msg in sub2.SubscribeAsync("fanout")) - { - Interlocked.Increment(ref count2); - if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2) - done.TrySetResult(); - break; - } - }); - - await Task.Delay(100); await pub.PublishAsync("fanout", "hello"); - await done.Task.WaitAsync(TimeSpan.FromSeconds(5)); - count1.ShouldBe(1); - count2.ShouldBe(1); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var msg1 = await s1.Msgs.ReadAsync(timeout.Token); + var msg2 = await s2.Msgs.ReadAsync(timeout.Token); + msg1.Data.ShouldBe("hello"); + msg2.Data.ShouldBe("hello"); } [Fact] diff --git a/tests/NATS.Server.Tests/ServerTests.cs b/tests/NATS.Server.Tests/ServerTests.cs index 7dd6560..dd0f7ca 100644 --- a/tests/NATS.Server.Tests/ServerTests.cs +++ b/tests/NATS.Server.Tests/ServerTests.cs @@ -6,7 +6,7 @@ using NATS.Server; namespace NATS.Server.Tests; -public class ServerTests : IAsyncDisposable +public class ServerTests : IAsyncLifetime { private readonly NatsServer _server; private readonly int _port; @@ -19,7 +19,13 @@ public class ServerTests : IAsyncDisposable _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance); } - public async ValueTask DisposeAsync() + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public async Task DisposeAsync() { await _cts.CancelAsync(); _server.Dispose(); @@ -46,25 +52,35 @@ public class ServerTests : IAsyncDisposable return Encoding.ASCII.GetString(buf, 0, n); } + /// + /// Reads from a socket until the accumulated data contains the expected substring. + /// + private static async Task ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000) + { + using var cts = new CancellationTokenSource(timeoutMs); + var sb = new StringBuilder(); + var buf = new byte[4096]; + while (!sb.ToString().Contains(expected)) + { + var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token); + if (n == 0) break; + sb.Append(Encoding.ASCII.GetString(buf, 0, n)); + } + return sb.ToString(); + } + [Fact] public async Task Server_accepts_connection_and_sends_INFO() { - var serverTask = _server.StartAsync(_cts.Token); - await Task.Delay(100); // let server start - using var client = await ConnectClientAsync(); var response = await ReadLineAsync(client); response.ShouldStartWith("INFO "); - await _cts.CancelAsync(); } [Fact] public async Task Server_basic_pubsub() { - var serverTask = _server.StartAsync(_cts.Token); - await Task.Delay(100); - using var pub = await ConnectClientAsync(); using var sub = await ConnectClientAsync(); @@ -72,46 +88,39 @@ public class ServerTests : IAsyncDisposable await ReadLineAsync(pub); await ReadLineAsync(sub); - // CONNECT + SUB on subscriber - await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\n")); - await Task.Delay(50); + // CONNECT + SUB on subscriber, then PING to flush + await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\nPING\r\n")); + var pong = await ReadLineAsync(sub); + pong.ShouldContain("PONG"); // CONNECT + PUB on publisher await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo 5\r\nHello\r\n")); - await Task.Delay(100); - - // Read MSG from subscriber - var buf = new byte[4096]; - var n = await sub.ReceiveAsync(buf, SocketFlags.None); - var msg = Encoding.ASCII.GetString(buf, 0, n); + // Read MSG from subscriber (may arrive across multiple TCP segments) + var msg = await ReadUntilAsync(sub, "Hello\r\n"); msg.ShouldContain("MSG foo 1 5\r\nHello\r\n"); - await _cts.CancelAsync(); } [Fact] public async Task Server_wildcard_matching() { - var serverTask = _server.StartAsync(_cts.Token); - await Task.Delay(100); - using var pub = await ConnectClientAsync(); using var sub = await ConnectClientAsync(); await ReadLineAsync(pub); await ReadLineAsync(sub); - await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\n")); - await Task.Delay(50); + // CONNECT + SUB on subscriber, then PING to flush + await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n")); + var pong = await ReadLineAsync(sub); + pong.ShouldContain("PONG"); await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.bar 5\r\nHello\r\n")); - await Task.Delay(100); var buf = new byte[4096]; var n = await sub.ReceiveAsync(buf, SocketFlags.None); var msg = Encoding.ASCII.GetString(buf, 0, n); msg.ShouldContain("MSG foo.bar 1 5\r\n"); - await _cts.CancelAsync(); } }