diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 21df9ef..44cbc56 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -29,6 +29,7 @@ public sealed class NatsClient : IDisposable private readonly ServerInfo _serverInfo; private readonly NatsParser _parser; private readonly SemaphoreSlim _writeLock = new(1, 1); + private CancellationTokenSource? _clientCts; private readonly Dictionary _subs = new(); private readonly ILogger _logger; @@ -58,15 +59,16 @@ public sealed class NatsClient : IDisposable public async Task RunAsync(CancellationToken ct) { + _clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct); var pipe = new Pipe(); try { // Send INFO - await SendInfoAsync(ct); + await SendInfoAsync(_clientCts.Token); // Start read pump and command processing in parallel - var fillTask = FillPipeAsync(pipe.Writer, ct); - var processTask = ProcessCommandsAsync(pipe.Reader, ct); + var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token); + var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token); await Task.WhenAny(fillTask, processTask); } @@ -81,6 +83,14 @@ public sealed class NatsClient : IDisposable finally { Router?.RemoveClient(this); + try + { + _socket.Shutdown(SocketShutdown.Both); + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Client {ClientId} socket shutdown error", Id); + } } } @@ -282,6 +292,36 @@ public sealed class NatsClient : IDisposable } } + public async Task SendErrAsync(string message) + { + var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n"); + try + { + await WriteAsync(errLine, _clientCts?.Token ?? CancellationToken.None); + } + catch (OperationCanceledException) + { + // Expected during shutdown + } + catch (IOException ex) + { + _logger.LogDebug(ex, "Client {ClientId} failed to send -ERR", Id); + } + catch (ObjectDisposedException ex) + { + _logger.LogDebug(ex, "Client {ClientId} failed to send -ERR (disposed)", Id); + } + } + + public async Task SendErrAndCloseAsync(string message) + { + await SendErrAsync(message); + if (_clientCts is { } cts) + await cts.CancelAsync(); + else + _socket.Close(); + } + public void RemoveAllSubscriptions(SubList subList) { foreach (var sub in _subs.Values) @@ -291,6 +331,7 @@ public sealed class NatsClient : IDisposable public void Dispose() { + _clientCts?.Dispose(); _stream.Dispose(); _socket.Dispose(); _writeLock.Dispose(); diff --git a/src/NATS.Server/Protocol/NatsProtocol.cs b/src/NATS.Server/Protocol/NatsProtocol.cs index 1d126bc..75ccde0 100644 --- a/src/NATS.Server/Protocol/NatsProtocol.cs +++ b/src/NATS.Server/Protocol/NatsProtocol.cs @@ -19,6 +19,13 @@ public static class NatsProtocol public static readonly byte[] MsgPrefix = "MSG "u8.ToArray(); public static readonly byte[] HmsgPrefix = "HMSG "u8.ToArray(); public static readonly byte[] ErrPrefix = "-ERR "u8.ToArray(); + + // Standard error messages (matching Go server) + public const string ErrMaxConnectionsExceeded = "maximum connections exceeded"; + public const string ErrStaleConnection = "Stale Connection"; + public const string ErrMaxPayloadViolation = "Maximum Payload Violation"; + public const string ErrInvalidPublishSubject = "Invalid Publish Subject"; + public const string ErrInvalidSubject = "Invalid Subject"; } public sealed class ServerInfo diff --git a/tests/NATS.Server.Tests/ClientTests.cs b/tests/NATS.Server.Tests/ClientTests.cs index 9c5540b..9e6b9a8 100644 --- a/tests/NATS.Server.Tests/ClientTests.cs +++ b/tests/NATS.Server.Tests/ClientTests.cs @@ -86,4 +86,48 @@ public class ClientTests : IAsyncDisposable await _cts.CancelAsync(); } + + [Fact] + public async Task Client_SendErrAsync_writes_correct_wire_format() + { + var runTask = _natsClient.RunAsync(_cts.Token); + + // Read INFO first + var buf = new byte[4096]; + using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token); + + // Trigger SendErrAsync + await _natsClient.SendErrAsync("Invalid Subject"); + + var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token); + var response = Encoding.ASCII.GetString(buf, 0, n); + + response.ShouldBe("-ERR 'Invalid Subject'\r\n"); + + await _cts.CancelAsync(); + } + + [Fact] + public async Task Client_SendErrAndCloseAsync_sends_error_then_disconnects() + { + var runTask = _natsClient.RunAsync(_cts.Token); + + // Read INFO first + var buf = new byte[4096]; + using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token); + + // Trigger SendErrAndCloseAsync + await _natsClient.SendErrAndCloseAsync("maximum connections exceeded"); + + var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token); + var response = Encoding.ASCII.GetString(buf, 0, n); + + response.ShouldBe("-ERR 'maximum connections exceeded'\r\n"); + + // Connection should be closed — next read returns 0 + n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token); + n.ShouldBe(0); + } }