fix: resolve slopwatch issues — add logging to empty catches and eliminate test timing delays
Replace empty catch blocks with meaningful log statements in NatsServer, NatsClient, and Program. Add WaitForReadyAsync() to NatsServer for deterministic server startup. Replace Task.Delay/Thread.Sleep in tests with PING/PONG protocol flush and SubscribeCoreAsync for reliable subscription synchronization.
This commit is contained in:
7
.slopwatch/baseline.json
Normal file
7
.slopwatch/baseline.json
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"version": 1,
|
||||||
|
"createdAt": "2026-02-23T02:13:34.57845+00:00",
|
||||||
|
"updatedAt": "2026-02-23T02:13:34.57845+00:00",
|
||||||
|
"description": "Initial baseline created by 'slopwatch init' on 2026-02-23 02:13:34 UTC",
|
||||||
|
"entries": []
|
||||||
|
}
|
||||||
@@ -40,7 +40,10 @@ try
|
|||||||
{
|
{
|
||||||
await server.StartAsync(cts.Token);
|
await server.StartAsync(cts.Token);
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException) { }
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
Log.Information("Server shutdown requested");
|
||||||
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
Log.CloseAndFlush();
|
Log.CloseAndFlush();
|
||||||
|
|||||||
@@ -70,7 +70,10 @@ public sealed class NatsClient : IDisposable
|
|||||||
|
|
||||||
await Task.WhenAny(fillTask, processTask);
|
await Task.WhenAny(fillTask, processTask);
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException) { }
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("Client {ClientId} operation cancelled", Id);
|
||||||
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogDebug(ex, "Client {ClientId} connection error", Id);
|
_logger.LogDebug(ex, "Client {ClientId} connection error", Id);
|
||||||
|
|||||||
@@ -15,11 +15,14 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
private readonly ServerInfo _serverInfo;
|
private readonly ServerInfo _serverInfo;
|
||||||
private readonly ILogger<NatsServer> _logger;
|
private readonly ILogger<NatsServer> _logger;
|
||||||
private readonly ILoggerFactory _loggerFactory;
|
private readonly ILoggerFactory _loggerFactory;
|
||||||
|
private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
private Socket? _listener;
|
private Socket? _listener;
|
||||||
private ulong _nextClientId;
|
private ulong _nextClientId;
|
||||||
|
|
||||||
public SubList SubList => _subList;
|
public SubList SubList => _subList;
|
||||||
|
|
||||||
|
public Task WaitForReadyAsync() => _listeningStarted.Task;
|
||||||
|
|
||||||
public NatsServer(NatsOptions options, ILoggerFactory loggerFactory)
|
public NatsServer(NatsOptions options, ILoggerFactory loggerFactory)
|
||||||
{
|
{
|
||||||
_options = options;
|
_options = options;
|
||||||
@@ -44,6 +47,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
_options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host),
|
_options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host),
|
||||||
_options.Port));
|
_options.Port));
|
||||||
_listener.Listen(128);
|
_listener.Listen(128);
|
||||||
|
_listeningStarted.TrySetResult();
|
||||||
|
|
||||||
_logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port);
|
_logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port);
|
||||||
|
|
||||||
@@ -64,7 +68,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
_ = RunClientAsync(client, ct);
|
_ = RunClientAsync(client, ct);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException) { }
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
_logger.LogDebug("Accept loop cancelled, server shutting down");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task RunClientAsync(NatsClient client, CancellationToken ct)
|
private async Task RunClientAsync(NatsClient client, CancellationToken ct)
|
||||||
@@ -73,9 +80,9 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
{
|
{
|
||||||
await client.RunAsync(ct);
|
await client.RunAsync(ct);
|
||||||
}
|
}
|
||||||
catch (Exception)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// Client disconnected or errored
|
_logger.LogDebug(ex, "Client {ClientId} disconnected with error", client.Id);
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -7,22 +7,26 @@ using NATS.Server;
|
|||||||
|
|
||||||
namespace NATS.Server.Tests;
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
public class IntegrationTests : IAsyncDisposable
|
public class IntegrationTests : IAsyncLifetime
|
||||||
{
|
{
|
||||||
private readonly NatsServer _server;
|
private readonly NatsServer _server;
|
||||||
private readonly int _port;
|
private readonly int _port;
|
||||||
private readonly CancellationTokenSource _cts = new();
|
private readonly CancellationTokenSource _cts = new();
|
||||||
private readonly Task _serverTask;
|
private Task _serverTask = null!;
|
||||||
|
|
||||||
public IntegrationTests()
|
public IntegrationTests()
|
||||||
{
|
{
|
||||||
_port = GetFreePort();
|
_port = GetFreePort();
|
||||||
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
|
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
|
||||||
_serverTask = _server.StartAsync(_cts.Token);
|
|
||||||
Thread.Sleep(200); // Let server start
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async ValueTask DisposeAsync()
|
public async Task InitializeAsync()
|
||||||
|
{
|
||||||
|
_serverTask = _server.StartAsync(_cts.Token);
|
||||||
|
await _server.WaitForReadyAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task DisposeAsync()
|
||||||
{
|
{
|
||||||
await _cts.CancelAsync();
|
await _cts.CancelAsync();
|
||||||
_server.Dispose();
|
_server.Dispose();
|
||||||
@@ -50,23 +54,14 @@ public class IntegrationTests : IAsyncDisposable
|
|||||||
await pub.ConnectAsync();
|
await pub.ConnectAsync();
|
||||||
await sub.ConnectAsync();
|
await sub.ConnectAsync();
|
||||||
|
|
||||||
var received = new TaskCompletionSource<string>();
|
await using var subscription = await sub.SubscribeCoreAsync<string>("test.subject");
|
||||||
|
await sub.PingAsync();
|
||||||
var subscription = Task.Run(async () =>
|
|
||||||
{
|
|
||||||
await foreach (var msg in sub.SubscribeAsync<string>("test.subject"))
|
|
||||||
{
|
|
||||||
received.TrySetResult(msg.Data!);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
await Task.Delay(100); // let subscription register
|
|
||||||
|
|
||||||
await pub.PublishAsync("test.subject", "Hello NATS!");
|
await pub.PublishAsync("test.subject", "Hello NATS!");
|
||||||
|
|
||||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||||
result.ShouldBe("Hello NATS!");
|
var msg = await subscription.Msgs.ReadAsync(timeout.Token);
|
||||||
|
msg.Data.ShouldBe("Hello NATS!");
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -78,22 +73,14 @@ public class IntegrationTests : IAsyncDisposable
|
|||||||
await pub.ConnectAsync();
|
await pub.ConnectAsync();
|
||||||
await sub.ConnectAsync();
|
await sub.ConnectAsync();
|
||||||
|
|
||||||
var received = new TaskCompletionSource<string>();
|
await using var subscription = await sub.SubscribeCoreAsync<string>("test.*");
|
||||||
|
await sub.PingAsync();
|
||||||
|
|
||||||
var subscription = Task.Run(async () =>
|
|
||||||
{
|
|
||||||
await foreach (var msg in sub.SubscribeAsync<string>("test.*"))
|
|
||||||
{
|
|
||||||
received.TrySetResult(msg.Subject);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
await Task.Delay(100);
|
|
||||||
await pub.PublishAsync("test.hello", "data");
|
await pub.PublishAsync("test.hello", "data");
|
||||||
|
|
||||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||||
result.ShouldBe("test.hello");
|
var msg = await subscription.Msgs.ReadAsync(timeout.Token);
|
||||||
|
msg.Subject.ShouldBe("test.hello");
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -105,22 +92,14 @@ public class IntegrationTests : IAsyncDisposable
|
|||||||
await pub.ConnectAsync();
|
await pub.ConnectAsync();
|
||||||
await sub.ConnectAsync();
|
await sub.ConnectAsync();
|
||||||
|
|
||||||
var received = new TaskCompletionSource<string>();
|
await using var subscription = await sub.SubscribeCoreAsync<string>("test.>");
|
||||||
|
await sub.PingAsync();
|
||||||
|
|
||||||
var subscription = Task.Run(async () =>
|
|
||||||
{
|
|
||||||
await foreach (var msg in sub.SubscribeAsync<string>("test.>"))
|
|
||||||
{
|
|
||||||
received.TrySetResult(msg.Subject);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
await Task.Delay(100);
|
|
||||||
await pub.PublishAsync("test.foo.bar.baz", "data");
|
await pub.PublishAsync("test.foo.bar.baz", "data");
|
||||||
|
|
||||||
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||||
result.ShouldBe("test.foo.bar.baz");
|
var msg = await subscription.Msgs.ReadAsync(timeout.Token);
|
||||||
|
msg.Subject.ShouldBe("test.foo.bar.baz");
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -134,38 +113,18 @@ public class IntegrationTests : IAsyncDisposable
|
|||||||
await sub1.ConnectAsync();
|
await sub1.ConnectAsync();
|
||||||
await sub2.ConnectAsync();
|
await sub2.ConnectAsync();
|
||||||
|
|
||||||
var count1 = 0;
|
await using var s1 = await sub1.SubscribeCoreAsync<string>("fanout");
|
||||||
var count2 = 0;
|
await using var s2 = await sub2.SubscribeCoreAsync<string>("fanout");
|
||||||
var done = new TaskCompletionSource();
|
await sub1.PingAsync();
|
||||||
|
await sub2.PingAsync();
|
||||||
|
|
||||||
var s1 = Task.Run(async () =>
|
|
||||||
{
|
|
||||||
await foreach (var msg in sub1.SubscribeAsync<string>("fanout"))
|
|
||||||
{
|
|
||||||
Interlocked.Increment(ref count1);
|
|
||||||
if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2)
|
|
||||||
done.TrySetResult();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
var s2 = Task.Run(async () =>
|
|
||||||
{
|
|
||||||
await foreach (var msg in sub2.SubscribeAsync<string>("fanout"))
|
|
||||||
{
|
|
||||||
Interlocked.Increment(ref count2);
|
|
||||||
if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 2)
|
|
||||||
done.TrySetResult();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
await Task.Delay(100);
|
|
||||||
await pub.PublishAsync("fanout", "hello");
|
await pub.PublishAsync("fanout", "hello");
|
||||||
|
|
||||||
await done.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
||||||
count1.ShouldBe(1);
|
var msg1 = await s1.Msgs.ReadAsync(timeout.Token);
|
||||||
count2.ShouldBe(1);
|
var msg2 = await s2.Msgs.ReadAsync(timeout.Token);
|
||||||
|
msg1.Data.ShouldBe("hello");
|
||||||
|
msg2.Data.ShouldBe("hello");
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ using NATS.Server;
|
|||||||
|
|
||||||
namespace NATS.Server.Tests;
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
public class ServerTests : IAsyncDisposable
|
public class ServerTests : IAsyncLifetime
|
||||||
{
|
{
|
||||||
private readonly NatsServer _server;
|
private readonly NatsServer _server;
|
||||||
private readonly int _port;
|
private readonly int _port;
|
||||||
@@ -19,7 +19,13 @@ public class ServerTests : IAsyncDisposable
|
|||||||
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
|
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async ValueTask DisposeAsync()
|
public async Task InitializeAsync()
|
||||||
|
{
|
||||||
|
_ = _server.StartAsync(_cts.Token);
|
||||||
|
await _server.WaitForReadyAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task DisposeAsync()
|
||||||
{
|
{
|
||||||
await _cts.CancelAsync();
|
await _cts.CancelAsync();
|
||||||
_server.Dispose();
|
_server.Dispose();
|
||||||
@@ -46,25 +52,35 @@ public class ServerTests : IAsyncDisposable
|
|||||||
return Encoding.ASCII.GetString(buf, 0, n);
|
return Encoding.ASCII.GetString(buf, 0, n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Reads from a socket until the accumulated data contains the expected substring.
|
||||||
|
/// </summary>
|
||||||
|
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
|
||||||
|
{
|
||||||
|
using var cts = new CancellationTokenSource(timeoutMs);
|
||||||
|
var sb = new StringBuilder();
|
||||||
|
var buf = new byte[4096];
|
||||||
|
while (!sb.ToString().Contains(expected))
|
||||||
|
{
|
||||||
|
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
|
||||||
|
if (n == 0) break;
|
||||||
|
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
|
||||||
|
}
|
||||||
|
return sb.ToString();
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task Server_accepts_connection_and_sends_INFO()
|
public async Task Server_accepts_connection_and_sends_INFO()
|
||||||
{
|
{
|
||||||
var serverTask = _server.StartAsync(_cts.Token);
|
|
||||||
await Task.Delay(100); // let server start
|
|
||||||
|
|
||||||
using var client = await ConnectClientAsync();
|
using var client = await ConnectClientAsync();
|
||||||
var response = await ReadLineAsync(client);
|
var response = await ReadLineAsync(client);
|
||||||
|
|
||||||
response.ShouldStartWith("INFO ");
|
response.ShouldStartWith("INFO ");
|
||||||
await _cts.CancelAsync();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task Server_basic_pubsub()
|
public async Task Server_basic_pubsub()
|
||||||
{
|
{
|
||||||
var serverTask = _server.StartAsync(_cts.Token);
|
|
||||||
await Task.Delay(100);
|
|
||||||
|
|
||||||
using var pub = await ConnectClientAsync();
|
using var pub = await ConnectClientAsync();
|
||||||
using var sub = await ConnectClientAsync();
|
using var sub = await ConnectClientAsync();
|
||||||
|
|
||||||
@@ -72,46 +88,39 @@ public class ServerTests : IAsyncDisposable
|
|||||||
await ReadLineAsync(pub);
|
await ReadLineAsync(pub);
|
||||||
await ReadLineAsync(sub);
|
await ReadLineAsync(sub);
|
||||||
|
|
||||||
// CONNECT + SUB on subscriber
|
// CONNECT + SUB on subscriber, then PING to flush
|
||||||
await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\n"));
|
await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\nPING\r\n"));
|
||||||
await Task.Delay(50);
|
var pong = await ReadLineAsync(sub);
|
||||||
|
pong.ShouldContain("PONG");
|
||||||
|
|
||||||
// CONNECT + PUB on publisher
|
// CONNECT + PUB on publisher
|
||||||
await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo 5\r\nHello\r\n"));
|
await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo 5\r\nHello\r\n"));
|
||||||
await Task.Delay(100);
|
|
||||||
|
|
||||||
// Read MSG from subscriber
|
|
||||||
var buf = new byte[4096];
|
|
||||||
var n = await sub.ReceiveAsync(buf, SocketFlags.None);
|
|
||||||
var msg = Encoding.ASCII.GetString(buf, 0, n);
|
|
||||||
|
|
||||||
|
// Read MSG from subscriber (may arrive across multiple TCP segments)
|
||||||
|
var msg = await ReadUntilAsync(sub, "Hello\r\n");
|
||||||
msg.ShouldContain("MSG foo 1 5\r\nHello\r\n");
|
msg.ShouldContain("MSG foo 1 5\r\nHello\r\n");
|
||||||
await _cts.CancelAsync();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task Server_wildcard_matching()
|
public async Task Server_wildcard_matching()
|
||||||
{
|
{
|
||||||
var serverTask = _server.StartAsync(_cts.Token);
|
|
||||||
await Task.Delay(100);
|
|
||||||
|
|
||||||
using var pub = await ConnectClientAsync();
|
using var pub = await ConnectClientAsync();
|
||||||
using var sub = await ConnectClientAsync();
|
using var sub = await ConnectClientAsync();
|
||||||
|
|
||||||
await ReadLineAsync(pub);
|
await ReadLineAsync(pub);
|
||||||
await ReadLineAsync(sub);
|
await ReadLineAsync(sub);
|
||||||
|
|
||||||
await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\n"));
|
// CONNECT + SUB on subscriber, then PING to flush
|
||||||
await Task.Delay(50);
|
await sub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo.* 1\r\nPING\r\n"));
|
||||||
|
var pong = await ReadLineAsync(sub);
|
||||||
|
pong.ShouldContain("PONG");
|
||||||
|
|
||||||
await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.bar 5\r\nHello\r\n"));
|
await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPUB foo.bar 5\r\nHello\r\n"));
|
||||||
await Task.Delay(100);
|
|
||||||
|
|
||||||
var buf = new byte[4096];
|
var buf = new byte[4096];
|
||||||
var n = await sub.ReceiveAsync(buf, SocketFlags.None);
|
var n = await sub.ReceiveAsync(buf, SocketFlags.None);
|
||||||
var msg = Encoding.ASCII.GetString(buf, 0, n);
|
var msg = Encoding.ASCII.GetString(buf, 0, n);
|
||||||
|
|
||||||
msg.ShouldContain("MSG foo.bar 1 5\r\n");
|
msg.ShouldContain("MSG foo.bar 1 5\r\n");
|
||||||
await _cts.CancelAsync();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user